-
-
Notifications
You must be signed in to change notification settings - Fork 44
feat: add e2e test to verify service is avaliable #310
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 3 commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -20,13 +20,23 @@ import ( | |
| "context" | ||
| "errors" | ||
| "fmt" | ||
| "io" | ||
| "net/http" | ||
| "os" | ||
| "os/signal" | ||
| "strconv" | ||
| "strings" | ||
| "syscall" | ||
|
|
||
| "github.com/onsi/gomega" | ||
| corev1 "k8s.io/api/core/v1" | ||
| apimeta "k8s.io/apimachinery/pkg/api/meta" | ||
| metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" | ||
| "k8s.io/apimachinery/pkg/types" | ||
| "k8s.io/client-go/kubernetes" | ||
| "k8s.io/client-go/rest" | ||
| "k8s.io/client-go/tools/portforward" | ||
| "k8s.io/client-go/transport/spdy" | ||
| "sigs.k8s.io/controller-runtime/pkg/client" | ||
| lws "sigs.k8s.io/lws/api/leaderworkerset/v1" | ||
|
|
||
|
|
@@ -233,3 +243,112 @@ func ValidateServicePods(ctx context.Context, k8sClient client.Client, service * | |
| return nil | ||
| }).Should(gomega.Succeed()) | ||
| } | ||
|
|
||
| type CheckServiceAvailableFunc func() error | ||
|
|
||
| func ValidateServiceAvaliable(ctx context.Context, k8sClient client.Client, cfg *rest.Config, service *inferenceapi.Service, check CheckServiceAvailableFunc) { | ||
| gomega.Eventually(func() error { | ||
| pods := corev1.PodList{} | ||
| podSelector := client.MatchingLabels(map[string]string{ | ||
| lws.SetNameLabelKey: service.Name, | ||
| }) | ||
| if err := k8sClient.List(ctx, &pods, podSelector, client.InNamespace(service.Namespace)); err != nil { | ||
| return err | ||
| } | ||
| if len(pods.Items) != int(*service.Spec.Replicas)*int(*service.Spec.WorkloadTemplate.Size) { | ||
| return fmt.Errorf("pods number not right, want: %d, got: %d", int(*service.Spec.Replicas)*int(*service.Spec.WorkloadTemplate.Size), len(pods.Items)) | ||
| } | ||
|
|
||
| var targetPod *corev1.Pod | ||
| for i := range pods.Items { | ||
| if pods.Items[i].Status.Phase == corev1.PodRunning { | ||
| targetPod = &pods.Items[i] | ||
| break | ||
| } | ||
| } | ||
|
|
||
| if targetPod == nil { | ||
| return fmt.Errorf("no running pods found for service %s", service.Name) | ||
| } | ||
|
|
||
| portForwardK8sClient, err := kubernetes.NewForConfig(cfg) | ||
| if err != nil { | ||
| return fmt.Errorf("init port forward client failed: %w", err) | ||
| } | ||
|
|
||
| targetPort := targetPod.Spec.Containers[0].Ports[0].ContainerPort | ||
|
|
||
| stopChan, readyChan := make(chan struct{}, 1), make(chan struct{}, 1) | ||
| req := portForwardK8sClient.CoreV1().RESTClient().Post(). | ||
| Resource("pods"). | ||
| Namespace(service.Namespace). | ||
| Name(targetPod.Name). | ||
| SubResource("portforward") | ||
|
|
||
| transport, upgrader, err := spdy.RoundTripperFor(cfg) | ||
| if err != nil { | ||
| return fmt.Errorf("creating round tripper failed: %v", err) | ||
| } | ||
|
|
||
| dialer := spdy.NewDialer(upgrader, &http.Client{Transport: transport}, "POST", req.URL()) | ||
|
|
||
| // create port forwarder | ||
| fw, err := portforward.New(dialer, []string{fmt.Sprintf("%d:%d", modelSource.DEFAULT_BACKEND_PORT, targetPort)}, stopChan, readyChan, os.Stdout, os.Stderr) | ||
| if err != nil { | ||
| return fmt.Errorf("creating port forwarder failed: %v", err) | ||
| } | ||
| // stop port forward when done | ||
| defer fw.Close() | ||
|
|
||
| signals := make(chan os.Signal, 1) | ||
| signal.Notify(signals, os.Interrupt, syscall.SIGTERM) | ||
|
|
||
| go func() { | ||
| <-signals | ||
| fmt.Println("Received termination signal, shutting down port forward...") | ||
| close(stopChan) | ||
| }() | ||
|
|
||
| // wait for port forward to be ready | ||
| go func() { | ||
| if err = fw.ForwardPorts(); err != nil { | ||
| fmt.Printf("Error forwarding ports: %v\n", err) | ||
| close(stopChan) | ||
| } | ||
| }() | ||
| <-readyChan | ||
| return check() | ||
| }).Should(gomega.Succeed()) | ||
|
||
| } | ||
|
|
||
| func CheckServiceAvaliable() error { | ||
| url := fmt.Sprintf("http://localhost:%d/completions", modelSource.DEFAULT_BACKEND_PORT) | ||
| reqBody := `{"prompt":"What is the capital city of China?","stream":false}` | ||
|
|
||
| req, err := http.NewRequest("POST", url, strings.NewReader(reqBody)) | ||
| if err != nil { | ||
| return err | ||
| } | ||
| client := &http.Client{} | ||
| resp, err := client.Do(req) | ||
| if err != nil { | ||
| return err | ||
| } | ||
| defer func() { | ||
| _ = resp.Body.Close() | ||
| }() | ||
|
|
||
| if resp.StatusCode != http.StatusOK { | ||
| return fmt.Errorf("error HTTP status code %d", resp.StatusCode) | ||
| } | ||
|
|
||
| body, err := io.ReadAll(resp.Body) | ||
| if err != nil { | ||
| return fmt.Errorf("error reading response: %v", err) | ||
| } | ||
|
|
||
| if !strings.Contains(strings.ToLower(string(body)), "beijing") { | ||
| return fmt.Errorf("error response body: %s", string(body)) | ||
| } | ||
| return nil | ||
| } | ||
Uh oh!
There was an error while loading. Please reload this page.