1 - Compaction Offload

Note: This feature is still in development.

This chapter discusses compaction offload, which is designed to separate the compaction workload from the local horaedb nodes and delegate it to external compaction nodes.

Overview

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
┌─────────────────────────────────────────────────────────────────────────┐
│                                                                         │
│                           HoraeMeta Cluster                             │
│                                                                         │
└─────────────────────────────────────────────────────────────────────────┘
    ▲                ▲                           |                   |
    │                │1.Fetch compaction         │(Monitor compaction│
    │                │  node  info               │node)              │
    |                │                           ▼                   ▼
┌────────────┐  ┌────────────┐              ┌────────────┐   ┌────────────┐
│            │  │            │2.Offload Task│            │   │            │
│  HoraeDB   │  │  HoraeDB   │  ─────────▶  │ Compaction │   │ Compaction │
│            │  │            │  ◀─────────  │ Node       │   │ Node       │
└────────────┘  └────────────┘ 4.Ret Result └────────────┘   └────────────┘
    |                |                            |                 |
    │  5.Update the  │                            │    3.Compact    │
    │    SSTable     │                            │                 │
    ▼                ▼                            ▼                 ▼
┌─────────────────────────────────────────────────────────────────────────┐
│                                                ┌─────────────────────┐  │
│                            Object Storage      │ Temporary Workspace │  │
│                                                └─────────────────────┘  │
└─────────────────────────────────────────────────────────────────────────┘

The diagram above describes the architecture of cluster for compaction offload, where some key concepts need to be explained:

  • Compaction Node: Takes responsibility to handle offloaded compaction tasks. The compaction node receives the compaction task and performs the actual merging of SSTables, then sends back the task result to HoraeDB.
  • HoraeMeta Cluster: HoraeMeta acts as a compaction nodes manager in the compaction offload scenario. It monitors the compaction nodes cluster and schedule the compaction nodes.

The procedure of remote compaction based above architecture diagram is:

  1. HoraeDB servers fetch the information of suitable compaction nodes from the HoraeMeta.
  2. HoraeDB submit the compaction task to the remote compaction node, according to the information fetched from HoraeMeta.
  3. Compaction node executes the task and write results to the temporary workspace.
  4. Compaction node sends compaction results back to HoraeDB.
  5. HoraeDB receives the result, installs the data in temporary workspace and purges compaction input files.

The architecture above makes it easy to implement some wonderful features like load balancing and high availability. Let’s dive into the key components in the architecture and talking about how these features are implemented.

Compaction Node

Compaction Node runs the main logic of compaction. It is implemented based on HoraeDB and distinguished by:

  • NodeType: A config parameter used to distinguished the HoraeDB and CompactionNode. This info would be sent to HoraeMeta through heartbeat.

The compaction service is implemented as grpc service.

HoraeMeta

HoraeMeta manages the compaction nodes cluster with CompactionNodeManager, which takes responsibilities for compaction nodes metadata management and scheduling.

The compaction nodes metadata includes:

  • Compaction node information, such as node name, node state;
  • A compaction node name list, used as the key to access compaction node info, for better scheduling with round-robin strategy;

As for the compaction nodes scheduling work, it mainly includes:

  • Receiving the heartbeats from the compaction node and determining the online status of these registered nodes.
  • Performing load balancing according to the compaction nodes cluster info.
  • Providing the info of suitable compaction node for HoraeDB when remote compaction execution is needed.

Load Balancing

Load Balancing is critical for compaction nodes cluster to make their overall processing more efficient. The effect of load balancing mainly based on the schedule algorithm for compaction nodes impl in CompactionNodeManager.

(ps: The current implementation of schedule algorithm is round-robin strategy for easiness.)

The main process for the schedule algorithm based on real load is:

  • HoraeMeta collects the compaction nodes load information through the heartbeats to create a load overview of the compaction nodes cluster.
  • Pick a compaction node with low load according to the load overview.

High Availability

The fault tolerance of above architecture can be achieved by such a procedure:

  • When detecting that the heartbeat is broken, HoraeMeta determines that the compaction node is offline.
  • When HoraeMeta can not provide suitable compaction node for HoraeDB or compaction node doesn’t return the task result successfully, HoraeDB would switches to run compaction task locally.

2 - Introduction to Architecture of HoraeDB Cluster

Note: Some of the features mentioned in the article have not yet been implemented.

Overview

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
┌───────────────────────────────────────────────────────────────────────┐
│                                                                       │
│                           HoraeMeta Cluster                           │
│                                                                       │
└───────────────────────────────────────────────────────────────────────┘
                              ▲               ▲                 ▲
                              │               │                 │
                              │               │                 │
                              ▼               ▼                 ▼
┌───────┐Route Info ┌HoraeDB─────┬┬─┐ ┌HoraeDB─────┬┬─┐ ┌HoraeDB─────┬┬─┐
│client │◀────────▶ │  │  │TableN││ │ │  │  │TableN││ │ │  │  │TableN││ │
└───────┘Write/Query└──Shard(L)──┴┴─┘ └──Shard(F)──┴┴─┘ └──Shard(F)──┴┴─┘
                            ▲ │                 ▲               ▲
                              │                 │               │
                            │ Write─────────┐   ├────Sync───────┘
                                            │   │
                            │     ┌────────┬▼───┴────┬──────────────────┐
              Upload/Download     │        │         │                  │
                    SST     │     │WAL     │Region N │                  │
                                  │Service │         │                  │
                            │     └────────┴─────────┴──────────────────┘

┌───────────────────────────────────────────────────────────────────────┐
│                                                                       │
│                            Object Storage                             │
│                                                                       │
└───────────────────────────────────────────────────────────────────────┘

The diagram above describes the architecture of a HoraeDB cluster, where some key concepts need to be explained:

  • HoraeMeta Cluster: Takes responsibilities for managing the metadata and resource scheduling of the cluster;
  • Shard(L)/Shard(F): Leader shard and follower shard consisting of multiple tables;
  • HoraeDB: One HoraeDB instance consisting of multiple shards;
  • WAL Service: Write-ahead log service for storing new-written real-time data;
  • Object Storage: Object storage service for storing SST converted from memtable;

From the architecture diagram above, it can be concluded that the compute and storage are separated in the HoraeDB cluster, which makes it easy to implement useful distributed features, such as elastic autoscaling of compute/storage resources, high availability, load balancing, and so on.

Let’s dive into some of the key components mentioned above before explaining how these features are implemented.

Shard

Shard is the basic scheduling unit in the cluster, which consists of a group of tables. And the tables in a shard share the same region for better storage locality in the WAL Service, and because of this, it is efficient to recover the data of all tables in the shard by scanning the entire WAL region. For most of implementations of WAL Service, without the shard concept, it costs a lot to recover the table data one by one due to massive random IO, and this case will deteriorate sharply when the number of tables grows to a certain level.

A specific role, Leader or Follower, should be assigned to a shard. A pair of leader-follower shards share the same set of tables, and the leader shard can serve the write and query requests from the client while the follower shard can only serve the read-only requests, and must synchronize the newly written data from the WAL service in order to provide the latest snapshot for data retrieval. Actually, the follower is not needed if the high availability is not required, while with at least one follower, it takes only a short time to resume service by simply switching the Follower to Leader when the HoraeDB instance on which the leader shard exists crashes.

