欢迎访问昆山宝鼎软件有限公司网站! 设为首页 | 网站地图 | XML | RSS订阅 | 宝鼎邮箱 | 后台管理


新闻资讯

MENU

软件开发知识
原文出处: 谢晞鸣

1. Client端,三种发送方法

RocketMQ 支持常见的三种发送方法,

  • SYNC
  • producer.send(msg)

    同步的发送方法,会期待发送功效后才返回。可以用 send(msg, timeout) 的方法指定期待时间,假如不指定,就是默认的 3000ms. 这个timeout 最终会被配置到 ResponseFuture 里,昆山软件开发,再发送完动静后,用 countDownLatch 去 await timeout的时间,假如逾期,就会抛出异常。

  • ASYNC
  • producer.send(msg, new SendCallback() {
        @Override
        public void onSuccess(SendResult sendResult) {
            System.out.printf("%-10d OK %s %n", index, sendResult.getMsgId());
        }
        @Override
        public void onException(Throwable e) {
            System.out.printf("%-10d Exception %s %n", index, e);
            e.printStackTrace();
        }
    });

    异步的发送方法,发送完后,立即返回。Client 在拿到 Broker 的响应功效后,会回调指定的 callback. 这个 API 也可以指定 Timeout,不指定也是默认的 3000ms.

  • ONEWAY
  • producer.sendOneway(msg);

    较量简朴,发出去后,什么都不管直接返回。

    对付每种方法,Producer 还提供了可以指定 MessageQueue, MessageQueueSelector的API,这属于稍微高端一点的玩法,一般用它提供的默认的计策选择 MessageQueue 就可以了。

    2. Client端发送进程

    下面以 SYNC 方法为例,看下整个动静的发送进程,其他方法略有差别,昆山软件公司,总体流程雷同。

    1. 按照 Topic 找到指定的 TopicPublishInfo

    先去当地 map 找,假如没有,就去 Namesrv fetch, 假如 Namesrv 里也没有,则用默认的 Topic 再去 fetch TopicRouteData. 对用用默认 Topic 的这种环境,Client 拿到数据后,会去构建 TopicPublishInfo, 然后用当前的 Topic 作为 key 放到当地 map 里。Broker 在吸收到动静的时候,会去更新它当地的设置,然后在 registerBroker 的时候会去更新 namesrv 中的 TopicRouteData 信息,这样 Namesrv 中就会有这样一份设置了。虽然,也可以事先在 Namesrv 增加该设置,许多公司内部都有这样定制的平台来打点MQ的接入设置。

    public class TopicPublishInfo {
        private boolean orderTopic = false;
        private boolean haveTopicRouterInfo = false;
        private List<MessageQueue> messageQueueList = new ArrayList<MessageQueue>();
        private volatile ThreadLocalIndex sendWhichQueue = new ThreadLocalIndex();
        private TopicRouteData topicRouteData;
    }
    
    public class TopicRouteData {
        private String orderTopicConf;
        private List<QueueData> queueDatas;
        private List<BrokerData> brokerDatas;
        private HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;
    }

    QueueData 界说了这个 read 和 write 的 queue的数量,Client 在拿到 TopicRouteData 后,会按照这里配的数量去构建响应数目标messageQueue,即 messageQueueList. brokerDatas 生存了各个 broker 的相关信息。

    2. 从 messageQueueList 中选择一个 MessageQueue

    假如没有 enable latencyFaultTolerance,就用递增取模的方法选择。假如 enable 了,在递增取模的基本上,再过滤掉 not available 的。这里所谓的 latencyFaultTolerance, 是指对之前失败的,按必然的时间做退避:

    long[] latencyMax = {50L, 100L, 550L, 1000L, 2000L, 3000L, 15000L};
    long[] notAvailableDuration = {0L, 0L, 30000L, 60000L, 120000L, 180000L, 600000L};

    举个例子,假如上次请求的 latency 高出 550L ms, 就退避 3000L ms;高出 1000L,就退避 60000L.

    以上就是 Producer 到 Broker 的简朴的负载平衡。

    3. 发送动静

    到这一步,我们已经拿到了这些要害数据:

  • Message, 要发送的动静
  • MessageQueue,这内里包罗 topic/brokerName/queueId
  • CommunicationMode, 发送方法, SYNC/ASYNC/ONEWAY
  • TopicPublishInfo
  • 有了这些数据,就可以构建 RequestHeader 了,大部门字段意思都很明明(虽然,前提是对RocketMQ的源码有所熟悉),个体字段见注释。

    requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
    requestHeader.setTopic(msg.getTopic());
    requestHeader.setDefaultTopic(this.defaultMQProducer.getCreateTopicKey());
    requestHeader.setDefaultTopicQueueNums(this.defaultMQProducer.getDefaultTopicQueueNums());
    requestHeader.setQueueId(mq.getQueueId());
    //系统Flag, 用于判定走什么逻辑。标识是否压缩,事务的差异TYPE(prepare/rollback/commit/not transaction) 等
    requestHeader.setSysFlag(sysFlag); 
    requestHeader.setBornTimestamp(System.currentTimeMillis());
    //动静Flag, 最终会落地
    requestHeader.setFlag(msg.getFlag());
    requestHeader.setProperties(MessageDecoder.messageProperties2String(msg.getProperties()));
    requestHeader.setReconsumeTimes(0);
    //TODO,暂不知道这个字段是干嘛用的
    requestHeader.setUnitMode(this.isUnitMode());
    requestHeader.setBatch(msg instanceof MessageBatch);

    最后用这些 header 字段,以及 message body 构建 RemotingCommand,通过 remoting 模块发给 broker.