Skip to content

Commit 17523e8

Browse files
committed
feat: emit standardized block broker events
So callers do not kneed to know how a node is configured in advance, emit standardized events for block broker operations.
1 parent 5ee3283 commit 17523e8

17 files changed

Lines changed: 555 additions & 107 deletions

File tree

packages/bitswap/src/bitswap.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ import { createBitswapSession } from './session.ts'
77
import { Stats } from './stats.ts'
88
import { WantList } from './want-list.ts'
99
import type { BitswapOptions, Bitswap as BitswapInterface, BitswapWantProgressEvents, BitswapNotifyProgressEvents, WantListEntry, BitswapComponents, PeerWantListEntry } from './index.ts'
10-
import type { CreateSessionOptions, ProviderOptions, SessionBlockBroker } from '@helia/interface'
10+
import type { BlockBrokerReceiveBlockProgressEvent, CreateSessionOptions, ProviderOptions, SessionBlockBroker } from '@helia/interface'
1111
import type { ComponentLogger, Libp2p, PeerId, AbortOptions } from '@libp2p/interface'
1212
import type { Logger } from '@libp2p/logger'
1313
import type { Blockstore } from 'interface-blockstore'
@@ -96,7 +96,7 @@ export class Bitswap implements BitswapInterface {
9696
signal
9797
})
9898

99-
options.onProgress?.(new CustomProgressEvent<{ cid: CID, sender: PeerId }>('bitswap:block', { cid, sender: result.sender }))
99+
options.onProgress?.(new CustomProgressEvent<{ cid: CID, sender: PeerId }>('bitswap:block', { cid, sender: result.connection.remotePeer }))
100100

