Source code for galaxy.webapps.galaxy.controllers.authnz

"""
OAuth 2.0 and OpenID Connect Authentication and Authorization Controller.
"""

import datetime
import json
import logging

import jwt

from galaxy import (
    exceptions,
    web,
)
from galaxy.util import url_get
from galaxy.web import url_for
from galaxy.webapps.base.controller import BaseUIController

log = logging.getLogger(__name__)

PROVIDER_COOKIE_NAME = "galaxy-oidc-provider"
LOGIN_NEXT_COOKIE_NAME = "galaxy-oidc-login-next"


class OIDC(BaseUIController):
    """Controller for the ``/authnz`` OAuth 2.0 / OpenID Connect endpoints.

    Every method delegates the protocol work to ``trans.app.authnz_manager``
    and is responsible only for request validation, cookies, and turning the
    manager's ``(success, message, ...)`` results into HTTP responses.
    """

    @web.json
    @web.expose
    @web.require_login("list third-party identities")
    def index(self, trans, **kwargs):
        """
        GET /authnz/
        returns a list of third-party identities associated with the user.

        :type  trans: galaxy.webapps.base.webapp.GalaxyWebTransaction
        :param trans: Galaxy web transaction.

        :param kwargs: empty dict

        :rtype: list of dicts
        :return: a list of third-party identities associated with the user account.
        """
        rtv = []
        # Process PSA tokens (unified authentication system)
        for authnz in trans.user.social_auth:
            token_info = {
                "id": trans.app.security.encode_id(authnz.id),
                "provider": authnz.provider,
                "email": authnz.uid,
            }

            # Fall back to the raw provider name when no label is configured.
            provider_label = trans.app.authnz_manager.oidc_backends_config.get(authnz.provider, {}).get(
                "label", authnz.provider
            )
            token_info["provider_label"] = provider_label

            # Expiration (and a fresher email) are best-effort extras read from
            # the stored id_token; the token is only inspected, not trusted, so
            # signature/audience/expiry verification is deliberately disabled.
            if authnz.extra_data and "id_token" in authnz.extra_data:
                try:
                    userinfo = jwt.decode(
                        authnz.extra_data["id_token"],
                        options={"verify_signature": False, "verify_aud": False, "verify_exp": False},
                    )
                    if "exp" in userinfo:
                        # Equivalent to the deprecated ``utcfromtimestamp``: convert
                        # in UTC, then drop tzinfo so the string format is unchanged.
                        expires_at = datetime.datetime.fromtimestamp(userinfo["exp"], tz=datetime.timezone.utc)
                        token_info["expiration"] = str(expires_at.replace(tzinfo=None))
                    if "email" in userinfo:
                        token_info["email"] = userinfo["email"]
                except Exception:
                    # Still best-effort, but leave a trace instead of failing silently.
                    log.debug(
                        "Could not read expiration info from the id_token of provider `%s`",
                        authnz.provider,
                        exc_info=True,
                    )

            rtv.append(token_info)
        return rtv

    @web.json
    @web.expose
    def login(self, trans, provider, idphint=None, next=None, redirect=None):
        """Start the login flow against ``provider``.

        :param provider: name of the identity provider to authenticate with.
        :param idphint: optional identity-provider hint passed to the manager.
        :param next: optional post-login target, remembered in a cookie until
            the callback is handled.
        :param redirect: when the string ``"true"`` (case-insensitive), redirect
            the browser instead of returning the URI as JSON.

        :return: ``{"redirect_uri": ...}`` or an HTTP redirect on success; an
            error page when OIDC is disabled.
        :raises exceptions.AuthenticationFailed: when the manager reports failure.
        """
        if not trans.app.config.enable_oidc:
            msg = "Login to Galaxy using third-party identities is not enabled on this Galaxy instance."
            log.debug(msg)
            return trans.show_error_message(msg)

        if next:
            trans.set_cookie(value=next, name=LOGIN_NEXT_COOKIE_NAME, age=1)
        else:
            # If no next parameter is provided, ensure we unset any existing next cookie.
            trans.set_cookie(value="/", name=LOGIN_NEXT_COOKIE_NAME)

        success, message, redirect_uri = trans.app.authnz_manager.authenticate(provider, trans, idphint)
        if not success:
            raise exceptions.AuthenticationFailed(message)
        if redirect and redirect.lower() == "true":
            return trans.response.send_redirect(redirect_uri)
        return {"redirect_uri": redirect_uri}

    @web.expose
    def callback(self, trans, provider, idphint=None, **kwargs):
        """Handle the authentication callback sent by the identity provider.

        :param provider: name of the identity provider issuing the callback.
        :param idphint: optional identity-provider hint passed to the manager.
        :param kwargs: query parameters of the callback; ``code`` is required,
            ``state`` is optional, ``error``/``error_description`` signal failure.

        :return: a redirect on success (or to the login page on an IdP error),
            otherwise an error page.
        :raises exceptions.AuthenticationFailed: propagated from the manager.
        """
        username = trans.user.username if trans.user is not None else "anonymous"

        login_next_cookie = trans.get_cookie(name=LOGIN_NEXT_COOKIE_NAME)
        if login_next_cookie and login_next_cookie != "None":
            # This cookie can sometimes be set to a literal string 'None', which we don't want to use as a redirect.
            login_next = url_for(login_next_cookie)
        else:
            # Fallback to default redirect if no login_next cookie is found.
            login_next = url_for("/")

        if not kwargs:
            log.warning(f"OIDC callback received no data for provider `{provider}` and user `{username}`")
            return trans.show_error_message(
                f"Did not receive any information from the `{provider}` identity provider to complete user `{username}` authentication "
                "flow. Please try again, and if the problem persists, contact the Galaxy instance admin. Also note "
                "that this endpoint is to receive authentication callbacks only, and should not be called/reached by "
                "a user."
            )

        if "error" in kwargs:
            log.warning(
                "Error handling authentication callback from `%s` identity provider for user `%s` login request."
                " Error message: %s",
                provider,
                username,
                kwargs.get("error", "None"),
            )
            error_msg = kwargs.get("error_description") or (
                f"Failed to handle authentication callback from {provider}. "
                "Please try again, and if the problem persists, contact "
                "the Galaxy instance admin."
            )
            redirect_to = trans.url_builder("/login/start", message=error_msg, status="danger")
            return trans.response.send_redirect(redirect_to)

        if "code" not in kwargs:
            # Without this guard a callback carrying parameters but no
            # authorization code surfaced as an unhandled KeyError.
            log.warning(
                "OIDC callback from `%s` identity provider for user `%s` did not include an authorization code",
                provider,
                username,
            )
            return trans.show_error_message(
                f"Did not receive an authorization code from the `{provider}` identity provider. "
                "Please try again, and if the problem persists, contact the Galaxy instance admin."
            )

        success, message, (redirect_url, user) = trans.app.authnz_manager.callback(
            provider,
            kwargs.get("state", " "),
            kwargs["code"],
            trans,
            login_redirect_url=login_next,
            idphint=idphint,
        )
        if success is False:
            return trans.show_error_message(message)

        # The None check must come first: substring tests on None raise TypeError.
        if redirect_url is None:
            redirect_url = url_for("/")
        elif "?confirm" in redirect_url or "?connect_external_provider" in redirect_url:
            # These targets are followed without logging a user in here.
            return trans.response.send_redirect(url_for(redirect_url))

        user = user if user is not None else trans.user
        if user is None:
            return trans.show_error_message(
                f"An unknown error occurred when handling the callback from `{provider}` "
                "identity provider. Please try again, and if the problem persists, "
                "contact the Galaxy instance admin."
            )
        trans.handle_user_login(user)
        # Record which idp provider was logged into, so we can logout of it later
        trans.set_cookie(value=provider, name=PROVIDER_COOKIE_NAME)
        # Clear the login next cookie back to default.
        trans.set_cookie(value="/", name=LOGIN_NEXT_COOKIE_NAME)
        return trans.response.send_redirect(url_for(redirect_url))

    @web.expose
    def create_user(self, trans, provider, **kwargs):
        """Create a Galaxy user from a third-party identity and log them in.

        :param provider: name of the identity provider.
        :param kwargs: request parameters; ``token`` is forwarded to the manager.

        :return: a redirect on success (or to the login page when the manager
            raises ``AuthenticationFailed``), otherwise an error page.
        """
        try:
            success, message, (redirect_url, user) = trans.app.authnz_manager.create_user(
                provider, token=kwargs.get("token", " "), trans=trans, login_redirect_url=url_for("/")
            )
        except exceptions.AuthenticationFailed as e:
            # NOTE(review): the exception text is placed in the query string
            # without URL-encoding — confirm whether send_redirect/the client
            # tolerates spaces or reserved characters in `message`.
            return trans.response.send_redirect(
                f"{trans.request.url_path + url_for('/')}root/login?message={str(e) or 'Duplicate Email'}"
            )

        if success is False:
            return trans.show_error_message(message)

        user = user if user is not None else trans.user
        if user is None:
            return trans.show_error_message(
                f"An unknown error occurred when handling the callback from `{provider}` "
                "identity provider. Please try again, and if the problem persists, "
                "contact the Galaxy instance admin."
            )
        trans.handle_user_login(user)
        # Record which idp provider was logged into, so we can logout of it later
        trans.set_cookie(value=provider, name=PROVIDER_COOKIE_NAME)
        if redirect_url is None:
            redirect_url = url_for("/")
        return trans.response.send_redirect(url_for(redirect_url))

    @web.expose
    @web.require_login("authenticate against the selected identity provider")
    def disconnect(self, trans, provider, email=None, **kwargs):
        """Disconnect the current user's identity at ``provider``.

        :param provider: name of the identity provider to disconnect from.
        :param email: optional email forwarded to the manager.

        :return: a redirect on success, an error page on failure, or ``None``
            when there is no logged-in user.
        """
        if trans.user is None:
            # Only logged in users are allowed here.
            return
        success, message, redirect_url = trans.app.authnz_manager.disconnect(
            provider, trans, email, disconnect_redirect_url=url_for("/")
        )
        if success is False:
            return trans.show_error_message(message)
        if redirect_url is None:
            redirect_url = url_for("/")
        return trans.response.send_redirect(redirect_url)

    @web.json
    @web.expose
    def logout(self, trans, provider, **kwargs):
        """Log out of ``provider`` and end the Galaxy session.

        The Galaxy session is ended regardless of whether the manager reports
        success.

        :param provider: name of the identity provider to log out of.

        :return: ``{"redirect_uri": ...}`` on success, ``{"message": ...}`` otherwise.
        """
        post_user_logout_href = trans.app.config.post_user_logout_href
        if post_user_logout_href is not None:
            post_user_logout_href = trans.request.base + url_for(post_user_logout_href)
        success, message, redirect_uri = trans.app.authnz_manager.logout(
            provider, trans, post_user_logout_href=post_user_logout_href
        )
        trans.handle_user_logout()
        if success:
            return {"redirect_uri": redirect_uri}
        return {"message": message}

    @web.expose
    def get_logout_url(self, trans, provider=None, **kwargs):
        """Redirect to the ``logout`` action for the relevant identity provider.

        :param provider: provider name; when omitted, the provider recorded in
            the login cookie is used.

        :return: a redirect, or ``None`` when no provider can be determined.
        """
        idp_provider = provider if provider else trans.get_cookie(name=PROVIDER_COOKIE_NAME)
        if idp_provider:
            return trans.response.send_redirect(url_for(controller="authnz", action="logout", provider=idp_provider))

    @web.expose
    @web.json
    def get_cilogon_idps(self, trans, **kwargs):
        """Return CILogon's identity-provider list, restricted to the allowed IdPs.

        :param kwargs: request parameters, forwarded as query parameters to
            ``https://cilogon.org/idplist/``.

        :rtype: list of dicts
        :return: the full list when no allow-list is configured, otherwise only
            the entries whose ``EntityID`` is allowed.
        :raises Exception: when the list cannot be fetched or parsed.
        """
        try:
            cilogon_idps = json.loads(url_get("https://cilogon.org/idplist/", params=dict(kwargs)))
        except Exception as e:
            raise Exception(f"Invalid server response. {str(e)}.") from e

        allowed_idps = trans.app.authnz_manager.get_allowed_idps()
        if not allowed_idps:
            return cilogon_idps

        validated_idps = [idp for idp in cilogon_idps if idp["EntityID"] in allowed_idps]
        if len(validated_idps) != len(allowed_idps):
            # Point the admin at configured EntityIDs that CILogon does not know.
            validated_entity_ids = {entity["EntityID"] for entity in validated_idps}
            for idp in allowed_idps:
                if idp not in validated_entity_ids:
                    log.debug(
                        "Invalid EntityID entered: %s. Use https://cilogon.org/idplist/ to find desired institution's EntityID.",
                        idp,
                    )
        return validated_idps