Skip to content

Commit e63973b

Browse files
committed
Separate Chasm ListExecutions interface, combine user and chasm memo, add support for legacy converters
1 parent 6998005 commit e63973b

File tree

72 files changed

+1265
-1127
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

72 files changed

+1265
-1127
lines changed

chasm/engine.go

Lines changed: 123 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,11 @@ package chasm
44

55
import (
66
"context"
7+
"reflect"
8+
"time"
9+
10+
commonpb "go.temporal.io/api/common/v1"
11+
"google.golang.org/protobuf/proto"
712
)
813

914
// NoValue is a sentinel type representing no value.
@@ -46,6 +51,83 @@ type Engine interface {
4651
func(MutableContext, Component, any) error,
4752
...TransitionOption,
4853
) ([]byte, error)
54+
55+
ListExecutions(
56+
context.Context,
57+
reflect.Type,
58+
*ListChasmExecutionsRequest,
59+
...ListChasmExecutionsOption,
60+
) (*ListChasmExecutionsResponse, error)
61+
62+
CountExecutions(
63+
context.Context,
64+
reflect.Type,
65+
*CountChasmExecutionsRequest,
66+
) (*CountChasmExecutionsResponse, error)
67+
}
68+
69+
type ChasmExecutionInfo struct {
70+
BusinessID string
71+
RunID string
72+
StartTime time.Time
73+
CloseTime time.Time
74+
HistoryLength int64
75+
HistorySizeBytes int64
76+
StateTransitionCount int64
77+
ChasmSearchAttributes SearchAttributesMap
78+
CustomSearchAttributes map[string]*commonpb.Payload
79+
Memo *commonpb.Memo
80+
ChasmMemo *commonpb.Payload
81+
}
82+
83+
type ListChasmExecutionsRequest struct {
84+
NamespaceID string
85+
NamespaceName string
86+
Query string
87+
}
88+
89+
type ListChasmExecutionsOptions struct {
90+
PageSize int
91+
NextPageToken []byte
92+
}
93+
94+
type ListChasmExecutionsOption func(*ListChasmExecutionsOptions)
95+
96+
func WithPagination(
97+
pageSize int,
98+
nextPageToken []byte,
99+
) ListChasmExecutionsOption {
100+
return func(req *ListChasmExecutionsOptions) {
101+
req.PageSize = pageSize
102+
req.NextPageToken = nextPageToken
103+
}
104+
}
105+
106+
type ListChasmExecutionsResponse struct {
107+
Executions []*ChasmExecutionInfo
108+
NextPageToken []byte
109+
}
110+
111+
type CountChasmExecutionsRequest struct {
112+
NamespaceID string
113+
NamespaceName string
114+
Query string
115+
}
116+
117+
type CountChasmExecutionsResponse struct {
118+
Count int64
119+
}
120+
121+
// TypedChasmRunInfo provides type-safe access to ChasmMemo
122+
type TypedChasmExecutionInfo[M proto.Message] struct {
123+
*ChasmExecutionInfo
124+
TypedChasmMemo M
125+
}
126+
127+
// TypedListChasmRunsResponse provides type-safe response with unmarshaled memos
128+
type TypedListChasmExecutionsResponse[M proto.Message] struct {
129+
Executions []*TypedChasmExecutionInfo[M]
130+
NextPageToken []byte
49131
}
50132

51133
type BusinessIDReusePolicy int
@@ -302,3 +384,44 @@ func engineFromContext(
302384
}
303385
return e
304386
}
387+
388+
func ListExecutions[C Component, M proto.Message](
389+
ctx context.Context,
390+
request *ListChasmExecutionsRequest,
391+
opts ...ListChasmExecutionsOption,
392+
) (*TypedListChasmExecutionsResponse[M], error) {
393+
archetypeType := reflect.TypeFor[C]()
394+
response, err := engineFromContext(ctx).ListExecutions(ctx, archetypeType, request, opts...)
395+
if err != nil {
396+
return nil, err
397+
}
398+
399+
// Convert response, unmarshaling ChasmMemo to type M
400+
typedExecutions := make([]*TypedChasmExecutionInfo[M], len(response.Executions))
401+
for i, execution := range response.Executions {
402+
var typedMemo M
403+
if len(execution.ChasmMemo.Data) > 0 {
404+
msg := reflect.New(reflect.TypeFor[M]()).Interface().(M)
405+
if err := proto.Unmarshal(execution.ChasmMemo.Data, msg); err == nil {
406+
typedMemo = msg
407+
}
408+
}
409+
typedExecutions[i] = &TypedChasmExecutionInfo[M]{
410+
ChasmExecutionInfo: execution,
411+
TypedChasmMemo: typedMemo,
412+
}
413+
}
414+
415+
return &TypedListChasmExecutionsResponse[M]{
416+
Executions: typedExecutions,
417+
NextPageToken: response.NextPageToken,
418+
}, nil
419+
}
420+
421+
func CountExecutions[C Component](
422+
ctx context.Context,
423+
request *CountChasmExecutionsRequest,
424+
) (*CountChasmExecutionsResponse, error) {
425+
archetypeType := reflect.TypeFor[C]()
426+
return engineFromContext(ctx).CountExecutions(ctx, archetypeType, request)
427+
}

