feat: auto-align Prometheus schemas in pending rows batching#7877
feat: auto-align Prometheus schemas in pending rows batching#7877evenyag merged 39 commits intoGreptimeTeam:mainfrom
Conversation
There was a problem hiding this comment.
Code Review
This pull request implements physical table batching in the PendingRowsBatcher to optimize Prometheus remote writes by reducing RPC counts and partition lookup overhead. Key changes include the introduction of the PendingRowsSchemaAlterer trait, a more efficient row-to-batch alignment process in prom_row_builder.rs, and a new physical flush path that merges logical table data into a single BulkInsertRequest per physical region. The PR also refactors error handling in the frontend and exposes necessary utilities from the metric-engine crate. Reviewers suggested replacing unreachable! macros with robust error handling in the table creation logic to improve service reliability.
There was a problem hiding this comment.
Pull request overview
This PR refactors the Prometheus remote-write batching/flush path to reduce redundant table/partition work by aligning logical-table schemas earlier (during batching), then flushing by physical table with improved observability and updated metric-engine support for physical bulk-insert passthrough.
Changes:
- Introduces
prom_row_builderutilities to align protoRowsdirectly intoRecordBatches matching target table schemas (rename/reorder/cast/null-fill). - Refactors
PendingRowsBatcheringest planning (table create/alter batch ops) and flush logic to transform logical batches into physical format and write by physical regions. - Adds new flush-stage metrics and updates metric-engine bulk insert to accept already-physical batches.
Reviewed changes
Copilot reviewed 18 out of 19 changed files in this pull request and generated 10 comments.
Show a summary per file
| File | Description |
|---|---|
| src/servers/src/prom_row_builder.rs | New helpers to convert proto rows to aligned Arrow RecordBatches and derive create-table schemas/missing columns. |
| src/servers/src/pipeline.rs | Simplifies catalog error propagation by using ? instead of wrapping with CatalogSnafu. |
| src/servers/src/pending_rows_batcher.rs | Major batching/flush refactor: table resolution planning, batch DDL hooks, physical-table flush pipeline, new flush-stage timing. |
| src/servers/src/metrics.rs | Adds PENDING_ROWS_BATCH_FLUSH_STAGE_ELAPSED histogram for flush-stage timing. |
| src/servers/src/lib.rs | Exposes new prom_row_builder module internally. |
| src/servers/src/http/prometheus.rs | Simplifies catalog error propagation by using ?. |
| src/servers/src/http/event.rs | Simplifies catalog error propagation by using ?. |
| src/servers/src/error.rs | Makes Catalog errors transparent and adds a DataTypes error variant. |
| src/servers/Cargo.toml | Adds metric-engine dependency for physical-batch transformation utilities. |
| src/metric-engine/src/test_util.rs | Adds TestEnv::with_mito_config helper for tests needing custom mito config. |
| src/metric-engine/src/lib.rs | Makes batch_modifier public for reuse by servers layer. |
| src/metric-engine/src/engine/bulk_insert.rs | Adds physical-region bulk-insert passthrough and timing metrics; expands tests. |
| src/metric-engine/src/batch_modifier.rs | Exposes TagColumnInfo, compute_tsid_array, and modify_batch_sparse publicly. |
| src/frontend/src/server.rs | Wires new PendingRowsBatcher ctor params (metric-engine flag + schema alterer instance). |
| src/frontend/src/instance/prom_store.rs | Implements PendingRowsSchemaAlterer in Instance, adds prom-remote-write create/alter batching and physical table creation. |
| src/frontend/src/instance/jaeger.rs | Simplifies catalog error propagation by using ?. |
| src/frontend/src/instance/influxdb.rs | Simplifies catalog error propagation by using ?. |
| src/frontend/src/instance/dashboard.rs | Simplifies catalog error propagation by using ?. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
f1330df to
e1794b3
Compare
|
@codex review |
There was a problem hiding this comment.
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: d106f4be81
ℹ️ About Codex in GitHub
Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you
- Open a pull request for review
- Mark a draft as ready
- Comment "@codex review".
If Codex has suggestions, it will comment; otherwise it will react with 👍.
Codex can also answer questions or update the PR. Try commenting "@codex address that feedback".
- **Error Handling Improvements**: - Removed `CatalogSnafu` context from various `.await` calls in `dashboard.rs`, `influxdb.rs`, `jaeger.rs`, `prometheus.rs`, `event.rs`, and `pipeline.rs` to streamline error handling. - **Prometheus Store Enhancements**: - Added support for auto-creating tables and adding missing Prometheus tag columns in `prom_store.rs` and `pending_rows_batcher.rs`. - Introduced `PendingRowsSchemaAlterer` trait for schema alterations in `pending_rows_batcher.rs`. - **Test Additions**: - Added tests for new Prometheus store functionalities in `prom_store.rs` and `pending_rows_batcher.rs`. - **Error Message Improvements**: - Enhanced error messages for catalog access in `error.rs`. - **Server Configuration Updates**: - Updated server configuration to include Prometheus store options in `server.rs`. Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>
### Add DataTypes Error Handling and Column Renaming Logic - **`error.rs`**: Introduced a new `DataTypes` error variant to handle errors from `datatypes::error::Error`. Updated `ErrorExt` implementation to include `DataTypes`. - **`pending_rows_batcher.rs`**: Added functions `find_prom_special_column_names` and `rename_prom_special_columns_for_existing_schema` to handle renaming of special Prometheus columns. Updated `build_prom_create_table_schema` to simplify error handling with `ConcreteDataType`. - **Tests**: Added a test case `test_rename_prom_special_columns_for_existing_schema` to verify the renaming logic for Prometheus special columns. Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>
- Refactored `PendingRowsBatcher` to accommodate Prometheus record batches: - Introduced `accommodate_record_batch_for_target_schema` to normalize incoming record batches against existing table schemas. - Removed `collect_missing_prom_tag_columns` and `rename_prom_special_columns_for_existing_schema` in favor of the new function. - Added `unzip_logical_region_schema` to extract schema components. - Updated tests in `pending_rows_batcher.rs`: - Added tests for `accommodate_record_batch_for_target_schema` to verify handling of missing tag columns and renaming of special columns. - Ensured error handling for missing timestamp and field columns in target schema. Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>
### Commit Summary - **Enhancement in Table Creation Logic**: Updated `prom_store.rs` to modify the handling of `table_options` during table creation. Specifically, `table_options` are now extended differently based on the `AutoCreateTableType`. For `Physical` tables, enforced `sst_format=flat` to optimize pending-rows writes by leveraging bulk memtables. Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>
Enhance Performance Monitoring in `pending_rows_batcher.rs` - Added performance monitoring timers to various stages of the `PendingRowsBatcher` process, including schema cache checks, table resolution, schema creation, and record batch alignment. - Improved schema handling by adding timers around schema alteration and missing column addition processes. Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>
- **Enhance Concurrent Write Handling**: Introduced `FlushRegionWrite` and `FlushWriteResult` structs to manage region writes and their results. Added `flush_region_writes_concurrently` function to handle concurrent flushing of region writes based on `should_dispatch_concurrently` logic in `pending_rows_batcher.rs`. - **Testing Enhancements**: Added tests for concurrent dispatching of region writes and the logic for determining concurrent dispatch in `pending_rows_batcher.rs`. Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>
### Add Histogram for Flush Stage Elapsed Time - **`metrics.rs`**: Introduced a new `HistogramVec` named `PENDING_ROWS_BATCH_FLUSH_STAGE_ELAPSED` to track the elapsed time of pending rows batch flush stages. - **`pending_rows_batcher.rs`**: Replaced instances of `PENDING_ROWS_BATCH_INGEST_STAGE_ELAPSED` with `PENDING_ROWS_BATCH_FLUSH_STAGE_ELAPSED` to measure the elapsed time for various flush stages, including `flush_write_region`, `flush_concat_table_batches`, `flush_resolve_table`, `flush_fetch_partition_rule`, `flush_split_record_batch`, `flush_filter_record_batch`, `flush_resolve_region_leader`, and `flush_encode_ipc`. Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>
Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>
Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>
**Refactor `flush_batch` and `flush_batch_physical` functions** - Removed unused `catalog` and `schema` variables from `flush_batch` in `pending_rows_batcher.rs`. - Updated `flush_batch_physical` to directly use `ctx.current_catalog()` and `ctx.current_schema()` for resolving table names. Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>
### Remove Unused Function and Associated Test - **File:** `src/servers/src/prom_row_builder.rs` - Removed the unused function `build_prom_create_table_schema` which was responsible for building a `Vec<ColumnSchema>` from an Arrow schema. - Deleted the associated test `test_build_prom_create_table_schema_from_request_schema` that validated the removed function. Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>
- **Remove Test**: Deleted the `test_bulk_insert_records_elapsed_metric` test from `bulk_insert.rs`. - **Refactor Table Resolution**: Introduced `TableResolutionPlan` struct and refactored table resolution logic in `pending_rows_batcher.rs`. - **Enhance Table Handling**: Added functions for collecting non-empty table rows, unique table schemas, and handling table creation and alteration in `pending_rows_batcher.rs`. - **Add Tests**: Implemented tests for `collect_non_empty_table_rows` and `collect_unique_table_schemas` in `pending_rows_batcher.rs`. Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>
- **Refactor Error Handling**: Updated error handling in `pending_rows_batcher.rs` and `prom_row_builder.rs` to use `Snafu` error context for more descriptive error messages. - **Remove Unused Functionality**: Eliminated the `rows_to_record_batch` function and related test in `prom_row_builder.rs` as it was redundant. - **Simplify Function Return Types**: Modified `rows_to_aligned_record_batch` in `prom_row_builder.rs` to return only `RecordBatch` without missing columns, simplifying the function's interface and related tests. Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>
### Add Helper Function for Table Options in `prom_store.rs` - Introduced `fill_metric_physical_table_options` function to encapsulate logic for setting table options, ensuring the use of flat SST format and physical table metadata. - Updated `Instance` implementation to utilize the new helper function for setting table options. - Added a unit test `test_metric_physical_table_options_forces_flat_sst_format` to verify the correct application of table options. Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>
- **Refactor `PendingRowsBatcher`**: Simplified worker retrieval logic in `get_or_spawn_worker` method by using a more concise conditional check. - **Metrics Update**: Added `PENDING_ROWS_BATCH_FLUSH_STAGE_ELAPSED` metric in `pending_rows_batcher.rs`. - **Remove Unused Code**: Deleted multiple test functions related to record batch alignment and schema preparation in `pending_rows_batcher.rs` and `prom_row_builder.rs`. - **Function Visibility Change**: Made `build_prom_create_table_schema_from_proto` public in `prom_row_builder.rs`. Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>
Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>
### Refactor and Simplify Schema Alteration Logic - **Removed Unused Methods**: Deleted `create_table_if_missing` and `add_missing_prom_tag_columns` methods from `PendingRowsSchemaAlterer` trait in `prom_store.rs` and `pending_rows_batcher.rs`. - **Error Handling Improvement**: Enhanced error handling in `create_tables_if_missing_batch` method to return a specific error message for unsupported `AutoCreateTableType` in `prom_store.rs`. - **Visibility Change**: Made `as_str` method public in `AutoCreateTableType` enum in `insert.rs` to support external access. Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>
### Commit Message Improve safety in `prom_row_builder.rs` - Updated `unzip_logical_region_schema` to use `saturating_sub` for safer capacity calculation of `tag_columns`. Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>
Add TODO comments for future improvements in `pending_rows_batcher.rs` - Added a TODO comment to consider bounding the `flush_region_writes_concurrently` function. - Added a TODO comment to potentially limit the maximum rows to concatenate in the `flush_batch_physical` function. Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>
### Commit Message Enhance error handling in `pending_rows_batcher.rs` - Updated `collect_unique_table_schemas` to return a `Result` type, enabling error handling for duplicate table names. - Modified the function to return an error when duplicate table names are found in `table_rows`. - Adjusted test cases to handle the new `Result` return type in `collect_unique_table_schemas`. Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>
- **Refactor `partition_columns` Method**: Updated the `partition_columns` method in `multi_dim.rs`, `partition.rs`, and `splitter.rs` to return a slice reference instead of a cloned vector, improving performance by avoiding unnecessary cloning. - **Enhance Partition Handling**: Added functions `collect_tag_columns_and_non_tag_indices` and `strip_partition_columns_from_batch` in `pending_rows_batcher.rs` to manage partition columns more efficiently, including stripping partition columns from record batches. - **Update Tests**: Modified existing tests and added new ones in `pending_rows_batcher.rs` to verify the functionality of partition column handling, ensuring correct behavior of the new methods. Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>
### Enhance Schema Handling and Validation in `pending_rows_batcher.rs` - **Schema Validation Enhancements**: - Added checks for essential columns (`timestamp`, `value`) in `collect_tag_columns_and_non_tag_indices`. - Introduced `PHYSICAL_REGION_ESSENTIAL_COLUMN_COUNT` to ensure minimum column count in `strip_partition_columns_from_batch`. - Improved error handling for unexpected data types and duplicated columns. - **Function Modifications**: - Updated `strip_partition_columns_from_batch` to project essential columns without lookup. - Modified `flush_batch_physical` to use `essential_col_indices` instead of `non_tag_indices`. - **Test Enhancements**: - Added tests for schema validation, including checks for unexpected data types and duplicated columns. - Verified correct projection of essential columns in `strip_partition_columns_from_batch`. Files affected: `pending_rows_batcher.rs`, `tests`. Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>
- **Add `smallvec` Dependency**: Updated `Cargo.lock` and `Cargo.toml` to include `smallvec` as a workspace dependency. - **Refactor Function**: Renamed `collect_tag_columns_and_non_tag_indices` to `columns_taxonomy` in `pending_rows_batcher.rs` and updated its return type to use `SmallVec`. - **Update Tests**: Modified test cases in `pending_rows_batcher.rs` to reflect changes in function name and return type. Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>
**Refactor `pending_rows_batcher.rs` to Simplify Table ID Handling** - Updated `TableBatch` struct to use `TableId` directly instead of `Option<u32>` for `table_id`. - Simplified logic in `flush_batch_physical` by removing the check for `None` in `table_id`. - Adjusted related logic in `start_worker` to accommodate the change in `table_id` handling. Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>
### Enhance Batch Processing Logic - **`pending_rows_batcher.rs`**: - Moved column taxonomy resolution inside the loop to handle schema variations across batches. - Added checks to skip processing if both tag columns and essential column indices are empty. - **Tests**: - Added `test_modify_batch_sparse_with_taxonomy_per_batch` to verify batch modification logic with varying schemas. Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>
### Remove Primary Key Column Check in `pending_rows_batcher.rs` - Removed the check for the primary key column and other essential column names in the function `strip_partition_columns_from_batch` within `pending_rows_batcher.rs`. - Simplified the logic by eliminating the validation of column order against expected essential names. Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>
### Refactor error handling and iteration in `otlp.rs` and `pending_rows_batcher.rs` - **`otlp.rs`**: Simplified error handling by removing `CatalogSnafu` context when awaiting table retrieval. - **`pending_rows_batcher.rs`**: Streamlined iteration over tables by removing unnecessary `into_iter()` calls, improving code readability and efficiency. Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>
ab0d309 to
b2d9923
Compare
Add timing metrics for batch processing in `pending_rows_batcher.rs` - Introduced `modify_elapsed` and `columns_taxonomy_elapsed` to measure time spent in `modify_batch_sparse` and `columns_taxonomy` functions. - Updated `flush_batch_physical` to record these metrics using `PENDING_ROWS_BATCH_FLUSH_STAGE_ELAPSED`. Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>
### Commit Summary - **Remove Unused Code**: Eliminated the `#[allow(dead_code)]` attribute from the `compute_tsid_array` function in `batch_modifier.rs`. - **Error Handling Improvement**: Enhanced error handling in `flush_batch_physical` function by adjusting the `match` block in `pending_rows_batcher.rs`. - **Simplify Logic**: Streamlined the logic in `rows_to_aligned_record_batch` by removing unnecessary type casting in `prom_row_builder.rs`. Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>
**Refactor `flush_batch_physical` in `pending_rows_batcher.rs`:** - Moved partition column stripping logic to a single location before processing region batches. - Updated the use of `combined_batch` to `stripped_batch` for consistency in batch processing. - Removed redundant partition column stripping logic within the region batch loop. Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>
### Update `batch_modifier.rs` Documentation and Parameter Naming - Enhanced documentation for `compute_tsid_array` and `modify_batch_sparse` functions to clarify their logic and parameters. - Renamed parameter `non_tag_column_indices` to `extra_column_indices` in `modify_batch_sparse` for better clarity. Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>
I hereby agree to the terms of the GreptimeDB CLA.
Refer to a related PR or issue link (optional)
What's changed and what's your intention?
This PR improves the Prometheus ingest batching path by aligning logical-table schemas to physical-table expectations in
PendingRowsBatcher, reducing redundant table/partition work, and consolidating flush behavior for better throughput and simpler write flow.Key changes:
prom_row_builderand related batching helpers, with clearer error handling.Compatibility intent:
Performance
With batching mode enabled, we can expect about 2x performance boost with 4 regions.
PR Checklist
Please convert it to a draft if some of the following conditions are not met.