【Dubbo】Dubbo服务引用流程

当Provider将服务暴露之后,Consumer就可以通过注册中心进行引用,配置方式如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
<beans xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:dubbo="http://dubbo.apache.org/schema/dubbo"
xmlns="http://www.springframework.org/schema/beans"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.3.xsd
http://dubbo.apache.org/schema/dubbo http://dubbo.apache.org/schema/dubbo/dubbo.xsd">

<dubbo:application name="demo-consumer"/>

<dubbo:registry address="zookeeper://127.0.0.1:2181"/>

<dubbo:reference id="demoService" check="false" interface="org.apache.dubbo.demo.DemoService"/>

</beans>

也可以使用注解@Reference:

1
2
3
4
5
@Component("demoServiceComponent")
public class DemoServiceComponent implements DemoService {
@Reference // 2.7.0之后推荐使用@DubboReference
private DemoService demoService;
}

ReferenceBean

dubbo:reference对应ReferenceBean,ReferenceBean实现了FactoryBean和InitializingBean:

  1. ReferenceBean是一个FactoryBean,FactoryBean是Spring中的一个工厂bean,通过getObject得到bean对象。
  2. ReferenceBean实现了InitializingBean,同样是Spring中的一个类,通过afterPropertiesSet可以对bean进行初始化。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public class ReferenceBean<T> extends ReferenceConfig<T> implements FactoryBean,
ApplicationContextAware, InitializingBean, DisposableBean {
@Override
public Object getObject() {
// 获取bean对象,具体实现在ReferenceConfig中
return get();
}

@Override
@SuppressWarnings({"unchecked"})
public void afterPropertiesSet() throws Exception {
prepareDubboConfigBeans();
if (init == null) {
init = false;
}
if (shouldInit()) {
// 获取对象
getObject();
}
}
}

ReferenceConfig

ReferenceConfig是ReferenceBean的父类,获取bean的get方法中,对bean进行了判断,如果为空调用init方法进行初始化:

  1. 与Provider服务暴露流程一样,调用了 DubboBootstrap的init进行了初始化

  2. 调用createProxy创建代理对象:

    (1)判断是本地JVM引用还是远程引用

    (2)如果是远程引用,根据url判断是直连的方式还是从注册中心进行引用

    (3)会根据具体的协议调用PROTOCOL的refer方法生成Invoker对象

    (4)根据Invoker对象创建代理对象

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
public class ReferenceConfig<T> extends ReferenceConfigBase<T> {
public synchronized T get() {
if (destroyed) {
throw new IllegalStateException("The invoker of ReferenceConfig(" + url + ") has already destroyed!");
}
if (ref == null) {
// 初始化
init();
}
return ref;
}

// 初始化
public synchronized void init() {
if (initialized) {
return;
}
if (bootstrap == null) {
bootstrap = DubboBootstrap.getInstance();
// DubboBootstrap进行初始化
bootstrap.init();
}

checkAndUpdateSubConfigs();

checkStubAndLocal(interfaceClass);
ConfigValidationUtils.checkMock(interfaceClass, this);

Map<String, String> map = new HashMap<String, String>();
map.put(SIDE_KEY, CONSUMER_SIDE);

......

// 创建代理对象
ref = createProxy(map);

serviceMetadata.setTarget(ref);
serviceMetadata.addAttribute(PROXY_CLASS_REF, ref);
ConsumerModel consumerModel = repository.lookupReferredService(serviceMetadata.getServiceKey());
consumerModel.setProxyObject(ref);
consumerModel.init(attributes);

initialized = true;

// dispatch a ReferenceConfigInitializedEvent since 2.7.4
dispatch(new ReferenceConfigInitializedEvent(this, invoker));
}

// 创建代理对象
@SuppressWarnings({"unchecked", "rawtypes", "deprecation"})
private T createProxy(Map<String, String> map) {
// 是否为本地JVM引用
if (shouldJvmRefer(map)) {
URL url = new URL(LOCAL_PROTOCOL, LOCALHOST_VALUE, 0, interfaceClass.getName()).addParameters(map);
invoker = REF_PROTOCOL.refer(interfaceClass, url);
if (logger.isInfoEnabled()) {
logger.info("Using injvm service " + interfaceClass.getName());
}
} else {
urls.clear();
// 如果url不为空,代表是直连的方式
if (url != null && url.length() > 0) {
String[] us = SEMICOLON_SPLIT_PATTERN.split(url);
if (us != null && us.length > 0) {
for (String u : us) {
URL url = URL.valueOf(u);
if (StringUtils.isEmpty(url.getPath())) {
url = url.setPath(interfaceName);
}
if (UrlUtils.isRegistry(url)) {
urls.add(url.addParameterAndEncoded(REFER_KEY, StringUtils.toQueryString(map)));
} else {
urls.add(ClusterUtils.mergeUrl(url, map));
}
}
}
} else { // 如果url为空,代表从注册中心进行引用
if (!LOCAL_PROTOCOL.equalsIgnoreCase(getProtocol())) {
checkRegistry();
List<URL> us = ConfigValidationUtils.loadRegistries(this, false);
if (CollectionUtils.isNotEmpty(us)) {
for (URL u : us) {
URL monitorUrl = ConfigValidationUtils.loadMonitor(this, u);
if (monitorUrl != null) {
map.put(MONITOR_KEY, URL.encode(monitorUrl.toFullString()));
}
urls.add(u.addParameterAndEncoded(REFER_KEY, StringUtils.toQueryString(map)));
}
}
...
}
}
// 如果只有一个url
if (urls.size() == 1) {
// 调用Protocol的refer方法,REF_PROTOCOL使用ExtensionLoader来加载具体的扩展类
invoker = REF_PROTOCOL.refer(interfaceClass, urls.get(0));
} else {
// 如果有多个
List<Invoker<?>> invokers = new ArrayList<Invoker<?>>();
URL registryURL = null;
for (URL url : urls) {
invokers.add(REF_PROTOCOL.refer(interfaceClass, url));
if (UrlUtils.isRegistry(url)) {
registryURL = url; // use last registry url
}
}
if (registryURL != null) { // registry url is available
//
URL u = registryURL.addParameterIfAbsent(CLUSTER_KEY, ZoneAwareCluster.NAME);
//
invoker = CLUSTER.join(new StaticDirectory(u, invokers));
} else { //
invoker = CLUSTER.join(new StaticDirectory(invokers));
}
}
}

......

String metadata = map.get(METADATA_KEY);
WritableMetadataService metadataService = WritableMetadataService.getExtension(metadata == null ? DEFAULT_METADATA_STORAGE_TYPE : metadata);
if (metadataService != null) {
URL consumerURL = new URL(CONSUMER_PROTOCOL, map.remove(REGISTER_IP_KEY), 0, map.get(INTERFACE_KEY), map);
metadataService.publishServiceDefinition(consumerURL);
}
// 创建代理对象
return (T) PROXY_FACTORY.getProxy(invoker, ProtocolUtils.isGeneric(generic));
}

}

