Skip to content

Commit 8499ef4

Browse files
authored
fix: remove provider lock (#3169)
This creates resource contention and is not necessary.
1 parent 6a3ae02 commit 8499ef4

File tree

8 files changed

+46
-75
lines changed

8 files changed

+46
-75
lines changed

packages/kad-dht/package.json

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,6 @@
6868
"it-protobuf-stream": "^2.0.2",
6969
"it-pushable": "^3.2.3",
7070
"it-take": "^3.0.8",
71-
"mortice": "^3.0.6",
7271
"multiformats": "^13.3.6",
7372
"p-defer": "^4.0.1",
7473
"p-event": "^6.0.1",

packages/kad-dht/src/kad-dht.ts

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
import { NotFoundError, TypedEventEmitter, contentRoutingSymbol, peerDiscoverySymbol, peerRoutingSymbol, serviceCapabilities, serviceDependencies, setMaxListeners, start, stop } from '@libp2p/interface'
22
import drain from 'it-drain'
3-
import createMortice from 'mortice'
43
import pDefer from 'p-defer'
54
import { ALPHA, ON_PEER_CONNECT_TIMEOUT, PROTOCOL } from './constants.js'
65
import { ContentFetching } from './content-fetching/index.js'
@@ -175,13 +174,10 @@ export class KadDHT extends TypedEventEmitter<PeerDiscoveryEvents> implements Ka
175174
this.peerInfoMapper = init.peerInfoMapper ?? removePrivateAddressesMapper
176175
this.onPeerConnectTimeout = init.onPeerConnectTimeout ?? ON_PEER_CONNECT_TIMEOUT
177176

178-
const providerLock = createMortice()
179-
180177
this.providers = new Providers(components, {
181178
...init.providers,
182179
logPrefix,
183-
datastorePrefix,
184-
lock: providerLock
180+
datastorePrefix
185181
})
186182

187183
this.validators = {
@@ -292,7 +288,6 @@ export class KadDHT extends TypedEventEmitter<PeerDiscoveryEvents> implements Ka
292288
metricsPrefix,
293289
datastorePrefix,
294290
contentRouting: this.contentRouting,
295-
lock: providerLock,
296291
operationMetrics
297292
})
298293

packages/kad-dht/src/providers.ts

Lines changed: 9 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -3,13 +3,11 @@ import * as varint from 'uint8-varint'
33
import { parseProviderKey, readProviderTime, toProviderKey } from './utils.js'
44
import type { AbortOptions, ComponentLogger, Logger, Metrics, PeerId } from '@libp2p/interface'
55
import type { Datastore } from 'interface-datastore'
6-
import type { Mortice } from 'mortice'
76
import type { CID } from 'multiformats'
87

98
export interface ProvidersInit {
109
logPrefix: string
1110
datastorePrefix: string
12-
lock: Mortice
1311
}
1412

