26-RPC调用过程
这就要回到消费者的调用代码了,主要代码如下:
下面是我写的消费者样例代码中的一块代码:
// DemoService is an interface; the actual object is created internally by Dubbo through dynamic
// proxying (its runtime type is DemoServiceDubboProxy1 in this walkthrough).
DemoService demoService = bootstrap.getCache().get(reference);
// The consumer calls the provider through the proxy.
String message = demoService.sayHello("dubbo");
DemoServiceDubboProxy1中的sayHello代理方法是动态生成的,我们看不到它的源码,这里直接看该代理方法所调用的方法,如下:
InvokerInvocationHandler类型的invoke
参数:
- proxy DemoServiceDubboProxy1
- method sayHello方法对应的Java反射元数据Method
- args 方法调用的实参数组,这里只有一个参数,值为dubbo
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { if (method.getDeclaringClass() == Object.class) { return method.invoke(invoker, args); } //方法名字为sayHello String methodName = method.getName(); //参数类型只有一个为java.lang.String Class<?>[] parameterTypes = method.getParameterTypes(); if (parameterTypes.length == 0) { if ("toString".equals(methodName)) { return invoker.toString(); } else if ("$destroy".equals(methodName)) { invoker.destroy(); return null; } else if ("hashCode".equals(methodName)) { return invoker.hashCode(); } } else if (parameterTypes.length == 1 && "equals".equals(methodName)) { return invoker.equals(args[0]); } //这个RpcInvocation比较重要主要是封装调用方法的一些元数据信息 RpcInvocation rpcInvocation = new RpcInvocation(serviceModel, method.getName(), invoker.getInterface().getName(), protocolServiceKey, method.getParameterTypes(), args); if (serviceModel instanceof ConsumerModel) { rpcInvocation.put(Constants.CONSUMER_MODEL, serviceModel); rpcInvocation.put(Constants.METHOD_MODEL, ((ConsumerModel) serviceModel).getMethodModel(method)); } //invoker为调用器我们继续看 return InvocationUtil.invoke(invoker, rpcInvocation); }
继续看方法 InvocationUtil.invoke(invoker, rpcInvocation);
InvocationUtil的invoke方法
/**
 * Sends a prepared invocation into the invoker chain and returns the recreated result value.
 * <p>
 * The consumer URL and service key are recorded first. When the simple profiler is enabled the
 * call is wrapped in a profiler entry, and after the call a warning is logged if the elapsed time,
 * scaled by {@code ProfilerSwitch.getWarnPercent()}, exceeds the effective timeout.
 *
 * @param invoker       head of the invoker chain (a MigrationInvoker in the walkthrough)
 * @param rpcInvocation the invocation to dispatch; it is updated with the target service name
 *                      and, when profiling, the profiler entry
 * @return the value obtained from {@code Result.recreate()}
 * @throws Throwable whatever the chain or {@code recreate()} throws
 */
public static Object invoke(Invoker<?> invoker, RpcInvocation rpcInvocation) throws Throwable {
    // Sample URL: consumer://192.168.1.169/link.elastic.dubbo.entity.DemoService?application=dubbo-demo-api-consumer&background=false&dubbo=2.0.2&interface=link.elastic.dubbo.entity.DemoService&methods=sayHello,sayHelloAsync&pid=99368&qos.enable=false&qos.port=-1&register.ip=192.168.1.169&release=3.0.10&side=consumer&sticky=false&timestamp=1661595732778
    URL url = invoker.getUrl();
    // With no group configured the key is just the interface name: link.elastic.dubbo.entity.DemoService
    String serviceKey = url.getServiceKey();
    rpcInvocation.setTargetServiceUniqueName(serviceKey);

    // invoker.getUrl() returns consumer url.
    RpcServiceContext.getServiceContext().setConsumerUrl(url);

    // Profiling switched off: plain dispatch.
    if (!ProfilerSwitch.isEnableSimpleProfiler()) {
        return invoker.invoke(rpcInvocation).recreate();
    }

    // The current profiler entry is kept in a thread-local; it is null on the first entry.
    ProfilerEntry parentProfiler = Profiler.getBizProfiler();
    String title = "Receive request. Client invoke begin. ServiceKey: " + serviceKey
        + " MethodName:" + rpcInvocation.getMethodName();
    // A nested call is chained under the existing entry; otherwise a fresh root entry is started.
    ProfilerEntry bizProfiler = parentProfiler != null
        ? Profiler.enter(parentProfiler, title)
        : Profiler.start(title);
    rpcInvocation.put(Profiler.PROFILER_KEY, bizProfiler);

    try {
        // The first invoker in the walkthrough is a MigrationInvoker.
        return invoker.invoke(rpcInvocation).recreate();
    } finally {
        Profiler.release(bizProfiler);

        // Prefer the timeout attached to the invocation, else the method-level URL setting.
        Object timeoutKey = rpcInvocation.getObjectAttachmentWithoutConvert(TIMEOUT_KEY);
        int timeout = timeoutKey instanceof Integer
            ? (Integer) timeoutKey
            : url.getMethodPositiveParameter(rpcInvocation.getMethodName(), TIMEOUT_KEY, DEFAULT_TIMEOUT);

        long usage = bizProfiler.getEndTime() - bizProfiler.getStartTime();
        if ((usage / (1000_000L * ProfilerSwitch.getWarnPercent())) > timeout) {
            StringBuilder attachment = new StringBuilder();
            rpcInvocation.foreachAttachment((entry) -> {
                attachment.append(entry.getKey()).append("=").append(entry.getValue()).append(";\n");
            });

            logger.warn(String.format(
                "[Dubbo-Consumer] execute service %s#%s cost %d.%06d ms, this invocation almost (maybe already) timeout. Timeout: %dms\n" + "invocation context:\n%s" + "thread info: \n%s",
                rpcInvocation.getProtocolServiceKey(),
                rpcInvocation.getMethodName(),
                usage / 1000_000,
                usage % 1000_000,
                timeout,
                attachment,
                Profiler.buildDetail(bizProfiler)));
        }
    }
}
MigrationInvoker类型的invoke方法
其实前面我们已经说过是如何决策应用级还是接口级的调用,默认走应用级,下面直接看应用级相关的invoker链路:
/**
 * Routes the invocation to the interface-level or application-level invoker according to the
 * current migration step.
 * <p>
 * If no invoker has been chosen yet, one is picked from {@code step} and cached in
 * {@code currentAvailableInvoker}. Once one is cached, {@code APPLICATION_FIRST} re-decides on
 * every call and may fall back to the interface-mode {@code invoker} based on a random draw
 * against {@code promotion}.
 *
 * @param invocation the invocation to dispatch
 * @return the result of the selected invoker
 * @throws RpcException propagated from the selected invoker
 */
public Result invoke(Invocation invocation) throws RpcException {
    if (currentAvailableInvoker == null) {
        // Nothing cached yet: choose by migration step.
        switch (step) {
            case APPLICATION_FIRST:
                currentAvailableInvoker = decideInvoker();
                break;
            case FORCE_APPLICATION:
                currentAvailableInvoker = serviceDiscoveryInvoker;
                break;
            case FORCE_INTERFACE:
            default:
                currentAvailableInvoker = invoker;
        }
        return currentAvailableInvoker.invoke(invocation);
    }

    if (step != APPLICATION_FIRST) {
        return currentAvailableInvoker.invoke(invocation);
    }

    // call ratio calculation based on random value
    boolean fallBackToInterface =
        promotion < 100 && ThreadLocalRandom.current().nextDouble(100) > promotion;
    if (fallBackToInterface) {
        // fall back to interface mode
        return invoker.invoke(invocation);
    }
    // check if invoker available for each time
    return decideInvoker().invoke(invocation);
}
接下来要走的Invoker逻辑是带有容错逻辑的MockClusterInvoker的invoke方法
MockClusterInvoker类型的invoke方法
/**
 * Invokes the wrapped invoker, applying the mock configured for the method if there is one.
 * <p>
 * Three modes, decided by the method-level {@code MOCK_KEY} parameter (default "false"):
 * no mock (plain delegation), force mock (the mock is called directly), and fail mock (the mock
 * is called only after a non-business {@code RpcException}).
 *
 * @param invocation the invocation to dispatch
 * @return the real result, or the mock result when mocking applies
 * @throws RpcException business exceptions are always rethrown; others only if mocking fails
 */
public Result invoke(Invocation invocation) throws RpcException {
    // Per the walkthrough, mocking is not enabled by default.
    String mockSetting = getUrl().getMethodParameter(invocation.getMethodName(), MOCK_KEY, Boolean.FALSE.toString()).trim();

    //no mock
    if (ConfigUtils.isEmpty(mockSetting)) {
        return this.invoker.invoke(invocation);
    }

    //force:direct mock
    if (mockSetting.startsWith(FORCE_KEY)) {
        if (logger.isWarnEnabled()) {
            logger.warn("force-mock: " + invocation.getMethodName() + " force-mock enabled , url : " + getUrl());
        }
        return doMockInvoke(invocation, null);
    }

    //fail-mock
    try {
        Result realResult = this.invoker.invoke(invocation);

        //fix:#4585
        if (realResult.getException() != null && realResult.getException() instanceof RpcException) {
            RpcException rpcException = (RpcException) realResult.getException();
            if (rpcException.isBiz()) {
                throw rpcException;
            }
            // Stays inside the try so an RpcException from the mock is handled by the catch below.
            return doMockInvoke(invocation, rpcException);
        }
        return realResult;
    } catch (RpcException e) {
        if (e.isBiz()) {
            throw e;
        }
        if (logger.isWarnEnabled()) {
            logger.warn("fail-mock: " + invocation.getMethodName() + " fail-mock enabled , url : " + getUrl(), e);
        }
        return doMockInvoke(invocation, e);
    }
}
````
接下来要走的逻辑是默认类AbstractCluster的invoke方法
```java
/**
 * Forwards the invocation to {@code filterInvoker}, the next link in the consumer-side chain.
 *
 * @param invocation the invocation to dispatch
 * @return the result returned by {@code filterInvoker}
 * @throws RpcException propagated from {@code filterInvoker}
 */
public Result invoke(Invocation invocation) throws RpcException {
return filterInvoker.invoke(invocation);
}
接下来要走的逻辑是带有回调通知的链路:
CallbackRegistrationInvoker类型的invoke方法
/**
 * Invokes the filter chain and registers a completion callback that notifies filter listeners.
 * <p>
 * When the result completes, the filters are visited from the last to the first. A
 * {@code ListenableFilter} supplies a per-invocation listener, which is removed afterwards in a
 * {@code finally}; a filter that itself implements {@code FILTER.Listener} is notified directly.
 * {@code onResponse} is called when the callback received no throwable, {@code onError} otherwise.
 *
 * @param invocation the invocation to dispatch
 * @return the result object returned by {@code filterInvoker.invoke}
 * @throws RpcException propagated from {@code filterInvoker.invoke}
 */
public Result invoke(Invocation invocation) throws RpcException {
Result asyncResult = filterInvoker.invoke(invocation);
// r is the completed result, t the throwable (null when the call completed without one).
asyncResult.whenCompleteWithContext((r, t) -> {
// Walk the filters in reverse order.
for (int i = filters.size() - 1; i >= 0; i--) {
FILTER filter = filters.get(i);
try {
// Releases one detail-profiler entry per filter visited.
InvocationProfilerUtils.releaseDetailProfiler(invocation);
if (filter instanceof ListenableFilter) {
ListenableFilter listenableFilter = ((ListenableFilter) filter);
Filter.Listener listener = listenableFilter.listener(invocation);
try {
if (listener != null) {
if (t == null) {
listener.onResponse(r, filterInvoker, invocation);
} else {
listener.onError(t, filterInvoker, invocation);
}
}
} finally {
// Always drop the per-invocation listener, even if the callback threw.
listenableFilter.removeListener(invocation);
}
} else if (filter instanceof FILTER.Listener) {
FILTER.Listener listener = (FILTER.Listener) filter;
if (t == null) {
listener.onResponse(r, filterInvoker, invocation);
} else {
listener.onError(t, filterInvoker, invocation);
}
}
} catch (Throwable filterThrowable) {
// Log which filter failed, then rethrow; the remaining filters are not visited.
LOGGER.error(String.format("Exception occurred while executing the %s filter named %s.", i, filter.getClass().getSimpleName()));
if (LOGGER.isDebugEnabled()) {
LOGGER.debug(String.format("Whole filter list is: %s", filters.stream().map(tmpFilter -> tmpFilter.getClass().getSimpleName()).collect(Collectors.toList())));
}
throw filterThrowable;
}
}
});
return asyncResult;
}
接下来要走的是过滤器的链路:
CopyOfFilterChainNode类型的invoke方法
/**
 * Runs this node's filter, passing {@code nextNode} as the invoker the filter should continue with.
 * <p>
 * A detail-profiler entry is opened before the filter runs. If the filter throws, that entry is
 * released, the filter's error listener (if any) is notified with {@code originalInvoker}, and the
 * exception is rethrown.
 *
 * @param invocation the invocation to dispatch
 * @return the result returned by the filter
 * @throws RpcException propagated from the filter
 */
public Result invoke(Invocation invocation) throws RpcException {
    try {
        InvocationProfilerUtils.enterDetailProfiler(invocation, () -> "Filter " + filter.getClass().getName() + " invoke.");
        return filter.invoke(nextNode, invocation);
    } catch (Exception e) {
        InvocationProfilerUtils.releaseDetailProfiler(invocation);

        if (filter instanceof ListenableFilter) {
            ListenableFilter listenable = ((ListenableFilter) filter);
            try {
                Filter.Listener errorListener = listenable.listener(invocation);
                if (errorListener != null) {
                    errorListener.onError(e, originalInvoker, invocation);
                }
            } finally {
                // Drop the per-invocation listener whether or not the callback succeeded.
                listenable.removeListener(invocation);
            }
        } else if (filter instanceof FILTER.Listener) {
            FILTER.Listener errorListener = (FILTER.Listener) filter;
            errorListener.onError(e, originalInvoker, invocation);
        }
        throw e;
    }
}
````
然后走第一个过滤器ConsumerContextFilter
```java
/**
 * Prepares the consumer-side RPC context and attachments, then continues the chain.
 * <p>
 * The current service context is saved on entry and restored in {@code finally}. Before the call
 * the filter records invoker, invocation and local address on the service context, copies
 * attachments onto the invocation, and short-circuits with a {@code TIMEOUT_TERMINATE} result if
 * the timeout countdown found in the client attachment has already expired.
 *
 * @param invoker    the next invoker in the chain
 * @param invocation the invocation to dispatch; several branches cast it to {@code RpcInvocation}
 * @return the downstream result, or a default async result carrying the timeout exception
 * @throws RpcException propagated from the downstream invoker
 */
public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
// Snapshot the service context so it can be put back in the finally block.
RpcContext.RestoreServiceContext originServiceContext = RpcContext.storeServiceContext();
try {
RpcContext.getServiceContext()
.setInvoker(invoker)
.setInvocation(invocation)
.setLocalAddress(NetUtils.getLocalHost(), 0);
RpcContext context = RpcContext.getClientAttachment();
// Store the application name from the invoker URL under REMOTE_APPLICATION_KEY.
context.setAttachment(REMOTE_APPLICATION_KEY, invoker.getUrl().getApplication());
if (invocation instanceof RpcInvocation) {
((RpcInvocation) invocation).setInvoker(invoker);
}
// With selectors configured, only what each selector returns is added; otherwise all
// server-attachment entries are copied onto the invocation.
// NOTE(review): the casts below are not guarded by the instanceof check above.
if (CollectionUtils.isNotEmpty(supportedSelectors)) {
for (PenetrateAttachmentSelector supportedSelector : supportedSelectors) {
Map<String, Object> selected = supportedSelector.select();
if (CollectionUtils.isNotEmptyMap(selected)) {
((RpcInvocation) invocation).addObjectAttachments(selected);
}
}
} else {
((RpcInvocation) invocation).addObjectAttachments(RpcContext.getServerAttachment().getObjectAttachments());
}
// Client attachments are added after the ones above.
Map<String, Object> contextAttachments = RpcContext.getClientAttachment().getObjectAttachments();
if (CollectionUtils.isNotEmptyMap(contextAttachments)) {
/**
* invocation.addAttachmentsIfAbsent(context){@link RpcInvocation#addAttachmentsIfAbsent(Map)}should not be used here,
* because the {@link RpcContext#setAttachment(String, String)} is passed in the Filter when the call is triggered
* by the built-in retry mechanism of the Dubbo. The attachment to update RpcContext will no longer work, which is
* a mistake in most cases (for example, through Filter to RpcContext output traceId and spanId and other information).
*/
((RpcInvocation) invocation).addObjectAttachments(contextAttachments);
}
// pass default timeout set by end user (ReferenceConfig)
Object countDown = context.getObjectAttachment(TIME_COUNTDOWN_KEY);
if (countDown != null) {
TimeoutCountDown timeoutCountDown = (TimeoutCountDown) countDown;
// No time budget left: do not make the call at all.
if (timeoutCountDown.isExpired()) {
return AsyncRpcResult.newDefaultAsyncResult(new RpcException(RpcException.TIMEOUT_TERMINATE,
"No time left for making the following call: " + invocation.getServiceName() + "."
+ invocation.getMethodName() + ", terminate directly."), invocation);
}
}
RpcContext.removeServerContext();
return invoker.invoke(invocation);
} finally {
RpcContext.restoreServiceContext(originServiceContext);
}
}
继续走过滤器逻辑FutureFilter
/**
 * Calls {@code fireInvokeCallback} for this invocation and then continues the chain.
 *
 * @param invoker    the next invoker in the chain
 * @param invocation the invocation to dispatch
 * @return the downstream result
 * @throws RpcException propagated from the downstream invoker
 */
