Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -309,13 +309,20 @@ run-e2e-aio: protogen-go
@echo 'Running e2e AIO tests'
$(GOCMD) run github.com/onsi/ginkgo/v2/ginkgo --flake-attempts $(TEST_FLAKES) -v -r ./tests/e2e-aio

# Distributed architecture e2e (PostgreSQL + NATS via testcontainers).
# Includes NatsJWT specs (JWT-enabled NATS). Requires Docker.
# VLLMMultinode is excluded here; use test-e2e-vllm-multinode for that.
test-e2e-distributed: protogen-go
@echo 'Running distributed e2e tests (label Distributed, incl. NatsJWT)'
$(GOCMD) run github.com/onsi/ginkgo/v2/ginkgo --label-filter='Distributed && !VLLMMultinode' --flake-attempts $(TEST_FLAKES) -v -r ./tests/e2e/distributed

# vLLM multi-node DP smoke (CPU). Builds local-ai:tests and the
# cpu-vllm backend from the current working tree, then drives a
# head + headless follower via testcontainers-go and asserts a chat
# completion. BuildKit caches both images, so re-runs only rebuild
# what changed. The test lives under tests/e2e/distributed and is
# selected by the VLLMMultinode label so it doesn't run alongside
# the other distributed-suite tests by default.
# test-e2e-distributed.
test-e2e-vllm-multinode: docker-build-e2e extract-backend-vllm protogen-go
@echo 'Running e2e vLLM multi-node DP test'
LOCALAI_IMAGE=local-ai \
Expand Down
7 changes: 6 additions & 1 deletion core/application/distributed.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,12 @@ func initDistributed(cfg *config.ApplicationConfig, authDB *gorm.DB, configLoade
xlog.Info("Distributed instance", "id", cfg.Distributed.InstanceID)

// Connect to NATS
natsClient, err := messaging.New(cfg.Distributed.NatsURL)
natsAuth := cfg.Distributed.NatsAuthConfig()
if natsAuth.RequireAuth && (natsAuth.ServiceUserJWT == "" || natsAuth.ServiceUserSeed == "") {
return nil, fmt.Errorf("LOCALAI_NATS_REQUIRE_AUTH requires LOCALAI_NATS_SERVICE_JWT and LOCALAI_NATS_SERVICE_SEED")
}
natsOpts := cfg.Distributed.NatsMessagingOptions("", "")
natsClient, err := messaging.New(cfg.Distributed.NatsURL, natsOpts...)
if err != nil {
return nil, fmt.Errorf("connecting to NATS: %w", err)
}
Expand Down
78 changes: 68 additions & 10 deletions core/cli/agent_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,15 @@ type AgentWorkerCMD struct {
Subject string `env:"LOCALAI_AGENT_SUBJECT" default:"agent.execute" help:"NATS subject for agent execution" group:"distributed"`
Queue string `env:"LOCALAI_AGENT_QUEUE" default:"agent-workers" help:"NATS queue group name" group:"distributed"`

NatsJWT string `env:"LOCALAI_NATS_JWT" help:"NATS user JWT override (defaults to nats_jwt from registration)" group:"distributed"`
NatsUserSeed string `env:"LOCALAI_NATS_USER_SEED" help:"NATS user seed override (defaults to nats_user_seed from registration)" group:"distributed"`
NatsServiceJWT string `env:"LOCALAI_NATS_SERVICE_JWT" help:"Fallback NATS service JWT when registration does not mint agent JWT" group:"distributed"`
NatsServiceSeed string `env:"LOCALAI_NATS_SERVICE_SEED" help:"Fallback NATS service seed paired with LOCALAI_NATS_SERVICE_JWT" group:"distributed"`
NatsRequireAuth bool `env:"LOCALAI_NATS_REQUIRE_AUTH" default:"false" help:"Require NATS JWT+seed to connect" group:"distributed"`
NatsTLSCA string `env:"LOCALAI_NATS_TLS_CA" type:"existingfile" help:"PEM file for NATS server CA (private PKI)" group:"distributed"`
NatsTLSCert string `env:"LOCALAI_NATS_TLS_CERT" type:"existingfile" help:"Client certificate for NATS mTLS" group:"distributed"`
NatsTLSKey string `env:"LOCALAI_NATS_TLS_KEY" type:"existingfile" help:"Client private key for NATS mTLS" group:"distributed"`

// Timeouts
MCPCIJobTimeout string `env:"LOCALAI_MCP_CI_JOB_TIMEOUT" default:"10m" help:"Timeout for MCP CI job execution" group:"distributed"`
}
Expand Down Expand Up @@ -81,15 +90,30 @@ func (cmd *AgentWorkerCMD) Run(ctx *cliContext.Context) error {
registrationBody["token"] = cmd.RegistrationToken
}

nodeID, apiToken, err := regClient.RegisterWithRetry(context.Background(), registrationBody, 10)
// Context cancelled on shutdown — used by registration waits, heartbeat, and
// other background goroutines.
shutdownCtx, shutdownCancel := context.WithCancel(context.Background())
defer shutdownCancel()

// Acquire credentials via (re)registration. When the bus requires auth and no
// static fallback is configured, wait through admin approval until the
// frontend mints credentials rather than starting unauthenticated.
credMgr := workerregistry.NewNATSCredentialManager(
func(ctx context.Context) (*workerregistry.RegisterResponse, error) {
return regClient.RegisterFull(ctx, registrationBody)
},
cmd.NatsRequireAuth && cmd.NatsJWT == "" && cmd.NatsServiceJWT == "",
)
res, err := credMgr.Acquire(shutdownCtx)
if err != nil {
return fmt.Errorf("registration failed: %w", err)
}
nodeID := res.ID
xlog.Info("Registered with frontend", "nodeID", nodeID, "frontend", cmd.RegisterTo)

// Use provisioned API token if none was set
if cmd.APIToken == "" {
cmd.APIToken = apiToken
cmd.APIToken = res.APIToken
}

// Start heartbeat
Expand All @@ -98,14 +122,40 @@ func (cmd *AgentWorkerCMD) Run(ctx *cliContext.Context) error {
xlog.Warn("invalid heartbeat interval, using default 10s", "input", cmd.HeartbeatInterval, "error", err)
}
heartbeatInterval = cmp.Or(heartbeatInterval, 10*time.Second)
// Context cancelled on shutdown — used by heartbeat and other background goroutines
shutdownCtx, shutdownCancel := context.WithCancel(context.Background())
defer shutdownCancel()

go regClient.HeartbeatLoop(shutdownCtx, nodeID, heartbeatInterval, func() map[string]any { return map[string]any{} })

// Connect to NATS
natsClient, err := messaging.New(cmd.NatsURL)
// Resolve NATS credentials with precedence: explicit env override, then
// frontend-minted (auto-refreshed before expiry), then service fallback.
// Each static source must supply JWT and seed together.
natsTLS := messaging.TLSFiles{CA: cmd.NatsTLSCA, Cert: cmd.NatsTLSCert, Key: cmd.NatsTLSKey}
var natsOpts []messaging.Option
switch {
case cmd.NatsJWT != "" || cmd.NatsUserSeed != "":
if (cmd.NatsJWT == "") != (cmd.NatsUserSeed == "") {
return fmt.Errorf("LOCALAI_NATS_JWT and LOCALAI_NATS_USER_SEED must be set together")
}
natsOpts = append(natsOpts, messaging.WithUserJWT(cmd.NatsJWT, cmd.NatsUserSeed))
case credMgr.HasCredentials():
natsOpts = append(natsOpts, messaging.WithUserJWTProvider(credMgr.Provider()))
go func() {
if err := credMgr.RefreshLoop(shutdownCtx); err != nil {
xlog.Error("NATS credential refresh permanently failed; shutting down agent worker", "error", err)
shutdownCancel()
}
}()
case cmd.NatsServiceJWT != "" || cmd.NatsServiceSeed != "":
if (cmd.NatsServiceJWT == "") != (cmd.NatsServiceSeed == "") {
return fmt.Errorf("LOCALAI_NATS_SERVICE_JWT and LOCALAI_NATS_SERVICE_SEED must be set together")
}
natsOpts = append(natsOpts, messaging.WithUserJWT(cmd.NatsServiceJWT, cmd.NatsServiceSeed))
case cmd.NatsRequireAuth:
return fmt.Errorf("NATS JWT+seed required: enable frontend minting or set LOCALAI_NATS_* env vars")
}
if natsTLS.Enabled() {
natsOpts = append(natsOpts, messaging.WithTLS(natsTLS))
}
natsClient, err := messaging.New(cmd.NatsURL, natsOpts...)
if err != nil {
return fmt.Errorf("connecting to NATS: %w", err)
}
Expand Down Expand Up @@ -183,17 +233,25 @@ func (cmd *AgentWorkerCMD) Run(ctx *cliContext.Context) error {

xlog.Info("Agent worker ready, waiting for jobs", "subject", cmd.Subject, "queue", cmd.Queue)

// Wait for shutdown
// Wait for an OS signal or an internal fatal condition (e.g. NATS
// credentials became unrenewable), so the worker restarts and re-acquires
// rather than lingering unable to serve.
sigCh := make(chan os.Signal, 1)
signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)
<-sigCh
var runErr error
select {
case <-sigCh:
case <-shutdownCtx.Done():
runErr = fmt.Errorf("agent worker shutting down: NATS credentials unavailable")
xlog.Error("Internal shutdown requested", "error", runErr)
}

xlog.Info("Shutting down agent worker")
shutdownCancel() // stop heartbeat loop immediately
dispatcher.Stop()
mcpTools.CloseAllMCPSessions()
regClient.GracefulDeregister(nodeID)
return nil
return runErr
}

