feat(flow): use batching mode & fix sqlness #5903
Conversation
Walkthrough

This update introduces substantial enhancements to the flow processing infrastructure, focusing on batching mode, flow consistency, and schema handling. The core flow engine is refactored to a dual-engine architecture with a new background consistency check mechanism that reconciles flows between metadata and runtime engines. Batching mode receives improved schema alignment, time window handling, and robust logical plan manipulation for insert operations.
Sequence Diagram(s)

```mermaid
sequenceDiagram
    participant Client
    participant FlowDualEngine
    participant StreamingEngine
    participant BatchingEngine
    participant FlowMetadataManager
    participant CatalogManager
    Client->>FlowDualEngine: create_flow / remove_flow / flush_flow
    FlowDualEngine->>FlowMetadataManager: Query metadata for flows
    FlowDualEngine->>CatalogManager: Resolve table info
    FlowDualEngine->>StreamingEngine: Create/Remove/Flush flow if streaming
    FlowDualEngine->>BatchingEngine: Create/Remove/Flush flow if batching
    FlowDualEngine-->>Client: Respond with result
    Note over FlowDualEngine: Background Consistency Check Task
    FlowDualEngine->>FlowMetadataManager: Periodically fetch flow metadata
    FlowDualEngine->>StreamingEngine: List actual flows
    FlowDualEngine->>BatchingEngine: List actual flows
    FlowDualEngine->>StreamingEngine: Create/Drop flows to match metadata
    FlowDualEngine->>BatchingEngine: Create/Drop flows to match metadata
```

```mermaid
sequenceDiagram
    participant FrontendClient
    participant DistributedPeer
    participant StandaloneHandler
    alt Distributed Mode
        FrontendClient->>DistributedPeer: Send request via ChannelManager
        DistributedPeer-->>FrontendClient: Return result
    else Standalone Mode
        FrontendClient->>StandaloneHandler: Lock & upgrade handler
        StandaloneHandler->>StandaloneHandler: Execute query
        StandaloneHandler-->>FrontendClient: Return result
    end
```
Some sqlness tests are still not fixed, but the other code is ready for review.
Force-pushed from d45f81f to 97028f3
Pull Request Overview
This PR refactors the flow engine to adopt a batching mode for aggregate queries, while also updating the SQL planning/insert logic and frontend client usage. Key changes include:
- Replacing the traditional manager with a dual engine interface (dual_engine) and updating relevant function calls
- Extensive modifications in batching_mode components (engine, task, frontend_client) to support new query plans and recovery mechanisms
- Minor fixes in proto version and test adjustments for SQL query consistency
Reviewed Changes
Copilot reviewed 49 out of 49 changed files in this pull request and generated no comments.
Show a summary per file
| File | Description |
|---|---|
| src/frontend/Cargo.toml | Added dependency bytes.workspace |
| src/flow/src/server.rs | Updated engine references to use dual_engine in service creation |
| src/flow/src/lib.rs | Added new re-exports for GrpcQueryHandlerWithBoxedError |
| src/flow/src/engine.rs | Introduced list_flows into the FlowEngine trait |
| src/flow/src/batching_mode/* | Numerous updates to query generation, auto-column rewriting, and task state handling |
| src/flow/src/adapter/flownode_impl.rs | Refactored flownode implementation to use FlowDualEngine and consistent check task management |
| src/cmd/src/standalone.rs & src/cmd/src/flownode.rs | Adjusted client creation to support new frontend grpc handler usage |
| Cargo.toml | Updated greptime-proto to a new revision |
@coderabbitai review
✅ Actions performed: review triggered.
Actionable comments posted: 11
🔭 Outside diff range comments (3)
src/flow/src/batching_mode/state.rs (1)
50-63: ⚠️ Potential issue

`task_handle` is never awaited or aborted – leaked join handle

`TaskState` now stores `task_handle: Option<JoinHandle<()>>`, but no code ever awaits, joins, or cancels the task during shutdown. This may:

- keep the runtime alive after `BatchingTask` is dropped,
- swallow panics inside the task (they will be logged only when joined).

Consider:

```rust
impl TaskState {
    pub fn set_task_handle(&mut self, handle: JoinHandle<()>) {
        self.task_handle = Some(handle);
    }

    pub async fn stop(&mut self) {
        if let Some(handle) = self.task_handle.take() {
            handle.abort(); // or `.await`
        }
    }
}
```

and invoke `stop()` from the owning engine's shutdown path.

tests/cases/standalone/common/flow/flow_basic.sql (1)
stop()from the owning engine’s shutdown path.tests/cases/standalone/common/flow/flow_basic.sql (1)
274-278: 🛠️ Refactor suggestion
⚠️ Potential issueRemove the dangling commas in
SELECT …listsSeveral
SELECTclauses end their projection list with a trailing comma right before theFROMkeyword:SELECT DISTINCT country, -- <‑ trailing comma FROM ngx_access_log; … SELECT DISTINCT country, date_bin( … ) as time_window, -- <‑ trailing comma FROM ngx_access_logAlthough Greptime’s parser may currently tolerate this, ANSI SQL (and most third‑party parsers/editors/lint tools) treat the extra comma as a syntax error. Keeping it limits cross‑DB portability and makes automated tooling complain.
-SELECT - DISTINCT country, -FROM - ngx_access_log; +SELECT DISTINCT country +FROM ngx_access_log; - date_bin(INTERVAL '1 hour', access_time) as time_window, + date_bin(INTERVAL '1 hour', access_time) as time_windowA quick search shows the same pattern in the Android‑log flow (lines 678‑684). Please drop the final comma at each affected spot.
Also applies to: 336-342, 678-684, 734-740
src/flow/src/server.rs (1)
343-375: 💡 Verification agent

🧩 Analysis chain

Flow recovery is no longer executed after switching to `FlowDualEngine`

`FlownodeBuilder::build` used to call `self.recover_flows(&manager)` so flows were recreated on restart. After migrating to the dual engine the call disappeared, meaning flows are not recovered any more:

```rust
let dual = FlowDualEngine::new(…);
// missing: self.recover_flows(&dual).await?;
```

Unless recovery moved elsewhere, this breaks durability across restarts.

```diff
@@
 let dual = FlowDualEngine::new(…);
-let server = FlownodeServer::new(FlowService::new(Arc::new(dual)));
+// Recover persisted flows before exposing services
+let recovered = self.recover_flows(&dual).await?;
+info!("Recovered {} flows", recovered);
+
+let server = FlownodeServer::new(FlowService::new(Arc::new(dual)));
```

Please verify the regression and restore recovery logic or document why it is no longer required.

🏁 Script executed:

```bash
#!/bin/bash
# Search for recover_flows definition and usages in the codebase
rg -n "recover_flows" -A 5
```

Length of output: 500

Invoke flow recovery before starting the server

The `recover_flows` method is defined (src/flow/src/server.rs:391) but never called in `FlownodeBuilder::build`. Without invoking it on the `FlowDualEngine`, persisted flows won't be recreated on restart, breaking durability.

• Location needing fix:
- src/flow/src/server.rs, in `pub async fn build`, after creating `dual`

• Suggested change:

```diff
 let dual = FlowDualEngine::new(
     manager.clone(),
     batching,
     self.flow_metadata_manager.clone(),
     self.catalog_manager.clone(),
 );
+// Recover persisted flows before exposing services
+let recovered = self.recover_flows(&dual).await?;
+info!("Recovered {} flows", recovered);

 let server = FlownodeServer::new(FlowService::new(Arc::new(dual)));
```

Please restore the recovery call or document why it's no longer required.
🧹 Nitpick comments (24)
tests/cases/standalone/common/flow/flow_blog.sql (1)
29-29: Grouping by window start is valid

Grouping on the `start_window` alias correctly buckets rows into 5-second windows. For more predictable test results, consider adding an `ORDER BY start_window` to the final SELECT queries to enforce deterministic output ordering.

tests/cases/standalone/common/flow/flow_view.sql (1)

35-35: Track the new TODO in an issue.

The added `-- TODO(discord9): fix flow stat update for batching mode flow` highlights a gap in test coverage for batching mode. Please ensure it is linked to a ticket or issue ID with sufficient context (expected vs. actual stats behavior) so it can be prioritized and addressed.

tests/conf/frontend-test.toml.template (1)

2-3: Document the `{grpc_addr}` placeholder usage.

These dynamic placeholders require a templating step (e.g., via `strfmt`) to inject the actual gRPC address. Consider adding a comment here or updating the documentation/README to show how and where `{grpc_addr}` is set during test runs.

src/frontend/src/instance.rs (1)
281-282: Consider impact of plan cloning on performance

The change to clone the logical plan before passing it to `execute` seems reasonable, as it ensures the original plan remains available after execution, which may be needed for subsequent operations. However, cloning large plans could impact performance if done frequently.

Consider adding a comment explaining why the plan needs to be cloned here, to help future maintainers understand the reasoning.
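If the clone ever shows up in profiles, a common mitigation is to share the plan behind an `Arc` so that "cloning" only bumps a reference count. A minimal standalone sketch with hypothetical types (`LogicalPlan` and `execute` below are stand-ins, not the actual frontend API):

```rust
use std::sync::Arc;

// Hypothetical stand-in for a large logical plan; deep-cloning it is what
// the comment above is concerned about.
#[derive(Debug)]
struct LogicalPlan {
    nodes: Vec<String>,
}

fn execute(plan: Arc<LogicalPlan>) -> usize {
    // The executor gets shared ownership; no deep copy of `nodes` happens.
    plan.nodes.len()
}

fn main() {
    let plan = Arc::new(LogicalPlan {
        nodes: vec!["scan".into(), "filter".into(), "project".into()],
    });

    // Cloning the Arc only increments a reference count …
    let node_count = execute(Arc::clone(&plan));
    assert_eq!(node_count, 3);

    // … and the original plan is still available afterwards.
    assert_eq!(plan.nodes.len(), 3);
    println!("ok");
}
```

Whether this is worth the refactor depends on how large the plans actually get; a comment on the clone is the cheaper first step.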
src/common/meta/src/ddl/create_flow.rs (1)
309-323: Well-implemented function for flow type extraction.

The `get_flow_type_from_options` function:
- Correctly extracts flow type from options
- Handles known flow types (BATCHING, STREAMING)
- Returns appropriate error for unknown types
- Maintains backward compatibility by defaulting to Batching
Consider adding unit tests to verify this function's behavior for all possible inputs.
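A minimal sketch of such tests, assuming the behaviour described above; the option key, enum, and function below are stand-ins for the real `get_flow_type_from_options`, whose exact signature may differ:

```rust
use std::collections::HashMap;

// Hypothetical mirror of the contract described above: read the flow type
// from the options map, default to Batching, reject unknown values.
#[derive(Debug, PartialEq)]
enum FlowType {
    Batching,
    Streaming,
}

fn flow_type_from_options(options: &HashMap<String, String>) -> Result<FlowType, String> {
    match options.get("flow_type").map(|s| s.as_str()) {
        None => Ok(FlowType::Batching), // backward-compatible default
        Some("batching") => Ok(FlowType::Batching),
        Some("streaming") => Ok(FlowType::Streaming),
        Some(other) => Err(format!("unknown flow type: {other}")),
    }
}

fn main() {
    // Missing option falls back to the default.
    let empty = HashMap::new();
    assert_eq!(flow_type_from_options(&empty), Ok(FlowType::Batching));

    // Known values map to their variants.
    let streaming = HashMap::from([("flow_type".to_string(), "streaming".to_string())]);
    assert_eq!(flow_type_from_options(&streaming), Ok(FlowType::Streaming));

    // Unknown values are rejected.
    let bogus = HashMap::from([("flow_type".to_string(), "bogus".to_string())]);
    assert!(flow_type_from_options(&bogus).is_err());
    println!("ok");
}
```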
tests/cases/standalone/common/flow/flow_flush.result (1)
19-26: Prefer explicit column aliases to avoid quoted identifiers in result snapshots

The SELECT list uses `sum(number)` without an alias, which causes SQLness to emit the engine-generated identifier `"sum(numbers_input_basic.number)"`. Adding an explicit alias (`… AS total_sum`) will

- make the output easier to read,
- reduce the chance of brittle test failures if the automatic name formatting ever changes, and
- simplify downstream parsing if these results are consumed elsewhere.

Example:

```sql
SELECT sum(number) AS total_sum, date_bin(…) FROM … GROUP BY time_window;
```

tests/cases/standalone/common/flow/flow_rebuild.sql (1)
174-186: Sleep-based stabilisation leads to slow & flaky tests

Multiple fixed `-- SQLNESS SLEEP 3s` pauses are introduced to "give flownode a second to rebuild flow". Hard-coded sleeps:

- slow the suite unnecessarily on healthy runs,
- can still be too short on busy CI machines, and
- mask real readiness problems.

A more robust pattern is to poll for a readiness condition with a timeout, e.g. retry `ADMIN SHOW_FLOW('…')` until it reports `RUNNING` or the `REBUILDING` flag disappears, then proceed. This trims idle time while keeping determinism.

Would you like guidance or a helper script to implement polling in SQLness?

tests/cases/standalone/common/flow/flow_advance_ttl.result (1)
tests/cases/standalone/common/flow/flow_advance_ttl.result (1)
108-116: Wall‑clock sleep may be unnecessary for TTL driven by timestamp columnRows are inserted with timestamps in 2021, so a
ttl = '5s'based on thetscolumn will consider them expired immediately—no 6 s real‑time wait is required.
Relying on-- SQLNESS SLEEP 6sincreases runtime and can introduce flakiness on slow CI nodes.Consider removing the sleep and simply invoking
ADMIN FLUSH_TABLEright away, or insert rows whosetsis computed relative tonow()if true wall‑clock expiry is what you intend to test.tests-integration/src/standalone.rs (3)
177-188: Frontend ↔︎ Flownode coupling: retain strong reference to the client
FlownodeBuilderreceivesArc::new(frontend_client), but the variable itself is moved into theArcand not kept elsewhere.
That’s fine, however make sure no future code expects to access the sameFrontendClientinstance outside theArc, otherwise it will have been moved.Small nit: consider naming the tuple return to make ownership clearer:
let (frontend_client, frontend_handler) = FrontendClient::from_empty_grpc_handler(); let frontend_client = Arc::new(frontend_client); // explicit
253-260: Unwrap on mutex lock can panic and hide deadlocksUsing:
frontend_instance_handler .lock() .unwrap() .replace(weak_grpc_handler);will panic the whole test binary if the mutex is poisoned (e.g. due to a previous panic on another thread). Although acceptable in unit tests, replacing with
expect("mutex poisoned")gives a clearer message, or propagate the error with?in async contexts.- .lock() - .unwrap() + .lock() + .expect("frontend_instance_handler mutex poisoned")
261-262: Keep a reference to `streaming_engine()` until after invoker injection

`flow_worker_manager.set_frontend_invoker(invoker).await;` relies on `flow_worker_manager` staying alive. You already cloned it, so it is safe, but note that if the `FlownodeInstance` drops later in the test setup, the invoker will lose its callback early. Consider storing the clone in `GreptimeDbStandalone` (next to `frontend`) if subsequent test logic needs it.

src/cmd/src/standalone.rs (1)
527-538: Avoid the extra clone & double-wrapping of `FrontendClient`

`Arc::new(frontend_client.clone())` allocates a fresh `FrontendClient` and then wraps it in an `Arc`. Because `frontend_client` is already owned by the current scope, you can hand it over directly and avoid an unnecessary clone:

```diff
-let flow_builder = FlownodeBuilder::new(
-    flownode_options,
-    plugins.clone(),
-    table_metadata_manager.clone(),
-    catalog_manager.clone(),
-    flow_metadata_manager.clone(),
-    Arc::new(frontend_client.clone()),
-);
+let flow_builder = FlownodeBuilder::new(
+    flownode_options,
+    plugins.clone(),
+    table_metadata_manager.clone(),
+    catalog_manager.clone(),
+    flow_metadata_manager.clone(),
+    Arc::new(frontend_client), // move, no extra clone
+);
```

Minor, but keeps allocations down and avoids two layers of reference counting on the same object.
src/operator/src/statement/ddl.rs (1)
389-421: `determine_flow_type` does not need to be `async`

The function performs no `.await` operations, yet it is declared `async` and is awaited by the caller (`determine_flow_type(&plan).await?`). The extra state machine adds overhead and needlessly complicates the call-site.

```diff
-async fn determine_flow_type(plan: &LogicalPlan) -> Result<FlowType> {
+fn determine_flow_type(plan: &LogicalPlan) -> Result<FlowType> {
     …
 }
 …
-let flow_type = determine_flow_type(&plan).await?;
+let flow_type = determine_flow_type(&plan)?;
```

Removing the `async` keyword yields clearer intent and slightly better performance.

src/flow/src/batching_mode/state.rs (1)
122-125: `add_window` ignores merging logic – possible explosion of window entries

`add_window()` directly inserts `(start, end)` without calling `merge_dirty_time_windows`, so two overlapping or adjacent windows added through this API will stay separate and may blow past `MAX_FILTER_NUM`, causing unexpected truncation later.

Recommend invoking the merge immediately or documenting the expectation that callers perform a subsequent `merge_dirty_time_windows` call.

src/common/query/src/logical_plan.rs (1)
90-126: Consider handling unsupported/unknown `WriteOp` more explicitly

`breakup_insert_plan` silently returns `None` when the plan is not an `INSERT ... APPEND`. Returning an error variant (or at least logging a debug trace) will make diagnosis much easier for callers that rely on the presence of an insert without checking the return value.

src/frontend/src/instance/grpc.rs (1)
318-338: Optional: optimise after augmenting the plan, not before

The first decoding step disables optimisation (`optimize = false`) but the subsequent analysis/optimise pass happens after the insert node has been added. If the initial logical plan is large, decoding it twice per request may become a bottleneck. You could:

- Decode & optimise the plan once.
- Insert the write op.
- Run the (cheaper) rule-based projection/column pruning pass.

This would avoid doing heavy optimisation work twice.
src/flow/src/batching_mode/engine.rs (2)
365-367: Logging placeholder and `{flow_id}` interpolation

Note that `tracing::info!` passes its message through `format_args!`, which (since Rust 1.58) does capture identifiers named inline, so `{flow_id}` interpolates here. If the project prefers explicit arguments or structured fields over inline capture, use positional formatting or structured logging:

```diff
-    info!("Try flush flow {flow_id}");
+    info!("Try flush flow {}", flow_id);
+    // or
+    // info!(flow_id, "Try flush flow");
```
190-209: Avoid double table-info lookups and improve naming utilities

`get_table_name` immediately calls `get_table_info`, which then fetches the same value again when the caller needs both the name and meta. Returning the `TableInfoValue` once and letting the caller extract what it needs removes one round-trip to the underlying store:

```rust
let table_info = get_table_info(...).await?;
let table_name = table_info.table_name();
```

Consider exposing `table_name` directly on `TableInfoValue` to streamline this common access pattern.

src/flow/src/server.rs (2)
189-194: The consistency-check task is never awaited on shutdown

`stop_workers` signals cancellation but does not await/join the underlying task created by `start_flow_consistent_check_task`. If that task owns resources (locks, I/O handles) it may keep running past shutdown or panic later.

Please expose a `JoinHandle` from `start_flow_consistent_check_task` and await it here (or at least call `abort`).
70-80: Stale TODO & naming nit

The struct still carries a "TODO: replace with dual engine" even though the replacement is complete, and the field is named `dual_engine`, not a "manager". Clean-up improves readability.

```diff
-pub struct FlowService {
-    /// TODO(discord9): replace with dual engine
-    pub dual_engine: FlowDualEngineRef,
-}
+pub struct FlowService {
+    /// Shared dual engine (streaming + batching)
+    pub dual_engine: FlowDualEngineRef,
+}
```

src/flow/src/batching_mode/frontend_client.rs (1)
260-267: Overflow risk when down-casting `affected_rows` to `u32`

`affected_rows` arrives as `u64` but is truncated via `try_into()` to `u32`. Large batch inserts will silently error out once the row count exceeds `u32::MAX` (≈4 B). Consider:

- returning the original `u64`, or
- capping with a clear error message before the conversion.
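A standalone sketch of the failure mode and a louder alternative (names are illustrative, not the actual `frontend_client` code):

```rust
fn main() {
    // One past u32::MAX: the point where the narrowing conversion breaks.
    let affected_rows: u64 = (u32::MAX as u64) + 1;

    // A bare `try_into()` fails once the count no longer fits in u32 …
    let narrowed: Result<u32, _> = affected_rows.try_into();
    assert!(narrowed.is_err());

    // … so either keep the u64, or fail with a clear, contextual message
    // instead of a bare conversion error:
    let checked = u32::try_from(affected_rows)
        .map_err(|_| format!("affected rows {affected_rows} exceeds u32::MAX"));
    assert!(checked.is_err());

    println!("ok");
}
```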
src/flow/src/batching_mode/task.rs (1)
435-445: Error handling loop can create a tight log-spam cycle

After logging an error the loop sleeps only `MIN_REFRESH_DURATION` (currently very small) and immediately retries, potentially spamming logs if the root cause is persistent (e.g. schema mismatch).

Consider exponential back-off or capping the retry rate for repeated identical errors to keep logs readable and avoid unnecessary load.
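A capped exponential back-off for such a retry loop can be sketched as follows; `base` stands in for `MIN_REFRESH_DURATION`, and the cap value is an assumption, not something taken from the PR:

```rust
use std::time::Duration;

/// Capped exponential back-off: the delay doubles per consecutive failure,
/// up to `max`.
fn backoff(base: Duration, max: Duration, consecutive_failures: u32) -> Duration {
    let exp = consecutive_failures.min(16); // bound the shift to avoid overflow
    let delay = base.saturating_mul(1u32 << exp);
    delay.min(max)
}

fn main() {
    let base = Duration::from_millis(100);
    let max = Duration::from_secs(10);

    // First attempt after a failure waits the base duration …
    assert_eq!(backoff(base, max, 0), Duration::from_millis(100));
    // … doubling each time …
    assert_eq!(backoff(base, max, 3), Duration::from_millis(800));
    // … until the cap kicks in (100 ms * 1024 = 102.4 s, clamped to 10 s).
    assert_eq!(backoff(base, max, 10), max);
    println!("ok");
}
```

Resetting the failure counter on the first success restores the original refresh cadence.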
src/flow/src/adapter/flownode_impl.rs (2)
446-456: Defaulting to `Batching` when the option is absent may break existing users

Historically flows were streaming unless explicitly marked as batching. Switching the implicit default to `Batching` changes behaviour for any client that does not send the `flow_type` option (e.g. older CLI versions, scripts, tests). Silent behavioural changes are hard to diagnose.

Consider:

- Making the field mandatory and returning a validation error when missing.
- Keeping the previous default (`Streaming`) for backward compatibility and requiring an explicit flag for batching.
- At minimum, emitting a `warn!` log indicating which flow is being defaulted.

Clarifying this avoids unexpected production surprises.
518-523: `list_flows` may return duplicate IDs

`streaming_engine.list_flows()` and `batching_engine.list_flows()` are blindly concatenated. If (mis-configuration, transition period, or bug) ever places the same `flow_id` in both engines, the caller will receive duplicates, potentially exploding metrics or causing double-drops.

```diff
-    stream_flows.extend(batch_flows);
-    Ok(stream_flows)
+    stream_flows.extend(batch_flows);
+    stream_flows.sort_unstable();
+    stream_flows.dedup();
+    Ok(stream_flows)
```

A set-based merge is cheap (the vectors are tiny) and gives stronger API guarantees.
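The set-based merge mentioned above can be sketched like this (illustrative standalone example; the real flow-ID type may differ):

```rust
use std::collections::BTreeSet;

fn main() {
    // Flow IDs reported by the two engines; 3 appears in both, which is
    // exactly the mis-configuration case the comment worries about.
    let stream_flows: Vec<u64> = vec![1, 2, 3];
    let batch_flows: Vec<u64> = vec![3, 4];

    // Collecting into a BTreeSet deduplicates and yields sorted output.
    let merged: BTreeSet<u64> = stream_flows.into_iter().chain(batch_flows).collect();
    let merged: Vec<u64> = merged.into_iter().collect();

    assert_eq!(merged, vec![1, 2, 3, 4]);
    println!("ok");
}
```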
📜 Review details
Configuration used: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
⛔ Files ignored due to path filters (1)

- `Cargo.lock` is excluded by `!**/*.lock`

📒 Files selected for processing (48)

- Cargo.toml (1 hunks)
- src/api/src/helper.rs (1 hunks)
- src/catalog/src/table_source.rs (1 hunks)
- src/cmd/src/flownode.rs (2 hunks)
- src/cmd/src/standalone.rs (4 hunks)
- src/common/meta/src/ddl/create_flow.rs (4 hunks)
- src/common/meta/src/ddl/tests/create_flow.rs (1 hunks)
- src/common/query/src/logical_plan.rs (2 hunks)
- src/flow/src/adapter.rs (2 hunks)
- src/flow/src/adapter/flownode_impl.rs (6 hunks)
- src/flow/src/batching_mode/engine.rs (12 hunks)
- src/flow/src/batching_mode/frontend_client.rs (3 hunks)
- src/flow/src/batching_mode/state.rs (8 hunks)
- src/flow/src/batching_mode/task.rs (21 hunks)
- src/flow/src/batching_mode/time_window.rs (2 hunks)
- src/flow/src/batching_mode/utils.rs (13 hunks)
- src/flow/src/engine.rs (1 hunks)
- src/flow/src/lib.rs (1 hunks)
- src/flow/src/server.rs (11 hunks)
- src/frontend/Cargo.toml (1 hunks)
- src/frontend/src/error.rs (1 hunks)
- src/frontend/src/instance.rs (1 hunks)
- src/frontend/src/instance/grpc.rs (3 hunks)
- src/operator/src/statement/ddl.rs (4 hunks)
- src/servers/tests/mod.rs (1 hunks)
- tests-integration/src/standalone.rs (4 hunks)
- tests/cases/standalone/common/flow/flow_advance_ttl.result (4 hunks)
- tests/cases/standalone/common/flow/flow_advance_ttl.sql (2 hunks)
- tests/cases/standalone/common/flow/flow_auto_sink_table.result (4 hunks)
- tests/cases/standalone/common/flow/flow_auto_sink_table.sql (1 hunks)
- tests/cases/standalone/common/flow/flow_basic.result (15 hunks)
- tests/cases/standalone/common/flow/flow_basic.sql (10 hunks)
- tests/cases/standalone/common/flow/flow_blog.result (2 hunks)
- tests/cases/standalone/common/flow/flow_blog.sql (2 hunks)
- tests/cases/standalone/common/flow/flow_call_df_func.result (6 hunks)
- tests/cases/standalone/common/flow/flow_call_df_func.sql (6 hunks)
- tests/cases/standalone/common/flow/flow_flush.result (1 hunks)
- tests/cases/standalone/common/flow/flow_flush.sql (1 hunks)
- tests/cases/standalone/common/flow/flow_null.result (2 hunks)
- tests/cases/standalone/common/flow/flow_null.sql (2 hunks)
- tests/cases/standalone/common/flow/flow_rebuild.result (15 hunks)
- tests/cases/standalone/common/flow/flow_rebuild.sql (9 hunks)
- tests/cases/standalone/common/flow/flow_user_guide.result (4 hunks)
- tests/cases/standalone/common/flow/flow_user_guide.sql (4 hunks)
- tests/cases/standalone/common/flow/flow_view.result (1 hunks)
- tests/cases/standalone/common/flow/flow_view.sql (1 hunks)
- tests/conf/frontend-test.toml.template (1 hunks)
- tests/runner/src/server_mode.rs (1 hunks)
🧰 Additional context used
🧬 Code Graph Analysis (2)
tests-integration/src/standalone.rs (2)
- src/flow/src/batching_mode/frontend_client.rs (1): `from_empty_grpc_handler` (89-97)
- src/cmd/src/flownode.rs (1): `flownode` (68-70)

src/operator/src/statement/ddl.rs (6)
- src/flow/src/adapter/flownode_impl.rs (2): `create_flow` (440-470), `create_flow` (783-785)
- src/common/meta/src/ddl/create_flow.rs (2): `value` (424-429), `value` (430-435)
- src/operator/src/statement.rs (2): `plan` (445-455), `new` (102-122)
- src/query/src/plan.rs (2): `extract_and_rewrite_full_table_names` (116-123), `f_down` (37-102)
- src/frontend/src/instance.rs (1): `query_engine` (137-139)
- src/query/src/query_engine.rs (1): `query_engine` (150-152)
🔇 Additional comments (65)
tests/cases/standalone/common/flow/flow_null.sql (2)
9-10: Enable append mode for `requests` table

The new `append_mode = 'true'` option correctly configures the `requests` table for append-only ingestion in streaming flows and aligns this test with the broader append-mode changes in the PR.

64-65: Enable append mode for `ngx_access_log` table

Adding `append_mode = 'true'` to `ngx_access_log` ensures this test exercises the updated append-mode behavior for key-value stream processing, consistent with other test cases.

tests/cases/standalone/common/flow/flow_blog.sql (1)
18-20: Switch to explicit time binning for windowing is correct
Using `date_bin(INTERVAL '5 second', ts)` to derive `start_window` and then adding the interval for `end_window` aligns with the PR's move away from `tumble`. The aliases are clear and should integrate smoothly with batching mode.

src/frontend/Cargo.toml (1)
18-18: Verify that `bytes` is needed in `frontend`.

You've added `bytes.workspace = true`; please confirm this crate is actually used in the frontend module (e.g., for buffer manipulation in the new InsertIntoPlan support). If it's unused, removing it will keep dependencies lean.

src/common/meta/src/ddl/tests/create_flow.rs (1)
49-49: Approve realistic SQL in test utility.
Switching the `sql` field from a placeholder to `"select 1"` makes this helper exercise the SQL parsing and flow type detection logic in your production code. This is a positive change.

Cargo.toml (1)
132-132: Validate the `greptime-proto` revision bump.

Updating the git rev for `greptime-proto` may change generated Protobuf types. Make sure to run the full test suite (including any codegen) and address any API mismatches introduced by this new commit.

src/servers/tests/mod.rs (1)
135-135: Good update for handling the new InsertIntoPlan query type.

This change adds the new `Query::InsertIntoPlan(_)` variant to the match arm, ensuring the dummy test implementation handles this new query type the same way as LogicalPlan queries (with `unimplemented!`). This aligns with the broader addition of InsertIntoPlan in the PR.

src/api/src/helper.rs (1)
517-517: LGTM: Proper type identification for the new InsertIntoPlan query.

This change correctly adds support for the new `Query::InsertIntoPlan` variant in the query type identification function, returning the appropriate type string `"query.insert_into_plan"`. This is consistent with how other query types are handled.

src/flow/src/engine.rs (1)
52-53: LGTM: Added flow enumeration capability.

The new `list_flows` method is a good addition to the `FlowEngine` trait, allowing enumeration of all current flows. This supports the new flow consistency checking functionality mentioned in the PR objectives.

tests/runner/src/server_mode.rs (1)
392-394: LGTM: Essential server address configuration for local testing.

Adding the `--rpc-server-addr` argument with the same value as the bind address ensures proper server address advertisement in the cluster info when running tests locally. This is essential for the frontend client changes in this PR, where proper server addressing is crucial.

tests/cases/standalone/common/flow/flow_view.result (2)
75-75: Acknowledge the TODO comment for tracking work

This TODO comment correctly identifies a known issue with flow statistics updates for batching mode flows. Ensure this is tracked in your issue management system.

81-81: Verify test result change is expected

The test result now expects `last_execution_time IS NOT NULL` to be `false`, which is a behavior change from the previous implementation. This matches the TODO comment about flow stat updates for batching mode.

src/flow/src/adapter.rs (2)
61-61: Module visibility change supports new dual-engine architecture

Making the `flownode_impl` module public within the crate enables integration with the new `FlowDualEngine` architecture that combines streaming and batching engines.

161-161: Field visibility change for dual-engine integration

Making the `node_id` field public allows other components in the crate to access this identifier directly, which supports the new dual-engine architecture. This is a reasonable change given the architectural updates.

tests/cases/standalone/common/flow/flow_null.result (2)
8-9: Table schema updated for append mode compatibility

Adding the `append_mode = 'true'` option to the table definition aligns with the batching mode flow enhancements. This change is consistent with similar updates across multiple flow-related tests.

98-99: Table schema updated for append mode compatibility

Adding the `append_mode = 'true'` option to the `ngx_access_log` table definition maintains consistency with other flow-related test tables that have been updated to support batching mode.

src/frontend/src/error.rs (1)
348-348: Clean update to error handling for substrait decode operations.

The source error type for `SubstraitDecodeLogicalPlan` has been changed from `substrait::error::Error` to `common_query::error::Error`, which aligns the error handling with the broader query error module used across the system. This change supports the modifications to logical plan decoding infrastructure.

src/catalog/src/table_source.rs (1)
30-30: Module visibility change to support catalog list integration.

Making the `dummy_catalog` module public allows external modules to access `DummyCatalogList`, which is needed for the new InsertIntoPlan handling in the gRPC frontend instance. This change enables proper catalog management during plan decoding.

src/flow/src/lib.rs (1)
47-47: Added export for the new error handling abstraction.

Exposing `GrpcQueryHandlerWithBoxedError` alongside `FrontendClient` supports the enhanced frontend client abstraction that can work in both distributed and standalone modes. This export is essential for the dual-engine architecture mentioned in the PR objectives.

src/flow/src/batching_mode/time_window.rs (2)
75-84: Added Display implementation to improve debugging.

This implementation enhances logging and debugging capabilities for time window expressions, which are critical for batching mode operations. The formatted output includes all relevant fields needed for troubleshooting time window alignment issues.

270-271: Documentation improvement using backticks.

The comment was updated to format the returned tuple elements using backticks, which improves readability and clarity in the documentation.
tests/cases/standalone/common/flow/flow_auto_sink_table.sql (2)
10-11: Time window implementation updated from tumble to date_bin

The code now uses the `date_bin` function for time windowing instead of the previous `tumble` approach. This change aligns with the batching mode improvements that provide better time window expressions and schema handling.

15-15: GROUP BY clause updated to use named window column

The GROUP BY clause now references the aliased `time_window` column, which provides better clarity and consistency with the SELECT clause definition.

tests/cases/standalone/common/flow/flow_user_guide.result (4)
400-400: Column renamed from update_at to event_ts

The timestamp column has been renamed from `update_at` to `event_ts` for better semantics, indicating that it represents an event timestamp rather than an update timestamp.

411-411: Added event timestamp to flow output

The flow now explicitly selects `max(ts) as event_ts`, ensuring that event timestamp information is preserved in the output. This improves data lineage tracking and helps with time-based analysis.

442-443: Updated SELECT query to include event_ts

The SELECT query has been updated to include the renamed `event_ts` column alongside `max_temp`, maintaining consistency with the schema changes.

476-481: Updated result table to show event_ts column

The result table now displays the `event_ts` values alongside the sensor data, providing better visibility into when the temperature readings occurred.

src/cmd/src/flownode.rs (2)
348-348: Updated to use new dual-engine architecture

The code now uses `flownode.flow_engine().streaming_engine()` instead of `flownode.flow_worker_manager()`, reflecting the architectural change to a dual-engine model that separates streaming and batching engines.

358-360: Frontend invoker now set on streaming engine

The frontend invoker is now set on the streaming engine component of the flow engine rather than directly on the flow worker manager, aligning with the new architecture.
tests/cases/standalone/common/flow/flow_blog.result (2)
22-24: Enhanced flow definition with explicit time window calculations

The flow now:

- Calculates average speed as `avg((left_wheel + right_wheel) / 2)` with a clear alias
- Uses `date_bin` for start window computation instead of the previous `tumble` function
- Adds an explicit end window calculation by adding the interval to the start window

This provides more flexibility and clearer window boundary representations.

33-33: GROUP BY clause updated to use named window column

The GROUP BY clause now references the aliased `start_window` column, providing better clarity and consistency with the window definition in the SELECT clause.

tests/cases/standalone/common/flow/flow_advance_ttl.sql (3)
9-16: Excellent test case for validating flow creation constraints on instant TTL tables

This test correctly verifies that creating a flow on a table with `TTL='instant'` should fail, which aligns with the batching mode validation that rejects flows on tables with instant TTL.
17-17: Validates the flow creation transition path after TTL change

Good test approach: changing the TTL to a non-instant value should allow flow creation to succeed in the following steps.
47-67: Well-structured test for TTL expiration effects on flow data processing

This section properly tests how the flow behaves after TTL expiration by:
- Waiting for TTL to expire
- Flushing table and flow
- Inserting new data
- Verifying the flow state reflects the TTL-affected data correctly
The use of SQLNESS directives ensures stable test output.
tests/cases/standalone/common/flow/flow_user_guide.sql (3)
294-294: Good semantic improvement for timestamp column naming

Renaming from `update_at` to `event_ts` better represents the actual meaning of the timestamp: the time when the max temperature event occurred.
303-303: Flow definition properly updated to include event timestamp

Adding `max(ts) as event_ts` ensures the timestamp of the maximum temperature reading is propagated properly.
324-325: Queries consistently updated to reflect schema changes

The SELECT queries have been properly updated to match the schema changes in the table and flow definition.
Also applies to: 342-343
tests/cases/standalone/common/flow/flow_flush.sql (1)
1-38: Well-structured test for flow flushing functionality

This test effectively validates the `FLUSH_FLOW` command by:
- Creating a source table with timestamp data
- Defining a flow with time-windowed aggregation
- Executing FLUSH_FLOW to propagate data
- Verifying the aggregated results in the output table
The test includes proper data preparation, explicit time windows, and thorough cleanup.
src/common/meta/src/ddl/create_flow.rs (2)
174-174: Flow type now dynamically determined from task options

Good improvement. The flow type is now extracted from the options via `get_flow_type_from_options` instead of being hardcoded, supporting both streaming and batching modes.
199-201: Enhanced logging includes flow type information

The improved log message now includes the flow type alongside flow ID and peers, which will aid in debugging and observability.
tests/cases/standalone/common/flow/flow_rebuild.sql (1)
100-105: Added assertion is valuable but should be paired with a deterministic expectation

`SELECT count(*) FROM input_basic;` is a good cross-check, yet the expected value is not captured in the `.sql` file. If the result is only verified indirectly in the paired `.result` file, please ensure:
- the count is stable across all rebuild/restart paths,
- and the `.result` snapshot explicitly lists the expected row count, otherwise test drift may go unnoticed.

If you prefer not to hard-code the number, consider adding a `-- SQLNESS CHECK` directive with a pattern.

tests-integration/src/standalone.rs (1)
191-192: `flow_engine()` replaces `flow_worker_manager()` – double-check feature parity

Switching to the dual engine is the right direction, but some helper methods (metrics, diagnostic dumps) previously exposed by `flow_worker_manager()` may not yet be forwarded by `flow_engine()`. Please verify that any test utilities or admin commands used elsewhere in the suite still work after this change.

tests/cases/standalone/common/flow/flow_call_df_func.sql (4)
12-12: Consistent replacement of `tumble` with `date_bin` time window approach

The change from `tumble` to `date_bin` for time window grouping is well-implemented, maintaining the same time interval (1 second) and origin time ('2021-07-01 00:00:00'). This change aligns with the batch mode improvements described in the PR.
27-27: Correctly updated SELECT queries to use the new time_window column

The SELECT queries have been properly updated to reference the new `time_window` column instead of the previous `window_start`/`window_end` columns, maintaining query result consistency.

Also applies to: 41-41
58-58: Consistent application of `date_bin` in the second flow definition

The second flow definition correctly implements the same time window pattern using `date_bin`, maintaining consistency across all flow definitions.
72-72: Correctly updated all dependent SELECT queries

All dependent queries have been consistently updated to use the new time_window schema, ensuring the test logic remains valid.
Also applies to: 85-85
tests/cases/standalone/common/flow/flow_call_df_func.result (2)
14-14: Consistent SQL output updates to match new time_window approach

The result file has been properly updated to reflect the new query structure using `date_bin` and the `time_window` column, ensuring the expected outputs match the modified queries.

Also applies to: 45-45, 79-79, 143-143, 176-176
47-51: Result table structure correctly updated for new schema

The expected output tables have been properly adjusted to show a single `time_window` column instead of separate `window_start`/`window_end` columns, with properly aligned column headers and data.

Also applies to: 81-86, 145-149, 178-183
tests/cases/standalone/common/flow/flow_rebuild.result (5)
6-8: Added append_mode to table creation for batching compatibility

The `append_mode = 'true'` property was added to the table definition, which is necessary for proper operation with the new batching mode flow engine. This is consistent with the architectural changes described in the PR.
171-172: Added helpful comments documenting batching mode behavior

Comments have been added to clarify that the test is now operating in flow batching mode, which helps with understanding the expected behavior changes.

Also applies to: 513-514
180-186: Added validation queries to verify batching mode behavior

New queries were added to count rows in the input tables, providing additional verification of the batching mode's correct operation.

Also applies to: 522-528
315-322: Added synchronization points for flow rebuilds

Multiple `SELECT 1;` statements and `SQLNESS SLEEP 3s` directives have been added to ensure proper synchronization between test steps, addressing the asynchronous nature of flow rebuilds in the new dual-engine architecture.

Also applies to: 323-324, 333-334, 382-389, 390-391, 400-401, 432-439, 440-441, 450-452, 484-494
519-520: Updated expected count to match batching mode behavior

The expected count in the wildcard query output has been updated from 3 to 4, correctly reflecting the behavior change in batching mode where all rows are processed together.
tests/cases/standalone/common/flow/flow_auto_sink_table.result (3)
12-14: Updated flow definition with date_bin-based time window

The flow definition has been updated to use `date_bin` for time window calculations, consistent with the changes in other flow test files and aligning with the batching mode improvements.

Also applies to: 17-17
28-29: Sink table schema correctly updated with higher precision timestamp

The sink table schema has been properly updated to:
- replace separate `window_start`/`window_end` columns with a single `time_window` column,
- increase time precision from TIMESTAMP(3) to TIMESTAMP(9) for more accurate time handling,
- update the TIME INDEX to reference the new `time_window` column.

This change is properly maintained after system restart as confirmed in lines 54-56.

Also applies to: 30-30, 54-55, 56-56
65-71: Updated SHOW CREATE FLOW output to reflect new query structure

The `SHOW CREATE FLOW` command output has been updated to correctly reflect the new query structure that uses `date_bin` and the `time_window` column approach.
src/operator/src/statement/ddl.rs (1)
428-433: Confirm string representation of `FlowType` matches downstream parser

`flow_options.insert(FLOW_TYPE_KEY, flow_type.to_string())` relies on `Display` returning exactly the sentinel strings compared against in `flow::adapter::flownode_impl` ("batching"/"streaming"). A mismatch in capitalisation ("Batching" vs "batching") will route the flow to `FlowType::Batching` by default and silently ignore the requested type.

Please double-check the two implementations or standardise via `flow_type.as_str()` returning a lowercase `&'static str`.
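One way to rule out such a capitalisation mismatch is to make the enum the single source of truth for its sentinel strings. This is a hedged sketch of that standardisation, not the actual implementation; the parsing helper `from_option_str` is hypothetical (the real comparison lives in `flow::adapter::flownode_impl`).

```rust
use std::fmt;

// Sketch: derive Display from a single lowercase as_str(), so the value
// written into flow options and the value parsed on the flownode side
// cannot drift apart in capitalisation.
#[derive(Debug, Clone, Copy, PartialEq)]
enum FlowType {
    Batching,
    Streaming,
}

impl FlowType {
    fn as_str(self) -> &'static str {
        match self {
            FlowType::Batching => "batching",
            FlowType::Streaming => "streaming",
        }
    }

    // Mirrors the defaulting behaviour described in the comment: anything
    // that is not exactly "streaming" falls back to Batching.
    fn from_option_str(s: &str) -> FlowType {
        match s {
            "streaming" => FlowType::Streaming,
            _ => FlowType::Batching,
        }
    }
}

impl fmt::Display for FlowType {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str(self.as_str())
    }
}

fn main() {
    // The round-trip holds for both variants...
    assert_eq!(
        FlowType::from_option_str(&FlowType::Streaming.to_string()),
        FlowType::Streaming
    );
    // ...while a capitalised "Streaming" would silently become Batching,
    // which is exactly the failure mode the review warns about.
    assert_eq!(FlowType::from_option_str("Streaming"), FlowType::Batching);
    println!("ok");
}
```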
100-110: 🛠️ Refactor suggestion

Partial `TableReference` loses provided schema – use the value coming from the plan instead of the fallback one

When the incoming plan contains a `TableReference::Partial { schema, table }`, the caller has already supplied a schema. Overwriting it with the `schema` argument silently discards information and may route the insert to the wrong schema when the current session-level default differs from the reference in the plan.

```diff
-let table_name = match table_name {
+let table_name = match table_name {
     TableReference::Bare { table } => TableName {
         catalog_name: catalog.to_string(),
         schema_name: schema.to_string(),
         table_name: table.to_string(),
     },
     TableReference::Partial { schema, table } => TableName {
         catalog_name: catalog.to_string(),
-        schema_name: schema.to_string(),
+        // honour the schema provided by the plan
+        schema_name: schema.to_string(),
         table_name: table.to_string(),
     },
```

Likely an incorrect or invalid review comment.
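A small self-contained example (hypothetical `resolve` helper, simplified types) of why this suggestion is likely a no-op, as the closing note says: in the `Partial` arm, the pattern binding `schema` shadows the fallback argument, so the plan-provided schema is already the one used.

```rust
// Minimal sketch showing Rust's pattern-binding shadowing: inside the
// Partial arm, `schema` refers to the value carried by the enum variant,
// not the outer fallback argument of the same name.
#[derive(Debug)]
enum TableReference {
    Bare { table: String },
    Partial { schema: String, table: String },
}

fn resolve(table_ref: TableReference, fallback_schema: &str) -> (String, String) {
    match table_ref {
        // No schema in the reference: fall back to the session default.
        TableReference::Bare { table } => (fallback_schema.to_string(), table),
        // `schema` here shadows `fallback_schema`'s role: the plan wins.
        TableReference::Partial { schema, table } => (schema, table),
    }
}

fn main() {
    let partial = resolve(
        TableReference::Partial { schema: "plan_schema".into(), table: "t".into() },
        "session_schema",
    );
    assert_eq!(partial.0, "plan_schema");

    let bare = resolve(TableReference::Bare { table: "t".into() }, "session_schema");
    assert_eq!(bare.0, "session_schema");
    println!("ok");
}
```

This is the same structure as the reviewed `match`: the diff's "fix" changes nothing because `schema.to_string()` in the `Partial` arm already reads the binding introduced by the pattern.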
tests/cases/standalone/common/flow/flow_basic.result (7)
12-17: Flow query syntax updated to use `date_bin` instead of `tumble`

The test now uses `date_bin(INTERVAL '1 second', ts, '2021-07-01 00:00:00') as time_window` for time windowing instead of the previous `tumble` function. This aligns with the PR's objective to use batching mode for aggregation queries and fix SQL correctness.
28-30: Schema updated to use TIMESTAMP(9) precision for time windows

The output table schema now uses a single `time_window` column with TIMESTAMP(9) precision instead of separate `window_start`/`window_end` columns. This higher precision timestamp provides more accurate time tracking for flow operations.
88-92: Output format updated to reflect schema changes

The query result format has been updated to show results with the new `time_window` column format, maintaining consistency with the schema changes.
894-896: Table option added for append-only behavior

The `append_mode = 'true'` option has been added to the table definition, indicating that this table only supports appending data. This is important for the batching mode operation as mentioned in the PR objectives.
904-905: Timestamp column renamed and update_at column added

The timestamp column was renamed from `ts` to `event_ts` and an `update_at` timestamp column was added. This provides better separation between the event time and the update time, which is beneficial for correct time-based processing in flows.
915-915: Flow query updated to use event_ts for timestamp

The query has been modified to select `max(ts) as event_ts` to match the schema changes in the target table, ensuring proper time indexing.
1054-1056: All data source tables consistently configured for append-only mode

All source tables for flows (ngx_access_log, requests, android_log) have been updated with `append_mode = 'true'`, indicating a consistent pattern of making tables append-only for use with flows in batching mode.

Also applies to: 1190-1192, 1401-1403, 1514-1516
This looks like a breaking change. @discord9
I hereby agree to the terms of the GreptimeDB CLA.
Refer to a related PR or issue link (optional)
related proto change (already merged): GreptimeTeam/greptime-proto#231
What's changed and what's your intention?
- add `BatchingMode` for aggr query with flow
- add `InsertInto` plan in `greptime_request` proto to hack around the failure of substrait WriteRel decoding to datafusion INSERT INTO (see proto PR and `src/common/query/src/logical_plan.rs` for its handling)
- use `Weak<frontend::Instance>` instead of a grpc conn (see `src/cmd/src/standalone.rs` & `src/flow/src/batching_mode/frontend_client.rs`); notice here `GrpcQueryHandlerWithBoxedError` is used to type-erase the `frontend::Instance`
- add a determiner in metasrv to decide which flow type a query should use (for now it's aggr query for batching mode, and non-aggr query for streaming mode), see `src/common/meta/src/ddl/create_flow.rs`
- determine `FlowType` by visiting the plan and searching for `Aggregate` & `Distinct`, see `src/operator/src/statement/ddl.rs`
- move the `recover_flows` logic from flownode start-up to when the flownode starts its worker (see `src/flow/src/adapter/flownode_impl.rs`: `ConsistentCheckTask::start_check_task`) and keep the task always running to check for inconsistency (i.e. metasrv has a flow task's metadata, but the flownode is not running it) and emit a warning when that happens
- adjust `flush_flow` and `drop_flow` logic so that when the flownode starts up and flows are not fully recovered, they won't fail due to the flow not being found; the logic here may need further cleanup. For now, if a flow is not found in drop flow, it will signal the check task and allow it to drop the flow if it has no corresponding metadata (see `src/flow/src/adapter/flownode_impl.rs`)
- reject flows on source tables with `ttl=instant`; THIS IS A BREAKING CHANGE to flow's behavior, as it accepted `ttl=instant` before
- remove `tumble` from sqlness for now (which is already removed from docs)
- add `AS <column_name>` in the query when needed, see `src/flow/src/batching_mode/task.rs`: `gen_insert_plan` & `gen_query_with_time_window` & `AddAutoColumnRewriter`
- use `ChannelManager` in dist mode frontend client: `src/flow/src/batching_mode/frontend_client.rs`
- see also `state.rs: gen_filter_exprs`, `task.rs: mark_all_windows_as_dirty`, `engine.rs: flush_flow_inner`

PS: about 600+ line changes are sqlness updates, so this is not a very large PR
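The always-running consistency check described above boils down to a set difference per tick between desired flows (metasrv metadata) and actual flows (running on the flownode). A minimal sketch with assumed flow-id sets, not the actual `ConsistentCheckTask` code:

```rust
use std::collections::HashSet;

// Sketch of one reconciliation tick: flows present in metadata but not
// running should be (re)created with a warning; flows running without
// metadata are candidates for dropping (e.g. after a partially-failed drop).
fn reconcile(metadata: &HashSet<u64>, running: &HashSet<u64>) -> (Vec<u64>, Vec<u64>) {
    let mut to_create: Vec<u64> = metadata.difference(running).copied().collect();
    let mut to_drop: Vec<u64> = running.difference(metadata).copied().collect();
    // Sort for deterministic processing order.
    to_create.sort();
    to_drop.sort();
    (to_create, to_drop)
}

fn main() {
    let metadata: HashSet<u64> = [1, 2, 3].into_iter().collect();
    let running: HashSet<u64> = [2, 3, 4].into_iter().collect();
    let (to_create, to_drop) = reconcile(&metadata, &running);
    // Flow 1 exists in metasrv metadata but is not running on the flownode.
    assert_eq!(to_create, vec![1]);
    // Flow 4 runs without metadata, so the check task may drop it.
    assert_eq!(to_drop, vec![4]);
    println!("ok");
}
```

Running this diff periodically, rather than only once at start-up, is what lets `drop_flow` defer to the check task when a flow is not found yet.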
non-blocking:
- `FlowStat` needs to include batch mode flow, but prefer to do it in a separate PR

PR Checklist
Please convert it to a draft if some of the following conditions are not met.
Summary by CodeRabbit
New Features
- `flush_flow` operation.
- `date_bin`.

Improvements
- Single `time_window` column instead of `window_start` and `window_end`.

Bug Fixes
Documentation
Chores