Skip to content

Commit 4fc426a

Browse files
authored
Replace EncodingType with proto enum (#638)
* All but BufferedEvents. * Fix tests. * Add .String(). * Fix unit test.
1 parent 910675d commit 4fc426a

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

49 files changed

+268
-414
lines changed

common/constants.go

Lines changed: 0 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -62,25 +62,6 @@ const (
6262
WorkerServiceName = "worker"
6363
)
6464

65-
// Data encoding types
66-
const (
67-
// todo: Deprecate and use protoEncodingEnum.ToString()
68-
EncodingTypeJSON EncodingType = "json"
69-
EncodingTypeGob EncodingType = "gob"
70-
EncodingTypeUnknown EncodingType = "unknow"
71-
EncodingTypeEmpty EncodingType = ""
72-
EncodingTypeProto3 EncodingType = "proto3"
73-
)
74-
75-
func (e EncodingType) String() string {
76-
return string(e)
77-
}
78-
79-
type (
80-
// EncodingType is an enum that represents various data encoding types
81-
EncodingType string
82-
)
83-
8465
// MaxTaskTimeout is maximum task timeout allowed. 366 days in seconds
8566
const MaxTaskTimeout = MaxTaskTimeoutSeconds * time.Second
8667
const MaxTaskTimeoutSeconds = 31622400

common/persistence/cassandra/cassandraClusterMetadata.go

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,6 @@ import (
3232
"github.com/pborman/uuid"
3333
"go.temporal.io/api/serviceerror"
3434

35-
"go.temporal.io/server/common"
3635
"go.temporal.io/server/common/cassandra"
3736
"go.temporal.io/server/common/log"
3837
p "go.temporal.io/server/common/persistence"
@@ -113,7 +112,7 @@ func (m *cassandraClusterMetadata) Close() {
113112
func (m *cassandraClusterMetadata) InitializeImmutableClusterMetadata(
114113
request *p.InternalInitializeImmutableClusterMetadataRequest) (*p.InternalInitializeImmutableClusterMetadataResponse, error) {
115114
query := m.session.Query(templateInitImmutableClusterMetadata, constMetadataPartition,
116-
request.ImmutableClusterMetadata.Data, request.ImmutableClusterMetadata.Encoding)
115+
request.ImmutableClusterMetadata.Data, request.ImmutableClusterMetadata.Encoding.String())
117116

118117
previous := make(map[string]interface{})
119118
applied, err := query.MapScanCAS(previous)
@@ -158,7 +157,7 @@ func (m *cassandraClusterMetadata) convertPreviousMapToInitializeResponse(previo
158157
}
159158

160159
return &p.InternalInitializeImmutableClusterMetadataResponse{
161-
PersistedImmutableMetadata: p.NewDataBlob(imData, common.EncodingType(imDataEncoding)),
160+
PersistedImmutableMetadata: p.NewDataBlob(imData, imDataEncoding),
162161
RequestApplied: false,
163162
}, nil
164163
}
@@ -173,7 +172,7 @@ func (m *cassandraClusterMetadata) GetImmutableClusterMetadata() (*p.InternalGet
173172
}
174173

175174
return &p.InternalGetImmutableClusterMetadataResponse{
176-
ImmutableClusterMetadata: p.NewDataBlob(immutableMetadata, common.EncodingType(encoding)),
175+
ImmutableClusterMetadata: p.NewDataBlob(immutableMetadata, encoding),
177176
}, nil
178177
}
179178

common/persistence/cassandra/cassandraHistoryPersistence.go

Lines changed: 14 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -128,13 +128,13 @@ func (h *cassandraHistoryV2Persistence) AppendHistoryNodes(
128128

129129
batch := h.session.NewBatch(gocql.LoggedBatch)
130130
batch.Query(v2templateInsertTree,
131-
branchInfo.TreeId, branchInfo.BranchId, treeInfoDataBlob.Data, treeInfoDataBlob.Encoding)
131+
branchInfo.TreeId, branchInfo.BranchId, treeInfoDataBlob.Data, treeInfoDataBlob.Encoding.String())
132132
batch.Query(v2templateUpsertData,
133-
branchInfo.TreeId, branchInfo.BranchId, request.NodeID, request.TransactionID, request.Events.Data, request.Events.Encoding)
133+
branchInfo.TreeId, branchInfo.BranchId, request.NodeID, request.TransactionID, request.Events.Data, request.Events.Encoding.String())
134134
err = h.session.ExecuteBatch(batch)
135135
} else {
136136
query := h.session.Query(v2templateUpsertData,
137-
branchInfo.TreeId, branchInfo.BranchId, request.NodeID, request.TransactionID, request.Events.Data, request.Events.Encoding)
137+
branchInfo.TreeId, branchInfo.BranchId, request.NodeID, request.TransactionID, request.Events.Data, request.Events.Encoding.String())
138138
err = query.Exec()
139139
}
140140

