SHAN


  • Home

  • Archives

【Dubbo】Dubbo服务引用流程

Posted on 2021-08-28

当Provider将服务暴露之后,Consumer就可以通过注册中心进行引用,配置方式如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
<beans xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:dubbo="http://dubbo.apache.org/schema/dubbo"
xmlns="http://www.springframework.org/schema/beans"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.3.xsd
http://dubbo.apache.org/schema/dubbo http://dubbo.apache.org/schema/dubbo/dubbo.xsd">

<dubbo:application name="demo-consumer"/>

<dubbo:registry address="zookeeper://127.0.0.1:2181"/>

<dubbo:reference id="demoService" check="false" interface="org.apache.dubbo.demo.DemoService"/>

</beans>

也可以使用注解@Reference:

1
2
3
4
5
@Component("demoServiceComponent")
public class DemoServiceComponent implements DemoService {
@Reference // 2.7.0之后推荐使用@DubboReference
private DemoService demoService;
}

ReferenceBean

dubbo:reference对应ReferenceBean,ReferenceBean实现了FactoryBean和InitializingBean:

  1. ReferenceBean是一个FactoryBean,FactoryBean是Spring中的一个工厂bean,通过getObject得到bean对象。
  2. ReferenceBean实现了InitializingBean,同样是Spring中的一个类,通过afterPropertiesSet可以对bean进行初始化。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public class ReferenceBean<T> extends ReferenceConfig<T> implements FactoryBean,
ApplicationContextAware, InitializingBean, DisposableBean {
@Override
public Object getObject() {
// 获取bean对象,具体实现在ReferenceConfig中
return get();
}

@Override
@SuppressWarnings({"unchecked"})
public void afterPropertiesSet() throws Exception {
prepareDubboConfigBeans();
if (init == null) {
init = false;
}
if (shouldInit()) {
// 获取对象
getObject();
}
}
}

ReferenceConfig

ReferenceConfig是ReferenceBean的父类,获取bean的get方法中,对bean进行了判断,如果为空调用init方法进行初始化:

  1. 与Provider服务暴露流程一样,调用了 DubboBootstrap的init进行了初始化

  2. 调用createProxy创建代理对象:

    (1)判断是本地JVM引用还是远程引用

    (2)如果是远程引用,根据url判断是直连的方式还是从注册中心进行引用

    (3)会根据具体的协议调用PROTOCOL的refer方法生成Invoker对象

    (4)根据Invoker对象创建代理对象

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
public class ReferenceConfig<T> extends ReferenceConfigBase<T> {
public synchronized T get() {
if (destroyed) {
throw new IllegalStateException("The invoker of ReferenceConfig(" + url + ") has already destroyed!");
}
if (ref == null) {
// 初始化
init();
}
return ref;
}

// 初始化
public synchronized void init() {
if (initialized) {
return;
}
if (bootstrap == null) {
bootstrap = DubboBootstrap.getInstance();
// DubboBootstrap进行初始化
bootstrap.init();
}

checkAndUpdateSubConfigs();

checkStubAndLocal(interfaceClass);
ConfigValidationUtils.checkMock(interfaceClass, this);

Map<String, String> map = new HashMap<String, String>();
map.put(SIDE_KEY, CONSUMER_SIDE);

......

// 创建代理对象
ref = createProxy(map);

serviceMetadata.setTarget(ref);
serviceMetadata.addAttribute(PROXY_CLASS_REF, ref);
ConsumerModel consumerModel = repository.lookupReferredService(serviceMetadata.getServiceKey());
consumerModel.setProxyObject(ref);
consumerModel.init(attributes);

initialized = true;

// dispatch a ReferenceConfigInitializedEvent since 2.7.4
dispatch(new ReferenceConfigInitializedEvent(this, invoker));
}

// 创建代理对象
@SuppressWarnings({"unchecked", "rawtypes", "deprecation"})
private T createProxy(Map<String, String> map) {
// 是否为本地JVM引用
if (shouldJvmRefer(map)) {
URL url = new URL(LOCAL_PROTOCOL, LOCALHOST_VALUE, 0, interfaceClass.getName()).addParameters(map);
invoker = REF_PROTOCOL.refer(interfaceClass, url);
if (logger.isInfoEnabled()) {
logger.info("Using injvm service " + interfaceClass.getName());
}
} else {
urls.clear();
// 如果url不为空,代表是直连的方式
if (url != null && url.length() > 0) {
String[] us = SEMICOLON_SPLIT_PATTERN.split(url);
if (us != null && us.length > 0) {
for (String u : us) {
URL url = URL.valueOf(u);
if (StringUtils.isEmpty(url.getPath())) {
url = url.setPath(interfaceName);
}
if (UrlUtils.isRegistry(url)) {
urls.add(url.addParameterAndEncoded(REFER_KEY, StringUtils.toQueryString(map)));
} else {
urls.add(ClusterUtils.mergeUrl(url, map));
}
}
}
} else { // 如果url为空,代表从注册中心进行引用
if (!LOCAL_PROTOCOL.equalsIgnoreCase(getProtocol())) {
checkRegistry();
List<URL> us = ConfigValidationUtils.loadRegistries(this, false);
if (CollectionUtils.isNotEmpty(us)) {
for (URL u : us) {
URL monitorUrl = ConfigValidationUtils.loadMonitor(this, u);
if (monitorUrl != null) {
map.put(MONITOR_KEY, URL.encode(monitorUrl.toFullString()));
}
urls.add(u.addParameterAndEncoded(REFER_KEY, StringUtils.toQueryString(map)));
}
}
...
}
}
// 如果只有一个url
if (urls.size() == 1) {
// 调用Protocol的refer方法,REF_PROTOCOL使用ExtensionLoader来加载具体的扩展类
invoker = REF_PROTOCOL.refer(interfaceClass, urls.get(0));
} else {
// 如果有多个
List<Invoker<?>> invokers = new ArrayList<Invoker<?>>();
URL registryURL = null;
for (URL url : urls) {
invokers.add(REF_PROTOCOL.refer(interfaceClass, url));
if (UrlUtils.isRegistry(url)) {
registryURL = url; // use last registry url
}
}
if (registryURL != null) { // registry url is available
//
URL u = registryURL.addParameterIfAbsent(CLUSTER_KEY, ZoneAwareCluster.NAME);
//
invoker = CLUSTER.join(new StaticDirectory(u, invokers));
} else { //
invoker = CLUSTER.join(new StaticDirectory(invokers));
}
}
}

......

String metadata = map.get(METADATA_KEY);
WritableMetadataService metadataService = WritableMetadataService.getExtension(metadata == null ? DEFAULT_METADATA_STORAGE_TYPE : metadata);
if (metadataService != null) {
URL consumerURL = new URL(CONSUMER_PROTOCOL, map.remove(REGISTER_IP_KEY), 0, map.get(INTERFACE_KEY), map);
metadataService.publishServiceDefinition(consumerURL);
}
// 创建代理对象
return (T) PROXY_FACTORY.getProxy(invoker, ProtocolUtils.isGeneric(generic));
}

}

Invoker的创建

Protocol使用ExtensionLoader来加载具体的扩展类:

1
private static final Protocol REF_PROTOCOL = ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension();

使用的是registry,因此将会进入RegistryProtocol的refer方法。

RegistryProtocol

doRefer方法中,会创建RegistryDirectory对象,并调用subscribe进行订阅:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
public class RegistryProtocol implements Protocol {
@Override
@SuppressWarnings("unchecked")
public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
url = getRegistryUrl(url);
// 获取注册中心
Registry registry = registryFactory.getRegistry(url);
if (RegistryService.class.equals(type)) {
return proxyFactory.getInvoker((T) registry, type, url);
}
Map<String, String> qs = StringUtils.parseQueryString(url.getParameterAndDecoded(REFER_KEY));
String group = qs.get(GROUP_KEY);
if (group != null && group.length() > 0) {
if ((COMMA_SPLIT_PATTERN.split(group)).length > 1 || "*".equals(group)) {
return doRefer(getMergeableCluster(), registry, type, url);
}
}
// 服务引用
return doRefer(cluster, registry, type, url);
}

private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {
// 创建RegistryDirectory
RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);
directory.setRegistry(registry);
directory.setProtocol(protocol);
Map<String, String> parameters = new HashMap<String, String>(directory.getConsumerUrl().getParameters());
// 创建CONSUMER订阅URL
URL subscribeUrl = new URL(CONSUMER_PROTOCOL, parameters.remove(REGISTER_IP_KEY), 0, type.getName(), parameters);
if (directory.isShouldRegister()) {
directory.setRegisteredConsumerUrl(subscribeUrl);
registry.register(directory.getRegisteredConsumerUrl());
}
directory.buildRouterChain(subscribeUrl);
// 向注册中心订阅
directory.subscribe(toSubscribeUrl(subscribeUrl));
// 创建Invoker对象
Invoker<T> invoker = cluster.join(directory);
List<RegistryProtocolListener> listeners = findRegistryProtocolListeners(url);
if (CollectionUtils.isEmpty(listeners)) {
return invoker;
}

RegistryInvokerWrapper<T> registryInvokerWrapper = new RegistryInvokerWrapper<>(directory, cluster, invoker, subscribeUrl);
for (RegistryProtocolListener listener : listeners) {
listener.onRefer(this, registryInvokerWrapper);
}
return registryInvokerWrapper;
}
}

RegistryDirectory

RegistryDirectory继承了NotifyListener,是一个监听器。

RegistryDirectory中实现了subscribe方法,可以看到它是通过调用Registry的subscribe实现的,Registry是一个接口,我们使用的是zookeeper注册中心,所以接下来进入ZookeeperRegistry:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public class RegistryDirectory<T> extends AbstractDirectory<T> implements NotifyListener {

// Registry
private Registry registry;

public void subscribe(URL url) {
setConsumerUrl(url);
CONSUMER_CONFIGURATION_LISTENER.addNotifyListener(this);
serviceConfigurationListener = new ReferenceConfigurationListener(this, url);
// 1.向注册中心订阅, this当前对象作为监听器对象传入,注意这里的this指的RegistryDirectory,因为是通过RegistryDirectory调用的
// 2.Registry是一个接口,有不同的实现类,接下来以ZookeeperRegistry为例,进入到ZookeeperRegistry的subscribe方法
registry.subscribe(url, this);
}
}

ZookeeperRegistry

FailbackRegistry

subscribe方法在它的父类FailbackRegistry中实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
public abstract class FailbackRegistry extends AbstractRegistry {
@Override
public void subscribe(URL url, NotifyListener listener) {
super.subscribe(url, listener);
removeFailedSubscribed(url, listener);
try {
// 又调用了doSubscribe
doSubscribe(url, listener);
} catch (Exception e) {
......
}
}
}

在ZookeeperRegistry的doSubscribe方法中,会创建ZK客户端,从注册中心进行订阅,订阅之后会调用notify方法通知:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
public class ZookeeperRegistry extends FailbackRegistry {
@Override
public void doSubscribe(final URL url, final NotifyListener listener) {
try {
if (ANY_VALUE.equals(url.getServiceInterface())) {
String root = toRootPath();
ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.computeIfAbsent(url, k -> new ConcurrentHashMap<>());
ChildListener zkListener = listeners.computeIfAbsent(listener, k -> (parentPath, currentChilds) -> {
for (String child : currentChilds) {
child = URL.decode(child);
if (!anyServices.contains(child)) {
anyServices.add(child);
subscribe(url.setPath(child).addParameters(INTERFACE_KEY, child,
Constants.CHECK_KEY, String.valueOf(false)), k);
}
}
});
// 创建zk客户端
zkClient.create(root, false);
List<String> services = zkClient.addChildListener(root, zkListener);
if (CollectionUtils.isNotEmpty(services)) {
for (String service : services) {
service = URL.decode(service);
anyServices.add(service);
subscribe(url.setPath(service).addParameters(INTERFACE_KEY, service,
Constants.CHECK_KEY, String.valueOf(false)), listener);
}
}
} else {
List<URL> urls = new ArrayList<>();
for (String path : toCategoriesPath(url)) {
ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.computeIfAbsent(url, k -> new ConcurrentHashMap<>());
ChildListener zkListener = listeners.computeIfAbsent(listener, k -> (parentPath, currentChilds) -> ZookeeperRegistry.this.notify(url, k, toUrlsWithEmpty(url, parentPath, currentChilds)));
// 创建zk客户端
zkClient.create(path, false);
List<String> children = zkClient.addChildListener(path, zkListener);
if (children != null) {
urls.addAll(toUrlsWithEmpty(url, path, children));
}
}
// 通知,在FailbackRegistry中实现
notify(url, listener, urls);
}
} catch (Throwable e) {
throw new RpcException("Failed to subscribe " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
}
}
}

FailbackRegistry中的notify方法实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
//org.apache.dubbo.registry.support.FailbackRegistry中的notify方法
@Override
protected void notify(URL url, NotifyListener listener, List<URL> urls) {
if (url == null) {
throw new IllegalArgumentException("notify url == null");
}
if (listener == null) {
throw new IllegalArgumentException("notify listener == null");
}
try {
// 继续调用,在AbstractRegistry中实现
doNotify(url, listener, urls);
} catch (Exception t) {
addFailedNotified(url, listener, urls);
logger.error("Failed to notify for subscribe " + url + ", waiting for retry, cause: " + t.getMessage(), t);
}
}

// AbstractRegistry中的notify实现
protected void notify(URL url, NotifyListener listener, List<URL> urls) {
......
Map<String, List<URL>> categoryNotified = notified.computeIfAbsent(url, u -> new ConcurrentHashMap<>());
for (Map.Entry<String, List<URL>> entry : result.entrySet()) {
String category = entry.getKey();
List<URL> categoryList = entry.getValue();
categoryNotified.put(category, categoryList);
// 调用了listener的notify,前面的流程可知,传入的是RegistryDirectory,所以接下来进入RegistryDirectory的notify方法
listener.notify(categoryList);
saveProperties(url);
}
}

由上可知,最终会调用NotifyListener的notify方法,前面的内容可知,NotifyListener传入的是RegistryDirectory对象,所以会进入到

RegistryDirectory的notify方法,在这个过程中比较关键的一步是toInvokers方法中将url转为Invoker:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
public class RegistryDirectory<T> extends AbstractDirectory<T> implements NotifyListener {
@Override
public synchronized void notify(List<URL> urls) {
Map<String, List<URL>> categoryUrls = urls.stream()
.filter(Objects::nonNull)
.filter(this::isValidCategory)
.filter(this::isNotCompatibleFor26x)
.collect(Collectors.groupingBy(this::judgeCategory));

......
// 刷新Invoker
refreshOverrideAndInvoker(providerURLs);
}

private void refreshOverrideAndInvoker(List<URL> urls) {
// mock zookeeper://xxx?mock=return null
overrideDirectoryUrl();
// 刷新Invoker
refreshInvoker(urls);
}

// 刷新Invoker
private void refreshInvoker(List<URL> invokerUrls) {
Assert.notNull(invokerUrls, "invokerUrls should not be null");

if (invokerUrls.size() == 1
&& invokerUrls.get(0) != null
&& EMPTY_PROTOCOL.equals(invokerUrls.get(0).getProtocol())) {
this.forbidden = true;
this.invokers = Collections.emptyList();
routerChain.setInvokers(this.invokers);
destroyAllInvokers();
} else {
this.forbidden = false;
Map<String, Invoker<T>> oldUrlInvokerMap = this.urlInvokerMap;
......
// 转为Invoker
Map<String, Invoker<T>> newUrlInvokerMap = toInvokers(invokerUrls);
......
}
}

// 转为Invoker
private Map<String, Invoker<T>> toInvokers(List<URL> urls) {
Map<String, Invoker<T>> newUrlInvokerMap = new HashMap<>();
if (urls == null || urls.isEmpty()) {
return newUrlInvokerMap;
}
Set<String> keys = new HashSet<>();
String queryProtocols = this.queryMap.get(PROTOCOL_KEY);
for (URL providerUrl : urls) {
......
keys.add(key);
Map<String, Invoker<T>> localUrlInvokerMap = this.urlInvokerMap;
Invoker<T> invoker = localUrlInvokerMap == null ? null : localUrlInvokerMap.get(key);
if (invoker == null) {
try {
boolean enabled = true;
if (url.hasParameter(DISABLED_KEY)) {
enabled = !url.getParameter(DISABLED_KEY, false);
} else {
enabled = url.getParameter(ENABLED_KEY, true);
}
if (enabled) {
// 根据不同的协议调用对应的refer方法
invoker = new InvokerDelegate<>(protocol.refer(serviceType, url), url, providerUrl);
}
} catch (Throwable t) {
logger.error("Failed to refer invoker for interface:" + serviceType + ",url:(" + url + ")" + t.getMessage(), t);
}
if (invoker != null) {
// 将Invoker放入缓存
newUrlInvokerMap.put(key, invoker);
}
} else {
newUrlInvokerMap.put(key, invoker);
}
}
keys.clear();
return newUrlInvokerMap;
}

DubboProtocol

接下来以DubboProtocal为例,进入refer方法中,refer在DubboProtocol的父类AbstractProtocol中实现:

  1. 调用了getClients创建客户端连接
  2. 创建了DubboInvoker
1
2
3
4
5
6
7
8
public abstract class AbstractProtocol implements Protocol {
@Override
public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
return new AsyncToSyncInvoker<>(protocolBindingRefer(type, url));
}

protected abstract <T> Invoker<T> protocolBindingRefer(Class<T> type, URL url) throws RpcException;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
public class DubboProtocol extends AbstractProtocol {
@Override
public <T> Invoker<T> protocolBindingRefer(Class<T> serviceType, URL url) throws RpcException {
optimizeSerialization(url);

// 创建DubboInvoker
DubboInvoker<T> invoker = new DubboInvoker<T>(serviceType, url, getClients(url), invokers);
invokers.add(invoker);

return invoker;
}
// 创建客户端连接
private ExchangeClient[] getClients(URL url) {

boolean useShareConnect = false;

int connections = url.getParameter(CONNECTIONS_KEY, 0);
List<ReferenceCountExchangeClient> shareClients = null;
if (connections == 0) {
useShareConnect = true;
String shareConnectionsStr = url.getParameter(SHARE_CONNECTIONS_KEY, (String) null);
connections = Integer.parseInt(StringUtils.isBlank(shareConnectionsStr) ? ConfigUtils.getProperty(SHARE_CONNECTIONS_KEY,
DEFAULT_SHARE_CONNECTIONS) : shareConnectionsStr);
shareClients = getSharedClient(url, connections);
}
// 创建ExchangeClient
ExchangeClient[] clients = new ExchangeClient[connections];
for (int i = 0; i < clients.length; i++) {
if (useShareConnect) {
clients[i] = shareClients.get(i);

} else {
// 初始化客户端
clients[i] = initClient(url);
}
}

return clients;
}

// 初始化
private ExchangeClient initClient(URL url) {

String str = url.getParameter(CLIENT_KEY, url.getParameter(SERVER_KEY, DEFAULT_REMOTING_CLIENT));
// 设置一些参数
url = url.addParameter(CODEC_KEY, DubboCodec.NAME);
url = url.addParameterIfAbsent(HEARTBEAT_KEY, String.valueOf(DEFAULT_HEARTBEAT));
if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str)) {
throw new RpcException("Unsupported client type: " + str + "," +
" supported client type is " + StringUtils.join(ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions(), " "));
}
ExchangeClient client;
try {
// 进行连接
if (url.getParameter(LAZY_CONNECT_KEY, false)) {
client = new LazyConnectExchangeClient(url, requestHandler);
} else {
client = Exchangers.connect(url, requestHandler);
}
} catch (RemotingException e) {
throw new RpcException("Fail to create remoting client for service(" + url + "): " + e.getMessage(), e);
}
return client;
}
}

创建代理对象

在Invoker创建完毕之后,会通过PROXY_FACTORY创建代理对象:

1
2
3
4
private static final ProxyFactory PROXY_FACTORY = ExtensionLoader.getExtensionLoader(ProxyFactory.class).getAdaptiveExtension();

// 创建代理对象
return (T) PROXY_FACTORY.getProxy(invoker, ProtocolUtils.isGeneric(generic));

在dubbo中默认使用Javassist创建代理对象:

1
2
3
4
5
6
7
8
9
public class JavassistProxyFactory extends AbstractProxyFactory {

@Override
@SuppressWarnings("unchecked")
public <T> T getProxy(Invoker<T> invoker, Class<?>[] interfaces) {
//创建InvokerInvocationHandler
return (T) Proxy.getProxy(interfaces).newInstance(new InvokerInvocationHandler(invoker));
}
}

InvokerInvocationHandler

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
public class InvokerInvocationHandler implements InvocationHandler {

public InvokerInvocationHandler(Invoker<?> handler) {
this.invoker = handler;
String serviceKey = invoker.getUrl().getServiceKey();
if (serviceKey != null) {
this.consumerModel = ApplicationModel.getConsumerModel(serviceKey);
}
}
// invoke方法,调用接口的方法时会调用到invoke方法
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
if (method.getDeclaringClass() == Object.class) {
return method.invoke(invoker, args);
}
String methodName = method.getName();
Class<?>[] parameterTypes = method.getParameterTypes();
if (parameterTypes.length == 0) {
if ("toString".equals(methodName)) {
return invoker.toString();
} else if ("$destroy".equals(methodName)) {
invoker.destroy();
return null;
} else if ("hashCode".equals(methodName)) {
return invoker.hashCode();
}
} else if (parameterTypes.length == 1 && "equals".equals(methodName)) {
return invoker.equals(args[0]);
}
// 创建RpcInvocation
RpcInvocation rpcInvocation = new RpcInvocation(method, invoker.getInterface().getName(), args);
String serviceKey = invoker.getUrl().getServiceKey();
rpcInvocation.setTargetServiceUniqueName(serviceKey);

if (consumerModel != null) {
rpcInvocation.put(Constants.CONSUMER_MODEL, consumerModel);
rpcInvocation.put(Constants.METHOD_MODEL, consumerModel.getMethodModel(method));
}

return invoker.invoke(rpcInvocation).recreate();
}
}

参考

【月染霜华】Dubbo源码分析(六)服务引用的具体流程

【休息的风】dubbo 源码学习笔记 (三) —— dubbo引用服务的过程

【峡谷程序猿】Dubbo2.7.6启动原理之Consumer

【拉勾教育】Dubbo源码解读与实战

dubbo版本:2.7.7

【Dubbo】Dubbo服务暴露流程

Posted on 2021-08-21

DubboBootstrapApplicationListener

DubboBootstrapApplicationListener是一个监听器,可以监听Dubbo的启动或者关闭事件,如果是启动事件,会通过DubboBootstrap启动dubbo。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public class DubboBootstrapApplicationListener extends OneTimeExecutionApplicationContextEventListener
implements Ordered {

public static final String BEAN_NAME = "dubboBootstrapApplicationListener";

private final DubboBootstrap dubboBootstrap;

public DubboBootstrapApplicationListener() {
this.dubboBootstrap = DubboBootstrap.getInstance();
}

@Override
public void onApplicationContextEvent(ApplicationContextEvent event) {
// 如果是启动事件
if (event instanceof ContextRefreshedEvent) {
onContextRefreshedEvent((ContextRefreshedEvent) event);
} else if (event instanceof ContextClosedEvent) { // 如果是关闭事件
onContextClosedEvent((ContextClosedEvent) event);
}
}
private void onContextRefreshedEvent(ContextRefreshedEvent event) {
// 启动
dubboBootstrap.start();
}

private void onContextClosedEvent(ContextClosedEvent event) {
dubboBootstrap.stop();
}
}

DubboBootstrap

DubboBootstrap是dubbo启动的核心,首先它会调用initialize方法进行一些初始化操作,然后调用exportServices进行服务暴露。

