p2p uploading work

p2p
quadrismegistus 4 years ago
parent f6f526aad1
commit fa8742877d

@ -189,6 +189,8 @@ class MainApp(MDApp):
self.username = userd.get('username','')
def is_logged_in(self,just_check_timestamp=True, use_caching=True):
# self.username='root'
# return True
if self.logged_in: return True
if not use_caching: return False
@ -235,107 +237,40 @@ class MainApp(MDApp):
self.root.ids.login_screen.login_status.text=dat['error']
return False
def upload(self,orig_img_src):
url_upload=self.api+'/upload'
filename=orig_img_src[0] if orig_img_src and os.path.exists(orig_img_src[0]) else ''
if not filename: return
server_filename=''
media_uid=None
with self.get_session() as sess:
with sess.post(url_upload,files={'file':open(filename,'rb')}) as r1:
if r1.status_code==200:
rdata1 = r1.json()
server_filename = rdata1.get('filename','')
media_uid=rdata1.get('media_uid')
if server_filename:
# pre-cache
cache_filename = os.path.join('cache','img',server_filename)
cache_filedir = os.path.dirname(cache_filename)
if not os.path.exists(cache_filedir): os.makedirs(cache_filedir)
shutil.copyfile(filename,cache_filename)
return {'cache_filename':cache_filename, 'media_uid':media_uid, 'server_filename':server_filename}
def upload(self,filename,file_id=None):
log('uploading filename:',filename)
rdata=self.api.upload(filename,file_id=file_id)
if rdata is not None:
rdata['success']='File uploaded'
return rdata
return {'error':'Upload failed'}
def post(self, content='', media_uid=None):
timestamp=time.time()
jsond = {'content':str(content),'media_uid':media_uid,
'username':self.username, 'timestamp':timestamp}
url_post = self.api+'/post'
'author':self.username, 'timestamp':timestamp}
with self.get_session() as sess:
# post
with sess.post(url_post, json=jsond) as r2:
log('got back from post: ' + r2.text)
rdata2 = r2.json()
post_id = rdata2.get('post_id',None)
if post_id:
# pre-cache
cache_dir = os.path.join('cache','json',post_id[:3])
cache_fnfn = os.path.join(cache_dir,post_id[3:]+'.json')
if not os.path.exists(cache_dir): os.makedirs(cache_dir)
with open(cache_fnfn,'w') as of:
json.dump(jsond, of)
#self.root.view_post(post_id)
self.root.change_screen('feed')
return {'post_id':post_id}
res=self.api.post(jsond)
if 'success' in res:
self.root.change_screen('feed')
return {'post_id':res['post_id']}
def get_post(self,post_id):
# get json from cache?
ofn_json = os.path.join('cache','json',str(post_id)+'.json')
if os.path.exists(ofn_json):
with open(ofn_json) as f:
jsond = json.load(f)
else:
with self.get_session() as sess:
with sess.get(self.api+'/post/'+str(post_id)) as r:
jsond = r.json()
# cache it!
with open(ofn_json,'w') as of:
json.dump(jsond, of)
return jsond
return self.api.get_post(post_id)
def get_posts(self):
return []
with self.get_session() as sess:
with sess.get(self.api+'/posts') as r:
log(r.text)
jsond=r.json()
return jsond['posts']
return []
return self.api.get_posts()
def get_my_posts(self):
with self.get_session() as sess:
with sess.get(self.api+'/posts/'+self.username) as r:
log(r.text)
jsond=r.json()
return jsond['posts']
return []
def get_posts_async(self):
result=[]
with self.get_session() as sess:
futures = [sess.get(self.api+'/posts')]
for future in as_completed(futures):
log('second?')
r=future.result()
log(r.text)
jsond=r.json()
result=jsond['posts']
log('first?')
return result
return self.api.get_posts('/author/'+self.username)
def get_image(self, img_src):
# is there an image?
if not img_src: return

