1、消费端的手工ACK和NACK

  • 消费端进行消费的时候,如果由于业务异常我们可以进行日志的记录,然后进行补偿。
  • 如果由于服务器宕机等严重问题,那我们就需要手工ACK保证消费端消费成功!

2、消费端的重回队列

  • 消费端重回队列是为了解决没有处理成功的消息,把消息重新传递给Broker!
  • 一般我们在实际应用中,都会关闭重回队列,也就是设置为False.

3、测试代码

生产端

public class AckProd {
    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("49.235.24.110");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");

        Connection connection =  connectionFactory.newConnection();

        Channel channel = connection.createChannel();

        String exchangeName = "test_ack_exchange";
        String routingKey1 = "ack.save";

        // 发送
        String msg = "RabbitMQ test ACK msg";
        for(int i=0;i<5;i++) {
            Map<String,Object> headers = new HashMap<>();
            headers.put("num",i);
            AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
                    .deliveryMode(2)
                    .contentEncoding("UTF-8")
                    .headers(headers)
                    .build();
            channel.basicPublish(exchangeName, routingKey1, properties, (msg + i).getBytes());
        }
        channel.close();
        connection.close();
    }
}

消费端

public class AckConsu {
    public static void main(String[] args) throws Exception {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("49.235.24.110");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");

        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();


        // 声明
        String exchangeName = "test_ack_exchange";
        String exchangeType = "topic";
        String queueName = "test_ack_queue";
        String routingKey = "ack.#";

        // 声明交换机
        channel.exchangeDeclare(exchangeName,exchangeType,true,false,false,null);
        // 声明队列
        channel.queueDeclare(queueName,true,false,false,null);
        // 建立绑定关系
        channel.queueBind(queueName,exchangeName,routingKey);
        // 注意这里,如果开启qos的话,重回队列时只会重回到perfetchCount条消息的后面,而不是整个消息的队尾
        channel.basicQos(2);
        // 注意:关闭自动应答
        channel.basicConsume(queueName,false,new AckConsumer(channel));
    }
}

自定义消费者

public class AckConsumer extends DefaultConsumer {

    private Channel channel;

    public AckConsumer(Channel channel) {
        super(channel);
        this.channel = channel;
    }

    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        System.err.println("===========consumer message===============");
        System.err.println("body:"+new String(body));
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        if((Integer)properties.getHeaders().get("num")==0){
            channel.basicNack(envelope.getDeliveryTag(),false,true);
        }else{
            channel.basicAck(envelope.getDeliveryTag(),false);
        }
    }
}