Source code for galaxy.webapps.galaxy.controllers.authnz

"""
OAuth 2.0 and OpenID Connect Authentication and Authorization Controller.
"""

import datetime
import json
import logging

import jwt

from galaxy import (
    exceptions,
    web,
)
from galaxy.util import url_get
from galaxy.web import url_for
from galaxy.webapps.base.controller import JSAppLauncher

log = logging.getLogger(__name__)

PROVIDER_COOKIE_NAME = "galaxy-oidc-provider"


[docs]class OIDC(JSAppLauncher):
[docs] @web.json @web.expose @web.require_login("list third-party identities") def index(self, trans, **kwargs): """ GET /authnz/ returns a list of third-party identities associated with the user. :type trans: galaxy.webapps.base.webapp.GalaxyWebTransaction :param trans: Galaxy web transaction. :param kwargs: empty dict :rtype: list of dicts :return: a list of third-party identities associated with the user account. """ rtv = [] for authnz in trans.user.social_auth: rtv.append( {"id": trans.app.security.encode_id(authnz.id), "provider": authnz.provider, "email": authnz.uid} ) # Add cilogon and custos identities for token in trans.user.custos_auth: # for purely displaying the info to user, we bypass verification of # signature, audience, and expiration as that's potentially useful # information to share with the end user try: userinfo = jwt.decode( token.id_token, options={"verify_signature": False, "verify_aud": False, "verify_exp": False} ) rtv.append( { "id": trans.app.security.encode_id(token.id), "provider": token.provider, "email": userinfo["email"], "expiration": str(datetime.datetime.utcfromtimestamp(userinfo["exp"])), } ) except Exception: rtv.append( { "id": trans.app.security.encode_id(token.id), "provider": token.provider, "error": "Unable to decode token", } ) return rtv
[docs] @web.json @web.expose def login(self, trans, provider, idphint=None): if not trans.app.config.enable_oidc: msg = "Login to Galaxy using third-party identities is not enabled on this Galaxy instance." log.debug(msg) return trans.show_error_message(msg) success, message, redirect_uri = trans.app.authnz_manager.authenticate(provider, trans, idphint=idphint) if success: return {"redirect_uri": redirect_uri} else: raise exceptions.AuthenticationFailed(message)
[docs] @web.expose def callback(self, trans, provider, idphint=None, **kwargs): user = trans.user.username if trans.user is not None else "anonymous" if not bool(kwargs): log.error(f"OIDC callback received no data for provider `{provider}` and user `{user}`") return trans.show_error_message( "Did not receive any information from the `{}` identity provider to complete user `{}` authentication " "flow. Please try again, and if the problem persists, contact the Galaxy instance admin. Also note " "that this endpoint is to receive authentication callbacks only, and should not be called/reached by " "a user.".format(provider, user) ) if "error" in kwargs: log.error( "Error handling authentication callback from `{}` identity provider for user `{}` login request." " Error message: {}".format(provider, user, kwargs.get("error", "None")) ) return trans.show_error_message( f"Failed to handle authentication callback from {provider}. " "Please try again, and if the problem persists, contact " "the Galaxy instance admin" ) try: success, message, (redirect_url, user) = trans.app.authnz_manager.callback( provider, kwargs.get("state", " "), kwargs["code"], trans, login_redirect_url=url_for("/"), idphint=idphint, ) except exceptions.AuthenticationFailed: raise if success is False: return trans.show_error_message(message) if "?confirm" in redirect_url: return trans.response.send_redirect(url_for(redirect_url)) if "?connect_external_provider" in redirect_url: return trans.response.send_redirect(url_for(redirect_url)) elif redirect_url is None: redirect_url = url_for("/") user = user if user is not None else trans.user if user is None: return trans.show_error_message( "An unknown error occurred when handling the callback from `{}` " "identity provider. Please try again, and if the problem persists, " "contact the Galaxy instance admin.".format(provider) ) trans.handle_user_login(user) # Record which idp provider was logged into, so we can logout of it later trans.set_cookie(value=provider, name=PROVIDER_COOKIE_NAME) return trans.response.send_redirect(url_for(redirect_url))
[docs] @web.expose def create_user(self, trans, provider, **kwargs): try: success, message, (redirect_url, user) = trans.app.authnz_manager.create_user( provider, token=kwargs.get("token", " "), trans=trans, login_redirect_url=url_for("/") ) except exceptions.AuthenticationFailed as e: return trans.response.send_redirect( f"{trans.request.url_path + url_for('/')}root/login?message={str(e) or 'Duplicate Email'}" ) if success is False: return trans.show_error_message(message) user = user if user is not None else trans.user if user is None: return trans.show_error_message( "An unknown error occurred when handling the callback from `{}` " "identity provider. Please try again, and if the problem persists, " "contact the Galaxy instance admin.".format(provider) ) trans.handle_user_login(user) # Record which idp provider was logged into, so we can logout of it later trans.set_cookie(value=provider, name=PROVIDER_COOKIE_NAME) if redirect_url is None: redirect_url = url_for("/") return trans.response.send_redirect(url_for(redirect_url))
[docs] @web.expose @web.require_login("authenticate against the selected identity provider") def disconnect(self, trans, provider, email=None, **kwargs): if trans.user is None: # Only logged in users are allowed here. return success, message, redirect_url = trans.app.authnz_manager.disconnect( provider, trans, email, disconnect_redirect_url=url_for("/") ) if success is False: return trans.show_error_message(message) if redirect_url is None: redirect_url = url_for("/") return trans.response.send_redirect(redirect_url)
[docs] @web.json @web.expose def logout(self, trans, provider, **kwargs): post_user_logout_href = trans.app.config.post_user_logout_href if post_user_logout_href is not None: post_user_logout_href = trans.request.base + url_for(post_user_logout_href) success, message, redirect_uri = trans.app.authnz_manager.logout( provider, trans, post_user_logout_href=post_user_logout_href ) trans.handle_user_logout() if success: return {"redirect_uri": redirect_uri} else: return {"message": message}
[docs] @web.expose def get_logout_url(self, trans, provider=None, **kwargs): idp_provider = provider if provider else trans.get_cookie(name=PROVIDER_COOKIE_NAME) if idp_provider: return trans.response.send_redirect(url_for(controller="authnz", action="logout", provider=idp_provider))
[docs] @web.expose @web.json def get_cilogon_idps(self, trans, **kwargs): try: cilogon_idps = json.loads(url_get("https://cilogon.org/idplist/", params=dict(kwargs))) except Exception as e: raise Exception(f"Invalid server response. {str(e)}.") if allowed_idps := trans.app.authnz_manager.get_allowed_idps(): validated_idps = list(filter(lambda idp: idp["EntityID"] in allowed_idps, cilogon_idps)) if not (len(validated_idps) == len(allowed_idps)): validated_entity_ids = [entity["EntityID"] for entity in validated_idps] for idp in allowed_idps: if idp not in validated_entity_ids: log.debug( "Invalid EntityID entered: %s. Use https://cilogon.org/idplist/ to find desired institution's EntityID.", idp, ) return validated_idps else: return cilogon_idps