Skip to content

Allow halfjoin to yield tastefully #342

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Oct 5, 2021
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
145 changes: 88 additions & 57 deletions dogsdogsdogs/src/operators/half_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ where
let dout = (output_func(k, v1, v2), time.clone());
Some((dout, initial.clone(), diff))
};
half_join_internal_unsafe(stream, arrangement, frontier_func, comparison, output_func)
half_join_internal_unsafe(stream, arrangement, frontier_func, comparison, |timer, count| false, output_func)
}

/// An unsafe variant of `half_join` where the `output_func` closure takes
Expand All @@ -117,11 +117,17 @@ where
///
/// for each `((key, val2), time2, diff2)` present in `arrangement`, where
/// `time2` is less than `initial_time` *UNDER THE TOTAL ORDER ON TIMES*.
pub fn half_join_internal_unsafe<G, V, Tr, FF, CF, DOut, ROut, I, S>(
///
/// The `yield_function` allows the caller to indicate when the operator should
/// yield control, as a function of the elapsed time and the number of matched
/// records. Note this is not the number of *output* records, owing mainly to
/// the number of matched records being easiest to record with low overhead.
pub fn half_join_internal_unsafe<G, V, Tr, FF, CF, DOut, ROut, Y, I, S>(
stream: &Collection<G, (Tr::Key, V, G::Timestamp), Tr::R>,
mut arrangement: Arranged<G, Tr>,
frontier_func: FF,
comparison: CF,
yield_function: Y,
mut output_func: S,
) -> Collection<G, DOut, ROut>
where
Expand All @@ -138,6 +144,7 @@ where
CF: Fn(&G::Timestamp, &G::Timestamp) -> bool + 'static,
DOut: Clone+'static,
ROut: Monoid,
Y: Fn(std::time::Instant, usize) -> bool + 'static,
I: IntoIterator<Item=(DOut, G::Timestamp, ROut)>,
S: FnMut(&Tr::Key, &V, &Tr::Val, &G::Timestamp, &G::Timestamp, &Tr::R, &Tr::R)-> I + 'static,
{
Expand All @@ -154,81 +161,105 @@ where
// Stash for (time, diff) accumulation.
let mut output_buffer = Vec::new();

stream.inner.binary_frontier(&arrangement_stream, exchange, Pipeline, "HalfJoin", move |_,_| move |input1, input2, output| {
stream.inner.binary_frontier(&arrangement_stream, exchange, Pipeline, "HalfJoin", move |_,info| {

// drain the first input, stashing requests.
input1.for_each(|capability, data| {
data.swap(&mut buffer);
stash.entry(capability.retain())
.or_insert(Vec::new())
.extend(buffer.drain(..))
});
// Acquire an activator to reschedule the operator when it has unfinished work.
use timely::scheduling::Activator;
let activations = stream.scope().activations();
let activator = Activator::new(&info.address[..], activations);

// Drain input batches; although we do not observe them, we want access to the input
// to observe the frontier and to drive scheduling.
input2.for_each(|_, _| { });
move |input1, input2, output| {

if let Some(ref mut trace) = arrangement_trace {
// drain the first input, stashing requests.
input1.for_each(|capability, data| {
data.swap(&mut buffer);
stash.entry(capability.retain())
.or_insert(Vec::new())
.extend(buffer.drain(..))
});

for (capability, proposals) in stash.iter_mut() {
// Drain input batches; although we do not observe them, we want access to the input
// to observe the frontier and to drive scheduling.
input2.for_each(|_, _| { });

// defer requests at incomplete times.
// TODO: Verify this is correct for TOTAL ORDER.
if !input2.frontier.less_equal(capability.time()) {
// Local variables to track if and when we should exit early.
// The rough logic is that we fully process inputs and set their differences to zero,
// stopping at any point. We clean up all of the zeros in buffers that did any work,
// and reactivate at the end if the yield function still says so.
let mut yielded = false;
let timer = std::time::Instant::now();
let mut work = 0;

let mut session = output.session(capability);
if let Some(ref mut trace) = arrangement_trace {

// Sort requests by key for in-order cursor traversal.
consolidate_updates(proposals);
for (capability, proposals) in stash.iter_mut() {

let (mut cursor, storage) = trace.cursor();
// Avoid computation if we should already yield.
// TODO: Verify this is correct for TOTAL ORDER.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@frankmcsherry Did you verify this?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not yet. This is an old comment though. I think it is green because the indentation level changed.

yielded = yielded || yield_function(timer, work);
if !yielded && !input2.frontier.less_equal(capability.time()) {

for &mut ((ref key, ref val1, ref time), ref initial, ref mut diff1) in proposals.iter_mut() {
// Use TOTAL ORDER to allow the release of `time`.
if !input2.frontier.frontier().iter().any(|t| comparison(t, initial)) {
cursor.seek_key(&storage, &key);
if cursor.get_key(&storage) == Some(&key) {
while let Some(val2) = cursor.get_val(&storage) {
cursor.map_times(&storage, |t, d| {
if comparison(t, initial) {
output_buffer.push((t.join(time), d.clone()))
}
});
consolidate(&mut output_buffer);
for (time, diff2) in output_buffer.drain(..) {
for dout in output_func(key, val1, val2, initial, &time, &diff1, &diff2) {
session.give(dout);
let mut session = output.session(capability);

// Sort requests by key for in-order cursor traversal.
consolidate_updates(proposals);

let (mut cursor, storage) = trace.cursor();

// Process proposals one at a time, stopping if we should yield.
for &mut ((ref key, ref val1, ref time), ref initial, ref mut diff1) in proposals.iter_mut() {
// Use TOTAL ORDER to allow the release of `time`.
yielded = yielded || yield_function(timer, work);
if !yielded && !input2.frontier.frontier().iter().any(|t| comparison(t, initial)) {
cursor.seek_key(&storage, &key);
if cursor.get_key(&storage) == Some(&key) {
while let Some(val2) = cursor.get_val(&storage) {
cursor.map_times(&storage, |t, d| {
if comparison(t, initial) {
output_buffer.push((t.join(time), d.clone()))
}
});
consolidate(&mut output_buffer);
work += output_buffer.len();
for (time, diff2) in output_buffer.drain(..) {
for dout in output_func(key, val1, val2, initial, &time, &diff1, &diff2) {
session.give(dout);
}
}
cursor.step_val(&storage);
}
cursor.step_val(&storage);
cursor.rewind_vals(&storage);
}
cursor.rewind_vals(&storage);
*diff1 = Tr::R::zero();
}
*diff1 = Tr::R::zero();
}
}

proposals.retain(|ptd| !ptd.2.is_zero());
proposals.retain(|ptd| !ptd.2.is_zero());
}
}
}
}

// drop fully processed capabilities.
stash.retain(|_,proposals| !proposals.is_empty());
// If we yielded, re-activate the operator.
if yielded {
activator.activate();
}

// The logical merging frontier depends on both input1 and stash.
let mut frontier = timely::progress::frontier::Antichain::new();
for time in input1.frontier().frontier().iter() {
frontier.insert(frontier_func(time));
}
for key in stash.keys() {
frontier.insert(frontier_func(key.time()));
}
arrangement_trace.as_mut().map(|trace| trace.set_logical_compaction(frontier.borrow()));
// drop fully processed capabilities.
stash.retain(|_,proposals| !proposals.is_empty());

if input1.frontier().is_empty() && stash.is_empty() {
arrangement_trace = None;
}
// The logical merging frontier depends on both input1 and stash.
let mut frontier = timely::progress::frontier::Antichain::new();
for time in input1.frontier().frontier().iter() {
frontier.insert(frontier_func(time));
}
for key in stash.keys() {
frontier.insert(frontier_func(key.time()));
}
arrangement_trace.as_mut().map(|trace| trace.set_logical_compaction(frontier.borrow()));

if input1.frontier().is_empty() && stash.is_empty() {
arrangement_trace = None;
}
}
}).as_collection()
}