-
Notifications
You must be signed in to change notification settings - Fork 101
Worker heartbeat #953
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Worker heartbeat #953
Changes from 1 commit
dc53656
7b83f8e
a1d60a6
aef076e
261201c
f8ea55d
ebc6910
207feaa
100ebe2
35d996f
4b902b6
ba88360
51d0de0
ac35033
0190cb7
3c694e7
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,97 +1,13 @@ | ||
| use crate::WorkerClient; | ||
| use futures_util::future; | ||
| use futures_util::future::AbortHandle; | ||
| use gethostname::gethostname; | ||
| use parking_lot::Mutex; | ||
| use prost_types::Duration as PbDuration; | ||
| use std::sync::Arc; | ||
| use std::time::{Duration, SystemTime}; | ||
| use temporal_sdk_core_api::worker::WorkerConfig; | ||
| use temporal_sdk_core_protos::temporal::api::worker::v1::{WorkerHeartbeat, WorkerHostInfo}; | ||
| use tokio::sync::watch; | ||
| use uuid::Uuid; | ||
|
|
||
| /// Heartbeat information | ||
| /// | ||
| /// Note: Experimental | ||
| pub struct WorkerHeartbeatInfo { | ||
| pub(crate) data: Arc<Mutex<WorkerHeartbeatData>>, | ||
| timer_abort: AbortHandle, | ||
| client: Option<Arc<dyn WorkerClient>>, | ||
| interval: Option<Duration>, | ||
| #[cfg(test)] | ||
| heartbeats_sent: Arc<Mutex<usize>>, | ||
| } | ||
|
|
||
| impl WorkerHeartbeatInfo { | ||
| /// Create a new WorkerHeartbeatInfo. A timer is immediately started to track the worker | ||
| /// heartbeat interval. | ||
| pub(crate) fn new(worker_config: WorkerConfig) -> Self { | ||
| // unused abort handle, will be replaced with a new one when we start a new timer | ||
| let (abort_handle, _) = AbortHandle::new_pair(); | ||
|
|
||
| Self { | ||
| data: Arc::new(Mutex::new(WorkerHeartbeatData::new(worker_config.clone()))), | ||
| timer_abort: abort_handle, | ||
| client: None, | ||
| interval: worker_config.heartbeat_interval, | ||
| #[cfg(test)] | ||
| heartbeats_sent: Arc::new(Mutex::new(0)), | ||
| } | ||
| } | ||
|
|
||
| /// Transform heartbeat data into a `WorkerHeartbeat` that we can send in a gRPC request. Some | ||
| /// metrics are also cached for future calls of this function. | ||
| pub(crate) fn capture_heartbeat(&mut self) -> WorkerHeartbeat { | ||
| self.create_new_timer(); | ||
|
|
||
| self.data.lock().capture_heartbeat() | ||
| } | ||
|
|
||
| fn create_new_timer(&mut self) { | ||
| self.timer_abort.abort(); | ||
|
|
||
| let (abort_handle, abort_reg) = AbortHandle::new_pair(); | ||
| let interval = if let Some(dur) = self.interval { | ||
| dur | ||
| } else { | ||
| Duration::from_secs(60) | ||
| }; | ||
| let data = self.data.clone(); | ||
| #[cfg(test)] | ||
| let heartbeats_sent = self.heartbeats_sent.clone(); | ||
| self.timer_abort = abort_handle.clone(); | ||
| if let Some(client) = self.client.clone() { | ||
| tokio::spawn(future::Abortable::new( | ||
| async move { | ||
| loop { | ||
| tokio::time::sleep(interval).await; | ||
| #[cfg(test)] | ||
| { | ||
| let mut num = heartbeats_sent.lock(); | ||
| *num += 1; | ||
| } | ||
|
|
||
| let heartbeat = data.lock().capture_heartbeat(); | ||
| if let Err(e) = client.clone().record_worker_heartbeat(heartbeat).await { | ||
| warn!(error=?e, "Network error while sending worker heartbeat"); | ||
| } | ||
| } | ||
| }, | ||
| abort_reg, | ||
| )); | ||
| } else { | ||
| warn!("No client attached to heartbeat_info") | ||
| }; | ||
| } | ||
|
|
||
| pub(crate) fn add_client(&mut self, client: Arc<dyn WorkerClient>) { | ||
| self.client = Some(client); | ||
| self.create_new_timer(); | ||
| } | ||
| } | ||
|
|
||
| #[derive(Debug, Clone)] | ||
| pub(crate) struct WorkerHeartbeatData { | ||
| pub struct WorkerHeartbeatData { | ||
| worker_instance_key: String, | ||
| pub(crate) worker_identity: String, | ||
| host_info: WorkerHostInfo, | ||
|
|
@@ -104,10 +20,12 @@ pub(crate) struct WorkerHeartbeatData { | |
| pub(crate) sdk_version: String, | ||
| /// Worker start time | ||
| pub(crate) start_time: SystemTime, | ||
| pub(crate) heartbeat_interval: Duration, | ||
| pub(crate) reset_tx: Option<watch::Sender<()>>, | ||
| } | ||
|
|
||
| impl WorkerHeartbeatData { | ||
| fn new(worker_config: WorkerConfig) -> Self { | ||
| pub fn new(worker_config: WorkerConfig) -> Self { | ||
| Self { | ||
| // TODO: Is this right for worker_identity? | ||
| worker_identity: worker_config | ||
|
|
@@ -125,13 +43,22 @@ impl WorkerHeartbeatData { | |
| start_time: SystemTime::now(), | ||
| heartbeat_time: None, | ||
| worker_instance_key: Uuid::new_v4().to_string(), | ||
| heartbeat_interval: worker_config.heartbeat_interval, | ||
| reset_tx: None, | ||
| } | ||
| } | ||
|
|
||
| fn capture_heartbeat(&mut self) -> WorkerHeartbeat { | ||
| pub fn capture_heartbeat_if_needed(&mut self) -> Option<WorkerHeartbeat> { | ||
| let now = SystemTime::now(); | ||
| let elapsed_since_last_heartbeat = if let Some(heartbeat_time) = self.heartbeat_time { | ||
| let dur = now.duration_since(heartbeat_time).unwrap_or(Duration::ZERO); // TODO: do we want to fall back to ZERO? | ||
| let dur = now.duration_since(heartbeat_time).unwrap_or(Duration::ZERO); | ||
|
|
||
| // Only send poll data if nearly a full interval has elapsed since this data was last sent. | ||
| // Here, "nearly" means at least 90% of the interval. | ||
| if dur.as_secs_f64() < 0.9 * self.heartbeat_interval.as_secs_f64() { | ||
| println!("Heartbeat interval not yet elapsed, not sending poll data"); | ||
| return None; | ||
| } | ||
| Some(PbDuration { | ||
| seconds: dur.as_secs() as i64, | ||
| nanos: dur.subsec_nanos() as i32, | ||
|
|
@@ -141,8 +68,15 @@ impl WorkerHeartbeatData { | |
| }; | ||
|
|
||
| self.heartbeat_time = Some(now); | ||
| if let Some(reset_tx) = &self.reset_tx { | ||
| let _ = reset_tx.send(()); | ||
|
||
| } else { | ||
| warn!( | ||
| "No reset_tx attached to heartbeat_info, worker heartbeat was not properly setup" | ||
| ); | ||
| } | ||
|
|
||
| WorkerHeartbeat { | ||
| Some(WorkerHeartbeat { | ||
| worker_instance_key: self.worker_instance_key.clone(), | ||
| worker_identity: self.worker_identity.clone(), | ||
| host_info: Some(self.host_info.clone()), | ||
|
|
@@ -154,38 +88,33 @@ impl WorkerHeartbeatData { | |
| heartbeat_time: Some(SystemTime::now().into()), | ||
| elapsed_since_last_heartbeat, | ||
| ..Default::default() | ||
| } | ||
| }) | ||
| } | ||
|
|
||
| pub(crate) fn set_reset_tx(&mut self, reset_tx: watch::Sender<()>) { | ||
| self.reset_tx = Some(reset_tx); | ||
| } | ||
| } | ||
|
|
||
| #[cfg(test)] | ||
| mod tests { | ||
| use super::*; | ||
| use crate::test_help::WorkerExt; | ||
| use crate::test_help::test_worker_cfg; | ||
| use crate::worker; | ||
| use crate::worker::WorkerHeartbeatInfo; | ||
| use crate::worker::client::mocks::mock_worker_client; | ||
| use parking_lot::Mutex; | ||
| use std::ops::Deref; | ||
| use std::sync::Arc; | ||
| use std::time::Duration; | ||
| use temporal_sdk_core_api::Worker; | ||
| use temporal_sdk_core_api::worker::PollerBehavior; | ||
| use temporal_sdk_core_protos::coresdk::ActivityTaskCompletion; | ||
| use temporal_sdk_core_protos::coresdk::activity_result::ActivityExecutionResult; | ||
| use temporal_sdk_core_protos::temporal::api::workflowservice::v1::{ | ||
| PollActivityTaskQueueResponse, RecordWorkerHeartbeatResponse, | ||
| RespondActivityTaskCompletedResponse, | ||
| }; | ||
| use temporal_sdk_core_protos::temporal::api::workflowservice::v1::RecordWorkerHeartbeatResponse; | ||
|
|
||
| #[rstest::rstest] | ||
| #[tokio::test] | ||
| async fn worker_heartbeat(#[values(true, false)] extra_heartbeat: bool) { | ||
| async fn worker_heartbeat() { | ||
| let mut mock = mock_worker_client(); | ||
| let record_heartbeat_calls = if extra_heartbeat { 2 } else { 3 }; | ||
| mock.expect_record_worker_heartbeat() | ||
| .times(record_heartbeat_calls) | ||
| .returning(|heartbeat| { | ||
| .times(2) | ||
| .returning(move |heartbeat| { | ||
| let host_info = heartbeat.host_info.clone().unwrap(); | ||
| assert!(heartbeat.worker_identity.is_empty()); | ||
| assert!(!heartbeat.worker_instance_key.is_empty()); | ||
|
|
@@ -202,54 +131,26 @@ mod tests { | |
|
|
||
| Ok(RecordWorkerHeartbeatResponse {}) | ||
| }); | ||
| mock.expect_poll_activity_task() | ||
| .times(1) | ||
| .returning(move |_, _| { | ||
| Ok(PollActivityTaskQueueResponse { | ||
| task_token: vec![1], | ||
| activity_id: "act1".to_string(), | ||
| ..Default::default() | ||
| }) | ||
| }); | ||
| mock.expect_complete_activity_task() | ||
| .returning(|_, _| Ok(RespondActivityTaskCompletedResponse::default())); | ||
|
|
||
| let config = test_worker_cfg() | ||
| .activity_task_poller_behavior(PollerBehavior::SimpleMaximum(1_usize)) | ||
| .max_outstanding_activities(1_usize) | ||
| .heartbeat_interval(Duration::from_millis(100)) | ||
| .heartbeat_interval(Duration::from_millis(200)) | ||
| .build() | ||
| .unwrap(); | ||
|
|
||
| let heartbeat_info = Arc::new(Mutex::new(WorkerHeartbeatInfo::new(config.clone()))); | ||
| let heartbeat_data = Arc::new(Mutex::new(WorkerHeartbeatData::new(config.clone()))); | ||
| let client = Arc::new(mock); | ||
| heartbeat_info.lock().add_client(client.clone()); | ||
| let worker = worker::Worker::new(config, None, client, None, Some(heartbeat_info.clone())); | ||
| let _ = heartbeat_info.lock().capture_heartbeat(); | ||
| let worker = worker::Worker::new(config, None, client, None, Some(heartbeat_data.clone())); | ||
| let _ = heartbeat_data.lock().capture_heartbeat_if_needed(); | ||
|
|
||
| // heartbeat timer fires once | ||
| tokio::time::sleep(Duration::from_millis(150)).await; | ||
| if extra_heartbeat { | ||
| // reset heartbeat timer | ||
| heartbeat_info.lock().capture_heartbeat(); | ||
| } | ||
| tokio::time::sleep(Duration::from_millis(300)).await; | ||
| // it hasn't been >90% of the interval since the last heartbeat, so no data should be returned here | ||
| assert_eq!(None, heartbeat_data.lock().capture_heartbeat_if_needed()); | ||
| // heartbeat timer fires once | ||
| tokio::time::sleep(Duration::from_millis(180)).await; | ||
|
|
||
| if extra_heartbeat { | ||
| assert_eq!(2, *heartbeat_info.lock().heartbeats_sent.lock().deref()); | ||
| } else { | ||
| assert_eq!(3, *heartbeat_info.lock().heartbeats_sent.lock().deref()); | ||
| } | ||
| tokio::time::sleep(Duration::from_millis(150)).await; | ||
|
|
||
| let task = worker.poll_activity_task().await.unwrap(); | ||
| worker | ||
| .complete_activity_task(ActivityTaskCompletion { | ||
| task_token: task.task_token, | ||
| result: Some(ActivityExecutionResult::ok(vec![1].into())), | ||
| }) | ||
| .await | ||
| .unwrap(); | ||
| worker.drain_activity_poller_and_shutdown().await; | ||
| } | ||
| } | ||
Uh oh!
There was an error while loading. Please reload this page.