Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions pkg/epp/flowcontrol/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,14 @@ func (fc *FlowController) EnqueueAndWait(
flowKey.ID, priority,
req.InferencePoolName(),
req.ModelName(), req.TargetModelName())
metrics.AddFlowControlQueueBytes(
flowKey.ID, priority,
req.InferencePoolName(),
req.ModelName(), req.TargetModelName(), req.ByteSize())
defer metrics.SubFlowControlQueueBytes(
flowKey.ID, priority,
req.InferencePoolName(),
req.ModelName(), req.TargetModelName(), req.ByteSize())
Comment on lines +225 to +232
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Even though req.ByteSize() is technically immutable right now, it is a defensive best practice to capture the size in a local variable before the defer.

This guarantees that the Add and Sub operations are always mathematically symmetric. If a future refactor changes how ByteSize() is calculated (making it mutable), we risk the Gauge drifting permanently (e.g., subtracting more than we added or vice versa).


// 1. Create the derived context that governs this request's lifecycle (Parent Cancellation + TTL).
reqCtx, cancel, enqueueTime := fc.createRequestContext(ctx, req)
Expand Down
21 changes: 21 additions & 0 deletions pkg/epp/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -407,6 +407,15 @@ var (
},
append([]string{"fairness_id", "priority", "inference_pool"}, ModelLabels...),
)

// flowControlQueueBytes is a gauge vector named "flow_control_queue_bytes" under the
// InferenceExtension subsystem. Its series are keyed by fairness_id, priority, and
// inference_pool, followed by the labels in ModelLabels.
flowControlQueueBytes = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Subsystem: InferenceExtension,
Name: "flow_control_queue_bytes",
Help: metricsutil.HelpMsgWithStability("Current number of bytes associated with requests actively managed by the EPP flow control layer, from the start of the EnqueueAndWait call until a final outcome is reached.", compbasemetrics.ALPHA),
},
// Label order here must match the argument order passed to WithLabelValues.
append([]string{"fairness_id", "priority", "inference_pool"}, ModelLabels...),
)
)

