1515#if canImport(Testing)
1616import GRPCNIOTransportHTTP2Posix
1717import GRPCCore
18+ import GRPCServiceLifecycle
1819import NIOPosix
20+ import SwiftProtobuf
1921public import Logging
2022public import Temporal
2123public import Testing
@@ -108,7 +110,20 @@ public struct TemporalTestServer: Sendable {
108110 @TaskLocal
109111 public static var timeSkippingTestServer : TemporalTestServer ? = nil
110112
113+ /// Whether this server supports time skipping.
114+ public var supportsTimeSkipping : Bool {
115+ self . testServiceClient != nil
116+ }
117+
111118 private let serverTarget : String
119+ private let testServiceClient : Api . Testservice . V1 . TestService . Client < HTTP2ClientTransport . Posix > ?
120+ @TaskLocal
121+ private static var autoTimeSkippingDisabled : Bool = false
122+
123+ /// Whether automatic time skipping is currently enabled.
124+ var isAutoTimeSkippingEnabled : Bool {
125+ self . supportsTimeSkipping && !Self. autoTimeSkippingDisabled
126+ }
112127
113128 // tune the dev server for high throughput during parallel testing, otherwise running into "resource exhausted" errors
114129 private static let devServerOptions : BridgeTestServer . DevServerOptions = {
@@ -159,7 +174,8 @@ public struct TemporalTestServer: Sendable {
159174 devServerOptions: Self . devServerOptions,
160175 ) { bridgeTestServer, target in
161176 let testServer = TemporalTestServer (
162- serverTarget: target
177+ serverTarget: target,
178+ testServiceClient: nil
163179 )
164180 return try await body ( testServer)
165181 }
@@ -205,10 +221,38 @@ public struct TemporalTestServer: Sendable {
205221 _ body: ( borrowing TemporalTestServer ) async throws -> Void
206222 ) async throws {
207223 try await BridgeTestServer . withBridgeTestServer { bridgeTestServer, target in
208- let testServer = TemporalTestServer (
209- serverTarget: target
224+ let parts = target. split ( separator: " : " , maxSplits: 1 , omittingEmptySubsequences: false )
225+ guard let host = parts. first. map ( String . init) ,
226+ parts. count > 1 ,
227+ let port = Int ( parts [ 1 ] )
228+ else {
229+ fatalError ( " Invalid host and port received from test server. " )
230+ }
231+
232+ let grpcClient = GRPCClient (
233+ transport: try . http2NIOPosix(
234+ target: . dns( host: host, port: port) ,
235+ transportSecurity: . plaintext,
236+ config: . defaults,
237+ resolverRegistry: . defaults,
238+ serviceConfig: . init( ) ,
239+ eventLoopGroup: . singletonMultiThreadedEventLoopGroup
240+ )
210241 )
211- return try await body ( testServer)
242+
243+ return try await withThrowingTaskGroup ( of: Void . self) { group in
244+ group. addTask {
245+ try await grpcClient. run ( )
246+ }
247+
248+ let testServer = TemporalTestServer (
249+ serverTarget: target,
250+ testServiceClient: . init( wrapping: grpcClient)
251+ )
252+ try await body ( testServer)
253+
254+ grpcClient. beginGracefulShutdown ( )
255+ }
212256 }
213257 }
214258
@@ -273,6 +317,14 @@ public struct TemporalTestServer: Sendable {
273317 ) async throws -> Result {
274318 let ( host, port) = self . hostAndPort ( )
275319
320+ // Auto-inject the time-skipping interceptor when connected to a
321+ // time-skipping server. It is placed first so it wraps all other
322+ // interceptors, matching the behavior of the C# and Ruby SDKs.
323+ var allInterceptors : [ any Temporal . ClientInterceptor ] = interceptors
324+ if self . supportsTimeSkipping {
325+ allInterceptors. insert ( TimeSkippingClientInterceptor ( testServer: self ) , at: 0 )
326+ }
327+
276328 return try await TemporalClient . connect (
277329 transport: . http2NIOPosix(
278330 target: . dns( host: host, port: port) ,
@@ -284,7 +336,7 @@ public struct TemporalTestServer: Sendable {
284336 ) ,
285337 configuration: . init(
286338 instrumentation: . init( serverHostname: host) ,
287- interceptors: interceptors
339+ interceptors: allInterceptors
288340 ) ,
289341 logger: logger,
290342 ) { client in
@@ -424,6 +476,98 @@ public struct TemporalTestServer: Sendable {
424476 }
425477 }
426478
479+ // MARK: - Time Control
480+
481+ /// Advances the test server time by the specified duration.
482+ ///
483+ /// When connected to a time-skipping test server, this method instructs the server to
484+ /// fast-forward its internal clock by the given duration. This causes any pending timers
485+ /// or scheduled events within that time window to fire immediately.
486+ ///
487+ /// - Parameter duration: The amount of time to advance the server clock.
488+ /// - Throws: An error if the server does not support time skipping or the RPC fails.
489+ public func sleep( _ duration: Duration ) async throws {
490+ guard let testServiceClient else {
491+ preconditionFailure ( " sleep(_:) is only supported on time-skipping test servers " )
492+ }
493+ _ = try await testServiceClient. unlockTimeSkippingWithSleep (
494+ . with {
495+ $0. duration = . with {
496+ $0. seconds = duration. components. seconds
497+ $0. nanos = Int32 ( duration. components. attoseconds / 1_000_000_000 )
498+ }
499+ }
500+ )
501+ }
502+
503+ /// Returns the current time as known to the test server.
504+ ///
505+ /// When connected to a time-skipping test server, this returns the server's internal
506+ /// time, which may differ from wall-clock time due to time skipping.
507+ ///
508+ /// - Returns: The current server time.
509+ /// - Throws: An error if the server does not support time skipping or the RPC fails.
510+ public func currentTime( ) async throws -> Date {
511+ guard let testServiceClient else {
512+ preconditionFailure ( " currentTime() is only supported on time-skipping test servers " )
513+ }
514+ return try await testServiceClient. getCurrentTime ( . init( ) ) . time. date
515+ }
516+
517+ /// Executes a closure with automatic time skipping temporarily disabled.
518+ ///
519+ /// While automatic time skipping is disabled, calls to `handle.result`
520+ /// will not unlock the time-skipping server's clock. Time will only advance through
521+ /// explicit calls to ``sleep(_:)`` or real-time passage.
522+ ///
523+ /// This is useful when you need to observe intermediate workflow states without
524+ /// time being advanced automatically.
525+ ///
526+ /// - Parameter body: The closure to execute with auto-time-skipping disabled.
527+ /// - Returns: The result of the closure.
528+ /// - Throws: Any error thrown by the closure.
529+ public func withAutoTimeSkippingDisabled< Result: Sendable > (
530+ _ body: ( ) async throws -> Result
531+ ) async rethrows -> Result {
532+ try await Self . $autoTimeSkippingDisabled. withValue ( true ) {
533+ try await body ( )
534+ }
535+ }
536+
537+ /// Unlocks time skipping for the duration of the given closure, then re-locks it.
538+ ///
539+ /// This is used internally by ``TimeSkippingClientInterceptor`` to allow time to
540+ /// advance while waiting for workflow results.
541+ func withTimeSkippingUnlocked< Result: Sendable > (
542+ _ body: ( ) async throws -> Result
543+ ) async throws -> Result {
544+ guard let testServiceClient else {
545+ return try await body ( )
546+ }
547+
548+ _ = try await testServiceClient. unlockTimeSkipping ( . init( ) )
549+
550+ var userCodeSucceeded = false
551+ do {
552+ let result = try await body ( )
553+ userCodeSucceeded = true
554+ _ = try await testServiceClient. lockTimeSkipping ( . init( ) )
555+ return result
556+ } catch {
557+ // Re-lock time skipping. If user code failed, swallow lock errors
558+ // to preserve the original error.
559+ do {
560+ _ = try await testServiceClient. lockTimeSkipping ( . init( ) )
561+ } catch {
562+ if userCodeSucceeded {
563+ throw error
564+ }
565+ // Swallow lock error when user code already failed
566+ }
567+ throw error
568+ }
569+ }
570+
427571 /// Provides the host and port information from the server target address.
428572 ///
429573 /// - Returns: A tuple containing the host string and port number for the test server.
0 commit comments