Skip to content

Introduce trait constraints; simplify elsewhere #445

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions dogsdogsdogs/src/operators/count.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,8 @@ pub fn count<G, Tr, R, F, P>(
index: usize,
) -> Collection<G, (P, usize, usize), R>
where
G: Scope,
G::Timestamp: Lattice,
Tr: TraceReader<ValOwned=(), Time=G::Timestamp, Diff=isize>+Clone+'static,
G: Scope<Timestamp=Tr::Time>,
Tr: TraceReader<ValOwned=(), Diff=isize>+Clone+'static,
Tr::KeyOwned: Hashable + Default,
R: Monoid+Multiply<Output = R>+ExchangeData,
F: Fn(&P)->Tr::KeyOwned+Clone+'static,
Expand Down
16 changes: 6 additions & 10 deletions dogsdogsdogs/src/operators/half_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,13 +76,11 @@ pub fn half_join<G, V, R, Tr, FF, CF, DOut, S>(
mut output_func: S,
) -> Collection<G, (DOut, G::Timestamp), <R as Mul<Tr::Diff>>::Output>
where
G: Scope,
G::Timestamp: Lattice,
Tr::KeyOwned: Ord + Hashable + ExchangeData,
G: Scope<Timestamp = Tr::Time>,
Tr::KeyOwned: Hashable + ExchangeData,
V: ExchangeData,
R: ExchangeData + Monoid,
Tr: TraceReader<Time=G::Timestamp>+Clone+'static,
Tr::Diff: Semigroup,
Tr: TraceReader+Clone+'static,
R: Mul<Tr::Diff>,
<R as Mul<Tr::Diff>>::Output: Semigroup,
FF: Fn(&G::Timestamp) -> G::Timestamp + 'static,
Expand Down Expand Up @@ -131,13 +129,11 @@ pub fn half_join_internal_unsafe<G, V, R, Tr, FF, CF, DOut, ROut, Y, I, S>(
mut output_func: S,
) -> Collection<G, DOut, ROut>
where
G: Scope,
G::Timestamp: Lattice,
Tr::KeyOwned: Ord + Hashable + ExchangeData,
G: Scope<Timestamp = Tr::Time>,
Tr::KeyOwned: Hashable + ExchangeData,
V: ExchangeData,
R: ExchangeData + Monoid,
Tr: TraceReader<Time=G::Timestamp>+Clone+'static,
Tr::Diff: Semigroup,
Tr: TraceReader+Clone+'static,
FF: Fn(&G::Timestamp) -> G::Timestamp + 'static,
CF: Fn(&G::Timestamp, &G::Timestamp) -> bool + 'static,
DOut: Clone+'static,
Expand Down
5 changes: 2 additions & 3 deletions dogsdogsdogs/src/operators/lookup_map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,8 @@ pub fn lookup_map<G, D, R, Tr, F, DOut, ROut, S>(
supplied_key2: Tr::KeyOwned,
) -> Collection<G, DOut, ROut>
where
G: Scope,
G::Timestamp: Lattice,
Tr: TraceReader<Time=G::Timestamp>+Clone+'static,
G: Scope<Timestamp=Tr::Time>,
Tr: TraceReader+Clone+'static,
Tr::KeyOwned: Hashable,
Tr::Diff: Monoid+ExchangeData,
F: FnMut(&D, &mut Tr::KeyOwned)+Clone+'static,
Expand Down
10 changes: 4 additions & 6 deletions dogsdogsdogs/src/operators/propose.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,8 @@ pub fn propose<G, Tr, F, P>(
key_selector: F,
) -> Collection<G, (P, Tr::ValOwned), Tr::Diff>
where
G: Scope,
G::Timestamp: Lattice,
Tr: TraceReader<Time=G::Timestamp>+Clone+'static,
G: Scope<Timestamp=Tr::Time>,
Tr: TraceReader+Clone+'static,
Tr::KeyOwned: Hashable + Default,
Tr::Diff: Monoid+Multiply<Output = Tr::Diff>+ExchangeData,
F: Fn(&P)->Tr::KeyOwned+Clone+'static,
Expand Down Expand Up @@ -51,9 +50,8 @@ pub fn propose_distinct<G, Tr, F, P>(
key_selector: F,
) -> Collection<G, (P, Tr::ValOwned), Tr::Diff>
where
G: Scope,
G::Timestamp: Lattice,
Tr: TraceReader<Time=G::Timestamp>+Clone+'static,
G: Scope<Timestamp=Tr::Time>,
Tr: TraceReader+Clone+'static,
Tr::KeyOwned: Hashable + Default,
Tr::Diff: Monoid+Multiply<Output = Tr::Diff>+ExchangeData,
F: Fn(&P)->Tr::KeyOwned+Clone+'static,
Expand Down
5 changes: 2 additions & 3 deletions dogsdogsdogs/src/operators/validate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,8 @@ pub fn validate<G, K, V, Tr, F, P>(
key_selector: F,
) -> Collection<G, (P, V), Tr::Diff>
where
G: Scope,
G::Timestamp: Lattice,
Tr: TraceReader<KeyOwned=(K,V), ValOwned=(), Time=G::Timestamp>+Clone+'static,
G: Scope<Timestamp=Tr::Time>,
Tr: TraceReader<KeyOwned=(K,V), ValOwned=()>+Clone+'static,
K: Ord+Hash+Clone+Default,
V: ExchangeData+Hash+Default,
Tr::Diff: Monoid+Multiply<Output = Tr::Diff>+ExchangeData,
Expand Down
8 changes: 4 additions & 4 deletions examples/cursors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -132,10 +132,10 @@ fn main() {
fn dump_cursor<Tr>(round: u32, index: usize, trace: &mut Tr)
where
Tr: TraceReader,
Tr::KeyOwned: Debug + Clone,
Tr::ValOwned: Debug + Clone,
Tr::Time: Debug + Clone,
Tr::Diff: Debug + Clone,
Tr::KeyOwned: Debug,
Tr::ValOwned: Debug,
Tr::Time: Debug,
Tr::Diff: Debug,
{
let (mut cursor, storage) = trace.cursor();
for ((k, v), diffs) in cursor.to_vec(&storage).iter() {
Expand Down
5 changes: 2 additions & 3 deletions src/algorithms/graphs/bfs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,9 @@ use crate::operators::arrange::Arranged;
/// Returns pairs (node, dist) indicating distance of each node from a root.
pub fn bfs_arranged<G, N, Tr>(edges: &Arranged<G, Tr>, roots: &Collection<G, N>) -> Collection<G, (N, u32)>
where
G: Scope,
G::Timestamp: Lattice+Ord,
G: Scope<Timestamp=Tr::Time>,
N: ExchangeData+Hash,
Tr: for<'a> TraceReader<Key<'a>=&'a N, Val<'a>=&'a N, Time=G::Timestamp, Diff=isize>+Clone+'static,
Tr: for<'a> TraceReader<Key<'a>=&'a N, Val<'a>=&'a N, Diff=isize>+Clone+'static,
{
// initialize roots as reaching themselves at distance 0
let nodes = roots.map(|x| (x, 0));
Expand Down
5 changes: 2 additions & 3 deletions src/algorithms/graphs/bijkstra.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,9 @@ pub fn bidijkstra_arranged<G, N, Tr>(
goals: &Collection<G, (N,N)>
) -> Collection<G, ((N,N), u32)>
where
G: Scope,
G::Timestamp: Lattice+Ord,
G: Scope<Timestamp=Tr::Time>,
N: ExchangeData+Hash,
Tr: for<'a> TraceReader<Key<'a>=&'a N, Val<'a>=&'a N, Time=G::Timestamp, Diff=isize>+Clone+'static,
Tr: for<'a> TraceReader<Key<'a>=&'a N, Val<'a>=&'a N, Diff=isize>+Clone+'static,
{
forward
.stream
Expand Down
5 changes: 2 additions & 3 deletions src/algorithms/graphs/propagate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,14 +56,13 @@ use crate::operators::arrange::arrangement::Arranged;
/// of `logic should be a number in the interval [0,64],
pub fn propagate_core<G, N, L, Tr, F, R>(edges: &Arranged<G,Tr>, nodes: &Collection<G,(N,L),R>, logic: F) -> Collection<G,(N,L),R>
where
G: Scope,
G::Timestamp: Lattice+Ord,
G: Scope<Timestamp=Tr::Time>,
N: ExchangeData+Hash,
R: ExchangeData+Abelian,
R: Multiply<R, Output=R>,
R: From<i8>,
L: ExchangeData,
Tr: for<'a> TraceReader<Key<'a>=&'a N, Val<'a>=&'a N, Time=G::Timestamp, Diff=R>+Clone+'static,
Tr: for<'a> TraceReader<Key<'a>=&'a N, Val<'a>=&'a N, Diff=R>+Clone+'static,
F: Fn(&L)->u64+Clone+'static,
{
// Morally the code performs the following iterative computation. However, in the interest of a simplified
Expand Down
17 changes: 1 addition & 16 deletions src/operators/arrange/agent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ use timely::progress::Timestamp;
use timely::progress::{Antichain, frontier::AntichainRef};
use timely::dataflow::operators::CapabilitySet;

use crate::lattice::Lattice;
use crate::trace::{Trace, TraceReader, Batch, BatchReader};
use crate::trace::wrappers::rc::TraceBox;

Expand All @@ -29,7 +28,6 @@ use crate::trace::wrappers::frontier::{TraceFrontier, BatchFrontier};
pub struct TraceAgent<Tr>
where
Tr: TraceReader,
Tr::Time: Lattice+Ord+Clone+'static,
{
trace: Rc<RefCell<TraceBox<Tr>>>,
queues: Weak<RefCell<Vec<TraceAgentQueueWriter<Tr>>>>,
Expand All @@ -44,7 +42,6 @@ where
impl<Tr> TraceReader for TraceAgent<Tr>
where
Tr: TraceReader,
Tr::Time: Lattice+Ord+Clone+'static,
{
type Key<'a> = Tr::Key<'a>;
type KeyOwned = Tr::KeyOwned;
Expand Down Expand Up @@ -85,11 +82,7 @@ where
fn map_batches<F: FnMut(&Self::Batch)>(&self, f: F) { self.trace.borrow().trace.map_batches(f) }
}

impl<Tr> TraceAgent<Tr>
where
Tr: TraceReader,
Tr::Time: Timestamp+Lattice,
{
impl<Tr: TraceReader> TraceAgent<Tr> {
/// Creates a new agent from a trace reader.
pub fn new(trace: Tr, operator: OperatorInfo, logging: Option<crate::logging::Logger>) -> (Self, TraceWriter<Tr>)
where
Expand Down Expand Up @@ -177,7 +170,6 @@ where
impl<Tr> TraceAgent<Tr>
where
Tr: TraceReader+'static,
Tr::Time: Lattice+Ord+Clone+'static,
{
/// Copies an existing collection into the supplied scope.
///
Expand Down Expand Up @@ -233,7 +225,6 @@ where
pub fn import<G>(&mut self, scope: &G) -> Arranged<G, TraceAgent<Tr>>
where
G: Scope<Timestamp=Tr::Time>,
Tr::Time: Timestamp,
{
self.import_named(scope, "ArrangedSource")
}
Expand All @@ -242,7 +233,6 @@ where
pub fn import_named<G>(&mut self, scope: &G, name: &str) -> Arranged<G, TraceAgent<Tr>>
where
G: Scope<Timestamp=Tr::Time>,
Tr::Time: Timestamp,
{
// Drop ShutdownButton and return only the arrangement.
self.import_core(scope, name).0
Expand Down Expand Up @@ -300,7 +290,6 @@ where
pub fn import_core<G>(&mut self, scope: &G, name: &str) -> (Arranged<G, TraceAgent<Tr>>, ShutdownButton<CapabilitySet<Tr::Time>>)
where
G: Scope<Timestamp=Tr::Time>,
Tr::Time: Timestamp,
{
let trace = self.clone();

Expand Down Expand Up @@ -418,7 +407,6 @@ where
pub fn import_frontier<G>(&mut self, scope: &G, name: &str) -> (Arranged<G, TraceFrontier<TraceAgent<Tr>>>, ShutdownButton<CapabilitySet<Tr::Time>>)
where
G: Scope<Timestamp=Tr::Time>,
Tr::Time: Timestamp+ Lattice+Ord+Clone+'static,
Tr: TraceReader,
{
// This frontier describes our only guarantee on the compaction frontier.
Expand All @@ -437,7 +425,6 @@ where
pub fn import_frontier_core<G>(&mut self, scope: &G, name: &str, since: Antichain<Tr::Time>, until: Antichain<Tr::Time>) -> (Arranged<G, TraceFrontier<TraceAgent<Tr>>>, ShutdownButton<CapabilitySet<Tr::Time>>)
where
G: Scope<Timestamp=Tr::Time>,
Tr::Time: Timestamp+ Lattice+Ord+Clone+'static,
Tr: TraceReader,
{
let trace = self.clone();
Expand Down Expand Up @@ -541,7 +528,6 @@ impl<T> Drop for ShutdownDeadmans<T> {
impl<Tr> Clone for TraceAgent<Tr>
where
Tr: TraceReader,
Tr::Time: Lattice+Ord+Clone+'static,
{
fn clone(&self) -> Self {

Expand Down Expand Up @@ -571,7 +557,6 @@ where
impl<Tr> Drop for TraceAgent<Tr>
where
Tr: TraceReader,
Tr::Time: Lattice+Ord+Clone+'static,
{
fn drop(&mut self) {

Expand Down
Loading