-
Notifications
You must be signed in to change notification settings - Fork 4.1k
fix: enforce nullable vector compact data invariants #49924
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -41,6 +41,7 @@ | |
| #include "glog/logging.h" | ||
| #include "index/Meta.h" | ||
| #include "index/Utils.h" | ||
| #include "index/VectorIndexValidDataUtils.h" | ||
| #include "knowhere/binaryset.h" | ||
| #include "knowhere/comp/index_param.h" | ||
| #include "knowhere/dataset.h" | ||
|
|
@@ -63,6 +64,69 @@ namespace milvus::index { | |
| #define kPrepareDim 100 | ||
| #define kPrepareRows 1 | ||
|
|
||
| namespace { | ||
|
|
||
| struct DiskValidData { | ||
| bool found = false; | ||
| size_t total_count = 0; | ||
| size_t valid_count = 0; | ||
| std::vector<uint8_t> bitmap; | ||
| }; | ||
|
|
||
| template <typename LocalChunkManagerPtr> | ||
| DiskValidData | ||
| ReadDiskValidData(const LocalChunkManagerPtr& local_chunk_manager, | ||
| const std::string& valid_data_path) { | ||
| DiskValidData valid_data; | ||
| if (!local_chunk_manager->Exist(valid_data_path)) { | ||
| return valid_data; | ||
| } | ||
|
|
||
| valid_data.found = true; | ||
| auto file_size = local_chunk_manager->Size(valid_data_path); | ||
| AssertInfo(file_size >= sizeof(uint64_t), | ||
| "nullable vector disk valid_data file is too small"); | ||
| uint64_t wire_count = 0; | ||
| local_chunk_manager->Read( | ||
| valid_data_path, 0, &wire_count, sizeof(uint64_t)); | ||
| valid_data.total_count = FromValidDataCount(wire_count); | ||
| valid_data.bitmap.resize(GetValidDataBitmapSize(valid_data.total_count)); | ||
| AssertInfo(file_size >= sizeof(uint64_t) + valid_data.bitmap.size(), | ||
| "nullable vector disk valid_data bitmap file is too small"); | ||
| if (!valid_data.bitmap.empty()) { | ||
| local_chunk_manager->Read(valid_data_path, | ||
| sizeof(uint64_t), | ||
| valid_data.bitmap.data(), | ||
| valid_data.bitmap.size()); | ||
| } | ||
| valid_data.valid_count = | ||
| CountValidDataBitmap(valid_data.total_count, valid_data.bitmap.data()); | ||
| return valid_data; | ||
| } | ||
|
|
||
| template <typename LocalChunkManagerPtr> | ||
| void | ||
| WriteDiskValidData(const LocalChunkManagerPtr& local_chunk_manager, | ||
| const std::string& valid_data_path, | ||
| const OffsetMapping& offset_mapping) { | ||
| auto total_count = static_cast<size_t>(offset_mapping.GetTotalCount()); | ||
| auto wire_count = ToValidDataCount(total_count); | ||
| auto packed_data = PackValidDataBitmap(offset_mapping); | ||
| if (!local_chunk_manager->Exist(valid_data_path)) { | ||
| local_chunk_manager->CreateFile(valid_data_path); | ||
| } | ||
| local_chunk_manager->Write( | ||
| valid_data_path, 0, &wire_count, sizeof(uint64_t)); | ||
| if (!packed_data.empty()) { | ||
| local_chunk_manager->Write(valid_data_path, | ||
| sizeof(uint64_t), | ||
| packed_data.data(), | ||
| packed_data.size()); | ||
| } | ||
| } | ||
|
|
||
| } // namespace | ||
|
|
||
| template <typename T> | ||
| VectorDiskAnnIndex<T>::VectorDiskAnnIndex( | ||
| DataType elem_type, | ||
|
|
@@ -138,52 +202,55 @@ VectorDiskAnnIndex<T>::Load(milvus::tracer::TraceContext ctx, | |
| read_file_span->End(); | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done. Added |
||
| } | ||
|
|
||
| // start engine load index span | ||
| auto span_load_engine = | ||
| milvus::tracer::StartSpan("SegCoreEngineLoadDiskIndex", &ctx); | ||
| opentelemetry::nostd::shared_ptr<opentelemetry::trace::Span> | ||
| nostd_span_load_engine(span_load_engine); | ||
| auto engine_scope = | ||
| opentelemetry::trace::Tracer::WithActiveSpan(nostd_span_load_engine); | ||
| auto stat = index_.Deserialize(knowhere::BinarySet(), load_config); | ||
| if (stat != knowhere::Status::success) | ||
| ThrowInfo(ErrorCode::UnexpectedError, | ||
| "failed to Deserialize index, {}", | ||
| KnowhereStatusString(stat)); | ||
| span_load_engine->End(); | ||
|
|
||
| auto local_chunk_manager = | ||
| storage::LocalChunkManagerSingleton::GetInstance().GetChunkManager(); | ||
| auto local_index_path_prefix = file_manager_->GetLocalIndexObjectPrefix(); | ||
|
|
||
| auto valid_data_path = local_index_path_prefix + "/" + VALID_DATA_KEY; | ||
| if (local_chunk_manager->Exist(valid_data_path)) { | ||
| size_t count; | ||
| local_chunk_manager->Read(valid_data_path, 0, &count, sizeof(size_t)); | ||
| size_t byte_size = (count + 7) / 8; | ||
| std::vector<uint8_t> valid_bitmap(byte_size); | ||
| local_chunk_manager->Read( | ||
| valid_data_path, sizeof(size_t), valid_bitmap.data(), byte_size); | ||
| // Convert bitmap to bool array | ||
| std::unique_ptr<bool[]> valid_data(new bool[count]); | ||
| for (size_t i = 0; i < count; ++i) { | ||
| valid_data[i] = (valid_bitmap[i / 8] >> (i % 8)) & 1; | ||
| auto disk_valid_data = | ||
| ReadDiskValidData(local_chunk_manager, valid_data_path); | ||
| bool all_null_nullable = disk_valid_data.found && | ||
| disk_valid_data.total_count > 0 && | ||
| disk_valid_data.valid_count == 0; | ||
| if (!all_null_nullable) { | ||
| // start engine load index span | ||
| auto span_load_engine = | ||
| milvus::tracer::StartSpan("SegCoreEngineLoadDiskIndex", &ctx); | ||
| opentelemetry::nostd::shared_ptr<opentelemetry::trace::Span> | ||
| nostd_span_load_engine(span_load_engine); | ||
| auto engine_scope = opentelemetry::trace::Tracer::WithActiveSpan( | ||
| nostd_span_load_engine); | ||
| auto stat = index_.Deserialize(knowhere::BinarySet(), load_config); | ||
| if (stat != knowhere::Status::success) | ||
| ThrowInfo(ErrorCode::UnexpectedError, | ||
| "failed to Deserialize index, {}", | ||
| KnowhereStatusString(stat)); | ||
| span_load_engine->End(); | ||
| SetDim(index_.Dim()); | ||
| } else { | ||
| auto dim = GetValueFromConfig<int64_t>(load_config, DIM_KEY); | ||
| if (dim.has_value()) { | ||
| SetDim(dim.value()); | ||
| } | ||
| BuildValidData(valid_data.get(), count); | ||
| } | ||
|
|
||
| SetDim(index_.Dim()); | ||
| if (disk_valid_data.found) { | ||
| BuildValidDataFromBitmap( | ||
| this, disk_valid_data.total_count, disk_valid_data.bitmap.data()); | ||
| } | ||
| } | ||
|
|
||
| template <typename T> | ||
| IndexStatsPtr | ||
| VectorDiskAnnIndex<T>::Upload(const Config& config) { | ||
| BinarySet ret; | ||
| auto stat = index_.Serialize(ret); | ||
| if (stat != knowhere::Status::success) { | ||
| ThrowInfo(ErrorCode::UnexpectedError, | ||
| "failed to serialize index, {}", | ||
| KnowhereStatusString(stat)); | ||
| if (!IsAllNullNullable(offset_mapping_)) { | ||
| auto stat = index_.Serialize(ret); | ||
| if (stat != knowhere::Status::success) { | ||
| ThrowInfo(ErrorCode::UnexpectedError, | ||
| "failed to serialize index, {}", | ||
| KnowhereStatusString(stat)); | ||
| } | ||
| } | ||
| auto remote_paths_to_size = file_manager_->GetRemotePathsToFileSize(); | ||
| return IndexStats::NewFromSizeMap(file_manager_->GetAddedTotalFileSize(), | ||
|
|
@@ -226,6 +293,25 @@ VectorDiskAnnIndex<T>::Build(const Config& config) { | |
| file_manager_->CacheRawDataToDisk<T>(config_with_emb_list); | ||
| build_config[DISK_ANN_RAW_DATA_PATH] = local_data_path; | ||
|
|
||
| auto disk_valid_data = | ||
| ReadDiskValidData(local_chunk_manager, valid_data_path); | ||
| if (disk_valid_data.found) { | ||
| BuildValidDataFromBitmap( | ||
| this, disk_valid_data.total_count, disk_valid_data.bitmap.data()); | ||
| if (disk_valid_data.valid_count == 0) { | ||
| auto dim = GetValueFromConfig<int64_t>(build_config, DIM_KEY); | ||
| if (dim.has_value()) { | ||
| SetDim(dim.value()); | ||
| } | ||
| file_manager_->AddFile(valid_data_path); | ||
| local_chunk_manager->RemoveDir(storage::GenFieldRawDataPathPrefix( | ||
| local_chunk_manager, segment_id, field_id)); | ||
| LOG_INFO("build all-null nullable disk index done, build_id: {}", | ||
| config.value("build_id", "unknown")); | ||
| return; | ||
| } | ||
| } | ||
|
|
||
| // For VECTOR_ARRAY, verify offsets file exists and pass its path to build_config | ||
| if (is_embedding_list) { | ||
| if (!local_chunk_manager->Exist(offsets_path)) { | ||
|
|
@@ -303,6 +389,19 @@ VectorDiskAnnIndex<T>::BuildWithDataset(const DatasetPtr& dataset, | |
| auto local_index_path_prefix = file_manager_->GetLocalIndexObjectPrefix(); | ||
| build_config[DISK_ANN_PREFIX_PATH] = local_index_path_prefix; | ||
|
|
||
| if (HasValidData() && GetValidCount() == 0 && | ||
| offset_mapping_.GetTotalCount() > 0) { | ||
| auto valid_data_path = local_index_path_prefix + "/" + VALID_DATA_KEY; | ||
| WriteDiskValidData( | ||
| local_chunk_manager, valid_data_path, offset_mapping_); | ||
| file_manager_->AddFile(valid_data_path); | ||
| auto dim = GetValueFromConfig<int64_t>(build_config, DIM_KEY); | ||
| if (dim.has_value()) { | ||
| SetDim(dim.value()); | ||
| } | ||
| return; | ||
| } | ||
|
|
||
| if (GetIndexType() == knowhere::IndexEnum::INDEX_DISKANN) { | ||
| auto num_threads = GetValueFromConfig<std::string>( | ||
| build_config, DISK_ANN_BUILD_THREAD_NUM); | ||
|
|
@@ -380,17 +479,8 @@ VectorDiskAnnIndex<T>::BuildWithDataset(const DatasetPtr& dataset, | |
|
|
||
| if (HasValidData()) { | ||
| auto valid_data_path = local_index_path_prefix + "/" + VALID_DATA_KEY; | ||
| size_t count = offset_mapping_.GetTotalCount(); | ||
| local_chunk_manager->Write(valid_data_path, 0, &count, sizeof(size_t)); | ||
| size_t byte_size = (count + 7) / 8; | ||
| std::vector<uint8_t> packed_data(byte_size, 0); | ||
| for (size_t i = 0; i < count; ++i) { | ||
| if (offset_mapping_.IsValid(i)) { | ||
| packed_data[i / 8] |= (1 << (i % 8)); | ||
| } | ||
| } | ||
| local_chunk_manager->Write( | ||
| valid_data_path, sizeof(size_t), packed_data.data(), byte_size); | ||
| WriteDiskValidData( | ||
| local_chunk_manager, valid_data_path, offset_mapping_); | ||
| file_manager_->AddFile(valid_data_path); | ||
| } | ||
|
|
||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same
size_tpersistence issue as inVectorIndexValidDataUtils.h. The disk index load and build paths serialize/deserialize the valid-data count assize_t. This must match the fixeduint64_ttype across all persisted metadata paths. TheVectorDiskAnnIndex::Loadall-null branch that skips Knowhere deserialization also lacks a direct round-trip test (see separate testing issue).There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done. Disk valid-data uses the same
uint64_twire count and file-size checks. Added all-null disk build/load coverage as well.