Skip to content

Update batch processing to support compacted inputs #530

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Oct 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
81 changes: 50 additions & 31 deletions src/operators/count.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,51 +69,70 @@ where

self.stream.unary_frontier(Pipeline, "CountTotal", move |_,_| {

// tracks the upper limit of known-complete timestamps.
// tracks the lower and upper limit of received batches.
let mut lower_limit = timely::progress::frontier::Antichain::from_elem(<G::Timestamp as timely::progress::Timestamp>::minimum());
let mut upper_limit = timely::progress::frontier::Antichain::from_elem(<G::Timestamp as timely::progress::Timestamp>::minimum());

move |input, output| {

use crate::trace::cursor::IntoOwned;
let mut batch_cursors = Vec::new();
let mut batch_storage = Vec::new();

// Downgrade previous upper limit to be current lower limit.
lower_limit.clear();
lower_limit.extend(upper_limit.borrow().iter().cloned());

let mut cap = None;
input.for_each(|capability, batches| {
if cap.is_none() { // NB: Assumes batches are in-order
cap = Some(capability.retain());
}
batches.swap(&mut buffer);
let mut session = output.session(&capability);
for batch in buffer.drain(..) {
let mut batch_cursor = batch.cursor();
let (mut trace_cursor, trace_storage) = trace.cursor_through(batch.lower().borrow()).unwrap();
upper_limit.clone_from(batch.upper());

while let Some(key) = batch_cursor.get_key(&batch) {
let mut count: Option<T1::Diff> = None;

trace_cursor.seek_key(&trace_storage, key);
if trace_cursor.get_key(&trace_storage) == Some(key) {
trace_cursor.map_times(&trace_storage, |_, diff| {
count.as_mut().map(|c| c.plus_equals(&diff));
if count.is_none() { count = Some(diff.into_owned()); }
});
}
upper_limit.clone_from(batch.upper()); // NB: Assumes batches are in-order
batch_cursors.push(batch.cursor());
batch_storage.push(batch);
}
});

batch_cursor.map_times(&batch, |time, diff| {
if let Some(capability) = cap {

if let Some(count) = count.as_ref() {
if !count.is_zero() {
session.give(((key.into_owned(), count.clone()), time.into_owned(), R2::from(-1i8)));
}
}
let mut session = output.session(&capability);

use crate::trace::cursor::CursorList;
let mut batch_cursor = CursorList::new(batch_cursors, &batch_storage);
let (mut trace_cursor, trace_storage) = trace.cursor_through(lower_limit.borrow()).unwrap();

while let Some(key) = batch_cursor.get_key(&batch_storage) {
let mut count: Option<T1::Diff> = None;

trace_cursor.seek_key(&trace_storage, key);
if trace_cursor.get_key(&trace_storage) == Some(key) {
trace_cursor.map_times(&trace_storage, |_, diff| {
count.as_mut().map(|c| c.plus_equals(&diff));
if count.is_none() { count = Some(diff.into_owned()); }
if let Some(count) = count.as_ref() {
if !count.is_zero() {
session.give(((key.into_owned(), count.clone()), time.into_owned(), R2::from(1i8)));
}
}
});

batch_cursor.step_key(&batch);
}

batch_cursor.map_times(&batch_storage, |time, diff| {

if let Some(count) = count.as_ref() {
if !count.is_zero() {
session.give(((key.into_owned(), count.clone()), time.into_owned(), R2::from(-1i8)));
}
}
count.as_mut().map(|c| c.plus_equals(&diff));
if count.is_none() { count = Some(diff.into_owned()); }
if let Some(count) = count.as_ref() {
if !count.is_zero() {
session.give(((key.into_owned(), count.clone()), time.into_owned(), R2::from(1i8)));
}
}
});

batch_cursor.step_key(&batch_storage);
}
});
}

// tidy up the shared input trace.
trace.advance_upper(&mut upper_limit);
Expand Down
108 changes: 63 additions & 45 deletions src/operators/threshold.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ use crate::hashable::Hashable;
use crate::collection::AsCollection;
use crate::operators::arrange::{Arranged, ArrangeBySelf};
use crate::trace::{BatchReader, Cursor, TraceReader};
use crate::trace::cursor::IntoOwned;

/// Extension trait for the `distinct` differential dataflow method.
pub trait ThresholdTotal<G: Scope, K: ExchangeData, R: ExchangeData+Semigroup> where G::Timestamp: TotalOrder+Lattice+Ord {
Expand Down Expand Up @@ -117,66 +116,85 @@ where

self.stream.unary_frontier(Pipeline, "ThresholdTotal", move |_,_| {

// tracks the upper limit of known-complete timestamps.
// tracks the lower and upper limit of received batches.
let mut lower_limit = timely::progress::frontier::Antichain::from_elem(<G::Timestamp as timely::progress::Timestamp>::minimum());
let mut upper_limit = timely::progress::frontier::Antichain::from_elem(<G::Timestamp as timely::progress::Timestamp>::minimum());

move |input, output| {

let mut batch_cursors = Vec::new();
let mut batch_storage = Vec::new();

// Downgrade previous upper limit to be current lower limit.
lower_limit.clear();
lower_limit.extend(upper_limit.borrow().iter().cloned());

let mut cap = None;
input.for_each(|capability, batches| {
if cap.is_none() { // NB: Assumes batches are in-order
cap = Some(capability.retain());
}
batches.swap(&mut buffer);
let mut session = output.session(&capability);
for batch in buffer.drain(..) {
upper_limit.clone_from(batch.upper()); // NB: Assumes batches are in-order
batch_cursors.push(batch.cursor());
batch_storage.push(batch);
}
});

let mut batch_cursor = batch.cursor();
let (mut trace_cursor, trace_storage) = trace.cursor_through(batch.lower().borrow()).unwrap();
use crate::trace::cursor::IntoOwned;
if let Some(capability) = cap {

upper_limit.clone_from(batch.upper());
let mut session = output.session(&capability);

while let Some(key) = batch_cursor.get_key(&batch) {
let mut count: Option<T1::Diff> = None;
use crate::trace::cursor::CursorList;
let mut batch_cursor = CursorList::new(batch_cursors, &batch_storage);
let (mut trace_cursor, trace_storage) = trace.cursor_through(lower_limit.borrow()).unwrap();

// Compute the multiplicity of this key before the current batch.
trace_cursor.seek_key(&trace_storage, key);
if trace_cursor.get_key(&trace_storage) == Some(key) {
trace_cursor.map_times(&trace_storage, |_, diff| {
count.as_mut().map(|c| c.plus_equals(&diff));
if count.is_none() { count = Some(diff.into_owned()); }
});
}
while let Some(key) = batch_cursor.get_key(&batch_storage) {
let mut count: Option<T1::Diff> = None;

// Apply `thresh` both before and after `diff` is applied to `count`.
// If the result is non-zero, send it along.
batch_cursor.map_times(&batch, |time, diff| {

let difference =
match &count {
Some(old) => {
let mut temp = old.clone();
temp.plus_equals(&diff);
thresh(key, &temp, Some(old))
},
None => { thresh(key, &diff.into_owned(), None) },
};

// Either add or assign `diff` to `count`.
if let Some(count) = &mut count {
count.plus_equals(&diff);
}
else {
count = Some(diff.into_owned());
}
// Compute the multiplicity of this key before the current batch.
trace_cursor.seek_key(&trace_storage, key);
if trace_cursor.get_key(&trace_storage) == Some(key) {
trace_cursor.map_times(&trace_storage, |_, diff| {
count.as_mut().map(|c| c.plus_equals(&diff));
if count.is_none() { count = Some(diff.into_owned()); }
});
}

// Apply `thresh` both before and after `diff` is applied to `count`.
// If the result is non-zero, send it along.
batch_cursor.map_times(&batch_storage, |time, diff| {

let difference =
match &count {
Some(old) => {
let mut temp = old.clone();
temp.plus_equals(&diff);
thresh(key, &temp, Some(old))
},
None => { thresh(key, &diff.into_owned(), None) },
};

// Either add or assign `diff` to `count`.
if let Some(count) = &mut count {
count.plus_equals(&diff);
}
else {
count = Some(diff.into_owned());
}

if let Some(difference) = difference {
if !difference.is_zero() {
session.give((key.clone(), time.into_owned(), difference));
}
if let Some(difference) = difference {
if !difference.is_zero() {
session.give((key.clone(), time.into_owned(), difference));
}
});
}
});

batch_cursor.step_key(&batch);
}
batch_cursor.step_key(&batch_storage);
}
});
}

// tidy up the shared input trace.
trace.advance_upper(&mut upper_limit);
Expand Down