在exportServices中获取所有需要暴露的服务,进行遍历,对每一个服务进行暴露,Dubbo将每一个服务封装为ServiceConfig类型,具体的暴露过程在ServiceConfig的export方法中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
public class DubboBootstrap extends GenericEventListener {
/**
* Start the bootstrap
*/
public DubboBootstrap start() {
if (started.compareAndSet(false, true)) {
ready.set(false);
// 初始化操作
initialize();
if (logger.isInfoEnabled()) {
logger.info(NAME + " is starting...");
}
// 1. 暴露DUBBO服务
exportServices();

// Not only provider register
if (!isOnlyRegisterProvider() || hasExportedServices()) {
// 2. export MetadataService
exportMetadataService();
//3. Register the local ServiceInstance if required
registerServiceInstance();
}

referServices();
if (asyncExportingFutures.size() > 0) {
new Thread(() -> {
try {
this.awaitFinish();
} catch (Exception e) {
logger.warn(NAME + " exportAsync occurred an exception.");
}
ready.set(true);
if (logger.isInfoEnabled()) {
logger.info(NAME + " is ready.");
}
}).start();
} else {
ready.set(true);
if (logger.isInfoEnabled()) {
logger.info(NAME + " is ready.");
}
}
if (logger.isInfoEnabled()) {
logger.info(NAME + " has started.");
}
}
return this;
}
// 暴露服务
private void exportServices() {
// 遍历所有的service
configManager.getServices().forEach(sc -> {
ServiceConfig serviceConfig = (ServiceConfig) sc;
serviceConfig.setBootstrap(this);
// 是否异步暴露
if (exportAsync) {
ExecutorService executor = executorRepository.getServiceExporterExecutor();
Future<?> future = executor.submit(() -> {
sc.export();
exportedServices.add(sc);
});
asyncExportingFutures.add(future);
} else {
// 服务暴露
sc.export();
exportedServices.add(sc);
}
});
}
}

ServiceConfig

在export中,首先会初始化Metadata元数据相关信息,然后调用doExport进行服务暴露,doExport又是调用doExportUrls完成的。

在doExportUrls中,会获取注册中心的地址,因为一个服务可以支持多种协议,所以会遍历每一个协议,对每一个协议的服务进行暴露,向注册中心注册,同时会将当前的服务加入ServiceRepository容器中,服务暴露具体的过程是在doExportUrlsFor1Protocol中完成的。

doExportUrlsFor1Protocol方法中,会判断协议是否为空,如果为空,默认使用dubbo协议,然后根据协议信息构建URL。根据不同的scope会选择不同的暴露方式,包括本地暴露和远程暴露。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
public class ServiceConfig<T> extends ServiceConfigBase<T> {
public synchronized void export() {
if (!shouldExport()) {
return;
}
if (bootstrap == null) {
bootstrap = DubboBootstrap.getInstance();
bootstrap.init();
}
checkAndUpdateSubConfigs();
// 初始化Metadata元数据相关信息
serviceMetadata.setVersion(version);
serviceMetadata.setGroup(group);
serviceMetadata.setDefaultGroup(group);
serviceMetadata.setServiceType(getInterfaceClass());
serviceMetadata.setServiceInterfaceName(getInterface());
serviceMetadata.setTarget(getRef());
// 是否需要延迟暴露
if (shouldDelay()) {
DELAY_EXPORT_EXECUTOR.schedule(this::doExport, getDelay(), TimeUnit.MILLISECONDS);
} else {
// 暴露服务
doExport();
}
exported();
}

protected synchronized void doExport() {
if (unexported) {
throw new IllegalStateException("The service " + interfaceClass.getName() + " has already unexported!");
}
// 如果已经暴露过
if (exported) {
return;
}
exported = true;
if (StringUtils.isEmpty(path)) {
path = interfaceName;
}
// 服务暴露
doExportUrls();
}

@SuppressWarnings({"unchecked", "rawtypes"})
private void doExportUrls() {
// 获取服务容器
ServiceRepository repository = ApplicationModel.getServiceRepository();
ServiceDescriptor serviceDescriptor = repository.registerService(getInterfaceClass());
repository.registerProvider(
getUniqueServiceName(),
ref,
serviceDescriptor,
this,
serviceMetadata
);
// 获取注册中心地址
List<URL> registryURLs = ConfigValidationUtils.loadRegistries(this, true);
// 遍历所有的协议,因为一个服务可以支持多种协议
for (ProtocolConfig protocolConfig : protocols) {
// 构建KEY,存入ServiceRepository中使用
String pathKey = URL.buildKey(getContextPath(protocolConfig)
.map(p -> p + "/" + path)
.orElse(path), group, version);
// 注册服务,添加到ServiceRepository中
repository.registerService(pathKey, interfaceClass);
serviceMetadata.setServiceKey(pathKey);
// 暴露服务
doExportUrlsFor1Protocol(protocolConfig, registryURLs);
}
}

// 暴露服务
private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) {
String name = protocolConfig.getName();
// 如果为空,默认为dubbo协议
if (StringUtils.isEmpty(name)) {
name = DUBBO;
}
// 省略了部分代码
......
// export service
String host = findConfigedHosts(protocolConfig, registryURLs, map);
Integer port = findConfigedPorts(protocolConfig, name, map);
// 构建url
URL url = new URL(name, host, port, getContextPath(protocolConfig).map(p -> p + "/" + path).orElse(path), map);
// You can customize Configurator to append extra parameters
if (ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class)
.hasExtension(url.getProtocol())) {
url = ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class)
.getExtension(url.getProtocol()).getConfigurator(url).configure(url);
}
// 获取scope
String scope = url.getParameter(SCOPE_KEY);
// don't export when none is configured
if (!SCOPE_NONE.equalsIgnoreCase(scope)) {

// 如果scope不是REMOTE
if (!SCOPE_REMOTE.equalsIgnoreCase(scope)) {
// 本地方式暴露服务
exportLocal(url);
}
// 远程方式暴露服务
if (!SCOPE_LOCAL.equalsIgnoreCase(scope)) {
if (CollectionUtils.isNotEmpty(registryURLs)) {
// 遍历注册中心地址
for (URL registryURL : registryURLs) {
// 如果协议是injvm不进行注册
if (LOCAL_PROTOCOL.equalsIgnoreCase(url.getProtocol())) {
continue;
}
// 添加动态参数
url = url.addParameterIfAbsent(DYNAMIC_KEY, registryURL.getParameter(DYNAMIC_KEY));
// 添加监控配置
URL monitorUrl = ConfigValidationUtils.loadMonitor(this, registryURL);
if (monitorUrl != null) {
url = url.addParameterAndEncoded(MONITOR_KEY, monitorUrl.toFullString());
}
if (logger.isInfoEnabled()) {
if (url.getParameter(REGISTER_KEY, true)) {
logger.info("Register dubbo service " + interfaceClass.getName() + " url " + url + " to registry " + registryURL);
} else {
logger.info("Export dubbo service " + interfaceClass.getName() + " to url " + url);
}
}

// For providers, this is used to enable custom proxy to generate invoker
String proxy = url.getParameter(PROXY_KEY);
if (StringUtils.isNotEmpty(proxy)) {
registryURL = registryURL.addParameter(PROXY_KEY, proxy);
}
// 生成Invoker
Invoker<?> invoker = PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(EXPORT_KEY, url.toFullString()));
// 生成Invoker的包装类
DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
// 服务暴露
Exporter<?> exporter = PROTOCOL.export(wrapperInvoker);
exporters.add(exporter);
}
} else {
if (logger.isInfoEnabled()) {
logger.info("Export dubbo service " + interfaceClass.getName() + " to url " + url);
}
Invoker<?> invoker = PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, url);
DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);

Exporter<?> exporter = PROTOCOL.export(wrapperInvoker);
exporters.add(exporter);
}
/**
* @since 2.7.0
* ServiceData Store
*/
WritableMetadataService metadataService = WritableMetadataService.getExtension(url.getParameter(METADATA_KEY, DEFAULT_METADATA_STORAGE_TYPE));
if (metadataService != null) {
metadataService.publishServiceDefinition(url);
}
}
}
this.urls.add(url);
}
}

本地暴露

本地暴露是将服务暴露在本地的JVM中,可以看到协议是injvm:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
/**
* always export injvm
*/
private void exportLocal(URL url) {
URL local = URLBuilder.from(url)
.setProtocol(LOCAL_PROTOCOL)
.setHost(LOCALHOST_VALUE)
.setPort(0)
.build();
Exporter<?> exporter = PROTOCOL.export(
PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, local));
exporters.add(exporter);
logger.info("Export dubbo service " + interfaceClass.getName() + " to local registry url : " + local);
}

远程暴露

首先会遍历注册中心地址,因为一个服务可以向多个注册中心注册,并且判断协议是否是injvm,如果是的话不进行注册。然后为url添加动态参数和监控相关的配置,接着通过代理工厂PROXY_FACTORY生成服务的Invoker代理类,并且对Invoker进行了一层包装,包装后的类型为DelegateProviderMetaDataInvoker,最后调用PROTOCOL的export进行服务远程暴露。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43

// 远程方式暴露服务
if (!SCOPE_LOCAL.equalsIgnoreCase(scope)) {
if (CollectionUtils.isNotEmpty(registryURLs)) {
// 遍历注册中心地址
for (URL registryURL : registryURLs) {
// 如果协议是injvm不进行注册
if (LOCAL_PROTOCOL.equalsIgnoreCase(url.getProtocol())) {
continue;
}
// 添加动态参数
url = url.addParameterIfAbsent(DYNAMIC_KEY, registryURL.getParameter(DYNAMIC_KEY));
// 添加监控配置
URL monitorUrl = ConfigValidationUtils.loadMonitor(this, registryURL);
if (monitorUrl != null) {
url = url.addParameterAndEncoded(MONITOR_KEY, monitorUrl.toFullString());
}
...
// For providers, this is used to enable custom proxy to generate invoker
String proxy = url.getParameter(PROXY_KEY);
if (StringUtils.isNotEmpty(proxy)) {
registryURL = registryURL.addParameter(PROXY_KEY, proxy);
}
// 生成Invoker
Invoker<?> invoker = PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(EXPORT_KEY, url.toFullString()));
// 生成Invoker的包装类
DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
// 服务暴露
Exporter<?> exporter = PROTOCOL.export(wrapperInvoker);
exporters.add(exporter);
}
} else {
if (logger.isInfoEnabled()) {
logger.info("Export dubbo service " + interfaceClass.getName() + " to url " + url);
}
Invoker<?> invoker = PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, url);
DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);

Exporter<?> exporter = PROTOCOL.export(wrapperInvoker);
exporters.add(exporter);
}
...
}

ProxyFactory

PROXY_FACTORY是一个ProxyFactory类型的对象:

1
private static final ProxyFactory PROXY_FACTORY = ExtensionLoader.getExtensionLoader(ProxyFactory.class).getAdaptiveExtension();

ProxyFactory类中使用了SPI注解,由此可知,dubbo默认使用javassist生成代理对象:

1
2
3
4
5
6
7
8
9
10
11
12
13
@SPI("javassist")
public interface ProxyFactory {

@Adaptive({PROXY_KEY})
<T> T getProxy(Invoker<T> invoker) throws RpcException;

@Adaptive({PROXY_KEY})
<T> T getProxy(Invoker<T> invoker, boolean generic) throws RpcException;

@Adaptive({PROXY_KEY})
<T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) throws RpcException;

}

Protocol

ServiceConfig中PROTOCOL的实例化如下,可以看到是通过ExtensionLoader进行实例化的,dubbo是支持多种协议的,那么如何根据配置为不同的协议生成不同的Protocol对象,dubbo就是通过ExtensionLoader实现的:

1
private static final Protocol PROTOCOL = ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension();

进入Protocol,可以看到使用@SPI注解,并且export方法中使用了@Adaptive注解,通过ExtensionLoader动态生成Protocol$Adaptive类,根据不同的类型决定使用哪种实现类,默认使用dubbo协议DubboProtocol:

1
2
3
4
5
@SPI("dubbo")
public interface Protocol {
@Adaptive
<T> Exporter<T> export(Invoker<T> invoker) throws RpcException;
}

DubboProtocol

dubbo协议的实现在DubboProtocol中,export方法中是具体的服务暴露过程:

  1. 将Invoker又包装为了DubboExporter,加入到exporterMap
  2. 创建服务,最终是通过Exchangers进行的,生成了ExchangeServer对象
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
public class DubboProtocol extends AbstractProtocol {
// 服务暴露
@Override
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
URL url = invoker.getUrl();

// 构建servicekey
String key = serviceKey(url);
// 将invoker转为DubboExporter
DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);
// 加入到exporterMap
exporterMap.put(key, exporter);
Boolean isStubSupportEvent = url.getParameter(STUB_EVENT_KEY, DEFAULT_STUB_EVENT);
Boolean isCallbackservice = url.getParameter(IS_CALLBACK_SERVICE, false);
if (isStubSupportEvent && !isCallbackservice) {
String stubServiceMethods = url.getParameter(STUB_EVENT_METHODS_KEY);
if (stubServiceMethods == null || stubServiceMethods.length() == 0) {
if (logger.isWarnEnabled()) {
logger.warn(new IllegalStateException("consumer [" + url.getParameter(INTERFACE_KEY) +
"], has set stubproxy support event ,but no stub methods founded."));
}

}
}
// 打开server
openServer(url);
optimizeSerialization(url);

return exporter;
}

private void openServer(URL url) {
String key = url.getAddress();
boolean isServer = url.getParameter(IS_SERVER_KEY, true);
// 根据URL判断是否为服务端
if (isServer) {
ProtocolServer server = serverMap.get(key);
if (server == null) {
synchronized (this) {
server = serverMap.get(key);
if (server == null) {
// 创建server
serverMap.put(key, createServer(url));
}
}
} else {
// 重置server
server.reset(url);
}
}
}
// 创建server
private ProtocolServer createServer(URL url) {
// 设置参数
url = URLBuilder.from(url)
// 设置readonly事件当服务关闭的时候
.addParameterIfAbsent(CHANNEL_READONLYEVENT_SENT_KEY, Boolean.TRUE.toString())
// 设置心跳
.addParameterIfAbsent(HEARTBEAT_KEY, String.valueOf(DEFAULT_HEARTBEAT))
.addParameter(CODEC_KEY, DubboCodec.NAME)
.build();
String str = url.getParameter(SERVER_KEY, DEFAULT_REMOTING_SERVER);

if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str)) {
throw new RpcException("Unsupported server type: " + str + ", url: " + url);
}

ExchangeServer server;
try {
// 创建ExchangeServer
server = Exchangers.bind(url, requestHandler);
} catch (RemotingException e) {
throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e);
}

str = url.getParameter(CLIENT_KEY);
if (str != null && str.length() > 0) {
Set<String> supportedTypes = ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions();
if (!supportedTypes.contains(str)) {
throw new RpcException("Unsupported client type: " + str);
}
}
// 将ExchangeServer包装为DubboProtocolServer
return new DubboProtocolServer(server);
}

}

Exchangers

进入到Exchangers的bind方法中,可以看到又是通过 ExtensionLoader来生成具体的扩展类的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public class Exchangers {
public static ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
if (url == null) {
throw new IllegalArgumentException("url == null");
}
if (handler == null) {
throw new IllegalArgumentException("handler == null");
}
url = url.addParameterIfAbsent(Constants.CODEC_KEY, "exchange");
// 获取Exchanger对象
return getExchanger(url).bind(url, handler);
}

public static Exchanger getExchanger(URL url) {
String type = url.getParameter(Constants.EXCHANGER_KEY, Constants.DEFAULT_EXCHANGER);
// 根据类型获取Exchanger
return getExchanger(type);
}

public static Exchanger getExchanger(String type) {
// 根据类型生成Exchanger
return ExtensionLoader.getExtensionLoader(Exchanger.class).getExtension(type);
}
}

Exchanger

Exchanger使用了SPI机制,默认使用HeaderExchanger:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
@SPI(HeaderExchanger.NAME)
public interface Exchanger {

/**
* bind.
*
* @param url
* @param handler
* @return message server
*/
@Adaptive({Constants.EXCHANGER_KEY})
ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException;

/**
* connect.
*
* @param url
* @param handler
* @return message channel
*/
@Adaptive({Constants.EXCHANGER_KEY})
ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException;

}

HeaderExchanger

进入到 ExtensionLoader的bind方法,调用了Transporters的bind方法进行ExchangeServer创建的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public class HeaderExchanger implements Exchanger {

public static final String NAME = "header";

@Override
public ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {
return new HeaderExchangeClient(Transporters.connect(url, new DecodeHandler(new HeaderExchangeHandler(handler))), true);
}

@Override
public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
// 创建了HeaderExchangeServer,并且调用了Transporters的bind方法
return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler))));
}

}

Transporters

同样使用ExtensionLoader来生成扩展类:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public class Transporters {
public static RemotingServer bind(URL url, ChannelHandler... handlers) throws RemotingException {
if (url == null) {
throw new IllegalArgumentException("url == null");
}
if (handlers == null || handlers.length == 0) {
throw new IllegalArgumentException("handlers == null");
}
ChannelHandler handler;
if (handlers.length == 1) {
handler = handlers[0];
} else {
handler = new ChannelHandlerDispatcher(handlers);
}
return getTransporter().bind(url, handler);
}

public static Transporter getTransporter() {
// 通过ExtensionLoader创建Transporter对象
return ExtensionLoader.getExtensionLoader(Transporter.class).getAdaptiveExtension();
}
}

Transporter

Transporter使用SPI机制,默认为netty,所以底层默认使用netty网络通信:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
@SPI("netty")
public interface Transporter {

/**
* Bind a server.
*
* @param url server url
* @param handler
* @return server
* @throws RemotingException
* @see org.apache.dubbo.remoting.Transporters#bind(URL, ChannelHandler...)
*/
@Adaptive({Constants.SERVER_KEY, Constants.TRANSPORTER_KEY})
RemotingServer bind(URL url, ChannelHandler handler) throws RemotingException;

/**
* Connect to a server.
*
* @param url server url
* @param handler
* @return client
* @throws RemotingException
* @see org.apache.dubbo.remoting.Transporters#connect(URL, ChannelHandler...)
*/
@Adaptive({Constants.CLIENT_KEY, Constants.TRANSPORTER_KEY})
Client connect(URL url, ChannelHandler handler) throws RemotingException;

}

参考

【峡谷程序猿】Dubbo2.7.6之服务暴露流程

【Mr_1214】dubbo-服务暴露过程之网络通信创建

【拉勾教育】Dubbo源码解读与实战

dubbo版本:2.7.7

【Redis】字典

Posted on 2021-08-01

Redis 字典

基本语法

字典是Redis中的一种数据结构,底层使用哈希表实现,一个哈希表中可以存储多个键值对,它的语法如下,其中KEY为键,field和value为值(也是一个键值对):

1
HSET key field value

根据Key和field获取value:

1
HGET key field

哈希表

数据结构

dictht

dictht是哈希表的数据结构定义:

  • table:哈希表数组,数组中的元素是dictEntry类型的
  • size:哈希表数组的大小
  • sizemask:哈希表大小掩码,一般等于size-1
  • used:已有节点的数量(存储键值对的数量)
1
2
3
4
5
6
typedef struct dictht {
dictEntry **table;
unsigned long size;
unsigned long sizemask;
unsigned long used;
} dictht;

dictEntry

dictEntry是哈希表节点的结构定义:

  • key:键值对中的键
  • v:键值对中的值
  • next:由于会出现哈希冲突,所以next是指向下一个节点的指针
1
2
3
4
5
6
7
8
9
10
typedef struct dictEntry {
void *key; // 键
union {
void *val;
uint64_t u64;
int64_t s64;
double d;
} v; // 值
struct dictEntry *next; // 指向下一个节点的指针
} dictEntry;

dict

dict是Redis中字典的结构定义:

  • type:指向dictType的指针
  • privdata
  • ht[2]:一个dictht类型的数组,数组大小为2,保存了两个哈希表,rehash时使用
  • rehashidx:记录了当前rehash的进度
  • pauserehash:rehash暂停标记,大于0表示没有进行rehash
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
typedef struct dict {
dictType *type; //
void *privdata; // 私有数据
dictht ht[2]; // 保存了两个哈希表
long rehashidx; // rehash的进度标记
int16_t pauserehash;
} dict;

typedef struct dictType {
uint64_t (*hashFunction)(const void *key);
void *(*keyDup)(void *privdata, const void *key);
void *(*valDup)(void *privdata, const void *obj);
int (*keyCompare)(void *privdata, const void *key1, const void *key2);
void (*keyDestructor)(void *privdata, void *key);
void (*valDestructor)(void *privdata, void *obj);
int (*expandAllowed)(size_t moreMem, double usedRatio);
} dictType;

哈希冲突

一个键值对放入哈希表的时候,会根据key的值,计算一个hash值,然后根据hash值与哈希表大小掩码做与运算得到一个索引值,索引值决定元素放入哪个哈希桶中(落入哈希表数组哪个索引位置处)。

1
2
3
4
// 计算hash值
hash = dictHashKey(d,key)
// 计算索引
idx = hash & d->ht[table].sizemask;

在进行哈希计算的时候,不可避免会出现哈希冲突,出现哈希冲突的时候,Redis采用链式哈希解决冲突,也就是落入同一个桶中的元素,使用链表将这些冲突的元素链起来(dictEntry中的next指针)。

rehash

由于Redis采用链式哈希解决冲突,那么在冲突频繁的场景下,链表会变得越来越长,这种情况下查找效率是比较低下的,需要遍历链表对比KEY的值来获取数据,为了处理效率低下的问题,需要对哈希表进行扩容,扩容的过程称为rehash。

在dict结构替中ht保存了两个哈希表,ht[0]用于数据正常的增删改查,ht[1]用于rehash:

(1)正常情况下,所有的增删改查操作都在ht[0]中进行;

(2)需要进行rehash时,会使用ht[1]建立新的哈希表,并将ht[0]中的数据迁移到ht[1]中;

(3)迁移完成后,ht[0]的空间被释放,然后将ht[1]地址赋给ht[0],ht[1]的大小被设为0,ht[0]重新接收正常的请求,回到了第(1)步的状态;

rehash的触发条件
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
/* 判断是否需要扩容 */
static int _dictExpandIfNeeded(dict *d)
{
/* 如果已经处于rehash状态中直接返回 */
if (dictIsRehashing(d)) return DICT_OK;

/* 如果ht[0]的大小为0,意味着哈希表为空,此时做初始化操作 */
if (d->ht[0].size == 0) return dictExpand(d, DICT_HT_INITIAL_SIZE);

/*如果已经存储的节点数量大于或等于哈希表数组的大小,并且跨域扩容或者(节点数量/哈希表数组大小)大于一个比例,同时根据字典的类型判断是否允许分配内存*/
if (d->ht[0].used >= d->ht[0].size &&
(dict_can_resize ||
d->ht[0].used/d->ht[0].size > dict_force_resize_ratio) &&
dictTypeExpandAllowed(d))
{
// 进行扩容
return dictExpand(d, d->ht[0].used + 1);
}
return DICT_OK;
}

/* 由于扩容需要分配内存,这里检查字典类型分配是否被允许*/
static int dictTypeExpandAllowed(dict *d) {
if (d->type->expandAllowed == NULL) return 1;
return d->type->expandAllowed(
_dictNextPower(d->ht[0].used + 1) * sizeof(dictEntry*),
(double)d->ht[0].used / d->ht[0].size);
}

d->ht[0].used/d->ht[0].size : 节点数量与哈希表数组大小的比例,称作负载因子。

dict_force_resize_ratio 的默认值是 5。

  1. ht[0]的大小为0,此时哈希表是空的,相当于对哈希表做一个初始化的操作。
  2. 如果哈希表中存储的节点数量大于或者等于哈希表数组的大小,并且哈希表可以扩容或者负载因子大于dict_force_resize_ratio(默认值为5),根据字典的类型判断允许分配内存,满足这三个条件开始扩容。

dict_can_resize

dict_can_resize用来判断哈希表是否可以扩容,有两种状态,值分别为1和0,1代表可以扩容,0代表禁用扩容:

1
2
3
4
5
6
7
void dictEnableResize(void) {
dict_can_resize = 1;
}

void dictDisableResize(void) {
dict_can_resize = 0;
}

updateDictResizePolicy中对dict_can_resize的状态进行了控制,当前没有RDB子进程并且也没有AOF子进程时设置dict_can_resize状态为可扩容:

1
2
3
4
5
6
7
8

void updateDictResizePolicy(void) {
// 没有RDB子进程并且也没有AOF子进程
if (server.rdb_child_pid == -1 && server.aof_child_pid == -1)
dictEnableResize(); // 启用扩容
else
dictDisableResize(); // 禁用扩容
}
扩容大小

