Let's return to doCreateInvoker in RegistryProtocol, the method that creates the Invoker object:
```java
protected <T> ClusterInvoker<T> doCreateInvoker(DynamicDirectory<T> directory, Cluster cluster, Registry registry, Class<T> type) {
    directory.setRegistry(registry);
    directory.setProtocol(protocol);
    // all attributes of REFER_KEY
    Map<String, String> parameters = new HashMap<>(directory.getConsumerUrl().getParameters());
    URL urlToRegistry = new ServiceConfigURL(
        parameters.get(PROTOCOL_KEY) == null ? CONSUMER : parameters.get(PROTOCOL_KEY),
        parameters.remove(REGISTER_IP_KEY),
        0,
        getPath(parameters, type),
        parameters
    );
    urlToRegistry = urlToRegistry.setScopeModel(directory.getConsumerUrl().getScopeModel());
    urlToRegistry = urlToRegistry.setServiceModel(directory.getConsumerUrl().getServiceModel());
    if (directory.isShouldRegister()) {
        directory.setRegisteredConsumerUrl(urlToRegistry);
        registry.register(directory.getRegisteredConsumerUrl());
    }
    directory.buildRouterChain(urlToRegistry);
    directory.subscribe(toSubscribeUrl(urlToRegistry));
    return (ClusterInvoker<T>) cluster.join(directory, true);
}
```
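To make the URL construction above easier to follow, here is a minimal sketch in plain Java, with no Dubbo dependency. It only mirrors how the protocol and host arguments are chosen from the parameter map. The class ConsumerUrlSketch, the method describe, and the constant values are assumptions made for illustration; they are not Dubbo's actual API.

```java
import java.util.HashMap;
import java.util.Map;

final class ConsumerUrlSketch {
    // Hypothetical stand-ins for Dubbo's constants; the values are assumed for this sketch.
    static final String PROTOCOL_KEY = "protocol";
    static final String REGISTER_IP_KEY = "register.ip";
    static final String CONSUMER = "consumer";

    /** Builds "protocol://host/path?remaining=N" using the same argument choices as the snippet above. */
    static String describe(Map<String, String> consumerParams, String path) {
        // Copy first so the caller's map is left untouched.
        Map<String, String> parameters = new HashMap<>(consumerParams);
        // Fall back to "consumer" when no protocol parameter is present.
        String protocol = parameters.get(PROTOCOL_KEY) == null ? CONSUMER : parameters.get(PROTOCOL_KEY);
        // remove() returns the value and drops the key, so the IP is used as host only.
        String host = parameters.remove(REGISTER_IP_KEY);
        return protocol + "://" + host + "/" + path + "?remaining=" + parameters.size();
    }

    public static void main(String[] args) {
        Map<String, String> params = new HashMap<>();
        params.put("register.ip", "192.168.1.10");
        params.put("side", "consumer");
        // prints consumer://192.168.1.10/org.example.DemoService?remaining=1
        System.out.println(describe(params, "org.example.DemoService"));
    }
}
```

Because remove is used rather than get, the register IP no longer appears among the parameters of the resulting URL.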
Both interface-level and application-level registration end with this call:
```java
return (ClusterInvoker<T>) cluster.join(directory, true);
```
Let's look at it in detail.
The join method of MockClusterWrapper, where the buildFilterChain argument is true:
```java
public <T> Invoker<T> join(Directory<T> directory, boolean buildFilterChain) throws RpcException {
    return new MockClusterInvoker<T>(directory,
        this.cluster.join(directory, buildFilterChain));
}
```
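The wrapping step above can be sketched as a plain decorator. This is a simplified illustration under stated assumptions: the interfaces Invoker and Cluster below are reduced stand-ins, the classes InnerCluster and WrapperCluster are hypothetical, and the strings only mark which layer handled the call. It does not reproduce what MockClusterInvoker actually does at invocation time.

```java
final class ClusterWrapperSketch {
    // Reduced stand-ins for the real interfaces, assumed for this sketch.
    interface Invoker { String invoke(String request); }
    interface Cluster { Invoker join(String directory); }

    /** Plays the role of the wrapped cluster (FailoverCluster in the text). */
    static final class InnerCluster implements Cluster {
        public Invoker join(String directory) {
            return request -> "failover[" + directory + "](" + request + ")";
        }
    }

    /** Plays the role of MockClusterWrapper: delegates join, then wraps the result. */
    static final class WrapperCluster implements Cluster {
        private final Cluster cluster;

        WrapperCluster(Cluster cluster) {
            this.cluster = cluster;
        }

        public Invoker join(String directory) {
            Invoker inner = cluster.join(directory);
            // The outer invoker sees every call before the inner one does.
            return request -> "mock -> " + inner.invoke(request);
        }
    }

    public static void main(String[] args) {
        Cluster cluster = new WrapperCluster(new InnerCluster());
        // prints mock -> failover[dir](sayHello)
        System.out.println(cluster.join("dir").invoke("sayHello"));
    }
}
```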
The default cluster type is FailoverCluster, so let's look at its join method. FailoverCluster does not implement join itself, so the call goes to the join method of its parent class AbstractCluster:
```java
public <T> Invoker<T> join(Directory<T> directory, boolean buildFilterChain) throws RpcException {
    // When buildFilterChain is true, this branch is taken
    if (buildFilterChain) {
        return buildClusterInterceptors(doJoin(directory));
    } else {
        return doJoin(directory);
    }
}
```
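Because the buildFilterChain argument is true, the first branch is taken: the Invoker object is created first, and the filters are then built around it:
```java
if (buildFilterChain) {
    return buildClusterInterceptors(doJoin(directory));
}
```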
doJoin is an abstract method in AbstractCluster, so the call dispatches to the concrete doJoin method of FailoverCluster:
```java
public <T> AbstractClusterInvoker<T> doJoin(Directory<T> directory) throws RpcException {
    return new FailoverClusterInvoker<>(directory);
}
```
As we can see, this call chain creates a failover Invoker object, FailoverClusterInvoker.
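The relationship between join and doJoin is a template method: the parent class fixes the overall flow and the subclass supplies the concrete Invoker. A minimal sketch, assuming simplified types (AbstractClusterLike, FailoverLike, and withFilters are hypothetical names, not Dubbo classes):

```java
final class JoinTemplateSketch {
    interface Invoker { String invoke(); }

    abstract static class AbstractClusterLike {
        // Template method: create the core invoker, then optionally wrap it.
        Invoker join(String directory, boolean buildFilterChain) {
            Invoker core = doJoin(directory);
            return buildFilterChain ? withFilters(core) : core;
        }

        // Each concrete cluster decides which invoker to create.
        protected abstract Invoker doJoin(String directory);

        // Stand-in for buildClusterInterceptors in the text.
        private Invoker withFilters(Invoker core) {
            return () -> "filters(" + core.invoke() + ")";
        }
    }

    static final class FailoverLike extends AbstractClusterLike {
        @Override
        protected Invoker doJoin(String directory) {
            return () -> "failover:" + directory;
        }
    }

    public static void main(String[] args) {
        AbstractClusterLike cluster = new FailoverLike();
        System.out.println(cluster.join("dir", true).invoke());  // prints filters(failover:dir)
        System.out.println(cluster.join("dir", false).invoke()); // prints failover:dir
    }
}
```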
The FailoverClusterInvoker constructor:
```java
public FailoverClusterInvoker(Directory<T> directory) {
    super(directory);
}
```
The constructor of the parent invoker, AbstractClusterInvoker:
```java
public AbstractClusterInvoker(Directory<T> directory) {
    this(directory, directory.getUrl());
}
```
The overloaded constructor is as follows:
```java
public AbstractClusterInvoker(Directory<T> directory, URL url) {
    if (directory == null) {
        throw new IllegalArgumentException("service directory == null");
    }
    this.directory = directory;
    // sticky: invoker.isAvailable() should always be checked before using when availablecheck is true.
    this.availableCheck = url.getParameter(CLUSTER_AVAILABLE_CHECK_KEY, DEFAULT_CLUSTER_AVAILABLE_CHECK);
    Configuration configuration = ConfigurationUtils.getGlobalConfiguration(url.getOrDefaultModuleModel());
    this.reselectCount = configuration.getInt(RESELECT_COUNT, DEFAULT_RESELECT_COUNT);
    this.enableConnectivityValidation = configuration.getBoolean(ENABLE_CONNECTIVITY_VALIDATION, true);
}
```
The filter chain is initialized in the buildClusterInterceptors method of AbstractCluster:
```java
private <T> Invoker<T> buildClusterInterceptors(AbstractClusterInvoker<T> clusterInvoker) {
    // AbstractClusterInvoker<T> last = clusterInvoker;
    AbstractClusterInvoker<T> last = buildInterceptorInvoker(new ClusterFilterInvoker<>(clusterInvoker));
    if (Boolean.parseBoolean(ConfigurationUtils.getProperty(
            clusterInvoker.getDirectory().getConsumerUrl().getScopeModel(),
            CLUSTER_INTERCEPTOR_COMPATIBLE_KEY, "false"))) {
        return build27xCompatibleClusterInterceptors(clusterInvoker, last);
    }
    return last;
}
```
The ClusterFilterInvoker constructor:
```java
public ClusterFilterInvoker(AbstractClusterInvoker<T> invoker) {
    // Filter chain builders; here this is DefaultFilterChainBuilder
    List<FilterChainBuilder> builders = ScopeModelUtil.getApplicationModel(invoker.getUrl().getScopeModel()).getExtensionLoader(FilterChainBuilder.class).getActivateExtensions();
    if (CollectionUtils.isEmpty(builders)) {
        filterInvoker = invoker;
    } else {
        ClusterInvoker<T> tmpInvoker = invoker;
        for (FilterChainBuilder builder : builders) {
            tmpInvoker = builder.buildClusterInvokerChain(tmpInvoker, REFERENCE_FILTER_KEY, CommonConstants.CONSUMER);
        }
        filterInvoker = tmpInvoker;
    }
}
```
The buildClusterInvokerChain method of DefaultFilterChainBuilder builds the filter chain:
```java
public <T> ClusterInvoker<T> buildClusterInvokerChain(final ClusterInvoker<T> originalInvoker, String key, String group) {
    ClusterInvoker<T> last = originalInvoker;
    URL url = originalInvoker.getUrl();
    List<ModuleModel> moduleModels = getModuleModelsFromUrl(url);
    List<ClusterFilter> filters;
    if (moduleModels != null && moduleModels.size() == 1) {
        // Look up the matching consumer-side filters through the extension loader; 4 are found here
        filters = ScopeModelUtil.getExtensionLoader(ClusterFilter.class, moduleModels.get(0)).getActivateExtension(url, key, group);
    } else if (moduleModels != null && moduleModels.size() > 1) {
        filters = new ArrayList<>();
        List<ExtensionDirector> directors = new ArrayList<>();
        for (ModuleModel moduleModel : moduleModels) {
            List<ClusterFilter> tempFilters = ScopeModelUtil.getExtensionLoader(ClusterFilter.class, moduleModel).getActivateExtension(url, key, group);
            filters.addAll(tempFilters);
            directors.add(moduleModel.getExtensionDirector());
        }
        filters = sortingAndDeduplication(filters, directors);
    } else {
        filters = ScopeModelUtil.getExtensionLoader(ClusterFilter.class, null).getActivateExtension(url, key, group);
    }
    // If the filter list is not empty, link the filters into the invocation chain
    if (!CollectionUtils.isEmpty(filters)) {
        for (int i = filters.size() - 1; i >= 0; i--) {
            final ClusterFilter filter = filters.get(i);
            final Invoker<T> next = last;
            last = new CopyOfClusterFilterChainNode<>(originalInvoker, next, filter);
        }
        return new ClusterCallbackRegistrationInvoker<>(originalInvoker, last, filters);
    }
    return last;
}
```
By default there are 4 filters:
- ConsumerContextFilter
- FutureFilter
- MonitorClusterFilter
- RouterSnapshotFilter
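The chain is linked by walking the filter list from the last element to the first, which is why the first filter in the list ends up outermost and runs first. The following self-contained sketch shows that effect with simplified types. Invoker, Filter, named, buildChain, and run are hypothetical helpers written for this illustration. The filter names are used only as labels; the order in which Dubbo actually returns them is decided by the extension loader.

```java
import java.util.ArrayList;
import java.util.List;

final class FilterChainSketch {
    interface Invoker { String invoke(List<String> trace); }
    interface Filter { String invoke(Invoker next, List<String> trace); }

    /** A filter that records its name and then passes the call on. */
    static Filter named(String name) {
        return (next, trace) -> {
            trace.add(name);
            return next.invoke(trace);
        };
    }

    /** Same loop shape as buildClusterInvokerChain: iterate backwards, wrap forwards. */
    static Invoker buildChain(Invoker original, List<Filter> filters) {
        Invoker last = original;
        for (int i = filters.size() - 1; i >= 0; i--) {
            final Filter filter = filters.get(i);
            final Invoker next = last;
            last = trace -> filter.invoke(next, trace);
        }
        return last;
    }

    /** Builds a chain from the given names, invokes it once, and returns the visit order. */
    static List<String> run(String... names) {
        List<Filter> filters = new ArrayList<>();
        for (String name : names) {
            filters.add(named(name));
        }
        List<String> trace = new ArrayList<>();
        Invoker original = t -> {
            t.add("clusterInvoker");
            return "ok";
        };
        buildChain(original, filters).invoke(trace);
        return trace;
    }

    public static void main(String[] args) {
        // prints [ConsumerContextFilter, FutureFilter, MonitorClusterFilter, RouterSnapshotFilter, clusterInvoker]
        System.out.println(run("ConsumerContextFilter", "FutureFilter",
                "MonitorClusterFilter", "RouterSnapshotFilter"));
    }
}
```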