Warning

This document is for an old release of Galaxy. You can alternatively view this page in the latest release if it exists or view the top of the latest release's documentation.

Source code for galaxy.webapps.galaxy.controllers.authnz

"""
OAuth 2.0 and OpenID Connect Authentication and Authorization Controller.
"""


import datetime
import json
import logging

import jwt

from galaxy import (
    exceptions,
    web,
)
from galaxy.util import url_get
from galaxy.web import url_for
from galaxy.webapps.base.controller import JSAppLauncher

log = logging.getLogger(__name__)

PROVIDER_COOKIE_NAME = "galaxy-oidc-provider"


[docs]class OIDC(JSAppLauncher):
[docs] @web.json @web.expose @web.require_login("list third-party identities") def index(self, trans, **kwargs): """ GET /authnz/ returns a list of third-party identities associated with the user. :type trans: galaxy.webapps.base.webapp.GalaxyWebTransaction :param trans: Galaxy web transaction. :param kwargs: empty dict :rtype: list of dicts :return: a list of third-party identities associated with the user account. """ rtv = [] for authnz in trans.user.social_auth: rtv.append( {"id": trans.app.security.encode_id(authnz.id), "provider": authnz.provider, "email": authnz.uid} ) # Add cilogon and custos identities for token in trans.user.custos_auth: # for purely displaying the info to user, we bypass verification of # signature, audience, and expiration as that's potentially useful # information to share with the end user try: userinfo = jwt.decode( token.id_token, options={"verify_signature": False, "verify_aud": False, "verify_exp": False} ) rtv.append( { "id": trans.app.security.encode_id(token.id), "provider": token.provider, "email": userinfo["email"], "expiration": str(datetime.datetime.utcfromtimestamp(userinfo["exp"])), } ) except Exception: rtv.append( { "id": trans.app.security.encode_id(token.id), "provider": token.provider, "error": "Unable to decode token", } ) return rtv
[docs] @web.json @web.expose def login(self, trans, provider, idphint=None): if not trans.app.config.enable_oidc: msg = "Login to Galaxy using third-party identities is not enabled on this Galaxy instance." log.debug(msg) return trans.show_error_message(msg) success, message, redirect_uri = trans.app.authnz_manager.authenticate(provider, trans, idphint=idphint) if success: return {"redirect_uri": redirect_uri} else: raise exceptions.AuthenticationFailed(message)
[docs] @web.expose def callback(self, trans, provider, idphint=None, **kwargs): user = trans.user.username if trans.user is not None else "anonymous" if not bool(kwargs): log.error(f"OIDC callback received no data for provider `{provider}` and user `{user}`") return trans.show_error_message( "Did not receive any information from the `{}` identity provider to complete user `{}` authentication " "flow. Please try again, and if the problem persists, contact the Galaxy instance admin. Also note " "that this endpoint is to receive authentication callbacks only, and should not be called/reached by " "a user.".format(provider, user) ) if "error" in kwargs: log.error( "Error handling authentication callback from `{}` identity provider for user `{}` login request." " Error message: {}".format(provider, user, kwargs.get("error", "None")) ) return trans.show_error_message( "Failed to handle authentication callback from {}. " "Please try again, and if the problem persists, contact " "the Galaxy instance admin".format(provider) ) try: success, message, (redirect_url, user) = trans.app.authnz_manager.callback( provider, kwargs.get("state", " "), kwargs["code"], trans, login_redirect_url=url_for("/"), idphint=idphint, ) except exceptions.AuthenticationFailed: raise if success is False: return trans.show_error_message(message) if "?confirm" in redirect_url: return trans.response.send_redirect(url_for(redirect_url)) if "?connect_external_provider" in redirect_url: return trans.response.send_redirect(url_for(redirect_url)) elif redirect_url is None: redirect_url = url_for("/") user = user if user is not None else trans.user if user is None: return trans.show_error_message( "An unknown error occurred when handling the callback from `{}` " "identity provider. Please try again, and if the problem persists, " "contact the Galaxy instance admin.".format(provider) ) trans.handle_user_login(user) # Record which idp provider was logged into, so we can logout of it later trans.set_cookie(value=provider, name=PROVIDER_COOKIE_NAME) return trans.response.send_redirect(url_for(redirect_url))
[docs] @web.expose def create_user(self, trans, provider, **kwargs): try: success, message, (redirect_url, user) = trans.app.authnz_manager.create_user( provider, token=kwargs.get("token", " "), trans=trans, login_redirect_url=url_for("/") ) except exceptions.AuthenticationFailed as e: return trans.response.send_redirect( f"{trans.request.url_path + url_for('/')}root/login?message={str(e) or 'Duplicate Email'}" ) if success is False: return trans.show_error_message(message) user = user if user is not None else trans.user if user is None: return trans.show_error_message( "An unknown error occurred when handling the callback from `{}` " "identity provider. Please try again, and if the problem persists, " "contact the Galaxy instance admin.".format(provider) ) trans.handle_user_login(user) # Record which idp provider was logged into, so we can logout of it later trans.set_cookie(value=provider, name=PROVIDER_COOKIE_NAME) if redirect_url is None: redirect_url = url_for("/") return trans.response.send_redirect(url_for(redirect_url))
[docs] @web.expose @web.require_login("authenticate against the selected identity provider") def disconnect(self, trans, provider, email=None, **kwargs): if trans.user is None: # Only logged in users are allowed here. return success, message, redirect_url = trans.app.authnz_manager.disconnect( provider, trans, email, disconnect_redirect_url=url_for("/") ) if success is False: return trans.show_error_message(message) if redirect_url is None: redirect_url = url_for("/") return trans.response.send_redirect(redirect_url)
[docs] @web.json @web.expose def logout(self, trans, provider, **kwargs): post_user_logout_href = trans.app.config.post_user_logout_href if post_user_logout_href is not None: post_user_logout_href = trans.request.base + url_for(post_user_logout_href) success, message, redirect_uri = trans.app.authnz_manager.logout( provider, trans, post_user_logout_href=post_user_logout_href ) trans.handle_user_logout() if success: return {"redirect_uri": redirect_uri} else: return {"message": message}
[docs] @web.expose def get_logout_url(self, trans, provider=None, **kwargs): idp_provider = provider if provider else trans.get_cookie(name=PROVIDER_COOKIE_NAME) if idp_provider: return trans.response.send_redirect(url_for(controller="authnz", action="logout", provider=idp_provider))
[docs] @web.expose @web.json def get_cilogon_idps(self, trans, **kwargs): allowed_idps = trans.app.authnz_manager.get_allowed_idps() try: cilogon_idps = json.loads(url_get("https://cilogon.org/idplist/", params=dict(kwargs))) except Exception as e: raise Exception(f"Invalid server response. {str(e)}.") if allowed_idps: validated_idps = list(filter(lambda idp: idp["EntityID"] in allowed_idps, cilogon_idps)) if not (len(validated_idps) == len(allowed_idps)): validated_entity_ids = [entity["EntityID"] for entity in validated_idps] for idp in allowed_idps: if idp not in validated_entity_ids: log.debug( "Invalid EntityID entered: %s. Use https://cilogon.org/idplist/ to find desired institution's EntityID.", idp, ) return validated_idps else: return cilogon_idps