在上一篇文章中,先容了利用RabbitMQ
的Hello World例子, 以及如何建设一个work queue
。在work queue的例子中每条动静都只会被通报到一个work queue中。 在这篇文章中我们将会进修另一种完全差异的通报动静的方法——每条动静将会被通报给所有的consumer,这种模式一般被称为“宣布/订阅”。
宣布/订阅(Publish/Subscribe)
为了说明这种模式,我们将建设一个简朴的log系统,它由两部门构成——第一部门认真发送log动静,第二部门认真吸收而且将动静打印出来。 在我们的log系统中每个运行着的吸收措施城市吸收到动静,在这种方法下我们可以有一个consumer认真将log耐久化到磁盘, 同时由另一个consumer来将log打印到节制台。本质上,发送log动静是对所有动静吸收者的广播。
Exchange
在之前的部门我们都是通过queue来发送和吸收动静,此刻是时候来先容RabbitMQ完整的动静模子了。先让我们来快速地回首一下之前先容过的几个观念:
producer
是用户应用认真发送动静queue
是存储动静的缓冲(buffer)consumer
是用户应用认真吸收动静RabbitMQ的动静模子的焦点思想是producer永远不会直接发送任何动静到queue中,实际上,在许多环境下producer基础不知道一条动静是否被发送到了哪个queue中。
在RabbitMQ中,producer只能将动静发送到一个exchange
中。要领略exchange也很是简朴,它一边认真吸收producer发送的动静, 另一边将动静推送到queue中。exchange必需清楚的知道在收到动静之后该如何举办下一步的处理惩罚,好比是否应该将这条动静发送到某个queue中? 照旧应该发送到多个queue中?照旧应该直接扬弃这条动静等等。用官方文档上的一张图可以更清楚地相识RabbitMQ的动静模子。
RabbitMQ中的exchange范例有这么几种:direct
,topic
,劳务派遣管理系统,headers
以及fanout
。这一小节将会主要先容最后一种范例——fanout
。 利用RabbitMQ的client来建设一个fanout
范例的exchange,呼吁为logs
:
channel.exchangeDeclare("logs","fanout");
fanout范例的exchange很是简朴,从名字也可以揣摩出来,它会向所有的queue广播所有收到的动静。这正是我们的log系统需要的。
在之前的部门我们对exchange一无所知,可是我们仍然可以将动静发送到queue中,这是因为我们利用了默认的exchange,图纸加密,在代码中利用空字符串(“”)暗示。
channel.basicPublish("", "hello", null, message.getBytes());
第一个参数暗示exchange的名字,利用空字符串暗示利用默认的无名的exchange:假如有的话,动静将按照routingKey
被发送到指定的queue中。
此刻,可以将动静发送到之前已经声明过的exchange中
channel.basicPublish( "logs", "", null, message.getBytes());
姑且行列
在之前的小节中利用queue都是指定了名字的(hello和task_queue),给queue定名长短常重要的,因为我们需要将的workers指定到沟通的queue上, 而且在consumer与producer之间也需要指定沟通的queue。
可是这对我们的log系统来说不是必需的,我们需要监听所有的log动静,而不是个中的一部门。我们也只体贴此刻的动静而不存眷以前的动静, 为了办理这个问题我们需要做两件工作。
首先,无论何时毗连到RabbitMQ server上都需要一个新的、空的queue。为了做到这一点需要可以或许利用一个随机的名字来建设queue, 更好的方法是由server来为我们选择一个随机的名字。
其次,一旦我们与consumer断开毗连,queue应该被自动删除。
在Java client中,提供了一个无参数的queueDeclare()
方来来建设一个非耐久化的、独占的而且是自动删除的已定名的queue。
String queueName = channel.queueDeclare().getQueue();
queueName
会包括一个随机的queue名字,大概看起来雷同amq.gen-JzTY20BRgKO-HjmUJj0wLg
。
绑定
我们已经建设了一个fanout范例的exchange和一个queue。此刻我们需要汇报exchange将动静发送到我们的queue中。 这种exchange和queue的干系称为绑定(binding
)。
channel.queueBind(queueName, "logs", "");
之后logs exchange将会把动静发送到我们的queue中。