Skip to content
Open
Show file tree
Hide file tree
Changes from 9 commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
8088604
Implement CHASM List/Count Executions
awln-temporal Nov 21, 2025
e52bdbe
Address remaining comments
awln-temporal Nov 24, 2025
d89db79
Address remaining commments
awln-temporal Nov 24, 2025
660f97c
Update docs
awln-temporal Nov 24, 2025
c1190db
Update spacing after merge
awln-temporal Nov 24, 2025
65b4f4b
Update compilation issues
awln-temporal Nov 25, 2025
ab894de
Remove unnecessary test
awln-temporal Nov 25, 2025
e9d6d5b
Fix namespaceID filtering
awln-temporal Nov 25, 2025
8e1521b
Merge branch 'main' into CHASMVisibilityQuery
awln-temporal Nov 25, 2025
5f6c3fb
Address previous comment
awln-temporal Nov 25, 2025
954e13c
Fix combined name type map initialization
awln-temporal Nov 25, 2025
82c0e1b
Remove debug code
awln-temporal Nov 25, 2025
abef51c
Update chasm/search_attribute.go
awln-temporal Nov 25, 2025
e15a190
Address remaining comments
awln-temporal Nov 25, 2025
dff1a76
Rename search attribute map getter
awln-temporal Nov 25, 2025
dd33b53
Fix linter issue
awln-temporal Nov 25, 2025
3088e1c
Add Value method to VisibilityValue
awln-temporal Nov 26, 2025
817768d
Remove unused method
awln-temporal Nov 26, 2025
c1a8dc3
Set final type map in values interceptor initialization
awln-temporal Nov 26, 2025
34be577
Refactor visibility manager and store boundaries, add functional test…
awln-temporal Nov 26, 2025
f9c8803
Merge branch 'main' into CHASMVisibilityQuery
awln-temporal Nov 26, 2025
441eb3f
Remove silencing of deregistered search attributes, rename vars to be…
awln-temporal Nov 26, 2025
6464827
Remove unnecessary code
awln-temporal Nov 26, 2025
a608da9
add aliasing for workflow list/gets to manager
awln-temporal Nov 26, 2025
1853784
update unit tests
awln-temporal Nov 26, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
122 changes: 122 additions & 0 deletions chasm/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,12 @@ package chasm

import (
"context"
"reflect"
"time"

commonpb "go.temporal.io/api/common/v1"
"go.temporal.io/api/serviceerror"
"google.golang.org/protobuf/proto"
)

// NoValue is a sentinel type representing no value.
Expand Down Expand Up @@ -46,6 +52,54 @@ type Engine interface {
func(MutableContext, Component, any) error,
...TransitionOption,
) ([]byte, error)

ListExecutions(
context.Context,
reflect.Type,
*ListExecutionsRequest,
) (*ListExecutionsResponse[*commonpb.Payload], error)

CountExecutions(
context.Context,
reflect.Type,
*CountExecutionsRequest,
) (*CountExecutionsResponse, error)
}

type ListExecutionsResponse[M proto.Message] struct {
Executions []*ExecutionInfo[M]
NextPageToken []byte
}
type ExecutionInfo[M proto.Message] struct {
BusinessID string
RunID string
StartTime time.Time
CloseTime time.Time
HistoryLength int64
HistorySizeBytes int64
StateTransitionCount int64
ChasmSearchAttributes SearchAttributesMap
CustomSearchAttributes map[string]*commonpb.Payload
Memo *commonpb.Memo
ChasmMemo M
}

type ListExecutionsRequest struct {
NamespaceID string
NamespaceName string
Comment on lines +89 to +90
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why we need to take in both?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

NamespaceID is what we store in persistence, and NamespaceName is needed for dynamic config and custom search attributes mapper.

Query string
PageSize int
NextPageToken []byte
}

type CountExecutionsRequest struct {
NamespaceID string
NamespaceName string
Query string
}

type CountExecutionsResponse struct {
Count int64
}

type BusinessIDReusePolicy int
Expand Down Expand Up @@ -304,3 +358,71 @@ func engineFromContext(
}
return e
}

