前言

Netty是Java中非常常用的网络库,它能自定义网络通信协议,从而实现非常丰富多样的网络功能。Redis是开发中非常常用的中间件,常用于缓存、分布式锁等场合,Redis的协议也非常简单。尽管Netty中自带了Redis的客户端,但是,为了学习,我们将不使用Netty自带的实现,而是自己动手写一个基础版本的Redis客户端。

目标

Redis支持字符串、列表、集合等数据类型,考虑到是学习目的,只实现最最基础的功能,也就是设置与获取字符串的值。具体来说,就是下面两个命令:

1
2
3
4
# 设置指定key的值
SET key value
# 获取指定key的值
GET key

协议

了解Redis协议

可以从 Redis协议详细规范 来了解Redis的协议。Redis的协议是基于字符串的而不是二进制的,这为协议的处理带来了许多便利。

除了通过看文档来了解协议细节,还可以利用Wireshark抓包来查看客户端与服务端具体的通信细节。

首先,启动Redis服务,这里我在本地启动的;然后打开Wireshark,由于Redis服务是在本地启动的,因此选择捕获本地环回网络流量(loopback traffic);由于Redis监听6379端口,过滤器可以填写port 6379,这样就过滤掉了我们不关心的流量,只留下Redis的包;最后打开Redis客户端并测试几个命令,即可看到Wireshark抓到的Redis协议的包:

screenshot-20211024193029

Redis请求

Redis请求时多行字符串的格式,例如客户端发送命令get key,那么实际上客户端发出的是:

1
*2\r\n$3\r\nget\r\n$3\r\nkey\r\n

*开头,然后是命令的单词数量,之后对于命令的每个单词,先是$+单词长度,然后是单词,并且都用\r\n分隔。

响应字符串

响应字符串也是多行字符串的格式。如果不存在该key,返回nil,也就是:

1
$-1\r\n

如果值是字符串,例如“hello”,那么返回结果为:

1
$5\r\nhello\r\n

响应错误与消息

如果出现错误,则会返回错误消息。错误消息是单行字符串,以-开头,如:

1
-ERR wrong number of arguments for 'get' command\r\n

如果是正常的消息,例如设置值成功,那么消息也是单行字符串,但是以+开头,如:

1
+OK\r\n

实现

命令行客户端

模仿redis-cli,使用Java写一个这样的命令行客户端。首先新建maven项目,并导入Netty依赖:

1
2
3
4
5
<dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-all</artifactId>
    <version>4.1.12.Final</version>
</dependency>

新建类RedisClient,在main方法,初始化RedisClient对象并连接到6379端口。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
public class RedisClient {
    private final InetSocketAddress address;

    public RedisClient(String host, int port) {
        address = new InetSocketAddress(host, port);
    }

    public static void main(String[] args) {
        int port = 6379;
        RedisClient client = new RedisClient("127.0.0.1", port);
        client.start();
    }

    public void start() {
        //...
    }
}

start方法中,添加4个handler,分别用于Redis请求编码、Redis响应解码、客户端处理Redis响应以及发出Redis请求。prompt用于命令行提示。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
Bootstrap bootstrap = new Bootstrap();
EventLoopGroup group = new NioEventLoopGroup();
String prompt = String.format("%s:%d> ", address.getHostString(), address.getPort());
bootstrap.group(group)
        .channel(NioSocketChannel.class)
        .attr(AttributeKey.newInstance("prompt"), prompt)
        .handler(new ChannelInitializer<SocketChannel>() {
            @Override
            protected void initChannel(SocketChannel ch) throws Exception {
                ch.pipeline().addLast(new RedisRequestEncoder());
                ch.pipeline().addLast(new RedisResponseDecoder());
                ch.pipeline().addLast(new RedisClientResponseHandler());
                ch.pipeline().addLast(new RedisClientRequestHandler());
            }
        });

之后,便可以连接到Redis服务器了。使用标准输入来获取要下发的Redis命令,当发送quit命令时客户端退出。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
try {
    // 连接Redis服务器
    Channel channel = bootstrap.connect(address).sync().channel();
    BufferedReader in = new BufferedReader(new InputStreamReader(System.in));
    System.out.print(prompt);
    ChannelFuture future = null;
    while (true) {
        String s = in.readLine();
        if (s == null || "quit".equalsIgnoreCase(s = s.trim())) {
            break;
        }
        if (s.length() == 0) {
            System.out.print(prompt);
            continue;
        }
        future = channel.writeAndFlush(s);
    }
    if (future != null) {
        future.sync();
    }
} catch (InterruptedException | IOException e) {
    e.printStackTrace();
} finally {
    group.shutdownGracefully();
}

