Skip to content

Commit d6f85df

Browse files
authored
Persistence context part 5: persistence store interfaces (#2659)
1 parent 642d64a commit d6f85df

29 files changed

+1188
-762
lines changed

common/persistence/cassandra/cluster_metadata_store.go

Lines changed: 23 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
package cassandra
2626

2727
import (
28+
"context"
2829
"net"
2930
"strings"
3031
"time"
@@ -86,6 +87,7 @@ func NewClusterMetadataStore(
8687
}
8788

8889
func (m *ClusterMetadataStore) ListClusterMetadata(
90+
_ context.Context,
8991
request *p.InternalListClusterMetadataRequest,
9092
) (*p.InternalListClusterMetadataResponse, error) {
9193
query := m.session.Query(templateListClusterMetadata, constMetadataPartition)
@@ -122,6 +124,7 @@ func (m *ClusterMetadataStore) ListClusterMetadata(
122124
}
123125

124126
func (m *ClusterMetadataStore) GetClusterMetadata(
127+
_ context.Context,
125128
request *p.InternalGetClusterMetadataRequest,
126129
) (*p.InternalGetClusterMetadataResponse, error) {
127130

@@ -141,7 +144,10 @@ func (m *ClusterMetadataStore) GetClusterMetadata(
141144
}, nil
142145
}
143146

144-
func (m *ClusterMetadataStore) SaveClusterMetadata(request *p.InternalSaveClusterMetadataRequest) (bool, error) {
147+
func (m *ClusterMetadataStore) SaveClusterMetadata(
148+
_ context.Context,
149+
request *p.InternalSaveClusterMetadataRequest,
150+
) (bool, error) {
145151
var query gocql.Query
146152
if request.Version == 0 {
147153
query = m.session.Query(
@@ -175,15 +181,21 @@ func (m *ClusterMetadataStore) SaveClusterMetadata(request *p.InternalSaveCluste
175181
return true, nil
176182
}
177183

178-
func (m *ClusterMetadataStore) DeleteClusterMetadata(request *p.InternalDeleteClusterMetadataRequest) error {
184+
func (m *ClusterMetadataStore) DeleteClusterMetadata(
185+
_ context.Context,
186+
request *p.InternalDeleteClusterMetadataRequest,
187+
) error {
179188
query := m.session.Query(templateDeleteClusterMetadata, constMetadataPartition, request.ClusterName)
180189
if err := query.Exec(); err != nil {
181190
return gocql.ConvertError("DeleteClusterMetadata", err)
182191
}
183192
return nil
184193
}
185194

186-
func (m *ClusterMetadataStore) GetClusterMembers(request *p.GetClusterMembersRequest) (*p.GetClusterMembersResponse, error) {
195+
func (m *ClusterMetadataStore) GetClusterMembers(
196+
_ context.Context,
197+
request *p.GetClusterMembersRequest,
198+
) (*p.GetClusterMembersResponse, error) {
187199
var queryString strings.Builder
188200
var operands []interface{}
189201
queryString.WriteString(templateGetClusterMembership)
@@ -257,7 +269,10 @@ func (m *ClusterMetadataStore) GetClusterMembers(request *p.GetClusterMembersReq
257269
return &p.GetClusterMembersResponse{ActiveMembers: clusterMembers, NextPageToken: pagingToken}, nil
258270
}
259271

260-
func (m *ClusterMetadataStore) UpsertClusterMembership(request *p.UpsertClusterMembershipRequest) error {
272+
func (m *ClusterMetadataStore) UpsertClusterMembership(
273+
_ context.Context,
274+
request *p.UpsertClusterMembershipRequest,
275+
) error {
261276
query := m.session.Query(templateUpsertActiveClusterMembership, constMembershipPartition, []byte(request.HostID),
262277
request.RPCAddress, request.RPCPort, request.Role, request.SessionStart, time.Now().UTC(), int64(request.RecordExpiry.Seconds()))
263278
err := query.Exec()
@@ -269,7 +284,10 @@ func (m *ClusterMetadataStore) UpsertClusterMembership(request *p.UpsertClusterM
269284
return nil
270285
}
271286

272-
func (m *ClusterMetadataStore) PruneClusterMembership(request *p.PruneClusterMembershipRequest) error {
287+
func (m *ClusterMetadataStore) PruneClusterMembership(
288+
_ context.Context,
289+
request *p.PruneClusterMembershipRequest,
290+
) error {
273291
return nil
274292
}
275293

common/persistence/cassandra/execution_store.go

Lines changed: 13 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
package cassandra
2626

2727
import (
28+
"context"
2829
"time"
2930

3031
"go.temporal.io/server/common/log"
@@ -130,54 +131,57 @@ func NewExecutionStore(
130131
}
131132

132133
func (d *ExecutionStore) CreateWorkflowExecution(
134+
ctx context.Context,
133135
request *p.InternalCreateWorkflowExecutionRequest,
134136
) (*p.InternalCreateWorkflowExecutionResponse, error) {
135137
for _, req := range request.NewWorkflowNewEvents {
136-
if err := d.AppendHistoryNodes(req); err != nil {
138+
if err := d.AppendHistoryNodes(ctx, req); err != nil {
137139
return nil, err
138140
}
139141
}
140142

141-
return d.MutableStateStore.CreateWorkflowExecution(request)
143+
return d.MutableStateStore.CreateWorkflowExecution(ctx, request)
142144
}
143145

144146
func (d *ExecutionStore) UpdateWorkflowExecution(
147+
ctx context.Context,
145148
request *p.InternalUpdateWorkflowExecutionRequest,
146149
) error {
147150
for _, req := range request.UpdateWorkflowNewEvents {
148-
if err := d.AppendHistoryNodes(req); err != nil {
151+
if err := d.AppendHistoryNodes(ctx, req); err != nil {
149152
return err
150153
}
151154
}
152155
for _, req := range request.NewWorkflowNewEvents {
153-
if err := d.AppendHistoryNodes(req); err != nil {
156+
if err := d.AppendHistoryNodes(ctx, req); err != nil {
154157
return err
155158
}
156159
}
157160

158-
return d.MutableStateStore.UpdateWorkflowExecution(request)
161+
return d.MutableStateStore.UpdateWorkflowExecution(ctx, request)
159162
}
160163

161164
func (d *ExecutionStore) ConflictResolveWorkflowExecution(
165+
ctx context.Context,
162166
request *p.InternalConflictResolveWorkflowExecutionRequest,
163167
) error {
164168
for _, req := range request.CurrentWorkflowEventsNewEvents {
165-
if err := d.AppendHistoryNodes(req); err != nil {
169+
if err := d.AppendHistoryNodes(ctx, req); err != nil {
166170
return err
167171
}
168172
}
169173
for _, req := range request.ResetWorkflowEventsNewEvents {
170-
if err := d.AppendHistoryNodes(req); err != nil {
174+
if err := d.AppendHistoryNodes(ctx, req); err != nil {
171175
return err
172176
}
173177
}
174178
for _, req := range request.NewWorkflowEventsNewEvents {
175-
if err := d.AppendHistoryNodes(req); err != nil {
179+
if err := d.AppendHistoryNodes(ctx, req); err != nil {
176180
return err
177181
}
178182
}
179183

180-
return d.MutableStateStore.ConflictResolveWorkflowExecution(request)
184+
return d.MutableStateStore.ConflictResolveWorkflowExecution(ctx, request)
181185
}
182186

183187
func (d *ExecutionStore) GetName() string {

common/persistence/cassandra/history_store.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
package cassandra
2626

2727
import (
28+
"context"
2829
"fmt"
2930
"sort"
3031

@@ -89,6 +90,7 @@ func NewHistoryStore(
8990
// AppendHistoryNodes upsert a batch of events as a single node to a history branch
9091
// Note that it's not allowed to append above the branch's ancestors' nodes, which means nodeID >= ForkNodeID
9192
func (h *HistoryStore) AppendHistoryNodes(
93+
_ context.Context,
9294
request *p.InternalAppendHistoryNodesRequest,
9395
) error {
9496
branchInfo := request.BranchInfo
@@ -135,6 +137,7 @@ func (h *HistoryStore) AppendHistoryNodes(
135137

136138
// DeleteHistoryNodes delete a history node
137139
func (h *HistoryStore) DeleteHistoryNodes(
140+
_ context.Context,
138141
request *p.InternalDeleteHistoryNodesRequest,
139142
) error {
140143
branchInfo := request.BranchInfo
@@ -164,6 +167,7 @@ func (h *HistoryStore) DeleteHistoryNodes(
164167
// ReadHistoryBranch returns history node data for a branch
165168
// NOTE: For branch that has ancestors, we need to query Cassandra multiple times, because it doesn't support OR/UNION operator
166169
func (h *HistoryStore) ReadHistoryBranch(
170+
_ context.Context,
167171
request *p.InternalReadHistoryBranchRequest,
168172
) (*p.InternalReadHistoryBranchResponse, error) {
169173
treeID, err := primitives.ValidateUUID(request.TreeID)
@@ -255,6 +259,7 @@ func (h *HistoryStore) ReadHistoryBranch(
255259
// 8[8,9]
256260
//
257261
func (h *HistoryStore) ForkHistoryBranch(
262+
_ context.Context,
258263
request *p.InternalForkHistoryBranchRequest,
259264
) error {
260265

@@ -281,6 +286,7 @@ func (h *HistoryStore) ForkHistoryBranch(
281286

282287
// DeleteHistoryBranch removes a branch
283288
func (h *HistoryStore) DeleteHistoryBranch(
289+
_ context.Context,
284290
request *p.InternalDeleteHistoryBranchRequest,
285291
) error {
286292
batch := h.Session.NewBatch(gocql.LoggedBatch)
@@ -312,6 +318,7 @@ func (h *HistoryStore) deleteBranchRangeNodes(
312318
}
313319

314320
func (h *HistoryStore) GetAllHistoryTreeBranches(
321+
_ context.Context,
315322
request *p.GetAllHistoryTreeBranchesRequest,
316323
) (*p.InternalGetAllHistoryTreeBranchesResponse, error) {
317324

@@ -359,6 +366,7 @@ func (h *HistoryStore) GetAllHistoryTreeBranches(
359366

360367
// GetHistoryTree returns all branch information of a tree
361368
func (h *HistoryStore) GetHistoryTree(
369+
_ context.Context,
362370
request *p.GetHistoryTreeRequest,
363371
) (*p.InternalGetHistoryTreeResponse, error) {
364372

common/persistence/cassandra/matching_task_store.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
package cassandra
2626

2727
import (
28+
"context"
2829
"fmt"
2930
"strings"
3031
"time"
@@ -150,6 +151,7 @@ func NewMatchingTaskStore(
150151
}
151152

152153
func (d *MatchingTaskStore) CreateTaskQueue(
154+
_ context.Context,
153155
request *p.InternalCreateTaskQueueRequest,
154156
) error {
155157
query := d.Session.Query(templateInsertTaskQueueQuery,
@@ -181,6 +183,7 @@ func (d *MatchingTaskStore) CreateTaskQueue(
181183
}
182184

183185
func (d *MatchingTaskStore) GetTaskQueue(
186+
_ context.Context,
184187
request *p.InternalGetTaskQueueRequest,
185188
) (*p.InternalGetTaskQueueResponse, error) {
186189
query := d.Session.Query(templateGetTaskQueueQuery,
@@ -206,6 +209,7 @@ func (d *MatchingTaskStore) GetTaskQueue(
206209

207210
// UpdateTaskQueue update task queue
208211
func (d *MatchingTaskStore) UpdateTaskQueue(
212+
_ context.Context,
209213
request *p.InternalUpdateTaskQueueRequest,
210214
) (*p.UpdateTaskQueueResponse, error) {
211215
var err error
@@ -276,12 +280,14 @@ func (d *MatchingTaskStore) UpdateTaskQueue(
276280
}
277281

278282
func (d *MatchingTaskStore) ListTaskQueue(
283+
_ context.Context,
279284
_ *p.ListTaskQueueRequest,
280285
) (*p.InternalListTaskQueueResponse, error) {
281286
return nil, serviceerror.NewUnavailable(fmt.Sprintf("unsupported operation"))
282287
}
283288

284289
func (d *MatchingTaskStore) DeleteTaskQueue(
290+
_ context.Context,
285291
request *p.DeleteTaskQueueRequest,
286292
) error {
287293
query := d.Session.Query(templateDeleteTaskQueueQuery,
@@ -301,6 +307,7 @@ func (d *MatchingTaskStore) DeleteTaskQueue(
301307

302308
// CreateTasks add tasks
303309
func (d *MatchingTaskStore) CreateTasks(
310+
_ context.Context,
304311
request *p.InternalCreateTasksRequest,
305312
) (*p.CreateTasksResponse, error) {
306313
batch := d.Session.NewBatch(gocql.LoggedBatch)
@@ -380,6 +387,7 @@ func GetTaskTTL(expireTime *time.Time) int64 {
380387

381388
// GetTasks get a task
382389
func (d *MatchingTaskStore) GetTasks(
390+
_ context.Context,
383391
request *p.GetTasksRequest,
384392
) (*p.InternalGetTasksResponse, error) {
385393
// Reading taskqueue tasks need to be quorum level consistent, otherwise we could lose tasks
@@ -437,6 +445,7 @@ func (d *MatchingTaskStore) GetTasks(
437445

438446
// CompleteTask delete a task
439447
func (d *MatchingTaskStore) CompleteTask(
448+
_ context.Context,
440449
request *p.CompleteTaskRequest,
441450
) error {
442451
tli := request.TaskQueue
@@ -459,6 +468,7 @@ func (d *MatchingTaskStore) CompleteTask(
459468
// Limit request parameter i.e. either all tasks leq the task_id will be deleted or an error will
460469
// be returned to the caller
461470
func (d *MatchingTaskStore) CompleteTasksLessThan(
471+
_ context.Context,
462472
request *p.CompleteTasksLessThanRequest,
463473
) (int, error) {
464474
query := d.Session.Query(templateCompleteTasksLessThanQuery,

0 commit comments

Comments
 (0)