【Netty】NioEventLoop任务处理

在Channel注册的过程中,有一步是判断当前线程是否是EventLoop绑定的线程,如果不是,将会开启一个新线程,然后调用SingleThreadEventExecutor.this.run()方法,执行核心任务处理:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
// 开启线程
private void doStartThread() {
assert thread == null;
executor.execute(new Runnable() {
@Override
public void run() {
// 这里将NioEventLoop的thread成员变量设置为当前获取到的线程,也是实现了当前线程与NioEventLoop的绑定
thread = Thread.currentThread();
if (interrupted) {
thread.interrupt();
}
boolean success = false;
updateLastExecutionTime();
try {
// 执行核心任务,在NioEventLoop中实现
SingleThreadEventExecutor.this.run();
success = true;
} catch (Throwable t) {
logger.warn("Unexpected exception from an event executor: ", t);
} finally {
// 省略了部分代码
}
}
});
}

任务处理

准备知识

Channel和Selector关系

Select操作

(1)select():阻塞的操作,会一直阻塞直到至少一个channel有就绪的事件才返回,返回值为就绪的channel的数量。在以下情况下,可以中断阻塞:

  • 线程被中断
  • 调用了selector.wakeup()

(2) select(long time):和select()类似,会一直阻塞直到至少一个channel有就绪的事件才返回,但是它可以设置超时时间,如果超过了这个时间,也会返回。

(3)selectNow():非阻塞的操作,会立刻返回就绪的channel的数量。

NioEventLoop

进入到NioEventLoop的run方法,run方法中是一个无限循环:

  1. 先调用calculateStrategy方法选择策略,判断任务队列中是否有任务,如果有任务,执行非阻塞的selectNow,返回就绪的channel的数量(必然是大于等于0的,接下来就会跳出switch case走后面的流程),如果没有任务会返回返回SELECT策略:

    • 如果是CONTINUE,重新执行策略选择

    • 如果是BUSY_WAIT,Netty不支持

    • 如果是SELECT,调用select方法,获取准备就绪的 I/O 事件

    • 其他情况,进入default跳出策略选择,继续执行后面的代码
  2. 调用processSelectedKeys处理已经就绪的channel的I/O事件
  3. 处理任务队列,执行队列中的任务
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
// 选择策略
public interface SelectStrategy {

int SELECT = -1;

int CONTINUE = -2;

int BUSY_WAIT = -3;
}
// NioEventLoop
public final class NioEventLoop extends SingleThreadEventLoop {
@Override
protected void run() {
// 无限循环
for (;;) {
try {
try {
// select策略
switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
case SelectStrategy.CONTINUE:
continue;

case SelectStrategy.BUSY_WAIT: // 不支持

case SelectStrategy.SELECT:
// 获取准备就绪的 I/O 事件,这里先将wakenUp置为false
select(wakenUp.getAndSet(false));
// 如果wakenUp为true代表已经执行了select操作
if (wakenUp.get()) {
// 中断select阻塞操作
selector.wakeup();
}
default:
}
} catch (IOException e) {
rebuildSelector0();
handleLoopException(e);
continue;
}

cancelledKeys = 0;
needsToSelectAgain = false;
final int ioRatio = this.ioRatio;
// ioRatio用来控制处理I/O时间与处理任务队列时间的占比
if (ioRatio == 100) {
try {
// 处理 I/O 事件
processSelectedKeys();
} finally {
// 处理任务队列
runAllTasks();
}
} else {
final long ioStartTime = System.nanoTime();
try {
processSelectedKeys();
} finally {
// Ensure we always run tasks.
final long ioTime = System.nanoTime() - ioStartTime;
runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
}
}
} catch (Throwable t) {
handleLoopException(t);
}
// Always handle shutdown even if the loop processing threw an exception.
try {
if (isShuttingDown()) {
closeAll();
if (confirmShutdown()) {
return;
}
}
} catch (Throwable t) {
handleLoopException(t);
}
}
}

private final IntSupplier selectNowSupplier = new IntSupplier() {
@Override
public int get() throws Exception {
return selectNow();
}
};

int selectNow() throws IOException {
try {
// 非阻塞的select,返回值是已经就绪的channel的数量,如果没有返回0
return selector.selectNow();
} finally {
// wakenUp如果状态为true
if (wakenUp.get()) {
// wakeup()方法用来中断select阻塞操作
selector.wakeup();
}
}
}

}

