From f10f0dada6fb43f693da637ff0f9210af96f923d Mon Sep 17 00:00:00 2001 From: blitzmann Date: Sat, 22 Aug 2020 16:31:00 -0400 Subject: [PATCH] First working PoC with a new task structure --- cps/helper.py | 62 +++++------- cps/services/worker.py | 209 +++++++++++++++++++++++++++++++++++++++++ cps/tasks/__init__.py | 0 cps/tasks/convert.py | 196 ++++++++++++++++++++++++++++++++++++++ cps/web.py | 12 +-- 5 files changed, 433 insertions(+), 46 deletions(-) create mode 100644 cps/services/worker.py create mode 100644 cps/tasks/__init__.py create mode 100644 cps/tasks/convert.py diff --git a/cps/helper.py b/cps/helper.py index 681719a9..1634ec63 100644 --- a/cps/helper.py +++ b/cps/helper.py @@ -40,6 +40,7 @@ from sqlalchemy.sql.expression import true, false, and_, or_, text, func from werkzeug.datastructures import Headers from werkzeug.security import generate_password_hash from . import calibre_db +from .tasks.convert import TaskConvert try: from urllib.parse import quote @@ -66,6 +67,8 @@ from .pagination import Pagination from .subproc_wrapper import process_wait from .worker import STAT_WAITING, STAT_FAIL, STAT_STARTED, STAT_FINISH_SUCCESS from .worker import TASK_EMAIL, TASK_CONVERT, TASK_UPLOAD, TASK_CONVERT_ANY +from .services.worker import WorkerThread +from . import tasks log = logger.create() @@ -103,7 +106,7 @@ def convert_book_format(book_id, calibrepath, old_book_format, new_book_format, text = (u"%s -> %s: %s" % (old_book_format, new_book_format, book.title)) settings['old_book_format'] = old_book_format settings['new_book_format'] = new_book_format - worker.add_convert(file_path, book.id, user_id, text, settings, kindle_mail) + WorkerThread.add(user_id, TaskConvert(file_path, book.id, text, settings, kindle_mail)) return None else: error_message = _(u"%(format)s not found: %(fn)s", @@ -703,47 +706,30 @@ def format_runtime(runtime): # helper function to apply localize status information in tasklist entries def render_task_status(tasklist): renderedtasklist = list() - for task in tasklist: - if task['user'] == current_user.nickname or current_user.role_admin(): - if task['formStarttime']: - task['starttime'] = format_datetime(task['formStarttime'], format='short', locale=get_locale()) - # task2['formStarttime'] = "" - else: - if 'starttime' not in task: - task['starttime'] = "" - - if 'formRuntime' not in task: - task['runtime'] = "" - else: - task['runtime'] = format_runtime(task['formRuntime']) + for user, task in tasklist: + if user == current_user.nickname or current_user.role_admin(): + ret = {} + if task.start_time: + ret['starttime'] = format_datetime(task.start_time, format='short', locale=get_locale()) + ret['runtime'] = format_runtime(task.runtime) # localize the task status - if isinstance( task['stat'], int): - if task['stat'] == STAT_WAITING: - task['status'] = _(u'Waiting') - elif task['stat'] == STAT_FAIL: - task['status'] = _(u'Failed') - elif task['stat'] == STAT_STARTED: - task['status'] = _(u'Started') - elif task['stat'] == STAT_FINISH_SUCCESS: - task['status'] = _(u'Finished') - else: - task['status'] = _(u'Unknown Status') - - # localize the task type - if isinstance( task['taskType'], int): - if task['taskType'] == TASK_EMAIL: - task['taskMessage'] = _(u'E-mail: ') + task['taskMess'] - elif task['taskType'] == TASK_CONVERT: - task['taskMessage'] = _(u'Convert: ') + task['taskMess'] - elif task['taskType'] == TASK_UPLOAD: - task['taskMessage'] = _(u'Upload: ') + task['taskMess'] - elif task['taskType'] == TASK_CONVERT_ANY: - task['taskMessage'] = _(u'Convert: ') + task['taskMess'] + if isinstance(task.stat, int): + if task.stat == STAT_WAITING: + ret['status'] = _(u'Waiting') + elif task.stat == STAT_FAIL: + ret['status'] = _(u'Failed') + elif task.stat == STAT_STARTED: + ret['status'] = _(u'Started') + elif task.stat == STAT_FINISH_SUCCESS: + ret['status'] = _(u'Finished') else: - task['taskMessage'] = _(u'Unknown Task: ') + task['taskMess'] + ret['status'] = _(u'Unknown Status') - renderedtasklist.append(task) + ret['taskMessage'] = "{}: {}".format(_(task.name), task.message) + ret['progress'] = "{} %".format(int(task.progress * 100)) + ret['user'] = user + renderedtasklist.append(ret) return renderedtasklist diff --git a/cps/services/worker.py b/cps/services/worker.py new file mode 100644 index 00000000..61f199f9 --- /dev/null +++ b/cps/services/worker.py @@ -0,0 +1,209 @@ + +from __future__ import division, print_function, unicode_literals +import sys +import os +import re +import smtplib +import socket +import time +import threading +try: + import queue +except ImportError: + import Queue as queue +from glob import glob +from shutil import copyfile +from datetime import datetime + +try: + from StringIO import StringIO + from email.MIMEBase import MIMEBase + from email.MIMEMultipart import MIMEMultipart + from email.MIMEText import MIMEText +except ImportError: + from io import StringIO + from email.mime.base import MIMEBase + from email.mime.multipart import MIMEMultipart + from email.mime.text import MIMEText + +from email import encoders +from email.utils import formatdate +from email.utils import make_msgid +from email.generator import Generator +from flask_babel import gettext as _ + +from cps import calibre_db, db +from cps import logger, config +from cps.subproc_wrapper import process_open +from cps import gdriveutils +from flask_babel import gettext as _ +import abc + +log = logger.create() + +# task 'status' consts +STAT_WAITING = 0 +STAT_FAIL = 1 +STAT_STARTED = 2 +STAT_FINISH_SUCCESS = 3 + + +def _get_main_thread(): + for t in threading.enumerate(): + if t.__class__.__name__ == '_MainThread': + return t + raise Exception("main thread not found?!") + + + +class ImprovedQueue(queue.Queue): + def to_list(self): + """ + Returns a copy of all items in the queue without removing them. + """ + + with self.mutex: + return list(self.queue) + +#Class for all worker tasks in the background +class WorkerThread(threading.Thread): + __instance = None + + @classmethod + def getInstance(cls): + if cls._instance is None: + cls._instance = WorkerThread() + return cls._instance + + def __init__(self): + threading.Thread.__init__(self) + + self.finished = list() + + self.db_queue = queue.Queue() + calibre_db.add_queue(self.db_queue) + + self.doLock = threading.Lock() + self.queue = ImprovedQueue() + + # todo: figure this stuff out and where it should goes + self.asyncSMTP = None + + self.start() + + @classmethod + def add(cls, user, task): + ins = cls.getInstance() + ins.queue.put((user, task)) + + @property + def tasks(self): + with self.doLock: + tasks = list(self.queue.to_list()) + self.finished + return tasks # todo: order by data added + + # Main thread loop starting the different tasks + def run(self): + main_thread = _get_main_thread() + while main_thread.is_alive(): + user, item = self.queue.get() + + # add to list so that in-progress tasks show up + with self.doLock: + self.finished.append((user, item)) + + try: + item.start(self) + print(item) + except Exception as e: + log.exception(e) + + self.queue.task_done() + + def get_send_status(self): + raise NotImplementedError + # if self.asyncSMTP: + # return self.asyncSMTP.getTransferStatus() + # else: + # return "0 %" + + def _delete_completed_tasks(self): + raise NotImplementedError() + # for index, task in reversed(list(enumerate(self.UIqueue))): + # if task['progress'] == "100 %": + # # delete tasks + # self.queue.pop(index) + # self.UIqueue.pop(index) + # # if we are deleting entries before the current index, adjust the index + # if index <= self.current and self.current: + # self.current -= 1 + # self.last = len(self.queue) + +class CalibreTask(metaclass=abc.ABCMeta): + + def __init__(self, message): + self._progress = 0 + self.stat = STAT_WAITING + self.error = None + self.start_time = None + self.end_time = None + self.message = message + + @abc.abstractmethod + def run(self, worker_thread): + """Provides the caller some human-readable name for this class""" + raise NotImplementedError + + @abc.abstractmethod + def name(self): + """Provides the caller some human-readable name for this class""" + raise NotImplementedError + + def start(self, *args): + self.start_time = datetime.now() + self.run(*args) + self.end_time = datetime.now() + + @property + def stat(self): + return self._stat + + @stat.setter + def stat(self, x): + self._stat = x + + @property + def progress(self): + return self._progress + + @progress.setter + def progress(self, x): + # todo: throw error if outside of [0,1] + self._progress = x + + @property + def error(self): + return self._error + + @error.setter + def error(self, x): + self._error = x + + @property + def runtime(self): + return (self.end_time or datetime.now()) - self.start_time + + @progress.setter + def progress(self, x): + # todo: throw error if outside of [0,1] + self._progress = x + + def _handleError(self, error_message): + log.error(error_message) + self.stat = STAT_FAIL + self.progress = 1 + self.error = error_message + + def _handleSuccess(self): + self.stat = STAT_FINISH_SUCCESS + self.progress = 1 diff --git a/cps/tasks/__init__.py b/cps/tasks/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/cps/tasks/convert.py b/cps/tasks/convert.py new file mode 100644 index 00000000..58b45d33 --- /dev/null +++ b/cps/tasks/convert.py @@ -0,0 +1,196 @@ +from __future__ import division, print_function, unicode_literals +import sys +import os +import re + +from glob import glob +from shutil import copyfile + +from cps.services.worker import CalibreTask +from cps import calibre_db, db +from cps import logger, config +from cps.subproc_wrapper import process_open +from flask_babel import gettext as _ + +log = logger.create() + + +class TaskConvert(CalibreTask): + def __init__(self, file_path, bookid, taskMessage, settings, kindle_mail): + super().__init__(taskMessage) + self.file_path = file_path + self.bookid = bookid + self.settings = settings + self.kindle_mail = kindle_mail + + self.results = dict() + + def run(self, worker_thread): + self.worker_thread = worker_thread + filename = self._convert_ebook_format() + # todo: re-enable this (need to set up gdrive to test with) + # if filename: + # if config.config_use_google_drive: + # gdriveutils.updateGdriveCalibreFromLocal() + # if curr_task == TASK_CONVERT: + # self.add_email(self.queue[index]['settings']['subject'], self.queue[index]['path'], + # filename, self.queue[index]['settings'], self.queue[index]['kindle'], + # self.UIqueue[index]['user'], self.queue[index]['title'], + # self.queue[index]['settings']['body'], internal=True) + + self._handleSuccess() + pass + + def _convert_ebook_format(self): + error_message = None + file_path = self.file_path + book_id = self.bookid + format_old_ext = u'.' + self.settings['old_book_format'].lower() + format_new_ext = u'.' + self.settings['new_book_format'].lower() + + # check to see if destination format already exists - + # if it does - mark the conversion task as complete and return a success + # this will allow send to kindle workflow to continue to work + if os.path.isfile(file_path + format_new_ext): + log.info("Book id %d already converted to %s", book_id, format_new_ext) + cur_book = calibre_db.get_book(book_id) + self.results['path'] = file_path + self.results['title'] = cur_book.title + self._handleSuccess() + return file_path + format_new_ext + else: + log.info("Book id %d - target format of %s does not exist. Moving forward with convert.", + book_id, + format_new_ext) + + if config.config_kepubifypath and format_old_ext == '.epub' and format_new_ext == '.kepub': + check, error_message = self._convert_kepubify(file_path, + format_old_ext, + format_new_ext) + else: + # check if calibre converter-executable is existing + if not os.path.exists(config.config_converterpath): + # ToDo Text is not translated + self._handleError(_(u"Calibre ebook-convert %(tool)s not found", tool=config.config_converterpath)) + return + check, error_message = self._convert_calibre(file_path, format_old_ext, format_new_ext) + + if check == 0: + cur_book = calibre_db.get_book(book_id) + if os.path.isfile(file_path + format_new_ext): + # self.db_queue.join() + new_format = db.Data(name=cur_book.data[0].name, + book_format=self.settings['new_book_format'].upper(), + book=book_id, uncompressed_size=os.path.getsize(file_path + format_new_ext)) + + # todo: this may not be needed anymore, might be able to access the DB directly now. See #1565 + task = {'task':'add_format','id': book_id, 'format': new_format} + self.worker_thread.db_queue.put(task) + # To Do how to handle error? + + '''cur_book.data.append(new_format) + try: + # db.session.merge(cur_book) + calibre_db.session.commit() + except OperationalError as e: + calibre_db.session.rollback() + log.error("Database error: %s", e) + self._handleError(_(u"Database error: %(error)s.", error=e)) + return''' + + self.results['path'] = cur_book.path + self.results['title'] = cur_book.title + if config.config_use_google_drive: + os.remove(file_path + format_old_ext) + self._handleSuccess() + return file_path + format_new_ext + else: + error_message = format_new_ext.upper() + ' format not found on disk' + log.info("ebook converter failed with error while converting book") + if not error_message: + error_message = 'Ebook converter failed with unknown error' + self._handleError(error_message) + return + + def _convert_kepubify(self, file_path, format_old_ext, format_new_ext): + quotes = [1, 3] + command = [config.config_kepubifypath, (file_path + format_old_ext), '-o', os.path.dirname(file_path)] + try: + p = process_open(command, quotes) + except OSError as e: + return 1, _(u"Kepubify-converter failed: %(error)s", error=e) + self.progress = 0.01 + while True: + nextline = p.stdout.readlines() + nextline = [x.strip('\n') for x in nextline if x != '\n'] + if sys.version_info < (3, 0): + nextline = [x.decode('utf-8') for x in nextline] + for line in nextline: + log.debug(line) + if p.poll() is not None: + break + + # ToD Handle + # process returncode + check = p.returncode + + # move file + if check == 0: + converted_file = glob(os.path.join(os.path.dirname(file_path), "*.kepub.epub")) + if len(converted_file) == 1: + copyfile(converted_file[0], (file_path + format_new_ext)) + os.unlink(converted_file[0]) + else: + return 1, _(u"Converted file not found or more than one file in folder %(folder)s", + folder=os.path.dirname(file_path)) + return check, None + + def _convert_calibre(self, file_path, format_old_ext, format_new_ext): + try: + # Linux py2.7 encode as list without quotes no empty element for parameters + # linux py3.x no encode and as list without quotes no empty element for parameters + # windows py2.7 encode as string with quotes empty element for parameters is okay + # windows py 3.x no encode and as string with quotes empty element for parameters is okay + # separate handling for windows and linux + quotes = [1, 2] + command = [config.config_converterpath, (file_path + format_old_ext), + (file_path + format_new_ext)] + quotes_index = 3 + if config.config_calibre: + parameters = config.config_calibre.split(" ") + for param in parameters: + command.append(param) + quotes.append(quotes_index) + quotes_index += 1 + + p = process_open(command, quotes) + except OSError as e: + return 1, _(u"Ebook-converter failed: %(error)s", error=e) + + while p.poll() is None: + nextline = p.stdout.readline() + if os.name == 'nt' and sys.version_info < (3, 0): + nextline = nextline.decode('windows-1252') + elif os.name == 'posix' and sys.version_info < (3, 0): + nextline = nextline.decode('utf-8') + log.debug(nextline.strip('\r\n')) + # parse progress string from calibre-converter + progress = re.search(r"(\d+)%\s.*", nextline) + if progress: + self.progress = int(progress.group(1)) / 100 + + # process returncode + check = p.returncode + calibre_traceback = p.stderr.readlines() + error_message = "" + for ele in calibre_traceback: + if sys.version_info < (3, 0): + ele = ele.decode('utf-8') + log.debug(ele.strip('\n')) + if not ele.startswith('Traceback') and not ele.startswith(' File'): + error_message = "Calibre failed with error: %s" % ele.strip('\n') + return check, error_message + + @property + def name(self): + return "Convert" diff --git a/cps/web.py b/cps/web.py index 7e39054d..6064c1e9 100644 --- a/cps/web.py +++ b/cps/web.py @@ -33,7 +33,7 @@ import re from babel import Locale as LC from babel.dates import format_date from babel.core import UnknownLocaleError -from flask import Blueprint +from flask import Blueprint, jsonify from flask import render_template, request, redirect, send_from_directory, make_response, g, flash, abort, url_for from flask_babel import gettext as _ from flask_login import login_user, logout_user, login_required, current_user, confirm_login @@ -48,7 +48,7 @@ except ImportError: from werkzeug.datastructures import Headers from werkzeug.security import generate_password_hash, check_password_hash -from . import constants, logger, isoLanguages, services, worker, cli +from . import constants, logger, isoLanguages, services, worker, worker2, cli from . import searched_ids, lm, babel, db, ub, config, get_locale, app from . import calibre_db from .gdriveutils import getFileFromEbooksFolder, do_gdrive_download @@ -383,12 +383,8 @@ def import_ldap_users(): @web.route("/ajax/emailstat") @login_required def get_email_status_json(): - tasks = worker.get_taskstatus() - answer = render_task_status(tasks) - js = json.dumps(answer, default=json_serial) - response = make_response(js) - response.headers["Content-Type"] = "application/json; charset=utf-8" - return response + tasks = worker2._worker2.tasks + return jsonify(render_task_status(tasks)) @web.route("/ajax/bookmark//", methods=['POST'])