- 开始
- 调用订阅方法KafkaConsumer的subscribe
- 消费者获取轻量级锁然后检测消费者状态是否正常
- groupId为空则抛出异常
- topic为空则执行取消订阅逻辑
- 遍历topic列表循环订阅每个topic
- topic为空则抛出异常
- 如果未配置分区管理器则ConsumerPartitionAssignor则抛出异常
- Fetcher清除当前主题分区不属于新分配分区的缓冲数据
- SubscriptionState执行订阅逻辑subscribe
- 注册重新平衡监听器
- 设置订阅类型SubscriptionType.AUTO_TOPICS
- 为SubscriptionState设置成员变量subscription(用户请求的主题列表)需要订阅的topic 集合Set
- 调用ConsumerMetadata的requestUpdateForNewTopics更新请求版本(后面异步IO去执行这个请求)
- 释放消费者轻量级锁
- 结束
Kafka14, Kafka源码14