// --- Inference Model Rewrite Metrics ---
Expand Down Expand Up @@ -461,6 +470,7 @@ func Register(customCollectors ...prometheus.Collector) {
metrics.Registry.MustRegister(PrefixCacheHitLength)
metrics.Registry.MustRegister(flowControlRequestQueueDuration)
metrics.Registry.MustRegister(flowControlQueueSize)
metrics.Registry.MustRegister(flowControlQueueBytes)
metrics.Registry.MustRegister(inferenceModelRewriteDecisionsTotal)
for _, collector := range customCollectors {
metrics.Registry.MustRegister(collector)
Expand Down Expand Up @@ -507,6 +517,7 @@ func Reset() {
PrefixCacheHitLength.Reset()
flowControlRequestQueueDuration.Reset()
flowControlQueueSize.Reset()
flowControlQueueBytes.Reset()
inferenceModelRewriteDecisionsTotal.Reset()
}

Expand Down Expand Up @@ -797,6 +808,16 @@ func DecFlowControlQueueSize(fairnessID, priority, inferencePool, modelName, tar
flowControlQueueSize.WithLabelValues(fairnessID, priority, inferencePool, modelName, targetModelName).Dec()
}

// AddFlowControlQueueBytes raises the flow control queue bytes gauge by bytes for the
// series selected by the given fairness ID, priority, inference pool, and model labels.
func AddFlowControlQueueBytes(fairnessID, priority, inferencePool, modelName, targetModelName string, bytes uint64) {
	// Resolve the labeled series first, then apply the delta as a float64.
	gauge := flowControlQueueBytes.WithLabelValues(
		fairnessID,
		priority,
		inferencePool,
		modelName,
		targetModelName,
	)
	delta := float64(bytes)
	gauge.Add(delta)
}

// SubFlowControlQueueBytes lowers the flow control queue bytes gauge by bytes for the
// series selected by the given fairness ID, priority, inference pool, and model labels.
func SubFlowControlQueueBytes(fairnessID, priority, inferencePool, modelName, targetModelName string, bytes uint64) {
	// Resolve the labeled series first, then apply the delta as a float64.
	gauge := flowControlQueueBytes.WithLabelValues(
		fairnessID,
		priority,
		inferencePool,
		modelName,
		targetModelName,
	)
	delta := float64(bytes)
	gauge.Sub(delta)
}

// SetTTFTSLOThreshold sets the TTFT SLO threshold for a model.
// This allows dynamic threshold management and makes the threshold visible in metrics.
func SetTTFTSLOThreshold(modelName, targetModelName string, threshold float64) {
Expand Down
38 changes: 38 additions & 0 deletions pkg/epp/metrics/metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -973,6 +973,44 @@ func TestFlowControlQueueSizeMetric(t *testing.T) {
require.Equal(t, 0.0, val, "Gauge value for non-existent labels should be 0")
}

// TestFlowControlQueueBytesMetric checks that AddFlowControlQueueBytes and
// SubFlowControlQueueBytes move the flowControlQueueBytes gauge by the given byte counts
// for a specific label set, and that a label set that was never written reads as 0.
func TestFlowControlQueueBytesMetric(t *testing.T) {
Reset()

const (
pool = "pool-1"
model = "llama-2"
target = "llama-base"
)

// Basic Add/Sub: a single Add followed by a matching Sub returns the gauge to 0.
AddFlowControlQueueBytes("user-a", "100", pool, model, target, 32.0)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: The helper function AddFlowControlQueueBytes accepts a uint64. While Go's untyped constants allow 32.0 to compile, it is cleaner to use integer literals (e.g., 32) to match the function signature.

val, err := testutil.GetGaugeMetricValue(flowControlQueueBytes.WithLabelValues("user-a", "100", pool, model, target))
require.NoError(t, err, "Failed to get gauge value for user-a/100 after Inc")
require.Equal(t, 32.0, val, "Gauge value should be 32 after Add for user-a/100")

SubFlowControlQueueBytes("user-a", "100", pool, model, target, 32)
val, err = testutil.GetGaugeMetricValue(flowControlQueueBytes.WithLabelValues("user-a", "100", pool, model, target))
require.NoError(t, err, "Failed to get gauge value for user-a/100 after Sub")
require.Equal(t, 0.0, val, "Gauge value should be 0 after Sub for user-a/100")

// Multiple labels
AddFlowControlQueueBytes("user-b", "200", pool, model, target, 32.0)
AddFlowControlQueueBytes("user-b", "200", pool, model, target, 16.0)
val, err = testutil.GetGaugeMetricValue(flowControlQueueBytes.WithLabelValues("user-b", "200", pool, model, target))
require.NoError(t, err, "Failed to get gauge value for user-b/200")
require.Equal(t, 48.0, val, "Gauge value should be 48 for user-b/200")

SubFlowControlQueueBytes("user-b", "200", pool, model, target, 48.0)
val, err = testutil.GetGaugeMetricValue(flowControlQueueBytes.WithLabelValues("user-b", "200", pool, model, target))
require.NoError(t, err, "Failed to get gauge value for user-b/200 after one Sub")
require.Equal(t, 0.0, val, "Gauge value should be 0 for user-b/200 after one Sub")

// Non-existent labels
val, err = testutil.GetGaugeMetricValue(flowControlQueueBytes.WithLabelValues("user-c", "100", pool, model, target))
require.NoError(t, err, "Failed to get gauge value for non-existent user-c/100")
require.Equal(t, 0.0, val, "Gauge value for non-existent labels should be 0")
}

func TestInferenceModelRewriteDecisionsTotalMetric(t *testing.T) {
Reset()

Expand Down
1 change: 1 addition & 0 deletions site-src/guides/metrics-and-observability.md
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ These metrics provide insights into the experimental flow control layer within t
|:---|:---|:---|:---|:---|
| inference_extension_flow_control_request_queue_duration_seconds | Distribution | Distribution of the total time requests spend in the flow control layer. This is measured from the moment a request enters the `EnqueueAndWait` function until it reaches a final outcome (e.g., Dispatched, Rejected, Evicted). | `fairness_id`=&lt;flow-id&gt; <br> `priority`=&lt;flow-priority&gt; <br> `outcome`=&lt;QueueOutcome&gt; <br> `inference_pool`=&lt;pool-name&gt; <br> `model_name`=&lt;model-name&gt; <br> `target_model_name`=&lt;target-model-name&gt; | ALPHA |
| inference_extension_flow_control_queue_size | Gauge | The current number of requests being actively managed by the flow control layer. This counts requests from the moment they enter the `EnqueueAndWait` function until they reach a final outcome. | `fairness_id`=&lt;flow-id&gt; <br> `priority`=&lt;flow-priority&gt; <br> `inference_pool`=&lt;pool-name&gt; <br> `model_name`=&lt;model-name&gt; <br> `target_model_name`=&lt;target-model-name&gt; | ALPHA |
| inference_extension_flow_control_queue_bytes | Gauge | The current size in bytes of all requests being actively managed by the flow control layer. This includes requests from the moment they enter the `EnqueueAndWait` function until they reach a final outcome. | `fairness_id`=&lt;flow-id&gt; <br> `priority`=&lt;flow-priority&gt; <br> `inference_pool`=&lt;pool-name&gt; <br> `model_name`=&lt;model-name&gt; <br> `target_model_name`=&lt;target-model-name&gt; | ALPHA |

## Scrape Metrics & Pprof profiles

Expand Down