RocketMQ 的 Java API
RocketMQ 是用 Java 语言开拓的,因此,其 Java API 相对是较量富厚的,虽然也有部门原因是 RocketMQ 自己提供的成果就较量多。RocketMQ API 提供的成果包罗,
单看成果的话,纵然不算事务动静,也不算 Tag,RocketMQ 也远超 Kafka,Kafka 应该只实现了 Pull 模式消费 + 顺序消费这2个成果。RocketMQ 的代码示例在 rocketmq-example 中,留意,代码是不能直接运行的,因为所有的代码都少了配置 name server 的部门,需要本身手动加上,譬喻,producer.setNamesrvAddr("192.168.232.23:9876");
。
先来看一下出产者的 API,较量简朴,只有一种,如下,
import com.alibaba.rocketmq.client.exception.MQClientException; import com.alibaba.rocketmq.client.producer.DefaultMQProducer; import com.alibaba.rocketmq.client.producer.MessageQueueSelector; import com.alibaba.rocketmq.client.producer.SendResult; import com.alibaba.rocketmq.common.message.Message; import com.alibaba.rocketmq.common.message.MessageQueue; import java.util.List; public class Producer { public static void main(String[] args) throws MQClientException, InterruptedException { DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName"); producer.setNamesrvAddr("192.168.232.23:9876"); producer.start(); for (int i = 0; i < 10; i++) try { { Message msg = new Message("TopicTest1",// topic "TagA",// tag "OrderID188",// key ("RocketMQ "+String.format("%05d", i)).getBytes());// body SendResult sendResult = producer.send(msg, new MessageQueueSelector() { @Override public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) { Integer id = (Integer) arg; int index = id % mqs.size(); return mqs.get(index); } }, i)); System.out.println(String.format("%05d", i)+sendResult); } } catch (Exception e) { e.printStackTrace(); } producer.shutdown(); } }
可以发明,对比 Kafka 的 API,只多了 Tag,但实际上行为有很大差异。Kafka 的出产者客户端,有同步和异步两种模式,但都是阻塞模式,send
要领返回发送状态的 Future
,可以通过 Future
的 get
要领阻塞得到发送状态。而 RocketMQ 回收的是同步非阻塞模式,发送之后立即返回发送状态(而不是 Future
)。正常环境下,两者利用上不同不大,可是在高可用场景中产生主备切换的时候,Kafka 的同步可以期待切换完成并重连,最后返回;而 RocketMQ 只能立即报错,昆山软件开发,由出产者选择是否重发。所以,在出产者的 API 上,劳务派遣管理系统,其实 Kafka 是要强一些的。
别的,RocketMQ 可以通过指定 MessageQueueSelector
类的实现来指定将动静发送到哪个分区去,Kafka 是通过指定出产者的 partitioner.class
参数来实现的,机动性上 RocketMQ 略胜一筹。
再来看消费者的API,由于 RocketMQ 的成果较量多,我们先看 Pull 模式消费的API,如下,
import java.util.HashMap; import java.util.Map; import java.util.Set; import com.alibaba.rocketmq.client.consumer.DefaultMQPullConsumer; import com.alibaba.rocketmq.client.consumer.PullResult; import com.alibaba.rocketmq.client.consumer.store.OffsetStore; import com.alibaba.rocketmq.client.consumer.store.RemoteBrokerOffsetStore; import com.alibaba.rocketmq.client.exception.MQClientException; import com.alibaba.rocketmq.common.message.MessageExt; import com.alibaba.rocketmq.common.message.MessageQueue; public class PullConsumer { private static final Map<MessageQueue, Long> offseTable = new HashMap<MessageQueue, Long>(); public static void main(String[] args) throws MQClientException { DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("please_rename_unique_group_name_5"); consumer.setNamesrvAddr("192.168.232.23:9876"); consumer.start(); Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues("TopicTest1"); for (MessageQueue mq : mqs) { System.out.println("Consume from the queue: " + mq); SINGLE_MQ: while (true) { try { long offset = consumer.fetchConsumeOffset(mq, true); PullResult pullResult = consumer.pullBlockIfNotFound(mq, null, getMessageQueueOffset(mq), 32); if (null != pullResult.getMsgFoundList()) { for (MessageExt messageExt : pullResult.getMsgFoundList()) { System.out.print(new String(messageExt.getBody())); System.out.print(pullResult); System.out.println(messageExt); } } putMessageQueueOffset(mq, pullResult.getNextBeginOffset()); switch (pullResult.getPullStatus()) { case FOUND: // TODO break; case NO_MATCHED_MSG: break; case NO_NEW_MSG: break SINGLE_MQ; case OFFSET_ILLEGAL: break; default: break; } } catch (Exception e) { e.printStackTrace(); } } } consumer.shutdown(); } private static void putMessageQueueOffset(MessageQueue mq, long offset) { offseTable.put(mq, offset); } private static long getMessageQueueOffset(MessageQueue mq) { Long offset = offseTable.get(mq); if (offset != null) return offset; return 0; } }