Skip to content

Commit b77744e

Browse files
committed
test(backend): Add unit test for NewLauncherV2
Signed-off-by: John Thompson <[email protected]>
1 parent 73b58e4 commit b77744e

File tree

5 files changed

+275
-44
lines changed

5 files changed

+275
-44
lines changed
Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,94 @@
1+
package client_manager
2+
3+
import (
4+
"fmt"
5+
6+
"github.com/kubeflow/pipelines/backend/src/v2/cacheutils"
7+
"github.com/kubeflow/pipelines/backend/src/v2/metadata"
8+
"k8s.io/client-go/kubernetes"
9+
"k8s.io/client-go/rest"
10+
)
11+
12+
type ClientManagerInterface interface {
13+
K8sClient() kubernetes.Interface
14+
MetadataClient() metadata.ClientInterface
15+
CacheClient() cacheutils.Client
16+
}
17+
18+
// Ensure ClientManager implements ClientManagerInterface
19+
var _ ClientManagerInterface = (*ClientManager)(nil)
20+
21+
// ClientManager is a container for various service clients.
22+
type ClientManager struct {
23+
k8sClient kubernetes.Interface
24+
metadataClient metadata.ClientInterface
25+
cacheClient cacheutils.Client
26+
}
27+
28+
type Options struct {
29+
MLMDServerAddress string
30+
MLMDServerPort string
31+
CacheDisabled bool
32+
}
33+
34+
// NewClientManager creates and Init a new instance of ClientManager.
35+
func NewClientManager(options *Options) (*ClientManager, error) {
36+
clientManager := &ClientManager{}
37+
err := clientManager.init(options)
38+
if err != nil {
39+
return nil, err
40+
}
41+
42+
return clientManager, nil
43+
}
44+
45+
func (cm *ClientManager) K8sClient() kubernetes.Interface {
46+
return cm.k8sClient
47+
}
48+
49+
func (cm *ClientManager) MetadataClient() metadata.ClientInterface {
50+
return cm.metadataClient
51+
}
52+
53+
func (cm *ClientManager) CacheClient() cacheutils.Client {
54+
return cm.cacheClient
55+
}
56+
57+
func (cm *ClientManager) init(opts *Options) error {
58+
k8sClient, err := initK8sClient()
59+
if err != nil {
60+
return err
61+
}
62+
metadataClient, err := initMetadataClient(opts.MLMDServerAddress, opts.MLMDServerPort)
63+
if err != nil {
64+
return err
65+
}
66+
cacheClient, err := initCacheClient(opts.CacheDisabled)
67+
if err != nil {
68+
return err
69+
}
70+
cm.k8sClient = k8sClient
71+
cm.metadataClient = metadataClient
72+
cm.cacheClient = cacheClient
73+
return nil
74+
}
75+
76+
func initK8sClient() (kubernetes.Interface, error) {
77+
restConfig, err := rest.InClusterConfig()
78+
if err != nil {
79+
return nil, fmt.Errorf("failed to initialize kubernetes client: %w", err)
80+
}
81+
k8sClient, err := kubernetes.NewForConfig(restConfig)
82+
if err != nil {
83+
return nil, fmt.Errorf("failed to initialize kubernetes client set: %w", err)
84+
}
85+
return k8sClient, nil
86+
}
87+
88+
func initMetadataClient(address string, port string) (metadata.ClientInterface, error) {
89+
return metadata.NewClient(address, port)
90+
}
91+
92+
func initCacheClient(cacheDisabled bool) (cacheutils.Client, error) {
93+
return cacheutils.NewClient(cacheDisabled)
94+
}
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
package client_manager
2+
3+
import (
4+
"github.com/kubeflow/pipelines/backend/src/v2/cacheutils"
5+
"github.com/kubeflow/pipelines/backend/src/v2/metadata"
6+
"k8s.io/client-go/kubernetes"
7+
)
8+
9+
type FakeClientManager struct {
10+
k8sClient kubernetes.Interface
11+
metadataClient metadata.ClientInterface
12+
cacheClient cacheutils.Client
13+
}
14+
15+
// Ensure FakeClientManager implements ClientManagerInterface
16+
var _ ClientManagerInterface = (*FakeClientManager)(nil)
17+
18+
func (f *FakeClientManager) K8sClient() kubernetes.Interface {
19+
return f.k8sClient
20+
}
21+
22+
func (f *FakeClientManager) MetadataClient() metadata.ClientInterface {
23+
return f.metadataClient
24+
}
25+
26+
func (f *FakeClientManager) CacheClient() cacheutils.Client {
27+
return f.cacheClient
28+
}
29+
30+
func NewFakeClientManager(k8sClient kubernetes.Interface, metadataClient metadata.ClientInterface, cacheClient cacheutils.Client) *FakeClientManager {
31+
return &FakeClientManager{
32+
k8sClient: k8sClient,
33+
metadataClient: metadataClient,
34+
cacheClient: cacheClient,
35+
}
36+
}

