Skip to content

Commit c564e8f

Browse files
Further clean-up (#473)
1 parent 37c505d commit c564e8f

File tree

10 files changed

+41
-46
lines changed

10 files changed

+41
-46
lines changed

src/operators/arrange/arrangement.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -444,7 +444,7 @@ where
444444
T2::ValOwned: Data,
445445
T2::Diff: Abelian,
446446
T2::Batch: Batch,
447-
T2::Builder: Builder<Output=T2::Batch, Item = ((T1::KeyOwned, T2::ValOwned), T2::Time, T2::Diff)>,
447+
T2::Builder: Builder<Input = ((T1::KeyOwned, T2::ValOwned), T2::Time, T2::Diff)>,
448448
L: FnMut(T1::Key<'_>, &[(T1::Val<'_>, T1::Diff)], &mut Vec<(<T2::Cursor as Cursor>::ValOwned, T2::Diff)>)+'static,
449449
{
450450
self.reduce_core::<_,T2>(name, move |key, input, output, change| {
@@ -462,7 +462,7 @@ where
462462
T2: for<'a> Trace<Key<'a>=T1::Key<'a>, Time=T1::Time>+'static,
463463
T2::ValOwned: Data,
464464
T2::Batch: Batch,
465-
T2::Builder: Builder<Output=T2::Batch, Item = ((T1::KeyOwned, T2::ValOwned), T2::Time, T2::Diff)>,
465+
T2::Builder: Builder<Input = ((T1::KeyOwned, T2::ValOwned), T2::Time, T2::Diff)>,
466466
L: FnMut(T1::Key<'_>, &[(T1::Val<'_>, T1::Diff)], &mut Vec<(<T2::Cursor as Cursor>::ValOwned,T2::Diff)>, &mut Vec<(<T2::Cursor as Cursor>::ValOwned, T2::Diff)>)+'static,
467467
{
468468
use crate::operators::reduce::reduce_trace;

src/operators/arrange/upsert.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -136,7 +136,7 @@ where
136136
Tr::ValOwned: ExchangeData,
137137
Tr::Time: TotalOrder+ExchangeData,
138138
Tr::Batch: Batch,
139-
Tr::Builder: Builder<Item = ((Tr::KeyOwned, Tr::ValOwned), Tr::Time, Tr::Diff)>,
139+
Tr::Builder: Builder<Input = ((Tr::KeyOwned, Tr::ValOwned), Tr::Time, Tr::Diff)>,
140140
{
141141
let mut reader: Option<TraceAgent<Tr>> = None;
142142

src/operators/consolidate.rs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ use crate::difference::Semigroup;
1313

1414
use crate::Data;
1515
use crate::lattice::Lattice;
16-
use crate::trace::{Batcher, Builder};
16+
use crate::trace::Batcher;
1717

1818
/// Methods which require data be arrangeable.
1919
impl<G, D, R> Collection<G, D, R>
@@ -53,8 +53,7 @@ where
5353
where
5454
Tr: crate::trace::Trace<KeyOwned = D,ValOwned = (),Time=G::Timestamp,Diff=R>+'static,
5555
Tr::Batch: crate::trace::Batch,
56-
Tr::Batcher: Batcher<Input=Vec<((D,()),G::Timestamp,R)>, Item = ((D,()),G::Timestamp,R), Time = G::Timestamp>,
57-
Tr::Builder: Builder<Item = ((D,()),G::Timestamp,R), Time = G::Timestamp>,
56+
Tr::Batcher: Batcher<Input=Vec<((D,()),G::Timestamp,R)>>,
5857
{
5958
use crate::operators::arrange::arrangement::Arrange;
6059
use crate::trace::cursor::MyTrait;

src/operators/reduce.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -251,7 +251,7 @@ pub trait ReduceCore<G: Scope, K: ToOwned + ?Sized, V: ToOwned + ?Sized, R: Semi
251251
T2::ValOwned: Data,
252252
T2::Diff: Abelian,
253253
T2::Batch: Batch,
254-
T2::Builder: Builder<Output=T2::Batch, Item = ((K::Owned, T2::ValOwned), T2::Time, T2::Diff)>,
254+
T2::Builder: Builder<Input = ((K::Owned, T2::ValOwned), T2::Time, T2::Diff)>,
255255
L: FnMut(&K, &[(&V, R)], &mut Vec<(<T2::Cursor as Cursor>::ValOwned, T2::Diff)>)+'static,
256256
{
257257
self.reduce_core::<_,T2>(name, move |key, input, output, change| {
@@ -273,7 +273,7 @@ pub trait ReduceCore<G: Scope, K: ToOwned + ?Sized, V: ToOwned + ?Sized, R: Semi
273273
T2: for<'a> Trace<Key<'a>=&'a K, Time=G::Timestamp>+'static,
274274
T2::ValOwned: Data,
275275
T2::Batch: Batch,
276-
T2::Builder: Builder<Output=T2::Batch, Item = ((K::Owned, T2::ValOwned), T2::Time, T2::Diff)>,
276+
T2::Builder: Builder<Input = ((K::Owned, T2::ValOwned), T2::Time, T2::Diff)>,
277277
L: FnMut(&K, &[(&V, R)], &mut Vec<(<T2::Cursor as Cursor>::ValOwned,T2::Diff)>, &mut Vec<(<T2::Cursor as Cursor>::ValOwned, T2::Diff)>)+'static,
278278
;
279279
}
@@ -292,7 +292,7 @@ where
292292
T2::ValOwned: Data,
293293
T2: for<'a> Trace<Key<'a>=&'a K, Time=G::Timestamp>+'static,
294294
T2::Batch: Batch,
295-
T2::Builder: Builder<Output=T2::Batch, Item = ((K::Owned, T2::ValOwned), T2::Time, T2::Diff)>,
295+
T2::Builder: Builder<Input = ((K::Owned, T2::ValOwned), T2::Time, T2::Diff)>,
296296
L: FnMut(&K, &[(&V, R)], &mut Vec<(<T2::Cursor as Cursor>::ValOwned,T2::Diff)>, &mut Vec<(<T2::Cursor as Cursor>::ValOwned, T2::Diff)>)+'static,
297297
{
298298
self.arrange_by_key_named(&format!("Arrange: {}", name))
@@ -310,7 +310,7 @@ where
310310
T2: for<'a> Trace<Key<'a>=T1::Key<'a>, Time=T1::Time> + 'static,
311311
T2::ValOwned: Data,
312312
T2::Batch: Batch,
313-
T2::Builder: Builder<Output=T2::Batch, Item = ((T1::KeyOwned, T2::ValOwned), T2::Time, T2::Diff)>,
313+
T2::Builder: Builder<Input = ((T1::KeyOwned, T2::ValOwned), T2::Time, T2::Diff)>,
314314
L: FnMut(T1::Key<'_>, &[(T1::Val<'_>, T1::Diff)], &mut Vec<(T2::ValOwned,T2::Diff)>, &mut Vec<(T2::ValOwned, T2::Diff)>)+'static,
315315
{
316316
let mut result_trace = None;

src/trace/implementations/merge_batcher.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -25,8 +25,8 @@ where
2525
T: Timestamp,
2626
D: Semigroup,
2727
{
28-
type Input = Vec<Self::Item>;
29-
type Item = ((K,V),T,D);
28+
type Input = Vec<((K,V),T,D)>;
29+
type Output = ((K,V),T,D);
3030
type Time = T;
3131

3232
fn new(logger: Option<Logger<DifferentialEvent, WorkerIdentifier>>, operator_id: usize) -> Self {
@@ -38,7 +38,7 @@ where
3838
}
3939

4040
#[inline(never)]
41-
fn push_batch(&mut self, batch: RefOrMut<Vec<Self::Item>>) {
41+
fn push_batch(&mut self, batch: RefOrMut<Self::Input>) {
4242
// `batch` is either a shared reference or an owned allocations.
4343
match batch {
4444
RefOrMut::Ref(reference) => {
@@ -59,7 +59,7 @@ where
5959
// which we call `lower`, by assumption that after sealing a batcher we receive no more
6060
// updates with times not greater or equal to `upper`.
6161
#[inline(never)]
62-
fn seal<B: Builder<Item=Self::Item, Time=Self::Time>>(&mut self, upper: Antichain<T>) -> B::Output {
62+
fn seal<B: Builder<Input=Self::Output, Time=Self::Time>>(&mut self, upper: Antichain<T>) -> B::Output {
6363

6464
let mut merged = Vec::new();
6565
self.sorter.finish_into(&mut merged);

src/trace/implementations/merge_batcher_col.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -31,8 +31,8 @@ where
3131
T: Columnation + Timestamp + 'static,
3232
D: Columnation + Semigroup + 'static,
3333
{
34-
type Input = Vec<Self::Item>;
35-
type Item = ((K,V),T,D);
34+
type Input = Vec<((K,V),T,D)>;
35+
type Output = ((K,V),T,D);
3636
type Time = T;
3737

3838
fn new(logger: Option<Logger<DifferentialEvent, WorkerIdentifier>>, operator_id: usize) -> Self {
@@ -44,7 +44,7 @@ where
4444
}
4545

4646
#[inline]
47-
fn push_batch(&mut self, batch: RefOrMut<Vec<Self::Item>>) {
47+
fn push_batch(&mut self, batch: RefOrMut<Self::Input>) {
4848
// `batch` is either a shared reference or an owned allocations.
4949
match batch {
5050
RefOrMut::Ref(reference) => {
@@ -63,7 +63,7 @@ where
6363
// which we call `lower`, by assumption that after sealing a batcher we receive no more
6464
// updates with times not greater or equal to `upper`.
6565
#[inline]
66-
fn seal<B: Builder<Item=Self::Item, Time=Self::Time>>(&mut self, upper: Antichain<T>) -> B::Output {
66+
fn seal<B: Builder<Input=Self::Output, Time=Self::Time>>(&mut self, upper: Antichain<T>) -> B::Output {
6767

6868
let mut merged = Default::default();
6969
self.sorter.finish_into(&mut merged);

src/trace/implementations/ord_neu.rs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -540,7 +540,7 @@ mod val_batch {
540540

541541
impl<L: Layout> Builder for OrdValBuilder<L> {
542542

543-
type Item = ((<L::Target as Update>::Key, <L::Target as Update>::Val), <L::Target as Update>::Time, <L::Target as Update>::Diff);
543+
type Input = ((<L::Target as Update>::Key, <L::Target as Update>::Val), <L::Target as Update>::Time, <L::Target as Update>::Diff);
544544
type Time = <L::Target as Update>::Time;
545545
type Output = OrdValBatch<L>;
546546

@@ -560,7 +560,7 @@ mod val_batch {
560560
}
561561

562562
#[inline]
563-
fn push(&mut self, ((key, val), time, diff): Self::Item) {
563+
fn push(&mut self, ((key, val), time, diff): Self::Input) {
564564

565565
// Perhaps this is a continuation of an already received key.
566566
if self.result.keys.last().map(|k| k.equals(&key)).unwrap_or(false) {
@@ -586,7 +586,7 @@ mod val_batch {
586586
}
587587

588588
#[inline]
589-
fn copy(&mut self, ((key, val), time, diff): &Self::Item) {
589+
fn copy(&mut self, ((key, val), time, diff): &Self::Input) {
590590

591591
// Perhaps this is a continuation of an already received key.
592592
if self.result.keys.last().map(|k| k.equals(key)).unwrap_or(false) {
@@ -1006,7 +1006,7 @@ mod key_batch {
10061006

10071007
impl<L: Layout> Builder for OrdKeyBuilder<L> {
10081008

1009-
type Item = ((<L::Target as Update>::Key, ()), <L::Target as Update>::Time, <L::Target as Update>::Diff);
1009+
type Input = ((<L::Target as Update>::Key, ()), <L::Target as Update>::Time, <L::Target as Update>::Diff);
10101010
type Time = <L::Target as Update>::Time;
10111011
type Output = OrdKeyBatch<L>;
10121012

@@ -1024,7 +1024,7 @@ mod key_batch {
10241024
}
10251025

10261026
#[inline]
1027-
fn push(&mut self, ((key, ()), time, diff): Self::Item) {
1027+
fn push(&mut self, ((key, ()), time, diff): Self::Input) {
10281028

10291029
// Perhaps this is a continuation of an already received key.
10301030
if self.result.keys.last().map(|k| k.equals(&key)).unwrap_or(false) {
@@ -1040,7 +1040,7 @@ mod key_batch {
10401040
}
10411041

10421042
#[inline]
1043-
fn copy(&mut self, ((key, ()), time, diff): &Self::Item) {
1043+
fn copy(&mut self, ((key, ()), time, diff): &Self::Input) {
10441044

10451045
// Perhaps this is a continuation of an already received key.
10461046
if self.result.keys.last().map(|k| k.equals(key)).unwrap_or(false) {

src/trace/implementations/rhh.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -731,7 +731,7 @@ mod val_batch {
731731
<L::Target as Update>::Key: Default + HashOrdered,
732732
// RhhValBatch<L>: Batch<Key=<L::Target as Update>::Key, Val=<L::Target as Update>::Val, Time=<L::Target as Update>::Time, Diff=<L::Target as Update>::Diff>,
733733
{
734-
type Item = ((<L::Target as Update>::Key, <L::Target as Update>::Val), <L::Target as Update>::Time, <L::Target as Update>::Diff);
734+
type Input = ((<L::Target as Update>::Key, <L::Target as Update>::Val), <L::Target as Update>::Time, <L::Target as Update>::Diff);
735735
type Time = <L::Target as Update>::Time;
736736
type Output = RhhValBatch<L>;
737737

@@ -763,7 +763,7 @@ mod val_batch {
763763
}
764764

765765
#[inline]
766-
fn push(&mut self, ((key, val), time, diff): Self::Item) {
766+
fn push(&mut self, ((key, val), time, diff): Self::Input) {
767767

768768
// Perhaps this is a continuation of an already received key.
769769
if self.result.keys.last().map(|k| k.equals(&key)).unwrap_or(false) {
@@ -790,7 +790,7 @@ mod val_batch {
790790
}
791791

792792
#[inline]
793-
fn copy(&mut self, ((key, val), time, diff): &Self::Item) {
793+
fn copy(&mut self, ((key, val), time, diff): &Self::Input) {
794794

795795
// Perhaps this is a continuation of an already received key.
796796
if self.result.keys.last().map(|k| k.equals(key)).unwrap_or(false) {

src/trace/implementations/spine_fueled.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -255,7 +255,7 @@ impl<B, BA, BU> Trace for Spine<B, BA, BU>
255255
where
256256
B: Batch+Clone+'static,
257257
BA: Batcher<Time = B::Time>,
258-
BU: Builder<Item=BA::Item, Time=BA::Time, Output = B>,
258+
BU: Builder<Input=BA::Output, Time=BA::Time, Output = B>,
259259
{
260260
/// A type used to assemble batches from disordered updates.
261261
type Batcher = BA;

src/trace/mod.rs

Lines changed: 14 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -213,7 +213,7 @@ where <Self as TraceReader>::Batch: Batch {
213213
/// A type used to assemble batches from disordered updates.
214214
type Batcher: Batcher<Time = Self::Time>;
215215
/// A type used to assemble batches from ordered update sequences.
216-
type Builder: Builder<Item=<Self::Batcher as Batcher>::Item, Time=Self::Time, Output = Self::Batch>;
216+
type Builder: Builder<Input=<Self::Batcher as Batcher>::Output, Time=Self::Time, Output = Self::Batch>;
217217

218218
/// Allocates a new empty trace.
219219
fn new(
@@ -306,26 +306,26 @@ pub trait Batch : BatchReader where Self: ::std::marker::Sized {
306306

307307
/// Functionality for collecting and batching updates.
308308
pub trait Batcher {
309-
/// Type of update pushed into the batcher.
309+
/// Type pushed into the batcher.
310310
type Input;
311-
/// Type of update pushed into the builder.
312-
type Item;
311+
/// Type produced by the batcher.
312+
type Output;
313313
/// Times at which batches are formed.
314314
type Time: Timestamp;
315315
/// Allocates a new empty batcher.
316316
fn new(logger: Option<Logger<DifferentialEvent, WorkerIdentifier>>, operator_id: usize) -> Self;
317317
/// Adds an unordered batch of elements to the batcher.
318318
fn push_batch(&mut self, batch: RefOrMut<Self::Input>);
319319
/// Returns all updates not greater or equal to an element of `upper`.
320-
fn seal<B: Builder<Item=Self::Item, Time=Self::Time>>(&mut self, upper: Antichain<Self::Time>) -> B::Output;
320+
fn seal<B: Builder<Input=Self::Output, Time=Self::Time>>(&mut self, upper: Antichain<Self::Time>) -> B::Output;
321321
/// Returns the lower envelope of contained update times.
322322
fn frontier(&mut self) -> timely::progress::frontier::AntichainRef<Self::Time>;
323323
}
324324

325325
/// Functionality for building batches from ordered update sequences.
326326
pub trait Builder: Sized {
327327
/// Input item type.
328-
type Item;
328+
type Input;
329329
/// Timestamp type.
330330
type Time: Timestamp;
331331
/// Output batch type.
@@ -344,15 +344,11 @@ pub trait Builder: Sized {
344344
///
345345
/// The default implementation uses `self.copy` with references to the owned arguments.
346346
/// One should override it if the builder can take advantage of owned arguments.
347-
fn push(&mut self, element: Self::Item) {
347+
fn push(&mut self, element: Self::Input) {
348348
self.copy(&element);
349349
}
350350
/// Adds an element to the batch.
351-
fn copy(&mut self, element: &Self::Item);
352-
/// Adds an ordered sequence of elements to the batch.
353-
fn extend<I: Iterator<Item=Self::Item>>(&mut self, iter: I) {
354-
for item in iter { self.push(item); }
355-
}
351+
fn copy(&mut self, element: &Self::Input);
356352
/// Completes building and returns the batch.
357353
fn done(self, lower: Antichain<Self::Time>, upper: Antichain<Self::Time>, since: Antichain<Self::Time>) -> Self::Output;
358354
}
@@ -460,12 +456,12 @@ pub mod rc_blanket_impls {
460456

461457
/// Functionality for building batches from ordered update sequences.
462458
impl<B: Builder> Builder for RcBuilder<B> {
463-
type Item = B::Item;
459+
type Input = B::Input;
464460
type Time = B::Time;
465461
type Output = Rc<B::Output>;
466462
fn with_capacity(keys: usize, vals: usize, upds: usize) -> Self { RcBuilder { builder: B::with_capacity(keys, vals, upds) } }
467-
fn push(&mut self, element: Self::Item) { self.builder.push(element) }
468-
fn copy(&mut self, element: &Self::Item) { self.builder.copy(element) }
463+
fn push(&mut self, element: Self::Input) { self.builder.push(element) }
464+
fn copy(&mut self, element: &Self::Input) { self.builder.copy(element) }
469465
fn done(self, lower: Antichain<Self::Time>, upper: Antichain<Self::Time>, since: Antichain<Self::Time>) -> Rc<B::Output> { Rc::new(self.builder.done(lower, upper, since)) }
470466
}
471467

@@ -569,12 +565,12 @@ pub mod abomonated_blanket_impls {
569565
where
570566
B::Output: Abomonation,
571567
{
572-
type Item = B::Item;
568+
type Input = B::Input;
573569
type Time = B::Time;
574570
type Output = Abomonated<B::Output, Vec<u8>>;
575571
fn with_capacity(keys: usize, vals: usize, upds: usize) -> Self { AbomonatedBuilder { builder: B::with_capacity(keys, vals, upds) } }
576-
fn push(&mut self, element: Self::Item) { self.builder.push(element) }
577-
fn copy(&mut self, element: &Self::Item) { self.builder.copy(element) }
572+
fn push(&mut self, element: Self::Input) { self.builder.push(element) }
573+
fn copy(&mut self, element: &Self::Input) { self.builder.copy(element) }
578574
fn done(self, lower: Antichain<Self::Time>, upper: Antichain<Self::Time>, since: Antichain<Self::Time>) -> Self::Output {
579575
let batch = self.builder.done(lower, upper, since);
580576
let mut bytes = Vec::with_capacity(measure(&batch));

0 commit comments

Comments
 (0)