Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
97c0778
Update lazy-log package
ariefrahmansyah Jul 22, 2021
48fbb8c
Refactor stream logs API
ariefrahmansyah Jul 26, 2021
629febc
Introducing ContainerLogsView component
ariefrahmansyah Jul 26, 2021
13c54df
Support logging for pyfunc image builder and batch job
ariefrahmansyah Jul 27, 2021
613e597
Fix batch job's image builder log. Support prefixing log with pod & c…
ariefrahmansyah Jul 27, 2021
9997ef0
Add batch job executor log
ariefrahmansyah Jul 27, 2021
8d15c14
Dockerfile: Add git so that Yarn installation can succeed
ariefrahmansyah Jul 27, 2021
e1ca2bd
Use node:14 as node-builder base image
ariefrahmansyah Jul 27, 2021
a5be6c0
Colorized the pod + container in log
ariefrahmansyah Jul 27, 2021
62e4fa9
Use actions/setup-node@v2 and node v14
ariefrahmansyah Jul 27, 2021
0c57d95
Update react-lazylog package to use gojekfarm to solve yarn install i…
ariefrahmansyah Jul 27, 2021
cab8007
We still need react-lazylog's prepare script.
ariefrahmansyah Jul 27, 2021
6958e9b
Refactor stackdriver log
ariefrahmansyah Jul 27, 2021
9252127
Fix API's unit test first
ariefrahmansyah Jul 27, 2021
7aadad4
Add more test to cluster and log_service
ariefrahmansyah Jul 27, 2021
0ce58c3
Fix UI wording
ariefrahmansyah Jul 27, 2021
d2eb491
Update swagger
ariefrahmansyah Jul 27, 2021
ee32d61
Make sure color lib turned on
ariefrahmansyah Jul 27, 2021
cd0bc4f
Add build-essential etc. installation to MLflow's Dockerfile
ariefrahmansyah Jul 28, 2021
b5647ca
Update API test
ariefrahmansyah Jul 28, 2021
3d154c7
Use gojekfarm's react-lazylog fork
ariefrahmansyah Jul 28, 2021
55f9de8
Update how to close channel; getContainerLogs async
ariefrahmansyah Jul 29, 2021
5d49916
Use request.Context() for termination
ariefrahmansyah Jul 29, 2021
93d92a7
Fix API test
ariefrahmansyah Jul 29, 2021
25f3344
Modularize pprof routes into a separate function
ariefrahmansyah Jul 29, 2021
7b9940b
Address aria's review
ariefrahmansyah Jul 29, 2021
2cf726e
Use unbuffered channel for sending log line
ariefrahmansyah Jul 29, 2021
f937d87
Periodically update component list and address reviews
ariefrahmansyah Jul 29, 2021
2f30d2c
Simplify component refresh & log api context cancellation
ariefrahmansyah Jul 29, 2021
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 22 additions & 26 deletions api/api/log_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@
package api

