基于 Kafka 的 WAL

架构

在本节中,将会介绍一种分布式 WAL 实现(基于 Kafka)。表的预写日志(write-ahead logs,以下简称日志)在本实现中是按 region 级别管理的,region 可以简单理解为多个表的共享日志文件。

如下图所示,在本实现中将 region 映射到 Kafka 中的 topic(只有一个 partition)。 通常一个 region 需要两个 topic ,一个用于存储日志,另一个用于存储元数据。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
                                                 ┌──────────────────────────┐
                                                 │         Kafka            │
                                                 │                          │
                                                 │         ......           │
                                                 │                          │
                                                 │ ┌─────────────────────┐  │
                                                 │ │      Meta Topic     │  │
                                                 │ │                     │  │
                                         Delete  │ │ ┌─────────────────┐ │  │
               ┌──────────────────────┐  ┌───────┼─┼─►    Partition    │ │  │
               │       HoraeDB        │  │       │ │ │                 │ │  │
               │                      │  │       │ │ └─────────────────┘ │  │
               │ ┌──────────────────┐ │  │       │ │                     │  │
               │ │       WAL        │ │  │       │ └─────────────────────┘  │
               │ │      ......      │ │  │       │                          │
               │ │ ┌──────────────┐ │ │  │       │ ┌──────────────────────┐ │
               │ │ │    Region    │ │ │  │       │ │     Data Topic       │ │
               │ │ │              ├─┼─┼──┘       │ │                      │ │
               | | | ┌──────────┐ │ │ │          │ │ ┌──────────────────┐ │ │
               │ │ │ │ Metadata │ │ │ │          │ │ │    Partition     │ │ │
               │ │ │ └──────────┘ │ │ │    Write │ │ │                  │ │ │
Write ─────────┼─┼─►              ├─┼─┼───┐      │ │ │ ┌──┬──┬──┬──┬──┐ │ │ │
               │ │ │ ┌──────────┐ │ │ │   └──────┼─┼─┼─►  │  │  │  │  ├─┼─┼─┼────┐
               │ │ │ │  Client  │ │ │ │          │ │ │ └──┴──┴──┴──┴──┘ │ │ │    │
Read ◄─────────┼─┼─┤ └──────────┘ │ │ │          │ │ │                  │ │ │    │
               │ │ │              │ │ │          │ │ └──────────────────┘ │ │    │
               │ │ └──▲───────────┘ │ │          │ │                      │ │    │
               │ │    │ ......      │ │          │ └──────────────────────┘ │    │
               │ └────┼─────────────┘ │          │         ......           │    │
               │      │               │          └──────────────────────────┘    │
               └──────┼───────────────┘                                          │
                      │                                                          │
                      │                                                          │
                      │                        Read                              │
                      └──────────────────────────────────────────────────────────┘

数据模型

日志格式

日志格式采用了在 基于 RocksDB 的 WAL 中定义的通用格式。

元数据

每个 region 都将在内存和 Kafka 中维护其元数据,我们在这里称之为 RegionMeta。它可以被认为是一张映射表,以表 ID 作为键,以 TableMeta 作为值。我们简要介绍一下 TableMeta 中的变量:

  • next_seq_num,为下一条写入日志分配的 sequence number。
  • latest_marked_deleted,表最后一次触发 flush 时对应的 sequence number, 所以对应 sequence number 小于该值的日志都将被标记为可以删除。
  • current_high_watermark, 该表最近一次日志写入后,Kafka 对应 topic 的高水位。
  • seq_offset_mapping,sequence number 和 Kafka 对应 topic offset 的映射,每次 flush 后,会将 latest_marked_deleted 前的条目进行清理。
┌─────────────────────────────────────────┐
│              RegionMeta                 │
│                                         │
│ Map<TableId, TableMeta> table_metas     │
└─────────────────┬───────────────────────┘
                  │
                  │
                  │
                  └─────┐
                        │
                        │
 ┌──────────────────────┴──────────────────────────────┐
 │                       TableMeta                     │
 │                                                     │
 │ SequenceNumber next_seq_num                         │
 │                                                     │
 │ SequenceNumber latest_mark_deleted                  │
 │                                                     │
 │ KafkaOffset high_watermark                          │
 │                                                     │
 │ Map<SequenceNumber, KafkaOffset> seq_offset_mapping │
 └─────────────────────────────────────────────────────┘

主要流程

我们主要关于对于单个 region 的主要操作,会介绍以下操作的主要流程:

  • 打开或创建 region。
  • 读写日志。
  • 删除日志。

打开或创建 region

步骤

  • 在打开的 namespace 中搜索 region。
  • 如果 region 存在,最重要的事是去恢复其元数据,恢复过程将在之后介绍。
  • 如果 region 不存在并且需要自动创建,则需要在 Kafka 上创建对应的 topic。
  • 在 cache 中插入相应 region 并将其返回。

恢复