Invoker的创建

Protocol使用ExtensionLoader来加载具体的扩展类:

1
private static final Protocol REF_PROTOCOL = ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension();

使用的是registry,因此将会进入RegistryProtocol的refer方法。

RegistryProtocol

doRefer方法中,会创建RegistryDirectory对象,并调用subscribe进行订阅:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
public class RegistryProtocol implements Protocol {
@Override
@SuppressWarnings("unchecked")
public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
url = getRegistryUrl(url);
// 获取注册中心
Registry registry = registryFactory.getRegistry(url);
if (RegistryService.class.equals(type)) {
return proxyFactory.getInvoker((T) registry, type, url);
}
Map<String, String> qs = StringUtils.parseQueryString(url.getParameterAndDecoded(REFER_KEY));
String group = qs.get(GROUP_KEY);
if (group != null && group.length() > 0) {
if ((COMMA_SPLIT_PATTERN.split(group)).length > 1 || "*".equals(group)) {
return doRefer(getMergeableCluster(), registry, type, url);
}
}
// 服务引用
return doRefer(cluster, registry, type, url);
}

private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {
// 创建RegistryDirectory
RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);
directory.setRegistry(registry);
directory.setProtocol(protocol);
Map<String, String> parameters = new HashMap<String, String>(directory.getConsumerUrl().getParameters());
// 创建CONSUMER订阅URL
URL subscribeUrl = new URL(CONSUMER_PROTOCOL, parameters.remove(REGISTER_IP_KEY), 0, type.getName(), parameters);
if (directory.isShouldRegister()) {
directory.setRegisteredConsumerUrl(subscribeUrl);
registry.register(directory.getRegisteredConsumerUrl());
}
directory.buildRouterChain(subscribeUrl);
// 向注册中心订阅
directory.subscribe(toSubscribeUrl(subscribeUrl));
// 创建Invoker对象
Invoker<T> invoker = cluster.join(directory);
List<RegistryProtocolListener> listeners = findRegistryProtocolListeners(url);
if (CollectionUtils.isEmpty(listeners)) {
return invoker;
}

