Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 1 addition & 5 deletions timely/src/dataflow/operators/generic/builder_rc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ use crate::dataflow::operators::generic::handles::{InputHandleCore, new_input_ha
use crate::dataflow::operators::generic::operator_info::OperatorInfo;
use crate::dataflow::operators::generic::builder_raw::OperatorShape;
use crate::progress::operate::PortConnectivity;
use crate::logging::TimelyLogger as Logger;

use super::builder_raw::OperatorBuilder as OperatorBuilderRaw;

Expand All @@ -35,22 +34,19 @@ pub struct OperatorBuilder<G: Scope> {
/// For each input, a shared list of summaries to each output.
summaries: Vec<Rc<RefCell<PortConnectivity<<G::Timestamp as Timestamp>::Summary>>>>,
produced: Vec<Rc<RefCell<ChangeBatch<G::Timestamp>>>>,
logging: Option<Logger>,
}

impl<G: Scope> OperatorBuilder<G> {

/// Allocates a new generic operator builder from its containing scope.
pub fn new(name: String, scope: G) -> Self {
let logging = scope.logging();
OperatorBuilder {
builder: OperatorBuilderRaw::new(name, scope),
frontier: Vec::new(),
consumed: Vec::new(),
internal: Rc::new(RefCell::new(Vec::new())),
summaries: Vec::new(),
produced: Vec::new(),
logging,
}
}

Expand Down Expand Up @@ -90,7 +86,7 @@ impl<G: Scope> OperatorBuilder<G> {
let shared_summary = Rc::new(RefCell::new(connection.into_iter().collect()));
self.summaries.push(Rc::clone(&shared_summary));

new_input_handle(input, Rc::clone(&self.internal), shared_summary, self.logging.clone())
new_input_handle(input, Rc::clone(&self.internal), shared_summary)
}

/// Adds a new output to a generic operator builder, returning the `Push` implementor to use.
Expand Down
8 changes: 0 additions & 8 deletions timely/src/dataflow/operators/generic/handles.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ use crate::dataflow::channels::Message;
use crate::communication::{Push, Pull};
use crate::{Container, Data};
use crate::container::{ContainerBuilder, CapacityContainerBuilder};
use crate::logging::TimelyLogger as Logger;

use crate::dataflow::operators::InputCapability;
use crate::dataflow::operators::capability::CapabilityTrait;
Expand All @@ -31,7 +30,6 @@ pub struct InputHandleCore<T: Timestamp, C: Container, P: Pull<Message<T, C>>> {
/// Each timestamp received through this input may only produce output timestamps
/// greater or equal to the input timestamp subjected to at least one of these summaries.
summaries: Rc<RefCell<PortConnectivity<T::Summary>>>,
logging: Option<Logger>,
}

/// Handle to an operator's input stream, specialized to vectors.
Expand Down Expand Up @@ -82,13 +80,9 @@ impl<T: Timestamp, C: Container, P: Pull<Message<T, C>>> InputHandleCore<T, C, P
/// ```
#[inline]
pub fn for_each<F: FnMut(InputCapability<T>, &mut C)>(&mut self, mut logic: F) {
let mut logging = self.logging.take();
while let Some((cap, data)) = self.next() {
logging.as_mut().map(|l| l.log(crate::logging::GuardedMessageEvent { is_start: true }));
logic(cap, data);
logging.as_mut().map(|l| l.log(crate::logging::GuardedMessageEvent { is_start: false }));
}
self.logging = logging;
}

}
Expand Down Expand Up @@ -150,13 +144,11 @@ pub fn new_input_handle<T: Timestamp, C: Container, P: Pull<Message<T, C>>>(
pull_counter: PullCounter<T, C, P>,
internal: Rc<RefCell<Vec<Rc<RefCell<ChangeBatch<T>>>>>>,
summaries: Rc<RefCell<PortConnectivity<T::Summary>>>,
logging: Option<Logger>
) -> InputHandleCore<T, C, P> {
InputHandleCore {
pull_counter,
internal,
summaries,
logging,
}
}

Expand Down
19 changes: 6 additions & 13 deletions timely/src/dataflow/operators/generic/notificator.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
use crate::progress::frontier::{AntichainRef, MutableAntichain};
use crate::progress::Timestamp;
use crate::dataflow::operators::Capability;
use crate::logging::TimelyLogger as Logger;

/// Tracks requests for notification and delivers available notifications.
///
Expand All @@ -18,7 +17,6 @@ use crate::logging::TimelyLogger as Logger;
pub struct Notificator<'a, T: Timestamp> {
frontiers: &'a [&'a MutableAntichain<T>],
inner: &'a mut FrontierNotificator<T>,
logging: &'a Option<Logger>,
}

