Closed
Description
timely::execute_directly(move |worker| {
let mut probe = ProbeHandle::new();
let (mut input, mut trace) = worker.dataflow::<u32, _, _>(|scope| {
let (handle, input) = scope.new_collection();
let arrange = input.arrange_by_self();
arrange.stream.probe_with(&mut probe);
(handle, arrange.trace)
});
// ingest some batches
for _ in 0..10 {
input.insert(10);
input.advance_to(input.time() + 1);
input.flush();
worker.step_while(|| probe.less_than(input.time()));
}
// advance the trace
trace.set_logical_compaction(AntichainRef::new(&[2]));
trace.set_physical_compaction(AntichainRef::new(&[2]));
worker.dataflow::<u32, _, _>(|scope| {
let arrange = trace.import(scope);
arrange
.count_total() // <-- panic
.inspect(|d| println!("{:?}", d));
});
})
The above code panic:
thread 'main' panicked at /home/tom/hdx/apps/cargo/git/checkouts/differential-dataflow-d065d23d797aa027/b5046c8/src/trace/implementations/spine_fueled.rs:150:9:
assertion failed: PartialOrder::less_equal(&self.physical_frontier.borrow(), &upper)
note: run withRUST_BACKTRACE=1
environment variable to display a backtrace
It seems count_total
only works for traces with default compaction frontier. Is it intentional?
Full code is here
Metadata
Metadata
Assignees
Labels
No labels