Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .cargo/config.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[env]
# This temporarily overrides the version of the CLI used for integration tests, locally and in CI
CLI_VERSION_OVERRIDE = "v1.4.1-cloud-v1-29-0-139-2.0"
#CLI_VERSION_OVERRIDE = "v1.4.1-cloud-v1-29-0-139-2.0"

[alias]
# Not sure why --all-features doesn't work
Expand Down
20 changes: 16 additions & 4 deletions client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,9 @@ pub use temporal_sdk_core_protos::temporal::api::{
},
};
pub use tonic;
pub use worker_registry::{Slot, SlotManager, SlotProvider, WorkerKey};
pub use worker_registry::{
ClientWorker, ClientWorkerSet, HeartbeatCallback, SharedNamespaceWorkerTrait, Slot,
};
pub use workflow_handle::{
GetWorkflowResultOpts, WorkflowExecutionInfo, WorkflowExecutionResult, WorkflowHandle,
};
Expand Down Expand Up @@ -390,7 +392,7 @@ pub struct ConfiguredClient<C> {
headers: Arc<RwLock<ClientHeaders>>,
/// Capabilities as read from the `get_system_info` RPC call made on client connection
capabilities: Option<get_system_info_response::Capabilities>,
workers: Arc<SlotManager>,
workers: Arc<ClientWorkerSet>,
}

impl<C> ConfiguredClient<C> {
Expand Down Expand Up @@ -440,9 +442,14 @@ impl<C> ConfiguredClient<C> {
}

/// Returns a cloned reference to a registry with workers using this client instance
pub fn workers(&self) -> Arc<SlotManager> {
pub fn workers(&self) -> Arc<ClientWorkerSet> {
self.workers.clone()
}

/// Returns the worker grouping key, this should be unique across each client
pub fn worker_grouping_key(&self) -> Uuid {
self.workers.worker_grouping_key()
}
}

#[derive(Debug)]
Expand Down Expand Up @@ -584,7 +591,7 @@ impl ClientOptions {
client: TemporalServiceClient::new(svc),
options: Arc::new(self.clone()),
capabilities: None,
workers: Arc::new(SlotManager::new()),
workers: Arc::new(ClientWorkerSet::new()),
};
if !self.skip_get_system_info {
match client
Expand Down Expand Up @@ -866,6 +873,11 @@ impl Client {
pub fn into_inner(self) -> ConfiguredClient<TemporalServiceClient> {
self.inner
}

/// Returns the client-wide key
pub fn worker_grouping_key(&self) -> Uuid {
self.inner.worker_grouping_key()
}
}

impl NamespacedClient for Client {
Expand Down
40 changes: 24 additions & 16 deletions client/src/raw.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use crate::{
Client, ConfiguredClient, LONG_POLL_TIMEOUT, RequestExt, RetryClient, SharedReplaceableClient,
TEMPORAL_NAMESPACE_HEADER_KEY, TemporalServiceClient,
metrics::namespace_kv,
worker_registry::{Slot, SlotManager},
worker_registry::{ClientWorkerSet, Slot},
};
use dyn_clone::DynClone;
use futures_util::{FutureExt, TryFutureExt, future::BoxFuture};
Expand All @@ -33,7 +33,7 @@ use tonic::{
trait RawClientProducer {
/// Returns information about workers associated with this client. Implementers outside of
/// core can safely return `None`.
fn get_workers_info(&self) -> Option<Arc<SlotManager>>;
fn get_workers_info(&self) -> Option<Arc<ClientWorkerSet>>;

/// Return a workflow service client instance
fn workflow_client(&mut self) -> Box<dyn WorkflowService>;
Expand Down Expand Up @@ -175,7 +175,7 @@ impl<RC> RawClientProducer for RetryClient<RC>
where
RC: RawClientProducer + 'static,
{
fn get_workers_info(&self) -> Option<Arc<SlotManager>> {
fn get_workers_info(&self) -> Option<Arc<ClientWorkerSet>> {
self.get_client().get_workers_info()
}

Expand Down Expand Up @@ -253,7 +253,7 @@ impl<RC> RawClientProducer for SharedReplaceableClient<RC>
where
RC: RawClientProducer + Clone + Send + Sync + 'static,
{
fn get_workers_info(&self) -> Option<Arc<SlotManager>> {
fn get_workers_info(&self) -> Option<Arc<ClientWorkerSet>> {
self.inner_cow().get_workers_info()
}
fn workflow_client(&mut self) -> Box<dyn WorkflowService> {
Expand Down Expand Up @@ -284,7 +284,7 @@ impl<RC> RawGrpcCaller for SharedReplaceableClient<RC> where
}

impl RawClientProducer for TemporalServiceClient {
fn get_workers_info(&self) -> Option<Arc<SlotManager>> {
fn get_workers_info(&self) -> Option<Arc<ClientWorkerSet>> {
None
}

Expand Down Expand Up @@ -312,7 +312,7 @@ impl RawClientProducer for TemporalServiceClient {
impl RawGrpcCaller for TemporalServiceClient {}

impl RawClientProducer for ConfiguredClient<TemporalServiceClient> {
fn get_workers_info(&self) -> Option<Arc<SlotManager>> {
fn get_workers_info(&self) -> Option<Arc<ClientWorkerSet>> {
Some(self.workers())
}

Expand Down Expand Up @@ -340,7 +340,7 @@ impl RawClientProducer for ConfiguredClient<TemporalServiceClient> {
impl RawGrpcCaller for ConfiguredClient<TemporalServiceClient> {}

impl RawClientProducer for Client {
fn get_workers_info(&self) -> Option<Arc<SlotManager>> {
fn get_workers_info(&self) -> Option<Arc<ClientWorkerSet>> {
self.inner.get_workers_info()
}

Expand Down Expand Up @@ -491,7 +491,7 @@ macro_rules! proxy_impl {
mut request: tonic::Request<$req>,
) -> BoxFuture<'_, Result<tonic::Response<$resp>, tonic::Status>> {
type_closure_arg(&mut request, $closure_request);
let data = type_closure_two_arg(&mut request, Option::<Arc<SlotManager>>::None,
let data = type_closure_two_arg(&mut request, Option::<Arc<ClientWorkerSet>>::None,
$closure_before);
async move {
type_closure_two_arg(<$client_type<_>>::$method(self, request).await,
Expand Down Expand Up @@ -1601,6 +1601,7 @@ mod tests {
operatorservice::v1::DeleteNamespaceRequest, workflowservice::v1::ListNamespacesRequest,
};
use tonic::IntoRequest;
use uuid::Uuid;

// Just to help make sure some stuff compiles. Not run.
#[allow(dead_code)]
Expand Down Expand Up @@ -1791,7 +1792,7 @@ mod tests {
#[case::without_versioning(false)]
#[tokio::test]
async fn eager_reservations_attach_deployment_options(#[case] use_worker_versioning: bool) {
use crate::worker_registry::{MockSlot, MockSlotProvider};
use crate::worker_registry::{MockClientWorker, MockSlot};
use temporal_sdk_core_api::worker::{WorkerDeploymentOptions, WorkerDeploymentVersion};
use temporal_sdk_core_protos::temporal::api::enums::v1::WorkerVersioningMode;

Expand All @@ -1803,13 +1804,13 @@ mod tests {

#[derive(Clone)]
struct MyFakeServices {
slot_manager: Arc<SlotManager>,
client_worker_set: Arc<ClientWorkerSet>,
expected_mode: WorkerVersioningMode,
}
impl RawGrpcCaller for MyFakeServices {}
impl RawClientProducer for MyFakeServices {
fn get_workers_info(&self) -> Option<Arc<SlotManager>> {
Some(self.slot_manager.clone())
fn get_workers_info(&self) -> Option<Arc<ClientWorkerSet>> {
Some(self.client_worker_set.clone())
}
fn workflow_client(&mut self) -> Box<dyn WorkflowService> {
Box::new(MyFakeWfClient {
Expand Down Expand Up @@ -1839,7 +1840,7 @@ mod tests {
default_versioning_behavior: None,
};

let mut mock_provider = MockSlotProvider::new();
let mut mock_provider = MockClientWorker::new();
mock_provider
.expect_namespace()
.return_const("test-namespace".to_string());
Expand All @@ -1854,9 +1855,16 @@ mod tests {
mock_provider
.expect_deployment_options()
.return_const(Some(deployment_opts.clone()));
mock_provider.expect_heartbeat_enabled().return_const(false);
let uuid = Uuid::new_v4();
mock_provider
.expect_worker_instance_key()
.return_const(uuid);

let slot_manager = Arc::new(SlotManager::new());
slot_manager.register(Box::new(mock_provider));
let client_worker_set = Arc::new(ClientWorkerSet::new());
client_worker_set
.register_worker(Arc::new(mock_provider), true)
.unwrap();

#[derive(Clone)]
struct MyFakeWfClient {
Expand Down Expand Up @@ -1886,7 +1894,7 @@ mod tests {
}

let mut mfs = MyFakeServices {
slot_manager,
client_worker_set,
expected_mode,
};

Expand Down
Loading