feat: support WITHIN filter #5397

Open
wants to merge 10 commits into
base: main
22 changes: 11 additions & 11 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
@@ -188,7 +188,7 @@ smallvec = { version = "1", features = ["serde"] }
snafu = "0.8"
sysinfo = "0.30"
# on branch v0.52.x
-sqlparser = { git = "https://github.com/GreptimeTeam/sqlparser-rs.git", rev = "71dd86058d2af97b9925093d40c4e03360403170", features = [
+sqlparser = { git = "https://github.com/NiwakaDev/sqlparser-rs", rev = "daf8c33234f1b4568824b32efcc5d611091c4834", features = [
Review comment (Contributor): Can we change the repo now?

"visitor",
"serde",
] } # on branch v0.44.x
37 changes: 37 additions & 0 deletions src/common/function/src/scalars/math.rs
@@ -44,6 +44,7 @@ impl MathFunction {
        registry.register(Arc::new(RateFunction));
        registry.register(Arc::new(RangeFunction));
        registry.register(Arc::new(ClampFunction));
        registry.register(Arc::new(WithinFilterFunction));
    }
}

@@ -87,3 +88,39 @@ impl Function for RangeFunction {
            .context(GeneralDataFusionSnafu)
    }
}

#[derive(Clone, Debug, Default)]
struct WithinFilterFunction;

impl fmt::Display for WithinFilterFunction {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        write!(f, "WithinFilterFunction")
    }
}

pub const WITHIN_FILTER_NAME: &str = "within_filter";

impl Function for WithinFilterFunction {
    fn name(&self) -> &str {
        WITHIN_FILTER_NAME
    }

    fn return_type(&self, _input_types: &[ConcreteDataType]) -> Result<ConcreteDataType> {
        Ok(ConcreteDataType::boolean_datatype())
    }

    fn signature(&self) -> Signature {
        Signature::uniform(
            2,
            vec![ConcreteDataType::string_datatype()],
            Volatility::Immutable,
        )
    }

    fn eval(&self, _func_ctx: FunctionContext, _columns: &[VectorRef]) -> Result<VectorRef> {
        Err(DataFusionError::Internal(
            "within_filter function just a empty function, it should not be eval!".into(),
Review comment (Copilot AI, Feb 12, 2025):

The error message is unclear. It should be changed to 'The within_filter function is a placeholder and should not be evaluated.'

Suggested change
- "within_filter function just a empty function, it should not be eval!".into(),
+ "The within_filter function is a placeholder and should not be evaluated.".into(),

        ))
        .context(GeneralDataFusionSnafu)
    }
}
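
The placeholder pattern in this hunk can be sketched in isolation. What follows is a minimal std-only model, not GreptimeDB or DataFusion code: `Expr`, `eval_placeholder`, and `rewrite` are hypothetical names, and the year-to-range expansion is an assumption made purely for illustration, since this diff does not show what `WithinFilterRule` actually produces. The point it illustrates is why `eval` can simply return an error: a rewrite step is presumably expected to replace every `within_filter` call before anything is executed.

```rust
// Hypothetical model; none of these types are GreptimeDB or DataFusion APIs.
#[derive(Debug, Clone, PartialEq)]
enum Expr {
    Column(String),
    Literal(String),
    Call { name: String, args: Vec<Expr> },
    // Half-open range: low <= column AND column < high.
    Range { column: String, low: String, high: String },
}

const WITHIN_FILTER_NAME: &str = "within_filter";

/// Stand-in for the placeholder's `eval`: it always fails, because the call
/// is supposed to be rewritten away before execution.
fn eval_placeholder() -> Result<bool, String> {
    Err("The within_filter function is a placeholder and should not be evaluated.".to_string())
}

/// Replaces `within_filter(column, 'YYYY')` with a range over that year.
/// ASSUMPTION: only year literals are handled; the real rule may differ.
fn rewrite(expr: Expr) -> Result<Expr, String> {
    match expr {
        Expr::Call { name, args } if name == WITHIN_FILTER_NAME => match args.as_slice() {
            [Expr::Column(column), Expr::Literal(year)] => {
                let y = year
                    .parse::<i32>()
                    .map_err(|_| format!("unsupported literal: {year}"))?;
                Ok(Expr::Range {
                    column: column.clone(),
                    low: format!("{y:04}-01-01"),
                    high: format!("{:04}-01-01", y + 1),
                })
            }
            _ => Err("within_filter expects (column, literal)".to_string()),
        },
        // Everything else passes through unchanged.
        other => Ok(other),
    }
}
```

Under these assumptions, rewriting a call on column `ts` with the literal `2022` yields a `Range` from `2022-01-01` up to, but not including, `2023-01-01`, and the placeholder's evaluation path is never reached.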
1 change: 0 additions & 1 deletion src/query/src/dist_plan/merge_scan.rs
@@ -198,7 +198,6 @@ impl MergeScanExec {
let dbname = context.task_id().unwrap_or_default();
let tracing_context = TracingContext::from_json(context.session_id().as_str());
let current_channel = self.query_ctx.channel();

let stream = Box::pin(stream!({
// only report metrics once for each MergeScan
if partition == 0 {
11 changes: 10 additions & 1 deletion src/query/src/error.rs
@@ -323,6 +323,13 @@ pub enum Error {
        #[snafu(implicit)]
        location: Location,
    },

    #[snafu(display("Within filter interval error: {}", message))]
    WithinFilterInternal {
        message: String,
        #[snafu(implicit)]
        location: Location,
    },
}

impl ErrorExt for Error {
@@ -370,7 +377,9 @@ impl ErrorExt for Error {

            RegionQuery { source, .. } => source.status_code(),
            TableMutation { source, .. } => source.status_code(),
-            MissingTableMutationHandler { .. } => StatusCode::Unexpected,
+            WithinFilterInternal { .. } | MissingTableMutationHandler { .. } => {
+                StatusCode::Unexpected
+            }
            GetRegionMetadata { .. } => StatusCode::RegionNotReady,
            TableReadOnly { .. } => StatusCode::Unsupported,
            GetFulltextOptions { source, .. } | GetSkippingIndexOptions { source, .. } => {
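
The status-code change above groups the new variant with an existing one in a single match arm. A minimal std-only sketch of that shape follows. It does not use snafu or the real `ErrorExt` trait; the field layout, the display strings, and the reduced `StatusCode` enum are assumptions made for illustration.

```rust
use std::fmt;

// Hypothetical, reduced status codes; the real enum has many more variants.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum StatusCode {
    Unexpected,
    Unsupported,
}

// Hypothetical, reduced error enum mirroring three variant names from the diff.
#[derive(Debug)]
enum Error {
    WithinFilterInternal { message: String },
    MissingTableMutationHandler,
    TableReadOnly,
}

impl fmt::Display for Error {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Error::WithinFilterInternal { message } => {
                write!(f, "within filter error: {message}")
            }
            Error::MissingTableMutationHandler => write!(f, "missing table mutation handler"),
            Error::TableReadOnly => write!(f, "table is read-only"),
        }
    }
}

impl Error {
    /// Maps each variant to a status code; an or-pattern lets two variants
    /// share one arm, as in the hunk above.
    fn status_code(&self) -> StatusCode {
        match self {
            Error::WithinFilterInternal { .. } | Error::MissingTableMutationHandler => {
                StatusCode::Unexpected
            }
            Error::TableReadOnly => StatusCode::Unsupported,
        }
    }
}
```

Because the match has no wildcard arm, adding a variant to the enum without extending `status_code` is a compile error, which is presumably why the PR touches both places.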
1 change: 1 addition & 0 deletions src/query/src/lib.rs
@@ -41,6 +41,7 @@ pub mod region_query;
pub mod sql;
pub mod stats;
pub(crate) mod window_sort;
mod within_filter;

#[cfg(test)]
pub(crate) mod test_util;
1 change: 0 additions & 1 deletion src/query/src/optimizer/windowed_sort.rs
@@ -174,7 +174,6 @@ fn fetch_partition_range(input: Arc<dyn ExecutionPlan>) -> DataFusionResult<Opti
partition_ranges = Some(region_scan_exec.get_uncollapsed_partition_ranges());
time_index = Some(region_scan_exec.time_index());
tag_columns = Some(region_scan_exec.tag_columns());

// set distinguish_partition_ranges to true, this is an incorrect workaround
if !is_batch_coalesced {
region_scan_exec.with_distinguish_partition_range(true);
1 change: 1 addition & 0 deletions src/query/src/planner.rs
@@ -110,6 +110,7 @@ impl DfLogicalPlanner {
.statement_to_plan(df_stmt)
.context(PlanSqlSnafu)?;
common_telemetry::debug!("Logical planner, statement to plan result: {result}");

let plan = RangePlanRewriter::new(table_provider, query_ctx.clone())
.rewrite(result)
.await?;
7 changes: 6 additions & 1 deletion src/query/src/query_engine/state.rs
@@ -58,6 +58,7 @@ use crate::query_engine::options::QueryOptions;
use crate::query_engine::DefaultSerializer;
use crate::range_select::planner::RangeSelectPlanner;
use crate::region_query::RegionQueryHandlerRef;
use crate::within_filter::WithinFilterRule;
use crate::QueryEngineContext;

/// Query engine global state
@@ -93,10 +94,14 @@ impl QueryEngineState {
        let runtime_env = Arc::new(RuntimeEnv::default());
        let session_config = SessionConfig::new().with_create_default_catalog_and_schema(false);
        // Apply extension rules
-        let mut extension_rules = Vec::new();
+        // TODO: remove Vec<Arc<(dyn ExtensionAnalyzerRule + std::marker::Send + Sync + 'static)>>
+        let mut extension_rules: Vec<
+            Arc<(dyn ExtensionAnalyzerRule + std::marker::Send + Sync + 'static)>,
+        > = Vec::new();

        // The [`TypeConversionRule`] must be at first
        extension_rules.insert(0, Arc::new(TypeConversionRule) as _);
+        extension_rules.push(Arc::new(WithinFilterRule::new()));
Review comment (Collaborator), on lines +97 to +104:

Suggested change
-        // TODO: remove Vec<Arc<(dyn ExtensionAnalyzerRule + std::marker::Send + Sync + 'static)>>
-        let mut extension_rules: Vec<
-            Arc<(dyn ExtensionAnalyzerRule + std::marker::Send + Sync + 'static)>,
-        > = Vec::new();
-        // The [`TypeConversionRule`] must be at first
-        extension_rules.insert(0, Arc::new(TypeConversionRule) as _);
-        extension_rules.push(Arc::new(WithinFilterRule::new()));
+        // TODO: remove Vec<Arc<(dyn ExtensionAnalyzerRule + Send + Sync)>>
+        let extension_rules: Vec<Arc<(dyn ExtensionAnalyzerRule + Send + Sync)>> = vec![
+            // The [`TypeConversionRule`] must be at first
+            Arc::new(TypeConversionRule),
+            Arc::new(WithinFilterRule),
+        ];
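
The suggested `vec![...]` form depends on the explicit element type. Without the annotation, the compiler would take the element type from the first `Arc<TypeConversionRule>` and reject the second element as a mismatched type. A minimal std-only sketch of that coercion follows; `AnalyzerRule` is a hypothetical stand-in for `ExtensionAnalyzerRule`, and the rule names returned here are invented for the example.

```rust
use std::sync::Arc;

// Hypothetical stand-in for `ExtensionAnalyzerRule`; the real trait is not reproduced.
trait AnalyzerRule {
    fn name(&self) -> &'static str;
}

struct TypeConversionRule;
struct WithinFilterRule;

impl AnalyzerRule for TypeConversionRule {
    fn name(&self) -> &'static str {
        "type_conversion"
    }
}

impl AnalyzerRule for WithinFilterRule {
    fn name(&self) -> &'static str {
        "within_filter"
    }
}

/// Builds the rule list in one expression. The annotation on `rules` gives
/// every element the same trait-object type, so each `Arc<Concrete>` is
/// coerced to `Arc<dyn AnalyzerRule + Send + Sync>`.
fn extension_rules() -> Vec<Arc<dyn AnalyzerRule + Send + Sync>> {
    let rules: Vec<Arc<dyn AnalyzerRule + Send + Sync>> = vec![
        // Order matters: the type conversion rule stays first.
        Arc::new(TypeConversionRule),
        Arc::new(WithinFilterRule),
    ];
    rules
}

/// Collects rule names in registration order.
fn rule_names() -> Vec<&'static str> {
    extension_rules().iter().map(|rule| rule.name()).collect()
}
```

Writing the list as a literal also makes the ordering constraint visible at a glance, where the `insert(0, ...)` followed by `push(...)` form relies on call order.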


// Apply the datafusion rules
let mut analyzer = Analyzer::new();