【Netty】Pipeline

Pipeline的初始化

ChannelPipeline是一个双向链表,在Channel构建的过程中,创建了ChannelPipeline,初始化了头结点head和尾节点tail:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {
final AbstractChannelHandlerContext head;
final AbstractChannelHandlerContext tail;

protected AbstractChannel(Channel parent) {
this.parent = parent;
id = newId();
unsafe = newUnsafe();
// 创建pipeline
pipeline = newChannelPipeline();
}

protected DefaultChannelPipeline newChannelPipeline() {
return new DefaultChannelPipeline(this);
}
}

// DefaultChannelPipeline
public class DefaultChannelPipeline implements ChannelPipeline {
protected DefaultChannelPipeline(Channel channel) {
this.channel = ObjectUtil.checkNotNull(channel, "channel");
succeededFuture = new SucceededChannelFuture(channel, null);
voidPromise = new VoidChannelPromise(channel, true);
// 尾节点
tail = new TailContext(this);
// 头结点
head = new HeadContext(this);

head.next = tail;
tail.prev = head;
}
}
HeadContext

HeadContext是头结点,它既实现了ChannelOutboundHandler又实现了ChannelInboundHandler,它比TailContext多了一个Unsafe类型的变量:

1
2
3
4
5
6
7
8
9
10
11
// HeadContext
final class HeadContext extends AbstractChannelHandlerContext
implements ChannelOutboundHandler, ChannelInboundHandler {
// 比tailContext多了一个Unsafe类型的变量
private final Unsafe unsafe;
HeadContext(DefaultChannelPipeline pipeline) {
super(pipeline, null, HEAD_NAME, HeadContext.class);
unsafe = pipeline.channel().unsafe();
setAddComplete();
}
}

HeadContext继承关系:

TailContext

TailContext是尾节点,它实现了ChannelInboundHandler,InboundHandler是入站处理器:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
    // TailContext
final class TailContext extends AbstractChannelHandlerContext implements ChannelInboundHandler {
TailContext(DefaultChannelPipeline pipeline) {
// 构造函数初始化
super(pipeline, null, TAIL_NAME, TailContext.class);
setAddComplete();
}
}

abstract class AbstractChannelHandlerContext implements ChannelHandlerContext, ResourceLeakHint {
AbstractChannelHandlerContext(DefaultChannelPipeline pipeline, EventExecutor executor,
String name, Class<? extends ChannelHandler> handlerClass) {
this.name = ObjectUtil.checkNotNull(name, "name");
this.pipeline = pipeline;
this.executor = executor;
// 一个掩码,代表需要那些方法需要执行,后面会说到
this.executionMask = mask(handlerClass);.
ordered = executor == null || executor instanceof OrderedEventExecutor;
}
}

TailContext继承关系:

Pipeline添加ChannelHandler

ChannelHandler分为入站ChannelInboundHandler 和出站ChannelOutboundHandler 两种处理器,看下他们的继承关系:

ChannelInboundHandler

ChannelOutboundHandler

ChannelInboundHandler和ChannelOutboundHandler都是ChannelHandler的子类。

ChannelInboundHandler是入站处理器,如果以服务端为角度,那么入站指的就是从客户端发送过来数据(数据流向从客户端到服务端),触发从头结点HeadContext开始事件传播,一直到尾节点TailContext结束。

ChannelOutboundHandler是出站处理器,与ChannelInboundHandler相反,同样以服务器为角度,数据流向从服务端到客户端就是出站,它会从TailContext开始,一直到HeadContext结束。

向Pipeline双向链表中添加ChannelHandler处理器

在创建ServerBootstrap的时候,有一步是为ServerBootstrap设置ChannelHandler,通过addLast方法向Pipeline中添加Handler:

1
2
3
4
5
6
7
8
9
10
11
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class) // 设置channel类型
.localAddress(new InetSocketAddress(port)) // 设置端口
.childHandler(new ChannelInitializer<SocketChannel>() { // 设置channelHandler
@Override
public void initChannel(SocketChannel ch) {
ch.pipeline()
.addLast("handler", new HttpServerHandler());// 自定义Handler
}
})

DefaultChannelPipeline中实现了addLast方法:

  1. 检查是否重复添加Handler
  2. 创建Pipeline节点
  3. 将节点加入到Pipeline双向链表中
  4. 回调handlerAdded方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public class DefaultChannelPipeline implements ChannelPipeline {
@Override
public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
final AbstractChannelHandlerContext newCtx;
// 加锁
synchronized (this) {
// 检查是否重复添加
checkMultiplicity(handler);
// 创建Pipeline节点,是DefaultChannelHandlerContext类型的对象
newCtx = newContext(group, filterName(name, handler), handler);
// 添加到Pipline中
addLast0(newCtx);
if (!registered) {
newCtx.setAddPending();
callHandlerCallbackLater(newCtx, true);
return this;
}

EventExecutor executor = newCtx.executor();
if (!executor.inEventLoop()) {
callHandlerAddedInEventLoop(newCtx, executor);
return this;
}
}
// 回调 handlerAdded
callHandlerAdded0(newCtx);
return this;
}
}

检查重复添加

检查重复添加的时候会判断当前的Handler是否是非共享(共享指的是可以添加到多个ChannelPipeline中)的,并且added状态为已添加,如果满足这个条件将抛出异常,防止重复添加。

如果未添加过,将added状态置为true表示已添加,addLast方法是通过synchronized加锁的,由此也看出加锁的必要性,如果不加锁,多线程情况下很可能导致重复添加Handler。

1
2
3
4
5
6
7
8
9
10
11
12
13
private static void checkMultiplicity(ChannelHandler handler) {
if (handler instanceof ChannelHandlerAdapter) {
ChannelHandlerAdapter h = (ChannelHandlerAdapter) handler;
// 是否是非共享的handler并且已添加过
if (!h.isSharable() && h.added) {
throw new ChannelPipelineException(
h.getClass().getName() +
" is not a @Sharable handler, so can't be added or removed multiple times.");
}
// 设置为已添加
h.added = true;
}
}

创建节点

创建节点之前首先会过滤handler的名称,如果名称为空,就新创建一个,如果不为空检查是否冲突,接下来开始创建节点:

1
2
3
4
// 过滤名称
name = filterName(name, handler);
// 创建节点
newCtx = newContext(group, name, handler);
过滤名称
  1. 如果名称为空,就生成一个,生成规则是简单类名+ #0 ,如果名称冲突,就将#后面的0改成1,然后数字递增直到名称不重复,如 HeadContext 的默认名称为 “DefaultChannelPipeline$HeadContext#0”
  2. 如果名称不为空,校验名称是否重复,如果重复将抛出异常
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
private String filterName(String name, ChannelHandler handler) {
if (name == null) {
// 如果名称为空,为handler生成一个名称
return generateName(handler);
}
// 校验名称是否重复
checkDuplicateName(name);
return name;
}
// 生成名称
private String generateName(ChannelHandler handler) {
Map<Class<?>, String> cache = nameCaches.get();
// 获取handler的Class
Class<?> handlerType = handler.getClass();
// 从缓存中获取
String name = cache.get(handlerType);
// 如果缓存中没有,就生成一个,生成规则是简单类名+#0
if (name == null) {
name = generateName0(handlerType);
// 加到缓存中
cache.put(handlerType, name);
}
// 如果有重复的
if (context0(name) != null) {
// 去掉#后面的数字
String baseName = name.substring(0, name.length() - 1); // Strip the trailing '0'.
// 从1开始递增,直到没有重复的
for (int i = 1;; i ++) {
String newName = baseName + i;
if (context0(newName) == null) {
name = newName;
break;
}
}
}
return name;
}

private static String generateName0(Class<?> handlerType) {
// 通过简单类名+#0
return StringUtil.simpleClassName(handlerType) + "#0";
}

private void checkDuplicateName(String name) {
// 如果名称有相同的抛出异常
if (context0(name) != null) {
throw new IllegalArgumentException("Duplicate handler name: " + name);
}
}

private AbstractChannelHandlerContext context0(String name) {
AbstractChannelHandlerContext context = head.next;
// 遍历链表的所有节点,判断名称是否有相同的
while (context != tail) {
// 如果名称相同
if (context.name().equals(name)) {
return context;
}
context = context.next;
}
return null;
}
生成节点

节点的类型是DefaultChannelHandlerContext

1
2
3
private AbstractChannelHandlerContext newContext(EventExecutorGroup group, String name, ChannelHandler handler) {
return new DefaultChannelHandlerContext(this, childExecutor(group), name, handler);
}
DefaultChannelHandlerContext

DefaultChannelHanlerContext将自定义的Handler封装起来,作为链表中的节点。

DefaultChannelHanlerContext是ChannelHandlerContext 的子类,ChannelHandlerContext用于保存ChannelHandler上下文,在Pipeline中传递,它包含了 ChannelHandler 生命周期的所有事件。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
final class DefaultChannelHandlerContext extends AbstractChannelHandlerContext {

private final ChannelHandler handler;

DefaultChannelHandlerContext(
DefaultChannelPipeline pipeline, EventExecutor executor, String name, ChannelHandler handler) {
// 调用父类构造函数
super(pipeline, executor, name, handler.getClass());
this.handler = handler;
}
}

