Skip to content

Commit 392a476

Browse files
frankmcsherryutaal
andauthored
Actually log progress updates (#352)
* Actually log progress updates * remove commented code, improve comments * Separate the progress logging stream, use dyn trait instead of String for timestamps (#353) * Separate the progress logging stream, use dyn trait instead of String for timestamps * Remove the serialization machinery from progress logging, provide dynamic type information instead * Add example for progress logging * Always box logging progress vectors on construction * Explain why we need the `ProgressEventTimestampVec` trait Co-authored-by: Andrea Lattuada <[email protected]>
1 parent de69cfc commit 392a476

File tree

7 files changed

+183
-39
lines changed

7 files changed

+183
-39
lines changed

timely/examples/logging-send.rs

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ use timely::dataflow::operators::{Input, Exchange, Probe};
55

66
// use timely::dataflow::operators::capture::EventWriter;
77
// use timely::dataflow::ScopeParent;
8-
use timely::logging::TimelyEvent;
8+
use timely::logging::{TimelyEvent, TimelyProgressEvent};
99

1010
fn main() {
1111
// initializes and runs a timely dataflow.
@@ -21,6 +21,26 @@ fn main() {
2121
data.iter().for_each(|x| println!("LOG1: {:?}", x))
2222
);
2323

24+
// Register timely progress logging.
25+
// Less generally useful: intended for debugging advanced custom operators or timely
26+
// internals.
27+
worker.log_register().insert::<TimelyProgressEvent,_>("timely/progress", |_time, data|
28+
data.iter().for_each(|x| {
29+
println!("PROGRESS: {:?}", x);
30+
let (_, _, ev) = x;
31+
print!("PROGRESS: TYPED MESSAGES: ");
32+
for (n, p, t, d) in ev.messages.iter() {
33+
print!("{:?}, ", (n, p, t.as_any().downcast_ref::<usize>(), d));
34+
}
35+
println!();
36+
print!("PROGRESS: TYPED INTERNAL: ");
37+
for (n, p, t, d) in ev.internal.iter() {
38+
print!("{:?}, ", (n, p, t.as_any().downcast_ref::<usize>(), d));
39+
}
40+
println!();
41+
})
42+
);
43+
2444
// create a new input, exchange data, and inspect its output
2545
worker.dataflow(|scope| {
2646
scope

timely/src/dataflow/scopes/child.rs

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ use crate::progress::{Source, Target};
1212
use crate::progress::timestamp::Refines;
1313
use crate::order::Product;
1414
use crate::logging::TimelyLogger as Logger;
15+
use crate::logging::TimelyProgressLogger as ProgressLogger;
1516
use crate::worker::{AsWorker, Config};
1617

1718
use super::{ScopeParent, Scope};
@@ -32,6 +33,8 @@ where
3233
pub parent: G,
3334
/// The log writer for this scope.
3435
pub logging: Option<Logger>,
36+
/// The progress log writer for this scope.
37+
pub progress_logging: Option<ProgressLogger>,
3538
}
3639

3740
impl<'a, G, T> Child<'a, G, T>
@@ -115,12 +118,13 @@ where
115118
let index = self.subgraph.borrow_mut().allocate_child_id();
116119
let path = self.subgraph.borrow().path.clone();
117120

118-
let subscope = RefCell::new(SubgraphBuilder::new_from(index, path, self.logging().clone(), name));
121+
let subscope = RefCell::new(SubgraphBuilder::new_from(index, path, self.logging().clone(), self.progress_logging.clone(), name));
119122
let result = {
120123
let mut builder = Child {
121124
subgraph: &subscope,
122125
parent: self.clone(),
123126
logging: self.logging.clone(),
127+
progress_logging: self.progress_logging.clone(),
124128
};
125129
func(&mut builder)
126130
};
@@ -143,7 +147,8 @@ where
143147
Child {
144148
subgraph: self.subgraph,
145149
parent: self.parent.clone(),
146-
logging: self.logging.clone()
150+
logging: self.logging.clone(),
151+
progress_logging: self.progress_logging.clone(),
147152
}
148153
}
149154
}

timely/src/logging.rs

Lines changed: 60 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,8 @@ pub type WorkerIdentifier = usize;
66
pub type Logger<Event> = crate::logging_core::Logger<Event, WorkerIdentifier>;
77
/// Logger for timely dataflow system events.
88
pub type TimelyLogger = Logger<TimelyEvent>;
9+
/// Logger for timely dataflow progress events (the "timely/progress" log stream).
10+
pub type TimelyProgressLogger = Logger<TimelyProgressEvent>;
911

1012
use std::time::Duration;
1113
use crate::dataflow::operators::capture::{Event, EventPusher};
@@ -70,9 +72,63 @@ pub struct ChannelsEvent {
7072
pub target: (usize, usize),
7173
}
7274

73-
#[derive(Serialize, Deserialize, Abomonation, Debug, Clone, Hash, Eq, PartialEq, Ord, PartialOrd)]
75+
/// Encapsulates Any and Debug for dynamically typed timestamps in logs
76+
pub trait ProgressEventTimestamp: std::fmt::Debug + std::any::Any {
77+
/// Upcasts this `ProgressEventTimestamp` to `Any`.
78+
///
79+
/// NOTE: This is required until https://github.com/rust-lang/rfcs/issues/2765 is fixed
80+
///
81+
/// # Example
82+
/// ```rust
83+
/// let ts = vec![(0usize, 0usize, (23u64, 10u64), -4i64), (0usize, 0usize, (23u64, 11u64), 1i64)];
84+
/// let ts: &timely::logging::ProgressEventTimestampVec = &ts;
85+
/// for (n, p, t, d) in ts.iter() {
86+
/// print!("{:?}, ", (n, p, t.as_any().downcast_ref::<(u64, u64)>(), d));
87+
/// }
88+
/// println!();
89+
/// ```
90+
fn as_any(&self) -> &dyn std::any::Any;
91+
92+
/// Returns the name of the concrete type of this object.
93+
///
94+
/// # Note
95+
///
96+
/// This is intended for diagnostic use. The exact contents and format of the
97+
/// string returned are not specified, other than being a best-effort
98+
/// description of the type. For example, amongst the strings
99+
/// that `type_name::<Option<String>>()` might return are `"Option<String>"` and
100+
/// `"std::option::Option<std::string::String>"`.
101+
fn type_name(&self) -> &'static str;
102+
}
103+
impl<T: crate::Data + std::fmt::Debug + std::any::Any> ProgressEventTimestamp for T {
104+
fn as_any(&self) -> &dyn std::any::Any { self }
105+
106+
fn type_name(&self) -> &'static str { std::any::type_name::<T>() }
107+
}
108+
109+
/// A vector of progress updates in logs
110+
///
111+
/// This exists to support upcasting of the concrecte progress update vectors to
112+
/// `dyn ProgressEventTimestamp`. Doing so at the vector granularity allows us to
113+
/// use a single allocation for the entire vector (as opposed to a `Box` allocation
114+
/// for each dynamically typed element).
115+
pub trait ProgressEventTimestampVec: std::fmt::Debug + std::any::Any {
116+
/// Iterate over the contents of the vector
117+
fn iter<'a>(&'a self) -> Box<dyn Iterator<Item=(&'a usize, &'a usize, &'a dyn ProgressEventTimestamp, &'a i64)>+'a>;
118+
}
119+
120+
impl<T: ProgressEventTimestamp> ProgressEventTimestampVec for Vec<(usize, usize, T, i64)> {
121+
fn iter<'a>(&'a self) -> Box<dyn Iterator<Item=(&'a usize, &'a usize, &'a dyn ProgressEventTimestamp, &'a i64)>+'a> {
122+
Box::new(<[(usize, usize, T, i64)]>::iter(&self[..]).map(|(n, p, t, d)| {
123+
let t: &dyn ProgressEventTimestamp = t;
124+
(n, p, t, d)
125+
}))
126+
}
127+
}
128+
129+
#[derive(Debug)]
74130
/// Send or receive of progress information.
75-
pub struct ProgressEvent {
131+
pub struct TimelyProgressEvent {
76132
/// `true` if the event is a send, and `false` if it is a receive.
77133
pub is_send: bool,
78134
/// Source worker index.
@@ -84,9 +140,9 @@ pub struct ProgressEvent {
84140
/// Sequence of nested scope identifiers indicating the path from the root to this instance.
85141
pub addr: Vec<usize>,
86142
/// List of message updates, containing Target descriptor, timestamp as string, and delta.
87-
pub messages: Vec<(usize, usize, String, i64)>,
143+
pub messages: Box<dyn ProgressEventTimestampVec>,
88144
/// List of capability updates, containing Source descriptor, timestamp as string, and delta.
89-
pub internal: Vec<(usize, usize, String, i64)>,
145+
pub internal: Box<dyn ProgressEventTimestampVec>,
90146
}
91147

92148
#[derive(Serialize, Deserialize, Abomonation, Debug, Clone, Hash, Eq, PartialEq, Ord, PartialOrd)]
@@ -225,8 +281,6 @@ pub enum TimelyEvent {
225281
Operates(OperatesEvent),
226282
/// Channel creation.
227283
Channels(ChannelsEvent),
228-
/// Progress message send or receive.
229-
Progress(ProgressEvent),
230284
/// Progress propagation (reasoning).
231285
PushProgress(PushProgressEvent),
232286
/// Message send or receive.
@@ -259,10 +313,6 @@ impl From<ChannelsEvent> for TimelyEvent {
259313
fn from(v: ChannelsEvent) -> TimelyEvent { TimelyEvent::Channels(v) }
260314
}
261315

262-
impl From<ProgressEvent> for TimelyEvent {
263-
fn from(v: ProgressEvent) -> TimelyEvent { TimelyEvent::Progress(v) }
264-
}
265-
266316
impl From<PushProgressEvent> for TimelyEvent {
267317
fn from(v: PushProgressEvent) -> TimelyEvent { TimelyEvent::PushProgress(v) }
268318
}

timely/src/progress/broadcast.rs

Lines changed: 64 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,10 @@
11
//! Broadcasts progress information among workers.
22
33
use crate::progress::{ChangeBatch, Timestamp};
4-
use crate::progress::Location;
4+
use crate::progress::{Location, Port};
55
use crate::communication::{Message, Push, Pull};
66
use crate::logging::TimelyLogger as Logger;
7+
use crate::logging::TimelyProgressLogger as ProgressLogger;
78

89
/// A list of progress updates corresponding to `((child_scope, [in/out]_port, timestamp), delta)`
910
pub type ProgressVec<T> = Vec<((Location, T), i64)>;
@@ -25,12 +26,12 @@ pub struct Progcaster<T:Timestamp> {
2526
/// Communication channel identifier
2627
channel_identifier: usize,
2728

28-
logging: Option<Logger>,
29+
progress_logging: Option<ProgressLogger>,
2930
}
3031

3132
impl<T:Timestamp+Send> Progcaster<T> {
3233
/// Creates a new `Progcaster` using a channel from the supplied worker.
33-
pub fn new<A: crate::worker::AsWorker>(worker: &mut A, path: &Vec<usize>, mut logging: Option<Logger>) -> Progcaster<T> {
34+
pub fn new<A: crate::worker::AsWorker>(worker: &mut A, path: &Vec<usize>, mut logging: Option<Logger>, progress_logging: Option<ProgressLogger>) -> Progcaster<T> {
3435

3536
let channel_identifier = worker.new_identifier();
3637
let (pushers, puller) = worker.allocate(channel_identifier, &path[..]);
@@ -48,7 +49,7 @@ impl<T:Timestamp+Send> Progcaster<T> {
4849
counter: 0,
4950
addr,
5051
channel_identifier,
51-
logging,
52+
progress_logging,
5253
}
5354
}
5455

@@ -58,16 +59,35 @@ impl<T:Timestamp+Send> Progcaster<T> {
5859
changes.compact();
5960
if !changes.is_empty() {
6061

61-
self.logging.as_ref().map(|l| l.log(crate::logging::ProgressEvent {
62-
is_send: true,
63-
source: self.source,
64-
channel: self.channel_identifier,
65-
seq_no: self.counter,
66-
addr: self.addr.clone(),
67-
// TODO: fill with additional data
68-
messages: Vec::new(),
69-
internal: Vec::new(),
70-
}));
62+
self.progress_logging.as_ref().map(|l| {
63+
64+
// Pre-allocate enough space; we transfer ownership, so there is not
65+
// an apportunity to re-use allocations (w/o changing the logging
66+
// interface to accept references).
67+
let mut messages = Box::new(Vec::with_capacity(changes.len()));
68+
let mut internal = Box::new(Vec::with_capacity(changes.len()));
69+
70+
for ((location, time), diff) in changes.iter() {
71+
match location.port {
72+
Port::Target(port) => {
73+
messages.push((location.node, port, time.clone(), *diff))
74+
},
75+
Port::Source(port) => {
76+
internal.push((location.node, port, time.clone(), *diff))
77+
}
78+
}
79+
}
80+
81+
l.log(crate::logging::TimelyProgressEvent {
82+
is_send: true,
83+
source: self.source,
84+
channel: self.channel_identifier,
85+
seq_no: self.counter,
86+
addr: self.addr.clone(),
87+
messages,
88+
internal,
89+
});
90+
});
7191

7292
for pusher in self.pushers.iter_mut() {
7393

@@ -108,16 +128,36 @@ impl<T:Timestamp+Send> Progcaster<T> {
108128

109129
let addr = &mut self.addr;
110130
let channel = self.channel_identifier;
111-
self.logging.as_ref().map(|l| l.log(crate::logging::ProgressEvent {
112-
is_send: false,
113-
source: source,
114-
seq_no: counter,
115-
channel,
116-
addr: addr.clone(),
117-
// TODO: fill with additional data
118-
messages: Vec::new(),
119-
internal: Vec::new(),
120-
}));
131+
132+
// See comments above about the relatively high cost of this logging, and our
133+
// options for improving it if performance limits users who want other logging.
134+
self.progress_logging.as_ref().map(|l| {
135+
136+
let mut messages = Box::new(Vec::with_capacity(changes.len()));
137+
let mut internal = Box::new(Vec::with_capacity(changes.len()));
138+
139+
for ((location, time), diff) in recv_changes.iter() {
140+
141+
match location.port {
142+
Port::Target(port) => {
143+
messages.push((location.node, port, time.clone(), *diff))
144+
},
145+
Port::Source(port) => {
146+
internal.push((location.node, port, time.clone(), *diff))
147+
}
148+
}
149+
}
150+
151+
l.log(crate::logging::TimelyProgressEvent {
152+
is_send: false,
153+
source: source,
154+
seq_no: counter,
155+
channel,
156+
addr: addr.clone(),
157+
messages: messages,
158+
internal: internal,
159+
});
160+
});
121161

122162
// We clone rather than drain to avoid deserialization.
123163
for &(ref update, delta) in recv_changes.iter() {

timely/src/progress/change_batch.rs

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -201,6 +201,27 @@ impl<T:Ord> ChangeBatch<T> {
201201
}
202202
}
203203

204+
/// Number of compacted updates.
205+
///
206+
/// This method requires mutable access to `self` because it may need to compact the
207+
/// representation to determine the number of actual updates.
208+
///
209+
/// # Examples
210+
///
211+
///```
212+
/// use timely::progress::ChangeBatch;
213+
///
214+
/// let mut batch = ChangeBatch::<usize>::new_from(17, 1);
215+
/// batch.update(17, -1);
216+
/// batch.update(14, -1);
217+
/// assert_eq!(batch.len(), 1);
218+
///```
219+
#[inline]
220+
pub fn len(&mut self) -> usize {
221+
self.compact();
222+
self.updates.len()
223+
}
224+
204225
/// Drains `self` into `other`.
205226
///
206227
/// This method has similar a effect to calling `other.extend(self.drain())`, but has the

timely/src/progress/subgraph.rs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ use std::collections::BinaryHeap;
1111
use std::cmp::Reverse;
1212

1313
use crate::logging::TimelyLogger as Logger;
14+
use crate::logging::TimelyProgressLogger as ProgressLogger;
1415

1516
use crate::scheduling::Schedule;
1617
use crate::scheduling::activate::Activations;
@@ -63,6 +64,9 @@ where
6364

6465
/// Logging handle
6566
logging: Option<Logger>,
67+
68+
/// Progress logging handle
69+
progress_logging: Option<ProgressLogger>,
6670
}
6771

6872
impl<TOuter, TInner> SubgraphBuilder<TOuter, TInner>
@@ -95,6 +99,7 @@ where
9599
index: usize,
96100
mut path: Vec<usize>,
97101
logging: Option<Logger>,
102+
progress_logging: Option<ProgressLogger>,
98103
name: &str,
99104
)
100105
-> SubgraphBuilder<TOuter, TInner>
@@ -114,6 +119,7 @@ where
114119
input_messages: Vec::new(),
115120
output_capabilities: Vec::new(),
116121
logging,
122+
progress_logging,
117123
}
118124
}
119125

@@ -169,7 +175,7 @@ where
169175

170176
let (tracker, scope_summary) = builder.build();
171177

172-
let progcaster = Progcaster::new(worker, &self.path, self.logging.clone());
178+
let progcaster = Progcaster::new(worker, &self.path, self.logging.clone(), self.progress_logging.clone());
173179

174180
let mut incomplete = vec![true; self.children.len()];
175181
incomplete[0] = false;

0 commit comments

Comments
 (0)