过了个春节,回到公司的成小胖酿成了成大胖。可是你们千万别觉得他谁人大肚子内里装的都是肥肉,内里的墨水也多了不少嘞,究竟成小胖操作春节的半个月时间专心进修并研究了 ActiveMQ,嘿嘿……
这不,为了检讨下本身的进修成就,上班的第一天成小胖就去找架构师老王交换 ActiveMQ 相关的常识,还顺便向老王讨了个红包,可把成小胖给兴奋坏了。
“来,按照你的了讲解下 ActiveMQ 是什么。”
“这个简朴,ActiveMQ 是一个 MOM,详细来说是一个实现了 JMS 类型的系统间长途通信的动静署理。它……”
“等等,先表明下什么是 MOM。”
“好。MOM 就是面向动静中间件(Message-oriented middleware),是用于以漫衍式应用或系统中的异步、松耦合、靠得住、可扩展和安详通信的一类软件。MOM 的总体思想是它作为动静发送器和动静吸收器之间的动静中介,这种中介提供了一个全新程度的松耦合。”
“JMS呢?”
成小胖是个追求极致的人,为了表明得更通俗易懂,索性搬来一块白板边画边说。
“JMS 叫做 Java 动静处事(Java Message Service),是 Java 平台上有关面向 MOM 的技能类型,旨在通过提供尺度的发生、发送、吸收和处理惩罚动静的 API 简化企业应用的开拓,雷同于 JDBC 和干系型数据库通信方法的抽象。”
“嗯,很好。下面的这些观念你也需要出格领略下”:
“你来说说这个中 P2P 和 Pub/Sub 的区别吧”,老王给成小胖抛出了一个问题。
成小胖可不是吃素的,究竟要是吃素的话他也吃不到这么胖……这些根基观念对他来说都是小事一桩:
P2P (点对点)动静域利用 queue 作为 Destination,动静可以被同步或异步的发送和吸收,每个动静只会给一个 Consumer 传送一次。
Consumer 可以利用 MessageConsumer.receive() 同步地吸收动静,也可以通过利用MessageConsumer.setMessageListener() 注册一个 MessageListener 实现异步吸收。
多个 Consumer 可以注册到同一个 queue 上,但一个动静只能被一个 Consumer 所吸收,然后由该 Consumer 来确认动静。而且在这种环境下,Provider 对所有注册的 Consumer 以轮询的方法发送动静。
Pub/Sub(宣布/订阅,Publish/Subscribe)动静域利用 topic 作为 Destination,宣布者向 topic 发送动静,订阅者注册吸收来自 topic 的动静。发送到 topic 的任何动静都将自动通报给所有订阅者。吸收方法(同步和异步)与 P2P 域沟通。
除非显式指定,不然 topic 不会为订阅者保存动静。虽然,这可以通过耐久化(Durable)订阅来实现动静的生存。这种环境下,当订阅者与 Provider 断开时,Provider 会为它存储动静。当耐久化订阅者从头毗连时,将会受到所有的断连期间未消费的动静。
“嗯,总结的很不错,上面的这些常识是进修 ActiveMQ 的理论基本,是必需要把握的。”
“既然 JMS 是一个通用的类型,那么利用它建设应用措施必定也有一个通用的步调吧?”老王追问道。
“有的有的。要不您来说说这个通用步调?就当我考考您,哈哈!”成小胖故作智慧,自觉得老王作为架构师不会存眷这些太详细的实现细节。
然而老王平日里亲力亲为,至今还经常撸代码,怎么会被这种小 case 所难倒?于是老王分分钟给出谜底:
“66666,锋利啊我的王哥!”成小胖的小智慧被老王击得毁坏!
“你嘴皮子耍够了吧,照旧多动动手吧。此刻你手写上面步调对应的代码实现吧”,老王给了成小胖一个眼神,让他本身逐步体会……
成小胖也不是省油的灯,顿时擦清洁白板,现场撸了起来(是撸代码,撸代码,撸代码,重要的工作说三遍):
public class JMSDemo { ConnectionFactory connectionFactory; Connection connection; Session session; Destination destination; MessageProducer producer; MessageConsumer consumer; Message message; boolean useTransaction = false; try { Context ctx = new InitialContext(); connectionFactory = (ConnectionFactory) ctx.lookup("ConnectionFactoryName"); //利用ActiveMQ时:connectionFactory = new ActiveMQConnectionFactory(user, password, getOptimizeBrokerUrl(broker)); connection = connectionFactory.createConnection(); connection.start(); session = connection.createSession(useTransaction, Session.AUTO_ACKNOWLEDGE); destination = session.createQueue("TEST.QUEUE"); //出产者发送动静 producer = session.createProducer(destination); message = session.createTextMessage("this is a test"); //消费者同步吸收 consumer = session.createConsumer(destination); message = (TextMessage) consumer.receive(1000); System.out.println("Received message: " + message); //消费者异步吸收 consumer.setMessageListener(new MessageListener() { @Override public void onMessage(Message message) { if (message != null) { doMessageEvent(message); } } }); } catch (JMSException e) { ... } finally { producer.close(); session.close(); connection.close(); } }
老王满足的点颔首:“还算不赖哈~ JMS 通用的类型咱们都聊完了,下面就来聊点 ActiveMQ 更详细点的对象咯。”