Skip to content

Commit 23c1374

Browse files
committed
Implement CHASM List/Count Executions
1 parent a288d1e commit 23c1374

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

59 files changed

+2897
-250
lines changed

chasm/engine.go

Lines changed: 103 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,11 @@ package chasm
44

55
import (
66
"context"
7+
"reflect"
8+
"time"
9+
10+
commonpb "go.temporal.io/api/common/v1"
11+
"google.golang.org/protobuf/proto"
712
)
813

914
// NoValue is a sentinel type representing no value.
@@ -46,6 +51,59 @@ type Engine interface {
4651
func(MutableContext, Component, any) error,
4752
...TransitionOption,
4853
) ([]byte, error)
54+
55+
ListExecutions(
56+
context.Context,
57+
reflect.Type,
58+
*ListExecutionsRequest,
59+
) (*ListExecutionsResponse[*commonpb.Payload], error)
60+
61+
CountExecutions(
62+
context.Context,
63+
reflect.Type,
64+
*CountExecutionsRequest,
65+
) (*CountExecutionsResponse, error)
66+
}
67+
68+
type ListExecutionsResponse[M proto.Message] struct {
69+
Executions []*ExecutionInfo[M]
70+
NextPageToken []byte
71+
}
72+
type ExecutionInfo[M proto.Message] struct {
73+
BusinessID string
74+
RunID string
75+
StartTime time.Time
76+
CloseTime time.Time
77+
HistoryLength int64
78+
HistorySizeBytes int64
79+
StateTransitionCount int64
80+
ChasmSearchAttributes SearchAttributesMap
81+
CustomSearchAttributes map[string]*commonpb.Payload
82+
Memo *commonpb.Memo
83+
ChasmMemo M
84+
}
85+
86+
type ListExecutionsRequest struct {
87+
NamespaceID string
88+
NamespaceName string
89+
Query string
90+
PageSize int
91+
NextPageToken []byte
92+
}
93+
94+
type ListExecutionsOptions struct {
95+
PageSize int
96+
NextPageToken []byte
97+
}
98+
99+
type CountExecutionsRequest struct {
100+
NamespaceID string
101+
NamespaceName string
102+
Query string
103+
}
104+
105+
type CountExecutionsResponse struct {
106+
Count int64
49107
}
50108

51109
type BusinessIDReusePolicy int
@@ -304,3 +362,48 @@ func engineFromContext(
304362
}
305363
return e
306364
}
365+
366+
func ListExecutions[C Component, M proto.Message](
367+
ctx context.Context,
368+
request *ListExecutionsRequest,
369+
) (*ListExecutionsResponse[M], error) {
370+
archetypeType := reflect.TypeFor[C]()
371+
response, err := engineFromContext(ctx).ListExecutions(ctx, archetypeType, request)
372+
if err != nil {
373+
return nil, err
374+
}
375+
376+
// Convert response, unmarshaling ChasmMemo to type M
377+
executions := make([]*ExecutionInfo[M], len(response.Executions))
378+
for i, execution := range response.Executions {
379+
chasmMemo := reflect.New(reflect.TypeFor[M]().Elem()).Interface().(M)
380+
if err := proto.Unmarshal(execution.ChasmMemo.Data, chasmMemo); err == nil {
381+
executions[i] = &ExecutionInfo[M]{
382+
BusinessID: execution.BusinessID,
383+
RunID: execution.RunID,
384+
StartTime: execution.StartTime,
385+
CloseTime: execution.CloseTime,
386+
HistoryLength: execution.HistoryLength,
387+
HistorySizeBytes: execution.HistorySizeBytes,
388+
StateTransitionCount: execution.StateTransitionCount,
389+
ChasmSearchAttributes: execution.ChasmSearchAttributes,
390+
CustomSearchAttributes: execution.CustomSearchAttributes,
391+
Memo: execution.Memo,
392+
ChasmMemo: chasmMemo,
393+
}
394+
}
395+
}
396+
397+
return &ListExecutionsResponse[M]{
398+
Executions: executions,
399+
NextPageToken: response.NextPageToken,
400+
}, nil
401+
}
402+
403+
func CountExecutions[C Component](
404+
ctx context.Context,
405+
request *CountExecutionsRequest,
406+
) (*CountExecutionsResponse, error) {
407+
archetypeType := reflect.TypeFor[C]()
408+
return engineFromContext(ctx).CountExecutions(ctx, archetypeType, request)
409+
}

chasm/engine_mock.go

Lines changed: 31 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

