Bootstrap

详细图解Netty Reactor启动全流程

本系列Netty源码解析文章基于 4.1.56.Final版本

大家第一眼看到这幅流程图,是不是脑瓜子嗡嗡的呢?

大家先不要惊慌,问题不大,本文笔者的目的就是要让大家清晰的理解这幅流程图,从而深刻的理解Netty Reactor的启动全流程,包括其中涉及到的各种代码设计实现细节。

在上篇文章中我们详细介绍了Netty服务端核心引擎组件以及的创建过程。最终我们得到了netty Reactor模型的运行骨架如下:

现在Netty服务端程序的骨架是搭建好了,本文我们就基于这个骨架来深入剖析下Netty服务端的启动过程。

我们继续回到上篇文章提到的Netty服务端代码模板中,在创建完主从Reactor线程组:,后,接下来就开始配置Netty服务端的启动辅助类了。

public final class EchoServer {
    static final int PORT = Integer.parseInt(System.getProperty("port", "8007"));

    public static void main(String[] args) throws Exception {
        // Configure the server.
        //创建主从Reactor线程组
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        final EchoServerHandler serverHandler = new EchoServerHandler();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)//配置主从Reactor
             .channel(NioServerSocketChannel.class)//配置主Reactor中的channel类型
             .option(ChannelOption.SO_BACKLOG, 100)//设置主Reactor中channel的option选项
             .handler(new LoggingHandler(LogLevel.INFO))//设置主Reactor中Channel->pipline->handler
             .childHandler(new ChannelInitializer() {//设置从Reactor中注册channel的pipeline
                 @Override
                 public void initChannel(SocketChannel ch) throws Exception {
                     ChannelPipeline p = ch.pipeline();
                     //p.addLast(new LoggingHandler(LogLevel.INFO));
                     p.addLast(serverHandler);
                 }
             });

            // Start the server. 绑定端口启动服务,开始监听accept事件
            ChannelFuture f = b.bind(PORT).sync();
            // Wait until the server socket is closed.
            f.channel().closeFuture().sync();
        } finally {
            // Shut down all event loops to terminate all threads.
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}

在上篇文章中我们对代码模板中涉及到的一些配置方法做了简单的介绍,大家如果忘记的话,可以在返回去回顾一下。

其实没有什么特别的逻辑,主要是对Netty启动过程中需要用到的一些核心信息进行配置管理,比如:

  • Netty的核心引擎组件。通过配置。

  • Netty服务端使用到的Channel类型: ,通过配置。以及配置时用到的。用于设置底层JDK NIO Socket的一些选项。通过进行配置。

主ReactorGroup中的MainReactor管理的Channel类型为,如图所示主要用来监听端口,接收客户端连接,为客户端创建初始化,然后采用轮询的方式从图中从ReactorGroup中选择一个SubReactor与该客户端进行绑定。

从ReactorGroup中的SubReactor管理的Channel类型为,它是netty中定义客户端连接的一个模型,每个连接对应一个。如图所示SubReactor负责监听处理绑定在其上的所有上的IO事件。

  • 保存服务端和客户端对应中指定的。用于后续Channel向Reactor注册成功之后,初始化Channel里的pipeline。

不管是服务端用到的还是客户端用到的,每个都会有一个,中有多个用于编排处理对应上感兴趣的。

结构中包含了netty服务端程序启动的所有配置信息,在我们介绍启动流程之前,先来看下的源码结构:

ServerBootstrap

的继承结构比较简单,继承层次的职责分工也比较明确。

主要负责对相关的配置进行管理,其中带是对的相关配置管理。中的负责管理的客户端相关配置存储在结构中。

父类则是主要负责对相关的配置进行管理,以及中的负责处理的服务端相关的配置管理。

1. 配置主从Reactor线程组

ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)//配置主从Reactor

public class ServerBootstrap extends AbstractBootstrap {

     //Main Reactor线程组
    volatile EventLoopGroup group;
    //Sub Reactor线程组
    private volatile EventLoopGroup childGroup;

    public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) {
        //父类管理主Reactor线程组
        super.group(parentGroup);
        if (this.childGroup != null) {
            throw new IllegalStateException("childGroup set already");
        }
        this.childGroup = ObjectUtil.checkNotNull(childGroup, "childGroup");
        return this;
    }

}

2. 配置服务端ServerSocketChannel

ServerBootstrap b = new ServerBootstrap();
b.channel(NioServerSocketChannel.class);

public class ServerBootstrap extends AbstractBootstrap {

    //用于创建ServerSocketChannel  ReflectiveChannelFactory
    private volatile ChannelFactory channelFactory;

    public B channel(Class channelClass) {
        return channelFactory(new ReflectiveChannelFactory(
                ObjectUtil.checkNotNull(channelClass, "channelClass")
        ));
    }

    @Deprecated
    public B channelFactory(ChannelFactory channelFactory) {
        ObjectUtil.checkNotNull(channelFactory, "channelFactory");
        if (this.channelFactory != null) {
            throw new IllegalStateException("channelFactory set already");
        }

        this.channelFactory = channelFactory;
        return self();
    }

}

在向配置服务端的方法中,其实是创建了一个工厂实例,在Netty服务端启动的过程中,会通过这个去创建相应的实例。

我们可以通过这个方法来配置netty的IO模型,下面为在不同IO模型下的实现:

Reactor线程组在不同IO模型下的实现:

我们只需要将的这些核心接口对应的实现类改为对应的前缀,就可以轻松在Netty中完成对的切换。

2.1 ReflectiveChannelFactory

public class ReflectiveChannelFactory implements ChannelFactory {
    //NioServerSocketChannelde 构造器
    private final Constructor constructor;

    public ReflectiveChannelFactory(Class clazz) {
        ObjectUtil.checkNotNull(clazz, "clazz");
        try {
            //反射获取NioServerSocketChannel的构造器
            this.constructor = clazz.getConstructor();
        } catch (NoSuchMethodException e) {
            throw new IllegalArgumentException("Class " + StringUtil.simpleClassName(clazz) +
                    " does not have a public non-arg constructor", e);
        }
    }

    @Override
    public T newChannel() {
        try {
            //创建NioServerSocketChannel实例
            return constructor.newInstance();
        } catch (Throwable t) {
            throw new ChannelException("Unable to create Channel from class " + constructor.getDeclaringClass(), t);
        }
    }
}

从类的签名我们可以看出,这个工厂类是通过加的方式来创建对应的实例。

  • 泛型参数表示的是要通过工厂类创建的,这里我们初始化的是。

  • 在的构造器中通过的方式获取的构造器。

  • 在方法中通过构造器反射创建实例。

注意这时只是配置阶段,此时并未被创建。它是在启动的时候才会被创建出来。

3. 为NioServerSocketChannel配置ChannelOption

