Skip to content
Merged
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ etcd-client = "0.14"
fst = "0.4.7"
futures = "0.3"
futures-util = "0.3"
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "072ce580502e015df1a6b03a185b60309a7c2a7a" }
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "6ee24435642d0a6360ddb481152e8453182400d0" }
hex = "0.4"
http = "1"
humantime = "2.1"
Expand Down
12 changes: 4 additions & 8 deletions src/cmd/src/datanode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -287,7 +287,6 @@ impl StartCommand {
.await
.context(StartDatanodeSnafu)?;

let cluster_id = 0; // TODO(hl): read from config
let member_id = opts
.node_id
.context(MissingConfigSnafu { msg: "'node_id'" })?;
Expand All @@ -296,13 +295,10 @@ impl StartCommand {
msg: "'meta_client_options'",
})?;

let meta_client = meta_client::create_meta_client(
cluster_id,
MetaClientType::Datanode { member_id },
meta_config,
)
.await
.context(MetaClientInitSnafu)?;
let meta_client =
meta_client::create_meta_client(MetaClientType::Datanode { member_id }, meta_config)
.await
.context(MetaClientInitSnafu)?;

let meta_backend = Arc::new(MetaKvBackend {
client: meta_client.clone(),
Expand Down
14 changes: 4 additions & 10 deletions src/cmd/src/flownode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -241,9 +241,6 @@ impl StartCommand {
let mut opts = opts.component;
opts.grpc.detect_server_addr();

// TODO(discord9): make it not optionale after cluster id is required
let cluster_id = opts.cluster_id.unwrap_or(0);

let member_id = opts
.node_id
.context(MissingConfigSnafu { msg: "'node_id'" })?;
Expand All @@ -252,13 +249,10 @@ impl StartCommand {
msg: "'meta_client_options'",
})?;

let meta_client = meta_client::create_meta_client(
cluster_id,
MetaClientType::Flownode { member_id },
meta_config,
)
.await
.context(MetaClientInitSnafu)?;
let meta_client =
meta_client::create_meta_client(MetaClientType::Flownode { member_id }, meta_config)
.await
.context(MetaClientInitSnafu)?;

let cache_max_capacity = meta_config.metadata_cache_max_capacity;
let cache_ttl = meta_config.metadata_cache_ttl;
Expand Down
12 changes: 4 additions & 8 deletions src/cmd/src/frontend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -295,14 +295,10 @@ impl StartCommand {
let cache_ttl = meta_client_options.metadata_cache_ttl;
let cache_tti = meta_client_options.metadata_cache_tti;

let cluster_id = 0; // (TODO: jeremy): It is currently a reserved field and has not been enabled.
let meta_client = meta_client::create_meta_client(
cluster_id,
MetaClientType::Frontend,
meta_client_options,
)
.await
.context(MetaClientInitSnafu)?;
let meta_client =
meta_client::create_meta_client(MetaClientType::Frontend, meta_client_options)
.await
.context(MetaClientInitSnafu)?;

// TODO(discord9): add helper function to ease the creation of cache registry&such
let cached_meta_backend =
Expand Down
49 changes: 12 additions & 37 deletions src/common/meta/src/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ use crate::error::{
InvalidRoleSnafu, ParseNumSnafu, Result,
};
use crate::peer::Peer;
use crate::ClusterId;

const CLUSTER_NODE_INFO_PREFIX: &str = "__meta_cluster_node_info";

Expand Down Expand Up @@ -56,12 +55,9 @@ pub trait ClusterInfo {
// TODO(jeremy): Other info, like region status, etc.
}

/// The key of [NodeInfo] in the storage. The format is `__meta_cluster_node_info-{cluster_id}-{role}-{node_id}`.
/// The key of [NodeInfo] in the storage. The format is `__meta_cluster_node_info-0-{role}-{node_id}`.
Comment thread
MichaelScofield marked this conversation as resolved.
#[derive(Debug, Clone, Copy, Eq, Hash, PartialEq, Serialize, Deserialize)]
pub struct NodeInfoKey {
/// The cluster id.
// todo(hl): remove cluster_id as it is not assigned anywhere.
pub cluster_id: ClusterId,
/// The role of the node. It can be `[Role::Datanode]` or `[Role::Frontend]`.
pub role: Role,
/// The node id.
Expand All @@ -84,24 +80,15 @@ impl NodeInfoKey {
_ => peer.id,
};

Some(NodeInfoKey {
cluster_id: header.cluster_id,
role,
node_id,
})
Some(NodeInfoKey { role, node_id })
}

pub fn key_prefix_with_cluster_id(cluster_id: u64) -> String {
format!("{}-{}-", CLUSTER_NODE_INFO_PREFIX, cluster_id)
pub fn key_prefix() -> String {
format!("{}-0-", CLUSTER_NODE_INFO_PREFIX)
}

pub fn key_prefix_with_role(cluster_id: ClusterId, role: Role) -> String {
format!(
"{}-{}-{}-",
CLUSTER_NODE_INFO_PREFIX,
cluster_id,
i32::from(role)
)
pub fn key_prefix_with_role(role: Role) -> String {
format!("{}-0-{}-", CLUSTER_NODE_INFO_PREFIX, i32::from(role))
}
}

Expand Down Expand Up @@ -193,15 +180,10 @@ impl FromStr for NodeInfoKey {
let caps = CLUSTER_NODE_INFO_PREFIX_PATTERN
.captures(key)
.context(InvalidNodeInfoKeySnafu { key })?;

ensure!(caps.len() == 4, InvalidNodeInfoKeySnafu { key });

let cluster_id = caps[1].to_string();
let role = caps[2].to_string();
let node_id = caps[3].to_string();
let cluster_id: u64 = cluster_id.parse().context(ParseNumSnafu {
err_msg: format!("invalid cluster_id: {cluster_id}"),
})?;
let role: i32 = role.parse().context(ParseNumSnafu {
err_msg: format!("invalid role {role}"),
})?;
Expand All @@ -210,11 +192,7 @@ impl FromStr for NodeInfoKey {
err_msg: format!("invalid node_id: {node_id}"),
})?;

Ok(Self {
cluster_id,
role,
node_id,
})
Ok(Self { role, node_id })
}
}

Expand All @@ -233,9 +211,8 @@ impl TryFrom<Vec<u8>> for NodeInfoKey {
impl From<&NodeInfoKey> for Vec<u8> {
fn from(key: &NodeInfoKey) -> Self {
format!(
"{}-{}-{}-{}",
"{}-0-{}-{}",
CLUSTER_NODE_INFO_PREFIX,
key.cluster_id,
i32::from(key.role),
key.node_id
)
Expand Down Expand Up @@ -308,15 +285,13 @@ mod tests {
#[test]
fn test_node_info_key_round_trip() {
let key = NodeInfoKey {
cluster_id: 1,
role: Datanode,
node_id: 2,
};

let key_bytes: Vec<u8> = (&key).into();
let new_key: NodeInfoKey = key_bytes.try_into().unwrap();

assert_eq!(1, new_key.cluster_id);
assert_eq!(Datanode, new_key.role);
assert_eq!(2, new_key.node_id);
}
Expand Down Expand Up @@ -362,11 +337,11 @@ mod tests {

#[test]
fn test_node_info_key_prefix() {
let prefix = NodeInfoKey::key_prefix_with_cluster_id(1);
assert_eq!(prefix, "__meta_cluster_node_info-1-");
let prefix = NodeInfoKey::key_prefix();
assert_eq!(prefix, "__meta_cluster_node_info-0-");

let prefix = NodeInfoKey::key_prefix_with_role(2, Frontend);
assert_eq!(prefix, "__meta_cluster_node_info-2-1-");
let prefix = NodeInfoKey::key_prefix_with_role(Frontend);
assert_eq!(prefix, "__meta_cluster_node_info-0-1-");
}

#[test]
Expand Down
45 changes: 10 additions & 35 deletions src/common/meta/src/datanode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ use store_api::region_engine::{RegionRole, RegionStatistic};
use store_api::storage::RegionId;
use table::metadata::TableId;

use crate::error;
use crate::error::Result;
use crate::{error, ClusterId};

pub(crate) const DATANODE_LEASE_PREFIX: &str = "__meta_datanode_lease";
const INACTIVE_REGION_PREFIX: &str = "__meta_inactive_region";
Expand All @@ -48,11 +48,10 @@ lazy_static! {

/// The key of the datanode stat in the storage.
///
/// The format is `__meta_datanode_stat-{cluster_id}-{node_id}`.
/// The format is `__meta_datanode_stat-0-{node_id}`.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct Stat {
pub timestamp_millis: i64,
pub cluster_id: ClusterId,
// The datanode Id.
pub id: u64,
// The datanode address.
Expand Down Expand Up @@ -102,10 +101,7 @@ impl Stat {
}

pub fn stat_key(&self) -> DatanodeStatKey {
DatanodeStatKey {
cluster_id: self.cluster_id,
node_id: self.id,
}
DatanodeStatKey { node_id: self.id }
}

/// Returns a tuple array containing [RegionId] and [RegionRole].
Expand Down Expand Up @@ -145,15 +141,14 @@ impl TryFrom<&HeartbeatRequest> for Stat {
} = value;

match (header, peer) {
(Some(header), Some(peer)) => {
(Some(_header), Some(peer)) => {
let region_stats = region_stats
.iter()
.map(RegionStat::from)
.collect::<Vec<_>>();

Ok(Self {
timestamp_millis: time_util::current_time_millis(),
cluster_id: header.cluster_id,
// datanode id
id: peer.id,
// datanode address
Expand Down Expand Up @@ -196,32 +191,24 @@ impl From<&api::v1::meta::RegionStat> for RegionStat {

/// The key of the datanode stat in the memory store.
///
/// The format is `__meta_datanode_stat-{cluster_id}-{node_id}`.
/// The format is `__meta_datanode_stat-0-{node_id}`.
#[derive(Debug, Clone, Copy, Eq, PartialEq, Hash)]
pub struct DatanodeStatKey {
pub cluster_id: ClusterId,
pub node_id: u64,
}

impl DatanodeStatKey {
/// The key prefix.
pub fn prefix_key() -> Vec<u8> {
format!("{DATANODE_STAT_PREFIX}-").into_bytes()
}

/// The key prefix with the cluster id.
pub fn key_prefix_with_cluster_id(cluster_id: ClusterId) -> String {
format!("{DATANODE_STAT_PREFIX}-{cluster_id}-")
// todo(hl): remove cluster id in prefix
format!("{DATANODE_STAT_PREFIX}-0-").into_bytes()
}
}

impl From<DatanodeStatKey> for Vec<u8> {
fn from(value: DatanodeStatKey) -> Self {
format!(
"{}-{}-{}",
DATANODE_STAT_PREFIX, value.cluster_id, value.node_id
)
.into_bytes()
// todo(hl): remove cluster id in prefix
format!("{}-0-{}", DATANODE_STAT_PREFIX, value.node_id).into_bytes()
}
}

Expand All @@ -234,20 +221,12 @@ impl FromStr for DatanodeStatKey {
.context(error::InvalidStatKeySnafu { key })?;

ensure!(caps.len() == 3, error::InvalidStatKeySnafu { key });

let cluster_id = caps[1].to_string();
let node_id = caps[2].to_string();
let cluster_id: u64 = cluster_id.parse().context(error::ParseNumSnafu {
err_msg: format!("invalid cluster_id: {cluster_id}"),
})?;
let node_id: u64 = node_id.parse().context(error::ParseNumSnafu {
err_msg: format!("invalid node_id: {node_id}"),
})?;

Ok(Self {
cluster_id,
node_id,
})
Ok(Self { node_id })
}
}

Expand Down Expand Up @@ -321,22 +300,19 @@ mod tests {
#[test]
fn test_stat_key() {
let stat = Stat {
cluster_id: 3,
id: 101,
region_num: 10,
..Default::default()
};

let stat_key = stat.stat_key();

assert_eq!(3, stat_key.cluster_id);
assert_eq!(101, stat_key.node_id);
}

#[test]
fn test_stat_val_round_trip() {
let stat = Stat {
cluster_id: 0,
id: 101,
region_num: 100,
..Default::default()
Expand All @@ -351,7 +327,6 @@ mod tests {
assert_eq!(1, stats.len());

let stat = stats.first().unwrap();
assert_eq!(0, stat.cluster_id);
assert_eq!(101, stat.id);
assert_eq!(100, stat.region_num);
}
Expand Down
9 changes: 2 additions & 7 deletions src/common/meta/src/ddl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ use crate::node_manager::NodeManagerRef;
use crate::region_keeper::MemoryRegionKeeperRef;
use crate::rpc::ddl::{SubmitDdlTaskRequest, SubmitDdlTaskResponse};
use crate::rpc::procedure::{MigrateRegionRequest, MigrateRegionResponse, ProcedureStateResponse};
use crate::{ClusterId, DatanodeId};
use crate::DatanodeId;

pub mod alter_database;
pub mod alter_logical_tables;
Expand All @@ -57,7 +57,6 @@ pub mod utils;

#[derive(Debug, Default)]
pub struct ExecutorContext {
pub cluster_id: Option<u64>,
pub tracing_context: Option<W3cTrace>,
}

Expand Down Expand Up @@ -90,10 +89,6 @@ pub trait ProcedureExecutor: Send + Sync {

pub type ProcedureExecutorRef = Arc<dyn ProcedureExecutor>;

pub struct TableMetadataAllocatorContext {
pub cluster_id: ClusterId,
}

/// Metadata allocated to a table.
#[derive(Default)]
pub struct TableMetadata {
Expand All @@ -108,7 +103,7 @@ pub struct TableMetadata {

pub type RegionFailureDetectorControllerRef = Arc<dyn RegionFailureDetectorController>;

pub type DetectingRegion = (ClusterId, DatanodeId, RegionId);
pub type DetectingRegion = (DatanodeId, RegionId);

/// Used for actively registering Region failure detectors.
///
Expand Down
Loading