Skip to content

Let Layout specify OffsetContainer #449

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
91 changes: 90 additions & 1 deletion src/trace/implementations/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ pub use self::ord_neu::OrdValSpine as ValSpine;
pub use self::ord_neu::OrdKeySpine as KeySpine;

use std::borrow::{ToOwned};
use std::cmp::Ordering;

use timely::container::columnation::{Columnation, TimelyStack};
use crate::lattice::Lattice;
Expand Down Expand Up @@ -97,6 +98,8 @@ pub trait Layout {
/// Container for update vals.
type UpdContainer:
for<'a> BatchContainer<PushItem=(<Self::Target as Update>::Time, <Self::Target as Update>::Diff), ReadItem<'a> = &'a (<Self::Target as Update>::Time, <Self::Target as Update>::Diff)>;
/// Container for offsets.
type OffsetContainer: BatchContainer<PushItem=usize>;
}

/// A layout that uses vectors
Expand All @@ -113,6 +116,7 @@ where
type KeyContainer = Vec<U::Key>;
type ValContainer = Vec<U::Val>;
type UpdContainer = Vec<(U::Time, U::Diff)>;
type OffsetContainer = OffsetList;
}

/// A layout based on timely stacks
Expand All @@ -131,6 +135,7 @@ where
type KeyContainer = TimelyStack<U::Key>;
type ValContainer = TimelyStack<U::Val>;
type UpdContainer = TimelyStack<(U::Time, U::Diff)>;
type OffsetContainer = OffsetList;
}

/// A type with a preferred container.
Expand Down Expand Up @@ -183,10 +188,13 @@ where
type KeyContainer = K::Container;
type ValContainer = V::Container;
type UpdContainer = Vec<(T, D)>;
type OffsetContainer = OffsetList;
}

use std::convert::TryInto;
use std::ops::Deref;
use abomonation_derive::Abomonation;
use crate::trace::cursor::MyTrait;

/// A list of unsigned integers that uses `u32` elements as long as they are small enough, and switches to `u64` once they are not.
#[derive(Eq, PartialEq, Ord, PartialOrd, Clone, Debug, Abomonation)]
Expand Down Expand Up @@ -271,6 +279,87 @@ impl OffsetList {
}
}

/// Helper struct to provide `MyTrait` for `Copy` types.
///
/// The wrapper holds the value itself rather than a reference to it, so a
/// container can hand out items "by value" through an API that otherwise
/// expects a borrowed form. `Debug` is derived so that wrapped items can be
/// printed in assertions and diagnostics.
#[derive(Eq, PartialEq, Ord, PartialOrd, Clone, Copy, Debug)]
pub struct Wrapper<T: Copy>(T);

/// Dereferencing a `Wrapper<T>` exposes the wrapped `T`.
impl<T: Copy> Deref for Wrapper<T> {
    type Target = T;

    /// Returns a reference to the wrapped value.
    fn deref(&self) -> &Self::Target {
        &self.0
    }
}

impl<'a, T: Copy + Ord> MyTrait<'a> for Wrapper<T> {
type Owned = T;

fn into_owned(self) -> Self::Owned {
self.0
}

fn clone_onto(&self, other: &mut Self::Owned) {
*other = self.0;
}

fn compare(&self, other: &Self::Owned) -> Ordering {
self.0.cmp(other)
}

fn borrow_as(other: &'a Self::Owned) -> Self {
Self(*other)
}
}

impl BatchContainer for OffsetList {
type PushItem = usize;
type ReadItem<'a> = Wrapper<usize>;

fn push(&mut self, item: Self::PushItem) {
self.push(item);
}

fn copy_push(&mut self, item: &Self::PushItem) {
self.push(*item);
}

fn copy(&mut self, item: Self::ReadItem<'_>) {
self.push(item.0);
}

fn copy_slice(&mut self, slice: &[Self::PushItem]) {
for index in slice {
self.push(*index);
}
}

fn copy_range(&mut self, other: &Self, start: usize, end: usize) {
for offset in start..end {
self.push(other.index(offset));
}
}

fn with_capacity(size: usize) -> Self {
Self::with_capacity(size)
}

fn reserve(&mut self, _additional: usize) {
// Nop
}

fn merge_capacity(cont1: &Self, cont2: &Self) -> Self {
Self::with_capacity(cont1.len() + cont2.len())
}

fn index(&self, index: usize) -> Self::ReadItem<'_> {
Wrapper(self.index(index))
}

