在上一小节中我们改造了log系统,由于利用fanout
范例的exchange只能举办全局的广播,因此我们利用direct
范例的exchange做了取代, 使得我们可以选择性的吸收动静。尽量利用fanout exchange改造了log系统,但它仍然有限制——不能基于多个条件做路由。
Topics
在log系统中大概不可是基于差异的日志级别作订阅,软件开发,也大概会基于日志的来历。你也许听过Unix下名为syslog
的东西, 它把日志凭据严重级别(info/warn/crit…)和设备(auth/cron/ker…)举办路由。
这会给我们很多的机动性,也许我们只想监听’cron’中的’critical’级此外错误日志,以及所有’kern’中的日志。 为了实现这种日志系统,我们需要进修一个更巨大的topic
范例的exchange。
Topic exchange
发送到topic exchange中的动静不能有一个任意的routing_key
——它必需是一个利用点脱离的单词列表。单词可以是任意的, 可是凡是会指定动静的一些特定。一些有效的routing key例子:”stock.usd.nyse”,”nyse.vmw”,”quick.orange.rabbit”。 routing key的长度限制为255个字节数。
binding key也必需是沟通的形式。topic exchange背后的逻辑雷同于direct——一条利用特定的routing key发送的动静将会被通报至所有利用与该routing key沟通的binding key举办绑定的行列中。 然而,对binding key来说有两种非凡的环境:
利用一张图可以很简朴地来说明:
在图中,我们将要发送被描写的动物的动静。动静的routing key将由三个单词构成(通过两个点脱离)。routing key中的第一个单词将描写速度, 第二个是颜色,第三个是物种:"<speed>.<colour>.<species>"
。
我们建设三个绑定:Q1利用binding key"*.orange.*"
来绑定,Q2利用"*.*.rabbit"
以及lazy.#
绑定。
这些绑定可以被总结为:
一条利用routing key"quick.orange.rabbit"
发送的动静将被同时通报到两个行列中。动静"lazy.orange.elephant"
同样如此。 另一方面,"quick.orange.fox"
只会被第一个queue吸收,"lazy.brown.fox"
只会被第二个queue吸收。 "lazy.pink.rabbit"
只会被通报到Q2一次,纵然它对两个binding key都匹配。"quick.brown.fox"
与两个queue的binding key都不匹配, 因此将被扬弃。
假如冲破我们的约定,利用一个单词可能四个单词的routing key譬喻"orange"
,"quick.orange.male.rabbit"
发送动静将会产生什么? 这些动静不会匹配任何绑定,因此会丢失。
可是对付"lazy.orange.male.rabbit"
,纵然它有四个单词,可是它与第二个queue的binding key匹配,因此将会被发送到第二个queue中。
当一个queue利用"#"
(hash)作为binding key,那么它将会吸收所有的动静,忽略routing key,就仿佛利用了fanout exchange。 当非凡字符”*“(star)和”#“(hash)在绑定中没有用到,topic exchange将会与direct exchange的行为沟通。
相识了topic exchange之后,我们将它用在我们的log系统中,我们界说的routing key将会有两个单词构成:"<facility>.<severity>"
。
完成的EmitLogTopic.java
:
public class EmitLogTopic { private static final String EXCHANGE_NAME = "topic_logs"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, "topic"); String routingKey = getRouting(argv); String message = getMessage(argv); channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes()); System.out.println(" [x] Sent '" + routingKey + "':'" + message + "'"); connection.close(); } //... }
完整的ReceiveLogsTopic.java
:
public class ReceiveLogsTopic { private static final String EXCHANGE_NAME = "topic_logs"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, "topic"); String queueName = channel.queueDeclare().getQueue(); if (argv.length < 1) { System.err.println("Usage: ReceiveLogsTopic [binding_key]..."); System.exit(1); } for (String bindingKey : argv) { channel.queueBind(queueName, EXCHANGE_NAME, bindingKey); } System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println(" [x] Received '" + envelope.getRoutingKey() + "':'" + message + "'"); } }; channel.basicConsume(queueName, true, consumer); } }
运行的时候从呼吁行中输入binding key来举办绑定,吸收差异的动静。
Remote procedure call (RPC)