Skip to content

Commit 1ba3fc4

Browse files
Sending control extension message to remote peers (#442)
1 parent eff9dd4 commit 1ba3fc4

8 files changed

Lines changed: 824 additions & 56 deletions

File tree

libp2p/src/main/kotlin/io/libp2p/pubsub/AbstractRouter.kt

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -185,7 +185,9 @@ abstract class AbstractRouter(
185185
processControl(msg.control, peer)
186186
}
187187

188-
if (protocol.supportsExtensions()) {
188+
// TODO we need to handle the existence of extension messages more generically (https://github.com/libp2p/jvm-libp2p/issues/441)
189+
190+
if (protocol.supportsExtensions() && (msg.hasTestExtension() || msg.hasPartial())) {
189191
processExtensions(msg, peer)
190192
}
191193

libp2p/src/main/kotlin/io/libp2p/pubsub/gossip/Gossip.kt

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import io.libp2p.pubsub.PubsubApiImpl
1212
import io.libp2p.pubsub.PubsubProtocol
1313
import io.libp2p.pubsub.gossip.builders.GossipRouterBuilder
1414
import io.netty.channel.ChannelHandler
15+
import org.slf4j.LoggerFactory
1516
import java.util.concurrent.CompletableFuture
1617

1718
class Gossip @JvmOverloads constructor(
@@ -21,6 +22,8 @@ class Gossip @JvmOverloads constructor(
2122
) :
2223
ProtocolBinding<Unit>, ConnectionHandler, PubsubApi by api {
2324

25+
private val logger = LoggerFactory.getLogger(Gossip::class.java)
26+
2427
fun updateTopicScoreParams(scoreParams: Map<String, GossipTopicScoreParams>) {
2528
router.score.updateTopicParams(scoreParams)
2629
}
@@ -62,6 +65,7 @@ class Gossip @JvmOverloads constructor(
6265
}
6366

6467
override fun initChannel(ch: P2PChannel, selectedProtocol: String): CompletableFuture<out Unit> {
68+
logger.trace("Gossip initChannel - selected protocol: {}", selectedProtocol)
6569
router.addPeerWithDebugHandler(ch as Stream, debugGossipHandler)
6670
return CompletableFuture.completedFuture(Unit)
6771
}
Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
package io.libp2p.pubsub.gossip
2+
3+
import io.libp2p.core.PeerId
4+
import pubsub.pb.Rpc
5+
6+
class GossipExtensionsState {
7+
8+
/*
9+
Tracks the peers that we have already sent a control extensions message
10+
*/
11+
private val outgoingControlExtensionsMsgPeers: MutableSet<PeerId> = mutableSetOf()
12+
13+
/*
14+
Tracks peers that already sent us a control extensions message
15+
*/
16+
private val peerExtensionSupportMap: MutableMap<PeerId, Rpc.ControlExtensions> = mutableMapOf()
17+
18+
fun onPeerDisconnected(peer: PeerId) {
19+
outgoingControlExtensionsMsgPeers.remove(peer)
20+
peerExtensionSupportMap.remove(peer)
21+
}
22+
23+
fun onControlExtensionsMessage(ctrlExtensions: Rpc.ControlExtensions, receivedFrom: PeerId) {
24+
peerExtensionSupportMap[receivedFrom] = ctrlExtensions
25+
}
26+
27+
fun registerControlExtensionMessageSentToPeers(peerId: PeerId) {
28+
outgoingControlExtensionsMsgPeers.add(peerId)
29+
}
30+
31+
fun peerSupportedExtensions(peerId: PeerId) = peerExtensionSupportMap[peerId]
32+
33+
fun hasReceivedControlExtensionsFrom(peer: PeerId) =
34+
peerExtensionSupportMap.contains(peer)
35+
36+
fun hasSentControlExtensionsTo(peer: PeerId) =
37+
outgoingControlExtensionsMsgPeers.contains(peer)
38+
}

libp2p/src/main/kotlin/io/libp2p/pubsub/gossip/GossipRouter.kt

Lines changed: 69 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -132,7 +132,7 @@ open class GossipRouter(
132132
private val acceptRequestsWhitelist = mutableMapOf<PeerHandler, AcceptRequestsWhitelistEntry>()
133133
override val pendingRpcParts = PendingRpcPartsMap<GossipRpcPartsQueue> { DefaultGossipRpcPartsQueue(params) }
134134

135-
private val peerExtensionSupportMap = mutableMapOf<PeerId, Rpc.ControlExtensions>()
135+
val gossipExtensionsState = GossipExtensionsState()
136136

137137
private fun setBackOff(peer: PeerHandler, topic: Topic) = setBackOff(peer, topic, params.pruneBackoff.toMillis())
138138
private fun setBackOff(peer: PeerHandler, topic: Topic, delay: Long) {
@@ -159,13 +159,15 @@ open class GossipRouter(
159159
fanout.values.forEach { it.remove(peer) }
160160
acceptRequestsWhitelist -= peer
161161
pendingRpcParts.popQueue(peer) // discard them
162+
gossipExtensionsState.onPeerDisconnected(peer.peerId)
162163
super.onPeerDisconnected(peer)
163164
}
164165

165166
override fun onPeerActive(peer: PeerHandler) {
166167
super.onPeerActive(peer)
167168
eventBroadcaster.notifyConnected(peer.peerId, peer.getRemoteAddress())
168169
heartbeatTask.hashCode() // force lazy initialization
170+
sendControlExtensions(peer)
169171
}
170172

171173
override fun notifyUnseenMessage(peer: PeerHandler, msg: PubsubMessage) {
@@ -398,34 +400,56 @@ open class GossipRouter(
398400
) {
399401
logger.trace("Received control extension {}", ctrlExtensions.toString())
400402

401-
if (peerExtensionSupportMap[receivedFrom.peerId] != null) {
402-
// TODO Should downscore peers that send control extension multiple times? (https://github.com/libp2p/jvm-libp2p/issues/437)
403+
if (gossipExtensionsState.hasReceivedControlExtensionsFrom(receivedFrom.peerId)) {
404+
// TODO Should disconnect peers that send control extension multiple times (https://github.com/libp2p/jvm-libp2p/issues/437)
403405
logger.trace(
404406
"Received another control extension message from peer {}",
405407
receivedFrom.peerId
406408
)
407409
return
408410
} else {
409-
peerExtensionSupportMap[receivedFrom.peerId] = ctrlExtensions
411+
gossipExtensionsState.onControlExtensionsMessage(ctrlExtensions, receivedFrom.peerId)
410412
}
411413
}
412414

413415
override fun processExtensions(msg: Rpc.RPC, receivedFrom: PeerHandler) {
414-
val peerSupportedExtensions = peerExtensionSupportMap[receivedFrom.peerId]
415-
if (peerSupportedExtensions == null) {
416+
val peerSupportedExtensions =
417+
gossipExtensionsState.peerSupportedExtensions(receivedFrom.peerId)
418+
419+
// TODO Revisit this logic as part of adding feature flags (https://github.com/libp2p/jvm-libp2p/issues/441)
420+
421+
when {
422+
msg.hasTestExtension() && checkPeerExtensionSupport(
423+
peerSupportedExtensions,
424+
Rpc.ControlExtensions::hasTestExtension
425+
) ->
426+
processTestExtensionMessage(msg.testExtension, receivedFrom)
427+
428+
msg.hasPartial() && checkPeerExtensionSupport(
429+
peerSupportedExtensions,
430+
Rpc.ControlExtensions::hasPartialMessages
431+
) ->
432+
processPartialMessageExtension(msg.partial, receivedFrom)
433+
}
434+
}
435+
436+
private fun checkPeerExtensionSupport(
437+
peerSavedPreferences: Rpc.ControlExtensions?,
438+
checkSupportFunction: (Rpc.ControlExtensions) -> Boolean
439+
): Boolean {
440+
if (peerSavedPreferences == null) {
441+
return false
442+
}
443+
444+
if (!checkSupportFunction.invoke(peerSavedPreferences)) {
416445
logger.trace(
417-
"Ignoring extension messages from peer {} - did it send an extension control message?",
418-
receivedFrom.peerId
446+
"Ignoring extension messages from peer {} - did it send an control extensions message?",
447+
peerSavedPreferences
419448
)
420-
} else {
421-
when {
422-
peerSupportedExtensions.hasTestExtension() && msg.hasTestExtension() ->
423-
processTestExtensionMessage(msg.testExtension, receivedFrom)
424-
425-
peerSupportedExtensions.hasPartialMessages() && msg.hasPartial() ->
426-
processPartialMessageExtension(msg.partial, receivedFrom)
427-
}
449+
return false
428450
}
451+
452+
return true
429453
}
430454

431455
private fun processTestExtensionMessage(
@@ -578,6 +602,8 @@ open class GossipRouter(
578602
fanout -= topic
579603
lastPublished -= topic
580604
}
605+
606+
activePeers.forEach { sendControlExtensions(it) }
581607
}
582608

583609
override fun unsubscribe(topic: Topic) {
@@ -778,6 +804,33 @@ open class GossipRouter(
778804
send(peer, iDontWant)
779805
}
780806

807+
private fun sendControlExtensions(peer: PeerHandler) {
808+
if (!this.protocol.supportsExtensions()) {
809+
logger.trace(
810+
"Protocol does not support extensions. Won't send control extensions message."
811+
)
812+
return
813+
}
814+
815+
if (gossipExtensionsState.hasSentControlExtensionsTo(peer.peerId)) {
816+
logger.trace(
817+
"Already sent control extensions msg to peer {}. Won't send another one.",
818+
peer.peerId
819+
)
820+
return
821+
}
822+
823+
logger.trace("Sending control extensions message to peer {}", peer.peerId)
824+
825+
pendingRpcParts.getQueue(peer).addControlExtensions(
826+
Rpc.ControlExtensions.newBuilder()
827+
.setTestExtension(true)
828+
.setPartialMessages(true)
829+
.build()
830+
)
831+
gossipExtensionsState.registerControlExtensionMessageSentToPeers(peer.peerId)
832+
}
833+
781834
data class AcceptRequestsWhitelistEntry(val whitelistedTill: Long, val messagesAccepted: Int = 0) {
782835
fun incrementMessageCount() = AcceptRequestsWhitelistEntry(whitelistedTill, messagesAccepted + 1)
783836
}

libp2p/src/main/kotlin/io/libp2p/pubsub/gossip/GossipRpcPartsQueue.kt

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,9 @@ interface GossipRpcPartsQueue : RpcPartsQueue {
2626
* Gossip 1.1 variant
2727
*/
2828
fun addPrune(topic: Topic, backoffSeconds: Long, backoffPeers: List<PeerId>)
29+
30+
// TODO Need to check if we should handle when control extension and extension messages could be separated by split (https://github.com/libp2p/jvm-libp2p/issues/440)
31+
fun addControlExtensions(ctrlMessage: Rpc.ControlExtensions)
2932
}
3033

3134
/**
@@ -81,6 +84,12 @@ open class DefaultGossipRpcPartsQueue(
8184
}
8285
}
8386

87+
protected data class ControlExtensionPart(val ctrlExtension: Rpc.ControlExtensions) : AbstractPart {
88+
override fun appendToBuilder(builder: Rpc.RPC.Builder) {
89+
builder.controlBuilder.setExtensions(ctrlExtension)
90+
}
91+
}
92+
8493
override fun addIHave(messageId: MessageId, topic: Topic) {
8594
addPart(IHavePart(messageId, topic))
8695
}
@@ -101,6 +110,10 @@ open class DefaultGossipRpcPartsQueue(
101110
addPart(PrunePart(topic, backoffSeconds, backoffPeers))
102111
}
103112

113+
override fun addControlExtensions(ctrlMessage: Rpc.ControlExtensions) {
114+
addPart(ControlExtensionPart(ctrlMessage))
115+
}
116+
104117
override fun takeMerged(): List<Rpc.RPC> {
105118
val ret = mutableListOf<Rpc.RPC>()
106119
var partIdx = 0

0 commit comments

Comments
 (0)