0x22【RabbitMQ】消费端ACK与重回队列
1、消费端的手工ACK和NACK
- 消费端进行消费的时候,如果由于业务异常我们可以进行日志的记录,然后进行补偿。
- 如果由于服务器宕机等严重问题,那我们就需要手工ACK保证消费端消费成功!
2、消费端的重回队列
- 消费端重回队列是为了解决没有处理成功的消息,把消息重新传递给Broker!
- 一般我们在实际应用中,都会关闭重回队列,也就是设置为False.
3、测试代码
生产端
public class AckProd {
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("49.235.24.110");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
String exchangeName = "test_ack_exchange";
String routingKey1 = "ack.save";
// 发送
String msg = "RabbitMQ test ACK msg";
for(int i=0;i<5;i++) {
Map<String,Object> headers = new HashMap<>();
headers.put("num",i);
AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
.deliveryMode(2)
.contentEncoding("UTF-8")
.headers(headers)
.build();
channel.basicPublish(exchangeName, routingKey1, properties, (msg + i).getBytes());
}
channel.close();
connection.close();
}
}
消费端
public class AckConsu {
public static void main(String[] args) throws Exception {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("49.235.24.110");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
// 声明
String exchangeName = "test_ack_exchange";
String exchangeType = "topic";
String queueName = "test_ack_queue";
String routingKey = "ack.#";
// 声明交换机
channel.exchangeDeclare(exchangeName,exchangeType,true,false,false,null);
// 声明队列
channel.queueDeclare(queueName,true,false,false,null);
// 建立绑定关系
channel.queueBind(queueName,exchangeName,routingKey);
// 注意这里,如果开启qos的话,重回队列时只会重回到perfetchCount条消息的后面,而不是整个消息的队尾
channel.basicQos(2);
// 注意:关闭自动应答
channel.basicConsume(queueName,false,new AckConsumer(channel));
}
}
自定义消费者
public class AckConsumer extends DefaultConsumer {
private Channel channel;
public AckConsumer(Channel channel) {
super(channel);
this.channel = channel;
}
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.err.println("===========consumer message===============");
System.err.println("body:"+new String(body));
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
if((Integer)properties.getHeaders().get("num")==0){
channel.basicNack(envelope.getDeliveryTag(),false,true);
}else{
channel.basicAck(envelope.getDeliveryTag(),false);
}
}
}
本文是原创文章,采用 CC BY-NC-ND 4.0 协议,完整转载请注明来自 程序员小航
评论
匿名评论
隐私政策
你无需删除空行,直接评论以获取最佳展示效果