Skip to content
Merged
Show file tree
Hide file tree
Changes from 25 commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
97c0778
Update lazy-log package
ariefrahmansyah Jul 22, 2021
48fbb8c
Refactor stream logs API
ariefrahmansyah Jul 26, 2021
629febc
Introducing ContainerLogsView component
ariefrahmansyah Jul 26, 2021
13c54df
Support logging for pyfunc image builder and batch job
ariefrahmansyah Jul 27, 2021
613e597
Fix batch job's image builder log. Support prefixing log with pod & c…
ariefrahmansyah Jul 27, 2021
9997ef0
Add batch job executor log
ariefrahmansyah Jul 27, 2021
8d15c14
Dockerfile: Add git so we Yarn installation can succeed
ariefrahmansyah Jul 27, 2021
e1ca2bd
Use node:14 as node-builder base image
ariefrahmansyah Jul 27, 2021
a5be6c0
Colorized the pod + container in log
ariefrahmansyah Jul 27, 2021
62e4fa9
Use actions/setup-node@v2 and node v14
ariefrahmansyah Jul 27, 2021
0c57d95
Update react-lazylog package to use gojekfarm to sovle yarn install i…
ariefrahmansyah Jul 27, 2021
cab8007
We still need react-lazylog's prepare script.
ariefrahmansyah Jul 27, 2021
6958e9b
Refactor stackdriver log
ariefrahmansyah Jul 27, 2021
9252127
Fix API's unit test first
ariefrahmansyah Jul 27, 2021
7aadad4
Add more test to cluster and log_service
ariefrahmansyah Jul 27, 2021
0ce58c3
Fix UI wording
ariefrahmansyah Jul 27, 2021
d2eb491
Update swagger
ariefrahmansyah Jul 27, 2021
ee32d61
Make sure color lib turned on
ariefrahmansyah Jul 27, 2021
cd0bc4f
Add build-essential and etc isntallation on Mlflows' Dockerfile
ariefrahmansyah Jul 28, 2021
b5647ca
Update API test
ariefrahmansyah Jul 28, 2021
3d154c7
Use gojekfarm's react-lazylog fork
ariefrahmansyah Jul 28, 2021
55f9de8
Update how to close channel; getContainerLogs async
ariefrahmansyah Jul 29, 2021
5d49916
Use request.Context() for termintation
ariefrahmansyah Jul 29, 2021
93d92a7
Fix API test
ariefrahmansyah Jul 29, 2021
25f3344
Modularize pprof routes into a spearate function
ariefrahmansyah Jul 29, 2021
7b9940b
Address aria's review
ariefrahmansyah Jul 29, 2021
2cf726e
Use unbuffered channel for sending log line
ariefrahmansyah Jul 29, 2021
f937d87
Periodically update component list and address reviews
ariefrahmansyah Jul 29, 2021
2f30d2c
Simplify component refresh & log api context cancellation
ariefrahmansyah Jul 29, 2021
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@ jobs:
steps:
- name: Checkout to the target branch
uses: actions/checkout@v2
- uses: actions/setup-node@v1
- uses: actions/setup-node@v2
with:
node-version: 12
node-version: 14
- name: Install dependencies
run: make init-dep-ui
- name: Lint UI files
Expand Down Expand Up @@ -64,9 +64,9 @@ jobs:
steps:
- name: Checkout to the target branch
uses: actions/checkout@v2
- uses: actions/setup-node@v1
- uses: actions/setup-node@v2
with:
node-version: 12
node-version: 14
- name: Install dependencies
run: make init-dep-ui
- name: Test UI files
Expand All @@ -86,9 +86,9 @@ jobs:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
- uses: actions/setup-node@v1
- uses: actions/setup-node@v2
with:
node-version: 12
node-version: 14
- name: Install dependencies
run: make init-dep-ui
- name: Build UI static files
Expand Down
4 changes: 2 additions & 2 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,10 @@ RUN go build -o bin/merlin_api ./cmd/api
# ============================================================
# Build stage 2: Build UI
# ============================================================
FROM node:14-alpine as node-builder
FROM node:14 as node-builder
WORKDIR /src/ui
COPY ui .
RUN yarn
RUN yarn install --network-concurrency 1
RUN yarn run build

