Skip to content

Commit b8951a3

Browse files
authored
feat: persist our column_id to parquet field_id (GreptimeTeam#8032)
* feat: persist our column_id to parquet field_id * refactor: avoid clone field when possible * chore: fmt * chore: address style suggestions
1 parent d731024 commit b8951a3

8 files changed

Lines changed: 157 additions & 98 deletions

File tree

src/metric-engine/src/engine/flush.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -136,9 +136,9 @@ mod tests {
136136
ManifestSstEntry { table_dir: "test_metric_region/", region_id: 47244640257(11, 1), table_id: 11, region_number: 1, region_group: 0, region_sequence: 1, file_id: "<file_id>", index_version: 0, level: 0, file_path: "test_metric_region/11_0000000001/data/<file_id>.parquet", file_size: 3000, index_file_path: Some("test_metric_region/11_0000000001/data/index/<file_id>.puffin"), index_file_size: Some(0), num_rows: 10, num_row_groups: 1, num_series: Some(1), min_ts: 0::Millisecond, max_ts: 9::Millisecond, sequence: Some(20), origin_region_id: 47244640257(11, 1), node_id: None, visible: true }
137137
ManifestSstEntry { table_dir: "test_metric_region/", region_id: 47244640258(11, 2), table_id: 11, region_number: 2, region_group: 0, region_sequence: 2, file_id: "<file_id>", index_version: 0, level: 0, file_path: "test_metric_region/11_0000000002/data/<file_id>.parquet", file_size: 3000, index_file_path: Some("test_metric_region/11_0000000002/data/index/<file_id>.puffin"), index_file_size: Some(0), num_rows: 10, num_row_groups: 1, num_series: Some(1), min_ts: 0::Millisecond, max_ts: 9::Millisecond, sequence: Some(10), origin_region_id: 47244640258(11, 2), node_id: None, visible: true }
138138
ManifestSstEntry { table_dir: "test_metric_region/", region_id: 47261417473(11, 16777217), table_id: 11, region_number: 16777217, region_group: 1, region_sequence: 1, file_id: "<file_id>", index_version: 0, level: 0, file_path: "test_metric_region/11_0000000001/metadata/<file_id>.parquet", file_size: 4000, index_file_path: None, index_file_size: None, num_rows: 8, num_row_groups: 1, num_series: Some(8), min_ts: 0::Millisecond, max_ts: 0::Millisecond, sequence: Some(8), origin_region_id: 47261417473(11, 16777217), node_id: None, visible: true }
139-
ManifestSstEntry { table_dir: "test_metric_region/", region_id: 47261417474(11, 16777218), table_id: 11, region_number: 16777218, region_group: 1, region_sequence: 2, file_id: "<file_id>", index_version: 0, level: 0, file_path: "test_metric_region/11_0000000002/metadata/<file_id>.parquet", file_size: 3000, index_file_path: None, index_file_size: None, num_rows: 4, num_row_groups: 1, num_series: Some(4), min_ts: 0::Millisecond, max_ts: 0::Millisecond, sequence: Some(4), origin_region_id: 47261417474(11, 16777218), node_id: None, visible: true }
139+
ManifestSstEntry { table_dir: "test_metric_region/", region_id: 47261417474(11, 16777218), table_id: 11, region_number: 16777218, region_group: 1, region_sequence: 2, file_id: "<file_id>", index_version: 0, level: 0, file_path: "test_metric_region/11_0000000002/metadata/<file_id>.parquet", file_size: 4000, index_file_path: None, index_file_size: None, num_rows: 4, num_row_groups: 1, num_series: Some(4), min_ts: 0::Millisecond, max_ts: 0::Millisecond, sequence: Some(4), origin_region_id: 47261417474(11, 16777218), node_id: None, visible: true }
140140
ManifestSstEntry { table_dir: "test_metric_region/", region_id: 94489280554(22, 42), table_id: 22, region_number: 42, region_group: 0, region_sequence: 42, file_id: "<file_id>", index_version: 0, level: 0, file_path: "test_metric_region/22_0000000042/data/<file_id>.parquet", file_size: 3000, index_file_path: Some("test_metric_region/22_0000000042/data/index/<file_id>.puffin"), index_file_size: Some(0), num_rows: 10, num_row_groups: 1, num_series: Some(1), min_ts: 0::Millisecond, max_ts: 9::Millisecond, sequence: Some(10), origin_region_id: 94489280554(22, 42), node_id: None, visible: true }
141-
ManifestSstEntry { table_dir: "test_metric_region/", region_id: 94506057770(22, 16777258), table_id: 22, region_number: 16777258, region_group: 1, region_sequence: 42, file_id: "<file_id>", index_version: 0, level: 0, file_path: "test_metric_region/22_0000000042/metadata/<file_id>.parquet", file_size: 3000, index_file_path: None, index_file_size: None, num_rows: 4, num_row_groups: 1, num_series: Some(4), min_ts: 0::Millisecond, max_ts: 0::Millisecond, sequence: Some(4), origin_region_id: 94506057770(22, 16777258), node_id: None, visible: true }"#,
141+
ManifestSstEntry { table_dir: "test_metric_region/", region_id: 94506057770(22, 16777258), table_id: 22, region_number: 16777258, region_group: 1, region_sequence: 42, file_id: "<file_id>", index_version: 0, level: 0, file_path: "test_metric_region/22_0000000042/metadata/<file_id>.parquet", file_size: 4000, index_file_path: None, index_file_size: None, num_rows: 4, num_row_groups: 1, num_series: Some(4), min_ts: 0::Millisecond, max_ts: 0::Millisecond, sequence: Some(4), origin_region_id: 94506057770(22, 16777258), node_id: None, visible: true }"#,
142142
);
143143
// list from storage
144144
let storage_entries = mito

src/mito2/src/engine/basic_test.rs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -865,9 +865,9 @@ async fn test_cache_null_primary_key_with_format(flat_format: bool) {
865865
#[tokio::test]
866866
async fn test_list_ssts() {
867867
test_list_ssts_with_format(false, r#"
868-
ManifestSstEntry { table_dir: "test/", region_id: 47244640257(11, 1), table_id: 11, region_number: 1, region_group: 0, region_sequence: 1, file_id: "<file_id>", index_version: 0, level: 0, file_path: "test/11_0000000001/<file_id>.parquet", file_size: 2513, index_file_path: Some("test/11_0000000001/index/<file_id>.puffin"), index_file_size: Some(250), num_rows: 10, num_row_groups: 1, num_series: Some(1), min_ts: 0::Millisecond, max_ts: 9000::Millisecond, sequence: Some(10), origin_region_id: 47244640257(11, 1), node_id: None, visible: true }
869-
ManifestSstEntry { table_dir: "test/", region_id: 47244640258(11, 2), table_id: 11, region_number: 2, region_group: 0, region_sequence: 2, file_id: "<file_id>", index_version: 0, level: 0, file_path: "test/11_0000000002/<file_id>.parquet", file_size: 2513, index_file_path: Some("test/11_0000000002/index/<file_id>.puffin"), index_file_size: Some(250), num_rows: 10, num_row_groups: 1, num_series: Some(1), min_ts: 0::Millisecond, max_ts: 9000::Millisecond, sequence: Some(10), origin_region_id: 47244640258(11, 2), node_id: None, visible: true }
870-
ManifestSstEntry { table_dir: "test/", region_id: 94489280554(22, 42), table_id: 22, region_number: 42, region_group: 0, region_sequence: 42, file_id: "<file_id>", index_version: 0, level: 0, file_path: "test/22_0000000042/<file_id>.parquet", file_size: 2513, index_file_path: Some("test/22_0000000042/index/<file_id>.puffin"), index_file_size: Some(250), num_rows: 10, num_row_groups: 1, num_series: Some(1), min_ts: 0::Millisecond, max_ts: 9000::Millisecond, sequence: Some(10), origin_region_id: 94489280554(22, 42), node_id: None, visible: true }"# ,
868+
ManifestSstEntry { table_dir: "test/", region_id: 47244640257(11, 1), table_id: 11, region_number: 1, region_group: 0, region_sequence: 1, file_id: "<file_id>", index_version: 0, level: 0, file_path: "test/11_0000000001/<file_id>.parquet", file_size: 2701, index_file_path: Some("test/11_0000000001/index/<file_id>.puffin"), index_file_size: Some(250), num_rows: 10, num_row_groups: 1, num_series: Some(1), min_ts: 0::Millisecond, max_ts: 9000::Millisecond, sequence: Some(10), origin_region_id: 47244640257(11, 1), node_id: None, visible: true }
869+
ManifestSstEntry { table_dir: "test/", region_id: 47244640258(11, 2), table_id: 11, region_number: 2, region_group: 0, region_sequence: 2, file_id: "<file_id>", index_version: 0, level: 0, file_path: "test/11_0000000002/<file_id>.parquet", file_size: 2701, index_file_path: Some("test/11_0000000002/index/<file_id>.puffin"), index_file_size: Some(250), num_rows: 10, num_row_groups: 1, num_series: Some(1), min_ts: 0::Millisecond, max_ts: 9000::Millisecond, sequence: Some(10), origin_region_id: 47244640258(11, 2), node_id: None, visible: true }
870+
ManifestSstEntry { table_dir: "test/", region_id: 94489280554(22, 42), table_id: 22, region_number: 42, region_group: 0, region_sequence: 42, file_id: "<file_id>", index_version: 0, level: 0, file_path: "test/22_0000000042/<file_id>.parquet", file_size: 2701, index_file_path: Some("test/22_0000000042/index/<file_id>.puffin"), index_file_size: Some(250), num_rows: 10, num_row_groups: 1, num_series: Some(1), min_ts: 0::Millisecond, max_ts: 9000::Millisecond, sequence: Some(10), origin_region_id: 94489280554(22, 42), node_id: None, visible: true }"# ,
871871
r#"
872872
StorageSstEntry { file_path: "test/11_0000000001/<file_id>.parquet", file_size: None, last_modified_ms: None, node_id: None }
873873
StorageSstEntry { file_path: "test/11_0000000001/index/<file_id>.puffin", file_size: None, last_modified_ms: None, node_id: None }
@@ -876,9 +876,9 @@ StorageSstEntry { file_path: "test/11_0000000002/index/<file_id>.puffin", file_s
876876
StorageSstEntry { file_path: "test/22_0000000042/<file_id>.parquet", file_size: None, last_modified_ms: None, node_id: None }
877877
StorageSstEntry { file_path: "test/22_0000000042/index/<file_id>.puffin", file_size: None, last_modified_ms: None, node_id: None }"#).await;
878878
test_list_ssts_with_format(true, r#"
879-
ManifestSstEntry { table_dir: "test/", region_id: 47244640257(11, 1), table_id: 11, region_number: 1, region_group: 0, region_sequence: 1, file_id: "<file_id>", index_version: 0, level: 0, file_path: "test/11_0000000001/<file_id>.parquet", file_size: 2837, index_file_path: Some("test/11_0000000001/index/<file_id>.puffin"), index_file_size: Some(250), num_rows: 10, num_row_groups: 1, num_series: Some(1), min_ts: 0::Millisecond, max_ts: 9000::Millisecond, sequence: Some(10), origin_region_id: 47244640257(11, 1), node_id: None, visible: true }
880-
ManifestSstEntry { table_dir: "test/", region_id: 47244640258(11, 2), table_id: 11, region_number: 2, region_group: 0, region_sequence: 2, file_id: "<file_id>", index_version: 0, level: 0, file_path: "test/11_0000000002/<file_id>.parquet", file_size: 2837, index_file_path: Some("test/11_0000000002/index/<file_id>.puffin"), index_file_size: Some(250), num_rows: 10, num_row_groups: 1, num_series: Some(1), min_ts: 0::Millisecond, max_ts: 9000::Millisecond, sequence: Some(10), origin_region_id: 47244640258(11, 2), node_id: None, visible: true }
881-
ManifestSstEntry { table_dir: "test/", region_id: 94489280554(22, 42), table_id: 22, region_number: 42, region_group: 0, region_sequence: 42, file_id: "<file_id>", index_version: 0, level: 0, file_path: "test/22_0000000042/<file_id>.parquet", file_size: 2837, index_file_path: Some("test/22_0000000042/index/<file_id>.puffin"), index_file_size: Some(250), num_rows: 10, num_row_groups: 1, num_series: Some(1), min_ts: 0::Millisecond, max_ts: 9000::Millisecond, sequence: Some(10), origin_region_id: 94489280554(22, 42), node_id: None, visible: true }"#,
879+
ManifestSstEntry { table_dir: "test/", region_id: 47244640257(11, 1), table_id: 11, region_number: 1, region_group: 0, region_sequence: 1, file_id: "<file_id>", index_version: 0, level: 0, file_path: "test/11_0000000001/<file_id>.parquet", file_size: 3099, index_file_path: Some("test/11_0000000001/index/<file_id>.puffin"), index_file_size: Some(250), num_rows: 10, num_row_groups: 1, num_series: Some(1), min_ts: 0::Millisecond, max_ts: 9000::Millisecond, sequence: Some(10), origin_region_id: 47244640257(11, 1), node_id: None, visible: true }
880+
ManifestSstEntry { table_dir: "test/", region_id: 47244640258(11, 2), table_id: 11, region_number: 2, region_group: 0, region_sequence: 2, file_id: "<file_id>", index_version: 0, level: 0, file_path: "test/11_0000000002/<file_id>.parquet", file_size: 3099, index_file_path: Some("test/11_0000000002/index/<file_id>.puffin"), index_file_size: Some(250), num_rows: 10, num_row_groups: 1, num_series: Some(1), min_ts: 0::Millisecond, max_ts: 9000::Millisecond, sequence: Some(10), origin_region_id: 47244640258(11, 2), node_id: None, visible: true }
881+
ManifestSstEntry { table_dir: "test/", region_id: 94489280554(22, 42), table_id: 22, region_number: 42, region_group: 0, region_sequence: 42, file_id: "<file_id>", index_version: 0, level: 0, file_path: "test/22_0000000042/<file_id>.parquet", file_size: 3099, index_file_path: Some("test/22_0000000042/index/<file_id>.puffin"), index_file_size: Some(250), num_rows: 10, num_row_groups: 1, num_series: Some(1), min_ts: 0::Millisecond, max_ts: 9000::Millisecond, sequence: Some(10), origin_region_id: 94489280554(22, 42), node_id: None, visible: true }"#,
882882
r#"
883883
StorageSstEntry { file_path: "test/11_0000000001/<file_id>.parquet", file_size: None, last_modified_ms: None, node_id: None }
884884
StorageSstEntry { file_path: "test/11_0000000001/index/<file_id>.puffin", file_size: None, last_modified_ms: None, node_id: None }

src/mito2/src/read/batch_adapter.rs

Lines changed: 21 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ use crate::error::{
3535
};
3636
use crate::memtable::BoxedBatchIterator;
3737
use crate::read::Batch;
38-
use crate::sst::{internal_fields, tag_maybe_to_dictionary_field};
38+
use crate::sst::{internal_fields, tag_maybe_to_dictionary_field, with_field_id};
3939

4040
/// Adapts a [`BoxedBatchIterator`] into an `Iterator<Item = Result<RecordBatch>>`
4141
/// producing flat-format record batches.
@@ -212,38 +212,45 @@ fn compute_output_arrow_schema(
212212
if !read_column_id_set.contains(&column_metadata.column_id) {
213213
continue;
214214
}
215-
let field = Arc::new(Field::new(
215+
let field = Field::new(
216216
&column_metadata.column_schema.name,
217217
column_metadata.column_schema.data_type.as_arrow_type(),
218218
column_metadata.column_schema.is_nullable(),
219-
));
220-
let field = if column_metadata.semantic_type == SemanticType::Tag {
221-
tag_maybe_to_dictionary_field(&column_metadata.column_schema.data_type, &field)
219+
);
220+
let field = with_field_id(field, column_metadata.column_id);
221+
222+
if column_metadata.semantic_type == SemanticType::Tag {
223+
fields.push(tag_maybe_to_dictionary_field(
224+
&column_metadata.column_schema.data_type,
225+
&Arc::new(field),
226+
));
222227
} else {
223-
field
224-
};
225-
fields.push(field);
228+
fields.push(Arc::new(field));
229+
}
226230
}
227231

228232
for column_metadata in metadata.field_columns() {
229233
if !read_column_id_set.contains(&column_metadata.column_id) {
230234
continue;
231235
}
232-
let field = Arc::new(Field::new(
236+
let field = Field::new(
233237
&column_metadata.column_schema.name,
234238
column_metadata.column_schema.data_type.as_arrow_type(),
235239
column_metadata.column_schema.is_nullable(),
236-
));
237-
fields.push(field);
240+
);
241+
fields.push(Arc::new(with_field_id(field, column_metadata.column_id)));
238242
}
239243

240244
let time_index = metadata.time_index_column();
241-
let time_index_field = Arc::new(Field::new(
245+
let time_index_field = Field::new(
242246
&time_index.column_schema.name,
243247
time_index.column_schema.data_type.as_arrow_type(),
244248
time_index.column_schema.is_nullable(),
245-
));
246-
fields.push(time_index_field);
249+
);
250+
fields.push(Arc::new(with_field_id(
251+
time_index_field,
252+
time_index.column_id,
253+
)));
247254
fields.extend(internal_fields().iter().cloned());
248255

249256
Arc::new(datatypes::arrow::datatypes::Schema::new(fields))

src/mito2/src/read/compat.rs

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ use crate::error::{
4545
use crate::read::flat_projection::{FlatProjectionMapper, flat_projected_columns};
4646
use crate::sst::parquet::flat_format::primary_key_column_index;
4747
use crate::sst::parquet::format::{FormatProjection, INTERNAL_COLUMN_NUM, PrimaryKeyArray};
48-
use crate::sst::{internal_fields, tag_maybe_to_dictionary_field};
48+
use crate::sst::{internal_fields, tag_maybe_to_dictionary_field, with_field_id};
4949

5050
/// Returns true if `left` and `right` have same columns and primary key encoding.
5151
pub(crate) fn has_same_columns_and_pk_encoding(
@@ -143,12 +143,19 @@ impl FlatCompatBatch {
143143
let column_field = &expect_metadata.schema.arrow_schema().fields()[column_index];
144144
// For tag columns, we need to create a dictionary field.
145145
if expect_column.semantic_type == SemanticType::Tag {
146-
fields.push(tag_maybe_to_dictionary_field(
146+
let field = tag_maybe_to_dictionary_field(
147147
&expect_column.column_schema.data_type,
148148
column_field,
149-
));
149+
);
150+
fields.push(Arc::new(with_field_id(
151+
(*field).clone(),
152+
expect_column.column_id,
153+
)));
150154
} else {
151-
fields.push(column_field.clone());
155+
fields.push(Arc::new(with_field_id(
156+
(**column_field).clone(),
157+
expect_column.column_id,
158+
)));
152159
};
153160

154161
if let Some((index, actual_data_type)) = actual_schema_index.get(column_id) {

src/mito2/src/read/flat_projection.rs

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ use crate::sst::parquet::flat_format::sst_column_id_indices;
3737
use crate::sst::parquet::format::FormatProjection;
3838
use crate::sst::{
3939
FlatSchemaOptions, internal_fields, tag_maybe_to_dictionary_field, to_flat_sst_arrow_schema,
40+
with_field_id,
4041
};
4142

4243
/// Handles projection and converts batches in flat format with correct schema.
@@ -395,17 +396,20 @@ pub(crate) fn compute_input_arrow_schema(
395396
let mut new_fields = Vec::with_capacity(batch_schema.len() + 3);
396397
for (column_id, _) in batch_schema {
397398
let column_metadata = metadata.column_by_id(*column_id).unwrap();
398-
let field = Arc::new(Field::new(
399+
let field = Field::new(
399400
&column_metadata.column_schema.name,
400401
column_metadata.column_schema.data_type.as_arrow_type(),
401402
column_metadata.column_schema.is_nullable(),
402-
));
403-
let field = if column_metadata.semantic_type == SemanticType::Tag {
404-
tag_maybe_to_dictionary_field(&column_metadata.column_schema.data_type, &field)
403+
);
404+
let field = with_field_id(field, *column_id);
405+
if column_metadata.semantic_type == SemanticType::Tag {
406+
new_fields.push(tag_maybe_to_dictionary_field(
407+
&column_metadata.column_schema.data_type,
408+
&Arc::new(field),
409+
));
405410
} else {
406-
field
407-
};
408-
new_fields.push(field);
411+
new_fields.push(Arc::new(field));
412+
}
409413
}
410414
new_fields.extend_from_slice(&internal_fields());
411415

0 commit comments

Comments
 (0)