Skip to content

Commit 337c519

Browse files
committed
feat(spanner): worker proxy for executor framework
1 parent fe91af8 commit 337c519

6 files changed

Lines changed: 684 additions & 5 deletions

File tree

Lines changed: 318 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,318 @@
1+
/*!
2+
* Copyright 2026 Google LLC. All Rights Reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
import * as grpc from '@grpc/grpc-js';
18+
import * as protoLoader from '@grpc/proto-loader';
19+
import yargs from 'yargs';
20+
import * as path from 'path';
21+
import * as fs from 'fs';
22+
import {NodeTracerProvider} from '@opentelemetry/sdk-trace-node';
23+
import {TraceExporter} from '@google-cloud/opentelemetry-cloud-trace-exporter';
24+
import {Resource} from '@opentelemetry/resources';
25+
import {ATTR_SERVICE_NAME} from '@opentelemetry/semantic-conventions';
26+
import {
27+
BatchSpanProcessor,
28+
TraceIdRatioBasedSampler,
29+
} from '@opentelemetry/sdk-trace-base';
30+
import {CloudExecutorImpl} from './cloud-executor-impl';
31+
import {HealthImplementation} from 'grpc-health-check';
32+
import {ReflectionService} from '@grpc/reflection';
33+
34+
const PROTO_PATH = path.join(
35+
__dirname,
36+
'../../protos/google/spanner/executor/v1/cloud_executor.proto',
37+
);
38+
39+
const OPTION_SPANNER_PORT = 'spanner_port';
40+
const OPTION_PROXY_PORT = 'proxy_port';
41+
const OPTION_CERTIFICATE = 'cert';
42+
const OPTION_SERVICE_KEY_FILE = 'service_key_file';
43+
const OPTION_USE_PLAIN_TEXT_CHANNEL = 'use_plain_text_channel';
44+
const OPTION_ENABLE_GRPC_FAULT_INJECTOR = 'enable_grpc_fault_injector';
45+
const OPTION_MULTIPLEXED_SESSION_OPERATIONS_RATIO =
46+
'multiplexed_session_operations_ratio';
47+
48+
/**
49+
* class WorkerProxy which acts as a proxy server that forwards requests to the Spanner executor
50+
*/
51+
export class WorkerProxy {
52+
public static spannerPort = 0;
53+
public static proxyPort = 0;
54+
public static cert = '';
55+
public static serviceKeyFile = '';
56+
public static multiplexedSessionOperationsRatio = 0.0;
57+
public static usePlainTextChannel = false;
58+
public static enableGrpcFaultInjector = false;
59+
public static openTelemetrySdk: NodeTracerProvider;
60+
61+
public static readonly PROJECT_ID = 'spanner-cloud-systest';
62+
public static readonly CLOUD_TRACE_ENDPOINT =
63+
'staging-cloudtrace.sandbox.googleapis.com:443';
64+
65+
private static readonly MIN_PORT = 0;
66+
private static readonly MAX_PORT = 65535;
67+
private static readonly MIN_RATIO = 0.0;
68+
private static readonly MAX_RATIO = 1.0;
69+
private static readonly TRACE_SAMPLING_RATE = 0.01;
70+
71+
/**
72+
* Sets up the OpenTelemetry SDK
73+
*/
74+
public static async setupOpenTelemetrySdk(): Promise<NodeTracerProvider> {
75+
const exporterConfig: any = {
76+
projectId: WorkerProxy.PROJECT_ID,
77+
apiEndpoint: WorkerProxy.CLOUD_TRACE_ENDPOINT,
78+
};
79+
80+
if (WorkerProxy.serviceKeyFile) {
81+
exporterConfig.keyFilename = WorkerProxy.serviceKeyFile;
82+
}
83+
84+
const traceExporter = new TraceExporter(exporterConfig);
85+
86+
const provider = new NodeTracerProvider({
87+
resource: new Resource({
88+
[ATTR_SERVICE_NAME]: 'spanner-node-worker-proxy',
89+
}) as any,
90+
sampler: new TraceIdRatioBasedSampler(WorkerProxy.TRACE_SAMPLING_RATE),
91+
spanProcessors: [new BatchSpanProcessor(traceExporter as any)],
92+
});
93+
94+
provider.register();
95+
return provider;
96+
}
97+
98+
/**
99+
* Builds the command line options for the worker proxy
100+
*/
101+
public static buildOptions(args: string[]): any {
102+
const parser = yargs(args);
103+
104+
parser.option(OPTION_SPANNER_PORT, {
105+
type: 'number',
106+
description: 'Port of Spanner Frontend to which to send requests.',
107+
});
108+
parser.option(OPTION_PROXY_PORT, {
109+
type: 'number',
110+
description: 'Proxy port to start worker proxy on.',
111+
});
112+
parser.option(OPTION_CERTIFICATE, {
113+
type: 'string',
114+
description: 'Certificate used to connect to Spanner GFE.',
115+
});
116+
parser.option(OPTION_SERVICE_KEY_FILE, {
117+
type: 'string',
118+
description: 'Service key file used to set authentication.',
119+
});
120+
parser.option(OPTION_USE_PLAIN_TEXT_CHANNEL, {
121+
type: 'boolean',
122+
description:
123+
'Use a plain text gRPC channel (intended for the Cloud Spanner Emulator).',
124+
});
125+
parser.option(OPTION_ENABLE_GRPC_FAULT_INJECTOR, {
126+
type: 'boolean',
127+
description: 'Enable grpc fault injector in cloud client executor.',
128+
});
129+
parser.option(OPTION_MULTIPLEXED_SESSION_OPERATIONS_RATIO, {
130+
type: 'number',
131+
description: 'Ratio of operations to use multiplexed sessions.',
132+
});
133+
134+
try {
135+
return parser.parseSync();
136+
} catch (e: any) {
137+
throw new Error(e.message);
138+
}
139+
}
140+
141+
/**
142+
* main method to spin up the server and start the worker proxy
143+
*/
144+
public static async main(args: string[]) {
145+
const commandLineString = WorkerProxy.buildOptions(args);
146+
const commandLine = commandLineString as any;
147+
148+
if (commandLine[OPTION_SPANNER_PORT] === undefined) {
149+
throw new Error(
150+
'Spanner proxyPort need to be assigned in order to start worker proxy.',
151+
);
152+
}
153+
WorkerProxy.spannerPort = commandLine[OPTION_SPANNER_PORT];
154+
if (
155+
WorkerProxy.spannerPort < WorkerProxy.MIN_PORT ||
156+
WorkerProxy.spannerPort > WorkerProxy.MAX_PORT
157+
) {
158+
throw new Error(
159+
'Spanner proxyPort must be between ' +
160+
WorkerProxy.MIN_PORT +
161+
' and ' +
162+
WorkerProxy.MAX_PORT,
163+
);
164+
}
165+
166+
if (commandLine[OPTION_PROXY_PORT] === undefined) {
167+
throw new Error(
168+
'Proxy port need to be assigned in order to start worker proxy.',
169+
);
170+
}
171+
WorkerProxy.proxyPort = commandLine[OPTION_PROXY_PORT];
172+
if (
173+
WorkerProxy.proxyPort < WorkerProxy.MIN_PORT ||
174+
WorkerProxy.proxyPort > WorkerProxy.MAX_PORT
175+
) {
176+
throw new Error(
177+
'Proxy port must be between ' +
178+
WorkerProxy.MIN_PORT +
179+
' and ' +
180+
WorkerProxy.MAX_PORT,
181+
);
182+
}
183+
184+
if (!commandLine[OPTION_CERTIFICATE]) {
185+
throw new Error(
186+
'Certificate need to be assigned in order to start worker proxy.',
187+
);
188+
}
189+
WorkerProxy.cert = commandLine[OPTION_CERTIFICATE];
190+
191+
if (commandLine[OPTION_SERVICE_KEY_FILE]) {
192+
WorkerProxy.serviceKeyFile = commandLine[OPTION_SERVICE_KEY_FILE];
193+
}
194+
195+
WorkerProxy.usePlainTextChannel =
196+
!!commandLine[OPTION_USE_PLAIN_TEXT_CHANNEL];
197+
WorkerProxy.enableGrpcFaultInjector =
198+
!!commandLine[OPTION_ENABLE_GRPC_FAULT_INJECTOR];
199+
200+
if (
201+
commandLine[OPTION_MULTIPLEXED_SESSION_OPERATIONS_RATIO] !== undefined
202+
) {
203+
WorkerProxy.multiplexedSessionOperationsRatio = Number(
204+
commandLine[OPTION_MULTIPLEXED_SESSION_OPERATIONS_RATIO],
205+
);
206+
console.info(
207+
`Multiplexed session ratio from commandline arg: \n${WorkerProxy.multiplexedSessionOperationsRatio}`,
208+
);
209+
if (
210+
WorkerProxy.multiplexedSessionOperationsRatio < WorkerProxy.MIN_RATIO ||
211+
WorkerProxy.multiplexedSessionOperationsRatio > WorkerProxy.MAX_RATIO
212+
) {
213+
throw new Error(
214+
'Spanner multiplexedSessionOperationsRatio must be between ' +
215+
WorkerProxy.MIN_RATIO +
216+
' and ' +
217+
WorkerProxy.MAX_RATIO,
218+
);
219+
}
220+
}
221+
222+
// Setup the OpenTelemetry for tracing
223+
WorkerProxy.openTelemetrySdk = await WorkerProxy.setupOpenTelemetrySdk();
224+
225+
// Check if proto file exists
226+
if (!fs.existsSync(PROTO_PATH)) {
227+
throw new Error(`Proto file not found at ${PROTO_PATH}`);
228+
}
229+
230+
const packageDefinition = protoLoader.loadSync(PROTO_PATH, {
231+
keepCase: true,
232+
longs: String,
233+
enums: String,
234+
defaults: true,
235+
oneofs: true,
236+
includeDirs: [
237+
path.join(__dirname, '../../protos'),
238+
path.join(__dirname, '../../node_modules/google-proto-files'),
239+
path.join(__dirname, '../../node_modules/google-gax/build/protos'),
240+
],
241+
});
242+
const protoDescriptor = grpc.loadPackageDefinition(
243+
packageDefinition,
244+
) as any;
245+
const spannerExecutorProxy =
246+
protoDescriptor.google.spanner.executor.v1.SpannerExecutorProxy;
247+
248+
let server: grpc.Server;
249+
for (;;) {
250+
try {
251+
const cloudExecutorImpl = new CloudExecutorImpl(
252+
WorkerProxy.enableGrpcFaultInjector,
253+
WorkerProxy.multiplexedSessionOperationsRatio,
254+
);
255+
256+
server = new grpc.Server();
257+
server.addService(
258+
spannerExecutorProxy.service,
259+
cloudExecutorImpl as any,
260+
);
261+
262+
const healthImpl = new HealthImplementation({
263+
'': 'SERVING',
264+
});
265+
healthImpl.addToServer(server);
266+
const reflection = new ReflectionService(packageDefinition);
267+
reflection.addToServer(server);
268+
269+
const bindAddr = `0.0.0.0:${WorkerProxy.proxyPort}`;
270+
const port = await new Promise<number>((resolve, reject) => {
271+
server.bindAsync(
272+
bindAddr,
273+
grpc.ServerCredentials.createInsecure(),
274+
(err, port) => {
275+
if (err) {
276+
return reject(err);
277+
}
278+
resolve(port);
279+
},
280+
);
281+
});
282+
283+
console.info(`Server started on proxyPort: ${port}`);
284+
break;
285+
} catch (e) {
286+
console.warn(
287+
`Failed to start server on proxyPort ${WorkerProxy.proxyPort}`,
288+
e,
289+
);
290+
// Wait briefly before retrying to avoid tight loop
291+
await new Promise(resolve => setTimeout(resolve, 1000));
292+
}
293+
}
294+
295+
const shutdown = () => {
296+
// eslint-disable-next-line n/no-process-exit
297+
setTimeout(() => process.exit(1), 2000).unref();
298+
server.tryShutdown(() => {
299+
WorkerProxy.openTelemetrySdk
300+
.shutdown()
301+
.then(() => console.info('Tracing terminated'))
302+
.catch(error => console.error('Error terminating tracing', error))
303+
// eslint-disable-next-line n/no-process-exit
304+
.finally(() => process.exit(0));
305+
});
306+
};
307+
308+
process.on('SIGTERM', shutdown);
309+
process.on('SIGINT', shutdown);
310+
}
311+
}
312+
313+
if (require.main === module) {
314+
WorkerProxy.main(process.argv.slice(2)).catch(err => {
315+
console.error('Failed to start worker proxy:', err);
316+
throw err;
317+
});
318+
}

