Change generic EventStream<Event> to ClusterEventStream and convert to actor#1012
Change generic EventStream<Event> to ClusterEventStream and convert to actor#1012ktoso merged 7 commits intoapple:mainfrom
Conversation
|
Hmm, test failures were all in [Edit]: Fixed in 7460615 |
Sources/DistributedActors/Cluster/ClusterShell+LeaderActions.swift
Outdated
Show resolved
Hide resolved
| } | ||
| Task { | ||
| await context.system.cluster.events.subscribe(onClusterEventRef) | ||
| } |
There was a problem hiding this comment.
| } | |
| } // TODO(send): anther call site benefitting from `send` |
There was a problem hiding this comment.
Again racy -- can't access context from other task/thread.
Maybe we should make a nonisolated func subscribe() and that should do the Task{} inside there? this is too risky;
| context.subReceive(Cluster.Event.self) { event in | ||
| self.onClusterEvent(context, event: event) | ||
| } | ||
| ) |
There was a problem hiding this comment.
This isn't thread-safe now 😱
Cannot access context from a different task.
Oh man the lack of send hurts real bad here...
There was a problem hiding this comment.
let eventStream = context.system.cluster.events
let subReceive = context.subReceive(Cluster.Event.self) { event in
self.onClusterEvent(context, event: event)
}
Task { await eventStream.subscribe(subReceive) } // TODO(send): lack of send was super problematic here, we'd have caused a race condition by accessing `context` from `Task` (!!!)There was a problem hiding this comment.
FYI @DougGregor some "fun" examples how missing send and therefore forcing people into entering a Task{} just to call a void async function is causing all kinds of ugly code and race conditions when integrating with other code.
| context.log.trace("Configured with \(self.election)") | ||
| context.system.cluster.events.subscribe(context.myself) | ||
| Task { | ||
| await context.system.cluster.events.subscribe(context.myself) |
There was a problem hiding this comment.
We can't touch behavior actor context from a different task ,this is racy -- needs the same fix as the other place, get the events into a let before the task
There was a problem hiding this comment.
Eh sorry I merged before you had a chance to address this one @yim-lee
| } | ||
| context.system.cluster.events.subscribe(onClusterEventRef) | ||
| Task { | ||
| await context.system.cluster.events.subscribe(onClusterEventRef) |
There was a problem hiding this comment.
Same issue; let's move to nonisolated func subscribe() { Task { so we can avoid this issue
ktoso
left a comment
There was a problem hiding this comment.
LGTM, just need to fix those context accesses - preferably by a nonisolated not async func on the event stream that does the Task { for us
Co-authored-by: Konrad `ktoso` Malawski <konrad.malawski@project13.pl>
|
LGTM, very nice cleanup -- less and less behaviors in the codebase :) A little bitter sweet... 😅 🥲 |
Resolves #821