可能是最快的基于 io-uring 的异步 IO 框架

Posted by w@hidva.com on September 14, 2021

缘起

在 KuiBaDB 完成了初步的骨架搭建, 以及原型验证之后, 接下来要做的一个事情就是异步化改造, 毕竟 KuiBaDB 的出发点是基于 Hologres 论文实现一个开源的 Hologres, 而 Hologres 最主要的一个能力便是全异步化. 经过一番调研, KuiBaDB 准备使用 tokio 作为其异步运行时, 当然 tokio 与 Hologres 论文中提到的 HOS, EC 还是有很大差距的:

我本来一直以为 HOLO HOS 类似于 TOKIO, EC 类似于 Tokio 中 Task 概念. 但实际上并不是这样的! 这里 EC 类似于 golang 中的协程. EC 负责执行 WU, 当 EC 发现没有待执行的 WU 或者 WU 由于自身逻辑会 block 时, EC 都会把当前线程让出来. 即 tokio 中是有两层: system thread, task. 而 HOLO 中是有 3 层: system thread, EC, task. HOLO WU 类似于 tokio Task 概念. HOLO EC 也有点像 Actor, EC 也是单线程运行的, 即 EC 串行执行其收到的 WU. EC 的存在是有必要的, 设想下如果没有 EC, 即我们直接使用 tokio 那种 2 层模型, 那么对于同一表的 write sync WU, read sync WU 该如何处理? 这里 write sync WU, read sync WU 可能会被同时创建处理, 在 tokio 这种 2 层模型中, 他们可能会被 2 个线程并行处理, 即意味着在 write sync WU, read sync WU 中, 我们不得不引入同步机制. 但通过引入 EC, 同一个表的 write sync WU, read sync WU 会被扔到 1 个 EC 的任务队列中, EC 则会串行的执行这些 WU, 即不需要额外的同步逻辑了.

但 tokio 有个问题, 就是它提供的异步化文件 IO 并不是真异步, 只是将同步 io 动作 offload 到一个线程池中去执行, 这个过程中也涉及到 io 缓冲区的拷贝动作. 另外正好最近 io_uring 很火热的样子, 就想着在 KuiBaDB 尝尝鲜用下 io_uring, 具体来说 KuiBaDB 中所有需要与 kernel 打交道的地方都希望使用 io_uring 来异步化. 所以就去看了下 rust 中与 io_uring 相关的几个 crate, 但这些 crate 都不甚令人满意. tokio-uring 只支持 current thread Tokio Runtime, 完全用不上 tokio multi-threading scheduler 的威力. rio, 其代码整体实现还是比较粗糙的, 其 Uring 实例在 drop() 时并未 close uring fd, 会导致文件描述符泄漏; 而且 rio sqpoll 模式存在 bug, 并不能正常工作. 另外 rio 提交请求这一过程, 当 sq ring buffer 中没有可用空间时, 其会阻塞当前线程, 导致当前线程上其他 task 也无法被调度执行. 所以就想着自己整个 io_uring wrapper.

需求

在开始设计这个 io_uring wrapper 之前, 首先捋了下 KuiBaDB 对这个新 io_uring wrapper 可能的使用姿势. 大概如下:

iopoll_urings: Vec<Uring>,  // 开启了 IORING_SETUP_IOPOLL 的 io_uring 实例集合.
non_iopoll_urings: Vec<Uring>, // 未开启 IORING_SETUP_IOPOLL 的 io_uring 实例集合.

在 KuiBaDB 业务逻辑执行过程中, 当需要向 io_uring 提交一个新的 sqe 时, 首先根据新 sqe opcode 判断是使用 iopoll_urings 集合还是 non_iopoll_urings 集合, 之后从集合中选择一个负载最小的 io_uring 实例, 并将 sqe 提交到这个 io_uring 实例中, 之后等待这个 sqe 完成即可. 如下:

let uring = get_iopoll_uring();  // 获取一个 io_uring 实例.
let ret = uring.pwritev(fd, buf, off).await;  // 在这个实例上执行 pwritev 命令.
return ret;

所以这要求一个 io_uring 实例支持多线程并发提交. 另外希望 io_uring wrapper 避免引入不必要的 overload, 尽量做到极限的高吞吐低时延.

设计

