Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
128 changes: 64 additions & 64 deletions common/archiver/filestore/visibilityArchiver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,16 +129,16 @@ func (s *visibilityArchiverSuite) TestArchive_Fail_InvalidURI() {
URI, err := archiver.NewURI("wrongscheme://")
s.NoError(err)
request := &archiverproto.ArchiveVisibilityRequest{
Namespace: testNamespace,
NamespaceId: testNamespaceID,
WorkflowId: testWorkflowID,
RunId: testRunID,
WorkflowTypeName: testWorkflowTypeName,
StartTime: timestamp.TimeNowPtrUtc(),
ExecutionTime: nil, // workflow without backoff
CloseTime: timestamp.TimeNowPtrUtc(),
Status: enumspb.WORKFLOW_EXECUTION_STATUS_FAILED,
HistoryLength: int64(101),
Namespace: testNamespace,
NamespaceId: testNamespaceID,
WorkflowId: testWorkflowID,
RunId: testRunID,
WorkflowTypeName: testWorkflowTypeName,
StartTime: timestamp.TimeNowPtrUtc(),
ExecutionTime: nil, // workflow without backoff
CloseTime: timestamp.TimeNowPtrUtc(),
Status: enumspb.WORKFLOW_EXECUTION_STATUS_FAILED,
HistoryLength: int64(101),
}
err = visibilityArchiver.Archive(context.Background(), URI, request)
s.Error(err)
Expand Down Expand Up @@ -170,16 +170,16 @@ func (s *visibilityArchiverSuite) TestArchive_Success() {
visibilityArchiver := s.newTestVisibilityArchiver()
closeTimestamp := timestamp.TimeNowPtrUtc()
request := &archiverproto.ArchiveVisibilityRequest{
NamespaceId: testNamespaceID,
Namespace: testNamespace,
WorkflowId: testWorkflowID,
RunId: testRunID,
WorkflowTypeName: testWorkflowTypeName,
StartTime: timestamp.TimePtr(closeTimestamp.Add(-time.Hour)),
ExecutionTime: nil, // workflow without backoff
CloseTime: closeTimestamp,
Status: enumspb.WORKFLOW_EXECUTION_STATUS_FAILED,
HistoryLength: int64(101),
NamespaceId: testNamespaceID,
Namespace: testNamespace,
WorkflowId: testWorkflowID,
RunId: testRunID,
WorkflowTypeName: testWorkflowTypeName,
StartTime: timestamp.TimePtr(closeTimestamp.Add(-time.Hour)),
ExecutionTime: nil, // workflow without backoff
CloseTime: closeTimestamp,
Status: enumspb.WORKFLOW_EXECUTION_STATUS_FAILED,
HistoryLength: int64(101),
Memo: &commonpb.Memo{
Fields: map[string]*commonpb.Payload{
"testFields": payload.EncodeBytes([]byte{1, 2, 3}),
Expand Down Expand Up @@ -253,7 +253,7 @@ func (s *visibilityArchiverSuite) TestMatchQuery() {
runID: convert.StringPtr("random runID"),
},
record: &archiverproto.ArchiveVisibilityRequest{
CloseTime: timestamp.UnixOrZeroTimePtr(12345),
CloseTime: timestamp.UnixOrZeroTimePtr(12345),
WorkflowId: "random workflowID",
RunId: "random runID",
WorkflowTypeName: "random type name",
Expand All @@ -279,7 +279,7 @@ func (s *visibilityArchiverSuite) TestMatchQuery() {
status: toWorkflowExecutionStatusPtr(enumspb.WORKFLOW_EXECUTION_STATUS_CONTINUED_AS_NEW),
},
record: &archiverproto.ArchiveVisibilityRequest{
CloseTime: timestamp.UnixOrZeroTimePtr(12345),
CloseTime: timestamp.UnixOrZeroTimePtr(12345),
Status: enumspb.WORKFLOW_EXECUTION_STATUS_CONTINUED_AS_NEW,
WorkflowTypeName: "some random type name",
},
Expand Down Expand Up @@ -518,58 +518,58 @@ func (s *visibilityArchiverSuite) setupVisibilityDirectory() {
WorkflowId: testWorkflowID,
RunId: testRunID,
WorkflowTypeName: testWorkflowTypeName,
StartTime: timestamp.UnixOrZeroTimePtr(1),
CloseTime: timestamp.UnixOrZeroTimePtr(10000),
StartTime: timestamp.UnixOrZeroTimePtr(1),
CloseTime: timestamp.UnixOrZeroTimePtr(10000),
Status: enumspb.WORKFLOW_EXECUTION_STATUS_FAILED,
HistoryLength: 101,
},
{
NamespaceId: testNamespaceID,
Namespace: testNamespace,
WorkflowId: "some random workflow ID",
RunId: "some random run ID",
WorkflowTypeName: testWorkflowTypeName,
StartTime: timestamp.UnixOrZeroTimePtr(2),
ExecutionTime: nil,
CloseTime: timestamp.UnixOrZeroTimePtr(1000),
Status: enumspb.WORKFLOW_EXECUTION_STATUS_FAILED,
HistoryLength: 123,
NamespaceId: testNamespaceID,
Namespace: testNamespace,
WorkflowId: "some random workflow ID",
RunId: "some random run ID",
WorkflowTypeName: testWorkflowTypeName,
StartTime: timestamp.UnixOrZeroTimePtr(2),
ExecutionTime: nil,
CloseTime: timestamp.UnixOrZeroTimePtr(1000),
Status: enumspb.WORKFLOW_EXECUTION_STATUS_FAILED,
HistoryLength: 123,
},
{
NamespaceId: testNamespaceID,
Namespace: testNamespace,
WorkflowId: "another workflow ID",
RunId: "another run ID",
WorkflowTypeName: testWorkflowTypeName,
StartTime: timestamp.UnixOrZeroTimePtr(3),
ExecutionTime: nil,
CloseTime: timestamp.UnixOrZeroTimePtr(10),
Status: enumspb.WORKFLOW_EXECUTION_STATUS_CONTINUED_AS_NEW,
HistoryLength: 456,
NamespaceId: testNamespaceID,
Namespace: testNamespace,
WorkflowId: "another workflow ID",
RunId: "another run ID",
WorkflowTypeName: testWorkflowTypeName,
StartTime: timestamp.UnixOrZeroTimePtr(3),
ExecutionTime: nil,
CloseTime: timestamp.UnixOrZeroTimePtr(10),
Status: enumspb.WORKFLOW_EXECUTION_STATUS_CONTINUED_AS_NEW,
HistoryLength: 456,
},
{
NamespaceId: testNamespaceID,
Namespace: testNamespace,
WorkflowId: "and another workflow ID",
RunId: "and another run ID",
WorkflowTypeName: testWorkflowTypeName,
StartTime: timestamp.UnixOrZeroTimePtr(3),
ExecutionTime: nil,
CloseTime: timestamp.UnixOrZeroTimePtr(5),
Status: enumspb.WORKFLOW_EXECUTION_STATUS_FAILED,
HistoryLength: 456,
NamespaceId: testNamespaceID,
Namespace: testNamespace,
WorkflowId: "and another workflow ID",
RunId: "and another run ID",
WorkflowTypeName: testWorkflowTypeName,
StartTime: timestamp.UnixOrZeroTimePtr(3),
ExecutionTime: nil,
CloseTime: timestamp.UnixOrZeroTimePtr(5),
Status: enumspb.WORKFLOW_EXECUTION_STATUS_FAILED,
HistoryLength: 456,
},
{
NamespaceId: "some random namespace ID",
Namespace: "some random namespace name",
WorkflowId: "another workflow ID",
RunId: "another run ID",
WorkflowTypeName: testWorkflowTypeName,
StartTime: timestamp.UnixOrZeroTimePtr(3),
ExecutionTime: nil,
CloseTime: timestamp.UnixOrZeroTimePtr(10000),
Status: enumspb.WORKFLOW_EXECUTION_STATUS_CONTINUED_AS_NEW,
HistoryLength: 456,
NamespaceId: "some random namespace ID",
Namespace: "some random namespace name",
WorkflowId: "another workflow ID",
RunId: "another run ID",
WorkflowTypeName: testWorkflowTypeName,
StartTime: timestamp.UnixOrZeroTimePtr(3),
ExecutionTime: nil,
CloseTime: timestamp.UnixOrZeroTimePtr(10000),
Status: enumspb.WORKFLOW_EXECUTION_STATUS_CONTINUED_AS_NEW,
HistoryLength: 456,
},
}

Expand Down
36 changes: 18 additions & 18 deletions common/archiver/s3store/visibilityArchiver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,16 +160,16 @@ func (s *visibilityArchiverSuite) TestArchive_Fail_InvalidURI() {
URI, err := archiver.NewURI("wrongscheme://")
s.NoError(err)
request := &archiverproto.ArchiveVisibilityRequest{
Namespace: testNamespace,
NamespaceId: testNamespaceID,
WorkflowId: testWorkflowID,
RunId: testRunID,
WorkflowTypeName: testWorkflowTypeName,
StartTime: timestamp.TimeNowPtrUtc(),
ExecutionTime: nil, // workflow without backoff
CloseTime: timestamp.TimeNowPtrUtc(),
Status: enumspb.WORKFLOW_EXECUTION_STATUS_FAILED,
HistoryLength: int64(101),
Namespace: testNamespace,
NamespaceId: testNamespaceID,
WorkflowId: testWorkflowID,
RunId: testRunID,
WorkflowTypeName: testWorkflowTypeName,
StartTime: timestamp.TimeNowPtrUtc(),
ExecutionTime: nil, // workflow without backoff
CloseTime: timestamp.TimeNowPtrUtc(),
Status: enumspb.WORKFLOW_EXECUTION_STATUS_FAILED,
HistoryLength: int64(101),
}
err = visibilityArchiver.Archive(context.Background(), URI, request)
s.Error(err)
Expand Down Expand Up @@ -437,8 +437,8 @@ func (s *visibilityArchiverSuite) TestArchiveAndQueryPrecisions() {
WorkflowId: testWorkflowID,
RunId: fmt.Sprintf("%s-%d", testRunID, i),
WorkflowTypeName: testWorkflowTypeName,
StartTime: timestamp.UnixOrZeroTimePtr(testData.day*int64(time.Hour)*24 + testData.hour*int64(time.Hour) + testData.minute*int64(time.Minute) + testData.second*int64(time.Second)),
CloseTime: timestamp.UnixOrZeroTimePtr((testData.day+30)*int64(time.Hour)*24 + testData.hour*int64(time.Hour) + testData.minute*int64(time.Minute) + testData.second*int64(time.Second)),
StartTime: timestamp.UnixOrZeroTimePtr(testData.day*int64(time.Hour)*24 + testData.hour*int64(time.Hour) + testData.minute*int64(time.Minute) + testData.second*int64(time.Second)),
CloseTime: timestamp.UnixOrZeroTimePtr((testData.day+30)*int64(time.Hour)*24 + testData.hour*int64(time.Hour) + testData.minute*int64(time.Minute) + testData.second*int64(time.Second)),
Status: enumspb.WORKFLOW_EXECUTION_STATUS_FAILED,
HistoryLength: 101,
}
Expand Down Expand Up @@ -574,8 +574,8 @@ func (s *visibilityArchiverSuite) setupVisibilityDirectory() {
WorkflowId: testWorkflowID,
RunId: testRunID,
WorkflowTypeName: testWorkflowTypeName,
StartTime: timestamp.UnixOrZeroTimePtr(1),
CloseTime: timestamp.UnixOrZeroTimePtr(time.Hour.Nanoseconds()),
StartTime: timestamp.UnixOrZeroTimePtr(1),
CloseTime: timestamp.UnixOrZeroTimePtr(time.Hour.Nanoseconds()),
Status: enumspb.WORKFLOW_EXECUTION_STATUS_FAILED,
HistoryLength: 101,
},
Expand All @@ -585,8 +585,8 @@ func (s *visibilityArchiverSuite) setupVisibilityDirectory() {
WorkflowId: testWorkflowID,
RunId: testRunID + "1",
WorkflowTypeName: testWorkflowTypeName,
StartTime: timestamp.UnixOrZeroTimePtr(1),
CloseTime: timestamp.UnixOrZeroTimePtr(time.Hour.Nanoseconds() + 30*time.Minute.Nanoseconds()),
StartTime: timestamp.UnixOrZeroTimePtr(1),
CloseTime: timestamp.UnixOrZeroTimePtr(time.Hour.Nanoseconds() + 30*time.Minute.Nanoseconds()),
Status: enumspb.WORKFLOW_EXECUTION_STATUS_FAILED,
HistoryLength: 101,
},
Expand All @@ -596,8 +596,8 @@ func (s *visibilityArchiverSuite) setupVisibilityDirectory() {
WorkflowId: testWorkflowID,
RunId: testRunID + "1",
WorkflowTypeName: testWorkflowTypeName,
StartTime: timestamp.UnixOrZeroTimePtr(1),
CloseTime: timestamp.UnixOrZeroTimePtr(3 * time.Hour.Nanoseconds()),
StartTime: timestamp.UnixOrZeroTimePtr(1),
CloseTime: timestamp.UnixOrZeroTimePtr(3 * time.Hour.Nanoseconds()),
Status: enumspb.WORKFLOW_EXECUTION_STATUS_FAILED,
HistoryLength: 101,
},
Expand Down
10 changes: 10 additions & 0 deletions common/log/tag/tags.go
Original file line number Diff line number Diff line change
Expand Up @@ -854,6 +854,16 @@ func QueryLevel(s time.Time) Tag {
return newTimeTag("query-level", s)
}

// QueryLevel returns tag for query level
func MinQueryLevel(s time.Time) Tag {
return newTimeTag("min-query-level", s)
}

// QueryLevel returns tag for query level
func MaxQueryLevel(s time.Time) Tag {
return newTimeTag("max-query-level", s)
}

// TaskQueueInfo returns tag for task queue info
func TaskQueueInfo(s interface{}) Tag {
return newObjectTag("task-queue-info", s)
Expand Down
4 changes: 2 additions & 2 deletions common/persistence/dataInterfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -2077,12 +2077,12 @@ func (a *SyncActivityTask) SetVisibilityTimestamp(timestamp time.Time) {

// DBTimestampToUnixNano converts CQL timestamp to UnixNano
func DBTimestampToUnixNano(milliseconds int64) int64 {
return milliseconds * 1000 * 1000 // Milliseconds are 10⁻³, nanoseconds are 10⁻⁹, (-3) - (-9) = 6, so multiply by 10⁶
return (time.Duration(milliseconds) * time.Millisecond).Nanoseconds()
}

// UnixNanoToDBTimestamp converts UnixNano to CQL timestamp
func UnixNanoToDBTimestamp(timestamp int64) int64 {
return timestamp / (1000 * 1000) // Milliseconds are 10⁻³, nanoseconds are 10⁻⁹, (-9) - (-3) = -6, so divide by 10⁶
return time.Duration(timestamp).Milliseconds()
}

// NewHistoryBranchToken return a new branch token
Expand Down
1 change: 1 addition & 0 deletions common/persistence/historyStore.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ package persistence

import (
"fmt"

"github.com/pborman/uuid"
enumspb "go.temporal.io/api/enums/v1"
historypb "go.temporal.io/api/history/v1"
Expand Down
3 changes: 1 addition & 2 deletions common/persistence/sql/sqlplugin/mysql/execution_maps.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,13 +160,12 @@ func expandBatchInsertQuery(q string, rowCount int) string {
// into the sql query when we don't use a semicolon when we don't terminate with a `;`.
// Removing that while writing the second half of the query if it exists.
lastIdx := strings.LastIndex(q, ",")
if lastIdx == -1 || lastIdx <= valEndIdx{
if lastIdx == -1 || lastIdx <= valEndIdx {
b.WriteString(q[valEndIdx:])
} else {
b.WriteString(q[valEndIdx:lastIdx])
}


return b.String()
}

Expand Down
1 change: 0 additions & 1 deletion common/persistence/sql/sqlplugin/mysql/visibility.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ const (
`ON DUPLICATE KEY UPDATE ` +
`run_id=VALUES(run_id)`


templateCreateWorkflowExecutionClosed = `INSERT INTO executions_visibility (` +
`namespace_id, workflow_id, run_id, start_time, execution_time, workflow_type_name, close_time, status, history_length, memo, encoding) ` +
`VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) ` +
Expand Down
4 changes: 2 additions & 2 deletions service/history/mutableState.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ type (
FlushBufferedEvents() error
GetActivityByActivityID(string) (*persistenceblobs.ActivityInfo, bool)
GetActivityInfo(int64) (*persistenceblobs.ActivityInfo, bool)
GetActivityInfoWithTimerHeartbeat(int64) (*persistenceblobs.ActivityInfo, int64, bool)
GetActivityInfoWithTimerHeartbeat(scheduleEventID int64) (*persistenceblobs.ActivityInfo, time.Time, bool)
GetActivityScheduledEvent(int64) (*historypb.HistoryEvent, error)
GetChildExecutionInfo(int64) (*persistenceblobs.ChildExecutionInfo, bool)
GetChildExecutionInitiatedEvent(int64) (*historypb.HistoryEvent, error)
Expand Down Expand Up @@ -219,7 +219,7 @@ type (
SetHistoryTree(treeID string) error
SetVersionHistories(*persistence.VersionHistories) error
UpdateActivity(*persistenceblobs.ActivityInfo) error
UpdateActivityWithTimerHeartbeat(*persistenceblobs.ActivityInfo, int64) error
UpdateActivityWithTimerHeartbeat(*persistenceblobs.ActivityInfo, time.Time) error
UpdateActivityProgress(ai *persistenceblobs.ActivityInfo, request *workflowservice.RecordActivityTaskHeartbeatRequest)
UpdateWorkflowTask(*workflowTaskInfo)
UpdateReplicationStateVersion(int64, bool)
Expand Down
10 changes: 5 additions & 5 deletions service/history/mutableStateBuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ var (

type (
mutableStateBuilder struct {
pendingActivityTimerHeartbeats map[int64]int64 // Schedule Event ID -> LastHeartbeatTimeoutVisibilityInSeconds.
pendingActivityTimerHeartbeats map[int64]time.Time // Schedule Event ID -> LastHeartbeatTimeoutVisibilityInSeconds.
pendingActivityInfoIDs map[int64]*persistenceblobs.ActivityInfo // Schedule Event ID -> Activity Info.
pendingActivityIDToEventID map[string]int64 // Activity ID -> Schedule Event ID of the activity.
updateActivityInfos map[*persistenceblobs.ActivityInfo]struct{} // Modified activities from last update.
Expand Down Expand Up @@ -180,7 +180,7 @@ func newMutableStateBuilder(
) *mutableStateBuilder {
s := &mutableStateBuilder{
updateActivityInfos: make(map[*persistenceblobs.ActivityInfo]struct{}),
pendingActivityTimerHeartbeats: make(map[int64]int64),
pendingActivityTimerHeartbeats: make(map[int64]time.Time),
pendingActivityInfoIDs: make(map[int64]*persistenceblobs.ActivityInfo),
pendingActivityIDToEventID: make(map[string]int64),
deleteActivityInfos: make(map[int64]struct{}),
Expand Down Expand Up @@ -1038,7 +1038,7 @@ func (e *mutableStateBuilder) GetActivityInfo(
// GetActivityInfo gives details about an activity that is currently in progress.
func (e *mutableStateBuilder) GetActivityInfoWithTimerHeartbeat(
scheduleEventID int64,
) (*persistenceblobs.ActivityInfo, int64, bool) {
) (*persistenceblobs.ActivityInfo, time.Time, bool) {
ai, ok := e.pendingActivityInfoIDs[scheduleEventID]
timerVis, ok := e.pendingActivityTimerHeartbeats[scheduleEventID]

Expand Down Expand Up @@ -1363,15 +1363,15 @@ func (e *mutableStateBuilder) UpdateActivity(
// UpdateActivity updates an activity
func (e *mutableStateBuilder) UpdateActivityWithTimerHeartbeat(
ai *persistenceblobs.ActivityInfo,
timerTimeoutVisiblityInSeconds int64,
timerTimeoutVisibility time.Time,
) error {

err := e.UpdateActivity(ai)
if err != nil {
return err
}

e.pendingActivityTimerHeartbeats[ai.ScheduleId] = timerTimeoutVisiblityInSeconds
e.pendingActivityTimerHeartbeats[ai.ScheduleId] = timerTimeoutVisibility
return nil
}

Expand Down
Loading