diff --git a/common/persistence/cassandra/mutable_state_task_store.go b/common/persistence/cassandra/mutable_state_task_store.go
index 76055665008..96a49c58029 100644
--- a/common/persistence/cassandra/mutable_state_task_store.go
+++ b/common/persistence/cassandra/mutable_state_task_store.go
@@ -10,7 +10,6 @@ import (
 	p "go.temporal.io/server/common/persistence"
 	"go.temporal.io/server/common/persistence/nosql/nosqlplugin/cassandra/gocql"
 	"go.temporal.io/server/common/persistence/serialization"
-	"go.temporal.io/server/common/softassert"
 	"go.temporal.io/server/service/history/tasks"
 )
 
@@ -214,7 +213,6 @@ func (d *MutableStateTaskStore) AddHistoryTasks(
 	if !applied {
 		if previousRangeID, ok := previous["range_id"].(int64); ok && previousRangeID != request.RangeID {
 			// CreateWorkflowExecution failed because rangeID was modified
-			softassert.Sometimes(d.Logger).Debug("ShardOwnershipLost: Failed to add tasks")
 			return &p.ShardOwnershipLostError{
 				ShardID: request.ShardID,
 				Msg:     fmt.Sprintf("Failed to add tasks. Request RangeID: %v, Actual RangeID: %v", request.RangeID, previousRangeID),
diff --git a/common/persistence/cassandra/shard_store.go b/common/persistence/cassandra/shard_store.go
index ed4d80eae3c..955029278e6 100644
--- a/common/persistence/cassandra/shard_store.go
+++ b/common/persistence/cassandra/shard_store.go
@@ -8,7 +8,6 @@ import (
 	"go.temporal.io/server/common/log"
 	p "go.temporal.io/server/common/persistence"
 	"go.temporal.io/server/common/persistence/nosql/nosqlplugin/cassandra/gocql"
-	"go.temporal.io/server/common/softassert"
 )
 
 const (
@@ -146,7 +145,6 @@ func (d *ShardStore) UpdateShard(
 	for k, v := range previous {
 		columns = append(columns, fmt.Sprintf("%s=%v", k, v))
 	}
-	softassert.Sometimes(d.Logger).Debug("ShardOwnershipLostError: Failed to update shard")
 	return &p.ShardOwnershipLostError{
 		ShardID: request.ShardID,
 		Msg: fmt.Sprintf("Failed to update shard. previous_range_id: %v, columns: (%v)",
diff --git a/common/persistence/sql/shard.go b/common/persistence/sql/shard.go
index 233fb961c1c..6a46be84e5b 100644
--- a/common/persistence/sql/shard.go
+++ b/common/persistence/sql/shard.go
@@ -9,7 +9,6 @@ import (
 	"go.temporal.io/server/common/log"
 	"go.temporal.io/server/common/persistence"
 	"go.temporal.io/server/common/persistence/sql/sqlplugin"
-	"go.temporal.io/server/common/softassert"
 )
 
 type sqlShardStore struct {
@@ -134,7 +133,6 @@ func lockShard(
 	switch err {
 	case nil:
 		if rangeID != oldRangeID {
-			softassert.Sometimes(logger).Debug("ShardOwnershipLostError: Failed to update shard")
 			return &persistence.ShardOwnershipLostError{
 				ShardID: shardID,
 				Msg:     fmt.Sprintf("Failed to update shard. Previous range ID: %v; new range ID: %v", oldRangeID, rangeID),
diff --git a/common/softassert/softassert.go b/common/softassert/softassert.go
index 52a562ba4d7..19ca896441b 100644
--- a/common/softassert/softassert.go
+++ b/common/softassert/softassert.go
@@ -16,10 +16,6 @@ import (
 	"go.temporal.io/server/common/log/tag"
 )
 
-type sometimesLogger struct {
-	logger log.Logger
-}
-
 // That performs a soft assertion by logging an error if the given condition is false.
 // It is meant to indicate a condition is always expected to be true.
 // Returns true if the condition is met, otherwise false.
@@ -47,43 +43,3 @@ func That(logger log.Logger, condition bool, staticMessage string, tags ...tag.T
 func Fail(logger log.Logger, staticMessage string, tags ...tag.Tag) {
 	logger.Error("failed assertion: "+staticMessage, append([]tag.Tag{tag.FailedAssertion}, tags...)...)
 }
-
-// Sometimes is used to log a message of a noteworthy but non-problematic event.
-//
-// Example:
-// softassert.Sometimes(logger).Warn("termination event", tag.NewStringTag("state", object.state))
-func Sometimes(logger log.Logger) *sometimesLogger {
-	return &sometimesLogger{logger: logger}
-}
-
-// Debug logs a message at debug level.
-//
-// `staticMessage` is expected to be a static string to help with grouping and searching logs.
-// Dynamic information should be passed via `tags`.
-func (s *sometimesLogger) Debug(staticMessage string, tags ...tag.Tag) {
-	s.logger.Debug(staticMessage, tags...)
-}
-
-// Info logs a message at info level.
-//
-// `staticMessage` is expected to be a static string to help with grouping and searching logs.
-// Dynamic information should be passed via `tags`.
-func (s *sometimesLogger) Info(staticMessage string, tags ...tag.Tag) {
-	s.logger.Info(staticMessage, tags...)
-}
-
-// Warn logs a message at warn level.
-//
-// `staticMessage` is expected to be a static string to help with grouping and searching logs.
-// Dynamic information should be passed via `tags`.
-func (s *sometimesLogger) Warn(staticMessage string, tags ...tag.Tag) {
-	s.logger.Warn(staticMessage, tags...)
-}
-
-// Error logs a message at error level.
-//
-// `staticMessage` is expected to be a static string to help with grouping and searching logs.
-// Dynamic information should be passed via `tags`.
-func (s *sometimesLogger) Error(staticMessage string, tags ...tag.Tag) {
-	s.logger.Error(staticMessage, tags...)
-}
diff --git a/service/history/api/respondworkflowtaskcompleted/api.go b/service/history/api/respondworkflowtaskcompleted/api.go
index 1ef2e3bfe02..51d8d10b8fd 100644
--- a/service/history/api/respondworkflowtaskcompleted/api.go
+++ b/service/history/api/respondworkflowtaskcompleted/api.go
@@ -30,7 +30,6 @@ import (
 	"go.temporal.io/server/common/persistence"
 	"go.temporal.io/server/common/persistence/visibility/manager"
 	"go.temporal.io/server/common/searchattribute"
-	"go.temporal.io/server/common/softassert"
 	"go.temporal.io/server/common/tasktoken"
 	"go.temporal.io/server/common/worker_versioning"
 	"go.temporal.io/server/service/history/api"
@@ -130,12 +129,6 @@ func (handler *WorkflowTaskCompletedHandler) Invoke(
 			metrics.StaleMutableStateCounter.With(handler.metricsHandler).Record(
 				1,
 				metrics.OperationTag(metrics.HistoryRespondWorkflowTaskCompletedScope))
-			softassert.Sometimes(handler.logger).Debug("stale mutable state detected",
-				tag.WorkflowID(token.GetWorkflowId()),
-				tag.WorkflowRunID(token.GetRunId()),
-				tag.WorkflowScheduledEventID(token.GetScheduledEventId()),
-				tag.NewInt64("mutable-state-next-event-id", mutableState.GetNextEventID()),
-			)
 			return false
 		}
 		return true
@@ -173,7 +166,7 @@ func (handler *WorkflowTaskCompletedHandler) Invoke(
 		// This is NOT 100% bulletproof solution because this write operation may also fail.
 		// TODO: remove this call when GetWorkflowExecutionHistory includes speculative WFT events.
 		if clearStickyErr := handler.clearStickyTaskQueue(ctx, workflowLease.GetContext()); clearStickyErr != nil {
-			softassert.Sometimes(handler.logger).Error("Failed to clear stickiness after speculative workflow task failed to complete.",
+			handler.logger.Error("Failed to clear stickiness after speculative workflow task failed to complete.",
 				tag.NewErrorTag("clear-sticky-error", clearStickyErr),
 				tag.Error(retError),
 				tag.WorkflowID(token.GetWorkflowId()),
@@ -232,7 +225,7 @@ func (handler *WorkflowTaskCompletedHandler) Invoke(
 	if retError != nil {
 		cancelled := effects.Cancel(ctx)
 		if cancelled {
-			softassert.Sometimes(handler.logger).Info("Canceled effects due to error",
+			handler.logger.Info("Canceled effects due to error",
 				tag.Error(retError),
 				tag.WorkflowID(token.GetWorkflowId()),
 				tag.WorkflowRunID(token.GetRunId()),
@@ -291,11 +284,6 @@ func (handler *WorkflowTaskCompletedHandler) Invoke(
 				metrics.NamespaceTag(nsName),
 			)
 			metrics.WorkflowTaskHeartbeatTimeoutCounter.With(scope).Record(1)
-			softassert.Sometimes(handler.logger).Debug("workflow task heartbeat timed out",
-				tag.WorkflowNamespaceID(nsName),
-				tag.WorkflowID(token.GetWorkflowId()),
-				tag.WorkflowRunID(token.GetRunId()),
-			)
 			completedEvent, err = ms.AddWorkflowTaskTimedOutEvent(currentWorkflowTask)
 			if err != nil {
 				return nil, err
@@ -350,12 +338,6 @@ func (handler *WorkflowTaskCompletedHandler) Invoke(
 		// and admitted updates are lost. Uncomment this check when durable admitted is implemented
 		// or updates stay in the registry after WFT is failed.
 		hasBufferedEventsOrMessages := ms.HasBufferedEvents() // || updateRegistry.HasOutgoingMessages(false)
-		if hasBufferedEventsOrMessages {
-			softassert.Sometimes(handler.logger).Debug("workflow has buffered events/messages",
-				tag.WorkflowID(token.GetWorkflowId()),
-				tag.WorkflowRunID(token.GetRunId()),
-			)
-		}
 		if err := namespaceEntry.VerifyBinaryChecksum(request.GetBinaryChecksum()); err != nil {
 			wtFailedCause = newWorkflowTaskFailedCause(
 				enumspb.WORKFLOW_TASK_FAILED_CAUSE_BAD_BINARY,
@@ -465,7 +447,7 @@ func (handler *WorkflowTaskCompletedHandler) Invoke(
 			metrics.FailureTag(wtFailedCause.failedCause.String()),
 			metrics.FirstAttemptTag(currentWorkflowTask.Attempt),
 		)
-		softassert.Sometimes(handler.logger).Info("Failing the workflow task.",
+		handler.logger.Info("Failing the workflow task.",
 			tag.Value(wtFailedCause.Message()),
 			tag.WorkflowID(token.GetWorkflowId()),
 			tag.WorkflowRunID(token.GetRunId()),
@@ -641,12 +623,6 @@ func (handler *WorkflowTaskCompletedHandler) Invoke(
 		// if updateErr resulted in TransactionSizeLimitError then fail workflow
 		switch updateErr.(type) {
 		case *persistence.TransactionSizeLimitError:
-			softassert.Sometimes(handler.logger).Debug("workflow terminated due to size limit",
-				tag.WorkflowID(token.GetWorkflowId()),
-				tag.WorkflowRunID(token.GetRunId()),
-				tag.Error(updateErr),
-			)
-
 			// must reload mutable state because the first call to updateWorkflowExecutionWithContext or continueAsNewWorkflowExecution
 			// clears mutable state if error is returned
 			ms, err = weContext.LoadMutableState(ctx, handler.shardContext)
diff --git a/service/history/api/respondworkflowtaskcompleted/workflow_task_completed_handler.go b/service/history/api/respondworkflowtaskcompleted/workflow_task_completed_handler.go
index c5974f6ba2a..7a3565b4e48 100644
--- a/service/history/api/respondworkflowtaskcompleted/workflow_task_completed_handler.go
+++ b/service/history/api/respondworkflowtaskcompleted/workflow_task_completed_handler.go
@@ -34,7 +34,6 @@ import (
"go.temporal.io/server/common/payloads" "go.temporal.io/server/common/protocol" "go.temporal.io/server/common/searchattribute" - "go.temporal.io/server/common/softassert" "go.temporal.io/server/common/tasktoken" "go.temporal.io/server/common/worker_versioning" "go.temporal.io/server/service/history/api" @@ -240,7 +239,7 @@ func (handler *workflowTaskCompletedHandler) rejectUnprocessedUpdates( handler.effects) if len(rejectedUpdateIDs) > 0 { - softassert.Sometimes(handler.logger).Warn( + handler.logger.Warn( "Workflow task completed w/o processing updates.", tag.WorkflowNamespaceID(wfKey.NamespaceID), tag.WorkflowID(wfKey.WorkflowID), @@ -374,21 +373,12 @@ func (handler *workflowTaskCompletedHandler) handleMessage( } if upd == nil { // Update was not found in the registry and can't be resurrected. - softassert.Sometimes(handler.logger).Debug("update lost and cannot be resurrected", - tag.WorkflowID(handler.mutableState.GetExecutionInfo().WorkflowId), - tag.WorkflowRunID(handler.mutableState.GetExecutionState().RunId), - tag.NewStringTag("protocol-instance-id", message.ProtocolInstanceId)) return handler.failWorkflowTask( enumspb.WORKFLOW_TASK_FAILED_CAUSE_BAD_UPDATE_WORKFLOW_EXECUTION_MESSAGE, serviceerror.NewNotFoundf("update %s wasn't found on the server. This is most likely a transient error which will be resolved automatically by retries", message.ProtocolInstanceId)) } if err := upd.OnProtocolMessage(message, workflow.WithEffects(handler.effects, handler.mutableState)); err != nil { - softassert.Sometimes(handler.logger).Debug("update failed", - tag.WorkflowID(handler.mutableState.GetExecutionInfo().WorkflowId), - tag.WorkflowRunID(handler.mutableState.GetExecutionState().RunId), - tag.NewStringTag("protocol-instance-id", message.ProtocolInstanceId), - tag.NewStringTag("error", err.Error())) return handler.failWorkflowTaskOnInvalidArgument( enumspb.WORKFLOW_TASK_FAILED_CAUSE_BAD_UPDATE_WORKFLOW_EXECUTION_MESSAGE, err) } diff --git a/service/history/queues/dlq_writer.go b/service/history/queues/dlq_writer.go index fd9aa8bebf6..726c8109329 100644 --- a/service/history/queues/dlq_writer.go +++ b/service/history/queues/dlq_writer.go @@ -11,7 +11,6 @@ import ( "go.temporal.io/server/common/metrics" "go.temporal.io/server/common/namespace" "go.temporal.io/server/common/persistence" - "go.temporal.io/server/common/softassert" "go.temporal.io/server/service/history/tasks" ) @@ -119,7 +118,7 @@ func (q *DLQWriter) WriteTaskToDLQ( } else { namespaceTag = tag.WorkflowNamespace(string(ns.Name())) } - softassert.Sometimes(q.logger).Warn("Task enqueued to DLQ", + q.logger.Warn("Task enqueued to DLQ", tag.DLQMessageID(resp.Metadata.ID), tag.SourceCluster(sourceCluster), tag.TargetCluster(targetCluster), diff --git a/service/history/queues/executable.go b/service/history/queues/executable.go index 76facc55fbb..a820293c1c9 100644 --- a/service/history/queues/executable.go +++ b/service/history/queues/executable.go @@ -29,7 +29,6 @@ import ( "go.temporal.io/server/common/log/tag" "go.temporal.io/server/common/metrics" "go.temporal.io/server/common/namespace" - "go.temporal.io/server/common/softassert" ctasks "go.temporal.io/server/common/tasks" "go.temporal.io/server/common/telemetry" "go.temporal.io/server/common/util" @@ -467,7 +466,6 @@ func (e *executableImpl) isExpectedRetryableError(err error) (isRetryable bool, if errors.As(err, &resourceExhaustedErr) { switch resourceExhaustedErr.Cause { //nolint:exhaustive case enumspb.RESOURCE_EXHAUSTED_CAUSE_BUSY_WORKFLOW: - 
softassert.Sometimes(e.logger).Debug("task throttled due to busy workflow", tag.TaskType(e.GetType())) err = consts.ErrResourceExhaustedBusyWorkflow case enumspb.RESOURCE_EXHAUSTED_CAUSE_APS_LIMIT: err = consts.ErrResourceExhaustedAPSLimit @@ -510,14 +508,7 @@ func (e *executableImpl) isExpectedRetryableError(err error) (isRetryable bool, func (e *executableImpl) isUnexpectedNonRetryableError(err error) bool { var terr MaybeTerminalTaskError if errors.As(err, &terr) { - isTerminal := terr.IsTerminalTaskError() - if isTerminal { - softassert.Sometimes(e.logger).Debug("terminal task error detected", - tag.TaskType(e.GetType()), - tag.Error(err), - ) - } - return isTerminal + return terr.IsTerminalTaskError() } if _, isDataLoss := err.(*serviceerror.DataLoss); isDataLoss { @@ -527,10 +518,6 @@ func (e *executableImpl) isUnexpectedNonRetryableError(err error) bool { isInternalError := common.IsInternalError(err) if isInternalError { metrics.TaskInternalErrorCounter.With(e.metricsHandler).Record(1) - softassert.Sometimes(e.logger).Debug("internal non-retryable task processing error", - tag.TaskType(e.GetType()), - tag.Error(err), - ) // Only DQL/drop when configured to shouldDLQ := e.dlqInternalErrors() return shouldDLQ @@ -588,9 +575,9 @@ func (e *executableImpl) HandleErr(err error) (retErr error) { tag.NewStringTag("task-category", e.GetCategory().Name()), ) if attempt > taskCriticalLogMetricAttempts { - softassert.Sometimes(logger).Error("Critical error processing task, retrying.", tag.OperationCritical) + logger.Error("Critical error processing task, retrying.", tag.OperationCritical) } else { - softassert.Sometimes(logger).Warn("Fail to process task") + logger.Warn("Fail to process task") } if e.isUnexpectedNonRetryableError(err) { diff --git a/service/history/replication/executable_task.go b/service/history/replication/executable_task.go index 3001a27452b..a51e3b1414a 100644 --- a/service/history/replication/executable_task.go +++ b/service/history/replication/executable_task.go @@ -345,7 +345,7 @@ func (e *ExecutableTaskImpl) Resend( ) (bool, error) { remainingAttempt-- if remainingAttempt < 0 { - softassert.Sometimes(e.Logger).Error("resend history attempts exceeded", + e.Logger.Error("resend history attempts exceeded", tag.WorkflowNamespaceID(retryErr.NamespaceId), tag.WorkflowID(retryErr.WorkflowId), tag.WorkflowRunID(retryErr.RunId), @@ -773,7 +773,7 @@ func (e *ExecutableTaskImpl) MarkPoisonPill() error { taskInfo := e.ReplicationTask().GetRawTaskInfo() if e.markPoisonPillAttempts >= MarkPoisonPillMaxAttempts { - softassert.Sometimes(e.Logger).Error("MarkPoisonPill reached max attempts", + e.Logger.Error("MarkPoisonPill reached max attempts", tag.SourceCluster(e.SourceClusterName()), tag.ReplicationTask(taskInfo), ) diff --git a/service/history/shard/context_impl.go b/service/history/shard/context_impl.go index b10f63eb93f..582b2777fe6 100644 --- a/service/history/shard/context_impl.go +++ b/service/history/shard/context_impl.go @@ -46,7 +46,6 @@ import ( "go.temporal.io/server/common/primitives/timestamp" "go.temporal.io/server/common/rpc" "go.temporal.io/server/common/searchattribute" - "go.temporal.io/server/common/softassert" "go.temporal.io/server/common/util" "go.temporal.io/server/service/history/configs" "go.temporal.io/server/service/history/consts" @@ -2280,7 +2279,6 @@ func (s *ContextImpl) newIOContext() (context.Context, context.CancelFunc) { // newShardClosedErrorWithShardID when shard is closed and a req cannot be processed func (s *ContextImpl) 
newShardClosedErrorWithShardID() *persistence.ShardOwnershipLostError { - softassert.Sometimes(s.contextTaggedLogger).Debug("ShardOwnershipLostError: Shard closed") return &persistence.ShardOwnershipLostError{ ShardID: s.shardID, // immutable Msg: "shard closed", diff --git a/service/history/timer_queue_active_task_executor.go b/service/history/timer_queue_active_task_executor.go index bb7efbff440..c89f7479de3 100644 --- a/service/history/timer_queue_active_task_executor.go +++ b/service/history/timer_queue_active_task_executor.go @@ -24,7 +24,6 @@ import ( "go.temporal.io/server/common/primitives/timestamp" "go.temporal.io/server/common/priorities" "go.temporal.io/server/common/resource" - "go.temporal.io/server/common/softassert" "go.temporal.io/server/service/history/configs" "go.temporal.io/server/service/history/consts" "go.temporal.io/server/service/history/deletemanager" @@ -537,7 +536,7 @@ func (t *timerQueueActiveTaskExecutor) executeActivityRetryTimerTask( } if task.Attempt < activityInfo.Attempt || activityInfo.StartedEventId != common.EmptyEventID { - softassert.Sometimes(t.logger).Info("Duplicate activity retry timer task", + t.logger.Info("Duplicate activity retry timer task", tag.WorkflowID(mutableState.GetExecutionInfo().WorkflowId), tag.WorkflowRunID(mutableState.GetExecutionState().GetRunId()), tag.WorkflowNamespaceID(mutableState.GetExecutionInfo().NamespaceId), @@ -910,12 +909,6 @@ func (t *timerQueueActiveTaskExecutor) emitTimeoutMetricScopeWithNamespaceTag( case enumspb.TIMEOUT_TYPE_HEARTBEAT: metrics.HeartbeatTimeoutCounter.With(metricsScope).Record(1) } - - softassert.Sometimes(t.logger).Debug("timer queue task timed out", - tag.NewStringTag("timer-type", timerType.String()), - tag.Operation(operation), - tag.Attempt(taskAttempt), - ) } func (t *timerQueueActiveTaskExecutor) processActivityWorkflowRules( diff --git a/service/history/workflow/update/util.go b/service/history/workflow/update/util.go index b5097403b26..6d77f1698de 100644 --- a/service/history/workflow/update/util.go +++ b/service/history/workflow/update/util.go @@ -64,7 +64,7 @@ func (i *instrumentation) countTooMany() { func (i *instrumentation) countAborted(updateID string, reason AbortReason) { i.metrics.Counter(metrics.WorkflowExecutionUpdateAborted.Name()). Record(1, metrics.ReasonTag(metrics.ReasonString(reason.String()))) - softassert.Sometimes(i.log).Debug("update aborted", + i.log.Debug("update aborted", tag.NewStringTag("reason", reason.String()), tag.NewStringTag("update-id", updateID), ) @@ -103,7 +103,7 @@ func (i *instrumentation) oneOf(counterName string) { } func (i *instrumentation) stateChange(updateID string, from, to state) { - softassert.Sometimes(i.log).Debug( + i.log.Debug( "update state change", tag.ComponentWorkflowUpdate, tag.NewStringTag("update-id", updateID),