Skip to content

Commit 9fe620d

Browse files
committed
Allow Batcher::seal to be generic in Builder
1 parent f4c3dd1 commit 9fe620d

File tree

10 files changed

+102
-115
lines changed

10 files changed

+102
-115
lines changed

src/operators/arrange/arrangement.rs

Lines changed: 17 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ use timely::dataflow::operators::Capability;
2929
use ::{Data, ExchangeData, Collection, AsCollection, Hashable};
3030
use ::difference::Semigroup;
3131
use lattice::Lattice;
32-
use trace::{self, Trace, TraceReader, Batch, BatchReader, Batcher, Cursor};
32+
use trace::{self, Trace, TraceReader, Batch, BatchReader, Batcher, Builder, Cursor};
3333
use trace::implementations::{KeySpine, ValSpine};
3434

3535
use trace::wrappers::enter::{TraceEnter, BatchEnter,};
@@ -456,8 +456,8 @@ where
456456
R: ExchangeData,
457457
Tr: Trace+TraceReader<Time=G::Timestamp>+'static,
458458
Tr::Batch: Batch,
459-
<Tr::Batch as Batch>::Batcher: Batcher<Tr::Batch, Item = ((K,V),G::Timestamp,R)>,
460-
<Tr::Batch as Batch>::Batcher: Batcher<Tr::Batch, Time = G::Timestamp>,
459+
<Tr::Batch as Batch>::Batcher: Batcher<Item = ((K,V),G::Timestamp,R), Time = G::Timestamp>,
460+
<Tr::Batch as Batch>::Builder: Builder<Item = ((K,V),G::Timestamp,R), Time = G::Timestamp, Output = Tr::Batch>,
461461
{
462462
self.arrange_named("Arrange")
463463
}
@@ -474,8 +474,8 @@ where
474474
R: ExchangeData,
475475
Tr: Trace+TraceReader<Time=G::Timestamp>+'static,
476476
Tr::Batch: Batch,
477-
<Tr::Batch as Batch>::Batcher: Batcher<Tr::Batch, Item = ((K,V),G::Timestamp,R)>,
478-
<Tr::Batch as Batch>::Batcher: Batcher<Tr::Batch, Time = G::Timestamp>,
477+
<Tr::Batch as Batch>::Batcher: Batcher<Item = ((K,V),G::Timestamp,R), Time = G::Timestamp>,
478+
<Tr::Batch as Batch>::Builder: Builder<Item = ((K,V),G::Timestamp,R), Time = G::Timestamp, Output = Tr::Batch>,
479479
{
480480
let exchange = Exchange::new(move |update: &((K,V),G::Timestamp,R)| (update.0).0.hashed().into());
481481
self.arrange_core(exchange, name)
@@ -491,8 +491,8 @@ where
491491
P: ParallelizationContract<G::Timestamp, ((K,V),G::Timestamp,R)>,
492492
Tr: Trace+TraceReader<Time=G::Timestamp>+'static,
493493
Tr::Batch: Batch,
494-
<Tr::Batch as Batch>::Batcher: Batcher<Tr::Batch, Item = ((K,V),G::Timestamp,R)>,
495-
<Tr::Batch as Batch>::Batcher: Batcher<Tr::Batch, Time = G::Timestamp>,
494+
<Tr::Batch as Batch>::Batcher: Batcher<Item = ((K,V),G::Timestamp,R), Time = G::Timestamp>,
495+
<Tr::Batch as Batch>::Builder: Builder<Item = ((K,V),G::Timestamp,R), Time = G::Timestamp, Output = Tr::Batch>,
496496
;
497497
}
498498

