diff --git a/db/arena_wrapped_db_iter.cc b/db/arena_wrapped_db_iter.cc index 96441d5d303..2b8fc50c52e 100644 --- a/db/arena_wrapped_db_iter.cc +++ b/db/arena_wrapped_db_iter.cc @@ -30,13 +30,78 @@ inline static SequenceNumber GetSeqNum(const DBImpl* db, const Snapshot* s) { Status ArenaWrappedDBIter::GetProperty(std::string prop_name, std::string* prop) { if (prop_name == "rocksdb.iterator.super-version-number") { + if (!internal_iter_initialized_) { + *prop = std::to_string(sv_number_); + return Status::OK(); + } // First try to pass the value returned from inner iterator. if (!db_iter_->GetProperty(prop_name, prop).ok()) { *prop = std::to_string(sv_number_); } return Status::OK(); } - return db_iter_->GetProperty(prop_name, prop); + if (!EnsureInternalIteratorInitialized(nullptr).ok()) { + return db_iter_->status(); + } + return db_iter_->GetProperty(std::move(prop_name), prop); +} + +void ArenaWrappedDBIter::CleanupDeferredSuperVersion() { + if (deferred_sv_ != nullptr) { + assert(db_impl_ != nullptr); + db_impl_->CleanupIteratorSuperVersion( + deferred_sv_, read_options_.background_purge_on_iterator_cleanup); + deferred_sv_ = nullptr; + } +} + +void ArenaWrappedDBIter::DestroyDBIter() { + db_iter_->~DBIter(); + CleanupDeferredSuperVersion(); +} + +void ArenaWrappedDBIter::ColumnFamilyDataUnrefDeleter::operator()( + ColumnFamilyData* cfd) const { + if (cfd == nullptr) { + return; + } + assert(db_impl != nullptr); + + InstrumentedMutexLock lock(db_impl->mutex()); + cfd->UnrefAndTryDelete(); +} + +void ArenaWrappedDBIter::DestroyDBIterAndArena() { + DestroyDBIter(); + arena_.~Arena(); +} + +Status ArenaWrappedDBIter::EnsureInternalIteratorInitialized( + const MultiScanArgs* scan_opts) { + if (internal_iter_initialized_) { + return Status::OK(); + } + if (db_impl_ == nullptr || deferred_cfd_ == nullptr || + deferred_sv_ == nullptr) { + Status s = Status::InvalidArgument( + "Internal iterator cannot be initialized without deferred DB state"); + db_iter_->set_status(s); + db_iter_->set_valid(false); + return s; + } + + const MultiScanArgs* pruning_scan_opts = + scan_opts != nullptr && scan_opts->HasBoundedScanRanges() ? scan_opts + : nullptr; + child_read_options_ = read_options_; + child_read_options_.snapshot = nullptr; + InternalIterator* internal_iter = db_impl_->NewInternalIterator( + child_read_options_, deferred_cfd_, deferred_sv_, &arena_, sequence_, + /*allow_unprepared_value=*/true, this, pruning_scan_opts); + deferred_cfd_ = nullptr; + deferred_sv_ = nullptr; + SetIterUnderDBIterImpl(internal_iter); + return Status::OK(); } void ArenaWrappedDBIter::Init( @@ -44,18 +109,26 @@ void ArenaWrappedDBIter::Init( const MutableCFOptions& mutable_cf_options, const Version* version, const SequenceNumber& sequence, uint64_t version_number, ReadCallback* read_callback, ColumnFamilyHandleImpl* cfh, - bool expose_blob_index, bool allow_refresh, ReadOnlyMemTable* active_mem) { + bool expose_blob_index, bool allow_refresh, ReadOnlyMemTable* active_mem, + DBImpl* db_impl, ColumnFamilyData* cfd) { read_options_ = read_options; + child_read_options_ = read_options; if (!CheckFSFeatureSupport(env->GetFileSystem().get(), FSSupportedOps::kAsyncIO)) { read_options_.async_io = false; } read_options_.total_order_seek |= ioptions.prefix_seek_opt_in_only; - db_iter_ = DBIter::NewIter( - env, read_options_, ioptions, mutable_cf_options, - ioptions.user_comparator, /*internal_iter=*/nullptr, version, sequence, - read_callback, active_mem, cfh, expose_blob_index, &arena_); + if (cfh != nullptr) { + db_impl = cfh->db(); + cfd = cfh->cfd(); + } + + db_iter_ = DBIter::NewIter(env, read_options_, ioptions, mutable_cf_options, + ioptions.user_comparator, + /*internal_iter=*/nullptr, version, sequence, + read_callback, active_mem, /*cfh=*/nullptr, + expose_blob_index, &arena_, db_impl, cfd); sv_number_ = version_number; allow_refresh_ = allow_refresh; @@ -65,13 +138,13 @@ void ArenaWrappedDBIter::Init( void ArenaWrappedDBIter::MaybeAutoRefresh(bool is_seek, DBIter::Direction direction) { - if (cfh_ != nullptr && read_options_.snapshot != nullptr && allow_refresh_ && - read_options_.auto_refresh_iterator_with_snapshot) { + if (cfd_ref_ != nullptr && read_options_.snapshot != nullptr && + allow_refresh_ && read_options_.auto_refresh_iterator_with_snapshot) { // The intent here is to capture the superversion number change // reasonably soon from the time it actually happened. As such, // we're fine with weaker synchronization / ordering guarantees // provided by relaxed atomic (in favor of less CPU / mem overhead). - uint64_t cur_sv_number = cfh_->cfd()->GetSuperVersionNumberRelaxed(); + uint64_t cur_sv_number = cfd_ref_->GetSuperVersionNumberRelaxed(); if ((sv_number_ != cur_sv_number) && status().ok()) { // Changing iterators' direction is pretty heavy-weight operation and // could have unintended consequences when it comes to prefix seek. @@ -150,13 +223,13 @@ void ArenaWrappedDBIter::DoRefresh(const Snapshot* snapshot, // present in the error log, but won't be reflected in the iterator status. // This is by design as we expect compaction to clean up those obsolete files // eventually. - db_iter_->~DBIter(); - - arena_.~Arena(); + DestroyDBIterAndArena(); new (&arena_) Arena(); - auto cfd = cfh_->cfd(); - auto db_impl = cfh_->db(); + auto cfd = cfd_ref_.get(); + auto db_impl = db_impl_; + assert(cfd != nullptr); + assert(db_impl != nullptr); SuperVersion* sv = cfd->GetReferencedSuperVersion(db_impl); assert(sv->version_number >= sv_number); @@ -164,23 +237,27 @@ void ArenaWrappedDBIter::DoRefresh(const Snapshot* snapshot, if (read_callback_) { read_callback_->Refresh(read_seq); } + // TODO: Preserve Prepare() scan options across Refresh() so a refreshed + // MultiScan iterator can rebuild the same pruned tree. Init(env, read_options_, cfd->ioptions(), sv->mutable_cf_options, sv->current, - read_seq, sv->version_number, read_callback_, cfh_, expose_blob_index_, - allow_refresh_, allow_mark_memtable_for_flush_ ? sv->mem : nullptr); + read_seq, sv->version_number, read_callback_, nullptr, + expose_blob_index_, allow_refresh_, + allow_mark_memtable_for_flush_ ? sv->mem : nullptr, db_impl, cfd); InternalIterator* internal_iter = db_impl->NewInternalIterator( read_options_, cfd, sv, &arena_, read_seq, /* allow_unprepared_value */ true, /* db_iter */ this); SetIterUnderDBIter(internal_iter); + internal_iter_initialized_ = true; } Status ArenaWrappedDBIter::Refresh(const Snapshot* snapshot) { - if (cfh_ == nullptr || !allow_refresh_) { + if (cfd_ref_ == nullptr || db_impl_ == nullptr || !allow_refresh_) { return Status::NotSupported("Creating renew iterator is not allowed."); } assert(db_iter_ != nullptr); - auto cfd = cfh_->cfd(); - auto db_impl = cfh_->db(); + auto cfd = cfd_ref_.get(); + auto db_impl = db_impl_; // TODO(yiwu): For last_seq_same_as_publish_seq_==false, this is not the // correct behavior. Will be corrected automatically when we take a snapshot @@ -193,6 +270,13 @@ Status ArenaWrappedDBIter::Refresh(const Snapshot* snapshot) { TEST_SYNC_POINT("ArenaWrappedDBIter::Refresh:1"); TEST_SYNC_POINT("ArenaWrappedDBIter::Refresh:2"); + if (!internal_iter_initialized_) { + Status s = EnsureInternalIteratorInitialized(nullptr); + if (!s.ok()) { + return s; + } + } + while (true) { if (sv_number_ != cur_sv_number) { DoRefresh(snapshot, cur_sv_number); @@ -250,25 +334,47 @@ Status ArenaWrappedDBIter::Refresh(const Snapshot* snapshot) { return Status::OK(); } +void ArenaWrappedDBIter::Prepare(const MultiScanArgs& scan_opts) { + if (prepare_called_) { + db_iter_->set_status(Status::InvalidArgument( + "Prepare called more than once on the same iterator")); + db_iter_->set_valid(false); + return; + } + prepare_called_ = true; + + Status s = db_iter_->SetScanOptionsForPrepare(scan_opts); + if (!s.ok()) { + return; + } + + const MultiScanArgs* pruning_scan_opts = + scan_opts.HasBoundedScanRanges() ? &scan_opts : nullptr; + s = EnsureInternalIteratorInitialized(pruning_scan_opts); + if (!s.ok()) { + return; + } + + db_iter_->PrepareInternalChildren(); +} + ArenaWrappedDBIter* NewArenaWrappedDbIterator( Env* env, const ReadOptions& read_options, ColumnFamilyHandleImpl* cfh, SuperVersion* sv, const SequenceNumber& sequence, ReadCallback* read_callback, DBImpl* db_impl, bool expose_blob_index, bool allow_refresh, bool allow_mark_memtable_for_flush) { ArenaWrappedDBIter* db_iter = new ArenaWrappedDBIter(); + ColumnFamilyData* cfd = cfh->cfd(); db_iter->Init(env, read_options, cfh->cfd()->ioptions(), sv->mutable_cf_options, sv->current, sequence, sv->version_number, read_callback, cfh, expose_blob_index, allow_refresh, allow_mark_memtable_for_flush ? sv->mem : nullptr); - if (cfh != nullptr && allow_refresh) { - db_iter->StoreRefreshInfo(cfh, read_callback, expose_blob_index); + if (allow_refresh && cfd != nullptr) { + db_iter->StoreRefreshInfo(read_callback, expose_blob_index); } - - InternalIterator* internal_iter = db_impl->NewInternalIterator( - db_iter->GetReadOptions(), cfh->cfd(), sv, db_iter->GetArena(), sequence, - /*allow_unprepared_value=*/true, db_iter); - db_iter->SetIterUnderDBIter(internal_iter); + db_iter->StoreDeferredInitInfo(db_impl, cfd, sv, sequence, + allow_mark_memtable_for_flush); return db_iter; } diff --git a/db/arena_wrapped_db_iter.h b/db/arena_wrapped_db_iter.h index 26062497a0b..bfae665ea62 100644 --- a/db/arena_wrapped_db_iter.h +++ b/db/arena_wrapped_db_iter.h @@ -10,6 +10,7 @@ #pragma once #include +#include #include #include "db/db_impl/db_impl.h" @@ -33,10 +34,17 @@ class Version; // When using the class's Iterator interface, the behavior is exactly // the same as the inner DBIter. class ArenaWrappedDBIter : public Iterator { + struct ColumnFamilyDataUnrefDeleter { + DBImpl* db_impl = nullptr; + void operator()(ColumnFamilyData* cfd) const; + }; + using ColumnFamilyDataRef = + std::unique_ptr; + public: ~ArenaWrappedDBIter() override { if (db_iter_ != nullptr) { - db_iter_->~DBIter(); + DestroyDBIter(); } else { assert(false); } @@ -51,7 +59,7 @@ class ArenaWrappedDBIter : public Iterator { // Set the internal iterator wrapped inside the DB Iterator. Usually it is // a merging iterator. virtual void SetIterUnderDBIter(InternalIterator* iter) { - db_iter_->SetIter(iter); + SetIterUnderDBIterImpl(iter); } void SetMemtableRangetombstoneIter( @@ -60,26 +68,48 @@ class ArenaWrappedDBIter : public Iterator { } bool Valid() const override { return db_iter_->Valid(); } - void SeekToFirst() override { db_iter_->SeekToFirst(); } - void SeekToLast() override { db_iter_->SeekToLast(); } + void SeekToFirst() override { + if (!EnsureInternalIteratorInitialized(nullptr).ok()) { + return; + } + db_iter_->SeekToFirst(); + } + void SeekToLast() override { + if (!EnsureInternalIteratorInitialized(nullptr).ok()) { + return; + } + db_iter_->SeekToLast(); + } // 'target' does not contain timestamp, even if user timestamp feature is // enabled. void Seek(const Slice& target) override { + if (!EnsureInternalIteratorInitialized(nullptr).ok()) { + return; + } MaybeAutoRefresh(true /* is_seek */, DBIter::kForward); db_iter_->Seek(target); } void SeekForPrev(const Slice& target) override { + if (!EnsureInternalIteratorInitialized(nullptr).ok()) { + return; + } MaybeAutoRefresh(true /* is_seek */, DBIter::kReverse); db_iter_->SeekForPrev(target); } void Next() override { + if (!EnsureInternalIteratorInitialized(nullptr).ok()) { + return; + } db_iter_->Next(); MaybeAutoRefresh(false /* is_seek */, DBIter::kForward); } void Prev() override { + if (!EnsureInternalIteratorInitialized(nullptr).ok()) { + return; + } db_iter_->Prev(); MaybeAutoRefresh(false /* is_seek */, DBIter::kReverse); } @@ -96,12 +126,13 @@ class ArenaWrappedDBIter : public Iterator { Status Refresh() override; Status Refresh(const Snapshot*) override; - bool PrepareValue() override { return db_iter_->PrepareValue(); } - - void Prepare(const MultiScanArgs& scan_opts) override { - db_iter_->Prepare(scan_opts); + bool PrepareValue() override { + return EnsureInternalIteratorInitialized(nullptr).ok() && + db_iter_->PrepareValue(); } + void Prepare(const MultiScanArgs& scan_opts) override; + // FIXME: we could just pass SV in for mutable cf option, version and version // number, but this is used by SstFileReader which does not have a SV. void Init(Env* env, const ReadOptions& read_options, @@ -110,27 +141,53 @@ class ArenaWrappedDBIter : public Iterator { const SequenceNumber& sequence, uint64_t version_number, ReadCallback* read_callback, ColumnFamilyHandleImpl* cfh, bool expose_blob_index, bool allow_refresh, - ReadOnlyMemTable* active_mem); + ReadOnlyMemTable* active_mem, DBImpl* db_impl = nullptr, + ColumnFamilyData* cfd = nullptr); - // Store some parameters so we can refresh the iterator at a later point - // with these same params - void StoreRefreshInfo(ColumnFamilyHandleImpl* cfh, - ReadCallback* read_callback, bool expose_blob_index) { - cfh_ = cfh; + // Store parameters used only by explicit/auto-refresh. + void StoreRefreshInfo(ReadCallback* read_callback, bool expose_blob_index) { read_callback_ = read_callback; expose_blob_index_ = expose_blob_index; } + void StoreDeferredInitInfo(DBImpl* db_impl, ColumnFamilyData* cfd, + SuperVersion* sv, const SequenceNumber& sequence, + bool allow_mark_memtable_for_flush) { + assert(cfd != nullptr); + db_impl_ = db_impl; + cfd->Ref(); + cfd_ref_ = ColumnFamilyDataRef(cfd, ColumnFamilyDataUnrefDeleter{db_impl}); + deferred_cfd_ = cfd; + deferred_sv_ = sv; + sequence_ = sequence; + allow_mark_memtable_for_flush_ = allow_mark_memtable_for_flush; + } + private: + Status EnsureInternalIteratorInitialized(const MultiScanArgs* scan_opts); + void SetIterUnderDBIterImpl(InternalIterator* iter) { + db_iter_->SetIter(iter); + internal_iter_initialized_ = true; + } + void CleanupDeferredSuperVersion(); + void DestroyDBIter(); + void DestroyDBIterAndArena(); void DoRefresh(const Snapshot* snapshot, uint64_t sv_number); void MaybeAutoRefresh(bool is_seek, DBIter::Direction direction); DBIter* db_iter_ = nullptr; Arena arena_; - uint64_t sv_number_; - ColumnFamilyHandleImpl* cfh_ = nullptr; + uint64_t sv_number_ = 0; + DBImpl* db_impl_ = nullptr; + ColumnFamilyDataRef cfd_ref_{nullptr, ColumnFamilyDataUnrefDeleter{}}; + ColumnFamilyData* deferred_cfd_ = nullptr; + SuperVersion* deferred_sv_ = nullptr; + SequenceNumber sequence_ = kMaxSequenceNumber; + bool internal_iter_initialized_ = false; + bool prepare_called_ = false; ReadOptions read_options_; - ReadCallback* read_callback_; + ReadOptions child_read_options_; + ReadCallback* read_callback_ = nullptr; bool expose_blob_index_ = false; bool allow_refresh_ = true; bool allow_mark_memtable_for_flush_ = true; diff --git a/db/db_block_cache_test.cc b/db/db_block_cache_test.cc index ed92dd843bc..ff7566cf56f 100644 --- a/db/db_block_cache_test.cc +++ b/db/db_block_cache_test.cc @@ -1482,10 +1482,13 @@ TEST_P(DBBlockCacheTypeTest, AddRedundantStats) { // Access just data, forcing redundant load+insert ReadOptions read_options; std::unique_ptr iter{db_->NewIterator(read_options)}; + ASSERT_OK(iter->Refresh()); + cache->SetNthLookupNotFound(1); iter->SeekToFirst(); ASSERT_TRUE(iter->Valid()); ASSERT_EQ(iter->key(), "bar"); + ASSERT_EQ(iter->value(), "value"); EXPECT_EQ(2, TestGetTickerCount(options, BLOCK_CACHE_INDEX_ADD)); EXPECT_EQ(2, TestGetTickerCount(options, BLOCK_CACHE_FILTER_ADD)); diff --git a/db/db_impl/db_impl.cc b/db/db_impl/db_impl.cc index b921834eec8..ea8a3feb5c7 100644 --- a/db/db_impl/db_impl.cc +++ b/db/db_impl/db_impl.cc @@ -2494,30 +2494,8 @@ struct SuperVersionHandle { static void CleanupSuperVersionHandle(void* arg1, void* /*arg2*/) { SuperVersionHandle* sv_handle = static_cast(arg1); - if (sv_handle->super_version->Unref()) { - // Job id == 0 means that this is not our background process, but rather - // user thread - JobContext job_context(0); - - sv_handle->mu->Lock(); - sv_handle->super_version->Cleanup(); - sv_handle->db->FindObsoleteFiles(&job_context, false, true); - if (sv_handle->background_purge) { - sv_handle->db->ScheduleBgLogWriterClose(&job_context); - sv_handle->db->AddSuperVersionsToFreeQueue(sv_handle->super_version); - sv_handle->db->SchedulePurge(); - } - sv_handle->mu->Unlock(); - - if (!sv_handle->background_purge) { - delete sv_handle->super_version; - } - if (job_context.HaveSomethingToDelete()) { - sv_handle->db->PurgeObsoleteFiles(job_context, - sv_handle->background_purge); - } - job_context.Clean(); - } + sv_handle->db->CleanupIteratorSuperVersion(sv_handle->super_version, + sv_handle->background_purge); delete sv_handle; } @@ -2539,7 +2517,8 @@ static void CleanupGetMergeOperandsState(void* arg1, void* /*arg2*/) { InternalIterator* DBImpl::NewInternalIterator( const ReadOptions& read_options, ColumnFamilyData* cfd, SuperVersion* super_version, Arena* arena, SequenceNumber sequence, - bool allow_unprepared_value, ArenaWrappedDBIter* db_iter) { + bool allow_unprepared_value, ArenaWrappedDBIter* db_iter, + const MultiScanArgs* scan_opts) { InternalIterator* internal_iter; assert(arena != nullptr); auto prefix_extractor = @@ -2551,47 +2530,62 @@ InternalIterator* DBImpl::NewInternalIterator( // here, and no unit test cares about the value provided here. !read_options.total_order_seek && prefix_extractor != nullptr, read_options.iterate_upper_bound); - // Collect iterator for mutable memtable - auto mem_iter = super_version->mem->NewIterator( - read_options, super_version->GetSeqnoToTimeMapping(), arena, - super_version->mutable_cf_options.prefix_extractor.get(), - /*for_flush=*/false); Status s; - if (!read_options.ignore_range_deletions) { - std::unique_ptr mem_tombstone_iter; - auto range_del_iter = super_version->mem->NewRangeTombstoneIterator( - read_options, sequence, false /* immutable_memtable */); - if (range_del_iter == nullptr || range_del_iter->empty()) { - delete range_del_iter; + const Comparator* user_comparator = cfd->user_comparator(); + const bool mem_intersects = + !super_version->mem->IsEmpty() && + MultiScanIntersectsMemTable(super_version->mem, read_options, + super_version->GetSeqnoToTimeMapping(), + prefix_extractor, scan_opts, user_comparator); + if (scan_opts == nullptr || mem_intersects) { + // Collect iterator for mutable memtable + auto mem_iter = super_version->mem->NewIterator( + read_options, super_version->GetSeqnoToTimeMapping(), arena, + super_version->mutable_cf_options.prefix_extractor.get(), + /*for_flush=*/false); + if (!read_options.ignore_range_deletions) { + std::unique_ptr mem_tombstone_iter; + std::unique_ptr range_del_iter( + super_version->mem->NewRangeTombstoneIterator( + read_options, sequence, false /* immutable_memtable */)); + if (range_del_iter != nullptr && !range_del_iter->empty()) { + mem_tombstone_iter = std::make_unique( + std::move(range_del_iter), &cfd->ioptions().internal_comparator, + nullptr /* smallest */, nullptr /* largest */); + } + merge_iter_builder.AddPointAndTombstoneIterator( + mem_iter, std::move(mem_tombstone_iter)); } else { - mem_tombstone_iter = std::make_unique( - std::unique_ptr(range_del_iter), - &cfd->ioptions().internal_comparator, nullptr /* smallest */, - nullptr /* largest */); + merge_iter_builder.AddIterator(mem_iter); } - merge_iter_builder.AddPointAndTombstoneIterator( - mem_iter, std::move(mem_tombstone_iter)); - } else { - merge_iter_builder.AddIterator(mem_iter); + } else if (scan_opts != nullptr) { + merge_iter_builder.SetMemtablePruned(true); } - // Collect all needed child iterators for immutable memtables - if (s.ok()) { + if (s.ok() && + (scan_opts == nullptr || super_version->imm->GetTotalNumEntries() > 0)) { + // Collect all needed child iterators for immutable memtables. When scan + // ranges are provided, AddIterators prunes each immutable memtable + // individually. super_version->imm->AddIterators( read_options, super_version->GetSeqnoToTimeMapping(), super_version->mutable_cf_options.prefix_extractor.get(), - &merge_iter_builder, !read_options.ignore_range_deletions); + &merge_iter_builder, !read_options.ignore_range_deletions, sequence, + scan_opts, user_comparator); } TEST_SYNC_POINT_CALLBACK("DBImpl::NewInternalIterator:StatusCallback", &s); if (s.ok()) { // Collect iterators for files in L0 - Ln if (read_options.read_tier != kMemtableTier) { - super_version->current->AddIterators(read_options, file_options_, - &merge_iter_builder, - allow_unprepared_value); + super_version->current->AddIterators( + read_options, file_options_, &merge_iter_builder, + allow_unprepared_value, sequence, scan_opts); } internal_iter = merge_iter_builder.Finish( read_options.ignore_range_deletions ? nullptr : db_iter); + if (internal_iter == nullptr) { + internal_iter = NewEmptyInternalIterator(arena); + } SuperVersionHandle* cleanup = new SuperVersionHandle( this, &mutex_, super_version, read_options.background_purge_on_iterator_cleanup || @@ -2605,6 +2599,35 @@ InternalIterator* DBImpl::NewInternalIterator( return NewErrorInternalIterator(s, arena); } +void DBImpl::CleanupIteratorSuperVersion(SuperVersion* super_version, + bool background_purge) { + background_purge = + background_purge || immutable_db_options_.avoid_unnecessary_blocking_io; + if (super_version->Unref()) { + // Job id == 0 means that this is not our background process, but rather + // user thread + JobContext job_context(0); + + mutex_.Lock(); + super_version->Cleanup(); + FindObsoleteFiles(&job_context, false, true); + if (background_purge) { + ScheduleBgLogWriterClose(&job_context); + AddSuperVersionsToFreeQueue(super_version); + SchedulePurge(); + } + mutex_.Unlock(); + + if (!background_purge) { + delete super_version; + } + if (job_context.HaveSomethingToDelete()) { + PurgeObsoleteFiles(job_context, background_purge); + } + job_context.Clean(); + } +} + ColumnFamilyHandle* DBImpl::DefaultColumnFamily() const { return default_cf_handle_; } diff --git a/db/db_impl/db_impl.h b/db/db_impl/db_impl.h index 66616d2bfaf..1dc071d9ed1 100644 --- a/db/db_impl/db_impl.h +++ b/db/db_impl/db_impl.h @@ -857,12 +857,22 @@ class DBImpl : public DB { // memtable range tombstone iterator used by the underlying merging iterator. // This range tombstone iterator can be refreshed later by db_iter. // @param read_options Must outlive the returned iterator. - InternalIterator* NewInternalIterator(const ReadOptions& read_options, - ColumnFamilyData* cfd, - SuperVersion* super_version, - Arena* arena, SequenceNumber sequence, - bool allow_unprepared_value, - ArenaWrappedDBIter* db_iter = nullptr); + // @param sequence The snapshot sequence captured when the DB iterator was + // created. Child iterators must use this instead of dereferencing + // read_options.snapshot, which may be released before lazy initialization. + // @param scan_opts Optional bounded scan ranges used only to prune the + // iterator tree during lazy Prepare() initialization. + InternalIterator* NewInternalIterator( + const ReadOptions& read_options, ColumnFamilyData* cfd, + SuperVersion* super_version, Arena* arena, SequenceNumber sequence, + bool allow_unprepared_value, ArenaWrappedDBIter* db_iter = nullptr, + const MultiScanArgs* scan_opts = nullptr); + + // Release a SuperVersion held by an iterator. This preserves the cleanup + // behavior used by materialized internal iterators even when the DB iterator + // never needed to lazily build its child iterator tree. + void CleanupIteratorSuperVersion(SuperVersion* super_version, + bool background_purge); LogsWithPrepTracker* logs_with_prep_tracker() { return &logs_with_prep_tracker_; diff --git a/db/db_iter.cc b/db/db_iter.cc index 656e2096a27..798acd88f95 100644 --- a/db/db_iter.cc +++ b/db/db_iter.cc @@ -67,8 +67,9 @@ DBIter::DBIter(Env* _env, const ReadOptions& read_options, const MutableCFOptions& mutable_cf_options, const Comparator* cmp, InternalIterator* iter, const Version* version, SequenceNumber s, bool arena_mode, - ReadCallback* read_callback, ColumnFamilyHandleImpl* cfh, - bool expose_blob_index, ReadOnlyMemTable* active_mem) + ReadCallback* read_callback, DBImpl* db_impl, + ColumnFamilyData* cfd, bool expose_blob_index, + ReadOnlyMemTable* active_mem) : prefix_extractor_(mutable_cf_options.prefix_extractor.get()), env_(_env), clock_(ioptions.clock), @@ -76,21 +77,26 @@ DBIter::DBIter(Env* _env, const ReadOptions& read_options, user_comparator_(cmp), merge_operator_(ioptions.merge_operator.get()), iter_(iter), - blob_state_( - version, read_options.read_tier, read_options.verify_checksums, - read_options.fill_cache, read_options.io_activity, - cfh ? cfh->cfd()->blob_file_cache() : nullptr, - cfh != nullptr && cfh->cfd()->blob_partition_manager() != nullptr), + blob_state_(version, read_options.read_tier, + read_options.verify_checksums, read_options.fill_cache, + read_options.io_activity, + cfd ? cfd->blob_file_cache() : nullptr, + cfd != nullptr && cfd->blob_partition_manager() != nullptr), read_callback_(read_callback), sequence_(s), - value_columns_state_(version, read_options, cfh), + value_columns_state_(version, read_options, cfd), statistics_(ioptions.stats), max_skip_(mutable_cf_options.max_sequential_skip_in_iterations), max_skippable_internal_keys_(read_options.max_skippable_internal_keys), num_internal_keys_skipped_(0), iterate_lower_bound_(read_options.iterate_lower_bound), iterate_upper_bound_(read_options.iterate_upper_bound), - cfh_(cfh), + trace_db_(db_impl), + trace_cf_id_(cfd != nullptr ? cfd->GetID() : 0), + has_trace_state_(db_impl != nullptr && cfd != nullptr), + allow_blob_write_path_fallback_(cfd != nullptr && + cfd->blob_partition_manager() != nullptr), + ingest_sst_lock_(cfd != nullptr ? &cfd->GetIngestSstLock() : nullptr), timestamp_ub_(read_options.timestamp), timestamp_lb_(read_options.iter_start_ts), timestamp_size_(timestamp_ub_ ? timestamp_ub_->size() : 0), @@ -305,10 +311,8 @@ bool DBIter::SetValueAndColumnsFromBlobImpl(const Slice& user_key, // Keep the non-BDW iterator path on the pre-existing Version::GetBlob() // fast path. Only enable the direct-write fallback when this CF actually // has a write-path partition manager. - const bool allow_write_path_fallback = - cfh_ != nullptr && cfh_->cfd()->blob_partition_manager() != nullptr; const Status s = blob_state_.mut()->reader.RetrieveAndSetBlobValue( - user_key, blob_index, allow_write_path_fallback); + user_key, blob_index, allow_blob_write_path_fallback_); if (!s.ok()) { status_ = s; valid_ = false; @@ -1617,10 +1621,8 @@ bool DBIter::MergeWithBlobBaseValue(const Slice& blob_index, return false; } - const bool allow_write_path_fallback = - cfh_ != nullptr && cfh_->cfd()->blob_partition_manager() != nullptr; const Status s = blob_state_.mut()->reader.RetrieveAndSetBlobValue( - user_key, blob_index, allow_write_path_fallback); + user_key, blob_index, allow_blob_write_path_fallback_); if (!s.ok()) { status_ = s; valid_ = false; @@ -1816,10 +1818,10 @@ void DBIter::MaybeInsertRangeTombstone(const Slice& end_key) { } } - assert(cfh_ != nullptr); + assert(ingest_sst_lock_ != nullptr); if (active_mem_->AddLogicallyRedundantRangeTombstone( insert_seq, range_tomb_first_key_.GetUserKey(), end_key, - cfh_->cfd()->GetIngestSstLock())) { + *ingest_sst_lock_)) { RecordTick(statistics_, READ_PATH_RANGE_TOMBSTONES_INSERTED); ROCKS_LOG_DEBUG(logger_, "Inserted range tombstone [%s, %s) @ seq %" PRIu64 @@ -1959,10 +1961,10 @@ Status DBIter::ValidateScanOptions(const MultiScanArgs& multiscan_opts) const { return Status::OK(); } -void DBIter::Prepare(const MultiScanArgs& scan_opts) { +Status DBIter::SetScanOptionsForPrepare(const MultiScanArgs& scan_opts) { status_ = ValidateScanOptions(scan_opts); if (!status_.ok()) { - return; + return status_; } std::optional new_scan_opts; new_scan_opts.emplace(scan_opts); @@ -1975,14 +1977,27 @@ void DBIter::Prepare(const MultiScanArgs& scan_opts) { if (!scan_opts_.value().io_dispatcher) { scan_opts_->io_dispatcher.reset(NewIODispatcher()); } + return Status::OK(); +} - if (!scan_opts.empty()) { +void DBIter::PrepareInternalChildren() { + if (!scan_opts_.has_value() || !status_.ok()) { + return; + } + + if (scan_opts_.value().HasBoundedScanRanges()) { iter_.Prepare(&scan_opts_.value()); } else { iter_.Prepare(nullptr); } } +void DBIter::Prepare(const MultiScanArgs& scan_opts) { + if (SetScanOptionsForPrepare(scan_opts).ok()) { + PrepareInternalChildren(); + } +} + void DBIter::Seek(const Slice& target) { PERF_COUNTER_ADD(iter_seek_count, 1); PERF_CPU_TIMER_GUARD(iter_seek_cpu_nanos, clock_); @@ -2029,7 +2044,7 @@ void DBIter::Seek(const Slice& target) { scan_index_++; } - if (cfh_ != nullptr) { + if (has_trace_state_) { // TODO: What do we do if this returns an error? Slice lower_bound, upper_bound; if (iterate_lower_bound_ != nullptr) { @@ -2042,9 +2057,7 @@ void DBIter::Seek(const Slice& target) { } else { upper_bound = Slice(""); } - cfh_->db() - ->TraceIteratorSeek(cfh_->cfd()->GetID(), target, lower_bound, - upper_bound) + trace_db_->TraceIteratorSeek(trace_cf_id_, target, lower_bound, upper_bound) .PermitUncheckedError(); } @@ -2097,7 +2110,7 @@ void DBIter::SeekForPrev(const Slice& target) { PERF_CPU_TIMER_GUARD(iter_seek_cpu_nanos, clock_); StopWatch sw(clock_, statistics_, DB_SEEK); - if (cfh_ != nullptr) { + if (has_trace_state_) { // TODO: What do we do if this returns an error? Slice lower_bound, upper_bound; if (iterate_lower_bound_ != nullptr) { @@ -2110,8 +2123,8 @@ void DBIter::SeekForPrev(const Slice& target) { } else { upper_bound = Slice(""); } - cfh_->db() - ->TraceIteratorSeekForPrev(cfh_->cfd()->GetID(), target, lower_bound, + trace_db_ + ->TraceIteratorSeekForPrev(trace_cf_id_, target, lower_bound, upper_bound) .PermitUncheckedError(); } diff --git a/db/db_iter.h b/db/db_iter.h index 955cb6e57c1..c9f2fbe174c 100644 --- a/db/db_iter.h +++ b/db/db_iter.h @@ -32,6 +32,9 @@ namespace ROCKSDB_NAMESPACE { class BlobFileCache; class Version; +namespace port { +class RWMutex; +} // This file declares the factory functions of DBIter, in its original form // or a wrapped form with class ArenaWrappedDBIter, which is defined here. @@ -83,14 +86,19 @@ class DBIter final : public Iterator { ReadCallback* read_callback, ReadOnlyMemTable* active_mem, ColumnFamilyHandleImpl* cfh = nullptr, - bool expose_blob_index = false, - Arena* arena = nullptr) { + bool expose_blob_index = false, Arena* arena = nullptr, + DBImpl* db_impl = nullptr, + ColumnFamilyData* cfd = nullptr) { + if (cfh != nullptr) { + db_impl = cfh->db(); + cfd = cfh->cfd(); + } void* mem = arena ? arena->AllocateAligned(sizeof(DBIter)) : operator new(sizeof(DBIter)); DBIter* db_iter = new (mem) DBIter(env, read_options, ioptions, mutable_cf_options, user_key_comparator, internal_iter, version, sequence, arena, - read_callback, cfh, expose_blob_index, active_mem); + read_callback, db_impl, cfd, expose_blob_index, active_mem); return db_iter; } @@ -207,7 +215,7 @@ class DBIter final : public Iterator { } Status status() const override { - if (status_.ok()) { + if (status_.ok() && iter_.iter() != nullptr) { return iter_.status(); } else { assert(!valid_); @@ -248,19 +256,22 @@ class DBIter final : public Iterator { iter_.SetRangeDelReadSeqno(s); } void set_valid(bool v) { valid_ = v; } + void set_status(Status s) { status_ = std::move(s); } bool PrepareValue() override; void Prepare(const MultiScanArgs& scan_opts) override; Status ValidateScanOptions(const MultiScanArgs& multiscan_opts) const; + Status SetScanOptionsForPrepare(const MultiScanArgs& scan_opts); + void PrepareInternalChildren(); private: DBIter(Env* _env, const ReadOptions& read_options, const ImmutableOptions& ioptions, const MutableCFOptions& mutable_cf_options, const Comparator* cmp, InternalIterator* iter, const Version* version, SequenceNumber s, - bool arena_mode, ReadCallback* read_callback, - ColumnFamilyHandleImpl* cfh, bool expose_blob_index, + bool arena_mode, ReadCallback* read_callback, DBImpl* db_impl, + ColumnFamilyData* cfd, bool expose_blob_index, ReadOnlyMemTable* active_mem); class BlobReader { @@ -318,13 +329,12 @@ class DBIter final : public Iterator { class ValueColumnsState { public: ValueColumnsState(const Version* version, const ReadOptions& read_options, - ColumnFamilyHandleImpl* cfh) + ColumnFamilyData* cfd) : entity_blob_resolver_( version, read_options.read_tier, read_options.verify_checksums, read_options.fill_cache, read_options.io_activity, - cfh ? cfh->cfd()->blob_file_cache() : nullptr, - cfh != nullptr && - cfh->cfd()->blob_partition_manager() != nullptr) {} + cfd ? cfd->blob_file_cache() : nullptr, + cfd != nullptr && cfd->blob_partition_manager() != nullptr) {} Slice& value() { return value_; } const Slice& value() const { return value_; } @@ -706,7 +716,11 @@ class DBIter final : public Iterator { MergeContext merge_context_; LocalStatistics local_stats_; PinnedIteratorsManager pinned_iters_mgr_; - ColumnFamilyHandleImpl* cfh_; + DBImpl* trace_db_; + uint32_t trace_cf_id_; + bool has_trace_state_; + bool allow_blob_write_path_fallback_; + port::RWMutex* ingest_sst_lock_; const Slice* const timestamp_ub_; const Slice* const timestamp_lb_; const size_t timestamp_size_; diff --git a/db/db_iterator_test.cc b/db/db_iterator_test.cc index 7a08b8d6261..c9da740ef2b 100644 --- a/db/db_iterator_test.cc +++ b/db/db_iterator_test.cc @@ -10,6 +10,9 @@ #include #include #include +#include +#include +#include #include "db/arena_wrapped_db_iter.h" #include "db/db_iter.h" @@ -87,6 +90,392 @@ TEST_F(DBIteratorBaseTest, APICallsWithPerfContext) { delete iter; } +TEST_F(DBIteratorBaseTest, PrepareWithMultiScanPrunesNonIntersectingFiles) { + Options options = CurrentOptions(); + options.disable_auto_compactions = true; + DestroyAndReopen(options); + + ASSERT_OK(Put("a", "va")); + ASSERT_OK(Flush()); + ASSERT_OK(Put("z", "vz")); + ASSERT_OK(Flush()); + ASSERT_EQ(2, NumTableFilesAtLevel(0)); + + int table_iterators_created = 0; + int files_added = 0; + int block_based_iterators = 0; + int level_iterators = 0; + SyncPoint::GetInstance()->SetCallBack( + "TableCache::NewIterator::BeforeFindTable", + [&](void* /*arg*/) { ++table_iterators_created; }); + SyncPoint::GetInstance()->SetCallBack( + "Version::AddIteratorsForLevel:AddedFile", + [&](void* /*arg*/) { ++files_added; }); + SyncPoint::GetInstance()->SetCallBack( + "Version::AddIteratorsForLevel:IteratorType", [&](void* arg) { + auto* iterator_type = static_cast*>(arg); + if (iterator_type->first) { + ++block_based_iterators; + } + if (iterator_type->second) { + ++level_iterators; + } + }); + SyncPoint::GetInstance()->EnableProcessing(); + + MultiScanArgs scan_opts(BytewiseComparator()); + scan_opts.insert(Slice("a"), Slice("b")); + Slice upper_bound("b"); + ReadOptions read_options; + read_options.iterate_upper_bound = &upper_bound; + std::unique_ptr iter( + db_->NewIterator(read_options, db_->DefaultColumnFamily())); + + ASSERT_EQ(0, table_iterators_created); + ASSERT_EQ(0, files_added); + ASSERT_EQ(0, block_based_iterators); + ASSERT_EQ(0, level_iterators); + iter->Prepare(scan_opts); + ASSERT_EQ(1, table_iterators_created); + ASSERT_EQ(1, files_added); + ASSERT_EQ(1, block_based_iterators); + ASSERT_EQ(0, level_iterators); + std::vector keys; + for (iter->Seek("a"); iter->Valid(); iter->Next()) { + keys.push_back(iter->key().ToString()); + } + ASSERT_EQ(keys, std::vector({"a"})); + ASSERT_FALSE(iter->Valid()); + ASSERT_OK(iter->status()); + + SyncPoint::GetInstance()->DisableProcessing(); + SyncPoint::GetInstance()->ClearAllCallBacks(); +} + +TEST_F(DBIteratorBaseTest, PrepareWithMultiScanPrunesNonIntersectingLevels) { + Options options = CurrentOptions(); + options.disable_auto_compactions = true; + DestroyAndReopen(options); + + ASSERT_OK(Put("z", "vz")); + ASSERT_OK(Flush()); + MoveFilesToLevel(2); + ASSERT_OK(Put("a", "va")); + ASSERT_OK(Flush()); + MoveFilesToLevel(1); + ASSERT_OK(Put("m", "vm")); + ASSERT_OK(Flush()); + MoveFilesToLevel(1); + + int table_iterators_created = 0; + int files_added = 0; + int block_based_iterators = 0; + int level_iterators = 0; + SyncPoint::GetInstance()->SetCallBack( + "TableCache::NewIterator::BeforeFindTable", + [&](void* /*arg*/) { ++table_iterators_created; }); + SyncPoint::GetInstance()->SetCallBack( + "Version::AddIteratorsForLevel:AddedFile", + [&](void* /*arg*/) { ++files_added; }); + SyncPoint::GetInstance()->SetCallBack( + "Version::AddIteratorsForLevel:IteratorType", [&](void* arg) { + auto* iterator_type = static_cast*>(arg); + if (iterator_type->first) { + ++block_based_iterators; + } + if (iterator_type->second) { + ++level_iterators; + } + }); + SyncPoint::GetInstance()->EnableProcessing(); + + MultiScanArgs scan_opts(BytewiseComparator()); + scan_opts.insert(Slice("a"), Slice("n")); + Slice upper_bound("n"); + ReadOptions read_options; + read_options.iterate_upper_bound = &upper_bound; + std::unique_ptr iter( + db_->NewIterator(read_options, db_->DefaultColumnFamily())); + + ASSERT_EQ(0, table_iterators_created); + ASSERT_EQ(0, files_added); + ASSERT_EQ(0, block_based_iterators); + ASSERT_EQ(0, level_iterators); + iter->Prepare(scan_opts); + ASSERT_EQ(0, table_iterators_created); + ASSERT_EQ(2, files_added); + ASSERT_EQ(0, block_based_iterators); + ASSERT_EQ(1, level_iterators); + std::vector keys; + for (iter->Seek("a"); iter->Valid(); iter->Next()) { + keys.push_back(iter->key().ToString()); + } + ASSERT_EQ(keys, std::vector({"a", "m"})); + ASSERT_FALSE(iter->Valid()); + ASSERT_OK(iter->status()); + + SyncPoint::GetInstance()->DisableProcessing(); + SyncPoint::GetInstance()->ClearAllCallBacks(); +} + +TEST_F(DBIteratorBaseTest, PrepareWithMultiScanAllowsSingleUnboundedRange) { + Options options = CurrentOptions(); + options.disable_auto_compactions = true; + DestroyAndReopen(options); + + ASSERT_OK(Put("a", "va")); + ASSERT_OK(Put("b", "vb")); + ASSERT_OK(Put("c", "vc")); + ASSERT_OK(Flush()); + MoveFilesToLevel(1); + + MultiScanArgs scan_opts(BytewiseComparator()); + scan_opts.insert(Slice("b")); + ReadOptions read_options; + std::unique_ptr iter( + db_->NewIterator(read_options, db_->DefaultColumnFamily())); + + iter->Prepare(scan_opts); + std::vector keys; + for (iter->Seek("b"); iter->Valid(); iter->Next()) { + keys.push_back(iter->key().ToString()); + } + ASSERT_EQ(keys, std::vector({"b", "c"})); + ASSERT_FALSE(iter->Valid()); + ASSERT_OK(iter->status()); +} + +TEST_F(DBIteratorBaseTest, PrepareWithMultiScanRejectsRepeatedPrepare) { + Options options = CurrentOptions(); + options.disable_auto_compactions = true; + DestroyAndReopen(options); + + ASSERT_OK(Put("a", "va")); + ASSERT_OK(Flush()); + MoveFilesToLevel(1); + + MultiScanArgs scan_opts(BytewiseComparator()); + scan_opts.insert(Slice("a"), Slice("b")); + Slice upper_bound("b"); + ReadOptions read_options; + read_options.iterate_upper_bound = &upper_bound; + std::unique_ptr iter( + db_->NewIterator(read_options, db_->DefaultColumnFamily())); + + iter->Prepare(scan_opts); + ASSERT_OK(iter->status()); + iter->Prepare(scan_opts); + ASSERT_NOK(iter->status()); +} + +TEST_F(DBIteratorBaseTest, PrepareWithMultiScanDedupsMultipleRangesInSameFile) { + Options options = CurrentOptions(); + options.disable_auto_compactions = true; + DestroyAndReopen(options); + + ASSERT_OK(Put("a", "va")); + ASSERT_OK(Put("m", "vm")); + ASSERT_OK(Put("z", "vz")); + ASSERT_OK(Flush()); + MoveFilesToLevel(1); + + int table_iterators_created = 0; + int files_added = 0; + int block_based_iterators = 0; + int level_iterators = 0; + SyncPoint::GetInstance()->SetCallBack( + "TableCache::NewIterator::BeforeFindTable", + [&](void* /*arg*/) { ++table_iterators_created; }); + SyncPoint::GetInstance()->SetCallBack( + "Version::AddIteratorsForLevel:AddedFile", + [&](void* /*arg*/) { ++files_added; }); + SyncPoint::GetInstance()->SetCallBack( + "Version::AddIteratorsForLevel:IteratorType", [&](void* arg) { + auto* iterator_type = static_cast*>(arg); + if (iterator_type->first) { + ++block_based_iterators; + } + if (iterator_type->second) { + ++level_iterators; + } + }); + SyncPoint::GetInstance()->EnableProcessing(); + + MultiScanArgs scan_opts(BytewiseComparator()); + scan_opts.insert(Slice("a"), Slice("b")); + scan_opts.insert(Slice("m"), Slice("n")); + Slice upper_bound("b"); + ReadOptions read_options; + read_options.iterate_upper_bound = &upper_bound; + std::unique_ptr iter( + db_->NewIterator(read_options, db_->DefaultColumnFamily())); + + ASSERT_EQ(0, table_iterators_created); + ASSERT_EQ(0, files_added); + ASSERT_EQ(0, block_based_iterators); + ASSERT_EQ(0, level_iterators); + iter->Prepare(scan_opts); + ASSERT_EQ(1, table_iterators_created); + ASSERT_EQ(1, files_added); + ASSERT_EQ(1, block_based_iterators); + ASSERT_EQ(0, level_iterators); + + std::vector keys; + for (iter->Seek("a"); iter->Valid(); iter->Next()) { + keys.push_back(iter->key().ToString()); + } + ASSERT_OK(iter->status()); + ASSERT_EQ(keys, std::vector({"a"})); + + upper_bound = "n"; + for (iter->Seek("m"); iter->Valid(); iter->Next()) { + keys.push_back(iter->key().ToString()); + } + ASSERT_EQ(keys, std::vector({"a", "m"})); + ASSERT_FALSE(iter->Valid()); + ASSERT_OK(iter->status()); + + SyncPoint::GetInstance()->DisableProcessing(); + SyncPoint::GetInstance()->ClearAllCallBacks(); +} + +TEST_F(DBIteratorBaseTest, PrepareWithMultiScanPrunesOverlappingL0Files) { + Options options = CurrentOptions(); + options.disable_auto_compactions = true; + DestroyAndReopen(options); + + ASSERT_OK(Put("a", "va")); + ASSERT_OK(Flush()); + ASSERT_OK(Put("m", "vm")); + ASSERT_OK(Flush()); + ASSERT_EQ(2, NumTableFilesAtLevel(0)); + + int table_iterators_created = 0; + int files_added = 0; + int block_based_iterators = 0; + int level_iterators = 0; + SyncPoint::GetInstance()->SetCallBack( + "TableCache::NewIterator::BeforeFindTable", + [&](void* /*arg*/) { ++table_iterators_created; }); + SyncPoint::GetInstance()->SetCallBack( + "Version::AddIteratorsForLevel:AddedFile", + [&](void* /*arg*/) { ++files_added; }); + SyncPoint::GetInstance()->SetCallBack( + "Version::AddIteratorsForLevel:IteratorType", [&](void* arg) { + auto* iterator_type = static_cast*>(arg); + if (iterator_type->first) { + ++block_based_iterators; + } + if (iterator_type->second) { + ++level_iterators; + } + }); + SyncPoint::GetInstance()->EnableProcessing(); + + MultiScanArgs scan_opts(BytewiseComparator()); + scan_opts.insert(Slice("a"), Slice("n")); + Slice upper_bound("n"); + ReadOptions read_options; + read_options.iterate_upper_bound = &upper_bound; + std::unique_ptr iter( + db_->NewIterator(read_options, db_->DefaultColumnFamily())); + + ASSERT_EQ(0, table_iterators_created); + ASSERT_EQ(0, files_added); + ASSERT_EQ(0, block_based_iterators); + ASSERT_EQ(0, level_iterators); + iter->Prepare(scan_opts); + ASSERT_EQ(2, table_iterators_created); + ASSERT_EQ(2, files_added); + ASSERT_EQ(2, block_based_iterators); + ASSERT_EQ(0, level_iterators); + std::vector keys; + for (iter->Seek("a"); iter->Valid(); iter->Next()) { + keys.push_back(iter->key().ToString()); + } + ASSERT_EQ(keys, std::vector({"a", "m"})); + ASSERT_FALSE(iter->Valid()); + ASSERT_OK(iter->status()); + + SyncPoint::GetInstance()->DisableProcessing(); + SyncPoint::GetInstance()->ClearAllCallBacks(); +} + +TEST_F(DBIteratorBaseTest, PrepareWithMultiScanPrunesNonIntersectingMemTables) { + Options options = CurrentOptions(); + options.disable_auto_compactions = true; + DestroyAndReopen(options); + + ASSERT_OK(Put("a", "va")); + ASSERT_OK(Flush()); + MoveFilesToLevel(1); + + ASSERT_OK(db_->PauseBackgroundWork()); + ASSERT_OK(Put("z_imm", "vz")); + ASSERT_OK(dbfull()->TEST_SwitchMemtable()); + ASSERT_OK(Put("z_mem", "vz")); + + int table_iterators_created = 0; + int files_added = 0; + int block_based_iterators = 0; + int level_iterators = 0; + int merging_iterators = 0; + SyncPoint::GetInstance()->SetCallBack( + "TableCache::NewIterator::BeforeFindTable", + [&](void* /*arg*/) { ++table_iterators_created; }); + SyncPoint::GetInstance()->SetCallBack( + "Version::AddIteratorsForLevel:AddedFile", + [&](void* /*arg*/) { ++files_added; }); + SyncPoint::GetInstance()->SetCallBack( + "Version::AddIteratorsForLevel:IteratorType", [&](void* arg) { + auto* iterator_type = static_cast*>(arg); + if (iterator_type->first) { + ++block_based_iterators; + } + if (iterator_type->second) { + ++level_iterators; + } + }); + SyncPoint::GetInstance()->SetCallBack( + "MergeIteratorBuilder::Finish:UseMergingIterator", [&](void* arg) { + if (*static_cast(arg)) { + ++merging_iterators; + } + }); + SyncPoint::GetInstance()->EnableProcessing(); + + MultiScanArgs scan_opts(BytewiseComparator()); + scan_opts.insert(Slice("a"), Slice("b")); + Slice upper_bound("b"); + ReadOptions read_options; + read_options.iterate_upper_bound = &upper_bound; + std::unique_ptr iter( + db_->NewIterator(read_options, db_->DefaultColumnFamily())); + + ASSERT_EQ(0, table_iterators_created); + ASSERT_EQ(0, files_added); + ASSERT_EQ(0, block_based_iterators); + ASSERT_EQ(0, level_iterators); + ASSERT_EQ(0, merging_iterators); + iter->Prepare(scan_opts); + ASSERT_EQ(1, table_iterators_created); + ASSERT_EQ(1, files_added); + ASSERT_EQ(1, block_based_iterators); + ASSERT_EQ(0, level_iterators); + ASSERT_EQ(0, merging_iterators); + std::vector keys; + for (iter->Seek("a"); iter->Valid(); iter->Next()) { + keys.push_back(iter->key().ToString()); + } + ASSERT_EQ(keys, std::vector({"a"})); + ASSERT_FALSE(iter->Valid()); + ASSERT_OK(iter->status()); + + SyncPoint::GetInstance()->DisableProcessing(); + SyncPoint::GetInstance()->ClearAllCallBacks(); + ASSERT_OK(db_->ContinueBackgroundWork()); +} + // Test param: // bool: whether to pass read_callback to NewIterator(). class DBIteratorTest : public DBIteratorBaseTest, @@ -945,6 +1334,68 @@ TEST_P(DBIteratorTest, IteratorDeleteAfterCfDelete) { delete iter; } +TEST_P(DBIteratorTest, IteratorSeekAfterCfDelete) { + CreateAndReopenWithCF({"pikachu"}, CurrentOptions()); + + ASSERT_OK(Put(1, "foo", "delete-cf-then-seek-iter")); + ASSERT_OK(Put(1, "hello", "value2")); + + ColumnFamilyHandle* cf = handles_[1]; + ReadOptions ro; + + auto* iter = db_->NewIterator(ro, cf); + + // Delete the CF handle before the lazy iterator tree is materialized. + EXPECT_OK(db_->DestroyColumnFamilyHandle(cf)); + handles_.erase(std::begin(handles_) + 1); + + iter->Seek("foo"); + ASSERT_EQ(IterStatus(iter), "foo->delete-cf-then-seek-iter"); + iter->SeekForPrev("hello"); + ASSERT_EQ(IterStatus(iter), "hello->value2"); + iter->SeekToFirst(); + ASSERT_EQ(IterStatus(iter), "foo->delete-cf-then-seek-iter"); + iter->Next(); + ASSERT_EQ(IterStatus(iter), "hello->value2"); + delete iter; +} + +TEST_P(DBIteratorTest, IteratorAutoRefreshAfterCfDeleteBeforeLazyInit) { + CreateAndReopenWithCF({"pikachu"}, CurrentOptions()); + + ASSERT_OK(Put(1, "foo", "delete-cf-then-auto-refresh")); + ASSERT_OK(Flush(1)); + + ColumnFamilyHandle* cf = handles_[1]; + const Snapshot* snapshot = db_->GetSnapshot(); + + ReadOptions ro; + ro.snapshot = snapshot; + ro.auto_refresh_iterator_with_snapshot = true; + + auto* iter = db_->NewIterator(ro, cf); + + ASSERT_OK(Put(1, "zzz", "after-snapshot")); + ASSERT_OK(Flush(1)); + + // Delete the CF handle before the lazy iterator tree is materialized. The + // following operations force auto-refresh to acquire a newer SuperVersion. + EXPECT_OK(db_->DestroyColumnFamilyHandle(cf)); + handles_.erase(std::begin(handles_) + 1); + + iter->Seek("foo"); + ASSERT_EQ(IterStatus(iter), "foo->delete-cf-then-auto-refresh"); + ASSERT_OK(iter->status()); + iter->SeekForPrev("foo"); + ASSERT_EQ(IterStatus(iter), "foo->delete-cf-then-auto-refresh"); + ASSERT_OK(iter->status()); + iter->Next(); + ASSERT_OK(iter->status()); + + delete iter; + db_->ReleaseSnapshot(snapshot); +} + TEST_P(DBIteratorTest, IteratorDeleteAfterCfDrop) { CreateAndReopenWithCF({"pikachu"}, CurrentOptions()); @@ -966,6 +1417,30 @@ TEST_P(DBIteratorTest, IteratorDeleteAfterCfDrop) { delete iter; } +TEST_P(DBIteratorTest, IteratorSeekAfterCfDrop) { + CreateAndReopenWithCF({"pikachu"}, CurrentOptions()); + + ASSERT_OK(Put(1, "foo", "drop-cf-then-seek-iter")); + + ReadOptions ro; + ColumnFamilyHandle* cf = handles_[1]; + + auto* iter = db_->NewIterator(ro, cf); + + // Drop and delete the CF before the lazy iterator tree is materialized. + EXPECT_OK(db_->DropColumnFamily(cf)); + EXPECT_OK(db_->DestroyColumnFamilyHandle(cf)); + handles_.erase(std::begin(handles_) + 1); + + iter->Seek("foo"); + ASSERT_EQ(IterStatus(iter), "foo->drop-cf-then-seek-iter"); + iter->SeekForPrev("foo"); + ASSERT_EQ(IterStatus(iter), "foo->drop-cf-then-seek-iter"); + iter->SeekToFirst(); + ASSERT_EQ(IterStatus(iter), "foo->drop-cf-then-seek-iter"); + delete iter; +} + // SetOptions not defined in ROCKSDB LITE TEST_P(DBIteratorTest, DBIteratorBoundTest) { Options options = CurrentOptions(); @@ -2697,6 +3172,8 @@ TEST_P(DBIteratorTest, CreationFailure) { Iterator* iter = NewIterator(ReadOptions()); ASSERT_FALSE(iter->Valid()); + iter->SeekToFirst(); + ASSERT_FALSE(iter->Valid()); ASSERT_TRUE(iter->status().IsCorruption()); delete iter; } diff --git a/db/memtable_list.cc b/db/memtable_list.cc index 9e99c74db6e..277401e2087 100644 --- a/db/memtable_list.cc +++ b/db/memtable_list.cc @@ -17,6 +17,7 @@ #include "db/version_set.h" #include "logging/log_buffer.h" #include "logging/logging.h" +#include "memory/arena.h" #include "monitoring/thread_status_util.h" #include "rocksdb/db.h" #include "rocksdb/env.h" @@ -31,6 +32,78 @@ class InternalKeyComparator; class Mutex; class VersionSet; +bool MultiScanOverlapsUserKeyRange(const MultiScanArgs* scan_opts, + const Comparator* user_comparator, + const Slice& smallest_user_key, + const Slice& largest_user_key) { + if (scan_opts == nullptr || !scan_opts->HasBoundedScanRanges()) { + return true; + } + + for (const ScanOptions& scan_range : scan_opts->GetScanRanges()) { + if (user_comparator->CompareWithoutTimestamp( + scan_range.range.limit.value(), /*a_has_ts=*/false, + smallest_user_key, /*b_has_ts=*/true) <= 0) { + continue; + } + if (user_comparator->CompareWithoutTimestamp( + scan_range.range.start.value(), /*a_has_ts=*/false, + largest_user_key, /*b_has_ts=*/true) > 0) { + continue; + } + return true; + } + return false; +} + +bool MultiScanIteratorOverlapsUserKeyRange(InternalIterator* iter, + const MultiScanArgs* scan_opts, + const Comparator* user_comparator) { + iter->SeekToFirst(); + if (!iter->status().ok()) { + return true; + } + if (!iter->Valid()) { + return true; + } + const std::string smallest_user_key = ExtractUserKey(iter->key()).ToString(); + iter->SeekToLast(); + if (!iter->status().ok()) { + return true; + } + if (!iter->Valid()) { + return true; + } + const std::string largest_user_key = ExtractUserKey(iter->key()).ToString(); + return MultiScanOverlapsUserKeyRange(scan_opts, user_comparator, + smallest_user_key, largest_user_key); +} + +bool MultiScanIntersectsMemTable( + ReadOnlyMemTable* memtable, const ReadOptions& read_options, + UnownedPtr seqno_to_time_mapping, + const SliceTransform* prefix_extractor, const MultiScanArgs* scan_opts, + const Comparator* user_comparator) { + if (scan_opts == nullptr || !scan_opts->HasBoundedScanRanges()) { + return true; + } + if (memtable->NumRangeDeletion() > 0) { + return true; + } + + Arena arena; + ReadOptions intersect_read_options(read_options); + intersect_read_options.total_order_seek = true; + intersect_read_options.iterate_lower_bound = nullptr; + intersect_read_options.iterate_upper_bound = nullptr; + InternalIterator* iter = + memtable->NewIterator(intersect_read_options, seqno_to_time_mapping, + &arena, prefix_extractor, /*for_flush=*/false); + ScopedArenaPtr iter_guard(iter); + return MultiScanIteratorOverlapsUserKeyRange(iter, scan_opts, + user_comparator); +} + void MemTableListVersion::AddMemTable(ReadOnlyMemTable* m) { if (!memlist_.empty()) { // ID can be equal for MemPurge @@ -229,33 +302,42 @@ void MemTableListVersion::AddIterators( const ReadOptions& options, UnownedPtr seqno_to_time_mapping, const SliceTransform* prefix_extractor, - MergeIteratorBuilder* merge_iter_builder, bool add_range_tombstone_iter) { + MergeIteratorBuilder* merge_iter_builder, bool add_range_tombstone_iter, + SequenceNumber read_seq, const MultiScanArgs* scan_opts, + const Comparator* user_comparator) { for (auto& m : memlist_) { + const bool should_probe_scan_intersection = + scan_opts != nullptr && scan_opts->HasBoundedScanRanges() && + m->NumRangeDeletion() == 0; + if (should_probe_scan_intersection && + !MultiScanIntersectsMemTable(m, options, seqno_to_time_mapping, + prefix_extractor, scan_opts, + user_comparator)) { + continue; + } auto mem_iter = m->NewIterator(options, seqno_to_time_mapping, merge_iter_builder->GetArena(), prefix_extractor, /*for_flush=*/false); + ScopedArenaPtr mem_iter_guard(mem_iter); if (!add_range_tombstone_iter || options.ignore_range_deletions) { - merge_iter_builder->AddIterator(mem_iter); + merge_iter_builder->AddIterator(mem_iter_guard.release()); } else { - // Except for snapshot read, using kMaxSequenceNumber is OK because these - // are immutable memtables. - SequenceNumber read_seq = options.snapshot != nullptr - ? options.snapshot->GetSequenceNumber() - : kMaxSequenceNumber; + const SequenceNumber range_del_read_seq = + read_seq != kMaxSequenceNumber || options.snapshot == nullptr + ? read_seq + : options.snapshot->GetSequenceNumber(); std::unique_ptr mem_tombstone_iter; - auto range_del_iter = m->NewRangeTombstoneIterator( - options, read_seq, true /* immutale_memtable */); - if (range_del_iter == nullptr || range_del_iter->empty()) { - delete range_del_iter; - } else { + std::unique_ptr range_del_iter( + m->NewRangeTombstoneIterator(options, range_del_read_seq, + true /* immutale_memtable */)); + if (range_del_iter != nullptr && !range_del_iter->empty()) { mem_tombstone_iter = std::make_unique( - std::unique_ptr(range_del_iter), - &m->GetInternalKeyComparator(), nullptr /* smallest */, - nullptr /* largest */); + std::move(range_del_iter), &m->GetInternalKeyComparator(), + nullptr /* smallest */, nullptr /* largest */); } merge_iter_builder->AddPointAndTombstoneIterator( - mem_iter, std::move(mem_tombstone_iter)); + mem_iter_guard.release(), std::move(mem_tombstone_iter)); } } } diff --git a/db/memtable_list.h b/db/memtable_list.h index a8e36550a53..468a1c33d0a 100644 --- a/db/memtable_list.h +++ b/db/memtable_list.h @@ -34,6 +34,21 @@ class MemTableList; struct FlushJobInfo; +// Returns true when at least one bounded scan range overlaps the user-key +// range. Unbounded scan options conservatively overlap. +bool MultiScanOverlapsUserKeyRange(const MultiScanArgs* scan_opts, + const Comparator* user_comparator, + const Slice& smallest_user_key, + const Slice& largest_user_key); + +// Returns true when the memtable may contain keys or range deletions relevant +// to the scan ranges. Iterator/status errors conservatively overlap. +bool MultiScanIntersectsMemTable( + ReadOnlyMemTable* memtable, const ReadOptions& read_options, + UnownedPtr seqno_to_time_mapping, + const SliceTransform* prefix_extractor, const MultiScanArgs* scan_opts, + const Comparator* user_comparator); + // keeps a list of immutable memtables (ReadOnlyMemtable*) in a vector. // The list is immutable if refcount is bigger than one. It is used as // a state for Get() and iterator code paths. @@ -122,11 +137,17 @@ class MemTableListVersion { std::vector* iterator_list, Arena* arena); + // read_seq controls range tombstone visibility. It may be captured before + // lazy iterator initialization. scan_opts can prune immutable memtables that + // cannot intersect the requested scan ranges. void AddIterators(const ReadOptions& options, UnownedPtr seqno_to_time_mapping, const SliceTransform* prefix_extractor, MergeIteratorBuilder* merge_iter_builder, - bool add_range_tombstone_iter); + bool add_range_tombstone_iter, + SequenceNumber read_seq = kMaxSequenceNumber, + const MultiScanArgs* scan_opts = nullptr, + const Comparator* user_comparator = nullptr); uint64_t GetTotalNumEntries() const; diff --git a/db/version_set.cc b/db/version_set.cc index 4dd20488020..0e3475cbfb3 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -18,7 +18,9 @@ #include #include #include +#include #include +#include #include #include "db/blob/blob_fetcher.h" @@ -69,7 +71,6 @@ #include "table/multiget_context.h" #include "table/plain/plain_table_factory.h" #include "table/table_reader.h" -#include "table/two_level_iterator.h" #include "table/unique_id_impl.h" #include "test_util/sync_point.h" #include "util/cast_util.h" @@ -110,6 +111,181 @@ int FindFileInRange(const InternalKeyComparator& icmp, return static_cast(std::lower_bound(b + left, b + right, key, cmp) - b); } +InternalKey MultiScanInternalKey(const Slice& user_key, + const Comparator* user_comparator) { + const size_t timestamp_size = user_comparator->timestamp_size(); + if (timestamp_size == 0) { + return InternalKey(user_key, kMaxSequenceNumber, kValueTypeForSeek); + } + + std::string key_with_ts; + AppendKeyWithMaxTimestamp(&key_with_ts, user_key, timestamp_size); + return InternalKey(key_with_ts, kMaxSequenceNumber, kValueTypeForSeek); +} + +bool MultiScanRangeOverlapsFile(const UserComparatorWrapper& user_comparator, + const ScanOptions& scan_opts, + const FdWithKeyRange& file) { + assert(scan_opts.range.start.has_value()); + assert(scan_opts.range.limit.has_value()); + + if (user_comparator.CompareWithoutTimestamp( + scan_opts.range.limit.value(), /*a_has_ts=*/false, + ExtractUserKey(file.smallest_key), /*b_has_ts=*/true) <= 0) { + return false; + } + + if (user_comparator.CompareWithoutTimestamp( + scan_opts.range.start.value(), /*a_has_ts=*/false, + ExtractUserKey(file.largest_key), /*b_has_ts=*/true) > 0) { + return false; + } + + return true; +} + +void AddMultiScanFileIndex(std::vector* file_indexes, + std::vector* selected, size_t file_index) { + if ((*selected)[file_index]) { + return; + } + (*selected)[file_index] = true; + file_indexes->push_back(file_index); +} + +template +bool NotifyMultiScanOverlappingFile(OnOverlappingFile& on_overlapping_file, + size_t file_index, + const ScanOptions& scan_range) { + if constexpr (std::is_same_v, + bool>) { + return on_overlapping_file(file_index, scan_range); + } else { + on_overlapping_file(file_index, scan_range); + return true; + } +} + +template +void ForEachMultiScanOverlappingFile(const InternalKeyComparator& icmp, + const UserComparatorWrapper& ucmp, + const LevelFilesBrief& file_level, + int level, const MultiScanArgs& scan_opts, + OnOverlappingFile on_overlapping_file) { + assert(scan_opts.HasBoundedScanRanges()); + if (file_level.num_files == 0) { + return; + } + + if (level == 0) { + for (size_t i = 0; i < file_level.num_files; ++i) { + for (const ScanOptions& scan_range : scan_opts.GetScanRanges()) { + if (MultiScanRangeOverlapsFile(ucmp, scan_range, file_level.files[i])) { + if (!NotifyMultiScanOverlappingFile(on_overlapping_file, i, + scan_range)) { + return; + } + } + + if (ucmp.CompareWithoutTimestamp( + scan_range.range.start.value(), /*a_has_ts=*/false, + ExtractUserKey(file_level.files[i].largest_key), + /*b_has_ts=*/true) > 0) { + break; + } + } + } + return; + } + + for (const ScanOptions& scan_range : scan_opts.GetScanRanges()) { + const InternalKey start_key = MultiScanInternalKey( + scan_range.range.start.value(), ucmp.user_comparator()); + size_t first_file = FindFile(icmp, file_level, start_key.Encode()); + if (first_file >= file_level.num_files) { + continue; + } + + const InternalKey limit_key = MultiScanInternalKey( + scan_range.range.limit.value(), ucmp.user_comparator()); + size_t last_file = FindFile(icmp, file_level, limit_key.Encode()); + if (last_file >= file_level.num_files) { + last_file = file_level.num_files - 1; + } + + for (size_t i = first_file; i <= last_file; ++i) { + if (MultiScanRangeOverlapsFile(ucmp, scan_range, file_level.files[i])) { + if (!NotifyMultiScanOverlappingFile(on_overlapping_file, i, + scan_range)) { + return; + } + } + } + } +} + +std::vector GetMultiScanOverlappingFiles( + const InternalKeyComparator& icmp, const UserComparatorWrapper& ucmp, + const LevelFilesBrief& file_level, int level, + const MultiScanArgs& scan_opts, + size_t max_file_indexes = std::numeric_limits::max()) { + std::vector file_indexes; + if (!scan_opts.HasBoundedScanRanges() || file_level.num_files == 0) { + return file_indexes; + } + file_indexes.reserve( + std::min(static_cast(file_level.num_files), max_file_indexes)); + + std::vector selected(file_level.num_files, false); + ForEachMultiScanOverlappingFile( + icmp, ucmp, file_level, level, scan_opts, + [&](size_t file_index, const ScanOptions& /*scan_range*/) { + if (file_indexes.size() >= max_file_indexes) { + return false; + } + AddMultiScanFileIndex(&file_indexes, &selected, file_index); + return file_indexes.size() < max_file_indexes; + }); + + return file_indexes; +} + +void AddTableIteratorForLevel( + ColumnFamilyData* cfd, const ReadOptions& read_options, + const FileOptions& soptions, const MutableCFOptions& mutable_cf_options, + MergeIteratorBuilder* merge_iter_builder, Arena* arena, + const LevelFilesBrief& file_level, int level, size_t file_index, + bool skip_filters, size_t max_file_size_for_l0_meta_pin, + bool allow_unprepared_value, SequenceNumber read_seq) { + std::unique_ptr tombstone_iter = nullptr; + const auto& file = file_level.files[file_index]; + auto table_iter = cfd->table_cache()->NewIterator( + read_options, soptions, cfd->internal_comparator(), *file.file_metadata, + /*range_del_agg=*/nullptr, mutable_cf_options, nullptr, + cfd->internal_stats()->GetFileReadHist(level), + TableReaderCaller::kUserIterator, arena, skip_filters, level, + max_file_size_for_l0_meta_pin, + /*smallest_compaction_key=*/nullptr, + /*largest_compaction_key=*/nullptr, allow_unprepared_value, &read_seq, + &tombstone_iter, + /*maybe_pin_table_handle=*/true); +#ifndef NDEBUG + TEST_SYNC_POINT_CALLBACK("Version::AddIteratorsForLevel:AddedFile", + file.file_metadata); + std::pair iterator_type(true /* is_block_based_table_iterator */, + false /* is_level_iterator */); + TEST_SYNC_POINT_CALLBACK("Version::AddIteratorsForLevel:IteratorType", + &iterator_type); +#endif + if (read_options.ignore_range_deletions) { + merge_iter_builder->AddIterator(table_iter); + } else { + merge_iter_builder->AddPointAndTombstoneIterator(table_iter, + std::move(tombstone_iter)); + } +} + Status OverlapWithIterator(const Comparator* ucmp, const Slice& smallest_user_key, const Slice& largest_user_key, @@ -990,6 +1166,7 @@ class LevelIterator final : public InternalIterator { bool allow_unprepared_value = false, std::unique_ptr*** range_tombstone_iter_ptr_ = nullptr, + SequenceNumber read_seq = kMaxSequenceNumber, Statistics* db_statistics = nullptr, SystemClock* clock = nullptr, bool open_ephemeral_table_reader = false) : table_cache_(table_cache), @@ -1007,9 +1184,11 @@ class LevelIterator final : public InternalIterator { pinned_iters_mgr_(nullptr), compaction_boundaries_(compaction_boundaries), range_tombstone_iter_(nullptr), - read_seq_(read_options.snapshot - ? read_options.snapshot->GetSequenceNumber() - : kMaxSequenceNumber), + range_tombstone_iter_required_(range_tombstone_iter_ptr_ != nullptr), + read_seq_(read_seq != kMaxSequenceNumber || + read_options.snapshot == nullptr + ? read_seq + : read_options.snapshot->GetSequenceNumber()), level_(level), skip_filters_(skip_filters), allow_unprepared_value_(allow_unprepared_value), @@ -1158,69 +1337,18 @@ class LevelIterator final : public InternalIterator { assert(so->GetComparator() == user_comparator_.user_comparator()); file_to_scan_opts_ = std::make_unique(); - for (size_t k = 0; k < scan_opts_->size(); k++) { - const ScanOptions& opt = scan_opts_->GetScanRanges().at(k); - auto start = opt.range.start; - auto end = opt.range.limit; - - if (!start.has_value()) { - continue; - } - - // We can capture this case in the future, but for now lets skip this. - if (!end.has_value()) { - continue; - } - - const size_t timestamp_size = - user_comparator_.user_comparator()->timestamp_size(); - InternalKey istart, iend; - if (timestamp_size == 0) { - istart = - InternalKey(start.value(), kMaxSequenceNumber, kValueTypeForSeek); - // end key is exclusive for multiscan - iend = InternalKey(end.value(), kMaxSequenceNumber, kValueTypeForSeek); - } else { - std::string start_key_with_ts, end_key_with_ts; - AppendKeyWithMaxTimestamp(&start_key_with_ts, start.value(), - timestamp_size); - AppendKeyWithMaxTimestamp(&end_key_with_ts, end.value(), - timestamp_size); - istart = InternalKey(start_key_with_ts, kMaxSequenceNumber, - kValueTypeForSeek); - // end key is exclusive for multiscan - iend = - InternalKey(end_key_with_ts, kMaxSequenceNumber, kValueTypeForSeek); - } - - // TODO: This needs to be optimized, right now we iterate twice, which - // we dont need to. We can do this in N rather than 2N. - size_t fstart = FindFile(icomparator_, *flevel_, istart.Encode()); - size_t fend = FindFile(icomparator_, *flevel_, iend.Encode()); - - // We need to check the relevant cases - // Cases: - // 1. [ S E ] - // 2. [ S ] [ E ] - // 3. [ S ] ...... [ E ] - for (auto i = fstart; i <= fend; i++) { - if (i < flevel_->num_files) { - // FindFile only compares against the largest_key, so we need this - // additional check to ensure the scan range overlaps the file - if (icomparator_.InternalKeyComparator::Compare( - iend.Encode(), flevel_->files[i].smallest_key) < 0) { - continue; - } - auto const metadata = flevel_->files[i].file_metadata; + ForEachMultiScanOverlappingFile( + icomparator_, user_comparator_, *flevel_, level_, *scan_opts_, + [&](size_t file_index, const ScanOptions& scan_range) { + auto const metadata = flevel_->files[file_index].file_metadata; if (metadata->FileIsStandAloneRangeTombstone()) { // Skip stand alone range deletion files. - continue; + return; } - auto& args = GetMultiScanArgForFile(i); - args.insert(start.value(), end.value(), opt.property_bag); - } - } - } + auto& args = GetMultiScanArgForFile(file_index); + args.insert(scan_range.range.start.value(), + scan_range.range.limit.value(), scan_range.property_bag); + }); StopWatch timer(clock_, db_statistics_, MULTISCAN_PREPARE_ITERATORS); @@ -1233,7 +1361,8 @@ class LevelIterator final : public InternalIterator { file_to_arg.first)); } - if (so->use_async_io) { + if (so->use_async_io && + (!range_tombstone_iter_required_ || range_tombstone_iter_ != nullptr)) { auto before = file_index_; // Pre-create and prepare only relevant file iterators for (auto& file_to_arg : *file_to_scan_opts_) { @@ -1385,6 +1514,7 @@ class LevelIterator final : public InternalIterator { // // *range_tombstone_iter_ points to range tombstones of the current SST file std::unique_ptr* range_tombstone_iter_; + bool range_tombstone_iter_required_; // The sentinel key to be returned Slice sentinel_; @@ -1695,22 +1825,20 @@ bool LevelIterator::SkipEmptyFileForward() { if (scan_opts_ && FileHasMultiScanArg(file_index_)) { const ScanOptions& opts = GetMultiScanArgForFile(file_index_).GetScanRanges().front(); - if (opts.range.start.has_value()) { - InternalKey target; - const size_t ts_size = - user_comparator_.user_comparator()->timestamp_size(); - if (ts_size == 0) { - target = InternalKey(opts.range.start.value(), kMaxSequenceNumber, - kValueTypeForSeek); - } else { - std::string seek_key; - AppendKeyWithMaxTimestamp(&seek_key, opts.range.start.value(), - ts_size); - target = - InternalKey(seek_key, kMaxSequenceNumber, kValueTypeForSeek); - } - file_iter_.Seek(target.Encode()); + assert(opts.range.start.has_value()); + InternalKey target; + const size_t ts_size = + user_comparator_.user_comparator()->timestamp_size(); + if (ts_size == 0) { + target = InternalKey(opts.range.start.value(), kMaxSequenceNumber, + kValueTypeForSeek); + } else { + std::string seek_key; + AppendKeyWithMaxTimestamp(&seek_key, opts.range.start.value(), + ts_size); + target = InternalKey(seek_key, kMaxSequenceNumber, kValueTypeForSeek); } + file_iter_.Seek(target.Encode()); } else { file_iter_.SeekToFirst(); } @@ -1757,14 +1885,8 @@ void LevelIterator::SkipEmptyFileBackward() { #ifndef NDEBUG bool LevelIterator::OverlapRange(const ScanOptions& opts, size_t file_index) { - return (user_comparator_.CompareWithoutTimestamp( - opts.range.start.value(), /*a_has_ts=*/false, - ExtractUserKey(flevel_->files[file_index].largest_key), - /*b_has_ts=*/true) <= 0 && - user_comparator_.CompareWithoutTimestamp( - opts.range.limit.value(), /*a_has_ts=*/false, - ExtractUserKey(flevel_->files[file_index].smallest_key), - /*b_has_ts=*/true) > 0); + return MultiScanRangeOverlapsFile(user_comparator_, opts, + flevel_->files[file_index]); } #endif @@ -2286,7 +2408,9 @@ InternalIterator* Version::TEST_GetLevelIterator( mutable_cf_options_, cfd_->internal_stats()->GetFileReadHist(level), TableReaderCaller::kUserIterator, IsFilterSkipped(level), level, nullptr /* range_del_agg */, nullptr /* compaction_boundaries */, - allow_unprepared_value, &tombstone_iter_ptr, db_statistics_, clock_); + allow_unprepared_value, + read_options.ignore_range_deletions ? nullptr : &tombstone_iter_ptr, + kMaxSequenceNumber, db_statistics_, clock_); if (read_options.ignore_range_deletions) { merge_iter_builder->AddIterator(level_iter); } else { @@ -2357,19 +2481,22 @@ double VersionStorageInfo::GetEstimatedCompressionRatioAtLevel( void Version::AddIterators(const ReadOptions& read_options, const FileOptions& soptions, MergeIteratorBuilder* merge_iter_builder, - bool allow_unprepared_value) { + bool allow_unprepared_value, SequenceNumber read_seq, + const MultiScanArgs* scan_opts) { assert(storage_info_.finalized_); for (int level = 0; level < storage_info_.num_non_empty_levels(); level++) { AddIteratorsForLevel(read_options, soptions, merge_iter_builder, level, - allow_unprepared_value); + allow_unprepared_value, read_seq, scan_opts); } } void Version::AddIteratorsForLevel(const ReadOptions& read_options, const FileOptions& soptions, MergeIteratorBuilder* merge_iter_builder, - int level, bool allow_unprepared_value) { + int level, bool allow_unprepared_value, + SequenceNumber read_seq, + const MultiScanArgs* scan_opts) { assert(storage_info_.finalized_); if (level >= storage_info_.num_non_empty_levels()) { // This is an empty level @@ -2380,37 +2507,95 @@ void Version::AddIteratorsForLevel(const ReadOptions& read_options, } auto* arena = merge_iter_builder->GetArena(); - if (level == 0) { - // Merge all level zero files together since they may overlap - std::unique_ptr tombstone_iter = nullptr; - for (size_t i = 0; i < storage_info_.LevelFilesBrief(0).num_files; i++) { - const auto& file = storage_info_.LevelFilesBrief(0).files[i]; - auto table_iter = cfd_->table_cache()->NewIterator( - read_options, soptions, cfd_->internal_comparator(), - *file.file_metadata, /*range_del_agg=*/nullptr, mutable_cf_options_, - nullptr, cfd_->internal_stats()->GetFileReadHist(0), - TableReaderCaller::kUserIterator, arena, - /*skip_filters=*/false, /*level=*/0, max_file_size_for_l0_meta_pin_, - /*smallest_compaction_key=*/nullptr, - /*largest_compaction_key=*/nullptr, allow_unprepared_value, - /*range_del_read_seqno=*/nullptr, &tombstone_iter, - /*maybe_pin_table_handle=*/true); - if (read_options.ignore_range_deletions) { - merge_iter_builder->AddIterator(table_iter); - } else { - merge_iter_builder->AddPointAndTombstoneIterator( - table_iter, std::move(tombstone_iter)); - } + const LevelFilesBrief& file_level = storage_info_.LevelFilesBrief(level); + const UserComparatorWrapper user_comparator( + cfd_->internal_comparator().user_comparator()); + + if (scan_opts != nullptr) { + const std::vector file_indexes = GetMultiScanOverlappingFiles( + cfd_->internal_comparator(), user_comparator, file_level, level, + *scan_opts, level == 0 ? std::numeric_limits::max() : 2); + if (file_indexes.empty()) { + return; } - if (should_sample_file_read()) { - // Count ones for every L0 files. This is done per iterator creation - // rather than Seek(), while files in other levels are sampled on - // seek/next/prev. - for (FileMetaData* meta : storage_info_.LevelFiles(0)) { - sample_file_read_inc(meta); + + if (level == 0) { + const bool should_sample = should_sample_file_read(); + for (size_t file_index : file_indexes) { + AddTableIteratorForLevel( + cfd_, read_options, soptions, mutable_cf_options_, + merge_iter_builder, arena, file_level, level, file_index, + /*skip_filters=*/false, max_file_size_for_l0_meta_pin_, + allow_unprepared_value, read_seq); + if (should_sample) { + sample_file_read_inc(file_level.files[file_index].file_metadata); + } } + return; } - } else if (storage_info_.LevelFilesBrief(level).num_files > 0) { + + if (file_indexes.size() == 1) { + AddTableIteratorForLevel(cfd_, read_options, soptions, + mutable_cf_options_, merge_iter_builder, arena, + file_level, level, file_indexes.front(), + IsFilterSkipped(level), + /*max_file_size_for_l0_meta_pin=*/0, + allow_unprepared_value, read_seq); + return; + } + +#ifndef NDEBUG + for (size_t file_index = 0; file_index < file_level.num_files; + ++file_index) { + TEST_SYNC_POINT_CALLBACK("Version::AddIteratorsForLevel:AddedFile", + file_level.files[file_index].file_metadata); + } +#endif + auto* mem = arena->AllocateAligned(sizeof(LevelIterator)); + std::unique_ptr** tombstone_iter_ptr = nullptr; + auto level_iter = new (mem) LevelIterator( + cfd_->table_cache(), read_options, soptions, + cfd_->internal_comparator(), &file_level, mutable_cf_options_, + cfd_->internal_stats()->GetFileReadHist(level), + TableReaderCaller::kUserIterator, IsFilterSkipped(level), level, + /*range_del_agg=*/nullptr, + /*compaction_boundaries=*/nullptr, allow_unprepared_value, + read_options.ignore_range_deletions ? nullptr : &tombstone_iter_ptr, + read_seq, db_statistics_, clock_); +#ifndef NDEBUG + std::pair iterator_type( + false /* is_block_based_table_iterator */, + true /* is_level_iterator */); + TEST_SYNC_POINT_CALLBACK("Version::AddIteratorsForLevel:IteratorType", + &iterator_type); +#endif + if (read_options.ignore_range_deletions) { + merge_iter_builder->AddIterator(level_iter); + } else { + assert(tombstone_iter_ptr); + merge_iter_builder->AddPointAndTombstoneIterator( + level_iter, nullptr /* tombstone_iter */, tombstone_iter_ptr); + } + return; + } + + if (level == 0) { + // Merge all level zero files together since they may overlap + const bool should_sample = should_sample_file_read(); + for (size_t i = 0; i < file_level.num_files; i++) { + AddTableIteratorForLevel( + cfd_, read_options, soptions, mutable_cf_options_, merge_iter_builder, + arena, file_level, level, i, + /*skip_filters=*/false, max_file_size_for_l0_meta_pin_, + allow_unprepared_value, read_seq); + if (should_sample) { + // Count once for every L0 file. This is done per iterator creation + // rather than Seek(), while files in other levels are sampled on + // seek/next/prev. + sample_file_read_inc(file_level.files[i].file_metadata); + } + } + } else if (file_level.num_files > 0) { // For levels > 0, we can use a concatenating iterator that sequentially // walks through the non-overlapping files in the level, opening them // lazily. @@ -2418,12 +2603,13 @@ void Version::AddIteratorsForLevel(const ReadOptions& read_options, std::unique_ptr** tombstone_iter_ptr = nullptr; auto level_iter = new (mem) LevelIterator( cfd_->table_cache(), read_options, soptions, - cfd_->internal_comparator(), &storage_info_.LevelFilesBrief(level), - mutable_cf_options_, cfd_->internal_stats()->GetFileReadHist(level), + cfd_->internal_comparator(), &file_level, mutable_cf_options_, + cfd_->internal_stats()->GetFileReadHist(level), TableReaderCaller::kUserIterator, IsFilterSkipped(level), level, /*range_del_agg=*/nullptr, /*compaction_boundaries=*/nullptr, allow_unprepared_value, - &tombstone_iter_ptr, db_statistics_, clock_); + read_options.ignore_range_deletions ? nullptr : &tombstone_iter_ptr, + read_seq, db_statistics_, clock_); if (read_options.ignore_range_deletions) { merge_iter_builder->AddIterator(level_iter); } else { @@ -2479,7 +2665,8 @@ Status Version::OverlapWithLevelIterator(const ReadOptions& read_options, cfd_->internal_comparator(), &storage_info_.LevelFilesBrief(level), mutable_cf_options_, cfd_->internal_stats()->GetFileReadHist(level), TableReaderCaller::kUserIterator, IsFilterSkipped(level), level, - &range_del_agg, nullptr, false, nullptr, db_statistics_, clock_)); + &range_del_agg, nullptr, false, nullptr, kMaxSequenceNumber, + db_statistics_, clock_)); status = OverlapWithIterator(ucmp, smallest_user_key, largest_user_key, iter.get(), overlap); } @@ -7948,7 +8135,8 @@ InternalIterator* VersionSet::MakeInputIterator( TableReaderCaller::kCompaction, /*skip_filters=*/false, /*level=*/static_cast(c->level(which)), range_del_agg, c->boundaries(which), false, &tombstone_iter_ptr, - db_options_->statistics.get(), clock_, open_ephemeral_table_reader); + kMaxSequenceNumber, db_options_->statistics.get(), clock_, + open_ephemeral_table_reader); range_tombstones.emplace_back(nullptr, tombstone_iter_ptr); } } diff --git a/db/version_set.h b/db/version_set.h index e424316b436..c0237cf3129 100644 --- a/db/version_set.h +++ b/db/version_set.h @@ -85,6 +85,7 @@ class MergeIteratorBuilder; class SystemClock; class ManifestTailer; class FilePickerMultiGet; +class MultiScanArgs; // VersionEdit is always supposed to be valid and it is used to point at // entries in Manifest. Ideally it should not be used as a container to @@ -916,17 +917,27 @@ class Version { // yield the contents of this Version when merged together. // @param read_options Must outlive any iterator built by // `merger_iter_builder`. + // @param read_seq Snapshot sequence to use for range tombstone visibility. + // This is passed separately because lazy iterator initialization may happen + // after read_options.snapshot has been released by the caller. + // @param scan_opts Optional bounded scan ranges used to prune levels/files + // while building the iterator tree. void AddIterators(const ReadOptions& read_options, const FileOptions& soptions, MergeIteratorBuilder* merger_iter_builder, - bool allow_unprepared_value); + bool allow_unprepared_value, SequenceNumber read_seq, + const MultiScanArgs* scan_opts = nullptr); // @param read_options Must outlive any iterator built by // `merger_iter_builder`. + // @param read_seq Snapshot sequence to use for range tombstone visibility. + // @param scan_opts Optional bounded scan ranges used to prune this level. void AddIteratorsForLevel(const ReadOptions& read_options, const FileOptions& soptions, MergeIteratorBuilder* merger_iter_builder, - int level, bool allow_unprepared_value); + int level, bool allow_unprepared_value, + SequenceNumber read_seq, + const MultiScanArgs* scan_opts = nullptr); Status OverlapWithLevelIterator(const ReadOptions&, const FileOptions&, const Slice& smallest_user_key, diff --git a/include/rocksdb/options.h b/include/rocksdb/options.h index cea9c20864d..2644d71cd7d 100644 --- a/include/rocksdb/options.h +++ b/include/rocksdb/options.h @@ -2058,6 +2058,21 @@ class MultiScanArgs { size_t size() const { return original_ranges_.size(); } bool empty() const { return original_ranges_.empty(); } + bool HasBoundedScanRanges() const { + if (empty()) { + return false; + } + + for (const ScanOptions& scan_range : original_ranges_) { + if (!scan_range.range.start.has_value() || + !scan_range.range.limit.has_value()) { + return false; + } + } + + return true; + } + void reserve(size_t size) { original_ranges_.reserve(size); } operator std::vector*() { return &original_ranges_; } diff --git a/table/merging_iterator.cc b/table/merging_iterator.cc index 3ecabdd9b4d..a196370dbb0 100644 --- a/table/merging_iterator.cc +++ b/table/merging_iterator.cc @@ -11,6 +11,7 @@ #include "db/arena_wrapped_db_iter.h" #include "monitoring/file_read_sample.h" +#include "test_util/sync_point.h" namespace ROCKSDB_NAMESPACE { // MergingIterator uses a min/max heap to combine data from point iterators. @@ -1740,6 +1741,8 @@ void MergeIteratorBuilder::AddPointAndTombstoneIterator( InternalIterator* MergeIteratorBuilder::Finish(ArenaWrappedDBIter* db_iter) { InternalIterator* ret = nullptr; + TEST_SYNC_POINT_CALLBACK("MergeIteratorBuilder::Finish:UseMergingIterator", + &use_merging_iter); if (!use_merging_iter) { ret = first_iter; first_iter = nullptr; @@ -1748,9 +1751,13 @@ InternalIterator* MergeIteratorBuilder::Finish(ArenaWrappedDBIter* db_iter) { *(p.second) = &(merge_iter->range_tombstone_iters_[p.first]); } if (db_iter && !merge_iter->range_tombstone_iters_.empty()) { - // memtable is always the first level - db_iter->SetMemtableRangetombstoneIter( - &merge_iter->range_tombstone_iters_.front()); + // memtable is always the first level, unless it has been pruned + if (memtable_pruned_) { + db_iter->SetMemtableRangetombstoneIter(nullptr); + } else { + db_iter->SetMemtableRangetombstoneIter( + &merge_iter->range_tombstone_iters_.front()); + } } merge_iter->Finish(); ret = merge_iter; diff --git a/table/merging_iterator.h b/table/merging_iterator.h index b21a3e8ef9f..e0171ff96df 100644 --- a/table/merging_iterator.h +++ b/table/merging_iterator.h @@ -79,6 +79,10 @@ class MergeIteratorBuilder { // iterator needs to be allocated. Arena* GetArena() { return arena; } + void SetMemtablePruned(bool memtable_pruned) { + memtable_pruned_ = memtable_pruned; + } + // Return the result merging iterator. // If db_iter is not nullptr, then db_iter->SetMemtableRangetombstoneIter() // will be called with pointer to where the merging iterator @@ -95,6 +99,9 @@ class MergeIteratorBuilder { // See AddRangeTombstoneIterator() implementation for more detail. std::vector**>> range_del_iter_ptrs_; + // if supplying multiscan ranges allowed us to prune the memtable range delete + // iterator, we need to handle this in Finish() + bool memtable_pruned_ = false; }; } // namespace ROCKSDB_NAMESPACE