public Result invoke(final Invoker<?> invoker, final Invocation invocation) throws RpcException {
fireInvokeCallback(invoker, invocation);
// need to configure if there's return value before the invocation in order to help invoker to judge if it's
// necessary to return future.
return invoker.invoke(invocation);
}
过滤器MonitorFilter
/**
 * Records monitoring data for the call when the invoker URL carries a {@code MONITOR_KEY}
 * attribute, then continues the chain.
 *
 * @param invoker    the next invoker in the chain
 * @param invocation the invocation to dispatch; start time and remote host are stored on it
 * @return the downstream result
 * @throws RpcException propagated from the downstream invoker
 */
public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
if (invoker.getUrl().hasAttribute(MONITOR_KEY)) {
// Remember when the call started and which remote host the service context reports.
invocation.put(MONITOR_FILTER_START_TIME, System.currentTimeMillis());
invocation.put(MONITOR_REMOTE_HOST_STORE, RpcContext.getServiceContext().getRemoteHost());
// count up
getConcurrent(invoker, invocation).incrementAndGet();
}
// proceed invocation chain
return invoker.invoke(invocation);
}
RouterSnapshotFilter过滤器
/**
 * Flags the service context for a router snapshot when all switches allow it, then continues
 * the chain.
 * <p>
 * The flag is set only if the global switch is on, info logging is enabled, and the switch is
 * enabled for this invocation's service key. The checks run in that order and stop at the first
 * one that fails.
 *
 * @param invoker    the next invoker in the chain
 * @param invocation the invocation to dispatch
 * @return the downstream result
 * @throws RpcException propagated from the downstream invoker
 */
