Skip to content

Run migration to rewrite canonical IDs #7850

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Apr 7, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
93 changes: 89 additions & 4 deletions Firestore/core/src/local/leveldb_migrations.cc
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,14 @@
#include "Firestore/Protos/nanopb/firestore/local/target.nanopb.h"
#include "Firestore/core/src/local/leveldb_key.h"
#include "Firestore/core/src/local/memory_index_manager.h"
#include "Firestore/core/src/local/target_data.h"
#include "Firestore/core/src/model/document_key.h"
#include "Firestore/core/src/model/types.h"
#include "Firestore/core/src/nanopb/message.h"
#include "Firestore/core/src/nanopb/reader.h"
#include "Firestore/core/src/nanopb/writer.h"
#include "Firestore/core/src/util/log.h"
#include "Firestore/core/src/util/statusor.h"
#include "absl/strings/match.h"

namespace firebase {
Expand Down Expand Up @@ -65,8 +68,9 @@ using nanopb::Writer;
* has a sentinel row with a sequence number.
* * Migration 5 drops held write acks.
* * Migration 6 populates the collection_parents index.
* * Migration 7 rewrites query_targets canonical ids in new format.
*/
const LevelDbMigrations::SchemaVersion kSchemaVersion = 6;
const LevelDbMigrations::SchemaVersion kSchemaVersion = 7;

/**
* Save the given version number as the current version of the schema of the
Expand Down Expand Up @@ -303,6 +307,81 @@ void EnsureCollectionParentsIndex(leveldb::DB* db) {
transaction.Commit();
}

/**
* Returns a `TargetData` by reading the `targets` table, using the given key
* for `query_targets` as a foreign key.
*/
util::StatusOr<TargetData> ReadTargetData(
const LevelDbQueryTargetKey& query_target_key,
const LocalSerializer& serializer,
LevelDbTransaction& transaction) {
auto target_it = transaction.NewIterator();
const auto& target_key = LevelDbTargetKey::Key(query_target_key.target_id());
target_it->Seek(target_key);
if (!target_it->Valid()) {
return util::Status(
kErrorNotFound,
util::StringFormat(
"Dangling query-target reference found: seeking %s found %s",
DescribeKey(target_key), DescribeKey(target_it)));
}

StringReader reader{target_it->value()};
auto message = Message<firestore_client_Target>::TryParse(&reader);
if (!reader.ok()) {
return util::Status(kErrorDataLoss,
util::StringFormat("Target proto failed to parse: %s",
reader.status().ToString()));
}
auto target_data = serializer.DecodeTargetData(&reader, *message);
if (!reader.ok()) {
return util::Status(
kErrorDataLoss,
util::StringFormat("Target failed to parse: %s, message: %s",
reader.status().ToString(), message.ToString()));
}

return target_data;
}

/**
* Migration 7.
*
* Rewrites targets canonical IDs with new format.
*/
void RewriteTargetsCanonicalIds(leveldb::DB* db,
const LocalSerializer& serializer) {
LevelDbTransaction transaction(db, "Rewrite Targets Canonical Ids");

std::string query_targets_prefix = LevelDbQueryTargetKey::KeyPrefix();
auto it = transaction.NewIterator();
it->Seek(query_targets_prefix);
LevelDbQueryTargetKey query_target_key;
for (; it->Valid() && absl::StartsWith(it->key(), query_targets_prefix);
it->Next()) {
HARD_ASSERT(query_target_key.Decode(it->key()),
"Failed to decode query_targets key");

util::StatusOr<TargetData> target_data =
ReadTargetData(query_target_key, serializer, transaction);
if (!target_data.ok()) {
LOG_WARN("Reading target data failed: %s",
target_data.status().error_message());
continue;
}

auto new_key = LevelDbQueryTargetKey::Key(
target_data.ValueOrDie().target().CanonicalId(),
target_data.ValueOrDie().target_id());

transaction.Delete(it->key());
std::string empty_buffer;
transaction.Put(new_key, empty_buffer);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need to write the old data back (empty_buffer sounds suspicious).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This table always has empty value: https://osscs.corp.google.com/firebase-sdk/firebase-ios-sdk/+/master:Firestore/core/src/local/leveldb_target_cache.cc;l=110

It serves as an association between canonical id and target id.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Got it.

}

transaction.Commit();
}

} // namespace

LevelDbMigrations::SchemaVersion LevelDbMigrations::ReadSchemaVersion(
Expand All @@ -321,12 +400,14 @@ LevelDbMigrations::SchemaVersion LevelDbMigrations::ReadSchemaVersion(
}
}

void LevelDbMigrations::RunMigrations(leveldb::DB* db) {
RunMigrations(db, kSchemaVersion);
void LevelDbMigrations::RunMigrations(leveldb::DB* db,
const LocalSerializer& serializer) {
RunMigrations(db, kSchemaVersion, serializer);
}

void LevelDbMigrations::RunMigrations(leveldb::DB* db,
SchemaVersion to_version) {
SchemaVersion to_version,
const LocalSerializer& serializer) {
SchemaVersion from_version = ReadSchemaVersion(db);
// If this is a downgrade, just save the downgrade version so we can
// detect it when we go to upgrade again, allowing us to rerun the
Expand Down Expand Up @@ -356,6 +437,10 @@ void LevelDbMigrations::RunMigrations(leveldb::DB* db,
if (from_version < 6 && to_version >= 6) {
EnsureCollectionParentsIndex(db);
}

if (from_version < 7 && to_version >= 7) {
RewriteTargetsCanonicalIds(db, serializer);
}
}

} // namespace local
Expand Down
6 changes: 4 additions & 2 deletions Firestore/core/src/local/leveldb_migrations.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,13 +40,15 @@ class LevelDbMigrations {
* Runs any migrations needed to bring the given database up to the current
* schema version
*/
static void RunMigrations(leveldb::DB* db);
static void RunMigrations(leveldb::DB* db, const LocalSerializer& serializer);

/**
* Runs any migrations needed to bring the given database up to the given
* schema version
*/
static void RunMigrations(leveldb::DB* db, SchemaVersion version);
static void RunMigrations(leveldb::DB* db,
SchemaVersion version,
const LocalSerializer& serializer);
};

} // namespace local
Expand Down
2 changes: 1 addition & 1 deletion Firestore/core/src/local/leveldb_persistence.cc
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ StatusOr<std::unique_ptr<LevelDbPersistence>> LevelDbPersistence::Create(
if (!created.ok()) return created.status();

std::unique_ptr<DB> db = std::move(created).ValueOrDie();
LevelDbMigrations::RunMigrations(db.get());
LevelDbMigrations::RunMigrations(db.get(), serializer);

LevelDbTransaction transaction(db.get(), "Start LevelDB");
std::set<std::string> users = CollectUserSet(&transaction);
Expand Down
86 changes: 70 additions & 16 deletions Firestore/core/test/unit/local/leveldb_migrations_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,11 @@
#include <vector>

#include "Firestore/Protos/nanopb/firestore/local/mutation.nanopb.h"
#include "Firestore/core/src/core/field_filter.h"
#include "Firestore/core/src/core/query.h"
#include "Firestore/core/src/local/leveldb_key.h"
#include "Firestore/core/src/local/leveldb_target_cache.h"
#include "Firestore/core/src/local/target_data.h"
#include "Firestore/core/src/nanopb/message.h"
#include "Firestore/core/src/util/ordered_code.h"
#include "Firestore/core/src/util/path.h"
Expand All @@ -47,7 +50,9 @@ using model::DocumentKey;
using model::ListenSequenceNumber;
using model::TargetId;
using nanopb::Message;
using testutil::Filter;
using testutil::Key;
using testutil::Query;
using util::OrderedCode;
using util::Path;

Expand All @@ -72,7 +77,8 @@ class LevelDbMigrationsTest : public testing::Test {
protected:
void SetUp() override;

std::unique_ptr<DB> db_;
std::unique_ptr<DB> db_ = nullptr;
std::unique_ptr<LocalSerializer> serializer_ = nullptr;
};

void LevelDbMigrationsTest::SetUp() {
Expand All @@ -86,13 +92,15 @@ void LevelDbMigrationsTest::SetUp() {
ASSERT_TRUE(status.ok()) << "Failed to create db: "
<< status.ToString().c_str();
db_.reset(db);

serializer_ = absl::make_unique<LocalSerializer>(MakeLocalSerializer());
}

TEST_F(LevelDbMigrationsTest, AddsTargetGlobal) {
auto metadata = LevelDbTargetCache::TryReadMetadata(db_.get());
ASSERT_TRUE(!metadata)
<< "Not expecting metadata yet, we should have an empty db";
LevelDbMigrations::RunMigrations(db_.get());
LevelDbMigrations::RunMigrations(db_.get(), *serializer_);

metadata = LevelDbTargetCache::TryReadMetadata(db_.get());
ASSERT_TRUE(metadata) << "Migrations should have added the metadata";
Expand All @@ -103,7 +111,7 @@ TEST_F(LevelDbMigrationsTest, SetsVersionNumber) {
ASSERT_EQ(0, initial) << "No version should be equivalent to 0";

// Pick an arbitrary high migration number and migrate to it.
LevelDbMigrations::RunMigrations(db_.get());
LevelDbMigrations::RunMigrations(db_.get(), *serializer_);

SchemaVersion actual = LevelDbMigrations::ReadSchemaVersion(db_.get());
ASSERT_GT(actual, 0) << "Expected to migrate to a schema version > 0";
Expand Down Expand Up @@ -145,7 +153,7 @@ TEST_F(LevelDbMigrationsTest, DropsTheTargetCache) {
LevelDbMutationKey::Key(user_id, batch_id),
};

LevelDbMigrations::RunMigrations(db_.get(), 2);
LevelDbMigrations::RunMigrations(db_.get(), 2, *serializer_);
{
// Setup some targets to be counted in the migration.
LevelDbTransaction transaction(db_.get(),
Expand All @@ -159,7 +167,7 @@ TEST_F(LevelDbMigrationsTest, DropsTheTargetCache) {
transaction.Commit();
}

LevelDbMigrations::RunMigrations(db_.get(), 3);
LevelDbMigrations::RunMigrations(db_.get(), 3, *serializer_);
{
LevelDbTransaction transaction(db_.get(), "test_drops_the_target_cache");
for (const std::string& key : target_keys) {
Expand All @@ -176,7 +184,7 @@ TEST_F(LevelDbMigrationsTest, DropsTheTargetCache) {
}

TEST_F(LevelDbMigrationsTest, DropsTheTargetCacheWithThousandsOfEntries) {
LevelDbMigrations::RunMigrations(db_.get(), 2);
LevelDbMigrations::RunMigrations(db_.get(), 2, *serializer_);
{
// Setup some targets to be destroyed.
LevelDbTransaction transaction(
Expand All @@ -188,7 +196,7 @@ TEST_F(LevelDbMigrationsTest, DropsTheTargetCacheWithThousandsOfEntries) {
transaction.Commit();
}

LevelDbMigrations::RunMigrations(db_.get(), 3);
LevelDbMigrations::RunMigrations(db_.get(), 3, *serializer_);
{
LevelDbTransaction transaction(db_.get(), "Verify");
std::string prefix = LevelDbTargetKey::KeyPrefix();
Expand All @@ -209,7 +217,7 @@ TEST_F(LevelDbMigrationsTest, AddsSentinelRows) {
ListenSequenceNumber new_sequence_number = 2;
std::string encoded_old_sequence_number =
LevelDbDocumentTargetKey::EncodeSentinelValue(old_sequence_number);
LevelDbMigrations::RunMigrations(db_.get(), 3);
LevelDbMigrations::RunMigrations(db_.get(), 3, *serializer_);
{
std::string empty_buffer;
LevelDbTransaction transaction(db_.get(), "Setup");
Expand All @@ -234,7 +242,7 @@ TEST_F(LevelDbMigrationsTest, AddsSentinelRows) {
transaction.Commit();
}

LevelDbMigrations::RunMigrations(db_.get(), 4);
LevelDbMigrations::RunMigrations(db_.get(), 4, *serializer_);
{
LevelDbTransaction transaction(db_.get(), "Verify");
auto it = transaction.NewIterator();
Expand Down Expand Up @@ -271,7 +279,7 @@ TEST_F(LevelDbMigrationsTest, RemovesMutationBatches) {
DocumentKey test_write_baz = DocumentKey::FromPathString("docs/baz");
DocumentKey test_write_pending = DocumentKey::FromPathString("docs/pending");
// Do everything up until the mutation batch migration.
LevelDbMigrations::RunMigrations(db_.get(), 3);
LevelDbMigrations::RunMigrations(db_.get(), 3, *serializer_);
// Set up data
{
LevelDbTransaction transaction(db_.get(), "Setup Foo");
Expand Down Expand Up @@ -344,7 +352,7 @@ TEST_F(LevelDbMigrationsTest, RemovesMutationBatches) {
transaction.Commit();
}

LevelDbMigrations::RunMigrations(db_.get(), 5);
LevelDbMigrations::RunMigrations(db_.get(), 5, *serializer_);

{
// Verify
Expand Down Expand Up @@ -412,7 +420,7 @@ TEST_F(LevelDbMigrationsTest, CreateCollectionParentsIndex) {
{"cg3", {"blah/x/blah/x", "cg2/x"}}};

std::string empty_buffer;
LevelDbMigrations::RunMigrations(db_.get(), 5);
LevelDbMigrations::RunMigrations(db_.get(), 5, *serializer_);
{
LevelDbTransaction transaction(db_.get(),
"Write Mutations and Remote Documents");
Expand All @@ -436,7 +444,7 @@ TEST_F(LevelDbMigrationsTest, CreateCollectionParentsIndex) {
}

// Migrate to v6 and verify index entries.
LevelDbMigrations::RunMigrations(db_.get(), 6);
LevelDbMigrations::RunMigrations(db_.get(), 6, *serializer_);
{
LevelDbTransaction transaction(db_.get(), "Verify");

Expand All @@ -459,22 +467,68 @@ TEST_F(LevelDbMigrationsTest, CreateCollectionParentsIndex) {
}
}

TEST_F(LevelDbMigrationsTest, RewritesCanonicalIds) {
LevelDbMigrations::RunMigrations(db_.get(), 6, *serializer_);
auto query = Query("collection").AddingFilter(Filter("foo", "==", "bar"));
TargetData initial_target_data(query.ToTarget(),
/* target_id= */ 2,
/* sequence_number= */ 1,
QueryPurpose::Listen);
auto invalid_key = LevelDbQueryTargetKey::Key(
"invalid_canonical_id", initial_target_data.target_id());

// Write the target with invalid canonical id into leveldb.
{
LevelDbTransaction transaction(db_.get(),
"Write target with invalid canonical ID");
auto target_key = LevelDbTargetKey::Key(2);
transaction.Put(target_key,
serializer_->EncodeTargetData(initial_target_data));

std::string empty_buffer;
transaction.Put(invalid_key, empty_buffer);

transaction.Commit();
}

// Run migration and verify canonical id is rewritten with valid string.
{
LevelDbMigrations::RunMigrations(db_.get(), *serializer_);

LevelDbTransaction transaction(
db_.get(), "Read target to verify canonical ID rewritten");

auto query_target_key =
LevelDbQueryTargetKey::Key(initial_target_data.target().CanonicalId(),
initial_target_data.target_id());
auto it = transaction.NewIterator();
// Verify we are able to seek to the key built with proper canonical ID.
it->Seek(query_target_key);
ASSERT_EQ(it->key(), query_target_key);

// Verify original invalid key is deleted.
it->Seek(invalid_key);
ASSERT_NE(it->key(), invalid_key);
transaction.Commit();
}
}

TEST_F(LevelDbMigrationsTest, CanDowngrade) {
// First, run all of the migrations
LevelDbMigrations::RunMigrations(db_.get());
LevelDbMigrations::RunMigrations(db_.get(), *serializer_);

LevelDbMigrations::SchemaVersion latest_version =
LevelDbMigrations::ReadSchemaVersion(db_.get());

// Downgrade to an early version.
LevelDbMigrations::SchemaVersion downgrade_version = 1;
LevelDbMigrations::RunMigrations(db_.get(), downgrade_version);
LevelDbMigrations::RunMigrations(db_.get(), downgrade_version, *serializer_);
LevelDbMigrations::SchemaVersion post_downgrade_version =
LevelDbMigrations::ReadSchemaVersion(db_.get());
ASSERT_EQ(downgrade_version, post_downgrade_version);

// Verify that we can upgrade again to the latest version.
LevelDbMigrations::RunMigrations(db_.get());
LevelDbMigrations::RunMigrations(db_.get(), *serializer_);
LevelDbMigrations::SchemaVersion final_version =
LevelDbMigrations::ReadSchemaVersion(db_.get());
ASSERT_EQ(final_version, latest_version);
Expand Down
4 changes: 2 additions & 2 deletions Firestore/core/test/unit/local/persistence_testing.cc
Original file line number Diff line number Diff line change
Expand Up @@ -42,13 +42,13 @@ using util::Filesystem;
using util::Path;
using util::Status;

} // namespace

LocalSerializer MakeLocalSerializer() {
Serializer remote_serializer{DatabaseId("p", "d")};
return LocalSerializer(std::move(remote_serializer));
}

} // namespace

Path LevelDbDir() {
auto* fs = Filesystem::Default();
Path dir = fs->TempDir().AppendUtf8("PersistenceTesting");
Expand Down
Loading