Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions common/persistence/cassandra/mutable_state_task_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (
p "go.temporal.io/server/common/persistence"
"go.temporal.io/server/common/persistence/nosql/nosqlplugin/cassandra/gocql"
"go.temporal.io/server/common/persistence/serialization"
"go.temporal.io/server/common/softassert"
"go.temporal.io/server/service/history/tasks"
)

Expand Down Expand Up @@ -214,7 +213,6 @@ func (d *MutableStateTaskStore) AddHistoryTasks(
if !applied {
if previousRangeID, ok := previous["range_id"].(int64); ok && previousRangeID != request.RangeID {
// CreateWorkflowExecution failed because rangeID was modified
softassert.Sometimes(d.Logger).Debug("ShardOwnershipLost: Failed to add tasks")
return &p.ShardOwnershipLostError{
ShardID: request.ShardID,
Msg: fmt.Sprintf("Failed to add tasks. Request RangeID: %v, Actual RangeID: %v", request.RangeID, previousRangeID),
Expand Down
2 changes: 0 additions & 2 deletions common/persistence/cassandra/shard_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"go.temporal.io/server/common/log"
p "go.temporal.io/server/common/persistence"
"go.temporal.io/server/common/persistence/nosql/nosqlplugin/cassandra/gocql"
"go.temporal.io/server/common/softassert"
)

const (
Expand Down Expand Up @@ -146,7 +145,6 @@ func (d *ShardStore) UpdateShard(
for k, v := range previous {
columns = append(columns, fmt.Sprintf("%s=%v", k, v))
}
softassert.Sometimes(d.Logger).Debug("ShardOwnershipLostError: Failed to update shard")
return &p.ShardOwnershipLostError{
ShardID: request.ShardID,
Msg: fmt.Sprintf("Failed to update shard. previous_range_id: %v, columns: (%v)",
Expand Down
2 changes: 0 additions & 2 deletions common/persistence/sql/shard.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
"go.temporal.io/server/common/log"
"go.temporal.io/server/common/persistence"
"go.temporal.io/server/common/persistence/sql/sqlplugin"
"go.temporal.io/server/common/softassert"
)

type sqlShardStore struct {
Expand Down Expand Up @@ -134,7 +133,6 @@ func lockShard(
switch err {
case nil:
if rangeID != oldRangeID {
softassert.Sometimes(logger).Debug("ShardOwnershipLostError: Failed to update shard")
return &persistence.ShardOwnershipLostError{
ShardID: shardID,
Msg: fmt.Sprintf("Failed to update shard. Previous range ID: %v; new range ID: %v", oldRangeID, rangeID),
Expand Down
44 changes: 0 additions & 44 deletions common/softassert/softassert.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,6 @@ import (
"go.temporal.io/server/common/log/tag"
)

// sometimesLogger is the value returned by Sometimes. It holds the logger
// that its Debug, Info, Warn and Error methods forward to.
type sometimesLogger struct {
// logger receives every message logged through this wrapper.
logger log.Logger
}

// That performs a soft assertion by logging an error if the given condition is false.
// It is meant to indicate a condition is always expected to be true.
// Returns true if the condition is met, otherwise false.
Expand Down Expand Up @@ -47,43 +43,3 @@ func That(logger log.Logger, condition bool, staticMessage string, tags ...tag.T
func Fail(logger log.Logger, staticMessage string, tags ...tag.Tag) {
	// Assemble a fresh slice that leads with the failed-assertion marker and
	// is followed by the caller-supplied tags; the caller's slice is left
	// untouched.
	allTags := make([]tag.Tag, 0, len(tags)+1)
	allTags = append(allTags, tag.FailedAssertion)
	allTags = append(allTags, tags...)

	message := "failed assertion: " + staticMessage
	logger.Error(message, allTags...)
}

// Sometimes returns a wrapper around logger for recording events that are
// noteworthy but not problematic. The wrapper exposes Debug, Info, Warn and
// Error, each of which forwards to the same-named method of logger.
//
// Example:
//
//	softassert.Sometimes(logger).Warn("termination event", tag.NewStringTag("state", object.state))
func Sometimes(logger log.Logger) *sometimesLogger {
	wrapped := new(sometimesLogger)
	wrapped.logger = logger
	return wrapped
}

// Debug forwards the message and tags to the wrapped logger's Debug method.
//
// Keep `staticMessage` a static string so that log entries can be grouped and
// searched; anything dynamic belongs in `tags`.
func (s *sometimesLogger) Debug(staticMessage string, tags ...tag.Tag) {
	underlying := s.logger
	underlying.Debug(staticMessage, tags...)
}

// Info forwards the message and tags to the wrapped logger's Info method.
//
// Keep `staticMessage` a static string so that log entries can be grouped and
// searched; anything dynamic belongs in `tags`.
func (s *sometimesLogger) Info(staticMessage string, tags ...tag.Tag) {
	underlying := s.logger
	underlying.Info(staticMessage, tags...)
}

// Warn forwards the message and tags to the wrapped logger's Warn method.
//
// Keep `staticMessage` a static string so that log entries can be grouped and
// searched; anything dynamic belongs in `tags`.
func (s *sometimesLogger) Warn(staticMessage string, tags ...tag.Tag) {
	underlying := s.logger
	underlying.Warn(staticMessage, tags...)
}

// Error forwards the message and tags to the wrapped logger's Error method.
//
// Keep `staticMessage` a static string so that log entries can be grouped and
// searched; anything dynamic belongs in `tags`.
func (s *sometimesLogger) Error(staticMessage string, tags ...tag.Tag) {
	underlying := s.logger
	underlying.Error(staticMessage, tags...)
}
30 changes: 3 additions & 27 deletions service/history/api/respondworkflowtaskcompleted/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ import (
"go.temporal.io/server/common/persistence"
"go.temporal.io/server/common/persistence/visibility/manager"
"go.temporal.io/server/common/searchattribute"
"go.temporal.io/server/common/softassert"
"go.temporal.io/server/common/tasktoken"
"go.temporal.io/server/common/worker_versioning"
"go.temporal.io/server/service/history/api"
Expand Down Expand Up @@ -130,12 +129,6 @@ func (handler *WorkflowTaskCompletedHandler) Invoke(
metrics.StaleMutableStateCounter.With(handler.metricsHandler).Record(
1,
metrics.OperationTag(metrics.HistoryRespondWorkflowTaskCompletedScope))
softassert.Sometimes(handler.logger).Debug("stale mutable state detected",
tag.WorkflowID(token.GetWorkflowId()),
tag.WorkflowRunID(token.GetRunId()),
tag.WorkflowScheduledEventID(token.GetScheduledEventId()),
tag.NewInt64("mutable-state-next-event-id", mutableState.GetNextEventID()),
)
return false
}
return true
Expand Down Expand Up @@ -173,7 +166,7 @@ func (handler *WorkflowTaskCompletedHandler) Invoke(
// This is NOT a 100% bulletproof solution because this write operation may also fail.
// TODO: remove this call when GetWorkflowExecutionHistory includes speculative WFT events.
if clearStickyErr := handler.clearStickyTaskQueue(ctx, workflowLease.GetContext()); clearStickyErr != nil {
softassert.Sometimes(handler.logger).Error("Failed to clear stickiness after speculative workflow task failed to complete.",
handler.logger.Error("Failed to clear stickiness after speculative workflow task failed to complete.",
tag.NewErrorTag("clear-sticky-error", clearStickyErr),
tag.Error(retError),
tag.WorkflowID(token.GetWorkflowId()),
Expand Down Expand Up @@ -232,7 +225,7 @@ func (handler *WorkflowTaskCompletedHandler) Invoke(
if retError != nil {
cancelled := effects.Cancel(ctx)
if cancelled {
softassert.Sometimes(handler.logger).Info("Canceled effects due to error",
handler.logger.Info("Canceled effects due to error",
tag.Error(retError),
tag.WorkflowID(token.GetWorkflowId()),
tag.WorkflowRunID(token.GetRunId()),
Expand Down Expand Up @@ -291,11 +284,6 @@ func (handler *WorkflowTaskCompletedHandler) Invoke(
metrics.NamespaceTag(nsName),
)
metrics.WorkflowTaskHeartbeatTimeoutCounter.With(scope).Record(1)
softassert.Sometimes(handler.logger).Debug("workflow task heartbeat timed out",
tag.WorkflowNamespaceID(nsName),
tag.WorkflowID(token.GetWorkflowId()),
tag.WorkflowRunID(token.GetRunId()),
)
completedEvent, err = ms.AddWorkflowTaskTimedOutEvent(currentWorkflowTask)
if err != nil {
return nil, err
Expand Down Expand Up @@ -350,12 +338,6 @@ func (handler *WorkflowTaskCompletedHandler) Invoke(
// and admitted updates are lost. Uncomment this check when durable admitted is implemented
// or updates stay in the registry after WFT is failed.
hasBufferedEventsOrMessages := ms.HasBufferedEvents() // || updateRegistry.HasOutgoingMessages(false)
if hasBufferedEventsOrMessages {
softassert.Sometimes(handler.logger).Debug("workflow has buffered events/messages",
tag.WorkflowID(token.GetWorkflowId()),
tag.WorkflowRunID(token.GetRunId()),
)
}
if err := namespaceEntry.VerifyBinaryChecksum(request.GetBinaryChecksum()); err != nil {
wtFailedCause = newWorkflowTaskFailedCause(
enumspb.WORKFLOW_TASK_FAILED_CAUSE_BAD_BINARY,
Expand Down Expand Up @@ -465,7 +447,7 @@ func (handler *WorkflowTaskCompletedHandler) Invoke(
metrics.FailureTag(wtFailedCause.failedCause.String()),
metrics.FirstAttemptTag(currentWorkflowTask.Attempt),
)
softassert.Sometimes(handler.logger).Info("Failing the workflow task.",
handler.logger.Info("Failing the workflow task.",
tag.Value(wtFailedCause.Message()),
tag.WorkflowID(token.GetWorkflowId()),
tag.WorkflowRunID(token.GetRunId()),
Expand Down Expand Up @@ -641,12 +623,6 @@ func (handler *WorkflowTaskCompletedHandler) Invoke(
// if updateErr resulted in TransactionSizeLimitError then fail workflow
switch updateErr.(type) {
case *persistence.TransactionSizeLimitError:
softassert.Sometimes(handler.logger).Debug("workflow terminated due to size limit",
tag.WorkflowID(token.GetWorkflowId()),
tag.WorkflowRunID(token.GetRunId()),
tag.Error(updateErr),
)

// must reload mutable state because the first call to updateWorkflowExecutionWithContext or continueAsNewWorkflowExecution
// clears mutable state if error is returned
ms, err = weContext.LoadMutableState(ctx, handler.shardContext)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ import (
"go.temporal.io/server/common/payloads"
"go.temporal.io/server/common/protocol"
"go.temporal.io/server/common/searchattribute"
"go.temporal.io/server/common/softassert"
"go.temporal.io/server/common/tasktoken"
"go.temporal.io/server/common/worker_versioning"
"go.temporal.io/server/service/history/api"
Expand Down Expand Up @@ -240,7 +239,7 @@ func (handler *workflowTaskCompletedHandler) rejectUnprocessedUpdates(
handler.effects)

if len(rejectedUpdateIDs) > 0 {
softassert.Sometimes(handler.logger).Warn(
handler.logger.Warn(
"Workflow task completed w/o processing updates.",
tag.WorkflowNamespaceID(wfKey.NamespaceID),
tag.WorkflowID(wfKey.WorkflowID),
Expand Down Expand Up @@ -374,21 +373,12 @@ func (handler *workflowTaskCompletedHandler) handleMessage(
}
if upd == nil {
// Update was not found in the registry and can't be resurrected.
softassert.Sometimes(handler.logger).Debug("update lost and cannot be resurrected",
tag.WorkflowID(handler.mutableState.GetExecutionInfo().WorkflowId),
tag.WorkflowRunID(handler.mutableState.GetExecutionState().RunId),
tag.NewStringTag("protocol-instance-id", message.ProtocolInstanceId))
return handler.failWorkflowTask(
enumspb.WORKFLOW_TASK_FAILED_CAUSE_BAD_UPDATE_WORKFLOW_EXECUTION_MESSAGE,
serviceerror.NewNotFoundf("update %s wasn't found on the server. This is most likely a transient error which will be resolved automatically by retries", message.ProtocolInstanceId))
}

if err := upd.OnProtocolMessage(message, workflow.WithEffects(handler.effects, handler.mutableState)); err != nil {
softassert.Sometimes(handler.logger).Debug("update failed",
tag.WorkflowID(handler.mutableState.GetExecutionInfo().WorkflowId),
tag.WorkflowRunID(handler.mutableState.GetExecutionState().RunId),
tag.NewStringTag("protocol-instance-id", message.ProtocolInstanceId),
tag.NewStringTag("error", err.Error()))
return handler.failWorkflowTaskOnInvalidArgument(
enumspb.WORKFLOW_TASK_FAILED_CAUSE_BAD_UPDATE_WORKFLOW_EXECUTION_MESSAGE, err)
}
Expand Down
3 changes: 1 addition & 2 deletions service/history/queues/dlq_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (
"go.temporal.io/server/common/metrics"
"go.temporal.io/server/common/namespace"
"go.temporal.io/server/common/persistence"
"go.temporal.io/server/common/softassert"
"go.temporal.io/server/service/history/tasks"
)

Expand Down Expand Up @@ -119,7 +118,7 @@ func (q *DLQWriter) WriteTaskToDLQ(
} else {
namespaceTag = tag.WorkflowNamespace(string(ns.Name()))
}
softassert.Sometimes(q.logger).Warn("Task enqueued to DLQ",
q.logger.Warn("Task enqueued to DLQ",
tag.DLQMessageID(resp.Metadata.ID),
tag.SourceCluster(sourceCluster),
tag.TargetCluster(targetCluster),
Expand Down
19 changes: 3 additions & 16 deletions service/history/queues/executable.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ import (
"go.temporal.io/server/common/log/tag"
"go.temporal.io/server/common/metrics"
"go.temporal.io/server/common/namespace"
"go.temporal.io/server/common/softassert"
ctasks "go.temporal.io/server/common/tasks"
"go.temporal.io/server/common/telemetry"
"go.temporal.io/server/common/util"
Expand Down Expand Up @@ -467,7 +466,6 @@ func (e *executableImpl) isExpectedRetryableError(err error) (isRetryable bool,
if errors.As(err, &resourceExhaustedErr) {
switch resourceExhaustedErr.Cause { //nolint:exhaustive
case enumspb.RESOURCE_EXHAUSTED_CAUSE_BUSY_WORKFLOW:
softassert.Sometimes(e.logger).Debug("task throttled due to busy workflow", tag.TaskType(e.GetType()))
err = consts.ErrResourceExhaustedBusyWorkflow
case enumspb.RESOURCE_EXHAUSTED_CAUSE_APS_LIMIT:
err = consts.ErrResourceExhaustedAPSLimit
Expand Down Expand Up @@ -510,14 +508,7 @@ func (e *executableImpl) isExpectedRetryableError(err error) (isRetryable bool,
func (e *executableImpl) isUnexpectedNonRetryableError(err error) bool {
var terr MaybeTerminalTaskError
if errors.As(err, &terr) {
isTerminal := terr.IsTerminalTaskError()
if isTerminal {
softassert.Sometimes(e.logger).Debug("terminal task error detected",
tag.TaskType(e.GetType()),
tag.Error(err),
)
}
return isTerminal
return terr.IsTerminalTaskError()
}

if _, isDataLoss := err.(*serviceerror.DataLoss); isDataLoss {
Expand All @@ -527,10 +518,6 @@ func (e *executableImpl) isUnexpectedNonRetryableError(err error) bool {
isInternalError := common.IsInternalError(err)
if isInternalError {
metrics.TaskInternalErrorCounter.With(e.metricsHandler).Record(1)
softassert.Sometimes(e.logger).Debug("internal non-retryable task processing error",
tag.TaskType(e.GetType()),
tag.Error(err),
)
// Only DLQ/drop when configured to
shouldDLQ := e.dlqInternalErrors()
return shouldDLQ
Expand Down Expand Up @@ -588,9 +575,9 @@ func (e *executableImpl) HandleErr(err error) (retErr error) {
tag.NewStringTag("task-category", e.GetCategory().Name()),
)
if attempt > taskCriticalLogMetricAttempts {
softassert.Sometimes(logger).Error("Critical error processing task, retrying.", tag.OperationCritical)
logger.Error("Critical error processing task, retrying.", tag.OperationCritical)
} else {
softassert.Sometimes(logger).Warn("Fail to process task")
logger.Warn("Fail to process task")
}

if e.isUnexpectedNonRetryableError(err) {
Expand Down
4 changes: 2 additions & 2 deletions service/history/replication/executable_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -345,7 +345,7 @@ func (e *ExecutableTaskImpl) Resend(
) (bool, error) {
remainingAttempt--
if remainingAttempt < 0 {
softassert.Sometimes(e.Logger).Error("resend history attempts exceeded",
e.Logger.Error("resend history attempts exceeded",
tag.WorkflowNamespaceID(retryErr.NamespaceId),
tag.WorkflowID(retryErr.WorkflowId),
tag.WorkflowRunID(retryErr.RunId),
Expand Down Expand Up @@ -773,7 +773,7 @@ func (e *ExecutableTaskImpl) MarkPoisonPill() error {
taskInfo := e.ReplicationTask().GetRawTaskInfo()

if e.markPoisonPillAttempts >= MarkPoisonPillMaxAttempts {
softassert.Sometimes(e.Logger).Error("MarkPoisonPill reached max attempts",
e.Logger.Error("MarkPoisonPill reached max attempts",
tag.SourceCluster(e.SourceClusterName()),
tag.ReplicationTask(taskInfo),
)
Expand Down
2 changes: 0 additions & 2 deletions service/history/shard/context_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@ import (
"go.temporal.io/server/common/primitives/timestamp"
"go.temporal.io/server/common/rpc"
"go.temporal.io/server/common/searchattribute"
"go.temporal.io/server/common/softassert"
"go.temporal.io/server/common/util"
"go.temporal.io/server/service/history/configs"
"go.temporal.io/server/service/history/consts"
Expand Down Expand Up @@ -2280,7 +2279,6 @@ func (s *ContextImpl) newIOContext() (context.Context, context.CancelFunc) {

// newShardClosedErrorWithShardID when shard is closed and a req cannot be processed
func (s *ContextImpl) newShardClosedErrorWithShardID() *persistence.ShardOwnershipLostError {
softassert.Sometimes(s.contextTaggedLogger).Debug("ShardOwnershipLostError: Shard closed")
return &persistence.ShardOwnershipLostError{
ShardID: s.shardID, // immutable
Msg: "shard closed",
Expand Down
9 changes: 1 addition & 8 deletions service/history/timer_queue_active_task_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import (
"go.temporal.io/server/common/primitives/timestamp"
"go.temporal.io/server/common/priorities"
"go.temporal.io/server/common/resource"
"go.temporal.io/server/common/softassert"
"go.temporal.io/server/service/history/configs"
"go.temporal.io/server/service/history/consts"
"go.temporal.io/server/service/history/deletemanager"
Expand Down Expand Up @@ -537,7 +536,7 @@ func (t *timerQueueActiveTaskExecutor) executeActivityRetryTimerTask(
}

if task.Attempt < activityInfo.Attempt || activityInfo.StartedEventId != common.EmptyEventID {
softassert.Sometimes(t.logger).Info("Duplicate activity retry timer task",
t.logger.Info("Duplicate activity retry timer task",
tag.WorkflowID(mutableState.GetExecutionInfo().WorkflowId),
tag.WorkflowRunID(mutableState.GetExecutionState().GetRunId()),
tag.WorkflowNamespaceID(mutableState.GetExecutionInfo().NamespaceId),
Expand Down Expand Up @@ -910,12 +909,6 @@ func (t *timerQueueActiveTaskExecutor) emitTimeoutMetricScopeWithNamespaceTag(
case enumspb.TIMEOUT_TYPE_HEARTBEAT:
metrics.HeartbeatTimeoutCounter.With(metricsScope).Record(1)
}

softassert.Sometimes(t.logger).Debug("timer queue task timed out",
tag.NewStringTag("timer-type", timerType.String()),
tag.Operation(operation),
tag.Attempt(taskAttempt),
)
}

func (t *timerQueueActiveTaskExecutor) processActivityWorkflowRules(
Expand Down
4 changes: 2 additions & 2 deletions service/history/workflow/update/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ func (i *instrumentation) countTooMany() {
func (i *instrumentation) countAborted(updateID string, reason AbortReason) {
i.metrics.Counter(metrics.WorkflowExecutionUpdateAborted.Name()).
Record(1, metrics.ReasonTag(metrics.ReasonString(reason.String())))
softassert.Sometimes(i.log).Debug("update aborted",
i.log.Debug("update aborted",
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Keeping this one.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like a tabbing error

tag.NewStringTag("reason", reason.String()),
tag.NewStringTag("update-id", updateID),
)
Expand Down Expand Up @@ -103,7 +103,7 @@ func (i *instrumentation) oneOf(counterName string) {
}

func (i *instrumentation) stateChange(updateID string, from, to state) {
softassert.Sometimes(i.log).Debug(
i.log.Debug(
"update state change",
tag.ComponentWorkflowUpdate,
tag.NewStringTag("update-id", updateID),
Expand Down
Loading