Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion benchmarks/transfer/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
"it-drain": "^3.0.1",
"kubo": "^0.38.0",
"kubo-rpc-client": "^5.0.0",
"libp2p": "^3.0.6",
"libp2p": "^3.2.0",
"multiformats": "^13.1.0",
"pretty-bytes": "^7.1.0",
"uint8arrays": "^5.1.0"
Expand Down
4 changes: 2 additions & 2 deletions benchmarks/transports/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
"@ipld/dag-pb": "^4.1.0",
"@libp2p/circuit-relay-v2": "^4.0.5",
"@libp2p/identify": "^4.0.5",
"@libp2p/interface": "^3.1.0",
"@libp2p/interface": "^3.2.0",
"@libp2p/logger": "^6.0.5",
"@libp2p/tcp": "^11.0.5",
"@libp2p/webrtc": "^6.0.6",
Expand All @@ -43,7 +43,7 @@
"it-drain": "^3.0.7",
"kubo": "^0.38.0",
"kubo-rpc-client": "^5.0.0",
"libp2p": "^3.0.6",
"libp2p": "^3.2.0",
"multiformats": "^13.1.0",
"playwright-test": "^14.1.1",
"pretty-bytes": "^7.1.0",
Expand Down
4 changes: 2 additions & 2 deletions packages/bitswap/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@
"dependencies": {
"@helia/interface": "^6.1.1",
"@helia/utils": "^2.4.2",
"@libp2p/interface": "^3.1.0",
"@libp2p/interface": "^3.2.0",
"@libp2p/logger": "^6.0.5",
"@libp2p/peer-collections": "^7.0.5",
"@libp2p/utils": "^7.0.5",
Expand All @@ -78,7 +78,7 @@
},
"devDependencies": {
"@libp2p/crypto": "^5.1.12",
"@libp2p/peer-id": "^6.0.3",
"@libp2p/peer-id": "^6.0.6",
"@types/sinon": "^21.0.0",
"aegir": "^47.0.22",
"blockstore-core": "^6.1.1",
Expand Down
2 changes: 1 addition & 1 deletion packages/bitswap/src/bitswap.ts
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ export class Bitswap implements BitswapInterface {
signal
})

options.onProgress?.(new CustomProgressEvent<{ cid: CID, sender: PeerId }>('bitswap:block', { cid, sender: result.sender }))
options.onProgress?.(new CustomProgressEvent<{ cid: CID, sender: PeerId }>('bitswap:block', { cid, sender: result.connection.remotePeer }))

