@@ -2622,52 +2622,78 @@ impl SqlToMirConverter {
26222622 project_order,
26232623 ) ;
26242624
2625- let post_lookup_aggregates = if view_key. index_type == IndexType :: HashMap {
2626- // If we have aggregates under the IndexType::HashMap, they aren't necessarily
2627- // post-lookup operations. For example, `select sum(col2) from t where col1 =
2628- // ?`, the aggregate will be handled in the dataflow graph.
2629- // But if the query originally contained a `where col1 in
2630- // (?, ?)`, the aggregate does need to be executed as a
2631- // post-lookup. Adding a post-lookup is necessary for `where in` for correctly
2632- // aggregating results, but a mild perf impediment for aggregates with a simple
2633- // equality (we'll run an aggregation on a single row). However, we've lost the
2634- // "did this come from a `where in` information" way above, as it's rewritten in
2635- // the adapter. Hence, to avoid that penalty on all users,
2636- // only add the post-lookup to users who have opted in to
2637- // using post-lookups.
2625+ let order_by = query_graph
2626+ . order
2627+ . as_ref ( )
2628+ . map ( |order| order. iter ( ) . map ( |( c, ot) | ( Column :: from ( c) , * ot) ) . collect ( ) ) ;
2629+
2630+ let mut limit = query_graph. pagination . as_ref ( ) . map ( |p| p. limit ) ;
2631+ let offset = query_graph. pagination . as_ref ( ) . and_then ( |p| p. offset ) ;
2632+ let is_topk_query = order_by. is_some ( ) && limit. is_some ( ) && offset. is_none ( ) ;
2633+ let is_range_query = view_key. index_type == IndexType :: BTreeMap ;
2634+
2635+ let are_post_lookups_required = query_graph. collapsed_where_in
2636+ || ( order_by. is_some ( ) && !is_topk_query)
2637+ || ( is_topk_query && !self . config . allow_topk )
2638+ || is_range_query;
2639+
2640+ let mut post_lookup_aggregates = if are_post_lookups_required {
2641+ // When a query contains WHERE col IN (?, ?, ...), it gets rewritten
2642+ // (or collapsed) to WHERE col = ? during SQL parsing, with the
2643+ // collapsed_where_in flag set to indicate this transformation.
2644+ //
2645+ // This creates a correctness issue for aggregates: the original multi-value IN clause
2646+ // should aggregate across all matching rows, but the rewritten single-value equality
2647+ // will only see one row at a time. To fix this, we need post-lookup aggregation that
2648+ // combines results from multiple point lookups.
2649+ //
2650+ // Example:
2651+ // Original: SELECT sum(amount) FROM orders WHERE id IN (1, 2, 3)
2652+ // Rewritten: SELECT sum(amount) FROM orders WHERE id = ? (executed 3 times)
2653+ // Solution: Sum the results from each execution via post-lookup aggregation
2654+ //
2655+ // Another scenario is when aggregated results are used in an order by clause
2656+ // without a topk node (either because the query didn't have a limit or the
2657+ // feature wasn't enabled). In this case, we also need post-lookup aggregation
2658+ // for correctness.
2659+ //
2660+ // And obviously if the query is a range query, we need post-lookup aggregation
2661+ // since we can't precompute aggregations over different ranges.
2662+ //
2663+ // Note: Post-lookup operations have performance overhead, so they're gated behind
2664+ // the allow_post_lookup config flag.
26382665 if self . config . allow_post_lookup {
2639- match post_lookup_aggregates ( query_graph, query_name) {
2640- Ok ( aggs) => aggs,
2641- // This part is a hack. When we get an ReadySetError::Unsupported,
2642- // that is because the aggregate was a AVG, COUNT(DISTINCT..), or
2643- // SUM(DISTINCT..). We can only support those (currently!) when the
2644- // query contained an equality clause, and
2645- // not a `where in` clause (that was
2646- // rewritten as an equality). As mentioned above, we don't know which
2647- // one the original query had, thus this
2648- // code opts to preserve the functionality
2649- // of the simple equality. Once again, this only applies if the user
2650- // opted in to using "experimental"
2651- // post-lookups.
2652- Err ( ReadySetError :: Unsupported ( ..) ) => None ,
2653- Err ( e) => return Err ( e) ,
2654- }
2666+ post_lookup_aggregates ( query_graph, query_name) ?
26552667 } else {
2656- None
2668+ unsupported ! (
2669+ "Queries which perform operations post-lookup are not supported"
2670+ ) ;
26572671 }
26582672 } else {
2659- post_lookup_aggregates ( query_graph , query_name ) ?
2673+ None
26602674 } ;
26612675
2662- let order_by = query_graph
2663- . order
2664- . as_ref ( )
2665- . map ( |order| order. iter ( ) . map ( |( c, ot) | ( Column :: from ( c) , * ot) ) . collect ( ) ) ;
2666-
2667- let limit = query_graph. pagination . as_ref ( ) . map ( |p| p. limit ) ;
2676+ // If the query is a topk query, and the user has opted in to using the
2677+ // topk feature, remove the limit and aggregates from the post-lookup
2678+ // operations UNLESS post-lookups are otherwise required (e.g. the original
2679+ // query had a WHERE IN); in that case they are needed for correctness.
2680+ if self . config . allow_topk && is_topk_query && !are_post_lookups_required {
2681+ limit = None ;
2682+ post_lookup_aggregates = None ;
2683+ // TODO: even though we are doing topk, we still need the reader
2684+ // to order stuff. Because TopK communicates the diff,
2685+ // and the reader keeps the values in ASC order if ORDER BY
2686+ // is not specified. Please refer to [reader_map::Values] struct.
2687+ // order_by = None;
2688+ }
26682689
2690+ // order_by is required by almost all queries. Only complain if it does
2691+ // aggregations as well.
2692+ // If a query contains just a limit without an order by, the adapter will
2693+ // automatically remove the limit and keep it for itself, so the server won't have
2694+ // to worry about it.
26692695 if !self . config . allow_post_lookup
2670- && ( post_lookup_aggregates. is_some ( ) || order_by. is_some ( ) || limit. is_some ( ) )
2696+ && ( ( post_lookup_aggregates. is_some ( ) && order_by. is_some ( ) ) || limit. is_some ( ) )
26712697 {
26722698 unsupported ! ( "Queries which perform operations post-lookup are not supported" ) ;
26732699 }
@@ -2740,3 +2766,195 @@ impl SqlToMirConverter {
27402766 Ok ( leaf)
27412767 }
27422768}
2769+
#[cfg(test)]
mod tests {
    use std::collections::HashMap;