The diagram below concludes the relationship between HoraeDB instance, Shard, Table. As shown in the diagram, the leader and follower shards are interleaved on the HoraeDB instance.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
┌─HoraeDB Instance0──────┐     ┌─HoraeDB Instance1──────┐
│  ┌─Shard0(L)────────┐  │     │  ┌─Shard0(F)────────┐  │
│  │ ┌────┬────┬────┐ │  │     │  │ ┌────┬────┬────┐ │  │
│  │ │ T0 │ T1 │ T2 │ │  │     │  │ │ T0 │ T1 │ T2 │ │  │
│  │ └────┴────┴────┘ │  │     │  │ └────┴────┴────┘ │  │
│  └──────────────────┘  │     │  └──────────────────┘  │
│                        │     │                        │
│  ┌─Shard1(F)────────┐  │     │  ┌─Shard1(L)────────┐  │
│  │ ┌────┬────┬────┐ │  │     │  │ ┌────┬────┬────┐ │  │
│  │ │ T0 │ T1 │ T2 │ │  │     │  │ │ T0 │ T1 │ T2 │ │  │
│  │ └────┴────┴────┘ │  │     │  │ └────┴────┴────┘ │  │
│  └──────────────────┘  │     │  └──────────────────┘  │
└────────────────────────┘     └────────────────────────┘

Since Shard is the basic scheduling unit, it is natural to introduce some basic shard operations:

  • Create/Drop table to/from a shard;
  • Open/Close a shard;
  • Split one shard into two shards;
  • Merge two shards into one shard;
  • Switch the role of a shard;

With these basic shard operations, some complex scheduling logic can be implemented, e.g. perform an expansion by splitting one shard into two shards and migrating one of them to the new HoraeDB instance.

HoraeMeta

HoraeMeta is implemented by embedding an ETCD inside to ensure consistency and takes responsibilities for cluster metadata management and scheduling.

The cluster metadata includes:

  • Table information, such as table name, table ID, and which cluster the table belongs to;
  • The mapping between table and shard and between shard and HoraeDB instance;

As for the cluster scheduling work, it mainly includes:

  • Receiving the heartbeats from the HoraeDB instances and determining the online status of these registered instances;
  • Assigning specific role shards to the registered HoraeDB instances;
  • Participating in table creation by assigning a unique table ID and the most appropriate shard to the table;
  • Performing load balancing through shard operations according to the load information sent with the heartbeats;
  • Performing expansion through shard operations when new instances are registered;
  • Initiating failover through shard operations when old instances go offline;

Route

In order to avoid the overhead of forwarding requests, the communication between clients and the HoraeDB instances is peer-to-peer, that is to say, the client should retrieve routing information from the server before sending any specific write/query requests.

Actually, the routing information is decided by the HoraeMeta, but clients are only allowed the access to it through the HoraeDB instances rather than HoraeMeta, to avoid potential performance issues on the HoraeMeta.

WAL Service & Object Storage

In the HoraeDB cluster, WAL Service and Object Storage exist as separate distributed systems featured with HA, data replication and scalability. Current distributed implementations for WAL Service includes Kafka and OBKV (access OceanBase by its table api), and the implementations for Object Storage include popular object storage services, such as AWS S3, Azure object storage and Aliyun OSS.

The two components are similar in that they are introduced to serve as the underlying storage layer for separating compute and storage, while the difference between two components is obvious that WAL Service is used to store the newly written data from the real-time write requests whose individual size is small but quantity is large, and Object Storage is used to store the read-friendly data files (SST) organized in the background, whose individual size is large and aggregate size is much larger.

The two components make it much easier to implement the horaedb cluster, which features horizontal scalability, high availability and load balancing.

Scalability

Scalability is an important feature for a distributed system. Let’s take a look at to how the horizontal scalability of the HoraeDB cluster is achieved.

First, the two storage components (WAL Service and Object Storage) should be horizontally scalable when deciding on the actual implementations for them, so the two storage services can be expanded separately if the storage capacity is not sufficient.

It will be a little bit complex when discussing the scalability of the compute service. Basically, these cases will bring the capacity problem:

  • Massive queries on massive tables;
  • Massive queries on a single large table;
  • Massive queries on a normal table;

For the first case, it is easy to achieve horizontal scalability just by assigning shards that are created or split from old shards to expanded HoraeDB instances.

For the second case, the table partitioning is proposed and after partitioning, massive queries are distributed across multiple HoraeDB instances.

And the last case is the most important and the most difficult. Actually, the follower shard can handle part of the queries, but the number of follower shards is limited by the throughput threshold of the synchronization from the WAL regions. As shown in the diagram below, a pure compute shard can be introduced if the followers are not enough to handle the massive queries. Such a shard is not required to synchronize data with the leader shard, and retrieves the newly written data from the leader/follower shard only when the query comes. As for the SSTs required by the query, they will be downloaded from Object Storage and cached afterwards. With the two parts of the data, the compute resources are fully utilized to execute the CPU-intensive query plan. As we can see, such a shard can be added with only a little overhead (retrieving some data from the leader/follower shard when it needs), so to some extent, the horizontal scalability is achieved.

1
2
3
4
5
6
7
8
9
                                             ┌HoraeDB─────┬┬─┐
                            ┌──newly written─│  │  │TableN││ │
                            ▼                └──Shard(L/F)┴┴─┘
┌───────┐  Query  ┌HoraeDB─────┬┬─┐
│client │────────▶│  │  │TableN││ │
└───────┘         └──Shard─────┴┴─┘          ┌───────────────┐
                            ▲                │    Object     │
                            └───old SST──────│    Storage    │
                                             └───────────────┘

High Availability

Assuming that WAL service and Object Storage are highly available, the high availability of the HoraeDB cluster can be achieved by such a procedure:

  • When detecting that the heartbeat is broken, HoraeMeta determines that the HoraeDB instance is offline;
  • The follower shards whose paired leader shards exist on the offline instance are switched to leader shards for fast failover;
  • A slow failover can be achieved by opening the crashed shards on another instance if such follower shards don’t exist.
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
┌─────────────────────────────────────────────────────────┐
│                                                         │
│                    HoraeMeta Cluster                    │
│                                                         │
└─────────────────────────────────────────────────────────┘
             ┌ ─ Heartbeat ─ ┤
                   Broken    │
             │               │
┌ HoraeDB Instance0 ─ ─ ─    │   ┌─HoraeDB Instance1──────┐                   ┌─HoraeDB Instance1──────┐
   ┌─Shard0(L)────────┐  │   │   │  ┌─Shard0(F)────────┐  │                   │  ┌─Shard0(L)────────┐  │
│  │ ┌────┬────┬────┐ │      │   │  │ ┌────┬────┬────┐ │  │                   │  │ ┌────┬────┬────┐ │  │
   │ │ T0 │ T1 │ T2 │ │  │   ├───│  │ │ T0 │ T1 │ T2 │ │  │                   │  │ │ T0 │ T1 │ T2 │ │  │
│  │ └────┴────┴────┘ │      │   │  │ └────┴────┴────┘ │  │                   │  │ └────┴────┴────┘ │  │
   └──────────────────┘  │   │   │  └──────────────────┘  │     Failover      │  └──────────────────┘  │
│                            │   ├─HoraeDB Instance2──────┤   ───────────▶    ├─HoraeDB Instance2──────┤
   ┌─Shard1(L)────────┐  │   │   │  ┌─Shard1(F)────────┐  │                   │  ┌─Shard1(L)────────┐  │
│  │ ┌────┬────┬────┐ │      │   │  │ ┌────┬────┬────┐ │  │                   │  │ ┌────┬────┬────┐ │  │
   │ │ T0 │ T1 │ T2 │ │  │   └───│  │ │ T0 │ T1 │ T2 │ │  │                   │  │ │ T0 │ T1 │ T2 │ │  │
