@ -35,13 +35,15 @@ from flask import Blueprint, flash, redirect, url_for, abort, request, make_resp
from flask_login import login_required , current_user , logout_user , confirm_login
from flask_babel import gettext as _
from sqlalchemy import and_
from sqlalchemy . orm . attributes import flag_modified
from sqlalchemy . exc import IntegrityError , OperationalError , InvalidRequestError
from sqlalchemy . sql . expression import func , or_
from . import constants , logger , helper , services
from . cli import filepicker
from . import db , calibre_db , ub , web_server , get_locale , config , updater_thread , babel , gdriveutils
from . helper import check_valid_domain , send_test_mail , reset_password , generate_password_hash
from . helper import check_valid_domain , send_test_mail , reset_password , generate_password_hash , check_email , \
valid_email , check_username
from . gdriveutils import is_gdrive_ready , gdrive_support
from . render_template import render_title_template , get_sidebar_config
from . import debug_info
@ -57,12 +59,12 @@ feature_support = {
' ldap ' : bool ( services . ldap ) ,
' goodreads ' : bool ( services . goodreads_support ) ,
' kobo ' : bool ( services . kobo ) ,
' updater ' : constants . UPDATER_AVAILABLE
' updater ' : constants . UPDATER_AVAILABLE ,
' gmail ' : bool ( services . gmail )
}
try :
# pylint: disable=unused-import
import rarfile
import rarfile # pylint: disable=unused-import
feature_support [ ' rar ' ] = True
except ( ImportError , SyntaxError ) :
feature_support [ ' rar ' ] = False
@ -150,7 +152,7 @@ def shutdown():
else :
showtext [ ' text ' ] = _ ( u ' Performing shutdown of server, please close window ' )
# stop gevent/tornado server
web_server . stop ( task == 0 )
web_server . stop ( task == 0 )
return json . dumps ( showtext )
if task == 2 :
@ -185,10 +187,10 @@ def admin():
else :
commit = version [ ' version ' ]
all _u ser = ub . session . query ( ub . User ) . all ( )
all U ser = ub . session . query ( ub . User ) . all ( )
email_settings = config . get_mail_settings ( )
kobo_support = feature_support [ ' kobo ' ] and config . config_kobo_sync
return render_title_template ( " admin.html " , allUser = all _u ser, email = email_settings , config = config , commit = commit ,
return render_title_template ( " admin.html " , allUser = all U ser, email = email_settings , config = config , commit = commit ,
feature_support = feature_support , kobo_support = kobo_support ,
title = _ ( u " Admin page " ) , page = " admin " )
@ -214,6 +216,173 @@ def view_configuration():
restrictColumns = restrict_columns ,
title = _ ( u " UI Configuration " ) , page = " uiconfig " )
@admi.route ( " /admin/usertable " )
@login_required
@admin_required
def edit_user_table ( ) :
visibility = current_user . view_settings . get ( ' useredit ' , { } )
languages = calibre_db . speaking_language ( )
translations = babel . list_translations ( ) + [ LC ( ' en ' ) ]
allUser = ub . session . query ( ub . User )
if not config . config_anonbrowse :
allUser = allUser . filter ( ub . User . role . op ( ' & ' ) ( constants . ROLE_ANONYMOUS ) != constants . ROLE_ANONYMOUS )
return render_title_template ( " user_table.html " ,
users = allUser . all ( ) ,
translations = translations ,
languages = languages ,
visiblility = visibility ,
all_roles = constants . ALL_ROLES ,
sidebar_settings = constants . sidebar_settings ,
title = _ ( u " Edit Users " ) ,
page = " usertable " )
@admi.route ( " /ajax/listusers " )
@login_required
@admin_required
def list_users ( ) :
off = request . args . get ( " offset " ) or 0
limit = request . args . get ( " limit " ) or 10
search = request . args . get ( " search " )
all_user = ub . session . query ( ub . User )
if not config . config_anonbrowse :
all_user = all_user . filter ( ub . User . role . op ( ' & ' ) ( constants . ROLE_ANONYMOUS ) != constants . ROLE_ANONYMOUS )
total_count = all_user . count ( )
if search :
users = all_user . filter ( or_ ( func . lower ( ub . User . name ) . ilike ( " % " + search + " % " ) ,
func . lower ( ub . User . kindle_mail ) . ilike ( " % " + search + " % " ) ,
func . lower ( ub . User . email ) . ilike ( " % " + search + " % " ) ) ) \
. offset ( off ) . limit ( limit ) . all ( )
filtered_count = len ( users )
else :
users = all_user . offset ( off ) . limit ( limit ) . all ( )
filtered_count = total_count
for user in users :
if user . default_language == " all " :
user . default = _ ( " all " )
else :
user . default = LC . parse ( user . default_language ) . get_language_name ( get_locale ( ) )
table_entries = { ' totalNotFiltered ' : total_count , ' total ' : filtered_count , " rows " : users }
js_list = json . dumps ( table_entries , cls = db . AlchemyEncoder )
response = make_response ( js_list )
response . headers [ " Content-Type " ] = " application/json; charset=utf-8 "
return response
@admi.route ( " /ajax/deleteuser " )
@login_required
@admin_required
def delete_user ( ) :
# ToDo User delete check also not last one
return " "
@admi.route ( " /ajax/getlocale " )
@login_required
@admin_required
def table_get_locale ( ) :
locale = babel . list_translations ( ) + [ LC ( ' en ' ) ]
ret = list ( )
current_locale = get_locale ( )
for loc in locale :
ret . append ( { ' value ' : str ( loc ) , ' text ' : loc . get_language_name ( current_locale ) } )
return json . dumps ( ret )
@admi.route ( " /ajax/getdefaultlanguage " )
@login_required
@admin_required
def table_get_default_lang ( ) :
languages = calibre_db . speaking_language ( )
ret = list ( )
ret . append ( { ' value ' : ' all ' , ' text ' : _ ( ' Show All ' ) } )
for lang in languages :
ret . append ( { ' value ' : lang . lang_code , ' text ' : lang . name } )
return json . dumps ( ret )
@admi.route ( " /ajax/editlistusers/<param> " , methods = [ ' POST ' ] )
@login_required
@admin_required
def edit_list_user ( param ) :
vals = request . form . to_dict ( flat = False )
all_user = ub . session . query ( ub . User )
if not config . config_anonbrowse :
all_user = all_user . filter ( ub . User . role . op ( ' & ' ) ( constants . ROLE_ANONYMOUS ) != constants . ROLE_ANONYMOUS )
# only one user is posted
if " pk " in vals :
users = [ all_user . filter ( ub . User . id == vals [ ' pk ' ] [ 0 ] ) . one_or_none ( ) ]
else :
if " pk[] " in vals :
users = all_user . filter ( ub . User . id . in_ ( vals [ ' pk[] ' ] ) ) . all ( )
else :
return " "
if ' field_index ' in vals :
vals [ ' field_index ' ] = vals [ ' field_index ' ] [ 0 ]
if ' value ' in vals :
vals [ ' value ' ] = vals [ ' value ' ] [ 0 ]
else :
return " "
for user in users :
try :
vals [ ' value ' ] = vals [ ' value ' ] . strip ( )
if param == ' name ' :
if user . name == " Guest " :
raise Exception ( _ ( " Guest Name can ' t be changed " ) )
user . name = check_username ( vals [ ' value ' ] )
elif param == ' email ' :
user . email = check_email ( vals [ ' value ' ] )
elif param == ' kindle_mail ' :
user . kindle_mail = valid_email ( vals [ ' value ' ] ) if vals [ ' value ' ] else " "
elif param == ' role ' :
if vals [ ' value ' ] == ' true ' :
user . role | = int ( vals [ ' field_index ' ] )
else :
if int ( vals [ ' field_index ' ] ) == constants . ROLE_ADMIN :
if not ub . session . query ( ub . User ) . \
filter ( ub . User . role . op ( ' & ' ) ( constants . ROLE_ADMIN ) == constants . ROLE_ADMIN ,
ub . User . id != user . id ) . count ( ) :
return _ ( u " No admin user remaining, can ' t remove admin role " , nick = user . name ) , 400
user . role & = ~ int ( vals [ ' field_index ' ] )
elif param == ' sidebar_view ' :
if vals [ ' value ' ] == ' true ' :
user . sidebar_view | = int ( vals [ ' field_index ' ] )
else :
user . sidebar_view & = ~ int ( vals [ ' field_index ' ] )
elif param == ' denied_tags ' :
user . denied_tags = vals [ ' value ' ]
elif param == ' allowed_tags ' :
user . allowed_tags = vals [ ' value ' ]
elif param == ' allowed_column_value ' :
user . allowed_column_value = vals [ ' value ' ]
elif param == ' denied_column_value ' :
user . denied_column_value = vals [ ' value ' ]
elif param == ' locale ' :
user . locale = vals [ ' value ' ]
elif param == ' default_language ' :
user . default_language = vals [ ' value ' ]
except Exception as ex :
return str ( ex ) , 400
ub . session_commit ( )
return " "
@admi.route ( " /ajax/user_table_settings " , methods = [ ' POST ' ] )
@login_required
@admin_required
def update_table_settings ( ) :
current_user . view_settings [ ' useredit ' ] = json . loads ( request . data )
try :
try :
flag_modified ( current_user , " view_settings " )
except AttributeError :
pass
ub . session . commit ( )
except ( InvalidRequestError , OperationalError ) :
log . error ( " Invalid request received: {} " . format ( request ) )
return " Invalid request " , 400
return " "
@admi.route ( " /admin/viewconfig " , methods = [ " POST " ] )
@login_required
@ -262,6 +431,14 @@ def load_dialogtexts(element_id):
texts [ " main " ] = _ ( ' Do you really want to delete this user? ' )
elif element_id == " delete_shelf " :
texts [ " main " ] = _ ( ' Are you sure you want to delete this shelf? ' )
elif element_id == " select_locale " :
texts [ " main " ] = _ ( ' Are you sure you want to change locales of selected user(s)? ' )
elif element_id == " select_default_language " :
texts [ " main " ] = _ ( ' Are you sure you want to change visible book languages for selected user(s)? ' )
elif element_id == " role " :
texts [ " main " ] = _ ( ' Are you sure you want to change the selected role for the selected user(s)? ' )
elif element_id == " sidebar_view " :
texts [ " main " ] = _ ( ' Are you sure you want to change the selected visibility restrictions for the selected user(s)? ' )
return json . dumps ( texts )
@ -348,7 +525,7 @@ def edit_restriction(res_type, user_id):
elementlist = usr . list_allowed_tags ( )
elementlist [ int ( element [ ' id ' ] [ 1 : ] ) ] = element [ ' Element ' ]
usr . allowed_tags = ' , ' . join ( elementlist )
ub . session_commit ( " Changed allowed tags of user {} to {} " . format ( usr . n ickn ame, usr . allowed_tags ) )
ub . session_commit ( " Changed allowed tags of user {} to {} " . format ( usr . n ame, usr . allowed_tags ) )
if res_type == 3 : # CColumn per user
if isinstance ( user_id , int ) :
usr = ub . session . query ( ub . User ) . filter ( ub . User . id == int ( user_id ) ) . first ( )
@ -357,7 +534,7 @@ def edit_restriction(res_type, user_id):
elementlist = usr . list_allowed_column_values ( )
elementlist [ int ( element [ ' id ' ] [ 1 : ] ) ] = element [ ' Element ' ]
usr . allowed_column_value = ' , ' . join ( elementlist )
ub . session_commit ( " Changed allowed columns of user {} to {} " . format ( usr . n ickn ame, usr . allowed_column_value ) )
ub . session_commit ( " Changed allowed columns of user {} to {} " . format ( usr . n ame, usr . allowed_column_value ) )
if element [ ' id ' ] . startswith ( ' d ' ) :
if res_type == 0 : # Tags as template
elementlist = config . list_denied_tags ( )
@ -377,7 +554,7 @@ def edit_restriction(res_type, user_id):
elementlist = usr . list_denied_tags ( )
elementlist [ int ( element [ ' id ' ] [ 1 : ] ) ] = element [ ' Element ' ]
usr . denied_tags = ' , ' . join ( elementlist )
ub . session_commit ( " Changed denied tags of user {} to {} " . format ( usr . n ickn ame, usr . denied_tags ) )
ub . session_commit ( " Changed denied tags of user {} to {} " . format ( usr . n ame, usr . denied_tags ) )
if res_type == 3 : # CColumn per user
if isinstance ( user_id , int ) :
usr = ub . session . query ( ub . User ) . filter ( ub . User . id == int ( user_id ) ) . first ( )
@ -386,7 +563,7 @@ def edit_restriction(res_type, user_id):
elementlist = usr . list_denied_column_values ( )
elementlist [ int ( element [ ' id ' ] [ 1 : ] ) ] = element [ ' Element ' ]
usr . denied_column_value = ' , ' . join ( elementlist )
ub . session_commit ( " Changed denied columns of user {} to {} " . format ( usr . n ickn ame, usr . denied_column_value ) )
ub . session_commit ( " Changed denied columns of user {} to {} " . format ( usr . n ame, usr . denied_column_value ) )
return " "
@ -433,10 +610,10 @@ def add_restriction(res_type, user_id):
usr = current_user
if ' submit_allow ' in element :
usr . allowed_tags = restriction_addition ( element , usr . list_allowed_tags )
ub . session_commit ( " Changed allowed tags of user {} to {} " . format ( usr . n ickn ame, usr . list_allowed_tags ) )
ub . session_commit ( " Changed allowed tags of user {} to {} " . format ( usr . n ame, usr . list_allowed_tags ) )
elif ' submit_deny ' in element :
usr . denied_tags = restriction_addition ( element , usr . list_denied_tags )
ub . session_commit ( " Changed denied tags of user {} to {} " . format ( usr . n ickn ame, usr . list_denied_tags ) )
ub . session_commit ( " Changed denied tags of user {} to {} " . format ( usr . n ame, usr . list_denied_tags ) )
if res_type == 3 : # CustomC per user
if isinstance ( user_id , int ) :
usr = ub . session . query ( ub . User ) . filter ( ub . User . id == int ( user_id ) ) . first ( )
@ -444,11 +621,11 @@ def add_restriction(res_type, user_id):
usr = current_user
if ' submit_allow ' in element :
usr . allowed_column_value = restriction_addition ( element , usr . list_allowed_column_values )
ub . session_commit ( " Changed allowed columns of user {} to {} " . format ( usr . n ickn ame,
ub . session_commit ( " Changed allowed columns of user {} to {} " . format ( usr . n ame,
usr . list_allowed_column_values ) )
elif ' submit_deny ' in element :
usr . denied_column_value = restriction_addition ( element , usr . list_denied_column_values )
ub . session_commit ( " Changed denied columns of user {} to {} " . format ( usr . n ickn ame,
ub . session_commit ( " Changed denied columns of user {} to {} " . format ( usr . n ame,
usr . list_denied_column_values ) )
return " "
@ -480,10 +657,10 @@ def delete_restriction(res_type, user_id):
usr = current_user
if element [ ' id ' ] . startswith ( ' a ' ) :
usr . allowed_tags = restriction_deletion ( element , usr . list_allowed_tags )
ub . session_commit ( " Deleted allowed tags of user {} : {} " . format ( usr . n ickn ame, usr . list_allowed_tags ) )
ub . session_commit ( " Deleted allowed tags of user {} : {} " . format ( usr . n ame, usr . list_allowed_tags ) )
elif element [ ' id ' ] . startswith ( ' d ' ) :
usr . denied_tags = restriction_deletion ( element , usr . list_denied_tags )
ub . session_commit ( " Deleted denied tags of user {} : {} " . format ( usr . n ickn ame, usr . list_allowed_tags ) )
ub . session_commit ( " Deleted denied tags of user {} : {} " . format ( usr . n ame, usr . list_allowed_tags ) )
elif res_type == 3 : # Columns per user
if isinstance ( user_id , int ) :
usr = ub . session . query ( ub . User ) . filter ( ub . User . id == int ( user_id ) ) . first ( )
@ -491,12 +668,12 @@ def delete_restriction(res_type, user_id):
usr = current_user
if element [ ' id ' ] . startswith ( ' a ' ) :
usr . allowed_column_value = restriction_deletion ( element , usr . list_allowed_column_values )
ub . session_commit ( " Deleted allowed columns of user {} : {} " . format ( usr . n ickn ame,
ub . session_commit ( " Deleted allowed columns of user {} : {} " . format ( usr . n ame,
usr . list_allowed_column_values ) )
elif element [ ' id ' ] . startswith ( ' d ' ) :
usr . denied_column_value = restriction_deletion ( element , usr . list_denied_column_values )
ub . session_commit ( " Deleted denied columns of user {} : {} " . format ( usr . n ickn ame,
ub . session_commit ( " Deleted denied columns of user {} : {} " . format ( usr . n ame,
usr . list_denied_column_values ) )
return " "
@ -602,7 +779,6 @@ def pathchooser():
folders = [ ]
files = [ ]
# locale = get_locale()
for f in folders :
try :
data = { " name " : f , " fullpath " : os . path . join ( cwd , f ) }
@ -730,13 +906,35 @@ def _configuration_logfile_helper(to_save, gdrive_error):
return reboot_required , None
def _configuration_ldap_check ( reboot_required , to_save , gdrive_error ) :
def _configuration_ldap_helper ( to_save , gdrive_error ) :
reboot_required = False
reboot_required | = _config_string ( to_save , " config_ldap_provider_url " )
reboot_required | = _config_int ( to_save , " config_ldap_port " )
reboot_required | = _config_int ( to_save , " config_ldap_authentication " )
reboot_required | = _config_string ( to_save , " config_ldap_dn " )
reboot_required | = _config_string ( to_save , " config_ldap_serv_username " )
reboot_required | = _config_string ( to_save , " config_ldap_user_object " )
reboot_required | = _config_string ( to_save , " config_ldap_group_object_filter " )
reboot_required | = _config_string ( to_save , " config_ldap_group_members_field " )
reboot_required | = _config_string ( to_save , " config_ldap_member_user_object " )
reboot_required | = _config_checkbox ( to_save , " config_ldap_openldap " )
reboot_required | = _config_int ( to_save , " config_ldap_encryption " )
reboot_required | = _config_string ( to_save , " config_ldap_cacert_path " )
reboot_required | = _config_string ( to_save , " config_ldap_cert_path " )
reboot_required | = _config_string ( to_save , " config_ldap_key_path " )
_config_string ( to_save , " config_ldap_group_name " )
if to_save . get ( " config_ldap_serv_password " , " " ) != " " :
reboot_required | = 1
config . set_from_dictionary ( to_save , " config_ldap_serv_password " , base64 . b64encode , encode = ' UTF-8 ' )
config . save ( )
if not config . config_ldap_provider_url \
or not config . config_ldap_port \
or not config . config_ldap_dn \
or not config . config_ldap_user_object :
or not config . config_ldap_port \
or not config . config_ldap_dn \
or not config . config_ldap_user_object :
return reboot_required , _configuration_result ( _ ( ' Please Enter a LDAP Provider, '
' Port, DN and User Object Identifier ' ) , gdrive_error )
if config . config_ldap_authentication > constants . LDAP_AUTH_ANONYMOUS :
if config . config_ldap_authentication > constants . LDAP_AUTH_UNAUTHENTICATE :
if not config . config_ldap_serv_username or not bool ( config . config_ldap_serv_password ) :
@ -746,14 +944,6 @@ def _configuration_ldap_check(reboot_required, to_save, gdrive_error):
if not config . config_ldap_serv_username :
return reboot_required , _configuration_result ( ' Please Enter a LDAP Service Account ' , gdrive_error )
if config . config_ldap_group_object_filter :
if config . config_ldap_group_object_filter . count ( " %s " ) != 1 :
return reboot_required , \
_configuration_result ( _ ( ' LDAP Group Object Filter Needs to Have One " %s " Format Identifier ' ) ,
gdrive_error )
if config . config_ldap_group_object_filter . count ( " ( " ) != config . config_ldap_group_object_filter . count ( " ) " ) :
return reboot_required , _configuration_result ( _ ( ' LDAP Group Object Filter Has Unmatched Parenthesis ' ) ,
gdrive_error )
if config . config_ldap_group_object_filter :
if config . config_ldap_group_object_filter . count ( " %s " ) != 1 :
return reboot_required , \
@ -771,7 +961,7 @@ def _configuration_ldap_check(reboot_required, to_save, gdrive_error):
return reboot_required , _configuration_result ( _ ( ' LDAP User Object Filter Has Unmatched Parenthesis ' ) ,
gdrive_error )
if " ldap_import_user_filter " in to_save and to_save [ " ldap_import_user_filter " ] == ' 0 ' :
if to_save . get ( " ldap_import_user_filter " ) == ' 0 ' :
config . config_ldap_member_user_object = " "
else :
if config . config_ldap_member_user_object . count ( " %s " ) != 1 :
@ -793,31 +983,6 @@ def _configuration_ldap_check(reboot_required, to_save, gdrive_error):
return reboot_required , None
def _configuration_ldap_helper ( to_save , gdrive_error ) :
reboot_required = False
reboot_required | = _config_string ( to_save , " config_ldap_provider_url " )
reboot_required | = _config_int ( to_save , " config_ldap_port " )
reboot_required | = _config_int ( to_save , " config_ldap_authentication " )
reboot_required | = _config_string ( to_save , " config_ldap_dn " )
reboot_required | = _config_string ( to_save , " config_ldap_serv_username " )
reboot_required | = _config_string ( to_save , " config_ldap_user_object " )
reboot_required | = _config_string ( to_save , " config_ldap_group_object_filter " )
reboot_required | = _config_string ( to_save , " config_ldap_group_members_field " )
reboot_required | = _config_string ( to_save , " config_ldap_member_user_object " )
reboot_required | = _config_checkbox ( to_save , " config_ldap_openldap " )
reboot_required | = _config_int ( to_save , " config_ldap_encryption " )
reboot_required | = _config_string ( to_save , " config_ldap_cacert_path " )
reboot_required | = _config_string ( to_save , " config_ldap_cert_path " )
reboot_required | = _config_string ( to_save , " config_ldap_key_path " )
_config_string ( to_save , " config_ldap_group_name " )
if " config_ldap_serv_password " in to_save and to_save [ " config_ldap_serv_password " ] != " " :
reboot_required | = 1
config . set_from_dictionary ( to_save , " config_ldap_serv_password " , base64 . b64encode , encode = ' UTF-8 ' )
config . save ( )
return _configuration_ldap_check ( reboot_required , to_save , gdrive_error )
def _configuration_update_helper ( configured ) :
reboot_required = False
db_change = False
@ -921,8 +1086,8 @@ def _configuration_update_helper(configured):
if config . config_use_google_drive and is_gdrive_ready ( ) and not os . path . exists ( metadata_db ) :
gdriveutils . downloadFile ( None , " metadata.db " , metadata_db )
db_change = True
except Exception as e :
return _configuration_result ( ' %s ' % e , gdrive_error , configured )
except Exception as e x :
return _configuration_result ( ' %s ' % e x , gdrive_error , configured )
if db_change :
if not calibre_db . setup_db ( config , ub . app_DB_path ) :
@ -974,7 +1139,6 @@ def _configuration_result(error_flash=None, gdrive_error=None, configured=True):
def _handle_new_user ( to_save , content , languages , translations , kobo_support ) :
content . default_language = to_save [ " default_language " ]
# content.mature_content = "Show_mature_content" in to_save
content . locale = to_save . get ( " locale " , content . locale )
content . sidebar_view = sum ( int ( key [ 5 : ] ) for key in to_save if key . startswith ( ' show_ ' ) )
@ -982,28 +1146,21 @@ def _handle_new_user(to_save, content, languages, translations, kobo_support):
content . sidebar_view | = constants . DETAIL_RANDOM
content . role = constants . selected_roles ( to_save )
if not to_save [ " nickname " ] or not to_save [ " email " ] or not to_save [ " password " ] :
flash ( _ ( u " Please fill out all fields! " ) , category = " error " )
return render_title_template ( " user_edit.html " , new_user = 1 , content = content , translations = translations ,
registered_oauth = oauth_check , kobo_support = kobo_support ,
title = _ ( u " Add new user " ) )
content . password = generate_password_hash ( to_save [ " password " ] )
existing_user = ub . session . query ( ub . User ) . filter ( func . lower ( ub . User . nickname ) == to_save [ " nickname " ] . lower ( ) ) \
. first ( )
existing_email = ub . session . query ( ub . User ) . filter ( ub . User . email == to_save [ " email " ] . lower ( ) ) \
. first ( )
if not existing_user and not existing_email :
content . nickname = to_save [ " nickname " ]
if config . config_public_reg and not check_valid_domain ( to_save [ " email " ] ) :
flash ( _ ( u " E-mail is not from valid domain " ) , category = " error " )
return render_title_template ( " user_edit.html " , new_user = 1 , content = content , translations = translations ,
registered_oauth = oauth_check , kobo_support = kobo_support ,
title = _ ( u " Add new user " ) )
else :
content . email = to_save [ " email " ]
else :
flash ( _ ( u " Found an existing account for this e-mail address or nickname. " ) , category = " error " )
try :
if not to_save [ " name " ] or not to_save [ " email " ] or not to_save [ " password " ] :
log . info ( " Missing entries on new user " )
raise Exception ( _ ( u " Please fill out all fields! " ) )
content . email = check_email ( to_save [ " email " ] )
# Query User name, if not existing, change
content . name = check_username ( to_save [ " name " ] )
if to_save . get ( " kindle_mail " ) :
content . kindle_mail = valid_email ( to_save [ " kindle_mail " ] )
if config . config_public_reg and not check_valid_domain ( content . email ) :
log . info ( " E-mail: {} for new user is not from valid domain " . format ( content . email ) )
raise Exception ( _ ( u " E-mail is not from valid domain " ) )
except Exception as ex :
flash ( str ( ex ) , category = " error " )
return render_title_template ( " user_edit.html " , new_user = 1 , content = content , translations = translations ,
languages = languages , title = _ ( u " Add new user " ) , page = " newuser " ,
kobo_support = kobo_support , registered_oauth = oauth_check )
@ -1014,49 +1171,33 @@ def _handle_new_user(to_save, content, languages, translations, kobo_support):
content . denied_column_value = config . config_denied_column_value
ub . session . add ( content )
ub . session . commit ( )
flash ( _ ( u " User ' %(user)s ' created " , user = content . n ickn ame) , category = " success " )
flash ( _ ( u " User ' %(user)s ' created " , user = content . n ame) , category = " success " )
return redirect ( url_for ( ' admin.admin ' ) )
except IntegrityError :
ub . session . rollback ( )
flash ( _ ( u " Found an existing account for this e-mail address or nickname. " ) , category = " error " )
except OperationalError :
ub . session . rollback ( )
flash ( _ ( u " Settings DB is not Writeable " ) , category = " error " )
def delete_user ( content ) :
if ub . session . query ( ub . User ) . filter ( ub . User . role . op ( ' & ' ) ( constants . ROLE_ADMIN ) == constants . ROLE_ADMIN ,
ub . User . id != content . id ) . count ( ) :
ub . session . query ( ub . User ) . filter ( ub . User . id == content . id ) . delete ( )
ub . session_commit ( )
flash ( _ ( u " User ' %(nick)s ' deleted " , nick = content . nickname ) , category = " success " )
return redirect ( url_for ( ' admin.admin ' ) )
else :
flash ( _ ( u " No admin user remaining, can ' t delete user " , nick = content . nickname ) , category = " error " )
return redirect ( url_for ( ' admin.admin ' ) )
def save_edited_user ( content ) :
try :
ub . session_commit ( )
flash ( _ ( u " User ' %(nick)s ' updated " , nick = content . nickname ) , category = " success " )
except IntegrityError :
ub . session . rollback ( )
flash ( _ ( u " An unknown error occured. " ) , category = " error " )
flash ( _ ( u " Found an existing account for this e-mail address or name. " ) , category = " error " )
except OperationalError :
ub . session . rollback ( )
flash ( _ ( u " Settings DB is not Writeable " ) , category = " error " )
def _handle_edit_user ( to_save , content , languages , translations , kobo_support ) :
if " delete " in to_save :
return delete_user ( content )
if to_save . get ( " delete " ) :
if ub . session . query ( ub . User ) . filter ( ub . User . role . op ( ' & ' ) ( constants . ROLE_ADMIN ) == constants . ROLE_ADMIN ,
ub . User . id != content . id ) . count ( ) :
ub . session . query ( ub . User ) . filter ( ub . User . id == content . id ) . delete ( )
ub . session_commit ( )
flash ( _ ( u " User ' %(nick)s ' deleted " , nick = content . name ) , category = " success " )
return redirect ( url_for ( ' admin.admin ' ) )
else :
flash ( _ ( u " No admin user remaining, can ' t delete user " , nick = content . name ) , category = " error " )
return redirect ( url_for ( ' admin.admin ' ) )
else :
if not ub . session . query ( ub . User ) . filter ( ub . User . role . op ( ' & ' ) ( constants . ROLE_ADMIN ) == constants . ROLE_ADMIN ,
ub . User . id != content . id ) . count ( ) and ' admin_role ' not in to_save :
flash ( _ ( u " No admin user remaining, can ' t remove admin role " , nick = content . nickname ) , category = " error " )
flash ( _ ( u " No admin user remaining, can ' t remove admin role " , nick = content . name ) , category = " error " )
return redirect ( url_for ( ' admin.admin ' ) )
if " password " in to_save and to_save [ " password " ] :
if to_save . get ( " password " ) :
content . password = generate_password_hash ( to_save [ " password " ] )
anonymous = content . is_anonymous
content . role = constants . selected_roles ( to_save )
@ -1074,50 +1215,46 @@ def _handle_edit_user(to_save, content, languages, translations, kobo_support):
elif value not in val and content . check_visibility ( value ) :
content . sidebar_view & = ~ value
if " Show_detail_random " in to_save :
if to_save . get ( " Show_detail_random " ) :
content . sidebar_view | = constants . DETAIL_RANDOM
else :
content . sidebar_view & = ~ constants . DETAIL_RANDOM
if " default_language " in to_save :
if to_save . get ( " default_language " ) :
content . default_language = to_save [ " default_language " ]
if " locale " in to_save and to_save [ " locale " ] :
if to_save . get ( " locale " ) :
content . locale = to_save [ " locale " ]
if to_save [ " email " ] and to_save [ " email " ] != content . email :
existing_email = ub . session . query ( ub . User ) . filter ( ub . User . email == to_save [ " email " ] . lower ( ) ) \
. first ( )
if not existing_email :
content . email = to_save [ " email " ]
else :
flash ( _ ( u " Found an existing account for this e-mail address. " ) , category = " error " )
return render_title_template ( " user_edit.html " ,
translations = translations ,
languages = languages ,
mail_configured = config . get_mail_server_configured ( ) ,
kobo_support = kobo_support ,
new_user = 0 ,
content = content ,
registered_oauth = oauth_check ,
title = _ ( u " Edit User %(nick)s " , nick = content . nickname ) , page = " edituser " )
if " nickname " in to_save and to_save [ " nickname " ] != content . nickname :
# Query User nickname, if not existing, change
if not ub . session . query ( ub . User ) . filter ( ub . User . nickname == to_save [ " nickname " ] ) . scalar ( ) :
content . nickname = to_save [ " nickname " ]
else :
flash ( _ ( u " This username is already taken " ) , category = " error " )
return render_title_template ( " user_edit.html " ,
translations = translations ,
languages = languages ,
mail_configured = config . get_mail_server_configured ( ) ,
new_user = 0 , content = content ,
registered_oauth = oauth_check ,
kobo_support = kobo_support ,
title = _ ( u " Edit User %(nick)s " , nick = content . nickname ) ,
page = " edituser " )
if " kindle_mail " in to_save and to_save [ " kindle_mail " ] != content . kindle_mail :
content . kindle_mail = to_save [ " kindle_mail " ]
return save_edited_user ( content )
try :
if to_save . get ( " email " , content . email ) != content . email :
content . email = check_email ( to_save [ " email " ] )
# Query User name, if not existing, change
if to_save . get ( " name " , content . name ) != content . name :
if to_save . get ( " name " ) == " Guest " :
raise Exception ( _ ( " Guest Name can ' t be changed " ) )
content . name = check_username ( to_save [ " name " ] )
if to_save . get ( " kindle_mail " ) != content . kindle_mail :
content . kindle_mail = valid_email ( to_save [ " kindle_mail " ] ) if to_save [ " kindle_mail " ] else " "
except Exception as ex :
flash ( str ( ex ) , category = " error " )
return render_title_template ( " user_edit.html " ,
translations = translations ,
languages = languages ,
mail_configured = config . get_mail_server_configured ( ) ,
kobo_support = kobo_support ,
new_user = 0 ,
content = content ,
registered_oauth = oauth_check ,
title = _ ( u " Edit User %(nick)s " , nick = content . name ) ,
page = " edituser " )
try :
ub . session_commit ( )
flash ( _ ( u " User ' %(nick)s ' updated " , nick = content . name ) , category = " success " )
except IntegrityError :
ub . session . rollback ( )
flash ( _ ( u " An unknown error occured. " ) , category = " error " )
except OperationalError :
ub . session . rollback ( )
flash ( _ ( u " Settings DB is not Writeable " ) , category = " error " )
@admi.route ( " /admin/user/new " , methods = [ " GET " , " POST " ] )
@ -1145,7 +1282,7 @@ def new_user():
def edit_mailsettings ( ) :
content = config . get_mail_settings ( )
return render_title_template ( " email_edit.html " , content = content , title = _ ( u " Edit E-mail Server Settings " ) ,
page = " mailset " )
page = " mailset " , feature_support = feature_support )
@admi.route ( " /admin/mailsettings " , methods = [ " POST " ] )
@ -1153,14 +1290,30 @@ def edit_mailsettings():
@admin_required
def update_mailsettings ( ) :
to_save = request . form . to_dict ( )
_config_int ( to_save , " mail_server_type " )
if to_save . get ( " invalidate " ) :
config . mail_gmail_token = { }
try :
flag_modified ( config , " mail_gmail_token " )
except AttributeError :
pass
elif to_save . get ( " gmail " ) :
try :
config . mail_gmail_token = services . gmail . setup_gmail ( config . mail_gmail_token )
flash ( _ ( u " G-Mail Account Verification Successful " ) , category = " success " )
except Exception as ex :
flash ( str ( ex ) , category = " error " )
log . error ( ex )
return edit_mailsettings ( )
_config_string ( to_save , " mail_server " )
_config_int ( to_save , " mail_port " )
_config_int ( to_save , " mail_use_ssl " )
_config_string ( to_save , " mail_login " )
_config_string ( to_save , " mail_password " )
_config_string ( to_save , " mail_from " )
_config_int ( to_save , " mail_size " , lambda y : int ( y ) * 1024 * 1024 )
else :
_config_string ( to_save , " mail_server " )
_config_int ( to_save , " mail_port " )
_config_int ( to_save , " mail_use_ssl " )
_config_string ( to_save , " mail_login " )
_config_string ( to_save , " mail_password " )
_config_string ( to_save , " mail_from " )
_config_int ( to_save , " mail_size " , lambda y : int ( y ) * 1024 * 1024 )
try :
config . save ( )
except ( OperationalError , InvalidRequestError ) :
@ -1170,10 +1323,10 @@ def update_mailsettings():
if to_save . get ( " test " ) :
if current_user . email :
result = send_test_mail ( current_user . email , current_user . n ickn ame)
result = send_test_mail ( current_user . email , current_user . n ame)
if result is None :
flash ( _ ( u " Test e-mail queued for sending to %(email)s , please check Tasks for result " , email = current_user . email ) ,
category = " info " )
flash ( _ ( u " Test e-mail queued for sending to %(email)s , please check Tasks for result " ,
email = current_user . email ) , category = " info " )
else :
flash ( _ ( u " There was an error sending the Test e-mail: %(res)s " , res = result ) , category = " error " )
else :
@ -1189,7 +1342,7 @@ def update_mailsettings():
@admin_required
def edit_user ( user_id ) :
content = ub . session . query ( ub . User ) . filter ( ub . User . id == int ( user_id ) ) . first ( ) # type: ub.User
if not content or ( not config . config_anonbrowse and content . n ickn ame == " Guest " ) :
if not content or ( not config . config_anonbrowse and content . n ame == " Guest " ) :
flash ( _ ( u " User not found " ) , category = " error " )
return redirect ( url_for ( ' admin.admin ' ) )
languages = calibre_db . speaking_language ( )
@ -1206,7 +1359,8 @@ def edit_user(user_id):
registered_oauth = oauth_check ,
mail_configured = config . get_mail_server_configured ( ) ,
kobo_support = kobo_support ,
title = _ ( u " Edit User %(nick)s " , nick = content . nickname ) , page = " edituser " )
title = _ ( u " Edit User %(nick)s " , nick = content . name ) ,
page = " edituser " )
@admi.route ( " /admin/resetpassword/<int:user_id> " )
@ -1328,18 +1482,15 @@ def get_updater_status():
return ' '
def create_ldap_user ( user , user_data , config ) :
imported = 0
showtext = None
def ldap_import_create_user ( user , user_data ) :
user_login_field = extract_dynamic_field_from_filter ( user , config . config_ldap_user_object )
username = user_data [ user_login_field ] [ 0 ] . decode ( ' utf-8 ' )
# check for duplicate username
if ub . session . query ( ub . User ) . filter ( func . lower ( ub . User . n ickn ame) == username . lower ( ) ) . first ( ) :
# if ub.session.query(ub.User).filter(ub.User.n ickn ame == username).first():
if ub . session . query ( ub . User ) . filter ( func . lower ( ub . User . n ame) == username . lower ( ) ) . first ( ) :
# if ub.session.query(ub.User).filter(ub.User.n ame == username).first():
log . warning ( " LDAP User %s Already in Database " , user_data )
return imported , showtext
return 0 , None
kindlemail = ' '
if ' mail ' in user_data :
@ -1350,13 +1501,15 @@ def create_ldap_user(user, user_data, config):
else :
log . debug ( ' No Mail Field Found in LDAP Response ' )
useremail = username + ' @email.com '
# check for duplicate email
if ub . session . query ( ub . User ) . filter ( func . lower ( ub . User . email ) == useremail . lower ( ) ) . first ( ) :
log . warning ( " LDAP Email %s Already in Database " , user_data )
return imported , showtext
try :
# check for duplicate email
useremail = check_email ( useremail )
except Exception as ex :
log . warning ( " LDAP Email Error: {} , {} " . format ( user_data , ex ) )
return 0 , None
content = ub . User ( )
content . nickname = username
content . n ame = username
content . password = ' ' # dummy password which will be replaced by ldap one
content . email = useremail
content . kindle_mail = kindlemail
@ -1369,12 +1522,12 @@ def create_ldap_user(user, user_data, config):
ub . session . add ( content )
try :
ub . session . commit ( )
imported = 1
except Exception as e :
log . warning ( " Failed to create LDAP user: %s - %s " , user , e )
return 1 , None # increase no of users
except Exception as e x :
log . warning ( " Failed to create LDAP user: %s - %s " , user , e x )
ub . session . rollback ( )
showtext = _ ( u ' Failed to Create at Least One LDAP User ' )
return imported , showtext
message = _ ( u ' Failed to Create at Least One LDAP User ' )
return 0 , message
@admi.route ( ' /import_ldap_users ' )
@ -1404,23 +1557,23 @@ def import_ldap_users():
query_filter = config . config_ldap_user_object
try :
user_identifier = extract_user_identifier ( user , query_filter )
except Exception as e :
log . warning ( e )
except Exception as e x :
log . warning ( e x )
continue
else :
user_identifier = user
query_filter = None
try :
user_data = services . ldap . get_object_details ( user = user_identifier , query_filter = query_filter )
except AttributeError as e :
log . debug_or_exception ( e )
except AttributeError as e x :
log . debug_or_exception ( e x )
continue
if user_data :
success, txt = create_ldap _user( user , user_data , config )
# In case of error store text for showing it
if txt :
showtext [ ' text ' ] = txt
imported + = success
user_count, message = ldap_import_create _user( user , user_data )
if message :
showtext [ ' text ' ] = message
else :
imported + = user_count
else :
log . warning ( " LDAP User: %s Not Found " , user )
showtext [ ' text ' ] = _ ( u ' At Least One LDAP User Not Found in Database ' )