Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions config/datanode.example.toml
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,25 @@ parallel_scan_channel_size = 32
# Whether to allow stale WAL entries read during replay.
allow_stale_entries = false

[region_engine.mito.inverted_index]
# Whether to create the index on flush.
# - "auto": automatically
Comment thread
fengjiachun marked this conversation as resolved.
# - "disable": never
create_on_flush = "auto"
# Whether to create the index on compaction.
# - "auto": automatically
# - "disable": never
create_on_compaction = "auto"
# Whether to apply the index on query
# - "auto": automatically
# - "disable": never
apply_on_query = "auto"
# Memory threshold for performing an external sort during index creation.
# Setting to empty will disable external sorting, forcing all sorting operations to happen in memory.
mem_threshold_on_create = "64MB"
# File system path to store intermediate files for external sorting (default `{data_home}/index_intermediate`).
intermediate_path = ""

# Log options, see `standalone.example.toml`
# [logging]
# dir = "/tmp/greptimedb/logs"
Expand Down
19 changes: 19 additions & 0 deletions config/standalone.example.toml
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,25 @@ parallel_scan_channel_size = 32
# Whether to allow stale WAL entries read during replay.
allow_stale_entries = false

[region_engine.mito.inverted_index]
# Whether to create the index on flush.
# - "auto": automatically
# - "disable": never
create_on_flush = "auto"
# Whether to create the index on compaction.
# - "auto": automatically
# - "disable": never
create_on_compaction = "auto"
# Whether to apply the index on query
# - "auto": automatically
# - "disable": never
apply_on_query = "auto"
# Memory threshold for performing an external sort during index creation.
# Setting to empty will disable external sorting, forcing all sorting operations to happen in memory.
mem_threshold_on_create = "64M"
# File system path to store intermediate files for external sorting (default `{data_home}/index_intermediate`).
intermediate_path = ""

# Log options
# [logging]
# Specify logs directory.
Expand Down
1 change: 1 addition & 0 deletions src/datanode/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -284,6 +284,7 @@ impl DatanodeOptions {
}
}

#[allow(clippy::large_enum_variant)]
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
pub enum RegionEngineConfig {
#[serde(rename = "mito")]
Expand Down
16 changes: 5 additions & 11 deletions src/datanode/src/datanode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ use metric_engine::engine::MetricEngine;
use mito2::config::MitoConfig;
use mito2::engine::MitoEngine;
use object_store::manager::{ObjectStoreManager, ObjectStoreManagerRef};
use object_store::util::{join_dir, normalize_dir};
use object_store::util::normalize_dir;
use query::QueryEngineFactory;
use servers::export_metrics::ExportMetricsTask;
use servers::grpc::builder::GrpcServerBuilder;
Expand Down Expand Up @@ -457,27 +457,21 @@ impl DatanodeBuilder {
async fn build_mito_engine(
opts: &DatanodeOptions,
object_store_manager: ObjectStoreManagerRef,
mut config: MitoConfig,
config: MitoConfig,
) -> Result<MitoEngine> {
// Sets write cache path if it is empty.
if config.experimental_write_cache_path.is_empty() {
config.experimental_write_cache_path = join_dir(&opts.storage.data_home, "write_cache");
info!(
"Sets write cache path to {}",
config.experimental_write_cache_path
);
}

let mito_engine = match &opts.wal {
WalConfig::RaftEngine(raft_engine_config) => MitoEngine::new(
&opts.storage.data_home,
config,
Self::build_raft_engine_log_store(&opts.storage.data_home, raft_engine_config)
.await?,
object_store_manager,
)
.await
.context(BuildMitoEngineSnafu)?,

WalConfig::Kafka(kafka_config) => MitoEngine::new(
&opts.storage.data_home,
config,
Self::build_kafka_log_store(kafka_config).await?,
object_store_manager,
Expand Down
2 changes: 1 addition & 1 deletion src/index/src/inverted_index/create.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use crate::inverted_index::BytesRef;

/// `InvertedIndexCreator` provides functionality to construct an inverted index
#[async_trait]
pub trait InvertedIndexCreator {
pub trait InvertedIndexCreator: Send {
/// Adds a value to the named index. A `None` value represents an absence of data (null)
///
/// - `index_name`: Identifier for the index being built
Expand Down
2 changes: 0 additions & 2 deletions src/index/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,5 @@
// limitations under the License.

#![feature(iter_partition_in_place)]
// TODO(zhongzc): remove once further code is added
#![allow(dead_code)]

pub mod inverted_index;
50 changes: 37 additions & 13 deletions src/mito2/src/access_layer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ use crate::cache::CacheManagerRef;
use crate::error::{CleanDirSnafu, DeleteIndexSnafu, DeleteSstSnafu, OpenDalSnafu, Result};
use crate::read::Source;
use crate::sst::file::{FileHandle, FileId, FileMeta};
use crate::sst::index::intermediate::IntermediateManager;
use crate::sst::index::IndexerBuilder;
use crate::sst::location;
use crate::sst::parquet::reader::ParquetReaderBuilder;
use crate::sst::parquet::writer::ParquetWriter;
Expand All @@ -37,6 +39,8 @@ pub struct AccessLayer {
region_dir: String,
/// Target object store.
object_store: ObjectStore,
/// Intermediate manager for inverted index.
intermediate_manager: IntermediateManager,
}

impl std::fmt::Debug for AccessLayer {
Expand All @@ -49,10 +53,15 @@ impl std::fmt::Debug for AccessLayer {

impl AccessLayer {
/// Returns a new [AccessLayer] for specific `region_dir`.
pub fn new(region_dir: impl Into<String>, object_store: ObjectStore) -> AccessLayer {
pub fn new(
region_dir: impl Into<String>,
object_store: ObjectStore,
intermediate_manager: IntermediateManager,
) -> AccessLayer {
AccessLayer {
region_dir: region_dir.into(),
object_store,
intermediate_manager,
}
}

Expand Down Expand Up @@ -105,16 +114,15 @@ impl AccessLayer {
let file_path = location::sst_file_path(&self.region_dir, request.file_id);
let index_file_path = location::index_file_path(&self.region_dir, request.file_id);
let region_id = request.metadata.region_id;
let file_id = request.file_id;
let cache_manager = request.cache_manager.clone();

let sst_info = if let Some(write_cache) = request.cache_manager.write_cache() {
let sst_info = if let Some(write_cache) = cache_manager.write_cache() {
// Write to the write cache.
write_cache
.write_and_upload_sst(
request,
SstUploadRequest {
file_id: request.file_id,
metadata: request.metadata,
source: request.source,
storage: request.storage,
upload_path: file_path,
index_upload_path: index_file_path,
remote_store: self.object_store.clone(),
Expand All @@ -124,19 +132,30 @@ impl AccessLayer {
.await?
} else {
// Write cache is disabled.
let mut writer =
ParquetWriter::new(file_path, request.metadata, self.object_store.clone());
let indexer = IndexerBuilder {
create_inverted_index: request.create_inverted_index,
mem_threshold_index_create: request.mem_threshold_index_create,
file_id,
file_path: index_file_path,
metadata: &request.metadata,
row_group_size: write_opts.row_group_size,
object_store: self.object_store.clone(),
intermediate_manager: self.intermediate_manager.clone(),
}
.build();
let mut writer = ParquetWriter::new(
file_path,
request.metadata,
self.object_store.clone(),
indexer,
);
writer.write_all(request.source, write_opts).await?
};

// Put parquet metadata to cache manager.
if let Some(sst_info) = &sst_info {
if let Some(parquet_metadata) = &sst_info.file_metadata {
request.cache_manager.put_parquet_meta_data(
region_id,
request.file_id,
parquet_metadata.clone(),
)
cache_manager.put_parquet_meta_data(region_id, file_id, parquet_metadata.clone())
}
}

Expand All @@ -150,7 +169,12 @@ pub(crate) struct SstWriteRequest {
pub(crate) metadata: RegionMetadataRef,
pub(crate) source: Source,
pub(crate) cache_manager: CacheManagerRef,
#[allow(dead_code)]
pub(crate) storage: Option<String>,
/// Whether to create inverted index.
pub(crate) create_inverted_index: bool,
/// The threshold of memory size to create inverted index.
pub(crate) mem_threshold_index_create: Option<usize>,
}

/// Creates a fs object store with atomic write dir.
Expand Down
Loading