Skip to content

Commit af6d34e

Browse files
feat: background workers = non-HTTP workers with shared state
Background workers are long-running PHP workers outside the HTTP cycle that observe their environment and publish configuration via set_vars/get_vars. HTTP workers read this state per-request with version-based caching. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent 097563d commit af6d34e

40 files changed

+2125
-47
lines changed

background_worker.go

Lines changed: 312 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,312 @@
1+
package frankenphp
2+
3+
// #include "frankenphp.h"
4+
import "C"
5+
import (
6+
"fmt"
7+
"log/slog"
8+
"sync"
9+
"sync/atomic"
10+
"time"
11+
"unsafe"
12+
)
13+
14+
type backgroundWorkerState struct {
15+
varsPtr unsafe.Pointer // *C.HashTable, persistent, managed by C
16+
varsVersion atomic.Uint64 // incremented on each set_vars call
17+
mu sync.RWMutex
18+
ready chan struct{}
19+
readyOnce sync.Once
20+
}
21+
22+
type BackgroundWorkerRegistry struct {
23+
entrypoint string
24+
autoStartNames []string // names to start at boot (from Caddyfile match)
25+
maxWorkers int // max lazy-started instances (0 = unlimited)
26+
mu sync.Mutex
27+
workers map[string]*backgroundWorkerState
28+
}
29+
30+
func NewBackgroundWorkerRegistry(entrypoint string) *BackgroundWorkerRegistry {
31+
return &BackgroundWorkerRegistry{
32+
entrypoint: entrypoint,
33+
workers: make(map[string]*backgroundWorkerState),
34+
}
35+
}
36+
37+
func (registry *BackgroundWorkerRegistry) AddAutoStartNames(names ...string) {
38+
registry.autoStartNames = append(registry.autoStartNames, names...)
39+
}
40+
41+
func (registry *BackgroundWorkerRegistry) SetMaxWorkers(max int) {
42+
registry.maxWorkers = max
43+
}
44+
45+
func (registry *BackgroundWorkerRegistry) reserve(name string) (*backgroundWorkerState, bool, error) {
46+
registry.mu.Lock()
47+
defer registry.mu.Unlock()
48+
49+
if bgw := registry.workers[name]; bgw != nil {
50+
return bgw, true, nil
51+
}
52+
53+
if registry.maxWorkers > 0 && len(registry.workers) >= registry.maxWorkers {
54+
return nil, false, fmt.Errorf("cannot start background worker %q: limit of %d reached - increase max_threads on the catch-all background worker or declare it as a named worker with match", name, registry.maxWorkers)
55+
}
56+
57+
bgw := &backgroundWorkerState{
58+
ready: make(chan struct{}),
59+
}
60+
registry.workers[name] = bgw
61+
62+
return bgw, false, nil
63+
}
64+
65+
func (registry *BackgroundWorkerRegistry) remove(name string, bgw *backgroundWorkerState) {
66+
registry.mu.Lock()
67+
defer registry.mu.Unlock()
68+
69+
if registry.workers[name] == bgw {
70+
delete(registry.workers, name)
71+
}
72+
}
73+
74+
var (
75+
lockedVarsMu sync.Mutex
76+
lockedVarsStack = make(map[int][][]*backgroundWorkerState)
77+
)
78+
79+
func startBackgroundWorker(thread *phpThread, bgWorkerName string) error {
80+
if bgWorkerName == "" {
81+
return fmt.Errorf("background worker name must not be empty")
82+
}
83+
84+
registry := getRegistry(thread)
85+
if registry == nil || registry.entrypoint == "" {
86+
return fmt.Errorf("no background worker configured in this php_server")
87+
}
88+
89+
return startBackgroundWorkerWithRegistry(registry, bgWorkerName)
90+
}
91+
92+
func startBackgroundWorkerWithRegistry(registry *BackgroundWorkerRegistry, bgWorkerName string) error {
93+
bgw, exists, err := registry.reserve(bgWorkerName)
94+
if err != nil {
95+
return err
96+
}
97+
if exists {
98+
return nil
99+
}
100+
101+
worker, err := newWorker(workerOpt{
102+
name: bgWorkerName,
103+
fileName: registry.entrypoint,
104+
num: 1,
105+
env: PrepareEnv(nil),
106+
watch: []string{},
107+
maxConsecutiveFailures: -1,
108+
})
109+
if err != nil {
110+
registry.remove(bgWorkerName, bgw)
111+
112+
return fmt.Errorf("failed to create background worker: %w", err)
113+
}
114+
115+
worker.httpEnabled = false
116+
worker.backgroundWorker = bgw
117+
worker.backgroundRegistry = registry
118+
119+
bgWorkerThread := getInactivePHPThread()
120+
if bgWorkerThread == nil {
121+
registry.remove(bgWorkerName, bgw)
122+
123+
return fmt.Errorf("no available PHP thread for background worker (increase max_threads)")
124+
}
125+
126+
scalingMu.Lock()
127+
workers = append(workers, worker)
128+
scalingMu.Unlock()
129+
130+
convertToWorkerThread(bgWorkerThread, worker)
131+
132+
if globalLogger.Enabled(globalCtx, slog.LevelInfo) {
133+
globalLogger.LogAttrs(globalCtx, slog.LevelInfo, "background worker started", slog.String("name", bgWorkerName))
134+
}
135+
136+
return nil
137+
}
138+
139+
// startAutoBackgroundWorkers finds all registries with auto-start names and starts them.
140+
// Called before initWorkers so background workers have a head start on set_vars.
141+
func startAutoBackgroundWorkers(workerOpts []workerOpt) error {
142+
seen := make(map[*BackgroundWorkerRegistry]struct{})
143+
for _, w := range workerOpts {
144+
if w.backgroundRegistry != nil {
145+
if _, ok := seen[w.backgroundRegistry]; !ok {
146+
seen[w.backgroundRegistry] = struct{}{}
147+
if err := w.backgroundRegistry.startAutoWorkers(); err != nil {
148+
return err
149+
}
150+
}
151+
}
152+
}
153+
return nil
154+
}
155+
156+
// startAutoWorkers starts all background workers configured with match names.
157+
func (registry *BackgroundWorkerRegistry) startAutoWorkers() error {
158+
for _, name := range registry.autoStartNames {
159+
if err := startBackgroundWorkerWithRegistry(registry, name); err != nil {
160+
return fmt.Errorf("failed to auto-start background worker %q: %w", name, err)
161+
}
162+
}
163+
return nil
164+
}
165+
166+
func getRegistry(thread *phpThread) *BackgroundWorkerRegistry {
167+
if handler, ok := thread.handler.(*workerThread); ok && handler.worker.backgroundRegistry != nil {
168+
return handler.worker.backgroundRegistry
169+
}
170+
if fc, ok := fromContext(thread.context()); ok {
171+
return fc.backgroundRegistry
172+
}
173+
174+
return nil
175+
}
176+
177+
//export go_frankenphp_start_background_worker
178+
func go_frankenphp_start_background_worker(threadIndex C.uintptr_t, name *C.char, nameLen C.size_t) *C.char {
179+
bgWorkerName := C.GoStringN(name, C.int(nameLen))
180+
181+
if err := startBackgroundWorker(phpThreads[threadIndex], bgWorkerName); err != nil {
182+
return C.CString(err.Error())
183+
}
184+
185+
return nil
186+
}
187+
188+
// go_frankenphp_worker_wait_and_get waits for background workers to be ready and returns
189+
// their persistent HashTable pointers. outPtrs must point to a C-allocated array
190+
// of nameCount void* slots.
191+
//
192+
//export go_frankenphp_worker_wait_and_get
193+
func go_frankenphp_worker_wait_and_get(threadIndex C.uintptr_t, names **C.char, nameLens *C.size_t, nameCount C.int, timeoutMs C.int, outPtrs *unsafe.Pointer, callerVersions *C.ulong, outVersions *C.ulong) *C.char {
194+
registry := getRegistry(phpThreads[threadIndex])
195+
if registry == nil {
196+
return C.CString("no background worker configured in this php_server")
197+
}
198+
199+
n := int(nameCount)
200+
nameSlice := unsafe.Slice(names, n)
201+
nameLenSlice := unsafe.Slice(nameLens, n)
202+
ptrSlice := unsafe.Slice(outPtrs, n)
203+
204+
sks := make([]*backgroundWorkerState, n)
205+
goNames := make([]string, n)
206+
for i := 0; i < n; i++ {
207+
goNames[i] = C.GoStringN(nameSlice[i], C.int(nameLenSlice[i]))
208+
registry.mu.Lock()
209+
sks[i] = registry.workers[goNames[i]]
210+
registry.mu.Unlock()
211+
if sks[i] == nil {
212+
return C.CString("background worker not found: " + goNames[i])
213+
}
214+
}
215+
216+
// Fast path: check if all versions match - skip lock + copy entirely
217+
if callerVersions != nil {
218+
callerVSlice := unsafe.Slice(callerVersions, n)
219+
allMatch := true
220+
for i, sk := range sks {
221+
if uint64(callerVSlice[i]) != sk.varsVersion.Load() {
222+
allMatch = false
223+
break
224+
}
225+
}
226+
if allMatch {
227+
// Versions unchanged - caller can reuse cached data
228+
for i := 0; i < n; i++ {
229+
ptrSlice[i] = nil
230+
}
231+
return nil
232+
}
233+
}
234+
235+
timeout := time.Duration(timeoutMs) * time.Millisecond
236+
done := make(chan error, 1)
237+
go func() {
238+
timer := time.NewTimer(timeout)
239+
defer timer.Stop()
240+
for i, sk := range sks {
241+
select {
242+
case <-sk.ready:
243+
// background worker has called set_vars
244+
case <-timer.C:
245+
done <- fmt.Errorf("timeout waiting for background worker: %s", goNames[i])
246+
return
247+
}
248+
}
249+
done <- nil
250+
}()
251+
252+
if err := <-done; err != nil {
253+
return C.CString(err.Error())
254+
}
255+
256+
outVSlice := unsafe.Slice(outVersions, n)
257+
for i, sk := range sks {
258+
sk.mu.RLock()
259+
ptrSlice[i] = sk.varsPtr
260+
outVSlice[i] = C.ulong(sk.varsVersion.Load())
261+
}
262+
263+
// Push locked background workers onto a stack so release_vars can unlock them
264+
// without re-lookup. Stack supports reentrant get_vars calls.
265+
lockedVarsMu.Lock()
266+
lockedVarsStack[int(threadIndex)] = append(lockedVarsStack[int(threadIndex)], sks)
267+
lockedVarsMu.Unlock()
268+
269+
return nil
270+
}
271+
272+
//export go_frankenphp_worker_release_vars
273+
func go_frankenphp_worker_release_vars(threadIndex C.uintptr_t) {
274+
lockedVarsMu.Lock()
275+
stack := lockedVarsStack[int(threadIndex)]
276+
if len(stack) == 0 {
277+
lockedVarsMu.Unlock()
278+
return
279+
}
280+
sks := stack[len(stack)-1]
281+
lockedVarsStack[int(threadIndex)] = stack[:len(stack)-1]
282+
lockedVarsMu.Unlock()
283+
284+
for _, sk := range sks {
285+
sk.mu.RUnlock()
286+
}
287+
}
288+
289+
//export go_frankenphp_worker_set_vars
290+
func go_frankenphp_worker_set_vars(threadIndex C.uintptr_t, varsPtr unsafe.Pointer, oldPtr *unsafe.Pointer) *C.char {
291+
thread := phpThreads[threadIndex]
292+
293+
handler, ok := thread.handler.(*workerThread)
294+
if !ok || handler.worker.httpEnabled || handler.worker.backgroundWorker == nil {
295+
return C.CString("frankenphp_worker_set_vars() can only be called from a background worker")
296+
}
297+
298+
sk := handler.worker.backgroundWorker
299+
300+
sk.mu.Lock()
301+
*oldPtr = sk.varsPtr
302+
sk.varsPtr = varsPtr
303+
sk.varsVersion.Add(1)
304+
sk.mu.Unlock()
305+
306+
sk.readyOnce.Do(func() {
307+
handler.markBackgroundReady()
308+
close(sk.ready)
309+
})
310+
311+
return nil
312+
}

0 commit comments

Comments
 (0)