Refactored web.py to shrink size of file

pull/2262/head^2
Ozzie Isaacs 2 years ago
parent 47414ada69
commit e7464f2694

@ -23,32 +23,22 @@ import sys
import os
import mimetypes
from babel import Locale as LC
from babel import negotiate_locale
from babel.core import UnknownLocaleError
from flask import request, g
from flask import Flask
from .MyLoginManager import MyLoginManager
from flask_babel import Babel
from flask_principal import Principal
from . import config_sql
from . import logger
from . import cache_buster
from .cli import CliParameter
from .constants import CONFIG_DIR
from . import ub, db
from .reverseproxy import ReverseProxied
from .server import WebServer
from .dep_check import dependency_check
from . import services
from .updater import Updater
try:
import lxml
lxml_present = True
except ImportError:
lxml_present = False
from .babel import babel, BABEL_TRANSLATIONS
from . import config_sql
from . import logger
from . import cache_buster
from . import ub, db
try:
from flask_wtf.csrf import CSRFProtect
@ -78,6 +68,8 @@ mimetypes.add_type('application/ogg', '.oga')
mimetypes.add_type('text/css', '.css')
mimetypes.add_type('text/javascript; charset=UTF-8', '.js')
log = logger.create()
app = Flask(__name__)
app.config.update(
SESSION_COOKIE_HTTPONLY=True,
@ -86,14 +78,8 @@ app.config.update(
WTF_CSRF_SSL_STRICT=False
)
lm = MyLoginManager()
babel = Babel()
_BABEL_TRANSLATIONS = set()
log = logger.create()
config = config_sql._ConfigSQL()
cli_param = CliParameter()
@ -120,9 +106,8 @@ def create_app():
cli_param.init()
ub.init_db(os.path.join(CONFIG_DIR, "app.db"), cli_param.user_credentials)
ub.init_db(cli_param.settings_path, cli_param.user_credentials)
# ub.init_db(os.path.join(CONFIG_DIR, "app.db"))
# pylint: disable=no-member
config_sql.load_configuration(config, ub.session, cli_param)
@ -139,26 +124,26 @@ def create_app():
if sys.version_info < (3, 0):
log.info(
'*** Python2 is EOL since end of 2019, this version of Calibre-Web is no longer supporting Python2, please update your installation to Python3 ***')
'*** Python2 is EOL since end of 2019, this version of Calibre-Web is no longer supporting Python2, '
'please update your installation to Python3 ***')
print(
'*** Python2 is EOL since end of 2019, this version of Calibre-Web is no longer supporting Python2, please update your installation to Python3 ***')
'*** Python2 is EOL since end of 2019, this version of Calibre-Web is no longer supporting Python2, '
'please update your installation to Python3 ***')
web_server.stop(True)
sys.exit(5)
if not lxml_present:
log.info('*** "lxml" is needed for calibre-web to run. Please install it using pip: "pip install lxml" ***')
print('*** "lxml" is needed for calibre-web to run. Please install it using pip: "pip install lxml" ***')
web_server.stop(True)
sys.exit(6)
if not wtf_present:
log.info('*** "flask-WTF" is needed for calibre-web to run. Please install it using pip: "pip install flask-WTF" ***')
print('*** "flask-WTF" is needed for calibre-web to run. Please install it using pip: "pip install flask-WTF" ***')
log.info('*** "flask-WTF" is needed for calibre-web to run. '
'Please install it using pip: "pip install flask-WTF" ***')
print('*** "flask-WTF" is needed for calibre-web to run. '
'Please install it using pip: "pip install flask-WTF" ***')
web_server.stop(True)
sys.exit(7)
for res in dependency_check() + dependency_check(True):
log.info('*** "{}" version does not fit the requirements. Should: {}, Found: {}, please consider installing required version ***'
.format(res['name'],
res['target'],
res['found']))
log.info('*** "{}" version does not fit the requirements. '
'Should: {}, Found: {}, please consider installing required version ***'
.format(res['name'],
res['target'],
res['found']))
app.wsgi_app = ReverseProxied(app.wsgi_app)
if os.environ.get('FLASK_DEBUG'):
@ -172,8 +157,8 @@ def create_app():
web_server.init_app(app, config)
babel.init_app(app)
_BABEL_TRANSLATIONS.update(str(item) for item in babel.list_translations())
_BABEL_TRANSLATIONS.add('en')
BABEL_TRANSLATIONS.update(str(item) for item in babel.list_translations())
BABEL_TRANSLATIONS.add('en')
if services.ldap:
services.ldap.init_app(app, config)
@ -185,27 +170,3 @@ def create_app():
return app
@babel.localeselector
def get_locale():
# if a user is logged in, use the locale from the user settings
user = getattr(g, 'user', None)
if user is not None and hasattr(user, "locale"):
if user.name != 'Guest': # if the account is the guest account bypass the config lang settings
return user.locale
preferred = list()
if request.accept_languages:
for x in request.accept_languages.values():
try:
preferred.append(str(LC.parse(x.replace('-', '_'))))
except (UnknownLocaleError, ValueError) as e:
log.debug('Could not parse locale "%s": %s', x, e)
return negotiate_locale(preferred or ['en'], _BABEL_TRANSLATIONS)
'''@babel.timezoneselector
def get_timezone():
user = getattr(g, 'user', None)
return user.timezone if user else None'''

@ -65,7 +65,7 @@ _VERSIONS = OrderedDict(
SQLite=sqlite3.sqlite_version,
)
_VERSIONS.update(ret)
_VERSIONS.update(uploader.get_versions(False))
_VERSIONS.update(uploader.get_versions())
def collect_stats():

