Skip to content

Commit 503f037

Browse files
Support time.Time in MutableState#ActivityWithTimerHeartbeat methods (#666)
1 parent e8e21a8 commit 503f037

16 files changed

+117
-110
lines changed

common/archiver/filestore/visibilityArchiver_test.go

Lines changed: 64 additions & 64 deletions
Original file line numberDiff line numberDiff line change
@@ -129,16 +129,16 @@ func (s *visibilityArchiverSuite) TestArchive_Fail_InvalidURI() {
129129
URI, err := archiver.NewURI("wrongscheme://")
130130
s.NoError(err)
131131
request := &archiverproto.ArchiveVisibilityRequest{
132-
Namespace: testNamespace,
133-
NamespaceId: testNamespaceID,
134-
WorkflowId: testWorkflowID,
135-
RunId: testRunID,
136-
WorkflowTypeName: testWorkflowTypeName,
137-
StartTime: timestamp.TimeNowPtrUtc(),
138-
ExecutionTime: nil, // workflow without backoff
139-
CloseTime: timestamp.TimeNowPtrUtc(),
140-
Status: enumspb.WORKFLOW_EXECUTION_STATUS_FAILED,
141-
HistoryLength: int64(101),
132+
Namespace: testNamespace,
133+
NamespaceId: testNamespaceID,
134+
WorkflowId: testWorkflowID,
135+
RunId: testRunID,
136+
WorkflowTypeName: testWorkflowTypeName,
137+
StartTime: timestamp.TimeNowPtrUtc(),
138+
ExecutionTime: nil, // workflow without backoff
139+
CloseTime: timestamp.TimeNowPtrUtc(),
140+
Status: enumspb.WORKFLOW_EXECUTION_STATUS_FAILED,
141+
HistoryLength: int64(101),
142142
}
143143
err = visibilityArchiver.Archive(context.Background(), URI, request)
144144
s.Error(err)
@@ -170,16 +170,16 @@ func (s *visibilityArchiverSuite) TestArchive_Success() {
170170
visibilityArchiver := s.newTestVisibilityArchiver()
171171
closeTimestamp := timestamp.TimeNowPtrUtc()
172172
request := &archiverproto.ArchiveVisibilityRequest{
173-
NamespaceId: testNamespaceID,
174-
Namespace: testNamespace,
175-
WorkflowId: testWorkflowID,
176-
RunId: testRunID,
177-
WorkflowTypeName: testWorkflowTypeName,
178-
StartTime: timestamp.TimePtr(closeTimestamp.Add(-time.Hour)),
179-
ExecutionTime: nil, // workflow without backoff
180-
CloseTime: closeTimestamp,
181-
Status: enumspb.WORKFLOW_EXECUTION_STATUS_FAILED,
182-
HistoryLength: int64(101),
173+
NamespaceId: testNamespaceID,
174+
Namespace: testNamespace,
175+
WorkflowId: testWorkflowID,
176+
RunId: testRunID,
177+
WorkflowTypeName: testWorkflowTypeName,
178+
StartTime: timestamp.TimePtr(closeTimestamp.Add(-time.Hour)),
179+
ExecutionTime: nil, // workflow without backoff
180+
CloseTime: closeTimestamp,
181+
Status: enumspb.WORKFLOW_EXECUTION_STATUS_FAILED,
182+
HistoryLength: int64(101),
183183
Memo: &commonpb.Memo{
184184
Fields: map[string]*commonpb.Payload{
185185
"testFields": payload.EncodeBytes([]byte{1, 2, 3}),
@@ -253,7 +253,7 @@ func (s *visibilityArchiverSuite) TestMatchQuery() {
253253
runID: convert.StringPtr("random runID"),
254254
},
255255
record: &archiverproto.ArchiveVisibilityRequest{
256-
CloseTime: timestamp.UnixOrZeroTimePtr(12345),
256+
CloseTime: timestamp.UnixOrZeroTimePtr(12345),
257257
WorkflowId: "random workflowID",
258258
RunId: "random runID",
259259
WorkflowTypeName: "random type name",
@@ -279,7 +279,7 @@ func (s *visibilityArchiverSuite) TestMatchQuery() {
279279
status: toWorkflowExecutionStatusPtr(enumspb.WORKFLOW_EXECUTION_STATUS_CONTINUED_AS_NEW),
280280
},
281281
record: &archiverproto.ArchiveVisibilityRequest{
282-
CloseTime: timestamp.UnixOrZeroTimePtr(12345),
282+
CloseTime: timestamp.UnixOrZeroTimePtr(12345),
283283
Status: enumspb.WORKFLOW_EXECUTION_STATUS_CONTINUED_AS_NEW,
284284
WorkflowTypeName: "some random type name",
285285
},
@@ -518,58 +518,58 @@ func (s *visibilityArchiverSuite) setupVisibilityDirectory() {
518518
WorkflowId: testWorkflowID,
519519
RunId: testRunID,
520520
WorkflowTypeName: testWorkflowTypeName,
521-
StartTime: timestamp.UnixOrZeroTimePtr(1),
522-
CloseTime: timestamp.UnixOrZeroTimePtr(10000),
521+
StartTime: timestamp.UnixOrZeroTimePtr(1),
522+
CloseTime: timestamp.UnixOrZeroTimePtr(10000),
523523
Status: enumspb.WORKFLOW_EXECUTION_STATUS_FAILED,
524524
HistoryLength: 101,
525525
},
526526
{
527-
NamespaceId: testNamespaceID,
528-
Namespace: testNamespace,
529-
WorkflowId: "some random workflow ID",
530-
RunId: "some random run ID",
531-
WorkflowTypeName: testWorkflowTypeName,
532-
StartTime: timestamp.UnixOrZeroTimePtr(2),
533-
ExecutionTime: nil,
534-
CloseTime: timestamp.UnixOrZeroTimePtr(1000),
535-
Status: enumspb.WORKFLOW_EXECUTION_STATUS_FAILED,
536-
HistoryLength: 123,
527+
NamespaceId: testNamespaceID,
528+
Namespace: testNamespace,
529+
WorkflowId: "some random workflow ID",
530+
RunId: "some random run ID",
531+
WorkflowTypeName: testWorkflowTypeName,
532+
StartTime: timestamp.UnixOrZeroTimePtr(2),
533+
ExecutionTime: nil,
534+
CloseTime: timestamp.UnixOrZeroTimePtr(1000),
535+
Status: enumspb.WORKFLOW_EXECUTION_STATUS_FAILED,
536+
HistoryLength: 123,
537537
},
538538
{
539-
NamespaceId: testNamespaceID,
540-
Namespace: testNamespace,
541-
WorkflowId: "another workflow ID",
542-
RunId: "another run ID",
543-
WorkflowTypeName: testWorkflowTypeName,
544-
StartTime: timestamp.UnixOrZeroTimePtr(3),
545-
ExecutionTime: nil,
546-
CloseTime: timestamp.UnixOrZeroTimePtr(10),
547-
Status: enumspb.WORKFLOW_EXECUTION_STATUS_CONTINUED_AS_NEW,
548-
HistoryLength: 456,
539+
NamespaceId: testNamespaceID,
540+
Namespace: testNamespace,
541+
WorkflowId: "another workflow ID",
542+
RunId: "another run ID",
543+
WorkflowTypeName: testWorkflowTypeName,
544+
StartTime: timestamp.UnixOrZeroTimePtr(3),
545+
ExecutionTime: nil,
546+
CloseTime: timestamp.UnixOrZeroTimePtr(10),
547+
Status: enumspb.WORKFLOW_EXECUTION_STATUS_CONTINUED_AS_NEW,
548+
HistoryLength: 456,
549549
},
550550
{
551-
NamespaceId: testNamespaceID,
552-
Namespace: testNamespace,
553-
WorkflowId: "and another workflow ID",
554-
RunId: "and another run ID",
555-
WorkflowTypeName: testWorkflowTypeName,
556-
StartTime: timestamp.UnixOrZeroTimePtr(3),
557-
ExecutionTime: nil,
558-
CloseTime: timestamp.UnixOrZeroTimePtr(5),
559-
Status: enumspb.WORKFLOW_EXECUTION_STATUS_FAILED,
560-
HistoryLength: 456,
551+
NamespaceId: testNamespaceID,
552+
Namespace: testNamespace,
553+
WorkflowId: "and another workflow ID",
554+
RunId: "and another run ID",
555+
WorkflowTypeName: testWorkflowTypeName,
556+
StartTime: timestamp.UnixOrZeroTimePtr(3),
557+
ExecutionTime: nil,
558+
CloseTime: timestamp.UnixOrZeroTimePtr(5),
559+
Status: enumspb.WORKFLOW_EXECUTION_STATUS_FAILED,
560+
HistoryLength: 456,
561561
},
562562
{
563-
NamespaceId: "some random namespace ID",
564-
Namespace: "some random namespace name",
565-
WorkflowId: "another workflow ID",
566-
RunId: "another run ID",
567-
WorkflowTypeName: testWorkflowTypeName,
568-
StartTime: timestamp.UnixOrZeroTimePtr(3),
569-
ExecutionTime: nil,
570-
CloseTime: timestamp.UnixOrZeroTimePtr(10000),
571-
Status: enumspb.WORKFLOW_EXECUTION_STATUS_CONTINUED_AS_NEW,
572-
HistoryLength: 456,
563+
NamespaceId: "some random namespace ID",
564+
Namespace: "some random namespace name",
565+
WorkflowId: "another workflow ID",
566+
RunId: "another run ID",
567+
WorkflowTypeName: testWorkflowTypeName,
568+
StartTime: timestamp.UnixOrZeroTimePtr(3),
569+
ExecutionTime: nil,
570+
CloseTime: timestamp.UnixOrZeroTimePtr(10000),
571+
Status: enumspb.WORKFLOW_EXECUTION_STATUS_CONTINUED_AS_NEW,
572+
HistoryLength: 456,
573573
},
574574
}
575575

common/archiver/s3store/visibilityArchiver_test.go

Lines changed: 18 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -160,16 +160,16 @@ func (s *visibilityArchiverSuite) TestArchive_Fail_InvalidURI() {
160160
URI, err := archiver.NewURI("wrongscheme://")
161161
s.NoError(err)
162162
request := &archiverproto.ArchiveVisibilityRequest{
163-
Namespace: testNamespace,
164-
NamespaceId: testNamespaceID,
165-
WorkflowId: testWorkflowID,
166-
RunId: testRunID,
167-
WorkflowTypeName: testWorkflowTypeName,
168-
StartTime: timestamp.TimeNowPtrUtc(),
169-
ExecutionTime: nil, // workflow without backoff
170-
CloseTime: timestamp.TimeNowPtrUtc(),
171-
Status: enumspb.WORKFLOW_EXECUTION_STATUS_FAILED,
172-
HistoryLength: int64(101),
163+
Namespace: testNamespace,
164+
NamespaceId: testNamespaceID,
165+
WorkflowId: testWorkflowID,
166+
RunId: testRunID,
167+
WorkflowTypeName: testWorkflowTypeName,
168+
StartTime: timestamp.TimeNowPtrUtc(),
169+
ExecutionTime: nil, // workflow without backoff
170+
CloseTime: timestamp.TimeNowPtrUtc(),
171+
Status: enumspb.WORKFLOW_EXECUTION_STATUS_FAILED,
172+
HistoryLength: int64(101),
173173
}
174174
err = visibilityArchiver.Archive(context.Background(), URI, request)
175175
s.Error(err)
@@ -437,8 +437,8 @@ func (s *visibilityArchiverSuite) TestArchiveAndQueryPrecisions() {
437437
WorkflowId: testWorkflowID,
438438
RunId: fmt.Sprintf("%s-%d", testRunID, i),
439439
WorkflowTypeName: testWorkflowTypeName,
440-
StartTime: timestamp.UnixOrZeroTimePtr(testData.day*int64(time.Hour)*24 + testData.hour*int64(time.Hour) + testData.minute*int64(time.Minute) + testData.second*int64(time.Second)),
441-
CloseTime: timestamp.UnixOrZeroTimePtr((testData.day+30)*int64(time.Hour)*24 + testData.hour*int64(time.Hour) + testData.minute*int64(time.Minute) + testData.second*int64(time.Second)),
440+
StartTime: timestamp.UnixOrZeroTimePtr(testData.day*int64(time.Hour)*24 + testData.hour*int64(time.Hour) + testData.minute*int64(time.Minute) + testData.second*int64(time.Second)),
441+
CloseTime: timestamp.UnixOrZeroTimePtr((testData.day+30)*int64(time.Hour)*24 + testData.hour*int64(time.Hour) + testData.minute*int64(time.Minute) + testData.second*int64(time.Second)),
442442
Status: enumspb.WORKFLOW_EXECUTION_STATUS_FAILED,
443443
HistoryLength: 101,
444444
}
@@ -574,8 +574,8 @@ func (s *visibilityArchiverSuite) setupVisibilityDirectory() {
574574
WorkflowId: testWorkflowID,
575575
RunId: testRunID,
576576
WorkflowTypeName: testWorkflowTypeName,
577-
StartTime: timestamp.UnixOrZeroTimePtr(1),
578-
CloseTime: timestamp.UnixOrZeroTimePtr(time.Hour.Nanoseconds()),
577+
StartTime: timestamp.UnixOrZeroTimePtr(1),
578+
CloseTime: timestamp.UnixOrZeroTimePtr(time.Hour.Nanoseconds()),
579579
Status: enumspb.WORKFLOW_EXECUTION_STATUS_FAILED,
580580
HistoryLength: 101,
581581
},
@@ -585,8 +585,8 @@ func (s *visibilityArchiverSuite) setupVisibilityDirectory() {
585585
WorkflowId: testWorkflowID,
586586
RunId: testRunID + "1",
587587
WorkflowTypeName: testWorkflowTypeName,
588-
StartTime: timestamp.UnixOrZeroTimePtr(1),
589-
CloseTime: timestamp.UnixOrZeroTimePtr(time.Hour.Nanoseconds() + 30*time.Minute.Nanoseconds()),
588+
StartTime: timestamp.UnixOrZeroTimePtr(1),
589+
CloseTime: timestamp.UnixOrZeroTimePtr(time.Hour.Nanoseconds() + 30*time.Minute.Nanoseconds()),
590590
Status: enumspb.WORKFLOW_EXECUTION_STATUS_FAILED,
591591
HistoryLength: 101,
592592
},
@@ -596,8 +596,8 @@ func (s *visibilityArchiverSuite) setupVisibilityDirectory() {
596596
WorkflowId: testWorkflowID,
597597
RunId: testRunID + "1",
598598
WorkflowTypeName: testWorkflowTypeName,
599-
StartTime: timestamp.UnixOrZeroTimePtr(1),
600-
CloseTime: timestamp.UnixOrZeroTimePtr(3 * time.Hour.Nanoseconds()),
599+
StartTime: timestamp.UnixOrZeroTimePtr(1),
600+
CloseTime: timestamp.UnixOrZeroTimePtr(3 * time.Hour.Nanoseconds()),
601601
Status: enumspb.WORKFLOW_EXECUTION_STATUS_FAILED,
602602
HistoryLength: 101,
603603
},

common/log/tag/tags.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -854,6 +854,16 @@ func QueryLevel(s time.Time) Tag {
854854
return newTimeTag("query-level", s)
855855
}
856856

857+
// QueryLevel returns tag for query level
858+
func MinQueryLevel(s time.Time) Tag {
859+
return newTimeTag("min-query-level", s)
860+
}
861+
862+
// QueryLevel returns tag for query level
863+
func MaxQueryLevel(s time.Time) Tag {
864+
return newTimeTag("max-query-level", s)
865+
}
866+
857867
// TaskQueueInfo returns tag for task queue info
858868
func TaskQueueInfo(s interface{}) Tag {
859869
return newObjectTag("task-queue-info", s)

common/persistence/dataInterfaces.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2077,12 +2077,12 @@ func (a *SyncActivityTask) SetVisibilityTimestamp(timestamp time.Time) {
20772077

20782078
// DBTimestampToUnixNano converts CQL timestamp to UnixNano
20792079
func DBTimestampToUnixNano(milliseconds int64) int64 {
2080-
return milliseconds * 1000 * 1000 // Milliseconds are 10⁻³, nanoseconds are 10⁻⁹, (-3) - (-9) = 6, so multiply by 10⁶
2080+
return (time.Duration(milliseconds) * time.Millisecond).Nanoseconds()
20812081
}
20822082

20832083
// UnixNanoToDBTimestamp converts UnixNano to CQL timestamp
20842084
func UnixNanoToDBTimestamp(timestamp int64) int64 {
2085-
return timestamp / (1000 * 1000) // Milliseconds are 10⁻³, nanoseconds are 10⁻⁹, (-9) - (-3) = -6, so divide by 10⁶
2085+
return time.Duration(timestamp).Milliseconds()
20862086
}
20872087

20882088
// NewHistoryBranchToken return a new branch token

common/persistence/historyStore.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ package persistence
2626

2727
import (
2828
"fmt"
29+
2930
"github.com/pborman/uuid"
3031
enumspb "go.temporal.io/api/enums/v1"
3132
historypb "go.temporal.io/api/history/v1"

common/persistence/sql/sqlplugin/mysql/execution_maps.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -160,13 +160,12 @@ func expandBatchInsertQuery(q string, rowCount int) string {
160160
// into the sql query when we don't use a semicolon when we don't terminate with a `;`.
161161
// Removing that while writing the second half of the query if it exists.
162162
lastIdx := strings.LastIndex(q, ",")
163-
if lastIdx == -1 || lastIdx <= valEndIdx{
163+
if lastIdx == -1 || lastIdx <= valEndIdx {
164164
b.WriteString(q[valEndIdx:])
165165
} else {
166166
b.WriteString(q[valEndIdx:lastIdx])
167167
}
168168

169-
170169
return b.String()
171170
}
172171

common/persistence/sql/sqlplugin/mysql/visibility.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,6 @@ const (
3939
`ON DUPLICATE KEY UPDATE ` +
4040
`run_id=VALUES(run_id)`
4141

42-
4342
templateCreateWorkflowExecutionClosed = `INSERT INTO executions_visibility (` +
4443
`namespace_id, workflow_id, run_id, start_time, execution_time, workflow_type_name, close_time, status, history_length, memo, encoding) ` +
4544
`VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) ` +

service/history/mutableState.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -125,7 +125,7 @@ type (
125125
FlushBufferedEvents() error
126126
GetActivityByActivityID(string) (*persistenceblobs.ActivityInfo, bool)
127127
GetActivityInfo(int64) (*persistenceblobs.ActivityInfo, bool)
128-
GetActivityInfoWithTimerHeartbeat(int64) (*persistenceblobs.ActivityInfo, int64, bool)
128+
GetActivityInfoWithTimerHeartbeat(scheduleEventID int64) (*persistenceblobs.ActivityInfo, time.Time, bool)
129129
GetActivityScheduledEvent(int64) (*historypb.HistoryEvent, error)
130130
GetChildExecutionInfo(int64) (*persistenceblobs.ChildExecutionInfo, bool)
131131
GetChildExecutionInitiatedEvent(int64) (*historypb.HistoryEvent, error)
@@ -219,7 +219,7 @@ type (
219219
SetHistoryTree(treeID string) error
220220
SetVersionHistories(*persistence.VersionHistories) error
221221
UpdateActivity(*persistenceblobs.ActivityInfo) error
222-
UpdateActivityWithTimerHeartbeat(*persistenceblobs.ActivityInfo, int64) error
222+
UpdateActivityWithTimerHeartbeat(*persistenceblobs.ActivityInfo, time.Time) error
223223
UpdateActivityProgress(ai *persistenceblobs.ActivityInfo, request *workflowservice.RecordActivityTaskHeartbeatRequest)
224224
UpdateWorkflowTask(*workflowTaskInfo)
225225
UpdateReplicationStateVersion(int64, bool)

service/history/mutableStateBuilder.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,7 @@ var (
9292

9393
type (
9494
mutableStateBuilder struct {
95-
pendingActivityTimerHeartbeats map[int64]int64 // Schedule Event ID -> LastHeartbeatTimeoutVisibilityInSeconds.
95+
pendingActivityTimerHeartbeats map[int64]time.Time // Schedule Event ID -> LastHeartbeatTimeoutVisibilityInSeconds.
9696
pendingActivityInfoIDs map[int64]*persistenceblobs.ActivityInfo // Schedule Event ID -> Activity Info.
9797
pendingActivityIDToEventID map[string]int64 // Activity ID -> Schedule Event ID of the activity.
9898
updateActivityInfos map[*persistenceblobs.ActivityInfo]struct{} // Modified activities from last update.
@@ -180,7 +180,7 @@ func newMutableStateBuilder(
180180
) *mutableStateBuilder {
181181
s := &mutableStateBuilder{
182182
updateActivityInfos: make(map[*persistenceblobs.ActivityInfo]struct{}),
183-
pendingActivityTimerHeartbeats: make(map[int64]int64),
183+
pendingActivityTimerHeartbeats: make(map[int64]time.Time),
184184
pendingActivityInfoIDs: make(map[int64]*persistenceblobs.ActivityInfo),
185185
pendingActivityIDToEventID: make(map[string]int64),
186186
deleteActivityInfos: make(map[int64]struct{}),
@@ -1038,7 +1038,7 @@ func (e *mutableStateBuilder) GetActivityInfo(
10381038
// GetActivityInfo gives details about an activity that is currently in progress.
10391039
func (e *mutableStateBuilder) GetActivityInfoWithTimerHeartbeat(
10401040
scheduleEventID int64,
1041-
) (*persistenceblobs.ActivityInfo, int64, bool) {
1041+
) (*persistenceblobs.ActivityInfo, time.Time, bool) {
10421042
ai, ok := e.pendingActivityInfoIDs[scheduleEventID]
10431043
timerVis, ok := e.pendingActivityTimerHeartbeats[scheduleEventID]
10441044

@@ -1363,15 +1363,15 @@ func (e *mutableStateBuilder) UpdateActivity(
13631363
// UpdateActivity updates an activity
13641364
func (e *mutableStateBuilder) UpdateActivityWithTimerHeartbeat(
13651365
ai *persistenceblobs.ActivityInfo,
1366-
timerTimeoutVisiblityInSeconds int64,
1366+
timerTimeoutVisibility time.Time,
13671367
) error {
13681368

13691369
err := e.UpdateActivity(ai)
13701370
if err != nil {
13711371
return err
13721372
}
13731373

1374-
e.pendingActivityTimerHeartbeats[ai.ScheduleId] = timerTimeoutVisiblityInSeconds
1374+
e.pendingActivityTimerHeartbeats[ai.ScheduleId] = timerTimeoutVisibility
13751375
return nil
13761376
}
13771377

0 commit comments

Comments
 (0)