ServerBootstrap b = new ServerBootstrap();
//设置被MainReactor管理的NioServerSocketChannel的Socket选项
b.option(ChannelOption.SO_BACKLOG, 100)

public abstract class AbstractBootstrap, C extends Channel> implements Cloneable {

    //serverSocketChannel中的ChannelOption配置
    private final Map, Object> options = new LinkedHashMap, Object>();

    public  B option(ChannelOption option, T value) {
        ObjectUtil.checkNotNull(option, "option");
        synchronized (options) {
            if (value == null) {
                options.remove(option);
            } else {
                options.put(option, value);
            }
        }
        return self();
    }
}

无论是服务端的还是客户端的它们的相关底层Socket选项配置全部存放于一个类型的数据结构中。

由于客户端是由中的来负责处理,所以涉及到客户端所有的方法和配置全部是以前缀开头。

ServerBootstrap b = new ServerBootstrap();
.childOption(ChannelOption.TCP_NODELAY, Boolean.TRUE)

public class ServerBootstrap extends AbstractBootstrap {

   //客户端SocketChannel对应的ChannelOption配置
    private final Map, Object> childOptions = new LinkedHashMap, Object>();

    public  ServerBootstrap childOption(ChannelOption childOption, T value) {
        ObjectUtil.checkNotNull(childOption, "childOption");
        synchronized (childOptions) {
            if (value == null) {
                childOptions.remove(childOption);
            } else {
                childOptions.put(childOption, value);
            }
        }
        return this;
    }
}

相关的底层Socket选项,netty全部枚举在ChannelOption类中,笔者这里就不一一列举了,在本系列后续相关的文章中,笔者还会为大家详细的介绍这些参数的作用。

public class ChannelOption extends AbstractConstant> {

    ..................省略..............

    public static final ChannelOption SO_BROADCAST = valueOf("SO_BROADCAST");
    public static final ChannelOption SO_KEEPALIVE = valueOf("SO_KEEPALIVE");
    public static final ChannelOption SO_SNDBUF = valueOf("SO_SNDBUF");
    public static final ChannelOption SO_RCVBUF = valueOf("SO_RCVBUF");
    public static final ChannelOption SO_REUSEADDR = valueOf("SO_REUSEADDR");
    public static final ChannelOption SO_LINGER = valueOf("SO_LINGER");
    public static final ChannelOption SO_BACKLOG = valueOf("SO_BACKLOG");
    public static final ChannelOption SO_TIMEOUT = valueOf("SO_TIMEOUT");

    ..................省略..............

}

4. 为服务端NioServerSocketChannel中的Pipeline配置ChannelHandler

    //serverSocketChannel中pipeline里的handler(主要是acceptor)
    private volatile ChannelHandler handler;

    public B handler(ChannelHandler handler) {
        this.handler = ObjectUtil.checkNotNull(handler, "handler");
        return self();
    }

向中的添加分为两种方式:

  • 显式添加的方式是由用户在main线程中通过的方式添加。如果需要添加多个,则可以通过向中进行添加。

关于后面笔者会有详细介绍,这里大家只需要知道是一种特殊的,用于初始化。适用于向pipeline中添加多个ChannelHandler的场景。

            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)//配置主从Reactor
             .channel(NioServerSocketChannel.class)//配置主Reactor中的channel类型
             .handler(new ChannelInitializer() {
                 @Override
                 protected void initChannel(NioServerSocketChannel ch) throws Exception {
                     ChannelPipeline p = ch.pipeline();
                     p.addLast(channelhandler1)
                      .addLast(channelHandler2)
                      
                      ......
                     
                      .addLast(channelHandler3);
                 }
             })

  • 隐式添加主要添加的就是的核心组件也就是下图中的,Netty中的实现为,本质上也是一种,主要负责在客户端连接建立好后,初始化客户端,在选取一个,将客户端 注册到中的上。

隐式添加是由Netty框架在启动的时候负责添加,用户无需关心。

在本例中,的中只有两个,一个由用户在外部显式添加的,另一个是由Netty框架隐式添加的。

其实我们在实际项目使用的过程中,不会向netty服务端添加额外的ChannelHandler,只需要专心做好自己最重要的本职工作接收客户端连接就好了。这里额外添加一个只是为了向大家展示的配置方法。

5. 为客户端NioSocketChannel中的Pipeline配置ChannelHandler

            final EchoServerHandler serverHandler = new EchoServerHandler();

            serverBootstrap.childHandler(new ChannelInitializer() {//设置从Reactor中注册channel的pipeline
                 @Override
                 public void initChannel(SocketChannel ch) throws Exception {
                     ChannelPipeline p = ch.pipeline();
            
                     p.addLast(new LoggingHandler(LogLevel.INFO));
                     p.addLast(serverHandler);
                 }
             });

    //socketChannel中pipeline中的处理handler
    private volatile ChannelHandler childHandler;

    public ServerBootstrap childHandler(ChannelHandler childHandler) {
        this.childHandler = ObjectUtil.checkNotNull(childHandler, "childHandler");
        return this;
    }

向客户端中的里添加完全是由用户自己控制显式添加,添加的数量不受限制。

由于在Netty的中,是由单个负责执行客户端中的,一个负责处理多个上的,如果中的添加的太多,就会影响执行其他上的,从而降低,降低吞吐量。

所以中的不易添加过多,并且不能再中执行耗时的业务处理任务。

在我们通过配置netty服务端启动信息的时候,无论是向服务端的pipeline中添加ChannelHandler,还是向客户端的pipeline中添加ChannelHandler,当涉及到多个ChannelHandler添加的时候,我们都会用到,那么这个究竟是何方圣神,为什么要这样做呢?我们接着往下看~~

ChannelInitializer

首先它继承于,它自己本身就是一个ChannelHandler,所以它可以添加到中。

其他的父类大家这里可以不用管,后面文章中笔者会一一为大家详细介绍。

那为什么不直接添加而是选择用呢?

这里主要有两点原因:

  • 前边我们提到,客户端是在服务端accept连接后,在服务端中被创建出来的。但是此时我们正处于配置阶段,服务端还没有启动,更没有客户端连接上来,此时客户端还没有被创建出来,所以也就没办法向客户端的pipeline中添加。

  • 客户端中里可以添加任意多个,但是Netty框架无法预知用户到底需要添加多少个,所以Netty框架提供了回调函数,使用户可以自定义的添加行为。

当客户端注册到对应的上后,紧接着就会初始化中的,此时Netty框架会回调执行用户自定义的添加逻辑。

public abstract class ChannelInitializer extends ChannelInboundHandlerAdapter {

    @Override
    @SuppressWarnings("unchecked")
    public final void channelRegistered(ChannelHandlerContext ctx) throws Exception {
        //当channelRegister事件发生时,调用initChannel初始化pipeline
        if (initChannel(ctx)) {
                 .................省略...............
        } else {
                 .................省略...............
        }
    }