public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
    boolean snapshotRequested = switcher.isEnable()
        && logger.isInfoEnabled()
        && switcher.isEnable(invocation.getServiceModel().getServiceKey());

    if (snapshotRequested) {
        RpcContext.getServiceContext().setNeedPrintRouterSnapshot(true);
    }
    return invoker.invoke(invocation);
}
容错模版类AbstractClusterInvoker
/**
 * Template method of the cluster invoker: lists candidate invokers, initialises the load
 * balancer and delegates to the subclass's {@code doInvoke}.
 * <p>
 * Routing and the cluster invocation are each bracketed by a detail-profiler entry.
 *
 * @param invocation the invocation to dispatch
 * @return the result of {@code doInvoke}
 * @throws RpcException propagated from the destroyed-check, routing or {@code doInvoke}
 */
public Result invoke(final Invocation invocation) throws RpcException {
checkWhetherDestroyed();
// binding attachments into invocation.
// Map<String, Object> contextAttachments = RpcContext.getClientAttachment().getObjectAttachments();
// if (contextAttachments != null && contextAttachments.size() != 0) {
// ((RpcInvocation) invocation).addObjectAttachmentsIfAbsent(contextAttachments);
// }
InvocationProfilerUtils.enterDetailProfiler(invocation, () -> "Router route.");
// Look up the provider invokers from the service directory, filtered by the routing rules.
List<Invoker<T>> invokers = list(invocation);
InvocationProfilerUtils.releaseDetailProfiler(invocation);
// Obtain the load-balance strategy (RandomLoadBalance by default, per the walkthrough).
LoadBalance loadbalance = initLoadBalance(invokers, invocation);
RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);
InvocationProfilerUtils.enterDetailProfiler(invocation, () -> "Cluster " + this.getClass().getName() + " invoke.");
try {
return doInvoke(invocation, invokers, loadbalance);
} finally {
InvocationProfilerUtils.releaseDetailProfiler(invocation);
}
}
执行具有失效转移功能的FailoverClusterInvoker类型的doInvoke方法
/**
 * Failover invocation: tries one provider after another until a call succeeds or the attempt
 * budget is used up.
 * <p>
 * Business {@code RpcException}s are rethrown immediately. Any other failure is remembered as
 * the last error and the provider address is recorded before the next attempt. If every attempt
 * fails, an {@code RpcException} summarising the tried providers is thrown.
 *
 * @param invocation  the invocation to dispatch
 * @param invokers    the candidate invokers for the first attempt
 * @param loadbalance the strategy used to pick an invoker
 * @return the first successful result
 * @throws RpcException on a business exception, or after all attempts have failed
 */
