Skip to content

Log size/capacity/allocations from the merge batchers #434

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Dec 8, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions src/logging.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ pub enum DifferentialEvent {
MergeShortfall(MergeShortfall),
/// Trace sharing event.
TraceShare(TraceShare),
/// Batcher size event.
Batcher(BatcherEvent),
}

/// Either the start or end of a merge event.
Expand All @@ -45,6 +47,23 @@ pub struct BatchEvent {
impl From<BatchEvent> for DifferentialEvent { fn from(e: BatchEvent) -> Self { DifferentialEvent::Batch(e) } }


/// A change in the contents of a batcher, expressed as signed differences.
///
/// Each `*_diff` field is a delta rather than an absolute value.
// NOTE(review): the units of `size_diff` and `capacity_diff` are not visible here — confirm with the emitters.
#[derive(Debug, Clone, Abomonation, Ord, PartialOrd, Eq, PartialEq)]
pub struct BatcherEvent {
/// Operator identifier.
pub operator: usize,
/// Change in records.
pub records_diff: isize,
/// Change in used size.
pub size_diff: isize,
/// Change in capacity.
pub capacity_diff: isize,
/// Change in number of allocations.
pub allocations_diff: isize,
}

impl From<BatcherEvent> for DifferentialEvent { fn from(e: BatcherEvent) -> Self { DifferentialEvent::Batcher(e) } }

/// Either the start or end of a merge event.
#[derive(Debug, Clone, Abomonation, Ord, PartialOrd, Eq, PartialEq)]
pub struct DropEvent {
Expand Down
2 changes: 1 addition & 1 deletion src/operators/arrange/arrangement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -619,7 +619,7 @@ where
};

// Where we will deposit received updates, and from which we extract batches.
let mut batcher = Tr::Batcher::new();
let mut batcher = Tr::Batcher::new(logger.clone(), info.global_id);

// Capabilities for the lower envelope of updates in `batcher`.
let mut capabilities = Antichain::<Capability<G::Timestamp>>::new();
Expand Down
105 changes: 77 additions & 28 deletions src/trace/implementations/merge_batcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,12 @@
use std::collections::VecDeque;

use timely::communication::message::RefOrMut;
use timely::logging::WorkerIdentifier;
use timely::logging_core::Logger;
use timely::progress::{frontier::Antichain, Timestamp};

use crate::difference::Semigroup;

use crate::logging::{BatcherEvent, DifferentialEvent};
use crate::trace::{Batcher, Builder};

/// Creates batches from unordered tuples.
Expand All @@ -26,11 +28,11 @@ where
type Item = ((K,V),T,D);
type Time = T;

fn new() -> Self {
fn new(logger: Option<Logger<DifferentialEvent, WorkerIdentifier>>, operator_id: usize) -> Self {
MergeBatcher {
sorter: MergeSorter::new(),
sorter: MergeSorter::new(logger, operator_id),
frontier: Antichain::new(),
lower: Antichain::from_elem(<T as timely::progress::Timestamp>::minimum()),
lower: Antichain::from_elem(T::minimum()),
}
}

Expand Down Expand Up @@ -132,20 +134,23 @@ where
self.sorter.push(&mut buffer);
}

let seal = builder.done(self.lower.clone(), upper.clone(), Antichain::from_elem(<T as timely::progress::Timestamp>::minimum()));
let seal = builder.done(self.lower.clone(), upper.clone(), Antichain::from_elem(T::minimum()));
self.lower = upper;
seal
}

// the frontier of elements remaining after the most recent call to `self.seal`.
/// The frontier of elements remaining after the most recent call to `self.seal`.
fn frontier(&mut self) -> timely::progress::frontier::AntichainRef<T> {
self.frontier.borrow()
}
}

struct MergeSorter<D, T, R> {
queue: Vec<Vec<Vec<(D, T, R)>>>, // each power-of-two length list of allocations.
/// each power-of-two length list of allocations. Do not push/pop directly but use the corresponding functions.
queue: Vec<Vec<Vec<(D, T, R)>>>,
stash: Vec<Vec<(D, T, R)>>,
logger: Option<Logger<DifferentialEvent, WorkerIdentifier>>,
operator_id: usize,
}

