【Dubbo】Dubbo服务暴露流程

DubboBootstrapApplicationListener

DubboBootstrapApplicationListener是一个监听器,可以监听Dubbo的启动或者关闭事件,如果是启动事件,会通过DubboBootstrap启动dubbo。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public class DubboBootstrapApplicationListener extends OneTimeExecutionApplicationContextEventListener
implements Ordered {

public static final String BEAN_NAME = "dubboBootstrapApplicationListener";

private final DubboBootstrap dubboBootstrap;

public DubboBootstrapApplicationListener() {
this.dubboBootstrap = DubboBootstrap.getInstance();
}

@Override
public void onApplicationContextEvent(ApplicationContextEvent event) {
// 如果是启动事件
if (event instanceof ContextRefreshedEvent) {
onContextRefreshedEvent((ContextRefreshedEvent) event);
} else if (event instanceof ContextClosedEvent) { // 如果是关闭事件
onContextClosedEvent((ContextClosedEvent) event);
}
}
private void onContextRefreshedEvent(ContextRefreshedEvent event) {
// 启动
dubboBootstrap.start();
}

private void onContextClosedEvent(ContextClosedEvent event) {
dubboBootstrap.stop();
}
}

DubboBootstrap

DubboBootstrap是dubbo启动的核心,首先它会调用initialize方法进行一些初始化操作,然后调用exportServices进行服务暴露。

在exportServices中获取所有需要暴露的服务,进行遍历,对每一个服务进行暴露,Dubbo将每一个服务封装为ServiceConfig类型,具体的暴露过程在ServiceConfig的export方法中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
public class DubboBootstrap extends GenericEventListener {
/**
* Start the bootstrap
*/
public DubboBootstrap start() {
if (started.compareAndSet(false, true)) {
ready.set(false);
// 初始化操作
initialize();
if (logger.isInfoEnabled()) {
logger.info(NAME + " is starting...");
}
// 1. 暴露DUBBO服务
exportServices();

// Not only provider register
if (!isOnlyRegisterProvider() || hasExportedServices()) {
// 2. export MetadataService
exportMetadataService();
//3. Register the local ServiceInstance if required
registerServiceInstance();
}

referServices();
if (asyncExportingFutures.size() > 0) {
new Thread(() -> {
try {
this.awaitFinish();
} catch (Exception e) {
logger.warn(NAME + " exportAsync occurred an exception.");
}
ready.set(true);
if (logger.isInfoEnabled()) {
logger.info(NAME + " is ready.");
}
}).start();
} else {
ready.set(true);
if (logger.isInfoEnabled()) {
logger.info(NAME + " is ready.");
}
}
if (logger.isInfoEnabled()) {
logger.info(NAME + " has started.");
}
}
return this;
}
// 暴露服务
private void exportServices() {
// 遍历所有的service
configManager.getServices().forEach(sc -> {
ServiceConfig serviceConfig = (ServiceConfig) sc;
serviceConfig.setBootstrap(this);
// 是否异步暴露
if (exportAsync) {
ExecutorService executor = executorRepository.getServiceExporterExecutor();
Future<?> future = executor.submit(() -> {
sc.export();
exportedServices.add(sc);
});
asyncExportingFutures.add(future);
} else {
// 服务暴露
sc.export();
exportedServices.add(sc);
}
});
}
}

ServiceConfig

在export中,首先会初始化Metadata元数据相关信息,然后调用doExport进行服务暴露,doExport又是调用doExportUrls完成的。

在doExportUrls中,会获取注册中心的地址,因为一个服务可以支持多种协议,所以会遍历每一个协议,对每一个协议的服务进行暴露,向注册中心注册,同时会将当前的服务加入ServiceRepository容器中,服务暴露具体的过程是在doExportUrlsFor1Protocol中完成的。

doExportUrlsFor1Protocol方法中,会判断协议是否为空,如果为空,默认使用dubbo协议,然后根据协议信息构建URL。根据不同的scope会选择不同的暴露方式,包括本地暴露和远程暴露。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
public class ServiceConfig<T> extends ServiceConfigBase<T> {
public synchronized void export() {
if (!shouldExport()) {
return;
}
if (bootstrap == null) {
bootstrap = DubboBootstrap.getInstance();
bootstrap.init();
}
checkAndUpdateSubConfigs();
// 初始化Metadata元数据相关信息
serviceMetadata.setVersion(version);
serviceMetadata.setGroup(group);
serviceMetadata.setDefaultGroup(group);
serviceMetadata.setServiceType(getInterfaceClass());
serviceMetadata.setServiceInterfaceName(getInterface());
serviceMetadata.setTarget(getRef());
// 是否需要延迟暴露
if (shouldDelay()) {
DELAY_EXPORT_EXECUTOR.schedule(this::doExport, getDelay(), TimeUnit.MILLISECONDS);
} else {
// 暴露服务
doExport();
}
exported();
}

protected synchronized void doExport() {
if (unexported) {
throw new IllegalStateException("The service " + interfaceClass.getName() + " has already unexported!");
}
// 如果已经暴露过
if (exported) {
return;
}
exported = true;
if (StringUtils.isEmpty(path)) {
path = interfaceName;
}
// 服务暴露
doExportUrls();
}

@SuppressWarnings({"unchecked", "rawtypes"})
private void doExportUrls() {
// 获取服务容器
ServiceRepository repository = ApplicationModel.getServiceRepository();
ServiceDescriptor serviceDescriptor = repository.registerService(getInterfaceClass());
repository.registerProvider(
getUniqueServiceName(),
ref,
serviceDescriptor,
this,
serviceMetadata
);
// 获取注册中心地址
List<URL> registryURLs = ConfigValidationUtils.loadRegistries(this, true);
// 遍历所有的协议,因为一个服务可以支持多种协议
for (ProtocolConfig protocolConfig : protocols) {
// 构建KEY,存入ServiceRepository中使用
String pathKey = URL.buildKey(getContextPath(protocolConfig)
.map(p -> p + "/" + path)
.orElse(path), group, version);
// 注册服务,添加到ServiceRepository中
repository.registerService(pathKey, interfaceClass);
serviceMetadata.setServiceKey(pathKey);
// 暴露服务
doExportUrlsFor1Protocol(protocolConfig, registryURLs);
}
}

// 暴露服务
private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) {
String name = protocolConfig.getName();
// 如果为空,默认为dubbo协议
if (StringUtils.isEmpty(name)) {
name = DUBBO;
}
// 省略了部分代码
......
// export service
String host = findConfigedHosts(protocolConfig, registryURLs, map);
Integer port = findConfigedPorts(protocolConfig, name, map);
// 构建url
URL url = new URL(name, host, port, getContextPath(protocolConfig).map(p -> p + "/" + path).orElse(path), map);
// You can customize Configurator to append extra parameters
if (ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class)
.hasExtension(url.getProtocol())) {
url = ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class)
.getExtension(url.getProtocol()).getConfigurator(url).configure(url);
}
// 获取scope
String scope = url.getParameter(SCOPE_KEY);
// don't export when none is configured
if (!SCOPE_NONE.equalsIgnoreCase(scope)) {

// 如果scope不是REMOTE
if (!SCOPE_REMOTE.equalsIgnoreCase(scope)) {
// 本地方式暴露服务
exportLocal(url);
}
// 远程方式暴露服务
if (!SCOPE_LOCAL.equalsIgnoreCase(scope)) {
if (CollectionUtils.isNotEmpty(registryURLs)) {
// 遍历注册中心地址
for (URL registryURL : registryURLs) {
// 如果协议是injvm不进行注册
if (LOCAL_PROTOCOL.equalsIgnoreCase(url.getProtocol())) {
continue;
}
// 添加动态参数
url = url.addParameterIfAbsent(DYNAMIC_KEY, registryURL.getParameter(DYNAMIC_KEY));
// 添加监控配置
URL monitorUrl = ConfigValidationUtils.loadMonitor(this, registryURL);
if (monitorUrl != null) {
url = url.addParameterAndEncoded(MONITOR_KEY, monitorUrl.toFullString());
}
if (logger.isInfoEnabled()) {
if (url.getParameter(REGISTER_KEY, true)) {
logger.info("Register dubbo service " + interfaceClass.getName() + " url " + url + " to registry " + registryURL);
} else {
logger.info("Export dubbo service " + interfaceClass.getName() + " to url " + url);
}
}

// For providers, this is used to enable custom proxy to generate invoker
String proxy = url.getParameter(PROXY_KEY);
if (StringUtils.isNotEmpty(proxy)) {
registryURL = registryURL.addParameter(PROXY_KEY, proxy);
}
// 生成Invoker
Invoker<?> invoker = PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(EXPORT_KEY, url.toFullString()));
// 生成Invoker的包装类
DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
// 服务暴露
Exporter<?> exporter = PROTOCOL.export(wrapperInvoker);
exporters.add(exporter);
}
} else {
if (logger.isInfoEnabled()) {
logger.info("Export dubbo service " + interfaceClass.getName() + " to url " + url);
}
Invoker<?> invoker = PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, url);
DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);