public Result doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
List<Invoker<T>> copyInvokers = invokers;
checkInvokers(copyInvokers, invocation);
String methodName = RpcUtils.getMethodName(invocation);
// Total number of attempts; the walkthrough reports 3 by default.
// NOTE(review): confirm against calculateInvokeTimes, which is not shown here.
int len = calculateInvokeTimes(methodName);
// retry loop.
RpcException le = null; // last exception.
List<Invoker<T>> invoked = new ArrayList<Invoker<T>>(copyInvokers.size()); // invoked invokers.
// Addresses of the providers whose attempt did not succeed.
Set<String> providers = new HashSet<String>(len);
for (int i = 0; i < len; i++) {
//Reselect before retry to avoid a change of candidate `invokers`.
//NOTE: if `invokers` changed, then `invoked` also lose accuracy.
if (i > 0) {
checkWhetherDestroyed();
copyInvokers = list(invocation);
// check again
checkInvokers(copyInvokers, invocation);
}
// Pick an invoker, passing the already-tried ones to the selection.
Invoker<T> invoker = select(loadbalance, invocation, copyInvokers, invoked);
invoked.add(invoker);
RpcContext.getServiceContext().setInvokers((List) invoked);
boolean success = false;
try {
Result result = invokeWithContext(invoker, invocation);
// Succeeded after at least one failure: log which providers failed earlier.
if (le != null && logger.isWarnEnabled()) {
logger.warn("Although retry the method " + methodName
+ " in the service " + getInterface().getName()
+ " was successful by the provider " + invoker.getUrl().getAddress()
+ ", but there have been failed providers " + providers
+ " (" + providers.size() + "/" + copyInvokers.size()
+ ") from the registry " + directory.getUrl().getAddress()
+ " on the consumer " + NetUtils.getLocalHost()
+ " using the dubbo version " + Version.getVersion() + ". Last error is: "
+ le.getMessage(), le);
}
success = true;
return result;
} catch (RpcException e) {
if (e.isBiz()) { // biz exception.
throw e;
}
le = e;
} catch (Throwable e) {
// Wrap anything else so the last error is always an RpcException.
le = new RpcException(e.getMessage(), e);
} finally {
if (!success) {
providers.add(invoker.getUrl().getAddress());
}
}
}
// Every attempt failed: report the last error with its cause when there is one.
throw new RpcException(le.getCode(), "Failed to invoke the method "
+ methodName + " in the service " + getInterface().getName()
+ ". Tried " + len + " times of the providers " + providers
+ " (" + providers.size() + "/" + copyInvokers.size()
+ ") from the registry " + directory.getUrl().getAddress()
+ " on the consumer " + NetUtils.getLocalHost() + " using the dubbo version "
+ Version.getVersion() + ". Last error is: "
+ le.getMessage(), le.getCause() != null ? le.getCause() : le);
}
AbstractClusterInvoker类型的select
负载均衡算法执行:
/**
 * Picks an invoker for the call, honouring the sticky setting before falling back to load
 * balancing.
 * <p>
 * When sticky is enabled for the method and the remembered sticky invoker is still among the
 * candidates, has not been tried already, and passes the availability check, it is reused.
 * Otherwise {@code doSelect} chooses, and that choice becomes the new sticky invoker if sticky
 * is enabled.
 *
 * @param loadbalance the load-balance strategy
 * @param invocation  the invocation; may be {@code null}, in which case an empty method name is used
 * @param invokers    the candidate invokers
 * @param selected    invokers already tried for this call; may be {@code null}
 * @return the chosen invoker, or {@code null} when there are no candidates
 * @throws RpcException propagated from the selection
 */