@ -14,6 +14,8 @@ import asyncio
from .crypto import *
from main import log
from .p2p import *
from pathlib import Path
from functools import partial
# works better with tor?
import json
@ -41,21 +43,65 @@ class Api(object):
#from .p2p import connect
#self.node = connect()
def get(self,key):
def get(self,key_or_keys):
async def _get():
node = Server(storage=HalfForgetfulStorage())
await node.listen(PORT_LISTEN)
await node.bootstrap(NODES_PRIME)
return await node.get(key)
if type(key_or_keys) in {list,tuple,dict}:
keys = key_or_keys
res = []
res = await asyncio.gather(*[node.get(key) for key in keys])
log('RES?',res)
else:
key = key_or_keys
res = await node.get(key)
return res
return asyncio.run(_get())
def set(self,key,value):
def get_json(self,key_or_keys):
res = self.get(key_or_keys)
if type(res)==list:
return [None if x is None else json.loads(x) for x in res]
else:
return None if res is None else json.loads(res)
def set(self,key_or_keys,value_or_values):
log('hello?')
async def _set():
node = Server(storage=HalfForgetfulStorage())
log('starting server...')
node = Server() #storage=HalfForgetfulStorage())
log('listening...')
await node.listen(PORT_LISTEN)
log('bootstrapping...')
await node.bootstrap(NODES_PRIME)
return await node.set(key,value)
return asyncio.run(_set())
if type(key_or_keys) in {list,tuple,dict}:
keys = key_or_keys
values = value_or_values
log(len(keys),len(values))
assert len(keys)==len(values)
res = await asyncio.gather(*[node.set(key,value) for key,value in zip(keys,values)])
log('RES?',res)
else:
key = key_or_keys
value = value_or_values
res = await node.set(key,value)
node.stop()
return res
return asyncio.run(_set(), debug=True)
def set_json(self,key,value):
value_json = jsonify(value)
return self.set(key,value_json)
def has(self,key):
return self.get(key) is not None
@ -63,14 +109,12 @@ class Api(object):
## PERSONS
def get_person(self,username):
person = self.get('/person/'+username)
return None if person is None else json.loads(person)
return self.get_json('/person/'+username)
def set_person(self,username,public_key):
pem_public_key = save_public_key(public_key,return_instead=True)
obj = {'name':username, 'public_key':pem_public_key.decode()}
obj_str = jsonify(obj)
self.set('/person/'+username,obj_str)
self.set_json('/person/'+username,obj)
@ -131,7 +175,7 @@ class Api(object):
# verify keys
person_public_key_pem = person['public_key']
public_key = load_public_key(person_public_key_pem)
public_key = load_public_key(person_public_key_pem.encode())
real_public_key = private_key.public_key()
#log('PUBLIC',public_key.public_numbers())
@ -141,52 +185,91 @@ class Api(object):
return {'error':'keys do not match!'}
return {'success':'Login successful', 'username':name}
def append_json(self,key,data):
sofar=self.get_json(key)
if sofar is None: sofar = []
new=sofar + ([data] if type(data)!=list else data)
if self.set_json(key, new):
return {'success':'Length increased to %s' % len(new)}
return {'error':'Could not append json'}
def upload(self,filename,file_id=None, uri='/img/',uri_part='/part/'):
import sys
if not file_id: file_id = get_random_id()
part_ids = []
part_keys = []
parts=[]
PARTS=[]
buffer_size=100
for part in bytes_from_file(filename,chunksize=1024*7):
part_id = get_random_id()
part_ids.append(part_id)
part_key='/part/'+part_id
part_keys.append(part_key)
parts.append(part)
# PARTS.append(part)
log('part!:',sys.getsizeof(part))
#self.set(part_key,part)
if len(parts)>=buffer_size:
log('setting...')
self.set(part_keys,parts)
part_keys=[]
PARTS+=parts
parts=[]
# set all parts
#self.set(part_keys,PARTS)
log('# parts:',len(PARTS))
if parts and part_keys: self.set(part_keys, parts)
# how many parts?
log('# pieces!',len(part_ids))
file_store = {'ext':os.path.splitext(filename)[-1][1:], 'parts':part_ids}
log('FILE STORE??',file_store)
self.set_json(uri+file_id,file_store)
# file_store['data'].seek(0)
file_store['id']=file_id
return file_store
"""
## LOGIN
def login(data):
name=data.get('name','')
passkey=data.get('passkey','')
if not (name and passkey):
return {'error':'Login failed'},status.HTTP_400_BAD_REQUEST
person = Person.nodes.get_or_none(name=name)
if person is None:
return {'error':'User already exists'},status.HTTP_401_UNAUTHORIZED
real_passkey = person.passkey
if not check_password_hash(real_passkey, passkey):
return {'error':'Login failed'},status.HTTP_401_UNAUTHORIZED
return {'success':'Login success'},status.HTTP_200_OK
def register(name,passkey):
if not (name and passkey):
return {'error':'Register failed'},status.HTTP_400_BAD_REQUEST
if person is not None:
return {'error':'User exists'},status.HTTP_401_UNAUTHORIZED
def post(self,data):
post_id=get_random_id()
res = self.set_json('/post/'+post_id, data)
log('got data:',data)
private_key,public_key = crypto.new_keys(password=passkey)
## add to channels
self.append_json('/posts/channel/earth', post_id)
## add to user
un=data.get('author')
if un: self.append_json('/posts/author/'+un, post_id)
if res:
return {'success':'Posted! %s' % post_id, 'post_id':post_id}
return {'error':'Post failed'}
def get_post(self,post_id):
return self.get_json('/post/'+post_id)
person = Person(name=name,passkey=passkey).save()
print('REGISTERED!',data)
return {'success':'Account created', 'username':name},status.HTTP_200_OK
def get_posts(self,uri='/channel/earth'):
index = self.get_json('/posts'+uri)
if index is None: return []
data = self.get_json(['/post/'+x for x in index])
return data
## CREATE
def get_random_id():
import uuid
return uuid.uuid4().hex
def allowed_file(filename):
return '.' in filename and \
filename.rsplit('.', 1)[1].lower() in ALLOWED_EXTENSIONS
@ -196,7 +279,7 @@ def get_random_filename(filename):
fn=uuid.uuid4().hex
return (fn[:3],fn[3:]+os.path.splitext(filename)[-1])
@app.route('/api/upload',methods=['POST'])
def upload():
files = request.files
# check if the post request has the file part
@ -234,43 +317,6 @@ def upload():
return {'error':'Upload failed'},status.HTTP_406_NOT_ACCEPTABLE
@app.route('/api/post',methods=['POST'])
@app.route('/api/post/<post_id>',methods=['GET'])
def post(post_id=None):
if request.method == 'POST':
# get data
data=request.json
print('POST /api/post:',data)
# make post
post = Post(content=data.get('content',''), timestamp=data.get('timestamp')).save()
# attach author
name = data.get('username')
if name:
author = Person.nodes.get_or_none(name=name)
author.wrote.connect(post)
# attach media
media_uid=data.get('media_uid')
if media_uid:
media=Media.nodes.get_or_none(uid=media_uid)
post.has_media.connect(media)
# return
post_id=post.uid
print('created new post!',post_id)
return {'post_id':post_id},status.HTTP_200_OK
print('got post id!',post_id)
post = Post.nodes.get_or_none(uid=post_id)
if not post: return {},status.HTTP_204_NO_CONTENT
return post.data,status.HTTP_200_OK
@app.route('/api/download/<prefix>/<filename>',methods=['GET'])
def download(prefix, filename):
filedir = os.path.join(app.config['UPLOAD_DIR'], prefix)
print(filedir, filename)
@ -279,21 +325,20 @@ def download(prefix, filename):
### READ
@app.route('/api/followers/<name>')
def get_followers(name=None):
person = Person.match(G, name).first()
data = [p.data for p in person.followers]
return jsonify(data)
@app.route('/api/followers/<name>')
def get_follows(name=None):
person = Person.match(G, name).first()
data = [p.data for p in person.follows]
return jsonify(data)
@app.route('/api/posts')
@app.route('/api/posts/<name>')
def get_posts(name=None):
if name:
person = Person.nodes.get_or_none(name=name)
@ -304,7 +349,7 @@ def get_posts(name=None):
return jsonify({'posts':data})
@app.route('/api/post/<int:id>')
def get_post(id=None):
post = Post.match(G, int(id)).first()
data = post.data
@ -313,13 +358,63 @@ def get_post(id=None):
if __name__ == '__main__':
app.run(host='0.0.0.0', port=5555)
# socketio.run(app,host='0.0.0.0', port=5555)
# from gevent import pywsgi
# from geventwebsocket.handler import WebSocketHandler
# server = pywsgi.WSGIServer(('', 5555), app, handler_class=WebSocketHandler)
# server.serve_forever()
"""
import sys
# def bytes_from_file(filename, chunksize=8192//2):
# with open(filename, "rb") as f:
# while True:
# chunk = f.read(chunksize)
# if chunk:
# log(type(chunk), sys.getsizeof(chunk))
# yield chunk
# #yield from chunk
# else:
# break
# def bytes_from_file(filename,chunksize=8192):
# with open(filename,'rb') as f:
# barray = bytearray(f.read())
# for part in barray[0:-1:chunksize]:
# log('!?',part)
# yield bytes(part)
def bytes_from_file(filename,chunksize=8192):
with open(filename, 'rb') as f:
while True:
piece = f.read(chunksize)
if not piece:
break
yield piece
# import sys
# def bytes_from_file(path,chunksize=8000):
# ''' Given a path, return an iterator over the file
# that lazily loads the file.
# '''
# path = Path(path)
# bufsize = get_buffer_size(path)
# with path.open('rb') as file:
# reader = partial(file.read1, bufsize)
# for chunk in iter(reader, bytes()):
# _bytes=bytearray()
# for byte in chunk:
# #if _bytes is None:
# # _bytes=byte
# #else:
# _bytes.append(byte)
# if sys.getsizeof(_bytes)>=8192:
# yield bytes(_bytes) #.bytes()
# _bytes=bytearray()
# if _bytes:
# yield bytes(_bytes)
# def get_buffer_size(path):
# """ Determine optimal buffer size for reading files. """
# st = os.stat(path)
# try:
# bufsize = st.st_blksize # Available on some Unix systems (like Linux)
# except AttributeError:
# bufsize = io.DEFAULT_BUFFER_SIZE
# return bufsize

@ -23,7 +23,7 @@ class HalfForgetfulStorage(ForgetfulStorage):
self.data=pickle.load(f)
print('loaded:',self.data)
# print('loaded:',self.data)
#self.data = pickle.open('sto.dat','rb') #,writeback=True)
# self.data = self.store.get('OrderedDict',OrderedDict())

@ -0,0 +1 @@
python -c "import p2p; p2p.start_first_node()"

@ -1,6 +1,6 @@
from kivymd.uix.label import MDLabel
from kivy.uix.gridlayout import GridLayout
from kivy.uix.image import AsyncImage
from kivy.uix.image import AsyncImage, Image
from kivymd.uix.boxlayout import MDBoxLayout
from kivymd.uix.card import MDCard, MDSeparator
from kivy.uix.scrollview import ScrollView
@ -18,6 +18,8 @@ from kivy.app import App
class PostTitle(MDLabel): pass
class PostGridLayout(GridLayout): pass
class PostImage(AsyncImage): pass
# class PostImage(CoreImage)
class PostImageBytes(Image): pass
class PostContent(MDLabel):
def __init__(self,**kwargs):
@ -154,7 +156,7 @@ class FeedScreen(ProtectedScreen):
i=0
lim=25
for i,post in enumerate(self.app.get_posts()):
for i,post in enumerate(reversed(self.app.get_posts())):
log('third?')
#if ln.startswith('@') or ln.startswith('RT '): continue
#i+=1

