close

 

RabbitMQ is a popular open source message queuing system developed in the erlang language.  RabbitMQ is a standard implementation of AMQP (Advanced Message Queuing Protocol).

Add maven dependency

  1. dependency  >

  2.       groupId  >  org.springframework.boot  </  groupId  >

  3.       artifactId  >  spring-boot-starter-amqp  </  artifactId  >

  4. </  dependency  >

Simple implementation

Configuration

Add the following configuration in application.properties

  1. Spring.rabbitmq.addresses=127.0.0.1:5672

  2. Spring.rabbitmq.username=guest

  3. Spring.rabbitmq.password=guest

  4. Spring.rabbitmq.publisher-confirms=true

  5. Spring.rabbitmq.virtual-host=/

Rabbitmq port description: 5672-amqp, 25672-clustering, 61613-stomp, 1883-mqtt

News producer

  1. Package   com.rabbitmq.send;

  2.  

  3. Import   org.springframework.amqp.rabbit.core.RabbitTemplate;

  4. Import   org.springframework.beans.factory.annotation.Autowired;

  5. Import   org.springframework.stereotype.Component;

  6.  

  7. @Component

  8. Public   class   Sender {

  9.  

  10.     @Autowired

  11.     Private   RabbitTemplate rabbitTemplate;

  12.  

  13.     Public   void   send(String msg) {

  14.         This  .rabbitTemplate.convertAndSend(  "foo"  , msg);

  15.     }

  16. }

Message listener

  1. Package   com.rabbitmq.listener;

  2.  

  3. Import   org.slf4j.Logger;

  4. Import   org.slf4j.LoggerFactory;

  5. Import   org.springframework.amqp.core.Queue;

  6. Import   org.springframework.amqp.rabbit.annotation.RabbitHandler;

  7. Import   org.springframework.amqp.rabbit.annotation.RabbitListener;

  8. Import   org.springframework.context.annotation.Bean;

  9. Import   org.springframework.context.annotation.Configuration;

  10. Import   org.springframework.messaging.handler.annotation.Payload;

  11.  

  12. @Configuration

  13. @RabbitListener  (queues =   "foo"  )

  14. Public   class   Listener {

  15.  

  16.     Private   static   final   Logger LOGGER = LoggerFactory.getLogger(Listener.  class  );

  17.  

  18.     @Bean

  19.     Public   Queue fooQueue() {

  20.         Return   new   Queue(  "foo"  );

  21.     }

  22.  

  23.     @RabbitHandler

  24.     Public   void   process(  @Payload   String foo) {

  25.         LOGGER.info(  "Listener: "   + foo);

  26.     }

  27. }

Test Controller

  1. Package   com.rabbitmq.controller;

  2.  

  3. Import   com.rabbitmq.send.Sender;

  4. Import   org.springframework.beans.factory.annotation.Autowired;

  5. Import   org.springframework.web.bind.annotation.GetMapping;

  6. Import   org.springframework.web.bind.annotation.RestController;

  7.  

  8. Import   javax.servlet.http.HttpServletRequest;

  9.  

  10. @RestController

  11. Public   class   RabbitmqController {

  12.  

  13.     @Autowired

  14.     Private   Sender sender;

  15.  

  16.     @GetMapping  (  "/send"  )

  17.     Public   String send(HttpServletRequest request, String msg) {

  18.         Sender.send(msg);

  19.         Return   "Send OK."  ;

  20.     }

  21. }

Test

Start the service, enter http://127.0.0.1:8080/send?msg=this%20is%20a%20test in the browser, click Enter to see the output in the console

  1. INFO 5559 - [cTaskExecutor-1] c.rabbitmq.listener.Listener : Listener: this is a test

  2. [SimpleAsyncTaskExecutor-1] INFO c.rabbitmq.listener.Listener – Listener: this is a test

Use with ConfirmCallback

Added callback processing, which no longer uses the default configuration of application.properties and will display the configuration information in the use file in the program.

