Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions src/index/src/inverted_index/create.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.

mod sort;
mod sort_create;
pub mod sort;
pub mod sort_create;

use async_trait::async_trait;

Expand Down
4 changes: 2 additions & 2 deletions src/index/src/inverted_index/create/sort.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.

mod external_provider;
mod external_sort;
pub mod external_provider;
pub mod external_sort;
mod intermediate_rw;
mod merge_stream;

Expand Down
10 changes: 9 additions & 1 deletion src/index/src/inverted_index/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
use std::any::Any;
use std::io::Error as IoError;

use common_error::ext::ErrorExt;
use common_error::ext::{BoxedError, ErrorExt};
use common_error::status_code::StatusCode;
use common_macro::stack_trace_debug;
use snafu::{Location, Snafu};
Expand Down Expand Up @@ -167,6 +167,12 @@ pub enum Error {
total_row_count: usize,
expected_row_count: usize,
},

#[snafu(display("External error"))]
External {
source: BoxedError,
location: Location,
},
}

impl ErrorExt for Error {
Expand Down Expand Up @@ -197,6 +203,8 @@ impl ErrorExt for Error {
| FstInsert { .. }
| InconsistentRowCount { .. }
| IndexNotFound { .. } => StatusCode::InvalidArguments,

External { source, .. } => source.status_code(),
}
}

Expand Down
40 changes: 36 additions & 4 deletions src/mito2/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -445,6 +445,21 @@ pub enum Error {
location: Location,
},

#[snafu(display("Failed to push index value"))]
PushIndexValue {
source: index::inverted_index::error::Error,
location: Location,
},

#[snafu(display("Failed to write index completely"))]
IndexFinish {
source: index::inverted_index::error::Error,
location: Location,
},

#[snafu(display("Operate on aborted index"))]
OperateAbortedIndex { location: Location },

