Skip to content

Commit 3904355

Browse files
committed
Add tests of cross-component subscribe/notify
1 parent 7958efd commit 3904355

File tree

1 file changed

+101
-0
lines changed

1 file changed

+101
-0
lines changed

tests/standalone_activity_test.go

Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ import (
1717
"go.temporal.io/server/chasm/lib/activity"
1818
chasmactivitypb "go.temporal.io/server/chasm/lib/activity/gen/activitypb/v1"
1919
"go.temporal.io/server/common/dynamicconfig"
20+
"go.temporal.io/server/common/payload"
2021
"go.temporal.io/server/tests/testcore"
2122
"google.golang.org/protobuf/proto"
2223
"google.golang.org/protobuf/types/known/durationpb"
@@ -320,6 +321,106 @@ func (s *standaloneActivityTestSuite) Test_PollActivityExecution_WaitAnyStateCha
320321
require.NotNil(t, secondPollResp.StateChangeLongPollToken)
321322
}
322323

324+
// Test_PollVisibility_UpdateFromParent tests that polling for visibility component is woken up when
325+
// the parent activity is updated and modifies the visibility.
326+
// TODO(dan): this test uses chasm APIs (ReadComponent, UpdateComponent, PollComponent) directly. Is
327+
// this illegitimate in tests/?
328+
func (s *standaloneActivityTestSuite) Test_PollVisibility_UpdateFromParent() {
329+
t := s.T()
330+
ctx, cancel := context.WithTimeout(t.Context(), 10*time.Second)
331+
defer cancel()
332+
ctx = chasm.NewEngineContext(ctx, s.chasmEngine)
333+
334+
activityID := testcore.RandomizeStr(t.Name())
335+
taskQueue := uuid.New().String()
336+
337+
startResp, err := s.startActivity(ctx, activityID, taskQueue)
338+
require.NoError(t, err)
339+
entityKey := chasm.EntityKey{
340+
NamespaceID: s.NamespaceID().String(),
341+
BusinessID: activityID,
342+
EntityID: startResp.RunId,
343+
}
344+
345+
visibilityRef, err := chasm.ReadComponent(
346+
ctx,
347+
chasm.NewComponentRef[*activity.Activity](entityKey),
348+
func(a *activity.Activity, ctx chasm.Context, _ any) ([]byte, error) {
349+
visibility, err := a.Visibility.Get(ctx)
350+
if err != nil {
351+
return nil, err
352+
}
353+
return ctx.Ref(visibility)
354+
},
355+
nil,
356+
)
357+
require.NoError(t, err)
358+
require.NotNil(t, visibilityRef)
359+
360+
pollStarted := make(chan struct{})
361+
pollCompleted := make(chan []byte, 1)
362+
pollError := make(chan error, 1)
363+
364+
go func() {
365+
close(pollStarted)
366+
367+
_, ref, err := chasm.PollComponent(
368+
ctx,
369+
visibilityRef,
370+
func(v *chasm.Visibility, ctx chasm.Context, _ any) (any, bool, error) {
371+
sa, err := v.SA.Get(ctx)
372+
if err != nil {
373+
return nil, false, err
374+
}
375+
if sa != nil && len(sa.IndexedFields) > 0 {
376+
// State has changed, stop waiting
377+
return nil, true, nil
378+
}
379+
// State hasn't changed yet, keep waiting
380+
return nil, false, nil
381+
},
382+
nil,
383+
)
384+
if err != nil {
385+
pollError <- err
386+
} else {
387+
pollCompleted <- ref
388+
}
389+
}()
390+
391+
<-pollStarted
392+
// Hope that subscription has been established after an arbitrary amount of time
393+
// TODO(dan)
394+
time.Sleep(100 * time.Millisecond)
395+
396+
// Modify the visibility component via an update targeting its parent activity component
397+
_, _, err = chasm.UpdateComponent(
398+
ctx,
399+
chasm.NewComponentRef[*activity.Activity](entityKey),
400+
func(a *activity.Activity, ctx chasm.MutableContext, _ any) (any, error) {
401+
visibility, err := a.Visibility.Get(ctx)
402+
if err != nil {
403+
return nil, err
404+
}
405+
visibility.SA = chasm.NewDataField(ctx, &commonpb.SearchAttributes{
406+
IndexedFields: map[string]*commonpb.Payload{
407+
"TestField": payload.EncodeString("updated from parent"),
408+
},
409+
})
410+
return nil, nil
411+
}, nil)
412+
require.NoError(t, err)
413+
414+
select {
415+
case ref := <-pollCompleted:
416+
require.NotNil(t, ref)
417+
case err := <-pollError:
418+
t.Fatalf("Poll failed with error: %v", err)
419+
case <-time.After(5 * time.Second):
420+
t.Fatal("Poll did not complete within timeout")
421+
}
422+
}
423+
323424
func (s *standaloneActivityTestSuite) assertActivityExecutionInfo(
324425
t *testing.T,
325426
info *activitypb.ActivityExecutionInfo,

0 commit comments

Comments
 (0)