@@ -20,13 +20,24 @@ import (
2020 "context"
2121 "errors"
2222 "fmt"
23+ "io"
24+ "net/http"
25+ "os"
26+ "os/signal"
2327 "strconv"
28+ "strings"
29+ "syscall"
30+ "time"
2431
2532 "github.com/onsi/gomega"
2633 corev1 "k8s.io/api/core/v1"
2734 apimeta "k8s.io/apimachinery/pkg/api/meta"
2835 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
2936 "k8s.io/apimachinery/pkg/types"
37+ "k8s.io/client-go/kubernetes"
38+ "k8s.io/client-go/rest"
39+ "k8s.io/client-go/tools/portforward"
40+ "k8s.io/client-go/transport/spdy"
3041 "sigs.k8s.io/controller-runtime/pkg/client"
3142 lws "sigs.k8s.io/lws/api/leaderworkerset/v1"
3243
@@ -233,3 +244,149 @@ func ValidateServicePods(ctx context.Context, k8sClient client.Client, service *
233244 return nil
234245 }).Should (gomega .Succeed ())
235246}
247+
// CheckServiceAvailableFunc probes an inference backend that has been made
// reachable on the given local port and returns a non-nil error when the
// backend does not answer as expected.
type CheckServiceAvailableFunc func(localPort int) error
249+
250+ func ValidateServiceAvaliable (ctx context.Context , k8sClient client.Client , cfg * rest.Config , service * inferenceapi.Service , check CheckServiceAvailableFunc ) {
251+ gomega .Eventually (func () error {
252+ pods := corev1.PodList {}
253+ podSelector := client .MatchingLabels (map [string ]string {
254+ lws .SetNameLabelKey : service .Name ,
255+ })
256+ if err := k8sClient .List (ctx , & pods , podSelector , client .InNamespace (service .Namespace )); err != nil {
257+ return err
258+ }
259+ if len (pods .Items ) != int (* service .Spec .Replicas )* int (* service .Spec .WorkloadTemplate .Size ) {
260+ return fmt .Errorf ("pods number not right, want: %d, got: %d" , int (* service .Spec .Replicas )* int (* service .Spec .WorkloadTemplate .Size ), len (pods .Items ))
261+ }
262+
263+ var targetPod * corev1.Pod
264+ for i := range pods .Items {
265+ if pods .Items [i ].Status .Phase == corev1 .PodRunning {
266+ targetPod = & pods .Items [i ]
267+ break
268+ }
269+ }
270+
271+ if targetPod == nil {
272+ return fmt .Errorf ("no running pods found for service %s" , service .Name )
273+ }
274+
275+ portForwardK8sClient , err := kubernetes .NewForConfig (cfg )
276+ if err != nil {
277+ return fmt .Errorf ("init port forward client failed: %w" , err )
278+ }
279+
280+ targetPort := targetPod .Spec .Containers [0 ].Ports [0 ].ContainerPort
281+ localPort := 8080
282+
283+ stopChan , readyChan := make (chan struct {}, 1 ), make (chan struct {}, 1 )
284+ req := portForwardK8sClient .CoreV1 ().RESTClient ().Post ().
285+ Resource ("pods" ).
286+ Namespace (service .Namespace ).
287+ Name (targetPod .Name ).
288+ SubResource ("portforward" )
289+
290+ transport , upgrader , err := spdy .RoundTripperFor (cfg )
291+ if err != nil {
292+ return fmt .Errorf ("creating round tripper failed: %v" , err )
293+ }
294+
295+ dialer := spdy .NewDialer (upgrader , & http.Client {Transport : transport }, "POST" , req .URL ())
296+
297+ // create port forwarder
298+ fw , err := portforward .New (dialer , []string {fmt .Sprintf ("%d:%d" , localPort , targetPort )}, stopChan , readyChan , os .Stdout , os .Stderr )
299+ if err != nil {
300+ return fmt .Errorf ("creating port forwarder failed: %v" , err )
301+ }
302+
303+ signals := make (chan os.Signal , 1 )
304+ signal .Notify (signals , os .Interrupt , syscall .SIGTERM )
305+
306+ go func () {
307+ <- signals
308+ fmt .Println ("Received termination signal, shutting down port forward..." )
309+ close (stopChan )
310+ }()
311+
312+ // wait for port forward to be ready
313+ go func () {
314+ if err = fw .ForwardPorts (); err != nil {
315+ fmt .Printf ("Error forwarding ports: %v\n " , err )
316+ close (stopChan )
317+ }
318+ }()
319+ <- readyChan
320+ fmt .Printf ("Port forwarding is ready. Local port has been forwarded to service %s (via pod %s) on port %d\n " ,
321+ service .Name , targetPod .Name , targetPort )
322+
323+ time .Sleep (60 * time .Second )
324+
325+ return check (localPort )
326+ }).Should (gomega .Succeed ())
327+ }
328+
// ollamaModelDownloadWait is how long CheckOllamaServeAvaliable pauses before
// sending its request, so that ollama serve has time to download the model.
var ollamaModelDownloadWait = 60 * time.Second