上面提到,RegionMeta 实际就是以表 ID 为键,以 TableMeta 为值的映射表。因此,我们在本节中只关注特定 TableMeta 的恢复即可,将在每步的介绍中加入例子以作更好的说明。

  • 从快照中恢复。我们会在某些场景下为 RegionMeta制作快照(例如当标记日志为可删除时,真正清理日志时),并且将其写到 meta topic 中,快照实际上就是在某个时间点的 RegionMeta。当恢复 region 时,我们可以使用快照来避免扫描 data topic 的全部数据。下面为上述过程对应的例子,我们从在 Kafka 高水位为 64 的时间点时制作的快照中恢复 RegionMeta
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
high watermark in snapshot: 64

 ┌──────────────────────────────┐
 │         RegionMeta           │
 │                              │
 │          ......              │
 │ ┌──────────────────────────┐ │
 │ │       TableMeta          │ │
 │ │                          │ │
 │ │ next_seq_num: 5          │ │
 │ │                          │ │
 │ │ latest_mark_deleted: 2   │ │
 │ │                          │ │
 │ │ high_watermark: 32       │ │
 │ │                          │ │
 │ │ seq_offset_mapping:      │ │
 │ │                          │ │
 │ │ (2, 16) (3, 16) (4, 31)  │ │
 │ └──────────────────────────┘ │
 │          ......              │
 └──────────────────────────────┘
  • 从日志数据中恢复。 当从快照中恢复的过程完成后,我们以快照被制作时 data topic 中的高水位为起点,扫描其中的日志数据进行后续恢复,明显这能够避免扫描 data topic 中的全部数据。以下为上述过程的例子:
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
┌────────────────────────────────────┐
│                                    │
│    high_watermark in snapshot: 64  │
│                                    │
│  ┌──────────────────────────────┐  │
│  │         RegionMeta           │  │
│  │                              │  │
│  │          ......              │  │
│  │ ┌──────────────────────────┐ │  │
│  │ │       TableMeta          │ │  │
│  │ │                          │ │  │
│  │ │ next_seq_num: 5          │ │  │                  ┌────────────────────────────────┐
│  │ │                          │ │  │                  │          RegionMeta            │
│  │ │ latest_mark_deleted: 2   │ │  │                  │                                │
│  │ │                          │ │  │                  │            ......              │
│  │ │ high_watermark: 32       │ │  │                  │ ┌────────────────────────────┐ │
│  │ │                          │ │  │                  │ │         TableMeta          │ │
│  │ │ seq_offset_mapping:      │ │  │                  │ │                            │ │
│  │ │                          │ │  │                  │ │ next_seq_num: 8            │ │
│  │ │ (2, 16) (3, 16) (4, 31)  │ │  │                  │ │                            │ │
│  │ └──────────────────────────┘ │  │                  │ │ latest_mark_deleted: 2     │ │
│  │          ......              │  │                  │ │                            │ │
│  └──────────────────────────────┘  ├──────────────────► │ high_watermark: 32         │ │
│                                    │                  │ │                            │ │
│ ┌────────────────────────────────┐ │                  │ │ seq_offset_mapping:        │ │
│ │          Data topic            │ │                  │ │                            │ │
│ │                                │ │                  │ │ (2, 16) (3, 16) (4, 31)    │ │
│ │ ┌────────────────────────────┐ │ │                  │ │                            │ │
│ │ │        Partition           │ │ │                  │ │ (5, 72) (6, 81) (7, 90)    │ │
│ │ │                            │ │ │                  │ │                            │ │
│ │ │ ┌────┬────┬────┬────┬────┐ │ │ │                  │ └────────────────────────────┘ │
│ │ │ │ 64 │ 65 │ ...│ 99 │100 │ │ │ │                  │             ......             │
│ │ │ └────┴────┴────┴────┴────┘ │ │ │                  └────────────────────────────────┘
│ │ │                            │ │ │
│ │ └────────────────────────────┘ │ │
│ │                                │ │
│ └────────────────────────────────┘ │
│                                    │
└────────────────────────────────────┘

读写日志

读写流程比较简单。

写流程:

  • 打开指定的 region,如果不存在则需要创建。
  • 利用 client 将日志写入到 region 对应的 data topic 中。
  • 更新 TableMeta 中的 next_seq_num, current_high_watermarkseq_offset_mapping等元数据,

读流程:

  • 打开指定的 region。
  • 读取 region 的所有日志数据,按表切分数据和回放等工作需要调用者实现。

删除日志

日志的删除可以划分为两个步骤:

  • 标记日志为可删除。
  • 利用后台线程做延迟清理。

标记

  • 更新在 TableMeta 中的 latest_mark_deletedseq_offset_mapping(需要进行维护,使得每一条目的 sequence number 大于等于更新后的 latest_mark_deleted)。
  • 或许我们需要在删除表的时候,制作并及时同步 RegionMeta 的快照到 Kafka 中。

清理

清理逻辑如下,会在后台线程中执行:

  • 制作 RegionMeta 的快照。
  • 根据快照判断是否需要进行清理。
  • 如果需要,先同步快照到 Kafka 中,然后清理日志。