Skip to content
192 changes: 71 additions & 121 deletions container/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,63 +4,55 @@

use std::collections::VecDeque;

/// A container transferring data through dataflow edges
/// An object with effects on progress
///
/// A container stores a number of elements and thus is able to describe it length (`len()`) and
/// whether it is empty (`is_empty()`). It supports removing all elements (`clear`).
/// The object stores a number of updates and thus is able to describe its count
/// (`update_count()`) and whether it is empty (`is_empty()`). It is empty if the
/// update count is zero.
///
/// A container must implement default. The default implementation is not required to allocate
/// memory for variable-length components.
///
/// We require the container to be cloneable to enable efficient copies when providing references
/// of containers to operators. Care must be taken that the type's `clone_from` implementation
/// is efficient (which is not necessarily the case when deriving `Clone`.)
pub trait Container: Default {
/// The type of elements when reading non-destructively from the container.
type ItemRef<'a> where Self: 'a;

/// The type of elements when draining the container.
type Item<'a> where Self: 'a;

/// Push `item` into self
#[inline]
fn push<T>(&mut self, item: T) where Self: PushInto<T> {
self.push_into(item)
}

/// The number of elements in this container
/// It must implement `Default` for historical reasons. The default implementation is not
/// required to allocate memory for variable-length components.
// TODO: Remove `Default` requirement in the future.
// The container is `Default` because `CapacityContainerBuilder` only implements `ContainerBuilder`
// for containers that implement `Default`, and we use the associated `::Container` all over Timely.
// We can only access the type if all requirements for the `ContainerBuilder` implementation are
// satisfied.
pub trait WithProgress {
/// The number of updates
///
/// This number is used in progress tracking to confirm the receipt of some number
/// of outstanding records, and it is highly load bearing. The main restriction is
/// of outstanding updates, and it is highly load bearing. The main restriction is
/// imposed on the `LengthPreservingContainerBuilder` trait, whose implementors
/// must preserve the number of items.
fn len(&self) -> usize;

/// Determine if the container contains any elements, corresponding to `len() == 0`.
fn is_empty(&self) -> bool {
self.len() == 0
}
fn update_count(&self) -> i64;

/// Remove all contents from `self` while retaining allocated memory.
/// After calling `clear`, `is_empty` must return `true` and `len` 0.
fn clear(&mut self);
/// Returns `true` if this contains no updates, corresponding to `update_count() == 0`.
#[inline] fn is_empty(&self) -> bool { self.update_count() == 0 }
}

/// A container whose contents can be read through `&self`.
///
/// Implementors name the type yielded for each element (`ItemRef`) and the
/// iterator that produces those elements (`Iter`).
pub trait IterContainer {
/// The type of elements when reading non-destructively from the container.
type ItemRef<'a> where Self: 'a;
/// Iterator type when reading from the container.
type Iter<'a>: Iterator<Item=Self::ItemRef<'a>> where Self: 'a;

/// Returns an iterator that reads the contents of this container.
fn iter(&self) -> Self::Iter<'_>;
}

/// A container whose contents can be drained through an iterator obtained from `&mut self`.
pub trait DrainContainer {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider whether DrainContainer should have a clear function. It seems the best place if we'd like to have one.

We could also change the requirement for DrainContainer to be empty after calling drain, but that'd complicate the implementations of DrainContainer as the iterator needs to clear on drop.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe it makes sense (inventing trouble) to have RefIter and OwnIter? Partly to break the "drain" association of "it ends up empty", which is true only for Vec I think.

Alternately, it feels a bit like these might be requirements that `&Self: IntoIterator` and `&mut Self: IntoIterator`, without introducing new traits. Though the traits may be easier to state than the `for<'a>` bounds you'd need to state the above things. I think we can defer that to later though!

/// The type of elements when draining the container.
type Item<'a> where Self: 'a;
/// Iterator type when draining the container.
type DrainIter<'a>: Iterator<Item=Self::Item<'a>> where Self: 'a;

/// Returns an iterator that drains the contents of this container.
/// Drain leaves the container in an undefined state.
fn drain(&mut self) -> Self::DrainIter<'_>;
}

/// A container that can be sized and reveals its capacity.
pub trait SizableContainer: Container {
pub trait SizableContainer: Sized {
/// Indicates that the container is "full" and should be shipped.
fn at_capacity(&self) -> bool;
/// Restores `self` to its desired capacity, if it has one.
Expand Down Expand Up @@ -102,7 +94,9 @@ pub trait PushInto<T> {
/// decide to represent a push order for `extract` and `finish`, or not.
pub trait ContainerBuilder: Default + 'static {
/// The container type we're building.
type Container: Container + Clone + 'static;
// The container is `Clone` because `Tee` requires it, otherwise we need to repeat it
// all over Timely. `'static` because we don't want lifetimes everywhere.
type Container: WithProgress + Default + Clone + 'static;
/// Extract assembled containers, potentially leaving unfinished data behind. Can
/// be called repeatedly, for example while the caller can send data.
///
Expand All @@ -116,14 +110,14 @@ pub trait ContainerBuilder: Default + 'static {
/// Partitions `container` among `builders`, using the function `index` to direct items.
fn partition<I>(container: &mut Self::Container, builders: &mut [Self], mut index: I)
where
Self: for<'a> PushInto<<Self::Container as Container>::Item<'a>>,
I: for<'a> FnMut(&<Self::Container as Container>::Item<'a>) -> usize,
Self::Container: DrainContainer,
Self: for<'a> PushInto<<Self::Container as DrainContainer>::Item<'a>>,
I: for<'a> FnMut(&<Self::Container as DrainContainer>::Item<'a>) -> usize,
{
for datum in container.drain() {
let index = index(&datum);
builders[index].push_into(datum);
}
container.clear();
}

/// Indicates a good moment to release resources.
Expand Down Expand Up @@ -158,14 +152,14 @@ pub struct CapacityContainerBuilder<C>{
pending: VecDeque<C>,
}

impl<T, C: SizableContainer + PushInto<T>> PushInto<T> for CapacityContainerBuilder<C> {
impl<T, C: SizableContainer + Default + PushInto<T>> PushInto<T> for CapacityContainerBuilder<C> {
#[inline]
fn push_into(&mut self, item: T) {
// Ensure capacity
self.current.ensure_capacity(&mut self.empty);

// Push item
self.current.push(item);
self.current.push_into(item);

// Maybe flush
if self.current.at_capacity() {
Expand All @@ -174,7 +168,7 @@ impl<T, C: SizableContainer + PushInto<T>> PushInto<T> for CapacityContainerBuil
}
}

impl<C: Container + Clone + 'static> ContainerBuilder for CapacityContainerBuilder<C> {
impl<C: WithProgress + Default + Clone + 'static> ContainerBuilder for CapacityContainerBuilder<C> {
type Container = C;

#[inline]
Expand All @@ -197,31 +191,25 @@ impl<C: Container + Clone + 'static> ContainerBuilder for CapacityContainerBuild
}
}

impl<C: Container + Clone + 'static> LengthPreservingContainerBuilder for CapacityContainerBuilder<C> { }

impl<T> Container for Vec<T> {
type ItemRef<'a> = &'a T where T: 'a;
type Item<'a> = T where T: 'a;

fn len(&self) -> usize {
Vec::len(self)
}

fn is_empty(&self) -> bool {
Vec::is_empty(self)
}
impl<C: WithProgress + SizableContainer + Default + Clone + 'static> LengthPreservingContainerBuilder for CapacityContainerBuilder<C> { }

fn clear(&mut self) { Vec::clear(self) }
/// A `Vec<T>` reports its length as its update count.
impl<T> WithProgress for Vec<T> {
    /// The number of elements in the vector, as an `i64`.
    ///
    /// # Panics
    ///
    /// Panics if the length cannot be represented as an `i64`.
    #[inline]
    fn update_count(&self) -> i64 {
        i64::try_from(Vec::len(self))
            .expect("Vec length must fit in i64 to be reported as an update count")
    }