RegistryInvokerWrapper<T> registryInvokerWrapper = new RegistryInvokerWrapper<>(directory, cluster, invoker, subscribeUrl);
for (RegistryProtocolListener listener : listeners) {
listener.onRefer(this, registryInvokerWrapper);
}
return registryInvokerWrapper;
}
}

RegistryDirectory

RegistryDirectory继承了NotifyListener,是一个监听器。

RegistryDirectory中实现了subscribe方法,可以看到它是通过调用Registry的subscribe实现的,Registry是一个接口,我们使用的是zookeeper注册中心,所以接下来进入ZookeeperRegistry:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public class RegistryDirectory<T> extends AbstractDirectory<T> implements NotifyListener {

// Registry
private Registry registry;

public void subscribe(URL url) {
setConsumerUrl(url);
CONSUMER_CONFIGURATION_LISTENER.addNotifyListener(this);
serviceConfigurationListener = new ReferenceConfigurationListener(this, url);
// 1.向注册中心订阅, this当前对象作为监听器对象传入,注意这里的this指的RegistryDirectory,因为是通过RegistryDirectory调用的
// 2.Registry是一个接口,有不同的实现类,接下来以ZookeeperRegistry为例,进入到ZookeeperRegistry的subscribe方法
registry.subscribe(url, this);
}
}

ZookeeperRegistry

FailbackRegistry

subscribe方法在它的父类FailbackRegistry中实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
public abstract class FailbackRegistry extends AbstractRegistry {
@Override
public void subscribe(URL url, NotifyListener listener) {
super.subscribe(url, listener);
removeFailedSubscribed(url, listener);
try {
// 又调用了doSubscribe
doSubscribe(url, listener);
} catch (Exception e) {
......
}
}
}