│  │ └────┴────┴────┘ │          │  │ └────┴────┴────┘ │  │                   │  │ └────┴────┴────┘ │  │
   └──────────────────┘  │       │  └──────────────────┘  │                   │  └──────────────────┘  │
└ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─        └────────────────────────┘                   └────────────────────────┘

Load Balancing

HoraeMeta collects the instance load information contained in the received heartbeats to create a load overview of the whole cluster, according to which the load balancing can be implemented as an automatic mechanism:

  • Pick a shard on a low-load instance for the newly created table;
  • Migrate a shard from a high-load instance load to another low-load instance;
  • Split the large shard on the high-load instance and migrate the split shards to other low-load instances;

3 - Introduction to HoraeDB's Architecture

Target

  • Provide the overview of HoraeDB to the developers who want to know more about HoraeDB but have no idea where to start.
  • Make a brief introduction to the important modules of HoraeDB and the connections between these modules but details about their implementations are not be involved.

Motivation

HoraeDB is a timeseries database (TSDB). However, HoraeDB’s goal is to handle both timeseries and analytic workloads compared with the classic TSDB, which usually have a poor performance in handling analytic workloads.

In the classic timeseries database, the Tag columns (InfluxDB calls them Tag and Prometheus calls them Label) are normally indexed by generating an inverted index. However, it is found that the cardinality of Tag varies in different scenarios. And in some scenarios the cardinality of Tag is very high (we name this case after analytic workload), and it takes a very high cost to store and retrieve the inverted index. On the other hand, it is observed that scanning+pruning often used by the analytical databases can do a good job to handle such analytic workload.

The basic design idea of HoraeDB is to adopt a hybrid storage format and the corresponding query method for a better performance in processing both timeseries and analytic workloads.

Architecture

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
┌──────────────────────────────────────────┐
│       RPC Layer (HTTP/gRPC/MySQL)        │
└──────────────────────────────────────────┘
┌──────────────────────────────────────────┐
│                 SQL Layer                │
│ ┌─────────────────┐  ┌─────────────────┐ │
│ │     Parser      │  │     Planner     │ │
│ └─────────────────┘  └─────────────────┘ │
└──────────────────────────────────────────┘
┌───────────────────┐  ┌───────────────────┐
│    Interpreter    │  │      Catalog      │
└───────────────────┘  └───────────────────┘
┌──────────────────────────────────────────┐
│               Query Engine               │
│ ┌─────────────────┐  ┌─────────────────┐ │
│ │    Optimizer    │  │    Executor     │ │
│ └─────────────────┘  └─────────────────┘ │
└──────────────────────────────────────────┘
┌──────────────────────────────────────────┐
│         Pluggable Table Engine           │
│  ┌────────────────────────────────────┐  │
│  │              Analytic              │  │
│  │┌────────────────┐┌────────────────┐│  │
│  ││      Wal       ││    Memtable    ││  │
│  │└────────────────┘└────────────────┘│  │
│  │┌────────────────┐┌────────────────┐│  │
│  ││     Flush      ││   Compaction   ││  │
│  │└────────────────┘└────────────────┘│  │
│  │┌────────────────┐┌────────────────┐│  │
│  ││    Manifest    ││  Object Store  ││  │
│  │└────────────────┘└────────────────┘│  │
│  └────────────────────────────────────┘  │
│  ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─   │
│           Another Table Engine        │  │
│  └ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─   │
└──────────────────────────────────────────┘

The figure above shows the architecture of HoraeDB stand-alone service and the details of some important modules will be described in the following part.

RPC Layer

module path: https://github.com/apache/incubator-horaedb/tree/main/server

The current RPC supports multiple protocols including HTTP, gRPC, MySQL.

Basically, HTTP and MySQL are used to debug HoraeDB, query manually and perform DDL operations (such as creating, deleting tables, etc.). And gRPC protocol can be regarded as a customized protocol for high-performance, which is suitable for massive reading and writing operations.

SQL Layer

module path: https://github.com/apache/incubator-horaedb/tree/main/query_frontend

SQL layer takes responsibilities for parsing sql and generating the query plan.

Based on sqlparser a sql dialect, which introduces some key concepts including Tag and Timestamp, is provided for processing timeseries data. And by utilizing DataFusion the planner is able to generate both regular logical plans and tailored ones which is used to implement the special operators defined by timeseries queries, e.g PromQL.

Interpreter

module path: https://github.com/apache/incubator-horaedb/tree/main/interpreters

The Interpreter module encapsulates the SQL CRUD operations. In the query procedure, a sql received by HoraeDB is parsed, converted into the query plan and then executed in some specific interpreter, such as SelectInterpreter, InsertInterpreter and etc.

Catalog

module path: https://github.com/apache/incubator-horaedb/tree/main/catalog_impls

Catalog is actually the module managing metadata and the levels of metadata adopted by HoraeDB is similar to PostgreSQL: Catalog > Schema > Table, but they are only used as namespace.

At present, Catalog and Schema have two different kinds of implementation for standalone and distributed mode because some strategies to generate ids and ways to persist metadata differ in different mode.

Query Engine

module path: https://github.com/apache/incubator-horaedb/tree/main/query_engine

Query Engine is responsible for optimizing and executing query plan given a basic SQL plan provided by SQL layer and now such work is mainly delegated to DataFusion.

In addition to the basic functions of SQL, HoraeDB also defines some customized query protocols and optimization rules for some specific query plans by utilizing the extensibility provided by DataFusion. For example, the implementation of PromQL is implemented in this way and read it if you are interested.

Pluggable Table Engine

module path: https://github.com/apache/incubator-horaedb/tree/main/table_engine

Table Engine is actually a storage engine for managing tables in HoraeDB and the pluggability of Table Engine is a core design of HoraeDB which matters in achieving our long-term target, e.g supporting handle log or tracing workload by implementing new storage engines. HoraeDB will have multiple kinds of Table Engine for different workloads and the most appropriate one should be chosen as the storage engine according to the workload pattern.

Now the requirements for a Table Engine are:

  • Manage all the shared resources under the engine:
    • Memory
    • Storage
    • CPU
  • Manage metadata of tables such as table schema and table options;
  • Provide Table instances which provides read and write methods;
  • Take responsibilities for creating, opening, dropping and closing Table instance;
  • ….

Actually the things that a Table Engine needs to process are a little complicated. And now in HoraeDB only one Table Engine called Analytic is provided and does a good job in processing analytical workload, but it is not ready yet to handle the timeseries workload (we plan to enhance it for a better performance by adding some indexes which help handle timeseries workload).

The following part gives a description about details of Analytic Table Engine.

WAL

module path: https://github.com/apache/incubator-horaedb/tree/main/wal

The model of HoraeDB processing data is WAL + MemTable that the recent written data is written to WAL first and then to MemTable and after a certain amount of data in MemTable is accumulated, the data will be organized in a query-friendly form to persistent devices.

Now three implementations of WAL are provided for standalone and distributed mode:

  • For standalone mode, WAL is based on RocksDB and data is persisted on the local disk.
  • For distributed mode, WAL is required as a distributed component and to be responsible for durability of the newly written data, so now we provide an implementation based on OceanBase.
  • For distributed mode, in addition to OceanBase, we also provide a more lightweight implementation based on Apache Kafka.

MemTable

module path: https://github.com/apache/incubator-horaedb/tree/main/analytic_engine/src/memtable

For WAL can’t provide efficient data retrieval, the newly written data is also stored in Memtable for efficient data retrieval, after a certain amount of data is reached, HoraeDB organizes the data in MemTable into a query-friendly storage format (SST) and stores it to the persistent device.

