Skip to content
Merged
Show file tree
Hide file tree
Changes from 19 commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
2e200cc
LastUpdatedTimestamp
shawnhathaway Sep 1, 2020
b34bf45
Move ExecutionStats to be embedded in WEI
shawnhathaway Sep 1, 2020
e2ac138
Remove Internal WEI
shawnhathaway Sep 1, 2020
1afc3a4
Use new helpers
shawnhathaway Sep 1, 2020
5f9d449
Init execution stats when history size is 0
shawnhathaway Sep 1, 2020
fe4e45f
WEI MaximalInterval -> *time.Duration
shawnhathaway Sep 1, 2020
4494383
WEI InitialInterval -> *time.Duration
shawnhathaway Sep 1, 2020
3b3fe06
WEI StickySchedToStart -> *time.Duration
shawnhathaway Sep 1, 2020
46028c4
WEI WorkflowTaskTimeout & Default -> *time.Duration
shawnhathaway Sep 1, 2020
f5ff7ce
WEI WorkflowExecutionTimeout -> *time.Duration
shawnhathaway Sep 1, 2020
f17c3dd
WEI WorkflowRunTimeout -> *time.Duration
shawnhathaway Sep 1, 2020
11592a0
WEI WorkflowStartedTimestamp -> *time.Time
shawnhathaway Sep 1, 2020
8502c61
Test Fixes
shawnhathaway Sep 1, 2020
2878992
WEI WorkflowExpirationTime -> *time.Time
shawnhathaway Sep 1, 2020
2b00214
WEI StartTime/LastUpdatedTime -> *time.Time
shawnhathaway Sep 1, 2020
8c9c877
WEI OriginalScheduledTimestamp/WorkflowScheduledTimestamp -> *time.Time
shawnhathaway Sep 1, 2020
1a04be8
Merge remote-tracking branch 'noir/master' into mutable
shawnhathaway Sep 9, 2020
7d01598
go fmt ./...
shawnhathaway Sep 9, 2020
3b8be21
Merge branch 'master' into mutable
shawnhathaway Sep 9, 2020
36c1142
Merge branch 'master' into mutable
shawnhathaway Sep 10, 2020
09b554e
Merge branch 'master' into mutable
alexshtin Sep 11, 2020
fab665f
Copy Execution Stats, verify ExecutionInfo duplicates HistorySize in …
shawnhathaway Sep 11, 2020
8946b77
Rename WEI ProtoConverters, move to protoConverter.go
shawnhathaway Sep 11, 2020
537cbe3
Add proto conversion testing for WEI
shawnhathaway Sep 11, 2020
ace015d
Merge branch 'master' into mutable
shawnhathaway Sep 11, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
736 changes: 404 additions & 332 deletions api/persistenceblobs/v1/message.pb.go

Large diffs are not rendered by default.

3 changes: 2 additions & 1 deletion common/cassandra/cassandraCluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,10 @@ package cassandra

import (
"crypto/tls"
"github.com/gocql/gocql"
"strings"

"github.com/gocql/gocql"

"go.temporal.io/server/common/service/config"
)

Expand Down
14 changes: 14 additions & 0 deletions common/log/tag/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ import (
"time"

"go.uber.org/zap"

"go.temporal.io/server/common/primitives/timestamp"
)

// Tag is the interface for logging system
Expand Down Expand Up @@ -91,12 +93,24 @@ func newDurationTag(key string, value time.Duration) Tag {
}
}

func newDurationPtrTag(key string, value *time.Duration) Tag {
return Tag{
field: zap.Duration(key, timestamp.DurationValue(value)),
}
}

func newTimeTag(key string, value time.Time) Tag {
return Tag{
field: zap.Time(key, value),
}
}

func newTimePtrTag(key string, value *time.Time) Tag {
return Tag{
field: zap.Time(key, timestamp.TimeValue(value)),
}
}

