6
6
7
7
import { inject , injectable , interfaces } from "inversify" ;
8
8
import { MessageBusIntegration } from "./messagebus-integration" ;
9
- import { Disposable , WorkspaceInstance , Queue , WorkspaceInstancePort , PortVisibility , RunningWorkspaceInfo , DisposableCollection } from "@gitpod/gitpod-protocol" ;
10
- import { WorkspaceStatus , WorkspacePhase , GetWorkspacesRequest , WorkspaceConditionBool , PortVisibility as WsManPortVisibility , PromisifiedWorkspaceManagerClient } from "@gitpod/ws-manager/lib" ;
9
+ import {
10
+ Disposable ,
11
+ WorkspaceInstance ,
12
+ Queue ,
13
+ WorkspaceInstancePort ,
14
+ PortVisibility ,
15
+ RunningWorkspaceInfo ,
16
+ DisposableCollection ,
17
+ } from "@gitpod/gitpod-protocol" ;
18
+ import {
19
+ WorkspaceStatus ,
20
+ WorkspacePhase ,
21
+ GetWorkspacesRequest ,
22
+ WorkspaceConditionBool ,
23
+ PortVisibility as WsManPortVisibility ,
24
+ PromisifiedWorkspaceManagerClient ,
25
+ } from "@gitpod/ws-manager/lib" ;
11
26
import { WorkspaceDB } from "@gitpod/gitpod-db/lib/workspace-db" ;
12
27
import { UserDB } from "@gitpod/gitpod-db/lib/user-db" ;
13
- import { log } from ' @gitpod/gitpod-protocol/lib/util/logging' ;
28
+ import { log } from " @gitpod/gitpod-protocol/lib/util/logging" ;
14
29
import { TraceContext } from "@gitpod/gitpod-protocol/lib/util/tracing" ;
15
30
import { IAnalyticsWriter } from "@gitpod/gitpod-protocol/lib/analytics" ;
16
- import { TracedWorkspaceDB , TracedUserDB , DBWithTracing } from ' @gitpod/gitpod-db/lib/traced-db' ;
31
+ import { TracedWorkspaceDB , TracedUserDB , DBWithTracing } from " @gitpod/gitpod-db/lib/traced-db" ;
17
32
import { PrometheusMetricsExporter } from "./prometheus-metrics-exporter" ;
18
33
import { ClientProvider , WsmanSubscriber } from "./wsman-subscriber" ;
19
34
import { Timestamp } from "google-protobuf/google/protobuf/timestamp_pb" ;
@@ -71,7 +86,7 @@ export class WorkspaceManagerBridge implements Disposable {
71
86
log . debug ( `Starting status update handler: ${ cluster . name } ` , logPayload ) ;
72
87
/* no await */ this . startStatusUpdateHandler ( clientProvider , writeToDB , logPayload )
73
88
// this is a mere safe-guard: we do not expect the code inside to fail
74
- . catch ( err => log . error ( "Cannot start status update handler" , err ) ) ;
89
+ . catch ( ( err ) => log . error ( "Cannot start status update handler" , err ) ) ;
75
90
} ;
76
91
77
92
if ( cluster . govern ) {
@@ -102,20 +117,41 @@ export class WorkspaceManagerBridge implements Disposable {
102
117
this . dispose ( ) ;
103
118
}
104
119
105
- protected async startStatusUpdateHandler ( clientProvider : ClientProvider , writeToDB : boolean , logPayload : { } ) : Promise < void > {
120
+ protected async startStatusUpdateHandler (
121
+ clientProvider : ClientProvider ,
122
+ writeToDB : boolean ,
123
+ logPayload : { } ,
124
+ ) : Promise < void > {
106
125
const subscriber = new WsmanSubscriber ( clientProvider ) ;
107
126
this . disposables . push ( subscriber ) ;
108
127
109
128
const onReconnect = ( ctx : TraceContext , s : WorkspaceStatus [ ] ) => {
110
- s . forEach ( sx => this . serializeMessagesByInstanceId < WorkspaceStatus > ( ctx , sx , m => m . getId ( ) , ( ctx , msg ) => this . handleStatusUpdate ( ctx , msg , writeToDB ) ) )
129
+ s . forEach ( ( sx ) =>
130
+ this . serializeMessagesByInstanceId < WorkspaceStatus > (
131
+ ctx ,
132
+ sx ,
133
+ ( m ) => m . getId ( ) ,
134
+ ( ctx , msg ) => this . handleStatusUpdate ( ctx , msg , writeToDB ) ,
135
+ ) ,
136
+ ) ;
111
137
} ;
112
138
const onStatusUpdate = ( ctx : TraceContext , s : WorkspaceStatus ) => {
113
- this . serializeMessagesByInstanceId < WorkspaceStatus > ( ctx , s , msg => msg . getId ( ) , ( ctx , s ) => this . handleStatusUpdate ( ctx , s , writeToDB ) )
139
+ this . serializeMessagesByInstanceId < WorkspaceStatus > (
140
+ ctx ,
141
+ s ,
142
+ ( msg ) => msg . getId ( ) ,
143
+ ( ctx , s ) => this . handleStatusUpdate ( ctx , s , writeToDB ) ,
144
+ ) ;
114
145
} ;
115
146
await subscriber . subscribe ( { onReconnect, onStatusUpdate } , logPayload ) ;
116
147
}
117
148
118
- protected serializeMessagesByInstanceId < M > ( ctx : TraceContext , msg : M , getInstanceId : ( msg : M ) => string , handler : ( ctx : TraceContext , msg : M ) => Promise < void > ) {
149
+ protected serializeMessagesByInstanceId < M > (
150
+ ctx : TraceContext ,
151
+ msg : M ,
152
+ getInstanceId : ( msg : M ) => string ,
153
+ handler : ( ctx : TraceContext , msg : M ) => Promise < void > ,
154
+ ) {
119
155
const instanceId = getInstanceId ( msg ) ;
120
156
if ( ! instanceId ) {
121
157
log . warn ( "Received invalid message, could not read instanceId!" , { msg } ) ;
@@ -125,7 +161,7 @@ export class WorkspaceManagerBridge implements Disposable {
125
161
// We can't just handle the status update directly, but have to "serialize" it to ensure the updates stay in order.
126
162
// If we did not do this, the async nature of our code would allow for one message to overtake the other.
127
163
let q = this . queues . get ( instanceId ) || new Queue ( ) ;
128
- q . enqueue ( ( ) => handler ( ctx , msg ) ) . catch ( e => log . error ( { instanceId} , e ) ) ;
164
+ q . enqueue ( ( ) => handler ( ctx , msg ) ) . catch ( ( e ) => log . error ( { instanceId } , e ) ) ;
129
165
this . queues . set ( instanceId , q ) ;
130
166
}
131
167
@@ -149,17 +185,21 @@ export class WorkspaceManagerBridge implements Disposable {
149
185
const userId = status . metadata ! . owner ! ;
150
186
const logCtx = { instanceId, workspaceId, userId } ;
151
187
152
- const instance = await this . workspaceDB . trace ( { span} ) . findInstanceById ( instanceId ) ;
188
+ const instance = await this . workspaceDB . trace ( { span } ) . findInstanceById ( instanceId ) ;
153
189
if ( instance ) {
154
190
this . prometheusExporter . statusUpdateReceived ( this . cluster . name , true ) ;
155
191
} else {
192
+ // This scenario happens when the update for a WorkspaceInstance is picked up by a ws-manager-bridge in a different region,
193
+ // before db-sync finished running. This is because all ws-manager-bridge instances receive updates from all WorkspaceClusters.
194
+ // We ignore this update because we do not have anything to reconcile this update against, but also because we assume it is handled
195
+ // by another instance of ws-manager-bridge that is in the region where the WorkspaceInstance record was created.
156
196
this . prometheusExporter . statusUpdateReceived ( this . cluster . name , false ) ;
157
197
log . warn ( logCtx , "Received a status update for an unknown instance" , { status } ) ;
158
198
return ;
159
199
}
160
200
161
201
if ( ! ! status . spec . exposedPortsList ) {
162
- instance . status . exposedPorts = status . spec . exposedPortsList . map ( p => {
202
+ instance . status . exposedPorts = status . spec . exposedPortsList . map ( ( p ) => {
163
203
return < WorkspaceInstancePort > {
164
204
port : p . port ,
165
205
visibility : mapPortVisibility ( p . visibility ) ,
@@ -180,7 +220,9 @@ export class WorkspaceManagerBridge implements Disposable {
180
220
instance . status . conditions . pullingImages = toBool ( status . conditions . pullingImages ! ) ;
181
221
instance . status . conditions . deployed = toBool ( status . conditions . deployed ) ;
182
222
instance . status . conditions . timeout = status . conditions . timeout ;
183
- instance . status . conditions . firstUserActivity = mapFirstUserActivity ( rawStatus . getConditions ( ) ! . getFirstUserActivity ( ) ) ;
223
+ instance . status . conditions . firstUserActivity = mapFirstUserActivity (
224
+ rawStatus . getConditions ( ) ! . getFirstUserActivity ( ) ,
225
+ ) ;
184
226
instance . status . conditions . headlessTaskFailed = status . conditions . headlessTaskFailed ;
185
227
instance . status . conditions . stoppedByRequest = toBool ( status . conditions . stoppedByRequest ) ;
186
228
instance . status . message = status . message ;
@@ -191,7 +233,7 @@ export class WorkspaceManagerBridge implements Disposable {
191
233
192
234
if ( status . repo ) {
193
235
const r = status . repo ;
194
- const undefinedIfEmpty = < T > ( l : T [ ] ) => l . length > 0 ? l : undefined ;
236
+ const undefinedIfEmpty = < T > ( l : T [ ] ) => ( l . length > 0 ? l : undefined ) ;
195
237
196
238
instance . status . repo = {
197
239
branch : r . branch ,
@@ -201,8 +243,8 @@ export class WorkspaceManagerBridge implements Disposable {
201
243
unpushedCommits : undefinedIfEmpty ( r . unpushedCommitsList ) ,
202
244
totalUntrackedFiles : r . totalUntrackedFiles ,
203
245
untrackedFiles : undefinedIfEmpty ( r . untrackedFilesList ) ,
204
- totalUnpushedCommits : r . totalUnpushedCommits
205
- }
246
+ totalUnpushedCommits : r . totalUnpushedCommits ,
247
+ } ;
206
248
}
207
249
208
250
if ( instance . status . conditions . deployed && ! instance . deployedTime ) {
@@ -238,7 +280,7 @@ export class WorkspaceManagerBridge implements Disposable {
238
280
instance . status . phase = "interrupted" ;
239
281
break ;
240
282
case WorkspacePhase . STOPPING :
241
- if ( instance . status . phase != ' stopped' ) {
283
+ if ( instance . status . phase != " stopped" ) {
242
284
instance . status . phase = "stopping" ;
243
285
if ( ! instance . stoppingTime ) {
244
286
// The first time a workspace enters stopping we record that as it's stoppingTime time.
@@ -259,14 +301,14 @@ export class WorkspaceManagerBridge implements Disposable {
259
301
// yet. Just for this case we need to set it now.
260
302
instance . stoppingTime = now ;
261
303
}
262
- lifecycleHandler = ( ) => this . onInstanceStopped ( { span} , userId , instance ) ;
304
+ lifecycleHandler = ( ) => this . onInstanceStopped ( { span } , userId , instance ) ;
263
305
break ;
264
306
}
265
307
266
308
span . setTag ( "after" , JSON . stringify ( instance ) ) ;
267
309
268
310
// now notify all prebuild listeners about updates - and update DB if needed
269
- await this . updatePrebuiltWorkspace ( { span} , userId , status , writeToDB ) ;
311
+ await this . updatePrebuiltWorkspace ( { span } , userId , status , writeToDB ) ;
270
312
271
313
if ( writeToDB ) {
272
314
await this . workspaceDB . trace ( ctx ) . storeInstance ( instance ) ;
@@ -280,56 +322,78 @@ export class WorkspaceManagerBridge implements Disposable {
280
322
}
281
323
}
282
324
await this . messagebus . notifyOnInstanceUpdate ( ctx , userId , instance ) ;
283
-
284
325
} catch ( e ) {
285
- TraceContext . setError ( { span} , e ) ;
326
+ TraceContext . setError ( { span } , e ) ;
286
327
throw e ;
287
328
} finally {
288
329
span . finish ( ) ;
289
330
}
290
331
}
291
332
292
- protected startController ( clientProvider : ClientProvider , controllerIntervalSeconds : number , controllerMaxDisconnectSeconds : number , maxTimeToRunningPhaseSeconds = 60 * 60 ) {
333
+ protected startController (
334
+ clientProvider : ClientProvider ,
335
+ controllerIntervalSeconds : number ,
336
+ controllerMaxDisconnectSeconds : number ,
337
+ maxTimeToRunningPhaseSeconds = 60 * 60 ,
338
+ ) {
293
339
let disconnectStarted = Number . MAX_SAFE_INTEGER ;
294
340
this . disposables . push (
295
341
repeat ( async ( ) => {
296
342
try {
297
343
const client = await clientProvider ( ) ;
298
344
await this . controlInstallationInstances ( client , maxTimeToRunningPhaseSeconds ) ;
299
345
300
- disconnectStarted = Number . MAX_SAFE_INTEGER ; // Reset disconnect period
346
+ disconnectStarted = Number . MAX_SAFE_INTEGER ; // Reset disconnect period
301
347
} catch ( e ) {
302
348
if ( durationLongerThanSeconds ( disconnectStarted , controllerMaxDisconnectSeconds ) ) {
303
- log . warn ( "Error while controlling installation's workspaces" , e , { installation : this . cluster . name } ) ;
349
+ log . warn ( "Error while controlling installation's workspaces" , e , {
350
+ installation : this . cluster . name ,
351
+ } ) ;
304
352
} else if ( disconnectStarted > Date . now ( ) ) {
305
353
disconnectStarted = Date . now ( ) ;
306
354
}
307
355
}
308
- } , controllerIntervalSeconds * 1000 )
356
+ } , controllerIntervalSeconds * 1000 ) ,
309
357
) ;
310
358
}
311
359
312
- protected async controlInstallationInstances ( client : PromisifiedWorkspaceManagerClient , maxTimeToRunningPhaseSeconds : number ) {
360
+ protected async controlInstallationInstances (
361
+ client : PromisifiedWorkspaceManagerClient ,
362
+ maxTimeToRunningPhaseSeconds : number ,
363
+ ) {
313
364
const installation = this . cluster . name ;
314
365
log . debug ( "controlling instances" , { installation } ) ;
315
366
let ctx : TraceContext = { } ;
316
367
317
368
const runningInstances = await this . workspaceDB . trace ( ctx ) . findRunningInstancesWithWorkspaces ( installation ) ;
318
369
const runningInstancesIdx = new Map < string , RunningWorkspaceInfo > ( ) ;
319
- runningInstances . forEach ( i => runningInstancesIdx . set ( i . latestInstance . id , i ) ) ;
370
+ runningInstances . forEach ( ( i ) => runningInstancesIdx . set ( i . latestInstance . id , i ) ) ;
320
371
321
372
const actuallyRunningInstances = await client . getWorkspaces ( ctx , new GetWorkspacesRequest ( ) ) ;
322
- actuallyRunningInstances . getStatusList ( ) . forEach ( s => runningInstancesIdx . delete ( s . getId ( ) ) ) ;
373
+ actuallyRunningInstances . getStatusList ( ) . forEach ( ( s ) => runningInstancesIdx . delete ( s . getId ( ) ) ) ;
323
374
324
375
const promises : Promise < any > [ ] = [ ] ;
325
376
for ( const [ instanceId , ri ] of runningInstancesIdx . entries ( ) ) {
326
377
const instance = ri . latestInstance ;
327
- if ( ! ( instance . status . phase === 'running' || durationLongerThanSeconds ( Date . parse ( instance . creationTime ) , maxTimeToRunningPhaseSeconds ) ) ) {
328
- log . debug ( { instanceId } , "Skipping instance" , { phase : instance . status . phase , creationTime : instance . creationTime , region : instance . region } ) ;
378
+ if (
379
+ ! (
380
+ instance . status . phase === "running" ||
381
+ durationLongerThanSeconds ( Date . parse ( instance . creationTime ) , maxTimeToRunningPhaseSeconds )
382
+ )
383
+ ) {
384
+ log . debug ( { instanceId } , "Skipping instance" , {
385
+ phase : instance . status . phase ,
386
+ creationTime : instance . creationTime ,
387
+ region : instance . region ,
388
+ } ) ;
329
389
continue ;
330
390
}
331
391
332
- log . info ( { instanceId, workspaceId : instance . workspaceId } , "Database says the instance is starting for too long or running, but wsman does not know about it. Marking as stopped in database." , { installation} ) ;
392
+ log . info (
393
+ { instanceId, workspaceId : instance . workspaceId } ,
394
+ "Database says the instance is starting for too long or running, but wsman does not know about it. Marking as stopped in database." ,
395
+ { installation } ,
396
+ ) ;
333
397
instance . status . phase = "stopped" ;
334
398
instance . stoppingTime = new Date ( ) . toISOString ( ) ;
335
399
instance . stoppedTime = new Date ( ) . toISOString ( ) ;
@@ -344,27 +408,36 @@ export class WorkspaceManagerBridge implements Disposable {
344
408
// probes are an EE feature - we just need the hook here
345
409
}
346
410
347
- protected async updatePrebuiltWorkspace ( ctx : TraceContext , userId : string , status : WorkspaceStatus . AsObject , writeToDB : boolean ) {
411
+ protected async updatePrebuiltWorkspace (
412
+ ctx : TraceContext ,
413
+ userId : string ,
414
+ status : WorkspaceStatus . AsObject ,
415
+ writeToDB : boolean ,
416
+ ) {
348
417
// prebuilds are an EE feature - we just need the hook here
349
418
}
350
419
351
420
protected async stopPrebuildInstance ( ctx : TraceContext , instance : WorkspaceInstance ) : Promise < void > {
352
421
// prebuilds are an EE feature - we just need the hook here
353
422
}
354
423
355
- protected async onInstanceStopped ( ctx : TraceContext , ownerUserID : string , instance : WorkspaceInstance ) : Promise < void > {
424
+ protected async onInstanceStopped (
425
+ ctx : TraceContext ,
426
+ ownerUserID : string ,
427
+ instance : WorkspaceInstance ,
428
+ ) : Promise < void > {
356
429
const span = TraceContext . startSpan ( "onInstanceStopped" , ctx ) ;
357
430
358
431
try {
359
- await this . userDB . trace ( { span} ) . deleteGitpodTokensNamedLike ( ownerUserID , `${ instance . id } -%` ) ;
432
+ await this . userDB . trace ( { span } ) . deleteGitpodTokensNamedLike ( ownerUserID , `${ instance . id } -%` ) ;
360
433
this . analytics . track ( {
361
434
userId : ownerUserID ,
362
435
event : "workspace_stopped" ,
363
436
messageId : `bridge-wsstopped-${ instance . id } ` ,
364
- properties : { " instanceId" : instance . id , " workspaceId" : instance . workspaceId }
437
+ properties : { instanceId : instance . id , workspaceId : instance . workspaceId } ,
365
438
} ) ;
366
439
} catch ( err ) {
367
- TraceContext . setError ( { span} , err ) ;
440
+ TraceContext . setError ( { span } , err ) ;
368
441
throw err ;
369
442
} finally {
370
443
span . finish ( ) ;
@@ -374,7 +447,6 @@ export class WorkspaceManagerBridge implements Disposable {
374
447
public dispose ( ) {
375
448
this . disposables . dispose ( ) ;
376
449
}
377
-
378
450
}
379
451
380
452
const mapFirstUserActivity = ( firstUserActivity : Timestamp | undefined ) : string | undefined => {
@@ -413,4 +485,4 @@ const filterStatus = (status: WorkspaceStatus.AsObject): Partial<WorkspaceStatus
413
485
conditions : status . conditions ,
414
486
runtime : status . runtime ,
415
487
} ;
416
- }
488
+ } ;
0 commit comments