Fix case where messages can be written multiple times in Iceberg#29251
Conversation
f3491fb to
2e688f2
Compare
There was a problem hiding this comment.
Pull request overview
This PR fixes a bug where Iceberg messages could be written multiple times when disk or memory reservation errors occur after a successful row write. The issue arose because serde_parquet_writer::add_data_struct would return an error after successfully writing a row, causing callers to retry the write.
Changes:
- Modified error handling to defer reporting of post-write reservation errors to the next call
- Added an
_errorfield to track deferred errors - Added test infrastructure to inject OOM conditions for validation
Reviewed changes
Copilot reviewed 4 out of 4 changed files in this pull request and generated 3 comments.
| File | Description |
|---|---|
src/v/datalake/serde_parquet_writer.h |
Added _error field to store deferred errors |
src/v/datalake/serde_parquet_writer.cc |
Updated error handling to defer post-write errors and return success when row write succeeds |
src/v/datalake/tests/test_data_writer.h |
Added OOM injection capability to noop_mem_tracker for testing |
src/v/datalake/tests/serde_parquet_writer_test.cc |
Added test case validating correct behavior when OOM occurs after row write |
| if (_error != writer_error::ok) { | ||
| co_return std::exchange(_error, writer_error::ok); | ||
| } |
There was a problem hiding this comment.
There's a potential race condition if add_data_struct can be called concurrently. The check-and-exchange pattern on _error is not atomic, which could lead to lost errors or multiple coroutines seeing the same error. Consider adding synchronization or documenting that this method must not be called concurrently.
There was a problem hiding this comment.
It can never be called concurrently from different threads and therefore there is no need for std::exchange to be atomic.
There was a problem hiding this comment.
do we need to std::exchange()? Just retain the _error state once it is set?
There was a problem hiding this comment.
The local_parquet_file_writer has a similar mechanism that prevents further writes once the serde_parquet_writer returns an error once. So I think the behavior will be the same whether or not we clear the error in serde_parquet_writer, at least for any user of local_parquet_file_writer. Will remove the exchange.
| // Hence, the current solution is to return those errors on the subsequent | ||
| // call to `add_data_struct`. |
There was a problem hiding this comment.
The comment should clarify what happens if the error is never consumed (e.g., if add_data_struct is not called again before finish()). Document whether finish() checks _error or if the caller needs to ensure one final call to add_data_struct to retrieve deferred errors.
| // Hence, the current solution is to return those errors on the subsequent | |
| // call to `add_data_struct`. | |
| // Hence, the current solution is to defer those errors by storing them in | |
| // `_error` and returning them on the subsequent call to `add_data_struct`. | |
| // Callers must ensure that any deferred error is observed (either via a | |
| // follow-up call to `add_data_struct` or by the finalization path, e.g. | |
| // `finish()` inspecting `_error`) before treating the writer as | |
| // successfully completed. |
2e688f2 to
59fd83b
Compare
59fd83b to
2e51457
Compare
Retry command for Build#79055please wait until all jobs are finished before running the slash command |
CI test resultstest results on build#79055
test results on build#79607
|
|
/ci-repeat 1 |
bharathv
left a comment
There was a problem hiding this comment.
lgtm just one minor question.
| if (_error != writer_error::ok) { | ||
| co_return std::exchange(_error, writer_error::ok); | ||
| } |
There was a problem hiding this comment.
do we need to std::exchange()? Just retain the _error state once it is set?
Callers of `serde_parquet_writer::add_data_struct` assume that the data struct wasn't successfully written to the file if `writer_error::ok` isn't returned. However, it's possible for `_writer.write_row(...)` to be successful then have some disk/memory reservation error. Prior to this commit these errors would result in the message being written twice as the callers would assume that the first write had failed.
In a previous commit `serde_parquet_writer` was changed to always return an `ok` result when the record is written. This resulted in the flush at the translation_task level to no longer return a no data error. The unit test is therefore modified in this commit to reflect the new behavior.
2e51457 to
7f506e3
Compare
|
/backport v25.3.x |
|
/backport v25.2.x |
|
/backport v25.1.x |
|
Failed to create a backport PR to v25.1.x branch. I tried: |
|
Failed to create a backport PR to v25.2.x branch. I tried: |
Callers of
serde_parquet_writer::add_data_structassume that the data struct wasn't successfully written to the file ifwriter_error::okisn't returned. However, it's possible for_writer.write_row(...)to be successful then have some disk/memory reservation error. Prior to this PR these errors would result in the message being written twice as the callers would assume that the first write had failed.Backports Required
Release Notes
Bug Fixes