@ -11,6 +11,8 @@ from screens.feed.feed import *
import os,time,threading
from threading import Thread
from kivymd.uix.dialog import MDDialog
from kivy.core.image import Image as CoreImage
import io,shutil
class ProgressPopup(MDDialog): pass
class MessagePopup(MDDialog): pass
@ -128,30 +130,54 @@ class PostScreen(ProtectedScreen):
self.upload_button.choose()
self.orig_img_src = self.upload_button.selection
self.open_dialog('uploading')
# self.open_dialog('uploading')
# self.upload()
# self.close_dialog()
mythread = threading.Thread(target=self.upload)
mythread.start()
def upload(self):
rdata = self.app.upload(self.orig_img_src)
for k,v in rdata.items():
log('data!!!' + str(k) +':'+str(v))
setattr(self,k,v)
self.add_image()
self.close_dialog()
def add_image(self):
if hasattr(self,'image'):
self.image.source=self.cache_filename
else:
self.image_layout = image_layout = PostImageLayout()
self.image = image = PostImage(source=self.cache_filename)
image.height = '300dp'
image_layout.add_widget(image)
image_layout.height='300dp'
self.post_card.add_widget(image_layout,index=1)
# get file id
filename=self.orig_img_src[0] if self.orig_img_src and os.path.exists(self.orig_img_src[0]) else ''
if not filename: return
self.img_id = file_id = get_random_id()
self.img_ext = os.path.splitext(filename)[-1][1:]
# cache
tmp_img_fn = 'cache/img/'+self.img_id[:3]+'/'+self.img_id[3:]+'.'+self.img_ext
tmp_img_dir = os.path.dirname(tmp_img_fn)
if not os.path.exists(tmp_img_dir): os.makedirs(tmp_img_dir)
shutil.copyfile(filename, tmp_img_fn)
# rdata = self.app.upload(filename)
# if 'success' in rdata:
# # log('rdata = ',rdata)
# self.img_id = rdata['id']
# self.img_data = rdata['data']
# self.img_ext = rdata['ext']
# #for k,v in rdata.items():
# # log('data!!!' + str(k) +':'+str(v))
# # setattr(self,k,v)
self.add_image(tmp_img_fn)
log('hey?',tmp_img_fn)
self.app.upload(tmp_img_fn)
# self.close_dialog()
def add_image(self,filename):
if hasattr(self,'image_layout'):
self.post_card.remove_widget(self.image_layout)
self.image_layout = image_layout = PostImageLayout()
self.image = image = PostImage(source=filename)
# self.image.texture = img.texture
self.image.height = '300dp'
self.image_layout.add_widget(self.image)
self.image_layout.height='300dp'
self.post_card.add_widget(self.image_layout,index=1)
def post(self):
# check?
@ -195,3 +221,9 @@ class PostScreen(ProtectedScreen):
# pass
def get_random_id():
import uuid
return uuid.uuid4().hex
Loading…
Cancel
Save