Skip to content

Commit db48a91

Browse files
committed
fix: handle nullable vector compact data paths
1 parent e1bab98 commit db48a91

71 files changed

Lines changed: 5707 additions & 869 deletions

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

internal/core/src/common/FieldData.cpp

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -128,7 +128,9 @@ FieldDataImpl<Type, is_type_entire_row>::FillFieldData(
128128
if (element_count == 0) {
129129
return;
130130
}
131-
null_count_ = array->null_count();
131+
if (!IsVectorDataType(data_type_)) {
132+
null_count_ = array->null_count();
133+
}
132134
switch (data_type_) {
133135
case DataType::BOOL: {
134136
AssertInfo(array->type()->id() == arrow::Type::type::BOOL,
@@ -687,6 +689,9 @@ FieldDataVectorImpl<Type, is_type_entire_row>::FillFieldData(
687689
this->valid_data_.data(),
688690
this->length_,
689691
total_element_count);
692+
} else {
693+
bitset::detail::ElementWiseBitsetPolicy<uint8_t>::op_fill(
694+
this->valid_data_.data(), this->length_, total_element_count, true);
690695
}
691696

692697
// update logical to physical offset mapping
@@ -703,7 +708,7 @@ FieldDataVectorImpl<Type, is_type_entire_row>::FillFieldData(
703708
this->valid_count_ += valid_count;
704709
}
705710

706-
this->null_count_ = total_element_count - valid_count;
711+
this->null_count_ += total_element_count - valid_count;
707712
this->length_ += total_element_count;
708713
}
709714

internal/core/src/index/VectorMemIndex.cpp

Lines changed: 126 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,79 @@
7878

7979
namespace milvus::index {
8080

81+
namespace {
82+
83+
bool
84+
IsValidDataBinary(const std::string& name) {
85+
return name == VALID_DATA_COUNT_KEY || name == VALID_DATA_KEY;
86+
}
87+
88+
bool
89+
ContainsOnlyValidData(const BinarySet& binary_set) {
90+
if (!binary_set.Contains(VALID_DATA_COUNT_KEY) ||
91+
!binary_set.Contains(VALID_DATA_KEY)) {
92+
return false;
93+
}
94+
for (const auto& [name, _] : binary_set.binary_map_) {
95+
if (!IsValidDataBinary(name)) {
96+
return false;
97+
}
98+
}
99+
return true;
100+
}
101+
102+
void
103+
AppendValidDataToBinarySet(const OffsetMapping& offset_mapping,
104+
BinarySet& binary_set) {
105+
if (!offset_mapping.IsEnabled()) {
106+
return;
107+
}
108+
109+
auto total_count = offset_mapping.GetTotalCount();
110+
111+
std::shared_ptr<uint8_t[]> count_buf(new uint8_t[sizeof(size_t)]);
112+
size_t count = static_cast<size_t>(total_count);
113+
std::memcpy(count_buf.get(), &count, sizeof(size_t));
114+
binary_set.Append(VALID_DATA_COUNT_KEY, count_buf, sizeof(size_t));
115+
116+
size_t byte_size = (count + 7) / 8;
117+
std::shared_ptr<uint8_t[]> data(new uint8_t[byte_size]);
118+
std::memset(data.get(), 0, byte_size);
119+
for (size_t i = 0; i < count; ++i) {
120+
if (offset_mapping.IsValid(i)) {
121+
data[i / 8] |= (1 << (i % 8));
122+
}
123+
}
124+
binary_set.Append(VALID_DATA_KEY, data, byte_size);
125+
}
126+
127+
bool
128+
LoadValidDataFromBinarySet(const BinarySet& binary_set,
129+
VectorIndex* vector_index) {
130+
bool has_count = binary_set.Contains(VALID_DATA_COUNT_KEY);
131+
bool has_data = binary_set.Contains(VALID_DATA_KEY);
132+
if (!has_count && !has_data) {
133+
return false;
134+
}
135+
AssertInfo(has_count && has_data,
136+
"nullable vector index valid_data files are incomplete");
137+
138+
auto count_ptr = binary_set.GetByName(VALID_DATA_COUNT_KEY);
139+
size_t count;
140+
std::memcpy(&count, count_ptr->data.get(), sizeof(size_t));
141+
142+
auto data_ptr = binary_set.GetByName(VALID_DATA_KEY);
143+
std::unique_ptr<bool[]> valid_data(new bool[count]);
144+
auto bitmap = data_ptr->data.get();
145+
for (size_t i = 0; i < count; ++i) {
146+
valid_data[i] = (bitmap[i / 8] >> (i % 8)) & 1;
147+
}
148+
vector_index->BuildValidData(valid_data.get(), count);
149+
return true;
150+
}
151+
152+
} // namespace
153+
81154
template <typename T>
82155
VectorMemIndex<T>::VectorMemIndex(
83156
DataType elem_type,
@@ -166,32 +239,18 @@ template <typename T>
166239
BinarySet
167240
VectorMemIndex<T>::Serialize(const Config& config) {
168241
knowhere::BinarySet ret;
169-
auto stat = index_.Serialize(ret);
170-
if (stat != knowhere::Status::success)
171-
ThrowInfo(ErrorCode::UnexpectedError,
172-
"failed to serialize index: {}",
173-
KnowhereStatusString(stat));
174-
175-
// Serialize valid_data from offset_mapping if enabled
176-
if (offset_mapping_.IsEnabled()) {
177-
auto total_count = offset_mapping_.GetTotalCount();
178-
179-
std::shared_ptr<uint8_t[]> count_buf(new uint8_t[sizeof(size_t)]);
180-
size_t count = static_cast<size_t>(total_count);
181-
std::memcpy(count_buf.get(), &count, sizeof(size_t));
182-
ret.Append(VALID_DATA_COUNT_KEY, count_buf, sizeof(size_t));
183-
184-
size_t byte_size = (count + 7) / 8;
185-
std::shared_ptr<uint8_t[]> data(new uint8_t[byte_size]);
186-
std::memset(data.get(), 0, byte_size);
187-
for (size_t i = 0; i < count; ++i) {
188-
if (offset_mapping_.IsValid(i)) {
189-
data[i / 8] |= (1 << (i % 8));
190-
}
191-
}
192-
ret.Append(VALID_DATA_KEY, data, byte_size);
242+
bool all_null_nullable =
243+
offset_mapping_.IsEnabled() && offset_mapping_.GetTotalCount() > 0 &&
244+
offset_mapping_.GetValidCount() == 0;
245+
if (!all_null_nullable) {
246+
auto stat = index_.Serialize(ret);
247+
if (stat != knowhere::Status::success)
248+
ThrowInfo(ErrorCode::UnexpectedError,
249+
"failed to serialize index: {}",
250+
KnowhereStatusString(stat));
193251
}
194252

253+
AppendValidDataToBinarySet(offset_mapping_, ret);
195254
Disassemble(ret);
196255

197256
return ret;
@@ -201,31 +260,20 @@ template <typename T>
201260
void
202261
VectorMemIndex<T>::LoadWithoutAssemble(const BinarySet& binary_set,
203262
const Config& config) {
204-
auto stat = index_.Deserialize(binary_set, config);
205-
if (stat != knowhere::Status::success)
206-
ThrowInfo(ErrorCode::UnexpectedError,
207-
"failed to Deserialize index: {}",
208-
KnowhereStatusString(stat));
209-
210-
// Deserialize valid_data bitmap and rebuild offset_mapping
211-
if (binary_set.Contains(VALID_DATA_COUNT_KEY) &&
212-
binary_set.Contains(VALID_DATA_KEY)) {
213-
knowhere::BinaryPtr ptr;
214-
ptr = binary_set.GetByName(VALID_DATA_COUNT_KEY);
215-
size_t count;
216-
std::memcpy(&count, ptr->data.get(), sizeof(size_t));
217-
218-
ptr = binary_set.GetByName(VALID_DATA_KEY);
219-
// Convert bitmap to bool array
220-
std::unique_ptr<bool[]> valid_data(new bool[count]);
221-
auto bitmap = ptr->data.get();
222-
for (size_t i = 0; i < count; ++i) {
223-
valid_data[i] = (bitmap[i / 8] >> (i % 8)) & 1;
263+
if (ContainsOnlyValidData(binary_set)) {
264+
if (config.contains(DIM_KEY)) {
265+
SetDim(GetDimFromConfig(config));
224266
}
225-
BuildValidData(valid_data.get(), count);
267+
} else {
268+
auto stat = index_.Deserialize(binary_set, config);
269+
if (stat != knowhere::Status::success)
270+
ThrowInfo(ErrorCode::UnexpectedError,
271+
"failed to Deserialize index: {}",
272+
KnowhereStatusString(stat));
273+
SetDim(index_.Dim());
226274
}
227275

228-
SetDim(index_.Dim());
276+
LoadValidDataFromBinarySet(binary_set, this);
229277
}
230278

231279
template <typename T>
@@ -451,6 +499,11 @@ VectorMemIndex<T>::Build(const Config& config) {
451499
total_size += data->Size();
452500
}
453501
}
502+
if (nullable && total_valid_rows == 0) {
503+
SetDim(dim);
504+
BuildValidData(valid_data.get(), total_num_rows);
505+
return;
506+
}
454507
auto buf = std::shared_ptr<uint8_t[]>(new uint8_t[total_size]);
455508

456509
size_t lim_offset = 0;
@@ -523,6 +576,11 @@ VectorMemIndex<T>::Build(const Config& config) {
523576
field_data)
524577
->Dim());
525578
}
579+
if (nullable && total_valid_rows == 0) {
580+
SetDim(dim);
581+
BuildValidData(valid_data.get(), total_num_rows);
582+
return;
583+
}
526584
std::vector<knowhere::sparse::SparseRow<SparseValueType>> vec(
527585
total_valid_rows);
528586
int64_t offset = 0;
@@ -775,6 +833,7 @@ void VectorMemIndex<T>::LoadFromFile(const Config& config) {
775833
std::chrono::duration<double> write_disk_duration_sum;
776834
std::unique_ptr<storage::DataCodec> valid_data_count_codec;
777835
std::unique_ptr<storage::DataCodec> valid_data_codec;
836+
bool wrote_index_data = false;
778837
// load files in two parts:
779838
// 1. EMB_LIST_META: Written separately to embedding_list_meta_writer_ptr (if embedding list type)
780839
// 2. All other binaries: Merged and written to file_writer, forming a unified index file for knowhere
@@ -818,6 +877,7 @@ void VectorMemIndex<T>::LoadFromFile(const Config& config) {
818877
} else {
819878
file_writer.Write(data->PayloadData(),
820879
data->PayloadSize());
880+
wrote_index_data = true;
821881
}
822882
write_disk_duration_sum +=
823883
(std::chrono::system_clock::now() - start_write_file);
@@ -863,6 +923,7 @@ void VectorMemIndex<T>::LoadFromFile(const Config& config) {
863923
} else {
864924
file_writer.Write(index_data->PayloadData(),
865925
index_data->PayloadSize());
926+
wrote_index_data = true;
866927
}
867928
}
868929
write_disk_duration_sum +=
@@ -880,29 +941,38 @@ void VectorMemIndex<T>::LoadFromFile(const Config& config) {
880941
embedding_list_meta_writer_ptr->Finish();
881942
}
882943

883-
LOG_INFO("load index into Knowhere...");
884944
auto conf = config;
885945
conf.erase(MMAP_FILE_PATH);
886946
conf[ENABLE_MMAP] = true;
887947
if (is_embedding_list) {
888948
conf["emb_list_meta_file_path"] = embedding_list_meta_path.value();
889949
}
890950
auto start_deserialize = std::chrono::system_clock::now();
891-
auto stat = index_.DeserializeFromFile(local_filepath.value(), conf);
892-
auto deserialize_duration =
893-
std::chrono::system_clock::now() - start_deserialize;
894-
if (stat != knowhere::Status::success) {
895-
ThrowInfo(ErrorCode::UnexpectedError,
896-
"failed to Deserialize index: {}",
897-
KnowhereStatusString(stat));
951+
std::chrono::duration<double> deserialize_duration{};
952+
if (wrote_index_data) {
953+
LOG_INFO("load index into Knowhere...");
954+
auto stat = index_.DeserializeFromFile(local_filepath.value(), conf);
955+
deserialize_duration = std::chrono::system_clock::now() -
956+
start_deserialize;
957+
if (stat != knowhere::Status::success) {
958+
ThrowInfo(ErrorCode::UnexpectedError,
959+
"failed to Deserialize index: {}",
960+
KnowhereStatusString(stat));
961+
}
962+
this->SetDim(index_.Dim());
963+
} else {
964+
LOG_INFO("load all-null nullable vector index valid data only...");
965+
AssertInfo(valid_data_count_codec && valid_data_codec,
966+
"nullable vector index valid_data files are incomplete");
967+
if (conf.contains(DIM_KEY)) {
968+
this->SetDim(GetDimFromConfig(conf));
969+
}
898970
}
899971
milvus::monitor::internal_storage_deserialize_duration.Observe(
900972
std::chrono::duration_cast<std::chrono::milliseconds>(
901973
deserialize_duration)
902974
.count());
903975

904-
this->SetDim(index_.Dim());
905-
906976
// Restore valid_data for nullable vector support
907977
if (valid_data_count_codec && valid_data_codec) {
908978
size_t count;

internal/core/src/indexbuilder/VecIndexCreator.cpp

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,12 +70,19 @@ void
7070
VecIndexCreator::Build(const milvus::DatasetPtr& dataset,
7171
const bool* valid_data,
7272
const int64_t valid_data_len) {
73-
index_->BuildWithDataset(dataset, config_);
7473
if (valid_data && valid_data_len > 0) {
7574
auto vec_index = dynamic_cast<index::VectorIndex*>(index_.get());
7675
AssertInfo(vec_index != nullptr, "failed to cast index to VectorIndex");
76+
if (dataset->GetRows() == 0) {
77+
vec_index->SetDim(dataset->GetDim());
78+
vec_index->BuildValidData(valid_data, valid_data_len);
79+
return;
80+
}
81+
index_->BuildWithDataset(dataset, config_);
7782
vec_index->BuildValidData(valid_data, valid_data_len);
83+
return;
7884
}
85+
index_->BuildWithDataset(dataset, config_);
7986
}
8087

8188
void

internal/core/src/query/Utils.h

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,9 +55,12 @@ ApplyElementIDMapping(const std::vector<int64_t>& element_ids,
5555
inline TargetBitmap
5656
TransformBitset(const BitsetView& bitset,
5757
const milvus::OffsetMapping& mapping) {
58+
// bit=true means filtered out. The logical bitset may be shorter than the
59+
// full mapping on growing segments because it is sized by query timestamp
60+
// visibility. Physical rows outside that logical view are not visible yet.
5861
TargetBitmap result;
5962
auto count = mapping.GetValidCount();
60-
result.resize(count);
63+
result.resize(count, true);
6164
for (int64_t physical_idx = 0; physical_idx < count; physical_idx++) {
6265
auto logical_idx = mapping.GetLogicalOffset(physical_idx);
6366
if (logical_idx >= 0 &&

0 commit comments

Comments
 (0)