Skip to content

Commit ef54810

Browse files
committed
Remove WithProgress::is_empty
Signed-off-by: Moritz Hoffmann <[email protected]>
1 parent 3f78d6b commit ef54810

File tree

3 files changed

+14
-10
lines changed

3 files changed

+14
-10
lines changed

container/src/lib.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ use std::collections::VecDeque;
66

77
/// A type representing progress, with an update count.
88
///
9-
/// It describes its update count (`count()`).
9+
/// It describes its update count (`count()`) and whether it is empty (`is_empty()`).
1010
///
1111
/// We require [`Default`] for convenience purposes.
1212
pub trait WithProgress: Default {
@@ -84,7 +84,7 @@ pub trait PushInto<T> {
8484
/// decide to represent a push order for `extract` and `finish`, or not.
8585
pub trait ContainerBuilder: Default + 'static {
8686
/// The container type we're building.
87-
type Container: WithProgress + Clone + 'static;
87+
type Container: WithProgress + Default + Clone + 'static;
8888
/// Extract assembled containers, potentially leaving unfinished data behind. Can
8989
/// be called repeatedly, for example while the caller can send data.
9090
///

timely/src/dataflow/channels/pushers/buffer.rs

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
//! with the performance of batched sends.
33
44
use crate::communication::Push;
5-
use crate::container::{ContainerBuilder, CapacityContainerBuilder, PushInto, SizableContainer};
5+
use crate::container::{ContainerBuilder, CapacityContainerBuilder, WithProgress, PushInto, SizableContainer};
66
use crate::dataflow::channels::Message;
77
use crate::dataflow::operators::Capability;
88
use crate::progress::Timestamp;
@@ -109,9 +109,11 @@ impl<T, CB: ContainerBuilder, P: Push<Message<T, CB::Container>>> Buffer<T, CB,
109109
// buffer always requires a container builder. We could expose the buffer's underlying pusher
110110
// directly, but this would bypass the buffer's time tracking.
111111
fn give_container(&mut self, container: &mut CB::Container) {
112-
self.flush();
113-
let time = self.time.as_ref().unwrap().clone();
114-
Message::push_at(container, time, &mut self.pusher);
112+
if container.count() > 0 {
113+
self.flush();
114+
let time = self.time.as_ref().unwrap().clone();
115+
Message::push_at(container, time, &mut self.pusher);
116+
}
115117
}
116118

117119
/// An internal implementation of push that should only be called by sessions.

timely/src/dataflow/operators/core/input.rs

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
use std::rc::Rc;
44
use std::cell::RefCell;
55

6-
use crate::container::{CapacityContainerBuilder, ContainerBuilder, SizableContainer, PushInto};
6+
use crate::container::{CapacityContainerBuilder, ContainerBuilder, SizableContainer, WithProgress, PushInto};
77

88
use crate::scheduling::{Schedule, Activator};
99

@@ -439,9 +439,11 @@ impl<T: Timestamp, CB: ContainerBuilder> Handle<T, CB> {
439439
/// });
440440
/// ```
441441
pub fn send_batch(&mut self, buffer: &mut CB::Container) {
442-
// flush buffered elements to ensure local fifo.
443-
self.flush();
444-
Self::send_container(buffer, &mut self.buffer, &mut self.pushers, &self.now_at);
442+
if !buffer.count() > 0 {
443+
// flush buffered elements to ensure local fifo.
444+
self.flush();
445+
Self::send_container(buffer, &mut self.buffer, &mut self.pushers, &self.now_at);
446+
}
445447
}
446448

447449
/// Advances the current epoch to `next`.

0 commit comments

Comments
 (0)