Skip to content

Commit 7f546c9

Browse files
committed
Merge branch 'master' into worker-heartbeat
2 parents eee45b8 + 561ca79 commit 7f546c9

File tree

13 files changed

+368
-30
lines changed

13 files changed

+368
-30
lines changed

client/src/metrics.rs

Lines changed: 26 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -9,9 +9,12 @@ use std::{
99
task::{Context, Poll},
1010
time::{Duration, Instant},
1111
};
12-
use temporal_sdk_core_api::telemetry::metrics::{
13-
CoreMeter, Counter, CounterBase, HistogramDuration, HistogramDurationBase, MetricAttributable,
14-
MetricAttributes, MetricKeyValue, MetricParameters, TemporalMeter,
12+
use temporal_sdk_core_api::telemetry::{
13+
TaskQueueLabelStrategy,
14+
metrics::{
15+
CoreMeter, Counter, CounterBase, HistogramDuration, HistogramDurationBase,
16+
MetricAttributable, MetricAttributes, MetricKeyValue, MetricParameters, TemporalMeter,
17+
},
1518
};
1619
use tonic::{Code, body::Body, transport::Channel};
1720
use tower::Service;
@@ -31,6 +34,7 @@ pub(crate) struct MetricsContext {
3134
kvs: MetricAttributes,
3235
poll_is_long: bool,
3336
instruments: Instruments,
37+
task_queue_label_strategy: TaskQueueLabelStrategy,
3438
}
3539
#[derive(Clone)]
3640
struct Instruments {
@@ -46,6 +50,7 @@ struct Instruments {
4650
impl MetricsContext {
4751
pub(crate) fn new(tm: TemporalMeter) -> Self {
4852
let meter = tm.inner;
53+
let task_queue_label_strategy = tm.task_queue_label_strategy;
4954
let kvs = meter.new_attributes(tm.default_attribs);
5055
let instruments = Instruments {
5156
svc_request: meter.counter(MetricParameters {
@@ -84,6 +89,7 @@ impl MetricsContext {
8489
poll_is_long: false,
8590
instruments,
8691
meter,
92+
task_queue_label_strategy,
8793
}
8894
}
8995

@@ -250,7 +256,23 @@ impl Service<http::Request<Body>> for GrpcMetricSvc {
250256
.map(|mut m| {
251257
// Attach labels from client wrapper
252258
if let Some(other_labels) = req.extensions_mut().remove::<AttachMetricLabels>() {
253-
m.with_new_attrs(other_labels.labels)
259+
m.with_new_attrs(other_labels.labels);
260+
261+
if other_labels.normal_task_queue.is_some()
262+
|| other_labels.sticky_task_queue.is_some()
263+
{
264+
let task_queue_name = match m.task_queue_label_strategy {
265+
TaskQueueLabelStrategy::UseNormal => other_labels.normal_task_queue,
266+
TaskQueueLabelStrategy::UseNormalAndSticky => other_labels
267+
.sticky_task_queue
268+
.or(other_labels.normal_task_queue),
269+
_ => other_labels.normal_task_queue,
270+
};
271+
272+
if let Some(tq_name) = task_queue_name {
273+
m.with_new_attrs([task_queue_kv(tq_name)]);
274+
}
275+
}
254276
}
255277
if let Some(ct) = req.extensions().get::<CallType>()
256278
&& ct.is_long()

client/src/raw.rs

Lines changed: 169 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
use crate::{
66
Client, ConfiguredClient, LONG_POLL_TIMEOUT, RequestExt, RetryClient, SharedReplaceableClient,
77
TEMPORAL_NAMESPACE_HEADER_KEY, TemporalServiceClient,
8-
metrics::{namespace_kv, task_queue_kv},
8+
metrics::namespace_kv,
99
worker_registry::{ClientWorkerSet, Slot},
1010
};
1111
use dyn_clone::DynClone;
@@ -370,22 +370,33 @@ impl RawGrpcCaller for Client {}
370370
#[derive(Clone, Debug)]
371371
pub(super) struct AttachMetricLabels {
372372
pub(super) labels: Vec<MetricKeyValue>,
373+
pub(super) normal_task_queue: Option<String>,
374+
pub(super) sticky_task_queue: Option<String>,
373375
}
374376
impl AttachMetricLabels {
375377
pub(super) fn new(kvs: impl Into<Vec<MetricKeyValue>>) -> Self {
376-
Self { labels: kvs.into() }
378+
Self {
379+
labels: kvs.into(),
380+
normal_task_queue: None,
381+
sticky_task_queue: None,
382+
}
377383
}
378384
pub(super) fn namespace(ns: impl Into<String>) -> Self {
379385
AttachMetricLabels::new(vec![namespace_kv(ns.into())])
380386
}
381387
pub(super) fn task_q(&mut self, tq: Option<TaskQueue>) -> &mut Self {
382388
if let Some(tq) = tq {
383-
self.task_q_str(tq.name);
389+
if !tq.normal_name.is_empty() {
390+
self.sticky_task_queue = Some(tq.name);
391+
self.normal_task_queue = Some(tq.normal_name);
392+
} else {
393+
self.normal_task_queue = Some(tq.name);
394+
}
384395
}
385396
self
386397
}
387398
pub(super) fn task_q_str(&mut self, tq: impl Into<String>) -> &mut Self {
388-
self.labels.push(task_queue_kv(tq.into()));
399+
self.normal_task_queue = Some(tq.into());
389400
self
390401
}
391402
}
@@ -635,7 +646,21 @@ proxier! {
635646
let task_queue = req_mut.task_queue.as_ref()
636647
.map(|tq| tq.name.clone()).unwrap_or_default();
637648
match workers.try_reserve_wft_slot(namespace, task_queue) {
638-
Some(s) => slot = Some(s),
649+
Some(reservation) => {
650+
// Populate eager_worker_deployment_options from the slot reservation
651+
if let Some(opts) = reservation.deployment_options {
652+
req_mut.eager_worker_deployment_options = Some(temporal_sdk_core_protos::temporal::api::deployment::v1::WorkerDeploymentOptions {
653+
deployment_name: opts.version.deployment_name,
654+
build_id: opts.version.build_id,
655+
worker_versioning_mode: if opts.use_worker_versioning {
656+
temporal_sdk_core_protos::temporal::api::enums::v1::WorkerVersioningMode::Versioned.into()
657+
} else {
658+
temporal_sdk_core_protos::temporal::api::enums::v1::WorkerVersioningMode::Unversioned.into()
659+
},
660+
});
661+
}
662+
slot = Some(reservation.slot);
663+
}
639664
None => req_mut.request_eager_execution = false
640665
}
641666
}
@@ -1398,15 +1423,6 @@ proxier! {
13981423
r.extensions_mut().insert(labels);
13991424
}
14001425
);
1401-
(
1402-
describe_worker,
1403-
DescribeWorkerRequest,
1404-
DescribeWorkerResponse,
1405-
|r| {
1406-
let labels = namespaced_request!(r);
1407-
r.extensions_mut().insert(labels);
1408-
}
1409-
);
14101426
(
14111427
record_worker_heartbeat,
14121428
RecordWorkerHeartbeatRequest,
@@ -1444,6 +1460,15 @@ proxier! {
14441460
r.extensions_mut().insert(labels);
14451461
}
14461462
);
1463+
(
1464+
describe_worker,
1465+
DescribeWorkerRequest,
1466+
DescribeWorkerResponse,
1467+
|r| {
1468+
let labels = namespaced_request!(r);
1469+
r.extensions_mut().insert(labels);
1470+
}
1471+
);
14471472
(
14481473
set_worker_deployment_manager,
14491474
SetWorkerDeploymentManagerRequest,
@@ -1701,7 +1726,7 @@ mod tests {
17011726
}
17021727

17031728
#[tokio::test]
1704-
async fn can_mock_workflow_service() {
1729+
async fn can_mock_services() {
17051730
#[derive(Clone)]
17061731
struct MyFakeServices {}
17071732
impl RawGrpcCaller for MyFakeServices {}
@@ -1760,4 +1785,133 @@ mod tests {
17601785
.unwrap();
17611786
assert_eq!(r.into_inner().namespaces[0].failover_version, 12345);
17621787
}
1788+
1789+
#[rstest::rstest]
1790+
#[case::with_versioning(true)]
1791+
#[case::without_versioning(false)]
1792+
#[tokio::test]
1793+
async fn eager_reservations_attach_deployment_options(#[case] use_worker_versioning: bool) {
1794+
use crate::worker_registry::{MockClientWorker, MockSlot};
1795+
use temporal_sdk_core_api::worker::{WorkerDeploymentOptions, WorkerDeploymentVersion};
1796+
use temporal_sdk_core_protos::temporal::api::enums::v1::WorkerVersioningMode;
1797+
1798+
let expected_mode = if use_worker_versioning {
1799+
WorkerVersioningMode::Versioned
1800+
} else {
1801+
WorkerVersioningMode::Unversioned
1802+
};
1803+
1804+
#[derive(Clone)]
1805+
struct MyFakeServices {
1806+
client_worker_set: Arc<ClientWorkerSet>,
1807+
expected_mode: WorkerVersioningMode,
1808+
}
1809+
impl RawGrpcCaller for MyFakeServices {}
1810+
impl RawClientProducer for MyFakeServices {
1811+
fn get_workers_info(&self) -> Option<Arc<ClientWorkerSet>> {
1812+
Some(self.client_worker_set.clone())
1813+
}
1814+
fn workflow_client(&mut self) -> Box<dyn WorkflowService> {
1815+
Box::new(MyFakeWfClient {
1816+
expected_mode: self.expected_mode,
1817+
})
1818+
}
1819+
fn operator_client(&mut self) -> Box<dyn OperatorService> {
1820+
unimplemented!()
1821+
}
1822+
fn cloud_client(&mut self) -> Box<dyn CloudService> {
1823+
unimplemented!()
1824+
}
1825+
fn test_client(&mut self) -> Box<dyn TestService> {
1826+
unimplemented!()
1827+
}
1828+
fn health_client(&mut self) -> Box<dyn HealthService> {
1829+
unimplemented!()
1830+
}
1831+
}
1832+
1833+
let deployment_opts = WorkerDeploymentOptions {
1834+
version: WorkerDeploymentVersion {
1835+
deployment_name: "test-deployment".to_string(),
1836+
build_id: "test-build-123".to_string(),
1837+
},
1838+
use_worker_versioning,
1839+
default_versioning_behavior: None,
1840+
};
1841+
1842+
let mut mock_provider = MockClientWorker::new();
1843+
mock_provider
1844+
.expect_namespace()
1845+
.return_const("test-namespace".to_string());
1846+
mock_provider
1847+
.expect_task_queue()
1848+
.return_const("test-task-queue".to_string());
1849+
let mut mock_slot = MockSlot::new();
1850+
mock_slot.expect_schedule_wft().returning(|_| Ok(()));
1851+
mock_provider
1852+
.expect_try_reserve_wft_slot()
1853+
.return_once(|| Some(Box::new(mock_slot)));
1854+
mock_provider
1855+
.expect_deployment_options()
1856+
.return_const(Some(deployment_opts.clone()));
1857+
1858+
let client_worker_set = Arc::new(ClientWorkerSet::new());
1859+
client_worker_set
1860+
.register_worker(Arc::new(mock_provider), true)
1861+
.unwrap();
1862+
1863+
#[derive(Clone)]
1864+
struct MyFakeWfClient {
1865+
expected_mode: WorkerVersioningMode,
1866+
}
1867+
impl WorkflowService for MyFakeWfClient {
1868+
fn start_workflow_execution(
1869+
&mut self,
1870+
request: tonic::Request<StartWorkflowExecutionRequest>,
1871+
) -> BoxFuture<'_, Result<tonic::Response<StartWorkflowExecutionResponse>, tonic::Status>>
1872+
{
1873+
let req = request.into_inner();
1874+
let expected_mode = self.expected_mode;
1875+
1876+
assert!(
1877+
req.eager_worker_deployment_options.is_some(),
1878+
"eager_worker_deployment_options should be populated"
1879+
);
1880+
1881+
let opts = req.eager_worker_deployment_options.as_ref().unwrap();
1882+
assert_eq!(opts.deployment_name, "test-deployment");
1883+
assert_eq!(opts.build_id, "test-build-123");
1884+
assert_eq!(opts.worker_versioning_mode, expected_mode as i32);
1885+
1886+
async { Ok(Response::new(StartWorkflowExecutionResponse::default())) }.boxed()
1887+
}
1888+
}
1889+
1890+
let mut mfs = MyFakeServices {
1891+
client_worker_set,
1892+
expected_mode,
1893+
};
1894+
1895+
// Create a request with eager execution enabled
1896+
let req = StartWorkflowExecutionRequest {
1897+
namespace: "test-namespace".to_string(),
1898+
workflow_id: "test-wf-id".to_string(),
1899+
workflow_type: Some(
1900+
temporal_sdk_core_protos::temporal::api::common::v1::WorkflowType {
1901+
name: "test-workflow".to_string(),
1902+
},
1903+
),
1904+
task_queue: Some(TaskQueue {
1905+
name: "test-task-queue".to_string(),
1906+
kind: 0,
1907+
normal_name: String::new(),
1908+
}),
1909+
request_eager_execution: true,
1910+
..Default::default()
1911+
};
1912+
1913+
mfs.start_workflow_execution(req.into_request())
1914+
.await
1915+
.unwrap();
1916+
}
17631917
}

client/src/worker_registry/mod.rs

Lines changed: 20 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,14 @@ pub trait Slot {
2323
) -> Result<(), anyhow::Error>;
2424
}
2525

26+
/// Result of reserving a workflow task slot, including deployment options if applicable.
27+
pub(crate) struct SlotReservation {
28+
/// The reserved slot for processing the workflow task
29+
pub slot: Box<dyn Slot + Send>,
30+
/// Worker deployment options, if the worker is using deployment-based versioning
31+
pub deployment_options: Option<temporal_sdk_core_api::worker::WorkerDeploymentOptions>,
32+
}
33+
2634
#[derive(PartialEq, Eq, Hash, Debug, Clone)]
2735
struct SlotKey {
2836
namespace: String,
@@ -62,13 +70,17 @@ impl ClientWorkerSetImpl {
6270
&self,
6371
namespace: String,
6472
task_queue: String,
65-
) -> Option<Box<dyn Slot + Send>> {
73+
) -> Option<SlotReservation> {
6674
let key = SlotKey::new(namespace, task_queue);
6775
if let Some(p) = self.slot_providers.get(&key)
6876
&& let Some(worker) = self.all_workers.get(p)
6977
&& let Some(slot) = worker.try_reserve_wft_slot()
7078
{
71-
return Some(slot);
79+
let deployment_options = worker.deployment_options();
80+
return Some(SlotReservation {
81+
slot,
82+
deployment_options,
83+
});
7284
}
7385
None
7486
}
@@ -197,11 +209,12 @@ impl ClientWorkerSet {
197209
}
198210

199211
/// Try to reserve a compatible processing slot in any of the registered workers.
212+
/// Returns the slot and the worker's deployment options (if using deployment-based versioning).
200213
pub(crate) fn try_reserve_wft_slot(
201214
&self,
202215
namespace: String,
203216
task_queue: String,
204-
) -> Option<Box<dyn Slot + Send>> {
217+
) -> Option<SlotReservation> {
205218
self.worker_manager
206219
.read()
207220
.try_reserve_wft_slot(namespace, task_queue)
@@ -273,6 +286,9 @@ pub trait ClientWorker: Send + Sync {
273286
/// to process exactly one workflow task.
274287
fn try_reserve_wft_slot(&self) -> Option<Box<dyn Slot + Send>>;
275288

289+
/// Get the worker deployment options for this worker, if using deployment-based versioning.
290+
fn deployment_options(&self) -> Option<temporal_sdk_core_api::worker::WorkerDeploymentOptions>;
291+
276292
/// Unique identifier for this worker instance.
277293
/// This must be stable across the worker's lifetime and unique per instance.
278294
fn worker_instance_key(&self) -> Uuid;
@@ -324,6 +340,7 @@ mod tests {
324340
});
325341
mock_provider.expect_namespace().return_const(namespace);
326342
mock_provider.expect_task_queue().return_const(task_queue);
343+
mock_provider.expect_deployment_options().return_const(None);
327344
mock_provider
328345
.expect_heartbeat_enabled()
329346
.return_const(heartbeat_enabled);

0 commit comments

Comments
 (0)