张伦聪的技术博客 Research And Development

netty源码解析

2018-06-02

bio和nio和aio

关键字 解释 举例
同步 指的是用户进程触发IO操作并等待或者轮询的去查看IO操作是否就绪 自己上街买衣服,自己亲自干这件事,别的事干不了。
异步 异步是指用户进程触发IO操作以后便开始做自己的事情,而当IO操作已经完成的时候会得到IO完成的通知(异步的特点就是通知) 告诉朋友自己合适衣服的尺寸,大小,颜色,让朋友委托去卖,然后自己可以去干别的事。(使用异步IO时,Java将IO读写委托给OS处理,需要将数据缓冲区地址和大小传给OS)
阻塞 所谓阻塞方式的意思是指, 当试图对该文件描述符进行读写时, 如果当时没有东西可读,或者暂时不可写, 程序就进入等待 状态, 直到有东西可读或者可写为止 去公交站充值,发现这个时候,充值员不在(可能上厕所去了),然后我们就在这里等待,一直等到充值员回来为止。(当然现实社会,可不是这样,但是在计算机里确实如此。)
非阻塞 非阻塞状态下, 如果没有东西可读, 或者不可写, 读写函数马上返回, 而不会等待(非阻塞的特点就是不需要自己操作io,由内核自动完成) 银行里取款办业务时,领取一张小票,领取完后我们自己可以玩玩手机,或者与别人聊聊天,当轮我们时,银行的喇叭会通知,这时候我们就可以去了。

bio(同步阻塞IO)

同步并阻塞,服务器实现模式为一个连接一个线程,即客户端有连接请求时服务器端就需要启动一个线程进行处理,如果这个连接不做任何事情会造成不必要的线程开销,当然可以通过线程池机制改善。

nio(同步非阻塞/异步阻塞IO)

同步非阻塞,服务器实现模式为一个请求一个线程,即客户端发送的连接请求都会注册到多路复用器上,多路复用器轮询到连接有I/O请求时才启动一个线程进行处理。用户进程也需要时不时的询问IO操作是否就绪,这就要求用户进程不停的去询问。

异步阻塞,此种方式下是指应用发起一个IO操作以后,不等待内核IO操作的完成,等内核完成IO操作以后会通知应用程序,这其实就是同步和异步最关键的区别,同步必须等待或者主动的去询问IO是否完成,那么为什么说是阻塞的呢?因为此时是通过select系统调用来完成的,而select函数本身的实现方式是阻塞的,而采用select函数有个好处就是它可以同时监听多个文件句柄(如果从UNP的角度看,select属于同步操作。因为select之后,进程还需要读写数据),从而提高系统的并发性!

aio(异步非阻塞IO)

在此种模式下,用户进程只需要发起一个IO操作然后立即返回,等IO操作真正的完成以后,应用程序会得到IO操作完成的通知,此时用户进程只需要对数据进行处理就好了,不需要进行实际的IO读写操作,因为真正的IO读取或者写入操作已经由内核完成了。

reactor和proactor模式(线程模型)

reactor与nio

几乎所有网络连接都需要:read=》decode=》compute=》encode=》send这五个步骤。

反应器设计模式(Reactor pattern)是一种为处理并发服务请求,并将请求提交到一个或者多个服务处理程序的事件设计模式。当客户端请求抵达后,服务处理程序使用多路分配策略,由一个非阻塞的线程来接收所有的请求,然后派发这些请求至相关的工作线程进行处理。服务器端启动一条单线程,用于轮询IO操作是否就绪,当有就绪的才进行相应的读写操作,这样的话就减少了服务器产生大量的线程,也不会出现线程之间的切换产生的性能消耗。Reactor模式主要包含下面几部分内容。

  • Handles :表示操作系统管理的资源,我们可以理解为fd。

  • Synchronous Event Demultiplexer :同步事件分离器,阻塞等待Handles中的事件发生。

  • Initiation Dispatcher :初始分派器,作用为添加Event handler(事件处理器)、删除Event handler以及分派事件给Event handler。也就是说,Synchronous Event Demultiplexer负责等待新事件发生,事件发生时通知Initiation Dispatcher,然后Initiation Dispatcher调用event handler处理事件。

  • Event Handler :事件处理器的接口

  • Concrete Event Handler :事件处理器的实际实现,而且绑定了一个Handle。因为在实际情况中,我们往往不止一种事件处理器,因此这里将事件处理器接口和实现分开,与C++、Java这些高级语言中的多态类似。