在ZookeeperRegistry的doSubscribe方法中,会创建ZK客户端,从注册中心进行订阅,订阅之后会调用notify方法通知:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
public class ZookeeperRegistry extends FailbackRegistry {
@Override
public void doSubscribe(final URL url, final NotifyListener listener) {
try {
if (ANY_VALUE.equals(url.getServiceInterface())) {
String root = toRootPath();
ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.computeIfAbsent(url, k -> new ConcurrentHashMap<>());
ChildListener zkListener = listeners.computeIfAbsent(listener, k -> (parentPath, currentChilds) -> {
for (String child : currentChilds) {
child = URL.decode(child);
if (!anyServices.contains(child)) {
anyServices.add(child);
subscribe(url.setPath(child).addParameters(INTERFACE_KEY, child,
Constants.CHECK_KEY, String.valueOf(false)), k);
}
}
});
// 创建zk客户端
zkClient.create(root, false);
List<String> services = zkClient.addChildListener(root, zkListener);
if (CollectionUtils.isNotEmpty(services)) {
for (String service : services) {
service = URL.decode(service);
anyServices.add(service);
subscribe(url.setPath(service).addParameters(INTERFACE_KEY, service,
Constants.CHECK_KEY, String.valueOf(false)), listener);
}
}
} else {
List<URL> urls = new ArrayList<>();
for (String path : toCategoriesPath(url)) {
ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.computeIfAbsent(url, k -> new ConcurrentHashMap<>());
ChildListener zkListener = listeners.computeIfAbsent(listener, k -> (parentPath, currentChilds) -> ZookeeperRegistry.this.notify(url, k, toUrlsWithEmpty(url, parentPath, currentChilds)));
// 创建zk客户端
zkClient.create(path, false);
List<String> children = zkClient.addChildListener(path, zkListener);
if (children != null) {
urls.addAll(toUrlsWithEmpty(url, path, children));
}
}
// 通知,在FailbackRegistry中实现
notify(url, listener, urls);
}
} catch (Throwable e) {
throw new RpcException("Failed to subscribe " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
}
}
}

FailbackRegistry中的notify方法实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
//org.apache.dubbo.registry.support.FailbackRegistry中的notify方法
@Override
protected void notify(URL url, NotifyListener listener, List<URL> urls) {
if (url == null) {
throw new IllegalArgumentException("notify url == null");
}
if (listener == null) {
throw new IllegalArgumentException("notify listener == null");
}
try {
// 继续调用,在AbstractRegistry中实现
doNotify(url, listener, urls);
} catch (Exception t) {
addFailedNotified(url, listener, urls);
logger.error("Failed to notify for subscribe " + url + ", waiting for retry, cause: " + t.getMessage(), t);
}
}

// AbstractRegistry中的notify实现
protected void notify(URL url, NotifyListener listener, List<URL> urls) {
......
Map<String, List<URL>> categoryNotified = notified.computeIfAbsent(url, u -> new ConcurrentHashMap<>());
for (Map.Entry<String, List<URL>> entry : result.entrySet()) {
String category = entry.getKey();
List<URL> categoryList = entry.getValue();
categoryNotified.put(category, categoryList);
// 调用了listener的notify,前面的流程可知,传入的是RegistryDirectory,所以接下来进入RegistryDirectory的notify方法
listener.notify(categoryList);
saveProperties(url);
}
}

由上可知,最终会调用NotifyListener的notify方法,前面的内容可知,NotifyListener传入的是RegistryDirectory对象,所以会进入到

RegistryDirectory的notify方法,在这个过程中比较关键的一步是toInvokers方法中将url转为Invoker:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
public class RegistryDirectory<T> extends AbstractDirectory<T> implements NotifyListener {
@Override
public synchronized void notify(List<URL> urls) {
Map<String, List<URL>> categoryUrls = urls.stream()
.filter(Objects::nonNull)
.filter(this::isValidCategory)
.filter(this::isNotCompatibleFor26x)
.collect(Collectors.groupingBy(this::judgeCategory));

......
// 刷新Invoker
refreshOverrideAndInvoker(providerURLs);
}

private void refreshOverrideAndInvoker(List<URL> urls) {
// mock zookeeper://xxx?mock=return null
overrideDirectoryUrl();
// 刷新Invoker
refreshInvoker(urls);
}

// 刷新Invoker
private void refreshInvoker(List<URL> invokerUrls) {
Assert.notNull(invokerUrls, "invokerUrls should not be null");

if (invokerUrls.size() == 1
&& invokerUrls.get(0) != null
&& EMPTY_PROTOCOL.equals(invokerUrls.get(0).getProtocol())) {
this.forbidden = true;
this.invokers = Collections.emptyList();
routerChain.setInvokers(this.invokers);
destroyAllInvokers();
} else {
this.forbidden = false;
Map<String, Invoker<T>> oldUrlInvokerMap = this.urlInvokerMap;
......
// 转为Invoker
Map<String, Invoker<T>> newUrlInvokerMap = toInvokers(invokerUrls);
......
}
}

// 转为Invoker
private Map<String, Invoker<T>> toInvokers(List<URL> urls) {
Map<String, Invoker<T>> newUrlInvokerMap = new HashMap<>();
if (urls == null || urls.isEmpty()) {
return newUrlInvokerMap;
}
Set<String> keys = new HashSet<>();
String queryProtocols = this.queryMap.get(PROTOCOL_KEY);
for (URL providerUrl : urls) {
......
keys.add(key);
Map<String, Invoker<T>> localUrlInvokerMap = this.urlInvokerMap;
Invoker<T> invoker = localUrlInvokerMap == null ? null : localUrlInvokerMap.get(key);
if (invoker == null) {
try {
boolean enabled = true;
if (url.hasParameter(DISABLED_KEY)) {
enabled = !url.getParameter(DISABLED_KEY, false);
} else {
enabled = url.getParameter(ENABLED_KEY, true);
}
if (enabled) {
// 根据不同的协议调用对应的refer方法
invoker = new InvokerDelegate<>(protocol.refer(serviceType, url), url, providerUrl);
}
} catch (Throwable t) {
logger.error("Failed to refer invoker for interface:" + serviceType + ",url:(" + url + ")" + t.getMessage(), t);
}
if (invoker != null) {
// 将Invoker放入缓存
newUrlInvokerMap.put(key, invoker);
}
} else {
newUrlInvokerMap.put(key, invoker);
}
}
keys.clear();
return newUrlInvokerMap;
}

DubboProtocol

接下来以DubboProtocal为例,进入refer方法中,refer在DubboProtocol的父类AbstractProtocol中实现:

  1. 调用了getClients创建客户端连接
  2. 创建了DubboInvoker
1
2
3
4
5
6
7
8
public abstract class AbstractProtocol implements Protocol {
@Override
public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
return new AsyncToSyncInvoker<>(protocolBindingRefer(type, url));
}

protected abstract <T> Invoker<T> protocolBindingRefer(Class<T> type, URL url) throws RpcException;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
public class DubboProtocol extends AbstractProtocol {
@Override
public <T> Invoker<T> protocolBindingRefer(Class<T> serviceType, URL url) throws RpcException {
optimizeSerialization(url);

// 创建DubboInvoker
DubboInvoker<T> invoker = new DubboInvoker<T>(serviceType, url, getClients(url), invokers);
invokers.add(invoker);

return invoker;
}
// 创建客户端连接
private ExchangeClient[] getClients(URL url) {

boolean useShareConnect = false;

int connections = url.getParameter(CONNECTIONS_KEY, 0);
List<ReferenceCountExchangeClient> shareClients = null;
if (connections == 0) {
useShareConnect = true;
String shareConnectionsStr = url.getParameter(SHARE_CONNECTIONS_KEY, (String) null);
connections = Integer.parseInt(StringUtils.isBlank(shareConnectionsStr) ? ConfigUtils.getProperty(SHARE_CONNECTIONS_KEY,
DEFAULT_SHARE_CONNECTIONS) : shareConnectionsStr);
shareClients = getSharedClient(url, connections);
}
// 创建ExchangeClient
ExchangeClient[] clients = new ExchangeClient[connections];
for (int i = 0; i < clients.length; i++) {
if (useShareConnect) {
clients[i] = shareClients.get(i);

} else {
// 初始化客户端
clients[i] = initClient(url);
}
}

return clients;
}

// 初始化
private ExchangeClient initClient(URL url) {

String str = url.getParameter(CLIENT_KEY, url.getParameter(SERVER_KEY, DEFAULT_REMOTING_CLIENT));
// 设置一些参数
url = url.addParameter(CODEC_KEY, DubboCodec.NAME);
url = url.addParameterIfAbsent(HEARTBEAT_KEY, String.valueOf(DEFAULT_HEARTBEAT));
if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str)) {
throw new RpcException("Unsupported client type: " + str + "," +
" supported client type is " + StringUtils.join(ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions(), " "));
}
ExchangeClient client;
try {
// 进行连接
if (url.getParameter(LAZY_CONNECT_KEY, false)) {
client = new LazyConnectExchangeClient(url, requestHandler);
} else {
client = Exchangers.connect(url, requestHandler);
}
} catch (RemotingException e) {
throw new RpcException("Fail to create remoting client for service(" + url + "): " + e.getMessage(), e);
}
return client;
}
}

创建代理对象

在Invoker创建完毕之后,会通过PROXY_FACTORY创建代理对象:

1
2
3
4
private static final ProxyFactory PROXY_FACTORY = ExtensionLoader.getExtensionLoader(ProxyFactory.class).getAdaptiveExtension();

// 创建代理对象
return (T) PROXY_FACTORY.getProxy(invoker, ProtocolUtils.isGeneric(generic));

在dubbo中默认使用Javassist创建代理对象:

1
2
3
4
5
6
7
8
9
public class JavassistProxyFactory extends AbstractProxyFactory {

@Override
@SuppressWarnings("unchecked")
public <T> T getProxy(Invoker<T> invoker, Class<?>[] interfaces) {
//创建InvokerInvocationHandler
return (T) Proxy.getProxy(interfaces).newInstance(new InvokerInvocationHandler(invoker));
}
}

InvokerInvocationHandler

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
public class InvokerInvocationHandler implements InvocationHandler {

public InvokerInvocationHandler(Invoker<?> handler) {
this.invoker = handler;
String serviceKey = invoker.getUrl().getServiceKey();
if (serviceKey != null) {
this.consumerModel = ApplicationModel.getConsumerModel(serviceKey);
}
}
// invoke方法,调用接口的方法时会调用到invoke方法
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
if (method.getDeclaringClass() == Object.class) {
return method.invoke(invoker, args);
}
String methodName = method.getName();
Class<?>[] parameterTypes = method.getParameterTypes();
if (parameterTypes.length == 0) {
if ("toString".equals(methodName)) {
return invoker.toString();
} else if ("$destroy".equals(methodName)) {
invoker.destroy();
return null;
} else if ("hashCode".equals(methodName)) {
return invoker.hashCode();
}
} else if (parameterTypes.length == 1 && "equals".equals(methodName)) {
return invoker.equals(args[0]);
}
// 创建RpcInvocation
RpcInvocation rpcInvocation = new RpcInvocation(method, invoker.getInterface().getName(), args);
String serviceKey = invoker.getUrl().getServiceKey();
rpcInvocation.setTargetServiceUniqueName(serviceKey);

if (consumerModel != null) {
rpcInvocation.put(Constants.CONSUMER_MODEL, consumerModel);
rpcInvocation.put(Constants.METHOD_MODEL, consumerModel.getMethodModel(method));
}

return invoker.invoke(rpcInvocation).recreate();
}
}

参考

【月染霜华】Dubbo源码分析(六)服务引用的具体流程

【休息的风】dubbo 源码学习笔记 (三) —— dubbo引用服务的过程

【峡谷程序猿】Dubbo2.7.6启动原理之Consumer

【拉勾教育】Dubbo源码解读与实战

dubbo版本:2.7.7