Skip to content

Commit 72bab55

Browse files
authored
Tiddy Gossipsub API and comments (#227)
1 parent 2e1dc69 commit 72bab55

File tree

1 file changed

+38
-70
lines changed

1 file changed

+38
-70
lines changed

ts/index.ts

Lines changed: 38 additions & 70 deletions
Original file line numberDiff line numberDiff line change
@@ -563,7 +563,7 @@ export default class Gossipsub extends EventEmitter {
563563
/**
564564
* Registrar notifies an established connection with pubsub protocol
565565
*/
566-
protected async onPeerConnected(peerId: PeerId, conn: Connection): Promise<void> {
566+
private async onPeerConnected(peerId: PeerId, conn: Connection): Promise<void> {
567567
this.log('connected %s %s', peerId.toB58String(), conn.stat.direction)
568568

569569
try {
@@ -583,7 +583,7 @@ export default class Gossipsub extends EventEmitter {
583583
/**
584584
* Registrar notifies a closing connection with pubsub protocol
585585
*/
586-
protected onPeerDisconnected(peerId: PeerId, err?: Error): void {
586+
private onPeerDisconnected(peerId: PeerId, err?: Error): void {
587587
const idB58Str = peerId.toB58String()
588588

589589
this.log('connection ended', idB58Str, err)
@@ -716,7 +716,7 @@ export default class Gossipsub extends EventEmitter {
716716
/**
717717
* Responsible for processing each RPC message received by other peers.
718718
*/
719-
async pipePeerReadStream(peerId: PeerId, stream: AsyncIterable<Uint8Array | BufferList>): Promise<void> {
719+
private async pipePeerReadStream(peerId: PeerId, stream: AsyncIterable<Uint8Array | BufferList>): Promise<void> {
720720
try {
721721
await pipe(stream, async (source) => {
722722
for await (const data of source) {
@@ -759,7 +759,7 @@ export default class Gossipsub extends EventEmitter {
759759
/**
760760
* Handles an rpc request from a peer
761761
*/
762-
async handleReceivedRpc(from: PeerId, rpc: IRPC): Promise<void> {
762+
private async handleReceivedRpc(from: PeerId, rpc: IRPC): Promise<void> {
763763
// Check if peer is graylisted in which case we ignore the event
764764
if (!this.acceptFrom(from.toB58String())) {
765765
this.log('received message from unacceptable peer %s', from.toB58String())
@@ -802,7 +802,7 @@ export default class Gossipsub extends EventEmitter {
802802
/**
803803
* Handles a subscription change from a peer
804804
*/
805-
handleReceivedSubscription(from: PeerId, subOpt: RPC.ISubOpts): void {
805+
private handleReceivedSubscription(from: PeerId, subOpt: RPC.ISubOpts): void {
806806
if (subOpt.topicID == null) {
807807
return
808808
}
@@ -830,7 +830,7 @@ export default class Gossipsub extends EventEmitter {
830830
* Handles a newly received message from an RPC.
831831
* May forward to all peers in the mesh.
832832
*/
833-
async handleReceivedMessage(from: PeerId, rpcMsg: RPC.IMessage): Promise<void> {
833+
private async handleReceivedMessage(from: PeerId, rpcMsg: RPC.IMessage): Promise<void> {
834834
this.metrics?.onMsgRecvPreValidation(rpcMsg.topic)
835835

836836
const validationResult = await this.validateReceivedMessage(from, rpcMsg)
@@ -895,41 +895,14 @@ export default class Gossipsub extends EventEmitter {
895895
}
896896
}
897897

898-
// # Ethereum consensus message-id function
899-
//
900-
// ## phase0
901-
//
902-
// The message-id of a gossipsub message MUST be the following 20 byte value computed from the message data:
903-
//
904-
// - If message.data has a valid snappy decompression, set message-id to the first 20 bytes of the SHA256 hash of
905-
// the concatenation of MESSAGE_DOMAIN_VALID_SNAPPY with the snappy decompressed message data,
906-
// i.e. SHA256(MESSAGE_DOMAIN_VALID_SNAPPY + snappy_decompress(message.data))[:20].
907-
//
908-
// - Otherwise, set message-id to the first 20 bytes of the SHA256 hash of the concatenation of
909-
// MESSAGE_DOMAIN_INVALID_SNAPPY with the raw message data,
910-
// i.e. SHA256(MESSAGE_DOMAIN_INVALID_SNAPPY + message.data)[:20].
911-
//
912-
// ## altair
913-
//
914-
// The derivation of the message-id has changed starting with Altair to incorporate the message topic along with the
915-
// message data. These are fields of the Message Protobuf, and interpreted as empty byte strings if missing. The
916-
// message-id MUST be the following 20 byte value computed from the message:
917-
//
918-
// - If message.data has a valid snappy decompression, set message-id to the first 20 bytes of the SHA256 hash of
919-
// the concatenation of the following data: MESSAGE_DOMAIN_VALID_SNAPPY, the length of the topic byte string
920-
// (encoded as little-endian uint64), the topic byte string, and the snappy decompressed message data:
921-
// i.e. SHA256(MESSAGE_DOMAIN_VALID_SNAPPY + uint_to_bytes(uint64(len(message.topic))) + message.topic + snappy_decompress(message.data))[:20].
922-
//
923-
// - Otherwise, set message-id to the first 20 bytes of the SHA256 hash of the concatenation of the following data:
924-
// MESSAGE_DOMAIN_INVALID_SNAPPY, the length of the topic byte string (encoded as little-endian uint64),the topic
925-
// byte string, and the raw message data:
926-
// i.e. SHA256(MESSAGE_DOMAIN_INVALID_SNAPPY + uint_to_bytes(uint64(len(message.topic))) + message.topic + message.data)[:20].
927-
928898
/**
929899
* Handles a newly received message from an RPC.
930900
* May forward to all peers in the mesh.
931901
*/
932-
async validateReceivedMessage(propagationSource: PeerId, rpcMsg: RPC.IMessage): Promise<ReceivedMessageResult> {
902+
private async validateReceivedMessage(
903+
propagationSource: PeerId,
904+
rpcMsg: RPC.IMessage
905+
): Promise<ReceivedMessageResult> {
933906
this.metrics?.onMsgRecvPreValidation(rpcMsg.topic)
934907

935908
// Fast message ID stuff
@@ -1217,7 +1190,7 @@ export default class Gossipsub extends EventEmitter {
12171190
private async handleGraft(id: PeerIdStr, graft: RPC.IControlGraft[]): Promise<RPC.IControlPrune[]> {
12181191
const prune: TopicStr[] = []
12191192
const score = this.score.score(id)
1220-
const now = this._now()
1193+
const now = Date.now()
12211194
let doPX = this.opts.doPX
12221195

12231196
graft.forEach(({ topicID }) => {
@@ -1365,7 +1338,7 @@ export default class Gossipsub extends EventEmitter {
13651338
backoff = new Map()
13661339
this.backoff.set(topic, backoff)
13671340
}
1368-
const expire = this._now() + interval
1341+
const expire = Date.now() + interval
13691342
const existingExpire = backoff.get(id) ?? 0
13701343
if (existingExpire < expire) {
13711344
backoff.set(id, expire)
@@ -1391,7 +1364,7 @@ export default class Gossipsub extends EventEmitter {
13911364
return
13921365
}
13931366

1394-
const now = this._now()
1367+
const now = Date.now()
13951368
this.backoff.forEach((backoff, topic) => {
13961369
backoff.forEach((expire, id) => {
13971370
if (expire < now) {
@@ -1530,7 +1503,7 @@ export default class Gossipsub extends EventEmitter {
15301503
/**
15311504
* Join topic
15321505
*/
1533-
join(topic: TopicStr): void {
1506+
private join(topic: TopicStr): void {
15341507
if (this.status.code !== GossipStatusCode.started) {
15351508
throw new Error('Gossipsub has not started')
15361509
}
@@ -1598,7 +1571,7 @@ export default class Gossipsub extends EventEmitter {
15981571
/**
15991572
* Leave topic
16001573
*/
1601-
leave(topic: TopicStr): void {
1574+
private leave(topic: TopicStr): void {
16021575
if (this.status.code !== GossipStatusCode.started) {
16031576
throw new Error('Gossipsub has not started')
16041577
}
@@ -1743,7 +1716,7 @@ export default class Gossipsub extends EventEmitter {
17431716
}
17441717

17451718
// We are publishing to fanout peers - update the time we published
1746-
this.fanoutLastpub.set(topic, this._now())
1719+
this.fanoutLastpub.set(topic, Date.now())
17471720
}
17481721
}
17491722
}
@@ -1846,25 +1819,27 @@ export default class Gossipsub extends EventEmitter {
18461819
return tosend.size
18471820
}
18481821

1849-
/// This function should be called when [`GossipsubConfig::validate_messages()`] is `true` after
1850-
/// the message got validated by the caller. Messages are stored in the ['Memcache'] and
1851-
/// validation is expected to be fast enough that the messages should still exist in the cache.
1852-
/// There are three possible validation outcomes and the outcome is given in acceptance.
1853-
///
1854-
/// If acceptance = [`MessageAcceptance::Accept`] the message will get propagated to the
1855-
/// network. The `propagation_source` parameter indicates who the message was received by and
1856-
/// will not be forwarded back to that peer.
1857-
///
1858-
/// If acceptance = [`MessageAcceptance::Reject`] the message will be deleted from the memcache
1859-
/// and the P₄ penalty will be applied to the `propagation_source`.
1860-
//
1861-
/// If acceptance = [`MessageAcceptance::Ignore`] the message will be deleted from the memcache
1862-
/// but no P₄ penalty will be applied.
1863-
///
1864-
/// This function will return true if the message was found in the cache and false if was not
1865-
/// in the cache anymore.
1866-
///
1867-
/// This should only be called once per message.
1822+
/**
1823+
* This function should be called when `asyncValidation` is `true` after
1824+
* the message got validated by the caller. Messages are stored in the `mcache` and
1825+
* validation is expected to be fast enough that the messages should still exist in the cache.
1826+
* There are three possible validation outcomes and the outcome is given in acceptance.
1827+
*
1828+
* If acceptance = `MessageAcceptance.Accept` the message will get propagated to the
1829+
* network. The `propagation_source` parameter indicates who the message was received by and
1830+
* will not be forwarded back to that peer.
1831+
*
1832+
* If acceptance = `MessageAcceptance.Reject` the message will be deleted from the memcache
1833+
* and the P₄ penalty will be applied to the `propagationSource`.
1834+
*
1835+
* If acceptance = `MessageAcceptance.Ignore` the message will be deleted from the memcache
1836+
* but no P₄ penalty will be applied.
1837+
*
1838+
* This function will return true if the message was found in the cache and false if was not
1839+
* in the cache anymore.
1840+
*
1841+
* This should only be called once per message.
1842+
*/
18681843
reportMessageValidationResult(msgId: MsgIdStr, propagationSource: PeerId, acceptance: MessageAcceptance): void {
18691844
if (acceptance === MessageAcceptance.Accept) {
18701845
const cacheEntry = this.mcache.validate(msgId)
@@ -2110,13 +2085,6 @@ export default class Gossipsub extends EventEmitter {
21102085
this.gossip.set(id, gossip.concat(controlIHaveMsgs))
21112086
}
21122087

2113-
/**
2114-
* Returns the current time in milliseconds
2115-
*/
2116-
_now(): number {
2117-
return Date.now()
2118-
}
2119-
21202088
/**
21212089
* Make a PRUNE control message for a peer in a topic
21222090
*/
@@ -2410,7 +2378,7 @@ export default class Gossipsub extends EventEmitter {
24102378
})
24112379

24122380
// expire fanout for topics we haven't published to in a while
2413-
const now = this._now()
2381+
const now = Date.now()
24142382
this.fanoutLastpub.forEach((lastpb, topic) => {
24152383
if (lastpb + fanoutTTL < now) {
24162384
this.fanout.delete(topic)

0 commit comments

Comments
 (0)