编解码器
解码器
- 将字节解码为消息——
ByteToMessageDecoder
和ReplayingDecoder
;
- 将一种消息类型解码为另一种——
MessageToMessageDecoder

io.netty.handler.codec.LineBasedFrameDecoder
:这个类在Netty 内部也有使用,它使用了行尾控制字符(\n
或者\r\n
)来解析消息数据;
io.netty.handler.codec.http.HttpObjectDecoder
:一个HTTP 数据的解码器。在io.netty.handler.codec
子包下面,你将会发现更多用于特定用例的编码器和解码器实现。
Netty 提供了TooLongFrameException
类,其将由解码器在帧超出指定的大小限制时抛出。
编码器
MessageToByteEncoder
:encode()
方法是你需要实现的唯一抽象方法。它被调用时将会传入要被该类编码为ByteBuf
的出站消息。该ByteBuf 随后将会被转发给ChannelPipeline
中的下一个ChannelOutboundHandler
MessageToMessageEncoder
:出站数据将如何从一种消息编码为另一种
编解码器
ByteToMessageCodec
:我们需要将字节解码为某种形式的消息,可能是POJO
,随后再次对它进行编码。ByteToMessageCodec
将为我们处理好这一切,因为它结合了ByteToMessageDecoder
以及它的逆向——MessageToByteEncoder
MessageToMessageCodec
:将一种消息格式转换为另外一种消息格式,可以在一个单个的类中实现该转换的往返过程
CombinedChannelDuplexHandler
:这个类充当了ChannelInboundHandler
和ChannelOutboundHandler
的容器。通过提供分别继承了解码器类和编码器类的类型,我们可以实现一个编解码器,而又不必直接扩展抽象的编解码器类。
Web协议
SslChannelHandler
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
|
public class SslChannelInitializer extends ChannelInitializer<Channel> {
private final SslContext context;
private final boolean startTls;
public SslChannelInitializer(SslContext context, boolean startTls) {
// 传入要使用的SslContext
this.context = context;
// 如果设置为true,第一个写入的消息将不会被加密(客户端应该设置为true)
this.startTls = startTls;
}
@Override
protected void initChannel(Channel ch) throws Exception {
// 对于每个SslHandler实例,都从SslContext获取一个新的SSLEngine
SSLEngine engine = context.newEngine(ch.alloc());
// 将SslHandler作为第一个ChannelHandler添加到ChannelPipeline中
ch.pipeline().addFirst("ssl", new SslHandler(engine, startTls));
}
}
|

HttpObjectAggregator
:聚合HTTP消息
HTTP压缩:
HttpContentDecompressor
(客户端使用)
HttpContentCompressor
(服务端使用)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
|
public class HttpPipelineInitializer extends ChannelInitializer<Channel> {
private final boolean isClient;
public HttpPipelineInitializer(boolean isClient) {
this.isClient = isClient;
}
@Override
protected void initChannel(Channel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
// 方法一:分别添加编码器与解码器
if (isClient) {
pipeline.addLast("decoder", new HttpResponseDecoder());
pipeline.addLast("encoder", new HttpRequestEncoder());
} else {
pipeline.addLast("decoder", new HttpRequestDecoder());
pipeline.addLast("encoder", new HttpResponseEncoder());
}
// 方法二:直接添加编解码器
if (isClient) {
pipeline.addLast("codec", new HttpClientCodec());
} else {
pipeline.addLast("codec", new HttpServerCodec());
}
// 聚合HTTP消息
pipeline.addLast("aggregator", new HttpObjectAggregator(512 * 1024));
// HTTP压缩
if (isClient) {
pipeline.addLast("decompressor", new HttpContentDecompressor());
} else {
pipeline.addLast("compressor", new HttpContentCompressor());
}
}
}
|
使用HTTPS:启用HTTPS 只需要将SslHandler
添加到ChannelPipeline
的ChannelHandler
组合中。
WebSocket
:要想向你的应用程序中添加对于WebSocket
的支持,你需要将适当的客户端或者服务器WebSocketChannelHandler
添加到ChannelPipeline
中。这个类将处理由WebSocket
定义的称为帧的特殊消息类型。如表11-3 所示,WebSocketFrame
可以被归类为数据帧或者控制帧。

