Skip to content
Open
Show file tree
Hide file tree
Changes from 86 commits
Commits
Show all changes
94 commits
Select commit Hold shift + click to select a range
f8ccafb
PollComponent and PollActivityExecution
dandavison Nov 12, 2025
46eb18e
Changes from @fretz12's review
dandavison Nov 13, 2025
11cac65
Edit docstrings
dandavison Nov 13, 2025
11c6b59
Non-error response on long-poll timeout
dandavison Nov 13, 2025
b24a022
Remove unused opts parameter of PollComponent
dandavison Nov 13, 2025
0db9858
Error on runID mismatch
dandavison Nov 14, 2025
c888969
Revert comment
dandavison Nov 14, 2025
5740320
Use framework-provided predicate to wait for any state change
dandavison Nov 14, 2025
1d27df4
Address code review comments
dandavison Nov 14, 2025
1107a99
Use component last updated VT
dandavison Nov 14, 2025
2ee2b75
Cleanup
dandavison Nov 15, 2025
ea12fcc
Refactor
dandavison Nov 15, 2025
5c2c41e
Refcator: combine handlers
dandavison Nov 15, 2025
c02c869
Rename: ComponentStateToken
dandavison Nov 15, 2025
e1a52d4
entity -> execution
dandavison Nov 15, 2025
24ef87c
Rename: ref.componentVT
dandavison Nov 15, 2025
5412f7e
Subscribe to notifications at component level, not execution level
dandavison Nov 15, 2025
612426f
Revert "Subscribe to notifications at component level, not execution …
dandavison Nov 17, 2025
7958efd
Add non-functional test
dandavison Nov 17, 2025
3904355
Add tests of cross-component subscribe/notify
dandavison Nov 17, 2025
00ef0ff
Rename: ComponentStateChanged
dandavison Nov 17, 2025
da60095
Revert to using ref as long-poll token
dandavison Nov 18, 2025
6d5fc28
Rename: ChasmExecutionNotification
dandavison Nov 18, 2025
a604d80
Functional test env
dandavison Nov 19, 2025
6917450
DEBUG
dandavison Nov 18, 2025
27aafa7
TestStartToClose
dandavison Nov 18, 2025
c623906
ScheduleToStart timeout
dandavison Nov 19, 2025
e05af5a
Revert "ScheduleToStart timeout"
dandavison Nov 19, 2025
6bd6d57
Evolve test
dandavison Nov 19, 2025
f325df8
Evolve test
dandavison Nov 19, 2025
099e6da
Fix expectation
dandavison Nov 19, 2025
7b23898
Revert "DEBUG"
dandavison Nov 19, 2025
b120e9c
PS
dandavison Nov 19, 2025
7e67862
Fix timeout code
dandavison Nov 20, 2025
055044f
Update test
dandavison Nov 20, 2025
affc857
TEMP: eliminate buffer
dandavison Nov 20, 2025
fa4e0ea
PS
dandavison Nov 20, 2025
f6c5290
WIP: transactionImpl
dandavison Nov 18, 2025
b540eb0
Revert "WIP: transactionImpl"
dandavison Nov 20, 2025
d1045de
Fix test
dandavison Nov 21, 2025
f48a2b8
Do not notify from UpdateComponent
dandavison Nov 20, 2025
c98c8f9
Wire through chasm notification to history engine
dandavison Nov 20, 2025
18159b6
Change signature
dandavison Nov 20, 2025
c4a0f1a
Send notification from MS transaction layer
dandavison Nov 20, 2025
310ae97
Update test
dandavison Nov 20, 2025
4ca1d33
Don't send ref in chasm execution notification
dandavison Nov 20, 2025
ccd63a4
Cleanup
dandavison Nov 20, 2025
7178c01
Test
dandavison Nov 20, 2025
a42fb44
Fix IncludeOutcome
dandavison Nov 20, 2025
c0ed2df
Test
dandavison Nov 20, 2025
5bd95d7
Revert accidental change
dandavison Nov 20, 2025
ecf4139
Cleanup
dandavison Nov 21, 2025
74e0a69
Update mock
dandavison Nov 21, 2025
e904d87
Rewrite chasm notifier
dandavison Nov 21, 2025
5b8a8be
subscribe outside
dandavison Nov 21, 2025
88e153d
Resubscribe
dandavison Nov 21, 2025
a500aa0
Fix mock
dandavison Nov 21, 2025
23fe864
PS
dandavison Nov 21, 2025
04a462d
Update test: becomes satisfied on 3rd state transition
dandavison Nov 21, 2025
e22f108
Fix test
dandavison Nov 21, 2025
ff48584
Increment in real Update
dandavison Nov 22, 2025
a84784c
DoAndReturn -> Return
dandavison Nov 22, 2025
3485fb9
Return constant empty workflow data
dandavison Nov 22, 2025
2f0b863
Times(1)
dandavison Nov 22, 2025
0fe23be
Add missing mock call expectation
dandavison Nov 22, 2025
8a410b6
Do Updates synchronously in main goroutine
dandavison Nov 22, 2025
8e83fb0
Notify on both MS snapshot and mutation
dandavison Nov 22, 2025
d8d7721
Don't resubscribe unless necessary; code golf
dandavison Nov 22, 2025
65b83a0
ungolf
dandavison Nov 23, 2025
8dbd866
Cleanup
dandavison Nov 22, 2025
f34f222
- PS
dandavison Nov 23, 2025
4146871
Partial revert 27aafa7cc2a91d8f24706b4ebf327b1d935a5f97
dandavison Nov 23, 2025
ddff1b5
Cleanup test
dandavison Nov 24, 2025
dc248f4
Delete cross-component polling tests
dandavison Nov 24, 2025
1b46b23
Use testcore.RandomizeStr(t.Name())
dandavison Nov 24, 2025
c7f4ab0
Check RunID
dandavison Nov 24, 2025
270c13b
code golf
dandavison Nov 24, 2025
fbc77f3
Revert "- PS"
dandavison Nov 24, 2025
e3d74a7
monotonicPredicateFn
dandavison Nov 24, 2025
1f169fb
Tests
dandavison Nov 24, 2025
2f10ee2
New deadline logic
dandavison Nov 24, 2025
0dda6cc
Validation & error messages fixes
dandavison Nov 24, 2025
d3c709c
Fix CHASM not found errors
dandavison Nov 25, 2025
1e5ca51
Test Outcome
dandavison Nov 25, 2025
36fb610
Evove tests
dandavison Nov 25, 2025
e5d900b
Move StaleState test to unit tests
dandavison Nov 25, 2025
07cc8cf
Rename
dandavison Nov 25, 2025
b7e5c98
Cleanup
dandavison Nov 25, 2025
5cb1637
Revert addition of new metrics
dandavison Nov 25, 2025
561b090
Subscribe does not return an error
dandavison Nov 25, 2025
8087fa5
Fix memory leak
dandavison Nov 25, 2025
e4dd35d
Clean up, fix validation
dandavison Nov 25, 2025
1e43970
Test absent RunID
dandavison Nov 25, 2025
1ad72b5
Don't return unused ref
dandavison Nov 25, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 21 additions & 22 deletions chasm/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,7 @@ type Engine interface {
PollComponent(
context.Context,
ComponentRef,
func(Context, Component) (any, bool, error),
func(MutableContext, Component, any) error,
...TransitionOption,
func(Context, Component) (bool, error),
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've removed operationFn from PollComponent since we don't need poll-with-mutation yet.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This totally works for me. We can discuss how to introduce this when the requirement comes up.

) ([]byte, error)
}

