Skip to content

Commit 7a9752b

Browse files
authored
Use proto JSON serializer for proto objects (#611)
1 parent ca1c43c commit 7a9752b

File tree

5 files changed

+55
-35
lines changed

5 files changed

+55
-35
lines changed

common/archiver/historyIterator.go

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,9 +25,12 @@
2525
package archiver
2626

2727
import (
28+
"bytes"
2829
"encoding/json"
2930
"errors"
3031

32+
"github.com/gogo/protobuf/jsonpb"
33+
"github.com/gogo/protobuf/proto"
3134
historypb "go.temporal.io/api/history/v1"
3235
"go.temporal.io/api/serviceerror"
3336

@@ -236,19 +239,27 @@ type (
236239
EstimateSize(v interface{}) (int, error)
237240
}
238241

239-
jsonSizeEstimator struct{}
242+
jsonSizeEstimator struct {
243+
marshaler jsonpb.Marshaler
244+
}
240245
)
241246

242247
func (e *jsonSizeEstimator) EstimateSize(v interface{}) (int, error) {
248+
// jsonpb must be used for proto structs.
249+
if protoMessage, ok := v.(proto.Message); ok {
250+
var buf bytes.Buffer
251+
err := e.marshaler.Marshal(&buf, protoMessage)
252+
return buf.Len(), err
253+
}
254+
243255
data, err := json.Marshal(v)
244256
if err != nil {
245257
return 0, err
246258
}
247259
return len(data), nil
248260
}
249261

250-
// NewJSONSizeEstimator returns a new SizeEstimator which uses json encoding to
251-
// estimate size
262+
// NewJSONSizeEstimator returns a new SizeEstimator which uses json encoding to estimate size
252263
func NewJSONSizeEstimator() SizeEstimator {
253264
return &jsonSizeEstimator{}
254265
}

common/archiver/historyIterator_test.go

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,12 +27,15 @@ package archiver
2727
import (
2828
"errors"
2929
"testing"
30+
"time"
3031

3132
"github.com/stretchr/testify/mock"
3233
"github.com/stretchr/testify/require"
3334
"github.com/stretchr/testify/suite"
35+
enumspb "go.temporal.io/api/enums/v1"
3436
historypb "go.temporal.io/api/history/v1"
3537
"go.temporal.io/api/serviceerror"
38+
taskqueuepb "go.temporal.io/api/taskqueue/v1"
3639

3740
archiverspb "go.temporal.io/server/api/archiver/v1"
3841
"go.temporal.io/server/common"
@@ -710,3 +713,32 @@ func (s *HistoryIteratorSuite) constructTestHistoryIterator(
710713
itr.sizeEstimator = newTestSizeEstimator()
711714
return itr
712715
}
716+
func (s *HistoryIteratorSuite) TestJSONSizeEstimator() {
717+
e := NewJSONSizeEstimator()
718+
719+
historyEvent := &historypb.HistoryEvent{
720+
EventId: 1,
721+
Timestamp: time.Now().UnixNano(),
722+
TaskId: 1,
723+
Version: 1,
724+
}
725+
historyEvent.EventType = enumspb.EVENT_TYPE_WORKFLOW_TASK_SCHEDULED
726+
historyEvent.Attributes = &historypb.HistoryEvent_WorkflowTaskScheduledEventAttributes{WorkflowTaskScheduledEventAttributes: &historypb.WorkflowTaskScheduledEventAttributes{
727+
TaskQueue: &taskqueuepb.TaskQueue{
728+
Name: "taskQueue",
729+
Kind: enumspb.TASK_QUEUE_KIND_NORMAL,
730+
},
731+
StartToCloseTimeoutSeconds: 10,
732+
Attempt: 1,
733+
}}
734+
735+
h := &historypb.History{
736+
Events: []*historypb.HistoryEvent{
737+
historyEvent,
738+
},
739+
}
740+
741+
size, err := e.EstimateSize(h)
742+
s.NoError(err)
743+
s.Equal(261, size)
744+
}

common/persistence/elasticsearch/decodeBench_test.go

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -48,16 +48,15 @@ var (
4848
"WorkflowId": "6bfbc1e5-6ce4-4e22-bbfb-e0faa9a7a604-1-2256",
4949
"WorkflowType": "TestWorkflowExecute",
5050
"Encoding" : "proto3",
51-
"TaskQueue" : "taskQueue",
52-
"Memo" : "deadbeef====="}`)
51+
"TaskQueue" : "taskQueue",
52+
"Memo" : "deadbeef====="}`)
5353
)
5454