# ============================================================
Expand Down
3 changes: 2 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ all: setup init-dep lint test clean build run
.PHONY: setup
setup:
@echo "> Setting up tools ..."
@test -x ${GOPATH}/bin/goimports || go get -u golang.org/x/tools/cmd/goimports
@test -x ${GOPATH}/bin/golint || go get -u golang.org/x/lint/golint
@test -x ${GOPATH}/bin/gotest || go get -u github.com/rakyll/gotest

Expand All @@ -24,7 +25,7 @@ init-dep: init-dep-ui init-dep-api
.PHONY: init-dep-ui
init-dep-ui:
@echo "> Initializing UI dependencies ..."
@cd ${UI_PATH} && yarn
@cd ${UI_PATH} && yarn install --network-concurrency 1

.PHONY: init-dep-api
init-dep-api:
Expand Down
60 changes: 32 additions & 28 deletions api/api/log_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
package api

import (
"bufio"
"fmt"
"net/http"

Expand All @@ -34,6 +33,13 @@ type LogController struct {

// ReadLog parses log requests and fetches logs.
func (l *LogController) ReadLog(w http.ResponseWriter, r *http.Request) {
// Make sure that the writer supports flushing.
flusher, ok := w.(http.Flusher)
if !ok {
InternalServerError("Streaming unsupported!").WriteTo(w)
return
}

var query service.LogQuery
err := decoder.Decode(&query, r.URL.Query())
if err != nil {
Expand All @@ -42,38 +48,36 @@ func (l *LogController) ReadLog(w http.ResponseWriter, r *http.Request) {
return
}

res, err := l.LogService.ReadLog(&query)
if err != nil {
log.Errorf("Error while retrieving log %v", err)
InternalServerError(fmt.Sprintf("Error while retrieving log for container %s: %s", query.Name, err)).WriteTo(w)
return
}
logLineCh := make(chan string, 1000)
stopCh := make(chan struct{})

// send status code and content-type
w.Header().Set("Content-Type", "plain/text; charset=UTF-8")
w.WriteHeader(http.StatusOK)
// Listen to the closing of the http connection via the CloseNotifier
notify := r.Context().Done()
go func() {
<-notify
close(stopCh)
}()

// stream the response body
defer res.Close()
buff := bufio.NewReader(res)
for {
line, readErr := buff.ReadString('\n')
_, writeErr := w.Write([]byte(line))
if writeErr != nil {
// connection from caller is closed
return
}
// Set the headers related to event streaming.
w.Header().Set("Content-Type", "text/event-stream")
w.Header().Set("Cache-Control", "no-cache")
w.Header().Set("Connection", "keep-alive")
w.Header().Set("Transfer-Encoding", "chunked")

go func() {
for {
logLine := <-logLineCh

fmt.Fprint(w, logLine)

// send the response over network
// although it's not guaranteed to reach client if it sits behind proxy
flusher, ok := w.(http.Flusher)
if ok {
// Send the response over network
// although it's not guaranteed to reach client if it sits behind proxy
flusher.Flush()
}
}()

if readErr != nil {
// unable to read log from container anymore most likely EOF
return
}
if err := l.LogService.StreamLogs(logLineCh, stopCh, &query); err != nil {
InternalServerError(err.Error()).WriteTo(w)
return
}
}
4 changes: 1 addition & 3 deletions api/api/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,9 +202,7 @@ func NewRouter(appCtx AppContext) *mux.Router {
}

rawRoutes := []RawRoutes{
{
http.MethodGet, "/logs", http.HandlerFunc(logController.ReadLog), "ReadLogs",
},
{http.MethodGet, "/logs", http.HandlerFunc(logController.ReadLog), "ReadLogs"},
}

var authzMiddleware *middleware.Authorizer
Expand Down
28 changes: 14 additions & 14 deletions api/cluster/container.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,25 +55,25 @@ func (cf *containerFetcher) GetContainers(namespace string, labelSelector string
containers := make([]*models.Container, 0)
for _, pod := range podList.Items {
for _, c := range pod.Spec.Containers {
container := &models.Container{
Name: c.Name,
PodName: pod.Name,
Namespace: pod.Namespace,
Cluster: cf.metadata.ClusterName,
GcpProject: cf.metadata.GcpProject,
}
container := models.NewContainer(
c.Name,
pod.Name,
pod.Namespace,
cf.metadata.ClusterName,
cf.metadata.GcpProject,
)

containers = append(containers, container)
}

for _, ic := range pod.Spec.InitContainers {
container := &models.Container{
Name: ic.Name,
PodName: pod.Name,
Namespace: pod.Namespace,
Cluster: cf.metadata.ClusterName,
GcpProject: cf.metadata.GcpProject,
}
container := models.NewContainer(
ic.Name,
pod.Name,
pod.Namespace,
cf.metadata.ClusterName,
cf.metadata.GcpProject,
)

containers = append(containers, container)
}
Expand Down
14 changes: 14 additions & 0 deletions api/cluster/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,14 @@
package cluster

import (
"io"
"time"

kfsv1alpha2 "github.com/kubeflow/kfserving/pkg/apis/serving/v1alpha2"
kfservice "github.com/kubeflow/kfserving/pkg/client/clientset/versioned/typed/serving/v1alpha2"
"github.com/kubeflow/kfserving/pkg/constants"
"github.com/pkg/errors"
v1 "k8s.io/api/core/v1"
kerrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
corev1 "k8s.io/client-go/kubernetes/typed/core/v1"
Expand All @@ -35,6 +37,10 @@ import (
type Controller interface {
Deploy(modelService *models.Service) (*models.Service, error)
Delete(modelService *models.Service) (*models.Service, error)

ListPods(namespace, labelSelector string) (*v1.PodList, error)
StreamPodLogs(namespace, podName string, opts *v1.PodLogOptions) (io.ReadCloser, error)

ContainerFetcher
}

Expand Down Expand Up @@ -219,3 +225,11 @@ func (k *controller) waitInferenceServiceReady(service *kfsv1alpha2.InferenceSer
}
}
}

func (c *controller) ListPods(namespace, labelSelector string) (*v1.PodList, error) {
return c.clusterClient.Pods(namespace).List(metav1.ListOptions{LabelSelector: labelSelector})
}

func (c *controller) StreamPodLogs(namespace, podName string, opts *v1.PodLogOptions) (io.ReadCloser, error) {
return c.clusterClient.Pods(namespace).GetLogs(podName, opts).Stream()
}
58 changes: 58 additions & 0 deletions api/cluster/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -698,3 +698,61 @@ func isIn(container v1.Container, containers []*models.Container, podName string
}
return false
}

func Test_controller_ListPods(t *testing.T) {
namespace := "test-namespace"

v1Client := fake.NewSimpleClientset()
v1Client.PrependReactor(listMethod, podResource, func(action ktesting.Action) (bool, runtime.Object, error) {
return true, &v1.PodList{
Items: []v1.Pod{
{
ObjectMeta: metav1.ObjectMeta{
Name: "test-model-1-predictor-default-a",
Labels: map[string]string{
"serving.knative.dev/service": "test-model-1-predictor-default",
},
},
Spec: v1.PodSpec{
InitContainers: []v1.Container{
{Name: "storage-initializer"},
},
Containers: []v1.Container{
{Name: "kfserving-container"},
{Name: "queue-proxy"},
{Name: "inferenceservice-logger"},
},
},
},
{
ObjectMeta: metav1.ObjectMeta{
Name: "test-model-1-predictor-default-b",
Labels: map[string]string{
"serving.knative.dev/service": "test-model-1-predictor-default",
},
},
Spec: v1.PodSpec{
InitContainers: []v1.Container{
{Name: "storage-initializer"},
},
Containers: []v1.Container{
{Name: "kfserving-container"},
{Name: "queue-proxy"},
{Name: "inferenceservice-logger"},
},
},
},
}}, nil
})

ctl := &controller{
clusterClient: v1Client.CoreV1(),
}

podList, err := ctl.ListPods(namespace, "serving.knative.dev/service=test-model-1-predictor-default")

assert.Nil(t, err)
assert.Equal(t, 2, len(podList.Items))
assert.Equal(t, "test-model-1-predictor-default-a", podList.Items[0].ObjectMeta.Name)
assert.Equal(t, "test-model-1-predictor-default-b", podList.Items[1].ObjectMeta.Name)
}
52 changes: 51 additions & 1 deletion api/cluster/mocks/controller.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading