理解Confirm消息确认机制

  • 消息的确认,是指生产者投递消息后,如果Broker收到消息,则会给生产者一个应答。
  • 生产者进行接收应答,用来确定这条消息是否正常的发送到Broker,这种方式也是消息的可靠性投递的核心保障。

确认机制流程图

8H7d7n.png

Confirm确认消息实现

  • 第一步:在channel上开启确认模式:channel.confirmSelect()
  • 第二步:在channel上添加监听:addConfirmListener,监听成功和失败的返回结果,根据具体的结果对消息进行重新发送或者记录日志等后续处理。

代码编写

生产者

public class ConfirmProduct {
    public static void main(String[] args) throws Exception{
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("49.235.24.110");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");

        connectionFactory.setAutomaticRecoveryEnabled(true);
        connectionFactory.setNetworkRecoveryInterval(3000);

        Connection connection =  connectionFactory.newConnection();

        Channel channel = connection.createChannel();

        // 指定消息投递模式:消息确认模式
        channel.confirmSelect();

        String exchangeName = "test_confirm_exchange";
        String routingKey = "00ave.ss";

        String msg = "hello RabbitMQ Confirm message";
        channel.basicPublish(exchangeName,routingKey,null,msg.getBytes());

        // 添加一个 确认监听
        channel.addConfirmListener(new ConfirmListener() {
            @Override
            public void handleAck(long deliveryTag, boolean multiple) throws IOException {
                System.err.println("-------------ack----------------");
            }

            @Override
            public void handleNack(long deliveryTag, boolean multiple) throws IOException {
                System.err.println("-------------nack----------------");
            }
        });

        // 不关闭channel和connection连接,关闭了就没法监听响应了
    }
}

消费者

public class DirectorConsumer {
    public static void main(String[] args) throws Exception {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("49.235.24.110");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");

        connectionFactory.setAutomaticRecoveryEnabled(true);
        connectionFactory.setNetworkRecoveryInterval(3000);

        Connection connection =  connectionFactory.newConnection();

        Channel channel = connection.createChannel();

        // 声明
        String exchangeName = "test_director_exchange";
        String exchangeType = "direct";
        String routingKey = "test.director";
        String queueName = "test_director_queue";

        // 表示声明了一个交换机
        channel.exchangeDeclare(exchangeName,exchangeType,true,false,false,null);
        // 表示声明了一个队列
        channel.queueDeclare(queueName,true,false,false,null);
        // 建立一个绑定关系
        channel.queueBind(queueName,exchangeName,routingKey);

        //
        QueueingConsumer consumer = new QueueingConsumer(channel);
        // 参数:队列名称,是否自动ACK,Consumer
        channel.basicConsume(queueName,true,consumer);
        // 循环获取消息
        while(true){
            // 获取消息,如果没有消息,这一步将会一直阻塞
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String msg = new String(delivery.getBody());
            System.out.println("收到消息:"+msg);
        }
    }
}
  • 声明队列和交换机的代码放在生产者和消费者中都可以,它会自行判断,如果已经存在了就不会再次创建了,不过如果之前声明的和最新声明的配置有变化,会报错,注意这一点
  • ==Confirm+可靠性投递进行结合就是一个解决方案==