Skip to content

Commit 2946fc5

Browse files
committed
fix signal delivery with priority-ordering
1 parent 6c0e06b commit 2946fc5

File tree

4 files changed

+8
-6
lines changed

4 files changed

+8
-6
lines changed

sdk/package.yaml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -140,6 +140,7 @@ tests:
140140
- require-callstack
141141
- hs-opentelemetry-sdk
142142
- resourcet
143+
- pqueue
143144
main: Main.hs
144145
source-dirs: test
145146
other-modules:

sdk/temporal-sdk.cabal

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -209,6 +209,7 @@ test-suite temporal-sdk-tests
209209
, monad-logger
210210
, mtl
211211
, network
212+
, pqueue
212213
, proto-lens
213214
, proto-lens-protobuf-types
214215
, random

sdk/test/IntegrationSpec.hs

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1505,8 +1505,6 @@ needsClient = do
15051505
signalWithArgs
15061506
1
15071507

1508-
-- liftIO $ threadDelay 1_000
1509-
15101508
C.signal wfH signalWithArgs C.defaultSignalOptions 2
15111509
lift $ C.waitWorkflowResult wfH `shouldReturn` [1, 2]
15121510

sdk/test/IntegrationSpec/Signals.hs

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ import Temporal.Duration
1717
import Temporal.Payload
1818
import Temporal.TH
1919
import Temporal.Workflow
20+
import qualified Data.PQueue.Prio.Min as MinPQueue
2021

2122

2223
unblockWorkflowSignal :: KnownSignal '[]
@@ -57,14 +58,15 @@ registerWorkflow 'signalWithArgsWorkflow
5758

5859
signalEnqueuesItemWorkflow :: Workflow [Int]
5960
signalEnqueuesItemWorkflow = provideCallStack do
60-
var <- newStateVar []
61+
var <- newStateVar MinPQueue.empty
6162
setSignalHandler signalWithArgs $ \i -> do
6263
$(logDebug) "updating value and unblocking workflow"
63-
readStateVar var >>= \s -> writeStateVar var (s <> [i])
64+
t <- now
65+
readStateVar var >>= \s -> writeStateVar var (MinPQueue.insert t i s)
6466

6567
sleep (seconds 2)
6668

67-
waitCondition $ (not . null) <$> readStateVar var
68-
readStateVar var
69+
waitCondition $ (not . MinPQueue.null) <$> readStateVar var
70+
(map snd . MinPQueue.toList) <$> readStateVar var
6971

7072
registerWorkflow 'signalEnqueuesItemWorkflow

0 commit comments

Comments
 (0)