发送器Sender类型的的sendProducerData 模版方法


  • 元数据相关:
    • 集群信息缓存查询
    • 主题分区缓存查询
    • 无leader主题分区更新元数据
  • 批处理数据相关:
    • 遍历准备发送数据的节点
    • 批处理记录的创建
    • 所有请求添加到飞行窗口
  • 批处理关联网络通道
    • 遍历批处理batches列表转换为内存记录列表
    • 将本批次的内存记录列表数据放入飞行集合中
    • 请求头信息放入对应IO通道KafkaChannel

下面是发送器Sender类型的的sendProducerData 模版方法源码直接看的话会比较枯燥可以结合代码大纲进行了解,主要分为三大部分:元数据处理批处理记录与飞行窗口处理批处理存放到对应网络通道

private long sendProducerData(long now) {
    Cluster cluster = metadata.fetch();
    // get the list of partitions with data ready to send
    RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(cluster, now);

    // if there are any partitions whose leaders are not known yet, force metadata update
    if (!result.unknownLeaderTopics.isEmpty()) {
        // The set of topics with unknown leader contains topics with leader election pending as well as
        // topics which may have expired. Add the topic again to metadata to ensure it is included
        // and request metadata update, since there are messages to send to the topic.
        for (String topic : result.unknownLeaderTopics)

        log.debug("Requesting metadata update due to unknown leader topics from the batched records: {}",

    // remove any nodes we aren't ready to send to
  //遍历准备发送数据的节点集合Set<Node> readyNodes
    Iterator<Node> iter = result.readyNodes.iterator();
    long notReadyTimeout = Long.MAX_VALUE;
    while (iter.hasNext()) {
        Node node =;
        if (!this.client.ready(node, now)) {
            notReadyTimeout = Math.min(notReadyTimeout, this.client.pollDelayMs(node, now));
  // create produce requests
    Map<Integer, List<ProducerBatch>> batches = this.accumulator.drain(cluster, result.readyNodes, this.maxRequestSize, now);
  //飞行集合: Map<TopicPartition, List<ProducerBatch>> inFlightBatches
    if (guaranteeMessageOrder) {
        // Mute all the partitions drained
        for (List<ProducerBatch> batchList : batches.values()) {
            for (ProducerBatch batch : batchList)

  //遍历inFlightBatches集合 查看消息是否超过了deliveryTimeoutMs
    List<ProducerBatch> expiredInflightBatches = getExpiredInflightBatches(now);
  //遍历原始的批处理集合batches: ConcurrentMap<TopicPartition, Deque<ProducerBatch>>查看消息是否超过了deliveryTimeoutMs
    List<ProducerBatch> expiredBatches = this.accumulator.expiredBatches(now);

    // Reset the producer id if an expired batch has previously been sent to the broker. Also update the metrics
    // for expired batches. see the documentation of @TransactionState.resetProducerId to understand why
    // we need to reset the producer id here.
    if (!expiredBatches.isEmpty())
        log.trace("Expired {} batches in accumulator", expiredBatches.size());
    for (ProducerBatch expiredBatch : expiredBatches) {
        String errorMessage = "Expiring " + expiredBatch.recordCount + " record(s) for " + expiredBatch.topicPartition
            + ":" + (now - expiredBatch.createdMs) + " ms has passed since batch creation";
        failBatch(expiredBatch, -1, NO_TIMESTAMP, new TimeoutException(errorMessage), false);
        if (transactionManager != null && expiredBatch.inRetry()) {
            // This ensures that no new batches are drained until the current in flight batches are fully resolved.

    // If we have any nodes that are ready to send + have sendable data, poll with 0 timeout so this can immediately
    // loop and try sending more data. Otherwise, the timeout will be the smaller value between next batch expiry
    // time, and the delay time for checking data availability. Note that the nodes may have data that isn't yet
    // sendable due to lingering, backing off, etc. This specifically does not include nodes with sendable data
    // that aren't ready to send since they would cause busy looping.
    long pollTimeout = Math.min(result.nextReadyCheckDelayMs, notReadyTimeout);
    pollTimeout = Math.min(pollTimeout, this.accumulator.nextExpiryTimeMs() - now);
    pollTimeout = Math.max(pollTimeout, 0);
    if (!result.readyNodes.isEmpty()) {
        log.trace("Nodes with data ready to send: {}", result.readyNodes);
        // if some partitions are already ready to be sent, the select time would be 0;
        // otherwise if some partition already has some data accumulated but not ready yet,
        // the select time will be the time difference between now and its linger expiry time;
        // otherwise the select time will be the time difference between now and the metadata expiry time;
        pollTimeout = 0;
    sendProduceRequests(batches, now);
    return pollTimeout;




private void sendProduceRequests(Map<Integer, List<ProducerBatch>> collated, long now) {
    for (Map.Entry<Integer, List<ProducerBatch>> entry : collated.entrySet())
        sendProduceRequest(now, entry.getKey(), acks, requestTimeoutMs, entry.getValue());



private void sendProduceRequest(long now, int destination, short acks, int timeout, List<ProducerBatch> batches) {
    if (batches.isEmpty())
		//内存记录内部通过字节缓冲区ByteBuffer来对记录进行存储(ByteBuffer有两种一种是堆字节缓冲,直接是直接内存字节缓冲区) 这里内存记录使用堆缓冲区HeapByteBuffer
    Map<TopicPartition, MemoryRecords> produceRecordsByPartition = new HashMap<>(batches.size());
    final Map<TopicPartition, ProducerBatch> recordsByPartition = new HashMap<>(batches.size());

    // find the minimum magic version used when creating the record sets
    byte minUsedMagic = apiVersions.maxUsableProduceMagic();
    for (ProducerBatch batch : batches) {
        if (batch.magic() < minUsedMagic)
            minUsedMagic = batch.magic();
    for (ProducerBatch batch : batches) {
        TopicPartition tp = batch.topicPartition;
        MemoryRecords records = batch.records();

        // down convert if necessary to the minimum magic used. In general, there can be a delay between the time
        // that the producer starts building the batch and the time that we send the request, and we may have
        // chosen the message format based on out-dated metadata. In the worst case, we optimistically chose to use
        // the new message format, but found that the broker didn't support it, so we need to down-convert on the
        // client before sending. This is intended to handle edge cases around cluster upgrades where brokers may
        // not all support the same message format version. For example, if a partition migrates from a broker
        // which is supporting the new magic version to one which doesn't, then we will need to convert.
        if (!records.hasMatchingMagic(minUsedMagic))
            records = batch.records().downConvert(minUsedMagic, 0, time).records();
       //主题分区-内存记录映射 key:主题分区,value: 内存记录
        produceRecordsByPartition.put(tp, records);
       //主题分区-批处理映射  key:主题分区,value: 生产者批处理
        recordsByPartition.put(tp, batch);

    String transactionalId = null;
    if (transactionManager != null && transactionManager.isTransactional()) {
        transactionalId = transactionManager.transactionalId();
  //请求构建器 用于创建生产者请求对象ProduceRequest
    ProduceRequest.Builder requestBuilder = ProduceRequest.Builder.forMagic(minUsedMagic, acks, timeout,
            produceRecordsByPartition, transactionalId);
    RequestCompletionHandler callback = new RequestCompletionHandler() {
        public void onComplete(ClientResponse response) {
            handleProduceResponse(response, recordsByPartition, time.milliseconds());
  //客户端请求对象创建 ClientRequest类型对象
    String nodeId = Integer.toString(destination);
    ClientRequest clientRequest = client.newClientRequest(nodeId, requestBuilder, now, acks != 0,
            requestTimeoutMs, callback);
    client.send(clientRequest, now);
    log.trace("Sent produce request to {}: {}", nodeId, requestBuilder);

KafkaClient 是一个封装客户端的一个接口,仅有的实现类型为NetworkClient


public void send(ClientRequest request, long now) {
    doSend(request, false, now);



private void doSend(ClientRequest clientRequest, boolean isInternalRequest, long now) {
    String nodeId = clientRequest.destination();
    AbstractRequest.Builder<?> builder = clientRequest.requestBuilder();
    try {
        //版本兼容处理 ...
        // The call to build may also throw UnsupportedVersionException, if there are essential
        // fields that cannot be represented in the chosen version.
        doSend(clientRequest, isInternalRequest, now,;
    } catch (UnsupportedVersionException unsupportedVersionException) {
        // If the version is not supported, skip sending the request over the wire.


private void doSend(ClientRequest clientRequest, boolean isInternalRequest, long now, AbstractRequest request) {
    String destination = clientRequest.destination();
    RequestHeader header = clientRequest.makeHeader(request.version());
    //目的地和请求头转发送对象 序列化为NetworkSend类型的网络发送实体对象
    Send send = request.toSend(destination, header);
    InFlightRequest inFlightRequest = new InFlightRequest(
   //内部的飞行集合为requests对象类型为ap<String, Deque<NetworkClient.InFlightRequest>> key为destination也就是节点id,值

kafka Selector类型的的send方法队列给定的请求

public void send(Send send) {
    String connectionId = send.destination();
   //获取当前连接id的通道 Selector中的channels中缓存 每个连接id都会对应一个KafkaChannel
    KafkaChannel channel = openOrClosingChannelOrFail(connectionId);
    if (closingChannels.containsKey(connectionId)) {
        // ensure notification via `disconnected`, leave channel in the state in which closing was triggered
    } else {
        try {
        } catch (Exception e) {


public void setSend(Send send) {
    if (this.send != null)
        throw new IllegalStateException("Attempt to begin a send operation with prior send operation still in progress, connection id is " + id);
    this.send = send;



public void addInterestOps(int ops) {
    key.interestOps(key.interestOps() | ops);




  • 开始调用sendProducerData方法
  • 集群信息查询:MetadataCache中获取集群元数据信息Cluster
  • 主题分区查询:获取准备发送数据的分区列表,记录累加器中获取准备准备发送数据的主题分区
    • 查询遍历缓存的主题分区与批处理队列集合ConcurrentMap<TopicPartition, Deque> batches
      • 从主题分区信息中获取leader节点信息:Node leader
      • 根据批处理队列Deque 中的第一个元素判断是否需要发送如果需要发送则将leader节点对象添加到准备发送节点集合Set readyNodes中然后将其返回
  • 无leader主题分区更新元数据:如果存在主题分区对应的leader为空则强制更新无leader的主题分区对应的元数据
  • 遍历准备发送数据的节点:集合Set readyNodes
    • 连接移除或者无法正常建立连接的节点则直接移除
    • 如果连接状态建立正常则直接返回true继续
    • 如果还未建立连接状态则执行initiateConnect方法设置连接IO事件SelectionKey.OP_CONNECT
      • 调用Selectable selector的connect方法
        • 配置SocketChannel
          • configureBlocking(false)
          • socket.setKeepAlive(true);
          • socket.setSendBufferSize(sendBufferSize);
          • socket.setReceiveBufferSize(receiveBufferSize);
          • socket.setTcpNoDelay(true);
        • channel.connect(address);
        • registerChannel(id, socketChannel, SelectionKey.OP_CONNECT);
          • socketChannel.register(nioSelector, interestedOps); 这里仅仅执行了事件的添加并未执行网络IO
            • interestedOps存放在SelectorImpl类型的INTERESTOPS对象中
        • ChannelBuilder进行调用buildChannel方法进行构建KafkaChannel
          • 创建PlaintextTransportLayer类型对象
          • 创建PlaintextAuthenticator类型对象
          • 创建KafkaChannel类型对象
        • 将KafkaChannel对象绑定在SelectionKey对象中
        • 将节点id nodeConnectionId与KafkaChannel存入通道集合Map<String, KafkaChannel> channels
        • IdleExpiryManager 连接过期管理器对当前连接进行管理
          • 将连接的connectionId和连接创建时间存入IdleExpiryManager的LRU集合中Map<String, Long> lruConnections
  • 批处理记录的创建:RecordAccumulator记录累加器调用drain方法进行记录的创建
    • 遍历当前所有准备好的节点集合Set readyNodes;
      • 从元数据中获取当前节点id的分区信息:List parts,然后循环分区数据
        • 开始创建主题分区对象TopicPartition
        • 获取当前分区对应的批处理队列Deque 从集合ConcurrentMap<TopicPartition, Deque> batches中获取
        • 获取第一个ProducerBatch
        • batch.close();
        • 将批处理存放在List ready集合中
        • 最终放到节点id映射批处理集合中进行返回:Map<Integer, List> batches
  • 所有请求添加到飞行窗口:将所有请求添加到飞行窗口addToInflightBatches(batches);
    • 遍历批量处理列表
    • 获取当前分区的飞行窗口批处理列表,如果不存在则创建一个List inflightBatchList
    • 飞行窗口中存放当前分区的批处理列表 Map<TopicPartition, List> inFlightBatches
  • 遍历批处理batches列表:发送的请求处理调用sendProduceRequests方法**开始遍历批处理batches列表 每个节点对应的每个批处理队列Map<Integer, List> batches
    • 处理当前节点的批处理队列 List batches
    • 遍历当前节点的批处理队列 List batches
      • 处理当前批处理消息ProducerBatch batch
      • KafkaClient创建请求对象ClientRequest
      • KafkaClient调用send方法发送ClientRequest
      • 调用NetworkClient的doSend执行发送逻辑
        • 根据版本、ACK、和分区内存数据等变量创建ProduceRequest对象
        • 创建请求头RequestHeader
        • 将节点id destination变量和请求头RequestHeader对象封装为NetworkSend对象
        • 再将数据封装为InFlightRequest inFlightRequest对象
        • 将其添加到InFlightRequests inFlightRequests集合中
        • 调用Selectable的send方法
          • 获取当前连接的KafkaChannel
          • 将待发送的任务NetworkSend对象存放在KafkaChannel通道中
          • TransportLayer触发SelectionKey的interestOps设置OP_WRITE
  • 结束