@antiguru antiguru commented Jul 21, 2025

Rework the container abstractions in timely-container.

A WithProgress trait describes an update count for objects, most likely containers, but is otherwise unopinionated. It's what core Timely requires for sending data over the wire. Two container abstractions describe how to interact with data. The SizableContainer has a capacity and can recycle old allocations, and is mostly used in conjunction with the CapacityContainerBuilder. What was the old Container becomes IterableContainer.

Most parts of Timely require WithProgress, but otherwise aren't interested in the contents of the data. Operators that want to write individual elements use the SizableContainer/CapacityContainerBuilder combination. Operators that want to read containers, as well as exchange, use IterableContainer.
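
To make the split concrete, here is a rough sketch of how the three traits could relate. The trait names come from the description above; the method signatures, the `Vec` implementations, and the `total_count` helper are assumptions for illustration and are not taken from the actual timely-container code.

```rust
// Illustrative sketch only: the trait names follow the description above,
// but every signature here is an assumption, not the timely-container API.

/// All that progress tracking needs: a count of updates.
pub trait WithProgress: Default {
    fn count(&self) -> usize;
}

/// Read access to the contents, for operators that inspect records.
pub trait IterableContainer: WithProgress {
    type Item<'a> where Self: 'a;
    type Iter<'a>: Iterator<Item = Self::Item<'a>> where Self: 'a;
    fn iter(&self) -> Self::Iter<'_>;
}

/// Capacity awareness, for builders that write one element at a time.
pub trait SizableContainer: WithProgress {
    fn at_capacity(&self) -> bool;
}

impl<T> WithProgress for Vec<T> {
    fn count(&self) -> usize { self.len() }
}

impl<T> IterableContainer for Vec<T> {
    type Item<'a> = &'a T where T: 'a;
    type Iter<'a> = std::slice::Iter<'a, T> where Self: 'a;
    // Go through the slice explicitly so this does not call itself.
    fn iter(&self) -> Self::Iter<'_> { self.as_slice().iter() }
}

impl<T> SizableContainer for Vec<T> {
    fn at_capacity(&self) -> bool { self.len() == self.capacity() }
}

/// Hypothetical helper: progress-only code stays generic over `WithProgress`
/// and never looks at the contents.
pub fn total_count<C: WithProgress>(batches: &[C]) -> usize {
    batches.iter().map(|b| b.count()).sum()
}
```

In this sketch, code such as `total_count` compiles against any type that reports a count, while only readers and element-wise writers need the richer traits.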

@antiguru antiguru requested a review from Copilot July 22, 2025 07:13

Copilot AI left a comment

Pull Request Overview

This PR splits the Container trait into two separate traits: ProgressContainer and Container. The ProgressContainer trait only exposes a len() method which is sufficient for progress tracking in most parts of Timely, while the Container trait extends ProgressContainer with additional methods for iteration and clearing contents. This separation allows for a cleaner abstraction where progress tracking operations don't require full container functionality.

Key changes:

  • Introduction of the ProgressContainer trait with only len() and is_empty() methods
  • Container trait now extends ProgressContainer and includes the iteration and clearing methods
  • Addition of PassthroughContainerBuilder<C> for operators that pass through data without modification

Reviewed Changes

Copilot reviewed 26 out of 26 changed files in this pull request and generated 1 comment.

File | Description
container/src/lib.rs | Core trait definitions - splits Container into ProgressContainer and Container traits, adds PassthroughContainerBuilder
timely/src/dataflow/stream.rs | Updates Stream types to use ProgressContainer trait bounds
timely/src/dataflow/operators/generic/ | Updates generic operator implementations to use ProgressContainer where appropriate
timely/src/dataflow/operators/core/ | Updates core operators to use ProgressContainer and PassthroughContainerBuilder
timely/src/dataflow/channels/ | Updates channel infrastructure to use ProgressContainer for progress tracking
timely/examples/columnar.rs | Updates custom container implementation to implement both traits
Comments suppressed due to low confidence (2)

timely/src/dataflow/operators/core/input.rs:394

  • The removal of container.clear() is problematic. This line was responsible for clearing the container after flushing data, which is necessary to prevent data accumulation and potential memory leaks. The container should be cleared after sending its contents.
        }

timely/src/dataflow/channels/mod.rs:54

  • The removal of buffer.clear() in the push_at method is concerning. The comment above this line was updated to say 'The buffer's contents are left in an undefined state', but removing the clear operation could lead to data accumulation and memory issues. The buffer should still be cleared after the message is pushed.
            *buffer = message.data;

Splits the Container trait into a progress container and a container. The
progress container only exposes a length, which is enough for most parts of
Timely. The container trait extends the progress container and carries the
usual functions to iterate and clear the contents.

I'm open to renaming and moving types, and primarily wanted to check
whether it's possible to split the trait.

Signed-off-by: Moritz Hoffmann <[email protected]>
@antiguru antiguru force-pushed the progress_container branch from 82bc666 to 0d03378 Compare July 22, 2025 07:19
antiguru added 3 commits July 22, 2025 11:41
Signed-off-by: Moritz Hoffmann <[email protected]>
Removes clear from Container

Signed-off-by: Moritz Hoffmann <[email protected]>
@antiguru antiguru changed the title Progress container with length Rework container traits Jul 24, 2025
@frankmcsherry frankmcsherry left a comment

Reading through and I think I've got some comments recorded to start with, before continuing with what seems like a lot of search-replace rewrites. Thoughts at the moment are that

  1. This is directionally a good idea, but:
  2. We should take this opportunity to boil away some complexity. Do we still need traits like "length preserving" if we have a concept of a container that must report its count accurately and everyone needs to implement that?

I have some biases around naming that we can dance around. I'm currently leaning towards Container for WithProgress: everything we ship around timely should have this information, and just having that be what "container" means works great for me. The bonus features around "you can enumerate some unrelated number of records from this container" are helpful, but not as core and I'm up for letting their names reflect this.

}

/// We require [`Default`] for convenience purposes.
pub trait WithProgress: Default {
Member

Let's discuss the name! I don't have anything better, but this name is surprising to me (there is no progress that it is with, and it's unclear what it means).

Member

Tbh, I kinda like Container as the name here, in that from timely's point of view containers hold numbers of things that should be tracked, and it has no other opinion. The other trait being ItemContainer or IterContainer or whathaveyou makes lots of sense, but partly because it isn't as "core timely" function.

Member Author

Renamed to Container!

- /// Determine if the container contains any elements, corresponding to `len() == 0`.
+ /// Determine if the container contains any elements, corresponding to `count() == 0`.
#[inline(always)]
fn is_empty(&self) -> bool {
Member

I think we should be careful here because throughout Rust-land is_empty() matches len() rather than some other count() function. I'd be interested to figure out where we use this, i.e. where we ask "hey, is that container utterly devoid of anything worth tracking?".

Member Author

Checked the places where we use this, and it is mostly to guard against sending empty containers. Is it correct to send empty containers, i.e., should the count for something within progress tracking be non-zero? It doesn't make much sense to me to send containers with length 0, so it's probably good to guard against it.

Member Author

(Removed is_empty and replaced all calls with .count() > 0).
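
The guard against sending empty containers mentioned in this thread could look roughly like the following. This is a minimal sketch under assumptions: the `Container` trait here is a stand-in with only a `count()` method, and `send_if_nonempty` and the `Vec`-based sink are hypothetical names, not Timely APIs.

```rust
/// Hypothetical stand-in for the count-reporting trait discussed in this PR.
pub trait Container: Default {
    fn count(&self) -> usize;
}

impl<T> Container for Vec<T> {
    fn count(&self) -> usize { self.len() }
}

/// Hypothetical helper: hands `container` to `sink` only if it accounts for
/// at least one update, leaving a default (empty) container behind.
/// Returns whether anything was sent.
pub fn send_if_nonempty<C: Container>(container: &mut C, sink: &mut Vec<C>) -> bool {
    if container.count() > 0 {
        sink.push(std::mem::take(container));
        true
    } else {
        false
    }
}
```

Checking `count() > 0` instead of `is_empty()` keeps the guard tied to the quantity that progress tracking actually uses.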

use std::collections::VecDeque;

- /// A container transferring data through dataflow edges
+ /// A type representing progress, with an update count.
Member

A thing I want to discuss, but am not sure I'll find a better place to comment:

My guess is we'd eventually like to close in on a representation that presents its progress tracking information as a Map<T, usize>, allowing multiple counts for multiple times. This might be in that direction, I think it is, but also I'm trying to think through how this ends up looking and whether there are helpful steps to take as we go.
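
One way to picture the `Map<T, usize>` idea is a batch that holds records at several times and reports one count per time. The sketch below is an assumption-laden illustration: `MultiTimeBatch`, its fields, and its methods are hypothetical, and `BTreeMap` is simply one possible map type.

```rust
use std::collections::BTreeMap;

/// Hypothetical batch holding records at several timestamps.
pub struct MultiTimeBatch<T, D> {
    records: Vec<(T, D)>,
}

impl<T: Ord + Clone, D> MultiTimeBatch<T, D> {
    pub fn new() -> Self {
        MultiTimeBatch { records: Vec::new() }
    }

    pub fn push(&mut self, time: T, datum: D) {
        self.records.push((time, datum));
    }

    /// Progress information as per-time counts rather than a single number.
    pub fn counts(&self) -> BTreeMap<T, usize> {
        let mut counts = BTreeMap::new();
        for (time, _) in &self.records {
            *counts.entry(time.clone()).or_insert(0) += 1;
        }
        counts
    }
}
```

A single-time container is then the special case where the map has exactly one entry.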

Member Author

Interesting. We currently have the separation into Message and the data it's transferring, and the Container trait in the past was just the data, mostly for historic reasons. It might make sense to make messages the interface to sending data, which would give users more control over the associated timestamps.

- /// After calling `clear`, `is_empty` must return `true` and `len` 0.
- fn clear(&mut self);
+ /// A container that can reveal its contents through iterating by reference and draining.
+ pub trait IterableContainer: WithProgress {
Member

I foresee this being deleted / unified with columnar's container traits. No rush, but curious if that checks out too. This is now, I think, not timely-specific so much as "can I read data out of here?". At least, if not columnar then some random container traits somewhere, unrelated to core timely, and in timely only as long as there aren't better ones somewhere else.

pub trait ContainerBuilder: Default + 'static {
/// The container type we're building.
- type Container: Container + Clone + 'static;
+ type Container: WithProgress + Default + Clone + 'static;
Member

WithProgress : Default, so is the + Default redundant, or perhaps helpful because that WithProgress constraint isn't locked down?

Member Author

Removed, was redundant.

/// number of times that `push_into` is called on the container builder.
/// If you have any questions about this trait you are best off not implementing it.
- pub trait LengthPreservingContainerBuilder : ContainerBuilder { }
+ pub trait CountPreservingContainerBuilder: ContainerBuilder { }
Member

Names are a bit weird here, because "count" is a method on WithProgress and not obviously a top-level concept. I wonder if it makes sense to require all container builders to be count preserving? As in, you now need to implement both count() and len(), and there's no room for containers whose count() method is incorrect?

Member Author

Wasn't the idea of the trait to indicate that it's compatible with partition, i.e., after partitioning, the sum length of the parts equals the length of the input? From this perspective, it seems worth keeping around.

I restored the old name.
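
The partition property described here (the lengths of the parts sum to the length of the input) can be illustrated with plain vectors. This is only a sketch: `partition` and its routing closure are hypothetical and do not reflect how Timely's partition or exchange operators are implemented.

```rust
/// Hypothetical partitioner: routes each item to one of `parts` buckets.
/// Because every item lands in exactly one bucket, the bucket lengths sum to
/// the input length. `parts` must be greater than zero.
pub fn partition<T>(input: Vec<T>, parts: usize, route: impl Fn(&T) -> usize) -> Vec<Vec<T>> {
    let mut out: Vec<Vec<T>> = (0..parts).map(|_| Vec::new()).collect();
    for item in input {
        let index = route(&item) % parts;
        out[index].push(item);
    }
    out
}
```

A builder that merges or drops records while building would break this sum, which is the situation the marker trait is meant to rule out.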

Member

I think the goal of the trait historically was to say, in the absence of a count function, "the len function correctly represents the intent of count", but .. I'm not sure we need that distinction any more? At least, I can't tell what would implement ContainerBuilder and not CountPreservingContainerBuilder and we wouldn't immediately call buggy.

Member Author

It might still be useful for a container builder that compacts in-place, like we have in Differential. The consolidating container builder certainly can only be used before we fix counts for progress tracking. It can't be used for an exchange because data might consolidate differently.

I see two ways out: Either we have a container that stores the count independently of the contents, and sets the count to the number of push_into calls outside progress tracking, or rework the exchange logic to make it clear that container builders inside have less flexibility than the ones before we take the count.

For this reason, I think it makes sense to leave this trait in Timely for the time being.
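
The first option above, a container that stores its count independently of its contents, might be sketched as follows. Everything here is hypothetical: `CountedConsolidated`, its `(key, diff)` update layout, and the naive linear-scan consolidation are illustrative choices, not Differential's or Timely's implementation.

```rust
/// Hypothetical consolidating container: updates with equal keys are merged,
/// but the number of `push_into` calls is recorded separately.
#[derive(Default)]
pub struct CountedConsolidated {
    updates: Vec<(u32, i64)>,
    pushes: usize,
}

impl CountedConsolidated {
    /// Adds `diff` for `key`, merging with an existing entry if present and
    /// dropping entries whose accumulated diff reaches zero.
    pub fn push_into(&mut self, key: u32, diff: i64) {
        self.pushes += 1;
        match self.updates.iter_mut().find(|entry| entry.0 == key) {
            Some(entry) => entry.1 += diff,
            None => self.updates.push((key, diff)),
        }
        self.updates.retain(|entry| entry.1 != 0);
    }

    /// The count reported for progress tracking: one per `push_into` call.
    pub fn count(&self) -> usize {
        self.pushes
    }

    /// The number of updates physically stored after consolidation.
    pub fn len(&self) -> usize {
        self.updates.len()
    }
}
```

With this shape, `count()` and `len()` can legitimately disagree: pushing `(1, +1)`, `(1, -1)`, and `(2, +2)` yields a count of 3 while only one update remains stored.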

Member

I'm still not following, but maybe I need a few to think and understand. Can you point at a thing that implements ContainerBuilder and not CountPreservingContainerBuilder? I'm pretty sure I will then conclude we should fix / delete it. A differential builder that has a count() method and just uses e.g. Vec::len for this even with consolidation .. is .. just wrong going forward. It is incorrect, and we should disallow it.

Member

Having a trait for both Container and ImplementsContainerCorrectly is what seems weird.


if let Some(message) = bundle {
*buffer = message.data;
- buffer.clear();
Member

This line prompted me to think I don't actually understand what is going on. It seems like a pretty substantial change to some contracts, and it's worth going through the reason for it.

Member Author

Backed out all the clear changes, and added a clear function to Container. I'm not sure we want to have it in the future, but seems easier to decouple the discussions.

Member

Right. Elsewhere (e.g. columnar) you just have a Clear trait and require it when you want to be able to re-use a container.
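
A standalone `Clear` trait of the kind mentioned here could be sketched like this. The trait below and the `recycle` helper are hypothetical illustrations written for this thread; they are not copied from columnar or Timely.

```rust
/// Hypothetical `Clear` trait: implemented only by containers that can be
/// emptied for re-use.
pub trait Clear {
    fn clear(&mut self);
}

impl<T> Clear for Vec<T> {
    fn clear(&mut self) {
        // Drops the elements but keeps the allocation for re-use.
        self.truncate(0);
    }
}

/// Hypothetical recycling path: the only place that requires `Clear`.
/// Code that merely counts or forwards containers would not need the bound.
pub fn recycle<C: Clear>(mut buffer: C, stash: &mut Vec<C>) {
    buffer.clear();
    stash.push(buffer);
}
```

Keeping `clear` out of the core container trait means the bound appears only where an allocation is actually re-used.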

}

- impl<T, C: Container + Data, P: Push<Message<T, C>>> Buffer<T, CapacityContainerBuilder<C>, P> where T: Eq+Clone {
+ impl<T, C: SizableContainer + Data, P: Push<Message<T, C>>> Buffer<T, CapacityContainerBuilder<C>, P> where T: Eq+Clone {
Member

This looks like a new constraint, e.g. as opposed to IterableContainer. Is there a reason?

Member Author

It's a weaker requirement than what we had in the past, only that we couldn't express the weaker requirement back then. We need C to be SizableContainer so we can satisfy CapacityContainerBuilder's constraints.

Member

I'm confused then. SizableContainer is weaker than Container? Surely SizableContainer : Container?

Member Author

Turns out we don't need this change. Removed.

antiguru added 2 commits July 24, 2025 16:46
Signed-off-by: Moritz Hoffmann <[email protected]>
Signed-off-by: Moritz Hoffmann <[email protected]>
@antiguru antiguru force-pushed the progress_container branch from 4eb17b9 to 8517bf8 Compare July 24, 2025 15:24
Comment on lines -27 to -29
fn push<T>(&mut self, item: T) where Self: PushInto<T> {
self.push_into(item)
}
Member Author

push wasn't used, removed.

Signed-off-by: Moritz Hoffmann <[email protected]>
@antiguru antiguru force-pushed the progress_container branch from 8517bf8 to 236c46b Compare July 24, 2025 15:46

antiguru commented Aug 22, 2025

Superseded by #697

@antiguru antiguru closed this Aug 22, 2025