Skip to content

Commit 8c74aef

Browse files
authored
Remove JSON serialization option for persistence (#750)
1 parent 4be898f commit 8c74aef

File tree

11 files changed

+67
-220
lines changed

11 files changed

+67
-220
lines changed

common/persistence/dataInterfaces.go

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -567,8 +567,6 @@ type (
567567
UpdateWorkflowMutation WorkflowMutation
568568

569569
NewWorkflowSnapshot *WorkflowSnapshot
570-
571-
Encoding enumspb.EncodingType // optional binary encoding type
572570
}
573571

574572
// ConflictResolveWorkflowExecutionRequest is used to reset workflow execution state for a single run
@@ -589,8 +587,6 @@ type (
589587
// TODO deprecate this once nDC migration is completed
590588
// basically should use CurrentWorkflowMutation instead
591589
CurrentWorkflowCAS *CurrentWorkflowCAS
592-
593-
Encoding enumspb.EncodingType // optional binary encoding type
594590
}
595591

596592
// CurrentWorkflowCAS represent a compare and swap on current record
@@ -618,8 +614,6 @@ type (
618614

619615
// For new mutable state
620616
NewWorkflowSnapshot WorkflowSnapshot
621-
622-
Encoding enumspb.EncodingType // optional binary encoding type
623617
}
624618

625619
// WorkflowEvents is used as generic workflow history events transaction container
@@ -1043,8 +1037,6 @@ type (
10431037
Events []*historypb.HistoryEvent
10441038
// requested TransactionID for this write operation. For the same eventID, the node with larger TransactionID always wins
10451039
TransactionID int64
1046-
// optional binary encoding type
1047-
Encoding enumspb.EncodingType
10481040
// The shard to get history node data
10491041
ShardID *int
10501042
}

common/persistence/executionStore.go

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -195,7 +195,7 @@ func (m *executionManagerImpl) UpdateWorkflowExecution(
195195
request *UpdateWorkflowExecutionRequest,
196196
) (*UpdateWorkflowExecutionResponse, error) {
197197

198-
serializedWorkflowMutation, err := m.SerializeWorkflowMutation(&request.UpdateWorkflowMutation, request.Encoding)
198+
serializedWorkflowMutation, err := m.SerializeWorkflowMutation(&request.UpdateWorkflowMutation)
199199
if err != nil {
200200
return nil, err
201201
}
@@ -297,7 +297,7 @@ func (m *executionManagerImpl) ConflictResolveWorkflowExecution(
297297
}
298298
var serializedCurrentWorkflowMutation *InternalWorkflowMutation
299299
if request.CurrentWorkflowMutation != nil {
300-
serializedCurrentWorkflowMutation, err = m.SerializeWorkflowMutation(request.CurrentWorkflowMutation, request.Encoding)
300+
serializedCurrentWorkflowMutation, err = m.SerializeWorkflowMutation(request.CurrentWorkflowMutation)
301301
if err != nil {
302302
return err
303303
}
@@ -342,7 +342,7 @@ func (m *executionManagerImpl) ResetWorkflowExecution(
342342
}
343343
var serializedUpdateWorkflowSnapshot *InternalWorkflowMutation
344344
if request.CurrentWorkflowMutation != nil {
345-
serializedUpdateWorkflowSnapshot, err = m.SerializeWorkflowMutation(request.CurrentWorkflowMutation, request.Encoding)
345+
serializedUpdateWorkflowSnapshot, err = m.SerializeWorkflowMutation(request.CurrentWorkflowMutation)
346346
if err != nil {
347347
return err
348348
}
@@ -389,7 +389,6 @@ func (m *executionManagerImpl) CreateWorkflowExecution(
389389

390390
func (m *executionManagerImpl) SerializeWorkflowMutation(
391391
input *WorkflowMutation,
392-
encoding enumspb.EncodingType,
393392
) (*InternalWorkflowMutation, error) {
394393

395394
serializedExecutionInfo, err := m.SerializeExecutionInfo(
@@ -402,7 +401,7 @@ func (m *executionManagerImpl) SerializeWorkflowMutation(
402401

403402
var serializedNewBufferedEvents *serialization.DataBlob
404403
if len(input.NewBufferedEvents) > 0 {
405-
serializedNewBufferedEvents, err = m.serializer.SerializeBatchEvents(input.NewBufferedEvents, encoding)
404+
serializedNewBufferedEvents, err = m.serializer.SerializeBatchEvents(input.NewBufferedEvents, enumspb.ENCODING_TYPE_PROTO3)
406405
if err != nil {
407406
return nil, err
408407
}
@@ -492,13 +491,12 @@ func (m *executionManagerImpl) SerializeWorkflowSnapshot(
492491

493492
func (m *executionManagerImpl) SerializeVersionHistories(
494493
versionHistories *VersionHistories,
495-
encoding enumspb.EncodingType,
496494
) (*serialization.DataBlob, error) {
497495

498496
if versionHistories == nil {
499497
return nil, nil
500498
}
501-
return m.serializer.SerializeVersionHistories(versionHistories.ToProto(), encoding)
499+
return m.serializer.SerializeVersionHistories(versionHistories.ToProto(), enumspb.ENCODING_TYPE_PROTO3)
502500
}
503501

504502
func (m *executionManagerImpl) DeserializeVersionHistories(

common/persistence/historyStore.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -197,7 +197,7 @@ func (m *historyV2ManagerImpl) AppendHistoryNodes(
197197
}
198198

199199
// nodeID will be the first eventID
200-
blob, err := m.historySerializer.SerializeBatchEvents(request.Events, request.Encoding)
200+
blob, err := m.historySerializer.SerializeBatchEvents(request.Events, enumspb.ENCODING_TYPE_PROTO3)
201201
if err != nil {
202202
return nil, err
203203
}

common/persistence/persistence-tests/executionManagerTestForEventsV2.go

Lines changed: 13 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -125,10 +125,10 @@ func (s *ExecutionManagerSuiteForEventsV2) TestWorkflowCreation() {
125125
WorkflowRunTimeout: timestamp.DurationFromSeconds(20),
126126
DefaultWorkflowTaskTimeout: timestamp.DurationFromSeconds(13),
127127
ExecutionState: &persistenceblobs.WorkflowExecutionState{
128-
RunId: workflowExecution.GetRunId(),
129-
CreateRequestId: uuid.New(),
130-
State: enumsspb.WORKFLOW_EXECUTION_STATE_RUNNING,
131-
Status: enumspb.WORKFLOW_EXECUTION_STATUS_RUNNING,
128+
RunId: workflowExecution.GetRunId(),
129+
CreateRequestId: uuid.New(),
130+
State: enumsspb.WORKFLOW_EXECUTION_STATE_RUNNING,
131+
Status: enumspb.WORKFLOW_EXECUTION_STATUS_RUNNING,
132132
},
133133
NextEventId: 3,
134134
LastProcessedEvent: 0,
@@ -230,10 +230,10 @@ func (s *ExecutionManagerSuiteForEventsV2) TestWorkflowCreationWithVersionHistor
230230
WorkflowRunTimeout: timestamp.DurationFromSeconds(20),
231231
DefaultWorkflowTaskTimeout: timestamp.DurationFromSeconds(13),
232232
ExecutionState: &persistenceblobs.WorkflowExecutionState{
233-
RunId: workflowExecution.GetRunId(),
234-
CreateRequestId: uuid.New(),
235-
State: enumsspb.WORKFLOW_EXECUTION_STATE_RUNNING,
236-
Status: enumspb.WORKFLOW_EXECUTION_STATUS_RUNNING,
233+
RunId: workflowExecution.GetRunId(),
234+
CreateRequestId: uuid.New(),
235+
State: enumsspb.WORKFLOW_EXECUTION_STATE_RUNNING,
236+
Status: enumspb.WORKFLOW_EXECUTION_STATUS_RUNNING,
237237
},
238238
NextEventId: common.EmptyEventID,
239239
LastProcessedEvent: 0,
@@ -354,10 +354,10 @@ func (s *ExecutionManagerSuiteForEventsV2) TestContinueAsNew() {
354354
WorkflowRunTimeout: updatedInfo.WorkflowRunTimeout,
355355
DefaultWorkflowTaskTimeout: updatedInfo.DefaultWorkflowTaskTimeout,
356356
ExecutionState: &persistenceblobs.WorkflowExecutionState{
357-
RunId: newWorkflowExecution.GetRunId(),
358-
CreateRequestId: uuid.New(),
359-
State: enumsspb.WORKFLOW_EXECUTION_STATE_RUNNING,
360-
Status: enumspb.WORKFLOW_EXECUTION_STATUS_RUNNING,
357+
RunId: newWorkflowExecution.GetRunId(),
358+
CreateRequestId: uuid.New(),
359+
State: enumsspb.WORKFLOW_EXECUTION_STATE_RUNNING,
360+
Status: enumspb.WORKFLOW_EXECUTION_STATUS_RUNNING,
361361
},
362362
NextEventId: info0.NextEventId,
363363
LastProcessedEvent: common.EmptyEventID,
@@ -370,8 +370,7 @@ func (s *ExecutionManagerSuiteForEventsV2) TestContinueAsNew() {
370370
TransferTasks: nil,
371371
TimerTasks: nil,
372372
},
373-
RangeID: s.ShardInfo.GetRangeId(),
374-
Encoding: pickRandomEncoding(),
373+
RangeID: s.ShardInfo.GetRangeId(),
375374
})
376375

377376
s.NoError(err2)

common/persistence/persistence-tests/historyV2PersistenceTest.go

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,6 @@ import (
3737
log "github.com/sirupsen/logrus"
3838
"github.com/stretchr/testify/require"
3939
"github.com/stretchr/testify/suite"
40-
enumspb "go.temporal.io/api/enums/v1"
4140
historypb "go.temporal.io/api/history/v1"
4241
"go.temporal.io/api/serviceerror"
4342

@@ -811,7 +810,6 @@ func (s *HistoryV2PersistenceSuite) append(branch []byte, events []*historypb.Hi
811810
BranchToken: branch,
812811
Events: events,
813812
TransactionID: txnID,
814-
Encoding: enumspb.ENCODING_TYPE_PROTO3,
815813
ShardID: convert.IntPtr(int(s.ShardInfo.GetShardId())),
816814
})
817815
return err

common/persistence/persistence-tests/persistenceTestBase.go

Lines changed: 22 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -304,10 +304,10 @@ func (s *TestBase) CreateWorkflowExecutionWithBranchToken(namespaceID string, wo
304304
WorkflowRunTimeout: wTimeout,
305305
DefaultWorkflowTaskTimeout: workflowTaskTimeout,
306306
ExecutionState: &persistenceblobs.WorkflowExecutionState{
307-
RunId: workflowExecution.GetRunId(),
308-
CreateRequestId: uuid.New(),
309-
State: enumsspb.WORKFLOW_EXECUTION_STATE_RUNNING,
310-
Status: enumspb.WORKFLOW_EXECUTION_STATUS_RUNNING,
307+
RunId: workflowExecution.GetRunId(),
308+
CreateRequestId: uuid.New(),
309+
State: enumsspb.WORKFLOW_EXECUTION_STATE_RUNNING,
310+
Status: enumspb.WORKFLOW_EXECUTION_STATUS_RUNNING,
311311
},
312312
LastFirstEventId: common.FirstEventID,
313313
NextEventId: nextEventID,
@@ -371,14 +371,14 @@ func (s *TestBase) CreateWorkflowExecutionManyTasks(namespaceID string, workflow
371371
response, err := s.ExecutionManager.CreateWorkflowExecution(&p.CreateWorkflowExecutionRequest{
372372
NewWorkflowSnapshot: p.WorkflowSnapshot{
373373
ExecutionInfo: &p.WorkflowExecutionInfo{
374-
NamespaceId: namespaceID,
375-
WorkflowId: workflowExecution.GetWorkflowId(),
376-
TaskQueue: taskQueue,
374+
NamespaceId: namespaceID,
375+
WorkflowId: workflowExecution.GetWorkflowId(),
376+
TaskQueue: taskQueue,
377377
ExecutionState: &persistenceblobs.WorkflowExecutionState{
378378
RunId: workflowExecution.GetRunId(),
379379
CreateRequestId: uuid.New(),
380-
State: enumsspb.WORKFLOW_EXECUTION_STATE_RUNNING,
381-
Status: enumspb.WORKFLOW_EXECUTION_STATUS_RUNNING,
380+
State: enumsspb.WORKFLOW_EXECUTION_STATE_RUNNING,
381+
Status: enumspb.WORKFLOW_EXECUTION_STATUS_RUNNING,
382382
},
383383
LastFirstEventId: common.FirstEventID,
384384
NextEventId: nextEventID,
@@ -417,9 +417,9 @@ func (s *TestBase) CreateChildWorkflowExecution(namespaceID string, workflowExec
417417
WorkflowRunTimeout: wTimeout,
418418
DefaultWorkflowTaskTimeout: workflowTaskTimeout,
419419
ExecutionState: &persistenceblobs.WorkflowExecutionState{State: enumsspb.WORKFLOW_EXECUTION_STATE_CREATED,
420-
RunId: workflowExecution.GetRunId(),
421-
CreateRequestId: uuid.New(),
422-
Status: enumspb.WORKFLOW_EXECUTION_STATUS_RUNNING,
420+
RunId: workflowExecution.GetRunId(),
421+
CreateRequestId: uuid.New(),
422+
Status: enumspb.WORKFLOW_EXECUTION_STATUS_RUNNING,
423423
},
424424
LastFirstEventId: common.FirstEventID,
425425
NextEventId: nextEventID,
@@ -519,10 +519,10 @@ func (s *TestBase) ContinueAsNewExecution(updatedInfo *p.WorkflowExecutionInfo,
519519
DefaultWorkflowTaskTimeout: updatedInfo.DefaultWorkflowTaskTimeout,
520520

521521
ExecutionState: &persistenceblobs.WorkflowExecutionState{
522-
RunId: newExecution.GetRunId(),
523-
CreateRequestId: uuid.New(),
524-
State: updatedInfo.ExecutionState.State,
525-
Status: updatedInfo.ExecutionState.Status,
522+
RunId: newExecution.GetRunId(),
523+
CreateRequestId: uuid.New(),
524+
State: updatedInfo.ExecutionState.State,
525+
Status: updatedInfo.ExecutionState.Status,
526526
},
527527

528528
LastFirstEventId: common.FirstEventID,
@@ -537,8 +537,7 @@ func (s *TestBase) ContinueAsNewExecution(updatedInfo *p.WorkflowExecutionInfo,
537537
TransferTasks: nil,
538538
TimerTasks: nil,
539539
},
540-
RangeID: s.ShardInfo.GetRangeId(),
541-
Encoding: pickRandomEncoding(),
540+
RangeID: s.ShardInfo.GetRangeId(),
542541
}
543542
req.UpdateWorkflowMutation.ExecutionInfo.ExecutionState.State = enumsspb.WORKFLOW_EXECUTION_STATE_COMPLETED
544543
req.UpdateWorkflowMutation.ExecutionInfo.ExecutionState.Status = enumspb.WORKFLOW_EXECUTION_STATUS_CONTINUED_AS_NEW
@@ -574,7 +573,6 @@ func (s *TestBase) UpdateWorkflowExecutionAndFinish(updatedInfo *p.WorkflowExecu
574573
UpsertTimerInfos: nil,
575574
DeleteTimerInfos: nil,
576575
},
577-
Encoding: pickRandomEncoding(),
578576
})
579577
return err
580578
}
@@ -736,7 +734,6 @@ func (s *TestBase) UpdateWorkflowExecutionWithReplication(updatedInfo *p.Workflo
736734
Condition: condition,
737735
Checksum: testWorkflowChecksum,
738736
},
739-
Encoding: pickRandomEncoding(),
740737
})
741738
return err
742739
}
@@ -752,8 +749,7 @@ func (s *TestBase) UpdateWorkflowExecutionWithTransferTasks(
752749
Condition: condition,
753750
UpsertActivityInfos: upsertActivityInfo,
754751
},
755-
RangeID: s.ShardInfo.GetRangeId(),
756-
Encoding: pickRandomEncoding(),
752+
RangeID: s.ShardInfo.GetRangeId(),
757753
})
758754
return err
759755
}
@@ -769,8 +765,7 @@ func (s *TestBase) UpdateWorkflowExecutionForChildExecutionsInitiated(
769765
Condition: condition,
770766
UpsertChildExecutionInfos: childInfos,
771767
},
772-
RangeID: s.ShardInfo.GetRangeId(),
773-
Encoding: pickRandomEncoding(),
768+
RangeID: s.ShardInfo.GetRangeId(),
774769
})
775770
return err
776771
}
@@ -787,8 +782,7 @@ func (s *TestBase) UpdateWorkflowExecutionForRequestCancel(
787782
Condition: condition,
788783
UpsertRequestCancelInfos: upsertRequestCancelInfo,
789784
},
790-
RangeID: s.ShardInfo.GetRangeId(),
791-
Encoding: pickRandomEncoding(),
785+
RangeID: s.ShardInfo.GetRangeId(),
792786
})
793787
return err
794788
}
@@ -805,8 +799,7 @@ func (s *TestBase) UpdateWorkflowExecutionForSignal(
805799
Condition: condition,
806800
UpsertSignalInfos: upsertSignalInfos,
807801
},
808-
RangeID: s.ShardInfo.GetRangeId(),
809-
Encoding: pickRandomEncoding(),
802+
RangeID: s.ShardInfo.GetRangeId(),
810803
})
811804
return err
812805
}
@@ -823,8 +816,7 @@ func (s *TestBase) UpdateWorkflowExecutionForBufferEvents(
823816
Condition: condition,
824817
ClearBufferedEvents: clearBufferedEvents,
825818
},
826-
RangeID: s.ShardInfo.GetRangeId(),
827-
Encoding: pickRandomEncoding(),
819+
RangeID: s.ShardInfo.GetRangeId(),
828820
})
829821
return err
830822
}
@@ -873,7 +865,6 @@ func (s *TestBase) UpdateAllMutableState(updatedMutableState *p.WorkflowMutableS
873865
UpsertSignalInfos: sInfos,
874866
UpsertSignalRequestedIDs: srIDs,
875867
},
876-
Encoding: pickRandomEncoding(),
877868
})
878869
return err
879870
}
@@ -902,7 +893,6 @@ func (s *TestBase) ConflictResolveWorkflowExecution(prevRunID string, prevLastWr
902893
SignalRequestedIDs: ids,
903894
Checksum: testWorkflowChecksum,
904895
},
905-
Encoding: pickRandomEncoding(),
906896
})
907897
}
908898

@@ -940,7 +930,6 @@ func (s *TestBase) ResetWorkflowExecution(condition int64, info *p.WorkflowExecu
940930
ReplicationTasks: replTasks,
941931
TimerTasks: timerTasks,
942932
},
943-
Encoding: pickRandomEncoding(),
944933
}
945934

946935
if updateCurr {
@@ -1537,7 +1526,3 @@ func GenerateRandomDBName(n int) string {
15371526
prefix.WriteString(randString(n))
15381527
return prefix.String()
15391528
}
1540-
1541-
func pickRandomEncoding() enumspb.EncodingType {
1542-
return enumspb.ENCODING_TYPE_PROTO3
1543-
}

0 commit comments

Comments
 (0)