配景
我们直接跑到最底层,看看kafka的网络层处理惩罚是怎么处理惩罚的。因为Java的NIO照旧偏底层,不能直接用来做应用开拓,所以一般都利用像netty的框架可能凭据本身的需要封装一些nio,让上层业务不消体贴网络处理惩罚的细节,只需要建设处事监听端口、接管请求、处理惩罚请求、写返回就可以了。我在看netty、thrift等涉及到网络的Java框架时较量喜欢去看他们的nio是怎么封装的,这里也是可以或许浮现作者程度的处所。java nio的根基元素为Selector、Channel、ByteBuffer。
我们从server和client两头别离阐明。
kafka server端在org.apache.kafka.common.network中举办了封装。
就像package.html内里写的。
> The network server for kafka. No application specific code here, just general network server stuff. The classes Receive and Send encapsulate the incoming and outgoing transmission of bytes. A Handler is a mapping between a Receive and a Send, and represents the users hook to add logic for mapping requests to actual processing code. Any uncaught exceptions in the reading or writing of the transmissions will result in the server logging an error and closing the offending socket. As a result it is the duty of the Handler implementation to catch and serialize any application-level errors that should be sent to the client. This slightly lower-level interface that models sending and receiving rather than requests and responses is necessary in order to allow the send or receive to be overridden with a non-user-space writing of bytes using FileChannel.transferTo.
启动进程
网络层的启动在SocketServer.kafka中, 属于KafkaServer启动进程中的一部门
首先看一下server.properties中的网络相关设置
昆山软件定制开拓 持续发送多个应用层的请求" class="aligncenter size-large wp-image-28814" title="multi-reactor" src="/uploads/allimg/c180521/152D4B0C3P-1B08.png" />
# The number of threads that the server uses for receiving requests from the network and sending responses to the network num.network.threads=3 # The number of threads that the server uses for processing requests, which may include disk I/O num.io.threads=8 # The number of queued request allowed before blocking the network threads #queued.max.requests # The send buffer (SO_SNDBUF) used by the socket server socket.send.buffer.bytes=102400 # The receive buffer (SO_RCVBUF) used by the socket server socket.receive.buffer.bytes=102400 # The maximum size of a request that the socket server will accept (protection against OOM) socket.request.max.bytes=104857600
SocketServer
这个类上的注释叙述了kafka server的io线程模子
这个类上的注释叙述了kafka server的io线程模子 /** * An NIO socket server. The threading model is * 1 Acceptor thread that handles new connections * Acceptor has N Processor threads that each have their own selector and read requests from sockets * M Handler threads that handle requests and produce responses back to the processor threads for writing. */
一共三种线程。一个Acceptor线程认真处理惩罚新毗连请求,会有N个Processor线程,每个都有本身的Selector,认真从socket中读取请求和将返回功效写回。然后会有M个Handler线程,认真处理惩罚请求,而且将功效返回给Processor。
将Acceptor和Processor线程分隔的目标是为了制止读写频繁影响新毗连的吸收。
SocketServer初始化
SockerServer建设的时候通过server.properties和默认的设置中获取设置,如numNetworkThread(num.network.threads,也就是线程模子中的N)、
建设processor数组、acceptorMap(因为大概会在多个Endpoint吸收请求)、memoryPool(SimpleMemoryPool里主要做的工作是统计监控ByteBuffer的利用)、requestChanne等 。
private val endpoints = config.listeners.map(l => l.listenerName -> l).toMap private val numProcessorThreads = config.numNetworkThreads private val maxQueuedRequests = config.queuedMaxRequests private val totalProcessorThreads = numProcessorThreads * endpoints.size private val maxConnectionsPerIp = config.maxConnectionsPerIp private val maxConnectionsPerIpOverrides = config.maxConnectionsPerIpOverrides this.logIdent = "[Socket Server on Broker " + config.brokerId + "], " private val memoryPoolSensor = metrics.sensor("MemoryPoolUtilization") private val memoryPoolDepletedPercentMetricName = metrics.metricName("MemoryPoolAvgDepletedPercent", "socket-server-metrics") memoryPoolSensor.add(memoryPoolDepletedPercentMetricName, new Rate(TimeUnit.MILLISECONDS)) private val memoryPool = if (config.queuedMaxBytes > 0) new SimpleMemoryPool(config.queuedMaxBytes, config.socketRequestMaxBytes, false, memoryPoolSensor) else MemoryPool.NONE val requestChannel = new RequestChannel(totalProcessorThreads, maxQueuedRequests) private val processors = new Array[Processor](totalProcessorThreads) private[network] val acceptors = mutable.Map[EndPoint, Acceptor]() private var connectionQuotas: ConnectionQuotas = _