从代码中可以看到,扩容后哈希表数组的大小为已经存储的节点数量+1:

1
2
// 进行扩容
return dictExpand(d, d->ht[0].used + 1);

一些旧版本中扩容后的大小为已存储节点数量的2倍:

1
dictExpand(d, d->ht[0].used*2);

渐进式hash

当哈希表存储节点内容比较多时,需要将原来的节点一个一个拷贝到新的哈希表中,此时Redis主线程无法执行其他请求,造成阻塞,影响性能,为了解决这个问题,引入了渐进式hash。

渐进式hash并不会一次把旧节点全部拷贝到新的哈希表中,而是分多次渐进式的完成拷贝,其中rehashidx记录了迁移进度,每一次迁移的过程中会更新rehashidx的值,下一次进行数据迁移的时候,从rehashidx的位置开始迁移,在dictRehash中可以看到迁移的处理:

  1. 方法传入了一个参数n,代表本次需要迁移几个哈希桶
  2. 根据需要迁移哈希桶的数量,循环处理每一个哈希桶:
    • 如果当前哈希桶中为空,继续下一个桶的处理rehashidx++
    • 如果当前哈希桶不为空,将当前桶中的所有节点迁移到新的哈希表中,然后更新rehashidx的值继续处理下一个桶
  3. 如果已经处理够了n个桶,或者哈希表的所有数据已经迁移完毕,则结束迁移。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
int dictRehash(dict *d, int n) {
int empty_visits = n*10; /* Max number of empty buckets to visit. */
if (!dictIsRehashing(d)) return 0;
// 循环处理每一个哈希桶,n为需要迁移哈希桶的数量
while(n-- && d->ht[0].used != 0) {
dictEntry *de, *nextde;
assert(d->ht[0].size > (unsigned long)d->rehashidx);
// 如果当前哈希桶没有存储数据
while(d->ht[0].table[d->rehashidx] == NULL) {
// rehashidx的值是哈希表数组的某个索引值(指向了某个哈希桶),意味着当前迁移到数组的哪个索引位置处
d->rehashidx++; // 继续下一个桶
if (--empty_visits == 0) return 1;
}

de = d->ht[0].table[d->rehashidx];
// 如果当前的哈希桶中存储着数据,将哈希桶存储的所有数据迁移到新的哈希表中
while(de) {
uint64_t h;

nextde = de->next;
/* Get the index in the new hash table */
h = dictHashKey(d, de->key) & d->ht[1].sizemask;
de->next = d->ht[1].table[h];
d->ht[1].table[h] = de;
d->ht[0].used--;
d->ht[1].used++;
de = nextde;
}
d->ht[0].table[d->rehashidx] = NULL;
// rehashidx,继续迁移下一个哈希桶
d->rehashidx++;
}

/* 判断ht[0]的节点是否迁移完成 */
if (d->ht[0].used == 0) {
// 释放ht[0]的空间
zfree(d->ht[0].table);
// 将ht[0]指向ht[1]
d->ht[0] = d->ht[1];
// 重置ht[1]的大小为0
_dictReset(&d->ht[1]);
// 设置rehashidx,-1代表rehash结束
d->rehashidx = -1;
return 0;
}

/* More to rehash... */
return 1;
}

_dictRehashStep

_dictRehashStep中可以看到调用dictRehash时,每次迁移哈希桶的数量为1:

1
2
3
static void _dictRehashStep(dict *d) {
if (d->pauserehash == 0) dictRehash(d,1);
}

总结

  1. Redis字典底层使用哈希表实现。

  2. 键值对放入哈希表的时候,会根据key的值,计算hash值,出现哈希冲突的时候,Redis采用链式哈希解决冲突,使用链表将这些冲突的元素链起来。

  3. 由于Redis采用链式哈希解决冲突,那么在冲突频繁的场景下,链表会变得越来越长,这种情况下查找效率是比较低下的,需要遍历链表对比KEY的值来获取数据,为了处理效率低下的问题,需要对哈希表进行扩容,扩容的过程称为rehash。

  4. 当哈希表存储节点内容比较多时,进行rehas的时候主线程无法执行其他请求,造成阻塞,影响性能,所以采用了渐进式hash,渐进式hash并不会一次把旧节点全部拷贝到新的哈希表中,而是分多次渐进式的完成拷贝。

参考

黄健宏《Redis设计与实现》

极客时间 - Redis源码剖析与实战(蒋德钧)

美团针对Redis Rehash机制的探索和实践

Redis版本:redis-6.2.5

【Redis】简单动态字符串SDS

Posted on 2021-07-31

C语言字符串

1
2
char *str = "redis"; // 可以不显式的添加\0,由编译器添加
char *str = "redis\0"; // 也可以添加\0代表字符串结束

C语言中使用char*字符数组表示字符串,’\0’来标记一个字符串的结束,不过在使用的过程中我们不需要显式的在字符串中加入’\0’。

存在问题

1.二进制安全

C语言以’\0’标记字符串的结尾,如果一个字符串本身带有’\0’,比如一些二进制数据,那么字符串就会被截断,导致无法存储二进制数据。

2.缓冲区溢出

假设内存有两个相邻的字符串s1和s2,s1保存了字符串“Redis”,s2中保存了字符串“MongoDB”,如果不对大小进行判断,直接调用strcat(s1, “Cluster”)函数在s1字符串后面追加“Cluster“,s1的数据溢出到s2所在空间,导致s2的内容被意外的修改。

3.频繁的内存分配

因为C字符串不记录自身长度,如果对字符串进行修改,需要内存重分配扩展底层数组的空间大小,比如调用strcat函数进行拼接时,需要重新分配内存,保证数组有足够的空间,否则就会发生缓冲区溢出。

由于内存重分配涉及复杂算法,并且可能需要执行系统调用,所以是一个耗时的操作。

4.获取字符串长度复杂度为O(N)

C字符串不记录自身长度,需要遍历每个字符计算字符串长度,在高并发情况下有可能称为性能的瓶颈。

参考:黄健宏《Redis设计与实现》

Redis String

String字符串是Redis中的一种数据结构,它的语法如下:

1
SET key value

key和value都是字符串,一个key对应一个value,通过key可以获取到对应的value:

1
GET KEY

简单动态字符串

Redis的字符串没有直接使用C语言的字符数组实现,而是通过SDS(Simple Dynamic String)简单动态字符串实现的。

SDS数据结构

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
typedef char *sds;

/* Note: sdshdr5 is never used, we just access the flags byte directly.
* However is here to document the layout of type 5 SDS strings. */
struct __attribute__ ((__packed__)) sdshdr5 {
unsigned char flags; /* 3 lsb of type, and 5 msb of string length */
char buf[];
};
struct __attribute__ ((__packed__)) sdshdr8 {
uint8_t len; /* used */
uint8_t alloc; /* excluding the header and null terminator */
unsigned char flags; /* 3 lsb of type, 5 unused bits */
char buf[];
};
struct __attribute__ ((__packed__)) sdshdr16 {
uint16_t len; /* used */
uint16_t alloc; /* excluding the header and null terminator */
unsigned char flags; /* 3 lsb of type, 5 unused bits */
char buf[];
};
struct __attribute__ ((__packed__)) sdshdr32 {
uint32_t len; /* used */
uint32_t alloc; /* excluding the header and null terminator */
unsigned char flags; /* 3 lsb of type, 5 unused bits */
char buf[];
};
struct __attribute__ ((__packed__)) sdshdr64 {
uint64_t len; /* used */
uint64_t alloc; /* excluding the header and null terminator */
unsigned char flags; /* 3 lsb of type, 5 unused bits */
char buf[];
};

Redis为了灵活的保存不同大小的字符串节省内存空间,设计了不同的结构头sdshdr64、sdshdr32、sdshdr16、sdshdr8和sdshdr5。

虽然结构头不同,但是他们都具有相同的属性(sdshdr5除外):

  • len:字符数组buf实际使用的大小,也就是字符串的长度
  • alloc:字符数组buf分配的空间大小
  • flags:标记SDS的类型:sdshdr64、sdshdr32、sdshdr16、sdshdr8和sdshdr5
  • buf[]:字符数组,用来存储实际的字符数据

一些优化

Redis使用了__attribute__ (( __packed__))节省内存空间,它可以告诉编译器使用紧凑的方式分配内存,不使用字节对齐的方式给变量分配内存。

关于字节对齐可参考结构体字节对齐,C语言结构体字节对齐详解。

柔性数组

在结构体定义中,可以看到最后一个buf数组是没有设置大小的,这种放在结构体中最后一个元素位置并且没有设置大小的数组称为柔性数组,它可以在程序运行过程中动态的进行内存分配。

SDS创建

(1)sds在创建的时候,buf数组初始大小为:struct结构体大小 + 字符串的长度+1, +1是为了在字符串末尾添加一个\0。

(2)在完成字符串到字符数组的拷贝之后,会在字符串末尾加一个\0,这样可以复用C语言的一些函数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
sds _sdsnewlen(const void *init, size_t initlen, int trymalloc) {
void *sh;
sds s;
// 根据长度计算sds类型
char type = sdsReqType(initlen);
if (type == SDS_TYPE_5 && initlen == 0) type = SDS_TYPE_8;
// 获取结构体大小
int hdrlen = sdsHdrSize(type);
unsigned char *fp; /* flags pointer. */
size_t usable;

assert(initlen + hdrlen + 1 > initlen); /* Catch size_t overflow */
// 分配内存空间,初始大小为:struct结构体大小+字符串的长度+1,+1是为了在字符串末尾添加一个\0
sh = trymalloc?
s_trymalloc_usable(hdrlen+initlen+1, &usable) :
s_malloc_usable(hdrlen+initlen+1, &usable);
// 如果分配失败
if (sh == NULL) return NULL;
if (init==SDS_NOINIT)
init = NULL;
else if (!init)
memset(sh, 0, hdrlen+initlen+1);
// 指向buf数组的指针
s = (char*)sh+hdrlen;
fp = ((unsigned char*)s)-1;
usable = usable-hdrlen-1;
if (usable > sdsTypeMaxSize(type))
usable = sdsTypeMaxSize(type);
// 类型选择
switch(type) {
case SDS_TYPE_5: {
*fp = type | (initlen << SDS_TYPE_BITS);
break;
}
case SDS_TYPE_8: {
SDS_HDR_VAR(8,s);
sh->len = initlen;
sh->alloc = usable;
*fp = type;
break;
}
case SDS_TYPE_16: {
SDS_HDR_VAR(16,s);
sh->len = initlen;
sh->alloc = usable;
*fp = type;
break;
}
case SDS_TYPE_32: {
SDS_HDR_VAR(32,s);
sh->len = initlen;
sh->alloc = usable;
*fp = type;
break;
}
case SDS_TYPE_64: {
SDS_HDR_VAR(64,s);
sh->len = initlen; // 设置字符串长度
sh->alloc = usable; // 设置分配的总空间大小
*fp = type; // 设置sds类型
break;
}
}
if (initlen && init)
memcpy(s, init, initlen); // 将字符串拷贝到buf数组
// 字符串末尾添加一个\0
s[initlen] = '\0';
return s;
}

// 获取结构体大小
static inline int sdsHdrSize(char type) {
switch(type&SDS_TYPE_MASK) {
case SDS_TYPE_5:
return sizeof(struct sdshdr5);
case SDS_TYPE_8:
return sizeof(struct sdshdr8);
case SDS_TYPE_16:
return sizeof(struct sdshdr16);
case SDS_TYPE_32:
return sizeof(struct sdshdr32);
case SDS_TYPE_64:
return sizeof(struct sdshdr64);
}
return 0;
}

SDS扩容

(1)判断剩余可用空间是否大于需要增加的长度,如果大于说明空间足够,直接返回即可,反之计算新的长度,进行扩容;

(2)空间预分配,扩容新的长度为已经使用的长度+需要增加的长度,然后会判断是否小于SDS_MAX_PREALLOC(1M):

  • 如果小于,直接扩容为新长度的2倍
  • 如果大于,扩容容量为新长度+SDS_MAX_PREALLOC的值

(3)由于扩容之后大小发生了改变,需要重新计算使用哪种SDS类型:

  • 如果类型不需要改变,直接扩容即可
  • 如果类型发生改变,需要重新进行内存分配,并将旧的内存释放
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
sds sdsMakeRoomFor(sds s, size_t addlen) {
void *sh, *newsh;
// 获取剩余可用空间大小
size_t avail = sdsavail(s);
size_t len, newlen;
char type, oldtype = s[-1] & SDS_TYPE_MASK;
int hdrlen;
size_t usable;

// 如果可用空间大于需要增加的长度,返回即可
if (avail >= addlen) return s;
// 获取实际使用的长度
len = sdslen(s);
sh = (char*)s-sdsHdrSize(oldtype);
// 计算新的长度,已经使用的长度+需要增加的长度
newlen = (len+addlen);
assert(newlen > len); /* Catch size_t overflow */
// 如果新的长度小于SDS_MAX_PREALLOC
if (newlen < SDS_MAX_PREALLOC)
// 扩容为新长度的2倍
newlen *= 2;
else
// 新长度 = 新长度+SDS_MAX_PREALLOC
newlen += SDS_MAX_PREALLOC;
// 根据新的长度计算需要使用sds的类型
type = sdsReqType(newlen);
if (type == SDS_TYPE_5) type = SDS_TYPE_8;
// 获取struct大小
hdrlen = sdsHdrSize(type);
assert(hdrlen + newlen + 1 > len); /* Catch size_t overflow */
// 如果sds类型不需要改变
if (oldtype==type) {
// 扩容
newsh = s_realloc_usable(sh, hdrlen+newlen+1, &usable);
if (newsh == NULL) return NULL;
s = (char*)newsh+hdrlen;
} else {
// 如果需要改变sds类型,重新分配空间
newsh = s_malloc_usable(hdrlen+newlen+1, &usable);
if (newsh == NULL) return NULL;
// 拷贝字符串到字符数组
memcpy((char*)newsh+hdrlen, s, len+1);
// 释放旧的空间
s_free(sh);
s = (char*)newsh+hdrlen;
s[-1] = type;
sdssetlen(s, len);
}
usable = usable-hdrlen-1;
if (usable > sdsTypeMaxSize(type))
usable = sdsTypeMaxSize(type);
sdssetalloc(s, usable);
return s;
}

惰性空间释放

当字符串缩短后,程序并不会立刻释放多余的空间,之后可直接复用多余的空间。

Redis SDS总结

  1. 直接保存了字符串的长度,不需要遍历每个字符去计算长度。
  2. 使用了字符数组保存数据,并且记录了字符串长度,通过长度来判断字符串的结束而不是\0,可以保存二进制数据。
  3. 需要改变字符串长度大小时,会通过sdsMakeRoomFor方法确保有足够的内存空间,不需要开发人员自行判断,保证了数据的安全性,不会造成缓冲区溢出。
  4. 在进行扩容时,会进行空间预分配,多分配一些空间,减小内存分配的次数。
  5. 创建了不同的结构体,保存不同大小的字符串节省内存空间。
  6. 使用__attribute__ (( __packed__))紧凑的方式分配内存,节省内存空间。

参考

极客时间 - Redis源码剖析与实战(蒋德钧)

yellowriver007 - Redis内部数据结构详解(2)——sds

偷懒的程序员-小彭 - redis 系列,要懂redis,首先得看懂sds(全网最细节的sds讲解)

CHENG Jian - C语言0长度数组(可变数组/柔性数组)详解

Redis版本:redis-6.2.5

【Netty】Pipeline

Posted on 2021-06-06

Pipeline的初始化

ChannelPipeline是一个双向链表,在Channel构建的过程中,创建了ChannelPipeline,初始化了头结点head和尾节点tail:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {
final AbstractChannelHandlerContext head;
final AbstractChannelHandlerContext tail;

protected AbstractChannel(Channel parent) {
this.parent = parent;
id = newId();
unsafe = newUnsafe();
// 创建pipeline
pipeline = newChannelPipeline();
}

protected DefaultChannelPipeline newChannelPipeline() {
return new DefaultChannelPipeline(this);
}
}

// DefaultChannelPipeline
public class DefaultChannelPipeline implements ChannelPipeline {
protected DefaultChannelPipeline(Channel channel) {
this.channel = ObjectUtil.checkNotNull(channel, "channel");
succeededFuture = new SucceededChannelFuture(channel, null);
voidPromise = new VoidChannelPromise(channel, true);
// 尾节点
tail = new TailContext(this);
// 头结点
head = new HeadContext(this);

head.next = tail;
tail.prev = head;
}
}
HeadContext

HeadContext是头结点,它既实现了ChannelOutboundHandler又实现了ChannelInboundHandler,它比TailContext多了一个Unsafe类型的变量:

1
2
3
4
5
6
7
8
9
10
11
// HeadContext
final class HeadContext extends AbstractChannelHandlerContext
implements ChannelOutboundHandler, ChannelInboundHandler {
// 比tailContext多了一个Unsafe类型的变量
private final Unsafe unsafe;
HeadContext(DefaultChannelPipeline pipeline) {
super(pipeline, null, HEAD_NAME, HeadContext.class);
unsafe = pipeline.channel().unsafe();
setAddComplete();
}
}

HeadContext继承关系:

TailContext

TailContext是尾节点,它实现了ChannelInboundHandler,InboundHandler是入站处理器:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
    // TailContext
final class TailContext extends AbstractChannelHandlerContext implements ChannelInboundHandler {
TailContext(DefaultChannelPipeline pipeline) {
// 构造函数初始化
super(pipeline, null, TAIL_NAME, TailContext.class);
setAddComplete();
}
}

abstract class AbstractChannelHandlerContext implements ChannelHandlerContext, ResourceLeakHint {
AbstractChannelHandlerContext(DefaultChannelPipeline pipeline, EventExecutor executor,
String name, Class<? extends ChannelHandler> handlerClass) {
this.name = ObjectUtil.checkNotNull(name, "name");
this.pipeline = pipeline;
this.executor = executor;
// 一个掩码,代表需要那些方法需要执行,后面会说到
this.executionMask = mask(handlerClass);.
ordered = executor == null || executor instanceof OrderedEventExecutor;
}
}

TailContext继承关系:

Pipeline添加ChannelHandler

ChannelHandler分为入站ChannelInboundHandler 和出站ChannelOutboundHandler 两种处理器,看下他们的继承关系:

ChannelInboundHandler

ChannelOutboundHandler

ChannelInboundHandler和ChannelOutboundHandler都是ChannelHandler的子类。

ChannelInboundHandler是入站处理器,如果以服务端为角度,那么入站指的就是从客户端发送过来数据(数据流向从客户端到服务端),触发从头结点HeadContext开始事件传播,一直到尾节点TailContext结束。

ChannelOutboundHandler是出站处理器,与ChannelInboundHandler相反,同样以服务器为角度,数据流向从服务端到客户端就是出站,它会从TailContext开始,一直到HeadContext结束。

向Pipeline双向链表中添加ChannelHandler处理器

在创建ServerBootstrap的时候,有一步是为ServerBootstrap设置ChannelHandler,通过addLast方法向Pipeline中添加Handler:

1
2
3
4
5
6
7
8
9
10
11
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class) // 设置channel类型
.localAddress(new InetSocketAddress(port)) // 设置端口
.childHandler(new ChannelInitializer<SocketChannel>() { // 设置channelHandler
@Override
public void initChannel(SocketChannel ch) {
ch.pipeline()
.addLast("handler", new HttpServerHandler());// 自定义Handler
}
})

DefaultChannelPipeline中实现了addLast方法:

  1. 检查是否重复添加Handler
  2. 创建Pipeline节点
  3. 将节点加入到Pipeline双向链表中
  4. 回调handlerAdded方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public class DefaultChannelPipeline implements ChannelPipeline {
@Override
public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
final AbstractChannelHandlerContext newCtx;
// 加锁
synchronized (this) {
// 检查是否重复添加
checkMultiplicity(handler);
// 创建Pipeline节点,是DefaultChannelHandlerContext类型的对象
newCtx = newContext(group, filterName(name, handler), handler);
// 添加到Pipline中
addLast0(newCtx);
if (!registered) {
newCtx.setAddPending();
callHandlerCallbackLater(newCtx, true);
return this;
}

EventExecutor executor = newCtx.executor();
if (!executor.inEventLoop()) {
callHandlerAddedInEventLoop(newCtx, executor);
return this;
}
}
// 回调 handlerAdded
callHandlerAdded0(newCtx);
return this;
}
}

检查重复添加

检查重复添加的时候会判断当前的Handler是否是非共享(共享指的是可以添加到多个ChannelPipeline中)的,并且added状态为已添加,如果满足这个条件将抛出异常,防止重复添加。

如果未添加过,将added状态置为true表示已添加,addLast方法是通过synchronized加锁的,由此也看出加锁的必要性,如果不加锁,多线程情况下很可能导致重复添加Handler。

1
2
3
4
5
6
7
8
9
10
11
12
13
private static void checkMultiplicity(ChannelHandler handler) {
if (handler instanceof ChannelHandlerAdapter) {
ChannelHandlerAdapter h = (ChannelHandlerAdapter) handler;
// 是否是非共享的handler并且已添加过
if (!h.isSharable() && h.added) {
throw new ChannelPipelineException(
h.getClass().getName() +
" is not a @Sharable handler, so can't be added or removed multiple times.");
}
// 设置为已添加
h.added = true;
}
}

创建节点

创建节点之前首先会过滤handler的名称,如果名称为空,就新创建一个,如果不为空检查是否冲突,接下来开始创建节点:

1
2
3
4
// 过滤名称
name = filterName(name, handler);
// 创建节点
newCtx = newContext(group, name, handler);
过滤名称
  1. 如果名称为空,就生成一个,生成规则是简单类名+ #0 ,如果名称冲突,就将#后面的0改成1,然后数字递增直到名称不重复,如 HeadContext 的默认名称为 “DefaultChannelPipeline$HeadContext#0”
  2. 如果名称不为空,校验名称是否重复,如果重复将抛出异常
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
private String filterName(String name, ChannelHandler handler) {
if (name == null) {
// 如果名称为空,为handler生成一个名称
return generateName(handler);
}
// 校验名称是否重复
checkDuplicateName(name);
return name;
}
// 生成名称
private String generateName(ChannelHandler handler) {
Map<Class<?>, String> cache = nameCaches.get();
// 获取handler的Class
Class<?> handlerType = handler.getClass();
// 从缓存中获取
String name = cache.get(handlerType);
// 如果缓存中没有,就生成一个,生成规则是简单类名+#0
if (name == null) {
name = generateName0(handlerType);
// 加到缓存中
cache.put(handlerType, name);
}
// 如果有重复的
if (context0(name) != null) {
// 去掉#后面的数字
String baseName = name.substring(0, name.length() - 1); // Strip the trailing '0'.
// 从1开始递增,直到没有重复的
for (int i = 1;; i ++) {
String newName = baseName + i;
if (context0(newName) == null) {
name = newName;
break;
}
}
}
return name;
}

private static String generateName0(Class<?> handlerType) {
// 通过简单类名+#0
return StringUtil.simpleClassName(handlerType) + "#0";
}

private void checkDuplicateName(String name) {
// 如果名称有相同的抛出异常
if (context0(name) != null) {
throw new IllegalArgumentException("Duplicate handler name: " + name);
}
}

private AbstractChannelHandlerContext context0(String name) {
AbstractChannelHandlerContext context = head.next;
// 遍历链表的所有节点,判断名称是否有相同的
while (context != tail) {
// 如果名称相同
if (context.name().equals(name)) {
return context;
}
context = context.next;
}
return null;
}
生成节点

节点的类型是DefaultChannelHandlerContext

1
2
3
private AbstractChannelHandlerContext newContext(EventExecutorGroup group, String name, ChannelHandler handler) {
return new DefaultChannelHandlerContext(this, childExecutor(group), name, handler);
}
DefaultChannelHandlerContext

DefaultChannelHanlerContext将自定义的Handler封装起来,作为链表中的节点。

