This article also has an English version.
本系列文章主要介绍如何设计和实现一个基于 io-uring 的 Thread-per-core 模型的 Runtime。
我们的 Runtime 最终产品 Monoio 现已开源,你可以在 github.com/bytedance/monoio 找到它。
- Rust Runtime 设计与实现-科普篇
- Rust Runtime 设计与实现-设计篇-Part1
- Rust Runtime 设计与实现-设计篇-Part2
- Rust Runtime 设计与实现-组件篇
- Rust Runtime 设计与实现-IO兼容篇
本文是系列的第四篇,前面该讲的设计基本讲完了,这里主要说说 channel 等组件。
Channel in Go and in Rust
在我们熟悉的 Golang 中,所有 Task 一定是 goroutine,通信基本是靠 channel 的。但是在 Rust 中,Future 和 Task 不一定是一个东西,多个 Future 可以通过 select 等嵌套,然后其本质上还是单个 Task,性能上也相比 Golang 的模型要好。
但是即便是如此,在 Rust 中我们还是有很强的使用 channel 的需求。在 thread-per-core 模型下,我们可以使用仅本线程的 channel 来提升性能。
这部分的内容已经开源为 local-sync crate,它并不依赖我们的 Runtime,你可以在合适的场景独立使用它。
https://github.com/monoio-rs/local-sync
MPSC Channel
MPSC = Multiple Producer Single Consumer。
我们的 mpsc channel 需要提供两种模式,一个是 bounded 一个是 unbounded。
在 bounded 模式下,我们的 send 可能会 await,因为可能容量不够写入,需要等消费。
在 unbounded 模式下,send 不需要等待。因为不限制存储空间,所以即便消费端卡住,我们在空间不够时也可以随意地开辟新的空间来存储。
当然,因为 channel 里可能没有数据,所以两种模式下的 recv 都是异步接口。
最终实现效果:
1 |
|
存储数据结构
我们需要先设计底层存储数据结构。为了能够低成本地扩容,并且不产生大量内存申请和释放,我们设计出了两种方案可以选择。
链表 + 固定大小 Block
这种设计下每个 block 大小一致,block 之间使用单向链表维护。
读数据时从 head 的 begin 位置读,写数据时向 tail 的 end 位置写。当一块数据被度完时,它会被插入到 tail 之后继而可以循环使用,避免重复释放和分配。
这样当缓存数据量增多时会付出 数据峰值 / BLOCK_SIZE
次的内存分配。
一个额外的好处是,我们可以全局缓存 Block 继而进一步优化性能。
链表 + 指数扩大 Block
这个方案也是使用链表维护 blocks,理解起来可以认为平滑扩容,即扩容时仅写入新的 ring,旧 ring 在消费完时再释放。
每次 ring 扩容即可扩容为原有的两倍大小,这样可以在峰值流量下只需要付出 log(数据峰值) 次内存分配。
结合实现复杂度,我们最后决定采用方案1:链表 + 固定大小 Block。Tokio 的 mpsc 也是这种结构,不过它这么做更多的是为了无锁并发考虑。
多 Block 通过单向链表组成 Queue,Queue 提供 push 和 pop 能力(无大小限制)。
Queue 在 push 和 pop 时会保证指针指向内容的合法性,在适当的时机会移动 block 指针。
等待机制
我们的等待需要两种:
- Bounded 实现下,发送者需要等待空槽
- 接收者需要等待数据
第二种等待非常易与实现:因为 mpsc 本身就只有一个 consumer,当它需要等待数据时,只需要将自己对应 waker 设置在 channel 中,当有数据写入时,会额外检查这个 waker 并 wake 它。
第一种等待的实现略复杂,复杂度在于可以有一堆等待者。并且为了避免饥饿,还需要先到先得地唤醒。那么显然这个结构可以直接利用 VecDeque 或链表存储。当等待者被 drop 的时候,还需要将自己摘掉,这样一来就只有类似 Slab 或者链表可以使用了。
我们这里参考 Tokio 的做法,使用链表存储。将这个结构抽象为 Semaphore,既可以满足 mpsc 之用,又可以单独暴露给用户使用,还可以拿来实现其他同步结构。
分层抽象
前面有了 Block
,我们在 Block
的基础上构建了与异步无关的同步存储 Queue
,它是无限容量的。
前面也提到有两种等待,但发送方的等待只有在 bounded 的情况下使用。
所以我们在 Queue
的基础上抽象出一个底层结构 Chan
,提供大家的共用的逻辑,只实现第二种等待:
- 暴露 recv 接口,允许等待。
- 暴露同步 send 接口,上层调用者需要自己维护容量限制和 sender 等待。
在 Chan
的基础上我们要封装出 bounded 和 unbounded 两种实现。unbounded 下这个约等于原样转发,因为我们的 Chan
本身就可以理解为一个 unbounded channel。
对于 bounded channel,我们需要实现容量限制和发送方的唤醒机制。这部分就要引入前一小节提到的 semaphore。我们利用 semaphore 管理和等待空槽数量(同时还顺便拿来管理接收者状态)。
所以我们在 Chan
层面集成 semaphore。由于上层需求不同,所以这里 Semaphore 其实是个 trait。对于 bounded channel,我们直接使用 Semaphore 实现;对于 unbounded,我们可以实现一个空的 Semaphore。
1 | pub trait Semaphore { |
Oneshot Channel
TODO
Once Cell
TODO
Semaphore
TODO
Q & A
Q:都 thread local 了为啥要搞同步能力?
A:并发 != 并行,所以即便是单线程,也会同时执行多个 task,而 task 之间需要通信,有依赖关系,所以需要一个组件来支持它们异步等待。
Q:为啥不用开源库?
A:目前没有基本能用的同类开源组件。目前主流 Rust Runtime 是 Tokio,非 thread per core 模型,所以使用自带跨线程同步能力的数据结构。
有一个还算可用的是 local-channel 但是其实现上一来性能不佳(使用 Vec 存储,扩容时会产生数据拷贝),二来功能上有所欠缺(只有 unbounded mpsc 实现,无法做 back pressure 等),所以我们在考虑之后决定自己造轮子。
结语
本系列文章到此结束。文章里难免会有一些笔误或者我理解上的错误。如果你想与我讨论,可以直接邮件我(在关于页有联系方式);如果是 Monoio 相关相关的问题,也可以直接在 Github 上提 Issue 或 Discussion。
Monoio 目前仍处于非常不完善的阶段,期待你的贡献:)
另外,我们还搭建了一个国内的 crates.io 和 rustup 的镜像,欢迎使用 RsProxy !
如果你想转载本博客的文章,麻烦注明出处谢谢。
UPDATE: 又写了个第五篇,别急走。