Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions core/package.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ library:
# hackage dependencies
- aeson
- async
- atomic-primops
- lens-family
- monad-logger
- network-bsd
Expand Down
3 changes: 2 additions & 1 deletion core/src/Temporal/Core/Worker.hs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ import Control.Exception
import Control.Monad
import Data.Aeson
import Data.Aeson.TH
import Data.Atomics (atomicModifyIORefCAS)
import Data.ByteString (ByteString)
import qualified Data.ByteString.Lazy as BL
import Data.IORef
Expand Down Expand Up @@ -258,7 +259,7 @@ instance Exception WorkerAlreadyClosed
-- After calling this, the worker must not be used again.
closeWorker :: Worker ty -> IO ()
closeWorker (Worker w _ _ _) = mask_ $ do
wp <- atomicModifyIORef' w $ \wp -> (throw WorkerAlreadyClosed, wp)
wp <- liftIO $ atomicModifyIORefCAS w $ \wp -> (throw WorkerAlreadyClosed, wp)
Comment on lines -261 to +262
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hm, atomicModifyIORefCAS is speculative but this is probably fine because only the computation that produces this is retried which is basically just "swap vars"..?

raw_closeWorker wp


Expand Down
1 change: 1 addition & 0 deletions core/temporal-sdk-core.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ library
build-depends:
aeson
, async
, atomic-primops
, base >=4.14 && <5
, bytestring
, containers
Expand Down
1 change: 1 addition & 0 deletions sdk/package.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ dependencies:
- proto-lens-protobuf-types
- temporal-api-protos
- temporal-sdk-core
- atomic-primops
- bytestring
- discover-instances
- mtl
Expand Down
13 changes: 7 additions & 6 deletions sdk/src/Temporal/Workflow/Internal/Instance.hs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ module Temporal.Workflow.Internal.Instance (
) where

import Control.Monad.Reader
import Data.Atomics (atomicModifyIORefCAS)
import Data.ProtoLens
import qualified Data.Text as T
import GHC.Stack
Expand Down Expand Up @@ -62,46 +63,46 @@ flushCommands = do
nextExternalCancelSequence :: InstanceM Sequence
nextExternalCancelSequence = do
inst <- ask
atomicModifyIORef' inst.workflowSequences $ \seqs ->
liftIO $ atomicModifyIORefCAS inst.workflowSequences $ \seqs ->
let seq' = externalCancel seqs
in (seqs {externalCancel = succ seq'}, Sequence seq')


nextChildWorkflowSequence :: InstanceM Sequence
nextChildWorkflowSequence = do
inst <- ask
atomicModifyIORef' inst.workflowSequences $ \seqs ->
liftIO $ atomicModifyIORefCAS inst.workflowSequences $ \seqs ->
let seq' = childWorkflow seqs
in (seqs {childWorkflow = succ seq'}, Sequence seq')


nextExternalSignalSequence :: InstanceM Sequence
nextExternalSignalSequence = do
inst <- ask
atomicModifyIORef' inst.workflowSequences $ \seqs ->
liftIO $ atomicModifyIORefCAS inst.workflowSequences $ \seqs ->
let seq' = externalSignal seqs
in (seqs {externalSignal = succ seq'}, Sequence seq')


nextTimerSequence :: InstanceM Sequence
nextTimerSequence = do
inst <- ask
atomicModifyIORef' inst.workflowSequences $ \seqs ->
liftIO $ atomicModifyIORefCAS inst.workflowSequences $ \seqs ->
let seq' = timer seqs
in (seqs {timer = succ seq'}, Sequence seq')


nextActivitySequence :: InstanceM Sequence
nextActivitySequence = do
inst <- ask
atomicModifyIORef' inst.workflowSequences $ \seqs ->
liftIO $ atomicModifyIORefCAS inst.workflowSequences $ \seqs ->
let seq' = activity seqs
in (seqs {activity = succ seq'}, Sequence seq')


nextConditionSequence :: InstanceM Sequence
nextConditionSequence = do
inst <- ask
atomicModifyIORef' inst.workflowSequences $ \seqs ->
liftIO $ atomicModifyIORefCAS inst.workflowSequences $ \seqs ->
let seq' = condition seqs
in (seqs {condition = succ seq'}, Sequence seq')
6 changes: 3 additions & 3 deletions sdk/src/Temporal/Workflow/Internal/Monad.hs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import Control.Monad
import qualified Control.Monad.Catch as Catch
import Control.Monad.Logger
import Control.Monad.Reader
import Data.Atomics (atomicModifyIORefCAS, atomicModifyIORefCAS_)
import Data.HashMap.Strict (HashMap)
import qualified Data.HashMap.Strict as HashMap
import Data.Kind
Expand All @@ -19,7 +20,6 @@ import Data.Time.Clock.System (SystemTime)
import Data.Vault.Strict
import Data.Vector (Vector)
import Data.Word (Word32)
-- import Debug.Trace
import GHC.Stack
import GHC.TypeLits
import Proto.Temporal.Sdk.Core.WorkflowActivation.WorkflowActivation
Expand Down Expand Up @@ -343,7 +343,7 @@ instance FrozenGen StdGen Workflow where
{-# INLINE addJob #-}
addJob :: ContinuationEnv -> Workflow b -> IVar b -> IVar a -> InstanceM ()
addJob env !wf !resultIVar IVar {ivarRef = !ref} =
join $ atomicModifyIORef' ref $ \case
join $ liftIO $ atomicModifyIORefCAS ref $ \case
IVarEmpty list -> (IVarEmpty (JobCons env wf resultIVar list), pure ())
full -> (full, modifyIORef' env.runQueueRef (JobCons env wf resultIVar))

Expand Down Expand Up @@ -921,6 +921,6 @@ instance Monoid WorkflowOutboundInterceptor where
nextVarIdSequence :: InstanceM Sequence
nextVarIdSequence = do
inst <- ask
atomicModifyIORef' inst.workflowSequences $ \seqs ->
liftIO $ atomicModifyIORefCAS inst.workflowSequences $ \seqs ->
let seq' = varId seqs
in (seqs {varId = succ seq'}, Sequence seq')
2 changes: 2 additions & 0 deletions sdk/temporal-sdk.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ library
aeson
, annotated-exception
, async
, atomic-primops
, base >=4.7 && <5
, base64
, bytestring
Expand Down Expand Up @@ -187,6 +188,7 @@ test-suite temporal-sdk-tests
aeson
, annotated-exception
, async
, atomic-primops
, base
, base64
, bytestring
Expand Down
Loading