Skip to content
21 changes: 20 additions & 1 deletion cmd/scheduler/app/options/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,11 @@ const (

defaultQPS = 50.0
defaultBurst = 100

// Default parameters to control the number of feasible nodes to find and score
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

s/arameters/parameters ?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed the errors. make verify and unit-test passed. Thanks.

defaultMinPercentageOfNodesToFind=5
defaultMinNodesToFind=100
defaultPercentageOfNodesToFind=100
)

// ServerOption is the main context object for the controller manager.
Expand All @@ -50,9 +55,14 @@ type ServerOption struct {
EnablePriorityClass bool
KubeAPIBurst int
KubeAPIQPS float32
// HealthzBindAddress is the IP address and port for the health check server to serve on,
// HealthzBindAddress is the IP address and port for the health check server to serve on
// defaulting to 127.0.0.1:11251
HealthzBindAddress string

// Parameters for scheduling tuning: the number of feasible nodes to find and score
MinNodesToFind int32
MinPercentageOfNodesToFind int32
PercentageOfNodesToFind int32
}

// ServerOpts server options
Expand Down Expand Up @@ -84,6 +94,15 @@ func (s *ServerOption) AddFlags(fs *pflag.FlagSet) {
fs.Float32Var(&s.KubeAPIQPS, "kube-api-qps", defaultQPS, "QPS to use while talking with kubernetes apiserver")
fs.IntVar(&s.KubeAPIBurst, "kube-api-burst", defaultBurst, "Burst to use while talking with kubernetes apiserver")
fs.StringVar(&s.HealthzBindAddress, "healthz-bind-address", defaultHealthzBindAddress, "The address to listen on for /healthz HTTP requests.")

// Minimum number of feasible nodes to find and score
fs.Int32Var(&s.MinNodesToFind,"minimum-feasible-nodes", defaultMinNodesToFind, "The minimum number of feasible nodes to find and score")

// Minimum percentage of nodes to find and score
fs.Int32Var(&s.MinPercentageOfNodesToFind,"minimum-percentage-nodes-to-find", defaultMinPercentageOfNodesToFind, "The minimum percentage of nodes to find and score")

// The percentage of nodes that would be scored in each scheduling cycle; if <= 0, an adaptive percentage will be calculated
fs.Int32Var(&s.PercentageOfNodesToFind,"percentage-nodes-to-find", defaultPercentageOfNodesToFind,"The percentage of nodes to find and score, if <=0 will be calcuated based on the cluster size")
}

// CheckOptionOrDie check lock-object-namespace when LeaderElection is enabled
Expand Down
3 changes: 3 additions & 0 deletions cmd/scheduler/app/options/options_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,9 @@ func TestAddFlags(t *testing.T) {
KubeAPIBurst: defaultBurst,
KubeAPIQPS: defaultQPS,
HealthzBindAddress: "127.0.0.1:11251",
MinNodesToFind defaultMinNodesToFind
MinPercentageOfNodesToFind defaultMinPercentageOfNodesToFind
PercentageOfNodesToFind defaultPercentageOfNodesToFind
}

