0x10【RabbitMQ】死信队列
死信队列
DLX : Dead-Letter-Exchange 死信交换机
- 利用DLX,当信息在一个队列中变成死信(dead message)之后,它能被重新publish到另一个Exchange,这个Exchange就是DLX;
- 一个消息没有被任何消费者消费,就会被传到死信队列里;
- DLX也是一个正常的Exchange,和一般的Exchange没有区别,它能在任何指定队列上被指定,实际上就是设置某个队列的属性;
- 当这个队列中有死信时,RabbitMQ就会自动的将这个消息重新发布到设置的Exchange上去,进而被路由到另一个队列。
- 可以监听这个队列中消息做相应的处理,这个特性可以弥补RabbitMQ3.0以前支持的immediate参数的功能。
消息变成的死信的情况
- 消息被拒绝(basic.reject/basic.nack)并且requeue=false;
- 消息TTL过期;
- 队列达到最大长度;
死信队列的设置
- 首先要设置死信队列的exchange和queue,然后进行绑定:
- Exchange:dlx.exchange
- Queue:dlx.queue
- RoutingKey: #
- 然后我们进行正常的声明交换机、队列、绑定,只不过我们需要在队列上加上一个参数即可:
arguments.put("x-dead-letter-exchange","dlx.exchange"); - 这样消息在过期,requeue=false、队列达到最大长度时,消息就可以直接通过死信交换机路由到死信队列。
测试代码
生产端
public class DLXProd {
public static void main(String[] args) throws Exception {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost(DemoUseConstant.RABBIT_MQ_IP);
connectionFactory.setPort(ConnectionFactory.DEFAULT_AMQP_PORT);
connectionFactory.setVirtualHost("/");
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
String exchangeName = "order_exchange";
String routingKey = "order.save";
String msg = "Hello RabbitMQ DLX Message";
// 向订单队列中发送消息
for (int i = 0; i < 5; i++) {
channel.basicPublish(exchangeName, routingKey, null, msg.getBytes());
Thread.sleep(1000L * RandomUtil.randomInt(5, 10));
}
channel.close();
connection.close();
}
}
消费端
public class DLXCons {
public static void main(String[] args) throws Exception {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost(DemoUseConstant.RABBIT_MQ_IP);
connectionFactory.setPort(ConnectionFactory.DEFAULT_AMQP_PORT);
connectionFactory.setVirtualHost("/");
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
String exchangeName = "order_exchange";
String routingKey = "order.#";
String queueName = "order_queue";
// 声明正常交换机(订单交换机)
channel.exchangeDeclare(exchangeName, "topic", true, false, null);
Map<String, Object> arguments = new HashMap<>();
// 将丢弃的消息发送到死信交换机中
arguments.put("x-dead-letter-exchange", "dlx.exchange");
// 设置消息的过期时间
arguments.put("x-message-ttl", 10000);
// 声明正常队列
channel.queueDeclare(queueName, true, false, false, arguments);
// 绑定正常队列和交换机
channel.queueBind(queueName, exchangeName, routingKey);
// 声明死信交换机
String dlxExchangeName = "dlx.exchange";
channel.exchangeDeclare(dlxExchangeName, "topic", true, false, null);
// 声明死信队列
String orderDlxQueue = "order_dlx_queue";
channel.queueDeclare(orderDlxQueue, true, false, false, null);
// 绑定死信队列和死信交换机
channel.queueBind(orderDlxQueue, dlxExchangeName, "order.#");
// 消费死信队列
channel.basicConsume(orderDlxQueue, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("-------------------死信队列消费时间:" + new Date() + "----------------------");
System.out.println("consumerTag:" + consumerTag);
System.out.println("死信队列内容body:" + new String(body));
channel.basicAck(envelope.getDeliveryTag(), false);
}
});
}
}
本文是原创文章,采用 CC BY-NC-ND 4.0 协议,完整转载请注明来自 程序员小航
评论
匿名评论
隐私政策
你无需删除空行,直接评论以获取最佳展示效果