Skip to content

Commit 8a066d8

Browse files
authored
fix(cli): improve runtime health checks (#3740)
1 parent d4bc85b commit 8a066d8

File tree

6 files changed

+86
-30
lines changed

6 files changed

+86
-30
lines changed

genkit-tools/common/src/manager/manager.ts

Lines changed: 25 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
import axios, { type AxiosError } from 'axios';
1818
import chokidar from 'chokidar';
1919
import EventEmitter from 'events';
20+
import * as fsSync from 'fs';
2021
import fs from 'fs/promises';
2122
import path from 'path';
2223
import {
@@ -390,6 +391,10 @@ export class RuntimeManager {
390391
*/
391392
private async handleNewDevUi(filePath: string) {
392393
try {
394+
if (!fsSync.existsSync(filePath)) {
395+
// The file has already been deleted; nothing to do.
396+
return;
397+
}
393398
const { content, toolsInfo } = await retriable(
394399
async () => {
395400
const content = await fs.readFile(filePath, 'utf-8');
@@ -433,6 +438,10 @@ export class RuntimeManager {
433438
*/
434439
private async handleNewRuntime(filePath: string) {
435440
try {
441+
if (!fsSync.existsSync(filePath)) {
442+
// The file has already been deleted; nothing to do.
443+
return;
444+
}
436445
const { content, runtimeInfo } = await retriable(
437446
async () => {
438447
const content = await fs.readFile(filePath, 'utf-8');
@@ -448,7 +457,12 @@ export class RuntimeManager {
448457
runtimeInfo.name = runtimeInfo.id;
449458
}
450459
const fileName = path.basename(filePath);
451-
if (await checkServerHealth(runtimeInfo.reflectionServerUrl)) {
460+
if (
461+
await checkServerHealth(
462+
runtimeInfo.reflectionServerUrl,
463+
runtimeInfo.id
464+
)
465+
) {
452466
if (
453467
runtimeInfo.reflectionApiSpecVersion !=
454468
GENKIT_REFLECTION_API_SPEC_VERSION
@@ -529,7 +543,9 @@ export class RuntimeManager {
529543
private async performHealthChecks() {
530544
const healthCheckPromises = Object.entries(this.filenameToRuntimeMap).map(
531545
async ([fileName, runtime]) => {
532-
if (!(await checkServerHealth(runtime.reflectionServerUrl))) {
546+
if (
547+
!(await checkServerHealth(runtime.reflectionServerUrl, runtime.id))
548+
) {
533549
await this.removeRuntime(fileName);
534550
}
535551
}
@@ -541,19 +557,14 @@ export class RuntimeManager {
541557
* Deletes the runtime file, which in turn triggers the removal watcher.
542558
*/
543559
private async removeRuntime(fileName: string) {
544-
const runtime = this.filenameToRuntimeMap[fileName];
545-
if (runtime) {
546-
try {
547-
const runtimesDir = await findRuntimesDir(this.projectRoot);
548-
const runtimeFilePath = path.join(runtimesDir, fileName);
549-
await fs.unlink(runtimeFilePath);
550-
} catch (error) {
551-
logger.debug(`Failed to delete runtime file: ${error}`);
552-
}
553-
logger.debug(
554-
`Removed unhealthy runtime with ID ${runtime.id} from manager.`
555-
);
560+
try {
561+
const runtimesDir = await findRuntimesDir(this.projectRoot);
562+
const runtimeFilePath = path.join(runtimesDir, fileName);
563+
await fs.unlink(runtimeFilePath);
564+
} catch (error) {
565+
logger.debug(`Failed to delete runtime file: ${error}`);
556566
}
567+
logger.debug(`Removed unhealthy runtime ${fileName} from manager.`);
557568
}
558569
}
559570

genkit-tools/common/src/utils/utils.ts

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -139,9 +139,12 @@ export async function detectRuntime(directory: string): Promise<Runtime> {
139139
/**
140140
* Checks the health of a server via its /api/__health endpoint. When `id` is provided, it is sent as the `id` query parameter.
141141
*/
142-
export async function checkServerHealth(url: string): Promise<boolean> {
142+
export async function checkServerHealth(
143+
url: string,
144+
id?: string
145+
): Promise<boolean> {
143146
try {
144-
const response = await fetch(`${url}/api/__health`);
147+
const response = await fetch(`${url}/api/__health${id ? `?id=${id}` : ''}`);
145148
return response.status === 200;
146149
} catch (error) {
147150
if (isConnectionRefusedError(error)) {

go/genkit/reflection.go

Lines changed: 18 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,15 @@ type reflectionServer struct {
5555
RuntimeFilePath string // Path to the runtime file that was written at startup.
5656
}
5757

58+
func (s *reflectionServer) runtimeID() string {
59+
_, port, err := net.SplitHostPort(s.Addr)
60+
if err != nil {
61+
// This should not happen with a valid address.
62+
return strconv.Itoa(os.Getpid())
63+
}
64+
return fmt.Sprintf("%d-%s", os.Getpid(), port)
65+
}
66+
5867
// findAvailablePort finds the next available port starting from the given port number.
5968
func findAvailablePort(startPort int) (string, error) {
6069
for port := startPort; port < startPort+100; port++ {
@@ -91,10 +100,10 @@ func startReflectionServer(ctx context.Context, g *Genkit, errCh chan<- error, s
91100

92101
s := &reflectionServer{
93102
Server: &http.Server{
94-
Addr: addr,
95-
Handler: serveMux(g),
103+
Addr: addr,
96104
},
97105
}
106+
s.Handler = serveMux(g, s)
98107

99108
slog.Debug("starting reflection server", "addr", s.Addr)
100109

@@ -159,7 +168,7 @@ func (s *reflectionServer) writeRuntimeFile(url string) error {
159168

160169
runtimeID := os.Getenv("GENKIT_RUNTIME_ID")
161170
if runtimeID == "" {
162-
runtimeID = strconv.Itoa(os.Getpid())
171+
runtimeID = s.runtimeID()
163172
}
164173

165174
timestamp := time.Now().UTC().Format(time.RFC3339)
@@ -238,10 +247,14 @@ func findProjectRoot() (string, error) {
238247
}
239248

240249
// serveMux returns a new ServeMux configured for the required Reflection API endpoints.
241-
func serveMux(g *Genkit) *http.ServeMux {
250+
func serveMux(g *Genkit, s *reflectionServer) *http.ServeMux {
242251
mux := http.NewServeMux()
243252
// Skip wrapHandler here to avoid logging constant polling requests.
244-
mux.HandleFunc("GET /api/__health", func(w http.ResponseWriter, _ *http.Request) {
253+
mux.HandleFunc("GET /api/__health", func(w http.ResponseWriter, r *http.Request) {
254+
if id := r.URL.Query().Get("id"); id != "" && id != s.runtimeID() {
255+
http.Error(w, "Invalid runtime ID", http.StatusServiceUnavailable)
256+
return
257+
}
245258
w.WriteHeader(http.StatusOK)
246259
})
247260
mux.HandleFunc("GET /api/actions", wrapReflectionHandler(handleListActions(g)))

go/genkit/reflection_test.go

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,11 @@ func TestServeMux(t *testing.T) {
9090
core.DefineAction(g.reg, "test/inc", api.ActionTypeCustom, nil, nil, inc)
9191
core.DefineAction(g.reg, "test/dec", api.ActionTypeCustom, nil, nil, dec)
9292

93-
ts := httptest.NewServer(serveMux(g))
93+
s := &reflectionServer{
94+
Server: &http.Server{},
95+
}
96+
ts := httptest.NewServer(serveMux(g, s))
97+
s.Addr = strings.TrimPrefix(ts.URL, "http://")
9498
defer ts.Close()
9599

96100
t.Parallel()
@@ -104,6 +108,26 @@ func TestServeMux(t *testing.T) {
104108
if res.StatusCode != http.StatusOK {
105109
t.Errorf("health check failed: got status %d, want %d", res.StatusCode, http.StatusOK)
106110
}
111+
112+
// Test with correct runtime ID
113+
res, err = http.Get(ts.URL + "/api/__health?id=" + s.runtimeID())
114+
if err != nil {
115+
t.Fatal(err)
116+
}
117+
defer res.Body.Close()
118+
if res.StatusCode != http.StatusOK {
119+
t.Errorf("health check with correct id failed: got status %d, want %d", res.StatusCode, http.StatusOK)
120+
}
121+
122+
// Test with incorrect runtime ID
123+
res, err = http.Get(ts.URL + "/api/__health?id=invalid-id")
124+
if err != nil {
125+
t.Fatal(err)
126+
}
127+
defer res.Body.Close()
128+
if res.StatusCode != http.StatusServiceUnavailable {
129+
t.Errorf("health check with incorrect id failed: got status %d, want %d", res.StatusCode, http.StatusServiceUnavailable)
130+
}
107131
})
108132

109133
t.Run("list actions", func(t *testing.T) {

js/core/src/reflection.ts

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,10 @@ export class ReflectionServer {
8282
};
8383
}
8484

85+
get runtimeId() {
86+
return `${process.pid}${this.port !== null ? `-${this.port}` : ''}`;
87+
}
88+
8589
/**
8690
* Finds a free port to run the server on based on the original chosen port and environment.
8791
*/
@@ -112,7 +116,11 @@ export class ReflectionServer {
112116
next();
113117
});
114118

115-
server.get('/api/__health', async (_, response) => {
119+
server.get('/api/__health', async (req, response) => {
120+
if (req.query['id'] && req.query['id'] !== this.runtimeId) {
121+
response.status(503).send('Invalid runtime ID');
122+
return;
123+
}
116124
await this.registry.listActions();
117125
response.status(200).send('OK');
118126
});
@@ -322,16 +330,13 @@ export class ReflectionServer {
322330
const date = new Date();
323331
const time = date.getTime();
324332
const timestamp = date.toISOString();
325-
const runtimeId = `${process.pid}${
326-
this.port !== null ? `-${this.port}` : ''
327-
}`;
328333
this.runtimeFilePath = path.join(
329334
runtimesDir,
330-
`${runtimeId}-${time}.json`
335+
`${this.runtimeId}-${time}.json`
331336
);
332337
const fileContent = JSON.stringify(
333338
{
334-
id: process.env.GENKIT_RUNTIME_ID || runtimeId,
339+
id: process.env.GENKIT_RUNTIME_ID || this.runtimeId,
335340
pid: process.pid,
336341
name: this.options.name,
337342
reflectionServerUrl: `http://localhost:${this.port}`,

js/plugins/vertexai/src/rerankers/v2/index.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,8 @@ import { ActionType } from 'genkit/registry';
2424
import { RerankerReference, z } from 'genkit';
2525
import { getDerivedOptions } from '../../common/utils.js';
2626
import * as reranker from './reranker.js';
27-
import { VertexRerankerPluginOptions } from './types.js';
28-
export { VertexRerankerPluginOptions };
27+
import { type VertexRerankerPluginOptions } from './types.js';
28+
export { type VertexRerankerPluginOptions };
2929

3030
async function initializer(pluginOptions: VertexRerankerPluginOptions) {
3131
const clientOptions = await getDerivedOptions(

0 commit comments

Comments
 (0)