DefaultChannelHanlerContext是ChannelHandlerContext 的子类,ChannelHandlerContext用于保存ChannelHandler上下文,在Pipeline中传递,它包含了 ChannelHandler 生命周期的所有事件。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
final class DefaultChannelHandlerContext extends AbstractChannelHandlerContext {

private final ChannelHandler handler;

DefaultChannelHandlerContext(
DefaultChannelPipeline pipeline, EventExecutor executor, String name, ChannelHandler handler) {
// 调用父类构造函数
super(pipeline, executor, name, handler.getClass());
this.handler = handler;
}
}

abstract class AbstractChannelHandlerContext implements ChannelHandlerContext, ResourceLeakHint {
AbstractChannelHandlerContext(DefaultChannelPipeline pipeline, EventExecutor executor,
String name, Class<? extends ChannelHandler> handlerClass) {
this.name = ObjectUtil.checkNotNull(name, "name");
// 设置pipeline
this.pipeline = pipeline;
this.executor = executor;
// 设置掩码
this.executionMask = mask(handlerClass);
ordered = executor == null || executor instanceof OrderedEventExecutor;
}
}

DefaultChannelHandlerContext继承关系:

将节点加入pipeline

addLast0是尾插法,将生成的节点加入到了链表中尾节点之前,因为tail节点要永远指向尾节点,新节点只能加在尾节点之前:

1
2
3
4
5
6
7
8
9
10
11
12
private void addLast0(AbstractChannelHandlerContext newCtx) {
// 记录尾结点的前一个节点
AbstractChannelHandlerContext prev = tail.prev;
// 当前节点的前一个节点设置为原本尾结点的前一个节点
newCtx.prev = prev;
// 当前节点的下一个节点设置为尾节点
newCtx.next = tail;
// 原本尾结点前的那个节点的下一个节点设置为当前节点
prev.next = newCtx;
// 尾节点的前一个节点设置为当前节点
tail.prev = newCtx;
}
回调handlerAdded

在Handler完成添加之后,会触发Handler的handlerAdded事件:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public class DefaultChannelPipeline implements ChannelPipeline {
private void callHandlerAdded0(final AbstractChannelHandlerContext ctx) {
try {
// 调用handler的HandlerAdded方法
ctx.callHandlerAdded();
} catch (Throwable t) {
boolean removed = false;
try {
atomicRemoveFromHandlerList(ctx);
ctx.callHandlerRemoved();
removed = true;
} catch (Throwable t2) {
if (logger.isWarnEnabled()) {
logger.warn("Failed to remove a handler: " + ctx.name(), t2);
}
}

// 省略代码
}
}
}
// AbstractChannelHandlerContext
abstract class AbstractChannelHandlerContext implements ChannelHandlerContext, ResourceLeakHint {
final void callHandlerAdded() throws Exception {
if (setAddComplete()) {
// 触发handler的handlerAdded事件
handler().handlerAdded(this);
}
}
}

Pipeline事件传播

ChannelPipeline分为入站ChannelInboundHandler 和出站ChannelOutboundHandler 两种处理器。

入站(InBound):事件传播方向为head - > tail

出站(Outbount):事件传播方向为tail - > head

Inbound 事件传播

在NioEventLoop处理任务的过程中,如果有就绪的I/O事件,判断事件类型,如果是读事件或者有连接请求,调用NioUnsafe的read方法进行处理:

1
2
3
4
// 如果是读事件或者有连接请求
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
unsafe.read();
}

NioMessageUnsafe

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
public abstract class AbstractNioMessageChannel extends AbstractNioChannel {
private final class NioMessageUnsafe extends AbstractNioUnsafe {

private final List<Object> readBuf = new ArrayList<Object>();

@Override
public void read() {
assert eventLoop().inEventLoop();
final ChannelConfig config = config();
// 获取pipeline
final ChannelPipeline pipeline = pipeline();
final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
allocHandle.reset(config);

boolean closed = false;
Throwable exception = null;
try {
try {
do {
int localRead = doReadMessages(readBuf);
if (localRead == 0) {
break;
}
if (localRead < 0) {
closed = true;
break;
}

allocHandle.incMessagesRead(localRead);
} while (allocHandle.continueReading());
} catch (Throwable t) {
exception = t;
}

int size = readBuf.size();
for (int i = 0; i < size; i ++) {
readPending = false;
// ChannelRead事件传播
pipeline.fireChannelRead(readBuf.get(i));
}
readBuf.clear();
allocHandle.readComplete();
pipeline.fireChannelReadComplete();

// 省略了代码
} finally {
if (!readPending && !config.isAutoRead()) {
removeReadOp();
}
}
}
}
}
// DefaultChannelPipeline
public class DefaultChannelPipeline implements ChannelPipeline {
@Override
public final ChannelPipeline fireChannelRead(Object msg) {
// 触发ChannelRead事件
AbstractChannelHandlerContext.invokeChannelRead(head, msg);
return this;
}
}
// AbstractChannelHandlerContext
abstract class AbstractChannelHandlerContext implements ChannelHandlerContext, ResourceLeakHint {

static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
EventExecutor executor = next.executor();
// 是否是EventLoop绑定线程
if (executor.inEventLoop()) {
// 触发ChannelRead事件
next.invokeChannelRead(m);
} else {
executor.execute(new Runnable() {
@Override
public void run() {
next.invokeChannelRead(m);
}
});
}
}

private void invokeChannelRead(Object msg) {
if (invokeHandler()) {
try {
// 调用handler的channelRead方法
((ChannelInboundHandler) handler()).channelRead(this, msg);
} catch (Throwable t) {
notifyHandlerException(t);
}
} else {
// 向下传播ChannelRead事件
fireChannelRead(msg);
}
}

@Override
public ChannelHandlerContext fireChannelRead(final Object msg) {
// 找到下一个节点继续执行它的invokeChannelRead,向下传播ChannelRead事件
invokeChannelRead(findContextInbound(MASK_CHANNEL_READ), msg);
return this;
}

}
// DefaultChannelPipeline
public class DefaultChannelPipeline implements ChannelPipeline {
final class HeadContext extends AbstractChannelHandlerContext
implements ChannelOutboundHandler, ChannelInboundHandler {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
// 继续传播ChannelRead,会回到AbstractChannelHandlerContext的fireChannelRead,触发下一个节点的ChannelRead
ctx.fireChannelRead(msg);
}
}
}

ChannelInitializer

在Channel初始化过程中,有一步是为pipeline添加ChannelHandler处理器,它添加了一个ChannelInitializer类型的Handler,ChannelInitializer可以为pipeline添加其他的处理器,从代码上可以看到它实现的initChannel方法向Pipeline添加了一个ServerBootstrapAcceptor处理器,用来接收连接。

为什么要使用ChannelInitializer?因为初始化的时候,Channel还未注册到Selector上,所以只能使用ChannelInitializer等待注册完成后,再触发它的initChannel,向Pipeline添加ServerBootstrapAcceptor处理器。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
public class ServerBootstrap extends AbstractBootstrap<ServerBootstrap, ServerChannel> {
@Override
void init(Channel channel) {
setChannelOptions(channel, options0().entrySet().toArray(newOptionArray(0)), logger);
setAttributes(channel, attrs0().entrySet().toArray(newAttrArray(0)));
// 获取pipeline
ChannelPipeline p = channel.pipeline();

final EventLoopGroup currentChildGroup = childGroup;
final ChannelHandler currentChildHandler = childHandler;
final Entry<ChannelOption<?>, Object>[] currentChildOptions =
childOptions.entrySet().toArray(newOptionArray(0));
final Entry<AttributeKey<?>, Object>[] currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(0));
// 添加ChannelHandler
p.addLast(new ChannelInitializer<Channel>() {
@Override
public void initChannel(final Channel ch) {
final ChannelPipeline pipeline = ch.pipeline();
ChannelHandler handler = config.handler();
if (handler != null) {
pipeline.addLast(handler);
}
// 通过EventLoop执行任务
ch.eventLoop().execute(new Runnable() {
@Override
public void run() {
// 添加ServerBootstrapAcceptor处理器
pipeline.addLast(new ServerBootstrapAcceptor(
ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
});
}
});
}
}

ServerBootstrapAcceptor

ServerBootstrapAcceptor是ServerBootstrap的内部类,可以看到它继承了ChannelInboundHandlerAdapter,在InBound事件传播过程中,可以知道如果如果有可读事件或者有连接事件时,会触发channelRead的事件传播,那么看一下ServerBootstrapAcceptor的channelRead方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public class ServerBootstrap extends AbstractBootstrap<ServerBootstrap, ServerChannel> {
private static class ServerBootstrapAcceptor extends ChannelInboundHandlerAdapter {
@Override
@SuppressWarnings("unchecked")
public void channelRead(ChannelHandlerContext ctx, Object msg) {
final Channel child = (Channel) msg;

child.pipeline().addLast(childHandler);

setChannelOptions(child, childOptions, logger);
setAttributes(child, childAttrs);

try {
// 注意这里,将channel注册到了childGroup中
childGroup.register(child).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
forceClose(child, future.cause());
}
}
});
} catch (Throwable t) {
forceClose(child, t);
}
}
}
}

Netty版本:4.1.42.Final

参考:

若地 :Netty 核心原理剖析与 RPC 实践

Netty(十一)源码解析 之 Channel 的 inBound 与 outBound 处理器

深入理解 Netty-Pipeline组件

【Netty】NioEventLoop任务处理

Posted on 2021-05-30

在Channel注册的过程中,有一步是判断当前线程是否是EventLoop绑定的线程,如果不是,将会开启一个新线程,然后调用SingleThreadEventExecutor.this.run()方法,执行核心任务处理:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
// 开启线程
private void doStartThread() {
assert thread == null;
executor.execute(new Runnable() {
@Override
public void run() {
// 这里将NioEventLoop的thread成员变量设置为当前获取到的线程,也是实现了当前线程与NioEventLoop的绑定
thread = Thread.currentThread();
if (interrupted) {
thread.interrupt();
}
boolean success = false;
updateLastExecutionTime();
try {
// 执行核心任务,在NioEventLoop中实现
SingleThreadEventExecutor.this.run();
success = true;
} catch (Throwable t) {
logger.warn("Unexpected exception from an event executor: ", t);
} finally {
// 省略了部分代码
}
}
});
}

任务处理

准备知识

Channel和Selector关系

Select操作

(1)select():阻塞的操作,会一直阻塞直到至少一个channel有就绪的事件才返回,返回值为就绪的channel的数量。在以下情况下,可以中断阻塞:

  • 线程被中断
  • 调用了selector.wakeup()

(2) select(long time):和select()类似,会一直阻塞直到至少一个channel有就绪的事件才返回,但是它可以设置超时时间,如果超过了这个时间,也会返回。

(3)selectNow():非阻塞的操作,会立刻返回就绪的channel的数量。

NioEventLoop

进入到NioEventLoop的run方法,run方法中是一个无限循环:

  1. 先调用calculateStrategy方法选择策略,判断任务队列中是否有任务,如果有任务,执行非阻塞的selectNow,返回就绪的channel的数量(必然是大于等于0的,接下来就会跳出switch case走后面的流程),如果没有任务会返回返回SELECT策略:

    • 如果是CONTINUE,重新执行策略选择

    • 如果是BUSY_WAIT,Netty不支持

    • 如果是SELECT,调用select方法,获取准备就绪的 I/O 事件

    • 其他情况,进入default跳出策略选择,继续执行后面的代码
  2. 调用processSelectedKeys处理已经就绪的channel的I/O事件
  3. 处理任务队列,执行队列中的任务
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
// 选择策略
public interface SelectStrategy {

int SELECT = -1;

int CONTINUE = -2;

int BUSY_WAIT = -3;
}
// NioEventLoop
public final class NioEventLoop extends SingleThreadEventLoop {
@Override
protected void run() {
// 无限循环
for (;;) {
try {
try {
// select策略
switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
case SelectStrategy.CONTINUE:
continue;

case SelectStrategy.BUSY_WAIT: // 不支持

case SelectStrategy.SELECT:
// 获取准备就绪的 I/O 事件,这里先将wakenUp置为false
select(wakenUp.getAndSet(false));
// 如果wakenUp为true代表已经执行了select操作
if (wakenUp.get()) {
// 中断select阻塞操作
selector.wakeup();
}
default:
}
} catch (IOException e) {
rebuildSelector0();
handleLoopException(e);
continue;
}

cancelledKeys = 0;
needsToSelectAgain = false;
final int ioRatio = this.ioRatio;
// ioRatio用来控制处理I/O时间与处理任务队列时间的占比
if (ioRatio == 100) {
try {
// 处理 I/O 事件
processSelectedKeys();
} finally {
// 处理任务队列
runAllTasks();
}
} else {
final long ioStartTime = System.nanoTime();
try {
processSelectedKeys();
} finally {
// Ensure we always run tasks.
final long ioTime = System.nanoTime() - ioStartTime;
runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
}
}
} catch (Throwable t) {
handleLoopException(t);
}
// Always handle shutdown even if the loop processing threw an exception.
try {
if (isShuttingDown()) {
closeAll();
if (confirmShutdown()) {
return;
}
}
} catch (Throwable t) {
handleLoopException(t);
}
}
}

private final IntSupplier selectNowSupplier = new IntSupplier() {
@Override
public int get() throws Exception {
return selectNow();
}
};

int selectNow() throws IOException {
try {
// 非阻塞的select,返回值是已经就绪的channel的数量,如果没有返回0
return selector.selectNow();
} finally {
// wakenUp如果状态为true
if (wakenUp.get()) {
// wakeup()方法用来中断select阻塞操作
selector.wakeup();
}
}
}

}

// DefaultSelectStrategy
final class DefaultSelectStrategy implements SelectStrategy {
// 判断选择策略
@Override
public int calculateStrategy(IntSupplier selectSupplier, boolean hasTasks) throws Exception {
//如果有任务,调用selectSupplier的get方法执行非阻塞的selectNow操作,否则返回SELECT策略
return hasTasks ? selectSupplier.get() : SelectStrategy.SELECT;
}
}

Selector获取 I/O 事件

在select方法中,用来轮询注册的 I/O 事件,它首先记录了当前时间保存在currentTimeNanos变量里,然后开始for循环:

  1. 定时任务队列中的任务是按照开始执行任务的延迟时间从小到大排序的,首先会从定时任务队列中,获取距离当前时间最近一个待执行的任务,计算出还需要多久开始执行,如果距离开始执行任务的时间已经超过了0.5ms,说明定时任务执行的优先级比较高,所以会退出当前的循环,回到上一步的调用方法run()中,重新判断selector策略。
  2. 如果定时任务中没有需要立刻执行的任务,会继续往下走,由于在这个过程中,可能会产生新的任务,所以会调用hasTasks判断普通任务队列taskQueue和tailTasks任务队列是否为空,如果为空,并且将成功的设置了线程唤醒状态,先执行一次非阻塞的selectNow操作,然后中断循环,同样回到上一步的调用方法run()中,重新判断selector策略。
  3. 如果上两个条件都没有满足,则执行一次select(timeoutMillis)阻塞等待 I/O 事件,它在以下条件满足时会被中断:
    • 有就绪的Channel
    • seleter.wakeup()被调用
    • 当前线程被中断
    • 阻塞时间超时
    • 出现空轮询时
  4. 在第3步结束后,会判断是否有就绪的事件、seleter.wakeup()被调用或者有任务队列中有任务,如果满足之一将中断循环,回到上一步,重新判断selector策略。
  5. 处理空轮询的问题,获取当前时间减去select阻塞的超时时间,判断是否大于等于currentTimeNanos(本次select方法开始执行的时间,也就是for循环开始前的时间)
    • 如果大于等于currentTimeNanos,说明select阻塞是在超时情况下被打断的,这种属于正常情况,只需将selectCnt重置为1结束本次循环(因为要进行下一次循环,所以重置)。
    • 如果小于currentTimeNanos,判断执行select的执行次数是否超过了阈值,如果超过则重建Selector,所以Netty通过在某个时间段内判断select的执行次数是否超过阈值来处理空轮询的问题。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
 // 轮询注册的 I/O 事件
private void select(boolean oldWakenUp) throws IOException {
Selector selector = this.selector;
try {
int selectCnt = 0;
// 获取当前时间
long currentTimeNanos = System.nanoTime();
// delayNanos返回定时任务队列中最近需要执行的任务,距离执行开始的时间
long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos);

long normalizedDeadlineNanos = selectDeadLineNanos - initialNanoTime();
if (nextWakeupTime != normalizedDeadlineNanos) {
nextWakeupTime = normalizedDeadlineNanos;
}
// 开始for循环
for (;;) {
// 判断定时任务中距离开始执行任务的时间是否超过0.5ms
long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;
// 如果超过执行时间0.5ms以上,说明定时任务队列中的任务需要立刻执行
if (timeoutMillis <= 0) {
// 判断是否执行过select操作,如果没有,会先进行一次非阻塞的selectNow操作
if (selectCnt == 0) {
selector.selectNow();
selectCnt = 1;
}
// 退出循环
break;
}

// 判断普通任务队列taskQueue和tailTasks任务队列是否为空,如果不为空,尝试将wakenUp设置为true,表示已退出select阻塞操作
if (hasTasks() && wakenUp.compareAndSet(false, true)) {
// 非阻塞的selectNow操作
selector.selectNow();
selectCnt = 1;
// 中断循环
break;
}
// select 阻塞操作获取等待获取 I/O 事件
int selectedKeys = selector.select(timeoutMillis);
// 记录执行select的次数
selectCnt ++;
// 如果有就绪的事件、调用了seleter.wakeup()方法或者有任务队列中有任务
if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) {
break;
}
// 如果当前线程被中断
if (Thread.interrupted()) {
if (logger.isDebugEnabled()) {
logger.debug("Selector.select() returned prematurely because " +
"Thread.currentThread().interrupt() was called. Use " +
"NioEventLoop.shutdownGracefully() to shutdown the NioEventLoop.");
}
selectCnt = 1;
break;
}
// 获取当前时间
long time = System.nanoTime();
// 当前时间减去select阻塞的超时时间,如果大于等于currentTimeNanos(select方法开始执行的时间),说明阻塞时间超时情况下被打断的
if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) {
// 重置selectCnt置为1
selectCnt = 1;
} else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&
selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {// 如果执行select的执行次数超过了阈值
// 重建Selector
selector = selectRebuildSelector(selectCnt);
// 重置
selectCnt = 1;
break;
}

currentTimeNanos = time;
}

if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS) {
if (logger.isDebugEnabled()) {
logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.",
selectCnt - 1, selector);
}
}
} catch (CancelledKeyException e) {
if (logger.isDebugEnabled()) {
logger.debug(CancelledKeyException.class.getSimpleName() + " raised by a Selector {} - JDK bug?",
selector, e);
}
// Harmless exception - log anyway
}
}

/**
* 返回最近一个待执行任务距离任务开始执行的时间
*/
protected long delayNanos(long currentTimeNanos) {
// 从定时任务队列中取出位于队头的任务,任务是按照延迟时间从小到大排序的
ScheduledFutureTask<?> scheduledTask = peekScheduledTask();
if (scheduledTask == null) {
return SCHEDULE_PURGE_INTERVAL;
}
// 返回距离任务开始执行的时间
return scheduledTask.delayNanos(currentTimeNanos);
}

// io.netty.channel.SingleThreadEventLoop中实现的
@Override
protected boolean hasTasks() {
// 判断普通任务队列taskQueue和tailTasks任务队列是否为空
return super.hasTasks() || !tailTasks.isEmpty();
}

/**
* io.netty.util.concurrent.SingleThreadEventExecutor
*/
protected boolean hasTasks() {
assert inEventLoop();
return !taskQueue.isEmpty();
}

处理I/O事件

处理I/O事件是在processSelectedKeys方法中实现的:

  1. 首先判断selectedKeys是否为空,它是一个Set集合,如果不为空,走优化后的方法,如果为空,就从selector中获取selectedKeys进行处理,走未优化过的方法。
  2. 遍历selectedKeys,获取当前key的所有就绪操作,有以下几种类型:
    • SelectionKey.OP_CONNECT :处理请求建立连接的事件
    • SelectionKey.OP_WRITE : 处理可写事件
    • SelectionKey.OP_READ :处理可读事件
    • SelectionKey.OP_ACCEPT :处理接受连接请求的事件
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
private SelectedSelectionKeySet selectedKeys;

private void processSelectedKeys() {
if (selectedKeys != null) {
// 优化后的
processSelectedKeysOptimized();
} else {
// 普通的
processSelectedKeysPlain(selector.selectedKeys());
}
}

private void processSelectedKeysOptimized() {
// 遍历selectedKeys,处理已经就绪的selectedKeys
for (int i = 0; i < selectedKeys.size; ++i) {
final SelectionKey k = selectedKeys.keys[i];
selectedKeys.keys[i] = null;
final Object a = k.attachment();
// 如果是AbstractNioChannel类型
if (a instanceof AbstractNioChannel) {
// 处理I/O事件
processSelectedKey(k, (AbstractNioChannel) a);
} else {
@SuppressWarnings("unchecked")
NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
processSelectedKey(k, task);
}
// 是否需要重新轮询
if (needsToSelectAgain) {
selectedKeys.reset(i + 1);
selectAgain();
i = -1;
}
}
}

private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
if (!k.isValid()) {
final EventLoop eventLoop;
try {
eventLoop = ch.eventLoop();
} catch (Throwable ignored) {
return;
}
if (eventLoop != this || eventLoop == null) {
return;
}
// close the channel if the key is not valid anymore
unsafe.close(unsafe.voidPromise());
return;
}

try {
// 获取当前key的所有就绪操作
int readyOps = k.readyOps();
// 处理连接,客户端请求建立连接
if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
int ops = k.interestOps();
ops &= ~SelectionKey.OP_CONNECT;
k.interestOps(ops);
//
unsafe.finishConnect();
}
// 处理可写事件
if ((readyOps & SelectionKey.OP_WRITE) != 0) {
ch.unsafe().forceFlush();
}
// 处理可读事件和连接请求接受
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
unsafe.read();
}
} catch (CancelledKeyException ignored) {
unsafe.close(unsafe.voidPromise());
}
}

处理任务队列

  1. 从定时任务队列中合并任务到普通任务队列taskQueue中
  2. 在无限循环中,从taskQueue中不断获取任务开始执行,直到获取任务为空
  3. 进行收尾工作,处理tailTask中的任务
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor implements OrderedEventExecutor { 
protected boolean runAllTasks(long timeoutNanos) {
// 将定时任务队列中的任务放到taskQueue
fetchFromScheduledTaskQueue();
// 从taskQueue中获取一个任务
Runnable task = pollTask();
if (task == null) {
afterRunningAllTasks();
return false;
}

final long deadline = ScheduledFutureTask.nanoTime() + timeoutNanos;
long runTasks = 0;
long lastExecutionTime;
// 循环处理任务
for (;;) {
// 执行任务
safeExecute(task);

runTasks ++;

if ((runTasks & 0x3F) == 0) {
lastExecutionTime = ScheduledFutureTask.nanoTime();
if (lastExecutionTime >= deadline) {
break;
}
}
// 取出下一个任务
task = pollTask();
if (task == null) {
lastExecutionTime = ScheduledFutureTask.nanoTime();
break;
}
}

afterRunningAllTasks();
this.lastExecutionTime = lastExecutionTime;
return true;
}

private boolean fetchFromScheduledTaskQueue() {
if (scheduledTaskQueue == null || scheduledTaskQueue.isEmpty()) {
return true;
}
long nanoTime = AbstractScheduledEventExecutor.nanoTime();
for (;;) {
// 从定时任务队列中获取任务
Runnable scheduledTask = pollScheduledTask(nanoTime);
if (scheduledTask == null) {
return true;
}
// 将定时任务添加到taskQueue
if (!taskQueue.offer(scheduledTask)) {
scheduledTaskQueue.add((ScheduledFutureTask<?>) scheduledTask);
return false;
}
}
}
// io.netty.util.concurrent.AbstractScheduledEventExecutor
protected final Runnable pollScheduledTask(long nanoTime) {
assert inEventLoop();

Queue<ScheduledFutureTask<?>> scheduledTaskQueue = this.scheduledTaskQueue;
ScheduledFutureTask<?> scheduledTask = scheduledTaskQueue == null ? null : scheduledTaskQueue.peek();
// 如果定时任务还未到截止执行时间,先不处理
if (scheduledTask == null || scheduledTask.deadlineNanos() - nanoTime > 0) {
return null;
}
// 走到这里,说明已经过了截止时间,需要紧急处理
// 取出需要执行的定时任务
scheduledTaskQueue.remove();
return scheduledTask;
}
}
// AbstractEventExecutor
public abstract class AbstractEventExecutor extends AbstractExecutorService implements EventExecutor {
protected static void safeExecute(Runnable task) {
try {
// 运行任务
task.run();
} catch (Throwable t) {
logger.warn("A task raised an exception. Task: {}", task, t);
}
}
}

