Skip to content

Tiddy Gossipsub API and comments #227

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Apr 5, 2022
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
108 changes: 38 additions & 70 deletions ts/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -563,7 +563,7 @@ export default class Gossipsub extends EventEmitter {
/**
* Registrar notifies an established connection with pubsub protocol
*/
protected async onPeerConnected(peerId: PeerId, conn: Connection): Promise<void> {
private async onPeerConnected(peerId: PeerId, conn: Connection): Promise<void> {
this.log('connected %s %s', peerId.toB58String(), conn.stat.direction)

try {
Expand All @@ -583,7 +583,7 @@ export default class Gossipsub extends EventEmitter {
/**
* Registrar notifies a closing connection with pubsub protocol
*/
protected onPeerDisconnected(peerId: PeerId, err?: Error): void {
private onPeerDisconnected(peerId: PeerId, err?: Error): void {
const idB58Str = peerId.toB58String()

this.log('connection ended', idB58Str, err)
Expand Down Expand Up @@ -716,7 +716,7 @@ export default class Gossipsub extends EventEmitter {
/**
* Responsible for processing each RPC message received by other peers.
*/
async pipePeerReadStream(peerId: PeerId, stream: AsyncIterable<Uint8Array | BufferList>): Promise<void> {
private async pipePeerReadStream(peerId: PeerId, stream: AsyncIterable<Uint8Array | BufferList>): Promise<void> {
try {
await pipe(stream, async (source) => {
for await (const data of source) {
Expand Down Expand Up @@ -759,7 +759,7 @@ export default class Gossipsub extends EventEmitter {
/**
* Handles an rpc request from a peer
*/
async handleReceivedRpc(from: PeerId, rpc: IRPC): Promise<void> {
private async handleReceivedRpc(from: PeerId, rpc: IRPC): Promise<void> {
// Check if peer is graylisted in which case we ignore the event
if (!this.acceptFrom(from.toB58String())) {
this.log('received message from unacceptable peer %s', from.toB58String())
Expand Down Expand Up @@ -802,7 +802,7 @@ export default class Gossipsub extends EventEmitter {
/**
* Handles a subscription change from a peer
*/
handleReceivedSubscription(from: PeerId, subOpt: RPC.ISubOpts): void {
private handleReceivedSubscription(from: PeerId, subOpt: RPC.ISubOpts): void {
if (subOpt.topicID == null) {
return
}
Expand Down Expand Up @@ -830,7 +830,7 @@ export default class Gossipsub extends EventEmitter {
* Handles a newly received message from an RPC.
* May forward to all peers in the mesh.
*/
async handleReceivedMessage(from: PeerId, rpcMsg: RPC.IMessage): Promise<void> {
private async handleReceivedMessage(from: PeerId, rpcMsg: RPC.IMessage): Promise<void> {
this.metrics?.onMsgRecvPreValidation(rpcMsg.topic)

const validationResult = await this.validateReceivedMessage(from, rpcMsg)
Expand Down Expand Up @@ -895,41 +895,14 @@ export default class Gossipsub extends EventEmitter {
}
}

// # Ethereum consensus message-id function
//
// ## phase0
//
// The message-id of a gossipsub message MUST be the following 20 byte value computed from the message data:
//
// - If message.data has a valid snappy decompression, set message-id to the first 20 bytes of the SHA256 hash of
// the concatenation of MESSAGE_DOMAIN_VALID_SNAPPY with the snappy decompressed message data,
// i.e. SHA256(MESSAGE_DOMAIN_VALID_SNAPPY + snappy_decompress(message.data))[:20].
//
// - Otherwise, set message-id to the first 20 bytes of the SHA256 hash of the concatenation of
// MESSAGE_DOMAIN_INVALID_SNAPPY with the raw message data,
// i.e. SHA256(MESSAGE_DOMAIN_INVALID_SNAPPY + message.data)[:20].
//
// ## altair
//
// The derivation of the message-id has changed starting with Altair to incorporate the message topic along with the
// message data. These are fields of the Message Protobuf, and interpreted as empty byte strings if missing. The
// message-id MUST be the following 20 byte value computed from the message:
//
// - If message.data has a valid snappy decompression, set message-id to the first 20 bytes of the SHA256 hash of
// the concatenation of the following data: MESSAGE_DOMAIN_VALID_SNAPPY, the length of the topic byte string
// (encoded as little-endian uint64), the topic byte string, and the snappy decompressed message data:
// i.e. SHA256(MESSAGE_DOMAIN_VALID_SNAPPY + uint_to_bytes(uint64(len(message.topic))) + message.topic + snappy_decompress(message.data))[:20].
//
// - Otherwise, set message-id to the first 20 bytes of the SHA256 hash of the concatenation of the following data:
// MESSAGE_DOMAIN_INVALID_SNAPPY, the length of the topic byte string (encoded as little-endian uint64),the topic
// byte string, and the raw message data:
// i.e. SHA256(MESSAGE_DOMAIN_INVALID_SNAPPY + uint_to_bytes(uint64(len(message.topic))) + message.topic + message.data)[:20].

/**
* Handles a newly received message from an RPC.
* May forward to all peers in the mesh.
*/
async validateReceivedMessage(propagationSource: PeerId, rpcMsg: RPC.IMessage): Promise<ReceivedMessageResult> {
private async validateReceivedMessage(
propagationSource: PeerId,
rpcMsg: RPC.IMessage
): Promise<ReceivedMessageResult> {
this.metrics?.onMsgRecvPreValidation(rpcMsg.topic)

// Fast message ID stuff
Expand Down Expand Up @@ -1217,7 +1190,7 @@ export default class Gossipsub extends EventEmitter {
private async handleGraft(id: PeerIdStr, graft: RPC.IControlGraft[]): Promise<RPC.IControlPrune[]> {
const prune: TopicStr[] = []
const score = this.score.score(id)
const now = this._now()
const now = Date.now()
let doPX = this.opts.doPX

graft.forEach(({ topicID }) => {
Expand Down Expand Up @@ -1365,7 +1338,7 @@ export default class Gossipsub extends EventEmitter {
backoff = new Map()
this.backoff.set(topic, backoff)
}
const expire = this._now() + interval
const expire = Date.now() + interval
const existingExpire = backoff.get(id) ?? 0
if (existingExpire < expire) {
backoff.set(id, expire)
Expand All @@ -1391,7 +1364,7 @@ export default class Gossipsub extends EventEmitter {
return
}

const now = this._now()
const now = Date.now()
this.backoff.forEach((backoff, topic) => {
backoff.forEach((expire, id) => {
if (expire < now) {
Expand Down Expand Up @@ -1530,7 +1503,7 @@ export default class Gossipsub extends EventEmitter {
/**
* Join topic
*/
join(topic: TopicStr): void {
private join(topic: TopicStr): void {
if (this.status.code !== GossipStatusCode.started) {
throw new Error('Gossipsub has not started')
}
Expand Down Expand Up @@ -1598,7 +1571,7 @@ export default class Gossipsub extends EventEmitter {
/**
* Leave topic
*/
leave(topic: TopicStr): void {
private leave(topic: TopicStr): void {
if (this.status.code !== GossipStatusCode.started) {
throw new Error('Gossipsub has not started')
}
Expand Down Expand Up @@ -1743,7 +1716,7 @@ export default class Gossipsub extends EventEmitter {
}

// We are publishing to fanout peers - update the time we published
this.fanoutLastpub.set(topic, this._now())
this.fanoutLastpub.set(topic, Date.now())
}
}
}
Expand Down Expand Up @@ -1846,25 +1819,27 @@ export default class Gossipsub extends EventEmitter {
return tosend.size
}

/// This function should be called when [`GossipsubConfig::validate_messages()`] is `true` after
/// the message got validated by the caller. Messages are stored in the ['Memcache'] and
/// validation is expected to be fast enough that the messages should still exist in the cache.
/// There are three possible validation outcomes and the outcome is given in acceptance.
///
/// If acceptance = [`MessageAcceptance::Accept`] the message will get propagated to the
/// network. The `propagation_source` parameter indicates who the message was received by and
/// will not be forwarded back to that peer.
///
/// If acceptance = [`MessageAcceptance::Reject`] the message will be deleted from the memcache
/// and the P₄ penalty will be applied to the `propagation_source`.
//
/// If acceptance = [`MessageAcceptance::Ignore`] the message will be deleted from the memcache
/// but no P₄ penalty will be applied.
///
/// This function will return true if the message was found in the cache and false if was not
/// in the cache anymore.
///
/// This should only be called once per message.
/**
* This function should be called when `asyncValidation` is `true` after
* the message got validated by the caller. Messages are stored in the `mcache` and
* validation is expected to be fast enough that the messages should still exist in the cache.
* There are three possible validation outcomes and the outcome is given in acceptance.
*
* If acceptance = `MessageAcceptance.Accept` the message will get propagated to the
* network. The `propagation_source` parameter indicates who the message was received by and
* will not be forwarded back to that peer.
*
* If acceptance = `MessageAcceptance.Reject` the message will be deleted from the memcache
* and the P₄ penalty will be applied to the `propagationSource`.
*
* If acceptance = `MessageAcceptance.Ignore` the message will be deleted from the memcache
* but no P₄ penalty will be applied.
*
* This function will return true if the message was found in the cache and false if was not
* in the cache anymore.
*
* This should only be called once per message.
*/
reportMessageValidationResult(msgId: MsgIdStr, propagationSource: PeerId, acceptance: MessageAcceptance): void {
if (acceptance === MessageAcceptance.Accept) {
const cacheEntry = this.mcache.validate(msgId)
Expand Down Expand Up @@ -2110,13 +2085,6 @@ export default class Gossipsub extends EventEmitter {
this.gossip.set(id, gossip.concat(controlIHaveMsgs))
}

/**
* Returns the current time in milliseconds
*/
_now(): number {
return Date.now()
}

/**
* Make a PRUNE control message for a peer in a topic
*/
Expand Down Expand Up @@ -2410,7 +2378,7 @@ export default class Gossipsub extends EventEmitter {
})

// expire fanout for topics we haven't published to in a while
const now = this._now()
const now = Date.now()
this.fanoutLastpub.forEach((lastpb, topic) => {
if (lastpb + fanoutTTL < now) {
this.fanout.delete(topic)
Expand Down