Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 12 additions & 1 deletion apix/config/v1alpha1/endpointpickerconfig_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -302,8 +302,19 @@ type PriorityBandConfig struct {
// If 0 or omitted, the system default is used.
// Default: 1 GB.
MaxBytes *int64 `json:"maxBytes,omitempty"`

// +optional
// FairnessPolicyRef specifies the name of the policy that governs flow selection.
// If omitted, the system default ("global-strict-fairness-policy") is used.
FairnessPolicyRef string `json:"fairnessPolicyRef,omitempty"`

// +optional
// OrderingPolicyRef specifies the name of the policy that governs request selection within a flow.
// If omitted, the system default ("fcfs-ordering-policy") is used.
OrderingPolicyRef string `json:"orderingPolicyRef,omitempty"`
}

func (pbc PriorityBandConfig) String() string {
return fmt.Sprintf("{Priority: %d, MaxBytes: %v}", pbc.Priority, pbc.MaxBytes)
return fmt.Sprintf("{Priority: %d, MaxBytes: %v, FairnessPolicyRef: %s, OrderingPolicyRef: %s}",
pbc.Priority, pbc.MaxBytes, pbc.FairnessPolicyRef, pbc.OrderingPolicyRef)
}
7 changes: 7 additions & 0 deletions cmd/epp/runner/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@ import (
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/contracts"
fccontroller "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/controller"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework/plugins/fairness"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework/plugins/ordering"
fcregistry "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/registry"
fwkplugin "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/framework/interface/plugin"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/framework/plugins/scheduling/picker"
Expand Down Expand Up @@ -378,6 +380,11 @@ func (r *Runner) registerInTreePlugins() {
fwkplugin.Register(scorer.QueueScorerType, scorer.QueueScorerFactory)
fwkplugin.Register(scorer.RunningRequestsSizeScorerType, scorer.RunningRequestsSizeScorerFactory)
fwkplugin.Register(scorer.LoraAffinityScorerType, scorer.LoraAffinityScorerFactory)
// Flow Control plugins
fwkplugin.Register(fairness.GlobalStrictFairnessPolicyType, fairness.GlobalStrictFairnessPolicyFactory)
fwkplugin.Register(fairness.RoundRobinFairnessPolicyType, fairness.RoundRobinFairnessPolicyFactory)
fwkplugin.Register(ordering.FCFSOrderingPolicyType, ordering.FCFSOrderingPolicyFactory)
fwkplugin.Register(ordering.EDFOrderingPolicyType, ordering.EDFOrderingPolicyFactory)
// Latency predictor plugins
fwkplugin.Register(predictedlatency.PredictedLatencyPluginType, predictedlatency.PredictedLatencyFactory)
// register filter for test purpose only (used in conformance tests)
Expand Down
82 changes: 74 additions & 8 deletions pkg/epp/config/loader/configloader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework/plugins/fairness"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework/plugins/ordering"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/registry"
fwkdl "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/framework/interface/datalayer"
flowcontrolmocks "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/framework/interface/flowcontrol/mocks"
fwkplugin "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/framework/interface/plugin"
Expand Down Expand Up @@ -240,6 +241,37 @@ func TestInstantiateAndConfigure(t *testing.T) {
require.NotNil(t, cfg.FlowControlConfig.Controller, "Controller config should be present")
require.Equal(t, 1*time.Minute, cfg.FlowControlConfig.Controller.DefaultRequestTTL, "DefaultRequestTTL should match yaml")
require.Equal(t, 1*time.Second, cfg.FlowControlConfig.Controller.ExpiryCleanupInterval, "ExpiryCleanupInterval should use default")

// Verify plugins were injected into the Raw Config.
foundFairness := false
foundOrdering := false
for _, p := range rawCfg.Plugins {
if p.Name == "global-strict-fairness-policy" {
foundFairness = true
}
if p.Name == "fcfs-ordering-policy" {
foundOrdering = true
}
}
require.True(t, foundFairness, "Loader should inject global-strict-fairness-policy")
require.True(t, foundOrdering, "Loader should inject fcfs-ordering-policy")

// Verify plugins exist in the Handle (Runtime).
require.NotNil(t, handle.Plugin("global-strict-fairness-policy"),
"Fairness policy should be instantiated in handle")
require.NotNil(t, handle.Plugin("fcfs-ordering-policy"), "Ordering policy should be instantiated in handle")

// Verify Registry Config wired them up.
require.NotNil(t, cfg.FlowControlConfig.Registry.DefaultPriorityBand.OrderingPolicy,
"DefaultPriorityBand should have a hydrated OrderingPolicy instance (plugin resolution failed)")
require.NotNil(t, cfg.FlowControlConfig.Registry.DefaultPriorityBand.FairnessPolicy,
"DefaultPriorityBand should have a hydrated FairnessPolicy instance (plugin resolution failed)")
require.Equal(t, registry.DefaultOrderingPolicyRef,
cfg.FlowControlConfig.Registry.DefaultPriorityBand.OrderingPolicy.TypedName().Name,
"DefaultPriorityBand should automatically be configured with the system default Ordering Policy")
require.Equal(t, registry.DefaultFairnessPolicyRef,
cfg.FlowControlConfig.Registry.DefaultPriorityBand.FairnessPolicy.TypedName().Name,
"DefaultPriorityBand should automatically be configured with the system default Fairness Policy")
},
},
{
Expand All @@ -251,6 +283,26 @@ func TestInstantiateAndConfigure(t *testing.T) {
require.Nil(t, cfg.FlowControlConfig, "Internal config should be nil when FeatureGate is disabled")
},
},
{
name: "Success - Complex Flow Control Config",
configText: successComplexFlowControlConfigText,
wantErr: false,
validate: func(t *testing.T, handle fwkplugin.Handle, rawCfg *configapi.EndpointPickerConfig, cfg *config.Config) {
require.NotNil(t, cfg.FlowControlConfig, "FlowControl config should be loaded")
require.Contains(t, cfg.FlowControlConfig.Registry.PriorityBands, 100, "Should contain priority band 100")
band := cfg.FlowControlConfig.Registry.PriorityBands[100]

// Verify custom policies.
require.Equal(t, "customFCFS", band.OrderingPolicy.TypedName().Name,
"Should use custom ordering policy name")
require.Equal(t, ordering.FCFSOrderingPolicyType, band.OrderingPolicy.TypedName().Type,
"Should be FCFS type")
require.Equal(t, "customFairness", band.FairnessPolicy.TypedName().Name,
"Should use custom fairness policy name")
require.Equal(t, fairness.GlobalStrictFairnessPolicyType, band.FairnessPolicy.TypedName().Type,
"Should be GlobalStrict type")
},
},

// --- Instantiation Errors ---
{
Expand Down Expand Up @@ -296,42 +348,56 @@ func TestInstantiateAndConfigure(t *testing.T) {
wantErr: true,
},

// --- Architectural Errors ---
// --- Feature Validation: Scheduling ---
{
name: "Error (Architectural) - Two Pickers in One Profile",
name: "Error (Scheduling) - Two Pickers in One Profile",
configText: errorTwoPickersText,
wantErr: true,
},
{
name: "Error (Deep Architectural) - Multiple Profile Handlers",
name: "Error (Scheduling) - Multiple Profile Handlers",
configText: errorTwoProfileHandlersText,
wantErr: true,
},
{
name: "Error (Deep Architectural) - Missing Profile Handler",
name: "Error (Scheduling) - Missing Profile Handler",
configText: errorNoProfileHandlersText,
wantErr: true,
},
{
name: "Error (Deep Architectural) - Multi-Profile with Single Handler",
name: "Error (Scheduling) - Multi-Profile with Single Handler",
configText: errorMultiProfilesUseSingleProfileHandlerText,
wantErr: true,
},

// --- Feature Validation: Data Layer ---
{
name: "Error - Missing Data Config",
name: "Error (DataLayer) - Missing Data Config",
configText: errorMissingDataConfigText,
wantErr: true,
},
{
name: "Error - Bad Source Reference",
name: "Error (DataLayer) - Bad Source Reference",
configText: errorBadSourceReferenceText,
wantErr: true,
},
{
name: "Error - Bad Extractor Reference",
name: "Error (DataLayer) - Bad Extractor Reference",
configText: errorBadExtractorReferenceText,
wantErr: true,
},

// --- Feature Validation: Flow Control ---
{
name: "Error (FlowControl) - Missing Policy Plugin",
configText: errorFlowControlMissingPluginText,
wantErr: true,
},
{
name: "Error (FlowControl) - Wrong Plugin Type",
configText: errorFlowControlWrongPluginTypeText,
wantErr: true,
},
}

for _, tc := range tests {
Expand Down
16 changes: 10 additions & 6 deletions pkg/epp/config/loader/defaults.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,7 @@ import (
"fmt"

configapi "sigs.k8s.io/gateway-api-inference-extension/apix/config/v1alpha1"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework/plugins/fairness"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework/plugins/ordering"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/registry"
fwkplugin "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/framework/interface/plugin"
framework "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/framework/interface/scheduling"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/framework/plugins/scheduling/picker"
Expand Down Expand Up @@ -143,12 +142,17 @@ func ensureSchedulingLayer(
func ensureFlowControlLayer(
cfg *configapi.EndpointPickerConfig,
handle fwkplugin.Handle,
_ map[string]fwkplugin.Plugin,
allPlugins map[string]fwkplugin.Plugin,
) error {
if err := registerDefaultPlugin(cfg, handle, ordering.FCFSOrderingPolicyType); err != nil {
return err
if _, ok := allPlugins[registry.DefaultOrderingPolicyRef]; !ok {
if err := registerDefaultPlugin(cfg, handle, registry.DefaultOrderingPolicyRef); err != nil {
return err
}
}
return registerDefaultPlugin(cfg, handle, fairness.GlobalStrictFairnessPolicyType)
if _, ok := allPlugins[registry.DefaultFairnessPolicyRef]; !ok {
return registerDefaultPlugin(cfg, handle, registry.DefaultFairnessPolicyRef)
}
return nil
}

// registerDefaultPlugin instantiates a plugin with empty configuration (defaults) and adds it to both the handle and
Expand Down
66 changes: 66 additions & 0 deletions pkg/epp/config/loader/testdata_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,30 @@ flowControl:
maxBytes: 1024
`

// successComplexFlowControlConfigText tests that Flow Control configuration with custom plugins is correctly loaded.
const successComplexFlowControlConfigText = `
apiVersion: inference.networking.x-k8s.io/v1alpha1
kind: EndpointPickerConfig
plugins:
- name: maxScore
type: max-score-picker
- name: customFCFS
type: fcfs-ordering-policy
- name: customFairness
type: global-strict-fairness-policy
schedulingProfiles:
- name: default
plugins:
- pluginRef: maxScore
featureGates:
- flowControl
flowControl:
priorityBands:
- priority: 100
orderingPolicyRef: customFCFS
fairnessPolicyRef: customFairness
`

// --- Invalid Configurations (Syntax/Structure) ---

// errorBadYamlText contains invalid YAML syntax.
Expand Down Expand Up @@ -455,3 +479,45 @@ featureGates:
- dataLayer
- flowControl
`

// errorFlowControlMissingPluginText references a policy that does not exist.
const errorFlowControlMissingPluginText = `
apiVersion: inference.networking.x-k8s.io/v1alpha1
kind: EndpointPickerConfig
plugins:
- name: maxScore
type: max-score-picker
schedulingProfiles:
- name: default
plugins:
- pluginRef: maxScore
featureGates:
- flowControl
flowControl:
priorityBands:
- priority: 100
orderingPolicyRef: non-existent-policy
`

// errorFlowControlWrongPluginTypeText references a plugin of the wrong type (Scorer instead of Policy).
const errorFlowControlWrongPluginTypeText = `
apiVersion: inference.networking.x-k8s.io/v1alpha1
kind: EndpointPickerConfig
plugins:
- name: maxScore
type: max-score-picker
- name: testScorer
type: test-scorer
parameters:
blockSize: 32
schedulingProfiles:
- name: default
plugins:
- pluginRef: maxScore
featureGates:
- flowControl
flowControl:
priorityBands:
- priority: 100
orderingPolicyRef: testScorer # Wrong type
`
4 changes: 0 additions & 4 deletions pkg/epp/flowcontrol/framework/plugins/fairness/besthead.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,6 @@ import (
// TODO: rename files to `global_strict.go` and `global_strict_test.go`.
const GlobalStrictFairnessPolicyType = "global-strict-fairness-policy"

func init() {
fwkplugin.Register(GlobalStrictFairnessPolicyType, GlobalStrictFairnessPolicyFactory)
}

func GlobalStrictFairnessPolicyFactory(name string, _ json.RawMessage, _ fwkplugin.Handle) (fwkplugin.Plugin, error) {
return newGlobalStrict(name), nil
}
Expand Down
9 changes: 2 additions & 7 deletions pkg/epp/flowcontrol/framework/plugins/fairness/roundrobin.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,8 @@ import (
// round-robin strategy.
const RoundRobinFairnessPolicyType = "round-robin-fairness-policy"

func init() {
fwkplugin.Register(
RoundRobinFairnessPolicyType,
func(name string, _ json.RawMessage, _ fwkplugin.Handle) (fwkplugin.Plugin, error) {
return newRoundRobin(name), nil
},
)
func RoundRobinFairnessPolicyFactory(name string, _ json.RawMessage, _ fwkplugin.Handle) (fwkplugin.Plugin, error) {
return newRoundRobin(name), nil
}

// roundRobin implements FairnessPolicy.
Expand Down
26 changes: 17 additions & 9 deletions pkg/epp/flowcontrol/framework/plugins/ordering/edf.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,25 +31,33 @@ import (
// time-bound requests, using FCFS as a tie-breaker for fairness.
const EDFOrderingPolicyType = "edf-ordering-policy"

func init() {
plugin.Register(EDFOrderingPolicyType, func(string, json.RawMessage, plugin.Handle) (plugin.Plugin, error) {
return newEDFPolicy(), nil
})
func EDFOrderingPolicyFactory(name string, _ json.RawMessage, _ plugin.Handle) (plugin.Plugin, error) {
return newEDFPolicy().withName(name), nil
}

// EDFPolicy implements an OrderingPolicy based on the Earliest Deadline First (EDF) scheduling algorithm.
// Requests with earlier absolute deadlines (EnqueueTime + EffectiveTTL) are dispatched first.
// See the documentation for the exported EDFOrderingPolicyType constant for detailed behavioral guarantees.
type EDFPolicy struct{}
type EDFPolicy struct {
name string
}

var _ flowcontrol.OrderingPolicy = &EDFPolicy{}

func newEDFPolicy() *EDFPolicy {
return &EDFPolicy{}
return &EDFPolicy{
name: EDFOrderingPolicyType,
}
}

func (p *EDFPolicy) withName(name string) *EDFPolicy {
if name != "" {
p.name = name
}
return p
}

func (p *EDFPolicy) Name() string {
return EDFOrderingPolicyType
return p.name
}

// RequiredQueueCapabilities returns the queue capabilities required by this policy.
Expand All @@ -62,7 +70,7 @@ func (p *EDFPolicy) RequiredQueueCapabilities() []flowcontrol.QueueCapability {
func (p *EDFPolicy) TypedName() plugin.TypedName {
return plugin.TypedName{
Type: EDFOrderingPolicyType,
Name: EDFOrderingPolicyType,
Name: p.name,
}
}

Expand Down
Loading