Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion chasm/lib/callback/component.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ type Callback struct {
*callbackspb.CallbackState

// Interface to retrieve Nexus operation completion data
CompletionSource chasm.Field[CompletionSource]
CompletionSource chasm.ParentPtr[CompletionSource]
}

func NewCallback(
Expand Down
204 changes: 117 additions & 87 deletions chasm/lib/callback/executors_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,10 @@ import (
"encoding/base64"
"errors"
"net/http"
"reflect"
"testing"
"time"
"unsafe"

"github.com/gogo/protobuf/proto"
"github.com/nexus-rpc/sdk-go/nexus"
"github.com/stretchr/testify/require"
commonpb "go.temporal.io/api/common/v1"
Expand All @@ -33,39 +32,41 @@ import (
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/emptypb"
"google.golang.org/protobuf/types/known/timestamppb"
)

// mockNexusCompletionGetter implements CanGetNexusCompletion for testing
type mockNexusCompletionGetter struct {
type mockNexusCompletionGetterComponent struct {
chasm.UnimplementedComponent

Empty *emptypb.Empty

completion nexusrpc.OperationCompletion
err error

Callback chasm.Field[*Callback]
}

func (m *mockNexusCompletionGetter) GetNexusCompletion(_ chasm.Context, requestID string) (nexusrpc.OperationCompletion, error) {
func (m *mockNexusCompletionGetterComponent) GetNexusCompletion(_ chasm.Context, requestID string) (nexusrpc.OperationCompletion, error) {
return m.completion, m.err
}

// setFieldValue is a test helper that uses reflection to set the internal value of a chasm.Field.
// This is necessary for testing because:
// 1. The Field API (NewComponentField, NewDataField) only supports chasm.Component and proto.Message types
// 2. CanGetNexusCompletion is a plain interface, not a Component
// 3. The fieldInternal struct and its fields are unexported
//
// In production, Fields are typically initialized through proper CHASM lifecycle methods or
// by using NewComponentField/NewDataField with appropriate types.
// TODO (seankane): Move this helper to the chasm/chasmtest package
func setFieldValue[T any](field *chasm.Field[T], value T) {
// Get the Internal field (which is exported)
internalField := reflect.ValueOf(field).Elem().FieldByName("Internal")

// Get the unexported 'v' field using unsafe pointer manipulation
vField := internalField.FieldByName("v")
vField = reflect.NewAt(vField.Type(), unsafe.Pointer(vField.UnsafeAddr())).Elem()

// Set the value
vField.Set(reflect.ValueOf(value))
func (m *mockNexusCompletionGetterComponent) LifecycleState(_ chasm.Context) chasm.LifecycleState {
return chasm.LifecycleStateRunning
}

type mockNexusCompletionGetterLibrary struct {
chasm.UnimplementedLibrary
}

func (l *mockNexusCompletionGetterLibrary) Name() string {
return "mock"
}

func (l *mockNexusCompletionGetterLibrary) Components() []*chasm.RegistrableComponent {
return []*chasm.RegistrableComponent{
chasm.NewRegistrableComponent[*mockNexusCompletionGetterComponent]("nexusCompletionGetter"),
}
}

// Test the full executeInvocationTask flow with direct executor calls
Expand Down Expand Up @@ -153,16 +154,44 @@ func TestExecuteInvocationTaskNexus_Outcomes(t *testing.T) {
metrics.DestinationTag("http://localhost"),
metrics.OutcomeTag(tc.expectedMetricOutcome))

// Create completion
completion, err := nexusrpc.NewOperationCompletionSuccessful(nil, nexusrpc.OperationCompletionSuccessfulOptions{})
require.NoError(t, err)

// Setup logger and time source
logger := log.NewNoopLogger()
logger := log.NewTestLogger()
timeSource := clock.NewEventTimeSource()
timeSource.Update(time.Now())

// Create callback in SCHEDULED state
// Create task executor with mock namespace registry
nsRegistry := namespace.NewMockRegistry(ctrl)
nsRegistry.EXPECT().GetNamespaceByID(gomock.Any()).Return(ns, nil)

// Create mock engine
mockEngine := chasm.NewMockEngine(ctrl)
executor := &InvocationTaskExecutor{
config: &Config{
RequestTimeout: dynamicconfig.GetDurationPropertyFnFilteredByDestination(time.Second),
RetryPolicy: func() backoff.RetryPolicy {
return backoff.NewExponentialRetryPolicy(time.Second)
},
},
namespaceRegistry: nsRegistry,
metricsHandler: metricsHandler,
logger: logger,
httpCallerProvider: func(nid common.NamespaceIDAndDestination) HTTPCaller {
return tc.caller
},
chasmEngine: mockEngine,
}

chasmRegistry := chasm.NewRegistry(logger)
err := chasmRegistry.Register(&Library{
InvocationTaskExecutor: executor,
})
require.NoError(t, err)
err = chasmRegistry.Register(&mockNexusCompletionGetterLibrary{})
require.NoError(t, err)

nodeBackend := &chasm.MockNodeBackend{}
root := chasm.NewEmptyTree(chasmRegistry, timeSource, nodeBackend, chasm.DefaultPathEncoder, logger)

callback := &Callback{
CallbackState: &callbackspb.CallbackState{
RequestId: "request-id",
Expand All @@ -179,21 +208,21 @@ func TestExecuteInvocationTaskNexus_Outcomes(t *testing.T) {
},
}

// Set up the CompletionSource field to return our mock
completionGetter := &mockNexusCompletionGetter{
completion: completion,
}
// Set the CanGetNexusCompletion field using reflection.
// This is necessary because CanGetNexusCompletion is a plain interface,
// not a chasm.Component, so we can't use NewComponentField.
setFieldValue(&callback.CompletionSource, CompletionSource(completionGetter))

// Create task executor with mock namespace registry
nsRegistry := namespace.NewMockRegistry(ctrl)
nsRegistry.EXPECT().GetNamespaceByID(gomock.Any()).Return(ns, nil)
// Create completion
completion, err := nexusrpc.NewOperationCompletionSuccessful(nil, nexusrpc.OperationCompletionSuccessfulOptions{})
require.NoError(t, err)

// Create mock engine
mockEngine := chasm.NewMockEngine(ctrl)
// Set up the CompletionSource field to return our mock completion
root.SetRootComponent(&mockNexusCompletionGetterComponent{
completion: completion,
// Create callback in SCHEDULED state
Callback: chasm.NewComponentField(
chasm.NewMutableContext(context.Background(), root),
callback,
),
})
_, err = root.CloseTransaction()
require.NoError(t, err)

// Setup engine expectations to directly call executor logic with MockMutableContext
mockEngine.EXPECT().ReadComponent(
Expand Down Expand Up @@ -231,22 +260,6 @@ func TestExecuteInvocationTaskNexus_Outcomes(t *testing.T) {
return nil, err
})

executor := &InvocationTaskExecutor{
config: &Config{
RequestTimeout: dynamicconfig.GetDurationPropertyFnFilteredByDestination(time.Second),
RetryPolicy: func() backoff.RetryPolicy {
return backoff.NewExponentialRetryPolicy(time.Second)
},
},
namespaceRegistry: nsRegistry,
metricsHandler: metricsHandler,
logger: logger,
httpCallerProvider: func(nid common.NamespaceIDAndDestination) HTTPCaller {
return tc.caller
},
chasmEngine: mockEngine,
}

// Create ComponentRef
ref := chasm.NewComponentRef[*Callback](chasm.ExecutionKey{
NamespaceID: "namespace-id",
Expand Down Expand Up @@ -275,7 +288,7 @@ func TestProcessBackoffTask(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()

logger := log.NewNoopLogger()
logger := log.NewTestLogger()
timeSource := clock.NewEventTimeSource()
timeSource.Update(time.Now())

Expand Down Expand Up @@ -545,10 +558,41 @@ func TestExecuteInvocationTaskChasm_Outcomes(t *testing.T) {
historyClient := tc.setupHistoryClient(t, ctrl)

// Setup logger and time source
logger := log.NewNoopLogger()
logger := log.NewTestLogger()
timeSource := clock.NewEventTimeSource()
timeSource.Update(time.Now())

// Create mock namespace registry
nsRegistry := namespace.NewMockRegistry(ctrl)
nsRegistry.EXPECT().GetNamespaceByID(gomock.Any()).Return(ns, nil)

// Create mock engine and setup expectations
mockEngine := chasm.NewMockEngine(ctrl)
executor := &InvocationTaskExecutor{
config: &Config{
RequestTimeout: dynamicconfig.GetDurationPropertyFnFilteredByDestination(time.Second),
RetryPolicy: func() backoff.RetryPolicy {
return backoff.NewExponentialRetryPolicy(time.Second)
},
},
namespaceRegistry: nsRegistry,
metricsHandler: metrics.NoopMetricsHandler,
logger: logger,
historyClient: historyClient,
chasmEngine: mockEngine,
}

chasmRegistry := chasm.NewRegistry(logger)
err := chasmRegistry.Register(&Library{
InvocationTaskExecutor: executor,
})
require.NoError(t, err)
err = chasmRegistry.Register(&mockNexusCompletionGetterLibrary{})
require.NoError(t, err)

nodeBackend := &chasm.MockNodeBackend{}
root := chasm.NewEmptyTree(chasmRegistry, timeSource, nodeBackend, chasm.DefaultPathEncoder, logger)

// Create headers
headers := nexus.Header{}
if tc.headerValue != "" {
Expand All @@ -573,18 +617,18 @@ func TestExecuteInvocationTaskChasm_Outcomes(t *testing.T) {
},
}

// Set up the CompletionSource field
completionGetter := &mockNexusCompletionGetter{
// Set up the CompletionSource field to return our mock completion
root.SetRootComponent(&mockNexusCompletionGetterComponent{
completion: tc.completion,
}
setFieldValue(&callback.CompletionSource, CompletionSource(completionGetter))

// Create mock namespace registry
nsRegistry := namespace.NewMockRegistry(ctrl)
nsRegistry.EXPECT().GetNamespaceByID(gomock.Any()).Return(ns, nil)
// Create callback in SCHEDULED state
Callback: chasm.NewComponentField(
chasm.NewMutableContext(context.Background(), root),
callback,
),
})
_, err = root.CloseTransaction()
require.NoError(t, err)

// Create mock engine and setup expectations
mockEngine := chasm.NewMockEngine(ctrl)
mockEngine.EXPECT().ReadComponent(
gomock.Any(),
gomock.Any(),
Expand Down Expand Up @@ -633,20 +677,6 @@ func TestExecuteInvocationTaskChasm_Outcomes(t *testing.T) {
return nil, err
})

executor := InvocationTaskExecutor{
config: &Config{
RequestTimeout: dynamicconfig.GetDurationPropertyFnFilteredByDestination(time.Second),
RetryPolicy: func() backoff.RetryPolicy {
return backoff.NewExponentialRetryPolicy(time.Second)
},
},
namespaceRegistry: nsRegistry,
metricsHandler: metrics.NoopMetricsHandler,
logger: logger,
historyClient: historyClient,
chasmEngine: mockEngine,
}

// Create ComponentRef
ref := chasm.NewComponentRef[*Callback](chasm.ExecutionKey{
NamespaceID: "namespace-id",
Expand Down
3 changes: 1 addition & 2 deletions chasm/lib/scheduler/backfiller.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ type Backfiller struct {

*schedulerpb.BackfillerState

Scheduler chasm.Field[*Scheduler]
Scheduler chasm.ParentPtr[*Scheduler]
}

type BackfillRequestType int
Expand All @@ -39,7 +39,6 @@ func newBackfiller(
BackfillId: id,
LastProcessedTime: timestamppb.New(ctx.Now(scheduler)),
},
Scheduler: chasm.ComponentPointerTo(ctx, scheduler),
}

// Immediately schedule the first backfiller task.
Expand Down
6 changes: 4 additions & 2 deletions chasm/lib/scheduler/backfiller_tasks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,8 +215,10 @@ func (s *backfillerTasksSuite) TestBackfillTask_PartialFill() {
}

func (s *backfillerTasksSuite) runTestCase(c *backfillTestCase) {
sched := s.scheduler
ctx := s.newMutableContext()
schedComponent, err := s.node.Component(ctx, chasm.ComponentRef{})
s.NoError(err)
sched := schedComponent.(*scheduler.Scheduler)
invoker := sched.Invoker.Get(ctx)

// Exactly one type of request can be set per Backfiller.
Expand All @@ -232,7 +234,7 @@ func (s *backfillerTasksSuite) runTestCase(c *backfillTestCase) {
}

// Either type of request will spawn a Backfiller and schedule an immediate pure task.
_, err := s.node.CloseTransaction()
_, err = s.node.CloseTransaction()
s.NoError(err)

// Run a backfill task.
Expand Down
5 changes: 2 additions & 3 deletions chasm/lib/scheduler/generator.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,17 +13,16 @@ type Generator struct {

*schedulerpb.GeneratorState

Scheduler chasm.Field[*Scheduler]
Scheduler chasm.ParentPtr[*Scheduler]
}

// NewGenerator returns an intialized Generator component, which should
// be parented under a Scheduler root node.
func NewGenerator(ctx chasm.MutableContext, scheduler *Scheduler, invoker *Invoker) *Generator {
func NewGenerator(ctx chasm.MutableContext) *Generator {
generator := &Generator{
GeneratorState: &schedulerpb.GeneratorState{
LastProcessedTime: nil,
},
Scheduler: chasm.ComponentPointerTo(ctx, scheduler),
}

// Kick off initial generator run.
Expand Down
5 changes: 2 additions & 3 deletions chasm/lib/scheduler/invoker.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ type Invoker struct {

*schedulerpb.InvokerState

Scheduler chasm.Field[*Scheduler]
Scheduler chasm.ParentPtr[*Scheduler]
}

func (i *Invoker) LifecycleState(ctx chasm.Context) chasm.LifecycleState {
Expand All @@ -27,13 +27,12 @@ func (i *Invoker) LifecycleState(ctx chasm.Context) chasm.LifecycleState {

// NewInvoker returns an intialized Invoker component, which should
// be parented under a Scheduler root component.
func NewInvoker(ctx chasm.MutableContext, scheduler *Scheduler) *Invoker {
func NewInvoker(ctx chasm.MutableContext) *Invoker {
return &Invoker{
InvokerState: &schedulerpb.InvokerState{
BufferedStarts: []*schedulespb.BufferedStart{},
RequestIdToWorkflowId: make(map[string]string),
},
Scheduler: chasm.ComponentPointerTo(ctx, scheduler),
}
}

Expand Down
4 changes: 2 additions & 2 deletions chasm/lib/scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,10 +97,10 @@ func NewScheduler(
sched.Info.CreateTime = timestamppb.New(ctx.Now(sched))
sched.Schedule.State = &schedulepb.ScheduleState{}

invoker := NewInvoker(ctx, sched)
invoker := NewInvoker(ctx)
sched.Invoker = chasm.NewComponentField(ctx, invoker)

generator := NewGenerator(ctx, sched, invoker)
generator := NewGenerator(ctx)
sched.Generator = chasm.NewComponentField(ctx, generator)

// Create backfillers to fulfill initialPatch.
Expand Down
Loading
Loading