Skip to content

Commit b7524c7

Browse files
committed
Merge batcher announce records
Signed-off-by: Moritz Hoffmann <[email protected]>
1 parent 3f3fc26 commit b7524c7

File tree

4 files changed

+148
-77
lines changed

4 files changed

+148
-77
lines changed

src/logging.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,8 @@ impl From<BatchEvent> for DifferentialEvent { fn from(e: BatchEvent) -> Self { D
5252
pub struct BatcherEvent {
5353
/// Operator identifier.
5454
pub operator: usize,
55+
/// Change in records.
56+
pub records_diff: isize,
5557
/// Change in used size.
5658
pub size_diff: isize,
5759
/// Change in capacity.

src/trace/implementations/merge_batcher.rs

Lines changed: 75 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ use timely::logging_core::Logger;
88
use timely::progress::{frontier::Antichain, Timestamp};
99

1010
use crate::difference::Semigroup;
11-
use crate::logging::DifferentialEvent;
11+
use crate::logging::{BatcherEvent, DifferentialEvent};
1212
use crate::trace::{Batcher, Builder};
1313

1414
/// Creates batches from unordered tuples.
@@ -28,11 +28,11 @@ where
2828
type Item = ((K,V),T,D);
2929
type Time = T;
3030

31-
fn new(_logger: Option<Logger<DifferentialEvent, WorkerIdentifier>>, _operator_id: usize) -> Self {
31+
fn new(logger: Option<Logger<DifferentialEvent, WorkerIdentifier>>, operator_id: usize) -> Self {
3232
MergeBatcher {
33-
sorter: MergeSorter::new(),
33+
sorter: MergeSorter::new(logger, operator_id),
3434
frontier: Antichain::new(),
35-
lower: Antichain::from_elem(<T as timely::progress::Timestamp>::minimum()),
35+
lower: Antichain::from_elem(T::minimum()),
3636
}
3737
}
3838

@@ -134,20 +134,23 @@ where
134134
self.sorter.push(&mut buffer);
135135
}
136136

137-
let seal = builder.done(self.lower.clone(), upper.clone(), Antichain::from_elem(<T as timely::progress::Timestamp>::minimum()));
137+
let seal = builder.done(self.lower.clone(), upper.clone(), Antichain::from_elem(T::minimum()));
138138
self.lower = upper;
139139
seal
140140
}
141141

142-
// the frontier of elements remaining after the most recent call to `self.seal`.
142+
/// The frontier of elements remaining after the most recent call to `self.seal`.
143143
fn frontier(&mut self) -> timely::progress::frontier::AntichainRef<T> {
144144
self.frontier.borrow()
145145
}
146146
}
147147

148148
struct MergeSorter<D, T, R> {
149-
queue: Vec<Vec<Vec<(D, T, R)>>>, // each power-of-two length list of allocations.
149+
/// Each power-of-two length list of allocations. Do not push/pop directly; use `queue_push` and `queue_pop` so that size changes are accounted.
150+
queue: Vec<Vec<Vec<(D, T, R)>>>,
150151
stash: Vec<Vec<(D, T, R)>>,
152+
logger: Option<Logger<DifferentialEvent, WorkerIdentifier>>,
153+
operator_id: usize,
151154
}
152155

153156
impl<D: Ord, T: Ord, R: Semigroup> MergeSorter<D, T, R> {
@@ -166,21 +169,20 @@ impl<D: Ord, T: Ord, R: Semigroup> MergeSorter<D, T, R> {
166169
}
167170

168171
#[inline]
169-
pub fn new() -> Self { MergeSorter { queue: Vec::new(), stash: Vec::new() } }
172+
fn new(logger: Option<Logger<DifferentialEvent, WorkerIdentifier>>, operator_id: usize) -> Self {
173+
Self {
174+
logger,
175+
operator_id,
176+
queue: Vec::new(),
177+
stash: Vec::new(),
178+
}
179+
}
170180

171181
#[inline]
172182
pub fn empty(&mut self) -> Vec<(D, T, R)> {
173183
self.stash.pop().unwrap_or_else(|| Vec::with_capacity(Self::buffer_size()))
174184
}
175185

176-
#[inline(never)]
177-
pub fn _sort(&mut self, list: &mut Vec<Vec<(D, T, R)>>) {
178-
for mut batch in list.drain(..) {
179-
self.push(&mut batch);
180-
}
181-
self.finish_into(list);
182-
}
183-
184186
#[inline]
185187
pub fn push(&mut self, batch: &mut Vec<(D, T, R)>) {
186188
// TODO: Reason about possible unbounded stash growth. How to / should we return them?
@@ -194,12 +196,13 @@ impl<D: Ord, T: Ord, R: Semigroup> MergeSorter<D, T, R> {
194196

195197
if !batch.is_empty() {
196198
crate::consolidation::consolidate_updates(&mut batch);
197-
self.queue.push(vec![batch]);
199+
self.account([batch.len()], 1);
200+
self.queue_push(vec![batch]);
198201
while self.queue.len() > 1 && (self.queue[self.queue.len()-1].len() >= self.queue[self.queue.len()-2].len() / 2) {
199-
let list1 = self.queue.pop().unwrap();
200-
let list2 = self.queue.pop().unwrap();
202+
let list1 = self.queue_pop().unwrap();
203+
let list2 = self.queue_pop().unwrap();
201204
let merged = self.merge_by(list1, list2);
202-
self.queue.push(merged);
205+
self.queue_push(merged);
203206
}
204207
}
205208
}
@@ -208,31 +211,32 @@ impl<D: Ord, T: Ord, R: Semigroup> MergeSorter<D, T, R> {
208211
// to break it down to be so.
209212
pub fn push_list(&mut self, list: Vec<Vec<(D, T, R)>>) {
210213
while self.queue.len() > 1 && self.queue[self.queue.len()-1].len() < list.len() {
211-
let list1 = self.queue.pop().unwrap();
212-
let list2 = self.queue.pop().unwrap();
214+
let list1 = self.queue_pop().unwrap();
215+
let list2 = self.queue_pop().unwrap();
213216
let merged = self.merge_by(list1, list2);
214-
self.queue.push(merged);
217+
self.queue_push(merged);
215218
}
216-
self.queue.push(list);
219+
self.queue_push(list);
217220
}
218221

219222
#[inline(never)]
220223
pub fn finish_into(&mut self, target: &mut Vec<Vec<(D, T, R)>>) {
221224
while self.queue.len() > 1 {
222-
let list1 = self.queue.pop().unwrap();
223-
let list2 = self.queue.pop().unwrap();
225+
let list1 = self.queue_pop().unwrap();
226+
let list2 = self.queue_pop().unwrap();
224227
let merged = self.merge_by(list1, list2);
225-
self.queue.push(merged);
228+
self.queue_push(merged);
226229
}
227230

228-
if let Some(mut last) = self.queue.pop() {
231+
if let Some(mut last) = self.queue_pop() {
229232
::std::mem::swap(&mut last, target);
230233
}
231234
}
232235

233236
// merges two sorted input lists into one sorted output list.
234237
#[inline(never)]
235238
fn merge_by(&mut self, list1: Vec<Vec<(D, T, R)>>, list2: Vec<Vec<(D, T, R)>>) -> Vec<Vec<(D, T, R)>> {
239+
self.account(list1.iter().chain(list2.iter()).map(Vec::len), -1);
236240

237241
use std::cmp::Ordering;
238242

@@ -307,3 +311,46 @@ impl<D: Ord, T: Ord, R: Semigroup> MergeSorter<D, T, R> {
307311
output
308312
}
309313
}
314+
315+
impl<D, T, R> MergeSorter<D, T, R> {
316+
/// Pop a batch from `self.queue` and account size changes.
317+
#[inline]
318+
fn queue_pop(&mut self) -> Option<Vec<Vec<(D, T, R)>>> {
319+
let batch = self.queue.pop();
320+
self.account(batch.iter().flatten().map(Vec::len), -1);
321+
batch
322+
}
323+
324+
/// Push a batch to `self.queue` and account size changes.
325+
#[inline]
326+
fn queue_push(&mut self, batch: Vec<Vec<(D, T, R)>>) {
327+
self.account(batch.iter().map(Vec::len), 1);
328+
self.queue.push(batch);
329+
}
330+
331+
/// Account size changes. Only performs work if a logger exists.
332+
///
333+
/// Calculate the size based on the [`TimelyStack`]s passed along, with each attribute
334+
/// multiplied by `diff`. Usually, one wants to pass 1 or -1 as the diff.
335+
fn account<I: IntoIterator<Item=usize>>(&self, items: I, diff: isize) {
336+
if let Some(logger) = &self.logger {
337+
let mut records= 0isize;
338+
for len in items {
339+
records = records.saturating_add_unsigned(len);
340+
}
341+
logger.log(BatcherEvent {
342+
operator: self.operator_id,
343+
records_diff: records * diff,
344+
size_diff: 0,
345+
capacity_diff: 0,
346+
allocations_diff: 0,
347+
})
348+
}
349+
}
350+
}
351+
352+
impl<D, T, R> Drop for MergeSorter<D, T, R> {
353+
fn drop(&mut self) {
354+
while self.queue_pop().is_some() { }
355+
}
356+
}

src/trace/implementations/merge_batcher_col.rs

Lines changed: 70 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -14,10 +14,10 @@ use crate::trace::{Batcher, Builder};
1414
/// Creates batches from unordered tuples.
1515
pub struct ColumnatedMergeBatcher<K, V, T, D>
1616
where
17-
K: Columnation,
18-
V: Columnation,
19-
T: Columnation,
20-
D: Columnation,
17+
K: Columnation + 'static,
18+
V: Columnation + 'static,
19+
T: Columnation + 'static,
20+
D: Columnation + 'static,
2121
{
2222
sorter: MergeSorterColumnation<(K, V), T, D>,
2323
lower: Antichain<T>,
@@ -132,12 +132,12 @@ where
132132
// Drain buffers (fast reclamation).
133133
self.sorter.clear_stash();
134134

135-
let seal = builder.done(self.lower.clone(), upper.clone(), Antichain::from_elem(<T as timely::progress::Timestamp>::minimum()));
135+
let seal = builder.done(self.lower.clone(), upper.clone(), Antichain::from_elem(T::minimum()));
136136
self.lower = upper;
137137
seal
138138
}
139139

140-
// the frontier of elements remaining after the most recent call to `self.seal`.
140+
/// The frontier of elements remaining after the most recent call to `self.seal`.
141141
fn frontier(&mut self) -> timely::progress::frontier::AntichainRef<T> {
142142
self.frontier.borrow()
143143
}
@@ -184,8 +184,9 @@ impl<T: Columnation> TimelyStackQueue<T> {
184184
}
185185
}
186186

187-
struct MergeSorterColumnation<D: Columnation, T: Columnation, R: Columnation> {
188-
queue: Vec<Vec<TimelyStack<(D, T, R)>>>, // each power-of-two length list of allocations.
187+
struct MergeSorterColumnation<D: Columnation + 'static, T: Columnation + 'static, R: Columnation + 'static> {
188+
/// Each power-of-two length list of allocations. Do not push/pop directly; use `queue_push` and `queue_pop` so that size changes are accounted.
189+
queue: Vec<Vec<TimelyStack<(D, T, R)>>>,
189190
stash: Vec<TimelyStack<(D, T, R)>>,
190191
pending: Vec<(D, T, R)>,
191192
logger: Option<Logger<DifferentialEvent, WorkerIdentifier>>,
@@ -223,29 +224,6 @@ impl<D: Ord+Columnation+'static, T: Ord+Columnation+'static, R: Semigroup+Column
223224
}
224225
}
225226

226-
/// Account size changes. Only performs work if a logger exists.
227-
///
228-
/// Calculate the size based on the [`TimelyStack`]s passed along, with each attribute
229-
/// multiplied by `diff`. Usually, one wants to pass 1 or -1 as the diff.
230-
fn account<'a, I: IntoIterator<Item=&'a TimelyStack<(D, T, R)>>>(&self, items: I, diff: isize) {
231-
if let Some(logger) = &self.logger {
232-
let (mut siz, mut capacity, mut allocations) = (0isize, 0isize, 0isize);
233-
for stack in items {
234-
stack.heap_size(|s, c| {
235-
siz = siz.saturating_add_unsigned(s);
236-
capacity = capacity.saturating_add_unsigned(c);
237-
allocations += isize::from(c > 0);
238-
});
239-
}
240-
logger.log(BatcherEvent {
241-
operator: self.operator_id,
242-
size_diff: siz * diff,
243-
capacity_diff: capacity * diff,
244-
allocations_diff: allocations * diff,
245-
})
246-
}
247-
}
248-
249227
fn empty(&mut self) -> TimelyStack<(D, T, R)> {
250228
self.stash.pop().unwrap_or_else(|| TimelyStack::with_capacity(Self::buffer_size()))
251229
}
@@ -290,14 +268,12 @@ impl<D: Ord+Columnation+'static, T: Ord+Columnation+'static, R: Semigroup+Column
290268
for tuple in self.pending.drain(..) {
291269
stack.copy(&tuple);
292270
}
293-
self.account([&stack], 1);
294-
self.queue.push(vec![stack]);
271+
self.queue_push(vec![stack]);
295272
while self.queue.len() > 1 && (self.queue[self.queue.len()-1].len() >= self.queue[self.queue.len()-2].len() / 2) {
296-
let list1 = self.queue.pop().unwrap();
297-
let list2 = self.queue.pop().unwrap();
273+
let list1 = self.queue_pop().unwrap();
274+
let list2 = self.queue_pop().unwrap();
298275
let merged = self.merge_by(list1, list2);
299-
self.account(&merged, 1);
300-
self.queue.push(merged);
276+
self.queue_push(merged);
301277
}
302278
}
303279
}
@@ -306,34 +282,31 @@ impl<D: Ord+Columnation+'static, T: Ord+Columnation+'static, R: Semigroup+Column
306282
// to break it down to be so.
307283
fn push_list(&mut self, list: Vec<TimelyStack<(D, T, R)>>) {
308284
while self.queue.len() > 1 && self.queue[self.queue.len()-1].len() < list.len() {
309-
let list1 = self.queue.pop().unwrap();
310-
let list2 = self.queue.pop().unwrap();
285+
let list1 = self.queue_pop().unwrap();
286+
let list2 = self.queue_pop().unwrap();
311287
let merged = self.merge_by(list1, list2);
312-
self.account(&merged, 1);
313-
self.queue.push(merged);
288+
self.queue_push(merged);
314289
}
315-
self.queue.push(list);
290+
self.queue_push(list);
316291
}
317292

318293
fn finish_into(&mut self, target: &mut Vec<TimelyStack<(D, T, R)>>) {
319294
crate::consolidation::consolidate_updates(&mut self.pending);
320295
self.flush_pending();
321296
while self.queue.len() > 1 {
322-
let list1 = self.queue.pop().unwrap();
323-
let list2 = self.queue.pop().unwrap();
297+
let list1 = self.queue_pop().unwrap();
298+
let list2 = self.queue_pop().unwrap();
324299
let merged = self.merge_by(list1, list2);
325-
self.queue.push(merged);
300+
self.queue_push(merged);
326301
}
327302

328-
if let Some(mut last) = self.queue.pop() {
303+
if let Some(mut last) = self.queue_pop() {
329304
std::mem::swap(&mut last, target);
330305
}
331306
}
332307

333308
// merges two sorted input lists into one sorted output list.
334309
fn merge_by(&mut self, list1: Vec<TimelyStack<(D, T, R)>>, list2: Vec<TimelyStack<(D, T, R)>>) -> Vec<TimelyStack<(D, T, R)>> {
335-
self.account(list1.iter().chain(list2.iter()), -1);
336-
337310
use std::cmp::Ordering;
338311

339312
// TODO: `list1` and `list2` get dropped; would be better to reuse?
@@ -411,3 +384,52 @@ impl<D: Ord+Columnation+'static, T: Ord+Columnation+'static, R: Semigroup+Column
411384
output
412385
}
413386
}
387+
388+
impl<D: Columnation + 'static, T: Columnation + 'static, R: Columnation + 'static> MergeSorterColumnation<D, T, R> {
389+
/// Pop a batch from `self.queue` and account size changes.
390+
#[inline]
391+
fn queue_pop(&mut self) -> Option<Vec<TimelyStack<(D, T, R)>>> {
392+
let batch = self.queue.pop();
393+
self.account(batch.iter().flatten(), -1);
394+
batch
395+
}
396+
397+
/// Push a batch to `self.queue` and account size changes.
398+
#[inline]
399+
fn queue_push(&mut self, batch: Vec<TimelyStack<(D, T, R)>>) {
400+
self.account(&batch, 1);
401+
self.queue.push(batch);
402+
}
403+
404+
/// Account size changes. Only performs work if a logger exists.
405+
///
406+
/// Calculate the size based on the [`TimelyStack`]s passed along, with each attribute
407+
/// multiplied by `diff`. Usually, one wants to pass 1 or -1 as the diff.
408+
fn account<'a, I: IntoIterator<Item=&'a TimelyStack<(D, T, R)>>>(&self, items: I, diff: isize) {
409+
if let Some(logger) = &self.logger {
410+
let (mut records, mut siz, mut capacity, mut allocations) = (0isize, 0isize, 0isize, 0isize);
411+
for stack in items {
412+
records = records.saturating_add_unsigned(stack.len());
413+
stack.heap_size(|s, c| {
414+
siz = siz.saturating_add_unsigned(s);
415+
capacity = capacity.saturating_add_unsigned(c);
416+
allocations += isize::from(c > 0);
417+
});
418+
}
419+
logger.log(BatcherEvent {
420+
operator: self.operator_id,
421+
records_diff: records * diff,
422+
size_diff: siz * diff,
423+
capacity_diff: capacity * diff,
424+
allocations_diff: allocations * diff,
425+
})
426+
}
427+
}
428+
429+
}
430+
431+
impl<D: Columnation + 'static, T: Columnation + 'static, R: Columnation + 'static> Drop for MergeSorterColumnation<D, T, R> {
    /// Empties the queue through `queue_pop`, so that everything still queued
    /// is accounted as removed before the sorter goes away.
    fn drop(&mut self) {
        loop {
            // Each popped list is dropped right away; stop once the queue is exhausted.
            if self.queue_pop().is_none() {
                break;
            }
        }
    }
}

src/trace/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,8 @@ use timely::logging::WorkerIdentifier;
1717
use timely::logging_core::Logger;
1818
use timely::progress::{Antichain, frontier::AntichainRef};
1919
use timely::progress::Timestamp;
20-
use logging::DifferentialEvent;
2120

21+
use crate::logging::DifferentialEvent;
2222
use crate::trace::cursor::MyTrait;
2323

2424
// use ::difference::Semigroup;

0 commit comments

Comments
 (0)