operator-time
quadrismegistus 4 years ago
parent 57a6c0aa3a
commit caee9fc35c

13
.gitignore vendored

@@ -1,18 +1,9 @@
__pycache__
app.json
.keys.json
app/cache
app/log.txt
log.txt
venv
# .vscode
sto.dat
p2p/kademlia0
cache.sqlite
.vscode
dbm.*
.DS_Store
.vscode/
.vscode/settings.json
lib
*.venv
.vscode
*.key

@@ -0,0 +1 @@
{"user": {"username": "marx"}}

[Image diffs: 33 binary image files changed; the before/after viewer reports identical sizes for each, ranging from 2.3 KiB to 9.7 MiB.]

@@ -1,6 +0,0 @@
"""
Kademlia is a Python implementation of the Kademlia protocol which
utilizes the asyncio library.
"""
__version__ = "2.2.1"

@@ -1,190 +0,0 @@
from collections import Counter
import logging
from kademlia.node import Node, NodeHeap
from kademlia.utils import gather_dict
log = logging.getLogger(__name__) # pylint: disable=invalid-name
# pylint: disable=too-few-public-methods
class SpiderCrawl:
"""
Crawl the network and look for given 160-bit keys.
"""
def __init__(self, protocol, node, peers, ksize, alpha, log = print):
"""
Create a new C{SpiderCrawl}er.
Args:
protocol: A :class:`~kademlia.protocol.KademliaProtocol` instance.
node: A :class:`~kademlia.node.Node` representing the key we're
looking for
peers: A list of :class:`~kademlia.node.Node` instances that
provide the entry point for the network
ksize: The value for k based on the paper
alpha: The value for alpha based on the paper
"""
self.protocol = protocol
self.ksize = ksize
self.alpha = alpha
self.node = node
self.log = log  # must be set before the self.log() call below
self.nearest = NodeHeap(self.node, self.ksize)
self.last_ids_crawled = []
self.log("creating spider with peers: %s" % peers)
self.nearest.push(peers)
async def _find(self, rpcmethod):
"""
Get either a value or list of nodes.
Args:
rpcmethod: The protocol's call_find_value or call_find_node.
The process:
1. calls find_* on the current ALPHA nearest nodes not already queried,
adding results to the current nearest list of k nodes.
2. the current nearest list keeps track of who has already been queried,
is sorted by nearness, and keeps at most KSIZE nodes
3. if the list is the same as last time, the next call goes to everyone
not yet queried
4. repeat until every node in the nearest list has been queried
"""
self.log("crawling network with nearest: %s" % str(tuple(self.nearest)))
count = self.alpha
if self.nearest.get_ids() == self.last_ids_crawled:
count = len(self.nearest)
self.last_ids_crawled = self.nearest.get_ids()
dicts = {}
for peer in self.nearest.get_uncontacted()[:count]:
dicts[peer.id] = rpcmethod(peer, self.node)
self.nearest.mark_contacted(peer)
found = await gather_dict(dicts)
return await self._nodes_found(found)
async def _nodes_found(self, responses):
raise NotImplementedError
class ValueSpiderCrawl(SpiderCrawl):
def __init__(self, protocol, node, peers, ksize, alpha, log=print):
SpiderCrawl.__init__(self, protocol, node, peers, ksize, alpha, log=log)
# keep track of the single nearest node without value - per
# section 2.3 so we can set the key there if found
self.nearest_without_value = NodeHeap(self.node, 1)
async def find(self):
"""
Find either the closest nodes or the value requested.
"""
return await self._find(self.protocol.call_find_value)
async def _nodes_found(self, responses):
"""
Handle the result of an iteration in _find.
"""
toremove = []
found_values = []
for peerid, response in responses.items():
response = RPCFindResponse(response)
if not response.happened():
toremove.append(peerid)
elif response.has_value():
found_values.append(response.get_value())
else:
peer = self.nearest.get_node(peerid)
self.nearest_without_value.push(peer)
self.nearest.push(response.get_node_list())
self.nearest.remove(toremove)
if found_values:
return await self._handle_found_values(found_values)
if self.nearest.have_contacted_all():
# not found!
return None
return await self.find()
async def _handle_found_values(self, values):
"""
We got some values! Exciting. But let's make sure
they're all the same or freak out a little bit. Also,
make sure we tell the nearest node that *didn't* have
the value to store it.
"""
value_counts = Counter(values)
if len(value_counts) != 1:
self.log("Got multiple values for key %i: %s" %
(self.node.long_id, str(values)) )
value = value_counts.most_common(1)[0][0]
peer = self.nearest_without_value.popleft()
if peer:
await self.protocol.call_store(peer, self.node.id, value)
return value
class NodeSpiderCrawl(SpiderCrawl):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)  # forward log= so the parent's default doesn't clobber it
async def find(self):
"""
Find the closest nodes.
"""
return await self._find(self.protocol.call_find_node)
async def _nodes_found(self, responses):
"""
Handle the result of an iteration in _find.
"""
toremove = []
for peerid, response in responses.items():
response = RPCFindResponse(response)
if not response.happened():
toremove.append(peerid)
else:
self.nearest.push(response.get_node_list())
self.nearest.remove(toremove)
if self.nearest.have_contacted_all():
return list(self.nearest)
return await self.find()
class RPCFindResponse:
def __init__(self, response):
"""
A wrapper for the result of a RPC find.
Args:
response: This will be a tuple of (<response received>, <value>)
where <value> will be a list of tuples if not found or
a dictionary of {'value': v} where v is the value desired
"""
self.response = response
def happened(self):
"""
Did the other host actually respond?
"""
return self.response[0]
def has_value(self):
return isinstance(self.response[1], dict)
def get_value(self):
return self.response[1]['value']
def get_node_list(self):
"""
Get the node list in the response. If there's no value, this should
be set.
"""
nodelist = self.response[1] or []
return [Node(*nodeple) for nodeple in nodelist]
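
The tuple shapes RPCFindResponse expects can be checked in isolation. A small sketch (the digest bytes and address below are made up for illustration):

from kademlia.crawling import RPCFindResponse

# a hit: the peer answered with a {'value': ...} dict
hit = RPCFindResponse((True, {'value': b'hello'}))
assert hit.happened() and hit.has_value()
assert hit.get_value() == b'hello'

# a miss: the peer answered with (id, ip, port) triples instead
miss = RPCFindResponse((True, [(b'\x01' * 20, '127.0.0.1', 8468)]))
assert miss.happened() and not miss.has_value()
nodes = miss.get_node_list()  # triples rebuilt as Node instances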

@@ -1,333 +0,0 @@
"""
Package for interacting on the network at a high level.
"""
import random
import pickle
import asyncio
import logging
from kademlia.protocol import KademliaProtocol
from kademlia.utils import digest
from kademlia.storage import HalfForgetfulStorage
from kademlia.node import Node
from kademlia.crawling import ValueSpiderCrawl
from kademlia.crawling import NodeSpiderCrawl
STORE_ANYWHERE = True
class CannotReachNetworkError(Exception): pass
log = logging.getLogger(__name__)  # pylint: disable=invalid-name
# pylint: disable=too-many-instance-attributes
class Server:
"""
High level view of a node instance. This is the object that should be
created to start listening as an active node on the network.
"""
protocol_class = KademliaProtocol
@property
def logger(self):
if not hasattr(self, '_logger'):
handler = logging.StreamHandler()
formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
handler.setFormatter(formatter)
self._logger = logger = logging.getLogger(__name__)  # Server defines no title attribute
logger.addHandler(handler)
logger.setLevel(logging.DEBUG)
return self._logger
def __init__(self, ksize=20, alpha=3, node_id=None, storage=None,log=None):
"""
Create a server instance. This will start listening on the given port.
Args:
ksize (int): The k parameter from the paper
alpha (int): The alpha parameter from the paper
node_id: The id for this node on the network.
storage: An instance that implements the interface
:class:`~kademlia.storage.IStorage`
"""
self.ksize = ksize
self.alpha = alpha
self.log = log if log is not None else self.logger.debug
self.storage = storage or HalfForgetfulStorage()
self.log('[Server] storage loaded with %s keys' % len(self.storage.data))
self.node = Node(node_id or digest(random.getrandbits(255)))
self.transport = None
self.protocol = None
self.refresh_loop = None
self.save_state_loop = None
## echo
#self.re_echo()
#def re_echo(self):
# return [asyncio.create_task(self.set_digest(k,v)) for k,v in self.storage.items()]
def __repr__(self):
neighbs = self.bootstrappable_neighbors()
return f"storing {len(self.storage.data)} keys and has {len(neighbs)} neighbors"
def stop(self):
if self.transport is not None:
self.transport.close()
if self.refresh_loop:
self.refresh_loop.cancel()
if self.save_state_loop:
self.save_state_loop.cancel()
def _create_protocol(self):
return self.protocol_class(self.node, self.storage, self.ksize, self.log)
async def listen(self, port, interface='0.0.0.0'):
"""
Start listening on the given port.
Provide interface="::" to accept ipv6 address
"""
loop = asyncio.get_event_loop()
listen = loop.create_datagram_endpoint(self._create_protocol,
local_addr=(interface, port))
self.log("Node %i listening on %s:%i" % (self.node.long_id, interface, port))
self.transport, self.protocol = await listen
# finally, schedule refreshing table
self.refresh_table()
def refresh_table(self):
self.log("Refreshing routing table")
asyncio.ensure_future(self._refresh_table())
loop = asyncio.get_event_loop()
self.refresh_loop = loop.call_later(3600, self.refresh_table)
async def _refresh_table(self):
"""
Refresh buckets that haven't had any lookups in the last hour
(per section 2.3 of the paper).
"""
results = []
for node_id in self.protocol.get_refresh_ids():
node = Node(node_id)
nearest = self.protocol.router.find_neighbors(node, self.alpha)
spider = NodeSpiderCrawl(self.protocol, node, nearest,
self.ksize, self.alpha)
spider.log=self.log
results.append(spider.find())
# do our crawling
await asyncio.gather(*results)
# now republish keys older than one hour
# repub_every=3600
repub_every=3600
for dkey, value in self.storage.iter_older_than(repub_every):
await self.set_digest(dkey, value)
def bootstrappable_neighbors(self):
"""
Get a :class:`list` of (ip, port) :class:`tuple` pairs suitable for
use as an argument to the bootstrap method.
The server should have been bootstrapped
already - this is just a utility for getting some neighbors and then
storing them if this server is going down for a while. When it comes
back up, the list of nodes can be used to bootstrap.
"""
neighbors = self.protocol.router.find_neighbors(self.node)
return [tuple(n)[-2:] for n in neighbors]
async def bootstrap(self, addrs):
"""
Bootstrap the server by connecting to other known nodes in the network.
Args:
addrs: A `list` of (ip, port) `tuple` pairs. Note that only IP
addresses are acceptable - hostnames will cause an error.
"""
self.log("Attempting to bootstrap node with %i initial contacts",
len(addrs))
cos = list(map(self.bootstrap_node, addrs))
gathered = await asyncio.gather(*cos)
nodes = [node for node in gathered if node is not None]
spider = NodeSpiderCrawl(self.protocol, self.node, nodes,
self.ksize, self.alpha)
spider.log=self.log
return await spider.find()
async def bootstrap_node(self, addr):
result = await self.protocol.ping(addr, self.node.id)
return Node(result[1], addr[0], addr[1]) if result[0] else None
async def get(self, key, store_anywhere=STORE_ANYWHERE):
"""
Get a key if the network has it.
Returns:
:class:`None` if not found, the value otherwise.
"""
dkey = digest(key)
self.log("Looking up key %s %s" % (key,dkey))
# if this node has it, return it
if self.storage.get(dkey) is not None:
self.log(f'already have {key} ({dkey}) in storage, returning...')
return self.storage.get(dkey)
node = Node(dkey)
self.log(f'creating node {node}')
nearest = self.protocol.router.find_neighbors(node)
self.log(f'nearest = {nearest}')
if not nearest:
raise CannotReachNetworkError("There are no known neighbors to get key %s" % key)
found = None
#while found is None:
spider = ValueSpiderCrawl(self.protocol, node, nearest, self.ksize, self.alpha, log=self.log)
self.log(f'spider crawling... {spider}')
found = await spider.find()
self.log(f'spider found <- {found} for key {key} ({dkey})')
#await asyncio.sleep(5)
self.log(f"Eventually found for key {key} value {found}")
# if not found:
# return None
#raise Exception('nothing found!')
# # set it locally? @EDIT
# if store_anywhere and found:
# self.log(f'storing anywhere: {dkey} -> {found}')
# self.storage[dkey]=found
return found
async def set(self, key, value):
"""
Set the given string key to the given value in the network.
"""
if not check_dht_value_type(value):
raise TypeError(
"Value must be of type int, float, bool, str, or bytes"
)
self.log(f"setting '{key}' = '{value}' ({type(value)}) on network")
dkey = digest(key)
return await self.set_digest(dkey, value)
async def set_digest(self, dkey, value, store_anywhere=STORE_ANYWHERE):
"""
Set the given SHA1 digest key (bytes) to the given value in the
network.
"""
node = Node(dkey)
self.log(f'set_digest() {node}')
nearest = self.protocol.router.find_neighbors(node)
self.log(f'set_digest() nearest --> {nearest}')
if not nearest:
self.log("There are no known neighbors to set key %s" % dkey.hex())
return False
spider = NodeSpiderCrawl(self.protocol, node, nearest,
self.ksize, self.alpha, log=self.log)
nodes = await spider.find()
self.log(f"setting '%s' on %s" % (dkey.hex(), list(map(str, nodes))))
# if this node is close too, then store here as well
try:
biggest = max([n.distance_to(node) for n in nodes])
if self.node.distance_to(node) < biggest:
self.log(f'< biggest -> {dkey} --> {value}')
self.storage[dkey] = value
except ValueError:
pass  # nodes was empty, so max() had nothing to compare
results = [self.protocol.call_store(n, dkey, value) for n in nodes]
results = await asyncio.gather(*results)
self.log(f'--> set() results --> {results}')
if store_anywhere:
self.log(f'store_anywhere -> {dkey} --> {value}')
self.storage[dkey]=value
# return true only if at least one store call succeeded
return any(results)
def save_state(self, fname):
"""
Save the state of this node (the alpha/ksize/id/immediate neighbors)
to a cache file with the given fname.
"""
self.log("Saving state to %s" % fname)
data = {
'ksize': self.ksize,
'alpha': self.alpha,
'id': self.node.id,
'neighbors': self.bootstrappable_neighbors()
}
if not data['neighbors']:
self.log("No known neighbors, so not writing to cache.")
return
with open(fname, 'wb') as file:
pickle.dump(data, file)
@classmethod
async def load_state(cls, fname, port, interface='0.0.0.0'):
"""
Load the state of this node (the alpha/ksize/id/immediate neighbors)
from a cache file with the given fname and then bootstrap the node
(using the given port/interface to start listening/bootstrapping).
"""
self.log("Loading state from %s" % fname)
with open(fname, 'rb') as file:
data = pickle.load(file)
svr = Server(data['ksize'], data['alpha'], data['id'])
await svr.listen(port, interface)
if data['neighbors']:
await svr.bootstrap(data['neighbors'])
return svr
def save_state_regularly(self, fname, frequency=600):
"""
Save the state of node with a given regularity to the given
filename.
Args:
fname: File name to save regularly to
frequency: Frequency in seconds that the state should be saved.
By default, 10 minutes.
"""
self.save_state(fname)
loop = asyncio.get_event_loop()
self.save_state_loop = loop.call_later(frequency,
self.save_state_regularly,
fname,
frequency)
def check_dht_value_type(value):
"""
Checks to see if the type of the value is a valid type for
placing in the dht.
"""
typeset = [
int,
float,
bool,
str,
bytes
]
return type(value) in typeset # pylint: disable=unidiomatic-typecheck
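
For orientation, the end-to-end Server flow looks like the sketch below (it assumes another node is already listening on 127.0.0.1:8468 to bootstrap against; the ports are arbitrary):

import asyncio
from kademlia.network import Server

async def main():
    node = Server()
    await node.listen(8469)                      # start the UDP listener
    await node.bootstrap([('127.0.0.1', 8468)])  # IPs only, not hostnames
    await node.set('key', 'value')               # int/float/bool/str/bytes only
    print(await node.get('key'))
    node.stop()

asyncio.run(main())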

@@ -1,127 +0,0 @@
from operator import itemgetter
import heapq
class Node:
"""
Simple object to encapsulate the concept of a Node (minimally an ID, but
also possibly an IP and port if this represents a node on the network).
This class should generally not be instantiated directly, as it is a low
level construct mostly used by the router.
"""
def __init__(self, node_id, ip=None, port=None):
"""
Create a Node instance.
Args:
node_id (int): A value between 0 and 2^160
ip (string): Optional IP address where this Node lives
port (int): Optional port for this Node (set when IP is set)
"""
self.id = node_id # pylint: disable=invalid-name
self.ip = ip # pylint: disable=invalid-name
self.port = port
self.long_id = int(node_id.hex(), 16)
def same_home_as(self, node):
return self.ip == node.ip and self.port == node.port
def distance_to(self, node):
"""
Get the distance between this node and another.
"""
return self.long_id ^ node.long_id
def __iter__(self):
"""
Enables use of Node as a tuple - i.e., tuple(node) works.
"""
return iter([self.id, self.ip, self.port])
def __repr__(self):
return repr([self.long_id, self.ip, self.port])
def __str__(self):
return "%s:%s" % (self.ip, str(self.port))
class NodeHeap:
"""
A heap of nodes ordered by distance to a given node.
"""
def __init__(self, node, maxsize):
"""
Constructor.
@param node: The node to measure all distances from.
@param maxsize: The maximum size that this heap can grow to.
"""
self.node = node
self.heap = []
self.contacted = set()
self.maxsize = maxsize
def remove(self, peers):
"""
Remove a list of peer ids from this heap. Note that while this
heap retains a constant visible size (based on the iterator), its
actual size may be quite a bit larger than what's exposed. Therefore,
removal of nodes may not change the visible size as previously added
nodes suddenly become visible.
"""
peers = set(peers)
if not peers:
return
nheap = []
for distance, node in self.heap:
if node.id not in peers:
heapq.heappush(nheap, (distance, node))
self.heap = nheap
def get_node(self, node_id):
for _, node in self.heap:
if node.id == node_id:
return node
return None
def have_contacted_all(self):
return len(self.get_uncontacted()) == 0
def get_ids(self):
return [n.id for n in self]
def mark_contacted(self, node):
self.contacted.add(node.id)
def popleft(self):
return heapq.heappop(self.heap)[1] if self else None
def push(self, nodes):
"""
Push nodes onto heap.
@param nodes: This can be a single item or a C{list}.
"""
if not isinstance(nodes, list):
nodes = [nodes]
for node in nodes:
if node not in self:
distance = self.node.distance_to(node)
heapq.heappush(self.heap, (distance, node))
def __len__(self):
return min(len(self.heap), self.maxsize)
def __iter__(self):
nodes = heapq.nsmallest(self.maxsize, self.heap)
return iter(map(itemgetter(1), nodes))
def __contains__(self, node):
for _, other in self.heap:
if node.id == other.id:
return True
return False
def get_uncontacted(self):
return [n for n in self if n.id not in self.contacted]
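
The XOR metric is easy to sanity-check by hand, using the same SHA-1 construction the tests use:

import hashlib
from kademlia.node import Node

none = Node(hashlib.sha1(b'one').digest())
ntwo = Node(hashlib.sha1(b'two').digest())

# distance_to() XORs the 160-bit integer forms of the two ids
assert none.distance_to(ntwo) == none.long_id ^ ntwo.long_id
assert none.distance_to(none) == 0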

@@ -1,179 +0,0 @@
import random
import asyncio
import logging
from rpcudp.protocol import RPCProtocol
from kademlia.node import Node
from kademlia.routing import RoutingTable
from kademlia.utils import digest
log = logging.getLogger(__name__) # pylint: disable=invalid-name
#### PROXY PROTOCOL
class ProxyDatagramProtocol(asyncio.DatagramProtocol):
def __init__(self, remote_address):
self.remote_address = remote_address
self.remotes = {}
super().__init__()
def connection_made(self, transport):
self.transport = transport
def datagram_received(self, data, addr):
if addr in self.remotes:
self.remotes[addr].transport.sendto(data)
return
loop = asyncio.get_event_loop()
self.remotes[addr] = RemoteDatagramProtocol(self, addr, data)
coro = loop.create_datagram_endpoint(
lambda: self.remotes[addr], remote_addr=self.remote_address)
asyncio.ensure_future(coro)
class RemoteDatagramProtocol(asyncio.DatagramProtocol):
def __init__(self, proxy, addr, data):
self.proxy = proxy
self.addr = addr
self.data = data
super().__init__()
def connection_made(self, transport):
self.transport = transport
self.transport.sendto(self.data)
def datagram_received(self, data, _):
self.proxy.transport.sendto(data, self.addr)
def connection_lost(self, exc):
self.proxy.remotes.pop(self.addr)
#####
class KademliaProtocol(RPCProtocol):
def __init__(self, source_node, storage, ksize, log=None):
RPCProtocol.__init__(self)
self.router = RoutingTable(self, ksize, source_node)
self.storage = storage
self.source_node = source_node
self.log = log if log is not None else logging.getLogger(__name__).debug
def get_refresh_ids(self):
"""
Get ids to search for to keep old buckets up to date.
"""
ids = []
for bucket in self.router.lonely_buckets():
rid = random.randint(*bucket.range).to_bytes(20, byteorder='big')
ids.append(rid)
return ids
def rpc_stun(self, sender): # pylint: disable=no-self-use
return sender
def rpc_ping(self, sender, nodeid):
source = Node(nodeid, sender[0], sender[1])
self.welcome_if_new(source)
return self.source_node.id
def rpc_store(self, sender, nodeid, key, value):
source = Node(nodeid, sender[0], sender[1])
self.welcome_if_new(source)
self.log("got a store request from %s, storing '%s' -> %s'" %
(sender, key.hex(), value[:10]))
self.storage[key] = value
return True
def rpc_find_node(self, sender, nodeid, key):
self.log("finding neighbors of %i in local table" %
int(nodeid.hex(), 16))
source = Node(nodeid, sender[0], sender[1])
self.welcome_if_new(source)
node = Node(key)
neighbors = self.router.find_neighbors(node, exclude=source)
return list(map(tuple, neighbors))
def rpc_find_value(self, sender, nodeid, key):
source = Node(nodeid, sender[0], sender[1])
self.welcome_if_new(source)
value = self.storage.get(key, None)
if value is None:
return self.rpc_find_node(sender, nodeid, key)
return {'value': value}
async def call_find_node(self, node_to_ask, node_to_find):
address = (node_to_ask.ip, node_to_ask.port)
result = await self.find_node(address, self.source_node.id,
node_to_find.id)
return self.handle_call_response(result, node_to_ask)
async def call_find_value(self, node_to_ask, node_to_find):
address = (node_to_ask.ip, node_to_ask.port)
result = await self.find_value(address, self.source_node.id,
node_to_find.id)
return self.handle_call_response(result, node_to_ask)
async def call_ping(self, node_to_ask):
address = (node_to_ask.ip, node_to_ask.port)
result = await self.ping(address, self.source_node.id)
return self.handle_call_response(result, node_to_ask)
async def call_store(self, node_to_ask, key, value):
address = (node_to_ask.ip, node_to_ask.port)
result = await self.store(address, self.source_node.id, key, value)
return self.handle_call_response(result, node_to_ask)
def welcome_if_new(self, node):
"""
Given a new node, send it all the keys/values it should be storing,
then add it to the routing table.
@param node: A new node that just joined (or that we just found out
about).
Process:
For each key in storage, get k closest nodes. If newnode is closer
than the furthest in that list, and the node for this server
is closer than the closest in that list, then store the key/value
on the new node (per section 2.5 of the paper)
"""
if not self.router.is_new_node(node):
return
self.log("never seen %s before, adding to router" % node)
#for key, value in self.storage:
for key in self.storage.keys():
value = self.storage[key]
keynode = Node(digest(key))
neighbors = self.router.find_neighbors(keynode)
if neighbors:
last = neighbors[-1].distance_to(keynode)
new_node_close = node.distance_to(keynode) < last
first = neighbors[0].distance_to(keynode)
this_closest = self.source_node.distance_to(keynode) < first
if not neighbors or (new_node_close and this_closest):
asyncio.ensure_future(self.call_store(node, key, value))
self.router.add_contact(node)
def handle_call_response(self, result, node):
"""
If we get a response, add the node to the routing table. If
we get no response, make sure it's removed from the routing table.
"""
if not result[0]:
self.log("!! no response from %s, removing from router", node)
self.router.remove_contact(node)
return result
self.log("got successful response from %s" % node)
self.welcome_if_new(node)
return result
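
rpcudp pairs each rpc_-prefixed handler with an awaitable of the same name minus the prefix, so awaiting self.ping(...) on one node invokes rpc_ping(...) on the peer and resolves to a (received, value) tuple. A sketch of that round trip between two local servers (ports are arbitrary):

import asyncio
from kademlia.network import Server

async def ping_demo():
    a, b = Server(), Server()
    await a.listen(9001)
    await b.listen(9002)
    # bootstrap_node() wraps protocol.ping(); a Node comes back on success
    peer = await a.bootstrap_node(('127.0.0.1', 9002))
    print(peer)  # "127.0.0.1:9002" if rpc_ping answered
    a.stop()
    b.stop()

asyncio.run(ping_demo())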

@@ -1,199 +0,0 @@
import heapq
import time
import operator
import asyncio
from itertools import chain
from collections import OrderedDict
from kademlia.utils import shared_prefix, bytes_to_bit_string
# EXCLUDE_PORTS = {5637}
EXCLUDE_PORTS = set()
class KBucket:
def __init__(self, rangeLower, rangeUpper, ksize, replacementNodeFactor=5):
self.range = (rangeLower, rangeUpper)
self.nodes = OrderedDict()
self.replacement_nodes = OrderedDict()
self.touch_last_updated()
self.ksize = ksize
self.max_replacement_nodes = self.ksize * replacementNodeFactor
def touch_last_updated(self):
self.last_updated = time.monotonic()
def get_nodes(self):
return list(self.nodes.values())
def split(self):
midpoint = (self.range[0] + self.range[1]) // 2
one = KBucket(self.range[0], midpoint, self.ksize)
two = KBucket(midpoint + 1, self.range[1], self.ksize)
nodes = chain(self.nodes.values(), self.replacement_nodes.values())
for node in nodes:
bucket = one if node.long_id <= midpoint else two
bucket.add_node(node)
return (one, two)
def remove_node(self, node):
if node.id in self.replacement_nodes:
del self.replacement_nodes[node.id]
if node.id in self.nodes:
del self.nodes[node.id]
if self.replacement_nodes:
newnode_id, newnode = self.replacement_nodes.popitem()
self.nodes[newnode_id] = newnode
def has_in_range(self, node):
return self.range[0] <= node.long_id <= self.range[1]
def is_new_node(self, node):
return node.id not in self.nodes
def add_node(self, node):
"""
Add a C{Node} to the C{KBucket}. Return True if successful,
False if the bucket is full.
If the bucket is full, keep track of node in a replacement list,
per section 4.1 of the paper.
"""
if node.id in self.nodes:
del self.nodes[node.id]
self.nodes[node.id] = node
elif len(self) < self.ksize:
self.nodes[node.id] = node
else:
if node.id in self.replacement_nodes:
del self.replacement_nodes[node.id]
self.replacement_nodes[node.id] = node
while len(self.replacement_nodes) > self.max_replacement_nodes:
self.replacement_nodes.popitem(last=False)
return False
return True
def depth(self):
vals = self.nodes.values()
sprefix = shared_prefix([bytes_to_bit_string(n.id) for n in vals])
return len(sprefix)
def head(self):
return list(self.nodes.values())[0]
def __getitem__(self, node_id):
return self.nodes.get(node_id, None)
def __len__(self):
return len(self.nodes)
class TableTraverser:
def __init__(self, table, startNode):
index = table.get_bucket_for(startNode)
table.buckets[index].touch_last_updated()
self.current_nodes = table.buckets[index].get_nodes()
self.left_buckets = table.buckets[:index]
self.right_buckets = table.buckets[(index + 1):]
self.left = True
def __iter__(self):
return self
def __next__(self):
"""
Pop an item from the left subtree, then right, then left, etc.
"""
if self.current_nodes:
return self.current_nodes.pop()
if self.left and self.left_buckets:
self.current_nodes = self.left_buckets.pop().get_nodes()
self.left = False
return next(self)
if self.right_buckets:
self.current_nodes = self.right_buckets.pop(0).get_nodes()
self.left = True
return next(self)
raise StopIteration
class RoutingTable:
def __init__(self, protocol, ksize, node):
"""
@param node: The node that represents this server. It won't
be added to the routing table, but will be needed later to
determine which buckets to split or not.
"""
self.node = node
self.protocol = protocol
self.ksize = ksize
self.flush()
def flush(self):
self.buckets = [KBucket(0, 2 ** 160, self.ksize)]
def split_bucket(self, index):
one, two = self.buckets[index].split()
self.buckets[index] = one
self.buckets.insert(index + 1, two)
def lonely_buckets(self):
"""
Get all of the buckets that haven't been updated in over
an hour.
"""
hrago = time.monotonic() - 3600
return [b for b in self.buckets if b.last_updated < hrago]
def remove_contact(self, node):
index = self.get_bucket_for(node)
self.buckets[index].remove_node(node)
def is_new_node(self, node):
index = self.get_bucket_for(node)
return self.buckets[index].is_new_node(node)
def add_contact(self, node):
index = self.get_bucket_for(node)
bucket = self.buckets[index]
# this will succeed unless the bucket is full
if bucket.add_node(node):
return
# Per section 4.2 of paper, split if the bucket has the node
# in its range or if the depth is not congruent to 0 mod 5
if bucket.has_in_range(self.node) or bucket.depth() % 5 != 0:
self.split_bucket(index)
self.add_contact(node)
else:
asyncio.ensure_future(self.protocol.call_ping(bucket.head()))
def get_bucket_for(self, node):
"""
Get the index of the bucket that the given node would fall into.
"""
for index, bucket in enumerate(self.buckets):
if node.long_id < bucket.range[1]:
return index
# we should never be here, but make linter happy
return None
def find_neighbors(self, node, k=None, exclude=None, exclude_ports=EXCLUDE_PORTS):
k = k or self.ksize
nodes = []
for neighbor in TableTraverser(self, node):
notexcluded = exclude is None or not neighbor.same_home_as(exclude)
notexcluded_port = not exclude_ports or neighbor.port not in exclude_ports
if neighbor.id != node.id and notexcluded and notexcluded_port:
heapq.heappush(nodes, (node.distance_to(neighbor), neighbor))
if len(nodes) == k:
break
return list(map(operator.itemgetter(1), heapq.nsmallest(k, nodes)))
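
The bucket mechanics are easiest to see with small integer ids, using the same pack('>l', i) trick as the test fixtures (values are illustrative):

from struct import pack
from kademlia.node import Node
from kademlia.routing import KBucket

def intnode(i):
    return Node(pack('>l', i))  # 4-byte big-endian id, so long_id == i

bucket = KBucket(0, 10, 2)
assert bucket.add_node(intnode(5)) is True
assert bucket.add_node(intnode(6)) is True
assert bucket.add_node(intnode(7)) is False  # full: parked in replacement_nodes

one, two = bucket.split()  # split at the midpoint of the range
assert one.range == (0, 5) and two.range == (6, 10)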

@@ -1,284 +0,0 @@
import time
from itertools import takewhile
import operator
from collections import OrderedDict
from abc import abstractmethod, ABC
import asyncio
from kademlia.utils import digest
#BSEP_ST = b'||||'
def xprint(*xx):
raise Exception('\n'.join(str(x) for x in xx))
import logging
handler = logging.StreamHandler()
formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
handler.setFormatter(formatter)
logger = logging.getLogger(__file__)
logger.addHandler(handler)
logger.setLevel(logging.DEBUG)
log=logger.info
class IStorage(ABC):
"""
Local storage for this node.
IStorage implementations of get must return the same type as put in by set
"""
@abstractmethod
def __setitem__(self, key, value):
"""
Set a key to the given value.
"""
@abstractmethod
def __getitem__(self, key):
"""
Get the given key. If item doesn't exist, raises C{KeyError}
"""
@abstractmethod
def get(self, key, default=None):
"""
Get given key. If not found, return default.
"""
@abstractmethod
def iter_older_than(self, seconds_old):
"""
Return an iterator over (key, value) tuples for items older
than the given seconds_old.
"""
@abstractmethod
def __iter__(self):
"""
Get the iterator for this storage, should yield tuple of (key, value)
"""
# class ForgetfulStorage(IStorage):
# def __init__(self, ttl=604800):
# """
# By default, max age is a week.
# """
# self.data = OrderedDict()
# self.ttl = ttl
# def __setitem__(self, key, value):
# if key in self.data:
# del self.data[key]
# self.data[key] = (time.monotonic(), value)
# self.cull()
# def cull(self):
# for _, _ in self.iter_older_than(self.ttl):
# self.data.popitem(last=False)
# def get(self, key, default=None):
# self.cull()
# if key in self.data:
# return self[key]
# return default
# def __getitem__(self, key):
# self.cull()
# return self.data[key][1]
# def __repr__(self):
# self.cull()
# return repr(self.data)
# def iter_older_than(self, seconds_old):
# min_birthday = time.monotonic() - seconds_old
# zipped = self._triple_iter()
# matches = takewhile(lambda r: min_birthday >= r[1], zipped)
# return list(map(operator.itemgetter(0, 2), matches))
# def _triple_iter(self):
# ikeys = self.data.keys()
# ibirthday = map(operator.itemgetter(0), self.data.values())
# ivalues = map(operator.itemgetter(1), self.data.values())
# return zip(ikeys, ibirthday, ivalues)
# def __iter__(self):
# self.cull()
# ikeys = self.data.keys()
# ivalues = map(operator.itemgetter(1), self.data.values())
# return zip(ikeys, ivalues)
import pickle,os
class HalfForgetfulStorage(IStorage):
def __init__(self, fn='dbm.pickle', ttl=604800, log=None):
"""
By default, max age is a week.
"""
self.fn = fn
self.ttl = ttl
self.log = log if log is not None else logger.info
self.data = self.load()
# import pickledb
# self.data = pickledb.load(self.fn,auto_dump=True)
#import shelve
#self.data = shelve.open(self.fn,flag='cs')
def dump(self, show_keys=100):
async def do():
self.log('[async] dumping %s keys...' % len(self.keys()))
with open(self.fn, 'wb') as of:
pickle.dump(self.data, of)
asyncio.create_task(do())  # needs a running event loop
def load(self):
if not os.path.exists(self.fn): return OrderedDict()
self.log('loading pickle...')
with open(self.fn,'rb') as of:
res=pickle.load(of)
self.log(f'>> found {len(res)} keys in pickle...')
return res
def __setitem__(self, key, value):
self.set(key,value)
def keys(self): return self.data.keys()
def items(self): return [(k,v) for k,v in zip(self.keys(),self.values())]
def values(self): return [self.data[k] for k in self.keys()]
def set(self,dkey,value):
# log(f'HFS.set({key}) -> {value}')
newval = (time.monotonic(), value)
# store
if dkey in self.data:
del self.data[dkey]
self.data[dkey]=newval
# save and prune
self.dump()
# self.cull()
def cull(self):
for _, _ in self.iter_older_than(self.ttl):
self.data.popitem(last=False)
def get(self, key, default=None, incl_time=False):
#self.cull()
# log(f'HFS.get({key}) -> ?')
try:
val=self.data[key]
# val=self.data.get(key)
# log(f'HFS.get({key}) -> {val}')
if val is False: raise KeyError
if val and not incl_time: val=val[1]
return val
except (KeyError,IndexError) as e:
pass
return default
def __getitem__(self, key):
#self.cull()
return self.get(key)
def __repr__(self,lim_eg=5):
#self.cull()
#return repr(self.data)
#eg = list(sorted(self.data.keys()))[:lim_eg]
msg=f"""HFS() # keys = {len(self.data)}"""
return msg
def iter_older_than(self, seconds_old):
min_birthday = time.monotonic() - seconds_old
zipped = self._triple_iter()
matches = takewhile(lambda r: min_birthday >= r[1], zipped)
return list(map(operator.itemgetter(0, 2), matches))
def _triple_iter(self):
ikeys = self.keys()
ibirthday = map(operator.itemgetter(0), self.values())
ivalues = map(operator.itemgetter(1), self.values())
return zip(ikeys, ibirthday, ivalues)
def __iter__(self):
self.cull()
ikeys = self.keys()
ivalues = map(operator.itemgetter(1), self.values())
return zip(ikeys, ivalues)
# class HalfForgetfulStorage(ForgetfulStorage):
# def __init__(self, fn='dbm', ttl=604800, log=print):
# """
# By default, max age is a week.
# """
# self.fn=fn
# self.log=log
# import pickledb
# # self.data = pickledb.load(self.fn,False)
# import dbm
# self.data = dbm.open(self.fn,flag='cs')
# # import shelve
# # self.data = shelve.open(self.fn, flag='cs')
# # from kivy.storage.jsonstore import JsonStore
# # self.
# self.ttl = ttl
# self.log('have %s keys' % len(self))
# def keys(self):
# # return self.data.getall()
# return self.data.keys()
# def __len__(self):
# return len(self.keys())
# def __setitem__(self, key, value):
# self.set(key,value)
# def set(self, key,value):# try:
# #self.log(f'key: {key},\nvalue:{value}')
# #if type(value)==list and len(value)==2:
# # time,val_b = value
# # value = str(time).encode() + BSEP_ST + val_b
# #self.log('newdat =',value)
# self.data[key]=value
# # return True
# def get(self, key, default=None):
# # print(f'??!?\n{key}\n{self.data[key]}')
# # return self.data[key][1]
# # (skip time part of tuple)
# # val=self.data[key] if key in self.data else None
# # self.log('VALLLL',val)
# # if val is None: return None
# # time_b,val_b = val.split(BSEP_ST)
# # rval = (float(time_b.decode()), val_b)
# # self.log('rvalll',rval)
# # return rval
# return self.data.get(key,None)
# def __getitem__(self, key):
# return self.get(key)
# #return data_list
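
A minimal exercise of the pickle-backed store; a sketch, noting that set() schedules the async dump() and so has to run inside an event loop, and that the file name here is made up:

import asyncio
from kademlia.storage import HalfForgetfulStorage
from kademlia.utils import digest

async def demo():
    hfs = HalfForgetfulStorage(fn='demo.pickle')
    dkey = digest('key')
    hfs[dkey] = b'value'              # stores (monotonic_time, value)
    assert hfs.get(dkey) == b'value'  # time prefix stripped by default
    await asyncio.sleep(0)            # let the scheduled dump task run

asyncio.run(demo())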

@@ -1,3 +0,0 @@
"""
Tests live here.
"""

@@ -1,57 +0,0 @@
import random
import hashlib
from struct import pack
import pytest
from kademlia.network import Server
from kademlia.node import Node
from kademlia.routing import RoutingTable
@pytest.fixture
def bootstrap_node(event_loop):
server = Server()
event_loop.run_until_complete(server.listen(8468))
try:
yield ('127.0.0.1', 8468)
finally:
server.stop()
# pylint: disable=redefined-outer-name
@pytest.fixture()
def mknode():
def _mknode(node_id=None, ip_addy=None, port=None, intid=None):
"""
Make a node. Creates a random id if not specified.
"""
if intid is not None:
node_id = pack('>l', intid)
if not node_id:
randbits = str(random.getrandbits(255))
node_id = hashlib.sha1(randbits.encode()).digest()
return Node(node_id, ip_addy, port)
return _mknode
# pylint: disable=too-few-public-methods
class FakeProtocol: # pylint: disable=too-few-public-methods
def __init__(self, source_id, ksize=20):
self.router = RoutingTable(self, ksize, Node(source_id))
self.storage = {}
self.source_id = source_id
# pylint: disable=too-few-public-methods
class FakeServer:
def __init__(self, node_id):
self.id = node_id # pylint: disable=invalid-name
self.protocol = FakeProtocol(self.id)
self.router = self.protocol.router
@pytest.fixture
def fake_server(mknode):
return FakeServer(mknode().id)

@@ -1,26 +0,0 @@
from glob import glob
import pycodestyle
from pylint import epylint as lint
class LintError(Exception):
pass
class TestCodeLinting:
# pylint: disable=no-self-use
def test_pylint(self):
(stdout, _) = lint.py_run('kademlia', return_std=True)
errors = stdout.read()
if errors.strip():
raise LintError(errors)
# pylint: disable=no-self-use
def test_pep8(self):
style = pycodestyle.StyleGuide()
files = glob('kademlia/**/*.py', recursive=True)
result = style.check_files(files)
if result.total_errors > 0:
raise LintError("Code style errors found.")

@@ -1,54 +0,0 @@
import random
import hashlib
from kademlia.node import Node, NodeHeap
class TestNode:
def test_long_id(self): # pylint: disable=no-self-use
rid = hashlib.sha1(str(random.getrandbits(255)).encode()).digest()
node = Node(rid)
assert node.long_id == int(rid.hex(), 16)
def test_distance_calculation(self): # pylint: disable=no-self-use
ridone = hashlib.sha1(str(random.getrandbits(255)).encode())
ridtwo = hashlib.sha1(str(random.getrandbits(255)).encode())
shouldbe = int(ridone.hexdigest(), 16) ^ int(ridtwo.hexdigest(), 16)
none = Node(ridone.digest())
ntwo = Node(ridtwo.digest())
assert none.distance_to(ntwo) == shouldbe
class TestNodeHeap:
def test_max_size(self, mknode): # pylint: disable=no-self-use
node = NodeHeap(mknode(intid=0), 3)
assert not node
for digit in range(10):
node.push(mknode(intid=digit))
assert len(node) == 3
assert len(list(node)) == 3
def test_iteration(self, mknode): # pylint: disable=no-self-use
heap = NodeHeap(mknode(intid=0), 5)
nodes = [mknode(intid=x) for x in range(10)]
for index, node in enumerate(nodes):
heap.push(node)
for index, node in enumerate(heap):
assert index == node.long_id
assert index < 5
def test_remove(self, mknode): # pylint: disable=no-self-use
heap = NodeHeap(mknode(intid=0), 5)
nodes = [mknode(intid=x) for x in range(10)]
for node in nodes:
heap.push(node)
heap.remove([nodes[0].id, nodes[1].id])
assert len(list(heap)) == 5
for index, node in enumerate(heap):
assert index + 2 == node.long_id
assert index < 5

@@ -1,121 +0,0 @@
from random import shuffle
from kademlia.routing import KBucket, TableTraverser
class TestKBucket:
def test_split(self, mknode): # pylint: disable=no-self-use
bucket = KBucket(0, 10, 5)
bucket.add_node(mknode(intid=5))
bucket.add_node(mknode(intid=6))
one, two = bucket.split()
assert len(one) == 1
assert one.range == (0, 5)
assert len(two) == 1
assert two.range == (6, 10)
def test_split_no_overlap(self): # pylint: disable=no-self-use
left, right = KBucket(0, 2 ** 160, 20).split()
assert (right.range[0] - left.range[1]) == 1
def test_add_node(self, mknode): # pylint: disable=no-self-use
# when full, return false
bucket = KBucket(0, 10, 2)
assert bucket.add_node(mknode()) is True
assert bucket.add_node(mknode()) is True
assert bucket.add_node(mknode()) is False
assert len(bucket) == 2
# make sure when a node is double added it's put at the end
bucket = KBucket(0, 10, 3)
nodes = [mknode(), mknode(), mknode()]
for node in nodes:
bucket.add_node(node)
for index, node in enumerate(bucket.get_nodes()):
assert node == nodes[index]
def test_remove_node(self, mknode): # pylint: disable=no-self-use
k = 3
bucket = KBucket(0, 10, k)
nodes = [mknode() for _ in range(10)]
for node in nodes:
bucket.add_node(node)
replacement_nodes = bucket.replacement_nodes
assert list(bucket.nodes.values()) == nodes[:k]
assert list(replacement_nodes.values()) == nodes[k:]
bucket.remove_node(nodes.pop())
assert list(bucket.nodes.values()) == nodes[:k]
assert list(replacement_nodes.values()) == nodes[k:]
bucket.remove_node(nodes.pop(0))
assert list(bucket.nodes.values()) == nodes[:k-1] + nodes[-1:]
assert list(replacement_nodes.values()) == nodes[k-1:-1]
shuffle(nodes)
for node in nodes:
bucket.remove_node(node)
assert not bucket
assert not replacement_nodes
def test_in_range(self, mknode): # pylint: disable=no-self-use
bucket = KBucket(0, 10, 10)
assert bucket.has_in_range(mknode(intid=5)) is True
assert bucket.has_in_range(mknode(intid=11)) is False
assert bucket.has_in_range(mknode(intid=10)) is True
assert bucket.has_in_range(mknode(intid=0)) is True
def test_replacement_factor(self, mknode): # pylint: disable=no-self-use
k = 3
factor = 2
bucket = KBucket(0, 10, k, replacementNodeFactor=factor)
nodes = [mknode() for _ in range(10)]
for node in nodes:
bucket.add_node(node)
replacement_nodes = bucket.replacement_nodes
assert len(list(replacement_nodes.values())) == k * factor
assert list(replacement_nodes.values()) == nodes[k + 1:]
assert nodes[k] not in list(replacement_nodes.values())
# pylint: disable=too-few-public-methods
class TestRoutingTable:
# pylint: disable=no-self-use
def test_add_contact(self, fake_server, mknode):
fake_server.router.add_contact(mknode())
assert len(fake_server.router.buckets) == 1
assert len(fake_server.router.buckets[0].nodes) == 1
# pylint: disable=too-few-public-methods
class TestTableTraverser:
# pylint: disable=no-self-use
def test_iteration(self, fake_server, mknode):
"""
Make 10 nodes and 5 buckets, adding two nodes to each bucket in order.
All buckets: [node0, node1], [node2, node3], [node4, node5],
[node6, node7], [node8, node9]
Test traversal results starting from node4.
"""
nodes = [mknode(intid=x) for x in range(10)]
buckets = []
for i in range(5):
bucket = KBucket(2 * i, 2 * i + 1, 2)
bucket.add_node(nodes[2 * i])
bucket.add_node(nodes[2 * i + 1])
buckets.append(bucket)
# replace router's bucket with our test buckets
fake_server.router.buckets = buckets
# expected nodes order
expected_nodes = [nodes[5], nodes[4], nodes[3], nodes[2], nodes[7],
nodes[6], nodes[1], nodes[0], nodes[9], nodes[8]]
start_node = nodes[4]
table_traverser = TableTraverser(fake_server.router, start_node)
for index, node in enumerate(table_traverser):
assert node == expected_nodes[index]

@@ -1,62 +0,0 @@
import asyncio
import pytest
from kademlia.network import Server
from kademlia.protocol import KademliaProtocol
@pytest.mark.asyncio
async def test_storing(bootstrap_node):
server = Server()
await server.listen(bootstrap_node[1] + 1)
await server.bootstrap([bootstrap_node])
await server.set('key', 'value')
result = await server.get('key')
assert result == 'value'
server.stop()
class TestSwappableProtocol:
def test_default_protocol(self): # pylint: disable=no-self-use
"""
An ordinary Server object will initially not have a protocol, but will
have a KademliaProtocol object as its protocol after its listen()
method is called.
"""
loop = asyncio.get_event_loop()
server = Server()
assert server.protocol is None
loop.run_until_complete(server.listen(8469))
assert isinstance(server.protocol, KademliaProtocol)
server.stop()
def test_custom_protocol(self): # pylint: disable=no-self-use
"""
A subclass of Server which overrides the protocol_class attribute will
have an instance of that class as its protocol after its listen()
method is called.
"""
# Make a custom Protocol and Server to go with it.
class CoconutProtocol(KademliaProtocol):
pass
class HuskServer(Server):
protocol_class = CoconutProtocol
# An ordinary server does NOT have a CoconutProtocol as its protocol...
loop = asyncio.get_event_loop()
server = Server()
loop.run_until_complete(server.listen(8469))
assert not isinstance(server.protocol, CoconutProtocol)
server.stop()
# ...but our custom server does.
husk_server = HuskServer()
loop.run_until_complete(husk_server.listen(8469))
assert isinstance(husk_server.protocol, CoconutProtocol)
husk_server.stop()

@@ -1,27 +0,0 @@
from kademlia.storage import ForgetfulStorage
class ForgetfulStorageTest:
def test_storing(self): # pylint: disable=no-self-use
storage = ForgetfulStorage(10)
storage['one'] = 'two'
assert storage['one'] == 'two'
def test_forgetting(self): # pylint: disable=no-self-use
storage = ForgetfulStorage(0)
storage['one'] = 'two'
assert storage.get('one') is None
def test_iter(self): # pylint: disable=no-self-use
storage = ForgetfulStorage(10)
storage['one'] = 'two'
for key, value in storage:
assert key == 'one'
assert value == 'two'
def test_iter_old(self): # pylint: disable=no-self-use
storage = ForgetfulStorage(10)
storage['one'] = 'two'
for key, value in storage.iter_older_than(0):
assert key == 'one'
assert value == 'two'

@@ -1,25 +0,0 @@
import hashlib
from kademlia.utils import digest, shared_prefix
class TestUtils:
def test_digest(self): # pylint: disable=no-self-use
dig = hashlib.sha1(b'1').digest()
assert dig == digest(1)
dig = hashlib.sha1(b'another').digest()
assert dig == digest('another')
def test_shared_prefix(self): # pylint: disable=no-self-use
args = ['prefix', 'prefixasdf', 'prefix', 'prefixxxx']
assert shared_prefix(args) == 'prefix'
args = ['p', 'prefixasdf', 'prefix', 'prefixxxx']
assert shared_prefix(args) == 'p'
args = ['one', 'two']
assert shared_prefix(args) == ''
args = ['hi']
assert shared_prefix(args) == 'hi'

@@ -1,42 +0,0 @@
"""
General catchall for functions that don't make sense as methods.
"""
import hashlib
import operator
import asyncio
async def gather_dict(dic):
cors = list(dic.values())
results = await asyncio.gather(*cors)
return dict(zip(dic.keys(), results))
def digest(string):
if not isinstance(string, bytes):
string = str(string).encode('utf8')
return hashlib.sha1(string).digest()
def shared_prefix(args):
"""
Find the shared prefix between the strings.
For instance:
shared_prefix(['blahblah', 'blahwhat'])
returns 'blah'.
"""
i = 0
while i < min(map(len, args)):
if len(set(map(operator.itemgetter(i), args))) != 1:
break
i += 1
return args[0][:i]
def bytes_to_bit_string(bites):
bits = [bin(bite)[2:].rjust(8, '0') for bite in bites]
return "".join(bits)