前言

在上一篇的基于Netty实现简单的Redis客户端 - 木然轩 - 文剑木然的网络日志中我介绍了如何使用Netty来编写一个简单的Redis客户端。在本篇中,将使用Netty实现一个简单的Redis服务端。

目标

完整的Redis服务端的功能是很复杂的,由于是学习目的,这里只实现Redis最基本的一项功能,就是存取字符串,也就是下面的两个命令:

1
2
SET key value
GET key

实现

由于Redis的协议在上一篇已经提过了,本篇就不再赘述了,直接上实现的过程。

Redis服务端骨架

首先是编写服务端的样板代码,添加handler,然后绑定6379端口。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
public class RedisServer {
    private final InetSocketAddress address;

    public RedisServer(String host, int port) {
        this.address = new InetSocketAddress(host, port);
    }

    public static void main(String[] args) {
        int port = 6379;
        new RedisServer("127.0.0.1", port).start();
    }

    public void start() {
        ServerBootstrap bootstrap = new ServerBootstrap();
        EventLoopGroup group = new NioEventLoopGroup();
        RedisServerHandler serverHandler = new RedisServerHandler();
        bootstrap.group(group)
                .channel(NioServerSocketChannel.class)
                .handler(new LoggingHandler(LogLevel.INFO))
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) throws Exception {
                        ch.pipeline().addLast(new RedisServerCodec());
                        // 使用单例,这样才能共享存储的数据
                        ch.pipeline().addLast(serverHandler);
                    }
                });

        try {
            bootstrap.bind(address).sync();
            System.out.println("Netty redis server start at " + address.getHostString() + ":" + address.getPort());
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

由于上一篇已经了解了编码器与解码器,这里将二者结合到一起,也就是使用编解码器,兼具编码与解码的功能。

对于服务端的Handler,这里使用单例。这是因为,我们希望多个客户端连接到服务端时能共享数据。例如,客户端A设置key的值为"hello",然后当客户端B连接时查看key的值是"hello"而不是nill。如果不使用单例,那么每个客户端会被不同的Handler实例所处理,每个Handler实例持有各自的数据,那么也就是各个客户端看到的是各自的数据。

定义Redis响应

由上一篇对协议的介绍可知,字符串分多行字符串与单行字符串,而单行字符串又分为错误消息与提示消息。为了能区分这些消息类型,定义RedisResponse,其中valueType为字符串的类型,value为字符串的值,并且可以是null值。

1
2
3
4
5
public class RedisResponse {
    private ValueTypeEnum valueType;
    private Object value;
    // 省略getter与setter方法
}

为了可读性,不同的字符串类型用枚举表示。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
public enum ValueTypeEnum {
    ONE_LINE_STRING(1, "单行字符串"),
    ONE_LINE_MESSAGE(2, "单行消息提示"),
    MULTI_LINE_STRING(3, "多行字符串");

    private final int code;
    private final String desc;

    ValueTypeEnum(int code, String desc) {
        this.code = code;
        this.desc = desc;
    }
    // 省略getter方法
}

服务端编解码

继承ByteToMessageCodec来获得兼具编码与解码的功能。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
public class RedisServerCodec extends ByteToMessageCodec<RedisResponse> {
    @Override
    protected void encode(ChannelHandlerContext ctx, RedisResponse msg, ByteBuf out) throws Exception {

    }

    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {

    }
}

服务端接收来自客户端的命令,并进行解码,随后处理请求,向客户端发送响应,并进行编码。服务端对客户端发送的响应有三种类型,也就是ValueTypeEnum里定义的三种,对着三种类型的响应分别进行相应的编码。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
if (msg.getValueType() == ValueTypeEnum.MULTI_LINE_STRING) {
    out.writeByte('$');
    if (msg.getValue() == null) {
        out.writeCharSequence("-1", StandardCharsets.UTF_8);
    } else {
        String value = msg.getValue().toString();
        out.writeCharSequence(String.valueOf(value.length()), StandardCharsets.UTF_8);
        out.writeByte('\r').writeByte('\n');
        out.writeCharSequence(value, StandardCharsets.UTF_8);
    }
} else {
    if (msg.getValueType() == ValueTypeEnum.ONE_LINE_STRING) {
        out.writeByte('+');
    } else if (msg.getValueType() == ValueTypeEnum.ONE_LINE_MESSAGE) {
        out.writeByte('-');
    }
    if (msg.getValue() != null) {
        out.writeCharSequence(msg.getValue().toString(), StandardCharsets.UTF_8);
    }
}
out.writeByte('\r').writeByte('\n');

客户端的命令是多行字符串的格式,一条命令是由多个单词组成,将其解码为单词的列表方便后续的处理。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
in.skipBytes(1);
int size = 0;
byte cur;
while ((cur = in.readByte()) != '\r') {
    size = size * 10 + cur - '0';
}
in.skipBytes(1);
String[] cmds = new String[size];
for (int i = 0; i < size; i++) {
    in.skipBytes(1);
    int len = 0;
    byte c;
    while ((c = in.readByte()) != '\r') {
        len = len * 10 + c - '0';
    }
    in.skipBytes(1);
    String cmd = in.readCharSequence(len, StandardCharsets.UTF_8).toString();
    cmds[i] = cmd;
    in.skipBytes(2);
}
out.add(cmds);

服务端处理请求

最后就到了服务端处理请求的过程了。

由于我们只考虑对字符串的存取,只支持GETSET命令,这里使用一个Map<String, String>来存储值,并且使用ConcurrentHashMap来保证并发安全。

需要注意的是,@Sharable注解并不是让Handler成为单例,需要我们自行让Handler为单例。@Sharable注解只是让Netty在检测到多个Pipeline使用同一个Handler实例时不抛出异常。

具体的处理过程比较简单,除了错误提示,就是对数据的存取了。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
@ChannelHandler.Sharable
public class RedisServerHandler extends SimpleChannelInboundHandler<String[]> {
    private final static String GET = "get";
    private final static String SET = "set";

    private final Map<String, String> redisStore = new ConcurrentHashMap<>();

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, String[] msg) throws Exception {
        if (msg.length == 0) {
            return;
        }
        RedisResponse response = new RedisResponse();
        if (GET.equalsIgnoreCase(msg[0])) {
            // 处理get请求
            if (msg.length == 2) {
                String key = msg[1];
                response.setValueType(ValueTypeEnum.MULTI_LINE_STRING);
                response.setValue(redisStore.getOrDefault(key, null));
            } else {
                response.setValueType(ValueTypeEnum.ONE_LINE_MESSAGE);
                response.setValue("(error) ERR wrong number of arguments for 'get' command");
            }
        } else if (SET.equalsIgnoreCase(msg[0])) {
            // 处理set请求
            if (msg.length == 3) {
                String key = msg[1];
                String value = msg[2];
                redisStore.put(key, value);
                response.setValueType(ValueTypeEnum.ONE_LINE_STRING);
                response.setValue("OK");
            } else {
                response.setValueType(ValueTypeEnum.ONE_LINE_MESSAGE);
                if (msg.length < 3) {
                    response.setValue("(error) ERR wrong number of arguments for 'set' command");
                } else {
                    response.setValue("(error) ERR syntax error");
                }
            }
        } else {
            // 未知的命令
            response.setValueType(ValueTypeEnum.ONE_LINE_MESSAGE);
            StringBuilder sb = new StringBuilder();
            sb.append("(error) ERR unknown command `");
            sb.append(msg[0]);
            sb.append("`, with args beginning with:");
            for (int i = 1; i < msg.length; i++) {
                sb.append(" `").append(msg[i]).append("`,");
            }
            response.setValue(sb.toString());
        }
        ctx.writeAndFlush(response);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}

效果

编写完后,就测试下效果吧。首先退出本地的redis服务,然后运行我们自己编写的RedisServer。运行成功后,会监听6379端口,此时运行我们上一篇我们自己编写的RedisClient,并输入一些命令进行测试。我们还能打开redis-cli来连接我们自己编写的Redis服务端,也能使用简单的GETSET命令。

screenshot-20211024215540

总结

本篇实现了一个非常简单的Redis服务端,考虑是demo性质的案例,只是实现了基础的GETSET命令,也是Redis最为常用的命令与最为基础的功能,主要是起抛砖引玉的作用,读者可以在此基础上去实现更多Redis服务器的功能。

代码GitHub:java-demo/netty-redis at master · jlice/java-demo

上一篇:基于Netty实现简单的Redis客户端