5555
/*
5656
BenchmarkJSONDecodeToType-8 200000 9321 ns/op
5757
BenchmarkJSONDecodeToMap-8 100000 12878 ns/op
5858
*/
5959

60-
//nolint
6160
func BenchmarkJSONDecodeToType(b *testing.B) {
6261
bytes := (*json.RawMessage)(&data)
6362
for i := 0; i < b.N; i++ {
@@ -71,14 +70,14 @@ func BenchmarkJSONDecodeToType(b *testing.B) {
7170
ExecutionTime: time.Unix(0, source.ExecutionTime),
7271
Memo: p.NewDataBlob(source.Memo, common.EncodingType(source.Encoding)),
7372
TaskQueue: source.TaskQueue,
73+
CloseTime: time.Unix(0, source.CloseTime),
74+
Status: source.ExecutionStatus,
75+
HistoryLength: source.HistoryLength,
7476
}
75-
record.CloseTime = time.Unix(0, source.CloseTime)
76-
record.Status = source.ExecutionStatus
77-
record.HistoryLength = source.HistoryLength
77+
_ = record
7878
}
7979
}
8080

81-
//nolint
8281
func BenchmarkJSONDecodeToMap(b *testing.B) {
8382
for i := 0; i < b.N; i++ {
8483
var source map[string]interface{}

common/persistence/serializer.go

Lines changed: 2 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -384,6 +384,7 @@ func (t *serializerImpl) serialize(input interface{}, encodingType common.Encodi
384384
return nil, nil
385385
}
386386

387+
// This should not pass proto struct down to JSON section.
387388
if p, ok := input.(proto.Marshaler); ok {
388389
return t.serializeProto(p, encodingType)
389390
}
@@ -394,6 +395,7 @@ func (t *serializerImpl) serialize(input interface{}, encodingType common.Encodi
394395
switch encodingType {
395396
case common.EncodingTypeJSON, common.EncodingTypeUnknown, common.EncodingTypeEmpty: // For backward-compatibility
396397
encodingType = common.EncodingTypeJSON
398+
// input should never be a proto struct.
397399
data, err = json.Marshal(input)
398400
default:
399401
return nil, NewUnknownEncodingTypeError(encodingType)
@@ -406,30 +408,6 @@ func (t *serializerImpl) serialize(input interface{}, encodingType common.Encodi
406408
return NewDataBlob(data, encodingType), nil
407409
}
408410

409-
func (t *serializerImpl) deserialize(data *serialization.DataBlob, target interface{}) error {
410-
if data == nil {
411-
return nil
412-
}
413-
if len(data.Data) == 0 {
414-
return NewDeserializationError("DeserializeEvent empty data")
415-
}
416-
var err error
417-
418-
switch data.GetEncoding() {
419-
case common.EncodingTypeProto3:
420-
return NewDeserializationError(fmt.Sprintf("proto requires proto specific deserialization"))
421-
case common.EncodingTypeJSON, common.EncodingTypeUnknown, common.EncodingTypeEmpty: // For backward-compatibility
422-
err = json.Unmarshal(data.Data, target)
423-
default:
424-
return NewUnknownEncodingTypeError(data.GetEncoding())
425-
}
426-
427-
if err != nil {
428-
return NewDeserializationError(fmt.Sprintf("DeserializeBatchEvents encoding: \"%v\", error: %v", data.Encoding, err.Error()))
429-
}
430-
return nil
431-
}
432-
433411
// NewUnknownEncodingTypeError returns a new instance of encoding type error
434412
func NewUnknownEncodingTypeError(encodingType common.EncodingType) error {
435413
return &UnknownEncodingTypeError{encodingType: encodingType}

tools/cli/util.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -755,7 +755,7 @@ func processJSONInput(c *cli.Context) *commonpb.Payloads {
755755
}
756756
p, err := payloads.Encode(jsons...)
757757
if err != nil {
758-
ErrorAndExit("Unable to encode Input.", err)
758+
ErrorAndExit("Unable to encode input.", err)
759759
}
760760

761761
return p

0 commit comments

Comments
 (0)