The current implementation of MemTable is based on agatedb’s skiplist. It allows concurrent reads and writes and can control memory usage based on Arena.

Flush

module path: https://github.com/apache/incubator-horaedb/blob/main/analytic_engine/src/instance/flush_compaction.rs

What Flush does is that when the memory usage of MemTable reaches the threshold, some MemTables are selected for flushing into query-friendly SSTs saved on persistent device.

During the flushing procedure, the data will be divided by a certain time range (which is configured by table option Segment Duration), and any SST is ensured that the timestamps of the data in it are in the same Segment. Actually this is also a common operation in most timeseries databases which organizes data in the time dimension to speed up subsequent time-related operations, such as querying data over a time range and assisting purge data outside the TTL.

Compaction

module path: https://github.com/apache/incubator-horaedb/tree/main/analytic_engine/src/compaction

The data of MemTable is flushed as SSTs, but the file size of recently flushed SST may be very small. And too small or too many SSTs lead to the poor query performance. Therefore, Compaction is then introduced to rearrange the SSTs so that the multiple smaller SST files can be compacted into a larger SST file.

Manifest

module path: https://github.com/apache/incubator-horaedb/tree/main/analytic_engine/src/meta

Manifest takes responsibilities for managing tables’ metadata of Analytic Engine including:

  • Table schema and table options;
  • The sequence number where the newest flush finishes;
  • The information of all the SSTs belonging to the table.

Now the Manifest is based on WAL and Object Storage. The newly written updates on the Manifest are persisted as logs in WAL, and in order to avoid infinite expansion of Manifest (actually every Flush leads to an update), Snapshot is also introduced to clean up the history of metadata updates, and the generated Snapshot will be saved to Object Storage.

Object Storage

module path: https://github.com/apache/incubator-horaedb/tree/main/components/object_store

The SST generated by Flush needs to be persisted and the abstraction of the persistent storage device is ObjectStore including multiple implementations:

The distributed architecture of HoraeDB separates storage and computing, which requires Object Store needs to be a highly available and reliable service independent of HoraeDB. Therefore, storage systems like Amazon S3, Alibaba Cloud OSS is a good choice and in the future implementations on storage systems of some other cloud service providers is planned to provide.

SST

module path: https://github.com/apache/incubator-horaedb/tree/main/analytic_engine/src/sst

SST is actually an abstraction that can have multiple specific implementations. The current implementation is based on Parquet, which is a column-oriented data file format designed for efficient data storage and retrieval.

The format of SST is very critical for retrieving data and is also the most important part to perform well in handling both timeseries and analytic workloads. At present, our Parquet-based implementation is good at processing analytic workload but is poor at processing timeseries workload. In our roadmap, we will explore more storage formats in order to achieve a good performance in both workloads.

Space

module path: https://github.com/apache/incubator-horaedb/blob/main/analytic_engine/src/space.rs

In Analytic Engine, there is a concept called space and here is an explanation for it to resolve some ambiguities when read source code. Actually Analytic Engine does not have the concept of catalog and schema and only provides two levels of relationship: space and table. And in the implementation, the schema id (which should be unique across all catalogs) on the upper layer is actually mapped to space id.

The space in Analytic Engine serves mainly for isolation of resources for different tenants, such as the usage of memory.

Critical Path

After a brief introduction to some important modules of HoraeDB, we will give a description for some critical paths in code, hoping to provide interested developers with a guide for reading the code.

Query

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
┌───────┐      ┌───────┐      ┌───────┐
│       │──1──▶│       │──2──▶│       │
│Server │      │  SQL  │      │Catalog│
│       │◀─10──│       │◀─3───│       │
└───────┘      └───────┘      └───────┘
                │    ▲
               4│   9│
                │    │
                ▼    │
┌─────────────────────────────────────┐
│                                     │
│             Interpreter             │
│                                     │
└─────────────────────────────────────┘
                           │    ▲
                          5│   8│
                           │    │
                           ▼    │
                   ┌──────────────────┐
                   │                  │
                   │   Query Engine   │
                   │                  │
                   └──────────────────┘
                           │    ▲
                          6│   7│
                           │    │
                           ▼    │
 ┌─────────────────────────────────────┐
 │                                     │
 │            Table Engine             │
 │                                     │
 └─────────────────────────────────────┘

Take SELECT SQL as an example. The figure above shows the query procedure and the numbers in it indicates the order of calling between the modules.

Here are the details:

  • Server module chooses a proper rpc module (it may be HTTP, gRPC or mysql) to process the requests according the protocol used by the requests;
  • Parse SQL in the request by the parser;
  • With the parsed sql and the information provided by catalog/schema module, DataFusion can generate the logical plan;
  • With the logical plan, the corresponding Interpreter is created and logical plan will be executed by it;
  • For the logical plan of normal Select SQL, it will be executed through SelectInterpreter;
  • In the SelectInterpreter the specific query logic is executed by the Query Engine:
    • Optimize the logical plan;
    • Generate the physical plan;
    • Optimize the physical plan;
    • Execute the physical plan;
  • The execution of physical plan involves Analytic Engine:
    • Data is obtained by read method of Table instance provided by Analytic Engine;
    • The source of the table data is SST and Memtable, and the data can be filtered by the pushed down predicates;
    • After retrieving the table data, Query Engine will complete the specific computation and generate the final results;
  • SelectInterpreter gets the results and feeds them to the protocol module;
  • After the protocol layer converts the results, the server module responds to the client with them.

