Merged
Changes from all commits
Commits
23 commits
db2bf3b
feat: switch partition tree to bulk
evenyag May 6, 2026
b3354ac
chore: keep partition tree memtable for migration test
evenyag May 7, 2026
c0ee6be
chore: drop partition tree memtable from runtime
evenyag May 7, 2026
e065e6e
refactor(mito2): move timestamp_array_to_i64_slice into read module
evenyag May 7, 2026
592b183
refactor(mito2): use TimeSeriesMemtableBuilder in time_partition tests
evenyag May 7, 2026
f1e409d
chore(mito2): delete PartitionTreeMemtable implementation
evenyag May 7, 2026
dc08e2e
refactor(mito-codec): drop skip_partition_column parameter
evenyag May 7, 2026
f5cd847
chore(mito2): remove unused MemtableConfig enum
evenyag May 7, 2026
6da5b6b
chore: fmt code
evenyag May 7, 2026
21a05cd
refactor: remove unused variant
evenyag May 8, 2026
f7d352c
test: update test_config_api
evenyag May 8, 2026
1628b14
fix: remove unused memtable test helpers
evenyag May 11, 2026
c5ea10c
chore: address review comment
evenyag May 11, 2026
c1ef078
fix: support bulk memtable options
evenyag May 11, 2026
db4d01a
fix: sanitize config
evenyag May 12, 2026
83bde33
feat: remove partition tree options from region options
evenyag May 12, 2026
95e4adc
test: make ssts test datetime replaced text stable
evenyag May 12, 2026
920cce3
test: update sqlness result
evenyag May 13, 2026
e8d2c61
chore: validate_enum_options consider bulk memtable
evenyag May 13, 2026
c1daeca
Merge branch 'main' into chore/remove-partitiontree
evenyag May 14, 2026
ae0f828
chore: Merge branch 'main' into chore/remove-partitiontree
evenyag May 14, 2026
993f69e
refactor: pass region id when parsing region options
evenyag May 14, 2026
718f81c
fix: align sst_format with bulk memtable on parse and open
evenyag May 15, 2026
10 changes: 0 additions & 10 deletions config/config.md
@@ -193,11 +193,6 @@
| `region_engine.mito.bloom_filter_index.create_on_compaction` | String | `auto` | Whether to create the bloom filter on compaction.<br/>- `auto`: automatically (default)<br/>- `disable`: never |
| `region_engine.mito.bloom_filter_index.apply_on_query` | String | `auto` | Whether to apply the bloom filter on query<br/>- `auto`: automatically (default)<br/>- `disable`: never |
| `region_engine.mito.bloom_filter_index.mem_threshold_on_create` | String | `auto` | Memory threshold for bloom filter creation.<br/>- `auto`: automatically determine the threshold based on the system memory size (default)<br/>- `unlimited`: no memory limit<br/>- `[size]` e.g. `64MB`: fixed memory threshold |
| `region_engine.mito.memtable` | -- | -- | -- |
| `region_engine.mito.memtable.type` | String | `time_series` | Memtable type.<br/>- `time_series`: time-series memtable<br/>- `partition_tree`: partition tree memtable (experimental) |
| `region_engine.mito.memtable.index_max_keys_per_shard` | Integer | `8192` | The max number of keys in one shard.<br/>Only available for `partition_tree` memtable. |
| `region_engine.mito.memtable.data_freeze_threshold` | Integer | `32768` | The max rows of data inside the actively writing buffer in one shard.<br/>Only available for `partition_tree` memtable. |
| `region_engine.mito.memtable.fork_dictionary_bytes` | String | `1GiB` | Max dictionary bytes.<br/>Only available for `partition_tree` memtable. |
| `region_engine.file` | -- | -- | Enable the file engine. |
| `region_engine.metric` | -- | -- | Metric engine options. |
| `region_engine.metric.sparse_primary_key_encoding` | Bool | `true` | Whether to use sparse primary key encoding. |
@@ -585,11 +580,6 @@
| `region_engine.mito.bloom_filter_index.create_on_compaction` | String | `auto` | Whether to create the index on compaction.<br/>- `auto`: automatically (default)<br/>- `disable`: never |
| `region_engine.mito.bloom_filter_index.apply_on_query` | String | `auto` | Whether to apply the index on query<br/>- `auto`: automatically (default)<br/>- `disable`: never |
| `region_engine.mito.bloom_filter_index.mem_threshold_on_create` | String | `auto` | Memory threshold for the index creation.<br/>- `auto`: automatically determine the threshold based on the system memory size (default)<br/>- `unlimited`: no memory limit<br/>- `[size]` e.g. `64MB`: fixed memory threshold |
| `region_engine.mito.memtable` | -- | -- | -- |
| `region_engine.mito.memtable.type` | String | `time_series` | Memtable type.<br/>- `time_series`: time-series memtable<br/>- `partition_tree`: partition tree memtable (experimental) |
| `region_engine.mito.memtable.index_max_keys_per_shard` | Integer | `8192` | The max number of keys in one shard.<br/>Only available for `partition_tree` memtable. |
| `region_engine.mito.memtable.data_freeze_threshold` | Integer | `32768` | The max rows of data inside the actively writing buffer in one shard.<br/>Only available for `partition_tree` memtable. |
| `region_engine.mito.memtable.fork_dictionary_bytes` | String | `1GiB` | Max dictionary bytes.<br/>Only available for `partition_tree` memtable. |
| `region_engine.mito.gc` | -- | -- | -- |
| `region_engine.mito.gc.enable` | Bool | `false` | Whether GC is enabled. Need to be the same with metasrv's `gc.enable` or unexpected behavior will occur |
| `region_engine.mito.gc.lingering_time` | String | `1m` | Lingering time before deleting files.<br/>Should be long enough to allow long running queries to finish.<br/>If set to None, then unused files will be deleted immediately. |
18 changes: 0 additions & 18 deletions config/datanode.example.toml
@@ -646,24 +646,6 @@ apply_on_query = "auto"
## - `[size]` e.g. `64MB`: fixed memory threshold
mem_threshold_on_create = "auto"

[region_engine.mito.memtable]
## Memtable type.
## - `time_series`: time-series memtable
## - `partition_tree`: partition tree memtable (experimental)
type = "time_series"

## The max number of keys in one shard.
## Only available for `partition_tree` memtable.
index_max_keys_per_shard = 8192

## The max rows of data inside the actively writing buffer in one shard.
## Only available for `partition_tree` memtable.
data_freeze_threshold = 32768

## Max dictionary bytes.
## Only available for `partition_tree` memtable.
fork_dictionary_bytes = "1GiB"

[region_engine.mito.gc]
## Whether GC is enabled. Need to be the same with metasrv's `gc.enable` or unexpected behavior will occur
enable = false
18 changes: 0 additions & 18 deletions config/standalone.example.toml
@@ -762,24 +762,6 @@ apply_on_query = "auto"
## - `[size]` e.g. `64MB`: fixed memory threshold
mem_threshold_on_create = "auto"

[region_engine.mito.memtable]
## Memtable type.
## - `time_series`: time-series memtable
## - `partition_tree`: partition tree memtable (experimental)
type = "time_series"

## The max number of keys in one shard.
## Only available for `partition_tree` memtable.
index_max_keys_per_shard = 8192

## The max rows of data inside the actively writing buffer in one shard.
## Only available for `partition_tree` memtable.
data_freeze_threshold = 32768

## Max dictionary bytes.
## Only available for `partition_tree` memtable.
fork_dictionary_bytes = "1GiB"

[[region_engine]]
## Enable the file engine.
[region_engine.file]
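The two example-config diffs above drop the entire `[region_engine.mito.memtable]` table. A minimal sketch of what the affected region of `datanode.example.toml` looks like after this change (keys copied from the surrounding diff context; comments paraphrased, nothing new introduced):

```toml
# The [region_engine.mito.memtable] table and its keys
# (`type`, `index_max_keys_per_shard`, `data_freeze_threshold`,
# `fork_dictionary_bytes`) no longer exist after this PR.

[region_engine.mito.bloom_filter_index]
## Memory threshold for the index creation.
mem_threshold_on_create = "auto"

[region_engine.mito.gc]
## Whether GC is enabled. Needs to match metasrv's `gc.enable`,
## or unexpected behavior will occur.
enable = false
```

Configs that still carry the deleted table would need the section removed when upgrading; per the `validate_enum_options` commit, memtable-related validation now accounts for the bulk memtable instead.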
269 changes: 6 additions & 263 deletions src/metric-engine/src/engine/bulk_insert.rs
@@ -14,20 +14,14 @@

use std::collections::HashSet;

use api::v1::{ArrowIpc, ColumnDataType, SemanticType};
use api::v1::{ArrowIpc, SemanticType};
use bytes::Bytes;
use common_error::ext::ErrorExt;
use common_error::status_code::StatusCode;
use common_grpc::flight::{FlightEncoder, FlightMessage};
use common_query::prelude::{greptime_timestamp, greptime_value};
use datatypes::arrow::array::{Array, Float64Array, StringArray, TimestampMillisecondArray};
use datatypes::arrow::record_batch::RecordBatch;
use snafu::{OptionExt, ensure};
use store_api::codec::PrimaryKeyEncoding;
use store_api::metadata::RegionMetadataRef;
use store_api::region_request::{
AffectedRows, RegionBulkInsertsRequest, RegionPutRequest, RegionRequest,
};
use store_api::region_request::{AffectedRows, RegionBulkInsertsRequest, RegionRequest};
use store_api::storage::RegionId;

use crate::batch_modifier::{TagColumnInfo, modify_batch_sparse};
@@ -42,8 +36,7 @@ impl MetricEngineInner {
/// **Logical region path:** The request payload is a logical `RecordBatch`
/// (timestamp, value and tag columns). It is transformed to physical format
/// via `modify_batch_sparse`, encoded to Arrow IPC, and forwarded as a
/// `BulkInserts` request to the data region. If mito reports
/// `StatusCode::Unsupported`, the request is transparently retried as a `Put`.
/// `BulkInserts` request to the data region.
///
/// **Physical region path:** The request payload is already in physical format
/// (produced by the batcher's `flush_batch_physical`). It is forwarded directly
@@ -134,27 +127,9 @@
},
partition_expr_version,
};
match self
.data_region
self.data_region
.write_data(data_region_id, RegionRequest::BulkInserts(request))
.await
{
Ok(affected_rows) => Ok(affected_rows),
Err(err) if err.status_code() == StatusCode::Unsupported => {
// todo(hl): fallback path for PartitionTreeMemtable, remove this once we remove it
let rows = record_batch_to_rows(&batch, region_id)?;
self.put_region(
region_id,
RegionPutRequest {
rows,
hint: None,
partition_expr_version,
},
)
.await
}
Err(err) => Err(err),
}
}
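The hunk above deletes the `StatusCode::Unsupported` fallback that retried a rejected `BulkInserts` request as a row-based `Put` (the `todo(hl)` path kept alive for `PartitionTreeMemtable`). The control-flow change can be sketched with hypothetical stand-ins — `write_bulk`, `write_rows`, and `WriteError` below are illustrative, not the real mito API:

```rust
#[derive(Debug, PartialEq)]
enum StatusCode {
    Unsupported,
}

#[derive(Debug)]
struct WriteError {
    status: StatusCode,
}

// Hypothetical stand-in for the data region's bulk write path.
fn write_bulk(bulk_supported: bool) -> Result<usize, WriteError> {
    if bulk_supported {
        Ok(3)
    } else {
        Err(WriteError { status: StatusCode::Unsupported })
    }
}

// Hypothetical stand-in for the row-based `Put` fallback.
fn write_rows() -> Result<usize, WriteError> {
    Ok(3)
}

// Before this PR: retry as `Put` when the memtable rejected bulk parts.
fn put_batch_old(bulk_supported: bool) -> Result<usize, WriteError> {
    match write_bulk(bulk_supported) {
        Ok(rows) => Ok(rows),
        Err(e) if e.status == StatusCode::Unsupported => write_rows(),
        Err(e) => Err(e),
    }
}

// After this PR: every remaining memtable accepts bulk inserts,
// so errors propagate directly and the fallback match is gone.
fn put_batch_new(bulk_supported: bool) -> Result<usize, WriteError> {
    write_bulk(bulk_supported)
}

fn main() {
    assert_eq!(put_batch_old(false).unwrap(), 3); // fallback path converted the request
    assert!(put_batch_new(false).is_err()); // fallback removed: error surfaces
    println!("ok");
}
```

This is why `record_batch_to_rows` and `RegionPutRequest` could be dropped from this file: with the partition tree memtable gone, the unsupported branch is unreachable.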

fn resolve_tag_columns_from_metadata(
@@ -214,174 +189,6 @@
}
}

fn record_batch_to_rows(batch: &RecordBatch, logical_region_id: RegionId) -> Result<api::v1::Rows> {
let schema_ref = batch.schema();
let fields = schema_ref.fields();

let mut ts_idx = None;
let mut val_idx = None;
let mut tag_indices = Vec::new();

for (idx, field) in fields.iter().enumerate() {
if field.name() == greptime_timestamp() {
ts_idx = Some(idx);
if !matches!(
field.data_type(),
datatypes::arrow::datatypes::DataType::Timestamp(
datatypes::arrow::datatypes::TimeUnit::Millisecond,
_
)
) {
return error::UnexpectedRequestSnafu {
reason: format!(
"Timestamp column '{}' in region {:?} has incompatible type: {:?}",
field.name(),
logical_region_id,
field.data_type()
),
}
.fail();
}
} else if field.name() == greptime_value() {
val_idx = Some(idx);
if !matches!(
field.data_type(),
datatypes::arrow::datatypes::DataType::Float64
) {
return error::UnexpectedRequestSnafu {
reason: format!(
"Value column '{}' in region {:?} has incompatible type: {:?}",
field.name(),
logical_region_id,
field.data_type()
),
}
.fail();
}
} else {
if !matches!(
field.data_type(),
datatypes::arrow::datatypes::DataType::Utf8
) {
return error::UnexpectedRequestSnafu {
reason: format!(
"Tag column '{}' in region {:?} must be Utf8, found: {:?}",
field.name(),
logical_region_id,
field.data_type()
),
}
.fail();
}
tag_indices.push(idx);
}
}

let ts_idx = ts_idx.with_context(|| error::UnexpectedRequestSnafu {
reason: format!(
"Timestamp column '{}' not found in RecordBatch for region {:?}",
greptime_timestamp(),
logical_region_id
),
})?;
let val_idx = val_idx.with_context(|| error::UnexpectedRequestSnafu {
reason: format!(
"Value column '{}' not found in RecordBatch for region {:?}",
greptime_value(),
logical_region_id
),
})?;

let mut schema = Vec::with_capacity(2 + tag_indices.len());
schema.push(api::v1::ColumnSchema {
column_name: greptime_timestamp().to_string(),
datatype: ColumnDataType::TimestampMillisecond as i32,
semantic_type: SemanticType::Timestamp as i32,
datatype_extension: None,
options: None,
});
schema.push(api::v1::ColumnSchema {
column_name: greptime_value().to_string(),
datatype: ColumnDataType::Float64 as i32,
semantic_type: SemanticType::Field as i32,
datatype_extension: None,
options: None,
});
for &idx in &tag_indices {
let field = &fields[idx];
schema.push(api::v1::ColumnSchema {
column_name: field.name().clone(),
datatype: ColumnDataType::String as i32,
semantic_type: SemanticType::Tag as i32,
datatype_extension: None,
options: None,
});
}

let ts_array = batch
.column(ts_idx)
.as_any()
.downcast_ref::<TimestampMillisecondArray>()
.expect("validated as TimestampMillisecond");
let val_array = batch
.column(val_idx)
.as_any()
.downcast_ref::<Float64Array>()
.expect("validated as Float64");
let tag_arrays: Vec<&StringArray> = tag_indices
.iter()
.map(|&idx| {
batch
.column(idx)
.as_any()
.downcast_ref::<StringArray>()
.expect("validated as Utf8")
})
.collect();

let num_rows = batch.num_rows();
let mut rows = Vec::with_capacity(num_rows);
for row_idx in 0..num_rows {
let mut values = Vec::with_capacity(2 + tag_arrays.len());

if ts_array.is_null(row_idx) {
values.push(api::v1::Value { value_data: None });
} else {
values.push(api::v1::Value {
value_data: Some(api::v1::value::ValueData::TimestampMillisecondValue(
ts_array.value(row_idx),
)),
});
}

if val_array.is_null(row_idx) {
values.push(api::v1::Value { value_data: None });
} else {
values.push(api::v1::Value {
value_data: Some(api::v1::value::ValueData::F64Value(
val_array.value(row_idx),
)),
});
}

for arr in &tag_arrays {
if arr.is_null(row_idx) {
values.push(api::v1::Value { value_data: None });
} else {
values.push(api::v1::Value {
value_data: Some(api::v1::value::ValueData::StringValue(
arr.value(row_idx).to_string(),
)),
});
}
}

rows.push(api::v1::Row { values });
}

Ok(api::v1::Rows { schema, rows })
}

fn record_batch_to_ipc(record_batch: &RecordBatch) -> Result<(Bytes, Bytes, Bytes)> {
let mut encoder = FlightEncoder::default();
let schema = encoder.encode_schema(record_batch.schema().as_ref());
@@ -422,7 +229,7 @@ mod tests {
use datatypes::arrow::datatypes::{DataType, Field, Schema as ArrowSchema, TimeUnit};
use datatypes::arrow::record_batch::RecordBatch;
use mito2::config::MitoConfig;
use store_api::metric_engine_consts::MEMTABLE_PARTITION_TREE_PRIMARY_KEY_ENCODING;
use store_api::metric_engine_consts::PRIMARY_KEY_ENCODING;
use store_api::path_utils::table_dir;
use store_api::region_engine::RegionEngine;
use store_api::region_request::{RegionBulkInsertsRequest, RegionPutRequest, RegionRequest};
@@ -483,10 +290,7 @@
env.create_physical_region(
physical_region_id,
&TestEnv::default_table_dir(),
vec![(
MEMTABLE_PARTITION_TREE_PRIMARY_KEY_ENCODING.to_string(),
"dense".to_string(),
)],
vec![(PRIMARY_KEY_ENCODING.to_string(), "dense".to_string())],
)
.await;

@@ -810,65 +614,4 @@

assert_eq!(put_output, bulk_output);
}

#[test]
fn test_record_batch_to_rows_with_null_values() {
use datatypes::arrow::array::{Float64Array, StringArray, TimestampMillisecondArray};
use datatypes::arrow::datatypes::{DataType, Field, Schema as ArrowSchema, TimeUnit};
use datatypes::arrow::record_batch::RecordBatch;
use store_api::storage::RegionId;

use crate::engine::bulk_insert::record_batch_to_rows;

let schema = Arc::new(ArrowSchema::new(vec![
Field::new(
greptime_timestamp(),
DataType::Timestamp(TimeUnit::Millisecond, None),
true,
),
Field::new(greptime_value(), DataType::Float64, true),
Field::new("job", DataType::Utf8, true),
Field::new("host", DataType::Utf8, true),
]));

let ts_array = TimestampMillisecondArray::from(vec![Some(1000), None, Some(3000)]);
let val_array = Float64Array::from(vec![Some(1.0), Some(2.0), None]);
let job_array = StringArray::from(vec![Some("job1"), None, Some("job3")]);
let host_array = StringArray::from(vec![None, Some("host2"), Some("host3")]);

let batch = RecordBatch::try_new(
schema,
vec![
Arc::new(ts_array),
Arc::new(val_array),
Arc::new(job_array),
Arc::new(host_array),
],
)
.unwrap();

let region_id = RegionId::new(1, 1);
let rows = record_batch_to_rows(&batch, region_id).unwrap();

assert_eq!(rows.rows.len(), 3);
assert_eq!(rows.schema.len(), 4);

// Row 0: all non-null except host
assert!(rows.rows[0].values[0].value_data.is_some());
assert!(rows.rows[0].values[1].value_data.is_some());
assert!(rows.rows[0].values[2].value_data.is_some());
assert!(rows.rows[0].values[3].value_data.is_none());

// Row 1: null timestamp, null job
assert!(rows.rows[1].values[0].value_data.is_none());
assert!(rows.rows[1].values[1].value_data.is_some());
assert!(rows.rows[1].values[2].value_data.is_none());
assert!(rows.rows[1].values[3].value_data.is_some());

// Row 2: null value
assert!(rows.rows[2].values[0].value_data.is_some());
assert!(rows.rows[2].values[1].value_data.is_none());
assert!(rows.rows[2].values[2].value_data.is_some());
assert!(rows.rows[2].values[3].value_data.is_some());
}
}
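The deleted `record_batch_to_rows` (and its null-handling test above) transposed columnar Arrow data into protobuf rows, mapping each null cell to an empty `value_data`. The core columnar-to-row transposition it performed can be sketched without the `arrow`/`api` dependencies, using `Option` vectors as stand-in columns — all names below are illustrative:

```rust
// Stand-in for one logical row: one Option per column, None == NULL,
// mirroring `api::v1::Value { value_data: None }` in the removed code.
type Row = Vec<Option<String>>;

// Transpose columnar data (one Vec per column) into rows,
// preserving nulls cell by cell.
fn columns_to_rows(columns: &[Vec<Option<String>>]) -> Vec<Row> {
    let num_rows = columns.first().map_or(0, |c| c.len());
    // All columns in a RecordBatch share the same length.
    assert!(columns.iter().all(|c| c.len() == num_rows));
    (0..num_rows)
        .map(|i| columns.iter().map(|col| col[i].clone()).collect())
        .collect()
}

fn main() {
    // Shaped like the removed test: a null host in row 0,
    // a null timestamp in row 1.
    let ts = vec![Some("1000".into()), None, Some("3000".into())];
    let host = vec![None, Some("host2".into()), Some("host3".into())];
    let rows = columns_to_rows(&[ts, host]);
    assert_eq!(rows.len(), 3);
    assert!(rows[0][1].is_none()); // row 0: host is NULL
    assert!(rows[1][0].is_none()); // row 1: timestamp is NULL
    println!("ok");
}
```

After this PR the metric engine no longer needs this per-row materialization: logical batches stay columnar and travel to the data region as Arrow IPC via `record_batch_to_ipc`, which is why both the function and its dedicated test were removed.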