1、什么是消费端限流?

  • 假设一个场景,首先,我们RabbitMQ服务器有上万条未处理的消息,我们随便打开一个消费者客户端,会出现下面的情况:
    • 巨量信息瞬间全部推送过来,但是我们的单个客户端无法同时处理这么多数据!
    • 可能就会导致消费端服务崩溃等一系列问题。
  • RabbitMQ提供了一种qos(服务质量保证)功能,即在非自动确认消息的前提下,如果一定数目的消息(通过基于consumer或者channel设置的Qos的值)未被确认前,不进行消费新的消息。
  • void BasicQos(uint prefetchSize,ushort prefetchCount,bool dlobal)
    • prefetchSize:0(消息大小限制,一般设置为0,不做限制);
    • prefetchCount:会告诉RabbitMQ不要同时给一个消费者推送多余N个消息,即一旦有N个消息还没有ACK,则consumer将block掉,直到有消息ack;
    • global:true/false设置级别
      • true:在Channel级别上去限制,但是一般一个Channel有许多消费者,不建议。
      • false:在Consumer级别上限制,一般选择该模式。
  • ==注意:==
    • prefetchSize和global这两项,rabbitmq没有实现,暂且不研究。prefetchCount在no_ack=false的情况下生效,即在自动应答的情况下这两个值是不生效的。

2、测试代码

自定义消费者

public class LimitConsumer extends DefaultConsumer {

    private Channel channel;

    public LimitConsumer(Channel channel) {
        super(channel);
        this.channel = channel;
    }

    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        System.err.println("===========自定义消费者===========");
        System.err.println("consumerTag:"+consumerTag);
        System.err.println("envelope:"+envelope);
        System.err.println("properties:"+properties);
        System.err.println("body:"+new String(body));
        System.err.println();

        channel.basicAck(envelope.getDeliveryTag(),false);
    }
}

消费端

public class LimitConsu {
    public static void main(String[] args) throws Exception {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("49.235.24.110");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");

        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();

        // 声明
        String exchangeName = "test_qos_exchange";
        String exchangeType = "topic";
        String queueName = "test_qos_queue";
        String routingKey = "qos.*";

        // 声明交换机
        channel.exchangeDeclare(exchangeName,exchangeType,true,false,false,null);
        // 声明队列
        channel.queueDeclare(queueName,true,false,false,null);
        // 建立绑定关系
        channel.queueBind(queueName,exchangeName,routingKey);
        channel.basicQos(1);
        // 注意:关闭自动应答
        channel.basicConsume(queueName,false,new LimitConsumer(channel));

    }
}

生产端

public class LimitProd {
    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("49.235.24.110");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");

        Connection connection =  connectionFactory.newConnection();

        Channel channel = connection.createChannel();

        String exchangeName = "test_qos_exchange";
        String routingKey1 = "qos.save";

        // 发送
        String msg = "RabbitMQ test QOS  msg";
        for(int i=0;i<5;i++) {
            channel.basicPublish(exchangeName, routingKey1, null, (msg + "1").getBytes());
        }
        channel.close();
        connection.close();

    }
}