    /// Whether the vector holds no elements; answered by `Vec::is_empty`
    /// rather than through the `update_count` conversion.
    #[inline]
    fn is_empty(&self) -> bool {
        Vec::is_empty(self)
    }
}

impl<T> IterContainer for Vec<T> {
type ItemRef<'a> = &'a T where T: 'a;
type Iter<'a> = std::slice::Iter<'a, T> where Self: 'a;

fn iter(&self) -> Self::Iter<'_> {
#[inline] fn iter(&self) -> Self::Iter<'_> {
self.as_slice().iter()
}
}

impl<T> DrainContainer for Vec<T> {
type Item<'a> = T where T: 'a;
type DrainIter<'a> = std::vec::Drain<'a, T> where Self: 'a;

fn drain(&mut self) -> Self::DrainIter<'_> {
#[inline] fn drain(&mut self) -> Self::DrainIter<'_> {
self.drain(..)
}
}
Expand Down Expand Up @@ -268,81 +256,43 @@ mod rc {
use std::ops::Deref;
use std::rc::Rc;

use crate::Container;
use crate::{WithProgress, IterContainer, DrainContainer};

impl<T: Container> Container for Rc<T> {
/// An `Rc<T>` reports progress exactly as the value it points to.
impl<T: WithProgress> WithProgress for Rc<T> {
    /// Forwards to the inner value's `update_count`.
    #[inline]
    fn update_count(&self) -> i64 {
        (**self).update_count()
    }

    /// Forwards to the inner value's `is_empty`.
    #[inline]
    fn is_empty(&self) -> bool {
        (**self).is_empty()
    }
}
impl<T: IterContainer> IterContainer for Rc<T> {
type ItemRef<'a> = T::ItemRef<'a> where Self: 'a;
type Item<'a> = T::ItemRef<'a> where Self: 'a;

fn len(&self) -> usize {
std::ops::Deref::deref(self).len()
}

fn is_empty(&self) -> bool {
std::ops::Deref::deref(self).is_empty()
}

fn clear(&mut self) {
// Try to reuse the allocation if possible
if let Some(inner) = Rc::get_mut(self) {
inner.clear();
} else {
*self = Self::default();
}
}

type Iter<'a> = T::Iter<'a> where Self: 'a;

fn iter(&self) -> Self::Iter<'_> {
self.deref().iter()
}

#[inline] fn iter(&self) -> Self::Iter<'_> { self.deref().iter() }
}
impl<T: IterContainer> DrainContainer for Rc<T> {
type Item<'a> = T::ItemRef<'a> where Self: 'a;
type DrainIter<'a> = T::Iter<'a> where Self: 'a;

fn drain(&mut self) -> Self::DrainIter<'_> {
self.iter()
}
#[inline] fn drain(&mut self) -> Self::DrainIter<'_> { self.iter() }
}
}

