Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ etcd-client = "0.13"
fst = "0.4.7"
futures = "0.3"
futures-util = "0.3"
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "4a173785b3376267c4d62b6e0b0a54ca040822aa" }
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "ec801a91aa22f9666063d02805f1f60f7c93458a" }
hex = "0.4"
http = "0.2"
humantime = "2.1"
Expand Down
1 change: 1 addition & 0 deletions src/index/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ fastbloom = "0.8"
fst.workspace = true
futures.workspace = true
greptime-proto.workspace = true
itertools.workspace = true
mockall.workspace = true
pin-project.workspace = true
prost.workspace = true
Expand Down
34 changes: 0 additions & 34 deletions src/index/src/bloom_filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,6 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use serde::{Deserialize, Serialize};

pub mod applier;
pub mod creator;
pub mod error;
Expand All @@ -24,35 +22,3 @@ pub type BytesRef<'a> = &'a [u8];

/// The seed used for the Bloom filter.
pub const SEED: u128 = 42;

/// The Meta information of the bloom filter stored in the file.
#[derive(Debug, Default, Serialize, Deserialize, Clone)]
pub struct BloomFilterMeta {
/// The number of rows per segment.
pub rows_per_segment: usize,

/// The number of segments.
pub seg_count: usize,

/// The number of total rows.
pub row_count: usize,

/// The size of the bloom filter excluding the meta information.
pub bloom_filter_segments_size: usize,

/// Offset and size of bloom filters in the file.
pub bloom_filter_segments: Vec<BloomFilterSegmentLocation>,
}

/// The location of the bloom filter segment in the file.
#[derive(Debug, Serialize, Deserialize, Clone, Copy, Hash, PartialEq, Eq)]
pub struct BloomFilterSegmentLocation {
/// The offset of the bloom filter segment in the file.
pub offset: u64,

/// The size of the bloom filter segment in the file.
pub size: u64,

/// The number of elements in the bloom filter segment.
pub elem_count: usize,
}
89 changes: 67 additions & 22 deletions src/index/src/bloom_filter/applier.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,12 @@
use std::collections::HashSet;
use std::ops::Range;

use greptime_proto::v1::index::BloomFilterMeta;
use itertools::Itertools;

use crate::bloom_filter::error::Result;
use crate::bloom_filter::reader::BloomFilterReader;
use crate::bloom_filter::{BloomFilterMeta, Bytes};
use crate::bloom_filter::Bytes;