fn len(&self) -> usize {
self.len()
}
}

pub use self::containers::{BatchContainer, SliceContainer, SliceContainer2};

/// Containers for data that resemble `Vec<T>`, with leaner implementations.
Expand All @@ -282,7 +371,7 @@ pub mod containers {
use crate::trace::MyTrait;

/// A general-purpose container resembling `Vec<T>`.
pub trait BatchContainer: Default + 'static {
pub trait BatchContainer: 'static {
/// The type of contained item.
///
/// The container only supplies references to the item, so it needn't be sized.
Expand Down
42 changes: 23 additions & 19 deletions src/trace/implementations/ord_neu.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ mod val_batch {
use timely::progress::{Antichain, frontier::AntichainRef};

use crate::trace::{Batch, BatchReader, Builder, Cursor, Description, Merger};
use crate::trace::implementations::{BatchContainer, OffsetList};
use crate::trace::implementations::BatchContainer;
use crate::trace::cursor::MyTrait;

use super::{Layout, Update};
Expand All @@ -83,7 +83,7 @@ mod val_batch {
/// Offsets used to provide indexes from keys to values.
///
/// The length of this list is one longer than `keys`, so that we can avoid bounds logic.
pub keys_offs: OffsetList,
pub keys_offs: L::OffsetContainer,
/// Concatenated ordered lists of values, bracketed by offsets in `keys_offs`.
pub vals: L::ValContainer,
/// Offsets used to provide indexes from values to updates.
Expand All @@ -94,20 +94,20 @@ mod val_batch {
/// single common update values (e.g. in a snapshot, the minimal time and a diff of one).
///
/// The length of this list is one longer than `vals`, so that we can avoid bounds logic.
pub vals_offs: OffsetList,
pub vals_offs: L::OffsetContainer,
/// Concatenated ordered lists of updates, bracketed by offsets in `vals_offs`.
pub updates: L::UpdContainer,
}

impl<L: Layout> OrdValStorage<L> {
/// Lower and upper bounds in `self.vals` corresponding to the key at `index`.
fn values_for_key(&self, index: usize) -> (usize, usize) {
(self.keys_offs.index(index), self.keys_offs.index(index+1))
(self.keys_offs.index(index).into_owned(), self.keys_offs.index(index+1).into_owned())
}
/// Lower and upper bounds in `self.updates` corresponding to the value at `index`.
fn updates_for_value(&self, index: usize) -> (usize, usize) {
let mut lower = self.vals_offs.index(index);
let upper = self.vals_offs.index(index+1);
let mut lower = self.vals_offs.index(index).into_owned();
let upper = self.vals_offs.index(index+1).into_owned();
// We use equal lower and upper to encode "singleton update; just before here".
// It should only apply when there is a prior element, so `lower` should be greater than zero.
if lower == upper {
Expand Down Expand Up @@ -206,14 +206,17 @@ mod val_batch {

let mut storage = OrdValStorage {
keys: L::KeyContainer::merge_capacity(&batch1.keys, &batch2.keys),
keys_offs: OffsetList::with_capacity(batch1.keys_offs.len() + batch2.keys_offs.len()),
keys_offs: L::OffsetContainer::with_capacity(batch1.keys_offs.len() + batch2.keys_offs.len()),
vals: L::ValContainer::merge_capacity(&batch1.vals, &batch2.vals),
vals_offs: OffsetList::with_capacity(batch1.vals_offs.len() + batch2.vals_offs.len()),
vals_offs: L::OffsetContainer::with_capacity(batch1.vals_offs.len() + batch2.vals_offs.len()),
updates: L::UpdContainer::merge_capacity(&batch1.updates, &batch2.updates),
};

storage.keys_offs.push(0);
storage.vals_offs.push(0);
// Mark explicit types because type inference fails to resolve it.
let keys_offs: &mut L::OffsetContainer = &mut storage.keys_offs;
keys_offs.push(0);
let vals_offs: &mut L::OffsetContainer = &mut storage.vals_offs;
vals_offs.push(0);

OrdValMerger {
key_cursor1: 0,
Expand Down Expand Up @@ -546,9 +549,9 @@ mod val_batch {
Self {
result: OrdValStorage {
keys: L::KeyContainer::with_capacity(keys),
keys_offs: OffsetList::with_capacity(keys + 1),
keys_offs: L::OffsetContainer::with_capacity(keys + 1),
vals: L::ValContainer::with_capacity(vals),
vals_offs: OffsetList::with_capacity(vals + 1),
vals_offs: L::OffsetContainer::with_capacity(vals + 1),
updates: L::UpdContainer::with_capacity(upds),
},
singleton: None,
Expand Down Expand Up @@ -636,7 +639,7 @@ mod key_batch {
use timely::progress::{Antichain, frontier::AntichainRef};

use crate::trace::{Batch, BatchReader, Builder, Cursor, Description, Merger};
use crate::trace::implementations::{BatchContainer, OffsetList};
use crate::trace::implementations::BatchContainer;
use crate::trace::cursor::MyTrait;

use super::{Layout, Update};
Expand All @@ -654,16 +657,16 @@ mod key_batch {
/// single common update values (e.g. in a snapshot, the minimal time and a diff of one).
///
/// The length of this list is one longer than `keys`, so that we can avoid bounds logic.
pub keys_offs: OffsetList,
pub keys_offs: L::OffsetContainer,
/// Concatenated ordered lists of updates, bracketed by offsets in `keys_offs`.
pub updates: L::UpdContainer,
}

impl<L: Layout> OrdKeyStorage<L> {
/// Lower and upper bounds in `self.updates` corresponding to the key at `index`.
fn updates_for_key(&self, index: usize) -> (usize, usize) {
let mut lower = self.keys_offs.index(index);
let upper = self.keys_offs.index(index+1);
let mut lower = self.keys_offs.index(index).into_owned();
let upper = self.keys_offs.index(index+1).into_owned();
// We use equal lower and upper to encode "singleton update; just before here".
// It should only apply when there is a prior element, so `lower` should be greater than zero.
if lower == upper {
Expand Down Expand Up @@ -763,11 +766,12 @@ mod key_batch {

let mut storage = OrdKeyStorage {
keys: L::KeyContainer::merge_capacity(&batch1.keys, &batch2.keys),
keys_offs: OffsetList::with_capacity(batch1.keys_offs.len() + batch2.keys_offs.len()),
keys_offs: L::OffsetContainer::with_capacity(batch1.keys_offs.len() + batch2.keys_offs.len()),
updates: L::UpdContainer::merge_capacity(&batch1.updates, &batch2.updates),
};

storage.keys_offs.push(0);
let keys_offs: &mut L::OffsetContainer = &mut storage.keys_offs;
keys_offs.push(0);

OrdKeyMerger {
key_cursor1: 0,
Expand Down Expand Up @@ -1011,7 +1015,7 @@ mod key_batch {
Self {
result: OrdKeyStorage {
keys: L::KeyContainer::with_capacity(keys),
keys_offs: OffsetList::with_capacity(keys + 1),
keys_offs: L::OffsetContainer::with_capacity(keys + 1),
updates: L::UpdContainer::with_capacity(upds),
},
singleton: None,
Expand Down
31 changes: 17 additions & 14 deletions src/trace/implementations/rhh.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ mod val_batch {
use crate::hashable::Hashable;

use crate::trace::{Batch, BatchReader, Builder, Cursor, Description, Merger};
use crate::trace::implementations::{BatchContainer, OffsetList};
use crate::trace::implementations::BatchContainer;
use crate::trace::cursor::MyTrait;

use super::{Layout, Update, HashOrdered};
Expand Down Expand Up @@ -122,7 +122,7 @@ mod val_batch {
/// Offsets used to provide indexes from keys to values.
///
/// The length of this list is one longer than `keys`, so that we can avoid bounds logic.
pub keys_offs: OffsetList,
pub keys_offs: L::OffsetContainer,
/// Concatenated ordered lists of values, bracketed by offsets in `keys_offs`.
pub vals: L::ValContainer,
/// Offsets used to provide indexes from values to updates.
Expand All @@ -133,7 +133,7 @@ mod val_batch {
/// single common update values (e.g. in a snapshot, the minimal time and a diff of one).
///
/// The length of this list is one longer than `vals`, so that we can avoid bounds logic.
pub vals_offs: OffsetList,
pub vals_offs: L::OffsetContainer,
/// Concatenated ordered lists of updates, bracketed by offsets in `vals_offs`.
pub updates: L::UpdContainer,
}
Expand All @@ -144,16 +144,16 @@ mod val_batch {
{
/// Lower and upper bounds in `self.vals` corresponding to the key at `index`.
fn values_for_key(&self, index: usize) -> (usize, usize) {
let lower = self.keys_offs.index(index);
let upper = self.keys_offs.index(index+1);
let lower = self.keys_offs.index(index).into_owned();
let upper = self.keys_offs.index(index+1).into_owned();
// Looking up values for an invalid key indicates something is wrong.
assert!(lower < upper, "{:?} v {:?} at {:?}", lower, upper, index);
(lower, upper)
}
/// Lower and upper bounds in `self.updates` corresponding to the value at `index`.
fn updates_for_value(&self, index: usize) -> (usize, usize) {
let mut lower = self.vals_offs.index(index);
let upper = self.vals_offs.index(index+1);
let mut lower = self.vals_offs.index(index).into_owned();
let upper = self.vals_offs.index(index+1).into_owned();
// We use equal lower and upper to encode "singleton update; just before here".
// It should only apply when there is a prior element, so `lower` should be greater than zero.
if lower == upper {
Expand All @@ -178,7 +178,7 @@ mod val_batch {
// push additional blank entries in.
while self.keys.len() < desired {
// We insert a default (dummy) key and repeat the offset to indicate this.
let current_offset = self.keys_offs.index(self.keys.len());
let current_offset = self.keys_offs.index(self.keys.len()).into_owned();
self.keys.push(Default::default());
self.keys_offs.push(current_offset);
}
Expand Down Expand Up @@ -339,17 +339,20 @@ mod val_batch {

let mut storage = RhhValStorage {
keys: L::KeyContainer::merge_capacity(&batch1.keys, &batch2.keys),
keys_offs: OffsetList::with_capacity(batch1.keys_offs.len() + batch2.keys_offs.len()),
keys_offs: L::OffsetContainer::with_capacity(batch1.keys_offs.len() + batch2.keys_offs.len()),
vals: L::ValContainer::merge_capacity(&batch1.vals, &batch2.vals),
vals_offs: OffsetList::with_capacity(batch1.vals_offs.len() + batch2.vals_offs.len()),
vals_offs: L::OffsetContainer::with_capacity(batch1.vals_offs.len() + batch2.vals_offs.len()),
updates: L::UpdContainer::merge_capacity(&batch1.updates, &batch2.updates),
key_count: 0,
key_capacity: rhh_cap,
divisor: RhhValStorage::<L>::divisor_for_capacity(rhh_cap),
};

storage.keys_offs.push(0);
storage.vals_offs.push(0);
// Mark explicit types because type inference fails to resolve it.
let keys_offs: &mut L::OffsetContainer = &mut storage.keys_offs;
keys_offs.push(0);
let vals_offs: &mut L::OffsetContainer = &mut storage.vals_offs;
vals_offs.push(0);

RhhValMerger {
key_cursor1: 0,
Expand Down Expand Up @@ -746,9 +749,9 @@ mod val_batch {
Self {
result: RhhValStorage {
keys: L::KeyContainer::with_capacity(keys),
keys_offs: OffsetList::with_capacity(keys + 1),
keys_offs: L::OffsetContainer::with_capacity(keys + 1),
vals: L::ValContainer::with_capacity(vals),
vals_offs: OffsetList::with_capacity(vals + 1),
vals_offs: L::OffsetContainer::with_capacity(vals + 1),
updates: L::UpdContainer::with_capacity(upds),
key_count: 0,
key_capacity: rhh_capacity,
Expand Down