Original source: 刘正阳

Producer

Producer is the interface that defines a producer.
Its commonly used methods are:

public Future<RecordMetadata> send(ProducerRecord<K, V> record);
public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback);
public void flush();
public void close();

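KafkaProducer is asynchronous. When send is called, the record is not transmitted to the broker right away: it is placed in a buffer pool and send returns immediately, while a background I/O thread is responsible for turning the buffered records into requests and sending them to the Kafka cluster.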
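
To make the asynchronous behavior concrete, here is a toy model in plain Java. It is not Kafka code; the class AsyncProducerModel and its methods are hypothetical. send only enqueues the record and returns a CompletableFuture, while a single daemon thread plays the role of the I/O thread: it "sends" each record and completes its future with a fake offset. close() lets the loop drain the remaining records before the thread exits, loosely mirroring how KafkaProducer.close() waits for records that were already handed to send.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/** Toy model (hypothetical, not Kafka code): send() enqueues, a daemon I/O thread "sends". */
public class AsyncProducerModel {
    private static final class Pending {
        final String record;
        final CompletableFuture<Long> future = new CompletableFuture<>();
        Pending(String record) { this.record = record; }
    }

    private final BlockingQueue<Pending> buffer = new LinkedBlockingQueue<>();
    private final List<String> sent = new ArrayList<>(); // stands in for the broker's log
    private volatile boolean running = true;
    private final Thread ioThread;

    public AsyncProducerModel() {
        ioThread = new Thread(this::runLoop, "toy-producer-io");
        ioThread.setDaemon(true); // background thread, like the producer's I/O thread
        ioThread.start();
    }

    /** Returns immediately; the future completes once the I/O thread has "sent" the record. */
    public CompletableFuture<Long> send(String record) {
        Pending p = new Pending(record);
        buffer.add(p);
        return p.future;
    }

    private void runLoop() {
        // keep going until close() was called AND the buffer has been drained
        while (running || !buffer.isEmpty()) {
            try {
                Pending p = buffer.poll(10, TimeUnit.MILLISECONDS);
                if (p == null) continue;
                long offset;
                synchronized (sent) {
                    sent.add(p.record);
                    offset = sent.size() - 1; // fake offset "assigned by the broker"
                }
                p.future.complete(offset);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
        }
    }

    /** Stops the loop after pending records are drained and waits for the I/O thread. */
    public void close() {
        running = false;
        try {
            ioThread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public List<String> sentRecords() {
        synchronized (sent) { return new ArrayList<>(sent); }
    }
}
```

Because the caller gets a future back immediately, it can either block on it (similar to calling get() on the Future returned by KafkaProducer.send) or attach a continuation, which is roughly the role the Callback overload plays.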

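The size of each buffer is specified with batch.size; the producer maintains a buffer of unsent records for every partition.
By default, a buffer may be sent even when it is not full. linger.ms (default 0) sets how long the producer waits for more records before sending, which reduces the number of requests; this is the same idea as Nagle's algorithm in TCP.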
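
The interaction between batch.size and linger.ms can be sketched as a per-partition queue of batches. This is a simplified model under our own assumptions (sizes counted as raw record bytes, an explicit nowMs clock, hypothetical class names), not Kafka's RecordAccumulator: a record goes into the newest batch of its partition if it fits, otherwise a new batch is opened, and a partition becomes ready to send when its oldest batch is full or has waited at least lingerMs.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy per-partition batching (hypothetical, not Kafka's RecordAccumulator). */
public class ToyAccumulator {
    static final class Batch {
        final long createdMs;
        int sizeBytes;
        final List<byte[]> records = new ArrayList<>();
        Batch(long createdMs) { this.createdMs = createdMs; }
    }

    private final int batchSize;  // plays the role of batch.size (bytes)
    private final long lingerMs;  // plays the role of linger.ms
    private final Map<Integer, Deque<Batch>> batches = new HashMap<>();

    public ToyAccumulator(int batchSize, long lingerMs) {
        this.batchSize = batchSize;
        this.lingerMs = lingerMs;
    }

    /** Appends to the partition's newest batch, opening a new batch if the record would overflow it. */
    public void append(int partition, byte[] record, long nowMs) {
        Deque<Batch> dq = batches.computeIfAbsent(partition, p -> new ArrayDeque<>());
        Batch last = dq.peekLast();
        if (last == null || last.sizeBytes + record.length > batchSize) {
            last = new Batch(nowMs);
            dq.addLast(last);
        }
        last.records.add(record);
        last.sizeBytes += record.length;
    }

    /** Partitions whose oldest batch is full (or has another batch queued behind it) or has lingered long enough. */
    public List<Integer> readyPartitions(long nowMs) {
        List<Integer> ready = new ArrayList<>();
        for (Map.Entry<Integer, Deque<Batch>> e : batches.entrySet()) {
            Batch first = e.getValue().peekFirst();
            if (first == null) continue;
            boolean full = e.getValue().size() > 1 || first.sizeBytes >= batchSize;
            boolean lingered = nowMs - first.createdMs >= lingerMs;
            if (full || lingered) ready.add(e.getKey());
        }
        return ready;
    }
}
```

Treating a batch as full as soon as another batch is queued behind it is our reading of the ready check in RecordAccumulator; verify it against the Kafka version you are studying.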
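The total memory the producer may use for buffering is controlled by buffer.memory. If records are produced faster than they can be sent and this limit is exceeded, send blocks; the maximum blocking time is set by max.block.ms, and once it expires a TimeoutException is thrown.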
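
A rough way to picture buffer.memory and max.block.ms is a counting semaphore of free bytes: an allocation waits for memory released by completed sends and gives up after the blocking limit. This sketch is hypothetical (Kafka's BufferPool uses its own pooling and locking), and it throws java.util.concurrent.TimeoutException, whereas the producer throws Kafka's own org.apache.kafka.common.errors.TimeoutException.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Toy bounded buffer pool (hypothetical): allocation blocks up to maxBlockMs, then fails. */
public class ToyBufferPool {
    private final Semaphore freeBytes;
    private final long maxBlockMs;

    public ToyBufferPool(int totalMemory, long maxBlockMs) {
        this.freeBytes = new Semaphore(totalMemory); // plays the role of buffer.memory
        this.maxBlockMs = maxBlockMs;                // plays the role of max.block.ms
    }

    /** Waits at most maxBlockMs for `size` bytes; on timeout nothing is reserved. */
    public void allocate(int size) throws InterruptedException, TimeoutException {
        if (!freeBytes.tryAcquire(size, maxBlockMs, TimeUnit.MILLISECONDS)) {
            throw new TimeoutException("Failed to allocate " + size + " bytes within " + maxBlockMs + " ms");
        }
    }

    /** Called once a batch has been sent, so its memory can be reused. */
    public void deallocate(int size) {
        freeBytes.release(size);
    }

    public int availableMemory() {
        return freeBytes.availablePermits();
    }
}
```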
key.serializer and value.serializer control how Java objects are converted into byte arrays before being sent to the Kafka cluster.
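
A serializer is essentially a function from an object to bytes. The sketch below defines a local interface shaped like Kafka's Serializer#serialize(String topic, T data) so that it compiles without the Kafka client; the two implementations (UTF-8 strings and big-endian 4-byte integers) and their names are illustrative, not Kafka's classes.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

/** Hypothetical stand-ins that only mirror the shape of Kafka's Serializer. */
public class ToySerializers {
    public interface Serializer<T> {
        byte[] serialize(String topic, T data);
    }

    /** UTF-8 encoding of a String; null stays null (e.g. a record without a key). */
    public static final class Utf8StringSerializer implements Serializer<String> {
        @Override
        public byte[] serialize(String topic, String data) {
            return data == null ? null : data.getBytes(StandardCharsets.UTF_8);
        }
    }

    /** Big-endian 4-byte encoding of an Integer (ByteBuffer's default byte order). */
    public static final class BigEndianIntSerializer implements Serializer<Integer> {
        @Override
        public byte[] serialize(String topic, Integer data) {
            return data == null ? null : ByteBuffer.allocate(4).putInt(data).array();
        }
    }
}
```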
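acks controls when the producer considers a write successful, i.e. how many acknowledgments the leader must have received. With acks=0 the producer considers the record sent as soon as it has been placed in the socket buffer; with acks=1 the leader only has to write the record to its local log and does not wait for acknowledgments from followers; with acks=all every in-sync replica must acknowledge the record before the send counts as successful, so the record is not lost as long as at least one in-sync replica stays alive.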
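
The three acks settings differ only in which event the producer waits for. The predicate below models that decision with hypothetical inputs (whether the record reached the socket buffer, whether the leader wrote it, and which in-sync replicas acknowledged it); it is a sketch that ignores retries and timeouts.

```java
import java.util.Set;

/** Toy model (hypothetical): when does a send count as successful for each acks setting? */
public class AcksModel {
    public enum Acks { ZERO, ONE, ALL }

    /**
     * @param inSocketBuffer the record has been handed to the client's socket buffer
     * @param leaderWrote    the leader has appended the record to its local log
     * @param isr            current in-sync replicas, including the leader
     * @param acked          replicas that have written the record
     */
    public static boolean isSuccessful(Acks acks, boolean inSocketBuffer, boolean leaderWrote,
                                       Set<String> isr, Set<String> acked) {
        switch (acks) {
            case ZERO: return inSocketBuffer;                       // no broker response awaited
            case ONE:  return leaderWrote;                          // followers not waited for
            case ALL:  return leaderWrote && acked.containsAll(isr); // every in-sync replica has it
            default:   throw new IllegalArgumentException("unknown acks: " + acks);
        }
    }
}
```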

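The Partitioner decides which partition each record is written to. When particular keys need to go to particular partitions, you can plug in your own Partitioner implementation.
The default Partitioner does not simply pick partitions at random. As the code below shows, records without a key are spread round-robin over the available partitions (starting from a random counter value), while records with a key are assigned by hashing the serialized key with murmur2, so the same key always maps to the same partition as long as the number of partitions does not change.
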
// topicCounterMap is a field of DefaultPartitioner:
private final ConcurrentMap<String, AtomicInteger> topicCounterMap = new ConcurrentHashMap<>();

public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
    List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
    int numPartitions = partitions.size();
    if (keyBytes == null) {
        // no key: round-robin over the partitions that currently have a leader
        int nextValue = nextValue(topic);
        List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
        if (!availablePartitions.isEmpty()) {
            int part = Utils.toPositive(nextValue) % availablePartitions.size();
            return availablePartitions.get(part).partition();
        } else {
            // no partitions are available, give a non-available partition
            return Utils.toPositive(nextValue) % numPartitions;
        }
    } else {
        // hash the keyBytes to choose a partition
        return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
    }
}

// per-topic counter, created lazily with a random start value
private int nextValue(String topic) {
    AtomicInteger counter = topicCounterMap.get(topic);
    if (null == counter) {
        counter = new AtomicInteger(ThreadLocalRandom.current().nextInt());
        // another thread may have created the counter concurrently; keep the winner
        AtomicInteger currentCounter = topicCounterMap.putIfAbsent(topic, counter);
        if (currentCounter != null) {
            counter = currentCounter;
        }
    }
    return counter.getAndIncrement();
}
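
When some keys must land on fixed partitions, the custom logic can be as small as the sketch below. It is a standalone class with a hypothetical pinned-key map and does not implement Kafka's Partitioner interface (whose partition method also receives the serialized key and value and a Cluster). Unpinned keys fall back to Arrays.hashCode over the UTF-8 key bytes instead of murmur2, so they will generally not land on the partitions the default partitioner would choose, and the round-robin counter starts at 0 rather than at a random value.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical custom partitioning rule; not an implementation of Kafka's Partitioner. */
public class PinnedKeyPartitioner {
    private final Map<String, Integer> pinned; // key -> fixed partition (hypothetical config)
    private final AtomicInteger counter = new AtomicInteger(); // starts at 0, unlike the default

    public PinnedKeyPartitioner(Map<String, Integer> pinned) {
        this.pinned = pinned;
    }

    public int partition(String key, int numPartitions) {
        if (key == null) {
            // keyless records: simple round-robin
            return toPositive(counter.getAndIncrement()) % numPartitions;
        }
        Integer fixed = pinned.get(key);
        if (fixed != null && fixed < numPartitions) {
            return fixed; // pinned key goes to its configured partition
        }
        // stand-in hash; the default partitioner uses murmur2 over the serialized key
        byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
        return toPositive(Arrays.hashCode(keyBytes)) % numPartitions;
    }

    // clear the sign bit rather than using Math.abs, which stays negative for Integer.MIN_VALUE
    private static int toPositive(int n) {
        return n & 0x7fffffff;
    }
}
```

Masking the sign bit is the same trick Kafka's Utils.toPositive uses; Math.abs(Integer.MIN_VALUE) is still negative and would yield a negative partition index.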

ProducerRecord

ProducerRecord holds everything that has to be sent to the broker:

class ProducerRecord<K, V> {
    private final String topic;
    private final Integer partition;
    private final Headers headers;
    private final K key;
    private final V value;
    private final Long timestamp;
}
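
Two of these fields are nullable with a special meaning. As far as we can tell from KafkaProducer in this version, an explicit partition bypasses the Partitioner, and a null timestamp is replaced with the producer's current time; treat that as our reading rather than a specification. The simplified ToyRecord below (a hypothetical class, headers omitted) models just those two rules.

```java
import java.util.function.ToIntFunction;

/** Hypothetical, simplified stand-in for ProducerRecord (headers omitted). */
public class ToyRecord<K, V> {
    final String topic;
    final Integer partition; // null: let the partitioner decide
    final K key;             // may be null
    final V value;
    final Long timestamp;    // null: use the producer's current time

    public ToyRecord(String topic, Integer partition, K key, V value, Long timestamp) {
        this.topic = topic;
        this.partition = partition;
        this.key = key;
        this.value = value;
        this.timestamp = timestamp;
    }

    /** An explicit partition wins; otherwise ask the (hypothetical) partitioner function. */
    public int resolvePartition(ToIntFunction<ToyRecord<K, V>> partitioner) {
        return partition != null ? partition : partitioner.applyAsInt(this);
    }

    /** An explicit timestamp wins; otherwise use the supplied current time. */
    public long resolveTimestamp(long nowMs) {
        return timestamp != null ? timestamp : nowMs;
    }
}
```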

How KafkaProducer is constructed

// create the partitioner
this.partitioner = config.getConfiguredInstance(ProducerConfig.PARTITIONER_CLASS_CONFIG, Partitioner.class);
long retryBackoffMs = config.getLong(ProducerConfig.RETRY_BACKOFF_MS_CONFIG);
// set up the key and value serializers
if (keySerializer == null) {
    this.keySerializer = ensureExtended(config.getConfiguredInstance(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
            Serializer.class));
    this.keySerializer.configure(config.originals(), true);
} else {
    config.ignore(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG);
    this.keySerializer = ensureExtended(keySerializer);
}
if (valueSerializer == null) {
    this.valueSerializer = ensureExtended(config.getConfiguredInstance(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
            Serializer.class));
    this.valueSerializer.configure(config.originals(), false);
} else {
    config.ignore(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG);
    this.valueSerializer = ensureExtended(valueSerializer);
}
// load interceptors and make sure they get clientId
userProvidedConfigs.put(ProducerConfig.CLIENT_ID_CONFIG, clientId);
List<ProducerInterceptor<K, V>> interceptorList = (List) (new ProducerConfig(userProvidedConfigs, false)).getConfiguredInstances(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,
        ProducerInterceptor.class);
this.interceptors = interceptorList.isEmpty() ? null : new ProducerInterceptors<>(interceptorList);
ClusterResourceListeners clusterResourceListeners = configureClusterResourceListeners(keySerializer, valueSerializer, interceptorList, reporters);
this.metadata = new Metadata(retryBackoffMs, config.getLong(ProducerConfig.METADATA_MAX_AGE_CONFIG),
        true, true, clusterResourceListeners);
this.maxRequestSize = config.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG);
this.totalMemorySize = config.getLong(ProducerConfig.BUFFER_MEMORY_CONFIG);
this.compressionType = CompressionType.forName(config.getString(ProducerConfig.COMPRESSION_TYPE_CONFIG));
this.maxBlockTimeMs = config.getLong(ProducerConfig.MAX_BLOCK_MS_CONFIG);
this.requestTimeoutMs = config.getInt(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG);
this.transactionManager = configureTransactionState(config);
int retries = configureRetries(config, transactionManager != null);
int maxInflightRequests = configureInflightRequests(config, transactionManager != null);
short acks = configureAcks(config, transactionManager != null);
this.apiVersions = new ApiVersions();
// RecordAccumulator implements the batching (accumulation) and waiting (linger) logic
this.accumulator = new RecordAccumulator(config.getInt(ProducerConfig.BATCH_SIZE_CONFIG),
        this.totalMemorySize,
        this.compressionType,
        config.getLong(ProducerConfig.LINGER_MS_CONFIG),
        retryBackoffMs,
        metrics,
        time,
        apiVersions,
        transactionManager);
List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(config.getList(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG));
this.metadata.update(Cluster.bootstrap(addresses), Collections.<String>emptySet(), time.milliseconds());
ChannelBuilder channelBuilder = ClientUtils.createChannelBuilder(config);
Sensor throttleTimeSensor = Sender.throttleTimeSensor(metrics);
// high-level network client that wraps send, poll and related operations
NetworkClient client = new NetworkClient(
        new Selector(config.getLong(ProducerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG),
                this.metrics, time, "producer", channelBuilder),
        this.metadata,
        clientId,
        maxInflightRequests,
        config.getLong(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG),
        config.getLong(ProducerConfig.RECONNECT_BACKOFF_MAX_MS_CONFIG),
        config.getInt(ProducerConfig.SEND_BUFFER_CONFIG),
        config.getInt(ProducerConfig.RECEIVE_BUFFER_CONFIG),
        this.requestTimeoutMs,
        time,
        true,
        apiVersions,
        throttleTimeSensor);
// Sender: the background task that actually sends requests to the Kafka cluster (run on ioThread below)
this.sender = new Sender(client,
        this.metadata,
        this.accumulator,
        maxInflightRequests == 1,
        config.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG),
        acks,
        retries,
        this.metrics,
        Time.SYSTEM,
        this.requestTimeoutMs,
        config.getLong(ProducerConfig.RETRY_BACKOFF_MS_CONFIG),
        this.transactionManager,
        apiVersions);
String ioThreadName = NETWORK_THREAD_PREFIX + (clientId.length() > 0 ? " | " + clientId : "");
this.ioThread = new KafkaThread(ioThreadName, this.sender, true);
this.ioThread.start();
this.errors = this.metrics.sensor("errors");
config.logUnused();
AppInfoParser.registerAppInfo(JMX_PREFIX, clientId);
log.debug("Kafka producer started");

KafkaProducer#send