博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
netty源码分析系列——EventLoop
阅读量:6938 次
发布时间:2019-06-27

本文共 13919 字,大约阅读时间需要 46 分钟。

hot3.png

前言

EventLoop也是netty作为一个事件驱动架构的网络框架的重要组成部分,netty主要通过它来实现异步编程,从前面的源码分析中我们对它已经的使用非常熟悉了,凡事涉及到耗时的,可能阻塞线程的操作,netty都使用了eventLoop来异步执行,我们进入到它的内部来看看它是如何实现的。

如何使用

我们在之前的源码中,常见的用法如下示例所示:

public static void main(String[] args) throws InterruptedException {        NioEventLoopGroup eventLoopGroup = new NioEventLoopGroup();//1        final CountDownLatch downLatch = new CountDownLatch(1);        eventLoopGroup.next().execute(new Runnable() {//2            public void run() {                for(int i=1; i<1000; i++){                    //TODO 只是模拟一个耗时操作。                }                System.out.println("耗时操作执行完毕");                downLatch.countDown();            }        });        downLatch.await();        System.out.println("等待异步线程耗时操作执行完毕");    }

1处构造一个EventLoopGroup事件起循环器组对象。该接口有多种实现类,而我们常用的Nio相关的实现类是NioEventLoopGroup,下面的类结构章节中会详细介绍它的类结构。

2处先调用了next()方法从事件循环器组获得下一个EventLoop对象,然后就可以调用它的execute方法,该方法传递一个Runnbale对象作为参数,最后会用线程异步执行其run方法;execute方法是继承自jdk自带的并发包中的Executor接口。

类结构

上图是netty中的类结构图,它是一个庞大的类继承体系。有篇文章介绍得非常不错,本文直接引用它,不做详细介绍。

其它的接口,抽象类和实现类的分析参考上述两篇文章,本文最终只分析NioEventLoop和NioEventLoopGroup两个类的实现。

NioEventLoop

介绍

NioEventLoop是基于jdk的nio包实现的事件循环器,它包含了一个线程,该线程会循环处理io事件和其它任务。

NioEventLoop中维护了一个线程,线程启动时会调用NioEventLoop的run方法,执行I/O任务和非I/O任务:

I/O任务

即selectionKey中ready的事件,如accept、connect、read、write等,由processSelectedKeys方法触发。

非IO任务

添加到taskQueue中的任务,如register0、bind0等任务,由runAllTasks方法触发。

两种任务的执行时间比由变量ioRatio控制,默认为50,则表示允许非IO任务执行的时间与IO任务的执行时间相等。

让我一起来看看NioEventLoop的关键代码。

关键属性

selector:这个就是jdk中nio的多路复用器Selector对象,熟悉nio类的人应该很清楚了,每个NioEventLoop会绑定一个selector对象。调用它的select()或者selectNow方法来读取io事件。

selectStrategy:选择策略SelectStrategy对象。它提供了一种控制select循环行为的能力,要么是select方法阻塞线程,要么是继续循环两种行为。

provider:SelectorProvider类的对象,它是Selector的工厂类,调用它的openSelector方法即可打开一个新的selector对象。

ioRatio:表示处理io事件时间占比,默认值是50,即各占一半时间。

关键方法

接下来我们看看它的一些关键方法。

run方法

@Override    protected void run() {        for (;;) {//1            try {                switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {//2                    case SelectStrategy.CONTINUE:                        continue;                    case SelectStrategy.SELECT:                        select(wakenUp.getAndSet(false));//3                        if (wakenUp.get()) {                            selector.wakeup();                        }                    default:                        // fallthrough                }                cancelledKeys = 0;                needsToSelectAgain = false;                final int ioRatio = this.ioRatio;                if (ioRatio == 100) {//4                    try {                        processSelectedKeys();                    } finally {//5                        // Ensure we always run tasks.                        runAllTasks();                    }                } else {                    final long ioStartTime = System.nanoTime();                    try {                        processSelectedKeys();//6                    } finally {                        // Ensure we always run tasks.                        final long ioTime = System.nanoTime() - ioStartTime;                        runAllTasks(ioTime * (100 - ioRatio) / ioRatio);//7                    }                }            } catch (Throwable t) {                handleLoopException(t);            }            // Always handle shutdown even if the loop processing threw an exception.            try {                if (isShuttingDown()) {                    closeAll();                    if (confirmShutdown()) {                        return;                    }                }            } catch (Throwable t) {                handleLoopException(t);            }        }    }

run方法是当前线程执行的方法。

1处是一个for循环,没有退出条件。这和事件循环器的概念对应起来了,该线程会一直循环,检查selector有没有新的io事件,如果有则处理,或者执行提交到该事件循环器的其它task;为了避免线程一直循环空跑,浪费CPU资源,那么在获取新的io事件或者其它task要阻塞线程,避免一直循环,等待有新的io事件或者task的时候才唤醒线程。

2处计算出选择策略,如果是运算处的值是“CONTINUE"则会继续下一次循环。默认策略是:如果有普通任务待执行,使用selectNow();否则使用select(boolean oldWakenUp)。NIO的Selector有三个select()方法,它们的区别如下:

select() 阻塞直到有一个感兴趣的IO事件就绪

select(long timeout) 与select()类似,但阻塞的最长时间为给定的timeout
selectNow() 不会阻塞,直接返回而不管是否有IO事件就绪

3处如果计算出的策略是“SELECT”,则调用select方法处理,后面会详细介绍该方法。

4处如果io事件比例是100%,则先调用方法processSelectedKeys处理选择到的key,我们下面会详细介绍该方法。

5处调用runAllTasks方法处理其它任务。确保始终能够执行到其它任务,但是没有传入参数,该方法后面也将继续介绍。

6处则是io事件比例小于100%。也是先调用processSelectedKeys方法处理io事件。

7处调用runAllTasks方法,但是传入了按照比例分配给任务的事件片,限制了任务执行的事件,通过ioTime * (100 - ioRatio) / ioRatio计算得到任务执行事件片。

select方法

private void select(boolean oldWakenUp) throws IOException {        Selector selector = this.selector;        try {            int selectCnt = 0;            long currentTimeNanos = System.nanoTime();            long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos);//delayNanos是最近一个调度任务的到期时间,没有调度任务返回1秒。selectDeadLineNanos指可以进行select操作的截止时间点            for (;;) {//进入for循环。                // 四舍五入将select操作超时时间换算为毫秒单位                long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;                if (timeoutMillis <= 0) {//超时时间小于等于0,则不再进行其它的select操作。                    if (selectCnt == 0) {//如果没有进行过select操作。                        selector.selectNow(); //调用非阻塞的selectNow后返回。                        selectCnt = 1;//计数器=1.                    }                    break;//退出for循环。                }                //如果任务提交的时候wakenUp的值是ture,则该任务没有机会执行。                // Selector#wakeup.因此我们再次检查任务队列,在执行select操作之间。                // 如果我们不检查的话,则该任务会被延迟到select操作超时之后才会被执行。                //它还可能会延迟到空闲超时之后执行。(如果pipline中有IdleStateHandler处理器。)                if (hasTasks() && wakenUp.compareAndSet(false, true)) {                    selector.selectNow();//队列中有任务,执行一次selectNow操作返回,以免影响任务执行。                    selectCnt = 1;                    break;                }                //调用带超时时间的select操作。                int selectedKeys = selector.select(timeoutMillis);                selectCnt ++;//select在循环中的操作次数加1.                if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) {                    // - 有就绪的io事件,或                    // - 外部用户唤醒, 或                    // - 有待执行普通任务。 或                    // - 有待执行调度任务。                     // 如果为true,则跳出循环,执行其它任务。                    break;                }                if (Thread.interrupted()) {                    //线程被中断因此会导致重置选中的io事件key并且会中断,因此我们也会退出循环。                    //这很有很有可能是用户代码或者客户端包的bug,所以打印日志记录该问题。                    //bug详情参考:https://github.com/netty/netty/issues/2426                    if (logger.isDebugEnabled()) {                        logger.debug.....                    }                    selectCnt = 1;                    break;// 退出循环。                }                long time = System.nanoTime();                if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) {                    //超时时间已到,重新恢复select操作次数为1.此处难道不应该退出循环吗?                    selectCnt = 1;                } else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&                        selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {                    //已经设置了阀值,如果select操作次数超过了阀值,则会重建一个新的selector对象,避免jdk的bug导致进入死循环,消耗过多的cpu.                    rebuildSelector();                    selector = this.selector;                    //用新的selecotr对象selectNow一次后退出循环。                    selector.selectNow();                    selectCnt = 1;                    break;                }                currentTimeNanos = time;            }            if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS) {                if (logger.isDebugEnabled()) {                    logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.",                            selectCnt - 1, selector);                }            }        } catch (CancelledKeyException e) {            if (logger.isDebugEnabled()) {                logger.debug(CancelledKeyException.class.getSimpleName() + " raised by a Selector {} - JDK bug?",                        selector, e);            }        }    }

本来select操作的代码不会这么复杂,主要是由于导致select()方法并不阻塞而直接返回且返回值为0,从而出现空轮询使CPU完全耗尽。Netty解决的办法是:对select返回0的操作计数,如果次数大于阈值SELECTOR_AUTO_REBUILD_THRESHOLD就新建一个selector,将注册到老的selector上的channel重新注册到新的selector上。阈值SELECTOR_AUTO_REBUILD_THRESHOLD可由用户使用系统变量io.netty.selectorAutoRebuildThreshold配置,默认为512。这里注意for()循环中大量使用了break,含有break的部分才是关键操作,其他部分(其实就只有一处)是为了解决JDK BUG。

processSelectedKeys

private void processSelectedKeys() {        if (selectedKeys != null) {            processSelectedKeysOptimized(selectedKeys.flip());//使用优化处理的方法。        } else {            processSelectedKeysPlain(selector.selectedKeys());//普通处理key的方法。        }    }

我们回到了run方法中调用processSelectedKeys来处理有事件的key列表。

有优化和普通的处理方法,我们逐一来介绍。

processSelectedKeysPlain

 
private void processSelectedKeysPlain(Set
selectedKeys) { if (selectedKeys.isEmpty()) {//无有事件待处理的key列表,直接返回。 return; } Iterator
i = selectedKeys.iterator();//获得迭代器。 for (;;) {//循环迭代每个待处理的key。 final SelectionKey k = i.next();//获得一个有事件的key。 final Object a = k.attachment();//获得附加对象。 i.remove();//主动将自己从迭代器中删除。 if (a instanceof AbstractNioChannel) {//IO事件由netty框架负责处理。 processSelectedKey(k, (AbstractNioChannel) a); } else { NioTask
task = (NioTask
) a;//IO事件由用户自定义任务处理。 processSelectedKey(k, task); } if (!i.hasNext()) {//无下一个有事件的key,则停止循环。 break; } if (needsToSelectAgain) {//需要再次选择。 selectAgain(); selectedKeys = selector.selectedKeys(); // Create the iterator again to avoid ConcurrentModificationException if (selectedKeys.isEmpty()) { break; } else { i = selectedKeys.iterator(); } } } }

这一部分代码功能就是遍历选择键,其中对选择键的处理有两种方式:Netty框架处理和用户自定义处理。这两种处理方式由register()方式决定:

// Netty框架处理 public ChannelFuture register(final Channel channel, final ChannelPromise promise);
// 用户自定义处理 public void register(final SelectableChannel ch, final int interestOps, final NioTask<?> task);

接下来我们一起看看processSelectedKey代码的实现。

private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {        final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();//得到对应的unsafe对象。        if (!k.isValid()) {//检查key是否合法,若不合法则进入异常处理。            final EventLoop eventLoop;            try {                eventLoop = ch.eventLoop();            } catch (Throwable ignored) {                return;            }            if (eventLoop != this || eventLoop == null) {                return;            }            //key不合法,直接关闭连接通道。            unsafe.close(unsafe.voidPromise());            return;        }        try {            int readyOps = k.readyOps();//准备就绪的操作。            if ((readyOps & SelectionKey.OP_CONNECT) != 0) {//处理客户端连接事件。                //从感兴趣的事件中将连接事件删除。不再监听连接事件。                int ops = k.interestOps();                ops &= ~SelectionKey.OP_CONNECT;                k.interestOps(ops);                //完成连接。                unsafe.finishConnect();            }            //先处理 OP_WRITE 写事件,可以写入队列缓存区。            if ((readyOps & SelectionKey.OP_WRITE) != 0) {                //调用强制刷新 forceFlush 防止缓存去无足够内存。                ch.unsafe().forceFlush();            }            //读事件处理,其中readyOps == 0为对JDK Bug的处理, 防止死循环            if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0)                 unsafe.read();            }        } catch (CancelledKeyException ignored) {            unsafe.close(unsafe.voidPromise());        }    }

可以看出对IO事件的具体处理,委托给NioUnsafe对象处理,由read()、forceFlush()、finishConnect()和close()方法处理具体的IO事件,具体的处理过程,我们将在分析NioUnsafe时讲解。

runAllTasks

该方法是在timeoutNanos时间内执行所有的普通任务。

protected boolean runAllTasks(long timeoutNanos) {        fetchFromScheduledTaskQueue();//从定时任务队列中读取任务。        Runnable task = pollTask();//拉取任务。        if (task == null) {//无任务则可以跳出。            afterRunningAllTasks();            return false;        }        final long deadline = ScheduledFutureTask.nanoTime() + timeoutNanos;//执行任务截至时间。        long runTasks = 0;//执行任务的个数。        long lastExecutionTime;        for (;;) {//循环执行任务。            safeExecute(task);//安全执行任务,会捕获异常,避免因为执行一个任务异常导致程序中断。            runTasks ++;            //每隔64个任务检查一次时间,由于nanoTime操作非常耗时,因此不能每次都检查。64这个值当前是硬编码的,无法配置,可能会成为一个问题。            if ((runTasks & 0x3F) == 0) {                lastExecutionTime = ScheduledFutureTask.nanoTime();                if (lastExecutionTime >= deadline) {                    break;                }            }            task = pollTask();//继续取下一个任务。            if (task == null) {//队列中无任务了则跳出循环。                lastExecutionTime = ScheduledFutureTask.nanoTime();                break;            }        }        //执行完所有任务后处理,继续执行尾部队列的任务。        afterRunningAllTasks();        this.lastExecutionTime = lastExecutionTime;//记录上次执行时间。        return true;    }

NioEventLoopGroup

NioEventLoopGroup的主要代码实现是模板方法newChild(),用来创建线程池中的单个线程,代码如下:

@Override    protected EventExecutor newChild(ThreadFactory threadFactory, Object... args)                    throws Exception {        return new NioEventLoop(this, threadFactory, (SelectorProvider) args[0],            ((SelectStrategyFactory) args[1]).newSelectStrategy(),             (RejectedExecutionHandler) args[2]);    }    public void setIoRatio(int ioRatio) {        for (EventExecutor e: this) {            ((NioEventLoop) e).setIoRatio(ioRatio);        }    }    public void rebuildSelectors() {        for (EventExecutor e: this) {            ((NioEventLoop) e).rebuildSelector();        }    }

该实现代码就是创建了一个NioEventLoop对象。

此外NioEventLoopGroup还提供了setIoRatio()和rebuildSelectors()两个方法,一个用来设置I/O任务和非I/O任务的执行时间比,一个用来重建线程中的selector来规避JDK的。其实现也是依次设置各线程的状态,故不再列出。

转载于:https://my.oschina.net/ywbrj042/blog/889748

你可能感兴趣的文章
isinstance, type, issubclass
查看>>
[扫雷][游戏] 交互*2
查看>>
Python函数
查看>>
python 开发技巧(4)-- 用PyCharm实用技巧(我自己常用的)
查看>>
Path Sum II
查看>>
[转载]STM32高级定时器(TIM1和TIM8)、通用定时器(TIMx) 、 基本定时器(TIM6和TIM7)的区别...
查看>>
初入前端2
查看>>
python ----列表、字符串、元组之间转换小结
查看>>
python基础之socket编程
查看>>
NYOJ 45( 分治,大数)
查看>>
网络语音技术
查看>>
【酷熊科技】工作积累 ----------- C#自动添加using引用命名空间
查看>>
通达信公式-涨幅限制
查看>>
VMware-Linux(RedHat 6.7)增加数据库使用空间(Linux&Linux LVM)
查看>>
左神算法进阶班3_1构造数组的MaxTree
查看>>
SQL Server中的锁类型及用法(转载)
查看>>
CodeVS 1008 选数(DFS)
查看>>
SQL Server日常积累
查看>>
C++强制类型转换
查看>>
搭建FTP服务器 window7
查看>>