Netty 的核心组件

Channel

可以把 Channel 看作是传入(入站)或者传出(出站)数据的载体。因此,它可以被打开或者被关闭,连接或者断开连接。

回调

一个回调其实就是一个方法,一个指向已经被提供给另外一个方法的方法的引用。这使得后者可以在适当的时候调用前者。回调在广泛的编程场景中都有应用,而且也是在操作完成后通知相关方最常见的方式之一。

Future

Future 提供了另一种在操作完成时通知应用程序的方式。这个对象可以看作是一个异步操作的结果的占位符;它将在未来的某个时刻完成,并提供对其结果的访问。

ChannelFuture

Netty 提供了它自己的实现—— ChannelFuture,用于在执行异步操作的时候使用。 ChannelFuture提供了几种额外的方法,这些方法使得我们能够注册一个或者多个ChannelFutureListener实例。 由ChannelFutureListener提供的通知机制消除了手动检查对应的操作是否完成的必要。 每个 Netty 的出站 I/O 操作都将返回一个ChannelFuture,也就是说,它们都不会阻塞。

事件和Channel Handler

Netty 使用不同的事件来通知我们状态的改变或者是操作的状态。 每个事件都可以被分发给ChannelHandler类中的某个用户实现的方法。

screenshot-20211017230453

screenshot-20211017230733

ChannelPipeline中的每个ChannelHandler将负责把事件转发到链中的下一个ChannelHandler。这些适配器类(及它们的子类)将自动执行这个操作,所以你可以只重写那些你想要特殊处理的方法和事件。

虽然ChannelInboundHandlerChannelOutboundHandlr 都扩展自ChannelHandler,但是Netty 能区分ChannelInboundHandler实现和ChannelOutboundHandler 实现,并确保数据只会在具有相同定向类型的两个ChannelHandler 之间传递。

@Sharable注解Handler,标识一个Handler可以被多个Channel安全地共享。

发送消息的方式

在Netty中,有两种发送消息的方式。你可以直接写到Channel中,也可以写到和ChannelHandler相关联的ChannelHandlerContext对象中。前一种方式将会导致消息从ChannelPipeline的尾端开始流动,而后者将导致消息从ChannelPipeline中的下一个ChannelHandler开始流动。

写入Channel

通过ChannelHandlerContext获取到Channel的引用。调用Channel上的write()方法将会导致写入事件从尾端到头部地流经ChannelPipeline

1
2
3
ChannelHandlerContext ctx = ...;
Channel channel = ctx.channel();
channel.write(...);
1
2
3
ChannelHandlerContext ctx = ...;
ChannelPipeline pipeline ctx.pipeline();
pipeline.write(...);

写入ChannelHandlerContext

1
2
ChannelHandlerContext ctx = ...;
ctx.write(Unpooled.copiedBuffer("Netty in Action", CharsetUtil.UTF_8));

使用NIO

使用epoll 替代NIO,只需要将NioEventLoopGroup替换为EpollEventLoopGroup ,并且将NioServerSocketChannel.class替换为EpollServerSocketChannel.class即可。

Netty所提供的传输:

screenshot-20211017232908

应用程序的最佳传输:

screenshot-20211017233024

派生缓冲区

派生缓冲区为ByteBuf 提供了以专门的方式来呈现其内容的视图。

  • duplicate()
  • slice()
  • slice(int, int)
  • Unpooled.unmodifiableBuffer()
  • order(ByteOrder)
  • readSlice(int)

channel生命周期

  • ChannelUnregistered:Channel 已经被创建,但还未注册到EventLoop
  • ChannelRegistered:Channel 已经被注册到了EventLoop
  • ChannelActive:Channel 处于活动状态(已经连接到它的远程节点)。它现在可以接收和发送数据了
  • ChannelInactive:Channel 没有连接到远程节点

Channel的状态模型:

screenshot-20211017233210

ChannelHandler 的生命周期

  • handlerAdded:当把ChannelHandler 添加到ChannelPipeline 中时被调用
  • handlerRemoved:当从ChannelPipeline 中移除ChannelHandler 时被调用
  • exceptionCaught:当处理过程中在ChannelPipeline 中有错误产生时被调用

Netty 定义了下面两个重要的ChannelHandler 子接口:

  • ChannelInboundHandler:处理入站数据以及各种状态变化
  • ChannelOutboundHandler:处理出站数据并且允许拦截所有的操作

ChannelInboundHandler的方法:

screenshot-20211017233409

一个更加简单的方式是使用SimpleChannelInboundHandler: 由于SimpleChannelInboundHandler 会自动释放资源,所以你不应该存储指向任何消息的引用供将来使用,因为这些引用都将会失效。

出站

如果一个消息被消费或者丢弃了,并且没有传递给ChannelPipeline 中的下一个 ChannelOutboundHandler,那么用户就有责任调用ReferenceCountUtil.release()

