Skip to content

Commit 5134f5d

Browse files
yux0xwduan
authored andcommitted
Use GetCloseTime for generate close event task (#4797)
<!-- Describe what has changed in this PR --> **What changed?** Use GetCloseTime for generate close event task <!-- Tell your future self why have you made these changes --> **Why?** As we store the closed time in mutable state, we can get the close time from mutable state instead of reading it from history event. <!-- How have you verified this change? Tested locally? Added a unit test? Checked in staging env? --> **How did you test it?** <!-- Assuming the worst case, what can be broken when deploying this change to production? --> **Potential risks** <!-- Is this PR a hotfix candidate or require that a notification be sent to the broader community? (Yes/No) --> **Is hotfix candidate?**
1 parent 1b86aa1 commit 5134f5d

File tree

7 files changed

+30
-39
lines changed

7 files changed

+30
-39
lines changed

service/history/workflow/mutable_state_impl.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2789,7 +2789,7 @@ func (ms *MutableStateImpl) AddCompletedWorkflowEvent(
27892789
}
27902790
// TODO merge active & passive task generation
27912791
if err := ms.taskGenerator.GenerateWorkflowCloseTasks(
2792-
event,
2792+
event.GetEventTime(),
27932793
false,
27942794
); err != nil {
27952795
return nil, err
@@ -2834,7 +2834,7 @@ func (ms *MutableStateImpl) AddFailWorkflowEvent(
28342834
}
28352835
// TODO merge active & passive task generation
28362836
if err := ms.taskGenerator.GenerateWorkflowCloseTasks(
2837-
event,
2837+
event.GetEventTime(),
28382838
false,
28392839
); err != nil {
28402840
return nil, err
@@ -2878,7 +2878,7 @@ func (ms *MutableStateImpl) AddTimeoutWorkflowEvent(
28782878
}
28792879
// TODO merge active & passive task generation
28802880
if err := ms.taskGenerator.GenerateWorkflowCloseTasks(
2881-
event,
2881+
event.GetEventTime(),
28822882
false,
28832883
); err != nil {
28842884
return nil, err
@@ -2959,7 +2959,7 @@ func (ms *MutableStateImpl) AddWorkflowExecutionCanceledEvent(
29592959
}
29602960
// TODO merge active & passive task generation
29612961
if err := ms.taskGenerator.GenerateWorkflowCloseTasks(
2962-
event,
2962+
event.GetEventTime(),
29632963
false,
29642964
); err != nil {
29652965
return nil, err
@@ -3509,7 +3509,7 @@ func (ms *MutableStateImpl) AddWorkflowExecutionTerminatedEvent(
35093509
}
35103510
// TODO merge active & passive task generation
35113511
if err := ms.taskGenerator.GenerateWorkflowCloseTasks(
3512-
event,
3512+
event.GetEventTime(),
35133513
deleteAfterTerminate,
35143514
); err != nil {
35153515
return nil, err
@@ -3752,7 +3752,7 @@ func (ms *MutableStateImpl) AddContinueAsNewEvent(
37523752
}
37533753
// TODO merge active & passive task generation
37543754
if err := ms.taskGenerator.GenerateWorkflowCloseTasks(
3755-
continueAsNewEvent,
3755+
continueAsNewEvent.GetEventTime(),
37563756
false,
37573757
); err != nil {
37583758
return nil, nil, err

service/history/workflow/mutable_state_rebuilder.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -560,7 +560,7 @@ func (b *MutableStateRebuilderImpl) applyEvents(
560560
}
561561

562562
if err := taskGenerator.GenerateWorkflowCloseTasks(
563-
event,
563+
event.GetEventTime(),
564564
false,
565565
); err != nil {
566566
return nil, err
@@ -575,7 +575,7 @@ func (b *MutableStateRebuilderImpl) applyEvents(
575575
}
576576

577577
if err := taskGenerator.GenerateWorkflowCloseTasks(
578-
event,
578+
event.GetEventTime(),
579579
false,
580580
); err != nil {
581581
return nil, err
@@ -590,7 +590,7 @@ func (b *MutableStateRebuilderImpl) applyEvents(
590590
}
591591

592592
if err := taskGenerator.GenerateWorkflowCloseTasks(
593-
event,
593+
event.GetEventTime(),
594594
false,
595595
); err != nil {
596596
return nil, err
@@ -605,7 +605,7 @@ func (b *MutableStateRebuilderImpl) applyEvents(
605605
}
606606

607607
if err := taskGenerator.GenerateWorkflowCloseTasks(
608-
event,
608+
event.GetEventTime(),
609609
false,
610610
); err != nil {
611611
return nil, err
@@ -620,7 +620,7 @@ func (b *MutableStateRebuilderImpl) applyEvents(
620620
}
621621

622622
if err := taskGenerator.GenerateWorkflowCloseTasks(
623-
event,
623+
event.GetEventTime(),
624624
false,
625625
); err != nil {
626626
return nil, err
@@ -667,7 +667,7 @@ func (b *MutableStateRebuilderImpl) applyEvents(
667667
}
668668

669669
if err := taskGenerator.GenerateWorkflowCloseTasks(
670-
event,
670+
event.GetEventTime(),
671671
false,
672672
); err != nil {
673673
return nil, err

service/history/workflow/mutable_state_rebuilder_test.go

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -278,7 +278,7 @@ func (s *stateBuilderSuite) TestApplyEvents_EventTypeWorkflowExecutionTimedOut()
278278
s.mockMutableState.EXPECT().ReplicateWorkflowExecutionTimedoutEvent(event.GetEventId(), event).Return(nil)
279279
s.mockUpdateVersion(event)
280280
s.mockTaskGenerator.EXPECT().GenerateWorkflowCloseTasks(
281-
event,
281+
&now,
282282
false,
283283
).Return(nil)
284284
s.mockMutableState.EXPECT().ClearStickyTaskQueue()
@@ -310,7 +310,7 @@ func (s *stateBuilderSuite) TestApplyEvents_EventTypeWorkflowExecutionTerminated
310310
s.mockMutableState.EXPECT().ReplicateWorkflowExecutionTerminatedEvent(event.GetEventId(), event).Return(nil)
311311
s.mockUpdateVersion(event)
312312
s.mockTaskGenerator.EXPECT().GenerateWorkflowCloseTasks(
313-
event,
313+
&now,
314314
false,
315315
).Return(nil)
316316
s.mockMutableState.EXPECT().ClearStickyTaskQueue()
@@ -341,7 +341,7 @@ func (s *stateBuilderSuite) TestApplyEvents_EventTypeWorkflowExecutionFailed() {
341341
s.mockMutableState.EXPECT().ReplicateWorkflowExecutionFailedEvent(event.GetEventId(), event).Return(nil)
342342
s.mockUpdateVersion(event)
343343
s.mockTaskGenerator.EXPECT().GenerateWorkflowCloseTasks(
344-
event,
344+
&now,
345345
false,
346346
).Return(nil)
347347
s.mockMutableState.EXPECT().ClearStickyTaskQueue()
@@ -373,7 +373,7 @@ func (s *stateBuilderSuite) TestApplyEvents_EventTypeWorkflowExecutionCompleted(
373373
s.mockMutableState.EXPECT().ReplicateWorkflowExecutionCompletedEvent(event.GetEventId(), event).Return(nil)
374374
s.mockUpdateVersion(event)
375375
s.mockTaskGenerator.EXPECT().GenerateWorkflowCloseTasks(
376-
event,
376+
&now,
377377
false,
378378
).Return(nil)
379379
s.mockMutableState.EXPECT().ClearStickyTaskQueue()
@@ -405,7 +405,7 @@ func (s *stateBuilderSuite) TestApplyEvents_EventTypeWorkflowExecutionCanceled()
405405
s.mockMutableState.EXPECT().ReplicateWorkflowExecutionCanceledEvent(event.GetEventId(), event).Return(nil)
406406
s.mockUpdateVersion(event)
407407
s.mockTaskGenerator.EXPECT().GenerateWorkflowCloseTasks(
408-
event,
408+
&now,
409409
false,
410410
).Return(nil)
411411
s.mockMutableState.EXPECT().ClearStickyTaskQueue()
@@ -503,7 +503,7 @@ func (s *stateBuilderSuite) TestApplyEvents_EventTypeWorkflowExecutionContinuedA
503503
s.mockMutableState.EXPECT().GetNamespaceEntry().Return(tests.GlobalNamespaceEntry).AnyTimes()
504504
s.mockUpdateVersion(continueAsNewEvent)
505505
s.mockTaskGenerator.EXPECT().GenerateWorkflowCloseTasks(
506-
continueAsNewEvent,
506+
&now,
507507
false,
508508
).Return(nil)
509509
s.mockMutableState.EXPECT().ClearStickyTaskQueue()
@@ -561,7 +561,7 @@ func (s *stateBuilderSuite) TestApplyEvents_EventTypeWorkflowExecutionContinuedA
561561
s.mockMutableState.EXPECT().GetNamespaceEntry().Return(tests.GlobalNamespaceEntry).AnyTimes()
562562
s.mockUpdateVersion(continueAsNewEvent)
563563
s.mockTaskGenerator.EXPECT().GenerateWorkflowCloseTasks(
564-
continueAsNewEvent,
564+
&now,
565565
false,
566566
).Return(nil)
567567
s.mockMutableState.EXPECT().ClearStickyTaskQueue()

service/history/workflow/task_generator.go

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -50,10 +50,7 @@ type (
5050
startEvent *historypb.HistoryEvent,
5151
) error
5252
GenerateWorkflowCloseTasks(
53-
// TODO: remove closeEvent parameter
54-
// when deprecating the backward compatible logic
55-
// for getting close time from close event.
56-
closeEvent *historypb.HistoryEvent,
53+
closedTime *time.Time,
5754
deleteAfterClose bool,
5855
) error
5956
// GenerateDeleteHistoryEventTask adds a tasks.DeleteHistoryEventTask to the mutable state.
@@ -154,7 +151,7 @@ func (r *TaskGeneratorImpl) GenerateWorkflowStartTasks(
154151
}
155152

156153
func (r *TaskGeneratorImpl) GenerateWorkflowCloseTasks(
157-
closeEvent *historypb.HistoryEvent,
154+
closedTime *time.Time,
158155
deleteAfterClose bool,
159156
) error {
160157

@@ -206,7 +203,7 @@ func (r *TaskGeneratorImpl) GenerateWorkflowCloseTasks(
206203
delay = retention
207204
}
208205
// archiveTime is the time when the archival queue recognizes the ArchiveExecutionTask as ready-to-process
209-
archiveTime := closeEvent.GetEventTime().Add(delay)
206+
archiveTime := timestamp.TimeValue(closedTime).Add(delay)
210207

211208
// This flag is only untrue for old server versions which were using the archival workflow instead of the
212209
// archival queue.
@@ -219,7 +216,7 @@ func (r *TaskGeneratorImpl) GenerateWorkflowCloseTasks(
219216
}
220217
closeTasks = append(closeTasks, task)
221218
} else {
222-
closeTime := timestamp.TimeValue(closeEvent.GetEventTime())
219+
closeTime := timestamp.TimeValue(closedTime)
223220
if err := r.GenerateDeleteHistoryEventTask(closeTime, false); err != nil {
224221
return err
225222
}

service/history/workflow/task_generator_mock.go

Lines changed: 4 additions & 4 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

service/history/workflow/task_generator_test.go

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,6 @@ import (
5656
"github.com/stretchr/testify/assert"
5757
"github.com/stretchr/testify/require"
5858
"go.temporal.io/api/enums/v1"
59-
historypb "go.temporal.io/api/history/v1"
6059

6160
"go.temporal.io/server/api/persistence/v1"
6261
"go.temporal.io/server/common/archiver"
@@ -277,12 +276,7 @@ func TestTaskGeneratorImpl_GenerateWorkflowCloseTasks(t *testing.T) {
277276
}).AnyTimes()
278277

279278
taskGenerator := NewTaskGenerator(namespaceRegistry, mutableState, cfg, archivalMetadata)
280-
err := taskGenerator.GenerateWorkflowCloseTasks(&historypb.HistoryEvent{
281-
Attributes: &historypb.HistoryEvent_WorkflowExecutionCompletedEventAttributes{
282-
WorkflowExecutionCompletedEventAttributes: &historypb.WorkflowExecutionCompletedEventAttributes{},
283-
},
284-
EventTime: timestamp.TimePtr(p.CloseEventTime),
285-
}, p.DeleteAfterClose)
279+
err := taskGenerator.GenerateWorkflowCloseTasks(timestamp.TimePtr(p.CloseEventTime), p.DeleteAfterClose)
286280
require.NoError(t, err)
287281

288282
var (

service/history/workflow/task_refresher.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -188,13 +188,13 @@ func (r *TaskRefresherImpl) refreshTasksForWorkflowClose(
188188

189189
executionState := mutableState.GetExecutionState()
190190
if executionState.Status != enumspb.WORKFLOW_EXECUTION_STATUS_RUNNING {
191-
closeEvent, err := mutableState.GetCompletionEvent(ctx)
191+
closeEventTime, err := mutableState.GetWorkflowCloseTime(ctx)
192192
if err != nil {
193193
return err
194194
}
195195

196196
return taskGenerator.GenerateWorkflowCloseTasks(
197-
closeEvent,
197+
closeEventTime,
198198
false,
199199
)
200200
}

0 commit comments

Comments
 (0)