    private boolean initChannel(ChannelHandlerContext ctx) throws Exception {
        if (initMap.add(ctx)) { // Guard against re-entrance.
            try {
                //此时客户单NioSocketChannel已经创建并初始化好了
                initChannel((C) ctx.channel());
            } catch (Throwable cause) {
                 .................省略...............
            } finally {
                  .................省略...............
            }
            return true;
        }
        return false;
    }

    protected abstract void initChannel(C ch) throws Exception;
    
    .................省略...............
}

这里由netty框架回调的正是我们自定义的添加逻辑。

            final EchoServerHandler serverHandler = new EchoServerHandler();

            serverBootstrap.childHandler(new ChannelInitializer() {//设置从Reactor中注册channel的pipeline
                 @Override
                 public void initChannel(SocketChannel ch) throws Exception {
                     ChannelPipeline p = ch.pipeline();
            
                     p.addLast(new LoggingHandler(LogLevel.INFO));
                     p.addLast(serverHandler);
                 }
             });

到此为止,Netty服务端启动所需要的必要配置信息,已经全部存入启动辅助类中。

接下来要做的事情就是服务端的启动了。

// Start the server. 绑定端口启动服务,开始监听accept事件
ChannelFuture f = serverBootStrap.bind(PORT).sync();

Netty服务端的启动

经过前面的铺垫终于来到了本文的核心内容----Netty服务端的启动过程。

如代码模板中的示例所示,Netty服务端的启动过程封装在函数中。

接下来我们看一下Netty服务端在启动过程中究竟干了哪些事情?

大家看到这副启动流程图先不要慌,接下来的内容笔者会带大家各个击破它,在文章的最后保证让大家看懂这副流程图。

我们先来从netty服务端启动的入口函数开始我们今天的源码解析旅程:

    public ChannelFuture bind(int inetPort) {
        return bind(new InetSocketAddress(inetPort));
    }

    public ChannelFuture bind(SocketAddress localAddress) {
        //校验Netty核心组件是否配置齐全
        validate();
        //服务端开始启动,绑定端口地址,接收客户端连接
        return doBind(ObjectUtil.checkNotNull(localAddress, "localAddress"));
    }

   private ChannelFuture doBind(final SocketAddress localAddress) {
        //异步创建,初始化,注册ServerSocketChannel到main reactor上
        final ChannelFuture regFuture = initAndRegister();
        final Channel channel = regFuture.channel();
        if (regFuture.cause() != null) {
            return regFuture;
        }

        if (regFuture.isDone()) {   

           ........serverSocketChannel向Main Reactor注册成功后开始绑定端口....,               
             
        } else {
            //如果此时注册操作没有完成,则向regFuture添加operationComplete回调函数,注册成功后回调。
            regFuture.addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {

                   ........serverSocketChannel向Main Reactor注册成功后开始绑定端口...., 
            });
            return promise;
        }
    }

Netty服务端的启动流程总体如下:

  • 创建服务端并初始化。

  • 将服务端注册到中。

  • 注册成功后,开始初始化中的pipeline,然后在pipeline中触发channelRegister事件。

  • 随后由绑定端口地址。

  • 绑定端口地址成功后,向对应的中触发传播,在中向注册,开始等待客户端连接。服务端启动完成。

当netty服务端启动成功之后,最终我们会得到如下结构的阵型,开始枕戈待旦,准备接收客户端的连接,Reactor开始运转。

接下来,我们就来看下Netty源码是如何实现以上步骤的~~

1. initAndRegister

    final ChannelFuture initAndRegister() {
        Channel channel = null;
        try {
            //创建NioServerSocketChannel
            //ReflectiveChannelFactory通过泛型,反射,工厂的方式灵活创建不同类型的channel
            channel = channelFactory.newChannel();
            //初始化NioServerSocketChannel
            init(channel);
        } catch (Throwable t) {
            ..............省略.................
        }

        //向MainReactor注册ServerSocketChannel
        ChannelFuture regFuture = config().group().register(channel);

           ..............省略.................

        return regFuture;
    }

从函数命名中我们可以看出,这个函数主要做的事情就是首先创建,并对进行初始化,最后将注册到中。

1.1 创建NioServerSocketChannel

还记得我们在介绍启动辅助类配置服务端类型的时候提到的工厂类吗?

因为当时我们在配置启动辅助类的时候,还没到启动阶段,而配置阶段并不是创建具体的时机。

所以Netty通过将要创建的的类型(通过泛型指定)以及 创建的过程()统统先封装在工厂类中。

通过,,的方式创建不同类型的

等待创建时机来临,我们调用保存在中的直接进行创建。

public class ReflectiveChannelFactory implements ChannelFactory {

    private final Constructor constructor;

    @Override
    public T newChannel() {
        try {
            return constructor.newInstance();
        } catch (Throwable t) {
            throw new ChannelException("Unable to create Channel from class " + constructor.getDeclaringClass(), t);
        }
    }
}

下面我们来看下的构建过程:

1.1.1 NioServerSocketChannel

public class NioServerSocketChannel extends AbstractNioMessageChannel
                             implements io.netty.channel.socket.ServerSocketChannel {

    //SelectorProvider(用于创建Selector和Selectable Channels)
    private static final SelectorProvider DEFAULT_SELECTOR_PROVIDER = SelectorProvider.provider();

    public NioServerSocketChannel() {
        this(newSocket(DEFAULT_SELECTOR_PROVIDER));
    }

    //创建JDK NIO ServerSocketChannel
    private static ServerSocketChannel newSocket(SelectorProvider provider) {
        try {
            return provider.openServerSocketChannel();
        } catch (IOException e) {
            throw new ChannelException(
                    "Failed to open a server socket.", e);
        }
    }

     //ServerSocketChannel相关的配置
    private final ServerSocketChannelConfig config;

    public NioServerSocketChannel(ServerSocketChannel channel) {
        //父类AbstractNioChannel中保存JDK NIO原生ServerSocketChannel以及要监听的事件OP_ACCEPT
        super(null, channel, SelectionKey.OP_ACCEPT);
        //DefaultChannelConfig中设置用于Channel接收数据用的buffer->AdaptiveRecvByteBufAllocator
        config = new NioServerSocketChannelConfig(this, javaChannel().socket());
    }

}

  • 首先调用创建JDK NIO 原生,这里调用了来创建JDK NIO 原生,我们在上篇文章中详细的介绍了相关内容,当时是用来创建中的。大家还记得吗??

  • 通过父类构造器设置感兴趣的,这里设置的是事件。并将JDK NIO 原生封装起来。

  • 创建的配置类,在配置类中封装了对的一些配置行为,以及JDK中的。以及创建接收数据用的分配器。

