• 开始
  • 调用订阅方法KafkaConsumer的subscribe
  • 消费者获取轻量级锁然后检测消费者状态是否正常
  • groupId为空则抛出异常
  • topic为空则执行取消订阅逻辑
  • 遍历topic列表循环订阅每个topic
  • topic为空则抛出异常
  • 如果未配置分区管理器则ConsumerPartitionAssignor则抛出异常
  • Fetcher清除当前主题分区不属于新分配分区的缓冲数据
  • SubscriptionState执行订阅逻辑subscribe
  • 注册重新平衡监听器
  • 设置订阅类型SubscriptionType.AUTO_TOPICS
  • 为SubscriptionState设置成员变量subscription(用户请求的主题列表)需要订阅的topic 集合Set
  • 调用ConsumerMetadata的requestUpdateForNewTopics更新请求版本(后面异步IO去执行这个请求)
  • 释放消费者轻量级锁
  • 结束

,