博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
RabbitMQ实例教程:发布/订阅者消息队列
阅读量:6717 次
发布时间:2019-06-25

本文共 4325 字,大约阅读时间需要 14 分钟。

消息交换机(Exchange)

  RabbitMQ消息模型的核心理念是生产者永远不会直接发送任何消息给队列,一般的情况生产者甚至不知道消息应该发送到哪些队列。

  相反的,生产者只能发送消息给交换机(Exchange)。交换机的作用非常简单,一边接收从生产者发来的消息,另一边把消息推送到队列中。交换机必须清楚的知道消息如何处理它收到的每一条消息。是否应该追加到一个指定的队列?是否应该追加到多个队列?或者是否应该丢弃?这些规则通过交换机的类型进行定义。

  交换机的类型有:direct,topic,headers 和 fanout。我们以fanout为例创建一个“logs”类型的交换机。

1
channel.exchangeDeclare(
"logs"
"fanout"
);

  fanout交换机非常简单,它会广播它收到的所有队列的所有消息。

  交换机命名

  在前面的例子中,我们不了解交换机的任何概念,也能发送消息,这是因为我们使用了默认的交换机(""),但以后可以使用我们自定义的交换机了。

1
2
channel.basicPublish(
""
"hello"
, null, message.getBytes()); 
//
空字符串交换机
channel.basicPublish( 
"logs"
""
, null, message.getBytes()); 
//logs
交换机

  临时队列(Temporary Queues)

  在前面的例子中,我们为队列都指定了具体的名字(如hello和task_queue),给队列命名是非常重要的事情,因为生产者和消费者是队列名称来传递消息的。

  但是对于日志来说的消息队列,我们会监听所有的日志消息,而不是其中的一些子集。而且我们只关注当前发生的消息而不是历史消息,要解决这些问题需要这么做:

  首先,当我们连接Rabbit服务器时,我们需要一个新的空队列。我们可以自己随机生成一个队列名字或者让服务器随机生成一个队列名字。

  其次,当消息消费者失去连接时,队列应该自动删除。

  在Java中,我们使用不带参数的queueDeclare()方法创建一个非持久化的,唯一的,用后自动删除的队列。

1
String queueName = channel.queueDeclare().getQueue();

  queueName可能是像 amq.gen-JzTY20BRgKO-HjmUJj0wLg 这样的随机队列名。

  消息绑定(Bindings)

  前面我们创建了一个fanout类型的交换机和队列。现在需要告诉交换机发送消息到队列。交换机和队列之间的关系就是消息绑定(binding)。

  使用下面的代码logs交换机会将消息传递给队列。


1
channel.queueBind(queueName, 
"logs"
""
);

  将交换机和消息绑定放在一起


  现在我们有一个提交日志的的消息生产者,它与我们之前的消息发送者并没有太大的区别,唯一不同的地方是我们将消息发送到 logs 交换机,而不是没有名字的交换机。当发送消息时,我们需要提供一个路由,尽管它在 fanout 交换机中并没有什么作用。下面是提交日志的Java代码。

  EmitLog.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
