Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 14 additions & 4 deletions api/token/v1/message.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

217 changes: 166 additions & 51 deletions chasm/lib/activity/activity.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package activity
import (
"errors"
"fmt"
"slices"
"time"

"go.temporal.io/api/activity/v1"
Expand All @@ -15,7 +16,6 @@ import (
"go.temporal.io/api/workflowservice/v1"
"go.temporal.io/server/api/historyservice/v1"
"go.temporal.io/server/api/matchingservice/v1"
taskqueuespb "go.temporal.io/server/api/taskqueue/v1"
"go.temporal.io/server/chasm"
"go.temporal.io/server/chasm/lib/activity/gen/activitypb/v1"
"go.temporal.io/server/common"
Expand All @@ -25,11 +25,11 @@ import (
)

type ActivityStore interface {
// PopulateRecordActivityTaskStartedResponse populates the response for RecordActivityTaskStarted
PopulateRecordActivityTaskStartedResponse(ctx chasm.Context, key chasm.EntityKey, response *historyservice.RecordActivityTaskStartedResponse) error
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not blocking but you will want Activity in both of these method names because when the workflow component implements these APIs it will be important to qualify.
Don't worry about this though, we are going to want to rewrite all of this eventually anyways.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Noted.

// PopulateRecordStartedResponse populates the response for HandleStarted
PopulateRecordStartedResponse(ctx chasm.Context, key chasm.EntityKey, response *historyservice.RecordActivityTaskStartedResponse) error

// RecordCompletion applies the provided function to record activity completion
RecordCompletion(ctx chasm.MutableContext, applyFn func(ctx chasm.MutableContext) error) error
// RecordCompleted applies the provided function to record activity completion
RecordCompleted(ctx chasm.MutableContext, applyFn func(ctx chasm.MutableContext) error) error
}

// Activity component represents an activity execution persistence object and can be either standalone activity or one
Expand All @@ -53,13 +53,6 @@ type Activity struct {
Store chasm.Field[ActivityStore]
}

// RecordActivityTaskStartedParams holds parameters for RecordActivityTaskStarted
type RecordActivityTaskStartedParams struct {
VersionDirective *taskqueuespb.TaskVersionDirective
WorkerIdentity string
}

// LifecycleState TODO: we need to add more lifecycle states to better categorize some activity states, particulary for terminated/canceled.
func (a *Activity) LifecycleState(_ chasm.Context) chasm.LifecycleState {
switch a.Status {
case activitypb.ACTIVITY_EXECUTION_STATUS_COMPLETED:
Expand Down Expand Up @@ -138,8 +131,9 @@ func (a *Activity) createAddActivityTaskRequest(ctx chasm.Context, namespaceID s
}, nil
}

// RecordActivityTaskStarted updates the activity on recording activity task started and populates the response.
func (a *Activity) RecordActivityTaskStarted(ctx chasm.MutableContext, params RecordActivityTaskStartedParams) (*historyservice.RecordActivityTaskStartedResponse, error) {
// HandleStarted updates the activity on recording activity task started and populates the response.
func (a *Activity) HandleStarted(ctx chasm.MutableContext, request *historyservice.RecordActivityTaskStartedRequest) (
*historyservice.RecordActivityTaskStartedResponse, error) {
if err := TransitionStarted.Apply(a, ctx, nil); err != nil {
return nil, err
}
Expand All @@ -149,10 +143,10 @@ func (a *Activity) RecordActivityTaskStarted(ctx chasm.MutableContext, params Re
return nil, err
}

attempt.LastStartedTime = timestamppb.New(ctx.Now(a))
attempt.LastWorkerIdentity = params.WorkerIdentity
attempt.StartedTime = timestamppb.New(ctx.Now(a))
attempt.LastWorkerIdentity = request.GetPollRequest().GetIdentity()

if versionDirective := params.VersionDirective.GetDeploymentVersion(); versionDirective != nil {
if versionDirective := request.GetVersionDirective().GetDeploymentVersion(); versionDirective != nil {
attempt.LastDeploymentVersion = &deploymentpb.WorkerDeploymentVersion{
BuildId: versionDirective.GetBuildId(),
DeploymentName: versionDirective.GetDeploymentName(),
Expand All @@ -166,19 +160,19 @@ func (a *Activity) RecordActivityTaskStarted(ctx chasm.MutableContext, params Re

response := &historyservice.RecordActivityTaskStartedResponse{}
if store == nil {
if err := a.PopulateRecordActivityTaskStartedResponse(ctx, ctx.ExecutionKey(), response); err != nil {
if err := a.PopulateRecordStartedResponse(ctx, ctx.ExecutionKey(), response); err != nil {
return nil, err
}
} else {
if err := store.PopulateRecordActivityTaskStartedResponse(ctx, ctx.ExecutionKey(), response); err != nil {
if err := store.PopulateRecordStartedResponse(ctx, ctx.ExecutionKey(), response); err != nil {
return nil, err
}
}

return response, nil
}

func (a *Activity) PopulateRecordActivityTaskStartedResponse(ctx chasm.Context, key chasm.EntityKey, response *historyservice.RecordActivityTaskStartedResponse) error {
func (a *Activity) PopulateRecordStartedResponse(ctx chasm.Context, key chasm.EntityKey, response *historyservice.RecordActivityTaskStartedResponse) error {
attempt, err := a.Attempt.Get(ctx)
if err != nil {
return err
Expand All @@ -194,7 +188,7 @@ func (a *Activity) PopulateRecordActivityTaskStartedResponse(ctx chasm.Context,
return err
}

response.StartedTime = attempt.LastStartedTime
response.StartedTime = attempt.StartedTime
response.Attempt = attempt.GetCount()
if lastHeartbeat != nil {
response.HeartbeatDetails = lastHeartbeat.GetDetails()
Expand All @@ -221,13 +215,89 @@ func (a *Activity) PopulateRecordActivityTaskStartedResponse(ctx chasm.Context,
return nil
}

func (a *Activity) RecordCompletion(ctx chasm.MutableContext, applyFn func(ctx chasm.MutableContext) error) error {
func (a *Activity) RecordCompleted(ctx chasm.MutableContext, applyFn func(ctx chasm.MutableContext) error) error {
return applyFn(ctx)
}

// recordFromScheduledTimeOut records schedule-to-start or schedule-to-close timeouts. Such timeouts are not retried so we
// HandleCompleted updates the activity on activity completion.
func (a *Activity) HandleCompleted(ctx chasm.MutableContext, request *historyservice.RespondActivityTaskCompletedRequest) (
*historyservice.RespondActivityTaskCompletedResponse, error) {
if err := TransitionCompleted.Apply(a, ctx, request); err != nil {
return nil, err
}

return &historyservice.RespondActivityTaskCompletedResponse{}, nil
}

// HandleFailed updates the activity on activity failure. if the activity is retryable, it will be rescheduled
// for retry instead.
func (a *Activity) HandleFailed(ctx chasm.MutableContext, req *historyservice.RespondActivityTaskFailedRequest) (
*historyservice.RespondActivityTaskFailedResponse, error) {
failure := req.GetFailedRequest().GetFailure()

shouldRetry, retryInterval, err := a.shouldRetryOnFailure(ctx, failure)
if err != nil {
return nil, err
}

if shouldRetry {
if err := TransitionRescheduled.Apply(a, ctx, rescheduleEvent{
retryInterval: retryInterval,
failure: failure,
}); err != nil {
return nil, err
}

return &historyservice.RespondActivityTaskFailedResponse{}, nil
}

// No more retries, transition to failed state
if err := TransitionFailed.Apply(a, ctx, req); err != nil {
return nil, err
}

return &historyservice.RespondActivityTaskFailedResponse{}, nil
}

// getLastHeartbeat retrieves the last heartbeat state, initializing it if not present. The heartbeat is lazily created
// to avoid unnecessary writes when heartbeats are not used.
func (a *Activity) getLastHeartbeat(ctx chasm.MutableContext) (*activitypb.ActivityHeartbeatState, error) {
heartbeat, err := a.LastHeartbeat.Get(ctx)
if err != nil {
return nil, err
}

if heartbeat == nil {
heartbeat = &activitypb.ActivityHeartbeatState{}
a.LastHeartbeat = chasm.NewDataField(ctx, heartbeat)
}

return heartbeat, nil
}

func (a *Activity) shouldRetryOnFailure(ctx chasm.Context, failure *failurepb.Failure) (bool, time.Duration, error) {
var isRetryable bool

if failure.GetApplicationFailureInfo() != nil {
appFailure := failure.GetApplicationFailureInfo()
isRetryable = !appFailure.GetNonRetryable() && !slices.Contains(
a.GetRetryPolicy().GetNonRetryableErrorTypes(),
appFailure.GetType(),
)
}

if !isRetryable {
return false, 0, nil
}

overridingRetryInterval := failure.GetApplicationFailureInfo().GetNextRetryDelay().AsDuration()

return a.shouldRetry(ctx, overridingRetryInterval)
}

// recordScheduleToStartOrCloseTimeoutFailure records schedule-to-start or schedule-to-close timeouts. Such timeouts are not retried so we
// set the outcome failure directly and leave the attempt failure as is.
func (a *Activity) recordFromScheduledTimeOut(ctx chasm.MutableContext, timeoutType enumspb.TimeoutType) error {
func (a *Activity) recordScheduleToStartOrCloseTimeoutFailure(ctx chasm.MutableContext, timeoutType enumspb.TimeoutType) error {
outcome, err := a.Outcome.Get(ctx)
if err != nil {
return err
Expand All @@ -251,25 +321,20 @@ func (a *Activity) recordFromScheduledTimeOut(ctx chasm.MutableContext, timeoutT
return nil
}

// recordStartToCloseTimedOut records start-to-close timeouts. These come from retried attempts so we update the attempt
// failure info but leave the outcome failure empty to avoid duplication
func (a *Activity) recordStartToCloseTimedOut(ctx chasm.MutableContext, retryInterval time.Duration, noRetriesLeft bool) error {
// recordFailedAttempt records any failures resulting from a tried attempt, including worker application failures and
// start-to-close timeouts. Since the calls come from retried attempts we update the attempt failure info but leave
// the outcome failure empty to avoid duplication.
func (a *Activity) recordFailedAttempt(
ctx chasm.MutableContext,
retryInterval time.Duration,
failure *failurepb.Failure,
noRetriesLeft bool,
) error {
outcome, err := a.Outcome.Get(ctx)
if err != nil {
return err
}

timeoutType := enumspb.TIMEOUT_TYPE_START_TO_CLOSE

failure := &failurepb.Failure{
Message: fmt.Sprintf(common.FailureReasonActivityTimeout, timeoutType.String()),
FailureInfo: &failurepb.Failure_TimeoutFailureInfo{
TimeoutFailureInfo: &failurepb.TimeoutFailureInfo{
TimeoutType: timeoutType,
},
},
}

attempt, err := a.Attempt.Get(ctx)
if err != nil {
return err
Expand All @@ -281,7 +346,7 @@ func (a *Activity) recordStartToCloseTimedOut(ctx chasm.MutableContext, retryInt
Failure: failure,
Time: currentTime,
}
attempt.LastAttemptCompleteTime = currentTime
attempt.CompleteTime = currentTime

// If the activity has exhausted retries, mark the outcome failure as well but don't store duplicate failure info.
// Also reset the retry interval as there won't be any more retries.
Expand All @@ -295,17 +360,50 @@ func (a *Activity) recordStartToCloseTimedOut(ctx chasm.MutableContext, retryInt
return nil
}

func (a *Activity) hasEnoughTimeForRetry(ctx chasm.Context) (bool, error) {
func (a *Activity) shouldRetry(ctx chasm.Context, overridingRetryInterval time.Duration) (bool, time.Duration, error) {
attempt, err := a.Attempt.Get(ctx)
if err != nil {
return false, err
return false, 0, err
}
retryPolicy := a.RetryPolicy

retryInterval := backoff.CalculateExponentialRetryInterval(a.RetryPolicy, attempt.Count)
enoughAttempts := retryPolicy.GetMaximumAttempts() == 0 || attempt.GetCount() < retryPolicy.GetMaximumAttempts()
enoughTime, retryInterval, err := a.hasEnoughTimeForRetry(ctx, overridingRetryInterval)
if err != nil {
return false, 0, err
}

return enoughAttempts && enoughTime, retryInterval, nil
}

// hasEnoughTimeForRetry checks if there is enough time left in the schedule-to-close timeout. If sufficient time
// remains, it will also return a valid retry interval
func (a *Activity) hasEnoughTimeForRetry(ctx chasm.Context, overridingRetryInterval time.Duration) (bool, time.Duration, error) {
attempt, err := a.Attempt.Get(ctx)
if err != nil {
return false, 0, err
}

// Use overriding retry interval if provided, else calculate based on retry policy
retryInterval := overridingRetryInterval
if retryInterval <= 0 {
retryInterval = backoff.CalculateExponentialRetryInterval(a.RetryPolicy, attempt.Count)
}

deadline := a.ScheduledTime.AsTime().Add(a.GetScheduleToCloseTimeout().AsDuration())

return ctx.Now(a).Add(retryInterval).Before(deadline), nil
return ctx.Now(a).Add(retryInterval).Before(deadline), retryInterval, nil
}

func createStartToCloseTimeoutFailure() *failurepb.Failure {
return &failurepb.Failure{
Message: fmt.Sprintf(common.FailureReasonActivityTimeout, enumspb.TIMEOUT_TYPE_START_TO_CLOSE.String()),
FailureInfo: &failurepb.Failure_TimeoutFailureInfo{
TimeoutFailureInfo: &failurepb.TimeoutFailureInfo{
TimeoutType: enumspb.TIMEOUT_TYPE_START_TO_CLOSE,
},
},
}
}

func (a *Activity) RecordHeartbeat(ctx chasm.MutableContext, details *commonpb.Payloads) (chasm.NoValue, error) {
Expand Down Expand Up @@ -363,15 +461,32 @@ func (a *Activity) buildActivityExecutionInfo(ctx chasm.Context) (*activity.Acti

key := ctx.ExecutionKey()

attempt, err := a.Attempt.Get(ctx)
if err != nil {
return nil, err
}

heartbeat, err := a.LastHeartbeat.Get(ctx)
if err != nil {
return nil, err
}

info := &activity.ActivityExecutionInfo{
ActivityId: key.BusinessID,
RunId: key.EntityID,
ActivityType: a.GetActivityType(),
Status: status,
RunState: runState,
ScheduledTime: a.GetScheduledTime(),
Priority: a.GetPriority(),
Header: requestData.GetHeader(),
ActivityId: key.BusinessID,
ActivityType: a.GetActivityType(),
Attempt: attempt.GetCount(),
Header: requestData.GetHeader(),
HeartbeatDetails: heartbeat.GetDetails(),
LastAttemptCompleteTime: attempt.GetCompleteTime(),
LastFailure: attempt.GetLastFailureDetails().GetFailure(),
LastHeartbeatTime: heartbeat.GetRecordedTime(),
LastStartedTime: attempt.GetStartedTime(),
LastWorkerIdentity: attempt.GetLastWorkerIdentity(),
Priority: a.GetPriority(),
RunId: key.EntityID,
RunState: runState,
ScheduledTime: a.GetScheduledTime(),
Status: status,
// TODO(dan): populate remaining fields
}

Expand Down
Loading
Loading