Skip to content

Commit b1d936f

Browse files
committed
Subscribe to notifications at component level, not execution level
1 parent 24ef87c commit b1d936f

File tree

4 files changed

+65
-36
lines changed

4 files changed

+65
-36
lines changed

chasm/ref.go

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package chasm
22

33
import (
44
"reflect"
5+
"strings"
56

67
"go.temporal.io/api/serviceerror"
78
persistencespb "go.temporal.io/server/api/persistence/v1"
@@ -21,6 +22,13 @@ type EntityKey struct {
2122
EntityID string
2223
}
2324

25+
// ComponentKey uniquely identifies a CHASM component in the system.
26+
// TODO(dan): include componentInitialVT
27+
type ComponentKey struct {
28+
EntityKey
29+
Path string
30+
}
31+
2432
type ComponentRef struct {
2533
EntityKey
2634

@@ -105,6 +113,15 @@ func (r *ComponentRef) ShardingKey(
105113
return rc.shardingFn(r.EntityKey), nil
106114
}
107115

116+
// ComponentKey returns the component key for the referenced component.
117+
func (r *ComponentRef) ComponentKey() ComponentKey {
118+
return ComponentKey{
119+
EntityKey: r.EntityKey,
120+
// TODO(dan): we would need to use a separator that is guaranteed not present in the path
121+
Path: strings.Join(r.componentPath, "/"),
122+
}
123+
}
124+
108125
func (r *ComponentRef) Serialize(
109126
registry *Registry,
110127
) ([]byte, error) {

service/history/chasm_engine.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -236,7 +236,7 @@ func (e *ChasmEngine) UpdateComponent(
236236
}
237237

238238
e.notifier.Notify(&ChasmComponentNotification{
239-
Key: ref.EntityKey,
239+
Key: ref.ComponentKey(),
240240
Ref: newSerializedRef,
241241
})
242242

@@ -330,12 +330,13 @@ func (e *ChasmEngine) PollComponent(
330330

331331
// Wait condition not satisfied; long-poll
332332

333-
channel, subscriberID, err := e.notifier.Subscribe(requestRef.EntityKey)
333+
componentKey := requestRef.ComponentKey()
334+
channel, subscriberID, err := e.notifier.Subscribe(componentKey)
334335
if err != nil {
335336
return nil, err
336337
}
337338
defer func() {
338-
_ = e.notifier.Unsubscribe(requestRef.EntityKey, subscriberID)
339+
_ = e.notifier.Unsubscribe(componentKey, subscriberID)
339340
}()
340341

341342
// Release the lock, now that we are subscribed

service/history/chasm_notifier.go

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ type (
3333
// ChasmComponentNotification is a notification relating to a CHASM component.
3434
ChasmComponentNotification struct {
3535
// TODO(dan): confirm that we want Key in addition to the key in serialized ref
36-
Key chasm.EntityKey
36+
Key chasm.ComponentKey
3737
Ref []byte
3838
timestamp time.Time
3939
}
@@ -53,11 +53,15 @@ func NewChasmNotifier(
5353
stopCh: make(chan bool),
5454
notificationsCh: make(chan *ChasmComponentNotification, 1000),
5555
subscribers: collection.NewShardedConcurrentTxMap(1024, func(key any) uint32 {
56-
executionKey, ok := key.(chasm.EntityKey)
56+
componentKey, ok := key.(chasm.ComponentKey)
5757
if !ok {
5858
return 0
5959
}
60-
return farm.Fingerprint32([]byte(executionKey.NamespaceID + "_" + executionKey.BusinessID))
60+
// TODO(dan): accesses to a map shard are serialized. Workflow uses (namespace,
61+
// workflowID) here, hence different runs of the same execution map to the same map
62+
// shard. Do we want to include the component path & initialVT in the hash key in order
63+
// to improve concurrent access to components within the same execution?
64+
return farm.Fingerprint32([]byte(componentKey.NamespaceID + "_" + componentKey.BusinessID + "_" + componentKey.Path))
6165
}),
6266
}
6367
}
@@ -83,11 +87,9 @@ func (n *ChasmNotifier) Notify(notification *ChasmComponentNotification) {
8387
n.enqueue(notification)
8488
}
8589

86-
// Subscribe returns a channel that will receive notifications relating to the execution, along with
90+
// Subscribe returns a channel that will receive notifications relating to the component, along with
8791
// a subscriber ID that can be passed to UnsubscribeNotification.
88-
//
89-
// TODO(dan): support subscribing to notifications for a specific component only?
90-
func (n *ChasmNotifier) Subscribe(key chasm.EntityKey) (chan *ChasmComponentNotification, string, error) {
92+
func (n *ChasmNotifier) Subscribe(key chasm.ComponentKey) (chan *ChasmComponentNotification, string, error) {
9193
channel := make(chan *ChasmComponentNotification, 1)
9294
subscriberID := uuid.NewString()
9395

@@ -114,8 +116,8 @@ func (n *ChasmNotifier) Subscribe(key chasm.EntityKey) (chan *ChasmComponentNoti
114116
return channel, subscriberID, nil
115117
}
116118

117-
// Unsubscribe unsubscribes the subscriber from notifications relating to the execution.
118-
func (n *ChasmNotifier) Unsubscribe(key chasm.EntityKey, subscriberID string) error {
119+
// Unsubscribe unsubscribes the subscriber from notifications relating to the component.
120+
func (n *ChasmNotifier) Unsubscribe(key chasm.ComponentKey, subscriberID string) error {
119121
success := true
120122
n.subscribers.RemoveIf(key, func(key any, value any) bool {
121123
subscribers := value.(map[string]chan *ChasmComponentNotification)

service/history/chasm_notifier_test.go

Lines changed: 33 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -24,10 +24,13 @@ func TestChasmNotifier_SubscribeAndNotify(t *testing.T) {
2424
notifier.Start()
2525
defer notifier.Stop()
2626

27-
entityKey := chasm.EntityKey{
28-
NamespaceID: tv.NamespaceID().String(),
29-
BusinessID: tv.WorkflowID(),
30-
EntityID: tv.RunID(),
27+
componentKey := chasm.ComponentKey{
28+
EntityKey: chasm.EntityKey{
29+
NamespaceID: tv.NamespaceID().String(),
30+
BusinessID: tv.WorkflowID(),
31+
EntityID: tv.RunID(),
32+
},
33+
Path: "",
3134
}
3235

3336
// Multiple subscribers
@@ -38,7 +41,7 @@ func TestChasmNotifier_SubscribeAndNotify(t *testing.T) {
3841
}, subscriberCount)
3942

4043
for i := range subscriberCount {
41-
ch, id, err := notifier.Subscribe(entityKey)
44+
ch, id, err := notifier.Subscribe(componentKey)
4245
require.NoError(t, err)
4346
subscribers[i].channel = ch
4447
subscribers[i].id = id
@@ -47,7 +50,7 @@ func TestChasmNotifier_SubscribeAndNotify(t *testing.T) {
4750
// Single notification
4851
expectedRef := []byte("test-ref")
4952
notifier.Notify(&ChasmComponentNotification{
50-
Key: entityKey,
53+
Key: componentKey,
5154
Ref: expectedRef,
5255
})
5356

@@ -56,15 +59,15 @@ func TestChasmNotifier_SubscribeAndNotify(t *testing.T) {
5659
select {
5760
case received := <-sub.channel:
5861
require.NotNil(t, received, "subscriber %d", i)
59-
require.Equal(t, entityKey, received.Key, "subscriber %d", i)
62+
require.Equal(t, componentKey, received.Key, "subscriber %d", i)
6063
require.Equal(t, expectedRef, received.Ref, "subscriber %d", i)
6164
case <-time.After(time.Second):
6265
t.Fatalf("subscriber %d: timeout waiting for notification", i)
6366
}
6467
}
6568

6669
for _, sub := range subscribers {
67-
err := notifier.Unsubscribe(entityKey, sub.id)
70+
err := notifier.Unsubscribe(componentKey, sub.id)
6871
require.NoError(t, err)
6972
}
7073
}
@@ -80,21 +83,24 @@ func TestChasmNotifier_KeyIsolation(t *testing.T) {
8083
notifier.Start()
8184
defer notifier.Stop()
8285

83-
entityKey1 := chasm.EntityKey{
86+
entityKey := chasm.EntityKey{
8487
NamespaceID: tv.NamespaceID().String(),
8588
BusinessID: tv.WorkflowID(),
8689
EntityID: tv.RunID(),
8790
}
88-
entityKey2 := chasm.EntityKey{
89-
NamespaceID: "different-namespace-id",
90-
BusinessID: "different-workflow-id",
91-
EntityID: "different-run-id",
91+
componentKey1 := chasm.ComponentKey{
92+
EntityKey: entityKey,
93+
Path: "component1",
94+
}
95+
componentKey2 := chasm.ComponentKey{
96+
EntityKey: entityKey,
97+
Path: "component2",
9298
}
9399

94-
channel, subscriberID, err := notifier.Subscribe(entityKey1)
100+
channel, subscriberID, err := notifier.Subscribe(componentKey1)
95101
require.NoError(t, err)
96102
notifier.Notify(&ChasmComponentNotification{
97-
Key: entityKey2,
103+
Key: componentKey2,
98104
Ref: []byte("wrong-entity"),
99105
})
100106
select {
@@ -103,7 +109,7 @@ func TestChasmNotifier_KeyIsolation(t *testing.T) {
103109
case <-time.After(50 * time.Millisecond):
104110
}
105111

106-
err = notifier.Unsubscribe(entityKey1, subscriberID)
112+
err = notifier.Unsubscribe(componentKey1, subscriberID)
107113
require.NoError(t, err)
108114
}
109115

@@ -118,17 +124,20 @@ func TestChasmNotifier_UnsubscribeStopsDelivery(t *testing.T) {
118124
notifier.Start()
119125
defer notifier.Stop()
120126

121-
entityKey := chasm.EntityKey{
122-
NamespaceID: tv.NamespaceID().String(),
123-
BusinessID: tv.WorkflowID(),
124-
EntityID: tv.RunID(),
127+
componentKey := chasm.ComponentKey{
128+
EntityKey: chasm.EntityKey{
129+
NamespaceID: tv.NamespaceID().String(),
130+
BusinessID: tv.WorkflowID(),
131+
EntityID: tv.RunID(),
132+
},
133+
Path: "",
125134
}
126135

127136
// First notification should arrive
128-
channel, subscriberID, err := notifier.Subscribe(entityKey)
137+
channel, subscriberID, err := notifier.Subscribe(componentKey)
129138
require.NoError(t, err)
130139
notifier.Notify(&ChasmComponentNotification{
131-
Key: entityKey,
140+
Key: componentKey,
132141
Ref: []byte("before-unsubscribe"),
133142
})
134143
select {
@@ -139,10 +148,10 @@ func TestChasmNotifier_UnsubscribeStopsDelivery(t *testing.T) {
139148
}
140149

141150
// Notification after unsubscribe should not arrive
142-
err = notifier.Unsubscribe(entityKey, subscriberID)
151+
err = notifier.Unsubscribe(componentKey, subscriberID)
143152
require.NoError(t, err)
144153
notifier.Notify(&ChasmComponentNotification{
145-
Key: entityKey,
154+
Key: componentKey,
146155
Ref: []byte("after-unsubscribe"),
147156
})
148157
select {

0 commit comments

Comments
 (0)