Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions src/meta-srv/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ common-runtime = { path = "../common/runtime" }
common-telemetry = { path = "../common/telemetry" }
common-time = { path = "../common/time" }
dashmap = "5.4"
derive_builder = "0.12"
etcd-client = "0.10"
futures.workspace = true
h2 = "0.3"
Expand Down
10 changes: 8 additions & 2 deletions src/meta-srv/src/bootstrap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use tokio::net::TcpListener;
use tokio_stream::wrappers::TcpListenerStream;
use tonic::transport::server::Router;

use crate::cluster::MetaPeerClient;
use crate::cluster::MetaPeerClientBuilder;
use crate::election::etcd::EtcdElection;
use crate::lock::etcd::EtcdLock;
use crate::metasrv::builder::MetaSrvBuilder;
Expand Down Expand Up @@ -91,7 +91,13 @@ pub async fn make_meta_srv(opts: MetaSrvOptions) -> Result<MetaSrv> {
};

let in_memory = Arc::new(MemStore::default()) as ResetableKvStoreRef;
let meta_peer_client = MetaPeerClient::new(in_memory.clone(), election.clone());

let meta_peer_client = MetaPeerClientBuilder::default()
.election(election.clone())
.in_memory(in_memory.clone())
.build()
// Safety: all required fields set at initialization
.unwrap();

