Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions sdk/src/Temporal/Activity/Types.hs
Original file line number Diff line number Diff line change
Expand Up @@ -38,5 +38,6 @@ data ActivityInfo = ActivityInfo
, retryPolicy :: Maybe RetryPolicy
, isLocal :: Bool
, taskToken :: TaskToken
, taskQueue :: TaskQueue
}
deriving stock (Eq)
8 changes: 5 additions & 3 deletions sdk/src/Temporal/Activity/Worker.hs
Original file line number Diff line number Diff line change
Expand Up @@ -93,8 +93,8 @@ execute worker = runActivityWorker worker go
go


activityInfoFromProto :: MonadIO m => TaskToken -> AT.Start -> ActivityWorkerM actEnv m ActivityInfo
activityInfoFromProto tt msg = do
activityInfoFromProto :: MonadIO m => TaskToken -> TaskQueue -> AT.Start -> ActivityWorkerM actEnv m ActivityInfo
activityInfoFromProto tt tq msg = do
w <- ask
hdrs <- processorDecodePayloads w.payloadProcessor (fmap convertFromProtoPayload (msg ^. AT.headerFields))
heartbeats <- processorDecodePayloads w.payloadProcessor (fmap convertFromProtoPayload (msg ^. AT.vec'heartbeatDetails))
Expand All @@ -119,6 +119,7 @@ activityInfoFromProto tt msg = do
, retryPolicy = fmap retryPolicyFromProto (msg ^. AT.maybe'retryPolicy)
, isLocal = msg ^. AT.isLocal
, taskToken = tt
, taskQueue = tq
}


Expand Down Expand Up @@ -158,7 +159,8 @@ applyActivityTaskStart :: (MonadUnliftIO m, MonadLogger m) => AT.ActivityTask ->
applyActivityTaskStart tsk tt msg = do
w <- ask
let c = Core.getWorkerConfig w.workerCore
info <- activityInfoFromProto tt msg
tq = Core.taskQueue c
info <- activityInfoFromProto tt (TaskQueue tq) msg
Logging.logInfo $
T.concat
[ "Starting activity: "
Expand Down
1 change: 1 addition & 0 deletions sdk/src/Temporal/Contrib/OpenTelemetry.hs
Original file line number Diff line number Diff line change
Expand Up @@ -312,6 +312,7 @@ makeOpenTelemetryInterceptor = do
, ("temporal.attempt", toAttribute $ fromIntegral @Word32 @Int attempt)
, -- , ("temporal.namespace", toAttribute $ rawNamespace $ input.activityInfo.namespace)
("temporal.activity_is_local", toAttribute isLocal)
, ("temporal.task_queue", toAttribute $ rawTaskQueue taskQueue)
]
}

Expand Down
1 change: 1 addition & 0 deletions sdk/src/Temporal/Testing/MockActivityEnvironment.hs
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ mkMockActivityEnvironment env = do
, scheduledTime = utcToSystemTime $ UTCTime systemEpochDay 0
, startToCloseTimeout = Just $ Duration.seconds 1
, startedTime = utcToSystemTime $ UTCTime systemEpochDay 0
, taskQueue = "test"
, taskToken = "test"
, workflowId = "test"
, workflowNamespace = "default"
Expand Down
Loading