diff --git a/cmd/scheduler/app/options/options.go b/cmd/scheduler/app/options/options.go index 2092e0c52e1..bc069bef73e 100644 --- a/cmd/scheduler/app/options/options.go +++ b/cmd/scheduler/app/options/options.go @@ -33,6 +33,11 @@ const ( defaultQPS = 50.0 defaultBurst = 100 + + // Default parameters to control the number of feasible nodes to find and score + defaultMinPercentageOfNodesToFind = 5 + defaultMinNodesToFind = 100 + defaultPercentageOfNodesToFind = 100 ) // ServerOption is the main context object for the controller manager. @@ -50,9 +55,14 @@ type ServerOption struct { EnablePriorityClass bool KubeAPIBurst int KubeAPIQPS float32 - // HealthzBindAddress is the IP address and port for the health check server to serve on, + // HealthzBindAddress is the IP address and port for the health check server to serve on // defaulting to 127.0.0.1:11251 HealthzBindAddress string + + // Parameters for scheduling tuning: the number of feasible nodes to find and score + MinNodesToFind int32 + MinPercentageOfNodesToFind int32 + PercentageOfNodesToFind int32 } // ServerOpts server options @@ -84,6 +94,15 @@ func (s *ServerOption) AddFlags(fs *pflag.FlagSet) { fs.Float32Var(&s.KubeAPIQPS, "kube-api-qps", defaultQPS, "QPS to use while talking with kubernetes apiserver") fs.IntVar(&s.KubeAPIBurst, "kube-api-burst", defaultBurst, "Burst to use while talking with kubernetes apiserver") fs.StringVar(&s.HealthzBindAddress, "healthz-bind-address", defaultHealthzBindAddress, "The address to listen on for /healthz HTTP requests.") + + // Minimum number of feasible nodes to find and score + fs.Int32Var(&s.MinNodesToFind, "minimum-feasible-nodes", defaultMinNodesToFind, "The minimum number of feasible nodes to find and score") + + // Minimum percentage of nodes to find and score + fs.Int32Var(&s.MinPercentageOfNodesToFind, "minimum-percentage-nodes-to-find", defaultMinPercentageOfNodesToFind, "The minimum percentage of nodes to find and score") + + // The percentage of nodes that 
would be scored in each scheduling cycle; if <= 0, an adaptive percentage will be calculated + fs.Int32Var(&s.PercentageOfNodesToFind, "percentage-nodes-to-find", defaultPercentageOfNodesToFind, "The percentage of nodes to find and score, if <=0 will be calculated based on the cluster size") } // CheckOptionOrDie check lock-object-namespace when LeaderElection is enabled diff --git a/cmd/scheduler/app/options/options_test.go b/cmd/scheduler/app/options/options_test.go index 490a4a64d75..505898b8dda 100644 --- a/cmd/scheduler/app/options/options_test.go +++ b/cmd/scheduler/app/options/options_test.go @@ -37,13 +37,16 @@ func TestAddFlags(t *testing.T) { // This is a snapshot of expected options parsed by args. expected := &ServerOption{ - SchedulerName: defaultSchedulerName, - SchedulePeriod: 5 * time.Minute, - DefaultQueue: defaultQueue, - ListenAddress: defaultListenAddress, - KubeAPIBurst: defaultBurst, - KubeAPIQPS: defaultQPS, - HealthzBindAddress: "127.0.0.1:11251", + SchedulerName: defaultSchedulerName, + SchedulePeriod: 5 * time.Minute, + DefaultQueue: defaultQueue, + ListenAddress: defaultListenAddress, + KubeAPIBurst: defaultBurst, + KubeAPIQPS: defaultQPS, + HealthzBindAddress: "127.0.0.1:11251", + MinNodesToFind: defaultMinNodesToFind, + MinPercentageOfNodesToFind: defaultMinPercentageOfNodesToFind, + PercentageOfNodesToFind: defaultPercentageOfNodesToFind, } if !reflect.DeepEqual(expected, s) { diff --git a/pkg/scheduler/actions/allocate/allocate_test.go b/pkg/scheduler/actions/allocate/allocate_test.go index 8b64cfc018b..cc1ed9452e0 100644 --- a/pkg/scheduler/actions/allocate/allocate_test.go +++ b/pkg/scheduler/actions/allocate/allocate_test.go @@ -25,6 +25,7 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/tools/record" + "volcano.sh/volcano/cmd/scheduler/app/options" kbv1 "volcano.sh/volcano/pkg/apis/scheduling/v1alpha2" "volcano.sh/volcano/pkg/scheduler/api" "volcano.sh/volcano/pkg/scheduler/cache" @@ -38,6 +39,13 @@ import 
( func TestAllocate(t *testing.T) { framework.RegisterPluginBuilder("drf", drf.New) framework.RegisterPluginBuilder("proportion", proportion.New) + + options.ServerOpts = &options.ServerOption{ + MinNodesToFind: 100, + MinPercentageOfNodesToFind: 5, + PercentageOfNodesToFind: 100, + } + defer framework.CleanupPluginBuilders() tests := []struct { diff --git a/pkg/scheduler/actions/preempt/preempt_test.go b/pkg/scheduler/actions/preempt/preempt_test.go index 87970fbb1c9..fe2a2a80421 100644 --- a/pkg/scheduler/actions/preempt/preempt_test.go +++ b/pkg/scheduler/actions/preempt/preempt_test.go @@ -24,6 +24,7 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/tools/record" + "volcano.sh/volcano/cmd/scheduler/app/options" kbv1 "volcano.sh/volcano/pkg/apis/scheduling/v1alpha2" "volcano.sh/volcano/pkg/scheduler/api" "volcano.sh/volcano/pkg/scheduler/cache" @@ -37,6 +38,11 @@ import ( func TestPreempt(t *testing.T) { framework.RegisterPluginBuilder("conformance", conformance.New) framework.RegisterPluginBuilder("gang", gang.New) + options.ServerOpts = &options.ServerOption{ + MinNodesToFind: 100, + MinPercentageOfNodesToFind: 5, + PercentageOfNodesToFind: 100, + } defer framework.CleanupPluginBuilders() tests := []struct { diff --git a/pkg/scheduler/util/scheduler_helper.go b/pkg/scheduler/util/scheduler_helper.go index 7c7aba39936..be3086aa4d9 100644 --- a/pkg/scheduler/util/scheduler_helper.go +++ b/pkg/scheduler/util/scheduler_helper.go @@ -18,29 +18,69 @@ package util import ( "context" + "github.com/golang/glog" + "k8s.io/client-go/util/workqueue" + schedulerapi "k8s.io/kubernetes/pkg/scheduler/api" "math" "math/rand" "sort" "sync" - - "github.com/golang/glog" - "k8s.io/client-go/util/workqueue" - - schedulerapi "k8s.io/kubernetes/pkg/scheduler/api" + "sync/atomic" + "volcano.sh/volcano/cmd/scheduler/app/options" "volcano.sh/volcano/pkg/scheduler/api" ) -// PredicateNodes returns nodes that fit task -func 
PredicateNodes(task *api.TaskInfo, nodes []*api.NodeInfo, fn api.PredicateFn) ([]*api.NodeInfo, *api.FitErrors) { - var predicateNodes []*api.NodeInfo +const baselinePercentageOfNodesToFind = 50 - var workerLock sync.Mutex +var lastProcessedNodeIndex int + +// CalculateNumOfFeasibleNodesToFind returns the number of feasible nodes that once found, +// the scheduler stops its search for more feasible nodes. +func CalculateNumOfFeasibleNodesToFind(numAllNodes int32) (numNodes int32) { + opts := options.ServerOpts + if numAllNodes <= opts.MinNodesToFind || opts.PercentageOfNodesToFind >= 100 { + return numAllNodes + } + + adaptivePercentage := opts.PercentageOfNodesToFind + if adaptivePercentage <= 0 { + adaptivePercentage = baselinePercentageOfNodesToFind - numAllNodes/125 + if adaptivePercentage < opts.MinPercentageOfNodesToFind { + adaptivePercentage = opts.MinPercentageOfNodesToFind + } + } + + numNodes = numAllNodes * adaptivePercentage / 100 + if numNodes < opts.MinNodesToFind { + numNodes = opts.MinNodesToFind + } + return numNodes +} + +// PredicateNodes returns the specified number of nodes that fit a task +func PredicateNodes(task *api.TaskInfo, nodes []*api.NodeInfo, fn api.PredicateFn) ([]*api.NodeInfo, *api.FitErrors) { + //var workerLock sync.Mutex var errorLock sync.Mutex fe := api.NewFitErrors() + allNodes := len(nodes) + numNodesToFind := CalculateNumOfFeasibleNodesToFind(int32(allNodes)) + + //allocate enough space to avoid growing it + predicateNodes := make([]*api.NodeInfo, numNodesToFind) + + numFoundNodes := int32(0) + processedNodes := int32(0) + + //create a context with cancellation + ctx, cancel := context.WithCancel(context.Background()) + checkNode := func(index int) { - node := nodes[index] + // Check the nodes starting from where it left off in the previous scheduling cycle, + // to make sure all nodes have the same chance of being examined across pods. 
+ node := nodes[(lastProcessedNodeIndex+index)%allNodes] + atomic.AddInt32(&processedNodes, 1) glog.V(3).Infof("Considering Task <%v/%v> on node <%v>: <%v> vs. <%v>", task.Namespace, task.Name, node.Name, task.Resreq, node.Idle) @@ -54,12 +94,22 @@ func PredicateNodes(task *api.TaskInfo, nodes []*api.NodeInfo, fn api.PredicateF return } - workerLock.Lock() - predicateNodes = append(predicateNodes, node) - workerLock.Unlock() + //check if the number of found nodes is more than the numNodesToFind + length := atomic.AddInt32(&numFoundNodes, 1) + if length > numNodesToFind { + cancel() + atomic.AddInt32(&numFoundNodes, -1) + } else { + predicateNodes[length-1] = node + } } - workqueue.ParallelizeUntil(context.TODO(), 16, len(nodes), checkNode) + //workqueue.ParallelizeUntil(context.TODO(), 16, len(nodes), checkNode) + workqueue.ParallelizeUntil(ctx, 16, allNodes, checkNode) + + //processedNodes := int(numFoundNodes) + len(filteredNodesStatuses) + len(failedPredicateMap) + lastProcessedNodeIndex = (lastProcessedNodeIndex + int(processedNodes)) % allNodes + predicateNodes = predicateNodes[:numFoundNodes] return predicateNodes, fe }