mod arc {
use std::ops::Deref;
use std::sync::Arc;

use crate::Container;
use crate::{WithProgress, IterContainer, DrainContainer};

impl<T: Container> Container for Arc<T> {
/// An `Arc<T>` reports progress exactly as the value it points to.
impl<T: WithProgress> WithProgress for Arc<T> {
    /// Forwards to the inner value's `update_count`.
    #[inline]
    fn update_count(&self) -> i64 {
        (**self).update_count()
    }

    /// Forwards to the inner value's `is_empty`.
    #[inline]
    fn is_empty(&self) -> bool {
        (**self).is_empty()
    }
}
impl<T: IterContainer> IterContainer for Arc<T> {
type ItemRef<'a> = T::ItemRef<'a> where Self: 'a;
type Item<'a> = T::ItemRef<'a> where Self: 'a;

fn len(&self) -> usize {
std::ops::Deref::deref(self).len()
}

fn is_empty(&self) -> bool {
std::ops::Deref::deref(self).is_empty()
}

fn clear(&mut self) {
// Try to reuse the allocation if possible
if let Some(inner) = Arc::get_mut(self) {
inner.clear();
} else {
*self = Self::default();
}
}

type Iter<'a> = T::Iter<'a> where Self: 'a;

fn iter(&self) -> Self::Iter<'_> {
self.deref().iter()
}

#[inline] fn iter(&self) -> Self::Iter<'_> { self.deref().iter() }
}
impl<T: IterContainer> DrainContainer for Arc<T> {
type Item<'a> = T::ItemRef<'a> where Self: 'a;
type DrainIter<'a> = T::Iter<'a> where Self: 'a;

fn drain(&mut self) -> Self::DrainIter<'_> {
self.iter()
}
#[inline] fn drain(&mut self) -> Self::DrainIter<'_> { self.iter() }
}
}