// handleMCPToolRequest handles a NATS request-reply for MCP tool execution.
Expand Down
36 changes: 36 additions & 0 deletions core/cli/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,14 @@ type RunCMD struct {
DistributedPrefixCacheTTL string `env:"LOCALAI_DISTRIBUTED_PREFIX_CACHE_TTL" help:"Idle-timeout for prefix-cache index entries; also drives the background eviction cadence (every TTL/2). Default 5m." group:"distributed"`
BackendInstallTimeout string `env:"LOCALAI_NATS_BACKEND_INSTALL_TIMEOUT" help:"NATS round-trip timeout for backend.install requests sent to worker nodes (default 15m). Increase for slow links pulling multi-GB images." group:"distributed"`
BackendUpgradeTimeout string `env:"LOCALAI_NATS_BACKEND_UPGRADE_TIMEOUT" help:"NATS round-trip timeout for backend.upgrade requests (default 15m)." group:"distributed"`
NatsAccountSeed string `env:"LOCALAI_NATS_ACCOUNT_SEED" help:"NATS account signing seed (SU...) used to mint per-node worker JWTs at registration" group:"distributed"`
NatsServiceJWT string `env:"LOCALAI_NATS_SERVICE_JWT" help:"NATS user JWT for the frontend (and agent workers) to publish control-plane messages" group:"distributed"`
NatsServiceSeed string `env:"LOCALAI_NATS_SERVICE_SEED" help:"NATS user signing seed (SU...) paired with LOCALAI_NATS_SERVICE_JWT" group:"distributed"`
NatsWorkerJWTTTL string `env:"LOCALAI_NATS_WORKER_JWT_TTL" help:"Lifetime of minted per-node NATS JWTs (e.g. 24h, default 24h)" group:"distributed"`
NatsRequireAuth bool `env:"LOCALAI_NATS_REQUIRE_AUTH" default:"false" help:"Require NATS JWT credentials (service JWT + account seed) when distributed mode is enabled" group:"distributed"`
NatsTLSCA string `env:"LOCALAI_NATS_TLS_CA" type:"existingfile" help:"PEM file for NATS server CA (private PKI); use with tls:// in --nats-url" group:"distributed"`
NatsTLSCert string `env:"LOCALAI_NATS_TLS_CERT" type:"existingfile" help:"Client certificate for NATS mTLS" group:"distributed"`
NatsTLSKey string `env:"LOCALAI_NATS_TLS_KEY" type:"existingfile" help:"Client private key for NATS mTLS" group:"distributed"`
ExposeNodeHeader bool `env:"LOCALAI_EXPOSE_NODE_HEADER" default:"false" help:"Set the X-LocalAI-Node response header on inference responses (OpenAI chat/completions/embeddings, Anthropic /v1/messages, Ollama /api/chat,/api/generate,/api/embed) with the ID of the worker that served the request. Disabled by default: the node ID reveals internal topology and should not be exposed on a public endpoint. Best-effort: under heavy concurrency the header may reflect a recent routing decision rather than this exact request's." group:"distributed"`

Version bool
Expand Down Expand Up @@ -283,6 +291,34 @@ func (r *RunCMD) Run(ctx *cliContext.Context) error {
if r.RegistrationToken != "" {
opts = append(opts, config.WithRegistrationToken(r.RegistrationToken))
}
if r.NatsAccountSeed != "" {
opts = append(opts, config.WithNatsAccountSeed(r.NatsAccountSeed))
}
if r.NatsServiceJWT != "" {
opts = append(opts, config.WithNatsServiceJWT(r.NatsServiceJWT))
}
if r.NatsServiceSeed != "" {
opts = append(opts, config.WithNatsServiceSeed(r.NatsServiceSeed))
}
if r.NatsWorkerJWTTTL != "" {
d, err := time.ParseDuration(r.NatsWorkerJWTTTL)
if err != nil {
return fmt.Errorf("invalid LOCALAI_NATS_WORKER_JWT_TTL %q: %w", r.NatsWorkerJWTTTL, err)
}
opts = append(opts, config.WithNatsWorkerJWTTTL(d))
}
if r.NatsRequireAuth {
opts = append(opts, config.EnableNatsRequireAuth)
}
if r.NatsTLSCA != "" {
opts = append(opts, config.WithNatsTLSCA(r.NatsTLSCA))
}
if r.NatsTLSCert != "" {
opts = append(opts, config.WithNatsTLSCert(r.NatsTLSCert))
}
if r.NatsTLSKey != "" {
opts = append(opts, config.WithNatsTLSKey(r.NatsTLSKey))
}
if r.AutoApproveNodes {
opts = append(opts, config.EnableAutoApproveNodes)
}
Expand Down
2 changes: 1 addition & 1 deletion core/cli/worker/worker_vllm.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ func (r *VLLMDistributed) Run(ctx *cliContext.Context) error {
FrontendURL: r.RegisterTo,
RegistrationToken: r.RegistrationToken,
}
nodeID, _, regErr := regClient.RegisterWithRetry(context.Background(), r.registrationBody(), 10)
nodeID, _, _, _, regErr := regClient.RegisterWithRetry(context.Background(), r.registrationBody(), 10)
if regErr != nil {
return fmt.Errorf("registering with frontend: %w", regErr)
}
Expand Down
50 changes: 31 additions & 19 deletions core/cli/workerregistry/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,65 +58,77 @@ func (c *RegistrationClient) setAuth(req *http.Request) {

// RegisterResponse is the JSON body returned by /api/node/register.
type RegisterResponse struct {
ID string `json:"id"`
APIToken string `json:"api_token,omitempty"`
ID string `json:"id"`
Status string `json:"status,omitempty"` // "pending" until an admin approves the node
APIToken string `json:"api_token,omitempty"`
NatsJWT string `json:"nats_jwt,omitempty"`
NatsUserSeed string `json:"nats_user_seed,omitempty"`
}

// Register sends a single registration request and returns the node ID and
// (optionally) an auto-provisioned API token.
func (c *RegistrationClient) Register(ctx context.Context, body map[string]any) (string, string, error) {
// RegisterFull sends a single registration request and returns the full
// response (node ID, approval status, and optional API token / NATS creds).
// Re-registration is idempotent: the frontend preserves the node row and mints
// a fresh NATS JWT each call, so this doubles as the credential-refresh call.
func (c *RegistrationClient) RegisterFull(ctx context.Context, body map[string]any) (*RegisterResponse, error) {
jsonBody, _ := json.Marshal(body)
url := c.baseURL() + "/api/node/register"

req, err := http.NewRequestWithContext(ctx, http.MethodPost, url, bytes.NewReader(jsonBody))
if err != nil {
return "", "", fmt.Errorf("creating request: %w", err)
return nil, fmt.Errorf("creating request: %w", err)
}
req.Header.Set("Content-Type", "application/json")
c.setAuth(req)

resp, err := c.httpClient().Do(req)
if err != nil {
return "", "", fmt.Errorf("posting to %s: %w", url, err)
return nil, fmt.Errorf("posting to %s: %w", url, err)
}
defer resp.Body.Close()

if resp.StatusCode < 200 || resp.StatusCode >= 300 {
return "", "", fmt.Errorf("registration failed with status %d", resp.StatusCode)
return nil, fmt.Errorf("registration failed with status %d", resp.StatusCode)
}

var result RegisterResponse
if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
return "", "", fmt.Errorf("decoding response: %w", err)
return nil, fmt.Errorf("decoding response: %w", err)
}
return result.ID, result.APIToken, nil
return &result, nil
}

// Register sends a single registration request and returns the node ID and
// optional credentials (API token for agent workers, NATS JWT when configured).
func (c *RegistrationClient) Register(ctx context.Context, body map[string]any) (nodeID, apiToken, natsJWT, natsSeed string, err error) {
res, err := c.RegisterFull(ctx, body)
if err != nil {
return "", "", "", "", err
}
return res.ID, res.APIToken, res.NatsJWT, res.NatsUserSeed, nil
}

// RegisterWithRetry retries registration with exponential backoff.
func (c *RegistrationClient) RegisterWithRetry(ctx context.Context, body map[string]any, maxRetries int) (string, string, error) {
func (c *RegistrationClient) RegisterWithRetry(ctx context.Context, body map[string]any, maxRetries int) (nodeID, apiToken, natsJWT, natsSeed string, err error) {
backoff := 2 * time.Second
maxBackoff := 30 * time.Second

var nodeID, apiToken string
var err error

for attempt := 1; attempt <= maxRetries; attempt++ {
nodeID, apiToken, err = c.Register(ctx, body)
nodeID, apiToken, natsJWT, natsSeed, err = c.Register(ctx, body)
if err == nil {
return nodeID, apiToken, nil
return nodeID, apiToken, natsJWT, natsSeed, nil
}
if attempt == maxRetries {
return "", "", fmt.Errorf("failed after %d attempts: %w", maxRetries, err)
return "", "", "", "", fmt.Errorf("failed after %d attempts: %w", maxRetries, err)
}
xlog.Warn("Registration failed, retrying", "attempt", attempt, "next_retry", backoff, "error", err)
select {
case <-ctx.Done():
return "", "", ctx.Err()
return "", "", "", "", ctx.Err()
case <-time.After(backoff):
}
backoff = min(backoff*2, maxBackoff)
}
return nodeID, apiToken, err
return nodeID, apiToken, natsJWT, natsSeed, err
}

// Heartbeat sends a single heartbeat POST with the given body.
Expand Down
Loading
Loading