    use crate::{
        controller::sql::mir::SqlToMirConverter,
        sql::mir::{Config, LeafBehavior},
    };
    use mir::node::MirNodeInner;
    use mir::NodeIndex;
    use readyset_errors::ReadySetResult;
    use readyset_sql::ast::{Column, ColumnSpecification, Relation, SelectMetadata, SqlType};

    use crate::controller::sql::query_graph::to_query_graph;
    use readyset_sql_parsing::parse_select;

    /// Lowers `qg` to MIR as a leaf query called `name` and returns the converter together
    /// with the index of the node produced by `named_query_to_mir`.
    ///
    /// The converter is configured with both `allow_topk` and `allow_post_lookup` enabled, and
    /// a base table `topk_test` with three `Int` columns (`a`, `b`, `c`) is registered before
    /// the query is converted.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by `make_base_node` or `named_query_to_mir`.
    fn sql_to_mir_test(
        name: &str,
        qg: crate::sql::query_graph::QueryGraph,
    ) -> ReadySetResult<(SqlToMirConverter, NodeIndex)> {
        let config = Config {
            allow_topk: true,
            allow_post_lookup: true,
            ..Default::default()
        };
        let mut converter = SqlToMirConverter::default();
        converter.set_config(config);

        // Every column of the fixture table has the same shape; only the name differs.
        let base_columns = ["topk_test.a", "topk_test.b", "topk_test.c"].map(|column_name| {
            ColumnSpecification {
                column: Column::from(column_name),
                sql_type: SqlType::Int(None),
                generated: None,
                constraints: vec![],
                comment: None,
            }
        });
        let _ = converter.make_base_node(&Relation::from("topk_test"), &base_columns, None)?;

        let leaf = converter.named_query_to_mir(
            &Relation::from(name),
            &qg,
            &HashMap::new(),
            LeafBehavior::Leaf,
        )?;

        Ok((converter, leaf))
    }