Configuration

  1. Package   com.rabbitmq.config;

  2.  

  3. Import   org.springframework.amqp.rabbit.connection.CachingConnectionFactory;

  4. Import   org.springframework.amqp.rabbit.connection.ConnectionFactory;

  5. Import   org.springframework.amqp.rabbit.core.RabbitTemplate;

  6. Import   org.springframework.beans.factory.annotation.Value;

  7. Import   org.springframework.beans.factory.config.ConfigurableBeanFactory;

  8. Import   org.springframework.context.annotation.Bean;

  9. Import   org.springframework.context.annotation.Configuration;

  10. Import   org.springframework.context.annotation.Scope;

  11.  

  12. @Configuration

  13. Public   class   AmqpConfig {

  14.  

  15.     Public   static   final   String FOO_EXCHANGE =   "callback.exchange.foo"  ;

  16.     Public   static   final   String FOO_ROUTINGKEY =   "callback.routingkey.foo"  ;

  17.     Public   static   final   String FOO_QUEUE =   "callback.queue.foo"  ;

  18.  

  19.     @Value  (  "${spring.rabbitmq.addresses}"  )

  20.     Private   String addresses;

  21.     @Value  (  "${spring.rabbitmq.username}"  )

  22.     Private   String username;

  23.     @Value  (  "${spring.rabbitmq.password}"  )

  24.     Private   String password;

  25.     @Value  (  "${spring.rabbitmq.virtual-host}"  )

  26.     Private   String virtualHost;

  27.     @Value  (  "${spring.rabbitmq.publisher-confirms}"  )

  28.     Private   boolean   publisherConfirms;

  29.  

  30.     @Bean

  31.     Public   ConnectionFactory connectionFactory() {

  32.         CachingConnectionFactory connectionFactory =   new   CachingConnectionFactory();

  33.         connectionFactory.setAddresses(addresses);

  34.         connectionFactory.setUsername(username);

  35.         connectionFactory.setPassword(password);

  36.         connectionFactory.setVirtualHost(virtualHost);

  37.         /** If you want to make a message callback, this must be set to true */

  38.         connectionFactory.setPublisherConfirms(publisherConfirms);

  39.         Return   connectionFactory;

  40.     }

  41.  

  42.     @Bean

  43.     /** Because you want to set the callback class, it should be the prototype type. If it is the singleton type, the callback class is set to the last time. */

  44.     @Scope  (ConfigurableBeanFactory.SCOPE_PROTOTYPE)

  45.     Public   RabbitTemplate rabbitTemplate() {

  46.         RabbitTemplate template =   new   RabbitTemplate(connectionFactory());

  47.         Return   template;

  48.     }

  49.  

  50. }

News producer

  1. Package   com.rabbitmq.send;

  2.  

  3. Import   com.rabbitmq.config.AmqpConfig;

  4. Import   org.slf4j.Logger;

  5. Import   org.slf4j.LoggerFactory;

  6. Import   org.springframework.amqp.rabbit.core.RabbitTemplate;

  7. Import   org.springframework.amqp.rabbit.support.CorrelationData;

  8. Import   org.springframework.beans.factory.annotation.Autowired;

  9. Import   org.springframework.stereotype.Component;

  10.  

  11. Import   java.util.UUID;

  12.  

  13. @Component

  14. Public   class   Sender   implements   RabbitTemplate.ConfirmCallback {

  15.  

  16.     Private   static   final   Logger LOGGER = LoggerFactory.getLogger(Sender.  class  );

  17.  

  18.     Private   RabbitTemplate rabbitTemplate;

  19.  

  20.     @Autowired

  21.     Public   Sender(RabbitTemplate rabbitTemplate) {

  22.         This  .rabbitTemplate = rabbitTemplate;

  23.         This  .rabbitTemplate.setConfirmCallback(  this  );

  24.     }

  25.  

  26.     Public   void   send(String msg) {

  27.         CorrelationData correlationData =   new   CorrelationData(UUID.randomUUID().toString());

  28.         LOGGER.info(  "send: "   + correlationData.getId());

  29.         This  .rabbitTemplate.convertAndSend(AmqpConfig.FOO_EXCHANGE, AmqpConfig.FOO_ROUTINGKEY, msg, correlationData);

  30.     }

  31.  

  32.     /** Callback method */

  33.     @Override

  34.     Public   void   confirm(CorrelationData correlationData,   boolean   ack, String cause) {

  35.         LOGGER.info(  "confirm: "   + correlationData.getId());

  36.     }

  37. }