@@ -510,8 +510,8 @@ where
510510
V: ExchangeData,
511511
R: ExchangeData,
512512
Tr: Trace + TraceReader<Time=G::Timestamp> + 'static, Tr::Batch: Batch,
513-
<Tr::Batch as Batch>::Batcher: Batcher<Tr::Batch, Item = ((K,V),G::Timestamp,R)>,
514-
<Tr::Batch as Batch>::Batcher: Batcher<Tr::Batch, Time = G::Timestamp>,
513+
<Tr::Batch as Batch>::Batcher: Batcher<Item = ((K,V),G::Timestamp,R), Time = G::Timestamp>,
514+
<Tr::Batch as Batch>::Builder: Builder<Item = ((K,V),G::Timestamp,R), Time = G::Timestamp, Output = Tr::Batch>,
515515
{
516516
self.arrange_named("Arrange")
517517
}
@@ -522,8 +522,8 @@ where
522522
V: ExchangeData,
523523
R: ExchangeData,
524524
Tr: Trace + TraceReader<Time=G::Timestamp> + 'static, Tr::Batch: Batch,
525-
<Tr::Batch as Batch>::Batcher: Batcher<Tr::Batch, Item = ((K,V),G::Timestamp,R)>,
526-
<Tr::Batch as Batch>::Batcher: Batcher<Tr::Batch, Time = G::Timestamp>,
525+
<Tr::Batch as Batch>::Batcher: Batcher<Item = ((K,V),G::Timestamp,R), Time = G::Timestamp>,
526+
<Tr::Batch as Batch>::Builder: Builder<Item = ((K,V),G::Timestamp,R), Time = G::Timestamp, Output = Tr::Batch>,
527527
{
528528
let exchange = Exchange::new(move |update: &((K,V),G::Timestamp,R)| (update.0).0.hashed().into());
529529
self.arrange_core(exchange, name)
@@ -534,8 +534,8 @@ where
534534
P: ParallelizationContract<G::Timestamp, ((K,V),G::Timestamp,R)>,
535535
Tr: Trace+TraceReader<Time=G::Timestamp>+'static,
536536
Tr::Batch: Batch,
537-
<Tr::Batch as Batch>::Batcher: Batcher<Tr::Batch, Item = ((K,V),G::Timestamp,R)>,
538-
<Tr::Batch as Batch>::Batcher: Batcher<Tr::Batch, Time = G::Timestamp>,
537+
<Tr::Batch as Batch>::Batcher: Batcher<Item = ((K,V),G::Timestamp,R), Time = G::Timestamp>,
538+
<Tr::Batch as Batch>::Builder: Builder<Item = ((K,V),G::Timestamp,R), Time = G::Timestamp, Output = Tr::Batch>,
539539
{
540540
// The `Arrange` operator is tasked with reacting to an advancing input
541541
// frontier by producing the sequence of batches whose lower and upper
@@ -645,7 +645,7 @@ where
645645
}
646646

647647
// Extract updates not in advance of `upper`.
648-
let batch = batcher.seal(upper.clone());
648+
let batch = batcher.seal::<<Tr::Batch as Batch>::Builder>(upper.clone());
649649

650650
writer.insert(batch.clone(), Some(capability.time().clone()));
651651

@@ -673,7 +673,7 @@ where
673673
}
674674
else {
675675
// Announce progress updates, even without data.
676-
let _batch = batcher.seal(input.frontier().frontier().to_owned());
676+
let _batch = batcher.seal::<<Tr::Batch as Batch>::Builder>(input.frontier().frontier().to_owned());
677677
writer.seal(input.frontier().frontier().to_owned());
678678
}
679679

