Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions src/index/src/inverted_index/create.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.

mod sort;
mod sort_create;
pub mod sort;
pub mod sort_create;

use async_trait::async_trait;

Expand Down
4 changes: 2 additions & 2 deletions src/index/src/inverted_index/create/sort.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.

mod external_provider;
mod external_sort;
pub mod external_provider;
pub mod external_sort;
mod intermediate_rw;
mod merge_stream;

Expand Down
10 changes: 9 additions & 1 deletion src/index/src/inverted_index/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
use std::any::Any;
use std::io::Error as IoError;

use common_error::ext::ErrorExt;
use common_error::ext::{BoxedError, ErrorExt};
use common_error::status_code::StatusCode;
use common_macro::stack_trace_debug;
use snafu::{Location, Snafu};
Expand Down Expand Up @@ -167,6 +167,12 @@ pub enum Error {
total_row_count: usize,
expected_row_count: usize,
},

#[snafu(display("External error"))]
External {
source: BoxedError,
location: Location,
},
}

impl ErrorExt for Error {
Expand Down Expand Up @@ -197,6 +203,8 @@ impl ErrorExt for Error {
| FstInsert { .. }
| InconsistentRowCount { .. }
| IndexNotFound { .. } => StatusCode::InvalidArguments,

External { source, .. } => source.status_code(),
}
}

Expand Down
40 changes: 36 additions & 4 deletions src/mito2/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -445,6 +445,21 @@ pub enum Error {
location: Location,
},

#[snafu(display("Failed to push index value"))]
PushIndexValue {
source: index::inverted_index::error::Error,
location: Location,
},

#[snafu(display("Failed to write index completely"))]
IndexFinish {
source: index::inverted_index::error::Error,
location: Location,
},

#[snafu(display("Operate on aborted index"))]
OperateAbortedIndex { location: Location },

