Skip to content

Commit 993fc21

Browse files
authored
add scheduling framework extender (#365)
Signed-off-by: saintube <saintube@foxmail.com>
1 parent b2fcc22 commit 993fc21

File tree

3 files changed

+111
-5
lines changed

3 files changed

+111
-5
lines changed

cmd/koord-scheduler/app/server.go

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@ import (
6565
type Option func(frameworkext.ExtendedHandle, runtime.Registry) error
6666

6767
// NewSchedulerCommand creates a *cobra.Command object with default parameters and registryOptions
68-
func NewSchedulerCommand(registryOptions ...Option) *cobra.Command {
68+
func NewSchedulerCommand(schedulingHooks []frameworkext.SchedulingPhaseHook, registryOptions ...Option) *cobra.Command {
6969
opts := options.NewOptions()
7070

7171
cmd := &cobra.Command{
@@ -78,7 +78,7 @@ scenarios,ensuring the runtime quality of different workloads and users' demands
7878
for cost reduction and efficiency enhancement.
7979
`,
8080
Run: func(cmd *cobra.Command, args []string) {
81-
if err := runCommand(cmd, opts, registryOptions...); err != nil {
81+
if err := runCommand(cmd, opts, schedulingHooks, registryOptions...); err != nil {
8282
fmt.Fprintf(os.Stderr, "%v\n", err)
8383
os.Exit(1)
8484
}
@@ -110,7 +110,7 @@ for cost reduction and efficiency enhancement.
110110
}
111111

112112
// runCommand runs the scheduler.
113-
func runCommand(cmd *cobra.Command, opts *options.Options, registryOptions ...Option) error {
113+
func runCommand(cmd *cobra.Command, opts *options.Options, schedulingHooks []frameworkext.SchedulingPhaseHook, registryOptions ...Option) error {
114114
verflag.PrintAndExitIfRequested()
115115
cliflag.PrintFlags(cmd.Flags())
116116

@@ -122,7 +122,7 @@ func runCommand(cmd *cobra.Command, opts *options.Options, registryOptions ...Op
122122
cancel()
123123
}()
124124

125-
cc, sched, err := Setup(ctx, opts, registryOptions...)
125+
cc, sched, err := Setup(ctx, opts, schedulingHooks, registryOptions...)
126126
if err != nil {
127127
return err
128128
}
@@ -307,7 +307,7 @@ func WithPlugin(name string, factory runtime.PluginFactory) Option {
307307
}
308308

309309
// Setup creates a completed config and a scheduler based on the command args and options
310-
func Setup(ctx context.Context, opts *options.Options, outOfTreeRegistryOptions ...Option) (*schedulerserverconfig.CompletedConfig, *scheduler.Scheduler, error) {
310+
func Setup(ctx context.Context, opts *options.Options, schedulingHooks []frameworkext.SchedulingPhaseHook, outOfTreeRegistryOptions ...Option) (*schedulerserverconfig.CompletedConfig, *scheduler.Scheduler, error) {
311311
if cfg, err := latest.Default(); err != nil {
312312
return nil, nil, err
313313
} else {
@@ -373,5 +373,12 @@ func Setup(ctx context.Context, opts *options.Options, outOfTreeRegistryOptions
373373
// TODO(joseph): Some extensions can also be made in the future,
374374
// such as replacing some interfaces in Scheduler to implement custom logic
375375

376+
// extend framework to hook run plugin functions
377+
extendedFrameworkFactory := frameworkext.NewFrameworkExtenderFactory(extendedHandle, schedulingHooks...)
378+
for k, v := range sched.Profiles {
379+
sched.Profiles[k] = extendedFrameworkFactory.New(v)
380+
}
381+
// TODO: register event handlers for scheduler instance
382+
376383
return &cc, sched, nil
377384
}

cmd/koord-scheduler/main.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import (
2424
"k8s.io/component-base/logs"
2525

2626
"github.com/koordinator-sh/koordinator/cmd/koord-scheduler/app"
27+
"github.com/koordinator-sh/koordinator/pkg/scheduler/frameworkext"
2728
"github.com/koordinator-sh/koordinator/pkg/scheduler/plugins/compatibledefaultpreemption"
2829
"github.com/koordinator-sh/koordinator/pkg/scheduler/plugins/loadaware"
2930
"github.com/koordinator-sh/koordinator/pkg/scheduler/plugins/nodenumaresource"
@@ -35,10 +36,15 @@ import (
3536
func main() {
3637
rand.Seed(time.Now().UnixNano())
3738

39+
// Register custom scheduling hooks for pre-process scheduling context before call plugins.
40+
// e.g. change the nodeInfo and make a copy before calling filter plugins
41+
var schedulingHooks []frameworkext.SchedulingPhaseHook
42+
3843
// Register custom plugins to the scheduler framework.
3944
// Later they can consist of scheduler profile(s) and hence
4045
// used by various kinds of workloads.
4146
command := app.NewSchedulerCommand(
47+
schedulingHooks,
4248
app.WithPlugin(loadaware.Name, loadaware.New),
4349
app.WithPlugin(nodenumaresource.Name, nodenumaresource.New),
4450
app.WithPlugin(compatibledefaultpreemption.Name, compatibledefaultpreemption.New),

pkg/scheduler/frameworkext/framework_extender.go

Lines changed: 93 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,13 @@ limitations under the License.
1717
package frameworkext
1818

1919
import (
20+
"context"
2021
"sync"
2122

2223
nrtinformers "github.com/k8stopologyawareschedwg/noderesourcetopology-api/pkg/generated/informers/externalversions"
24+
corev1 "k8s.io/api/core/v1"
2325
"k8s.io/apimachinery/pkg/runtime"
26+
"k8s.io/klog/v2"
2427
"k8s.io/kubernetes/pkg/scheduler/framework"
2528
frameworkruntime "k8s.io/kubernetes/pkg/scheduler/framework/runtime"
2629

@@ -96,6 +99,96 @@ func (ext *frameworkExtendedHandleImpl) NodeResourceTopologySharedInformerFactor
9699
return ext.nrtSharedInformerFactory
97100
}
98101

102+
type FrameworkExtender interface {
103+
framework.Framework
104+
}
105+
106+
type FrameworkExtenderFactory interface {
107+
New(f framework.Framework) FrameworkExtender
108+
}
109+
110+
type SchedulingPhaseHook interface {
111+
Name() string
112+
}
113+
114+
type PreFilterPhaseHook interface {
115+
SchedulingPhaseHook
116+
PreFilterHook(handle ExtendedHandle, state *framework.CycleState, pod *corev1.Pod) (*corev1.Pod, bool)
117+
}
118+
119+
type FilterPhaseHook interface {
120+
SchedulingPhaseHook
121+
FilterHook(handle ExtendedHandle, cycleState *framework.CycleState, pod *corev1.Pod, nodeInfo *framework.NodeInfo) (*corev1.Pod, *framework.NodeInfo, bool)
122+
}
123+
124+
type frameworkExtenderFactoryImpl struct {
125+
handle ExtendedHandle
126+
127+
// extend framework with SchedulingPhaseHook
128+
preFilterHooks []PreFilterPhaseHook
129+
filterHooks []FilterPhaseHook
130+
}
131+
132+
func NewFrameworkExtenderFactory(handle ExtendedHandle, hooks ...SchedulingPhaseHook) FrameworkExtenderFactory {
133+
i := &frameworkExtenderFactoryImpl{
134+
handle: handle,
135+
}
136+
for _, h := range hooks {
137+
// a hook may register in multiple phases
138+
preFilter, ok := h.(PreFilterPhaseHook)
139+
if ok {
140+
i.preFilterHooks = append(i.preFilterHooks, preFilter)
141+
}
142+
filter, ok := h.(FilterPhaseHook)
143+
if ok {
144+
i.filterHooks = append(i.filterHooks, filter)
145+
}
146+
}
147+
return i
148+
}
149+
150+
func (i *frameworkExtenderFactoryImpl) New(f framework.Framework) FrameworkExtender {
151+
return &frameworkExtenderImpl{
152+
Framework: f,
153+
handle: i.handle,
154+
preFilterHooks: i.preFilterHooks,
155+
filterHooks: i.filterHooks,
156+
}
157+
}
158+
159+
var _ framework.Framework = &frameworkExtenderImpl{}
160+
161+
type frameworkExtenderImpl struct {
162+
framework.Framework
163+
handle ExtendedHandle
164+
165+
preFilterHooks []PreFilterPhaseHook
166+
filterHooks []FilterPhaseHook
167+
}
168+
169+
func (ext *frameworkExtenderImpl) RunPreFilterPlugins(ctx context.Context, cycleState *framework.CycleState, pod *corev1.Pod) *framework.Status {
170+
for _, hook := range ext.preFilterHooks {
171+
newPod, hooked := hook.PreFilterHook(ext.handle, cycleState, pod)
172+
if hooked {
173+
klog.V(5).InfoS("RunPreFilterPlugins hooked", "meet PreFilterPhaseHook", "hook", hook.Name(), "pod", klog.KObj(pod))
174+
return ext.Framework.RunPreFilterPlugins(ctx, cycleState, newPod)
175+
}
176+
}
177+
return ext.Framework.RunPreFilterPlugins(ctx, cycleState, pod)
178+
}
179+
180+
func (ext *frameworkExtenderImpl) RunFilterPlugins(ctx context.Context, cycleState *framework.CycleState, pod *corev1.Pod, nodeInfo *framework.NodeInfo) framework.PluginToStatus {
181+
for _, hook := range ext.filterHooks {
182+
// hook can change the args (cycleState, pod, nodeInfo) for filter plugins
183+
newPod, newNodeInfo, hooked := hook.FilterHook(ext.handle, cycleState, pod, nodeInfo)
184+
if hooked {
185+
klog.V(5).InfoS("RunFilterPlugins hooked", "meet FilterPhaseHook", "hook", hook.Name(), "pod", klog.KObj(pod))
186+
return ext.Framework.RunFilterPlugins(ctx, cycleState, newPod, newNodeInfo)
187+
}
188+
}
189+
return ext.Framework.RunFilterPlugins(ctx, cycleState, pod, nodeInfo)
190+
}
191+
99192
// PluginFactoryProxy is used to proxy the call to the PluginFactory function and pass in the ExtendedHandle for the custom plugin
100193
func PluginFactoryProxy(extendHandle ExtendedHandle, factoryFn frameworkruntime.PluginFactory) frameworkruntime.PluginFactory {
101194
return func(args runtime.Object, handle framework.Handle) (framework.Plugin, error) {

0 commit comments

Comments
 (0)