PG 中的事务: XLOG

Posted by w@hidva.com on February 8, 2020

本文章内容根据 PG9.6 代码而来.

XLOG, 也即 WAL 在 PG 中的别称, 我们这里不对 WAL 背后的细节背景做过多介绍, 更专注与 PG 中是如何实现 WAL 的, 即 PG 的 XLOG 模块. 在 PG 之中, 每一条 XLOG 都对应着一个 XLogRecord, 其内存放着所有与当前 XLOG 相关的信息, 大致来说主要就是:

  • 当前 XLOG 的归属者, 即当前 xlog 要交给哪个 resource manager 负责解释解析以及 REDO. 每个 XLOG 在生成时都必须指定对应的 resource manager id, 在 XLOG REDO 阶段, 会根据这个 id 找到对应的 redo 回调函数, 然后执行该函数来 REDO.

  • buffer 列表, buffer 列表记录着当前 WAL record 所影响到的 data page 相关元信息(如所在库, 所对应表, block no等). 在 WAL record 中, 每一个 buffer 都被一个唯一的 ID 标识着, 在当前 WAL 内可以通过这个 ID 来引用这个 buffer.

  • buffer data; 与指定 buffer 关联的数据. PG XLOG 本身不会对这个数据进行任何解析解释操作.

  • main data, 不与任何 buffer 关联的数据.

这些信息被 PG 按照一定的布局编码起来, 具体编码格式我个人觉得不必要过于细节地了解.

考虑到 WAL 一直追加的特性, 可以想象到一个 PG 实例生成的所有 XLogRecord 会一直追加到一个无限增长的文件中, 那么对于每一个 XLogRecord 来说, 可以用其在这个文件中的偏移来唯一标识着当前 XLogRecord. 这也就是 PG 的实际做法, PG 将这个可用来唯一标识 XLogRecord 的偏移又称为 LSN, log sequence number. 只不过现实世界中文件并不能一直无限增长, 所以 PG 会将这个 “无限增长的” 文件按照固定大小(默认 16M)切分成多个文件. 每个文件都会根据该文件第一个字节的 LSN 来命名. 但是文件个数一直无限增长也不是个事啊, 所以 PG 引入了 checkpoint 机制来回收文件. 简单来说每次 PG 执行 checkpoint 时, 都会先写入一条标识着 checkpoint record 的 XLogRecord, 之后 checkpoint 会对 checkpoint record 之前的 XLOG 都执行 REDO 操作并将相关副作用持久化保存起来. 这就意味着当 checkpoint 成功结束之后我们就不再需要 checkpoint record 之前的 XLogRecord 了, 因此存放着这些 XLogRecord 的文件就可以被安全地回收了. PG 会将最新的 checkpoint record 的 LSN 保存在 pg_control 文件之中, 当一个 PG 实例重启时, 会首先读取 pg_control 文件保存着的 LSN 信息, 之后只需要 REDO 这个 LSN 之后的 XLogRecord 即可.

接下来我们会详细介绍一下 XLOG 的写入链路, 考虑到现代 PG 代码中为了最大化性能引入了太多的复杂性, 我们会先实现一个最简单最朴素但也可以集成到 PG 中正常运作的 XLOG 写入链路, 来展示 XLOG 写入链路需要做哪些工作. 然后再一点点地引入一些复杂性来试图优化提升这条 xlog 写入链路的性能.

为此我们首先把目光聚焦到 XLogInsertRecord() 这个函数, PG 中所有可能会写入 XLOG 的模块最终都会掉用到这个函数来完成实际的写入工作. 该函数原型如下所示:

typedef struct XLogRecData
{
	struct XLogRecData *next;	/* next struct in chain, or NULL */
	char	   *data;			/* start of rmgr data to include */
	uint32		len;			/* length of rmgr data to include */
} XLogRecData;

XLogRecPtr XLogInsertRecord(XLogRecData *rdata, XLogRecPtr fpw_lsn);