The following is the flow of function calls in version v1.2.2:

                                                       ┌───────────────────────◀─────────────┐    ┌───────────────────────┐
                                                       │      handle_sql       │────────┐    │    │       parse_sql       │
                                                       └───────────────────────┘        │    │    └────────────────┬──────┘
                                                           │             ▲              │    │           ▲         │
                                                           │             │              │    │           │         │
                                                           │             │              │    └36───┐     │        11
                                                          1│             │              │          │     │         │
                                                           │            8│              │          │     │         │
                                                           │             │              │          │    10         │
                                                           │             │              │          │     │         │
                                                           ▼             │              │          │     │         ▼
                                                       ┌─────────────────┴─────┐       9│         ┌┴─────┴────────────────┐───────12─────────▶┌───────────────────────┐
                                                       │maybe_forward_sql_query│        └────────▶│fetch_sql_query_output │                   │   statement_to_plan   │
                                                       └───┬───────────────────┘                  └────┬──────────────────┘◀───────19─────────└───────────────────────┘
                                                           │             ▲                             │              ▲                           │               ▲
                                                           │             │                             │              │                           │               │
                                                           │             │                             │              │                           │               │
                                                           │             │                             │             35                          13              18
                                                          2│            7│                            20              │                           │               │
                                                           │             │                             │              │                           │               │
                                                           │             │                             │              │                           │               │
                                                           │             │                             │              │                           ▼               │
                                                           ▼             │                             ▼              │                       ┌───────────────────────┐
          ┌───────────────────────┐───────────6───────▶┌─────────────────┴─────┐                    ┌─────────────────┴─────┐                 │Planner::statement_to_p│
          │ forward_with_endpoint │                    │        forward        │                    │execute_plan_involving_│                 │          lan          │
          └───────────────────────┘◀────────5──────────└───┬───────────────────┘                 ┌──│    partition_table    │◀────────┐       └───┬───────────────────┘
                                                           │             ▲                       │  └───────────────────────┘         │           │              ▲
                                                           │             │                       │     │              ▲               │           │              │
                                                           │             │                       │     │              │               │          14             17
           ┌───────────────────────┐                       │            4│                       │     │              │               │           │              │
     ┌─────│ PhysicalPlan::execute │                      3│             │                       │    21              │               │           │              │
     │     └───────────────────────┘◀──┐                   │             │                       │     │             22               │           │              │
     │                                 │                   │             │                       │     │              │               │           ▼              │
     │                                 │                   │             │                       │     │              │               │       ┌────────────────────────┐
     │                                 │                   ▼             │                       │     ▼              │              34       │sql_statement_to_datafus│
     │     ┌───────────────────────┐  30               ┌─────────────────┴─────┐                 │  ┌─────────────────┴─────┐         │       │        ion_plan        │
    31     │ build_df_session_ctx  │   │               │         route         │                 │  │   build_interpreter   │         │       └────────────────────────┘
     │     └────┬──────────────────┘   │               └───────────────────────┘                 │  └───────────────────────┘         │           │              ▲
     │          │           ▲          │                                                         │                                    │           │              │
     │         27          26          │                                                        23                                    │          15             16
     │          ▼           │          │                                                         │                                    │           │              │
     └────▶┌────────────────┴──────┐   │               ┌───────────────────────┐                 │                                    │           │              │
           │ execute_logical_plan  ├───┴────32────────▶│       execute         │──────────┐      │   ┌───────────────────────┐        │           ▼              │
           └────┬──────────────────┘◀────────────25────┴───────────────────────┘         33      │   │interpreter_execute_pla│        │       ┌────────────────────────┐
                │           ▲                                           ▲                 └──────┴──▶│           n           │────────┘       │SqlToRel::sql_statement_│
               28           │                                           └──────────24────────────────┴───────────────────────┘                │   to_datafusion_plan   │
                │          29                                                                                                                 └────────────────────────┘
                ▼           │
           ┌────────────────┴──────┐
           │     optimize_plan     │
           └───────────────────────┘
  1. The received request will be forwarded to handle_sql after various protocol conversions, and since the request may not be processed by this node, it may need to be forwarded to maybe_forward_sql_query to handle the forwarding logic.
  2. After constructing the ForwardRequest in maybe_forward_sql_query, call forward
  3. After constructing the RouteRequest in forward, call route
  4. Use route to get the destination node endpoint and return to forward.
  5. Call forward_with_endpoint to forward the request
  6. return forward
  7. return maybe_forward_sql_query
  8. return handle_sql
  9. If this is a Local request, call fetch_sql_query_output to process it
  10. Call parse_sql to parse sql into Statment
  11. return fetch_sql_query_output
  12. Call statement_to_plan with Statment
  13. Construct Planner with ctx and Statment, and call the statement_to_plan method of Planner
  14. The planner will call the corresponding planner method for the requested category, at this point our sql is a query and will call sql_statement_to_plan
  15. Call sql_statement_to_datafusion_plan , which will generate the datafusion object, and then call SqlToRel::sql_statement_to_plan
  16. The generated logical plan is returned from SqlToRel::sql_statement_to_plan
  17. return
  18. return
  19. return
  20. Call execute_plan_involving_partition_table (in the default configuration) for subsequent optimization and execution of this logical plan
  21. Call build_interpreter to generate Interpreter
  22. return
  23. Call Interpreter's interpreter_execute_plan method for logical plan execution.
  24. The corresponding execute function is called, at this time the sql is a query, so the execute of the SelectInterpreter will be called
  25. call execute_logical_plan , which will call build_df_session_ctx to generate the optimizer
  26. build_df_session_ctx will use the config information to generate the corresponding context, first using datafusion and some custom optimization rules (in logical_optimize_rules()) to generate the logical plan optimizer, using apply_adapters_for_physical_optimize_rules to generate the physical plan optimizer
  27. return optimizer
  28. Call optimize_plan, using the optimizer just generated to first optimize the logical plan and then the physical plan
  29. Return to optimized physical plan
  30. execute physical plan
  31. returned after execution
  32. After collecting the results of all slices, return
  33. return
  34. return
  35. return
  36. Return to the upper layer for network protocol conversion and finally return to the request sender

Write

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
┌───────┐      ┌───────┐      ┌───────┐
│       │──1──▶│       │──2──▶│       │
│Server │      │  SQL  │      │Catalog│
│       │◀─8───│       │◀─3───│       │
└───────┘      └───────┘      └───────┘
                │    ▲
               4│   7│
                │    │
                ▼    │
┌─────────────────────────────────────┐
│                                     │
│             Interpreter             │
│                                     │
└─────────────────────────────────────┘
      │    ▲
      │    │
      │    │
      │    │
      │    │       ┌──────────────────┐
      │    │       │                  │
     5│   6│       │   Query Engine   │
      │    │       │                  │
      │    │       └──────────────────┘
      │    │
      │    │
      │    │
      ▼    │
 ┌─────────────────────────────────────┐
 │                                     │
 │            Table Engine             │
 │                                     │
 └─────────────────────────────────────┘

Take INSERT SQL as an example. The figure above shows the query procedure and the numbers in it indicates the order of calling between the modules.

Here are the details:

  • Server module chooses a proper rpc module (it may be HTTP, gRPC or mysql) to process the requests according the protocol used by the requests;
  • Parse SQL in the request by the parser;
  • With the parsed sql and the catalog/schema module, DataFusion can generate the logical plan;
  • With the logical plan, the corresponding Interpreter is created and logical plan will be executed by it;
  • For the logical plan of normal INSERT SQL, it will be executed through InsertInterpreter;
  • In the InsertInterpreter, write method of Table provided Analytic Engine is called:
    • Write the data into WAL first;
    • Write the data into MemTable then;
  • Before writing to MemTable, the memory usage will be checked. If the memory usage is too high, the flush process will be triggered:
    • Persist some old MemTables as SSTs;
    • Store updates about the new SSTs and the flushed sequence number of WAL to Manifest;
    • Delete the corresponding WAL entries;
  • Server module responds to the client with the execution result.

4 - Storage

The storage engine mainly provides the following two functions:

  1. Persistence of data
  2. Under the premise of ensuring the correctness of the data, organize the data in the most reasonable way to meet the query needs of different scenarios.

This document will introduce the internal implementation of the storage engine in HoraeDB. Readers can refer to the content here to explore how to use HoraeDB efficiently.

Overall Structure

HoraeDB is a distributed storage system based on the share-nothing architecture.

Data between different servers is isolated from each other and does not affect each other. The storage engine in each stand-alone machine is a variant of log-structured merge-tree, which is optimized for time-series scenarios. The following figure shows its core components:

Write Ahead Log (WAL)

A write request will be written to

  1. memtable in memory
  2. WAL in durable storage

Since memtable is not persisted to the underlying storage system in real time, so WAL is required to ensure the reliability of the data in memtable.

On the other hand, due to the design of the distributed architecture, WAL itself is required to be highly available. Now there are following implementations in HoraeDB:

Memtable

Memtable is a memory data structure used to hold recently written table data. Different tables have its corresponding memtable.

Memtable is read-write by default (aka active), and when the write reaches some threshold, it will become read-only and be replaced by a new memtable.

The read-only memtable will be flushed to the underlying storage system in SST format by background thread. After flush is completed, the read-only memtable can be destroyed, and the corresponding data in WAL can also be deleted.

Sorted String Table(SST)

SST is a persistent format for data, which is stored in the order of primary keys of table. Currently, HoraeDB uses parquet format for this.

For HoraeDB, SST has an important option: segment_duration, only SST within the same segment can be merged, which is benefical for time-series data. And it is also convenient to eliminate expired data.

In addition to storing the original data, the statistical information of the data will also be stored in the SST to speed up the query, such as the maximum value, the minimum value, etc.

