Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
158 changes: 132 additions & 26 deletions db/arena_wrapped_db_iter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -30,32 +30,105 @@ inline static SequenceNumber GetSeqNum(const DBImpl* db, const Snapshot* s) {
// Looks up the iterator property `prop_name` and writes its value to `*prop`.
//
// "rocksdb.iterator.super-version-number" is answered without forcing the
// internal iterator into existence: while the internal iterator is still
// uninitialized, the cached `sv_number_` is reported directly. Once it is
// initialized, the wrapped DBIter is asked first and `sv_number_` is used only
// as a fallback. This property always yields Status::OK().
//
// Every other property is forwarded to the wrapped DBIter after the internal
// iterator has been initialized. If that initialization fails, the status
// recorded on the wrapped DBIter is returned instead.
Status ArenaWrappedDBIter::GetProperty(std::string prop_name,
                                       std::string* prop) {
  if (prop_name == "rocksdb.iterator.super-version-number") {
    if (!internal_iter_initialized_) {
      *prop = std::to_string(sv_number_);
      return Status::OK();
    }
    // First try to pass the value returned from inner iterator.
    if (!db_iter_->GetProperty(prop_name, prop).ok()) {
      *prop = std::to_string(sv_number_);
    }
    return Status::OK();
  }
  // The property must not be forwarded before this point: an unconditional
  // forward here would leave the initialization below unreachable.
  if (!EnsureInternalIteratorInitialized(nullptr).ok()) {
    return db_iter_->status();
  }
  return db_iter_->GetProperty(std::move(prop_name), prop);
}

// Hands a still-pending deferred super version back to the DB for cleanup and
// clears the pointer. Does nothing when no deferred super version is held.
void ArenaWrappedDBIter::CleanupDeferredSuperVersion() {
  if (deferred_sv_ == nullptr) {
    return;
  }
  // A deferred super version can only be cleaned up through the owning DB.
  assert(db_impl_ != nullptr);
  const bool purge_in_background =
      read_options_.background_purge_on_iterator_cleanup;
  db_impl_->CleanupIteratorSuperVersion(deferred_sv_, purge_in_background);
  deferred_sv_ = nullptr;
}

// Tears down the wrapped DBIter and then cleans up any deferred super version
// that is still held.
void ArenaWrappedDBIter::DestroyDBIter() {
// Explicit destructor call: the DBIter object is destroyed in place, and its
// storage is not freed here.
db_iter_->~DBIter();
// No-op when no deferred super version is pending.
CleanupDeferredSuperVersion();
}

// Deleter that drops one reference on `cfd` while holding the DB mutex.
// A null `cfd` is ignored.
void ArenaWrappedDBIter::ColumnFamilyDataUnrefDeleter::operator()(
    ColumnFamilyData* cfd) const {
  if (cfd != nullptr) {
    assert(db_impl != nullptr);

    // The unref (and possible deletion) happens under the DB mutex, which is
    // released when `guard` goes out of scope.
    InstrumentedMutexLock guard(db_impl->mutex());
    cfd->UnrefAndTryDelete();
  }
}

// Destroys the wrapped DBIter (including deferred super version cleanup) and
// then runs the arena's destructor in place.
void ArenaWrappedDBIter::DestroyDBIterAndArena() {
DestroyDBIter();
// Explicit destructor call on the member arena; a caller that keeps using
// this object has to re-construct `arena_` afterwards.
arena_.~Arena();
}

// Builds the internal iterator on first use from the deferred DB state
// (`db_impl_`, `deferred_cfd_`, `deferred_sv_`) and installs it under the
// wrapped DBIter.
//
// `scan_opts` may be null. It is passed on to the internal iterator only when
// it reports bounded scan ranges; otherwise null is passed.
//
// Returns OK if the internal iterator already exists or was created here.
// Returns InvalidArgument if any piece of deferred state is missing; in that
// case the same status is recorded on the wrapped DBIter, which is also marked
// invalid.
//
// NOTE(review): this function never sets `internal_iter_initialized_` itself.
// Presumably SetIterUnderDBIterImpl() does; confirm, because the deferred
// pointers are cleared below and a repeated call would otherwise take the
// InvalidArgument path.
Status ArenaWrappedDBIter::EnsureInternalIteratorInitialized(
    const MultiScanArgs* scan_opts) {
  if (internal_iter_initialized_) {
    return Status::OK();
  }

  const bool has_deferred_state = db_impl_ != nullptr &&
                                  deferred_cfd_ != nullptr &&
                                  deferred_sv_ != nullptr;
  if (!has_deferred_state) {
    Status missing_state = Status::InvalidArgument(
        "Internal iterator cannot be initialized without deferred DB state");
    db_iter_->set_status(missing_state);
    db_iter_->set_valid(false);
    return missing_state;
  }

  // Forward the scan options only when they describe bounded ranges.
  const MultiScanArgs* bounded_scan_opts = nullptr;
  if (scan_opts != nullptr && scan_opts->HasBoundedScanRanges()) {
    bounded_scan_opts = scan_opts;
  }

  // The child iterators get a copy of the read options with the snapshot
  // cleared; `sequence_` is passed explicitly to NewInternalIterator().
  child_read_options_ = read_options_;
  child_read_options_.snapshot = nullptr;

  InternalIterator* new_internal_iter = db_impl_->NewInternalIterator(
      child_read_options_, deferred_cfd_, deferred_sv_, &arena_, sequence_,
      /*allow_unprepared_value=*/true, this, bounded_scan_opts);

  // The deferred state has been passed to NewInternalIterator(); forget it so
  // CleanupDeferredSuperVersion() becomes a no-op.
  deferred_cfd_ = nullptr;
  deferred_sv_ = nullptr;

  SetIterUnderDBIterImpl(new_internal_iter);
  return Status::OK();
}

void ArenaWrappedDBIter::Init(
Env* env, const ReadOptions& read_options, const ImmutableOptions& ioptions,
const MutableCFOptions& mutable_cf_options, const Version* version,
const SequenceNumber& sequence, uint64_t version_number,
ReadCallback* read_callback, ColumnFamilyHandleImpl* cfh,
bool expose_blob_index, bool allow_refresh, ReadOnlyMemTable* active_mem) {
bool expose_blob_index, bool allow_refresh, ReadOnlyMemTable* active_mem,
DBImpl* db_impl, ColumnFamilyData* cfd) {
read_options_ = read_options;
child_read_options_ = read_options;
if (!CheckFSFeatureSupport(env->GetFileSystem().get(),
FSSupportedOps::kAsyncIO)) {
read_options_.async_io = false;
}
read_options_.total_order_seek |= ioptions.prefix_seek_opt_in_only;

db_iter_ = DBIter::NewIter(
env, read_options_, ioptions, mutable_cf_options,
ioptions.user_comparator, /*internal_iter=*/nullptr, version, sequence,
read_callback, active_mem, cfh, expose_blob_index, &arena_);
if (cfh != nullptr) {
db_impl = cfh->db();
cfd = cfh->cfd();
}

db_iter_ = DBIter::NewIter(env, read_options_, ioptions, mutable_cf_options,
ioptions.user_comparator,
/*internal_iter=*/nullptr, version, sequence,
read_callback, active_mem, /*cfh=*/nullptr,
expose_blob_index, &arena_, db_impl, cfd);

sv_number_ = version_number;
allow_refresh_ = allow_refresh;
Expand All @@ -65,13 +138,13 @@ void ArenaWrappedDBIter::Init(

void ArenaWrappedDBIter::MaybeAutoRefresh(bool is_seek,
DBIter::Direction direction) {
if (cfh_ != nullptr && read_options_.snapshot != nullptr && allow_refresh_ &&
read_options_.auto_refresh_iterator_with_snapshot) {
if (cfd_ref_ != nullptr && read_options_.snapshot != nullptr &&
allow_refresh_ && read_options_.auto_refresh_iterator_with_snapshot) {
// The intent here is to capture the superversion number change
// reasonably soon from the time it actually happened. As such,
// we're fine with weaker synchronization / ordering guarantees
// provided by relaxed atomic (in favor of less CPU / mem overhead).
uint64_t cur_sv_number = cfh_->cfd()->GetSuperVersionNumberRelaxed();
uint64_t cur_sv_number = cfd_ref_->GetSuperVersionNumberRelaxed();
if ((sv_number_ != cur_sv_number) && status().ok()) {
// Changing iterators' direction is pretty heavy-weight operation and
// could have unintended consequences when it comes to prefix seek.
Expand Down Expand Up @@ -150,37 +223,41 @@ void ArenaWrappedDBIter::DoRefresh(const Snapshot* snapshot,
// present in the error log, but won't be reflected in the iterator status.
// This is by design as we expect compaction to clean up those obsolete files
// eventually.
db_iter_->~DBIter();

arena_.~Arena();
DestroyDBIterAndArena();
new (&arena_) Arena();

auto cfd = cfh_->cfd();
auto db_impl = cfh_->db();
auto cfd = cfd_ref_.get();
auto db_impl = db_impl_;
assert(cfd != nullptr);
assert(db_impl != nullptr);

SuperVersion* sv = cfd->GetReferencedSuperVersion(db_impl);
assert(sv->version_number >= sv_number);
SequenceNumber read_seq = GetSeqNum(db_impl, snapshot);
if (read_callback_) {
read_callback_->Refresh(read_seq);
}
// TODO: Preserve Prepare() scan options across Refresh() so a refreshed
// MultiScan iterator can rebuild the same pruned tree.
Init(env, read_options_, cfd->ioptions(), sv->mutable_cf_options, sv->current,
read_seq, sv->version_number, read_callback_, cfh_, expose_blob_index_,
allow_refresh_, allow_mark_memtable_for_flush_ ? sv->mem : nullptr);
read_seq, sv->version_number, read_callback_, nullptr,
expose_blob_index_, allow_refresh_,
allow_mark_memtable_for_flush_ ? sv->mem : nullptr, db_impl, cfd);

InternalIterator* internal_iter = db_impl->NewInternalIterator(
read_options_, cfd, sv, &arena_, read_seq,
/* allow_unprepared_value */ true, /* db_iter */ this);
SetIterUnderDBIter(internal_iter);
internal_iter_initialized_ = true;
}

Status ArenaWrappedDBIter::Refresh(const Snapshot* snapshot) {
if (cfh_ == nullptr || !allow_refresh_) {
if (cfd_ref_ == nullptr || db_impl_ == nullptr || !allow_refresh_) {
return Status::NotSupported("Creating renew iterator is not allowed.");
}
assert(db_iter_ != nullptr);
auto cfd = cfh_->cfd();
auto db_impl = cfh_->db();
auto cfd = cfd_ref_.get();
auto db_impl = db_impl_;

// TODO(yiwu): For last_seq_same_as_publish_seq_==false, this is not the
// correct behavior. Will be corrected automatically when we take a snapshot
Expand All @@ -193,6 +270,13 @@ Status ArenaWrappedDBIter::Refresh(const Snapshot* snapshot) {
TEST_SYNC_POINT("ArenaWrappedDBIter::Refresh:1");
TEST_SYNC_POINT("ArenaWrappedDBIter::Refresh:2");

if (!internal_iter_initialized_) {
Status s = EnsureInternalIteratorInitialized(nullptr);
if (!s.ok()) {
return s;
}
}

while (true) {
if (sv_number_ != cur_sv_number) {
DoRefresh(snapshot, cur_sv_number);
Expand Down Expand Up @@ -250,25 +334,47 @@ Status ArenaWrappedDBIter::Refresh(const Snapshot* snapshot) {
return Status::OK();
}

void ArenaWrappedDBIter::Prepare(const MultiScanArgs& scan_opts) {
if (prepare_called_) {
db_iter_->set_status(Status::InvalidArgument(
"Prepare called more than once on the same iterator"));
db_iter_->set_valid(false);
return;
}
prepare_called_ = true;

Status s = db_iter_->SetScanOptionsForPrepare(scan_opts);
if (!s.ok()) {
return;
}

const MultiScanArgs* pruning_scan_opts =
scan_opts.HasBoundedScanRanges() ? &scan_opts : nullptr;
s = EnsureInternalIteratorInitialized(pruning_scan_opts);
if (!s.ok()) {
return;
}

db_iter_->PrepareInternalChildren();
}

// Creates a heap-allocated ArenaWrappedDBIter over column family `cfh` at
// `sequence`, using super version `sv`.
//
// The internal iterator is NOT built here. `db_impl`, the column family data,
// `sv` and `sequence` are stored on the returned iterator so that the internal
// iterator can be created later. `sv` must therefore be handed off exactly
// once, through StoreDeferredInitInfo(); building an internal iterator from it
// here as well would pass the same super version on twice.
//
// `cfh` must not be null; it is dereferenced unconditionally. When
// `allow_refresh` is set and the handle has column family data, the
// information needed for a later refresh is stored too. When
// `allow_mark_memtable_for_flush` is false, no active memtable is passed to
// Init().
//
// Returns the new iterator as a raw pointer.
ArenaWrappedDBIter* NewArenaWrappedDbIterator(
    Env* env, const ReadOptions& read_options, ColumnFamilyHandleImpl* cfh,
    SuperVersion* sv, const SequenceNumber& sequence,
    ReadCallback* read_callback, DBImpl* db_impl, bool expose_blob_index,
    bool allow_refresh, bool allow_mark_memtable_for_flush) {
  assert(cfh != nullptr);
  ArenaWrappedDBIter* db_iter = new ArenaWrappedDBIter();
  ColumnFamilyData* cfd = cfh->cfd();
  db_iter->Init(env, read_options, cfd->ioptions(), sv->mutable_cf_options,
                sv->current, sequence, sv->version_number, read_callback, cfh,
                expose_blob_index, allow_refresh,
                allow_mark_memtable_for_flush ? sv->mem : nullptr);
  if (allow_refresh && cfd != nullptr) {
    db_iter->StoreRefreshInfo(read_callback, expose_blob_index);
  }

  // Defer creation of the internal iterator until it is first needed.
  db_iter->StoreDeferredInitInfo(db_impl, cfd, sv, sequence,
                                 allow_mark_memtable_for_flush);

  return db_iter;
}
Expand Down
Loading