@@ -97,6 +97,8 @@ export class RoutingTable extends TypedEventEmitter<RoutingTableEvents> implemen
97
97
kadBucketEvents : CounterGroup < 'ping_old_contact' | 'ping_old_contact_error' | 'ping_new_contact' | 'ping_new_contact_error' | 'peer_added' | 'peer_removed' >
98
98
}
99
99
100
+ private shutdownController : AbortController
101
+
100
102
constructor ( components : RoutingTableComponents , init : RoutingTableInit ) {
101
103
super ( )
102
104
@@ -114,6 +116,7 @@ export class RoutingTable extends TypedEventEmitter<RoutingTableEvents> implemen
114
116
this . peerRemoved = this . peerRemoved . bind ( this )
115
117
this . populateFromDatastoreOnStart = init . populateFromDatastoreOnStart ?? POPULATE_FROM_DATASTORE_ON_START
116
118
this . populateFromDatastoreLimit = init . populateFromDatastoreLimit ?? POPULATE_FROM_DATASTORE_LIMIT
119
+ this . shutdownController = new AbortController ( )
117
120
118
121
this . pingOldContactQueue = new PeerQueue ( {
119
122
concurrency : init . pingOldContactConcurrency ?? PING_OLD_CONTACT_CONCURRENCY ,
@@ -185,49 +188,62 @@ export class RoutingTable extends TypedEventEmitter<RoutingTableEvents> implemen
185
188
186
189
this . running = true
187
190
191
+ this . shutdownController = new AbortController ( )
188
192
await start ( this . closestPeerTagger , this . kb )
189
- await this . kb . addSelfPeer ( this . components . peerId )
190
193
}
191
194
192
195
async afterStart ( ) : Promise < void > {
196
+ let peerStorePeers = 0
197
+
193
198
// do this async to not block startup but iterate serially to not overwhelm
194
199
// the ping queue
195
200
Promise . resolve ( ) . then ( async ( ) => {
196
201
if ( ! this . populateFromDatastoreOnStart ) {
197
202
return
198
203
}
199
204
200
- let peerStorePeers = 0
201
-
202
- // add existing peers from the peer store to routing table
203
- for ( const peer of await this . components . peerStore . all ( {
204
- filters : [ ( peer ) => {
205
- return peer . protocols . includes ( this . protocol ) && peer . tags . has ( KAD_PEER_TAG_NAME )
206
- } ] ,
207
- limit : this . populateFromDatastoreLimit
208
- } ) ) {
209
- if ( ! this . running ) {
210
- // bail if we've been shut down
211
- return
212
- }
205
+ const signal = anySignal ( [
206
+ this . shutdownController . signal ,
207
+ AbortSignal . timeout ( 20_000 )
208
+ ] )
209
+ setMaxListeners ( Infinity , signal )
210
+
211
+ try {
212
+ // add existing peers from the peer store to routing table
213
+ for ( const peer of await this . components . peerStore . all ( {
214
+ filters : [ ( peer ) => {
215
+ return peer . protocols . includes ( this . protocol ) && peer . tags . has ( KAD_PEER_TAG_NAME )
216
+ } ] ,
217
+ limit : this . populateFromDatastoreLimit ,
218
+ signal
219
+ } ) ) {
220
+ if ( ! this . running ) {
221
+ // bail if we've been shut down
222
+ return
223
+ }
213
224
214
- try {
215
- await this . add ( peer . id )
216
- peerStorePeers ++
217
- } catch ( err ) {
218
- this . log ( 'failed to add peer %p to routing table, removing kad-dht peer tags - %e' )
219
- await this . components . peerStore . merge ( peer . id , {
220
- tags : {
221
- [ this . peerTagName ] : undefined
222
- }
223
- } )
225
+ try {
226
+ await this . add ( peer . id , {
227
+ signal
228
+ } )
229
+ peerStorePeers ++
230
+ } catch ( err ) {
231
+ this . log ( 'failed to add peer %p to routing table, removing kad-dht peer tags - %e' )
232
+ await this . components . peerStore . merge ( peer . id , {
233
+ tags : {
234
+ [ this . peerTagName ] : undefined
235
+ }
236
+ } )
237
+ }
224
238
}
239
+ } finally {
240
+ signal . clear ( )
225
241
}
226
242
227
243
this . log ( 'added %d peer store peers to the routing table' , peerStorePeers )
228
244
} )
229
245
. catch ( err => {
230
- this . log . error ( 'error adding peer store peers to the routing table %e' , err )
246
+ this . log . error ( 'error adding %d, peer store peers to the routing table - %e' , peerStorePeers , err )
231
247
} )
232
248
}
233
249
@@ -236,6 +252,7 @@ export class RoutingTable extends TypedEventEmitter<RoutingTableEvents> implemen
236
252
await stop ( this . closestPeerTagger , this . kb )
237
253
this . pingOldContactQueue . abort ( )
238
254
this . pingNewContactQueue . abort ( )
255
+ this . shutdownController . abort ( )
239
256
}
240
257
241
258
private async peerAdded ( peer : Peer , bucket : LeafBucket , options ?: AbortOptions ) : Promise < void > {
@@ -310,7 +327,11 @@ export class RoutingTable extends TypedEventEmitter<RoutingTableEvents> implemen
310
327
311
328
const result = await this . pingOldContactQueue . add ( async ( options ) => {
312
329
const signal = this . pingOldContactTimeout . getTimeoutSignal ( )
313
- const signals = anySignal ( [ signal , options ?. signal ] )
330
+ const signals = anySignal ( [
331
+ signal ,
332
+ this . shutdownController . signal ,
333
+ options ?. signal
334
+ ] )
314
335
setMaxListeners ( Infinity , signal , signals )
315
336
316
337
try {
@@ -342,7 +363,11 @@ export class RoutingTable extends TypedEventEmitter<RoutingTableEvents> implemen
342
363
343
364
async verifyNewContact ( contact : Peer , options ?: AbortOptions ) : Promise < boolean > {
344
365
const signal = this . pingNewContactTimeout . getTimeoutSignal ( )
345
- const signals = anySignal ( [ signal , options ?. signal ] )
366
+ const signals = anySignal ( [
367
+ signal ,
368
+ this . shutdownController . signal ,
369
+ options ?. signal
370
+ ] )
346
371
setMaxListeners ( Infinity , signal , signals )
347
372
348
373
try {
0 commit comments