#[snafu(display("Failed to read puffin metadata"))]
PuffinReadMetadata {
source: puffin::error::Error,
Expand All @@ -463,6 +478,18 @@ pub enum Error {
location: Location,
},

#[snafu(display("Failed to write puffin completely"))]
PuffinFinish {
source: puffin::error::Error,
location: Location,
},

#[snafu(display("Failed to add blob to puffin file"))]
PuffinAddBlob {
source: puffin::error::Error,
location: Location,
},

#[snafu(display("Failed to clean dir {dir}"))]
CleanDir {
dir: String,
Expand Down Expand Up @@ -510,6 +537,7 @@ impl ErrorExt for Error {
| RegionCorrupted { .. }
| CreateDefault { .. }
| InvalidParquet { .. }
| OperateAbortedIndex { .. }
| PuffinBlobTypeNotFound { .. }
| UnexpectedReplay { .. } => StatusCode::Unexpected,
RegionNotFound { .. } => StatusCode::RegionNotFound,
Expand Down Expand Up @@ -557,10 +585,14 @@ impl ErrorExt for Error {
EmptyRegionDir { .. } | EmptyManifestDir { .. } => StatusCode::RegionNotFound,
ArrowReader { .. } => StatusCode::StorageUnavailable,
ConvertValue { source, .. } => source.status_code(),
BuildIndexApplier { source, .. } | ApplyIndex { source, .. } => source.status_code(),
PuffinReadMetadata { source, .. } | PuffinReadBlob { source, .. } => {
source.status_code()
}
BuildIndexApplier { source, .. }
| PushIndexValue { source, .. }
| ApplyIndex { source, .. }
| IndexFinish { source, .. } => source.status_code(),
PuffinReadMetadata { source, .. }
| PuffinReadBlob { source, .. }
| PuffinFinish { source, .. }
| PuffinAddBlob { source, .. } => source.status_code(),
CleanDir { .. } => StatusCode::Unexpected,
InvalidConfig { .. } => StatusCode::InvalidArguments,
}
Expand Down
43 changes: 42 additions & 1 deletion src/mito2/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ lazy_static! {
// Index metrics.
/// Timer of index application.
pub static ref INDEX_APPLY_ELAPSED: Histogram = register_histogram!(
"index_apply_elapsed",
"greptime_index_apply_elapsed",
"index apply elapsed",
)
.unwrap();
Expand All @@ -160,6 +160,26 @@ lazy_static! {
"index apply memory usage",
)
.unwrap();
/// Timer of index creation.
pub static ref INDEX_CREATE_ELAPSED: HistogramVec = register_histogram_vec!(
"greptime_index_create_elapsed",
"index create elapsed",
&[STAGE_LABEL]
)
.unwrap();
/// Counter of rows indexed.
pub static ref INDEX_CREATE_ROWS_TOTAL: IntCounter = register_int_counter!(
"greptime_index_create_rows_total",
"index create rows total",
)
.unwrap();
/// Counter of created index bytes.
pub static ref INDEX_CREATE_BYTES_TOTAL: IntCounter = register_int_counter!(
"greptime_index_create_bytes_total",
"index create bytes total",
)
.unwrap();

/// Counter of r/w bytes on index related IO operations.
pub static ref INDEX_IO_BYTES_TOTAL: IntCounterVec = register_int_counter_vec!(
"greptime_index_io_bytes_total",
Expand All @@ -170,6 +190,15 @@ lazy_static! {
/// Counter of read bytes on puffin files.
pub static ref INDEX_PUFFIN_READ_BYTES_TOTAL: IntCounter = INDEX_IO_BYTES_TOTAL
.with_label_values(&["read", "puffin"]);
/// Counter of write bytes on puffin files.
pub static ref INDEX_PUFFIN_WRITE_BYTES_TOTAL: IntCounter = INDEX_IO_BYTES_TOTAL
.with_label_values(&["write", "puffin"]);
/// Counter of read bytes on intermediate files.
pub static ref INDEX_INTERMEDIATE_READ_BYTES_TOTAL: IntCounter = INDEX_IO_BYTES_TOTAL
.with_label_values(&["read", "intermediate"]);
/// Counter of write bytes on intermediate files.
pub static ref INDEX_INTERMEDIATE_WRITE_BYTES_TOTAL: IntCounter = INDEX_IO_BYTES_TOTAL
.with_label_values(&["write", "intermediate"]);

/// Counter of r/w operations on index related IO operations, e.g. read, write, seek and flush.
pub static ref INDEX_IO_OP_TOTAL: IntCounterVec = register_int_counter_vec!(
Expand All @@ -190,5 +219,17 @@ lazy_static! {
/// Counter of flush operations on puffin files.
pub static ref INDEX_PUFFIN_FLUSH_OP_TOTAL: IntCounter = INDEX_IO_OP_TOTAL
.with_label_values(&["flush", "puffin"]);
/// Counter of read operations on intermediate files.
pub static ref INDEX_INTERMEDIATE_READ_OP_TOTAL: IntCounter = INDEX_IO_OP_TOTAL
.with_label_values(&["read", "intermediate"]);
/// Counter of seek operations on intermediate files.
pub static ref INDEX_INTERMEDIATE_SEEK_OP_TOTAL: IntCounter = INDEX_IO_OP_TOTAL
.with_label_values(&["seek", "intermediate"]);
/// Counter of write operations on intermediate files.
pub static ref INDEX_INTERMEDIATE_WRITE_OP_TOTAL: IntCounter = INDEX_IO_OP_TOTAL
.with_label_values(&["write", "intermediate"]);
/// Counter of flush operations on intermediate files.
pub static ref INDEX_INTERMEDIATE_FLUSH_OP_TOTAL: IntCounter = INDEX_IO_OP_TOTAL
.with_label_values(&["flush", "intermediate"]);
// ------- End of index metrics.
}
2 changes: 1 addition & 1 deletion src/mito2/src/row_converter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ pub trait RowCodec {
fn decode(&self, bytes: &[u8]) -> Result<Vec<Value>>;
}

#[derive(Debug)]
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SortField {
data_type: ConcreteDataType,
}
Expand Down
8 changes: 8 additions & 0 deletions src/mito2/src/sst/index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,14 @@

pub mod applier;
mod codec;
pub mod creator;
mod store;

const INDEX_BLOB_TYPE: &str = "greptime-inverted-index-v1";

// TODO(zhongzc): how to determine this value?
/// The minimum memory usage threshold for a column to qualify for external sorting during index creation.
const MIN_MEMORY_USAGE_THRESHOLD: usize = 8192;

/// The buffer size for the pipe used to send index data to the puffin blob.
const PIPE_BUFFER_SIZE_FOR_SENDING_BLOB: usize = 8192;
33 changes: 17 additions & 16 deletions src/mito2/src/sst/index/applier/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,14 +32,13 @@ use index::inverted_index::search::predicate::Predicate;
use object_store::ObjectStore;
use snafu::{OptionExt, ResultExt};
use store_api::metadata::RegionMetadata;
use store_api::storage::ColumnId;

use crate::error::{BuildIndexApplierSnafu, ColumnNotFoundSnafu, ConvertValueSnafu, Result};
use crate::row_converter::SortField;
use crate::sst::index::applier::SstIndexApplier;
use crate::sst::index::codec::IndexValueCodec;

type ColumnName = String;

/// Constructs an [`SstIndexApplier`] which applies predicates to SST files during scan.
pub struct SstIndexApplierBuilder<'a> {
/// Directory of the region, required argument for constructing [`SstIndexApplier`].
Expand All @@ -52,7 +51,7 @@ pub struct SstIndexApplierBuilder<'a> {
metadata: &'a RegionMetadata,

/// Stores predicates during traversal on the Expr tree.
output: HashMap<ColumnName, Vec<Predicate>>,
output: HashMap<ColumnId, Vec<Predicate>>,
}

impl<'a> SstIndexApplierBuilder<'a> {
Expand Down Expand Up @@ -81,7 +80,11 @@ impl<'a> SstIndexApplierBuilder<'a> {
return Ok(None);
}

let predicates = self.output.into_iter().collect();
let predicates = self
.output
.into_iter()
.map(|(column_id, predicates)| (column_id.to_string(), predicates))
.collect();
let applier = PredicatesIndexApplier::try_from(predicates);
Ok(Some(SstIndexApplier::new(
self.region_dir,
Expand Down Expand Up @@ -122,18 +125,16 @@ impl<'a> SstIndexApplierBuilder<'a> {
}

/// Helper function to add a predicate to the output.
fn add_predicate(&mut self, column_name: &str, predicate: Predicate) {
match self.output.get_mut(column_name) {
Some(predicates) => predicates.push(predicate),
None => {
self.output.insert(column_name.to_string(), vec![predicate]);
}
}
fn add_predicate(&mut self, column_id: ColumnId, predicate: Predicate) {
self.output.entry(column_id).or_default().push(predicate);
}

/// Helper function to get the column type of a tag column.
/// Helper function to get the column id and the column type of a tag column.
/// Returns `None` if the column is not a tag column.
fn tag_column_type(&self, column_name: &str) -> Result<Option<ConcreteDataType>> {
fn tag_column_id_and_type(
&self,
column_name: &str,
) -> Result<Option<(ColumnId, ConcreteDataType)>> {
let column = self
.metadata
.column_by_name(column_name)
Expand All @@ -142,7 +143,7 @@ impl<'a> SstIndexApplierBuilder<'a> {
})?;

Ok((column.semantic_type == SemanticType::Tag)
.then(|| column.column_schema.data_type.clone()))
.then(|| (column.column_id, column.column_schema.data_type.clone())))
}

/// Helper function to get a non-null literal.
Expand Down Expand Up @@ -303,15 +304,15 @@ mod tests {
});

builder.traverse_and_collect(&expr);
let predicates = builder.output.get("a").unwrap();
let predicates = builder.output.get(&1).unwrap();
assert_eq!(predicates.len(), 1);
assert_eq!(
predicates[0],
Predicate::RegexMatch(RegexMatchPredicate {
pattern: "bar".to_string()
})
);
let predicates = builder.output.get("b").unwrap();
let predicates = builder.output.get(&2).unwrap();
assert_eq!(predicates.len(), 1);
assert_eq!(
predicates[0],
Expand Down
6 changes: 3 additions & 3 deletions src/mito2/src/sst/index/applier/builder/between.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ impl<'a> SstIndexApplierBuilder<'a> {
let Some(column_name) = Self::column_name(&between.expr) else {
return Ok(());
};
let Some(data_type) = self.tag_column_type(column_name)? else {
let Some((column_id, data_type)) = self.tag_column_id_and_type(column_name)? else {
return Ok(());
};
let Some(low) = Self::nonnull_lit(&between.low) else {
Expand All @@ -51,7 +51,7 @@ impl<'a> SstIndexApplierBuilder<'a> {
},
});

self.add_predicate(column_name, predicate);
self.add_predicate(column_id, predicate);
Ok(())
}
}
Expand Down Expand Up @@ -80,7 +80,7 @@ mod tests {

builder.collect_between(&between).unwrap();

let predicates = builder.output.get("a").unwrap();
let predicates = builder.output.get(&1).unwrap();
assert_eq!(predicates.len(), 1);
assert_eq!(
predicates[0],
Expand Down
6 changes: 3 additions & 3 deletions src/mito2/src/sst/index/applier/builder/comparison.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,15 +114,15 @@ impl<'a> SstIndexApplierBuilder<'a> {
let Some(lit) = Self::nonnull_lit(literal) else {
return Ok(());
};
let Some(data_type) = self.tag_column_type(column_name)? else {
let Some((column_id, data_type)) = self.tag_column_id_and_type(column_name)? else {
return Ok(());
};

let predicate = Predicate::Range(RangePredicate {
range: range_builder(Self::encode_lit(lit, data_type)?),
});

self.add_predicate(column_name, predicate);
self.add_predicate(column_id, predicate);
Ok(())
}
}
Expand Down Expand Up @@ -230,7 +230,7 @@ mod tests {
builder.collect_comparison_expr(left, op, right).unwrap();
}

let predicates = builder.output.get("a").unwrap();
let predicates = builder.output.get(&1).unwrap();
assert_eq!(predicates.len(), cases.len());
for ((_, expected), actual) in cases.into_iter().zip(predicates) {
assert_eq!(
Expand Down
Loading