Skip to content

Commit bd12951

Browse files
committed
Allow custom exertion logic
1 parent 2b9ac68 commit bd12951

File tree

6 files changed

+81
-57
lines changed

6 files changed

+81
-57
lines changed

src/operators/arrange/arrangement.rs

Lines changed: 14 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -563,15 +563,22 @@ where
563563
// Capabilities for the lower envelope of updates in `batcher`.
564564
let mut capabilities = Antichain::<Capability<G::Timestamp>>::new();
565565

566-
let (activator, effort) =
566+
let activator = Some(self.scope().activator_for(&info.address[..]));
567+
let mut empty_trace = Tr::new(info.clone(), logger.clone(), activator);
568+
569+
// If idle merge effort exists, configure aggressive idle merging logic.
567570
if let Some(effort) = self.inner.scope().config().get::<isize>("differential/idle_merge_effort").cloned() {
568-
(Some(self.scope().activator_for(&info.address[..])), Some(effort))
571+
empty_trace.set_exert_logic(Some(Box::new(move |batches| {
572+
let mut non_empty = 0;
573+
for (_index, count, length) in batches {
574+
if count > 1 { return Some(effort as usize); }
575+
if length > 0 { non_empty += 1; }
576+
if non_empty > 1 { return Some(effort as usize); }
577+
}
578+
None
579+
})));
569580
}
570-
else {
571-
(None, None)
572-
};
573581

574-
let empty_trace = Tr::new(info.clone(), logger.clone(), activator);
575582
let (reader_local, mut writer) = TraceAgent::new(empty_trace, info, logger);
576583

577584
*reader = Some(reader_local);
@@ -672,9 +679,7 @@ where
672679
prev_frontier.extend(input.frontier().frontier().iter().cloned());
673680
}
674681

675-
if let Some(mut fuel) = effort.clone() {
676-
writer.exert(&mut fuel);
677-
}
682+
writer.exert();
678683
}
679684
})
680685
};

src/operators/arrange/upsert.rs

Lines changed: 16 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -165,20 +165,25 @@ where
165165
register.get::<::logging::DifferentialEvent>("differential/arrange")
166166
};
167167

168-
// Establish compaction effort to apply even without updates.
169-
let (activator, effort) =
170-
if let Some(effort) = stream.scope().config().get::<isize>("differential/idle_merge_effort").cloned() {
171-
(Some(stream.scope().activator_for(&info.address[..])), Some(effort))
172-
}
173-
else {
174-
(None, None)
175-
};
176-
177168
// Tracks the lower envelope of times in `priority_queue`.
178169
let mut capabilities = Antichain::<Capability<G::Timestamp>>::new();
179170
let mut buffer = Vec::new();
180171
// Form the trace we will both use internally and publish.
181-
let empty_trace = Tr::new(info.clone(), logger.clone(), activator);
172+
let activator = Some(stream.scope().activator_for(&info.address[..]));
173+
let mut empty_trace = Tr::new(info.clone(), logger.clone(), activator);
174+
// If idle merge effort exists, configure aggressive idle merging logic.
175+
if let Some(effort) = stream.scope().config().get::<isize>("differential/idle_merge_effort").cloned() {
176+
empty_trace.set_exert_logic(Some(Box::new(move |batches| {
177+
let mut non_empty = 0;
178+
for (_index, count, length) in batches {
179+
if count > 1 { return Some(effort as usize); }
180+
if length > 0 { non_empty += 1; }
181+
if non_empty > 1 { return Some(effort as usize); }
182+
}
183+
None
184+
})));
185+
}
186+
182187
let (mut reader_local, mut writer) = TraceAgent::new(empty_trace, info, logger);
183188
// Capture the reader outside the builder scope.
184189
*reader = Some(reader_local.clone());
@@ -334,9 +339,7 @@ where
334339
reader_local.set_physical_compaction(prev_frontier.borrow());
335340
}
336341

337-
if let Some(mut fuel) = effort.clone() {
338-
writer.exert(&mut fuel);
339-
}
342+
writer.exert();
340343
}
341344
})
342345
};

