简介

  • Exchange:接收消息,并根据路由键转发消息到所绑定的队列

    8fKpa8.png

  • 交换机属性

    • Name:交换机名称
    • Type:交换机类型
      • direct:精确匹配routing key
      • topic:支持通配符模糊匹配routing key
      • fanout:忽略routing key,广播消息到所有绑定的队列
      • headers:使用消息头的多个属性进行匹配
    • Durability:是否需要持久化,true为持久化
    • Auto Delete:当最后一个绑定到Exchange上的队列删除后,自动删除该Exchange;
    • Internal:当前Exchange是否用于RabbitMQ内部使用,默认为false
      • 只有在使用erlang编程,使交换机内部关联时,设为true。所以一般不会使用,默认即可;
    • Arguments:扩展参数,用于扩展AMQP协议自制定化使用。

交换机类型

Director Exchange

简介

所有发送到Direct Exchange的消息被转发到RouteKey中指定的Queue

==注意:Direct模式可以使用RabbitMQ自带的Exchange:default Exchange,所以不需要将Exchange进行任何绑定(binding)操作,消息传递时,RouteKey必须完全匹配才会被队列接收,否则该消息会被抛弃。==

模型图

8fQAH0.png

理解

8h6Hl4.png

Topic Exchange

简介

  • 所有发送到Topic Exchange的消息被转发到所有关心==RouteKey==中指定Topic的Queue上

  • Exchange将==RouteKey==和某==Topic==进行模糊匹配,此时队列需要绑定一个Topic

  • Topic的汉译:主题;

  • ==注意:可以使用通配符进行匹配==

    847aSs.png

模型图

847rwT.png

理解

847cY4.png

  • RouteKey根据BindingKey的绑定规则来决定将消息发送到哪个队列中。
  • 这里的BindingKey就是绑定Queue时的RouteKey。这里这样写是为了便于理解。
  • 发送消息时的RouteKey和绑定队列时的RouteKey不是一个东西。(我是这样理解的:一个是特例(发)。一个是普遍(绑))

Fanout Exchange

简介

  • 不处理路由键,只需要简单的将队列绑定到交换机上。
  • 发送到交换机的消息都会被转发到与该交换机绑定的所有对列上。
  • Fanout交换机转发消息是最快的。

模型图

84Ok5T.png

理解

将消息发到所有和该Exchange绑定的队列之中去。

代码

生产者

public class FanoutProduct {
    public static void main(String[] args) throws Exception{
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("49.235.24.110");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");

        connectionFactory.setAutomaticRecoveryEnabled(true);
        connectionFactory.setNetworkRecoveryInterval(3000);

        Connection connection =  connectionFactory.newConnection();

        Channel channel = connection.createChannel();

        String exchangeName = "test_fanout_exchange";
        String routingKey = ""; // 不设置RouttingKey

        for(int i=0;i<5;i++){
            String msg = "Hello World RabbitMQ 4 FANOUT Exchange Message ...";
            channel.basicPublish(exchangeName,routingKey,null,msg.getBytes());
        }
        channel.close();
        connection.close();
    }
}

消费者

public class FanoutConsumer {
    public static void main(String[] args) throws Exception{
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("49.235.24.110");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");

        connectionFactory.setAutomaticRecoveryEnabled(true);
        connectionFactory.setNetworkRecoveryInterval(3000);

        Connection connection =  connectionFactory.newConnection();

        Channel channel = connection.createChannel();

        // 声明
        String exchangeName = "test_fanout_exchange";
        String exchangeType = "fanout";
        String queueName = "test_fanout_queue1";
        String queueName2 = "test_fanout_queue2";
        String routingKey = ""; // 不设置RouttingKey

        channel.exchangeDeclare(exchangeName,exchangeType,true,false,null);
        channel.queueDeclare(queueName,true,false,false,null);
        channel.queueDeclare(queueName2,true,false,false,null);
        channel.queueBind(queueName,exchangeName,routingKey);
        channel.queueBind(queueName2,exchangeName,routingKey);

        QueueingConsumer consumer = new QueueingConsumer(channel);
        channel.basicConsume(queueName,true,consumer);

        while(true){
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String msg = new String(delivery.getBody());
            System.out.println("收到消息:"+msg);
        }
    }
}

Header Exchange

简介

header类型的ExchangeRouting有点类似,不同的是,header类型的Exchange取消了Routing Key,使用key/value来匹配队列。

header的匹配有两种方式,一种是all另一种是any。这两种方式是在接收端必须要用键值"x-mactch"来定义。

  • all:代表必须匹配Consumer端的所有键值对
  • any:代表只要匹配 Consumer 端的任意一个键值对即可