Rust
安装
1
| cargo add ceresdb-client
|
你可以在这里找到最新的版本 here.
初始化客户端
首先,我们需要初始化客户端。
- 创建客户端的 builder,你必须设置
endpoint
和 mode
:endpoint
是类似 “ip/domain_name:port” 形式的字符串。mode
用于指定访问 HoraeDB 服务器的方式,关于 mode 的详细信息。
1
| let mut builder = Builder::new("ip/domain_name:port", Mode::Direct/Mode::Proxy);
|
- 创建和设置
rpc_config
,可以按需进行定义或者直接使用默认值,更多详细参数请参考这里:
1
2
3
4
5
6
| let rpc_config = RpcConfig {
thread_num: Some(1),
default_write_timeout: Duration::from_millis(1000),
..Default::default()
};
let builder = builder.rpc_config(rpc_config);
|
- 设置
default_database
,这会在执行 RPC 请求时未通过 RpcContext 设置 database 的情况下,将被作为目标 database 使用。
1
| let builder = builder.default_database("public");
|
1
| let client = builder.build();
|
管理表
为了方便使用,在使用 gRPC 的 write 接口进行写入时,如果某个表不存在,HoraeDB 会根据第一次的写入自动创建一个表。
当然你也可以通过 create table
语句来更精细化的管理的表(比如添加索引等)。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
| let create_table_sql = r#"CREATE TABLE IF NOT EXISTS horaedb (
str_tag string TAG,
int_tag int32 TAG,
var_tag varbinary TAG,
str_field string,
int_field int32,
bin_field varbinary,
t timestamp NOT NULL,
TIMESTAMP KEY(t)) ENGINE=Analytic with
(enable_ttl='false')"#;
let req = SqlQueryRequest {
tables: vec!["horaedb".to_string()],
sql: create_table_sql.to_string(),
};
let resp = client
.sql_query(rpc_ctx, &req)
.await
.expect("Should succeed to create table");
|
1
2
3
4
5
6
7
8
9
10
| let drop_table_sql = "DROP TABLE horaedb";
let req = SqlQueryRequest {
tables: vec!["horaedb".to_string()],
sql: drop_table_sql.to_string(),
};
let resp = client
.sql_query(rpc_ctx, &req)
.await
.expect("Should succeed to create table");
|
写入数据
我们支持使用类似 InfluxDB 的时序数据模型进行写入。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
| let test_table = "horaedb";
let ts = Local::now().timestamp_millis();
let point = PointBuilder::new(test_table.to_string())
.timestamp(ts)
.tag("str_tag".to_string(), Value::String("tag_val".to_string()))
.tag("int_tag".to_string(), Value::Int32(42))
.tag(
"var_tag".to_string(),
Value::Varbinary(b"tag_bin_val".to_vec()),
)
.field(
"str_field".to_string(),
Value::String("field_val".to_string()),
)
.field("int_field".to_string(), Value::Int32(42))
.field(
"bin_field".to_string(),
Value::Varbinary(b"field_bin_val".to_vec()),
)
.build()
.unwrap();
|
- 将
point
添加到 write request
中:
1
2
| let mut write_req = WriteRequest::default();
write_req.add_point(point);
|
- 创建
rpc_ctx
,同样地可以按需设置或者使用默认值,rpc_ctx
的详细信息请参考这里:
1
2
3
4
| let rpc_ctx = RpcContext {
database: Some("public".to_string()),
..Default::default()
};
|
1
2
3
4
5
| let rpc_ctx = RpcContext {
database: Some("public".to_string()),
..Default::default()
};
let resp = client.write(rpc_ctx, &write_req).await.expect("Should success to write");
|
Sql query
我们支持使用 sql 进行数据查询。
- 在
sql query request
中指定相关的表和 sql 语句:
1
2
3
4
| let req = SqlQueryRequest {
tables: vec![table name 1,...,table name n],
sql: sql string (e.g. select * from xxx),
};
|
1
| let resp = client.sql_query(rpc_ctx, &req).await.expect("Should success to write");
|
示例
你可以在本项目的仓库中找到完整的例子。