Expand Down Expand Up @@ -176,6 +174,9 @@ func UpdateWithNewEntity[C Component, I any, O1 any, O2 any](
// - consider remove ComponentRef from the return value and allow components to get
// the ref in the transition function. There are some caveats there, check the
// comment of the NewRef method in MutableContext.
//
// UpdateComponent applies updateFn to the component identified by the supplied component reference.
// It returns the result, along with the new component reference.
func UpdateComponent[C Component, R []byte | ComponentRef, I any, O any](
ctx context.Context,
r R,
Expand Down Expand Up @@ -207,6 +208,8 @@ func UpdateComponent[C Component, R []byte | ComponentRef, I any, O any](
return output, newSerializedRef, err
}

// ReadComponent returns the result of evaluating readFn against the current state of the component
// identified by the supplied component reference.
func ReadComponent[C Component, R []byte | ComponentRef, I any, O any](
ctx context.Context,
r R,
Expand Down Expand Up @@ -234,20 +237,18 @@ func ReadComponent[C Component, R []byte | ComponentRef, I any, O any](
return output, err
}

type PollComponentRequest[C Component, I any, O any] struct {
Ref ComponentRef
PredicateFn func(C, Context, I) bool
OperationFn func(C, MutableContext, I) (O, error)
Input I
}

func PollComponent[C Component, R []byte | ComponentRef, I any, O any, T any](
// PollComponent waits until the predicate is true when evaluated against the component identified
// by the supplied component reference. If it times out due to a server-imposed long-poll timeout
// then it returns (nil, nil, nil). Otherwise it returns (output, ref, err), where output is the
// output of the predicate function, and ref is a component reference identifying the state at which
// the predicate was satisfied. The predicate must be monotonic: if it returns true at execution
// state transition s it must return true at all transitions t > s. If the predicate is true at the
// outset then PollComponent returns immediately.
func PollComponent[C Component, R []byte | ComponentRef, I any, O any](
ctx context.Context,
r R,
predicateFn func(C, Context, I) (T, bool, error),
operationFn func(C, MutableContext, I, T) (O, error),
monotonicPredicateFn func(C, Context, I) (O, bool, error),
input I,
opts ...TransitionOption,
) (O, []byte, error) {
var output O

Expand All @@ -259,15 +260,13 @@ func PollComponent[C Component, R []byte | ComponentRef, I any, O any, T any](
newSerializedRef, err := engineFromContext(ctx).PollComponent(
ctx,
ref,
func(ctx Context, c Component) (any, bool, error) {
return predicateFn(c.(C), ctx, input)
func(ctx Context, c Component) (bool, error) {
out, satisfied, err := monotonicPredicateFn(c.(C), ctx, input)
if satisfied {
output = out
}
return satisfied, err
},
func(ctx MutableContext, c Component, t any) error {
var err error
output, err = operationFn(c.(C), ctx, input, t.(T))
return err
},
opts...,
)
if err != nil {
return output, nil, err
Expand Down
13 changes: 4 additions & 9 deletions chasm/engine_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

145 changes: 142 additions & 3 deletions chasm/lib/activity/activity.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,17 @@
package activity

import (
"errors"
"fmt"
"time"

"go.temporal.io/api/activity/v1"
commonpb "go.temporal.io/api/common/v1"
deploymentpb "go.temporal.io/api/deployment/v1"
enumspb "go.temporal.io/api/enums/v1"
failurepb "go.temporal.io/api/failure/v1"
historypb "go.temporal.io/api/history/v1"
"go.temporal.io/api/serviceerror"
"go.temporal.io/api/workflowservice/v1"
"go.temporal.io/server/api/historyservice/v1"
"go.temporal.io/server/api/matchingservice/v1"
Expand Down Expand Up @@ -37,14 +40,12 @@ type Activity struct {

*activitypb.ActivityState

// Standalone only
Visibility chasm.Field[*chasm.Visibility]
Attempt chasm.Field[*activitypb.ActivityAttemptState]
LastHeartbeat chasm.Field[*activitypb.ActivityHeartbeatState]
Outcome chasm.Field[*activitypb.ActivityOutcome]
// Standalone only
RequestData chasm.Field[*activitypb.ActivityRequestData]
Outcome chasm.Field[*activitypb.ActivityOutcome]

// Pointer to an implementation of the "store". for a workflow activity this would be a parent pointer back to
// the workflow. For a standalone activity this would be nil.
// TODO: revisit a standalone activity pointing to itself once we handle storing it more efficiently.
Expand Down Expand Up @@ -314,3 +315,141 @@ func (a *Activity) RecordHeartbeat(ctx chasm.MutableContext, details *commonpb.P
})
return nil, nil
}

func (a *Activity) buildActivityExecutionInfo(ctx chasm.Context) (*activity.ActivityExecutionInfo, error) {
if a.ActivityState == nil {
return nil, errors.New("activity state is nil")
}

// TODO(dan): support pause states
var status enumspb.ActivityExecutionStatus
var runState enumspb.PendingActivityState
switch a.GetStatus() {
case activitypb.ACTIVITY_EXECUTION_STATUS_UNSPECIFIED:
status = enumspb.ACTIVITY_EXECUTION_STATUS_UNSPECIFIED
runState = enumspb.PENDING_ACTIVITY_STATE_UNSPECIFIED
case activitypb.ACTIVITY_EXECUTION_STATUS_SCHEDULED:
status = enumspb.ACTIVITY_EXECUTION_STATUS_RUNNING
runState = enumspb.PENDING_ACTIVITY_STATE_SCHEDULED
case activitypb.ACTIVITY_EXECUTION_STATUS_STARTED:
status = enumspb.ACTIVITY_EXECUTION_STATUS_RUNNING
runState = enumspb.PENDING_ACTIVITY_STATE_STARTED
case activitypb.ACTIVITY_EXECUTION_STATUS_CANCEL_REQUESTED:
status = enumspb.ACTIVITY_EXECUTION_STATUS_RUNNING
runState = enumspb.PENDING_ACTIVITY_STATE_CANCEL_REQUESTED
case activitypb.ACTIVITY_EXECUTION_STATUS_COMPLETED:
status = enumspb.ACTIVITY_EXECUTION_STATUS_COMPLETED
runState = enumspb.PENDING_ACTIVITY_STATE_UNSPECIFIED
case activitypb.ACTIVITY_EXECUTION_STATUS_FAILED:
status = enumspb.ACTIVITY_EXECUTION_STATUS_FAILED
runState = enumspb.PENDING_ACTIVITY_STATE_UNSPECIFIED
case activitypb.ACTIVITY_EXECUTION_STATUS_CANCELED:
status = enumspb.ACTIVITY_EXECUTION_STATUS_CANCELED
runState = enumspb.PENDING_ACTIVITY_STATE_UNSPECIFIED
case activitypb.ACTIVITY_EXECUTION_STATUS_TERMINATED:
status = enumspb.ACTIVITY_EXECUTION_STATUS_TERMINATED
runState = enumspb.PENDING_ACTIVITY_STATE_UNSPECIFIED
case activitypb.ACTIVITY_EXECUTION_STATUS_TIMED_OUT:
status = enumspb.ACTIVITY_EXECUTION_STATUS_TIMED_OUT
runState = enumspb.PENDING_ACTIVITY_STATE_UNSPECIFIED
default:
return nil, serviceerror.NewInternalf("unknown activity execution status: %s", a.GetStatus())
}

requestData, err := a.RequestData.Get(ctx)
if err != nil {
return nil, err
}

key := ctx.ExecutionKey()

info := &activity.ActivityExecutionInfo{
ActivityId: key.BusinessID,
RunId: key.EntityID,
ActivityType: a.GetActivityType(),
Status: status,
RunState: runState,
ScheduledTime: a.GetScheduledTime(),
Priority: a.GetPriority(),
Header: requestData.GetHeader(),
// TODO(dan): populate remaining fields
}

return info, nil
}

func (a *Activity) buildPollActivityExecutionResponse(
ctx chasm.Context,
req *activitypb.PollActivityExecutionRequest,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can just pass in the frontend request here and save the conversion and confusion between req and request.

) (*activitypb.PollActivityExecutionResponse, error) {
request := req.GetFrontendRequest()

token, err := ctx.Ref(a)
if err != nil {
return nil, err
}

var info *activity.ActivityExecutionInfo
if request.GetIncludeInfo() {
info, err = a.buildActivityExecutionInfo(ctx)
if err != nil {
return nil, err
}
}

var input *commonpb.Payloads
if request.GetIncludeInput() {
activityRequest, err := a.RequestData.Get(ctx)
if err != nil {
return nil, err
}
input = activityRequest.GetInput()
}

response := &workflowservice.PollActivityExecutionResponse{
Info: info,
RunId: ctx.ExecutionKey().EntityID,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm... I see we have the run ID here too but here you used the right one.

Input: input,
StateChangeLongPollToken: token,
}

if request.GetIncludeOutcome() {
activityOutcome, err := a.Outcome.Get(ctx)
if err != nil {
return nil, err
}
if activityOutcome != nil {
switch v := activityOutcome.GetVariant().(type) {
case *activitypb.ActivityOutcome_Failed_:
response.Outcome = &workflowservice.PollActivityExecutionResponse_Failure{
Failure: v.Failed.GetFailure(),
}
case *activitypb.ActivityOutcome_Successful_:
response.Outcome = &workflowservice.PollActivityExecutionResponse_Result{
Result: v.Successful.GetOutput(),
}
}
} else {
shouldHaveFailure := (a.GetStatus() == activitypb.ACTIVITY_EXECUTION_STATUS_FAILED ||
a.GetStatus() == activitypb.ACTIVITY_EXECUTION_STATUS_TIMED_OUT ||
a.GetStatus() == activitypb.ACTIVITY_EXECUTION_STATUS_CANCELED ||
a.GetStatus() == activitypb.ACTIVITY_EXECUTION_STATUS_TERMINATED)

if shouldHaveFailure {
attempt, err := a.Attempt.Get(ctx)
if err != nil {
return nil, err
}
if details := attempt.GetLastFailureDetails(); details != nil {
response.Outcome = &workflowservice.PollActivityExecutionResponse_Failure{
Failure: details.GetFailure(),
}
}
}
}
}

return &activitypb.PollActivityExecutionResponse{
FrontendResponse: response,
}, nil
}
26 changes: 26 additions & 0 deletions chasm/lib/activity/frontend.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,32 @@ func (h *frontendHandler) StartActivityExecution(ctx context.Context, req *workf
return resp.GetFrontendResponse(), err
}

// PollActivityExecution handles PollActivityExecutionRequest. This method supports querying current
// activity state, optionally as a long-poll that waits for certain state changes. It is used by
// clients to poll for activity state and/or result.
func (h *frontendHandler) PollActivityExecution(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You're missing validation here. At minimum, we'll need to validate that the activity ID is not empty and a check that run ID is provided if long poll token is provided.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are there flag combinations that are invalid maybe?

ctx context.Context,
req *workflowservice.PollActivityExecutionRequest,
) (*workflowservice.PollActivityExecutionResponse, error) {
// Validate the request
if err := ValidatePollActivityExecutionRequest(
req,
dynamicconfig.MaxIDLengthLimit.Get(h.dc)(),
); err != nil {
return nil, err
}

namespaceID, err := h.namespaceRegistry.GetNamespaceID(namespace.Name(req.GetNamespace()))
if err != nil {
return nil, err
}
resp, err := h.client.PollActivityExecution(ctx, &activitypb.PollActivityExecutionRequest{
NamespaceId: namespaceID.String(),
FrontendRequest: req,
})
return resp.GetFrontendResponse(), err
}

func (h *frontendHandler) validateAndPopulateStartRequest(
req *workflowservice.StartActivityExecutionRequest,
namespaceID namespace.ID,
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading
Loading