abstract class AbstractChannelHandlerContext implements ChannelHandlerContext, ResourceLeakHint {
AbstractChannelHandlerContext(DefaultChannelPipeline pipeline, EventExecutor executor,
String name, Class<? extends ChannelHandler> handlerClass) {
this.name = ObjectUtil.checkNotNull(name, "name");
// 设置pipeline
this.pipeline = pipeline;
this.executor = executor;
// 设置掩码
this.executionMask = mask(handlerClass);
ordered = executor == null || executor instanceof OrderedEventExecutor;
}
}

DefaultChannelHandlerContext继承关系:

将节点加入pipeline

addLast0是尾插法,将生成的节点加入到了链表中尾节点之前,因为tail节点要永远指向尾节点,新节点只能加在尾节点之前:

1
2
3
4
5
6
7
8
9
10
11
12
private void addLast0(AbstractChannelHandlerContext newCtx) {
// 记录尾结点的前一个节点
AbstractChannelHandlerContext prev = tail.prev;
// 当前节点的前一个节点设置为原本尾结点的前一个节点
newCtx.prev = prev;
// 当前节点的下一个节点设置为尾节点
newCtx.next = tail;
// 原本尾结点前的那个节点的下一个节点设置为当前节点
prev.next = newCtx;
// 尾节点的前一个节点设置为当前节点
tail.prev = newCtx;
}
回调handlerAdded

在Handler完成添加之后,会触发Handler的handlerAdded事件:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public class DefaultChannelPipeline implements ChannelPipeline {
private void callHandlerAdded0(final AbstractChannelHandlerContext ctx) {
try {
// 调用handler的HandlerAdded方法
ctx.callHandlerAdded();
} catch (Throwable t) {
boolean removed = false;
try {
atomicRemoveFromHandlerList(ctx);
ctx.callHandlerRemoved();
removed = true;
} catch (Throwable t2) {
if (logger.isWarnEnabled()) {
logger.warn("Failed to remove a handler: " + ctx.name(), t2);
}
}

// 省略代码
}
}
}
// AbstractChannelHandlerContext
abstract class AbstractChannelHandlerContext implements ChannelHandlerContext, ResourceLeakHint {
final void callHandlerAdded() throws Exception {
if (setAddComplete()) {
// 触发handler的handlerAdded事件
handler().handlerAdded(this);
}
}
}

Pipeline事件传播

ChannelPipeline分为入站ChannelInboundHandler 和出站ChannelOutboundHandler 两种处理器。

入站(InBound):事件传播方向为head - > tail

出站(Outbount):事件传播方向为tail - > head

Inbound 事件传播

在NioEventLoop处理任务的过程中,如果有就绪的I/O事件,判断事件类型,如果是读事件或者有连接请求,调用NioUnsafe的read方法进行处理:

1
2
3
4
// 如果是读事件或者有连接请求
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
unsafe.read();
}

NioMessageUnsafe

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
public abstract class AbstractNioMessageChannel extends AbstractNioChannel {
private final class NioMessageUnsafe extends AbstractNioUnsafe {

private final List<Object> readBuf = new ArrayList<Object>();

@Override
public void read() {
assert eventLoop().inEventLoop();
final ChannelConfig config = config();
// 获取pipeline
final ChannelPipeline pipeline = pipeline();
final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
allocHandle.reset(config);

boolean closed = false;
Throwable exception = null;
try {
try {
do {
int localRead = doReadMessages(readBuf);
if (localRead == 0) {
break;
}
if (localRead < 0) {
closed = true;
break;
}

allocHandle.incMessagesRead(localRead);
} while (allocHandle.continueReading());
} catch (Throwable t) {
exception = t;
}

int size = readBuf.size();
for (int i = 0; i < size; i ++) {
readPending = false;
// ChannelRead事件传播
pipeline.fireChannelRead(readBuf.get(i));
}
readBuf.clear();
allocHandle.readComplete();
pipeline.fireChannelReadComplete();

// 省略了代码
} finally {
if (!readPending && !config.isAutoRead()) {
removeReadOp();
}
}
}
}
}
// DefaultChannelPipeline
public class DefaultChannelPipeline implements ChannelPipeline {
@Override
public final ChannelPipeline fireChannelRead(Object msg) {
// 触发ChannelRead事件
AbstractChannelHandlerContext.invokeChannelRead(head, msg);
return this;
}
}
// AbstractChannelHandlerContext
abstract class AbstractChannelHandlerContext implements ChannelHandlerContext, ResourceLeakHint {

static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
EventExecutor executor = next.executor();
// 是否是EventLoop绑定线程
if (executor.inEventLoop()) {
// 触发ChannelRead事件
next.invokeChannelRead(m);
} else {
executor.execute(new Runnable() {
@Override
public void run() {
next.invokeChannelRead(m);
}
});
}
}

