Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 25 additions & 37 deletions src/storage/storage.cc
Original file line number Diff line number Diff line change
Expand Up @@ -802,53 +802,41 @@ StatusOr<int> Storage::IngestSST(const std::string &sst_dir, const rocksdb::Inge
return 0;
}

std::unordered_map<ColumnFamilyID, std::vector<std::string>> cf_files;
// Group SST files by column family
const auto &column_families = ColumnFamilyConfigs::ListAllColumnFamilies();
std::unordered_map<ColumnFamilyID, std::vector<std::string>> column_family_files;
for (const auto &file : sst_files) {
bool matched = false;
for (const auto &cf : ColumnFamilyConfigs::ListAllColumnFamilies()) {
if (file.find(cf.Name()) != std::string::npos) {
cf_files[cf.Id()].push_back(file);
matched = true;
break;
}
}
if (!matched) {
auto iter = std::find_if(column_families.begin(), column_families.end(),
[&file](const auto &cf) { return file.find(cf.Name()) != std::string::npos; });
if (iter == column_families.end()) {
return {Status::NotOK, fmt::format("SST file '{}' does not match any known column family name", file)};
}
column_family_files[iter->Id()].push_back(file);
}

// Process each set of files with the appropriate column family
// By importing the specific column family SST files first, we avoid data corruption -
// if import fails, no data is made available or corrupted in either column family
// if the metadata import fails, the imported data will be deleted by the compaction.
rocksdb::Status status;
// Process files for each column family except metadata
for (const auto &[cf, files] : cf_files) {
if (cf == ColumnFamilyID::Metadata) continue;
if (files.empty()) continue;

rocksdb::ColumnFamilyHandle *cf_handle = GetCFHandle(cf);

status = ingestSST(cf_handle, ingest_options, files);
if (!status.ok()) {
return {Status::NotOK, status.ToString()};
}
// Build ingestion arguments for atomic ingestion across all column families
// IngestExternalFiles API ingests all files atomically - either all succeed or all fail
std::vector<rocksdb::IngestExternalFileArg> ingest_args;
ingest_args.reserve(column_family_files.size());
for (auto &[cf_id, files] : column_family_files) {
rocksdb::IngestExternalFileArg arg;
arg.column_family = GetCFHandle(cf_id);
arg.external_files = std::move(files);
arg.options = ingest_options;
ingest_args.push_back(std::move(arg));
}
// Process metadata files
const auto &metadata_files = cf_files[ColumnFamilyID::Metadata];
if (!metadata_files.empty()) {
status = ingestSST(GetCFHandle(ColumnFamilyID::Metadata), ingest_options, metadata_files);

if (!ingest_args.empty()) {
rocksdb::Status status = db_->IngestExternalFiles(ingest_args);
if (!status.ok()) {
return {Status::NotOK, status.ToString()};
ERROR("Failed to atomically ingest SST files across column families: {}", status.ToString());
return {Status::NotOK,
fmt::format("Failed to atomically ingest SST files across column families: {}", status.ToString())};
}
}
return sst_files.size();
}

rocksdb::Status Storage::ingestSST(rocksdb::ColumnFamilyHandle *cf_handle,
const rocksdb::IngestExternalFileOptions &options,
const std::vector<std::string> &sst_file_names) {
return db_->IngestExternalFile(cf_handle, sst_file_names, options);
INFO("Successfully ingested {} SST files atomically across all column families", sst_files.size());
return sst_files.size();
}

// Flushes the shared block cache by delegating to its EraseUnRefEntries() call.
// NOTE(review): going by the method name, only entries that nothing currently
// references are dropped — confirm against the RocksDB Cache API.
void Storage::FlushBlockCache() {
  shared_block_cache_->EraseUnRefEntries();
}
Expand Down
Loading