Skip to content

Arrangement GATs #438

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Dec 4, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions dogsdogsdogs/src/operators/count.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,17 +21,17 @@ pub fn count<G, Tr, R, F, P>(
where
G: Scope,
G::Timestamp: Lattice,
Tr: TraceReader<Val=(), Time=G::Timestamp, Diff=isize>+Clone+'static,
Tr::Key: Ord+Hashable+Default,
Tr: TraceReader<ValOwned=(), Time=G::Timestamp, Diff=isize>+Clone+'static,
Tr::KeyOwned: Hashable + Default,
R: Monoid+Multiply<Output = R>+ExchangeData,
F: Fn(&P)->Tr::Key+Clone+'static,
F: Fn(&P)->Tr::KeyOwned+Clone+'static,
P: ExchangeData,
{
crate::operators::lookup_map(
prefixes,
arrangement,
move |p: &(P,usize,usize), k: &mut Tr::Key| { *k = key_selector(&p.0); },
move |(p,c,i), r, &(), s| {
move |p: &(P,usize,usize), k: &mut Tr::KeyOwned| { *k = key_selector(&p.0); },
move |(p,c,i), r, _, s| {
let s = *s as usize;
if *c < s { ((p.clone(), *c, *i), r.clone()) }
else { ((p.clone(), s, index), r.clone()) }
Expand Down
29 changes: 13 additions & 16 deletions dogsdogsdogs/src/operators/half_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,9 @@
//! of logical compaction, which should not be done in a way that prevents
//! the correct determination of the total order comparison.

use std::borrow::Borrow;
use std::collections::HashMap;
use std::ops::Mul;


use timely::dataflow::Scope;
use timely::dataflow::channels::pact::{Pipeline, Exchange};
use timely::dataflow::operators::Operator;
Expand Down Expand Up @@ -70,8 +68,8 @@ use differential_dataflow::consolidation::{consolidate, consolidate_updates};
/// Notice that the time is hoisted up into data. The expectation is that
/// once out of the "delta flow region", the updates will be `delay`d to the
/// times specified in the payloads.
pub fn half_join<G, K, V, R, Tr, FF, CF, DOut, S>(
stream: &Collection<G, (K, V, G::Timestamp), R>,
pub fn half_join<G, V, R, Tr, FF, CF, DOut, S>(
stream: &Collection<G, (Tr::KeyOwned, V, G::Timestamp), R>,
arrangement: Arranged<G, Tr>,
frontier_func: FF,
comparison: CF,
Expand All @@ -80,20 +78,19 @@ pub fn half_join<G, K, V, R, Tr, FF, CF, DOut, S>(
where
G: Scope,
G::Timestamp: Lattice,
K: Ord + Hashable + ExchangeData + Borrow<Tr::Key>,
Tr::KeyOwned: Ord + Hashable + ExchangeData,
V: ExchangeData,
R: ExchangeData + Monoid,
Tr: TraceReader<Time=G::Timestamp>+Clone+'static,
Tr::Key: Eq,
Tr::Diff: Semigroup,
R: Mul<Tr::Diff>,
<R as Mul<Tr::Diff>>::Output: Semigroup,
FF: Fn(&G::Timestamp) -> G::Timestamp + 'static,
CF: Fn(&G::Timestamp, &G::Timestamp) -> bool + 'static,
DOut: Clone+'static,
S: FnMut(&K, &V, &Tr::Val)->DOut+'static,
S: FnMut(&Tr::KeyOwned, &V, Tr::Val<'_>)->DOut+'static,
{
let output_func = move |k: &K, v1: &V, v2: &Tr::Val, initial: &G::Timestamp, time: &G::Timestamp, diff1: &R, diff2: &Tr::Diff| {
let output_func = move |k: &Tr::KeyOwned, v1: &V, v2: Tr::Val<'_>, initial: &G::Timestamp, time: &G::Timestamp, diff1: &R, diff2: &Tr::Diff| {
let diff = diff1.clone() * diff2.clone();
let dout = (output_func(k, v1, v2), time.clone());
Some((dout, initial.clone(), diff))
Expand Down Expand Up @@ -125,8 +122,8 @@ where
/// yield control, as a function of the elapsed time and the number of matched
/// records. Note this is not the number of *output* records, owing mainly to
/// the number of matched records being easiest to record with low overhead.
pub fn half_join_internal_unsafe<G, K, V, R, Tr, FF, CF, DOut, ROut, Y, I, S>(
stream: &Collection<G, (K, V, G::Timestamp), R>,
pub fn half_join_internal_unsafe<G, V, R, Tr, FF, CF, DOut, ROut, Y, I, S>(
stream: &Collection<G, (Tr::KeyOwned, V, G::Timestamp), R>,
mut arrangement: Arranged<G, Tr>,
frontier_func: FF,
comparison: CF,
Expand All @@ -136,19 +133,18 @@ pub fn half_join_internal_unsafe<G, K, V, R, Tr, FF, CF, DOut, ROut, Y, I, S>(
where
G: Scope,
G::Timestamp: Lattice,
K: Ord + Hashable + ExchangeData + std::borrow::Borrow<Tr::Key>,
Tr::KeyOwned: Ord + Hashable + ExchangeData,
V: ExchangeData,
R: ExchangeData + Monoid,
Tr: TraceReader<Time=G::Timestamp>+Clone+'static,
Tr::Key: Eq,
Tr::Diff: Semigroup,
FF: Fn(&G::Timestamp) -> G::Timestamp + 'static,
CF: Fn(&G::Timestamp, &G::Timestamp) -> bool + 'static,
DOut: Clone+'static,
ROut: Semigroup,
Y: Fn(std::time::Instant, usize) -> bool + 'static,
I: IntoIterator<Item=(DOut, G::Timestamp, ROut)>,
S: FnMut(&K, &V, &Tr::Val, &G::Timestamp, &G::Timestamp, &R, &Tr::Diff)-> I + 'static,
S: FnMut(&Tr::KeyOwned, &V, Tr::Val<'_>, &G::Timestamp, &G::Timestamp, &R, &Tr::Diff)-> I + 'static,
{
// No need to block physical merging for this operator.
arrangement.trace.set_physical_compaction(Antichain::new().borrow());
Expand All @@ -158,7 +154,7 @@ where
let mut stash = HashMap::new();
let mut buffer = Vec::new();

let exchange = Exchange::new(move |update: &((K, V, G::Timestamp),G::Timestamp,R)| (update.0).0.hashed().into());
let exchange = Exchange::new(move |update: &((Tr::KeyOwned, V, G::Timestamp),G::Timestamp,R)| (update.0).0.hashed().into());

// Stash for (time, diff) accumulation.
let mut output_buffer = Vec::new();
Expand Down Expand Up @@ -216,8 +212,9 @@ where
// Use TOTAL ORDER to allow the release of `time`.
yielded = yielded || yield_function(timer, work);
if !yielded && !input2.frontier.frontier().iter().any(|t| comparison(t, initial)) {
cursor.seek_key(&storage, key.borrow());
if cursor.get_key(&storage) == Some(key.borrow()) {
use differential_dataflow::trace::cursor::MyTrait;
cursor.seek_key(&storage, MyTrait::borrow_as(key));
if cursor.get_key(&storage) == Some(MyTrait::borrow_as(key)) {
while let Some(val2) = cursor.get_val(&storage) {
cursor.map_times(&storage, |t, d| {
if comparison(t, initial) {
Expand Down
24 changes: 12 additions & 12 deletions dogsdogsdogs/src/operators/lookup_map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,23 +22,22 @@ pub fn lookup_map<G, D, R, Tr, F, DOut, ROut, S>(
mut arrangement: Arranged<G, Tr>,
key_selector: F,
mut output_func: S,
supplied_key0: Tr::Key,
supplied_key1: Tr::Key,
supplied_key2: Tr::Key,
supplied_key0: Tr::KeyOwned,
supplied_key1: Tr::KeyOwned,
supplied_key2: Tr::KeyOwned,
) -> Collection<G, DOut, ROut>
where
G: Scope,
G::Timestamp: Lattice,
Tr: TraceReader<Time=G::Timestamp>+Clone+'static,
Tr::Key: Ord+Hashable+Sized,
Tr::Val: Clone,
Tr::KeyOwned: Hashable,
Tr::Diff: Monoid+ExchangeData,
F: FnMut(&D, &mut Tr::Key)+Clone+'static,
F: FnMut(&D, &mut Tr::KeyOwned)+Clone+'static,
D: ExchangeData,
R: ExchangeData+Monoid,
DOut: Clone+'static,
ROut: Monoid,
S: FnMut(&D, &R, &Tr::Val, &Tr::Diff)->(DOut, ROut)+'static,
S: FnMut(&D, &R, Tr::Val<'_>, &Tr::Diff)->(DOut, ROut)+'static,
{
// No need to block physical merging for this operator.
arrangement.trace.set_physical_compaction(Antichain::new().borrow());
Expand All @@ -51,14 +50,14 @@ where

let mut buffer = Vec::new();

let mut key: Tr::Key = supplied_key0;
let mut key: Tr::KeyOwned = supplied_key0;
let exchange = Exchange::new(move |update: &(D,G::Timestamp,R)| {
logic1(&update.0, &mut key);
key.hashed().into()
});

let mut key1: Tr::Key = supplied_key1;
let mut key2: Tr::Key = supplied_key2;
let mut key1: Tr::KeyOwned = supplied_key1;
let mut key2: Tr::KeyOwned = supplied_key2;

prefixes.inner.binary_frontier(&propose_stream, exchange, Pipeline, "LookupMap", move |_,_| move |input1, input2, output| {

Expand Down Expand Up @@ -96,8 +95,9 @@ where
for &mut (ref prefix, ref time, ref mut diff) in prefixes.iter_mut() {
if !input2.frontier.less_equal(time) {
logic2(prefix, &mut key1);
cursor.seek_key(&storage, &key1);
if cursor.get_key(&storage) == Some(&key1) {
use differential_dataflow::trace::cursor::MyTrait;
cursor.seek_key(&storage, MyTrait::borrow_as(&key1));
if cursor.get_key(&storage) == Some(MyTrait::borrow_as(&key1)) {
while let Some(value) = cursor.get_val(&storage) {
let mut count = Tr::Diff::zero();
cursor.map_times(&storage, |t, d| {
Expand Down
23 changes: 11 additions & 12 deletions dogsdogsdogs/src/operators/propose.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use differential_dataflow::difference::{Monoid, Multiply};
use differential_dataflow::lattice::Lattice;
use differential_dataflow::operators::arrange::Arranged;
use differential_dataflow::trace::TraceReader;
use differential_dataflow::trace::cursor::MyTrait;

/// Proposes extensions to a prefix stream.
///
Expand All @@ -18,22 +19,21 @@ pub fn propose<G, Tr, F, P>(
prefixes: &Collection<G, P, Tr::Diff>,
arrangement: Arranged<G, Tr>,
key_selector: F,
) -> Collection<G, (P, Tr::Val), Tr::Diff>
) -> Collection<G, (P, Tr::ValOwned), Tr::Diff>
where
G: Scope,
G::Timestamp: Lattice,
Tr: TraceReader<Time=G::Timestamp>+Clone+'static,
Tr::Key: Ord+Hashable+Default,
Tr::Val: Clone,
Tr::KeyOwned: Hashable + Default,
Tr::Diff: Monoid+Multiply<Output = Tr::Diff>+ExchangeData,
F: Fn(&P)->Tr::Key+Clone+'static,
F: Fn(&P)->Tr::KeyOwned+Clone+'static,
P: ExchangeData,
{
crate::operators::lookup_map(
prefixes,
arrangement,
move |p: &P, k: &mut Tr::Key| { *k = key_selector(p); },
|prefix, diff, value, sum| ((prefix.clone(), value.clone()), diff.clone().multiply(sum)),
move |p: &P, k: &mut Tr::KeyOwned | { *k = key_selector(p); },
|prefix, diff, value, sum| ((prefix.clone(), value.into_owned()), diff.clone().multiply(sum)),
Default::default(),
Default::default(),
Default::default(),
Expand All @@ -49,22 +49,21 @@ pub fn propose_distinct<G, Tr, F, P>(
prefixes: &Collection<G, P, Tr::Diff>,
arrangement: Arranged<G, Tr>,
key_selector: F,
) -> Collection<G, (P, Tr::Val), Tr::Diff>
) -> Collection<G, (P, Tr::ValOwned), Tr::Diff>
where
G: Scope,
G::Timestamp: Lattice,
Tr: TraceReader<Time=G::Timestamp>+Clone+'static,
Tr::Key: Ord+Hashable+Default,
Tr::Val: Clone,
Tr::KeyOwned: Hashable + Default,
Tr::Diff: Monoid+Multiply<Output = Tr::Diff>+ExchangeData,
F: Fn(&P)->Tr::Key+Clone+'static,
F: Fn(&P)->Tr::KeyOwned+Clone+'static,
P: ExchangeData,
{
crate::operators::lookup_map(
prefixes,
arrangement,
move |p: &P, k: &mut Tr::Key| { *k = key_selector(p); },
|prefix, diff, value, _sum| ((prefix.clone(), value.clone()), diff.clone()),
move |p: &P, k: &mut Tr::KeyOwned| { *k = key_selector(p); },
|prefix, diff, value, _sum| ((prefix.clone(), value.into_owned()), diff.clone()),
Default::default(),
Default::default(),
Default::default(),
Expand Down
4 changes: 2 additions & 2 deletions dogsdogsdogs/src/operators/validate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ pub fn validate<G, K, V, Tr, F, P>(
where
G: Scope,
G::Timestamp: Lattice,
Tr: TraceReader<Key=(K,V), Val=(), Time=G::Timestamp>+Clone+'static,
Tr: TraceReader<KeyOwned=(K,V), ValOwned=(), Time=G::Timestamp>+Clone+'static,
K: Ord+Hash+Clone+Default,
V: ExchangeData+Hash+Default,
Tr::Diff: Monoid+Multiply<Output = Tr::Diff>+ExchangeData,
Expand All @@ -32,7 +32,7 @@ where
extensions,
arrangement,
move |(pre,val),key| { *key = (key_selector(pre), val.clone()); },
|(pre,val),r,&(),_| ((pre.clone(), val.clone()), r.clone()),
|(pre,val),r,_,_| ((pre.clone(), val.clone()), r.clone()),
Default::default(),
Default::default(),
Default::default(),
Expand Down
4 changes: 2 additions & 2 deletions examples/cursors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -135,8 +135,8 @@ fn main() {
fn dump_cursor<Tr>(round: u32, index: usize, trace: &mut Tr)
where
Tr: TraceReader,
Tr::Key: Debug + Clone,
Tr::Val: Debug + Clone,
Tr::KeyOwned: Debug + Clone,
Tr::ValOwned: Debug + Clone,
Tr::Time: Debug + Clone,
Tr::Diff: Debug + Clone,
{
Expand Down
29 changes: 16 additions & 13 deletions examples/spines.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ fn main() {
let mut probe = Handle::new();
let (mut data_input, mut keys_input) = worker.dataflow(|scope| {

use differential_dataflow::operators::{arrange::Arrange, JoinCore};
use differential_dataflow::operators::{arrange::Arrange, JoinCore, join::join_traces};

let (data_input, data) = scope.new_collection::<String, isize>();
let (keys_input, keys) = scope.new_collection::<String, isize>();
Expand All @@ -44,28 +44,31 @@ fn main() {
keys.join_core(&data, |_k, &(), &()| Option::<()>::None)
.probe_with(&mut probe);
},
"rhh" => {
use differential_dataflow::trace::implementations::rhh::{HashWrapper, VecSpine};
let data = data.map(|x| HashWrapper { inner: x }).arrange::<VecSpine<_,(),_,_>>();
let keys = keys.map(|x| HashWrapper { inner: x }).arrange::<VecSpine<_,(),_,_>>();
keys.join_core(&data, |_k, &(), &()| Option::<()>::None)
.probe_with(&mut probe);
},
// "rhh" => {
// use differential_dataflow::trace::implementations::rhh::{HashWrapper, VecSpine};
// let data = data.map(|x| HashWrapper { inner: x }).arrange::<VecSpine<_,(),_,_>>();
// let keys = keys.map(|x| HashWrapper { inner: x }).arrange::<VecSpine<_,(),_,_>>();
// keys.join_core(&data, |_k, &(), &()| Option::<()>::None)
// .probe_with(&mut probe);
// },
"slc" => {

use differential_dataflow::trace::implementations::ord_neu::PreferredSpine;
use differential_dataflow::operators::reduce::ReduceCore;

let data =
data.map(|x| (x.clone().into_bytes(), x.into_bytes()))
.arrange::<PreferredSpine<[u8],[u8],_,_>>()
.reduce_abelian::<_, PreferredSpine<_,_,_,_>>("distinct", |_,_,output| output.push(((), 1)));
.arrange::<PreferredSpine<[u8],[u8],_,_>>();
// .reduce_abelian::<_, PreferredSpine<[u8],(),_,_>>("distinct", |_,_,output| output.push(((), 1)));
let keys =
keys.map(|x| (x.clone().into_bytes(), 7))
.arrange::<PreferredSpine<[u8],u8,_,_>>()
.reduce_abelian::<_, PreferredSpine<_,_,_,_>>("distinct", |_,_,output| output.push(((), 1)));
.arrange::<PreferredSpine<[u8],u8,_,_>>();
// .reduce_abelian::<_, PreferredSpine<[u8],(),_,_>>("distinct", |_,_,output| output.push(((), 1)));

keys.join_core(&data, |_k,&(),&()| Option::<()>::None)
join_traces(&keys, &data, |k,v1,v2,t,r1,r2| {
println!("{:?}", k.text);
Option::<((),isize,isize)>::None
})
.probe_with(&mut probe);
},
_ => {
Expand Down
2 changes: 1 addition & 1 deletion src/algorithms/graphs/bfs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ where
G: Scope,
G::Timestamp: Lattice+Ord,
N: ExchangeData+Hash,
Tr: TraceReader<Key=N, Val=N, Time=G::Timestamp, Diff=isize>+Clone+'static,
Tr: for<'a> TraceReader<Key<'a>=&'a N, Val<'a>=&'a N, Time=G::Timestamp, Diff=isize>+Clone+'static,
{
// initialize roots as reaching themselves at distance 0
let nodes = roots.map(|x| (x, 0));
Expand Down
2 changes: 1 addition & 1 deletion src/algorithms/graphs/bijkstra.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ where
G: Scope,
G::Timestamp: Lattice+Ord,
N: ExchangeData+Hash,
Tr: TraceReader<Key=N, Val=N, Time=G::Timestamp, Diff=isize>+Clone+'static,
Tr: for<'a> TraceReader<Key<'a>=&'a N, Val<'a>=&'a N, Time=G::Timestamp, Diff=isize>+Clone+'static,
{
forward
.stream
Expand Down
5 changes: 3 additions & 2 deletions src/algorithms/graphs/propagate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ use std::hash::Hash;
use timely::dataflow::*;

use ::{Collection, ExchangeData};
use ::operators::*;
use ::lattice::Lattice;
use ::difference::{Abelian, Multiply};
use ::operators::arrange::arrangement::ArrangeByKey;
Expand Down Expand Up @@ -64,7 +63,7 @@ where
R: Multiply<R, Output=R>,
R: From<i8>,
L: ExchangeData,
Tr: TraceReader<Key=N, Val=N, Time=G::Timestamp, Diff=R>+Clone+'static,
Tr: for<'a> TraceReader<Key<'a>=&'a N, Val<'a>=&'a N, Time=G::Timestamp, Diff=R>+Clone+'static,
F: Fn(&L)->u64+Clone+'static,
{
// Morally the code performs the following iterative computation. However, in the interest of a simplified
Expand All @@ -90,6 +89,8 @@ where

use timely::order::Product;

use operators::join::JoinCore;

let edges = edges.enter(scope);
let nodes = nodes.enter_at(scope, move |r| 256 * (64 - (logic(&r.1)).leading_zeros() as usize));

Expand Down
6 changes: 4 additions & 2 deletions src/operators/arrange/agent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,10 @@ where
Tr: TraceReader,
Tr::Time: Lattice+Ord+Clone+'static,
{
type Key = Tr::Key;
type Val = Tr::Val;
type Key<'a> = Tr::Key<'a>;
type KeyOwned = Tr::KeyOwned;
type Val<'a> = Tr::Val<'a>;
type ValOwned = Tr::ValOwned;
type Time = Tr::Time;
type Diff = Tr::Diff;

Expand Down
Loading