impl<D: Ord, T: Ord, R: Semigroup> MergeSorter<D, T, R> {
Expand All @@ -164,21 +169,20 @@ impl<D: Ord, T: Ord, R: Semigroup> MergeSorter<D, T, R> {
}

#[inline]
pub fn new() -> Self { MergeSorter { queue: Vec::new(), stash: Vec::new() } }
fn new(logger: Option<Logger<DifferentialEvent, WorkerIdentifier>>, operator_id: usize) -> Self {
Self {
logger,
operator_id,
queue: Vec::new(),
stash: Vec::new(),
}
}

/// Hands out a buffer: one taken from the stash when available, otherwise a
/// fresh vector with capacity `Self::buffer_size()`.
#[inline]
pub fn empty(&mut self) -> Vec<(D, T, R)> {
    match self.stash.pop() {
        Some(recycled) => recycled,
        None => Vec::with_capacity(Self::buffer_size()),
    }
}

/// Drains every buffer out of `list`, pushes each into the sorter, and then
/// lets `finish_into` write the result back into `list`.
#[inline(never)]
pub fn _sort(&mut self, list: &mut Vec<Vec<(D, T, R)>>) {
    list.drain(..).for_each(|mut buffer| self.push(&mut buffer));
    self.finish_into(list);
}

#[inline]
pub fn push(&mut self, batch: &mut Vec<(D, T, R)>) {
// TODO: Reason about possible unbounded stash growth. How to / should we return them?
Expand All @@ -192,12 +196,13 @@ impl<D: Ord, T: Ord, R: Semigroup> MergeSorter<D, T, R> {

if !batch.is_empty() {
crate::consolidation::consolidate_updates(&mut batch);
self.queue.push(vec![batch]);
self.account([batch.len()], 1);
self.queue_push(vec![batch]);
while self.queue.len() > 1 && (self.queue[self.queue.len()-1].len() >= self.queue[self.queue.len()-2].len() / 2) {
let list1 = self.queue.pop().unwrap();
let list2 = self.queue.pop().unwrap();
let list1 = self.queue_pop().unwrap();
let list2 = self.queue_pop().unwrap();
let merged = self.merge_by(list1, list2);
self.queue.push(merged);
self.queue_push(merged);
}
}
}
Expand All @@ -206,31 +211,32 @@ impl<D: Ord, T: Ord, R: Semigroup> MergeSorter<D, T, R> {
// to break it down to be so.
pub fn push_list(&mut self, list: Vec<Vec<(D, T, R)>>) {
while self.queue.len() > 1 && self.queue[self.queue.len()-1].len() < list.len() {
let list1 = self.queue.pop().unwrap();
let list2 = self.queue.pop().unwrap();
let list1 = self.queue_pop().unwrap();
let list2 = self.queue_pop().unwrap();
let merged = self.merge_by(list1, list2);
self.queue.push(merged);
self.queue_push(merged);
}
self.queue.push(list);
self.queue_push(list);
}

#[inline(never)]
pub fn finish_into(&mut self, target: &mut Vec<Vec<(D, T, R)>>) {
while self.queue.len() > 1 {
let list1 = self.queue.pop().unwrap();
let list2 = self.queue.pop().unwrap();
let list1 = self.queue_pop().unwrap();
let list2 = self.queue_pop().unwrap();
let merged = self.merge_by(list1, list2);
self.queue.push(merged);
self.queue_push(merged);
}

if let Some(mut last) = self.queue.pop() {
if let Some(mut last) = self.queue_pop() {
::std::mem::swap(&mut last, target);
}
}

// merges two sorted input lists into one sorted output list.
#[inline(never)]
fn merge_by(&mut self, list1: Vec<Vec<(D, T, R)>>, list2: Vec<Vec<(D, T, R)>>) -> Vec<Vec<(D, T, R)>> {
self.account(list1.iter().chain(list2.iter()).map(Vec::len), -1);

use std::cmp::Ordering;

Expand Down Expand Up @@ -305,3 +311,46 @@ impl<D: Ord, T: Ord, R: Semigroup> MergeSorter<D, T, R> {
output
}
}

impl<D, T, R> MergeSorter<D, T, R> {
    /// Pops the last list from `self.queue` and accounts for the removal of its records.
    ///
    /// Returns `None` when the queue is empty. The accounting call is still made in
    /// that case, with a record count of zero.
    #[inline]
    fn queue_pop(&mut self) -> Option<Vec<Vec<(D, T, R)>>> {
        let batch = self.queue.pop();
        // `Option::iter` yields zero or one list; flattening visits each of its buffers.
        self.account(batch.iter().flatten().map(Vec::len), -1);
        batch
    }

    /// Pushes a list onto `self.queue` and accounts for the addition of its records.
    #[inline]
    fn queue_push(&mut self, batch: Vec<Vec<(D, T, R)>>) {
        // Account before the push, while `batch` is still borrowed rather than moved.
        self.account(batch.iter().map(Vec::len), 1);
        self.queue.push(batch);
    }

    /// Accounts for a change in the number of records. Only performs work if a logger exists.
    ///
    /// `items` yields the lengths of the affected buffers. Their sum is multiplied by
    /// `diff` and logged as `records_diff`. Usually, one wants to pass 1 or -1 as the diff.
    ///
    /// Both the sum and the product saturate instead of overflowing. The size, capacity
    /// and allocation differences are always logged as zero by this implementation.
    fn account<I: IntoIterator<Item = usize>>(&self, items: I, diff: isize) {
        if let Some(logger) = &self.logger {
            let records = items
                .into_iter()
                .fold(0isize, |total, len| total.saturating_add_unsigned(len));
            logger.log(BatcherEvent {
                operator: self.operator_id,
                records_diff: records.saturating_mul(diff),
                size_diff: 0,
                capacity_diff: 0,
                allocations_diff: 0,
            });
        }
    }
}

impl<D, T, R> Drop for MergeSorter<D, T, R> {
    /// Empties the queue through `queue_pop`, so every list still held is
    /// accounted for with a negative diff before the sorter is dropped.
    fn drop(&mut self) {
        while let Some(_discarded) = self.queue_pop() {}
    }
}
Loading