protected Invoker<T> select(LoadBalance loadbalance, Invocation invocation,
                            List<Invoker<T>> invokers, List<Invoker<T>> selected) throws RpcException {
    if (CollectionUtils.isEmpty(invokers)) {
        return null;
    }

    String methodName = invocation != null ? invocation.getMethodName() : StringUtils.EMPTY_STRING;
    boolean sticky = invokers.get(0).getUrl()
        .getMethodParameter(methodName, CLUSTER_STICKY_KEY, DEFAULT_CLUSTER_STICKY);

    //ignore overloaded method
    if (stickyInvoker != null && !invokers.contains(stickyInvoker)) {
        stickyInvoker = null;
    }

    //ignore concurrency problem
    if (sticky
        && stickyInvoker != null
        && (selected == null || !selected.contains(stickyInvoker))
        && availableCheck
        && stickyInvoker.isAvailable()) {
        return stickyInvoker;
    }

    Invoker<T> chosen = doSelect(loadbalance, invocation, invokers, selected);
    if (sticky) {
        stickyInvoker = chosen;
    }
    return chosen;
}
/**
 * Chooses an invoker through the load balancer, reselecting when the first choice was already
 * tried or is unavailable.
 *
 * @param loadbalance the load-balance strategy
 * @param invocation  the invocation being dispatched
 * @param invokers    the candidate invokers
 * @param selected    invokers already tried for this call; may be {@code null}
 * @return the chosen invoker, or {@code null} when there are no candidates
 * @throws RpcException propagated from the load balancer
 */
private Invoker<T> doSelect(LoadBalance loadbalance, Invocation invocation,
List<Invoker<T>> invokers, List<Invoker<T>> selected) throws RpcException {
if (CollectionUtils.isEmpty(invokers)) {
return null;
}
// A single candidate is returned directly, without consulting the load balancer.
if (invokers.size() == 1) {
Invoker<T> tInvoker = invokers.get(0);
checkShouldInvalidateInvoker(tInvoker);
return tInvoker;
}
Invoker<T> invoker = loadbalance.select(invokers, getUrl(), invocation);
//If the `invoker` is in the `selected` or invoker is unavailable && availablecheck is true, reselect.
boolean isSelected = selected != null && selected.contains(invoker);
boolean isUnavailable = availableCheck && !invoker.isAvailable() && getUrl() != null;
if (isUnavailable) {
invalidateInvoker(invoker);
}
if (isSelected || isUnavailable) {
try {
Invoker<T> rInvoker = reselect(loadbalance, invocation, invokers, selected, availableCheck);
if (rInvoker != null) {
invoker = rInvoker;
} else {
//Check the index of current selected invoker, if it's not the last one, choose the one at index+1.
int index = invokers.indexOf(invoker);
try {
//Avoid collision
invoker = invokers.get((index + 1) % invokers.size());
} catch (Exception e) {
logger.warn(e.getMessage() + " may because invokers list dynamic change, ignore.", e);
}
}
} catch (Throwable t) {
// Reselection is best effort: on failure the first choice is kept.
logger.error("cluster reselect fail reason is :" + t.getMessage() + " if can not solve, you can set cluster.availablecheck=false in url", t);
}
}
return invoker;
}
AbstractClusterInvoker类型的调用
invokeWithContext调用方法
ListenerInvokerWrapper的invoke方法
/**
 * Delegates the invocation unchanged to the wrapped {@code invoker}.
 *
 * @param invocation the invocation to dispatch
 * @return the result returned by the wrapped invoker
 * @throws RpcException propagated from the wrapped invoker
 */
public Result invoke(Invocation invocation) throws RpcException {
return invoker.invoke(invocation);
}
AbstractInvoker的invoke
/**
 * Prepares the invocation, performs it, and waits for the result when the call is synchronous.
 *
 * @param inv the invocation; it is cast to {@code RpcInvocation}
 * @return the asynchronous result produced by {@code doInvokeAndReturn}
 * @throws RpcException propagated from the preparation, invocation or wait steps
 */
