Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
e04ffd5
feat: use flow batching engine
discord9 Apr 11, 2025
32da8ea
chore: update proto
discord9 Apr 18, 2025
80ef20d
chore: update proto to main branch
discord9 Apr 18, 2025
4a7491b
fix: add locks for create/drop flow&docs: update docs
discord9 Apr 18, 2025
e6d77f8
feat: flush_flow flush all ranges now
discord9 Apr 18, 2025
bb051ae
test: add align time window test
discord9 Apr 21, 2025
b70e683
docs: explain `nodeid` use in check task
discord9 Apr 21, 2025
d1f45d1
refactor: AddAutoColumnRewriter check for Projection
discord9 Apr 21, 2025
8c4fb8b
refactor: per review
discord9 Apr 21, 2025
343e56f
fix: query without time window also clean dirty time window
discord9 Apr 21, 2025
fa3cba8
chore: better logging
discord9 Apr 21, 2025
cb3d8aa
chore: add comments per review
discord9 Apr 21, 2025
1337a8c
refactor: per review
discord9 Apr 21, 2025
af81ccb
chore: per review
discord9 Apr 21, 2025
7be7857
chore: per review rename args
discord9 Apr 21, 2025
5b0e911
refactor: per review partially
discord9 Apr 21, 2025
da5ba63
chore: update docs
discord9 Apr 22, 2025
a57d4a6
chore: use better error variant
discord9 Apr 22, 2025
9a13dfc
chore: better error variant
discord9 Apr 22, 2025
e9b78a5
refactor: rename FlowWorkerManager to FlowStreamingEngine
discord9 Apr 23, 2025
5c3793b
rename again
discord9 Apr 23, 2025
18136e6
refactor: per review
discord9 Apr 23, 2025
5bbe4c0
chore: rebase after #5963 merged
discord9 Apr 23, 2025
5d24af1
refactor: rename all flow_worker_manager occurs
discord9 Apr 23, 2025
225ce36
docs: rm resolved TODO
discord9 Apr 23, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ etcd-client = "0.14"
fst = "0.4.7"
futures = "0.3"
futures-util = "0.3"
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "b6d9cffd43c4e6358805a798f17e03e232994b82" }
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "e82b0158cd38d4021edb4e4c0ae77f999051e62f" }
hex = "0.4"
http = "1"
humantime = "2.1"
Expand Down
1 change: 1 addition & 0 deletions src/api/src/helper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -514,6 +514,7 @@ fn query_request_type(request: &QueryRequest) -> &'static str {
Some(Query::Sql(_)) => "query.sql",
Some(Query::LogicalPlan(_)) => "query.logical_plan",
Some(Query::PromRangeQuery(_)) => "query.prom_range",
Some(Query::InsertIntoPlan(_)) => "query.insert_into_plan",
None => "query.empty",
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/catalog/src/table_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use session::context::QueryContextRef;
use snafu::{ensure, OptionExt, ResultExt};
use table::metadata::TableType;
use table::table::adapter::DfTableProviderAdapter;
mod dummy_catalog;
pub mod dummy_catalog;
use dummy_catalog::DummyCatalogList;
use table::TableRef;

