1 - HoraeDB 架构介绍

本文目标

  • 为想了解更多关于 HoraeDB 但不知道从何入手的开发者提供 HoraeDB 的概览
  • 简要介绍 HoraeDB 的主要模块以及这些模块之间的联系,但不涉及它们实现的细节

动机

HoraeDB 是一个时序数据库,与经典时序数据库相比,HoraeDB 的目标是能够同时处理时序型和分析型两种模式的数据,并提供高效的读写。

在经典的时序数据库中,Tag 列( InfluxDB 称之为 TagPrometheus 称之为 Label)通常会对其生成倒排索引,但在实际使用中,Tag 的基数在不同的场景中是不一样的 ———— 在某些场景下,Tag 的基数非常高(这种场景下的数据,我们称之为分析型数据),而基于倒排索引的读写要为此付出很高的代价。而另一方面,分析型数据库常用的扫描 + 剪枝方法,可以比较高效地处理这样的分析型数据。

因此 HoraeDB 的基本设计理念是采用混合存储格式和相应的查询方法,从而达到能够同时高效处理时序型数据和分析型数据。

架构

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
┌──────────────────────────────────────────┐
│       RPC Layer (HTTP/gRPC/MySQL)        │
└──────────────────────────────────────────┘
┌──────────────────────────────────────────┐
│                 SQL Layer                │
│ ┌─────────────────┐  ┌─────────────────┐ │
│ │     Parser      │  │     Planner     │ │
│ └─────────────────┘  └─────────────────┘ │
└──────────────────────────────────────────┘
┌───────────────────┐  ┌───────────────────┐
│    Interpreter    │  │      Catalog      │
└───────────────────┘  └───────────────────┘
┌──────────────────────────────────────────┐
│               Query Engine               │
│ ┌─────────────────┐  ┌─────────────────┐ │
│ │    Optimizer    │  │    Executor     │ │
│ └─────────────────┘  └─────────────────┘ │
└──────────────────────────────────────────┘
┌──────────────────────────────────────────┐
│         Pluggable Table Engine           │
│  ┌────────────────────────────────────┐  │
│  │              Analytic              │  │
│  │┌────────────────┐┌────────────────┐│  │
│  ││      Wal       ││    Memtable    ││  │
│  │└────────────────┘└────────────────┘│  │
│  │┌────────────────┐┌────────────────┐│  │
│  ││     Flush      ││   Compaction   ││  │
│  │└────────────────┘└────────────────┘│  │
│  │┌────────────────┐┌────────────────┐│  │
│  ││    Manifest    ││  Object Store  ││  │
│  │└────────────────┘└────────────────┘│  │
│  └────────────────────────────────────┘  │
│  ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─   │
│           Another Table Engine        │  │
│  └ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─   │
└──────────────────────────────────────────┘

上图展示了 HoraeDB 单机版本的架构,下面将会介绍重要模块的细节。

RPC 层

模块路径:https://github.com/apache/incubator-horaedb/tree/main/server

当前的 RPC 支持多种协议,包括 HTTP、gRPC、MySQL。

通常 HTTP 和 MySQL 用于调试 HoraeDB,手动查询和执行 DDL 操作(如创建、删除表等)。而 gRPC 协议可以被看作是一种用于高性能的定制协议,更适用于大量的读写操作。

SQL 层

模块路径:https://github.com/apache/incubator-horaedb/tree/main/query_frontend

SQL 层负责解析 SQL 并生成查询计划。

HoraeDB 基于 sqlparser 提供了一种 SQL 方言,为了更好的适配时序数据,引入一些概念,包括 Tag 和 Timestamp。此外,利用 DataFusion,HoraeDB 不仅可以生成常规的逻辑计划,还可以生成自定义的计划来实现时序场景要求的特殊算子,例如为了适配 PromQL 协议而做的工作就是利用了这个特性。

Interpreter

模块路径:https://github.com/apache/incubator-horaedb/tree/main/interpreters

Interpreter 模块封装了 SQL 的 CRUD 操作。在查询流程中,一个 SQL 语句会经过解析,生成出对应的查询计划,然后便会在特定的解释器中执行,例如 SelectInterpreterInsertInterpreter 等。

Catalog

模块路径:https://github.com/apache/incubator-horaedb/tree/main/catalog_impls

Catalog 实际上是管理元数据的模块,HoraeDB 采用的元数据分级与 PostgreSQL 类似:Catalog > Schema > Table,但目前它们只用作命名空间。

目前,CatalogSchema 在单机模式和分布式模式存在两种不同实现,因为一些生成 id 和持久化元数据的策略在这两种模式下有所不同。

查询引擎

模块路径:https://github.com/apache/incubator-horaedb/tree/main/query_engine

查询引擎负责优化和执行由 SQL 层解析出来的 SQL 计划,目前查询引擎实际上基于 DataFusion 来实现的。

除了 SQL 的基本功能外,HoraeDB 还通过利用 DataFusion 提供的扩展接口,为某些特定的查询(比如 PromQL)构建了一些定制的查询协议和优化规则。

Pluggable Table Engine

模块路径:https://github.com/apache/incubator-horaedb/tree/main/table_engine

Table Engine 是 HoraeDB 中用于管理表的存储引擎,其可插拔性是 HoraeDB 的一个核心设计,对于实现我们的一些长远目标(比如增加 Log 或 Tracing 类型数据的存储引擎)至关重要。HoraeDB 将会有多种 Table Engine 用于不同的工作负载,根据工作负载模式,应该选择最合适的存储引擎。

现在对 Table Engine 的要求是:

  • 管理引擎下的所有共享资源:
    • 内存
    • 存储
    • CPU
  • 管理表的元数据,如表的结构、表的参数选项;
  • 能够提供 Table 实例,该实例可以提供 readwrite 的能力;
  • 负责 Table 实例的创建、打开、删除和关闭;
  • ….

实际上,Table Engine 需要处理的事情有点复杂。现在在 HoraeDB 中,只提供了一个名为 AnalyticTable Engine,它在处理分析工作负载方面做得很好,但是在时序工作负载上还有很大的进步空间(我们计划通过添加一些帮助处理时序工作负载的索引来提高性能)。

以下部分描述了 Analytic Table Engine 的详细信息。

WAL

模块路径:https://github.com/apache/incubator-horaedb/tree/main/wal

HoraeDB 处理数据的模型是 WAL + MemTable,最近写入的数据首先被写入 WAL,然后写入 MemTable,在 MemTable 中累积了一定数量的数据后,该数据将以便于查询的形式被重新构建,并存储到持久化设备上。

目前,为 standalone 模式和分布式模式提供了三种 WAL 实现:

  • 对于 standalone 模式,WAL 基于 RocksDB,数据存储在本地磁盘上。
  • 对于分布式模式,需要 WAL 作为一个分布式组件,负责新写入数据的可靠性,因此,我们现在提供了基于 OceanBase 的实现。
  • 对于分布式模式,除了 OceanBase,我们还提供了一个更轻量级的基于 Apache Kafka 实现。

MemTable

模块路径:https://github.com/apache/incubator-horaedb/tree/main/analytic_engine/src/memtable

由于 WAL 无法提供高效的查询,因此新写入的数据会存储一份到 Memtable 用于查询,并且在积累了一定数量后,HoraeDB 将 MemTable 中的数据组织成便于查询的存储格式(SST)并存储到持久化设备中。

MemTable 的当前实现基于 agatedb 的 skiplist。它允许并发读取和写入,并且可以根据 Arena 控制内存使用。

Flush

模块路径:https://github.com/apache/incubator-horaedb/blob/main/analytic_engine/src/instance/flush_compaction.rs

MemTable 的内存使用量达到阈值时,Flush 操作会选择一些老的 MemTable,将其中的数据组织成便于查询的 SST 存储到持久化设备上。

在刷新过程中,数据将按照一定的时间段(由表选项 Segment Duration 配置)进行划分,保证任何一个 SST 的所有数据的时间戳都属于同一个 Segment。实际上,这也是大多数时序数据库中常见的操作,按照时间维度组织数据,以加速后续的时间相关操作,如查询一段时间内的数据,清除超出 TTL 的数据等。

Compaction

模块路径:https://github.com/apache/incubator-horaedb/tree/main/analytic_engine/src/compaction

MemTable 的数据被刷新为 SST 文件,但最近刷新的 SST 文件可能非常小,而过小或过多的 SST 文件会导致查询性能不佳,因此,引入 Compaction 来重新整理 SST 文件,使多个较小的 SST 文件可以合并成较大的 SST 文件。

Manifest

模块路径:https://github.com/apache/incubator-horaedb/tree/main/analytic_engine/src/meta

Manifest 负责管理每个表的元数据,包括:

  • 表的结构和表的参数选项;
  • 最新 Flush 过的 sequence number;
  • 表的所有 SST 文件的信息。

