Skip to content

Commit 508caff

Browse files
Clarify Trace capability nomenclature (#308)
* clarify nomenclature * re-introduce deprecated forms of originals * tidy unintended renames
1 parent 2e38abb commit 508caff

File tree

29 files changed

+233
-215
lines changed

29 files changed

+233
-215
lines changed

dogsdogsdogs/src/operators/lookup_map.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ where
4545
S: FnMut(&D, &R, &Tr::Val, &Tr::R)->(DOut, ROut)+'static,
4646
{
4747
// No need to block physical merging for this operator.
48-
arrangement.trace.distinguish_since(Antichain::new().borrow());
48+
arrangement.trace.set_physical_compaction(Antichain::new().borrow());
4949
let mut propose_trace = Some(arrangement.trace);
5050
let propose_stream = arrangement.stream;
5151

@@ -138,7 +138,7 @@ where
138138
for key in stash.keys() {
139139
frontier.insert(key.time().clone());
140140
}
141-
propose_trace.as_mut().map(|trace| trace.advance_by(frontier.borrow()));
141+
propose_trace.as_mut().map(|trace| trace.set_logical_compaction(frontier.borrow()));
142142

143143
if input1.frontier().is_empty() && stash.is_empty() {
144144
propose_trace = None;

examples/cursors.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -78,8 +78,8 @@ fn main() {
7878
graph.close();
7979
for i in 1..rounds + 1 {
8080
/* Advance the trace frontier to enable trace compaction. */
81-
graph_trace.distinguish_since(AntichainRef::new(&[i]));
82-
graph_trace.advance_by(AntichainRef::new(&[i]));
81+
graph_trace.set_physical_compaction(AntichainRef::new(&[i]));
82+
graph_trace.set_logical_compaction(AntichainRef::new(&[i]));
8383
worker.step_while(|| probe.less_than(&i));
8484
dump_cursor(i, worker.index(), &mut graph_trace);
8585
}
@@ -93,8 +93,8 @@ fn main() {
9393
}
9494
graph.advance_to(i);
9595
graph.flush();
96-
graph_trace.distinguish_since(AntichainRef::new(&[i]));
97-
graph_trace.advance_by(AntichainRef::new(&[i]));
96+
graph_trace.set_physical_compaction(AntichainRef::new(&[i]));
97+
graph_trace.set_logical_compaction(AntichainRef::new(&[i]));
9898
worker.step_while(|| probe.less_than(graph.time()));
9999
dump_cursor(i, worker.index(), &mut graph_trace);
100100
}

examples/multitemporal.rs

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ use pair::Pair;
2222

2323
fn main() {
2424

25-
timely::execute_from_args(std::env::args(), move |worker| {
25+
timely::execute_from_args(std::env::args(), move |worker| {
2626

2727
// Used to determine if our output has caught up to our input.
2828
let mut probe: ProbeHandle<Pair<isize, isize>> = ProbeHandle::new();
@@ -49,7 +49,7 @@ fn main() {
4949
});
5050

5151
// Do not hold back physical compaction.
52-
trace.distinguish_since(AntichainRef::new(&[]));
52+
trace.set_physical_compaction(AntichainRef::new(&[]));
5353

5454
println!("Multi-temporal histogram; valid commands are (integer arguments):");
5555
println!(" update value time1 time2 change");
@@ -86,22 +86,22 @@ fn main() {
8686
},
8787
("advance-output", 2) => {
8888
let time = Pair::new(arguments[0], arguments[1]);
89-
if trace.advance_frontier().less_equal(&time) {
90-
trace.advance_by(AntichainRef::new(&[time]));
89+
if trace.get_logical_compaction().less_equal(&time) {
90+
trace.set_logical_compaction(AntichainRef::new(&[time]));
9191
while probe.less_than(capability.time()) {
9292
worker.step();
9393
}
9494
} else {
95-
println!("Requested time {:?} not readable (output from {:?})", time, trace.advance_frontier());
95+
println!("Requested time {:?} not readable (output from {:?})", time, trace.get_logical_compaction());
9696
}
9797
},
9898
("query", 2) => {
9999
// Check that the query times are not beyond the current capabilities.
100100
let query_time = Pair::new(arguments[0], arguments[1]);
101101
if capability.time().less_equal(&query_time) {
102102
println!("Query time ({:?}) is still open (input from {:?}).", query_time, capability.time());
103-
} else if !trace.advance_frontier().less_equal(&query_time) {
104-
println!("Query time ({:?}) no longer available in output (output from {:?}).", query_time, trace.advance_frontier());
103+
} else if !trace.get_logical_compaction().less_equal(&query_time) {
104+
println!("Query time ({:?}) no longer available in output (output from {:?}).", query_time, trace.get_logical_compaction());
105105
}
106106
else {
107107
println!("Report at {:?}", query_time);

interactive/src/manager.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -187,13 +187,13 @@ impl<V: ExchangeData+Hash+Datum> TraceManager<V> {
187187
use timely::progress::frontier::Antichain;
188188
let frontier = Antichain::from_elem(time.clone());
189189
for trace in self.inputs.values_mut() {
190-
trace.advance_by(frontier.borrow());
191-
trace.distinguish_since(frontier.borrow());
190+
trace.set_logical_compaction(frontier.borrow());
191+
trace.set_physical_compaction(frontier.borrow());
192192
}
193193
for map in self.arrangements.values_mut() {
194194
for trace in map.values_mut() {
195-
trace.advance_by(frontier.borrow());
196-
trace.distinguish_since(frontier.borrow());
195+
trace.set_logical_compaction(frontier.borrow());
196+
trace.set_physical_compaction(frontier.borrow());
197197
}
198198
}
199199
}

mdbook/src/chapter_5/chapter_5_3.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,6 @@ When we extract a trace from an arrangement, we acquire the ability to replay th
9494

9595
A `TraceHandle` (the type of `trace`) has two important methods. Their names are not great, and subject to change in the future. Their idioms may also change as more information flows in about users and use cases.
9696

97-
1. `advance_by(frontier)`. This method informs `trace` that it will no longer be called upon to handle queries for times not in advance of `frontier`, a set of timestamps. This gives the arrangement permission to coalesce otherwise indistinguishable timestamps, which it will start to do once all handles have advanced.
97+
1. `set_logical_compaction(frontier)`. This method informs `trace` that it will no longer be called upon to handle queries for times not in advance of `frontier`, a set of timestamps. This gives the arrangement permission to coalesce otherwise indistinguishable timestamps, which it will start to do once all handles have advanced.
9898

99-
2. `distinguish_since(frontier)`. This method unblocks the merging of physical batches. It is very rare that a user wants to do anything with this other than call `trace.distinguish_since(&[])`, which unblocks all merging. Certain operators, namely `join`, do need to carefully manipulate this method.
99+
2. `set_physical_compaction(frontier)`. This method unblocks the merging of physical batches. It is very rare that a user wants to do anything with this other than call `trace.set_physical_compaction(&[])`, which unblocks all merging. Certain operators, namely `join`, do need to carefully manipulate this method.

server/dataflows/random_graph/src/lib.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ pub fn build((dataflow, handles, probe, timer, args): Environment) -> Result<(),
4646
// operator holds only a weak reference to it.
4747
//
4848
// The operator also holds an `Weak<RefCell<Option<TraceHandle>>>` which it will
49-
// attempt to borrow and call `advance_by` in order to advance the capability
49+
// attempt to borrow and call `set_logical_compaction` in order to advance the capability
5050
// as it runs, to allow compaction and the maintenance of bounded state.
5151

5252
if args.len() != 4 { return Err(format!("expected four arguments, instead: {:?}", args)); }
@@ -134,7 +134,7 @@ pub fn build((dataflow, handles, probe, timer, args): Environment) -> Result<(),
134134
if let Some(trace_handle) = trace_handle_weak.upgrade() {
135135
let mut borrow = trace_handle.borrow_mut();
136136
if let Some(ref mut trace_handle) = borrow.as_mut() {
137-
trace_handle.advance_by(&[elapsed_ns]);
137+
trace_handle.set_logical_compaction(&[elapsed_ns]);
138138
}
139139
}
140140

@@ -191,7 +191,7 @@ pub fn build((dataflow, handles, probe, timer, args): Environment) -> Result<(),
191191
.trace;
192192

193193
// release all blocks on merging.
194-
trace.distinguish_since(&[]);
194+
trace.set_physical_compaction(&[]);
195195
*trace_handle.borrow_mut() = Some(trace);
196196

197197
handles.set::<Rc<RefCell<Option<TraceHandle>>>>(name.to_owned(), trace_handle);

src/operators/arrange/agent.rs

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -54,20 +54,20 @@ where
5454
type Batch = Tr::Batch;
5555
type Cursor = Tr::Cursor;
5656

57-
fn advance_by(&mut self, frontier: AntichainRef<Tr::Time>) {
58-
self.trace.borrow_mut().adjust_advance_frontier(self.advance.borrow(), frontier);
57+
fn set_logical_compaction(&mut self, frontier: AntichainRef<Tr::Time>) {
58+
self.trace.borrow_mut().adjust_get_logical_compaction(self.advance.borrow(), frontier);
5959
self.advance.clear();
6060
self.advance.extend(frontier.iter().cloned());
6161
}
62-
fn advance_frontier(&mut self) -> AntichainRef<Tr::Time> {
62+
fn get_logical_compaction(&mut self) -> AntichainRef<Tr::Time> {
6363
self.advance.borrow()
6464
}
65-
fn distinguish_since(&mut self, frontier: AntichainRef<Tr::Time>) {
65+
fn set_physical_compaction(&mut self, frontier: AntichainRef<Tr::Time>) {
6666
self.trace.borrow_mut().adjust_through_frontier(self.through.borrow(), frontier);
6767
self.through.clear();
6868
self.through.extend(frontier.iter().cloned());
6969
}
70-
fn distinguish_frontier(&mut self) -> AntichainRef<Tr::Time> {
70+
fn get_physical_compaction(&mut self) -> AntichainRef<Tr::Time> {
7171
self.through.borrow()
7272
}
7373
fn cursor_through(&mut self, frontier: AntichainRef<Tr::Time>) -> Option<(Tr::Cursor, <Tr::Cursor as Cursor<Tr::Key, Tr::Val, Tr::Time, Tr::R>>::Storage)> {
@@ -99,7 +99,7 @@ where
9999
let reader = TraceAgent {
100100
trace: trace.clone(),
101101
queues: Rc::downgrade(&queues),
102-
advance: trace.borrow().advance_frontiers.frontier().to_owned(),
102+
advance: trace.borrow().get_logical_compactions.frontier().to_owned(),
103103
through: trace.borrow().through_frontiers.frontier().to_owned(),
104104
operator,
105105
logging,
@@ -163,13 +163,13 @@ where
163163
/// are no longer evident.
164164
///
165165
/// The current behavior is that the introduced collection accumulates updates to some times less or equal
166-
/// to `self.advance_frontier()`. There is *not* currently a guarantee that the updates are accumulated *to*
166+
/// to `self.get_logical_compaction()`. There is *not* currently a guarantee that the updates are accumulated *to*
167167
/// the frontier, and the resulting collection history may be weirdly partial until this point. In particular,
168168
/// the historical collection may move through configurations that did not actually occur, even if eventually
169169
/// arriving at the correct collection. This is probably a bug; although we get to the right place in the end,
170170
/// the intermediate computation could do something that the original computation did not, like diverge.
171171
///
172-
/// I would expect the semantics to improve to "updates are advanced to `self.advance_frontier()`", which
172+
/// I would expect the semantics to improve to "updates are advanced to `self.get_logical_compaction()`", which
173173
/// means the computation will run as if starting from exactly this frontier. It is not currently clear whose
174174
/// responsibility this should be (the trace/batch should only reveal these times, or an operator should know
175175
/// to advance times before using them).
@@ -341,7 +341,7 @@ where
341341

342342
/// Imports an arrangement into the supplied scope.
343343
///
344-
/// This variant of import uses the `advance_frontier` to forcibly advance timestamps in updates.
344+
/// This variant of import uses the `get_logical_compaction` to forcibly advance timestamps in updates.
345345
///
346346
/// # Examples
347347
///
@@ -382,7 +382,7 @@ where
382382
/// handle.remove(1); handle.advance_to(4); handle.flush(); worker.step();
383383
/// handle.insert(0); handle.advance_to(5); handle.flush(); worker.step();
384384
///
385-
/// trace.advance_by(AntichainRef::new(&[5]));
385+
/// trace.set_logical_compaction(AntichainRef::new(&[5]));
386386
///
387387
/// // create a second dataflow
388388
/// let mut shutdown = worker.dataflow(|scope| {
@@ -418,7 +418,7 @@ where
418418
Tr: TraceReader,
419419
{
420420
// This frontier describes our only guarantee on the compaction frontier.
421-
let frontier = self.advance_frontier().to_owned();
421+
let frontier = self.get_logical_compaction().to_owned();
422422
self.import_frontier_core(scope, name, frontier)
423423
}
424424

@@ -533,7 +533,7 @@ where
533533

534534
// increase counts for wrapped `TraceBox`.
535535
let empty_frontier = Antichain::new();
536-
self.trace.borrow_mut().adjust_advance_frontier(empty_frontier.borrow(), self.advance.borrow());
536+
self.trace.borrow_mut().adjust_get_logical_compaction(empty_frontier.borrow(), self.advance.borrow());
537537
self.trace.borrow_mut().adjust_through_frontier(empty_frontier.borrow(), self.through.borrow());
538538

539539
TraceAgent {
@@ -562,7 +562,7 @@ where
562562

563563
// decrement borrow counts to remove all holds
564564
let empty_frontier = Antichain::new();
565-
self.trace.borrow_mut().adjust_advance_frontier(self.advance.borrow(), empty_frontier.borrow());
565+
self.trace.borrow_mut().adjust_get_logical_compaction(self.advance.borrow(), empty_frontier.borrow());
566566
self.trace.borrow_mut().adjust_through_frontier(self.through.borrow(), empty_frontier.borrow());
567567
}
568568
}

src/operators/arrange/arrangement.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -278,8 +278,8 @@ where
278278
queries.binary_frontier(&self.stream, exchange, Pipeline, "TraceQuery", move |_capability, _info| {
279279

280280
let mut trace = Some(self.trace.clone());
281-
// release `distinguish_since` capability.
282-
trace.as_mut().unwrap().distinguish_since(Antichain::new().borrow());
281+
// release `set_physical_compaction` capability.
282+
trace.as_mut().unwrap().set_physical_compaction(Antichain::new().borrow());
283283

284284
let mut stash = Vec::new();
285285
let mut capability: Option<Capability<G::Timestamp>> = None;
@@ -413,7 +413,7 @@ where
413413
].into_iter().cloned().filter_map(|t| t).min();
414414

415415
if let Some(frontier) = frontier {
416-
trace.as_mut().map(|t| t.advance_by(AntichainRef::new(&[frontier])));
416+
trace.as_mut().map(|t| t.set_logical_compaction(AntichainRef::new(&[frontier])));
417417
}
418418
else {
419419
trace = None;

src/operators/arrange/upsert.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -331,8 +331,8 @@ where
331331
input_frontier.extend(input.frontier().frontier().iter().cloned());
332332

333333
// Downgrade capabilities for `reader_local`.
334-
reader_local.advance_by(input_frontier.borrow());
335-
reader_local.distinguish_since(input_frontier.borrow());
334+
reader_local.set_logical_compaction(input_frontier.borrow());
335+
reader_local.set_physical_compaction(input_frontier.borrow());
336336
}
337337

338338
if let Some(mut fuel) = effort.clone() {

src/operators/count.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -124,8 +124,8 @@ where
124124

125125
// tidy up the shared input trace.
126126
trace.advance_upper(&mut upper_limit);
127-
trace.advance_by(upper_limit.borrow());
128-
trace.distinguish_since(upper_limit.borrow());
127+
trace.set_logical_compaction(upper_limit.borrow());
128+
trace.set_physical_compaction(upper_limit.borrow());
129129
}
130130
})
131131
.as_collection()

src/operators/join.rs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -360,7 +360,7 @@ impl<G, T1> JoinCore<G, T1::Key, T1::Val, T1::R> for Arranged<G,T1>
360360
if !batch1.is_empty() {
361361
if let Some(acknowledged2) = &acknowledged2 {
362362
// TODO : cursor_through may be problematic for pre-merged traces.
363-
// A trace should provide the contract that whatever its `distinguish_since` capability,
363+
// A trace should provide the contract that whatever its `set_physical_compaction` capability,
364364
// it is safe (and reasonable) to await delivery of batches up through that frontier.
365365
// In this case, we should be able to await (not block on) the arrival of these batches.
366366
let (trace2_cursor, trace2_storage) = trace2.cursor_through(acknowledged2.borrow()).unwrap();
@@ -395,7 +395,7 @@ impl<G, T1> JoinCore<G, T1::Key, T1::Val, T1::R> for Arranged<G,T1>
395395
if !batch2.is_empty() {
396396
if let Some(acknowledged1) = &acknowledged1 {
397397
// TODO : cursor_through may be problematic for pre-merged traces.
398-
// A trace should provide the contract that whatever its `distinguish_since` capability,
398+
// A trace should provide the contract that whatever its `set_physical_compaction` capability,
399399
// it is safe (and reasonable) to await delivery of batches up through that frontier.
400400
// In this case, we should be able to await (not block on) the arrival of these batches.
401401
let (trace1_cursor, trace1_storage) = trace1.cursor_through(acknowledged1.borrow()).unwrap();
@@ -450,28 +450,28 @@ impl<G, T1> JoinCore<G, T1::Key, T1::Val, T1::R> for Arranged<G,T1>
450450
// shut down or advance trace2.
451451
if trace2.is_some() && input1.frontier().is_empty() { trace2 = None; }
452452
if let Some(ref mut trace2) = trace2 {
453-
trace2.advance_by(input1.frontier().frontier());
453+
trace2.set_logical_compaction(input1.frontier().frontier());
454454
// At this point, if we haven't seen any input batches we should establish a frontier anyhow.
455455
if acknowledged2.is_none() {
456456
acknowledged2 = Some(Antichain::from_elem(<G::Timestamp>::minimum()));
457457
}
458458
if let Some(acknowledged2) = &mut acknowledged2 {
459459
trace2.advance_upper(acknowledged2);
460-
trace2.distinguish_since(acknowledged2.borrow());
460+
trace2.set_physical_compaction(acknowledged2.borrow());
461461
}
462462
}
463463

464464
// shut down or advance trace1.
465465
if trace1.is_some() && input2.frontier().is_empty() { trace1 = None; }
466466
if let Some(ref mut trace1) = trace1 {
467-
trace1.advance_by(input2.frontier().frontier());
467+
trace1.set_logical_compaction(input2.frontier().frontier());
468468
// At this point, if we haven't seen any input batches we should establish a frontier anyhow.
469469
if acknowledged1.is_none() {
470470
acknowledged1 = Some(Antichain::from_elem(<G::Timestamp>::minimum()));
471471
}
472472
if let Some(acknowledged1) = &mut acknowledged1 {
473473
trace1.advance_upper(acknowledged1);
474-
trace1.distinguish_since(acknowledged1.borrow());
474+
trace1.set_physical_compaction(acknowledged1.borrow());
475475
}
476476
}
477477
}

0 commit comments

Comments
 (0)