// ListExecutions lists the executions of a CHASM archetype given an initial query.
// The query string can specify any combination of CHASM, custom, and predefined/system search attributes.
// The generic parameter C is the CHASM component type used for executions and search attribute filtering.
// The generic parameter M is the type of the memo payload to be unmarshaled from the execution.
// PageSize is required, must be greater than 0.
// NextPageToken is optional, set on subsequent requests to continue listing the next page of executions.
// Note: For CHASM executions, TemporalNamespaceDivision is the predefined search attribute
// that is used to identify the archetype of the execution.
// If the query string does not specify TemporalNamespaceDivision, the archetype C of the request will be used to filter the executions.
// If the initial query already specifies TemporalNamespaceDivision, the archetype C of the request will
// only be used to get the registered SearchAttributes.
func ListExecutions[C Component, M proto.Message](
ctx context.Context,
request *ListExecutionsRequest,
) (*ListExecutionsResponse[M], error) {
archetypeType := reflect.TypeFor[C]()
response, err := engineFromContext(ctx).ListExecutions(ctx, archetypeType, request)
if err != nil {
return nil, err
}

// Convert response, unmarshaling ChasmMemo to type M
executions := make([]*ExecutionInfo[M], len(response.Executions))
for i, execution := range response.Executions {
chasmMemoInterface := reflect.New(reflect.TypeFor[M]().Elem()).Interface()
chasmMemo, ok := chasmMemoInterface.(M)
if !ok {
return nil, serviceerror.NewInternalf("failed to cast chasm memo to type %s", reflect.TypeFor[M]().String())
}
if err := proto.Unmarshal(execution.ChasmMemo.Data, chasmMemo); err == nil {
executions[i] = &ExecutionInfo[M]{
BusinessID: execution.BusinessID,
RunID: execution.RunID,
StartTime: execution.StartTime,
CloseTime: execution.CloseTime,
HistoryLength: execution.HistoryLength,
HistorySizeBytes: execution.HistorySizeBytes,
StateTransitionCount: execution.StateTransitionCount,
ChasmSearchAttributes: execution.ChasmSearchAttributes,
CustomSearchAttributes: execution.CustomSearchAttributes,
Memo: execution.Memo,
ChasmMemo: chasmMemo,
}
}
}

return &ListExecutionsResponse[M]{
Executions: executions,
NextPageToken: response.NextPageToken,
}, nil
}

// CountExecutions counts the executions of a CHASM archetype given an initial query.
// The generic parameter C is the CHASM component type used for executions and search attribute filtering.
// The query string can specify any combination of CHASM, custom, and predefined/system search attributes.
// Note: For CHASM executions, TemporalNamespaceDivision is the predefined search attribute
// that is used to identify the archetype of the execution.
// If the query string does not specify TemporalNamespaceDivision, the archetype C of the request will be used to count the executions.
// If the initial query already specifies TemporalNamespaceDivision, the archetype C of the request will
// only be used to get the registered SearchAttributes.
func CountExecutions[C Component](
ctx context.Context,
request *CountExecutionsRequest,
) (*CountExecutionsResponse, error) {
archetypeType := reflect.TypeFor[C]()
return engineFromContext(ctx).CountExecutions(ctx, archetypeType, request)
}
31 changes: 31 additions & 0 deletions chasm/engine_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 0 additions & 4 deletions chasm/export_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,6 @@ func (r *Registry) ComponentOf(componentGoType reflect.Type) (*RegistrableCompon
return r.componentOf(componentGoType)
}

func (r *Registry) ComponentByID(id uint32) (*RegistrableComponent, bool) {
return r.componentByID(id)
}

func (r *Registry) TaskFor(taskInstance any) (*RegistrableTask, bool) {
return r.taskFor(taskInstance)
}
Expand Down
10 changes: 3 additions & 7 deletions chasm/lib/tests/payload.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"go.temporal.io/server/chasm"
"go.temporal.io/server/chasm/lib/tests/gen/testspb/v1"
"go.temporal.io/server/common"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/timestamppb"
)

Expand Down Expand Up @@ -154,11 +155,6 @@ func (s *PayloadStore) SearchAttributes(
}

