[Netty] Netty Startup Flow

Let's start with a piece of code that starts a Netty server:

// EventLoopGroup thread groups
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
    ServerBootstrap b = new ServerBootstrap();
    b.group(bossGroup, workerGroup)
        .channel(NioServerSocketChannel.class) // set the channel type
        .localAddress(new InetSocketAddress(port)) // set the port
        .childHandler(new ChannelInitializer<SocketChannel>() { // set the ChannelHandler
            @Override
            public void initChannel(SocketChannel ch) {
                ch.pipeline()
                    .addLast("codec", new HttpServerCodec());
            }
        })
        .childOption(ChannelOption.SO_KEEPALIVE, true);
    // bind the port and start the server
    ChannelFuture f = b.bind().sync();
    // block until the server channel closes; otherwise the finally block shuts the groups down right away
    f.channel().closeFuture().sync();
} finally {
    workerGroup.shutdownGracefully();
    bossGroup.shutdownGracefully();
}
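  1. bossGroup and workerGroup are created; both are of type EventLoopGroup
  2. A ServerBootstrap, the bootstrap class, is created
  3. The EventLoopGroups, the Channel type, the port, and the ChannelHandler are set on the ServerBootstrap
  4. The ServerBootstrap binds the port and starts the server

Taking ServerBootstrap's bind method as the entry point, let's walk through how a Netty server starts.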

ServerBootstrap

ServerBootstrap is a subclass of AbstractBootstrap. The bind method is implemented in AbstractBootstrap and does two main things:

  1. Create the Channel, then initialize and register it
  2. Bind the channel to the port
public abstract class AbstractBootstrap<B extends AbstractBootstrap<B, C>, C extends Channel> implements Cloneable {
    // ChannelFactory
    private volatile ChannelFactory<? extends C> channelFactory;

    public ChannelFuture bind() {
        validate();
        SocketAddress localAddress = this.localAddress;
        if (localAddress == null) {
            throw new IllegalStateException("localAddress not set");
        }
        // delegate to doBind
        return doBind(localAddress);
    }

    private ChannelFuture doBind(final SocketAddress localAddress) {
        // initialize and register the Channel
        final ChannelFuture regFuture = initAndRegister();
        // get the Channel
        final Channel channel = regFuture.channel();
        if (regFuture.cause() != null) {
            return regFuture;
        }
        if (regFuture.isDone()) {
            // registration has already completed: bind right away
            ChannelPromise promise = channel.newPromise();
            // bind the port
            doBind0(regFuture, channel, localAddress, promise);
            return promise;
        } else {
            // registration is still pending: bind from a listener once it completes
            final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
            regFuture.addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    Throwable cause = future.cause();
                    if (cause != null) {
                        promise.setFailure(cause);
                    } else {
                        promise.registered();
                        // bind the port
                        doBind0(regFuture, channel, localAddress, promise);
                    }
                }
            });
            return promise;
        }
    }
}
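
The "bind only after registration has finished" shape of doBind can be sketched with the JDK alone. This is a minimal illustration, not Netty code: DeferredBindSketch, its initAndRegister stand-in, and the string results are all hypothetical, and CompletableFuture.whenComplete is used here to cover both the already-done and still-pending branches.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class DeferredBindSketch {
    // Hypothetical stand-in for initAndRegister(): completes on the "event loop" thread.
    static CompletableFuture<String> initAndRegister(ExecutorService loop) {
        return CompletableFuture.supplyAsync(() -> "channel-1", loop);
    }

    // Same shape as doBind: the bind step runs only after registration has succeeded.
    static CompletableFuture<String> doBind(ExecutorService loop, int port) {
        CompletableFuture<String> promise = new CompletableFuture<>();
        initAndRegister(loop).whenComplete((channel, cause) -> {
            if (cause != null) {
                promise.completeExceptionally(cause);            // registration failed
            } else {
                promise.complete(channel + " bound to " + port); // stands in for doBind0
            }
        });
        return promise;
    }

    static String bindAndWait(int port) {
        ExecutorService loop = Executors.newSingleThreadExecutor();
        try {
            return doBind(loop, port).join();
        } finally {
            loop.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(bindAndWait(8080)); // prints "channel-1 bound to 8080"
    }
}
```

The caller gets a promise back immediately in both cases, which is why the startup code at the top calls sync() on the returned ChannelFuture.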

Channel Initialization and Registration

  1. Create the Channel
  2. Initialize the channel
  3. Register the Channel
final ChannelFuture initAndRegister() {
    Channel channel = null;
    try {
        // 1. create the Channel through the channelFactory
        channel = channelFactory.newChannel();
        // 2. initialize the channel
        init(channel);
    } catch (Throwable t) {
        if (channel != null) {
            channel.unsafe().closeForcibly();
            return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
        }
        return new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE).setFailure(t);
    }
    // 3. register the Channel
    ChannelFuture regFuture = config().group().register(channel);
    if (regFuture.cause() != null) {
        if (channel.isRegistered()) {
            channel.close();
        } else {
            channel.unsafe().closeForcibly();
        }
    }
    return regFuture;
}

Creating the Channel

ChannelFactory

ChannelFactory is an interface and, as the name suggests, a factory for creating channels. Debugging shows that ReflectiveChannelFactory is the implementation in use; inside it, the Channel is created through a Constructor:

public class ReflectiveChannelFactory<T extends Channel> implements ChannelFactory<T> {

    private final Constructor<? extends T> constructor;

    public ReflectiveChannelFactory(Class<? extends T> clazz) {
        ObjectUtil.checkNotNull(clazz, "clazz");
        try {
            // store the no-arg constructor
            this.constructor = clazz.getConstructor();
        } catch (NoSuchMethodException e) {
            throw new IllegalArgumentException("Class " + StringUtil.simpleClassName(clazz) +
                    " does not have a public non-arg constructor", e);
        }
    }

    @Override
    public T newChannel() {
        try {
            // create the channel through reflection
            return constructor.newInstance();
        } catch (Throwable t) {
            throw new ChannelException("Unable to create Channel from class " + constructor.getDeclaringClass(), t);
        }
    }
}

So when is this Constructor passed in? Going back to the code at the beginning, the channel type set on the ServerBootstrap is NioServerSocketChannel:

ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
    .channel(NioServerSocketChannel.class) // set the channel type

Step into the channel() method, which ServerBootstrap inherits from AbstractBootstrap: it creates a ReflectiveChannelFactory and passes in the channel type:

public abstract class AbstractBootstrap<B extends AbstractBootstrap<B, C>, C extends Channel> implements Cloneable {
    public B channel(Class<? extends C> channelClass) {
        // creates a ReflectiveChannelFactory as the channelFactory and passes in the channel type,
        // i.e. the NioServerSocketChannel set in the previous step
        return channelFactory(new ReflectiveChannelFactory<C>(
                ObjectUtil.checkNotNull(channelClass, "channelClass")
        ));
    }
}

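Summary:

1. The ServerBootstrap sets the Channel type, which decides what kind of Channel gets created.

2. The ChannelFactory (implemented by ReflectiveChannelFactory) uses the configured channel type to create a NioServerSocketChannel through reflection, which completes the creation of the Channel.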
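
The reflection technique summarized above can be tried without Netty on the classpath. The sketch below is a simplified, hypothetical ReflectiveFactorySketch (not Netty's class) that caches a public no-arg constructor and calls it for each new instance; StringBuilder merely stands in for a channel class.

```java
import java.lang.reflect.Constructor;

final class ReflectiveFactorySketch<T> {
    private final Constructor<? extends T> constructor;

    ReflectiveFactorySketch(Class<? extends T> clazz) {
        try {
            // Look the constructor up once, at configuration time.
            this.constructor = clazz.getConstructor();
        } catch (NoSuchMethodException e) {
            throw new IllegalArgumentException(
                    clazz.getSimpleName() + " has no public no-arg constructor", e);
        }
    }

    T newInstance() {
        try {
            // Every call creates a fresh object through reflection.
            return constructor.newInstance();
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException(
                    "Unable to create " + constructor.getDeclaringClass(), e);
        }
    }

    public static void main(String[] args) {
        ReflectiveFactorySketch<CharSequence> factory =
                new ReflectiveFactorySketch<>(StringBuilder.class);
        System.out.println(factory.newInstance().getClass().getName()); // java.lang.StringBuilder
    }
}
```

Resolving the constructor in the factory's own constructor means a class without a public no-arg constructor is rejected when the bootstrap is configured, not later when bind() runs.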

NioServerSocketChannel

The constructors of NioServerSocketChannel show that the underlying channel is created through a SelectorProvider:

  1. If a SelectorProvider is specified when the NioServerSocketChannel is constructed, that provider is used
  2. If no SelectorProvider is specified, one is obtained by calling SelectorProvider's provider() method
  3. The SelectorProvider's openServerSocketChannel method is called to create the channel
public class NioServerSocketChannel extends AbstractNioMessageChannel
        implements io.netty.channel.socket.ServerSocketChannel {

    private final ServerSocketChannelConfig config;

    // the default SelectorProvider
    private static final SelectorProvider DEFAULT_SELECTOR_PROVIDER = SelectorProvider.provider();

    /**
     * Constructor: creates the channel with the default SelectorProvider
     */
    public NioServerSocketChannel() {
        this(newSocket(DEFAULT_SELECTOR_PROVIDER));
    }

    /**
     * Constructor: creates the channel with the given SelectorProvider
     */
    public NioServerSocketChannel(SelectorProvider provider) {
        this(newSocket(provider));
    }

    /**
     * Creates a NioServerSocketChannel from the given channel
     */
    public NioServerSocketChannel(ServerSocketChannel channel) {
        super(null, channel, SelectionKey.OP_ACCEPT);
        config = new NioServerSocketChannelConfig(this, javaChannel().socket());
    }

    private static ServerSocketChannel newSocket(SelectorProvider provider) {
        try {
            // call openServerSocketChannel to create the JDK channel
            return provider.openServerSocketChannel();
        } catch (IOException e) {
            throw new ChannelException(
                    "Failed to open a server socket.", e);
        }
    }
}
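
The newSocket step relies only on JDK classes, so it can be reproduced on its own. In this sketch the class NewSocketSketch and the helper opensUnbound are made up for illustration, and UncheckedIOException takes the place of Netty's ChannelException.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.spi.SelectorProvider;

final class NewSocketSketch {
    // Same call as newSocket above, with a JDK exception instead of ChannelException.
    static ServerSocketChannel newSocket(SelectorProvider provider) {
        try {
            return provider.openServerSocketChannel();
        } catch (IOException e) {
            throw new UncheckedIOException("Failed to open a server socket.", e);
        }
    }

    // A freshly opened channel is open but not yet bound to any port.
    static boolean opensUnbound() {
        try (ServerSocketChannel ch = newSocket(SelectorProvider.provider())) {
            return ch.isOpen() && !ch.socket().isBound();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("open and unbound: " + opensUnbound());
    }
}
```

Creating the channel and binding it are separate steps, which is why the port binding appears much later in the startup flow.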
SelectorProvider

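SelectorProvider is an abstract class in the JDK. Its provider() method can obtain a SelectorProvider in three ways, tried in order:

  1. loadProviderFromProperty()

    Reads the JVM system properties and checks whether java.nio.channels.spi.SelectorProvider is set. If it is, the configured SelectorProvider implementation class is instantiated.

  2. loadProviderAsService()

    Looks up a configured SelectorProvider implementation through the SPI mechanism (ServiceLoader).

  3. DefaultSelectorProvider

    The default SelectorProvider; the JDK code behind it differs from one operating system to another.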

public abstract class SelectorProvider {
    // obtain the provider
    public static SelectorProvider provider() {
        synchronized (lock) {
            if (provider != null)
                return provider;
            return AccessController.doPrivileged(
                new PrivilegedAction<SelectorProvider>() {
                    public SelectorProvider run() {
                        // choose the SelectorProvider from the system property
                        if (loadProviderFromProperty())
                            return provider;
                        // choose the SelectorProvider through SPI
                        if (loadProviderAsService())
                            return provider;
                        // fall back to DefaultSelectorProvider
                        provider = sun.nio.ch.DefaultSelectorProvider.create();
                        return provider;
                    }
                });
        }
    }

    private static boolean loadProviderFromProperty() {
        // read the JVM system properties and check whether java.nio.channels.spi.SelectorProvider is set
        String cn = System.getProperty("java.nio.channels.spi.SelectorProvider");
        if (cn == null)
            return false;
        try {
            // instantiate the configured SelectorProvider class
            Class<?> c = Class.forName(cn, true,
                                       ClassLoader.getSystemClassLoader());
            provider = (SelectorProvider)c.newInstance();
            return true;
        } catch (ClassNotFoundException x) {
            // ...
            // exception handling omitted
        }
    }

    private static boolean loadProviderAsService() {
        // look up a SelectorProvider implementation through SPI (ServiceLoader)
        ServiceLoader<SelectorProvider> sl =
            ServiceLoader.load(SelectorProvider.class,
                               ClassLoader.getSystemClassLoader());
        Iterator<SelectorProvider> i = sl.iterator();
        for (;;) {
            try {
                if (!i.hasNext())
                    return false;
                provider = i.next();
                return true;
            } catch (ServiceConfigurationError sce) {
                // ...
            }
        }
    }
}
DefaultSelectorProvider

The SelectorProvider returned by DefaultSelectorProvider depends on the operating system and JDK version. On my machine, which runs macOS, the create method returns a KQueueSelectorProvider:

public class DefaultSelectorProvider {
    private DefaultSelectorProvider() {
    }

    // creates the default SelectorProvider; on macOS this is KQueueSelectorProvider
    public static SelectorProvider create() {
        return new KQueueSelectorProvider();
    }
}

Here is the create method of DefaultSelectorProvider on Linux:

public static SelectorProvider create() {
    String osname = AccessController
        .doPrivileged(new GetPropertyAction("os.name"));
    if (osname.equals("SunOS"))
        return createProvider("sun.nio.ch.DevPollSelectorProvider");
    if (osname.equals("Linux"))
// EPoll
        return createProvider("sun.nio.ch.EPollSelectorProvider");
// Poll
    return new sun.nio.ch.PollSelectorProvider();
}

Summary: different SelectorProvider types are created depending on the operating system and JDK version. For example, Linux uses EPollSelectorProvider and SunOS uses DevPollSelectorProvider.
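
To see which provider a given machine ends up with, a small JDK-only probe is enough. The class name ProviderLookupSketch and the describe helper are hypothetical; the printed class name varies with the operating system and JDK version, so no fixed output is shown.

```java
import java.nio.channels.spi.SelectorProvider;

final class ProviderLookupSketch {
    // Reports the system property consulted first and the provider actually chosen.
    static String describe() {
        String configured = System.getProperty("java.nio.channels.spi.SelectorProvider");
        SelectorProvider provider = SelectorProvider.provider();
        return "property=" + configured + ", provider=" + provider.getClass().getName();
    }

    public static void main(String[] args) {
        System.out.println(describe());
        // provider() caches its result, so repeated calls return the same instance.
        System.out.println(SelectorProvider.provider() == SelectorProvider.provider());
    }
}
```

Running it with -Djava.nio.channels.spi.SelectorProvider set to an implementation class name exercises the first of the three lookup paths.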

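Initializing the Channel

Back to the init call in initAndRegister, which initializes the Channel. The init method is implemented in ServerBootstrap; the initialization mainly sets options and attributes and adds handlers to the ChannelPipeline: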

public class ServerBootstrap extends AbstractBootstrap<ServerBootstrap, ServerChannel> {
    @Override
    void init(Channel channel) {
        // set options and attributes
        setChannelOptions(channel, options0().entrySet().toArray(newOptionArray(0)), logger);
        setAttributes(channel, attrs0().entrySet().toArray(newAttrArray(0)));
        // get the channel's ChannelPipeline
        ChannelPipeline p = channel.pipeline();

        final EventLoopGroup currentChildGroup = childGroup;
        final ChannelHandler currentChildHandler = childHandler;
        final Entry<ChannelOption<?>, Object>[] currentChildOptions =
                childOptions.entrySet().toArray(newOptionArray(0));
        final Entry<AttributeKey<?>, Object>[] currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(0));
        // add the handlers
        p.addLast(new ChannelInitializer<Channel>() {
            @Override
            public void initChannel(final Channel ch) {
                final ChannelPipeline pipeline = ch.pipeline();
                ChannelHandler handler = config.handler();
                if (handler != null) {
                    pipeline.addLast(handler);
                }
                // the acceptor is added from a task on the channel's EventLoop
                ch.eventLoop().execute(new Runnable() {
                    @Override
                    public void run() {
                        pipeline.addLast(new ServerBootstrapAcceptor(
                                ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
                    }
                });
            }
        });
    }
}

Registering the Channel

Back in AbstractBootstrap's initAndRegister method, the channel is registered by calling the EventLoopGroup's register method, which picks one EventLoop from the EventLoopGroup and binds the channel to it:

// AbstractBootstrap
public abstract class AbstractBootstrap<B extends AbstractBootstrap<B, C>, C extends Channel> implements Cloneable {
    public abstract AbstractBootstrapConfig<B, C> config();

    final ChannelFuture initAndRegister() {
        // ...
        // AbstractBootstrapConfig's group method returns the EventLoopGroup,
        // whose register method is then called to register the channel
        ChannelFuture regFuture = config().group().register(channel);
        // code omitted
    }
}

// AbstractBootstrapConfig
public abstract class AbstractBootstrapConfig<B extends AbstractBootstrap<B, C>, C extends Channel> {
    // returns the EventLoopGroup set on the bootstrap
    public final EventLoopGroup group() {
        return bootstrap.group();
    }
}

The call eventually reaches AbstractChannel's register method, which performs the registration. The intermediate hops are as follows:

// 1. MultithreadEventLoopGroup implements the register method
public abstract class MultithreadEventLoopGroup extends MultithreadEventExecutorGroup implements EventLoopGroup {
    @Override
    public ChannelFuture register(Channel channel) {
        // next() picks an EventLoop; the call continues into SingleThreadEventLoop's register method
        return next().register(channel);
    }
}

// 2. SingleThreadEventLoop
public abstract class SingleThreadEventLoop extends SingleThreadEventExecutor implements EventLoop {

    @Override
    public ChannelFuture register(Channel channel) {
        // create a DefaultChannelPromise tied to the current Channel
        return register(new DefaultChannelPromise(channel, this));
    }

    @Override
    public ChannelFuture register(final ChannelPromise promise) {
        ObjectUtil.checkNotNull(promise, "promise");
        // Call unsafe() on the current channel. For NioServerSocketChannel the unsafe is an inner class of
        // AbstractNioMessageChannel that inherits register from AbstractChannel.AbstractUnsafe,
        // so the call lands in the register method shown under AbstractChannel
        promise.channel().unsafe().register(this, promise);
        return promise;
    }
}
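
The next().register(channel) hop can be pictured with a toy group. The text only says that one EventLoop is selected from the group; the round-robin policy below is an assumption made for this sketch, and RoundRobinGroupSketch with its string "loops" is entirely hypothetical.

```java
import java.util.concurrent.atomic.AtomicInteger;

final class RoundRobinGroupSketch {
    private final String[] loops;                         // stand-ins for EventLoop instances
    private final AtomicInteger idx = new AtomicInteger();

    RoundRobinGroupSketch(int nThreads) {
        loops = new String[nThreads];
        for (int i = 0; i < nThreads; i++) {
            loops[i] = "loop-" + i;
        }
    }

    // Assumed selection policy: hand out loops in rotation.
    String next() {
        return loops[Math.floorMod(idx.getAndIncrement(), loops.length)];
    }

    // Mirrors next().register(channel): the chosen loop owns the channel from here on.
    String register(String channel) {
        return channel + " -> " + next();
    }

    public static void main(String[] args) {
        RoundRobinGroupSketch group = new RoundRobinGroupSketch(2);
        System.out.println(group.register("a")); // a -> loop-0
        System.out.println(group.register("b")); // b -> loop-1
        System.out.println(group.register("c")); // c -> loop-0
    }
}
```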
AbstractChannel

The register method in AbstractChannel (defined in its inner class AbstractUnsafe):

public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {
    // the two methods below live in the inner class AbstractUnsafe; the wrapper is omitted here
    @Override
    public final void register(EventLoop eventLoop, final ChannelPromise promise) {
        if (eventLoop == null) {
            throw new NullPointerException("eventLoop");
        }
        // check whether the channel has already been registered
        if (isRegistered()) {
            promise.setFailure(new IllegalStateException("registered to an event loop already"));
            return;
        }
        if (!isCompatible(eventLoop)) {
            promise.setFailure(
                    new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
            return;
        }
        // set the channel's eventLoop, i.e. bind the channel to the EventLoop
        AbstractChannel.this.eventLoop = eventLoop;
        // is the current thread this EventLoop's own thread?
        if (eventLoop.inEventLoop()) {
            // register the channel
            register0(promise);
        } else {
            try {
                // submit the registration as a task to the EventLoop
                eventLoop.execute(new Runnable() {
                    @Override
                    public void run() {
                        // register the channel
                        register0(promise);
                    }
                });
            } catch (Throwable t) {
                // omitted...
            }
        }
    }

    private void register0(ChannelPromise promise) {
        try {
            if (!promise.setUncancellable() || !ensureOpen(promise)) {
                return;
            }
            boolean firstRegistration = neverRegistered;
            // perform the registration; implemented in AbstractNioChannel
            doRegister();
            neverRegistered = false;
            registered = true;
            // trigger handlerAdded callbacks on the pipeline
            pipeline.invokeHandlerAddedIfNeeded();
            safeSetSuccess(promise);
            pipeline.fireChannelRegistered();
            if (isActive()) {
                if (firstRegistration) {
                    pipeline.fireChannelActive();
                } else if (config().isAutoRead()) {
                    beginRead();
                }
            }
        } catch (Throwable t) {
            // Close the channel directly to avoid FD leak.
            closeForcibly();
            closeFuture.setClosed();
            safeSetFailure(promise, t);
        }
    }
}
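
The inEventLoop() branch in register is a general pattern: run the work inline when already on the loop's thread, otherwise queue it for that thread. A minimal JDK-only sketch follows; SingleThreadLoopSketch and its members are hypothetical names, not Netty types.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class SingleThreadLoopSketch {
    private volatile Thread loopThread;
    private final ExecutorService executor = Executors.newSingleThreadExecutor(r -> {
        Thread t = new Thread(r, "loop-thread");
        t.setDaemon(true);
        loopThread = t;   // remember which thread is "the event loop"
        return t;
    });

    boolean inEventLoop() {
        return Thread.currentThread() == loopThread;
    }

    // Same branching as register(): inline on the loop thread, queued otherwise.
    CompletableFuture<String> register(String channel) {
        CompletableFuture<String> promise = new CompletableFuture<>();
        Runnable register0 = () ->
                promise.complete(channel + " registered on " + Thread.currentThread().getName());
        if (inEventLoop()) {
            register0.run();
        } else {
            executor.execute(register0);
        }
        return promise;
    }

    void shutdown() {
        executor.shutdown();
    }

    public static void main(String[] args) {
        SingleThreadLoopSketch loop = new SingleThreadLoopSketch();
        System.out.println(loop.register("ch").join()); // ch registered on loop-thread
        loop.shutdown();
    }
}
```

During startup the caller is the main thread, so registration takes the queued branch; this is why doBind has to handle a regFuture that is not done yet.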
AbstractNioChannel

doRegister is implemented in AbstractNioChannel:

  1. this.javaChannel() returns a ServerSocketChannel whose concrete type is sun.nio.ch.ServerSocketChannelImpl, a JDK class
  2. this.eventLoop().unwrappedSelector() returns the Selector held by the EventLoop
  3. A JDK method is called to register the current channel with the EventLoop's Selector
public abstract class AbstractNioChannel extends AbstractChannel {
    protected void doRegister() throws Exception {
        boolean selected = false;
        while (true) {
            try {
                // Register the channel with the Selector: javaChannel() returns the JDK ServerSocketChannel,
                // this.eventLoop().unwrappedSelector() returns the selector. The interest set is 0 for now,
                // and the Netty channel itself is passed as the attachment
                this.selectionKey = this.javaChannel().register(this.eventLoop().unwrappedSelector(), 0, this);
                return;
            } catch (CancelledKeyException var3) {
                if (selected) {
                    throw var3;
                }
                this.eventLoop().selectNow();
                selected = true;
            }
        }
    }
}
// NioServerSocketChannel
public class NioServerSocketChannel extends AbstractNioMessageChannel
        implements io.netty.channel.socket.ServerSocketChannel {

    @Override
    protected ServerSocketChannel javaChannel() {
        // returns the JDK ServerSocketChannel (concrete type sun.nio.ch.ServerSocketChannelImpl),
        // whose register method puts the channel on the Selector
        return (ServerSocketChannel) super.javaChannel();
    }
}
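
The JDK call behind doRegister can be run in isolation. In this sketch RegisterSketch and the attachment string are placeholders (Netty attaches the channel object itself); the point is that registering with an interest set of 0 yields a valid key that does not yet listen for anything.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;

final class RegisterSketch {
    static boolean registersWithNoInterest() {
        try (Selector selector = Selector.open();
             ServerSocketChannel ch = ServerSocketChannel.open()) {
            ch.configureBlocking(false);              // required before register()
            Object attachment = "netty-channel";      // placeholder for the Netty channel
            SelectionKey key = ch.register(selector, 0, attachment);
            // Valid key, empty interest set, attachment retrievable later from the key.
            return key.isValid()
                    && key.interestOps() == 0
                    && key.attachment() == attachment;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("registered with ops=0: " + registersWithNoInterest());
    }
}
```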

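Port Binding

Back to AbstractBootstrap's doBind0 method, which performs the port binding. Following the breakpoints, execution eventually reaches NioServerSocketChannel's doBind method, which once again calls into the JDK to bind the port.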

public abstract class AbstractBootstrap<B extends AbstractBootstrap<B, C>, C extends Channel> implements Cloneable {
    // doBind calls doBind0 to bind the port
    private static void doBind0(
            final ChannelFuture regFuture, final Channel channel,
            final SocketAddress localAddress, final ChannelPromise promise) {
        // executed asynchronously on the channel's EventLoop
        channel.eventLoop().execute(new Runnable() {
            @Override
            public void run() {
                // did the asynchronous registration of the channel with the selector succeed?
                if (regFuture.isSuccess()) {
                    // call the channel's bind method, which enters AbstractChannel's bind
                    channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
                } else {
                    promise.setFailure(regFuture.cause());
                }
            }
        });
    }
}

// io.netty.channel.AbstractChannel
public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {
    @Override
    public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
        // delegate to the pipeline's bind method
        return pipeline.bind(localAddress, promise);
    }
}
// io.netty.channel.DefaultChannelPipeline
public class DefaultChannelPipeline implements ChannelPipeline {
    @Override
    public final ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
        // starts at the tail and enters AbstractChannelHandlerContext's bind method
        return tail.bind(localAddress, promise);
    }
}
// io.netty.channel.AbstractChannelHandlerContext
abstract class AbstractChannelHandlerContext implements ChannelHandlerContext, ResourceLeakHint {
    @Override
    public ChannelFuture bind(final SocketAddress localAddress, final ChannelPromise promise) {
        // null check
        if (localAddress == null) {
            throw new NullPointerException("localAddress");
        }
        if (isNotValidPromise(promise, false)) {
            return promise;
        }
        // find the next outbound context that handles bind
        final AbstractChannelHandlerContext next = findContextOutbound(MASK_BIND);
        EventExecutor executor = next.executor();
        // is the current thread the EventLoop's thread?
        if (executor.inEventLoop()) {
            // bind the port
            next.invokeBind(localAddress, promise);
        } else {
            // handle it asynchronously on the EventLoop
            safeExecute(executor, new Runnable() {
                @Override
                public void run() {
                    // bind the port
                    next.invokeBind(localAddress, promise);
                }
            }, promise, null);
        }
        return promise;
    }

    // bind the port
    private void invokeBind(SocketAddress localAddress, ChannelPromise promise) {
        if (invokeHandler()) {
            try {
                // enters the bind method of HeadContext in DefaultChannelPipeline
                ((ChannelOutboundHandler) handler()).bind(this, localAddress, promise);
            } catch (Throwable t) {
                notifyOutboundHandlerException(t, promise);
            }
        } else {
            bind(localAddress, promise);
        }
    }
}
// io.netty.channel.DefaultChannelPipeline
public class DefaultChannelPipeline implements ChannelPipeline {
    // this bind belongs to the inner class HeadContext; the wrapper is omitted here
    @Override
    public void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) {
        // the unsafe is created by AbstractNioMessageChannel; its bind method is implemented in AbstractChannel
        unsafe.bind(localAddress, promise);
    }
}
// io.netty.channel.AbstractChannel
public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {
    // this bind lives in the inner class AbstractUnsafe; the wrapper is omitted here
    @Override
    public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
        assertEventLoop();

        if (!promise.setUncancellable() || !ensureOpen(promise)) {
            return;
        }
        // some code omitted
        boolean wasActive = isActive();
        try {
            // bind the port; enters NioServerSocketChannel
            doBind(localAddress);
        } catch (Throwable t) {
            safeSetFailure(promise, t);
            closeIfClosed();
            return;
        }
        // once binding completes, the Channel becomes active
        if (!wasActive && isActive()) {
            invokeLater(new Runnable() {
                @Override
                public void run() {
                    // fire the channelActive event
                    pipeline.fireChannelActive();
                }
            });
        }
        safeSetSuccess(promise);
    }
}
// NioServerSocketChannel
public class NioServerSocketChannel extends AbstractNioMessageChannel
        implements io.netty.channel.socket.ServerSocketChannel {
    @Override
    protected void doBind(SocketAddress localAddress) throws Exception {
        if (PlatformDependent.javaVersion() >= 7) {
            // call the JDK method to bind the port
            javaChannel().bind(localAddress, config.getBacklog());
        } else {
            javaChannel().socket().bind(localAddress, config.getBacklog());
        }
    }
}
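
The chain above boils down to an outbound operation travelling from the tail of the pipeline toward the head, where the real JDK bind happens. The sketch below is heavily simplified and hypothetical: OutboundBindSketch, the Ctx node, and the "codec" entry are illustrative names, every node is visited (Netty skips contexts that do not handle bind), and the port is an ephemeral loopback port instead of a configured one.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.channels.ServerSocketChannel;
import java.util.ArrayList;
import java.util.List;

final class OutboundBindSketch {
    // Hypothetical context node; outbound operations follow the prev links.
    static final class Ctx {
        final String name;
        Ctx prev;
        Ctx(String name) { this.name = name; }
    }

    static List<String> bindThroughPipeline(ServerSocketChannel ch, int backlog) throws IOException {
        Ctx head = new Ctx("head");
        Ctx codec = new Ctx("codec");
        Ctx tail = new Ctx("tail");
        codec.prev = head;
        tail.prev = codec;

        List<String> visited = new ArrayList<>();
        for (Ctx ctx = tail; ctx != null; ctx = ctx.prev) {
            visited.add(ctx.name);
            if (ctx == head) {
                // Only the head touches the JDK channel, like unsafe.bind -> doBind.
                ch.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0), backlog);
            }
        }
        return visited;
    }

    static String run() {
        try (ServerSocketChannel ch = ServerSocketChannel.open()) {
            List<String> visited = bindThroughPipeline(ch, 128);
            return visited + " bound=" + ch.socket().isBound();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(run()); // [tail, codec, head] bound=true
    }
}
```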

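Listening for Accept Events

In AbstractChannel's bind method, once the port is bound the Channel's active state is checked and fireChannelActive is called to fire the channel-active event. Following the breakpoints, execution eventually reaches AbstractNioChannel's doBeginRead method: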

// once binding completes, the Channel becomes active
if (!wasActive && isActive()) {
    invokeLater(new Runnable() {
        @Override
        public void run() {
            // fire the channelActive event
            pipeline.fireChannelActive();
        }
    });
}

NioServerSocketChannel's constructor initializes readInterestOp to SelectionKey.OP_ACCEPT, so doBeginRead adds OP_ACCEPT to the interest set of the Channel's SelectionKey.

// NioServerSocketChannel
public class NioServerSocketChannel extends AbstractNioMessageChannel
        implements io.netty.channel.socket.ServerSocketChannel {
    /**
     * Creates a NioServerSocketChannel from the given channel
     */
    public NioServerSocketChannel(ServerSocketChannel channel) {
        // sets readInterestOp to SelectionKey.OP_ACCEPT
        super(null, channel, SelectionKey.OP_ACCEPT);
        config = new NioServerSocketChannelConfig(this, javaChannel().socket());
    }
}

public abstract class AbstractNioChannel extends AbstractChannel {
    protected final int readInterestOp;

    @Override
    protected void doBeginRead() throws Exception {
        // Channel.read() or ChannelHandlerContext.read() was called
        final SelectionKey selectionKey = this.selectionKey;
        if (!selectionKey.isValid()) {
            return;
        }
        readPending = true;
        final int interestOps = selectionKey.interestOps();
        if ((interestOps & readInterestOp) == 0) {
            // add OP_ACCEPT to the key's interest set
            selectionKey.interestOps(interestOps | readInterestOp);
        }
    }
}
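
The bit manipulation in doBeginRead can be checked against a real SelectionKey. This sketch reuses the same logic on a JDK channel that was registered with an interest set of 0; BeginReadSketch and its method names are hypothetical.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;

final class BeginReadSketch {
    // Same bit logic as doBeginRead: add the read interest op only if it is missing.
    static void beginRead(SelectionKey key, int readInterestOp) {
        if (!key.isValid()) {
            return;
        }
        int interestOps = key.interestOps();
        if ((interestOps & readInterestOp) == 0) {
            key.interestOps(interestOps | readInterestOp);
        }
    }

    static int interestOpsAfterBeginRead() {
        try (Selector selector = Selector.open();
             ServerSocketChannel ch = ServerSocketChannel.open()) {
            ch.configureBlocking(false);
            SelectionKey key = ch.register(selector, 0);   // as in doRegister: nothing selected yet
            beginRead(key, SelectionKey.OP_ACCEPT);        // now the selector reports new connections
            return key.interestOps();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(interestOpsAfterBeginRead() == SelectionKey.OP_ACCEPT); // true
    }
}
```

Registration and interest are deliberately split: the key exists from doRegister onward, but accept events are only requested after the port is bound and the channel has become active.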

Netty version: 4.1.42.Final
