Skip to content

Commit 397f2d8

Browse files
authored
feat: emit standardized block broker events (#995)
So callers do not kneed to know how a node is configured in advance, emit standardized events for block broker operations.
1 parent 11802dd commit 397f2d8

36 files changed

Lines changed: 517 additions & 154 deletions

benchmarks/transfer/package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@
3030
"it-drain": "^3.0.1",
3131
"kubo": "^0.38.0",
3232
"kubo-rpc-client": "^5.0.0",
33-
"libp2p": "^3.0.6",
33+
"libp2p": "^3.2.0",
3434
"multiformats": "^13.1.0",
3535
"pretty-bytes": "^7.1.0",
3636
"uint8arrays": "^5.1.0"

benchmarks/transports/package.json

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
"@ipld/dag-pb": "^4.1.0",
2121
"@libp2p/circuit-relay-v2": "^4.0.5",
2222
"@libp2p/identify": "^4.0.5",
23-
"@libp2p/interface": "^3.1.0",
23+
"@libp2p/interface": "^3.2.0",
2424
"@libp2p/logger": "^6.0.5",
2525
"@libp2p/tcp": "^11.0.5",
2626
"@libp2p/webrtc": "^6.0.6",
@@ -43,7 +43,7 @@
4343
"it-drain": "^3.0.7",
4444
"kubo": "^0.38.0",
4545
"kubo-rpc-client": "^5.0.0",
46-
"libp2p": "^3.0.6",
46+
"libp2p": "^3.2.0",
4747
"multiformats": "^13.1.0",
4848
"playwright-test": "^14.1.1",
4949
"pretty-bytes": "^7.1.0",

packages/bitswap/package.json

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@
5353
"dependencies": {
5454
"@helia/interface": "^6.1.1",
5555
"@helia/utils": "^2.4.2",
56-
"@libp2p/interface": "^3.1.0",
56+
"@libp2p/interface": "^3.2.0",
5757
"@libp2p/logger": "^6.0.5",
5858
"@libp2p/peer-collections": "^7.0.5",
5959
"@libp2p/utils": "^7.0.5",
@@ -78,7 +78,7 @@
7878
},
7979
"devDependencies": {
8080
"@libp2p/crypto": "^5.1.12",
81-
"@libp2p/peer-id": "^6.0.3",
81+
"@libp2p/peer-id": "^6.0.6",
8282
"@types/sinon": "^21.0.0",
8383
"aegir": "^47.0.22",
8484
"blockstore-core": "^6.1.1",

packages/bitswap/src/bitswap.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -96,7 +96,7 @@ export class Bitswap implements BitswapInterface {
9696
signal
9797
})
9898

99-
options.onProgress?.(new CustomProgressEvent<{ cid: CID, sender: PeerId }>('bitswap:block', { cid, sender: result.sender }))
99+
options.onProgress?.(new CustomProgressEvent<{ cid: CID, sender: PeerId }>('bitswap:block', { cid, sender: result.connection.remotePeer }))
100100

101101
return result.block
102102
} finally {

packages/bitswap/src/constants.ts

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,9 @@ export const BITSWAP_120 = '/ipfs/bitswap/1.2.0'
22
export const DEFAULT_MAX_SIZE_REPLACE_HAS_WITH_BLOCK = 1024
33
export const DEFAULT_MAX_INBOUND_STREAMS = 1024
44
export const DEFAULT_MAX_OUTBOUND_STREAMS = 1024
5-
export const DEFAULT_MESSAGE_RECEIVE_TIMEOUT = 5000
6-
export const DEFAULT_MESSAGE_SEND_DELAY = 10
7-
export const DEFAULT_MESSAGE_SEND_TIMEOUT = 5000
5+
export const DEFAULT_MESSAGE_RECEIVE_TIMEOUT = 10_000
6+
export const DEFAULT_WANTLIST_SEND_DEBOUNCE = 10
7+
export const DEFAULT_MESSAGE_SEND_TIMEOUT = 10_000
88
export const DEFAULT_MESSAGE_SEND_CONCURRENCY = 50
99
export const DEFAULT_RUN_ON_TRANSIENT_CONNECTIONS = false
1010
export const DEFAULT_SESSION_ROOT_PRIORITY = 1

packages/bitswap/src/index.ts

Lines changed: 29 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -9,12 +9,11 @@
99
import { Bitswap as BitswapClass } from './bitswap.ts'
1010
import type { BitswapNetworkNotifyProgressEvents, BitswapNetworkWantProgressEvents, BitswapNetworkProgressEvents } from './network.ts'
1111
import type { WantType } from './pb/message.ts'
12-
import type { CreateSessionOptions, ProviderOptions, SessionBlockBroker } from '@helia/interface'
12+
import type { BlockBrokerGetBlockProgressEvents, CreateSessionOptions, HasherLoader, ProviderOptions, SessionBlockBroker } from '@helia/interface'
1313
import type { Routing } from '@helia/interface/routing'
1414
import type { Libp2p, AbortOptions, Startable, ComponentLogger, Metrics, PeerId } from '@libp2p/interface'
1515
import type { Blockstore } from 'interface-blockstore'
1616
import type { CID } from 'multiformats/cid'
17-
import type { MultihashHasher } from 'multiformats/hashes/interface'
1817
import type { ProgressEvent, ProgressOptions } from 'progress-events'
1918

2019
export type BitswapWantProgressEvents =
@@ -27,7 +26,8 @@ export type BitswapWantBlockProgressEvents =
2726
ProgressEvent<'bitswap:unwant', CID> |
2827
ProgressEvent<'bitswap:want', CID> |
2928
ProgressEvent<'bitswap:block', { cid: CID, sender: PeerId }> |
30-
BitswapNetworkWantProgressEvents
29+
BitswapNetworkWantProgressEvents |
30+
BlockBrokerGetBlockProgressEvents
3131

3232
export type { BitswapNetworkNotifyProgressEvents }
3333
export type { BitswapNetworkWantProgressEvents }
@@ -75,15 +75,12 @@ export interface Bitswap extends Startable {
7575
createSession(options?: CreateSessionOptions<BitswapWantProgressEvents>): Required<Pick<SessionBlockBroker<BitswapWantProgressEvents>, 'retrieve' | 'addPeer'>>
7676
}
7777

78-
export interface MultihashHasherLoader {
79-
getHasher(codeOrName: number | string): Promise<MultihashHasher>
80-
}
81-
8278
export interface BitswapComponents {
8379
routing: Routing
8480
blockstore: Blockstore
8581
logger: ComponentLogger
8682
libp2p: Libp2p
83+
getHasher: HasherLoader
8784
metrics?: Metrics
8885
}
8986

@@ -118,11 +115,6 @@ export interface BitswapOptions {
118115
*/
119116
runOnLimitedConnections?: boolean
120117

121-
/**
122-
* Enables loading esoteric hash functions
123-
*/
124-
hashLoader?: MultihashHasherLoader
125-
126118
/**
127119
* The protocol that we speak
128120
*
@@ -137,6 +129,21 @@ export interface BitswapOptions {
137129
*/
138130
messageSendConcurrency?: number
139131

132+
/**
133+
* Wantlist or blocks must be sent to peers within this many milliseconds
134+
*
135+
* @default 10_000
136+
*/
137+
messageSendTimeout?: number
138+
139+
/**
140+
* Incoming wantlist or block messages must be received from peers within this
141+
* many milliseconds
142+
*
143+
* @default 10_000
144+
*/
145+
messageReceiveTimeout?: number
146+
140147
/**
141148
* When sending blocks to peers, how many messages to send at once
142149
*
@@ -149,7 +156,7 @@ export interface BitswapOptions {
149156
* This is useful for preventing slow/large peer-connections from consuming
150157
* your bandwidth/streams.
151158
*
152-
* @default 10000
159+
* @default 10_000
153160
*/
154161
sendBlocksTimeout?: number
155162

@@ -162,6 +169,15 @@ export interface BitswapOptions {
162169
*/
163170
sendBlocksDebounce?: number
164171

172+
/**
173+
* When a want is added to the local wantlist, wait this many milliseconds
174+
* before sending the wantlist to connected or session peers in case more
175+
* wants are about to be added
176+
*
177+
* @default 10
178+
*/
179+
sendWantlistDebounce?: number
180+
165181
/**
166182
* If the client sends a want-have, and we have the corresponding block, we
167183
* check the size of the block and if it's small enough we send the block

packages/bitswap/src/network.ts

Lines changed: 25 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -5,22 +5,22 @@ import * as lp from 'it-length-prefixed'
55
import map from 'it-map'
66
import { pushable } from 'it-pushable'
77
import take from 'it-take'
8+
import { CID } from 'multiformats/cid'
89
import { CustomProgressEvent } from 'progress-events'
910
import { raceEvent } from 'race-event'
10-
import { BITSWAP_120, DEFAULT_MAX_INBOUND_STREAMS, DEFAULT_MAX_INCOMING_MESSAGE_SIZE, DEFAULT_MAX_OUTBOUND_STREAMS, DEFAULT_MAX_OUTGOING_MESSAGE_SIZE, DEFAULT_MAX_PROVIDERS_PER_REQUEST, DEFAULT_MESSAGE_RECEIVE_TIMEOUT, DEFAULT_MESSAGE_SEND_CONCURRENCY, DEFAULT_RUN_ON_TRANSIENT_CONNECTIONS } from './constants.ts'
11+
import { BITSWAP_120, DEFAULT_MAX_INBOUND_STREAMS, DEFAULT_MAX_INCOMING_MESSAGE_SIZE, DEFAULT_MAX_OUTBOUND_STREAMS, DEFAULT_MAX_OUTGOING_MESSAGE_SIZE, DEFAULT_MAX_PROVIDERS_PER_REQUEST, DEFAULT_MESSAGE_RECEIVE_TIMEOUT, DEFAULT_MESSAGE_SEND_CONCURRENCY, DEFAULT_MESSAGE_SEND_TIMEOUT, DEFAULT_RUN_ON_TRANSIENT_CONNECTIONS } from './constants.ts'
1112
import { BitswapMessage } from './pb/message.ts'
1213
import { mergeMessages } from './utils/merge-messages.ts'
1314
import { splitMessage } from './utils/split-message.ts'
1415
import type { WantOptions } from './bitswap.ts'
15-
import type { MultihashHasherLoader } from './index.ts'
1616
import type { Block } from './pb/message.ts'
1717
import type { QueuedBitswapMessage } from './utils/bitswap-message.ts'
18+
import type { BlockBrokerGetBlockProgressEvents } from '@helia/interface'
1819
import type { Provider, Routing, RoutingFindProvidersProgressEvents } from '@helia/interface/routing'
19-
import type { Libp2p, AbortOptions, Connection, PeerId, Topology, ComponentLogger, IdentifyResult, Counter, Metrics, Stream } from '@libp2p/interface'
20+
import type { Libp2p, AbortOptions, Connection, PeerId, Topology, ComponentLogger, IdentifyResult, Counter, Metrics, Stream, OpenConnectionProgressEvents, NewStreamProgressEvents } from '@libp2p/interface'
2021
import type { Logger } from '@libp2p/logger'
2122
import type { PeerQueueJobOptions } from '@libp2p/utils'
2223
import type { Multiaddr } from '@multiformats/multiaddr'
23-
import type { CID } from 'multiformats/cid'
2424
import type { ProgressEvent, ProgressOptions } from 'progress-events'
2525
import type { Uint8ArrayList } from 'uint8arraylist'
2626

@@ -47,25 +47,28 @@ export interface BitswapProvider {
4747
}
4848

4949
export type BitswapNetworkProgressEvents =
50-
ProgressEvent<'bitswap:dial', PeerId | Multiaddr | Multiaddr[]>
50+
ProgressEvent<'bitswap:dial', PeerId | Multiaddr | Multiaddr[]> |
51+
OpenConnectionProgressEvents |
52+
NewStreamProgressEvents
5153

5254
export type BitswapNetworkWantProgressEvents =
5355
ProgressEvent<'bitswap:send-wantlist', PeerId> |
5456
ProgressEvent<'bitswap:send-wantlist:error', { peer: PeerId, error: Error }> |
5557
ProgressEvent<'bitswap:find-providers', CID> |
5658
ProgressEvent<'bitswap:found-provider', BitswapProvider> |
5759
BitswapNetworkProgressEvents |
58-
RoutingFindProvidersProgressEvents
60+
RoutingFindProvidersProgressEvents |
61+
BlockBrokerGetBlockProgressEvents
5962

6063
export type BitswapNetworkNotifyProgressEvents =
6164
BitswapNetworkProgressEvents |
6265
ProgressEvent<'bitswap:send-block', PeerId>
6366

6467
export interface NetworkInit {
65-
hashLoader?: MultihashHasherLoader
6668
maxInboundStreams?: number
6769
maxOutboundStreams?: number
6870
messageReceiveTimeout?: number
71+
messageSendTimeout?: number
6972
messageSendConcurrency?: number
7073
protocols?: string[]
7174
runOnLimitedConnections?: boolean
@@ -81,12 +84,16 @@ export interface NetworkComponents {
8184
}
8285

8386
export interface BitswapMessageEventDetail {
87+
/**
88+
* @deprecated access the peer via connection.remotePeer instead
89+
*/
8490
peer: PeerId
8591
message: BitswapMessage
92+
connection: Connection
8693
}
8794

8895
export interface NetworkEvents {
89-
'bitswap:message': CustomEvent<{ peer: PeerId, message: BitswapMessage }>
96+
'bitswap:message': CustomEvent<BitswapMessageEventDetail>
9097
'peer:connected': CustomEvent<PeerId>
9198
'peer:disconnected': CustomEvent<PeerId>
9299
}
@@ -104,6 +111,7 @@ export class Network extends TypedEventEmitter<NetworkEvents> {
104111
private readonly maxInboundStreams: number
105112
private readonly maxOutboundStreams: number
106113
private readonly messageReceiveTimeout: number
114+
private readonly messageSendTimeout: number
107115
private registrarIds: string[]
108116
private readonly metrics: { blocksSent?: Counter, dataSent?: Counter }
109117
private readonly sendQueue: PeerQueue<void, SendMessageJobOptions>
@@ -126,6 +134,7 @@ export class Network extends TypedEventEmitter<NetworkEvents> {
126134
this.maxInboundStreams = init.maxInboundStreams ?? DEFAULT_MAX_INBOUND_STREAMS
127135
this.maxOutboundStreams = init.maxOutboundStreams ?? DEFAULT_MAX_OUTBOUND_STREAMS
128136
this.messageReceiveTimeout = init.messageReceiveTimeout ?? DEFAULT_MESSAGE_RECEIVE_TIMEOUT
137+
this.messageSendTimeout = init.messageSendTimeout ?? DEFAULT_MESSAGE_SEND_TIMEOUT
129138
this.runOnLimitedConnections = init.runOnLimitedConnections ?? DEFAULT_RUN_ON_TRANSIENT_CONNECTIONS
130139
this.maxIncomingMessageSize = init.maxIncomingMessageSize ?? DEFAULT_MAX_OUTGOING_MESSAGE_SIZE
131140
this.maxOutgoingMessageSize = init.maxOutgoingMessageSize ?? init.maxIncomingMessageSize ?? DEFAULT_MAX_INCOMING_MESSAGE_SIZE
@@ -245,10 +254,11 @@ export class Network extends TypedEventEmitter<NetworkEvents> {
245254
const message = BitswapMessage.decode(data)
246255
this.log('incoming new bitswap %s message from %p on stream', stream.protocol, connection.remotePeer, stream.id)
247256

248-
this.safeDispatchEvent('bitswap:message', {
257+
this.safeDispatchEvent<BitswapMessageEventDetail>('bitswap:message', {
249258
detail: {
250259
peer: connection.remotePeer,
251-
message
260+
message,
261+
connection
252262
}
253263
})
254264

@@ -287,7 +297,7 @@ export class Network extends TypedEventEmitter<NetworkEvents> {
287297
continue
288298
}
289299

290-
options?.onProgress?.(new CustomProgressEvent('bitswap:found-provider', {
300+
options?.onProgress?.(new CustomProgressEvent<BitswapProvider>('bitswap:found-provider', {
291301
type: 'bitswap',
292302
cid,
293303
provider,
@@ -325,8 +335,7 @@ export class Network extends TypedEventEmitter<NetworkEvents> {
325335
}
326336

327337
/**
328-
* Connect to the given peer
329-
* Send the given msg (instance of Message) to the given peer
338+
* Connect to the specified peer and send the given message
330339
*/
331340
async sendMessage (peerId: PeerId, message: QueuedBitswapMessage, options?: AbortOptions & ProgressOptions<BitswapNetworkWantProgressEvents>): Promise<void> {
332341
if (!this.running) {
@@ -359,7 +368,7 @@ export class Network extends TypedEventEmitter<NetworkEvents> {
359368
options?.onProgress?.(new CustomProgressEvent<PeerId>('bitswap:network:send-wantlist', peerId))
360369

361370
const stream = await this.libp2p.dialProtocol(peerId, BITSWAP_120, options)
362-
await stream.closeRead()
371+
await stream.closeRead(options)
363372

364373
try {
365374
for (const buf of splitMessage(message, this.maxOutgoingMessageSize)) {
@@ -377,8 +386,9 @@ export class Network extends TypedEventEmitter<NetworkEvents> {
377386

378387
this._updateSentStats(message.blocks)
379388
}, {
389+
onProgress: options?.onProgress,
380390
peerId,
381-
signal: options?.signal,
391+
signal: options?.signal ?? AbortSignal.timeout(this.messageSendTimeout),
382392
message
383393
})
384394
}

packages/bitswap/src/peer-want-lists/index.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -56,9 +56,9 @@ export class PeerWantLists {
5656
})
5757

5858
this.network.addEventListener('bitswap:message', (evt) => {
59-
this.receiveMessage(evt.detail.peer, evt.detail.message)
59+
this.receiveMessage(evt.detail.connection.remotePeer, evt.detail.message)
6060
.catch(err => {
61-
this.log.error('error receiving bitswap message from %p - %e', evt.detail.peer, err)
61+
this.log.error('error receiving bitswap message from %p - %e', evt.detail.connection.remotePeer, err)
6262
})
6363
})
6464
this.network.addEventListener('peer:disconnected', evt => {

0 commit comments

Comments
 (0)