Skip to content

Commit da20880

Browse files
authored
Adding historyConfig to table metadata for maintenance jobs (#356)
## Summary <!--- HINT: Replace #nnn with corresponding Issue number, if you are fixing an existing issue --> HistoryConfig is supposed to be passed as params to SnapshotsExpirationJob in order to control the number of snapshots. However, the config properties are not propagated as params, resulting in the SE job running with default snapshot retention. Fix: Add historyConfig to metadata which is passed to SE job. ## Changes - [ ] Client-facing API Changes - [ ] Internal API Changes - [x] Bug Fixes - [ ] New Features - [ ] Performance Improvements - [ ] Code Style - [ ] Refactoring - [ ] Documentation - [ ] Tests For all the boxes checked, please include additional details of the changes made in this pull request. ## Testing Done Added unit tests. Local testing [In progress] Stage env testing [after deployment] <!--- Check any relevant boxes with "x" --> - [ ] Manually Tested on local docker setup. Please include commands ran, and their output. - [x] Added new tests for the changes made. - [ ] Updated existing tests to reflect the changes made. - [ ] No tests added or updated. Please explain why. If unsure, please feel free to ask for help. - [ ] Some other form of testing like staging or soak time in production. Please explain. For all the boxes checked, include a detailed description of the testing done for the changes made in this pull request. # Additional Information - [ ] Breaking Changes - [ ] Deprecations - [ ] Large PR broken into smaller PRs, and PR plan linked in the description. For all the boxes checked, include additional details of the changes made in this pull request.
1 parent e138a2c commit da20880

File tree

2 files changed

+143
-0
lines changed

2 files changed

+143
-0
lines changed

apps/spark/src/main/java/com/linkedin/openhouse/jobs/client/TablesClient.java

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
import com.linkedin.openhouse.datalayout.strategy.DataLayoutStrategy;
55
import com.linkedin.openhouse.jobs.util.DatabaseTableFilter;
66
import com.linkedin.openhouse.jobs.util.DirectoryMetadata;
7+
import com.linkedin.openhouse.jobs.util.HistoryConfig;
78
import com.linkedin.openhouse.jobs.util.ReplicationConfig;
89
import com.linkedin.openhouse.jobs.util.RetentionConfig;
910
import com.linkedin.openhouse.jobs.util.RetryUtil;
@@ -15,6 +16,7 @@
1516
import com.linkedin.openhouse.tables.client.model.GetAllTablesResponseBody;
1617
import com.linkedin.openhouse.tables.client.model.GetDatabaseResponseBody;
1718
import com.linkedin.openhouse.tables.client.model.GetTableResponseBody;
19+
import com.linkedin.openhouse.tables.client.model.History;
1820
import com.linkedin.openhouse.tables.client.model.Policies;
1921
import com.linkedin.openhouse.tables.client.model.Replication;
2022
import java.time.Duration;
@@ -66,6 +68,11 @@ public Optional<List<ReplicationConfig>> getTableReplication(TableMetadata table
6668
return getTableReplication(response);
6769
}
6870

71+
public Optional<HistoryConfig> getTableHistory(TableMetadata tableMetadata) {
72+
GetTableResponseBody response = getTable(tableMetadata);
73+
return getTableHistory(response);
74+
}
75+
6976
private Optional<RetentionConfig> getTableRetention(GetTableResponseBody response) {
7077
// timePartitionSpec or retention.ColumnPattern should be present to run Retention job on a
7178
// table.
@@ -96,6 +103,31 @@ private Optional<RetentionConfig> getTableRetention(GetTableResponseBody respons
96103
.build());
97104
}
98105

106+
private Optional<HistoryConfig> getTableHistory(GetTableResponseBody response) {
107+
if (response == null
108+
|| response.getPolicies() == null
109+
|| response.getPolicies().getHistory() == null) {
110+
return Optional.empty();
111+
}
112+
History history = response.getPolicies().getHistory();
113+
114+
// Validate that at least one of maxAge/granularity or versions is configured
115+
boolean hasTimeBasedHistory =
116+
history.getMaxAge() != null && history.getMaxAge() > 0 && history.getGranularity() != null;
117+
boolean hasVersionBasedHistory = history.getVersions() != null && history.getVersions() > 0;
118+
119+
if (!hasTimeBasedHistory && !hasVersionBasedHistory) {
120+
return Optional.empty();
121+
}
122+
123+
return Optional.of(
124+
HistoryConfig.builder()
125+
.maxAge(hasTimeBasedHistory ? history.getMaxAge() : 0)
126+
.granularity(hasTimeBasedHistory ? history.getGranularity() : null)
127+
.versions(hasVersionBasedHistory ? history.getVersions() : 0)
128+
.build());
129+
}
130+
99131
private Optional<List<ReplicationConfig>> getTableReplication(GetTableResponseBody response) {
100132
// At least one replication config must be present
101133
if (response == null
@@ -360,6 +392,7 @@ public Optional<TableMetadata> mapTableResponseToTableMetadata(
360392
.isTimePartitioned(tableResponseBody.getTimePartitioning() != null)
361393
.isClustered(tableResponseBody.getClustering() != null)
362394
.retentionConfig(getTableRetention(tableResponseBody).orElse(null))
395+
.historyConfig(getTableHistory(tableResponseBody).orElse(null))
363396
.replicationConfig(getTableReplication(tableResponseBody).orElse(null))
364397
.jobExecutionProperties(getJobExecutionProperties(tableResponseBody))
365398
.creationTimeMs(Objects.requireNonNull(tableResponseBody.getCreationTime()));
@@ -391,6 +424,7 @@ public List<TableDataLayoutMetadata> mapTableResponseToTableDataLayoutMetadataLi
391424
.isTimePartitioned(tableResponseBody.getTimePartitioning() != null)
392425
.isClustered(tableResponseBody.getClustering() != null)
393426
.retentionConfig(getTableRetention(tableResponseBody).orElse(null))
427+
.historyConfig(getTableHistory(tableResponseBody).orElse(null))
394428
.jobExecutionProperties(getJobExecutionProperties(tableResponseBody))
395429
.creationTimeMs(Objects.requireNonNull(tableResponseBody.getCreationTime()));
396430
List<TableDataLayoutMetadata> result = new ArrayList<>();

apps/spark/src/test/java/com/linkedin/openhouse/jobs/client/TablesClientTest.java

Lines changed: 109 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
import com.linkedin.openhouse.datalayout.strategy.DataLayoutStrategy;
77
import com.linkedin.openhouse.jobs.util.DatabaseTableFilter;
88
import com.linkedin.openhouse.jobs.util.DirectoryMetadata;
9+
import com.linkedin.openhouse.jobs.util.HistoryConfig;
910
import com.linkedin.openhouse.jobs.util.RetentionConfig;
1011
import com.linkedin.openhouse.jobs.util.TableDataLayoutMetadata;
1112
import com.linkedin.openhouse.jobs.util.TableMetadata;
@@ -16,6 +17,7 @@
1617
import com.linkedin.openhouse.tables.client.model.GetAllTablesResponseBody;
1718
import com.linkedin.openhouse.tables.client.model.GetDatabaseResponseBody;
1819
import com.linkedin.openhouse.tables.client.model.GetTableResponseBody;
20+
import com.linkedin.openhouse.tables.client.model.History;
1921
import com.linkedin.openhouse.tables.client.model.Policies;
2022
import com.linkedin.openhouse.tables.client.model.Replication;
2123
import com.linkedin.openhouse.tables.client.model.ReplicationConfig;
@@ -507,6 +509,59 @@ private TableDataLayoutMetadata createTableDataLayoutMetadataMock() {
507509
return metadata;
508510
}
509511

512+
void testGetTableHistoryWithTimeBasedPolicy() {
513+
GetTableResponseBody responseBody =
514+
createTableWithTimeBasedHistoryMock(
515+
testDbName, testTableName, 30, History.GranularityEnum.DAY, null);
516+
Mono<GetTableResponseBody> responseMock = (Mono<GetTableResponseBody>) Mockito.mock(Mono.class);
517+
Mockito.when(responseMock.block(any(Duration.class))).thenReturn(responseBody);
518+
Mockito.when(apiMock.getTableV1(testDbName, testTableName)).thenReturn(responseMock);
519+
520+
Optional<HistoryConfig> result =
521+
client.getTableHistory(
522+
TableMetadata.builder().dbName(testDbName).tableName(testTableName).build());
523+
524+
Assertions.assertTrue(result.isPresent());
525+
HistoryConfig historyConfig = result.get();
526+
Assertions.assertEquals(30, historyConfig.getMaxAge());
527+
Assertions.assertEquals(History.GranularityEnum.DAY, historyConfig.getGranularity());
528+
Assertions.assertEquals(0, historyConfig.getVersions());
529+
}
530+
531+
@Test
532+
void testGetTableHistoryWithVersionBasedPolicy() {
533+
GetTableResponseBody responseBody =
534+
createTableWithVersionBasedHistoryMock(testDbName, testTableName, 5);
535+
Mono<GetTableResponseBody> responseMock = (Mono<GetTableResponseBody>) Mockito.mock(Mono.class);
536+
Mockito.when(responseMock.block(any(Duration.class))).thenReturn(responseBody);
537+
Mockito.when(apiMock.getTableV1(testDbName, testTableName)).thenReturn(responseMock);
538+
539+
Optional<HistoryConfig> result =
540+
client.getTableHistory(
541+
TableMetadata.builder().dbName(testDbName).tableName(testTableName).build());
542+
543+
Assertions.assertTrue(result.isPresent());
544+
HistoryConfig historyConfig = result.get();
545+
Assertions.assertEquals(0, historyConfig.getMaxAge());
546+
Assertions.assertNull(historyConfig.getGranularity());
547+
Assertions.assertEquals(5, historyConfig.getVersions());
548+
}
549+
550+
@Test
551+
void testGetTableHistoryWithInvalidPolicy() {
552+
GetTableResponseBody responseBody =
553+
createTableWithInvalidHistoryMock(testDbName, testTableName);
554+
Mono<GetTableResponseBody> responseMock = (Mono<GetTableResponseBody>) Mockito.mock(Mono.class);
555+
Mockito.when(responseMock.block(any(Duration.class))).thenReturn(responseBody);
556+
Mockito.when(apiMock.getTableV1(testDbName, testTableName)).thenReturn(responseMock);
557+
558+
Optional<HistoryConfig> result =
559+
client.getTableHistory(
560+
TableMetadata.builder().dbName(testDbName).tableName(testTableName).build());
561+
562+
Assertions.assertFalse(result.isPresent());
563+
}
564+
510565
private GetTableResponseBody createTableResponseBodyMock(String dbName, String tableName) {
511566
GetTableResponseBody responseBody = Mockito.mock(GetTableResponseBody.class);
512567
Mockito.when(responseBody.getTableId()).thenReturn(tableName);
@@ -726,4 +781,58 @@ private GetTableResponseBody createPartitionedReplicaTableResponseBodyMock(
726781
.thenReturn(GetTableResponseBody.TableTypeEnum.REPLICA_TABLE);
727782
return responseBody;
728783
}
784+
785+
private GetTableResponseBody createTableWithTimeBasedHistoryMock(
786+
String dbName,
787+
String tableName,
788+
int maxAge,
789+
History.GranularityEnum granularity,
790+
Integer versions) {
791+
GetTableResponseBody responseBody = Mockito.mock(GetTableResponseBody.class);
792+
Policies policies = Mockito.mock(Policies.class);
793+
History history = Mockito.mock(History.class);
794+
795+
Mockito.when(responseBody.getDatabaseId()).thenReturn(dbName);
796+
Mockito.when(responseBody.getTableId()).thenReturn(tableName);
797+
Mockito.when(responseBody.getPolicies()).thenReturn(policies);
798+
Mockito.when(policies.getHistory()).thenReturn(history);
799+
Mockito.when(history.getMaxAge()).thenReturn(maxAge);
800+
Mockito.when(history.getGranularity()).thenReturn(granularity);
801+
Mockito.when(history.getVersions()).thenReturn(versions);
802+
803+
return responseBody;
804+
}
805+
806+
private GetTableResponseBody createTableWithVersionBasedHistoryMock(
807+
String dbName, String tableName, int versions) {
808+
GetTableResponseBody responseBody = Mockito.mock(GetTableResponseBody.class);
809+
Policies policies = Mockito.mock(Policies.class);
810+
History history = Mockito.mock(History.class);
811+
812+
Mockito.when(responseBody.getDatabaseId()).thenReturn(dbName);
813+
Mockito.when(responseBody.getTableId()).thenReturn(tableName);
814+
Mockito.when(responseBody.getPolicies()).thenReturn(policies);
815+
Mockito.when(policies.getHistory()).thenReturn(history);
816+
Mockito.when(history.getMaxAge()).thenReturn(null);
817+
Mockito.when(history.getGranularity()).thenReturn(null);
818+
Mockito.when(history.getVersions()).thenReturn(versions);
819+
820+
return responseBody;
821+
}
822+
823+
private GetTableResponseBody createTableWithInvalidHistoryMock(String dbName, String tableName) {
824+
GetTableResponseBody responseBody = Mockito.mock(GetTableResponseBody.class);
825+
Policies policies = Mockito.mock(Policies.class);
826+
History history = Mockito.mock(History.class);
827+
828+
Mockito.when(responseBody.getDatabaseId()).thenReturn(dbName);
829+
Mockito.when(responseBody.getTableId()).thenReturn(tableName);
830+
Mockito.when(responseBody.getPolicies()).thenReturn(policies);
831+
Mockito.when(policies.getHistory()).thenReturn(history);
832+
Mockito.when(history.getMaxAge()).thenReturn(0);
833+
Mockito.when(history.getGranularity()).thenReturn(null);
834+
Mockito.when(history.getVersions()).thenReturn(0);
835+
836+
return responseBody;
837+
}
729838
}

0 commit comments

Comments
 (0)