How KafkaProducer moves a record into the send batch queue
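- Start
- Wrap the record's topic, key, and message value in a ProducerRecord object
- Call KafkaProducer's send method
- Run the onSend method of the ProducerInterceptors interceptors
- Call KafkaProducer's doSend method
- If the producer's sender has already been closed, throw an exception and stop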
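- Look up the Cluster information in the local MetadataCache (the waitOnMetadata method)
  - Filter invalid topics: fail if the topic appears in the cluster's invalidTopics set
  - Read the topic's partition count from the locally cached Cluster; on the first send to a topic nothing is cached yet, so no partition information is found
  - **Update metadata:** call ProducerMetadata's requestUpdate, which sets the flag needUpdate = true and returns the current update version; the producer then waits for network I/O to refresh the metadata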
  - Wake up I/O to run the metadata request: call the Sender object's wakeup() method, which delegates down this chain:
    - NetworkClient's wakeup()
    - Selectable's wakeup()
    - wakeup() on the java.nio.channels.Selector nioSelector
    - on macOS, KQueueSelectorImpl's wakeup()
    - IOUtil.write1(fd1, (byte)0), a native method
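  - Wait for the result of the metadata request: call ProducerMetadata's awaitUpdate, which blocks in Object.wait() for up to max.block.ms (default 60 seconds) while the I/O thread fetches the metadata
    - I/O thread: Metadata's handleMetadataResponse method processes the metadata response and stores the result in the local cache
    - I/O thread: call notifyAll() on the producer metadata object to wake up the sending thread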
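- Serialize the key: call keySerializer's serialize method
- **Serialize the value:** call valueSerializer's serialize method
- Choose the partition: if the record specifies a partition, use it; otherwise call the Partitioner's partition method. For keyed records the default partitioner hashes the serialized key (murmur2) and takes the result modulo the topic's partition count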
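- Validate the record size
  - The size is an upper-bound estimate (AbstractRecords.estimateSizeInBytesUpperBound) based on the message format version (magic), the key size, the value size, and the headers, plus record and batch overhead
  - The estimated size must not exceed the request size limit, max.request.size, or a RecordTooLargeException is thrown
  - The estimated size must not exceed the total buffer memory, buffer.memory, or a RecordTooLargeException is thrown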
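- Append the data to the RecordAccumulator, which combines a memory pool with in-memory queues for batched sending
  - Get or create the batch queue (Deque) for the current TopicPartition
  - The partition-to-queue mapping is stored in the accumulator's member variable `ConcurrentMap<TopicPartition, Deque<ProducerBatch>> batches`
  - Memory-pool allocation, when the last batch in the queue has no room for the record: call allocate on the BufferPool object free (the memory is a ByteBuffer)
  - Add the record to the batch queue: create a ProducerBatch object
  - Write the data into the ByteBuffer through a ByteBufferOutputStream
  - Wrap the result in a RecordAccumulator.RecordAppendResult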
- Wake up network I/O: if the batch is full or a new batch was just created, call wakeup along this chain:
  - Sender's wakeup
  - KafkaClient's wakeup
  - Selectable's wakeup
  - java.nio.channels.Selector's wakeup
  - on macOS, KQueueSelectorImpl's wakeup()
  - IOUtil.write1(fd1, (byte)0);
- Return a Future; the caller can block on it to wait for the result synchronously
- End
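The metadata wait in the waitOnMetadata steps above is a monitor handshake between two threads. The sending thread records the current version and blocks in wait(). The I/O thread caches the response, bumps the version, and calls notifyAll(). Below is a minimal sketch of that pattern in plain Java, not Kafka's code. SketchMetadata, MetadataDemo, and the single cached partitionCount field are hypothetical simplifications of ProducerMetadata, and a plain thread with a 50 ms sleep stands in for the network I/O thread.

```java
import java.util.concurrent.TimeoutException;

/**
 * Hypothetical stand-in for ProducerMetadata: one cached value plus the
 * needUpdate flag and version counter used for the wait/notify handshake.
 */
class SketchMetadata {
    private boolean needUpdate = false;
    private int version = 0;
    private Integer partitionCount = null; // empty until the first response arrives

    /** Sending thread: raise the flag and return the version to wait past. */
    synchronized int requestUpdate() {
        needUpdate = true;
        return version;
    }

    synchronized boolean updateRequested() {
        return needUpdate;
    }

    synchronized Integer partitionCount() {
        return partitionCount;
    }

    /** I/O thread: cache the response, bump the version, wake all waiting senders. */
    synchronized void update(int newPartitionCount) {
        partitionCount = newPartitionCount;
        needUpdate = false;
        version++;
        notifyAll();
    }

    /** Sending thread: wait until the version moves past lastVersion, or time out. */
    synchronized void awaitUpdate(int lastVersion, long maxWaitMs)
            throws InterruptedException, TimeoutException {
        long deadline = System.currentTimeMillis() + maxWaitMs;
        while (version <= lastVersion) {
            long remaining = deadline - System.currentTimeMillis();
            if (remaining <= 0)
                throw new TimeoutException("metadata not updated within " + maxWaitMs + " ms");
            wait(remaining); // releases the monitor so update() can run
        }
    }
}

class MetadataDemo {
    /** One sender waits while a separate thread, playing the I/O thread, answers after 50 ms. */
    static Integer fetchPartitionCount(SketchMetadata metadata, int countFromBroker, long maxBlockMs) {
        int lastVersion = metadata.requestUpdate();
        Thread io = new Thread(() -> {
            try {
                Thread.sleep(50); // simulated network round trip
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
            metadata.update(countFromBroker);
        });
        io.start();
        try {
            metadata.awaitUpdate(lastVersion, maxBlockMs);
            return metadata.partitionCount();
        } catch (TimeoutException e) {
            return null; // corresponds to max.block.ms expiring
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }
}
```

The loop around wait() matters for two reasons. First, wait() can return spuriously. Second, the remaining time is recomputed on each pass, so the total block never exceeds maxWaitMs, which plays the role of max.block.ms here.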
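Both wakeup chains in the outline end in java.nio.channels.Selector.wakeup(). On macOS the JDK's KQueueSelectorImpl implements this by writing a byte to an internal pipe (the IOUtil.write1 call), whose read end the selector is watching. The sketch below uses only the public java.nio API, so the platform-specific part stays hidden behind Selector.open(). It shows the effect: a thread blocked in select() with a 60-second timeout returns as soon as another thread calls wakeup(). WakeupDemo and its methods are hypothetical names.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.Selector;

class WakeupDemo {
    /**
     * A helper thread (playing the network I/O thread) blocks in select() with a
     * 60-second timeout; the calling thread (playing the sending thread) calls
     * wakeup() after delayMs. Returns how long the helper thread stayed blocked.
     */
    static long blockedMillis(long delayMs) {
        try (Selector selector = Selector.open()) {
            long[] blocked = new long[1];
            Thread ioThread = new Thread(() -> {
                long start = System.nanoTime();
                try {
                    // No channels registered and no interrupt: only wakeup() ends this before the timeout.
                    selector.select(60_000);
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
                blocked[0] = (System.nanoTime() - start) / 1_000_000;
            });
            ioThread.start();
            Thread.sleep(delayMs);
            selector.wakeup(); // the call Sender.wakeup() ultimately reaches
            ioThread.join();   // join() also makes blocked[0] visible here
            return blocked[0];
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    /** A wakeup() issued before select() is remembered: the next select() returns at once. */
    static long pendingWakeupMillis() {
        try (Selector selector = Selector.open()) {
            selector.wakeup();
            long start = System.nanoTime();
            selector.select(60_000);
            return (System.nanoTime() - start) / 1_000_000;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

The second method shows why the producer can call wakeup() without checking whether the I/O thread is currently inside select(). A wakeup that arrives early is not lost. The next select() returns immediately instead.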
The doSend method of KafkaProducer is as follows:
```java
private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) {
    TopicPartition tp = null;
    try {
        throwIfProducerClosed();
        // first make sure the metadata for the topic is available
        ClusterAndWaitTime clusterAndWaitTime;
        try {
            clusterAndWaitTime = waitOnMetadata(record.topic(), record.partition(), maxBlockTimeMs);
        } catch (KafkaException e) {
            if (metadata.isClosed())
                throw new KafkaException("Producer closed while send in progress", e);
            throw e;
        }
        long remainingWaitMs = Math.max(0, maxBlockTimeMs - clusterAndWaitTime.waitedOnMetadataMs);
        Cluster cluster = clusterAndWaitTime.cluster;
        byte[] serializedKey;
        try {
            serializedKey = keySerializer.serialize(record.topic(), record.headers(), record.key());
        } catch (ClassCastException cce) {
            throw new SerializationException("Can't convert key of class " + record.key().getClass().getName() +
                    " to class " + producerConfig.getClass(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG).getName() +
                    " specified in key.serializer", cce);
        }
        byte[] serializedValue;
        try {
            serializedValue = valueSerializer.serialize(record.topic(), record.headers(), record.value());
        } catch (ClassCastException cce) {
            throw new SerializationException("Can't convert value of class " + record.value().getClass().getName() +
                    " to class " + producerConfig.getClass(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG).getName() +
                    " specified in value.serializer", cce);
        }
        int partition = partition(record, serializedKey, serializedValue, cluster);
        tp = new TopicPartition(record.topic(), partition);
        setReadOnly(record.headers());
        Header[] headers = record.headers().toArray();
        int serializedSize = AbstractRecords.estimateSizeInBytesUpperBound(apiVersions.maxUsableProduceMagic(),
                compressionType, serializedKey, serializedValue, headers);
        ensureValidRecordSize(serializedSize);
        long timestamp = record.timestamp() == null ? time.milliseconds() : record.timestamp();
        log.trace("Sending record {} with callback {} to topic {} partition {}", record, callback, record.topic(), partition);
        // producer callback will make sure to call both 'callback' and interceptor callback
        Callback interceptCallback = new InterceptorCallback<>(callback, this.interceptors, tp);
        if (transactionManager != null && transactionManager.isTransactional())
            transactionManager.maybeAddPartitionToTransaction(tp);
        RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey,
                serializedValue, headers, interceptCallback, remainingWaitMs);
        if (result.batchIsFull || result.newBatchCreated) {
            log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition);
            this.sender.wakeup();
        }
        return result.future;
        // handling exceptions and record the errors;
        // for API exceptions return them in the future,
        // for other exceptions throw directly
    } catch (ApiException e) {
        log.debug("Exception occurred during message send:", e);
        if (callback != null)
            callback.onCompletion(null, e);
        this.errors.record();
        this.interceptors.onSendError(record, tp, e);
        return new FutureFailure(e);
    } catch (InterruptedException e) {
        this.errors.record();
        this.interceptors.onSendError(record, tp, e);
        throw new InterruptException(e);
    } catch (BufferExhaustedException e) {
        this.errors.record();
        this.metrics.sensor("buffer-exhausted-records").record();
        this.interceptors.onSendError(record, tp, e);
        throw e;
    } catch (KafkaException e) {
        this.errors.record();
        this.interceptors.onSendError(record, tp, e);
        throw e;
    } catch (Exception e) {
        // we notify interceptor about all exceptions, since onSend is called before anything else in this method
        this.interceptors.onSendError(record, tp, e);
        throw e;
    }
}
```
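The partition(record, serializedKey, serializedValue, cluster) call in doSend uses an explicit partition if the record carries one. Otherwise it delegates to the Partitioner. For keyed records, the default partitioner hashes the serialized key with murmur2 and takes the result modulo the partition count. The sketch below assumes that scheme. KeyPartitioner is a hypothetical name, and murmur2 is written to follow Kafka's Utils.murmur2 (MurmurHash2 with seed 0x9747b28c). Treat it as illustrative rather than a verified byte-for-byte copy. Keyless records are out of scope: older clients spread them round-robin, and Kafka 2.4 introduced the sticky partitioner for them.

```java
/** Hypothetical sketch of key-based partition selection. */
class KeyPartitioner {
    /** MurmurHash2, 32-bit, following the variant Kafka's default partitioner uses. */
    static int murmur2(byte[] data) {
        int length = data.length;
        int seed = 0x9747b28c;
        final int m = 0x5bd1e995;
        final int r = 24;
        int h = seed ^ length;
        int length4 = length / 4;
        for (int i = 0; i < length4; i++) {
            int i4 = i * 4;
            int k = (data[i4] & 0xff) + ((data[i4 + 1] & 0xff) << 8)
                    + ((data[i4 + 2] & 0xff) << 16) + ((data[i4 + 3] & 0xff) << 24);
            k *= m;
            k ^= k >>> r;
            k *= m;
            h *= m;
            h ^= k;
        }
        // Mix in the trailing 1-3 bytes (intentional fall-through between cases).
        switch (length % 4) {
            case 3:
                h ^= (data[(length & ~3) + 2] & 0xff) << 16;
            case 2:
                h ^= (data[(length & ~3) + 1] & 0xff) << 8;
            case 1:
                h ^= data[length & ~3] & 0xff;
                h *= m;
        }
        h ^= h >>> 13;
        h *= m;
        h ^= h >>> 15;
        return h;
    }

    /** Clear the sign bit so the modulo below is never negative. */
    static int toPositive(int n) {
        return n & 0x7fffffff;
    }

    /** An explicit partition wins; otherwise hash the serialized key modulo the partition count. */
    static int partition(Integer explicitPartition, byte[] serializedKey, int numPartitions) {
        if (explicitPartition != null)
            return explicitPartition;
        if (serializedKey == null)
            throw new IllegalArgumentException("keyless records are out of scope for this sketch");
        return toPositive(murmur2(serializedKey)) % numPartitions;
    }
}
```

toPositive masks the sign bit instead of calling Math.abs. Math.abs(Integer.MIN_VALUE) is still negative and would produce a negative partition index. Because the hash depends only on the key bytes, records with the same key land in the same partition as long as the partition count does not change.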
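ensureValidRecordSize(serializedSize) in doSend enforces two limits. The estimate must fit within both max.request.size and buffer.memory; otherwise the send fails with RecordTooLargeException. A hypothetical sketch of that check follows. The real estimate comes from AbstractRecords.estimateSizeInBytesUpperBound and depends on the message format (magic), the compression type, and the headers. ASSUMED_OVERHEAD_BYTES is a made-up placeholder. IllegalArgumentException stands in for RecordTooLargeException to keep the block dependency-free.

```java
/** Hypothetical sketch of the two record-size limits checked before appending. */
class RecordSizeCheck {
    static final int ASSUMED_OVERHEAD_BYTES = 100; // placeholder, not Kafka's real overhead

    /** Simplified upper bound: fixed overhead plus key and value sizes (headers ignored). */
    static int estimateUpperBound(byte[] key, byte[] value) {
        int keySize = key == null ? 0 : key.length;
        int valueSize = value == null ? 0 : value.length;
        return ASSUMED_OVERHEAD_BYTES + keySize + valueSize;
    }

    /** Rejects a record larger than max.request.size or buffer.memory. */
    static void ensureValidRecordSize(int size, int maxRequestSize, long bufferMemory) {
        if (size > maxRequestSize)
            throw new IllegalArgumentException("record of " + size
                    + " bytes exceeds max.request.size=" + maxRequestSize);
        if (size > bufferMemory)
            throw new IllegalArgumentException("record of " + size
                    + " bytes exceeds buffer.memory=" + bufferMemory);
    }
}
```

With the client defaults (max.request.size = 1048576 bytes, buffer.memory = 33554432 bytes), the request-size limit is the one an oversized record normally hits first.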
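accumulator.append(...) in doSend keeps one Deque of batches per partition. It appends to the last batch when that batch has room; otherwise it allocates a buffer and opens a new batch. doSend then wakes the sender when result.batchIsFull or result.newBatchCreated is set. The following is a heavily simplified sketch of that logic, built on several stand-ins:

* A String such as "orders-0" in place of TopicPartition.
* ConcurrentHashMap.computeIfAbsent in place of Kafka's own map handling.
* ByteBuffer.allocate in place of BufferPool.allocate.

SketchAccumulator, Batch, and AppendResult are hypothetical names. The batchIsFull condition (more than one batch queued, or the current batch full) is intended to mirror RecordAccumulator.

```java
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/** Hypothetical, heavily simplified accumulator: one Deque of fixed-size batches per partition. */
class SketchAccumulator {
    /** A batch backed by one ByteBuffer; records are stored as length-prefixed byte arrays. */
    static final class Batch {
        final ByteBuffer buffer;

        Batch(int capacity) {
            this.buffer = ByteBuffer.allocate(capacity); // stand-in for BufferPool.allocate
        }

        /** Returns false when the record does not fit, like a failed tryAppend. */
        boolean tryAppend(byte[] record) {
            if (buffer.remaining() < Integer.BYTES + record.length)
                return false;
            buffer.putInt(record.length).put(record);
            return true;
        }

        boolean isFull() {
            return buffer.remaining() < Integer.BYTES; // no room even for an empty record
        }
    }

    /** Flags that decide whether the sender gets woken up. */
    static final class AppendResult {
        final boolean batchIsFull;
        final boolean newBatchCreated;

        AppendResult(boolean batchIsFull, boolean newBatchCreated) {
            this.batchIsFull = batchIsFull;
            this.newBatchCreated = newBatchCreated;
        }
    }

    private final ConcurrentMap<String, Deque<Batch>> batches = new ConcurrentHashMap<>();
    private final int batchSize;

    SketchAccumulator(int batchSize) {
        this.batchSize = batchSize;
    }

    /** Get or create the per-partition queue. */
    private Deque<Batch> getOrCreateDeque(String topicPartition) {
        return batches.computeIfAbsent(topicPartition, tp -> new ArrayDeque<>());
    }

    AppendResult append(String topicPartition, byte[] record) {
        Deque<Batch> dq = getOrCreateDeque(topicPartition);
        synchronized (dq) { // ArrayDeque is not thread-safe
            Batch last = dq.peekLast();
            if (last != null && last.tryAppend(record))
                return new AppendResult(dq.size() > 1 || last.isFull(), false);
            // No room in the last batch: allocate a new one of at least batchSize bytes.
            Batch batch = new Batch(Math.max(batchSize, Integer.BYTES + record.length));
            batch.tryAppend(record);
            dq.addLast(batch);
            return new AppendResult(dq.size() > 1 || batch.isFull(), true);
        }
    }

    int batchCount(String topicPartition) {
        Deque<Batch> dq = batches.get(topicPartition);
        return dq == null ? 0 : dq.size();
    }
}
```

For example, with a 64-byte batch size, two 10-byte records share the first batch (14 bytes each including the length prefix). A following 40-byte record needs 44 bytes and no longer fits, so it opens a second batch, and that append reports both newBatchCreated and batchIsFull.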
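The catch blocks at the end of doSend split failures into two groups. An ApiException is reported to the callback and returned as an already-failed Future (FutureFailure). Other exceptions are rethrown to the caller, with InterruptedException wrapped as InterruptException. The sketch below models only that split, under several simplifications:

* SketchApiException stands in for ApiException.
* The callback is a BiConsumer instead of Kafka's Callback.
* The success path is reduced to an immediately completed CompletableFuture.
* The metric recording and interceptors.onSendError calls are omitted.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.BiConsumer;
import java.util.function.Supplier;

/** Hypothetical stand-in for ApiException: an error reported through the future, not thrown. */
class SketchApiException extends RuntimeException {
    SketchApiException(String message) {
        super(message);
    }
}

class SendErrorRouting {
    /**
     * Runs one send step. API errors go to the callback and come back as an
     * already-failed future; any other runtime exception propagates to the caller.
     */
    static <T> CompletableFuture<T> send(Supplier<T> step, BiConsumer<T, Exception> callback) {
        try {
            T result = step.get();
            if (callback != null)
                callback.accept(result, null);
            return CompletableFuture.completedFuture(result);
        } catch (SketchApiException e) {
            if (callback != null)
                callback.accept(null, e); // like callback.onCompletion(null, e)
            CompletableFuture<T> failed = new CompletableFuture<>();
            failed.completeExceptionally(e); // like new FutureFailure(e)
            return failed;
        }
    }
}
```

The split gives callers one place to handle per-record API failures (the Future or the callback). Programming errors and shutdown conditions still surface immediately as exceptions from send.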