// CheckOllamaServeAvaliable sends a non-streaming generate request for the
// qwen2:0.5b model to http://localhost:<localPort>/api/generate and verifies
// the answer.
//
// It returns nil only when the server replies with HTTP 200 and the response
// body mentions "beijing" (case-insensitive). Any transport failure, non-200
// status, or answer without "beijing" is reported as an error.
func CheckOllamaServeAvaliable(localPort int) error {
	endpoint := fmt.Sprintf("http://localhost:%d/api/generate", localPort)
	reqBody := `{"model":"qwen2:0.5b","prompt":"What is the capital city of China?","stream":false}`

	// wait for ollama serve to download the model
	time.Sleep(ollamaModelDownloadWait)

	fmt.Printf("url: %s, req body: %s\n", endpoint, reqBody)
	req, err := http.NewRequest(http.MethodPost, endpoint, strings.NewReader(reqBody))
	if err != nil {
		return err
	}
	req.Header.Set("Content-Type", "application/json")

	// A bounded client keeps a hung backend from blocking the caller forever.
	httpClient := &http.Client{Timeout: 2 * time.Minute}
	resp, err := httpClient.Do(req)
	if err != nil {
		return err
	}
	defer resp.Body.Close()

	body, err := io.ReadAll(resp.Body)
	if err != nil {
		return fmt.Errorf("error reading response: %w", err)
	}

	if resp.StatusCode != http.StatusOK {
		return fmt.Errorf("error HTTP status code %d", resp.StatusCode)
	}

	// The expected answer is Beijing; anything else means the model did not
	// respond sensibly.
	if !strings.Contains(strings.ToLower(string(body)), "beijing") {
		return fmt.Errorf("error response body: %s", string(body))
	}
	return nil
}
362+
// CheckLlamacppServeAvaliable sends a non-streaming completion request to
// http://localhost:<localPort>/completions and verifies the answer.
//
// It returns nil only when the server replies with HTTP 200 and the response
// body mentions "beijing" (case-insensitive). Any transport failure, non-200
// status, or answer without "beijing" is reported as an error.
func CheckLlamacppServeAvaliable(localPort int) error {
	endpoint := fmt.Sprintf("http://localhost:%d/completions", localPort)
	reqBody := `{"prompt":"What is the capital city of China?","stream":false}`

	fmt.Printf("url: %s, req body: %s\n", endpoint, reqBody)
	req, err := http.NewRequest(http.MethodPost, endpoint, strings.NewReader(reqBody))
	if err != nil {
		return err
	}
	req.Header.Set("Content-Type", "application/json")

	// A bounded client keeps a hung backend from blocking the caller forever.
	httpClient := &http.Client{Timeout: 2 * time.Minute}
	resp, err := httpClient.Do(req)
	if err != nil {
		return err
	}
	defer resp.Body.Close()

	body, err := io.ReadAll(resp.Body)
	if err != nil {
		return fmt.Errorf("error reading response: %w", err)
	}

	if resp.StatusCode != http.StatusOK {
		return fmt.Errorf("error HTTP status code %d", resp.StatusCode)
	}

	// The expected answer is Beijing; anything else means the model did not
	// respond sensibly.
	if !strings.Contains(strings.ToLower(string(body)), "beijing") {
		return fmt.Errorf("error response body: %s", string(body))
	}
	return nil
}
0 commit comments