1
2
|
// 如果被请求的端点是/websocket 则处理该升级握手
pipeline.addLast(new WebSocketServerProtocolHandler("/websocket"));
|
空闲的连接,心跳消息
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
|
public class IdleStateHandlerInitializer extends ChannelInitializer<Channel> {
@Override
protected void initChannel(Channel ch) throws Exception {
// 当连接空闲时间太长时,将会触发一个IdleStateEvent事件
ch.pipeline().addLast(new IdleStateHandler(0, 0, 60, TimeUnit.SECONDS));
ch.pipeline().addLast(new HeartbeatHandler());
}
public static final class HeartbeatHandler extends ChannelInboundHandlerAdapter {
private static final ByteBuf HEARTBEAT_SEQUENCE = Unpooled.unreleasableBuffer(Unpooled.copiedBuffer("HEARTBEAT", CharsetUtil.ISO_8859_1));
// 在ChannelInboundHandler中年重写userEventTriggered方法来处理IdleStateEvent事件
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof IdleStateEvent) {
// 发送心跳消息,并在发送失败时关闭该连接
ctx.writeAndFlush(HEARTBEAT_SEQUENCE.duplicate())
.addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
} else {
super.userEventTriggered(ctx, evt);
}
}
}
}
|

解码基于分隔符的协议和基于长度的协议
基于分隔符的协议
基于分隔符的(delimited)消息协议使用定义的字符来标记的消息或者消息段(通常被称为帧)的开头或者结尾

基于长度的协议
基于长度的协议通过将它的长度编码到帧的头部来定义帧,而不是使用特殊的分隔符来标记它的结束。
LengthFieldBasedFrameDecoder
提供了几个构造函数来支持各种各样的头部配置情
况。
写大型数据
NIO 的零拷贝特性,这种特性消除了将文件的内容从文件系统移动到网络栈的复制过程。所有的这一切都发生在Netty 的核心中,所以应用程序所有需要做的就是使用一个FileRegion
接口的实现,其在Netty 的API 文档中的定义是:“通过支持零拷贝的文件传输的Channel 来发送的文件区域。”只适用于文件内容的直接传输,不包括应用程序对数据的任何处理。
1
2
3
|
FileInputStream in = new FileInputStream(file);
FileRegion region = new DefaultFileRegion(in.getChannel(), 0, file.length());
channel.writeAndFlush(region);
|
在需要将数据从文件系统复制到用户内存中时,可以使用ChunkedWriteHandler
,它支持异步写大型数据流,而又不会导致大量的内存消耗。
1
2
|
pipeline.addLast(new ChunkedWriteHandler());
ctx.writeAndFlush(new ChunkedStream(new FileInputStream(file)));
|
序列化数据
JBoss Marshalling
是一种可选的序列化API,它修复了在JDK 序列化API 中所发现的许多问题,同时保留了与java.io.Serializable
及其相关类的兼容性,并添加了几个新的可调优参数以及额外的特性,所有的这些都是可以通过工厂配置(如外部序列化器、类/实例查找表、类解析以及对象替换等)实现可插拔的。
Protobuf
序列化:

网络协议
硬实时服务质量(QoS),硬实时服务质量是保证计算结果将在指定的时间间隔内被递交。
WebSocketServerProtocolHandler
Netty 的WebSocketServerProtocolHandler
处理了所有委托管理的WebSocket
帧类型以及升级握手本身。如果握手成功,那么所需的ChannelHandler
将会被添加到ChannelPipeline
中,而那些不再需要的ChannelHandler
则将会被移除。
当WebSocket
协议升级完成之后,WebSocketServerProtocolHandler
将会把HttpRequestDecoder
替换为WebSocketFrameDecoder
,把HttpResponseEncoder
替换为WebSocketFrameEncoder
。为了性能最大化,它将移除任何不再被WebSocket
连接所需要的ChannelHandler
。这也包括HttpObjectAggregator
和HttpRequestHandler
。
UDP
UDP
提供了向多个接收者发送消息的额外传输模式:
- 多播——传输到一个预定义的主机组;
- 广播——传输到网络(或者子网)上的所有主机。
Netty 的DatagramPacket
是一个简单的消息容器,DatagramChannel
实现用它来和远程节点通信。
应该永远不要在Netty 的I/O 线程上执行任何非CPU 密集的代码——你将会从Netty 偷取宝贵的资源,并因此影响到服务器的吞吐量。