diff --git a/timely/src/dataflow/operators/generic/builder_rc.rs b/timely/src/dataflow/operators/generic/builder_rc.rs index c3d02a780..fee122014 100644 --- a/timely/src/dataflow/operators/generic/builder_rc.rs +++ b/timely/src/dataflow/operators/generic/builder_rc.rs @@ -21,7 +21,6 @@ use crate::dataflow::operators::generic::handles::{InputHandleCore, new_input_ha use crate::dataflow::operators::generic::operator_info::OperatorInfo; use crate::dataflow::operators::generic::builder_raw::OperatorShape; use crate::progress::operate::PortConnectivity; -use crate::logging::TimelyLogger as Logger; use super::builder_raw::OperatorBuilder as OperatorBuilderRaw; @@ -35,14 +34,12 @@ pub struct OperatorBuilder { /// For each input, a shared list of summaries to each output. summaries: Vec::Summary>>>>, produced: Vec>>>, - logging: Option, } impl OperatorBuilder { /// Allocates a new generic operator builder from its containing scope. pub fn new(name: String, scope: G) -> Self { - let logging = scope.logging(); OperatorBuilder { builder: OperatorBuilderRaw::new(name, scope), frontier: Vec::new(), @@ -50,7 +47,6 @@ impl OperatorBuilder { internal: Rc::new(RefCell::new(Vec::new())), summaries: Vec::new(), produced: Vec::new(), - logging, } } @@ -90,7 +86,7 @@ impl OperatorBuilder { let shared_summary = Rc::new(RefCell::new(connection.into_iter().collect())); self.summaries.push(Rc::clone(&shared_summary)); - new_input_handle(input, Rc::clone(&self.internal), shared_summary, self.logging.clone()) + new_input_handle(input, Rc::clone(&self.internal), shared_summary) } /// Adds a new output to a generic operator builder, returning the `Push` implementor to use. diff --git a/timely/src/dataflow/operators/generic/handles.rs b/timely/src/dataflow/operators/generic/handles.rs index 16d8a149c..dc9db01ce 100644 --- a/timely/src/dataflow/operators/generic/handles.rs +++ b/timely/src/dataflow/operators/generic/handles.rs @@ -17,7 +17,6 @@ use crate::dataflow::channels::Message; use crate::communication::{Push, Pull}; use crate::{Container, Data}; use crate::container::{ContainerBuilder, CapacityContainerBuilder}; -use crate::logging::TimelyLogger as Logger; use crate::dataflow::operators::InputCapability; use crate::dataflow::operators::capability::CapabilityTrait; @@ -31,7 +30,6 @@ pub struct InputHandleCore>> { /// Each timestamp received through this input may only produce output timestamps /// greater or equal to the input timestamp subjected to at least one of these summaries. summaries: Rc>>, - logging: Option, } /// Handle to an operator's input stream, specialized to vectors. @@ -82,13 +80,9 @@ impl>> InputHandleCore, &mut C)>(&mut self, mut logic: F) { - let mut logging = self.logging.take(); while let Some((cap, data)) = self.next() { - logging.as_mut().map(|l| l.log(crate::logging::GuardedMessageEvent { is_start: true })); logic(cap, data); - logging.as_mut().map(|l| l.log(crate::logging::GuardedMessageEvent { is_start: false })); } - self.logging = logging; } } @@ -150,13 +144,11 @@ pub fn new_input_handle>>( pull_counter: PullCounter, internal: Rc>>>>>, summaries: Rc>>, - logging: Option ) -> InputHandleCore { InputHandleCore { pull_counter, internal, summaries, - logging, } } diff --git a/timely/src/dataflow/operators/generic/notificator.rs b/timely/src/dataflow/operators/generic/notificator.rs index c1d886d31..5eb6bdfe4 100644 --- a/timely/src/dataflow/operators/generic/notificator.rs +++ b/timely/src/dataflow/operators/generic/notificator.rs @@ -1,7 +1,6 @@ use crate::progress::frontier::{AntichainRef, MutableAntichain}; use crate::progress::Timestamp; use crate::dataflow::operators::Capability; -use crate::logging::TimelyLogger as Logger; /// Tracks requests for notification and delivers available notifications. /// @@ -18,7 +17,6 @@ use crate::logging::TimelyLogger as Logger; pub struct Notificator<'a, T: Timestamp> { frontiers: &'a [&'a MutableAntichain], inner: &'a mut FrontierNotificator, - logging: &'a Option, } impl<'a, T: Timestamp> Notificator<'a, T> { @@ -28,14 +26,13 @@ impl<'a, T: Timestamp> Notificator<'a, T> { pub fn new( frontiers: &'a [&'a MutableAntichain], inner: &'a mut FrontierNotificator, - logging: &'a Option) -> Self { + ) -> Self { inner.make_available(frontiers); Notificator { frontiers, inner, - logging, } } @@ -82,9 +79,7 @@ impl<'a, T: Timestamp> Notificator<'a, T> { #[inline] pub fn for_each, u64, &mut Notificator)>(&mut self, mut logic: F) { while let Some((cap, count)) = self.next() { - self.logging.as_ref().map(|l| l.log(crate::logging::GuardedProgressEvent { is_start: true })); logic(cap, count, self); - self.logging.as_ref().map(|l| l.log(crate::logging::GuardedProgressEvent { is_start: false })); } } } @@ -117,8 +112,6 @@ fn notificator_delivers_notifications_in_topo_order() { let root_capability = Capability::new(Product::new(0,0), Rc::new(RefCell::new(ChangeBatch::new()))); - let logging = None;//::logging::new_inactive_logger(); - // notificator.update_frontier_from_cm(&mut vec![ChangeBatch::new_from(ts_from_tuple((0, 0)), 1)]); let times = [ Product::new(3, 5), @@ -136,14 +129,14 @@ fn notificator_delivers_notifications_in_topo_order() { let mut frontier_notificator = FrontierNotificator::from(times.iter().map(|t| root_capability.delayed(t))); // the frontier is initially (0,0), and so we should deliver no notifications. - assert!(frontier_notificator.monotonic(&[&frontier], &logging).next().is_none()); + assert!(frontier_notificator.monotonic(&[&frontier]).next().is_none()); // advance the frontier to [(5,7), (6,0)], opening up some notifications. frontier.update_iter(vec![(Product::new(0,0),-1), (Product::new(5,7), 1), (Product::new(6,1), 1)]); { let frontiers = [&frontier]; - let mut notificator = frontier_notificator.monotonic(&frontiers, &logging); + let mut notificator = frontier_notificator.monotonic(&frontiers); // we should deliver the following available notifications, in this order. assert_eq!(notificator.next().unwrap().0.time(), &Product::new(1,1)); @@ -159,7 +152,7 @@ fn notificator_delivers_notifications_in_topo_order() { { let frontiers = [&frontier]; - let mut notificator = frontier_notificator.monotonic(&frontiers, &logging); + let mut notificator = frontier_notificator.monotonic(&frontiers); // the first available notification should be (5,8). Note: before (6,0) in the total order, but not // in the partial order. We don't make the promise that we respect the total order. @@ -380,8 +373,8 @@ impl FrontierNotificator { /// This implementation can be emulated with judicious use of `make_available` and `notify_at_frontiered`, /// in the event that `Notificator` provides too restrictive an interface. #[inline] - pub fn monotonic<'a>(&'a mut self, frontiers: &'a [&'a MutableAntichain], logging: &'a Option) -> Notificator<'a, T> { - Notificator::new(frontiers, self, logging) + pub fn monotonic<'a>(&'a mut self, frontiers: &'a [&'a MutableAntichain]) -> Notificator<'a, T> { + Notificator::new(frontiers, self) } /// Iterates over pending capabilities and their count. The count represents how often a diff --git a/timely/src/dataflow/operators/generic/operator.rs b/timely/src/dataflow/operators/generic/operator.rs index 1c51f7ecd..5f76bc59c 100644 --- a/timely/src/dataflow/operators/generic/operator.rs +++ b/timely/src/dataflow/operators/generic/operator.rs @@ -350,10 +350,9 @@ impl Operator for StreamCore { notificator.notify_at(capability.delayed(&time)); } - let logging = self.scope().logging(); move |input, output| { let frontier = &[input.frontier()]; - let notificator = &mut Notificator::new(frontier, &mut notificator, &logging); + let notificator = &mut Notificator::new(frontier, &mut notificator); logic(input.handle, output, notificator); } }) @@ -436,10 +435,9 @@ impl Operator for StreamCore { notificator.notify_at(capability.delayed(&time)); } - let logging = self.scope().logging(); move |input1, input2, output| { let frontiers = &[input1.frontier(), input2.frontier()]; - let notificator = &mut Notificator::new(frontiers, &mut notificator, &logging); + let notificator = &mut Notificator::new(frontiers, &mut notificator); logic(input1.handle, input2.handle, output, notificator); } }) diff --git a/timely/src/logging.rs b/timely/src/logging.rs index 80777dc98..2c2439319 100644 --- a/timely/src/logging.rs +++ b/timely/src/logging.rs @@ -181,20 +181,6 @@ pub struct ApplicationEvent { pub is_start: bool, } -#[derive(Serialize, Deserialize, Columnar, Debug, Clone, Hash, Eq, PartialEq, Ord, PartialOrd)] -/// Application-defined code start or stop -pub struct GuardedMessageEvent { - /// `true` when activity begins, `false` when it stops - pub is_start: bool, -} - -#[derive(Serialize, Deserialize, Columnar, Debug, Clone, Hash, Eq, PartialEq, Ord, PartialOrd)] -/// Application-defined code start or stop -pub struct GuardedProgressEvent { - /// `true` when activity begins, `false` when it stops - pub is_start: bool, -} - #[derive(Serialize, Deserialize, Columnar, Debug, PartialEq, Eq, Hash, Clone, Copy)] /// Identifier of the worker that generated a log line pub struct TimelySetup { @@ -260,10 +246,6 @@ pub enum TimelyEvent { Shutdown(ShutdownEvent), /// No clue. Application(ApplicationEvent), - /// Per-message computation. - GuardedMessage(GuardedMessageEvent), - /// Per-notification computation. - GuardedProgress(GuardedProgressEvent), /// Communication channel event. CommChannels(CommChannelsEvent), /// Input event. @@ -302,14 +284,6 @@ impl From for TimelyEvent { fn from(v: ApplicationEvent) -> TimelyEvent { TimelyEvent::Application(v) } } -impl From for TimelyEvent { - fn from(v: GuardedMessageEvent) -> TimelyEvent { TimelyEvent::GuardedMessage(v) } -} - -impl From for TimelyEvent { - fn from(v: GuardedProgressEvent) -> TimelyEvent { TimelyEvent::GuardedProgress(v) } -} - impl From for TimelyEvent { fn from(v: CommChannelsEvent) -> TimelyEvent { TimelyEvent::CommChannels(v) } }