Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 16 additions & 16 deletions cmd/epp/runner/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,9 @@ import (
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/contracts"
fccontroller "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/controller"
fcregistry "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/registry"
fwkplugin "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/framework/plugin"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/metrics"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/metrics/collectors"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/plugins"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/requestcontrol"
testresponsereceived "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/requestcontrol/plugins/test/responsereceived"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/saturationdetector/framework/plugins/utilizationdetector"
Expand Down Expand Up @@ -369,24 +369,24 @@ func setupDatastore(ctx context.Context, epFactory datalayer.EndpointFactory, mo

// registerInTreePlugins registers the factory functions of all known plugins
func (r *Runner) registerInTreePlugins() {
plugins.Register(prefix.PrefixCachePluginType, prefix.PrefixCachePluginFactory)
plugins.Register(picker.MaxScorePickerType, picker.MaxScorePickerFactory)
plugins.Register(picker.RandomPickerType, picker.RandomPickerFactory)
plugins.Register(picker.WeightedRandomPickerType, picker.WeightedRandomPickerFactory)
plugins.Register(profile.SingleProfileHandlerType, profile.SingleProfileHandlerFactory)
plugins.Register(scorer.KvCacheUtilizationScorerType, scorer.KvCacheUtilizationScorerFactory)
plugins.Register(scorer.QueueScorerType, scorer.QueueScorerFactory)
plugins.Register(scorer.RunningRequestsSizeScorerType, scorer.RunningRequestsSizeScorerFactory)
plugins.Register(scorer.LoraAffinityScorerType, scorer.LoraAffinityScorerFactory)
fwkplugin.Register(prefix.PrefixCachePluginType, prefix.PrefixCachePluginFactory)
fwkplugin.Register(picker.MaxScorePickerType, picker.MaxScorePickerFactory)
fwkplugin.Register(picker.RandomPickerType, picker.RandomPickerFactory)
fwkplugin.Register(picker.WeightedRandomPickerType, picker.WeightedRandomPickerFactory)
fwkplugin.Register(profile.SingleProfileHandlerType, profile.SingleProfileHandlerFactory)
fwkplugin.Register(scorer.KvCacheUtilizationScorerType, scorer.KvCacheUtilizationScorerFactory)
fwkplugin.Register(scorer.QueueScorerType, scorer.QueueScorerFactory)
fwkplugin.Register(scorer.RunningRequestsSizeScorerType, scorer.RunningRequestsSizeScorerFactory)
fwkplugin.Register(scorer.LoraAffinityScorerType, scorer.LoraAffinityScorerFactory)
// Latency predictor plugins
plugins.Register(predicted_latency.PredictedLatencyPluginType, predicted_latency.PredictedLatencyFactory)
fwkplugin.Register(predicted_latency.PredictedLatencyPluginType, predicted_latency.PredictedLatencyFactory)
// register filter for test purpose only (used in conformance tests)
plugins.Register(testfilter.HeaderBasedTestingFilterType, testfilter.HeaderBasedTestingFilterFactory)
fwkplugin.Register(testfilter.HeaderBasedTestingFilterType, testfilter.HeaderBasedTestingFilterFactory)
// register response received plugin for test purpose only (used in conformance tests)
plugins.Register(testresponsereceived.DestinationEndpointServedVerifierType, testresponsereceived.DestinationEndpointServedVerifierFactory)
fwkplugin.Register(testresponsereceived.DestinationEndpointServedVerifierType, testresponsereceived.DestinationEndpointServedVerifierFactory)
// register datalayer metrics collection plugins
plugins.Register(dlmetrics.MetricsDataSourceType, dlmetrics.MetricsDataSourceFactory)
plugins.Register(dlmetrics.MetricsExtractorType, dlmetrics.ModelServerExtractorFactory)
fwkplugin.Register(dlmetrics.MetricsDataSourceType, dlmetrics.MetricsDataSourceFactory)
fwkplugin.Register(dlmetrics.MetricsExtractorType, dlmetrics.ModelServerExtractorFactory)
}

func (r *Runner) parseConfigurationPhaseOne(ctx context.Context, opts *runserver.Options) (*configapi.EndpointPickerConfig, error) {
Expand Down Expand Up @@ -438,7 +438,7 @@ func makePodListFunc(ds datastore.Datastore) func() []types.NamespacedName {

func (r *Runner) parseConfigurationPhaseTwo(ctx context.Context, rawConfig *configapi.EndpointPickerConfig, ds datastore.Datastore) (*config.Config, error) {
logger := log.FromContext(ctx)
handle := plugins.NewEppHandle(ctx, makePodListFunc(ds))
handle := fwkplugin.NewEppHandle(ctx, makePodListFunc(ds))
cfg, err := loader.InstantiateAndConfigure(rawConfig, handle, logger)

if err != nil {
Expand Down
4 changes: 2 additions & 2 deletions pkg/bbr/plugins/plugins.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,11 @@ limitations under the License.
package plugins

import (
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/plugins"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/framework/plugin"
)

type BBRPlugin interface {
plugins.Plugin
plugin.Plugin

// Execute runs the plugin logic on the request body and a map of headers.
// A plugin's imnplementation logic CAN mutate the body of the message.
Expand Down
12 changes: 6 additions & 6 deletions pkg/epp/config/loader/configloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@ import (
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol"
fccontroller "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/controller"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/registry"
fwkplugin "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/framework/plugin"
framework "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/framework/scheduling"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/plugins"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/saturationdetector/framework/plugins/utilizationdetector"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/framework/plugins/profile"
Expand Down Expand Up @@ -77,7 +77,7 @@ func LoadRawConfig(configBytes []byte, logger logr.Logger) (*configapi.EndpointP
// scheduler construction.
func InstantiateAndConfigure(
rawConfig *configapi.EndpointPickerConfig,
handle plugins.Handle,
handle fwkplugin.Handle,
logger logr.Logger,
) (*config.Config, error) {

Expand Down Expand Up @@ -138,7 +138,7 @@ func decodeRawConfig(configBytes []byte) (*configapi.EndpointPickerConfig, error
return cfg, nil
}

func instantiatePlugins(configuredPlugins []configapi.PluginSpec, handle plugins.Handle) error {
func instantiatePlugins(configuredPlugins []configapi.PluginSpec, handle fwkplugin.Handle) error {
pluginNames := sets.New[string]()
for _, spec := range configuredPlugins {
if spec.Type == "" {
Expand All @@ -149,7 +149,7 @@ func instantiatePlugins(configuredPlugins []configapi.PluginSpec, handle plugins
}
pluginNames.Insert(spec.Name)

factory, ok := plugins.Registry[spec.Type]
factory, ok := fwkplugin.Registry[spec.Type]
if !ok {
return fmt.Errorf("plugin type '%s' is not registered", spec.Type)
}
Expand All @@ -166,7 +166,7 @@ func instantiatePlugins(configuredPlugins []configapi.PluginSpec, handle plugins

func buildSchedulerConfig(
configProfiles []configapi.SchedulingProfile,
handle plugins.Handle,
handle fwkplugin.Handle,
) (*scheduling.SchedulerConfig, error) {

profiles := make(map[string]*framework.SchedulerProfile)
Expand Down Expand Up @@ -253,7 +253,7 @@ func buildSaturationConfig(apiConfig *configapi.SaturationDetector) *utilization
return cfg
}

func buildDataLayerConfig(rawDataConfig *configapi.DataLayerConfig, dataLayerEnabled bool, handle plugins.Handle) (*datalayer.Config, error) {
func buildDataLayerConfig(rawDataConfig *configapi.DataLayerConfig, dataLayerEnabled bool, handle fwkplugin.Handle) (*datalayer.Config, error) {
if dataLayerEnabled && (rawDataConfig == nil || rawDataConfig.Sources == nil) { // enabled but no configuration
return nil, errors.New("the Datalayer has been enabled. You must specify the Data section in the configuration")
}
Expand Down
52 changes: 26 additions & 26 deletions pkg/epp/config/loader/configloader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@ import (
"sigs.k8s.io/gateway-api-inference-extension/pkg/common/util/logging"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datalayer"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol"
fwkplugin "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/framework/plugin"
framework "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/framework/scheduling"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/plugins"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/saturationdetector/framework/plugins/utilizationdetector"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/framework/plugins/picker"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/framework/plugins/profile"
Expand Down Expand Up @@ -161,14 +161,14 @@ func TestInstantiateAndConfigure(t *testing.T) {
name string
configText string
wantErr bool
validate func(t *testing.T, handle plugins.Handle, cfg *configapi.EndpointPickerConfig)
validate func(t *testing.T, handle fwkplugin.Handle, cfg *configapi.EndpointPickerConfig)
}{
// --- Success Scenarios ---
{
name: "Success - Complex Scheduler",
configText: successSchedulerConfigText,
wantErr: false,
validate: func(t *testing.T, handle plugins.Handle, cfg *configapi.EndpointPickerConfig) {
validate: func(t *testing.T, handle fwkplugin.Handle, cfg *configapi.EndpointPickerConfig) {
// 1. Verify all explicit plugins exist in the registry
require.NotNil(t, handle.Plugin("testScorer"), "Explicit scorer should be instantiated")
require.NotNil(t, handle.Plugin("maxScorePicker"), "Explicit picker should be instantiated")
Expand All @@ -192,7 +192,7 @@ func TestInstantiateAndConfigure(t *testing.T) {
name: "Success - Default Scorer Weight",
configText: successWithNoWeightText,
wantErr: false,
validate: func(t *testing.T, _ plugins.Handle, cfg *configapi.EndpointPickerConfig) {
validate: func(t *testing.T, _ fwkplugin.Handle, cfg *configapi.EndpointPickerConfig) {
require.Len(t, cfg.SchedulingProfiles, 1, "Unexpected profile structure")
require.Len(t, cfg.SchedulingProfiles[0].Plugins, 2, "Expected Scorer + Default Picker")
w := cfg.SchedulingProfiles[0].Plugins[0].Weight
Expand All @@ -204,7 +204,7 @@ func TestInstantiateAndConfigure(t *testing.T) {
name: "Success - Default Profile Handler Injection",
configText: successWithNoProfileHandlersText,
wantErr: false,
validate: func(t *testing.T, handle plugins.Handle, cfg *configapi.EndpointPickerConfig) {
validate: func(t *testing.T, handle fwkplugin.Handle, cfg *configapi.EndpointPickerConfig) {
require.True(t, hasPluginType(handle, profile.SingleProfileHandlerType),
"Defaults: SingleProfileHandler was not injected")
},
Expand All @@ -213,7 +213,7 @@ func TestInstantiateAndConfigure(t *testing.T) {
name: "Success - Picker Before Scorer",
configText: successPickerBeforeScorerText,
wantErr: false,
validate: func(t *testing.T, _ plugins.Handle, cfg *configapi.EndpointPickerConfig) {
validate: func(t *testing.T, _ fwkplugin.Handle, cfg *configapi.EndpointPickerConfig) {
require.Len(t, cfg.SchedulingProfiles, 1)
prof := cfg.SchedulingProfiles[0]
require.Equal(t, "test-picker", prof.Plugins[0].PluginRef, "Picker should be the first plugin")
Expand Down Expand Up @@ -395,7 +395,7 @@ func TestBuildSaturationConfig(t *testing.T) {

// --- Helpers & Mocks ---

func hasPluginType(handle plugins.Handle, typeName string) bool {
func hasPluginType(handle fwkplugin.Handle, typeName string) bool {
for _, p := range handle.GetAllPlugins() {
if p.TypedName().Type == typeName {
return true
Expand All @@ -405,10 +405,10 @@ func hasPluginType(handle plugins.Handle, typeName string) bool {
}

type mockPlugin struct {
t plugins.TypedName
t fwkplugin.TypedName
}

func (m *mockPlugin) TypedName() plugins.TypedName { return m.t }
func (m *mockPlugin) TypedName() fwkplugin.TypedName { return m.t }

// Mock Scorer
type mockScorer struct{ mockPlugin }
Expand Down Expand Up @@ -479,20 +479,20 @@ func registerTestPlugins(t *testing.T) {
t.Helper()

// Helper to generate simple factories.
register := func(name string, factory plugins.FactoryFunc) {
plugins.Register(name, factory)
register := func(name string, factory fwkplugin.FactoryFunc) {
fwkplugin.Register(name, factory)
}

mockFactory := func(tType string) plugins.FactoryFunc {
return func(name string, _ json.RawMessage, _ plugins.Handle) (plugins.Plugin, error) {
return &mockPlugin{t: plugins.TypedName{Name: name, Type: tType}}, nil
mockFactory := func(tType string) fwkplugin.FactoryFunc {
return func(name string, _ json.RawMessage, _ fwkplugin.Handle) (fwkplugin.Plugin, error) {
return &mockPlugin{t: fwkplugin.TypedName{Name: name, Type: tType}}, nil
}
}

// Register standard test mocks.
register(testPluginType, mockFactory(testPluginType))

plugins.Register(testScorerType, func(name string, params json.RawMessage, _ plugins.Handle) (plugins.Plugin, error) {
fwkplugin.Register(testScorerType, func(name string, params json.RawMessage, _ fwkplugin.Handle) (fwkplugin.Plugin, error) {
// Attempt to unmarshal to trigger errors for invalid JSON in tests.
if len(params) > 0 {
var p struct {
Expand All @@ -502,26 +502,26 @@ func registerTestPlugins(t *testing.T) {
return nil, err
}
}
return &mockScorer{mockPlugin{t: plugins.TypedName{Name: name, Type: testScorerType}}}, nil
return &mockScorer{mockPlugin{t: fwkplugin.TypedName{Name: name, Type: testScorerType}}}, nil
})

plugins.Register(testPickerType, func(name string, _ json.RawMessage, _ plugins.Handle) (plugins.Plugin, error) {
return &mockPicker{mockPlugin{t: plugins.TypedName{Name: name, Type: testPickerType}}}, nil
fwkplugin.Register(testPickerType, func(name string, _ json.RawMessage, _ fwkplugin.Handle) (fwkplugin.Plugin, error) {
return &mockPicker{mockPlugin{t: fwkplugin.TypedName{Name: name, Type: testPickerType}}}, nil
})

plugins.Register(testProfileHandler, func(name string, _ json.RawMessage, _ plugins.Handle) (plugins.Plugin, error) {
return &mockHandler{mockPlugin{t: plugins.TypedName{Name: name, Type: testProfileHandler}}}, nil
fwkplugin.Register(testProfileHandler, func(name string, _ json.RawMessage, _ fwkplugin.Handle) (fwkplugin.Plugin, error) {
return &mockHandler{mockPlugin{t: fwkplugin.TypedName{Name: name, Type: testProfileHandler}}}, nil
})

plugins.Register(testSourceType, func(name string, _ json.RawMessage, _ plugins.Handle) (plugins.Plugin, error) {
return &mockSource{mockPlugin{t: plugins.TypedName{Name: name, Type: testSourceType}}}, nil
fwkplugin.Register(testSourceType, func(name string, _ json.RawMessage, _ fwkplugin.Handle) (fwkplugin.Plugin, error) {
return &mockSource{mockPlugin{t: fwkplugin.TypedName{Name: name, Type: testSourceType}}}, nil
})

plugins.Register(testExtractorType, func(name string, _ json.RawMessage, _ plugins.Handle) (plugins.Plugin, error) {
return &mockExtractor{mockPlugin{t: plugins.TypedName{Name: name, Type: testExtractorType}}}, nil
fwkplugin.Register(testExtractorType, func(name string, _ json.RawMessage, _ fwkplugin.Handle) (fwkplugin.Plugin, error) {
return &mockExtractor{mockPlugin{t: fwkplugin.TypedName{Name: name, Type: testExtractorType}}}, nil
})

// Ensure system defaults are registered too.
plugins.Register(picker.MaxScorePickerType, picker.MaxScorePickerFactory)
plugins.Register(profile.SingleProfileHandlerType, profile.SingleProfileHandlerFactory)
fwkplugin.Register(picker.MaxScorePickerType, picker.MaxScorePickerFactory)
fwkplugin.Register(profile.SingleProfileHandlerType, profile.SingleProfileHandlerFactory)
}
16 changes: 8 additions & 8 deletions pkg/epp/config/loader/defaults.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ import (

configapi "sigs.k8s.io/gateway-api-inference-extension/apix/config/v1alpha1"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework/plugins/interflow"
fwkplugin "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/framework/plugin"
framework "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/framework/scheduling"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/plugins"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/framework/plugins/picker"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/framework/plugins/profile"
)
Expand Down Expand Up @@ -50,7 +50,7 @@ func applyStaticDefaults(cfg *configapi.EndpointPickerConfig) {
// applySystemDefaults injects required components that were omitted from the config.
// It handles "System" defaults: logic that requires inspecting instantiated plugins (via the handle) to ensure the
// system graph is complete.
func applySystemDefaults(cfg *configapi.EndpointPickerConfig, handle plugins.Handle) error {
func applySystemDefaults(cfg *configapi.EndpointPickerConfig, handle fwkplugin.Handle) error {
allPlugins := handle.GetAllPluginsWithNames()
if err := ensureSchedulingLayer(cfg, handle, allPlugins); err != nil {
return fmt.Errorf("failed to apply scheduling system defaults: %w", err)
Expand All @@ -66,8 +66,8 @@ func applySystemDefaults(cfg *configapi.EndpointPickerConfig, handle plugins.Han
// they are not explicitly configured.
func ensureSchedulingLayer(
cfg *configapi.EndpointPickerConfig,
handle plugins.Handle,
allPlugins map[string]plugins.Plugin,
handle fwkplugin.Handle,
allPlugins map[string]fwkplugin.Plugin,
) error {
if len(cfg.SchedulingProfiles) == 0 {
defaultProfile := configapi.SchedulingProfile{Name: "default"}
Expand Down Expand Up @@ -141,8 +141,8 @@ func ensureSchedulingLayer(
// ensureFlowControlLayer guarantees that the flow control subsystem is structurally complete.
func ensureFlowControlLayer(
cfg *configapi.EndpointPickerConfig,
handle plugins.Handle,
_ map[string]plugins.Plugin,
handle fwkplugin.Handle,
_ map[string]fwkplugin.Plugin,
) error {
return registerDefaultPlugin(cfg, handle, interflow.GlobalStrictFairnessPolicyType)
}
Expand All @@ -151,11 +151,11 @@ func ensureFlowControlLayer(
// the config spec.
func registerDefaultPlugin(
cfg *configapi.EndpointPickerConfig,
handle plugins.Handle,
handle fwkplugin.Handle,
pluginType string,
) error {
name := pluginType
factory, ok := plugins.Registry[pluginType]
factory, ok := fwkplugin.Registry[pluginType]
if !ok {
return fmt.Errorf("plugin type '%s' not found in registry", pluginType)
}
Expand Down
6 changes: 3 additions & 3 deletions pkg/epp/datalayer/datasource.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,12 @@ import (
"reflect"
"sync"

"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/plugins"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/framework/plugin"
)

// DataSource provides raw data to registered Extractors.
type DataSource interface {
plugins.Plugin
plugin.Plugin
// Extractors returns a list of registered Extractor names.
Extractors() []string
// AddExtractor adds an extractor to the data source. Multiple
Expand All @@ -46,7 +46,7 @@ type DataSource interface {

// Extractor transforms raw data into structured attributes.
type Extractor interface {
plugins.Plugin
plugin.Plugin
// ExpectedType defines the type expected by the extractor.
ExpectedInputType() reflect.Type
// Extract transforms the raw data source output into a concrete structured
Expand Down
4 changes: 2 additions & 2 deletions pkg/epp/datalayer/datasource_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import (

"github.com/stretchr/testify/assert"

"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/plugins"
fwkplugin "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/framework/plugin"
)

const (
Expand All @@ -31,7 +31,7 @@ const (

func TestRegisterAndGetSource(t *testing.T) {
reg := DataSourceRegistry{}
ds := &FakeDataSource{typedName: &plugins.TypedName{Type: testType, Name: testType}}
ds := &FakeDataSource{typedName: &fwkplugin.TypedName{Type: testType, Name: testType}}

err := reg.Register(ds)
assert.NoError(t, err, "expected no error on first registration")
Expand Down
8 changes: 4 additions & 4 deletions pkg/epp/datalayer/fake.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,25 +21,25 @@ import (
"sync/atomic"

"k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/plugins"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/framework/plugin"
)

const (
fakeSource = "fake-data-source"
)

type FakeDataSource struct {
typedName *plugins.TypedName
typedName *plugin.TypedName
callCount int64
Metrics map[types.NamespacedName]*Metrics
Errors map[types.NamespacedName]error
}

func (fds *FakeDataSource) TypedName() plugins.TypedName {
func (fds *FakeDataSource) TypedName() plugin.TypedName {
if fds.typedName != nil {
return *fds.typedName
}
return plugins.TypedName{
return plugin.TypedName{
Type: fakeSource,
Name: fakeSource,
}
Expand Down
Loading