tailTask任务处理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public abstract class SingleThreadEventLoop extends SingleThreadEventExecutor implements EventLoop {
protected void afterRunningAllTasks() {
// 处理tailTask的任务
this.runAllTasksFrom(this.tailTasks);
}
}

public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor implements OrderedEventExecutor {
protected final boolean runAllTasksFrom(Queue<Runnable> taskQueue) {
// 从队列中获取任务
Runnable task = pollTaskFrom(taskQueue);
if (task == null) {
return false;
}
for (;;) {
// 执行任务
safeExecute(task);
// 获取下一个任务
task = pollTaskFrom(taskQueue);
if (task == null) {
return true;
}
}
}
}

Netty版本:4.1.42.Final

参考:

Netty(九)源码解析 之 NioEventLoop 任务的执行

若地 :Netty 核心原理剖析与 RPC 实践

深入理解 NioEventLoop启动流程

Java NIO之Selector(选择器)

【Netty】EventLoopGroup

Posted on 2021-05-29

EventLoop继承关系

1
2
3
4
5
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
ServerBootstrap b = new ServerBootstrap();
// 设置EventLoopGroup
b.group(bossGroup, workerGroup);

NioEventLoopGroup

  1. NioEventLoopGroup是一个NioEventLoop组,它有一个EventExecutor[] 成员变量,数组中的每一个元素是NioEventLoop类型的对象,所以NioEventLoopGroup可以看做是一系列NioEventLoop的集合。

  2. NioEventLoopGroup初始化的时候可以指定线程数,如果没有指定,将获取系统可以的处理器数量*2作为线程数,这个线程数代表着NioEventLoopGroup可以有多少个NioEventLoop。

  3. NioEventLoopGroup的构造函数中,有一个Executor类型的参数,默认使用ThreadPerTaskExecutor实现,因为NioEventLoop中也有一个Executor,暂且将这个NioEventLoopGroup的Executor称为全局的Executor,在实例化NioEventLoop的时候,传入了这个全局的Executor。

    • ThreadPerTaskExecutor的execute方法会通过ThreadFactory线程工厂创建一个线程,并且启动该线程,EventLoop中就是通过它来创建线程的。
    • 看到Executor首先会想起Java中的线程池,将需要执行的任务添加到线程池,由线程池管理并执行任务。
  4. NioEventLoopGroup中有一个EventExecutorChooser选择器,因为EventExecutor[] 数组中有多个NioEventLoop对象,选择器就是来选择使用哪个NioEventLoop。

NioEventLoopGroup初始化

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public class NioEventLoopGroup extends MultithreadEventLoopGroup {
/**
* NioEventLoopGroup构造函数
*/
public NioEventLoopGroup() {
this(0);
}
// 带有线程数的构造函数
public NioEventLoopGroup(int nThreads) {
this(nThreads, (Executor) null);
}
// 带有线程数和Executor的构造函数
public NioEventLoopGroup(int nThreads, Executor executor) {
this(nThreads, executor, SelectorProvider.provider());
}
// 带有线程数、Executor和SelectorProvider的构造函数
public NioEventLoopGroup(
int nThreads, Executor executor, final SelectorProvider selectorProvider) {
this(nThreads, executor, selectorProvider, DefaultSelectStrategyFactory.INSTANCE);
}
// 带有线程数、Executor、SelectorProvider和SelectStrategyFactory的构造函数
public NioEventLoopGroup(int nThreads, Executor executor, final SelectorProvider selectorProvider,
final SelectStrategyFactory selectStrategyFactory) {
// 调用父类的构造函数,在MultithreadEventLoopGroup中实现
super(nThreads, executor, selectorProvider, selectStrategyFactory, RejectedExecutionHandlers.reject());
}

}

MultithreadEventLoopGroup

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
public abstract class MultithreadEventLoopGroup extends MultithreadEventExecutorGroup implements EventLoopGroup {
private static final int DEFAULT_EVENT_LOOP_THREADS;
// EventExecutor线程组
private final EventExecutor[] children;
// 注意这个成员变量在父类MultithreadEventExecutorGroup类中
private final EventExecutorChooserFactory.EventExecutorChooser chooser;

static {
// 设置默认的EventLoop线程数:先从系统配置中获取io.netty.eventLoopThreads的值,如果没有配置,就获取系统可以的处理器数量*2作为线程数
DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt(
"io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2));
if (logger.isDebugEnabled()) {
logger.debug("-Dio.netty.eventLoopThreads: {}", DEFAULT_EVENT_LOOP_THREADS);
}
}

protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
}

/**
* 最终会执行这个构造函数
*/
protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
EventExecutorChooserFactory chooserFactory, Object... args) {
// 如果线程数小于等于0抛出异常
if (nThreads <= 0) {
throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
}
// 一个executor线程池,用来创建线程,默认使用ThreadPerTaskExecutor类型的执行器NioEventLoop中也有一个Executor,暂且将这个Executor称为NioEventLoopGroup全局的executor
if (executor == null) {
executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
}
// 创建EventExecutor线程组
children = new EventExecutor[nThreads];
for (int i = 0; i < nThreads; i ++) {
boolean success = false;
try {
// 为每一个创建EventExecutor完成实例化,在newChild方法中可以看到,创建的是NioEventLoop类型的对象
children[i] = newChild(executor, args);
success = true;
} catch (Exception e) {
throw new IllegalStateException("failed to create a child event loop", e);
} finally {
// 如果出现异常,对EventExecutor进行关闭处理
if (!success) {
for (int j = 0; j < i; j ++) {
children[j].shutdownGracefully();
}
for (int j = 0; j < i; j ++) {
EventExecutor e = children[j];
try {
while (!e.isTerminated()) {
e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
}
} catch (InterruptedException interrupted) {
Thread.currentThread().interrupt();
break;
}
}
}
}
}
// 初始化选择器ExecutorChooser
chooser = chooserFactory.newChooser(children);
// 为每一个EventExecutor添加一个关闭的监听器
final FutureListener<Object> terminationListener = new FutureListener<Object>() {
@Override
public void operationComplete(Future<Object> future) throws Exception {
if (terminatedChildren.incrementAndGet() == children.length) {
terminationFuture.setSuccess(null);
}
}
};
for (EventExecutor e: children) {
e.terminationFuture().addListener(terminationListener);
}
// 存放所有的EventExecutor Set集合
Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length);
Collections.addAll(childrenSet, children);
readonlyChildren = Collections.unmodifiableSet(childrenSet);
}
}

// ThreadPerTaskExecutor
public final class ThreadPerTaskExecutor implements Executor {
private final ThreadFactory threadFactory;

public ThreadPerTaskExecutor(ThreadFactory threadFactory) {
if (threadFactory == null) {
throw new NullPointerException("threadFactory");
}
this.threadFactory = threadFactory;
}

@Override
public void execute(Runnable command) {
// 通过线程工厂创建线程
threadFactory.newThread(command).start();
}
}
// DefaultThreadFactory
public class DefaultThreadFactory implements ThreadFactory {
@Override
public Thread newThread(Runnable r) {
// 创建线程
Thread t = newThread(FastThreadLocalRunnable.wrap(r), prefix + nextId.incrementAndGet());
try {
if (t.isDaemon() != daemon) {
t.setDaemon(daemon);
}

if (t.getPriority() != priority) {
t.setPriority(priority);
}
} catch (Exception ignored) {
}
return t;
}
}
// NioEventLoopGroup
public class NioEventLoopGroup extends MultithreadEventLoopGroup {
// 实例化EventExecutor数组的每一个对象
@Override
protected EventLoop newChild(Executor executor, Object... args) throws Exception {
EventLoopTaskQueueFactory queueFactory = args.length == 4 ? (EventLoopTaskQueueFactory) args[3] : null;
// 创建NioEventLoop,这里传入了NioEventLoopGroup的executor
return new NioEventLoop(this, executor, (SelectorProvider) args[0],
((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2], queueFactory);
}
}

// DefaultEventExecutorChooserFactory
public final class DefaultEventExecutorChooserFactory implements EventExecutorChooserFactory {

public static final DefaultEventExecutorChooserFactory INSTANCE = new DefaultEventExecutorChooserFactory();

@SuppressWarnings("unchecked")
@Override
public EventExecutorChooser newChooser(EventExecutor[] executors) {
// 实例化ExecutorChooser,判断数组长度是否是2的整数次幂
if (isPowerOfTwo(executors.length)) {
return new PowerOfTwoEventExecutorChooser(executors);
} else {
return new GenericEventExecutorChooser(executors);
}
}
}

NioEventLoop

  1. NioEventLoop中包含了一个Selector
  2. NioEventLoop中有三种任务队列:
    • 普通任务队列taskQueue,一些核心的任务就是放在taskQueue中的
    • 定时任务队列scheduledTaskQueue,处理一些定时任务
    • 收尾任务队列tailTasks,做一些收尾的工作
  3. 每个NioEventLoop中也有一个Executor,因为NioEventLoop是Executor的子类,所以NioEventLoop本身也是一个Executor
  4. ThreadExecutorMap中维护了线程和EventLoop的绑定关系,记录了每个线程绑定到了哪个EventLoop
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
public final class NioEventLoop extends SingleThreadEventLoop {

/**
* selector
*/
private Selector selector;
private Selector unwrappedSelector;

// 构造函数
NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler,
EventLoopTaskQueueFactory queueFactory) {
// 调用父类SingleThreadEventLoop的构造函数
super(parent, executor, false, newTaskQueue(queueFactory), newTaskQueue(queueFactory),
rejectedExecutionHandler);
if (selectorProvider == null) {
throw new NullPointerException("selectorProvider");
}
// SelectStrategy判空
if (strategy == null) {
throw new NullPointerException("selectStrategy");
}
provider = selectorProvider;
final SelectorTuple selectorTuple = openSelector();
// 封装的selector
selector = selectorTuple.selector;
// 原始的selector
unwrappedSelector = selectorTuple.unwrappedSelector;
selectStrategy = strategy;
}
// 创建任务队列
private static Queue<Runnable> newTaskQueue(EventLoopTaskQueueFactory queueFactory) {
return queueFactory == null?newTaskQueue0(DEFAULT_MAX_PENDING_TASKS):queueFactory.newTaskQueue(DEFAULT_MAX_PENDING_TASKS);
}
}

// io.netty.channel.SingleThreadEventLoop
public abstract class SingleThreadEventLoop extends SingleThreadEventExecutor implements EventLoop {
// tailTask
private final Queue<Runnable> tailTasks;
// 构造函数
protected SingleThreadEventLoop(EventLoopGroup parent, Executor executor,
boolean addTaskWakesUp, Queue<Runnable> taskQueue, Queue<Runnable> tailTaskQueue,
RejectedExecutionHandler rejectedExecutionHandler) {
// 调用SingleThreadEventExecutor父类构造函数
super(parent, executor, addTaskWakesUp, taskQueue, rejectedExecutionHandler);
// 设置 tailTask
tailTasks = ObjectUtil.checkNotNull(tailTaskQueue, "tailTaskQueue");
}
}

// io.netty.util.concurrent.SingleThreadEventExecutor
public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor implements OrderedEventExecutor {
// 普通任务任务队列taskQueue
private final Queue<Runnable> taskQueue;
private final Executor executor;
private final RejectedExecutionHandler rejectedExecutionHandler;

protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor,
boolean addTaskWakesUp, Queue<Runnable> taskQueue,
RejectedExecutionHandler rejectedHandler) {
// 调用父类的构造函数
super(parent);
this.addTaskWakesUp = addTaskWakesUp;
this.maxPendingTasks = DEFAULT_MAX_PENDING_EXECUTOR_TASKS;
// 初始化NioEventLoop的executor,this.executor 指的是当前对象的executor,为了防止混淆,先看作是NioEventLoop的executor,apply(executor, this)中的executor,是NioEventLoopGroup实例化的时候创建的全局的Executor
this.executor = ThreadExecutorMap.apply(executor, this);
// 设置taskQueue
this.taskQueue = ObjectUtil.checkNotNull(taskQueue, "taskQueue");
// 拒绝策略
rejectedExecutionHandler = ObjectUtil.checkNotNull(rejectedHandler, "rejectedHandler");
}
}

// io.netty.util.concurrent.AbstractScheduledEventExecutor
public abstract class AbstractScheduledEventExecutor extends AbstractEventExecutor {
// 定时任务队列
PriorityQueue<ScheduledFutureTask<?>> scheduledTaskQueue;
}

ThreadExecutorMap

ThreadExecutorMap中记录了每个线程绑定的EventLoop,从名字上也能看出,它记录的是Thread线程和Executor的对应关系。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
public final class ThreadExecutorMap {
// FastThreadLocal类型的,记录每个线程绑定的EventLoop(EventLoop是EventExecutor的子类)
private static final FastThreadLocal<EventExecutor> mappings = new FastThreadLocal<EventExecutor>();

// 设置当前线程关联的EventExecutor
private static void setCurrentEventExecutor(EventExecutor executor) {
mappings.set(executor);
}
// executor是NioEventLoopGroup实例化的时候创建的全局的Executor,eventExecutor当前正在创建的EventLoop(NioEventLoop)对象
public static Executor apply(final Executor executor, final EventExecutor eventExecutor) {
ObjectUtil.checkNotNull(executor, "executor");
ObjectUtil.checkNotNull(eventExecutor, "eventExecutor");
// 创建一个Executor
return new Executor() {
@Override
public void execute(final Runnable command) {
// 全局的executor的execute方法会创建一个线程,并且启动线程,所以apply返回Runnable对象后,会交给全局的executor来创建线程并启动
executor.execute(apply(command, eventExecutor));
}
};
}

public static Runnable apply(final Runnable command, final EventExecutor eventExecutor) {
ObjectUtil.checkNotNull(command, "command");
ObjectUtil.checkNotNull(eventExecutor, "eventExecutor");
return new Runnable() {
@Override
public void run() {
// 设置当前线程关联的eventExecutor
setCurrentEventExecutor(eventExecutor);
try {
// 运行任务
command.run();
} finally {
setCurrentEventExecutor(null);
}
}
};
}
}

Channel初始化与注册

以Channel初始化与注册的过程为例,看一下EventLoop的使用:

回到AbstractBootstrap的initAndRegister初始化并注册Channel的方法,可以看到获取了EventLoopGroup,然后调用它的register方法进行注册。

EventLoopGroup的register实际上是通过选择器,从EventLoop数组中选择了一个EventLoop返回,最终调用EventLoop的register方法进行注册。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
// AbstractBootstrap
final ChannelFuture initAndRegister() {
// ...
// 获取EventLoopGroup,调用register进行注册
ChannelFuture regFuture = config().group().register(channel);
// ...
}

// AbstractBootstrapConfig
@SuppressWarnings("deprecation")
public final EventLoopGroup group() {
return bootstrap.group();
}

// MultithreadEventLoopGroup
@Override
public ChannelFuture register(Channel channel) {
//调用了next()方法获取一个EventLoop
return next().register(channel);
}
// 获取EventLoop
@Override
public EventLoop next() {
// 调用父类的next方法,进入到MultithreadEventExecutorGroup的next方法
return (EventLoop) super.next();
}

// MultithreadEventExecutorGroup
@Override
public EventExecutor next() {
// 选择一个EventLoop
return chooser.next();
}
// DefaultEventExecutorChooserFactory
private static final class PowerOfTwoEventExecutorChooser implements EventExecutorChooser {
private final AtomicInteger idx = new AtomicInteger();
private final EventExecutor[] executors;

PowerOfTwoEventExecutorChooser(EventExecutor[] executors) {
this.executors = executors;
}

@Override
public EventExecutor next() {
// 选择一个EventLoop
return executors[idx.getAndIncrement() & executors.length - 1];
}
}

AbstractChannel的register进行注册:

  1. 可以看到将当前的Channel绑定到了选择EventLoop上。
  2. 判断当前线程是否是NioEventLoop中的线程,首次启动注册Channel时,当前线程不是NioEventLoop的线程,所以会走到else的代码中,向EventLoop中添加了一个注册任务。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {
@Override
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
if (eventLoop == null) {
throw new NullPointerException("eventLoop");
}
// 判断是否已经注册过
if (isRegistered()) {
promise.setFailure(new IllegalStateException("registered to an event loop already"));
return;
}
if (!isCompatible(eventLoop)) {
promise.setFailure(
new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
return;
}
// 设置当前channel的eventLoop,也就是将channel绑定到了EventLoop上
AbstractChannel.this.eventLoop = eventLoop;
// 当前线程是否是EventLoop中的线程
if (eventLoop.inEventLoop()) {
// 注册channel
register0(promise);
} else {
try {
// 将注册任务放入EventLoop中执行,SingleThreadEventExecutor中实现了execute方法
eventLoop.execute(new Runnable() {
@Override
public void run() {
// 注册channel
register0(promise);
}
});
} catch (Throwable t)
// 省略...
}
}
}
}
// AbstractEventExecutor
public abstract class AbstractEventExecutor extends AbstractExecutorService implements EventExecutor {
@Override
public boolean inEventLoop() {
return inEventLoop(Thread.currentThread());
}
// SingleThreadEventExecutor中实现
@Override
public boolean inEventLoop(Thread thread) {
// 判断当前线程与EventLoop中的线程是否相等
return thread == this.thread;
}
}
// SingleThreadEventExecutor
public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor implements OrderedEventExecutor {
private volatile Thread thread;
}

SingleThreadEventExecutor

SingleThreadEventExecutor实现了execute方法,进行任务添加:

  1. 将需要执行的任务放到了taskQueue任务队列中
  2. 如果不是EventLoop线程(首次启动Channel不是EventLoop线程),调用startThread开启了一个线程,开启方式是通过调用EventLoop的executor,向线程池中执行任务,最终是通过EventLoopGroup的executor的execute方法会创建线程并且启动线程。
    • 之前提到过EventLoop也是一个Executor,所以它本身也可以接受任务,实现了execute方法,这个方法就是在SingleThreadEventExecutor中实现的。
    • EventLoop中有一个Executor的成员变量,它的execute方法是通过EventLoopGroup的Executor创建线程使用的。
  3. 将NioEventLoop的thread成员变量设置为当前获取到的线程,实现了当前线程与NioEventLoop的绑定
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
//io.netty.util.concurrent.SingleThreadEventExecutor
public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor implements OrderedEventExecutor {
// 添加任务,这个execute方法是EventLoop自己的
@Override
public void execute(Runnable task) {
if (task == null) {
throw new NullPointerException("task");
}
// 是否是EventLoop中的线程
boolean inEventLoop = inEventLoop();
// 将任务添加到taskQueue
addTask(task);
// 如果不是EventLoop线程
if (!inEventLoop) {
// 开启线程
startThread();
if (isShutdown()) {
boolean reject = false;
try {
if (removeTask(task)) {
reject = true;
}
} catch (UnsupportedOperationException e) {
}
if (reject) {
reject();
}
}
}

if (!addTaskWakesUp && wakesUpForTask(task)) {
wakeup(inEventLoop);
}
}

// 开启线程
private void startThread() {
// 是否未启动状态
if (state == ST_NOT_STARTED) {
// 设置为启动状态
if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
boolean success = false;
try {
// 开启线程
doStartThread();
success = true;
} finally {
if (!success) {
// 如果失败,状态设置未启动
STATE_UPDATER.compareAndSet(this, ST_STARTED, ST_NOT_STARTED);
}
}
}
}
}
// 开启线程
private void doStartThread() {
assert thread == null;
// 调用EventLoop的executor,向线程池中添加任务,execute方法会进入到ThreadExecutorMap类中apply方法实例化Executor时实现的execute方法
executor.execute(new Runnable() {
@Override
public void run() {
// 这里将NioEventLoop的thread成员变量设置为当前获取到的线程,也是实现了当前线程与NioEventLoop的绑定
thread = Thread.currentThread();
if (interrupted) {
thread.interrupt();
}
boolean success = false;
updateLastExecutionTime();
try {
// 执行核心任务,在NioEventLoop中实现
SingleThreadEventExecutor.this.run();
success = true;
} catch (Throwable t) {
logger.warn("Unexpected exception from an event executor: ", t);
} finally {
// 省略了部分代码
}
}
});
}

// 向taskQueue队列中添加任务
protected void addTask(Runnable task) {
if (task == null) {
throw new NullPointerException("task");
}
if (!offerTask(task)) {
reject(task);
}
}
final boolean offerTask(Runnable task) {
if (isShutdown()) {
reject();
}
// 添加到taskQueue
return taskQueue.offer(task);
}
}

ThreadExecutorMap

在NioEventLoop实例化的时候可以知道,NioEventLoop的Executor是在ThreadExecutorMap中完成实例化的,所以调用NioEventLoop的Executor的execute方法就会进入ThreadExecutorMap中实例化Executor时重写的execute方法,在这个方法中,又会调用EventLoopGroup的executor创建线程并启动线程。

1
2
3
4
5
6
7
8
9
10
11
12
13
public static Executor apply(final Executor executor, final EventExecutor eventExecutor) {
ObjectUtil.checkNotNull(executor, "executor");
ObjectUtil.checkNotNull(eventExecutor, "eventExecutor");
// 创建一个Executor对象
return new Executor() {
// 并重写execute方法
@Override
public void execute(final Runnable command) {
// EventLoopGroup的executor的execute方法会创建一个线程,并且启动线程,所以apply返回Runnable对象后,会交给全局的executor来创建线程并启动
executor.execute(apply(command, eventExecutor));
}
};
}

总结:

到这里,应该已经知道EventLoop和EventLoopGroup的作用是什么了,再次总结一下:

1. NioEventLoopGroup是一个NioEventLoop组,里面包含多个NioEventLoop,可以看做是一系列NioEventLoop的集合

2. NioEventLoopGroup初始化会实例化一个Executor,它是用来创建线程使用的,可以看做是一个创建线程的线程池

3. NioEventLoopGroup会通过选择策略,类似于负载均衡算法从NioEventLoop数组中选取一个NioEventLoop

4.NioEventLoop中有三个任务队列,分别是taskQueue(处理普通任务)、tailTasks(收尾工作处理)和scheduledTaskQueue(定时任务队列)

5.NioEventLoop是Executor的子类,所以它本身也是一个Executor,可以向NioEventLoop中提交任务,任务提交后会添加到任务队列中等待处理

6.NioEventLoop中也有一个Executor,以成员变量的形式存在,它可以调用NioEventLoopGroup的Executor来创建线程,执行任务

Netty版本:4.1.42.Final

参考:

Netty(八)源码解析 之 NioEventLoopGroup和NioEventLoop源码分析、NioEventLoop绑定线程的流程分析

【Netty】Netty启动流程

Posted on 2021-05-25

首先看一段Netty服务端启动的代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
// EventLoopGroup线程组
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class) // 设置channel类型
.localAddress(new InetSocketAddress(port)) // 设置端口
.childHandler(new ChannelInitializer<SocketChannel>() { // 设置channelHandler
@Override
public void initChannel(SocketChannel ch) {
ch.pipeline()
.addLast("codec", new HttpServerCodec())
}
})
.childOption(ChannelOption.SO_KEEPALIVE, true);
// 绑定端口并启动服务
ChannelFuture f = b.bind().sync();
} finally {
workerGroup.shutdownGracefully();
bossGroup.shutdownGracefully();
}
  1. 创建了bossGroup和workerGroup,它们都是EventLoopGroup类型的
  2. 创建了ServerBootstrap启动类
  3. 为ServerBootstrap设置EventLoopGroup、Channel类型、端口和ChannelHandler
  4. ServerBootstrap绑定端口并且启动服务

以ServerBootstrap的bind方法为入口,看下Netty服务的启动过程。

ServerBootstrap

