Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions config/datanode.example.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
node_id = 42
mode = 'distributed'
rpc_addr = '127.0.0.1:3001'
rpc_hostname = '127.0.0.1'
rpc_runtime_size = 8
mysql_addr = '127.0.0.1:4406'
mysql_runtime_size = 4
Expand Down
7 changes: 7 additions & 0 deletions src/cmd/src/datanode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ struct StartCommand {
#[clap(long)]
rpc_addr: Option<String>,
#[clap(long)]
rpc_hostname: Option<String>,
#[clap(long)]
mysql_addr: Option<String>,
#[clap(long)]
metasrv_addr: Option<String>,
Expand Down Expand Up @@ -94,6 +96,11 @@ impl TryFrom<StartCommand> for DatanodeOptions {
if let Some(addr) = cmd.rpc_addr {
opts.rpc_addr = addr;
}

if cmd.rpc_hostname.is_some() {
opts.rpc_hostname = cmd.rpc_hostname;
}

if let Some(addr) = cmd.mysql_addr {
opts.mysql_addr = addr;
}
Expand Down
2 changes: 2 additions & 0 deletions src/datanode/src/datanode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ impl Default for WalConfig {
pub struct DatanodeOptions {
pub node_id: Option<u64>,
pub rpc_addr: String,
pub rpc_hostname: Option<String>,
pub rpc_runtime_size: usize,
pub mysql_addr: String,
pub mysql_runtime_size: usize,
Expand All @@ -99,6 +100,7 @@ impl Default for DatanodeOptions {
Self {
node_id: None,
rpc_addr: "127.0.0.1:3001".to_string(),
rpc_hostname: None,
rpc_runtime_size: 8,
mysql_addr: "127.0.0.1:4406".to_string(),
mysql_runtime_size: 2,
Expand Down
48 changes: 46 additions & 2 deletions src/datanode/src/heartbeat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ use crate::error::{MetaClientInitSnafu, Result};
pub struct HeartbeatTask {
node_id: u64,
server_addr: String,
server_hostname: Option<String>,
running: Arc<AtomicBool>,
meta_client: Arc<MetaClient>,
catalog_manager: CatalogManagerRef,
Expand All @@ -44,12 +45,14 @@ impl HeartbeatTask {
pub fn new(
node_id: u64,
server_addr: String,
server_hostname: Option<String>,
meta_client: Arc<MetaClient>,
catalog_manager: CatalogManagerRef,
) -> Self {
Self {
node_id,
server_addr,
server_hostname,
running: Arc::new(AtomicBool::new(false)),
meta_client,
catalog_manager,
Expand Down Expand Up @@ -96,7 +99,7 @@ impl HeartbeatTask {
}
let interval = self.interval;
let node_id = self.node_id;
let server_addr = self.server_addr.clone();
let addr = resolve_addr(&self.server_addr, &self.server_hostname);
let meta_client = self.meta_client.clone();

let catalog_manager_clone = self.catalog_manager.clone();
Expand All @@ -114,7 +117,7 @@ impl HeartbeatTask {
let req = HeartbeatRequest {
peer: Some(Peer {
id: node_id,
addr: server_addr.clone(),
addr: addr.clone(),
}),
node_stat: Some(NodeStat {
region_num,
Expand Down Expand Up @@ -142,3 +145,44 @@ impl HeartbeatTask {
Ok(())
}
}

/// Resolves hostname:port address for meta registration
///
fn resolve_addr(bind_addr: &str, hostname_addr: &Option<String>) -> String {
match hostname_addr {
Some(hostname_addr) => {
// it has port configured
if hostname_addr.contains(':') {
hostname_addr.clone()
} else {
// otherwise, resolve port from bind_addr
// should be safe to unwrap here because bind_addr is already validated
let port = &bind_addr.split(':').nth(1).unwrap();
Comment thread
sunng87 marked this conversation as resolved.
Outdated
format!("{hostname_addr}:{port}")
}
}
None => bind_addr.to_owned(),
}
}

#[cfg(test)]
mod tests {

Comment thread
sunng87 marked this conversation as resolved.
Outdated
#[test]
fn test_resolve_addr() {
assert_eq!(
"tomcat:3001",
super::resolve_addr("127.0.0.1:3001", &Some("tomcat".to_owned()))
);

assert_eq!(
"tomcat:3002",
super::resolve_addr("127.0.0.1:3001", &Some("tomcat:3002".to_owned()))
);

assert_eq!(
"127.0.0.1:3001",
super::resolve_addr("127.0.0.1:3001", &None)
);
}
}
1 change: 1 addition & 0 deletions src/datanode/src/instance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,7 @@ impl Instance {
Mode::Distributed => Some(HeartbeatTask::new(
opts.node_id.context(MissingNodeIdSnafu)?,
opts.rpc_addr.clone(),
opts.rpc_hostname.clone(),
meta_client.as_ref().unwrap().clone(),
catalog_manager.clone(),
)),
Expand Down
1 change: 1 addition & 0 deletions src/datanode/src/mock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ impl Instance {
let heartbeat_task = HeartbeatTask::new(
opts.node_id.unwrap_or(42),
opts.rpc_addr.clone(),
None,
meta_client.clone(),
catalog_manager.clone(),
);
Expand Down