源码分析Netty:核心组件及启动过程分析
系列文章:
一 Netty核心组件
1.1 Channel
Channel(通道)是 NIO 基本的结构。JDK的NIO包中,有的介绍:
根据上述文档的描述,Channel是I/O操作的连接(关系),表示与能够执行一个或多个不同I/O操作(例如读取或写入)的实体(例如硬件设备、文件、网络套接字或程序组件)的开放连接。
通道是“打开”或“关闭”的。通道在创建时是开放的,一旦关闭它就会保持关闭。一旦通道关闭,对其调用I/O操作的任何尝试都将导致引发ClosedChannelException。通道是否打开可以通过调用其isOpen方法进行测试;通道通常是为了安全地进行多线程访问,如扩展和实现该接口的接口和类的规范所述。
1.2 Callback
回调。Callback已经是一种非常常见的异步实现方法,用于通知调用方操作已完成。可以简单理解为一个方法,提供给另一种方法作为引用,这样后者就可以在某个合适的时间调用前者。
1.3 Future
Future 提供了另外一种通知应用操作已经完成的方式。这个对象作为一个异步操作结果的占位符,它将在将来的某个时候完成并提供结果。
1.3.1 JDK的Future接口
JDK提供了java.util.concurrent.Future接口,Futrue是个接口。Future就是对于具体的Runnable或者Callable任务的执行结果进行取消、查询是否完成、获取结果。必要时可以通过get方法获取执行结果,该方法会阻塞直到任务返回结果。
Future代表异步计算的结果,接口提供方法用于检查计算是否已完成,等待计算完成,然后取回计算结果。
计算结果只能通过get方法返回;如果有必要会堵塞直到它计算完成。
可以通过cancel方法取消。附加的方法可用于判断任务是否正常完成或者被取消。一旦计算完成,那么它不能被取消。
如果你想要使用Future 来取消,但是不提供一个可用的结果,你可以声明Futrue 的类型,但会返回null 作为一个基本任务的结果。
FutureTask 类是Futrue类的一个实现类,实现了Runnable接口,可以被Executor 执行。
1.3.2 Future提供的方法
1.3.2.1 cancel
boolean cancel(boolean mayInterruptIfRunning) 调用该方法将试图取消对任务的执行。如果任务已经完成了、已取消、无法取消这种尝试会失败。当该方法调用时任务还没有开始,方法调用成功而且任务将不会再执行。如果任务已经启动,则 mayInterruptIfRunning 参数确定是否执行此任务的线程应该以试图停止任务被中断。此方法返回后调用isDone 方法将返回 true 。后续调用 isCancelled 总是返回第一次调用的返回值。
1.3.2.2 isCancelled
boolean isCancelled() 如果任务在完成前被取消,将返回 true。
请注意任务取消是一种主动的行为。
1.3.2.3 isDone
boolean isDone() 任务已经结束,在任务完成、任务取消、任务异常的情况下都返回 true 。
1.3.2.4 get
V get() throws InterruptedException, ExecutionException 调用此方法会在获取计算结果前等待。一但计算完毕将立刻返回结果。它还有一个重载方法 V get(long timeout, TimeUnit unit) 在单位时间内没有返回任务计算结果将超时,任务将立即结束。
1.3.2 ChannelFuture
JDK的Future提供了一个异步获取执行结果的机制,但提供的方法有限,所以Netty自定义了Future接口。另外,Netty 中所有的 I/O 操作都是异步的,因为一个操作可能不会立即返回,所以我们需要一种用于在之后的某个时间点确定其结果的方法。为此,Netty 提供了ChannelFuture接口,其addListener()方法注册了一个ChannelFutureListener,以便在某个操作完成时(无论是否成功)得到通知。
ChannelFuture提供多个方法来允许一个或者多个 ChannelFutureListener 实例。这个回调方法 operationComplete() 会在操作完成时调用。事件监听者能够确认这个操作是否成功或者是错误。如果是后者,我们可以检索到产生的 Throwable。简而言之, ChannelFutureListener 提供的通知机制不需要手动检查操作是否完成的。
每个 Netty 的 outbound I/O 操作都会返回一个 ChannelFuture;这样就不会阻塞。这就是 Netty 所谓的“自底向上的异步和事件驱动”。ChannelFuture接口代码如下:
public interface ChannelFuture extends Future {
Channel channel();
ChannelFuture addListener(GenericFutureListener extends Future super Void>> var1);
ChannelFuture addListeners(GenericFutureListener... var1);
ChannelFuture removeListener(GenericFutureListener extends Future super Void>> var1);
ChannelFuture removeListeners(GenericFutureListener... var1);
ChannelFuture sync() throws InterruptedException;
ChannelFuture syncUninterruptibly();
ChannelFuture await() throws InterruptedException;
ChannelFuture awaitUninterruptibly();
boolean isVoid();
}
1.4 Event与Handler
事件机制的重要组成部分,通过不同的事件来通知我们更改的状态或操作的状态。这使我们能够根据发生的事件触发适当的“行为”。行为可能包括日志记录、数据转换、流控制、应用程序逻辑等。
Netty是一个网络框架,事件很清晰的跟入站(inbound)或出站(outbound)的数据流相关。因为一些事件可能触发传入的数据或状态的变化包括:
活动或非活动连接
数据的读取
用户事件
错误
出站事件是由于在未来操作将触发一个动作。这些包括:
打开或关闭一个连接到远程
写或冲刷数据到 socket
每个事件都可以分配给用户实现处理程序类的方法。这说明了事件驱动的范例可直接转换为应用程序构建块。

上图是Netty的事流图(图片来自w3c School)。
Netty 的 ChannelHandler 是各种处理程序的基本抽象。想象下,每个处理器实例就是一个回调,用于执行对各种事件的响应。
在此基础之上,Netty 也提供了一组丰富的预定义的处理程序方便开箱即用。比如,各种协议的编解码器包括 HTTP 和 SSL/TLS。在内部,ChannelHandler 使用事件和 future 本身,创建具有 Netty 特性抽象的消费者。
1.5 Eventloop
EventLoop 定义了 Netty 的核心抽象,用于处理连接的生命周期中所发生的事件。下图说明了 Channel、EventLoop、Thread 以及 EventLoopGroup 之间的关系。

1.6 Netty异步模型
1.6.1 Future和Callback
Netty 的异步编程模型是建立在 Future 和 Callback 的概念上的。
拦截操作和转换入站/出站数据,只需要开发者提供回调或利用 Future 操作返回。这使得链操作简单、高效,促进编写可重用的、通用的代码。Netty 的一个主要设计目标是促进“关注点分离”,即把业务逻辑从网络基础设施程序中分离出来。
1.6.2 Selector, Event 和 Eventloop
Netty 通过触发事件从应用程序中抽象出 Selector,从而避免手写调度代码。EventLoop 分配给每个 Channel 来处理所有的事件,包括
注册感兴趣的事件
调度事件到 ChannelHandler
安排进一步行动
EventLoop 本身是由一个线程驱动,它给一个 Channel 处理所有的 I/O 事件,并且在 EventLoop 的生命周期内不会改变(参考上篇介绍的串行化设计概念)。这个简单而强大的线程模型消除可能对 ChannelHandler同步是否正确和合理性的关注,这样就可以专注于提供正确的回调逻辑。
二 Netty启动过程分析
2.1 一个Netty Server示例
public class HttpServer {
public static int DEFAULT_PORT = 8080;
public static void main(String[] args) throws Exception {
int port;
try {
port = Integer.parseInt(args[0]);
} catch (RuntimeException ex) {
port = DEFAULT_PORT;
}
// 多线程事件循环器
EventLoopGroup bossGroup = new NioEventLoopGroup(1); // boss
EventLoopGroup workerGroup = new NioEventLoopGroup(); // worker
try {
ServerBootstrap b = new ServerBootstrap(); // 引导程序
b.group(bossGroup, workerGroup) // 设置EventLoopGroup
.channel(NioServerSocketChannel.class) // 指明新的Channel的类型
.childHandler(new HttpServerChannelInitializer()) // 指定ChannelHandler
.option(ChannelOption.SO_BACKLOG, 128) // 设置的ServerChannel的一些选项
.childOption(ChannelOption.SO_KEEPALIVE, true); // 设置的ServerChannel的子Channel的选项
// 绑定端口,开始接收进来的连接
ChannelFuture f = b.bind(port).sync();
System.out.println("HttpServer已启动,端口:" + port);
// 等待服务器 socket 关闭 。
// 在这个例子中,这不会发生,但你可以优雅地关闭你的服务器。
f.channel().closeFuture().sync();
} finally {
// 优雅关闭
workerGroup.shutdownGracefully();
bossGroup.shutdownGracefully();
}
}
}
这里还有一个依赖,HttpServerChannelInitializer,代码如下:
public class HttpServerChannelInitializer extends ChannelInitializer {
public HttpServerChannelInitializer() {
}
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast("codec", new HttpServerCodec());
ch.pipeline().addLast("aggregator", new HttpObjectAggregator(1048576));
ch.pipeline().addLast("serverHandler", new HttpServerHandler());
}
}
HttpServerHandler:
public class HttpServerHandler extends SimpleChannelInboundHandler {
@Override
protected void channelRead0(ChannelHandlerContext ctx, FullHttpRequest msg) throws Exception {
this.readRequest(msg);
String sendMsg;
String uri = msg.uri();
switch (uri) {
case "/":
sendMsg = "Netty HTTP Server
Welcome to waylau.com!
";
break;
case "/hi":
sendMsg = "Netty HTTP Server
Hello Word!
";
break;
case "/love":
sendMsg = "Netty HTTP Server
I Love You!
";
break;
default:
sendMsg = "Netty HTTP Server
I was lost!
";
break;
}
this.writeResponse(ctx, sendMsg);
}
private void readRequest(FullHttpRequest msg) {
//请求行信息
System.out.println(msg.method() + " " + msg.uri() + " " + msg.protocolVersion());
//请求头信息
for (String name : msg.headers().names()) {
System.out.println(name + ": " + msg.headers().get(name));
}
//消息体
System.out.println(msg.content().toString(CharsetUtil.UTF_8));
}
private void writeResponse(ChannelHandlerContext ctx, String msg) {
ByteBuf bf = Unpooled.copiedBuffer(msg, CharsetUtil.UTF_8);
FullHttpResponse res = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK, bf);
res.headers().set(HttpHeaderNames.CONTENT_LENGTH, msg.length());
ctx.writeAndFlush(res).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
}
我们可以直接执行这段代码,启动一个HTTP服务,端口作为参数输入,或者直接使用默认的8080端口。
2.2 配置分析
2.2.1 引导(BootStrap)
引导一个应用程序是指对它进行配置,并使它运行起来的过程——尽管该过程的具体细节可能并不如它的定义那样简单,尤其是对于一个网络应用程序来说。上面示例中的ServerBootstrap就是一个“引导”类,可以看出,ServerBootstrap是负责串联章节一中介绍过的Channel、EventLoopGroup、ChannelHandler、ChannelFuture 以及 参数配置等组件的。
引导类的层次结构:

2.2.2 引导过程
步骤如下:
1、设置 EventLoopGroup:提供了用于处理 Channel 事件的 EventLoop,在示例中我们定义了两个EventLoopGroup,boss和work;
2、channel(NioServerSocketChannel.class):指定要使用的Channel实现;
3、childHandler(new HttpServerChannelInitializer()):设置用于处理已被接受的子 Channel 的 I/O 及数
据的 ChannelInbound-Handler;
4、b.bind(port):通过配置好的ServerBootstrap的实例绑定该Channel
通过图片看引导过程如下:

2.2.3 ChannelInitializer
在Server启动时,在childHandler()方法中设置了一个自定义的HttpServerChannelInitializer()。在代码中,可以看到在SockerChannel中设置了以下三个内容:
ch.pipeline().addLast("codec", new HttpServerCodec());
ch.pipeline().addLast("aggregator", new HttpObjectAggregator(1048576));
ch.pipeline().addLast("serverHandler", new HttpServerHandler());
2.2.3.1 codec
编解码的处理,HttpServerCodec是netty针对http编解码的处理类,但是这些只能处理像http 的请求,也就是数据带在的http请求;
2.2.3.2 aggregator
用POST方式请求服务器的时候,对应的参数信息是保存在message body中的,如果只是单纯的用HttpServerCodec是无法完全的解析Http POST请求的,因为HttpServerCodec只能获取uri中参数,所以需要加上HttpObjectAggregator。它把HttpMessage和HttpContent聚合成为一个FullHttpRquest或者FullHttpRsponse,大致结构如下图所示:

2.2.3.3 serverHandler
示例使用了自定义的HttpServerHandler,在channelRead0方法中针对请求的uri进行处理,在生成响应(writeResponse)时使用对应的信息。
三 总结
本篇从实例出发,了解Netty核心组件的概念、作用及串联过程。从概念到设计原理,再到深入了解实现细节,从而能够清晰地掌握Netty的技术细节甚至存在的问题,才能最终更好地支持我们实际的各项业务。