Source code for galaxy.auth.util

import errno
import logging
from collections import namedtuple

import galaxy.auth.providers
from galaxy.exceptions import Conflict
from galaxy.security.validate_user_input import validate_publicname
from galaxy.util import (
    parse_xml,
    parse_xml_string,
    plugin_config,
    string_as_bool,
)

log = logging.getLogger(__name__)

AUTH_CONF_XML = """<?xml version="1.0"?>
<auth>
    <authenticator>
        <type>localdb</type>
        <options>
            <allow-password-change>true</allow-password-change>
        </options>
    </authenticator>
</auth>
"""

Authenticator = namedtuple("Authenticator", ["plugin", "filter_template", "options"])


[docs]def get_authenticators(auth_config_file, auth_config_file_set): __plugins_dict = plugin_config.plugins_dict(galaxy.auth.providers, "plugin_type") # parse XML try: ct = parse_xml(auth_config_file) conf_root = ct.getroot() except OSError as exc: if exc.errno == errno.ENOENT and not auth_config_file_set: conf_root = parse_xml_string(AUTH_CONF_XML) else: raise authenticators = [] # process authenticators for auth_elem in conf_root: type_elem_text = auth_elem.find("type").text plugin_class = __plugins_dict.get(type_elem_text) if not plugin_class: raise Exception( f"Authenticator type '{type_elem_text}' not recognized, should be one of {', '.join(__plugins_dict)}" ) plugin = plugin_class() # check filterelem filter_elem = auth_elem.find("filter") if filter_elem is not None: filter_template = str(filter_elem.text) else: filter_template = None # extract options options_elem = auth_elem.find("options") options = {} if options_elem is not None: for opt in options_elem: options[opt.tag] = opt.text authenticator = Authenticator( plugin=plugin, filter_template=filter_template, options=options, ) authenticators.append(authenticator) return authenticators
[docs]def parse_auth_results(trans, auth_results, options): auth_return = {} auth_result, auto_email, auto_username = auth_results[:3] auto_username = str(auto_username).lower() # make username unique max_retries = int(options.get("max-retries", "10")) try_number = 0 while try_number <= max_retries: if try_number == 0: test_name = auto_username else: test_name = f"{auto_username}-{try_number}" validate_result = validate_publicname(trans, test_name) if validate_result == "": auto_username = test_name break else: log.debug(f"Invalid username '{auto_username}': {validate_result}") try_number += 1 else: raise Conflict("Cannot make unique username") log.debug(f"Email: {auto_email}, auto-register with username: {auto_username}") auth_return["auto_reg"] = string_as_bool(options.get("auto-register", False)) auth_return["email"] = auto_email auth_return["username"] = auto_username auth_return["auto_create_roles"] = string_as_bool(options.get("auto-create-roles", False)) auth_return["auto_create_groups"] = string_as_bool(options.get("auto-create-groups", False)) auth_return["auto_assign_roles_to_groups_only"] = string_as_bool( options.get("auto-assign-roles-to-groups-only", False) ) if len(auth_results) == 4: auth_return["attributes"] = auth_results[3] return auth_return