-
Notifications
You must be signed in to change notification settings - Fork 101
Worker heartbeat #953
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Worker heartbeat #953
Changes from 1 commit
dc53656
7b83f8e
a1d60a6
aef076e
261201c
f8ea55d
ebc6910
207feaa
100ebe2
35d996f
4b902b6
ba88360
51d0de0
ac35033
0190cb7
3c694e7
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,50 +1,128 @@ | ||
| use crate::WorkerClient; | ||
| use crate::abstractions::dbg_panic; | ||
| use gethostname::gethostname; | ||
| use parking_lot::Mutex; | ||
| use prost_types::Duration as PbDuration; | ||
| use std::sync::{Arc, OnceLock}; | ||
| use std::time::{Duration, SystemTime}; | ||
| use temporal_sdk_core_api::worker::WorkerConfig; | ||
| use temporal_sdk_core_protos::temporal::api::worker::v1::{WorkerHeartbeat, WorkerHostInfo}; | ||
| use tokio::sync::watch; | ||
| use tokio::task::JoinHandle; | ||
| use tokio::time::MissedTickBehavior; | ||
| use uuid::Uuid; | ||
|
|
||
| pub(crate) struct WorkerHeartbeatManager { | ||
| heartbeat_handle: JoinHandle<()>, | ||
| } | ||
|
|
||
| impl WorkerHeartbeatManager { | ||
| pub(crate) fn new( | ||
| config: WorkerConfig, | ||
| identity: String, | ||
| heartbeat_fn: Arc<OnceLock<Box<dyn Fn() -> Option<WorkerHeartbeat> + Send + Sync>>>, | ||
| client: Arc<dyn WorkerClient>, | ||
| ) -> Self { | ||
| let sdk_name_and_ver = client.sdk_name_and_version(); | ||
| let (reset_tx, reset_rx) = watch::channel(()); | ||
| let data = Arc::new(Mutex::new(WorkerHeartbeatData::new( | ||
| config, | ||
| identity, | ||
| sdk_name_and_ver, | ||
| reset_tx, | ||
| ))); | ||
| let data_clone = data.clone(); | ||
|
|
||
| let heartbeat_handle = tokio::spawn(async move { | ||
| let mut reset_rx = reset_rx; | ||
| let mut ticker = tokio::time::interval(data_clone.lock().heartbeat_interval); | ||
| ticker.set_missed_tick_behavior(MissedTickBehavior::Delay); | ||
| loop { | ||
| tokio::select! { | ||
| _ = ticker.tick() => { | ||
| let heartbeat = if let Some(heartbeat) = data_clone.lock().capture_heartbeat_if_needed() { | ||
| heartbeat | ||
| } else { | ||
| continue | ||
| }; | ||
| if let Err(e) = client.clone().record_worker_heartbeat(heartbeat).await { | ||
| warn!(error=?e, "Network error while sending worker heartbeat"); | ||
yuandrew marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| if matches!( | ||
| e.code(), | ||
| tonic::Code::Unimplemented | ||
| ) { | ||
| return; | ||
| } | ||
| } | ||
| } | ||
| _ = reset_rx.changed() => { | ||
| ticker.reset(); | ||
| } | ||
| } | ||
| } | ||
| }); | ||
|
|
||
| let data_clone = data.clone(); | ||
| if let Err(_) = heartbeat_fn.set(Box::new(move || { | ||
| data_clone.lock().capture_heartbeat_if_needed() | ||
| })) { | ||
| dbg_panic!( | ||
| "Failed to set heartbeat_fn, heartbeat_fn should only be set once, when a singular WorkerHeartbeatInfo is created" | ||
| ); | ||
| } | ||
|
|
||
| Self { heartbeat_handle } | ||
| } | ||
|
|
||
| pub(crate) fn shutdown(&self) { | ||
| self.heartbeat_handle.abort() | ||
| } | ||
| } | ||
|
|
||
| #[derive(Debug, Clone)] | ||
| pub struct WorkerHeartbeatData { | ||
| struct WorkerHeartbeatData { | ||
| worker_instance_key: String, | ||
| pub(crate) worker_identity: String, | ||
| worker_identity: String, | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Ahh, de-pub :feelsgoodman: |
||
| host_info: WorkerHostInfo, | ||
| // Time of the last heartbeat. This is used for both heartbeat_time and last_heartbeat_time | ||
| pub(crate) heartbeat_time: Option<SystemTime>, | ||
| pub(crate) task_queue: String, | ||
| heartbeat_time: Option<SystemTime>, | ||
| task_queue: String, | ||
| /// SDK name | ||
| pub(crate) sdk_name: String, | ||
| sdk_name: String, | ||
| /// SDK version | ||
| pub(crate) sdk_version: String, | ||
| sdk_version: String, | ||
| /// Worker start time | ||
| pub(crate) start_time: SystemTime, | ||
| pub(crate) heartbeat_interval: Duration, | ||
| pub(crate) reset_tx: Option<watch::Sender<()>>, | ||
| start_time: SystemTime, | ||
| heartbeat_interval: Duration, | ||
| reset_tx: watch::Sender<()>, | ||
| } | ||
|
|
||
| impl WorkerHeartbeatData { | ||
| pub fn new(worker_config: WorkerConfig, worker_identity: String) -> Self { | ||
| fn new( | ||
| worker_config: WorkerConfig, | ||
| worker_identity: String, | ||
| sdk_name_and_ver: (String, String), | ||
| reset_tx: watch::Sender<()>, | ||
| ) -> Self { | ||
| Self { | ||
| worker_identity, | ||
| host_info: WorkerHostInfo { | ||
| host_name: gethostname().to_string_lossy().to_string(), | ||
| process_id: std::process::id().to_string(), | ||
| ..Default::default() | ||
| }, | ||
| sdk_name: String::new(), | ||
| sdk_version: String::new(), | ||
| sdk_name: sdk_name_and_ver.0, | ||
| sdk_version: sdk_name_and_ver.1, | ||
| task_queue: worker_config.task_queue.clone(), | ||
| start_time: SystemTime::now(), | ||
| heartbeat_time: None, | ||
| worker_instance_key: Uuid::new_v4().to_string(), | ||
| heartbeat_interval: worker_config.heartbeat_interval, | ||
| reset_tx: None, | ||
| reset_tx, | ||
| } | ||
| } | ||
|
|
||
| pub fn capture_heartbeat_if_needed(&mut self) -> Option<WorkerHeartbeat> { | ||
| fn capture_heartbeat_if_needed(&mut self) -> Option<WorkerHeartbeat> { | ||
| let now = SystemTime::now(); | ||
| let elapsed_since_last_heartbeat = if let Some(heartbeat_time) = self.heartbeat_time { | ||
| let dur = now.duration_since(heartbeat_time).unwrap_or(Duration::ZERO); | ||
|
|
@@ -64,13 +142,8 @@ impl WorkerHeartbeatData { | |
| }; | ||
|
|
||
| self.heartbeat_time = Some(now); | ||
| if let Some(reset_tx) = &self.reset_tx { | ||
| let _ = reset_tx.send(()); | ||
| } else { | ||
| warn!( | ||
| "No reset_tx attached to heartbeat_info, worker heartbeat was not properly setup" | ||
| ); | ||
| } | ||
|
|
||
| let _ = self.reset_tx.send(()); | ||
|
|
||
| Some(WorkerHeartbeat { | ||
| worker_instance_key: self.worker_instance_key.clone(), | ||
|
|
@@ -86,10 +159,6 @@ impl WorkerHeartbeatData { | |
| ..Default::default() | ||
| }) | ||
| } | ||
|
|
||
| pub(crate) fn set_reset_tx(&mut self, reset_tx: watch::Sender<()>) { | ||
| self.reset_tx = Some(reset_tx); | ||
| } | ||
| } | ||
|
|
||
| #[cfg(test)] | ||
|
|
@@ -99,7 +168,6 @@ mod tests { | |
| use crate::test_help::test_worker_cfg; | ||
| use crate::worker; | ||
| use crate::worker::client::mocks::mock_worker_client; | ||
| use parking_lot::Mutex; | ||
| use std::sync::Arc; | ||
| use std::time::Duration; | ||
| use temporal_sdk_core_api::worker::PollerBehavior; | ||
|
|
@@ -112,7 +180,7 @@ mod tests { | |
| .times(2) | ||
| .returning(move |heartbeat| { | ||
| let host_info = heartbeat.host_info.clone().unwrap(); | ||
| assert_eq!("test_identity", heartbeat.worker_identity); | ||
| assert_eq!("test-identity", heartbeat.worker_identity); | ||
| assert!(!heartbeat.worker_instance_key.is_empty()); | ||
| assert_eq!( | ||
| host_info.host_name, | ||
|
|
@@ -135,18 +203,15 @@ mod tests { | |
| .build() | ||
| .unwrap(); | ||
|
|
||
| let heartbeat_data = Arc::new(Mutex::new(WorkerHeartbeatData::new( | ||
| config.clone(), | ||
| "test_identity".to_string(), | ||
| ))); | ||
| let heartbeat_fn = Arc::new(OnceLock::new()); | ||
| let client = Arc::new(mock); | ||
| let worker = worker::Worker::new(config, None, client, None, Some(heartbeat_data.clone())); | ||
| let _ = heartbeat_data.lock().capture_heartbeat_if_needed(); | ||
| let worker = worker::Worker::new(config, None, client, None, Some(heartbeat_fn.clone())); | ||
| heartbeat_fn.get().unwrap()(); | ||
|
|
||
| // heartbeat timer fires once | ||
| tokio::time::sleep(Duration::from_millis(300)).await; | ||
| // it hasn't been >90% of the interval since the last heartbeat, so no data should be returned here | ||
| assert_eq!(None, heartbeat_data.lock().capture_heartbeat_if_needed()); | ||
| assert_eq!(None, heartbeat_fn.get().unwrap()()); | ||
| // heartbeat timer fires once | ||
| tokio::time::sleep(Duration::from_millis(150)).await; | ||
|
|
||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm realizing this should probably just be a `tokio::sync::Notify`: https://docs.rs/tokio/latest/tokio/sync/struct.Notify.html