传统非Reactor模式

这种模型由于IO在阻塞时会一直等待,因此在用户负载增加时,性能下降的非常快。 server导致阻塞的原因:

  • serversocket的accept方法,阻塞等待client连接,直到client连接成功。
  • 线程从socket inputstream读入数据,会进入阻塞状态,直到全部数据读完。
  • 线程向socket outputstream写入数据,会阻塞直到全部数据写完。

经典Reactor模式

采用基于事件驱动的设计,当有事件触发时,才会调用处理器进行数据处理。

  • Reactor 将I/O事件发派给对应的Handler
  • Acceptor 处理客户端连接请求
  • Handlers 执行非阻塞读/写

一般情况下,单核cpu,启一个线程,N核cpu,启N个线程,不断轮询查看系统io资源是否就绪,就绪启动相应线程进行read=》decode=》compute=》encode=》send操作。

多工作线程Reactor模式

考虑到工作线程的复用,将工作线程设计为线程池。使用线程池多线程处理业务逻辑。经典Reactor模式中,尽管一个线程可同时监控多个请求(Channel),但是所有读/写请求以及对新连接请求的处理都在同一个线程中处理,无法充分利用多CPU的优势,同时读/写操作也会阻塞对新连接请求的处理。因此可以引入多线程,并行处理多个读/写操作。

一般情况下,单核cpu,启一个线程,N核cpu,启N个线程,不断轮询查看系统io资源是否就绪,就绪系统进行read或send(事件处理器)操作,再从线程池启动线程decode/compute/encode处理业务逻辑。

多个Reactor模式

mainReactor负责监听连接,accept连接给subReactor处理,为什么要单独分一个Reactor来处理监听呢?因为像TCP这样需要经过3次握手才能建立连接,这个建立连接的过程也是要耗时间和资源的,单独分一个Reactor来处理,可以提高性能。mainReactor只有一个,负责响应client的连接请求,并建立连接,它使用一个NIO Selector;subReactor可以有一个或者多个,每个subReactor都会在一个独立线程中执行,并且维护一个独立的NIO Selector。这样保证了同一请求的所有状态和上下文在同一个线程中,避免了不必要的上下文切换,同时也方便了监控请求响应状态。

一般情况下,单核cpu,启一个线程,N核cpu,启N个线程,不断轮询查看系统io资源是否就绪,就绪系统进行read或send(事件处理器)操作,再从线程池启动线程decode/compute/encode处理业务逻辑。

proactor与aio

运用于异步I/O操作,Proactor模式中,应用程序不需要进行实际的读写过程,它只需要从缓存区读取或者写入即可,操作系统会读取缓存区或者写入缓存区到真正的IO设备.

netty核心组件

  • Bootstrap和ServerBootstrap:Netty应用程序通过设置bootstrap引导类来完成,该类提供了一个用于应用程序网络层配置的容器。Bootstrap服务端的是ServerBootstrap,客户端的是Bootstrap。
  • Channel:Netty 中的接口 Channel 定义了与 socket 丰富交互的操作集:bind, close, config, connect, isActive, isOpen, isWritable, read, write 等等。
  • ChannelHandler:ChannelHandler 支持很多协议,并且提供用于数据处理的容器,ChannelHandler由特定事件触发, 常用的一个接口是ChannelInboundHandler,该类型处理入站读数据(socket读事件)。
  • ChannelPipeline:ChannelPipeline 提供了一个容器给 ChannelHandler 链并提供了一个API 用于管理沿着链入站和出站事件的流动。每个 Channel 都有自己的ChannelPipeline,当 Channel 创建时自动创建的。
  • EventLoop:EventLoop 用于处理 Channel 的 I/O 操作。一个单一的 EventLoop通常会处理多个 Channel事件。一个 EventLoopGroup 可以含有多于一个的 EventLoop 和 提供了一种迭代用于检索清单中的下一个。
  • ChannelFuture:Netty 所有的 I/O 操作都是异步。因为一个操作可能无法立即返回,我们需要有一种方法在以后确定它的结果。出于这个目的,Netty 提供了接口 ChannelFuture,它的 addListener 方法注册了一个 ChannelFutureListener ,当操作完成时,可以被通知( 不管成功与否) 。

helloworld

public static void main(String[] args) throws Exception {
    EventLoopGroup bossGroup = new NioEventLoopGroup(1);
    EventLoopGroup workerGroup = new NioEventLoopGroup();

    try {
        ServerBootstrap boot = new ServerBootstrap();
        boot.group(bossGroup, workerGroup)
            .channel(NioServerSocketChannel.class)
            .localAddress(8080)
            .childHandler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel ch) throws Exception {
                    ch.pipeline().addLast(new EchoServerHandler());
                }
            });

        // Start the server.
        ChannelFuture f = boot.bind().sync();
        // Wait until the server socket is closed.
        f.channel().closeFuture().sync();
    } finally {
        // Shut down all event loops to terminate all threads.
        bossGroup.shutdownGracefully();
        workerGroup.shutdownGracefully();
    }
}

public class EchoServerHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        ByteBuf in = (ByteBuf) msg;
        System.out.println(in.toString(CharsetUtil.UTF_8));
        ctx.fireChannelRead(msg);
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) {
        ctx.flush();
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }
}

Channel实现概览

在Netty里,Channel是通讯的载体,而ChannelHandler负责Channel中的逻辑处理。

那么ChannelPipeline是什么呢?我觉得可以理解为ChannelHandler的容器:一个Channel包含一个ChannelPipeline,所有ChannelHandler都会注册到ChannelPipeline中,并按顺序组织起来。

在Netty中,ChannelEvent是数据或者状态的载体,例如传输的数据对应MessageEvent,状态的改变对应ChannelStateEvent。当对Channel进行操作时,会产生一个ChannelEvent,并发送到ChannelPipeline。ChannelPipeline会选择一个ChannelHandler进行处理。这个ChannelHandler处理之后,可能会产生新的ChannelEvent,并流转到下一个ChannelHandler。

例如,一个数据最开始是一个MessageEvent,它附带了一个未解码的原始二进制消息ChannelBuffer,然后某个Handler将其解码成了一个数据对象,并生成了一个新的MessageEvent,并传递给下一步进行处理。

ChannelPipeline的主流程

Netty的ChannelPipeline包含两条线路:Upstream和Downstream。Upstream对应上行,接收到的消息、被动的状态改变,都属于Upstream。Downstream则对应下行,发送的消息、主动的状态改变,都属于Downstream。ChannelPipeline接口包含了两个重要的方法:sendUpstream(ChannelEvent e)和sendDownstream(ChannelEvent e),就分别对应了Upstream和Downstream。

对应的,ChannelPipeline里包含的ChannelHandler也包含两类:ChannelUpstreamHandler和ChannelDownstreamHandler。每条线路的Handler是互相独立的。它们都很简单的只包含一个方法:ChannelUpstreamHandler.handleUpstream和ChannelDownstreamHandler.handleDownstream。

Netty官方的javadoc里有一张图(ChannelPipeline接口里),非常形象的说明了这个机制(我对原图进行了一点修改,加上了ChannelSink,因为我觉得这部分对理解代码流程会有些帮助):

在一条“流”里,一个ChannelEvent并不会主动的”流”经所有的Handler,而是由上一个Handler显式的调用ChannelPipeline.sendUp(Down)stream产生,并交给下一个Handler处理。也就是说,每个Handler接收到一个ChannelEvent,并处理结束后,如果需要继续处理,那么它需要调用sendUp(Down)stream新发起一个事件。如果它不再发起事件,那么处理就到此结束,即使它后面仍然有Handler没有执行。这个机制可以保证最大的灵活性,当然对Handler的先后顺序也有了更严格的要求。

netty启动流程

Netty的启动是从ServerBootstrap开始的,ServerBootstrap中有2个EventLoop,一个是bossEventLoop,另一个是workerEventLoop,前者负责处理连接请求,后者负责请求连接的读写事件。

EventLoopGroup是EventLoop的管理容器,二者的关系类似于线程池于线程的关系。

public NioEventLoopGroup() {
    this(0);
}
public NioEventLoopGroup(int nThreads) {
    this(nThreads, null);
}
public NioEventLoopGroup(int nThreads, ThreadFactory threadFactory) {
    this(nThreads, threadFactory, SelectorProvider.provider());
}
public NioEventLoopGroup(
        int nThreads, ThreadFactory threadFactory, final SelectorProvider selectorProvider) {
    this(nThreads, threadFactory, selectorProvider, DefaultSelectStrategyFactory.INSTANCE);
}
public NioEventLoopGroup(int nThreads, ThreadFactory threadFactory,
                         final SelectorProvider selectorProvider, final SelectStrategyFactory selectStrategyFactory) {
    super(nThreads, threadFactory, selectorProvider, selectStrategyFactory, RejectedExecutionHandlers.reject());
}

// MultithreadEventLoopGroup构造方法:
protected MultithreadEventLoopGroup(int nThreads, ThreadFactory threadFactory, Object... args) {
    // EFAULT_EVENT_LOOP_THREADS 为CPU个数的2倍。
    super(nThreads == 0? DEFAULT_EVENT_LOOP_THREADS : nThreads, threadFactory, args);
}

protected MultithreadEventExecutorGroup(int nThreads, ThreadFactory threadFactory, Object... args) {
    if (nThreads <= 0) {
        throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
    }

    if (threadFactory == null) {
        // 创建默认线程工厂类
        threadFactory = newDefaultThreadFactory();
    }

    /**
     * children:EventExecutor数组,保存eventLoop。
     * chooser:从children中选取一个eventLoop的策略。
     */
    children = new SingleThreadEventExecutor[nThreads];
    if (isPowerOfTwo(children.length)) {
        // 2的n次幂可以直接和children.length-1相&
        chooser = new PowerOfTwoEventExecutorChooser();
    } else {
        // 否则只能取余children.length
        chooser = new GenericEventExecutorChooser();
    }

    for (int i = 0; i < nThreads; i ++) {
        boolean success = false;
        try {
            // newChild就是NioEventLoopGroup中的newChild方法,children类型为NioEventLoop
            children[i] = newChild(threadFactory, args);
            success = true;
        } catch (Exception e) {
            // TODO: Think about if this is a good exception type
            throw new IllegalStateException("failed to create a child event loop", e);
        } finally {
            if (!success) {
                for (int j = 0; j < i; j ++) {
                    children[j].shutdownGracefully();
                }

                for (int j = 0; j < i; j ++) {
                    EventExecutor e = children[j];
                    try {
                        while (!e.isTerminated()) {
                            e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
                        }
                    } catch (InterruptedException interrupted) {
                        Thread.currentThread().interrupt();
                        break;
                    }
                }
            }
        }
    }

    final FutureListener<Object> terminationListener = new FutureListener<Object>() {
        @Override
        public void operationComplete(Future<Object> future) throws Exception {
            if (terminatedChildren.incrementAndGet() == children.length) {
                terminationFuture.setSuccess(null);
            }
        }
    };

    for (EventExecutor e: children) {
        e.terminationFuture().addListener(terminationListener);
    }
}

每个EventLoop都会维护一个selector和taskQueue,负责处理客户端请求和内部任务,内部任务如ServerSocketChannel注册和ServerSocket绑定操作等。

NioEventLoop(NioEventLoopGroup parent, ThreadFactory threadFactory, SelectorProvider selectorProvider,
             SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler) {
    super(parent, threadFactory, false, DEFAULT_MAX_PENDING_TASKS, rejectedExecutionHandler);
    if (selectorProvider == null) {
        throw new NullPointerException("selectorProvider");
    }
    if (strategy == null) {
        throw new NullPointerException("selectStrategy");
    }

    // 这里的openSelector是不是有一丝丝熟悉,标准Java NIO用法
    provider = selectorProvider;
    final SelectorTuple selectorTuple = openSelector();
    selector = selectorTuple.selector;
    unwrappedSelector = selectorTuple.unwrappedSelector;
    selectStrategy = strategy;
}

protected SingleThreadEventLoop(EventLoopGroup parent, ThreadFactory threadFactory,
                                boolean addTaskWakesUp, int maxPendingTasks,
                                RejectedExecutionHandler rejectedExecutionHandler) {
    super(parent, threadFactory, addTaskWakesUp, maxPendingTasks, rejectedExecutionHandler);
}