// DefaultSelectStrategy
final class DefaultSelectStrategy implements SelectStrategy {
// 判断选择策略
@Override
public int calculateStrategy(IntSupplier selectSupplier, boolean hasTasks) throws Exception {
//如果有任务,调用selectSupplier的get方法执行非阻塞的selectNow操作,否则返回SELECT策略
return hasTasks ? selectSupplier.get() : SelectStrategy.SELECT;
}
}

Selector获取 I/O 事件

在select方法中,用来轮询注册的 I/O 事件,它首先记录了当前时间保存在currentTimeNanos变量里,然后开始for循环:

  1. 定时任务队列中的任务是按照开始执行任务的延迟时间从小到大排序的,首先会从定时任务队列中,获取距离当前时间最近一个待执行的任务,计算出还需要多久开始执行,如果距离开始执行任务的时间已经超过了0.5ms,说明定时任务执行的优先级比较高,所以会退出当前的循环,回到上一步的调用方法run()中,重新判断selector策略。
  2. 如果定时任务中没有需要立刻执行的任务,会继续往下走,由于在这个过程中,可能会产生新的任务,所以会调用hasTasks判断普通任务队列taskQueue和tailTasks任务队列是否为空,如果为空,并且将成功的设置了线程唤醒状态,先执行一次非阻塞的selectNow操作,然后中断循环,同样回到上一步的调用方法run()中,重新判断selector策略。
  3. 如果上两个条件都没有满足,则执行一次select(timeoutMillis)阻塞等待 I/O 事件,它在以下条件满足时会被中断:
    • 有就绪的Channel
    • seleter.wakeup()被调用
    • 当前线程被中断
    • 阻塞时间超时
    • 出现空轮询时
  4. 在第3步结束后,会判断是否有就绪的事件、seleter.wakeup()被调用或者有任务队列中有任务,如果满足之一将中断循环,回到上一步,重新判断selector策略。
  5. 处理空轮询的问题,获取当前时间减去select阻塞的超时时间,判断是否大于等于currentTimeNanos(本次select方法开始执行的时间,也就是for循环开始前的时间)
    • 如果大于等于currentTimeNanos,说明select阻塞是在超时情况下被打断的,这种属于正常情况,只需将selectCnt重置为1结束本次循环(因为要进行下一次循环,所以重置)。
    • 如果小于currentTimeNanos,判断执行select的执行次数是否超过了阈值,如果超过则重建Selector,所以Netty通过在某个时间段内判断select的执行次数是否超过阈值来处理空轮询的问题。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
 // 轮询注册的 I/O 事件
private void select(boolean oldWakenUp) throws IOException {
Selector selector = this.selector;
try {
int selectCnt = 0;
// 获取当前时间
long currentTimeNanos = System.nanoTime();
// delayNanos返回定时任务队列中最近需要执行的任务,距离执行开始的时间
long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos);

long normalizedDeadlineNanos = selectDeadLineNanos - initialNanoTime();
if (nextWakeupTime != normalizedDeadlineNanos) {
nextWakeupTime = normalizedDeadlineNanos;
}
// 开始for循环
for (;;) {
// 判断定时任务中距离开始执行任务的时间是否超过0.5ms
long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;
// 如果超过执行时间0.5ms以上,说明定时任务队列中的任务需要立刻执行
if (timeoutMillis <= 0) {
// 判断是否执行过select操作,如果没有,会先进行一次非阻塞的selectNow操作
if (selectCnt == 0) {
selector.selectNow();
selectCnt = 1;
}
// 退出循环
break;
}

// 判断普通任务队列taskQueue和tailTasks任务队列是否为空,如果不为空,尝试将wakenUp设置为true,表示已退出select阻塞操作
if (hasTasks() && wakenUp.compareAndSet(false, true)) {
// 非阻塞的selectNow操作
selector.selectNow();
selectCnt = 1;
// 中断循环
break;
}
// select 阻塞操作获取等待获取 I/O 事件
int selectedKeys = selector.select(timeoutMillis);
// 记录执行select的次数
selectCnt ++;
// 如果有就绪的事件、调用了seleter.wakeup()方法或者有任务队列中有任务
if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) {
break;
}
// 如果当前线程被中断
if (Thread.interrupted()) {
if (logger.isDebugEnabled()) {
logger.debug("Selector.select() returned prematurely because " +
"Thread.currentThread().interrupt() was called. Use " +
"NioEventLoop.shutdownGracefully() to shutdown the NioEventLoop.");
}
selectCnt = 1;
break;
}
// 获取当前时间
long time = System.nanoTime();
// 当前时间减去select阻塞的超时时间,如果大于等于currentTimeNanos(select方法开始执行的时间),说明阻塞时间超时情况下被打断的
if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) {
// 重置selectCnt置为1
selectCnt = 1;
} else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&
selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {// 如果执行select的执行次数超过了阈值
// 重建Selector
selector = selectRebuildSelector(selectCnt);
// 重置
selectCnt = 1;
break;
}