ServerBootstrap是AbstractBootstrap的子类,bind方法在AbstractBootstrap中实现,主要包含了两件事情:

  1. 创建Channel并完成Channel的初始化与注册
  2. 将channel进行端口绑定
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
public abstract class AbstractBootstrap<B extends AbstractBootstrap<B, C>, C extends Channel> implements Cloneable { 
// ChannelFactory
private volatile ChannelFactory<? extends C> channelFactory;

public ChannelFuture bind() {
validate();
SocketAddress localAddress = this.localAddress;
if (localAddress == null) {
throw new IllegalStateException("localAddress not set");
}
// 调用了doBind方法
return doBind(localAddress);
}

private ChannelFuture doBind(final SocketAddress localAddress) {
// 初始化并且注册Channel
final ChannelFuture regFuture = initAndRegister();
// 获取Channel
final Channel channel = regFuture.channel();
if (regFuture.cause() != null) {
return regFuture;
}
if (regFuture.isDone()) {
ChannelPromise promise = channel.newPromise();
// 端口绑定
doBind0(regFuture, channel, localAddress, promise);
return promise;
} else {
final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
regFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
Throwable cause = future.cause();
if (cause != null) {
promise.setFailure(cause);
} else {
promise.registered();
// 端口绑定
doBind0(regFuture, channel, localAddress, promise);
}
}
});
return promise;
}
}

Channel的初始化与注册

  1. 创建Channel
  2. 初始化channel
  3. 注册Channel
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
final ChannelFuture initAndRegister() {
Channel channel = null;
try {
// 1.通过channelFactory创建Channel
channel = channelFactory创建Channel.newChannel();
// 2. 初始化channel
init(channel);
} catch (Throwable t) {
if (channel != null) {
channel.unsafe().closeForcibly();
return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
}
return new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE).setFailure(t);
}
// 3. 注册Channel
ChannelFuture regFuture = config().group().register(channel);
if (regFuture.cause() != null) {
if (channel.isRegistered()) {
channel.close();
} else {
channel.unsafe().closeForcibly();
}
}
return regFuture;
}

创建Channel

ChannelFactory

ChannelFactory是一个接口,从名字上可以看出它是创建channel的一个工厂,通过DEBUG可以看出使用的ReflectChannelFactory进行channel创建的,进入ReflectChannelFactory,可以看到通过Constructor进行Channel创建的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public class ReflectiveChannelFactory<T extends Channel> implements ChannelFactory<T> {

private final Constructor<? extends T> constructor;

public ReflectiveChannelFactory(Class<? extends T> clazz) {
ObjectUtil.checkNotNull(clazz, "clazz");
try {
// 设置constructor
this.constructor = clazz.getConstructor();
} catch (NoSuchMethodException e) {
throw new IllegalArgumentException("Class " + StringUtil.simpleClassName(clazz) +
" does not have a public non-arg constructor", e);
}
}

@Override
public T newChannel() {
try {
// 通过反射创建channel
return constructor.newInstance();
} catch (Throwable t) {
throw new ChannelException("Unable to create Channel from class " + constructor.getDeclaringClass(), t);
}
}
}

那么这个Constructor是在什么时候传入的呢,回到最开始的代码,可以看到为ServerBootstrap设置了channel类型,类型为NioServerSocketChannel:

1
2
3
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class) // 设置channel类型

进入ServerBootstrap的channel()方法,可以看到创建了ReflectiveChannelFactory,并且设置了channel类型:

1
2
3
4
5
6
7
8
public abstract class AbstractBootstrap {
public B channel(Class<? extends C> channelClass) {
// 创建了ReflectiveChannelFactory类型的channelFactory,并传入了channel类型也就是上一步中设置的NioServerSocketChannel
return channelFactory(new ReflectiveChannelFactory<C>(
ObjectUtil.checkNotNull(channelClass, "channelClass")
));
}
}

总结:

1.ServerBootstrap会设置Channel的类型,选择创建什么类型的Channel。

2.使用ChannelFactory(ReflectChannelFactory实现),根据设置的channel类型,通过反射创建出NioServerSocketChannel,完成Channel的创建。

NioServerSocketChannel

NioServerSocketChannel中的构造函数可以看出是通过SelectorProvider创建channel的:

  1. 构建NioServerSocketChannel的时候指定SelectorProvider,就通过指定的SelectorProvider来创建
  2. 如果没有指定SelectorProvider,那么就通过调用SelectorProvider的provider()方法返回一个SelectorProvider
  3. 调用SelectorProvider的openServerSocketChannel完成channel的创建
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
public class NioServerSocketChannel extends AbstractNioMessageChannel
implements io.netty.channel.socket.ServerSocketChannel {

private final ServerSocketChannelConfig config;

// 默认的SelectorProvider
private static final SelectorProvider DEFAULT_SELECTOR_PROVIDER = SelectorProvider.provider();
/**
* 构造函数,创建channel
*/
public NioServerSocketChannel() {
this(newSocket(DEFAULT_SELECTOR_PROVIDER));
}

/**
* 构造函数,通过指定的SelectorProvider创建channel
*/
public NioServerSocketChannel(SelectorProvider provider) {
this(newSocket(provider));
}
/**
* 通过指定的channel创建NioServerSocketChannel
*/
public NioServerSocketChannel(ServerSocketChannel channel) {
super(null, channel, SelectionKey.OP_ACCEPT);
config = new NioServerSocketChannelConfig(this, javaChannel().socket());
}

private static ServerSocketChannel newSocket(SelectorProvider provider) {
try {
// 调用openServerSocketChannel创建Channel
return provider.openServerSocketChannel();
} catch (IOException e) {
throw new ChannelException(
"Failed to open a server socket.", e);
}
}
}
SelectorProvider

SelectorProvider是JDK中的一个抽象类,provider()方法中,有三种方式创建SelectorProvider:

  1. loadProviderFromProperty()方法

    调用JVM的系统配置,获取环境配置信息,判断是否配置了java.nio.channels.spi.SelectorProvider参数,如果配置了,就使用配置的SelectorProvider实现类进行创建。

  2. loadProviderAsService()方法

    通过SPI机制查找配置的ServiceLoader实现类。

  3. 使用DefaultSelectorProvider创建

    默认的SelectorProvider,不同的操作系统下JDK的代码也不一样。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
public abstract class SelectorProvider { 
// 创建provider
public static SelectorProvider provider() {
synchronized (lock) {
if (provider != null)
return provider;
return AccessController.doPrivileged(
new PrivilegedAction<SelectorProvider>() {
public SelectorProvider run() {
// 根据系统配置选择SelectorProvider
if (loadProviderFromProperty())
return provider;
// 根据SPI机制选择SelectorProvider
if (loadProviderAsService())
return provider;
// 调用DefaultSelectorProvider创建provider
provider = sun.nio.ch.DefaultSelectorProvider.create();
return provider;
}
});
}
}

private static boolean loadProviderFromProperty() {
// 调用JVM的系统配置,获取环境配置信息,判断是否配置了java.nio.channels.spi.SelectorProvider
String cn = System.getProperty("java.nio.channels.spi.SelectorProvider");
if (cn == null)
return false;
try {
// 根据配置的SelectorProvider的类型创建SelectorProvider
Class<?> c = Class.forName(cn, true,
ClassLoader.getSystemClassLoader());
provider = (SelectorProvider)c.newInstance();
return true;
} catch (ClassNotFoundException x) {
// ...
// 省略了异常处理
}
}

private static boolean loadProviderAsService() {
// 通过SPI机制查找配置的ServiceLoader实现类
ServiceLoader<SelectorProvider> sl =
ServiceLoader.load(SelectorProvider.class,
ClassLoader.getSystemClassLoader());
Iterator<SelectorProvider> i = sl.iterator();
for (;;) {
try {
if (!i.hasNext())
return false;
provider = i.next();
return true;
} catch (ServiceConfigurationError sce) {
// ...
}
}
}
}
DefaultSelectorProvider

操作系统和版本的不同,DefaultSelectorProvider返回的SelectorProvider也不同,比如我的电脑是MAC操作系统,在create方法中返回的就是KQueueSelectorProvider:

1
2
3
4
5
6
7
8
public class DefaultSelectorProvider {
private DefaultSelectorProvider() {
}
// 创建DefaultSelectorProvider,MAC下使用的是KQueueSelectorProvider
public static SelectorProvider create() {
return new KQueueSelectorProvider();
}
}

看下DefaultSelectorProvider在Linux操作系统下的create方法实现:

1
2
3
4
5
6
7
8
9
10
11
public static SelectorProvider create() {
    String osname = AccessController
        .doPrivileged(new GetPropertyAction("os.name"));
    if (osname.equals("SunOS"))
        return createProvider("sun.nio.ch.DevPollSelectorProvider");
    if (osname.equals("Linux"))
// EPoll
        return createProvider("sun.nio.ch.EPollSelectorProvider");
// Poll
    return new sun.nio.ch.PollSelectorProvider();
}

总结:根据操作系统和版本可以创建出不同类型的SelectorProvider,比如Linux下使用的是EPollSelectorProvider、SunOS下使用的是DevPollSelectorProvider。

初始化Channel

回到initAndRegister中的init方法,看下Channel的初始化过程,init方法是在ServerBootstrap中实现的,初始化工作主要是设置一些参数、为ChannelPipeline设置处理器等:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
public class ServerBootstrap extends AbstractBootstrap<ServerBootstrap, ServerChannel> {
@Override
void init(Channel channel) {
// 设置一些参数
setChannelOptions(channel, options0().entrySet().toArray(newOptionArray(0)), logger);
setAttributes(channel, attrs0().entrySet().toArray(newAttrArray(0)));
// 创建ChannelPipeline
ChannelPipeline p = channel.pipeline();

final EventLoopGroup currentChildGroup = childGroup;
final ChannelHandler currentChildHandler = childHandler;
final Entry<ChannelOption<?>, Object>[] currentChildOptions =
childOptions.entrySet().toArray(newOptionArray(0));
final Entry<AttributeKey<?>, Object>[] currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(0));
// 添加处理器
p.addLast(new ChannelInitializer<Channel>() {
@Override
public void initChannel(final Channel ch) {
final ChannelPipeline pipeline = ch.pipeline();
ChannelHandler handler = config.handler();
if (handler != null) {
pipeline.addLast(handler);
}
ch.eventLoop().execute(new Runnable() {
@Override
public void run() {
pipeline.addLast(new ServerBootstrapAcceptor(
ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
});
}
});
}
}

注册Channel

回到AbstractBootstrap的initAndRegister方法,可以看到是调用EventLoopGroup的register方法进行channel注册的,它会从EventLoopGroup选择一个EventLoop与channel进行绑定:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
// AbstractBootstrap
public abstract class AbstractBootstrap<B extends AbstractBootstrap<B, C>, C extends Channel> implements Cloneable {
public abstract AbstractBootstrapConfig<B, C> config();

final ChannelFuture initAndRegister() {
// ...
// 调用了AbstractBootstrapConfig的group方法,返回了EventLoopGroup,然后调用EventLoopGroup的register方法进行channel注册
ChannelFuture regFuture = config().group().register(channel);
// 省略了代码
}
}

// AbstractBootstrapConfig
public abstract class AbstractBootstrapConfig<B extends AbstractBootstrap<B, C>, C extends Channel> {
// 返回bootstrap设置的EventLoopGroup
public final EventLoopGroup group() {
return bootstrap.group();
}
}

最终会调用到AbstractChannel的register方法进行channel的注册,中间的跳转过程如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
// 1. MultithreadEventLoopGroup实现了register方法,
public abstract class MultithreadEventLoopGroup实现了 extends MultithreadEventExecutorGroup implements EventLoopGroup {
@Override
public ChannelFuture register(Channel channel) {
// 继续向下调用,会进入SingleThreadEventLoop的register方法
return next().register(channel);
}
}

// 2.SingleThreadEventLoop
public abstract class SingleThreadEventLoop extends SingleThreadEventExecutor implements EventLoop {

@Override
public ChannelFuture register(Channel channel) {
// 创建了DefaultChannelPromise,与当前Channel进行绑定
return register(new DefaultChannelPromise(channel, this));
}

@Override
public ChannelFuture register(final ChannelPromise promise) {
ObjectUtil.checkNotNull(promise, "promise");
// 调用当前channel对象的unsafe,unsafe是AbstractNioChannel的一个内部类,AbstractNioChannel是AbstractChannel的一个子类,可以调用到父类的register方法,也就进入到了AbstractChannel的register方法
promise.channel().unsafe().register(this, promise);
return promise;
}
AbstractChannel

AbstractChannel的register方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {
@Override
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
if (eventLoop == null) {
throw new NullPointerException("eventLoop");
}
// 判断是否已经注册过
if (isRegistered()) {
promise.setFailure(new IllegalStateException("registered to an event loop already"));
return;
}
if (!isCompatible(eventLoop)) {
promise.setFailure(
new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
return;
}
// 设置当前channel的eventLoop,也就是将channel绑定到了EventLoop上
AbstractChannel.this.eventLoop = eventLoop;
// 当前线程是否是EventLoopGroup中的线程
if (eventLoop.inEventLoop()) {
// 注册channel
register0(promise);
} else {
try {
// 将注册任务放入EventLoop中执行
eventLoop.execute(new Runnable() {
@Override
public void run() {
// 注册channel
register0(promise);
}
});
} catch (Throwable t) {
// 省略...
}
}
}

private void register0(ChannelPromise promise) {
try {
if (!promise.setUncancellable() || !ensureOpen(promise)) {
return;
}
boolean firstRegistration = neverRegistered;
// 进行注册,在AbstractNioChannel中实现
doRegister();
neverRegistered = false;
registered = true;
// 触发pipeline HandlerAdded
pipeline.invokeHandlerAddedIfNeeded();
safeSetSuccess(promise);
pipeline.fireChannelRegistered();
if (isActive()) {
if (firstRegistration) {
pipeline.fireChannelActive();
} else if (config().isAutoRead()) {
beginRead();
}
}
} catch (Throwable t) {
// Close the channel directly to avoid FD leak.
closeForcibly();
closeFuture.setClosed();
safeSetFailure(promise, t);
}
}
}
AbstractNioChannel

doRegister是在AbstractNioChannel中实现的:

  1. this.javaChannel()返回了ServerSocketChannel,具体类型是sun.nio.ch.ServerSocketChannelImpl,它是JDK中的类
  2. this.eventLoop().unwrappedSelector()返回了EventLoop中的Selector
  3. 调用了JDK底层的方法,将当前的channel绑定到了EventLoop中的Selector
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public abstract class AbstractNioChannel extends AbstractChannel {
protected void doRegister() throws Exception {
boolean selected = false;
while(true) {
try {
// 将Channel注册到Selecter上,javaChannel返回了NioServerSocketChannel,this.eventLoop().unwrappedSelector()返回了selector
this.selectionKey = this.javaChannel().register(this.eventLoop().unwrappedSelector(), 0, this);
return;
} catch (CancelledKeyException var3) {
if(selected) {
throw var3;
}
this.eventLoop().selectNow();
selected = true;
}
}
}
}
// NioServerSocketChannel
public class NioServerSocketChannel extends AbstractNioMessageChannel
implements io.netty.channel.socket.ServerSocketChannel {

@Override
protected ServerSocketChannel javaChannel() {
// 返回ServerSocketChannel,具体类型是sun.nio.ch.ServerSocketChannelImpl,调用了JDK底层的方法,将channel注册到了Selector上
return (ServerSocketChannel) super.javaChannel();
}
}

端口绑定

回到AbstractBootstrap的doBind0方法,这里实现了端口的绑定,跟着断点最终会进入到NioServerSocketChannel的bind方法,又是调用了JDK底层的方法进行端口的绑定。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
public abstract class AbstractBootstrap<B extends AbstractBootstrap<B, C>, C extends Channel> implements Cloneable {
// doBind中调用了doBind0进行端口绑定
private static void doBind0(
final ChannelFuture regFuture, final Channel channel,
final SocketAddress localAddress, final ChannelPromise promise) {
// 由EventLoop异步实现
channel.eventLoop().execute(new Runnable() {
@Override
public void run() {
// channel与selector绑定的异步任务是否成功
if (regFuture.isSuccess()) {
// 调用了channel的bind方法进行绑定,会进入到AbstractChannel的bind方法
channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
} else {
promise.setFailure(regFuture.cause());
}
}
});
}
}

// io.netty.channel.AbstractChannel
public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {
@Override
public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
// 调用了pipline的bind方法
return pipeline.bind(localAddress, promise);
}
}
// io.netty.channel.DefaultChannelPipeline
public class DefaultChannelPipeline implements ChannelPipeline {
@Override
public final ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
// 会进入到AbstractChannelHandlerContext的bind方法
return tail.bind(localAddress, promise);
}
}
// io.netty.channel.AbstractChannelHandlerContext
abstract class AbstractChannelHandlerContext implements ChannelHandlerContext, ResourceLeakHint {
@Override
public ChannelFuture bind(final SocketAddress localAddress, final ChannelPromise promise) {
// 空值校验
if (localAddress == null) {
throw new NullPointerException("localAddress");
}
if (isNotValidPromise(promise, false)) {
return promise;
}
final AbstractChannelHandlerContext next = findContextOutbound(MASK_BIND);
EventExecutor executor = next.executor();
// 判断是否是EventLoop中的线程
if (executor.inEventLoop()) {
// 端口绑定
next.invokeBind(localAddress, promise);
} else {
// 在EventLoop中异步处理
safeExecute(executor, new Runnable() {
@Override
public void run() {
// 端口绑定
next.invokeBind(localAddress, promise);
}
}, promise, null);
}
return promise;
}
// 端口绑定
private void invokeBind(SocketAddress localAddress, ChannelPromise promise) {
if (invokeHandler()) {
try {
// 进入到DefaultChannelPipeline的bind方法
((ChannelOutboundHandler) handler()).bind(this, localAddress, promise);
} catch (Throwable t) {
notifyOutboundHandlerException(t, promise);
}
} else {
bind(localAddress, promise);
}
}
// io.netty.channel.DefaultChannelPipeline
public class DefaultChannelPipeline implements ChannelPipeline {
@Override
public void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) {
// unsage是AbstractNioMessageChannel类型的,bind方法在AbstractChannel中实现
unsafe.bind(localAddress, promise);
}
}
// io.netty.channel.AbstractChannel
public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {
@Override
public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
assertEventLoop();

if (!promise.setUncancellable() || !ensureOpen(promise)) {
return;
}
// 省略了部分代码
boolean wasActive = isActive();
try {
// 端口绑定,会进入到NioServerSocketChannel
doBind(localAddress);
} catch (Throwable t) {
safeSetFailure(promise, t);
closeIfClosed();
return;
}
// 绑定完成后,Channel处于活跃状态
if (!wasActive && isActive()) {
invokeLater(new Runnable() {
@Override
public void run() {
// 触发事件
pipeline.fireChannelActive();
}
});
}
safeSetSuccess(promise);
}
}
// NioServerSocketChannel
public class NioServerSocketChannel extends AbstractNioMessageChannel
implements io.netty.channel.socket.ServerSocketChannel {
@Override
protected void doBind(SocketAddress localAddress) throws Exception {
if (PlatformDependent.javaVersion() >= 7) {
// 调用了JDK底层的方法进行端口绑定
javaChannel().bind(localAddress, config.getBacklog());
} else {
javaChannel().socket().bind(localAddress, config.getBacklog());
}
}
}

Accept事件监听

在AbstractChannel的bind方法中,端口完成绑定后会判断Channel的活跃状态,然后调用fireChannelActive触发Channle活跃的事件,跟着断点最终会进入到AbstractNioChannel的doBeginRead方法:

1
2
3
4
5
6
7
8
9
10
// 绑定完成后,Channel处于活跃状态
if (!wasActive && isActive()) {
invokeLater(new Runnable() {
@Override
public void run() {
// 触发事件
pipeline.fireChannelActive();
}
});
}

NioServerSocketChannel的构造函数中,初始化了readInterestOp为SelectionKey.OP_ACCEPT,所以在doBeginRead中,OP_ACCEPT事件会被注册到Channel的事件集合中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
//NioServerSocketChannel
public class NioServerSocketChannel{
/**
* 通过指定的channel创建NioServerSocketChannel
*/
public NioServerSocketChannel(ServerSocketChannel channel) {
// 设置了设置了readInterestOp为SelectionKey.OP_ACCEPT
super(null, channel, SelectionKey.OP_ACCEPT);
config = new NioServerSocketChannelConfig(this, javaChannel().socket());
}
}

public abstract class AbstractNioChannel extends AbstractChannel {
protected final int readInterestOp;

@Override
protected void doBeginRead() throws Exception {
// Channel.read() or ChannelHandlerContext.read() was called
final SelectionKey selectionKey = this.selectionKey;
if (!selectionKey.isValid()) {
return;
}
readPending = true;
final int interestOps = selectionKey.interestOps();
if ((interestOps & readInterestOp) == 0) {
// 注册OP_ACCEPT事件到Channel
selectionKey.interestOps(interestOps | readInterestOp);
}
}
}

Netty版本:4.1.42.Final

参考:

Netty(七)源码解析 之 Reactor 模型、Netty的服务端启动源码分析

若地 :Netty 核心原理剖析与 RPC 实践

【Mybatis】Mybatis执行流程

Posted on 2021-04-10

Mybatis四大组件

Executor:执行器,用来调度StatementHandler、ParameterHandler、ResultHandler等来执行对应的SQL。

StatementHandler:回顾一下JDBC操作数据库的过程,首先加载驱动创建连接,通过连接创建Statement,之后就可以使用Statement进行增删改查操作,在Mybatis中StatementHandler可以看作是对Statement的封装,用于执行数据库操作,它是四大组件的核心。

1
2
3
4
5
6
7
8
// 加载驱动
Class.forName("com.mysql.jdbc.Driver");
// 创建连接
Connection connection = DriverManager.getConnection("jdbc:mysql://127.0.0.1:3306/db_test", "username", "password");
// 通过连接创建Statement
Statement statement = connection.createStatement();
// 通过Statement进行数据库的增删改查操作
ResultSet resultSet = statement.executeQuery("select * from test");

ParameterHandler:用于对查询参数进行处理。

ResultSetHandler:用于执行SQL后对返回的数据集ResultSet的封装处理。

首先,看一下手动加载mybatis配置文件,获取SqlSession的过程:

  1. 加载xml配置文件
  2. 通过SqlSessionFactoryBuilder构建SqlSessionFactory
  3. 通过SqlSessionFactory获取SqlSession
1
2
3
4
5
6
7
8
// mybatis的xml配置文件
String resource = "mybatis-config.xml";
// 加载配置文件,获取文件流
InputStream inputStream = Resources.getResourceAsStream(resource);
// SqlSessionFactoryBuilder构建SqlSessionFactory
SqlSessionFactory sqlSessionFactory = new SqlSessionFactoryBuilder().build(inputStream);
// 根据SqlSessionFactory创建SqlSession
SqlSession session = sqlSessionFactory.openSession();

可以看到通过SqlSessionFactoryBuilder可以构建SqlSessionFactory,然后通过SqlSessionFactory就可以获取到SqlSession进行数据库的增删改查操作了,那么就从SqlSessionFactoryBuilder入手看一下Mybatis的执行流程。

Configuration加载

我们知道mybatis是有配置文件的,那么使用mybatis的过程中,首先它一定会去加载配置文件解析各种配置。Configuration加载包括两个部分,一个是解析配置文件,另外一个是创建SqlSessionFactory,这些操作是在SqlSessionFactoryBuilder的build方法中完成的。

1
2
3
4
5
6
7
8
9
<!-- mybatis配置文件 -->
<configuration>
<typeAliases>
<package name="com.springboot.entity"/>
</typeAliases>
<mappers>
<mapper resource="com/springboot/mapper/StudentMapper.xml"/>
</mappers>
</configuration>

SqlSessionFactoryBuilder

SqlSessionFactoryBuilder主要干了两件事情:

  1. 通过传入的文件流,创建了XMLConfigBuilder对象,从名字可以看出这是一个处理XML配置文件相关的类,调用了它的parse方法,解析XML配置文件,然后返回一个Configuration对象。
  2. XML解析结果Configuration作为参数,调用build方法创建了DefaultSqlSessionFactory,DefaultSqlSessionFactory实现了SqlSessionFactory接口,可以作为SqlSessionFactory返回,完成SqlSessionFactory的构建。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
