[Data] Fixed ParquetDatasource encoding ratio estimation #56268
alexeykudinkin merged 41 commits into master
Conversation
Code Review
This pull request refactors the encoding ratio estimation in ParquetDatasource to be based on actual file sizes rather than Parquet metadata, which improves accuracy. This is achieved by introducing a _ParquetFragment wrapper to carry file size information. The change also includes a significant cleanup by removing the ParquetMetadataProvider and its associated abstractions, leading to simpler and more direct code. My review identifies a couple of critical issues where the new sampling logic doesn't correctly handle empty Parquet files, which could lead to runtime errors. I've also included some suggestions to improve type hints for better code clarity and maintainability.
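As a rough illustration of the wrapper described above, the sketch below pairs a fragment with its listed file size. The exact fields of `_ParquetFragment` aren't shown in this excerpt, so treat the names here as assumptions.

```python
from dataclasses import dataclass

import pyarrow.dataset as pads


@dataclass
class _ParquetFragment:
    # Sketch only: carries the listed on-disk size alongside the PyArrow
    # fragment so the encoding ratio can relate in-memory size to actual
    # file size. Field names are assumed, not taken from the PR.
    fragment: "pads.ParquetFileFragment"
    file_size_bytes: int
```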
    def deserialize(self) -> "ParquetFileFragment":
        # Implicitly trigger S3 subsystem initialization by importing
        # pyarrow.fs.
        import pyarrow.fs  # noqa: F401

        (file_format, path, filesystem, partition_expression) = cloudpickle.loads(
            self._data
        )
        return file_format.make_fragment(path, filesystem, partition_expression)


# Visible for test mocking.
def _deserialize_fragments(
    serialized_fragments: List[_NoIOSerializableFragmentWrapper],
) -> List["pyarrow._dataset.ParquetFileFragment"]:
    return [p.deserialize() for p in serialized_fragments]
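The excerpt above only shows the deserializing half. A minimal serializing counterpart consistent with `deserialize` might look like the following sketch; the real constructor in the PR may differ.

```python
import cloudpickle


class _NoIOSerializableFragmentWrapper:
    def __init__(self, fragment: "pyarrow._dataset.ParquetFileFragment"):
        # Pickle only what's needed to recreate the fragment later, so no
        # I/O (e.g. re-reading Parquet metadata) happens at pickle time.
        self._data = cloudpickle.dumps(
            (
                fragment.format,
                fragment.path,
                fragment.filesystem,
                fragment.partition_expression,
            )
        )
```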
    try:
        prefetch_remote_args = {}
        prefetch_remote_args["num_cpus"] = NUM_CPUS_FOR_META_FETCH_TASK
        if self._local_scheduling:
            prefetch_remote_args["scheduling_strategy"] = self._local_scheduling
        else:
            # Use the scheduling strategy ("SPREAD" by default) provided in
            # `DataContext` to spread out prefetch tasks across the cluster
            # and avoid AWS S3 throttling errors.
            # Note: this is the same scheduling strategy used by read tasks.
            prefetch_remote_args[
                "scheduling_strategy"
            ] = DataContext.get_current().scheduling_strategy

        self._metadata = [
            ParquetFileMetadata(
                num_bytes=num_bytes,
            )
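For context, this options dict is the kind passed to `.options(...)` when launching Ray tasks. Below is a standalone sketch of how it would be applied; the `fetch_metadata` task and the `num_cpus` value are assumptions for illustration.

```python
import ray

NUM_CPUS_FOR_META_FETCH_TASK = 0.5  # assumed value, for illustration


@ray.remote
def fetch_metadata(paths):
    # Hypothetical stand-in for the datasource's metadata prefetch task.
    ...


prefetch_remote_args = {
    "num_cpus": NUM_CPUS_FOR_META_FETCH_TASK,
    # "SPREAD" distributes prefetch tasks across nodes, reducing the chance
    # of S3 throttling from many concurrent requests on one node.
    "scheduling_strategy": "SPREAD",
}
ref = fetch_metadata.options(**prefetch_remote_args).remote(
    ["s3://bucket/part-0.parquet"]
)
```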
srinathk10 left a comment:
Minor comments. LGTM.
Also, we can kick off the release test below:
name: tpch_q1_fixed_size
) -> List["pyarrow._dataset.ParquetFileFragment"]:
    return [p.deserialize() for p in serialized_fragments]

@staticmethod
def make_fragment(format, path, filesystem, partition_expression, file_size):
Can add type annotations here.

Agree in principle. These, however, are opaque deps we get from and wire back into PyArrow. I can obviously wire their types, but I don't think that's going to be very useful.
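For reference, a fully annotated version of the signature could look like the sketch below, with types inferred from how `deserialize` uses the same values; the enclosing class name here is hypothetical.

```python
from typing import TYPE_CHECKING

if TYPE_CHECKING:
    import pyarrow.compute as pc
    import pyarrow.dataset as pads
    import pyarrow.fs as pafs


class _SerializedFragment:  # hypothetical container, for illustration
    @staticmethod
    def make_fragment(
        format: "pads.ParquetFileFormat",
        path: str,
        filesystem: "pafs.FileSystem",
        partition_expression: "pc.Expression",
        file_size: int,
    ) -> "pads.ParquetFileFragment":
        ...
```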
sample_infos = sample_fragments(
    # Sample a small number of Parquet files to estimate:
    # - Encoding ratio: ratio of file size on disk to approximate expected

# 'avg_row_in_mem_bytes' is None if the sampled file was empty and 0 if the data
# was all null.
if not sample_info.actual_bytes_per_row:
if not file_info or not file_info.avg_row_in_mem_bytes:
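A minimal sketch of the guard implied by this comment, where a single truthiness check covers both the empty-file (`None`) and all-null (`0`) cases; the function name and fallback ratio are assumptions.

```python
from typing import Optional


def _estimate_encoding_ratio(
    avg_row_in_mem_bytes: Optional[float],
    actual_bytes_per_row: Optional[float],
    default_ratio: float = 1.0,  # assumed fallback
) -> float:
    # None (empty file) and 0 (all-null data) are both falsy, so this
    # single check avoids a ZeroDivisionError in either case.
    if not avg_row_in_mem_bytes or not actual_bytes_per_row:
        return default_ratio
    return avg_row_in_mem_bytes / actual_bytes_per_row
```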
In line 630, should we do this instead:
max_parquet_reader_row_batch_size_bytes = ctx.target_max_block_size

Yeah, I wasn't happy about it from the beginning, but was thinking about leaving it as is and fixing it in a separate PR.
Now that I'm thinking about it, there's no good reason not to fix it right away.
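The suggested fix amounts to deriving the reader's row batch size from the context's target block size; a standalone sketch under assumed names:

```python
def _estimate_batch_size_rows(
    target_max_block_size: int, avg_row_in_mem_bytes: float
) -> int:
    # Cap each read batch so its estimated in-memory size stays within
    # ctx.target_max_block_size.
    return max(1, int(target_max_block_size // avg_row_in_mem_bytes))
```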
Commits:
…n disk file size (instead of uncompressed byte size)
Fixed batch-size estimation
Tidying up
Limit read-ahead buffer to 1 batch
Why are these changes needed?
This change is a follow-up to #56105. Dataset size estimation is now based on listed file sizes; however, the encoding ratio was still derived from file size estimates based on the uncompressed data size obtained from Parquet metadata.
This change addresses that by:
- Rebasing the encoding ratio to relate the estimated in-memory size to the listed file size (see the sketch below)
- Cleaning up unused abstractions (like `ParquetMetadataProvider`)
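Concretely, the rebased estimate reduces to a ratio of sampled in-memory bytes to listed on-disk bytes, which then scales file sizes into memory estimates; a sketch with assumed names:

```python
def encoding_ratio(sampled_in_mem_bytes: int, listed_file_size_bytes: int) -> float:
    # Ratio of decoded in-memory size to actual (compressed) size on disk.
    return sampled_in_mem_bytes / listed_file_size_bytes


# E.g. a 100 MiB Parquet file decoding to ~400 MiB in memory gives a ratio
# of 4.0, so: estimated_in_mem_size = listed_file_size * 4.0
```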
Related issue number

Checks
- [ ] I've signed off every commit (by using the -s flag, i.e., `git commit -s`) in this PR.
- [ ] I've run `scripts/format.sh` to lint the changes in this PR.
- [ ] I've included any doc changes needed for https://docs.ray.io/en/master/.
- [ ] I've added any new APIs to the API Reference. For example, if I added a method in Tune, I've added it in `doc/source/tune/api/` under the corresponding `.rst` file.
- [ ] I've made sure the tests are passing.
Testing Strategy
- [ ] Unit tests
- [ ] Release tests
- [ ] This PR is not tested :(