handwritten/spanner/package.json

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,8 @@
9898
"uuid": "^11.1.0"
9999
},
100100
"devDependencies": {
101+
"@google-cloud/opentelemetry-cloud-trace-exporter": "^3.0.0",
102+
"@grpc/reflection": "^1.0.4",
101103
"@opentelemetry/sdk-trace-base": "^2.0.0",
102104
"@opentelemetry/sdk-trace-node": "^2.0.0",
103105
"@types/concat-stream": "^2.0.3",
@@ -112,13 +114,15 @@
112114
"@types/request": "^2.48.12",
113115
"@types/sinon": "^21.0.0",
114116
"@types/through2": "^2.0.41",
117+
"@types/yargs": "^17.0.35",
115118
"binary-search-bounds": "^2.0.5",
116119
"c8": "^10.1.3",
117120
"codecov": "^3.8.3",
118121
"concat-stream": "^2.0.0",
119122
"dedent": "^1.5.3",
120123
"execa": "^5.0.0",
121124
"gapic-tools": "^1.0.1",
125+
"grpc-health-check": "^2.1.0",
122126
"gts": "^6.0.2",
123127
"jsdoc": "^4.0.4",
124128
"jsdoc-fresh": "^5.0.0",
@@ -140,5 +144,8 @@
140144
"typescript": "^5.8.2",
141145
"yargs": "^17.7.2"
142146
},
147+
"overrides": {
148+
"@opentelemetry/resources": "^1.8.0"
149+
},
143150
"homepage": "https://github.com/googleapis/google-cloud-node/tree/main/handwritten/spanner"
144-
}
151+
}

0 commit comments

Comments
 (0)