Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/datatypes/src/types/duration_type.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ use paste::paste;
use serde::{Deserialize, Serialize};
use snafu::OptionExt;

use super::LogicalPrimitiveType;
use crate::data_type::DataType;
use crate::duration::{
DurationMicrosecond, DurationMillisecond, DurationNanosecond, DurationSecond,
Expand All @@ -34,6 +33,7 @@ use crate::error;
use crate::prelude::{
ConcreteDataType, LogicalTypeId, MutableVector, ScalarVectorBuilder, Value, ValueRef, Vector,
};
use crate::types::LogicalPrimitiveType;
use crate::vectors::{
DurationMicrosecondVector, DurationMicrosecondVectorBuilder, DurationMillisecondVector,
DurationMillisecondVectorBuilder, DurationNanosecondVector, DurationNanosecondVectorBuilder,
Expand Down
2 changes: 1 addition & 1 deletion src/datatypes/src/types/primitive_type.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,11 @@ use num::NumCast;
use serde::{Deserialize, Serialize};
use snafu::OptionExt;

use super::boolean_type::bool_to_numeric;
use crate::data_type::{ConcreteDataType, DataType};
use crate::error::{self, Result};
use crate::scalars::{Scalar, ScalarRef, ScalarVectorBuilder};
use crate::type_id::LogicalTypeId;
use crate::types::boolean_type::bool_to_numeric;
use crate::types::{DateTimeType, DateType};
use crate::value::{Value, ValueRef};
use crate::vectors::{MutableVector, PrimitiveVector, PrimitiveVectorBuilder, Vector};
Expand Down
2 changes: 1 addition & 1 deletion src/datatypes/src/vectors/decimal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ use common_decimal::decimal128::{DECIMAL128_DEFAULT_SCALE, DECIMAL128_MAX_PRECIS
use common_decimal::Decimal128;
use snafu::{OptionExt, ResultExt};

use super::{MutableVector, Validity, Vector, VectorRef};
use crate::arrow::datatypes::DataType as ArrowDataType;
use crate::data_type::ConcreteDataType;
use crate::error::{
Expand All @@ -33,6 +32,7 @@ use crate::prelude::{ScalarVector, ScalarVectorBuilder};
use crate::serialize::Serializable;
use crate::value::{Value, ValueRef};
use crate::vectors;
use crate::vectors::{MutableVector, Validity, Vector, VectorRef};

/// Decimal128Vector is a vector keep i128 values with precision and scale.
#[derive(Debug, PartialEq)]
Expand Down
10 changes: 4 additions & 6 deletions src/datatypes/src/vectors/helper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,17 +25,15 @@ use arrow_schema::IntervalUnit;
use datafusion_common::ScalarValue;
use snafu::{OptionExt, ResultExt};

use super::{
Decimal128Vector, DurationMicrosecondVector, DurationMillisecondVector,
DurationNanosecondVector, DurationSecondVector, IntervalDayTimeVector, IntervalYearMonthVector,
};
use crate::data_type::ConcreteDataType;
use crate::error::{self, Result};
use crate::scalars::{Scalar, ScalarVectorBuilder};
use crate::value::{ListValue, ListValueRef};
use crate::vectors::{
BinaryVector, BooleanVector, ConstantVector, DateTimeVector, DateVector, Float32Vector,
Float64Vector, Int16Vector, Int32Vector, Int64Vector, Int8Vector, IntervalMonthDayNanoVector,
BinaryVector, BooleanVector, ConstantVector, DateTimeVector, DateVector, Decimal128Vector,
DurationMicrosecondVector, DurationMillisecondVector, DurationNanosecondVector,
DurationSecondVector, Float32Vector, Float64Vector, Int16Vector, Int32Vector, Int64Vector,
Int8Vector, IntervalDayTimeVector, IntervalMonthDayNanoVector, IntervalYearMonthVector,
ListVector, ListVectorBuilder, MutableVector, NullVector, StringVector, TimeMicrosecondVector,
TimeMillisecondVector, TimeNanosecondVector, TimeSecondVector, TimestampMicrosecondVector,
TimestampMillisecondVector, TimestampNanosecondVector, TimestampSecondVector, UInt16Vector,
Expand Down
2 changes: 1 addition & 1 deletion src/datatypes/src/vectors/interval.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use super::{PrimitiveVector, PrimitiveVectorBuilder};
use crate::types::{IntervalDayTimeType, IntervalMonthDayNanoType, IntervalYearMonthType};
use crate::vectors::{PrimitiveVector, PrimitiveVectorBuilder};

pub type IntervalYearMonthVector = PrimitiveVector<IntervalYearMonthType>;
pub type IntervalYearMonthVectorBuilder = PrimitiveVectorBuilder<IntervalYearMonthType>;
Expand Down
4 changes: 1 addition & 3 deletions src/mito2/src/cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,10 @@
//! Cache for the engine.

mod cache_size;
// TODO(yingwen): Remove this after the write cache is ready.
#[allow(unused)]

pub(crate) mod file_cache;
#[cfg(test)]
pub(crate) mod test_util;
#[allow(unused)]
pub(crate) mod write_cache;

use std::mem;
Expand Down
3 changes: 2 additions & 1 deletion src/mito2/src/cache/file_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

//! A cache for files.

use std::ops::{Range, RangeBounds};
use std::ops::Range;
use std::sync::Arc;
use std::time::Instant;

Expand Down Expand Up @@ -165,6 +165,7 @@ impl FileCache {
}
}

#[allow(unused)]
/// Removes a file from the cache explicitly.
pub(crate) async fn remove(&self, key: IndexKey) {
let file_path = self.cache_file_path(key);
Expand Down
30 changes: 9 additions & 21 deletions src/mito2/src/cache/write_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,27 +14,20 @@

//! A write-through cache for remote object stores.

use std::ops::Range;
use std::sync::Arc;

use api::v1::region;
use bytes::Bytes;
use common_base::readable_size::ReadableSize;
use common_telemetry::{debug, info};
use object_store::manager::ObjectStoreManagerRef;
use object_store::ObjectStore;
use snafu::ResultExt;
use store_api::metadata::RegionMetadataRef;
use store_api::storage::RegionId;

use crate::access_layer::{new_fs_object_store, SstWriteRequest};
use crate::cache::file_cache::{FileCache, FileCacheRef, FileType, IndexKey, IndexValue};
use crate::error::{self, Result};
use crate::metrics::{FLUSH_ELAPSED, UPLOAD_BYTES_TOTAL};
use crate::read::Source;
use crate::sst::file::FileId;
use crate::sst::index::intermediate::IntermediateManager;
use crate::sst::index::{Indexer, IndexerBuilder};
use crate::sst::index::IndexerBuilder;
use crate::sst::parquet::writer::ParquetWriter;
use crate::sst::parquet::{SstInfo, WriteOptions};
use crate::sst::DEFAULT_WRITE_BUFFER_SIZE;
Expand All @@ -46,6 +39,8 @@ pub struct WriteCache {
/// Local file cache.
file_cache: FileCacheRef,
/// Object store manager.
#[allow(unused)]
/// TODO: Remove unused after implementing async write cache
object_store_manager: ObjectStoreManagerRef,
/// Intermediate manager for inverted index.
intermediate_manager: IntermediateManager,
Expand Down Expand Up @@ -231,24 +226,17 @@ pub struct SstUploadRequest {

#[cfg(test)]
mod tests {
use api::v1::OpType;

use common_base::readable_size::ReadableSize;
use common_test_util::temp_dir::create_temp_dir;
use object_store::manager::ObjectStoreManager;
use object_store::services::Fs;
use object_store::util::join_dir;
use object_store::ObjectStore;
use store_api::storage::RegionId;

use super::*;
use crate::cache::file_cache::{self, FileCache};
use crate::cache::test_util::new_fs_store;
use crate::sst::file::FileId;
use crate::sst::location::{index_file_path, sst_file_path};
use crate::test_util::sst_util::{
new_batch_by_range, new_source, sst_file_handle, sst_region_metadata,
};
use crate::test_util::{build_rows, new_batch_builder, CreateRequestBuilder, TestEnv};
use crate::test_util::sst_util::{new_batch_by_range, new_source, sst_region_metadata};
use crate::test_util::TestEnv;

#[tokio::test]
async fn test_write_and_upload_sst() {
Expand Down Expand Up @@ -296,7 +284,7 @@ mod tests {
cache_manager: Default::default(),
};

let request = SstUploadRequest {
let upload_request = SstUploadRequest {
upload_path: upload_path.clone(),
index_upload_path: index_upload_path.clone(),
remote_store: mock_store.clone(),
Expand All @@ -308,8 +296,8 @@ mod tests {
};

// Write to cache and upload sst to mock remote store
let sst_info = write_cache
.write_and_upload_sst(write_request, request, &write_opts)
write_cache
.write_and_upload_sst(write_request, upload_request, &write_opts)
.await
.unwrap()
.unwrap();
Expand Down
9 changes: 9 additions & 0 deletions src/mito2/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,15 @@ impl MitoConfig {

Ok(())
}

/// Returns this config with the experimental write cache turned on.
///
/// `path` and `size` are stored as the experimental write cache path and
/// size; every other option is carried over unchanged. Test-only helper.
#[cfg(test)]
pub fn enable_write_cache(self, path: String, size: ReadableSize) -> Self {
    Self {
        enable_experimental_write_cache: true,
        experimental_write_cache_path: path,
        experimental_write_cache_size: size,
        // Keep all remaining settings exactly as they were.
        ..self
    }
}
}

/// Operational mode for certain actions.
Expand Down
43 changes: 43 additions & 0 deletions src/mito2/src/engine/basic_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use std::collections::HashMap;

use api::v1::value::ValueData;
use api::v1::Rows;
use common_base::readable_size::ReadableSize;
use common_error::ext::ErrorExt;
use common_error::status_code::StatusCode;
use common_recordbatch::RecordBatches;
Expand Down Expand Up @@ -555,3 +556,45 @@ async fn test_region_usage() {
// region total usage
assert_eq!(region_stat.disk_usage(), 4072);
}

/// Checks that rows put into a region can be flushed and then scanned back
/// when the engine is created with the experimental write cache enabled.
#[tokio::test]
async fn test_engine_with_write_cache() {
    common_telemetry::init_default_ut_logging();

    // Point the write cache at the test environment's data home.
    let mut env = TestEnv::new();
    let path = env.data_home().to_str().unwrap().to_string();
    let mito_config = MitoConfig::default().enable_write_cache(path, ReadableSize::mb(512));
    let engine = env.create_engine(mito_config).await;

    let region_id = RegionId::new(1, 1);
    let request = CreateRequestBuilder::new().build();

    // Take the schema before `request` is moved into the create call.
    let column_schemas = rows_schema(&request);
    engine
        .handle_request(region_id, RegionRequest::Create(request))
        .await
        .unwrap();

    let rows = Rows {
        schema: column_schemas,
        rows: build_rows_for_key("a", 0, 3, 0),
    };
    put_rows(&engine, region_id, rows).await;

    // Flush the region before scanning it.
    flush_region(&engine, region_id, None).await;

    let request = ScanRequest::default();
    let scanner = engine.scanner(region_id, request).unwrap();

    let stream = scanner.scan().await.unwrap();
    let batches = RecordBatches::try_collect(stream).await.unwrap();
    // Each cell is padded to the column width given by the border rows.
    let expected = "\
+-------+---------+---------------------+
| tag_0 | field_0 | ts                  |
+-------+---------+---------------------+
| a     | 0.0     | 1970-01-01T00:00:00 |
| a     | 1.0     | 1970-01-01T00:00:01 |
| a     | 2.0     | 1970-01-01T00:00:02 |
+-------+---------+---------------------+";
    assert_eq!(expected, batches.pretty_print().unwrap());
}
2 changes: 1 addition & 1 deletion src/mito2/src/sst/parquet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@ use std::sync::Arc;
use common_base::readable_size::ReadableSize;
use parquet::file::metadata::ParquetMetaData;

use super::DEFAULT_WRITE_BUFFER_SIZE;
use crate::sst::file::FileTimeRange;
use crate::sst::DEFAULT_WRITE_BUFFER_SIZE;

/// Key of metadata in parquet SST.
pub const PARQUET_METADATA_KEY: &str = "greptime:metadata";
Expand Down
2 changes: 1 addition & 1 deletion src/operator/src/req_convert/insert/stmt_to_region.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,13 @@ use sql::statements::insert::Insert;
use sqlparser::ast::{ObjectName, Value as SqlValue};
use table::TableRef;

use super::semantic_type;
use crate::error::{
CatalogSnafu, ColumnDataTypeSnafu, ColumnDefaultValueSnafu, ColumnNoneDefaultValueSnafu,
ColumnNotFoundSnafu, InvalidSqlSnafu, MissingInsertBodySnafu, ParseSqlSnafu, Result,
TableNotFoundSnafu,
};
use crate::req_convert::common::partitioner::Partitioner;
use crate::req_convert::insert::semantic_type;

const DEFAULT_PLACEHOLDER_VALUE: &str = "default";

Expand Down