没什么重要的东西,我们这里也不必深究,它就是管理相关的配置,这里唯一需要大家注意的是这个用于接收数据用的AdaptiveRecvByteBufAllocator,我们后面在介绍Netty如何接收连接的时候还会提到。

的整体构建过程介绍完了,现在我们来按照继承层次再回过头来看下的层次构建,来看下每一层都创建了什么,封装了什么,这些信息都是的核心信息,所以有必要了解一下。

在的创建过程中,我们主要关注继承结构图中红框标注的三个类,其他的我们占时先不用管。

其中主要是对底层读写行为的封装和定义,比如accept接收客户端连接。这个我们后续会介绍到,这里我们并不展开。

1.1.2 AbstractNioChannel

public abstract class AbstractNioChannel extends AbstractChannel {
   //JDK NIO原生Selectable Channel
    private final SelectableChannel ch;
    // Channel监听事件集合 这里是SelectionKey.OP_ACCEPT事件
    protected final int readInterestOp;

    protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
        super(parent);
        this.ch = ch;
        this.readInterestOp = readInterestOp;
        try {
            //设置Channel为非阻塞 配合IO多路复用模型
            ch.configureBlocking(false);
        } catch (IOException e) {
            .............省略................
        }
    }
}

  • 封装由创建出来的JDK NIO原生。

  • 封装在创建时指定感兴趣的,对于来说感兴趣的为。

  • 设置JDK NIO原生为非阻塞模式, 配合IO多路复用模型。

1.1.3 AbstractChannel

public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {

    //channel是由创建层次的,比如ServerSocketChannel 是 SocketChannel的 parent
    private final Channel parent;
    //channel全局唯一ID machineId+processId+sequence+timestamp+random
    private final ChannelId id;
    //unsafe用于封装对底层socket的相关操作
    private final Unsafe unsafe;
    //为channel分配独立的pipeline用于IO事件编排
    private final DefaultChannelPipeline pipeline;

    protected AbstractChannel(Channel parent) {
        this.parent = parent;
        //channel全局唯一ID machineId+processId+sequence+timestamp+random
        id = newId();
        //unsafe用于定义实现对Channel的底层操作
        unsafe = newUnsafe();
        //为channel分配独立的pipeline用于IO事件编排
        pipeline = newChannelPipeline();
    }
}

  • Netty中的是有层次的,这里的用来保存上一级的,比如这里的是顶级,所以它的。客户端是由创建的,所以它的。

  • 为分配全局唯一的。由机器Id(),进程Id(),序列号(),时间戳(),随机数()构成

   private DefaultChannelId() {
        data = new byte[MACHINE_ID.length + PROCESS_ID_LEN + SEQUENCE_LEN + TIMESTAMP_LEN + RANDOM_LEN];
        int i = 0;

        // machineId
        System.arraycopy(MACHINE_ID, 0, data, i, MACHINE_ID.length);
        i += MACHINE_ID.length;

        // processId
        i = writeInt(i, PROCESS_ID);

        // sequence
        i = writeInt(i, nextSequence.getAndIncrement());

        // timestamp (kind of)
        i = writeLong(i, Long.reverse(System.nanoTime()) ^ System.currentTimeMillis());

        // random
        int random = PlatformDependent.threadLocalRandom().nextInt();
        i = writeInt(i, random);
        assert i == data.length;

        hashCode = Arrays.hashCode(data);
    }

  • 创建的底层操作类。这里创建的是。

为的一个内部接口,用于定义实现对Channel底层的各种操作,定义的操作行为只能由Netty框架的调用,用户线程禁止调用。

interface Unsafe {
        
        //分配接收数据用的Buffer
        RecvByteBufAllocator.Handle recvBufAllocHandle();

        //服务端绑定的端口地址
        SocketAddress localAddress();
        //远端地址
        SocketAddress remoteAddress();
        //channel向Reactor注册
        void register(EventLoop eventLoop, ChannelPromise promise);

        //服务端绑定端口地址
        void bind(SocketAddress localAddress, ChannelPromise promise);
        //客户端连接服务端
        void connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise);
        //关闭channle
        void close(ChannelPromise promise);
        //读数据
        void beginRead();
        //写数据
        void write(Object msg, ChannelPromise promise);

    }

  • 为分配独立的用于IO事件编排。其实是一个类型的双向链表。头结点,尾结点。中包装着。

保存 ChannelHandler上下文信息,用于事件传播。后面笔者会单独开一篇文章介绍,这里我们还是聚焦于启动主线。

这里只是为了让大家简单理解的一个大致的结构,后面会写一篇文章专门详细讲解。

    protected DefaultChannelPipeline(Channel channel) {
        this.channel = ObjectUtil.checkNotNull(channel, "channel");
        succeededFuture = new SucceededChannelFuture(channel, null);
        voidPromise =  new VoidChannelPromise(channel, true);

        tail = new TailContext(this);
        head = new HeadContext(this);

        head.next = tail;
        tail.prev = head;
    }

到了这里就创建完毕了,我们来回顾下它到底包含了哪些核心信息。

