Skip to content

Commit 0d7350d

Browse files
rodrigozhoumindaugasrukas
authored andcommitted
Get execution start/close time when using cassandra visibility store (#4549)
1 parent 0c441a7 commit 0d7350d

File tree

2 files changed

+18
-9
lines changed

2 files changed

+18
-9
lines changed

service/history/deletemanager/delete_manager.go

Lines changed: 12 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ import (
4040
"go.temporal.io/server/common/namespace"
4141
"go.temporal.io/server/common/persistence"
4242
"go.temporal.io/server/common/persistence/visibility/manager"
43+
"go.temporal.io/server/common/persistence/visibility/store/standard/cassandra"
4344
"go.temporal.io/server/common/primitives"
4445
"go.temporal.io/server/common/searchattribute"
4546
"go.temporal.io/server/service/history/configs"
@@ -210,15 +211,17 @@ func (m *DeleteManagerImpl) deleteWorkflowExecutionInternal(
210211
// TODO (alex): Remove them when cassandra standard visibility is removed.
211212
var startTime *time.Time
212213
var closeTime *time.Time
213-
// There are cases when workflow execution is closed but visibility is not updated and still open.
214-
// This happens, for example, when workflow execution is deleted right from CloseExecutionTask.
215-
// Therefore, force to delete from open visibility regardless of execution state.
216-
if forceDeleteFromOpenVisibility || ms.GetExecutionState().State != enumsspb.WORKFLOW_EXECUTION_STATE_COMPLETED {
217-
startTime = ms.GetExecutionInfo().GetStartTime()
218-
} else {
219-
closeTime, err = ms.GetWorkflowCloseTime(ctx)
220-
if err != nil {
221-
return err
214+
if m.visibilityManager.HasStoreName(cassandra.CassandraPersistenceName) {
215+
// There are cases when workflow execution is closed but visibility is not updated and still open.
216+
// This happens, for example, when workflow execution is deleted right from CloseExecutionTask.
217+
// Therefore, force to delete from open visibility regardless of execution state.
218+
if forceDeleteFromOpenVisibility || ms.GetExecutionState().State != enumsspb.WORKFLOW_EXECUTION_STATE_COMPLETED {
219+
startTime = ms.GetExecutionInfo().GetStartTime()
220+
} else {
221+
closeTime, err = ms.GetWorkflowCloseTime(ctx)
222+
if err != nil {
223+
return err
224+
}
222225
}
223226
}
224227

service/history/deletemanager/delete_manager_test.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ import (
4747
"go.temporal.io/server/common/metrics"
4848
"go.temporal.io/server/common/namespace"
4949
"go.temporal.io/server/common/persistence/visibility/manager"
50+
"go.temporal.io/server/common/persistence/visibility/store/standard/cassandra"
5051
"go.temporal.io/server/common/primitives"
5152
"go.temporal.io/server/common/searchattribute"
5253
"go.temporal.io/server/service/history/shard"
@@ -122,6 +123,7 @@ func (s *deleteManagerWorkflowSuite) TestDeleteDeletedWorkflowExecution() {
122123
RunId: tests.RunID,
123124
}
124125

126+
s.mockVisibilityManager.EXPECT().HasStoreName(cassandra.CassandraPersistenceName).Return(true)
125127
mockWeCtx := workflow.NewMockContext(s.controller)
126128
mockMutableState := workflow.NewMockMutableState(s.controller)
127129
mockMutableState.EXPECT().GetCurrentBranchToken().Return([]byte{22, 8, 78}, nil)
@@ -167,6 +169,7 @@ func (s *deleteManagerWorkflowSuite) TestDeleteDeletedWorkflowExecution_Error()
167169
RunId: tests.RunID,
168170
}
169171

172+
s.mockVisibilityManager.EXPECT().HasStoreName(cassandra.CassandraPersistenceName).Return(true)
170173
mockWeCtx := workflow.NewMockContext(s.controller)
171174
mockMutableState := workflow.NewMockMutableState(s.controller)
172175
mockMutableState.EXPECT().GetCurrentBranchToken().Return([]byte{22, 8, 78}, nil)
@@ -212,6 +215,7 @@ func (s *deleteManagerWorkflowSuite) TestDeleteWorkflowExecution_OpenWorkflow()
212215
}
213216
now := time.Now()
214217

218+
s.mockVisibilityManager.EXPECT().HasStoreName(cassandra.CassandraPersistenceName).Return(true)
215219
mockWeCtx := workflow.NewMockContext(s.controller)
216220
mockMutableState := workflow.NewMockMutableState(s.controller)
217221
closeExecutionVisibilityTaskID := int64(39)
@@ -257,6 +261,7 @@ func (s *deleteManagerWorkflowSuite) TestDeleteWorkflowExecutionRetention_Archiv
257261
RunId: tests.RunID,
258262
}
259263

264+
s.mockVisibilityManager.EXPECT().HasStoreName(cassandra.CassandraPersistenceName).Return(true)
260265
mockWeCtx := workflow.NewMockContext(s.controller)
261266
mockMutableState := workflow.NewMockMutableState(s.controller)
262267

@@ -332,6 +337,7 @@ func (s *deleteManagerWorkflowSuite) TestDeleteWorkflowExecutionRetention_Archiv
332337
RunId: tests.RunID,
333338
}
334339

340+
s.mockVisibilityManager.EXPECT().HasStoreName(cassandra.CassandraPersistenceName).Return(true)
335341
mockWeCtx := workflow.NewMockContext(s.controller)
336342
mockMutableState := workflow.NewMockMutableState(s.controller)
337343
branchToken := []byte{22, 8, 78}

0 commit comments

Comments
 (0)