diff --git a/sdk/package.yaml b/sdk/package.yaml index fdc3bcb6..6861de9c 100644 --- a/sdk/package.yaml +++ b/sdk/package.yaml @@ -140,6 +140,7 @@ tests: - require-callstack - hs-opentelemetry-sdk - resourcet + - pqueue main: Main.hs source-dirs: test other-modules: diff --git a/sdk/temporal-sdk.cabal b/sdk/temporal-sdk.cabal index 706565f5..01e6c1df 100644 --- a/sdk/temporal-sdk.cabal +++ b/sdk/temporal-sdk.cabal @@ -209,6 +209,7 @@ test-suite temporal-sdk-tests , monad-logger , mtl , network + , pqueue , proto-lens , proto-lens-protobuf-types , random diff --git a/sdk/test/IntegrationSpec.hs b/sdk/test/IntegrationSpec.hs index 3546205f..2197d69e 100644 --- a/sdk/test/IntegrationSpec.hs +++ b/sdk/test/IntegrationSpec.hs @@ -1482,6 +1482,33 @@ needsClient = do 2 lift $ C.waitWorkflowResult wfH `shouldReturn` 25 + it "processes signals in order of delivery" $ \TestEnv {..} -> do + let conf = provideCallStack $ configure () (discoverDefinitions @() $$(discoverInstances) $$(discoverInstances)) baseConf + withWorker conf $ do + let opts = + (C.startWorkflowOptions taskQueue) + { C.workflowIdReusePolicy = Just W.WorkflowIdReusePolicyAllowDuplicate + , C.timeouts = + C.TimeoutOptions + { C.runTimeout = Just $ seconds 4 + , C.executionTimeout = Nothing + , C.taskTimeout = Nothing + } + } + useClient $ do + liftIO $ putStrLn "signalWithStart call" + wfH <- + C.signalWithStart + SignalEnqueuesItemWorkflow + "signalWithStartWithQueue" + opts + signalWithArgs + 1 + + C.signal wfH signalWithArgs C.defaultSignalOptions 2 + lift $ C.waitWorkflowResult wfH `shouldReturn` [1, 2] + + -- specify "works as intended and returns correct runId" pending describe "RetryPolicy" $ do specify "is used for retryable failures" $ \TestEnv {..} -> do diff --git a/sdk/test/IntegrationSpec/Signals.hs b/sdk/test/IntegrationSpec/Signals.hs index b10bfd68..21aacf1f 100644 --- a/sdk/test/IntegrationSpec/Signals.hs +++ b/sdk/test/IntegrationSpec/Signals.hs @@ -17,6 +17,7 @@ import Temporal.Duration import Temporal.Payload import Temporal.TH import Temporal.Workflow +import qualified Data.PQueue.Prio.Min as MinPQueue unblockWorkflowSignal :: KnownSignal '[] @@ -53,3 +54,19 @@ signalWithArgsWorkflow init = provideCallStack do registerWorkflow 'signalWithArgsWorkflow + + +signalEnqueuesItemWorkflow :: Workflow [Int] +signalEnqueuesItemWorkflow = provideCallStack do + var <- newStateVar MinPQueue.empty + setSignalHandler signalWithArgs $ \i -> do + $(logDebug) "updating value and unblocking workflow" + t <- now + readStateVar var >>= \s -> writeStateVar var (MinPQueue.insert t i s) + + sleep (seconds 2) + + waitCondition $ (not . MinPQueue.null) <$> readStateVar var + (map snd . MinPQueue.toList) <$> readStateVar var + +registerWorkflow 'signalEnqueuesItemWorkflow