Skip to content

feat(backend): add gRPC metrics to api-server (RPS/latency), optimize execution spec reporting #12010

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 32 additions & 0 deletions backend/src/apiserver/api-server-metrics.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
## Metrics

Standard gRPC metrics are available via https://github.com/grpc-ecosystem/go-grpc-middleware/tree/main/providers/prometheus

### Sample

gRPC requests per minute (RPM) by grpc_service, grpc_method and grpc_code for the ml-pipeline server:
```promql
sum by (grpc_service, grpc_method, grpc_code) (
rate(grpc_server_handled_total{app=~"ml-pipeline", kubernetes_namespace="kubeflow"}[1m])
) * 60
```

95th percentile gRPC latency by grpc_service and grpc_method for the ml-pipeline server:
```promql
histogram_quantile(
0.95,
sum by (grpc_service, grpc_method, le) (
rate(grpc_server_handling_seconds_bucket{
app="ml-pipeline",
kubernetes_namespace="kubeflow"
}[1m])
)
)
```

95th percentile of the gap in seconds between creating an execution spec (Argo or other backend) for a recurring run and reporting it via the persistence agent:
```promql
histogram_quantile(0.95, sum(rate(resource_manager_recurring_run_report_gap_bucket[1h])) by (le))
```


25 changes: 24 additions & 1 deletion backend/src/apiserver/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,10 @@ import (
"strings"
"sync"

"github.com/prometheus/client_golang/prometheus"

grpc_prometheus "github.com/grpc-ecosystem/go-grpc-middleware/providers/prometheus"

"github.com/fsnotify/fsnotify"
"github.com/golang/glog"
"github.com/gorilla/mux"
Expand Down Expand Up @@ -208,7 +212,26 @@ func startRpcServer(resourceManager *resource.ResourceManager) {
if err != nil {
glog.Fatalf("Failed to start RPC server: %v", err)
}
s := grpc.NewServer(grpc.UnaryInterceptor(apiServerInterceptor), grpc.MaxRecvMsgSize(math.MaxInt32))

metrics := grpc_prometheus.NewServerMetrics(
grpc_prometheus.WithServerHandlingTimeHistogram(
grpc_prometheus.WithHistogramBuckets([]float64{
0.01, 0.03, 0.1, 0.3, 1, 3, 10, 15, 30, 60, 120, 300,
}),
),
)

prometheus.MustRegister(metrics)

s := grpc.NewServer(
grpc.ChainUnaryInterceptor(
apiServerInterceptor,
metrics.UnaryServerInterceptor(),
),
grpc.MaxRecvMsgSize(math.MaxInt32),
)

metrics.InitializeMetrics(s)

sharedExperimentServer := server.NewExperimentServer(resourceManager, &server.ExperimentServerOptions{CollectMetrics: *collectMetricsFlag})
sharedPipelineServer := server.NewPipelineServer(
Expand Down
16 changes: 14 additions & 2 deletions backend/src/apiserver/resource/resource_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"net"
"reflect"
"strconv"
"time"

scheduledworkflow "github.com/kubeflow/pipelines/backend/src/crd/pkg/apis/scheduledworkflow/v1beta1"

Expand Down Expand Up @@ -77,6 +78,13 @@ var (
Name: "resource_manager_workflow_runs_failed",
Help: "The current number of failed workflow runs",
}, extraLabels)

// Gap in seconds between creating an execution spec (Argo or other backend) for a recurring run and reporting it via the persistence agent.
recurringPipelineRunReportGap = promauto.NewHistogram(prometheus.HistogramOpts{
Name: "resource_manager_recurring_run_report_gap",
Help: "Delay of recurring pipeline run report",
Buckets: prometheus.ExponentialBuckets(1, 2, 15),
})
)

type ClientManagerInterface interface {
Expand Down Expand Up @@ -1238,8 +1246,8 @@ func (r *ResourceManager) DeleteJob(ctx context.Context, jobId string) error {

// Creates new tasks or updates existing ones.
// This is not a part of internal API exposed to persistence agent only.
func (r *ResourceManager) CreateOrUpdateTasks(t []*model.Task) ([]*model.Task, error) {
tasks, err := r.taskStore.CreateOrUpdateTasks(t)
func (r *ResourceManager) CreateOrUpdateTasks(t []*model.Task, runId string) ([]*model.Task, error) {
tasks, err := r.taskStore.CreateOrUpdateTasks(t, runId)
if err != nil {
return nil, util.Wrap(err, "Failed to create or update tasks")
}
Expand Down Expand Up @@ -1388,6 +1396,10 @@ func (r *ResourceManager) ReportWorkflowResource(ctx context.Context, execSpec u
},
}
run, err = r.runStore.CreateRun(run)
if r.options.CollectMetrics && !execStatus.StartedAtTime().Time.IsZero() {
reportGap := time.Since(execStatus.StartedAtTime().Time).Seconds()
recurringPipelineRunReportGap.Observe(reportGap)
}
if err != nil {
return nil, util.Wrapf(err, "Failed to report a workflow due to error creating run %s", runId)
} else {
Expand Down
2 changes: 1 addition & 1 deletion backend/src/apiserver/server/report_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func (s ReportServer) reportTasksFromExecution(execSpec util.ExecutionSpec, runI
if err != nil {
return nil, util.Wrap(err, "Failed to report tasks of an execution")
}
return s.resourceManager.CreateOrUpdateTasks(tasks)
return s.resourceManager.CreateOrUpdateTasks(tasks, runId)
}

// Reports a workflow.
Expand Down
10 changes: 5 additions & 5 deletions backend/src/apiserver/storage/task_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ type TaskStoreInterface interface {
ListTasks(filterContext *model.FilterContext, opts *list.Options) ([]*model.Task, int, string, error)

// Creates new tasks or updates the existing ones.
CreateOrUpdateTasks(tasks []*model.Task) ([]*model.Task, error)
CreateOrUpdateTasks(tasks []*model.Task, runId string) ([]*model.Task, error)
}

type TaskStore struct {
Expand Down Expand Up @@ -324,15 +324,15 @@ func (s *TaskStore) GetTask(id string) (*model.Task, error) {
}

// Updates missing fields with existing data entries.
func (s *TaskStore) patchWithExistingTasks(tasks []*model.Task) error {
func (s *TaskStore) patchWithExistingTasks(tasks []*model.Task, runId string) error {
var podNames []string
for _, task := range tasks {
podNames = append(podNames, task.PodName)
}
sql, args, err := sq.
Select(taskColumns...).
From("tasks").
Where(sq.Eq{"PodName": podNames}).
Where(sq.Eq{"PodName": podNames, "RunUUID": runId}).
ToSql()
if err != nil {
return util.NewInternalServerError(err, "Failed to create query to check existing tasks")
Expand All @@ -359,7 +359,7 @@ func (s *TaskStore) patchWithExistingTasks(tasks []*model.Task) error {
}

// Creates new entries or updates existing ones.
func (s *TaskStore) CreateOrUpdateTasks(tasks []*model.Task) ([]*model.Task, error) {
func (s *TaskStore) CreateOrUpdateTasks(tasks []*model.Task, runId string) ([]*model.Task, error) {
buildQuery := func(ts []*model.Task) (string, []interface{}, error) {
sqlInsert := sq.Insert("tasks").Columns(taskColumnsWithPayload...)
for _, t := range ts {
Expand Down Expand Up @@ -405,7 +405,7 @@ func (s *TaskStore) CreateOrUpdateTasks(tasks []*model.Task) ([]*model.Task, err

// Check for existing tasks and fill empty field with existing data.
// Assumes that PodName column is a unique key.
if err := s.patchWithExistingTasks(tasks); err != nil {
if err := s.patchWithExistingTasks(tasks, runId); err != nil {
return nil, util.NewInternalServerError(err, "Failed to check for existing tasks")
}
for _, task := range tasks {
Expand Down
4 changes: 2 additions & 2 deletions backend/src/apiserver/storage/task_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -504,7 +504,7 @@ func TestTaskStore_patchWithExistingTasks(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
err := taskStore.patchWithExistingTasks(tt.tasks)
err := taskStore.patchWithExistingTasks(tt.tasks, defaultFakeRunIdTwo)
if tt.wantErr {
assert.NotNil(t, err)
assert.Contains(t, err.Error(), tt.errMsg)
Expand Down Expand Up @@ -611,7 +611,7 @@ func TestTaskStore_UpdateOrCreateTasks(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got, err := taskStore.CreateOrUpdateTasks(tt.tasks)
got, err := taskStore.CreateOrUpdateTasks(tt.tasks, defaultFakeRunIdTwo)
assert.Nil(t, err)
assert.Equal(t, tt.want, got)
})
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ require (
github.com/google/go-cmp v0.6.0
github.com/google/uuid v1.6.0
github.com/gorilla/mux v1.8.0
github.com/grpc-ecosystem/go-grpc-middleware/providers/prometheus v1.1.0
github.com/grpc-ecosystem/go-grpc-middleware/v2 v2.3.2
github.com/grpc-ecosystem/grpc-gateway v1.16.0
github.com/jackc/pgx/v5 v5.5.4
Expand Down
2 changes: 2 additions & 0 deletions go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading