Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
124 changes: 124 additions & 0 deletions chasm/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,12 @@ package chasm

import (
"context"
"reflect"
"time"

commonpb "go.temporal.io/api/common/v1"
"go.temporal.io/api/serviceerror"
"google.golang.org/protobuf/proto"
)

// NoValue is a sentinel type representing no value.
Expand Down Expand Up @@ -46,6 +52,54 @@ type Engine interface {
func(MutableContext, Component, any) error,
...TransitionOption,
) ([]byte, error)

ListExecutions(
context.Context,
reflect.Type,
*ListExecutionsRequest,
) (*ListExecutionsResponse[*commonpb.Payload], error)

CountExecutions(
context.Context,
reflect.Type,
*CountExecutionsRequest,
) (*CountExecutionsResponse, error)
}

type ListExecutionsResponse[M proto.Message] struct {
Executions []*ExecutionInfo[M]
NextPageToken []byte
}
type ExecutionInfo[M proto.Message] struct {
BusinessID string
RunID string
StartTime time.Time
CloseTime time.Time
HistoryLength int64
HistorySizeBytes int64
StateTransitionCount int64
ChasmSearchAttributes SearchAttributesMap
CustomSearchAttributes map[string]*commonpb.Payload
Memo *commonpb.Memo
ChasmMemo M
}

type ListExecutionsRequest struct {
NamespaceID string
NamespaceName string
Comment on lines +88 to +89
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why we need to take in both?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

NamespaceID is what we store in persistence, and NamespaceName is needed for dynamic config and custom search attributes mapper.

Query string
PageSize int
NextPageToken []byte
}

type CountExecutionsRequest struct {
NamespaceID string
NamespaceName string
Query string
}

type CountExecutionsResponse struct {
Count int64
}

type BusinessIDReusePolicy int
Expand Down Expand Up @@ -304,3 +358,73 @@ func engineFromContext(
}
return e
}

// ListExecutions lists the executions of a CHASM archetype given an initial query.
// The query string can specify any combination of CHASM, custom, and predefined/system search attributes.
// The generic parameter C is the CHASM component type used for executions and search attribute filtering.
// The generic parameter M is the type of the memo payload to be unmarshaled from the execution.
// PageSize is required, must be greater than 0.
// NextPageToken is optional, set on subsequent requests to continue listing the next page of executions.
// Note: For CHASM executions, TemporalNamespaceDivision is the predefined search attribute
// that is used to identify the archetype of the execution.
// If the query string does not specify TemporalNamespaceDivision, the archetype C of the request will be used to filter the executions.
// If the initial query already specifies TemporalNamespaceDivision, the archetype C of the request will
// only be used to get the registered SearchAttributes.
func ListExecutions[C Component, M proto.Message](
ctx context.Context,
request *ListExecutionsRequest,
) (*ListExecutionsResponse[M], error) {
archetypeType := reflect.TypeFor[C]()
response, err := engineFromContext(ctx).ListExecutions(ctx, archetypeType, request)
if err != nil {
return nil, err
}

// Convert response, unmarshaling ChasmMemo to type M
executions := make([]*ExecutionInfo[M], len(response.Executions))
for i, execution := range response.Executions {
chasmMemoInterface := reflect.New(reflect.TypeFor[M]().Elem()).Interface()
chasmMemo, ok := chasmMemoInterface.(M)
if !ok {
return nil, serviceerror.NewInternalf("failed to cast chasm memo to type %s", reflect.TypeFor[M]().String())
}
err := proto.Unmarshal(execution.ChasmMemo.Data, chasmMemo)
if err != nil {
return nil, serviceerror.NewInternalf("failed to unmarshal chasm memo: %v", err)
}
executions[i] = &ExecutionInfo[M]{
BusinessID: execution.BusinessID,
RunID: execution.RunID,
StartTime: execution.StartTime,
CloseTime: execution.CloseTime,
HistoryLength: execution.HistoryLength,
HistorySizeBytes: execution.HistorySizeBytes,
StateTransitionCount: execution.StateTransitionCount,
ChasmSearchAttributes: execution.ChasmSearchAttributes,
CustomSearchAttributes: execution.CustomSearchAttributes,
Memo: execution.Memo,
ChasmMemo: chasmMemo,
}
}

return &ListExecutionsResponse[M]{
Executions: executions,
NextPageToken: response.NextPageToken,
}, nil
}

