Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/api/build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ fn main() {
"greptime/v1/meta/heartbeat.proto",
"greptime/v1/meta/route.proto",
"greptime/v1/meta/store.proto",
"greptime/v1/meta/cluster.proto",
"prometheus/remote/remote.proto",
],
&["."],
Expand Down
27 changes: 27 additions & 0 deletions src/api/greptime/v1/meta/cluster.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
syntax = "proto3";

package greptime.v1.meta;

import "greptime/v1/meta/common.proto";
import "greptime/v1/meta/store.proto";

// Cluster service is used for communication between meta nodes.
service Cluster {
// Batch get kvs by input keys from leader's in_memory kv store.
rpc BatchGet(GetKvRequest) returns (GetKvResponse);

// Range get the kvs from leader's in_memory kv store.
rpc Range(RangeRequest) returns (RangeResponse);
}

message GetKvRequest {
Comment thread
fengys1996 marked this conversation as resolved.
Outdated
RequestHeader header = 1;

repeated bytes keys = 2;
}

message GetKvResponse {
Comment thread
fengys1996 marked this conversation as resolved.
Outdated
ResponseHeader header = 1;

repeated KeyValue kvs = 2;
}
34 changes: 31 additions & 3 deletions src/meta-srv/src/bootstrap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

use std::sync::Arc;

use api::v1::meta::cluster_server::ClusterServer;
use api::v1::meta::heartbeat_server::HeartbeatServer;
use api::v1::meta::router_server::RouterServer;
use api::v1::meta::store_server::StoreServer;
Expand All @@ -22,10 +23,15 @@ use tokio::net::TcpListener;
use tokio_stream::wrappers::TcpListenerStream;
use tonic::transport::server::Router;

use crate::cluster::MetaPeerClient;
use crate::election::etcd::EtcdElection;
use crate::metasrv::{MetaSrv, MetaSrvOptions};
use crate::metasrv::{MetaSrv, MetaSrvOptions, SelectorRef};
use crate::selector::lease_based::LeaseBasedSelector;
use crate::selector::load_based::LoadBasedSelector;
use crate::selector::SelectorType;
use crate::service::admin;
use crate::service::store::etcd::EtcdStore;
use crate::service::store::kv::ResetableKvStoreRef;
use crate::service::store::memory::MemStore;
use crate::{error, Result};

Expand Down Expand Up @@ -57,6 +63,7 @@ pub fn router(meta_srv: MetaSrv) -> Router {
.add_service(HeartbeatServer::new(meta_srv.clone()))
.add_service(RouterServer::new(meta_srv.clone()))
.add_service(StoreServer::new(meta_srv.clone()))
.add_service(ClusterServer::new(meta_srv.clone()))
.add_service(admin::make_admin_service(meta_srv))
}

Expand All @@ -69,8 +76,29 @@ pub async fn make_meta_srv(opts: MetaSrvOptions) -> Result<MetaSrv> {
Some(EtcdElection::with_endpoints(&opts.server_addr, [&opts.store_addr]).await?),
)
};
let selector = opts.selector.clone().into();
let meta_srv = MetaSrv::new(opts, kv_store, Some(selector), election, None).await;

let mem_kv = Arc::new(MemStore::default()) as ResetableKvStoreRef;
let meta_peer_client = MetaPeerClient::new(mem_kv.clone(), election.clone());

let selector = match opts.selector {
SelectorType::LoadBased => Arc::new(LoadBasedSelector {
meta_peer_client: meta_peer_client.clone(),
}) as SelectorRef,
SelectorType::LeaseBased => Arc::new(LeaseBasedSelector) as SelectorRef,
};

let meta_srv = MetaSrv::new(
opts,
kv_store,
Some(mem_kv),
Some(selector),
election,
None,
Some(meta_peer_client),
)
.await;

meta_srv.start().await;

Ok(meta_srv)
}
232 changes: 232 additions & 0 deletions src/meta-srv/src/cluster.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,232 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;

use api::v1::meta::cluster_client::ClusterClient;
use api::v1::meta::{
GetKvRequest, GetKvResponse, KeyValue, RangeRequest, RangeResponse, ResponseHeader,
};
use common_grpc::channel_manager::ChannelManager;
use snafu::{ensure, OptionExt, ResultExt};

