Skip to content

Commit 6f61aca

Browse files
committed
Remove softassert.Sometimes
1 parent 2c9211f commit 6f61aca

File tree

12 files changed

+13
-120
lines changed

12 files changed

+13
-120
lines changed

common/persistence/cassandra/mutable_state_task_store.go

Lines changed: 0 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -10,7 +10,6 @@ import (
1010
p "go.temporal.io/server/common/persistence"
1111
"go.temporal.io/server/common/persistence/nosql/nosqlplugin/cassandra/gocql"
1212
"go.temporal.io/server/common/persistence/serialization"
13-
"go.temporal.io/server/common/softassert"
1413
"go.temporal.io/server/service/history/tasks"
1514
)
1615

@@ -214,7 +213,6 @@ func (d *MutableStateTaskStore) AddHistoryTasks(
214213
if !applied {
215214
if previousRangeID, ok := previous["range_id"].(int64); ok && previousRangeID != request.RangeID {
216215
// CreateWorkflowExecution failed because rangeID was modified
217-
softassert.Sometimes(d.Logger).Debug("ShardOwnershipLost: Failed to add tasks")
218216
return &p.ShardOwnershipLostError{
219217
ShardID: request.ShardID,
220218
Msg: fmt.Sprintf("Failed to add tasks. Request RangeID: %v, Actual RangeID: %v", request.RangeID, previousRangeID),

common/persistence/cassandra/shard_store.go

Lines changed: 0 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -8,7 +8,6 @@ import (
88
"go.temporal.io/server/common/log"
99
p "go.temporal.io/server/common/persistence"
1010
"go.temporal.io/server/common/persistence/nosql/nosqlplugin/cassandra/gocql"
11-
"go.temporal.io/server/common/softassert"
1211
)
1312

1413
const (
@@ -146,7 +145,6 @@ func (d *ShardStore) UpdateShard(
146145
for k, v := range previous {
147146
columns = append(columns, fmt.Sprintf("%s=%v", k, v))
148147
}
149-
softassert.Sometimes(d.Logger).Debug("ShardOwnershipLostError: Failed to update shard")
150148
return &p.ShardOwnershipLostError{
151149
ShardID: request.ShardID,
152150
Msg: fmt.Sprintf("Failed to update shard. previous_range_id: %v, columns: (%v)",

common/persistence/sql/shard.go

Lines changed: 0 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -9,7 +9,6 @@ import (
99
"go.temporal.io/server/common/log"
1010
"go.temporal.io/server/common/persistence"
1111
"go.temporal.io/server/common/persistence/sql/sqlplugin"
12-
"go.temporal.io/server/common/softassert"
1312
)
1413

1514
type sqlShardStore struct {
@@ -134,7 +133,6 @@ func lockShard(
134133
switch err {
135134
case nil:
136135
if rangeID != oldRangeID {
137-
softassert.Sometimes(logger).Debug("ShardOwnershipLostError: Failed to update shard")
138136
return &persistence.ShardOwnershipLostError{
139137
ShardID: shardID,
140138
Msg: fmt.Sprintf("Failed to update shard. Previous range ID: %v; new range ID: %v", oldRangeID, rangeID),

common/softassert/softassert.go

Lines changed: 0 additions & 44 deletions
Original file line number | Diff line number | Diff line change
@@ -16,10 +16,6 @@ import (
1616
"go.temporal.io/server/common/log/tag"
1717
)
1818

19-
type sometimesLogger struct {
20-
logger log.Logger
21-
}
22-
2319
// That performs a soft assertion by logging an error if the given condition is false.
2420
// It is meant to indicate a condition is always expected to be true.
2521
// Returns true if the condition is met, otherwise false.
@@ -47,43 +43,3 @@ func That(logger log.Logger, condition bool, staticMessage string, tags ...tag.T
4743
func Fail(logger log.Logger, staticMessage string, tags ...tag.Tag) {
4844
logger.Error("failed assertion: "+staticMessage, append([]tag.Tag{tag.FailedAssertion}, tags...)...)
4945
}
50-
51-
// Sometimes is used to log a message of a noteworthy but non-problematic event.
52-
//
53-
// Example:
54-
// softassert.Sometimes(logger).Warn("termination event", tag.NewStringTag("state", object.state))
55-
func Sometimes(logger log.Logger) *sometimesLogger {
56-
return &sometimesLogger{logger: logger}
57-
}
58-
59-
// Debug logs a message at debug level.
60-
//
61-
// `staticMessage` is expected to be a static string to help with grouping and searching logs.
62-
// Dynamic information should be passed via `tags`.
63-
func (s *sometimesLogger) Debug(staticMessage string, tags ...tag.Tag) {
64-
s.logger.Debug(staticMessage, tags...)
65-
}
66-
67-
// Info logs a message at info level.
68-
//
69-
// `staticMessage` is expected to be a static string to help with grouping and searching logs.
70-
// Dynamic information should be passed via `tags`.
71-
func (s *sometimesLogger) Info(staticMessage string, tags ...tag.Tag) {
72-
s.logger.Info(staticMessage, tags...)
73-
}
74-
75-
// Warn logs a message at warn level.
76-
//
77-
// `staticMessage` is expected to be a static string to help with grouping and searching logs.
78-
// Dynamic information should be passed via `tags`.
79-
func (s *sometimesLogger) Warn(staticMessage string, tags ...tag.Tag) {
80-
s.logger.Warn(staticMessage, tags...)
81-
}
82-
83-
// Error logs a message at error level.
84-
//
85-
// `staticMessage` is expected to be a static string to help with grouping and searching logs.
86-
// Dynamic information should be passed via `tags`.
87-
func (s *sometimesLogger) Error(staticMessage string, tags ...tag.Tag) {
88-
s.logger.Error(staticMessage, tags...)
89-
}

service/history/api/respondworkflowtaskcompleted/api.go

Lines changed: 3 additions & 27 deletions
Original file line number | Diff line number | Diff line change
@@ -30,7 +30,6 @@ import (
3030
"go.temporal.io/server/common/persistence"
3131
"go.temporal.io/server/common/persistence/visibility/manager"
3232
"go.temporal.io/server/common/searchattribute"
33-
"go.temporal.io/server/common/softassert"
3433
"go.temporal.io/server/common/tasktoken"
3534
"go.temporal.io/server/common/worker_versioning"
3635
"go.temporal.io/server/service/history/api"
@@ -130,12 +129,6 @@ func (handler *WorkflowTaskCompletedHandler) Invoke(
130129
metrics.StaleMutableStateCounter.With(handler.metricsHandler).Record(
131130
1,
132131
metrics.OperationTag(metrics.HistoryRespondWorkflowTaskCompletedScope))
133-
softassert.Sometimes(handler.logger).Debug("stale mutable state detected",
134-
tag.WorkflowID(token.GetWorkflowId()),
135-
tag.WorkflowRunID(token.GetRunId()),
136-
tag.WorkflowScheduledEventID(token.GetScheduledEventId()),
137-
tag.NewInt64("mutable-state-next-event-id", mutableState.GetNextEventID()),
138-
)
139132
return false
140133
}
141134
return true
@@ -173,7 +166,7 @@ func (handler *WorkflowTaskCompletedHandler) Invoke(
173166
// This is NOT 100% bulletproof solution because this write operation may also fail.
174167
// TODO: remove this call when GetWorkflowExecutionHistory includes speculative WFT events.
175168
if clearStickyErr := handler.clearStickyTaskQueue(ctx, workflowLease.GetContext()); clearStickyErr != nil {
176-
softassert.Sometimes(handler.logger).Error("Failed to clear stickiness after speculative workflow task failed to complete.",
169+
handler.logger.Error("Failed to clear stickiness after speculative workflow task failed to complete.",
177170
tag.NewErrorTag("clear-sticky-error", clearStickyErr),
178171
tag.Error(retError),
179172
tag.WorkflowID(token.GetWorkflowId()),
@@ -232,7 +225,7 @@ func (handler *WorkflowTaskCompletedHandler) Invoke(
232225
if retError != nil {
233226
cancelled := effects.Cancel(ctx)
234227
if cancelled {
235-
softassert.Sometimes(handler.logger).Info("Canceled effects due to error",
228+
handler.logger.Info("Canceled effects due to error",
236229
tag.Error(retError),
237230
tag.WorkflowID(token.GetWorkflowId()),
238231
tag.WorkflowRunID(token.GetRunId()),
@@ -291,11 +284,6 @@ func (handler *WorkflowTaskCompletedHandler) Invoke(
291284
metrics.NamespaceTag(nsName),
292285
)
293286
metrics.WorkflowTaskHeartbeatTimeoutCounter.With(scope).Record(1)
294-
softassert.Sometimes(handler.logger).Debug("workflow task heartbeat timed out",
295-
tag.WorkflowNamespaceID(nsName),
296-
tag.WorkflowID(token.GetWorkflowId()),
297-
tag.WorkflowRunID(token.GetRunId()),
298-
)
299287
completedEvent, err = ms.AddWorkflowTaskTimedOutEvent(currentWorkflowTask)
300288
if err != nil {
301289
return nil, err
@@ -350,12 +338,6 @@ func (handler *WorkflowTaskCompletedHandler) Invoke(
350338
// and admitted updates are lost. Uncomment this check when durable admitted is implemented
351339
// or updates stay in the registry after WFT is failed.
352340
hasBufferedEventsOrMessages := ms.HasBufferedEvents() // || updateRegistry.HasOutgoingMessages(false)
353-
if hasBufferedEventsOrMessages {
354-
softassert.Sometimes(handler.logger).Debug("workflow has buffered events/messages",
355-
tag.WorkflowID(token.GetWorkflowId()),
356-
tag.WorkflowRunID(token.GetRunId()),
357-
)
358-
}
359341
if err := namespaceEntry.VerifyBinaryChecksum(request.GetBinaryChecksum()); err != nil {
360342
wtFailedCause = newWorkflowTaskFailedCause(
361343
enumspb.WORKFLOW_TASK_FAILED_CAUSE_BAD_BINARY,
@@ -465,7 +447,7 @@ func (handler *WorkflowTaskCompletedHandler) Invoke(
465447
metrics.FailureTag(wtFailedCause.failedCause.String()),
466448
metrics.FirstAttemptTag(currentWorkflowTask.Attempt),
467449
)
468-
softassert.Sometimes(handler.logger).Info("Failing the workflow task.",
450+
handler.logger.Info("Failing the workflow task.",
469451
tag.Value(wtFailedCause.Message()),
470452
tag.WorkflowID(token.GetWorkflowId()),
471453
tag.WorkflowRunID(token.GetRunId()),
@@ -641,12 +623,6 @@ func (handler *WorkflowTaskCompletedHandler) Invoke(
641623
// if updateErr resulted in TransactionSizeLimitError then fail workflow
642624
switch updateErr.(type) {
643625
case *persistence.TransactionSizeLimitError:
644-
softassert.Sometimes(handler.logger).Debug("workflow terminated due to size limit",
645-
tag.WorkflowID(token.GetWorkflowId()),
646-
tag.WorkflowRunID(token.GetRunId()),
647-
tag.Error(updateErr),
648-
)
649-
650626
// must reload mutable state because the first call to updateWorkflowExecutionWithContext or continueAsNewWorkflowExecution
651627
// clears mutable state if error is returned
652628
ms, err = weContext.LoadMutableState(ctx, handler.shardContext)

service/history/api/respondworkflowtaskcompleted/workflow_task_completed_handler.go

Lines changed: 1 addition & 11 deletions
Original file line number | Diff line number | Diff line change
@@ -34,7 +34,6 @@ import (
3434
"go.temporal.io/server/common/payloads"
3535
"go.temporal.io/server/common/protocol"
3636
"go.temporal.io/server/common/searchattribute"
37-
"go.temporal.io/server/common/softassert"
3837
"go.temporal.io/server/common/tasktoken"
3938
"go.temporal.io/server/common/worker_versioning"
4039
"go.temporal.io/server/service/history/api"
@@ -240,7 +239,7 @@ func (handler *workflowTaskCompletedHandler) rejectUnprocessedUpdates(
240239
handler.effects)
241240

242241
if len(rejectedUpdateIDs) > 0 {
243-
softassert.Sometimes(handler.logger).Warn(
242+
handler.logger.Warn(
244243
"Workflow task completed w/o processing updates.",
245244
tag.WorkflowNamespaceID(wfKey.NamespaceID),
246245
tag.WorkflowID(wfKey.WorkflowID),
@@ -374,21 +373,12 @@ func (handler *workflowTaskCompletedHandler) handleMessage(
374373
}
375374
if upd == nil {
376375
// Update was not found in the registry and can't be resurrected.
377-
softassert.Sometimes(handler.logger).Debug("update lost and cannot be resurrected",
378-
tag.WorkflowID(handler.mutableState.GetExecutionInfo().WorkflowId),
379-
tag.WorkflowRunID(handler.mutableState.GetExecutionState().RunId),
380-
tag.NewStringTag("protocol-instance-id", message.ProtocolInstanceId))
381376
return handler.failWorkflowTask(
382377
enumspb.WORKFLOW_TASK_FAILED_CAUSE_BAD_UPDATE_WORKFLOW_EXECUTION_MESSAGE,
383378
serviceerror.NewNotFoundf("update %s wasn't found on the server. This is most likely a transient error which will be resolved automatically by retries", message.ProtocolInstanceId))
384379
}
385380

386381
if err := upd.OnProtocolMessage(message, workflow.WithEffects(handler.effects, handler.mutableState)); err != nil {
387-
softassert.Sometimes(handler.logger).Debug("update failed",
388-
tag.WorkflowID(handler.mutableState.GetExecutionInfo().WorkflowId),
389-
tag.WorkflowRunID(handler.mutableState.GetExecutionState().RunId),
390-
tag.NewStringTag("protocol-instance-id", message.ProtocolInstanceId),
391-
tag.NewStringTag("error", err.Error()))
392382
return handler.failWorkflowTaskOnInvalidArgument(
393383
enumspb.WORKFLOW_TASK_FAILED_CAUSE_BAD_UPDATE_WORKFLOW_EXECUTION_MESSAGE, err)
394384
}

service/history/queues/dlq_writer.go

Lines changed: 1 addition & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -11,7 +11,6 @@ import (
1111
"go.temporal.io/server/common/metrics"
1212
"go.temporal.io/server/common/namespace"
1313
"go.temporal.io/server/common/persistence"
14-
"go.temporal.io/server/common/softassert"
1514
"go.temporal.io/server/service/history/tasks"
1615
)
1716

@@ -119,7 +118,7 @@ func (q *DLQWriter) WriteTaskToDLQ(
119118
} else {
120119
namespaceTag = tag.WorkflowNamespace(string(ns.Name()))
121120
}
122-
softassert.Sometimes(q.logger).Warn("Task enqueued to DLQ",
121+
q.logger.Warn("Task enqueued to DLQ",
123122
tag.DLQMessageID(resp.Metadata.ID),
124123
tag.SourceCluster(sourceCluster),
125124
tag.TargetCluster(targetCluster),

service/history/queues/executable.go

Lines changed: 3 additions & 16 deletions
Original file line number | Diff line number | Diff line change
@@ -29,7 +29,6 @@ import (
2929
"go.temporal.io/server/common/log/tag"
3030
"go.temporal.io/server/common/metrics"
3131
"go.temporal.io/server/common/namespace"
32-
"go.temporal.io/server/common/softassert"
3332
ctasks "go.temporal.io/server/common/tasks"
3433
"go.temporal.io/server/common/telemetry"
3534
"go.temporal.io/server/common/util"
@@ -467,7 +466,6 @@ func (e *executableImpl) isExpectedRetryableError(err error) (isRetryable bool,
467466
if errors.As(err, &resourceExhaustedErr) {
468467
switch resourceExhaustedErr.Cause { //nolint:exhaustive
469468
case enumspb.RESOURCE_EXHAUSTED_CAUSE_BUSY_WORKFLOW:
470-
softassert.Sometimes(e.logger).Debug("task throttled due to busy workflow", tag.TaskType(e.GetType()))
471469
err = consts.ErrResourceExhaustedBusyWorkflow
472470
case enumspb.RESOURCE_EXHAUSTED_CAUSE_APS_LIMIT:
473471
err = consts.ErrResourceExhaustedAPSLimit
@@ -510,14 +508,7 @@ func (e *executableImpl) isExpectedRetryableError(err error) (isRetryable bool,
510508
func (e *executableImpl) isUnexpectedNonRetryableError(err error) bool {
511509
var terr MaybeTerminalTaskError
512510
if errors.As(err, &terr) {
513-
isTerminal := terr.IsTerminalTaskError()
514-
if isTerminal {
515-
softassert.Sometimes(e.logger).Debug("terminal task error detected",
516-
tag.TaskType(e.GetType()),
517-
tag.Error(err),
518-
)
519-
}
520-
return isTerminal
511+
return terr.IsTerminalTaskError()
521512
}
522513

523514
if _, isDataLoss := err.(*serviceerror.DataLoss); isDataLoss {
@@ -527,10 +518,6 @@ func (e *executableImpl) isUnexpectedNonRetryableError(err error) bool {
527518
isInternalError := common.IsInternalError(err)
528519
if isInternalError {
529520
metrics.TaskInternalErrorCounter.With(e.metricsHandler).Record(1)
530-
softassert.Sometimes(e.logger).Debug("internal non-retryable task processing error",
531-
tag.TaskType(e.GetType()),
532-
tag.Error(err),
533-
)
534521
// Only DLQ/drop when configured to
535522
shouldDLQ := e.dlqInternalErrors()
536523
return shouldDLQ
@@ -588,9 +575,9 @@ func (e *executableImpl) HandleErr(err error) (retErr error) {
588575
tag.NewStringTag("task-category", e.GetCategory().Name()),
589576
)
590577
if attempt > taskCriticalLogMetricAttempts {
591-
softassert.Sometimes(logger).Error("Critical error processing task, retrying.", tag.OperationCritical)
578+
logger.Error("Critical error processing task, retrying.", tag.OperationCritical)
592579
} else {
593-
softassert.Sometimes(logger).Warn("Fail to process task")
580+
logger.Warn("Fail to process task")
594581
}
595582

596583
if e.isUnexpectedNonRetryableError(err) {

service/history/replication/executable_task.go

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -345,7 +345,7 @@ func (e *ExecutableTaskImpl) Resend(
345345
) (bool, error) {
346346
remainingAttempt--
347347
if remainingAttempt < 0 {
348-
softassert.Sometimes(e.Logger).Error("resend history attempts exceeded",
348+
e.Logger.Error("resend history attempts exceeded",
349349
tag.WorkflowNamespaceID(retryErr.NamespaceId),
350350
tag.WorkflowID(retryErr.WorkflowId),
351351
tag.WorkflowRunID(retryErr.RunId),
@@ -773,7 +773,7 @@ func (e *ExecutableTaskImpl) MarkPoisonPill() error {
773773
taskInfo := e.ReplicationTask().GetRawTaskInfo()
774774

775775
if e.markPoisonPillAttempts >= MarkPoisonPillMaxAttempts {
776-
softassert.Sometimes(e.Logger).Error("MarkPoisonPill reached max attempts",
776+
e.Logger.Error("MarkPoisonPill reached max attempts",
777777
tag.SourceCluster(e.SourceClusterName()),
778778
tag.ReplicationTask(taskInfo),
779779
)

service/history/shard/context_impl.go

Lines changed: 0 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -46,7 +46,6 @@ import (
4646
"go.temporal.io/server/common/primitives/timestamp"
4747
"go.temporal.io/server/common/rpc"
4848
"go.temporal.io/server/common/searchattribute"
49-
"go.temporal.io/server/common/softassert"
5049
"go.temporal.io/server/common/util"
5150
"go.temporal.io/server/service/history/configs"
5251
"go.temporal.io/server/service/history/consts"
@@ -2280,7 +2279,6 @@ func (s *ContextImpl) newIOContext() (context.Context, context.CancelFunc) {
22802279

22812280
// newShardClosedErrorWithShardID when shard is closed and a req cannot be processed
22822281
func (s *ContextImpl) newShardClosedErrorWithShardID() *persistence.ShardOwnershipLostError {
2283-
softassert.Sometimes(s.contextTaggedLogger).Debug("ShardOwnershipLostError: Shard closed")
22842282
return &persistence.ShardOwnershipLostError{
22852283
ShardID: s.shardID, // immutable
22862284
Msg: "shard closed",

0 commit comments

Comments
 (0)