Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import com.linkedin.openhouse.common.stats.model.CommitEventTable;
import com.linkedin.openhouse.common.stats.model.CommitEventTablePartitions;
import com.linkedin.openhouse.common.stats.model.IcebergTableStats;
import com.linkedin.openhouse.jobs.util.AppConstants;
import com.linkedin.openhouse.jobs.util.SparkJobUtil;
import com.linkedin.openhouse.jobs.util.TableStatsCollector;
import java.io.IOException;
Expand Down Expand Up @@ -252,6 +253,7 @@ public void expireSnapshots(Table table, int maxAge, String granularity, int ver
* @param count granularity count representing retention timeline for @fqtn records
* @param backupEnabled flag to indicate if backup manifests need to be created for data files
* @param backupDir backup directory under which data manifests are created
* @param now current timestamp to be used for retention calculation
*/
public void runRetention(
String fqtn,
Expand All @@ -260,13 +262,14 @@ public void runRetention(
String granularity,
int count,
boolean backupEnabled,
String backupDir) {
ZonedDateTime now = ZonedDateTime.now();
String backupDir,
ZonedDateTime now) {
if (backupEnabled) {
// Cache of manifests: partitionPath -> list of data file path
Map<String, List<String>> manifestCache =
prepareBackupDataManifests(fqtn, columnName, columnPattern, granularity, count, now);
writeBackupDataManifests(manifestCache, getTable(fqtn), backupDir);
writeBackupDataManifests(manifestCache, getTable(fqtn), backupDir, now);
exposeBackupLocation(getTable(fqtn), backupDir);
}
final String statement =
SparkJobUtil.createDeleteStatement(
Expand Down Expand Up @@ -302,7 +305,7 @@ private Map<String, List<String>> prepareBackupDataManifests(
}

private void writeBackupDataManifests(
Map<String, List<String>> manifestCache, Table table, String backupDir) {
Map<String, List<String>> manifestCache, Table table, String backupDir, ZonedDateTime now) {
for (String partitionPath : manifestCache.keySet()) {
List<String> files = manifestCache.get(partitionPath);
List<String> backupFiles =
Expand All @@ -314,8 +317,9 @@ private void writeBackupDataManifests(
jsonMap.put("files", backupFiles);
String jsonStr = new Gson().toJson(jsonMap);
// Create data_manifest.json
String manifestName = String.format("data_manifest_%d.json", now.toInstant().toEpochMilli());
Path destPath =
getTrashPath(table, new Path(partitionPath, "data_manifest.json").toString(), backupDir);
getTrashPath(table, new Path(partitionPath, manifestName).toString(), backupDir);
try {
final FileSystem fs = fs();
if (!fs.exists(destPath.getParent())) {
Expand All @@ -331,6 +335,14 @@ private void writeBackupDataManifests(
}
}

private void exposeBackupLocation(Table table, String backupDir) {
Path fullyQualifiedBackupDir = new Path(table.location(), backupDir);
spark.sql(
String.format(
"ALTER TABLE %s SET TBLPROPERTIES ('%s'='%s')",
table.name(), AppConstants.BACKUP_DIR_KEY, fullyQualifiedBackupDir));
}

private Path getTrashPath(String path, String filePath, String trashDir) {
return new Path(filePath.replace(path, new Path(path, trashDir).toString()));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,15 +28,18 @@
@Slf4j
public class OrphanFilesDeletionSparkApp extends BaseTableSparkApp {
private final long ttlSeconds;
private final String backupDir;

public OrphanFilesDeletionSparkApp(
String jobId,
StateManager stateManager,
String fqtn,
long ttlSeconds,
OtelEmitter otelEmitter) {
OtelEmitter otelEmitter,
String backupDir) {
super(jobId, stateManager, fqtn, otelEmitter);
this.ttlSeconds = ttlSeconds;
this.backupDir = backupDir;
}

@Override
Expand All @@ -46,8 +49,7 @@ protected void runInner(Operations ops) {
Table table = ops.getTable(fqtn);
boolean backupEnabled =
Boolean.parseBoolean(
table.properties().getOrDefault(RetentionSparkApp.BACKUP_ENABLED_KEY, "false"));
String backupDir = table.properties().getOrDefault(RetentionSparkApp.BACKUP_DIR_KEY, ".backup");
table.properties().getOrDefault(AppConstants.BACKUP_ENABLED_KEY, "false"));
log.info(
"Orphan files deletion app start for table={} with olderThanTimestampMillis={} backupEnabled={} and backupDir={}",
fqtn,
Expand Down Expand Up @@ -89,6 +91,7 @@ public static OrphanFilesDeletionSparkApp createApp(String[] args, OtelEmitter o
extraOptions.add(
new Option(
"s", "skipStaging", false, "Whether to skip staging orphan files before deletion"));
extraOptions.add(new Option("b", "backupDir", true, "Backup directory for deleted data"));
CommandLine cmdLine = createCommandLine(args, extraOptions);
return new OrphanFilesDeletionSparkApp(
getJobId(cmdLine),
Expand All @@ -97,6 +100,7 @@ public static OrphanFilesDeletionSparkApp createApp(String[] args, OtelEmitter o
Math.max(
NumberUtils.toLong(cmdLine.getOptionValue("ttl"), TimeUnit.DAYS.toSeconds(7)),
TimeUnit.DAYS.toSeconds(1)),
otelEmitter);
otelEmitter,
cmdLine.getOptionValue("backupDir", ".backup"));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@
import com.linkedin.openhouse.common.metrics.DefaultOtelConfig;
import com.linkedin.openhouse.common.metrics.OtelEmitter;
import com.linkedin.openhouse.jobs.spark.state.StateManager;
import com.linkedin.openhouse.jobs.util.AppConstants;
import com.linkedin.openhouse.jobs.util.AppsOtelEmitter;
import java.time.ZonedDateTime;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
Expand All @@ -23,12 +25,11 @@
*/
@Slf4j
public class RetentionSparkApp extends BaseTableSparkApp {
public static final String BACKUP_ENABLED_KEY = "retention.backup.enabled";
public static final String BACKUP_DIR_KEY = "retention.backup.dir";
private final String columnName;
private final String columnPattern;
private final String granularity;
private final int count;
private final String backupDir;

public RetentionSparkApp(
String jobId,
Expand All @@ -38,30 +39,35 @@ public RetentionSparkApp(
String columnPattern,
String granularity,
int count,
OtelEmitter otelEmitter) {
OtelEmitter otelEmitter,
String backupDir) {
super(jobId, stateManager, fqtn, otelEmitter);
this.columnName = columnName;
this.columnPattern = columnPattern;
this.granularity = granularity;
this.count = count;
this.backupDir = backupDir;
}

@Override
protected void runInner(Operations ops) {
Table table = ops.getTable(fqtn);
boolean backupEnabled =
Boolean.parseBoolean(table.properties().getOrDefault(BACKUP_ENABLED_KEY, "false"));
String backupDir = table.properties().getOrDefault(BACKUP_DIR_KEY, ".backup");
Boolean.parseBoolean(
table.properties().getOrDefault(AppConstants.BACKUP_ENABLED_KEY, "false"));
ZonedDateTime now = ZonedDateTime.now();
log.info(
"Retention app start for table {}, column {}, {}, ttl={} {}s, backupEnabled={}, backupDir={}",
"Retention app start for table {}, column {}, {}, ttl={} {}s, backupEnabled={}, backupDir={}, ts={}",
fqtn,
columnName,
columnPattern,
count,
granularity,
backupEnabled,
backupDir);
ops.runRetention(fqtn, columnName, columnPattern, granularity, count, backupEnabled, backupDir);
backupDir,
now);
ops.runRetention(
fqtn, columnName, columnPattern, granularity, count, backupEnabled, backupDir, now);
}

public static void main(String[] args) {
Expand All @@ -77,6 +83,7 @@ public static RetentionSparkApp createApp(String[] args, OtelEmitter otelEmitter
extraOptions.add(new Option("cp", "columnPattern", true, "Retention column pattern"));
extraOptions.add(new Option("g", "granularity", true, "Granularity: day, week"));
extraOptions.add(new Option("c", "count", true, "Retain last <count> <granularity>s"));
extraOptions.add(new Option("b", "backupDir", true, "Backup directory for deleted data"));
CommandLine cmdLine = createCommandLine(args, extraOptions);
return new RetentionSparkApp(
getJobId(cmdLine),
Expand All @@ -86,6 +93,7 @@ public static RetentionSparkApp createApp(String[] args, OtelEmitter otelEmitter
cmdLine.getOptionValue("columnPattern", ""),
cmdLine.getOptionValue("granularity"),
Integer.parseInt(cmdLine.getOptionValue("count")),
otelEmitter);
otelEmitter,
cmdLine.getOptionValue("backupDir", ".backup"));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -50,5 +50,9 @@ public final class AppConstants {
public static final String QUEUED_TIME = "queued_time";
public static final String DATABASE_NAME = "database_name";

// Maintenance jobs table properties keys
public static final String BACKUP_ENABLED_KEY = "retention.backup.enabled";
public static final String BACKUP_DIR_KEY = "retention.backup.dir";

private AppConstants() {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import com.linkedin.openhouse.common.metrics.DefaultOtelConfig;
import com.linkedin.openhouse.common.metrics.OtelEmitter;
import com.linkedin.openhouse.common.stats.model.IcebergTableStats;
import com.linkedin.openhouse.jobs.util.AppConstants;
import com.linkedin.openhouse.jobs.util.AppsOtelEmitter;
import com.linkedin.openhouse.jobs.util.SparkJobUtil;
import com.linkedin.openhouse.tables.client.model.Policies;
Expand Down Expand Up @@ -56,7 +57,7 @@ public void testRetentionSparkApp() throws Exception {
prepareTableWithRetentionAndSharingPolicies(ops, tableName, "1d", true);
populateTable(ops, tableName, 3);
populateTable(ops, tableName, 2, 2);
ops.runRetention(tableName, "ts", "", "day", 1, false, "");
ops.runRetention(tableName, "ts", "", "day", 1, false, "", ZonedDateTime.now());
verifyRowCount(ops, tableName, 3);
verifyPolicies(ops, tableName, 1, Retention.GranularityEnum.DAY, true);
}
Expand Down Expand Up @@ -157,7 +158,7 @@ private void runRetentionJobWithStringPartitionColumns(
String granularity) {
prepareTableWithStringColumn(ops, tableName);
populateTableWithStringColumn(ops, tableName, 3, dataFormats);
ops.runRetention(tableName, column, pattern, granularity, 2, false, "");
ops.runRetention(tableName, column, pattern, granularity, 2, false, "", ZonedDateTime.now());
}

@Test
Expand All @@ -169,7 +170,7 @@ public void testRetentionCreatesSnapshotsOnNoOpDelete() throws Exception {
List<Long> snapshots = getSnapshotIds(ops, tableName);
// check if there are existing snapshots
Assertions.assertTrue(snapshots.size() > 0);
ops.runRetention(tableName, "ts", "", "day", 2, false, "");
ops.runRetention(tableName, "ts", "", "day", 2, false, "", ZonedDateTime.now());
verifyRowCount(ops, tableName, 4);
List<Long> snapshotsAfter = getSnapshotIds(ops, tableName);
Assertions.assertEquals(snapshots.size() + 1, snapshotsAfter.size());
Expand Down Expand Up @@ -213,19 +214,22 @@ public void testRetentionDataManifestWithStringDatePartitionedTable() throws Exc
String.format(
"insert into %s values ('b', '%s', '%s', 0), ('b', '%s', '%s', 0)",
tableName, twoDayAgoDate, twoDayAgoHour, threeDayAgoDate, threeDayAgoHour));
ops.runRetention(tableName, columnName, columnPattern, granularity, count, true, ".backup");
ZonedDateTime now = ZonedDateTime.now();
ops.runRetention(
tableName, columnName, columnPattern, granularity, count, true, ".backup", now);
// verify data_manifest.json
Table table = ops.getTable(tableName);
String manifestName = String.format("data_manifest_%d.json", now.toInstant().toEpochMilli());
Path firstManifestPath =
new Path(
String.format(
"%s/.backup/data/datepartition=%s/hourpartition=%s/late=0/data_manifest.json",
table.location(), twoDayAgoDate, twoDayAgoHour));
"%s/.backup/data/datepartition=%s/hourpartition=%s/late=0/%s",
table.location(), twoDayAgoDate, twoDayAgoHour, manifestName));
Path secondManifestPath =
new Path(
String.format(
"%s/.backup/data/datepartition=%s/hourpartition=%s/late=0/data_manifest.json",
table.location(), threeDayAgoDate, threeDayAgoHour));
"%s/.backup/data/datepartition=%s/hourpartition=%s/late=0/%s",
table.location(), threeDayAgoDate, threeDayAgoHour, manifestName));
Assertions.assertTrue(ops.fs().exists(firstManifestPath));
Assertions.assertTrue(ops.fs().exists(secondManifestPath));
try (InputStream in = ops.fs().open(firstManifestPath);
Expand All @@ -246,6 +250,8 @@ public void testRetentionDataManifestWithStringDatePartitionedTable() throws Exc
oneDataFilePath.startsWith(secondManifestPath.getParent().toString())
&& oneDataFilePath.endsWith(".orc"));
}
Assertions.assertEquals(
table.location() + "/.backup", table.properties().get(AppConstants.BACKUP_DIR_KEY));
}
}

Expand Down Expand Up @@ -277,13 +283,16 @@ public void testRetentionDataManifestWithTimestampPartitionedTable() throws Exce
String.format(
"insert into %s values ('b', cast('%s' as timestamp)), ('b', cast('%s' as timestamp))",
tableName, today, twoDayAgo));
ops.runRetention(tableName, columnName, columnPattern, granularity, count, true, ".backup");
ZonedDateTime now = ZonedDateTime.now();
ops.runRetention(
tableName, columnName, columnPattern, granularity, count, true, ".backup", now);
// verify data_manifest.json
Table table = ops.getTable(tableName);
String manifestName = String.format("data_manifest_%d.json", now.toInstant().toEpochMilli());
Path manifestPath =
new Path(
String.format(
"%s/.backup/data/ts_day=%s/data_manifest.json", table.location(), twoDayAgo));
"%s/.backup/data/ts_day=%s/%s", table.location(), twoDayAgo, manifestName));
Assertions.assertTrue(ops.fs().exists(manifestPath));
try (InputStream in = ops.fs().open(manifestPath);
InputStreamReader reader = new InputStreamReader(in, StandardCharsets.UTF_8)) {
Expand All @@ -294,6 +303,8 @@ public void testRetentionDataManifestWithTimestampPartitionedTable() throws Exce
oneDataFilePath.startsWith(manifestPath.getParent().toString())
&& oneDataFilePath.endsWith(".orc"));
}
Assertions.assertEquals(
table.location() + "/.backup", table.properties().get(AppConstants.BACKUP_DIR_KEY));
}
}

Expand Down
4 changes: 2 additions & 2 deletions infra/recipes/docker-compose/oh-hadoop-spark/jobs.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ jobs:
<< : *livy-engine
- type: RETENTION
class-name: com.linkedin.openhouse.jobs.spark.RetentionSparkApp
args: []
args: ["--backupDir", ".backup"]
<< : *apps-defaults
<< : *livy-engine
- type: DATA_COMPACTION
Expand All @@ -52,7 +52,7 @@ jobs:
<< : *livy-engine
- type: ORPHAN_FILES_DELETION
class-name: com.linkedin.openhouse.jobs.spark.OrphanFilesDeletionSparkApp
args: []
args: ["--backupDir", ".backup"]
<<: *apps-defaults
spark-properties:
<<: *spark-defaults
Expand Down