public class SqlSessionFactoryBuilder {

// 构建SqlSessionFactory
public SqlSessionFactory build(Reader reader) {
return build(reader, null, null);
}

// 省略了其他的build的方法

/**
* 其他build方法最终都是调用这个build方法构建的
* @param inputStream 文件流
* @param environment
* @param properties
* @return
*/
public SqlSessionFactory build(InputStream inputStream, String environment, Properties properties) {
try {
// 构建XMLConfigBuilder,用来解析XML配置文件
XMLConfigBuilder parser = new XMLConfigBuilder(inputStream, environment, properties);
// 调用build方法,创建DefaultSqlSessionFactory,parser.parse()方法用来解析XML文件中的各种配置
return build(parser.parse());
} catch (Exception e) {
throw ExceptionFactory.wrapException("Error building SqlSession.", e);
} finally {
ErrorContext.instance().reset();
try {
inputStream.close();
} catch (IOException e) {
// Intentionally ignore. Prefer previous error.
}
}
}

public SqlSessionFactory build(Configuration config) {
// 创建一个DefaultSqlSessionFactory返回,它实现了SqlSessionFactory接口
return new DefaultSqlSessionFactory(config);
}
}

配置文件解析

XMLConfigBuilder

XMLConfigBuilder主要用来解析XML配置文件,在parseConfiguration方法中可以看到,对XML文件的各个节点进行了一系列的解析,这里我们先了解一下整体流程,具体的细节可以先不管。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
public class XMLConfigBuilder extends BaseBuilder {
// 构造函数
public XMLConfigBuilder(Reader reader) {
this(reader, null, null);
}

public XMLConfigBuilder(Reader reader, String environment, Properties props) {
// 创建了XPathParser对象
this(new XPathParser(reader, true, props, new XMLMapperEntityResolver()), environment, props);
}

// 构造函数,初始化
private XMLConfigBuilder(XPathParser parser, String environment, Properties props) {
super(new Configuration());
ErrorContext.instance().resource("SQL Mapper Configuration");
this.configuration.setVariables(props);
this.parsed = false;
this.environment = environment;
this.parser = parser;
}

/**
* 解析配置
* @return
*/
public Configuration parse() {
if (parsed) {
throw new BuilderException("Each XMLConfigBuilder can only be used once.");
}
parsed = true;
// 获取configuration节点,进行配置解析
parseConfiguration(parser.evalNode("/configuration"));
return configuration;
}

// 解析配置
private void parseConfiguration(XNode root) {
try {
//issue #117 read properties first
propertiesElement(root.evalNode("properties"));
// 解析settings
Properties settings = settingsAsProperties(root.evalNode("settings"));
loadCustomVfs(settings);
// 解析别名
typeAliasesElement(root.evalNode("typeAliases"));
// 解析插件
pluginElement(root.evalNode("plugins"));
objectFactoryElement(root.evalNode("objectFactory"));
objectWrapperFactoryElement(root.evalNode("objectWrapperFactory"));
reflectorFactoryElement(root.evalNode("reflectorFactory"));
settingsElement(settings);
// read it after objectFactory and objectWrapperFactory issue #631
environmentsElement(root.evalNode("environments"));
databaseIdProviderElement(root.evalNode("databaseIdProvider"));
typeHandlerElement(root.evalNode("typeHandlers"));
// 解析mapper
mapperElement(root.evalNode("mappers"));
} catch (Exception e) {
throw new BuilderException("Error parsing SQL Mapper Configuration. Cause: " + e, e);
}
}
}

创建SqlSessionFactory

SqlSessionFactory

SqlSessionFactory是一个接口,从名字上就可以看出它和SqlSession有关,是创建SqlSession的一个工厂,里面主要包含了openSession和getConfiguration获取Configuration对象的方法。

1
2
3
4
5
6
7
8
9
10
11
12
public interface SqlSessionFactory {
SqlSession openSession();
SqlSession openSession(boolean autoCommit);
SqlSession openSession(Connection connection);
SqlSession openSession(TransactionIsolationLevel level);

SqlSession openSession(ExecutorType execType);
SqlSession openSession(ExecutorType execType, boolean autoCommit);
SqlSession openSession(ExecutorType execType, TransactionIsolationLevel level);
SqlSession openSession(ExecutorType execType, Connection connection);
Configuration getConfiguration();
}
DefaultSqlSessionFactory

在SqlSessionFactoryBuilder的build方法中可以知道它创建了一个DefaultSqlSessionFactory返回,DefaultSqlSessionFactory实现了SqlSessionFactory接口,它提供了两个方法创建SqlSession,分别是openSessionFromDataSource和openSessionFromConnection,它们都创建的是DefaultSqlSession。

创建DefaultSqlSession需要传入Executor作为参数,所以会先调用Configuration的newExecutor方法创建Executor,在newExecutor方法中会根据传入的ExecutorType生成对应的Executor。

所以DefaultSqlSessionFactory也主要干了两件事:

  1. 创建Executor执行器
  2. 创建DefaultSqlSession
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
public class DefaultSqlSessionFactory implements SqlSessionFactory {
private final Configuration configuration;

public DefaultSqlSessionFactory(Configuration configuration) {
this.configuration = configuration;
}

@Override
public SqlSession openSession() {
return openSessionFromDataSource(configuration.getDefaultExecutorType(), null, false);
}

// 通过DataSource创建SqlSession
private SqlSession openSessionFromDataSource(ExecutorType execType, TransactionIsolationLevel level, boolean autoCommit) {
Transaction tx = null;
try {
final Environment environment = configuration.getEnvironment();
// 事务相关
final TransactionFactory transactionFactory = getTransactionFactoryFromEnvironment(environment);
tx = transactionFactory.newTransaction(environment.getDataSource(), level, autoCommit);
// 根据Configuration创建Executor执行器,默认返回的是SimpleExecutor
final Executor executor = configuration.newExecutor(tx, execType);
// 创建了一个DefaultSqlSession返回
return new DefaultSqlSession(configuration, executor, autoCommit);
} catch (Exception e) {
closeTransaction(tx); // may have fetched a connection so lets call close()
throw ExceptionFactory.wrapException("Error opening session. Cause: " + e, e);
} finally {
ErrorContext.instance().reset();
}
}

// 通过Connection创建SqlSession
private SqlSession openSessionFromConnection(ExecutorType execType, Connection connection) {
try {
boolean autoCommit;
try {
// 设置自动提交
autoCommit = connection.getAutoCommit();
} catch (SQLException e) {
// Failover to true, as most poor drivers
// or databases won't support transactions
autoCommit = true;
}
final Environment environment = configuration.getEnvironment();
final TransactionFactory transactionFactory = getTransactionFactoryFromEnvironment(environment);
final Transaction tx = transactionFactory.newTransaction(connection);
final Executor executor = configuration.newExecutor(tx, execType);
// 创建了一个DefaultSqlSession返回
return new DefaultSqlSession(configuration, executor, autoCommit);
} catch (Exception e) {
throw ExceptionFactory.wrapException("Error opening session. Cause: " + e, e);
} finally {
ErrorContext.instance().reset();
}
}
}
创建Executor
  • SimpleExecutor: 简单执行器,默认使用的执行器就是SimpleExecutor。

  • ReuseExecutor: 可重用执行器。

  • BatchExecutor: 批量执行器,用于进行批量处理。

  • CachingExecutor:缓存执行器,如果开启了缓存,会返回CachingExecutor。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public class Configuration {
public Executor newExecutor(Transaction transaction, ExecutorType executorType) {
executorType = executorType == null ? defaultExecutorType : executorType;
executorType = executorType == null ? ExecutorType.SIMPLE : executorType;
Executor executor;
//根据类型判断创建哪种类型的执行器
if (ExecutorType.BATCH == executorType) {
// 批量Executor
executor = new BatchExecutor(this, transaction);
} else if (ExecutorType.REUSE == executorType) {
// ReuseExecutor
executor = new ReuseExecutor(this, transaction);
} else {
// 默认的执行器
executor = new SimpleExecutor(this, transaction);
}
//如果开启了缓存,创建缓存执行器(装饰者模式)
if (cacheEnabled) {
executor = new CachingExecutor(executor);
}
executor = (Executor) interceptorChain.pluginAll(executor);
return executor;
}
}

查询过程的执行

经过上面的步骤,拿到了一个DefaultSqlSessionFactory,接下来就可以通过DefaultSqlSessionFactory来获取SqlSession对象了,默认返回的是DefaultSqlSession,获取到SqlSession就可以对数据库进行增删改查了。

DefaultSqlSession

DefaultSqlSession中实现了SqlSession接口中的增删改查方法,以selectList查询方法为例,可以看到最终是通过执行器进行查询的,那么接下来就以SimpleExecutor为例,看一下Mybatis的查询过程。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
public class DefaultSqlSession implements SqlSession {

private final Configuration configuration;
private final Executor executor;

...

@Override
public <E> List<E> selectList(String statement) {
return this.selectList(statement, null);
}

@Override
public <E> List<E> selectList(String statement, Object parameter) {
return this.selectList(statement, parameter, RowBounds.DEFAULT);
}

@Override
public <E> List<E> selectList(String statement, Object parameter, RowBounds rowBounds) {
try {
// 从配置类中获取MappedStatement
MappedStatement ms = configuration.getMappedStatement(statement);
// 通过执行器Executor进行查询
return executor.query(ms, wrapCollection(parameter), rowBounds, Executor.NO_RESULT_HANDLER);
} catch (Exception e) {
throw ExceptionFactory.wrapException("Error querying database. Cause: " + e, e);
} finally {
ErrorContext.instance().reset();
}
}

...
}

SimpleExecutor

Mybatis默认使用的执行器是SimpleExecutor,进入SimpleExecutor的query方法查看一下执行过程,query方法实际是在其父类BaseExecutor实现的,所以先进入BaseExecutor的query方法,这里我们只需要关注queryFromDatabase方法即可,和缓存有关的先不管:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
public abstract class BaseExecutor implements Executor {

@SuppressWarnings("unchecked")
@Override
public <E> List<E> query(MappedStatement ms, Object parameter, RowBounds rowBounds, ResultHandler resultHandler, CacheKey key, BoundSql boundSql) throws SQLException {
ErrorContext.instance().resource(ms.getResource()).activity("executing a query").object(ms.getId());
...
try {
queryStack++;
//从缓存中获取数据,key的类型为CacheKey
list = resultHandler == null ? (List<E>) localCache.getObject(key) : null;
if (list != null) {
handleLocallyCachedOutputParameters(ms, key, parameter, boundSql);
} else {
//如果获取结果为空,从数据库中查找
list = queryFromDatabase(ms, parameter, rowBounds, resultHandler, key, boundSql);
}
} finally {
queryStack--;
}
...
return list;
}

// 从数据库中查询
private <E> List<E> queryFromDatabase(MappedStatement ms, Object parameter, RowBounds rowBounds, ResultHandler resultHandler, CacheKey key, BoundSql boundSql) throws SQLException {
List<E> list;
// 放入缓存
localCache.putObject(key, EXECUTION_PLACEHOLDER);
try {
// 执行查询,由BaseExecutor子类实现doQuery方法
list = doQuery(ms, parameter, rowBounds, resultHandler, boundSql);
} finally {
localCache.removeObject(key);
}
// 放入缓存
localCache.putObject(key, list);
if (ms.getStatementType() == StatementType.CALLABLE) {
localOutputParameterCache.putObject(key, parameter);
}
return list;
}
}

SimpleExecutor的doQuery方法:

  1. 获取Configuration配置类

  2. 生成StatementHandler

  3. 创建Statement
  4. Statement参数处理
  5. 调用StatementHandler的query方法进行查询
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public class SimpleExecutor extends BaseExecutor {
// 查询
@Override
public <E> List<E> doQuery(MappedStatement ms, Object parameter, RowBounds rowBounds, ResultHandler resultHandler, BoundSql boundSql) throws SQLException {
Statement stmt = null;
try {
// 获取Configuration
Configuration configuration = ms.getConfiguration();
// 生成StatementHandler
StatementHandler handler = configuration.newStatementHandler(wrapper, ms, parameter, rowBounds, resultHandler, boundSql);
// 创建Statement
stmt = prepareStatement(handler, ms.getStatementLog());
// 通过StatementHandler执行查询
return handler.<E>query(stmt, resultHandler);
} finally {
closeStatement(stmt);
}
}

// 生成Statement并处理参数
private Statement prepareStatement(StatementHandler handler, Log statementLog) throws SQLException {
Statement stmt;
Connection connection = getConnection(statementLog);
// prepare由StatementHandler由子类实现,用于完成JDBC Statement接口的实例化
stmt = handler.prepare(connection, transaction.getTimeout());
// 处理Statement对应的参数
handler.parameterize(stmt);
return stmt;
}
}

StatementHandler的生成

StatementHandler的生成是在Configuration中的newStatementHandler方法中实现的, 可以看到创建了一个RoutingStatementHandler返回。

1
2
3
4
5
6
7
8
9
public class Configuration {
// 创建StatementHandler
public StatementHandler newStatementHandler(Executor executor, MappedStatement mappedStatement, Object parameterObject, RowBounds rowBounds, ResultHandler resultHandler, BoundSql boundSql) {
//创建RoutingStatementHandler
StatementHandler statementHandler = new RoutingStatementHandler(executor, mappedStatement, parameterObject, rowBounds, resultHandler, boundSql);
statementHandler = (StatementHandler) interceptorChain.pluginAll(statementHandler);
return statementHandler;
}
}

RoutingStatementHandler的构造函数中,根据StatementType进行判断生成哪种类型的StatementHandler,一共有三种类型:

  • SimpleStatementHandler

  • PreparedStatementHandler

  • CallableStatementHandler

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32

public class RoutingStatementHandler implements StatementHandler {

private final StatementHandler delegate;

/**
* 构造函数
* @param executor
* @param ms
* @param parameter
* @param rowBounds
* @param resultHandler
* @param boundSql
*/
public RoutingStatementHandler(Executor executor, MappedStatement ms, Object parameter, RowBounds rowBounds, ResultHandler resultHandler, BoundSql boundSql) {
//根据类型判断创建哪种处理器
switch (ms.getStatementType()) {
case STATEMENT:
delegate = new SimpleStatementHandler(executor, ms, parameter, rowBounds, resultHandler, boundSql);
break;
case PREPARED:
delegate = new PreparedStatementHandler(executor, ms, parameter, rowBounds, resultHandler, boundSql);
break;
case CALLABLE:
delegate = new CallableStatementHandler(executor, ms, parameter, rowBounds, resultHandler, boundSql);
break;
default:
throw new ExecutorException("Unknown statement type: " + ms.getStatementType());
}

}
}

Statement的生成

StatementHandler的prepare方法负责生成Statement,以PreparedStatementHandler为例,看一下Statement的生成过程:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
public abstract class BaseStatementHandler implements StatementHandler {

// 生成Statement
@Override
public Statement prepare(Connection connection, Integer transactionTimeout) throws SQLException {
ErrorContext.instance().sql(boundSql.getSql());
Statement statement = null;
try {
// 这里初始化JDBC的Statement对象,instantiateStatement方法由子类实现
statement = instantiateStatement(connection);
setStatementTimeout(statement, transactionTimeout);
setFetchSize(statement);
// 返回statement
return statement;
} catch (SQLException e) {
closeStatement(statement);
throw e;
} catch (Exception e) {
closeStatement(statement);
throw new ExecutorException("Error preparing statement. Cause: " + e, e);
}
}
}

public class PreparedStatementHandler extends BaseStatementHandler {
/**
* 初始化JDBC statement
* @param connection
* @return
* @throws SQLException
*/
@Override
protected Statement instantiateStatement(Connection connection) throws SQLException {
String sql = boundSql.getSql();
if (mappedStatement.getKeyGenerator() instanceof Jdbc3KeyGenerator) {
String[] keyColumnNames = mappedStatement.getKeyColumns();
if (keyColumnNames == null) {
// 通过Connection创建prepareStatement对象
return connection.prepareStatement(sql, PreparedStatement.RETURN_GENERATED_KEYS);
} else {
return connection.prepareStatement(sql, keyColumnNames);
}
} else if (mappedStatement.getResultSetType() != null) {
return connection.prepareStatement(sql, mappedStatement.getResultSetType().getValue(), ResultSet.CONCUR_READ_ONLY);
} else {
return connection.prepareStatement(sql);
}
}
}

可以看到,最终通过Connection创建了PrepareStatement。

Statement参数处理

Statement生成之后,调用了parameterize方法进行参数处理,实际上是调用ParameterHandler的setParameters方法对Statement进行参数设置:

1
2
3
4
5
6
7
8
public class PreparedStatementHandler extends BaseStatementHandler {
protected final ParameterHandler parameterHandler;//参数处理器
// 处理参数
@Override
public void parameterize(Statement statement) throws SQLException {
parameterHandler.setParameters((PreparedStatement) statement);//设置参数
}
}

ParameterHandler只是一个接口,它有一个子类DefaultParameterHandler:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
public interface ParameterHandler {
Object getParameterObject();
void setParameters(PreparedStatement ps)
throws SQLException;

}

public class DefaultParameterHandler implements ParameterHandler {

private final TypeHandlerRegistry typeHandlerRegistry;
private final MappedStatement mappedStatement;
private final Object parameterObject;
private final BoundSql boundSql;
private final Configuration configuration;
...

// 设置参数
@Override
public void setParameters(PreparedStatement ps) {
ErrorContext.instance().activity("setting parameters").object(mappedStatement.getParameterMap().getId());
List<ParameterMapping> parameterMappings = boundSql.getParameterMappings();
if (parameterMappings != null) {
for (int i = 0; i < parameterMappings.size(); i++) {
ParameterMapping parameterMapping = parameterMappings.get(i);
if (parameterMapping.getMode() != ParameterMode.OUT) {
Object value;
String propertyName = parameterMapping.getProperty();
if (boundSql.hasAdditionalParameter(propertyName)) { // issue #448 ask first for additional params
value = boundSql.getAdditionalParameter(propertyName);
} else if (parameterObject == null) {
value = null;
} else if (typeHandlerRegistry.hasTypeHandler(parameterObject.getClass())) {
value = parameterObject;
} else {
MetaObject metaObject = configuration.newMetaObject(parameterObject);
value = metaObject.getValue(propertyName);
}
// 获取类型处理器
TypeHandler typeHandler = parameterMapping.getTypeHandler();
// 获取参数的JDBC类型
JdbcType jdbcType = parameterMapping.getJdbcType();
if (value == null && jdbcType == null) {
jdbcType = configuration.getJdbcTypeForNull();
}
try {
// 设置参数,setParameter在BaseTypeHandler实现的
typeHandler.setParameter(ps, i + 1, value, jdbcType);
} catch (TypeException e) {
throw new TypeException("Could not set parameters for mapping: " + parameterMapping + ". Cause: " + e, e);
} catch (SQLException e) {
throw new TypeException("Could not set parameters for mapping: " + parameterMapping + ". Cause: " + e, e);
}
}
}
}
}

}

其中TypeHandler用于实现JAVA类型和JDBC类型的转换,它会根据参数的JAVA类型和JDBC类型选择合适的TypeHandler,再通过TypeHandler进行参数设置,以此达到JAVA类型到JDBC类型的转换。

1
2
3
4
5
<select id="getStudentById" resultMap="studentMap" parameterType="String">
SELECT *
FROM STUDENT
WHERE ID = #{id,javaType=String,jdbcType=VARCHAR}
</select

通过#{id,javaType=String,jdbcType=VARCHAR}可知JAVA类型是String,JDBC类型是VARCHAR,因此mabatis会使用StringTypeHandler进行参数处理。

BaseTypeHandler

