Skip to content
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions core-api/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,12 @@ pub struct WorkerConfig {

/// A versioning strategy for this worker.
pub versioning_strategy: WorkerVersioningStrategy,

/// The interval in which the worker will send a heartbeat.
/// The timer is reset on each existing RPC call that also happens to send this data, like
/// `PollWorkflowTaskQueueRequest`.
#[builder(default)]
pub heartbeat_interval: Option<Duration>,
}

impl WorkerConfig {
Expand Down
1 change: 1 addition & 0 deletions core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ enum-iterator = "2"
flate2 = { version = "1.0", optional = true }
futures-util = { version = "0.3", default-features = false }
futures-channel = { version = "0.3", default-features = false, features = ["std"] }
gethostname = "1.0.2"
governor = "0.8"
http-body-util = { version = "0.1", optional = true }
hyper = { version = "1.2", optional = true }
Expand Down
36 changes: 18 additions & 18 deletions core/src/core_tests/activity_tasks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use crate::{
gen_assert_and_reply, mock_manual_poller, mock_poller, mock_poller_from_resps,
mock_sdk_cfg, mock_worker, poll_and_reply, single_hist_mock_sg, test_worker_cfg,
},
worker::client::mocks::{mock_manual_workflow_client, mock_workflow_client},
worker::client::mocks::{mock_manual_worker_client, mock_worker_client},
};
use futures_util::FutureExt;
use itertools::Itertools;
Expand Down Expand Up @@ -86,7 +86,7 @@ fn three_tasks() -> VecDeque<PollActivityTaskQueueResponse> {
async fn max_activities_respected() {
let _task_q = "q";
let mut tasks = three_tasks();
let mut mock_client = mock_workflow_client();
let mut mock_client = mock_worker_client();
mock_client
.expect_poll_activity_task()
.times(3)
Expand Down Expand Up @@ -122,7 +122,7 @@ async fn max_activities_respected() {

#[tokio::test]
async fn activity_not_found_returns_ok() {
let mut mock_client = mock_workflow_client();
let mut mock_client = mock_worker_client();
// Mock won't even be called, since we weren't tracking activity
mock_client.expect_complete_activity_task().times(0);

Expand All @@ -139,7 +139,7 @@ async fn activity_not_found_returns_ok() {

#[tokio::test]
async fn heartbeats_report_cancels_only_once() {
let mut mock_client = mock_workflow_client();
let mut mock_client = mock_worker_client();
mock_client
.expect_record_activity_heartbeat()
.times(2)
Expand Down Expand Up @@ -265,7 +265,7 @@ async fn activity_cancel_interrupts_poll() {
.times(3)
.returning(move || poll_resps.pop_front().unwrap());

let mut mock_client = mock_manual_workflow_client();
let mut mock_client = mock_manual_worker_client();
mock_client
.expect_record_activity_heartbeat()
.times(1)
Expand Down Expand Up @@ -323,7 +323,7 @@ async fn activity_cancel_interrupts_poll() {

#[tokio::test]
async fn activity_poll_timeout_retries() {
let mock_client = mock_workflow_client();
let mock_client = mock_worker_client();
let mut calls = 0;
let mut mock_act_poller = mock_poller();
mock_act_poller.expect_poll().times(3).returning(move || {
Expand Down Expand Up @@ -352,7 +352,7 @@ async fn many_concurrent_heartbeat_cancels() {
// them after a few successful heartbeats
const CONCURRENCY_NUM: usize = 5;

let mut mock_client = mock_manual_workflow_client();
let mut mock_client = mock_manual_worker_client();
let mut poll_resps = VecDeque::from(
(0..CONCURRENCY_NUM)
.map(|i| {
Expand Down Expand Up @@ -516,7 +516,7 @@ async fn activity_timeout_no_double_resolve() {

#[tokio::test]
async fn can_heartbeat_acts_during_shutdown() {
let mut mock_client = mock_workflow_client();
let mut mock_client = mock_worker_client();
mock_client
.expect_record_activity_heartbeat()
.times(1)
Expand Down Expand Up @@ -567,7 +567,7 @@ async fn can_heartbeat_acts_during_shutdown() {
#[tokio::test]
async fn complete_act_with_fail_flushes_heartbeat() {
let last_hb = 50;
let mut mock_client = mock_workflow_client();
let mut mock_client = mock_worker_client();
let last_seen_payload = Rc::new(RefCell::new(None));
let lsp = last_seen_payload.clone();
mock_client
Expand Down Expand Up @@ -622,7 +622,7 @@ async fn complete_act_with_fail_flushes_heartbeat() {
#[tokio::test]
async fn max_tq_acts_set_passed_to_poll_properly() {
let rate = 9.28;
let mut mock_client = mock_workflow_client();
let mut mock_client = mock_worker_client();
mock_client
.expect_poll_activity_task()
.returning(move |_, ao| {
Expand Down Expand Up @@ -659,7 +659,7 @@ async fn no_eager_activities_requested_when_worker_options_disable_it(
let num_eager_requested = Arc::new(AtomicUsize::new(0));
let num_eager_requested_clone = num_eager_requested.clone();

let mut mock = mock_workflow_client();
let mut mock = mock_worker_client();
mock.expect_complete_workflow_task()
.times(1)
.returning(move |req| {
Expand Down Expand Up @@ -747,7 +747,7 @@ async fn activity_tasks_from_completion_are_delivered() {
// Clone it to move into the callback below
let num_eager_requested_clone = num_eager_requested.clone();

let mut mock = mock_workflow_client();
let mut mock = mock_worker_client();
mock.expect_complete_workflow_task()
.times(1)
.returning(move |req| {
Expand Down Expand Up @@ -876,7 +876,7 @@ async fn activity_tasks_from_completion_reserve_slots() {
t.add_full_wf_task();
t.add_workflow_execution_completed();

let mut mock = mock_workflow_client();
let mut mock = mock_worker_client();
// Set up two tasks to be returned via normal activity polling
let act_tasks = VecDeque::from(vec![
PollActivityTaskQueueResponse {
Expand Down Expand Up @@ -1004,7 +1004,7 @@ async fn activity_tasks_from_completion_reserve_slots() {

#[tokio::test]
async fn retryable_net_error_exhaustion_is_nonfatal() {
let mut mock_client = mock_workflow_client();
let mut mock_client = mock_worker_client();
mock_client
.expect_complete_activity_task()
.times(1)
Expand Down Expand Up @@ -1033,7 +1033,7 @@ async fn retryable_net_error_exhaustion_is_nonfatal() {

#[tokio::test]
async fn cant_complete_activity_with_unset_result_payload() {
let mut mock_client = mock_workflow_client();
let mut mock_client = mock_worker_client();
mock_client
.expect_poll_activity_task()
.returning(move |_, _| {
Expand Down Expand Up @@ -1076,7 +1076,7 @@ async fn graceful_shutdown(#[values(true, false)] at_max_outstanding: bool) {
.times(1)
.returning(move || None);
// They shall all be reported as failed
let mut mock_client = mock_workflow_client();
let mut mock_client = mock_worker_client();
mock_client
.expect_fail_activity_task()
.times(3)
Expand Down Expand Up @@ -1153,7 +1153,7 @@ async fn activities_must_be_flushed_to_server_on_shutdown(#[values(true, false)]
.expect_poll()
.times(1)
.returning(move || None);
let mut mock_client = mock_manual_workflow_client();
let mut mock_client = mock_manual_worker_client();
mock_client
.expect_complete_activity_task()
.times(1)
Expand Down Expand Up @@ -1251,7 +1251,7 @@ async fn pass_activity_summary_to_metadata() {

#[tokio::test]
async fn heartbeat_response_can_be_paused() {
let mut mock_client = mock_workflow_client();
let mut mock_client = mock_worker_client();
// First heartbeat returns pause only
mock_client
.expect_record_activity_heartbeat()
Expand Down
8 changes: 4 additions & 4 deletions core/src/core_tests/child_workflows.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use crate::{
MockPollCfg, ResponseType, build_fake_sdk, canned_histories, mock_sdk, mock_sdk_cfg,
mock_worker, single_hist_mock_sg,
},
worker::client::mocks::mock_workflow_client,
worker::client::mocks::mock_worker_client,
};
use temporal_client::WorkflowOptions;
use temporal_sdk::{ChildWorkflowOptions, Signal, WfContext, WorkflowResult};
Expand Down Expand Up @@ -32,7 +32,7 @@ async fn signal_child_workflow(#[case] serial: bool) {
let wf_id = "fakeid";
let wf_type = DEFAULT_WORKFLOW_TYPE;
let t = canned_histories::single_child_workflow_signaled("child-id-1", SIGNAME);
let mock = mock_workflow_client();
let mock = mock_worker_client();
let mut worker = mock_sdk(MockPollCfg::from_resp_batches(
wf_id,
t,
Expand Down Expand Up @@ -130,7 +130,7 @@ async fn cancel_child_workflow_lang_thinks_not_started_but_is(
}
_ => canned_histories::single_child_workflow_cancelled("child-id-1"),
};
let mock = mock_workflow_client();
let mock = mock_worker_client();
let mock = single_hist_mock_sg("fakeid", t, [ResponseType::AllHistory], mock, true);
let core = mock_worker(mock);
let act = core.poll_workflow_activation().await.unwrap();
Expand Down Expand Up @@ -179,7 +179,7 @@ async fn cancel_child_workflow_lang_thinks_not_started_but_is(
#[tokio::test]
async fn cancel_already_complete_child_ignored() {
let t = canned_histories::single_child_workflow("child-id-1");
let mock = mock_workflow_client();
let mock = mock_worker_client();
let mock = single_hist_mock_sg("fakeid", t, [ResponseType::AllHistory], mock, true);
let core = mock_worker(mock);
let act = core.poll_workflow_activation().await.unwrap();
Expand Down
12 changes: 6 additions & 6 deletions core/src/core_tests/determinism.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use crate::{
internal_flags::CoreInternalFlags,
replay::DEFAULT_WORKFLOW_TYPE,
test_help::{MockPollCfg, ResponseType, canned_histories, mock_sdk, mock_sdk_cfg},
worker::client::mocks::mock_workflow_client,
worker::client::mocks::mock_worker_client,
};
use std::{
sync::atomic::{AtomicBool, AtomicUsize, Ordering},
Expand Down Expand Up @@ -40,7 +40,7 @@ async fn test_panic_wf_task_rejected_properly() {
let wf_id = "fakeid";
let wf_type = DEFAULT_WORKFLOW_TYPE;
let t = canned_histories::workflow_fails_with_failure_after_timer("1");
let mock = mock_workflow_client();
let mock = mock_worker_client();
let mut mh = MockPollCfg::from_resp_batches(wf_id, t, [1, 2, 2], mock);
// We should see one wft failure which has unspecified cause, since panics don't have a defined
// type.
Expand Down Expand Up @@ -72,7 +72,7 @@ async fn test_wf_task_rejected_properly_due_to_nondeterminism(#[case] use_cache:
let wf_id = "fakeid";
let wf_type = DEFAULT_WORKFLOW_TYPE;
let t = canned_histories::single_timer_wf_completes("1");
let mock = mock_workflow_client();
let mock = mock_worker_client();
let mut mh = MockPollCfg::from_resp_batches(
wf_id,
t,
Expand Down Expand Up @@ -131,7 +131,7 @@ async fn activity_id_or_type_change_is_nondeterministic(
canned_histories::single_activity("1")
};
t.set_flags_first_wft(&[CoreInternalFlags::IdAndTypeDeterminismChecks as u32], &[]);
let mock = mock_workflow_client();
let mock = mock_worker_client();
let mut mh = MockPollCfg::from_resp_batches(
wf_id,
t,
Expand Down Expand Up @@ -214,7 +214,7 @@ async fn child_wf_id_or_type_change_is_nondeterministic(
let wf_type = DEFAULT_WORKFLOW_TYPE;
let mut t = canned_histories::single_child_workflow("1");
t.set_flags_first_wft(&[CoreInternalFlags::IdAndTypeDeterminismChecks as u32], &[]);
let mock = mock_workflow_client();
let mock = mock_worker_client();
let mut mh = MockPollCfg::from_resp_batches(
wf_id,
t,
Expand Down Expand Up @@ -289,7 +289,7 @@ async fn repro_channel_missing_because_nondeterminism() {
let _ts = t.add_by_type(EventType::TimerStarted);
t.add_workflow_task_scheduled_and_started();

let mock = mock_workflow_client();
let mock = mock_worker_client();
let mut mh =
MockPollCfg::from_resp_batches(wf_id, t, [1.into(), ResponseType::AllHistory], mock);
mh.num_expected_fails = 1;
Expand Down
Loading
Loading