现在 Manifest 是基于 WALObject Store 来实现的,新的改动会直接写入到 WAL,而为了避免元数据无限增长(实际上每次 Flush 操作都会触发更新),会对其写入的记录做快照,生成的快照会被持久化道 Object Store

Object Storage

模块路径:https://github.com/apache/incubator-horaedb/tree/main/components/object_store

Flush 操作产生的 SST 文件需要持久化存储,而用于抽象持久化存储设备的就是 Object Storage,其中包括多种实现:

HoraeDB 的分布式架构的一个核心特性就是存储和计算分离,因此要求 Object Storage 是一个高可用的服务,并独立于 HoraeDB。因此,像Amazon S3阿里云 OSS等存储系统是不错的选择,未来还将计划实现在其他云服务提供商的存储系统上。

SST

模块路径:https://github.com/apache/incubator-horaedb/tree/main/analytic_engine/src/sst

SST 本身实际上是一种抽象,可以有多种具体实现。目前的实现是基于 Parquet,它是一种面向列的数据文件格式,旨在实现高效的数据存储和检索。

SST 的格式对于数据检索非常关键,也是决定查询性能的关键所在。目前,我们基于 ParquetSST 实现在处理分析型数据时表现良好,但目前在处理时序型数据上还有较高的提升空间。在我们的路线图中,我们将探索更多的存储格式,以便在两种类型的数据处理上都取得良好的性能。

Space

模块路径:https://github.com/apache/incubator-horaedb/blob/main/analytic_engine/src/space.rs

Analytic Engine 中,有一个叫做 space 的概念,这里着重解释一下,以解决阅读源代码时出现的一些歧义。 实际上,Analytic Engine 没有 catalogschema 的概念,只提供两个层级的关系:spacetable。在实现中,上层的 schema id(要求在所有的 catalogs 中都应该是唯一的)实际上会直接映射成 space id

Analytic Engine 中的 space 主要用于隔离不同租户的资源,如内存的使用。

Critical Path

简要介绍了 HoraeDB 的一些重要模块后,我们将对代码中的一些关键路径进行描述,希望为有兴趣的开发人员在阅读代码时提供一些帮助。

Query

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
┌───────┐      ┌───────┐      ┌───────┐
│       │──1──▶│       │──2──▶│       │
│Server │      │  SQL  │      │Catalog│
│       │◀─10──│       │◀─3───│       │
└───────┘      └───────┘      └───────┘
                │    ▲
               4│   9│
                │    │
                ▼    │
┌─────────────────────────────────────┐
│                                     │
│             Interpreter             │
│                                     │
└─────────────────────────────────────┘
                           │    ▲
                          5│   8│
                           │    │
                           ▼    │
                   ┌──────────────────┐
                   │                  │
                   │   Query Engine   │
                   │                  │
                   └──────────────────┘
                           │    ▲
                          6│   7│
                           │    │
                           ▼    │
 ┌─────────────────────────────────────┐
 │                                     │
 │            Table Engine             │
 │                                     │
 └─────────────────────────────────────┘

SELECT SQL 为例,上图展示了查询过程,其中的数字表示模块之间调用的顺序。

以下是详细流程:

  • Server 模块根据请求使用的协议选择合适的 rpc 模块(可能是 HTTP、gRPC 或 mysql)来处理请求;
  • 使用 parser 解析请求中的 sql ;
  • 根据解析好的 sql 以及 catalog/schema 提供的元信息,通过 DataFusion 可以生成逻辑计划;
  • 根据逻辑计划创建相应的 Interpreter,并由其执行逻辑计划;
  • 对于正常 SELECT SQL 的逻辑计划,它将通过 SelectInterpreter 执行;
  • SelectInterpreter 中,特定的查询逻辑由 Query Engine 执行:
    • 优化逻辑计划;
    • 生成物理计划;
    • 优化物理计划;
    • 执行物理计划;
  • 执行物理计划涉及到 Analytic Engine
    • 通过 Analytic Engine 提供的 Table 实例的 read 方法获取数据;
    • 表数据的来源是 SSTMemtable,可以通过谓词下推进行提前过滤;
    • 在检索到表数据后,Query Engine 将完成具体计算并生成最终结果;
  • SelectInterpreter 获取结果并将其传输给 Protocol 模块;
  • 协议模块完成转换结果后,Server 模块将其响应给客户端。

以下是v1.2.2的函数调用流程:

                                                       ┌───────────────────────◀─────────────┐    ┌───────────────────────┐
                                                       │      handle_sql       │────────┐    │    │       parse_sql       │
                                                       └───────────────────────┘        │    │    └────────────────┬──────┘
                                                           │             ▲              │    │           ▲         │
                                                           │             │              │    │           │         │
                                                           │             │              │    └36───┐     │        11
                                                          1│             │              │          │     │         │
                                                           │            8│              │          │     │         │
                                                           │             │              │          │    10         │
                                                           │             │              │          │     │         │
                                                           ▼             │              │          │     │         ▼
                                                       ┌─────────────────┴─────┐       9│         ┌┴─────┴────────────────┐───────12─────────▶┌───────────────────────┐
                                                       │maybe_forward_sql_query│        └────────▶│fetch_sql_query_output │                   │   statement_to_plan   │
                                                       └───┬───────────────────┘                  └────┬──────────────────┘◀───────19─────────└───────────────────────┘
                                                           │             ▲                             │              ▲                           │               ▲
                                                           │             │                             │              │                           │               │
                                                           │             │                             │              │                           │               │
                                                           │             │                             │             35                          13              18
                                                          2│            7│                            20              │                           │               │
                                                           │             │                             │              │                           │               │
                                                           │             │                             │              │                           │               │
                                                           │             │                             │              │                           ▼               │
                                                           ▼             │                             ▼              │                       ┌───────────────────────┐
          ┌───────────────────────┐───────────6───────▶┌─────────────────┴─────┐                    ┌─────────────────┴─────┐                 │Planner::statement_to_p│
          │ forward_with_endpoint │                    │        forward        │                    │execute_plan_involving_│                 │          lan          │
          └───────────────────────┘◀────────5──────────└───┬───────────────────┘                 ┌──│    partition_table    │◀────────┐       └───┬───────────────────┘
                                                           │             ▲                       │  └───────────────────────┘         │           │              ▲
                                                           │             │                       │     │              ▲               │           │              │
                                                           │             │                       │     │              │               │          14             17
           ┌───────────────────────┐                       │            4│                       │     │              │               │           │              │
     ┌─────│ PhysicalPlan::execute │                      3│             │                       │    21              │               │           │              │
     │     └───────────────────────┘◀──┐                   │             │                       │     │             22               │           │              │
     │                                 │                   │             │                       │     │              │               │           ▼              │
     │                                 │                   │             │                       │     │              │               │       ┌────────────────────────┐
     │                                 │                   ▼             │                       │     ▼              │              34       │sql_statement_to_datafus│
     │     ┌───────────────────────┐  30               ┌─────────────────┴─────┐                 │  ┌─────────────────┴─────┐         │       │        ion_plan        │
    31     │ build_df_session_ctx  │   │               │         route         │                 │  │   build_interpreter   │         │       └────────────────────────┘
     │     └────┬──────────────────┘   │               └───────────────────────┘                 │  └───────────────────────┘         │           │              ▲
     │          │           ▲          │                                                         │                                    │           │              │
     │         27          26          │                                                        23                                    │          15             16
     │          ▼           │          │                                                         │                                    │           │              │
     └────▶┌────────────────┴──────┐   │               ┌───────────────────────┐                 │                                    │           │              │
           │ execute_logical_plan  ├───┴────32────────▶│       execute         │──────────┐      │   ┌───────────────────────┐        │           ▼              │
           └────┬──────────────────┘◀────────────25────┴───────────────────────┘         33      │   │interpreter_execute_pla│        │       ┌────────────────────────┐
                │           ▲                                           ▲                 └──────┴──▶│           n           │────────┘       │SqlToRel::sql_statement_│
               28           │                                           └──────────24────────────────┴───────────────────────┘                │   to_datafusion_plan   │
                │          29                                                                                                                 └────────────────────────┘
                ▼           │
           ┌────────────────┴──────┐
           │     optimize_plan     │
           └───────────────────────┘
  1. 收到请求经过各种协议转换会转到handle_sql中执行,由于该请求可能是非本节点处理的,可能需要转发,进入maybe_forward_sql_query处理转发逻辑。
  2. maybe_forward_sql_query中构造好ForwardRequest后,调用forward
  3. forward中构造好RouteRequest,后调用route
  4. 使用route获取目的节点endpoint后回到forward
  5. 调用forward_with_endpoint将请求进行转发
  6. 回到forward
  7. 回到maybe_forward_sql_query
  8. 回到handle_sql
  9. 此时若是Local请求,调用fetch_sql_query_output进行处理
  10. 调用parse_sqlsql解析成Statment
  11. 回到fetch_sql_query_output
  12. 使用Statment调用statement_to_plan
  13. 在其中使用ctxStatment构造Planner,调用Plannerstatement_to_plan方法
  14. planner中会对于请求的类别调用对应的planner方法,此时我们的sql是查询,会调用sql_statement_to_plan
  15. 调用sql_statement_to_datafusion_plan,其中会生成datafusion的对象,然后调用SqlToRel::sql_statement_to_plan
  16. SqlToRel::sql_statement_to_plan中会返回生成的逻辑计划
  17. 返回
  18. 返回
  19. 返回
  20. 调用execute_plan_involving_partition_table(使用默认配置情况下)进行该逻辑计划的后续优化和执行
  21. 调用build_interpreter生成Interpreter
  22. 返回
  23. 调用Interpreterinterpreter_execute_plan方法进行逻辑计划的执行。
  24. 调用对应执行函数,此时sql是查询,所以会调用SelectInterpreterexecute
  25. 调用execute_logical_plan,其中会调用build_df_session_ctx生成优化器
  26. build_df_session_ctx中会使用config信息生成对应上下文,首先使用datafusion和自定义的一些优化规则(在logical_optimize_rules()中)生成逻辑计划优化器, 使用 apply_adapters_for_physical_optimize_rules生成物理计划优化器
  27. 将优化器返回
  28. 调用optimize_plan,使用刚刚生成的优化器首先进行逻辑计划的优化随后进行物理计划的优化
  29. 返回优化后的物理计划
  30. 执行物理计划
  31. 执行后返回
  32. 收集所有分片的结果后,返回
  33. 返回
  34. 返回
  35. 返回
  36. 返回给上层进行网络协议转化,最后返回给请求发送方

