更新元数据
- 消费者协调器执行poll逻辑
- 消费者协调器查询最少连接节点
开始
调用KafkaConsumer的poll方法
获取轻量级锁然后检测消费者开启状态
消费者监控指标KafkaConsumerMetrics记录poll开始执行时间
如果 SubscriptionState 的subscriptionType类型为SubscriptionType.NONE则抛出异常
执行未超时则循环fetch处理逻辑
ConsumerNetworkClient唤醒状态判断
updateAssignmentMetadataIfNeeded尝试更新分配元数据,但不需要阻止加入组的计时器
Fetcher触发发送逻辑调用的ConsumerNetworkClient类型的sendFetches
prepareFetchRequests为在飞行队列中没有现有请求的分区分配的所有节点创建获取请求。 。
ConsumerNetworkClient的send方法开始发送当前topic分区数据
KafkaClient的.wakeup()唤醒客户端,以防它在轮询中阻塞,以便我们可以发送排队的请求
ConsumerNetworkClient的poll
调用KafkaClient的send将给定请求排队,以便在后续轮询(长)调用中发送
selector.poll执行IO事件
处理响应结果
结束
Kafka15, Kafka源码15