Skip to content

Commit d2c0376

Browse files
feat(backend): configurable log level for driver / launcher images (#11278)
* Do not invoke get image methods twice. Signed-off-by: carter.fendley <[email protected]> * Add configurable driver / launcher log level Signed-off-by: carter.fendley <[email protected]> * Add configurable driver / launcher log level Signed-off-by: carter.fendley <[email protected]> * Update argocompiler golden files Signed-off-by: carter.fendley <[email protected]> * Handle errors from flag setting and tests Signed-off-by: carter.fendley <[email protected]> * Update gold files & image masking to use ghcr Signed-off-by: carter.fendley <[email protected]> * Update tests with new images Signed-off-by: carter.fendley <[email protected]> * Use unified var for driver / launcher log level + patch user code launcher Signed-off-by: carter.fendley <[email protected]> * Add PIPELINE_LOG_LEVEL to deployment for discoverability Signed-off-by: carter.fendley <[email protected]> --------- Signed-off-by: carter.fendley <[email protected]>
1 parent 1c4f676 commit d2c0376

17 files changed

+493
-104
lines changed

backend/src/v2/cmd/driver/main.go

Lines changed: 19 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -69,13 +69,21 @@ var (
6969
// the value stored in the paths will be either 'true' or 'false'
7070
cachedDecisionPath = flag.String("cached_decision_path", "", "Cached Decision output path")
7171
conditionPath = flag.String("condition_path", "", "Condition output path")
72+
logLevel = flag.String("log_level", "1", "The verbosity level to log.")
7273
)
7374

7475
// func RootDAG(pipelineName string, runID string, component *pipelinespec.ComponentSpec, task *pipelinespec.PipelineTaskSpec, mlmd *metadata.Client) (*Execution, error) {
7576

7677
func main() {
7778
flag.Parse()
78-
err := drive()
79+
80+
glog.Infof("Setting log level to: '%s'", *logLevel)
81+
err := flag.Set("v", *logLevel)
82+
if err != nil {
83+
glog.Warningf("Failed to set log level: %s", err.Error())
84+
}
85+
86+
err = drive()
7987
if err != nil {
8088
glog.Exitf("%v", err)
8189
}
@@ -153,15 +161,16 @@ func drive() (err error) {
153161
return err
154162
}
155163
options := driver.Options{
156-
PipelineName: *pipelineName,
157-
RunID: *runID,
158-
RunName: *runName,
159-
RunDisplayName: *runDisplayName,
160-
Namespace: namespace,
161-
Component: componentSpec,
162-
Task: taskSpec,
163-
DAGExecutionID: *dagExecutionID,
164-
IterationIndex: *iterationIndex,
164+
PipelineName: *pipelineName,
165+
RunID: *runID,
166+
RunName: *runName,
167+
RunDisplayName: *runDisplayName,
168+
Namespace: namespace,
169+
Component: componentSpec,
170+
Task: taskSpec,
171+
DAGExecutionID: *dagExecutionID,
172+
IterationIndex: *iterationIndex,
173+
PipelineLogLevel: *logLevel,
165174
}
166175
var execution *driver.Execution
167176
var driverErr error

backend/src/v2/cmd/launcher-v2/main.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ var (
4141
podUID = flag.String("pod_uid", "", "Kubernetes Pod UID.")
4242
mlmdServerAddress = flag.String("mlmd_server_address", "", "The MLMD gRPC server address.")
4343
mlmdServerPort = flag.String("mlmd_server_port", "8080", "The MLMD gRPC server port.")
44+
logLevel = flag.String("log_level", "1", "The verbosity level to log.")
4445
)
4546

4647
func main() {
@@ -54,6 +55,12 @@ func run() error {
5455
flag.Parse()
5556
ctx := context.Background()
5657

58+
glog.Infof("Setting log level to: '%s'", *logLevel)
59+
err := flag.Set("v", *logLevel)
60+
if err != nil {
61+
glog.Warningf("Failed to set log level: %s", err.Error())
62+
}
63+
5764
if *copy != "" {
5865
// copy is used to copy this binary to a shared volume
5966
// this is a special command, ignore all other flags by returning

backend/src/v2/compiler/argocompiler/argo_test.go

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,12 @@ func Test_argo_compiler(t *testing.T) {
7474
argoYAMLPath: "testdata/hello_world_run_as_user.yaml",
7575
envVars: map[string]string{"PIPELINE_RUN_AS_USER": "1001"},
7676
},
77+
{
78+
jobPath: "../testdata/hello_world.json",
79+
platformSpecPath: "",
80+
argoYAMLPath: "testdata/hello_world_log_level.yaml",
81+
envVars: map[string]string{"PIPELINE_LOG_LEVEL": "3"},
82+
},
7783
}
7884
for _, tt := range tests {
7985
t.Run(fmt.Sprintf("%+v", tt), func(t *testing.T) {
@@ -122,14 +128,14 @@ func Test_argo_compiler(t *testing.T) {
122128
// mask the driver launcher image hash to maintain test stability
123129
for _, template := range wf.Spec.Templates {
124130
if template.Container != nil && strings.Contains(template.Container.Image, "kfp-driver") {
125-
template.Container.Image = "gcr.io/ml-pipeline/kfp-driver"
131+
template.Container.Image = "ghcr.io/kubeflow/kfp-driver"
126132
}
127133
if template.Container != nil && strings.Contains(template.Container.Image, "kfp-launcher") {
128-
template.Container.Image = "gcr.io/ml-pipeline/kfp-launcher"
134+
template.Container.Image = "ghcr.io/kubeflow/kfp-launcher"
129135
}
130136
for i := range template.InitContainers {
131137
if strings.Contains(template.InitContainers[i].Image, "kfp-launcher") {
132-
template.InitContainers[i].Image = "gcr.io/ml-pipeline/kfp-launcher"
138+
template.InitContainers[i].Image = "ghcr.io/kubeflow/kfp-launcher"
133139
}
134140
}
135141
}

backend/src/v2/compiler/argocompiler/container.go

Lines changed: 35 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ const (
3939
DefaultDriverCommand = "driver"
4040
DriverCommandEnvVar = "V2_DRIVER_COMMAND"
4141
PipelineRunAsUserEnvVar = "PIPELINE_RUN_AS_USER"
42+
PipelineLogLevelEnvVar = "PIPELINE_LOG_LEVEL"
4243
gcsScratchLocation = "/gcs"
4344
gcsScratchName = "gcs-scratch"
4445
s3ScratchLocation = "/s3"
@@ -162,6 +163,27 @@ func (c *workflowCompiler) addContainerDriverTemplate() string {
162163
if ok {
163164
return name
164165
}
166+
167+
args := []string{
168+
"--type", "CONTAINER",
169+
"--pipeline_name", c.spec.GetPipelineInfo().GetName(),
170+
"--run_id", runID(),
171+
"--run_name", runResourceName(),
172+
"--run_display_name", c.job.DisplayName,
173+
"--dag_execution_id", inputValue(paramParentDagID),
174+
"--component", inputValue(paramComponent),
175+
"--task", inputValue(paramTask),
176+
"--container", inputValue(paramContainer),
177+
"--iteration_index", inputValue(paramIterationIndex),
178+
"--cached_decision_path", outputPath(paramCachedDecision),
179+
"--pod_spec_patch_path", outputPath(paramPodSpecPatch),
180+
"--condition_path", outputPath(paramCondition),
181+
"--kubernetes_config", inputValue(paramKubernetesConfig),
182+
}
183+
if value, ok := os.LookupEnv(PipelineLogLevelEnvVar); ok {
184+
args = append(args, "--log_level", value)
185+
}
186+
165187
t := &wfapi.Template{
166188
Name: name,
167189
Inputs: wfapi.Inputs{
@@ -182,24 +204,9 @@ func (c *workflowCompiler) addContainerDriverTemplate() string {
182204
},
183205
},
184206
Container: &k8score.Container{
185-
Image: GetDriverImage(),
186-
Command: GetDriverCommand(),
187-
Args: []string{
188-
"--type", "CONTAINER",
189-
"--pipeline_name", c.spec.GetPipelineInfo().GetName(),
190-
"--run_id", runID(),
191-
"--run_name", runResourceName(),
192-
"--run_display_name", c.job.DisplayName,
193-
"--dag_execution_id", inputValue(paramParentDagID),
194-
"--component", inputValue(paramComponent),
195-
"--task", inputValue(paramTask),
196-
"--container", inputValue(paramContainer),
197-
"--iteration_index", inputValue(paramIterationIndex),
198-
"--cached_decision_path", outputPath(paramCachedDecision),
199-
"--pod_spec_patch_path", outputPath(paramPodSpecPatch),
200-
"--condition_path", outputPath(paramCondition),
201-
"--kubernetes_config", inputValue(paramKubernetesConfig),
202-
},
207+
Image: c.driverImage,
208+
Command: c.driverCommand,
209+
Args: args,
203210
Resources: driverResources,
204211
},
205212
}
@@ -287,6 +294,13 @@ func (c *workflowCompiler) addContainerExecutorTemplate(refName string) string {
287294
},
288295
}
289296
c.templates[nameContainerExecutor] = container
297+
298+
args := []string{
299+
"--copy", component.KFPLauncherPath,
300+
}
301+
if value, ok := os.LookupEnv(PipelineLogLevelEnvVar); ok {
302+
args = append(args, "--log_level", value)
303+
}
290304
executor := &wfapi.Template{
291305
Name: nameContainerImpl,
292306
Inputs: wfapi.Inputs{
@@ -345,8 +359,9 @@ func (c *workflowCompiler) addContainerExecutorTemplate(refName string) string {
345359
InitContainers: []wfapi.UserContainer{{
346360
Container: k8score.Container{
347361
Name: "kfp-launcher",
348-
Image: GetLauncherImage(),
349-
Command: []string{"launcher-v2", "--copy", component.KFPLauncherPath},
362+
Image: c.launcherImage,
363+
Command: []string{"launcher-v2"},
364+
Args: args,
350365
VolumeMounts: []k8score.VolumeMount{
351366
{
352367
Name: volumeNameKFPLauncher,

backend/src/v2/compiler/argocompiler/dag.go

Lines changed: 24 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ package argocompiler
1515

1616
import (
1717
"fmt"
18+
"os"
1819
"sort"
1920
"strings"
2021

@@ -535,6 +536,26 @@ func (c *workflowCompiler) addDAGDriverTemplate() string {
535536
if ok {
536537
return name
537538
}
539+
540+
args := []string{
541+
"--type", inputValue(paramDriverType),
542+
"--pipeline_name", c.spec.GetPipelineInfo().GetName(),
543+
"--run_id", runID(),
544+
"--run_name", runResourceName(),
545+
"--run_display_name", c.job.DisplayName,
546+
"--dag_execution_id", inputValue(paramParentDagID),
547+
"--component", inputValue(paramComponent),
548+
"--task", inputValue(paramTask),
549+
"--runtime_config", inputValue(paramRuntimeConfig),
550+
"--iteration_index", inputValue(paramIterationIndex),
551+
"--execution_id_path", outputPath(paramExecutionID),
552+
"--iteration_count_path", outputPath(paramIterationCount),
553+
"--condition_path", outputPath(paramCondition),
554+
}
555+
if value, ok := os.LookupEnv(PipelineLogLevelEnvVar); ok {
556+
args = append(args, "--log_level", value)
557+
}
558+
538559
t := &wfapi.Template{
539560
Name: name,
540561
Inputs: wfapi.Inputs{
@@ -555,23 +576,9 @@ func (c *workflowCompiler) addDAGDriverTemplate() string {
555576
},
556577
},
557578
Container: &k8score.Container{
558-
Image: c.driverImage,
559-
Command: c.driverCommand,
560-
Args: []string{
561-
"--type", inputValue(paramDriverType),
562-
"--pipeline_name", c.spec.GetPipelineInfo().GetName(),
563-
"--run_id", runID(),
564-
"--run_name", runResourceName(),
565-
"--run_display_name", c.job.DisplayName,
566-
"--dag_execution_id", inputValue(paramParentDagID),
567-
"--component", inputValue(paramComponent),
568-
"--task", inputValue(paramTask),
569-
"--runtime_config", inputValue(paramRuntimeConfig),
570-
"--iteration_index", inputValue(paramIterationIndex),
571-
"--execution_id_path", outputPath(paramExecutionID),
572-
"--iteration_count_path", outputPath(paramIterationCount),
573-
"--condition_path", outputPath(paramCondition),
574-
},
579+
Image: c.driverImage,
580+
Command: c.driverCommand,
581+
Args: args,
575582
Resources: driverResources,
576583
},
577584
}

backend/src/v2/compiler/argocompiler/importer.go

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ package argocompiler
1616

1717
import (
1818
"fmt"
19+
"os"
1920

2021
wfapi "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1"
2122
"github.com/kubeflow/pipelines/api/v2alpha1/go/pipelinespec"
@@ -64,7 +65,7 @@ func (c *workflowCompiler) addImporterTemplate() string {
6465
if _, alreadyExists := c.templates[name]; alreadyExists {
6566
return name
6667
}
67-
launcherArgs := []string{
68+
args := []string{
6869
"--executor_type", "importer",
6970
"--task_spec", inputValue(paramTask),
7071
"--component_spec", inputValue(paramComponent),
@@ -81,6 +82,9 @@ func (c *workflowCompiler) addImporterTemplate() string {
8182
"--mlmd_server_port",
8283
fmt.Sprintf("$(%s)", component.EnvMetadataPort),
8384
}
85+
if value, ok := os.LookupEnv(PipelineLogLevelEnvVar); ok {
86+
args = append(args, "--log_level", value)
87+
}
8488
importerTemplate := &wfapi.Template{
8589
Name: name,
8690
Inputs: wfapi.Inputs{
@@ -94,7 +98,7 @@ func (c *workflowCompiler) addImporterTemplate() string {
9498
Container: &k8score.Container{
9599
Image: c.launcherImage,
96100
Command: []string{"launcher-v2"},
97-
Args: launcherArgs,
101+
Args: args,
98102
EnvFrom: []k8score.EnvFromSource{metadataEnvFrom},
99103
Env: commonEnvs,
100104
Resources: driverResources,

backend/src/v2/compiler/argocompiler/testdata/create_mount_delete_dynamic_pvc.yaml

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,7 @@ spec:
7171
- '{{inputs.parameters.kubernetes-config}}'
7272
command:
7373
- driver
74-
image: gcr.io/ml-pipeline/kfp-driver
74+
image: ghcr.io/kubeflow/kfp-driver
7575
name: ""
7676
resources:
7777
limits:
@@ -159,11 +159,12 @@ spec:
159159
- mountPath: /.config
160160
name: dot-config-scratch
161161
initContainers:
162-
- command:
163-
- launcher-v2
162+
- args:
164163
- --copy
165164
- /kfp-launcher/launch
166-
image: gcr.io/ml-pipeline/kfp-launcher
165+
command:
166+
- launcher-v2
167+
image: ghcr.io/kubeflow/kfp-launcher
167168
name: kfp-launcher
168169
resources:
169170
limits:
@@ -309,7 +310,7 @@ spec:
309310
- '{{outputs.parameters.condition.path}}'
310311
command:
311312
- driver
312-
image: gcr.io/ml-pipeline/kfp-driver
313+
image: ghcr.io/kubeflow/kfp-driver
313314
name: ""
314315
resources:
315316
limits:

backend/src/v2/compiler/argocompiler/testdata/create_pod_metadata.yaml

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ spec:
5959
- '{{inputs.parameters.kubernetes-config}}'
6060
command:
6161
- driver
62-
image: gcr.io/ml-pipeline/kfp-driver
62+
image: ghcr.io/kubeflow/kfp-driver
6363
name: ""
6464
resources:
6565
limits:
@@ -147,11 +147,12 @@ spec:
147147
- mountPath: /.config
148148
name: dot-config-scratch
149149
initContainers:
150-
- command:
151-
- launcher-v2
150+
- args:
152151
- --copy
153152
- /kfp-launcher/launch
154-
image: gcr.io/ml-pipeline/kfp-launcher
153+
command:
154+
- launcher-v2
155+
image: ghcr.io/kubeflow/kfp-launcher
155156
name: kfp-launcher
156157
resources:
157158
limits:
@@ -252,7 +253,7 @@ spec:
252253
- '{{outputs.parameters.condition.path}}'
253254
command:
254255
- driver
255-
image: gcr.io/ml-pipeline/kfp-driver
256+
image: ghcr.io/kubeflow/kfp-driver
256257
name: ""
257258
resources:
258259
limits:

backend/src/v2/compiler/argocompiler/testdata/exit_handler.yaml

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@ spec:
7676
- '{{inputs.parameters.kubernetes-config}}'
7777
command:
7878
- driver
79-
image: gcr.io/ml-pipeline/kfp-driver
79+
image: ghcr.io/kubeflow/kfp-driver
8080
name: ""
8181
resources:
8282
limits:
@@ -164,11 +164,12 @@ spec:
164164
- mountPath: /.config
165165
name: dot-config-scratch
166166
initContainers:
167-
- command:
168-
- launcher-v2
167+
- args:
169168
- --copy
170169
- /kfp-launcher/launch
171-
image: gcr.io/ml-pipeline/kfp-launcher
170+
command:
171+
- launcher-v2
172+
image: ghcr.io/kubeflow/kfp-launcher
172173
name: kfp-launcher
173174
resources:
174175
limits:
@@ -315,7 +316,7 @@ spec:
315316
- '{{outputs.parameters.condition.path}}'
316317
command:
317318
- driver
318-
image: gcr.io/ml-pipeline/kfp-driver
319+
image: ghcr.io/kubeflow/kfp-driver
319320
name: ""
320321
resources:
321322
limits:

0 commit comments

Comments
 (0)