#[snafu(display("Failed to read puffin metadata"))]
PuffinReadMetadata {
source: puffin::error::Error,
Expand All @@ -463,6 +478,18 @@ pub enum Error {
location: Location,
},

#[snafu(display("Failed to write puffin completely"))]
PuffinFinish {
source: puffin::error::Error,
location: Location,
},

#[snafu(display("Failed to add blob to puffin file"))]
PuffinAddBlob {
source: puffin::error::Error,
location: Location,
},

#[snafu(display("Failed to clean dir {dir}"))]
CleanDir {
dir: String,
Expand Down Expand Up @@ -510,6 +537,7 @@ impl ErrorExt for Error {
| RegionCorrupted { .. }
| CreateDefault { .. }
| InvalidParquet { .. }
| OperateAbortedIndex { .. }
| PuffinBlobTypeNotFound { .. }
| UnexpectedReplay { .. } => StatusCode::Unexpected,
RegionNotFound { .. } => StatusCode::RegionNotFound,
Expand Down Expand Up @@ -557,10 +585,14 @@ impl ErrorExt for Error {
EmptyRegionDir { .. } | EmptyManifestDir { .. } => StatusCode::RegionNotFound,
ArrowReader { .. } => StatusCode::StorageUnavailable,
ConvertValue { source, .. } => source.status_code(),
BuildIndexApplier { source, .. } | ApplyIndex { source, .. } => source.status_code(),
PuffinReadMetadata { source, .. } | PuffinReadBlob { source, .. } => {
source.status_code()
}
BuildIndexApplier { source, .. }
| PushIndexValue { source, .. }
| ApplyIndex { source, .. }
| IndexFinish { source, .. } => source.status_code(),
PuffinReadMetadata { source, .. }
| PuffinReadBlob { source, .. }
| PuffinFinish { source, .. }
| PuffinAddBlob { source, .. } => source.status_code(),
CleanDir { .. } => StatusCode::Unexpected,
InvalidConfig { .. } => StatusCode::InvalidArguments,
}
Expand Down
43 changes: 42 additions & 1 deletion src/mito2/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ lazy_static! {
// Index metrics.
/// Timer of index application.
pub static ref INDEX_APPLY_ELAPSED: Histogram = register_histogram!(
"index_apply_elapsed",
"greptime_index_apply_elapsed",
"index apply elapsed",
)
.unwrap();
Expand All @@ -160,6 +160,26 @@ lazy_static! {
"index apply memory usage",
)
.unwrap();
/// Timer of index creation.
pub static ref INDEX_CREATE_ELAPSED: HistogramVec = register_histogram_vec!(
"greptime_index_create_elapsed",
"index create elapsed",
&[STAGE_LABEL]
)
.unwrap();
/// Counter of rows indexed.
pub static ref INDEX_CREATE_ROWS_TOTAL: IntCounter = register_int_counter!(
"greptime_index_create_rows_total",
"index create rows total",
)
.unwrap();
/// Counter of created index bytes.
pub static ref INDEX_CREATE_BYTES_TOTAL: IntCounter = register_int_counter!(
"greptime_index_create_bytes_total",
"index create bytes total",
)
.unwrap();

/// Counter of r/w bytes on index related IO operations.
pub static ref INDEX_IO_BYTES_TOTAL: IntCounterVec = register_int_counter_vec!(
"greptime_index_io_bytes_total",
Expand All @@ -170,6 +190,15 @@ lazy_static! {
/// Counter of read bytes on puffin files.
pub static ref INDEX_PUFFIN_READ_BYTES_TOTAL: IntCounter = INDEX_IO_BYTES_TOTAL
.with_label_values(&["read", "puffin"]);
/// Counter of write bytes on puffin files.
pub static ref INDEX_PUFFIN_WRITE_BYTES_TOTAL: IntCounter = INDEX_IO_BYTES_TOTAL
.with_label_values(&["write", "puffin"]);
/// Counter of read bytes on intermediate files.
pub static ref INDEX_INTERMEDIATE_READ_BYTES_TOTAL: IntCounter = INDEX_IO_BYTES_TOTAL
.with_label_values(&["read", "intermediate"]);
/// Counter of write bytes on intermediate files.
pub static ref INDEX_INTERMEDIATE_WRITE_BYTES_TOTAL: IntCounter = INDEX_IO_BYTES_TOTAL
.with_label_values(&["write", "intermediate"]);

/// Counter of r/w operations on index related IO operations, e.g. read, write, seek and flush.
pub static ref INDEX_IO_OP_TOTAL: IntCounterVec = register_int_counter_vec!(
Expand All @@ -190,5 +219,17 @@ lazy_static! {
/// Counter of flush operations on puffin files.
pub static ref INDEX_PUFFIN_FLUSH_OP_TOTAL: IntCounter = INDEX_IO_OP_TOTAL
.with_label_values(&["flush", "puffin"]);
/// Counter of read operations on intermediate files.
pub static ref INDEX_INTERMEDIATE_READ_OP_TOTAL: IntCounter = INDEX_IO_OP_TOTAL
.with_label_values(&["read", "intermediate"]);
/// Counter of seek operations on intermediate files.
pub static ref INDEX_INTERMEDIATE_SEEK_OP_TOTAL: IntCounter = INDEX_IO_OP_TOTAL
.with_label_values(&["seek", "intermediate"]);
/// Counter of write operations on intermediate files.
pub static ref INDEX_INTERMEDIATE_WRITE_OP_TOTAL: IntCounter = INDEX_IO_OP_TOTAL
.with_label_values(&["write", "intermediate"]);
/// Counter of flush operations on intermediate files.
pub static ref INDEX_INTERMEDIATE_FLUSH_OP_TOTAL: IntCounter = INDEX_IO_OP_TOTAL
.with_label_values(&["flush", "intermediate"]);
// ------- End of index metrics.
}
2 changes: 1 addition & 1 deletion src/mito2/src/row_converter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ pub trait RowCodec {
fn decode(&self, bytes: &[u8]) -> Result<Vec<Value>>;
}

#[derive(Debug)]
#[derive(Debug, Clone, PartialEq, Eq)]
/// A single column participating in memcomparable row encoding/decoding.
///
/// Holds only the column's concrete data type; `Clone`/`PartialEq`/`Eq` are
/// derived so fields can be duplicated for codec construction and compared in
/// tests.
pub struct SortField {
    // The concrete data type of the value this field encodes/decodes.
    data_type: ConcreteDataType,
}
Expand Down
8 changes: 8 additions & 0 deletions src/mito2/src/sst/index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,14 @@

pub mod applier;
mod codec;
pub mod creator;
mod store;

/// Blob type identifier under which the inverted index (v1 format) is stored
/// in a puffin file.
const INDEX_BLOB_TYPE: &str = "greptime-inverted-index-v1";

// TODO(zhongzc): how to determine this value?
/// The minimum memory usage threshold for a column to qualify for external sorting during index creation.
const MIN_MEMORY_USAGE_THRESHOLD: usize = 8192;

/// The buffer size for the pipe used to send index data to the puffin blob.
const PIPE_BUFFER_SIZE_FOR_SENDING_BLOB: usize = 8192;
99 changes: 97 additions & 2 deletions src/mito2/src/sst/index/codec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,12 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use datatypes::value::ValueRef;
use datatypes::value::{Value, ValueRef};
use memcomparable::Serializer;
use store_api::metadata::ColumnMetadata;

use crate::error::Result;
use crate::row_converter::SortField;
use crate::row_converter::{McmpRowCodec, RowCodec, SortField};

/// Encodes index values according to their data types for sorting and storage use.
///
/// Stateless marker type: all functionality lives in associated functions
/// (e.g. `encode_value`).
pub struct IndexValueCodec;
Expand All @@ -36,9 +37,65 @@ impl IndexValueCodec {
}
}

/// Convenience alias: tag columns are addressed by name.
type ColumnName = String;

/// Decodes primary key values into their corresponding column names, data types and values.
///
/// Invariant: `column_names` and `fields` are parallel vectors built from the
/// same tag-column iteration order, and `decoder` is constructed from exactly
/// these `fields`, so decoded values zip positionally with both.
pub struct IndexValuesCodec {
    /// The tag column names.
    column_names: Vec<ColumnName>,
    /// The data types of tag columns.
    fields: Vec<SortField>,
    /// The decoder for the primary key.
    decoder: McmpRowCodec,
}

impl IndexValuesCodec {
    /// Creates a new `IndexValuesCodec` from a list of `ColumnMetadata` of tag columns.
    pub fn from_tag_columns<'a>(tag_columns: impl Iterator<Item = &'a ColumnMetadata>) -> Self {
        // Collect names and sort fields in lock step so they stay positionally
        // aligned with the decoded primary-key values.
        let mut column_names = Vec::new();
        let mut fields = Vec::new();
        for column in tag_columns {
            column_names.push(column.column_schema.name.clone());
            fields.push(SortField::new(column.column_schema.data_type.clone()));
        }

        Self {
            // The decoder is built from the very same fields it will zip against.
            decoder: McmpRowCodec::new(fields.clone()),
            column_names,
            fields,
        }
    }

    /// Decodes a primary key into its corresponding column names, data types and values.
    ///
    /// Null values are surfaced as `None`; all other values as `Some(value)`.
    ///
    /// # Errors
    /// Propagates any decoding error from the underlying row codec.
    pub fn decode(
        &self,
        primary_key: &[u8],
    ) -> Result<impl Iterator<Item = (&ColumnName, &SortField, Option<Value>)>> {
        let decoded = self.decoder.decode(primary_key)?;

        Ok(decoded
            .into_iter()
            .zip(self.column_names.iter())
            .zip(self.fields.iter())
            .map(|((value, name), field)| {
                let value = if value.is_null() { None } else { Some(value) };
                (name, field, value)
            }))
    }
}

#[cfg(test)]
mod tests {
use datatypes::data_type::ConcreteDataType;
use datatypes::schema::ColumnSchema;

use super::*;
use crate::error::Error;
Expand All @@ -62,4 +119,42 @@ mod tests {
let res = IndexValueCodec::encode_value(value, &field, &mut buffer);
assert!(matches!(res, Err(Error::FieldTypeMismatch { .. })));
}

#[test]
fn test_decode_primary_key_basic() {
    // Two tag columns: a nullable string tag and a non-nullable int64 tag.
    let string_type = ConcreteDataType::string_datatype();
    let int_type = ConcreteDataType::int64_datatype();

    let tag_columns = [
        ColumnMetadata {
            column_schema: ColumnSchema::new("tag0", string_type.clone(), true),
            semantic_type: api::v1::SemanticType::Tag,
            column_id: 1,
        },
        ColumnMetadata {
            column_schema: ColumnSchema::new("tag1", int_type.clone(), false),
            semantic_type: api::v1::SemanticType::Tag,
            column_id: 2,
        },
    ];

    // Encode a primary key holding (NULL, 10) with a matching codec.
    let encoder = McmpRowCodec::new(vec![
        SortField::new(string_type.clone()),
        SortField::new(int_type.clone()),
    ]);
    let primary_key = encoder
        .encode([ValueRef::Null, ValueRef::Int64(10)].into_iter())
        .unwrap();

    let codec = IndexValuesCodec::from_tag_columns(tag_columns.iter());
    let mut iter = codec.decode(&primary_key).unwrap();

    // The null string tag decodes to None.
    let (column_name, field, value) = iter.next().unwrap();
    assert_eq!(column_name, "tag0");
    assert_eq!(field, &SortField::new(string_type));
    assert_eq!(value, None);

    // The int64 tag decodes to its concrete value.
    let (column_name, field, value) = iter.next().unwrap();
    assert_eq!(column_name, "tag1");
    assert_eq!(field, &SortField::new(int_type));
    assert_eq!(value, Some(Value::Int64(10)));

    // Exactly two tag columns — the iterator is exhausted.
    assert!(iter.next().is_none());
}
}
Loading