chasm/engine_mock.go

Lines changed: 29 additions & 8 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

chasm/export_test.go

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -14,10 +14,6 @@ func (r *Registry) ComponentFor(componentInstance any) (*RegistrableComponent, b
1414
return r.componentFor(componentInstance)
1515
}
1616

17-
func (r *Registry) ComponentByID(id uint32) (*RegistrableComponent, bool) {
18-
return r.componentByID(id)
19-
}
20-
2117
func (r *Registry) TaskFor(taskInstance any) (*RegistrableTask, bool) {
2218
return r.taskFor(taskInstance)
2319
}

chasm/lib/tests/payload.go

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66
"go.temporal.io/server/chasm"
77
"go.temporal.io/server/chasm/lib/tests/gen/testspb/v1"
88
"go.temporal.io/server/common"
9+
"google.golang.org/protobuf/proto"
910
"google.golang.org/protobuf/types/known/timestamppb"
1011
)
1112

@@ -157,11 +158,6 @@ func (s *PayloadStore) SearchAttributes(
157158
}
158159

159160
// Memo implements chasm.VisibilityMemoProvider interface
160-
func (s *PayloadStore) Memo(
161-
_ chasm.Context,
162-
) map[string]chasm.VisibilityValue {
163-
return map[string]chasm.VisibilityValue{
164-
TotalCountMemoFieldName: chasm.VisibilityValueInt64(s.State.TotalCount),
165-
TotalSizeMemoFieldName: chasm.VisibilityValueInt64(s.State.TotalSize),
166-
}
161+
func (s *PayloadStore) Memo(_ chasm.Context) proto.Message {
162+
return s.State
167163
}

chasm/ref.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,14 @@ func (r *ComponentRef) ArchetypeID(
8585
return r.archetypeID, nil
8686
}
8787

88+
func GetArchetypeIDFromType(registry *Registry, archetypeType reflect.Type) (ArchetypeID, error) {
89+
rc, ok := registry.ComponentOf(archetypeType)
90+
if !ok {
91+
return 0, serviceerror.NewInternal("unknown chasm component type: " + archetypeType.String())
92+
}
93+
return rc.componentID, nil
94+
}
95+
8896
// ShardingKey returns the sharding key used for determining the shardID of the run
8997
// that contains the referenced component.
9098
// TODO: remove this method and ShardingKey concept, we don't need this functionality.

chasm/registry.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,15 @@ func (r *Registry) ComponentFqnByID(id uint32) (string, bool) {
8181
return fqn, ok
8282
}
8383

84+
// ComponentByID returns the registrable component for a given archetype ID.
85+
func (r *Registry) ComponentByID(id uint32) (*RegistrableComponent, bool) {
86+
fqn, ok := r.componentFqnByID[id]
87+
if !ok {
88+
return nil, false
89+
}
90+
return r.component(fqn)
91+
}
92+
8493
// ComponentIDFor converts registered component instance to component type ID.
8594
// This method should only be used by CHASM framework internal code,
8695
// NOT CHASM library developers.

chasm/test_component_test.go

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010
commonpb "go.temporal.io/api/common/v1"
1111
enumspb "go.temporal.io/api/enums/v1"
1212
persistencespb "go.temporal.io/server/api/persistence/v1"
13+
"google.golang.org/protobuf/proto"
1314
)
1415

1516
type (
@@ -113,10 +114,8 @@ func (tc *TestComponent) SearchAttributes(_ Context) []SearchAttributeKeyValue {
113114
}
114115

115116
// Memo implements VisibilityMemoProvider interface.
116-
func (tc *TestComponent) Memo(_ Context) map[string]VisibilityValue {
117-
return map[string]VisibilityValue{
118-
TestComponentStartTimeMemoKey: VisibilityValueTime(tc.ComponentData.GetStartTime().AsTime()),
119-
}
117+
func (tc *TestComponent) Memo(_ Context) proto.Message {
118+
return tc.ComponentData
120119
}
121120

122121
func (tsc1 *TestSubComponent1) LifecycleState(_ Context) LifecycleState {

chasm/tree.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -144,7 +144,7 @@ type (
144144
// Root component's search attributes and memo at the start of a transaction.
145145
// They will be updated upon CloseTransaction() if they are changed.
146146
currentSA map[string]VisibilityValue
147-
currentMemo map[string]VisibilityValue
147+
currentMemo proto.Message
148148

149149
needsPointerResolution bool
150150
}
@@ -1473,7 +1473,7 @@ func (n *Node) closeTransactionForceUpdateVisibility(
14731473
memoProvider, ok := rootComponent.(VisibilityMemoProvider)
14741474
if ok {
14751475
newMemo := memoProvider.Memo(immutableContext)
1476-
if !maps.EqualFunc(n.currentMemo, newMemo, isVisibilityValueEqual) {
1476+
if !proto.Equal(n.currentMemo, newMemo) {
14771477
needUpdate = true
14781478
}
14791479
n.currentMemo = newMemo

chasm/visibility.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010
"go.temporal.io/api/serviceerror"
1111
persistencespb "go.temporal.io/server/api/persistence/v1"
1212
"go.temporal.io/server/common/payload"
13+
"google.golang.org/protobuf/proto"
1314
)
1415

1516
const (
@@ -35,7 +36,7 @@ type VisibilitySearchAttributesProvider interface {
3536
// a transaction, if a visibility task needs to be generated to update the
3637
// visibility record with the returned memo.
3738
type VisibilityMemoProvider interface {
38-
Memo(Context) map[string]VisibilityValue
39+
Memo(Context) proto.Message
3940
}
4041

4142
// VisibilitySearchAttributesMapper is a mapper for CHASM search attributes.

common/persistence/tests/visibility_persistence_suite.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import (
1111
enumspb "go.temporal.io/api/enums/v1"
1212
workflowpb "go.temporal.io/api/workflow/v1"
1313
"go.temporal.io/api/workflowservice/v1"
14+
"go.temporal.io/server/chasm"
1415
"go.temporal.io/server/common/debug"
1516
"go.temporal.io/server/common/dynamicconfig"
1617
"go.temporal.io/server/common/log/tag"
@@ -23,6 +24,7 @@ import (
2324
"go.temporal.io/server/common/persistence/visibility/manager"
2425
"go.temporal.io/server/common/primitives/timestamp"
2526
"go.temporal.io/server/common/resolver"
27+
"go.temporal.io/server/common/searchattribute"
2628
"go.temporal.io/server/common/searchattribute/defs"
2729
"go.uber.org/mock/gomock"
2830
)
@@ -69,6 +71,7 @@ func (s *VisibilityPersistenceSuite) SetupSuite() {
6971
s.SearchAttributesProvider,
7072
s.SearchAttributesMapperProvider,
7173
s.NamespaceRegistry,
74+
chasm.NewRegistry(nil),
7275
dynamicconfig.GetIntPropertyFn(1000),
7376
dynamicconfig.GetIntPropertyFn(1000),
7477
dynamicconfig.GetFloatPropertyFn(0.2),
@@ -820,7 +823,7 @@ func (s *VisibilityPersistenceSuite) TestUpsertWorkflowExecution() {
820823
Memo: nil,
821824
SearchAttributes: &commonpb.SearchAttributes{
822825
IndexedFields: map[string]*commonpb.Payload{
823-
searchattribute.TemporalChangeVersion: temporalChangeVersionPayload,
826+
defs.TemporalChangeVersion: temporalChangeVersionPayload,
824827
},
825828
},
826829
Status: enumspb.WORKFLOW_EXECUTION_STATUS_RUNNING,

0 commit comments

Comments
 (0)