public Result invoke(Invocation inv) throws RpcException {
// if invoker is destroyed due to address refresh from registry, let's allow the current invoke to proceed
if (isDestroyed()) {
// NOTE(review): the concatenation produces "is destroyed, , dubbo version" (doubled comma).
logger.warn("Invoker for service " + this + " on consumer " + NetUtils.getLocalHost() + " is destroyed, "
+ ", dubbo version is " + Version.getVersion() + ", this invoker should not be used any longer");
}
RpcInvocation invocation = (RpcInvocation) inv;
// prepare rpc invocation: initialise the variables the call needs
prepareInvocation(invocation);
// do invoke rpc invocation and return async result
AsyncRpcResult asyncResult = doInvokeAndReturn(invocation);
// wait rpc result if sync
waitForResultIfSync(asyncResult, invocation);
return asyncResult;
}
AbstractInvoker类型的doInvokeAndReturn
/**
 * Calls {@code doInvoke} and converts most failures into a completed {@code AsyncRpcResult}.
 * <p>
 * An {@code InvocationTargetException} is unwrapped; if its target is an {@code RpcException},
 * that exception is marked as a business exception. A business {@code RpcException} becomes a
 * result, a non-business one is rethrown, and any other throwable becomes a result. The response
 * future is then published on the service context unless the call is synchronous and
 * {@code setFutureWhenSync} is off.
 *
 * @param invocation the invocation to perform
 * @return the asynchronous result, possibly already completed with an exception
 */
private AsyncRpcResult doInvokeAndReturn(RpcInvocation invocation) {
    AsyncRpcResult asyncResult;
    try {
        asyncResult = (AsyncRpcResult) doInvoke(invocation);
    } catch (InvocationTargetException e) {
        Throwable target = e.getTargetException();
        // if biz exception
        if (target instanceof RpcException) {
            ((RpcException) target).setCode(RpcException.BIZ_EXCEPTION);
        }
        // Report the unwrapped target when there is one, else the wrapper itself.
        Throwable reported = target != null ? target : e;
        asyncResult = AsyncRpcResult.newDefaultAsyncResult(null, reported, invocation);
    } catch (RpcException e) {
        // Only business exceptions are turned into a result.
        if (!e.isBiz()) {
            throw e;
        }
        asyncResult = AsyncRpcResult.newDefaultAsyncResult(null, e, invocation);
    } catch (Throwable e) {
        asyncResult = AsyncRpcResult.newDefaultAsyncResult(null, e, invocation);
    }

    boolean publishFuture = setFutureWhenSync || invocation.getInvokeMode() != InvokeMode.SYNC;
    if (publishFuture) {
        // set server context
        RpcContext.getServiceContext().setFuture(new FutureAdapter<>(asyncResult.getResponseFuture()));
    }
    return asyncResult;
}
DubboInvoker类型的doInvoke方法
/**
 * Sends the invocation to the provider over one of this invoker's exchange clients.
 * <p>
 * Path and version attachments are set, a client is chosen (round-robin when there is more than
 * one) and the computed timeout is attached. One-way calls are sent without waiting for a
 * response. Other calls issue a request and wrap the response future in an {@code AsyncRpcResult}.
 *
 * @param invocation the invocation; it is cast to {@code RpcInvocation}
 * @return a default async result for one-way calls, otherwise one backed by the response future
 * @throws Throwable {@code RpcException} with {@code TIMEOUT_EXCEPTION} or
 *                   {@code NETWORK_EXCEPTION} for the corresponding remoting failures
 */
protected Result doInvoke(final Invocation invocation) throws Throwable {
    RpcInvocation inv = (RpcInvocation) invocation;
    final String methodName = RpcUtils.getMethodName(invocation);
    inv.setAttachment(PATH_KEY, getUrl().getPath());
    inv.setAttachment(VERSION_KEY, version);

    // A single client is used directly; several clients are rotated through.
    ExchangeClient currentClient = clients.length == 1
        ? clients[0]
        : clients[index.getAndIncrement() % clients.length];

    try {
        boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
        int timeout = calculateTimeout(invocation, methodName);
        invocation.setAttachment(TIMEOUT_KEY, timeout);

        if (isOneway) {
            boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
            currentClient.send(inv, isSent);
            return AsyncRpcResult.newDefaultAsyncResult(invocation);
        }

        // Default path in the walkthrough: a synchronous call, whose callback executor is a
        // ThreadlessExecutor (an executor without threads of its own).
        ExecutorService executor = getCallbackExecutor(getUrl(), inv);
        CompletableFuture<AppResponse> appResponseFuture =
            currentClient.request(inv, timeout, executor).thenApply(obj -> (AppResponse) obj);
        // save for 2.6.x compatibility, for example, TraceFilter in Zipkin uses com.alibaba.xxx.FutureAdapter
        FutureContext.getContext().setCompatibleFuture(appResponseFuture);

        AsyncRpcResult result = new AsyncRpcResult(appResponseFuture, inv);
        result.setExecutor(executor);
        return result;
    } catch (TimeoutException e) {
        throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
    } catch (RemotingException e) {
        throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
    }
}
ReferenceCountExchangeClient的request
/**
 * Delegates the request unchanged to the wrapped {@code client}.
 *
 * @param request  the request payload
 * @param timeout  the timeout passed through to the client
 * @param executor the executor passed through to the client
 * @return the future returned by the wrapped client
 * @throws RemotingException propagated from the wrapped client
 */
public CompletableFuture<Object> request(Object request, int timeout, ExecutorService executor) throws RemotingException {
return client.request(request, timeout, executor);
}
HeaderExchangeClient的request
/**
 * Delegates the request unchanged to the underlying {@code channel}.
 *
 * @param request  the request payload
 * @param timeout  the timeout passed through to the channel
 * @param executor the executor passed through to the channel
 * @return the future returned by the channel
 * @throws RemotingException propagated from the channel
 */
public CompletableFuture<Object> request(Object request, int timeout, ExecutorService executor) throws RemotingException {
return channel.request(request, timeout, executor);
}
HeaderExchangeChannel的request方法
/**
 * Wraps the payload in a two-way {@code Request}, creates the future for the response, and
 * sends the request over the channel.
 *
 * @param request  the payload to send
 * @param timeout  the timeout handed to the future
 * @param executor the executor handed to the future
 * @return the future for the response
 * @throws RemotingException if this channel is closed or the send fails; on a failed send the
 *                           future is cancelled first
 */
public CompletableFuture<Object> request(Object request, int timeout, ExecutorService executor) throws RemotingException {
if (closed) {
throw new RemotingException(this.getLocalAddress(), null, "Failed to send request " + request + ", cause: The channel " + this + " is closed!");
}
// create request.
Request req = new Request();
req.setVersion(Version.getProtocolVersion());
req.setTwoWay(true);
req.setData(request);
// The future is created before sending.
DefaultFuture future = DefaultFuture.newFuture(channel, req, timeout, executor);
try {
channel.send(req);
} catch (RemotingException e) {
// The request never left: cancel the pending future before rethrowing.
future.cancel();
throw e;
}
return future;
}
AbstractPeer类型的send
/**
 * Sends a message, taking the "sent" flag from the URL parameter {@code Constants.SENT_KEY}
 * (false when absent).
 *
 * @param message the message to send
 * @throws RemotingException propagated from the two-argument {@code send}
 */
public void send(Object message) throws RemotingException {
send(message, url.getParameter(Constants.SENT_KEY, false));
}
/**
 * Sends a message over the current channel, reconnecting first if that is configured and the
 * client is not connected.
 *
 * @param message the message to send
 * @param sent    flag passed through to the channel's {@code send}
 * @throws RemotingException if there is no connected channel, or propagated from the channel
 */
public void send(Object message, boolean sent) throws RemotingException {
if (needReconnect && !isConnected()) {
connect();
}
Channel channel = getChannel();
//TODO Can the value returned by getChannel() be null? need improvement.
if (channel == null || !channel.isConnected()) {
throw new RemotingException(this, "message can not send, because channel is closed . url:" + getUrl());
}
channel.send(message, sent);
}
Dubbo netty4包下的NettyChannel的send
/**
 * Writes and flushes the message on the Netty channel, optionally waiting for the write to finish.
 *
 * @param message the message to write
 * @param sent    when {@code true}, wait up to the URL's {@code TIMEOUT_KEY} milliseconds
 *                (default {@code DEFAULT_TIMEOUT}) for the write to complete
 * @throws RemotingException if the superclass check fails, the write reports a cause, or the
 *                           wait ends without success
 */
public void send(Object message, boolean sent) throws RemotingException {
// whether the channel is closed
super.send(message, sent);
boolean success = true;
int timeout = 0;
try {
ChannelFuture future = channel.writeAndFlush(message);
if (sent) {
// wait timeout ms
timeout = getUrl().getPositiveParameter(TIMEOUT_KEY, DEFAULT_TIMEOUT);
success = future.await(timeout);
}
// Surface a failure cause recorded on the future, if there is one.
Throwable cause = future.cause();
if (cause != null) {
throw cause;
}
} catch (Throwable e) {
removeChannelIfDisconnected(channel);
throw new RemotingException(this, "Failed to send message " + PayloadDropper.getRequestWithoutData(message) + " to " + getRemoteAddress() + ", cause: " + e.getMessage(), e);
}
if (!success) {
// NOTE(review): no space between the remote address and "in timeout(" in this message.
throw new RemotingException(this, "Failed to send message " + PayloadDropper.getRequestWithoutData(message) + " to " + getRemoteAddress()
+ "in timeout(" + timeout + "ms) limit");
}
}
AbstractChannel的send
/**
 * Base-class check: refuses to send when the channel is already closed. Nothing is written here.
 *
 * @param message the message that was about to be sent (used only in the error text)
 * @param sent    not used by this check
 * @throws RemotingException if the channel is closed
 */
public void send(Object message, boolean sent) throws RemotingException {
if (isClosed()) {
throw new RemotingException(this, "Failed to send message "
+ (message == null ? "" : message.getClass().getName()) + ":" + PayloadDropper.getRequestWithoutData(message)
+ ", cause: Channel closed. channel: " + getLocalAddress() + " -> " + getRemoteAddress());
}
}
netty包下的AbstractChannel的writeAndFlush方法
/**
 * Hands the message to the channel's pipeline for writing and flushing.
 *
 * @param msg the message to write
 * @return the future returned by the pipeline
 */
public ChannelFuture writeAndFlush(Object msg) {
return pipeline.writeAndFlush(msg);
}
DefaultChannelPipeline类型的writeAndFlush
/**
 * Starts the write-and-flush at the pipeline's {@code tail} context.
 *
 * @param msg the message to write
 * @return the future returned by the tail context
 */
public final ChannelFuture writeAndFlush(Object msg) {
return tail.writeAndFlush(msg);
}
AbstractChannelHandlerContext类型的writeAndFlush
/**
 * Convenience overload that creates a new promise and delegates to the two-argument form.
 *
 * @param msg the message to write
 * @return the future of the write
 */
public ChannelFuture writeAndFlush(Object msg) {
return writeAndFlush(msg, newPromise());
}
/**
 * Writes the message with flushing enabled and returns the supplied promise.
 *
 * @param msg     the message to write
 * @param promise the promise passed to {@code write}
 * @return the same {@code promise}
 */
public ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) {
write(msg, true, promise);
return promise;
}
AbstractChannelHandlerContext类型的write方法
/**
 * Passes a write (optionally with flush) to the next outbound handler context, either directly
 * or by submitting a {@code WriteTask} to that context's executor.
 *
 * @param msg     the message to write; must not be {@code null}
 * @param flush   whether to flush after writing
 * @param promise the promise for the write; if it is not valid the message is released and the
 *                method returns without writing
 */
