欢迎访问昆山宝鼎软件有限公司网站! 设为首页 | 网站地图 | XML | RSS订阅 | 宝鼎邮箱 | 后台管理


新闻资讯

MENU

软件开发知识
原文出处: 刘正阳

Kafka broker上对付produce出产者出产动静的处理惩罚

Kafka Server处理惩罚生成者请求

进口在KafkaApis.scala, 通过request.header.apikey判定动静范例

def handle(request: RequestChannel.Request) {
try {
  trace("Handling request:%s from connection %s;securityProtocol:%s,principal:%s".
    format(request.requestDesc(true), request.connectionId, request.securityProtocol, request.session.principal))
  ApiKeys.forId(request.header.apiKey) match {
    case ApiKeys.PRODUCE => handleProduceRequest(request)

出产动静则挪用replicaManager.appendRecords

// call the replica manager to append messages to the replicas
  replicaManager.appendRecords(
    timeout = produceRequest.timeout.toLong,
    requiredAcks = produceRequest.acks,
    internalTopicsAllowed = internalTopicsAllowed,
    isFromClient = true,
    entriesPerPartition = authorizedRequestInfo,
    responseCallback = sendResponseCallback)
  // if the request is put into the purgatory, it will have a held reference and hence cannot be garbage collected;
  // hence we clear its data here inorder to let GC re-claim its memory since it is already appended to log
  produceRequest.clearPartitionRecords()

ReplicaManager.scala
appendRecords 先写动静到partition的leader上,假如requireAcks==-1说明需要所有isr都写入乐成才返回response,昆山软件开发
,而isr同样作为leader的消费者来拉取的

/**
  * Append messages to leader replicas of the partition, and wait for them to be replicated to other replicas;
  * the callback function will be triggered either when timeout or the required acks are satisfied;
  * if the callback function itself is already synchronized on some object then pass this object to avoid deadlock.
  */
 def appendRecords(timeout: Long,
                   requiredAcks: Short,
                   internalTopicsAllowed: Boolean,
                   isFromClient: Boolean,
                   entriesPerPartition: Map[TopicPartition, MemoryRecords],
                   responseCallback: Map[TopicPartition, PartitionResponse] => Unit,
                   delayedProduceLock: Option[Object] = None) {
   if (isValidRequiredAcks(requiredAcks)) {
     val sTime = time.milliseconds
     val localProduceResults = appendToLocalLog(internalTopicsAllowed = internalTopicsAllowed,
       isFromClient = isFromClient, entriesPerPartition, requiredAcks)
     debug("Produce to local log in %d ms".format(time.milliseconds - sTime))
     val produceStatus = localProduceResults.map { case (topicPartition, result) =>
       topicPartition ->
               ProducePartitionStatus(
                 result.info.lastOffset + 1, // required offset
                 new PartitionResponse(result.error, result.info.firstOffset, result.info.logAppendTime)) // response status
     }
     // 1. required acks = -1
     // 2. there is data to append
     // 3. at least one partition append was successful (fewer errors than partitions)
     if (delayedProduceRequestRequired(requiredAcks, entriesPerPartition, localProduceResults)) {
       // create delayed produce operation
       val produceMetadata = ProduceMetadata(requiredAcks, produceStatus)
       val delayedProduce = new DelayedProduce(timeout, produceMetadata, this, responseCallback, delayedProduceLock)
       // create a list of (topic, partition) pairs to use as keys for this delayed produce operation
       val producerRequestKeys = entriesPerPartition.keys.map(new TopicPartitionOperationKey(_)).toSeq
       // try to complete the request immediately, otherwise put it into the purgatory
       // this is because while the delayed produce operation is being created, new
       // requests may arrive and hence make this operation completable.
       delayedProducePurgatory.tryCompleteElseWatch(delayedProduce, producerRequestKeys)
     } else {
       // we can respond immediately
       val produceResponseStatus = produceStatus.mapValues(status => status.responseStatus)
       responseCallback(produceResponseStatus)
     }
   } else {
     // If required.acks is outside accepted range, something is wrong with the client
     // Just return an error and don't handle the request at all
     val responseStatus = entriesPerPartition.map { case (topicPartition, _) =>
       topicPartition -> new PartitionResponse(Errors.INVALID_REQUIRED_ACKS,
         LogAppendInfo.UnknownLogAppendInfo.firstOffset, RecordBatch.NO_TIMESTAMP)
     }
     responseCallback(responseStatus)
   }
 }

追加动静到当地log中

/**
  * Append the messages to the local replica logs
  */
 private def appendToLocalLog(internalTopicsAllowed: Boolean,
                              isFromClient: Boolean,
                              entriesPerPartition: Map[TopicPartition, MemoryRecords],
                              requiredAcks: Short): Map[TopicPartition, LogAppendResult] = {
   trace("Append [%s] to local log ".format(entriesPerPartition))
   entriesPerPartition.map { case (topicPartition, records) =>
     brokerTopicStats.topicStats(topicPartition.topic).totalProduceRequestRate.mark()
     brokerTopicStats.allTopicsStats.totalProduceRequestRate.mark()
     // reject appending to internal topics if it is not allowed
     if (Topic.isInternal(topicPartition.topic) && !internalTopicsAllowed) {
       (topicPartition, LogAppendResult(
         LogAppendInfo.UnknownLogAppendInfo,
         Some(new InvalidTopicException(s"Cannot append to internal topic ${topicPartition.topic}"))))
     } else {
       try {
         val partitionOpt = getPartition(topicPartition)
         val info = partitionOpt match {
           case Some(partition) =>
             if (partition eq ReplicaManager.OfflinePartition)
               throw new KafkaStorageException(s"Partition $topicPartition is in an offline log directory on broker $localBrokerId")
             partition.appendRecordsToLeader(records, isFromClient, requiredAcks)
           case None => throw new UnknownTopicOrPartitionException("Partition %s doesn't exist on %d"
             .format(topicPartition, localBrokerId))
         }
         val numAppendedMessages =
           if (info.firstOffset == -1L || info.lastOffset == -1L)
             0
           else
             info.lastOffset - info.firstOffset + 1
         // update stats for successfully appended bytes and messages as bytesInRate and messageInRate
         brokerTopicStats.topicStats(topicPartition.topic).bytesInRate.mark(records.sizeInBytes)
         brokerTopicStats.allTopicsStats.bytesInRate.mark(records.sizeInBytes)
         brokerTopicStats.topicStats(topicPartition.topic).messagesInRate.mark(numAppendedMessages)
         brokerTopicStats.allTopicsStats.messagesInRate.mark(numAppendedMessages)
         trace("%d bytes written to log %s-%d beginning at offset %d and ending at offset %d"
           .format(records.sizeInBytes, topicPartition.topic, topicPartition.partition, info.firstOffset, info.lastOffset))
         (topicPartition, LogAppendResult(info))
       } catch {
         // NOTE: Failed produce requests metric is not incremented for known exceptions
         // it is supposed to indicate un-expected failures of a broker in handling a produce request
         case e@ (_: UnknownTopicOrPartitionException |
                  _: NotLeaderForPartitionException |
                  _: RecordTooLargeException |
                  _: RecordBatchTooLargeException |
                  _: CorruptRecordException |
                  _: KafkaStorageException |
                  _: InvalidTimestampException) =>
           (topicPartition, LogAppendResult(LogAppendInfo.UnknownLogAppendInfo, Some(e)))
         case t: Throwable =>
           brokerTopicStats.topicStats(topicPartition.topic).failedProduceRequestRate.mark()
           brokerTopicStats.allTopicsStats.failedProduceRequestRate.mark()
           error("Error processing append operation on partition %s".format(topicPartition), t)
           (topicPartition, LogAppendResult(LogAppendInfo.UnknownLogAppendInfo, Some(t)))
       }
     }
   }
 }