MiniDB 开发手札2 - 网络通信: PostgreSQL 服务端实现

要写一个能够进行网络通信的协议,我们需要有客户端和服务端,定义各种数据包格式以及它们的交互流程,然后需要考虑安全性、效率等各种因素……实在是太麻烦了!所以,与其从头开始设计一个通讯协议,为什么不先研究一下现有数据库系统的协议呢?

如果我们直接实现了某个数据库的协议,那么这款数据库的客户端就可以直接连接到我们的数据库上,这样的话,我们岂不是连客户端都可以不用写了。嘿嘿……

抱着这种想法,本人在网上进行了一番调研,最后决定使用 PostgreSQL 数据库的通信协议。

选择 PostgreSQL 主要是因为……

  1. 社区活跃,资料丰富(不知道的能 Google)
  2. 官网有较为详细的通信协议文档(我看得懂)
  3. 版权属于社区,不会因为某些商业决定波及到自己(然后你就白写了)
  4. 开源协议自由,允许基于它修改的代码商用(万一哪天我的项目成名了呢?)

还有一件值得一提的事情是,GitHub 上使用 Kotlin/Java 和 PostgreSQL 协议实现的项目不多,就算有,也都是作为客户端,因此我们的 MiniDB 是第一个这么干的(没有抄袭嫌疑!)

PostgreSQL 的通信协议文档在这里可以查阅:https://www.postgresql.org/docs/current/protocol.html

0x01 PostgreSQL 通信协议概览

翻到章节目录,我们可以看到它由以下几个部分组成:

  • Message Flow - 定义每一种数据包在什么时候发送,期望得到什么样的回应
  • Message Data Types - 定义数据包里会包含的数据类型
  • Message Formats - 定义每一种数据包的结构和含义
  • 通讯协议的更新日志
  • 一些其他的高级的东西,这不是我们目前需要考虑的东西

要让一个普通的 PostgreSQL 客户端能连上我们的服务端,我们只需要了解握手阶段查询阶段,至于别的细节,我们以后遇到了再考虑。

Start-up Phase:握手

最简单的握手流程如下图所示:

根据文档中的介绍,带有SSL支持的客户端在建立连接后,首先会发送一个 SSLRequest 数据包,询问服务端是否进行 SSL 加密。

服务端要么回复 S,然后双方进入 SSL 加密,或者回复 N,只进行明文通信。

接下来,客户端发送一个 StartupMessage,包含一些基本设置、用户名和数据库名字等信息,代表正式握手。

服务端会从 Authentication 系列的数据包中发送一个给客户端,表示请求客户端以某种方式进行认证。如果认证成功,则发送 AuthenticationOk 的数据包,代表认证成功。 如果认证失败,发送 ErrorResponse。

认证成功以后,客户端会等待服务端发送 ReadyForQuery 数据包,代表服务端准备完毕,可以进入查询阶段。在此期间,服务端可以发送一些 ParameterStatus 数据包给客户端,这些数据包中包含一些服务端的默认设置(如编码、时区等)。

Query Phase:查询阶段

PostgreSQL 有多种不同的查询模式,在这里我们先学习一下最简单的查询过程。

首先,客户端发送一个 Query 数据包,内含我们要查询的 SQL 语句(有可能是多条喔!)

服务端接到查询语句后进行查询,得到结果后返回。