Write

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
┌───────┐      ┌───────┐      ┌───────┐
│       │──1──▶│       │──2──▶│       │
│Server │      │  SQL  │      │Catalog│
│       │◀─8───│       │◀─3───│       │
└───────┘      └───────┘      └───────┘
                │    ▲
               4│   7│
                │    │
                ▼    │
┌─────────────────────────────────────┐
│                                     │
│             Interpreter             │
│                                     │
└─────────────────────────────────────┘
      │    ▲
      │    │
      │    │
      │    │
      │    │       ┌──────────────────┐
      │    │       │                  │
     5│   6│       │   Query Engine   │
      │    │       │                  │
      │    │       └──────────────────┘
      │    │
      │    │
      │    │
      ▼    │
 ┌─────────────────────────────────────┐
 │                                     │
 │            Table Engine             │
 │                                     │
 └─────────────────────────────────────┘

INSERT SQL 为例,上图展示了查询过程,其中的数字表示模块之间调用的顺序。

以下是详细流程:

  • Server 模块根据请求使用的协议选择合适的 rpc 模块(可能是 HTTP、gRPC 或 mysql)来处理请求;
  • 使用 parser 解析请求中的 sql;
  • 根据解析好的 sql 以及 catalog/schema 提供的元信息,通过 DataFusion 可以生成逻辑计划;
  • 根据逻辑计划创建相应的 Interpreter ,并由其执行逻辑计划;
  • 对于正常 INSERT SQL 的逻辑计划,它将通过 InsertInterpreter 执行;
  • InsertInterpreter 中,调用 Analytic Engine 提供的 Tablewrite 方法:
    • 首先将数据写入 WAL
    • 然后写入 MemTable
  • 在写入 MemTable 之前,会检查内存使用情况。如果内存使用量过高,则会触发 Flush
    • 将一些旧的 MemTable 持久化为 SST
    • 将新的 SST 信息记录到 Manifest
    • 记录最新 Flush 的 WAL 序列号;
    • 删除相应的 WAL 日志;
  • Server 模块将执行结果响应给客户端。

2 - 集群模式

注意:文章中提到的部分特性暂时还未实现。

整体架构

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
┌───────────────────────────────────────────────────────────────────────┐
│                                                                       │
│                           HoraeMeta Cluster                           │
│                                                                       │
└───────────────────────────────────────────────────────────────────────┘
                              ▲               ▲                 ▲
                              │               │                 │
                              │               │                 │
                              ▼               ▼                 ▼
┌───────┐Route Info ┌HoraeDB─────┬┬─┐ ┌HoraeDB─────┬┬─┐ ┌HoraeDB─────┬┬─┐
│client │◀────────▶ │  │  │TableN││ │ │  │  │TableN││ │ │  │  │TableN││ │
└───────┘Write/Query└──Shard(L)──┴┴─┘ └──Shard(F)──┴┴─┘ └──Shard(F)──┴┴─┘
                            │ │                 ▲               ▲
                              │                 │               │
                            │ Write─────────┐   ├────Sync───────┘
                                            │   │
                            │     ┌────────┬▼───┴────┬──────────────────┐
                       Upload SST │        │         │                  │
                            │     │WAL     │Region N │                  │
                                  │Service │         │                  │
                            │     └────────┴─────────┴──────────────────┘

┌───────────────────────────────────────────────────────────────────────┐
│                                                                       │
│                            Object Storage                             │
│                                                                       │
└───────────────────────────────────────────────────────────────────────┘

上面给出来 HoraeDB 集群化方案的整体架构图,对于其中的一些名词做出解释:

  • HoraeMeta Cluster:集群的元数据中心,负责集群的整体调度;
  • Shard(L)/Shard(F): Leader Shard 和 Follower Shard,每一个 Shard 由多张 Table 组成;
  • HoraeDB:一个 HoraeDB 实例, 包含多个 Shard;
  • WAL Service:WAL 服务,在集群方案中,用于存储实时写入的数据;
  • Object Storage:对象存储服务,用于存储从 memtable 生成的 SST 文件;

根据架构图,读者应该对于集群化方案有一个初步的认知 —— 存储计算分离。正因如此,HoraeDB 实例本身不存储任何数据,从而使得计算存储弹性扩缩容、服务高可用和负载均衡等等有用的分布式特性比较容易实现。

Shard

Shard 是一个重要的概念,是集群调度的基本单元,一个 Shard 包含多张表。

这里值得说明的是,集群调度的基本单元并非是 Table,而是采用了 Shard,主要原因在于考虑到 HoraeDB 集群如果需要支持至少百万级别的 Table 的话,而部分组件,直接处理起这个量级的 Table 可能会存在一些性能问题,例如对于 WAL 服务,如果按照 Table 去单独管理数据的话,一个重要的性能问题就在于重启后恢复新写入数据的时候,按照 Table 级别去做数据恢复的话,将会非常耗时,但按照 Shard 去管理数据的话,这个情况会改善很多,因为具体的 WAL 服务实现可以按照 Shard 这个信息,将 Table 的数据进行合理组织,从而获得比较好的数据局部性,在重启恢复的时候,就可以通过 Shard 来进行快速的数据恢复。

此外 Shard 具有 Leader 和 Follower 两种 Role,也就是图中的 Shard(L)Shard(F),Leader 负责读写,Follower 只有读权限,Follower 的引入其实是为了解决 HA 的问题,保证 Leader 在 crash 了之后,能快速的恢复相关表的读写服务,为了做到这一点,Follower 需要从 WAL 不停地同步最新的数据(由 Leader 写入)。

一个 HoraeDB 实例会拥有多个 Shard,这些 Shard 可以是 Leader 也是 Follower,以交织的方式存在:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
┌─HoraeDB Instance0──────┐     ┌─HoraeDB Instance1──────┐
│  ┌─Shard0(L)────────┐  │     │  ┌─Shard0(F)────────┐  │
│  │ ┌────┬────┬────┐ │  │     │  │ ┌────┬────┬────┐ │  │
│  │ │ T0 │ T1 │ T2 │ │  │     │  │ │ T0 │ T1 │ T2 │ │  │
│  │ └────┴────┴────┘ │  │     │  │ └────┴────┴────┘ │  │
│  └──────────────────┘  │     │  └──────────────────┘  │
│                        │     │                        │
│  ┌─Shard1(F)────────┐  │     │  ┌─Shard1(L)────────┐  │
│  │ ┌────┬────┬────┐ │  │     │  │ ┌────┬────┬────┐ │  │
│  │ │ T0 │ T1 │ T2 │ │  │     │  │ │ T0 │ T1 │ T2 │ │  │
│  │ └────┴────┴────┘ │  │     │  │ └────┴────┴────┘ │  │
│  └──────────────────┘  │     │  └──────────────────┘  │
└────────────────────────┘     └────────────────────────┘