进入BaseTypeHandler的setParameter看下参数的设置:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
public abstract class BaseTypeHandler<T> extends TypeReference<T> implements TypeHandler<T> {

...

@Override
public void setParameter(PreparedStatement ps, int i, T parameter, JdbcType jdbcType) throws SQLException {
if (parameter == null) {
if (jdbcType == null) {
throw new TypeException("JDBC requires that the JdbcType must be specified for all nullable parameters.");
}
try {
// 类型为空时调用
ps.setNull(i, jdbcType.TYPE_CODE);
} catch (SQLException e) {
throw new TypeException("Error setting null for parameter #" + i + " with JdbcType " + jdbcType + " . " +
"Try setting a different JdbcType for this parameter or a different jdbcTypeForNull configuration property. " +
"Cause: " + e, e);
}
} else {
try {
//当类型不为空时调用,由子类实现
setNonNullParameter(ps, i, parameter, jdbcType);
} catch (Exception e) {
throw new TypeException("Error setting non null for parameter #" + i + " with JdbcType " + jdbcType + " . " +
"Try setting a different JdbcType for this parameter or a different configuration property. " +
"Cause: " + e, e);
}
}
}

// 以StringTypeHandler为例
public class StringTypeHandler extends BaseTypeHandler<String> {

@Override
public void setNonNullParameter(PreparedStatement ps, int i, String parameter, JdbcType jdbcType)
throws SQLException {
ps.setString(i, parameter);//通过PreparedStatement的setString方法设置参数
}
...
}

StatementHandler的查询

由SimpleExecutor的doQuery方法可知,生成StatementHandler和Statement之后,调用了StatementHandler的query方法进行最终的查询,那么再次进入到PreparedStatementHandler,看下一下查询过程的执行:

1
2
3
4
5
6
7
8
9
10
11
12
public class PreparedStatementHandler extends BaseStatementHandler {
// 执行查询
@Override
public <E> List<E> query(Statement statement, ResultHandler resultHandler) throws SQLException {
// 转成PreparedStatement
PreparedStatement ps = (PreparedStatement) statement;
// 执行查询
ps.execute();
// 通过ResultSetHandler处理查询结果
return resultSetHandler.<E> handleResultSets(ps);
}
}

查询结果的处理

在PreparedStatementHandler的query方法中,最后通过ResultSetHandler的handleResultSets方法对statement的查询结果进行了处理。

ResultSetHandler只是一个接口,handleResultSets()方法在它的子类DefaultResultSetHandler中实现。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
public interface ResultSetHandler {
<E> List<E> handleResultSets(Statement stmt) throws SQLException;
<E> Cursor<E> handleCursorResultSets(Statement stmt) throws SQLException;
void handleOutputParameters(CallableStatement cs) throws SQLException;

}

public class DefaultResultSetHandler implements ResultSetHandler {
@Override
public List<Object> handleResultSets(Statement stmt) throws SQLException {
ErrorContext.instance().activity("handling results").object(mappedStatement.getId());

final List<Object> multipleResults = new ArrayList<Object>();

int resultSetCount = 0;
// 获取第一个结果
ResultSetWrapper rsw = getFirstResultSet(stmt);
// 从mappedStatement获取ResultMap
List<ResultMap> resultMaps = mappedStatement.getResultMaps();
int resultMapCount = resultMaps.size();
validateResultMapsCount(rsw, resultMapCount);
while (rsw != null && resultMapCount > resultSetCount) {
ResultMap resultMap = resultMaps.get(resultSetCount);
// 将结果转为ResultMap对象
handleResultSet(rsw, resultMap, multipleResults, null);
// 获取下一个结果集
rsw = getNextResultSet(stmt);
cleanUpAfterHandlingResultSet();
resultSetCount++;
}

String[] resultSets = mappedStatement.getResultSets();
if (resultSets != null) {
while (rsw != null && resultSetCount < resultSets.length) {
ResultMapping parentMapping = nextResultMaps.get(resultSets[resultSetCount]);
if (parentMapping != null) {
String nestedResultMapId = parentMapping.getNestedResultMapId();
ResultMap resultMap = configuration.getResultMap(nestedResultMapId);
handleResultSet(rsw, resultMap, null, parentMapping);
}
rsw = getNextResultSet(stmt);
cleanUpAfterHandlingResultSet();
resultSetCount++;
}
}
return collapseSingleResultList(multipleResults);
}
}

参考:

五月的仓颉:MyBatis源码分析

acm_lkl:mybatis TypeHandler详解

【RocketMQ】Broker的启动

Posted on 2021-03-20

BrokerStartup

BrokerStartup是Broker的启动类:

1.与NameServer启动时一样,会初始化相关配置参数,与NameServer不一样的地方是,Broker不仅会创建Netty Server的配置, 也会创建Netty Client的配置,因为Broker作为Client,需要向NameServer发送请求进行注册,同时Broker又需要作为Server接受Producer和Consumer的请求生产/消费消息;

2.Broker会从配置文件中获取NameServer的地址列表,后续向NameServer注册;

3.创建BrokerController,BrokerController是Broker的核心组件;

4.注册关闭时的钩子函数,在关闭前处理资源的关闭。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
public class BrokerStartup {
// 入口
public static void main(String[] args) {
start(createBrokerController(args));
}

public static BrokerController start(BrokerController controller) {
try {
// 启动
controller.start();

String tip = "The broker[" + controller.getBrokerConfig().getBrokerName() + ", "
+ controller.getBrokerAddr() + "] boot success. serializeType=" + RemotingCommand.getSerializeTypeConfigInThisServer();

if (null != controller.getBrokerConfig().getNamesrvAddr()) {
tip += " and name server is " + controller.getBrokerConfig().getNamesrvAddr();
}

log.info(tip);
System.out.printf("%s%n", tip);
return controller;
} catch (Throwable e) {
e.printStackTrace();
System.exit(-1);
}

return null;
}

// 创建BrokerController
public static BrokerController createBrokerController(String[] args) {
System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION));

if (null == System.getProperty(NettySystemConfig.COM_ROCKETMQ_REMOTING_SOCKET_SNDBUF_SIZE)) {
NettySystemConfig.socketSndbufSize = 131072;
}

if (null == System.getProperty(NettySystemConfig.COM_ROCKETMQ_REMOTING_SOCKET_RCVBUF_SIZE)) {
NettySystemConfig.socketRcvbufSize = 131072;
}

try {
// 处理命令行相关参数
Options options = ServerUtil.buildCommandlineOptions(new Options());
commandLine = ServerUtil.parseCmdLine("mqbroker", args, buildCommandlineOptions(options),
new PosixParser());
if (null == commandLine) {
System.exit(-1);
}
// 创建Broker配置
final BrokerConfig brokerConfig = new BrokerConfig();
// 创建Netty Server配置
final NettyServerConfig nettyServerConfig = new NettyServerConfig();
// 创建Netty Client配置
final NettyClientConfig nettyClientConfig = new NettyClientConfig();
// 设置Netty Cliet TLS
nettyClientConfig.setUseTLS(Boolean.parseBoolean(System.getProperty(TLS_ENABLE,
String.valueOf(TlsSystemConfig.tlsMode == TlsMode.ENFORCING))));
// 设置Netty Server默认端口
nettyServerConfig.setListenPort(10911);
// 创建MessageStoreConfig,从名字上可以看出是存储消息的一些配置
final MessageStoreConfig messageStoreConfig = new MessageStoreConfig();
// 如果是SLAVE
if (BrokerRole.SLAVE == messageStoreConfig.getBrokerRole()) {
int ratio = messageStoreConfig.getAccessMessageInMemoryMaxRatio() - 10;
messageStoreConfig.setAccessMessageInMemoryMaxRatio(ratio);
}
// 如果启动的时候命令行中带了-c参数
if (commandLine.hasOption('c')) {
String file = commandLine.getOptionValue('c');
// 读取自定义配置文件
if (file != null) {
configFile = file;
InputStream in = new BufferedInputStream(new FileInputStream(file));
properties = new Properties();
properties.load(in);

properties2SystemEnv(properties);
MixAll.properties2Object(properties, brokerConfig);
MixAll.properties2Object(properties, nettyServerConfig);
MixAll.properties2Object(properties, nettyClientConfig);
MixAll.properties2Object(properties, messageStoreConfig);

BrokerPathConfigHelper.setBrokerConfigPath(file);
in.close();
}
}
// 将命令行中的参数设置到BrokerConfig中
MixAll.properties2Object(ServerUtil.commandLine2Properties(commandLine), brokerConfig);

if (null == brokerConfig.getRocketmqHome()) {
System.out.printf("Please set the %s variable in your environment to match the location of the RocketMQ installation", MixAll.ROCKETMQ_HOME_ENV);
System.exit(-2);
}
// 获取NameServer地址
String namesrvAddr = brokerConfig.getNamesrvAddr();
if (null != namesrvAddr) {
// 解析NameServer地址
try {
String[] addrArray = namesrvAddr.split(";");
for (String addr : addrArray) {
RemotingUtil.string2SocketAddress(addr);
}
} catch (Exception e) {
System.out.printf(
"The Name Server Address[%s] illegal, please set it as follows, \"127.0.0.1:9876;192.168.0.1:9876\"%n",
namesrvAddr);
System.exit(-3);
}
}
// 判断Broker角色
switch (messageStoreConfig.getBrokerRole()) {
case ASYNC_MASTER:
case SYNC_MASTER:
brokerConfig.setBrokerId(MixAll.MASTER_ID);
break;
case SLAVE:
if (brokerConfig.getBrokerId() <= 0) {
System.out.printf("Slave's brokerId must be > 0");
System.exit(-3);
}

break;
default:
break;
}
// 是否基于delger管理主从同步和commitlog
if (messageStoreConfig.isEnableDLegerCommitLog()) {
brokerConfig.setBrokerId(-1);
}
// 设置HA监听端口
messageStoreConfig.setHaListenPort(nettyServerConfig.getListenPort() + 1);
// 日志相关
LoggerContext lc = (LoggerContext) LoggerFactory.getILoggerFactory();
JoranConfigurator configurator = new JoranConfigurator();
configurator.setContext(lc);
lc.reset();
configurator.doConfigure(brokerConfig.getRocketmqHome() + "/conf/logback_broker.xml");
// 如果启动带了-p参数
if (commandLine.hasOption('p')) {
InternalLogger console = InternalLoggerFactory.getLogger(LoggerName.BROKER_CONSOLE_NAME);
MixAll.printObjectProperties(console, brokerConfig);
MixAll.printObjectProperties(console, nettyServerConfig);
MixAll.printObjectProperties(console, nettyClientConfig);
MixAll.printObjectProperties(console, messageStoreConfig);
System.exit(0);
} else if (commandLine.hasOption('m')) {
InternalLogger console = InternalLoggerFactory.getLogger(LoggerName.BROKER_CONSOLE_NAME);
MixAll.printObjectProperties(console, brokerConfig, true);
MixAll.printObjectProperties(console, nettyServerConfig, true);
MixAll.printObjectProperties(console, nettyClientConfig, true);
MixAll.printObjectProperties(console, messageStoreConfig, true);
System.exit(0);
}
// 打印配置参数
log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
MixAll.printObjectProperties(log, brokerConfig);
MixAll.printObjectProperties(log, nettyServerConfig);
MixAll.printObjectProperties(log, nettyClientConfig);
MixAll.printObjectProperties(log, messageStoreConfig);
// 创建BrokerController
final BrokerController controller = new BrokerController(
brokerConfig,
nettyServerConfig,
nettyClientConfig,
messageStoreConfig);
// remember all configs to prevent discard
controller.getConfiguration().registerConfig(properties);
// 初始化BrokerStartup
boolean initResult = controller.initialize();
if (!initResult) {
controller.shutdown();
System.exit(-3);
}
// 注册关闭时的钩子函数
Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
private volatile boolean hasShutdown = false;
private AtomicInteger shutdownTimes = new AtomicInteger(0);

@Override
public void run() {
synchronized (this) {
log.info("Shutdown hook was invoked, {}", this.shutdownTimes.incrementAndGet());
if (!this.hasShutdown) {
this.hasShutdown = true;
long beginTime = System.currentTimeMillis();
// 调用BrokerController的shutdown方法进行关资源的关闭
controller.shutdown();
long consumingTimeTotal = System.currentTimeMillis() - beginTime;
log.info("Shutdown hook over, consuming total time(ms): {}", consumingTimeTotal);
}
}
}
}, "ShutdownHook"));

return controller;
} catch (Throwable e) {
e.printStackTrace();
System.exit(-1);
}

return null;
}

}

BrokerController

BrokerController是Broker的核心组件,包含了一些核心的功能组件和线程池,管理Broker的请求处理、后台线程和磁盘数据。

初始化

1.从磁盘上加载一些相关配置,如Topic配置、Consumer的Offset等数据;

2.创建消息管理组件DefaultMessageStore,如果开启了Dleger,初始化DLeger相关配置;

3.创建Netty Server服务器;

4.创建了一系列的线程池,用来定时调度后台任务,如处理消息拉取的任务、处理发送消息的任务;

5.处理NameServer地址列表;

6.一些其他的操作,暂时先不管。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
public boolean initialize() throws CloneNotSupportedException {
// 从磁盘上加载Topic的配置
boolean result = this.topicConfigManager.load();
// 加载Consumer的Offset
result = result && this.consumerOffsetManager.load();
// 加载订阅组
result = result && this.subscriptionGroupManager.load();
// 加载过滤器
result = result && this.consumerFilterManager.load();

if (result) {
try {
// 创建消息管理组件DefaultMessageStore
this.messageStore =
new DefaultMessageStore(this.messageStoreConfig, this.brokerStatsManager, this.messageArrivingListener,
this.brokerConfig);
// 初始化DLeger相关配置
if (messageStoreConfig.isEnableDLegerCommitLog()) {
DLedgerRoleChangeHandler roleChangeHandler = new DLedgerRoleChangeHandler(this, (DefaultMessageStore) messageStore);
((DLedgerCommitLog)((DefaultMessageStore) messageStore).getCommitLog()).getdLedgerServer().getdLedgerLeaderElector().addRoleChangeHandler(roleChangeHandler);
}
this.brokerStats = new BrokerStats((DefaultMessageStore) this.messageStore);
//load plugin
MessageStorePluginContext context = new MessageStorePluginContext(messageStoreConfig, brokerStatsManager, messageArrivingListener, brokerConfig);
this.messageStore = MessageStoreFactory.build(context, this.messageStore);
this.messageStore.getDispatcherList().addFirst(new CommitLogDispatcherCalcBitMap(this.brokerConfig, this.consumerFilterManager));
} catch (IOException e) {
result = false;
log.error("Failed to initialize", e);
}
}

result = result && this.messageStore.load();

if (result) {
// 创建Netty Server
this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.clientHousekeepingService);
NettyServerConfig fastConfig = (NettyServerConfig) this.nettyServerConfig.clone();
// 设置监听端口
fastConfig.setListenPort(nettyServerConfig.getListenPort() - 2);
this.fastRemotingServer = new NettyRemotingServer(fastConfig, this.clientHousekeepingService);
// 发送消息的线程池,处理发送过来的消息
this.sendMessageExecutor = new BrokerFixedThreadPoolExecutor(
this.brokerConfig.getSendMessageThreadPoolNums(),
this.brokerConfig.getSendMessageThreadPoolNums(),
1000 * 60,
TimeUnit.MILLISECONDS,
this.sendThreadPoolQueue,
new ThreadFactoryImpl("SendMessageThread_"));
// 拉取消息的线程池,处理消息的拉取
this.pullMessageExecutor = new BrokerFixedThreadPoolExecutor(
this.brokerConfig.getPullMessageThreadPoolNums(),
this.brokerConfig.getPullMessageThreadPoolNums(),
1000 * 60,
TimeUnit.MILLISECONDS,
this.pullThreadPoolQueue,
new ThreadFactoryImpl("PullMessageThread_"));
// 处理回复消息的线程池
this.replyMessageExecutor = new BrokerFixedThreadPoolExecutor(
this.brokerConfig.getProcessReplyMessageThreadPoolNums(),
this.brokerConfig.getProcessReplyMessageThreadPoolNums(),
1000 * 60,
TimeUnit.MILLISECONDS,
this.replyThreadPoolQueue,
new ThreadFactoryImpl("ProcessReplyMessageThread_"));
// 处理消息查询的线程池
this.queryMessageExecutor = new BrokerFixedThreadPoolExecutor(
this.brokerConfig.getQueryMessageThreadPoolNums(),
this.brokerConfig.getQueryMessageThreadPoolNums(),
1000 * 60,
TimeUnit.MILLISECONDS,
this.queryThreadPoolQueue,
new ThreadFactoryImpl("QueryMessageThread_"));

this.adminBrokerExecutor =
Executors.newFixedThreadPool(this.brokerConfig.getAdminBrokerThreadPoolNums(), new ThreadFactoryImpl(
"AdminBrokerThread_"));
// 管理客户端的线程池
this.clientManageExecutor = new ThreadPoolExecutor(
this.brokerConfig.getClientManageThreadPoolNums(),
this.brokerConfig.getClientManageThreadPoolNums(),
1000 * 60,
TimeUnit.MILLISECONDS,
this.clientManagerThreadPoolQueue,
new ThreadFactoryImpl("ClientManageThread_"));
// 处理心跳机制的线程池
this.heartbeatExecutor = new BrokerFixedThreadPoolExecutor(
this.brokerConfig.getHeartbeatThreadPoolNums(),
this.brokerConfig.getHeartbeatThreadPoolNums(),
1000 * 60,
TimeUnit.MILLISECONDS,
this.heartbeatThreadPoolQueue,
new ThreadFactoryImpl("HeartbeatThread_", true));
// 处理事务结束的线程池
this.endTransactionExecutor = new BrokerFixedThreadPoolExecutor(
this.brokerConfig.getEndTransactionThreadPoolNums(),
this.brokerConfig.getEndTransactionThreadPoolNums(),
1000 * 60,
TimeUnit.MILLISECONDS,
this.endTransactionThreadPoolQueue,
new ThreadFactoryImpl("EndTransactionThread_"));
// 管理消费者的线程池
this.consumerManageExecutor =
Executors.newFixedThreadPool(this.brokerConfig.getConsumerManageThreadPoolNums(), new ThreadFactoryImpl(
"ConsumerManageThread_"));

this.registerProcessor();
/**一些后台任务的定时调度线程池**/
final long initialDelay = UtilAll.computeNextMorningTimeMillis() - System.currentTimeMillis();
final long period = 1000 * 60 * 60 * 24;
// 处理Broker统计任务
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
BrokerController.this.getBrokerStats().record();
} catch (Throwable e) {
log.error("schedule record error.", e);
}
}
}, initialDelay, period, TimeUnit.MILLISECONDS);
// 处理消费者offset持久化到磁盘的定时任务
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
BrokerController.this.consumerOffsetManager.persist();
} catch (Throwable e) {
log.error("schedule persist consumerOffset error.", e);
}
}
}, 1000 * 10, this.brokerConfig.getFlushConsumerOffsetInterval(), TimeUnit.MILLISECONDS);
// 处理过滤器持久化到磁盘的定时任务
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
BrokerController.this.consumerFilterManager.persist();
} catch (Throwable e) {
log.error("schedule persist consumer filter error.", e);
}
}
}, 1000 * 10, 1000 * 10, TimeUnit.MILLISECONDS);
// 对broker保护的定时任务
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
BrokerController.this.protectBroker();
} catch (Throwable e) {
log.error("protectBroker error.", e);
}
}
}, 3, 3, TimeUnit.MINUTES);

this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
BrokerController.this.printWaterMark();
} catch (Throwable e) {
log.error("printWaterMark error.", e);
}
}
}, 10, 1, TimeUnit.SECONDS);
// 定时进行落后commitlog分发的任务
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

@Override
public void run() {
try {
log.info("dispatch behind commit log {} bytes", BrokerController.this.getMessageStore().dispatchBehindBytes());
} catch (Throwable e) {
log.error("schedule dispatchBehindBytes error.", e);
}
}
}, 1000 * 10, 1000 * 60, TimeUnit.MILLISECONDS);

if (this.brokerConfig.getNamesrvAddr() != null) {
// 更新NameServer地址列表
this.brokerOuterAPI.updateNameServerAddressList(this.brokerConfig.getNamesrvAddr());
log.info("Set user specified name server address: {}", this.brokerConfig.getNamesrvAddr());
} else if (this.brokerConfig.isFetchNamesrvAddrByAddressServer()) {
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

@Override
public void run() {
try {
// 拉取NameServer地址列表
BrokerController.this.brokerOuterAPI.fetchNameServerAddr();
} catch (Throwable e) {
log.error("ScheduledTask fetchNameServerAddr exception", e);
}
}
}, 1000 * 10, 1000 * 60 * 2, TimeUnit.MILLISECONDS);
}
// Dleger相关的处理代码
if (!messageStoreConfig.isEnableDLegerCommitLog()) {
if (BrokerRole.SLAVE == this.messageStoreConfig.getBrokerRole()) {
if (this.messageStoreConfig.getHaMasterAddress() != null && this.messageStoreConfig.getHaMasterAddress().length() >= 6) {
this.messageStore.updateHaMasterAddress(this.messageStoreConfig.getHaMasterAddress());
this.updateMasterHAServerAddrPeriodically = false;
} else {
this.updateMasterHAServerAddrPeriodically = true;
}
} else {
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
BrokerController.this.printMasterAndSlaveDiff();
} catch (Throwable e) {
log.error("schedule printMasterAndSlaveDiff error.", e);
}
}
}, 1000 * 10, 1000 * 60, TimeUnit.MILLISECONDS);
}
}
// 先不管
if (TlsSystemConfig.tlsMode != TlsMode.DISABLED) {
// Register a listener to reload SslContext
try {
fileWatchService = new FileWatchService(
new String[] {
TlsSystemConfig.tlsServerCertPath,
TlsSystemConfig.tlsServerKeyPath,
TlsSystemConfig.tlsServerTrustCertPath
},
new FileWatchService.Listener() {
boolean certChanged, keyChanged = false;

@Override
public void onChanged(String path) {
if (path.equals(TlsSystemConfig.tlsServerTrustCertPath)) {
log.info("The trust certificate changed, reload the ssl context");
reloadServerSslContext();
}
if (path.equals(TlsSystemConfig.tlsServerCertPath)) {
certChanged = true;
}
if (path.equals(TlsSystemConfig.tlsServerKeyPath)) {
keyChanged = true;
}
if (certChanged && keyChanged) {
log.info("The certificate and private key changed, reload the ssl context");
certChanged = keyChanged = false;
reloadServerSslContext();
}
}

private void reloadServerSslContext() {
((NettyRemotingServer) remotingServer).loadSslContext();
((NettyRemotingServer) fastRemotingServer).loadSslContext();
}
});
} catch (Exception e) {
log.warn("FileWatchService created error, can't load the certificate dynamically");
}
}
// 处理事务相关的代码
initialTransaction();
initialAcl();
initialRpcHooks();
}
return result;
}

启动

1.start方法中启动了一系列的组件,比较核心的是Netty Server服务器的启动和BrokerOuterAPI的启动。Netty启动之后就可以接收网络请求,BrokerOuterAPI处理一些核心的功能,比如通过Netty Client向外发送请求;

2.向NameServer注册;

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
public void start() throws Exception {
// 启动消息存储组件
if (this.messageStore != null) {
this.messageStore.start();
}
// 启动Netty Server服务器
if (this.remotingServer != null) {
this.remotingServer.start();
}

if (this.fastRemotingServer != null) {
this.fastRemotingServer.start();
}
// 文件相关的服务组件启动
if (this.fileWatchService != null) {
this.fileWatchService.start();
}
// 核心组件,通过Netty Client发送请求等
if (this.brokerOuterAPI != null) {
this.brokerOuterAPI.start();
}
// 先不关注
if (this.pullRequestHoldService != null) {
this.pullRequestHoldService.start();
}

if (this.clientHousekeepingService != null) {
this.clientHousekeepingService.start();
}

if (this.filterServerManager != null) {
this.filterServerManager.start();
}

if (!messageStoreConfig.isEnableDLegerCommitLog()) {
startProcessorByHa(messageStoreConfig.getBrokerRole());
handleSlaveSynchronize(messageStoreConfig.getBrokerRole());
this.registerBrokerAll(true, false, true);
}
// 定时任务向NameServer注册
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

@Override
public void run() {
try {
// 向NameServer注册
BrokerController.this.registerBrokerAll(true, false, brokerConfig.isForceRegister());
} catch (Throwable e) {
log.error("registerBrokerAll Exception", e);
}
}
}, 1000 * 10, Math.max(10000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000)), TimeUnit.MILLISECONDS);

if (this.brokerStatsManager != null) {
this.brokerStatsManager.start();
}

if (this.brokerFastFailure != null) {
this.brokerFastFailure.start();
}
}

服务注册

在注册的代码中会判断是否需要注册,如果需要注册,将会通过brokerOuterAPI核心组件发送请求到NameServer进行注册,Broker会把自己注册给所有的NameServer,然后对注册的结果进行处理。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
public synchronized void registerBrokerAll(final boolean checkOrderConfig, boolean oneway, boolean forceRegister) {
// Topic配置相关的东西
TopicConfigSerializeWrapper topicConfigWrapper = this.getTopicConfigManager().buildTopicConfigSerializeWrapper();

if (!PermName.isWriteable(this.getBrokerConfig().getBrokerPermission())
|| !PermName.isReadable(this.getBrokerConfig().getBrokerPermission())) {
ConcurrentHashMap<String, TopicConfig> topicConfigTable = new ConcurrentHashMap<String, TopicConfig>();
for (TopicConfig topicConfig : topicConfigWrapper.getTopicConfigTable().values()) {
TopicConfig tmp =
new TopicConfig(topicConfig.getTopicName(), topicConfig.getReadQueueNums(), topicConfig.getWriteQueueNums(),
this.brokerConfig.getBrokerPermission());
topicConfigTable.put(topicConfig.getTopicName(), tmp);
}
topicConfigWrapper.setTopicConfigTable(topicConfigTable);
}
// 判断是否需要注册
if (forceRegister || needRegister(this.brokerConfig.getBrokerClusterName(),
this.getBrokerAddr(),
this.brokerConfig.getBrokerName(),
this.brokerConfig.getBrokerId(),
this.brokerConfig.getRegisterBrokerTimeoutMills())) {
// 注册
doRegisterBrokerAll(checkOrderConfig, oneway, topicConfigWrapper);
}
}

private void doRegisterBrokerAll(boolean checkOrderConfig, boolean oneway,
TopicConfigSerializeWrapper topicConfigWrapper) {
// 通过brokerOuterAPI核心组件发送请求到NameServer进行注册,Broker会把自己注册给所有的NameServer
List<RegisterBrokerResult> registerBrokerResultList = this.brokerOuterAPI.registerBrokerAll(
this.brokerConfig.getBrokerClusterName(),
this.getBrokerAddr(),
this.brokerConfig.getBrokerName(),
this.brokerConfig.getBrokerId(),
this.getHAServerAddr(),
topicConfigWrapper,
this.filterServerManager.buildNewFilterServerList(),
oneway,
this.brokerConfig.getRegisterBrokerTimeoutMills(),
this.brokerConfig.isCompressedRegister());
// 对注册的结果进行处理
if (registerBrokerResultList.size() > 0) {
RegisterBrokerResult registerBrokerResult = registerBrokerResultList.get(0);
if (registerBrokerResult != null) {
if (this.updateMasterHAServerAddrPeriodically && registerBrokerResult.getHaServerAddr() != null) {
this.messageStore.updateHaMasterAddress(registerBrokerResult.getHaServerAddr());
}

this.slaveSynchronize.setMasterAddr(registerBrokerResult.getMasterAddr());

if (checkOrderConfig) {
this.getTopicConfigManager().updateOrderTopicConfig(registerBrokerResult.getKvTable());
}
}
}
}

总结

Broker的启动过程简单总结:

1.初始化相关配置参数,包括Broker的配置、Netty Client、Netty Server的配置等;

2.创建核心组件BrokerController,初始化了一系列的组件,并创建Netty Server和一系列的后台任务线程池,Netty Server用来接收网络请求,线程池用来调度各种任务;

3.启动BrokerController中的一系列的组件,并通过BrokerOuterAPI向NameServer注册;

注:图片来自于儒猿技术窝-从 0 开始带你成为消息中间件实战高手

参考

儒猿技术窝:从 0 开始带你成为消息中间件实战高手

1234…6

shan

53 posts
12 tags
© 2022 shan
Powered by Hexo
|
Theme — NexT.Muse v5.1.4