24 Dubbo应用级服务发现
24.1 简介
这里我们要看的是MigrationInvoker类型的refreshServiceDiscoveryInvoker方法:根据应用级服务发现,创建应用级的服务调用器,这里有很多逻辑和接口级服务发现类似,不过与接口级调用的invoker对象不同的是应用级的粒度会比较大一些这一步不会去注意去订阅各个服务接口,只会订阅服务提供者的应用。
默认情况下如果没有配置强制走接口级或者应用级的服务配置,接口级逻辑和应用级服务订阅都会走,这里我们可以直接来看代码吧:
MigrationInvoker类型的refreshServiceDiscoveryInvoker方法
24.2 刷新服务发现调用器Invoker
下面这个入口代码和接口级的Invoker对象创建类似,唯一不同的是接口级Invoker对象的创建调用的是registryProtocol对象(InterfaceCompatibleRegistryProtocol类型)的getInvoker方法,而这里调用了getServiceDiscoveryInvoker
MigrationInvoker类型的refreshServiceDiscoveryInvoker方法代码如下:
protected void refreshServiceDiscoveryInvoker(CountDownLatch latch) {
clearListener(serviceDiscoveryInvoker);
if (needRefresh(serviceDiscoveryInvoker)) {
if (logger.isDebugEnabled()) {
logger.debug("Re-subscribing instance addresses, current interface " + type.getName());
}
if (serviceDiscoveryInvoker != null) {
serviceDiscoveryInvoker.destroy();
}
//registryProtocol类型为:InterfaceCompatibleRegistryProtocol
serviceDiscoveryInvoker = registryProtocol.getServiceDiscoveryInvoker(cluster, registry, type, url);
}
setListener(serviceDiscoveryInvoker, () -> {
latch.countDown();
if (reportService.hasReporter()) {
reportService.reportConsumptionStatus(
reportService.createConsumptionReport(consumerUrl.getServiceInterface(), consumerUrl.getVersion(), consumerUrl.getGroup(), "app"));
}
if (step == APPLICATION_FIRST) {
calcPreferredInvoker(rule);
}
});
}
InterfaceCompatibleRegistryProtocol类型的getServiceDiscoveryInvoker方法
public <T> ClusterInvoker<T> getServiceDiscoveryInvoker(Cluster cluster, Registry registry, Class<T> type, URL url) {
//注册中心Registry类型对象获取 ,
//这里获取到的是ListenerRegistryWrapper 类型,其中包装了ServiceDiscoveryRegistry类型
registry = getRegistry(super.getRegistryUrl(url));
//服务发现注册目录对象创建 这里具体逻辑就不说了
DynamicDirectory<T> directory = new ServiceDiscoveryRegistryDirectory<>(type, url);
//开始创建invoker
return doCreateInvoker(directory, cluster, registry, type);
}
调用InterfaceCompatibleRegistryProtocol类型的父类型RegistryProtocol的doCreateInvoker方法:
下面这个代码接口级服务发现逻辑我中我们已经说过了代码是一样的RegistryProtocol的doCreateInvoker方法:
大部分逻辑是一样的 唯一有两个对象是不同的:
- 应用级:registry类型服务发现的类型ServiceDiscoveryRegistry 接口级为:ZookeeperRegistry类型
- 应用级:ServiceDiscoveryRegistryDirectory 接口级为:RegistryDirectory 类型
protected <T> ClusterInvoker<T> doCreateInvoker(DynamicDirectory<T> directory, Cluster cluster, Registry registry, Class<T> type) {
directory.setRegistry(registry);
directory.setProtocol(protocol);
// all attributes of REFER_KEY
Map<String, String> parameters = new HashMap<>(directory.getConsumerUrl().getParameters());
URL urlToRegistry = new ServiceConfigURL(
parameters.get(PROTOCOL_KEY) == null ? CONSUMER : parameters.get(PROTOCOL_KEY),
parameters.remove(REGISTER_IP_KEY),
0,
getPath(parameters, type),
parameters
);
urlToRegistry = urlToRegistry.setScopeModel(directory.getConsumerUrl().getScopeModel());
urlToRegistry = urlToRegistry.setServiceModel(directory.getConsumerUrl().getServiceModel());
if (directory.isShouldRegister()) {
directory.setRegisteredConsumerUrl(urlToRegistry);
//这一行逻辑会走到ServiceDiscoveryRegistry 不过应用级消费者是无需注册服务数据的
registry.register(directory.getRegisteredConsumerUrl());
}
directory.buildRouterChain(urlToRegistry);
//发起订阅
directory.subscribe(toSubscribeUrl(urlToRegistry));
return (ClusterInvoker<T>) cluster.join(directory, true);
}
ServiceDiscoveryRegistry类型的subscribe方法:
public void subscribe(URL url) {
if (moduleModel.getModelEnvironment().getConfiguration().convert(Boolean.class, Constants.ENABLE_CONFIGURATION_LISTEN, true)) {
enableConfigurationListen = true;
//为ConsumerConfigurationListener类型中的listeners列表添加监听器: 监听器类型为ServiceDiscoveryRegistryDirectory
getConsumerConfigurationListener(moduleModel).addNotifyListener(this);
referenceConfigurationListener = new ReferenceConfigurationListener(this.moduleModel, this, url);
} else {
enableConfigurationListen = false;
}
//调用父类类型DynamicDirectory的订阅方法subscribe 开始开启订阅逻辑 这个逻辑与接口级的逻辑是一样的
super.subscribe(url);
}
//调用父类类型DynamicDirectory的订阅方法subscribe 开始开启订阅逻辑 这个逻辑与接口级的逻辑是一样的
super.subscribe(url);
为了理解起来更直观我重复贴一下代码来重新看一遍:
DynamicDirectory的订阅方法subscribe方法如下:
public void subscribe(URL url) {
setSubscribeUrl(url);
//这里先走ListenerRegistryWrapper的subscribe逻辑再走包装的ServiceDiscoveryRegistry类型的逻辑
registry.subscribe(url, this);
}
接下来的subscribe会先走ListenerRegistryWrapper的subscribe逻辑再走包装的ServiceDiscoveryRegistry类型的逻辑
接下来直接看下核心的一些代码:
ListenerRegistryWrapper类型的subscribe方法
public void subscribe(URL url, NotifyListener listener) {
try {
if (registry != null) {
registry.subscribe(url, listener);
}
} finally {
if (CollectionUtils.isNotEmpty(listeners)) {
RuntimeException exception = null;
for (RegistryServiceListener registryListener : listeners) {
if (registryListener != null) {
try {
registryListener.onSubscribe(url, registry);
} catch (RuntimeException t) {
logger.error(t.getMessage(), t);
exception = t;
}
}
}
if (exception != null) {
throw exception;
}
}
}
}
ServiceDiscoveryRegistry类型的subscribe方法:
public final void subscribe(URL url, NotifyListener listener) {
//前面是否注册shouldRegister为false这里是是否订阅shouldSubscribe方法结果为true
if (!shouldSubscribe(url)) { // Should Not Subscribe
return;
}
//执行订阅逻辑
doSubscribe(url, listener);
}
从这里开始要进入应用级服务订阅的路基了继续往下看
ServiceDiscoveryRegistry类型的doSubscribe方法:
public void doSubscribe(URL url, NotifyListener listener) {
url = addRegistryClusterKey(url);
//服务发现类型为ZookeeperServiceDiscovery
serviceDiscovery.subscribe(url, listener);
boolean check = url.getParameter(CHECK_KEY, false);
String key = ServiceNameMapping.buildMappingKey(url);
//应用级服务发现悲观锁先加上一把
Lock mappingLock = serviceNameMapping.getMappingLock(key);
try {
mappingLock.lock();
Set<String> subscribedServices = serviceNameMapping.getCachedMapping(url);
try {
MappingListener mappingListener = new DefaultMappingListener(url, subscribedServices, listener);
//注意注意这行代码超级重要 当前是服务接口要找到服务的应用名字 将会查询映射信息对应节点:
///dubbo/mapping/link.elastic.dubbo.entity.DemoService
//这里最终获取到的应用服务提供者名字集合为dubbo-demo-api-provider
subscribedServices = serviceNameMapping.getAndListen(this.getUrl(), url, mappingListener);
mappingListeners.put(url.getProtocolServiceKey(), mappingListener);
} catch (Exception e) {
logger.warn("Cannot find app mapping for service " + url.getServiceInterface() + ", will not migrate.", e);
}
if (CollectionUtils.isEmpty(subscribedServices)) {
logger.info("No interface-apps mapping found in local cache, stop subscribing, will automatically wait for mapping listener callback: " + url);
// if (check) {
// throw new IllegalStateException("Should has at least one way to know which services this interface belongs to, subscription url: " + url);
// }
return;
}
//执行订阅url的逻辑
subscribeURLs(url, listener, subscribedServices);
} finally {
mappingLock.unlock();
}
}
接下来看
服务发现类型为ZookeeperServiceDiscovery的subscribe方法,不过这里封装在父类型里面了,调用其父类型AbstractServiceDiscovery的subscribe方法
先看父类型AbstractServiceDiscovery的subscribe方法
public void subscribe(URL url, NotifyListener listener) {
//这个metadataInfo类型为MetadataInfo
metadataInfo.addSubscribedURL(url);
}
MetadataInfo类型的addSubscribedURL方法:
public synchronized void addSubscribedURL(URL url) {
if (subscribedServiceURLs == null) {
subscribedServiceURLs = new ConcurrentSkipListMap<>();
}
//下面将其url添加到subscribedServiceURLs成员变量里面就结束了
addURL(subscribedServiceURLs, url);
}
前面服务发现的subscribe并没有做什么逻辑性质的操作仅仅是将url放到了成员变量里面,接下来继续看ServiceDiscoveryRegistry类型的doSubscribe方法:
通过服务接口信息查询应用名字 对应注册中心路径为:dubbo/mapping/link.elastic.dubbo.entity.DemoService
对应代码MetadataServiceNameMapping类型的getAndListen方法:
下面这个代码比较长,我们重点看一行代码 mappingServices = (new AsyncMappingTask(listener, subscribedURL, false)).call();
public Set<String> getAndListen(URL registryURL, URL subscribedURL, MappingListener listener) {
String key = ServiceNameMapping.buildMappingKey(subscribedURL);
// use previously cached services.
Set<String> mappingServices = this.getCachedMapping(key);
// Asynchronously register listener in case previous cache does not exist or cache expired.
if (CollectionUtils.isEmpty(mappingServices)) {
try {
logger.info("Local cache mapping is empty");
//重点看这个同步调用获取注册中心的路径信息 call方法在父类型AbstractServiceNameMapping中
mappingServices = (new AsyncMappingTask(listener, subscribedURL, false)).call();
} catch (Exception e) {
// ignore
}
if (CollectionUtils.isEmpty(mappingServices)) {
String registryServices = registryURL.getParameter(SUBSCRIBED_SERVICE_NAMES_KEY);
if (StringUtils.isNotEmpty(registryServices)) {
logger.info(subscribedURL.getServiceInterface() + " mapping to " + registryServices + " instructed by registry subscribed-services.");
mappingServices = parseServices(registryServices);
}
}
if (CollectionUtils.isNotEmpty(mappingServices)) {
this.putCachedMapping(key, mappingServices);
}
} else {
ExecutorService executorService = applicationModel.getFrameworkModel().getBeanFactory()
.getBean(FrameworkExecutorRepository.class).getMappingRefreshingExecutor();
executorService.submit(new AsyncMappingTask(listener, subscribedURL, true));
}
return mappingServices;
}
接下来看MetadataServiceNameMapping类型的父类型AbstractServiceNameMapping的call方法
同样道理这里主要关注getAndListen方法即可
public Set<String> call() throws Exception {
synchronized (mappingListeners) {
Set<String> mappedServices = emptySet();
try {
//这个缓存的key与服务接口和分组有关这里我没配置分组那就只有接口了 key为link.elastic.dubbo.entity.DemoService
String mappingKey = ServiceNameMapping.buildMappingKey(subscribedURL);
if (listener != null) {
//这里获取到的应用名字为:dubbo-demo-api-provider
mappedServices = toTreeSet(getAndListen(subscribedURL, listener));
Set<MappingListener> listeners = mappingListeners.computeIfAbsent(mappingKey, _k -> new HashSet<>());
listeners.add(listener);
if (CollectionUtils.isNotEmpty(mappedServices)) {
if (notifyAtFirstTime) {
// guarantee at-least-once notification no matter what kind of underlying meta server is used.
// listener notification will also cause updating of mapping cache.
listener.onEvent(new MappingChangedEvent(mappingKey, mappedServices));
}
}
} else {
mappedServices = get(subscribedURL);
if (CollectionUtils.isNotEmpty(mappedServices)) {
AbstractServiceNameMapping.this.putCachedMapping(mappingKey, mappedServices);
}
}
} catch (Exception e) {
logger.error("Failed getting mapping info from remote center. ", e);
}
return mappedServices;
}
}
}
然后看AbstractServiceNameMapping的getAndListen方法
public Set<String> getAndListen(URL url, MappingListener mappingListener) {
String serviceInterface = url.getServiceInterface();
// randomly pick one metadata report is ok for it's guaranteed all metadata report will have the same mapping data.
String registryCluster = getRegistryCluster(url);
MetadataReport metadataReport = metadataReportInstance.getMetadataReport(registryCluster);
if (metadataReport == null) {
return Collections.emptySet();
}
return metadataReport.getServiceAppMapping(serviceInterface, mappingListener, url);
}
ZookeeperMetadataReport类型的getServiceAppMapping方法:
public Set<String> getServiceAppMapping(String serviceKey, MappingListener listener, URL url) {
String path = buildPathKey(DEFAULT_MAPPING_GROUP, serviceKey);
MappingDataListener mappingDataListener = casListenerMap.computeIfAbsent(path, _k -> {
MappingDataListener newMappingListener = new MappingDataListener(serviceKey, path);
zkClient.addDataListener(path, newMappingListener);
return newMappingListener;
});
mappingDataListener.addListener(listener);
//这个拼装后的路径为:/dubbo/mapping/link.elastic.dubbo.entity.DemoService
//这里获取到的应用名字为:dubbo-demo-api-provider
return getAppNames(zkClient.getContent(path));
}
有了应用名字开始订阅服务提供者
ServiceDiscoveryRegistry类型的subscribeURLs方法
代码如下所示:
protected void subscribeURLs(URL url, NotifyListener listener, Set<String> serviceNames) {
serviceNames = toTreeSet(serviceNames);
String serviceNamesKey = toStringKeys(serviceNames);
String protocolServiceKey = url.getProtocolServiceKey();
logger.info(String.format("Trying to subscribe from apps %s for service key %s, ", serviceNamesKey, protocolServiceKey));
// register ServiceInstancesChangedListener
Lock appSubscriptionLock = getAppSubscription(serviceNamesKey);
try {
//再来一把url订阅的悲观锁
appSubscriptionLock.lock();
ServiceInstancesChangedListener serviceInstancesChangedListener = serviceListeners.get(serviceNamesKey);
if (serviceInstancesChangedListener == null) {
serviceInstancesChangedListener = serviceDiscovery.createListener(serviceNames);
serviceInstancesChangedListener.setUrl(url);
//这个应用名字为:dubbo-demo-api-provider
for (String serviceName : serviceNames) {
//这个代码调用的是curator 框架的方法 通过应用名字节点查询节点下面的所有服务提供者的应用信息待会截图看
List<ServiceInstance> serviceInstances = serviceDiscovery.getInstances(serviceName);
if (CollectionUtils.isNotEmpty(serviceInstances)) {
//发现了存在服务提供者则触发监听器开始进行应用发现通知
serviceInstancesChangedListener.onEvent(new ServiceInstancesChangedEvent(serviceName, serviceInstances));
}
}
serviceListeners.put(serviceNamesKey, serviceInstancesChangedListener);
}
if (!serviceInstancesChangedListener.isDestroyed()) {
serviceInstancesChangedListener.setUrl(url);
listener.addServiceListener(serviceInstancesChangedListener);
serviceInstancesChangedListener.addListenerAndNotify(protocolServiceKey, listener);
serviceDiscovery.addServiceInstancesChangedListener(serviceInstancesChangedListener);
} else {
logger.info(String.format("Listener of %s has been destroyed by another thread.", serviceNamesKey));
serviceListeners.remove(serviceNamesKey);
}
} finally {
appSubscriptionLock.unlock();
}
}
查询到的应用信息:
DefaultServiceInstance{
serviceName='dubbo-demo-api-provider',
host='192.168.1.169', port=20880,
enabled=true, healthy=true,
metadata={
dubbo.endpoints=[{"port":20880,"protocol":"dubbo"}],
dubbo.metadata-service.url-params={"connections":"1","version":"1.0.0","dubbo":"2.0.2","release":"3.0.10","side":"provider","port":"20880","protocol":"dubbo"},
dubbo.metadata.revision=af365a420dba83941ddf2087b998a1d2,
dubbo.metadata.storage-type=local, timestamp=1661078418865}}
ServiceInstancesChangedListener类型的时间通知方法onEvent
public void onEvent(ServiceInstancesChangedEvent event) {
if (destroyed.get() || !accept(event) || isRetryAndExpired(event)) {
return;
}
doOnEvent(event);
}
继续看ServiceInstancesChangedListener类型的时间通知方法的doOnEvent
private synchronized void doOnEvent(ServiceInstancesChangedEvent event) {
if (destroyed.get() || !accept(event) || isRetryAndExpired(event)) {
return;
}
//刷新内存数据
refreshInstance(event);
if (logger.isDebugEnabled()) {
logger.debug(event.getServiceInstances().toString());
}
Map<String, List<ServiceInstance>> revisionToInstances = new HashMap<>();
Map<String, Map<String, Set<String>>> localServiceToRevisions = new HashMap<>();
// grouping all instances of this app(service name) by revision
//刷新内存数据
for (Map.Entry<String, List<ServiceInstance>> entry : allInstances.entrySet()) {
List<ServiceInstance> instances = entry.getValue();
for (ServiceInstance instance : instances) {
String revision = getExportedServicesRevision(instance);
if (revision == null || EMPTY_REVISION.equals(revision)) {
if (logger.isDebugEnabled()) {
logger.debug("Find instance without valid service metadata: " + instance.getAddress());
}
continue;
}
List<ServiceInstance> subInstances = revisionToInstances.computeIfAbsent(revision, r -> new LinkedList<>());
subInstances.add(instance);
}
}
// get MetadataInfo with revision
//重点看这里
for (Map.Entry<String, List<ServiceInstance>> entry : revisionToInstances.entrySet()) {
String revision = entry.getKey();
List<ServiceInstance> subInstances = entry.getValue();
//这里对应ZookeeperServiceDiscovery类型
MetadataInfo metadata = serviceDiscovery.getRemoteMetadata(revision, subInstances);
//解析元数据 最终结果存在localServiceToRevisions变量中 key为协议 值为服务接口与服务元数据信息
parseMetadata(revision, metadata, localServiceToRevisions);
// update metadata into each instance, in case new instance created.
//为每个实例更新其元数据信息
for (ServiceInstance tmpInstance : subInstances) {
MetadataInfo originMetadata = tmpInstance.getServiceMetadata();
if (originMetadata == null || !Objects.equals(originMetadata.getRevision(), metadata.getRevision())) {
tmpInstance.setServiceMetadata(metadata);
}
}
}
int emptyNum = hasEmptyMetadata(revisionToInstances);
if (emptyNum != 0) {// retry every 10 seconds
hasEmptyMetadata = true;
if (retryPermission.tryAcquire()) {
if (retryFuture != null && !retryFuture.isDone()) {
// cancel last retryFuture because only one retryFuture will be canceled at destroy().
retryFuture.cancel(true);
}
retryFuture = scheduler.schedule(new AddressRefreshRetryTask(retryPermission, event.getServiceName()), 10_000L, TimeUnit.MILLISECONDS);
logger.warn("Address refresh try task submitted");
}
// return if all metadata is empty, this notification will not take effect.
if (emptyNum == revisionToInstances.size()) {
logger.error("Address refresh failed because of Metadata Server failure, wait for retry or new address refresh event.");
return;
}
}
hasEmptyMetadata = false;
Map<String, Map<Set<String>, Object>> protocolRevisionsToUrls = new HashMap<>();
Map<String, Object> newServiceUrls = new HashMap<>();
for (Map.Entry<String, Map<String, Set<String>>> entry : localServiceToRevisions.entrySet()) {
String protocol = entry.getKey();
entry.getValue().forEach((protocolServiceKey, revisions) -> {
Map<Set<String>, Object> revisionsToUrls = protocolRevisionsToUrls.computeIfAbsent(protocol, k -> new HashMap<>());
Object urls = revisionsToUrls.get(revisions);
if (urls == null) {
urls = getServiceUrlsCache(revisionToInstances, revisions, protocol);
revisionsToUrls.put(revisions, urls);
}
newServiceUrls.put(protocolServiceKey, urls);
});
}
this.serviceUrls = newServiceUrls;
this.notifyAddressChanged();
}
ZookeeperServiceDiscovery类型的getRemoteMetadata方法
public MetadataInfo getRemoteMetadata(String revision, List<ServiceInstance> instances) {
MetadataInfo metadata = metaCacheManager.get(revision);
//缓存的元数据为空将从元数据中心远端获取
if (metadata != null && metadata != MetadataInfo.EMPTY) {
metadata.init();
// metadata loaded from cache
if (logger.isDebugEnabled()) {
logger.debug("MetadataInfo for revision=" + revision + ", " + metadata);
}
return metadata;
}
//这里将从元数据中心获取数据
synchronized (metaCacheManager) {
// try to load metadata from remote.
int triedTimes = 0;
//失败则重试3次
while (triedTimes < 3) {
//根据版本号查询元数据 发起一次RPC请求
metadata = MetadataUtils.getRemoteMetadata(revision, instances, metadataReport);
if (metadata != MetadataInfo.EMPTY) {// succeeded
//前面RPC请求元数据成功接下来开始初始化
metadata.init();
break;
} else {// failed
if (triedTimes > 0) {
if (logger.isDebugEnabled()) {
logger.debug("Retry the " + triedTimes + " times to get metadata for revision=" + revision);
}
}
triedTimes++;
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
}
}
}
if (metadata == MetadataInfo.EMPTY) {
logger.error("Failed to get metadata for revision after 3 retries, revision=" + revision);
} else {
//缓存查询到的元数据到元数据缓存管理器中
metaCacheManager.put(revision, metadata);
}
}
return metadata;
}
MetadataUtils类型的getRemoteMetadata方法:
public static MetadataInfo getRemoteMetadata(String revision, List<ServiceInstance> instances, MetadataReport metadataReport) {
//随机轮训一台主机查询它的应用实例信息
ServiceInstance instance = selectInstance(instances);
//元数据提供者存储的位置默认为本地存储local 消费者从提供者那里拿,
//remote - Provider 把 metadata 放到远端注册中心,Consumer 从注册中心获取;
//local - Provider 把 metadata 放在本地,Consumer 从 Provider 处直接获取;
//这个配置可以看链接:https://dubbo.apache.org/zh/docs3-v2/java-sdk/reference-manual/config/properties/
String metadataType = ServiceInstanceMetadataUtils.getMetadataStorageType(instance);
MetadataInfo metadataInfo;
try {
if (logger.isDebugEnabled()) {
logger.debug("Instance " + instance.getAddress() + " is using metadata type " + metadataType);
}
//remote - Provider 把 metadata 放到远端注册中心,Consumer 从注册中心获取;
//local - Provider 把 metadata 放在本地,Consumer 从 Provider 处直接获取;
if (REMOTE_METADATA_STORAGE_TYPE.equals(metadataType)) {
//这里走的是remote配置
metadataInfo = MetadataUtils.getMetadata(revision, instance, metadataReport);
} else {
// change the instance used to communicate to avoid all requests route to the same instance
ProxyHolder proxyHolder = null;
try {
//手动调用服务提供者内置的MetadataService Dubbo服务
proxyHolder = MetadataUtils.referProxy(instance);
//发起RPC调用 调用提供者的提供的元数据请求RPC接口
metadataInfo = proxyHolder.getProxy().getMetadataInfo(ServiceInstanceMetadataUtils.getExportedServicesRevision(instance));
} finally {
MetadataUtils.destroyProxy(proxyHolder);
}
}
} catch (Exception e) {
logger.error("Failed to get app metadata for revision " + revision + " for type " + metadataType + " from instance " + instance.getAddress(), e);
metadataInfo = null;
}
if (metadataInfo == null) {
metadataInfo = MetadataInfo.EMPTY;
}
return metadataInfo;
}
MetadataUtils类型的referProxy
public static ProxyHolder referProxy(ServiceInstance instance) {
MetadataServiceURLBuilder builder;
ExtensionLoader<MetadataServiceURLBuilder> loader = instance.getApplicationModel()
.getExtensionLoader(MetadataServiceURLBuilder.class);
Map<String, String> metadata = instance.getMetadata();
// METADATA_SERVICE_URLS_PROPERTY_NAME is a unique key exists only on instances of spring-cloud-alibaba.
String dubboUrlsForJson = metadata.get(METADATA_SERVICE_URLS_PROPERTY_NAME);
//
if (metadata.isEmpty() || StringUtils.isEmpty(dubboUrlsForJson)) {
builder = loader.getExtension(StandardMetadataServiceURLBuilder.NAME);
} else {
builder = loader.getExtension(SpringCloudMetadataServiceURLBuilder.NAME);
}
//默认的builder类型为StandardMetadataServiceURLBuilder 将远数据对象转url配置
//例如:dubbo://192.168.1.169:20880/org.apache.dubbo.metadata.MetadataService?connections=1&corethreads=2&dubbo=2.0.2&group=dubbo-demo-api-provider&port=20880&protocol=dubbo&release=3.0.10&retries=0&side=provider&threadpool=cached&threads=100&timeout=5000&version=1.0.0
List<URL> urls = builder.build(instance);
if (CollectionUtils.isEmpty(urls)) {
throw new IllegalStateException("Introspection service discovery mode is enabled "
+ instance + ", but no metadata service can build from it.");
}
URL url = urls.get(0);
// Simply rely on the first metadata url, as stated in MetadataServiceURLBuilder.
ApplicationModel applicationModel = instance.getApplicationModel();
ModuleModel internalModel = applicationModel.getInternalModule();
ConsumerModel consumerModel = applicationModel.getInternalModule().registerInternalConsumer(MetadataService.class, url);
Protocol protocol = applicationModel.getExtensionLoader(Protocol.class).getAdaptiveExtension();
url.setServiceModel(consumerModel);
//!!!! 重点看这一行与普通的服务一样这里默认也是使用DubboProtocol来引用元数据服务
Invoker<MetadataService> invoker = protocol.refer(MetadataService.class, url);
ProxyFactory proxyFactory = applicationModel.getExtensionLoader(ProxyFactory.class).getAdaptiveExtension();
//为其将要调用的invoker生成对应代理对象
MetadataService metadataService = proxyFactory.getProxy(invoker);
consumerModel.getServiceMetadata().setTarget(metadataService);
consumerModel.getServiceMetadata().addAttribute(PROXY_CLASS_REF, metadataService);
consumerModel.setProxyObject(metadataService);
consumerModel.initMethodModels();
return new ProxyHolder(consumerModel, metadataService, internalModel);
}
metadata的init方法初始化从提供者那里获取到的元数据
public void init() {
if (!initiated.compareAndSet(false, true)) {
return;
}
if (CollectionUtils.isNotEmptyMap(services)) {
//遍历所有的服务信息然后初始化 ServiceInfo
services.forEach((_k, serviceInfo) -> {
serviceInfo.init();
// create duplicate serviceKey(without protocol)->serviceInfo mapping to support metadata search when protocol is not specified on consumer side.
if (subscribedServices == null) {
subscribedServices = new HashMap<>();
}
Set<ServiceInfo> serviceInfos = subscribedServices.computeIfAbsent(serviceInfo.getServiceKey(), _key -> new HashSet<>());
serviceInfos.add(serviceInfo);
});
}
}
ServiceInfo类型的init方法
protected void init() {
//初始化matchKey变量 格式为:service + group + version + protocol
buildMatchKey();
//初始化服务keyserviceKey 格式为:service + group + version
buildServiceKey(name, group, version);
// init method params
//初始化与方法匹配的参数
this.methodParams = URLParam.initMethodParameters(params);
// Actually, consumer params is empty after deserialized on the consumer side, so no need to initialize.
// Check how InstanceAddressURL operates on consumer url for more detail.
// this.consumerMethodParams = URLParam.initMethodParameters(consumerParams);
// no need to init numbers for it's only for cache purpose
}
ServiceDiscoveryRegistryDirectory接收订阅到的服务的通知方法:
public synchronized void notify(List<URL> instanceUrls) {
if (isDestroyed()) {
return;
}
// Set the context of the address notification thread.
RpcServiceContext.getServiceContext().setConsumerUrl(getConsumerUrl());
// 3.x added for extend URL address
ExtensionLoader<AddressListener> addressListenerExtensionLoader = getUrl().getOrDefaultModuleModel().getExtensionLoader(AddressListener.class);
List<AddressListener> supportedListeners = addressListenerExtensionLoader.getActivateExtension(getUrl(), (String[]) null);
if (supportedListeners != null && !supportedListeners.isEmpty()) {
for (AddressListener addressListener : supportedListeners) {
instanceUrls = addressListener.notify(instanceUrls, getConsumerUrl(), this);
}
}
refreshOverrideAndInvoker(instanceUrls);
}
private synchronized void refreshOverrideAndInvoker(List<URL> instanceUrls) {
// mock zookeeper://xxx?mock=return null
refreshInvoker(instanceUrls);
}
private void refreshInvoker(List<URL> invokerUrls) {
Assert.notNull(invokerUrls, "invokerUrls should not be null, use EMPTY url to clear current addresses.");
this.originalUrls = invokerUrls;
if (invokerUrls.size() == 1 && EMPTY_PROTOCOL.equals(invokerUrls.get(0).getProtocol())) {
logger.warn("Received url with EMPTY protocol, will clear all available addresses.");
this.forbidden = true; // Forbid to access
routerChain.setInvokers(BitList.emptyList());
destroyAllInvokers(); // Close all invokers
} else {
this.forbidden = false; // Allow accessing
if (CollectionUtils.isEmpty(invokerUrls)) {
logger.warn("Received empty url list, will ignore for protection purpose.");
return;
}
// use local reference to avoid NPE as this.urlInvokerMap will be set null concurrently at destroyAllInvokers().
Map<String, Invoker<T>> localUrlInvokerMap = this.urlInvokerMap;
// can't use local reference as oldUrlInvokerMap's mappings might be removed directly at toInvokers().
Map<String, Invoker<T>> oldUrlInvokerMap = null;
if (localUrlInvokerMap != null) {
// the initial capacity should be set greater than the maximum number of entries divided by the load factor to avoid resizing.
oldUrlInvokerMap = new LinkedHashMap<>(Math.round(1 + localUrlInvokerMap.size() / DEFAULT_HASHMAP_LOAD_FACTOR));
localUrlInvokerMap.forEach(oldUrlInvokerMap::put);
}
//主要这一行做一些协议的过滤与实例禁用数据的过滤得到最终结果需要的调用器
Map<String, Invoker<T>> newUrlInvokerMap = toInvokers(oldUrlInvokerMap, invokerUrls);// Translate url list to Invoker map
logger.info("Refreshed invoker size " + newUrlInvokerMap.size());
if (CollectionUtils.isEmptyMap(newUrlInvokerMap)) {
logger.error(new IllegalStateException("Cannot create invokers from url address list (total " + invokerUrls.size() + ")"));
return;
}
List<Invoker<T>> newInvokers = Collections.unmodifiableList(new ArrayList<>(newUrlInvokerMap.values()));
this.setInvokers(multiGroup ? new BitList<>(toMergeInvokerList(newInvokers)) : new BitList<>(newInvokers));
// pre-route and build cache
routerChain.setInvokers(this.getInvokers());
this.urlInvokerMap = newUrlInvokerMap;
if (oldUrlInvokerMap != null) {
try {
destroyUnusedInvokers(oldUrlInvokerMap, newUrlInvokerMap); // Close the unused Invoker
} catch (Exception e) {
logger.warn("destroyUnusedInvokers error. ", e);
}
}
}
// notify invokers refreshed
this.invokersChanged();
}
private Map<String, Invoker<T>> toInvokers(Map<String, Invoker<T>> oldUrlInvokerMap, List<URL> urls) {
Map<String, Invoker<T>> newUrlInvokerMap = new ConcurrentHashMap<>(urls == null ? 1 : (int) (urls.size() / 0.75f + 1));
if (urls == null || urls.isEmpty()) {
return newUrlInvokerMap;
}
for (URL url : urls) {
InstanceAddressURL instanceAddressURL = (InstanceAddressURL) url;
if (EMPTY_PROTOCOL.equals(instanceAddressURL.getProtocol())) {
continue;
}
if (!getUrl().getOrDefaultFrameworkModel().getExtensionLoader(Protocol.class).hasExtension(instanceAddressURL.getProtocol())) {
logger.error(new IllegalStateException("Unsupported protocol " + instanceAddressURL.getProtocol() +
" in notified url: " + instanceAddressURL + " from registry " + getUrl().getAddress() +
" to consumer " + NetUtils.getLocalHost() + ", supported protocol: " +
getUrl().getOrDefaultFrameworkModel().getExtensionLoader(Protocol.class).getSupportedExtensions()));
continue;
}
instanceAddressURL.setProviderFirstParams(providerFirstParams);
// Override provider urls if needed
if (enableConfigurationListen) {
instanceAddressURL = overrideWithConfigurator(instanceAddressURL);
}
Invoker<T> invoker = oldUrlInvokerMap == null ? null : oldUrlInvokerMap.get(instanceAddressURL.getAddress());
if (invoker == null || urlChanged(invoker, instanceAddressURL)) { // Not in the cache, refer again
try {
boolean enabled = true;
if (instanceAddressURL.hasParameter(DISABLED_KEY)) {
enabled = !instanceAddressURL.getParameter(DISABLED_KEY, false);
} else {
enabled = instanceAddressURL.getParameter(ENABLED_KEY, true);
}
if (enabled) {
invoker = protocol.refer(serviceType, instanceAddressURL);
}
} catch (Throwable t) {
logger.error("Failed to refer invoker for interface:" + serviceType + ",url:(" + instanceAddressURL + ")" + t.getMessage(), t);
}
if (invoker != null) { // Put new invoker in cache
newUrlInvokerMap.put(instanceAddressURL.getAddress(), invoker);
}
} else {
newUrlInvokerMap.put(instanceAddressURL.getAddress(), invoker);
oldUrlInvokerMap.remove(instanceAddressURL.getAddress(), invoker);
}
}
return newUrlInvokerMap;
}
Dubbo27, Dubbo327, Dubbo3源码解析27