上面提到集群的基本调度单元是 Shard,因此对于 Shard 存在这一些基本操作,通过这些操作可以完成一些负责的调度逻辑:

  • 向 Shard 新增/删除 Table:在建表/删表的时候使用;
  • Shard 的打开/关闭:可以用来将一个 Shard 迁移到另外一个 HoraeDB 实例上面;
  • Shard 分裂/合并:可以用来完成扩容和缩容;
  • Shard 角色切换:将一个 Shard 从 Leader 切换成 Follower,或者从 Follower 切换成 Leader;

HoraeMeta

HoraeMeta 主要负责集群的元数据管理和集群调度,其实现是通过内置一个 ETCD 来保证分布式一致性的。

元数据主要都是围绕 Table 来构建的,包括一些建表信息(如 Table ID),Table 所属集群(HoraeMeta 是可以支持多个 HoraeDB 集群的),Table 与 Shard 的映射关系等等。

HoraeMeta 的主要工作还是负责集群的调度,需要完成:

  • 接收来自 HoraeDB 实例的心跳,根据心跳来检测 HoraeDB 实例的存活状态;
  • 负责 Shard 的具体分配以及调度,会将 Shard 按照一定的算法(负载均衡)分配到具体的实例上面去;
  • 参与 Table 的创建,将 Table 分配到合适的 Shard(也就会分配到具体的实例);
  • 如果有新的 HoraeDB 实例加入到集群中,可以进行扩容操作;
  • 如果有检测到 HoraeDB 实例的下线状况,可以完成自动的故障恢复;

路由

为了避免转发请求的开销,客户端与 HoraeDB 实例之间的通信是点对点的,也就是说,客户端在发送任何特定的写入/查询请求之前,应该从服务器获取路由信息。

实际上,路由信息是由 HoraeMeta 决定的,但是客户端只允许通过 HoraeDB 实例而不是 HoraeMeta 来访问它,以避免由 HoraeMeta 引起的潜在性能瓶颈。

WAL Service & Object Storage

在集群方案中,WAL ServiceObject Storage 都是作为独立的服务而存在的,并且是具备容灾能力的分布式系统。目前 WAL Service 的实现,在集群化方案中主要包括两种实现:KafkaOBKV(通过其提供的 Table API 访问 OceanBase);而 Object Storage 的实现比较多,基本覆盖了主流的对象存储服务。

这两个系统的相似之处在于,它们是作为计算存储分离架构中的的存储层;而它们的区别也很明显,WAL Service 具备较高的实时写入性能(高吞吐、低延时),负责实时的新增数据写入,而 Object Storage 具备低成本的存储代价和较为高效的查询吞吐,负责后台整理好的、长期存储的数据读写。

这两个服务组件的引入,使得 HoraeDB 集群的水平扩容、服务高可用、负载均衡提供了实现基础。

水平扩容

集群的扩展能力是评价一个集群方案的重要指标,下面从存储和计算两方面来介绍一下 HoraeDB 集群的水平扩容能力是如何实现的。

存储

很明显,在选择 WAL Service 和 Object Storage 两个底层服务实现的时候,水平扩容能力是必须的,存储容量出现问题的时候,可以单独地进行存储服务的扩容。

计算

计算能力的水平扩容可能会复杂一下,考虑如下几个容量问题:

  • 大量查询大量的表;
  • 大量查询同一张数据量大的表;
  • 大量查询同一张正常的表;

对于第一种情况,通过 Shard 分裂,将分裂出来的新 Shard 迁移到新扩容的 HoraeDB 实例,这样表就会分散到新的实例上。

对于第二种情况,可以通过表分区的功能,将该表分成多个子表,从而达到水平扩容的效果;

第三种情况是最重要也是最难处理的。事实上,Follower Shard 可以为 Leader Shard 分担部分查询请求,但是由于 Follower Shard 需要同步数据,它的数量是受到 WAL 同步能力的限制的。实际上,从下图可以看到,对于这样的场景(当 Follower Shard 不足以承担住访问量的时候),可以考虑在集群中添加一种纯计算节点,其中的 Shard 不参与实时的数据同步,当这个实例被访问的时候,新写入的实时数据可以从 Leader Shard 或者 Follower Shard 恢复出来,历史的数据可以直接从 Object Storage 获取并缓存,这样的话,就可以在一定程度上(实际上还是存在单表的实时数据拉取的瓶颈,但是考虑到这部分数据并非特别多,理论上这个瓶颈应该比较难以达到),达到水平扩容的效果。

1
2
3
4
5
6
7
8
9
                                             ┌HoraeDB─────┬┬─┐
                            ┌──newly written─│  │  │TableN││ │
                            ▼                └──Shard(L/F)┴┴─┘
┌───────┐  Query  ┌HoraeDB─────┬┬─┐
│client │────────▶│  │  │TableN││ │
└───────┘         └──Shard─────┴┴─┘          ┌───────────────┐
                            ▲                │    Object     │
                            └───old SST──────│    Storage    │
                                             └───────────────┘

High Availability

通过上面的介绍,HoraeDB 的高可用方案,其实比较自然了。考虑某台 HoraeDB 实例 Crash 了,后续的服务恢复步骤整体如下:

  • HoraeMeta 通过发现心跳断了,判断该 HoraeDB 实例下线;
  • HoraeMeta 选择将该实例上的所有 Leader Shard 所对应的 Follower Shard,切换成 Leader Shard,从而完成快速恢复;
  • 如果没有对应的 Follower Shard 存在,那么需要执行 Open Shard 操作,不过这样的恢复是较慢的;
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
┌─────────────────────────────────────────────────────────┐
│                                                         │
│                    HoraeMeta Cluster                    │
│                                                         │
└─────────────────────────────────────────────────────────┘
             ┌ ─ ─Broken ─ ─ ┤
             │               │
┌ HoraeDB Instance0 ─ ─ ─    │   ┌─HoraeDB Instance1──────┐                   ┌─HoraeDB Instance1──────┐
   ┌─Shard0(L)────────┐  │   │   │  ┌─Shard0(F)────────┐  │                   │  ┌─Shard0(L)────────┐  │
│  │ ┌────┬────┬────┐ │      │   │  │ ┌────┬────┬────┐ │  │                   │  │ ┌────┬────┬────┐ │  │
   │ │ T0 │ T1 │ T2 │ │  │   ├───│  │ │ T0 │ T1 │ T2 │ │  │                   │  │ │ T0 │ T1 │ T2 │ │  │
│  │ └────┴────┴────┘ │      │   │  │ └────┴────┴────┘ │  │                   │  │ └────┴────┴────┘ │  │
   └──────────────────┘  │   │   │  └──────────────────┘  │     Failover      │  └──────────────────┘  │
│                            │   ├─HoraeDB Instance2──────┤   ───────────▶    ├─HoraeDB Instance2──────┤
   ┌─Shard1(L)────────┐  │   │   │  ┌─Shard1(F)────────┐  │                   │  ┌─Shard1(L)────────┐  │
│  │ ┌────┬────┬────┐ │      │   │  │ ┌────┬────┬────┐ │  │                   │  │ ┌────┬────┬────┐ │  │
   │ │ T0 │ T1 │ T2 │ │  │   └───│  │ │ T0 │ T1 │ T2 │ │  │                   │  │ │ T0 │ T1 │ T2 │ │  │
│  │ └────┴────┴────┘ │          │  │ └────┴────┴────┘ │  │                   │  │ └────┴────┴────┘ │  │
   └──────────────────┘  │       │  └──────────────────┘  │                   │  └──────────────────┘  │
└ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─        └────────────────────────┘                   └────────────────────────┘

Load Balancing

HoraeDB 上传的心跳信息会带上机器的负载信息,HoraeMeta 会根据这些信息,完成自动的负载均衡的调度工作,主要包括:

  • 新建表的时候,为该表挑选一个负载低的实例上面的 Shard;
  • 将负载高的实例上的 Shard 迁移到负载低的实例;
  • 更好地,可以根据 Shard 的负载,通过 Shard 的分裂和合并,来完成更细粒度的负载均衡;

3 - 存储引擎

存储引擎主要提供以下两个功能:

  1. 数据的持久化
  2. 在保证数据正确性的前提下,用最合理的方式来组织数据,来满足不同场景的查询需求

本篇文档就来介绍 HoraeDB 中存储引擎的内部实现,读者可以参考这里面的内容,来探索如何高效使用 HoraeDB。

整体架构

HoraeDB 是一种基于 share-nothing 架构的分布式存储系统,不同服务器之间的数据相互隔离,互不影响。每一个单机中的存储引擎是 LSM(Log-structured merge-tree)的一个变种,针对时序场景做了优化,下图展示了其主要组件的运作方式:

Write Ahead Log (WAL)

一次写入请求的数据会写到两个部分:

  1. 内存中的 memtable
  2. 可持久化的 WAL