1.2 初始化NioServerSocketChannel

   void init(Channel channel) {
        //向NioServerSocketChannelConfig设置ServerSocketChannelOption
        setChannelOptions(channel, newOptionsArray(), logger);
        //向netty自定义的NioServerSocketChannel设置attributes
        setAttributes(channel, attrs0().entrySet().toArray(EMPTY_ATTRIBUTE_ARRAY));

        ChannelPipeline p = channel.pipeline();
        
        //获取从Reactor线程组
        final EventLoopGroup currentChildGroup = childGroup;
        //获取用于初始化客户端NioSocketChannel的ChannelInitializer
        final ChannelHandler currentChildHandler = childHandler;
        //获取用户配置的客户端SocketChannel的channelOption以及attributes
        final Entry, Object>[] currentChildOptions;
        synchronized (childOptions) {
            currentChildOptions = childOptions.entrySet().toArray(EMPTY_OPTION_ARRAY);
        }
        final Entry, Object>[] currentChildAttrs = childAttrs.entrySet().toArray(EMPTY_ATTRIBUTE_ARRAY);

        //向NioServerSocketChannel中的pipeline添加初始化ChannelHandler的逻辑
        p.addLast(new ChannelInitializer() {
            @Override
            public void initChannel(final Channel ch) {
                final ChannelPipeline pipeline = ch.pipeline();
                //ServerBootstrap中用户指定的channelHandler
                ChannelHandler handler = config.handler();
                if (handler != null) {
                    //LoggingHandler
                    pipeline.addLast(handler);
                }
                //添加用于接收客户端连接的acceptor
                ch.eventLoop().execute(new Runnable() {
                    @Override
                    public void run() {
                        pipeline.addLast(new ServerBootstrapAcceptor(
                                ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
                    }
                });
            }
        });
    }

  • 向设置。

  • 向netty自定义的设置

Netty自定义的类型均继承接口以及类,正是它们定义了。用于向添加用户自定义的一些信息。

这个的用处大有可为,Netty后边的许多特性都是依靠这个来实现的。这里先卖个关子,大家可以自己先想一下可以用这个做哪些事情?

  • 获取从Reactor线程组,以及用于初始化客户端的,,,这些信息均是由用户在启动的时候向添加的客户端配置信息。这里用这些信息来初始化。因为后续会在中接收客户端连接以及创建。

  • 向中的添加用于初始化的。

问题来了,这里为什么不干脆直接将添加到中,而是又使用到了呢?

其实原因有两点:

  • 为了保证地初始化,所以初始化的动作需要由进行,而当前线程是的 并Reactor线程。这里不能立即初始化。

  • 初始化中的动作,需要等到注册到对应的中才可以进行初始化,当前只是创建好了,但并未注册到上。

初始化中的时机是:当注册到之后,绑定端口地址之前。

前边在介绍配置时也用到了,还记得吗??

问题又来了,大家注意下方法,在该初始化回调方法中,添加LoggingHandler是直接向pipeline中添加,而添加Acceptor为什么不是直接添加而是封装成异步任务呢?

这里先给大家卖个关子,笔者会在后续流程中为大家解答~~~~~

此时中的结构如下图所示:

1.3 向Main Reactor注册NioServerSocketChannel

从获取主Reactor线程组,将注册到中。

ChannelFuture regFuture = config().group().register(channel);

下面我们来看下具体的注册过程:

1.3.1 主Reactor线程组中选取一个Main Reactor进行注册

    @Override
    public ChannelFuture register(Channel channel) {
        return next().register(channel);
    }

    @Override
    public EventExecutor next() {
        return chooser.next();
    }

    //获取绑定策略
    @Override
    public EventExecutorChooser newChooser(EventExecutor[] executors) {
        if (isPowerOfTwo(executors.length)) {
            return new PowerOfTwoEventExecutorChooser(executors);
        } else {
            return new GenericEventExecutorChooser(executors);
        }
    }
    
    //采用轮询round-robin的方式选择Reactor
    @Override
    public EventExecutor next() {
            return executors[(int) Math.abs(idx.getAndIncrement() % executors.length)];
    }

Netty通过方法根据上篇文章提到的,从中选取一个Reactor进行注册绑定。之后生命周期内的所有都由这个 负责处理,如 等 IO 事件。

一个只能绑定到一个上,一个负责监听。

由于这里是向进行注册绑定,所以主要负责处理的是事件。

1.3.2 向绑定后的Main Reactor进行注册

向进行注册的行为定义在的父类中,印象模糊的同学可以在回看下上篇文章中的小节内容。

public abstract class SingleThreadEventLoop extends SingleThreadEventExecutor implements EventLoop {

    @Override
    public ChannelFuture register(Channel channel) {
        //注册channel到绑定的Reactor上
        return register(new DefaultChannelPromise(channel, this));
    }

    @Override
    public ChannelFuture register(final ChannelPromise promise) {
        ObjectUtil.checkNotNull(promise, "promise");
        //unsafe负责channel底层的各种操作
        promise.channel().unsafe().register(this, promise);
        return promise;
    }
}

通过中的执行底层具体的注册动作。

protected abstract class AbstractUnsafe implements Unsafe {

        /**
         * 注册Channel到绑定的Reactor上
         * */
        @Override
        public final void register(EventLoop eventLoop, final ChannelPromise promise) {
            ObjectUtil.checkNotNull(eventLoop, "eventLoop");
            if (isRegistered()) {
                promise.setFailure(new IllegalStateException("registered to an event loop already"));
                return;
            }
            //EventLoop的类型要与Channel的类型一样  Nio Oio Aio
            if (!isCompatible(eventLoop)) {
                promise.setFailure(
                        new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
                return;
            }

            //在channel上设置绑定的Reactor
            AbstractChannel.this.eventLoop = eventLoop;

            /**
             * 执行channel注册的操作必须是Reactor线程来完成
             *
             * 1: 如果当前执行线程是Reactor线程,则直接执行register0进行注册
             * 2:如果当前执行线程是外部线程,则需要将register0注册操作 封装程异步Task 由Reactor线程执行
             * */
            if (eventLoop.inEventLoop()) {
                register0(promise);
            } else {
                try {
                    eventLoop.execute(new Runnable() {
                        @Override
                        public void run() {
                            register0(promise);
                        }
                    });
                } catch (Throwable t) {
                   ...............省略...............
                }
            }
        }
}

  • 首先检查是否已经完成注册。如果以完成注册,则直接设置代表注册操作结果的为。

  • 通过方法验证Reactor模型是否与的类型匹配。对应于。

上篇文章我们介绍过 Netty对三种:的支持,用户可以通过改变Netty核心类的前缀轻松切换。方法目的就是需要保证和使用的是同一种。

  • 在中保存其绑定的。

  • 执行向注册的动作必须要确保是在中执行。

  • 如果当前线程是则直接执行注册动作

  • 如果当前线程不是,则需要将注册动作封装成异步任务,存放在中的中,等待执行。

当前执行线程并不是,而是用户程序的启动线程。

1.3.3 Reactor线程的启动

上篇文章中我们在介绍的创建过程中提到了一个构造器参数,它用于启动,类型为。

当时笔者向大家卖了一个关子~~

那么现在就到了为大家揭晓谜底的时候了~~

的启动是在向提交第一个异步任务的时候启动的。

Netty中的主Reactor线程组中的Main Reactor是在用户程序向提交用于注册的异步任务时开始启动。

   eventLoop.execute(new Runnable() {
                        @Override
                        public void run() {
                            register0(promise);
                        }
                    });

接下来我们关注下的

public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor implements OrderedEventExecutor {

    @Override
    public void execute(Runnable task) {
        ObjectUtil.checkNotNull(task, "task");
        execute(task, !(task instanceof LazyRunnable) && wakesUpForTask(task));
    }

    private void execute(Runnable task, boolean immediate) {
        //当前线程是否为Reactor线程
        boolean inEventLoop = inEventLoop();
        //addTaskWakesUp = true  addTask唤醒Reactor线程执行任务
        addTask(task);
        if (!inEventLoop) {
            //如果当前线程不是Reactor线程,则启动Reactor线程
            //这里可以看出Reactor线程的启动是通过 向NioEventLoop添加异步任务时启动的
            startThread();

            .....................省略.....................
        }
        .....................省略.....................
    }

}

  • 首先将异步任务添加到中的中。

  • 判断当前线程是否为,此时当前执行线程为用户程序启动线程,所以这里调用启动。

1.3.4 startThread

public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor implements OrderedEventExecutor {
    //定义Reactor线程状态
    private static final int ST_NOT_STARTED = 1;
    private static final int ST_STARTED = 2;
    private static final int ST_SHUTTING_DOWN = 3;
    private static final int ST_SHUTDOWN = 4;
    private static final int ST_TERMINATED = 5;

     //Reactor线程状态  初始为 未启动状态
    private volatile int state = ST_NOT_STARTED;

    //Reactor线程状态字段state 原子更新器
    private static final AtomicIntegerFieldUpdater STATE_UPDATER =
    AtomicIntegerFieldUpdater.newUpdater(SingleThreadEventExecutor.class, "state");

    private void startThread() {
        if (state == ST_NOT_STARTED) {
            if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
                boolean success = false;
                try {
                    doStartThread();
                    success = true;
                } finally {
                    if (!success) {
                        STATE_UPDATER.compareAndSet(this, ST_STARTED, ST_NOT_STARTED);
                    }
                }
            }
        }
    }

}

  • 初始化状态为,首先更新状态为

  • 启动

  • 启动失败的话,需要将状态改回

    //ThreadPerTaskExecutor 用于启动Reactor线程
    private final Executor executor;

    private void doStartThread() {
        assert thread == null;
        executor.execute(new Runnable() {
            @Override
            public void run() {
                thread = Thread.currentThread();
                if (interrupted) {
                    thread.interrupt();
                }

                boolean success = false;
                updateLastExecutionTime();
                try {
                    //Reactor线程开始启动
                    SingleThreadEventExecutor.this.run();
                    success = true;
                }
              
                ................省略..............
        }

这里就来到了类型的的用武之地了。

  • 的核心工作之前介绍过:,,。Netty将这些核心工作封装在方法中。

  • 将封装在异步任务中,提交给执行,线程至此开始工作了就。

public final class ThreadPerTaskExecutor implements Executor {
    private final ThreadFactory threadFactory;

    @Override
    public void execute(Runnable command) {
        //启动Reactor线程
        threadFactory.newThread(command).start();
    }
}

此时已经启动,后面的工作全部都由这个来负责执行了。

而用户启动线程在向提交完的注册任务后,就逐步退出调用堆栈,回退到最开始的启动入口处。

此时中的任务队列中只有一个任务,启动后,会从任务队列中取出任务执行。

至此的注册工作正式拉开帷幕~~

1.3.5 register0

       //true if the channel has never been registered, false otherwise 
        private boolean neverRegistered = true;

        private void register0(ChannelPromise promise) {
            try {
                //查看注册操作是否已经取消,或者对应channel已经关闭
                if (!promise.setUncancellable() || !ensureOpen(promise)) {
                    return;
                }
                boolean firstRegistration = neverRegistered;
                //执行真正的注册操作
                doRegister();
                //修改注册状态
                neverRegistered = false;
                registered = true;
                //回调pipeline中添加的ChannelInitializer的handlerAdded方法,在这里初始化channelPipeline
                pipeline.invokeHandlerAddedIfNeeded();
                //设置regFuture为success,触发operationComplete回调,将bind操作放入Reactor的任务队列中,等待Reactor线程执行。
                safeSetSuccess(promise);
                //触发channelRegister事件
                pipeline.fireChannelRegistered();
                //对于服务端ServerSocketChannel来说 只有绑定端口地址成功后 channel的状态才是active的。
                //此时绑定操作作为异步任务在Reactor的任务队列中,绑定操作还没开始,所以这里的isActive()是false
                if (isActive()) {
                    if (firstRegistration) {
                        //触发channelActive事件
                        pipeline.fireChannelActive();
                    } else if (config().isAutoRead()) {
                        beginRead();
                    }
                }
            } catch (Throwable t) {
                 ............省略.............
            }
        }

是驱动整个注册绑定流程的关键方法,下面我们来看下它的核心逻辑:

  • 首先需要检查的注册动作是否在外被取消了已经。检查要注册的是否已经关闭。如果已经关闭或者注册操作已经被取消,那么就直接返回,停止注册流程。

  • 调用方法,执行真正的注册操作。最终实现在的子类中,这个我们一会在介绍,先关注整体流程。

public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {

   /**
     * Is called after the {@link Channel} is registered with its {@link EventLoop} as part of the register process.
     *
     * Sub-classes may override this method
     */
    protected void doRegister() throws Exception {
        // NOOP
    }

}

  • 当向注册完毕后,调用方法,触发回调pipeline中添加的ChannelInitializer的handlerAdded方法,在handlerAdded方法中利用前面提到的初始化。

初始化的时机是当向对应的注册成功后,在中利用进行初始化。

  • 设置为,并回调注册在上的方法,在回调方法中将封装成异步任务,提交到的中。等待的执行。

还记得这个在哪里出现的吗?它是在哪里被创建,又是在哪里添加的呢? 大家还有印象吗?回忆不起来也没关系,笔者后面还会提到

  • 通过在中触发。

中的被回调。

  • 对于Netty服务端来说, 只有后 channel的状态才是的。此时在上注册的回调方法中被作为异步任务提交到了的任务队列中,还执行。所以这里的是。

当执行完后,才会去执行。

下面我们来看下方法中这些的具体实现:

1.3.6 doRegister()

public abstract class AbstractNioChannel extends AbstractChannel {

    //channel注册到Selector后获得的SelectKey
    volatile SelectionKey selectionKey;

    @Override
    protected void doRegister() throws Exception {
        boolean selected = false;
        for (;;) {
            try {
                selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
                return;
            } catch (CancelledKeyException e) {
                ...............省略....................
            }
        }
    }

}

调用底层方法,将Netty中包装的注册到中的上。

简单介绍下方法参数的含义:

  • 表示将要向哪个进行注册。

  • 表示上感兴趣的,当对应的时,会返回对应的。

可以理解为在上的特殊表示形式, 中封装了感兴趣的,以及, 同时也封装了对应的以及注册的。最后还有一个重要的属性,可以允许我们在上附加一些自定义的对象。

  • 向中添加用户自定义的附加对象。

这里向中的注册的为,这个操作的主要目的是先获取到在中对应的,完成注册。当绑定操作完成后,在去向添加感兴趣的~~~。

同时通过方法将Netty自定义的(这里的指针)附着在的属性上,完成Netty自定义与JDK NIO 的关系绑定。这样在每次对进行轮询时,Netty 都可以从 返回的中获取到自定义的对象(这里指的就是)。

1.3.7 HandlerAdded事件回调中初始化ChannelPipeline

当注册到上的后,Netty通过调用开始回调中里的ChannelHandler的。

此时的结构如下:

此时中只有在初始化时添加的。

我们来看下中具体作了哪些事情~~

public abstract class ChannelInitializer extends ChannelInboundHandlerAdapter {

    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        if (ctx.channel().isRegistered()) {
            if (initChannel(ctx)) {
                //初始化工作完成后,需要将自身从pipeline中移除
                removeState(ctx);
            }
        }
    }

    //ChannelInitializer实例是被所有的Channel共享的,用于初始化ChannelPipeline
    //通过Set集合保存已经初始化的ChannelPipeline,避免重复初始化同一ChannelPipeline
    private final Set initMap = Collections.newSetFromMap(
            new ConcurrentHashMap());

    private boolean initChannel(ChannelHandlerContext ctx) throws Exception {
        if (initMap.add(ctx)) { // Guard against re-entrance.
            try {
                initChannel((C) ctx.channel());
            } catch (Throwable cause) {
                exceptionCaught(ctx, cause);
            } finally {
                ChannelPipeline pipeline = ctx.pipeline();
                if (pipeline.context(this) != null) {
                     //初始化完毕后,从pipeline中移除自身
                    pipeline.remove(this);
                }
            }
            return true;
        }
        return false;
    }

    //匿名类实现,这里指定具体的初始化逻辑
    protected abstract void initChannel(C ch) throws Exception;

    private void removeState(final ChannelHandlerContext ctx) {
        //从initMap防重Set集合中删除ChannelInitializer
        if (ctx.isRemoved()) {
            initMap.remove(ctx);
        } else {
            ctx.executor().execute(new Runnable() {
                @Override
                public void run() {
                    initMap.remove(ctx);
                }
            });
        }
    }
}

中的初始化逻辑比较简单明了:

  • 首先要判断必须是当前已经完成注册后,才可以进行的初始化。

  • 调用的匿名类指定的执行自定义的初始化逻辑。

        p.addLast(new ChannelInitializer() {
            @Override
            public void initChannel(final Channel ch) {
                final ChannelPipeline pipeline = ch.pipeline();
                //ServerBootstrap中用户指定的channelHandler
                ChannelHandler handler = config.handler();
                if (handler != null) {
                    pipeline.addLast(handler);
                }

                ch.eventLoop().execute(new Runnable() {
                    @Override
                    public void run() {
                        pipeline.addLast(new ServerBootstrapAcceptor(
                                ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
                    }
                });
            }
        });

还记得在初始化时。方法中向中添加的吗?

  • 当执行完后,的初始化就结束了,此时就没必要再继续呆在,所需要将从中删除。

当初始化完时,此时的结构再次发生了变化:

此时中的任务队列结构变化为:

添加的任务是在初始化的时候向main reactor提交过去的。还记得吗?

1.3.8 回调regFuture的ChannelFutureListener

在本小节《Netty服务端的启动》的最开始,我们介绍了服务端启动的入口函数,在函数的最开头调用了方法用来创建并初始化,之后便会将注册到中。

注册的操作是一个异步的过程,所以在方法调用后返回一个代表注册结果的。

public abstract class AbstractBootstrap, C extends Channel> implements Cloneable {

    private ChannelFuture doBind(final SocketAddress localAddress) {
        //异步创建,初始化,注册ServerSocketChannel
        final ChannelFuture regFuture = initAndRegister();
        final Channel channel = regFuture.channel();
        if (regFuture.cause() != null) {
            return regFuture;
        }

        if (regFuture.isDone()) {
            //如果注册完成,则进行绑定操作
            ChannelPromise promise = channel.newPromise();
            doBind0(regFuture, channel, localAddress, promise);
            return promise;
        } else {
            final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
            //添加注册完成 回调函数
            regFuture.addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {

                         ...............省略...............
                          // 注册完成后,Reactor线程回调这里
                        doBind0(regFuture, channel, localAddress, promise);
                    }
                }
            });
            return promise;
        }
    }
}

之后会向添加一个。在回调函数中开始发起。

那么这个回调函数在什么时候?什么地方发起的呢??

让我们在回到本小节的主题方法的流程中:

当调用方法完成向的注册后,紧接着会调用方法中触发回调中对进行初始化。

最后在方法中,开始回调注册在上的。

   protected final void safeSetSuccess(ChannelPromise promise) {
        if (!(promise instanceof VoidChannelPromise) && !promise.trySuccess()) {
           logger.warn("Failed to mark a promise as success because it is done already: {}", promise);
        }
   }

   @Override
    public boolean trySuccess() {
        return trySuccess(null);
    }

    @Override
    public boolean trySuccess(V result) {
        return setSuccess0(result);
    }

   private boolean setSuccess0(V result) {
        return setValue0(result == null ? SUCCESS : result);
    }

    private boolean setValue0(Object objResult) {
        if (RESULT_UPDATER.compareAndSet(this, null, objResult) ||
            RESULT_UPDATER.compareAndSet(this, UNCANCELLABLE, objResult)) {
            if (checkNotifyWaiters()) {
                //回调注册在promise上的listeners
                notifyListeners();
            }
            return true;
        }
        return false;
    }

的逻辑比较简单,首先设置结果为,并且回调注册在上的。

需要提醒的是,执行方法,以及后边回调上的这些动作都是由执行的。

关于Netty中的后边我会在写一篇专门的文章进行分析,这里大家只需清楚大体的流程即可。不必在意过多的细节。

下面我们把视角切换到上的回调中,看看在注册完成后,Netty又会做哪些事情?

2. doBind0

public abstract class AbstractBootstrap, C extends Channel> implements Cloneable {

    private static void doBind0(
            final ChannelFuture regFuture, final Channel channel,
            final SocketAddress localAddress, final ChannelPromise promise) {

        channel.eventLoop().execute(new Runnable() {
            @Override
            public void run() {
                if (regFuture.isSuccess()) {
                    channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
                } else {
                    promise.setFailure(regFuture.cause());
                }
            }
        });
    }

}

这里Netty又将的操作封装成异步任务,提交给执行。

但是这里有一个问题,其实此时执行方法的线程正是,那为什么不直接在这里去执行,而是再次封装成异步任务提交给中的呢?

反正最终都是由执行,这其中又有什么分别呢?

经过上小节的介绍我们知道,方法的调用是由方法在将注册到之后,并且的已经初始化完毕后,通过方法回调过来的。

这个过程全程是由来负责执行的,但是此时方法并没有执行完毕,还需要执行后面的逻辑。

而绑定逻辑需要在注册逻辑执行完之后执行,所以在方法中会将封装成异步任务先提交给中保存,这样可以使立马从中返回,继续执行剩下的方法逻辑。

        private void register0(ChannelPromise promise) {
            try {
                ................省略............

                doRegister();
                pipeline.invokeHandlerAddedIfNeeded();
                safeSetSuccess(promise);
                //触发channelRegister事件
                pipeline.fireChannelRegistered();

                if (isActive()) {
                     ................省略............
                }
            } catch (Throwable t) {
                  ................省略............
            }
        }

当执行完方法后,就会从中取出异步任务执行。

此时中的结构如下:

  • 会先取出位于队首的任务执行,这里是指向的中添加的异步任务。

此时中的结构如下:

  • 执行绑定任务。

3. 绑定端口地址

对的操作行为全部定义在。

public interface ChannelOutboundInvoker {

    /**
     * Request to bind to the given {@link SocketAddress} and notify the {@link ChannelFuture} once the operation
     * completes, either because the operation was successful or because of an error.
     *
     */
    ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise);
}

方法由子类实现。

public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {

   @Override
    public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
        return pipeline.bind(localAddress, promise);
    }

}

调用在中传播,触发回调中所有的。

事件在中的传播具有方向性:

  • 从开始逐个向后传播直到。

  • 则是反向传播,从开始反向向前传播直到。

只能被中的响应处理只能被中的响应处理

然而这里的在Netty中被定义为,所以它在中是反向传播。先从开始反向传播直到。

然而的核心逻辑也正是实现在中。

3.1 HeadContext

  final class HeadContext extends AbstractChannelHandlerContext
            implements ChannelOutboundHandler, ChannelInboundHandler {

     @Override
        public void bind(
                ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) {
            //触发AbstractChannel->bind方法 执行JDK NIO SelectableChannel 执行底层绑定操作
            unsafe.bind(localAddress, promise);
        }

}

在回调方法中,调用里的操作类执行真正的绑定操作。

protected abstract class AbstractUnsafe implements Unsafe {

      @Override
        public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
            .................省略................

            //这时channel还未激活  wasActive = false
            boolean wasActive = isActive();
            try {
                //io.netty.channel.socket.nio.NioServerSocketChannel.doBind
                //调用具体channel实现类
                doBind(localAddress);
            } catch (Throwable t) {
                .................省略................
                return;
            }

            //绑定成功后 channel激活 触发channelActive事件传播
            if (!wasActive && isActive()) {
                invokeLater(new Runnable() {
                    @Override
                    public void run() {
                        //pipeline中触发channelActive事件
                        pipeline.fireChannelActive();
                    }
                });
            }
            //回调注册在promise上的ChannelFutureListener
            safeSetSuccess(promise);
        }

        protected abstract void doBind(SocketAddress localAddress) throws Exception;
}

  • 首先执行子类具体实现的方法,通过执行底层的绑定操作。

    @Override
    protected void doBind(SocketAddress localAddress) throws Exception {
        //调用JDK NIO 底层SelectableChannel 执行绑定操作
        if (PlatformDependent.javaVersion() >= 7) {
            javaChannel().bind(localAddress, config.getBacklog());
        } else {
            javaChannel().socket().bind(localAddress, config.getBacklog());
        }
    }

  • 判断是否为首次绑定,如果是的话将封装成异步任务放入中的中。

  • 执行,回调注册在上的。