let selector = match opts.selector {
SelectorType::LoadBased => Arc::new(LoadBasedSelector {
Expand Down
144 changes: 105 additions & 39 deletions src/meta-srv/src/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,52 +13,93 @@
// limitations under the License.

use std::collections::HashMap;
use std::time::Duration;

use api::v1::meta::cluster_client::ClusterClient;
use api::v1::meta::{
BatchGetRequest, BatchGetResponse, KeyValue, RangeRequest, RangeResponse, ResponseHeader,
};
use common_grpc::channel_manager::ChannelManager;
use common_telemetry::warn;
use derive_builder::Builder;
use snafu::{ensure, OptionExt, ResultExt};

use crate::error::Result;
use crate::error::{match_for_io_error, Result};
use crate::keys::{StatKey, StatValue, DN_STAT_PREFIX};
use crate::metasrv::ElectionRef;
use crate::service::store::ext::KvStoreExt;
use crate::service::store::kv::ResetableKvStoreRef;
use crate::{error, util};

#[derive(Clone)]
/// Client for reading cluster state (e.g. datanode stat kvs) held by the
/// leader metasrv: served directly from the local in-memory kv store when
/// this node is the leader, otherwise fetched from the leader over gRPC
/// with bounded retries.
///
/// Constructed via the derive_builder-generated `MetaPeerClientBuilder`;
/// `election` and `in_memory` are required, the remaining fields default.
#[derive(Builder, Clone)]
pub struct MetaPeerClient {
    // Election handle used to decide leadership and locate the leader.
    // NOTE(review): presumably `None` only when running without an
    // election — confirm against `is_leader` and call sites.
    election: Option<ElectionRef>,
    // Local in-memory kv store; queried directly when this node leads.
    in_memory: ResetableKvStoreRef,
    // Manages gRPC channels to the leader address.
    #[builder(default = "ChannelManager::default()")]
    channel_manager: ChannelManager,
    // Maximum attempts for a retryable remote call (see `range`/`batch_get`).
    #[builder(default = "3")]
    max_retry_count: usize,
    // Sleep between retry attempts, in milliseconds.
    #[builder(default = "1000")]
    retry_interval_ms: u64,
}

impl MetaPeerClient {
pub fn new(in_mem: ResetableKvStoreRef, election: Option<ElectionRef>) -> Self {
Self {
election,
in_memory: in_mem,
channel_manager: ChannelManager::default(),
}
}

// Get all datanode stat kvs from leader meta.
//
// Builds the `DN_STAT_PREFIX`-based prefix scan range, fetches the raw
// key-values via `range` (local or remote depending on leadership), and
// decodes them into typed `StatKey`/`StatValue` pairs.
//
// Errors: propagates any `range` failure or decode error from
// `to_stat_kv_map`.
pub async fn get_all_dn_stat_kvs(&self) -> Result<HashMap<StatKey, StatValue>> {
    let key = format!("{DN_STAT_PREFIX}-").into_bytes();
    // End of the half-open scan interval covering every key with the prefix.
    let range_end = util::get_prefix_end_key(&key);

    let kvs = self.range(key, range_end).await?;

    to_stat_kv_map(kvs)
}

// Get datanode stat kvs from leader meta by input keys.
//
// Converts the typed stat keys into raw byte keys, performs a
// (local-or-remote) batch lookup, and decodes the results back into a
// `StatKey -> StatValue` map.
pub async fn get_dn_stat_kvs(&self, keys: Vec<StatKey>) -> Result<HashMap<StatKey, StatValue>> {
    let raw_keys: Vec<_> = keys.into_iter().map(Into::into).collect();

    to_stat_kv_map(self.batch_get(raw_keys).await?)
}

// Range kv information from the leader's in_mem kv store
pub async fn range(&self, key: Vec<u8>, range_end: Vec<u8>) -> Result<Vec<KeyValue>> {
if self.is_leader() {
let kvs = self.in_memory.range(req).await?.kvs;
return to_stat_kv_map(kvs);
let request = RangeRequest {
key,
range_end,
..Default::default()
};

return self.in_memory.range(request).await.map(|resp| resp.kvs);
}

let max_retry_count = self.max_retry_count;
let retry_interval_ms = self.retry_interval_ms;

for _ in 0..max_retry_count {
match self.remote_range(key.clone(), range_end.clone()).await {
Ok(kvs) => return Ok(kvs),
Err(e) => {
if need_retry(&e) {
warn!("Encountered an error that need to retry, err: {:?}", e);
tokio::time::sleep(Duration::from_millis(retry_interval_ms)).await;
} else {
return Err(e);
}
}
}
}

error::ExceededRetryLimitSnafu {
func_name: "range",
retry_num: max_retry_count,
}
.fail()
}

async fn remote_range(&self, key: Vec<u8>, range_end: Vec<u8>) -> Result<Vec<KeyValue>> {
// Safety: when self.is_leader() == false, election must not empty.
let election = self.election.as_ref().unwrap();

Expand All @@ -69,39 +110,54 @@ impl MetaPeerClient {
.get(&leader_addr)
.context(error::CreateChannelSnafu)?;

let request = tonic::Request::new(req);
let request = tonic::Request::new(RangeRequest {
key,
range_end,
..Default::default()
});

let response: RangeResponse = ClusterClient::new(channel)
.range(request)
.await
.context(error::BatchGetSnafu)?
.context(error::RangeSnafu)?
.into_inner();

check_resp_header(&response.header, Context { addr: &leader_addr })?;

to_stat_kv_map(response.kvs)
}

// Get datanode stat kvs from leader meta by input keys.
pub async fn get_dn_stat_kvs(&self, keys: Vec<StatKey>) -> Result<HashMap<StatKey, StatValue>> {
let stat_keys = keys.into_iter().map(|key| key.into()).collect();
let stat_kvs = self.batch_get(stat_keys).await?;

let mut result = HashMap::with_capacity(stat_kvs.len());
for stat_kv in stat_kvs {
let stat_key = stat_kv.key.try_into()?;
let stat_val = stat_kv.value.try_into()?;
result.insert(stat_key, stat_val);
}
Ok(result)
Ok(response.kvs)
}

// Get kv information from the leader's in_mem kv store
async fn batch_get(&self, keys: Vec<Vec<u8>>) -> Result<Vec<KeyValue>> {
pub async fn batch_get(&self, keys: Vec<Vec<u8>>) -> Result<Vec<KeyValue>> {
if self.is_leader() {
return self.in_memory.batch_get(keys).await;
}

let max_retry_count = self.max_retry_count;
let retry_interval_ms = self.retry_interval_ms;

for _ in 0..max_retry_count {
match self.remote_batch_get(keys.clone()).await {
Ok(kvs) => return Ok(kvs),
Err(e) => {
if need_retry(&e) {
warn!("Encountered an error that need to retry, err: {:?}", e);
tokio::time::sleep(Duration::from_millis(retry_interval_ms)).await;
} else {
return Err(e);
}
}
}
}

error::ExceededRetryLimitSnafu {
func_name: "batch_get",
retry_num: max_retry_count,
}
.fail()
}

async fn remote_batch_get(&self, keys: Vec<Vec<u8>>) -> Result<Vec<KeyValue>> {
// Safety: when self.is_leader() == false, election must not empty.
let election = self.election.as_ref().unwrap();

Expand All @@ -113,11 +169,11 @@ impl MetaPeerClient {
.context(error::CreateChannelSnafu)?;

let request = tonic::Request::new(BatchGetRequest {
keys: keys.clone(),
keys,
..Default::default()
});

let response: BatchGetResponse = ClusterClient::new(channel.clone())
let response: BatchGetResponse = ClusterClient::new(channel)
.batch_get(request)
.await
.context(error::BatchGetSnafu)?
Expand Down Expand Up @@ -165,6 +221,16 @@ fn check_resp_header(header: &Option<ResponseHeader>, ctx: Context) -> Result<()
Ok(())
}

fn need_retry(error: &error::Error) -> bool {
match error {
error::Error::IsNotLeader { .. } => true,
error::Error::Range { source, .. } | error::Error::BatchGet { source, .. } => {
match_for_io_error(source).is_some()
}
_ => false,
}
}

#[cfg(test)]
mod tests {
use api::v1::meta::{Error, ErrorCode, KeyValue, ResponseHeader};
Expand Down
22 changes: 22 additions & 0 deletions src/meta-srv/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,15 @@ pub enum Error {
backtrace: Backtrace,
},

#[snafu(display(
"Failed to batch range KVs from leader's in_memory kv store, source: {}",
source
))]
Range {
source: tonic::Status,
backtrace: Backtrace,
},

#[snafu(display("Response header not found"))]
ResponseHeaderNotFound { backtrace: Backtrace },

Expand All @@ -213,6 +222,17 @@ pub enum Error {
backtrace: Backtrace,
},

#[snafu(display(
"The number of retries for the grpc call {} exceeded the limit, {}",
func_name,
retry_num
))]
ExceededRetryLimit {
func_name: String,
retry_num: usize,
backtrace: Backtrace,
},

#[snafu(display("An error occurred in Meta, source: {}", source))]
MetaInternal {
#[snafu(backtrace)]
Expand Down Expand Up @@ -271,6 +291,7 @@ impl ErrorExt for Error {
| Error::NoLeader { .. }
| Error::CreateChannel { .. }
| Error::BatchGet { .. }
| Error::Range { .. }
| Error::ResponseHeaderNotFound { .. }
| Error::IsNotLeader { .. }
| Error::NoMetaPeerClient { .. }
Expand All @@ -279,6 +300,7 @@ impl ErrorExt for Error {
| Error::Unlock { .. }
| Error::LeaseGrant { .. }
| Error::LockNotConfig { .. }
| Error::ExceededRetryLimit { .. }
| Error::StartGrpc { .. } => StatusCode::Internal,
Error::EmptyKey { .. }
| Error::EmptyTableName { .. }
Expand Down
5 changes: 5 additions & 0 deletions src/meta-srv/src/service/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use api::v1::meta::{
cluster_server, BatchGetRequest, BatchGetResponse, Error, RangeRequest, RangeResponse,
ResponseHeader,
};
use common_telemetry::warn;
use tonic::{Request, Response};

use crate::metasrv::MetaSrv;
Expand All @@ -31,6 +32,8 @@ impl cluster_server::Cluster for MetaSrv {
header: Some(is_not_leader),
..Default::default()
};

warn!("The current meta is not leader, but a batch_get request have reached the meta. Detail: {:?}.", req);
return Ok(Response::new(resp));
}

Expand All @@ -53,6 +56,8 @@ impl cluster_server::Cluster for MetaSrv {
header: Some(is_not_leader),
..Default::default()
};

warn!("The current meta is not leader, but a range request have reached the meta. Detail: {:?}.", req);
return Ok(Response::new(resp));
}

Expand Down