@@ -2,6 +2,7 @@ use std::collections::hash_map::DefaultHasher;
22use std:: collections:: HashMap ;
33use std:: hash:: { Hash , Hasher } ;
44
5+ use dataflow_expression:: eval:: json;
56use readyset_data:: dialect:: SqlEngine ;
67use readyset_data:: { Collation , DfType , Dialect } ;
78use readyset_errors:: { invariant, ReadySetResult } ;
@@ -22,7 +23,13 @@ pub enum Aggregation {
2223 /// Average the value of the `over` column. Maintains count and sum in HashMap
2324 Avg ,
2425 /// Concatenates using the given separator between values.
25- GroupConcat { separator : String } ,
26+ GroupConcat {
27+ separator : String ,
28+ } ,
29+
30+ JsonObjectAgg {
31+ allow_duplicate_keys : bool ,
32+ } ,
2633}
2734
2835impl Aggregation {
@@ -71,6 +78,7 @@ impl Aggregation {
7178 }
7279 }
7380 Aggregation :: GroupConcat { .. } => DfType :: Text ( /* TODO */ Collation :: default ( ) ) ,
81+ Aggregation :: JsonObjectAgg { .. } => DfType :: Text ( Collation :: default ( ) ) ,
7482 } ;
7583
7684 Ok ( GroupedOperator :: new (
@@ -150,6 +158,12 @@ impl AverageDataPair {
150158/// Auxiliary State for an Aggregator node, which is owned by a Domain
151159pub struct AggregatorState {
152160 count_sum_map : HashMap < GroupHash , AverageDataPair > ,
161+
162+ // Store all `json_object_agg` keys and values in vecs and compute the json from them
163+ // on-the-fly. This allows for easier handling of distinct (jsonb) behaviour,
164+ // especially with deletions.
165+ json_agg_keys : Vec < DfValue > ,
166+ json_agg_vals : Vec < DfValue > ,
153167}
154168
155169impl Aggregator {
@@ -222,10 +236,12 @@ impl GroupedOperation for Aggregator {
222236 }
223237 } ;
224238
225- let count_sum_map = match auxiliary_node_state {
226- Some ( AuxiliaryNodeState :: Aggregation ( ref mut aggregator_state) ) => {
227- & mut aggregator_state. count_sum_map
228- }
239+ let ( count_sum_map, json_agg_keys, json_agg_vals) = match auxiliary_node_state {
240+ Some ( AuxiliaryNodeState :: Aggregation ( ref mut aggregator_state) ) => (
241+ & mut aggregator_state. count_sum_map ,
242+ & mut aggregator_state. json_agg_keys ,
243+ & mut aggregator_state. json_agg_vals ,
244+ ) ,
229245 Some ( _) => internal ! ( "Incorrect auxiliary state for Aggregation node" ) ,
230246 None => internal ! ( "Missing auxiliary state for Aggregation node" ) ,
231247 } ;
@@ -240,6 +256,42 @@ impl GroupedOperation for Aggregator {
240256 . apply_diff ( diff)
241257 } ;
242258
259+ let mut apply_json_object_agg =
260+ |_curr, diff : Self :: Diff , allow_dups| -> ReadySetResult < DfValue > {
261+ let ( key, value) = diff
262+ . value
263+ . to_json ( ) ?
264+ . as_object ( )
265+ . ok_or_else ( || {
266+ internal_err ! ( "json_object_agg: json_object value is not an object" )
267+ } ) ?
268+ . iter ( )
269+ . next ( )
270+ . ok_or_else ( || internal_err ! ( "json_object_agg: json_object is empty" ) )
271+ . map ( |( k, v) | ( DfValue :: from ( k. as_str ( ) ) , DfValue :: from ( v) ) ) ?;
272+
273+ if diff. positive {
274+ json_agg_keys. push ( key) ;
275+ json_agg_vals. push ( value. clone ( ) ) ;
276+ } else if let Some ( pos) = json_agg_keys
277+ . iter ( )
278+ . zip ( json_agg_vals. iter_mut ( ) )
279+ . position ( |( k, v) | k == & key && v == & value)
280+ {
281+ json_agg_keys. remove ( pos) ;
282+ json_agg_vals. remove ( pos) ;
283+ } else {
284+ internal ! ( "json_object_agg: diff removed a non-existant key-value pair" )
285+ }
286+
287+ // TODO: Indent the output
288+ json:: json_object_from_keys_and_values (
289+ & json_agg_keys. clone ( ) . into ( ) ,
290+ & json_agg_vals. clone ( ) . into ( ) ,
291+ allow_dups,
292+ )
293+ } ;
294+
243295 let apply_diff =
244296 |curr : ReadySetResult < DfValue > , diff : Self :: Diff | -> ReadySetResult < DfValue > {
245297 if diff. value . is_none ( ) {
@@ -253,6 +305,9 @@ impl GroupedOperation for Aggregator {
253305 Aggregation :: GroupConcat { separator : _ } => internal ! (
254306 "GroupConcats are separate from the other aggregations in the dataflow."
255307 ) ,
308+ Aggregation :: JsonObjectAgg {
309+ allow_duplicate_keys,
310+ } => apply_json_object_agg ( curr?, diff, allow_duplicate_keys) ,
256311 }
257312 } ;
258313
@@ -270,6 +325,15 @@ impl GroupedOperation for Aggregator {
270325 Aggregation :: GroupConcat { separator : ref s } => {
271326 format ! ( "||({})" , s)
272327 }
328+ Aggregation :: JsonObjectAgg {
329+ allow_duplicate_keys,
330+ } => {
331+ if allow_duplicate_keys {
332+ "JsonObjectAgg" . to_owned ( )
333+ } else {
334+ "JsonbObjectAgg" . to_owned ( )
335+ }
336+ }
273337 } ;
274338 }
275339
@@ -278,6 +342,15 @@ impl GroupedOperation for Aggregator {
278342 Aggregation :: Sum => format ! ( "𝛴({})" , self . over) ,
279343 Aggregation :: Avg => format ! ( "Avg({})" , self . over) ,
280344 Aggregation :: GroupConcat { separator : ref s } => format ! ( "||({}, {})" , s, self . over) ,
345+ Aggregation :: JsonObjectAgg {
346+ allow_duplicate_keys,
347+ } => {
348+ if allow_duplicate_keys {
349+ format ! ( "JsonObjectAgg({})" , self . over)
350+ } else {
351+ format ! ( "JsonbObjectAgg({})" , self . over)
352+ }
353+ }
281354 } ;
282355 let group_cols = self
283356 . group
0 commit comments