src/operators/arrange/writer.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -52,9 +52,9 @@ where
5252
}
5353

5454
/// Exerts merge effort, even without additional updates.
55-
pub fn exert(&mut self, fuel: &mut isize) {
55+
pub fn exert(&mut self) {
5656
if let Some(trace) = self.trace.upgrade() {
57-
trace.borrow_mut().trace.exert(fuel);
57+
trace.borrow_mut().trace.exert();
5858
}
5959
}
6060

src/operators/reduce.rs

Lines changed: 13 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -351,17 +351,21 @@ where
351351
register.get::<::logging::DifferentialEvent>("differential/arrange")
352352
};
353353

354-
// Determine if we should regularly exert the trace maintenance machinery,
355-
// and with what amount of effort each time.
356-
let (activator, effort) =
354+
let activator = Some(self.stream.scope().activator_for(&operator_info.address[..]));
355+
let mut empty = T2::new(operator_info.clone(), logger.clone(), activator);
356+
// If idle merge effort exists, configure aggressive idle merging logic.
357357
if let Some(effort) = self.stream.scope().config().get::<isize>("differential/idle_merge_effort").cloned() {
358-
(Some(self.stream.scope().activator_for(&operator_info.address[..])), Some(effort))
358+
empty.set_exert_logic(Some(Box::new(move |batches| {
359+
let mut non_empty = 0;
360+
for (_index, count, length) in batches {
361+
if count > 1 { return Some(effort as usize); }
362+
if length > 0 { non_empty += 1; }
363+
if non_empty > 1 { return Some(effort as usize); }
364+
}
365+
None
366+
})));
359367
}
360-
else {
361-
(None, None)
362-
};
363368

364-
let empty = T2::new(operator_info.clone(), logger.clone(), activator);
365369
let mut source_trace = self.trace.clone();
366370

367371
let (mut output_reader, mut output_writer) = TraceAgent::new(empty, operator_info, logger);
@@ -629,9 +633,7 @@ where
629633
}
630634

631635
// Exert trace maintenance if we have been so requested.
632-
if let Some(mut fuel) = effort.clone() {
633-
output_writer.exert(&mut fuel);
634-
}
636+
output_writer.exert();
635637
}
636638
}
637639
)

src/trace/implementations/spine_fueled.rs

Lines changed: 27 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,8 @@ pub struct Spine<B: Batch> where B::Time: Lattice+Ord, B::R: Semigroup {
9797
upper: Antichain<B::Time>,
9898
effort: usize,
9999
activator: Option<timely::scheduling::activate::Activator>,
100+
/// Logic to indicate whether and how many records we should introduce in the absence of actual updates.
101+
exert_logic: Option<Box<dyn for<'a> Fn(Box<dyn Iterator<Item=(usize, usize, usize)>+'a>)->Option<usize>>>,
100102
}
101103

102104
impl<B> TraceReader for Spine<B>
@@ -264,22 +266,21 @@ where
264266

