Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 13 additions & 11 deletions Package.swift
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ var targets: [PackageDescription.Target] = [
.product(name: "Metrics", package: "swift-metrics"),
.product(name: "ServiceDiscovery", package: "swift-service-discovery"),
.product(name: "Backtrace", package: "swift-backtrace"),
.product(name: "AsyncAlgorithms", package: "swift-async-algorithms"),
]
),

Expand Down Expand Up @@ -179,26 +180,27 @@ var targets: [PackageDescription.Target] = [
var dependencies: [Package.Dependency] = [
.package(url: "https://github.com/apple/swift-atomics", from: "1.0.2"),

.package(url: "https://github.com/apple/swift-cluster-membership.git", from: "0.3.0"),
.package(url: "https://github.com/apple/swift-cluster-membership", from: "0.3.0"),

.package(url: "https://github.com/apple/swift-nio.git", from: "2.40.0"),
.package(url: "https://github.com/apple/swift-nio-extras.git", from: "1.2.0"),
.package(url: "https://github.com/apple/swift-nio-ssl.git", from: "2.16.1"),
.package(url: "https://github.com/apple/swift-nio", from: "2.40.0"),
.package(url: "https://github.com/apple/swift-nio-extras", from: "1.2.0"),
.package(url: "https://github.com/apple/swift-nio-ssl", from: "2.16.1"),

.package(url: "https://github.com/apple/swift-protobuf.git", from: "1.7.0"),
.package(url: "https://github.com/apple/swift-protobuf", from: "1.7.0"),

// ~~~ backtraces ~~~
// TODO: optimally, library should not pull swift-backtrace
.package(url: "https://github.com/swift-server/swift-backtrace.git", from: "1.1.1"),
.package(url: "https://github.com/swift-server/swift-backtrace", from: "1.1.1"),

// ~~~ Swift Collections ~~~
.package(url: "https://github.com/apple/swift-collections.git", from: "1.0.1"),
// ~~~ Swift libraries ~~~
.package(url: "https://github.com/apple/swift-async-algorithms", from: "0.0.3"),
.package(url: "https://github.com/apple/swift-collections", from: "1.0.1"),

// ~~~ Observability ~~~
.package(url: "https://github.com/apple/swift-log.git", from: "1.0.0"),
.package(url: "https://github.com/apple/swift-log", from: "1.0.0"),
// swift-metrics 1.x and 2.x are almost API compatible, so most clients should use
.package(url: "https://github.com/apple/swift-metrics.git", "1.0.0" ..< "3.0.0"),
.package(url: "https://github.com/apple/swift-service-discovery.git", from: "1.0.0"),
.package(url: "https://github.com/apple/swift-metrics", "1.0.0" ..< "3.0.0"),
.package(url: "https://github.com/apple/swift-service-discovery", from: "1.0.0"),

// ~~~ SwiftPM Plugins ~~~
.package(url: "https://github.com/apple/swift-docc-plugin", from: "1.0.0"),
Expand Down
45 changes: 28 additions & 17 deletions Samples/Sources/SampleDiningPhilosophers/Philosopher.swift
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
//
// This source file is part of the Swift Distributed Actors open source project
//
// Copyright (c) 2018-2021 Apple Inc. and the Swift Distributed Actors project authors
// Copyright (c) 2018-2022 Apple Inc. and the Swift Distributed Actors project authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
Expand All @@ -12,6 +12,7 @@
//
//===----------------------------------------------------------------------===//

import AsyncAlgorithms
import Distributed
import DistributedActors
import Logging
Expand All @@ -24,7 +25,8 @@ distributed actor Philosopher: CustomStringConvertible {
private let rightFork: Fork
private var state: State = .thinking

private lazy var timers = DistributedActors.ActorTimers<Philosopher>(self)
private var becomeHungryTimerTask: Task<Void, Error>?
private var finishEatingTimerTask: Task<Void, Error>?

init(name: String, leftFork: Fork, rightFork: Fork, actorSystem: ActorSystem) {
self.actorSystem = actorSystem
Expand Down Expand Up @@ -58,15 +60,19 @@ distributed actor Philosopher: CustomStringConvertible {
}

self.state = .thinking
self.timers.startSingle(key: .becomeHungry, delay: .seconds(1)) {
await self.attemptToTakeForks()
self.becomeHungryTimerTask = Task {
for await _ in AsyncTimerSequence(interval: .seconds(1), clock: ContinuousClock()) {
await self.attemptToTakeForks()
self.becomeHungryTimerTask?.cancel()
break
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm okey those are just a "single" timers, so we can do these a bit simpler:

Task { 
  try await Task.sleep(until: .now + .seconds(1), clock: .continuous)
}

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note that we also don't need the cancel then -- just run it once :)

Copy link
Member Author

@yim-lee yim-lee Jul 6, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

amended ef06ce6

}
self.log.info("\(self.self.name) is thinking...")
self.log.info("\(self.name) is thinking...")
}

distributed func attemptToTakeForks() async {
guard self.state == .thinking else {
self.log.error("\(self.self.name) tried to take a fork but was not in the thinking state!")
self.log.error("\(self.name) tried to take a fork but was not in the thinking state!")
return
}

Expand All @@ -89,14 +95,14 @@ distributed actor Philosopher: CustomStringConvertible {
}
self.forkTaken(self.rightFork)
} catch {
self.log.info("\(self.self.name) wasn't able to take both forks!")
self.log.info("\(self.name) wasn't able to take both forks!")
self.think()
}
}

/// Message sent to oneself after a timer exceeds and we're done `eating` and can become `thinking` again.
distributed func stopEating() {
self.log.info("\(self.self.name) is done eating and replaced both forks!")
self.log.info("\(self.name) is done eating and replaced both forks!")
Task {
do {
try await self.leftFork.putBack()
Expand Down Expand Up @@ -128,10 +134,10 @@ distributed actor Philosopher: CustomStringConvertible {

switch fork {
case self.leftFork:
self.log.info("\(self.self.name) received their left fork!")
self.log.info("\(self.name) received their left fork!")
self.state = .takingForks(leftTaken: true, rightTaken: rightForkIsTaken)
case self.rightFork:
self.log.info("\(self.self.name) received their right fork!")
self.log.info("\(self.name) received their right fork!")
self.state = .takingForks(leftTaken: leftForkIsTaken, rightTaken: true)
default:
self.log.error("Received unknown fork! Got: \(fork). Known forks: \(self.leftFork), \(self.rightFork)")
Expand All @@ -144,16 +150,21 @@ distributed actor Philosopher: CustomStringConvertible {

private func becomeEating() {
self.state = .eating
self.log.notice("\(self.self.name) began eating!")
self.timers.startSingle(key: .becomeHungry, delay: .seconds(3)) {
await self.stopEating()
self.log.notice("\(self.name) began eating!")
self.finishEatingTimerTask = Task {
for await _ in AsyncTimerSequence(interval: .seconds(3), clock: ContinuousClock()) {
self.stopEating()
self.finishEatingTimerTask?.cancel()
break
}
}
}
}

extension TimerKey {
static let becomeHungry: Self = "become-hungry"
static let finishEating: Self = "finish-eating"
deinit {
// FIXME: these are async
// self.becomeHungryTimerTask?.cancel()
// self.finishEatingTimerTask?.cancel()
}
}

extension Philosopher {
Expand Down
2 changes: 1 addition & 1 deletion Sources/DistributedActors/Cluster/ClusterShell.swift
Original file line number Diff line number Diff line change
Expand Up @@ -943,7 +943,7 @@ extension ClusterShell {
"handshake/retryDelay": "\(retryDelay)",
])
context.timers.startSingle(
key: TimerKey("handshake-timer-\(remoteNode)"),
key: _TimerKey("handshake-timer-\(remoteNode)"),
message: .command(.retryHandshake(initiated)),
delay: retryDelay
)
Expand Down
45 changes: 30 additions & 15 deletions Sources/DistributedActors/Cluster/Downing/DowningStrategy.swift
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
//
//===----------------------------------------------------------------------===//

import AsyncAlgorithms
import Distributed
import Logging

Expand All @@ -35,8 +36,8 @@ public struct DowningStrategyDirective {
internal enum Repr {
case none
case markAsDown(Set<Cluster.Member>)
case startTimer(key: TimerKey, member: Cluster.Member, delay: Duration)
case cancelTimer(key: TimerKey)
case startTimer(member: Cluster.Member, delay: Duration)
case cancelTimer(member: Cluster.Member)
}

internal init(_ underlying: Repr) {
Expand All @@ -47,12 +48,12 @@ public struct DowningStrategyDirective {
.init(.none)
}

public static func startTimer(key: TimerKey, member: Cluster.Member, delay: Duration) -> Self {
.init(.startTimer(key: key, member: member, delay: delay))
public static func startTimer(member: Cluster.Member, delay: Duration) -> Self {
.init(.startTimer(member: member, delay: delay))
}

public static func cancelTimer(key: TimerKey) -> Self {
.init(.cancelTimer(key: key))
public static func cancelTimer(member: Cluster.Member) -> Self {
.init(.cancelTimer(member: member))
}

public static func markAsDown(members: Set<Cluster.Member>) -> Self {
Expand Down Expand Up @@ -86,8 +87,8 @@ internal distributed actor DowningStrategyShell {

/// `Task` for subscribing to cluster events.
private var eventsListeningTask: Task<Void, Error>?

private lazy var timers = ActorTimers<DowningStrategyShell>(self)
/// `Task` for timers
private var memberTimerTasks: [Cluster.Member: Task<Void, Error>] = [:]

init(_ strategy: DowningStrategy, system: ActorSystem) async {
self.strategy = strategy
Expand All @@ -101,6 +102,9 @@ internal distributed actor DowningStrategyShell {

deinit {
self.eventsListeningTask?.cancel()
self.memberTimerTasks.values.forEach { timerTask in
timerTask.cancel()
}
}

func receiveClusterEvent(_ event: Cluster.Event) throws {
Expand All @@ -113,20 +117,31 @@ internal distributed actor DowningStrategyShell {
case .markAsDown(let members):
self.markAsDown(members: members)

case .startTimer(let key, let member, let delay):
self.log.trace("Start timer \(key), member: \(member), delay: \(delay)")
self.timers.startSingle(key: key, delay: delay) {
self.onTimeout(member: member)
case .startTimer(let member, let delay):
self.log.trace("Start timer for member: \(member), delay: \(delay)")
self.memberTimerTasks[member] = Task {
for await _ in AsyncTimerSequence(interval: delay, clock: ContinuousClock()) {
self.onTimeout(member: member)
// Single-shot; cancel task immediately after it has been fired
self.cancelTimer(for: member)
break
}
}
case .cancelTimer(let key):
self.log.trace("Cancel timer \(key)")
self.timers.cancel(for: key)
case .cancelTimer(let member):
self.log.trace("Cancel timer for member: \(member)")
self.cancelTimer(for: member)

case .none:
() // nothing to be done
}
}

private func cancelTimer(for member: Cluster.Member) {
if let timerTask = self.memberTimerTasks.removeValue(forKey: member) {
timerTask.cancel()
}
}

func markAsDown(members: Set<Cluster.Member>) {
for member in members {
self.log.info(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ public final class TimeoutBasedDowningStrategy: DowningStrategy {
// it was marked as down by someone, we don't need to track it anymore
_ = self._markAsDown.remove(change.member)
_ = self._unreachable.remove(change.member)
return .cancelTimer(key: self.timerKey(change.member))
return .cancelTimer(member: change.member)
} else if let replaced = change.replaced {
_ = self._markAsDown.remove(replaced)
_ = self._unreachable.remove(replaced)
Expand Down Expand Up @@ -95,7 +95,7 @@ public final class TimeoutBasedDowningStrategy: DowningStrategy {

self._unreachable.insert(member)

return .startTimer(key: self.timerKey(member), member: member, delay: self.settings.downUnreachableMembersAfter)
return .startTimer(member: member, delay: self.settings.downUnreachableMembersAfter)
}

func onMemberReachable(_ change: Cluster.ReachabilityChange) -> DowningStrategyDirective {
Expand All @@ -104,16 +104,12 @@ public final class TimeoutBasedDowningStrategy: DowningStrategy {

_ = self._markAsDown.remove(member)
if self._unreachable.remove(member) != nil {
return .cancelTimer(key: self.timerKey(member))
return .cancelTimer(member: member)
}

return .none
}

func timerKey(_ member: Cluster.Member) -> TimerKey {
TimerKey(member.uniqueNode)
}

func onLeaderChange(to leader: Cluster.Member?) throws -> DowningStrategyDirective {
_ = try self.membership.applyLeadershipChange(to: leader)

Expand All @@ -129,7 +125,7 @@ public final class TimeoutBasedDowningStrategy: DowningStrategy {
self._markAsDown.remove(member)

if self._unreachable.remove(member) != nil {
return .cancelTimer(key: self.timerKey(member))
return .cancelTimer(member: member)
}

return .none
Expand Down
Loading