From 53ee0aaee1aafa3f0eda8288e9fe2ec2c6058da9 Mon Sep 17 00:00:00 2001 From: Ozzie Isaacs Date: Sun, 14 Mar 2021 17:34:47 +0100 Subject: [PATCH] Some functions refactored --- cps/admin.py | 206 +++++++++++++++++++++++++++++---------------------- 1 file changed, 119 insertions(+), 87 deletions(-) diff --git a/cps/admin.py b/cps/admin.py index 767aece3..b932bd8e 100644 --- a/cps/admin.py +++ b/cps/admin.py @@ -61,6 +61,7 @@ feature_support = { } try: + # pylint: disable=unused-import import rarfile feature_support['rar'] = True except (ImportError, SyntaxError): @@ -184,10 +185,10 @@ def admin(): else: commit = version['version'] - allUser = ub.session.query(ub.User).all() + all_user = ub.session.query(ub.User).all() email_settings = config.get_mail_settings() kobo_support = feature_support['kobo'] and config.config_kobo_sync - return render_title_template("admin.html", allUser=allUser, email=email_settings, config=config, commit=commit, + return render_title_template("admin.html", allUser=all_user, email=email_settings, config=config, commit=commit, feature_support=feature_support, kobo_support=kobo_support, title=_(u"Admin page"), page="admin") @@ -729,35 +730,13 @@ def _configuration_logfile_helper(to_save, gdrive_error): return reboot_required, None -def _configuration_ldap_helper(to_save, gdrive_error): - reboot_required = False - reboot_required |= _config_string(to_save, "config_ldap_provider_url") - reboot_required |= _config_int(to_save, "config_ldap_port") - reboot_required |= _config_int(to_save, "config_ldap_authentication") - reboot_required |= _config_string(to_save, "config_ldap_dn") - reboot_required |= _config_string(to_save, "config_ldap_serv_username") - reboot_required |= _config_string(to_save, "config_ldap_user_object") - reboot_required |= _config_string(to_save, "config_ldap_group_object_filter") - reboot_required |= _config_string(to_save, "config_ldap_group_members_field") - reboot_required |= _config_string(to_save, "config_ldap_member_user_object") - reboot_required |= _config_checkbox(to_save, "config_ldap_openldap") - reboot_required |= _config_int(to_save, "config_ldap_encryption") - reboot_required |= _config_string(to_save, "config_ldap_cacert_path") - reboot_required |= _config_string(to_save, "config_ldap_cert_path") - reboot_required |= _config_string(to_save, "config_ldap_key_path") - _config_string(to_save, "config_ldap_group_name") - if "config_ldap_serv_password" in to_save and to_save["config_ldap_serv_password"] != "": - reboot_required |= 1 - config.set_from_dictionary(to_save, "config_ldap_serv_password", base64.b64encode, encode='UTF-8') - config.save() - +def _configuration_ldap_check(reboot_required, to_save, gdrive_error): if not config.config_ldap_provider_url \ - or not config.config_ldap_port \ - or not config.config_ldap_dn \ - or not config.config_ldap_user_object: + or not config.config_ldap_port \ + or not config.config_ldap_dn \ + or not config.config_ldap_user_object: return reboot_required, _configuration_result(_('Please Enter a LDAP Provider, ' 'Port, DN and User Object Identifier'), gdrive_error) - if config.config_ldap_authentication > constants.LDAP_AUTH_ANONYMOUS: if config.config_ldap_authentication > constants.LDAP_AUTH_UNAUTHENTICATE: if not config.config_ldap_serv_username or not bool(config.config_ldap_serv_password): @@ -767,6 +746,14 @@ def _configuration_ldap_helper(to_save, gdrive_error): if not config.config_ldap_serv_username: return reboot_required, _configuration_result('Please Enter a LDAP Service Account', gdrive_error) + if config.config_ldap_group_object_filter: + if config.config_ldap_group_object_filter.count("%s") != 1: + return reboot_required, \ + _configuration_result(_('LDAP Group Object Filter Needs to Have One "%s" Format Identifier'), + gdrive_error) + if config.config_ldap_group_object_filter.count("(") != config.config_ldap_group_object_filter.count(")"): + return reboot_required, _configuration_result(_('LDAP Group Object Filter Has Unmatched Parenthesis'), + gdrive_error) if config.config_ldap_group_object_filter: if config.config_ldap_group_object_filter.count("%s") != 1: return reboot_required, \ @@ -806,6 +793,31 @@ def _configuration_ldap_helper(to_save, gdrive_error): return reboot_required, None +def _configuration_ldap_helper(to_save, gdrive_error): + reboot_required = False + reboot_required |= _config_string(to_save, "config_ldap_provider_url") + reboot_required |= _config_int(to_save, "config_ldap_port") + reboot_required |= _config_int(to_save, "config_ldap_authentication") + reboot_required |= _config_string(to_save, "config_ldap_dn") + reboot_required |= _config_string(to_save, "config_ldap_serv_username") + reboot_required |= _config_string(to_save, "config_ldap_user_object") + reboot_required |= _config_string(to_save, "config_ldap_group_object_filter") + reboot_required |= _config_string(to_save, "config_ldap_group_members_field") + reboot_required |= _config_string(to_save, "config_ldap_member_user_object") + reboot_required |= _config_checkbox(to_save, "config_ldap_openldap") + reboot_required |= _config_int(to_save, "config_ldap_encryption") + reboot_required |= _config_string(to_save, "config_ldap_cacert_path") + reboot_required |= _config_string(to_save, "config_ldap_cert_path") + reboot_required |= _config_string(to_save, "config_ldap_key_path") + _config_string(to_save, "config_ldap_group_name") + if "config_ldap_serv_password" in to_save and to_save["config_ldap_serv_password"] != "": + reboot_required |= 1 + config.set_from_dictionary(to_save, "config_ldap_serv_password", base64.b64encode, encode='UTF-8') + config.save() + + return _configuration_ldap_check(reboot_required, to_save, gdrive_error) + + def _configuration_update_helper(configured): reboot_required = False db_change = False @@ -1011,18 +1023,33 @@ def _handle_new_user(to_save, content, languages, translations, kobo_support): ub.session.rollback() flash(_(u"Settings DB is not Writeable"), category="error") +def delete_user(content): + if ub.session.query(ub.User).filter(ub.User.role.op('&')(constants.ROLE_ADMIN) == constants.ROLE_ADMIN, + ub.User.id != content.id).count(): + ub.session.query(ub.User).filter(ub.User.id == content.id).delete() + ub.session_commit() + flash(_(u"User '%(nick)s' deleted", nick=content.nickname), category="success") + return redirect(url_for('admin.admin')) + else: + flash(_(u"No admin user remaining, can't delete user", nick=content.nickname), category="error") + return redirect(url_for('admin.admin')) + + +def save_edited_user(content): + try: + ub.session_commit() + flash(_(u"User '%(nick)s' updated", nick=content.nickname), category="success") + except IntegrityError: + ub.session.rollback() + flash(_(u"An unknown error occured."), category="error") + except OperationalError: + ub.session.rollback() + flash(_(u"Settings DB is not Writeable"), category="error") + def _handle_edit_user(to_save, content, languages, translations, kobo_support): if "delete" in to_save: - if ub.session.query(ub.User).filter(ub.User.role.op('&')(constants.ROLE_ADMIN) == constants.ROLE_ADMIN, - ub.User.id != content.id).count(): - ub.session.query(ub.User).filter(ub.User.id == content.id).delete() - ub.session_commit() - flash(_(u"User '%(nick)s' deleted", nick=content.nickname), category="success") - return redirect(url_for('admin.admin')) - else: - flash(_(u"No admin user remaining, can't delete user", nick=content.nickname), category="error") - return redirect(url_for('admin.admin')) + return delete_user(content) else: if not ub.session.query(ub.User).filter(ub.User.role.op('&')(constants.ROLE_ADMIN) == constants.ROLE_ADMIN, ub.User.id != content.id).count() and 'admin_role' not in to_save: @@ -1090,15 +1117,7 @@ def _handle_edit_user(to_save, content, languages, translations, kobo_support): if "kindle_mail" in to_save and to_save["kindle_mail"] != content.kindle_mail: content.kindle_mail = to_save["kindle_mail"] - try: - ub.session_commit() - flash(_(u"User '%(nick)s' updated", nick=content.nickname), category="success") - except IntegrityError: - ub.session.rollback() - flash(_(u"An unknown error occured."), category="error") - except OperationalError: - ub.session.rollback() - flash(_(u"Settings DB is not Writeable"), category="error") + return save_edited_user(content) @admi.route("/admin/user/new", methods=["GET", "POST"]) @@ -1310,6 +1329,55 @@ def get_updater_status(): return '' +def create_ldap_user(user, user_data, config): + imported = 0 + showtext = None + + user_login_field = extract_dynamic_field_from_filter(user, config.config_ldap_user_object) + + username = user_data[user_login_field][0].decode('utf-8') + # check for duplicate username + if ub.session.query(ub.User).filter(func.lower(ub.User.nickname) == username.lower()).first(): + # if ub.session.query(ub.User).filter(ub.User.nickname == username).first(): + log.warning("LDAP User %s Already in Database", user_data) + return imported, showtext + + kindlemail = '' + if 'mail' in user_data: + useremail = user_data['mail'][0].decode('utf-8') + if len(user_data['mail']) > 1: + kindlemail = user_data['mail'][1].decode('utf-8') + + else: + log.debug('No Mail Field Found in LDAP Response') + useremail = username + '@email.com' + # check for duplicate email + if ub.session.query(ub.User).filter(func.lower(ub.User.email) == useremail.lower()).first(): + log.warning("LDAP Email %s Already in Database", user_data) + return imported, showtext + + content = ub.User() + content.nickname = username + content.password = '' # dummy password which will be replaced by ldap one + content.email = useremail + content.kindle_mail = kindlemail + content.role = config.config_default_role + content.sidebar_view = config.config_default_show + content.allowed_tags = config.config_allowed_tags + content.denied_tags = config.config_denied_tags + content.allowed_column_value = config.config_allowed_column_value + content.denied_column_value = config.config_denied_column_value + ub.session.add(content) + try: + ub.session.commit() + imported = 1 + except Exception as e: + log.warning("Failed to create LDAP user: %s - %s", user, e) + ub.session.rollback() + showtext = _(u'Failed to Create at Least One LDAP User') + return imported, showtext + + @admi.route('/import_ldap_users') @login_required @admin_required @@ -1349,47 +1417,11 @@ def import_ldap_users(): log.debug_or_exception(e) continue if user_data: - user_login_field = extract_dynamic_field_from_filter(user, config.config_ldap_user_object) - - username = user_data[user_login_field][0].decode('utf-8') - # check for duplicate username - if ub.session.query(ub.User).filter(func.lower(ub.User.nickname) == username.lower()).first(): - # if ub.session.query(ub.User).filter(ub.User.nickname == username).first(): - log.warning("LDAP User %s Already in Database", user_data) - continue - - kindlemail = '' - if 'mail' in user_data: - useremail = user_data['mail'][0].decode('utf-8') - if len(user_data['mail']) > 1: - kindlemail = user_data['mail'][1].decode('utf-8') - - else: - log.debug('No Mail Field Found in LDAP Response') - useremail = username + '@email.com' - # check for duplicate email - if ub.session.query(ub.User).filter(func.lower(ub.User.email) == useremail.lower()).first(): - log.warning("LDAP Email %s Already in Database", user_data) - continue - content = ub.User() - content.nickname = username - content.password = '' # dummy password which will be replaced by ldap one - content.email = useremail - content.kindle_mail = kindlemail - content.role = config.config_default_role - content.sidebar_view = config.config_default_show - content.allowed_tags = config.config_allowed_tags - content.denied_tags = config.config_denied_tags - content.allowed_column_value = config.config_allowed_column_value - content.denied_column_value = config.config_denied_column_value - ub.session.add(content) - try: - ub.session.commit() - imported += 1 - except Exception as e: - log.warning("Failed to create LDAP user: %s - %s", user, e) - ub.session.rollback() - showtext['text'] = _(u'Failed to Create at Least One LDAP User') + success, txt = create_ldap_user(user, user_data, config) + # In case of error store text for showing it + if txt: + showtext['text'] = txt + imported += success else: log.warning("LDAP User: %s Not Found", user) showtext['text'] = _(u'At Least One LDAP User Not Found in Database')