private void write(Object msg, boolean flush, ChannelPromise promise) {
ObjectUtil.checkNotNull(msg, "msg");
try {
if (isNotValidPromise(promise, true)) {
ReferenceCountUtil.release(msg);
// cancelled
return;
}
} catch (RuntimeException e) {
// Release the message before propagating the promise-validation failure.
ReferenceCountUtil.release(msg);
throw e;
}
// Find the next outbound context that handles write (and flush, when requested).
final AbstractChannelHandlerContext next = findContextOutbound(flush ?
(MASK_WRITE | MASK_FLUSH) : MASK_WRITE);
final Object m = pipeline.touch(msg, next);
// The executor is a NioEventLoop instance in the walkthrough.
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
// Already on the executor's event loop: invoke the handler directly.
if (flush) {
next.invokeWriteAndFlush(m, promise);
} else {
next.invokeWrite(m, promise);
}
} else {
// Build a WriteTask for the event executor.
final WriteTask task = WriteTask.newInstance(next, m, promise, flush);
// Submit the WriteTask to the event executor (lazily when no flush is requested).
if (!safeExecute(executor, task, promise, m, !flush)) {
// We failed to submit the WriteTask. We need to cancel it so we decrement the pending bytes
// and put it back in the Recycler for re-use later.
//
// See https://github.com/netty/netty/issues/8343.
task.cancel();
}
}
}
/**
 * Submits a task to the executor without letting a submission failure escape.
 *
 * @param executor the executor to submit to
 * @param runnable the task to run
 * @param promise  failed with the cause when submission throws
 * @param msg      released (when non-null) when submission throws
 * @param lazy     when {@code true} and the executor is an {@code AbstractEventExecutor},
 *                 {@code lazyExecute} is used instead of {@code execute}
 * @return {@code true} if the task was submitted, {@code false} if submission threw
 */
private static boolean safeExecute(EventExecutor executor, Runnable runnable,
ChannelPromise promise, Object msg, boolean lazy) {
try {
if (lazy && executor instanceof AbstractEventExecutor) {
((AbstractEventExecutor) executor).lazyExecute(runnable);
} else {
executor.execute(runnable);
}
return true;
} catch (Throwable cause) {
try {
if (msg != null) {
ReferenceCountUtil.release(msg);
}
} finally {
// Fail the promise even if releasing the message threw.
promise.setFailure(cause);
}
return false;
}
}
SingleThreadEventExecutor类型的execute
/**
 * Queues a task. An immediate wake-up is requested unless the task is a {@code LazyRunnable}
 * or {@code wakesUpForTask} returns false for it.
 *
 * @param task the task to run; must not be {@code null}
 */
public void execute(Runnable task) {
ObjectUtil.checkNotNull(task, "task");
execute(task, !(task instanceof LazyRunnable) && wakesUpForTask(task));
}
/**
 * Adds the task to the queue, starts the executor thread when called from outside the event
 * loop, and wakes the loop up if requested.
 * <p>
 * If the executor turns out to be shut down after the thread start, the task is removed again
 * and rejected. If the queue does not support removal, the task is left in place.
 *
 * @param task      the task to queue
 * @param immediate whether the event loop should be woken up for this task
 */
private void execute(Runnable task, boolean immediate) {
    // Whether the calling thread is the event-loop thread.
    boolean inEventLoop = inEventLoop();
    // Append the task to this executor's task queue (an MpscUnboundedArrayQueue in the walkthrough).
    addTask(task);

    if (!inEventLoop) {
        // Start the executor thread if that is still needed.
        startThread();
        if (isShutdown()) {
            boolean removed = false;
            try {
                removed = removeTask(task);
            } catch (UnsupportedOperationException e) {
                // The task queue does not support removal so the best thing we can do is to just move on and
                // hope we will be able to pick-up the task before its completely terminated.
                // In worst case we will log on termination.
            }
            if (removed) {
                reject();
            }
        }
    }

    if (immediate && !addTaskWakesUp) {
        wakeup(inEventLoop);
    }
}
SingleThreadEventExecutor类型的execute中调用的wakeup(下面的代码使用了selector,对应的是子类NioEventLoop重写后的实现)
/**
 * Wakes the selector when called from outside the event loop, unless the loop is already
 * marked awake.
 * <p>
 * {@code nextWakeupNanos} is atomically set to {@code AWAKE}; the selector is woken only if the
 * previous value was something else.
 *
 * @param inEventLoop whether the caller is running on the event-loop thread
 */
protected void wakeup(boolean inEventLoop) {
if (!inEventLoop && nextWakeupNanos.getAndSet(AWAKE) != AWAKE) {
selector.wakeup();
}
}
SelectedSelectionKeySetSelector类型的wakeup
/**
 * Forwards the wake-up to the wrapped {@code delegate} selector.
 *
 * @return whatever the delegate's {@code wakeup()} returns
 */
public Selector wakeup() {
return delegate.wakeup();
}
NioEventLoop的wakeup
/**
 * Wakes the selector when called from outside the event loop, unless the loop is already
 * marked awake ({@code nextWakeupNanos} was already {@code AWAKE} before being set to it).
 *
 * @param inEventLoop whether the caller is running on the event-loop thread
 */
protected void wakeup(boolean inEventLoop) {
if (!inEventLoop && nextWakeupNanos.getAndSet(AWAKE) != AWAKE) {
selector.wakeup();
}
}
我用的是Mac电脑,所以走的是KQueueSelectorImpl的wakeup逻辑
/**
 * Triggers a selector wake-up by writing a single zero byte to {@code fd1}.
 * <p>
 * The write happens under {@code interruptLock} and only while {@code interruptTriggered} is
 * false; the flag is then set so that later calls do not write again until it is cleared
 * (the clearing is not shown here).
 *
 * @return this selector
 */
public Selector wakeup() {
synchronized (interruptLock) {
if (!interruptTriggered) {
try {
IOUtil.write1(fd1, (byte)0);
} catch (IOException ioe) {
// An I/O failure here is rethrown as an InternalError.
throw new InternalError(ioe);
}
interruptTriggered = true;
}
}
return this;
}
Dubbo27, Dubbo327, Dubbo3源码解析27