Skip to content
Merged
Show file tree
Hide file tree
Changes from 17 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
157 changes: 157 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion src/catalog/src/system.rs
Original file line number Diff line number Diff line change
Expand Up @@ -367,7 +367,7 @@ pub struct TableEntryValue {

#[cfg(test)]
mod tests {
use log_store::fs::noop::NoopLogStore;
use log_store::NoopLogStore;
use mito::config::EngineConfig;
use mito::engine::MitoEngine;
use object_store::ObjectStore;
Expand Down
25 changes: 10 additions & 15 deletions src/datanode/src/instance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ use catalog::remote::MetaKvBackend;
use catalog::CatalogManagerRef;
use common_grpc::channel_manager::{ChannelConfig, ChannelManager};
use common_telemetry::logging::info;
use log_store::fs::config::LogConfig;
use log_store::fs::log::LocalFileLogStore;
use log_store::raft_engine::log_store::RaftEngineLogstore;
use log_store::LogConfig;
use meta_client::client::{MetaClient, MetaClientBuilder};
use meta_client::MetaClientOpts;
use mito::config::EngineConfig as TableEngineConfig;
Expand All @@ -42,7 +42,7 @@ use table::table::TableIdProviderRef;
use crate::datanode::{DatanodeOptions, ObjectStoreConfig};
use crate::error::{
self, CatalogSnafu, MetaClientInitSnafu, MissingMetasrvOptsSnafu, MissingNodeIdSnafu,
NewCatalogSnafu, Result, StartLogStoreSnafu,
NewCatalogSnafu, OpenLogStoreSnafu, Result, StartLogStoreSnafu,
Comment thread
v0y4g3r marked this conversation as resolved.
};
use crate::heartbeat::HeartbeatTask;
use crate::script::ScriptExecutor;
Expand All @@ -53,7 +53,7 @@ mod grpc;
mod script;
mod sql;

pub(crate) type DefaultEngine = MitoEngine<EngineImpl<LocalFileLogStore>>;
pub(crate) type DefaultEngine = MitoEngine<EngineImpl<RaftEngineLogstore>>;

// An abstraction to read/write services.
pub struct Instance {
Expand All @@ -63,15 +63,15 @@ pub struct Instance {
pub(crate) script_executor: ScriptExecutor,
pub(crate) table_id_provider: Option<TableIdProviderRef>,
pub(crate) heartbeat_task: Option<HeartbeatTask>,
pub(crate) logstore: Arc<LocalFileLogStore>,
pub(crate) logstore: Arc<RaftEngineLogstore>,
Comment thread
waynexia marked this conversation as resolved.
Outdated
}

pub type InstanceRef = Arc<Instance>;

impl Instance {
pub async fn new(opts: &DatanodeOptions) -> Result<Self> {
let object_store = new_object_store(&opts.storage).await?;
let logstore = Arc::new(create_local_file_log_store(&opts.wal_dir).await?);
let logstore = Arc::new(create_log_store(&opts.wal_dir).await?);

let meta_client = match opts.mode {
Mode::Standalone => None,
Expand Down Expand Up @@ -166,11 +166,11 @@ impl Instance {
}

pub async fn start(&self) -> Result<()> {
self.logstore.start().await.context(StartLogStoreSnafu)?;
self.catalog_manager
.start()
.await
.context(NewCatalogSnafu)?;
self.logstore.start().await.context(StartLogStoreSnafu)?;
if let Some(task) = &self.heartbeat_task {
task.start().await?;
}
Expand Down Expand Up @@ -293,9 +293,7 @@ async fn new_metasrv_client(node_id: u64, meta_config: &MetaClientOpts) -> Resul
Ok(meta_client)
}

pub(crate) async fn create_local_file_log_store(
path: impl AsRef<str>,
) -> Result<LocalFileLogStore> {
pub(crate) async fn create_log_store(path: impl AsRef<str>) -> Result<RaftEngineLogstore> {
let path = path.as_ref();
// create WAL directory
fs::create_dir_all(path::Path::new(path)).context(error::CreateDirSnafu { dir: path })?;
Expand All @@ -307,9 +305,6 @@ pub(crate) async fn create_local_file_log_store(
..Default::default()
};

let log_store = LocalFileLogStore::open(&log_config)
.await
.context(error::OpenLogStoreSnafu)?;

Ok(log_store)
let logstore = RaftEngineLogstore::try_new(log_config).context(OpenLogStoreSnafu)?;
Ok(logstore)
}
4 changes: 2 additions & 2 deletions src/datanode/src/mock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ use table::table::TableIdProvider;
use crate::datanode::DatanodeOptions;
use crate::error::Result;
use crate::heartbeat::HeartbeatTask;
use crate::instance::{create_local_file_log_store, new_object_store, DefaultEngine, Instance};
use crate::instance::{create_log_store, new_object_store, DefaultEngine, Instance};
use crate::script::ScriptExecutor;
use crate::sql::SqlHandler;

Expand All @@ -41,7 +41,7 @@ impl Instance {

pub async fn with_mock_meta_server(opts: &DatanodeOptions, meta_srv: MockInfo) -> Result<Self> {
let object_store = new_object_store(&opts.storage).await?;
let logstore = Arc::new(create_local_file_log_store(&opts.wal_dir).await?);
let logstore = Arc::new(create_log_store(&opts.wal_dir).await?);
let meta_client = Arc::new(mock_meta_client(meta_srv, opts.node_id.unwrap_or(42)).await);
let table_engine = Arc::new(DefaultEngine::new(
TableEngineConfig::default(),
Expand Down
2 changes: 1 addition & 1 deletion src/datanode/src/sql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ mod tests {
use datatypes::prelude::ConcreteDataType;
use datatypes::schema::{ColumnSchema, SchemaBuilder, SchemaRef};
use datatypes::value::Value;
use log_store::fs::noop::NoopLogStore;
use log_store::NoopLogStore;
use mito::config::EngineConfig as TableEngineConfig;
use mito::engine::MitoEngine;
use object_store::services::fs::Builder;
Expand Down
11 changes: 9 additions & 2 deletions src/log-store/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,15 @@ version.workspace = true
edition.workspace = true
license.workspace = true

[build-dependencies]
protobuf-build = { version = "0.14", default-features = false, features = [
"protobuf-codec",
] }

[dependencies]
async-trait.workspace = true
arc-swap = "1.5"
async-stream.workspace = true
async-trait.workspace = true
base64 = "0.13"
byteorder = "1.4"
bytes = "1.1"
Expand All @@ -16,9 +21,11 @@ common-error = { path = "../common/error" }
common-runtime = { path = "../common/runtime" }
common-telemetry = { path = "../common/telemetry" }
crc = "3.0"
futures.workspace = true
futures-util = "0.3"
futures.workspace = true
hex = "0.4"
protobuf = { version = "2", features = ["bytes"] }
raft-engine = "0.3"
snafu = { version = "0.7", features = ["backtraces"] }
store-api = { path = "../store-api" }
tempdir = "0.3"
Expand Down
Loading