// CountExecutions counts the executions of a CHASM archetype given an initial query.
// The generic parameter C is the CHASM component type used for executions and search attribute filtering.
// The query string can specify any combination of CHASM, custom, and predefined/system search attributes.
// Note: For CHASM executions, TemporalNamespaceDivision is the predefined search attribute
// that is used to identify the archetype of the execution.
// If the query string does not specify TemporalNamespaceDivision, the archetype C of the request will be used to count the executions.
// If the initial query already specifies TemporalNamespaceDivision, the archetype C of the request will
// only be used to get the registered SearchAttributes.
func CountExecutions[C Component](
ctx context.Context,
request *CountExecutionsRequest,
) (*CountExecutionsResponse, error) {
archetypeType := reflect.TypeFor[C]()
return engineFromContext(ctx).CountExecutions(ctx, archetypeType, request)
}
31 changes: 31 additions & 0 deletions chasm/engine_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 0 additions & 4 deletions chasm/export_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,6 @@ func (r *Registry) ComponentOf(componentGoType reflect.Type) (*RegistrableCompon
return r.componentOf(componentGoType)
}

func (r *Registry) ComponentByID(id uint32) (*RegistrableComponent, bool) {
return r.componentByID(id)
}

func (r *Registry) TaskFor(taskInstance any) (*RegistrableTask, bool) {
return r.taskFor(taskInstance)
}
Expand Down
10 changes: 3 additions & 7 deletions chasm/lib/tests/payload.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"go.temporal.io/server/chasm"
"go.temporal.io/server/chasm/lib/tests/gen/testspb/v1"
"go.temporal.io/server/common"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/timestamppb"
)

Expand Down Expand Up @@ -154,11 +155,6 @@ func (s *PayloadStore) SearchAttributes(
}

