Warning

This document is for an in-development version of Galaxy. You can alternatively view this page in the latest release if it exists or view the top of the latest release's documentation.

Source code for galaxy.authnz.managers

from __future__ import annotations

import builtins
import logging
from typing import (
    Optional,
    TYPE_CHECKING,
    TypedDict,
)

import jwt as pyjwt
from social_core.exceptions import (
    AuthAlreadyAssociated,
    AuthCanceled,
    AuthForbidden,
    AuthTokenError,
)
from social_core.utils import module_member

from galaxy import (
    exceptions,
    model,
)
from galaxy.model import UserAuthnzToken
from galaxy.util import (
    asbool,
    etree,
    listify,
    parse_xml,
    string_as_bool,
    unicodify,
)
from galaxy.util.resources import (
    as_file,
    resource_path,
)
from .psa_authnz import (
    BACKENDS,
    BACKENDS_NAME,
    PSAAuthnz,
)

if TYPE_CHECKING:
    from galaxy.managers.context import ProvidesAppContext
    from galaxy.webapps.base.webapp import GalaxyWebTransaction

# Packaged XSD resource; `_parse_oidc_backends_config` hands it to `parse_xml` as
# `schemafname` when reading the OIDC backends configuration file.
OIDC_BACKEND_SCHEMA = resource_path(__name__, "xsd/oidc_backends_config.xsd")

# Module-level logger shared by everything in this module.
log = logging.getLogger(__name__)

# Note: This is for backward compatibility. Icons can be specified in oidc_backends_config.xml.
# `_get_idp_icon` falls back to these URLs when a provider's parsed configuration has no icon.
DEFAULT_OIDC_IDP_ICONS = {
    "google": "https://developers.google.com/identity/images/btn_google_signin_light_normal_web.png",
    "elixir": "https://lifescience-ri.eu/fileadmin/lifescience-ri/media/Images/button-login-small.png",
    "okta": "https://www.okta.com/sites/all/themes/Okta/images/blog/Logos/Okta_Logo_BrightBlue_Medium.png",
}


class RefreshResult(TypedDict):
    """Outcome of one attempt to refresh a user's OIDC token for a single provider."""

    # True when the provider backend reported that the token was refreshed.
    refreshed: bool
    # True when the refresh failed because the authentication session is expired
    # or invalid, i.e. the user has to log in with the provider again.
    reauthentication_required: bool
