死信队列

DLX : Dead-Letter-Exchange 死信交换机

  • 利用DLX,当信息在一个队列中变成死信(dead message)之后,它能被重新publish到另一个Exchange,这个Exchange就是DLX;
  • 一个消息没有被任何消费者消费,就会被传到死信队列里;
  • DLX也是一个正常的Exchange,和一般的Exchange没有区别,它能在任何指定队列上被指定,实际上就是设置某个队列的属性;
  • 当这个队列中有死信时,RabbitMQ就会自动的将这个消息重新发布到设置的Exchange上去,进而被路由到另一个队列。
  • 可以监听这个队列中消息做相应的处理,这个特性可以弥补RabbitMQ3.0以前支持的immediate参数的功能。

消息变成的死信的情况

  • 消息被拒绝(basic.reject/basic.nack)并且requeue=false;
  • 消息TTL过期;
  • 队列达到最大长度;

死信队列的设置

  1. 首先要设置死信队列的exchange和queue,然后进行绑定:
    • Exchange:dlx.exchange
    • Queue:dlx.queue
    • RoutingKey: #
  2. 然后我们进行正常的声明交换机、队列、绑定,只不过我们需要在队列上加上一个参数即可:arguments.put("x-dead-letter-exchange","dlx.exchange");
  3. 这样消息在过期,requeue=false、队列达到最大长度时,消息就可以直接通过死信交换机路由到死信队列。

测试代码

生产端

public class DLXProd {  
	public static void main(String[] args) throws Exception {  
		ConnectionFactory connectionFactory = new ConnectionFactory();  
		connectionFactory.setHost(DemoUseConstant.RABBIT_MQ_IP);  
		connectionFactory.setPort(ConnectionFactory.DEFAULT_AMQP_PORT);  
		connectionFactory.setVirtualHost("/");  
		  
		Connection connection = connectionFactory.newConnection();  
		Channel channel = connection.createChannel();  
		  
		String exchangeName = "order_exchange";  
		String routingKey = "order.save";  
		  
		String msg = "Hello RabbitMQ DLX Message";  
		  
		// 向订单队列中发送消息  
		for (int i = 0; i < 5; i++) {  
			channel.basicPublish(exchangeName, routingKey, null, msg.getBytes());  
			Thread.sleep(1000L * RandomUtil.randomInt(5, 10));  
		}  
		  
		channel.close();  
		connection.close();  
	}  
}

消费端

public class DLXCons {  
	public static void main(String[] args) throws Exception {  
		ConnectionFactory connectionFactory = new ConnectionFactory();  
		connectionFactory.setHost(DemoUseConstant.RABBIT_MQ_IP);  
		connectionFactory.setPort(ConnectionFactory.DEFAULT_AMQP_PORT);  
		connectionFactory.setVirtualHost("/");  
		  
		Connection connection = connectionFactory.newConnection();  
		Channel channel = connection.createChannel();  
		  
		String exchangeName = "order_exchange";  
		String routingKey = "order.#";  
		String queueName = "order_queue";  
		  
		// 声明正常交换机(订单交换机)  
		channel.exchangeDeclare(exchangeName, "topic", true, false, null);  
		Map<String, Object> arguments = new HashMap<>();  
		// 将丢弃的消息发送到死信交换机中  
		arguments.put("x-dead-letter-exchange", "dlx.exchange");  
		// 设置消息的过期时间  
		arguments.put("x-message-ttl", 10000);  
		// 声明正常队列  
		channel.queueDeclare(queueName, true, false, false, arguments);  
		// 绑定正常队列和交换机  
		channel.queueBind(queueName, exchangeName, routingKey);  
		  
		  
		// 声明死信交换机  
		String dlxExchangeName = "dlx.exchange";  
		channel.exchangeDeclare(dlxExchangeName, "topic", true, false, null);  
		// 声明死信队列  
		String orderDlxQueue = "order_dlx_queue";  
		channel.queueDeclare(orderDlxQueue, true, false, false, null);  
		// 绑定死信队列和死信交换机  
		channel.queueBind(orderDlxQueue, dlxExchangeName, "order.#");  
		// 消费死信队列  
		channel.basicConsume(orderDlxQueue, new DefaultConsumer(channel) {  
			@Override  
			public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {  
				System.out.println("-------------------死信队列消费时间:" + new Date() + "----------------------");  
				System.out.println("consumerTag:" + consumerTag);  
				System.out.println("死信队列内容body:" + new String(body));  
				channel.basicAck(envelope.getDeliveryTag(), false);  
			}  
		});  
	}  
}