refactored login routines

pull/2189/head^2
Ozzie Isaacs 1 year ago
parent cf9a7d538f
commit 1c3b69c710

@ -102,10 +102,12 @@ def admin_required(f):
@admi.before_app_request @admi.before_app_request
def before_request(): def before_request():
# make remember me function work # make remember me function work
if current_user.is_authenticated: #if current_user.is_authenticated:
confirm_login() # print("before request confirm request {}".format(request.path))
if not ub.check_user_session(current_user.id, flask_session.get('_id')) and 'opds' not in request.path: # confirm_login()
logout_user() #if not ub.check_user_session(current_user.id, flask_session.get('_id')) and 'opds' not in request.path:
# log.info("before logout {}".format(request.path))
# logout_user()
g.constants = constants g.constants = constants
g.user = current_user g.user = current_user
g.google_site_verification = os.getenv('GOOGLE_SITE_VERIFICATION','') g.google_site_verification = os.getenv('GOOGLE_SITE_VERIFICATION','')
@ -114,8 +116,6 @@ def before_request():
g.allow_upload = config.config_uploading g.allow_upload = config.config_uploading
g.current_theme = config.config_theme g.current_theme = config.config_theme
g.config_authors_max = config.config_authors_max g.config_authors_max = config.config_authors_max
g.shelves_access = ub.session.query(ub.Shelf).filter(
or_(ub.Shelf.is_public == 1, ub.Shelf.user_id == current_user.id)).order_by(ub.Shelf.name).all()
if '/static/' not in request.path and not config.db_configured and \ if '/static/' not in request.path and not config.db_configured and \
request.endpoint not in ('admin.ajax_db_config', request.endpoint not in ('admin.ajax_db_config',
'admin.simulatedbchange', 'admin.simulatedbchange',

@ -22,16 +22,16 @@
import datetime import datetime
from urllib.parse import unquote_plus from urllib.parse import unquote_plus
from functools import wraps
from flask import Blueprint, request, render_template, Response, g, make_response, abort
from flask import Blueprint, request, render_template, g, make_response, abort
from flask_login import current_user from flask_login import current_user
from flask_babel import get_locale from flask_babel import get_locale
from sqlalchemy.sql.expression import func, text, or_, and_, true from sqlalchemy.sql.expression import func, text, or_, and_, true
from sqlalchemy.exc import InvalidRequestError, OperationalError from sqlalchemy.exc import InvalidRequestError, OperationalError
from werkzeug.security import check_password_hash
from . import constants, logger, config, db, calibre_db, ub, services, isoLanguages from . import logger, config, db, calibre_db, ub, isoLanguages
from .usermanagement import requires_basic_auth_if_no_ano
from .helper import get_download_link, get_book_cover from .helper import get_download_link, get_book_cover
from .pagination import Pagination from .pagination import Pagination
from .web import render_read_books from .web import render_read_books
@ -43,19 +43,6 @@ opds = Blueprint('opds', __name__)
log = logger.create() log = logger.create()
def requires_basic_auth_if_no_ano(f):
@wraps(f)
def decorated(*args, **kwargs):
auth = request.authorization
if config.config_anonbrowse != 1:
if not auth or auth.type != 'basic' or not check_auth(auth.username, auth.password):
return authenticate()
return f(*args, **kwargs)
if config.config_login_type == constants.LOGIN_LDAP and services.ldap and config.config_anonbrowse != 1:
return services.ldap.basic_auth_required(f)
return decorated
@opds.route("/opds/") @opds.route("/opds/")
@opds.route("/opds") @opds.route("/opds")
@requires_basic_auth_if_no_ano @requires_basic_auth_if_no_ano
@ -478,27 +465,6 @@ def feed_search(term):
return render_xml_template('feed.xml', searchterm="") return render_xml_template('feed.xml', searchterm="")
def check_auth(username, password):
try:
username = username.encode('windows-1252')
except UnicodeEncodeError:
username = username.encode('utf-8')
user = ub.session.query(ub.User).filter(func.lower(ub.User.name) ==
username.decode('utf-8').lower()).first()
if bool(user and check_password_hash(str(user.password), password)):
return True
else:
ip_address = request.headers.get('X-Forwarded-For', request.remote_addr)
log.warning('OPDS Login failed for user "%s" IP-address: %s', username.decode('utf-8'), ip_address)
return False
def authenticate():
return Response(
'Could not verify your access level for that URL.\n'
'You have to login with proper credentials', 401,
{'WWW-Authenticate': 'Basic realm="Login Required"'})
def render_xml_template(*args, **kwargs): def render_xml_template(*args, **kwargs):
# ToDo: return time in current timezone similar to %z # ToDo: return time in current timezone similar to %z

@ -140,6 +140,7 @@ table .bg-dark-danger a { color: #fff; }
.container-fluid .book { .container-fluid .book {
margin-top: 20px; margin-top: 20px;
max-width: 180px;
display: flex; display: flex;
flex-direction: column; flex-direction: column;
} }

@ -364,12 +364,6 @@ $(function() {
layoutMode : "fitRows" layoutMode : "fitRows"
}); });
$(".grid").isotope({
// options
itemSelector : ".grid-item",
layoutMode : "fitColumns"
});
if ($(".load-more").length && $(".next").length) { if ($(".load-more").length && $(".next").length) {
var $loadMore = $(".load-more .row").infiniteScroll({ var $loadMore = $(".load-more .row").infiniteScroll({
debug: false, debug: false,

@ -23,9 +23,11 @@ from functools import wraps
from sqlalchemy.sql.expression import func from sqlalchemy.sql.expression import func
from werkzeug.security import check_password_hash from werkzeug.security import check_password_hash
from flask_login import login_required, login_user from flask_login import login_required, login_user
from flask import request, Response
from . import lm, ub, config, constants, services from . import lm, ub, config, constants, services, logger
log = logger.create()
def login_required_if_no_ano(func): def login_required_if_no_ano(func):
@wraps(func) @wraps(func)
@ -36,6 +38,47 @@ def login_required_if_no_ano(func):
return decorated_view return decorated_view
def requires_basic_auth_if_no_ano(f):
@wraps(f)
def decorated(*args, **kwargs):
auth = request.authorization
if config.config_anonbrowse != 1:
if not auth or auth.type != 'basic' or not check_auth(auth.username, auth.password):
return authenticate()
print("opds_requires_basic_auth")
user = load_user_from_auth_header(auth.username, auth.password)
if not user:
return None
login_user(user)
return f(*args, **kwargs)
if config.config_login_type == constants.LOGIN_LDAP and services.ldap and config.config_anonbrowse != 1:
return services.ldap.basic_auth_required(f)
return decorated
def check_auth(username, password):
try:
username = username.encode('windows-1252')
except UnicodeEncodeError:
username = username.encode('utf-8')
user = ub.session.query(ub.User).filter(func.lower(ub.User.name) ==
username.decode('utf-8').lower()).first()
if bool(user and check_password_hash(str(user.password), password)):
return True
else:
ip_address = request.headers.get('X-Forwarded-For', request.remote_addr)
log.warning('OPDS Login failed for user "%s" IP-address: %s', username.decode('utf-8'), ip_address)
return False
def authenticate():
return Response(
'Could not verify your access level for that URL.\n'
'You have to login with proper credentials', 401,
{'WWW-Authenticate': 'Basic realm="Login Required"'})
def _fetch_user_by_name(username): def _fetch_user_by_name(username):
return ub.session.query(ub.User).filter(func.lower(ub.User.name) == username.lower()).first() return ub.session.query(ub.User).filter(func.lower(ub.User.name) == username.lower()).first()
@ -43,11 +86,13 @@ def _fetch_user_by_name(username):
@lm.user_loader @lm.user_loader
def load_user(user_id): def load_user(user_id):
print("load_user: {}".format(user_id))
return ub.session.query(ub.User).filter(ub.User.id == int(user_id)).first() return ub.session.query(ub.User).filter(ub.User.id == int(user_id)).first()
@lm.request_loader @lm.request_loader
def load_user_from_request(request): def load_user_from_request(request):
print("load_from_request")
if config.config_allow_reverse_proxy_header_login: if config.config_allow_reverse_proxy_header_login:
rp_header_name = config.config_reverse_proxy_login_header_name rp_header_name = config.config_reverse_proxy_login_header_name
if rp_header_name: if rp_header_name:
@ -58,30 +103,33 @@ def load_user_from_request(request):
login_user(user) login_user(user)
return user return user
auth_header = request.headers.get("Authorization") #auth_header = request.headers.get("Authorization")
if auth_header: #if auth_header:
user = load_user_from_auth_header(auth_header) # user = load_user_from_auth_header(auth_header)
if user: # if user:
return user # login_user(user)
# return user
return
return None
def load_user_from_auth_header(header_val):
if header_val.startswith('Basic '): def load_user_from_auth_header(basic_username, basic_password):
header_val = header_val.replace('Basic ', '', 1) #if header_val.startswith('Basic '):
basic_username = basic_password = '' # nosec # header_val = header_val.replace('Basic ', '', 1)
try: #basic_username = basic_password = '' # nosec
header_val = base64.b64decode(header_val).decode('utf-8') #try:
# Users with colon are invalid: rfc7617 page 4 # header_val = base64.b64decode(header_val).decode('utf-8')
basic_username = header_val.split(':', 1)[0] # # Users with colon are invalid: rfc7617 page 4
basic_password = header_val.split(':', 1)[1] # basic_username = header_val.split(':', 1)[0]
except (TypeError, UnicodeDecodeError, binascii.Error): # basic_password = header_val.split(':', 1)[1]
pass #except (TypeError, UnicodeDecodeError, binascii.Error):
# pass
user = _fetch_user_by_name(basic_username) user = _fetch_user_by_name(basic_username)
if user and config.config_login_type == constants.LOGIN_LDAP and services.ldap: if user and config.config_login_type == constants.LOGIN_LDAP and services.ldap:
if services.ldap.bind_user(str(user.password), basic_password): if services.ldap.bind_user(str(user.password), basic_password):
login_user(user)
return user return user
if user and check_password_hash(str(user.password), basic_password): if user and check_password_hash(str(user.password), basic_password):
login_user(user)
return user return user
return return None

@ -24,7 +24,7 @@ import mimetypes
import chardet # dependency of requests import chardet # dependency of requests
import copy import copy
from flask import Blueprint, jsonify from flask import Blueprint, jsonify, g
from flask import request, redirect, send_from_directory, make_response, flash, abort, url_for from flask import request, redirect, send_from_directory, make_response, flash, abort, url_for
from flask import session as flask_session from flask import session as flask_session
from flask_babel import gettext as _ from flask_babel import gettext as _
@ -79,7 +79,7 @@ except ImportError:
@app.after_request @app.after_request
def add_security_headers(resp): def add_security_headers_and_shelves(resp):
csp = "default-src 'self'" csp = "default-src 'self'"
csp += ''.join([' ' + host for host in config.config_trustedhosts.strip().split(',')]) csp += ''.join([' ' + host for host in config.config_trustedhosts.strip().split(',')])
csp += " 'unsafe-inline' 'unsafe-eval'; font-src 'self' data:; img-src 'self'" csp += " 'unsafe-inline' 'unsafe-eval'; font-src 'self' data:; img-src 'self'"
@ -98,6 +98,9 @@ def add_security_headers(resp):
resp.headers['X-Frame-Options'] = 'SAMEORIGIN' resp.headers['X-Frame-Options'] = 'SAMEORIGIN'
resp.headers['X-XSS-Protection'] = '1; mode=block' resp.headers['X-XSS-Protection'] = '1; mode=block'
resp.headers['Strict-Transport-Security'] = 'max-age=31536000;' resp.headers['Strict-Transport-Security'] = 'max-age=31536000;'
g.shelves_access = ub.session.query(ub.Shelf).filter(
or_(ub.Shelf.is_public == 1, ub.Shelf.user_id == current_user.id)).order_by(ub.Shelf.name).all()
return resp return resp

Loading…
Cancel
Save