还是同样的问题,当前执行线程已经是了,那么为何不直接触发中的事件而是又封装成异步任务呢??

因为如果直接在这里触发,那么就会去执行中的的。

这样的话就影响了的执行,注册在上的的回调。

到现在为止,Netty服务端就已经完成了绑定端口地址的操作,的状态现在变为。

最后还有一件重要的事情要做,我们接着来看中对处理。

3.2 channelActive事件处理

在Netty中定义为,所以它在中的传播为正向传播,从一直到为止。

在回调中需要触发向指定需要监听的~~。

这块的逻辑主要在中实现。

    final class HeadContext extends AbstractChannelHandlerContext
            implements ChannelOutboundHandler, ChannelInboundHandler {

        @Override
        public void channelActive(ChannelHandlerContext ctx) {
            //pipeline中继续向后传播channelActive事件
            ctx.fireChannelActive();
            //如果是autoRead 则自动触发read事件传播
            //在read回调函数中 触发OP_ACCEPT注册
            readIfIsAutoRead();
        }

        private void readIfIsAutoRead() {
            if (channel.config().isAutoRead()) {
                //如果是autoRead 则触发read事件传播
                channel.read();
            }
        }

        //AbstractChannel
        public Channel read() {
                //触发read事件
                pipeline.read();
                return this;
        }

       @Override
        public void read(ChannelHandlerContext ctx) {
            //触发注册OP_ACCEPT或者OP_READ事件
            unsafe.beginRead();
        }
   }

  • 在中的回调中触发中的。

  • 当再次传播到时,触发方法的回调。在中调用底层操作类的方法向注册监听。

