Commit fd5d9ea

Config backup dir in jobs.yaml for soft retention (#408)
## Summary

This PR is a follow-up to soft delete in the Retention app and the ODF app, and does the following:

1. Config backup dir in jobs.yaml instead of table properties.
2. Expose the fully qualified backup dir in table properties.
3. Append a timestamp to the data_manifest.json file name.

## Changes

- [ ] Client-facing API Changes
- [ ] Internal API Changes
- [x] Bug Fixes
- [ ] New Features
- [ ] Performance Improvements
- [ ] Code Style
- [ ] Refactoring
- [ ] Documentation
- [ ] Tests

For all the boxes checked, please include additional details of the changes made in this pull request.

## Testing Done

- [ ] Manually tested on local docker setup. Please include commands ran, and their output.
- [ ] Added new tests for the changes made.
- [x] Updated existing tests to reflect the changes made.
- [ ] No tests added or updated. Please explain why. If unsure, please feel free to ask for help.
- [ ] Some other form of testing like staging or soak time in production. Please explain.

Updated unit tests.

## Additional Information

- [ ] Breaking Changes
- [ ] Deprecations
- [ ] Large PR broken into smaller PRs, and PR plan linked in the description.

For all the boxes checked, include additional details of the changes made in this pull request.
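To make changes (2) and (3) concrete, here is a minimal illustrative sketch, not part of the PR itself; the property key and naming scheme follow the diffs below, while the table location value is made up:

```java
import java.time.ZonedDateTime;

public class SoftRetentionBackupSketch {
  public static void main(String[] args) {
    // (3) Data manifests now carry the retention run's epoch-millis timestamp,
    //     so successive runs no longer overwrite a single data_manifest.json.
    ZonedDateTime now = ZonedDateTime.now();
    String manifestName =
        String.format("data_manifest_%d.json", now.toInstant().toEpochMilli());
    System.out.println(manifestName); // e.g. data_manifest_1718000000000.json

    // (2) The fully qualified backup dir is exposed as a table property
    //     (retention.backup.dir), e.g. "<table location>/.backup".
    String tableLocation = "hdfs://namespace/data/openhouse/db/table"; // illustrative only
    System.out.println(tableLocation + "/.backup");
  }
}
```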
1 parent 14fe460 commit fd5d9ea

6 files changed (+69, -30 lines)


apps/spark/src/main/java/com/linkedin/openhouse/jobs/spark/Operations.java

Lines changed: 17 additions & 5 deletions
@@ -7,6 +7,7 @@
 import com.linkedin.openhouse.common.stats.model.CommitEventTable;
 import com.linkedin.openhouse.common.stats.model.CommitEventTablePartitions;
 import com.linkedin.openhouse.common.stats.model.IcebergTableStats;
+import com.linkedin.openhouse.jobs.util.AppConstants;
 import com.linkedin.openhouse.jobs.util.SparkJobUtil;
 import com.linkedin.openhouse.jobs.util.TableStatsCollector;
 import java.io.IOException;
@@ -252,6 +253,7 @@ public void expireSnapshots(Table table, int maxAge, String granularity, int ver
    * @param count granularity count representing retention timeline for @fqtn records
    * @param backupEnabled flag to indicate if backup manifests need to be created for data files
    * @param backupDir backup directory under which data manifests are created
+   * @param now current timestamp to be used for retention calculation
    */
   public void runRetention(
       String fqtn,
@@ -260,13 +262,14 @@ public void runRetention(
       String granularity,
       int count,
       boolean backupEnabled,
-      String backupDir) {
-    ZonedDateTime now = ZonedDateTime.now();
+      String backupDir,
+      ZonedDateTime now) {
     if (backupEnabled) {
       // Cache of manifests: partitionPath -> list of data file path
       Map<String, List<String>> manifestCache =
           prepareBackupDataManifests(fqtn, columnName, columnPattern, granularity, count, now);
-      writeBackupDataManifests(manifestCache, getTable(fqtn), backupDir);
+      writeBackupDataManifests(manifestCache, getTable(fqtn), backupDir, now);
+      exposeBackupLocation(getTable(fqtn), backupDir);
     }
     final String statement =
         SparkJobUtil.createDeleteStatement(
@@ -302,7 +305,7 @@ private Map<String, List<String>> prepareBackupDataManifests(
   }

   private void writeBackupDataManifests(
-      Map<String, List<String>> manifestCache, Table table, String backupDir) {
+      Map<String, List<String>> manifestCache, Table table, String backupDir, ZonedDateTime now) {
     for (String partitionPath : manifestCache.keySet()) {
       List<String> files = manifestCache.get(partitionPath);
       List<String> backupFiles =
@@ -314,8 +317,9 @@ private void writeBackupDataManifests(
       jsonMap.put("files", backupFiles);
       String jsonStr = new Gson().toJson(jsonMap);
       // Create data_manifest.json
+      String manifestName = String.format("data_manifest_%d.json", now.toInstant().toEpochMilli());
       Path destPath =
-          getTrashPath(table, new Path(partitionPath, "data_manifest.json").toString(), backupDir);
+          getTrashPath(table, new Path(partitionPath, manifestName).toString(), backupDir);
       try {
         final FileSystem fs = fs();
         if (!fs.exists(destPath.getParent())) {
@@ -331,6 +335,14 @@ private void writeBackupDataManifests(
     }
   }

+  private void exposeBackupLocation(Table table, String backupDir) {
+    Path fullyQualifiedBackupDir = new Path(table.location(), backupDir);
+    spark.sql(
+        String.format(
+            "ALTER TABLE %s SET TBLPROPERTIES ('%s'='%s')",
+            table.name(), AppConstants.BACKUP_DIR_KEY, fullyQualifiedBackupDir));
+  }
+
   private Path getTrashPath(String path, String filePath, String trashDir) {
     return new Path(filePath.replace(path, new Path(path, trashDir).toString()));
   }
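Because manifests are now timestamped, downstream tooling can no longer look for a fixed data_manifest.json name. The following is a hypothetical consumer sketch, not part of this change, showing one way to resolve the newest manifest under a partition's backup path (the class and helper names are assumptions; only the Hadoop FileSystem calls are standard):

```java
import java.io.IOException;
import java.util.Arrays;
import java.util.Comparator;
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public final class BackupManifestReader {
  // Matches the naming scheme introduced in writeBackupDataManifests.
  private static final Pattern MANIFEST = Pattern.compile("data_manifest_(\\d+)\\.json");

  /** Returns the most recently written data manifest under the given backup partition path. */
  public static Optional<Path> latestManifest(FileSystem fs, Path backupPartitionPath)
      throws IOException {
    FileStatus[] statuses = fs.listStatus(backupPartitionPath);
    return Arrays.stream(statuses)
        .map(FileStatus::getPath)
        .filter(p -> MANIFEST.matcher(p.getName()).matches())
        .max(Comparator.comparingLong(BackupManifestReader::epochMillis));
  }

  private static long epochMillis(Path p) {
    Matcher m = MANIFEST.matcher(p.getName());
    return m.matches() ? Long.parseLong(m.group(1)) : -1L;
  }

  private BackupManifestReader() {}
}
```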

apps/spark/src/main/java/com/linkedin/openhouse/jobs/spark/OrphanFilesDeletionSparkApp.java

Lines changed: 8 additions & 4 deletions
@@ -28,15 +28,18 @@
 @Slf4j
 public class OrphanFilesDeletionSparkApp extends BaseTableSparkApp {
   private final long ttlSeconds;
+  private final String backupDir;

   public OrphanFilesDeletionSparkApp(
       String jobId,
       StateManager stateManager,
       String fqtn,
       long ttlSeconds,
-      OtelEmitter otelEmitter) {
+      OtelEmitter otelEmitter,
+      String backupDir) {
     super(jobId, stateManager, fqtn, otelEmitter);
     this.ttlSeconds = ttlSeconds;
+    this.backupDir = backupDir;
   }

   @Override
@@ -46,8 +49,7 @@ protected void runInner(Operations ops) {
     Table table = ops.getTable(fqtn);
     boolean backupEnabled =
         Boolean.parseBoolean(
-            table.properties().getOrDefault(RetentionSparkApp.BACKUP_ENABLED_KEY, "false"));
-    String backupDir = table.properties().getOrDefault(RetentionSparkApp.BACKUP_DIR_KEY, ".backup");
+            table.properties().getOrDefault(AppConstants.BACKUP_ENABLED_KEY, "false"));
     log.info(
         "Orphan files deletion app start for table={} with olderThanTimestampMillis={} backupEnabled={} and backupDir={}",
         fqtn,
@@ -89,6 +91,7 @@ public static OrphanFilesDeletionSparkApp createApp(String[] args, OtelEmitter o
     extraOptions.add(
         new Option(
             "s", "skipStaging", false, "Whether to skip staging orphan files before deletion"));
+    extraOptions.add(new Option("b", "backupDir", true, "Backup directory for deleted data"));
     CommandLine cmdLine = createCommandLine(args, extraOptions);
     return new OrphanFilesDeletionSparkApp(
         getJobId(cmdLine),
@@ -97,6 +100,7 @@ public static OrphanFilesDeletionSparkApp createApp(String[] args, OtelEmitter o
         Math.max(
             NumberUtils.toLong(cmdLine.getOptionValue("ttl"), TimeUnit.DAYS.toSeconds(7)),
             TimeUnit.DAYS.toSeconds(1)),
-        otelEmitter);
+        otelEmitter,
+        cmdLine.getOptionValue("backupDir", ".backup"));
   }
 }

apps/spark/src/main/java/com/linkedin/openhouse/jobs/spark/RetentionSparkApp.java

Lines changed: 17 additions & 9 deletions
@@ -3,7 +3,9 @@
 import com.linkedin.openhouse.common.metrics.DefaultOtelConfig;
 import com.linkedin.openhouse.common.metrics.OtelEmitter;
 import com.linkedin.openhouse.jobs.spark.state.StateManager;
+import com.linkedin.openhouse.jobs.util.AppConstants;
 import com.linkedin.openhouse.jobs.util.AppsOtelEmitter;
+import java.time.ZonedDateTime;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
@@ -23,12 +25,11 @@
  */
 @Slf4j
 public class RetentionSparkApp extends BaseTableSparkApp {
-  public static final String BACKUP_ENABLED_KEY = "retention.backup.enabled";
-  public static final String BACKUP_DIR_KEY = "retention.backup.dir";
   private final String columnName;
   private final String columnPattern;
   private final String granularity;
   private final int count;
+  private final String backupDir;

   public RetentionSparkApp(
       String jobId,
@@ -38,30 +39,35 @@ public RetentionSparkApp(
       String columnPattern,
       String granularity,
       int count,
-      OtelEmitter otelEmitter) {
+      OtelEmitter otelEmitter,
+      String backupDir) {
     super(jobId, stateManager, fqtn, otelEmitter);
     this.columnName = columnName;
     this.columnPattern = columnPattern;
     this.granularity = granularity;
     this.count = count;
+    this.backupDir = backupDir;
   }

   @Override
   protected void runInner(Operations ops) {
     Table table = ops.getTable(fqtn);
     boolean backupEnabled =
-        Boolean.parseBoolean(table.properties().getOrDefault(BACKUP_ENABLED_KEY, "false"));
-    String backupDir = table.properties().getOrDefault(BACKUP_DIR_KEY, ".backup");
+        Boolean.parseBoolean(
+            table.properties().getOrDefault(AppConstants.BACKUP_ENABLED_KEY, "false"));
+    ZonedDateTime now = ZonedDateTime.now();
     log.info(
-        "Retention app start for table {}, column {}, {}, ttl={} {}s, backupEnabled={}, backupDir={}",
+        "Retention app start for table {}, column {}, {}, ttl={} {}s, backupEnabled={}, backupDir={}, ts={}",
         fqtn,
         columnName,
         columnPattern,
         count,
         granularity,
         backupEnabled,
-        backupDir);
-    ops.runRetention(fqtn, columnName, columnPattern, granularity, count, backupEnabled, backupDir);
+        backupDir,
+        now);
+    ops.runRetention(
+        fqtn, columnName, columnPattern, granularity, count, backupEnabled, backupDir, now);
   }

   public static void main(String[] args) {
@@ -77,6 +83,7 @@ public static RetentionSparkApp createApp(String[] args, OtelEmitter otelEmitter
     extraOptions.add(new Option("cp", "columnPattern", true, "Retention column pattern"));
     extraOptions.add(new Option("g", "granularity", true, "Granularity: day, week"));
     extraOptions.add(new Option("c", "count", true, "Retain last <count> <granularity>s"));
+    extraOptions.add(new Option("b", "backupDir", true, "Backup directory for deleted data"));
     CommandLine cmdLine = createCommandLine(args, extraOptions);
     return new RetentionSparkApp(
         getJobId(cmdLine),
@@ -86,6 +93,7 @@ public static RetentionSparkApp createApp(String[] args, OtelEmitter otelEmitter
         cmdLine.getOptionValue("columnPattern", ""),
         cmdLine.getOptionValue("granularity"),
         Integer.parseInt(cmdLine.getOptionValue("count")),
-        otelEmitter);
+        otelEmitter,
+        cmdLine.getOptionValue("backupDir", ".backup"));
   }
 }

apps/spark/src/main/java/com/linkedin/openhouse/jobs/util/AppConstants.java

Lines changed: 4 additions & 0 deletions
@@ -50,5 +50,9 @@ public final class AppConstants {
   public static final String QUEUED_TIME = "queued_time";
   public static final String DATABASE_NAME = "database_name";

+  // Maintenance jobs table properties keys
+  public static final String BACKUP_ENABLED_KEY = "retention.backup.enabled";
+  public static final String BACKUP_DIR_KEY = "retention.backup.dir";
+
   private AppConstants() {}
 }
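For reference, a small sketch of how consumers read the now-centralized keys, mirroring the RetentionSparkApp and OrphanFilesDeletionSparkApp changes above (the wrapper class is illustrative; `Table` is the Iceberg table handle already used by the apps):

```java
import com.linkedin.openhouse.jobs.util.AppConstants;
import org.apache.iceberg.Table;

public final class BackupPropertySketch {
  /** Backup flag, read the same way both maintenance apps now read it. */
  public static boolean isBackupEnabled(Table table) {
    return Boolean.parseBoolean(
        table.properties().getOrDefault(AppConstants.BACKUP_ENABLED_KEY, "false"));
  }

  /** Fully qualified backup dir exposed by Operations.exposeBackupLocation, or null if never set. */
  public static String exposedBackupDir(Table table) {
    return table.properties().get(AppConstants.BACKUP_DIR_KEY);
  }

  private BackupPropertySketch() {}
}
```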

apps/spark/src/test/java/com/linkedin/openhouse/jobs/spark/OperationsTest.java

Lines changed: 21 additions & 10 deletions
@@ -7,6 +7,7 @@
 import com.linkedin.openhouse.common.metrics.DefaultOtelConfig;
 import com.linkedin.openhouse.common.metrics.OtelEmitter;
 import com.linkedin.openhouse.common.stats.model.IcebergTableStats;
+import com.linkedin.openhouse.jobs.util.AppConstants;
 import com.linkedin.openhouse.jobs.util.AppsOtelEmitter;
 import com.linkedin.openhouse.jobs.util.SparkJobUtil;
 import com.linkedin.openhouse.tables.client.model.Policies;
@@ -56,7 +57,7 @@ public void testRetentionSparkApp() throws Exception {
     prepareTableWithRetentionAndSharingPolicies(ops, tableName, "1d", true);
     populateTable(ops, tableName, 3);
     populateTable(ops, tableName, 2, 2);
-    ops.runRetention(tableName, "ts", "", "day", 1, false, "");
+    ops.runRetention(tableName, "ts", "", "day", 1, false, "", ZonedDateTime.now());
     verifyRowCount(ops, tableName, 3);
     verifyPolicies(ops, tableName, 1, Retention.GranularityEnum.DAY, true);
   }
@@ -157,7 +158,7 @@ private void runRetentionJobWithStringPartitionColumns(
       String granularity) {
     prepareTableWithStringColumn(ops, tableName);
     populateTableWithStringColumn(ops, tableName, 3, dataFormats);
-    ops.runRetention(tableName, column, pattern, granularity, 2, false, "");
+    ops.runRetention(tableName, column, pattern, granularity, 2, false, "", ZonedDateTime.now());
   }

   @Test
@@ -169,7 +170,7 @@ public void testRetentionCreatesSnapshotsOnNoOpDelete() throws Exception {
     List<Long> snapshots = getSnapshotIds(ops, tableName);
     // check if there are existing snapshots
     Assertions.assertTrue(snapshots.size() > 0);
-    ops.runRetention(tableName, "ts", "", "day", 2, false, "");
+    ops.runRetention(tableName, "ts", "", "day", 2, false, "", ZonedDateTime.now());
     verifyRowCount(ops, tableName, 4);
     List<Long> snapshotsAfter = getSnapshotIds(ops, tableName);
     Assertions.assertEquals(snapshots.size() + 1, snapshotsAfter.size());
@@ -213,19 +214,22 @@ public void testRetentionDataManifestWithStringDatePartitionedTable() throws Exc
         String.format(
             "insert into %s values ('b', '%s', '%s', 0), ('b', '%s', '%s', 0)",
             tableName, twoDayAgoDate, twoDayAgoHour, threeDayAgoDate, threeDayAgoHour));
-    ops.runRetention(tableName, columnName, columnPattern, granularity, count, true, ".backup");
+    ZonedDateTime now = ZonedDateTime.now();
+    ops.runRetention(
+        tableName, columnName, columnPattern, granularity, count, true, ".backup", now);
     // verify data_manifest.json
     Table table = ops.getTable(tableName);
+    String manifestName = String.format("data_manifest_%d.json", now.toInstant().toEpochMilli());
     Path firstManifestPath =
         new Path(
             String.format(
-                "%s/.backup/data/datepartition=%s/hourpartition=%s/late=0/data_manifest.json",
-                table.location(), twoDayAgoDate, twoDayAgoHour));
+                "%s/.backup/data/datepartition=%s/hourpartition=%s/late=0/%s",
+                table.location(), twoDayAgoDate, twoDayAgoHour, manifestName));
     Path secondManifestPath =
         new Path(
             String.format(
-                "%s/.backup/data/datepartition=%s/hourpartition=%s/late=0/data_manifest.json",
-                table.location(), threeDayAgoDate, threeDayAgoHour));
+                "%s/.backup/data/datepartition=%s/hourpartition=%s/late=0/%s",
+                table.location(), threeDayAgoDate, threeDayAgoHour, manifestName));
     Assertions.assertTrue(ops.fs().exists(firstManifestPath));
     Assertions.assertTrue(ops.fs().exists(secondManifestPath));
     try (InputStream in = ops.fs().open(firstManifestPath);
@@ -246,6 +250,8 @@ public void testRetentionDataManifestWithStringDatePartitionedTable() throws Exc
             oneDataFilePath.startsWith(secondManifestPath.getParent().toString())
                 && oneDataFilePath.endsWith(".orc"));
       }
+      Assertions.assertEquals(
+          table.location() + "/.backup", table.properties().get(AppConstants.BACKUP_DIR_KEY));
     }
   }

@@ -277,13 +283,16 @@ public void testRetentionDataManifestWithTimestampPartitionedTable() throws Exce
         String.format(
             "insert into %s values ('b', cast('%s' as timestamp)), ('b', cast('%s' as timestamp))",
             tableName, today, twoDayAgo));
-    ops.runRetention(tableName, columnName, columnPattern, granularity, count, true, ".backup");
+    ZonedDateTime now = ZonedDateTime.now();
+    ops.runRetention(
+        tableName, columnName, columnPattern, granularity, count, true, ".backup", now);
     // verify data_manifest.json
     Table table = ops.getTable(tableName);
+    String manifestName = String.format("data_manifest_%d.json", now.toInstant().toEpochMilli());
     Path manifestPath =
         new Path(
             String.format(
-                "%s/.backup/data/ts_day=%s/data_manifest.json", table.location(), twoDayAgo));
+                "%s/.backup/data/ts_day=%s/%s", table.location(), twoDayAgo, manifestName));
     Assertions.assertTrue(ops.fs().exists(manifestPath));
     try (InputStream in = ops.fs().open(manifestPath);
         InputStreamReader reader = new InputStreamReader(in, StandardCharsets.UTF_8)) {
@@ -294,6 +303,8 @@ public void testRetentionDataManifestWithTimestampPartitionedTable() throws Exce
           oneDataFilePath.startsWith(manifestPath.getParent().toString())
               && oneDataFilePath.endsWith(".orc"));
       }
+      Assertions.assertEquals(
+          table.location() + "/.backup", table.properties().get(AppConstants.BACKUP_DIR_KEY));
     }
   }

infra/recipes/docker-compose/oh-hadoop-spark/jobs.yaml

Lines changed: 2 additions & 2 deletions
@@ -37,7 +37,7 @@ jobs:
       << : *livy-engine
   - type: RETENTION
     class-name: com.linkedin.openhouse.jobs.spark.RetentionSparkApp
-    args: []
+    args: ["--backupDir", ".backup"]
     << : *apps-defaults
     << : *livy-engine
   - type: DATA_COMPACTION
@@ -52,7 +52,7 @@ jobs:
       << : *livy-engine
   - type: ORPHAN_FILES_DELETION
     class-name: com.linkedin.openhouse.jobs.spark.OrphanFilesDeletionSparkApp
-    args: []
+    args: ["--backupDir", ".backup"]
     <<: *apps-defaults
     spark-properties:
       <<: *spark-defaults
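For context, a standalone sketch of how the new `--backupDir` argument in jobs.yaml reaches the apps, assuming the apps' `Option`/`CommandLine` types are Apache Commons CLI (the parser wiring below is illustrative; the apps build theirs via `createCommandLine`):

```java
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.DefaultParser;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;

public class BackupDirArgSketch {
  public static void main(String[] args) throws ParseException {
    // Same option definition as in RetentionSparkApp/OrphanFilesDeletionSparkApp.createApp.
    Options options = new Options();
    options.addOption(new Option("b", "backupDir", true, "Backup directory for deleted data"));

    // jobs.yaml passes: args: ["--backupDir", ".backup"]
    CommandLine cmdLine =
        new DefaultParser().parse(options, new String[] {"--backupDir", ".backup"});

    // Falls back to ".backup" when the flag is omitted, matching the apps' default.
    System.out.println(cmdLine.getOptionValue("backupDir", ".backup")); // prints: .backup
  }
}
```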
