Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ datafusion-substrait = { git = "https://github.com/waynexia/arrow-datafusion.git
derive_builder = "0.12"
futures = "0.3"
futures-util = "0.3"
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "6d4131f57ece9a8d250a289b3af1567eab768c86" }
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "ffc2bdfabea578b1d264a11b741df12395e89e87" }
humantime-serde = "1.1"
itertools = "0.10"
lazy_static = "1.4"
Expand Down
15 changes: 2 additions & 13 deletions src/catalog/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,10 @@
#![feature(try_blocks)]

use std::any::Any;
use std::collections::HashMap;
use std::fmt::{Debug, Formatter};
use std::sync::Arc;

use api::v1::meta::{RegionStat, TableIdent, TableName};
use api::v1::meta::RegionStat;
use common_telemetry::{info, warn};
use snafu::ResultExt;
use table::engine::{EngineContext, TableEngineRef};
Expand Down Expand Up @@ -248,23 +247,13 @@ pub async fn datanode_stat(catalog_manager: &CatalogManagerRef) -> (u64, Vec<Reg
region_number += region_numbers.len() as u64;

let engine = &table_info.meta.engine;
let table_id = table_info.ident.table_id;

match table.region_stats() {
Ok(stats) => {
let stats = stats.into_iter().map(|stat| RegionStat {
region_id: stat.region_id,
table_ident: Some(TableIdent {
table_id,
table_name: Some(TableName {
catalog_name: catalog_name.clone(),
schema_name: schema_name.clone(),
table_name: table_name.clone(),
}),
engine: engine.clone(),
}),
approximate_bytes: stat.disk_usage_bytes as i64,
attrs: HashMap::from([("engine_name".to_owned(), engine.clone())]),
engine: engine.clone(),
..Default::default()
});

Expand Down
52 changes: 25 additions & 27 deletions src/catalog/src/remote/region_alive_keeper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use common_meta::ident::TableIdent;
use common_meta::RegionIdent;
use common_telemetry::{debug, error, info, warn};
use snafu::{OptionExt, ResultExt};
use store_api::storage::RegionNumber;
use store_api::storage::{RegionId, RegionNumber};
use table::engine::manager::TableEngineManagerRef;
use table::engine::{CloseTableResult, EngineContext, TableEngineRef};
use table::metadata::TableId;
Expand Down Expand Up @@ -166,39 +166,35 @@ impl RegionAliveKeepers {
#[async_trait]
impl HeartbeatResponseHandler for RegionAliveKeepers {
fn is_acceptable(&self, ctx: &HeartbeatResponseHandlerContext) -> bool {
!ctx.response.region_leases.is_empty()
ctx.response.region_lease.is_some()
}

async fn handle(
&self,
ctx: &mut HeartbeatResponseHandlerContext,
) -> common_meta::error::Result<HandleControl> {
let leases = ctx.response.region_leases.drain(..).collect::<Vec<_>>();
for lease in leases {
let table_ident: TableIdent = match lease
.table_ident
.context(InvalidProtoMsgSnafu {
err_msg: "'table_ident' is missing in RegionLease",
})
.and_then(|x| x.try_into())
{
Ok(x) => x,
Err(e) => {
error!(e; "");
continue;
}
};

let table_id = table_ident.table_id;
let region_lease = ctx
.response
.region_lease
.as_ref()
.context(InvalidProtoMsgSnafu {
err_msg: "'region_lease' is missing in heartbeat response",
})?;
let start_instant = self.epoch + Duration::from_millis(region_lease.duration_since_epoch);
let deadline = start_instant + Duration::from_secs(region_lease.lease_seconds);
for raw_region_id in &region_lease.region_ids {
let region_id = RegionId::from_u64(*raw_region_id);
let table_id = region_id.table_id();
let Some(keeper) = self.keepers.lock().await.get(&table_id).cloned() else {
// Alive keeper could be affected by lagging msg, just warn and ignore.
warn!("Alive keeper for table {table_ident} is not found!");
warn!("Alive keeper for table {table_id} is not found!");
continue;
};

let start_instant = self.epoch + Duration::from_millis(lease.duration_since_epoch);
let deadline = start_instant + Duration::from_secs(lease.lease_seconds);
keeper.keep_lived(lease.regions, deadline).await;
// TODO(jeremy): refactor this, use region_id
keeper
.keep_lived(vec![region_id.region_number()], deadline)
.await;
}
Ok(HandleControl::Continue)
}
Expand Down Expand Up @@ -602,12 +598,14 @@ mod test {
let duration_since_epoch = (Instant::now() - keepers.epoch).as_millis() as _;
let lease_seconds = 100;
let response = HeartbeatResponse {
region_leases: vec![RegionLease {
table_ident: Some(table_ident.clone().into()),
regions: vec![1, 3], // Not extending region 2's lease time.
region_lease: Some(RegionLease {
region_ids: vec![
RegionId::new(table_ident.table_id, 1).as_u64(),
RegionId::new(table_ident.table_id, 3).as_u64(),
], // Not extending region 2's lease time.
duration_since_epoch,
lease_seconds,
}],
}),
..Default::default()
};
let keep_alive_until = keepers.epoch
Expand Down
18 changes: 5 additions & 13 deletions src/datanode/src/heartbeat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::time::Duration;

use api::v1::meta::{HeartbeatRequest, NodeStat, Peer, RegionStat, TableIdent};
use api::v1::meta::{HeartbeatRequest, Peer, RegionStat};
use catalog::remote::region_alive_keeper::RegionAliveKeepers;
use common_meta::heartbeat::handler::parse_mailbox_message::ParseMailboxMessageHandler;
use common_meta::heartbeat::handler::{
Expand Down Expand Up @@ -216,16 +216,13 @@ impl HeartbeatTask {
}
}
_ = &mut sleep => {
let (region_num,region_stats) = Self::load_stats(&region_server_clone).await;
// TODO(jeremy): refactor load_status
let (_,region_stats) = Self::load_stats(&region_server_clone).await;
let req = HeartbeatRequest {
peer: Some(Peer {
id: node_id,
addr: addr.clone(),
}),
node_stat: Some(NodeStat {
region_num: region_num as _,
..Default::default()
}),
region_stats,
duration_since_epoch: (Instant::now() - epoch).as_millis() as u64,
node_epoch,
Expand Down Expand Up @@ -268,14 +265,9 @@ impl HeartbeatTask {
let region_stats = region_ids
.into_iter()
.map(|region_id| RegionStat {
// TODO: scratch more info
// TODO(ruihang): scratch more info
region_id: region_id.as_u64(),
table_ident: Some(TableIdent {
table_id: region_id.table_id(),
table_name: None,
engine: "MitoEngine".to_string(),
}),

engine: "MitoEngine".to_string(),
..Default::default()
})
.collect::<Vec<_>>();
Expand Down
4 changes: 2 additions & 2 deletions src/meta-srv/src/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ pub struct HeartbeatAccumulator {
pub header: Option<ResponseHeader>,
pub instructions: Vec<Instruction>,
pub stat: Option<Stat>,
pub region_leases: Vec<RegionLease>,
pub region_lease: Option<RegionLease>,
}

impl HeartbeatAccumulator {
Expand Down Expand Up @@ -256,7 +256,7 @@ impl HeartbeatHandlerGroup {
let header = std::mem::take(&mut acc.header);
let res = HeartbeatResponse {
header,
region_leases: acc.region_leases,
region_lease: acc.region_lease,
..Default::default()
};
Ok(res)
Expand Down
32 changes: 13 additions & 19 deletions src/meta-srv/src/handler/failure_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,18 +81,19 @@ impl HeartbeatHandler for RegionFailureHandler {
region_idents: stat
.region_stats
.iter()
.map(|x| RegionIdent {
cluster_id: stat.cluster_id,
datanode_id: stat.id,
table_ident: TableIdent {
catalog: x.table_ident.catalog.clone(),
schema: x.table_ident.schema.clone(),
table: x.table_ident.table.clone(),
table_id: RegionId::from(x.id).table_id(),
// TODO(#1583): Use the actual table engine.
engine: MITO_ENGINE.to_string(),
},
region_number: x.id as u32,
.map(|x| {
let region_id = RegionId::from(x.id);
RegionIdent {
cluster_id: stat.cluster_id,
datanode_id: stat.id,
table_ident: TableIdent {
table_id: region_id.table_id(),
// TODO(#1583): Use the actual table engine.
engine: MITO_ENGINE.to_string(),
..Default::default()
},
region_number: region_id.region_number(),
}
})
.collect(),
heartbeat_time: stat.timestamp_millis,
Expand Down Expand Up @@ -128,13 +129,6 @@ mod tests {
fn new_region_stat(region_id: u64) -> RegionStat {
RegionStat {
id: region_id,
table_ident: TableIdent {
catalog: "a".to_string(),
schema: "b".to_string(),
table: "c".to_string(),
table_id: 0,
engine: "d".to_string(),
},
rcus: 0,
wcus: 0,
approximate_bytes: 0,
Expand Down
38 changes: 7 additions & 31 deletions src/meta-srv/src/handler/node_stat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,8 @@
// limitations under the License.

use api::v1::meta::HeartbeatRequest;
use common_meta::ident::TableIdent;
use common_time::util as time_util;
use serde::{Deserialize, Serialize};
use snafu::OptionExt;

use crate::error::{Error, InvalidHeartbeatRequestSnafu};
use crate::keys::StatKey;
Expand All @@ -31,10 +29,8 @@ pub struct Stat {
pub rcus: i64,
/// The write capacity units during this period
pub wcus: i64,
/// How many tables on this node
pub table_num: i64,
/// How many regions on this node
pub region_num: Option<u64>,
pub region_num: u64,
pub region_stats: Vec<RegionStat>,
// The node epoch is used to check whether the node has restarted or redeployed.
pub node_epoch: u64,
Expand All @@ -43,7 +39,6 @@ pub struct Stat {
#[derive(Debug, Default, Serialize, Deserialize)]
pub struct RegionStat {
pub id: u64,
pub table_ident: TableIdent,
/// The read capacity units during this period
pub rcus: i64,
/// The write capacity units during this period
Expand All @@ -70,19 +65,13 @@ impl TryFrom<HeartbeatRequest> for Stat {
let HeartbeatRequest {
header,
peer,
node_stat,
region_stats,
node_epoch,
..
} = value;

match (header, peer, node_stat) {
(Some(header), Some(peer), Some(node_stat)) => {
let region_num = if node_stat.region_num >= 0 {
Some(node_stat.region_num as u64)
} else {
None
};
match (header, peer) {
(Some(header), Some(peer)) => {
let region_stats = region_stats
.into_iter()
.map(RegionStat::try_from)
Expand All @@ -93,10 +82,9 @@ impl TryFrom<HeartbeatRequest> for Stat {
cluster_id: header.cluster_id,
id: peer.id,
addr: peer.addr,
rcus: node_stat.rcus,
wcus: node_stat.wcus,
table_num: node_stat.table_num,
region_num,
rcus: region_stats.iter().map(|s| s.rcus).sum(),
wcus: region_stats.iter().map(|s| s.wcus).sum(),
region_num: region_stats.len() as u64,
region_stats,
node_epoch,
})
Expand All @@ -113,20 +101,8 @@ impl TryFrom<api::v1::meta::RegionStat> for RegionStat {
type Error = Error;

fn try_from(value: api::v1::meta::RegionStat) -> Result<Self, Self::Error> {
let table_ident = value.table_ident.context(InvalidHeartbeatRequestSnafu {
err_msg: "missing table_ident",
})?;
let table_ident_result = TableIdent::try_from(table_ident);
let Ok(table_ident) = table_ident_result else {
return InvalidHeartbeatRequestSnafu {
err_msg: format!("invalid table_ident: {:?}", table_ident_result.err()),
}
.fail();
};

Ok(Self {
id: value.region_id,
table_ident,
rcus: value.rcus,
wcus: value.wcus,
approximate_bytes: value.approximate_bytes,
Expand All @@ -144,7 +120,7 @@ mod tests {
let stat = Stat {
cluster_id: 3,
id: 101,
region_num: Some(10),
region_num: 10,
..Default::default()
};

Expand Down
4 changes: 2 additions & 2 deletions src/meta-srv/src/handler/persist_stats_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,7 @@ mod tests {
let val: StatValue = kv.value.try_into().unwrap();
// first new stat must be set in kv store immediately
assert_eq!(1, val.stats.len());
assert_eq!(Some(1), val.stats[0].region_num);
assert_eq!(1, val.stats[0].region_num);

handle_request_many_times(ctx.clone(), &handler, 10).await;

Expand All @@ -221,7 +221,7 @@ mod tests {
stat: Some(Stat {
cluster_id: 3,
id: 101,
region_num: Some(i as _),
region_num: i as _,
..Default::default()
}),
..Default::default()
Expand Down
Loading