101101
return result.block
102102
} finally {

packages/bitswap/src/constants.ts

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,9 @@ export const BITSWAP_120 = '/ipfs/bitswap/1.2.0'
22
export const DEFAULT_MAX_SIZE_REPLACE_HAS_WITH_BLOCK = 1024
33
export const DEFAULT_MAX_INBOUND_STREAMS = 1024
44
export const DEFAULT_MAX_OUTBOUND_STREAMS = 1024
5-
export const DEFAULT_MESSAGE_RECEIVE_TIMEOUT = 5000
6-
export const DEFAULT_MESSAGE_SEND_DELAY = 10
7-
export const DEFAULT_MESSAGE_SEND_TIMEOUT = 5000
5+
export const DEFAULT_MESSAGE_RECEIVE_TIMEOUT = 10_000
6+
export const DEFAULT_WANTLIST_SEND_DEBOUNCE = 10
7+
export const DEFAULT_MESSAGE_SEND_TIMEOUT = 10_000
88
export const DEFAULT_MESSAGE_SEND_CONCURRENCY = 50
99
export const DEFAULT_RUN_ON_TRANSIENT_CONNECTIONS = false
1010
export const DEFAULT_SESSION_ROOT_PRIORITY = 1

packages/bitswap/src/index.ts

Lines changed: 30 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99
import { Bitswap as BitswapClass } from './bitswap.ts'
1010
import type { BitswapNetworkNotifyProgressEvents, BitswapNetworkWantProgressEvents, BitswapNetworkProgressEvents } from './network.ts'
1111
import type { WantType } from './pb/message.ts'
12-
import type { CreateSessionOptions, ProviderOptions, SessionBlockBroker } from '@helia/interface'
12+
import type { Await, BlockBrokerGetBlockProgressEvents, CreateSessionOptions, ProviderOptions, SessionBlockBroker } from '@helia/interface'
1313
import type { Routing } from '@helia/interface/routing'
1414
import type { Libp2p, AbortOptions, Startable, ComponentLogger, Metrics, PeerId } from '@libp2p/interface'
1515
import type { Blockstore } from 'interface-blockstore'
@@ -27,7 +27,8 @@ export type BitswapWantBlockProgressEvents =
2727
ProgressEvent<'bitswap:unwant', CID> |
2828
ProgressEvent<'bitswap:want', CID> |
2929
ProgressEvent<'bitswap:block', { cid: CID, sender: PeerId }> |
30-
BitswapNetworkWantProgressEvents
30+
BitswapNetworkWantProgressEvents |
31+
BlockBrokerGetBlockProgressEvents
3132

3233
export type { BitswapNetworkNotifyProgressEvents }
3334
export type { BitswapNetworkWantProgressEvents }
@@ -76,7 +77,7 @@ export interface Bitswap extends Startable {
7677
}
7778

7879
export interface MultihashHasherLoader {
79-
getHasher(codeOrName: number | string): Promise<MultihashHasher>
80+
getHasher(codeOrName: number | string): Await<MultihashHasher>
8081
}
8182

8283
export interface BitswapComponents {
@@ -85,6 +86,7 @@ export interface BitswapComponents {
8586
logger: ComponentLogger
8687
libp2p: Libp2p
8788
metrics?: Metrics
89+
hashLoader?: MultihashHasherLoader
8890
}
8991

9092
export interface BitswapOptions {
@@ -118,11 +120,6 @@ export interface BitswapOptions {
118120
*/
119121
runOnLimitedConnections?: boolean
120122

121-
/**
122-
* Enables loading esoteric hash functions
123-
*/
124-
hashLoader?: MultihashHasherLoader
125-
126123
/**
127124
* The protocol that we speak
128125
*
@@ -137,6 +134,21 @@ export interface BitswapOptions {
137134
*/
138135
messageSendConcurrency?: number
139136

137+
/**
138+
* Wantlist or blocks must be sent to peers within this many milliseconds
139+
*
140+
* @default 10_000
141+
*/
142+
messageSendTimeout?: number
143+
144+
/**
145+
* Incoming wantlist or block messages must be received from peers within this
146+
* many milliseconds
147+
*
148+
* @default 10_000
149+
*/
150+
messageReceiveTimeout?: number
151+
140152
/**
141153
* When sending blocks to peers, how many messages to send at once
142154
*
@@ -149,7 +161,7 @@ export interface BitswapOptions {
149161
* This is useful for preventing slow/large peer-connections from consuming
150162
* your bandwidth/streams.
151163
*
152-
* @default 10000
164+
* @default 10_000
153165
*/
154166
sendBlocksTimeout?: number
155167

@@ -162,6 +174,15 @@ export interface BitswapOptions {
162174
*/
163175
sendBlocksDebounce?: number
164176

177+
/**
178+
* When a want is added to the local wantlist, wait this many milliseconds
179+
* before sending the wantlist to connected or session peers in case more
180+
* wants are about to be added
181+
*
182+
* @default 10
183+
*/
184+
sendWantlistDebounce?: number
185+
165186
/**
166187
* If the client sends a want-have, and we have the corresponding block, we
167188
* check the size of the block and if it's small enough we send the block

packages/bitswap/src/network.ts

Lines changed: 25 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ import { pushable } from 'it-pushable'
77
import take from 'it-take'
88
import { CustomProgressEvent } from 'progress-events'
99
import { raceEvent } from 'race-event'
10-
import { BITSWAP_120, DEFAULT_MAX_INBOUND_STREAMS, DEFAULT_MAX_INCOMING_MESSAGE_SIZE, DEFAULT_MAX_OUTBOUND_STREAMS, DEFAULT_MAX_OUTGOING_MESSAGE_SIZE, DEFAULT_MAX_PROVIDERS_PER_REQUEST, DEFAULT_MESSAGE_RECEIVE_TIMEOUT, DEFAULT_MESSAGE_SEND_CONCURRENCY, DEFAULT_RUN_ON_TRANSIENT_CONNECTIONS } from './constants.ts'
10+
import { BITSWAP_120, DEFAULT_MAX_INBOUND_STREAMS, DEFAULT_MAX_INCOMING_MESSAGE_SIZE, DEFAULT_MAX_OUTBOUND_STREAMS, DEFAULT_MAX_OUTGOING_MESSAGE_SIZE, DEFAULT_MAX_PROVIDERS_PER_REQUEST, DEFAULT_MESSAGE_RECEIVE_TIMEOUT, DEFAULT_MESSAGE_SEND_CONCURRENCY, DEFAULT_MESSAGE_SEND_TIMEOUT, DEFAULT_RUN_ON_TRANSIENT_CONNECTIONS } from './constants.ts'
1111
import { BitswapMessage } from './pb/message.ts'
1212
import { mergeMessages } from './utils/merge-messages.ts'
1313
import { splitMessage } from './utils/split-message.ts'
@@ -16,13 +16,14 @@ import type { MultihashHasherLoader } from './index.ts'
1616
import type { Block } from './pb/message.ts'
1717
import type { QueuedBitswapMessage } from './utils/bitswap-message.ts'
1818
import type { Provider, Routing, RoutingFindProvidersProgressEvents } from '@helia/interface/routing'
19-
import type { Libp2p, AbortOptions, Connection, PeerId, Topology, ComponentLogger, IdentifyResult, Counter, Metrics, Stream } from '@libp2p/interface'
19+
import type { Libp2p, AbortOptions, Connection, PeerId, Topology, ComponentLogger, IdentifyResult, Counter, Metrics, Stream, OpenConnectionProgressEvents, NewStreamProgressEvents } from '@libp2p/interface'
2020
import type { Logger } from '@libp2p/logger'
2121
import type { PeerQueueJobOptions } from '@libp2p/utils'
2222
import type { Multiaddr } from '@multiformats/multiaddr'
23-
import type { CID } from 'multiformats/cid'
23+
import { CID } from 'multiformats/cid'
2424
import type { ProgressEvent, ProgressOptions } from 'progress-events'
2525
import type { Uint8ArrayList } from 'uint8arraylist'
26+
import type { BlockBrokerGetBlockProgressEvents } from '@helia/interface'
2627

2728
export interface BitswapProvider {
2829
/**
@@ -47,15 +48,18 @@ export interface BitswapProvider {
4748
}
4849

4950
export type BitswapNetworkProgressEvents =
50-
ProgressEvent<'bitswap:dial', PeerId | Multiaddr | Multiaddr[]>
51+
ProgressEvent<'bitswap:dial', PeerId | Multiaddr | Multiaddr[]> |
52+
OpenConnectionProgressEvents |
53+
NewStreamProgressEvents
5154

5255
export type BitswapNetworkWantProgressEvents =
5356
ProgressEvent<'bitswap:send-wantlist', PeerId> |
5457
ProgressEvent<'bitswap:send-wantlist:error', { peer: PeerId, error: Error }> |
5558
ProgressEvent<'bitswap:find-providers', CID> |
5659
ProgressEvent<'bitswap:found-provider', BitswapProvider> |
5760
BitswapNetworkProgressEvents |
58-
RoutingFindProvidersProgressEvents
61+
RoutingFindProvidersProgressEvents |
62+
BlockBrokerGetBlockProgressEvents
5963

6064
export type BitswapNetworkNotifyProgressEvents =
6165
BitswapNetworkProgressEvents |
@@ -66,6 +70,7 @@ export interface NetworkInit {
6670
maxInboundStreams?: number
6771
maxOutboundStreams?: number
6872
messageReceiveTimeout?: number
73+
messageSendTimeout?: number
6974
messageSendConcurrency?: number
7075
protocols?: string[]
7176
runOnLimitedConnections?: boolean
@@ -81,12 +86,16 @@ export interface NetworkComponents {
8186
}
8287

8388
export interface BitswapMessageEventDetail {
89+
/**
90+
* @deprecated access the peer via connection.remotePeer instead
91+
*/
8492
peer: PeerId
8593
message: BitswapMessage
94+
connection: Connection
8695
}
8796

8897
export interface NetworkEvents {
89-
'bitswap:message': CustomEvent<{ peer: PeerId, message: BitswapMessage }>
98+
'bitswap:message': CustomEvent<BitswapMessageEventDetail>
9099
'peer:connected': CustomEvent<PeerId>
91100
'peer:disconnected': CustomEvent<PeerId>
92101
}
@@ -104,6 +113,7 @@ export class Network extends TypedEventEmitter<NetworkEvents> {
104113
private readonly maxInboundStreams: number
105114
private readonly maxOutboundStreams: number
106115
private readonly messageReceiveTimeout: number
116+
private readonly messageSendTimeout: number
107117
private registrarIds: string[]
108118
private readonly metrics: { blocksSent?: Counter, dataSent?: Counter }
109119
private readonly sendQueue: PeerQueue<void, SendMessageJobOptions>
@@ -126,6 +136,7 @@ export class Network extends TypedEventEmitter<NetworkEvents> {
126136
this.maxInboundStreams = init.maxInboundStreams ?? DEFAULT_MAX_INBOUND_STREAMS
127137
this.maxOutboundStreams = init.maxOutboundStreams ?? DEFAULT_MAX_OUTBOUND_STREAMS
128138
this.messageReceiveTimeout = init.messageReceiveTimeout ?? DEFAULT_MESSAGE_RECEIVE_TIMEOUT
139+
this.messageSendTimeout = init.messageSendTimeout ?? DEFAULT_MESSAGE_SEND_TIMEOUT
129140
this.runOnLimitedConnections = init.runOnLimitedConnections ?? DEFAULT_RUN_ON_TRANSIENT_CONNECTIONS
130141
this.maxIncomingMessageSize = init.maxIncomingMessageSize ?? DEFAULT_MAX_OUTGOING_MESSAGE_SIZE
131142
this.maxOutgoingMessageSize = init.maxOutgoingMessageSize ?? init.maxIncomingMessageSize ?? DEFAULT_MAX_INCOMING_MESSAGE_SIZE
@@ -245,10 +256,11 @@ export class Network extends TypedEventEmitter<NetworkEvents> {
245256
const message = BitswapMessage.decode(data)
246257
this.log('incoming new bitswap %s message from %p on stream', stream.protocol, connection.remotePeer, stream.id)
247258

248-
this.safeDispatchEvent('bitswap:message', {
259+
this.safeDispatchEvent<BitswapMessageEventDetail>('bitswap:message', {
249260
detail: {
250261
peer: connection.remotePeer,
251-
message
262+
message,
263+
connection
252264
}
253265
})
254266

@@ -287,7 +299,7 @@ export class Network extends TypedEventEmitter<NetworkEvents> {
287299
continue
288300
}
289301

290-
options?.onProgress?.(new CustomProgressEvent('bitswap:found-provider', {
302+
options?.onProgress?.(new CustomProgressEvent<BitswapProvider>('bitswap:found-provider', {
291303
type: 'bitswap',
292304
cid,
293305
provider,
@@ -325,8 +337,7 @@ export class Network extends TypedEventEmitter<NetworkEvents> {
325337
}
326338

327339
/**
328-
* Connect to the given peer
329-
* Send the given msg (instance of Message) to the given peer
340+
* Connect to the specified peer and send the given message
330341
*/
331342
async sendMessage (peerId: PeerId, message: QueuedBitswapMessage, options?: AbortOptions & ProgressOptions<BitswapNetworkWantProgressEvents>): Promise<void> {
332343
if (!this.running) {
@@ -359,7 +370,7 @@ export class Network extends TypedEventEmitter<NetworkEvents> {
359370
options?.onProgress?.(new CustomProgressEvent<PeerId>('bitswap:network:send-wantlist', peerId))
360371

361372
const stream = await this.libp2p.dialProtocol(peerId, BITSWAP_120, options)
362-
await stream.closeRead()
373+
await stream.closeRead(options)
363374

364375
try {
365376
for (const buf of splitMessage(message, this.maxOutgoingMessageSize)) {
@@ -377,8 +388,9 @@ export class Network extends TypedEventEmitter<NetworkEvents> {
377388

378389
this._updateSentStats(message.blocks)
379390
}, {
391+
onProgress: options?.onProgress,
380392
peerId,
381-
signal: options?.signal,
393+
signal: options?.signal ?? AbortSignal.timeout(this.messageSendTimeout),
382394
message
383395
})
384396
}

packages/bitswap/src/peer-want-lists/index.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -56,9 +56,9 @@ export class PeerWantLists {
5656
})
5757

5858
this.network.addEventListener('bitswap:message', (evt) => {
59-
this.receiveMessage(evt.detail.peer, evt.detail.message)
59+
this.receiveMessage(evt.detail.connection.remotePeer, evt.detail.message)
6060
.catch(err => {
61-
this.log.error('error receiving bitswap message from %p - %e', evt.detail.peer, err)
61+
this.log.error('error receiving bitswap message from %p - %e', evt.detail.connection.remotePeer, err)
6262
})
6363
})
6464
this.network.addEventListener('peer:disconnected', evt => {

0 commit comments

Comments
 (0)