0x33【RabbitMQ】消费端自定义监听
1、介绍
- 我们一般就是在代码中编写while循环,调用
consumer.nextDelivery方法获取下一条信息,然后进行消费处理。 - 但是我们实际使用自定义的Consumer更加的方便,解耦性更加的强,也是实际工作中最常用的工作方式。
2、代码编写
自定义消费者
import com.rabbitmq.client.*;
import java.io.IOException;
public class MyConsumer extends DefaultConsumer {
public MyConsumer(Channel channel) {
super(channel);
}
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.err.println("===========自定义消费者===========");
System.err.println("consumerTag:"+consumerTag);
System.err.println("envelope:"+envelope);
System.err.println("properties:"+properties);
System.err.println("body:"+new String(body));
System.err.println();
}
}
消费端
public class MyConsu {
public static void main(String[] args) throws Exception {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("49.235.24.110");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
// 声明
String exchangeName = "test_consumer_exchange";
String exchangeType = "topic";
String queueName = "test_consumer_queue";
String routingKey = "consumer.*";
// 声明交换机
channel.exchangeDeclare(exchangeName,exchangeType,true,false,false,null);
// 声明队列
channel.queueDeclare(queueName,true,false,false,null);
// 建立绑定关系
channel.queueBind(queueName,exchangeName,routingKey);
channel.basicConsume(queueName,true,new MyConsumer(channel));
}
}
生产端
public class MyProd {
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("49.235.24.110");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
String exchangeName = "test_consumer_exchange";
String routingKey1 = "consumer.save";
// 发送
String msg = "RabbitMQ test consumer msg";
for(int i=0;i<5;i++) {
channel.basicPublish(exchangeName, routingKey1, null, (msg + "1").getBytes());
}
channel.close();
connection.close();
}
}
本文是原创文章,采用 CC BY-NC-ND 4.0 协议,完整转载请注明来自 程序员小航
评论
匿名评论
隐私政策
你无需删除空行,直接评论以获取最佳展示效果