Skip to content

Commit 83de399

Browse files
zhongzc and evenyag authored
feat(inverted_index.create): add external sorter (#2950)
* feat(inverted_index.create): add read/write for external intermediate files Signed-off-by: Zhenchi <zhongzc_arch@outlook.com> * chore: MAGIC_CODEC_V1 -> CODEC_V1_MAGIC Signed-off-by: Zhenchi <zhongzc_arch@outlook.com> * chore: polish comments Signed-off-by: Zhenchi <zhongzc_arch@outlook.com> * chore: fix typos intermedia -> intermediate Signed-off-by: Zhenchi <zhongzc_arch@outlook.com> * fix: typos Signed-off-by: Zhenchi <zhongzc_arch@outlook.com> * feat(inverted_index.create): add external sorter Signed-off-by: Zhenchi <zhongzc_arch@outlook.com> * chore: fix typos intermedia -> intermediate Signed-off-by: Zhenchi <zhongzc_arch@outlook.com> * chore: polish comments Signed-off-by: Zhenchi <zhongzc_arch@outlook.com> * chore: polish comments Signed-off-by: Zhenchi <zhongzc_arch@outlook.com> * refactor: drop the stream as early as possible to avoid recursive calls to poll Signed-off-by: Zhenchi <zhongzc_arch@outlook.com> * refactor: project merge sorted stream Signed-off-by: Zhenchi <zhongzc_arch@outlook.com> * feat: add total_row_count to SortOutput Signed-off-by: Zhenchi <zhongzc_arch@outlook.com> * feat: remove change of format Signed-off-by: Zhenchi <zhongzc_arch@outlook.com> * refactor: rename segment null bitmap Signed-off-by: Zhenchi <zhongzc_arch@outlook.com> * refactor: test type alias Signed-off-by: Zhenchi <zhongzc_arch@outlook.com> * feat: allow `memory_usage_threshold` to be None to turn off dumping Signed-off-by: Zhenchi <zhongzc_arch@outlook.com> * feat: change segment_row_count type to NonZeroUsize Signed-off-by: Zhenchi <zhongzc_arch@outlook.com> * refactor: accept BytesRef instead Signed-off-by: Zhenchi <zhongzc_arch@outlook.com> * feat: add `push_n` to adapt mito2 Signed-off-by: Zhenchi <zhongzc_arch@outlook.com> * chore: add k-way merge TODO Signed-off-by: Zhenchi <zhongzc_arch@outlook.com> * refactor: more sorter cases Signed-off-by: Zhenchi <zhongzc_arch@outlook.com> * refactor: make the merge tree balance Signed-off-by: Zhenchi 
<zhongzc_arch@outlook.com> * Update src/index/src/inverted_index/create/sort/external_sort.rs Co-authored-by: Yingwen <realevenyag@gmail.com> * chore: address comments Signed-off-by: Zhenchi <zhongzc_arch@outlook.com> * chore: stable feature Signed-off-by: Zhenchi <zhongzc_arch@outlook.com> --------- Signed-off-by: Zhenchi <zhongzc_arch@outlook.com> Co-authored-by: Yingwen <realevenyag@gmail.com>
1 parent 6b8dbcf commit 83de399

7 files changed

Lines changed: 693 additions & 3 deletions

File tree

Cargo.lock

Lines changed: 3 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

src/index/Cargo.toml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,15 +12,18 @@ bytes.workspace = true
1212
common-base.workspace = true
1313
common-error.workspace = true
1414
common-macro.workspace = true
15+
common-telemetry.workspace = true
1516
fst.workspace = true
1617
futures.workspace = true
1718
greptime-proto.workspace = true
1819
mockall.workspace = true
20+
pin-project.workspace = true
1921
prost.workspace = true
2022
regex-automata.workspace = true
2123
regex.workspace = true
2224
snafu.workspace = true
2325

2426
[dev-dependencies]
27+
rand.workspace = true
2528
tokio-util.workspace = true
2629
tokio.workspace = true

src/index/src/inverted_index.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,3 +19,4 @@ pub mod search;
1919

2020
pub type FstMap = fst::Map<Vec<u8>>;
2121
pub type Bytes = Vec<u8>;
22+
/// A borrowed byte slice; the non-owning counterpart of `Bytes`.
pub type BytesRef<'a> = &'a [u8];

src/index/src/inverted_index/create/sort.rs

Lines changed: 36 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,13 +12,46 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15+
mod external_provider;
16+
mod external_sort;
17+
mod intermediate_rw;
18+
mod merge_stream;
19+
20+
use async_trait::async_trait;
1521
use common_base::BitVec;
1622
use futures::Stream;
1723

1824
use crate::inverted_index::error::Result;
19-
use crate::inverted_index::Bytes;
20-
21-
mod intermediate_rw;
25+
use crate::inverted_index::{Bytes, BytesRef};
2226

2327
/// A boxed stream of sorted values, each paired with its associated bitmap.
/// Every item is a `Result`, so the stream can yield errors as well as values.
2428
pub type SortedStream = Box<dyn Stream<Item = Result<(Bytes, BitVec)>> + Send + Unpin>;
29+
30+
/// Output of a sorting operation: a segment-level null bitmap, a stream of
/// sorted items, and the total number of rows that were sorted.
31+
pub struct SortOutput {
32+
/// Bitmap indicating which segments have null values
33+
pub segment_null_bitmap: BitVec,
34+
35+
/// Stream of sorted items; each item is a value together with its associated bitmap
36+
pub sorted_stream: SortedStream,
37+
38+
/// Total number of rows in the sorted data
39+
pub total_row_count: usize,
40+
}
41+
42+
/// Handles data sorting, supporting incremental input and retrieval of sorted output.
///
/// Values are fed in through `push` / `push_n`; `output` then yields a `SortOutput`.
43+
#[async_trait]
44+
pub trait Sorter: Send {
45+
/// Pushes a single non-null or null value into the sorter.
46+
/// The default implementation forwards to `push_n` with `n = 1`.
47+
async fn push(&mut self, value: Option<BytesRef<'_>>) -> Result<()> {
48+
self.push_n(value, 1).await
49+
}
50+
51+
/// Pushes `n` identical non-null or null values into the sorter.
52+
/// Should be equivalent to calling `push` `n` times.
53+
async fn push_n(&mut self, value: Option<BytesRef<'_>>, n: usize) -> Result<()>;
54+
55+
/// Completes the sorting process and returns the sorted data as a `SortOutput`.
56+
async fn output(&mut self) -> Result<SortOutput>;
57+
}
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
// Copyright 2023 Greptime Team
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
use async_trait::async_trait;
16+
use futures::{AsyncRead, AsyncWrite};
17+
18+
use crate::inverted_index::error::Result;
19+
20+
/// Manages the intermediate files used during external sorting.
/// Files are grouped by the name of the index they belong to.
21+
// `automock` generates a mock implementation of this trait (see the mockall docs).
#[mockall::automock]
22+
#[async_trait]
23+
pub trait ExternalTempFileProvider: Send + Sync {
24+
/// Creates and opens a new intermediate file associated with a specific index for writing.
25+
/// The implementation should ensure that the file does not already exist.
26+
///
27+
/// - `index_name`: the name of the index with which the file will be associated
28+
/// - `file_id`: a unique identifier for the new file
///
/// Returns a boxed `AsyncWrite` for the new file.
29+
async fn create(
30+
&self,
31+
index_name: &str,
32+
file_id: &str,
33+
) -> Result<Box<dyn AsyncWrite + Unpin + Send>>;
34+
35+
/// Retrieves all intermediate files associated with a specific index for an external sorting operation.
36+
///
37+
/// - `index_name`: the name of the index to retrieve intermediate files for
///
/// Returns the files as a list of boxed `AsyncRead` readers.
38+
async fn read_all(&self, index_name: &str) -> Result<Vec<Box<dyn AsyncRead + Unpin + Send>>>;
39+
}

0 commit comments

Comments
 (0)