前言
EventLoop也是netty作为一个事件驱动架构的网络框架的重要组成部分,netty主要通过它来实现异步编程,从前面的源码分析中我们对它已经的使用非常熟悉了,凡事涉及到耗时的,可能阻塞线程的操作,netty都使用了eventLoop来异步执行,我们进入到它的内部来看看它是如何实现的。
如何使用
我们在之前的源码中,常见的用法如下示例所示:
public static void main(String[] args) throws InterruptedException { NioEventLoopGroup eventLoopGroup = new NioEventLoopGroup();//1 final CountDownLatch downLatch = new CountDownLatch(1); eventLoopGroup.next().execute(new Runnable() {//2 public void run() { for(int i=1; i<1000; i++){ //TODO 只是模拟一个耗时操作。 } System.out.println("耗时操作执行完毕"); downLatch.countDown(); } }); downLatch.await(); System.out.println("等待异步线程耗时操作执行完毕"); }
1处构造一个EventLoopGroup事件起循环器组对象。该接口有多种实现类,而我们常用的Nio相关的实现类是NioEventLoopGroup,下面的类结构章节中会详细介绍它的类结构。
2处先调用了next()方法从事件循环器组获得下一个EventLoop对象,然后就可以调用它的execute方法,该方法传递一个Runnbale对象作为参数,最后会用线程异步执行其run方法;execute方法是继承自jdk自带的并发包中的Executor接口。
类结构
上图是netty中的类结构图,它是一个庞大的类继承体系。有篇文章介绍得非常不错,本文直接引用它,不做详细介绍。
其它的接口,抽象类和实现类的分析参考上述两篇文章,本文最终只分析NioEventLoop和NioEventLoopGroup两个类的实现。
NioEventLoop
介绍
NioEventLoop是基于jdk的nio包实现的事件循环器,它包含了一个线程,该线程会循环处理io事件和其它任务。
NioEventLoop中维护了一个线程,线程启动时会调用NioEventLoop的run方法,执行I/O任务和非I/O任务:
I/O任务
即selectionKey中ready的事件,如accept、connect、read、write等,由processSelectedKeys方法触发。非IO任务
添加到taskQueue中的任务,如register0、bind0等任务,由runAllTasks方法触发。两种任务的执行时间比由变量ioRatio控制,默认为50,则表示允许非IO任务执行的时间与IO任务的执行时间相等。
让我一起来看看NioEventLoop的关键代码。
关键属性
selector:这个就是jdk中nio的多路复用器Selector对象,熟悉nio类的人应该很清楚了,每个NioEventLoop会绑定一个selector对象。调用它的select()或者selectNow方法来读取io事件。
selectStrategy:选择策略SelectStrategy对象。它提供了一种控制select循环行为的能力,要么是select方法阻塞线程,要么是继续循环两种行为。
provider:SelectorProvider类的对象,它是Selector的工厂类,调用它的openSelector方法即可打开一个新的selector对象。
ioRatio:表示处理io事件时间占比,默认值是50,即各占一半时间。
关键方法
接下来我们看看它的一些关键方法。
run方法
@Override protected void run() { for (;;) {//1 try { switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {//2 case SelectStrategy.CONTINUE: continue; case SelectStrategy.SELECT: select(wakenUp.getAndSet(false));//3 if (wakenUp.get()) { selector.wakeup(); } default: // fallthrough } cancelledKeys = 0; needsToSelectAgain = false; final int ioRatio = this.ioRatio; if (ioRatio == 100) {//4 try { processSelectedKeys(); } finally {//5 // Ensure we always run tasks. runAllTasks(); } } else { final long ioStartTime = System.nanoTime(); try { processSelectedKeys();//6 } finally { // Ensure we always run tasks. final long ioTime = System.nanoTime() - ioStartTime; runAllTasks(ioTime * (100 - ioRatio) / ioRatio);//7 } } } catch (Throwable t) { handleLoopException(t); } // Always handle shutdown even if the loop processing threw an exception. try { if (isShuttingDown()) { closeAll(); if (confirmShutdown()) { return; } } } catch (Throwable t) { handleLoopException(t); } } }
run方法是当前线程执行的方法。
1处是一个for循环,没有退出条件。这和事件循环器的概念对应起来了,该线程会一直循环,检查selector有没有新的io事件,如果有则处理,或者执行提交到该事件循环器的其它task;为了避免线程一直循环空跑,浪费CPU资源,那么在获取新的io事件或者其它task要阻塞线程,避免一直循环,等待有新的io事件或者task的时候才唤醒线程。
2处计算出选择策略,如果是运算处的值是“CONTINUE"则会继续下一次循环。默认策略是:如果有普通任务待执行,使用selectNow();否则使用select(boolean oldWakenUp)。NIO的Selector有三个select()方法,它们的区别如下:
select() 阻塞直到有一个感兴趣的IO事件就绪
select(long timeout) 与select()类似,但阻塞的最长时间为给定的timeout selectNow() 不会阻塞,直接返回而不管是否有IO事件就绪
3处如果计算出的策略是“SELECT”,则调用select方法处理,后面会详细介绍该方法。
4处如果io事件比例是100%,则先调用方法processSelectedKeys处理选择到的key,我们下面会详细介绍该方法。
5处调用runAllTasks方法处理其它任务。确保始终能够执行到其它任务,但是没有传入参数,该方法后面也将继续介绍。
6处则是io事件比例小于100%。也是先调用processSelectedKeys方法处理io事件。
7处调用runAllTasks方法,但是传入了按照比例分配给任务的事件片,限制了任务执行的事件,通过ioTime * (100 - ioRatio) / ioRatio计算得到任务执行事件片。
select方法
private void select(boolean oldWakenUp) throws IOException { Selector selector = this.selector; try { int selectCnt = 0; long currentTimeNanos = System.nanoTime(); long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos);//delayNanos是最近一个调度任务的到期时间,没有调度任务返回1秒。selectDeadLineNanos指可以进行select操作的截止时间点 for (;;) {//进入for循环。 // 四舍五入将select操作超时时间换算为毫秒单位 long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L; if (timeoutMillis <= 0) {//超时时间小于等于0,则不再进行其它的select操作。 if (selectCnt == 0) {//如果没有进行过select操作。 selector.selectNow(); //调用非阻塞的selectNow后返回。 selectCnt = 1;//计数器=1. } break;//退出for循环。 } //如果任务提交的时候wakenUp的值是ture,则该任务没有机会执行。 // Selector#wakeup.因此我们再次检查任务队列,在执行select操作之间。 // 如果我们不检查的话,则该任务会被延迟到select操作超时之后才会被执行。 //它还可能会延迟到空闲超时之后执行。(如果pipline中有IdleStateHandler处理器。) if (hasTasks() && wakenUp.compareAndSet(false, true)) { selector.selectNow();//队列中有任务,执行一次selectNow操作返回,以免影响任务执行。 selectCnt = 1; break; } //调用带超时时间的select操作。 int selectedKeys = selector.select(timeoutMillis); selectCnt ++;//select在循环中的操作次数加1. if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) { // - 有就绪的io事件,或 // - 外部用户唤醒, 或 // - 有待执行普通任务。 或 // - 有待执行调度任务。 // 如果为true,则跳出循环,执行其它任务。 break; } if (Thread.interrupted()) { //线程被中断因此会导致重置选中的io事件key并且会中断,因此我们也会退出循环。 //这很有很有可能是用户代码或者客户端包的bug,所以打印日志记录该问题。 //bug详情参考:https://github.com/netty/netty/issues/2426 if (logger.isDebugEnabled()) { logger.debug..... } selectCnt = 1; break;// 退出循环。 } long time = System.nanoTime(); if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) { //超时时间已到,重新恢复select操作次数为1.此处难道不应该退出循环吗? selectCnt = 1; } else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 && selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) { //已经设置了阀值,如果select操作次数超过了阀值,则会重建一个新的selector对象,避免jdk的bug导致进入死循环,消耗过多的cpu. rebuildSelector(); selector = this.selector; //用新的selecotr对象selectNow一次后退出循环。 selector.selectNow(); selectCnt = 1; break; } currentTimeNanos = time; } if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS) { if (logger.isDebugEnabled()) { logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.", selectCnt - 1, selector); } } } catch (CancelledKeyException e) { if (logger.isDebugEnabled()) { logger.debug(CancelledKeyException.class.getSimpleName() + " raised by a Selector {} - JDK bug?", selector, e); } } }
本来select操作的代码不会这么复杂,主要是由于导致select()方法并不阻塞而直接返回且返回值为0,从而出现空轮询使CPU完全耗尽。Netty解决的办法是:对select返回0的操作计数,如果次数大于阈值SELECTOR_AUTO_REBUILD_THRESHOLD就新建一个selector,将注册到老的selector上的channel重新注册到新的selector上。阈值SELECTOR_AUTO_REBUILD_THRESHOLD可由用户使用系统变量io.netty.selectorAutoRebuildThreshold配置,默认为512。这里注意for()循环中大量使用了break,含有break的部分才是关键操作,其他部分(其实就只有一处)是为了解决JDK BUG。
processSelectedKeys
private void processSelectedKeys() { if (selectedKeys != null) { processSelectedKeysOptimized(selectedKeys.flip());//使用优化处理的方法。 } else { processSelectedKeysPlain(selector.selectedKeys());//普通处理key的方法。 } }
我们回到了run方法中调用processSelectedKeys来处理有事件的key列表。
有优化和普通的处理方法,我们逐一来介绍。
processSelectedKeysPlain
private void processSelectedKeysPlain(SetselectedKeys) { if (selectedKeys.isEmpty()) {//无有事件待处理的key列表,直接返回。 return; } Iterator i = selectedKeys.iterator();//获得迭代器。 for (;;) {//循环迭代每个待处理的key。 final SelectionKey k = i.next();//获得一个有事件的key。 final Object a = k.attachment();//获得附加对象。 i.remove();//主动将自己从迭代器中删除。 if (a instanceof AbstractNioChannel) {//IO事件由netty框架负责处理。 processSelectedKey(k, (AbstractNioChannel) a); } else { NioTask task = (NioTask ) a;//IO事件由用户自定义任务处理。 processSelectedKey(k, task); } if (!i.hasNext()) {//无下一个有事件的key,则停止循环。 break; } if (needsToSelectAgain) {//需要再次选择。 selectAgain(); selectedKeys = selector.selectedKeys(); // Create the iterator again to avoid ConcurrentModificationException if (selectedKeys.isEmpty()) { break; } else { i = selectedKeys.iterator(); } } } }
这一部分代码功能就是遍历选择键,其中对选择键的处理有两种方式:Netty框架处理和用户自定义处理。这两种处理方式由register()方式决定:
// Netty框架处理 public ChannelFuture register(final Channel channel, final ChannelPromise promise);
// 用户自定义处理 public void register(final SelectableChannel ch, final int interestOps, final NioTask<?> task);
接下来我们一起看看processSelectedKey代码的实现。
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) { final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();//得到对应的unsafe对象。 if (!k.isValid()) {//检查key是否合法,若不合法则进入异常处理。 final EventLoop eventLoop; try { eventLoop = ch.eventLoop(); } catch (Throwable ignored) { return; } if (eventLoop != this || eventLoop == null) { return; } //key不合法,直接关闭连接通道。 unsafe.close(unsafe.voidPromise()); return; } try { int readyOps = k.readyOps();//准备就绪的操作。 if ((readyOps & SelectionKey.OP_CONNECT) != 0) {//处理客户端连接事件。 //从感兴趣的事件中将连接事件删除。不再监听连接事件。 int ops = k.interestOps(); ops &= ~SelectionKey.OP_CONNECT; k.interestOps(ops); //完成连接。 unsafe.finishConnect(); } //先处理 OP_WRITE 写事件,可以写入队列缓存区。 if ((readyOps & SelectionKey.OP_WRITE) != 0) { //调用强制刷新 forceFlush 防止缓存去无足够内存。 ch.unsafe().forceFlush(); } //读事件处理,其中readyOps == 0为对JDK Bug的处理, 防止死循环 if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) unsafe.read(); } } catch (CancelledKeyException ignored) { unsafe.close(unsafe.voidPromise()); } }
可以看出对IO事件的具体处理,委托给NioUnsafe对象处理,由read()、forceFlush()、finishConnect()和close()方法处理具体的IO事件,具体的处理过程,我们将在分析NioUnsafe时讲解。
runAllTasks
该方法是在timeoutNanos时间内执行所有的普通任务。
protected boolean runAllTasks(long timeoutNanos) { fetchFromScheduledTaskQueue();//从定时任务队列中读取任务。 Runnable task = pollTask();//拉取任务。 if (task == null) {//无任务则可以跳出。 afterRunningAllTasks(); return false; } final long deadline = ScheduledFutureTask.nanoTime() + timeoutNanos;//执行任务截至时间。 long runTasks = 0;//执行任务的个数。 long lastExecutionTime; for (;;) {//循环执行任务。 safeExecute(task);//安全执行任务,会捕获异常,避免因为执行一个任务异常导致程序中断。 runTasks ++; //每隔64个任务检查一次时间,由于nanoTime操作非常耗时,因此不能每次都检查。64这个值当前是硬编码的,无法配置,可能会成为一个问题。 if ((runTasks & 0x3F) == 0) { lastExecutionTime = ScheduledFutureTask.nanoTime(); if (lastExecutionTime >= deadline) { break; } } task = pollTask();//继续取下一个任务。 if (task == null) {//队列中无任务了则跳出循环。 lastExecutionTime = ScheduledFutureTask.nanoTime(); break; } } //执行完所有任务后处理,继续执行尾部队列的任务。 afterRunningAllTasks(); this.lastExecutionTime = lastExecutionTime;//记录上次执行时间。 return true; }
NioEventLoopGroup
NioEventLoopGroup的主要代码实现是模板方法newChild(),用来创建线程池中的单个线程,代码如下:
@Override protected EventExecutor newChild(ThreadFactory threadFactory, Object... args) throws Exception { return new NioEventLoop(this, threadFactory, (SelectorProvider) args[0], ((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2]); } public void setIoRatio(int ioRatio) { for (EventExecutor e: this) { ((NioEventLoop) e).setIoRatio(ioRatio); } } public void rebuildSelectors() { for (EventExecutor e: this) { ((NioEventLoop) e).rebuildSelector(); } }
该实现代码就是创建了一个NioEventLoop对象。
此外NioEventLoopGroup还提供了setIoRatio()和rebuildSelectors()两个方法,一个用来设置I/O任务和非I/O任务的执行时间比,一个用来重建线程中的selector来规避JDK的。其实现也是依次设置各线程的状态,故不再列出。