如果消息到达了实际的传输层,那么当它被写入时或者Channel 关闭时,都将被自动释放。 不仅要释放资源,还要通知ChannelPromise

1
promise.setSuccess();

否则可能会出现ChannelFutureListener 收不到某个消息已经被处理了的通知的情况。

ChannelOutboundHandler的方法:

screenshot-20211017233533

ChannelPipeline 传播事件时,它会测试ChannelPipeline 中的下一个ChannelHandler 的类型是否和事件的运动方向相匹配。如果不匹配,ChannelPipeline 将跳过该ChannelHandler 并前进到下一个,直到它找到和该事件所期望的方向相匹配的为止。

ChannelHandler的用于修改ChannelPipeline的方式:

  • addFirstaddBefore addAfteraddLast —> 将一个ChannelHandler添加到ChannelPipeline中
  • removereplace

ChannelPipeline的入站操作:fireXXXX

ChannelPipeline的出站操作,和ChannelOutboundHandler 基本一致。

ChannelPipeline 保存了与Channel 相关联的ChannelHandlerChannelPipeline 可以根据需要,通过添加或者删除ChannelHandler 来动态地修改; ChannelPipeline 有着丰富的API 用以被调用,以响应入站和出站事件。

ChannelHandlerContextChannelHandler 之间的关联(绑定)是永远不会改变的,所以缓存对它的引用是安全的; 相对于其他类的同名方法,ChannelHandlerContext的方法将产生更短的事件流,应该尽可能地利用这个特性来获得最大的性能。

Channel、ChannelPipeline、ChannelHandler以及ChannelHandlerContext之间的关系:

screenshot-20211017233654

要想调用从某个特定的ChannelHandler 开始的处理过程,必须获取到在该ChannelHandler 之前的ChannelHandler 所关联的ChannelHandlerContext。这个ChannelHandlerContext 将调用和它所关联的ChannelHandler 之后的ChannelHandler

异常处理

处理入站异常:exceptionCaught

处理出站异常

每个出站操作都将返回一个ChannelFuture。注册到ChannelFutureChannelFutureListener 将在操作完成时被通知该操作是成功了还是出错了。

1
2
3
4
5
ChannelFuture future = channel.write(msg);
future.addListener(new ChannelFutureListener() {
    @Override
    public void operationComplete(ChannelFuture f) {
    //...

几乎所有的ChannelOutboundHandler 上的方法都会传入一个ChannelPromise的实例。 作为ChannelFuture 的子类,ChannelPromise 也可以被分配用于异步通知的监听器。但是,ChannelPromise 还具有提供立即通知的可写方法:

  • ChannelPromise setSuccess();
  • ChannelPromise setFailure(Throwable cause);
1
2
3
4
5
6
7
public class OutboundExceptionHandler extends ChannelOutboundHandlerAdapter {
    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
        promise.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture f) {
                //...

EventLoop

一个EventLoop 将由一个永远都不会改变的Thread 驱动。 提供了一个更加简单的执行体系架构,并且消除了在多个ChannelHandler 中进行同步的需要

任务调度

稍后执行:

1
2
3
4
5
6
ScheduledFuture<?> future = ch.eventLoop().schedule(new Runnable() {
    @Override
    public void run() {
        System.out.println("60 seconds later");
    }
}, 60, TimeUnit.SECONDS);

用于非阻塞传输(如NIO和AIO)的EventLoop分配方式:

screenshot-20211017234012

上图显示了一个EventLoopGroup,它具有3 个固定大小的EventLoop(每个EventLoop都由一个Thread 支撑)。在创建EventLoopGroup 时就直接分配了EventLoop(以及支撑它们的Thread),以确保在需要时它们是可用的。

EventLoopGroup 负责为每个新创建的Channel 分配一个EventLoop。在当前实现中,使用顺序循环的方式进行分配以获取一个均衡的分布。

一旦一个Channel 被分配给一个EventLoop,它将在它的整个生命周期中都使用这个EventLoop(以及相关联的Thread

Bootstrap

Bootstrap 类被用于客户端或者使用了无连接协议的应用程序中。

对于NIO 以及OIO 传输两者来说,都有相关的EventLoopGroupChannel 实现。不能混用具有不同前缀的组件,否则将会导致IllegalStateException

ServerBootstrapServerChannel

screenshot-20211017234159

Channel实现

ChannelInitializer 定义了下面的方法:

1
protect ed abstract void initChannel(C ch) throws Exception;

这个方法提供了一种将多个ChannelHandler 添加到一个ChannelPipeline 中的简便 方法。

Netty提供了各种DatagramChannel的实现,不再调用connect方法,而是只调用bind方法。

EmbeddedChannel:用于测试ChannelHandler。这个传输是一种特殊的Channel 实现。