
Commit b450bf7

srawat98-dev and srawat authored
Implementation[OpenhouseCommitEventTablePartitions]: Add partition-level commit event collection and publishing in TableStatsCollectionSparkApp (#402)
## Summary

I extended the existing TableStatsCollectionSparkApp to implement the logic for populating the openhouseTableCommitEventsPartitions table. This new table will serve as the partition-level source of truth for commit-related metadata across all OpenHouse datasets, including:

1. Commit ID (snapshot_id)
2. Commit timestamp (committed_at)
3. Commit operation (APPEND, DELETE, OVERWRITE, REPLACE)
4. Partition data (typed column values for all partition columns)
5. Spark App ID and Spark App Name
6. Table identifier (database, table, cluster, location, partition spec)

This enables granular tracking of which partitions were affected by each commit, providing:

1. Partition-level lineage - track exactly which partitions changed in each commit
2. Fine-grained auditing - monitor data changes at partition granularity
3. Optimized queries - query only the relevant partitions for specific time ranges
4. Incremental processing - identify changed partitions for downstream pipelines

## Output

This PR populates the openhouseTableCommitEventsPartitions table by querying the Iceberg all_entries and snapshots metadata tables for all OpenHouse datasets.

**End-to-End Verification (Docker)**

1. publishCommitEvents Log Output

```
25/12/01 12:11:22 INFO spark.TableStatsCollectionSparkApp: Publishing commit events for table:testdb.partition_stats_test
25/12/01 12:11:22 INFO spark.TableStatsCollectionSparkApp: [{"dataset":{"databaseName":"testdb","tableName":"partition_stats_test","clusterName":"LocalHadoopCluster","tableMetadataLocation":"/data/openhouse/testdb/partition_stats_test-6f34c382-ced3-4834-813b-b40acf74b0c4","partitionSpec":"[\n 1000: event_time_day: day(4)\n]"},"commitMetadata":{"commitId":9208708835032390256,"commitTimestampMs":1764590975624,"commitAppId":"local-1764590946735","commitAppName":"Spark shell","commitOperation":"APPEND"},"eventTimestampMs":1764591082431},{"dataset":{"databaseName":"testdb","tableName":"partition_stats_test","clusterName":"LocalHadoopCluster","tableMetadataLocation":"/data/openhouse/testdb/partition_stats_test-6f34c382-ced3-4834-813b-b40acf74b0c4","partitionSpec":"[\n 1000: event_time_day: day(4)\n]"},"commitMetadata":{"commitId":5567407446452786456,"commitTimestampMs":1764590978518,"commitAppId":"local-1764590946735","commitAppName":"Spark shell","commitOperation":"APPEND"},"eventTimestampMs":1764591082431},{"dataset":{"databaseName":"testdb","tableName":"partition_stats_test","clusterName":"LocalHadoopCluster","tableMetadataLocation":"/data/openhouse/testdb/partition_stats_test-6f34c382-ced3-4834-813b-b40acf74b0c4","partitionSpec":"[\n 1000: event_time_day: day(4)\n]"},"commitMetadata":{"commitId":1894327931030053191,"commitTimestampMs":1764590980739,"commitAppId":"local-1764590946735","commitAppName":"Spark shell","commitOperation":"APPEND"},"eventTimestampMs":1764591082431}]
```

Key Points:

- ✅ All 3 commit events published successfully
- ✅ commitAppId: "local-1764590946735" (populated)
- ✅ commitAppName: "Spark shell" (populated)
- ✅ commitOperation: "APPEND" (properly parsed)
2. publishPartitionEvents Log Output

```
25/12/01 20:45:54 INFO spark.TableStatsCollectionSparkApp: Publishing partition events for table: testdb.partition_stats_test
25/12/01 20:45:54 INFO spark.TableStatsCollectionSparkApp: [{"partitionData":[{"columnName":"event_time_day","value":"2024-01-03"}],"dataset":{"databaseName":"testdb","tableName":"partition_stats_test","clusterName":"LocalHadoopCluster","tableMetadataLocation":"/data/openhouse/testdb/partition_stats_test-9091e45f-fd03-4f7e-9a95-a051a5e5e10f","partitionSpec":"[\n 1000: event_time_day: day(4)\n]"},"commitMetadata":{"commitId":4471081304043344222,"commitTimestampMs":1764621880000,"commitAppId":"local-1764621844757","commitAppName":"Spark shell","commitOperation":"APPEND"},"eventTimestampMs":1764621954954},{"partitionData":[{"columnName":"event_time_day","value":"2024-01-01"}],"dataset":{"databaseName":"testdb","tableName":"partition_stats_test","clusterName":"LocalHadoopCluster","tableMetadataLocation":"/data/openhouse/testdb/partition_stats_test-9091e45f-fd03-4f7e-9a95-a051a5e5e10f","partitionSpec":"[\n 1000: event_time_day: day(4)\n]"},"commitMetadata":{"commitId":5214137394193985715,"commitTimestampMs":1764621875000,"commitAppId":"local-1764621844757","commitAppName":"Spark shell","commitOperation":"APPEND"},"eventTimestampMs":1764621954954},{"partitionData":[{"columnName":"event_time_day","value":"2024-01-02"}],"dataset":{"databaseName":"testdb","tableName":"partition_stats_test","clusterName":"LocalHadoopCluster","tableMetadataLocation":"/data/openhouse/testdb/partition_stats_test-9091e45f-fd03-4f7e-9a95-a051a5e5e10f","partitionSpec":"[\n 1000: event_time_day: day(4)\n]"},"commitMetadata":{"commitId":7033261685039461134,"commitTimestampMs":1764621878000,"commitAppId":"local-1764621844757","commitAppName":"Spark shell","commitOperation":"APPEND"},"eventTimestampMs":1764621954954}]
```

Key Points:

- ✅ All 3 partition events published successfully
- ✅ partitionData: contains the partition column name and values (event_time_day: 2024-01-01, 2024-01-02, 2024-01-03)
- ✅ commitAppId: "local-1764621844757" (populated)
- ✅ commitAppName: "Spark shell" (populated)
- ✅ commitOperation: "APPEND" (properly parsed)
- ✅ Each event represents a different partition with correct metadata
3. executeWithTimingAsync (Parallel Execution) Log Output

```
25/12/01 20:45:40 INFO spark.TableStatsCollectionSparkApp: Starting table stats collection for table: testdb.partition_stats_test
25/12/01 20:45:40 INFO spark.TableStatsCollectionSparkApp: Starting commit events collection for table: testdb.partition_stats_test
25/12/01 20:45:40 INFO spark.TableStatsCollectionSparkApp: Starting partition events collection for table: testdb.partition_stats_test
25/12/01 20:45:40 INFO util.TableStatsCollectorUtil: Collecting commit events for table: openhouse.testdb.partition_stats_test (all non-expired snapshots)
25/12/01 20:45:40 INFO util.TableStatsCollectorUtil: Collecting partition-level commit events for table: openhouse.testdb.partition_stats_test
25/12/01 20:45:54 INFO util.TableStatsCollectorUtil: Collected 3 commit events for table: openhouse.testdb.partition_stats_test
25/12/01 20:45:54 INFO spark.TableStatsCollectionSparkApp: Completed commit events collection for table: testdb.partition_stats_test (3 events) in 13518 ms
25/12/01 20:45:54 INFO util.TableStatsCollectorUtil: Collected 3 partition-level commit events for table: openhouse.testdb.partition_stats_test
25/12/01 20:45:54 INFO spark.TableStatsCollectionSparkApp: Completed partition events collection for table: testdb.partition_stats_test (3 partition events) in 14109 ms
25/12/01 20:45:54 INFO spark.TableStatsCollectionSparkApp: Completed table stats collection for table: testdb.partition_stats_test in 14262 ms
25/12/01 20:45:54 INFO spark.TableStatsCollectionSparkApp: Total collection time for table: testdb.partition_stats_test in 14268 ms (parallel execution)
```

Key Points:

- ✅ All three collections started in parallel (stats, commit events, partition events)
- ✅ Commit events collection: 13.5 seconds (3 events collected)
- ✅ Partition events collection: 14.1 seconds (3 partition events collected)
- ✅ Table stats collection: 14.3 seconds
- ✅ Total parallel execution time: 14.3 seconds (vs ~41s if sequential)
- ✅ Job completed successfully: state SUCCEEDED

## Key Features

1. One Row Per (Commit, Partition) Pair
   - Creates one CommitEventTablePartitions record for each unique (snapshot_id, partition) combination
   - Example: 1 commit affecting 3 partitions → 3 records
2. Parallel Execution
   - Runs simultaneously with table stats and commit events collection
   - ~2x performance improvement over sequential execution
   - Uses CompletableFuture for non-blocking parallel processing
3. Type-Safe Partition Data
   - Partition values are stored as typed ColumnData objects:
     - LongColumnData for Integer/Long values (e.g., year=2024)
     - DoubleColumnData for Float/Double values
     - StringColumnData for String/Date/Timestamp values
   - Runtime type detection using instanceof checks (see the sketch after this list)
4. Robust Error Handling
   - ✅ Unpartitioned tables return an empty list (no errors)
   - ✅ Null values are logged and skipped
   - ✅ Unknown commit operations are set to null with a warning
   - ✅ Invalid partition values are logged and skipped
   - ✅ Timestamp conversion handles both seconds and milliseconds
5. Stateless Design
   - Processes all active (non-expired) commit-partition pairs on every job run
   - No state tracking between runs (matches the existing openhouseTableCommitEvents behavior)
   - The same commit-partition pair can therefore appear in multiple event_timestamp partitions of the sink table
   - Deduplication is handled at query time in downstream consumers (use DISTINCT or GROUP BY; see the example query after the checklists below)
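For illustration, here is a minimal sketch of the instanceof-based dispatch described in item 3. The LongColumnData, DoubleColumnData, and StringColumnData class names come from this PR, but the constructors and the helper method itself are assumptions, not the actual implementation:

```java
// Hypothetical helper sketching the runtime type detection described above.
// The ColumnData subtypes are named in this PR, but their constructors and this
// method signature are assumed for illustration only.
static ColumnData toColumnData(String columnName, Object value) {
  if (value == null) {
    // Null values are logged and skipped by the caller
    return null;
  }
  if (value instanceof Integer || value instanceof Long) {
    return new LongColumnData(columnName, ((Number) value).longValue());
  }
  if (value instanceof Float || value instanceof Double) {
    return new DoubleColumnData(columnName, ((Number) value).doubleValue());
  }
  // Strings, dates, and timestamps fall back to their string representation
  return new StringColumnData(columnName, value.toString());
}
```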
## Changes

- [ ] Client-facing API Changes
- [ ] Internal API Changes
- [ ] Bug Fixes
- [x] New Features
- [ ] Performance Improvements
- [ ] Code Style
- [ ] Refactoring
- [ ] Documentation
- [ ] Tests

For all the boxes checked, please include additional details of the changes made in this pull request.

## Testing Done

- [ ] Manually Tested on local docker setup. Please include commands run, and their output.
- [x] Added new tests for the changes made.
- [ ] Updated existing tests to reflect the changes made.
- [ ] No tests added or updated. Please explain why. If unsure, please feel free to ask for help.
- [ ] Some other form of testing like staging or soak time in production. Please explain.

For all the boxes checked, include a detailed description of the testing done for the changes made in this pull request.

## Additional Information

- [ ] Breaking Changes
- [ ] Deprecations
- [ ] Large PR broken into smaller PRs, and PR plan linked in the description.

For all the boxes checked, include additional details of the changes made in this pull request.

---------

Co-authored-by: srawat <[email protected]>
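As referenced in the Stateless Design notes, downstream consumers are expected to deduplicate at read time. The sketch below shows one way such a consumer query could look; the sink table name comes from this PR, but the flattened column names are assumptions based on the JSON event shape shown above, not a confirmed schema:

```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

/** Query-time deduplication sketch for downstream consumers of the events table. */
public final class CommitEventDedupExample {
  // Collapses repeated (commit, partition) pairs produced by successive job runs,
  // keeping the latest event timestamp. Column names are assumed, not confirmed.
  public static Dataset<Row> latestPerCommitPartition(SparkSession spark) {
    return spark.sql(
        "SELECT commitId, databaseName, tableName, partitionData, "
            + "MAX(eventTimestampMs) AS eventTimestampMs "
            + "FROM openhouseTableCommitEventsPartitions "
            + "GROUP BY commitId, databaseName, tableName, partitionData");
  }
}
```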
1 parent 5bcfd4e commit b450bf7

File tree

6 files changed: +882 −3 lines changed


apps/spark/src/main/java/com/linkedin/openhouse/jobs/spark/Operations.java

Lines changed: 26 additions & 0 deletions
```diff
@@ -5,6 +5,7 @@
 import com.google.gson.Gson;
 import com.linkedin.openhouse.common.metrics.OtelEmitter;
 import com.linkedin.openhouse.common.stats.model.CommitEventTable;
+import com.linkedin.openhouse.common.stats.model.CommitEventTablePartitions;
 import com.linkedin.openhouse.common.stats.model.IcebergTableStats;
 import com.linkedin.openhouse.jobs.util.SparkJobUtil;
 import com.linkedin.openhouse.jobs.util.TableStatsCollector;
@@ -564,4 +565,29 @@ public List<CommitEventTable> collectCommitEventTable(String fqtn) {
       return Collections.emptyList();
     }
   }
+
+  /**
+   * Collect partition-level commit events for a given fully-qualified table name.
+   *
+   * <p>Returns one record per (commit_id, partition) pair. Returns empty list for unpartitioned
+   * tables or errors.
+   *
+   * @param fqtn fully-qualified table name
+   * @return List of CommitEventTablePartitions objects (event_timestamp_ms will be set at publish
+   *     time)
+   */
+  public List<CommitEventTablePartitions> collectCommitEventTablePartitions(String fqtn) {
+    Table table = getTable(fqtn);
+
+    try {
+      TableStatsCollector tableStatsCollector = new TableStatsCollector(fs(), spark, table);
+      return tableStatsCollector.collectCommitEventTablePartitions();
+    } catch (IOException e) {
+      log.error("Unable to initialize file system for partition events collection", e);
+      return Collections.emptyList();
+    } catch (Exception e) {
+      log.error("Failed to collect partition events for table: {}", fqtn, e);
+      return Collections.emptyList();
+    }
+  }
 }
```
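For context, a short usage sketch of the new method from a caller's point of view; `ops` is assumed to be an initialized Operations instance, and the Gson logging mirrors what TableStatsCollectionSparkApp does below:

```java
// Usage sketch: collect partition-level commit events for one table.
// Unpartitioned tables and collection failures both surface as an empty list,
// so callers only need an emptiness check before publishing.
List<CommitEventTablePartitions> events =
    ops.collectCommitEventTablePartitions("testdb.partition_stats_test");
if (!events.isEmpty()) {
  log.info("Collected {} partition-level commit events", events.size());
  log.info(new Gson().toJson(events));
}
```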

apps/spark/src/main/java/com/linkedin/openhouse/jobs/spark/TableStatsCollectionSparkApp.java

Lines changed: 34 additions & 3 deletions
```diff
@@ -4,6 +4,7 @@
 import com.linkedin.openhouse.common.metrics.DefaultOtelConfig;
 import com.linkedin.openhouse.common.metrics.OtelEmitter;
 import com.linkedin.openhouse.common.stats.model.CommitEventTable;
+import com.linkedin.openhouse.common.stats.model.CommitEventTablePartitions;
 import com.linkedin.openhouse.common.stats.model.IcebergTableStats;
 import com.linkedin.openhouse.jobs.spark.state.StateManager;
 import com.linkedin.openhouse.jobs.util.AppsOtelEmitter;
@@ -34,7 +35,7 @@ public TableStatsCollectionSparkApp(
   protected void runInner(Operations ops) {
     log.info("Running TableStatsCollectorApp for table {}", fqtn);

-    // Run stats collection and commit events collection in parallel
+    // Run stats collection, commit events collection, and partition events collection in parallel
     long startTime = System.currentTimeMillis();

     CompletableFuture<IcebergTableStats> statsFuture =
@@ -49,8 +50,14 @@ protected void runInner(Operations ops) {
             () -> ops.collectCommitEventTable(fqtn),
             result -> String.format("%s (%d events)", fqtn, result.size()));

-    // Wait for both to complete
-    CompletableFuture.allOf(statsFuture, commitEventsFuture).join();
+    CompletableFuture<List<CommitEventTablePartitions>> partitionEventsFuture =
+        executeWithTimingAsync(
+            "partition events collection",
+            () -> ops.collectCommitEventTablePartitions(fqtn),
+            result -> String.format("%s (%d partition events)", fqtn, result.size()));
+
+    // Wait for all three to complete
+    CompletableFuture.allOf(statsFuture, commitEventsFuture, partitionEventsFuture).join();

     long endTime = System.currentTimeMillis();
     log.info(
@@ -74,6 +81,16 @@ protected void runInner(Operations ops) {
           "Skipping commit events publishing for table: {} due to collection failure or no events",
           fqtn);
     }
+
+    List<CommitEventTablePartitions> partitionEvents = partitionEventsFuture.join();
+    if (partitionEvents != null && !partitionEvents.isEmpty()) {
+      publishPartitionEvents(partitionEvents);
+    } else {
+      log.info(
+          "Skipping partition events publishing for table: {} "
+              + "(unpartitioned table or collection failure or no events)",
+          fqtn);
+    }
   }

   /**
@@ -100,6 +117,20 @@ protected void publishCommitEvents(List<CommitEventTable> commitEvents) {
     log.info(new Gson().toJson(commitEvents));
   }

+  /**
+   * Publish partition-level commit events. Override this method in li-openhouse to send to Kafka.
+   *
+   * @param partitionEvents List of partition events to publish
+   */
+  protected void publishPartitionEvents(List<CommitEventTablePartitions> partitionEvents) {
+    // Set event timestamp at publish time
+    long eventTimestampInEpochMs = System.currentTimeMillis();
+    partitionEvents.forEach(event -> event.setEventTimestampMs(eventTimestampInEpochMs));
+
+    log.info("Publishing partition events for table: {}", fqtn);
+    log.info(new Gson().toJson(partitionEvents));
+  }
+
   public static void main(String[] args) {
     OtelEmitter otelEmitter =
         new AppsOtelEmitter(Arrays.asList(DefaultOtelConfig.getOpenTelemetry()));
```
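The executeWithTimingAsync helper used above predates this PR and its body is not part of this diff. A minimal sketch of what such a helper could look like, assuming it wraps CompletableFuture.supplyAsync and produces the "Starting ... / Completed ... in N ms" log lines seen in the verification output; the signature is inferred from the call sites, not confirmed:

```java
// Hypothetical reconstruction of executeWithTimingAsync, inferred from its call
// sites above and the log patterns in the verification output. Not the actual code.
protected <T> CompletableFuture<T> executeWithTimingAsync(
    String taskName,
    java.util.function.Supplier<T> task,
    java.util.function.Function<T, String> resultFormatter) {
  return CompletableFuture.supplyAsync(
      () -> {
        log.info("Starting {} for table: {}", taskName, fqtn);
        long start = System.currentTimeMillis();
        T result = task.get();
        // resultFormatter renders e.g. "testdb.partition_stats_test (3 partition events)"
        log.info(
            "Completed {} for table: {} in {} ms",
            taskName,
            resultFormatter.apply(result),
            System.currentTimeMillis() - start);
        return result;
      });
}
```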

apps/spark/src/main/java/com/linkedin/openhouse/jobs/util/TableStatsCollector.java

Lines changed: 22 additions & 0 deletions
```diff
@@ -1,6 +1,7 @@
 package com.linkedin.openhouse.jobs.util;

 import com.linkedin.openhouse.common.stats.model.CommitEventTable;
+import com.linkedin.openhouse.common.stats.model.CommitEventTablePartitions;
 import com.linkedin.openhouse.common.stats.model.IcebergTableStats;
 import java.util.List;
 import lombok.AllArgsConstructor;
@@ -47,4 +48,25 @@ public IcebergTableStats collectTableStats() {
   public List<CommitEventTable> collectCommitEventTable() {
     return TableStatsCollectorUtil.populateCommitEventTable(table, spark);
   }
+
+  /**
+   * Collect partition-level commit events for the table.
+   *
+   * <p>Returns one record per (commit_id, partition) pair. Returns empty list for unpartitioned
+   * tables.
+   *
+   * <p>Note: Returns List (loads into memory). Size is manageable due to:
+   *
+   * <ul>
+   *   <li>Iceberg retention limits active snapshots (~1-10k per table)
+   *   <li>Typical partitions per commit: 10-1000
+   *   <li>Typical size: 100K rows × 200 bytes = 20MB
+   * </ul>
+   *
+   * @return List of CommitEventTablePartitions objects (event_timestamp_ms will be set at publish
+   *     time)
+   */
+  public List<CommitEventTablePartitions> collectCommitEventTablePartitions() {
+    return TableStatsCollectorUtil.populateCommitEventTablePartitions(table, spark);
+  }
 }
```
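The actual collection logic lives in TableStatsCollectorUtil.populateCommitEventTablePartitions, one of the six changed files but not shown in this excerpt. Based on the PR summary (the Iceberg all_entries and snapshots metadata tables are queried to produce one row per commit-partition pair), a rough sketch of what the core query might look like; the exact projection and the mapping from rows to CommitEventTablePartitions objects are assumptions:

```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

/** Sketch of the metadata-table join behind populateCommitEventTablePartitions (assumed). */
final class CommitPartitionQuerySketch {
  // One row per (snapshot_id, partition) pair: manifest entries supply the partition
  // values, snapshots supply the commit timestamp, operation, and Spark app id.
  static Dataset<Row> commitPartitionPairs(SparkSession spark, String fqtn) {
    return spark.sql(
        String.format(
            "SELECT DISTINCT e.snapshot_id, e.data_file.partition AS partition, "
                + "s.committed_at, s.operation, s.summary['spark.app.id'] AS commit_app_id "
                + "FROM %s.all_entries e "
                + "JOIN %s.snapshots s ON e.snapshot_id = s.snapshot_id",
            fqtn, fqtn));
  }
}
```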
