@@ -23,7 +23,6 @@ import { HostNameResolver } from '../channel'
23
23
import SingleConnectionProvider from './connection-provider-single'
24
24
import PooledConnectionProvider from './connection-provider-pooled'
25
25
import { LeastConnectedLoadBalancingStrategy } from '../load-balancing'
26
- import { controller } from '../lang'
27
26
import {
28
27
createChannelConnection ,
29
28
ConnectionErrorHandler ,
@@ -76,13 +75,6 @@ export default class RoutingConnectionProvider extends PooledConnectionProvider
76
75
)
77
76
} )
78
77
79
- this . _updateRoutingTableTimeoutConfig = {
80
- timeout : this . _config . updateRoutingTableTimeout ,
81
- reason : ( ) => newError (
82
- `Routing table update timed out in ${ this . _config . updateRoutingTableTimeout } ms.`
83
- )
84
- }
85
-
86
78
this . _routingContext = { ...routingContext , address : address . toString ( ) }
87
79
this . _seedRouter = address
88
80
this . _rediscovery = new Rediscovery ( this . _routingContext )
@@ -151,66 +143,53 @@ export default class RoutingConnectionProvider extends PooledConnectionProvider
151
143
this . _handleAuthorizationExpired ( error , address , context . database )
152
144
)
153
145
154
- const refreshRoutingTableJob = {
155
- run : async ( _ , cancelationToken ) => {
156
- const routingTable = await this . _freshRoutingTable ( {
157
- accessMode,
158
- database : context . database ,
159
- bookmarks : bookmarks ,
160
- impersonatedUser,
161
- onDatabaseNameResolved : ( databaseName ) => {
162
- context . database = context . database || databaseName
163
- if ( onDatabaseNameResolved ) {
164
- onDatabaseNameResolved ( databaseName )
165
- }
166
- } ,
167
- cancelationToken
168
- } )
169
-
170
- // select a target server based on specified access mode
171
- if ( accessMode === READ ) {
172
- address = this . _loadBalancingStrategy . selectReader ( routingTable . readers )
173
- name = 'read'
174
- } else if ( accessMode === WRITE ) {
175
- address = this . _loadBalancingStrategy . selectWriter ( routingTable . writers )
176
- name = 'write'
177
- } else {
178
- throw newError ( 'Illegal mode ' + accessMode )
179
- }
180
-
181
- // we couldn't select a target server
182
- if ( ! address ) {
183
- throw newError (
184
- `Failed to obtain connection towards ${ name } server. Known routing table is: ${ routingTable } ` ,
185
- SESSION_EXPIRED
186
- )
146
+ const routingTable = await this . _freshRoutingTable ( {
147
+ accessMode,
148
+ database : context . database ,
149
+ bookmarks : bookmarks ,
150
+ impersonatedUser,
151
+ onDatabaseNameResolved : ( databaseName ) => {
152
+ context . database = context . database || databaseName
153
+ if ( onDatabaseNameResolved ) {
154
+ onDatabaseNameResolved ( databaseName )
187
155
}
188
- return { routingTable, address }
189
156
}
190
- }
157
+ } )
191
158
192
- const acquireConnectionJob = {
193
- run : async ( { routingTable, address } ) => {
194
- try {
195
- const connection = await this . _acquireConnectionToServer (
196
- address ,
197
- name ,
198
- routingTable
199
- )
159
+ // select a target server based on specified access mode
160
+ if ( accessMode === READ ) {
161
+ address = this . _loadBalancingStrategy . selectReader ( routingTable . readers )
162
+ name = 'read'
163
+ } else if ( accessMode === WRITE ) {
164
+ address = this . _loadBalancingStrategy . selectWriter ( routingTable . writers )
165
+ name = 'write'
166
+ } else {
167
+ throw newError ( 'Illegal mode ' + accessMode )
168
+ }
200
169
201
- return new DelegateConnection ( connection , databaseSpecificErrorHandler )
202
- } catch ( error ) {
203
- const transformed = databaseSpecificErrorHandler . handleAndTransformError (
204
- error ,
205
- address
206
- )
207
- throw transformed
208
- }
209
- } ,
210
- onTimeout : connection => connection . _release ( )
170
+ // we couldn't select a target server
171
+ if ( ! address ) {
172
+ throw newError (
173
+ `Failed to obtain connection towards ${ name } server. Known routing table is: ${ routingTable } ` ,
174
+ SESSION_EXPIRED
175
+ )
211
176
}
212
177
213
- return controller . runWithTimeout ( this . _sessionConnectionTimeoutConfig , refreshRoutingTableJob , acquireConnectionJob )
178
+ try {
179
+ const connection = await this . _acquireConnectionToServer (
180
+ address ,
181
+ name ,
182
+ routingTable
183
+ )
184
+
185
+ return new DelegateConnection ( connection , databaseSpecificErrorHandler )
186
+ } catch ( error ) {
187
+ const transformed = databaseSpecificErrorHandler . handleAndTransformError (
188
+ error ,
189
+ address
190
+ )
191
+ throw transformed
192
+ }
214
193
}
215
194
216
195
async _hasProtocolVersion ( versionPredicate ) {
@@ -322,28 +301,22 @@ export default class RoutingConnectionProvider extends PooledConnectionProvider
322
301
return this . _connectionPool . acquire ( address )
323
302
}
324
303
325
- _freshRoutingTable ( { accessMode, database, bookmarks, impersonatedUser, onDatabaseNameResolved, cancelationToken = new controller . CancelationToken ( ( ) => false ) } = { } ) {
326
- const refreshRoutingTableJob = {
327
- run : ( _ , refreshCancelationToken ) => {
328
- const combinedCancelationToken = refreshCancelationToken . combine ( cancelationToken )
329
- const currentRoutingTable = this . _routingTableRegistry . get (
330
- database ,
331
- ( ) => new RoutingTable ( { database } )
332
- )
304
+ _freshRoutingTable ( { accessMode, database, bookmarks, impersonatedUser, onDatabaseNameResolved } = { } ) {
305
+ const currentRoutingTable = this . _routingTableRegistry . get (
306
+ database ,
307
+ ( ) => new RoutingTable ( { database } )
308
+ )
333
309
334
- if ( ! currentRoutingTable . isStaleFor ( accessMode ) ) {
335
- return currentRoutingTable
336
- }
337
- this . _log . info (
338
- `Routing table is stale for database: "${ database } " and access mode: "${ accessMode } ": ${ currentRoutingTable } `
339
- )
340
- return this . _refreshRoutingTable ( currentRoutingTable , bookmarks , impersonatedUser , onDatabaseNameResolved , combinedCancelationToken )
341
- }
310
+ if ( ! currentRoutingTable . isStaleFor ( accessMode ) ) {
311
+ return currentRoutingTable
342
312
}
343
- return controller . runWithTimeout ( this . _updateRoutingTableTimeoutConfig , refreshRoutingTableJob )
313
+ this . _log . info (
314
+ `Routing table is stale for database: "${ database } " and access mode: "${ accessMode } ": ${ currentRoutingTable } `
315
+ )
316
+ return this . _refreshRoutingTable ( currentRoutingTable , bookmarks , impersonatedUser , onDatabaseNameResolved )
344
317
}
345
318
346
- _refreshRoutingTable ( currentRoutingTable , bookmarks , impersonatedUser , onDatabaseNameResolved , cancelationToken ) {
319
+ _refreshRoutingTable ( currentRoutingTable , bookmarks , impersonatedUser , onDatabaseNameResolved ) {
347
320
const knownRouters = currentRoutingTable . routers
348
321
349
322
if ( this . _useSeedRouter ) {
@@ -352,17 +325,15 @@ export default class RoutingConnectionProvider extends PooledConnectionProvider
352
325
currentRoutingTable ,
353
326
bookmarks ,
354
327
impersonatedUser ,
355
- onDatabaseNameResolved ,
356
- cancelationToken
328
+ onDatabaseNameResolved
357
329
)
358
330
}
359
331
return this . _fetchRoutingTableFromKnownRoutersFallbackToSeedRouter (
360
332
knownRouters ,
361
333
currentRoutingTable ,
362
334
bookmarks ,
363
335
impersonatedUser ,
364
- onDatabaseNameResolved ,
365
- cancelationToken
336
+ onDatabaseNameResolved
366
337
)
367
338
}
368
339
@@ -371,8 +342,7 @@ export default class RoutingConnectionProvider extends PooledConnectionProvider
371
342
currentRoutingTable ,
372
343
bookmarks ,
373
344
impersonatedUser ,
374
- onDatabaseNameResolved ,
375
- cancelationToken
345
+ onDatabaseNameResolved
376
346
) {
377
347
// we start with seed router, no routers were probed before
378
348
const seenRouters = [ ]
@@ -381,8 +351,7 @@ export default class RoutingConnectionProvider extends PooledConnectionProvider
381
351
this . _seedRouter ,
382
352
currentRoutingTable ,
383
353
bookmarks ,
384
- impersonatedUser ,
385
- cancelationToken
354
+ impersonatedUser
386
355
)
387
356
388
357
if ( newRoutingTable ) {
@@ -393,8 +362,7 @@ export default class RoutingConnectionProvider extends PooledConnectionProvider
393
362
knownRouters ,
394
363
currentRoutingTable ,
395
364
bookmarks ,
396
- impersonatedUser ,
397
- cancelationToken
365
+ impersonatedUser
398
366
)
399
367
newRoutingTable = newRoutingTable2
400
368
error = error2 || error
@@ -413,15 +381,13 @@ export default class RoutingConnectionProvider extends PooledConnectionProvider
413
381
currentRoutingTable ,
414
382
bookmarks ,
415
383
impersonatedUser ,
416
- onDatabaseNameResolved ,
417
- cancelationToken
384
+ onDatabaseNameResolved
418
385
) {
419
386
let [ newRoutingTable , error ] = await this . _fetchRoutingTableUsingKnownRouters (
420
387
knownRouters ,
421
388
currentRoutingTable ,
422
389
bookmarks ,
423
- impersonatedUser ,
424
- cancelationToken
390
+ impersonatedUser
425
391
)
426
392
427
393
if ( ! newRoutingTable ) {
@@ -431,8 +397,7 @@ export default class RoutingConnectionProvider extends PooledConnectionProvider
431
397
this . _seedRouter ,
432
398
currentRoutingTable ,
433
399
bookmarks ,
434
- impersonatedUser ,
435
- cancelationToken
400
+ impersonatedUser
436
401
)
437
402
}
438
403
@@ -448,15 +413,13 @@ export default class RoutingConnectionProvider extends PooledConnectionProvider
448
413
knownRouters ,
449
414
currentRoutingTable ,
450
415
bookmarks ,
451
- impersonatedUser ,
452
- cancelationToken
416
+ impersonatedUser
453
417
) {
454
418
const [ newRoutingTable , error ] = await this . _fetchRoutingTable (
455
419
knownRouters ,
456
420
currentRoutingTable ,
457
421
bookmarks ,
458
- impersonatedUser ,
459
- cancelationToken
422
+ impersonatedUser
460
423
)
461
424
462
425
if ( newRoutingTable ) {
@@ -481,19 +444,16 @@ export default class RoutingConnectionProvider extends PooledConnectionProvider
481
444
seedRouter ,
482
445
routingTable ,
483
446
bookmarks ,
484
- impersonatedUser ,
485
- cancelationToken
447
+ impersonatedUser
486
448
) {
487
449
const resolvedAddresses = await this . _resolveSeedRouter ( seedRouter )
488
450
489
- cancelationToken . throwIfCancellationRequested ( )
490
-
491
451
// filter out all addresses that we've already tried
492
452
const newAddresses = resolvedAddresses . filter (
493
453
address => seenRouters . indexOf ( address ) < 0
494
454
)
495
455
496
- return await this . _fetchRoutingTable ( newAddresses , routingTable , bookmarks , impersonatedUser , cancelationToken )
456
+ return await this . _fetchRoutingTable ( newAddresses , routingTable , bookmarks , impersonatedUser )
497
457
}
498
458
499
459
async _resolveSeedRouter ( seedRouter ) {
@@ -505,7 +465,7 @@ export default class RoutingConnectionProvider extends PooledConnectionProvider
505
465
return [ ] . concat . apply ( [ ] , dnsResolvedAddresses )
506
466
}
507
467
508
- async _fetchRoutingTable ( routerAddresses , routingTable , bookmarks , impersonatedUser , cancelationToken ) {
468
+ async _fetchRoutingTable ( routerAddresses , routingTable , bookmarks , impersonatedUser ) {
509
469
return routerAddresses . reduce (
510
470
async ( refreshedTablePromise , currentRouter , currentIndex ) => {
511
471
const [ newRoutingTable ] = await refreshedTablePromise
@@ -539,13 +499,11 @@ export default class RoutingConnectionProvider extends PooledConnectionProvider
539
499
impersonatedUser
540
500
) , null ]
541
501
} catch ( error ) {
542
- cancelationToken . throwIfCancellationRequested ( )
543
502
return this . _handleRediscoveryError ( error , currentRouter )
544
503
} finally {
545
- await session . close ( )
504
+ session . close ( )
546
505
}
547
506
} else {
548
- cancelationToken . throwIfCancellationRequested ( )
549
507
// unable to acquire connection and create session towards the current router
550
508
// return null to signal that the next router should be tried
551
509
return [ null , error ]
0 commit comments