return result.block
} finally {
Expand Down
6 changes: 3 additions & 3 deletions packages/bitswap/src/constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@ export const BITSWAP_120 = '/ipfs/bitswap/1.2.0'
export const DEFAULT_MAX_SIZE_REPLACE_HAS_WITH_BLOCK = 1024
export const DEFAULT_MAX_INBOUND_STREAMS = 1024
export const DEFAULT_MAX_OUTBOUND_STREAMS = 1024
export const DEFAULT_MESSAGE_RECEIVE_TIMEOUT = 5000
export const DEFAULT_MESSAGE_SEND_DELAY = 10
export const DEFAULT_MESSAGE_SEND_TIMEOUT = 5000
export const DEFAULT_MESSAGE_RECEIVE_TIMEOUT = 10_000
export const DEFAULT_WANTLIST_SEND_DEBOUNCE = 10
export const DEFAULT_MESSAGE_SEND_TIMEOUT = 10_000
export const DEFAULT_MESSAGE_SEND_CONCURRENCY = 50
export const DEFAULT_RUN_ON_TRANSIENT_CONNECTIONS = false
export const DEFAULT_SESSION_ROOT_PRIORITY = 1
Expand Down
42 changes: 29 additions & 13 deletions packages/bitswap/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,11 @@
import { Bitswap as BitswapClass } from './bitswap.ts'
import type { BitswapNetworkNotifyProgressEvents, BitswapNetworkWantProgressEvents, BitswapNetworkProgressEvents } from './network.ts'
import type { WantType } from './pb/message.ts'
import type { CreateSessionOptions, ProviderOptions, SessionBlockBroker } from '@helia/interface'
import type { BlockBrokerGetBlockProgressEvents, CreateSessionOptions, HasherLoader, ProviderOptions, SessionBlockBroker } from '@helia/interface'
import type { Routing } from '@helia/interface/routing'
import type { Libp2p, AbortOptions, Startable, ComponentLogger, Metrics, PeerId } from '@libp2p/interface'
import type { Blockstore } from 'interface-blockstore'
import type { CID } from 'multiformats/cid'
import type { MultihashHasher } from 'multiformats/hashes/interface'
import type { ProgressEvent, ProgressOptions } from 'progress-events'

export type BitswapWantProgressEvents =
Expand All @@ -27,7 +26,8 @@ export type BitswapWantBlockProgressEvents =
ProgressEvent<'bitswap:unwant', CID> |
ProgressEvent<'bitswap:want', CID> |
ProgressEvent<'bitswap:block', { cid: CID, sender: PeerId }> |
BitswapNetworkWantProgressEvents
BitswapNetworkWantProgressEvents |
BlockBrokerGetBlockProgressEvents

export type { BitswapNetworkNotifyProgressEvents }
export type { BitswapNetworkWantProgressEvents }
Expand Down Expand Up @@ -75,15 +75,12 @@ export interface Bitswap extends Startable {
createSession(options?: CreateSessionOptions<BitswapWantProgressEvents>): Required<Pick<SessionBlockBroker<BitswapWantProgressEvents>, 'retrieve' | 'addPeer'>>
}

export interface MultihashHasherLoader {
getHasher(codeOrName: number | string): Promise<MultihashHasher>
}

export interface BitswapComponents {
routing: Routing
blockstore: Blockstore
logger: ComponentLogger
libp2p: Libp2p
getHasher: HasherLoader
metrics?: Metrics
}

Expand Down Expand Up @@ -118,11 +115,6 @@ export interface BitswapOptions {
*/
runOnLimitedConnections?: boolean

/**
* Enables loading esoteric hash functions
*/
hashLoader?: MultihashHasherLoader

/**
* The protocol that we speak
*
Expand All @@ -137,6 +129,21 @@ export interface BitswapOptions {
*/
messageSendConcurrency?: number

/**
* Wantlist or blocks must be sent to peers within this many milliseconds
*
* @default 10_000
*/
messageSendTimeout?: number

/**
* Incoming wantlist or block messages must be received from peers within this
* many milliseconds
*
* @default 10_000
*/
messageReceiveTimeout?: number

/**
* When sending blocks to peers, how many messages to send at once
*
Expand All @@ -149,7 +156,7 @@ export interface BitswapOptions {
* This is useful for preventing slow/large peer-connections from consuming
* your bandwidth/streams.
*
* @default 10000
* @default 10_000
*/
sendBlocksTimeout?: number

Expand All @@ -162,6 +169,15 @@ export interface BitswapOptions {
*/
sendBlocksDebounce?: number

/**
* When a want is added to the local wantlist, wait this many milliseconds
* before sending the wantlist to connected or session peers in case more
* wants are about to be added
*
* @default 10
*/
sendWantlistDebounce?: number

/**
* If the client sends a want-have, and we have the corresponding block, we
* check the size of the block and if it's small enough we send the block
Expand Down
40 changes: 25 additions & 15 deletions packages/bitswap/src/network.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,22 +5,22 @@ import * as lp from 'it-length-prefixed'
import map from 'it-map'
import { pushable } from 'it-pushable'
import take from 'it-take'
import { CID } from 'multiformats/cid'
import { CustomProgressEvent } from 'progress-events'
import { raceEvent } from 'race-event'
import { BITSWAP_120, DEFAULT_MAX_INBOUND_STREAMS, DEFAULT_MAX_INCOMING_MESSAGE_SIZE, DEFAULT_MAX_OUTBOUND_STREAMS, DEFAULT_MAX_OUTGOING_MESSAGE_SIZE, DEFAULT_MAX_PROVIDERS_PER_REQUEST, DEFAULT_MESSAGE_RECEIVE_TIMEOUT, DEFAULT_MESSAGE_SEND_CONCURRENCY, DEFAULT_RUN_ON_TRANSIENT_CONNECTIONS } from './constants.ts'
import { BITSWAP_120, DEFAULT_MAX_INBOUND_STREAMS, DEFAULT_MAX_INCOMING_MESSAGE_SIZE, DEFAULT_MAX_OUTBOUND_STREAMS, DEFAULT_MAX_OUTGOING_MESSAGE_SIZE, DEFAULT_MAX_PROVIDERS_PER_REQUEST, DEFAULT_MESSAGE_RECEIVE_TIMEOUT, DEFAULT_MESSAGE_SEND_CONCURRENCY, DEFAULT_MESSAGE_SEND_TIMEOUT, DEFAULT_RUN_ON_TRANSIENT_CONNECTIONS } from './constants.ts'
import { BitswapMessage } from './pb/message.ts'
import { mergeMessages } from './utils/merge-messages.ts'
import { splitMessage } from './utils/split-message.ts'
import type { WantOptions } from './bitswap.ts'
import type { MultihashHasherLoader } from './index.ts'
import type { Block } from './pb/message.ts'
import type { QueuedBitswapMessage } from './utils/bitswap-message.ts'
import type { BlockBrokerGetBlockProgressEvents } from '@helia/interface'
import type { Provider, Routing, RoutingFindProvidersProgressEvents } from '@helia/interface/routing'
import type { Libp2p, AbortOptions, Connection, PeerId, Topology, ComponentLogger, IdentifyResult, Counter, Metrics, Stream } from '@libp2p/interface'
import type { Libp2p, AbortOptions, Connection, PeerId, Topology, ComponentLogger, IdentifyResult, Counter, Metrics, Stream, OpenConnectionProgressEvents, NewStreamProgressEvents } from '@libp2p/interface'
import type { Logger } from '@libp2p/logger'
import type { PeerQueueJobOptions } from '@libp2p/utils'
import type { Multiaddr } from '@multiformats/multiaddr'
import type { CID } from 'multiformats/cid'
import type { ProgressEvent, ProgressOptions } from 'progress-events'
import type { Uint8ArrayList } from 'uint8arraylist'

Expand All @@ -47,25 +47,28 @@ export interface BitswapProvider {
}

export type BitswapNetworkProgressEvents =
ProgressEvent<'bitswap:dial', PeerId | Multiaddr | Multiaddr[]>
ProgressEvent<'bitswap:dial', PeerId | Multiaddr | Multiaddr[]> |
OpenConnectionProgressEvents |
NewStreamProgressEvents

export type BitswapNetworkWantProgressEvents =
ProgressEvent<'bitswap:send-wantlist', PeerId> |
ProgressEvent<'bitswap:send-wantlist:error', { peer: PeerId, error: Error }> |
ProgressEvent<'bitswap:find-providers', CID> |
ProgressEvent<'bitswap:found-provider', BitswapProvider> |
BitswapNetworkProgressEvents |
RoutingFindProvidersProgressEvents
RoutingFindProvidersProgressEvents |
BlockBrokerGetBlockProgressEvents

export type BitswapNetworkNotifyProgressEvents =
BitswapNetworkProgressEvents |
ProgressEvent<'bitswap:send-block', PeerId>

export interface NetworkInit {
hashLoader?: MultihashHasherLoader
maxInboundStreams?: number
maxOutboundStreams?: number
messageReceiveTimeout?: number
messageSendTimeout?: number
messageSendConcurrency?: number
protocols?: string[]
runOnLimitedConnections?: boolean
Expand All @@ -81,12 +84,16 @@ export interface NetworkComponents {
}

export interface BitswapMessageEventDetail {
/**
* @deprecated access the peer via connection.remotePeer instead
*/
peer: PeerId
message: BitswapMessage
connection: Connection
}

export interface NetworkEvents {
'bitswap:message': CustomEvent<{ peer: PeerId, message: BitswapMessage }>
'bitswap:message': CustomEvent<BitswapMessageEventDetail>
'peer:connected': CustomEvent<PeerId>
'peer:disconnected': CustomEvent<PeerId>
}
Expand All @@ -104,6 +111,7 @@ export class Network extends TypedEventEmitter<NetworkEvents> {
private readonly maxInboundStreams: number
private readonly maxOutboundStreams: number
private readonly messageReceiveTimeout: number
private readonly messageSendTimeout: number
private registrarIds: string[]
private readonly metrics: { blocksSent?: Counter, dataSent?: Counter }
private readonly sendQueue: PeerQueue<void, SendMessageJobOptions>
Expand All @@ -126,6 +134,7 @@ export class Network extends TypedEventEmitter<NetworkEvents> {
this.maxInboundStreams = init.maxInboundStreams ?? DEFAULT_MAX_INBOUND_STREAMS
this.maxOutboundStreams = init.maxOutboundStreams ?? DEFAULT_MAX_OUTBOUND_STREAMS
this.messageReceiveTimeout = init.messageReceiveTimeout ?? DEFAULT_MESSAGE_RECEIVE_TIMEOUT
this.messageSendTimeout = init.messageSendTimeout ?? DEFAULT_MESSAGE_SEND_TIMEOUT
this.runOnLimitedConnections = init.runOnLimitedConnections ?? DEFAULT_RUN_ON_TRANSIENT_CONNECTIONS
this.maxIncomingMessageSize = init.maxIncomingMessageSize ?? DEFAULT_MAX_OUTGOING_MESSAGE_SIZE
this.maxOutgoingMessageSize = init.maxOutgoingMessageSize ?? init.maxIncomingMessageSize ?? DEFAULT_MAX_INCOMING_MESSAGE_SIZE
Expand Down Expand Up @@ -245,10 +254,11 @@ export class Network extends TypedEventEmitter<NetworkEvents> {
const message = BitswapMessage.decode(data)
this.log('incoming new bitswap %s message from %p on stream', stream.protocol, connection.remotePeer, stream.id)

this.safeDispatchEvent('bitswap:message', {
this.safeDispatchEvent<BitswapMessageEventDetail>('bitswap:message', {
detail: {
peer: connection.remotePeer,
message
message,
connection
}
})

Expand Down Expand Up @@ -287,7 +297,7 @@ export class Network extends TypedEventEmitter<NetworkEvents> {
continue
}

options?.onProgress?.(new CustomProgressEvent('bitswap:found-provider', {
options?.onProgress?.(new CustomProgressEvent<BitswapProvider>('bitswap:found-provider', {
type: 'bitswap',
cid,
provider,
Expand Down Expand Up @@ -325,8 +335,7 @@ export class Network extends TypedEventEmitter<NetworkEvents> {
}

/**
* Connect to the given peer
* Send the given msg (instance of Message) to the given peer
* Connect to the specified peer and send the given message
*/
async sendMessage (peerId: PeerId, message: QueuedBitswapMessage, options?: AbortOptions & ProgressOptions<BitswapNetworkWantProgressEvents>): Promise<void> {
if (!this.running) {
Expand Down Expand Up @@ -359,7 +368,7 @@ export class Network extends TypedEventEmitter<NetworkEvents> {
options?.onProgress?.(new CustomProgressEvent<PeerId>('bitswap:network:send-wantlist', peerId))

const stream = await this.libp2p.dialProtocol(peerId, BITSWAP_120, options)
await stream.closeRead()
await stream.closeRead(options)

try {
for (const buf of splitMessage(message, this.maxOutgoingMessageSize)) {
Expand All @@ -377,8 +386,9 @@ export class Network extends TypedEventEmitter<NetworkEvents> {

this._updateSentStats(message.blocks)
}, {
onProgress: options?.onProgress,
peerId,
signal: options?.signal,
signal: options?.signal ?? AbortSignal.timeout(this.messageSendTimeout),
message
})
}
Expand Down
4 changes: 2 additions & 2 deletions packages/bitswap/src/peer-want-lists/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -56,9 +56,9 @@ export class PeerWantLists {
})

this.network.addEventListener('bitswap:message', (evt) => {
this.receiveMessage(evt.detail.peer, evt.detail.message)
this.receiveMessage(evt.detail.connection.remotePeer, evt.detail.message)
.catch(err => {
this.log.error('error receiving bitswap message from %p - %e', evt.detail.peer, err)
this.log.error('error receiving bitswap message from %p - %e', evt.detail.connection.remotePeer, err)
})
})
this.network.addEventListener('peer:disconnected', evt => {
Expand Down
Loading
Loading