@@ -173,11 +173,14 @@ func (h *cassandraHistoryV2Persistence) ReadHistoryBranch(
173173

174174
history := make([]*serialization.DataBlob, 0, int(request.PageSize))
175175

176-
eventBlob := &serialization.DataBlob{}
177-
nodeID := int64(0)
178-
txnID := int64(0)
179-
180-
for iter.Scan(&nodeID, &txnID, &eventBlob.Data, &eventBlob.Encoding) {
176+
for {
177+
var data []byte
178+
var encoding string
179+
nodeID := int64(0)
180+
txnID := int64(0)
181+
if !iter.Scan(&nodeID, &txnID, &data, &encoding) {
182+
break
183+
}
181184
if txnID < lastTxnID {
182185
// assuming that business logic layer is correct and transaction ID only increase
183186
// thus, valid event batch will come with increasing transaction ID
@@ -201,8 +204,8 @@ func (h *cassandraHistoryV2Persistence) ReadHistoryBranch(
201204
// NOTE: when row.nodeID > lastNodeID, we expect the one with largest txnID comes first
202205
lastTxnID = txnID
203206
lastNodeID = nodeID
207+
eventBlob := p.NewDataBlob(data, encoding)
204208
history = append(history, eventBlob)
205-
eventBlob = &serialization.DataBlob{}
206209
}
207210
}
208211

@@ -239,7 +242,7 @@ func (h *cassandraHistoryV2Persistence) ReadHistoryBranch(
239242
// \
240243
// 8[8]
241244
//
242-
//Now we want to fork a new branch B3 from B2.
245+
// Now we want to fork a new branch B3 from B2.
243246
// The only valid forking nodeIDs are 3,6 or 8.
244247
// 1 is not valid because we can't fork from first node.
245248
// 2/4/5 is NOT valid either because they are inside a batch.
@@ -318,7 +321,7 @@ func (h *cassandraHistoryV2Persistence) ForkHistoryBranch(
318321
if err != nil {
319322
return nil, serviceerror.NewInternal(fmt.Sprintf("ForkHistoryBranch - Gocql NewBranchID UUID cast failed. Error: %v", err))
320323
}
321-
query := h.session.Query(v2templateInsertTree, cqlTreeID, cqlNewBranchID, datablob.Data, datablob.Encoding)
324+
query := h.session.Query(v2templateInsertTree, cqlTreeID, cqlNewBranchID, datablob.Data, datablob.Encoding.String())
322325
err = query.Exec()
323326
if err != nil {
324327
return nil, convertCommonErrors("ForkHistoryBranch", err)

common/persistence/cassandra/cassandraMetadataPersistenceV2.go

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,6 @@ import (
3030
"github.com/gocql/gocql"
3131
"go.temporal.io/api/serviceerror"
3232

33-
"go.temporal.io/server/common"
3433
"go.temporal.io/server/common/cassandra"
3534
"go.temporal.io/server/common/log"
3635
"go.temporal.io/server/common/log/tag"
@@ -156,7 +155,7 @@ func (m *cassandraMetadataPersistenceV2) CreateNamespaceInV2Table(request *p.Int
156155
request.ID,
157156
request.Name,
158157
request.Namespace.Data,
159-
request.Namespace.Encoding,
158+
request.Namespace.Encoding.String(),
160159
metadata.NotificationVersion,
161160
request.IsGlobal,
162161
)
@@ -270,7 +269,7 @@ func (m *cassandraMetadataPersistenceV2) GetNamespace(request *p.GetNamespaceReq
270269
}
271270

272271
return &p.InternalGetNamespaceResponse{
273-
Namespace: p.NewDataBlob(detail, common.EncodingType(detailEncoding)),
272+
Namespace: p.NewDataBlob(detail, detailEncoding),
274273
IsGlobal: isGlobalNamespace,
275274
NotificationVersion: notificationVersion,
276275
}, nil
@@ -307,7 +306,7 @@ func (m *cassandraMetadataPersistenceV2) ListNamespaces(request *p.ListNamespace
307306
// do not include the metadata record
308307
if name != namespaceMetadataRecordName {
309308
response.Namespaces = append(response.Namespaces, &p.InternalGetNamespaceResponse{
310-
Namespace: p.NewDataBlob(detail, common.EncodingType(detailEncoding)),
309+
Namespace: p.NewDataBlob(detail, detailEncoding),
311310
IsGlobal: isGlobal,
312311
NotificationVersion: notificationVersion,
313312
})

common/persistence/cassandra/cassandraPersistence.go

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -774,7 +774,7 @@ func (d *cassandraPersistence) CreateShard(request *p.CreateShardRequest) error
774774
defaultVisibilityTimestamp,
775775
rowTypeShardTaskID,
776776
data.Data,
777-
data.Encoding,
777+
data.Encoding.String(),
778778
shardInfo.GetRangeId())
779779

780780
previous := make(map[string]interface{})
@@ -834,7 +834,7 @@ func (d *cassandraPersistence) UpdateShard(request *p.UpdateShardRequest) error
834834

835835
query := d.session.Query(templateUpdateShardQuery,
836836
data.Data,
837-
data.Encoding,
837+
data.Encoding.String(),
838838
shardInfo.GetRangeId(),
839839
shardInfo.GetShardId(), // Where
840840
rowTypeShard,
@@ -1398,7 +1398,7 @@ func (d *cassandraPersistence) ResetWorkflowExecution(request *p.InternalResetWo
13981398
batch.Query(templateUpdateCurrentWorkflowExecutionQuery,
13991399
newRunID,
14001400
stateDatablob.Data,
1401-
stateDatablob.Encoding,
1401+
stateDatablob.Encoding.String(),
14021402
replicationVersions.Data,
14031403
replicationVersions.Encoding.String(),
14041404
lastWriteVersion,
@@ -2240,7 +2240,7 @@ func (d *cassandraPersistence) LeaseTaskQueue(request *p.LeaseTaskQueueRequest)
22402240
taskQueueTaskID,
22412241
initialRangeID,
22422242
datablob.Data,
2243-
datablob.Encoding,
2243+
datablob.Encoding.String(),
22442244
)
22452245
} else if isThrottlingError(err) {
22462246
return nil, serviceerror.NewResourceExhausted(fmt.Sprintf("LeaseTaskQueue operation failed. TaskQueue: %v, TaskType: %v, Error: %v", request.TaskQueue, request.TaskType, err))
@@ -2277,7 +2277,7 @@ func (d *cassandraPersistence) LeaseTaskQueue(request *p.LeaseTaskQueueRequest)
22772277
query = d.session.Query(templateUpdateTaskQueueQuery,
22782278
rangeID+1,
22792279
datablob.Data,
2280-
datablob.Encoding,
2280+
datablob.Encoding.String(),
22812281
request.NamespaceID,
22822282
&request.TaskQueue,
22832283
request.TaskType,
@@ -2326,7 +2326,7 @@ func (d *cassandraPersistence) UpdateTaskQueue(request *p.UpdateTaskQueueRequest
23262326
taskQueueTaskID,
23272327
request.RangeID,
23282328
datablob.Data,
2329-
datablob.Encoding,
2329+
datablob.Encoding.String(),
23302330
stickyTaskQueueTTL,
23312331
)
23322332
err = query.Exec()
@@ -2346,7 +2346,7 @@ func (d *cassandraPersistence) UpdateTaskQueue(request *p.UpdateTaskQueueRequest
23462346
query := d.session.Query(templateUpdateTaskQueueQuery,
23472347
request.RangeID,
23482348
datablob.Data,
2349-
datablob.Encoding,
2349+
datablob.Encoding.String(),
23502350
tli.GetNamespaceId(),
23512351
&tli.Name,
23522352
tli.TaskType,
@@ -2431,7 +2431,7 @@ func (d *cassandraPersistence) CreateTasks(request *p.CreateTasksRequest) (*p.Cr
24312431
rowTypeTask,
24322432
task.GetTaskId(),
24332433
datablob.Data,
2434-
datablob.Encoding)
2434+
datablob.Encoding.String())
24352435
} else {
24362436
if ttl > maxCassandraTTL {
24372437
ttl = maxCassandraTTL
@@ -2444,7 +2444,7 @@ func (d *cassandraPersistence) CreateTasks(request *p.CreateTasksRequest) (*p.Cr
24442444
rowTypeTask,
24452445
task.GetTaskId(),
24462446
datablob.Data,
2447-
datablob.Encoding,
2447+
datablob.Encoding.String(),
24482448
ttl)
24492449
}
24502450
}
@@ -2461,7 +2461,7 @@ func (d *cassandraPersistence) CreateTasks(request *p.CreateTasksRequest) (*p.Cr
24612461
batch.Query(templateUpdateTaskQueueQuery,
24622462
request.TaskQueueInfo.RangeID,
24632463
datablob.Data,
2464-
datablob.Encoding,
2464+
datablob.Encoding.String(),
24652465
namespaceID,
24662466
taskQueue,
24672467
taskQueueType,
@@ -2702,7 +2702,7 @@ func (d *cassandraPersistence) PutReplicationTaskToDLQ(request *p.PutReplication
27022702
request.SourceClusterName,
27032703
rowTypeDLQRunID,
27042704
datablob.Data,
2705-
datablob.Encoding,
2705+
datablob.Encoding.String(),
27062706
defaultVisibilityTimestamp,
27072707
task.GetTaskId())
27082708

0 commit comments

Comments
 (0)