From e66b1ebadc096ef966df34202864c6168851df22 Mon Sep 17 00:00:00 2001
From: Frank McSherry
Date: Tue, 24 Mar 2020 14:30:55 -0400
Subject: [PATCH 1/5] initial commit

---
 examples/upsert.rs              |  51 +++++++
 src/operators/arrange/mod.rs    |   2 +
 src/operators/arrange/upsert.rs | 251 ++++++++++++++++++++++++++++++++
 3 files changed, 304 insertions(+)
 create mode 100644 examples/upsert.rs
 create mode 100644 src/operators/arrange/upsert.rs

diff --git a/examples/upsert.rs b/examples/upsert.rs
new file mode 100644
index 0000000000..b1130acaf2
--- /dev/null
+++ b/examples/upsert.rs
@@ -0,0 +1,51 @@
+extern crate timely;
+extern crate differential_dataflow;
+
+fn main() {
+
+    // define a new timely dataflow computation.
+    timely::execute_from_args(std::env::args().skip(1), move |worker| {
+
+        type Key = String;
+        type Val = String;
+
+        let mut input = timely::dataflow::InputHandle::<usize, (Key, Option<Val>, usize)>::new();
+        let mut probe = timely::dataflow::ProbeHandle::new();
+
+        // create a dataflow managing an ever-changing collection of keyed values.
+        worker.dataflow::<usize,_,_>(|scope| {
+
+            use timely::dataflow::operators::Input;
+            use differential_dataflow::trace::implementations::ord::OrdValSpine;
+            use differential_dataflow::operators::arrange::upsert;
+
+            let stream = scope.input_from(&mut input);
+            let arranged = upsert::arrange_from_upsert::<_, OrdValSpine<Key, Val, usize, isize>>(&stream, &"test");
+
+            arranged
+                .as_collection(|k,v| (k.clone(), v.clone()))
+                .inspect(|x| println!("Observed: {:?}", x))
+                .probe_with(&mut probe);
+        });
+
+        input.send(("frank".to_string(), Some("mcsherry".to_string()), 3));
+        input.advance_to(4);
+        while probe.less_than(input.time()) { worker.step(); }
+
+        input.send(("frank".to_string(), Some("zappa".to_string()), 4));
+        input.advance_to(5);
+        while probe.less_than(input.time()) { worker.step(); }
+
+        input.send(("frank".to_string(), None, 5));
+        input.advance_to(9);
+        while probe.less_than(input.time()) { worker.step(); }
+
+        input.send(("frank".to_string(), Some("oz".to_string()), 9));
+        input.advance_to(10);
+        while probe.less_than(input.time()) { worker.step(); }
+
+        input.send(("frank".to_string(), None, 15));
+        input.close();
+
+    }).unwrap();
+}
diff --git a/src/operators/arrange/mod.rs b/src/operators/arrange/mod.rs
index caac946a16..c937e938e3 100644
--- a/src/operators/arrange/mod.rs
+++ b/src/operators/arrange/mod.rs
@@ -66,6 +66,8 @@ pub mod writer;
 pub mod agent;
 pub mod arrangement;
 
+pub mod upsert;
+
 pub use self::writer::TraceWriter;
 pub use self::agent::{TraceAgent, ShutdownButton};
 
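For intuition, the example above should print roughly the following (a hand-derived sketch of the `inspect` output, assuming the upsert semantics described below; exact batching and interleaving can vary across runs and workers):

```text
Observed: (("frank", "mcsherry"), 3, 1)
Observed: (("frank", "mcsherry"), 4, -1)
Observed: (("frank", "zappa"), 4, 1)
Observed: (("frank", "zappa"), 5, -1)
Observed: (("frank", "oz"), 9, 1)
Observed: (("frank", "oz"), 15, -1)
```

Each record is `(data, time, diff)`: an upsert with a new value retracts the prior value (if any) and installs the new one, and a `None` upsert only retracts.
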
diff --git a/src/operators/arrange/upsert.rs b/src/operators/arrange/upsert.rs
new file mode 100644
index 0000000000..fac40f4147
--- /dev/null
+++ b/src/operators/arrange/upsert.rs
@@ -0,0 +1,251 @@
+//! Support for upsert-based collections.
+
+use std::collections::{BinaryHeap, HashMap};
+
+use timely::order::{PartialOrder, TotalOrder};
+use timely::dataflow::{Scope, Stream};
+use timely::dataflow::operators::generic::Operator;
+use timely::dataflow::channels::pact::Exchange;
+use timely::progress::Timestamp;
+use timely::progress::Antichain;
+use timely::dataflow::operators::Capability;
+
+use timely_sort::Unsigned;
+
+use ::{ExchangeData, Hashable};
+use lattice::Lattice;
+use trace::{Trace, TraceReader, Batch, Cursor};
+
+use trace::Builder;
+
+use operators::arrange::arrangement::Arranged;
+
+use super::TraceAgent;
+
+/// Arrange data from a stream of keyed upserts.
+///
+/// The input should be a stream of timestamped pairs of Key and Option<Val>.
+/// The contents of the collection are defined key-by-key, where each optional
+/// value in sequence either replaces or removes the existing value, should it
+/// exist.
+///
+/// This method is only implemented for totally ordered times, as we do not yet
+/// understand what a "sequence" of upserts would mean for partially ordered
+/// timestamps.
+pub fn arrange_from_upsert<G, Tr>(stream: &Stream<G, (Tr::Key, Option<Tr::Val>, G::Timestamp)>, name: &str) -> Arranged<G, TraceAgent<Tr>>
+where
+    G: Scope,
+    G::Timestamp: Lattice+Ord+TotalOrder+ExchangeData,
+    Tr::Key: ExchangeData+Hashable+std::hash::Hash,
+    Tr::Val: ExchangeData,
+    Tr: Trace+TraceReader<Time=G::Timestamp, R=isize>+'static,
+    Tr::Batch: Batch<Tr::Key, Tr::Val, G::Timestamp, isize>,
+    Tr::Cursor: Cursor<Tr::Key, Tr::Val, G::Timestamp, isize>,
+{
+    let mut reader: Option<TraceAgent<Tr>> = None;
+
+    // fabricate a data-parallel operator using the `unary_frontier` pattern.
+    let stream = {
+
+        let reader = &mut reader;
+
+        // Route each upsert by the hash of its key, so each worker owns a disjoint set of keys.
+        let exchange = Exchange::new(move |update: &(Tr::Key, Option<Tr::Val>, G::Timestamp)| (update.0).hashed().as_u64());
+
+        stream.unary_frontier(exchange, name, move |_capability, info| {
+
+            // Acquire a logger for arrange events.
+            let logger = {
+                let scope = stream.scope();
+                let register = scope.log_register();
+                register.get::<::logging::DifferentialEvent>("differential/arrange")
+            };
+
+            // Capabilities for the lower envelope of updates in `batcher`.
+            let mut capabilities = Antichain::<Capability<G::Timestamp>>::new();
+
+            let mut buffer = Vec::new();
+
+            let (activator, effort) =
+            if let Ok(text) = ::std::env::var("DIFFERENTIAL_EAGER_MERGE") {
+                let effort = text.parse::<isize>().expect("DIFFERENTIAL_EAGER_MERGE must be set to an integer");
+                (Some(stream.scope().activator_for(&info.address[..])), Some(effort))
+            }
+            else {
+                (None, None)
+            };
+
+            let empty_trace = Tr::new(info.clone(), logger.clone(), activator);
+            let (mut reader_local, mut writer) = TraceAgent::new(empty_trace, info, logger);
+
+            *reader = Some(reader_local.clone());
+
+            // Initialize to the minimal input frontier.
+            // Tracks the upper bound of minted batches, used to populate the lower bound of new batches.
+            let mut input_frontier = Antichain::from_elem(<G::Timestamp as Timestamp>::minimum());
+
+            // For stashing input upserts, ordered primarily by time (then by key, but we'll
+            // need to re-sort them anyhow).
+            let mut priority_queue = BinaryHeap::<std::cmp::Reverse<(G::Timestamp, Tr::Key, Option<Tr::Val>)>>::new();
+
+            move |input, output| {
+
+                // As we receive data, we need to (i) stash the data and (ii) keep *enough* capabilities.
+                // We don't have to keep all capabilities, but we need to be able to form output messages
+                // when we realize that time intervals are complete.
+
+                input.for_each(|cap, data| {
+                    capabilities.insert(cap.retain());
+                    data.swap(&mut buffer);
+                    for (key, val, time) in buffer.drain(..) {
+                        priority_queue.push(std::cmp::Reverse((time, key, val)))
+                    }
+                });
+
+                // The frontier may have advanced by multiple elements, which is an issue because
+                // timely dataflow currently only allows one capability per message. This means we
+                // must pretend to process the frontier advances one element at a time, batching
+                // and sending smaller bites than we might have otherwise done.
+
+                // Assert that the frontier never regresses.
+                assert!(input.frontier().frontier().iter().all(|t1| input_frontier.elements().iter().any(|t2: &G::Timestamp| t2.less_equal(t1))));
+
+                // Test to see if strict progress has occurred (any of the old frontier less equal
+                // to the new frontier).
+                let progress = input_frontier.elements().iter().any(|t2| !input.frontier().less_equal(t2));
+
+                if progress {
+
+                    // There are two cases to handle with some care:
+                    //
+                    // 1. If any held capabilities are not in advance of the new input frontier,
+                    //    we must carve out updates now in advance of the new input frontier and
+                    //    transmit them as batches, which requires appropriate *single* capabilities;
+                    //    Until timely dataflow supports multiple capabilities on messages, at least.
+                    //
+                    // 2. If there are no held capabilities in advance of the new input frontier,
+                    //    then there are no updates not in advance of the new input frontier and
+                    //    we can simply create an empty input batch with the new upper frontier
+                    //    and feed this to the trace agent (but not along the timely output).
+
+                    // If there is at least one capability not in advance of the input frontier ...
+                    if capabilities.elements().iter().any(|c| !input.frontier().less_equal(c.time())) {
+
+                        let mut upper = Antichain::new();   // re-used allocation for sealing batches.
+
+                        // For each capability not in advance of the input frontier ...
+                        for (index, capability) in capabilities.elements().iter().enumerate() {
+
+                            if !input.frontier().less_equal(capability.time()) {
+
+                                // Assemble the upper bound on times we can commit with this capability.
+                                // We must respect the input frontier, and *subsequent* capabilities, as
+                                // we are pretending to retire the capability changes one by one.
+                                upper.clear();
+                                for time in input.frontier().frontier().iter() {
+                                    upper.insert(time.clone());
+                                }
+                                for other_capability in &capabilities.elements()[(index + 1) .. ] {
+                                    upper.insert(other_capability.time().clone());
+                                }
+
+                                // START NEW CODE
+                                // Extract upserts available to process as of this `upper`.
+                                let mut to_process = HashMap::new();
+                                while priority_queue.peek().map(|std::cmp::Reverse((t,_k,_v))| !upper.less_equal(t)).unwrap_or(false) {
+                                    let std::cmp::Reverse((time, key, val)) = priority_queue.pop().expect("Priority queue just ensured non-empty");
+                                    to_process.entry(key).or_insert(Vec::new()).push((time, val));
+                                }
+
+                                let mut builder = <Tr::Batch as Batch<Tr::Key,Tr::Val,G::Timestamp,isize>>::Builder::new();
+                                let (mut trace_cursor, trace_storage) = reader_local.cursor();
+                                for (key, mut list) in to_process.drain() {
+
+                                    let mut prev_value: Option<Tr::Val> = None;
+
+                                    // Attempt to find the key in the trace.
+                                    trace_cursor.seek_key(&trace_storage, &key);
+                                    if trace_cursor.get_key(&trace_storage) == Some(&key) {
+                                        // Determine the prior value associated with the key.
+                                        while let Some(val) = trace_cursor.get_val(&trace_storage) {
+                                            let mut count = 0;
+                                            trace_cursor.map_times(&trace_storage, |_time, diff| count += *diff);
+                                            assert!(count == 0 || count == 1);
+                                            if count == 1 {
+                                                assert!(prev_value.is_none());
+                                                prev_value = Some(val.clone());
+                                            }
+                                            trace_cursor.step_val(&trace_storage);
+                                        }
+                                        trace_cursor.step_key(&trace_storage);
+                                    }
+
+                                    list.sort();
+                                    // Two updates at the exact same time should produce only one actual
+                                    // change, ideally to a deterministically chosen value.
+                                    let mut cursor = 1;
+                                    while cursor < list.len() {
+                                        if list[cursor-1].0 == list[cursor].0 {
+                                            list.remove(cursor);
+                                        }
+                                        else {
+                                            cursor += 1;
+                                        }
+                                    }
+                                    // Process distinct times
+                                    for (time, next) in list {
+                                        if let Some(prev) = prev_value {
+                                            builder.push((key.clone(), prev, time.clone(), -1));
+                                        }
+                                        if let Some(next) = next.as_ref() {
+                                            builder.push((key.clone(), next.clone(), time.clone(), 1));
+                                        }
+                                        prev_value = next;
+                                    }
+                                }
+                                let batch = builder.done(input_frontier.elements(), upper.elements(), &[G::Timestamp::minimum()]);
+                                input_frontier.clone_from(&upper);
+
+                                // END NEW CODE
+                                writer.insert(batch.clone(), Some(capability.time().clone()));
+
+                                // send the batch to downstream consumers, empty or not.
+                                output.session(&capabilities.elements()[index]).give(batch);
+                            }
+                        }
+
+                        // Having extracted and sent batches between each capability and the input frontier,
+                        // we should downgrade all capabilities to match the batcher's lower update frontier.
+                        // This may involve discarding capabilities, which is fine as any new updates arrive
+                        // in messages with new capabilities.
+
+                        let mut new_capabilities = Antichain::new();
+                        if let Some(std::cmp::Reverse((time, _, _))) = priority_queue.peek() {
+                            if let Some(capability) = capabilities.elements().iter().find(|c| c.time().less_equal(time)) {
+                                new_capabilities.insert(capability.delayed(time));
+                            }
+                            else {
+                                panic!("failed to find capability");
+                            }
+                        }
+
+                        capabilities = new_capabilities;
+                    }
+                    else {
+                        // Announce progress updates, even without data.
+                        writer.seal(&input.frontier().frontier()[..]);
+                    }
+
+                    input_frontier.clear();
+                    input_frontier.extend(input.frontier().frontier().iter().cloned());
+                }
+
+                if let Some(mut fuel) = effort.clone() {
+                    writer.exert(&mut fuel);
+                }
+            }
+        })
+    };
+
+    Arranged { stream: stream, trace: reader.unwrap() }
+
+}
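The heart of the operator is the per-key replay above: each accepted upsert retracts the key's previous value, if any, and installs the new one. Below is a standalone sketch of that logic (hypothetical names, not code from this patch; like this first patch, it does not yet suppress repeated values):

```rust
/// Given a key's prior value and its time-ordered upserts, produce the
/// (value, time, diff) updates a differential collection must contain.
fn upsert_updates<V: Clone>(
    mut prev: Option<V>,
    upserts: Vec<(usize, Option<V>)>,   // (time, Some(new value) or None for removal)
) -> Vec<(V, usize, isize)> {
    let mut updates = Vec::new();
    for (time, next) in upserts {
        if let Some(p) = prev.take() {
            updates.push((p, time, -1));    // retract the prior value
        }
        if let Some(n) = next.clone() {
            updates.push((n, time, 1));     // install the new value
        }
        prev = next;
    }
    updates
}

fn main() {
    let upserts = vec![(3, Some("a")), (4, Some("b")), (5, None), (9, Some("c"))];
    for (val, time, diff) in upsert_updates(None, upserts) {
        println!("({:?}, {}, {:+})", val, time, diff);
    }
}
```

This mirrors the `prev_value` loop in the operator; the real code additionally consults the arrangement's own trace to recover `prev_value` across operator invocations.
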
From 221a41a7403d3fa623b9bacbe7e2bc33d6bafaab Mon Sep 17 00:00:00 2001
From: Frank McSherry
Date: Tue, 24 Mar 2020 16:51:04 -0400
Subject: [PATCH 2/5] tidy; downgrade capabilities

---
 src/operators/arrange/upsert.rs | 87 +++++++++++----------------------
 1 file changed, 29 insertions(+), 58 deletions(-)

diff --git a/src/operators/arrange/upsert.rs b/src/operators/arrange/upsert.rs
index fac40f4147..bef8e4dd5d 100644
--- a/src/operators/arrange/upsert.rs
+++ b/src/operators/arrange/upsert.rs
@@ -60,11 +60,7 @@
                 register.get::<::logging::DifferentialEvent>("differential/arrange")
             };
 
-            // Capabilities for the lower envelope of updates in `batcher`.
-            let mut capabilities = Antichain::<Capability<G::Timestamp>>::new();
-
-            let mut buffer = Vec::new();
-
+            // Establish compaction effort to apply even without updates.
             let (activator, effort) =
             if let Ok(text) = ::std::env::var("DIFFERENTIAL_EAGER_MERGE") {
                 let effort = text.parse::<isize>().expect("DIFFERENTIAL_EAGER_MERGE must be set to an integer");
@@ -74,25 +70,24 @@
                 (None, None)
             };
 
+            // Tracks the lower envelope of times in `priority_queue`.
+            let mut capabilities = Antichain::<Capability<G::Timestamp>>::new();
+            let mut buffer = Vec::new();
+
+            // Form the trace we will both use internally and publish.
             let empty_trace = Tr::new(info.clone(), logger.clone(), activator);
             let (mut reader_local, mut writer) = TraceAgent::new(empty_trace, info, logger);
-
+            // Capture the reader outside the builder scope.
             *reader = Some(reader_local.clone());
 
-            // Initialize to the minimal input frontier.
-            // Tracks the upper bound of minted batches, used to populate the lower bound of new batches.
+            // Tracks the input frontier, used to populate the lower bound of new batches.
             let mut input_frontier = Antichain::from_elem(<G::Timestamp as Timestamp>::minimum());
 
-            // For stashing input upserts, ordered primarily by time (then by key, but we'll
-            // need to re-sort them anyhow).
+            // For stashing input upserts, ordered increasing by time (`BinaryHeap` is a max-heap).
             let mut priority_queue = BinaryHeap::<std::cmp::Reverse<(G::Timestamp, Tr::Key, Option<Tr::Val>)>>::new();
 
             move |input, output| {
 
-                // As we receive data, we need to (i) stash the data and (ii) keep *enough* capabilities.
-                // We don't have to keep all capabilities, but we need to be able to form output messages
-                // when we realize that time intervals are complete.
-
+                // Stash capabilities and associated data (ordered by time).
                 input.for_each(|cap, data| {
                     capabilities.insert(cap.retain());
                     data.swap(&mut buffer);
@@ -101,32 +96,12 @@
                     }
                 });
 
-                // The frontier may have advanced by multiple elements, which is an issue because
-                // timely dataflow currently only allows one capability per message. This means we
-                // must pretend to process the frontier advances one element at a time, batching
-                // and sending smaller bites than we might have otherwise done.
-
-                // Assert that the frontier never regresses.
-                assert!(input.frontier().frontier().iter().all(|t1| input_frontier.elements().iter().any(|t2: &G::Timestamp| t2.less_equal(t1))));
-
-                // Test to see if strict progress has occurred (any of the old frontier less equal
-                // to the new frontier).
+                // Test to see if strict progress has occurred, which happens whenever any element of
+                // the old frontier is not greater or equal to the new frontier. It is only in this
+                // case that we have any data processing to do.
                 let progress = input_frontier.elements().iter().any(|t2| !input.frontier().less_equal(t2));
-
                 if progress {
 
-                    // There are two cases to handle with some care:
-                    //
-                    // 1. If any held capabilities are not in advance of the new input frontier,
-                    //    we must carve out updates now in advance of the new input frontier and
-                    //    transmit them as batches, which requires appropriate *single* capabilities;
-                    //    Until timely dataflow supports multiple capabilities on messages, at least.
-                    //
-                    // 2. If there are no held capabilities in advance of the new input frontier,
-                    //    then there are no updates not in advance of the new input frontier and
-                    //    we can simply create an empty input batch with the new upper frontier
-                    //    and feed this to the trace agent (but not along the timely output).
-
                     // If there is at least one capability not in advance of the input frontier ...
                     if capabilities.elements().iter().any(|c| !input.frontier().less_equal(c.time())) {
 
@@ -148,7 +123,6 @@
                                 upper.insert(other_capability.time().clone());
                             }
 
-                                // START NEW CODE
                                 // Extract upserts available to process as of this `upper`.
                                 let mut to_process = HashMap::new();
                                 while priority_queue.peek().map(|std::cmp::Reverse((t,_k,_v))| !upper.less_equal(t)).unwrap_or(false) {
@@ -160,6 +134,7 @@
                                 let mut builder = <Tr::Batch as Batch<Tr::Key,Tr::Val,G::Timestamp,isize>>::Builder::new();
                                 let (mut trace_cursor, trace_storage) = reader_local.cursor();
                                 for (key, mut list) in to_process.drain() {
 
+                                    // The prior value associated with the key.
                                     let mut prev_value: Option<Tr::Val> = None;
 
                                     // Attempt to find the key in the trace.
@@ -179,36 +154,27 @@
                                         trace_cursor.step_key(&trace_storage);
                                     }
 
+                                    // Sort the list of upserts to `key` by their time, suppress multiple updates.
                                     list.sort();
-                                    // Two updates at the exact same time should produce only one actual
-                                    // change, ideally to a deterministically chosen value.
-                                    let mut cursor = 1;
-                                    while cursor < list.len() {
-                                        if list[cursor-1].0 == list[cursor].0 {
-                                            list.remove(cursor);
-                                        }
-                                        else {
-                                            cursor += 1;
-                                        }
-                                    }
+                                    list.dedup_by(|(t1,_), (t2,_)| t1 == t2);
                                     // Process distinct times
                                     for (time, next) in list {
-                                        if let Some(prev) = prev_value {
-                                            builder.push((key.clone(), prev, time.clone(), -1));
-                                        }
-                                        if let Some(next) = next.as_ref() {
-                                            builder.push((key.clone(), next.clone(), time.clone(), 1));
+                                        if prev_value != next {
+                                            if let Some(prev) = prev_value {
+                                                builder.push((key.clone(), prev, time.clone(), -1));
+                                            }
+                                            if let Some(next) = next.as_ref() {
+                                                builder.push((key.clone(), next.clone(), time.clone(), 1));
+                                            }
+                                            prev_value = next;
                                         }
-                                        prev_value = next;
                                     }
                                 }
                                 let batch = builder.done(input_frontier.elements(), upper.elements(), &[G::Timestamp::minimum()]);
                                 input_frontier.clone_from(&upper);
 
-                                // END NEW CODE
+                                // Communicate `batch` to the arrangement and the stream.
                                 writer.insert(batch.clone(), Some(capability.time().clone()));
-
-                                // send the batch to downstream consumers, empty or not.
                                 output.session(&capabilities.elements()[index]).give(batch);
                             }
                         }
@@ -235,8 +201,13 @@
                         writer.seal(&input.frontier().frontier()[..]);
                     }
 
+                    // Update our view of the input frontier.
                     input_frontier.clear();
                     input_frontier.extend(input.frontier().frontier().iter().cloned());
+
+                    // Downgrade capabilities for `reader_local`.
+                    reader_local.advance_by(input_frontier.elements());
+                    reader_local.distinguish_since(input_frontier.elements());
                 }
 
                 if let Some(mut fuel) = effort.clone() {
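The `sort` plus `dedup_by` in this patch deserves a note: `Vec::dedup_by` keeps the *first* element of each run of same-time entries, so after sorting by `(time, value)` the smallest value wins a tie, and `None` sorts before any `Some`. A standalone illustration (values and types made up for the demonstration):

```rust
fn main() {
    // Three upserts at time 4 tie; one upsert at time 3 does not.
    let mut list = vec![(4, Some("b")), (4, None), (3, Some("a")), (4, Some("a"))];
    list.sort();
    // Keeps the first entry of each run of equal times: the smallest value.
    list.dedup_by(|(t1, _), (t2, _)| t1 == t2);
    assert_eq!(list, vec![(3, Some("a")), (4, None)]);
}
```

The next patch wraps values in `std::cmp::Reverse` before sorting, so that the *greatest* value wins ties instead, matching the tie-breaking rule its documentation describes.
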
From c6c05428fee4173f0a3b2d5f2775f24de15308cf Mon Sep 17 00:00:00 2001
From: Frank McSherry
Date: Thu, 26 Mar 2020 09:38:00 -0400
Subject: [PATCH 3/5] move example to doctext

---
 src/operators/arrange/upsert.rs | 116 ++++++++++++++++++++++++++++++--
 1 file changed, 112 insertions(+), 4 deletions(-)

diff --git a/src/operators/arrange/upsert.rs b/src/operators/arrange/upsert.rs
index bef8e4dd5d..a779fbd98a 100644
--- a/src/operators/arrange/upsert.rs
+++ b/src/operators/arrange/upsert.rs
@@ -1,4 +1,109 @@
-//! Support for upsert-based collections.
+//! Support for forming collections from streams of upserts.
+//!
+//! Upserts are sequences of keyed optional values, and they define a collection of
+//! the pairs of keys and each key's most recent value, if it is present. Each element
+//! in the sequence effectively overwrites the previous value at the key, if present,
+//! and if the value is not present it uninstalls the key.
+//!
+//! Upserts are non-trivial because they do not themselves describe the deletions that
+//! the `Collection` update stream must present. However, if one creates an `Arrangement`
+//! then this state provides sufficient information. The arrangement will continue to
+//! exist until the input or dataflow shuts down, even if it is dropped, as the upsert
+//! operator itself needs access to its accumulated state.
+//!
+//! # Notes
+//!
+//! Upserts currently only work with totally ordered timestamps.
+//!
+//! In the case of ties in timestamps (concurrent updates to the same key) they choose
+//! the *greatest* value according to `Option<Val>` ordering, which will prefer a value
+//! to `None` and choose the greatest value (informally, as if applied in order of value).
+//!
+//! If the same value is repeated, no change will occur in the output. That may make this
+//! operator effective at determining the difference between collections of keyed values,
+//! but note that it will not notice the absence of keys in a collection.
+//!
+//! To effect "filtering" in a way that reduces the arrangement footprint, apply a map to
+//! the input stream, mapping values that fail the predicate to `None` values, like so:
+//!
+//! ```ignore
+//! // Dropped values should be retained as "uninstall" upserts.
+//! upserts.map(|(key,opt_val)| (key, opt_val.filter(predicate)))
+//! ```
+//!
+//! # Example
+//!
+//! ```rust
+//! extern crate timely;
+//! extern crate differential_dataflow;
+//!
+//! fn main() {
+//!
+//!     // define a new timely dataflow computation.
+//!     timely::execute_from_args(std::env::args().skip(1), move |worker| {
+//!
+//!         type Key = String;
+//!         type Val = String;
+//!
+//!         let mut input = timely::dataflow::InputHandle::new();
+//!         let mut probe = timely::dataflow::ProbeHandle::new();
+//!
+//!         // Create a dataflow demonstrating upserts.
+//!         //
+//!         // Upserts are a sequence of records (key, option<val>) where the intended
+//!         // value associated with a key is the most recent value, and if that is a
+//!         // `none` then the key is removed (until a new value shows up).
+//!         //
+//!         // The challenge with upserts is that the value to *retract* isn't supplied
+//!         // as part of the input stream. We have to determine what it should be!
+//!
+//!         worker.dataflow(|scope| {
+//!
+//!             use timely::dataflow::operators::Input;
+//!             use differential_dataflow::trace::implementations::ord::OrdValSpine;
+//!             use differential_dataflow::operators::arrange::upsert;
+//!
+//!             let stream = scope.input_from(&mut input);
+//!             let arranged = upsert::arrange_from_upsert::<_, OrdValSpine<Key, Val, usize, isize>>(&stream, &"test");
+//!
+//!             arranged
+//!                 .as_collection(|k,v| (k.clone(), v.clone()))
+//!                 .inspect(|x| println!("Observed: {:?}", x))
+//!                 .probe_with(&mut probe);
+//!         });
+//!
+//!         // Introduce the key, with a specific value.
+//!         input.send(("frank".to_string(), Some("mcsherry".to_string()), 3));
+//!         input.advance_to(4);
+//!         while probe.less_than(input.time()) { worker.step(); }
+//!
+//!         // Change the value to a different value.
+//!         input.send(("frank".to_string(), Some("zappa".to_string()), 4));
+//!         input.advance_to(5);
+//!         while probe.less_than(input.time()) { worker.step(); }
+//!
+//!         // Remove the key and its value.
+//!         input.send(("frank".to_string(), None, 5));
+//!         input.advance_to(9);
+//!         while probe.less_than(input.time()) { worker.step(); }
+//!
+//!         // Introduce a new totally different value.
+//!         input.send(("frank".to_string(), Some("oz".to_string()), 9));
+//!         input.advance_to(10);
+//!         while probe.less_than(input.time()) { worker.step(); }
+//!
+//!         // Repeat the value, which should produce no output.
+//!         input.send(("frank".to_string(), Some("oz".to_string()), 11));
+//!         input.advance_to(12);
+//!         while probe.less_than(input.time()) { worker.step(); }
+//!
+//!         // Remove the key and value.
+//!         input.send(("frank".to_string(), None, 15));
+//!         input.close();
+//!
+//!     }).unwrap();
+//! }
+//! ```
 
 use std::collections::{BinaryHeap, HashMap};
 
@@ -32,7 +137,10 @@
 /// This method is only implemented for totally ordered times, as we do not yet
 /// understand what a "sequence" of upserts would mean for partially ordered
 /// timestamps.
-pub fn arrange_from_upsert<G, Tr>(stream: &Stream<G, (Tr::Key, Option<Tr::Val>, G::Timestamp)>, name: &str) -> Arranged<G, TraceAgent<Tr>>
+pub fn arrange_from_upsert<G, Tr>(
+    stream: &Stream<G, (Tr::Key, Option<Tr::Val>, G::Timestamp)>,
+    name: &str,
+) -> Arranged<G, TraceAgent<Tr>>
 where
     G: Scope,
     G::Timestamp: Lattice+Ord+TotalOrder+ExchangeData,
@@ -127,7 +235,7 @@
                                 let mut to_process = HashMap::new();
                                 while priority_queue.peek().map(|std::cmp::Reverse((t,_k,_v))| !upper.less_equal(t)).unwrap_or(false) {
                                     let std::cmp::Reverse((time, key, val)) = priority_queue.pop().expect("Priority queue just ensured non-empty");
-                                    to_process.entry(key).or_insert(Vec::new()).push((time, val));
+                                    to_process.entry(key).or_insert(Vec::new()).push((time, std::cmp::Reverse(val)));
                                 }
 
                                 let mut builder = <Tr::Batch as Batch<Tr::Key,Tr::Val,G::Timestamp,isize>>::Builder::new();
@@ -158,7 +266,7 @@
                                     list.sort();
                                     list.dedup_by(|(t1,_), (t2,_)| t1 == t2);
                                     // Process distinct times
-                                    for (time, next) in list {
+                                    for (time, std::cmp::Reverse(next)) in list {
                                         if prev_value != next {
                                             if let Some(prev) = prev_value {
                                                 builder.push((key.clone(), prev, time.clone(), -1));
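The new module docs recommend mapping filtered-out values to `None` rather than dropping records from the upsert stream. A runnable sketch of that advice (hypothetical data; `Option::filter` is the standard-library method the docs' `ignore` snippet relies on):

```rust
fn main() {
    // Keyed upserts whose values we wish to restrict to non-negative numbers.
    let upserts = vec![("k1", Some(10)), ("k2", Some(-3)), ("k1", Some(-1))];
    let filtered: Vec<_> = upserts
        .into_iter()
        .map(|(key, opt_val)| (key, opt_val.filter(|v| *v >= 0)))
        .collect();
    // Failing values become `None`, i.e. "uninstall" upserts, so that a key whose
    // value stops satisfying the predicate is removed from the collection.
    assert_eq!(filtered, vec![("k1", Some(10)), ("k2", None), ("k1", None)]);
}
```
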
From 8ba044749aef94c5ef08591bc9784f4923ab4c28 Mon Sep 17 00:00:00 2001
From: Frank McSherry
Date: Thu, 26 Mar 2020 09:41:38 -0400
Subject: [PATCH 4/5] remove example

---
 examples/upsert.rs | 51 ----------------------------------------------
 1 file changed, 51 deletions(-)
 delete mode 100644 examples/upsert.rs

diff --git a/examples/upsert.rs b/examples/upsert.rs
deleted file mode 100644
index b1130acaf2..0000000000
--- a/examples/upsert.rs
+++ /dev/null
@@ -1,51 +0,0 @@
-extern crate timely;
-extern crate differential_dataflow;
-
-fn main() {
-
-    // define a new timely dataflow computation.
-    timely::execute_from_args(std::env::args().skip(1), move |worker| {
-
-        type Key = String;
-        type Val = String;
-
-        let mut input = timely::dataflow::InputHandle::<usize, (Key, Option<Val>, usize)>::new();
-        let mut probe = timely::dataflow::ProbeHandle::new();
-
-        // create a dataflow managing an ever-changing collection of keyed values.
-        worker.dataflow::<usize,_,_>(|scope| {
-
-            use timely::dataflow::operators::Input;
-            use differential_dataflow::trace::implementations::ord::OrdValSpine;
-            use differential_dataflow::operators::arrange::upsert;
-
-            let stream = scope.input_from(&mut input);
-            let arranged = upsert::arrange_from_upsert::<_, OrdValSpine<Key, Val, usize, isize>>(&stream, &"test");
-
-            arranged
-                .as_collection(|k,v| (k.clone(), v.clone()))
-                .inspect(|x| println!("Observed: {:?}", x))
-                .probe_with(&mut probe);
-        });
-
-        input.send(("frank".to_string(), Some("mcsherry".to_string()), 3));
-        input.advance_to(4);
-        while probe.less_than(input.time()) { worker.step(); }
-
-        input.send(("frank".to_string(), Some("zappa".to_string()), 4));
-        input.advance_to(5);
-        while probe.less_than(input.time()) { worker.step(); }
-
-        input.send(("frank".to_string(), None, 5));
-        input.advance_to(9);
-        while probe.less_than(input.time()) { worker.step(); }
-
-        input.send(("frank".to_string(), Some("oz".to_string()), 9));
-        input.advance_to(10);
-        while probe.less_than(input.time()) { worker.step(); }
-
-        input.send(("frank".to_string(), None, 15));
-        input.close();
-
-    }).unwrap();
-}

From a9a1118f3909e6afda496d426a82317cb794bcfb Mon Sep 17 00:00:00 2001
From: Frank McSherry
Date: Thu, 26 Mar 2020 12:59:51 -0400
Subject: [PATCH 5/5] sort by key, to match cursor order

---
 src/operators/arrange/upsert.rs | 10 ++++++++--
 1 file changed, 8 insertions(+), 2 deletions(-)

diff --git a/src/operators/arrange/upsert.rs b/src/operators/arrange/upsert.rs
index a779fbd98a..1d28bc44d2 100644
--- a/src/operators/arrange/upsert.rs
+++ b/src/operators/arrange/upsert.rs
@@ -238,9 +238,15 @@
                                     to_process.entry(key).or_insert(Vec::new()).push((time, std::cmp::Reverse(val)));
                                 }
 
-                                let mut builder = <Tr::Batch as Batch<Tr::Key,Tr::Val,G::Timestamp,isize>>::Builder::new();
+                                // Put (key, list) into key order, to match cursor enumeration.
+                                let mut to_process = to_process.into_iter().collect::<Vec<_>>();
+                                to_process.sort();
+
+                                // Prepare a cursor to the existing arrangement, and a batch builder for
+                                // new stuff that we add.
                                 let (mut trace_cursor, trace_storage) = reader_local.cursor();
-                                for (key, mut list) in to_process.drain() {
+                                let mut builder = <Tr::Batch as Batch<Tr::Key,Tr::Val,G::Timestamp,isize>>::Builder::new();
+                                for (key, mut list) in to_process.drain(..) {
 
                                     // The prior value associated with the key.
                                     let mut prev_value: Option<Tr::Val> = None;
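Why the key sort in this final patch matters: the trace cursor enumerates keys in increasing order and is only advanced forward during the pass above, so the pending upserts must be visited in that same order; a `HashMap` alone yields its keys in arbitrary order. A standalone sketch of the transformation (illustrative types only):

```rust
use std::collections::HashMap;

fn main() {
    // Pending upserts grouped by key, as in `to_process`.
    let mut to_process: HashMap<&str, Vec<u64>> = HashMap::new();
    to_process.entry("b").or_insert(Vec::new()).push(2);
    to_process.entry("a").or_insert(Vec::new()).push(1);

    // Collect and sort, so iteration matches a cursor's increasing key order.
    let mut ordered: Vec<_> = to_process.into_iter().collect();
    ordered.sort();

    let keys: Vec<_> = ordered.iter().map(|(k, _)| *k).collect();
    assert_eq!(keys, vec!["a", "b"]);
}
```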