Skip to content
200 changes: 78 additions & 122 deletions container/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,71 +4,66 @@

use std::collections::VecDeque;

/// A container transferring data through dataflow edges
/// A type containing a number of records accounted for by progress tracking.
///
/// A container stores a number of elements and thus is able to describe its length (`len()`) and
/// whether it is empty (`is_empty()`). It supports removing all elements (`clear`).
///
/// A container must implement default. The default implementation is not required to allocate
/// memory for variable-length components.
///
/// We require the container to be cloneable to enable efficient copies when providing references
/// of containers to operators. Care must be taken that the type's `clone_from` implementation
/// is efficient (which is not necessarily the case when deriving `Clone`.)
pub trait Container: Default {
/// The type of elements when reading non-destructively from the container.
type ItemRef<'a> where Self: 'a;

/// The type of elements when draining the container.
type Item<'a> where Self: 'a;

/// Push `item` into self
#[inline]
fn push<T>(&mut self, item: T) where Self: PushInto<T> {
self.push_into(item)
}

/// The number of elements in this container
/// The object stores a number of records and thus is able to describe its count
/// (`record_count()`) and whether it is empty (`is_empty()`). It is empty if the
/// record count is zero.
pub trait Accountable {
/// The number of records
///
/// This number is used in progress tracking to confirm the receipt of some number
/// of outstanding records, and it is highly load bearing. The main restriction is
/// imposed on the `LengthPreservingContainerBuilder` trait, whose implementors
/// must preserve the number of items.
fn len(&self) -> usize;
/// must preserve the number of records.
fn record_count(&self) -> i64;

/// Determine if the container contains any elements, corresponding to `len() == 0`.
fn is_empty(&self) -> bool {
self.len() == 0
}

/// Remove all contents from `self` while retaining allocated memory.
/// After calling `clear`, `is_empty` must return `true` and `len` 0.
fn clear(&mut self);
/// Determine if this contains no records, corresponding to `record_count() == 0`.
/// It is a correctness error for this to be anything other than `self.record_count() == 0`.
#[inline] fn is_empty(&self) -> bool { self.record_count() == 0 }
}

/// A container that allows iteration morally equivalent to [`IntoIterator`].
///
/// Iterating the container presents items in an implementation-specific order.
/// The container's contents are not changed.
///
/// Iteration goes through a shared borrow (`&self`), and both associated types are
/// parameterized by the lifetime of that borrow.
pub trait IterContainer {
/// The type of elements when reading non-destructively from the container.
///
/// The lifetime `'a` is the lifetime of the borrow of the container, which the
/// container itself must outlive (`Self: 'a`).
type ItemRef<'a> where Self: 'a;
/// Iterator type when reading from the container.
///
/// The iterator is required to yield `Self::ItemRef<'a>` values.
type Iter<'a>: Iterator<Item=Self::ItemRef<'a>> where Self: 'a;

/// Returns an iterator that reads the contents of this container.
///
/// The returned iterator is tied to the borrow of `self`.
fn iter(&self) -> Self::Iter<'_>;
}

/// A container that can drain itself.
///
/// Draining the container presents items in an implementation-specific order.
/// The container is in an undefined state after calling [`drain`]. Dropping
/// the iterator also leaves the container in an undefined state.
pub trait DrainContainer {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider whether DrainContainer should have a clear function. It seems the best place if we'd like to have one.

We could also change the requirement for DrainContainer to be empty after calling drain, but that'd complicate the implementations of DrainContainer as the iterator needs to clear on drop.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe it makes sense (inventing trouble) to have RefIter and OwnIter? Partly to break the "drain" association of "it ends up empty", which is true only for Vec I think.

Alternately, it feels a bit like these might be requirements that &Self: IntoIterator and &mut Self: IntoIterator, without introducing new traits. Though the traits may be easier to state than the for<'a> bounds you'd need to state the above things. I think we can defer that to later though!

/// The type of elements when draining the container.
type Item<'a> where Self: 'a;
/// Iterator type when draining the container.
type DrainIter<'a>: Iterator<Item=Self::Item<'a>> where Self: 'a;

/// Returns an iterator that drains the contents of this container.
/// Drain leaves the container in an undefined state.
fn drain(&mut self) -> Self::DrainIter<'_>;
}

/// A container that can be sized and reveals its capacity.
pub trait SizableContainer: Container {
pub trait SizableContainer {
/// Indicates that the container is "full" and should be shipped.
fn at_capacity(&self) -> bool;
/// Restores `self` to its desired capacity, if it has one.
///
/// The `stash` argument is available, and may have the intended capacity.
/// However, it may be non-empty, and may be of the wrong capacity. The
/// method should guard against these cases.
fn ensure_capacity(&mut self, stash: &mut Option<Self>);
///
/// Assume that the `stash` is in an undefined state, and properly clear it
/// before re-using it.
fn ensure_capacity(&mut self, stash: &mut Option<Self>) where Self: Sized;
}

/// A container that can absorb items of a specific type.
Expand All @@ -95,14 +90,19 @@ pub trait PushInto<T> {
/// [`Self::finish`]. Implementations can recycle buffers, but should ensure that they clear
/// any remaining elements.
///
/// Implementations are allowed to re-use the contents of the mutable references left by the caller,
/// but they should ensure that they clear the contents before doing so.
///
/// For example, a consolidating builder can aggregate differences in-place, but it has
/// to ensure that it preserves the intended information.
///
/// The trait does not prescribe any specific ordering guarantees, and each implementation can
/// decide to represent a push order for `extract` and `finish`, or not.
pub trait ContainerBuilder: Default + 'static {
/// The container type we're building.
type Container: Container + Clone + 'static;
// The container is `Clone` because `Tee` requires it, otherwise we need to repeat it
// all over Timely. `'static` because we don't want lifetimes everywhere.
type Container: Accountable + Default + Clone + 'static;
/// Extract assembled containers, potentially leaving unfinished data behind. Can
/// be called repeatedly, for example while the caller can send data.
///
Expand All @@ -124,8 +124,8 @@ pub trait ContainerBuilder: Default + 'static {

/// A wrapper trait indicating that the container building will preserve the number of records.
///
/// Specifically, the sum of lengths of all extracted and finished containers must equal the
/// number of times that `push_into` is called on the container builder.
/// Specifically, the sum of record counts of all extracted and finished containers must equal the
/// number of accounted records that are pushed into the container builder.
/// If you have any questions about this trait you are best off not implementing it.
pub trait LengthPreservingContainerBuilder : ContainerBuilder { }

Expand All @@ -145,14 +145,14 @@ pub struct CapacityContainerBuilder<C>{
pending: VecDeque<C>,
}

impl<T, C: SizableContainer + PushInto<T>> PushInto<T> for CapacityContainerBuilder<C> {
impl<T, C: SizableContainer + Default + PushInto<T>> PushInto<T> for CapacityContainerBuilder<C> {
#[inline]
fn push_into(&mut self, item: T) {
// Ensure capacity
self.current.ensure_capacity(&mut self.empty);

// Push item
self.current.push(item);
self.current.push_into(item);

// Maybe flush
if self.current.at_capacity() {
Expand All @@ -161,7 +161,7 @@ impl<T, C: SizableContainer + PushInto<T>> PushInto<T> for CapacityContainerBuil
}
}

impl<C: Container + Clone + 'static> ContainerBuilder for CapacityContainerBuilder<C> {
impl<C: Accountable + Default + Clone + 'static> ContainerBuilder for CapacityContainerBuilder<C> {
type Container = C;

#[inline]
Expand All @@ -184,31 +184,25 @@ impl<C: Container + Clone + 'static> ContainerBuilder for CapacityContainerBuild
}
}

impl<C: Container + Clone + 'static> LengthPreservingContainerBuilder for CapacityContainerBuilder<C> { }
impl<C: Accountable + SizableContainer + Default + Clone + 'static> LengthPreservingContainerBuilder for CapacityContainerBuilder<C> { }

impl<T> Container for Vec<T> {
type ItemRef<'a> = &'a T where T: 'a;
type Item<'a> = T where T: 'a;

fn len(&self) -> usize {
Vec::len(self)
}

fn is_empty(&self) -> bool {
Vec::is_empty(self)
}

fn clear(&mut self) { Vec::clear(self) }
/// A `Vec<T>` accounts for one record per element it holds.
impl<T> Accountable for Vec<T> {
/// The vector's length converted to `i64`.
///
/// # Panics
///
/// The `unwrap` panics if the length cannot be represented as an `i64`.
#[inline] fn record_count(&self) -> i64 { i64::try_from(Vec::len(self)).unwrap() }
/// Answers emptiness by asking the vector directly via `Vec::is_empty`,
/// instead of going through `record_count`.
#[inline] fn is_empty(&self) -> bool { Vec::is_empty(self) }
}

impl<T> IterContainer for Vec<T> {
type ItemRef<'a> = &'a T where T: 'a;
type Iter<'a> = std::slice::Iter<'a, T> where Self: 'a;

fn iter(&self) -> Self::Iter<'_> {
#[inline] fn iter(&self) -> Self::Iter<'_> {
self.as_slice().iter()
}
}

impl<T> DrainContainer for Vec<T> {
type Item<'a> = T where T: 'a;
type DrainIter<'a> = std::vec::Drain<'a, T> where Self: 'a;

fn drain(&mut self) -> Self::DrainIter<'_> {
#[inline] fn drain(&mut self) -> Self::DrainIter<'_> {
self.drain(..)
}
}
Expand Down Expand Up @@ -255,81 +249,43 @@ mod rc {
use std::ops::Deref;
use std::rc::Rc;

use crate::Container;
use crate::{IterContainer, DrainContainer};

impl<T: Container> Container for Rc<T> {
/// An `Rc<T>` is accounted for exactly as the `T` it points to.
impl<T: crate::Accountable> crate::Accountable for Rc<T> {
/// Forwards to the pointee's `record_count`, reached explicitly through `Deref`.
#[inline] fn record_count(&self) -> i64 { std::ops::Deref::deref(self).record_count() }
/// Forwards to the pointee's `is_empty`, reached explicitly through `Deref`.
#[inline] fn is_empty(&self) -> bool { std::ops::Deref::deref(self).is_empty() }
}
impl<T: IterContainer> IterContainer for Rc<T> {
type ItemRef<'a> = T::ItemRef<'a> where Self: 'a;
type Item<'a> = T::ItemRef<'a> where Self: 'a;

fn len(&self) -> usize {
std::ops::Deref::deref(self).len()
}

fn is_empty(&self) -> bool {
std::ops::Deref::deref(self).is_empty()
}

fn clear(&mut self) {
// Try to reuse the allocation if possible
if let Some(inner) = Rc::get_mut(self) {
inner.clear();
} else {
*self = Self::default();
}
}

type Iter<'a> = T::Iter<'a> where Self: 'a;

fn iter(&self) -> Self::Iter<'_> {
self.deref().iter()
}

#[inline] fn iter(&self) -> Self::Iter<'_> { self.deref().iter() }
}
impl<T: IterContainer> DrainContainer for Rc<T> {
type Item<'a> = T::ItemRef<'a> where Self: 'a;
type DrainIter<'a> = T::Iter<'a> where Self: 'a;

fn drain(&mut self) -> Self::DrainIter<'_> {
self.iter()
}
#[inline] fn drain(&mut self) -> Self::DrainIter<'_> { self.iter() }
}
}

mod arc {
use std::ops::Deref;
use std::sync::Arc;

use crate::Container;
use crate::{IterContainer, DrainContainer};

impl<T: Container> Container for Arc<T> {
/// An `Arc<T>` is accounted for exactly as the `T` it points to.
impl<T: crate::Accountable> crate::Accountable for Arc<T> {
/// Forwards to the pointee's `record_count`, reached explicitly through `Deref`.
#[inline] fn record_count(&self) -> i64 { std::ops::Deref::deref(self).record_count() }
/// Forwards to the pointee's `is_empty`, reached explicitly through `Deref`.
#[inline] fn is_empty(&self) -> bool { std::ops::Deref::deref(self).is_empty() }
}
impl<T: IterContainer> IterContainer for Arc<T> {
type ItemRef<'a> = T::ItemRef<'a> where Self: 'a;
type Item<'a> = T::ItemRef<'a> where Self: 'a;

fn len(&self) -> usize {
std::ops::Deref::deref(self).len()
}

fn is_empty(&self) -> bool {
std::ops::Deref::deref(self).is_empty()
}

fn clear(&mut self) {
// Try to reuse the allocation if possible
if let Some(inner) = Arc::get_mut(self) {
inner.clear();
} else {
*self = Self::default();
}
}

type Iter<'a> = T::Iter<'a> where Self: 'a;

fn iter(&self) -> Self::Iter<'_> {
self.deref().iter()
}

#[inline] fn iter(&self) -> Self::Iter<'_> { self.deref().iter() }
}
impl<T: IterContainer> DrainContainer for Arc<T> {
type Item<'a> = T::ItemRef<'a> where Self: 'a;
type DrainIter<'a> = T::Iter<'a> where Self: 'a;

fn drain(&mut self) -> Self::DrainIter<'_> {
self.iter()
}
#[inline] fn drain(&mut self) -> Self::DrainIter<'_> { self.iter() }
}
}

