Add compression option to SpillManager #16268
Conversation
This is ready for review :) @2010YOUY01
Thank you, overall it looks good. I left some minor suggestions.
Regarding tests: I suggest adding some end-to-end tests. Perhaps pick several tests inside tests/memory_limit/mod.rs that run queries with spill, and check that the final result is correct with different compression options.
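A minimal sketch of what such an end-to-end test could look like (the memory budget, query, and `generate_series` input are placeholder assumptions; the real tests in `tests/memory_limit/mod.rs` would use that module's own helpers):

```rust
use std::sync::Arc;

use datafusion::common::Result;
use datafusion::execution::memory_pool::FairSpillPool;
use datafusion::execution::runtime_env::RuntimeEnvBuilder;
use datafusion::prelude::{SessionConfig, SessionContext};

/// Sketch: run the same spilling query under each compression option and
/// check that the result has the expected size.
#[tokio::test]
async fn spill_result_is_correct_for_each_codec() -> Result<()> {
    for codec in ["uncompressed", "lz4_frame", "zstd"] {
        let config = SessionConfig::new()
            .set_str("datafusion.execution.spill_compression", codec);
        // A small memory pool so the sort below is (likely) forced to spill to disk.
        let runtime = RuntimeEnvBuilder::new()
            .with_memory_pool(Arc::new(FairSpillPool::new(4 * 1024 * 1024)))
            .build_arc()?;
        let ctx = SessionContext::new_with_config_rt(config, runtime);

        let batches = ctx
            .sql("SELECT v FROM (SELECT random() AS v FROM generate_series(1, 1000000)) t ORDER BY v")
            .await?
            .collect()
            .await?;
        let rows: usize = batches.iter().map(|b| b.num_rows()).sum();
        assert_eq!(rows, 1_000_000);
    }
    Ok(())
}
```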
datafusion/common/src/config.rs
///
/// Since datafusion writes spill files using the Arrow IPC Stream format,
/// only codecs supported by the Arrow IPC Stream Writer are allowed.
/// Valid values are: uncompressed, lz4_frame, zstd
I suggest mentioning this in the doc (as a quick reminder): lz4 is faster to compress but has a lower compression ratio, whereas zstd is slower to compress but achieves a higher compression ratio.
We can run some experiments in the future to make this info more specific, e.g. the measured compression ratio/speed on TPC-H tables in Arrow batches.
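For illustration, the note could be folded into the config entry's doc comment roughly like this (the `default =` field syntax mimics DataFusion's `config_namespace!` macro; the exact wording and default value are a sketch):

```rust
/// Sets the compression codec used when spilling data to disk.
///
/// Since DataFusion writes spill files using the Arrow IPC Stream format,
/// only codecs supported by the Arrow IPC Stream Writer are allowed.
/// Valid values are: uncompressed, lz4_frame, zstd.
///
/// Note: lz4_frame is faster to compress but has a lower compression ratio,
/// whereas zstd is slower to compress but achieves a higher compression ratio.
pub spill_compression: SpillCompression, default = SpillCompression::Uncompressed
```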
env: Arc<RuntimeEnv>,
metrics: SpillMetrics,
schema: SchemaRef,
compression: SpillCompression,
I suggest using a builder pattern to set this new field. The reason is that SpillManager is a public interface that can be used outside the crate, so it's better to keep the API stable.
To do that, use a default compression option in new(), and add a with_compression_type() function to change the default compression type.
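Roughly, the suggested builder-style API could look like the sketch below (field names and types are taken from the diff above; this illustrates the shape of the API, not the PR's exact code):

```rust
impl SpillManager {
    /// `new()` keeps its original signature and uses the default codec
    /// (`Uncompressed`), so existing callers are unaffected.
    pub fn new(env: Arc<RuntimeEnv>, metrics: SpillMetrics, schema: SchemaRef) -> Self {
        Self {
            env,
            metrics,
            schema,
            compression: SpillCompression::default(),
        }
    }

    /// Overrides the default codec; chainable after `new()`, e.g.
    /// `SpillManager::new(env, metrics, schema).with_compression_type(SpillCompression::Zstd)`.
    pub fn with_compression_type(mut self, spill_compression: SpillCompression) -> Self {
        self.compression = spill_compression;
        self
    }
}
```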
Thanks. This removed some diffs in existing unit tests since the signature of new() remains the same.
/// External query should succeed using lz4_frame as spill compression codec and
/// all temporary spill files are properly cleaned up after execution.
/// Note: This test does not inspect file contents (e.g. magic number),
/// as spill files are automatically deleted on drop.
#[tokio::test]
async fn test_spill_file_compressed_with_lz4_frame() -> Result<()> {
I added a test here, but unfortunately it is not straightforward to check whether the file is actually compressed with the desired codec in an e2e test. Maybe we can compare spilled_bytes after a follow-up fix.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
pub enum SpillCompression {
    Zstd,
    Lz4Frame,
    #[default]
    Uncompressed,
}
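As an illustration of how such an enum might be handed to the Arrow IPC writer (a sketch, not necessarily the PR's actual implementation; `CompressionType` and `IpcWriteOptions::try_with_compression` come from the `arrow-ipc` crate):

```rust
use arrow::ipc::CompressionType;

impl From<SpillCompression> for Option<CompressionType> {
    fn from(value: SpillCompression) -> Self {
        match value {
            SpillCompression::Zstd => Some(CompressionType::ZSTD),
            SpillCompression::Lz4Frame => Some(CompressionType::LZ4_FRAME),
            SpillCompression::Uncompressed => None,
        }
    }
}

// The IPC stream writer could then be configured along these lines:
// let options = IpcWriteOptions::default()
//     .try_with_compression(Option::<CompressionType>::from(compression))?;
```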
I'll change the default codec after experiments.
Currently CI fails, but I think that is due to a change introduced in another PR.
close and reopen to trigger CI again
Pull Request Overview
This PR adds a new compression option to the SpillManager to enable spill files to be written with different compression codecs (uncompressed, lz4_frame, zstd). Key changes include:
- Addition of a new configuration option (datafusion.execution.spill_compression) with its documentation.
- Updates to SpillManager, IPCStreamWriter, and related modules to accept and propagate the compression setting.
- Extended tests and configuration updates to verify the new spill compression behavior.
Reviewed Changes
Copilot reviewed 13 out of 13 changed files in this pull request and generated no comments.
| File | Description |
|---|---|
| docs/source/user-guide/configs.md | Documents the new spill_compression configuration option. |
| datafusion/sqllogictest/test_files/information_schema.slt | Updates to include the new spill_compression setting in tests. |
| datafusion/physical-plan/src/spill/spill_manager.rs | Introduces a with_compression_type method and default compression setting. |
| datafusion/physical-plan/src/spill/mod.rs | Updates IPCStreamWriter creation to support compression. |
| datafusion/physical-plan/src/spill/in_progress_spill_file.rs | Uses the new compression setting when creating spill files. |
| datafusion/physical-plan/src/sorts/sort.rs, datafusion/physical-plan/src/joins/sort_merge_join.rs, datafusion/physical-plan/src/aggregates/row_hash.rs | Adjusts sorter and join code to propagate the spill_compression config. |
| datafusion/execution/src/config.rs, datafusion/common/src/config.rs | Adds configuration for spill_compression and defines the SpillCompression enum. |
| datafusion/core/tests/memory_limit/mod.rs | Extends tests to include scenarios using different spill compression codecs. |
| Cargo.toml | Enables the zstd feature for the arrow-ipc crate. |
Comments suppressed due to low confidence (3)
datafusion/physical-plan/src/spill/spill_manager.rs:62
- Consider adding a brief doc comment for the with_compression_type method to explain its purpose and how it integrates with the spill file creation workflow.
pub fn with_compression_type(mut self, spill_compression: SpillCompression) -> Self {
datafusion/core/tests/memory_limit/mod.rs:643
- [nitpick] While the tests confirm spill file sizes and cleanup, consider enhancing them with a check on the spill file's header or magic number to verify that the intended compression codec is applied.
async fn test_spill_file_compressed_with_zstd() -> Result<()> {
datafusion/common/src/config.rs:390
- [nitpick] Clarify in the documentation that the default spill compression is 'uncompressed' and ensure this default is consistently reflected in the configuration and its usage.
/// Sets the compression codec used when spilling data to disk.
This looks good to me, thank you!
@@ -1324,6 +1326,7 @@ impl Stream for SortMergeJoinStream {
impl SortMergeJoinStream {
    #[allow(clippy::too_many_arguments)]
    pub fn try_new(
        spill_compression: SpillCompression,
Nit: For readability, I suggest adding a comment indicating that this argument is passed through from the configuration xxx.
In the future, we might consider enforcing a naming convention for such arguments, e.g. always using a cfg_ prefix like cfg_spill_compression.
// Configured via `datafusion.execution.spill_compression`.
spill_compression: SpillCompression,
I made the changes. Thank you.
// Configured via `datafusion.execution.spill_compression`.
spill_compression: SpillCompression,
Here, too
It's better to mention the option in the DF49 upgrade doc.
Thank you for the reminder @xudong963. I have a question: as I understand it, the upgrade guide is like "here are the API changes that might break your system during upgrades", whereas this PR is a new feature you might want to try in the new release -- do we have a separate place to document new features like this? 🤔
I was thinking the upgrade doc contains everything we want to highlight and make users aware of.
I see. Maybe we can put new features and API changes into different sub-sections.
Which issue does this PR close?
Further jobs to do
I filed a separate issue to investigate the benefit of this compression option. #16367
Rationale for this change
What changes are included in this PR?
Are these changes tested?
Are there any user-facing changes?
Yes, a new `datafusion.execution.spill_compression` config option.
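For example, a user could opt into the new option at runtime with a `SET` statement (a hypothetical snippet; whether a given query actually spills still depends on the configured memory limits):

```rust
use datafusion::common::Result;
use datafusion::prelude::SessionContext;

#[tokio::main]
async fn main() -> Result<()> {
    let ctx = SessionContext::new();
    // Spill files written after this point use zstd instead of the
    // uncompressed default; lz4_frame is the other supported codec.
    ctx.sql("SET datafusion.execution.spill_compression = 'zstd'")
        .await?
        .collect()
        .await?;

    // ... register tables and run memory-intensive queries as usual ...
    Ok(())
}
```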