async io finally working! eureka!

macdev
quadrismegistus 4 years ago
parent 7a5ec174b6
commit 27bb44767c

@ -14,7 +14,7 @@ subprocess.DEVNULL = -3 # noqa
import asyncio
import os
os.environ['KIVY_EVENTLOOP'] = 'async'
os.environ['KIVY_EVENTLOOP'] = 'asyncio'
# loop = asyncio.get_event_loop()
# loop.set_debug(True)
@ -299,7 +299,7 @@ class MainApp(MDApp):
if data_piece is not None:
of.write(data_piece)
def post(self, content='', file_id=None, file_ext=None, anonymous=False):
async def post(self, content='', file_id=None, file_ext=None, anonymous=False):
#timestamp=time.time()
jsond={}
#jsond['timestamp']=
@ -310,7 +310,7 @@ class MainApp(MDApp):
jsond['author']=self.username
self.log('posting:',jsond)
res=self.api.post(jsond)
res=await self.api.post(jsond)
if 'success' in res:
self.root.change_screen('feed')
return {'post_id':res['post_id']}
@ -319,11 +319,11 @@ class MainApp(MDApp):
def get_post(self,post_id):
return self.api.get_post(post_id)
async def get_post(self,post_id):
return await self.api.get_post(post_id)
def get_posts(self):
return self.api.get_posts()
async def get_posts(self):
return await self.api.get_posts()
def get_my_posts(self):
return self.api.get_posts('/author/'+self.username)
@ -345,18 +345,66 @@ class MainApp(MDApp):
shutil.copyfileobj(r.raw, of)
return ofn_image
def app_func(self):
'''This will run both methods asynchronously and then block until they
are finished
'''
# self.other_task = asyncio.ensure_future(self.waste_time_freely())
self.other_task = asyncio.ensure_future(self.api.connect_forever())
async def run_wrapper():
# we don't actually need to set asyncio as the lib because it is
# the default, but it doesn't hurt to be explicit
await self.async_run() #async_lib='asyncio')
print('App done')
self.other_task.cancel()
return asyncio.gather(run_wrapper(), self.other_task)
async def waste_time_freely(self):
'''This method is also run by the asyncio loop and periodically prints
something.
'''
try:
i = 0
while True:
if self.root is not None:
#status = self.root.ids.label.status
status='TimeWaster'
self.log('{} on the beach'.format(status))
# # get some sleep
# if self.root.ids.btn1.state != 'down' and i >= 2:
# i = 0
# self.log('Yawn, getting tired. Going to sleep')
# self.root.ids.btn1.trigger_action()
i += 1
await asyncio.sleep(2)
except asyncio.CancelledError as e:
self.log('Wasting time was canceled', e)
finally:
# when canceled, print that it finished
self.log('Done wasting time')
if __name__ == '__main__':
loop = asyncio.get_event_loop()
loop.run_until_complete(MainApp().app_func())
loop.close()
def main():
# start_logger()
App = MainApp()
App.run()
if __name__ == '__main__':
# loop = asyncio.get_event_loop()
# asyncio.set_event_loop(loop)
# loop.run_until_complete(main())
# loop.close()
main()
# def main():
# # start_logger()
# App = MainApp()
# App.run()
# if __name__ == '__main__':
# # loop = asyncio.get_event_loop()
# # asyncio.set_event_loop(loop)
# # loop.run_until_complete(main())
# # loop.close()
# main()

@ -10,7 +10,7 @@ import os,time
from datetime import datetime
from kivy.app import App
from threading import Thread
import asyncio
@ -172,22 +172,25 @@ class FeedScreen(ProtectedScreen):
posts = ListProperty()
def on_pre_enter(self):
# self.log('ids:' +str(self.ids.post_carousel.ids))
for post in self.posts:
self.ids.post_carousel.remove_widget(post)
i=0
lim=25
for i,post in enumerate(reversed(self.app.get_posts())):
#if ln.startswith('@') or ln.startswith('RT '): continue
#i+=1
if i>lim: break
async def go():
# self.log('ids:' +str(self.ids.post_carousel.ids))
for post in self.posts:
self.ids.post_carousel.remove_widget(post)
#post = Post(title=f'Marx Zuckerberg', content=ln.strip())
self.log('???')
post_obj = PostCard(post)
self.posts.append(post_obj)
self.ids.post_carousel.add_widget(post_obj)
i=0
lim=25
posts=await self.app.get_posts()
for i,post in enumerate(reversed(posts)):
#if ln.startswith('@') or ln.startswith('RT '): continue
#i+=1
if i>lim: break
#post = Post(title=f'Marx Zuckerberg', content=ln.strip())
self.log('???')
post_obj = PostCard(post)
self.posts.append(post_obj)
self.ids.post_carousel.add_widget(post_obj)
asyncio.create_task(go())
def on_pre_enter_test(self):
i=0

