Skip to content

Commit 606faf5

Browse files
authored
Fixes instances where an activity heartbeat timeout would get "lost" (#771)
If, for any reason, the cached in-memory mutable state of a Workflow were to be reset (e.g. shard loading or a transient persistence error), any in-flight activities w/heartbeats for that Workflow would lose their associated heartbeat timer tasks. In other words, it could result in an activity whose heartbeat has long expired, but which the server never tries to re-schedule. The fix is to populate the pendingActivityTimerHeartbeats map when loading the mutable state from the persistence layer if we have indicated that a heartbeat task has been created. This ensures that we don't have a pending activity without an in-memory record of a heartbeat when processing timer tasks.
1 parent 1410257 commit 606faf5

File tree

3 files changed

+7
-4
lines changed

3 files changed

+7
-4
lines changed

service/history/mutableStateBuilder.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -279,6 +279,11 @@ func (e *mutableStateBuilder) Load(
279279
e.pendingActivityInfoIDs = state.ActivityInfos
280280
for _, activityInfo := range state.ActivityInfos {
281281
e.pendingActivityIDToEventID[activityInfo.ActivityId] = activityInfo.ScheduleId
282+
if (activityInfo.TimerTaskStatus & timerTaskStatusCreatedHeartbeat) > 0 {
283+
// Sets last pending timer heartbeat to year 2000.
284+
// This ensures at least one heartbeat task will be processed for the pending activity.
285+
e.pendingActivityTimerHeartbeats[activityInfo.ScheduleId] = time.Unix(946684800, 0)
286+
}
282287
}
283288
e.pendingTimerInfoIDs = state.TimerInfos
284289
for _, timerInfo := range state.TimerInfos {

service/history/timerQueueActiveTaskExecutor_test.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -824,7 +824,6 @@ func (s *timerQueueActiveTaskExecutorSuite) TestProcessActivityTimeout_Heartbeat
824824
s.True(modified)
825825
task := mutableState.insertTimerTasks[0]
826826
s.Equal(enumspb.TIMEOUT_TYPE_HEARTBEAT, task.(*persistence.ActivityTimeoutTask).TimeoutType)
827-
protoTaskTime := task.(*persistence.ActivityTimeoutTask).GetVisibilityTimestamp().Add(-time.Second)
828827
s.NoError(err)
829828
timerTask := &persistenceblobs.TimerTaskInfo{
830829
ScheduleAttempt: 1,
@@ -835,7 +834,7 @@ func (s *timerQueueActiveTaskExecutorSuite) TestProcessActivityTimeout_Heartbeat
835834
TaskId: int64(100),
836835
TaskType: enumsspb.TASK_TYPE_ACTIVITY_TIMEOUT,
837836
TimeoutType: enumspb.TIMEOUT_TYPE_HEARTBEAT,
838-
VisibilityTime: &protoTaskTime,
837+
VisibilityTime: &time.Time{},
839838
EventId: scheduledEvent.GetEventId(),
840839
}
841840

service/history/timerQueueStandbyTaskExecutor_test.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -600,7 +600,6 @@ func (s *timerQueueStandbyTaskExecutorSuite) TestProcessActivityTimeout_Heartbea
600600
s.True(modified)
601601
task := mutableState.insertTimerTasks[0]
602602
s.Equal(enumspb.TIMEOUT_TYPE_HEARTBEAT, task.(*persistence.ActivityTimeoutTask).TimeoutType)
603-
protoTaskTime := task.(*persistence.ActivityTimeoutTask).GetVisibilityTimestamp().Add(-time.Second)
604603
s.NoError(err)
605604
timerTask := &persistenceblobs.TimerTaskInfo{
606605
ScheduleAttempt: 1,
@@ -611,7 +610,7 @@ func (s *timerQueueStandbyTaskExecutorSuite) TestProcessActivityTimeout_Heartbea
611610
TaskId: int64(100),
612611
TaskType: enumsspb.TASK_TYPE_ACTIVITY_TIMEOUT,
613612
TimeoutType: enumspb.TIMEOUT_TYPE_HEARTBEAT,
614-
VisibilityTime: &protoTaskTime,
613+
VisibilityTime: &time.Time{},
615614
EventId: scheduledEvent.GetEventId(),
616615
}
617616

0 commit comments

Comments
 (0)