Expand Down
34 changes: 14 additions & 20 deletions timely/examples/columnar.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,12 @@
//! Wordcount based on the `columnar` crate.

use {
std::collections::HashMap,
timely::{Container, container::CapacityContainerBuilder},
timely::dataflow::channels::pact::{ExchangeCore, Pipeline},
timely::dataflow::InputHandleCore,
timely::dataflow::operators::{Inspect, Operator, Probe},
timely::dataflow::ProbeHandle,
};
use std::collections::HashMap;

use timely::container::{IterContainer, CapacityContainerBuilder};
use timely::dataflow::channels::pact::{ExchangeCore, Pipeline};
use timely::dataflow::InputHandleCore;
use timely::dataflow::operators::{Inspect, Operator, Probe};
use timely::dataflow::ProbeHandle;

// Creates `WordCountContainer` and `WordCountReference` structs,
// as well as various implementations relating them to `WordCount`.
Expand Down Expand Up @@ -177,21 +176,16 @@ mod container {
}
}

impl<C: columnar::ContainerBytes> timely::Container for Column<C> {
fn len(&self) -> usize { self.borrow().len() }
// This sets `self` to be an empty `Typed` variant, appropriate for pushing into.
fn clear(&mut self) {
match self {
Column::Typed(t) => t.clear(),
Column::Bytes(_) => *self = Column::Typed(Default::default()),
Column::Align(_) => *self = Column::Typed(Default::default()),
}
}

/// Progress accounting for `Column<C>`, answered from the view returned by `self.borrow()`.
impl<C: columnar::ContainerBytes> timely::Accountable for Column<C> {
/// The length reported by `self.borrow()`, converted to `i64`.
///
/// # Panics
///
/// The `unwrap` panics if that length cannot be represented as an `i64`.
#[inline] fn record_count(&self) -> i64 { i64::try_from(self.borrow().len()).unwrap() }
/// Delegates to `is_empty` on `self.borrow()` instead of going through `record_count`.
#[inline] fn is_empty(&self) -> bool { self.borrow().is_empty() }
}
/// Non-destructive iteration over a `Column<C>`.
impl<C: columnar::ContainerBytes> timely::container::IterContainer for Column<C> {
/// Items are the container's reference type, `C::Ref<'a>`.
type ItemRef<'a> = C::Ref<'a>;
/// Iterator over the borrowed form of the container, `C::Borrowed<'a>`.
type Iter<'a> = IterOwn<C::Borrowed<'a>>;
/// Borrows the column and turns that borrowed view into an iterator via `into_index_iter()`.
fn iter<'a>(&'a self) -> Self::Iter<'a> { self.borrow().into_index_iter() }

}
impl<C: columnar::ContainerBytes> timely::container::DrainContainer for Column<C> {
type Item<'a> = C::Ref<'a>;
type DrainIter<'a> = IterOwn<C::Borrowed<'a>>;
fn drain<'a>(&'a mut self) -> Self::DrainIter<'a> { self.borrow().into_index_iter() }
Expand Down
3 changes: 1 addition & 2 deletions timely/src/dataflow/channels/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ impl<T, C: Container> Message<T, C> {
}

/// Forms a message, and pushes contents at `pusher`. Replaces `buffer` with what the pusher
/// leaves in place, or the container's default element. The buffer is cleared.
/// leaves in place, or the container's default element. The buffer is left in an undefined state.
#[inline]
pub fn push_at<P: Push<Message<T, C>>>(buffer: &mut C, time: T, pusher: &mut P) {

Expand All @@ -51,7 +51,6 @@ impl<T, C: Container> Message<T, C> {

if let Some(message) = bundle {
*buffer = message.data;
buffer.clear();
}
}
}
Expand Down
Loading
Loading