if !reflect.DeepEqual(expected, s) {
Expand Down
78 changes: 64 additions & 14 deletions pkg/scheduler/util/scheduler_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,27 +22,67 @@ import (
"math/rand"
"sort"
"sync"

"sync/atomic"
"github.com/golang/glog"
"k8s.io/client-go/util/workqueue"

"k8s.io/client-go/util/workqueue"
schedulerapi "k8s.io/kubernetes/pkg/scheduler/api"
"volcano.sh/volcano/pkg/scheduler/api"
"volcano.sh/volcano/cmd/scheduler/app/options"
)

// PredicateNodes returns nodes that fit task
func PredicateNodes(task *api.TaskInfo, nodes []*api.NodeInfo, fn api.PredicateFn) ([]*api.NodeInfo, *api.FitErrors) {
var predicateNodes []*api.NodeInfo

var workerLock sync.Mutex
const baselinePercentageOfNodesToFind = 50
var lastProcessedNodeIndex int

// CalculateNumOfFeasibleNodesToFind returns the number of feasible nodes that,
// once found, lets the scheduler stop searching for more feasible nodes.
//
// numAllNodes is the total number of candidate nodes. The result is driven by
// the fields of options.ServerOpts:
//   - numAllNodes is returned unchanged when it does not exceed MinNodesToFind
//     or when PercentageOfNodesToFind is 100 or more;
//   - a positive PercentageOfNodesToFind below 100 is applied as-is;
//   - a PercentageOfNodesToFind of zero or less selects an adaptive percentage:
//     baselinePercentageOfNodesToFind minus one point per 125 nodes, but never
//     below MinPercentageOfNodesToFind.
//
// Past the early return, the result is never lower than MinNodesToFind.
func CalculateNumOfFeasibleNodesToFind(numAllNodes int32) (numNodes int32) {
	opts := options.ServerOpts
	if numAllNodes <= opts.MinNodesToFind || opts.PercentageOfNodesToFind >= 100 {
		return numAllNodes
	}

	percentage := opts.PercentageOfNodesToFind
	if percentage <= 0 {
		// No explicit percentage configured: shrink the share as the cluster grows.
		percentage = baselinePercentageOfNodesToFind - numAllNodes/125
		if percentage < opts.MinPercentageOfNodesToFind {
			percentage = opts.MinPercentageOfNodesToFind
		}
	}

	// Multiply in 64 bits: numAllNodes*percentage can exceed the int32 range
	// before the division by 100 brings it back down, and Go wraps silently.
	numNodes = int32(int64(numAllNodes) * int64(percentage) / 100)
	if numNodes < opts.MinNodesToFind {
		return opts.MinNodesToFind
	}
	return numNodes
}

// PredicateNodes returns the specified number of nodes that fit a task
func PredicateNodes(task *api.TaskInfo, nodes []*api.NodeInfo, fn api.PredicateFn) ([]*api.NodeInfo, *api.FitErrors) {
//var workerLock sync.Mutex

var errorLock sync.Mutex
fe := api.NewFitErrors()

allNodes := len(nodes)
numNodesToFind := CalculateNumOfFeasibleNodesToFind(int32(allNodes))

//allocate enough space to avoid growing it
predicateNodes := make([]*api.NodeInfo, numNodesToFind)

numFoundNodes := int32(0)
processedNodes := int32(0)

//create a context with cancellation
ctx, cancel := context.WithCancel(context.Background())

checkNode := func(index int) {
node := nodes[index]
// Check the nodes starting from where the previous scheduling cycle left off,
// to make sure all nodes have the same chance of being examined across pods.
node := nodes[(lastProcessedNodeIndex+index) % allNodes]
atomic.AddInt32(&processedNodes,1)
glog.V(3).Infof("Considering Task <%v/%v> on node <%v>: <%v> vs. <%v>",
task.Namespace, task.Name, node.Name, task.Resreq, node.Idle)
task.Namespace, task.Name, node.Name, task.Resreq, node.Idle)

// TODO (k82cn): Enable eCache for performance improvement.
if err := fn(task, node); err != nil {
Expand All @@ -54,12 +94,22 @@ func PredicateNodes(task *api.TaskInfo, nodes []*api.NodeInfo, fn api.PredicateF
return
}

workerLock.Lock()
predicateNodes = append(predicateNodes, node)
workerLock.Unlock()
}
//check if the number of found nodes is more than the numNodesTofind
length := atomic.AddInt32(&numFoundNodes, 1)
if length > numNodesToFind {
cancel()
atomic.AddInt32(&numFoundNodes, -1)
} else {
predicateNodes[length-1] = node
}
}

//workqueue.ParallelizeUntil(context.TODO(), 16, len(nodes), checkNode)
workqueue.ParallelizeUntil(ctx, 16, allNodes, checkNode)

workqueue.ParallelizeUntil(context.TODO(), 16, len(nodes), checkNode)
//processedNodes := int(numFoundNodes) + len(filteredNodesStatuses) + len(failedPredicateMap)
lastProcessedNodeIndex = (lastProcessedNodeIndex + int(processedNodes)) % allNodes
predicateNodes = predicateNodes[:numFoundNodes]
return predicateNodes, fe
}

Expand Down