1513
export interface ProvidersComponents {
@@ -29,59 +27,39 @@ export class Providers {
2927
private readonly log: Logger
3028
private readonly datastore: Datastore
3129
private readonly datastorePrefix: string
32-
private readonly lock: Mortice
3330

3431
constructor (components: ProvidersComponents, init: ProvidersInit) {
3532
this.log = components.logger.forComponent(`${init.logPrefix}:providers`)
3633
this.datastorePrefix = `${init.datastorePrefix}/provider`
3734
this.datastore = components.datastore
38-
this.lock = init.lock
3935
}
4036

4137
/**
4238
* Add a new provider for the given CID
4339
*/
4440
async addProvider (cid: CID, provider: PeerId, options?: AbortOptions): Promise<void> {
45-
const release = await this.lock.readLock(options)
46-
47-
try {
48-
this.log.trace('%p provides %s', provider, cid)
49-
await this.writeProviderEntry(cid, provider, options)
50-
} finally {
51-
release()
52-
}
41+
this.log.trace('%p provides %s', provider, cid)
42+
await this.writeProviderEntry(cid, provider, options)
5343
}
5444

5545
/**
5646
* Remove a provider for the given CID
5747
*/
5848
async removeProvider (cid: CID, provider: PeerId, options?: AbortOptions): Promise<void> {
59-
const release = await this.lock.writeLock(options)
60-
61-
try {
62-
const key = toProviderKey(this.datastorePrefix, cid, provider)
63-
this.log.trace('%p no longer provides %s', provider, cid)
64-
await this.datastore.delete(key)
65-
} finally {
66-
release()
67-
}
49+
const key = toProviderKey(this.datastorePrefix, cid, provider)
50+
this.log.trace('%p no longer provides %s', provider, cid)
51+
await this.datastore.delete(key, options)
6852
}
6953

7054
/**
7155
* Get a list of providers for the given CID
7256
*/
7357
async getProviders (cid: CID, options?: AbortOptions): Promise<PeerId[]> {
74-
const release = await this.lock.readLock(options)
75-
76-
try {
77-
this.log.trace('get providers for %c', cid)
78-
const provs = await this.loadProviders(cid, options)
79-
this.log.trace('got %d providers for %c', provs.size, cid)
58+
this.log.trace('get providers for %c', cid)
59+
const provs = await this.loadProviders(cid, options)
60+
this.log.trace('got %d providers for %c', provs.size, cid)
8061

81-
return [...provs.keys()]
82-
} finally {
83-
release()
84-
}
62+
return [...provs.keys()]
8563
}
8664

8765
/**

packages/kad-dht/src/reprovider.ts

Lines changed: 2 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@ import type { AbortOptions, ComponentLogger, Logger, Metrics, PeerId } from '@li
1010
import type { AddressManager } from '@libp2p/interface-internal'
1111
import type { AdaptiveTimeoutInit } from '@libp2p/utils/adaptive-timeout'
1212
import type { Datastore } from 'interface-datastore'
13-
import type { Mortice } from 'mortice'
1413
import type { CID } from 'multiformats/cid'
1514

1615
export interface ReproviderComponents {
@@ -26,7 +25,6 @@ export interface ReproviderInit {
2625
metricsPrefix: string
2726
datastorePrefix: string
2827
contentRouting: ContentRouting
29-
lock: Mortice
3028
operationMetrics: OperationMetrics
3129
concurrency?: number
3230
maxQueueSize?: number
@@ -60,7 +58,6 @@ export class Reprovider extends TypedEventEmitter<ReprovideEvents> {
6058
private readonly addressManager: AddressManager
6159
private readonly validity: number
6260
private readonly interval: number
63-
private readonly lock: Mortice
6461
private readonly peerId: PeerId
6562

6663
constructor (components: ReproviderComponents, init: ReproviderInit) {
@@ -86,7 +83,6 @@ export class Reprovider extends TypedEventEmitter<ReprovideEvents> {
8683
this.validity = init.validity ?? PROVIDERS_VALIDITY
8784
this.interval = init.interval ?? REPROVIDE_INTERVAL
8885
this.contentRouting = init.contentRouting
89-
this.lock = init.lock
9086
this.running = false
9187

9288
this.reprovide = timeOperationMethod(this.reprovide.bind(this), init.operationMetrics, 'PROVIDE')
@@ -123,15 +119,13 @@ export class Reprovider extends TypedEventEmitter<ReprovideEvents> {
123119
* them if the provider is us and the expiry is within the reprovide window.
124120
*/
125121
private async cleanUp (options?: AbortOptions): Promise<void> {
126-
const release = await this.lock.writeLock(options)
127-
128122
try {
129123
this.safeDispatchEvent('reprovide:start')
130124

131125
// Get all provider entries from the datastore
132126
for await (const entry of this.datastore.query({
133127
prefix: this.datastorePrefix
134-
})) {
128+
}, options)) {
135129
try {
136130
// Add a delete to the batch for each expired entry
137131
const { cid, peerId } = parseProviderKey(entry.key)
@@ -144,7 +138,7 @@ export class Reprovider extends TypedEventEmitter<ReprovideEvents> {
144138

145139
// delete the record if it has expired
146140
if (expired) {
147-
await this.datastore.delete(entry.key)
141+
await this.datastore.delete(entry.key, options)
148142
}
149143

150144
// if the provider is us and we are within the reprovide threshold,
@@ -162,7 +156,6 @@ export class Reprovider extends TypedEventEmitter<ReprovideEvents> {
162156

163157
this.log('reprovide/cleanup successful')
164158
} finally {
165-
release()
166159
this.safeDispatchEvent('reprovide:end')
167160

168161
if (this.running) {

packages/kad-dht/src/rpc/index.ts

Lines changed: 28 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ import type { GetProvidersHandlerComponents } from './handlers/get-providers.js'
1515
import type { GetValueHandlerComponents } from './handlers/get-value.js'
1616
import type { PutValueHandlerComponents } from './handlers/put-value.js'
1717
import type { RoutingTable } from '../routing-table/index.js'
18-
import type { CounterGroup, Logger, Metrics, PeerId, IncomingStreamData } from '@libp2p/interface'
18+
import type { CounterGroup, Logger, Metrics, PeerId, IncomingStreamData, MetricGroup } from '@libp2p/interface'
1919

2020
export interface DHTMessageHandler {
2121
handle(peerId: PeerId, msg: Message): Promise<Message | undefined>
@@ -43,14 +43,16 @@ export class RPC {
4343
private readonly metrics: {
4444
operations?: CounterGroup
4545
errors?: CounterGroup
46+
rpcTime?: MetricGroup
4647
}
4748

4849
private readonly incomingMessageTimeout: number
4950

5051
constructor (components: RPCComponents, init: RPCInit) {
5152
this.metrics = {
5253
operations: components.metrics?.registerCounterGroup(`${init.metricsPrefix}_inbound_rpc_requests_total`),
53-
errors: components.metrics?.registerCounterGroup(`${init.metricsPrefix}_inbound_rpc_errors_total`)
54+
errors: components.metrics?.registerCounterGroup(`${init.metricsPrefix}_inbound_rpc_errors_total`),
55+
rpcTime: components.metrics?.registerMetricGroup(`${init.metricsPrefix}_inbound_rpc_time_seconds`, { label: 'operation' })
5456
}
5557

5658
this.log = components.logger.forComponent(`${init.logPrefix}:rpc`)
@@ -114,15 +116,30 @@ export class RPC {
114116
signal
115117
})
116118

117-
// handle the message
118-
this.log('incoming %s from %p', message.type, connection.remotePeer)
119-
const res = await this.handleMessage(connection.remotePeer, message)
120-
121-
// Not all handlers will return a response
122-
if (res != null) {
123-
await messages.write(res, {
124-
signal
125-
})
119+
const stopSuccessTimer = this.metrics?.rpcTime?.timer(message.type.toString())
120+
const stopErrorTimer = this.metrics?.rpcTime?.timer(message.type.toString())
121+
let errored = false
122+
123+
try {
124+
// handle the message
125+
this.log('incoming %s from %p', message.type, connection.remotePeer)
126+
const res = await this.handleMessage(connection.remotePeer, message)
127+
128+
// Not all handlers will return a response
129+
if (res != null) {
130+
await messages.write(res, {
131+
signal
132+
})
133+
}
134+
} catch (err) {
135+
errored = true
136+
stopErrorTimer?.()
137+
138+
throw err
139+
} finally {
140+
if (!errored) {
141+
stopSuccessTimer?.()
142+
}
126143
}
127144

128145
// we have received a message so reset the timeout controller to

packages/kad-dht/test/providers.spec.ts

Lines changed: 4 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@ import { defaultLogger } from '@libp2p/logger'
44
import { peerIdFromString } from '@libp2p/peer-id'
55
import { expect } from 'aegir/chai'
66
import { MemoryDatastore } from 'datastore-core/memory'
7-
import createMortice from 'mortice'
87
import { CID } from 'multiformats/cid'
98
import { Providers } from '../src/providers.js'
109
import { createPeerIdsWithPrivateKey } from './utils/create-peer-id.js'
@@ -24,8 +23,7 @@ describe('providers', () => {
2423
logger: defaultLogger()
2524
}, {
2625
logPrefix: '',
27-
datastorePrefix: '/dht',
28-
lock: createMortice()
26+
datastorePrefix: '/dht'
2927
})
3028

3129
const cid = CID.parse('QmdfTbBqBPQ7VNxZEYEj14VmRuZBkqFbiwReogJgS1zR1n')
@@ -46,8 +44,7 @@ describe('providers', () => {
4644
logger: defaultLogger()
4745
}, {
4846
logPrefix: '',
49-
datastorePrefix: '/dht',
50-
lock: createMortice()
47+
datastorePrefix: '/dht'
5148
})
5249

5350
const cid = CID.parse('QmdfTbBqBPQ7VNxZEYEj14VmRuZBkqFbiwReogJgS1zR1n')
@@ -72,8 +69,7 @@ describe('providers', () => {
7269
logger: defaultLogger()
7370
}, {
7471
logPrefix: '',
75-
datastorePrefix: '/dht',
76-
lock: createMortice()
72+
datastorePrefix: '/dht'
7773
})
7874

7975
const cid = CID.parse('QmdfTbBqBPQ7VNxZEYEj14VmRuZBkqFbiwReogJgS1zR1n')
@@ -101,8 +97,7 @@ describe('providers', () => {
10197
logger: defaultLogger()
10298
}, {
10399
logPrefix: '',
104-
datastorePrefix: '/dht',
105-
lock: createMortice()
100+
datastorePrefix: '/dht'
106101
})
107102

108103
const cid = CID.parse('QmZ8eiDPqQqWR17EPxiwCDgrKPVhCHLcyn6xSCNpFAdAZb')

packages/kad-dht/test/reprovider.spec.ts

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@ import { defaultLogger } from '@libp2p/logger'
33
import { expect } from 'aegir/chai'
44
import { MemoryDatastore } from 'datastore-core'
55
import delay from 'delay'
6-
import createMortice from 'mortice'
76
import { CID } from 'multiformats/cid'
87
import { pEvent } from 'p-event'
98
import { stubInterface } from 'sinon-ts'
@@ -45,22 +44,19 @@ describe('reprovider', () => {
4544
contentRouting = stubInterface()
4645
contentRouting.provide.resolves([])
4746

48-
const lock = createMortice()
4947
const logPrefix = 'libp2p'
5048
const datastorePrefix = '/dht'
5149
const metricsPrefix = ''
5250

5351
providers = new Providers(components, {
5452
logPrefix,
55-
datastorePrefix,
56-
lock
53+
datastorePrefix
5754
})
5855

5956
reprovider = new Reprovider(components, {
6057
logPrefix,
6158
datastorePrefix,
6259
metricsPrefix,
63-
lock,
6460
contentRouting,
6561
threshold: 100,
6662
validity: 200,

packages/kad-dht/test/rpc/handlers/add-provider.spec.ts

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@ import { persistentPeerStore } from '@libp2p/peer-store'
66
import { multiaddr } from '@multiformats/multiaddr'
77
import { expect } from 'aegir/chai'
88
import { MemoryDatastore } from 'datastore-core'
9-
import createMortice from 'mortice'
109
import { fromString as uint8ArrayFromString } from 'uint8arrays/from-string'
1110
import { MessageType } from '../../../src/message/dht.js'
1211
import { Providers } from '../../../src/providers.js'
@@ -47,8 +46,7 @@ describe('rpc - handlers - AddProvider', () => {
4746
logger: defaultLogger()
4847
}, {
4948
logPrefix: '',
50-
datastorePrefix: '/dht',
51-
lock: createMortice()
49+
datastorePrefix: '/dht'
5250
})
5351

5452
handler = new AddProviderHandler({

0 commit comments

Comments
 (0)