Compactor

Compactor can merge multiple small SST files into one, which is used to solve the problem of too many small files. In addition, Compactor will also delete expired data and duplicate data during the compaction. In future, compaction maybe add more task, such as downsample.

The current compaction strategy in HoraeDB reference Cassandra:

Manifest

Manifest records metadata of table, SST file, such as: the minimum and maximum timestamps of the data in an SST.

Due to the design of the distributed architecture, the manifest itself is required to be highly available. Now in HoraeDB, there are mainly the following implementations:

  • WAL
  • ObjectStore

ObjectStore

ObjectStore is place where data (i.e. SST) is persisted.

Generally speaking major cloud vendors should provide corresponding services, such as Alibaba Cloud’s OSS and AWS’s S3.

5 - Table Partitioning

Note: This feature is still in development, and the API may change in the future.

This chapter discusses PartitionTable.

The partition table syntax used by HoraeDB is similar to that of MySQL.

General partition tables include Range Partitioning, List Partitoning, Hash Partitioning, and Key Partititioning.

HoraeDB currently only supports Key Partitioning.

Architecture

Similar to MySQL, different portions of a partition table are stored as separate tables in different locations.

Currently designed, a partition table can be opened on multiple HoraeDB nodes, supports writing and querying at the same time, and can be expanded horizontally.

As shown in the figure below, PartitionTable is opened on node0 and node1, and the physical subtables where the actual data are stored on node2 and node3.

                        ┌───────────────────────┐      ┌───────────────────────┐
                        │Node0                  │      │Node1                  │
                        │   ┌────────────────┐  │      │  ┌────────────────┐   │
                        │   │ PartitionTable │  │      │  │ PartitionTable │   │
                        │   └────────────────┘  │      │  └────────────────┘   │
                        │            │          │      │           │           │
                        └────────────┼──────────┘      └───────────┼───────────┘
                                     │                             │
                                     │                             │
             ┌───────────────────────┼─────────────────────────────┼───────────────────────┐
             │                       │                             │                       │
┌────────────┼───────────────────────┼─────────────┐ ┌─────────────┼───────────────────────┼────────────┐
│Node2       │                       │             │ │Node3        │                       │            │
│            ▼                       ▼             │ │             ▼                       ▼            │
│ ┌─────────────────────┐ ┌─────────────────────┐  │ │  ┌─────────────────────┐ ┌─────────────────────┐ │
│ │                     │ │                     │  │ │  │                     │ │                     │ │
│ │     SubTable_0      │ │     SubTable_1      │  │ │  │     SubTable_2      │ │     SubTable_3      │ │
│ │                     │ │                     │  │ │  │                     │ │                     │ │
│ └─────────────────────┘ └─────────────────────┘  │ │  └─────────────────────┘ └─────────────────────┘ │
│                                                  │ │                                                  │
└──────────────────────────────────────────────────┘ └──────────────────────────────────────────────────┘

Key Partitioning

Key Partitioning supports one or more column calculations, using the hash algorithm provided by HoraeDB for calculations.

Use restrictions:

  • Only tag column is supported as partition key.
  • LINEAR KEY is not supported yet.

The table creation statement for the key partitioning is as follows:

1
2
3
4
5
6
7
CREATE TABLE `demo`(
    `name`string TAG,
    `id` int TAG,
    `value` double NOT NULL,
    `t` timestamp NOT NULL,
    TIMESTAMP KEY(t)
    ) PARTITION BY KEY(name) PARTITIONS 2 ENGINE = Analytic

Refer to MySQL key partitioning.

Query

Since the partition table data is actually stored in different physical tables, it is necessary to calculate the actual requested physical table according to the query request when querying.

The query will calculate the physical table to be queried according to the query parameters, and then remotely request the node where the physical table is located to obtain data through the HoraeDB internal service remote engine (support predicate pushdown).

The implementation of the partition table is in PartitionTableImpl.

  • Step 1: Parse query sql and calculate the physical table to be queried according to the query parameters.
  • Step 2: Query data of physical table.
  • Step 3: Compute with the raw data.
                       │
                     1 │
                       │
                       ▼
               ┌───────────────┐
               │Node0          │
               │               │
               │               │
               └───────────────┘
                       ┬
                2      │       2
        ┌──────────────┴──────────────┐
        │              ▲              │
        │       3      │       3      │
        ▼ ─────────────┴───────────── ▼
┌───────────────┐             ┌───────────────┐
│Node1          │             │Node2          │
│               │             │               │
│               │             │               │
└───────────────┘             └───────────────┘

Key partitioning

  • Filters like and, or, in, = will choose specific SubTables.
  • Fuzzy matching filters like <, > are also supported, but may have poor performance since it will scan all physical tables.

Key partitioning rule is implemented in KeyRule.

Write

The write process is similar to the query process.

First, according to the partition rules, the write request is split into different partitioned physical tables, and then sent to different physical nodes through the remote engine for actual data writing.

6 - Wal

7 - WAL on Disk

Architecture

This section introduces the implementation of a standalone Write-Ahead Log (WAL, hereinafter referred to as “the log”) based on a local disk. In this implementation, the log is managed at the region level.

            ┌────────────────────────────┐
            │          HoraeDB           │
            │                            │
            │ ┌────────────────────────┐ │
            │ │          WAL           │ │         ┌────────────────────────┐
            │ │                        │ │         │                        │
            │ │         ......         │ │         │      File System       │
            │ │                        │ │         │                        │
            │ │ ┌────────────────────┐ │ │ manage  │ ┌────────────────────┐ │
 Write ─────┼─┼─►       Region       ├─┼─┼─────────┼─►     Region Dir     │ │
            │ │ │                    │ │ │         │ │                    │ │
 Read  ─────┼─┼─►   ┌────────────┐   │ │ │  mmap   │ │ ┌────────────────┐ │ │
            │ │ │   │  Segment 0 ├───┼─┼─┼─────────┼─┼─► Segment File 0 │ │ │
            │ │ │   └────────────┘   │ │ │         │ │ └────────────────┘ │ │
Delete ─────┼─┼─►   ┌────────────┐   │ │ │  mmap   │ │ ┌────────────────┐ │ │
            │ │ │   │  Segment 1 ├───┼─┼─┼─────────┼─┼─► Segment File 1 │ │ │
            │ │ │   └────────────┘   │ │ │         │ │ └────────────────┘ │ │
            │ │ │   ┌────────────┐   │ │ │  mmap   │ │ ┌────────────────┐ │ │
            │ │ │   │  Segment 2 ├───┼─┼─┼─────────┼─┼─► Segment File 2 │ │ │
            │ │ │   └────────────┘   │ │ │         │ │ └────────────────┘ │ │
            │ │ │       ......       │ │ │         │ │       ......       │ │
            │ │ └────────────────────┘ │ │         │ └────────────────────┘ │
            │ │         ......         │ │         │         ......         │
            │ └────────────────────────┘ │         └────────────────────────┘
            └────────────────────────────┘

Data Model

File Paths

Each region has its own directory to manage all segments for that region. The directory is named after the region’s ID. Each segment is named using the format seg_<id>, with IDs starting from 0 and incrementing.

Segment Format

Logs for all tables within a region are stored in segments, arranged in ascending order of sequence numbers. The structure of the segment files is as follows:

   Segment0            Segment1
┌────────────┐      ┌────────────┐
│ Magic Num  │      │ Magic Num  │
├────────────┤      ├────────────┤
│   Record   │      │   Record   │
├────────────┤      ├────────────┤
│   Record   │      │   Record   │
├────────────┤      ├────────────┤   ....
│   Record   │      │   Record   │
├────────────┤      ├────────────┤
│     ...    │      │     ...    │
│            │      │            │
└────────────┘      └────────────┘
    seg_0               seg_1

