Skip to content

Commit 7af2b66

Browse files
srawat98-devsrawatteamurkosumedhsakdeo
authored
Add IcebergCommitEventStats model for capturing commit events (#390)
## Summary <!--- HINT: Replace #nnn with corresponding Issue number, if you are fixing an existing issue --> [DTQ-6354](https://linkedin.atlassian.net/browse/DTQ-6354) Briefly discuss the summary of the changes made in this pull request in 2-3 lines. Implemented a Base class for all commit event statistics. ## Changes - [ ] Client-facing API Changes - [ ] Internal API Changes - [ ] Bug Fixes - [x] New Features - [ ] Performance Improvements - [ ] Code Style - [ ] Refactoring - [ ] Documentation - [ ] Tests For all the boxes checked, please include additional details of the changes made in this pull request. ## Testing Done <!--- Check any relevant boxes with "x" --> - [ ] Manually Tested on local docker setup. Please include commands ran, and their output. - [ ] Added new tests for the changes made. - [ ] Updated existing tests to reflect the changes made. - [ ] No tests added or updated. Please explain why. If unsure, please feel free to ask for help. - [ ] Some other form of testing like staging or soak time in production. Please explain. For all the boxes checked, include a detailed description of the testing done for the changes made in this pull request. # Additional Information - [ ] Breaking Changes - [ ] Deprecations - [ ] Large PR broken into smaller PRs, and PR plan linked in the description. For all the boxes checked, include additional details of the changes made in this pull request. --------- Co-authored-by: srawat <[email protected]> Co-authored-by: Stas Pak <[email protected]> Co-authored-by: Sumedh Sakdeo <[email protected]>
1 parent ff847c3 commit 7af2b66

File tree

7 files changed

+331
-0
lines changed

7 files changed

+331
-0
lines changed
Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
package com.linkedin.openhouse.common.stats.model;
2+
3+
import lombok.AllArgsConstructor;
4+
import lombok.Data;
5+
import lombok.NoArgsConstructor;
6+
import lombok.NonNull;
7+
import lombok.experimental.SuperBuilder;
8+
9+
/** Container for base model classes used in the dataset event hierarchy. */
10+
public final class BaseEventModels {
11+
12+
private BaseEventModels() {
13+
// Utility class - no instantiation
14+
}
15+
16+
/** Base class containing table identification metadata. */
17+
@Data
18+
@SuperBuilder
19+
@NoArgsConstructor
20+
@AllArgsConstructor
21+
public static class BaseTableIdentifier {
22+
23+
/** Name of the database for the dataset */
24+
@NonNull private String databaseName;
25+
26+
/** Name of the table for the dataset */
27+
@NonNull private String tableName;
28+
29+
/** Name of the cluster (e.g., holdem/war) */
30+
@NonNull private String clusterName;
31+
32+
/**
33+
* Fully qualified path to the root table metadata JSON. Useful to distinguish uniqueness in
34+
* case the table is dropped and recreated with the same name.
35+
*/
36+
@NonNull private String tableMetadataLocation;
37+
38+
/**
39+
* String representation of the Iceberg PartitionSpec for the dataset.
40+
*
41+
* <p>Use the string from PartitionSpec.toString() to describe partitioning. For non-partitioned
42+
* tables, use PartitionSpec.UNPARTITIONED_SPEC.toString(). See:
43+
* https://github.com/apache/iceberg/blob/main/api/src/main/java/org/apache/iceberg/PartitionSpec.java
44+
*/
45+
@NonNull private String partitionSpec;
46+
}
47+
48+
/** Base class for commit events that contains dataset information. */
49+
@Data
50+
@SuperBuilder
51+
@NoArgsConstructor
52+
@AllArgsConstructor
53+
public abstract static class BaseCommitEvent {
54+
55+
/** Dataset information for this commit event */
56+
@NonNull private BaseTableIdentifier dataset;
57+
58+
/** commit Metadata for this commit */
59+
@NonNull private CommitMetadata commitMetadata;
60+
61+
/**
62+
* Timestamp (in epoch milliseconds) representing when the collector job processed and ingested
63+
* the corresponding event.
64+
*/
65+
@NonNull private Long eventTimestampMs;
66+
}
67+
}
Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
package com.linkedin.openhouse.common.stats.model;
2+
3+
import lombok.AllArgsConstructor;
4+
import lombok.Builder;
5+
import lombok.Data;
6+
import lombok.NoArgsConstructor;
7+
import lombok.NonNull;
8+
9+
/**
10+
* Column-level data interface for type-safe values used in events and stats.
11+
*
12+
* <p>Implementations encapsulate a specific value type to provide compile-time safety.
13+
*/
14+
public interface ColumnData {
15+
/** Returns the column name this data applies to. */
16+
String getColumnName();
17+
18+
/** Returns the underlying value. */
19+
Object getValue();
20+
21+
/** Long-valued column data for counts and sizes in bytes. */
22+
@Data
23+
@Builder
24+
@NoArgsConstructor
25+
@AllArgsConstructor
26+
class LongColumnData implements ColumnData {
27+
@NonNull private String columnName;
28+
@NonNull private Long value;
29+
}
30+
31+
/** String-valued column data for min/max of strings, dates, and timestamps. */
32+
@Data
33+
@Builder
34+
@NoArgsConstructor
35+
@AllArgsConstructor
36+
class StringColumnData implements ColumnData {
37+
@NonNull private String columnName;
38+
@NonNull private String value;
39+
}
40+
41+
/** Double-valued column data for floating-point min/max values. */
42+
@Data
43+
@Builder
44+
@NoArgsConstructor
45+
@AllArgsConstructor
46+
class DoubleColumnData implements ColumnData {
47+
@NonNull private String columnName;
48+
@NonNull private Double value;
49+
}
50+
}
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
package com.linkedin.openhouse.common.stats.model;
2+
3+
import lombok.Data;
4+
import lombok.EqualsAndHashCode;
5+
import lombok.NoArgsConstructor;
6+
import lombok.experimental.SuperBuilder;
7+
8+
/**
9+
* Data model for openhouseTableCommitEvents table.
10+
*
11+
* <p>Stores commit-level metadata for dataset changes. Each record represents a commit on table and
12+
* can be linked to multiple partition events via commitId.
13+
*
14+
* <p><b>Cardinality</b>: One commit event can have N partition events. See {@link
15+
* CommitEventTablePartitions} for partition-level details.
16+
*
17+
* @see CommitEventTablePartitions
18+
*/
19+
@Data
20+
@SuperBuilder
21+
@NoArgsConstructor
22+
@EqualsAndHashCode(callSuper = true)
23+
public class CommitEventTable extends BaseEventModels.BaseCommitEvent {
24+
// All fields inherited from BaseCommitEvent and BaseTableIdentifier
25+
// datasetType is available via dataset.datasetType
26+
}
Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
package com.linkedin.openhouse.common.stats.model;
2+
3+
import java.util.List;
4+
import lombok.AllArgsConstructor;
5+
import lombok.Data;
6+
import lombok.EqualsAndHashCode;
7+
import lombok.NoArgsConstructor;
8+
import lombok.experimental.SuperBuilder;
9+
10+
/**
11+
* Data model for openhouseCommitEventTablePartitionStats table.
12+
*
13+
* <p>Stores partition-level metadata and statistics such as null count, NaN count, row count, and
14+
* min/max values. Can represent both partition-level and table-level statistics.
15+
*
16+
* <p><b>Cardinality</b>: Each partition stats record references the latest commit that modified the
17+
* partition via commitMetadata.commitId (Foreign Key to {@link CommitEventTable}).
18+
*
19+
* <p>Stats are updated/replaced when new commits modify the partition.
20+
*
21+
* @see CommitEventTable
22+
* @see CommitEventTablePartitions
23+
* @see CommitMetadata
24+
*/
25+
@Data
26+
@SuperBuilder
27+
@NoArgsConstructor
28+
@AllArgsConstructor
29+
@EqualsAndHashCode(callSuper = true)
30+
public class CommitEventTablePartitionStats extends BaseEventModels.BaseCommitEvent {
31+
32+
/**
33+
* List of per-partition column values associated with the statistics.
34+
*
35+
* <p>Each element carries the partition column name and its typed value using {@link ColumnData}
36+
* implementations. The list order should align with the partition spec order for determinism.
37+
*
38+
* <p>Can be null if the statistics is on a table level (non-partitioned table or table-level
39+
* aggregates).
40+
*
41+
* <p>Example for non-null partition: [ new ColumnData.StringColumnData("datepartition",
42+
* "2025-01-25"), new ColumnData.StringColumnData("hourpartition", "12") ]
43+
*/
44+
private List<ColumnData> partitionData;
45+
46+
/**
47+
* Total number of rows corresponding to the given partition specification if partitionData is not
48+
* null; otherwise, the row count for the entire table.
49+
*/
50+
private Long rowCount;
51+
52+
/**
53+
* Total number of columns corresponding to the given partition specification if partitionData is
54+
* not null; otherwise, the column count for the entire table.
55+
*/
56+
private Long columnCount;
57+
58+
/**
59+
* Stores null count statistics for each column in the dataset. Each element represents a column
60+
* name and its corresponding number of null values.
61+
*/
62+
private List<ColumnData> nullCount;
63+
64+
/**
65+
* Stores NaN count statistics for each column in the dataset. Each element represents a column
66+
* name and its corresponding number of NaN values.
67+
*
68+
* <p>Applicable only to numeric data types such as float, double, or decimal.
69+
*/
70+
private List<ColumnData> nanCount;
71+
72+
/**
73+
* Stores minimum value statistics for each column in the dataset. Each element represents a
74+
* column name and its corresponding minimum value.
75+
*/
76+
private List<ColumnData> minValue;
77+
78+
/**
79+
* Stores maximum value statistics for each column in the dataset. Each element represents a
80+
* column name and its corresponding maximum value.
81+
*/
82+
private List<ColumnData> maxValue;
83+
84+
/**
85+
* Stores column size in bytes statistics for each column in the dataset. Each element represents
86+
* a column name and its corresponding size in bytes.
87+
*/
88+
private List<ColumnData> columnSizeInBytes;
89+
}
Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
package com.linkedin.openhouse.common.stats.model;
2+
3+
import java.util.List;
4+
import lombok.AllArgsConstructor;
5+
import lombok.Data;
6+
import lombok.EqualsAndHashCode;
7+
import lombok.NoArgsConstructor;
8+
import lombok.NonNull;
9+
import lombok.experimental.SuperBuilder;
10+
11+
/**
12+
* Data model for openhouseTableCommitEventPartitions table.
13+
*
14+
* <p>Stores partition-level information for each commit. Maps commit events to specific partitions
15+
* affected by that commit. One commit can correspond to multiple partition records. A commit event
16+
* represents a single commit-partition pair, capturing what changed for that specific partition in
17+
* that specific commit
18+
*
19+
* <p><b>Naming</b>: Represents "partitions of a commit event". This aligns with the conceptual
20+
* model where CommitEvent has a "partitions" field that's been normalized into a separate table.
21+
*
22+
* <p><b>Cardinality</b>: N partition records linked to 1 commit event via commitId foreign key.
23+
*
24+
* @see CommitEventTable
25+
*/
26+
@Data
27+
@SuperBuilder
28+
@NoArgsConstructor
29+
@AllArgsConstructor
30+
@EqualsAndHashCode(callSuper = true)
31+
public class CommitEventTablePartitions extends BaseEventModels.BaseCommitEvent {
32+
33+
/**
34+
* List of per-partition column values associated with the commit.
35+
*
36+
* <p>Each element carries the partition column name and its typed value using {@link ColumnData}
37+
* implementations. The list order should align with the partition spec order for determinism.
38+
*
39+
* <p>Example: [ new ColumnData.StringColumnData("datepartition", "2025-01-25"), new
40+
* ColumnData.StringColumnData("hourpartition", "12") ]
41+
*/
42+
@NonNull private List<ColumnData> partitionData;
43+
}
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
package com.linkedin.openhouse.common.stats.model;
2+
3+
import lombok.AllArgsConstructor;
4+
import lombok.Builder;
5+
import lombok.Data;
6+
import lombok.NoArgsConstructor;
7+
import lombok.NonNull;
8+
9+
/** Standalone class representing commit metadata. */
10+
@Data
11+
@Builder
12+
@NoArgsConstructor
13+
@AllArgsConstructor
14+
public class CommitMetadata {
15+
16+
/** Unique identifier for the commit event */
17+
@NonNull private Long commitId;
18+
19+
/** Timestamp of the commit event captured in epoch milliseconds */
20+
@NonNull private Long commitTimestampMs;
21+
22+
/**
23+
* Unique application identifier (e.g., Spark Application ID) associated with the process or job
24+
* that performed the commit
25+
*/
26+
private String commitAppId;
27+
28+
/**
29+
* Descriptive name of the application or job that executed the commit. Helps in identifying the
30+
* pipeline or workflow responsible for the data change.
31+
*/
32+
private String commitAppName;
33+
34+
/** Type of operation performed during the commit (e.g., APPEND, OVERWRITE, DELETE, REPLACE) */
35+
private CommitOperation commitOperation;
36+
}
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
package com.linkedin.openhouse.common.stats.model;
2+
3+
/**
4+
* Enum representing the type of operation performed during a dataset commit.
5+
*
6+
* <p>Defines the possible modification operations that can be applied to a dataset.
7+
*/
8+
public enum CommitOperation {
9+
/** Append new data to the dataset */
10+
APPEND,
11+
12+
/** Overwrite existing data in the dataset */
13+
OVERWRITE,
14+
15+
/** Delete data from the dataset */
16+
DELETE,
17+
18+
/** Replace data in the dataset */
19+
REPLACE
20+
}

0 commit comments

Comments
 (0)