@ -190,17 +190,16 @@ class PostScreen(ProtectedScreen):
# log('REUPLOADING')
# self.upload()
def do_post():
async def do_post():
file_id = self.img_id if hasattr(self,'img_id') else None
file_ext = self.img_ext if hasattr(self,'img_ext') else None
self.app.post(content=content, file_id=file_id, file_ext=file_ext)
await self.app.post(content=content, file_id=file_id, file_ext=file_ext)
import time
self.close_dialog()
self.open_dialog('posting')
#Thread(target=do_post).start()
do_post()
asyncio.create_task(do_post())
# class ViewPostScreen(ProtectedScreen):
# post_id = ObjectProperty()

@ -33,7 +33,7 @@ UPLOAD_DIR = 'uploads/'
ALLOWED_EXTENSIONS = {'png', 'jpg', 'jpeg', 'gif'}
# PORT_SPEAK = 8468
PORT_LISTEN = 5637
PORT_LISTEN = 5639
# Api Functions
from threading import Thread
@ -74,25 +74,44 @@ class Api(object):
#self._node=self.connect()
pass
# @property
# def node(self):
# if not hasattr(self,'_node'):
# self._node=self.connect()
# return self._node
# def connect(self,port=PORT_LISTEN):
# self.log('connecting...')
# async def _connect():
# return await _getdb(self,port)
# return asyncio.run(_connect())
def get(self,key_or_keys):
async def connect_forever(self):
try:
i = 0
self._node = await self.connect()
while True:
self.log(f'Node status (pulse {i}): {self._node}')
# # get some sleep
# if self.root.ids.btn1.state != 'down' and i >= 2:
# i = 0
# self.log('Yawn, getting tired. Going to sleep')
# self.root.ids.btn1.trigger_action()
i += 1
await asyncio.sleep(2)
except asyncio.CancelledError as e:
self.log('Wasting time was canceled', e)
finally:
# when canceled, print that it finished
self.log('Done wasting time')
@property
async def node(self):
if not hasattr(self,'_node'):
self._node=await self.connect()
return self._node
async def connect(self,port=PORT_LISTEN):
self.log('connecting...')
return await _getdb(self,port)
async def get(self,key_or_keys):
async def _get():
# self.log('async _get()',self.node)
node=await _getdb(self)
#node=self.node
#node=await _getdb(self)
node=await self.node
if type(key_or_keys) in {list,tuple,dict}:
keys = key_or_keys
@ -103,16 +122,18 @@ class Api(object):
key = key_or_keys
res = await node.get(key)
node.stop()
#node.stop()
return res
return asyncio.run(_get())
# return asyncio.run(_get())
#res = await asyncio.create_task(_get())
#self.log('RES =',res)
return await _get()
# return loop.create_task(_get())
def get_json(self,key_or_keys):
res = self.get(key_or_keys)
async def get_json(self,key_or_keys):
res = await self.get(key_or_keys)
self.log('GET_JSON',res)
if type(res)==list:
# self.log('is a list!',json.loads(res[0]))
@ -121,12 +142,12 @@ class Api(object):
#log('RES!!!',res)
return None if res is None else json.loads(res)
def set(self,key_or_keys,value_or_values):
async def set(self,key_or_keys,value_or_values):
async def _go():
# self.log('async _set()',self.node)
# node=self.node
node=await _getdb(self)
# node=self.node
#node=await _getdb(self)
node=await self.node
if type(key_or_keys) in {list,tuple,dict}:
@ -140,7 +161,7 @@ class Api(object):
value = value_or_values
res = await node.set(key,value) #'this is a test')
node.stop()
#node.stop()
return res
# loop=asyncio.get_event_loop()
@ -168,12 +189,9 @@ class Api(object):
# print(result)
# return result
res= asyncio.run(_go(), debug=True)#
# res= asyncio.run(_set(key_or_keys,value_or_values), debug=True)#
print('res = ',res)
return res
return _go()
def set_json(self,key,value):
async def set_json(self,key,value):
value_json = jsonify(value)
# self.log('OH NO!',sys.getsizeof(value_json))
return self.set(key,value_json)
@ -259,11 +277,11 @@ class Api(object):
return {'error':'keys do not match!'}
return {'success':'Login successful', 'username':name}
def append_json(self,key,data):
sofar=self.get_json(key)
async def append_json(self,key,data):
sofar=await self.get_json(key)
if sofar is None: sofar = []
new=sofar + ([data] if type(data)!=list else data)
if self.set_json(key, new):
if await self.set_json(key, new):
return {'success':'Length increased to %s' % len(new)}
return {'error':'Could not append json'}
@ -321,9 +339,9 @@ class Api(object):
return file_store
def post(self,data):
async def post(self,data):
post_id=get_random_id()
res = self.set_json('/post/'+post_id, data)
res = await self.set_json('/post/'+post_id, data)
self.log('Api.post() got data back from set_json():',res)
# ## add to channels
@ -337,14 +355,16 @@ class Api(object):
return {'success':'Posted! %s' % post_id, 'post_id':post_id}
return {'error':'Post failed'}
def get_post(self,post_id):
async def get_post(self,post_id):
return self.get_json('/post/'+post_id)
def get_posts(self,uri='/channel/earth'):
index = self.get_json('/posts'+uri)
async def get_posts(self,uri='/channel/earth'):
index = await self.get_json('/posts'+uri)
if index is None: return []
data = self.get_json(['/post/'+x for x in index])
return data
return await data

Loading…
Cancel
Save