Warning

This document is for an in-development version of Galaxy. You can alternatively view this page in the latest release if it exists or view the top of the latest release's documentation.

Source code for galaxy.auth.util

import errno
import logging
from collections import namedtuple

import galaxy.auth.providers
from galaxy.exceptions import Conflict
from galaxy.security.validate_user_input import validate_publicname
from galaxy.util import (
    parse_xml,
    parse_xml_string,
    plugin_config,
    string_as_bool,
)

# Module-level logger named after this module.
log = logging.getLogger(__name__)

# Built-in fallback auth configuration: a single "localdb" authenticator with
# password changes allowed. get_authenticators() parses this string when the
# auth config file does not exist and auth_config_file_set is false.
AUTH_CONF_XML = """<?xml version="1.0"?>
<auth>
    <authenticator>
        <type>localdb</type>
        <options>
            <allow-password-change>true</allow-password-change>
        </options>
    </authenticator>
</auth>
"""

# One configured authenticator as built by get_authenticators():
#   plugin          - an instance of the plugin class selected by <type>
#   filter_template - text of the <filter> element as a str, or None if absent
#   options         - dict mapping each <options> child tag to its text
Authenticator = namedtuple("Authenticator", ["plugin", "filter_template", "options"])


def get_authenticators(auth_config_file, auth_config_file_set):
    """Build the list of authenticators described by the auth config file.

    :param auth_config_file: path of the XML auth configuration, passed to
        ``parse_xml``.
    :param auth_config_file_set: when false and ``auth_config_file`` does not
        exist (``ENOENT``), the built-in ``AUTH_CONF_XML`` is used instead;
        when true a missing file is an error.
    :returns: a list of ``Authenticator`` tuples, one per child element of the
        configuration root, in document order.
    :raises OSError: if reading the file fails for any reason other than the
        tolerated "missing and not explicitly set" case.
    :raises Exception: if an authenticator element has no ``<type>`` child, or
        its type is not a key of the mapping returned by
        ``plugin_config.plugins_dict``.
    """
    plugins_dict = plugin_config.plugins_dict(galaxy.auth.providers, "plugin_type")

    # Parse the XML, falling back to the built-in default only when the file
    # is missing and auth_config_file_set is false.
    try:
        conf_root = parse_xml(auth_config_file).getroot()
    except OSError as exc:
        if exc.errno != errno.ENOENT or auth_config_file_set:
            raise
        conf_root = parse_xml_string(AUTH_CONF_XML)

    authenticators = []
    for auth_elem in conf_root:
        # find() returns None when the child is absent; report that as a
        # configuration error rather than an AttributeError on None.
        type_elem = auth_elem.find("type")
        if type_elem is None:
            raise Exception(f"Authenticator element '{auth_elem.tag}' is missing the required <type> element")
        type_elem_text = type_elem.text

        plugin_class = plugins_dict.get(type_elem_text)
        if not plugin_class:
            raise Exception(
                f"Authenticator type '{type_elem_text}' not recognized, should be one of {', '.join(plugins_dict)}"
            )
        plugin = plugin_class()

        # The filter is optional; None means no filter was given.
        filter_elem = auth_elem.find("filter")
        filter_template = None if filter_elem is None else str(filter_elem.text)

        # Options are optional; each child becomes a tag -> text entry.
        options_elem = auth_elem.find("options")
        options = {} if options_elem is None else {opt.tag: opt.text for opt in options_elem}

        authenticators.append(
            Authenticator(
                plugin=plugin,
                filter_template=filter_template,
                options=options,
            )
        )
    return authenticators


def parse_auth_results(trans, auth_results, options):
    """Turn an authenticator's raw result into an auto-registration dict.

    :param trans: passed through unchanged to ``validate_publicname``.
    :param auth_results: sequence of at least three items; the second and
        third are used as the email and the proposed username. When the
        sequence has exactly four items, the fourth is returned under
        ``"attributes"``.
    :param options: mapping of authenticator options. Keys read here:
        ``max-retries`` (default ``"10"``), ``auto-register``,
        ``auto-create-roles``, ``auto-create-groups`` and
        ``auto-assign-roles-to-groups-only`` (each defaulting to ``False``).
    :returns: dict with keys ``auto_reg``, ``email``, ``username``,
        ``auto_create_roles``, ``auto_create_groups``,
        ``auto_assign_roles_to_groups_only`` and, optionally, ``attributes``.
    :raises Conflict: if no candidate username validates within
        ``max-retries`` additional attempts.
    """
    # The first item is not used here; the slice plus unpacking still
    # requires at least three items.
    _, auto_email, auto_username = auth_results[:3]
    auto_username = str(auto_username).lower()

    # Make the username unique: try the bare name first, then "<name>-1",
    # "<name>-2", ... up to "<name>-<max_retries>".
    max_retries = int(options.get("max-retries", "10"))
    for try_number in range(max_retries + 1):
        test_name = auto_username if try_number == 0 else f"{auto_username}-{try_number}"
        validate_result = validate_publicname(trans, test_name)
        # An empty string result is treated as "name accepted".
        if validate_result == "":
            auto_username = test_name
            break
        # Log the candidate that was actually rejected, not the base name.
        log.debug("Invalid username '%s': %s", test_name, validate_result)
    else:
        # Every candidate was rejected (or max_retries was negative).
        raise Conflict("Cannot make unique username")

    log.debug("Email: %s, auto-register with username: %s", auto_email, auto_username)
    auth_return = {
        "auto_reg": string_as_bool(options.get("auto-register", False)),
        "email": auto_email,
        "username": auto_username,
        "auto_create_roles": string_as_bool(options.get("auto-create-roles", False)),
        "auto_create_groups": string_as_bool(options.get("auto-create-groups", False)),
        "auto_assign_roles_to_groups_only": string_as_bool(
            options.get("auto-assign-roles-to-groups-only", False)
        ),
    }
    if len(auth_results) == 4:
        auth_return["attributes"] = auth_results[3]
    return auth_return