0x08【RabbitMQ】Confirm 确认消息
理解Confirm消息确认机制
- 消息的确认,是指生产者投递消息后,如果Broker收到消息,则会给生产者一个应答。
- 生产者进行接收应答,用来确定这条消息是否正常的发送到Broker,这种方式也是消息的可靠性投递的核心保障。
确认机制流程图

Confirm确认消息实现
- 第一步:在channel上开启确认模式:
channel.confirmSelect() - 第二步:在channel上添加监听:
addConfirmListener,监听成功和失败的返回结果,根据具体的结果对消息进行重新发送或者记录日志等后续处理。
代码编写
生产者:
public class ConfirmProduct {
public static void main(String[] args) throws Exception{
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("49.235.24.110");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
connectionFactory.setAutomaticRecoveryEnabled(true);
connectionFactory.setNetworkRecoveryInterval(3000);
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
// 指定消息投递模式:消息确认模式
channel.confirmSelect();
String exchangeName = "test_confirm_exchange";
String routingKey = "00ave.ss";
String msg = "hello RabbitMQ Confirm message";
channel.basicPublish(exchangeName,routingKey,null,msg.getBytes());
// 添加一个 确认监听
channel.addConfirmListener(new ConfirmListener() {
@Override
public void handleAck(long deliveryTag, boolean multiple) throws IOException {
System.err.println("-------------ack----------------");
}
@Override
public void handleNack(long deliveryTag, boolean multiple) throws IOException {
System.err.println("-------------nack----------------");
}
});
// 不关闭channel和connection连接,关闭了就没法监听响应了
}
}
消费者
public class DirectorConsumer {
public static void main(String[] args) throws Exception {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("49.235.24.110");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
connectionFactory.setAutomaticRecoveryEnabled(true);
connectionFactory.setNetworkRecoveryInterval(3000);
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
// 声明
String exchangeName = "test_director_exchange";
String exchangeType = "direct";
String routingKey = "test.director";
String queueName = "test_director_queue";
// 表示声明了一个交换机
channel.exchangeDeclare(exchangeName,exchangeType,true,false,false,null);
// 表示声明了一个队列
channel.queueDeclare(queueName,true,false,false,null);
// 建立一个绑定关系
channel.queueBind(queueName,exchangeName,routingKey);
//
QueueingConsumer consumer = new QueueingConsumer(channel);
// 参数:队列名称,是否自动ACK,Consumer
channel.basicConsume(queueName,true,consumer);
// 循环获取消息
while(true){
// 获取消息,如果没有消息,这一步将会一直阻塞
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String msg = new String(delivery.getBody());
System.out.println("收到消息:"+msg);
}
}
}
- 声明队列和交换机的代码放在生产者和消费者中都可以,它会自行判断,如果已经存在了就不会再次创建了,不过如果之前声明的和最新声明的配置有变化,会报错,注意这一点。
- ==Confirm+可靠性投递进行结合就是一个解决方案==
本文是原创文章,采用 CC BY-NC-ND 4.0 协议,完整转载请注明来自 程序员小航
评论
匿名评论
隐私政策
你无需删除空行,直接评论以获取最佳展示效果