Netty 编解码器
# 什么是编解码器
每个网络应用程序都必须定义如何解析在两个节点之间来回传输的原始字节,以及如何 将其和目标应用程序的数据格式做相互转换。这种转换逻辑由编解码器处理,编解码器由编码器和解码器组成,它们每种都可以将字节流从一种格式转换为另一种格式。那么它们的区 别是什么呢?
如果将消息看作是对于特定的应用程序具有具体含义的结构化的字节序列—它的数据。 那么编码器是将消息转换为适合于传输的格式(最有可能的就是字节流);而对应的解码器则是将网络字节流转换回应用程序的消息格式。因此,编码器操作出站数据,而解码器处理入站数据。
# 入站和出站
以客户端应用程序为例,如果事件的运动方向是从客户端到服务端的,那么我们称这些事件为出站。反之称为入站。
# Netty 编解码
Netty涉及到编解码的组件有Channel、ChannelHandler、ChannelPipe等。
ChannelHandler
ChannelHandler充当了处理入站和出站数据的应用程序逻辑容器。例如,实现ChannelInboundHandler接口(或ChannelInboundHandlerAdapter),你就可以接收入站事件和数据,这些数据随后会被你的应用程序的业务逻辑处理。当你要给连接的客户端发送响应时,也可以从ChannelInboundHandler冲刷数据。你的业务逻辑通常写在一个或者多个ChannelInboundHandler中。ChannelOutboundHandler原理一样,只不过它是用来处理出站数据的。
ChannelPipeline
ChannelPipeline提供了ChannelHandler链的容器。对于出站,即客户端发送给服务端的数据会通过pipeline中的一系列ChannelOutboundHandler(ChannelOutboundHandler调用是从tail到head方向逐个调用每个handler的逻辑),并被这些Handler处理,入站只调用pipeline里的ChannelInboundHandler逻辑(ChannelInboundHandler调用是从head到tail方向逐个调用每个handler的逻辑)。
通过Netty发送或者接受一个消息的时候,就将会发生一次数据转换。入站消息会被解码:从字节转换为另一种格式(比如java对象);如果是出站消息,它会被编码成字节。
Netty提供了一系列实用的编码解码器,他们都实现了ChannelInboundHadnler或者ChannelOutboundHandler接口。在这些类中,channelRead方法已经被重写了。以入站为例,对于每个从入站Channel读取的消息,这个方法会被调用。随后,它将调用由已知解码器所提供的decode()方法进行解码,并将已经解码的字节转发给ChannelPipeline中的下一个ChannelInboundHandler。
Netty提供了很多编解码器,比如编解码字符串的StringEncoder和StringDecoder,编解码对象的ObjectEncoder和ObjectDecoder等。
# 解码器
将字节解码为消息——ByteToMessageDecoder
将一种消息类型解码为另一种——MessageToMessageDecoder。
因为解码器是负责将入站数据从一种格式转换到另一种格式的,所以 Netty 的解码器实现了 ChannelInboundHandler。什么时候会用到解码器呢?很简单:每当需要为 ChannelPipeline 中的下一个 ChannelInboundHandler 转换入站数据时会用到。此外,得益于 ChannelPipeline 的设计,可以将多个解码器链接在一起,以实现任意复杂的转换逻辑。
将字节解码为消息
抽象类 ByteToMessageDecoder
将字节解码为消息(或者另一个字节序列)是一项如此常见的任务,以至于 Netty 为它提供了一个抽象的基类:ByteToMessageDecoder。由于你不可能知道远程节点是否会一次性地发送一个完整的消息,所以这个类会对入站数据进行缓冲,直到它准备好处理。
decode(ChannelHandlerContext ctx,ByteBuf in,List<Object> out)
这是必须实现的唯一抽象方法。decode()方法被调用时将会传入一个包含了传入数据 的 ByteBuf,以及一个用来添加解码消息的 List。对这个方法的调用将会重复进行,直到确 定没有新的元素被添加到该 List,或者该 ByteBuf 中没有更多可读取的字节时为止。如果该 List 不为空,那么它的内容将会被传递给 ChannelPipeline 中的下一个 ChannelInboundHandler。
例如:
public class FrameChunkDecoder extends ByteToMessageDecoder {
private final int maxFrameSize;
//指定将要产生的帧的最大允许大小
public FrameChunkDecoder(int maxFrameSize) {
this.maxFrameSize = maxFrameSize;
}
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in,
List<Object> out)
throws Exception {
int readableBytes = in.readableBytes();
if (readableBytes > maxFrameSize) {
//如果该帧超出允许的大小,则丢弃它并抛出一个 TooLongFrameException
in.clear();
throw new TooLongFrameException();
}
//否则,从 ByteBuf 中读取一个新的帧
ByteBuf buf = in.readBytes(readableBytes);
//将该帧添加到解码 读取一个新的帧消息的 List 中
// todo 可以将消息转换成Java 对象
out.add(buf);
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
将一种消息类型解码为另一种
在两个消息格式之间进行转换(例如,从 String->Integer)
decode(ChannelHandlerContext ctx,I msg,List<Object> out)
对于每个需要被解码为另一种格式的入站消息来说,该方法都将会被调用。解码消息随后会被传递给 ChannelPipeline 中的下一个 ChannelInboundHandler
MessageToMessageDecoder<T>
,T 代表源数据的类型
Netty中的几种常用 Decoder
- LineBasedFrameDecoder(行分割数据包解码器,最基础的一种解码器)
- FixedLengthFrameDecoder(固定长度数据包解码器)
- DelimiterBasedFrameDecoder(自定义分隔符数据包解码器)
- LengthFieldBasedFrameDecoder(自定义长度数据包解码器,最为复杂的一种解码器)
# 编码器
将消息编码为字节:MessageToByteEncoder<I>
将消息编码为消息: MessageToMessageEncoder<T>
,T 代表源数据的类型
将消息编码为字节
encode(ChannelHandlerContext ctx,I msg,ByteBuf out)
encode()方法是你需要实现的唯一抽象方法。它被调用时将会传入要被该类编码为 ByteBuf 的出站消息(类型为 I 的)。该 ByteBuf 随后将会被转发给 ChannelPipeline 中的下一个 ChannelOutboundHandler
public class ShortToByteEncoder extends MessageToByteEncoder<Short> {
@Override
public void encode(ChannelHandlerContext ctx, Short msg, ByteBuf out)
throws Exception {
//写 Short 到 ByteBuf
out.writeShort(msg);
}
}
2
3
4
5
6
7
8
将消息编码为消息
encode(ChannelHandlerContext ctx,I msg,List<Object> out)
这是你需要实现的唯一方法。每个通过 write()方法写入的消息都将会被传递给 encode() 方法,以编码为一个或者多个出站消息。随后,这些出站消息将会被转发给 ChannelPipeline 中的下一个 ChannelOutboundHandler。
public class IntegerToStringEncoder extends MessageToMessageEncoder<Integer> {
@Override
public void encode(ChannelHandlerContext ctx, Integer msg, List<Object> out)
throws Exception {
//转 Integer 为 String,并添加到 MessageBuf
out.add(String.valueOf(msg));
}
}
2
3
4
5
6
7
8
9
LengthFieldPrepender
大多数的协议(私有或者公有),协议头中会携带长度字段,用于标识消息体或者整包消息的长度,例如SMPP、HTTP协议等。由于基于长度解码需求 的通用性,Netty提供了LengthFieldBasedFrameDecoder
/LengthFieldPrepender
,自动屏蔽TCP底层的拆包和粘包问题,只需要传入正确的参数,即可轻松解决“读半包“问题。
发送方使用LengthFieldPrepender给实际内容Content进行编码添加报文头Length字段,接受方使用LengthFieldBasedFrameDecoder进行解码。协议格式如下所示:
+--------+----------+
| Length | Content |
+--------+----------+
2
3
Length字段:
表示Conent部分的字节数,例如Length值为100,那么意味着Conent部分占用的字节数就是100。
Length字段本身是个整数,也要占用字节,一般会使用固定的字节数表示。例如我们指定使用2个字节(有符号)表示length,那么可以表示的最大值为32767(约等于32K),也就是说,Content部分占用的字节数,最大不能超过32767。当然,Length字段存储的是Content字段的真实长度。
Content字段:
是我们要处理的真实二进制数据。 在发送Content内容之前,首先需要获取其真实长度,添加在内容二进制流之前,然后再发送。Length占用的字节数+Content占用的字节数,就是我们总共要发送的字节。
事实上,我们可以把Length部分看做报文头,报文头包含了解析报文体(Content字段)的相关元数据,例如Length报文头表示的元数据就是Content部分占用的字节数。当然,LengthFieldBasedFrameDecoder并没有限制我们只能添加Length报文头,我们可以在Length字段前或后,加上一些其他的报文头,此时协议格式如下所示:
+---------+--------+----------+----------+
|........ | Length | ....... | Content |
+---------+--------+----------+----------+
2
3
不过对于LengthFieldBasedFrameDecoder而言,其关心的只是Length字段。因此当我们在构造一个LengthFieldBasedFrameDecoder时,最主要的就是告诉其如何处理Length字段。
LengthFieldPrepender提供了多个构造方法,最终调用的都是:
public LengthFieldPrepender(int lengthFieldLength, boolean lengthIncludesLengthFieldLength) {
this(lengthFieldLength, 0, lengthIncludesLengthFieldLength);
}
public LengthFieldPrepender(
ByteOrder byteOrder, int lengthFieldLength,
int lengthAdjustment, boolean lengthIncludesLengthFieldLength)
2
3
4
5
6
7
其中:
- byteOrder:表示Length字段本身占用的字节数使用的是大端还是小端编码
- lengthFieldLength:表示Length字段本身占用的字节数,只可以指定 1, 2, 3, 4, 或 8
- lengthAdjustment:表示Length字段调整值
- lengthIncludesLengthFieldLength:表示Length字段本身占用的字节数是否包含在Length字段表示的值中。
例如:对于以下包含12个字节的报文
+----------------+
| "HELLO, WORLD" |
+----------------+
2
3
假设我们指定Length字段占用2个字节,lengthIncludesLengthFieldLength指定为false,即不包含本身占用的字节,那么Length字段的值为0x000C(即12)。
+--------+----------------+
+ 0x000C | "HELLO, WORLD" |
+--------+----------------+
2
3
如果我们指定lengthIncludesLengthFieldLength指定为true,那么Length字段的值为:0x000E(即14)=Length(2)+Content字段(12)
+--------+----------------+
+ 0x000E | "HELLO, WORLD" |
+--------+----------------+
2
3
关于lengthAdjustment字段的含义,参见下面的LengthFieldBasedFrameDecoder。
LengthFieldPrepender尤其值得说明的一点是,其提供了实现零拷贝的另一种思路(实际上编码过程,是零拷贝的一个重要应用场景
)。
- 在Netty中我们可以使用ByteBufAllocator.directBuffer()创建直接缓冲区实例,从而避免数据从堆内存(用户空间)向直接内存(内核空间)的拷贝,这是系统层面的零拷贝;
- 也可以使用
CompositeByteBuf
把两个ByteBuf合并在一起,例如一个存放报文头,另一个存放报文体。而不是创建一个更大的ByteBuf,把两个小ByteBuf合并在一起,这是应用层面的零拷贝。
而LengthFieldPrepender,由于需要在原来的二进制数据之前添加一个Length字段,因此就需要对二者进行合并发送。但是LengthFieldPrepender并没有采用CompositeByteBuf,其编码过程如下:
protected void encode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out) throws Exception {
//1 获得Length字段的值:真实数据可读字节数+Length字段调整值
int length = msg.readableBytes() + lengthAdjustment;
if (lengthIncludesLengthFieldLength) {
length += lengthFieldLength;
}
...
//2 根据lengthFieldLength指定的值(1、2、3、4、8),创建一个ByteBuffer实例,写入length的值,
//并添加到List类型的out变量中
switch (lengthFieldLength) {
case 1:
if (length >= 256) {
throw new IllegalArgumentException(
"length does not fit into a byte: " + length);
}
out.add(ctx.alloc().buffer(1).order(byteOrder).writeByte((byte) length));
break;
...
case 8:
out.add(ctx.alloc().buffer(8).order(byteOrder).writeLong(length));
break;
default:
throw new Error("should not reach here");
}
//3 最后,再将msg本身添加到List中(msg.retain是增加一次引用,返回的还是msg本身)
out.add(msg.retain());
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
可以看到,LengthFieldPrepender实际上是先把Length字段(报文头)添加到List中,再把msg本身(报文)添加到List中。而在发送数据时,LengthFieldPrepender的父类MessageToMessageEncoder会按照List中的元素下标按照顺序发送,因此相当于间接的把Length字段添加到了msg之前。从而避免了创建一个更大的ByteBuf将Length字段和msg内容合并到一起。作为开发者的我们,在编写编码器的时候,这种一种重要的实现零拷贝的参考思路。