Skip to content

Commit fdbae50

Browse files
authored
CHASM: Use ParentPtr in scheduler and callback (#8705)
## What changed? - Use ParentPtr in scheduler and callback ## Why? - Avoid the overhead of a persisted pointer ## How did you test it? - [x] built - [ ] run locally and tested manually - [x] covered by existing tests - [ ] added new unit test(s) - [ ] added new functional test(s)
1 parent 78caede commit fdbae50

File tree

9 files changed

+129
-105
lines changed

9 files changed

+129
-105
lines changed

chasm/lib/callback/component.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ type Callback struct {
2929
*callbackspb.CallbackState
3030

3131
// Interface to retrieve Nexus operation completion data
32-
CompletionSource chasm.Field[CompletionSource]
32+
CompletionSource chasm.ParentPtr[CompletionSource]
3333
}
3434

3535
func NewCallback(

chasm/lib/callback/executors_test.go

Lines changed: 116 additions & 86 deletions
Original file line numberDiff line numberDiff line change
@@ -5,10 +5,8 @@ import (
55
"encoding/base64"
66
"errors"
77
"net/http"
8-
"reflect"
98
"testing"
109
"time"
11-
"unsafe"
1210

1311
"github.com/nexus-rpc/sdk-go/nexus"
1412
"github.com/stretchr/testify/require"
@@ -34,38 +32,41 @@ import (
3432
"google.golang.org/grpc/codes"
3533
"google.golang.org/grpc/status"
3634
"google.golang.org/protobuf/proto"
35+
"google.golang.org/protobuf/types/known/emptypb"
3736
"google.golang.org/protobuf/types/known/timestamppb"
3837
)
3938

40-
// mockNexusCompletionGetter implements CanGetNexusCompletion for testing
41-
type mockNexusCompletionGetter struct {
39+
type mockNexusCompletionGetterComponent struct {
40+
chasm.UnimplementedComponent
41+
42+
Empty *emptypb.Empty
43+
4244
completion nexusrpc.OperationCompletion
4345
err error
46+
47+
Callback chasm.Field[*Callback]
4448
}
4549

46-
func (m *mockNexusCompletionGetter) GetNexusCompletion(_ chasm.Context, requestID string) (nexusrpc.OperationCompletion, error) {
50+
func (m *mockNexusCompletionGetterComponent) GetNexusCompletion(_ chasm.Context, requestID string) (nexusrpc.OperationCompletion, error) {
4751
return m.completion, m.err
4852
}
4953

50-
// setFieldValue is a test helper that uses reflection to set the internal value of a chasm.Field.
51-
// This is necessary for testing because:
52-
// 1. The Field API (NewComponentField, NewDataField) only supports chasm.Component and proto.Message types
53-
// 2. CanGetNexusCompletion is a plain interface, not a Component
54-
// 3. The fieldInternal struct and its fields are unexported
55-
//
56-
// In production, Fields are typically initialized through proper CHASM lifecycle methods or
57-
// by using NewComponentField/NewDataField with appropriate types.
58-
// TODO (seankane): Move this helper to the chasm/chasmtest package
59-
func setFieldValue[T any](field *chasm.Field[T], value T) {
60-
// Get the Internal field (which is exported)
61-
internalField := reflect.ValueOf(field).Elem().FieldByName("Internal")
62-
63-
// Get the unexported 'v' field using unsafe pointer manipulation
64-
vField := internalField.FieldByName("v")
65-
vField = reflect.NewAt(vField.Type(), unsafe.Pointer(vField.UnsafeAddr())).Elem()
66-
67-
// Set the value
68-
vField.Set(reflect.ValueOf(value))
54+
func (m *mockNexusCompletionGetterComponent) LifecycleState(_ chasm.Context) chasm.LifecycleState {
55+
return chasm.LifecycleStateRunning
56+
}
57+
58+
type mockNexusCompletionGetterLibrary struct {
59+
chasm.UnimplementedLibrary
60+
}
61+
62+
func (l *mockNexusCompletionGetterLibrary) Name() string {
63+
return "mock"
64+
}
65+
66+
func (l *mockNexusCompletionGetterLibrary) Components() []*chasm.RegistrableComponent {
67+
return []*chasm.RegistrableComponent{
68+
chasm.NewRegistrableComponent[*mockNexusCompletionGetterComponent]("nexusCompletionGetter"),
69+
}
6970
}
7071

7172
// Test the full executeInvocationTask flow with direct executor calls
@@ -153,16 +154,44 @@ func TestExecuteInvocationTaskNexus_Outcomes(t *testing.T) {
153154
metrics.DestinationTag("http://localhost"),
154155
metrics.OutcomeTag(tc.expectedMetricOutcome))
155156

156-
// Create completion
157-
completion, err := nexusrpc.NewOperationCompletionSuccessful(nil, nexusrpc.OperationCompletionSuccessfulOptions{})
158-
require.NoError(t, err)
159-
160157
// Setup logger and time source
161-
logger := log.NewNoopLogger()
158+
logger := log.NewTestLogger()
162159
timeSource := clock.NewEventTimeSource()
163160
timeSource.Update(time.Now())
164161

165-
// Create callback in SCHEDULED state
162+
// Create task executor with mock namespace registry
163+
nsRegistry := namespace.NewMockRegistry(ctrl)
164+
nsRegistry.EXPECT().GetNamespaceByID(gomock.Any()).Return(ns, nil)
165+
166+
// Create mock engine
167+
mockEngine := chasm.NewMockEngine(ctrl)
168+
executor := &InvocationTaskExecutor{
169+
config: &Config{
170+
RequestTimeout: dynamicconfig.GetDurationPropertyFnFilteredByDestination(time.Second),
171+
RetryPolicy: func() backoff.RetryPolicy {
172+
return backoff.NewExponentialRetryPolicy(time.Second)
173+
},
174+
},
175+
namespaceRegistry: nsRegistry,
176+
metricsHandler: metricsHandler,
177+
logger: logger,
178+
httpCallerProvider: func(nid common.NamespaceIDAndDestination) HTTPCaller {
179+
return tc.caller
180+
},
181+
chasmEngine: mockEngine,
182+
}
183+
184+
chasmRegistry := chasm.NewRegistry(logger)
185+
err := chasmRegistry.Register(&Library{
186+
InvocationTaskExecutor: executor,
187+
})
188+
require.NoError(t, err)
189+
err = chasmRegistry.Register(&mockNexusCompletionGetterLibrary{})
190+
require.NoError(t, err)
191+
192+
nodeBackend := &chasm.MockNodeBackend{}
193+
root := chasm.NewEmptyTree(chasmRegistry, timeSource, nodeBackend, chasm.DefaultPathEncoder, logger)
194+
166195
callback := &Callback{
167196
CallbackState: &callbackspb.CallbackState{
168197
RequestId: "request-id",
@@ -179,21 +208,21 @@ func TestExecuteInvocationTaskNexus_Outcomes(t *testing.T) {
179208
},
180209
}
181210

182-
// Set up the CompletionSource field to return our mock
183-
completionGetter := &mockNexusCompletionGetter{
184-
completion: completion,
185-
}
186-
// Set the CanGetNexusCompletion field using reflection.
187-
// This is necessary because CanGetNexusCompletion is a plain interface,
188-
// not a chasm.Component, so we can't use NewComponentField.
189-
setFieldValue(&callback.CompletionSource, CompletionSource(completionGetter))
190-
191-
// Create task executor with mock namespace registry
192-
nsRegistry := namespace.NewMockRegistry(ctrl)
193-
nsRegistry.EXPECT().GetNamespaceByID(gomock.Any()).Return(ns, nil)
211+
// Create completion
212+
completion, err := nexusrpc.NewOperationCompletionSuccessful(nil, nexusrpc.OperationCompletionSuccessfulOptions{})
213+
require.NoError(t, err)
194214

195-
// Create mock engine
196-
mockEngine := chasm.NewMockEngine(ctrl)
215+
// Set up the CompletionSource field to return our mock completion
216+
root.SetRootComponent(&mockNexusCompletionGetterComponent{
217+
completion: completion,
218+
// Create callback in SCHEDULED state
219+
Callback: chasm.NewComponentField(
220+
chasm.NewMutableContext(context.Background(), root),
221+
callback,
222+
),
223+
})
224+
_, err = root.CloseTransaction()
225+
require.NoError(t, err)
197226

198227
// Setup engine expectations to directly call executor logic with MockMutableContext
199228
mockEngine.EXPECT().ReadComponent(
@@ -231,22 +260,6 @@ func TestExecuteInvocationTaskNexus_Outcomes(t *testing.T) {
231260
return nil, err
232261
})
233262

234-
executor := &InvocationTaskExecutor{
235-
config: &Config{
236-
RequestTimeout: dynamicconfig.GetDurationPropertyFnFilteredByDestination(time.Second),
237-
RetryPolicy: func() backoff.RetryPolicy {
238-
return backoff.NewExponentialRetryPolicy(time.Second)
239-
},
240-
},
241-
namespaceRegistry: nsRegistry,
242-
metricsHandler: metricsHandler,
243-
logger: logger,
244-
httpCallerProvider: func(nid common.NamespaceIDAndDestination) HTTPCaller {
245-
return tc.caller
246-
},
247-
chasmEngine: mockEngine,
248-
}
249-
250263
// Create ComponentRef
251264
ref := chasm.NewComponentRef[*Callback](chasm.ExecutionKey{
252265
NamespaceID: "namespace-id",
@@ -275,7 +288,7 @@ func TestProcessBackoffTask(t *testing.T) {
275288
ctrl := gomock.NewController(t)
276289
defer ctrl.Finish()
277290

278-
logger := log.NewNoopLogger()
291+
logger := log.NewTestLogger()
279292
timeSource := clock.NewEventTimeSource()
280293
timeSource.Update(time.Now())
281294

@@ -545,10 +558,41 @@ func TestExecuteInvocationTaskChasm_Outcomes(t *testing.T) {
545558
historyClient := tc.setupHistoryClient(t, ctrl)
546559

547560
// Setup logger and time source
548-
logger := log.NewNoopLogger()
561+
logger := log.NewTestLogger()
549562
timeSource := clock.NewEventTimeSource()
550563
timeSource.Update(time.Now())
551564

565+
// Create mock namespace registry
566+
nsRegistry := namespace.NewMockRegistry(ctrl)
567+
nsRegistry.EXPECT().GetNamespaceByID(gomock.Any()).Return(ns, nil)
568+
569+
// Create mock engine and setup expectations
570+
mockEngine := chasm.NewMockEngine(ctrl)
571+
executor := &InvocationTaskExecutor{
572+
config: &Config{
573+
RequestTimeout: dynamicconfig.GetDurationPropertyFnFilteredByDestination(time.Second),
574+
RetryPolicy: func() backoff.RetryPolicy {
575+
return backoff.NewExponentialRetryPolicy(time.Second)
576+
},
577+
},
578+
namespaceRegistry: nsRegistry,
579+
metricsHandler: metrics.NoopMetricsHandler,
580+
logger: logger,
581+
historyClient: historyClient,
582+
chasmEngine: mockEngine,
583+
}
584+
585+
chasmRegistry := chasm.NewRegistry(logger)
586+
err := chasmRegistry.Register(&Library{
587+
InvocationTaskExecutor: executor,
588+
})
589+
require.NoError(t, err)
590+
err = chasmRegistry.Register(&mockNexusCompletionGetterLibrary{})
591+
require.NoError(t, err)
592+
593+
nodeBackend := &chasm.MockNodeBackend{}
594+
root := chasm.NewEmptyTree(chasmRegistry, timeSource, nodeBackend, chasm.DefaultPathEncoder, logger)
595+
552596
// Create headers
553597
headers := nexus.Header{}
554598
if tc.headerValue != "" {
@@ -573,18 +617,18 @@ func TestExecuteInvocationTaskChasm_Outcomes(t *testing.T) {
573617
},
574618
}
575619

576-
// Set up the CompletionSource field
577-
completionGetter := &mockNexusCompletionGetter{
620+
// Set up the CompletionSource field to return our mock completion
621+
root.SetRootComponent(&mockNexusCompletionGetterComponent{
578622
completion: tc.completion,
579-
}
580-
setFieldValue(&callback.CompletionSource, CompletionSource(completionGetter))
581-
582-
// Create mock namespace registry
583-
nsRegistry := namespace.NewMockRegistry(ctrl)
584-
nsRegistry.EXPECT().GetNamespaceByID(gomock.Any()).Return(ns, nil)
623+
// Create callback in SCHEDULED state
624+
Callback: chasm.NewComponentField(
625+
chasm.NewMutableContext(context.Background(), root),
626+
callback,
627+
),
628+
})
629+
_, err = root.CloseTransaction()
630+
require.NoError(t, err)
585631

586-
// Create mock engine and setup expectations
587-
mockEngine := chasm.NewMockEngine(ctrl)
588632
mockEngine.EXPECT().ReadComponent(
589633
gomock.Any(),
590634
gomock.Any(),
@@ -633,20 +677,6 @@ func TestExecuteInvocationTaskChasm_Outcomes(t *testing.T) {
633677
return nil, err
634678
})
635679

636-
executor := InvocationTaskExecutor{
637-
config: &Config{
638-
RequestTimeout: dynamicconfig.GetDurationPropertyFnFilteredByDestination(time.Second),
639-
RetryPolicy: func() backoff.RetryPolicy {
640-
return backoff.NewExponentialRetryPolicy(time.Second)
641-
},
642-
},
643-
namespaceRegistry: nsRegistry,
644-
metricsHandler: metrics.NoopMetricsHandler,
645-
logger: logger,
646-
historyClient: historyClient,
647-
chasmEngine: mockEngine,
648-
}
649-
650680
// Create ComponentRef
651681
ref := chasm.NewComponentRef[*Callback](chasm.ExecutionKey{
652682
NamespaceID: "namespace-id",

chasm/lib/scheduler/backfiller.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ type Backfiller struct {
1717

1818
*schedulerpb.BackfillerState
1919

20-
Scheduler chasm.Field[*Scheduler]
20+
Scheduler chasm.ParentPtr[*Scheduler]
2121
}
2222

2323
type BackfillRequestType int
@@ -39,7 +39,6 @@ func newBackfiller(
3939
BackfillId: id,
4040
LastProcessedTime: timestamppb.New(ctx.Now(scheduler)),
4141
},
42-
Scheduler: chasm.ComponentPointerTo(ctx, scheduler),
4342
}
4443

4544
// Immediately schedule the first backfiller task.

chasm/lib/scheduler/backfiller_tasks_test.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -215,8 +215,10 @@ func (s *backfillerTasksSuite) TestBackfillTask_PartialFill() {
215215
}
216216

217217
func (s *backfillerTasksSuite) runTestCase(c *backfillTestCase) {
218-
sched := s.scheduler
219218
ctx := s.newMutableContext()
219+
schedComponent, err := s.node.Component(ctx, chasm.ComponentRef{})
220+
s.NoError(err)
221+
sched := schedComponent.(*scheduler.Scheduler)
220222
invoker := sched.Invoker.Get(ctx)
221223

222224
// Exactly one type of request can be set per Backfiller.
@@ -232,7 +234,7 @@ func (s *backfillerTasksSuite) runTestCase(c *backfillTestCase) {
232234
}
233235

234236
// Either type of request will spawn a Backfiller and schedule an immediate pure task.
235-
_, err := s.node.CloseTransaction()
237+
_, err = s.node.CloseTransaction()
236238
s.NoError(err)
237239

238240
// Run a backfill task.

chasm/lib/scheduler/generator.go

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,17 +13,16 @@ type Generator struct {
1313

1414
*schedulerpb.GeneratorState
1515

16-
Scheduler chasm.Field[*Scheduler]
16+
Scheduler chasm.ParentPtr[*Scheduler]
1717
}
1818

1919
// NewGenerator returns an intialized Generator component, which should
2020
// be parented under a Scheduler root node.
21-
func NewGenerator(ctx chasm.MutableContext, scheduler *Scheduler, invoker *Invoker) *Generator {
21+
func NewGenerator(ctx chasm.MutableContext) *Generator {
2222
generator := &Generator{
2323
GeneratorState: &schedulerpb.GeneratorState{
2424
LastProcessedTime: nil,
2525
},
26-
Scheduler: chasm.ComponentPointerTo(ctx, scheduler),
2726
}
2827

2928
// Kick off initial generator run.

chasm/lib/scheduler/invoker.go

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ type Invoker struct {
1818

1919
*schedulerpb.InvokerState
2020

21-
Scheduler chasm.Field[*Scheduler]
21+
Scheduler chasm.ParentPtr[*Scheduler]
2222
}
2323

2424
func (i *Invoker) LifecycleState(ctx chasm.Context) chasm.LifecycleState {
@@ -27,13 +27,12 @@ func (i *Invoker) LifecycleState(ctx chasm.Context) chasm.LifecycleState {
2727

2828
// NewInvoker returns an intialized Invoker component, which should
2929
// be parented under a Scheduler root component.
30-
func NewInvoker(ctx chasm.MutableContext, scheduler *Scheduler) *Invoker {
30+
func NewInvoker(ctx chasm.MutableContext) *Invoker {
3131
return &Invoker{
3232
InvokerState: &schedulerpb.InvokerState{
3333
BufferedStarts: []*schedulespb.BufferedStart{},
3434
RequestIdToWorkflowId: make(map[string]string),
3535
},
36-
Scheduler: chasm.ComponentPointerTo(ctx, scheduler),
3736
}
3837
}
3938

chasm/lib/scheduler/scheduler.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -97,10 +97,10 @@ func NewScheduler(
9797
sched.Info.CreateTime = timestamppb.New(ctx.Now(sched))
9898
sched.Schedule.State = &schedulepb.ScheduleState{}
9999

100-
invoker := NewInvoker(ctx, sched)
100+
invoker := NewInvoker(ctx)
101101
sched.Invoker = chasm.NewComponentField(ctx, invoker)
102102

103-
generator := NewGenerator(ctx, sched, invoker)
103+
generator := NewGenerator(ctx)
104104
sched.Generator = chasm.NewComponentField(ctx, generator)
105105

106106
// Create backfillers to fulfill initialPatch.

0 commit comments

Comments
 (0)