Message listener

  1. Package   com.rabbitmq.listener;

  2.  

  3. Import   com.rabbitmq.config.AmqpConfig;

  4. Import   org.slf4j.Logger;

  5. Import   org.slf4j.LoggerFactory;

  6. Import   org.springframework.amqp.core.Binding;

  7. Import   org.springframework.amqp.core.BindingBuilder;

  8. Import   org.springframework.amqp.core.DirectExchange;

  9. Import   org.springframework.amqp.core.Queue;

  10. Import   org.springframework.amqp.rabbit.annotation.RabbitHandler;

  11. Import   org.springframework.amqp.rabbit.annotation.RabbitListener;

  12. Import   org.springframework.context.annotation.Bean;

  13. Import   org.springframework.context.annotation.Configuration;

  14. Import   org.springframework.messaging.handler.annotation.Payload;

  15.  

  16. @Configuration

  17. @RabbitListener  (queues = AmqpConfig.FOO_QUEUE)

  18. Public   class   Listener {

  19.  

  20.     Private   static   final   Logger LOGGER = LoggerFactory.getLogger(Listener.  class  );

  21.  

  22.     /** Set switch type */

  23.     @Bean

  24.     Public   DirectExchange defaultExchange() {

  25.         /**

  26.          * DirectExchange: Distribution to the specified queue according to the routingkey

  27.          * TopicExchange: Multiple Keyword Matching

  28.          * FanoutExchange: Distribution of messages to all binding queues, no concept of routingkey

  29.          * HeadersExchange : Matching by adding attribute key-value

  30.          */

  31.         Return   new   DirectExchange(AmqpConfig.FOO_EXCHANGE);

  32.     }

  33.  

  34.     @Bean

  35.     Public   Queue fooQueue() {

  36.         Return   new   Queue(AmqpConfig.FOO_QUEUE);

  37.     }

  38.  

  39.     @Bean

  40.     Public   Binding binding() {

  41.         /** Bind the queue to the switch */

  42.         Return   BindingBuilder.bind(fooQueue()).to(defaultExchange()).with(AmqpConfig.FOO_ROUTINGKEY);

  43.     }

  44.  

  45.     @RabbitHandler

  46.     Public   void   process(  @Payload   String foo) {

  47.         LOGGER.info(  "Listener: "   + foo);

  48.     }

  49. }

Or use the following code instead of the process method of the @RabbitHandler annotation

  1. @Bean

  2. Public   SimpleMessageListenerContainer messageContainer() {

  3.     SimpleMessageListenerContainer container =   new   SimpleMessageListenerContainer(connectionFactory());

  4.     container.setQueues(fooQueue());

  5.     container.setExposeListenerChannel(  true  );

  6.     container.setMaxConcurrentConsumers(  1  );

  7.     container.setConcurrentConsumers(  1  );

  8.     container.setAcknowledgeMode(AcknowledgeMode.MANUAL);   //Set confirm mode manual confirmation

  9.     container.setMessageListener(  new   ChannelAwareMessageListener() {

  10.  

  11.         @Override

  12.         Public   void   onMessage(Message message, Channel channel)   throws   Exception {

  13.             Byte  [] body = message.getBody();

  14.             LOGGER.info(  "Listener onMessage : "   +   new   String(body));

  15.             channel.basicAck(message.getMessageProperties().getDeliveryTag(),   false  );   //Confirm that the message was successfully consumed

  16.         }

  17.     });

  18.     Return   container;

  19. }

 

USEFUL RESOURCES:

 

Spring boot rabbitmq example

 

Spring boot official

 

 

 

arrow
arrow
    文章標籤
    Springboot Rabbitmq Spring
    全站熱搜

    saravanagumar 發表在 痞客邦 留言(0) 人氣()