import (
"bufio"
"fmt"
"net/http"
"time"

"github.com/gorilla/schema"

Expand All @@ -42,38 +42,34 @@ func (l *LogController) ReadLog(w http.ResponseWriter, r *http.Request) {
return
}

res, err := l.LogService.ReadLog(&query)
if err != nil {
log.Errorf("Error while retrieving log %v", err)
InternalServerError(fmt.Sprintf("Error while retrieving log for container %s: %s", query.Name, err)).WriteTo(w)
return
}
podLogs := make(chan service.PodLog)
stopCh := make(chan struct{})

// send status code and content-type
w.Header().Set("Content-Type", "plain/text; charset=UTF-8")
w.WriteHeader(http.StatusOK)

// stream the response body
defer res.Close()
buff := bufio.NewReader(res)
for {
line, readErr := buff.ReadString('\n')
_, writeErr := w.Write([]byte(line))
if writeErr != nil {
// connection from caller is closed
return
}
go func() {
for podLog := range podLogs {
// _, writeErr := w.Write([]byte(podLog.Timestamp.Format(time.RFC3339) + " " + podLog.PodName + "/" + podLog.ContainerName + ": " + podLog.TextPayload + "\n"))
_, writeErr := w.Write([]byte(podLog.Timestamp.Format(time.RFC3339) + " " + podLog.TextPayload + "\n"))
if writeErr != nil {
// connection from caller is closed
close(stopCh)
return
}

// send the response over network
// although it's not guaranteed to reach client if it sits behind proxy
flusher, ok := w.(http.Flusher)
if ok {
flusher.Flush()
// send the response over network
// although it's not guaranteed to reach client if it sits behind proxy
flusher, ok := w.(http.Flusher)
if ok {
flusher.Flush()
}
}
}()

if readErr != nil {
// unable to read log from container anymore most likely EOF
return
}
if err := l.LogService.StreamLogs(podLogs, stopCh, &query); err != nil {
InternalServerError(err.Error())
return
}
}
22 changes: 12 additions & 10 deletions api/cluster/container.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,23 +56,25 @@ func (cf *containerFetcher) GetContainers(namespace string, labelSelector string
for _, pod := range podList.Items {
for _, c := range pod.Spec.Containers {
container := &models.Container{
Name: c.Name,
PodName: pod.Name,
Namespace: pod.Namespace,
Cluster: cf.metadata.ClusterName,
GcpProject: cf.metadata.GcpProject,
Name: c.Name,
PodName: pod.Name,
ComponentType: models.ComponentType(pod.Name),
Namespace: pod.Namespace,
Cluster: cf.metadata.ClusterName,
GcpProject: cf.metadata.GcpProject,
}

containers = append(containers, container)
}

for _, ic := range pod.Spec.InitContainers {
container := &models.Container{
Name: ic.Name,
PodName: pod.Name,
Namespace: pod.Namespace,
Cluster: cf.metadata.ClusterName,
GcpProject: cf.metadata.GcpProject,
Name: ic.Name,
PodName: pod.Name,
ComponentType: models.ComponentType(pod.Name),
Namespace: pod.Namespace,
Cluster: cf.metadata.ClusterName,
GcpProject: cf.metadata.GcpProject,
}

containers = append(containers, container)
Expand Down
11 changes: 6 additions & 5 deletions api/imagebuilder/imagebuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,11 +172,12 @@ func (c *imageBuilder) GetContainers(project mlp.Project, model *models.Model, v
containers := make([]*models.Container, 0)
for _, pod := range pods.Items {
containers = append(containers, &models.Container{
Name: containerName,
PodName: pod.Name,
Namespace: pod.Namespace,
Cluster: c.config.ClusterName,
GcpProject: c.config.GcpProject,
Name: containerName,
PodName: pod.Name,
ComponentType: models.ImageBuilderComponentType,
Namespace: pod.Namespace,
Cluster: c.config.ClusterName,
GcpProject: c.config.GcpProject,
})
}

Expand Down
22 changes: 22 additions & 0 deletions api/models/container.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,24 +16,46 @@ package models

import (
"fmt"
"strings"

"github.com/google/uuid"
)

const (
// onlineInferenceLabelTemplate is the fmt template for the pod label
// selector built by OnlineInferencePodLabelSelector; %s is filled with the
// inference service name.
onlineInferenceLabelTemplate = "serving.kubeflow.org/inferenceservice=%s"
// batchInferenceLabelTemplate is a fmt template keyed on a prediction job
// ID. NOTE(review): its consumer is not visible in this hunk; presumably it
// is the batch-job counterpart of the selector above — confirm.
batchInferenceLabelTemplate = "prediction-job-id=%s"

// Component type identifiers stored in Container.ComponentType.
// ImageBuilderComponentType is assigned explicitly by the image builder;
// the remaining values are the ones ComponentType derives from a pod name.
ImageBuilderComponentType = "image_builder"
ModelComponentType = "model"
TransformerComponentType = "transformer"
BatchJobDriverComponentType = "batch_job_driver"
BatchJobExecutorComponentType = "batch_job_executor"
)

// Container describes a single container of a pod together with the
// location (namespace, cluster, GCP project) it was found in. It is
// serialized to JSON using the snake_case keys given in the field tags.
type Container struct {
// Name is the container name.
Name string `json:"name"`
// PodName is the name of the pod the container belongs to.
PodName string `json:"pod_name"`
// ComponentType is one of the *ComponentType constants of this package,
// or an empty string when ComponentType could not classify the pod name.
ComponentType string `json:"component_type"`
// Namespace is the namespace of the owning pod.
Namespace string `json:"namespace"`
// Cluster is the name of the cluster the pod was listed from.
Cluster string `json:"cluster"`
// GcpProject is the GCP project associated with that cluster.
GcpProject string `json:"gcp_project"`
// VersionEndpointID is not populated by the code visible in this diff.
// NOTE(review): presumably set by callers that know the version endpoint — confirm.
VersionEndpointID uuid.UUID `json:"version_endpoint_id"`
}

// ComponentType infers the component type of a pod from its name.
//
// The pod name is matched by substring and the first rule that matches wins:
//
//	"predictor-default"   -> ModelComponentType
//	"transformer-default" -> TransformerComponentType
//	"driver"              -> BatchJobDriverComponentType
//	"executor"            -> BatchJobExecutorComponentType
//
// An empty string is returned when no rule matches.
func ComponentType(podName string) string {
	switch {
	case strings.Contains(podName, "predictor-default"):
		return ModelComponentType
	case strings.Contains(podName, "transformer-default"):
		return TransformerComponentType
	case strings.Contains(podName, "driver"):
		return BatchJobDriverComponentType
	case strings.Contains(podName, "executor"):
		// Executor pods used to be reported as drivers (copy-paste slip);
		// they must map to their own component type.
		return BatchJobExecutorComponentType
	default:
		return ""
	}
}

func OnlineInferencePodLabelSelector(modelName string, versionID string) string {
serviceName := CreateInferenceServiceName(modelName, versionID)
return fmt.Sprintf(onlineInferenceLabelTemplate, serviceName)
Expand Down
Loading