Kafka broker上对付produce出产者出产动静的处理惩罚
Kafka Server处理惩罚生成者请求
进口在KafkaApis.scala, 通过request.header.apikey判定动静范例
def handle(request: RequestChannel.Request) { try { trace("Handling request:%s from connection %s;securityProtocol:%s,principal:%s". format(request.requestDesc(true), request.connectionId, request.securityProtocol, request.session.principal)) ApiKeys.forId(request.header.apiKey) match { case ApiKeys.PRODUCE => handleProduceRequest(request)
出产动静则挪用replicaManager.appendRecords
// call the replica manager to append messages to the replicas replicaManager.appendRecords( timeout = produceRequest.timeout.toLong, requiredAcks = produceRequest.acks, internalTopicsAllowed = internalTopicsAllowed, isFromClient = true, entriesPerPartition = authorizedRequestInfo, responseCallback = sendResponseCallback) // if the request is put into the purgatory, it will have a held reference and hence cannot be garbage collected; // hence we clear its data here inorder to let GC re-claim its memory since it is already appended to log produceRequest.clearPartitionRecords()
ReplicaManager.scala
appendRecords 先写动静到partition的leader上,假如requireAcks==-1说明需要所有isr都写入乐成才返回response,昆山软件开发,而isr同样作为leader的消费者来拉取的
/** * Append messages to leader replicas of the partition, and wait for them to be replicated to other replicas; * the callback function will be triggered either when timeout or the required acks are satisfied; * if the callback function itself is already synchronized on some object then pass this object to avoid deadlock. */ def appendRecords(timeout: Long, requiredAcks: Short, internalTopicsAllowed: Boolean, isFromClient: Boolean, entriesPerPartition: Map[TopicPartition, MemoryRecords], responseCallback: Map[TopicPartition, PartitionResponse] => Unit, delayedProduceLock: Option[Object] = None) { if (isValidRequiredAcks(requiredAcks)) { val sTime = time.milliseconds val localProduceResults = appendToLocalLog(internalTopicsAllowed = internalTopicsAllowed, isFromClient = isFromClient, entriesPerPartition, requiredAcks) debug("Produce to local log in %d ms".format(time.milliseconds - sTime)) val produceStatus = localProduceResults.map { case (topicPartition, result) => topicPartition -> ProducePartitionStatus( result.info.lastOffset + 1, // required offset new PartitionResponse(result.error, result.info.firstOffset, result.info.logAppendTime)) // response status } // 1. required acks = -1 // 2. there is data to append // 3. at least one partition append was successful (fewer errors than partitions) if (delayedProduceRequestRequired(requiredAcks, entriesPerPartition, localProduceResults)) { // create delayed produce operation val produceMetadata = ProduceMetadata(requiredAcks, produceStatus) val delayedProduce = new DelayedProduce(timeout, produceMetadata, this, responseCallback, delayedProduceLock) // create a list of (topic, partition) pairs to use as keys for this delayed produce operation val producerRequestKeys = entriesPerPartition.keys.map(new TopicPartitionOperationKey(_)).toSeq // try to complete the request immediately, otherwise put it into the purgatory // this is because while the delayed produce operation is being created, new // requests may arrive and hence make this operation completable. delayedProducePurgatory.tryCompleteElseWatch(delayedProduce, producerRequestKeys) } else { // we can respond immediately val produceResponseStatus = produceStatus.mapValues(status => status.responseStatus) responseCallback(produceResponseStatus) } } else { // If required.acks is outside accepted range, something is wrong with the client // Just return an error and don't handle the request at all val responseStatus = entriesPerPartition.map { case (topicPartition, _) => topicPartition -> new PartitionResponse(Errors.INVALID_REQUIRED_ACKS, LogAppendInfo.UnknownLogAppendInfo.firstOffset, RecordBatch.NO_TIMESTAMP) } responseCallback(responseStatus) } }
追加动静到当地log中
/** * Append the messages to the local replica logs */ private def appendToLocalLog(internalTopicsAllowed: Boolean, isFromClient: Boolean, entriesPerPartition: Map[TopicPartition, MemoryRecords], requiredAcks: Short): Map[TopicPartition, LogAppendResult] = { trace("Append [%s] to local log ".format(entriesPerPartition)) entriesPerPartition.map { case (topicPartition, records) => brokerTopicStats.topicStats(topicPartition.topic).totalProduceRequestRate.mark() brokerTopicStats.allTopicsStats.totalProduceRequestRate.mark() // reject appending to internal topics if it is not allowed if (Topic.isInternal(topicPartition.topic) && !internalTopicsAllowed) { (topicPartition, LogAppendResult( LogAppendInfo.UnknownLogAppendInfo, Some(new InvalidTopicException(s"Cannot append to internal topic ${topicPartition.topic}")))) } else { try { val partitionOpt = getPartition(topicPartition) val info = partitionOpt match { case Some(partition) => if (partition eq ReplicaManager.OfflinePartition) throw new KafkaStorageException(s"Partition $topicPartition is in an offline log directory on broker $localBrokerId") partition.appendRecordsToLeader(records, isFromClient, requiredAcks) case None => throw new UnknownTopicOrPartitionException("Partition %s doesn't exist on %d" .format(topicPartition, localBrokerId)) } val numAppendedMessages = if (info.firstOffset == -1L || info.lastOffset == -1L) 0 else info.lastOffset - info.firstOffset + 1 // update stats for successfully appended bytes and messages as bytesInRate and messageInRate brokerTopicStats.topicStats(topicPartition.topic).bytesInRate.mark(records.sizeInBytes) brokerTopicStats.allTopicsStats.bytesInRate.mark(records.sizeInBytes) brokerTopicStats.topicStats(topicPartition.topic).messagesInRate.mark(numAppendedMessages) brokerTopicStats.allTopicsStats.messagesInRate.mark(numAppendedMessages) trace("%d bytes written to log %s-%d beginning at offset %d and ending at offset %d" .format(records.sizeInBytes, topicPartition.topic, topicPartition.partition, info.firstOffset, info.lastOffset)) (topicPartition, LogAppendResult(info)) } catch { // NOTE: Failed produce requests metric is not incremented for known exceptions // it is supposed to indicate un-expected failures of a broker in handling a produce request case e@ (_: UnknownTopicOrPartitionException | _: NotLeaderForPartitionException | _: RecordTooLargeException | _: RecordBatchTooLargeException | _: CorruptRecordException | _: KafkaStorageException | _: InvalidTimestampException) => (topicPartition, LogAppendResult(LogAppendInfo.UnknownLogAppendInfo, Some(e))) case t: Throwable => brokerTopicStats.topicStats(topicPartition.topic).failedProduceRequestRate.mark() brokerTopicStats.allTopicsStats.failedProduceRequestRate.mark() error("Error processing append operation on partition %s".format(topicPartition), t) (topicPartition, LogAppendResult(LogAppendInfo.UnknownLogAppendInfo, Some(t))) } } } }