    /// Generates one `#[test]` function that converts a query to MIR and checks the result.
    ///
    /// * `name` - identifier of the generated test function.
    /// * `query` - SQL text, parsed with the PostgreSQL dialect.
    /// * `query_name` - name given to the MIR query.
    /// * `collapsed_where_in` - when true, `SelectMetadata::CollapsedWhereIn` is pushed onto
    ///   the parsed statement's metadata before the query graph is built.
    /// * `expect_leaf` - whether the leaf node's `aggregates`, `order_by` and `limit` are
    ///   expected to be `Some`.
    /// * `expect_topk_node` - whether any node of the resulting MIR query is expected to be
    ///   a `MirNodeInner::TopK`.
    macro_rules! test_topk_scenario {
        (
            name: $test_name:ident,
            query: $query_str:literal,
            query_name: $query_name:literal,
            collapsed_where_in: $collapsed:expr,
            expect_leaf: {
                aggregates: $expect_agg:expr,
                order_by: $expect_order:expr,
                limit: $expect_limit:expr
            },
            expect_topk_node: $expect_topk:expr
        ) => {
            #[test]
            fn $test_name() -> ReadySetResult<()> {
                let mut select =
                    parse_select(readyset_sql::Dialect::PostgreSQL, $query_str).unwrap();
                if $collapsed {
                    select.metadata.push(SelectMetadata::CollapsedWhereIn);
                }

                let query_graph = to_query_graph(select).unwrap();
                let (mut converter, leaf) = sql_to_mir_test($query_name, query_graph)?;
                let mir_query = converter.make_mir_query($query_name.into(), leaf);

                // The node returned by the conversion must be a leaf carrying the expected
                // post-lookup operations.
                match &mir_query.get_node(leaf).unwrap().inner {
                    MirNodeInner::Leaf {
                        aggregates,
                        order_by,
                        limit,
                        ..
                    } => {
                        assert_eq!(aggregates.is_some(), $expect_agg, "aggregates mismatch");
                        assert_eq!(order_by.is_some(), $expect_order, "order_by mismatch");
                        assert_eq!(limit.is_some(), $expect_limit, "limit mismatch");
                    }
                    _ => panic!("Expected leaf node"),
                }

                // Scan every node of the query for a TopK operator.
                let has_topk = mir_query.topo_nodes().into_iter().any(|idx| {
                    matches!(
                        &mir_query.get_node(idx).unwrap().inner,
                        MirNodeInner::TopK { .. }
                    )
                });

                if $expect_topk {
                    assert!(has_topk, "topk node not found");
                } else {
                    assert!(!has_topk, "unexpected topk node found");
                }

                Ok(())
            }
        };
    }

    // ORDER BY + LIMIT, no collapsed WHERE IN: a TopK node is expected and the leaf is
    // expected to keep only the ordering.
    test_topk_scenario! {
        name: topk_node_exists,
        query: "SELECT a FROM topk_test ORDER BY b LIMIT 3",
        query_name: "q1",
        collapsed_where_in: false,
        expect_leaf: {
            aggregates: false,
            order_by: true,
            limit: false
        },
        expect_topk_node: true
    }

    // ORDER BY + LIMIT with a collapsed WHERE IN: a TopK node is still expected, and the leaf
    // is expected to retain both the ordering and the limit.
    test_topk_scenario! {
        name: topk_node_exists_with_where_in,
        query: "SELECT a FROM topk_test WHERE b = 1 ORDER BY c LIMIT 3",
        query_name: "q1",
        collapsed_where_in: true,
        expect_leaf: {
            aggregates: false,
            order_by: true,
            limit: true
        },
        expect_topk_node: true
    }

    // Aggregate with ORDER BY but no LIMIT and a collapsed WHERE IN: no TopK node is expected,
    // and the leaf is expected to carry aggregates and ordering.
    test_topk_scenario! {
        name: aggregate_with_where_in,
        query: "SELECT sum(topk_test.a) FROM topk_test WHERE b = 5 GROUP BY c ORDER BY b",
        query_name: "q2",
        collapsed_where_in: true,
        expect_leaf: {
            aggregates: true,
            order_by: true,
            limit: false
        },
        expect_topk_node: false
    }

    // Aggregate with ORDER BY + LIMIT and no collapsed WHERE IN: a TopK node is expected and
    // the leaf is expected to keep only the ordering.
    test_topk_scenario! {
        name: topk_without_where_in,
        query: "SELECT avg(topk_test.a) FROM topk_test WHERE topk_test.b = 5 GROUP BY topk_test.c ORDER BY topk_test.b LIMIT 10",
        query_name: "q2",
        collapsed_where_in: false,
        expect_leaf: {
            aggregates: false,
            order_by: true,
            limit: false
        },
        expect_topk_node: true
    }

    // Non-aggregate ORDER BY + LIMIT with a collapsed WHERE IN: a TopK node is expected, and
    // the leaf is expected to retain both the ordering and the limit.
    test_topk_scenario! {
        name: topk_with_where_in,
        query: "SELECT a FROM topk_test WHERE b = 5 ORDER BY a LIMIT 10",
        query_name: "q1",
        collapsed_where_in: true,
        expect_leaf: {
            aggregates: false,
            order_by: true,
            limit: true
        },
        expect_topk_node: true
    }
}
0 commit comments