// Memo implements chasm.VisibilityMemoProvider interface
func (s *PayloadStore) Memo(
_ chasm.Context,
) map[string]chasm.VisibilityValue {
return map[string]chasm.VisibilityValue{
TotalCountMemoFieldName: chasm.VisibilityValueInt64(s.State.TotalCount),
TotalSizeMemoFieldName: chasm.VisibilityValueInt64(s.State.TotalSize),
}
func (s *PayloadStore) Memo(_ chasm.Context) proto.Message {
return s.State
}
4 changes: 2 additions & 2 deletions chasm/ref_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ func (s *componentRefSuite) TestArchetypeID() {
archetypeID, err := ref.ArchetypeID(s.registry)
s.NoError(err)

rc, ok := s.registry.ComponentOf(reflect.TypeFor[*TestComponent]())
rc, ok := s.registry.componentOf(reflect.TypeFor[*TestComponent]())
s.True(ok)

s.Equal(rc.componentID, archetypeID)
Expand All @@ -68,7 +68,7 @@ func (s *componentRefSuite) TestShardingKey() {
shardingKey, err := ref.ShardingKey(s.registry)
s.NoError(err)

rc, ok := s.registry.ComponentOf(reflect.TypeFor[*TestComponent]())
rc, ok := s.registry.componentOf(reflect.TypeFor[*TestComponent]())
s.True(ok)

s.Equal(rc.shardingFn(executionKey), shardingKey)
Expand Down
5 changes: 5 additions & 0 deletions chasm/registrable_component.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,11 @@ func (rc *RegistrableComponent) registerToLibrary(
return fqn, rc.componentID, nil
}

// SearchAttributesMapper returns the search attributes mapper for this component.
func (rc *RegistrableComponent) SearchAttributesMapper() *VisibilitySearchAttributesMapper {
return rc.searchAttributesMapper
}

// fqType returns the fully qualified name of the component, which is a combination of
// the library name and the component type. This is used to uniquely identify
// the component in the registry.
Expand Down
20 changes: 20 additions & 0 deletions chasm/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,15 @@ func (r *Registry) ComponentFqnByID(id uint32) (string, bool) {
return fqn, ok
}

// ComponentByID returns the registrable component for a given archetype ID.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

plz also comment that those methods should not be used by chasm library developer. and also for RegistrableComponent.SearchAttributesMapper()

we probably need to create a separate interface for developer facing registry later.

func (r *Registry) ComponentByID(id uint32) (*RegistrableComponent, bool) {
fqn, ok := r.componentFqnByID[id]
if !ok {
return nil, false
}
return r.component(fqn)
}

// ComponentIDFor converts registered component instance to component type ID.
// This method should only be used by CHASM framework internal code,
// NOT CHASM library developers.
Expand Down Expand Up @@ -136,6 +145,17 @@ func (r *Registry) componentOf(componentGoType reflect.Type) (*RegistrableCompon
return rc, ok
}

// ArchetypeIDOf returns the ArchetypeID for the given component Go type.
// This method should only be used by CHASM framework internal,
// NOT CHASM library developers.
func (r *Registry) ArchetypeIDOf(componentGoType reflect.Type) (ArchetypeID, bool) {
rc, ok := r.componentByGoType[componentGoType]
if !ok {
return UnspecifiedArchetypeID, false
}
return rc.componentID, true
}

func (r *Registry) taskOf(taskGoType reflect.Type) (*RegistrableTask, bool) {
rt, ok := r.taskByGoType[taskGoType]
return rt, ok
Expand Down
41 changes: 41 additions & 0 deletions chasm/registry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,47 @@ func (s *RegistryTestSuite) TestRegistry_RegisterComponents_Error() {
require.Contains(t, err.Error(), "must be struct or pointer to struct")
})

s.Run("duplicate search attribute alias panics", func() {
s.Require().PanicsWithValue("registrable component validation error: search attribute alias \"MyAlias\" is already defined",
func() {
chasm.NewRegistrableComponent[*chasm.MockComponent](
"Component1",
chasm.WithSearchAttributes(
chasm.NewSearchAttributeBool("MyAlias", chasm.SearchAttributeFieldBool01),
chasm.NewSearchAttributeInt("MyAlias", chasm.SearchAttributeFieldInt01),
),
)
},
)
})

s.Run("duplicate search attribute field panics", func() {
s.Require().PanicsWithValue("registrable component validation error: search attribute field \"TemporalBool01\" is already defined",
func() {
chasm.NewRegistrableComponent[*chasm.MockComponent](
"Component1",
chasm.WithSearchAttributes(
chasm.NewSearchAttributeBool("Alias1", chasm.SearchAttributeFieldBool01),
chasm.NewSearchAttributeBool("Alias2", chasm.SearchAttributeFieldBool01),
),
)
},
)
})

s.Run("valid search attributes do not panic", func() {
s.Require().NotPanics(func() {
chasm.NewRegistrableComponent[*chasm.MockComponent](
"Component1",
chasm.WithSearchAttributes(
chasm.NewSearchAttributeBool("Completed", chasm.SearchAttributeFieldBool01),
chasm.NewSearchAttributeInt("Count", chasm.SearchAttributeFieldInt01),
chasm.NewSearchAttributeKeyword("Status", chasm.SearchAttributeFieldKeyword01),
),
)
})
})

}

func (s *RegistryTestSuite) TestRegistry_RegisterTasks_Error() {
Expand Down
Loading
Loading