- 校验
- topic合法性校验
- 分区器校验
- 订阅
- 默认订阅模式设置为AUTO_TOPICS
- 更新订阅状态订阅信息
- 请求执行网络IO来更新元数据
消费者订阅主题调用的代码很简单如下所示:
String topic = "topic1";
consumer.subscribe(Collections.singletonList(topic));
KafkaConsumer类型的subscribe方法
public void subscribe(Collection<String> topics) {
subscribe(topics, new NoOpConsumerRebalanceListener());
}
public void subscribe(Collection<String> topics, ConsumerRebalanceListener listener) {
acquireAndEnsureOpen();
try {
maybeThrowInvalidGroupIdException();
//前面两个是topic合法性校验
if (topics == null)
throw new IllegalArgumentException("Topic collection to subscribe to cannot be null");
if (topics.isEmpty()) {
// treat subscribing to empty topic list as the same as unsubscribing
this.unsubscribe();
} else {
//这里开始订阅
for (String topic : topics) {
//订阅之前先进行合法性校验
if (topic == null || topic.trim().isEmpty())
throw new IllegalArgumentException("Topic collection to subscribe to cannot contain null or empty topic");
}
//这里如果未配置分区器则抛出异常 ,对应配置为partition.assignment.strategy 不配置其实也有默认值默认值为RangeAssignor策略 根据范围平均分片
throwIfNoAssignorsConfigured();
//清除掉缓冲区中本次未订阅的数据
fetcher.clearBufferedDataForUnassignedTopics(topics);
log.info("Subscribed to topic(s): {}", Utils.join(topics, ", "));
//开始订阅
if (this.subscriptions.subscribe(new HashSet<>(topics), listener))
//请求IO执行
metadata.requestUpdateForNewTopics();
}
} finally {
release();
}
}
SubscriptionState类型的subscribe方法
public synchronized boolean subscribe(Set<String> topics, ConsumerRebalanceListener listener) {
registerRebalanceListener(listener);
setSubscriptionType(SubscriptionType.AUTO_TOPICS);
return changeSubscription(topics);
}
这里仅仅将订阅重新负载均衡器,订阅类型,和订阅的TOPIC等数据更新到了SubscriptionState类型的对象中服务端还未感知到,后续网络IO执行具体的请求来实现TOPIC的订阅
-
- 开始
- 调用订阅方法KafkaConsumer的subscribe
- 消费者获取轻量级锁然后检测消费者状态是否正常
- groupId为空则抛出异常
- topic为空则执行取消订阅逻辑
- 遍历topic列表循环订阅每个topic
- topic为空则抛出异常
- 如果未配置分区管理器则ConsumerPartitionAssignor则抛出异常
- Fetcher清除当前主题分区不属于新分配分区的缓冲数据
- SubscriptionState执行订阅逻辑subscribe
- 注册重新平衡监听器
- 设置订阅类型SubscriptionType.AUTO_TOPICS
- 为SubscriptionState设置成员变量subscription(用户请求的主题列表)需要订阅的topic 集合Set
- 调用ConsumerMetadata的requestUpdateForNewTopics更新请求版本(后面异步IO去执行这个请求)
- 释放消费者轻量级锁
- 结束
Kafka15, Kafka源码15