11//! This module implements SQL aggregation functions that accumulate multiple input values
22//! into a single output value. Currently only supports mysql's `GROUP_CONCAT`, postgres' `STRING_AGG`
33//! and `ARRAY_AGG`, and the various JSON object aggregation functions.
4+ //!
5+ //! Both mysql and postgres generally allow the argument to the function to be a free-wheeling `expr`,
6+ //! but we currently limit this to being a column.
7+ //!
8+ //! Some of the supported functions allow elements to be distinct and/or ordered within their result.
9+ //! For example, `select group_concat(distinct col1 order by col1 desc nulls last separator "::")`.
10+ //! Currently, the `ORDER BY` expr must be either the positional indicator `1` (no other position is accepted),
11+ //! or a column that matches the column of the function's expr (which itself must currently be a column).
12+
13+ use std:: cmp:: Ordering ;
414use std:: collections:: BTreeMap ;
15+ use std:: fmt:: { Display , Formatter , Result } ;
516
617use crate :: eval:: json;
718use readyset_data:: DfValue ;
8- use readyset_errors:: { internal, internal_err, ReadySetResult } ;
9- use readyset_sql:: ast:: DistinctOption ;
19+ use readyset_errors:: { internal, internal_err, unsupported, ReadySetError , ReadySetResult } ;
20+ use readyset_sql:: ast:: {
21+ Column , DistinctOption , Expr , FieldReference , FunctionExpr , NullOrder , OrderClause , OrderType ,
22+ } ;
1023use serde:: { Deserialize , Serialize } ;
1124
1225/// Supported accumulation operators.
1326#[ derive( Debug , Clone , PartialEq , Eq , Serialize , Deserialize ) ]
1427pub enum AccumulationOp {
1528 /// Concatenate values into an array. Allows NULL values in the output array.
16- ArrayAgg { distinct : DistinctOption } ,
29+ ArrayAgg {
30+ distinct : DistinctOption ,
31+ order_by : Option < ( OrderType , NullOrder ) > ,
32+ } ,
1733 /// Concatenates using the given separator between values. The string result is with the concatenated non-NULL
1834 /// values from a group. It returns NULL if there are no non-NULL values.
1935 GroupConcat {
2036 separator : String ,
2137 distinct : DistinctOption ,
38+ order_by : Option < ( OrderType , NullOrder ) > ,
2239 } ,
2340 /// Aggregates key-value pairs into JSON object.
2441 JsonObjectAgg { allow_duplicate_keys : bool } ,
@@ -28,6 +45,7 @@ pub enum AccumulationOp {
2845 StringAgg {
2946 separator : Option < String > ,
3047 distinct : DistinctOption ,
48+ order_by : Option < ( OrderType , NullOrder ) > ,
3149 } ,
3250}
3351
@@ -41,14 +59,23 @@ impl AccumulationOp {
4159
4260 fn is_distinct ( & self ) -> bool {
4361 match self {
44- Self :: ArrayAgg { distinct }
62+ Self :: ArrayAgg { distinct, .. }
4563 | Self :: GroupConcat { distinct, .. }
4664 | Self :: StringAgg { distinct, .. } => * distinct == DistinctOption :: IsDistinct ,
4765 // uniqueness is handled is slightly differently for the JSON aggregators
4866 Self :: JsonObjectAgg { .. } => false ,
4967 }
5068 }
5169
70+ fn order_by ( & self ) -> Option < ( OrderType , NullOrder ) > {
71+ match self {
72+ AccumulationOp :: ArrayAgg { order_by, .. }
73+ | AccumulationOp :: GroupConcat { order_by, .. }
74+ | AccumulationOp :: StringAgg { order_by, .. } => * order_by,
75+ AccumulationOp :: JsonObjectAgg { .. } => None ,
76+ }
77+ }
78+
5279 fn apply_array_agg ( & self , data : & AccumulatorData ) -> ReadySetResult < DfValue > {
5380 if data. is_empty ( ) {
5481 return Ok ( DfValue :: Array ( std:: sync:: Arc :: new (
@@ -65,7 +92,7 @@ impl AccumulationOp {
6592 . iter ( )
6693 . flat_map ( |( k, & count) | {
6794 let repeat_count = if self . is_distinct ( ) { 1 } else { count } ;
68- std:: iter:: repeat_n ( k. clone ( ) , repeat_count)
95+ std:: iter:: repeat_n ( k. value . clone ( ) , repeat_count)
6996 } )
7097 . collect ( ) ;
7198
@@ -172,12 +199,54 @@ pub enum AccumulatorData {
172199 /// Note: the current implementation will order keys by their natural order (`DfValue`).
173200 /// This has the effect of sorting SQL NULLs (`DfValue::None`) to the beginning of the orderings;
174201 /// for most the functions implemented here, the upstream databases default NULLs to the end of the orderings.
175- /// When we add support for `order by`, we'll need to do some surgery here anyway, so we can
176- /// resolve the default NULL ordering then, as well. (Also, fwiw, if the user didn't specify an ordering
177- /// via an `order by` clause, they get what they get and this "NULLs ordered first" is not a bug :shrug:).
178- /// Further note: the `group_concat()` and `string_agg()` functions don't output NULLs, so really this
179- /// only affects `array_agg()` which does output NULLs.
180- DistinctOrdered ( BTreeMap < DfValue , usize > ) ,
202+ /// If the user didn't specify an ordering via an `order by` clause, they get what they get, and
203+ /// this "NULLs ordered first" behavior is not a bug :shrug:. Further note: the `group_concat()` and `string_agg()`
204+ /// functions don't output NULLs, so really this only affects `array_agg()`, which does output NULLs.
205+ DistinctOrdered ( BTreeMap < OrderableDfValue , usize > ) ,
206+ }
207+
208+ /// A wrapper for DfValue + order_by information.
209+ ///
210+ /// Wanted this to be a newtype wrapper, but life is hard.
211+ /// The comparison functions consider the order_by then the DfValue;
212+ /// `Eq` simply defers to the DfValue.
213+ #[ derive( Clone , Debug , Serialize , Deserialize ) ]
214+ pub struct OrderableDfValue {
215+ value : DfValue ,
216+ order_by : Option < ( OrderType , NullOrder ) > ,
217+ }
218+
219+ impl Ord for OrderableDfValue {
220+ fn cmp ( & self , other : & Self ) -> Ordering {
221+ if let Some ( ( order_type, null_order) ) = self . order_by {
222+ null_order
223+ . apply ( self . value . is_none ( ) , other. value . is_none ( ) )
224+ . then ( order_type. apply ( self . value . cmp ( & other. value ) ) )
225+ } else {
226+ self . value . cmp ( & other. value )
227+ }
228+ }
229+ }
230+
231+ impl PartialOrd for OrderableDfValue {
232+ fn partial_cmp ( & self , other : & Self ) -> Option < Ordering > {
233+ Some ( self . cmp ( other) )
234+ }
235+ }
236+
237+ impl PartialEq for OrderableDfValue {
238+ fn eq ( & self , other : & Self ) -> bool {
239+ self . value == other. value
240+ }
241+ }
242+
243+ impl Eq for OrderableDfValue { }
244+
245+ impl Display for OrderableDfValue {
246+ fn fmt ( & self , f : & mut Formatter < ' _ > ) -> Result {
247+ write ! ( f, "{}" , self . value) ?;
248+ Ok ( ( ) )
249+ }
181250}
182251
183252impl AccumulatorData {
@@ -189,12 +258,14 @@ impl AccumulatorData {
189258 v. push ( value) ;
190259 }
191260 AccumulatorData :: DistinctOrdered ( v) => {
192- v. entry ( value) . and_modify ( |cnt| * cnt += 1 ) . or_insert ( 1 ) ;
261+ let order_by = op. order_by ( ) ;
262+ let key = OrderableDfValue { value, order_by } ;
263+ v. entry ( key) . and_modify ( |cnt| * cnt += 1 ) . or_insert ( 1 ) ;
193264 }
194265 }
195266 }
196267
197- pub fn remove ( & mut self , value : DfValue ) -> ReadySetResult < ( ) > {
268+ pub fn remove ( & mut self , op : & AccumulationOp , value : DfValue ) -> ReadySetResult < ( ) > {
198269 match self {
199270 AccumulatorData :: Simple ( v) => {
200271 let item_pos = v
@@ -204,10 +275,14 @@ impl AccumulatorData {
204275 v. remove ( item_pos) ;
205276 }
206277 AccumulatorData :: DistinctOrdered ( v) => {
207- match v. get_mut ( & value) {
278+ let key = OrderableDfValue {
279+ value,
280+ order_by : op. order_by ( ) ,
281+ } ;
282+ match v. get_mut ( & key) {
208283 Some ( cnt) => {
209284 if * cnt == 1 {
210- v. remove ( & value ) ;
285+ v. remove ( & key ) ;
211286 } else {
212287 * cnt -= 1 ;
213288 }
@@ -237,8 +312,14 @@ impl From<&AccumulationOp> for AccumulatorData {
237312 match value {
238313 // uniqueness is handled is slightly differently for the JSON aggregators
239314 JsonObjectAgg { .. } => AccumulatorData :: Simple ( Default :: default ( ) ) ,
240- ArrayAgg { distinct } | GroupConcat { distinct, .. } | StringAgg { distinct, .. } => {
241- if * distinct == DistinctOption :: IsDistinct {
315+ ArrayAgg { distinct, order_by }
316+ | GroupConcat {
317+ distinct, order_by, ..
318+ }
319+ | StringAgg {
320+ distinct, order_by, ..
321+ } => {
322+ if * distinct == DistinctOption :: IsDistinct || order_by. is_some ( ) {
242323 AccumulatorData :: DistinctOrdered ( Default :: default ( ) )
243324 } else {
244325 AccumulatorData :: Simple ( Default :: default ( ) )
@@ -247,3 +328,94 @@ impl From<&AccumulationOp> for AccumulatorData {
247328 }
248329 }
249330}
331+
332+ fn validate_accumulator_order_by (
333+ over_col : & Column ,
334+ order_by : & Option < OrderClause > ,
335+ ) -> ReadySetResult < Option < ( OrderType , NullOrder ) > > {
336+ match order_by {
337+ Some ( o) if o. order_by . is_empty ( ) => Ok ( None ) ,
338+ Some ( o) => {
339+ if o. order_by . len ( ) > 1 {
340+ unsupported ! ( "Multiple ORDER BY expressions not supported" )
341+ }
342+
343+ let order_by_expr = & o. order_by [ 0 ] ;
344+
345+ // Ensure the ORDER BY expr is either a position indicator (which must be 1)
346+ // or match the column name in the function.
347+ match & order_by_expr. field {
348+ FieldReference :: Numeric ( ref i) => {
349+ if * i != 1 {
350+ unsupported ! ( "ORDER BY position indicator must be '1'" )
351+ }
352+ }
353+ FieldReference :: Expr ( e) => {
354+ let order_by_col = match e {
355+ Expr :: Column ( c) => c,
356+ _ => unsupported ! ( "ORDER BY expr must be a column" ) ,
357+ } ;
358+
359+ if over_col != order_by_col {
360+ unsupported ! ( "ORDER BY column must equal the function's column: {:?}, order_by expr: {:?}" , over_col, order_by_col)
361+ }
362+ }
363+ } ;
364+ let order_type = order_by_expr. order_type . unwrap_or_default ( ) ;
365+ Ok ( Some ( ( order_type, order_by_expr. null_order ) ) )
366+ }
367+ None => Ok ( None ) ,
368+ }
369+ }
370+ impl TryFrom < & FunctionExpr > for AccumulationOp {
371+ type Error = ReadySetError ;
372+
373+ fn try_from ( fn_expr : & FunctionExpr ) -> ReadySetResult < AccumulationOp > {
374+ // Enforce only column references, not any random expr
375+ let over_col = |expr : & Expr | -> ReadySetResult < Column > {
376+ match * expr {
377+ Expr :: Column ( ref c) => Ok ( c. clone ( ) ) ,
378+ _ => unsupported ! ( "expr must be a column" ) ,
379+ }
380+ } ;
381+
382+ let op = match fn_expr {
383+ FunctionExpr :: ArrayAgg {
384+ expr,
385+ distinct,
386+ order_by,
387+ } => AccumulationOp :: ArrayAgg {
388+ distinct : * distinct,
389+ order_by : validate_accumulator_order_by ( & over_col ( expr) ?, order_by) ?,
390+ } ,
391+ FunctionExpr :: GroupConcat {
392+ expr,
393+ separator,
394+ distinct,
395+ order_by,
396+ } => AccumulationOp :: GroupConcat {
397+ separator : separator. clone ( ) . unwrap_or_else ( || "," . to_owned ( ) ) ,
398+ distinct : * distinct,
399+ order_by : validate_accumulator_order_by ( & over_col ( expr) ?, order_by) ?,
400+ } ,
401+ FunctionExpr :: JsonObjectAgg {
402+ allow_duplicate_keys,
403+ ..
404+ } => AccumulationOp :: JsonObjectAgg {
405+ allow_duplicate_keys : * allow_duplicate_keys,
406+ } ,
407+ FunctionExpr :: StringAgg {
408+ expr,
409+ separator,
410+ distinct,
411+ order_by,
412+ } => AccumulationOp :: StringAgg {
413+ separator : separator. clone ( ) ,
414+ distinct : * distinct,
415+ order_by : validate_accumulator_order_by ( & over_col ( expr) ?, order_by) ?,
416+ } ,
417+ _ => internal ! ( "Unsupported accumulation for function expr: {:?}" , fn_expr) ,
418+ } ;
419+ Ok ( op)
420+ }
421+ }
0 commit comments