package 
com.favccxx.favrabbit;
import 
com.rabbitmq.client.Channel;
import 
com.rabbitmq.client.Connection;
import 
com.rabbitmq.client.ConnectionFactory;
public 
class 
EmitLog {
 
private 
static 
final 
String EXCHANGE_NAME = 
"logs"
;
 
public 
static 
void 
main(String[] argv) 
throws 
Exception {
  
ConnectionFactory factory = 
new 
ConnectionFactory();
  
factory.setHost(
"localhost"
);
  
Connection connection = factory.newConnection();
  
Channel channel = connection.createChannel();
  
channel.exchangeDeclare(EXCHANGE_NAME, 
"fanout"
);
  
String[] sendMsgs = {
"I"
"saw"
"a"
"dog"
};
  
String message = getMessage(sendMsgs);
  
channel.basicPublish(EXCHANGE_NAME, 
""
null
, message.getBytes(
"UTF-8"
));
  
System.out.println(
" [x] Sent '" 
+ message + 
"'"
);
  
channel.close();
  
connection.close();
 
}
 
private 
static 
String getMessage(String[] strings) {
  
if 
(strings.length < 
1
)
   
return 
"info: Hello World!"
;
  
return 
joinStrings(strings, 
" "
);
 
}
 
private 
static 
String joinStrings(String[] strings, String delimiter) {
  
int 
length = strings.length;
  
if 
(length == 
0
)
   
return 
""
;
  
StringBuilder words = 
new 
StringBuilder(strings[
0
]);
  
for 
(
int 
i = 
1
; i < length; i++) {
   
words.append(delimiter).append(strings[i]);
  
}
  
return 
words.toString();
 
}
}

  正如上面所示,与消息服务器建立连接后,声明了一个交换机,这是因为系统不允许发布到空交换机。 如果没有队列绑定到交换机的话,消息就会丢失,但我们不用担心。如果没有消费者监听消息的话,我们就丢弃该消息。

  接收消息代码ReceiveLogs.java


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
package 
com.favccxx.favrabbit;
import 
java.io.IOException;
import 
com.rabbitmq.client.AMQP;
import 
com.rabbitmq.client.Channel;
import 
com.rabbitmq.client.Connection;
import 
com.rabbitmq.client.ConnectionFactory;
import 
com.rabbitmq.client.Consumer;
import 
com.rabbitmq.client.DefaultConsumer;
import 
com.rabbitmq.client.Envelope;
public 
class 
ReceiveLogs {
 
private 
static 
final 
String EXCHANGE_NAME = 
"logs"
;
 
public 
static 
void 
main(String[] argv) 
throws 
Exception {
  
ConnectionFactory factory = 
new 
ConnectionFactory();
  
factory.setHost(
"localhost"
);
  
Connection connection = factory.newConnection();
  
Channel channel = connection.createChannel();
  
channel.exchangeDeclare(EXCHANGE_NAME, 
"fanout"
);
  
String queueName = channel.queueDeclare().getQueue();
  
channel.queueBind(queueName, EXCHANGE_NAME, 
""
);
  
System.out.println(
" [*] Waiting for messages. To exit press CTRL+C"
);
  
Consumer consumer = 
new 
DefaultConsumer(channel) {
   
@Override
   
public 
void 
handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
     
byte
[] body) 
throws 
IOException {
    
String message = 
new 
String(body, 
"UTF-8"
);
    
System.out.println(
" [x] Received '" 
+ message + 
"'"
);
   
}
  
};
  
channel.basicConsume(queueName, 
true
, consumer);
 
}
}

  测试数据

  运行几个日志消息接收者实例,使用日志消息发送者发送消息,发现每个日志消息接收者都接收到同样的数据,说明发布订阅成功。

1
 
[x] Received 
'I saw a dog'
本文转自 genuinecx 51CTO博客,原文链接:http://blog.51cto.com/favccxx/1701738,如需转载请自行联系原作者
你可能感兴趣的文章
设置input标签placeholder字体颜色
查看>>
跳出面向对象思想(一) 继承
查看>>
01 聚类算法 - 大纲
查看>>
为什么说“上云就上阿里云”
查看>>
tomcat配置文件详解
查看>>
iOS NSURLSession DownloadTask(下载任务)
查看>>
vue解决字段类型为数字导致单选不正确的问题
查看>>
Prometheus 2.0正式推出 性能提升带来质的飞跃
查看>>
WPF实现抽屉效果
查看>>
http2-浏览器支持的情况
查看>>
去除百度置顶的广告,优化百度搜索
查看>>
设计模式(六)适配器模式
查看>>
GTK+重拾--04 菜单栏使用
查看>>
设计模式(十七) 迭代器模式
查看>>
线性回归 极大似然 参差满足高斯分布
查看>>
持续集成之测试自动化
查看>>
多字符串拼接
查看>>
后台登录——实验吧
查看>>
表格存储如何在控制台使用多元索引(SearchIndex)功能
查看>>
Java并发编程艺术----读书笔记(一)
查看>>