Skip to content

Commit 0f0b567

Browse files
author
arpechenin
committed
- add standard grpc metrics to api-server
- add report gap histogram - optimize create or update tasks query Signed-off-by: ntny <[email protected]> Signed-off-by: arpechenin <[email protected]>
1 parent 9b252ff commit 0f0b567

File tree

8 files changed

+71
-11
lines changed

8 files changed

+71
-11
lines changed
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
## Metrics
2+
3+
Standard gRPC metrics are available via https://github.com/grpc-ecosystem/go-grpc-prometheus
4+
5+
### Sample
6+
7+
gRPC RPMs by grpc_method and grpc_code for the ml-pipeline server:
8+
```yaml
9+
sum by (grpc_service, grpc_method, grpc_code) (
10+
rate(grpc_server_handled_total{app=~"ml-pipeline", kubernetes_namespace="kubeflow"}[1m])
11+
) * 60
12+
```
13+
14+
95th percentile gRPC latency by grpc_service and grpc_method for the ml-pipeline server:
15+
```yaml
16+
histogram_quantile(
17+
0.95,
18+
sum by (grpc_service, grpc_method, le) (
19+
rate(grpc_server_handling_seconds_bucket{
20+
app="ml-pipeline",
21+
kubernetes_namespace="kubeflow"
22+
}[1m])
23+
)
24+
)
25+
```
26+
27+
Gap in seconds between creating an execution spec (Argo or other backend) for a recurring run and reporting it via the persistence agent
28+
```yaml
29+
histogram_quantile(0.95, sum(rate(resource_manager_recurring_run_report_gap_bucket[1h])) by (le))
30+
```
31+
32+

