Skip to content

Refactored RoutedHost into Injected Router #157

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 11 commits into from
May 5, 2019
Merged
23 changes: 11 additions & 12 deletions libp2p/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,10 @@
from .peer.id import id_from_public_key
from .network.swarm import Swarm
from .host.basic_host import BasicHost
from .kademlia.routed_host import RoutedHost
from .transport.upgrader import TransportUpgrader
from .transport.tcp.tcp import TCP
from .kademlia.network import KademliaServer
from .routing.kademlia.kademlia_peer_router import KadmeliaPeerRouter


async def cleanup_done_tasks():
Expand All @@ -31,7 +31,7 @@ def generate_id():
# private_key = new_key.exportKey("PEM")
return new_id

def initialize_default_kademlia(
def initialize_default_kademlia_router(
ksize=20, alpha=3, id_opt=None, storage=None):
"""
initialize swam when no swarm is passed in
Expand All @@ -46,13 +46,14 @@ def initialize_default_kademlia(
id_opt = generate_id()

node_id = id_opt.get_raw_id()
return KademliaServer(ksize=ksize, alpha=alpha,
node_id=node_id, storage=storage)
server = KademliaServer(ksize=ksize, alpha=alpha,
node_id=node_id, storage=storage)
return KadmeliaPeerRouter(server)


def initialize_default_swarm(
id_opt=None, transport_opt=None,
muxer_opt=None, sec_opt=None, peerstore_opt=None):
id_opt=None, transport_opt=None, muxer_opt=None,
sec_opt=None, peerstore_opt=None, disc_opt=None):
"""
initialize swarm when no swarm is passed in
:param id_opt: optional id for host
Expand All @@ -78,7 +79,8 @@ def initialize_default_swarm(
upgrader = TransportUpgrader(sec, muxer)

peerstore = peerstore_opt or PeerStore()
swarm_opt = Swarm(id_opt, peerstore, upgrader, transport)
swarm_opt = Swarm(id_opt, peerstore,\
upgrader, transport, disc_opt)

return swarm_opt

Expand All @@ -105,14 +107,11 @@ async def new_node(
swarm_opt = initialize_default_swarm(
id_opt=id_opt, transport_opt=transport_opt,
muxer_opt=muxer_opt, sec_opt=sec_opt,
peerstore_opt=peerstore_opt)
peerstore_opt=peerstore_opt, disc_opt=disc_opt)

# TODO enable support for other host type
# TODO routing unimplemented
if not disc_opt:
host = BasicHost(swarm_opt)
else:
host = RoutedHost(swarm_opt, disc_opt)
host = BasicHost(swarm_opt)

# Kick off cleanup job
asyncio.ensure_future(cleanup_done_tasks())
Expand Down
21 changes: 11 additions & 10 deletions libp2p/host/basic_host.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,21 +11,22 @@
class BasicHost(IHost):

# default options constructor
def __init__(self, _network):
self.network = _network
self.peerstore = self.network.peerstore
def __init__(self, network, router=None):
self._network = network
self._router = router
self.peerstore = self._network.peerstore

def get_id(self):
"""
:return: peer_id of host
"""
return self.network.get_peer_id()
return self._network.get_peer_id()

def get_network(self):
"""
:return: network instance of host
"""
return self.network
return self._network

def get_peerstore(self):
"""
Expand All @@ -45,7 +46,7 @@ def get_addrs(self):
p2p_part = multiaddr.Multiaddr('/p2p/{}'.format(self.get_id().pretty()))

addrs = []
for transport in self.network.listeners.values():
for transport in self._network.listeners.values():
for addr in transport.get_addrs():
addrs.append(addr.encapsulate(p2p_part))
return addrs
Expand All @@ -57,7 +58,7 @@ def set_stream_handler(self, protocol_id, stream_handler):
:param stream_handler: a stream handler function
:return: true if successful
"""
return self.network.set_stream_handler(protocol_id, stream_handler)
return self._network.set_stream_handler(protocol_id, stream_handler)

# protocol_id can be a list of protocol_ids
# stream will decide which protocol_id to run on
Expand All @@ -67,7 +68,7 @@ async def new_stream(self, peer_id, protocol_ids):
:param protocol_id: protocol id that stream runs on
:return: true if successful
"""
stream = await self.network.new_stream(peer_id, protocol_ids)
stream = await self._network.new_stream(peer_id, protocol_ids)
return stream

async def connect(self, peer_info):
Expand All @@ -84,7 +85,7 @@ async def connect(self, peer_info):
self.peerstore.add_addrs(peer_info.peer_id, peer_info.addrs, 10)

# there is already a connection to this peer
if peer_info.peer_id in self.network.connections:
if peer_info.peer_id in self._network.connections:
return

await self.network.dial_peer(peer_info.peer_id)
await self._network.dial_peer(peer_info.peer_id)
2 changes: 1 addition & 1 deletion libp2p/kademlia/crawling.py
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ async def _handle_found_values(self, values):
value_counts = Counter(values)
if len(value_counts) != 1:
log.warning("Got multiple values for key %i: %s",
self.node.long_id, str(values))
self.node.xor_id, str(values))
value = value_counts.most_common(1)[0][0]

peer = self.nearest_without_value.popleft()
Expand Down
11 changes: 8 additions & 3 deletions libp2p/kademlia/kad_peerinfo.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,9 @@ class KadPeerInfo(PeerInfo):
def __init__(self, peer_id, peer_data=None):
super(KadPeerInfo, self).__init__(peer_id, peer_data)

self.peer_id_obj = peer_id
self.peer_id = peer_id.get_raw_id()
self.long_id = int(digest(peer_id.get_raw_id()).hex(), 16)
self.xor_id = peer_id.get_xor_id()

self.addrs = peer_data.get_addrs() if peer_data else None

Expand All @@ -34,7 +35,7 @@ def distance_to(self, node):
"""
Get the distance between this node and another.
"""
return self.long_id ^ node.long_id
return self.xor_id ^ node.xor_id

def __iter__(self):
"""
Expand All @@ -43,11 +44,15 @@ def __iter__(self):
return iter([self.peer_id, self.ip, self.port])

def __repr__(self):
return repr([self.long_id, self.ip, self.port])
return repr([self.xor_id, self.ip, self.port, self.peer_id])

def __str__(self):
return "%s:%s" % (self.ip, str(self.port))

def encode(self):
return str(self.peer_id) + "\n" + \
str("/ip4/" + str(self.ip) + "/udp/" + str(self.port))

class KadPeerHeap:
"""
A heap of peers ordered by distance to a given node.
Expand Down
2 changes: 1 addition & 1 deletion libp2p/kademlia/network.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ async def listen(self, port, interface='0.0.0.0'):
listen = loop.create_datagram_endpoint(self._create_protocol,
local_addr=(interface, port))
log.info("Node %i listening on %s:%i",
self.node.long_id, interface, port)
self.node.xor_id, interface, port)
self.transport, self.protocol = await listen
# finally, schedule refreshing table
self.refresh_table()
Expand Down
21 changes: 0 additions & 21 deletions libp2p/kademlia/routed_host.py

This file was deleted.

6 changes: 3 additions & 3 deletions libp2p/kademlia/routing.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ def split(self):
one = KBucket(self.range[0], midpoint, self.ksize)
two = KBucket(midpoint + 1, self.range[1], self.ksize)
for node in self.nodes.values():
bucket = one if node.long_id <= midpoint else two
bucket = one if node.xor_id <= midpoint else two
bucket.nodes[node.peer_id] = node
return (one, two)

Expand All @@ -48,7 +48,7 @@ def remove_node(self, node):
self.nodes[newnode.peer_id] = newnode

def has_in_range(self, node):
return self.range[0] <= node.long_id <= self.range[1]
return self.range[0] <= node.xor_id <= self.range[1]

def is_new_node(self, node):
return node.peer_id not in self.nodes
Expand Down Expand Up @@ -175,7 +175,7 @@ def get_bucket_for(self, node):
Get the index of the bucket that the given node would fall into.
"""
for index, bucket in enumerate(self.buckets):
if node.long_id < bucket.range[1]:
if node.xor_id < bucket.range[1]:
return index
# we should never be here, but make linter happy
return None
Expand Down
14 changes: 10 additions & 4 deletions libp2p/network/swarm.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,14 @@
from .connection.raw_connection import RawConnection

class Swarm(INetwork):
# pylint: disable=too-many-instance-attributes, cell-var-from-loop
# pylint: disable=too-many-instance-attributes,cell-var-from-loop,too-many-arguments

def __init__(self, peer_id, peerstore, upgrader, transport):
def __init__(self, peer_id, peerstore, upgrader, transport, router):
self.self_id = peer_id
self.peerstore = peerstore
self.upgrader = upgrader
self.transport = transport
self.router = router
self.connections = dict()
self.listeners = dict()
self.stream_handlers = dict()
Expand Down Expand Up @@ -57,8 +58,10 @@ async def dial_peer(self, peer_id):
if not addrs:
raise SwarmException("No known addresses to peer")

# TODO: define logic to choose which address to use, or try them all ?
multiaddr = addrs[0]
if not self.router:
multiaddr = addrs[0]
else:
multiaddr = self.router.find_peer(peer_id)

if peer_id in self.connections:
# If muxed connection already exists for peer_id,
Expand Down Expand Up @@ -183,6 +186,9 @@ def notify(self, notifee):
return True
return False

def add_router(self, router):
self.router = router

def create_generic_protocol_handler(swarm):
"""
Create a generic protocol handler from the given swarm. We use swarm
Expand Down
9 changes: 9 additions & 0 deletions libp2p/peer/id.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import hashlib
import base58
import multihash

Expand All @@ -21,6 +22,9 @@ def get_raw_id(self):
def pretty(self):
return base58.b58encode(self._id_str).decode()

def get_xor_id(self):
return int(digest(self.get_raw_id()).hex(), 16)

def __str__(self):
pid = self.pretty()
if len(pid) <= 10:
Expand Down Expand Up @@ -67,3 +71,8 @@ def id_from_public_key(key):

def id_from_private_key(key):
return id_from_public_key(key.publickey())

def digest(string):
if not isinstance(string, bytes):
string = str(string).encode('utf8')
return hashlib.sha1(string).digest()
2 changes: 1 addition & 1 deletion libp2p/routing/interfaces.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,4 +28,4 @@ def find_peer(self, peer_id):
Find specific Peer
FindPeer searches for a peer with given peer_id, returns a peer.PeerInfo
with relevant addresses.
"""
"""
35 changes: 35 additions & 0 deletions libp2p/routing/kademlia/kademlia_peer_router.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
import ast

from libp2p.routing.interfaces import IPeerRouting
from libp2p.kademlia.kad_peerinfo import create_kad_peerinfo


class KadmeliaPeerRouter(IPeerRouting):
# pylint: disable=too-few-public-methods

def __init__(self, dht_server):
self.server = dht_server

async def find_peer(self, peer_id):
"""
Find a specific peer
:param peer_id: peer to search for
:return: KadPeerInfo of specified peer
"""
# switching peer_id to xor_id used by kademlia as node_id
xor_id = peer_id.get_xor_id()
value = await self.server.get(xor_id)
return decode_peerinfo(value)

def decode_peerinfo(encoded):
if isinstance(encoded, bytes):
encoded = encoded.decode()
try:
lines = ast.literal_eval(encoded)
except SyntaxError:
return None
ip = lines[1] # pylint: disable=invalid-name
port = lines[2]
peer_id = lines[3]
peer_info = create_kad_peerinfo(peer_id, ip, port)
return peer_info
31 changes: 0 additions & 31 deletions libp2p/routing/kadmelia/kadmelia_peer_router.py

This file was deleted.

Empty file added tests/routing/__init__.py
Empty file.
Loading