import errno
import logging
from collections import namedtuple
import galaxy.auth.providers
from galaxy.exceptions import Conflict
from galaxy.security.validate_user_input import validate_publicname
from galaxy.util import (
parse_xml,
parse_xml_string,
plugin_config,
string_as_bool,
)
log = logging.getLogger(__name__)
AUTH_CONF_XML = """<?xml version="1.0"?>
<auth>
<authenticator>
<type>localdb</type>
<options>
<allow-password-change>true</allow-password-change>
</options>
</authenticator>
</auth>
"""
Authenticator = namedtuple("Authenticator", ["plugin", "filter_template", "options"])
[docs]def get_authenticators(auth_config_file, auth_config_file_set):
__plugins_dict = plugin_config.plugins_dict(galaxy.auth.providers, "plugin_type")
# parse XML
try:
ct = parse_xml(auth_config_file)
conf_root = ct.getroot()
except OSError as exc:
if exc.errno == errno.ENOENT and not auth_config_file_set:
conf_root = parse_xml_string(AUTH_CONF_XML)
else:
raise
authenticators = []
# process authenticators
for auth_elem in conf_root:
type_elem_text = auth_elem.find("type").text
plugin_class = __plugins_dict.get(type_elem_text)
if not plugin_class:
raise Exception(
f"Authenticator type '{type_elem_text}' not recognized, should be one of {', '.join(__plugins_dict)}"
)
plugin = plugin_class()
# check filterelem
filter_elem = auth_elem.find("filter")
if filter_elem is not None:
filter_template = str(filter_elem.text)
else:
filter_template = None
# extract options
options_elem = auth_elem.find("options")
options = {}
if options_elem is not None:
for opt in options_elem:
options[opt.tag] = opt.text
authenticator = Authenticator(
plugin=plugin,
filter_template=filter_template,
options=options,
)
authenticators.append(authenticator)
return authenticators
[docs]def parse_auth_results(trans, auth_results, options):
auth_return = {}
auth_result, auto_email, auto_username = auth_results[:3]
auto_username = str(auto_username).lower()
# make username unique
max_retries = int(options.get("max-retries", "10"))
try_number = 0
while try_number <= max_retries:
if try_number == 0:
test_name = auto_username
else:
test_name = f"{auto_username}-{try_number}"
validate_result = validate_publicname(trans, test_name)
if validate_result == "":
auto_username = test_name
break
else:
log.debug(f"Invalid username '{auto_username}': {validate_result}")
try_number += 1
else:
raise Conflict("Cannot make unique username")
log.debug(f"Email: {auto_email}, auto-register with username: {auto_username}")
auth_return["auto_reg"] = string_as_bool(options.get("auto-register", False))
auth_return["email"] = auto_email
auth_return["username"] = auto_username
auth_return["auto_create_roles"] = string_as_bool(options.get("auto-create-roles", False))
auth_return["auto_create_groups"] = string_as_bool(options.get("auto-create-groups", False))
auth_return["auto_assign_roles_to_groups_only"] = string_as_bool(
options.get("auto-assign-roles-to-groups-only", False)
)
if len(auth_results) == 4:
auth_return["attributes"] = auth_results[3]
return auth_return