Skip to content

Address rename errors #310

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Feb 26, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 19 additions & 19 deletions src/operators/arrange/agent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@ where
{
trace: Rc<RefCell<TraceBox<Tr>>>,
queues: Weak<RefCell<Vec<TraceAgentQueueWriter<Tr>>>>,
advance: Antichain<Tr::Time>,
through: Antichain<Tr::Time>,
logical_compaction: Antichain<Tr::Time>,
physical_compaction: Antichain<Tr::Time>,

operator: ::timely::dataflow::operators::generic::OperatorInfo,
logging: Option<::logging::Logger>,
Expand All @@ -55,21 +55,21 @@ where
type Cursor = Tr::Cursor;

fn set_logical_compaction(&mut self, frontier: AntichainRef<Tr::Time>) {
self.trace.borrow_mut().adjust_get_logical_compaction(self.advance.borrow(), frontier);
self.advance.clear();
self.advance.extend(frontier.iter().cloned());
self.trace.borrow_mut().adjust_logical_compaction(self.logical_compaction.borrow(), frontier);
self.logical_compaction.clear();
self.logical_compaction.extend(frontier.iter().cloned());
}
fn get_logical_compaction(&mut self) -> AntichainRef<Tr::Time> {
self.advance.borrow()
self.logical_compaction.borrow()
}
fn set_physical_compaction(&mut self, frontier: AntichainRef<Tr::Time>) {
debug_assert!(timely::PartialOrder::less_equal(&self.through.borrow(), &frontier));
self.trace.borrow_mut().adjust_through_frontier(self.through.borrow(), frontier);
self.through.clear();
self.through.extend(frontier.iter().cloned());
debug_assert!(timely::PartialOrder::less_equal(&self.physical_compaction.borrow(), &frontier));
self.trace.borrow_mut().adjust_physical_compaction(self.physical_compaction.borrow(), frontier);
self.physical_compaction.clear();
self.physical_compaction.extend(frontier.iter().cloned());
}
fn get_physical_compaction(&mut self) -> AntichainRef<Tr::Time> {
self.through.borrow()
self.physical_compaction.borrow()
}
fn cursor_through(&mut self, frontier: AntichainRef<Tr::Time>) -> Option<(Tr::Cursor, <Tr::Cursor as Cursor<Tr::Key, Tr::Val, Tr::Time, Tr::R>>::Storage)> {
self.trace.borrow_mut().trace.cursor_through(frontier)
Expand Down Expand Up @@ -100,8 +100,8 @@ where
let reader = TraceAgent {
trace: trace.clone(),
queues: Rc::downgrade(&queues),
advance: trace.borrow().get_logical_compactions.frontier().to_owned(),
through: trace.borrow().through_frontiers.frontier().to_owned(),
logical_compaction: trace.borrow().logical_compaction.frontier().to_owned(),
physical_compaction: trace.borrow().physical_compaction.frontier().to_owned(),
operator,
logging,
};
Expand Down Expand Up @@ -534,14 +534,14 @@ where

// increase counts for wrapped `TraceBox`.
let empty_frontier = Antichain::new();
self.trace.borrow_mut().adjust_get_logical_compaction(empty_frontier.borrow(), self.advance.borrow());
self.trace.borrow_mut().adjust_through_frontier(empty_frontier.borrow(), self.through.borrow());
self.trace.borrow_mut().adjust_logical_compaction(empty_frontier.borrow(), self.logical_compaction.borrow());
self.trace.borrow_mut().adjust_physical_compaction(empty_frontier.borrow(), self.physical_compaction.borrow());

TraceAgent {
trace: self.trace.clone(),
queues: self.queues.clone(),
advance: self.advance.clone(),
through: self.through.clone(),
logical_compaction: self.logical_compaction.clone(),
physical_compaction: self.physical_compaction.clone(),
operator: self.operator.clone(),
logging: self.logging.clone(),
}
Expand All @@ -563,7 +563,7 @@ where

// decrement borrow counts to remove all holds
let empty_frontier = Antichain::new();
self.trace.borrow_mut().adjust_get_logical_compaction(self.advance.borrow(), empty_frontier.borrow());
self.trace.borrow_mut().adjust_through_frontier(self.through.borrow(), empty_frontier.borrow());
self.trace.borrow_mut().adjust_logical_compaction(self.logical_compaction.borrow(), empty_frontier.borrow());
self.trace.borrow_mut().adjust_physical_compaction(self.physical_compaction.borrow(), empty_frontier.borrow());
}
}
75 changes: 34 additions & 41 deletions src/trace/wrappers/rc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,9 @@ where
Tr: TraceReader
{
/// accumulated holds on times for advancement.
pub get_logical_compactions: MutableAntichain<Tr::Time>,
pub logical_compaction: MutableAntichain<Tr::Time>,
/// accumulated holds on times for distinction.
pub through_frontiers: MutableAntichain<Tr::Time>,
pub physical_compaction: MutableAntichain<Tr::Time>,
/// The wrapped trace.
pub trace: Tr,
}
Expand All @@ -49,35 +49,28 @@ where
/// process will fish these out and make sure that they are used for the initial read capabilities.
pub fn new(mut trace: Tr) -> Self {

let mut advance = MutableAntichain::new();
advance.update_iter(trace.get_logical_compaction().iter().cloned().map(|t| (t,1)));
// for time in trace.get_logical_compaction() {
// advance.update(time, 1);
// }

let mut through = MutableAntichain::new();
through.update_iter(trace.get_physical_compaction().iter().cloned().map(|t| (t,1)));
// for time in trace.get_physical_compaction() {
// through.update(time, 1);
// }
let mut logical_compaction = MutableAntichain::new();
logical_compaction.update_iter(trace.get_logical_compaction().iter().cloned().map(|t| (t,1)));
let mut physical_compaction = MutableAntichain::new();
physical_compaction.update_iter(trace.get_physical_compaction().iter().cloned().map(|t| (t,1)));

TraceBox {
get_logical_compactions: advance,
through_frontiers: through,
logical_compaction,
physical_compaction,
trace: trace,
}
}
/// Replaces elements of `lower` with those of `upper`.
pub fn adjust_get_logical_compaction(&mut self, lower: AntichainRef<Tr::Time>, upper: AntichainRef<Tr::Time>) {
self.get_logical_compactions.update_iter(upper.iter().cloned().map(|t| (t,1)));
self.get_logical_compactions.update_iter(lower.iter().cloned().map(|t| (t,-1)));
self.trace.set_logical_compaction(self.get_logical_compactions.frontier());
pub fn adjust_logical_compaction(&mut self, lower: AntichainRef<Tr::Time>, upper: AntichainRef<Tr::Time>) {
self.logical_compaction.update_iter(upper.iter().cloned().map(|t| (t,1)));
self.logical_compaction.update_iter(lower.iter().cloned().map(|t| (t,-1)));
self.trace.set_logical_compaction(self.logical_compaction.frontier());
}
/// Replaces elements of `lower` with those of `upper`.
pub fn adjust_through_frontier(&mut self, lower: AntichainRef<Tr::Time>, upper: AntichainRef<Tr::Time>) {
self.through_frontiers.update_iter(upper.iter().cloned().map(|t| (t,1)));
self.through_frontiers.update_iter(lower.iter().cloned().map(|t| (t,-1)));
self.trace.set_physical_compaction(self.through_frontiers.frontier());
pub fn adjust_physical_compaction(&mut self, lower: AntichainRef<Tr::Time>, upper: AntichainRef<Tr::Time>) {
self.physical_compaction.update_iter(upper.iter().cloned().map(|t| (t,1)));
self.physical_compaction.update_iter(lower.iter().cloned().map(|t| (t,-1)));
self.trace.set_physical_compaction(self.physical_compaction.frontier());
}
}

Expand All @@ -91,8 +84,8 @@ where
Tr::Time: Lattice+Ord+Clone+'static,
Tr: TraceReader,
{
get_logical_compaction: Antichain<Tr::Time>,
through_frontier: Antichain<Tr::Time>,
logical_compaction: Antichain<Tr::Time>,
physical_compaction: Antichain<Tr::Time>,
/// Wrapped trace. Please be gentle when using.
pub wrapper: Rc<RefCell<TraceBox<Tr>>>,
}
Expand All @@ -116,16 +109,16 @@ where
/// handle no longer requires access to times other than those in the future of `frontier`, but if
/// there are other handles to the same trace, it may not yet be able to compact.
fn set_logical_compaction(&mut self, frontier: AntichainRef<Tr::Time>) {
self.wrapper.borrow_mut().adjust_get_logical_compaction(self.get_logical_compaction.borrow(), frontier);
self.get_logical_compaction = frontier.to_owned();
self.wrapper.borrow_mut().adjust_logical_compaction(self.logical_compaction.borrow(), frontier);
self.logical_compaction = frontier.to_owned();
}
fn get_logical_compaction(&mut self) -> AntichainRef<Tr::Time> { self.get_logical_compaction.borrow() }
fn get_logical_compaction(&mut self) -> AntichainRef<Tr::Time> { self.logical_compaction.borrow() }
/// Allows the trace to compact batches of times before `frontier`.
fn set_physical_compaction(&mut self, frontier: AntichainRef<Tr::Time>) {
self.wrapper.borrow_mut().adjust_through_frontier(self.through_frontier.borrow(), frontier);
self.through_frontier = frontier.to_owned();
self.wrapper.borrow_mut().adjust_physical_compaction(self.physical_compaction.borrow(), frontier);
self.physical_compaction = frontier.to_owned();
}
fn get_physical_compaction(&mut self) -> AntichainRef<Tr::Time> { self.through_frontier.borrow() }
fn get_physical_compaction(&mut self) -> AntichainRef<Tr::Time> { self.physical_compaction.borrow() }
/// Creates a new cursor over the wrapped trace.
fn cursor_through(&mut self, frontier: AntichainRef<Tr::Time>) -> Option<(Tr::Cursor, <Tr::Cursor as Cursor<Tr::Key, Tr::Val, Tr::Time, Tr::R>>::Storage)> {
::std::cell::RefCell::borrow_mut(&self.wrapper).trace.cursor_through(frontier)
Expand All @@ -147,8 +140,8 @@ where
let wrapped = Rc::new(RefCell::new(TraceBox::new(trace)));

let handle = TraceRc {
get_logical_compaction: wrapped.borrow().get_logical_compactions.frontier().to_owned(),
through_frontier: wrapped.borrow().through_frontiers.frontier().to_owned(),
logical_compaction: wrapped.borrow().logical_compaction.frontier().to_owned(),
physical_compaction: wrapped.borrow().physical_compaction.frontier().to_owned(),
wrapper: wrapped.clone(),
};

Expand All @@ -163,11 +156,11 @@ where
{
fn clone(&self) -> Self {
// increase ref counts for this frontier
self.wrapper.borrow_mut().adjust_get_logical_compaction(Antichain::new().borrow(), self.get_logical_compaction.borrow());
self.wrapper.borrow_mut().adjust_through_frontier(Antichain::new().borrow(), self.through_frontier.borrow());
self.wrapper.borrow_mut().adjust_logical_compaction(Antichain::new().borrow(), self.logical_compaction.borrow());
self.wrapper.borrow_mut().adjust_physical_compaction(Antichain::new().borrow(), self.physical_compaction.borrow());
TraceRc {
get_logical_compaction: self.get_logical_compaction.clone(),
through_frontier: self.through_frontier.clone(),
logical_compaction: self.logical_compaction.clone(),
physical_compaction: self.physical_compaction.clone(),
wrapper: self.wrapper.clone(),
}
}
Expand All @@ -179,9 +172,9 @@ where
Tr: TraceReader,
{
fn drop(&mut self) {
self.wrapper.borrow_mut().adjust_get_logical_compaction(self.get_logical_compaction.borrow(), Antichain::new().borrow());
self.wrapper.borrow_mut().adjust_through_frontier(self.through_frontier.borrow(), Antichain::new().borrow());
self.get_logical_compaction = Antichain::new();
self.through_frontier = Antichain::new();
self.wrapper.borrow_mut().adjust_logical_compaction(self.logical_compaction.borrow(), Antichain::new().borrow());
self.wrapper.borrow_mut().adjust_physical_compaction(self.physical_compaction.borrow(), Antichain::new().borrow());
self.logical_compaction = Antichain::new();
self.physical_compaction = Antichain::new();
}
}