func newObjectTag(key string, value interface{}) Tag {
return Tag{
field: zap.String(key, fmt.Sprintf("%v", value)),
Expand Down
10 changes: 10 additions & 0 deletions common/log/tag/tags.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,11 @@ func WorkflowTaskTimeoutSeconds(s int64) Tag {
return newInt64("workflow-task-timeout", s)
}

// WorkflowTaskTimeoutSeconds returns tag for WorkflowTaskTimeoutSeconds
func WorkflowTaskTimeout(s *time.Duration) Tag {
return newDurationPtrTag("workflow-task-timeout", s)
}

// QueryID returns tag for QueryID
func QueryID(queryID string) Tag {
return newStringTag("query-id", queryID)
Expand Down Expand Up @@ -211,6 +216,11 @@ func WorkflowStartedID(id int64) Tag {
return newInt64("wf-started-id", id)
}

// WorkflowStartedID returns tag for WorkflowStartedTimestamp
func WorkflowStartedTimestamp(t *time.Time) Tag {
return newTimePtrTag("wf-started-timestamp", t)
}

// WorkflowInitiatedID returns tag for WorkflowInitiatedID
func WorkflowInitiatedID(id int64) Tag {
return newInt64("wf-initiated-id", id)
Expand Down
10 changes: 5 additions & 5 deletions common/persistence/cassandra/cassandraPersistenceUtil.go
Original file line number Diff line number Diff line change
Expand Up @@ -396,7 +396,7 @@ func applyWorkflowSnapshotBatchAsNew(
func createExecution(
batch *gocql.Batch,
shardID int,
executionInfo *p.InternalWorkflowExecutionInfo,
executionInfo *p.WorkflowExecutionInfo,
versionHistories *history.VersionHistories,
checksum checksum.Checksum,
cqlNowTimestampMillis int64,
Expand All @@ -415,8 +415,8 @@ func createExecution(
runID := executionInfo.RunID

// TODO we should set the start time and last update time on business logic layer
executionInfo.StartTimestamp = time.Unix(0, p.DBTimestampToUnixNano(cqlNowTimestampMillis)).UTC()
executionInfo.LastUpdateTimestamp = time.Unix(0, p.DBTimestampToUnixNano(cqlNowTimestampMillis)).UTC()
executionInfo.StartTimestamp = timestamp.UnixOrZeroTimePtr(p.DBTimestampToUnixNano(cqlNowTimestampMillis))
executionInfo.LastUpdatedTimestamp = timestamp.UnixOrZeroTimePtr(p.DBTimestampToUnixNano(cqlNowTimestampMillis))

protoExecution, protoState, err := p.InternalWorkflowExecutionInfoToProto(executionInfo, startVersion, versionHistories)
if err != nil {
Expand Down Expand Up @@ -479,7 +479,7 @@ func createExecution(
func updateExecution(
batch *gocql.Batch,
shardID int,
executionInfo *p.InternalWorkflowExecutionInfo,
executionInfo *p.WorkflowExecutionInfo,
versionHistories *history.VersionHistories,
cqlNowTimestampMillis int64,
condition int64,
Expand All @@ -499,7 +499,7 @@ func updateExecution(
runID := executionInfo.RunID

// TODO we should set the last update time on business logic layer
executionInfo.LastUpdateTimestamp = time.Unix(0, p.DBTimestampToUnixNano(cqlNowTimestampMillis)).UTC()
executionInfo.LastUpdatedTimestamp = timestamp.UnixOrZeroTimePtr(p.DBTimestampToUnixNano(cqlNowTimestampMillis))

protoExecution, protoState, err := p.InternalWorkflowExecutionInfoToProto(executionInfo, startVersion, versionHistories)
if err != nil {
Expand Down
29 changes: 15 additions & 14 deletions common/persistence/dataInterfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,32 +209,32 @@ type (
CompletionEvent *historypb.HistoryEvent
TaskQueue string
WorkflowTypeName string
WorkflowRunTimeout int64
WorkflowExecutionTimeout int64
DefaultWorkflowTaskTimeout int64
WorkflowRunTimeout *time.Duration
WorkflowExecutionTimeout *time.Duration
DefaultWorkflowTaskTimeout *time.Duration
State enumsspb.WorkflowExecutionState
Status enumspb.WorkflowExecutionStatus
LastFirstEventID int64
LastEventTaskID int64
NextEventID int64
LastProcessedEvent int64
StartTimestamp time.Time
LastUpdatedTimestamp time.Time
StartTimestamp *time.Time
LastUpdatedTimestamp *time.Time
CreateRequestID string
SignalCount int64
WorkflowTaskVersion int64
WorkflowTaskScheduleID int64
WorkflowTaskStartedID int64
WorkflowTaskRequestID string
WorkflowTaskTimeout int64
WorkflowTaskTimeout *time.Duration
WorkflowTaskAttempt int32
WorkflowTaskStartedTimestamp int64
WorkflowTaskScheduledTimestamp int64
WorkflowTaskOriginalScheduledTimestamp int64
WorkflowTaskStartedTimestamp *time.Time
WorkflowTaskScheduledTimestamp *time.Time
WorkflowTaskOriginalScheduledTimestamp *time.Time
Comment on lines +231 to +233
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think these need to be renamed to *Time not *Timestamp.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Next PR :)

CancelRequested bool
CancelRequestID string
StickyTaskQueue string
StickyScheduleToStartTimeout int64
StickyScheduleToStartTimeout *time.Duration
ClientLibraryVersion string
ClientFeatureVersion string
ClientImpl string
Expand All @@ -244,15 +244,16 @@ type (
// for retry
Attempt int32
HasRetryPolicy bool
InitialInterval int64
InitialInterval *time.Duration
BackoffCoefficient float64
MaximumInterval int64
WorkflowExpirationTime time.Time
MaximumInterval *time.Duration
WorkflowExpirationTime *time.Time
MaximumAttempts int32
NonRetryableErrorTypes []string
BranchToken []byte
// Cron
CronSchedule string
CronSchedule string
ExecutionStats *persistenceblobs.ExecutionStats
}

// ReplicationTaskInfoWrapper describes a replication task.
Expand Down
23 changes: 10 additions & 13 deletions common/persistence/executionStore.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ func (m *executionManagerImpl) GetWorkflowExecution(
}

func (m *executionManagerImpl) DeserializeExecutionInfo(
info *InternalWorkflowExecutionInfo,
info *WorkflowExecutionInfo,
) (*WorkflowExecutionInfo, *persistenceblobs.ExecutionStats, error) {
newInfo := &WorkflowExecutionInfo{
CompletionEvent: info.CompletionEvent,
Expand All @@ -132,7 +132,7 @@ func (m *executionManagerImpl) DeserializeExecutionInfo(
NextEventID: info.NextEventID,
LastProcessedEvent: info.LastProcessedEvent,
StartTimestamp: info.StartTimestamp,
LastUpdatedTimestamp: info.LastUpdateTimestamp,
LastUpdatedTimestamp: info.LastUpdatedTimestamp,
CreateRequestID: info.CreateRequestID,
SignalCount: info.SignalCount,
WorkflowTaskVersion: info.WorkflowTaskVersion,
Expand All @@ -156,7 +156,7 @@ func (m *executionManagerImpl) DeserializeExecutionInfo(
InitialInterval: info.InitialInterval,
BackoffCoefficient: info.BackoffCoefficient,
MaximumInterval: info.MaximumInterval,
WorkflowExpirationTime: info.ExpirationTime,
WorkflowExpirationTime: info.WorkflowExpirationTime,
MaximumAttempts: info.MaximumAttempts,
NonRetryableErrorTypes: info.NonRetryableErrorTypes,
BranchToken: info.BranchToken,
Expand All @@ -170,10 +170,7 @@ func (m *executionManagerImpl) DeserializeExecutionInfo(
newInfo.AutoResetPoints = &workflowpb.ResetPoints{}
}

newStats := &persistenceblobs.ExecutionStats{
HistorySize: info.HistorySize,
}
return newInfo, newStats, nil
return newInfo, info.ExecutionStats, nil
}

func (m *executionManagerImpl) DeserializeBufferedEvents(
Expand Down Expand Up @@ -230,13 +227,13 @@ func (m *executionManagerImpl) UpdateWorkflowExecution(
func (m *executionManagerImpl) SerializeExecutionInfo(
info *WorkflowExecutionInfo,
stats *persistenceblobs.ExecutionStats,
) (*InternalWorkflowExecutionInfo, error) {
) (*WorkflowExecutionInfo, error) {

if info == nil {
return &InternalWorkflowExecutionInfo{}, nil
return &WorkflowExecutionInfo{}, nil
}

return &InternalWorkflowExecutionInfo{
return &WorkflowExecutionInfo{
NamespaceID: info.NamespaceID,
WorkflowID: info.WorkflowID,
RunID: info.RunID,
Expand All @@ -259,7 +256,7 @@ func (m *executionManagerImpl) SerializeExecutionInfo(
NextEventID: info.NextEventID,
LastProcessedEvent: info.LastProcessedEvent,
StartTimestamp: info.StartTimestamp,
LastUpdateTimestamp: info.LastUpdatedTimestamp,
LastUpdatedTimestamp: info.LastUpdatedTimestamp,
CreateRequestID: info.CreateRequestID,
SignalCount: info.SignalCount,
WorkflowTaskVersion: info.WorkflowTaskVersion,
Expand All @@ -284,7 +281,7 @@ func (m *executionManagerImpl) SerializeExecutionInfo(
InitialInterval: info.InitialInterval,
BackoffCoefficient: info.BackoffCoefficient,
MaximumInterval: info.MaximumInterval,
ExpirationTime: info.WorkflowExpirationTime,
WorkflowExpirationTime: info.WorkflowExpirationTime,
MaximumAttempts: info.MaximumAttempts,
NonRetryableErrorTypes: info.NonRetryableErrorTypes,
BranchToken: info.BranchToken,
Expand All @@ -293,7 +290,7 @@ func (m *executionManagerImpl) SerializeExecutionInfo(
SearchAttributes: info.SearchAttributes,

// attributes which are not related to mutable state
HistorySize: stats.HistorySize,
ExecutionStats: stats,
}, nil
}

Expand Down
4 changes: 2 additions & 2 deletions common/persistence/operationModeValidator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -405,7 +405,7 @@ func (s *validateOperationWorkflowModeStateSuite) newTestWorkflowSnapshot(
state enumsspb.WorkflowExecutionState,
) InternalWorkflowSnapshot {
return InternalWorkflowSnapshot{
ExecutionInfo: &InternalWorkflowExecutionInfo{
ExecutionInfo: &WorkflowExecutionInfo{
State: state,
},
}
Expand All @@ -415,7 +415,7 @@ func (s *validateOperationWorkflowModeStateSuite) newTestWorkflowMutation(
state enumsspb.WorkflowExecutionState,
) InternalWorkflowMutation {
return InternalWorkflowMutation{
ExecutionInfo: &InternalWorkflowExecutionInfo{
ExecutionInfo: &WorkflowExecutionInfo{
State: state,
},
}
Expand Down
Loading