其中 XLogRecData 就是类似与 writev/readv 系列函数所用到的 iovec 结构体, rdata 指向着已经编码后的 XLogRecord, rdata 指向着的链表中每一个元素都存放着这个 XLogRecord 一段内容. fpw_lsn 参数与 PG full page write 特性有关, 不准备在这篇文章中做介绍, 暂时可以先忽略这个参数. XLogInsertRecord 返回的是当前 XLogRecord 最后一个字节下一个字节的 LSN. 可以简单看做是 “下一条” XLogRecord 的 LSN, 但考虑到对齐, page header 的存在, 实际上并不一定是.

然后再字节级别详细看下一个 XLOG 文件的具体布局. 与 PG 中其他类型的文件, 比如像 clog, heap file 等一样. XLOG 文件首先按照固定大小(默认 8K)分隔成块, 然后固定数目的块拼接成一个 segment, 一个 segment 就是一个物理文件. 每一个 segment 大小也是固定的, 默认 16M. segment 中每一个块(页)都有自己的 header, segment 中第一个块(页)的 header 用结构体 XLogLongPageHeaderData 表示, 其余块的 header 用结构体 XLogPageHeaderData 来表示. 关于这些结构体中每一个字段具体语义参考 PG 源码注释即可. 另外 PG 中写入的每条 XLogRecord 第一个字节在 segment 中的偏移都要求 MAXALIGNed, 在 64bit 机器上, 即 8 字节对齐, 所以有些时候在相邻 XLogRecord 之间可能需要加入一些 padding.

正是由于 page header 以及 padding 的存在, 使得对于 XLogRecord 中任意一个字节, 便有了两种标识方法. 第一个是抽象表示, 此时不考虑 page header 的存在, 任意字节的标识由此时其偏移决定, 下文以 bytepos 来称呼这种表示方法. 另外一种便是上面提到的 LSN. 之所以引入这两种表示方法, 我个人感觉是为了方便实现. 比如在为 XLogRecord 分配空间的时候, 使用 bytepos 更简单直观一下. 在基于 bytepos 实现的空间分配逻辑中, 我们只需要使用一个状态 uint64 CurrBytePos, 表示着下一个 record 的地址. 之后在空间分配时只需要:

currbytepos = CurrBytePos;
CurrBytePos += record_size;

这样 [currbytepos, CurrBytePos) 便是当前 XLogRecord 可以使用的空间.

但是基于 LSN 来实现空间分配逻辑, 那么 CurrBytePos += record_size; 这一步便不得不考虑 record_size 占了多少 page, 有哪些 page 的 header 是 XLogLongPageHeaderData 类型, 然后计算出 page header 所占的字节数, 再根据此来更新 CurrBytePos. 很显然这很是麻烦.

bytepos 与 LSN 之间可以互相转换. 这之间逻辑也不是很复杂, 在 bytepos 到 LSN 的转换中, 根据 bytepos 的值可以计算 bytepos 跨越过了多少 segment file, 以及跨越了多少 page, 以此计算出需要额外添加的 page header size. 然后再加到 bytepos 上结果便是 LSN 了. 函数 XLogBytePosToEndRecPtr() 便是实现了这一过程. 同样由于 page header 的存在, 当 XLogBytePosToEndRecPtr() 返回的 LSN 位于页边界时, 若此时我们想在对应 bytepos 处写入一个 xlogrecord, 那么此时该 xlogrecord 应该写入到返回的 LSN 加上 sizeof(page header) 之后的位置. 这也就是函数 XLogBytePosToRecPtr() 的做法. 当 XLogBytePosToEndRecPtr(bytepos) 返回的 LSN 位于页边界时, XLogBytePosToRecPtr(bytepos) 返回的 LSN 会在此基础上加上 page header size 来进行调整. 其他情况下, XLogBytePosToRecPtr(bytepos) 就等同于 XLogBytePosToEndRecPtr(bytepos).

对了, 除了 XLogInsertRecord() 之外还有个 XLogFlush() 函数, 其原型:

void XLogFlush(XLogRecPtr recptr);

该函数的语义是当 XLogFlush() 返回时, recptr 指向的 LSN 之前的内容已经全部 fsync 到持久化设备中了. 一般会在事务提交时调用该函数.

在了解完这些必须的前提知识之后, 我们便可以开始着手实现我们自己的 XLOG 写入链路了. 首先先将链路会用到的一些状态保存在共享内存中:

struct XLOGV1Insert {
	uint64 prev_pos;
	uint64 curr_pos;
	SharedMutex mux;
};

static XLOGV1Insert *g_insertctx;

这里 g_insertctx 指向着共享内存中一块区间, 可以被 PG 所有 backend 访问. g_insertctx->curr_pos 存放着下一条 xlogrecord 应写入的位置对应的 bytepos 表示值, curr_pos 总是 MAXALIGNed, 其起始值是 UsableBytesInSegment. g_insertctx->prev_pos 存放着最后一次写入的 xlogrecord 第一个字节对应的 bytepos 表示值, 即 XLogBytePosToRecPtr(g_insertctx->prev_pos) 便是最后一次写入的 xlogrecord 的 LSN. g_insertctx->mux 是一个可用于多进程同步的锁. 实际上这也是当前 PG 在实现 XLOG 写入链路时用到的状态. 每次实例启动时, 会调用 StartupXLOG(), 其内会读取当前现存的所有 segment file, 以此来初始化 prev_pos, curr_pos.

之后每当 XLogInsertRecord() 调用时, 我们会首先:

std::lock_guard<SharedMutex> _guard(g_insertctx->mux);

加个强有力的大锁, 确保任意时刻只有一个 backend 会实际写入 XLOG. 对, 此时性能表现并不是很好. 之后:

XLogRecord *rechdr = (XLogRecord *) rdata->data;
bool isLogSwitch = (rechdr->xl_rmid == RM_XLOG_ID && rechdr->xl_info == XLOG_SWITCH);
if (isLogSwitch) {
  elog(PANIC, "isLogSwitch: true");
}

判断当前写入的 XLogRecord 是否是一条 logswitch xlogrecord. logswitch xlogrecord 的语义是用来强制填满当前在写入的 segment file, 使得下一条 xlogrecord 会被写入到一个新的 segment file 中. 若 XLogBytePosToEndRecPtr(g_insertctx->curr_pos) 返回的 LSN 已经位于 segment 边界, 那么 logswitch xlogrecord 就没有必要了, 此时 PG 便会忽略这个 logswitch xlogrecord. 其他情况下, logswitch xlogrecord 会被写入并且占满当前 segment file 所有空间, 使得下一条 xlogrecord 会被写入到一个新的 segment file 中. 是的, 我们这里暂且不处理 logswitch record.

然后:

rechdr->xl_prev = XLogBytePosToRecPtr(g_insertctx->prev_pos);
COMP_CRC32C(rechdr->xl_crc, rechdr, offsetof(XLogRecord, xl_crc));
FIN_CRC32C(rechdr->xl_crc);

根据 g_insertctx 指向的状态计算出待写入 XLogRecord 字段 xl_prev 的取值, 并重新计算一下 xlogrecord 的 CRC.

char padding_buf[MAXIMUM_ALIGNOF]{};
uint32 aligned_size = MAXALIGN(rechdr->xl_tot_len);
uint32 padding_size = aligned_size - rechdr->xl_tot_len;
XLogRecData padding;
padding.next = NULL;
padding.data = padding_buf;
padding.len = padding_size;
AppendRecData(rdata, &padding);

对齐! 别忘了 xlogrecord 要求 MAXALIGNed, 以及 g_insertctx->curr_pos 始终 MAXALIGNed 这一不变量. 因此我们需要确保在当前 xlogrecord 写入之后, 额外填充必需的 padding, 使得调整后的 g_insertctx->curr_pos 仍是 MAXALIGNed, 使得下一条 xlogrecord 也是 MAXALIGNed. 考虑到目前在 XLogInsertRecord 的所有使用场景中, 在该函数返回值后, rdata 便不再被使用, 所以我们会将可能需要的 padding 追加到 rdata 末尾一起写入. 是的, 我们开始实际的写入工作了:

int filefd = -1;
uint32 remain_size = aligned_size;
uint32 rdata_left = rdata->len;
XLogRecPtr curfilepos = XLogBytePosToEndRecPtr(g_insertctx->curr_pos);
/* rdata */

在此过程中, 我们需要如上几个状态. 其中 remain_size 记录着当前 xlogrecord 剩下未被写入的字节数. rdata 指向着当前正在写入的 XLogRecData 结构, rdata_left 记录着当前 rdata 中尚未被写入的字节数. curfilepos 记录着下一个待写入字节对应的 LSN. 若 filefd >= 0, 那么其指向着 curfilepos 所在的 segment file, 并且此时文件偏移已经被调整到 curfilepos 所对应的位置. 实际的写入我们会使用一个循环来实现:

while (remain_size > 0) {

若发现当前还有未写入的内容:

  if (filefd < 0) {
    filefd = Open(curfilepos);
  }

首先看下是否打开了 curfilepos 所在的文件, 若没有, 则调用 Open() 打开指定的文件, 并调整文件偏移指针.

然后计算当前 curfilepos 所处的 page 剩余的空间, 若此时 curfilepos 正处于 page 边界, 则还需要填充 page header, 若 curfilepos 同时也处于 segment 边界, 那还需要填充 long page header:

  uint32 freespace;  /* free space in current page */
  if (curfilepos % XLOG_BLCKSZ == 0) {
    XLogLongPageHeaderData pageheader;
    MemSet(&pageheader, 0, sizeof(pageheader));
    pageheader.std.xlp_magic = XLOG_PAGE_MAGIC;
    pageheader.std.xlp_tli = ThisTimeLineID;
    pageheader.std.xlp_pageaddr = curfilepos;
    if (remain_size < aligned_size) {
      Assert(remain_size > padding_size);
      pageheader.std.xlp_rem_len = remain_size - padding_size;
      pageheader.std.xlp_info |= XLP_FIRST_IS_CONTRECORD;
    }
    pageheader.std.xlp_info |= XLP_BKP_REMOVABLE;  /* set only when forcePageWrite is true. */
    if (curfilepos % XLogSegSize == 0) {
      pageheader.xlp_sysid = ControlFile->system_identifier;
      pageheader.xlp_seg_size = XLogSegSize;
      pageheader.xlp_xlog_blcksz = XLOG_BLCKSZ;
      pageheader.std.xlp_info |= XLP_LONG_HEADER;
      freespace = XLOG_BLCKSZ - sizeof(pageheader);
    } else {
      freespace = XLOG_BLCKSZ - sizeof(pageheader.std);
    }
    uint32 writen = XLOG_BLCKSZ - freespace;
    Write(filefd, &pageheader, writen);
    curfilepos += writen;
  } else {
    freespace = XLOG_BLCKSZ - curfilepos % XLOG_BLCKSZ;
  }

实际上 PG 这里要求 page 中第一个存放着 xlogrecord 内容的字节的位置也要是 MAXALIGNed, 即我们可能需要在 page header 实际内容之后额外填充一些 padding. 幸运的是这里无论是 XLogLongPageHeaderData 还是 XLogPageHeaderData, 其 size 正好是 MAXALIGNed, 所以就不需要额外 padding 了.

  uint32 writen = rdata_left > freespace ? freespace : rdata_left;
  Write(filefd, rdata->data + (rdata->len - rdata_left), writen);
  remain_size -= writen;
  rdata_left -= writen;
  curfilepos += writen;

然后就可以开始实际的写入操作, 并之后更新相关状态. 这里若当前 segment file 已经写满, 即调整后的 curfilepos 处于了 segment 边界:

  if (curfilepos % XLogSegSize == 0) {
    Close(filefd);
    filefd = -1;
  }

那么我们便需要关闭当前已经打开的 segment 文件. 有必要的话, 下次循环会打开新的 segment file.

最后若当前 rdata 已经被消费完, 那要记得同时更新 rdata 与 rdata_left.

  if (rdata_left == 0 && remain_size > 0) {
    rdata = rdata->next;
    rdata_left = rdata->len;
  }

如此这般, 等 xlogrecord 所有内容都已经被写入完成之后, 若 filefd 仍有效, 那么记得关闭. 之后再按照目前 PG 的要求更新一些全局状态, 就可以直接返回啦

if (filefd >= 0) {
  Close(filefd);
}
g_insertctx->prev_pos = g_insertctx->curr_pos;
g_insertctx->curr_pos += aligned_size;

MarkCurrentTransactionIdLoggedIfAny();
ProcLastRecPtr = XLogBytePosToRecPtr(g_insertctx->prev_pos);
XactLastRecEnd = curfilepos;
return curfilepos;

然后针对 XLogFlush() 的实现, 由于在 XLogInsertRecord() 中, 每次 Close(fd) 时都会先 fdatasync(fd) 一下, 即每次 XLogInsertRecord() 返回时, curfilepos 之前的内容都已经持久化了. 所以 XLogFlush() 我们直接 return 就行了, 不需要做额外的动作.

至此我们的 XLOG 写入链路就已经实现完毕了. 完整的 patch 可以见 这里. 来, 集成到 PG 中跑下 pgbench 看下数据咋样.

$ pgbench -i -s 100 -n pgbench
...
10000000 of 10000000 tuples (100%) done (elapsed 32.30 s, remaining 0.00 s)
set primary keys...
done.
$ pgbench -c 64 -j 8 -n -r -T 180 pgbench
transaction type: <builtin: TPC-B (sort of)>
scaling factor: 100
query mode: simple
number of clients: 64
number of threads: 8
duration: 180 s
number of transactions actually processed: 12505
latency average = 924.323 ms
tps = 69.239885 (including connections establishing)
tps = 69.245412 (excluding connections establishing)
...

与 PG 原生的比对一下:

$ pgbench -i -s 100 -n pgbench
...
10000000 of 10000000 tuples (100%) done (elapsed 36.56 s, remaining 0.00 s)
set primary keys...
done.
$ pgbench -c 64 -j 8 -n -r -T 180 pgbench
transaction type: <builtin: TPC-B (sort of)>
scaling factor: 100
query mode: simple
number of clients: 64
number of threads: 8
duration: 180 s
number of transactions actually processed: 234102
latency average = 49.264 ms
tps = 1299.132980 (including connections establishing)
tps = 1299.274954 (excluding connections establishing)
...

果然并不怎么样嘛. 毕竟我们一没 cache, 二一把大锁, 最后每次 insert xlogrecord 还都要 fdatasync 一把.

最后, 我们这里就不准备详细介绍 PG 当前 XLOG 写入链路了, 毕竟我个人认为代码这个还是要自己亲手看一下更有效果一点. 而且我们上面也已经把一些必须的概念背景详细介绍出来了, 相信看下来应该难度也不大.

为啥要划分 block?

老实说, 本来我以为 xlog 只需要纯粹地把 record 追加到 xlog 文件中就行, 为啥这里还划分了 block? 另外也想起来 leveldb xlog 也是划分了 block, 而且也给了几点理由. 但没感受到这几点理由的说服力… 这里也大概捋了下 leveldb Reader::ReadRecord, 以及 pg XLogBeginRead(), ReadRecord() 链路可以看到对于跨越到多个 block 的 record 仍需要一个个 block 读取, 读取之后把 block 拷贝到另外一个缓冲区中. 所以完全没有感受到 block 的必要性来着.. 我们完全可以以 record 写入, 不划分 block. 然后读取时每次读取 4096 这种对文件系统友好的 size.