3.3 beginRead

protected abstract class AbstractUnsafe implements Unsafe {

     @Override
        public final void beginRead() {
            assertEventLoop();
            //channel必须是Active
            if (!isActive()) {
                return;
            }

            try {
                // 触发在selector上注册channel感兴趣的监听事件
                doBeginRead();
            } catch (final Exception e) {
               .............省略..............
            }
        }
}

public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {
    //子类负责继承实现
    protected abstract void doBeginRead() throws Exception;

}

  • 断言判断执行该方法的线程必须是。

  • 此时已经完成端口地址的绑定操作,

  • 调用实现向注册监听事件

public abstract class AbstractNioChannel extends AbstractChannel {

    //channel注册到Selector后获得的SelectKey
    volatile SelectionKey selectionKey;
    // Channel监听事件集合
    protected final int readInterestOp;

    @Override
    protected void doBeginRead() throws Exception {
      
        final SelectionKey selectionKey = this.selectionKey;
        if (!selectionKey.isValid()) {
            return;
        }

        readPending = true;

        final int interestOps = selectionKey.interestOps();
        /**
         * 1:ServerSocketChannel 初始化时 readInterestOp设置的是OP_ACCEPT事件
         * */
        if ((interestOps & readInterestOp) == 0) {
            //添加OP_ACCEPT事件到interestOps集合中
            selectionKey.interestOps(interestOps | readInterestOp);
        }
    }
}

  • 前边提到在在向中的注册后,会获得一个。这里首先要获取这个。

  • 从中获取感兴趣的,当时在注册的时候设置为。

  • 将在初始化时设置的,设置到中的集合中。这样中的就开始监听集合中包含的了。

中主要监听的是。

流程走到这里,Netty服务端就真正的启动起来了,下一步就开始等待接收客户端连接了。大家此刻在来回看这副启动流程图,是不是清晰了很多呢?

此时Netty的结构如下:

总结

本文我们通过图解源码的方式完整地介绍了整个Netty服务端启动流程,并介绍了在启动过程中涉及到的相关的属性以及配置方式。的创建初始化过程以及类的继承结构。

其中重点介绍了向的注册过程以及的启动时机和的初始化时机。

最后介绍了绑定端口地址的整个流程。

上述介绍的这些流程全部是异步操作,各种回调绕来绕去的,需要反复回想下,读异步代码就是这样,需要理清各种回调之间的关系,并且时刻提醒自己当前的执行线程是什么?

好了,现在Netty服务端已经启动起来,接着就该接收客户端连接了,我们下篇文章见~~~~