Commit 95261ce

yux0 authored and alexshtin committed
Support NDC raw history in message parser (#3227)
1 parent d516b40 commit 95261ce

File tree

1 file changed

tools/cli/adminKafkaCommands.go

Lines changed: 83 additions & 7 deletions
@@ -45,9 +45,11 @@ import (
    "github.com/gocql/gocql"
    "github.com/urfave/cli"
    commonpb "go.temporal.io/api/common/v1"
+    historypb "go.temporal.io/api/history/v1"
    "gopkg.in/yaml.v2"

    "go.temporal.io/server/api/adminservice/v1"
+    enumsspb "go.temporal.io/server/api/enums/v1"
    indexerspb "go.temporal.io/server/api/indexer/v1"
    replicationspb "go.temporal.io/server/api/replication/v1"
    "go.temporal.io/server/common"
@@ -59,10 +61,12 @@ import (
    "go.temporal.io/server/common/persistence/cassandra"
)

-type filterFn func(*replicationspb.ReplicationTask) bool
-type filterFnForVisibility func(*indexerspb.Message) bool
+type (
+    filterFn              func(*replicationspb.ReplicationTask) bool
+    filterFnForVisibility func(*indexerspb.Message) bool

-type kafkaMessageType int
+    kafkaMessageType int
+)

const (
    kafkaMessageTypeReplicationTask kafkaMessageType = iota
@@ -121,13 +125,14 @@ func AdminKafkaParse(c *cli.Context) {
    readerCh := make(chan []byte, chanBufferSize)
    writerCh := newWriterChannel(kafkaMessageType(c.Int(FlagMessageType)))
    doneCh := make(chan struct{})
+    serializer := persistence.NewPayloadSerializer()

    var skippedCount int32
    skipErrMode := c.Bool(FlagSkipErrorMode)

    go startReader(inputFile, readerCh)
    go startParser(readerCh, writerCh, skipErrMode, &skippedCount)
-    go startWriter(outputFile, writerCh, doneCh, &skippedCount, c)
+    go startWriter(outputFile, writerCh, doneCh, &skippedCount, serializer, c)

    <-doneCh
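The hunk above builds a single persistence.PayloadSerializer in AdminKafkaParse and hands it to the writer goroutine, so raw history blobs can be decoded where the output is produced. A minimal, self-contained sketch of that wiring pattern follows; the decoder and channel types here are hypothetical stand-ins, not the tool's real ones.

package main

import (
    "fmt"
    "sync"
)

// decoder is a hypothetical stand-in for persistence.PayloadSerializer.
type decoder interface {
    Decode(raw []byte) (string, error)
}

type passthrough struct{}

func (passthrough) Decode(raw []byte) (string, error) { return string(raw), nil }

// startWriter mirrors the shape of the real writer goroutine: the decoder is
// injected once and every message is decoded right before it is printed.
func startWriter(in <-chan []byte, d decoder, wg *sync.WaitGroup) {
    defer wg.Done()
    for msg := range in {
        out, err := d.Decode(msg) // the real code calls decodeReplicationTask(task, serializer)
        if err != nil {
            continue // the real code consults FlagSkipErrorMode before skipping
        }
        fmt.Println(out)
    }
}

func main() {
    in := make(chan []byte, 2)
    var wg sync.WaitGroup
    wg.Add(1)
    go startWriter(in, passthrough{}, &wg) // created once up front, like serializer above
    in <- []byte("task-1")
    in <- []byte("task-2")
    close(in)
    wg.Wait()
}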

@@ -242,6 +247,7 @@ func startWriter(
    writerCh *writerChannel,
    doneCh chan struct{},
    skippedCount *int32,
+    serializer persistence.PayloadSerializer,
    c *cli.Context,
) {

@@ -252,7 +258,7 @@ func startWriter(

    switch writerCh.Type {
    case kafkaMessageTypeReplicationTask:
-        writeReplicationTask(outputFile, writerCh, skippedCount, skipErrMode, headerMode, c)
+        writeReplicationTask(outputFile, writerCh, skippedCount, skipErrMode, headerMode, serializer, c)
    case kafkaMessageTypeVisibilityMsg:
        writeVisibilityMessage(outputFile, writerCh, skippedCount, skipErrMode, headerMode, c)
    }
@@ -264,10 +270,10 @@ func writeReplicationTask(
    skippedCount *int32,
    skipErrMode bool,
    headerMode bool,
+    serializer persistence.PayloadSerializer,
    c *cli.Context,
) {
    filter := buildFilterFn(c.String(FlagWorkflowID), c.String(FlagRunID))
-    encoder := codec.NewJSONPBEncoder()
Loop:
    for {
        select {
@@ -276,7 +282,7 @@ Loop:
                break Loop
            }
            if filter(task) {
-                jsonStr, err := encoder.Encode(task)
+                jsonStr, err := decodeReplicationTask(task, serializer)
                if err != nil {
                    if !skipErrMode {
                        ErrorAndExit(malformedMessage, fmt.Errorf("failed to encode into json, err: %v", err))
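For HISTORY_V2 tasks, decodeReplicationTask (added at the end of the file in the next hunk) no longer dumps the task with its opaque Events blob; it deserializes Events and NewRunEvents and writes an envelope roughly of the form

{"Task": <task with Events/NewRunEvents cleared>, "Events": [<history events>], "NewRunEvents": [<history events>]}

(an approximate shape, not output captured from the tool). The builder writes a comma after every array element, so the result is meant for human inspection rather than strict JSON parsing; every other task type is still encoded whole, now via the indented JSONPB encoder.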
@@ -912,3 +918,73 @@ func loadBrokerConfig(hostFile string, cluster string) ([]string, *tls.Config, e
    }
    return nil, nil, fmt.Errorf("failed to load broker for cluster %v", cluster)
}
+
+func decodeReplicationTask(
+    task *replicationspb.ReplicationTask,
+    serializer persistence.PayloadSerializer,
+) ([]byte, error) {
+    encoder := codec.NewJSONPBIndentEncoder(" ")
+    switch task.GetTaskType() {
+    case enumsspb.REPLICATION_TASK_TYPE_HISTORY_V2_TASK:
+        historyV2 := task.GetHistoryTaskV2Attributes()
+        events, err := serializer.DeserializeBatchEvents(
+            persistence.NewDataBlobFromProto(historyV2.Events),
+        )
+        if err != nil {
+            return nil, err
+        }
+        var newRunEvents []*historypb.HistoryEvent
+        if historyV2.GetNewRunEvents() != nil {
+            newRunEvents, err = serializer.DeserializeBatchEvents(
+                persistence.NewDataBlobFromProto(historyV2.NewRunEvents),
+            )
+            if err != nil {
+                return nil, err
+            }
+        }
+        historyV2.Events = nil
+        historyV2.NewRunEvents = nil
+
+        var buf bytes.Buffer
+        buf.WriteString("{")
+
+        buf.WriteString(`"Task":`)
+        encodedTask, err := encoder.Encode(task)
+        if err != nil {
+            buf.WriteString(fmt.Sprintf(`"%v"`, err))
+        }
+        buf.Write(encodedTask)
+        buf.WriteString(",")
+
+        buf.WriteString(`"Events":`)
+        buf.WriteString("[")
+        for _, event := range events {
+            encodedEvent, err := encoder.Encode(event)
+            if err != nil {
+                buf.WriteString(fmt.Sprintf(`"%v"`, err))
+            }
+            buf.Write(encodedEvent)
+            buf.WriteString(",")
+        }
+        buf.WriteString("]")
+        buf.WriteString(",")
+
+        buf.WriteString(`"NewRunEvents":`)
+        buf.WriteString("[")
+        for _, event := range newRunEvents {
+            encodedEvent, err := encoder.Encode(event)
+            if err != nil {
+                buf.WriteString(fmt.Sprintf(`"%v"`, err))
+            }
+            buf.Write(encodedEvent)
+            buf.WriteString(",")
+        }
+        buf.WriteString("]")
+
+        buf.WriteString("}")
+
+        return buf.Bytes(), nil
+    default:
+        return encoder.Encode(task)
+    }
+}
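A hypothetical usage sketch of the new helper, written as if it lived in the same package as adminKafkaCommands.go (its imports assumed); printDecodedTaskExample is not part of the commit, and the error path simply mirrors the caller in writeReplicationTask above.

func printDecodedTaskExample() {
    // Sketch only: an empty task has no task type set, so decodeReplicationTask
    // falls through to the default branch and returns plain indented JSONPB.
    task := &replicationspb.ReplicationTask{}
    serializer := persistence.NewPayloadSerializer()

    jsonStr, err := decodeReplicationTask(task, serializer)
    if err != nil {
        ErrorAndExit(malformedMessage, fmt.Errorf("failed to encode into json, err: %v", err))
    }
    fmt.Println(string(jsonStr))
}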