backend/src/v2/cmd/launcher-v2/main.go

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import (
2121
"fmt"
2222

2323
"github.com/golang/glog"
24+
"github.com/kubeflow/pipelines/backend/src/v2/client_manager"
2425
"github.com/kubeflow/pipelines/backend/src/v2/component"
2526
"github.com/kubeflow/pipelines/backend/src/v2/config"
2627
)
@@ -101,7 +102,16 @@ func run() error {
101102
}
102103
return nil
103104
case "container":
104-
launcher, err := component.NewLauncherV2(ctx, *executionID, *executorInputJSON, *componentSpecJSON, flag.Args(), launcherV2Opts)
105+
clientOptions := &client_manager.Options{
106+
MLMDServerAddress: launcherV2Opts.MLMDServerAddress,
107+
MLMDServerPort: launcherV2Opts.MLMDServerPort,
108+
CacheDisabled: launcherV2Opts.CacheDisabled,
109+
}
110+
clientManager, err := client_manager.NewClientManager(clientOptions)
111+
if err != nil {
112+
return err
113+
}
114+
launcher, err := component.NewLauncherV2(ctx, *executionID, *executorInputJSON, *componentSpecJSON, flag.Args(), launcherV2Opts, clientManager)
105115
if err != nil {
106116
return err
107117
}

backend/src/v2/component/launcher_v2.go

