Skip to content

Commit 57347cb

Browse files
yux0yycptt
authored andcommitted
Flush buffer event when failing workflow task (#4010)
* Flush buffer event when failing workflow task
1 parent dcaa3a3 commit 57347cb

File tree

4 files changed

+117
-3
lines changed

4 files changed

+117
-3
lines changed

service/history/workflowTaskHandlerCallbacks.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -548,6 +548,9 @@ func (handler *workflowTaskHandlerCallbacksImpl) handleWorkflowTaskCompleted(
548548
newMutableState = nil
549549

550550
if wtFailedCause.workflowFailure != nil {
551+
// Flush buffer event before failing the workflow
552+
ms.FlushBufferedEvents()
553+
551554
attributes := &commandpb.FailWorkflowExecutionCommandAttributes{
552555
Failure: wtFailedCause.workflowFailure,
553556
}

tests/onebox.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -127,6 +127,8 @@ type (
127127
NumHistoryHosts int
128128
HistoryCountLimitError int
129129
HistoryCountLimitWarn int
130+
BlobSizeLimitError int
131+
BlobSizeLimitWarn int
130132
}
131133

132134
// TemporalParams contains everything needed to bootstrap Temporal
@@ -699,6 +701,12 @@ func (c *temporalImpl) overrideHistoryDynamicConfig(client *dcClient) {
699701
if c.historyConfig.HistoryCountLimitError != 0 {
700702
client.OverrideValue(dynamicconfig.HistoryCountLimitError, c.historyConfig.HistoryCountLimitError)
701703
}
704+
if c.historyConfig.BlobSizeLimitError != 0 {
705+
client.OverrideValue(dynamicconfig.BlobSizeLimitError, c.historyConfig.BlobSizeLimitError)
706+
}
707+
if c.historyConfig.BlobSizeLimitWarn != 0 {
708+
client.OverrideValue(dynamicconfig.BlobSizeLimitWarn, c.historyConfig.BlobSizeLimitWarn)
709+
}
702710

703711
// For DeleteWorkflowExecution tests
704712
client.OverrideValue(dynamicconfig.TransferProcessorUpdateAckInterval, 1*time.Second)

tests/sizelimit_test.go

Lines changed: 104 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -28,19 +28,19 @@ import (
2828
"bytes"
2929
"encoding/binary"
3030
"flag"
31+
"math/rand"
3132
"testing"
3233
"time"
3334

3435
"github.com/pborman/uuid"
3536
"github.com/stretchr/testify/require"
3637
"github.com/stretchr/testify/suite"
37-
enumspb "go.temporal.io/api/enums/v1"
38-
"go.temporal.io/api/serviceerror"
39-
4038
commandpb "go.temporal.io/api/command/v1"
4139
commonpb "go.temporal.io/api/common/v1"
40+
enumspb "go.temporal.io/api/enums/v1"
4241
filterpb "go.temporal.io/api/filter/v1"
4342
historypb "go.temporal.io/api/history/v1"
43+
"go.temporal.io/api/serviceerror"
4444
taskqueuepb "go.temporal.io/api/taskqueue/v1"
4545
"go.temporal.io/api/workflowservice/v1"
4646

@@ -243,3 +243,104 @@ SignalLoop:
243243
}
244244
s.True(isCloseCorrect)
245245
}
246+
247+
func (s *sizeLimitIntegrationSuite) TestWorkflowFailed_PayloadSizeTooLarge() {
248+
249+
id := "integration-workflow-failed-large-payload"
250+
wt := "integration-workflow-failed-large-payload-type"
251+
tl := "integration-workflow-failed-large-payload-taskqueue"
252+
identity := "worker1"
253+
254+
var largePayload []byte
255+
for i := 0; i < 101; i++ {
256+
largePayload = append(largePayload, byte(rand.Int()))
257+
}
258+
pl, err := payloads.Encode(largePayload)
259+
s.NoError(err)
260+
sigReadyToSendChan := make(chan struct{}, 1)
261+
sigSendDoneChan := make(chan struct{})
262+
wtHandler := func(execution *commonpb.WorkflowExecution, wt *commonpb.WorkflowType,
263+
previousStartedEventID, startedEventID int64, history *historypb.History) ([]*commandpb.Command, error) {
264+
select {
265+
case sigReadyToSendChan <- struct{}{}:
266+
default:
267+
}
268+
269+
select {
270+
case <-sigSendDoneChan:
271+
}
272+
return []*commandpb.Command{
273+
{
274+
CommandType: enumspb.COMMAND_TYPE_RECORD_MARKER,
275+
Attributes: &commandpb.Command_RecordMarkerCommandAttributes{
276+
RecordMarkerCommandAttributes: &commandpb.RecordMarkerCommandAttributes{
277+
MarkerName: "large-payload",
278+
Details: map[string]*commonpb.Payloads{"test": pl},
279+
},
280+
},
281+
},
282+
}, nil
283+
}
284+
poller := &TaskPoller{
285+
Engine: s.engine,
286+
Namespace: s.namespace,
287+
TaskQueue: &taskqueuepb.TaskQueue{Name: tl},
288+
Identity: identity,
289+
WorkflowTaskHandler: wtHandler,
290+
ActivityTaskHandler: nil,
291+
Logger: s.Logger,
292+
T: s.T(),
293+
}
294+
295+
request := &workflowservice.StartWorkflowExecutionRequest{
296+
RequestId: uuid.New(),
297+
Namespace: s.namespace,
298+
WorkflowId: id,
299+
WorkflowType: &commonpb.WorkflowType{Name: wt},
300+
TaskQueue: &taskqueuepb.TaskQueue{Name: tl},
301+
Input: nil,
302+
WorkflowTaskTimeout: timestamp.DurationPtr(60 * time.Second),
303+
Identity: identity,
304+
}
305+
306+
we, err := s.engine.StartWorkflowExecution(NewContext(), request)
307+
s.NoError(err)
308+
309+
go func() {
310+
_, err = poller.PollAndProcessWorkflowTask(false, false)
311+
s.Logger.Info("PollAndProcessWorkflowTask", tag.Error(err))
312+
}()
313+
314+
select {
315+
case <-sigReadyToSendChan:
316+
}
317+
318+
_, err = s.engine.SignalWorkflowExecution(NewContext(), &workflowservice.SignalWorkflowExecutionRequest{
319+
Namespace: s.namespace,
320+
WorkflowExecution: &commonpb.WorkflowExecution{WorkflowId: id, RunId: we.GetRunId()},
321+
SignalName: "signal-name",
322+
Identity: identity,
323+
RequestId: uuid.New(),
324+
})
325+
s.NoError(err)
326+
close(sigSendDoneChan)
327+
328+
verifyWorkflowFailed := false
329+
for i := 0; i < 10; i++ {
330+
lastEvent := s.getLastEvent(s.namespace, &commonpb.WorkflowExecution{WorkflowId: id, RunId: we.GetRunId()})
331+
if enumspb.EVENT_TYPE_WORKFLOW_EXECUTION_FAILED == lastEvent.GetEventType() {
332+
verifyWorkflowFailed = true
333+
}
334+
time.Sleep(time.Second)
335+
}
336+
if !verifyWorkflowFailed {
337+
s.Fail("The workflow is expected to fail but it is not.")
338+
}
339+
histories := s.getHistory(s.namespace, &commonpb.WorkflowExecution{WorkflowId: id, RunId: we.GetRunId()})
340+
for _, event := range histories {
341+
if event.GetEventType() == enumspb.EVENT_TYPE_WORKFLOW_EXECUTION_SIGNALED {
342+
return
343+
}
344+
}
345+
s.Fail("Missing signal event")
346+
}

tests/testdata/integration_sizelimit_cluster.yaml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@ historyconfig:
55
numhistoryhosts: 1
66
historycountlimiterror: 20
77
historycountlimitwarn: 10
8+
blobsizelimiterror: 100
9+
blobsizelimitwarn: 1
810
workerconfig:
911
enablearchiver: false
1012
enablereplicator: false

0 commit comments

Comments
 (0)