private void invokeChannelRead(Object msg) {
if (invokeHandler()) {
try {
// 调用handler的channelRead方法
((ChannelInboundHandler) handler()).channelRead(this, msg);
} catch (Throwable t) {
notifyHandlerException(t);
}
} else {
// 向下传播ChannelRead事件
fireChannelRead(msg);
}
}

@Override
public ChannelHandlerContext fireChannelRead(final Object msg) {
// 找到下一个节点继续执行它的invokeChannelRead,向下传播ChannelRead事件
invokeChannelRead(findContextInbound(MASK_CHANNEL_READ), msg);
return this;
}

}
// DefaultChannelPipeline
public class DefaultChannelPipeline implements ChannelPipeline {
final class HeadContext extends AbstractChannelHandlerContext
implements ChannelOutboundHandler, ChannelInboundHandler {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
// 继续传播ChannelRead,会回到AbstractChannelHandlerContext的fireChannelRead,触发下一个节点的ChannelRead
ctx.fireChannelRead(msg);
}
}
}

ChannelInitializer

在Channel初始化过程中,有一步是为pipeline添加ChannelHandler处理器,它添加了一个ChannelInitializer类型的Handler,ChannelInitializer可以为pipeline添加其他的处理器,从代码上可以看到它实现的initChannel方法向Pipeline添加了一个ServerBootstrapAcceptor处理器,用来接收连接。

为什么要使用ChannelInitializer?因为初始化的时候,Channel还未注册到Selector上,所以只能使用ChannelInitializer等待注册完成后,再触发它的initChannel,向Pipeline添加ServerBootstrapAcceptor处理器。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
public class ServerBootstrap extends AbstractBootstrap<ServerBootstrap, ServerChannel> {
@Override
void init(Channel channel) {
setChannelOptions(channel, options0().entrySet().toArray(newOptionArray(0)), logger);
setAttributes(channel, attrs0().entrySet().toArray(newAttrArray(0)));
// 获取pipeline
ChannelPipeline p = channel.pipeline();

final EventLoopGroup currentChildGroup = childGroup;
final ChannelHandler currentChildHandler = childHandler;
final Entry<ChannelOption<?>, Object>[] currentChildOptions =
childOptions.entrySet().toArray(newOptionArray(0));
final Entry<AttributeKey<?>, Object>[] currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(0));
// 添加ChannelHandler
p.addLast(new ChannelInitializer<Channel>() {
@Override
public void initChannel(final Channel ch) {
final ChannelPipeline pipeline = ch.pipeline();
ChannelHandler handler = config.handler();
if (handler != null) {
pipeline.addLast(handler);
}
// 通过EventLoop执行任务
ch.eventLoop().execute(new Runnable() {
@Override
public void run() {
// 添加ServerBootstrapAcceptor处理器
pipeline.addLast(new ServerBootstrapAcceptor(
ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
});
}
});
}
}

ServerBootstrapAcceptor

ServerBootstrapAcceptor是ServerBootstrap的内部类,可以看到它继承了ChannelInboundHandlerAdapter,在InBound事件传播过程中,可以知道如果如果有可读事件或者有连接事件时,会触发channelRead的事件传播,那么看一下ServerBootstrapAcceptor的channelRead方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public class ServerBootstrap extends AbstractBootstrap<ServerBootstrap, ServerChannel> {
private static class ServerBootstrapAcceptor extends ChannelInboundHandlerAdapter {
@Override
@SuppressWarnings("unchecked")
public void channelRead(ChannelHandlerContext ctx, Object msg) {
final Channel child = (Channel) msg;

child.pipeline().addLast(childHandler);

setChannelOptions(child, childOptions, logger);
setAttributes(child, childAttrs);

try {
// 注意这里,将channel注册到了childGroup中
childGroup.register(child).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
forceClose(child, future.cause());
}
}
});
} catch (Throwable t) {
forceClose(child, t);
}
}
}
}

Netty版本:4.1.42.Final

参考:

若地 :Netty 核心原理剖析与 RPC 实践

Netty(十一)源码解析 之 Channel 的 inBound 与 outBound 处理器

深入理解 Netty-Pipeline组件