在Channel注册的过程中,有一步是判断当前线程是否是EventLoop绑定的线程,如果不是,将会开启一个新线程,然后调用SingleThreadEventExecutor.this.run()方法,执行核心任务处理:
1 | // 开启线程 |
任务处理
准备知识
Channel和Selector关系
Select操作
(1)select():阻塞的操作,会一直阻塞直到至少一个channel有就绪的事件才返回,返回值为就绪的channel的数量。在以下情况下,可以中断阻塞:
- 线程被中断
- 调用了selector.wakeup()
(2) select(long time):和select()类似,会一直阻塞直到至少一个channel有就绪的事件才返回,但是它可以设置超时时间,如果超过了这个时间,也会返回。
(3)selectNow():非阻塞的操作,会立刻返回就绪的channel的数量。
NioEventLoop
进入到NioEventLoop的run方法,run方法中是一个无限循环:
先调用calculateStrategy方法选择策略,判断任务队列中是否有任务,如果有任务,执行非阻塞的selectNow,返回就绪的channel的数量(必然是大于等于0的,接下来就会跳出switch case走后面的流程),如果没有任务会返回返回SELECT策略:
如果是CONTINUE,重新执行策略选择
如果是BUSY_WAIT,Netty不支持
如果是SELECT,调用select方法,获取准备就绪的 I/O 事件
- 其他情况,进入default跳出策略选择,继续执行后面的代码
- 调用processSelectedKeys处理已经就绪的channel的I/O事件
- 处理任务队列,执行队列中的任务
1 | // 选择策略 |
Selector获取 I/O 事件
在select方法中,用来轮询注册的 I/O 事件,它首先记录了当前时间保存在currentTimeNanos变量里,然后开始for循环:
- 定时任务队列中的任务是按照开始执行任务的延迟时间从小到大排序的,首先会从定时任务队列中,获取距离当前时间最近一个待执行的任务,计算出还需要多久开始执行,如果距离开始执行任务的时间已经超过了0.5ms,说明定时任务执行的优先级比较高,所以会退出当前的循环,回到上一步的调用方法run()中,重新判断selector策略。
- 如果定时任务中没有需要立刻执行的任务,会继续往下走,由于在这个过程中,可能会产生新的任务,所以会调用hasTasks判断普通任务队列taskQueue和tailTasks任务队列是否为空,如果为空,并且将成功的设置了线程唤醒状态,先执行一次非阻塞的selectNow操作,然后中断循环,同样回到上一步的调用方法run()中,重新判断selector策略。
- 如果上两个条件都没有满足,则执行一次select(timeoutMillis)阻塞等待 I/O 事件,它在以下条件满足时会被中断:
- 有就绪的Channel
- seleter.wakeup()被调用
- 当前线程被中断
- 阻塞时间超时
- 出现空轮询时
- 在第3步结束后,会判断是否有就绪的事件、seleter.wakeup()被调用或者有任务队列中有任务,如果满足之一将中断循环,回到上一步,重新判断selector策略。
- 处理空轮询的问题,获取当前时间减去select阻塞的超时时间,判断是否大于等于currentTimeNanos(本次select方法开始执行的时间,也就是for循环开始前的时间)
- 如果大于等于currentTimeNanos,说明select阻塞是在超时情况下被打断的,这种属于正常情况,只需将selectCnt重置为1结束本次循环(因为要进行下一次循环,所以重置)。
- 如果小于currentTimeNanos,判断执行select的执行次数是否超过了阈值,如果超过则重建Selector,所以Netty通过在某个时间段内判断select的执行次数是否超过阈值来处理空轮询的问题。
1 | // 轮询注册的 I/O 事件 |
处理I/O事件
处理I/O事件是在processSelectedKeys方法中实现的:
- 首先判断selectedKeys是否为空,它是一个Set集合,如果不为空,走优化后的方法,如果为空,就从selector中获取selectedKeys进行处理,走未优化过的方法。
- 遍历selectedKeys,获取当前key的所有就绪操作,有以下几种类型:
- SelectionKey.OP_CONNECT :处理请求建立连接的事件
- SelectionKey.OP_WRITE : 处理可写事件
- SelectionKey.OP_READ :处理可读事件
- SelectionKey.OP_ACCEPT :处理接受连接请求的事件
1 | private SelectedSelectionKeySet selectedKeys; |
处理任务队列
- 从定时任务队列中合并任务到普通任务队列taskQueue中
- 在无限循环中,从taskQueue中不断获取任务开始执行,直到获取任务为空
- 进行收尾工作,处理tailTask中的任务
1 | public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor implements OrderedEventExecutor { |
tailTask任务处理
1 | public abstract class SingleThreadEventLoop extends SingleThreadEventExecutor implements EventLoop { |
Netty版本:4.1.42.Final
参考: