现在你已经知道了 Java NIO 里面那些非阻塞特性是怎么工作的,但是要设计一个非阻塞的服务仍旧比较困难。非阻塞 IO 相对传统的阻塞 IO 给开发者带来了更多的挑战。在本节非阻塞服务的讲解中,我们一起来讨论这些会面临的主要挑战,同时也会给出一些潜在的解决方案。

查找关于设计非阻塞服务的相关资料是比较难的,本文提出的解决方案也只能是基于 Jakob Jenkov 个人的工作经验和构思。如果你有其他的解决方案或者是更好的创意,那么还请不吝赐教。你可以在文章下方的评论区回复。

虽然本文介绍的一些构思是为 Java NIO 设计的,但是我相信这些思路同样适用于其他编程语言,只要他们也存在和 Selector 类似结构,概念。就目前我的了解来说,这些结构底层 OS 提供的,所以基本上你可以运用到其他编程语言中去。

阅读文章的过程中如果有任何疑问,欢迎添加笔者为好友,拉您进【七日书摘】微信交流群,一起交流技术,一起打造高质量的职场技术交流圈子,抱团取暖,共同进步。
七日书摘官方群.jpg

非阻塞服务-GitHub 源码仓(Non-blocking Server - GitHub Repository)

为了演示本文探讨的一些技术,笔者已经在 GitHub 上面建立了相应的源码仓,地址如下:
https://github.com/jjenkov/java-nio-server

非阻塞 IO 管道(Non-blocking IO Pipelines)

非阻塞 IO 管道(Non-blocking IO Pipelines)可以看做是整个非阻塞 IO 处理过程的链条。包括在以非阻塞形式进行的读与写操作。下面有一张插图,简单的描述了一个基础的非阻塞 IO 管道(Non-blocking IO Pipelines):
non-blocking-server-1.png

一个组件(Component)通过 选择器(Selector) 检查当前 通道(Channel) 是否有数据需要写入。此时 component 读入数据,并且根据输入的数据 input 对外提供数据输出 output。这个对外的数据输出 output 被写到了另一个 Channel 中。

一个非阻塞的 IO 管道不必同时需要读和写数据,通常来说有些管道只需读数据,而另一些管道则只需写数据。

上面的这幅流程图仅仅展示了一个组件。实际上一个管道可能存在多个 component 在处理输入数据。管道的长度取决于管道具体要做的事情。

当然一个非阻塞的 IO 管道他也可以同时从多个 Channel 中读取数据,例如同时从多个 SocketChannel 中读取数据;

上面的流程图实际上被简化了,图中的 Component 实际上负责初始化 Selector,从 Channel 中读取数据,而不是由 Channel 往 Selector 推送数据(push),这是简化的上图容易给人带来的误解。

非阻塞式vs. 阻塞式管道(Non-blocking vs. Blocking IO Pipelines)

非阻塞 IO 和阻塞 IO 管道之间最大的区别是他们如何从 Channel(套接字 socket 或文件 file)读写数据。

IO 管道通常直接从流中(来自于 socket 或 file流)读取数据,然后把这些数据分割为一系列连续的消息。这和使用 tokenizer(解析器之类的意思)将数据流解析为 token(数据包的意思)类似。相反你只是将数据流分解为更大的消息体。我将拆分数据流成消息这一组件称为“消息读取器”(Message Reader)。下面是 Message Reader 拆分流为消息的示意图:
non-blocking-server-2.png

一个阻塞 IO 管道可以使用类似 InputStream 接口每次一个字节地从底层 Channel 读取数据,并且这个接口会阻塞直到有数据可以读取。这就是阻塞式 Message Reader 的实现过程。

使用阻塞式 IO 大大简化了 Message Reader 的实现成本。阻塞式 Message Reader 无需关注没有数据可读的情形,无需关注返回部分数据或者数据解析需要被复用的问题。

相似的,阻塞式 Message Writerr(一个将数据写入流中组件) 也不需要关注写入部分数据,和数据复用的问题。

阻塞 IO 管道的缺点(Blocking IO Pipeline Drawbacks)

阻塞式 Message Reader 易于实现,但是阻塞也给他带了不可避免的缺点,必须为每个要分解成消息的数据流分配一个独立线程。原因在于 IO 接口在读取数据时在有数据返回前会一直被阻塞,这直接导致我们无法用单线程来处理一个流没有数据返回时去读取其他的流。一旦一个线程尝试从一个流中读取数据,那么这个线程将会阻塞直到有数据可以读取。

如果这样的 IO 管道运用到服务器去处理高并发的连接请求,服务器将不得不为每一个到来的链接分配一个单独的线程。如果并发数不高比如任何时刻都只有几百并发连接请求,这确实不会有太大问题。但是如果服务器的并发数上升到百万级别,这种设计就缺乏伸缩性了。每个线程需要为堆栈分配 320KB(32位JVM)到 1024KB(64位JVM)的内存空间。这就是说如果有 1,000,000 个线程就需要 1TB 的内存。而这些在还没开始真正处理接收到的消息前就需要(消息处理中还需要为对象分配内存)。

为了减少线程数,很多服务器都设计了线程池,把所有接收到的请求放到队列内,每次读取一条连接进行处理。这种设计可以用下图表示:
non-blocking-server-3.png

然而这种设计需要入站链接合理地发送数据。如果入站链接长时间不活跃,那么大量的不活跃链接实际上就造成了线程池中所有线程阻塞。这意味着服务器响应变慢甚至是没有反应。

一些服务器为了减轻这个问题,采取的操作是适当增加线程池的弹性。例如,当线程池所有线程都处于饱和时,线程池可以自动扩容,启动更多的线程来处理事务。这个解决方案会使得服务器维护大量不活跃的链接。但是需要谨记服务器所能开辟的线程数是有限制的。所有当有 1,000,000 个低速的链接时,服务器还是不具备伸缩性的。

基础的非阻塞通道设计(Basic Non-blocking IO Pipeline Design)

一个非阻塞式 IO 管道可以使用一个单独的线程向多个流读取数据。前提是相关的流可以切换为非阻塞模式(并不是所有流都可以以非阻塞形式操作)。在非阻塞模式下,读取一个流可能返回 0 个或多个字节。如果流还没有可供读取的数据那么就会返回 0,其他大于 1 的返回都表明这是实际读取到的数据;

为了避开没有可读的数据流,我们可以使用 Java NIO 中的 Selector。一个 Selector 可以注册一个或多个 SelectableChannel 实例。当我们调用select() 或 selectorNow() 方法时 Selector 会返回一个有数据可读的 SelectableChannel 实例。这个设计可以如下插图:
non-blocking-server-4.png

读取部分信息(Reading Partial Messages)

当我们从一个 SelectableChannel 中读取一个数据包时,我们并不知道这个数据包是否是完整的一个 message。因为一个数据包可能包含部分 message,也就是说即可能少于一个 message,也可能多一个 message,正如下面这张插图所示意的那样:
non-blocking-server-5.png

要处理这种截断的 message,有两个问题:

  1. 检测数据包中是否包含一个完整的 message;
  2. 在 message 剩余部分获取到之前,我们如何处理不完整的message;

检测完整 message 要求 Message Reader 查看数据包中的数据是否至少包含一个完整的 message。如果包含一个或多个完整 message,这些 message 可以被下发到通道中处理。查找完整 message 的过程是个大量重复的操作,所以这个操作必须是越快越好的。

当数据段中有一个不完整的 message 时,无论不完整消息是整个数据包还是说在完整 message 前后,这个不完整的 message 数据都需要在剩余部分获得前存储起来。

检查 message 完整性和存储不完整 message 都是 Message Reader 的职责。为了避免混淆来自不同 Channel 的数据,我们为每一个 Channel 分配一个 Message Reader。整个设计大概是这样的:
non-blocking-server-6.png

当我们通过 Selector 获取到一个有数据可以读取的 Channel 之后,与该 Channel 关联的 Message Reader 会读取数据并尝试把它们分解为 Message 块,这样得到完整的 message 后就可以通过管道下发到需要处理这些消息的组件中进行处理。

