Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions sdk/package.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,7 @@ tests:
- require-callstack
- hs-opentelemetry-sdk
- resourcet
- pqueue
main: Main.hs
source-dirs: test
other-modules:
Expand Down
1 change: 1 addition & 0 deletions sdk/temporal-sdk.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,7 @@ test-suite temporal-sdk-tests
, monad-logger
, mtl
, network
, pqueue
, proto-lens
, proto-lens-protobuf-types
, random
Expand Down
27 changes: 27 additions & 0 deletions sdk/test/IntegrationSpec.hs
Original file line number Diff line number Diff line change
Expand Up @@ -1482,6 +1482,33 @@ needsClient = do
2
lift $ C.waitWorkflowResult wfH `shouldReturn` 25

it "processes signals in order of delivery" $ \TestEnv {..} -> do
let conf = provideCallStack $ configure () (discoverDefinitions @() $$(discoverInstances) $$(discoverInstances)) baseConf
withWorker conf $ do
let opts =
(C.startWorkflowOptions taskQueue)
{ C.workflowIdReusePolicy = Just W.WorkflowIdReusePolicyAllowDuplicate
, C.timeouts =
C.TimeoutOptions
{ C.runTimeout = Just $ seconds 4
, C.executionTimeout = Nothing
, C.taskTimeout = Nothing
}
}
useClient $ do
liftIO $ putStrLn "signalWithStart call"
wfH <-
C.signalWithStart
SignalEnqueuesItemWorkflow
"signalWithStartWithQueue"
opts
signalWithArgs
1

C.signal wfH signalWithArgs C.defaultSignalOptions 2
lift $ C.waitWorkflowResult wfH `shouldReturn` [1, 2]


-- specify "works as intended and returns correct runId" pending
describe "RetryPolicy" $ do
specify "is used for retryable failures" $ \TestEnv {..} -> do
Expand Down
17 changes: 17 additions & 0 deletions sdk/test/IntegrationSpec/Signals.hs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import Temporal.Duration
import Temporal.Payload
import Temporal.TH
import Temporal.Workflow
import qualified Data.PQueue.Prio.Min as MinPQueue


unblockWorkflowSignal :: KnownSignal '[]
Expand Down Expand Up @@ -53,3 +54,19 @@ signalWithArgsWorkflow init = provideCallStack do


registerWorkflow 'signalWithArgsWorkflow


signalEnqueuesItemWorkflow :: Workflow [Int]
signalEnqueuesItemWorkflow = provideCallStack do
var <- newStateVar MinPQueue.empty
setSignalHandler signalWithArgs $ \i -> do
$(logDebug) "updating value and unblocking workflow"
t <- now
readStateVar var >>= \s -> writeStateVar var (MinPQueue.insert t i s)
Comment on lines +64 to +65
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the fact that this works implies that things are getting processed in-order, so it looks like statevar operations are messed up.

fwiw i also tried this with modifyStateVar & it reproduced the issue which means it's not a read *> write sequence issue.


sleep (seconds 2)

waitCondition $ (not . MinPQueue.null) <$> readStateVar var
(map snd . MinPQueue.toList) <$> readStateVar var

registerWorkflow 'signalEnqueuesItemWorkflow