impl<'a, T: Timestamp> Notificator<'a, T> {
Expand All @@ -28,14 +26,13 @@ impl<'a, T: Timestamp> Notificator<'a, T> {
pub fn new(
frontiers: &'a [&'a MutableAntichain<T>],
inner: &'a mut FrontierNotificator<T>,
logging: &'a Option<Logger>) -> Self {
) -> Self {

inner.make_available(frontiers);

Notificator {
frontiers,
inner,
logging,
}
}

Expand Down Expand Up @@ -82,9 +79,7 @@ impl<'a, T: Timestamp> Notificator<'a, T> {
#[inline]
pub fn for_each<F: FnMut(Capability<T>, u64, &mut Notificator<T>)>(&mut self, mut logic: F) {
while let Some((cap, count)) = self.next() {
self.logging.as_ref().map(|l| l.log(crate::logging::GuardedProgressEvent { is_start: true }));
logic(cap, count, self);
self.logging.as_ref().map(|l| l.log(crate::logging::GuardedProgressEvent { is_start: false }));
}
}
}
Expand Down Expand Up @@ -117,8 +112,6 @@ fn notificator_delivers_notifications_in_topo_order() {

let root_capability = Capability::new(Product::new(0,0), Rc::new(RefCell::new(ChangeBatch::new())));

let logging = None;//::logging::new_inactive_logger();

// notificator.update_frontier_from_cm(&mut vec![ChangeBatch::new_from(ts_from_tuple((0, 0)), 1)]);
let times = [
Product::new(3, 5),
Expand All @@ -136,14 +129,14 @@ fn notificator_delivers_notifications_in_topo_order() {
let mut frontier_notificator = FrontierNotificator::from(times.iter().map(|t| root_capability.delayed(t)));

// the frontier is initially (0,0), and so we should deliver no notifications.
assert!(frontier_notificator.monotonic(&[&frontier], &logging).next().is_none());
assert!(frontier_notificator.monotonic(&[&frontier]).next().is_none());

// advance the frontier to [(5,7), (6,0)], opening up some notifications.
frontier.update_iter(vec![(Product::new(0,0),-1), (Product::new(5,7), 1), (Product::new(6,1), 1)]);

{
let frontiers = [&frontier];
let mut notificator = frontier_notificator.monotonic(&frontiers, &logging);
let mut notificator = frontier_notificator.monotonic(&frontiers);

// we should deliver the following available notifications, in this order.
assert_eq!(notificator.next().unwrap().0.time(), &Product::new(1,1));
Expand All @@ -159,7 +152,7 @@ fn notificator_delivers_notifications_in_topo_order() {

{
let frontiers = [&frontier];
let mut notificator = frontier_notificator.monotonic(&frontiers, &logging);
let mut notificator = frontier_notificator.monotonic(&frontiers);

// the first available notification should be (5,8). Note: before (6,0) in the total order, but not
// in the partial order. We don't make the promise that we respect the total order.
Expand Down Expand Up @@ -380,8 +373,8 @@ impl<T: Timestamp> FrontierNotificator<T> {
/// This implementation can be emulated with judicious use of `make_available` and `notify_at_frontiered`,
/// in the event that `Notificator` provides too restrictive an interface.
#[inline]
pub fn monotonic<'a>(&'a mut self, frontiers: &'a [&'a MutableAntichain<T>], logging: &'a Option<Logger>) -> Notificator<'a, T> {
Notificator::new(frontiers, self, logging)
pub fn monotonic<'a>(&'a mut self, frontiers: &'a [&'a MutableAntichain<T>]) -> Notificator<'a, T> {
Notificator::new(frontiers, self)
}

/// Iterates over pending capabilities and their count. The count represents how often a
Expand Down
6 changes: 2 additions & 4 deletions timely/src/dataflow/operators/generic/operator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -350,10 +350,9 @@ impl<G: Scope, C1: Container + Data> Operator<G, C1> for StreamCore<G, C1> {
notificator.notify_at(capability.delayed(&time));
}

let logging = self.scope().logging();
move |input, output| {
let frontier = &[input.frontier()];
let notificator = &mut Notificator::new(frontier, &mut notificator, &logging);
let notificator = &mut Notificator::new(frontier, &mut notificator);
logic(input.handle, output, notificator);
}
})
Expand Down Expand Up @@ -436,10 +435,9 @@ impl<G: Scope, C1: Container + Data> Operator<G, C1> for StreamCore<G, C1> {
notificator.notify_at(capability.delayed(&time));
}

let logging = self.scope().logging();
move |input1, input2, output| {
let frontiers = &[input1.frontier(), input2.frontier()];
let notificator = &mut Notificator::new(frontiers, &mut notificator, &logging);
let notificator = &mut Notificator::new(frontiers, &mut notificator);
logic(input1.handle, input2.handle, output, notificator);
}
})
Expand Down
26 changes: 0 additions & 26 deletions timely/src/logging.rs
Original file line number Diff line number Diff line change
Expand Up @@ -181,20 +181,6 @@ pub struct ApplicationEvent {
pub is_start: bool,
}

#[derive(Serialize, Deserialize, Columnar, Debug, Clone, Hash, Eq, PartialEq, Ord, PartialOrd)]
/// Application-defined code start or stop
pub struct GuardedMessageEvent {
/// `true` when activity begins, `false` when it stops
pub is_start: bool,
}

#[derive(Serialize, Deserialize, Columnar, Debug, Clone, Hash, Eq, PartialEq, Ord, PartialOrd)]
/// Application-defined code start or stop
pub struct GuardedProgressEvent {
/// `true` when activity begins, `false` when it stops
pub is_start: bool,
}

#[derive(Serialize, Deserialize, Columnar, Debug, PartialEq, Eq, Hash, Clone, Copy)]
/// Identifier of the worker that generated a log line
pub struct TimelySetup {
Expand Down Expand Up @@ -260,10 +246,6 @@ pub enum TimelyEvent {
Shutdown(ShutdownEvent),
/// No clue.
Application(ApplicationEvent),
/// Per-message computation.
GuardedMessage(GuardedMessageEvent),
/// Per-notification computation.
GuardedProgress(GuardedProgressEvent),
/// Communication channel event.
CommChannels(CommChannelsEvent),
/// Input event.
Expand Down Expand Up @@ -302,14 +284,6 @@ impl From<ApplicationEvent> for TimelyEvent {
fn from(v: ApplicationEvent) -> TimelyEvent { TimelyEvent::Application(v) }
}

impl From<GuardedMessageEvent> for TimelyEvent {
fn from(v: GuardedMessageEvent) -> TimelyEvent { TimelyEvent::GuardedMessage(v) }
}

impl From<GuardedProgressEvent> for TimelyEvent {
fn from(v: GuardedProgressEvent) -> TimelyEvent { TimelyEvent::GuardedProgress(v) }
}

impl From<CommChannelsEvent> for TimelyEvent {
fn from(v: CommChannelsEvent) -> TimelyEvent { TimelyEvent::CommChannels(v) }
}
Expand Down
Loading