In memory, each segment stores additional information used for read, write, and delete operations:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
pub struct Segment {
    /// A hashmap storing both min and max sequence numbers of records within
    /// this segment for each `TableId`.
    table_ranges: HashMap<TableId, (SequenceNumber, SequenceNumber)>,

    /// An optional vector of positions within the segment.
    record_position: Vec<Position>,

    ...
}

Log Format

The log format within a segment is as follows:

+---------+--------+------------+--------------+--------------+-------+
| version |  crc   |  table id  | sequence num | value length | value |
|  (u8)   | (u32)  |   (u64)    |    (u64)     |     (u32)    |(bytes)|
+---------+--------+------------+--------------+--------------+-------+

Field Descriptions:

  1. version: Log version number.

  2. crc: Used to ensure data consistency. Computes the CRC checksum from the table id to the end of the record.

  3. table id: The unique identifier of the table.

  4. sequence num: The sequence number of the record.

  5. value length: The byte length of the value.

  6. value: The value in the general log format.

The region ID is not stored in the log because it can be obtained from the file path.

Main Processes

Opening the WAL

  1. Identify all region directories under the WAL directory.

  2. In each region directory, identify all segment files.

  3. Open each segment file, traverse all logs within it, record the start and end offsets of each log, and record the minimum and maximum sequence numbers of each TableId in the segment, then close the file.

  4. If there is no region directory or there are no segment files under the directory, automatically create the corresponding directory and files.

Reading Logs

  1. Based on the metadata of the segments, determine all segments involved in the current read operation.

  2. Open these segments in order of their IDs from smallest to largest, and decode the raw bytes into logs.

Writing Logs

  1. Serialize the logs to be written into byte data and append them to the segment file with the largest ID.

  2. When a segment is created, it pre-allocates a fixed size of 64MB and will not change dynamically. When the pre-allocated space is used up, a new segment is created, and appending continues in the new segment.

  3. After each append, flush is not called immediately; by default, flush is performed every ten writes or when the segment file is closed.

  4. Update the segment’s metadata table_ranges in memory.

Deleting Logs

Suppose logs in the table with ID table_id and sequence numbers less than seq_num need to be marked as deleted:

  1. Update the table_ranges field of the relevant segments in memory, updating the minimum sequence number of the table to seq_num + 1.

  2. If after modification, the minimum sequence number of the table in this segment is greater than the maximum sequence number, remove the table from table_ranges.

  3. If a segment’s table_ranges is empty and it is not the segment with the largest ID, delete the segment file.

8 - WAL on Kafka

Architecture

In this section we present a distributed WAL implementation(based on Kafka). Write-ahead logs(hereinafter referred to as logs) of tables are managed here by region, which can be simply understood as a shared log file of multiple tables.

As shown in the following figure, regions are mapped to topics(with only one partition) in Kafka. And usually two topics are needed by a region, one is used for storing logs and the other is used for storing metadata.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
                                                 ┌──────────────────────────┐
                                                 │         Kafka            │
                                                 │                          │
                                                 │         ......           │
                                                 │                          │
                                                 │ ┌─────────────────────┐  │
                                                 │ │      Meta Topic     │  │
                                                 │ │                     │  │
                                         Delete  │ │ ┌─────────────────┐ │  │
               ┌──────────────────────┐  ┌───────┼─┼─►    Partition    │ │  │
               │       HoraeDB        │  │       │ │ │                 │ │  │
               │                      │  │       │ │ └─────────────────┘ │  │
               │ ┌──────────────────┐ │  │       │ │                     │  │
               │ │       WAL        │ │  │       │ └─────────────────────┘  │
               │ │      ......      │ │  │       │                          │
               │ │ ┌──────────────┐ │ │  │       │ ┌──────────────────────┐ │
               │ │ │    Region    │ │ │  │       │ │     Data Topic       │ │
               │ │ │              ├─┼─┼──┘       │ │                      │ │
               | | | ┌──────────┐ │ │ │          │ │ ┌──────────────────┐ │ │
               │ │ │ │ Metadata │ │ │ │          │ │ │    Partition     │ │ │
               │ │ │ └──────────┘ │ │ │    Write │ │ │                  │ │ │
Write ─────────┼─┼─►              ├─┼─┼───┐      │ │ │ ┌──┬──┬──┬──┬──┐ │ │ │
               │ │ │ ┌──────────┐ │ │ │   └──────┼─┼─┼─►  │  │  │  │  ├─┼─┼─┼────┐
               │ │ │ │  Client  │ │ │ │          │ │ │ └──┴──┴──┴──┴──┘ │ │ │    │
Read ◄─────────┼─┼─┤ └──────────┘ │ │ │          │ │ │                  │ │ │    │
               │ │ │              │ │ │          │ │ └──────────────────┘ │ │    │
               │ │ └──▲───────────┘ │ │          │ │                      │ │    │
               │ │    │ ......      │ │          │ └──────────────────────┘ │    │
               │ └────┼─────────────┘ │          │         ......           │    │
               │      │               │          └──────────────────────────┘    │
               └──────┼───────────────┘                                          │
                      │                                                          │
                      │                                                          │
                      │                        Read                              │
                      └──────────────────────────────────────────────────────────┘

Data Model

Log Format

The common log format described in WAL on RocksDB is used here.

Metadata

Each region will maintain its metadata both in memory and in Kafka, we call it RegionMeta here. It can be thought of as a map, taking table id as a key and TableMeta as a value. We briefly introduce the variables in TableMeta here:

  • next_seq_num, the sequence number allocated to the next log entry.
  • latest_marked_deleted, the last flushed sequence number, all logs in the table with a lower sequence number than it can be removed.
  • current_high_watermark, the high watermark in the Kafka partition after the last writing of this table.
  • seq_offset_mapping, mapping from sequence numbers to offsets will be done on every write and will removed to the updated latest_marked_deleted after flushing.
┌─────────────────────────────────────────┐
│              RegionMeta                 │
│                                         │
│ Map<TableId, TableMeta> table_metas     │
└─────────────────┬───────────────────────┘
                  │
                  │
                  │
                  └─────┐
                        │
                        │
 ┌──────────────────────┴──────────────────────────────┐
 │                       TableMeta                     │
 │                                                     │
 │ SequenceNumber next_seq_num                         │
 │                                                     │
 │ SequenceNumber latest_mark_deleted                  │
 │                                                     │
 │ KafkaOffset high_watermark                          │
 │                                                     │
 │ Map<SequenceNumber, KafkaOffset> seq_offset_mapping │
 └─────────────────────────────────────────────────────┘

Main Process

We focus on the main process in one region, following process will be introduced:

  • Open or create region.
  • Write and read logs.
  • Delete logs.

Open or Create Region

Steps

  • Search the region in the opened namespace.
  • If the region found, the most important thing we need to do is to recover its metadata, we will introduce this later.
  • If the region not found and auto creating is defined, just create the corresponding topic in Kafka.
  • Add the found or created region to cache, return it afterwards.

Recovery