chasm/export_test.go

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,10 +21,6 @@ func (r *Registry) ComponentOf(componentGoType reflect.Type) (*RegistrableCompon
2121
return r.componentOf(componentGoType)
2222
}
2323

24-
func (r *Registry) ComponentByID(id uint32) (*RegistrableComponent, bool) {
25-
return r.componentByID(id)
26-
}
27-
2824
func (r *Registry) TaskFor(taskInstance any) (*RegistrableTask, bool) {
2925
return r.taskFor(taskInstance)
3026
}

chasm/lib/tests/payload.go

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66
"go.temporal.io/server/chasm"
77
"go.temporal.io/server/chasm/lib/tests/gen/testspb/v1"
88
"go.temporal.io/server/common"
9+
"google.golang.org/protobuf/proto"
910
"google.golang.org/protobuf/types/known/timestamppb"
1011
)
1112

@@ -154,11 +155,6 @@ func (s *PayloadStore) SearchAttributes(
154155
}
155156

156157
// Memo implements chasm.VisibilityMemoProvider interface
157-
func (s *PayloadStore) Memo(
158-
_ chasm.Context,
159-
) map[string]chasm.VisibilityValue {
160-
return map[string]chasm.VisibilityValue{
161-
TotalCountMemoFieldName: chasm.VisibilityValueInt64(s.State.TotalCount),
162-
TotalSizeMemoFieldName: chasm.VisibilityValueInt64(s.State.TotalSize),
163-
}
158+
func (s *PayloadStore) Memo(_ chasm.Context) proto.Message {
159+
return s.State
164160
}

chasm/ref_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ func (s *componentRefSuite) TestArchetypeID() {
5151
archetypeID, err := ref.ArchetypeID(s.registry)
5252
s.NoError(err)
5353

54-
rc, ok := s.registry.ComponentOf(reflect.TypeFor[*TestComponent]())
54+
rc, ok := s.registry.componentOf(reflect.TypeFor[*TestComponent]())
5555
s.True(ok)
5656

5757
s.Equal(rc.componentID, archetypeID)
@@ -68,7 +68,7 @@ func (s *componentRefSuite) TestShardingKey() {
6868
shardingKey, err := ref.ShardingKey(s.registry)
6969
s.NoError(err)
7070

71-
rc, ok := s.registry.ComponentOf(reflect.TypeFor[*TestComponent]())
71+
rc, ok := s.registry.componentOf(reflect.TypeFor[*TestComponent]())
7272
s.True(ok)
7373

7474
s.Equal(rc.shardingFn(entityKey), shardingKey)

chasm/registrable_component.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -115,6 +115,11 @@ func (rc *RegistrableComponent) registerToLibrary(
115115
return fqn, rc.componentID, nil
116116
}
117117

118+
// SearchAttributesMapper returns the search attributes mapper for this component.
119+
func (rc *RegistrableComponent) SearchAttributesMapper() *VisibilitySearchAttributesMapper {
120+
return rc.searchAttributesMapper
121+
}
122+
118123
// fqType returns the fully qualified name of the component, which is a combination of
119124
// the library name and the component type. This is used to uniquely identify
120125
// the component in the registry.

chasm/registry.go

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,15 @@ func (r *Registry) ComponentFqnByID(id uint32) (string, bool) {
8181
return fqn, ok
8282
}
8383

84+
// ComponentByID returns the registrable component for a given archetype ID.
85+
func (r *Registry) ComponentByID(id uint32) (*RegistrableComponent, bool) {
86+
fqn, ok := r.componentFqnByID[id]
87+
if !ok {
88+
return nil, false
89+
}
90+
return r.component(fqn)
91+
}
92+
8493
// ComponentIDFor converts registered component instance to component type ID.
8594
// This method should only be used by CHASM framework internal code,
8695
// NOT CHASM library developers.
@@ -136,6 +145,15 @@ func (r *Registry) componentOf(componentGoType reflect.Type) (*RegistrableCompon
136145
return rc, ok
137146
}
138147

148+
// ArchetypeIDOf returns the ArchetypeID for the given component Go type.
149+
func (r *Registry) ArchetypeIDOf(componentGoType reflect.Type) (ArchetypeID, bool) {
150+
rc, ok := r.componentByGoType[componentGoType]
151+
if !ok {
152+
return UnspecifiedArchetypeID, false
153+
}
154+
return rc.componentID, true
155+
}
156+
139157
func (r *Registry) taskOf(taskGoType reflect.Type) (*RegistrableTask, bool) {
140158
rt, ok := r.taskByGoType[taskGoType]
141159
return rt, ok

0 commit comments

Comments
 (0)