Lines changed: 28 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ import (
3131
"github.com/golang/protobuf/proto"
3232
"github.com/golang/protobuf/ptypes/timestamp"
3333
api "github.com/kubeflow/pipelines/backend/api/v1beta1/go_client"
34-
"github.com/kubeflow/pipelines/backend/src/v2/cacheutils"
34+
"github.com/kubeflow/pipelines/backend/src/v2/client_manager"
3535

3636
"github.com/kubeflow/pipelines/api/v2alpha1/go/pipelinespec"
3737
"github.com/kubeflow/pipelines/backend/src/v2/metadata"
@@ -41,7 +41,6 @@ import (
4141
"google.golang.org/protobuf/encoding/protojson"
4242
"google.golang.org/protobuf/types/known/structpb"
4343
"k8s.io/client-go/kubernetes"
44-
"k8s.io/client-go/rest"
4544
)
4645

4746
type LauncherV2Options struct {
@@ -63,11 +62,7 @@ type LauncherV2 struct {
6362
command string
6463
args []string
6564
options LauncherV2Options
66-
67-
// clients
68-
metadataClient metadata.ClientInterface
69-
k8sClient kubernetes.Interface
70-
cacheClient cacheutils.Client
65+
clientManager client_manager.ClientManagerInterface
7166
}
7267

7368
// Client is the struct to hold the Kubernetes Clientset
@@ -76,7 +71,15 @@ type kubernetesClient struct {
7671
}
7772

7873
// NewLauncherV2 is a factory function that returns an instance of LauncherV2.
79-
func NewLauncherV2(ctx context.Context, executionID int64, executorInputJSON, componentSpecJSON string, cmdArgs []string, opts *LauncherV2Options) (l *LauncherV2, err error) {
74+
func NewLauncherV2(
75+
ctx context.Context,
76+
executionID int64,
77+
executorInputJSON,
78+
componentSpecJSON string,
79+
cmdArgs []string,
80+
opts *LauncherV2Options,
81+
clientManager client_manager.ClientManagerInterface,
82+
) (l *LauncherV2, err error) {
8083
defer func() {
8184
if err != nil {
8285
err = fmt.Errorf("failed to create component launcher v2: %w", err)
@@ -102,32 +105,14 @@ func NewLauncherV2(ctx context.Context, executionID int64, executorInputJSON, co
102105
if err != nil {
103106
return nil, err
104107
}
105-
restConfig, err := rest.InClusterConfig()
106-
if err != nil {
107-
return nil, fmt.Errorf("failed to initialize kubernetes client: %w", err)
108-
}
109-
k8sClient, err := kubernetes.NewForConfig(restConfig)
110-
if err != nil {
111-
return nil, fmt.Errorf("failed to initialize kubernetes client set: %w", err)
112-
}
113-
metadataClient, err := metadata.NewClient(opts.MLMDServerAddress, opts.MLMDServerPort)
114-
if err != nil {
115-
return nil, err
116-
}
117-
cacheClient, err := cacheutils.NewClient(opts.CacheDisabled)
118-
if err != nil {
119-
return nil, err
120-
}
121108
return &LauncherV2{
122-
executionID: executionID,
123-
executorInput: executorInput,
124-
component: component,
125-
command: cmdArgs[0],
126-
args: cmdArgs[1:],
127-
options: *opts,
128-
metadataClient: metadataClient,
129-
k8sClient: k8sClient,
130-
cacheClient: cacheClient,
109+
executionID: executionID,
110+
executorInput: executorInput,
111+
component: component,
112+
command: cmdArgs[0],
113+
args: cmdArgs[1:],
114+
options: *opts,
115+
clientManager: clientManager,
131116
}, nil
132117
}
133118

@@ -195,12 +180,12 @@ func (l *LauncherV2) Execute(ctx context.Context) (err error) {
195180
glog.Infof("publish success.")
196181
// At the end of the current task, we check the statuses of all tasks in
197182
// the current DAG and update the DAG's status accordingly.
198-
dag, err := l.metadataClient.GetDAG(ctx, execution.GetExecution().CustomProperties["parent_dag_id"].GetIntValue())
183+
dag, err := l.clientManager.MetadataClient().GetDAG(ctx, execution.GetExecution().CustomProperties["parent_dag_id"].GetIntValue())
199184
if err != nil {
200185
glog.Errorf("DAG Status Update: failed to get DAG: %s", err.Error())
201186
}
202-
pipeline, _ := l.metadataClient.GetPipelineFromExecution(ctx, execution.GetID())
203-
err = l.metadataClient.UpdateDAGExecutionsState(ctx, dag, pipeline)
187+
pipeline, _ := l.clientManager.MetadataClient().GetPipelineFromExecution(ctx, execution.GetID())
188+
err = l.clientManager.MetadataClient().UpdateDAGExecutionsState(ctx, dag, pipeline)
204189
if err != nil {
205190
glog.Errorf("failed to update DAG state: %s", err.Error())
206191
}
@@ -220,7 +205,7 @@ func (l *LauncherV2) Execute(ctx context.Context) (err error) {
220205
if err != nil {
221206
return err
222207
}
223-
bucket, err := objectstore.OpenBucket(ctx, l.k8sClient, l.options.Namespace, bucketConfig)
208+
bucket, err := objectstore.OpenBucket(ctx, l.clientManager.K8sClient(), l.options.Namespace, bucketConfig)
224209
if err != nil {
225210
return err
226211
}
@@ -235,9 +220,9 @@ func (l *LauncherV2) Execute(ctx context.Context) (err error) {
235220
l.args,
236221
bucket,
237222
bucketConfig,
238-
l.metadataClient,
223+
l.clientManager.MetadataClient(),
239224
l.options.Namespace,
240-
l.k8sClient,
225+
l.clientManager.K8sClient(),
241226
l.options.PublishLogs,
242227
)
243228
if err != nil {
@@ -260,7 +245,7 @@ func (l *LauncherV2) Execute(ctx context.Context) (err error) {
260245
FinishedAt: &timestamp.Timestamp{Seconds: time.Now().Unix()},
261246
Fingerprint: fingerPrint,
262247
}
263-
return l.cacheClient.CreateExecutionCache(ctx, task)
248+
return l.clientManager.CacheClient().CreateExecutionCache(ctx, task)
264249
}
265250

266251
return nil
@@ -306,7 +291,7 @@ func (l *LauncherV2) prePublish(ctx context.Context) (execution *metadata.Execut
306291
err = fmt.Errorf("failed to pre-publish Pod info to ML Metadata: %w", err)
307292
}
308293
}()
309-
execution, err = l.metadataClient.GetExecution(ctx, l.executionID)
294+
execution, err = l.clientManager.MetadataClient().GetExecution(ctx, l.executionID)
310295
if err != nil {
311296
return nil, err
312297
}
@@ -315,7 +300,7 @@ func (l *LauncherV2) prePublish(ctx context.Context) (execution *metadata.Execut
315300
PodUID: l.options.PodUID,
316301
Namespace: l.options.Namespace,
317302
}
318-
return l.metadataClient.PrePublishExecution(ctx, execution, ecfg)
303+
return l.clientManager.MetadataClient().PrePublishExecution(ctx, execution, ecfg)
319304
}
320305

321306
// TODO(Bobgy): consider passing output artifacts info from executor output.
@@ -336,7 +321,7 @@ func (l *LauncherV2) publish(
336321
// TODO(Bobgy): when adding artifacts, we will need execution.pipeline to be non-nil, because we need
337322
// to publish output artifacts to the context too.
338323
// return l.metadataClient.PublishExecution(ctx, execution, outputParameters, outputArtifacts, pb.Execution_COMPLETE)
339-
return l.metadataClient.PublishExecution(ctx, execution, outputParameters, outputArtifacts, status)
324+
return l.clientManager.MetadataClient().PublishExecution(ctx, execution, outputParameters, outputArtifacts, status)
340325
}
341326

342327
// executeV2 handles placeholder substitution for inputs, calls execute to

0 commit comments

Comments
 (0)