编解码器

解码器

  • 将字节解码为消息——ByteToMessageDecoderReplayingDecoder
  • 将一种消息类型解码为另一种——MessageToMessageDecoder

screenshot-20211024223038

io.netty.handler.codec.LineBasedFrameDecoder:这个类在Netty 内部也有使用,它使用了行尾控制字符(\n 或者\r\n)来解析消息数据;

io.netty.handler.codec.http.HttpObjectDecoder:一个HTTP 数据的解码器。在io.netty.handler.codec 子包下面,你将会发现更多用于特定用例的编码器和解码器实现。

Netty 提供了TooLongFrameException 类,其将由解码器在帧超出指定的大小限制时抛出。

编码器

MessageToByteEncoderencode()方法是你需要实现的唯一抽象方法。它被调用时将会传入要被该类编码为ByteBuf 的出站消息。该ByteBuf 随后将会被转发给ChannelPipeline中的下一个ChannelOutboundHandler

MessageToMessageEncoder:出站数据将如何从一种消息编码为另一种

编解码器

ByteToMessageCodec:我们需要将字节解码为某种形式的消息,可能是POJO,随后再次对它进行编码。ByteToMessageCodec 将为我们处理好这一切,因为它结合了ByteToMessageDecoder 以及它的逆向——MessageToByteEncoder

MessageToMessageCodec:将一种消息格式转换为另外一种消息格式,可以在一个单个的类中实现该转换的往返过程

CombinedChannelDuplexHandler:这个类充当了ChannelInboundHandlerChannelOutboundHandler的容器。通过提供分别继承了解码器类和编码器类的类型,我们可以实现一个编解码器,而又不必直接扩展抽象的编解码器类。

Web协议

SslChannelHandler

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
public class SslChannelInitializer extends ChannelInitializer<Channel> {
    private final SslContext context;
    private final boolean startTls;

    public SslChannelInitializer(SslContext context, boolean startTls) {
        // 传入要使用的SslContext
        this.context = context;
        // 如果设置为true,第一个写入的消息将不会被加密(客户端应该设置为true)
        this.startTls = startTls;
    }

    @Override
    protected void initChannel(Channel ch) throws Exception {
        // 对于每个SslHandler实例,都从SslContext获取一个新的SSLEngine
        SSLEngine engine = context.newEngine(ch.alloc());
        // 将SslHandler作为第一个ChannelHandler添加到ChannelPipeline中
        ch.pipeline().addFirst("ssl", new SslHandler(engine, startTls));
    }
}

screenshot-20211024223612

HttpObjectAggregator:聚合HTTP消息

HTTP压缩:

  • HttpContentDecompressor (客户端使用)
  • HttpContentCompressor (服务端使用)
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
public class HttpPipelineInitializer extends ChannelInitializer<Channel> {
    private final boolean isClient;

    public HttpPipelineInitializer(boolean isClient) {
        this.isClient = isClient;
    }

    @Override
    protected void initChannel(Channel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();

        // 方法一:分别添加编码器与解码器
        if (isClient) {
            pipeline.addLast("decoder", new HttpResponseDecoder());
            pipeline.addLast("encoder", new HttpRequestEncoder());
        } else {
            pipeline.addLast("decoder", new HttpRequestDecoder());
            pipeline.addLast("encoder", new HttpResponseEncoder());
        }

        // 方法二:直接添加编解码器
        if (isClient) {
            pipeline.addLast("codec", new HttpClientCodec());
        } else {
            pipeline.addLast("codec", new HttpServerCodec());
        }

        // 聚合HTTP消息
        pipeline.addLast("aggregator", new HttpObjectAggregator(512 * 1024));

        // HTTP压缩
        if (isClient) {
            pipeline.addLast("decompressor", new HttpContentDecompressor());
        } else {
            pipeline.addLast("compressor", new HttpContentCompressor());
        }
    }
}

使用HTTPS:启用HTTPS 只需要将SslHandler 添加到ChannelPipelineChannelHandler 组合中。

WebSocket:要想向你的应用程序中添加对于WebSocket 的支持,你需要将适当的客户端或者服务器WebSocketChannelHandler 添加到ChannelPipeline 中。这个类将处理由WebSocket 定义的称为帧的特殊消息类型。如表11-3 所示,WebSocketFrame 可以被归类为数据帧或者控制帧。

screenshot-20211024223650

1
2
// 如果被请求的端点是/websocket 则处理该升级握手
pipeline.addLast(new WebSocketServerProtocolHandler("/websocket"));

