Skip to content

Commit eb1d0df

Browse files
authored
Validate Priority metadata (#8159)
## What changed? Add validation for Priority metadata. ## Why? Negative keys/weights are ignored but we should reject requests with too-long keys. ## How did you test it? - [x] added new unit test(s)
1 parent a0edfc5 commit eb1d0df

File tree

4 files changed

+71
-0
lines changed

4 files changed

+71
-0
lines changed

common/priorities/priority_util.go

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,17 @@ import (
44
"cmp"
55

66
commonpb "go.temporal.io/api/common/v1"
7+
"go.temporal.io/api/serviceerror"
8+
)
9+
10+
const (
11+
fairnessKeyMaxLength = 64
12+
)
13+
14+
var (
15+
ErrInvalidPriority = serviceerror.NewInvalidArgument("PriorityKey can't be negative")
16+
ErrFairnessKeyLength = serviceerror.NewInvalidArgument("FairnessKey length exceeds limit")
17+
ErrInvalidFairnessWeight = serviceerror.NewInvalidArgument("FairnessWeight can't be negative")
718
)
819

920
func Merge(
@@ -22,3 +33,16 @@ func Merge(
2233
FairnessWeight: cmp.Or(override.FairnessWeight, base.FairnessWeight),
2334
}
2435
}
36+
37+
func Validate(p *commonpb.Priority) error {
38+
if p == nil {
39+
return nil
40+
} else if p.PriorityKey < 0 {
41+
return ErrInvalidPriority
42+
} else if len(p.FairnessKey) > fairnessKeyMaxLength {
43+
return ErrFairnessKeyLength
44+
} else if p.FairnessWeight < 0 {
45+
return ErrInvalidFairnessWeight
46+
}
47+
return nil
48+
}

common/priorities/priority_util_test.go

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,10 @@
11
package priorities
22

33
import (
4+
"strings"
45
"testing"
56

7+
"github.com/stretchr/testify/assert"
68
"github.com/stretchr/testify/require"
79
commonpb "go.temporal.io/api/common/v1"
810
)
@@ -79,3 +81,30 @@ func TestMerge(t *testing.T) {
7981
})
8082
}
8183
}
84+
85+
func TestValidate(t *testing.T) {
86+
testcases := []struct {
87+
p *commonpb.Priority
88+
err bool
89+
}{
90+
{p: &commonpb.Priority{}},
91+
{p: &commonpb.Priority{PriorityKey: 5}},
92+
{p: &commonpb.Priority{PriorityKey: -5}, err: true},
93+
{p: &commonpb.Priority{FairnessKey: "abcdefg"}},
94+
{p: &commonpb.Priority{FairnessKey: strings.Repeat("abcdefg", 10)}, err: true},
95+
{p: &commonpb.Priority{FairnessWeight: 0.1}},
96+
{p: &commonpb.Priority{FairnessWeight: 1e10}},
97+
{p: &commonpb.Priority{FairnessWeight: -3}, err: true},
98+
}
99+
100+
for _, tc := range testcases {
101+
t.Run("test", func(t *testing.T) {
102+
err := Validate(tc.p)
103+
if tc.err {
104+
assert.Error(t, err)
105+
} else {
106+
assert.NoError(t, err)
107+
}
108+
})
109+
}
110+
}

service/frontend/workflow_handler.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@ import (
5959
"go.temporal.io/server/common/persistence/visibility/manager"
6060
"go.temporal.io/server/common/primitives"
6161
"go.temporal.io/server/common/primitives/timestamp"
62+
"go.temporal.io/server/common/priorities"
6263
"go.temporal.io/server/common/retrypolicy"
6364
"go.temporal.io/server/common/rpc"
6465
"go.temporal.io/server/common/rpc/interceptor"
@@ -487,6 +488,10 @@ func (wh *WorkflowHandler) prepareStartWorkflowRequest(
487488
request.SearchAttributes = sa
488489
}
489490

491+
if err := priorities.Validate(request.Priority); err != nil {
492+
return nil, err
493+
}
494+
490495
if err := wh.validateWorkflowCompletionCallbacks(namespaceName, request.GetCompletionCallbacks()); err != nil {
491496
return nil, err
492497
}
@@ -2065,6 +2070,10 @@ func (wh *WorkflowHandler) SignalWithStartWorkflowExecution(ctx context.Context,
20652070
request.SearchAttributes = sa
20662071
}
20672072

2073+
if err := priorities.Validate(request.Priority); err != nil {
2074+
return nil, err
2075+
}
2076+
20682077
if err := wh.validateLinks(namespaceName, request.GetLinks()); err != nil {
20692078
return nil, err
20702079
}

service/history/api/command_attr_validator.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ import (
1515
"go.temporal.io/server/common/dynamicconfig"
1616
"go.temporal.io/server/common/namespace"
1717
"go.temporal.io/server/common/primitives/timestamp"
18+
"go.temporal.io/server/common/priorities"
1819
"go.temporal.io/server/common/retrypolicy"
1920
"go.temporal.io/server/common/searchattribute"
2021
"go.temporal.io/server/common/tqid"
@@ -130,6 +131,10 @@ func (v *CommandAttrValidator) ValidateActivityScheduleAttributes(
130131
return failedCause, serviceerror.NewInvalidArgumentf("Invalid HeartbeatTimeout for ScheduleActivityTaskCommand: %v. ActivityId=%s ActivityType=%s", err, activityID, activityType)
131132
}
132133

134+
if err := priorities.Validate(attributes.Priority); err != nil {
135+
return failedCause, err
136+
}
137+
133138
ScheduleToCloseSet := attributes.GetScheduleToCloseTimeout().AsDuration() > 0
134139
ScheduleToStartSet := attributes.GetScheduleToStartTimeout().AsDuration() > 0
135140
StartToCloseSet := attributes.GetStartToCloseTimeout().AsDuration() > 0
@@ -558,6 +563,10 @@ func (v *CommandAttrValidator) ValidateStartChildExecutionAttributes(
558563
return enumspb.WORKFLOW_TASK_FAILED_CAUSE_BAD_SEARCH_ATTRIBUTES, fmt.Errorf("invalid SearchAttributes on StartChildWorkflowCommand: %w. WorkflowId=%s WorkflowType=%s Namespace=%s", err, wfID, wfType, ns)
559564
}
560565

566+
if err := priorities.Validate(attributes.Priority); err != nil {
567+
return failedCause, err
568+
}
569+
561570
// Inherit taskqueue from parent workflow execution if not provided on command
562571
if attributes.TaskQueue == nil {
563572
attributes.TaskQueue = &taskqueuepb.TaskQueue{

0 commit comments

Comments
 (0)