refactor: consolidate snapshot expiration into MaintenanceTable #2143
base: main
Conversation
…h a new Expired Snapshot class. Updated tests.
ValueError: Cannot expire snapshot IDs {3051729675574597004} as they are currently referenced by table refs.
Moved expiration-related methods from `ExpireSnapshots` to `ManageSnapshots` for improved organization and clarity. Updated corresponding pytest tests to reflect these changes.
Re-ran the `poetry run pre-commit run --all-files` command on the project.
Moved the functions for expiring snapshots to their own class.
…ng it in a separate issue. Fixed unrelated changes caused by a fork/branch sync issue.
Co-authored-by: Fokko Driesprong <[email protected]>
Implemented logic to protect branch HEAD snapshots and tagged snapshots from being expired by the `expire_snapshot_by_id` method.
@Fokko @jayceslesar let me know if you two would prefer I stack this PR on #1200, or if you would rather I wait until #1200 is merged into …
Great seeing this PR @ForeverAngry, thanks again for working on this! I'm okay with first merging #1200, but we could also merge this first, and adapt the remove orphan files routine to use …
@Fokko did you decide if you wanted me to stay stacked on the delete orphans PR, or go ahead and prepare the PR for this against the main branch?
a6c3b63 to 9937894
(1) apache#2130, with the addition of the new `deduplicate_data_files` function to the `MaintenanceTable` class. (2) apache#2151, with the removal of the errant member variable from the `ManageSnapshots` class. (3) apache#2150, by adding the additional functions to be at parity with the Java API.
- **Duplicate File Remediation apache#2130**
  - Added `deduplicate_data_files` to the `MaintenanceTable` class.
  - Enables detection and removal of duplicate data files, improving table hygiene and storage efficiency.
- **Support `retainLast` and `setMinSnapshotsToKeep` Snapshot Retention Policies apache#2150**
  - Added new snapshot retention methods to `MaintenanceTable` for feature parity with the Java API:
    - `retain_last_n_snapshots(n)`: Retain only the last N snapshots.
    - `expire_snapshots_older_than_with_retention(timestamp_ms, retain_last_n=None, min_snapshots_to_keep=None)`: Expire snapshots older than a timestamp, with additional retention constraints.
    - `expire_snapshots_with_retention_policy(timestamp_ms=None, retain_last_n=None, min_snapshots_to_keep=None)`: Unified retention policy supporting time-based and count-based constraints.
  - All retention logic respects protected snapshots (branches/tags) and includes guardrails to prevent over-aggressive expiration.

### Bug Fixes & Cleanups

- **Remove unrelated instance variable from the `ManageSnapshots` class apache#2151**
  - Removed an errant member variable from the `ManageSnapshots` class, aligning the implementation with the intended design and the Java reference.

### Testing & Documentation

- Consolidated all snapshot expiration and retention tests into a single file (`test_retention_strategies.py`), covering:
  - Basic expiration by ID and timestamp.
  - Protection of branch/tag snapshots.
  - Retention guardrails and combined policies.
  - Deduplication of data files.
- Added and updated documentation to describe all new retention strategies, deduplication, and API parity improvements.
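The combined retention policies above can be sketched as pure selection logic. This is a rough, self-contained approximation based only on the PR description; `select_expirable` and its parameters are hypothetical names, not the actual `MaintenanceTable` API:

```python
from typing import List, Optional, Set, Tuple

def select_expirable(
    snapshots: List[Tuple[int, int]],      # (snapshot_id, timestamp_ms), oldest first
    protected_ids: Set[int],               # branch/tag snapshot ids, never expired
    older_than_ms: Optional[int] = None,   # time-based constraint
    retain_last_n: Optional[int] = None,   # always keep the latest N snapshots
    min_snapshots_to_keep: Optional[int] = None,
) -> List[int]:
    """Return snapshot ids eligible for expiration under the combined policy."""
    keep = set(protected_ids)
    if retain_last_n is not None:
        keep.update(sid for sid, _ in snapshots[-retain_last_n:])
    candidates = [
        sid
        for sid, ts in snapshots
        if sid not in keep and (older_than_ms is None or ts < older_than_ms)
    ]
    # Guardrail: never let the table drop below the minimum snapshot count.
    if min_snapshots_to_keep is not None:
        max_expirable = max(len(snapshots) - min_snapshots_to_keep, 0)
        candidates = candidates[:max_expirable]
    return candidates

history = [(1, 100), (2, 200), (3, 300), (4, 400)]
select_expirable(history, {4}, older_than_ms=350, retain_last_n=2)          # -> [1, 2]
select_expirable(history, {4}, older_than_ms=350, min_snapshots_to_keep=3)  # -> [1]
```

Note how the count-based guardrails only ever shrink the candidate set, so protected snapshots can never be expired regardless of the time constraint.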
…intenance operations
```python
def _get_protected_snapshot_ids(self, table_metadata: TableMetadata) -> Set[int]:
    """Get the IDs of protected snapshots.

    These are the HEAD snapshots of all branches and all tagged snapshots.
    These ids are to be excluded from expiration.

    Args:
        table_metadata: The table metadata to check for protected snapshots.

    Returns:
        Set of protected snapshot IDs to exclude from expiration.
    """
    from pyiceberg.table.refs import SnapshotRefType

    protected_ids: Set[int] = set()
    for ref in table_metadata.refs.values():
        if ref.snapshot_ref_type in [SnapshotRefType.TAG, SnapshotRefType.BRANCH]:
            protected_ids.add(ref.snapshot_id)
    return protected_ids
```
I do not know the answer to this but is this different than just the refs?
I think that's part of it, but there is a bit more validation around what is eligible to be expired. That being said, I don't think your initial intuition is wrong :), I think it all boils down to that.
@jayceslesar I went back and took a closer look at the refs, and wanted to give a slightly better response than my previous one. To me, the refs file seems like an object model and some enums. If I'm missing something, let me know! I really appreciate your responsiveness and input! 🙏 🚀
`protected_ids` is the same as `set(table.inspect.refs()["snapshot_id"].to_pylist())` is what I was trying to say.
Also the same as `{ref.snapshot_id for ref in tbl.metadata.refs.values()}`, I think.
Gotcha, I'm happy to make that change if you like! Let me know!
Yeah I think if we can rely on existing code that is good!
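To make the equivalence discussed above concrete, here is a minimal sketch with toy stand-ins for `SnapshotRef` and `table_metadata.refs` (the real classes live in `pyiceberg.table.refs`). Since every ref is either a branch or a tag, the type filter in the helper is effectively a no-op and the loop collapses to the set comprehension suggested in review:

```python
from dataclasses import dataclass
from enum import Enum

class SnapshotRefType(str, Enum):
    BRANCH = "branch"
    TAG = "tag"

@dataclass(frozen=True)
class SnapshotRef:
    snapshot_id: int
    snapshot_ref_type: SnapshotRefType

# Toy mapping mimicking table_metadata.refs (ref name -> SnapshotRef).
refs = {
    "main": SnapshotRef(3051729675574597004, SnapshotRefType.BRANCH),
    "v1": SnapshotRef(1234, SnapshotRefType.TAG),
}

# The helper's loop...
protected_ids = set()
for ref in refs.values():
    if ref.snapshot_ref_type in (SnapshotRefType.TAG, SnapshotRefType.BRANCH):
        protected_ids.add(ref.snapshot_id)

# ...produces the same set as the one-liner from the review thread.
assert protected_ids == {ref.snapshot_id for ref in refs.values()}
```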
…Table

The `deduplicate_data_files()` method was not properly removing duplicate data file references from Iceberg tables. After deduplication, multiple references to the same data file remained instead of the expected single reference.

Root causes:
1. `_get_all_datafiles()` was scanning ALL snapshots instead of the current one only
2. Incorrect transaction API usage that didn't leverage snapshot updates
3. Missing proper overwrite logic to create clean deduplicated snapshots

Key fixes:
- Modified `_get_all_datafiles()` to scan only current snapshot manifests
- Implemented proper transaction pattern using `update_snapshot().overwrite()`
- Added explicit `delete_data_file()` calls for duplicates plus `append_data_file()` for unique files
- Removed unused helper methods `_get_all_datafiles_with_context()` and `_detect_duplicates()`

Technical details:
- Deduplication now operates on ManifestEntry objects from the current snapshot only
- Files are grouped by basename and the first occurrence is kept as the canonical reference
- The new snapshot atomically replaces the current snapshot with the deduplicated file list
- Proper Iceberg transaction semantics ensure data consistency

Tests: All deduplication tests now pass, including the previously failing `test_deduplicate_data_files_removes_duplicates_in_current_snapshot`

Fixes: Table maintenance deduplication functionality
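The grouping step described in the commit message ("files are grouped by basename and the first occurrence is kept as canonical") can be illustrated in isolation. This is an illustrative sketch, not the PR's code; `dedup_by_basename` is a hypothetical helper operating on plain path strings:

```python
import os
from collections import defaultdict

def dedup_by_basename(file_paths):
    """Group paths by basename; keep the first occurrence of each name as
    canonical and report the rest as duplicates."""
    groups = defaultdict(list)
    for path in file_paths:
        groups[os.path.basename(path)].append(path)
    canonical, duplicates = [], []
    for paths_for_name in groups.values():
        canonical.append(paths_for_name[0])   # first occurrence wins
        duplicates.extend(paths_for_name[1:])  # everything else is removable
    return canonical, duplicates

paths = [
    "s3://bucket/tbl/data/a.parquet",
    "s3://bucket/tbl/data/b.parquet",
    "s3://bucket/tbl/data2/a.parquet",  # same basename, different directory
]
kept, dupes = dedup_by_basename(paths)
# kept  -> first occurrence of each basename
# dupes -> ["s3://bucket/tbl/data2/a.parquet"]
```

Note that keying on basename alone assumes duplicate files always share a file name; two genuinely different files that happen to share a basename would be treated as duplicates under this scheme.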
…ion context in MaintenanceTable
@Fokko @jayceslesar, I wasn't sure when #1200 was going to be merged, and July tends to be pretty busy for me, so I thought I would use the framework for the …
```python
def _get_all_datafiles(self) -> List[DataFile]:
    """Collect all DataFiles in the current snapshot only."""
    datafiles: List[DataFile] = []

    current_snapshot = self.tbl.current_snapshot()
    if not current_snapshot:
        return datafiles

    def process_manifest(manifest: ManifestFile) -> list[DataFile]:
        found: list[DataFile] = []
        for entry in manifest.fetch_manifest_entry(io=self.tbl.io, discard_deleted=True):
            if hasattr(entry, "data_file"):
                found.append(entry.data_file)
        return found

    # Scan only the current snapshot's manifests
    manifests = current_snapshot.manifests(io=self.tbl.io)
    with ThreadPoolExecutor() as executor:
        results = executor.map(process_manifest, manifests)
        for res in results:
            datafiles.extend(res)

    return datafiles
```
Similar to above, why can't you use `iceberg-python/pyiceberg/table/inspect.py`, line 665 in 9c99f32:

```python
def data_files(self, snapshot_id: Optional[int] = None) -> "pa.Table":
```
Hi @jayceslesar, yeah, I agree it's similar. I actually looked at using `inspect` originally, and tried to use `DataFile.from_args()` to go from the JSON object back to a `DataFile`; however, I couldn't seem to find a way to get this to work right after trying a few different approaches. This was the easiest way I could think of. If you have an idea in mind, or know what I was missing, let me know!
…nce deduplication tests
This is gonna be awesome, left a few comments.
One general question: I don't see us making use of the following table properties anywhere, and I think we should favor them when the user doesn't specify values:
history.expire.max-snapshot-age-ms
history.expire.min-snapshots-to-keep
history.expire.max-ref-age-ms
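A sketch of the fallback pattern the reviewer is suggesting: prefer a user-set table property, otherwise fall back to a default. The default values below are my reading of Iceberg's documented defaults (5 days, 1 snapshot, no ref-age limit), and the `resolve` helper is hypothetical, not pyiceberg API:

```python
# Assumed defaults, mirroring the documented Iceberg table properties.
DEFAULTS = {
    "history.expire.max-snapshot-age-ms": 5 * 24 * 60 * 60 * 1000,  # 5 days
    "history.expire.min-snapshots-to-keep": 1,
    "history.expire.max-ref-age-ms": None,  # effectively "forever"
}

def resolve(properties: dict, key: str, caster=int):
    """Prefer a user-set table property; otherwise use the documented default."""
    raw = properties.get(key)
    if raw is None:
        return DEFAULTS[key]
    return caster(raw)  # table properties are stored as strings

props = {"history.expire.min-snapshots-to-keep": "5"}
resolve(props, "history.expire.min-snapshots-to-keep")  # -> 5
resolve(props, "history.expire.max-snapshot-age-ms")    # -> 432000000
```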
```diff
-        executor = ExecutorFactory.get_or_create()
-        results = list(
-            executor.map(
-                lambda manifest_list: self._get_files_from_manifest(manifest_list, data_file_filter), snapshot.manifests(io)
-            )
-        )
-        return pa.concat_tables(results)
+        files_table: list[pa.Table] = []
+        for manifest_list in snapshot.manifests(io):
+            files_table.append(self._get_files_from_manifest(manifest_list, data_file_filter))
+        return pa.concat_tables(files_table)
```
this is de-parallelized? Is that intentional?
Nope, it was not :) Good catch!
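For reference, `Executor.map` preserves input order, so the parallel and sequential forms of this scan produce identical output; a toy demonstration with a stand-in for `_get_files_from_manifest` (names here are illustrative only):

```python
from concurrent.futures import ThreadPoolExecutor

def load(manifest: int) -> list[int]:
    # Stand-in for self._get_files_from_manifest(manifest, ...).
    return [manifest * 10, manifest * 10 + 1]

manifests = [1, 2, 3]

# Sequential form (what the diff under review introduced).
sequential: list[int] = []
for m in manifests:
    sequential.extend(load(m))

# Parallel form (what was there before): executor.map yields results
# in input order, so flattening gives the same list.
with ThreadPoolExecutor() as executor:
    parallel = [item for chunk in executor.map(load, manifests) for item in chunk]

assert sequential == parallel
```

Since the outputs match, restoring the executor version only changes wall-clock time, not results.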
```diff
 def all_manifests(self, snapshots: Optional[Union[list[Snapshot], list[int]]] = None) -> "pa.Table":
     import pyarrow as pa

-    snapshots = self.tbl.snapshots()
+    # coerce into snapshot objects if the user passes in snapshot ids
+    if snapshots is not None:
+        if isinstance(snapshots[0], int):
+            snapshots = [
+                snapshot
+                for snapshot_id in snapshots
+                if (snapshot := self.tbl.metadata.snapshot_by_id(snapshot_id)) is not None
+            ]
+    else:
+        snapshots = self.tbl.snapshots()
```
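The id-to-object coercion above can be exercised standalone with a toy `Snapshot` and lookup table (stand-ins for pyiceberg's real classes; `_SNAPSHOTS` and `coerce` are hypothetical names). Unknown ids are silently dropped by the walrus-guarded comprehension:

```python
from dataclasses import dataclass
from typing import Optional, Union

@dataclass
class Snapshot:
    snapshot_id: int

# Toy lookup, mimicking tbl.metadata.snapshot_by_id.
_SNAPSHOTS = {1: Snapshot(1), 2: Snapshot(2)}

def coerce(snapshots: Optional[Union[list[Snapshot], list[int]]]) -> list[Snapshot]:
    if snapshots is not None:
        if isinstance(snapshots[0], int):
            # Keep only ids that resolve; the walrus binds the lookup result.
            snapshots = [
                snap for sid in snapshots
                if (snap := _SNAPSHOTS.get(sid)) is not None
            ]
    else:
        snapshots = list(_SNAPSHOTS.values())
    return snapshots

coerce([1, 2, 99])  # -> [Snapshot(1), Snapshot(2)]; unknown id 99 is dropped
coerce(None)        # -> all snapshots
```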
I might have written this and it got cherry-picked in, but I think it's simpler to only allow `Snapshot` objects until there is a need to allow either.
tl;dr: Sounds good, I'll make the change!
I originally forked your branch so I could stack my PR on your “Delete Orphans” PR. As July began, my schedule looked pretty rough, so I converted my draft into a PR against the main pyiceberg branch—since I wasn’t sure how much time I’d have later in the month—but I forgot to rebase. As a result, I inadvertently removed the code for deleting orphans while keeping your MaintenanceTable implementation in a more… manual way 😟, so there may still be some remnants. They say the best form of flattery is imitation 😉.
```python
def expire_snapshots_by_ids(self, snapshot_ids: List[int]) -> None:
    """Expire multiple snapshots by their IDs.
```
Does this signature exist upstream? https://iceberg.apache.org/javadoc/1.9.1/org/apache/iceberg/ExpireSnapshots.html If not, I don't think we should add it here, or at least we should make it private.
I'll check if it exists upstream; if it does, I'll refactor to `expire_snapshots` (I will also check if that exists upstream). Otherwise I will make it private!
```python
try:
    import pyarrow as pa  # noqa
except ModuleNotFoundError as e:
    raise ModuleNotFoundError("For metadata operations PyArrow needs to be installed") from e
```
I don't think we need this without the orphan code included. I'm also thinking of ways we can achieve this without pyarrow, so I think it's safe to remove.
```python
def deduplicate_data_files(self) -> List[DataFile]:
    """
    Remove duplicate data files from an Iceberg table.

    Returns:
        List of removed DataFile objects.
    """
    import os
    from collections import defaultdict
```
maybe logic for this should be added in a different PR?
Yeah, this was one of those selfish things. I was hoping to get it into the next version for an issue I was having. If you feel strongly about it being a separate PR, I can do that :), but if you think we can let it slide, it would solve some headaches for me sooner rather than later.
### Rationale for this change

Consolidates the `ExpireSnapshots` class into the `MaintenanceTable` class for a unified maintenance API.

### Features & Enhancements

- **Duplicate Data File Remediation (#2130)**: Added `deduplicate_data_files` to `MaintenanceTable`.
- **Advanced Snapshot Retention (#2150)**:
  - `retain_last_n_snapshots(n)` — Retain only the latest N snapshots.
  - `expire_snapshots_older_than_with_retention(timestamp_ms, retain_last_n=None, min_snapshots_to_keep=None)` — Expire snapshots older than a given timestamp, with optional retention constraints.
  - `expire_snapshots_with_retention_policy(timestamp_ms=None, retain_last_n=None, min_snapshots_to_keep=None)` — Unified retention policy supporting both time-based and count-based constraints.
  - … `expire_snapshots_older_than` method.

### Bug Fixes & Cleanups

- Removed an errant member variable from the `ManageSnapshots` class, aligning with the Java reference implementation.

### Testing & Documentation

- Consolidated all snapshot expiration and retention tests into `test_retention_strategies.py`, including: …

### Are these changes tested?

Yes. All changes are tested. This work builds on the framework introduced by @jayceslesar in #1200 for the `MaintenanceTable`, with this PR predicated on the final changes from #1200.

### Are there any user-facing changes?

**Breaking Changes:**

- Moved `ExpireSnapshots` functionality to `MaintenanceTable`
- Removed the `ExpireSnapshots` class entirely
- New `table.maintenance.*` API

**API Changes**

Before:

Now:

### Closes

- Support `retainLast` and `setMinSnapshotsToKeep` Snapshot Retention Policies #2150
- Remove unrelated instance variable from the `ManageSnapshots` class #2151