编码器与请求Handler

尽管Redis支持内联命令,这里仍然将其转为多行字符串的格式,对于编码器只需要按格式构造拼接字符串即可。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
public class RedisRequestEncoder extends MessageToByteEncoder<String> {
    @Override
    protected void encode(ChannelHandlerContext ctx, String msg, ByteBuf out) throws Exception {
        String[] cmds = msg.split("\\s+");
        StringBuilder sb = new StringBuilder();
        sb.append("*").append(cmds.length).append("\r\n");
        for (String cmd : cmds) {
            sb.append("$").append(cmd.length()).append("\r\n")
                    .append(cmd).append("\r\n");
        }
        out.writeBytes(sb.toString().getBytes(StandardCharsets.UTF_8));
    }
}

而请求也很简单,只需要发送命令即可。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
public class RedisClientRequestHandler extends ChannelOutboundHandlerAdapter {

    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
        String message = (String) msg;
        ByteBuf buf = ctx.alloc().buffer(message.length());
        buf.writeBytes(message.getBytes(StandardCharsets.UTF_8));
        ctx.writeAndFlush(message);
        ReferenceCountUtil.release(buf);
    }
}

写入的命令会流到编码器转为多行字符串的格式再发到Redis服务端。

解码器与响应Handler

对于解码器,需要解析从Redis服务器返回的消息。

首先,根据第一个字符判断是哪种消息类型:如果是$开头,那么就是多行字符串,否则就是单行字符串。对于多行字符串,由于有nil的存在,因此使用Optional<String>类型,这样就能区分nil与空串。在对ByteBuf进行操作时,需要注意其readerIndex,确定读取的位置是想要的位置,以及对一个包的完整解析。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
public class RedisResponseDecoder extends ByteToMessageDecoder {
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        byte start = in.readByte();
        if (start == '$') {
            // 读取长度
            int len = 0;
            byte cur;
            while ((cur = in.readByte()) != '\r') {
                if (cur == '-') {
                    // 说明是空值,跳过后续的1\r\n
                    out.add(Optional.empty());
                    in.readBytes(3);
                    return;
                }
                len = len * 10 + (cur - '0');
            }
            // 当前cur位于\r,跳过后面的\n
            in.skipBytes(1);

            CharSequence sequence = in.readCharSequence(len, StandardCharsets.UTF_8);
            String msg = sequence.toString();
            out.add(Optional.of("\"" + msg + "\""));
            // 跳过结尾的\r\n
            in.skipBytes(2);
        } else {
            // 单行字符串
            int index = ByteBufUtil.indexOf(Unpooled.buffer(1).writeByte('\r'), in);
            // 注意这里的消息格式应与上面的保持一致为Optional<String>
            if (index > 1) {
                out.add(Optional.of(in.readCharSequence(index - 1, StandardCharsets.UTF_8).toString()));
            } else {
                out.add(Optional.empty());
            }
            // 跳过结尾的\r\n
            in.skipBytes(2);
        }
    }
}

解码之后的处理也比较简单,打印结果即可。为了用户友好,还打印下命令行提示。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
public class RedisClientResponseHandler extends SimpleChannelInboundHandler<Optional<String>> {
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, Optional<String> msg) throws Exception {
        if (msg.isPresent()) {
            System.out.println(msg.get());
        } else {
            System.out.println("(nil)");
        }
        String prompt = (String) ctx.channel().attr(AttributeKey.valueOf("prompt")).get();
        System.out.print(prompt);
    }
}

效果

为了查看效果,先退出本地的redis-cli,并确认本地的redis服务是启动的。然后运行我们自己编写的RedisClient,效果如下:

screenshot-20211024203515

总结

本篇介绍了Redis最基本的协议,并基于Netty做了个非常简单的Redis客户端,从而巩固了Netty的基本用法。

不过,由于是简单的demo,本例还有一些不够完善的地方。例如,没有与服务端建立心跳,在进行解码时没有判断是否有足够的字节可用等等。限于篇幅的原因,留给读者自行尝试。(好吧,是因为笔者也是初学者)

代码GitHub:java-demo/netty-redis at master · jlice/java-demo

下一篇:基于Netty实现简单的Redis服务端