Skip to content

Commit 9afe23e

Browse files
authored
fix(backend): ignore unknown fields for pb json unmarshaling (kubeflow#11662)
Signed-off-by: Humair Khan <[email protected]>
1 parent ebaaf75 commit 9afe23e

File tree

9 files changed

+93
-32
lines changed

9 files changed

+93
-32
lines changed

backend/src/apiserver/server/list_request_util.go

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@ import (
2121
"strconv"
2222
"strings"
2323

24-
"github.com/golang/protobuf/jsonpb"
2524
apiv1beta1 "github.com/kubeflow/pipelines/backend/api/v1beta1/go_client"
2625
apiv2beta1 "github.com/kubeflow/pipelines/backend/api/v2beta1/go_client"
2726
"github.com/kubeflow/pipelines/backend/src/apiserver/common"
@@ -148,13 +147,13 @@ func parseAPIFilter(encoded string, apiVersion string) (interface{}, error) {
148147
switch apiVersion {
149148
case "v2beta1":
150149
f := &apiv2beta1.Filter{}
151-
if err := jsonpb.UnmarshalString(decoded, f); err != nil {
150+
if err := util.UnmarshalString(decoded, f); err != nil {
152151
return nil, util.NewInvalidInputError("failed to parse valid filter from %q: %v", encoded, err)
153152
}
154153
return f, nil
155154
case "v1beta1":
156155
f := &apiv1beta1.Filter{}
157-
if err := jsonpb.UnmarshalString(decoded, f); err != nil {
156+
if err := util.UnmarshalString(decoded, f); err != nil {
158157
return nil, util.NewInvalidInputError("failed to parse valid filter from %q: %v", encoded, err)
159158
}
160159
return f, nil

backend/src/common/util/json.go

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,12 @@
1515
package util
1616

1717
import (
18-
"encoding/json"
18+
"github.com/golang/protobuf/jsonpb"
1919

20+
"encoding/json"
2021
"github.com/golang/glog"
22+
"github.com/golang/protobuf/proto"
23+
"strings"
2124
)
2225

2326
func UnmarshalJsonOrFail(data string, v interface{}) {
@@ -63,3 +66,10 @@ func UnmarshalJsonWithError(data interface{}, v *interface{}) error {
6366
}
6467
return nil
6568
}
69+
70+
// UnmarshalString unmarshals a JSON object from s into m.
71+
// Allows unknown fields
72+
func UnmarshalString(s string, m proto.Message) error {
73+
unmarshaler := jsonpb.Unmarshaler{AllowUnknownFields: true}
74+
return unmarshaler.Unmarshal(strings.NewReader(s), m)
75+
}

backend/src/common/util/pipelinerun.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@ import (
2424

2525
"github.com/ghodss/yaml"
2626
"github.com/golang/glog"
27-
"github.com/golang/protobuf/jsonpb"
2827
api "github.com/kubeflow/pipelines/backend/api/v1beta1/go_client"
2928
exec "github.com/kubeflow/pipelines/backend/src/common"
3029
swfregister "github.com/kubeflow/pipelines/backend/src/crd/pkg/apis/scheduledworkflow"
@@ -664,7 +663,7 @@ func collectTaskRunMetricsOrNil(
664663
// ReportRunMetricsRequest as a workaround to hold user's metrics, which is a superset of what
665664
// user can provide.
666665
reportMetricsRequest := new(api.ReportRunMetricsRequest)
667-
err = jsonpb.UnmarshalString(metricsJSON, reportMetricsRequest)
666+
err = UnmarshalString(metricsJSON, reportMetricsRequest)
668667
if err != nil {
669668
// User writes invalid metrics JSON.
670669
// TODO(#1426): report the error back to api server to notify user

backend/src/common/util/workflow.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,6 @@ import (
2929
"github.com/argoproj/argo-workflows/v3/workflow/packer"
3030
"github.com/argoproj/argo-workflows/v3/workflow/validate"
3131
"github.com/golang/glog"
32-
"github.com/golang/protobuf/jsonpb"
3332
api "github.com/kubeflow/pipelines/backend/api/v1beta1/go_client"
3433
exec "github.com/kubeflow/pipelines/backend/src/common"
3534
swfregister "github.com/kubeflow/pipelines/backend/src/crd/pkg/apis/scheduledworkflow"
@@ -517,7 +516,7 @@ func collectNodeMetricsOrNil(runID string, nodeStatus *workflowapi.NodeStatus, r
517516
// ReportRunMetricsRequest as a workaround to hold user's metrics, which is a superset of what
518517
// user can provide.
519518
reportMetricsRequest := new(api.ReportRunMetricsRequest)
520-
err = jsonpb.UnmarshalString(metricsJSON, reportMetricsRequest)
519+
err = UnmarshalString(metricsJSON, reportMetricsRequest)
521520
if err != nil {
522521
// User writes invalid metrics JSON.
523522
// TODO(#1426): report the error back to api server to notify user

backend/src/v2/cmd/driver/main.go

Lines changed: 28 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -19,19 +19,19 @@ import (
1919
"encoding/json"
2020
"flag"
2121
"fmt"
22-
"os"
23-
"path/filepath"
24-
"strconv"
25-
26-
"github.com/kubeflow/pipelines/backend/src/v2/cacheutils"
27-
"github.com/kubeflow/pipelines/backend/src/v2/driver"
28-
"github.com/kubeflow/pipelines/kubernetes_platform/go/kubernetesplatform"
22+
"github.com/kubeflow/pipelines/backend/src/common/util"
2923

3024
"github.com/golang/glog"
3125
"github.com/golang/protobuf/jsonpb"
3226
"github.com/kubeflow/pipelines/api/v2alpha1/go/pipelinespec"
27+
"github.com/kubeflow/pipelines/backend/src/v2/cacheutils"
3328
"github.com/kubeflow/pipelines/backend/src/v2/config"
29+
"github.com/kubeflow/pipelines/backend/src/v2/driver"
3430
"github.com/kubeflow/pipelines/backend/src/v2/metadata"
31+
"github.com/kubeflow/pipelines/kubernetes_platform/go/kubernetesplatform"
32+
"os"
33+
"path/filepath"
34+
"strconv"
3535
)
3636

3737
const (
@@ -116,37 +116,33 @@ func drive() (err error) {
116116
}
117117
glog.Infof("input ComponentSpec:%s\n", prettyPrint(*componentSpecJson))
118118
componentSpec := &pipelinespec.ComponentSpec{}
119-
if err := jsonpb.UnmarshalString(*componentSpecJson, componentSpec); err != nil {
119+
if err := util.UnmarshalString(*componentSpecJson, componentSpec); err != nil {
120120
return fmt.Errorf("failed to unmarshal component spec, error: %w\ncomponentSpec: %v", err, prettyPrint(*componentSpecJson))
121121
}
122122
var taskSpec *pipelinespec.PipelineTaskSpec
123123
if *taskSpecJson != "" {
124124
glog.Infof("input TaskSpec:%s\n", prettyPrint(*taskSpecJson))
125125
taskSpec = &pipelinespec.PipelineTaskSpec{}
126-
if err := jsonpb.UnmarshalString(*taskSpecJson, taskSpec); err != nil {
126+
if err := util.UnmarshalString(*taskSpecJson, taskSpec); err != nil {
127127
return fmt.Errorf("failed to unmarshal task spec, error: %w\ntask: %v", err, taskSpecJson)
128128
}
129129
}
130130
glog.Infof("input ContainerSpec:%s\n", prettyPrint(*containerSpecJson))
131131
containerSpec := &pipelinespec.PipelineDeploymentConfig_PipelineContainerSpec{}
132-
if err := jsonpb.UnmarshalString(*containerSpecJson, containerSpec); err != nil {
132+
if err := util.UnmarshalString(*containerSpecJson, containerSpec); err != nil {
133133
return fmt.Errorf("failed to unmarshal container spec, error: %w\ncontainerSpec: %v", err, containerSpecJson)
134134
}
135135
var runtimeConfig *pipelinespec.PipelineJob_RuntimeConfig
136136
if *runtimeConfigJson != "" {
137137
glog.Infof("input RuntimeConfig:%s\n", prettyPrint(*runtimeConfigJson))
138138
runtimeConfig = &pipelinespec.PipelineJob_RuntimeConfig{}
139-
if err := jsonpb.UnmarshalString(*runtimeConfigJson, runtimeConfig); err != nil {
139+
if err := util.UnmarshalString(*runtimeConfigJson, runtimeConfig); err != nil {
140140
return fmt.Errorf("failed to unmarshal runtime config, error: %w\nruntimeConfig: %v", err, runtimeConfigJson)
141141
}
142142
}
143-
var k8sExecCfg *kubernetesplatform.KubernetesExecutorConfig
144-
if *k8sExecConfigJson != "" {
145-
glog.Infof("input kubernetesConfig:%s\n", prettyPrint(*k8sExecConfigJson))
146-
k8sExecCfg = &kubernetesplatform.KubernetesExecutorConfig{}
147-
if err := jsonpb.UnmarshalString(*k8sExecConfigJson, k8sExecCfg); err != nil {
148-
return fmt.Errorf("failed to unmarshal Kubernetes config, error: %w\nKubernetesConfig: %v", err, k8sExecConfigJson)
149-
}
143+
k8sExecCfg, err := parseExecConfigJson(k8sExecConfigJson)
144+
if err != nil {
145+
return err
150146
}
151147
namespace, err := config.InPodNamespace()
152148
if err != nil {
@@ -204,11 +200,24 @@ func drive() (err error) {
204200
IterationCount: *iterationCountPath,
205201
CachedDecision: *cachedDecisionPath,
206202
Condition: *conditionPath,
207-
PodSpecPatch: *podSpecPatchPath}
203+
PodSpecPatch: *podSpecPatchPath,
204+
}
208205

209206
return handleExecution(execution, *driverType, executionPaths)
210207
}
211208

209+
func parseExecConfigJson(k8sExecConfigJson *string) (*kubernetesplatform.KubernetesExecutorConfig, error) {
210+
var k8sExecCfg *kubernetesplatform.KubernetesExecutorConfig
211+
if *k8sExecConfigJson != "" {
212+
glog.Infof("input kubernetesConfig:%s\n", prettyPrint(*k8sExecConfigJson))
213+
k8sExecCfg = &kubernetesplatform.KubernetesExecutorConfig{}
214+
if err := util.UnmarshalString(*k8sExecConfigJson, k8sExecCfg); err != nil {
215+
return nil, fmt.Errorf("failed to unmarshal Kubernetes config, error: %w\nKubernetesConfig: %v", err, k8sExecConfigJson)
216+
}
217+
}
218+
return k8sExecCfg, nil
219+
}
220+
212221
func handleExecution(execution *driver.Execution, driverType string, executionPaths *ExecutionPaths) error {
213222
if execution.ID != 0 {
214223
glog.Infof("output execution.ID=%v", execution.ID)

backend/src/v2/cmd/driver/main_test.go

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,55 @@
11
package main
22

33
import (
4+
"github.com/golang/protobuf/proto"
45
"github.com/kubeflow/pipelines/backend/src/v2/driver"
6+
"github.com/kubeflow/pipelines/kubernetes_platform/go/kubernetesplatform"
7+
"github.com/stretchr/testify/assert"
58
"os"
69
"testing"
710
)
811

12+
func strPtr(s string) *string {
13+
return &s
14+
}
15+
16+
func TestSpecParsing(t *testing.T) {
17+
tt := []struct {
18+
name string
19+
input *string
20+
expected *kubernetesplatform.KubernetesExecutorConfig
21+
wantErr bool
22+
}{
23+
{
24+
"Valid - test kubecfg value parse.",
25+
strPtr("{\"imagePullSecret\":[{\"secret_name\":\"value1\"}]}"),
26+
&kubernetesplatform.KubernetesExecutorConfig{
27+
ImagePullSecret: []*kubernetesplatform.ImagePullSecret{
28+
{SecretName: "value1"},
29+
},
30+
},
31+
false,
32+
},
33+
{
34+
"Valid - test kubecfg value ignores unknown field.",
35+
strPtr("{\"imagePullSecret\":[{\"secret_name\":\"value1\"}], \"unknown_field\": \"something\"}"),
36+
&kubernetesplatform.KubernetesExecutorConfig{
37+
ImagePullSecret: []*kubernetesplatform.ImagePullSecret{
38+
{SecretName: "value1"},
39+
},
40+
},
41+
false,
42+
},
43+
}
44+
45+
for _, tc := range tt {
46+
t.Logf("Running test case: %s", tc.name)
47+
cfg, err := parseExecConfigJson(tc.input)
48+
assert.Equal(t, tc.wantErr, err != nil)
49+
assert.True(t, proto.Equal(tc.expected, cfg))
50+
}
51+
}
52+
953
func Test_handleExecutionContainer(t *testing.T) {
1054
execution := &driver.Execution{}
1155

backend/src/v2/compiler/argocompiler/container.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,13 +16,13 @@ package argocompiler
1616

1717
import (
1818
"fmt"
19+
"github.com/kubeflow/pipelines/backend/src/common/util"
1920
"os"
2021
"strconv"
2122
"strings"
2223

2324
wfapi "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1"
2425
"github.com/golang/glog"
25-
"github.com/golang/protobuf/jsonpb"
2626
"github.com/kubeflow/pipelines/api/v2alpha1/go/pipelinespec"
2727
"github.com/kubeflow/pipelines/backend/src/v2/component"
2828
"github.com/kubeflow/pipelines/kubernetes_platform/go/kubernetesplatform"
@@ -419,7 +419,7 @@ func (c *workflowCompiler) addContainerExecutorTemplate(refName string) string {
419419

420420
if kubernetesConfigParam != nil {
421421
k8sExecCfg := &kubernetesplatform.KubernetesExecutorConfig{}
422-
if err := jsonpb.UnmarshalString(string(*kubernetesConfigParam.Value), k8sExecCfg); err == nil {
422+
if err := util.UnmarshalString(string(*kubernetesConfigParam.Value), k8sExecCfg); err == nil {
423423
extendPodMetadata(&executor.Metadata, k8sExecCfg)
424424
}
425425
}

backend/src/v2/compiler/visitor.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ package compiler
2424
import (
2525
"bytes"
2626
"fmt"
27+
"github.com/kubeflow/pipelines/backend/src/common/util"
2728
"sort"
2829

2930
"github.com/golang/protobuf/jsonpb"
@@ -187,7 +188,7 @@ func GetPipelineSpec(job *pipelinespec.PipelineJob) (*pipelinespec.PipelineSpec,
187188
return nil, fmt.Errorf("failed marshal pipeline spec to json: %w", err)
188189
}
189190
spec := &pipelinespec.PipelineSpec{}
190-
if err := jsonpb.UnmarshalString(json, spec); err != nil {
191+
if err := util.UnmarshalString(json, spec); err != nil {
191192
return nil, fmt.Errorf("failed to parse pipeline spec: %v", err)
192193
}
193194
return spec, nil

backend/src/v2/compiler/visitor_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,10 +15,10 @@ package compiler_test
1515

1616
import (
1717
"fmt"
18+
"github.com/kubeflow/pipelines/backend/src/common/util"
1819
"os"
1920
"testing"
2021

21-
"github.com/golang/protobuf/jsonpb"
2222
"github.com/google/go-cmp/cmp"
2323
"github.com/kubeflow/pipelines/api/v2alpha1/go/pipelinespec"
2424
"github.com/kubeflow/pipelines/backend/src/v2/compiler"
@@ -93,7 +93,7 @@ func load(t *testing.T, path string) *pipelinespec.PipelineJob {
9393
}
9494
json := string(content)
9595
job := &pipelinespec.PipelineJob{}
96-
if err := jsonpb.UnmarshalString(json, job); err != nil {
96+
if err := util.UnmarshalString(json, job); err != nil {
9797
t.Errorf("Failed to parse pipeline job, error: %s, job: %v", err, json)
9898
}
9999
return job

0 commit comments

Comments
 (0)