[Data][Iceberg] - Add retry policy for Storage + Catalog Writes #60620
Conversation
Signed-off-by: Goutam <goutam@anyscale.com>
Code Review
This pull request introduces a retry policy for Iceberg storage and catalog write operations to improve resiliency. The changes involve adding new retry configurations to DataContext and wrapping relevant I/O and catalog calls in iceberg_datasink.py with call_with_retry. The implementation is sound and includes a test for storage write retries.
My review focuses on improving code maintainability and test coverage. I've suggested refactoring the duplicated retry logic for catalog operations and adding tests for the newly introduced catalog retry mechanisms.
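For context, the `call_with_retry` wrapper referenced above behaves roughly like the following minimal sketch. This is an illustrative reimplementation, not Ray's actual internal code; the exact signature, backoff formula, and jitter strategy are assumptions.

```python
import random
import time


def call_with_retry(f, description, match=None, max_attempts=10, max_backoff_s=32):
    """Retry ``f`` with exponential backoff, re-raising non-retryable errors.

    ``match`` is an optional list of substrings; an exception is retried only
    if its string form contains one of them (or if ``match`` is None).
    This is a sketch of the pattern, not Ray's actual implementation.
    """
    for attempt in range(max_attempts):
        try:
            return f()
        except Exception as e:
            is_retryable = match is None or any(m in str(e) for m in match)
            if not is_retryable or attempt == max_attempts - 1:
                raise
            # Exponential backoff with jitter, capped at max_backoff_s.
            backoff = min((2 ** attempt) * random.random(), max_backoff_s)
            time.sleep(backoff)
```

The key design point is the `match` filter: only errors that look transient (by substring) are retried, so genuine bugs still fail fast.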
```
def test_write_retry_on_transient_error(pyiceberg_table, fast_retry_config):
    """Test that transient errors during file writes trigger retries."""
    from unittest.mock import patch

    from ray.data._internal.datasource.iceberg_datasink import IcebergDatasink
    from ray.data._internal.execution.interfaces import TaskContext

    # Create datasink and initialize it
    datasink = IcebergDatasink(
        table_identifier=f"{_DB_NAME}.{_TABLE_NAME}",
        catalog_kwargs=_CATALOG_KWARGS.copy(),
    )
    datasink.on_write_start()

    # Track call count to simulate transient failures
    call_count = {"count": 0}

    # Import original function before patching
    from pyiceberg.io.pyarrow import _dataframe_to_data_files

    original_func = _dataframe_to_data_files

    def flaky_dataframe_to_data_files(*args, **kwargs):
        call_count["count"] += 1
        if call_count["count"] <= 2:
            # Fail first 2 attempts with a retryable error
            raise IOError("TestTransientError: simulated transient failure")
        # Succeed on 3rd attempt
        return original_func(*args, **kwargs)

    # Create test data
    data = pa.Table.from_pydict(
        {"col_a": [200, 201], "col_b": ["x", "y"], "col_c": [5, 5]},
        schema=_SCHEMA,
    )

    # Patch at pyiceberg module level and call write directly
    with patch(
        "pyiceberg.io.pyarrow._dataframe_to_data_files",
        side_effect=flaky_dataframe_to_data_files,
    ):
        # Call write directly (bypassing Ray workers)
        task_ctx = TaskContext(task_idx=0, op_name="Write")
        result = datasink.write([data], task_ctx)

    # Verify retries occurred (called 3 times: 2 failures + 1 success)
    assert (
        call_count["count"] == 3
    ), f"Expected 3 calls (2 retries + 1 success), got {call_count['count']}"

    # Verify write result has data files
    assert len(result.data_files) > 0, "Expected data files in result"
```
This test effectively covers the retry logic for storage writes (_dataframe_to_data_files). However, the PR also adds retry logic for several catalog operations (load_catalog, load_table, commit_transaction, update_schema) which are not covered by tests.
Please add tests to verify the retry mechanism for these catalog operations as well. You could create a new fixture similar to fast_retry_config for catalog retries and then write a test that patches a catalog method to simulate transient failures.
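A catalog-retry test along the lines suggested could look like the self-contained sketch below. The catalog object and the choice of `commit_transaction` as the patched method are illustrative assumptions; in the real test you would patch the actual catalog method the datasink calls, and rely on its built-in retry wrapper rather than the inline loop shown here.

```python
from unittest.mock import MagicMock

# Hypothetical stand-in for a pyiceberg catalog; the real test would patch
# the concrete catalog method used by the datasink's commit path.
catalog = MagicMock()
catalog.commit_transaction.side_effect = [
    ConnectionError("simulated transient failure"),
    ConnectionError("simulated transient failure"),
    "committed",  # third attempt succeeds
]


def commit_with_retry(catalog, max_attempts=5):
    """Minimal inline retry loop mirroring the catalog retry behavior."""
    for attempt in range(max_attempts):
        try:
            return catalog.commit_transaction()
        except ConnectionError:
            if attempt == max_attempts - 1:
                raise


result = commit_with_retry(catalog)

# Two failures plus one success means three calls in total.
assert result == "committed"
assert catalog.commit_transaction.call_count == 3
```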
/gemini summary
Summary of Changes

This pull request significantly improves the fault tolerance of Iceberg data writes within Ray Data. By implementing comprehensive retry policies for both storage and catalog operations, it ensures that transient network issues or service unavailability do not lead to data loss or failed write operations, thereby making data pipelines more reliable and resilient.

Highlights
python/ray/data/context.py
```
iceberg_write_file_max_attempts: int = DEFAULT_ICEBERG_WRITE_FILE_MAX_ATTEMPTS
iceberg_write_file_retry_max_backoff_s: int = (
    DEFAULT_ICEBERG_WRITE_FILE_RETRY_MAX_BACKOFF_S
)
iceberg_catalog_max_attempts: int = DEFAULT_ICEBERG_CATALOG_MAX_ATTEMPTS
iceberg_catalog_retry_max_backoff_s: int = (
    DEFAULT_ICEBERG_CATALOG_RETRY_MAX_BACKOFF_S
)
iceberg_catalog_retried_errors: List[str] = field(
    default_factory=lambda: list(DEFAULT_ICEBERG_CATALOG_RETRIED_ERRORS)
)
```
I think adding 5 file-format-specific settings to the top-level DataContext might clutter it. Should we encapsulate this in a single dataclass?
Good call
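One way the suggested encapsulation could look is sketched below. The class name, field names (derived from the diff above), and defaults are all illustrative assumptions, not the merged design.

```python
from dataclasses import dataclass, field
from typing import List


@dataclass
class IcebergRetryConfig:
    """Groups the Iceberg-specific retry knobs behind one DataContext field.

    Defaults here are placeholders; the real values come from the
    DEFAULT_ICEBERG_* environment-backed constants.
    """

    write_file_max_attempts: int = 10
    write_file_retry_max_backoff_s: int = 32
    catalog_max_attempts: int = 10
    catalog_retry_max_backoff_s: int = 16
    catalog_retried_errors: List[str] = field(
        default_factory=lambda: ["ConnectionError", "Timeout"]
    )


# DataContext would then expose a single nested field, e.g.:
# iceberg_retry_config: IcebergRetryConfig = field(default_factory=IcebergRetryConfig)
cfg = IcebergRetryConfig()
```

This keeps the top-level `DataContext` namespace flat while still letting users override individual knobs via `ctx.iceberg_retry_config.catalog_max_attempts = 5`.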
```
DEFAULT_ICEBERG_CATALOG_RETRY_MAX_BACKOFF_S = env_integer(
    "RAY_DATA_ICEBERG_CATALOG_RETRY_MAX_BACKOFF_S", 16
)
DEFAULT_ICEBERG_CATALOG_RETRIED_ERRORS = (
```
Out-of-scope for this PR maybe, but I think it might confuse users that there are both write_file_retry_on_errors and iceberg_catalog_retried_errors.
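For reference, the `env_integer` helper used for these constants follows a common pattern: read an integer override from an environment variable, fall back to a default. The sketch below is an illustrative reimplementation, not Ray's actual helper.

```python
import os


def env_integer(key, default):
    """Return int(os.environ[key]) if set, otherwise the default.

    Sketch of the pattern behind the DEFAULT_ICEBERG_* constants;
    the real helper lives in Ray's internals.
    """
    value = os.environ.get(key)
    return int(value) if value is not None else default


# With no override set in the environment, the default applies.
DEFAULT_ICEBERG_CATALOG_RETRY_MAX_BACKOFF_S = env_integer(
    "RAY_DATA_ICEBERG_CATALOG_RETRY_MAX_BACKOFF_S", 16
)
```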
Signed-off-by: Goutam <goutam@anyscale.com>
## Description
Ensure that high volume writes are retried for better resiliency.
Tested this with the following script on GCP:
```
import logging

import ray

logging.getLogger("ray._common.retry").setLevel(logging.DEBUG)
logging.basicConfig(level=logging.DEBUG)

ds = ray.data.range(10000).repartition(5000)
ds.write_iceberg(
    table_identifier="my_namespace.my_table",
    catalog_kwargs={
        "uri": "https://biglake.googleapis.com/iceberg/v1/restcatalog",
        "warehouse": "gs://my-bucket",
        "auth": {
            "type": "google",
            "google": {
                "scopes": ["https://www.googleapis.com/auth/cloud-platform"],
            },
        },
        "header.x-goog-user-project": "...",
        "header.X-Iceberg-Access-Delegation": "",
    },
)
```
---------
Signed-off-by: Goutam <goutam@anyscale.com>
Signed-off-by: elliot-barn <elliot.barnwell@anyscale.com>