根据上面的需求可以看到 rio 其实是符合要求的, 所以 kbio, 这个新的 io_uring wrapper 整体上是参考了 rio 实现来搞的. 这里只介绍一些与 rio 不同的地方.

使用对象池避免内存的反复分配与释放. 在 rio 中每当提交一个新 sqe 时, 会涉及到若干小内存块的动态内存分配与释放, 比如需要分配 CompletionState, 需要分配 CondVar 等. 在 kbio 中通过对象池避免了这种反复分配释放操作. 并且 kbio 中将支撑一个 sqe 的对象数目降低为 1 个, 即只需要一个 Promise 对象:

struct Promise {
    res: Option<i32>,
    waker: Option<Waker>,
}

struct PromiseSlot {
    p: parking_lot::Mutex<Promise>,
    next: Option<NonNull<PromiseSlot>>,
}

每个 sqe 都有一个对应的 PromiseSlot 对象, PromiseSlot 对象地址会保存在 sqe user_data 中. 当 kbio 从内核中收到 cqe 时, 会从 cqe user_data 中取出对应的 PromiseSlot 地址, 之后设置 res, 并调用 waker.wake() 唤醒. sqe 对应 Future::poll() 实现则是检测 res 是否不为 None, 若为 None, 则等待被唤醒, 若不为 None, 则表明 Ready. 另外在 kbio::Uring 实例构造时, 其会根据用户的 sq entires 参数预先分配指定数目的 PromiseSlot 对象, 并通过 freelist 管理这些 PromiseSlot 对象. 当用户提交一个 sqe 时, kbio 首先从 freelist 中取出一个空闲 PromiseSlot, 之后将这个 PromiseSlot 与 sqe 绑定起来. 当 sqe 完成之后, 再将其关联的 PromiseSlot 放会到 freelist 中. freelist 会被锁保护, 通过 freelist 确保 PromiseSlot 只会被一个 sqe 使用.

freelist

freelist, 是我第一次将从内核学习到设计姿势用在实际项目中, 在 linux 内核中, 大量使用了侵入式链表结果来管理对象, 比如在空闲 page 的管理中等. 这种侵入式链表结果特别适合管理预先分配好的对象池. 考虑到 kbio::Uring 中, PromiseSlot pool 预先分配了, 所以这里使用了侵入式单链表来管理空闲 PromiseSlot 集合. 在 kbio::Uring 中, Uring::free 作为这个链表的头, 其指向着第一个空闲 PromiseSlot, 之后 PromiseSlot::next 指向着下一个空闲 PromiseSlot. 在 kbio::Uring 实例构造时, 其会在完成 PromiseSlot 的预先分配之后, 通过 Uring::free, PromiseSlot::next 将这些 PromiseSlot 串联起来.

fn alloc_slots(cap: usize) -> Vec<PromiseSlot> {
    let mut slots = Vec::<PromiseSlot>::with_capacity(cap);
    slots.resize_with(cap, Default::default);
    let mut next = None;
    for item in slots.iter_mut().rev() {
        item.next = next;
        next = Some(item.into());
    }
    return slots;
}

在用户提交一个 sqe 时, 会首先从 freelist 中取走第一个空闲 PromiseSlot:

let slot = {
    let mut free = self.free.lock();
    let slot: Option<NonNull<PromiseSlot>> = *free;
    if let Some(slot) = slot {
        // 若 free 指向空闲链表不为空, 则取走第一个空闲 PromiseSlot,
        *free = unsafe { slot.as_ref().next };
        slot
    } else {
        return None;
    }
};

在一个 sqe 完成之后, 会将 sqe 对应 PromiseSlot 重新放入到 freelist 头部:

fn mark_free(&self, mut slot: NonNull<PromiseSlot>) {
    let mut free = self.free.lock();
    unsafe {
        slot.as_mut().next = *free;
    }
    *free = Some(slot);
    return;
}

sqe 提交过程是发生在 Future::poll() 调用时, 当 kbio 发现此时 freelist 为空时, 并不会像 rio 那样阻塞线程, 而是返回 Pending:

match future.state {
    State::Init(sqe) => {
        if let Some(slot) = future.uring.submit(sqe) {
            // 此时表明成功地从 freelist 中拿到了一个空闲 PromiseSlot. Future 进入 Wait 状态.
            future.state = State::Wait(slot);
            return future.on_wait(slot, cx);
        } else {
            // 此时表明 freelist 为空, 此时会返回 Pending. 但返回 Pending 之前会先 wake() 一下, 使得
            // 调度器会再次调度 Future::poll() 执行. 类似于 thread::yield_now().
            //
            // We expect that wake_by_ref() will put the task at tail of task queue.
            // But Tokio will put the task at "next task" slot, so it will run again immediately.
            cx.waker().wake_by_ref();
            return Poll::Pending;
        }
    }
}

如注释所示, 在 freelist 为空时, 我们期望 wake() 将 task 放在 running queue 尾部, 先运行其他 task, 就像 thread::yield_now() 那样. 但实际上像 tokio scheduler, golang scheduler 都有个 next task slot 这种机制, wake() 会将 task 放在 next task slot 中, 导致 task 会立即再次被调度运行… 不太符合预期.

或许 wake() 应该加个 hint 类似参数, 用于告知调度器期望的运行时机? 但换而言之, rust task wake() 这套设计是有个前提, 当 wake() 调用时, 一定是已经就绪了. 但我们这里当 wake() 调用时, 资源并未就绪, 我们只是调用 wake() 希望 scheduler 能再次找机会调用下 Future::poll() 起到了一个轮询的意图, 所以也可能是我们使用姿势不对. 那么针对这种需要轮询的场景, 有个更好的解决方案么?

结论

这里简单使用了 examples/ 下的 nopbench, rio-nopbench 测试下 kbio 与 rio 的性能对比. bench 工具支持参数以及各个参数语义:

  • -t 指定了有多少个线程并发地向一个 io_uring 实例提交 sqe.
  • -T 指定了压测程序运行多久, 单位: 秒.
  • -b 指定了 batch size, 当前压测程序运行时, 每个负责提交 sqe 的线程会一次性构造 batch size 个 sqe(算作一个 batch), 之后将他们提交给 io uring, 并等待所有 sqe 完成之后再次重复这一行为.
  • -n 指定了 io_uring 实例 sq entires 的数目.

如下表格中 len 指的是完成的 nop 请求量, 这里单位是 batch, 即实际完成的 nop 请求量为表格中数字再乘以 batch size, 即 -b 参数对应的值, 该指标越大表示吞吐越高. 其余指标表示一个 batch 从创建, 到其中所有 nop 请求都完成时的耗时情况, 该指标越小表明时延越低.

测试参数 -b 32 -T 30 -t 1 -n 64, 即只有 1 个线程周期性向 io_uring 中提交 sqe.

  kbio rio
len 1124105 492191
max 85us 21135us
min 18us 45us
mean 26.094us 60.365us
90% 28us 69us
99% 32us 82us
99.9% 37us 107us
99.99% 55us 207us
99.999% 70us 9975us
99.9999% 85us 21135us

测试参数 -b 32 -T 30 -t 4 -n 1024, 即 4 个线程周期性向 io_uring 中提交 sqe.

  kbio rio
len 1766914 550614
max 20511us 29631us
min 19us 41us
mean 67.314us 217.273us
90% 82us 296us
99% 106us 383us
99.9% 130us 470us
99.99% 198us 791us
99.999% 15783us 20639us
99.9999% 20319us 29631us

cpu affinity

如上介绍可知, 一个 Uring 实例对应着 2 个线程, 1 个在用户态, 负责轮询 cq ring buffer, 从 cq ring buffer 取走 cqe, 之后唤醒对应的 sqe. 另一个在内核态, 即内核的 sq poll thread. 我有个大胆的想法, 就是如果我们将这两个线程都绑定到一个核上面运行, 会不会由于缓存命中率提升, 局部性原理得到大幅应用, 使得性能更上一层楼?!

  with cpu affinity without cpu affinity
len 590 1211824
max 27183us 9895us
min 3094us 1us
mean 16958.883us 7.690us
90% 18015us 12us
99% 18031us 16us
99.9% 27183us 21us
99.99% 27183us 48us
99.999% 27183us 290us
99.9999% 27183us 9319us

可以看到不光没有提升, 反而性能骤降地惊人… 其实也很明白的事儿, Uring 的这俩线程都是非常繁忙的线程, 绑核之后只能使用一个核运行, 而不绑核则可以用满两个核心… 盏一啊, 你糊涂了啊.