265267
/// Apply some amount of effort to trace maintenance.
266268
///
267-
/// The units of effort are updates, and the method should be
268-
/// thought of as analogous to inserting as many empty updates,
269-
/// where the trace is permitted to perform proportionate work.
270-
fn exert(&mut self, effort: &mut isize) {
269+
/// Whether and how much effort to apply is determined by `self.exert_logic`, a closure the user can set.
270+
fn exert(&mut self) {
271271
// If there is work to be done, ...
272272
self.tidy_layers();
273-
if !self.reduced() {
273+
// Determine whether we should apply effort independent of updates.
274+
if let Some(effort) = self.exert_effort() {
274275

275276
// If any merges exist, we can directly call `apply_fuel`.
276277
if self.merging.iter().any(|b| b.is_double()) {
277-
self.apply_fuel(effort);
278+
self.apply_fuel(&mut (effort as isize));
278279
}
279280
// Otherwise, we'll need to introduce fake updates to move merges along.
280281
else {
281282
// Introduce an empty batch with roughly *effort number of virtual updates.
282-
let level = (*effort as usize).next_power_of_two().trailing_zeros() as usize;
283+
let level = effort.next_power_of_two().trailing_zeros() as usize;
283284
self.introduce_batch(None, level);
284285
}
285286
// We were not in reduced form, so let's check again in the future.
@@ -289,6 +290,10 @@ where
289290
}
290291
}
291292

293+
fn set_exert_logic(&mut self, logic: Option<Box<dyn for<'a> Fn(Box<dyn Iterator<Item=(usize, usize, usize)>+'a>)->Option<usize>>>) {
294+
self.exert_logic = logic;
295+
}
296+
292297
// Ideally, this method acts as insertion of `batch`, even if we are not yet able to begin
293298
// merging the batch. This means it is a good time to perform amortized work proportional
294299
// to the size of batch.
@@ -388,19 +393,20 @@ where
388393
B::Time: Lattice+timely::progress::Timestamp+Ord+Clone+Debug,
389394
B::R: Semigroup,
390395
{
391-
/// True iff there is at most one non-empty batch in `self.merging`.
396+
/// Determine the amount of effort we should exert in the absence of updates.
392397
///
393-
/// When true, there is no maintenance work to perform in the trace, other than compaction.
394-
/// We do not yet have logic in place to determine if compaction would improve a trace, so
395-
/// for now we are ignoring that.
396-
fn reduced(&self) -> bool {
397-
let mut non_empty = 0;
398-
for index in 0 .. self.merging.len() {
399-
if self.merging[index].is_double() { return false; }
400-
if self.merging[index].len() > 0 { non_empty += 1; }
401-
if non_empty > 1 { return false; }
402-
}
403-
true
398+
/// This method prepares an iterator over batches, including the level, count, and length of each layer.
399+
/// It supplies this to `self.exert_logic`, who produces the response of the amount of exertion to apply.
400+
fn exert_effort(&self) -> Option<usize> {
401+
self.exert_logic.as_ref().and_then(|l| (**l)(
402+
Box::new(self.merging.iter().enumerate().rev().map(|(index, batch)| {
403+
match batch {
404+
MergeState::Vacant => (index, 0, 0),
405+
MergeState::Single(_) => (index, 1, batch.len()),
406+
MergeState::Double(_) => (index, 2, batch.len()),
407+
}
408+
}))
409+
))
404410
}
405411

406412
/// Describes the merge progress of layers in the trace.
@@ -443,6 +449,7 @@ where
443449
upper: Antichain::from_elem(<B::Time as timely::progress::Timestamp>::minimum()),
444450
effort,
445451
activator,
452+
exert_logic: None,
446453
}
447454
}
448455

@@ -483,7 +490,7 @@ where
483490
}
484491

485492
// Having performed all of our work, if more than one batch remains reschedule ourself.
486-
if !self.reduced() {
493+
if !self.exert_effort().is_some() {
487494
if let Some(activator) = &self.activator {
488495
activator.activate();
489496
}

src/trace/mod.rs

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -208,8 +208,15 @@ where <Self as TraceReader>::Batch: Batch {
208208
activator: Option<timely::scheduling::activate::Activator>,
209209
) -> Self;
210210

211-
/// Exert merge effort, even without updates.
212-
fn exert(&mut self, effort: &mut isize);
211+
/// Exert merge effort, even without updates.
212+
fn exert(&mut self);
213+
214+
/// Sets the logic for exertion in the absence of updates.
215+
///
216+
/// The function receives an iterator over batch levels, from large to small, as triples `(level, count, length)`,
217+
/// indicating the level, the number of batches, and their total length in updates. It should return a number of
218+
/// updates to perform, or `None` if no work is required.
219+
fn set_exert_logic(&mut self, logic: Option<Box<dyn for<'a> Fn(Box<dyn Iterator<Item=(usize, usize, usize)>+'a>)->Option<usize>>>);
213220

214221
/// Introduces a batch of updates to the trace.
215222
///

0 commit comments

Comments
 (0)