由于 memtable 不是实时持久化到底层存储系统,因此需要用 WAL 来保证 memtable 中数据的可靠性。

另一方面,由于分布式架构的设计,要求 WAL 本身是高可用的,现在 HoraeDB 中,主要有以下几种实现:

Memtable

Memtable 是一个内存的数据结构,用来保存最近写入的数据。一个表对应一个 memtable。

Memtable 默认是可读写的(称为 active),当写入达到一起阈值时,会变成只读的并且被一个新的 memtable 替换掉。只读的 memtable 会被后台线程以 SST 的形式写入到底层存储系统中,写入完成后,只读的 memtable 就可以被销毁,同时 WAL 中也可以删除对应部分的数据。

Sorted String Table(SST)

SST 是数据的持久化格式,按照表主键的顺序存放,目前 HoraeDB 采用 parquet 格式来存储。

对于 HoraeDB 来说,SST 有一个重要特性: segment_duration,只有同一个 segment 内的 SST 才有可能进行合并操作。而且有了 segment,也方便淘汰过期的数据。

除了存放原始数据外,SST 内也会存储数据的统计信息来加速查询,比如:最大值、最小值等。

Compactor

Compactor 可以把多个小 SST 文件合并成一个,用于解决小文件数过多的问题。此外,Compactor 也会在合并时进行过期数据的删除,重复数据的去重。目前 HoraeDB 中的合并策略参考自 Cassandra,主要有两个:

Manifest

Manifest 记录表、SST 文件元信息,比如:一个 SST 内数据的最小、最大时间戳。

由于分布式架构的设计,要求 Manifest 本身是高可用的,现在 HoraeDB 中,主要有以下几种实现:

  • WAL
  • ObjectStore

ObjectStore

ObjectStore 是数据(即 SST)持久化的地方,一般来说各大云厂商均有对应服务,像阿里云的 OSS,AWS 的 S3。

4 - Shared Nothing 架构

背景

集群 文章中介绍了 HoraeDB 的集群方案,简单总结一下就是:

  • 计算存储分离;
  • 由中心化的元数据中心,管理整个集群;

然而计算存储分离的架构下,有个重要的问题在于,在集群调度的过程中,如何保证在共享的存储层中的数据不会因为不同的计算节点访问导致数据损坏,一个简单的例子就是如果同一块数据块被多个计算节点同时更新,可能就会出现数据损坏。

而 HoraeDB 的解决方案是通过特定的机制,在共享存储的情况下达到了类似 Shared-Nothing 架构 的效果,也就是说存储层的数据经过一定规则的划分,可以保证在任何时刻最多只有一个 HoraeDB 实例可以对其进行更新,本文中,将这个特性定义成集群拓扑的正确性,如果这个正确性得到保证的话,那么数据就不会因为集群的灵活调度而受到损坏。

本文对于 Shared Nothing 架构的优劣不做赘述,主要分享一下,HoraeDB 集群方案是如何在计算存储分离的方案下,达到 Shared Nothing 的效果(即如何保证 集群拓扑的正确性)。

数据划分

为了达到 Shared Nothing 的效果,首先需要将数据在共享的存储层上面进行好逻辑和物理的划分。在 此前的集群介绍文章 中介绍了 Shard 的基本作用,作为集群的基本调度单元,同时也是数据分布的基本划分单元,不同的 Shard 在存储层对应的数据是隔离的:

  • 在 WAL 中,写入的 Table 数据会按照 Shard 组织起来,按照 Shard 写入到 WAL 的不同区域中,不同的 Shard 在 WAL 中的数据是隔离开的;
  • 在 Object Storage 中,数据的管理是按照 Table 来划分的,而 Shard 和 Table 之间的关系是一对多的关系,也就说,任何一个 Table 只属于一个 Shard,因此在 Object Storage 中,Shard 之间的数据也是隔离的;

Shard Lock

在数据划分好之后,需要保证的就是在任何时刻,同一时刻最多只有一个 HoraeDB 实例能够更新 Shard 的数据。那么要如何保证这一点的呢?很自然地,通过锁可以达到互斥的效果,不过在分布式集群中,我们需要的是分布式锁。通过分布式锁,每一个 Shard 被分配给 HoraeDB 实例时,HoraeDB 必须先获取到相应的 Shard Lock,才能完成 Shard 的打开操作,对应地,当 Shard 关闭后,HoraeDB 实例也需要主动释放 Shard Lock。

HoraeDB 集群的元数据服务 HoraeMeta 是基于 ETCD 构建的,而基于 ETCD 实现分布式的 Shard Lock 是非常方便的,因此我们选择基于现有的 ETCD 实现 Shard Lock,具体逻辑如下:

  • 以 Shard ID 作为 ETCD 的 Key,获取到 Shard Lock 等价于创建出这个 Key;
  • 对应的 Value,可以把 HoraeDB 的地址编码进去(用于 HoraeMeta 调度);
  • Shard Lock 获取到了之后,HoraeDB 实例需要通过 ETCD 提供的接口对其进行续租,保证 Shard Lock 不会被释放;

HoraeMeta 暴露了 ETCD 的服务提供给 HoraeDB 集群来构建 Shard Lock,下图展示了 Shard Lock 的工作流程,图中的两个 HoraeDB 实例都尝试打开 Shard 1,但是由于 Shard Lock 的存在,最终只有一个 HoraeDB 实例可以完成 Shard 1 的打开:

             ┌────────────────────┐
             │                    │
             │                    │
             ├───────┐            │
   ┌─────┬──▶│ ETCD  │            │
   │     │   └───────┴────HoraeMeta
   │     │       ▲
   │     │       └──────┬─────┐
   │     │          Rejected  │
   │     │              │     │
┌─────┬─────┐        ┌─────┬─────┐
│Shard│Shard│        │Shard│Shard│
│  0  │  1  │        │  1  │  2  │
├─────┴─────┤        ├─────┴─────┤
└─────HoraeDB        └─────HoraeDB

其他方案

Shard Lock 的方案本质上是 HoraeDB 通过 ETCD 来保证在集群中对于任何一个 Shard 在任何时刻最多只有一个 HoraeDB 实例可以对其进行更新操作,也就是保证了在任何时刻 集群拓扑的正确性,需要注意,这个保证实际上成为了 HoraeDB 实例提供的能力(虽然是利用 ETCD 来实现的),而 HoraeMeta 无需保证这一点,下面的对比中,这一点是一个非常大的优势。

除此 Shard Lock 的方案,我们还考虑过这样的两种方案:

HoraeMeta 状态同步

HoraeMeta 规划并存储集群的拓扑状态,保证其正确性,并将这个正确的拓扑状态同步到 HoraeDB,而 HoraeDB 本身无权决定 Shard 是否可以打开,只有在得到 HoraeMeta 的通知后,才能打开指定的 Shard。此外,HoraeDB 需要不停地向 HoraeMeta 发送心跳,一方面汇报自身的负载信息,另一方面让 HoraeMeta 知道该节点仍然在线,用以计算最新的正确的拓扑状态。

该方案也是 HoraeDB 一开始采用的方案,该方案的思路简洁,但是在实现过程中却是很难做好的,其难点在于,HoraeMeta 在执行调度的时候,需要基于最新的拓扑状态,决策出一个新的变更,并且应用到 HoraeDB 集群,但是这个变更到达某个具体的 HoraeDB 实例时,即将产生效果的时候,该方案无法简单地保证此刻的集群状态仍然是和做出该变更决策时基于的那个集群状态是一致的。

让我们用更精确的语言描述一下:

t0: 集群状态是 S0,HoraeMeta 据此计算出变更 U;
t1: HoraeMeta 将 U 发送到某个 HoraeDB 实例,让其执行变更;
t2: 集群状态变成 S1;
t3: HoraeDB 接收到 U,准备进行变更;

上述的例子的问题在于,t3 时刻,HoraeDB 执行变更 U 是否正确呢?执行这个变更 U,是否会让数据遭到损坏?这个正确性,需要 HoraeMeta 完成相当复杂的逻辑来保证即使在集群状态为 S1 的情况下,执行 U 变更也不会出现问题,除此之外,状态的回滚也是一个非常麻烦的过程。

举一个例子,就可以发现这个方案的处理比较麻烦:

t0: HoraeMeta 尝试在 HoraeDB0 打开 Shard0,然而 HoraeDB0 打开 Shard0 遇到一些问题,Hang 住了,HoraeMeta 只能在超时之后,认为打开失败;
t1: HoraeMeta 计算出新的拓扑结构,尝试在 HoraeDB1 继续打开 Shard0;
t2: HoraeDB0 和 HoraeDB1 可能会同时打开 Shard0;

自然是有一些办法来避免掉 t2 时刻的事情,比如在 t0 时刻失败了之后,需要等待一个心跳周期,来获知 HoraeDB0 是否仍然在尝试打开 Shard0,来避免 t1 时刻发出的命令,但是这样的逻辑比较繁琐,难以维护。

对比 Shard Lock 的方案,可以发现,该方案尝试获得更强的一致性,即尝试保证集群的拓扑状态在任何时刻都需要和 HoraeMeta 中的集群状态保持一致,显然,这样的一致性肯定是能够保证集群拓扑的正确性的,但也正因为如此实现起来才会更加复杂,而基于 Shard Lock 的方案放弃了这样一个已知的、正确的集群拓扑状态,转而只需要保证集群状态是正确的即可,而不需要知道这个状态究竟是什么。更重要的是,从另外一个角度来看,保证集群拓扑的正确性这部分逻辑和集群的调度完成了解耦,因此 HoraeMeta 的逻辑大大简化,只需要专注于完成负载均衡的集群调度工作即可,而集群拓扑的正确性由 HoraeDB 本身来保证。

HoraeDB 提供一致性协议

该方案是参考 TiDB 的元数据服务 PD 的,PD 管理着所有的 TiKV 数据节点,但是 PD 不需要维护一致性的集群状态,并且应用到 TiKV 节点上面去,因为 TiKV 集群中,每一个 Raft Group,都能够达到一致性,也就是说 TiKV 无需借助 PD,本身就具备了让整个集群拓扑正确的能力(一个 Raft Group 不会出现两个 Leader)。

参考该方案,实际上我们也可以在 HoraeDB 实例之间实现一致性协议,让其本身也具备这样的能力,不过在 HoraeDB 之间引入一致性协议,似乎把事情变得更加复杂了,而且目前也没有更多的数据需要同步,通过外部的服务(ETCD)依然可以同样的效果,从 HoraeMeta 看,就等价于 HoraeDB 本身获得了让集群一致的能力。

因此 Shard Lock 的方案,可以看作是该方案的一个变种,是一种取巧但是很实用的实现。

总结

HoraeDB 分布式方案的最终目标自然不是保证集群拓扑正确就够了,但是保持正确性是后续特性的重要基石,一旦这部分的逻辑简洁明了,有充分的理论保证,就可以让后续的特性实现也同样的优雅、简洁。例如,为了使得 HoraeDB 集群中的各个节点达到负载均衡的效果,HoraeMeta 就必须根据 HoraeDB 实例上报的消息,对集群中的节点进行调度,而调度的单位必然是 Shard,然而任何一次 Shard 的变动都可能造成数据的损坏(一个 Shard 被两个实例同时打开),在有了 Shard Lock 的保证后,HoraeMeta 就可以放心地生成调度计划,根据上报的负载信息,计算出当前状态的最佳调度结果,然后发送到涉及的 HoraeDB 实例让其执行,即使计算的前提可能是错误的(即集群状态已经和计算出调度结果时的状态不一样了),也不用担心集群拓扑的正确性遭到破坏,因而 HoraeMeta 的调度逻辑,变得简洁而优雅(只需要生成、执行,不需要考虑失败处理)。

5 - 分区表

注意:此功能仍在开发中,API 将来可能会发生变化。

本章讨论 PartitionTable

HoraeDB 使用的分区表语法类似于 MySQL

一般的分区表包括Range PartitioningList PartitoningHash PartitioningKey Partititioning

HoraeDB 目前仅支持 Key Partitioning。

设计

与 MySQL 类似,分区表的不同部分作为单独的表存储在不同的位置。

目前设计,一个分区表可以在多个 HoraeDB 节点上打开,支持同时写入和查询,可以水平扩展。

如下图所示,在 node0 和 node1 上打开了PartitionTable,在 node2 和 node3 上打开了存放实际数据的物理子表。

                        ┌───────────────────────┐      ┌───────────────────────┐
                        │Node0                  │      │Node1                  │
                        │   ┌────────────────┐  │      │  ┌────────────────┐   │
                        │   │ PartitionTable │  │      │  │ PartitionTable │   │
                        │   └────────────────┘  │      │  └────────────────┘   │
                        │            │          │      │           │           │
                        └────────────┼──────────┘      └───────────┼───────────┘
                                     │                             │
                                     │                             │
             ┌───────────────────────┼─────────────────────────────┼───────────────────────┐
             │                       │                             │                       │
┌────────────┼───────────────────────┼─────────────┐ ┌─────────────┼───────────────────────┼────────────┐
│Node2       │                       │             │ │Node3        │                       │            │
│            ▼                       ▼             │ │             ▼                       ▼            │
│ ┌─────────────────────┐ ┌─────────────────────┐  │ │  ┌─────────────────────┐ ┌─────────────────────┐ │
│ │                     │ │                     │  │ │  │                     │ │                     │ │
│ │     SubTable_0      │ │     SubTable_1      │  │ │  │     SubTable_2      │ │     SubTable_3      │ │
│ │                     │ │                     │  │ │  │                     │ │                     │ │
│ └─────────────────────┘ └─────────────────────┘  │ │  └─────────────────────┘ └─────────────────────┘ │
│                                                  │ │                                                  │
└──────────────────────────────────────────────────┘ └──────────────────────────────────────────────────┘

Key 分区

Key Partitioning支持一列或多列计算,使用 HoraeDB 内置的 hash 算法进行计算。

使用限制:

  • 仅支持 tag 列作为分区键。
  • 暂时不支持 LINEAR KEY

key 分区的建表语句如下:

1
2
3
4
5
6
7
CREATE TABLE `demo`(
    `name`string TAG,
    `id` int TAG,
    `value` double NOT NULL,
    `t` timestamp NOT NULL,
    TIMESTAMP KEY(t)
    ) PARTITION BY KEY(name) PARTITIONS 2 ENGINE = Analytic

参考 MySQL key partitioning

查询

由于分区表数据实际上是存放在不同的物理表中,所以查询时需要根据查询请求计算出实际请求的物理表。

首先查询会根据查询语句计算出要查询的物理表, 然后通过 HoraeDB 内部服务 remote engine 远程请求物理表所在节点获取数据(支持谓词下推)。

分区表的实现在 PartitionTableImpl 中。

  • 第一步:解析查询 sql,根据查询参数计算出要查询的物理表。
  • 第二步:查询物理表数据。
  • 第三步:用拉取的数据进行计算。
                       │
                     1 │
                       │
                       ▼
               ┌───────────────┐
               │Node0          │
               │               │
               │               │
               └───────────────┘
                       ┬
                2      │       2
        ┌──────────────┴──────────────┐
        │              ▲              │
        │       3      │       3      │
        ▼ ─────────────┴───────────── ▼
┌───────────────┐             ┌───────────────┐
│Node1          │             │Node2          │
│               │             │               │
│               │             │               │
└───────────────┘             └───────────────┘

Key 分区

  • 带有 and, or, in, = 的过滤器将选择特定的子表。
  • 支持模糊匹配过滤器,如 <, >,但可能性能较差,因为它会扫描所有物理表。

Key partitioning 规则实现在 KeyRule

写入

写入过程与查询过程类似。

首先根据分区规则,将写入请求拆分到不同的物理表中,然后通过 remote engine 服务发送到不同的物理节点进行实际的数据写入。

6 - 基于 Kafka 的 WAL

架构

在本节中,将会介绍一种分布式 WAL 实现(基于 Kafka)。表的预写日志(write-ahead logs,以下简称日志)在本实现中是按 region 级别管理的,region 可以简单理解为多个表的共享日志文件。

如下图所示,在本实现中将 region 映射到 Kafka 中的 topic(只有一个 partition)。 通常一个 region 需要两个 topic ,一个用于存储日志,另一个用于存储元数据。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
                                                 ┌──────────────────────────┐
                                                 │         Kafka            │
                                                 │                          │
                                                 │         ......           │
                                                 │                          │
                                                 │ ┌─────────────────────┐  │
                                                 │ │      Meta Topic     │  │
                                                 │ │                     │  │
                                         Delete  │ │ ┌─────────────────┐ │  │
               ┌──────────────────────┐  ┌───────┼─┼─►    Partition    │ │  │
               │       HoraeDB        │  │       │ │ │                 │ │  │
               │                      │  │       │ │ └─────────────────┘ │  │
               │ ┌──────────────────┐ │  │       │ │                     │  │
               │ │       WAL        │ │  │       │ └─────────────────────┘  │
               │ │      ......      │ │  │       │                          │
               │ │ ┌──────────────┐ │ │  │       │ ┌──────────────────────┐ │
               │ │ │    Region    │ │ │  │       │ │     Data Topic       │ │
               │ │ │              ├─┼─┼──┘       │ │                      │ │
               | | | ┌──────────┐ │ │ │          │ │ ┌──────────────────┐ │ │
               │ │ │ │ Metadata │ │ │ │          │ │ │    Partition     │ │ │
               │ │ │ └──────────┘ │ │ │    Write │ │ │                  │ │ │
Write ─────────┼─┼─►              ├─┼─┼───┐      │ │ │ ┌──┬──┬──┬──┬──┐ │ │ │
               │ │ │ ┌──────────┐ │ │ │   └──────┼─┼─┼─►  │  │  │  │  ├─┼─┼─┼────┐
               │ │ │ │  Client  │ │ │ │          │ │ │ └──┴──┴──┴──┴──┘ │ │ │    │
Read ◄─────────┼─┼─┤ └──────────┘ │ │ │          │ │ │                  │ │ │    │
               │ │ │              │ │ │          │ │ └──────────────────┘ │ │    │
               │ │ └──▲───────────┘ │ │          │ │                      │ │    │
               │ │    │ ......      │ │          │ └──────────────────────┘ │    │
               │ └────┼─────────────┘ │          │         ......           │    │
               │      │               │          └──────────────────────────┘    │
               └──────┼───────────────┘                                          │
                      │                                                          │
                      │                                                          │
                      │                        Read                              │
                      └──────────────────────────────────────────────────────────┘

数据模型

日志格式

日志格式采用了在 基于 RocksDB 的 WAL 中定义的通用格式。

元数据

每个 region 都将在内存和 Kafka 中维护其元数据,我们在这里称之为 RegionMeta。它可以被认为是一张映射表,以表 ID 作为键,以 TableMeta 作为值。我们简要介绍一下 TableMeta 中的变量:

  • next_seq_num,为下一条写入日志分配的 sequence number。
  • latest_marked_deleted,表最后一次触发 flush 时对应的 sequence number, 所以对应 sequence number 小于该值的日志都将被标记为可以删除。
  • current_high_watermark, 该表最近一次日志写入后,Kafka 对应 topic 的高水位。
  • seq_offset_mapping,sequence number 和 Kafka 对应 topic offset 的映射,每次 flush 后,会将 latest_marked_deleted 前的条目进行清理。
┌─────────────────────────────────────────┐
│              RegionMeta                 │
│                                         │
│ Map<TableId, TableMeta> table_metas     │
└─────────────────┬───────────────────────┘
                  │
                  │
                  │
                  └─────┐
                        │
                        │
 ┌──────────────────────┴──────────────────────────────┐
 │                       TableMeta                     │
 │                                                     │
 │ SequenceNumber next_seq_num                         │
 │                                                     │
 │ SequenceNumber latest_mark_deleted                  │
 │                                                     │
 │ KafkaOffset high_watermark                          │
 │                                                     │
 │ Map<SequenceNumber, KafkaOffset> seq_offset_mapping │
 └─────────────────────────────────────────────────────┘

主要流程

我们主要关于对于单个 region 的主要操作,会介绍以下操作的主要流程:

  • 打开或创建 region。
  • 读写日志。
  • 删除日志。

打开或创建 region

步骤

  • 在打开的 namespace 中搜索 region。
  • 如果 region 存在,最重要的事是去恢复其元数据,恢复过程将在之后介绍。
  • 如果 region 不存在并且需要自动创建,则需要在 Kafka 上创建对应的 topic。
  • 在 cache 中插入相应 region 并将其返回。

恢复

上面提到,RegionMeta 实际就是以表 ID 为键,以 TableMeta 为值的映射表。因此,我们在本节中只关注特定 TableMeta 的恢复即可,将在每步的介绍中加入例子以作更好的说明。

  • 从快照中恢复。我们会在某些场景下为 RegionMeta制作快照(例如当标记日志为可删除时,真正清理日志时),并且将其写到 meta topic 中,快照实际上就是在某个时间点的 RegionMeta。当恢复 region 时,我们可以使用快照来避免扫描 data topic 的全部数据。下面为上述过程对应的例子,我们从在 Kafka 高水位为 64 的时间点时制作的快照中恢复 RegionMeta
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
high watermark in snapshot: 64

 ┌──────────────────────────────┐
 │         RegionMeta           │
 │                              │
 │          ......              │
 │ ┌──────────────────────────┐ │
 │ │       TableMeta          │ │
 │ │                          │ │
 │ │ next_seq_num: 5          │ │
 │ │                          │ │
 │ │ latest_mark_deleted: 2   │ │
 │ │                          │ │
 │ │ high_watermark: 32       │ │
 │ │                          │ │
 │ │ seq_offset_mapping:      │ │
 │ │                          │ │
 │ │ (2, 16) (3, 16) (4, 31)  │ │
 │ └──────────────────────────┘ │
 │          ......              │
 └──────────────────────────────┘
  • 从日志数据中恢复。 当从快照中恢复的过程完成后,我们以快照被制作时 data topic 中的高水位为起点,扫描其中的日志数据进行后续恢复,明显这能够避免扫描 data topic 中的全部数据。以下为上述过程的例子:
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
┌────────────────────────────────────┐
│                                    │
│    high_watermark in snapshot: 64  │
│                                    │
│  ┌──────────────────────────────┐  │
│  │         RegionMeta           │  │
│  │                              │  │
│  │          ......              │  │
│  │ ┌──────────────────────────┐ │  │
│  │ │       TableMeta          │ │  │
│  │ │                          │ │  │
│  │ │ next_seq_num: 5          │ │  │                  ┌────────────────────────────────┐
│  │ │                          │ │  │                  │          RegionMeta            │
│  │ │ latest_mark_deleted: 2   │ │  │                  │                                │
│  │ │                          │ │  │                  │            ......              │
│  │ │ high_watermark: 32       │ │  │                  │ ┌────────────────────────────┐ │
│  │ │                          │ │  │                  │ │         TableMeta          │ │
│  │ │ seq_offset_mapping:      │ │  │                  │ │                            │ │
│  │ │                          │ │  │                  │ │ next_seq_num: 8            │ │
│  │ │ (2, 16) (3, 16) (4, 31)  │ │  │                  │ │                            │ │
│  │ └──────────────────────────┘ │  │                  │ │ latest_mark_deleted: 2     │ │
│  │          ......              │  │                  │ │                            │ │
│  └──────────────────────────────┘  ├──────────────────► │ high_watermark: 32         │ │
│                                    │                  │ │                            │ │
│ ┌────────────────────────────────┐ │                  │ │ seq_offset_mapping:        │ │
│ │          Data topic            │ │                  │ │                            │ │
│ │                                │ │                  │ │ (2, 16) (3, 16) (4, 31)    │ │
│ │ ┌────────────────────────────┐ │ │                  │ │                            │ │
│ │ │        Partition           │ │ │                  │ │ (5, 72) (6, 81) (7, 90)    │ │
│ │ │                            │ │ │                  │ │                            │ │
│ │ │ ┌────┬────┬────┬────┬────┐ │ │ │                  │ └────────────────────────────┘ │
│ │ │ │ 64 │ 65 │ ...│ 99 │100 │ │ │ │                  │             ......             │
│ │ │ └────┴────┴────┴────┴────┘ │ │ │                  └────────────────────────────────┘
│ │ │                            │ │ │
│ │ └────────────────────────────┘ │ │
│ │                                │ │
│ └────────────────────────────────┘ │
│                                    │
└────────────────────────────────────┘

读写日志

读写流程比较简单。

写流程:

  • 打开指定的 region,如果不存在则需要创建。
  • 利用 client 将日志写入到 region 对应的 data topic 中。
  • 更新 TableMeta 中的 next_seq_num, current_high_watermarkseq_offset_mapping等元数据,

读流程:

  • 打开指定的 region。
  • 读取 region 的所有日志数据,按表切分数据和回放等工作需要调用者实现。

删除日志

日志的删除可以划分为两个步骤:

  • 标记日志为可删除。
  • 利用后台线程做延迟清理。

标记

  • 更新在 TableMeta 中的 latest_mark_deletedseq_offset_mapping(需要进行维护,使得每一条目的 sequence number 大于等于更新后的 latest_mark_deleted)。
  • 或许我们需要在删除表的时候,制作并及时同步 RegionMeta 的快照到 Kafka 中。

清理

清理逻辑如下,会在后台线程中执行:

  • 制作 RegionMeta 的快照。
  • 根据快照判断是否需要进行清理。
  • 如果需要,先同步快照到 Kafka 中,然后清理日志。

7 - 基于 RocksDB 的 WAL

架构

在本节中,我们将介绍单机版 WAL 的实现(基于 RocksDB)。预写日志(write-ahead logs,以下简称日志)在本实现中是按表级别进行管理的,对应的数据结构为 TableUnit。为简单起见,所有相关数据(日志或元数据)都存储在单个 column family(RocksDB 中的概念,可以类比关系型数据库的表) 中。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
            ┌─────────────────────────┐
            │         HoraeDB         │
            │                         │
            │ ┌─────────────────────┐ │
            │ │         WAL         │ │
            │ │                     │ │
            │ │        ......       │ │
            │ │                     │ │
            │ │  ┌────────────────┐ │ │
 Write ─────┼─┼──►   TableUnit    │ │ │
            │ │  │                │ │ │
 Read  ─────┼─┼──► ┌────────────┐ │ │ │
            │ │  │ │ RocksDBRef │ │ │ │
            │ │  │ └────────────┘ │ │ │
Delete ─────┼─┼──►                │ │ │
            │ │  └────────────────┘ │ │
            │ │        ......       │ │
            │ └─────────────────────┘ │
            │                         │
            └─────────────────────────┘

数据模型

通用日志格式

通用日志格式分为 key 格式和 value 格式,下面是对 key 格式各个字段的介绍:

  • namespace: 出于不同的目的,可能会存在多个 WAL 实例(例如,manifest 也依赖于 wal), namespace 用于区分它们。
  • region_id: 在一些 WAL 实现中我们可能需要在共享日志文件中,管理来自多个表的日志,region 就是描述这样一组表的概念, 而 region id 就是其标识。
  • table_id: 表的标识。
  • sequence_num: 特定表中单条日志的标识。
  • version: 用于兼容新旧格式。
1
2
3
+---------------+----------------+-------------------+--------------------+-------------+
| namespace(u8) | region_id(u64) |   table_id(u64)   |  sequence_num(u64) | version(u8) |
+---------------+----------------+-------------------+--------------------+-------------+

下面是对 value 格式各个字段的介绍(payload 可以理解为编码后的具体日志内容):

1
2
3
+--------------------+----------+
| version header(u8) | payload  |
+--------------------+----------+

元数据

与日志格式相同,元数据以 key-value 格式存储, 本实现的元数据实际只是存储了每张表最近一次 flush 对应的 sequence_num。下面是定义的元数据 key 格式和其中字段的介绍:

  • namespace, table_id, version 和日志格式中相同。
  • key_type, 用于定义元数据的类型,现在只定义了 MaxSeq 类型的元数据,在。 因为在 RocksDB 版本的 WAL 实现中,日志是按表级别进行管理,所以这个 key 格式里面没有 region_id 字段。
1
2
3
+---------------+--------------+----------------+-------------+
| namespace(u8) | key_type(u8) | table_id(u64)  | version(u8) |
+---------------+--------------+----------------+-------------+

这是定义的元数据值格式,如下所示,其中只有 versionmax_seq(flushed sequence):

1
2
3
+-------------+--------------+
| version(u8) | max_seq(u64) |
+-------------+--------------+

主要流程

  • 打开 TableUnit:
    • 读取所有表的最新日志条目,目的是恢复表的 next sequence num(将会分配给下一条写入的日志)。
    • 扫描 metadata 恢复上一步遗漏的表的 next sequence num(因为可能有表刚刚触发了 fl​​ush,并且之后没有新的写入日志,所以当前不存在日志数据)。
  • 读写日志。从 RocksDB 读取或者写入相关日志数据。
  • 删除日志。为简单起见,在本实现中只是同步地删除相应的日志数据。

8 - 基于本地磁盘的 WAL

架构

本节将介绍基于本地磁盘的单机版 WAL(Write-Ahead Log,以下简称日志)的实现。在此实现中,日志按 region 级别进行管理。

            ┌────────────────────────────┐
            │          HoraeDB           │
            │                            │
            │ ┌────────────────────────┐ │
            │ │          WAL           │ │         ┌────────────────────────┐
            │ │                        │ │         │                        │
            │ │         ......         │ │         │      File System       │
            │ │                        │ │         │                        │
            │ │ ┌────────────────────┐ │ │ manage  │ ┌────────────────────┐ │
 Write ─────┼─┼─►       Region       ├─┼─┼─────────┼─►     Region Dir     │ │
            │ │ │                    │ │ │         │ │                    │ │
 Read  ─────┼─┼─►   ┌────────────┐   │ │ │  mmap   │ │ ┌────────────────┐ │ │
            │ │ │   │  Segment 0 ├───┼─┼─┼─────────┼─┼─► Segment File 0 │ │ │
            │ │ │   └────────────┘   │ │ │         │ │ └────────────────┘ │ │
Delete ─────┼─┼─►   ┌────────────┐   │ │ │  mmap   │ │ ┌────────────────┐ │ │
            │ │ │   │  Segment 1 ├───┼─┼─┼─────────┼─┼─► SegmenteFile 1 │ │ │
            │ │ │   └────────────┘   │ │ │         │ │ └────────────────┘ │ │
            │ │ │   ┌────────────┐   │ │ │  mmap   │ │ ┌────────────────┐ │ │
            │ │ │   │  Segment 2 ├───┼─┼─┼─────────┼─┼─► SegmenteFile 2 │ │ │
            │ │ │   └────────────┘   │ │ │         │ │ └────────────────┘ │ │
            │ │ │       ......       │ │ │         │ │       ......       │ │
            │ │ └────────────────────┘ │ │         │ └────────────────────┘ │
            │ │         ......         │ │         │         ......         │
            │ └────────────────────────┘ │         └────────────────────────┘
            └────────────────────────────┘

数据模型

文件路径

每个 region 都拥有一个目录,用于管理该 region 的所有 segment。目录名为 region 的 ID。每个 segment 的命名方式为 seg_<id>,ID 从 0 开始递增。

Segment 的格式

一个 region 中所有表的日志都存储在 segments 中,并按照 sequence number 从小到大排列。segment 文件的结构如下:

   Segment0            Segment1
┌────────────┐      ┌────────────┐
│ Magic Num  │      │ Magic Num  │
├────────────┤      ├────────────┤
│   Record   │      │   Record   │
├────────────┤      ├────────────┤
│   Record   │      │   Record   │
├────────────┤      ├────────────┤   ....
│   Record   │      │   Record   │
├────────────┤      ├────────────┤
│     ...    │      │     ...    │
│            │      │            │
└────────────┘      └────────────┘
    seg_0               seg_1

在内存中,每个 segment 还会存储一些额外的信息以供读写和删除操作使用:

pub struct Segment {
    /// A hashmap storing both min and max sequence numbers of records within
    /// this segment for each `TableId`.
    table_ranges: HashMap<TableId, (SequenceNumber, SequenceNumber)>,

    /// An optional vector of positions within the segment.
    record_position: Vec<Position>,

    ...
}

日志格式

segment 中的日志格式如下:

+---------+--------+------------+--------------+--------------+-------+
| version |  crc   |  table id  | sequence num | value length | value |
|  (u8)   | (u32)  |   (u64)    |    (u64)     |     (u32)    |(bytes)|
+---------+--------+------------+--------------+--------------+-------+

字段说明:

  1. version:日志版本号。

  2. crc:用于确保数据一致性。计算从 table id 到该记录结束的 CRC 校验值。

  3. table id:表的唯一标识符。

  4. sequence num:记录的序列号。

  5. value length:value 的字节长度。

  6. value:通用日志格式中的值。

日志中不存储 region ID,因为可以通过文件路径获取该信息。

主要流程

打开 Wal

  1. 识别 Wal 目录下的所有 region 目录。

  2. 在每个 region 目录下,识别所有 segment 文件。

  3. 打开每个 segment 文件,遍历其中的所有日志,记录其中每个日志开始和结束的偏移量和每个 TableId 在该 segment 中的最小和最大序列号,然后关闭文件。

  4. 如果不存在 region 目录或目录下没有任何 segment 文件,则自动创建相应的目录和文件。

读日志

  1. 根据 segment 的元数据,确定本次读取操作涉及的所有 segment。
  2. 按照 id 从小到大的顺序,依次打开这些 segment,将原始字节解码为日志。

写日志

  1. 将待写入的日志序列化为字节数据,追加到 id 最大的 segment 文件中。

  2. 每个 segment 创建时预分配固定大小的 64MB,不会动态改变。当预分配的空间用完后,创建一个新的 segment,并切换到新的 segment 继续追加。

  3. 每次追加后不会立即调用 flush;默认情况下,每写入十次或在 segment 文件关闭时才执行 flush。

  4. 在内存中更新 segment 的元数据 table_ranges

删除日志

假设需要将 id 为 table_id 的表中,序列号小于 seq_num 的日志标记为删除:

  1. 在内存中更新相关 segment 的 table_ranges 字段,将该表的最小序列号更新为 seq_num + 1。

  2. 如果修改后,该表在此 segment 中的最小序列号大于最大序列号,则从 table_ranges 中删除该表。

  3. 如果一个 segment 的 table_ranges 为空,且不是 id 最大的 segment,则删除该 segment 文件。