use crate::error::Result;
use crate::keys::{StatKey, StatValue, DN_STAT_PREFIX};
use crate::metasrv::ElectionRef;
use crate::service::store::ext::KvStoreExt;
use crate::service::store::kv::ResetableKvStoreRef;
use crate::{error, util};

#[derive(Clone)]
pub struct MetaPeerClient {
election: Option<ElectionRef>,
in_memory: ResetableKvStoreRef,
channel_manager: ChannelManager,
}

impl MetaPeerClient {
pub fn new(in_mem: ResetableKvStoreRef, election: Option<ElectionRef>) -> Self {
Self {
election,
in_memory: in_mem,
channel_manager: ChannelManager::default(),
}
}

// Get all datanode stat kvs from leader meta.
pub async fn get_all_dn_stat_kvs(&self) -> Result<HashMap<StatKey, StatValue>> {
let stat_prefix = format!("{DN_STAT_PREFIX}-").into_bytes();
let range_end = util::get_prefix_end_key(&stat_prefix);
let req = RangeRequest {
key: stat_prefix.clone(),
range_end,
..Default::default()
};

if self.is_leader() {
let kvs = self.in_memory.range(req).await?.kvs;
return to_stat_kv_map(kvs);
}

// Safety: when self.is_leader() == false, election must not empty.
let election = self.election.as_ref().unwrap();

let leader_addr = election.leader().await?.0;

let channel = self
.channel_manager
.get(leader_addr)
.context(error::CreateChannelSnafu)?;

let request = tonic::Request::new(req);

let response: RangeResponse = ClusterClient::new(channel)
.range(request)
.await
.context(error::BatchGetSnafu)?
.into_inner();

check_resp_header(&response.header)?;

to_stat_kv_map(response.kvs)
}

// Get datanode stat kvs from leader meta by input keys.
pub async fn get_dn_stat_kvs(&self, keys: Vec<StatKey>) -> Result<HashMap<StatKey, StatValue>> {
let stat_keys = keys.into_iter().map(|key| key.into()).collect();
let stat_kvs = self.batch_get(stat_keys).await?;

let mut result = HashMap::with_capacity(stat_kvs.len());
for stat_kv in stat_kvs {
let stat_key = stat_kv.key.try_into()?;
let stat_val = stat_kv.value.try_into()?;
result.insert(stat_key, stat_val);
}
Ok(result)
}

// Get kv information from the leader's in_mem kv store
async fn batch_get(&self, keys: Vec<Vec<u8>>) -> Result<Vec<KeyValue>> {
if self.is_leader() {
return self.in_memory.batch_get(keys).await;
}

// Safety: when self.is_leader() == false, election must not empty.
let election = self.election.as_ref().unwrap();

let leader_addr = election.leader().await?.0;

let channel = self
.channel_manager
.get(leader_addr)
.context(error::CreateChannelSnafu)?;

let request = tonic::Request::new(GetKvRequest {
keys: keys.clone(),
..Default::default()
});

let response: GetKvResponse = ClusterClient::new(channel.clone())
.batch_get(request)
.await
.context(error::BatchGetSnafu)?
.into_inner();

check_resp_header(&response.header)?;

Ok(response.kvs)
}

// Check if the meta node is a leader node.
// Note: when self.election is None, we also consider the meta node is leader
fn is_leader(&self) -> bool {
self.election
.as_ref()
.map(|election| election.is_leader())
.unwrap_or(true)
}
}

fn to_stat_kv_map(kvs: Vec<KeyValue>) -> Result<HashMap<StatKey, StatValue>> {
let mut map = HashMap::with_capacity(kvs.len());
for kv in kvs {
map.insert(kv.key.try_into()?, kv.value.try_into()?);
}
Ok(map)
}

fn check_resp_header(header: &Option<ResponseHeader>) -> Result<()> {
let header = header
.as_ref()
.context(error::ResponseHeaderNotFoundSnafu)?;

ensure!(!header.is_not_leader(), error::IsNotLeaderSnafu);

Ok(())
}

