• RabbitAdmin
  • SpringAMQP 声明
  • RabbitTemplate
  • SimpleMessageListenerContainer
  • MessageListenerAdapter
  • MessageConverter

RabbitAdmin

  • RabbitAdmin类可以很好的操作RabbitMQ,在Spring中直接进行注入即可

    8XZ4Te.png

  • 注意:==autoStartup必须要设置为true==,否则Spring容器不会加载RabbitAdmin类。

  • RabbitAdmin底层实现就是从Spring容器中获取Exchange、Binding、RoutingKey以及Queue的@Bean声明;

  • 然后使用RabbitTemplate的execute方法执行对应的声明、修改、删除等一系列RabbitMQ基础功能操作。

    • 例如:添加一个交换机,删除一个绑定,清空一个队列里的消息等等;

测试代码

新建Spring项目

配置pom

<!--可以不配置这个,spring-boot-starter-amqp中有,配的话注意版本问题应当和当前的Spring版本相适配-->
<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>5.7.3</version>
</dependency>

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

RabbitMQConfig

import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.stereotype.Component;

@Component
@Configuration
public class RabbitConfig {

    @Bean
    public ConnectionFactory connectionFactory(){
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
        connectionFactory.setHost("49.235.24.110");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");
        connectionFactory.setVirtualHost("/");

        return connectionFactory;
    }

    @Bean
    public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory){
        RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
        rabbitAdmin.setAutoStartup(true);
        return rabbitAdmin;
    }
}

测试

@Test
    public void RabbitAdminTest() {
        rabbitAdmin.declareExchange(new DirectExchange("test.direct",false,false));
        rabbitAdmin.declareExchange(new TopicExchange("test.topic",true,false));
        rabbitAdmin.declareExchange(new FanoutExchange("test.fanout",true,false));
        rabbitAdmin.declareQueue(new Queue("test.direct.queue",true,false,false));
        rabbitAdmin.declareQueue(new Queue("test.topic.queue",true,false,false));
        rabbitAdmin.declareQueue(new Queue("test.fanout.queue",true));

        rabbitAdmin.declareBinding(new Binding("test.direct.queue",
                Binding.DestinationType.QUEUE,
                "test.direct","direct",new HashMap<>()));

        rabbitAdmin.declareBinding(BindingBuilder
                .bind(new Queue("test.topic.queue",true,false,false))
                .to(new TopicExchange("test.topic",true,false))
                .with("user.#"));

        // FanoutExchange中的with没有了
        rabbitAdmin.declareBinding(BindingBuilder
                .bind(new Queue("test.fanout.queue",true,false,false))
                .to(new FanoutExchange("test.fanout",true,false)));

        // 清空队列
        rabbitAdmin.purgeQueue("test.direct.queue",false);
    }

8X38BR.png

SpringAMQP

使用SpringAMQP去声明,就需要使用SpringAMQP的如下模式,即声明@Bean方式

在RabbitMQConfig中添加:

/**
     * 针对消费者配置
     * 1. 设置交换机类型
     * 2. 将队列绑定到交换机
     *  FanoutExchange:将消息分发到所有的绑定队列,无routingkey概念
     *  HeadersExchange:通过添加属性key-value匹配
     *  DirectExchange:按照routingKey分发到指定队列
     *  TopicExchange:多关键字匹配
     *
     */
    @Bean
    public TopicExchange exchange001(){
        return new TopicExchange("topic001",true,false);
    }

    @Bean
    public Queue queue001(){
        return new Queue("queue001",true,false,false);
    }

    @Bean
    public Binding binding001(){
        return BindingBuilder.bind(queue001()).to(exchange001()).with("spring.*");
    }


    @Bean
    public TopicExchange exchange002(){
        return new TopicExchange("topic002",true,false);
    }

    @Bean
    public Queue queue002(){
        return new Queue("queue002",true,false,false);
    }

    @Bean
    public Binding binding002(){
        return BindingBuilder.bind(queue002()).to(exchange002()).with("rabbit.*");
    }


    @Bean
    public TopicExchange exchange003(){
        return new TopicExchange("topic003",true,false);
    }

    @Bean
    public Queue queue003(){
        return new Queue("queue003",true,false,false);
    }

    @Bean
    public Binding binding003(){
        return BindingBuilder.bind(queue003()).to(exchange003()).with("mq.*");
    }

    @Bean
    public Queue queue_image(){
        return new Queue("image_queue",true);
    }

    @Bean
    public Queue queue_pdf(){
        return new Queue("pdf_queue",true);
    }

消息模板-RabbitTemplate

RabbitTemplate:即消息模板

  • 我们在与SpringAMQP整合的时候进行发送消息的关键类;
  • 该类提供了丰富的发送消息的方法,包括可靠性投递消息方法、回调监听消息接口==ConfirmCallback==、返回值确认接口==ReturnCallback==等等。同样我们需要进行注入到Spring容器中,然后直接使用。
  • 在与Spring整合时需要实例化,但在与SpringBoot整合时在配置文件里添加配置即可;

RabbitTemplate测试代码

RabbitMQConfig文件

@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory){
    RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
    return rabbitTemplate;
}

测试文件

	@Autowired
    RabbitTemplate rabbitTemplate;

    @Test
    public void testsendMessage1(){
        MessageProperties messageProperties = new MessageProperties();
        messageProperties.getHeaders().put("desc","消息描述");
        messageProperties.getHeaders().put("type","自定义消息类型");
        Message message = new Message("Hello RabbitMQ".getBytes(),messageProperties);

        rabbitTemplate.convertAndSend("topic001", "spring.amqp", message, new MessagePostProcessor() {
            @Override
            public Message postProcessMessage(Message message) throws AmqpException {
                // 发送消息时对原来的消息进行进一步的加工,加工之后在发送
                System.err.println("=========添加额外的配置=========");
                message.getMessageProperties().getHeaders().put("desc","额外修改的信息描述");
                message.getMessageProperties().getHeaders().put("atter","额外新加的属性");

                return message;
            }
        });
    }

    @Test
    public void testsendMessage2(){
        MessageProperties messageProperties = new MessageProperties();
        messageProperties.setContentType("text/plain");
        Message message = new Message("mq 消息1234".getBytes(),messageProperties);

        rabbitTemplate.send("topic001","spring.abc",message);

        rabbitTemplate.convertAndSend("topic002","rabbit.save","hello object message send!");
        rabbitTemplate.convertAndSend("topic003","mq.abc","hello object message send!");
    }