Expand Down
34 changes: 14 additions & 20 deletions timely/examples/columnar.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,12 @@
//! Wordcount based on the `columnar` crate.

use {
std::collections::HashMap,
timely::{Container, container::CapacityContainerBuilder},
timely::dataflow::channels::pact::{ExchangeCore, Pipeline},
timely::dataflow::InputHandleCore,
timely::dataflow::operators::{Inspect, Operator, Probe},
timely::dataflow::ProbeHandle,
};
use std::collections::HashMap;

use timely::container::{IterContainer, CapacityContainerBuilder};
use timely::dataflow::channels::pact::{ExchangeCore, Pipeline};
use timely::dataflow::InputHandleCore;
use timely::dataflow::operators::{Inspect, Operator, Probe};
use timely::dataflow::ProbeHandle;

// Creates `WordCountContainer` and `WordCountReference` structs,
// as well as various implementations relating them to `WordCount`.
Expand Down Expand Up @@ -177,21 +176,16 @@ mod container {
}
}

impl<C: columnar::ContainerBytes> timely::Container for Column<C> {
fn len(&self) -> usize { self.borrow().len() }
// This sets `self` to be an empty `Typed` variant, appropriate for pushing into.
fn clear(&mut self) {
match self {
Column::Typed(t) => t.clear(),
Column::Bytes(_) => *self = Column::Typed(Default::default()),
Column::Align(_) => *self = Column::Typed(Default::default()),
}
}

/// Progress accounting for `Column`, answered from the value returned by `self.borrow()`.
impl<C: columnar::ContainerBytes> timely::WithProgress for Column<C> {
// Converts the borrowed value's `len()` to `i64`; the `unwrap` panics if the conversion fails.
#[inline] fn update_count(&self) -> i64 { i64::try_from(self.borrow().len()).unwrap() }
// Delegates to `is_empty()` on the borrowed value.
#[inline] fn is_empty(&self) -> bool { self.borrow().is_empty() }
}
/// Iteration over a `Column` through `&self`.
impl<C: columnar::ContainerBytes> timely::container::IterContainer for Column<C> {
// Elements are yielded as the container's `Ref` type.
type ItemRef<'a> = C::Ref<'a>;
// The iterator is `IterOwn` over the container's `Borrowed` type.
type Iter<'a> = IterOwn<C::Borrowed<'a>>;
// Builds the iterator by calling `into_index_iter()` on the value returned by `self.borrow()`.
fn iter<'a>(&'a self) -> Self::Iter<'a> { self.borrow().into_index_iter() }

}
impl<C: columnar::ContainerBytes> timely::container::DrainContainer for Column<C> {
type Item<'a> = C::Ref<'a>;
type DrainIter<'a> = IterOwn<C::Borrowed<'a>>;
fn drain<'a>(&'a mut self) -> Self::DrainIter<'a> { self.borrow().into_index_iter() }
Expand Down
1 change: 0 additions & 1 deletion timely/src/dataflow/channels/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@ impl<T, C: Container> Message<T, C> {

if let Some(message) = bundle {
*buffer = message.data;
buffer.clear();
}
}
}
Expand Down
Loading
Loading