Warning
This document is for an in-development version of Galaxy. You can alternatively view this page in the latest release if it exists or view the top of the latest release's documentation.
Source code for galaxy.auth.util
import errno
import logging
from collections import namedtuple
import galaxy.auth.providers
from galaxy.exceptions import Conflict
from galaxy.security.validate_user_input import validate_publicname
from galaxy.util import (
parse_xml,
parse_xml_string,
plugin_config,
string_as_bool,
)
# Module-level logger, named after this module.
log = logging.getLogger(__name__)
# Built-in authentication configuration. get_authenticators() parses this
# string instead of a file when the configured file does not exist and no
# file was explicitly set. It declares a single authenticator of type
# "localdb" with the "allow-password-change" option set to "true".
AUTH_CONF_XML = """<?xml version="1.0"?>
<auth>
<authenticator>
<type>localdb</type>
<options>
<allow-password-change>true</allow-password-change>
</options>
</authenticator>
</auth>
"""
# One configured authenticator as built by get_authenticators():
#   plugin          - instance of the provider class selected by <type>
#   filter_template - text of the <filter> element, or None when absent
#   options         - dict mapping each <options> child tag to its text
Authenticator = namedtuple("Authenticator", ["plugin", "filter_template", "options"])
def get_authenticators(auth_config_file, auth_config_file_set):
    """Build the list of configured authenticators from the auth config XML.

    :param auth_config_file: path of the XML file handed to ``parse_xml``.
    :param auth_config_file_set: truthy when the file was explicitly
        configured. Only when this is falsy *and* the file does not exist
        (``ENOENT``) is the built-in ``AUTH_CONF_XML`` used instead.
    :returns: a list of ``Authenticator`` tuples, one per child element of
        the configuration root, in document order.
    :raises OSError: when the file cannot be read and the fallback above
        does not apply.
    :raises Exception: when an entry has no ``<type>`` element, or its type
        does not match any plugin found in ``galaxy.auth.providers``.
    """
    plugins_by_type = plugin_config.plugins_dict(galaxy.auth.providers, "plugin_type")

    # Parse the XML, falling back to the built-in default only for a missing
    # file that nobody explicitly asked for.
    try:
        conf_root = parse_xml(auth_config_file).getroot()
    except OSError as exc:
        if exc.errno != errno.ENOENT or auth_config_file_set:
            raise
        conf_root = parse_xml_string(AUTH_CONF_XML)

    authenticators = []
    for auth_elem in conf_root:
        type_elem = auth_elem.find("type")
        if type_elem is None:
            # Without this check the lookup below would fail with an opaque
            # "'NoneType' object has no attribute 'text'" error.
            raise Exception(
                f"Authenticator entry <{auth_elem.tag}> is missing the required <type> element, "
                f"should be one of {', '.join(plugins_by_type)}"
            )
        type_elem_text = type_elem.text
        plugin_class = plugins_by_type.get(type_elem_text)
        if not plugin_class:
            raise Exception(
                f"Authenticator type '{type_elem_text}' not recognized, should be one of {', '.join(plugins_by_type)}"
            )

        # The filter is optional; None signals that no filter was configured.
        filter_elem = auth_elem.find("filter")
        filter_template = str(filter_elem.text) if filter_elem is not None else None

        # Options are optional too; each child tag becomes a key, its text the value.
        options_elem = auth_elem.find("options")
        options = {} if options_elem is None else {opt.tag: opt.text for opt in options_elem}

        authenticators.append(
            Authenticator(
                plugin=plugin_class(),
                filter_template=filter_template,
                options=options,
            )
        )
    return authenticators
def parse_auth_results(trans, auth_results, options):
    """Convert an authenticator's result into auto-registration settings.

    :param trans: passed through unchanged to ``validate_publicname``.
    :param auth_results: sequence whose first three items are
        ``(auth_result, email, username)``; when it has exactly four items
        the fourth is returned under ``"attributes"``.
    :param options: mapping of authenticator options. Reads ``max-retries``
        (default ``"10"``), ``auto-register``, ``auto-create-roles``,
        ``auto-create-groups`` and ``auto-assign-roles-to-groups-only``
        (each defaulting to ``False``).
    :returns: dict with keys ``auto_reg``, ``email``, ``username``,
        ``auto_create_roles``, ``auto_create_groups``,
        ``auto_assign_roles_to_groups_only`` and, optionally, ``attributes``.
    :raises Conflict: when neither the lower-cased username nor any of its
        ``-1`` .. ``-<max-retries>`` variants passes ``validate_publicname``.
    """
    # The first item (the authentication outcome itself) is not used here.
    _, auto_email, auto_username = auth_results[:3]
    base_username = str(auto_username).lower()

    # Make the username unique: try the bare name first, then numbered suffixes.
    max_retries = int(options.get("max-retries", "10"))
    for try_number in range(max_retries + 1):
        test_name = base_username if try_number == 0 else f"{base_username}-{try_number}"
        validate_result = validate_publicname(trans, test_name)
        if validate_result == "":
            # An empty validation message is treated as "name accepted".
            auto_username = test_name
            break
        # Report the candidate that was actually rejected, not the base name.
        log.debug(f"Invalid username '{test_name}': {validate_result}")
    else:
        # Every candidate was rejected (or max-retries was negative).
        raise Conflict("Cannot make unique username")

    log.debug(f"Email: {auto_email}, auto-register with username: {auto_username}")
    auth_return = {
        "auto_reg": string_as_bool(options.get("auto-register", False)),
        "email": auto_email,
        "username": auto_username,
        "auto_create_roles": string_as_bool(options.get("auto-create-roles", False)),
        "auto_create_groups": string_as_bool(options.get("auto-create-groups", False)),
        "auto_assign_roles_to_groups_only": string_as_bool(
            options.get("auto-assign-roles-to-groups-only", False)
        ),
    }
    if len(auth_results) == 4:
        auth_return["attributes"] = auth_results[3]
    return auth_return