•开始
•Fetcher的sendFetches()为我们分配了分区的任何节点设置一个fetch请求,前提是这些节点没有正在进行的fetch或挂起的fetch数据。
•prepareFetchRequests准备请求数据FetchRequestData
•调用fetchablePartitions方法从被分配的分区列表assignment中查询出当前可以读取数据的分区
•根据当前完成的TopicPartition获取其位置FetchPosition
•当前分区的待读取broker节点查询,如果存在优先副本节点则使用优先副本节点如果不存在则使用当前分区的leader节点
•当前待读取数据节点无对应飞行窗口请求则开始创建请求
•为当前节点创建一个FetchSessionHandler和FetchSessionHandler.Builder和FetchRequest.PartitionData
•最后封装数据到集合然后返回Map<Node, FetchSessionHandler.FetchRequestData>
•遍历当前的FetchRequestData
•为其创建FetchRequest.Builder
•KafkaClient将所有参数封装为ClientRequest然后将其放在unsent集合中
•调用selector.wakeup();唤醒休眠中的IO
•回到最外层调用ConsumerNetworkClient的poll方法执行IO逻辑
•调用Selectable类型的poll方法来查询io事件然后执行这个在生产者那里详细说了这里就不说了
•处理IO结果
Kafka15, Kafka源码15