Skip to content

Commit e9a1223

Browse files
committed
Move StaleState test to unit tests
1 parent 49c50bd commit e9a1223

File tree

2 files changed

+50
-29
lines changed

2 files changed

+50
-29
lines changed

service/history/chasm_engine_test.go

Lines changed: 50 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -719,6 +719,53 @@ func (s *chasmEngineSuite) TestPollComponent_Success_Wait() {
719719
s.Equal(activityID, <-newActivityID)
720720
}
721721

722+
// TestPollComponent_StaleState tests that PollComponent returns a user-friendly Unavailable error
723+
// when the submitted component reference is ahead of persisted state (e.g. due to namespace
724+
// failover).
725+
func (s *chasmEngineSuite) TestPollComponent_StaleState() {
726+
tv := testvars.New(s.T())
727+
tv = tv.WithRunID(tv.Any().RunID())
728+
729+
entityKey := chasm.EntityKey{
730+
NamespaceID: string(tests.NamespaceID),
731+
BusinessID: tv.WorkflowID(),
732+
EntityID: tv.RunID(),
733+
}
734+
735+
s.mockExecutionManager.EXPECT().GetWorkflowExecution(gomock.Any(), gomock.Any()).
736+
Return(&persistence.GetWorkflowExecutionResponse{
737+
State: s.buildPersistenceMutableState(entityKey, &persistencespb.ActivityInfo{}),
738+
}, nil).AnyTimes()
739+
740+
pRef := &persistencespb.ChasmComponentRef{
741+
NamespaceId: entityKey.NamespaceID,
742+
BusinessId: entityKey.BusinessID,
743+
EntityId: entityKey.EntityID,
744+
Archetype: "TestLibrary.test_component",
745+
EntityVersionedTransition: &persistencespb.VersionedTransition{
746+
NamespaceFailoverVersion: s.namespaceEntry.FailoverVersion() + 1, // ahead of persisted state
747+
TransitionCount: testTransitionCount,
748+
},
749+
}
750+
staleToken, err := pRef.Marshal()
751+
s.NoError(err)
752+
staleRef, err := chasm.DeserializeComponentRef(staleToken)
753+
s.NoError(err)
754+
755+
_, err = s.engine.PollComponent(
756+
context.Background(),
757+
staleRef,
758+
func(ctx chasm.Context, component chasm.Component) (bool, error) {
759+
s.Fail("predicate should not be called with stale ref")
760+
return false, nil
761+
},
762+
)
763+
s.Error(err)
764+
var unavailable *serviceerror.Unavailable
765+
s.ErrorAs(err, &unavailable)
766+
s.Equal("please retry", unavailable.Message)
767+
}
768+
722769
func (s *chasmEngineSuite) buildPersistenceMutableState(
723770
key chasm.EntityKey,
724771
componentState proto.Message,
@@ -736,7 +783,7 @@ func (s *chasmEngineSuite) buildPersistenceMutableState(
736783
TransitionHistory: []*persistencespb.VersionedTransition{
737784
{
738785
NamespaceFailoverVersion: s.namespaceEntry.FailoverVersion(),
739-
TransitionCount: 10,
786+
TransitionCount: testTransitionCount,
740787
},
741788
},
742789
ExecutionStats: &persistencespb.ExecutionStats{},
@@ -756,7 +803,7 @@ func (s *chasmEngineSuite) buildPersistenceMutableState(
756803
},
757804
LastUpdateVersionedTransition: &persistencespb.VersionedTransition{
758805
NamespaceFailoverVersion: s.namespaceEntry.FailoverVersion(),
759-
TransitionCount: 10,
806+
TransitionCount: testTransitionCount,
760807
},
761808
Attributes: &persistencespb.ChasmNodeMetadata_ComponentAttributes{
762809
ComponentAttributes: &persistencespb.ChasmComponentAttributes{
@@ -781,6 +828,7 @@ func (s *chasmEngineSuite) serializeComponentState(
781828
const (
782829
testComponentPausedSAName = "PausedSA"
783830
testComponentPausedMemoName = "PausedMemo"
831+
testTransitionCount = 10
784832
)
785833

786834
var (

tests/standalone_activity_test.go

Lines changed: 0 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,6 @@ import (
1414
"go.temporal.io/api/serviceerror"
1515
taskqueuepb "go.temporal.io/api/taskqueue/v1"
1616
"go.temporal.io/api/workflowservice/v1"
17-
persistencespb "go.temporal.io/server/api/persistence/v1"
1817
"go.temporal.io/server/chasm"
1918
"go.temporal.io/server/common/dynamicconfig"
2019
"go.temporal.io/server/common/testing/testvars"
@@ -366,32 +365,6 @@ func (s *standaloneActivityTestSuite) Test_PollActivityExecution_WaitAnyStateCha
366365

367366
err = <-taskQueuePollErr
368367
require.NoError(t, err)
369-
370-
// Manipulate the token to verify token staleness checks (simulate ErrStaleReference). To do so
371-
// we make use of the internal implementation detail that the bytes are a serialized ref.
372-
token := firstPollResp.StateChangeLongPollToken
373-
var pRef persistencespb.ChasmComponentRef
374-
err = pRef.Unmarshal(token)
375-
require.NoError(t, err)
376-
if pRef.EntityVersionedTransition != nil {
377-
pRef.EntityVersionedTransition.NamespaceFailoverVersion += 1
378-
}
379-
token, err = pRef.Marshal()
380-
require.NoError(t, err)
381-
382-
_, err = s.FrontendClient().PollActivityExecution(ctx, &workflowservice.PollActivityExecutionRequest{
383-
Namespace: s.Namespace().String(),
384-
ActivityId: activityID,
385-
RunId: startResp.RunId,
386-
WaitPolicy: &workflowservice.PollActivityExecutionRequest_WaitAnyStateChange{
387-
WaitAnyStateChange: &workflowservice.PollActivityExecutionRequest_StateChangeWaitOptions{
388-
LongPollToken: token,
389-
},
390-
},
391-
})
392-
var unavailableErr *serviceerror.Unavailable
393-
require.ErrorAs(t, err, &unavailableErr)
394-
require.ErrorContains(t, err, "please retry")
395368
}
396369

397370
func (s *standaloneActivityTestSuite) Test_PollActivityExecution_WaitCompletion() {

0 commit comments

Comments
 (0)