如果得到的结果是一个表格(比如说执行了 SELECT 或者 EXPLAIN,服务端首先会发一个 RowDescription 的数据包介绍每一列的含义,然后每行数据一个 DataRow 数据包,然后以一个 CommandComplete 数据包作为结束,最后是一个 ReadyForQuery 等待下一条查询请求。

Message Formats: 数据包格式

通常来说,通信协议中的数据包一般要有这些信息:

  • 数据包标识符 - 用于区分数据包之间的类型
  • 数据包长度 - 用于告诉另外一端要准备多大的内存空间去接收数据
  • 正文 - 实际要发的内容

PostgreSQL 的数据包格式正是由这些要素组成的。

0x02 Netty 下的简易实现

说了这么多,该开始写代码了。

MiniDB 的网络部分代码结构是这样的:

MiniDB
│
├─com
│  └─lss233
│      └─minidb
│          └─networking
│              │  MessageType.kt
│              │  NettyServer.kt
│              │  NettyServerInitializer.kt
│              │  Session.kt
│              │
│              │  ├─query
│              │  │      QueryHandler.kt
│              │  │
│              │  └─startup
│              │          SSLRequestRejectHandler.kt
│              │          StartupMessageHandler.kt
│              │
│              └─packets
│                      AuthenticationOk.kt
│                      AuthenticationSASL.kt
│                      CommandComplete.kt
│                      EmptyQueryResponse.kt
│                      ErrorResponse.kt
│                      NotificationResponse.kt
│                      ParameterStatus.kt
│                      PostgresSQLPacket.kt
│                      Query.kt
│                      ReadyForQuery.kt
│                      RowDescription.kt
│                      SSLRequest.kt
│                      StartupMessage.kt
│                      Terminate.kt
│
└─Main.kt

NettyServerInitializer

这个类定义了数据包的处理顺序。

    @Throws(Exception::class)
    public override fun initChannel(ch: SocketChannel) {
        val session = Session()
        val pipeline = ch.pipeline()
        pipeline.addLast(PostgreSQLDecoder(session), PostgreSQLEncoder(session))
        pipeline.addLast(SSLRequestRejectHandler(session), StartupMessageHandler(session))
        pipeline.addLast(QueryHandler(session))
        pipeline.addLast(TerminateHandler(session))
    }

在 Netty 中,服务端每收到一个新的连接,就会生成一个新的 SocketChannel,代表这个连接。

这个连接收发的数据包会像工厂中的流水线一样一个环节一个环节地处理下去,这里的流水线就是 pipeline。

PostgreSQL 的数据包都是有结构的,所以在这条流水线的最前端,我们添加了数据解析器 PostgreDecoder 和数据编码器 PostgreEncoder,实现我们的对象和数据包之间的转化。

Session

PostgreSQL 中绝大多数的数据包都是以 1 字节的标识符开头(通常是一个有含义的字符),然后是 4 字节的数据包长度,后面才是数据包的具体信息。 而按照文档中的说法, SSLRequest、 StartupMessage 和 CancelRequest 这几个数据包由于历史原因,它们一开头直接就是数据包的长度,没有数据包标识符。

所以,我们需要一个独立的对象来保存连接的状态,这个对象在连接创建时产生,连接断开时消亡。

class Session {
    var state = State.Startup
    var user: String? = null
    var database: String? = null
    val properties = HashMap<String, String>()
    enum class State {
        Startup, Authenticating, Query, Terminated
    }
}

在这里,我定义了  Startup 握手、 Authenticating 认证、Query 查询和 Terminated 终止四种不同的状态。

客户端与MiniDB建立连接以后,先进入握手状态。此时发送的数据都是开头没有数据包标识符的。

当客户端发送 StartupMessage 之后,进入认证状态。接下来发送的数据包都是开头有标识符的。

认证成功以后进入查询状态,接下来才允许客户端发送查询命令。

当连接断开以后,进入终止状态,释放资源。

后续过程中,可能会加入更多的状态。

PostgreDecoder

在 decode 方法中,我们先判断状态,然后根据状态来选择数据包类型。

            val mType = if(session.state == Session.State.Startup) {
                val position = `in`.readerIndex()
                val length = `in`.readInt()
                val magicNumber = `in`.readInt()
                `in`.readerIndex(position)
                if(length == 8 && magicNumber == 80877103) { // 这是他们规定好的
                    MessageType.SSLRequest
                } else {
                    MessageType.StartupMessage
                }
            } else {
                MessageType.getType(`in`.readByte())
            }

根据数据包类型,构造专门的数据包对象,然后把剩下的数据交给对象处理。

            fun parse(type: MessageType?, payload: ByteBuf): IncomingPacket? {
                return when(type) {
                    MessageType.SSLRequest -> SSLRequest().parse(payload)
                    MessageType.StartupMessage -> StartupMessage().parse(payload)
                    MessageType.Query -> Query().parse(payload)
                    MessageType.Terminate -> Terminate().parse(payload)
                    else -> null
                }
            }

数据包

按照数据的传输方向,MiniDB 把数据分成了两个类型:IncomingPacket 和 OutgoingPacket,分别表示服务端会收到的数据包和服务端会发送的数据包。

interface PostgreSQLPacket {
}
interface OutgoingPacket: PostgreSQLPacket {
    fun write(buf: ByteBuf): OutgoingPacket
}
interface IncomingPacket: PostgreSQLPacket {
    fun parse(buf: ByteBuf): IncomingPacket
}

IncomingPacket#parse 方法将从数据流中读取数据,复制到自身的成员中;

OutgoingPacket#write 将根据自身成员变量的值往数据流中写入数据。

这两个方法的返回值都是对象自身,方便链式调用(完全是个人喜好)。

一个具体的类就像下面这样:

class ParameterStatus(private val key: String, private val value: String): OutgoingPacket {
    override fun write(buf: ByteBuf): OutgoingPacket {
        buf.writeCharSequence(key, StandardCharsets.UTF_8)
        buf.writeByte(0)
        buf.writeCharSequence(value, StandardCharsets.UTF_8)
        buf.writeByte(0)
        return this
    }
}

PostgreSQLEncoder

在编码过程中,我们先将数据存入一个空的数据流中(仅仅是为了知道它到底有多长)。

然后我们按照官方文档中描述的顺序往输出流写数据就行了。

    override fun encode(ctx: ChannelHandlerContext?, msg: OutgoingPacket?, out: ByteBuf?) {
        val mType = msg?.javaClass?.simpleName?.let { MessageType.valueOf(it) }

        val buf = Unpooled.buffer()
        msg?.write(buf)

        val type = mType?.type?.toInt() ?: '?'.code
        // '?' 代表未知标识符
        val len = buf.writerIndex() + 4
        println("<- ${msg?.javaClass?.simpleName}(${type.toChar()}) len $len")

        out?.writeByte(type)
        out?.writeInt(len)
        out?.writeBytes(buf, buf.writerIndex())
    }

Handlers

说完了数据包和编解码器,接下来就剩下包处理器了。

Netty 可以很聪明地把一些包只交给一些特定的处理器来处理。

我们只要根据包的类型来作出正确的反应,就可以让客户端成功地连接上我们。

举几个例子。

在刚开始握手的时候,客户端会问我们要不要 SSL 加密。

这种事情对我们来说为时尚早,所以我们果断回 NO!

class SSLRequestRejectHandler(private val session: Session) : SimpleChannelInboundHandler<SSLRequest>() {
    override fun channelRead0(ctx: ChannelHandlerContext?, msg: SSLRequest?) {
       // 咱们这幼小的 MiniDB 可玩不来 SSL 这种东西
        ctx?.writeAndFlush(Unpooled.copiedBuffer("N", StandardCharsets.UTF_8))?.sync()
    }

}

接下来客户端会给我们发送一个 StartupMessage 的数据包,里面会包含用户名和数据库名。

理论上,我们应该让客户端证明一下自己能访问数据库,我们会切换到认证阶段,给它发一个认证方法,然后等它的回复。

不过,因为这个过程比较复杂,我不想让繁琐的认证流程破坏我诱骗客户端的兴致,所以我决定很敷衍地告诉它:行了行了,你登录成功了。

并进入查询阶段。

class StartupMessageHandler(private val session: Session) : SimpleChannelInboundHandler<StartupMessage>() {
    override fun channelRead0(ctx: ChannelHandlerContext?, msg: StartupMessage?) {
//        session.state = Session.State.Authenticating
//        ctx?.writeAndFlush(AuthenticationSASL(listOf("SCRAM-SHA256")))?.sync()
//        极其敷衍告诉客户端:你登录成功了
        ctx?.writeAndFlush(AuthenticationOk())?.sync()
        ctx?.writeAndFlush(ParameterStatus("client_encoding", "UTF8"))?.sync()
        ctx?.writeAndFlush(ParameterStatus("DataStyle", "ISO, YMD"))?.sync()
        ctx?.writeAndFlush(ParameterStatus("TimeZone", "Asia/Shanghai"))?.sync()
        ctx?.writeAndFlush(ParameterStatus("server_encoding", "UTF8"))?.sync()
        ctx?.writeAndFlush(ParameterStatus("server_version", "14.5"))?.sync()
        session.state = Session.State.Query
        ctx?.writeAndFlush(ReadyForQuery())?.sync()
    }
}

QueryHandler

进入了查询阶段之后,我们的MiniDB就要处理客户端发过来的 SQL 语句了。

SQL 语句的解析部分由我的队友完成。我只要负责将 SQL 语句喂给他写好的 SQL 解析器,然后读取它的解析结果就行了。

按照官方文档的介绍,我们的服务端收到客户端的 SQL 语句之后要做出反应,否则客户端会一直傻傻地等待。

但由于我们的引擎部分啥也没写,所以现在就只能假惺惺回两句啦。

class QueryHandler(private val session: Session) : SimpleChannelInboundHandler<Query>() {
    private val REGEX_STMT_SET = Regex("set (.+) to (.+)");
    override fun channelRead0(ctx: ChannelHandlerContext?, msg: Query?) {
        try {

            var queryString = msg?.queryString

            // 先把查询语句转化为 MySQL 风格
            if(REGEX_STMT_SET.matches(queryString!!)) {
                queryString = queryString.replace(REGEX_STMT_SET, "SET $1=$2")
            }

            // 交给词法解析器
            val ast = SQLParserDelegate.parse(queryString)
            println("  Q(${ast.javaClass.simpleName}: $queryString")

            // 分析解析后的 SQL 语句,作出不同的反应
            when(ast) {
                is DMLSelectStatement -> {
                    // 这是一条查询语句
                    ctx?.writeAndFlush(RowDescription())?.sync()
                    //  查到了 0 条结果也是一种查
                    ctx?.writeAndFlush(CommandComplete("SELECT 0"))?.sync()
                }
                is DALSetStatement -> {
                    // 这是一条设置语句
                    for (pair in ast.assignmentList) {
                        // 更新设置
                        session.properties[(pair.key as SysVarPrimary).varText] =
                            pair.value.evaluation(emptyMap()).toString()

                        // 告知客户端设置成功
                        ctx?.writeAndFlush(CommandComplete("SET"))?.sync()
                        ctx?.writeAndFlush(ParameterStatus(
                            (pair.key as SysVarPrimary).varText,
                            session.properties[(pair.key as SysVarPrimary).varText]!!
                        ))?.sync()
                    }
                }
            }

        } catch (e: SQLSyntaxErrorException) {
            System.err.println(" Q(Error): ${msg?.queryString}")
            e.printStackTrace()
            // 告诉客户端你发的东西有问题
            val err = ErrorResponse()
            err.message = e.message!!
            ctx?.writeAndFlush(err)?.sync()
        }
        // 等待下一条语句
        ctx?.writeAndFlush(ReadyForQuery())?.sync()
    }

}

0x03 测试

使用 Navicat 连接我们的 MiniDB,没有出现任何报错就算成功!

不过在服务端,我们能看见 MiniDB 和 Navicat 聊得很开心!

虽然 Navicat 没有显示出任何数据库,但是根据他俩的聊天记录来看, Navicat 读取数据库列表其实是通过一条 SELECT 语句来实现的。

这提示我们在实现数据库引擎的时候,可以把数据库信息写到一个表里,这样就不需要再做额外的工作了。

0x04 花絮

在实际编写的过程中,我发现实际中的客户端和服务器会发一些文档里没提到的信息(或者说我没翻到),导致我写的服务端不能被正常连接。

遇到这种情况时,我们需要搭建一个真正的服务器,然后使用抓包工具来找到不一样的数据包。

在这里,我使用的是 sokit 的中继模式。上图为 Navicat 连接 PostgreSQL 数据库时双方的数据,下图是 Navicat 连接 MiniDB 时双方的数据。

至此, MiniDB 的网络通信部分先告一段落了。接下来就进入数据库引擎的开发,然后实现真正的数据库了。