RabbitMQ is a popular open-source message queuing system written in Erlang. It is a standard implementation of AMQP (Advanced Message Queuing Protocol).
Add the Maven dependency
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
Simple implementation
Configuration
Add the following configuration in application.properties
spring.rabbitmq.addresses=127.0.0.1:5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.publisher-confirms=true
spring.rabbitmq.virtual-host=/
RabbitMQ port description: 5672 - AMQP, 25672 - clustering, 61613 - STOMP, 1883 - MQTT
Message producer
Package com.rabbitmq.send;
Import org.springframework.amqp.rabbit.core.RabbitTemplate;
Import org.springframework.beans.factory.annotation.Autowired;
Import org.springframework.stereotype.Component;
@Component
Public class Sender {
@Autowired
Private RabbitTemplate rabbitTemplate;
Public void send(String msg) {
This .rabbitTemplate.convertAndSend( "foo" , msg);
}
}
Message listener
Package com.rabbitmq.listener;
Import org.slf4j.Logger;
Import org.slf4j.LoggerFactory;
Import org.springframework.amqp.core.Queue;
Import org.springframework.amqp.rabbit.annotation.RabbitHandler;
Import org.springframework.amqp.rabbit.annotation.RabbitListener;
Import org.springframework.context.annotation.Bean;
Import org.springframework.context.annotation.Configuration;
Import org.springframework.messaging.handler.annotation.Payload;
@Configuration
@RabbitListener (queues = "foo" )
Public class Listener {
Private static final Logger LOGGER = LoggerFactory.getLogger(Listener. class );
@Bean
Public Queue fooQueue() {
Return new Queue( "foo" );
}
@RabbitHandler
Public void process( @Payload String foo) {
LOGGER.info( "Listener: " + foo);
}
}
Test Controller
Package com.rabbitmq.controller;
Import com.rabbitmq.send.Sender;
Import org.springframework.beans.factory.annotation.Autowired;
Import org.springframework.web.bind.annotation.GetMapping;
Import org.springframework.web.bind.annotation.RestController;
Import javax.servlet.http.HttpServletRequest;
@RestController
Public class RabbitmqController {
@Autowired
Private Sender sender;
@GetMapping ( "/send" )
Public String send(HttpServletRequest request, String msg) {
Sender.send(msg);
Return "Send OK." ;
}
}
Test
Start the service, enter http://127.0.0.1:8080/send?msg=this%20is%20a%20test in the browser, and press Enter; the following output appears in the console:
INFO 5559 - [cTaskExecutor-1] c.rabbitmq.listener.Listener : Listener: this is a test
[SimpleAsyncTaskExecutor-1] INFO c.rabbitmq.listener.Listener – Listener: this is a test
Use with ConfirmCallback
This version adds confirm-callback handling. It no longer relies on the default auto-configuration driven by application.properties; instead, the program explicitly reads the configuration values from that file and applies them in code.
Configuration
Package com.rabbitmq.config;
Import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
Import org.springframework.amqp.rabbit.connection.ConnectionFactory;
Import org.springframework.amqp.rabbit.core.RabbitTemplate;
Import org.springframework.beans.factory.annotation.Value;
Import org.springframework.beans.factory.config.ConfigurableBeanFactory;
Import org.springframework.context.annotation.Bean;
Import org.springframework.context.annotation.Configuration;
Import org.springframework.context.annotation.Scope;
@Configuration
Public class AmqpConfig {
Public static final String FOO_EXCHANGE = "callback.exchange.foo" ;
Public static final String FOO_ROUTINGKEY = "callback.routingkey.foo" ;
Public static final String FOO_QUEUE = "callback.queue.foo" ;
@Value ( "${spring.rabbitmq.addresses}" )
Private String addresses;
@Value ( "${spring.rabbitmq.username}" )
Private String username;
@Value ( "${spring.rabbitmq.password}" )
Private String password;
@Value ( "${spring.rabbitmq.virtual-host}" )
Private String virtualHost;
@Value ( "${spring.rabbitmq.publisher-confirms}" )
Private boolean publisherConfirms;
@Bean
Public ConnectionFactory connectionFactory() {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
connectionFactory.setAddresses(addresses);
connectionFactory.setUsername(username);
connectionFactory.setPassword(password);
connectionFactory.setVirtualHost(virtualHost);
/** If you want to make a message callback, this must be set to true */
connectionFactory.setPublisherConfirms(publisherConfirms);
Return connectionFactory;
}
@Bean
/** Because you want to set the callback class, it should be the prototype type. If it is the singleton type, the callback class is set to the last time. */
@Scope (ConfigurableBeanFactory.SCOPE_PROTOTYPE)
Public RabbitTemplate rabbitTemplate() {
RabbitTemplate template = new RabbitTemplate(connectionFactory());
Return template;
}
}
Message producer
Package com.rabbitmq.send;
Import com.rabbitmq.config.AmqpConfig;
Import org.slf4j.Logger;
Import org.slf4j.LoggerFactory;
Import org.springframework.amqp.rabbit.core.RabbitTemplate;
Import org.springframework.amqp.rabbit.support.CorrelationData;
Import org.springframework.beans.factory.annotation.Autowired;
Import org.springframework.stereotype.Component;
Import java.util.UUID;
@Component
Public class Sender implements RabbitTemplate.ConfirmCallback {
Private static final Logger LOGGER = LoggerFactory.getLogger(Sender. class );
Private RabbitTemplate rabbitTemplate;
@Autowired
Public Sender(RabbitTemplate rabbitTemplate) {
This .rabbitTemplate = rabbitTemplate;
This .rabbitTemplate.setConfirmCallback( this );
}
Public void send(String msg) {
CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
LOGGER.info( "send: " + correlationData.getId());
This .rabbitTemplate.convertAndSend(AmqpConfig.FOO_EXCHANGE, AmqpConfig.FOO_ROUTINGKEY, msg, correlationData);
}
/** Callback method */
@Override
Public void confirm(CorrelationData correlationData, boolean ack, String cause) {
LOGGER.info( "confirm: " + correlationData.getId());
}
}
Message listener
Package com.rabbitmq.listener;
Import com.rabbitmq.config.AmqpConfig;
Import org.slf4j.Logger;
Import org.slf4j.LoggerFactory;
Import org.springframework.amqp.core.Binding;
Import org.springframework.amqp.core.BindingBuilder;
Import org.springframework.amqp.core.DirectExchange;
Import org.springframework.amqp.core.Queue;
Import org.springframework.amqp.rabbit.annotation.RabbitHandler;
Import org.springframework.amqp.rabbit.annotation.RabbitListener;
Import org.springframework.context.annotation.Bean;
Import org.springframework.context.annotation.Configuration;
Import org.springframework.messaging.handler.annotation.Payload;
@Configuration
@RabbitListener (queues = AmqpConfig.FOO_QUEUE)
Public class Listener {
Private static final Logger LOGGER = LoggerFactory.getLogger(Listener. class );
/** Set switch type */
@Bean
Public DirectExchange defaultExchange() {
/**
* DirectExchange: Distribution to the specified queue according to the routingkey
* TopicExchange: Multiple Keyword Matching
* FanoutExchange: Distribution of messages to all binding queues, no concept of routingkey
* HeadersExchange : Matching by adding attribute key-value
*/
Return new DirectExchange(AmqpConfig.FOO_EXCHANGE);
}
@Bean
Public Queue fooQueue() {
Return new Queue(AmqpConfig.FOO_QUEUE);
}
@Bean
Public Binding binding() {
/** Bind the queue to the switch */
Return BindingBuilder.bind(fooQueue()).to(defaultExchange()).with(AmqpConfig.FOO_ROUTINGKEY);
}
@RabbitHandler
Public void process( @Payload String foo) {
LOGGER.info( "Listener: " + foo);
}
}
Alternatively, use the following code in place of the process method annotated with @RabbitHandler:
@Bean
Public SimpleMessageListenerContainer messageContainer() {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory());
container.setQueues(fooQueue());
container.setExposeListenerChannel( true );
container.setMaxConcurrentConsumers( 1 );
container.setConcurrentConsumers( 1 );
container.setAcknowledgeMode(AcknowledgeMode.MANUAL); //Set confirm mode manual confirmation
container.setMessageListener( new ChannelAwareMessageListener() {
@Override
Public void onMessage(Message message, Channel channel) throws Exception {
Byte [] body = message.getBody();
LOGGER.info( "Listener onMessage : " + new String(body));
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false ); //Confirm that the message was successfully consumed
}
});
Return container;
}
USEFUL RESOURCES: