THIS WORKS

macdev
quadrismegistus 4 years ago
parent 6d7520c761
commit d09c796229

@ -113,40 +113,25 @@ class Api(object):
async def get(self,key_or_keys,get_last=True):
if not type(key_or_keys) in {list,tuple}:
key_or_keys=[key_or_keys]
async def _get():
# self.log('async _get()',self.node)
#node=await _getdb(self)
node=await self.node
returned_vals = []
res=None
if type(key_or_keys) in {list,tuple,dict}:
keys = key_or_keys
self.log('??????!!!!!')
tasks=[]
for key in keys:
time_vals = await node.get(key)
self.log('time_vals1 =',time_vals)
if time_vals is None: return []
if type(time_vals)!=list: time_vals=[time_vals]
self.log(f'time_vals = {time_vals}')
#if get_last: time_vals = [time_vals[-1]]
for _time,_vals in time_vals:
task = self.decode_data(_vals)
tasks+=[task]
val = await node.get(key)
task = self.decode_data(val)
tasks.append(task)
res = await asyncio.gather(*tasks)
self.log('RES?',res)
return list(res)
else:
raise Exception('not allowed!')
return []
key=key_or_keys
val = await node.get(key)
res = await self.decode_data(val)
self.log(f'_get({key_or_keys}) --> {res}')
return res
return await _get()
def encode_data(self,val,sep=BSEP,sep2=BSEP2,do_encrypt=True,receiver_pubkey=None):
@ -360,8 +345,7 @@ class Api(object):
res = await self.get(key_or_keys,get_last=get_last)
self.log('get_json() got',res)
if not res: return None
return [jsonize_res(x) for x in res]
return jsonize_res(res)
@ -514,7 +498,9 @@ class Api(object):
async def append_json(self,key,data):
sofar=await self.get_json_val(key)
if sofar is None: sofar = []
new=sofar + ([data] if type(data)!=list else data)
if type(sofar)!=list: sofar=[sofar]
if type(data)!=list: data=[data]
new=sofar + data
if await self.set_json(key, new):
return {'success':'Length increased to %s' % len(new)}
return {'error':'Could not append json'}
@ -593,7 +579,7 @@ class Api(object):
# ## add to channels
res = await asyncio.gather(*[
self.set_json(f'/posts/channel/{channel}',post_id) for channel in channels
self.append_json(f'/posts/channel/{channel}',post_id) for channel in channels
])
# ## add to user

@ -166,7 +166,6 @@ class Server:
)
log.info("setting '%s' = '%s' on network", key, value)
dkey = digest(key)
return self.storage.set(key,value)
return await self.set_digest(dkey, value)
async def set_digest(self, dkey, value):

@ -6,11 +6,18 @@ from abc import abstractmethod, ABC
BSEP_ST = b'||||'
import base64
import base64,json
def xprint(*xx):
raise Exception('\n'.join(str(x) for x in xx))
import logging
handler = logging.StreamHandler()
formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
handler.setFormatter(formatter)
logger = logging.getLogger(__file__)
logger.addHandler(handler)
logger.setLevel(logging.DEBUG)
log=logger.info
class IStorage(ABC):
"""
@ -50,36 +57,117 @@ class IStorage(ABC):
"""
class ForgetfulStorage(IStorage):
def __init__(self, ttl=604800):
# class ForgetfulStorage(IStorage):
# def __init__(self, ttl=604800):
# """
# By default, max age is a week.
# """
# self.data = OrderedDict()
# self.ttl = ttl
# def __setitem__(self, key, value):
# if key in self.data:
# del self.data[key]
# self.data[key] = (time.monotonic(), value)
# self.cull()
# def cull(self):
# for _, _ in self.iter_older_than(self.ttl):
# self.data.popitem(last=False)
# def get(self, key, default=None):
# self.cull()
# if key in self.data:
# return self[key]
# return default
# def __getitem__(self, key):
# self.cull()
# return self.data[key][1]
# def __repr__(self):
# self.cull()
# return repr(self.data)
# def iter_older_than(self, seconds_old):
# min_birthday = time.monotonic() - seconds_old
# zipped = self._triple_iter()
# matches = takewhile(lambda r: min_birthday >= r[1], zipped)
# return list(map(operator.itemgetter(0, 2), matches))
# def _triple_iter(self):
# ikeys = self.data.keys()
# ibirthday = map(operator.itemgetter(0), self.data.values())
# ivalues = map(operator.itemgetter(1), self.data.values())
# return zip(ikeys, ibirthday, ivalues)
# def __iter__(self):
# self.cull()
# ikeys = self.data.keys()
# ivalues = map(operator.itemgetter(1), self.data.values())
# return zip(ikeys, ivalues)
import pickle
class HalfForgetfulStorage(IStorage):
def __init__(self, fn='dbm.pickle', ttl=604800, log=None):
"""
By default, max age is a week.
"""
self.data = OrderedDict()
self.fn = fn
self.ttl = ttl
def dump(self):
with open(self.fn,'wb') as of:
pickle.dump(self.data, of)
def __setitem__(self, key, value):
self.set(key,value)
def keys(self): return self.data.keys()
def items(self): return self.data.items()
def values(self): return self.data.values()
def set(self,key,value):
log(f'HFS.set({key}) -> {value}')
# store
if key in self.data:
del self.data[key]
self.data[key] = (time.monotonic(), value)
# save and prune
self.dump()
self.cull()
def keys(self):
return self.data.keys()
def cull(self):
for _, _ in self.iter_older_than(self.ttl):
self.data.popitem(last=False)
def get(self, key, default=None):
self.cull()
if key in self.data:
return self[key]
def get(self, key, default=None, incl_time=False):
#self.cull()
log(f'HFS.get({key}) -> ?')
try:
val=self.data[key]
if not incl_time: val=val[1]
log(f'HFS.get({key}) -> {val}')
return val
except (KeyError,IndexError) as e:
pass
return default
def __getitem__(self, key):
self.cull()
return self.data[key][1]
#self.cull()
return self.get(key)
def __repr__(self):
self.cull()
#self.cull()
return repr(self.data)
def iter_older_than(self, seconds_old):
@ -105,62 +193,66 @@ class ForgetfulStorage(IStorage):
class HalfForgetfulStorage(ForgetfulStorage):
def __init__(self, fn='dbm', ttl=604800, log=print):
"""
By default, max age is a week.
"""
self.fn=fn
self.log=log
# class HalfForgetfulStorage(ForgetfulStorage):
# def __init__(self, fn='dbm', ttl=604800, log=print):
# """
# By default, max age is a week.
# """
# self.fn=fn
# self.log=log
# import pickledb
# self.data = pickledb.load(self.fn,False)
import dbm
self.data = dbm.open(self.fn,flag='cs')
self.ttl = ttl
# import pickledb
# # self.data = pickledb.load(self.fn,False)
# import dbm
# self.data = dbm.open(self.fn,flag='cs')
# # import shelve
# # self.data = shelve.open(self.fn, flag='cs')
# # from kivy.storage.jsonstore import JsonStore
# # self.
# self.ttl = ttl
self.log('have %s keys' % len(self))
# self.log('have %s keys' % len(self))
def keys(self):
# return self.data.getall()
return self.data.keys()
# def keys(self):
# # return self.data.getall()
# return self.data.keys()
def __len__(self):
return len(self.keys())
# def __len__(self):
# return len(self.keys())
def __setitem__(self, key, value):
self.set(key,value)
# def __setitem__(self, key, value):
# self.set(key,value)
def set(self, key,value):# try:
self.log(f'key: {key},\nvalue:{value}')
time_b=str(time.monotonic()).encode()
if type(value)!=bytes:
value = str(json.dumps(value)).encode()
# def set(self, key,value):# try:
# #self.log(f'key: {key},\nvalue:{value}')
# #if type(value)==list and len(value)==2:
# # time,val_b = value
# # value = str(time).encode() + BSEP_ST + val_b
# #self.log('newdat =',value)
newdat = BSEP_ST.join([time_b, value])
self.data[key]=newdat
# return True
def get(self, key, default=None):
# print(f'??!?\n{key}\n{self.data[key]}')
# return self.data[key][1]
# (skip time part of tuple)
val=self.data[key] if key in self.data else None
self.log('VALLLL',val)
if val is None: return None
time_b,val_b = val.split(BSEP_ST)
rval = (float(time_b.decode()), val_b)
self.log('rvalll',rval)
return rval
def __getitem__(self, key):
return self.get(key)
# self.data[key]=value
# # return True
# def get(self, key, default=None):
# # print(f'??!?\n{key}\n{self.data[key]}')
# # return self.data[key][1]
# # (skip time part of tuple)
# # val=self.data[key] if key in self.data else None
# # self.log('VALLLL',val)
# # if val is None: return None
# # time_b,val_b = val.split(BSEP_ST)
# # rval = (float(time_b.decode()), val_b)
# # self.log('rvalll',rval)
# # return rval
# return self.data.get(key,None)
# def __getitem__(self, key):
# return self.get(key)
#return data_list
# #return data_list

@ -1,73 +1,13 @@
import logging
import asyncio
import shelve
from collections import OrderedDict
import pickle,os
# NODES_PRIME = [("128.232.229.63",8467), ("68.66.241.111",8467)]
NODES_PRIME = [("68.66.241.111",8467)] # ("10.42.0.13",8467)]
async def echo(msg):
print('echo',msg)
# def boot_selfless_node(port=8468, loop=None):
# # handler = logging.StreamHandler()
# # formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
# # handler.setFormatter(formatter)
# # log = logging.getLogger('kademlia')
# # log.addHandler(handler)
# # log.setLevel(logging.DEBUG)
# if not loop: loop = asyncio.get_event_loop()
# loop.set_debug(True)
# # shelf = HalfForgetfulStorage()
# #server = Server(storage=shelf)
# try:
# from kad import KadServer,HalfForgetfulStorage
# except ImportError:
# from .kad import KadServer,HalfForgetfulStorage
# server = KadServer(storage=HalfForgetfulStorage())
# loop.create_task(server.listen(port))
# # try:
# # loop.run_forever()
# # except KeyboardInterrupt:
# # pass
# # finally:
# # server.stop()
# # loop.close()
# return server,loop
NODES_PRIME = [("128.232.229.63",8467), ("68.66.241.111",8467)]
def boot_lonely_selfless_node(port=8467):
# handler = logging.StreamHandler()
# formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
# handler.setFormatter(formatter)
# log = logging.getLogger('kademlia')
# log.addHandler(handler)
# log.setLevel(logging.DEBUG)
# import asyncio
# loop = asyncio.new_event_loop()
# # async def go():
# # from api import _getdb
# # node = await _getdb()
# # i=0
# # while i+1:
# # if not i%10: print(node)
# # await asyncio.sleep(5)
# # i+=1
# asyncio.run(go())
async def go():
from api import Api,PORT_LISTEN
API = Api()

Loading…
Cancel
Save