0x31【RabbitMQ】消息生产和消费
1、简介
- ConnectionFactory:获取连接工厂
- Connection:一个连接
- Channel:数据通信信道,可发送和接受消息
- Queue:具体的消息存储队列
- Producer & Consumer:生产者和消费者
2、示例代码
生产者
public class Producter {
public static void main(String[] args) throws IOException, TimeoutException {
//1 创建一个ConnectionFactory,并进行配置
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("49.235.24.110");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
//2 使用连接工厂创建连接
Connection connection = connectionFactory.newConnection();
//3 通过connection创建一个Channel
Channel channel = connection.createChannel();
//4 声明(创建)一个队列
String queueName = "test0001";
channel.queueDeclare(queueName,true,false,false,null);
//4 通过Channel发送数据
for(int i=0;i<5;i++){
channel.basicPublish("",queueName,null,"Hello world".getBytes());
}
//5 记得关闭相关连接
channel.close();
connection.close();
}
}
消费者
public class Consumer {
public static void main(String[] args) throws Exception {
//1 创建一个ConnectionFactory,并进行配置
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("49.235.24.110");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
//2 使用连接工厂创建连接
Connection connection = connectionFactory.newConnection();
//3 通过connection创建一个Channel
Channel channel = connection.createChannel();
//4 声明(创建)一个队列
String queueName = "test0001";
// channel.queueDeclare(queueName,true,false,false,null);
//5 创建消费者
QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
//6 设置Channel
channel.basicConsume(queueName,true,queueingConsumer);
while(true){
//7 获取消息
Delivery delivery = queueingConsumer.nextDelivery();
String msg = new String(delivery.getBody());
System.err.println("消费者:"+msg);
}
}
}
3、说明
生产者发消息时必须指定exchange,如果不指定或者exchange为空的话,会走第一个exchange–>(AMQP default),不指定路由规则,他就会按照你的Routing Key怎么写的去 Queue中去找,如果能找到完全匹配的才能路由过去,如果找不到,就路由不过去;
-
AMQP default的说明:

队列的独占参数是为了顺序消费
本文是原创文章,采用 CC BY-NC-ND 4.0 协议,完整转载请注明来自 程序员小航
评论
匿名评论
隐私政策
你无需删除空行,直接评论以获取最佳展示效果