@SuppressWarnings("deprecation")
protected SingleThreadEventExecutor(
        EventExecutorGroup parent, ThreadFactory threadFactory, boolean addTaskWakesUp, int maxPendingTasks,
        RejectedExecutionHandler rejectedHandler) {
    if (threadFactory == null) {
        throw new NullPointerException("threadFactory");
    }

    this.parent = parent;
    this.addTaskWakesUp = addTaskWakesUp;

    thread = threadFactory.newThread(new Runnable() {
        @Override
        public void run() {
            boolean success = false;
            updateLastExecutionTime();
            try {
                // 对应NioEventLoop.run方法,该方法不断进行select和从tasksQueue中取出task去执行
                SingleThreadEventExecutor.this.run();
                success = true;
            } catch (Throwable t) {
                logger.warn("Unexpected exception from an event executor: ", t);
            } finally {
                for (;;) {
                    int oldState = STATE_UPDATER.get(SingleThreadEventExecutor.this);
                    if (oldState >= ST_SHUTTING_DOWN || STATE_UPDATER.compareAndSet(
                            SingleThreadEventExecutor.this, oldState, ST_SHUTTING_DOWN)) {
                        break;
                    }
                }
                // Check if confirmShutdown() was called at the end of the loop.
                if (success && gracefulShutdownStartTime == 0) {
                    logger.error(
                            "Buggy " + EventExecutor.class.getSimpleName() + " implementation; " +
                            SingleThreadEventExecutor.class.getSimpleName() + ".confirmShutdown() must be called " +
                            "before run() implementation terminates.");
                }

                try {
                    // Run all remaining tasks and shutdown hooks.
                    for (;;) {
                        if (confirmShutdown()) {
                            break;
                        }
                    }
                } finally {
                    try {
                        cleanup();
                    } finally {
                        STATE_UPDATER.set(SingleThreadEventExecutor.this, ST_TERMINATED);
                        threadLock.release();
                        if (!taskQueue.isEmpty()) {
                            logger.warn(
                                    "An event executor terminated with " +
                                    "non-empty task queue (" + taskQueue.size() + ')');
                        }

                        terminationFuture.setSuccess(null);
                    }
                }
            }
        }
    });
    threadProperties = new DefaultThreadProperties(thread);
    this.maxPendingTasks = Math.max(16, maxPendingTasks);
    // 创建LinkedBlockingQueue,队列容量最大为maxPendingTasks
    taskQueue = newTaskQueue();
    rejectedExecutionHandler = ObjectUtil.checkNotNull(rejectedHandler, "rejectedHandler");
}

到这里为止,NioEventLoop就初始化完毕了,先总结一下,NioEventLoop的初始化流程所做的主要工作如下:

  • 创建Selector,初始化一个线程,并在线程内部执行NioEventLoop类的run方法,该线程不会立马开始执行。
  • 使用LinkedBlockingQueue初始化taskQueue,taskQueue类型为LinkedBlockingQueue。
  • 初始化丢弃策略。

Netty启动流程完成了EventLoopGroup和EventLoop的初始化之后,就该开始ServerBootstrap的初始化和bind操作了,其中重要的步骤是进行bind操作。

public ChannelFuture bind(int inetPort) {
    return bind(new InetSocketAddress(inetPort));
}
public ChannelFuture bind(SocketAddress localAddress) {
    validate();
    if (localAddress == null) {
        throw new NullPointerException("localAddress");
    }
    return doBind(localAddress);
}

// Netty启动重要的流程都在这里了
private ChannelFuture doBind(final SocketAddress localAddress) {
    // 步骤1 initAndRegister操作
    final ChannelFuture regFuture = initAndRegister();
    final Channel channel = regFuture.channel();
    if (regFuture.cause() != null) {
        return regFuture;
    }

    // 步骤2 执行doBind0操作
    if (regFuture.isDone()) {
        // At this point we know that the registration was complete and successful.
        ChannelPromise promise = channel.newPromise();
        doBind0(regFuture, channel, localAddress, promise);
        return promise;
    } else {
        // Registration future is almost always fulfilled already, but just in case it's not.
        final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
        regFuture.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                Throwable cause = future.cause();
                if (cause != null) {
                    // Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
                    // IllegalStateException once we try to access the EventLoop of the Channel.
                    promise.setFailure(cause);
                } else {
                    // Registration was successful, so set the correct executor to use.
                    // See https://github.com/netty/netty/issues/2586
                    promise.executor = channel.eventLoop();

                    doBind0(regFuture, channel, localAddress, promise);
                }
            }
        });
        return promise;
    }
}

总结:

  • 创建并初始化bossEventLoopGroup和workerEventLoopGroup,其中会伴随着NioEventLoop的初始化。
  • 初始化ServerBootstrap,并启动或者初始化相关的类,比如Channel、ChannelHandler、UnSafe等。
  • 进行bind操作,主要register/bind这些操作都是通过任务形式听交给EventLoop然后由它来进行触发调用的。

如果觉得我的文章对您有用,请随意打赏。您的支持将鼓励我继续创作!

¥ 打赏博主

下一篇 55. 跳跃游戏

留言