Skip to content

Commit 2c9211f

Browse files
authored
CHASM: implement UseExisting ID conflict policy (#8544)
## What changed? - Implement ID conflict policy UseExisting - Introduce chasm.ExecutionAlreadyStartedError, same semantic as serviceerror.WorkflowExecutionAlreadyStarted. ## Why? - UseExisting conflict will be used by components we are building. ## How did you test it? - [x] built - [ ] run locally and tested manually - [ ] covered by existing tests - [x] added new unit test(s) - [x] added new functional test(s)
1 parent 5288c39 commit 2c9211f

File tree

6 files changed

+186
-18
lines changed

6 files changed

+186
-18
lines changed

chasm/engine.go

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -60,9 +60,8 @@ type BusinessIDConflictPolicy int
6060

6161
const (
6262
BusinessIDConflictPolicyFail BusinessIDConflictPolicy = iota
63-
BusinessIDConflictPolicyTermiateExisting
64-
// TODO: Do we want to support UseExisting conflict policy?
65-
// BusinessIDConflictPolicyUseExisting
63+
BusinessIDConflictPolicyTerminateExisting
64+
BusinessIDConflictPolicyUseExisting
6665
)
6766

6867
type TransitionOptions struct {
@@ -87,7 +86,9 @@ func WithSpeculative() TransitionOption {
8786
}
8887
}
8988

90-
// this only applies to NewEntity and UpdateWithNewEntity
89+
// WithBusinessIDPolicy sets the businessID reuse and conflict policy
90+
// used in the transition when creating a new entity.
91+
// This option only applies to NewEntity() and UpdateWithNewEntity().
9192
func WithBusinessIDPolicy(
9293
reusePolicy BusinessIDReusePolicy,
9394
conflictPolicy BusinessIDConflictPolicy,
@@ -98,7 +99,8 @@ func WithBusinessIDPolicy(
9899
}
99100
}
100101

101-
// this only applies to NewEntity and UpdateWithNewEntity
102+
// WithRequestID sets the requestID used when creating a new entity.
103+
// This option only applies to NewEntity() and UpdateWithNewEntity().
102104
func WithRequestID(
103105
requestID string,
104106
) TransitionOption {

chasm/errors.go

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
package chasm
2+
3+
type ExecutionAlreadyStartedError struct {
4+
Message string
5+
CurrentRequestID string
6+
CurrentRunID string
7+
}
8+
9+
func NewExecutionAlreadyStartedErr(
10+
message, currentRequestID, currentRunID string,
11+
) *ExecutionAlreadyStartedError {
12+
return &ExecutionAlreadyStartedError{
13+
Message: message,
14+
CurrentRequestID: currentRequestID,
15+
CurrentRunID: currentRunID,
16+
}
17+
}
18+
19+
func (e *ExecutionAlreadyStartedError) Error() string {
20+
return e.Message
21+
}

chasm/lib/tests/handler.go

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,10 @@ import (
1212

1313
type (
1414
NewPayloadStoreRequest struct {
15-
NamespaceID namespace.ID
16-
StoreID string
15+
NamespaceID namespace.ID
16+
StoreID string
17+
IDReusePolicy chasm.BusinessIDReusePolicy
18+
IDConflictPolicy chasm.BusinessIDConflictPolicy
1719
}
1820

1921
NewPayloadStoreResponse struct {
@@ -84,6 +86,7 @@ func NewPayloadStoreHandler(
8486
return store, nil, err
8587
},
8688
nil,
89+
chasm.WithBusinessIDPolicy(request.IDReusePolicy, request.IDConflictPolicy),
8790
)
8891
if err != nil {
8992
return NewPayloadStoreResponse{}, err

service/history/chasm_engine.go

Lines changed: 22 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -447,7 +447,7 @@ func (e *ChasmEngine) handleConflictPolicy(
447447
) (chasm.EntityKey, []byte, error) {
448448
switch conflictPolicy {
449449
case chasm.BusinessIDConflictPolicyFail:
450-
return chasm.EntityKey{}, nil, serviceerror.NewWorkflowExecutionAlreadyStarted(
450+
return chasm.EntityKey{}, nil, chasm.NewExecutionAlreadyStartedErr(
451451
fmt.Sprintf(
452452
"CHASM execution still running. BusinessID: %s, RunID: %s, ID Conflict Policy: %v",
453453
newEntityParams.entityRef.EntityKey.BusinessID,
@@ -457,11 +457,26 @@ func (e *ChasmEngine) handleConflictPolicy(
457457
currentRunInfo.createRequestID,
458458
currentRunInfo.RunID,
459459
)
460-
case chasm.BusinessIDConflictPolicyTermiateExisting:
461-
// TODO: handle BusinessIDConflictPolicyTermiateExisting
460+
case chasm.BusinessIDConflictPolicyTerminateExisting:
461+
// TODO: handle BusinessIDConflictPolicyTerminateExisting and update TestNewEntity_ConflictPolicy_TerminateExisting.
462+
//
463+
// Today's state-based replication logic can not existly handle this policy correctly
464+
// (or any operation that close and starts a new run in one transaction).
465+
// The termination and creation of new run can not be replicated transactionally.
466+
//
467+
// The main blocker is that state-based replication works on the current state,
468+
// and we may have a chain of runs all created via TerminateExisting policy, meaning
469+
// replication has to replicated all of them transactionally.
470+
// We need a way to break this chain into consistent pieces and replicate them one by one.
462471
return chasm.EntityKey{}, nil, serviceerror.NewUnimplemented("ID Conflict Policy Terminate Existing is not yet supported")
463-
// case chasm.BusinessIDConflictPolicyUseExisting:
464-
// return chasm.EntityKey{}, nil, serviceerror.NewUnimplemented("ID Conflict Policy Use Existing is not yet supported")
472+
case chasm.BusinessIDConflictPolicyUseExisting:
473+
existingEntityRef := newEntityParams.entityRef
474+
existingEntityRef.EntityID = currentRunInfo.RunID
475+
serializedRef, err := existingEntityRef.Serialize(e.registry)
476+
if err != nil {
477+
return chasm.EntityKey{}, nil, err
478+
}
479+
return existingEntityRef.EntityKey, serializedRef, nil
465480
default:
466481
return chasm.EntityKey{}, nil, serviceerror.NewInternal(
467482
fmt.Sprintf("unknown business ID conflict policy for newEntity: %v", conflictPolicy),
@@ -482,7 +497,7 @@ func (e *ChasmEngine) handleReusePolicy(
482497
// Fallthrough to persist the new entity as current run.
483498
case chasm.BusinessIDReusePolicyAllowDuplicateFailedOnly:
484499
if _, ok := consts.FailedWorkflowStatuses[currentRunInfo.Status]; !ok {
485-
return chasm.EntityKey{}, nil, serviceerror.NewWorkflowExecutionAlreadyStarted(
500+
return chasm.EntityKey{}, nil, chasm.NewExecutionAlreadyStartedErr(
486501
fmt.Sprintf(
487502
"CHASM execution already completed successfully. BusinessID: %s, RunID: %s, ID Reuse Policy: %v",
488503
newEntityParams.entityRef.EntityKey.BusinessID,
@@ -495,7 +510,7 @@ func (e *ChasmEngine) handleReusePolicy(
495510
}
496511
// Fallthrough to persist the new entity as current run.
497512
case chasm.BusinessIDReusePolicyRejectDuplicate:
498-
return chasm.EntityKey{}, nil, serviceerror.NewWorkflowExecutionAlreadyStarted(
513+
return chasm.EntityKey{}, nil, chasm.NewExecutionAlreadyStartedErr(
499514
fmt.Sprintf(
500515
"CHASM execution already finished. BusinessID: %s, RunID: %s, ID Reuse Policy: %v",
501516
newEntityParams.entityRef.EntityKey.BusinessID,

service/history/chasm_engine_test.go

Lines changed: 82 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -349,7 +349,7 @@ func (s *chasmEngineSuite) TestNewEntity_ReusePolicy_FailedOnly_Fail() {
349349
chasm.BusinessIDConflictPolicyFail,
350350
),
351351
)
352-
s.IsType(&serviceerror.WorkflowExecutionAlreadyStarted{}, err)
352+
s.ErrorAs(err, new(*chasm.ExecutionAlreadyStartedError))
353353
}
354354

355355
func (s *chasmEngineSuite) TestNewEntity_ReusePolicy_RejectDuplicate() {
@@ -383,7 +383,87 @@ func (s *chasmEngineSuite) TestNewEntity_ReusePolicy_RejectDuplicate() {
383383
chasm.BusinessIDConflictPolicyFail,
384384
),
385385
)
386-
s.IsType(&serviceerror.WorkflowExecutionAlreadyStarted{}, err)
386+
s.ErrorAs(err, new(*chasm.ExecutionAlreadyStartedError))
387+
}
388+
389+
func (s *chasmEngineSuite) TestNewEntity_ConflictPolicy_UseExisting() {
390+
tv := testvars.New(s.T())
391+
tv = tv.WithRunID(tv.Any().RunID())
392+
393+
ref := chasm.NewComponentRef[*testComponent](
394+
chasm.EntityKey{
395+
NamespaceID: string(tests.NamespaceID),
396+
BusinessID: tv.WorkflowID(),
397+
EntityID: "",
398+
},
399+
)
400+
newActivityID := tv.ActivityID()
401+
// Current run is still running, conflict policy will be used.
402+
currentRunConditionFailedErr := s.currentRunConditionFailedErr(
403+
tv,
404+
enumsspb.WORKFLOW_EXECUTION_STATE_RUNNING,
405+
enumspb.WORKFLOW_EXECUTION_STATUS_RUNNING,
406+
)
407+
408+
s.mockExecutionManager.EXPECT().CreateWorkflowExecution(gomock.Any(), gomock.Any()).Return(
409+
nil,
410+
currentRunConditionFailedErr,
411+
).Times(1)
412+
413+
entityKey, serializedRef, err := s.engine.NewEntity(
414+
context.Background(),
415+
ref,
416+
s.newTestEntityFn(newActivityID),
417+
chasm.WithBusinessIDPolicy(
418+
chasm.BusinessIDReusePolicyAllowDuplicate,
419+
chasm.BusinessIDConflictPolicyUseExisting,
420+
),
421+
)
422+
s.NoError(err)
423+
424+
expectedEntityKey := chasm.EntityKey{
425+
NamespaceID: string(tests.NamespaceID),
426+
BusinessID: tv.WorkflowID(),
427+
EntityID: tv.RunID(),
428+
}
429+
s.Equal(expectedEntityKey, entityKey)
430+
s.validateNewEntityResponseRef(serializedRef, expectedEntityKey)
431+
}
432+
433+
func (s *chasmEngineSuite) TestNewEntity_ConflictPolicy_TerminateExisting() {
434+
tv := testvars.New(s.T())
435+
tv = tv.WithRunID(tv.Any().RunID())
436+
437+
ref := chasm.NewComponentRef[*testComponent](
438+
chasm.EntityKey{
439+
NamespaceID: string(tests.NamespaceID),
440+
BusinessID: tv.WorkflowID(),
441+
EntityID: "",
442+
},
443+
)
444+
newActivityID := tv.ActivityID()
445+
// Current run is still running, conflict policy will be used.
446+
currentRunConditionFailedErr := s.currentRunConditionFailedErr(
447+
tv,
448+
enumsspb.WORKFLOW_EXECUTION_STATE_RUNNING,
449+
enumspb.WORKFLOW_EXECUTION_STATUS_RUNNING,
450+
)
451+
452+
s.mockExecutionManager.EXPECT().CreateWorkflowExecution(gomock.Any(), gomock.Any()).Return(
453+
nil,
454+
currentRunConditionFailedErr,
455+
).Times(1)
456+
457+
_, _, err := s.engine.NewEntity(
458+
context.Background(),
459+
ref,
460+
s.newTestEntityFn(newActivityID),
461+
chasm.WithBusinessIDPolicy(
462+
chasm.BusinessIDReusePolicyAllowDuplicate,
463+
chasm.BusinessIDConflictPolicyTerminateExisting,
464+
),
465+
)
466+
s.ErrorAs(err, new(*serviceerror.Unimplemented))
387467
}
388468

389469
func (s *chasmEngineSuite) newTestEntityFn(

tests/chasm_test.go

Lines changed: 49 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -56,11 +56,58 @@ func (s *ChasmTestSuite) TestNewPayloadStore() {
5656
_, err := tests.NewPayloadStoreHandler(
5757
chasm.NewEngineContext(ctx, s.chasmEngine),
5858
tests.NewPayloadStoreRequest{
59-
NamespaceID: s.NamespaceID(),
60-
StoreID: tv.Any().String(),
59+
NamespaceID: s.NamespaceID(),
60+
StoreID: tv.Any().String(),
61+
IDReusePolicy: chasm.BusinessIDReusePolicyRejectDuplicate,
62+
IDConflictPolicy: chasm.BusinessIDConflictPolicyFail,
63+
},
64+
)
65+
s.NoError(err)
66+
}
67+
68+
func (s *ChasmTestSuite) TestNewPayloadStore_ConflictPolicy_UseExisting() {
69+
tv := testvars.New(s.T())
70+
71+
ctx, cancel := context.WithTimeout(context.Background(), chasmTestTimeout)
72+
defer cancel()
73+
74+
storeID := tv.Any().String()
75+
76+
resp, err := tests.NewPayloadStoreHandler(
77+
chasm.NewEngineContext(ctx, s.chasmEngine),
78+
tests.NewPayloadStoreRequest{
79+
NamespaceID: s.NamespaceID(),
80+
StoreID: storeID,
81+
IDReusePolicy: chasm.BusinessIDReusePolicyRejectDuplicate,
82+
IDConflictPolicy: chasm.BusinessIDConflictPolicyFail,
83+
},
84+
)
85+
s.NoError(err)
86+
87+
currentRunID := resp.RunID
88+
89+
resp, err = tests.NewPayloadStoreHandler(
90+
chasm.NewEngineContext(ctx, s.chasmEngine),
91+
tests.NewPayloadStoreRequest{
92+
NamespaceID: s.NamespaceID(),
93+
StoreID: storeID,
94+
IDReusePolicy: chasm.BusinessIDReusePolicyRejectDuplicate,
95+
IDConflictPolicy: chasm.BusinessIDConflictPolicyFail,
96+
},
97+
)
98+
s.ErrorAs(err, new(*chasm.ExecutionAlreadyStartedError))
99+
100+
resp, err = tests.NewPayloadStoreHandler(
101+
chasm.NewEngineContext(ctx, s.chasmEngine),
102+
tests.NewPayloadStoreRequest{
103+
NamespaceID: s.NamespaceID(),
104+
StoreID: storeID,
105+
IDReusePolicy: chasm.BusinessIDReusePolicyRejectDuplicate,
106+
IDConflictPolicy: chasm.BusinessIDConflictPolicyUseExisting,
61107
},
62108
)
63109
s.NoError(err)
110+
s.Equal(currentRunID, resp.RunID)
64111
}
65112

66113
func (s *ChasmTestSuite) TestPayloadStore_UpdateComponent() {

0 commit comments

Comments
 (0)