空闲的连接,心跳消息

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public class IdleStateHandlerInitializer extends ChannelInitializer<Channel> {
    @Override
    protected void initChannel(Channel ch) throws Exception {
        // 当连接空闲时间太长时,将会触发一个IdleStateEvent事件
        ch.pipeline().addLast(new IdleStateHandler(0, 0, 60, TimeUnit.SECONDS));
        ch.pipeline().addLast(new HeartbeatHandler());
    }

    public static final class HeartbeatHandler extends ChannelInboundHandlerAdapter {
        private static final ByteBuf HEARTBEAT_SEQUENCE = Unpooled.unreleasableBuffer(Unpooled.copiedBuffer("HEARTBEAT", CharsetUtil.ISO_8859_1));

        // 在ChannelInboundHandler中年重写userEventTriggered方法来处理IdleStateEvent事件
        @Override
        public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
            if (evt instanceof IdleStateEvent) {
                // 发送心跳消息,并在发送失败时关闭该连接
                ctx.writeAndFlush(HEARTBEAT_SEQUENCE.duplicate())
                        .addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
            } else {
                super.userEventTriggered(ctx, evt);
            }
        }
    }
}

screenshot-20211024223735

解码基于分隔符的协议和基于长度的协议

基于分隔符的协议

基于分隔符的(delimited)消息协议使用定义的字符来标记的消息或者消息段(通常被称为帧)的开头或者结尾

screenshot-20211024223801

基于长度的协议

基于长度的协议通过将它的长度编码到帧的头部来定义帧,而不是使用特殊的分隔符来标记它的结束。

LengthFieldBasedFrameDecoder 提供了几个构造函数来支持各种各样的头部配置情 况。

写大型数据

NIO 的零拷贝特性,这种特性消除了将文件的内容从文件系统移动到网络栈的复制过程。所有的这一切都发生在Netty 的核心中,所以应用程序所有需要做的就是使用一个FileRegion 接口的实现,其在Netty 的API 文档中的定义是:“通过支持零拷贝的文件传输的Channel 来发送的文件区域。”只适用于文件内容的直接传输,不包括应用程序对数据的任何处理。

1
2
3
FileInputStream in = new FileInputStream(file);
FileRegion region = new DefaultFileRegion(in.getChannel(), 0, file.length());
channel.writeAndFlush(region);

在需要将数据从文件系统复制到用户内存中时,可以使用ChunkedWriteHandler,它支持异步写大型数据流,而又不会导致大量的内存消耗。

1
2
pipeline.addLast(new ChunkedWriteHandler());
ctx.writeAndFlush(new ChunkedStream(new FileInputStream(file)));

序列化数据

JBoss Marshalling 是一种可选的序列化API,它修复了在JDK 序列化API 中所发现的许多问题,同时保留了与java.io.Serializable 及其相关类的兼容性,并添加了几个新的可调优参数以及额外的特性,所有的这些都是可以通过工厂配置(如外部序列化器、类/实例查找表、类解析以及对象替换等)实现可插拔的。

Protobuf序列化:

screenshot-20211024223900

网络协议

硬实时服务质量(QoS),硬实时服务质量是保证计算结果将在指定的时间间隔内被递交。

WebSocketServerProtocolHandler

Netty 的WebSocketServerProtocolHandler 处理了所有委托管理的WebSocket 帧类型以及升级握手本身。如果握手成功,那么所需的ChannelHandler 将会被添加到ChannelPipeline中,而那些不再需要的ChannelHandler 则将会被移除。

WebSocket 协议升级完成之后,WebSocketServerProtocolHandler 将会把HttpRequestDecoder 替换为WebSocketFrameDecoder,把HttpResponseEncoder 替换为WebSocketFrameEncoder。为了性能最大化,它将移除任何不再被WebSocket 连接所需要的ChannelHandler。这也包括HttpObjectAggregatorHttpRequestHandler

UDP

UDP 提供了向多个接收者发送消息的额外传输模式:

  • 多播——传输到一个预定义的主机组;
  • 广播——传输到网络(或者子网)上的所有主机。

Netty 的DatagramPacket 是一个简单的消息容器,DatagramChannel 实现用它来和远程节点通信。

应该永远不要在Netty 的I/O 线程上执行任何非CPU 密集的代码——你将会从Netty 偷取宝贵的资源,并因此影响到服务器的吞吐量。