diff --git a/chasm/lib/callback/component.go b/chasm/lib/callback/component.go index efeb5752447..9c6aa4facef 100644 --- a/chasm/lib/callback/component.go +++ b/chasm/lib/callback/component.go @@ -29,7 +29,7 @@ type Callback struct { *callbackspb.CallbackState // Interface to retrieve Nexus operation completion data - CompletionSource chasm.Field[CompletionSource] + CompletionSource chasm.ParentPtr[CompletionSource] } func NewCallback( diff --git a/chasm/lib/callback/executors_test.go b/chasm/lib/callback/executors_test.go index 71df6f46804..220cdcbe8e8 100644 --- a/chasm/lib/callback/executors_test.go +++ b/chasm/lib/callback/executors_test.go @@ -5,10 +5,8 @@ import ( "encoding/base64" "errors" "net/http" - "reflect" "testing" "time" - "unsafe" "github.com/nexus-rpc/sdk-go/nexus" "github.com/stretchr/testify/require" @@ -34,38 +32,41 @@ import ( "google.golang.org/grpc/codes" "google.golang.org/grpc/status" "google.golang.org/protobuf/proto" + "google.golang.org/protobuf/types/known/emptypb" "google.golang.org/protobuf/types/known/timestamppb" ) -// mockNexusCompletionGetter implements CanGetNexusCompletion for testing -type mockNexusCompletionGetter struct { +type mockNexusCompletionGetterComponent struct { + chasm.UnimplementedComponent + + Empty *emptypb.Empty + completion nexusrpc.OperationCompletion err error + + Callback chasm.Field[*Callback] } -func (m *mockNexusCompletionGetter) GetNexusCompletion(_ chasm.Context, requestID string) (nexusrpc.OperationCompletion, error) { +func (m *mockNexusCompletionGetterComponent) GetNexusCompletion(_ chasm.Context, requestID string) (nexusrpc.OperationCompletion, error) { return m.completion, m.err } -// setFieldValue is a test helper that uses reflection to set the internal value of a chasm.Field. -// This is necessary for testing because: -// 1. The Field API (NewComponentField, NewDataField) only supports chasm.Component and proto.Message types -// 2. CanGetNexusCompletion is a plain interface, not a Component -// 3. The fieldInternal struct and its fields are unexported -// -// In production, Fields are typically initialized through proper CHASM lifecycle methods or -// by using NewComponentField/NewDataField with appropriate types. -// TODO (seankane): Move this helper to the chasm/chasmtest package -func setFieldValue[T any](field *chasm.Field[T], value T) { - // Get the Internal field (which is exported) - internalField := reflect.ValueOf(field).Elem().FieldByName("Internal") - - // Get the unexported 'v' field using unsafe pointer manipulation - vField := internalField.FieldByName("v") - vField = reflect.NewAt(vField.Type(), unsafe.Pointer(vField.UnsafeAddr())).Elem() - - // Set the value - vField.Set(reflect.ValueOf(value)) +func (m *mockNexusCompletionGetterComponent) LifecycleState(_ chasm.Context) chasm.LifecycleState { + return chasm.LifecycleStateRunning +} + +type mockNexusCompletionGetterLibrary struct { + chasm.UnimplementedLibrary +} + +func (l *mockNexusCompletionGetterLibrary) Name() string { + return "mock" +} + +func (l *mockNexusCompletionGetterLibrary) Components() []*chasm.RegistrableComponent { + return []*chasm.RegistrableComponent{ + chasm.NewRegistrableComponent[*mockNexusCompletionGetterComponent]("nexusCompletionGetter"), + } } // Test the full executeInvocationTask flow with direct executor calls @@ -153,16 +154,44 @@ func TestExecuteInvocationTaskNexus_Outcomes(t *testing.T) { metrics.DestinationTag("http://localhost"), metrics.OutcomeTag(tc.expectedMetricOutcome)) - // Create completion - completion, err := nexusrpc.NewOperationCompletionSuccessful(nil, nexusrpc.OperationCompletionSuccessfulOptions{}) - require.NoError(t, err) - // Setup logger and time source - logger := log.NewNoopLogger() + logger := log.NewTestLogger() timeSource := clock.NewEventTimeSource() timeSource.Update(time.Now()) - // Create callback in SCHEDULED state + // Create task executor with mock namespace registry + nsRegistry := namespace.NewMockRegistry(ctrl) + nsRegistry.EXPECT().GetNamespaceByID(gomock.Any()).Return(ns, nil) + + // Create mock engine + mockEngine := chasm.NewMockEngine(ctrl) + executor := &InvocationTaskExecutor{ + config: &Config{ + RequestTimeout: dynamicconfig.GetDurationPropertyFnFilteredByDestination(time.Second), + RetryPolicy: func() backoff.RetryPolicy { + return backoff.NewExponentialRetryPolicy(time.Second) + }, + }, + namespaceRegistry: nsRegistry, + metricsHandler: metricsHandler, + logger: logger, + httpCallerProvider: func(nid common.NamespaceIDAndDestination) HTTPCaller { + return tc.caller + }, + chasmEngine: mockEngine, + } + + chasmRegistry := chasm.NewRegistry(logger) + err := chasmRegistry.Register(&Library{ + InvocationTaskExecutor: executor, + }) + require.NoError(t, err) + err = chasmRegistry.Register(&mockNexusCompletionGetterLibrary{}) + require.NoError(t, err) + + nodeBackend := &chasm.MockNodeBackend{} + root := chasm.NewEmptyTree(chasmRegistry, timeSource, nodeBackend, chasm.DefaultPathEncoder, logger) + callback := &Callback{ CallbackState: &callbackspb.CallbackState{ RequestId: "request-id", @@ -179,21 +208,21 @@ func TestExecuteInvocationTaskNexus_Outcomes(t *testing.T) { }, } - // Set up the CompletionSource field to return our mock - completionGetter := &mockNexusCompletionGetter{ - completion: completion, - } - // Set the CanGetNexusCompletion field using reflection. - // This is necessary because CanGetNexusCompletion is a plain interface, - // not a chasm.Component, so we can't use NewComponentField. - setFieldValue(&callback.CompletionSource, CompletionSource(completionGetter)) - - // Create task executor with mock namespace registry - nsRegistry := namespace.NewMockRegistry(ctrl) - nsRegistry.EXPECT().GetNamespaceByID(gomock.Any()).Return(ns, nil) + // Create completion + completion, err := nexusrpc.NewOperationCompletionSuccessful(nil, nexusrpc.OperationCompletionSuccessfulOptions{}) + require.NoError(t, err) - // Create mock engine - mockEngine := chasm.NewMockEngine(ctrl) + // Set up the CompletionSource field to return our mock completion + root.SetRootComponent(&mockNexusCompletionGetterComponent{ + completion: completion, + // Create callback in SCHEDULED state + Callback: chasm.NewComponentField( + chasm.NewMutableContext(context.Background(), root), + callback, + ), + }) + _, err = root.CloseTransaction() + require.NoError(t, err) // Setup engine expectations to directly call executor logic with MockMutableContext mockEngine.EXPECT().ReadComponent( @@ -231,22 +260,6 @@ func TestExecuteInvocationTaskNexus_Outcomes(t *testing.T) { return nil, err }) - executor := &InvocationTaskExecutor{ - config: &Config{ - RequestTimeout: dynamicconfig.GetDurationPropertyFnFilteredByDestination(time.Second), - RetryPolicy: func() backoff.RetryPolicy { - return backoff.NewExponentialRetryPolicy(time.Second) - }, - }, - namespaceRegistry: nsRegistry, - metricsHandler: metricsHandler, - logger: logger, - httpCallerProvider: func(nid common.NamespaceIDAndDestination) HTTPCaller { - return tc.caller - }, - chasmEngine: mockEngine, - } - // Create ComponentRef ref := chasm.NewComponentRef[*Callback](chasm.ExecutionKey{ NamespaceID: "namespace-id", @@ -275,7 +288,7 @@ func TestProcessBackoffTask(t *testing.T) { ctrl := gomock.NewController(t) defer ctrl.Finish() - logger := log.NewNoopLogger() + logger := log.NewTestLogger() timeSource := clock.NewEventTimeSource() timeSource.Update(time.Now()) @@ -545,10 +558,41 @@ func TestExecuteInvocationTaskChasm_Outcomes(t *testing.T) { historyClient := tc.setupHistoryClient(t, ctrl) // Setup logger and time source - logger := log.NewNoopLogger() + logger := log.NewTestLogger() timeSource := clock.NewEventTimeSource() timeSource.Update(time.Now()) + // Create mock namespace registry + nsRegistry := namespace.NewMockRegistry(ctrl) + nsRegistry.EXPECT().GetNamespaceByID(gomock.Any()).Return(ns, nil) + + // Create mock engine and setup expectations + mockEngine := chasm.NewMockEngine(ctrl) + executor := &InvocationTaskExecutor{ + config: &Config{ + RequestTimeout: dynamicconfig.GetDurationPropertyFnFilteredByDestination(time.Second), + RetryPolicy: func() backoff.RetryPolicy { + return backoff.NewExponentialRetryPolicy(time.Second) + }, + }, + namespaceRegistry: nsRegistry, + metricsHandler: metrics.NoopMetricsHandler, + logger: logger, + historyClient: historyClient, + chasmEngine: mockEngine, + } + + chasmRegistry := chasm.NewRegistry(logger) + err := chasmRegistry.Register(&Library{ + InvocationTaskExecutor: executor, + }) + require.NoError(t, err) + err = chasmRegistry.Register(&mockNexusCompletionGetterLibrary{}) + require.NoError(t, err) + + nodeBackend := &chasm.MockNodeBackend{} + root := chasm.NewEmptyTree(chasmRegistry, timeSource, nodeBackend, chasm.DefaultPathEncoder, logger) + // Create headers headers := nexus.Header{} if tc.headerValue != "" { @@ -573,18 +617,18 @@ func TestExecuteInvocationTaskChasm_Outcomes(t *testing.T) { }, } - // Set up the CompletionSource field - completionGetter := &mockNexusCompletionGetter{ + // Set up the CompletionSource field to return our mock completion + root.SetRootComponent(&mockNexusCompletionGetterComponent{ completion: tc.completion, - } - setFieldValue(&callback.CompletionSource, CompletionSource(completionGetter)) - - // Create mock namespace registry - nsRegistry := namespace.NewMockRegistry(ctrl) - nsRegistry.EXPECT().GetNamespaceByID(gomock.Any()).Return(ns, nil) + // Create callback in SCHEDULED state + Callback: chasm.NewComponentField( + chasm.NewMutableContext(context.Background(), root), + callback, + ), + }) + _, err = root.CloseTransaction() + require.NoError(t, err) - // Create mock engine and setup expectations - mockEngine := chasm.NewMockEngine(ctrl) mockEngine.EXPECT().ReadComponent( gomock.Any(), gomock.Any(), @@ -633,20 +677,6 @@ func TestExecuteInvocationTaskChasm_Outcomes(t *testing.T) { return nil, err }) - executor := InvocationTaskExecutor{ - config: &Config{ - RequestTimeout: dynamicconfig.GetDurationPropertyFnFilteredByDestination(time.Second), - RetryPolicy: func() backoff.RetryPolicy { - return backoff.NewExponentialRetryPolicy(time.Second) - }, - }, - namespaceRegistry: nsRegistry, - metricsHandler: metrics.NoopMetricsHandler, - logger: logger, - historyClient: historyClient, - chasmEngine: mockEngine, - } - // Create ComponentRef ref := chasm.NewComponentRef[*Callback](chasm.ExecutionKey{ NamespaceID: "namespace-id", diff --git a/chasm/lib/scheduler/backfiller.go b/chasm/lib/scheduler/backfiller.go index fc0470fb12d..02022074a13 100644 --- a/chasm/lib/scheduler/backfiller.go +++ b/chasm/lib/scheduler/backfiller.go @@ -17,7 +17,7 @@ type Backfiller struct { *schedulerpb.BackfillerState - Scheduler chasm.Field[*Scheduler] + Scheduler chasm.ParentPtr[*Scheduler] } type BackfillRequestType int @@ -39,7 +39,6 @@ func newBackfiller( BackfillId: id, LastProcessedTime: timestamppb.New(ctx.Now(scheduler)), }, - Scheduler: chasm.ComponentPointerTo(ctx, scheduler), } // Immediately schedule the first backfiller task. diff --git a/chasm/lib/scheduler/backfiller_tasks_test.go b/chasm/lib/scheduler/backfiller_tasks_test.go index 408fea95aa6..e379811cd37 100644 --- a/chasm/lib/scheduler/backfiller_tasks_test.go +++ b/chasm/lib/scheduler/backfiller_tasks_test.go @@ -215,8 +215,10 @@ func (s *backfillerTasksSuite) TestBackfillTask_PartialFill() { } func (s *backfillerTasksSuite) runTestCase(c *backfillTestCase) { - sched := s.scheduler ctx := s.newMutableContext() + schedComponent, err := s.node.Component(ctx, chasm.ComponentRef{}) + s.NoError(err) + sched := schedComponent.(*scheduler.Scheduler) invoker := sched.Invoker.Get(ctx) // Exactly one type of request can be set per Backfiller. @@ -232,7 +234,7 @@ func (s *backfillerTasksSuite) runTestCase(c *backfillTestCase) { } // Either type of request will spawn a Backfiller and schedule an immediate pure task. - _, err := s.node.CloseTransaction() + _, err = s.node.CloseTransaction() s.NoError(err) // Run a backfill task. diff --git a/chasm/lib/scheduler/generator.go b/chasm/lib/scheduler/generator.go index ded2407e4eb..e8e0d20b855 100644 --- a/chasm/lib/scheduler/generator.go +++ b/chasm/lib/scheduler/generator.go @@ -13,17 +13,16 @@ type Generator struct { *schedulerpb.GeneratorState - Scheduler chasm.Field[*Scheduler] + Scheduler chasm.ParentPtr[*Scheduler] } // NewGenerator returns an intialized Generator component, which should // be parented under a Scheduler root node. -func NewGenerator(ctx chasm.MutableContext, scheduler *Scheduler, invoker *Invoker) *Generator { +func NewGenerator(ctx chasm.MutableContext) *Generator { generator := &Generator{ GeneratorState: &schedulerpb.GeneratorState{ LastProcessedTime: nil, }, - Scheduler: chasm.ComponentPointerTo(ctx, scheduler), } // Kick off initial generator run. diff --git a/chasm/lib/scheduler/invoker.go b/chasm/lib/scheduler/invoker.go index 5dcf5500cca..1f136d26f52 100644 --- a/chasm/lib/scheduler/invoker.go +++ b/chasm/lib/scheduler/invoker.go @@ -18,7 +18,7 @@ type Invoker struct { *schedulerpb.InvokerState - Scheduler chasm.Field[*Scheduler] + Scheduler chasm.ParentPtr[*Scheduler] } func (i *Invoker) LifecycleState(ctx chasm.Context) chasm.LifecycleState { @@ -27,13 +27,12 @@ func (i *Invoker) LifecycleState(ctx chasm.Context) chasm.LifecycleState { // NewInvoker returns an intialized Invoker component, which should // be parented under a Scheduler root component. -func NewInvoker(ctx chasm.MutableContext, scheduler *Scheduler) *Invoker { +func NewInvoker(ctx chasm.MutableContext) *Invoker { return &Invoker{ InvokerState: &schedulerpb.InvokerState{ BufferedStarts: []*schedulespb.BufferedStart{}, RequestIdToWorkflowId: make(map[string]string), }, - Scheduler: chasm.ComponentPointerTo(ctx, scheduler), } } diff --git a/chasm/lib/scheduler/scheduler.go b/chasm/lib/scheduler/scheduler.go index a542664b3f6..c376c5b9f1d 100644 --- a/chasm/lib/scheduler/scheduler.go +++ b/chasm/lib/scheduler/scheduler.go @@ -97,10 +97,10 @@ func NewScheduler( sched.Info.CreateTime = timestamppb.New(ctx.Now(sched)) sched.Schedule.State = &schedulepb.ScheduleState{} - invoker := NewInvoker(ctx, sched) + invoker := NewInvoker(ctx) sched.Invoker = chasm.NewComponentField(ctx, invoker) - generator := NewGenerator(ctx, sched, invoker) + generator := NewGenerator(ctx) sched.Generator = chasm.NewComponentField(ctx, generator) // Create backfillers to fulfill initialPatch. diff --git a/chasm/lib/workflow/workflow.go b/chasm/lib/workflow/workflow.go index b7308cf488c..337de487a76 100644 --- a/chasm/lib/workflow/workflow.go +++ b/chasm/lib/workflow/workflow.go @@ -109,11 +109,6 @@ func (w *Workflow) AddCompletionCallbacks( // Create and add callback callbackObj := callback.NewCallback(requestID, eventTime, &callbackspb.CallbackState{}, chasmCB) - // Initialize the Workflow field with a pointer to the parent Workflow component - // Use ComponentPointerTo instead of NewComponentField to avoid infinite recursion during tree sync. - // We need to manually create the Field[chasm.Component] from Field[*Workflow] since Go generics - // don't support covariance. - callbackObj.CompletionSource = chasm.Field[callback.CompletionSource](chasm.ComponentPointerTo(ctx, w)) w.Callbacks[id] = chasm.NewComponentField(ctx, callbackObj) } return nil diff --git a/chasm/parent_pointer.go b/chasm/parent_pointer.go index 73ed14014b7..7c7cdb2c93b 100644 --- a/chasm/parent_pointer.go +++ b/chasm/parent_pointer.go @@ -19,7 +19,7 @@ const ( // // ParentPtr is only initialized and available for use **after** the transition that // creates the component using ParentPtr is completed. -type ParentPtr[T Component] struct { +type ParentPtr[T any] struct { // Exporting this field as this generic struct needs to be created via reflection, // and reflection can't set private fields. Internal parentPtrInternal