@@ -699,8 +699,8 @@ where
699699
P: ParallelizationContract<G::Timestamp, ((K,()),G::Timestamp,R)>,
700700
Tr: Trace+TraceReader<Time=G::Timestamp>+'static,
701701
Tr::Batch: Batch,
702-
<Tr::Batch as Batch>::Batcher: Batcher<Tr::Batch, Item = ((K,()),G::Timestamp,R)>,
703-
<Tr::Batch as Batch>::Batcher: Batcher<Tr::Batch, Time = G::Timestamp>,
702+
<Tr::Batch as Batch>::Batcher: Batcher<Item = ((K,()),G::Timestamp,R), Time = G::Timestamp>,
703+
<Tr::Batch as Batch>::Builder: Builder<Item = ((K,()),G::Timestamp,R), Time = G::Timestamp, Output = Tr::Batch>,
704704
{
705705
self.map(|k| (k, ()))
706706
.arrange_core(pact, name)

src/operators/arrange/upsert.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -146,7 +146,7 @@ where
146146
Tr::Val: ExchangeData,
147147
Tr: Trace+TraceReader<Time=G::Timestamp,R=isize>+'static,
148148
Tr::Batch: Batch,
149-
<Tr::Batch as Batch>::Builder: Builder<Tr::Batch, Item = ((Tr::Key, Tr::Val), Tr::Time, Tr::R)>,
149+
<Tr::Batch as Batch>::Builder: Builder<Item = ((Tr::Key, Tr::Val), Tr::Time, Tr::R)>,
150150
{
151151
let mut reader: Option<TraceAgent<Tr>> = None;
152152

src/operators/consolidate.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ use ::difference::Semigroup;
1313

1414
use Data;
1515
use lattice::Lattice;
16-
use trace::{Batch, Batcher};
16+
use trace::{Batch, Batcher, Builder};
1717

1818
/// Methods which require data be arrangeable.
1919
impl<G, D, R> Collection<G, D, R>
@@ -58,8 +58,8 @@ where
5858
where
5959
Tr: crate::trace::Trace+crate::trace::TraceReader<Key=D,Val=(),Time=G::Timestamp,R=R>+'static,
6060
Tr::Batch: crate::trace::Batch,
61-
<Tr::Batch as Batch>::Batcher: Batcher<Tr::Batch, Item = ((D,()),G::Timestamp,R)>,
62-
<Tr::Batch as Batch>::Batcher: Batcher<Tr::Batch, Time = G::Timestamp>,
61+
<Tr::Batch as Batch>::Batcher: Batcher<Item = ((D,()),G::Timestamp,R), Time = G::Timestamp>,
62+
<Tr::Batch as Batch>::Builder: Builder<Item = ((D,()),G::Timestamp,R), Time = G::Timestamp>,
6363
{
6464
use operators::arrange::arrangement::Arrange;
6565
self.map(|k| (k, ()))

src/operators/reduce.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -276,7 +276,7 @@ pub trait ReduceCore<G: Scope, K: Data, V: Data, R: Semigroup> where G::Timestam
276276
T2::Val: Data,
277277
T2::R: Abelian,
278278
T2::Batch: Batch,
279-
<T2::Batch as Batch>::Builder: Builder<T2::Batch, Item = ((T2::Key, T2::Val), T2::Time, T2::R)>,
279+
<T2::Batch as Batch>::Builder: Builder<Output=T2::Batch, Item = ((T2::Key, T2::Val), T2::Time, T2::R)>,
280280
L: FnMut(&K, &[(&V, R)], &mut Vec<(T2::Val, T2::R)>)+'static,
281281
{
282282
self.reduce_core::<_,T2>(name, move |key, input, output, change| {
@@ -299,7 +299,7 @@ pub trait ReduceCore<G: Scope, K: Data, V: Data, R: Semigroup> where G::Timestam
299299
T2::Val: Data,
300300
T2::R: Semigroup,
301301
T2::Batch: Batch,
302-
<T2::Batch as Batch>::Builder: Builder<T2::Batch, Item = ((T2::Key, T2::Val), T2::Time, T2::R)>,
302+
<T2::Batch as Batch>::Builder: Builder<Output=T2::Batch, Item = ((T2::Key, T2::Val), T2::Time, T2::R)>,
303303
L: FnMut(&K, &[(&V, R)], &mut Vec<(T2::Val,T2::R)>, &mut Vec<(T2::Val,T2::R)>)+'static
304304
;
305305
}
@@ -318,7 +318,7 @@ where
318318
T2::R: Semigroup,
319319
T2: Trace+TraceReader<Key=K, Time=G::Timestamp>+'static,
320320
T2::Batch: Batch,
321-
<T2::Batch as Batch>::Builder: Builder<T2::Batch, Item = ((T2::Key, T2::Val), T2::Time, T2::R)>,
321+
<T2::Batch as Batch>::Builder: Builder<Output=T2::Batch, Item = ((T2::Key, T2::Val), T2::Time, T2::R)>,
322322
L: FnMut(&K, &[(&V, R)], &mut Vec<(T2::Val,T2::R)>, &mut Vec<(T2::Val, T2::R)>)+'static
323323
{
324324
self.arrange_by_key_named(&format!("Arrange: {}", name))
@@ -337,7 +337,7 @@ where
337337
T2::Val: Data,
338338
T2::R: Semigroup,
339339
T2::Batch: Batch,
340-
<T2::Batch as Batch>::Builder: Builder<T2::Batch, Item = ((T2::Key, T2::Val), T2::Time, T2::R)>,
340+
<T2::Batch as Batch>::Builder: Builder<Output=T2::Batch, Item = ((T2::Key, T2::Val), T2::Time, T2::R)>,
341341
L: FnMut(&K, &[(&V, R)], &mut Vec<(T2::Val,T2::R)>, &mut Vec<(T2::Val, T2::R)>)+'static {
342342

343343
let mut result_trace = None;

src/trace/implementations/merge_batcher.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,14 +18,14 @@ pub struct MergeBatcher<B: Batch> where B::Key: Ord, B::Val: Ord, B::Time: Ord,
1818
phantom: ::std::marker::PhantomData<B>,
1919
}
2020

21-
impl<B> Batcher<B> for MergeBatcher<B>
21+
impl<B> Batcher for MergeBatcher<B>
2222
where
2323
B: Batch,
2424
B::Key: Ord+Clone,
2525
B::Val: Ord+Clone,
2626
B::Time: Lattice+timely::progress::Timestamp+Ord+Clone,
2727
B::R: Semigroup,
28-
B::Builder: Builder<B, Item = ((B::Key, B::Val), B::Time, B::R)>,
28+
B::Builder: Builder<Output=B, Item = ((B::Key, B::Val), B::Time, B::R)>,
2929
{
3030
type Item = ((B::Key,B::Val),B::Time,B::R);
3131
type Time = B::Time;
@@ -61,9 +61,9 @@ where
6161
// which we call `lower`, by assumption that after sealing a batcher we receive no more
6262
// updates with times not greater or equal to `upper`.
6363
#[inline(never)]
64-
fn seal(&mut self, upper: Antichain<B::Time>) -> B {
64+
fn seal<B2: Builder<Item=Self::Item, Time=Self::Time>>(&mut self, upper: Antichain<B::Time>) -> B2::Output {
6565

66-
let mut builder = B::Builder::new();
66+
let mut builder = B2::new();
6767

6868
let mut merged = Vec::new();
6969
self.sorter.finish_into(&mut merged);

src/trace/implementations/merge_batcher_col.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -25,13 +25,13 @@ pub struct ColumnatedMergeBatcher<B: Batch>
2525
phantom: PhantomData<B>,
2626
}
2727

28-
impl<B: Batch> Batcher<B> for ColumnatedMergeBatcher<B>
28+
impl<B: Batch> Batcher for ColumnatedMergeBatcher<B>
2929
where
3030
B::Key: Ord+Clone+Columnation+'static,
3131
B::Val: Ord+Clone+Columnation+'static,
3232
B::Time: Lattice+timely::progress::Timestamp+Ord+Clone+Columnation+'static,
3333
B::R: Semigroup+Columnation+'static,
34-
B::Builder: Builder<B, Item = ((B::Key, B::Val), B::Time, B::R)>,
34+
B::Builder: Builder<Output=B, Item = ((B::Key, B::Val), B::Time, B::R)>,
3535
{
3636
type Item = ((B::Key,B::Val),B::Time,B::R);
3737
type Time = B::Time;
@@ -65,9 +65,9 @@ impl<B: Batch> Batcher<B> for ColumnatedMergeBatcher<B>
6565
// which we call `lower`, by assumption that after sealing a batcher we receive no more
6666
// updates with times not greater or equal to `upper`.
6767
#[inline]
68-
fn seal(&mut self, upper: Antichain<B::Time>) -> B {
68+
fn seal<B2: Builder<Item=Self::Item, Time=Self::Time>>(&mut self, upper: Antichain<B::Time>) -> B2::Output {
6969

70-
let mut builder = B::Builder::new();
70+
let mut builder = B2::new();
7171

7272
let mut merged = Default::default();
7373
self.sorter.finish_into(&mut merged);

src/trace/implementations/ord.rs

Lines changed: 20 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,7 @@ impl<L: Layout, C> BatchReader for OrdValBatch<L, C> {
9595
impl<L: Layout> Batch for OrdValBatch<L, Vec<L::Target>>
9696
{
9797
type Batcher = MergeBatcher<Self>;
98-
type Builder = OrdValBuilder<L>;
98+
type Builder = OrdValBuilder<L, Vec<L::Target>>;
9999
type Merger = OrdValMerger<L>;
100100

101101
fn begin_merge(&self, other: &Self, compaction_frontier: AntichainRef<<L::Target as Update>::Time>) -> Self::Merger {
@@ -112,7 +112,7 @@ where
112112
Self::R: Columnation + 'static,
113113
{
114114
type Batcher = ColumnatedMergeBatcher<Self>;
115-
type Builder = OrdValBuilder<L>;
115+
type Builder = OrdValBuilder<L, TimelyStack<L::Target>>;
116116
type Merger = OrdValMerger<L>;
117117

118118
fn begin_merge(&self, other: &Self, compaction_frontier: AntichainRef<<L::Target as Update>::Time>) -> Self::Merger {
@@ -337,26 +337,30 @@ impl<L: Layout, C> Cursor<OrdValBatch<L, C>> for OrdValCursor<L, C> {
337337
}
338338

339339
/// A builder for creating layers from unsorted update tuples.
340-
pub struct OrdValBuilder<L: Layout> {
340+
pub struct OrdValBuilder<L: Layout, C> {
341341
builder: KVTDBuilder<L>,
342+
phantom: PhantomData<C>,
342343
}
343344

344345

345-
impl<L: Layout, C> Builder<OrdValBatch<L, C>> for OrdValBuilder<L>
346+
impl<L: Layout, C> Builder for OrdValBuilder<L, C>
346347
where
347348
OrdValBatch<L, C>: Batch<Key=<L::Target as Update>::Key, Val=<L::Target as Update>::Val, Time=<L::Target as Update>::Time, R=<L::Target as Update>::Diff>
348349
{
349350
type Item = ((<L::Target as Update>::Key, <L::Target as Update>::Val), <L::Target as Update>::Time, <L::Target as Update>::Diff);
350351
type Time = <L::Target as Update>::Time;
352+
type Output = OrdValBatch<L, C>;
351353

352354
fn new() -> Self {
353355
OrdValBuilder {
354-
builder: <KVTDBuilder<L>>::new()
356+
builder: <KVTDBuilder<L>>::new(),
357+
phantom: std::marker::PhantomData,
355358
}
356359
}
357360
fn with_capacity(cap: usize) -> Self {
358361
OrdValBuilder {
359-
builder: <KVTDBuilder<L> as TupleBuilder>::with_capacity(cap)
362+
builder: <KVTDBuilder<L> as TupleBuilder>::with_capacity(cap),
363+
phantom: std::marker::PhantomData,
360364
}
361365
}
362366

@@ -413,7 +417,7 @@ impl<L: Layout, C> BatchReader for OrdKeyBatch<L, C> {
413417

414418
impl<L: Layout> Batch for OrdKeyBatch<L, Vec<L::Target>> {
415419
type Batcher = MergeBatcher<Self>;
416-
type Builder = OrdKeyBuilder<L>;
420+
type Builder = OrdKeyBuilder<L, Vec<L::Target>>;
417421
type Merger = OrdKeyMerger<L>;
418422

419423
fn begin_merge(&self, other: &Self, compaction_frontier: AntichainRef<<L::Target as Update>::Time>) -> Self::Merger {
@@ -429,7 +433,7 @@ where
429433
Self::R: Columnation + 'static,
430434
{
431435
type Batcher = ColumnatedMergeBatcher<Self>;
432-
type Builder = OrdKeyBuilder<L>;
436+
type Builder = OrdKeyBuilder<L, TimelyStack<L::Target>>;
433437
type Merger = OrdKeyMerger<L>;
434438

435439
fn begin_merge(&self, other: &Self, compaction_frontier: AntichainRef<<L::Target as Update>::Time>) -> Self::Merger {
@@ -636,26 +640,30 @@ impl<L: Layout, C> Cursor<OrdKeyBatch<L, C>> for OrdKeyCursor<L, C> {
636640

637641

638642
/// A builder for creating layers from unsorted update tuples.
639-
pub struct OrdKeyBuilder<L: Layout> {
643+
pub struct OrdKeyBuilder<L: Layout, C> {
640644
builder: KTDBuilder<L>,
645+
phantom: std::marker::PhantomData<C>
641646
}
642647

643-
impl<L: Layout, C> Builder<OrdKeyBatch<L, C>> for OrdKeyBuilder<L>
648+
impl<L: Layout, C> Builder for OrdKeyBuilder<L, C>
644649
where
645650
OrdKeyBatch<L, C>: Batch<Key=<L::Target as Update>::Key, Val=(), Time=<L::Target as Update>::Time, R=<L::Target as Update>::Diff>
646651
{
647652
type Item = ((<L::Target as Update>::Key, ()), <L::Target as Update>::Time, <L::Target as Update>::Diff);
648653
type Time = <L::Target as Update>::Time;
654+
type Output = OrdKeyBatch<L, C>;
649655

650656
fn new() -> Self {
651657
OrdKeyBuilder {
652-
builder: <KTDBuilder<L>>::new()
658+
builder: <KTDBuilder<L>>::new(),
659+
phantom: std::marker::PhantomData,
653660
}
654661
}
655662

656663
fn with_capacity(cap: usize) -> Self {
657664
OrdKeyBuilder {
658-
builder: <KTDBuilder<L> as TupleBuilder>::with_capacity(cap)
665+
builder: <KTDBuilder<L> as TupleBuilder>::with_capacity(cap),
666+
phantom: std::marker::PhantomData,
659667
}
660668
}
661669

src/trace/implementations/ord_neu.rs

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -129,7 +129,7 @@ mod val_batch {
129129

130130
impl<L: Layout> Batch for OrdValBatch<L, Vec<L::Target>> {
131131
type Batcher = MergeBatcher<Self>;
132-
type Builder = OrdValBuilder<L>;
132+
type Builder = OrdValBuilder<L, Vec<L::Target>>;
133133
type Merger = OrdValMerger<L>;
134134

135135
fn begin_merge(&self, other: &Self, compaction_frontier: AntichainRef<<L::Target as Update>::Time>) -> Self::Merger {
@@ -146,7 +146,7 @@ mod val_batch {
146146
Self::R: Columnation + 'static,
147147
{
148148
type Batcher = ColumnatedMergeBatcher<Self>;
149-
type Builder = OrdValBuilder<L>;
149+
type Builder = OrdValBuilder<L, TimelyStack<L::Target>>;
150150
type Merger = OrdValMerger<L>;
151151

152152
fn begin_merge(&self, other: &Self, compaction_frontier: AntichainRef<<L::Target as Update>::Time>) -> Self::Merger {
@@ -479,17 +479,19 @@ mod val_batch {
479479
}
480480

481481
/// A builder for creating layers from unsorted update tuples.
482-
pub struct OrdValBuilder<L: Layout> {
482+
pub struct OrdValBuilder<L: Layout, C> {
483483
result: OrdValStorage<L>,
484484
singleton: Option<(<L::Target as Update>::Time, <L::Target as Update>::Diff)>,
485485
/// Counts the number of singleton optimizations we performed.
486486
///
487487
/// This number allows us to correctly gauge the total number of updates reflected in a batch,
488488
/// even though `updates.len()` may be much shorter than this amount.
489489
singletons: usize,
490+
/// Phantom marker for Rust happiness.
491+
pub phantom: PhantomData<C>,
490492
}
491493

492-
impl<L: Layout> OrdValBuilder<L> {
494+
impl<L: Layout, C> OrdValBuilder<L, C> {
493495
/// Pushes a single update, which may set `self.singleton` rather than push.
494496
///
495497
/// This operation is meant to be equivalent to `self.results.updates.push((time, diff))`.
@@ -517,13 +519,13 @@ mod val_batch {
517519
}
518520
}
519521

520-
impl<L: Layout, C> Builder<OrdValBatch<L, C>> for OrdValBuilder<L>
522+
impl<L: Layout, C> Builder for OrdValBuilder<L, C>
521523
where
522524
OrdValBatch<L, C>: Batch<Key=<L::Target as Update>::Key, Val=<L::Target as Update>::Val, Time=<L::Target as Update>::Time, R=<L::Target as Update>::Diff>
523525
{
524-
525526
type Item = ((<L::Target as Update>::Key, <L::Target as Update>::Val), <L::Target as Update>::Time, <L::Target as Update>::Diff);
526527
type Time = <L::Target as Update>::Time;
528+
type Output = OrdValBatch<L, C>;
527529

528530
fn new() -> Self { Self::with_capacity(0) }
529531
fn with_capacity(cap: usize) -> Self {
@@ -538,6 +540,7 @@ mod val_batch {
538540
},
539541
singleton: None,
540542
singletons: 0,
543+
phantom: std::marker::PhantomData,
541544
}
542545
}
543546

0 commit comments

Comments
 (0)