Skip to content

Commit 236c46b

Browse files
committed
Restore clear, cleanup
Signed-off-by: Moritz Hoffmann <[email protected]>
1 parent ef54810 commit 236c46b

35 files changed

+225
-187
lines changed

container/src/lib.rs

Lines changed: 53 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -6,21 +6,25 @@ use std::collections::VecDeque;
66

77
/// A type representing progress, with an update count.
88
///
9-
/// It describes its update count (`count()`) and whether it is empty (`is_empty()`).
9+
/// It describes its update count (`count()`).
1010
///
1111
/// We require [`Default`] for convenience purposes.
12-
pub trait WithProgress: Default {
12+
pub trait Container: Default {
1313
/// The number of elements in this container
1414
///
1515
/// This number is used in progress tracking to confirm the receipt of some number
1616
/// of outstanding records, and it is highly load bearing. The main restriction is
17-
/// imposed on the [`CountPreservingContainerBuilder`] trait, whose implementors
17+
/// imposed on the [`LengthPreservingContainerBuilder`] trait, whose implementors
1818
/// must preserve the number of items.
1919
fn count(&self) -> usize;
20+
21+
/// Remove all contents from `self` while retaining allocated memory.
22+
/// After calling `clear`, `is_empty` must return `true` and `len` 0.
23+
fn clear(&mut self);
2024
}
2125

2226
/// A container that can reveal its contents through iterating by reference and draining.
23-
pub trait IterableContainer: WithProgress {
27+
pub trait IterContainer: Container {
2428
/// The type of elements when reading non-destructively from the container.
2529
type ItemRef<'a> where Self: 'a;
2630

@@ -42,7 +46,7 @@ pub trait IterableContainer: WithProgress {
4246
}
4347

4448
/// A container that can be sized and reveals its capacity.
45-
pub trait SizableContainer: WithProgress {
49+
pub trait SizableContainer: Container {
4650
/// Indicates that the container is "full" and should be shipped.
4751
fn at_capacity(&self) -> bool;
4852
/// Restores `self` to its desired capacity, if it has one.
@@ -84,7 +88,7 @@ pub trait PushInto<T> {
8488
/// decide to represent a push order for `extract` and `finish`, or not.
8589
pub trait ContainerBuilder: Default + 'static {
8690
/// The container type we're building.
87-
type Container: WithProgress + Default + Clone + 'static;
91+
type Container: Container + Clone + 'static;
8892
/// Extract assembled containers, potentially leaving unfinished data behind. Can
8993
/// be called repeatedly, for example while the caller can send data.
9094
///
@@ -98,14 +102,15 @@ pub trait ContainerBuilder: Default + 'static {
98102
/// Partitions `container` among `builders`, using the function `index` to direct items.
99103
fn partition<I>(container: &mut Self::Container, builders: &mut [Self], mut index: I)
100104
where
101-
Self: for<'a> PushInto<<Self::Container as IterableContainer>::Item<'a>>,
102-
I: for<'a> FnMut(&<Self::Container as IterableContainer>::Item<'a>) -> usize,
103-
Self::Container: IterableContainer,
105+
Self: for<'a> PushInto<<Self::Container as IterContainer>::Item<'a>>,
106+
I: for<'a> FnMut(&<Self::Container as IterContainer>::Item<'a>) -> usize,
107+
Self::Container: IterContainer,
104108
{
105109
for datum in container.drain() {
106110
let index = index(&datum);
107111
builders[index].push_into(datum);
108112
}
113+
container.clear();
109114
}
110115

111116
/// Indicates a good moment to release resources.
@@ -122,7 +127,7 @@ pub trait ContainerBuilder: Default + 'static {
122127
/// Specifically, the sum of lengths of all extracted and finished containers must equal the
123128
/// number of times that `push_into` is called on the container builder.
124129
/// If you have any questions about this trait you are best off not implementing it.
125-
pub trait CountPreservingContainerBuilder: ContainerBuilder { }
130+
pub trait LengthPreservingContainerBuilder : ContainerBuilder { }
126131

127132
/// A container builder that never produces any outputs, and can be used to pass through data in
128133
/// operators.
@@ -136,7 +141,7 @@ impl<C> Default for PassthroughContainerBuilder<C> {
136141
}
137142
}
138143

139-
impl<C: WithProgress + Clone + 'static> ContainerBuilder for PassthroughContainerBuilder<C>
144+
impl<C: Container + Clone + 'static> ContainerBuilder for PassthroughContainerBuilder<C>
140145
{
141146
type Container = C;
142147

@@ -183,7 +188,7 @@ impl<T, C: SizableContainer + PushInto<T>> PushInto<T> for CapacityContainerBuil
183188
}
184189
}
185190

186-
impl<C: WithProgress + Clone + 'static> ContainerBuilder for CapacityContainerBuilder<C> {
191+
impl<C: Container + Clone + 'static> ContainerBuilder for CapacityContainerBuilder<C> {
187192
type Container = C;
188193

189194
#[inline]
@@ -206,9 +211,14 @@ impl<C: WithProgress + Clone + 'static> ContainerBuilder for CapacityContainerBu
206211
}
207212
}
208213

