Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion chasm/lib/callback/component.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ type Callback struct {
*callbackspb.CallbackState

// Interface to retrieve Nexus operation completion data
CompletionSource chasm.Field[CompletionSource]
CompletionSource chasm.ParentPtr[CompletionSource]
}

func NewCallback(
Expand Down
202 changes: 116 additions & 86 deletions chasm/lib/callback/executors_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,8 @@ import (
"encoding/base64"
"errors"
"net/http"
"reflect"
"testing"
"time"
"unsafe"

"github.com/nexus-rpc/sdk-go/nexus"
"github.com/stretchr/testify/require"
Expand All @@ -34,38 +32,41 @@ import (
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/emptypb"
"google.golang.org/protobuf/types/known/timestamppb"
)

// mockNexusCompletionGetter implements CanGetNexusCompletion for testing
type mockNexusCompletionGetter struct {
type mockNexusCompletionGetterComponent struct {
chasm.UnimplementedComponent

Empty *emptypb.Empty

completion nexusrpc.OperationCompletion
err error

Callback chasm.Field[*Callback]
}

func (m *mockNexusCompletionGetter) GetNexusCompletion(_ chasm.Context, requestID string) (nexusrpc.OperationCompletion, error) {
func (m *mockNexusCompletionGetterComponent) GetNexusCompletion(_ chasm.Context, requestID string) (nexusrpc.OperationCompletion, error) {
return m.completion, m.err
}

// setFieldValue is a test helper that uses reflection to set the internal value of a chasm.Field.
// This is necessary for testing because:
// 1. The Field API (NewComponentField, NewDataField) only supports chasm.Component and proto.Message types
// 2. CanGetNexusCompletion is a plain interface, not a Component
// 3. The fieldInternal struct and its fields are unexported
//
// In production, Fields are typically initialized through proper CHASM lifecycle methods or
// by using NewComponentField/NewDataField with appropriate types.
// TODO (seankane): Move this helper to the chasm/chasmtest package
func setFieldValue[T any](field *chasm.Field[T], value T) {
// Get the Internal field (which is exported)
internalField := reflect.ValueOf(field).Elem().FieldByName("Internal")

// Get the unexported 'v' field using unsafe pointer manipulation
vField := internalField.FieldByName("v")
vField = reflect.NewAt(vField.Type(), unsafe.Pointer(vField.UnsafeAddr())).Elem()

// Set the value
vField.Set(reflect.ValueOf(value))
func (m *mockNexusCompletionGetterComponent) LifecycleState(_ chasm.Context) chasm.LifecycleState {
return chasm.LifecycleStateRunning
}

type mockNexusCompletionGetterLibrary struct {
chasm.UnimplementedLibrary
}

func (l *mockNexusCompletionGetterLibrary) Name() string {
return "mock"
}

func (l *mockNexusCompletionGetterLibrary) Components() []*chasm.RegistrableComponent {
return []*chasm.RegistrableComponent{
chasm.NewRegistrableComponent[*mockNexusCompletionGetterComponent]("nexusCompletionGetter"),
}
}

// Test the full executeInvocationTask flow with direct executor calls
Expand Down Expand Up @@ -153,16 +154,44 @@ func TestExecuteInvocationTaskNexus_Outcomes(t *testing.T) {
metrics.DestinationTag("http://localhost"),
metrics.OutcomeTag(tc.expectedMetricOutcome))

// Create completion
completion, err := nexusrpc.NewOperationCompletionSuccessful(nil, nexusrpc.OperationCompletionSuccessfulOptions{})
require.NoError(t, err)

// Setup logger and time source
logger := log.NewNoopLogger()
logger := log.NewTestLogger()
timeSource := clock.NewEventTimeSource()
timeSource.Update(time.Now())

// Create callback in SCHEDULED state
// Create task executor with mock namespace registry
nsRegistry := namespace.NewMockRegistry(ctrl)
nsRegistry.EXPECT().GetNamespaceByID(gomock.Any()).Return(ns, nil)

// Create mock engine
mockEngine := chasm.NewMockEngine(ctrl)
executor := &InvocationTaskExecutor{
config: &Config{
RequestTimeout: dynamicconfig.GetDurationPropertyFnFilteredByDestination(time.Second),
RetryPolicy: func() backoff.RetryPolicy {
return backoff.NewExponentialRetryPolicy(time.Second)
},
},
namespaceRegistry: nsRegistry,
metricsHandler: metricsHandler,
logger: logger,
httpCallerProvider: func(nid common.NamespaceIDAndDestination) HTTPCaller {
return tc.caller
},
chasmEngine: mockEngine,
}

chasmRegistry := chasm.NewRegistry(logger)
err := chasmRegistry.Register(&Library{
InvocationTaskExecutor: executor,
})
require.NoError(t, err)
err = chasmRegistry.Register(&mockNexusCompletionGetterLibrary{})
require.NoError(t, err)

nodeBackend := &chasm.MockNodeBackend{}
root := chasm.NewEmptyTree(chasmRegistry, timeSource, nodeBackend, chasm.DefaultPathEncoder, logger)

callback := &Callback{
CallbackState: &callbackspb.CallbackState{
RequestId: "request-id",
Expand All @@ -179,21 +208,21 @@ func TestExecuteInvocationTaskNexus_Outcomes(t *testing.T) {
},
}

// Set up the CompletionSource field to return our mock
completionGetter := &mockNexusCompletionGetter{
completion: completion,
}
// Set the CanGetNexusCompletion field using reflection.
// This is necessary because CanGetNexusCompletion is a plain interface,
// not a chasm.Component, so we can't use NewComponentField.
setFieldValue(&callback.CompletionSource, CompletionSource(completionGetter))

// Create task executor with mock namespace registry
nsRegistry := namespace.NewMockRegistry(ctrl)
nsRegistry.EXPECT().GetNamespaceByID(gomock.Any()).Return(ns, nil)
// Create completion
completion, err := nexusrpc.NewOperationCompletionSuccessful(nil, nexusrpc.OperationCompletionSuccessfulOptions{})
require.NoError(t, err)

// Create mock engine
mockEngine := chasm.NewMockEngine(ctrl)
// Set up the CompletionSource field to return our mock completion
root.SetRootComponent(&mockNexusCompletionGetterComponent{
completion: completion,
// Create callback in SCHEDULED state
Callback: chasm.NewComponentField(
chasm.NewMutableContext(context.Background(), root),
callback,
),
})
_, err = root.CloseTransaction()
require.NoError(t, err)

// Setup engine expectations to directly call executor logic with MockMutableContext
mockEngine.EXPECT().ReadComponent(
Expand Down Expand Up @@ -231,22 +260,6 @@ func TestExecuteInvocationTaskNexus_Outcomes(t *testing.T) {
return nil, err
})

executor := &InvocationTaskExecutor{
config: &Config{
RequestTimeout: dynamicconfig.GetDurationPropertyFnFilteredByDestination(time.Second),
RetryPolicy: func() backoff.RetryPolicy {
return backoff.NewExponentialRetryPolicy(time.Second)
},
},
namespaceRegistry: nsRegistry,
metricsHandler: metricsHandler,
logger: logger,
httpCallerProvider: func(nid common.NamespaceIDAndDestination) HTTPCaller {
return tc.caller
},
chasmEngine: mockEngine,
}

// Create ComponentRef
ref := chasm.NewComponentRef[*Callback](chasm.ExecutionKey{
NamespaceID: "namespace-id",
Expand Down Expand Up @@ -275,7 +288,7 @@ func TestProcessBackoffTask(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()

logger := log.NewNoopLogger()
logger := log.NewTestLogger()
timeSource := clock.NewEventTimeSource()
timeSource.Update(time.Now())

Expand Down Expand Up @@ -545,10 +558,41 @@ func TestExecuteInvocationTaskChasm_Outcomes(t *testing.T) {
historyClient := tc.setupHistoryClient(t, ctrl)

// Setup logger and time source
logger := log.NewNoopLogger()
logger := log.NewTestLogger()
timeSource := clock.NewEventTimeSource()
timeSource.Update(time.Now())

// Create mock namespace registry
nsRegistry := namespace.NewMockRegistry(ctrl)
nsRegistry.EXPECT().GetNamespaceByID(gomock.Any()).Return(ns, nil)

// Create mock engine and setup expectations
mockEngine := chasm.NewMockEngine(ctrl)
executor := &InvocationTaskExecutor{
config: &Config{
RequestTimeout: dynamicconfig.GetDurationPropertyFnFilteredByDestination(time.Second),
RetryPolicy: func() backoff.RetryPolicy {
return backoff.NewExponentialRetryPolicy(time.Second)
},
},
namespaceRegistry: nsRegistry,
metricsHandler: metrics.NoopMetricsHandler,
logger: logger,
historyClient: historyClient,
chasmEngine: mockEngine,
}

chasmRegistry := chasm.NewRegistry(logger)
err := chasmRegistry.Register(&Library{
InvocationTaskExecutor: executor,
})
require.NoError(t, err)
err = chasmRegistry.Register(&mockNexusCompletionGetterLibrary{})
require.NoError(t, err)

nodeBackend := &chasm.MockNodeBackend{}
root := chasm.NewEmptyTree(chasmRegistry, timeSource, nodeBackend, chasm.DefaultPathEncoder, logger)

// Create headers
headers := nexus.Header{}
if tc.headerValue != "" {
Expand All @@ -573,18 +617,18 @@ func TestExecuteInvocationTaskChasm_Outcomes(t *testing.T) {
},
}

// Set up the CompletionSource field
completionGetter := &mockNexusCompletionGetter{
// Set up the CompletionSource field to return our mock completion
root.SetRootComponent(&mockNexusCompletionGetterComponent{
completion: tc.completion,
}
setFieldValue(&callback.CompletionSource, CompletionSource(completionGetter))

// Create mock namespace registry
nsRegistry := namespace.NewMockRegistry(ctrl)
nsRegistry.EXPECT().GetNamespaceByID(gomock.Any()).Return(ns, nil)
// Create callback in SCHEDULED state
Callback: chasm.NewComponentField(
chasm.NewMutableContext(context.Background(), root),
callback,
),
})
_, err = root.CloseTransaction()
require.NoError(t, err)

// Create mock engine and setup expectations
mockEngine := chasm.NewMockEngine(ctrl)
mockEngine.EXPECT().ReadComponent(
gomock.Any(),
gomock.Any(),
Expand Down Expand Up @@ -633,20 +677,6 @@ func TestExecuteInvocationTaskChasm_Outcomes(t *testing.T) {
return nil, err
})

executor := InvocationTaskExecutor{
config: &Config{
RequestTimeout: dynamicconfig.GetDurationPropertyFnFilteredByDestination(time.Second),
RetryPolicy: func() backoff.RetryPolicy {
return backoff.NewExponentialRetryPolicy(time.Second)
},
},
namespaceRegistry: nsRegistry,
metricsHandler: metrics.NoopMetricsHandler,
logger: logger,
historyClient: historyClient,
chasmEngine: mockEngine,
}

// Create ComponentRef
ref := chasm.NewComponentRef[*Callback](chasm.ExecutionKey{
NamespaceID: "namespace-id",
Expand Down
3 changes: 1 addition & 2 deletions chasm/lib/scheduler/backfiller.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ type Backfiller struct {

*schedulerpb.BackfillerState

Scheduler chasm.Field[*Scheduler]
Scheduler chasm.ParentPtr[*Scheduler]
}

type BackfillRequestType int
Expand All @@ -39,7 +39,6 @@ func newBackfiller(
BackfillId: id,
LastProcessedTime: timestamppb.New(ctx.Now(scheduler)),
},
Scheduler: chasm.ComponentPointerTo(ctx, scheduler),
}

// Immediately schedule the first backfiller task.
Expand Down
6 changes: 4 additions & 2 deletions chasm/lib/scheduler/backfiller_tasks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,8 +215,10 @@ func (s *backfillerTasksSuite) TestBackfillTask_PartialFill() {
}

func (s *backfillerTasksSuite) runTestCase(c *backfillTestCase) {
sched := s.scheduler
ctx := s.newMutableContext()
schedComponent, err := s.node.Component(ctx, chasm.ComponentRef{})
s.NoError(err)
sched := schedComponent.(*scheduler.Scheduler)
invoker := sched.Invoker.Get(ctx)

// Exactly one type of request can be set per Backfiller.
Expand All @@ -232,7 +234,7 @@ func (s *backfillerTasksSuite) runTestCase(c *backfillTestCase) {
}

// Either type of request will spawn a Backfiller and schedule an immediate pure task.
_, err := s.node.CloseTransaction()
_, err = s.node.CloseTransaction()
s.NoError(err)

// Run a backfill task.
Expand Down
5 changes: 2 additions & 3 deletions chasm/lib/scheduler/generator.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,17 +13,16 @@ type Generator struct {

*schedulerpb.GeneratorState

Scheduler chasm.Field[*Scheduler]
Scheduler chasm.ParentPtr[*Scheduler]
}

// NewGenerator returns an intialized Generator component, which should
// be parented under a Scheduler root node.
func NewGenerator(ctx chasm.MutableContext, scheduler *Scheduler, invoker *Invoker) *Generator {
func NewGenerator(ctx chasm.MutableContext) *Generator {
generator := &Generator{
GeneratorState: &schedulerpb.GeneratorState{
LastProcessedTime: nil,
},
Scheduler: chasm.ComponentPointerTo(ctx, scheduler),
}

// Kick off initial generator run.
Expand Down
5 changes: 2 additions & 3 deletions chasm/lib/scheduler/invoker.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ type Invoker struct {

*schedulerpb.InvokerState

Scheduler chasm.Field[*Scheduler]
Scheduler chasm.ParentPtr[*Scheduler]
}

func (i *Invoker) LifecycleState(ctx chasm.Context) chasm.LifecycleState {
Expand All @@ -27,13 +27,12 @@ func (i *Invoker) LifecycleState(ctx chasm.Context) chasm.LifecycleState {

// NewInvoker returns an intialized Invoker component, which should
// be parented under a Scheduler root component.
func NewInvoker(ctx chasm.MutableContext, scheduler *Scheduler) *Invoker {
func NewInvoker(ctx chasm.MutableContext) *Invoker {
return &Invoker{
InvokerState: &schedulerpb.InvokerState{
BufferedStarts: []*schedulespb.BufferedStart{},
RequestIdToWorkflowId: make(map[string]string),
},
Scheduler: chasm.ComponentPointerTo(ctx, scheduler),
}
}

Expand Down
4 changes: 2 additions & 2 deletions chasm/lib/scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,10 +97,10 @@ func NewScheduler(
sched.Info.CreateTime = timestamppb.New(ctx.Now(sched))
sched.Schedule.State = &schedulepb.ScheduleState{}

invoker := NewInvoker(ctx, sched)
invoker := NewInvoker(ctx)
sched.Invoker = chasm.NewComponentField(ctx, invoker)

generator := NewGenerator(ctx, sched, invoker)
generator := NewGenerator(ctx)
sched.Generator = chasm.NewComponentField(ctx, generator)

// Create backfillers to fulfill initialPatch.
Expand Down
Loading
Loading