currentTimeNanos = time;
}

if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS) {
if (logger.isDebugEnabled()) {
logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.",
selectCnt - 1, selector);
}
}
} catch (CancelledKeyException e) {
if (logger.isDebugEnabled()) {
logger.debug(CancelledKeyException.class.getSimpleName() + " raised by a Selector {} - JDK bug?",
selector, e);
}
// Harmless exception - log anyway
}
}

/**
* 返回最近一个待执行任务距离任务开始执行的时间
*/
protected long delayNanos(long currentTimeNanos) {
// 从定时任务队列中取出位于队头的任务,任务是按照延迟时间从小到大排序的
ScheduledFutureTask<?> scheduledTask = peekScheduledTask();
if (scheduledTask == null) {
return SCHEDULE_PURGE_INTERVAL;
}
// 返回距离任务开始执行的时间
return scheduledTask.delayNanos(currentTimeNanos);
}

// io.netty.channel.SingleThreadEventLoop中实现的
@Override
protected boolean hasTasks() {
// 判断普通任务队列taskQueue和tailTasks任务队列是否为空
return super.hasTasks() || !tailTasks.isEmpty();
}

/**
* io.netty.util.concurrent.SingleThreadEventExecutor
*/
protected boolean hasTasks() {
assert inEventLoop();
return !taskQueue.isEmpty();
}

处理I/O事件

处理I/O事件是在processSelectedKeys方法中实现的:

  1. 首先判断selectedKeys是否为空,它是一个Set集合,如果不为空,走优化后的方法,如果为空,就从selector中获取selectedKeys进行处理,走未优化过的方法。
  2. 遍历selectedKeys,获取当前key的所有就绪操作,有以下几种类型:
    • SelectionKey.OP_CONNECT :处理请求建立连接的事件
    • SelectionKey.OP_WRITE : 处理可写事件
    • SelectionKey.OP_READ :处理可读事件
    • SelectionKey.OP_ACCEPT :处理接受连接请求的事件
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
private SelectedSelectionKeySet selectedKeys;

private void processSelectedKeys() {
if (selectedKeys != null) {
// 优化后的
processSelectedKeysOptimized();
} else {
// 普通的
processSelectedKeysPlain(selector.selectedKeys());
}
}

private void processSelectedKeysOptimized() {
// 遍历selectedKeys,处理已经就绪的selectedKeys
for (int i = 0; i < selectedKeys.size; ++i) {
final SelectionKey k = selectedKeys.keys[i];
selectedKeys.keys[i] = null;
final Object a = k.attachment();
// 如果是AbstractNioChannel类型
if (a instanceof AbstractNioChannel) {
// 处理I/O事件
processSelectedKey(k, (AbstractNioChannel) a);
} else {
@SuppressWarnings("unchecked")
NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
processSelectedKey(k, task);
}
// 是否需要重新轮询
if (needsToSelectAgain) {
selectedKeys.reset(i + 1);
selectAgain();
i = -1;
}
}
}

private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
if (!k.isValid()) {
final EventLoop eventLoop;
try {
eventLoop = ch.eventLoop();
} catch (Throwable ignored) {
return;
}
if (eventLoop != this || eventLoop == null) {
return;
}
// close the channel if the key is not valid anymore
unsafe.close(unsafe.voidPromise());
return;
}