209-
impl<C: WithProgress + SizableContainer + Clone + 'static> CountPreservingContainerBuilder for CapacityContainerBuilder<C> { }
214+
impl<C: Container + SizableContainer + Clone + 'static> LengthPreservingContainerBuilder for CapacityContainerBuilder<C> { }
215+
216+
impl<T> Container for Vec<T> {
217+
#[inline(always)] fn count(&self) -> usize { Vec::len(self) }
218+
#[inline(always)] fn clear(&mut self) { Vec::clear(self) }
219+
}
210220

211-
impl<T> IterableContainer for Vec<T> {
221+
impl<T> IterContainer for Vec<T> {
212222
type ItemRef<'a> = &'a T where T: 'a;
213223
type Item<'a> = T where T: 'a;
214224
type Iter<'a> = std::slice::Iter<'a, T> where Self: 'a;
@@ -266,9 +276,22 @@ mod rc {
266276
use std::ops::Deref;
267277
use std::rc::Rc;
268278

269-
use crate::IterableContainer;
279+
use crate::{Container, IterContainer};
280+
281+
impl<T: Container> Container for Rc<T> {
282+
#[inline(always)] fn count(&self) -> usize { self.as_ref().count() }
283+
#[inline(always)] fn clear(&mut self) {
284+
// Try to reuse the allocation if possible
285+
if let Some(inner) = Rc::get_mut(self) {
286+
inner.clear();
287+
} else {
288+
*self = Self::default();
289+
}
290+
}
291+
}
292+
270293

271-
impl<T: IterableContainer> IterableContainer for Rc<T> {
294+
impl<T: IterContainer> IterContainer for Rc<T> {
272295
type ItemRef<'a> = T::ItemRef<'a> where Self: 'a;
273296
type Item<'a> = T::ItemRef<'a> where Self: 'a;
274297
type Iter<'a> = T::Iter<'a> where Self: 'a;
@@ -289,9 +312,21 @@ mod arc {
289312
use std::ops::Deref;
290313
use std::sync::Arc;
291314

292-
use crate::IterableContainer;
315+
use crate::{Container, IterContainer};
316+
317+
impl<T: Container> Container for std::sync::Arc<T> {
318+
#[inline(always)] fn count(&self) -> usize { self.as_ref().count() }
319+
#[inline(always)] fn clear(&mut self) {
320+
// Try to reuse the allocation if possible
321+
if let Some(inner) = Arc::get_mut(self) {
322+
inner.clear();
323+
} else {
324+
*self = Self::default();
325+
}
326+
}
327+
}
293328

294-
impl<T: IterableContainer> IterableContainer for Arc<T> {
329+
impl<T: IterContainer> IterContainer for Arc<T> {
295330
type ItemRef<'a> = T::ItemRef<'a> where Self: 'a;
296331
type Item<'a> = T::ItemRef<'a> where Self: 'a;
297332
type Iter<'a> = T::Iter<'a> where Self: 'a;
@@ -328,15 +363,3 @@ pub mod buffer {
328363
}
329364
}
330365
}
331-
332-
impl<T> WithProgress for Vec<T> {
333-
#[inline(always)] fn count(&self) -> usize { self.len() }
334-
}
335-
336-
impl<T: WithProgress> WithProgress for std::rc::Rc<T> {
337-
#[inline(always)] fn count(&self) -> usize { self.as_ref().count() }
338-
}
339-
340-
impl<T: WithProgress> WithProgress for std::sync::Arc<T> {
341-
#[inline(always)] fn count(&self) -> usize { self.as_ref().count() }
342-
}

timely/examples/columnar.rs

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
33
use {
44
std::collections::HashMap,
5-
timely::container::{CapacityContainerBuilder, IterableContainer},
5+
timely::container::{CapacityContainerBuilder, IterContainer},
66
timely::dataflow::channels::pact::{ExchangeCore, Pipeline},
77
timely::dataflow::InputHandleCore,
88
timely::dataflow::operators::{Inspect, Operator, Probe},
@@ -165,11 +165,19 @@ mod container {
165165
}
166166
}
167167

168-
impl<C: columnar::ContainerBytes> timely::container::WithProgress for Column<C> {
168+
impl<C: columnar::ContainerBytes> timely::container::Container for Column<C> {
169+
#[inline(always)] fn count(&self) -> usize { self.borrow().len() }
170+
// This sets `self` to be an empty `Typed` variant, appropriate for pushing into.
169171
#[inline(always)]
170-
fn count(&self) -> usize { self.borrow().len() }
172+
fn clear(&mut self) {
173+
match self {
174+
Column::Typed(t) => t.clear(),
175+
Column::Bytes(_) => *self = Column::Typed(Default::default()),
176+
Column::Align(_) => *self = Column::Typed(Default::default()),
177+
}
178+
}
171179
}
172-
impl<C: columnar::ContainerBytes> timely::container::IterableContainer for Column<C> {
180+
impl<C: columnar::ContainerBytes> timely::container::IterContainer for Column<C> {
173181
type ItemRef<'a> = C::Ref<'a>;
174182
type Iter<'a> = IterOwn<C::Borrowed<'a>>;
175183
fn iter<'a>(&'a self) -> Self::Iter<'a> { self.borrow().into_index_iter() }
@@ -278,7 +286,7 @@ mod builder {
278286
}
279287
}
280288

281-
use timely::container::{ContainerBuilder, CountPreservingContainerBuilder};
289+
use timely::container::{ContainerBuilder, LengthPreservingContainerBuilder};
282290
impl<C: columnar::ContainerBytes> ContainerBuilder for ColumnBuilder<C> {
283291
type Container = Column<C>;
284292

@@ -311,5 +319,5 @@ mod builder {
311319
}
312320
}
313321

314-
impl<C: columnar::ContainerBytes> CountPreservingContainerBuilder for ColumnBuilder<C> { }
322+
impl<C: columnar::ContainerBytes> LengthPreservingContainerBuilder for ColumnBuilder<C> { }
315323
}

timely/src/dataflow/channels/mod.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
33
use serde::{Deserialize, Serialize};
44
use crate::communication::Push;
5-
use crate::container::WithProgress;
5+
use crate::Container;
66

77
/// A collection of types that may be pushed at.
88
pub mod pushers;
@@ -32,15 +32,14 @@ impl<T, C> Message<T, C> {
3232
}
3333
}
3434

35-
impl<T, C: WithProgress> Message<T, C> {
35+
impl<T, C: Container> Message<T, C> {
3636
/// Creates a new message instance from arguments.
3737
pub fn new(time: T, data: C, from: usize, seq: usize) -> Self {
3838
Message { time, data, from, seq }
3939
}
4040

4141
/// Forms a message, and pushes contents at `pusher`. Replaces `buffer` with what the pusher
42-
/// leaves in place, or the container's default element. The buffer's contents are left in an
43-
/// undefined state, specifically the caller cannot rely on this function clearing the buffer.
42+
/// leaves in place, or the container's default element. The buffer is cleared.
4443
#[inline]
4544
pub fn push_at<P: Push<Message<T, C>>>(buffer: &mut C, time: T, pusher: &mut P) {
4645

@@ -52,6 +51,7 @@ impl<T, C: WithProgress> Message<T, C> {
5251

5352
if let Some(message) = bundle {
5453
*buffer = message.data;
54+
buffer.clear();
5555
}
5656
}
5757
}

timely/src/dataflow/channels/pact.rs

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010
use std::{fmt::{self, Debug}, marker::PhantomData};
1111
use std::rc::Rc;
1212

13-
use crate::container::{ContainerBuilder, CountPreservingContainerBuilder, IterableContainer, WithProgress, SizableContainer, CapacityContainerBuilder, PushInto};
13+
use crate::container::{ContainerBuilder, LengthPreservingContainerBuilder, IterContainer, Container, SizableContainer, CapacityContainerBuilder, PushInto};
1414
use crate::communication::allocator::thread::{ThreadPusher, ThreadPuller};
1515
use crate::communication::{Push, Pull};
1616
use crate::dataflow::channels::pushers::Exchange as ExchangePusher;
@@ -34,7 +34,7 @@ pub trait ParallelizationContract<T, C> {
3434
#[derive(Debug)]
3535
pub struct Pipeline;
3636

37-
impl<T: 'static, C: WithProgress + 'static> ParallelizationContract<T, C> for Pipeline {
37+
impl<T: 'static, C: Container + 'static> ParallelizationContract<T, C> for Pipeline {
3838
type Pusher = LogPusher<T, C, ThreadPusher<Message<T, C>>>;
3939
type Puller = LogPuller<T, C, ThreadPuller<Message<T, C>>>;
4040
fn connect<A: AsWorker>(self, allocator: &mut A, identifier: usize, address: Rc<[usize]>, logging: Option<Logger>) -> (Self::Pusher, Self::Puller) {
@@ -52,8 +52,8 @@ pub type Exchange<D, F> = ExchangeCore<CapacityContainerBuilder<Vec<D>>, F>;
5252

5353
impl<CB, F> ExchangeCore<CB, F>
5454
where
55-
CB: CountPreservingContainerBuilder<Container: IterableContainer>,
56-
for<'a> F: FnMut(&<CB::Container as IterableContainer>::Item<'a>)->u64
55+
CB: LengthPreservingContainerBuilder<Container: IterContainer>,
56+
for<'a> F: FnMut(&<CB::Container as IterContainer>::Item<'a>)->u64
5757
{
5858
/// Allocates a new `Exchange` pact from a distribution function.
5959
pub fn new_core(func: F) -> ExchangeCore<CB, F> {
@@ -66,7 +66,7 @@ where
6666

6767
impl<C, F> ExchangeCore<CapacityContainerBuilder<C>, F>
6868
where
69-
C: SizableContainer + IterableContainer,
69+
C: SizableContainer + IterContainer,
7070
for<'a> F: FnMut(&C::Item<'a>)->u64
7171
{
7272
/// Allocates a new `Exchange` pact from a distribution function.
@@ -81,10 +81,10 @@ where
8181
// Exchange uses a `Box<Pushable>` because it cannot know what type of pushable will return from the allocator.
8282
impl<T: Timestamp, CB, H: 'static> ParallelizationContract<T, CB::Container> for ExchangeCore<CB, H>
8383
where
84-
CB: ContainerBuilder<Container: IterableContainer>,
85-
CB: for<'a> PushInto<<CB::Container as IterableContainer>::Item<'a>>,
84+
CB: ContainerBuilder<Container: IterContainer>,
85+
CB: for<'a> PushInto<<CB::Container as IterContainer>::Item<'a>>,
8686
CB::Container: Data + Send + crate::dataflow::channels::ContainerBytes,
87-
for<'a> H: FnMut(&<CB::Container as IterableContainer>::Item<'a>) -> u64
87+
for<'a> H: FnMut(&<CB::Container as IterContainer>::Item<'a>) -> u64
8888
{
8989
type Pusher = ExchangePusher<T, CB, LogPusher<T, CB::Container, Box<dyn Push<Message<T, CB::Container>>>>, H>;
9090
type Puller = LogPuller<T, CB::Container, Box<dyn Pull<Message<T, CB::Container>>>>;
@@ -129,7 +129,7 @@ impl<T, C, P: Push<Message<T, C>>> LogPusher<T, C, P> {
129129
}
130130
}
131131

132-
impl<T, C: WithProgress, P: Push<Message<T, C>>> Push<Message<T, C>> for LogPusher<T, C, P> {
132+
impl<T, C: Container, P: Push<Message<T, C>>> Push<Message<T, C>> for LogPusher<T, C, P> {
133133
#[inline]
134134
fn push(&mut self, pair: &mut Option<Message<T, C>>) {
135135
if let Some(bundle) = pair {
@@ -179,7 +179,7 @@ impl<T, C, P: Pull<Message<T, C>>> LogPuller<T, C, P> {
179179
}
180180
}
181181

182-
impl<T, C: WithProgress, P: Pull<Message<T, C>>> Pull<Message<T, C>> for LogPuller<T, C, P> {
182+
impl<T, C: Container, P: Pull<Message<T, C>>> Pull<Message<T, C>> for LogPuller<T, C, P> {
183183
#[inline]
184184
fn pull(&mut self) -> &mut Option<Message<T, C>> {
185185
let result = self.puller.pull();

timely/src/dataflow/channels/pullers/counter.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ use std::cell::RefCell;
66
use crate::dataflow::channels::Message;
77
use crate::progress::ChangeBatch;
88
use crate::communication::Pull;
9-
use crate::container::WithProgress;
9+
use crate::Container;
1010

1111
/// A wrapper which accounts records pulled past in a shared count map.
1212
pub struct Counter<T: Ord+Clone+'static, C, P: Pull<Message<T, C>>> {
@@ -36,7 +36,7 @@ impl<T:Ord+Clone+'static> Drop for ConsumedGuard<T> {
3636
}
3737
}
3838

39-
impl<T:Ord+Clone+'static, C: WithProgress, P: Pull<Message<T, C>>> Counter<T, C, P> {
39+
impl<T:Ord+Clone+'static, C: Container, P: Pull<Message<T, C>>> Counter<T, C, P> {
4040
/// Retrieves the next timestamp and batch of data.
4141
#[inline]
4242
pub fn next(&mut self) -> Option<&mut Message<T, C>> {

timely/src/dataflow/channels/pushers/buffer.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,11 @@
22
//! with the performance of batched sends.
33
44
use crate::communication::Push;
5-
use crate::container::{ContainerBuilder, CapacityContainerBuilder, WithProgress, PushInto, SizableContainer};
5+
use crate::container::{ContainerBuilder, CapacityContainerBuilder, PushInto};
66
use crate::dataflow::channels::Message;
77
use crate::dataflow::operators::Capability;
88
use crate::progress::Timestamp;
9-
use crate::Data;
9+
use crate::{Container, Data};
1010

1111
/// Buffers data sent at the same time, for efficient communication.
1212
///
@@ -44,7 +44,7 @@ impl<T, CB: Default, P> Buffer<T, CB, P> {
4444
}
4545
}
4646

47-
impl<T, C: SizableContainer + Data, P: Push<Message<T, C>>> Buffer<T, CapacityContainerBuilder<C>, P> where T: Eq+Clone {
47+
impl<T, C: Container + Data, P: Push<Message<T, C>>> Buffer<T, CapacityContainerBuilder<C>, P> where T: Eq+Clone {
4848
/// Returns a `Session`, which accepts data to send at the associated time
4949
#[inline]
5050
pub fn session(&mut self, time: &T) -> Session<'_, T, CapacityContainerBuilder<C>, P> {

timely/src/dataflow/channels/pushers/counter.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ use std::cell::RefCell;
77
use crate::progress::{ChangeBatch, Timestamp};
88
use crate::dataflow::channels::Message;
99
use crate::communication::Push;
10-
use crate::container::WithProgress;
10+
use crate::Container;
1111

1212
/// A wrapper which updates shared `produced` based on the number of records pushed.
1313
#[derive(Debug)]
@@ -17,7 +17,7 @@ pub struct Counter<T, C, P: Push<Message<T, C>>> {
1717
phantom: PhantomData<C>,
1818
}
1919

20-
impl<T: Timestamp, C: WithProgress, P> Push<Message<T, C>> for Counter<T, C, P> where P: Push<Message<T, C>> {
20+
impl<T: Timestamp, C: Container, P> Push<Message<T, C>> for Counter<T, C, P> where P: Push<Message<T, C>> {
2121
#[inline]
2222
fn push(&mut self, message: &mut Option<Message<T, C>>) {
2323
if let Some(message) = message {

0 commit comments

Comments
 (0)