Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ etcd-client = "0.14"
fst = "0.4.7"
futures = "0.3"
futures-util = "0.3"
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "dd4a1996982534636734674db66e44464b0c0d83" }
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "583daa3fbbbe39c90b7b92d13646bc3291d9c941" }
hex = "0.4"
http = "1"
humantime = "2.1"
Expand Down
4 changes: 3 additions & 1 deletion src/datanode/src/region_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -902,7 +902,9 @@ impl RegionServerInner {
RegionChange::Register(attribute)
}
RegionRequest::Close(_) | RegionRequest::Drop(_) => RegionChange::Deregisters,
RegionRequest::Put(_) | RegionRequest::Delete(_) => RegionChange::Ingest,
RegionRequest::Put(_) | RegionRequest::Delete(_) | RegionRequest::BulkInserts(_) => {
RegionChange::Ingest
}
RegionRequest::Alter(_)
| RegionRequest::Flush(_)
| RegionRequest::Compact(_)
Expand Down
4 changes: 4 additions & 0 deletions src/metric-engine/src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,10 @@ impl RegionEngine for MetricEngine {
}
}
RegionRequest::Catchup(req) => self.inner.catchup_region(region_id, req).await,
RegionRequest::BulkInserts(_) => {
// todo(hl): find a way to support bulk inserts in metric engine.
UnsupportedRegionRequestSnafu { request }.fail()
}
};

result.map_err(BoxedError::new).map(|rows| RegionResponse {
Expand Down
12 changes: 12 additions & 0 deletions src/mito2/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1021,6 +1021,17 @@ pub enum Error {
#[snafu(implicit)]
location: Location,
},

#[snafu(display(
"Failed to convert ConcreteDataType to ColumnDataType: {:?}",
data_type
))]
ConvertDataType {
data_type: ConcreteDataType,
source: api::error::Error,
#[snafu(implicit)]
location: Location,
},
}

pub type Result<T, E = Error> = std::result::Result<T, E>;
Expand Down Expand Up @@ -1172,6 +1183,7 @@ impl ErrorExt for Error {
ManualCompactionOverride {} => StatusCode::Cancelled,

IncompatibleWalProviderChange { .. } => StatusCode::InvalidArguments,
ConvertDataType { .. } => StatusCode::Internal,
}
}

Expand Down
18 changes: 15 additions & 3 deletions src/mito2/src/request.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,9 @@ use store_api::manifest::ManifestVersion;
use store_api::metadata::{ColumnMetadata, RegionMetadata, RegionMetadataRef};
use store_api::region_engine::{SetRegionRoleStateResponse, SettableRegionRoleState};
use store_api::region_request::{
AffectedRows, RegionAlterRequest, RegionCatchupRequest, RegionCloseRequest,
RegionCompactRequest, RegionCreateRequest, RegionFlushRequest, RegionOpenRequest,
RegionRequest, RegionTruncateRequest,
AffectedRows, RegionAlterRequest, RegionBulkInsertsRequest, RegionCatchupRequest,
RegionCloseRequest, RegionCompactRequest, RegionCreateRequest, RegionFlushRequest,
RegionOpenRequest, RegionRequest, RegionTruncateRequest,
};
use store_api::storage::{RegionId, SequenceNumber};
use tokio::sync::oneshot::{self, Receiver, Sender};
Expand Down Expand Up @@ -569,6 +569,13 @@ pub(crate) enum WorkerRequest {

/// Keep the manifest of a region up to date.
SyncRegion(RegionSyncRequest),

/// Bulk inserts request and region metadata.
BulkInserts {
metadata: Option<RegionMetadataRef>,
request: RegionBulkInsertsRequest,
sender: OptionOutputTx,
},
}

impl WorkerRequest {
Expand Down Expand Up @@ -668,6 +675,11 @@ impl WorkerRequest {
sender: sender.into(),
request: DdlRequest::Catchup(v),
}),
RegionRequest::BulkInserts(region_bulk_inserts_request) => WorkerRequest::BulkInserts {
metadata: region_metadata,
sender: sender.into(),
request: region_bulk_inserts_request,
},
};

Ok((worker_request, receiver))
Expand Down
25 changes: 21 additions & 4 deletions src/mito2/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
//! Structs and utilities for writing regions.

mod handle_alter;
mod handle_bulk_insert;
mod handle_catchup;
mod handle_close;
mod handle_compaction;
Expand All @@ -25,6 +26,7 @@ mod handle_manifest;
mod handle_open;
mod handle_truncate;
mod handle_write;

use std::collections::HashMap;
use std::path::Path;
use std::sync::atomic::{AtomicBool, Ordering};
Expand Down Expand Up @@ -52,6 +54,7 @@ use crate::cache::write_cache::{WriteCache, WriteCacheRef};
use crate::cache::{CacheManager, CacheManagerRef};
use crate::compaction::CompactionScheduler;
use crate::config::MitoConfig;
use crate::error;
use crate::error::{CreateDirSnafu, JoinSnafu, Result, WorkerStoppedSnafu};
use crate::flush::{FlushScheduler, WriteBufferManagerImpl, WriteBufferManagerRef};
use crate::memtable::MemtableBuilderProvider;
Expand Down Expand Up @@ -820,16 +823,30 @@ impl<S: LogStore> RegionWorkerLoop<S> {
WorkerRequest::EditRegion(request) => {
self.handle_region_edit(request).await;
}
// We receive a stop signal, but we still want to process remaining
// requests. The worker thread will then check the running flag and
// then exit.
WorkerRequest::Stop => {
debug_assert!(!self.running.load(Ordering::Relaxed));
}

WorkerRequest::SyncRegion(req) => {
self.handle_region_sync(req).await;
}
WorkerRequest::BulkInserts {
metadata,
request,
sender,
} => {
if let Some(region_metadata) = metadata {
self.handle_bulk_inserts(request, region_metadata, write_requests, sender)
.await;
} else {
Comment thread
v0y4g3r marked this conversation as resolved.
error!("Cannot find region metadata for {}", request.region_id);
sender.send(
error::RegionNotFoundSnafu {
region_id: request.region_id,
}
.fail(),
);
}
}
}
}

Expand Down
Loading
Loading