Let's return to doCreateInvoker in RegistryProtocol, the method that creates the Invoker object:

```java
protected <T> ClusterInvoker<T> doCreateInvoker(DynamicDirectory<T> directory, Cluster cluster, Registry registry, Class<T> type) {
    directory.setRegistry(registry);
    directory.setProtocol(protocol);
    // all attributes of REFER_KEY
    Map<String, String> parameters = new HashMap<>(directory.getConsumerUrl().getParameters());
    URL urlToRegistry = new ServiceConfigURL(
        parameters.get(PROTOCOL_KEY) == null ? CONSUMER : parameters.get(PROTOCOL_KEY),
        parameters.remove(REGISTER_IP_KEY),
        0,
        getPath(parameters, type),
        parameters
    );
    urlToRegistry = urlToRegistry.setScopeModel(directory.getConsumerUrl().getScopeModel());
    urlToRegistry = urlToRegistry.setServiceModel(directory.getConsumerUrl().getServiceModel());
    if (directory.isShouldRegister()) {
        directory.setRegisteredConsumerUrl(urlToRegistry);
        registry.register(directory.getRegisteredConsumerUrl());
    }
    directory.buildRouterChain(urlToRegistry);
    directory.subscribe(toSubscribeUrl(urlToRegistry));

    return (ClusterInvoker<T>) cluster.join(directory, true);
}
```

Both interface-level and application-level registration end with this call:

```java
return (ClusterInvoker<T>) cluster.join(directory, true);
```

Let's look at it in detail, starting with the join method of MockClusterWrapper. The buildFilterChain argument is true here.

```java
public <T> Invoker<T> join(Directory<T> directory, boolean buildFilterChain) throws RpcException {
    return new MockClusterInvoker<T>(directory,
            this.cluster.join(directory, buildFilterChain));
}
```
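The wrapping step above can be pictured with a small standalone sketch. Everything in it (SimpleInvoker, SimpleCluster, PlainCluster, MockWrapper) is a hypothetical stand-in, not a Dubbo type, and the fallback to a canned value on failure is an assumption made only so that the outer layer has a visible effect.

```java
import java.util.ArrayList;
import java.util.List;

class WrapperSketch {
    // Hypothetical, simplified stand-ins for Dubbo's Invoker and Cluster.
    interface SimpleInvoker { String invoke(String request); }
    interface SimpleCluster { SimpleInvoker join(List<String> providers); }

    // Plays the role of the inner cluster: builds the invoker that does the real call.
    static class PlainCluster implements SimpleCluster {
        public SimpleInvoker join(List<String> providers) {
            // Throws IndexOutOfBoundsException at call time when no provider exists.
            return request -> "called " + providers.get(0) + " with " + request;
        }
    }

    // Plays the role of the wrapper: delegates join to the inner cluster,
    // then returns a new invoker that wraps the inner one.
    static class MockWrapper implements SimpleCluster {
        private final SimpleCluster cluster;

        MockWrapper(SimpleCluster cluster) { this.cluster = cluster; }

        public SimpleInvoker join(List<String> providers) {
            SimpleInvoker inner = cluster.join(providers);
            return request -> {
                try {
                    return inner.invoke(request);
                } catch (RuntimeException e) {
                    // Assumed fallback behavior, for illustration only.
                    return "mock result for " + request;
                }
            };
        }
    }

    static String call(List<String> providers, String request) {
        SimpleCluster cluster = new MockWrapper(new PlainCluster());
        return cluster.join(providers).invoke(request);
    }

    public static void main(String[] args) {
        List<String> providers = new ArrayList<>();
        System.out.println(call(providers, "hello"));
        providers.add("provider-a");
        System.out.println(call(providers, "hello"));
    }
}
```

The caller only ever sees the outer invoker, which is why the wrapper can add behavior without the inner cluster knowing about it.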

The default cluster type is FailoverCluster, so its join method is called next. FailoverCluster does not override join; the call resolves to the join method inherited from its parent type AbstractCluster:

```java
public <T> Invoker<T> join(Directory<T> directory, boolean buildFilterChain) throws RpcException {
    // When buildFilterChain is true, this branch is taken
    if (buildFilterChain) {
        return buildClusterInterceptors(doJoin(directory));
    } else {
        return doJoin(directory);
    }
}
```

Because buildFilterChain is true, the first branch runs: doJoin(directory) creates the Invoker object first, and buildClusterInterceptors then wraps it with the filter chain.

doJoin is an abstract method of AbstractCluster, so the call dispatches to the concrete implementation in FailoverCluster:

```java
public <T> AbstractClusterInvoker<T> doJoin(Directory<T> directory) throws RpcException {
    return new FailoverClusterInvoker<>(directory);
}
```
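The relationship between join and doJoin is a template method: the base class fixes the overall flow and the subclass supplies only the creation step. A minimal sketch under that reading follows. BaseCluster, FailoverLikeCluster and SimpleInvoker are hypothetical names, and the string wrapping stands in for the real filter chain.

```java
class TemplateJoinSketch {
    // Hypothetical stand-in for an Invoker; it only describes itself.
    interface SimpleInvoker { String describe(); }

    static abstract class BaseCluster {
        // Same shape as the join method above: create via doJoin,
        // then optionally decorate the result.
        final SimpleInvoker join(String directory, boolean buildFilterChain) {
            SimpleInvoker invoker = doJoin(directory);
            if (buildFilterChain) {
                return () -> "filtered(" + invoker.describe() + ")";
            }
            return invoker;
        }

        // Subclasses decide which concrete invoker to create.
        protected abstract SimpleInvoker doJoin(String directory);
    }

    static class FailoverLikeCluster extends BaseCluster {
        @Override
        protected SimpleInvoker doJoin(String directory) {
            return () -> "failover[" + directory + "]";
        }
    }

    public static void main(String[] args) {
        BaseCluster cluster = new FailoverLikeCluster();
        System.out.println(cluster.join("demo-directory", true).describe());
        System.out.println(cluster.join("demo-directory", false).describe());
    }
}
```

With this split, adding another cluster strategy means writing one more subclass with its own doJoin, while the decoration logic stays in a single place.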

As you can see, this call chain creates a failover Invoker object, FailoverClusterInvoker.

The FailoverClusterInvoker constructor:

```java
public FailoverClusterInvoker(Directory<T> directory) {
    super(directory);
}
```

The constructor of its parent invoker, AbstractClusterInvoker:

```java
public AbstractClusterInvoker(Directory<T> directory) {
    this(directory, directory.getUrl());
}
```

The overloaded constructor is as follows:

```java
public AbstractClusterInvoker(Directory<T> directory, URL url) {
    if (directory == null) {
        throw new IllegalArgumentException("service directory == null");
    }

    this.directory = directory;
    //sticky: invoker.isAvailable() should always be checked before using when availablecheck is true.
    this.availableCheck = url.getParameter(CLUSTER_AVAILABLE_CHECK_KEY, DEFAULT_CLUSTER_AVAILABLE_CHECK);
    Configuration configuration = ConfigurationUtils.getGlobalConfiguration(url.getOrDefaultModuleModel());
    this.reselectCount = configuration.getInt(RESELECT_COUNT, DEFAULT_RESELECT_COUNT);
    this.enableConnectivityValidation = configuration.getBoolean(ENABLE_CONNECTIVITY_VALIDATION, true);
}
```


The filter chain is initialized in the buildClusterInterceptors method of AbstractCluster:

```java
private <T> Invoker<T> buildClusterInterceptors(AbstractClusterInvoker<T> clusterInvoker) {
//    AbstractClusterInvoker<T> last = clusterInvoker;
    AbstractClusterInvoker<T> last = buildInterceptorInvoker(new ClusterFilterInvoker<>(clusterInvoker));

    if (Boolean.parseBoolean(ConfigurationUtils.getProperty(clusterInvoker.getDirectory().getConsumerUrl().getScopeModel(), CLUSTER_INTERCEPTOR_COMPATIBLE_KEY, "false"))) {
        return build27xCompatibleClusterInterceptors(clusterInvoker, last);
    }
    return last;
}
```

The ClusterFilterInvoker constructor:

```java
public ClusterFilterInvoker(AbstractClusterInvoker<T> invoker) {
    // Filter chain builders; the one used here is DefaultFilterChainBuilder
    List<FilterChainBuilder> builders = ScopeModelUtil.getApplicationModel(invoker.getUrl().getScopeModel()).getExtensionLoader(FilterChainBuilder.class).getActivateExtensions();
    if (CollectionUtils.isEmpty(builders)) {
        filterInvoker = invoker;
    } else {
        ClusterInvoker<T> tmpInvoker = invoker;
        for (FilterChainBuilder builder : builders) {
            tmpInvoker = builder.buildClusterInvokerChain(tmpInvoker, REFERENCE_FILTER_KEY, CommonConstants.CONSUMER);
        }
        filterInvoker = tmpInvoker;
    }
}
```

The buildClusterInvokerChain method of DefaultFilterChainBuilder builds the filter chain:

```java
public <T> ClusterInvoker<T> buildClusterInvokerChain(final ClusterInvoker<T> originalInvoker, String key, String group) {
    ClusterInvoker<T> last = originalInvoker;
    URL url = originalInvoker.getUrl();
    List<ModuleModel> moduleModels = getModuleModelsFromUrl(url);
    List<ClusterFilter> filters;
    if (moduleModels != null && moduleModels.size() == 1) {
        // Look up the matching consumer-side filters through the extension loader; 4 are found here
        filters = ScopeModelUtil.getExtensionLoader(ClusterFilter.class, moduleModels.get(0)).getActivateExtension(url, key, group);
    } else if (moduleModels != null && moduleModels.size() > 1) {
        filters = new ArrayList<>();
        List<ExtensionDirector> directors = new ArrayList<>();
        for (ModuleModel moduleModel : moduleModels) {
            List<ClusterFilter> tempFilters = ScopeModelUtil.getExtensionLoader(ClusterFilter.class, moduleModel).getActivateExtension(url, key, group);
            filters.addAll(tempFilters);
            directors.add(moduleModel.getExtensionDirector());
        }
        filters = sortingAndDeduplication(filters, directors);

    } else {
        filters = ScopeModelUtil.getExtensionLoader(ClusterFilter.class, null).getActivateExtension(url, key, group);
    }
    // If any filters were found, link them into the invocation chain
    if (!CollectionUtils.isEmpty(filters)) {
        for (int i = filters.size() - 1; i >= 0; i--) {
            final ClusterFilter filter = filters.get(i);
            final Invoker<T> next = last;
            last = new CopyOfClusterFilterChainNode<>(originalInvoker, next, filter);
        }
        return new ClusterCallbackRegistrationInvoker<>(originalInvoker, last, filters);
    }

    return last;
}
```

By default there are 4 filters:
- ConsumerContextFilter
- FutureFilter
- MonitorClusterFilter
- RouterSnapshotFilter
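
The reverse loop in buildClusterInvokerChain is worth a closer look: walking the filter list from the last element to the first means the first filter ends up outermost and therefore runs first. The sketch below reproduces only that linking step with hypothetical types (SimpleInvoker, SimpleFilter) and generic filter names; it does not model the callback registration invoker or the real Dubbo filters.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class FilterChainSketch {
    // Hypothetical, simplified stand-ins for Invoker and ClusterFilter.
    interface SimpleInvoker { String invoke(String request); }
    interface SimpleFilter { String invoke(SimpleInvoker next, String request); }

    // A filter that records its name and then passes the call on.
    static SimpleFilter named(String name, List<String> log) {
        return (next, request) -> {
            log.add(name);
            return next.invoke(request);
        };
    }

    // Link filters back to front so that filters.get(0) becomes the outermost node.
    static SimpleInvoker buildChain(SimpleInvoker original, List<SimpleFilter> filters) {
        SimpleInvoker last = original;
        for (int i = filters.size() - 1; i >= 0; i--) {
            final SimpleFilter filter = filters.get(i);
            final SimpleInvoker next = last;
            last = request -> filter.invoke(next, request);
        }
        return last;
    }

    // Builds a three-filter chain, invokes it once, and returns the execution order.
    static List<String> run() {
        List<String> log = new ArrayList<>();
        List<SimpleFilter> filters = Arrays.asList(
                named("first", log), named("second", log), named("third", log));
        SimpleInvoker target = request -> {
            log.add("target");
            return "ok";
        };
        buildChain(target, filters).invoke("ping");
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [first, second, third, target]
    }
}
```

Building front to back instead would invert the execution order, so the reverse iteration is what keeps the filters running in their sorted order.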