Expand Down
6 changes: 4 additions & 2 deletions src/cmd/src/flownode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -345,7 +345,7 @@ impl StartCommand {
let client = Arc::new(NodeClients::new(channel_config));

let invoker = FrontendInvoker::build_from(
flownode.flow_worker_manager().clone(),
flownode.flow_engine().streaming_engine(),
catalog_manager.clone(),
cached_meta_backend.clone(),
layered_cache_registry.clone(),
Expand All @@ -355,7 +355,9 @@ impl StartCommand {
.await
.context(StartFlownodeSnafu)?;
flownode
.flow_worker_manager()
.flow_engine()
.streaming_engine()
// TODO(discord9): refactor and avoid circular reference
.set_frontend_invoker(invoker)
.await;

Expand Down
47 changes: 28 additions & 19 deletions src/cmd/src/standalone.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,8 @@ use datanode::datanode::{Datanode, DatanodeBuilder};
use datanode::region_server::RegionServer;
use file_engine::config::EngineConfig as FileEngineConfig;
use flow::{
FlowConfig, FlowWorkerManager, FlownodeBuilder, FlownodeInstance, FlownodeOptions,
FrontendClient, FrontendInvoker,
FlowConfig, FlownodeBuilder, FlownodeInstance, FlownodeOptions, FrontendClient,
FrontendInvoker, GrpcQueryHandlerWithBoxedError, StreamingEngine,
};
use frontend::frontend::{Frontend, FrontendOptions};
use frontend::instance::builder::FrontendBuilder;
Expand Down Expand Up @@ -524,17 +524,17 @@ impl StartCommand {
..Default::default()
};

// TODO(discord9): for standalone not use grpc, but just somehow get a handler to frontend grpc client without
// standalone mode does not use grpc; instead, get a handler to the frontend grpc client without
// actually making a connection
let fe_server_addr = fe_opts.grpc.bind_addr.clone();
let frontend_client = FrontendClient::from_static_grpc_addr(fe_server_addr);
let (frontend_client, frontend_instance_handler) =
FrontendClient::from_empty_grpc_handler();
let flow_builder = FlownodeBuilder::new(
flownode_options,
plugins.clone(),
table_metadata_manager.clone(),
catalog_manager.clone(),
flow_metadata_manager.clone(),
Arc::new(frontend_client),
Arc::new(frontend_client.clone()),
);
let flownode = flow_builder
.build()
Expand All @@ -544,15 +544,15 @@ impl StartCommand {

// set the ref to query for the local flow state
{
let flow_worker_manager = flownode.flow_worker_manager();
let flow_streaming_engine = flownode.flow_engine().streaming_engine();
information_extension
.set_flow_worker_manager(flow_worker_manager.clone())
.set_flow_streaming_engine(flow_streaming_engine)
.await;
}

let node_manager = Arc::new(StandaloneDatanodeManager {
region_server: datanode.region_server(),
flow_server: flownode.flow_worker_manager(),
flow_server: flownode.flow_engine(),
});

let table_id_sequence = Arc::new(
Expand Down Expand Up @@ -606,10 +606,19 @@ impl StartCommand {
.context(error::StartFrontendSnafu)?;
let fe_instance = Arc::new(fe_instance);

let flow_worker_manager = flownode.flow_worker_manager();
// set the frontend client for flownode
let grpc_handler = fe_instance.clone() as Arc<dyn GrpcQueryHandlerWithBoxedError>;
let weak_grpc_handler = Arc::downgrade(&grpc_handler);
frontend_instance_handler
.lock()
.unwrap()
.replace(weak_grpc_handler);

Comment thread
discord9 marked this conversation as resolved.
// set the frontend invoker for flownode
let flow_streaming_engine = flownode.flow_engine().streaming_engine();
// flow server needs to be able to use frontend to write insert requests back
let invoker = FrontendInvoker::build_from(
flow_worker_manager.clone(),
flow_streaming_engine.clone(),
catalog_manager.clone(),
kv_backend.clone(),
layered_cache_registry.clone(),
Expand All @@ -618,7 +627,7 @@ impl StartCommand {
)
.await
.context(error::StartFlownodeSnafu)?;
flow_worker_manager.set_frontend_invoker(invoker).await;
flow_streaming_engine.set_frontend_invoker(invoker).await;

let export_metrics_task = ExportMetricsTask::try_new(&opts.export_metrics, Some(&plugins))
.context(error::ServersSnafu)?;
Expand Down Expand Up @@ -694,7 +703,7 @@ pub struct StandaloneInformationExtension {
region_server: RegionServer,
procedure_manager: ProcedureManagerRef,
start_time_ms: u64,
flow_worker_manager: RwLock<Option<Arc<FlowWorkerManager>>>,
flow_streaming_engine: RwLock<Option<Arc<StreamingEngine>>>,
}

impl StandaloneInformationExtension {
Expand All @@ -703,14 +712,14 @@ impl StandaloneInformationExtension {
region_server,
procedure_manager,
start_time_ms: common_time::util::current_time_millis() as u64,
flow_worker_manager: RwLock::new(None),
flow_streaming_engine: RwLock::new(None),
}
}

/// Set the flow worker manager for the standalone instance.
pub async fn set_flow_worker_manager(&self, flow_worker_manager: Arc<FlowWorkerManager>) {
let mut guard = self.flow_worker_manager.write().await;
*guard = Some(flow_worker_manager);
/// Stores `flow_streaming_engine` in this standalone instance, replacing whatever
/// value (if any) was held before.
///
/// Waits for the write lock on the internal slot before assigning.
pub async fn set_flow_streaming_engine(&self, flow_streaming_engine: Arc<StreamingEngine>) {
    *self.flow_streaming_engine.write().await = Some(flow_streaming_engine);
}
}

Expand Down Expand Up @@ -789,7 +798,7 @@ impl InformationExtension for StandaloneInformationExtension {

async fn flow_stats(&self) -> std::result::Result<Option<FlowStat>, Self::Error> {
Ok(Some(
self.flow_worker_manager
self.flow_streaming_engine
.read()
.await
.as_ref()
Expand Down
24 changes: 18 additions & 6 deletions src/common/meta/src/ddl/create_flow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ use table::metadata::TableId;
use crate::cache_invalidator::Context;
use crate::ddl::utils::{add_peer_context_if_needed, handle_retry_error};
use crate::ddl::DdlContext;
use crate::error::{self, Result};
use crate::error::{self, Result, UnexpectedSnafu};
use crate::instruction::{CacheIdent, CreateFlow};
use crate::key::flow::flow_info::FlowInfoValue;
use crate::key::flow::flow_route::FlowRouteValue;
Expand Down Expand Up @@ -171,7 +171,7 @@ impl CreateFlowProcedure {
}
self.data.state = CreateFlowState::CreateFlows;
// determine flow type
self.data.flow_type = Some(determine_flow_type(&self.data.task));
self.data.flow_type = Some(get_flow_type_from_options(&self.data.task)?);

Ok(Status::executing(true))
}
Expand All @@ -196,8 +196,8 @@ impl CreateFlowProcedure {
});
}
info!(
"Creating flow({:?}) on flownodes with peers={:?}",
self.data.flow_id, self.data.peers
"Creating flow({:?}, type={:?}) on flownodes with peers={:?}",
self.data.flow_id, self.data.flow_type, self.data.peers
);
join_all(create_flow)
.await
Expand Down Expand Up @@ -306,8 +306,20 @@ impl Procedure for CreateFlowProcedure {
}
}

pub fn determine_flow_type(_flow_task: &CreateFlowTask) -> FlowType {
FlowType::Batching
/// Resolves the [`FlowType`] requested through the options of a create-flow task.
///
/// The value stored under `FlowType::FLOW_TYPE_KEY` in `flow_task.flow_options` is
/// compared against `FlowType::BATCHING` and `FlowType::STREAMING`. When the key is
/// absent, the flow type falls back to [`FlowType::Batching`].
///
/// # Errors
///
/// Fails with an `Unexpected` error when the option is present but matches neither
/// of the known flow type names.
pub fn get_flow_type_from_options(flow_task: &CreateFlowTask) -> Result<FlowType> {
    let requested = flow_task.flow_options.get(FlowType::FLOW_TYPE_KEY);
    match requested.map(|value| value.as_str()) {
        // A missing option and an explicit batching request resolve to the same type.
        None | Some(FlowType::BATCHING) => Ok(FlowType::Batching),
        Some(FlowType::STREAMING) => Ok(FlowType::Streaming),
        Some(other) => UnexpectedSnafu {
            err_msg: format!("Unknown flow type: {}", other),
        }
        .fail(),
    }
}

/// The state of [CreateFlowProcedure].
Expand Down
2 changes: 1 addition & 1 deletion src/common/meta/src/ddl/tests/create_flow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ pub(crate) fn test_create_flow_task(
create_if_not_exists,
expire_after: Some(300),
comment: "".to_string(),
sql: "raw_sql".to_string(),
sql: "select 1".to_string(),
flow_options: Default::default(),
}
}
Expand Down
8 changes: 8 additions & 0 deletions src/common/meta/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -401,6 +401,13 @@ pub enum Error {
location: Location,
},

#[snafu(display("Invalid flow request body: {:?}", body))]
InvalidFlowRequestBody {
body: Box<Option<api::v1::flow::flow_request::Body>>,
#[snafu(implicit)]
location: Location,
},

#[snafu(display("Failed to get kv cache, err: {}", err_msg))]
GetKvCache { err_msg: String },

Expand Down Expand Up @@ -863,6 +870,7 @@ impl ErrorExt for Error {
| InvalidUnsetDatabaseOption { .. }
| InvalidTopicNamePrefix { .. }
| InvalidTimeZone { .. } => StatusCode::InvalidArguments,
InvalidFlowRequestBody { .. } => StatusCode::InvalidArguments,

FlowNotFound { .. } => StatusCode::FlowNotFound,
FlowRouteNotFound { .. } => StatusCode::Unexpected,
Expand Down
77 changes: 74 additions & 3 deletions src/common/query/src/logical_plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,19 @@ mod udaf;

use std::sync::Arc;

use api::v1::TableName;
use datafusion::catalog::CatalogProviderList;
use datafusion::error::Result as DatafusionResult;
use datafusion::logical_expr::{LogicalPlan, LogicalPlanBuilder};
use datafusion_common::Column;
use datafusion_expr::col;
use datafusion_common::{Column, TableReference};
use datafusion_expr::dml::InsertOp;
use datafusion_expr::{col, DmlStatement, WriteOp};
pub use expr::{build_filter_from_timestamp, build_same_type_ts_filter};
use snafu::ResultExt;

pub use self::accumulator::{Accumulator, AggregateFunctionCreator, AggregateFunctionCreatorRef};
pub use self::udaf::AggregateFunction;
use crate::error::Result;
use crate::error::{GeneralDataFusionSnafu, Result};
use crate::logical_plan::accumulator::*;
use crate::signature::{Signature, Volatility};

Expand Down Expand Up @@ -79,6 +82,74 @@ pub fn rename_logical_plan_columns(
LogicalPlanBuilder::from(plan).project(projection)?.build()
}

/// Splits an `insert into table_name <input>` logical plan into the target table name
/// and the input plan that produces the rows to insert.
///
/// Returns `None` when `plan` is not a DML statement, or when its write operation is
/// anything other than an appending insert.
///
/// `default_catalog` and `default_schema` only fill in the parts missing from the plan's
/// table reference: a bare reference takes both defaults, a partial reference takes only
/// the default catalog, and a full reference is returned as-is (it is **NOT** overridden).
pub fn breakup_insert_plan(
    plan: &LogicalPlan,
    default_catalog: &str,
    default_schema: &str,
) -> Option<(TableName, Arc<LogicalPlan>)> {
    let LogicalPlan::Dml(dml) = plan else {
        return None;
    };
    if dml.op != WriteOp::Insert(InsertOp::Append) {
        return None;
    }

    // Resolve each component of the target name, falling back to the defaults
    // for whatever the reference does not spell out.
    let (catalog_name, schema_name, table_name) = match &dml.table_name {
        TableReference::Bare { table } => (
            default_catalog.to_string(),
            default_schema.to_string(),
            table.to_string(),
        ),
        TableReference::Partial { schema, table } => (
            default_catalog.to_string(),
            schema.to_string(),
            table.to_string(),
        ),
        TableReference::Full {
            catalog,
            schema,
            table,
        } => (catalog.to_string(), schema.to_string(), table.to_string()),
    };

    let target = TableName {
        catalog_name,
        schema_name,
        table_name,
    };
    Some((target, dml.input.clone()))
}

/// create a `insert into table_name <input>` logical plan
pub fn add_insert_to_logical_plan(
table_name: TableName,
table_schema: datafusion_common::DFSchemaRef,
input: LogicalPlan,
) -> Result<LogicalPlan> {
let table_name = TableReference::Full {
catalog: table_name.catalog_name.into(),
schema: table_name.schema_name.into(),
table: table_name.table_name.into(),
};

let plan = LogicalPlan::Dml(DmlStatement::new(
table_name,
table_schema,
WriteOp::Insert(InsertOp::Append),
Arc::new(input),
));
let plan = plan.recompute_schema().context(GeneralDataFusionSnafu)?;
Ok(plan)
}

/// The datafusion `[LogicalPlan]` decoder.
#[async_trait::async_trait]
pub trait SubstraitPlanDecoder {
Expand Down
Loading
Loading