Exporter<?> exporter = PROTOCOL.export(wrapperInvoker);
exporters.add(exporter);
}
/**
* @since 2.7.0
* ServiceData Store
*/
WritableMetadataService metadataService = WritableMetadataService.getExtension(url.getParameter(METADATA_KEY, DEFAULT_METADATA_STORAGE_TYPE));
if (metadataService != null) {
metadataService.publishServiceDefinition(url);
}
}
}
this.urls.add(url);
}
}

本地暴露

本地暴露是将服务暴露在本地的JVM中,可以看到协议是injvm:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
/**
* always export injvm
*/
private void exportLocal(URL url) {
URL local = URLBuilder.from(url)
.setProtocol(LOCAL_PROTOCOL)
.setHost(LOCALHOST_VALUE)
.setPort(0)
.build();
Exporter<?> exporter = PROTOCOL.export(
PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, local));
exporters.add(exporter);
logger.info("Export dubbo service " + interfaceClass.getName() + " to local registry url : " + local);
}

远程暴露

首先会遍历注册中心地址,因为一个服务可以向多个注册中心注册,并且判断协议是否是injvm,如果是的话不进行注册。然后为url添加动态参数和监控相关的配置,接着通过代理工厂PROXY_FACTORY生成服务的Invoker代理类,并且对Invoker进行了一层包装,包装后的类型为DelegateProviderMetaDataInvoker,最后调用PROTOCOL的export进行服务远程暴露。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43

// 远程方式暴露服务
if (!SCOPE_LOCAL.equalsIgnoreCase(scope)) {
if (CollectionUtils.isNotEmpty(registryURLs)) {
// 遍历注册中心地址
for (URL registryURL : registryURLs) {
// 如果协议是injvm不进行注册
if (LOCAL_PROTOCOL.equalsIgnoreCase(url.getProtocol())) {
continue;
}
// 添加动态参数
url = url.addParameterIfAbsent(DYNAMIC_KEY, registryURL.getParameter(DYNAMIC_KEY));
// 添加监控配置
URL monitorUrl = ConfigValidationUtils.loadMonitor(this, registryURL);
if (monitorUrl != null) {
url = url.addParameterAndEncoded(MONITOR_KEY, monitorUrl.toFullString());
}
...
// For providers, this is used to enable custom proxy to generate invoker
String proxy = url.getParameter(PROXY_KEY);
if (StringUtils.isNotEmpty(proxy)) {
registryURL = registryURL.addParameter(PROXY_KEY, proxy);
}
// 生成Invoker
Invoker<?> invoker = PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(EXPORT_KEY, url.toFullString()));
// 生成Invoker的包装类
DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
// 服务暴露
Exporter<?> exporter = PROTOCOL.export(wrapperInvoker);
exporters.add(exporter);
}
} else {
if (logger.isInfoEnabled()) {
logger.info("Export dubbo service " + interfaceClass.getName() + " to url " + url);
}
Invoker<?> invoker = PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, url);
DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);

Exporter<?> exporter = PROTOCOL.export(wrapperInvoker);
exporters.add(exporter);
}
...
}

ProxyFactory

PROXY_FACTORY是一个ProxyFactory类型的对象:

1
private static final ProxyFactory PROXY_FACTORY = ExtensionLoader.getExtensionLoader(ProxyFactory.class).getAdaptiveExtension();

ProxyFactory类中使用了SPI注解,由此可知,dubbo默认使用javassist生成代理对象:

1
2
3
4
5
6
7
8
9
10
11
12
13
@SPI("javassist")
public interface ProxyFactory {

@Adaptive({PROXY_KEY})
<T> T getProxy(Invoker<T> invoker) throws RpcException;

@Adaptive({PROXY_KEY})
<T> T getProxy(Invoker<T> invoker, boolean generic) throws RpcException;

@Adaptive({PROXY_KEY})
<T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) throws RpcException;

}

Protocol

ServiceConfig中PROTOCOL的实例化如下,可以看到是通过ExtensionLoader进行实例化的,dubbo是支持多种协议的,那么如何根据配置为不同的协议生成不同的Protocol对象,dubbo就是通过ExtensionLoader实现的:

1
private static final Protocol PROTOCOL = ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension();

进入Protocol,可以看到使用@SPI注解,并且export方法中使用了@Adaptive注解,通过ExtensionLoader动态生成Protocol$Adaptive类,根据不同的类型决定使用哪种实现类,默认使用dubbo协议DubboProtocol:

1
2
3
4
5
@SPI("dubbo")
public interface Protocol {
@Adaptive
<T> Exporter<T> export(Invoker<T> invoker) throws RpcException;
}

DubboProtocol

dubbo协议的实现在DubboProtocol中,export方法中是具体的服务暴露过程:

  1. 将Invoker又包装为了DubboExporter,加入到exporterMap
  2. 创建服务,最终是通过Exchangers进行的,生成了ExchangeServer对象
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
public class DubboProtocol extends AbstractProtocol {
// 服务暴露
@Override
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
URL url = invoker.getUrl();

// 构建servicekey
String key = serviceKey(url);
// 将invoker转为DubboExporter
DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);
// 加入到exporterMap
exporterMap.put(key, exporter);
Boolean isStubSupportEvent = url.getParameter(STUB_EVENT_KEY, DEFAULT_STUB_EVENT);
Boolean isCallbackservice = url.getParameter(IS_CALLBACK_SERVICE, false);
if (isStubSupportEvent && !isCallbackservice) {
String stubServiceMethods = url.getParameter(STUB_EVENT_METHODS_KEY);
if (stubServiceMethods == null || stubServiceMethods.length() == 0) {
if (logger.isWarnEnabled()) {
logger.warn(new IllegalStateException("consumer [" + url.getParameter(INTERFACE_KEY) +
"], has set stubproxy support event ,but no stub methods founded."));
}

}
}
// 打开server
openServer(url);
optimizeSerialization(url);

return exporter;
}

private void openServer(URL url) {
String key = url.getAddress();
boolean isServer = url.getParameter(IS_SERVER_KEY, true);
// 根据URL判断是否为服务端
if (isServer) {
ProtocolServer server = serverMap.get(key);
if (server == null) {
synchronized (this) {
server = serverMap.get(key);
if (server == null) {
// 创建server
serverMap.put(key, createServer(url));
}
}
} else {
// 重置server
server.reset(url);
}
}
}
// 创建server
private ProtocolServer createServer(URL url) {
// 设置参数
url = URLBuilder.from(url)
// 设置readonly事件当服务关闭的时候
.addParameterIfAbsent(CHANNEL_READONLYEVENT_SENT_KEY, Boolean.TRUE.toString())
// 设置心跳
.addParameterIfAbsent(HEARTBEAT_KEY, String.valueOf(DEFAULT_HEARTBEAT))
.addParameter(CODEC_KEY, DubboCodec.NAME)
.build();
String str = url.getParameter(SERVER_KEY, DEFAULT_REMOTING_SERVER);

if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str)) {
throw new RpcException("Unsupported server type: " + str + ", url: " + url);
}

ExchangeServer server;
try {
// 创建ExchangeServer
server = Exchangers.bind(url, requestHandler);
} catch (RemotingException e) {
throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e);
}

