"""
Contains implementations of the authentication logic.
"""
import logging
from galaxy.auth.util import (
get_authenticators,
parse_auth_results,
)
from galaxy.exceptions import Conflict
from galaxy.util import string_as_bool
log = logging.getLogger(__name__)
[docs]class AuthManager:
[docs] def __init__(self, config):
self.redact_username_in_logs = config.redact_username_in_logs
self.authenticators = get_authenticators(config.auth_config_file, config.is_set("auth_config_file"))
[docs] def check_registration_allowed(self, email, username, password, request):
"""Checks if the provided email/username is allowed to register."""
message = ""
status = "done"
for provider, options in self.active_authenticators(email, username, password):
allow_reg = _get_allow_register(options)
if allow_reg == "challenge":
auth_results = provider.authenticate(email, username, password, options, request)
if auth_results[0] is True:
break
if auth_results[0] is None:
message = "Invalid email address/username or password."
status = "error"
break
elif allow_reg is True:
break
elif allow_reg is False:
message = "Account registration not required for your account. Please simply login."
status = "error"
break
return message, status
[docs] def check_auto_registration(self, trans, login, password, no_password_check=False):
"""
Checks the username/email & password using auth providers in order.
If a match is found, returns the 'auto-register' option for that provider.
"""
if "@" in login:
email = login
username = None
else:
email = None
username = login
auth_return = {"auto_reg": False, "email": "", "username": ""}
for provider, options in self.active_authenticators(email, username, password):
if provider is None:
log.debug(f"Unable to find module: {options}")
else:
options["no_password_check"] = no_password_check
auth_results = provider.authenticate(email, username, password, options, trans.request)
if auth_results[0] is True:
try:
auth_return = parse_auth_results(trans, auth_results, options)
except Conflict as conflict:
log.exception(conflict)
raise
return auth_return
elif auth_results[0] is None:
log.debug("Login: '%s', stopping due to failed non-continue", login)
break # end authentication (skip rest)
return auth_return
[docs] def check_password(self, user, password, request):
"""Checks the username/email and password using auth providers."""
for provider, options in self.active_authenticators(user.email, user.username, password):
if provider is None:
log.debug(f"Unable to find module: {options}")
else:
auth_result = provider.authenticate_user(user, password, options, request)
if auth_result is True:
return True # accept user
elif auth_result is None:
break # end authentication (skip rest)
return False
[docs] def check_change_password(self, user, current_password, request):
"""Checks that auth provider allows password changes and current_password
matches.
"""
for provider, options in self.active_authenticators(user.email, user.username, current_password):
if provider is None:
log.debug(f"Unable to find module: {options}")
else:
auth_result = provider.authenticate_user(user, current_password, options, request)
if auth_result is True:
if string_as_bool(options.get("allow-password-change", False)):
return
else:
return "Password change not supported."
elif auth_result is None:
break # end authentication (skip rest)
return "Invalid current password."
[docs] def active_authenticators(self, email, username, password):
"""Yields AuthProvider instances for the provided configfile that match the
filters.
"""
try:
for authenticator in self.authenticators:
filter_template = authenticator.filter_template
if filter_template:
filter_str = filter_template.format(email=email, username=username, password=password)
passed_filter = eval(filter_str, {"__builtins__": None}, {"str": str})
if not passed_filter:
continue # skip to next
options = authenticator.options
options["redact_username_in_logs"] = self.redact_username_in_logs
yield authenticator.plugin, options
except Exception:
log.exception("Active Authenticators Failure")
raise
def _get_allow_register(d):
s = d.get("allow-register", True)
if (lower_s := str(s).lower()) == "challenge":
return lower_s
else:
return string_as_bool(s)