Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
79 changes: 79 additions & 0 deletions pkg/telemetry/db.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
package telemetry

import (
"context"

"github.com/superplanehq/superplane/pkg/database"
"gorm.io/gorm"
)

const (
createMetricsCallbackName = "telemetry:record_created_rows"
updateMetricsCallbackName = "telemetry:record_updated_rows"
deleteMetricsCallbackName = "telemetry:record_deleted_rows"

dbOperationCreate = "create"
dbOperationUpdate = "update"
dbOperationDelete = "delete"
)

func registerDBOperationMetricsCallbacks() error {
db := database.Conn()
var err error

if db.Callback().Create().Get(createMetricsCallbackName) == nil {
err = db.Callback().Create().After("gorm:create").Register(createMetricsCallbackName, recordCreatedRowsMetric)
if err != nil {
return err
}
}

if db.Callback().Update().Get(updateMetricsCallbackName) == nil {
err = db.Callback().Update().After("gorm:update").Register(updateMetricsCallbackName, recordUpdatedRowsMetric)
if err != nil {
return err
}
}

if db.Callback().Delete().Get(deleteMetricsCallbackName) == nil {
err = db.Callback().Delete().After("gorm:delete").Register(deleteMetricsCallbackName, recordDeletedRowsMetric)
if err != nil {
return err
}
}

return nil
}

func recordCreatedRowsMetric(tx *gorm.DB) {
recordRowsAffectedMetric(tx, dbOperationCreate)
}

func recordUpdatedRowsMetric(tx *gorm.DB) {
recordRowsAffectedMetric(tx, dbOperationUpdate)
}

func recordDeletedRowsMetric(tx *gorm.DB) {
recordRowsAffectedMetric(tx, dbOperationDelete)
}

func recordRowsAffectedMetric(tx *gorm.DB, operation string) {
if tx == nil || tx.Statement == nil || tx.RowsAffected <= 0 {
return
}

tableName := tx.Statement.Table
if tableName == "" && tx.Statement.Schema != nil {
tableName = tx.Statement.Schema.Table
}
if tableName == "" {
return
}

ctx := tx.Statement.Context
if ctx == nil {
ctx = context.Background()
}

RecordDBRowsAffected(ctx, tx.RowsAffected, tableName, operation)
}
69 changes: 69 additions & 0 deletions pkg/telemetry/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"time"

"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc"
"go.opentelemetry.io/otel/metric"

Expand Down Expand Up @@ -34,6 +35,11 @@ var (

dbLocksCountHistogram metric.Int64Histogram
dbLongQueriesCountHistogram metric.Int64Histogram

dbRowsAffectedCounter metric.Int64Counter

pendingEventsGauge metric.Int64Gauge
pendingExecutionsGauge metric.Int64Gauge
)