#[cfg(test)]
mod tests {
use api::v1::meta::{Error, ErrorCode, KeyValue, ResponseHeader};

use super::{check_resp_header, to_stat_kv_map};
use crate::error;
use crate::handler::node_stat::Stat;
use crate::keys::{StatKey, StatValue};

#[test]
fn test_to_stat_kv_map() {
let stat_key = StatKey {
cluster_id: 0,
node_id: 100,
};

let stat = Stat {
cluster_id: 0,
id: 100,
addr: "127.0.0.1:3001".to_string(),
is_leader: true,
..Default::default()
};
let stat_val = StatValue { stats: vec![stat] }.try_into().unwrap();

let kv = KeyValue {
key: stat_key.clone().into(),
value: stat_val,
};

let kv_map = to_stat_kv_map(vec![kv]).unwrap();
assert_eq!(1, kv_map.len());
assert!(kv_map.get(&stat_key).is_some());

let stat_val = kv_map.get(&stat_key).unwrap();
let stat = stat_val.stats.get(0).unwrap();

assert_eq!(0, stat.cluster_id);
assert_eq!(100, stat.id);
assert_eq!("127.0.0.1:3001", stat.addr);
assert!(stat.is_leader);
}

#[test]
fn test_check_resp_header() {
let header = Some(ResponseHeader {
error: None,
..Default::default()
});
let result = check_resp_header(&header);
assert!(result.is_ok());

let result = check_resp_header(&None);
assert!(result.is_err());
assert!(matches!(
result.err().unwrap(),
error::Error::ResponseHeaderNotFound { .. }
));

let header = Some(ResponseHeader {
error: Some(Error {
code: ErrorCode::NotLeader as i32,
err_msg: "The current meta is not leader".to_string(),
}),
..Default::default()
});
let result = check_resp_header(&header);
assert!(result.is_err());
assert!(matches!(
result.err().unwrap(),
error::Error::IsNotLeader { .. }
));
}
}
33 changes: 33 additions & 0 deletions src/meta-srv/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,33 @@ pub enum Error {
#[snafu(backtrace)]
source: BoxedError,
},

#[snafu(display("Invalid KVs length, expect: {}", expect))]
InvalidKvsLength { expect: usize, backtrace: Backtrace },
Comment thread
fengys1996 marked this conversation as resolved.
Outdated

#[snafu(display("Failed to create gRPC channel, source: {}", source))]
CreateChannel {
#[snafu(backtrace)]
source: common_grpc::error::Error,
},

#[snafu(display(
"Failed to batch get KVs from leader's in_memory kv store, source: {}",
source
))]
BatchGet {
source: tonic::Status,
backtrace: Backtrace,
},

#[snafu(display("Response header not found"))]
ResponseHeaderNotFound { backtrace: Backtrace },

#[snafu(display("The requested meta node is not leader"))]
IsNotLeader { backtrace: Backtrace },
Comment thread
fengys1996 marked this conversation as resolved.
Outdated

#[snafu(display("MetaSrv has no meta peer client"))]
NoMetaPeerClient { backtrace: Backtrace },
}

pub type Result<T> = std::result::Result<T, Error>;
Expand Down Expand Up @@ -194,6 +221,11 @@ impl ErrorExt for Error {
| Error::DeserializeFromJson { .. }
| Error::DecodeTableRoute { .. }
| Error::NoLeader { .. }
| Error::CreateChannel { .. }
| Error::BatchGet { .. }
| Error::ResponseHeaderNotFound { .. }
| Error::IsNotLeader { .. }
| Error::NoMetaPeerClient { .. }
| Error::StartGrpc { .. } => StatusCode::Internal,
Error::EmptyKey { .. }
| Error::EmptyTableName { .. }
Expand All @@ -210,6 +242,7 @@ impl ErrorExt for Error {
| Error::TableRouteNotFound { .. }
| Error::NextSequence { .. }
| Error::MoveValue { .. }
| Error::InvalidKvsLength { .. }
| Error::InvalidTxnResult { .. } => StatusCode::Unexpected,
Error::TableNotFound { .. } => StatusCode::TableNotFound,
Error::InvalidCatalogValue { source, .. } => source.status_code(),
Expand Down
Loading