Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
//
// This source file is part of the Swift Distributed Actors open source project
//
// Copyright (c) 2018-2021 Apple Inc. and the Swift Distributed Actors project authors
// Copyright (c) 2018-2022 Apple Inc. and the Swift Distributed Actors project authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
Expand All @@ -18,7 +18,7 @@ final class DiningPhilosophers {
private var forks: [Fork] = []
private var philosophers: [Philosopher] = []

func run(for time: TimeAmount) async throws {
func run(for duration: Duration) async throws {
let system = await ClusterSystem("Philosophers")

// prepare 5 forks, the resources, that the philosophers will compete for:
Expand All @@ -38,6 +38,6 @@ final class DiningPhilosophers {
Philosopher(name: "Erik", leftFork: fork4, rightFork: fork5, actorSystem: system),
]

_Thread.sleep(time)
_Thread.sleep(duration)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
//
// This source file is part of the Swift Distributed Actors open source project
//
// Copyright (c) 2018-2021 Apple Inc. and the Swift Distributed Actors project authors
// Copyright (c) 2018-2022 Apple Inc. and the Swift Distributed Actors project authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
Expand All @@ -18,7 +18,7 @@ final class DistributedDiningPhilosophers {
private var forks: [Fork] = []
private var philosophers: [Philosopher] = []

func run(for time: TimeAmount) async throws {
func run(for duration: Duration) async throws {
let systemA = await ClusterSystem("Node-A") { settings in
settings.bindPort = 1111
}
Expand Down Expand Up @@ -74,6 +74,6 @@ final class DistributedDiningPhilosophers {
Philosopher(name: "Erik", leftFork: fork4, rightFork: fork5, actorSystem: systemC),
]

try systemA.park(atMost: time)
try systemA.park(atMost: duration)
}
}
8 changes: 4 additions & 4 deletions Samples/Sources/SampleDiningPhilosophers/boot.swift
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
//
// This source file is part of the Swift Distributed Actors open source project
//
// Copyright (c) 2018-2021 Apple Inc. and the Swift Distributed Actors project authors
// Copyright (c) 2018-2022 Apple Inc. and the Swift Distributed Actors project authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
Expand Down Expand Up @@ -40,13 +40,13 @@ typealias DefaultDistributedActorSystem = ClusterSystem

LoggingSystem.bootstrap(SamplePrettyLogHandler.init)

let time = TimeAmount.seconds(20)
let duration = Duration.seconds(20)

switch CommandLine.arguments.dropFirst().first {
case "dist", "distributed":
try! await DistributedDiningPhilosophers().run(for: time)
try! await DistributedDiningPhilosophers().run(for: duration)
default:
try! await DiningPhilosophers().run(for: time)
try! await DiningPhilosophers().run(for: duration)
}
}
}
8 changes: 4 additions & 4 deletions Sources/DistributedActors/ActorContext.swift
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,7 @@ public class _ActorContext<Message: Codable> /* TODO(sendable): NOTSendable*/ {
/// - continuation: continuation to run after `_AsyncResult` completes. It is safe to access
/// and modify actor state from here.
/// - Returns: a behavior that causes the actor to suspend until the `_AsyncResult` completes
internal func awaitResult<AR: _AsyncResult>(of _AsyncResult: AR, timeout: TimeAmount, _ continuation: @escaping (Result<AR.Value, Error>) throws -> _Behavior<Message>) -> _Behavior<Message> {
internal func awaitResult<AR: _AsyncResult>(of _AsyncResult: AR, timeout: Duration, _ continuation: @escaping (Result<AR.Value, Error>) throws -> _Behavior<Message>) -> _Behavior<Message> {
_AsyncResult.withTimeout(after: timeout)._onComplete { [weak selfRef = self.myself._unsafeUnwrapCell] result in
selfRef?.sendSystemMessage(.resume(result.map { $0 }))
}
Expand All @@ -257,7 +257,7 @@ public class _ActorContext<Message: Codable> /* TODO(sendable): NOTSendable*/ {
/// - Returns: a behavior that causes the actor to suspend until the `_AsyncResult` completes
internal func awaitResultThrowing<AR: _AsyncResult>(
of _AsyncResult: AR,
timeout: TimeAmount,
timeout: Duration,
_ continuation: @escaping (AR.Value) throws -> _Behavior<Message>
) -> _Behavior<Message> {
self.awaitResult(of: _AsyncResult, timeout: timeout) { result in
Expand All @@ -280,7 +280,7 @@ public class _ActorContext<Message: Codable> /* TODO(sendable): NOTSendable*/ {
/// - timeout: time after which the _AsyncResult will be failed if it does not complete
/// - continuation: continuation to run after `_AsyncResult` completes.
/// It is safe to access and modify actor state from here.
internal func onResultAsync<AR: _AsyncResult>(of _AsyncResult: AR, timeout: TimeAmount, file: String = #file, line: UInt = #line, _ continuation: @escaping (Result<AR.Value, Error>) throws -> _Behavior<Message>) {
internal func onResultAsync<AR: _AsyncResult>(of _AsyncResult: AR, timeout: Duration, file: String = #file, line: UInt = #line, _ continuation: @escaping (Result<AR.Value, Error>) throws -> _Behavior<Message>) {
let asyncCallback = self.makeAsynchronousCallback(for: Result<AR.Value, Error>.self, file: file, line: line) {
let nextBehavior = try continuation($0)
let shell = self._downcastUnsafe
Expand All @@ -307,7 +307,7 @@ public class _ActorContext<Message: Codable> /* TODO(sendable): NOTSendable*/ {
/// - timeout: time after which the _AsyncResult will be failed if it does not complete
/// - continuation: continuation to run after `_AsyncResult` completes. It is safe to access
/// and modify actor state from here.
internal func onResultAsyncThrowing<AR: _AsyncResult>(of _AsyncResult: AR, timeout: TimeAmount, _ continuation: @escaping (AR.Value) throws -> _Behavior<Message>) {
internal func onResultAsyncThrowing<AR: _AsyncResult>(of _AsyncResult: AR, timeout: Duration, _ continuation: @escaping (AR.Value) throws -> _Behavior<Message>) {
self.onResultAsync(of: _AsyncResult, timeout: timeout) { res in
switch res {
case .success(let value): return try continuation(value)
Expand Down
8 changes: 4 additions & 4 deletions Sources/DistributedActors/ActorRef+Ask.swift
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ protocol ReceivesQuestions: Codable {
/// It may be executed concurrently with regards to the current context.
func ask<Answer>(
for type: Answer.Type,
timeout: TimeAmount,
timeout: Duration,
file: String, function: String, line: UInt,
_ makeQuestion: @escaping (_ActorRef<Answer>) -> Question
) -> AskResponse<Answer>
Expand All @@ -66,7 +66,7 @@ extension _ActorRef: ReceivesQuestions {

func ask<Answer>(
for answerType: Answer.Type = Answer.self,
timeout: TimeAmount,
timeout: Duration,
file: String = #file, function: String = #function, line: UInt = #line,
_ makeQuestion: @escaping (_ActorRef<Answer>) -> Question
) -> AskResponse<Answer> {
Expand Down Expand Up @@ -170,7 +170,7 @@ extension AskResponse: _AsyncResult {
}
}

func withTimeout(after timeout: TimeAmount) -> AskResponse<Value> {
func withTimeout(after timeout: Duration) -> AskResponse<Value> {
if timeout.isEffectivelyInfinite {
return self
}
Expand Down Expand Up @@ -245,7 +245,7 @@ internal enum AskActor {
static func behavior<Message, ResponseType>(
_ completable: EventLoopPromise<ResponseType>,
ref: _ActorRef<Message>,
timeout: TimeAmount,
timeout: Duration,
file: String,
function: String,
line: UInt
Expand Down
10 changes: 5 additions & 5 deletions Sources/DistributedActors/AsyncResult.swift
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
//
// This source file is part of the Swift Distributed Actors open source project
//
// Copyright (c) 2018-2019 Apple Inc. and the Swift Distributed Actors project authors
// Copyright (c) 2018-2022 Apple Inc. and the Swift Distributed Actors project authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
Expand Down Expand Up @@ -44,15 +44,15 @@ internal protocol _AsyncResult {
/// value for the returned `_AsyncResult`.
///
/// - parameter after: defines a timeout after which the result should be considered failed.
func withTimeout(after timeout: TimeAmount) -> Self
func withTimeout(after timeout: Duration) -> Self
}

extension EventLoopFuture: _AsyncResult {
func _onComplete(_ callback: @escaping (Result<Value, Error>) -> Void) {
self.whenComplete(callback)
}

func withTimeout(after timeout: TimeAmount) -> EventLoopFuture<Value> {
func withTimeout(after timeout: Duration) -> EventLoopFuture<Value> {
if timeout == .effectivelyInfinite {
return self
}
Expand All @@ -73,9 +73,9 @@ extension EventLoopFuture: _AsyncResult {
/// Error that signals that an operation timed out.
internal struct TimeoutError: Error {
let message: String
let timeout: TimeAmount
let timeout: Duration

init(message: String, timeout: TimeAmount) {
init(message: String, timeout: Duration) {
self.message = message
self.timeout = timeout
}
Expand Down
40 changes: 20 additions & 20 deletions Sources/DistributedActors/Backoff.swift
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
//
// This source file is part of the Swift Distributed Actors open source project
//
// Copyright (c) 2018-2019 Apple Inc. and the Swift Distributed Actors project authors
// Copyright (c) 2018-2022 Apple Inc. and the Swift Distributed Actors project authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
Expand All @@ -22,7 +22,7 @@
/// See also: `ConstantBackoffStrategy`, `ExponentialBackoffStrategy`
public protocol BackoffStrategy {
/// Returns next backoff interval to use OR `nil` if no further retries should be performed.
mutating func next() -> TimeAmount?
mutating func next() -> Duration?

/// Reset the strategy to its initial backoff amount.
mutating func reset()
Expand All @@ -46,8 +46,8 @@ public enum Backoff {
/// Backoff each time using the same, constant, time amount.
///
/// See `ConstantBackoffStrategy` for details
public static func constant(_ backoff: TimeAmount) -> ConstantBackoffStrategy {
.init(timeAmount: backoff)
public static func constant(_ backoff: Duration) -> ConstantBackoffStrategy {
.init(duration: backoff)
}

/// Creates a strategy implementing the exponential backoff pattern.
Expand All @@ -68,9 +68,9 @@ public enum Backoff {
/// - maxAttempts: An optional maximum number of times backoffs shall be attempted.
/// MUST be `> 0` if set (or `nil`).
public static func exponential(
initialInterval: TimeAmount = ExponentialBackoffStrategy.Defaults.initialInterval,
initialInterval: Duration = ExponentialBackoffStrategy.Defaults.initialInterval,
multiplier: Double = ExponentialBackoffStrategy.Defaults.multiplier,
capInterval: TimeAmount = ExponentialBackoffStrategy.Defaults.capInterval,
capInterval: Duration = ExponentialBackoffStrategy.Defaults.capInterval,
randomFactor: Double = ExponentialBackoffStrategy.Defaults.randomFactor,
maxAttempts: Int? = ExponentialBackoffStrategy.Defaults.maxAttempts
) -> ExponentialBackoffStrategy {
Expand All @@ -94,14 +94,14 @@ public enum Backoff {
/// - SeeAlso: Also used to configure `_SupervisionStrategy`.
public struct ConstantBackoffStrategy: BackoffStrategy {
/// The constant time amount to back-off by each time.
internal let timeAmount: TimeAmount
internal let duration: Duration

public init(timeAmount: TimeAmount) {
self.timeAmount = timeAmount
public init(duration: Duration) {
self.duration = duration
}

public func next() -> TimeAmount? {
self.timeAmount
public func next() -> Duration? {
self.duration
}

public func reset() {
Expand Down Expand Up @@ -150,28 +150,28 @@ public struct ExponentialBackoffStrategy: BackoffStrategy {

/// Default values for the backoff parameters.
public enum Defaults {
public static let initialInterval: TimeAmount = .milliseconds(200)
public static let initialInterval: Duration = .milliseconds(200)
public static let multiplier: Double = 1.5
public static let capInterval: TimeAmount = .effectivelyInfinite
public static let capInterval: Duration = .effectivelyInfinite
public static let randomFactor: Double = 0.25

// TODO: We could also implement taking a Clock, and using it see if there's a total limit exceeded
// public static let maxElapsedTime: TimeAmount = .minutes(30)
// public static let maxElapsedTime: Duration = .minutes(30)

public static let maxAttempts: Int? = nil
}

let initialInterval: TimeAmount
let initialInterval: Duration
let multiplier: Double
let capInterval: TimeAmount
let capInterval: Duration
let randomFactor: Double

var limitedRemainingAttempts: Int?

// interval that will be used in the `next()` call, does NOT include the random noise component
private var currentBaseInterval: TimeAmount
private var currentBaseInterval: Duration

internal init(initialInterval: TimeAmount, multiplier: Double, capInterval: TimeAmount, randomFactor: Double, maxAttempts: Int?) {
internal init(initialInterval: Duration, multiplier: Double, capInterval: Duration, randomFactor: Double, maxAttempts: Int?) {
precondition(initialInterval.nanoseconds > 0, "initialInterval MUST be > 0ns, was: [\(initialInterval.prettyDescription)]")
precondition(multiplier >= 1.0, "multiplier MUST be >= 1.0, was: [\(multiplier)]")
precondition(initialInterval <= capInterval, "capInterval MUST be >= initialInterval, was: [\(capInterval)]")
Expand All @@ -188,7 +188,7 @@ public struct ExponentialBackoffStrategy: BackoffStrategy {
self.limitedRemainingAttempts = maxAttempts
}

public mutating func next() -> TimeAmount? {
public mutating func next() -> Duration? {
defer { self.limitedRemainingAttempts? -= 1 }
if let remainingAttempts = self.limitedRemainingAttempts, remainingAttempts <= 0 {
return nil
Expand Down Expand Up @@ -226,5 +226,5 @@ public struct ExponentialBackoffStrategy: BackoffStrategy {
// MARK: Errors

enum BackoffError {
case exceededNumberOfAttempts(limit: Int, period: TimeAmount)
case exceededNumberOfAttempts(limit: Int, period: Duration)
}
2 changes: 1 addition & 1 deletion Sources/DistributedActors/Cluster/Association.swift
Original file line number Diff line number Diff line change
Expand Up @@ -260,7 +260,7 @@ extension Association {

/// Used to create "any" tombstone, for being able to lookup in Set<TombstoneSet>
init(_ node: UniqueNode) {
self.removalDeadline = Deadline.uptimeNanoseconds(1) // ANY value here is ok, we do not use it in hash/equals
self.removalDeadline = Deadline.now() // ANY value here is ok, we do not use it in hash/equals
self.remoteNode = node
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ public enum OnDownActionStrategySettings {
/// members MUST refuse communication with this down node.
case none
/// Upon noticing that this member is marked as [.down], initiate a shutdown.
case gracefulShutdown(delay: TimeAmount)
case gracefulShutdown(delay: Duration)

func make() -> (ClusterSystem) throws -> Void {
switch self {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ public struct DowningStrategyDirective {
internal enum Repr {
case none
case markAsDown(Set<Cluster.Member>)
case startTimer(key: TimerKey, member: Cluster.Member, delay: TimeAmount)
case startTimer(key: TimerKey, member: Cluster.Member, delay: Duration)
case cancelTimer(key: TimerKey)
}

Expand All @@ -47,7 +47,7 @@ public struct DowningStrategyDirective {
.init(.none)
}

public static func startTimer(key: TimerKey, member: Cluster.Member, delay: TimeAmount) -> Self {
public static func startTimer(key: TimerKey, member: Cluster.Member, delay: Duration) -> Self {
.init(.startTimer(key: key, member: member, delay: delay))
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
//
// This source file is part of the Swift Distributed Actors open source project
//
// Copyright (c) 2018-2019 Apple Inc. and the Swift Distributed Actors project authors
// Copyright (c) 2018-2022 Apple Inc. and the Swift Distributed Actors project authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
Expand Down Expand Up @@ -141,7 +141,7 @@ public struct TimeoutBasedDowningStrategySettings {
///
/// Generally with a distributed failure detector such delay may not be necessary, however it is available in case
/// you want to allow noticing "tings are bad, but don't act on it" environments.
public var downUnreachableMembersAfter: TimeAmount = .seconds(1)
public var downUnreachableMembersAfter: Duration = .seconds(1)

public static var `default`: TimeoutBasedDowningStrategySettings {
.init()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ internal struct HandshakeStateMachine {
internal enum RetryDirective {
/// Retry sending the returned handshake offer after the given `delay`
/// Returned in reaction to timeouts or other recoverable failures during handshake negotiation.
case scheduleRetryHandshake(delay: TimeAmount)
case scheduleRetryHandshake(delay: Duration)

/// Give up shaking hands with the remote peer.
/// Any state the handshake was keeping on the initiating node should be cleared in response to this directive.
Expand Down
2 changes: 1 addition & 1 deletion Sources/DistributedActors/Cluster/Leadership.swift
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ public struct LeaderElectionResult: _AsyncResult {
self.future.whenComplete(callback)
}

public func withTimeout(after timeout: TimeAmount) -> LeaderElectionResult {
public func withTimeout(after timeout: Duration) -> LeaderElectionResult {
LeaderElectionResult(self.future.withTimeout(after: timeout))
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -540,7 +540,7 @@ extension OpLogDistributedReceptionist {
// We DO want to send the Ack directly here as potentially the peer still has some more
// ops it might want to send, so we want to allow it to get those over to us as quickly as possible,
// without waiting for our Ack ticks to trigger (which could be configured pretty slow).
let nextPeriodicAckAllowedIn: TimeAmount = actorSystem.settings.receptionist.ackPullReplicationIntervalSlow * 2
let nextPeriodicAckAllowedIn: Duration = actorSystem.settings.receptionist.ackPullReplicationIntervalSlow * 2
self.nextPeriodicAckPermittedDeadline[peer.id] = Deadline.fromNow(nextPeriodicAckAllowedIn) // TODO: system.timeSource
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -345,7 +345,7 @@ extension _OperationLogClusterReceptionist {
// We DO want to send the Ack directly here as potentially the peer still has some more
// ops it might want to send, so we want to allow it to get those over to us as quickly as possible,
// without waiting for our Ack ticks to trigger (which could be configured pretty slow).
let nextPeriodicAckAllowedIn: TimeAmount = context.system.settings.receptionist.ackPullReplicationIntervalSlow * 2
let nextPeriodicAckAllowedIn: Duration = context.system.settings.receptionist.ackPullReplicationIntervalSlow * 2
self.nextPeriodicAckPermittedDeadline[peer] = Deadline.fromNow(nextPeriodicAckAllowedIn) // TODO: context.system.timeSource
}

Expand Down
Loading