One thing to note: what happens if the ConsumerRecords returned by poll are processed asynchronously

Source: Kunshan Software Development | Date: 2018-06-03

Original author: Liu Zhengyang (刘正阳)

The Kafka consumer client reads messages from a Kafka cluster and processes them.

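A Kafka consumer can either manually assign itself to specific partitions of a topic, or use the subscribe method to subscribe to a topic and have partitions assigned to it automatically. Once a consumer is bound to a partition, it connects to that partition's leader, sends fetch requests, and processes the messages it receives.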

Offset management

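In Kafka's consumption model, each partition is consumed by at most one consumer within a consumer group, and the offset is under the consumer's control: for example, the consumer can use seek to move forward or backward to a specific offset.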
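When a consumer uses subscribe, the group decides which member owns which partition. To make the "at most one consumer per partition" rule concrete, here is a simplified model of range-style assignment for a single topic, similar in spirit to Kafka's RangeAssignor. The class and method names are hypothetical; this is a sketch, not the client's code.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified model of range-style assignment for ONE topic.
// Consumers are assumed to be already sorted by member id.
class RangeAssignmentSketch {
    // Returns, for each consumer index, the partitions it owns.
    static List<List<Integer>> assign(int numPartitions, int numConsumers) {
        if (numConsumers <= 0) throw new IllegalArgumentException("need at least one consumer");
        List<List<Integer>> result = new ArrayList<>();
        int perConsumer = numPartitions / numConsumers;
        int withExtra = numPartitions % numConsumers; // the first `withExtra` consumers get one more
        for (int i = 0; i < numConsumers; i++) {
            int start = perConsumer * i + Math.min(i, withExtra);
            int length = perConsumer + (i < withExtra ? 1 : 0);
            List<Integer> owned = new ArrayList<>();
            for (int p = start; p < start + length; p++) owned.add(p);
            result.add(owned); // ranges never overlap, so every partition has exactly one owner
        }
        return result;
    }
}
```

For 7 partitions and 3 consumers this yields [0, 1, 2], [3, 4] and [5, 6]. With more consumers than partitions, the extra consumers get an empty list and sit idle, which is why consumer parallelism is bounded by the partition count.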

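On first connection, when the group has no committed offset yet, the auto.offset.reset setting in the KafkaConsumer configuration decides whether consumption starts from the latest offset (the default) or from the earliest one.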
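The decision can be modelled as a small function. This is a hypothetical helper, not client code: it assumes the broker's earliest and next offsets are known, and it also covers the documented case where the committed offset no longer exists on the server. For "none" the real client throws its own exception (NoOffsetForPartitionException); the sketch just throws IllegalStateException.

```java
// Hypothetical helper modelling how auto.offset.reset picks a starting position.
class OffsetResetSketch {
    /**
     * @param committed offset committed for this group, or null if there is none
     * @param reset     value of auto.offset.reset: "latest", "earliest" or "none"
     * @param logStart  earliest offset still stored on the broker
     * @param logEnd    offset that the next produced message will get
     */
    static long startingOffset(Long committed, String reset, long logStart, long logEnd) {
        // A committed offset that still exists always wins; the reset policy is not consulted.
        if (committed != null && committed >= logStart && committed <= logEnd) return committed;
        switch (reset) {
            case "earliest": return logStart;
            case "latest":   return logEnd;
            case "none":     throw new IllegalStateException("no valid committed offset and auto.offset.reset=none");
            default:         throw new IllegalArgumentException("unknown auto.offset.reset: " + reset);
        }
    }
}
```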

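By default, enable.auto.commit is true, which means the KafkaConsumer client commits offsets periodically. One thing to watch out for: if the ConsumerRecords returned by poll are processed asynchronously, offsets may be committed before processing has finished. If the process crashes at that point, those messages are lost after a restart. There are two solutions:

  1. Process the records synchronously after poll. The next poll call then attempts the offset commit, so only records that have already been processed get committed, which guarantees at-least-once semantics.
  2. Disable enable.auto.commit and commit offsets manually with KafkaConsumer.commitSync.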
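The failure mode above can be reproduced with a toy model. Everything here is hypothetical (ToyConsumer and CommitDemo are not Kafka classes), and one simplification is loud: the toy auto-commits on every poll, while the real client only does so once auto.commit.interval.ms has elapsed. That changes how much is lost, not whether loss is possible.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical toy model of ONE partition; this is NOT the Kafka client API.
// With auto-commit on, poll() first commits the position reached by the previous
// poll, then fetches the next batch.
class ToyConsumer {
    private final long logEnd;      // offsets 0 .. logEnd-1 exist
    private final boolean autoCommit;
    private long position;          // next offset to fetch
    private long committed;         // where a restarted consumer resumes

    ToyConsumer(long logEnd, boolean autoCommit, long resumeFrom) {
        this.logEnd = logEnd;
        this.autoCommit = autoCommit;
        this.position = resumeFrom;
        this.committed = resumeFrom;
    }

    List<Long> poll(int maxRecords) {
        if (autoCommit) committed = position; // commits what the previous poll handed out
        List<Long> batch = new ArrayList<>();
        while (batch.size() < maxRecords && position < logEnd) batch.add(position++);
        return batch;
    }

    void commitSync() { committed = position; } // manual commit of the current position
    long committed() { return committed; }
}

class CommitDemo {
    // Polls two batches, "crashes", restarts from the committed offset, drains the log,
    // and returns the offsets that were never processed.
    static Set<Long> lostOffsets(boolean autoCommit, boolean asyncProcessing) {
        final long logEnd = 10;
        Set<Long> processed = new HashSet<>();
        ToyConsumer c = new ToyConsumer(logEnd, autoCommit, 0);
        for (int round = 0; round < 2; round++) {
            List<Long> batch = c.poll(5);
            if (!asyncProcessing) processed.addAll(batch); // finished before the next poll
            // async: the batch sits in an in-memory queue that no worker has drained yet
            if (!autoCommit && !asyncProcessing) c.commitSync(); // commit only after processing
        }
        // crash: the in-memory queue is gone; a new instance resumes from the committed offset
        ToyConsumer restarted = new ToyConsumer(logEnd, autoCommit, c.committed());
        for (List<Long> b = restarted.poll(5); !b.isEmpty(); b = restarted.poll(5)) processed.addAll(b);
        Set<Long> lost = new HashSet<>();
        for (long o = 0; o < logEnd; o++) if (!processed.contains(o)) lost.add(o);
        return lost;
    }
}
```

In this model, lostOffsets(true, true) loses offsets 0 to 4, lostOffsets(true, false) loses nothing but re-processes 5 to 9 after the restart (at-least-once), and committing manually only after processing loses nothing in either mode.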

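The max.poll.interval.ms setting is the maximum time the consumer may spend processing the results of one poll, that is, the maximum gap between two poll calls (default 300 s). If this time is exceeded, the consumer is considered dead and the group rebalances.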
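A rough way to stay inside that limit is to size each batch from a measured per-record processing cost. The helper below is a hypothetical back-of-the-envelope calculation; the per-record cost and the safety margin are assumptions you would supply, not values Kafka provides.

```java
// Hypothetical sizing helper: how many records may one poll() return if their processing
// must finish within max.poll.interval.ms? perRecordMs is an assumed, measured estimate.
class PollBudgetSketch {
    static long maxRecordsPerPoll(long maxPollIntervalMs, long perRecordMs, int safetyPercent) {
        if (perRecordMs <= 0 || safetyPercent <= 0 || safetyPercent > 100)
            throw new IllegalArgumentException("invalid input");
        long budgetMs = maxPollIntervalMs * safetyPercent / 100; // headroom for GC pauses, slow calls
        return budgetMs / perRecordMs; // 0 means even a single record risks a rebalance
    }
}
```

With the 300 000 ms default, an estimated 1 000 ms per record and 80 % headroom, this gives 240, which could serve as an upper bound for max.poll.records (default 500).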

Consumer threading

There are a few ways to process messages with multiple threads on the consumer side:

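  1. Each consumer is handled by exactly one thread. This preserves ordering within a partition and is simple to implement; the downside is that parallelism is limited by the number of partitions.
  2. Separate consumption from processing: after the consumer fetches a message, it hands it to another thread or a thread pool. This increases parallelism, but ordering is no longer guaranteed, the consumer may commit offsets before the pool has finished processing them, and a slow pool can let a backlog build up in the in-memory queue.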
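For option 2, one common mitigation (a technique chosen for this sketch, not something the original describes) is to commit only up to the longest contiguous run of finished offsets, and to use a bounded queue so a slow pool pushes back on the polling thread. ContiguousOffsetTracker and PoolDemo are hypothetical names; the pool uses only standard java.util.concurrent classes.

```java
import java.util.TreeSet;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical helper (not part of the Kafka client): tracks offsets of one partition that
// workers finished out of order and reports the offset that is safe to commit.
class ContiguousOffsetTracker {
    private long nextToCommit;                                // every offset below this is done
    private final TreeSet<Long> doneAhead = new TreeSet<>();  // finished offsets beyond a gap

    ContiguousOffsetTracker(long startOffset) { this.nextToCommit = startOffset; }

    synchronized void markDone(long offset) {
        if (offset < nextToCommit) return; // already covered
        doneAhead.add(offset);
        while (!doneAhead.isEmpty() && doneAhead.first() == nextToCommit) { // close the gap
            doneAhead.pollFirst();
            nextToCommit++;
        }
    }

    // Kafka commits the offset of the NEXT record to read, so this is the value to commit.
    synchronized long committableOffset() { return nextToCommit; }
}

class PoolDemo {
    // Hands offsets 0..records-1 to a bounded pool and returns the offset that is safe to commit.
    static long processInPool(int records, int threads, int queueCapacity) {
        ContiguousOffsetTracker tracker = new ContiguousOffsetTracker(0);
        // Bounded queue + CallerRunsPolicy: when the queue is full, the submitting (polling)
        // thread runs the task itself, slowing consumption instead of growing a backlog.
        ThreadPoolExecutor pool = new ThreadPoolExecutor(threads, threads, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(queueCapacity), new ThreadPoolExecutor.CallerRunsPolicy());
        for (long offset = 0; offset < records; offset++) {
            final long o = offset;
            pool.execute(() -> tracker.markDone(o)); // stand-in for real work; completes in any order
        }
        pool.shutdown();
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return tracker.committableOffset();
    }
}
```

In a real consumer, committableOffset() per partition would be passed to commitSync (the overload taking a map of offsets) instead of committing the position returned by poll. This restores at-least-once delivery but does not restore ordering; note also that CallerRunsPolicy makes the polling thread do work, so it still counts against max.poll.interval.ms.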

KafkaConsumer.subscribe

Subscribing to a topic

subscribe(Collection<String> topics, ConsumerRebalanceListener listener)
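When the consumer lets the Kafka cluster manage group membership, the ConsumerRebalanceListener is called whenever a consumer rebalance happens. A rebalance occurs when the group's consumers or their partition assignments change, for example when:

  • a consumer process dies
  • a new consumer process joins
  • the number of partitions changes

A common use of this listener is to persist the latest consumed offset of each partition: in void onPartitionsRevoked(java.util.Collection<TopicPartition> partitions), save the current partition and offset to a database. Then, once reassignment has completed, void onPartitionsAssigned(java.util.Collection<TopicPartition> partitions) reads the previous position from the database and uses seek to set the position and continue consuming.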
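The save-on-revoke / seek-on-assign pattern can be sketched without a broker. In this self-contained model, partitions are plain strings such as "orders-0" and OffsetStore is a hypothetical stand-in for the database; the callback names mirror ConsumerRebalanceListener, but the class does not implement the real interface. In the real client the listener is passed to subscribe, and inside the callbacks you would typically call consumer.position(partition) and consumer.seek(partition, offset).

```java
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the external database that stores offsets.
class OffsetStore {
    private final Map<String, Long> table = new HashMap<>();
    void save(String partition, long nextOffset) { table.put(partition, nextOffset); }
    long load(String partition) { return table.getOrDefault(partition, 0L); }
}

// Hypothetical model; callback names mirror ConsumerRebalanceListener.
class ToyRebalanceAwareConsumer {
    private final OffsetStore store;
    private final Map<String, Long> positions = new HashMap<>(); // partition -> next offset to read

    ToyRebalanceAwareConsumer(OffsetStore store) { this.store = store; }

    // Like onPartitionsRevoked: persist how far we got before the partitions move away.
    void onPartitionsRevoked(Collection<String> partitions) {
        for (String p : partitions) {
            Long pos = positions.remove(p);
            if (pos != null) store.save(p, pos);
        }
    }

    // Like onPartitionsAssigned: load the saved position; this is where seek(partition, offset) goes.
    void onPartitionsAssigned(Collection<String> partitions) {
        for (String p : partitions) positions.put(p, store.load(p));
    }

    // Pretend n records were processed from partition p.
    void consume(String p, int n) { positions.merge(p, (long) n, Long::sum); }

    long position(String p) { return positions.get(p); }
}
```

A second consumer that is assigned the same partition after a rebalance picks up exactly where the first one left off, because both read and write the same store.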

KafkaConsumer.poll

    public ConsumerRecords<K, V> poll(long timeout) {
        // KafkaConsumer is not thread-safe: acquire() fails fast if another thread is using
        // this consumer, and the call also checks that the consumer has not been closed
        acquireAndEnsureOpen();
        try {
            if (timeout < 0)
                throw new IllegalArgumentException("Timeout must not be negative");
            if (this.subscriptions.hasNoSubscriptionOrUserAssignment())
                throw new IllegalStateException("Consumer is not subscribed to any topics or assigned any partitions");
            // poll for new data until the timeout expires
            long start = time.milliseconds();
            long remaining = timeout;
            do {
                Map<TopicPartition, List<ConsumerRecord<K, V>>> records = pollOnce(remaining);
                if (!records.isEmpty()) {
                    // before returning the fetched records, we can send off the next round of fetches
                    // and avoid block waiting for their responses to enable pipelining while the user
                    // is handling the fetched records.
                    //
                    // NOTE: since the consumed position has already been updated, we must not allow
                    // wakeups or any other errors to be triggered prior to returning the fetched records.
                    if (fetcher.sendFetches() > 0 || client.hasPendingRequests())
                        client.pollNoWakeup();
                    if (this.interceptors == null)
                        return new ConsumerRecords<>(records);
                    else
                        return this.interceptors.onConsume(new ConsumerRecords<>(records));
                }
                long elapsed = time.milliseconds() - start;
                remaining = timeout - elapsed;
            } while (remaining > 0);
            return ConsumerRecords.empty();
        } finally {
            release();
        }
    }
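The outer loop of poll is a plain deadline loop: call pollOnce with the remaining budget, return as soon as records arrive, otherwise recompute the remaining time. The sketch below isolates that pattern with hypothetical pluggable pieces (a pollOnce function and an injectable millisecond clock) so it can run without Kafka. Because it is a do-while, even a timeout of 0 performs one pollOnce, just like the code above.

```java
import java.util.Collections;
import java.util.List;
import java.util.function.LongFunction;
import java.util.function.LongSupplier;

// Hypothetical extraction of the deadline loop in poll(long timeout).
class DeadlineLoop {
    static <T> List<T> pollWithTimeout(long timeoutMs, LongFunction<List<T>> pollOnce, LongSupplier clock) {
        if (timeoutMs < 0) throw new IllegalArgumentException("Timeout must not be negative");
        long start = clock.getAsLong();
        long remaining = timeoutMs;
        do {
            List<T> records = pollOnce.apply(remaining); // each attempt gets only what is left
            if (!records.isEmpty()) return records;      // return as soon as data shows up
            remaining = timeoutMs - (clock.getAsLong() - start);
        } while (remaining > 0);
        return Collections.emptyList();
    }
}
```

With a fake clock that advances 40 ms per attempt and a 100 ms timeout, pollOnce runs three times (remaining budget 100, 60, then 20 ms) before the loop gives up.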

Inside pollOnce

    private Map<TopicPartition, List<ConsumerRecord<K, V>>> pollOnce(long timeout) {
        client.maybeTriggerWakeup();
        // coordinator housekeeping (e.g. locating the coordinator, joining the group if needed);
        // based on auto.commit.interval.ms it also decides whether to auto-commit offsets here
        coordinator.poll(time.milliseconds(), timeout);
        // fetch positions if we have partitions we're subscribed to that we
        // don't know the offset for
        if (!subscriptions.hasAllFetchPositions())
            updateFetchPositions(this.subscriptions.missingFetchPositions());
        // if data is available already, return it immediately
        Map<TopicPartition, List<ConsumerRecord<K, V>>> records = fetcher.fetchedRecords();
        if (!records.isEmpty())
            return records;
        // send any new fetches (won't resend pending fetches)
        fetcher.sendFetches();
        long now = time.milliseconds();
        long pollTimeout = Math.min(coordinator.timeToNextPoll(now), timeout);
        // wait for fetch responses, blocking for at most pollTimeout
        client.poll(pollTimeout, now, new PollCondition() {
            @Override
            public boolean shouldBlock() {
                // since a fetch might be completed by the background thread, we need this poll condition
                // to ensure that we do not block unnecessarily in poll()
                return !fetcher.hasCompletedFetches();
            }
        });
        // after the long poll, we should check whether the group needs to rebalance
        // prior to returning data so that the group can stabilize faster
        if (coordinator.needRejoin())
            return Collections.emptyMap();
        // return the fetched records
        return fetcher.fetchedRecords();
    }
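Two lines of pollOnce tie back to auto-commit: coordinator.poll may commit offsets when auto.commit.interval.ms (default 5000 ms) has elapsed, and the network wait is capped by coordinator.timeToNextPoll(now), roughly how long the coordinator can wait before it needs to act again. The sketch models only the auto-commit timer; AutoCommitTimer is a hypothetical class, heartbeats are ignored, and the re-arm rule (now + interval) is an assumption of the model.

```java
// Hypothetical model of the auto-commit timer consulted by coordinator.poll(); heartbeats ignored.
class AutoCommitTimer {
    private final long intervalMs;   // plays the role of auto.commit.interval.ms
    private long nextDeadlineMs;

    AutoCommitTimer(long intervalMs, long nowMs) {
        this.intervalMs = intervalMs;
        this.nextDeadlineMs = nowMs + intervalMs;
    }

    // Called at the top of each pollOnce: true when an auto-commit is due; then re-arms the timer.
    boolean maybeAutoCommit(long nowMs) {
        if (nowMs < nextDeadlineMs) return false;
        nextDeadlineMs = nowMs + intervalMs;
        return true;
    }

    // Counterpart of timeToNextPoll(now): how long a network wait may block before the timer is due.
    long timeToNextPoll(long nowMs) {
        return Math.max(0, nextDeadlineMs - nowMs);
    }

    // Mirrors: long pollTimeout = Math.min(coordinator.timeToNextPoll(now), timeout);
    long pollTimeout(long nowMs, long userTimeoutMs) {
        return Math.min(timeToNextPoll(nowMs), userTimeoutMs);
    }
}
```

Capping the wait this way means a long user timeout (say 10 s) cannot delay a commit that is due in 4 s: the blocking call returns early, and the next pollOnce gives the coordinator its turn.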