Commit 14fe460

Introduce branching and support for spark.wap.branch (#406)
## Summary

Implement Iceberg table branching functionality in the OpenHouse internal catalog, with comprehensive testing infrastructure.

Strategically, we trust that only a valid client performs the operation. This is safe because, in the OSS Iceberg model, the client writes metadata.json and sends the pointer to the catalog server. GlueCatalog (e.g. GlueTableOperations.java), DynamoDbCatalog, and JdbcTableOperations.java likewise perform client-only metadata operations, with no server-side validation. The client behavior is also covered extensively by the Iceberg compatibility tests applied to OpenHouse.

Since we trust the client, we simply serialize the client request and later deserialize it. Because TableMetadata fields lack public access and only some are exposed via the builder, we start from the base metadata, compare it against the deserialized client request, and apply the difference: removing snapshot IDs present in the base but absent from the request, adding new IDs from the request, and moving the current pointer according to the request. (A usage sketch of the resulting client-side flow follows the checklists below.)

### How to test

OH suite tests (this branch):

```
cd /Users/chbush/code/li/project_openhouse/openhouse && \
export JAVA_HOME=$(/usr/libexec/java_home -v 17) && \
./gradlew clean \
  :integrations:spark:spark-3.5:openhouse-spark-3.5-itest:catalogTest \
  --tests "com.linkedin.openhouse.spark.catalogtest.BranchTestSpark3_5" \
  :iceberg:openhouse:internalcatalog:test \
  --tests "com.linkedin.openhouse.internal.catalog.SnapshotDiffApplierTest" \
  --tests "com.linkedin.openhouse.internal.catalog.OpenHouseInternalTableOperationsTest" \
  :integrations:spark:spark-3.1:openhouse-spark-itest:catalogTest \
  --tests "com.linkedin.openhouse.spark.catalogtest.WapIdTest" \
  :apps:openhouse-spark-apps-1.5_2.12:test \
  --tests "com.linkedin.openhouse.catalog.e2e.WapIdJavaTest"
```

Compatibility tests (this branch plus new shim files; also requires this Iceberg fork):

- https://github.com/cbb330/openhouse/tree/working-wap-compatibility
- https://github.com/cbb330/iceberg/tree/test-harness

```
# 1. Clean up local maven repo to ensure fresh publish
rm -rf /Users/chbush/code/li/project_openhouse/openhouse/build && \
rm -rf /Users/chbush/.m2/repository/com/linkedin/openhouse/tables-test-fixtures* && \

# 2. Build and publish the fixtures jar (containing the new test list)
cd /Users/chbush/code/li/project_openhouse/openhouse && \
export JAVA_HOME=$(/usr/libexec/java_home -v 17) && \
./gradlew :tables-test-fixtures:tables-test-fixtures-iceberg-1.5_2.12:clean \
  :tables-test-fixtures:tables-test-fixtures-iceberg-1.5_2.12:shadowJar \
  :tables-test-fixtures:tables-test-fixtures-iceberg-1.5_2.12:publishToMavenLocal && \

# 3. Run the compatibility tests
cd /Users/chbush/code/li/project_openhouse/iceberg && \
export JAVA_HOME=$(/usr/libexec/java_home -v 17) && \
./gradlew :iceberg-core:cleanOpenhouseCompatibilityTest \
  :iceberg-spark:iceberg-spark-3.5_2.12:cleanOpenhouseCompatibilityTest \
  :iceberg-spark:iceberg-spark-extensions-3.5_2.12:cleanOpenhouseCompatibilityTest \
  :iceberg-core:openhouseCompatibilityTest \
  :iceberg-spark:iceberg-spark-3.5_2.12:openhouseCompatibilityTest \
  :iceberg-spark:iceberg-spark-extensions-3.5_2.12:openhouseCompatibilityTest \
  -PopenhouseCompatibilityCoordinate=com.linkedin.openhouse:tables-test-fixtures-iceberg-1.5_2.12:0.0.19-SNAPSHOT:uber
```

## Changes

- [ ] Client-facing API Changes
- [ ] Internal API Changes
- [ ] Bug Fixes
- [ ] New Features
- [ ] Performance Improvements
- [ ] Code Style
- [ ] Refactoring
- [ ] Documentation
- [ ] Tests

For all the boxes checked, please include additional details of the changes made in this pull request.

## Testing Done

<!--- Check any relevant boxes with "x" -->

- [ ] Manually Tested on local docker setup. Please include commands ran, and their output.
- [ ] Added new tests for the changes made.
- [ ] Updated existing tests to reflect the changes made.
- [ ] No tests added or updated. Please explain why. If unsure, please feel free to ask for help.
- [ ] Some other form of testing like staging or soak time in production. Please explain.

For all the boxes checked, include a detailed description of the testing done for the changes made in this pull request.

## Additional Information

- [ ] Breaking Changes
- [ ] Deprecations
- [ ] Large PR broken into smaller PRs, and PR plan linked in the description.

For all the boxes checked, include additional details of the changes made in this pull request.
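For reference, the client-side write-audit-publish flow this commit enables looks roughly like the sketch below. This is a minimal illustration, not code from the PR: it assumes an `openhouse` Spark catalog is already configured, and the database, table, and branch names are made up. It relies only on standard Iceberg Spark machinery (`write.wap.enabled`, the `spark.wap.branch` session conf, and the `fast_forward` procedure), which is the same machinery exercised by tests like `BranchTestSpark3_5` and `WapIdJavaTest`.

```java
import org.apache.spark.sql.SparkSession;

public class WapBranchExample {
  public static void main(String[] args) {
    // Assumes an "openhouse" Iceberg catalog is configured for this session.
    SparkSession spark =
        SparkSession.builder().appName("wap-branch-example").master("local[*]").getOrCreate();

    // Illustrative table; enable write-audit-publish via the standard Iceberg property.
    spark.sql("CREATE TABLE openhouse.db.t (id INT)");
    spark.sql("ALTER TABLE openhouse.db.t SET TBLPROPERTIES ('write.wap.enabled'='true')");

    // Route subsequent writes to an audit branch instead of main.
    spark.conf().set("spark.wap.branch", "audit_branch");
    spark.sql("INSERT INTO openhouse.db.t VALUES (1), (2)");

    // Audit the staged rows; sessions without spark.wap.branch still read main.
    spark.sql("SELECT count(*) FROM openhouse.db.t").show();

    // Publish: fast-forward main to the audited branch (Iceberg Spark procedure).
    spark.sql("CALL openhouse.system.fast_forward('db.t', 'main', 'audit_branch')");

    spark.stop();
  }
}
```

On the server side, each such commit arrives as a serialized snapshot list plus refs, and `doCommit` reconciles them against the base metadata as described in the summary (steps 1 through 4 in the `OpenHouseInternalTableOperations` diff below).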
1 parent: ff9a40c

14 files changed: +3281 −642 lines

iceberg/openhouse/internalcatalog/src/main/java/com/linkedin/openhouse/internal/catalog/OpenHouseInternalCatalog.java

Lines changed: 5 additions & 5 deletions
```diff
@@ -59,21 +59,21 @@ public class OpenHouseInternalCatalog extends BaseMetastoreCatalog {
 
   @Autowired StorageType storageType;
 
-  @Autowired SnapshotInspector snapshotInspector;
-
   @Autowired HouseTableMapper houseTableMapper;
 
   @Autowired MeterRegistry meterRegistry;
 
   @Override
   protected TableOperations newTableOps(TableIdentifier tableIdentifier) {
+    FileIO fileIO = resolveFileIO(tableIdentifier);
+    MetricsReporter metricsReporter =
+        new MetricsReporter(this.meterRegistry, METRICS_PREFIX, Lists.newArrayList());
     return new OpenHouseInternalTableOperations(
         houseTableRepository,
-        resolveFileIO(tableIdentifier),
-        snapshotInspector,
+        fileIO,
         houseTableMapper,
         tableIdentifier,
-        new MetricsReporter(this.meterRegistry, METRICS_PREFIX, Lists.newArrayList()),
+        metricsReporter,
         fileIOManager);
   }
 
```
iceberg/openhouse/internalcatalog/src/main/java/com/linkedin/openhouse/internal/catalog/OpenHouseInternalTableOperations.java

Lines changed: 40 additions & 141 deletions
```diff
@@ -23,7 +23,6 @@
 import java.io.IOException;
 import java.time.Clock;
 import java.time.Instant;
-import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -34,8 +33,6 @@
 import java.util.stream.Collectors;
 import lombok.AllArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.commons.collections.CollectionUtils;
-import org.apache.commons.collections.MapUtils;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.iceberg.BaseMetastoreTableOperations;
 import org.apache.iceberg.PartitionField;
@@ -44,7 +41,6 @@
 import org.apache.iceberg.SchemaParser;
 import org.apache.iceberg.Snapshot;
 import org.apache.iceberg.SnapshotRef;
-import org.apache.iceberg.SnapshotSummary;
 import org.apache.iceberg.SortDirection;
 import org.apache.iceberg.SortField;
 import org.apache.iceberg.SortOrder;
@@ -56,11 +52,11 @@
 import org.apache.iceberg.exceptions.BadRequestException;
 import org.apache.iceberg.exceptions.CommitFailedException;
 import org.apache.iceberg.exceptions.CommitStateUnknownException;
+import org.apache.iceberg.exceptions.ValidationException;
 import org.apache.iceberg.expressions.Expressions;
 import org.apache.iceberg.expressions.Term;
 import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.relocated.com.google.common.base.Objects;
-import org.springframework.data.util.Pair;
 
 @AllArgsConstructor
 @Slf4j
@@ -70,8 +66,6 @@ public class OpenHouseInternalTableOperations extends BaseMetastoreTableOperations {
 
   FileIO fileIO;
 
-  SnapshotInspector snapshotInspector;
-
   HouseTableMapper houseTableMapper;
 
   TableIdentifier tableIdentifier;
@@ -267,32 +261,56 @@ protected void doCommit(TableMetadata base, TableMetadata metadata) {
     String sortOrderJson = properties.remove(CatalogConstants.SORT_ORDER_KEY);
     logPropertiesMap(properties);
 
-    TableMetadata updatedMetadata = metadata.replaceProperties(properties);
+    TableMetadata metadataToCommit = metadata.replaceProperties(properties);
 
     if (sortOrderJson != null) {
-      SortOrder sortOrder = SortOrderParser.fromJson(updatedMetadata.schema(), sortOrderJson);
-      updatedMetadata = updatedMetadata.replaceSortOrder(sortOrder);
+      SortOrder sortOrder = SortOrderParser.fromJson(metadataToCommit.schema(), sortOrderJson);
+      metadataToCommit = metadataToCommit.replaceSortOrder(sortOrder);
     }
 
     if (serializedSnapshotsToPut != null) {
       List<Snapshot> snapshotsToPut =
           SnapshotsUtil.parseSnapshots(fileIO, serializedSnapshotsToPut);
-      Pair<List<Snapshot>, List<Snapshot>> snapshotsDiff =
-          SnapshotsUtil.symmetricDifferenceSplit(snapshotsToPut, updatedMetadata.snapshots());
-      List<Snapshot> appendedSnapshots = snapshotsDiff.getFirst();
-      List<Snapshot> deletedSnapshots = snapshotsDiff.getSecond();
-      snapshotInspector.validateSnapshotsUpdate(
-          updatedMetadata, appendedSnapshots, deletedSnapshots);
       Map<String, SnapshotRef> snapshotRefs =
           serializedSnapshotRefs == null
              ? new HashMap<>()
              : SnapshotsUtil.parseSnapshotRefs(serializedSnapshotRefs);
-      updatedMetadata =
-          maybeAppendSnapshots(updatedMetadata, appendedSnapshots, snapshotRefs, true);
-      updatedMetadata = maybeDeleteSnapshots(updatedMetadata, deletedSnapshots);
+
+      TableMetadata.Builder builder = TableMetadata.buildFrom(metadataToCommit);
+
+      // 1. Identify which snapshots are new vs existing
+      Set<Long> existingSnapshotIds =
+          metadataToCommit.snapshots().stream()
+              .map(Snapshot::snapshotId)
+              .collect(Collectors.toSet());
+      Set<Long> newSnapshotIds =
+          snapshotsToPut.stream().map(Snapshot::snapshotId).collect(Collectors.toSet());
+
+      // 2. Add new snapshots
+      snapshotsToPut.stream()
+          .filter(s -> !existingSnapshotIds.contains(s.snapshotId()))
+          .forEach(builder::addSnapshot);
+
+      // 3. Remove snapshots that are no longer present in the client payload
+      List<Long> toRemove =
+          existingSnapshotIds.stream()
+              .filter(id -> !newSnapshotIds.contains(id))
+              .collect(Collectors.toList());
+      if (!toRemove.isEmpty()) {
+        builder.removeSnapshots(toRemove);
+      }
+
+      // 4. Sync refs: remove refs not in the payload, set/update refs from the payload
+      metadataToCommit.refs().keySet().stream()
+          .filter(ref -> !snapshotRefs.containsKey(ref))
+          .forEach(builder::removeRef);
+
+      snapshotRefs.forEach(builder::setRef);
+
+      metadataToCommit = builder.build();
     }
 
-    final TableMetadata updatedMtDataRef = updatedMetadata;
+    final TableMetadata updatedMtDataRef = metadataToCommit;
     long metadataUpdateStartTime = System.currentTimeMillis();
     try {
       metricsReporter.executeWithStats(
@@ -314,7 +332,7 @@ updatedMtDataRef, io().newOutputFile(newMetadataLocation)),
       throw e;
     }
 
-    houseTable = houseTableMapper.toHouseTable(updatedMetadata, fileIO);
+    houseTable = houseTableMapper.toHouseTable(metadataToCommit, fileIO);
     if (base != null
         && (properties.containsKey(CatalogConstants.OPENHOUSE_TABLEID_KEY)
             && !properties
@@ -358,7 +376,7 @@ updatedMtDataRef, io().newOutputFile(newMetadataLocation)),
             e);
       }
       throw new CommitFailedException(ioe);
-    } catch (InvalidIcebergSnapshotException e) {
+    } catch (InvalidIcebergSnapshotException | IllegalArgumentException | ValidationException e) {
       throw new BadRequestException(e, e.getMessage());
     } catch (CommitFailedException e) {
       throw e;
@@ -531,125 +549,6 @@ private void failIfRetryUpdate(Map<String, String> properties) {
     }
   }
 
-  public TableMetadata maybeDeleteSnapshots(
-      TableMetadata metadata, List<Snapshot> snapshotsToDelete) {
-    TableMetadata result = metadata;
-    if (CollectionUtils.isNotEmpty(snapshotsToDelete)) {
-      Set<Long> snapshotIds =
-          snapshotsToDelete.stream().map(Snapshot::snapshotId).collect(Collectors.toSet());
-      Map<String, String> updatedProperties = new HashMap<>(result.properties());
-      updatedProperties.put(
-          getCanonicalFieldName(CatalogConstants.DELETED_SNAPSHOTS),
-          snapshotsToDelete.stream()
-              .map(s -> Long.toString(s.snapshotId()))
-              .collect(Collectors.joining(",")));
-      result =
-          TableMetadata.buildFrom(result)
-              .setProperties(updatedProperties)
-              .build()
-              .removeSnapshotsIf(s -> snapshotIds.contains(s.snapshotId()));
-      metricsReporter.count(
-          InternalCatalogMetricsConstant.SNAPSHOTS_DELETED_CTR, snapshotsToDelete.size());
-    }
-    return result;
-  }
-
-  public TableMetadata maybeAppendSnapshots(
-      TableMetadata metadata,
-      List<Snapshot> snapshotsToAppend,
-      Map<String, SnapshotRef> snapshotRefs,
-      boolean recordAction) {
-    TableMetadata.Builder metadataBuilder = TableMetadata.buildFrom(metadata);
-    List<String> appendedSnapshots = new ArrayList<>();
-    List<String> stagedSnapshots = new ArrayList<>();
-    List<String> cherryPickedSnapshots = new ArrayList<>();
-    // Throw an exception if client sent request that included non-main branches in the
-    // snapshotRefs.
-    for (Map.Entry<String, SnapshotRef> entry : snapshotRefs.entrySet()) {
-      if (!entry.getKey().equals(SnapshotRef.MAIN_BRANCH)) {
-        throw new UnsupportedOperationException("OpenHouse supports only MAIN branch");
-      }
-    }
-    /**
-     * First check if there are new snapshots to be appended to current TableMetadata. If yes,
-     * following are the cases to be handled:
-     *
-     * <p>[1] A regular (non-wap) snapshot is being added to the MAIN branch.
-     *
-     * <p>[2] A staged (wap) snapshot is being created on top of current snapshot as its base.
-     * Recognized by STAGED_WAP_ID_PROP.
-     *
-     * <p>[3] A staged (wap) snapshot is being cherry picked to the MAIN branch wherein current
-     * snapshot in the MAIN branch is not the same as the base snapshot the staged (wap) snapshot
-     * was created on. Recognized by SOURCE_SNAPSHOT_ID_PROP. This case is called non-fast forward
-     * cherry pick.
-     *
-     * <p>In case no new snapshots are to be appended to current TableMetadata, there could be a
-     * cherrypick of a staged (wap) snapshot on top of the current snapshot in the MAIN branch which
-     * is the same as the base snapshot the staged (wap) snapshot was created on. This case is
-     * called fast forward cherry pick.
-     */
-    if (CollectionUtils.isNotEmpty(snapshotsToAppend)) {
-      for (Snapshot snapshot : snapshotsToAppend) {
-        snapshotInspector.validateSnapshot(snapshot);
-        if (snapshot.summary().containsKey(SnapshotSummary.STAGED_WAP_ID_PROP)) {
-          // a stage only snapshot using wap.id
-          metadataBuilder.addSnapshot(snapshot);
-          stagedSnapshots.add(String.valueOf(snapshot.snapshotId()));
-        } else if (snapshot.summary().containsKey(SnapshotSummary.SOURCE_SNAPSHOT_ID_PROP)) {
-          // a snapshot created on a non fast-forward cherry-pick snapshot
-          metadataBuilder.setBranchSnapshot(snapshot, SnapshotRef.MAIN_BRANCH);
-          appendedSnapshots.add(String.valueOf(snapshot.snapshotId()));
-          cherryPickedSnapshots.add(
-              String.valueOf(snapshot.summary().get(SnapshotSummary.SOURCE_SNAPSHOT_ID_PROP)));
-        } else {
-          // a regular snapshot
-          metadataBuilder.setBranchSnapshot(snapshot, SnapshotRef.MAIN_BRANCH);
-          appendedSnapshots.add(String.valueOf(snapshot.snapshotId()));
-        }
-      }
-    } else if (MapUtils.isNotEmpty(snapshotRefs)) {
-      // Updated ref in the main branch with no new snapshot means this is a
-      // fast-forward cherry-pick or rollback operation.
-      long newSnapshotId = snapshotRefs.get(SnapshotRef.MAIN_BRANCH).snapshotId();
-      // Either the current snapshot is null or the current snapshot is not equal
-      // to the new snapshot indicates an update. The first case happens when the
-      // stage/wap snapshot being cherry-picked is the first snapshot.
-      if (MapUtils.isEmpty(metadata.refs())
-          || metadata.refs().get(SnapshotRef.MAIN_BRANCH).snapshotId() != newSnapshotId) {
-        metadataBuilder.setBranchSnapshot(newSnapshotId, SnapshotRef.MAIN_BRANCH);
-        cherryPickedSnapshots.add(String.valueOf(newSnapshotId));
-      }
-    }
-    if (recordAction) {
-      Map<String, String> updatedProperties = new HashMap<>(metadata.properties());
-      if (CollectionUtils.isNotEmpty(appendedSnapshots)) {
-        updatedProperties.put(
-            getCanonicalFieldName(CatalogConstants.APPENDED_SNAPSHOTS),
-            appendedSnapshots.stream().collect(Collectors.joining(",")));
-        metricsReporter.count(
-            InternalCatalogMetricsConstant.SNAPSHOTS_ADDED_CTR, appendedSnapshots.size());
-      }
-      if (CollectionUtils.isNotEmpty(stagedSnapshots)) {
-        updatedProperties.put(
-            getCanonicalFieldName(CatalogConstants.STAGED_SNAPSHOTS),
-            stagedSnapshots.stream().collect(Collectors.joining(",")));
-        metricsReporter.count(
-            InternalCatalogMetricsConstant.SNAPSHOTS_STAGED_CTR, stagedSnapshots.size());
-      }
-      if (CollectionUtils.isNotEmpty(cherryPickedSnapshots)) {
-        updatedProperties.put(
-            getCanonicalFieldName(CatalogConstants.CHERRY_PICKED_SNAPSHOTS),
-            cherryPickedSnapshots.stream().collect(Collectors.joining(",")));
-        metricsReporter.count(
-            InternalCatalogMetricsConstant.SNAPSHOTS_CHERRY_PICKED_CTR,
-            cherryPickedSnapshots.size());
-      }
-      metadataBuilder.setProperties(updatedProperties);
-    }
-    return metadataBuilder.build();
-  }
-
   /** Helper function to dump contents for map in debugging mode. */
   private void logPropertiesMap(Map<String, String> map) {
     log.debug(" === Printing the table properties within doCommit method === ");
```

iceberg/openhouse/internalcatalog/src/main/java/com/linkedin/openhouse/internal/catalog/SnapshotInspector.java

Lines changed: 0 additions & 96 deletions
This file was deleted.
