Skip to content

Commit 80514cd

Browse files
Remove Proto Double Serialization, VersionHistories Column Cassandra (#624)
1 parent 17e3b67 commit 80514cd

File tree

12 files changed

+531
-741
lines changed

12 files changed

+531
-741
lines changed

api/persistenceblobs/v1/message.pb.go

Lines changed: 454 additions & 598 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

common/persistence/cassandra/cassandraPersistence.go

Lines changed: 17 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -172,12 +172,6 @@ workflow_state = ? ` +
172172
`VALUES(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?` +
173173
`, ?, ?, ?, ?, ?) IF NOT EXISTS `
174174

175-
templateCreateWorkflowExecutionWithVersionHistoriesQuery = `INSERT INTO executions (` +
176-
`shard_id, namespace_id, workflow_id, run_id, type, ` +
177-
`execution, execution_encoding, execution_state, execution_state_encoding, next_event_id, ` +
178-
`visibility_ts, task_id, version_histories, version_histories_encoding, checksum, checksum_encoding) ` +
179-
`VALUES(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) IF NOT EXISTS `
180-
181175
templateCreateTransferTaskQuery = `INSERT INTO executions (` +
182176
`shard_id, type, namespace_id, workflow_id, run_id, transfer, transfer_encoding, visibility_ts, task_id) ` +
183177
`VALUES(?, ?, ?, ?, ?, ?, ?, ?, ?)`
@@ -203,7 +197,7 @@ workflow_state = ? ` +
203197

204198
templateGetWorkflowExecutionQuery = `SELECT execution, execution_encoding, execution_state, execution_state_encoding, next_event_id, replication_metadata, replication_metadata_encoding, activity_map, activity_map_encoding, timer_map, timer_map_encoding, ` +
205199
`child_executions_map, child_executions_map_encoding, request_cancel_map, request_cancel_map_encoding, signal_map, signal_map_encoding, signal_requested, buffered_events_list, ` +
206-
`version_histories, version_histories_encoding, checksum, checksum_encoding ` +
200+
`checksum, checksum_encoding ` +
207201
`FROM executions ` +
208202
`WHERE shard_id = ? ` +
209203
`and type = ? ` +
@@ -275,25 +269,6 @@ workflow_state = ? ` +
275269
`and task_id = ? ` +
276270
`IF next_event_id = ? `
277271

278-
templateUpdateWorkflowExecutionWithVersionHistoriesQuery = `UPDATE executions ` +
279-
`SET execution = ?` +
280-
`, execution_encoding = ?` +
281-
`, execution_state = ? ` +
282-
`, execution_state_encoding = ? ` +
283-
`, next_event_id = ? ` +
284-
`, version_histories = ? ` +
285-
`, version_histories_encoding = ? ` +
286-
`, checksum = ? ` +
287-
`, checksum_encoding = ? ` +
288-
`WHERE shard_id = ? ` +
289-
`and type = ? ` +
290-
`and namespace_id = ? ` +
291-
`and workflow_id = ? ` +
292-
`and run_id = ? ` +
293-
`and visibility_ts = ? ` +
294-
`and task_id = ? ` +
295-
`IF next_event_id = ? `
296-
297272
templateUpdateActivityInfoQuery = `UPDATE executions ` +
298273
`SET activity_map[ ? ] = ?, activity_map_encoding = ? ` +
299274
`WHERE shard_id = ? ` +
@@ -1109,16 +1084,11 @@ func (d *cassandraPersistence) GetWorkflowExecution(request *p.GetWorkflowExecut
11091084
return nil, convertCommonErrors("GetWorkflowExecution", err)
11101085
}
11111086

1112-
info, replicationState, err := workflowExecutionFromRow(result)
1087+
state, err := mutableStateFromRow(result)
11131088
if err != nil {
11141089
return nil, serviceerror.NewInternal(fmt.Sprintf("GetWorkflowExecution operation failed. Error: %v", err))
11151090
}
11161091

1117-
state := &p.InternalWorkflowMutableState{
1118-
ExecutionInfo: info,
1119-
ReplicationState: replicationState,
1120-
VersionHistories: p.NewDataBlob(result["version_histories"].([]byte), common.EncodingType(result["version_histories_encoding"].(string))),
1121-
}
11221092

11231093
if state.VersionHistories != nil && state.ReplicationState != nil {
11241094
return nil, serviceerror.NewInternal(fmt.Sprintf("GetWorkflowExecution operation failed. VersionHistories and ReplicationState both are set."))
@@ -1929,7 +1899,8 @@ func (d *cassandraPersistence) ListConcreteExecutions(
19291899
continue
19301900
}
19311901
if _, ok := result["execution"]; ok {
1932-
wfInfo, _, _ := workflowExecutionFromRow(result)
1902+
state, _ := mutableStateFromRow(result)
1903+
wfInfo := state.ExecutionInfo
19331904
response.ExecutionInfos = append(response.ExecutionInfos, wfInfo)
19341905
}
19351906
result = make(map[string]interface{})
@@ -2806,30 +2777,30 @@ func (d *cassandraPersistence) RangeDeleteReplicationTaskFromDLQ(
28062777
return nil
28072778
}
28082779

2809-
func workflowExecutionFromRow(result map[string]interface{}) (*p.InternalWorkflowExecutionInfo, *persistenceblobs.ReplicationState, error) {
2780+
func mutableStateFromRow(result map[string]interface{}) (*p.InternalWorkflowMutableState, error) {
28102781
eiBytes, ok := result["execution"].([]byte)
28112782
if !ok {
2812-
return nil, nil, newPersistedTypeMismatchError("execution", "", eiBytes, result)
2783+
return nil, newPersistedTypeMismatchError("execution", "", eiBytes, result)
28132784
}
28142785

28152786
eiEncoding, ok := result["execution_encoding"].(string)
28162787
if !ok {
2817-
return nil, nil, newPersistedTypeMismatchError("execution_encoding", "", eiEncoding, result)
2788+
return nil, newPersistedTypeMismatchError("execution_encoding", "", eiEncoding, result)
28182789
}
28192790

28202791
protoInfo, err := serialization.WorkflowExecutionInfoFromBlob(eiBytes, eiEncoding)
28212792
if err != nil {
2822-
return nil, nil, err
2793+
return nil, err
28232794
}
28242795

28252796
nextEventID, ok := result["next_event_id"].(int64)
28262797
if !ok {
2827-
return nil, nil, newPersistedTypeMismatchError("next_event_id", "", nextEventID, result)
2798+
return nil, newPersistedTypeMismatchError("next_event_id", "", nextEventID, result)
28282799
}
28292800

28302801
protoState, err := protoExecutionStateFromRow(result)
28312802
if err != nil {
2832-
return nil, nil, err
2803+
return nil, err
28332804
}
28342805

28352806
info := p.ProtoWorkflowExecutionToPartialInternalExecution(protoInfo, protoState, nextEventID)
@@ -2838,13 +2809,18 @@ func workflowExecutionFromRow(result map[string]interface{}) (*p.InternalWorkflo
28382809
if protoInfo.ReplicationData != nil {
28392810
protoReplVersions, err := ProtoReplicationVersionsFromResultMap(result)
28402811
if err != nil {
2841-
return nil, nil, err
2812+
return nil, err
28422813
}
28432814

28442815
state = ReplicationStateFromProtos(protoInfo, protoReplVersions)
28452816
}
28462817

2847-
return info, state, nil
2818+
mutableState := &p.InternalWorkflowMutableState{
2819+
ExecutionInfo: info,
2820+
ReplicationState: state,
2821+
VersionHistories: protoInfo.VersionHistories,
2822+
}
2823+
return mutableState, nil
28482824
}
28492825

28502826
func ProtoReplicationVersionsFromResultMap(result map[string]interface{}) (*persistenceblobs.ReplicationVersions, error) {

common/persistence/cassandra/cassandraPersistenceUtil.go

Lines changed: 5 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ import (
3434
"go.temporal.io/api/serviceerror"
3535

3636
enumsspb "go.temporal.io/server/api/enums/v1"
37+
"go.temporal.io/server/api/history/v1"
3738
"go.temporal.io/server/api/persistenceblobs/v1"
3839
replicationspb "go.temporal.io/server/api/replication/v1"
3940
"go.temporal.io/server/common"
@@ -426,7 +427,7 @@ func createExecution(
426427
shardID int,
427428
executionInfo *p.InternalWorkflowExecutionInfo,
428429
replicationState *persistenceblobs.ReplicationState,
429-
versionHistories *serialization.DataBlob,
430+
versionHistories *history.VersionHistories,
430431
checksum checksum.Checksum,
431432
cqlNowTimestampMillis int64,
432433
startVersion int64,
@@ -487,8 +488,7 @@ func createExecution(
487488
checksumDatablob.Encoding.String())
488489
} else if versionHistories != nil {
489490
// TODO also need to set the start / current / last write version
490-
versionHistoriesData, versionHistoriesEncoding := p.FromDataBlob(versionHistories)
491-
batch.Query(templateCreateWorkflowExecutionWithVersionHistoriesQuery,
491+
batch.Query(templateCreateWorkflowExecutionQuery,
492492
shardID,
493493
namespaceID,
494494
workflowID,
@@ -501,8 +501,6 @@ func createExecution(
501501
executionInfo.NextEventID,
502502
defaultVisibilityTimestamp,
503503
rowTypeExecutionTaskID,
504-
versionHistoriesData,
505-
versionHistoriesEncoding,
506504
checksumDatablob.Data,
507505
checksumDatablob.Encoding.String())
508506
} else if replicationState != nil {
@@ -538,7 +536,7 @@ func updateExecution(
538536
shardID int,
539537
executionInfo *p.InternalWorkflowExecutionInfo,
540538
replicationState *persistenceblobs.ReplicationState,
541-
versionHistories *serialization.DataBlob,
539+
versionHistories *history.VersionHistories,
542540
cqlNowTimestampMillis int64,
543541
condition int64,
544542
checksum checksum.Checksum,
@@ -600,15 +598,12 @@ func updateExecution(
600598
condition)
601599
} else if versionHistories != nil {
602600
// TODO also need to set the start / current / last write version
603-
versionHistoriesData, versionHistoriesEncoding := p.FromDataBlob(versionHistories)
604-
batch.Query(templateUpdateWorkflowExecutionWithVersionHistoriesQuery,
601+
batch.Query(templateUpdateWorkflowExecutionQuery,
605602
executionDatablob.Data,
606603
executionDatablob.Encoding.String(),
607604
executionStateDatablob.Data,
608605
executionStateDatablob.Encoding.String(),
609606
executionInfo.NextEventID,
610-
versionHistoriesData,
611-
versionHistoriesEncoding,
612607
checksumDatablob.Data,
613608
checksumDatablob.Encoding.String(),
614609
shardID,

common/persistence/executionStore.go

Lines changed: 15 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ package persistence
2727
import (
2828
historypb "go.temporal.io/api/history/v1"
2929
"go.temporal.io/api/serviceerror"
30+
workflowpb "go.temporal.io/api/workflow/v1"
3031

3132
"go.temporal.io/server/api/persistenceblobs/v1"
3233
"go.temporal.io/server/common"
@@ -87,6 +88,7 @@ func (m *executionManagerImpl) GetWorkflowExecution(
8788
ReplicationState: response.State.ReplicationState,
8889
Checksum: response.State.Checksum,
8990
ChildExecutionInfos: response.State.ChildExecutionInfos,
91+
VersionHistories: NewVersionHistoriesFromProto(response.State.VersionHistories),
9092
},
9193
}
9294

@@ -98,11 +100,7 @@ func (m *executionManagerImpl) GetWorkflowExecution(
98100
if err != nil {
99101
return nil, err
100102
}
101-
versionHistories, err := m.DeserializeVersionHistories(response.State.VersionHistories)
102-
if err != nil {
103-
return nil, err
104-
}
105-
newResponse.State.VersionHistories = versionHistories
103+
106104
newResponse.MutableStateStats = m.statsComputer.computeMutableStateStats(response)
107105

108106
return newResponse, nil
@@ -111,20 +109,8 @@ func (m *executionManagerImpl) GetWorkflowExecution(
111109
func (m *executionManagerImpl) DeserializeExecutionInfo(
112110
info *InternalWorkflowExecutionInfo,
113111
) (*WorkflowExecutionInfo, *persistenceblobs.ExecutionStats, error) {
114-
115-
completionEvent, err := m.serializer.DeserializeEvent(info.CompletionEvent)
116-
if err != nil {
117-
return nil, nil, err
118-
}
119-
120-
autoResetPoints, err := m.serializer.DeserializeResetPoints(info.AutoResetPoints)
121-
if err != nil {
122-
return nil, nil, err
123-
}
124-
125112
newInfo := &WorkflowExecutionInfo{
126-
CompletionEvent: completionEvent,
127-
113+
CompletionEvent: info.CompletionEvent,
128114
NamespaceID: info.NamespaceID,
129115
WorkflowID: info.WorkflowID,
130116
RunID: info.RunID,
@@ -174,10 +160,15 @@ func (m *executionManagerImpl) DeserializeExecutionInfo(
174160
NonRetryableErrorTypes: info.NonRetryableErrorTypes,
175161
BranchToken: info.BranchToken,
176162
CronSchedule: info.CronSchedule,
177-
AutoResetPoints: autoResetPoints,
163+
AutoResetPoints: info.AutoResetPoints,
178164
SearchAttributes: info.SearchAttributes,
179165
Memo: info.Memo,
180166
}
167+
168+
if newInfo.AutoResetPoints == nil {
169+
newInfo.AutoResetPoints = &workflowpb.ResetPoints{}
170+
}
171+
181172
newStats := &persistenceblobs.ExecutionStats{
182173
HistorySize: info.HistorySize,
183174
}
@@ -238,15 +229,6 @@ func (m *executionManagerImpl) SerializeExecutionInfo(
238229
if info == nil {
239230
return &InternalWorkflowExecutionInfo{}, nil
240231
}
241-
completionEvent, err := m.serializer.SerializeEvent(info.CompletionEvent, encoding)
242-
if err != nil {
243-
return nil, err
244-
}
245-
246-
resetPoints, err := m.serializer.SerializeResetPoints(info.AutoResetPoints, encoding)
247-
if err != nil {
248-
return nil, err
249-
}
250232

251233
return &InternalWorkflowExecutionInfo{
252234
NamespaceID: info.NamespaceID,
@@ -257,7 +239,7 @@ func (m *executionManagerImpl) SerializeExecutionInfo(
257239
ParentRunID: info.ParentRunID,
258240
InitiatedID: info.InitiatedID,
259241
CompletionEventBatchID: info.CompletionEventBatchID,
260-
CompletionEvent: completionEvent,
242+
CompletionEvent: info.CompletionEvent,
261243
TaskQueue: info.TaskQueue,
262244
WorkflowTypeName: info.WorkflowTypeName,
263245
WorkflowRunTimeout: info.WorkflowRunTimeout,
@@ -289,7 +271,7 @@ func (m *executionManagerImpl) SerializeExecutionInfo(
289271
ClientLibraryVersion: info.ClientLibraryVersion,
290272
ClientFeatureVersion: info.ClientFeatureVersion,
291273
ClientImpl: info.ClientImpl,
292-
AutoResetPoints: resetPoints,
274+
AutoResetPoints: info.AutoResetPoints,
293275
Attempt: info.Attempt,
294276
HasRetryPolicy: info.HasRetryPolicy,
295277
InitialInterval: info.InitialInterval,
@@ -423,10 +405,7 @@ func (m *executionManagerImpl) SerializeWorkflowMutation(
423405
if err != nil {
424406
return nil, err
425407
}
426-
serializedVersionHistories, err := m.SerializeVersionHistories(input.VersionHistories, encoding)
427-
if err != nil {
428-
return nil, err
429-
}
408+
430409
var serializedNewBufferedEvents *serialization.DataBlob
431410
if input.NewBufferedEvents != nil {
432411
serializedNewBufferedEvents, err = m.serializer.SerializeBatchEvents(input.NewBufferedEvents, encoding)
@@ -447,7 +426,7 @@ func (m *executionManagerImpl) SerializeWorkflowMutation(
447426
return &InternalWorkflowMutation{
448427
ExecutionInfo: serializedExecutionInfo,
449428
ReplicationState: input.ReplicationState,
450-
VersionHistories: serializedVersionHistories,
429+
VersionHistories: input.VersionHistories.ToProto(),
451430
StartVersion: startVersion,
452431
LastWriteVersion: lastWriteVersion,
453432

@@ -488,10 +467,6 @@ func (m *executionManagerImpl) SerializeWorkflowSnapshot(
488467
if err != nil {
489468
return nil, err
490469
}
491-
serializedVersionHistories, err := m.SerializeVersionHistories(input.VersionHistories, encoding)
492-
if err != nil {
493-
return nil, err
494-
}
495470

496471
startVersion, err := getStartVersion(input.VersionHistories, input.ReplicationState)
497472
if err != nil {
@@ -505,7 +480,7 @@ func (m *executionManagerImpl) SerializeWorkflowSnapshot(
505480
return &InternalWorkflowSnapshot{
506481
ExecutionInfo: serializedExecutionInfo,
507482
ReplicationState: input.ReplicationState,
508-
VersionHistories: serializedVersionHistories,
483+
VersionHistories: input.VersionHistories.ToProto(),
509484
StartVersion: startVersion,
510485
LastWriteVersion: lastWriteVersion,
511486

0 commit comments

Comments
 (0)