0x23【RabbitMQ】消费端限流
1、什么是消费端限流?
- 假设一个场景,首先,我们RabbitMQ服务器有上万条未处理的消息,我们随便打开一个消费者客户端,会出现下面的情况:
- 巨量信息瞬间全部推送过来,但是我们的单个客户端无法同时处理这么多数据!
- 可能就会导致消费端服务崩溃等一系列问题。
- RabbitMQ提供了一种qos(服务质量保证)功能,即在非自动确认消息的前提下,如果一定数目的消息(通过基于consumer或者channel设置的Qos的值)未被确认前,不进行消费新的消息。
void BasicQos(uint prefetchSize,ushort prefetchCount,bool dlobal)- prefetchSize:0(消息大小限制,一般设置为0,不做限制);
- prefetchCount:会告诉RabbitMQ不要同时给一个消费者推送多余N个消息,即一旦有N个消息还没有ACK,则consumer将block掉,直到有消息ack;
- global:true/false设置级别
- true:在Channel级别上去限制,但是一般一个Channel有许多消费者,不建议。
- false:在Consumer级别上限制,一般选择该模式。
- ==注意:==
- prefetchSize和global这两项,rabbitmq没有实现,暂且不研究。prefetchCount在no_ack=false的情况下生效,即在自动应答的情况下这两个值是不生效的。
2、测试代码
自定义消费者
public class LimitConsumer extends DefaultConsumer {
private Channel channel;
public LimitConsumer(Channel channel) {
super(channel);
this.channel = channel;
}
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.err.println("===========自定义消费者===========");
System.err.println("consumerTag:"+consumerTag);
System.err.println("envelope:"+envelope);
System.err.println("properties:"+properties);
System.err.println("body:"+new String(body));
System.err.println();
channel.basicAck(envelope.getDeliveryTag(),false);
}
}
消费端
public class LimitConsu {
public static void main(String[] args) throws Exception {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("49.235.24.110");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
// 声明
String exchangeName = "test_qos_exchange";
String exchangeType = "topic";
String queueName = "test_qos_queue";
String routingKey = "qos.*";
// 声明交换机
channel.exchangeDeclare(exchangeName,exchangeType,true,false,false,null);
// 声明队列
channel.queueDeclare(queueName,true,false,false,null);
// 建立绑定关系
channel.queueBind(queueName,exchangeName,routingKey);
channel.basicQos(1);
// 注意:关闭自动应答
channel.basicConsume(queueName,false,new LimitConsumer(channel));
}
}
生产端
public class LimitProd {
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("49.235.24.110");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
String exchangeName = "test_qos_exchange";
String routingKey1 = "qos.save";
// 发送
String msg = "RabbitMQ test QOS msg";
for(int i=0;i<5;i++) {
channel.basicPublish(exchangeName, routingKey1, null, (msg + "1").getBytes());
}
channel.close();
connection.close();
}
}
本文是原创文章,采用 CC BY-NC-ND 4.0 协议,完整转载请注明来自 程序员小航
评论
匿名评论
隐私政策
你无需删除空行,直接评论以获取最佳展示效果