•开始
•消费者协调器 ConsumerCoordinator调用poll方法
•poll轮询协调器事件。这确保了协调器是已知的,并且消费者已加入该组(如果使用组管理)。启用了周期性偏移提交,这也会处理它们。
•更新元数据内存快照信息
•调用偏移量量提交的回调方法OffsetCommitCallback的onComplete方法
•始终更新心跳上次轮询时间,以便心跳线程不会由于应用程序不活动而主动离开组,即使(比如)找不到协调器。
•检查心跳线程状态如果需要,唤醒心跳线程
•重置心跳时间
•调用 的 joinGrouAbstractCoordinatorpIfNeeded来决定是否需要加入组
•调用ConsumerCoordinator 的commitOffsetsAsync处理异步提交偏移量
•调用KafkaConsumer的updateFetchPositions方法根据配置的策略获取偏移量位置将提取位置设置为提交位置(如果有),或使用用户配置的偏移重置策略将其重置。
•根据当前的leader和epoch验证每个分区。如果我们看到新的元数据版本,请检查所有分区
•获取所有分区的偏移量和元数据信息
•遍历所有分区的偏移量数据
•如果有新的epoch值的领导者更新则标记元数据信息需要全量更新
•获取当前遍历到的分区的LeaderAndEpoch
•封装偏移量和LeaderAndEpoch为分区偏移量位置FetchPosition对象
•切换主题分区当前状态到FETCHING然后到AWAIT_VALIDATION
•返回一个Future对象可以同步等待结果
•结束
Kafka15, Kafka源码15