refactor(metric-engine): Refactor PendingRowsBatcher for better testability and benchmarking#7902
Conversation
**Refactor and Enhance Error Handling in `pending_rows_batcher.rs`** - **Refactored `record_failure` Macro**: Moved the `record_failure` macro outside of the `flush_batch_physical` function to improve code reuse and maintainability. - **Enhanced Batch Transformation**: Introduced `transform_logical_batches_to_physical` function to handle the transformation of logical table batches into physical format. - **Batch Concatenation**: Added `concat_modified_batches` function to concatenate modified batches into a single batch. - **Region Write Splitting**: Implemented `split_and_encode_region_writes` function to split combined batches into region-specific writes based on partition rules. Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>
Add tests for `transform_logical_batches_to_physical` in `pending_rows_batcher.rs` - Implemented `mock_tag_batch` function to create mock `RecordBatch` instances for testing. - Added multiple test cases for `transform_logical_batches_to_physical`: - `test_transform_logical_batches_to_physical_success`: Verifies successful transformation of logical to physical batches. - `test_transform_logical_batches_to_physical_taxonomy_failure`: Tests failure scenario when column IDs are missing. - `test_transform_logical_batches_to_physical_multiple_batches`: Checks handling of multiple batches. - `test_transform_logical_batches_to_physical_mixed_success_failure`: Tests mixed success and failure scenarios. Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>
…lity Introduced several traits to abstract dependencies on CatalogManager, PartitionRuleManager, and NodeManager, enabling easier unit testing with mock implementations. - Added `PhysicalFlushCatalogProvider`, `PhysicalFlushPartitionProvider`, and `PhysicalFlushNodeRequester` traits. - Implemented adapters for existing managers to satisfy the new traits. - Refactored `flush_batch_physical` to use these traits instead of concrete manager references. - Modularized region write planning, resolution, and encoding into standalone functions. - Added comprehensive unit tests for the refactored logic, including edge cases for table lookup and region routing. Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>
### Enhance Error Handling and Simplify Code in `error.rs` and `pending_rows_batcher.rs` - **Error Handling Improvements**: - Added new error variants `Partition` and `MetricEngine` in `error.rs` to handle specific error cases. - Updated error propagation using `ResultExt` and `context` for better error messages and handling in `pending_rows_batcher.rs`. - **Code Simplification**: - Removed `FlushWriteResult` enum and refactored `flush_region_writes_concurrently` to return `Result<()>`. - Simplified error handling in `flush_batch_physical` and related functions by removing `first_error` and using `Result` for error propagation. - **Test Adjustments**: - Updated tests to align with the new error handling approach, ensuring they check for specific error messages and conditions. Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>
…r state management Refactored `PendingBatch` in `pending_rows_batcher.rs` to use `Option<PendingBatch>` within the worker loop. This change simplifies initialization and cleanup logic by leveraging `Option::get_or_insert_with` and `Option::take`. - Updated `PendingBatch` fields `created_at` and `ctx` to be non-optional. - Modified `drain_batch` to take `&mut Option<PendingBatch>` and return the drained batch, removing the need for `flush_with_error`. - Simplified the worker loop logic for batch creation and flushing. - Added a unit test `test_drain_batch_takes_initialized_pending_batch_from_option` to verify the new draining logic. Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>
Enhanced error reporting in `PendingRowsBatcher` by using `Arc<Error>` in `FlushWaiter` and `WorkerCommand`. This allows the same error instance to be shared among all waiters of a batch, avoiding redundant error string conversions and providing more structured error information. - Added `SubmitBatch` variant to `Error` in `error.rs`. - Updated `FlushWaiter` and `WorkerCommand` to use `std::result::Result<(), Arc<Error>>`. - Refactored `notify_waiters` to distribute the shared `Arc<Error>`. - Added `SubmitBatchSnafu` context when receiving results from the worker. Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>
Exported several internal types and traits from `pending_rows_batcher.rs` to enable external benchmarking of the physical batch flushing logic. - Made `PhysicalTableMetadata`, `PhysicalFlushCatalogProvider`, `PhysicalFlushPartitionProvider`, `PhysicalFlushNodeRequester`, `TableBatch`, and `flush_batch_physical` public. - Added a new criterion benchmark `flush_batch_physical.rs` to measure the performance of physical batch flushing with varying numbers of logical tables and rows per table. - Registered the new benchmark in `src/servers/Cargo.toml`. Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>
There was a problem hiding this comment.
Code Review
This pull request refactors the flush_batch_physical logic within the pending_rows_batcher to improve modularity and testability through the introduction of provider traits, accompanied by a new benchmark and expanded unit tests. Feedback indicates that the shift to a fail-fast error handling approach using the ? operator and try_join_all significantly reduces ingestion resilience and may lead to inaccurate metrics for dropped rows during partial failures. Additionally, a suggestion was made to optimize memory allocation when transforming logical batches into physical format.
There was a problem hiding this comment.
Pull request overview
This PR refactors the Prometheus remote write PendingRowsBatcher flush path to make the physical-table flush logic more modular (trait-based adapters), easier to unit test, and externally benchmarkable.
Changes:
- Refactors physical flush into smaller steps and introduces trait-based dependency injection for catalog/partition/node interactions.
- Changes flush result propagation to share a single
Arc<Error>across all waiters and adds new error variants for better typed propagation. - Adds unit tests for the new planning/transform/drain helpers and introduces a Criterion benchmark for
flush_batch_physical.
Reviewed changes
Copilot reviewed 5 out of 5 changed files in this pull request and generated 6 comments.
| File | Description |
|---|---|
| src/servers/src/pending_rows_batcher.rs | Major refactor of batching/flush workflow; introduces mockable traits and exports flush_batch_physical; adds new unit tests. |
| src/servers/src/error.rs | Adds new error variants (Partition, MetricEngine, SubmitBatch) to support new propagation paths. |
| src/servers/Cargo.toml | Registers the new Criterion benchmark target. |
| src/servers/benches/flush_batch_physical.rs | Adds a new benchmark that exercises flush_batch_physical under varying table/row loads. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
|
@codex review |
There was a problem hiding this comment.
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: 92f211fd3b
ℹ️ About Codex in GitHub
Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you
- Open a pull request for review
- Mark a draft as ready
- Comment "@codex review".
If Codex has suggestions, it will comment; otherwise it will react with 👍.
Codex can also answer questions or update the PR. Try commenting "@codex address that feedback".
Refactored error handling in `pending_rows_batcher.rs` by using `ArrowSnafu` for RecordBatch projection errors and simplified partition rule fetching. Added comprehensive documentation for `flush_batch_physical` and updated error display for `SubmitBatch`. - Added `Location` to `Arrow` error variant for better traceability. - Updated `SubmitBatch` display to include source error. - Replaced manual error mapping with `context(error::ArrowSnafu)` in `strip_partition_columns_from_batch`. - Added doc comments to `flush_batch_physical` outlining the pipeline steps. - Optimized capacity allocation in `transform_logical_batches_to_physical`. Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>
…ed batch Renamed `name_to_ids` to `col_name_to_ids` in `PhysicalTableMetadata` to better reflect its purpose. Refactored `PlannedRegionBatch` to use a `num_rows()` method instead of storing a redundant `row_count` field. - Updated `PhysicalTableMetadata` and its usages in `pending_rows_batcher.rs` and benchmarks. - Removed `row_count` field from `PlannedRegionBatch` and added a `num_rows()` helper. - Cleaned up manual `with_context` closures for table lookups. - Fixed a minor formatting issue in worker command processing. Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>
Removed redundant `row_count` fields from `FlushRegionWrite` and `PlannedRegionBatch` (made the helper method test-only). Centralized the incrementing of `FLUSH_TOTAL` and `FLUSH_ROWS` metrics into `flush_batch` to avoid duplication and ensure consistency. - Removed `row_count` from `FlushRegionWrite` and `PlannedRegionBatch`. - Marked `PlannedRegionBatch::num_rows()` as `#[cfg(test)]`. - Updated `flush_batch` to handle `FLUSH_TOTAL` and `FLUSH_ROWS` metrics. - Simplified concurrent and sequential flush logic by removing local metric updates. - Cleaned up related tests to match the structural changes. Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>
I hereby agree to the terms of the GreptimeDB CLA.
Refer to a related PR or issue link (optional)
What's changed and what's your intention?
This PR refactors the
PendingRowsBatcherto improve its maintainability, testability, and performance tracking.Key Changes:
Arc<Error>to share error instances across all waiters of a batch, reducing redundant error string conversions and providing more structured error information.Snafucontext for better diagnostics in the write path.PendingBatchto useOption<PendingBatch>within the worker loop, simplifying initialization and draining logic using idiomaticOptionmethods.flush_batch_physicalto be more testable and exported it along with related internal types (PhysicalTableMetadata,TableBatch, etc.) to allow external benchmarking.flush_batch_physicalto measure the performance of physical batch flushing under various loads (number of logical tables and rows).transform_logical_batches_to_physicalanddrain_batch.PR Checklist
Please convert it to a draft if some of the following conditions are not met.