pub struct BloomFilterApplier {
reader: Box<dyn BloomFilterReader + Send>,
Expand All @@ -37,27 +40,42 @@ impl BloomFilterApplier {
probes: &HashSet<Bytes>,
search_range: Range<usize>,
) -> Result<Vec<Range<usize>>> {
let rows_per_segment = self.meta.rows_per_segment;
let rows_per_segment = self.meta.rows_per_segment as usize;
let start_seg = search_range.start / rows_per_segment;
let end_seg = search_range.end.div_ceil(rows_per_segment);

let locs = &self.meta.bloom_filter_segments[start_seg..end_seg];
let bfs = self.reader.bloom_filter_vec(locs).await?;

let mut ranges: Vec<Range<usize>> = Vec::with_capacity(end_seg - start_seg);
for (seg_id, bloom) in (start_seg..end_seg).zip(bfs) {
let start = seg_id * rows_per_segment;
let locs = &self.meta.segment_loc_indices[start_seg..end_seg];

// dedup locs
let deduped_locs = locs
.iter()
.dedup()
.map(|i| self.meta.bloom_filter_locs[*i as usize].clone())
.collect::<Vec<_>>();
let bfs = self.reader.bloom_filter_vec(&deduped_locs).await?;

let mut ranges: Vec<Range<usize>> = Vec::with_capacity(bfs.len());
for ((_, mut group), bloom) in locs
.iter()
.zip(start_seg..end_seg)
.group_by(|(x, _)| **x)
.into_iter()
.zip(bfs.iter())
{
let start = group.next().unwrap().1 * rows_per_segment; // SAFETY: group is not empty
let end = group.last().map_or(start + rows_per_segment, |(_, end)| {
(end + 1) * rows_per_segment
});
let actual_start = start.max(search_range.start);
let actual_end = end.min(search_range.end);
for probe in probes {
if bloom.contains(probe) {
let end = (start + rows_per_segment).min(search_range.end);
let start = start.max(search_range.start);

match ranges.last_mut() {
Some(last) if last.end == start => {
last.end = end;
Some(last) if last.end == actual_start => {
last.end = actual_end;
}
_ => {
ranges.push(start..end);
ranges.push(actual_start..actual_end);
}
}
break;
Expand Down Expand Up @@ -93,46 +111,73 @@ mod tests {
);

let rows = vec![
// seg 0
vec![b"row00".to_vec(), b"seg00".to_vec(), b"overl".to_vec()],
vec![b"row01".to_vec(), b"seg00".to_vec(), b"overl".to_vec()],
vec![b"row02".to_vec(), b"seg00".to_vec(), b"overl".to_vec()],
vec![b"row03".to_vec(), b"seg00".to_vec(), b"overl".to_vec()],
// seg 1
vec![b"row04".to_vec(), b"seg01".to_vec(), b"overl".to_vec()],
vec![b"row05".to_vec(), b"seg01".to_vec(), b"overl".to_vec()],
vec![b"row06".to_vec(), b"seg01".to_vec(), b"overp".to_vec()],
vec![b"row07".to_vec(), b"seg01".to_vec(), b"overp".to_vec()],
// seg 2
vec![b"row08".to_vec(), b"seg02".to_vec(), b"overp".to_vec()],
vec![b"row09".to_vec(), b"seg02".to_vec(), b"overp".to_vec()],
vec![b"row10".to_vec(), b"seg02".to_vec(), b"overp".to_vec()],
vec![b"row11".to_vec(), b"seg02".to_vec(), b"overp".to_vec()],
// duplicate rows
// seg 3
vec![b"dup".to_vec()],
vec![b"dup".to_vec()],
vec![b"dup".to_vec()],
vec![b"dup".to_vec()],
// seg 4
vec![b"dup".to_vec()],
vec![b"dup".to_vec()],
vec![b"dup".to_vec()],
vec![b"dup".to_vec()],
// seg 5
vec![b"dup".to_vec()],
vec![b"dup".to_vec()],
vec![b"dup".to_vec()],
vec![b"dup".to_vec()],
// seg 6
vec![b"dup".to_vec()],
vec![b"dup".to_vec()],
vec![b"dup".to_vec()],
vec![b"dup".to_vec()],
];

let cases = vec![
(vec![b"row00".to_vec()], 0..12, vec![0..4]), // search one row in full range
(vec![b"row00".to_vec()], 0..28, vec![0..4]), // search one row in full range
(vec![b"row05".to_vec()], 4..8, vec![4..8]), // search one row in partial range
(vec![b"row03".to_vec()], 4..8, vec![]), // search for a row that doesn't exist in the partial range
(
vec![b"row01".to_vec(), b"row06".to_vec()],
0..12,
0..28,
vec![0..8],
), // search multiple rows in multiple ranges
(
vec![b"row01".to_vec(), b"row11".to_vec()],
0..12,
0..28,
vec![0..4, 8..12],
), // search multiple rows in multiple ranges
(vec![b"row99".to_vec()], 0..12, vec![]), // search for a row that doesn't exist in the full range
(vec![b"row99".to_vec()], 0..28, vec![]), // search for a row that doesn't exist in the full range
(vec![b"row00".to_vec()], 12..12, vec![]), // search in an empty range
(
vec![b"row04".to_vec(), b"row05".to_vec()],
0..12,
vec![4..8],
), // search multiple rows in same segment
(vec![b"seg01".to_vec()], 0..12, vec![4..8]), // search rows in a segment
(vec![b"seg01".to_vec()], 6..12, vec![6..8]), // search rows in a segment in partial range
(vec![b"overl".to_vec()], 0..12, vec![0..8]), // search rows in multiple segments
(vec![b"overl".to_vec()], 2..12, vec![2..8]), // search range starts from the middle of a segment
(vec![b"seg01".to_vec()], 0..28, vec![4..8]), // search rows in a segment
(vec![b"seg01".to_vec()], 6..28, vec![6..8]), // search rows in a segment in partial range
(vec![b"overl".to_vec()], 0..28, vec![0..8]), // search rows in multiple segments
(vec![b"overl".to_vec()], 2..28, vec![2..8]), // search range starts from the middle of a segment
(vec![b"overp".to_vec()], 0..10, vec![4..10]), // search range ends at the middle of a segment
(vec![b"dup".to_vec()], 0..12, vec![]), // search for a duplicate row not in the range
(vec![b"dup".to_vec()], 0..16, vec![12..16]), // search for a duplicate row in the range
(vec![b"dup".to_vec()], 0..28, vec![12..28]), // search for a duplicate row in the full range
];

for row in rows {
Expand Down
82 changes: 48 additions & 34 deletions src/index/src/bloom_filter/creator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,12 @@ use std::sync::Arc;

use finalize_segment::FinalizedBloomFilterStorage;
use futures::{AsyncWrite, AsyncWriteExt, StreamExt};
use greptime_proto::v1::index::{BloomFilterLoc, BloomFilterMeta};
use prost::Message;
use snafu::ResultExt;

use crate::bloom_filter::error::{IoSnafu, Result, SerdeJsonSnafu};
use crate::bloom_filter::{BloomFilterMeta, BloomFilterSegmentLocation, Bytes, SEED};
use crate::bloom_filter::error::{IoSnafu, Result};
use crate::bloom_filter::{Bytes, SEED};
use crate::external_provider::ExternalTempFileProvider;

/// The false positive rate of the Bloom filter.
Expand Down Expand Up @@ -170,30 +172,32 @@ impl BloomFilterCreator {
}

let mut meta = BloomFilterMeta {
rows_per_segment: self.rows_per_segment,
row_count: self.accumulated_row_count,
rows_per_segment: self.rows_per_segment as _,
row_count: self.accumulated_row_count as _,
..Default::default()
};

let mut segs = self.finalized_bloom_filters.drain().await?;
let (indices, mut segs) = self.finalized_bloom_filters.drain().await?;
meta.segment_loc_indices = indices.into_iter().map(|i| i as u64).collect();
meta.segment_count = meta.segment_loc_indices.len() as _;

while let Some(segment) = segs.next().await {
let segment = segment?;
writer
.write_all(&segment.bloom_filter_bytes)
.await
.context(IoSnafu)?;

let size = segment.bloom_filter_bytes.len();
meta.bloom_filter_segments.push(BloomFilterSegmentLocation {
offset: meta.bloom_filter_segments_size as _,
size: size as _,
elem_count: segment.element_count,
let size = segment.bloom_filter_bytes.len() as u64;
meta.bloom_filter_locs.push(BloomFilterLoc {
offset: meta.bloom_filter_size as _,
size,
element_count: segment.element_count as _,
});
meta.bloom_filter_segments_size += size;
meta.seg_count += 1;
meta.bloom_filter_size += size;
}

let meta_bytes = serde_json::to_vec(&meta).context(SerdeJsonSnafu)?;
let meta_bytes = meta.encode_to_vec();
writer.write_all(&meta_bytes).await.context(IoSnafu)?;

let meta_size = meta_bytes.len() as u32;
Expand Down Expand Up @@ -287,34 +291,38 @@ mod tests {
let meta_size = u32::from_le_bytes((&bytes[meta_size_offset..]).try_into().unwrap());

let meta_bytes = &bytes[total_size - meta_size as usize - 4..total_size - 4];
let meta: BloomFilterMeta = serde_json::from_slice(meta_bytes).unwrap();
let meta = BloomFilterMeta::decode(meta_bytes).unwrap();

assert_eq!(meta.rows_per_segment, 2);
assert_eq!(meta.seg_count, 2);
assert_eq!(meta.segment_count, 2);
assert_eq!(meta.row_count, 3);
assert_eq!(
meta.bloom_filter_segments_size + meta_bytes.len() + 4,
meta.bloom_filter_size as usize + meta_bytes.len() + 4,
total_size
);

let mut bfs = Vec::new();
for segment in meta.bloom_filter_segments {
for segment in meta.bloom_filter_locs {
let bloom_filter_bytes =
&bytes[segment.offset as usize..(segment.offset + segment.size) as usize];
let v = u64_vec_from_bytes(bloom_filter_bytes);
let bloom_filter = BloomFilter::from_vec(v)
.seed(&SEED)
.expected_items(segment.elem_count);
.expected_items(segment.element_count as usize);
bfs.push(bloom_filter);
}

assert_eq!(bfs.len(), 2);
assert!(bfs[0].contains(&b"a"));
assert!(bfs[0].contains(&b"b"));
assert!(bfs[0].contains(&b"c"));
assert!(bfs[0].contains(&b"d"));
assert!(bfs[1].contains(&b"e"));
assert!(bfs[1].contains(&b"f"));
assert_eq!(meta.segment_loc_indices.len(), 2);

let bf0 = &bfs[meta.segment_loc_indices[0] as usize];
assert!(bf0.contains(&b"a"));
assert!(bf0.contains(&b"b"));
assert!(bf0.contains(&b"c"));
assert!(bf0.contains(&b"d"));

let bf1 = &bfs[meta.segment_loc_indices[1] as usize];
assert!(bf1.contains(&b"e"));
assert!(bf1.contains(&b"f"));
}

#[tokio::test]
Expand Down Expand Up @@ -356,37 +364,43 @@ mod tests {
let meta_size = u32::from_le_bytes((&bytes[meta_size_offset..]).try_into().unwrap());

let meta_bytes = &bytes[total_size - meta_size as usize - 4..total_size - 4];
let meta: BloomFilterMeta = serde_json::from_slice(meta_bytes).unwrap();
let meta = BloomFilterMeta::decode(meta_bytes).unwrap();

assert_eq!(meta.rows_per_segment, 2);
assert_eq!(meta.seg_count, 10);
assert_eq!(meta.segment_count, 10);
assert_eq!(meta.row_count, 20);
assert_eq!(
meta.bloom_filter_segments_size + meta_bytes.len() + 4,
meta.bloom_filter_size as usize + meta_bytes.len() + 4,
total_size
);

let mut bfs = Vec::new();
for segment in meta.bloom_filter_segments {
for segment in meta.bloom_filter_locs {
let bloom_filter_bytes =
&bytes[segment.offset as usize..(segment.offset + segment.size) as usize];
let v = u64_vec_from_bytes(bloom_filter_bytes);
let bloom_filter = BloomFilter::from_vec(v)
.seed(&SEED)
.expected_items(segment.elem_count);
.expected_items(segment.element_count as _);
bfs.push(bloom_filter);
}

assert_eq!(bfs.len(), 10);
for bf in bfs.iter().take(3) {
// 4 bloom filters to serve 10 segments
assert_eq!(bfs.len(), 4);
assert_eq!(meta.segment_loc_indices.len(), 10);

for idx in meta.segment_loc_indices.iter().take(3) {
let bf = &bfs[*idx as usize];
assert!(bf.contains(&b"a"));
assert!(bf.contains(&b"b"));
}
for bf in bfs.iter().take(5).skip(2) {
for idx in meta.segment_loc_indices.iter().take(5).skip(2) {
let bf = &bfs[*idx as usize];
assert!(bf.contains(&b"c"));
assert!(bf.contains(&b"d"));
}
for bf in bfs.iter().take(10).skip(5) {
for idx in meta.segment_loc_indices.iter().take(10).skip(5) {
let bf = &bfs[*idx as usize];
assert!(bf.contains(&b"e"));
assert!(bf.contains(&b"f"));
}
Expand Down
Loading