backend/src/apiserver/main.go

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,9 @@ import (
2727
"strings"
2828
"sync"
2929

30+
grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware"
31+
grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
32+
3033
"github.com/fsnotify/fsnotify"
3134
"github.com/golang/glog"
3235
"github.com/gorilla/mux"
@@ -208,7 +211,17 @@ func startRpcServer(resourceManager *resource.ResourceManager) {
208211
if err != nil {
209212
glog.Fatalf("Failed to start RPC server: %v", err)
210213
}
211-
s := grpc.NewServer(grpc.UnaryInterceptor(apiServerInterceptor), grpc.MaxRecvMsgSize(math.MaxInt32))
214+
215+
grpc_prometheus.EnableHandlingTimeHistogram(
216+
grpc_prometheus.WithHistogramBuckets([]float64{
217+
0.01, 0.03, 0.1, 0.3, 1, 3, 10, 15, 30, 60, 120, 300, //10 ms -> 5 min
218+
}),
219+
)
220+
m := grpc_middleware.WithUnaryServerChain(
221+
apiServerInterceptor,
222+
grpc_prometheus.UnaryServerInterceptor,
223+
)
224+
s := grpc.NewServer(m, grpc.MaxRecvMsgSize(math.MaxInt32))
212225

213226
sharedExperimentServer := server.NewExperimentServer(resourceManager, &server.ExperimentServerOptions{CollectMetrics: *collectMetricsFlag})
214227
sharedPipelineServer := server.NewPipelineServer(

backend/src/apiserver/resource/resource_manager.go

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import (
2222
"net"
2323
"reflect"
2424
"strconv"
25+
"time"
2526

2627
scheduledworkflow "github.com/kubeflow/pipelines/backend/src/crd/pkg/apis/scheduledworkflow/v1beta1"
2728

@@ -77,6 +78,13 @@ var (
7778
Name: "resource_manager_workflow_runs_failed",
7879
Help: "The current number of failed workflow runs",
7980
}, extraLabels)
81+
82+
// Gap in seconds between creating an execution spec (Argo or other backend) for a recurring run and reporting it via the persistence agent.
83+
recurringPipelineRunReportGap = promauto.NewHistogram(prometheus.HistogramOpts{
84+
Name: "resource_manager_recurring_run_report_gap",
85+
Help: "Delay of recurring pipeline run report",
86+
Buckets: prometheus.ExponentialBuckets(1, 2, 15),
87+
})
8088
)
8189

8290
type ClientManagerInterface interface {
@@ -1238,8 +1246,8 @@ func (r *ResourceManager) DeleteJob(ctx context.Context, jobId string) error {
12381246

12391247
// Creates new tasks or updates existing ones.
12401248
// This is not a part of internal API exposed to persistence agent only.
1241-
func (r *ResourceManager) CreateOrUpdateTasks(t []*model.Task) ([]*model.Task, error) {
1242-
tasks, err := r.taskStore.CreateOrUpdateTasks(t)
1249+
func (r *ResourceManager) CreateOrUpdateTasks(t []*model.Task, runId string) ([]*model.Task, error) {
1250+
tasks, err := r.taskStore.CreateOrUpdateTasks(t, runId)
12431251
if err != nil {
12441252
return nil, util.Wrap(err, "Failed to create or update tasks")
12451253
}
@@ -1388,6 +1396,10 @@ func (r *ResourceManager) ReportWorkflowResource(ctx context.Context, execSpec u
13881396
},
13891397
}
13901398
run, err = r.runStore.CreateRun(run)
1399+
if r.options.CollectMetrics && !execStatus.StartedAtTime().Time.IsZero() {
1400+
reportGap := time.Since(execStatus.StartedAtTime().Time).Seconds()
1401+
recurringPipelineRunReportGap.Observe(reportGap)
1402+
}
13911403
if err != nil {
13921404
return nil, util.Wrapf(err, "Failed to report a workflow due to error creating run %s", runId)
13931405
} else {

backend/src/apiserver/server/report_server.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ func (s ReportServer) reportTasksFromExecution(execSpec util.ExecutionSpec, runI
4343
if err != nil {
4444
return nil, util.Wrap(err, "Failed to report tasks of an execution")
4545
}
46-
return s.resourceManager.CreateOrUpdateTasks(tasks)
46+
return s.resourceManager.CreateOrUpdateTasks(tasks, runId)
4747
}
4848

4949
// Reports a workflow.

backend/src/apiserver/storage/task_store.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ type TaskStoreInterface interface {
6161
ListTasks(filterContext *model.FilterContext, opts *list.Options) ([]*model.Task, int, string, error)
6262

6363
// Creates new tasks or updates the existing ones.
64-
CreateOrUpdateTasks(tasks []*model.Task) ([]*model.Task, error)
64+
CreateOrUpdateTasks(tasks []*model.Task, runId string) ([]*model.Task, error)
6565
}
6666

6767
type TaskStore struct {
@@ -324,15 +324,15 @@ func (s *TaskStore) GetTask(id string) (*model.Task, error) {
324324
}
325325

326326
// Updates missing fields with existing data entries.
327-
func (s *TaskStore) patchWithExistingTasks(tasks []*model.Task) error {
327+
func (s *TaskStore) patchWithExistingTasks(tasks []*model.Task, runId string) error {
328328
var podNames []string
329329
for _, task := range tasks {
330330
podNames = append(podNames, task.PodName)
331331
}
332332
sql, args, err := sq.
333333
Select(taskColumns...).
334334
From("tasks").
335-
Where(sq.Eq{"PodName": podNames}).
335+
Where(sq.Eq{"PodName": podNames, "RunUUID": runId}).
336336
ToSql()
337337
if err != nil {
338338
return util.NewInternalServerError(err, "Failed to create query to check existing tasks")
@@ -359,7 +359,7 @@ func (s *TaskStore) patchWithExistingTasks(tasks []*model.Task) error {
359359
}
360360

361361
// Creates new entries or updates existing ones.
362-
func (s *TaskStore) CreateOrUpdateTasks(tasks []*model.Task) ([]*model.Task, error) {
362+
func (s *TaskStore) CreateOrUpdateTasks(tasks []*model.Task, runId string) ([]*model.Task, error) {
363363
buildQuery := func(ts []*model.Task) (string, []interface{}, error) {
364364
sqlInsert := sq.Insert("tasks").Columns(taskColumnsWithPayload...)
365365
for _, t := range ts {
@@ -405,7 +405,7 @@ func (s *TaskStore) CreateOrUpdateTasks(tasks []*model.Task) ([]*model.Task, err
405405

406406
// Check for existing tasks and fill empty field with existing data.
407407
// Assumes that PodName column is a unique key.
408-
if err := s.patchWithExistingTasks(tasks); err != nil {
408+
if err := s.patchWithExistingTasks(tasks, runId); err != nil {
409409
return nil, util.NewInternalServerError(err, "Failed to check for existing tasks")
410410
}
411411
for _, task := range tasks {

backend/src/apiserver/storage/task_store_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -504,7 +504,7 @@ func TestTaskStore_patchWithExistingTasks(t *testing.T) {
504504
}
505505
for _, tt := range tests {
506506
t.Run(tt.name, func(t *testing.T) {
507-
err := taskStore.patchWithExistingTasks(tt.tasks)
507+
err := taskStore.patchWithExistingTasks(tt.tasks, defaultFakeRunIdTwo)
508508
if tt.wantErr {
509509
assert.NotNil(t, err)
510510
assert.Contains(t, err.Error(), tt.errMsg)
@@ -611,7 +611,7 @@ func TestTaskStore_UpdateOrCreateTasks(t *testing.T) {
611611
}
612612
for _, tt := range tests {
613613
t.Run(tt.name, func(t *testing.T) {
614-
got, err := taskStore.CreateOrUpdateTasks(tt.tasks)
614+
got, err := taskStore.CreateOrUpdateTasks(tt.tasks, defaultFakeRunIdTwo)
615615
assert.Nil(t, err)
616616
assert.Equal(t, tt.want, got)
617617
})

go.mod

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ require (
2626
github.com/google/uuid v1.6.0
2727
github.com/gorilla/mux v1.8.0
2828
github.com/grpc-ecosystem/go-grpc-middleware v1.3.0
29+
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0
2930
github.com/grpc-ecosystem/grpc-gateway v1.16.0
3031
github.com/jackc/pgx/v5 v5.5.4
3132
github.com/jinzhu/gorm v1.9.1

go.sum

Lines changed: 2 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)