// Memo implements chasm.VisibilityMemoProvider interface
func (s *PayloadStore) Memo(
_ chasm.Context,
) map[string]chasm.VisibilityValue {
return map[string]chasm.VisibilityValue{
TotalCountMemoFieldName: chasm.VisibilityValueInt64(s.State.TotalCount),
TotalSizeMemoFieldName: chasm.VisibilityValueInt64(s.State.TotalSize),
}
func (s *PayloadStore) Memo(_ chasm.Context) proto.Message {
return s.State
}
4 changes: 2 additions & 2 deletions chasm/ref_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ func (s *componentRefSuite) TestArchetypeID() {
archetypeID, err := ref.ArchetypeID(s.registry)
s.NoError(err)

rc, ok := s.registry.ComponentOf(reflect.TypeFor[*TestComponent]())
rc, ok := s.registry.componentOf(reflect.TypeFor[*TestComponent]())
s.True(ok)

s.Equal(rc.componentID, archetypeID)
Expand All @@ -68,7 +68,7 @@ func (s *componentRefSuite) TestShardingKey() {
shardingKey, err := ref.ShardingKey(s.registry)
s.NoError(err)

rc, ok := s.registry.ComponentOf(reflect.TypeFor[*TestComponent]())
rc, ok := s.registry.componentOf(reflect.TypeFor[*TestComponent]())
s.True(ok)

s.Equal(rc.shardingFn(executionKey), shardingKey)
Expand Down
5 changes: 5 additions & 0 deletions chasm/registrable_component.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,11 @@ func (rc *RegistrableComponent) registerToLibrary(
return fqn, rc.componentID, nil
}

// SearchAttributesMapper returns the search attributes mapper for this component.
func (rc *RegistrableComponent) SearchAttributesMapper() *VisibilitySearchAttributesMapper {
return rc.searchAttributesMapper
}

// fqType returns the fully qualified name of the component, which is a combination of
// the library name and the component type. This is used to uniquely identify
// the component in the registry.
Expand Down
20 changes: 20 additions & 0 deletions chasm/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,15 @@ func (r *Registry) ComponentFqnByID(id uint32) (string, bool) {
return fqn, ok
}

// ComponentByID returns the registrable component for a given archetype ID.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

plz also comment that those methods should not be used by chasm library developer. and also for RegistrableComponent.SearchAttributesMapper()

we probably need to create a separate interface for developer facing registry later.

func (r *Registry) ComponentByID(id uint32) (*RegistrableComponent, bool) {
fqn, ok := r.componentFqnByID[id]
if !ok {
return nil, false
}
return r.component(fqn)
}

// ComponentIDFor converts registered component instance to component type ID.
// This method should only be used by CHASM framework internal code,
// NOT CHASM library developers.
Expand Down Expand Up @@ -136,6 +145,17 @@ func (r *Registry) componentOf(componentGoType reflect.Type) (*RegistrableCompon
return rc, ok
}

// ArchetypeIDOf returns the ArchetypeID for the given component Go type.
// This method should only be used by CHASM framework internal,
// NOT CHASM library developers.
func (r *Registry) ArchetypeIDOf(componentGoType reflect.Type) (ArchetypeID, bool) {
rc, ok := r.componentByGoType[componentGoType]
if !ok {
return UnspecifiedArchetypeID, false
}
return rc.componentID, true
}

func (r *Registry) taskOf(taskGoType reflect.Type) (*RegistrableTask, bool) {
rt, ok := r.taskByGoType[taskGoType]
return rt, ok
Expand Down
41 changes: 41 additions & 0 deletions chasm/registry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,47 @@ func (s *RegistryTestSuite) TestRegistry_RegisterComponents_Error() {
require.Contains(t, err.Error(), "must be struct or pointer to struct")
})

s.Run("duplicate search attribute alias panics", func() {
s.Require().PanicsWithValue("registrable component validation error: search attribute alias \"MyAlias\" is already defined",
func() {
chasm.NewRegistrableComponent[*chasm.MockComponent](
"Component1",
chasm.WithSearchAttributes(
chasm.NewSearchAttributeBool("MyAlias", chasm.SearchAttributeFieldBool01),
chasm.NewSearchAttributeInt("MyAlias", chasm.SearchAttributeFieldInt01),
),
)
},
)
})

s.Run("duplicate search attribute field panics", func() {
s.Require().PanicsWithValue("registrable component validation error: search attribute field \"TemporalBool01\" is already defined",
func() {
chasm.NewRegistrableComponent[*chasm.MockComponent](
"Component1",
chasm.WithSearchAttributes(
chasm.NewSearchAttributeBool("Alias1", chasm.SearchAttributeFieldBool01),
chasm.NewSearchAttributeBool("Alias2", chasm.SearchAttributeFieldBool01),
),
)
},
)
})

s.Run("valid search attributes do not panic", func() {
s.Require().NotPanics(func() {
chasm.NewRegistrableComponent[*chasm.MockComponent](
"Component1",
chasm.WithSearchAttributes(
chasm.NewSearchAttributeBool("Completed", chasm.SearchAttributeFieldBool01),
chasm.NewSearchAttributeInt("Count", chasm.SearchAttributeFieldInt01),
chasm.NewSearchAttributeKeyword("Status", chasm.SearchAttributeFieldKeyword01),
),
)
})
})

}

func (s *RegistryTestSuite) TestRegistry_RegisterTasks_Error() {
Expand Down
Loading
Loading