try {
// 获取当前key的所有就绪操作
int readyOps = k.readyOps();
// 处理连接,客户端请求建立连接
if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
int ops = k.interestOps();
ops &= ~SelectionKey.OP_CONNECT;
k.interestOps(ops);
//
unsafe.finishConnect();
}
// 处理可写事件
if ((readyOps & SelectionKey.OP_WRITE) != 0) {
ch.unsafe().forceFlush();
}
// 处理可读事件和连接请求接受
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
unsafe.read();
}
} catch (CancelledKeyException ignored) {
unsafe.close(unsafe.voidPromise());
}
}

处理任务队列

  1. 从定时任务队列中合并任务到普通任务队列taskQueue中
  2. 在无限循环中,从taskQueue中不断获取任务开始执行,直到获取任务为空
  3. 进行收尾工作,处理tailTask中的任务
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor implements OrderedEventExecutor { 
protected boolean runAllTasks(long timeoutNanos) {
// 将定时任务队列中的任务放到taskQueue
fetchFromScheduledTaskQueue();
// 从taskQueue中获取一个任务
Runnable task = pollTask();
if (task == null) {
afterRunningAllTasks();
return false;
}

final long deadline = ScheduledFutureTask.nanoTime() + timeoutNanos;
long runTasks = 0;
long lastExecutionTime;
// 循环处理任务
for (;;) {
// 执行任务
safeExecute(task);

runTasks ++;

if ((runTasks & 0x3F) == 0) {
lastExecutionTime = ScheduledFutureTask.nanoTime();
if (lastExecutionTime >= deadline) {
break;
}
}
// 取出下一个任务
task = pollTask();
if (task == null) {
lastExecutionTime = ScheduledFutureTask.nanoTime();
break;
}
}

afterRunningAllTasks();
this.lastExecutionTime = lastExecutionTime;
return true;
}

private boolean fetchFromScheduledTaskQueue() {
if (scheduledTaskQueue == null || scheduledTaskQueue.isEmpty()) {
return true;
}
long nanoTime = AbstractScheduledEventExecutor.nanoTime();
for (;;) {
// 从定时任务队列中获取任务
Runnable scheduledTask = pollScheduledTask(nanoTime);
if (scheduledTask == null) {
return true;
}
// 将定时任务添加到taskQueue
if (!taskQueue.offer(scheduledTask)) {
scheduledTaskQueue.add((ScheduledFutureTask<?>) scheduledTask);
return false;
}
}
}
// io.netty.util.concurrent.AbstractScheduledEventExecutor
protected final Runnable pollScheduledTask(long nanoTime) {
assert inEventLoop();

Queue<ScheduledFutureTask<?>> scheduledTaskQueue = this.scheduledTaskQueue;
ScheduledFutureTask<?> scheduledTask = scheduledTaskQueue == null ? null : scheduledTaskQueue.peek();
// 如果定时任务还未到截止执行时间,先不处理
if (scheduledTask == null || scheduledTask.deadlineNanos() - nanoTime > 0) {
return null;
}
// 走到这里,说明已经过了截止时间,需要紧急处理
// 取出需要执行的定时任务
scheduledTaskQueue.remove();
return scheduledTask;
}
}
// AbstractEventExecutor
public abstract class AbstractEventExecutor extends AbstractExecutorService implements EventExecutor {
protected static void safeExecute(Runnable task) {
try {
// 运行任务
task.run();
} catch (Throwable t) {
logger.warn("A task raised an exception. Task: {}", task, t);
}
}
}

tailTask任务处理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public abstract class SingleThreadEventLoop extends SingleThreadEventExecutor implements EventLoop {
protected void afterRunningAllTasks() {
// 处理tailTask的任务
this.runAllTasksFrom(this.tailTasks);
}
}

public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor implements OrderedEventExecutor {
protected final boolean runAllTasksFrom(Queue<Runnable> taskQueue) {
// 从队列中获取任务
Runnable task = pollTaskFrom(taskQueue);
if (task == null) {
return false;
}
for (;;) {
// 执行任务
safeExecute(task);
// 获取下一个任务
task = pollTaskFrom(taskQueue);
if (task == null) {
return true;
}
}
}
}

Netty版本:4.1.42.Final

参考:

Netty(九)源码解析 之 NioEventLoop 任务的执行

若地 :Netty 核心原理剖析与 RPC 实践

深入理解 NioEventLoop启动流程

Java NIO之Selector(选择器)