Skip to content

Commit 7637f8c

Browse files
localai-bot and mudler authored
feat(distributed): declarative per-model scheduling via env/args (#10308)
* feat(distributed): add SpreadAll column and authoritative scheduling seeding
  Signed-off-by: Ettore Di Giacinto <mudler@localai.io>
* feat(distributed): parse declarative model scheduling config (env/file)
  Signed-off-by: Ettore Di Giacinto <mudler@localai.io>
* feat(distributed): reconcile spread_all to one replica per matching node
  Signed-off-by: Ettore Di Giacinto <mudler@localai.io>
* feat(distributed): wire LOCALAI_MODEL_SCHEDULING env/args and startup seeding
  Signed-off-by: Ettore Di Giacinto <mudler@localai.io>
* feat(distributed): expose spread_all on the scheduling API endpoint
  Signed-off-by: Ettore Di Giacinto <mudler@localai.io>
* feat(distributed): add spread-to-all-nodes mode to the scheduling UI
  Signed-off-by: Ettore Di Giacinto <mudler@localai.io>
* docs(distributed): document LOCALAI_MODEL_SCHEDULING env/args
  Signed-off-by: Ettore Di Giacinto <mudler@localai.io>
* docs(distributed): clarify replica modes and all-nodes spread in scheduling config
  Signed-off-by: Ettore Di Giacinto <mudler@localai.io>
---------
Signed-off-by: Ettore Di Giacinto <mudler@localai.io>
Co-authored-by: Ettore Di Giacinto <mudler@localai.io>
1 parent f0e001b commit 7637f8c

13 files changed

Lines changed: 577 additions & 11 deletions

File tree

core/application/distributed.go

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -161,6 +161,21 @@ func initDistributed(cfg *config.ApplicationConfig, authDB *gorm.DB, configLoade
161161
}
162162
xlog.Info("Node registry initialized")
163163

164+
// Seed declarative per-model scheduling config (LOCALAI_MODEL_SCHEDULING /
165+
// LOCALAI_MODEL_SCHEDULING_CONFIG). Authoritative: overwrites matching models
166+
// on every boot. Runs before the reconciler starts so the first tick already
167+
// sees the desired state. Models not listed are left untouched.
168+
if cfg.Distributed.ModelSchedulingJSON != "" || cfg.Distributed.ModelSchedulingConfigPath != "" {
169+
schedConfigs, err := nodes.ParseSchedulingSeed(cfg.Distributed.ModelSchedulingJSON, cfg.Distributed.ModelSchedulingConfigPath)
170+
if err != nil {
171+
return nil, fmt.Errorf("parsing declarative model scheduling config: %w", err)
172+
}
173+
if err := registry.SeedModelScheduling(context.Background(), schedConfigs); err != nil {
174+
return nil, fmt.Errorf("seeding declarative model scheduling config: %w", err)
175+
}
176+
xlog.Info("Applied declarative model scheduling config", "models", len(schedConfigs))
177+
}
178+
164179
// Collect SmartRouter option values; the router itself is created after all
165180
// dependencies (including FileStager and Unloader) are ready.
166181
var routerAuthToken string

core/cli/run.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -172,6 +172,8 @@ type RunCMD struct {
172172
NatsTLSCert string `env:"LOCALAI_NATS_TLS_CERT" type:"existingfile" help:"Client certificate for NATS mTLS" group:"distributed"`
173173
NatsTLSKey string `env:"LOCALAI_NATS_TLS_KEY" type:"existingfile" help:"Client private key for NATS mTLS" group:"distributed"`
174174
ExposeNodeHeader bool `env:"LOCALAI_EXPOSE_NODE_HEADER" default:"false" help:"Set the X-LocalAI-Node response header on inference responses (OpenAI chat/completions/embeddings, Anthropic /v1/messages, Ollama /api/chat,/api/generate,/api/embed) with the ID of the worker that served the request. Disabled by default: the node ID reveals internal topology and should not be exposed on a public endpoint. Best-effort: under heavy concurrency the header may reflect a recent routing decision rather than this exact request's." group:"distributed"`
175+
ModelScheduling string `env:"LOCALAI_MODEL_SCHEDULING" help:"Declarative per-model scheduling config applied at startup (inline JSON list of {model_name,node_selector,min_replicas,max_replicas,replicas:\"all\"}). Authoritative: overwrites matching models on every boot. Distributed mode only." group:"distributed"`
176+
ModelSchedulingConfig string `env:"LOCALAI_MODEL_SCHEDULING_CONFIG" help:"Path to a YAML file with the same per-model scheduling list as LOCALAI_MODEL_SCHEDULING. Distributed mode only." group:"distributed"`
175177

176178
Version bool
177179

@@ -347,6 +349,15 @@ func (r *RunCMD) Run(ctx *cliContext.Context) error {
347349
if r.ExposeNodeHeader {
348350
opts = append(opts, config.WithExposeNodeHeader(true))
349351
}
352+
if r.ModelScheduling != "" {
353+
opts = append(opts, config.WithModelSchedulingJSON(r.ModelScheduling))
354+
}
355+
if r.ModelSchedulingConfig != "" {
356+
opts = append(opts, config.WithModelSchedulingConfigPath(r.ModelSchedulingConfig))
357+
}
358+
if !r.Distributed && (r.ModelScheduling != "" || r.ModelSchedulingConfig != "") {
359+
xlog.Warn("LOCALAI_MODEL_SCHEDULING / LOCALAI_MODEL_SCHEDULING_CONFIG is set but distributed mode is disabled (LOCALAI_DISTRIBUTED=false) - ignoring")
360+
}
350361

351362
if r.DisableMetricsEndpoint {
352363
opts = append(opts, config.DisableMetricsEndpoint)

core/config/distributed_config.go

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,12 @@ type DistributedConfig struct {
8484
// drives the background eviction cadence (eviction runs every TTL/2). Zero
8585
// means use the prefixcache package default (5m).
8686
PrefixCacheTTL time.Duration
87+
// ModelSchedulingJSON is an inline JSON list of per-model scheduling configs
88+
// applied authoritatively at startup (LOCALAI_MODEL_SCHEDULING).
89+
ModelSchedulingJSON string
90+
// ModelSchedulingConfigPath is a path to a YAML file with the same list
91+
// (LOCALAI_MODEL_SCHEDULING_CONFIG).
92+
ModelSchedulingConfigPath string
8793
}
8894

8995
// Validate checks that the distributed configuration is internally consistent.
@@ -290,6 +296,21 @@ func WithPrefixCacheTTL(d time.Duration) AppOption {
290296
}
291297
}
292298

299+
// WithModelSchedulingJSON sets the inline-JSON declarative scheduling config.
300+
func WithModelSchedulingJSON(s string) AppOption {
301+
return func(o *ApplicationConfig) {
302+
o.Distributed.ModelSchedulingJSON = s
303+
}
304+
}
305+
306+
// WithModelSchedulingConfigPath sets the path to a YAML declarative scheduling
307+
// config file.
308+
func WithModelSchedulingConfigPath(path string) AppOption {
309+
return func(o *ApplicationConfig) {
310+
o.Distributed.ModelSchedulingConfigPath = path
311+
}
312+
}
313+
293314
// Flag names for distributed timeout / interval configuration. These are
294315
// the kebab-case identifiers kong derives from the matching RunCMD struct
295316
// fields; they appear in Validate error messages and any other operator-

core/http/endpoints/localai/nodes.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -937,12 +937,13 @@ func GetSchedulingEndpoint(registry *nodes.NodeRegistry) echo.HandlerFunc {
937937
// distinguishable from an explicit zero. On update, an omitted prefix-cache
938938
// field preserves the model's previously-configured value instead of resetting
939939
// it (see SetSchedulingEndpoint's PATCH-style merge). ModelName, NodeSelector,
940-
// MinReplicas and MaxReplicas keep their full-replace PUT semantics.
940+
// MinReplicas, MaxReplicas and SpreadAll keep their full-replace PUT semantics.
941941
type SetSchedulingRequest struct {
942942
ModelName string `json:"model_name"`
943943
NodeSelector map[string]string `json:"node_selector,omitempty"`
944944
MinReplicas int `json:"min_replicas"`
945945
MaxReplicas int `json:"max_replicas"`
946+
SpreadAll bool `json:"spread_all,omitempty"`
946947
RoutePolicy *string `json:"route_policy,omitempty"`
947948
BalanceAbsThreshold *int `json:"balance_abs_threshold,omitempty"`
948949
BalanceRelThreshold *float64 `json:"balance_rel_threshold,omitempty"`
@@ -959,6 +960,9 @@ func validateSchedulingRequest(req SetSchedulingRequest, routePolicy string, abs
959960
if req.ModelName == "" {
960961
return errors.New("model_name is required")
961962
}
963+
if req.SpreadAll && (req.MinReplicas != 0 || req.MaxReplicas != 0) {
964+
return errors.New("spread_all and min_replicas/max_replicas are mutually exclusive")
965+
}
962966
if req.MinReplicas < 0 {
963967
return errors.New("min_replicas must be >= 0")
964968
}
@@ -1045,6 +1049,7 @@ func SetSchedulingEndpoint(registry *nodes.NodeRegistry) echo.HandlerFunc {
10451049
NodeSelector: selectorJSON,
10461050
MinReplicas: req.MinReplicas,
10471051
MaxReplicas: req.MaxReplicas,
1052+
SpreadAll: req.SpreadAll,
10481053
RoutePolicy: routePolicy,
10491054
BalanceAbsThreshold: absThr,
10501055
BalanceRelThreshold: relThr,
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
package localai
2+
3+
import (
4+
. "github.com/onsi/ginkgo/v2"
5+
. "github.com/onsi/gomega"
6+
)
7+
8+
var _ = Describe("validateSchedulingRequest spread_all", func() {
9+
It("rejects spread_all combined with min_replicas", func() {
10+
err := validateSchedulingRequest(SetSchedulingRequest{
11+
ModelName: "m", SpreadAll: true, MinReplicas: 2,
12+
}, "", 0, 0, 0)
13+
Expect(err).To(MatchError(ContainSubstring("mutually exclusive")))
14+
})
15+
16+
It("accepts spread_all alone", func() {
17+
err := validateSchedulingRequest(SetSchedulingRequest{
18+
ModelName: "m", SpreadAll: true,
19+
}, "", 0, 0, 0)
20+
Expect(err).ToNot(HaveOccurred())
21+
})
22+
})

core/http/react-ui/src/pages/Nodes.jsx

Lines changed: 25 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -506,15 +506,17 @@ function SchedulingForm({ onSave, onCancel }) {
506506
const isValid = () => {
507507
if (!modelName) return false
508508
if (mode === 'placement') return hasSelector
509+
if (mode === 'spread') return true
509510
return minReplicas > 0 || maxReplicas > 0
510511
}
511512

512513
const handleSubmit = () => {
513514
onSave({
514515
model_name: modelName,
515516
node_selector: hasSelector ? selector : undefined,
516-
min_replicas: mode === 'placement' ? 0 : minReplicas,
517-
max_replicas: mode === 'placement' ? 0 : maxReplicas,
517+
min_replicas: mode === 'autoscaling' ? minReplicas : 0,
518+
max_replicas: mode === 'autoscaling' ? maxReplicas : 0,
519+
spread_all: mode === 'spread',
518520
route_policy: routePolicy,
519521
balance_abs_threshold: balanceAbsThreshold,
520522
balance_rel_threshold: balanceRelThreshold,
@@ -542,10 +544,19 @@ function SchedulingForm({ onSave, onCancel }) {
542544
>
543545
<i className="fas fa-arrows-up-down" aria-hidden="true" /> Auto-scale
544546
</button>
547+
<button
548+
type="button" role="radio" aria-checked={mode === 'spread'}
549+
className={`segmented__item${mode === 'spread' ? ' is-active' : ''}`}
550+
onClick={() => setMode('spread')}
551+
>
552+
<i className="fas fa-network-wired" aria-hidden="true" /> Spread to all
553+
</button>
545554
</div>
546555
<p style={{ fontSize: '0.8125rem', color: 'var(--color-text-muted)', margin: '0 0 var(--spacing-lg) 0' }}>
547556
{mode === 'placement'
548557
? 'Restrict this model to specific nodes. Loaded on demand, evictable when idle.'
558+
: mode === 'spread'
559+
? 'Run one replica on every node matching the selector (all healthy nodes when empty). Tracks nodes joining and leaving.'
549560
: 'Maintain a target replica count across the cluster. Min \u2265 1 protects from eviction.'}
550561
</p>
551562

@@ -1563,10 +1574,11 @@ export default function Nodes() {
15631574
</tr></thead>
15641575
<tbody>
15651576
{schedulingConfigs.map(cfg => {
1566-
const isAutoScaling = cfg.min_replicas > 0 || cfg.max_replicas > 0
1577+
const isSpread = !!cfg.spread_all
1578+
const isAutoScaling = !isSpread && (cfg.min_replicas > 0 || cfg.max_replicas > 0)
15671579
const hasSelector = !!cfg.node_selector
1568-
const modeLabel = isAutoScaling ? 'Auto-scaling' : hasSelector ? 'Placement' : 'Inactive'
1569-
const modeColor = isAutoScaling ? 'var(--color-success)' : hasSelector ? 'var(--color-primary)' : 'var(--color-text-muted)'
1580+
const modeLabel = isSpread ? 'Spread' : isAutoScaling ? 'Auto-scaling' : hasSelector ? 'Placement' : 'Inactive'
1581+
const modeColor = isSpread ? 'var(--color-warning)' : isAutoScaling ? 'var(--color-success)' : hasSelector ? 'var(--color-primary)' : 'var(--color-text-muted)'
15701582
// Cooldown: reconciler tripped the circuit breaker because cluster
15711583
// capacity is exhausted. Surface so the operator sees it instead
15721584
// of the model silently failing to scale.
@@ -1597,10 +1609,16 @@ export default function Nodes() {
15971609
})() : <span style={{ color: 'var(--color-text-muted)', fontSize: '0.8125rem' }}>Any node</span>}
15981610
</td>
15991611
<td style={{ fontFamily: 'var(--font-mono)' }}>
1600-
{isAutoScaling ? cfg.min_replicas : '-'}
1612+
{isSpread
1613+
? <span style={{
1614+
display: 'inline-block', fontSize: '0.75rem', padding: '2px 8px', borderRadius: "var(--radius-sm)",
1615+
background: 'var(--color-bg-tertiary)', border: '1px solid var(--color-warning)',
1616+
color: 'var(--color-warning)', fontWeight: 600, fontFamily: 'var(--font-sans)',
1617+
}}>Spread: all matching nodes</span>
1618+
: isAutoScaling ? cfg.min_replicas : '-'}
16011619
</td>
16021620
<td style={{ fontFamily: 'var(--font-mono)' }}>
1603-
{isAutoScaling ? (cfg.max_replicas || 'no limit') : '-'}
1621+
{isSpread ? '-' : isAutoScaling ? (cfg.max_replicas || 'no limit') : '-'}
16041622
</td>
16051623
<td style={{ fontSize: '0.8125rem' }}>
16061624
{cfg.route_policy || 'default'}

core/services/nodes/reconciler.go

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -399,6 +399,28 @@ func (rc *ReplicaReconciler) candidateNodeIDsForSelector(ctx context.Context, cf
399399
}
400400

401401
func (rc *ReplicaReconciler) reconcileModel(ctx context.Context, cfg ModelSchedulingConfig) {
402+
// spread_all: derive a dynamic replica target equal to the number of nodes
403+
// currently matching the selector (all healthy backend nodes when the
404+
// selector is empty). Feeding it through Min==Max==target reuses every
405+
// existing path: the floor scales up toward target (capped at capacity),
406+
// Max==target stops busy-burst/pressure overshooting, and idle scale-down
407+
// trims above target. The target re-tracks node join/leave each tick. cfg is
408+
// a by-value copy, so mutating it here is local to this tick.
409+
if cfg.SpreadAll {
410+
matched, err := rc.registry.FindNodesBySelector(ctx, parseSelector(cfg.NodeSelector))
411+
if err != nil {
412+
xlog.Warn("Reconciler: spread_all failed to resolve matching nodes", "model", cfg.ModelName, "error", err)
413+
return
414+
}
415+
if len(matched) == 0 {
416+
xlog.Info("Reconciler: spread_all has no matching nodes; nothing to schedule",
417+
"model", cfg.ModelName, "selector", cfg.NodeSelector)
418+
return
419+
}
420+
cfg.MinReplicas = len(matched)
421+
cfg.MaxReplicas = len(matched)
422+
}
423+
402424
// Cooldown gate: if we previously decided this config is unsatisfiable,
403425
// don't even bother checking until the cooldown expires. ClearAllUnsatisfiable
404426
// (fired by node lifecycle events) bypasses this by zeroing the column.

core/services/nodes/reconciler_test.go

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,13 @@ func (f *fakeScheduler) ScheduleAndLoadModel(_ context.Context, modelName string
3434
return f.scheduleNode, f.scheduleErr
3535
}
3636

37+
func mustGetSched(r *NodeRegistry, model string) ModelSchedulingConfig {
38+
cfg, err := r.GetModelScheduling(context.Background(), model)
39+
Expect(err).ToNot(HaveOccurred())
40+
Expect(cfg).ToNot(BeNil())
41+
return *cfg
42+
}
43+
3744
var _ = Describe("ReplicaReconciler", func() {
3845
var (
3946
db *gorm.DB
@@ -78,6 +85,45 @@ var _ = Describe("ReplicaReconciler", func() {
7885
Expect(registry.SetModelScheduling(context.Background(), cfg)).To(Succeed())
7986
}
8087

88+
Context("spread_all mode", func() {
89+
It("targets one replica per matching node (empty selector = all nodes)", func() {
90+
n1 := registerNode("s1", "10.1.0.1:50051")
91+
registerNode("s2", "10.1.0.2:50051")
92+
// spread config, no selector -> all healthy backend nodes (2)
93+
Expect(registry.SetModelScheduling(context.Background(), &ModelSchedulingConfig{
94+
ModelName: "spread-model", SpreadAll: true,
95+
})).To(Succeed())
96+
97+
scheduler := &fakeScheduler{scheduleNode: n1}
98+
reconciler := NewReplicaReconciler(ReplicaReconcilerOptions{
99+
Registry: registry,
100+
Scheduler: scheduler,
101+
})
102+
103+
reconciler.reconcileModel(context.Background(), mustGetSched(registry, "spread-model"))
104+
105+
// With current==0 and a target of 2, the MinReplicas floor path
106+
// schedules up to cluster capacity (2 nodes).
107+
Expect(len(scheduler.scheduleCalls)).To(Equal(2))
108+
})
109+
110+
It("is a no-op when no nodes match", func() {
111+
Expect(registry.SetModelScheduling(context.Background(), &ModelSchedulingConfig{
112+
ModelName: "spread-model", SpreadAll: true,
113+
NodeSelector: `{"tier":"nope"}`,
114+
})).To(Succeed())
115+
116+
scheduler := &fakeScheduler{}
117+
reconciler := NewReplicaReconciler(ReplicaReconcilerOptions{
118+
Registry: registry,
119+
Scheduler: scheduler,
120+
})
121+
122+
reconciler.reconcileModel(context.Background(), mustGetSched(registry, "spread-model"))
123+
Expect(scheduler.scheduleCalls).To(BeEmpty())
124+
})
125+
})
126+
81127
Context("model below min_replicas", func() {
82128
It("scales up to min_replicas", func() {
83129
node := registerNode("node-1", "10.0.0.1:50051")

core/services/nodes/registry.go

Lines changed: 22 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -135,13 +135,18 @@ type NodeLabel struct {
135135
// - Both → auto-scale on matching nodes
136136
// - Neither → no-op (default behavior)
137137
//
138-
// Auto-scaling is enabled when MinReplicas > 0 or MaxReplicas > 0.
138+
// Auto-scaling is enabled when MinReplicas > 0, MaxReplicas > 0, or SpreadAll is set.
139139
type ModelSchedulingConfig struct {
140140
ID string `gorm:"primaryKey;size:36" json:"id"`
141141
ModelName string `gorm:"uniqueIndex;size:255" json:"model_name"`
142142
NodeSelector string `gorm:"type:text" json:"node_selector,omitempty"` // JSON {"key":"value",...}
143143
MinReplicas int `gorm:"default:0" json:"min_replicas"`
144144
MaxReplicas int `gorm:"default:0" json:"max_replicas"`
145+
// SpreadAll requests one replica on every node matching NodeSelector
146+
// (every healthy backend node when the selector is empty), tracked as
147+
// nodes join and leave. Mutually exclusive with MinReplicas/MaxReplicas.
148+
// The reconciler turns this into a dynamic Min==Max target each tick.
149+
SpreadAll bool `gorm:"column:spread_all;default:false" json:"spread_all,omitempty"`
145150
// Prefix-cache-aware routing (epic #10063). RoutePolicy "" means inherit
146151
// the cluster-wide default. Thresholds are per-model overrides; 0 means
147152
// inherit the global default.
@@ -1392,14 +1397,28 @@ func (r *NodeRegistry) SetModelScheduling(ctx context.Context, config *ModelSche
13921397
Clauses(clause.OnConflict{
13931398
Columns: []clause.Column{{Name: "model_name"}},
13941399
DoUpdates: clause.AssignmentColumns([]string{
1395-
"node_selector", "min_replicas", "max_replicas",
1400+
"node_selector", "min_replicas", "max_replicas", "spread_all",
13961401
"route_policy", "balance_abs_threshold", "balance_rel_threshold", "min_prefix_match",
13971402
"updated_at",
13981403
}),
13991404
}).
14001405
Create(config).Error
14011406
}
14021407

1408+
// SeedModelScheduling authoritatively applies a batch of scheduling configs at
1409+
// startup. Each config is upserted (full-replace on model_name), overwriting any
1410+
// prior row for that model. Models not present in configs are left untouched.
1411+
func (r *NodeRegistry) SeedModelScheduling(ctx context.Context, configs []ModelSchedulingConfig) error {
1412+
for i := range configs {
1413+
if err := r.SetModelScheduling(ctx, &configs[i]); err != nil {
1414+
return fmt.Errorf("seeding scheduling config for model %q: %w", configs[i].ModelName, err)
1415+
}
1416+
xlog.Info("Seeded model scheduling config", "model", configs[i].ModelName,
1417+
"spread_all", configs[i].SpreadAll, "min", configs[i].MinReplicas, "max", configs[i].MaxReplicas)
1418+
}
1419+
return nil
1420+
}
1421+
14031422
// GetModelScheduling returns the scheduling config for a model, or nil if none exists.
14041423
func (r *NodeRegistry) GetModelScheduling(ctx context.Context, modelName string) (*ModelSchedulingConfig, error) {
14051424
var config ModelSchedulingConfig
@@ -1423,7 +1442,7 @@ func (r *NodeRegistry) ListModelSchedulings(ctx context.Context) ([]ModelSchedul
14231442
// ListAutoScalingConfigs returns scheduling configs where auto-scaling is enabled.
14241443
func (r *NodeRegistry) ListAutoScalingConfigs(ctx context.Context) ([]ModelSchedulingConfig, error) {
14251444
var configs []ModelSchedulingConfig
1426-
err := r.db.WithContext(ctx).Where("min_replicas > 0 OR max_replicas > 0").Find(&configs).Error
1445+
err := r.db.WithContext(ctx).Where("min_replicas > 0 OR max_replicas > 0 OR spread_all = ?", true).Find(&configs).Error
14271446
return configs, err
14281447
}
14291448

0 commit comments

Comments
 (0)