Kafka消费者客户端从Kafka cluster中读打动静并处理惩罚。
Kafka消费者可以手动绑定本身到某个topic的某些partition上可能通过subscribe要领监听某个topic自动绑定。Kafka消费者绑定到某个parition后就和这个partition的leader毗连,然后发出fetch request, 获打动静后举办处理惩罚。
offset打点
kafka的消费模子是一个partition最多被一个consumer消费,而offset可以有consumer节制,譬喻通过seek前进或退却到某个offset位置。
首次毗连时,可以通过KafkaConsumer设置参数里的auto.offset.reset参数抉择是从最新的位置(默认)照旧从就早的位置开始消费。
默认环境下, enable.auto.commit参数是true,即KafkaConsumer客户端会按时commit offset,所有要留意的一点是假如poll函数获得ConsumerRecords后假如处理惩罚是异步的,则大概呈现消费处理惩罚还没有完成可是却commit offset了,昆山软件开发,这时假如历程挂掉则重启后则会产生丢动静的环境。这里有两种办理方案,1是poll后的处理惩罚是同步的,这样下一次poll会实验commit offset,则能担保at least one语义。2是封锁enable.auto.commit, 然后通过KafkaConsumer.commitSync要领来手动commit offset。
max.poll.interval.ms参数用于配置kafka消费者处理惩罚一次poll的消费功效的最大时间(默认300s),假如高出了这个时间则consumer被认为挂了会从头rebalance。
Consumer线程相关
消费者多线程处理惩罚有几种方法
KafkaConsumer.subscribe
监听某个topic
subscribe(Collection topics, ConsumerRebalanceListener listener)
当消费者利用kafka cluster来打点group membership时,ConsumerRebalanceListener会在consumer rebalance时挪用,consumer rebalance产生在消费者或消费干系变革的时候
这个Listener的常见用途是生存这个partition的最新消费offset,在void onPartitionsRevoked(java.util.Collection<TopicPartition> partitions)里生存当前的partition和offset到数据库中。然后reassign完成后,昆山软件开发,void onPartitionsAssigned(java.util.Collection partitions)中从数据库读取之前的消费位置,通过seek要领配置消费位置继承消费。
Kafka.poll
public ConsumerRecords<K, V> poll(long timeout) { // KafkaConsumer不是线程安详的 acquireAndEnsureOpen(); try { if (timeout < 0) throw new IllegalArgumentException("Timeout must not be negative"); if (this.subscriptions.hasNoSubscriptionOrUserAssignment()) throw new IllegalStateException("Consumer is not subscribed to any topics or assigned any partitions"); // poll for new data until the timeout expires long start = time.milliseconds(); long remaining = timeout; do { Map<TopicPartition, List<ConsumerRecord<K, V>>> records = pollOnce(remaining); if (!records.isEmpty()) { // before returning the fetched records, we can send off the next round of fetches // and avoid block waiting for their responses to enable pipelining while the user // is handling the fetched records. // // NOTE: since the consumed position has already been updated, we must not allow // wakeups or any other errors to be triggered prior to returning the fetched records. if (fetcher.sendFetches() > 0 || client.hasPendingRequests()) client.pollNoWakeup(); if (this.interceptors == null) return new ConsumerRecords<>(records); else return this.interceptors.onConsume(new ConsumerRecords<>(records)); } long elapsed = time.milliseconds() - start; remaining = timeout - elapsed; } while (remaining > 0); return ConsumerRecords.empty(); } finally { release(); } }
pollOnce处理惩罚
private Map<TopicPartition, List<ConsumerRecord<K, V>>> pollOnce(long timeout) { client.maybeTriggerWakeup(); // 协调者举办一次poll,内里会按照auto.commit.interval.ms抉择是否自动提交offset coordinator.poll(time.milliseconds(), timeout); // fetch positions if we have partitions we're subscribed to that we // don't know the offset for if (!subscriptions.hasAllFetchPositions()) updateFetchPositions(this.subscriptions.missingFetchPositions()); // 假如已经有record数据了直接返回 // if data is available already, return it immediately Map<TopicPartition, List<ConsumerRecord<K, V>>> records = fetcher.fetchedRecords(); if (!records.isEmpty()) return records; // 发送一次fetch请求 // send any new fetches (won't resend pending fetches) fetcher.sendFetches(); long now = time.milliseconds(); long pollTimeout = Math.min(coordinator.timeToNextPoll(now), timeout); // 期待fetch请求功效 client.poll(pollTimeout, now, new PollCondition() { @Override public boolean shouldBlock() { // since a fetch might be completed by the background thread, we need this poll condition // to ensure that we do not block unnecessarily in poll() return !fetcher.hasCompletedFetches(); } }); // after the long poll, we should check whether the group needs to rebalance // prior to returning data so that the group can stabilize faster if (coordinator.needRejoin()) return Collections.emptyMap(); // 返回fetch功效 return fetcher.fetchedRecords(); }