一个 Message Reader 一定要满足特定的协议。它需要知道 message 的格式以便读取。如果我们的服务器是跨协议复用的,那他必须实现 Message Reader 的协议-可能类似于接收一个 Message Reader 工厂作为配置参数。

存储不完整的 Message(Storing Partial Messages)

现在我们已经明确了由 Message Reader 负责存储不完整message 直到接收到完整的 message。现在我们还需要知道部分 message 的存储该如何来实现。

在设计的时候我们需要考虑两个关键因素:

  1. 我们希望尽可少的拷贝消息数据,拷贝越多则性能相对越低;
  2. 我们希望完整的消息存储在连续的字节序列中,使解析消息更容易。

为每个 Message Reade 分配 Buffer(A Buffer Per Message Reader)

显然不完整的消息数据需要存储在某种 buffer 中。比较直接的办法是我们为每个 Message Reader 都分配一个内部的 buffer,但是这个 buffer 分配多大才合适呢?这个 buffer 必须能存储下一个 message 最大的大小。如果一个 message 最大是 1MB,那每个 Message Reader 内部的 buffer 就至少是 1MB 大小。

当在百万级别的并发链接数下,1MB 的 buffer 基本没法正常工作的。举例来说,1,000,000 x 1MB 就是 1TB 的内存大小!如果消息的最大数据量是 16MB 又需要多少内存呢?128MB 呢?

可伸缩 Buffer(Resizable Buffers)

另一个方案是在每个 Message Reader 内部维护一个容量可变的 buffer。一个可变的 buffer 在初始化时占用较少空间,在消息变得很大超出容量时自动扩容。这样每个链接就不需要都占用比如 1MB 的空间。每个链接只使用承载下一个消息所必须的内存大小就可以了。

要实现一个可伸缩的 buffer 有几种不同的办法。每一种都有它的优缺点,下面几个小结会逐一讨论它们。

拷贝扩容(Resize by Copy)

第一种实现可伸缩 buffer 的办法是初始化 buffer 的时候只申请较少的空间,比如 4KB。如果消息超出了 4KB 时那么分配一个更大的空间,比如 8KB,然后把 4KB 中的数据拷贝到这个更大缓冲区中(8KB 的内存块)中。

以拷贝方式扩容的优点是一个消息的全部数据都被保存在了一个连续的字节数组中。这使得数据解析变得更加容易。

同时它的缺点是会增加大量的数据拷贝操作。

为了减少数据的拷贝操作,你可以分析整个消息流中的消息大小,以此来找到最适合当前机器的可以减少拷贝操作的 buffer 大小。例如,你可能会注意到绝大多数的消息都是小于 4KB 的,因为他们仅仅包含了一个很小的请求和响应。这意味着消息的初始值应该设置为4KB。

同时,你可能会发现如果有一个消息大于 4KB,很可能是因为他包含了一个文件。你会可能注意到大多数通过系统的数据都是小于 128KB 的。所以我们可以在第一次扩容设置为 128KB。

最后你可能会发现当一个消息大于 128KB 后,没有什么规律可循来确定下次分配的空间大小,这意味着最后的 buffer 容量应该设置为消息最大的可能数据量。

结合这三次扩容时的大小设置,可以一定程度上减少数据拷贝。4KB 以下的数据无需拷贝。在100万的连接下需要的空间例如 1,000,000 x 4KB=4GB,对于目前大多数的服务器都扛得住。4KB 到 128KB 仅需拷贝一次,即拷贝 4KB 数据到 128KB buffer 里面。消息大小介于 128KB 和最大容量的时需要拷贝两次。首先 4KB 数据被拷贝,第二次是 128KB 的数据被拷贝,所以总共需要拷贝 132KB 数据。假设没有很多的消息会超过 128KB,那么这个方案还是可以接受的。

一旦消息被完整的处理完毕后,那么分配的内存空间将会被释放。这样在同一链接接收到的下一条消息将会再次从最小缓冲区大小开始算,这样做的必要性是确保了不同连接间内存的有效共享。大多数情况下并不是所有的链接都会在同一时刻需要大容量的 buffer。

有一篇介绍如何实现这样支持可调整大小的数组的内存缓冲区的完整文章:

一个完整的教程阐述了如何实现一个内存 buffer 使其支持扩容:Resizable Arrays

追加扩容(Resize by Append)

调整缓冲区(Buffer)大小的另一种方案是让缓冲区(Buffer)由多个数组组成。当需要调整缓冲区(Buffer)大小时只需要分配一个新的字节数组,然后把数据写入其中。

有两种方法可以实现这样的缓冲区,一种方法是分配单独的字节数组用来保留这些字节数组的列表。另一种方法是分配更大的共享字节数组切片,然后用列表把这些切片和缓冲区(Buffer)关联起来。就我个人而言,我觉得第二种切片方案稍好一点,但二者之间的差别并不大。(注:概念比较难懂,建议读者可参考原文)

这种追加扩容的方案不管是使用独立数组或切片来调整缓冲区(Buffer)的好处是,在写入数据的时候不需要额外的拷贝操作。所有的数据都可以直接从 socket(Channel) 中拷贝到数组活切片中。

这种方案的缺点也很明显,就是数据不是存储在一个连续的数组中。这会使得数据的解析变得更加复杂,因为解析器不得不同时查找每一个独立数组的结尾和所有数组的结尾。正因为我们需要在写数据时查找消息的结尾,这个模型在设计实现时会相对不那么容易。

TLV编码消息(TLV Encoded Messages)

一些协议消息格式使用 TLV 格式(Type, Length, Value)进行编码。这意味着当消息到达时,消息的总长度存储在消息的开始部分。这样就可以立即知道要为整个消息分配多少内存空间。

TLV编码使得内存管理变得更加容易。我们可以立刻知道为消息分配多少内存。即便是不完整的消息,缓冲区(Buffer)结尾后面也不会有浪费的内存。

TLV编码的一个缺点是我们需要在消息的全部数据接收到之前就分配好所需要用到的所有内存空间。因此少量连接慢但发送了大块数据的连接会占用较多内存,导致服务器无响应。

解决该问题的一个变通方法是使用包含有多个 TLV 字段的消息格式。这样我们为每个 TLV 分配内存而不是为整个的消息分配内存,并且只在消息的片段到达时才分配内存。不过消息片段很大时,任然会出现一样的问题。

另一种解决方法是为消息设置超时时间,如果在超时时间后还未接收到消息(比如10-15秒)。这可以让服务器从并发处理大块消息的巧合中恢复过来,不过还是会让服务器有一段时间无响应。另外恶意的DoS攻击会导致服务器分配大量内存。

TLV编码有不同的变种。具体使用了多少字节这样确切的类型和字段长度取决于每个独立的 TLV 编码。有的 TLV 编码把字段长度放在前面,接着放类型,最后放值。尽管字段的顺序不同,但他任然是一个TLV的类型。

TLV 编码使得内存管理更加简单,这是为什么 HTTP 1.1 是一个如此糟糕的协议的原因之一。这也是他们试图在 HTTP 2.0 协议设计中解决的问题之一,在 HTTP 2.0 中,数据以 TLV 编码的帧传输数据。也是因为这个原因我们自己设计了利用 TLV 编码的网络协议 VStack.co

写部分消息(Writing Partial Messages)

在非阻塞 IO 管道中写数据仍然是一个不小的挑战。当你调用一个非阻塞模式 Channel 的 write(ByteBuffer) 方法时,ByteBuffer 写入多少数据是无法保证的。write(ByteBuffer) 方法会返回实际写入的字节数,因此可以持续跟踪写入的字节数。这就是我们的挑战:持续记录被写入的不完整的消息直到一个消息中所有的数据都发送完毕。

为了管理部分消息的写操作,我们需要创建一个 Message Writer。正如前面的 Message Reader,我们也需要为每个 Channel 配备一个 Message Writer 来写数据。在每个 Message Writer 中我们记录准确的已经写入的字节数。

如果达到的消息量超过 Message Writer 可直接写入 Channel 的消息量,到达的消息就需要在 Message Writer 排队。然后 Message Writer 尽快将消息写入到 Channel 中。

下面是部分消息写入过程的流程图:
non-blocking-server-8.png