class AuthnzManager:
    """
    Entry point for OIDC authentication in Galaxy.

    Parses the general OIDC configuration and the per-provider backends
    configuration at construction time, then dispatches authentication
    operations (authenticate, callback, logout, disconnect, token refresh,
    access-token matching) to the backend object of the requested provider.
    """

    def __init__(self, app, oidc_config_file, oidc_backends_config_file):
        """
        :type app: galaxy.app.UniverseApplication
        :param app: the Galaxy application; its ``config.oidc`` mapping is
            populated with per-provider display information while parsing.

        :type oidc_config_file: string
        :param oidc_config_file: path of the general OIDC configuration file
            (an XML file whose root element is ``OIDC`` containing ``Setter``
            nodes).

        :type oidc_backends_config_file: string
        :param oidc_backends_config_file: path of the OIDC backends
            configuration file (e.g., oidc_backends_config.xml).

        :raises galaxy.exceptions.ConfigurationError: if either file is
            invalid or no valid provider is configured.
        """
        self.app = app
        self.allowed_idps = None
        self._parse_oidc_config(oidc_config_file)
        self._parse_oidc_backends_config(oidc_backends_config_file)

    def _parse_oidc_config(self, config_file):
        """
        Load the general OIDC settings from ``config_file`` into ``self.oidc_config``.

        Every ``Setter`` node contributes ``Property -> Type(Value)``. Nodes with
        an unexpected tag, missing attributes or an unusable ``Type`` are logged
        and skipped.

        :raises galaxy.exceptions.ConfigurationError: if the file cannot be
            parsed or its root element is not ``OIDC``.
        """
        self.oidc_config = {}
        try:
            tree = parse_xml(config_file)
            root = tree.getroot()
            if root.tag != "OIDC":
                raise exceptions.ConfigurationError(
                    "The root element in OIDC_Config xml file is expected to be `OIDC`, "
                    f"found `{root.tag}` instead -- unable to continue."
                )
            for child in root:
                if child.tag != "Setter":
                    log.error(
                        "Expect a node with `Setter` tag, found a node with `%s` tag instead; skipping this node.",
                        child.tag,
                    )
                    continue
                if "Property" not in child.attrib or "Value" not in child.attrib or "Type" not in child.attrib:
                    log.error(
                        "Could not find the node attributes `Property` and/or `Value` and/or `Type`;"
                        f" found these attributes: `{child.attrib}`; skipping this node."
                    )
                    continue
                type_name = child.get("Type")
                if type_name == "bool":
                    # bool("false") would be True, so booleans need a string-aware parser.
                    func = string_as_bool
                else:
                    func = getattr(builtins, type_name, None)
                    # Only genuine built-in *types* (int, str, float, ...) are accepted;
                    # other builtins such as `eval` or `open` must never be applied to
                    # configuration values.
                    if not isinstance(func, type):
                        log.error(
                            "The value of attribute `Type`, `%s`, is not a valid built-in type; skipping this node",
                            type_name,
                        )
                        continue
                self.oidc_config[child.get("Property")] = func(child.get("Value"))
        except (etree.ParseError, exceptions.ConfigurationError) as e:
            raise exceptions.ConfigurationError(
                f"Invalid configuration at `{config_file}`: {e} -- unable to continue."
            ) from e

    def _get_idp_icon(self, idp):
        """Return the icon configured for ``idp``, falling back to the built-in default icon (may be None)."""
        return self.oidc_backends_config[idp].get("icon") or DEFAULT_OIDC_IDP_ICONS.get(idp)

    def _get_idp_button_text(self, idp):
        """Return the custom login button text configured for ``idp``, or None if not configured."""
        return self.oidc_backends_config[idp].get("custom_button_text")

    def _parse_oidc_backends_config(self, config_file):
        """
        Load the provider definitions from ``config_file``.

        Fills ``self.oidc_backends_config`` and ``self.oidc_backends_implementation``
        (keyed by lower-cased provider name) and publishes display information
        for each provider in ``self.app.config.oidc``.

        :raises galaxy.exceptions.ConfigurationError: if the file is invalid,
            names an unknown provider, yields no provider at all, or a
            configured backend cannot be imported.
        """
        self.oidc_backends_config = {}
        self.oidc_backends_implementation = {}
        try:
            with as_file(OIDC_BACKEND_SCHEMA) as oidc_backend_schema_path:
                tree = parse_xml(config_file, schemafname=oidc_backend_schema_path)
            root = tree.getroot()
            if root.tag != "OIDC":
                raise exceptions.ConfigurationError(
                    "The root element in OIDC config xml file is expected to be `OIDC`, "
                    f"found `{root.tag}` instead -- unable to continue."
                )
            for child in root:
                if child.tag != "provider":
                    log.error(
                        "Expect a node with `provider` tag, found a node with `%s` tag instead; skipping the node.",
                        child.tag,
                    )
                    continue
                if "name" not in child.attrib:
                    log.error(f"Could not find a node attribute 'name'; skipping the node '{child.tag}'.")
                    continue
                idp = child.get("name").lower()
                if idp not in BACKENDS_NAME:
                    raise exceptions.ConfigurationError("Unknown provider specified")
                idp_config = self._parse_idp_config(child)
                self.oidc_backends_config[idp] = idp_config
                self.oidc_backends_implementation[idp] = "psa"
                self.app.config.oidc[idp] = {
                    "icon": self._get_idp_icon(idp),
                    "custom_button_text": self._get_idp_button_text(idp),
                }
                # Optional values are only published when they were configured.
                for optional_key in ("end_user_registration_endpoint", "profile_url"):
                    if optional_key in idp_config:
                        self.app.config.oidc[idp][optional_key] = idp_config[optional_key]
            if not self.oidc_backends_config:
                raise exceptions.ConfigurationError("No valid provider configuration parsed.")
            # Force-import each configured backend so missing conditional
            # dependencies (e.g. pkce) fail Galaxy startup with an actionable
            # error instead of surfacing as a silent 401 on first OIDC login
            # (issue #22502).
            for idp in self.oidc_backends_config:
                try:
                    module_member(BACKENDS[idp])
                except ImportError as e:
                    raise exceptions.ConfigurationError(
                        f"Failed to import OIDC backend for provider '{idp}' "
                        f"({BACKENDS[idp]}): {e}. This typically indicates a "
                        "missing conditional dependency; re-run Galaxy's "
                        "common_startup or install requirements from "
                        "lib/galaxy/dependencies/conditional-requirements.txt."
                    ) from e
        except (etree.ParseError, exceptions.ConfigurationError) as e:
            raise exceptions.ConfigurationError(
                f"Invalid configuration at `{config_file}`: {e} -- unable to continue."
            ) from e

    def _parse_idp_config(self, config_xml):
        """
        Convert one ``provider`` XML element into a configuration dict.

        ``client_id``, ``client_secret`` and ``redirect_uri`` are read
        unconditionally; ``enable_idp_logout`` defaults to false. Every other
        element is copied only when present, so absent options are absent keys.
        """
        rtv = {
            "client_id": config_xml.find("client_id").text,
            "client_secret": config_xml.find("client_secret").text,
            "redirect_uri": config_xml.find("redirect_uri").text,
            "enable_idp_logout": asbool(config_xml.findtext("enable_idp_logout", "false")),
        }
        # (element name, converter) pairs; None means "keep the raw text".
        optional_elements = (
            ("label", None),
            ("require_create_confirmation", asbool),
            ("require_session_refresh", asbool),
            ("prompt", None),
            ("api_url", None),
            ("url", None),
            ("icon", None),
            ("extra_scopes", listify),
            ("tenant_id", None),
            ("oidc_endpoint", None),
            ("custom_button_text", None),
            ("pkce_support", asbool),
            ("accepted_audiences", None),
            ("username_key", None),
            ("end_user_registration_endpoint", None),
            ("profile_url", None),
            ("domain", None),
            # this is a EGI Check-in specific config
            ("checkin_env", None),
            # Keycloak/CILogon IDP hint: tells Keycloak which federated IdP to redirect
            # to directly (kc_idp_hint), bypassing the Keycloak login page.
            # Corresponds to <idphint> in oidc_backends_config.xml (already in XSD).
            ("idphint", None),
        )
        for tag, convert in optional_elements:
            node = config_xml.find(tag)
            if node is None:
                continue
            rtv[tag] = node.text if convert is None else convert(node.text)
        return rtv

    def _parse_custos_config(self, config_xml):
        """
        Convert a Custos ``provider`` XML element into a configuration dict.

        As a side effect, ``self.allowed_idps`` is set to the text of the
        ``allowed_idp`` elements when at least one is present; otherwise it is
        left untouched.
        """
        rtv = {
            "url": config_xml.find("url").text,
            "client_id": config_xml.find("client_id").text,
            "client_secret": config_xml.find("client_secret").text,
            "redirect_uri": config_xml.find("redirect_uri").text,
            "enable_idp_logout": asbool(config_xml.findtext("enable_idp_logout", "false")),
        }
        for tag in ("credential_url", "well_known_oidc_config_uri"):
            node = config_xml.find(tag)
            if node is not None:
                rtv[tag] = node.text
        # findall() always returns a list (never None), so test for emptiness;
        # with no <allowed_idp> elements the attribute keeps its current value.
        allowed_idp_nodes = config_xml.findall("allowed_idp")
        if allowed_idp_nodes:
            self.allowed_idps = [idp.text for idp in allowed_idp_nodes]
        ca_bundle = config_xml.find("ca_bundle")
        if ca_bundle is not None:
            rtv["ca_bundle"] = ca_bundle.text
        return rtv

    def get_allowed_idps(self):
        """Return the configured list of allowed IdP EntityIDs, or None if no list is set."""
        # None, if no allowed idp list is set, and a list of EntityIDs if configured (in oidc_backend)
        return self.allowed_idps

    def _unify_provider_name(self, provider: str) -> str | None:
        """
        Map ``provider`` to the lower-case key used in ``self.oidc_backends_config``.

        Accepts either a configured provider name (case-insensitively) or a
        value of ``BACKENDS_NAME``; returns None when neither matches.
        """
        lowered = provider.lower()
        if lowered in self.oidc_backends_config:
            return lowered
        for k, v in BACKENDS_NAME.items():
            if v == provider:
                return k.lower()
        return None

    def _get_authnz_backend(self, provider: str, idphint: str | None = None) -> tuple[bool, str, PSAAuthnz | None]:
        """
        Instantiate the backend for ``provider``.

        :param provider: provider name as accepted by ``_unify_provider_name``.
        :param idphint: accepted for interface compatibility; not used here.
        :return: ``(True, "", backend)`` on success, otherwise
            ``(False, error message, None)``.
        """
        unified_provider_name = self._unify_provider_name(provider)
        if unified_provider_name is None or unified_provider_name not in self.oidc_backends_config:
            msg = f"The requested identity provider, `{provider}`, is not a recognized/expected provider."
            log.debug(msg)
            return False, msg, None

        identity_provider_class = self._get_identity_provider_factory(
            self.oidc_backends_implementation[unified_provider_name]
        )
        if identity_provider_class is None:
            # Without this guard the failure would surface as an AttributeError
            # raised from inside the except handler below.
            msg = f"No implementation is available for the identity provider `{unified_provider_name}`."
            log.error(msg)
            return False, msg, None
        try:
            # All providers now use PSA implementation
            backend = identity_provider_class(
                unified_provider_name,
                self.oidc_config,
                self.oidc_backends_config[unified_provider_name],
                self.app.config,
            )
        except Exception as e:
            log.exception(f"An error occurred when loading {identity_provider_class.__name__}")
            return False, unicodify(e), None
        return True, "", backend

    @staticmethod
    def _get_identity_provider_factory(implementation):
        """Return the backend class for ``implementation`` (only ``"psa"`` is known), else None."""
        if implementation == "psa":
            return PSAAuthnz
        return None

    @staticmethod
    def can_user_assume_authn(trans, authn_id):
        """
        Check that the authentication record ``authn_id`` belongs to the current user.

        :raises galaxy.exceptions.ObjectNotFound: if no such record exists.
        :raises galaxy.exceptions.ItemAccessibilityException: if the record
            belongs to a different user.
        """
        qres = trans.sa_session.query(model.UserAuthnzToken).get(authn_id)
        if qres is None:
            msg = f"Authentication record with the given `authn_id` (`{trans.security.encode_id(authn_id)}`) not found."
            log.debug(msg)
            raise exceptions.ObjectNotFound(msg)
        if qres.user_id != trans.user.id:
            msg = (
                f"The request authentication with ID `{trans.security.encode_id(authn_id)}` is not accessible to user with ID "
                f"`{trans.security.encode_id(trans.user.id)}`."
            )
            log.warning(msg)
            raise exceptions.ItemAccessibilityException(msg)

    def refresh_expiring_oidc_tokens_for_provider(
        self, trans: GalaxyWebTransaction, auth: UserAuthnzToken
    ) -> RefreshResult:
        """
        Refresh expiring OIDC tokens for a specific provider.

        Never raises: every failure is logged and reported through the result.

        Returns:
            RefreshResult: ``refreshed`` is the value returned by the backend's
            ``refresh``; ``reauthentication_required`` is True only when the
            backend signalled an expired/invalid authentication session.
        """
        try:
            if auth.provider is None:
                raise exceptions.AuthenticationFailed("Provider is not set")
            success, message, backend = self._get_authnz_backend(auth.provider)
            if backend is None:
                msg = f"Provider `{auth.provider}` not found"
                log.error(msg)
                return {"refreshed": False, "reauthentication_required": False}
            if success is False:
                msg = f"An error occurred when refreshing user token on `{auth.provider}` identity provider: {message}"
                log.error(msg)
                return {"refreshed": False, "reauthentication_required": False}
            refreshed = backend.refresh(trans, auth)
            if refreshed:
                log.debug(f"Refreshed user token via `{auth.provider}` identity provider")
            return {"refreshed": refreshed, "reauthentication_required": False}
        except (AuthTokenError, AuthCanceled, AuthForbidden):
            log.warning("Authentication session has expired or is invalid, reauth required.")
            return {"refreshed": False, "reauthentication_required": True}
        except Exception as e:
            log.warning(f"An error occurred when refreshing user token: {e}")
            return {"refreshed": False, "reauthentication_required": False}

    def refresh_expiring_oidc_tokens(
        self, trans: GalaxyWebTransaction, user: Optional[model.User] = None
    ) -> str | None:
        """
        Refresh expiring OIDC tokens for all providers associated with a user.

        ``trans.user`` takes precedence over the ``user`` argument.

        Returns:
            str | None: The provider name if refresh fails and require_session_refresh is enabled, otherwise None
        """
        user = trans.user or user
        if not isinstance(user, model.User):
            return None
        for auth in user.social_auth or []:
            result = self.refresh_expiring_oidc_tokens_for_provider(trans, auth)
            if auth.provider is None:
                continue
            provider = self._unify_provider_name(auth.provider)
            if provider is None:
                continue
            config = self.oidc_backends_config.get(provider, None)
            if config is None:
                continue
            # Redirect to OIDC login if refresh fails and require_session_refresh is enabled
            if config.get("require_session_refresh") and result["reauthentication_required"]:
                return provider
        return None

    def authenticate(
        self, provider: str, trans: GalaxyWebTransaction, idphint: str | None = None
    ) -> tuple[bool, str, str | None]:
        """
        :type provider: string
        :param provider: set the name of the identity provider to be
                         used for authentication flow.
        :type trans: GalaxyWebTransaction
        :param trans: Galaxy web transaction.
        :type idphint: string
        :param idphint: (Optional) EntityID of the IdP to use; rejected when an
                        allowed-IdP list is set and does not contain it.
        :return: a tuple (success boolean, message, identity provider specific
                 authentication redirect URI or None on failure).
        """
        try:
            success, message, backend = self._get_authnz_backend(provider, idphint=idphint)
            if backend is None:
                return False, f"Provider `{provider}` not found", None
            if success is False:
                return False, message, None
            # Check allowed IDPs for providers that support idphint (keycloak, cilogon)
            if idphint and self.allowed_idps and (idphint not in self.allowed_idps):
                msg = f"An error occurred when authenticating a user. Invalid EntityID: `{idphint}`"
                # No exception is being handled here, so log.exception() would
                # attach a meaningless "NoneType: None" traceback.
                log.warning(msg)
                return False, msg, None
            redirect = backend.authenticate(trans, idphint)
            return (
                True,
                f"Redirecting to the `{provider}` identity provider for authentication",
                redirect.url,
            )
        except Exception:
            msg = f"An error occurred when authenticating a user on `{provider}` identity provider"
            log.exception(msg)
            return False, msg, None

    def callback(self, provider, state_token, authz_code, trans, login_redirect_url, idphint=None):
        """
        Handle the identity provider's callback after the user authenticated.

        :return: a tuple (success boolean, message, backend callback result);
            on failure the third element is ``(None, None)``.
        :raises galaxy.exceptions.AuthenticationFailed: propagated unchanged
            from the backend.
        """
        try:
            success, message, backend = self._get_authnz_backend(provider, idphint=idphint)
            if success is False:
                return False, message, (None, None)
            return success, message, backend.callback(state_token, authz_code, trans, login_redirect_url)
        except exceptions.AuthenticationFailed:
            raise
        except AuthCanceled:
            msg = f"Authentication with `{provider}` was canceled or the authorization code has expired. Please try logging in again."
            log.warning(msg)
            return False, msg, (None, None)
        except AuthAlreadyAssociated:
            msg = (
                f"The account from `{provider}` is already linked to a different Galaxy user. "
                "Please log in to the Galaxy account that is already linked to this identity, "
                "or use a different identity provider account."
            )
            log.warning(msg)
            return False, msg, (None, None)
        except AuthTokenError:
            msg = (
                f"Authentication session with `{provider}` has expired or is invalid. "
                "This can happen when using multiple browser tabs during login. "
                "Please close other login tabs and try again."
            )
            log.warning(msg)
            return False, msg, (None, None)
        except Exception:
            msg = f"An error occurred when handling callback from `{provider}` identity provider. Please contact an administrator for assistance."
            log.exception(msg)
            return False, msg, (None, None)

    def create_user(self, provider: str, token: str, trans: ProvidesAppContext, login_redirect_url: str):
        """
        Create a Galaxy user through the backend of ``provider``.

        :return: a tuple (success boolean, message, backend result); on
            failure the third element is ``(None, None)``.
        :raises galaxy.exceptions.AuthenticationFailed: propagated (after
            logging) from the backend.
        """
        try:
            success, message, backend = self._get_authnz_backend(provider)
            if backend is None:
                # Caught by the generic handler below and reported as a failure tuple.
                raise ValueError(f"Provider `{provider}` not found")
            if success is False:
                return False, message, (None, None)
            return success, message, backend.create_user(token, trans, login_redirect_url)
        except exceptions.AuthenticationFailed:
            log.exception("Error creating user")
            raise
        except Exception:
            msg = f"An error occurred when creating a user with `{provider}` identity provider. Please contact an administrator for assistance."
            log.exception(msg)
            return False, msg, (None, None)

    def _assert_jwt_contains_scopes(self, user, jwt, required_scopes):
        """
        Ensure the space-separated ``scope`` claim of ``jwt`` contains all ``required_scopes``.

        :raises galaxy.exceptions.AuthenticationFailed: if ``jwt`` is empty or
            a required scope is missing.
        """
        if not jwt:
            raise exceptions.AuthenticationFailed(
                err_msg=f"User: {user.username} does not have the required scopes: [{required_scopes}]"
            )
        scopes = jwt.get("scope") or ""
        if not set(required_scopes).issubset(scopes.split(" ")):
            raise exceptions.AuthenticationFailed(
                err_msg=f"User: {user.username} has JWT with scopes: [{scopes}] but not required scopes: [{required_scopes}]"
            )

    def _validate_permissions(self, user, jwt):
        """Require the ``<oidc_scope_prefix>:*`` scope in ``jwt``; raises AuthenticationFailed otherwise."""
        required_scopes = [f"{self.app.config.oidc_scope_prefix}:*"]
        self._assert_jwt_contains_scopes(user, jwt, required_scopes)

    def _match_access_token_to_user_in_provider(self, sa_session, provider, access_token):
        """
        Try to resolve ``access_token`` to a Galaxy user via a single provider.

        :return: the matched user, or None when the backend could not be
            loaded, cannot process this token, or does not implement token
            decoding.
        :raises galaxy.exceptions.AuthenticationFailed: if the token is
            invalid, decodes but matches no user, or lacks the required scope.
        """
        try:
            success, message, backend = self._get_authnz_backend(provider)
            if success is False:
                msg = f"An error occurred when obtaining user by token with provider `{provider}`: {message}"
                log.error(msg)
                return None
            user, jwt = None, None
            try:
                user, jwt = backend.decode_user_access_token(sa_session, access_token)
            except pyjwt.exceptions.InvalidTokenError as e:
                log.warning("Could not decode access token: %s", e)
                raise exceptions.AuthenticationFailed(err_msg="Invalid access token.")
            except Exception:
                log.exception("Could not decode access token")
                raise exceptions.AuthenticationFailed(err_msg="Invalid access token or an unexpected error occurred.")
            if user and jwt:
                self._validate_permissions(user, jwt)
                return user
            elif not user and jwt:
                # jwt was decoded, but no user could be matched
                raise exceptions.AuthenticationFailed(
                    err_msg="Cannot locate user by access token. The user should log into Galaxy at least once with this OIDC provider."
                )
            # Both jwt and user are empty, which means that this provider can't process this access token
            return None
        except NotImplementedError:
            return None

    def match_access_token_to_user(self, sa_session, access_token):
        """Return the first user any configured provider resolves ``access_token`` to, or None."""
        for provider in self.oidc_backends_config:
            user = self._match_access_token_to_user_in_provider(sa_session, provider, access_token)
            if user:
                return user
        return None

    def logout(self, provider, trans, post_user_logout_href=None):
        """
        Log the user out of the identity provider.

        :type provider: string
        :param provider: set the name of the identity provider.
        :type trans: GalaxyWebTransaction
        :param trans: Galaxy web transaction.
        :type post_user_logout_href: string
        :param post_user_logout_href: (Optional) URL for identity provider
            to redirect to after logging user out.
        :return: a tuple (success boolean, message, redirect URI)
        """
        try:
            # check if logout is enabled for this idp and return false if not
            unified_provider_name = self._unify_provider_name(provider)
            if self.oidc_backends_config[unified_provider_name]["enable_idp_logout"] is False:
                return False, f"IDP logout is not enabled for {provider}", None

            success, message, backend = self._get_authnz_backend(provider)
            if success is False:
                return False, message, None
            return True, message, backend.logout(trans, post_user_logout_href)
        except Exception:
            msg = f"An error occurred when logging out from `{provider}` identity provider. Please contact an administrator for assistance."
            log.exception(msg)
            return False, msg, None

    def disconnect(self, provider, trans, email=None, disconnect_redirect_url=None, idphint=None):
        """
        Disconnect the current user's association with ``provider``.

        :return: whatever the backend's ``disconnect`` returns on success;
            ``(False, message, None)`` on failure.
        """
        try:
            success, message, backend = self._get_authnz_backend(provider, idphint=idphint)
            if success is False:
                return False, message, None
            return backend.disconnect(provider, trans, disconnect_redirect_url, email=email)
        except Exception:
            # Resolve the username defensively: dereferencing trans.user.username
            # directly would raise again from inside this handler when no user
            # is attached to the transaction.
            username = getattr(getattr(trans, "user", None), "username", None)
            msg = f"An error occurred when disconnecting authentication with `{provider}` identity provider for user `{username}`"
            log.exception(msg)
            return False, msg, None