@ -28,11 +28,10 @@ import operator
from datetime import datetime, timedelta, time
from functools import wraps
from babel import Locale
from babel.dates import format_datetime, format_time, format_timedelta
from flask import Blueprint, flash, redirect, url_for, abort, request, make_response, send_from_directory, g, Response
from flask_login import login_required, current_user, logout_user, confirm_login
from flask_babel import gettext as _
from flask_babel import get_locale, format_time, format_datetime, format_timedelta
from flask import session as flask_session
from sqlalchemy import and_
from sqlalchemy.orm.attributes import flag_modified
@ -40,14 +39,14 @@ from sqlalchemy.exc import IntegrityError, OperationalError, InvalidRequestError
from sqlalchemy.sql.expression import func, or_, text
from . import constants, logger, helper, services, cli
from . import db, calibre_db, ub, web_server, get_locale, config, updater_thread, babel, gdriveutils, \
from . import db, calibre_db, ub, web_server, config, updater_thread, babel, gdriveutils, \
kobo_sync_status, schedule
from .helper import check_valid_domain, send_test_mail, reset_password, generate_password_hash, check_email, \
valid_email, check_username, update_thumbnail_cache
from .gdriveutils import is_gdrive_ready, gdrive_support
from .render_template import render_title_template, get_sidebar_config
from .services.worker import WorkerThread
from . import debug_info, _BABEL_TRANSLATIONS
from . import debug_info, BABEL_TRANSLATIONS
log = logger.create()
@ -205,9 +204,9 @@ def admin():
all_user = ub.session.query(ub.User).all()
email_settings = config.get_mail_settings()
schedule_time = format_time(time(hour=config.schedule_start_time), format="short", locale=locale)
schedule_time = format_time(time(hour=config.schedule_start_time), format="short")
t = timedelta(hours=config.schedule_duration // 60, minutes=config.schedule_duration % 60)
schedule_duration = format_timedelta(t, format="short", threshold=.99, locale=locale)
schedule_duration = format_timedelta(t, threshold=.99)
return render_title_template("admin.html", allUser=all_user, email=email_settings, config=config, commit=commit,
feature_support=feature_support, schedule_time=schedule_time,
@ -279,7 +278,7 @@ def view_configuration():
def edit_user_table():
visibility = current_user.view_settings.get('useredit', {})
languages = calibre_db.speaking_language()
translations = babel.list_translations() + [Locale('en')]
translations = [LC('en')] + babel.list_translations()
all_user = ub.session.query(ub.User)
tags = calibre_db.session.query(db.Tags)\
.join(db.books_tags_link)\
@ -398,7 +397,7 @@ def delete_user():
@login_required
@admin_required
def table_get_locale():
locale = babel.list_translations() + [Locale('en')]
locale = [LC('en')] + babel.list_translations()
ret = list()
current_locale = get_locale()
for loc in locale:
@ -499,7 +498,7 @@ def edit_list_user(param):
elif param == 'locale':
if user.name == "Guest":
raise Exception(_("Guest's Locale is determined automatically and can't be set"))
if vals['value'] in _BABEL_TRANSLATIONS:
if vals['value'] in BABEL_TRANSLATIONS:
user.locale = vals['value']
else:
raise Exception(_("No Valid Locale Given"))
@ -1668,12 +1667,11 @@ def edit_scheduledtasks():
time_field = list()
duration_field = list()
locale = get_locale()
for n in range(24):
time_field.append((n , format_time(time(hour=n), format="short", locale=locale)))
time_field.append((n , format_time(time(hour=n), format="short",)))
for n in range(5, 65, 5):
t = timedelta(hours=n // 60, minutes=n % 60)
duration_field.append((n, format_timedelta(t, format="short", threshold=.99, locale=locale)))
duration_field.append((n, format_timedelta(t, threshold=.9)))
return render_title_template("schedule_edit.html", config=content, starttime=time_field, duration=duration_field, title=_(u"Edit Scheduled Tasks Settings"))

@ -0,0 +1,30 @@
from babel import Locale as LC
from babel import negotiate_locale
from flask_babel import Babel
from babel.core import UnknownLocaleError
from flask import request, g
from . import logger
log = logger.create()
babel = Babel()
BABEL_TRANSLATIONS = set()
@babel.localeselector
def get_locale():
# if a user is logged in, use the locale from the user settings
user = getattr(g, 'user', None)
if user is not None and hasattr(user, "locale"):
if user.name != 'Guest': # if the account is the guest account bypass the config lang settings
return user.locale
preferred = list()
if request.accept_languages:
for x in request.accept_languages.values():
try:
preferred.append(str(LC.parse(x.replace('-', '_'))))
except (UnknownLocaleError, ValueError) as e:
log.debug('Could not parse locale "%s": %s', x, e)
return negotiate_locale(preferred or ['en'], BABEL_TRANSLATIONS)

@ -43,6 +43,7 @@ from sqlalchemy.sql.expression import and_, true, false, text, func, or_
from sqlalchemy.ext.associationproxy import association_proxy
from flask_login import current_user
from flask_babel import gettext as _
from flask_babel import get_locale
from flask import flash
from . import logger, ub, isoLanguages
@ -898,7 +899,6 @@ class CalibreDB:
# Creates for all stored languages a translated speaking name in the array for the UI
def speaking_language(self, languages=None, return_all_languages=False, with_count=False, reverse_order=False):
from . import get_locale
if with_count:
if not languages:

@ -36,11 +36,12 @@ except ImportError:
from flask import Blueprint, request, flash, redirect, url_for, abort, Markup, Response
from flask_babel import gettext as _
from flask_babel import lazy_gettext as N_
from flask_babel import get_locale
from flask_login import current_user, login_required
from sqlalchemy.exc import OperationalError, IntegrityError
# from sqlite3 import OperationalError as sqliteOperationalError
from . import constants, logger, isoLanguages, gdriveutils, uploader, helper, kobo_sync_status
from . import config, get_locale, ub, db
from . import config, ub, db
from . import calibre_db
from .services.worker import WorkerThread
from .tasks.upload import TaskUpload

@ -17,6 +17,7 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import traceback
from flask import render_template
from werkzeug.exceptions import default_exceptions
try:

@ -680,8 +680,3 @@ def get_error_text(client_secrets=None):
return 'Callback url (redirect url) is missing in client_secrets.json'
if client_secrets:
client_secrets.update(filedata['web'])
def get_versions():
return { # 'six': six_version,
'httplib2': httplib2_version}

@ -29,11 +29,11 @@ from tempfile import gettempdir
import requests
import unidecode
from babel.dates import format_datetime
from babel.units import format_unit
from flask import send_from_directory, make_response, redirect, abort, url_for
from flask_babel import gettext as _
from flask_babel import lazy_gettext as N_
from flask_babel import format_datetime, get_locale
from flask_login import current_user
from sqlalchemy.sql.expression import true, false, and_, or_, text, func
from sqlalchemy.exc import InvalidRequestError, OperationalError
@ -54,7 +54,7 @@ except ImportError:
from . import calibre_db, cli
from .tasks.convert import TaskConvert
from . import logger, config, get_locale, db, ub, fs
from . import logger, config, db, ub, fs
from . import gdriveutils as gd
from .constants import STATIC_DIR as _STATIC_DIR, CACHE_TYPE_THUMBNAILS, THUMBNAIL_TYPE_COVER, THUMBNAIL_TYPE_SERIES
from .subproc_wrapper import process_wait
@ -970,64 +970,6 @@ def json_serial(obj):
raise TypeError("Type %s not serializable" % type(obj))
# helper function for displaying the runtime of tasks
def format_runtime(runtime):
ret_val = ""
if runtime.days:
ret_val = format_unit(runtime.days, 'duration-day', length="long", locale=get_locale()) + ', '
mins, seconds = divmod(runtime.seconds, 60)
hours, minutes = divmod(mins, 60)
# ToDo: locale.number_symbols._data['timeSeparator'] -> localize time separator ?
if hours:
ret_val += '{:d}:{:02d}:{:02d}s'.format(hours, minutes, seconds)
elif minutes:
ret_val += '{:2d}:{:02d}s'.format(minutes, seconds)
else:
ret_val += '{:2d}s'.format(seconds)
return ret_val
# helper function to apply localize status information in tasklist entries
def render_task_status(tasklist):
renderedtasklist = list()
for __, user, __, task, __ in tasklist:
if user == current_user.name or current_user.role_admin():
ret = {}
if task.start_time:
ret['starttime'] = format_datetime(task.start_time, format='short', locale=get_locale())
ret['runtime'] = format_runtime(task.runtime)
# localize the task status
if isinstance(task.stat, int):
if task.stat == STAT_WAITING:
ret['status'] = _(u'Waiting')
elif task.stat == STAT_FAIL:
ret['status'] = _(u'Failed')
elif task.stat == STAT_STARTED:
ret['status'] = _(u'Started')
elif task.stat == STAT_FINISH_SUCCESS:
ret['status'] = _(u'Finished')
elif task.stat == STAT_ENDED:
ret['status'] = _(u'Ended')
elif task.stat == STAT_CANCELLED:
ret['status'] = _(u'Cancelled')
else:
ret['status'] = _(u'Unknown Status')
ret['taskMessage'] = "{}: {}".format(task.name, task.message) if task.message else task.name
ret['progress'] = "{} %".format(int(task.progress * 100))
ret['user'] = escape(user) # prevent xss
# Hidden fields
ret['task_id'] = task.id
ret['stat'] = task.stat
ret['is_cancellable'] = task.is_cancellable
renderedtasklist.append(ret)
return renderedtasklist
def tags_filters():
negtags_list = current_user.list_denied_tags()
postags_list = current_user.list_allowed_tags()

@ -49,7 +49,7 @@ except ImportError:
def get_language_names(locale):
return _LANGUAGE_NAMES.get(locale)
return _LANGUAGE_NAMES.get(str(locale))
def get_language_name(locale, lang_code):

@ -24,6 +24,7 @@ from .shelf import shelf
from .remotelogin import remotelogin
from .search_metadata import meta
from .error_handler import init_errorhandler
from .tasks_status import tasks
try:
from kobo import kobo, get_kobo_activated
@ -48,16 +49,19 @@ def main():
from .gdrive import gdrive
from .editbooks import editbook
from .about import about
from .search import search
from . import web_server
init_errorhandler()
app.register_blueprint(search)
app.register_blueprint(tasks)
app.register_blueprint(web)
app.register_blueprint(opds)
app.register_blueprint(jinjia)
app.register_blueprint(about)
app.register_blueprint(shelf)
app.register_blueprint(admi) #
app.register_blueprint(admi)
app.register_blueprint(remotelogin)
app.register_blueprint(meta)
app.register_blueprint(gdrive)

@ -19,18 +19,12 @@
from flask import session
try:
from flask_dance.consumer.backend.sqla import SQLAlchemyBackend, first, _get_real_user
from flask_dance.consumer.storage.sqla import SQLAlchemyStorage as SQLAlchemyBackend
from flask_dance.consumer.storage.sqla import first, _get_real_user
from sqlalchemy.orm.exc import NoResultFound
backend_resultcode = False # prevent storing values with this resultcode
backend_resultcode = True # prevent storing values with this resultcode
except ImportError:
# fails on flask-dance >1.3, due to renaming
try:
from flask_dance.consumer.storage.sqla import SQLAlchemyStorage as SQLAlchemyBackend
from flask_dance.consumer.storage.sqla import first, _get_real_user
from sqlalchemy.orm.exc import NoResultFound
backend_resultcode = True # prevent storing values with this resultcode
except ImportError:
pass
pass
class OAuthBackend(SQLAlchemyBackend):

@ -26,15 +26,18 @@ from functools import wraps
from flask import Blueprint, request, render_template, Response, g, make_response, abort
from flask_login import current_user
from flask_babel import get_locale
from sqlalchemy.sql.expression import func, text, or_, and_, true
from sqlalchemy.exc import InvalidRequestError, OperationalError
from werkzeug.security import check_password_hash
from . import constants, logger, config, db, calibre_db, ub, services, get_locale, isoLanguages
from . import constants, logger, config, db, calibre_db, ub, services, isoLanguages
from .helper import get_download_link, get_book_cover
from .pagination import Pagination
from .web import render_read_books
from .usermanagement import load_user_from_request
from flask_babel import gettext as _
opds = Blueprint('opds', __name__)
log = logger.create()

@ -29,7 +29,6 @@
from urllib.parse import urlparse, urljoin
from flask import request, url_for, redirect

@ -16,9 +16,8 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from flask import render_template, request
from flask import render_template, g, abort, request
from flask_babel import gettext as _
from flask import g, abort
from werkzeug.local import LocalProxy
from flask_login import current_user

@ -0,0 +1,422 @@
# This file is part of the Calibre-Web (https://github.com/janeczku/calibre-web)
# Copyright (C) 2022 OzzieIsaacs
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import json
from datetime import datetime
from flask import Blueprint, request, redirect, url_for, flash
from flask import session as flask_session
from flask_login import current_user
from flask_babel import get_locale, format_date
from flask_babel import gettext as _
from sqlalchemy.sql.expression import func, not_, and_, or_, text
from sqlalchemy.sql.functions import coalesce
from . import logger, db, calibre_db, config, ub
from .usermanagement import login_required_if_no_ano
from .render_template import render_title_template
from .pagination import Pagination
search = Blueprint('search', __name__)
log = logger.create()
@search.route("/search", methods=["GET"])
@login_required_if_no_ano
def simple_search():
term = request.args.get("query")
if term:
return redirect(url_for('web.books_list', data="search", sort_param='stored', query=term.strip()))
else:
return render_title_template('search.html',
searchterm="",
result_count=0,
title=_(u"Search"),
page="search")
@search.route("/advsearch", methods=['POST'])
@login_required_if_no_ano
def advanced_search():
values = dict(request.form)
params = ['include_tag', 'exclude_tag', 'include_serie', 'exclude_serie', 'include_shelf', 'exclude_shelf',
'include_language', 'exclude_language', 'include_extension', 'exclude_extension']
for param in params:
values[param] = list(request.form.getlist(param))
flask_session['query'] = json.dumps(values)
return redirect(url_for('web.books_list', data="advsearch", sort_param='stored', query=""))
@search.route("/advsearch", methods=['GET'])
@login_required_if_no_ano
def advanced_search_form():
# Build custom columns names
cc = calibre_db.get_cc_columns(config, filter_config_custom_read=True)
return render_prepare_search_form(cc)
def adv_search_custom_columns(cc, term, q):
for c in cc:
if c.datatype == "datetime":
custom_start = term.get('custom_column_' + str(c.id) + '_start')
custom_end = term.get('custom_column_' + str(c.id) + '_end')
if custom_start:
q = q.filter(getattr(db.Books, 'custom_column_' + str(c.id)).any(
func.datetime(db.cc_classes[c.id].value) >= func.datetime(custom_start)))
if custom_end:
q = q.filter(getattr(db.Books, 'custom_column_' + str(c.id)).any(
func.datetime(db.cc_classes[c.id].value) <= func.datetime(custom_end)))
else:
custom_query = term.get('custom_column_' + str(c.id))
if custom_query != '' and custom_query is not None:
if c.datatype == 'bool':
q = q.filter(getattr(db.Books, 'custom_column_' + str(c.id)).any(
db.cc_classes[c.id].value == (custom_query == "True")))
elif c.datatype == 'int' or c.datatype == 'float':
q = q.filter(getattr(db.Books, 'custom_column_' + str(c.id)).any(
db.cc_classes[c.id].value == custom_query))
elif c.datatype == 'rating':
q = q.filter(getattr(db.Books, 'custom_column_' + str(c.id)).any(
db.cc_classes[c.id].value == int(float(custom_query) * 2)))
else:
q = q.filter(getattr(db.Books, 'custom_column_' + str(c.id)).any(
func.lower(db.cc_classes[c.id].value).ilike("%" + custom_query + "%")))
return q
def adv_search_language(q, include_languages_inputs, exclude_languages_inputs):
if current_user.filter_language() != "all":
q = q.filter(db.Books.languages.any(db.Languages.lang_code == current_user.filter_language()))
else:
for language in include_languages_inputs:
q = q.filter(db.Books.languages.any(db.Languages.id == language))
for language in exclude_languages_inputs:
q = q.filter(not_(db.Books.series.any(db.Languages.id == language)))
return q
def adv_search_ratings(q, rating_high, rating_low):
if rating_high:
rating_high = int(rating_high) * 2
q = q.filter(db.Books.ratings.any(db.Ratings.rating <= rating_high))
if rating_low:
rating_low = int(rating_low) * 2
q = q.filter(db.Books.ratings.any(db.Ratings.rating >= rating_low))
return q
def adv_search_read_status(q, read_status):
if read_status:
if config.config_read_column:
try:
if read_status == "True":
q = q.join(db.cc_classes[config.config_read_column], isouter=True) \
.filter(db.cc_classes[config.config_read_column].value == True)
else:
q = q.join(db.cc_classes[config.config_read_column], isouter=True) \
.filter(coalesce(db.cc_classes[config.config_read_column].value, False) != True)
except (KeyError, AttributeError):
log.error(u"Custom Column No.%d is not existing in calibre database", config.config_read_column)
flash(_("Custom Column No.%(column)d is not existing in calibre database",
column=config.config_read_column),
category="error")
return q
else:
if read_status == "True":
q = q.join(ub.ReadBook, db.Books.id == ub.ReadBook.book_id, isouter=True) \
.filter(ub.ReadBook.user_id == int(current_user.id),
ub.ReadBook.read_status == ub.ReadBook.STATUS_FINISHED)
else:
q = q.join(ub.ReadBook, db.Books.id == ub.ReadBook.book_id, isouter=True) \
.filter(ub.ReadBook.user_id == int(current_user.id),
coalesce(ub.ReadBook.read_status, 0) != ub.ReadBook.STATUS_FINISHED)
return q
def adv_search_extension(q, include_extension_inputs, exclude_extension_inputs):
for extension in include_extension_inputs:
q = q.filter(db.Books.data.any(db.Data.format == extension))
for extension in exclude_extension_inputs:
q = q.filter(not_(db.Books.data.any(db.Data.format == extension)))
return q
def adv_search_tag(q, include_tag_inputs, exclude_tag_inputs):
for tag in include_tag_inputs:
q = q.filter(db.Books.tags.any(db.Tags.id == tag))
for tag in exclude_tag_inputs:
q = q.filter(not_(db.Books.tags.any(db.Tags.id == tag)))
return q
def adv_search_serie(q, include_series_inputs, exclude_series_inputs):
for serie in include_series_inputs:
q = q.filter(db.Books.series.any(db.Series.id == serie))
for serie in exclude_series_inputs:
q = q.filter(not_(db.Books.series.any(db.Series.id == serie)))
return q
def adv_search_shelf(q, include_shelf_inputs, exclude_shelf_inputs):
q = q.outerjoin(ub.BookShelf, db.Books.id == ub.BookShelf.book_id)\
.filter(or_(ub.BookShelf.shelf == None, ub.BookShelf.shelf.notin_(exclude_shelf_inputs)))
if len(include_shelf_inputs) > 0:
q = q.filter(ub.BookShelf.shelf.in_(include_shelf_inputs))
return q
def extend_search_term(searchterm,
author_name,
book_title,
publisher,
pub_start,
pub_end,
tags,
rating_high,
rating_low,
read_status,
):
searchterm.extend((author_name.replace('|', ','), book_title, publisher))
if pub_start:
try:
searchterm.extend([_(u"Published after ") +
format_date(datetime.strptime(pub_start, "%Y-%m-%d"),
format='medium', locale=get_locale())])
except ValueError:
pub_start = u""
if pub_end:
try:
searchterm.extend([_(u"Published before ") +
format_date(datetime.strptime(pub_end, "%Y-%m-%d"),
format='medium', locale=get_locale())])
except ValueError:
pub_end = u""
elements = {'tag': db.Tags, 'serie':db.Series, 'shelf':ub.Shelf}
for key, db_element in elements.items():
tag_names = calibre_db.session.query(db_element).filter(db_element.id.in_(tags['include_' + key])).all()
searchterm.extend(tag.name for tag in tag_names)
tag_names = calibre_db.session.query(db_element).filter(db_element.id.in_(tags['exclude_' + key])).all()
searchterm.extend(tag.name for tag in tag_names)
language_names = calibre_db.session.query(db.Languages). \
filter(db.Languages.id.in_(tags['include_language'])).all()
if language_names:
language_names = calibre_db.speaking_language(language_names)
searchterm.extend(language.name for language in language_names)
language_names = calibre_db.session.query(db.Languages). \
filter(db.Languages.id.in_(tags['exclude_language'])).all()
if language_names:
language_names = calibre_db.speaking_language(language_names)
searchterm.extend(language.name for language in language_names)
if rating_high:
searchterm.extend([_(u"Rating <= %(rating)s", rating=rating_high)])
if rating_low:
searchterm.extend([_(u"Rating >= %(rating)s", rating=rating_low)])
if read_status:
searchterm.extend([_(u"Read Status = %(status)s", status=read_status)])
searchterm.extend(ext for ext in tags['include_extension'])
searchterm.extend(ext for ext in tags['exclude_extension'])
# handle custom columns
searchterm = " + ".join(filter(None, searchterm))
return searchterm, pub_start, pub_end
def render_adv_search_results(term, offset=None, order=None, limit=None):
sort = order[0] if order else [db.Books.sort]
pagination = None
cc = calibre_db.get_cc_columns(config, filter_config_custom_read=True)
calibre_db.session.connection().connection.connection.create_function("lower", 1, db.lcase)
if not config.config_read_column:
query = (calibre_db.session.query(db.Books, ub.ArchivedBook.is_archived, ub.ReadBook).select_from(db.Books)
.outerjoin(ub.ReadBook, and_(db.Books.id == ub.ReadBook.book_id,
int(current_user.id) == ub.ReadBook.user_id)))
else:
try:
read_column = cc[config.config_read_column]
query = (calibre_db.session.query(db.Books, ub.ArchivedBook.is_archived, read_column.value)
.select_from(db.Books)
.outerjoin(read_column, read_column.book == db.Books.id))
except (KeyError, AttributeError):
log.error("Custom Column No.%d is not existing in calibre database", config.config_read_column)
# Skip linking read column
query = calibre_db.session.query(db.Books, ub.ArchivedBook.is_archived, None)
query = query.outerjoin(ub.ArchivedBook, and_(db.Books.id == ub.ArchivedBook.book_id,
int(current_user.id) == ub.ArchivedBook.user_id))
q = query.outerjoin(db.books_series_link, db.Books.id == db.books_series_link.c.book)\
.outerjoin(db.Series)\
.filter(calibre_db.common_filters(True))
# parse multi selects to a complete dict
tags = dict()
elements = ['tag', 'serie', 'shelf', 'language', 'extension']
for element in elements:
tags['include_' + element] = term.get('include_' + element)
tags['exclude_' + element] = term.get('exclude_' + element)
author_name = term.get("author_name")
book_title = term.get("book_title")
publisher = term.get("publisher")
pub_start = term.get("publishstart")
pub_end = term.get("publishend")
rating_low = term.get("ratinghigh")
rating_high = term.get("ratinglow")
description = term.get("comment")
read_status = term.get("read_status")
if author_name:
author_name = author_name.strip().lower().replace(',', '|')
if book_title:
book_title = book_title.strip().lower()
if publisher:
publisher = publisher.strip().lower()
search_term = []
cc_present = False
for c in cc:
if c.datatype == "datetime":
column_start = term.get('custom_column_' + str(c.id) + '_start')
column_end = term.get('custom_column_' + str(c.id) + '_end')
if column_start:
search_term.extend([u"{} >= {}".format(c.name,
format_date(datetime.strptime(column_start, "%Y-%m-%d").date(),
format='medium',
locale=get_locale())
)])
cc_present = True
if column_end:
search_term.extend([u"{} <= {}".format(c.name,
format_date(datetime.strptime(column_end, "%Y-%m-%d").date(),
format='medium',
locale=get_locale())
)])
cc_present = True
elif term.get('custom_column_' + str(c.id)):
search_term.extend([(u"{}: {}".format(c.name, term.get('custom_column_' + str(c.id))))])
cc_present = True
if any(tags.values()) or author_name or book_title or publisher or pub_start or pub_end or rating_low \
or rating_high or description or cc_present or read_status:
search_term, pub_start, pub_end = extend_search_term(search_term,
author_name,
book_title,
publisher,
pub_start,
pub_end,
tags,
rating_high,
rating_low,
read_status)
if author_name:
q = q.filter(db.Books.authors.any(func.lower(db.Authors.name).ilike("%" + author_name + "%")))
if book_title:
q = q.filter(func.lower(db.Books.title).ilike("%" + book_title + "%"))
if pub_start:
q = q.filter(func.datetime(db.Books.pubdate) > func.datetime(pub_start))
if pub_end:
q = q.filter(func.datetime(db.Books.pubdate) < func.datetime(pub_end))
q = adv_search_read_status(q, read_status)
if publisher:
q = q.filter(db.Books.publishers.any(func.lower(db.Publishers.name).ilike("%" + publisher + "%")))
q = adv_search_tag(q, tags['include_tag'], tags['exclude_tag'])
q = adv_search_serie(q, tags['include_serie'], tags['exclude_serie'])
q = adv_search_shelf(q, tags['include_shelf'], tags['exclude_shelf'])
q = adv_search_extension(q, tags['include_extension'], tags['exclude_extension'])
q = adv_search_language(q, tags['include_language'], tags['exclude_language'])
q = adv_search_ratings(q, rating_high, rating_low)
if description:
q = q.filter(db.Books.comments.any(func.lower(db.Comments.text).ilike("%" + description + "%")))
# search custom columns
try:
q = adv_search_custom_columns(cc, term, q)
except AttributeError as ex:
log.debug_or_exception(ex)
flash(_("Error on search for custom columns, please restart Calibre-Web"), category="error")
q = q.order_by(*sort).all()
flask_session['query'] = json.dumps(term)
ub.store_combo_ids(q)
result_count = len(q)
if offset is not None and limit is not None:
offset = int(offset)
limit_all = offset + int(limit)
pagination = Pagination((offset / (int(limit)) + 1), limit, result_count)
else:
offset = 0
limit_all = result_count
entries = calibre_db.order_authors(q[offset:limit_all], list_return=True, combined=True)
return render_title_template('search.html',
adv_searchterm=search_term,
pagination=pagination,
entries=entries,
result_count=result_count,
title=_(u"Advanced Search"), page="advsearch",
order=order[1])
def render_prepare_search_form(cc):
# prepare data for search-form
tags = calibre_db.session.query(db.Tags)\
.join(db.books_tags_link)\
.join(db.Books)\
.filter(calibre_db.common_filters()) \
.group_by(text('books_tags_link.tag'))\
.order_by(db.Tags.name).all()
series = calibre_db.session.query(db.Series)\
.join(db.books_series_link)\
.join(db.Books)\
.filter(calibre_db.common_filters()) \
.group_by(text('books_series_link.series'))\
.order_by(db.Series.name)\
.filter(calibre_db.common_filters()).all()
shelves = ub.session.query(ub.Shelf)\
.filter(or_(ub.Shelf.is_public == 1, ub.Shelf.user_id == int(current_user.id)))\
.order_by(ub.Shelf.name).all()
extensions = calibre_db.session.query(db.Data)\
.join(db.Books)\
.filter(calibre_db.common_filters()) \
.group_by(db.Data.format)\
.order_by(db.Data.format).all()
if current_user.filter_language() == u"all":
languages = calibre_db.speaking_language()
else:
languages = None
return render_title_template('search_form.html', tags=tags, languages=languages, extensions=extensions,
series=series,shelves=shelves, title=_(u"Advanced Search"), cc=cc, page="advsearch")
def render_search_results(term, offset=None, order=None, limit=None):
join = db.books_series_link, db.Books.id == db.books_series_link.c.book, db.Series
entries, result_count, pagination = calibre_db.get_search_results(term,
config,
offset,
order,
limit,
False,
*join)
return render_title_template('search.html',
searchterm=term,
pagination=pagination,
query=term,
adv_searchterm=term,
entries=entries,
result_count=result_count,
title=_(u"Search"),
page="search",
order=order[1])

@ -22,17 +22,17 @@ import inspect
import json
import os
import sys
# from time import time
from dataclasses import asdict
from flask import Blueprint, Response, request, url_for
from flask_login import current_user
from flask_login import login_required
from flask_babel import get_locale
from sqlalchemy.exc import InvalidRequestError, OperationalError
from sqlalchemy.orm.attributes import flag_modified
from cps.services.Metadata import Metadata
from . import constants, get_locale, logger, ub, web_server
from . import constants, logger, ub, web_server
# current_milli_time = lambda: int(round(time() * 1000))

@ -55,7 +55,8 @@ class TaskConvert(CalibreTask):
def run(self, worker_thread):
self.worker_thread = worker_thread
if config.config_use_google_drive:
worker_db = db.CalibreDB(expire_on_commit=False)
worker_db = db.CalibreDB()
worker_db.init_db(expire_on_commit=False)
cur_book = worker_db.get_book(self.book_id)
self.title = cur_book.title
data = worker_db.get_book_format(self.book_id, self.settings['old_book_format'])
@ -104,7 +105,8 @@ class TaskConvert(CalibreTask):
def _convert_ebook_format(self):
error_message = None
local_db = db.CalibreDB(expire_on_commit=False)
local_db = db.CalibreDB()
local_db.init_db(expire_on_commit=False)
file_path = self.file_path
book_id = self.book_id
format_old_ext = u'.' + self.settings['old_book_format'].lower()

@ -68,7 +68,8 @@ class TaskGenerateCoverThumbnails(CalibreTask):
self.log = logger.create()
self.book_id = book_id
self.app_db_session = ub.get_new_session_instance()
self.calibre_db = db.CalibreDB(expire_on_commit=False)
self.calibre_db = db.CalibreDB()
self.calibre_db.init_db(expire_on_commit=False)
self.cache = fs.FileSystem()
self.resolutions = [
constants.COVER_THUMBNAIL_SMALL,
@ -238,7 +239,8 @@ class TaskGenerateSeriesThumbnails(CalibreTask):
super(TaskGenerateSeriesThumbnails, self).__init__(task_message)
self.log = logger.create()
self.app_db_session = ub.get_new_session_instance()
self.calibre_db = db.CalibreDB(expire_on_commit=False)
self.calibre_db = db.CalibreDB()
self.calibre_db.init_db(expire_on_commit=False)
self.cache = fs.FileSystem()
self.resolutions = [
constants.COVER_THUMBNAIL_SMALL,
@ -448,7 +450,8 @@ class TaskClearCoverThumbnailCache(CalibreTask):
super(TaskClearCoverThumbnailCache, self).__init__(task_message)
self.log = logger.create()
self.book_id = book_id
self.calibre_db = db.CalibreDB(expire_on_commit=False)
self.calibre_db = db.CalibreDB()
self.calibre_db.init_db(expire_on_commit=False)
self.app_db_session = ub.get_new_session_instance()
self.cache = fs.FileSystem()

@ -0,0 +1,95 @@
# This file is part of the Calibre-Web (https://github.com/janeczku/calibre-web)
# Copyright (C) 2022 OzzieIsaacs
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from markupsafe import escape
from flask import Blueprint, jsonify
from flask_login import login_required, current_user
from flask_babel import gettext as _
from flask_babel import get_locale, format_datetime
from babel.units import format_unit
from . import logger
from .render_template import render_title_template
from .services.worker import WorkerThread, STAT_WAITING, STAT_FAIL, STAT_STARTED, STAT_FINISH_SUCCESS
tasks = Blueprint('tasks', __name__)
log = logger.create()
@tasks.route("/ajax/emailstat")
@login_required
def get_email_status_json():
tasks = WorkerThread.getInstance().tasks
return jsonify(render_task_status(tasks))
@tasks.route("/tasks")
@login_required
def get_tasks_status():
# if current user admin, show all email, otherwise only own emails
tasks = WorkerThread.getInstance().tasks
answer = render_task_status(tasks)
return render_title_template('tasks.html', entries=answer, title=_(u"Tasks"), page="tasks")
# helper function to apply localize status information in tasklist entries
def render_task_status(tasklist):
rendered_tasklist = list()
for __, user, __, task in tasklist:
if user == current_user.name or current_user.role_admin():
ret = {}
if task.start_time:
ret['starttime'] = format_datetime(task.start_time, format='short', locale=get_locale())
ret['runtime'] = format_runtime(task.runtime)
# localize the task status
if isinstance(task.stat, int):
if task.stat == STAT_WAITING:
ret['status'] = _(u'Waiting')
elif task.stat == STAT_FAIL:
ret['status'] = _(u'Failed')
elif task.stat == STAT_STARTED:
ret['status'] = _(u'Started')
elif task.stat == STAT_FINISH_SUCCESS:
ret['status'] = _(u'Finished')
else:
ret['status'] = _(u'Unknown Status')
ret['taskMessage'] = "{}: {}".format(_(task.name), task.message)
ret['progress'] = "{} %".format(int(task.progress * 100))
ret['user'] = escape(user) # prevent xss
rendered_tasklist.append(ret)
return rendered_tasklist
# helper function for displaying the runtime of tasks
def format_runtime(runtime):
ret_val = ""
if runtime.days:
ret_val = format_unit(runtime.days, 'duration-day', length="long", locale=get_locale()) + ', '
minutes, seconds = divmod(runtime.seconds, 60)
hours, minutes = divmod(minutes, 60)
# ToDo: locale.number_symbols._data['timeSeparator'] -> localize time separator ?
if hours:
ret_val += '{:d}:{:02d}:{:02d}s'.format(hours, minutes, seconds)
elif minutes:
ret_val += '{:2d}:{:02d}s'.format(minutes, seconds)
else:
ret_val += '{:2d}s'.format(seconds)
return ret_val

@ -41,7 +41,7 @@
<a class="navbar-brand" href="{{url_for('web.index')}}">{{instance}}</a>
</div>
{% if g.user.is_authenticated or g.allow_anonymous %}
<form class="navbar-form navbar-left" role="search" action="{{url_for('web.search')}}" method="GET">
<form class="navbar-form navbar-left" role="search" action="{{url_for('search.simple_search')}}" method="GET">
<div class="form-group input-group input-group-sm">
<label for="query" class="sr-only">{{_('Search')}}</label>
<input type="text" class="form-control" id="query" name="query" placeholder="{{_('Search Library')}}" value="{{searchterm}}">
@ -54,7 +54,7 @@
<div class="navbar-collapse collapse">
{% if g.user.is_authenticated or g.allow_anonymous %}
<ul class="nav navbar-nav ">
<li><a href="{{url_for('web.advanced_search')}}" id="advanced_search"><span class="glyphicon glyphicon-search"></span><span class="hidden-sm"> {{_('Advanced Search')}}</span></a></li>
<li><a href="{{url_for('search.advanced_search')}}" id="advanced_search"><span class="glyphicon glyphicon-search"></span><span class="hidden-sm"> {{_('Advanced Search')}}</span></a></li>
</ul>
{% endif %}
<ul class="nav navbar-nav navbar-right" id="main-nav">
@ -71,7 +71,7 @@
</li>
{% endif %}
{% if not g.user.is_anonymous and not simple%}
<li><a id="top_tasks" href="{{url_for('web.get_tasks_status')}}"><span class="glyphicon glyphicon-tasks"></span> <span class="hidden-sm">{{_('Tasks')}}</span></a></li>
<li><a id="top_tasks" href="{{url_for('tasks.get_tasks_status')}}"><span class="glyphicon glyphicon-tasks"></span> <span class="hidden-sm">{{_('Tasks')}}</span></a></li>
{% endif %}
{% if g.user.role_admin() %}
<li><a id="top_admin" data-text="{{_('Settings')}}" href="{{url_for('admin.admin')}}"><span class="glyphicon glyphicon-dashboard"></span> <span class="hidden-sm">{{_('Admin')}}</span></a></li>

@ -2,7 +2,7 @@
{% block body %}
<h1 class="{{page}}">{{title}}</h1>
<div class="col-md-10 col-lg-6">
<form role="form" id="search" action="{{ url_for('web.advanced_search_form') }}" method="POST">
<form role="form" id="search" action="{{ url_for('search.advanced_search_form') }}" method="POST">
<input type="hidden" name="csrf_token" value="{{ csrf_token() }}">
<div class="form-group">
<label for="book_title">{{_('Book Title')}}</label>

@ -5,7 +5,7 @@
{% block body %}
<div class="discover">
<h2>{{_('Tasks')}}</h2>
<table class="table table-no-bordered" id="tasktable" data-url="{{ url_for('web.get_email_status_json') }}" data-sort-name="starttime" data-sort-order="asc" data-locale="{{ g.user.locale }}">
<table class="table table-no-bordered" id="tasktable" data-url="{{ url_for('task.get_email_status_json') }}" data-sort-name="starttime" data-sort-order="asc" data-locale="{{ g.user.locale }}">
<thead>
<tr>
{% if g.user.role_admin() %}

@ -857,7 +857,7 @@ def init_db(app_db_path, user_credentials=None):
def get_new_session_instance():
new_engine = create_engine(u'sqlite:///{0}'.format(cli.settings_path), echo=False)
new_engine = create_engine(u'sqlite:///{0}'.format(app_DB_path), echo=False)
new_session = scoped_session(sessionmaker())
new_session.configure(bind=new_engine)

@ -28,7 +28,7 @@ from io import BytesIO
from tempfile import gettempdir
import requests
from babel.dates import format_datetime
from flask_babel import format_datetime
from flask_babel import gettext as _
from . import constants, logger # config, web_server

@ -27,12 +27,6 @@ from .helper import split_authors
log = logger.create()
try:
from lxml.etree import LXML_VERSION as lxmlversion
except ImportError:
lxmlversion = None
try:
from wand.image import Image, Color
from wand import version as ImageVersion
@ -101,7 +95,7 @@ def default_meta(tmp_file_path, original_file_name, original_file_extension):
extension=original_file_extension,
title=original_file_name,
author=_(u'Unknown'),
cover=None, #pdf_preview(tmp_file_path, original_file_name),
cover=None,
description="",
tags="",
series="",
@ -237,29 +231,12 @@ def pdf_preview(tmp_file_path, tmp_dir):
return None
def get_versions(all=True):
def get_versions():
ret = dict()
if not use_generic_pdf_cover:
ret['Image Magick'] = ImageVersion.MAGICK_VERSION
else:
ret['Image Magick'] = u'not installed'
if all:
if not use_generic_pdf_cover:
ret['Wand'] = ImageVersion.VERSION
else:
ret['Wand'] = u'not installed'
if use_pdf_meta:
ret['PyPdf'] = PyPdfVersion
else:
ret['PyPdf'] = u'not installed'
if lxmlversion:
ret['lxml'] = '.'.join(map(str, lxmlversion))
else:
ret['lxml'] = u'not installed'
if comic.use_comic_meta:
ret['Comic_API'] = comic.comic_version or u'installed'
else:
ret['Comic_API'] = u'not installed'
return ret

@ -1,5 +1,3 @@
# -*- coding: utf-8 -*-
# This file is part of the Calibre-Web (https://github.com/janeczku/calibre-web)
# Copyright (C) 2018-2019 OzzieIsaacs, cervinko, jkrehm, bodybybuddha, ok11,
# andy29485, idalin, Kyosfonica, wuqi, Kennyl, lemmsh,
@ -21,35 +19,32 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import os
from datetime import datetime
import json
import mimetypes
import chardet # dependency of requests
import copy
from functools import wraps
from babel.dates import format_date
from babel import Locale
from flask import Blueprint, jsonify
from flask import request, redirect, send_from_directory, make_response, flash, abort, url_for
from flask import session as flask_session
from flask_babel import gettext as _
from flask_babel import get_locale
from flask_login import login_user, logout_user, login_required, current_user
from sqlalchemy.exc import IntegrityError, InvalidRequestError, OperationalError
from sqlalchemy.sql.expression import text, func, false, not_, and_, or_
from sqlalchemy.sql.expression import text, func, false, not_, and_
from sqlalchemy.orm.attributes import flag_modified
from sqlalchemy.sql.functions import coalesce
from .services.worker import WorkerThread
from werkzeug.datastructures import Headers
from werkzeug.security import generate_password_hash, check_password_hash
from . import constants, logger, isoLanguages, services
from . import babel, db, ub, config, get_locale, app
from . import babel, db, ub, config, app
from . import calibre_db, kobo_sync_status
from .search import render_search_results, render_adv_search_results
from .gdriveutils import getFileFromEbooksFolder, do_gdrive_download
from .helper import check_valid_domain, render_task_status, check_email, check_username, \
from .helper import check_valid_domain, check_email, check_username, \
get_book_cover, get_series_cover_thumbnail, get_download_link, send_mail, generate_random_password, \
send_registration_mail, check_send_to_kindle, check_read_formats, tags_filters, reset_password, valid_email, \
edit_book_read_status
@ -75,10 +70,12 @@ except ImportError:
oauth_check = {}
register_user_with_oauth = logout_oauth_user = get_oauth_status = None
try:
from natsort import natsorted as sort
except ImportError:
sort = sorted # Just use regular sort then, may cause issues with badly named pages in cbz/cbr files
from functools import wraps
#try:
# from natsort import natsorted as sort
#except ImportError:
# sort = sorted # Just use regular sort then, may cause issues with badly named pages in cbz/cbr files
@app.after_request
@ -102,6 +99,7 @@ def add_security_headers(resp):
web = Blueprint('web', __name__)
log = logger.create()
@ -770,57 +768,6 @@ def render_archived_books(page, sort_param):
title=name, page=page_name, order=sort_param[1])
def render_prepare_search_form(cc):
# prepare data for search-form
tags = calibre_db.session.query(db.Tags) \
.join(db.books_tags_link) \
.join(db.Books) \
.filter(calibre_db.common_filters()) \
.group_by(text('books_tags_link.tag')) \
.order_by(db.Tags.name).all()
series = calibre_db.session.query(db.Series) \
.join(db.books_series_link) \
.join(db.Books) \
.filter(calibre_db.common_filters()) \
.group_by(text('books_series_link.series')) \
.order_by(db.Series.name) \
.filter(calibre_db.common_filters()).all()
shelves = ub.session.query(ub.Shelf) \
.filter(or_(ub.Shelf.is_public == 1, ub.Shelf.user_id == int(current_user.id))) \
.order_by(ub.Shelf.name).all()
extensions = calibre_db.session.query(db.Data) \
.join(db.Books) \
.filter(calibre_db.common_filters()) \
.group_by(db.Data.format) \
.order_by(db.Data.format).all()
if current_user.filter_language() == u"all":
languages = calibre_db.speaking_language()
else:
languages = None
return render_title_template('search_form.html', tags=tags, languages=languages, extensions=extensions,
series=series, shelves=shelves, title=_(u"Advanced Search"), cc=cc, page="advsearch")
def render_search_results(term, offset=None, order=None, limit=None):
join = db.books_series_link, db.books_series_link.c.book == db.Books.id, db.Series
entries, result_count, pagination = calibre_db.get_search_results(term,
config,
offset,
order,
limit,
*join)
return render_title_template('search.html',
searchterm=term,
pagination=pagination,
query=term,
adv_searchterm=term,
entries=entries,
result_count=result_count,
title=_(u"Search"),
page="search",
order=order[1])
# ################################### View Books list ##################################################################
@ -1153,321 +1100,6 @@ def category_list():
abort(404)
# ################################### Task functions ################################################################
@web.route("/tasks")
@login_required
def get_tasks_status():
# if current user admin, show all email, otherwise only own emails
tasks = WorkerThread.get_instance().tasks
answer = render_task_status(tasks)
return render_title_template('tasks.html', entries=answer, title=_(u"Tasks"), page="tasks")
# ################################### Search functions ################################################################
@web.route("/search", methods=["GET"])
@login_required_if_no_ano
def search():
term = request.args.get("query")
if term:
return redirect(url_for('web.books_list', data="search", sort_param='stored', query=term.strip()))
else:
return render_title_template('search.html',
searchterm="",
result_count=0,
title=_(u"Search"),
page="search")
@web.route("/advsearch", methods=['POST'])
@login_required_if_no_ano
def advanced_search():
values = dict(request.form)
params = ['include_tag', 'exclude_tag', 'include_serie', 'exclude_serie', 'include_shelf', 'exclude_shelf',
'include_language', 'exclude_language', 'include_extension', 'exclude_extension']
for param in params:
values[param] = list(request.form.getlist(param))
flask_session['query'] = json.dumps(values)
return redirect(url_for('web.books_list', data="advsearch", sort_param='stored', query=""))
def adv_search_custom_columns(cc, term, q):
for c in cc:
if c.datatype == "datetime":
custom_start = term.get('custom_column_' + str(c.id) + '_start')
custom_end = term.get('custom_column_' + str(c.id) + '_end')
if custom_start:
q = q.filter(getattr(db.Books, 'custom_column_' + str(c.id)).any(
func.datetime(db.cc_classes[c.id].value) >= func.datetime(custom_start)))
if custom_end:
q = q.filter(getattr(db.Books, 'custom_column_' + str(c.id)).any(
func.datetime(db.cc_classes[c.id].value) <= func.datetime(custom_end)))
else:
custom_query = term.get('custom_column_' + str(c.id))
if custom_query != '' and custom_query is not None:
if c.datatype == 'bool':
q = q.filter(getattr(db.Books, 'custom_column_' + str(c.id)).any(
db.cc_classes[c.id].value == (custom_query == "True")))
elif c.datatype == 'int' or c.datatype == 'float':
q = q.filter(getattr(db.Books, 'custom_column_' + str(c.id)).any(
db.cc_classes[c.id].value == custom_query))
elif c.datatype == 'rating':
q = q.filter(getattr(db.Books, 'custom_column_' + str(c.id)).any(
db.cc_classes[c.id].value == int(float(custom_query) * 2)))
else:
q = q.filter(getattr(db.Books, 'custom_column_' + str(c.id)).any(
func.lower(db.cc_classes[c.id].value).ilike("%" + custom_query + "%")))
return q
def adv_search_read_status(q, read_status):
if read_status:
if config.config_read_column:
try:
if read_status == "True":
q = q.join(db.cc_classes[config.config_read_column], isouter=True) \
.filter(db.cc_classes[config.config_read_column].value == True)
else:
q = q.join(db.cc_classes[config.config_read_column], isouter=True) \
.filter(coalesce(db.cc_classes[config.config_read_column].value, False) != True)
except (KeyError, AttributeError, IndexError):
log.error(
"Custom Column No.{} is not existing in calibre database".format(config.config_read_column))
flash(_("Custom Column No.%(column)d is not existing in calibre database",
column=config.config_read_column),
category="error")
return q
else:
if read_status == "True":
q = q.join(ub.ReadBook, db.Books.id == ub.ReadBook.book_id, isouter=True) \
.filter(ub.ReadBook.user_id == int(current_user.id),
ub.ReadBook.read_status == ub.ReadBook.STATUS_FINISHED)
else:
q = q.join(ub.ReadBook, db.Books.id == ub.ReadBook.book_id, isouter=True) \
.filter(ub.ReadBook.user_id == int(current_user.id),
coalesce(ub.ReadBook.read_status, 0) != ub.ReadBook.STATUS_FINISHED)
return q
def adv_search_language(q, include_languages_inputs, exclude_languages_inputs):
if current_user.filter_language() != "all":
q = q.filter(db.Books.languages.any(db.Languages.lang_code == current_user.filter_language()))
else:
return adv_search_text(q, include_languages_inputs, exclude_languages_inputs, db.Languages.id)
return q
def adv_search_ratings(q, rating_high, rating_low):
if rating_high:
rating_high = int(rating_high) * 2
q = q.filter(db.Books.ratings.any(db.Ratings.rating <= rating_high))
if rating_low:
rating_low = int(rating_low) * 2
q = q.filter(db.Books.ratings.any(db.Ratings.rating >= rating_low))
return q
def adv_search_text(q, include_inputs, exclude_inputs, data_table):
for inp in include_inputs:
q = q.filter(getattr(db.Books, data_table.class_.__tablename__).any(data_table == inp))
for excl in exclude_inputs:
q = q.filter(not_(getattr(db.Books, data_table.class_.__tablename__).any(data_table == excl)))
return q
def adv_search_shelf(q, include_shelf_inputs, exclude_shelf_inputs):
q = q.outerjoin(ub.BookShelf, db.Books.id == ub.BookShelf.book_id) \
.filter(or_(ub.BookShelf.shelf == None, ub.BookShelf.shelf.notin_(exclude_shelf_inputs)))
if len(include_shelf_inputs) > 0:
q = q.filter(ub.BookShelf.shelf.in_(include_shelf_inputs))
return q
def extend_search_term(searchterm,
author_name,
book_title,
publisher,
pub_start,
pub_end,
tags,
rating_high,
rating_low,
read_status,
):
searchterm.extend((author_name.replace('|', ','), book_title, publisher))
if pub_start:
try:
searchterm.extend([_(u"Published after ") +
format_date(datetime.strptime(pub_start, "%Y-%m-%d"),
format='medium', locale=get_locale())])
except ValueError:
pub_start = u""
if pub_end:
try:
searchterm.extend([_(u"Published before ") +
format_date(datetime.strptime(pub_end, "%Y-%m-%d"),
format='medium', locale=get_locale())])
except ValueError:
pub_end = u""
elements = {'tag': db.Tags, 'serie': db.Series, 'shelf': ub.Shelf}
for key, db_element in elements.items():
tag_names = calibre_db.session.query(db_element).filter(db_element.id.in_(tags['include_' + key])).all()
searchterm.extend(tag.name for tag in tag_names)
tag_names = calibre_db.session.query(db_element).filter(db_element.id.in_(tags['exclude_' + key])).all()
searchterm.extend(tag.name for tag in tag_names)
language_names = calibre_db.session.query(db.Languages). \
filter(db.Languages.id.in_(tags['include_language'])).all()
if language_names:
language_names = calibre_db.speaking_language(language_names)
searchterm.extend(language.name for language in language_names)
language_names = calibre_db.session.query(db.Languages). \
filter(db.Languages.id.in_(tags['exclude_language'])).all()
if language_names:
language_names = calibre_db.speaking_language(language_names)
searchterm.extend(language.name for language in language_names)
if rating_high:
searchterm.extend([_(u"Rating <= %(rating)s", rating=rating_high)])
if rating_low:
searchterm.extend([_(u"Rating >= %(rating)s", rating=rating_low)])
if read_status:
searchterm.extend([_(u"Read Status = %(status)s", status=read_status)])
searchterm.extend(ext for ext in tags['include_extension'])
searchterm.extend(ext for ext in tags['exclude_extension'])
# handle custom columns
searchterm = " + ".join(filter(None, searchterm))
return searchterm, pub_start, pub_end
def render_adv_search_results(term, offset=None, order=None, limit=None):
sort_param = order[0] if order else [db.Books.sort]
pagination = None
cc = calibre_db.get_cc_columns(config, filter_config_custom_read=True)
calibre_db.session.connection().connection.connection.create_function("lower", 1, db.lcase)
query = calibre_db.generate_linked_query(config.config_read_column, db.Books)
q = query.outerjoin(db.books_series_link, db.books_series_link.c.book == db.Books.id) \
.outerjoin(db.Series) \
.filter(calibre_db.common_filters(True))
# parse multiselects to a complete dict
tags = dict()
elements = ['tag', 'serie', 'shelf', 'language', 'extension']
for element in elements:
tags['include_' + element] = term.get('include_' + element)
tags['exclude_' + element] = term.get('exclude_' + element)
author_name = term.get("author_name")
book_title = term.get("book_title")
publisher = term.get("publisher")
pub_start = term.get("publishstart")
pub_end = term.get("publishend")
rating_low = term.get("ratinghigh")
rating_high = term.get("ratinglow")
description = term.get("comment")
read_status = term.get("read_status")
if author_name:
author_name = author_name.strip().lower().replace(',', '|')
if book_title:
book_title = book_title.strip().lower()
if publisher:
publisher = publisher.strip().lower()
search_term = []
cc_present = False
for c in cc:
if c.datatype == "datetime":
column_start = term.get('custom_column_' + str(c.id) + '_start')
column_end = term.get('custom_column_' + str(c.id) + '_end')
if column_start:
search_term.extend([u"{} >= {}".format(c.name,
format_date(datetime.strptime(column_start, "%Y-%m-%d").date(),
format='medium',
locale=get_locale())
)])
cc_present = True
if column_end:
search_term.extend([u"{} <= {}".format(c.name,
format_date(datetime.strptime(column_end, "%Y-%m-%d").date(),
format='medium',
locale=get_locale())
)])
cc_present = True
elif term.get('custom_column_' + str(c.id)):
search_term.extend([(u"{}: {}".format(c.name, term.get('custom_column_' + str(c.id))))])
cc_present = True
if any(tags.values()) or author_name or book_title or \
publisher or pub_start or pub_end or rating_low or rating_high \
or description or cc_present or read_status:
search_term, pub_start, pub_end = extend_search_term(search_term,
author_name,
book_title,
publisher,
pub_start,
pub_end,
tags,
rating_high,
rating_low,
read_status)
# q = q.filter()
if author_name:
q = q.filter(db.Books.authors.any(func.lower(db.Authors.name).ilike("%" + author_name + "%")))
if book_title:
q = q.filter(func.lower(db.Books.title).ilike("%" + book_title + "%"))
if pub_start:
q = q.filter(func.datetime(db.Books.pubdate) > func.datetime(pub_start))
if pub_end:
q = q.filter(func.datetime(db.Books.pubdate) < func.datetime(pub_end))
q = adv_search_read_status(q, read_status)
if publisher:
q = q.filter(db.Books.publishers.any(func.lower(db.Publishers.name).ilike("%" + publisher + "%")))
q = adv_search_text(q, tags['include_tag'], tags['exclude_tag'], db.Tags.id)
q = adv_search_text(q, tags['include_serie'], tags['exclude_serie'], db.Series.id)
q = adv_search_text(q, tags['include_extension'], tags['exclude_extension'], db.Data.format)
q = adv_search_shelf(q, tags['include_shelf'], tags['exclude_shelf'])
q = adv_search_language(q, tags['include_language'], tags['exclude_language'])
q = adv_search_ratings(q, rating_high, rating_low, )
if description:
q = q.filter(db.Books.comments.any(func.lower(db.Comments.text).ilike("%" + description + "%")))
# search custom columns
try:
q = adv_search_custom_columns(cc, term, q)
except AttributeError as ex:
log.error_or_exception(ex)
flash(_("Error on search for custom columns, please restart Calibre-Web"), category="error")
q = q.order_by(*sort_param).all()
flask_session['query'] = json.dumps(term)
ub.store_combo_ids(q)
result_count = len(q)
if offset is not None and limit is not None:
offset = int(offset)
limit_all = offset + int(limit)
pagination = Pagination((offset / (int(limit)) + 1), limit, result_count)
else:
offset = 0
limit_all = result_count
entries = calibre_db.order_authors(q[offset:limit_all], list_return=True, combined=True)
return render_title_template('search.html',
adv_searchterm=search_term,
pagination=pagination,
entries=entries,
result_count=result_count,
title=_(u"Advanced Search"),
page="advsearch",
order=order[1])
@web.route("/advsearch", methods=['GET'])
@login_required_if_no_ano
def advanced_search_form():
# Build custom columns names
cc = calibre_db.get_cc_columns(config, filter_config_custom_read=True)
return render_prepare_search_form(cc)
# ################################### Download/Send ##################################################################
@ -1892,10 +1524,10 @@ def show_book(book_id):
entry.kindle_list = check_send_to_kindle(entry)
entry.reader_list = check_read_formats(entry)
entry.audioentries = []
entry.audio_entries = []
for media_format in entry.data:
if media_format.format.lower() in constants.EXTENSIONS_AUDIO:
entry.audioentries.append(media_format.format.lower())
entry.audio_entries.append(media_format.format.lower())
return render_title_template('detail.html',
entry=entry,

Loading…
Cancel
Save