为了使 Message Writer 能够尽快发送数据,Message Writer 需要被一直调用,这样他就可以发送更多数据。

如果你有大量的连接那么你将会持有大量的 Message Writer 实例。检查比如100万的 Message Writer 实例来确定写数据是否缓慢。首先,许多 Message Writer 可能根本就没有数据需要发送,我们并不想检查这些实例。其次,不是所有的 Channel 都处于可写状态。我们不想浪费时间在这些非写入状态的Channel。

为了检查 Channel 是否可写,可以使用 Selector 注册 Channel。然而我们并不想把所有的 Channel 实例都注册到 Selector。试想一下,如果你有 100 万的连接且大多都是空闲的,并把 100 万连接都注册到 Selector 上。然后调用 select() 方法时就会有很多的 Channel 处于可写状态。你需要检查所有这些连接中的 Message Writer 以确认是否有数据可写。

为了避免检查所有消息的 Message Writer 实例以及那些根本没有消息需要发送给他们的 Channel 实例,我么可以采用入校两步策略:

  1. 当有消息被写入 Message Writer ,Message Writer 向 Selector 注册其相关 Channel(如果还未注册的话)。
  2. 当服务器有空闲时,可以检查 Selector 注册在上面的 Channel 实例是否处于可写状态。对于每个可写的 Channel,可以请求 Message Writer 将数据写入 Channel。如果 Message Writer 已经把所有的消息都写入 Channel,则把 Channel 从 Selector 上解绑。

这两个步骤确保了只有有消息要写入的 Channel 实际上才会被注册到 Selector。

汇总(Putting it All Together)

正如你所知到的,一个被阻塞的服务器需要时刻检查当前是否有任何的新的完整的消息发送过来。在一条或多条消息被完整的收到前,服务器可能需要检查多次。检查一次是不够的。

类似的,服务器也需要时刻检查当前是否有任何需要写入的数据。如果有的话,服务器需要检查相应的连接看他们是否处于可写状态。仅仅在消息第一次进入队列时检查是不够的,因为一个消息可能被部分写入。

总而言之,一个非阻塞式的服务器要三个管道,并且经常执行:

  1. 读数据管道,用来检查打开的连接是否有新的数据到达;
  2. 处理数据管道,负责处理接收到的完整消息;
  3. 写数据管道,用于检查是否有数据可以写入打开的连接中;

这三个管道在循环中重复执行。你也可以尝试优化它的执行。比如,如果没有消息在队列中等候时,那么可以跳过写数据管道。或者,如果没有收到新的完整消息,你甚至可以跳过处理数据管道。

下面这张流程图阐述了整个服务器循环过程:
non-blocking-server-9.png

如果你还是感觉这个比较复杂难懂,可以 clone 我们的源码仓:Java NIO Server 也许亲眼看到了代码会帮助你了解这一块是如何实现的。

服务器线程模型(Server Thread Model)
GitHub 资源库里面的非阻塞式服务器实现使用了一个包含两条线程的线程模型。第一个线程负责接收来自 ServerSocketChannel 的传入连接。第二个线程负责处理接受的连接,意思包括读取消息,处理消息并将响应写回连接。这两个线程模型的图解如下:
non-blocking-server-10.png

英文原文链接:http://tutorials.jenkov.com/java-nio/non-blocking-server.html

------完------

推荐阅读:

Java NIO 简明教程 之 Java NIO 通道之间的数据传输(Channel to Channel Transfers)

Java NIO 简明教程 之 Java NIO 选择器(Selector)

Java NIO 简明教程 之 Java NIO 文件通道(FileChannel)

Java NIO 简明教程 之 Java NIO 套接字通道(SocketChannel)

Java NIO 简明教程 之 Java NIO 服务端套接字通道(ServerSocketChannel)

Java基础知识面试题篇(2020年2月最新版)

更技术学习请进入七日书摘官方群: 七日书摘官方群

七日书摘官方群群聊二维码.png

参考资源:
https://blog.csdn.net/Andrew_Yuan/article/details/80215164
https://wiki.jikexueyuan.com/project/java-nio-zh/java-nio-socketchannel.html