str = url.getParameter(CLIENT_KEY);
if (str != null && str.length() > 0) {
Set<String> supportedTypes = ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions();
if (!supportedTypes.contains(str)) {
throw new RpcException("Unsupported client type: " + str);
}
}
// 将ExchangeServer包装为DubboProtocolServer
return new DubboProtocolServer(server);
}

}

Exchangers

进入到Exchangers的bind方法中,可以看到又是通过 ExtensionLoader来生成具体的扩展类的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public class Exchangers {
public static ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
if (url == null) {
throw new IllegalArgumentException("url == null");
}
if (handler == null) {
throw new IllegalArgumentException("handler == null");
}
url = url.addParameterIfAbsent(Constants.CODEC_KEY, "exchange");
// 获取Exchanger对象
return getExchanger(url).bind(url, handler);
}

public static Exchanger getExchanger(URL url) {
String type = url.getParameter(Constants.EXCHANGER_KEY, Constants.DEFAULT_EXCHANGER);
// 根据类型获取Exchanger
return getExchanger(type);
}

public static Exchanger getExchanger(String type) {
// 根据类型生成Exchanger
return ExtensionLoader.getExtensionLoader(Exchanger.class).getExtension(type);
}
}

Exchanger

Exchanger使用了SPI机制,默认使用HeaderExchanger:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
@SPI(HeaderExchanger.NAME)
public interface Exchanger {

/**
* bind.
*
* @param url
* @param handler
* @return message server
*/
@Adaptive({Constants.EXCHANGER_KEY})
ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException;

/**
* connect.
*
* @param url
* @param handler
* @return message channel
*/
@Adaptive({Constants.EXCHANGER_KEY})
ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException;

}

HeaderExchanger

进入到 ExtensionLoader的bind方法,调用了Transporters的bind方法进行ExchangeServer创建的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public class HeaderExchanger implements Exchanger {

public static final String NAME = "header";

@Override
public ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {
return new HeaderExchangeClient(Transporters.connect(url, new DecodeHandler(new HeaderExchangeHandler(handler))), true);
}

@Override
public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
// 创建了HeaderExchangeServer,并且调用了Transporters的bind方法
return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler))));
}

}

Transporters

同样使用ExtensionLoader来生成扩展类:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public class Transporters {
public static RemotingServer bind(URL url, ChannelHandler... handlers) throws RemotingException {
if (url == null) {
throw new IllegalArgumentException("url == null");
}
if (handlers == null || handlers.length == 0) {
throw new IllegalArgumentException("handlers == null");
}
ChannelHandler handler;
if (handlers.length == 1) {
handler = handlers[0];
} else {
handler = new ChannelHandlerDispatcher(handlers);
}
return getTransporter().bind(url, handler);
}

public static Transporter getTransporter() {
// 通过ExtensionLoader创建Transporter对象
return ExtensionLoader.getExtensionLoader(Transporter.class).getAdaptiveExtension();
}
}

Transporter

Transporter使用SPI机制,默认为netty,所以底层默认使用netty网络通信:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
@SPI("netty")
public interface Transporter {

/**
* Bind a server.
*
* @param url server url
* @param handler
* @return server
* @throws RemotingException
* @see org.apache.dubbo.remoting.Transporters#bind(URL, ChannelHandler...)
*/
@Adaptive({Constants.SERVER_KEY, Constants.TRANSPORTER_KEY})
RemotingServer bind(URL url, ChannelHandler handler) throws RemotingException;

/**
* Connect to a server.
*
* @param url server url
* @param handler
* @return client
* @throws RemotingException
* @see org.apache.dubbo.remoting.Transporters#connect(URL, ChannelHandler...)
*/
@Adaptive({Constants.CLIENT_KEY, Constants.TRANSPORTER_KEY})
Client connect(URL url, ChannelHandler handler) throws RemotingException;

}

参考

【峡谷程序猿】Dubbo2.7.6之服务暴露流程

【Mr_1214】dubbo-服务暴露过程之网络通信创建

【拉勾教育】Dubbo源码解读与实战

dubbo版本:2.7.7