Skip to content

Further trait clean-up #473

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Apr 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions src/operators/arrange/arrangement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -444,7 +444,7 @@ where
T2::ValOwned: Data,
T2::Diff: Abelian,
T2::Batch: Batch,
T2::Builder: Builder<Output=T2::Batch, Item = ((T1::KeyOwned, T2::ValOwned), T2::Time, T2::Diff)>,
T2::Builder: Builder<Input = ((T1::KeyOwned, T2::ValOwned), T2::Time, T2::Diff)>,
L: FnMut(T1::Key<'_>, &[(T1::Val<'_>, T1::Diff)], &mut Vec<(<T2::Cursor as Cursor>::ValOwned, T2::Diff)>)+'static,
{
self.reduce_core::<_,T2>(name, move |key, input, output, change| {
Expand All @@ -462,7 +462,7 @@ where
T2: for<'a> Trace<Key<'a>=T1::Key<'a>, Time=T1::Time>+'static,
T2::ValOwned: Data,
T2::Batch: Batch,
T2::Builder: Builder<Output=T2::Batch, Item = ((T1::KeyOwned, T2::ValOwned), T2::Time, T2::Diff)>,
T2::Builder: Builder<Input = ((T1::KeyOwned, T2::ValOwned), T2::Time, T2::Diff)>,
L: FnMut(T1::Key<'_>, &[(T1::Val<'_>, T1::Diff)], &mut Vec<(<T2::Cursor as Cursor>::ValOwned,T2::Diff)>, &mut Vec<(<T2::Cursor as Cursor>::ValOwned, T2::Diff)>)+'static,
{
use crate::operators::reduce::reduce_trace;
Expand Down
2 changes: 1 addition & 1 deletion src/operators/arrange/upsert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ where
Tr::ValOwned: ExchangeData,
Tr::Time: TotalOrder+ExchangeData,
Tr::Batch: Batch,
Tr::Builder: Builder<Item = ((Tr::KeyOwned, Tr::ValOwned), Tr::Time, Tr::Diff)>,
Tr::Builder: Builder<Input = ((Tr::KeyOwned, Tr::ValOwned), Tr::Time, Tr::Diff)>,
{
let mut reader: Option<TraceAgent<Tr>> = None;

Expand Down
5 changes: 2 additions & 3 deletions src/operators/consolidate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use crate::difference::Semigroup;

use crate::Data;
use crate::lattice::Lattice;
use crate::trace::{Batcher, Builder};
use crate::trace::Batcher;

/// Methods which require data be arrangeable.
impl<G, D, R> Collection<G, D, R>
Expand Down Expand Up @@ -53,8 +53,7 @@ where
where
Tr: crate::trace::Trace<KeyOwned = D,ValOwned = (),Time=G::Timestamp,Diff=R>+'static,
Tr::Batch: crate::trace::Batch,
Tr::Batcher: Batcher<Input=Vec<((D,()),G::Timestamp,R)>, Item = ((D,()),G::Timestamp,R), Time = G::Timestamp>,
Tr::Builder: Builder<Item = ((D,()),G::Timestamp,R), Time = G::Timestamp>,
Tr::Batcher: Batcher<Input=Vec<((D,()),G::Timestamp,R)>>,
{
use crate::operators::arrange::arrangement::Arrange;
use crate::trace::cursor::MyTrait;
Expand Down
8 changes: 4 additions & 4 deletions src/operators/reduce.rs
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,7 @@ pub trait ReduceCore<G: Scope, K: ToOwned + ?Sized, V: ToOwned + ?Sized, R: Semi
T2::ValOwned: Data,
T2::Diff: Abelian,
T2::Batch: Batch,
T2::Builder: Builder<Output=T2::Batch, Item = ((K::Owned, T2::ValOwned), T2::Time, T2::Diff)>,
T2::Builder: Builder<Input = ((K::Owned, T2::ValOwned), T2::Time, T2::Diff)>,
L: FnMut(&K, &[(&V, R)], &mut Vec<(<T2::Cursor as Cursor>::ValOwned, T2::Diff)>)+'static,
{
self.reduce_core::<_,T2>(name, move |key, input, output, change| {
Expand All @@ -273,7 +273,7 @@ pub trait ReduceCore<G: Scope, K: ToOwned + ?Sized, V: ToOwned + ?Sized, R: Semi
T2: for<'a> Trace<Key<'a>=&'a K, Time=G::Timestamp>+'static,
T2::ValOwned: Data,
T2::Batch: Batch,
T2::Builder: Builder<Output=T2::Batch, Item = ((K::Owned, T2::ValOwned), T2::Time, T2::Diff)>,
T2::Builder: Builder<Input = ((K::Owned, T2::ValOwned), T2::Time, T2::Diff)>,
L: FnMut(&K, &[(&V, R)], &mut Vec<(<T2::Cursor as Cursor>::ValOwned,T2::Diff)>, &mut Vec<(<T2::Cursor as Cursor>::ValOwned, T2::Diff)>)+'static,
;
}
Expand All @@ -292,7 +292,7 @@ where
T2::ValOwned: Data,
T2: for<'a> Trace<Key<'a>=&'a K, Time=G::Timestamp>+'static,
T2::Batch: Batch,
T2::Builder: Builder<Output=T2::Batch, Item = ((K::Owned, T2::ValOwned), T2::Time, T2::Diff)>,
T2::Builder: Builder<Input = ((K::Owned, T2::ValOwned), T2::Time, T2::Diff)>,
L: FnMut(&K, &[(&V, R)], &mut Vec<(<T2::Cursor as Cursor>::ValOwned,T2::Diff)>, &mut Vec<(<T2::Cursor as Cursor>::ValOwned, T2::Diff)>)+'static,
{
self.arrange_by_key_named(&format!("Arrange: {}", name))
Expand All @@ -310,7 +310,7 @@ where
T2: for<'a> Trace<Key<'a>=T1::Key<'a>, Time=T1::Time> + 'static,
T2::ValOwned: Data,
T2::Batch: Batch,
T2::Builder: Builder<Output=T2::Batch, Item = ((T1::KeyOwned, T2::ValOwned), T2::Time, T2::Diff)>,
T2::Builder: Builder<Input = ((T1::KeyOwned, T2::ValOwned), T2::Time, T2::Diff)>,
L: FnMut(T1::Key<'_>, &[(T1::Val<'_>, T1::Diff)], &mut Vec<(T2::ValOwned,T2::Diff)>, &mut Vec<(T2::ValOwned, T2::Diff)>)+'static,
{
let mut result_trace = None;
Expand Down
8 changes: 4 additions & 4 deletions src/trace/implementations/merge_batcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ where
T: Timestamp,
D: Semigroup,
{
type Input = Vec<Self::Item>;
type Item = ((K,V),T,D);
type Input = Vec<((K,V),T,D)>;
type Output = ((K,V),T,D);
type Time = T;

fn new(logger: Option<Logger<DifferentialEvent, WorkerIdentifier>>, operator_id: usize) -> Self {
Expand All @@ -38,7 +38,7 @@ where
}

#[inline(never)]
fn push_batch(&mut self, batch: RefOrMut<Vec<Self::Item>>) {
fn push_batch(&mut self, batch: RefOrMut<Self::Input>) {
// `batch` is either a shared reference or an owned allocations.
match batch {
RefOrMut::Ref(reference) => {
Expand All @@ -59,7 +59,7 @@ where
// which we call `lower`, by assumption that after sealing a batcher we receive no more
// updates with times not greater or equal to `upper`.
#[inline(never)]
fn seal<B: Builder<Item=Self::Item, Time=Self::Time>>(&mut self, upper: Antichain<T>) -> B::Output {
fn seal<B: Builder<Input=Self::Output, Time=Self::Time>>(&mut self, upper: Antichain<T>) -> B::Output {

let mut merged = Vec::new();
self.sorter.finish_into(&mut merged);
Expand Down
8 changes: 4 additions & 4 deletions src/trace/implementations/merge_batcher_col.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@ where
T: Columnation + Timestamp + 'static,
D: Columnation + Semigroup + 'static,
{
type Input = Vec<Self::Item>;
type Item = ((K,V),T,D);
type Input = Vec<((K,V),T,D)>;
type Output = ((K,V),T,D);
type Time = T;

fn new(logger: Option<Logger<DifferentialEvent, WorkerIdentifier>>, operator_id: usize) -> Self {
Expand All @@ -44,7 +44,7 @@ where
}

#[inline]
fn push_batch(&mut self, batch: RefOrMut<Vec<Self::Item>>) {
fn push_batch(&mut self, batch: RefOrMut<Self::Input>) {
// `batch` is either a shared reference or an owned allocations.
match batch {
RefOrMut::Ref(reference) => {
Expand All @@ -63,7 +63,7 @@ where
// which we call `lower`, by assumption that after sealing a batcher we receive no more
// updates with times not greater or equal to `upper`.
#[inline]
fn seal<B: Builder<Item=Self::Item, Time=Self::Time>>(&mut self, upper: Antichain<T>) -> B::Output {
fn seal<B: Builder<Input=Self::Output, Time=Self::Time>>(&mut self, upper: Antichain<T>) -> B::Output {

let mut merged = Default::default();
self.sorter.finish_into(&mut merged);
Expand Down
12 changes: 6 additions & 6 deletions src/trace/implementations/ord_neu.rs
Original file line number Diff line number Diff line change
Expand Up @@ -540,7 +540,7 @@ mod val_batch {

impl<L: Layout> Builder for OrdValBuilder<L> {

type Item = ((<L::Target as Update>::Key, <L::Target as Update>::Val), <L::Target as Update>::Time, <L::Target as Update>::Diff);
type Input = ((<L::Target as Update>::Key, <L::Target as Update>::Val), <L::Target as Update>::Time, <L::Target as Update>::Diff);
type Time = <L::Target as Update>::Time;
type Output = OrdValBatch<L>;

Expand All @@ -560,7 +560,7 @@ mod val_batch {
}

#[inline]
fn push(&mut self, ((key, val), time, diff): Self::Item) {
fn push(&mut self, ((key, val), time, diff): Self::Input) {

// Perhaps this is a continuation of an already received key.
if self.result.keys.last().map(|k| k.equals(&key)).unwrap_or(false) {
Expand All @@ -586,7 +586,7 @@ mod val_batch {
}

#[inline]
fn copy(&mut self, ((key, val), time, diff): &Self::Item) {
fn copy(&mut self, ((key, val), time, diff): &Self::Input) {

// Perhaps this is a continuation of an already received key.
if self.result.keys.last().map(|k| k.equals(key)).unwrap_or(false) {
Expand Down Expand Up @@ -1006,7 +1006,7 @@ mod key_batch {

impl<L: Layout> Builder for OrdKeyBuilder<L> {

type Item = ((<L::Target as Update>::Key, ()), <L::Target as Update>::Time, <L::Target as Update>::Diff);
type Input = ((<L::Target as Update>::Key, ()), <L::Target as Update>::Time, <L::Target as Update>::Diff);
type Time = <L::Target as Update>::Time;
type Output = OrdKeyBatch<L>;

Expand All @@ -1024,7 +1024,7 @@ mod key_batch {
}

#[inline]
fn push(&mut self, ((key, ()), time, diff): Self::Item) {
fn push(&mut self, ((key, ()), time, diff): Self::Input) {

// Perhaps this is a continuation of an already received key.
if self.result.keys.last().map(|k| k.equals(&key)).unwrap_or(false) {
Expand All @@ -1040,7 +1040,7 @@ mod key_batch {
}

#[inline]
fn copy(&mut self, ((key, ()), time, diff): &Self::Item) {
fn copy(&mut self, ((key, ()), time, diff): &Self::Input) {

// Perhaps this is a continuation of an already received key.
if self.result.keys.last().map(|k| k.equals(key)).unwrap_or(false) {
Expand Down
6 changes: 3 additions & 3 deletions src/trace/implementations/rhh.rs
Original file line number Diff line number Diff line change
Expand Up @@ -731,7 +731,7 @@ mod val_batch {
<L::Target as Update>::Key: Default + HashOrdered,
// RhhValBatch<L>: Batch<Key=<L::Target as Update>::Key, Val=<L::Target as Update>::Val, Time=<L::Target as Update>::Time, Diff=<L::Target as Update>::Diff>,
{
type Item = ((<L::Target as Update>::Key, <L::Target as Update>::Val), <L::Target as Update>::Time, <L::Target as Update>::Diff);
type Input = ((<L::Target as Update>::Key, <L::Target as Update>::Val), <L::Target as Update>::Time, <L::Target as Update>::Diff);
type Time = <L::Target as Update>::Time;
type Output = RhhValBatch<L>;

Expand Down Expand Up @@ -763,7 +763,7 @@ mod val_batch {
}

#[inline]
fn push(&mut self, ((key, val), time, diff): Self::Item) {
fn push(&mut self, ((key, val), time, diff): Self::Input) {

// Perhaps this is a continuation of an already received key.
if self.result.keys.last().map(|k| k.equals(&key)).unwrap_or(false) {
Expand All @@ -790,7 +790,7 @@ mod val_batch {
}

#[inline]
fn copy(&mut self, ((key, val), time, diff): &Self::Item) {
fn copy(&mut self, ((key, val), time, diff): &Self::Input) {

// Perhaps this is a continuation of an already received key.
if self.result.keys.last().map(|k| k.equals(key)).unwrap_or(false) {
Expand Down
2 changes: 1 addition & 1 deletion src/trace/implementations/spine_fueled.rs
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,7 @@ impl<B, BA, BU> Trace for Spine<B, BA, BU>
where
B: Batch+Clone+'static,
BA: Batcher<Time = B::Time>,
BU: Builder<Item=BA::Item, Time=BA::Time, Output = B>,
BU: Builder<Input=BA::Output, Time=BA::Time, Output = B>,
{
/// A type used to assemble batches from disordered updates.
type Batcher = BA;
Expand Down
32 changes: 14 additions & 18 deletions src/trace/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ where <Self as TraceReader>::Batch: Batch {
/// A type used to assemble batches from disordered updates.
type Batcher: Batcher<Time = Self::Time>;
/// A type used to assemble batches from ordered update sequences.
type Builder: Builder<Item=<Self::Batcher as Batcher>::Item, Time=Self::Time, Output = Self::Batch>;
type Builder: Builder<Input=<Self::Batcher as Batcher>::Output, Time=Self::Time, Output = Self::Batch>;

/// Allocates a new empty trace.
fn new(
Expand Down Expand Up @@ -306,26 +306,26 @@ pub trait Batch : BatchReader where Self: ::std::marker::Sized {

/// Functionality for collecting and batching updates.
pub trait Batcher {
/// Type of update pushed into the batcher.
/// Type pushed into the batcher.
type Input;
/// Type of update pushed into the builder.
type Item;
/// Type produced by the batcher.
type Output;
/// Times at which batches are formed.
type Time: Timestamp;
/// Allocates a new empty batcher.
fn new(logger: Option<Logger<DifferentialEvent, WorkerIdentifier>>, operator_id: usize) -> Self;
/// Adds an unordered batch of elements to the batcher.
fn push_batch(&mut self, batch: RefOrMut<Self::Input>);
/// Returns all updates not greater or equal to an element of `upper`.
fn seal<B: Builder<Item=Self::Item, Time=Self::Time>>(&mut self, upper: Antichain<Self::Time>) -> B::Output;
fn seal<B: Builder<Input=Self::Output, Time=Self::Time>>(&mut self, upper: Antichain<Self::Time>) -> B::Output;
/// Returns the lower envelope of contained update times.
fn frontier(&mut self) -> timely::progress::frontier::AntichainRef<Self::Time>;
}

/// Functionality for building batches from ordered update sequences.
pub trait Builder: Sized {
/// Input item type.
type Item;
type Input;
/// Timestamp type.
type Time: Timestamp;
/// Output batch type.
Expand All @@ -344,15 +344,11 @@ pub trait Builder: Sized {
///
/// The default implementation uses `self.copy` with references to the owned arguments.
/// One should override it if the builder can take advantage of owned arguments.
fn push(&mut self, element: Self::Item) {
fn push(&mut self, element: Self::Input) {
self.copy(&element);
}
/// Adds an element to the batch.
fn copy(&mut self, element: &Self::Item);
/// Adds an ordered sequence of elements to the batch.
fn extend<I: Iterator<Item=Self::Item>>(&mut self, iter: I) {
for item in iter { self.push(item); }
}
fn copy(&mut self, element: &Self::Input);
/// Completes building and returns the batch.
fn done(self, lower: Antichain<Self::Time>, upper: Antichain<Self::Time>, since: Antichain<Self::Time>) -> Self::Output;
}
Expand Down Expand Up @@ -460,12 +456,12 @@ pub mod rc_blanket_impls {

/// Functionality for building batches from ordered update sequences.
impl<B: Builder> Builder for RcBuilder<B> {
type Item = B::Item;
type Input = B::Input;
type Time = B::Time;
type Output = Rc<B::Output>;
fn with_capacity(keys: usize, vals: usize, upds: usize) -> Self { RcBuilder { builder: B::with_capacity(keys, vals, upds) } }
fn push(&mut self, element: Self::Item) { self.builder.push(element) }
fn copy(&mut self, element: &Self::Item) { self.builder.copy(element) }
fn push(&mut self, element: Self::Input) { self.builder.push(element) }
fn copy(&mut self, element: &Self::Input) { self.builder.copy(element) }
fn done(self, lower: Antichain<Self::Time>, upper: Antichain<Self::Time>, since: Antichain<Self::Time>) -> Rc<B::Output> { Rc::new(self.builder.done(lower, upper, since)) }
}

Expand Down Expand Up @@ -569,12 +565,12 @@ pub mod abomonated_blanket_impls {
where
B::Output: Abomonation,
{
type Item = B::Item;
type Input = B::Input;
type Time = B::Time;
type Output = Abomonated<B::Output, Vec<u8>>;
fn with_capacity(keys: usize, vals: usize, upds: usize) -> Self { AbomonatedBuilder { builder: B::with_capacity(keys, vals, upds) } }
fn push(&mut self, element: Self::Item) { self.builder.push(element) }
fn copy(&mut self, element: &Self::Item) { self.builder.copy(element) }
fn push(&mut self, element: Self::Input) { self.builder.push(element) }
fn copy(&mut self, element: &Self::Input) { self.builder.copy(element) }
fn done(self, lower: Antichain<Self::Time>, upper: Antichain<Self::Time>, since: Antichain<Self::Time>) -> Self::Output {
let batch = self.builder.done(lower, upper, since);
let mut bytes = Vec::with_capacity(measure(&batch));
Expand Down