func InitMetrics(ctx context.Context) error {
Expand Down Expand Up @@ -168,6 +174,38 @@ func InitMetrics(ctx context.Context) error {
return err
}

dbRowsAffectedCounter, err = meter.Int64Counter(
"db.rows.affected.count",
metric.WithDescription("Number of database rows affected by operation"),
metric.WithUnit("1"),
)
if err != nil {
return err
}

pendingEventsGauge, err = meter.Int64Gauge(
"workflow_events.pending.count",
metric.WithDescription("Current number of pending workflow events"),
metric.WithUnit("1"),
)
if err != nil {
return err
}

pendingExecutionsGauge, err = meter.Int64Gauge(
"workflow_node_executions.pending.count",
metric.WithDescription("Current number of pending workflow node executions"),
metric.WithUnit("1"),
)
if err != nil {
return err
}

err = registerDBOperationMetricsCallbacks()
if err != nil {
return err
}

StartPeriodicMetricsReporter()

metricsReady.Store(true)
Expand Down Expand Up @@ -283,3 +321,34 @@ func RecordDBLongQueriesCount(ctx context.Context, count int64) {

dbLongQueriesCountHistogram.Record(ctx, count)
}

func RecordDBRowsAffected(ctx context.Context, count int64, tableName, operation string) {
if !metricsReady.Load() {
return
}

dbRowsAffectedCounter.Add(
ctx,
count,
metric.WithAttributes(
attribute.String("table", tableName),
attribute.String("operation", operation),
),
)
}

func RecordPendingEventsCount(ctx context.Context, count int64) {
if !metricsReady.Load() {
return
}

pendingEventsGauge.Record(ctx, count)
}

func RecordPendingExecutionsCount(ctx context.Context, count int64) {
if !metricsReady.Load() {
return
}

pendingExecutionsGauge.Record(ctx, count)
}
48 changes: 48 additions & 0 deletions pkg/telemetry/periodic.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ func (p *Periodic) report() {
p.reportDatabaseLocks()
p.reportLongQueries()
p.reportStuckQueueItems()
p.reportPendingEvents()
p.reportPendingExecutions()
}

func (p *Periodic) reportDatabaseLocks() {
Expand Down Expand Up @@ -75,6 +77,24 @@ func (p *Periodic) reportLongQueries() {
RecordDBLongQueriesCount(p.ctx, count)
}

func (p *Periodic) reportPendingEvents() {
count, err := countPendingEvents()
if err != nil {
return
}

RecordPendingEventsCount(p.ctx, count)
}

func (p *Periodic) reportPendingExecutions() {
count, err := countPendingExecutions()
if err != nil {
return
}

RecordPendingExecutionsCount(p.ctx, count)
}

func countStuckQueueNodes() (int64, error) {
db := database.Conn()

Expand Down Expand Up @@ -104,3 +124,31 @@ func countStuckQueueNodes() (int64, error) {

return count, nil
}

func countPendingEvents() (int64, error) {
var count int64

err := database.Conn().
Raw("SELECT COUNT(id) FROM workflow_events WHERE state = 'pending'").
Scan(&count).
Error
if err != nil {
return 0, err
}

return count, nil
}
Comment thread
cursor[bot] marked this conversation as resolved.

func countPendingExecutions() (int64, error) {
var count int64

err := database.Conn().
Raw("SELECT COUNT(id) FROM workflow_node_executions WHERE state = 'pending'").
Scan(&count).
Error
if err != nil {
return 0, err
}

return count, nil
}
Comment thread
cursor[bot] marked this conversation as resolved.
56 changes: 56 additions & 0 deletions pkg/telemetry/periodic_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,62 @@ func TestCountStuckQueueNodes_NodeWithNonFinishedExecutionIsNotCounted(t *testin
require.Equal(t, int64(0), count)
}

func TestCountPendingEvents(t *testing.T) {
database.TruncateTables()

steps := stuckQueueItemsTestSteps{t: t}
steps.CreateWorkflow()
steps.CreateWorkflowNode()
steps.CreateRootEvent()

routedEvent := &models.CanvasEvent{
WorkflowID: steps.workflow.ID,
NodeID: steps.node.NodeID,
Channel: "default",
Data: datatypes.JSONType[any]{},
State: models.CanvasEventStateRouted,
}

require.NoError(t, database.Conn().Create(routedEvent).Error)

count, err := countPendingEvents()
require.NoError(t, err)
require.Equal(t, int64(1), count)
}
Comment thread
lucaspin marked this conversation as resolved.

func TestCountPendingExecutions(t *testing.T) {
database.TruncateTables()

steps := stuckQueueItemsTestSteps{t: t}
steps.CreateWorkflow()
steps.CreateWorkflowNode()
steps.CreateRootEvent()

pendingExecution := &models.CanvasNodeExecution{
WorkflowID: steps.workflow.ID,
NodeID: steps.node.NodeID,
RootEventID: steps.rootEvent.ID,
EventID: steps.rootEvent.ID,
State: models.CanvasNodeExecutionStatePending,
}

require.NoError(t, database.Conn().Create(pendingExecution).Error)

startedExecution := &models.CanvasNodeExecution{
WorkflowID: steps.workflow.ID,
NodeID: steps.node.NodeID,
RootEventID: steps.rootEvent.ID,
EventID: steps.rootEvent.ID,
State: models.CanvasNodeExecutionStateStarted,
}

require.NoError(t, database.Conn().Create(startedExecution).Error)

count, err := countPendingExecutions()
require.NoError(t, err)
require.Equal(t, int64(1), count)
}

type stuckQueueItemsTestSteps struct {
t *testing.T
workflow *models.Canvas
Expand Down
Loading