As mentioned above, the RegionMeta is actually a map of the TableMeta. So here we will focus on recovering a specific TableMeta, and examples will be given to better illustrate this process.

  • First, recover the RegionMeta from snapshot. We will take a snapshot of the RegionMeta in some scenarios (e.g. mark logs deleted, clean logs) and put it to the meta topic. The snapshot is actually the RegionMeta at a particular point in time. When recovering a region, we can use it to avoid scanning all logs in the data topic. The following is the example, we recover from the snapshot taken at the time when Kafka high watermark is 64:
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
high watermark in snapshot: 64

 ┌──────────────────────────────┐
 │         RegionMeta           │
 │                              │
 │          ......              │
 │ ┌──────────────────────────┐ │
 │ │       TableMeta          │ │
 │ │                          │ │
 │ │ next_seq_num: 5          │ │
 │ │                          │ │
 │ │ latest_mark_deleted: 2   │ │
 │ │                          │ │
 │ │ high_watermark: 32       │ │
 │ │                          │ │
 │ │ seq_offset_mapping:      │ │
 │ │                          │ │
 │ │ (2, 16) (3, 16) (4, 31)  │ │
 │ └──────────────────────────┘ │
 │          ......              │
 └──────────────────────────────┘
  • Recovering from logs. After recovering from snapshot, we can continue to recover by scanning logs in data topic from the Kafka high watermark when snapshot is taken, and obviously that avoid scanning the whole data topic. Let’s see the example:
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
┌────────────────────────────────────┐
│                                    │
│    high_watermark in snapshot: 64  │
│                                    │
│  ┌──────────────────────────────┐  │
│  │         RegionMeta           │  │
│  │                              │  │
│  │          ......              │  │
│  │ ┌──────────────────────────┐ │  │
│  │ │       TableMeta          │ │  │
│  │ │                          │ │  │
│  │ │ next_seq_num: 5          │ │  │                  ┌────────────────────────────────┐
│  │ │                          │ │  │                  │          RegionMeta            │
│  │ │ latest_mark_deleted: 2   │ │  │                  │                                │
│  │ │                          │ │  │                  │            ......              │
│  │ │ high_watermark: 32       │ │  │                  │ ┌────────────────────────────┐ │
│  │ │                          │ │  │                  │ │         TableMeta          │ │
│  │ │ seq_offset_mapping:      │ │  │                  │ │                            │ │
│  │ │                          │ │  │                  │ │ next_seq_num: 8            │ │
│  │ │ (2, 16) (3, 16) (4, 31)  │ │  │                  │ │                            │ │
│  │ └──────────────────────────┘ │  │                  │ │ latest_mark_deleted: 2     │ │
│  │          ......              │  │                  │ │                            │ │
│  └──────────────────────────────┘  ├──────────────────► │ high_watermark: 32         │ │
│                                    │                  │ │                            │ │
│ ┌────────────────────────────────┐ │                  │ │ seq_offset_mapping:        │ │
│ │          Data topic            │ │                  │ │                            │ │
│ │                                │ │                  │ │ (2, 16) (3, 16) (4, 31)    │ │
│ │ ┌────────────────────────────┐ │ │                  │ │                            │ │
│ │ │        Partition           │ │ │                  │ │ (5, 72) (6, 81) (7, 90)    │ │
│ │ │                            │ │ │                  │ │                            │ │
│ │ │ ┌────┬────┬────┬────┬────┐ │ │ │                  │ └────────────────────────────┘ │
│ │ │ │ 64 │ 65 │ ...│ 99 │100 │ │ │ │                  │             ......             │
│ │ │ └────┴────┴────┴────┴────┘ │ │ │                  └────────────────────────────────┘
│ │ │                            │ │ │
│ │ └────────────────────────────┘ │ │
│ │                                │ │
│ └────────────────────────────────┘ │
│                                    │
└────────────────────────────────────┘

Write and Read Logs

The writing and reading process in a region is simple.

For writing:

  • Open the specified region (auto create it if necessary).
  • Put the logs to specified Kafka partition by client.
  • Update next_seq_num, current_high_watermark and seq_offset_mapping in corresponding TableMeta.

For reading:

  • Open the specified region.
  • Just read all the logs of the region, and the split and replay work will be done by the caller.

Delete Logs

Log deletion can be divided into two steps:

  • Mark the logs deleted.
  • Do delayed cleaning work periodically in a background thread.

Mark

  • Update latest_mark_deleted and seq_offset_mapping(just retain the entries whose’s sequence >= updated latest_mark_deleted) in TableMeta.
  • Maybe we need to make and sync the RegionMeta snapshot to Kafka while dropping table.

Clean

The cleaning logic done in a background thread called cleaner:

  • Make RegionMeta snapshot.
  • Decide whether to clean the logs based on the snapshot.
  • If so, sync the snapshot to Kafka first, then clean the logs.

9 - WAL on RocksDB

Architecture

In this section we present a standalone WAL implementation (based on RocksDB). Write-ahead logs(hereinafter referred to as logs) of tables are managed here by table, and we call the corresponding storage data structure TableUnit. All related data (logs or some metadata) is stored in a single column family for simplicity.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
            ┌───────────────────────────────┐
            │         HoraeDB               │
            │                               │
            │ ┌─────────────────────┐       │
            │ │         WAL         │       │
            │ │                     │       │
            │ │        ......       │       │
            │ │                     │       │
            │ │  ┌────────────────┐ │       │
 Write ─────┼─┼──►   TableUnit    │ │Delete │
            │ │  │                │ ◄────── │
 Read  ─────┼─┼──► ┌────────────┐ │ │       │
            │ │  │ │ RocksDBRef │ │ │       │
            │ │  │ └────────────┘ │ │       │
            │ │  │                | |       |
            │ │  └────────────────┘ │       │
            │ │        ......       │       │
            │ └─────────────────────┘       │
            │                               │
            └───────────────────────────────┘

Data Model

Common Log Format

We use the common key and value format here. Here is the defined key format, and the following is introduction for fields in it:

  • namespace: multiple instances of WAL can exist for different purposes (e.g. manifest also needs wal). The namespace is used to distinguish them.
  • region_id: in some WAL implementations we may need to manage logs from multiple tables, region is the concept to describe such a set of table logs. Obviously the region id is the identification of the region.
  • table_id: identification of the table logs to which they belong.
  • sequence_num: each login table can be assigned an identifier, called a sequence number here.
  • version: for compatibility with old and new formats.
1
2
3
+---------------+----------------+-------------------+--------------------+-------------+
| namespace(u8) | region_id(u64) |   table_id(u64)   |  sequence_num(u64) | version(u8) |
+---------------+----------------+-------------------+--------------------+-------------+

Here is the defined value format, version is the same as the key format, payload can be understood as encoded log content.

1
2
3
+-------------+----------+
| version(u8) | payload  |
+-------------+----------+

Metadata

The metadata here is stored in the same key-value format as the log. Actually only the last flushed sequence is stored in this implementation. Here is the defined metadata key format and field instructions:

  • namespace, table_id, version are the same as the log format.
  • key_type, used to define the type of metadata. MaxSeq now defines that metadata of this type will only record the most recently flushed sequence in the table. Because it is only used in wal on RocksDB, which manages the logs at table level, so there is no region id in this key.
1
2
3
+---------------+--------------+----------------+-------------+
| namespace(u8) | key_type(u8) | table_id(u64)  | version(u8) |
+---------------+--------------+----------------+-------------+

Here is the defined metadata value format, as you can see, just the version and max_seq(flushed sequence) in it:

1
2
3
+-------------+--------------+
| version(u8) | max_seq(u64) |
+-------------+--------------+

Main Process

  • Open TableUnit:
    • Read the latest log entry of all tables to recover the next sequence numbers of tables mainly.
    • Scan the metadata to recover next sequence num as a supplement (because some table has just triggered flush and no new written logs after this, so no logs exists now).
  • Write and read logs. Just write and read key-value from RocksDB.
  • Delete logs. For simplicity It will remove corresponding logs synchronously.