OAuth 2.0 and OpenID Connect Authentication and Authorization Controller.
import datetime
import json
import logging
import jwt
from galaxy import (
from galaxy.util import url_get
from galaxy.web import url_for
from galaxy.webapps.base.controller import JSAppLauncher
log = logging.getLogger(__name__)
PROVIDER_COOKIE_NAME = "galaxy-oidc-provider"
[docs]class OIDC(JSAppLauncher):
[docs] @web.json
@web.require_login("list third-party identities")
def index(self, trans, **kwargs):
GET /authnz/
returns a list of third-party identities associated with the user.
:type trans: galaxy.webapps.base.webapp.GalaxyWebTransaction
:param trans: Galaxy web transaction.
:param kwargs: empty dict
:rtype: list of dicts
:return: a list of third-party identities associated with the user account.
rtv = []
for authnz in trans.user.social_auth:
{"id": trans.app.security.encode_id(authnz.id), "provider": authnz.provider, "email": authnz.uid}
# Add cilogon and custos identities
for token in trans.user.custos_auth:
# for purely displaying the info to user, we bypass verification of
# signature, audience, and expiration as that's potentially useful
# information to share with the end user
userinfo = jwt.decode(
token.id_token, options={"verify_signature": False, "verify_aud": False, "verify_exp": False}
"id": trans.app.security.encode_id(token.id),
"provider": token.provider,
"email": userinfo["email"],
"expiration": str(datetime.datetime.utcfromtimestamp(userinfo["exp"])),
except Exception:
"id": trans.app.security.encode_id(token.id),
"provider": token.provider,
"error": "Unable to decode token",
return rtv
[docs] @web.json
def login(self, trans, provider, idphint=None):
if not trans.app.config.enable_oidc:
msg = "Login to Galaxy using third-party identities is not enabled on this Galaxy instance."
return trans.show_error_message(msg)
success, message, redirect_uri = trans.app.authnz_manager.authenticate(provider, trans, idphint=idphint)
if success:
return {"redirect_uri": redirect_uri}
raise exceptions.AuthenticationFailed(message)
[docs] @web.expose
def callback(self, trans, provider, idphint=None, **kwargs):
user = trans.user.username if trans.user is not None else "anonymous"
if not bool(kwargs):
log.error(f"OIDC callback received no data for provider `{provider}` and user `{user}`")
return trans.show_error_message(
f"Did not receive any information from the `{provider}` identity provider to complete user `{user}` authentication "
"flow. Please try again, and if the problem persists, contact the Galaxy instance admin. Also note "
"that this endpoint is to receive authentication callbacks only, and should not be called/reached by "
"a user."
if "error" in kwargs:
"Error handling authentication callback from `{}` identity provider for user `{}` login request."
" Error message: {}".format(provider, user, kwargs.get("error", "None"))
return trans.show_error_message(
f"Failed to handle authentication callback from {provider}. "
"Please try again, and if the problem persists, contact "
"the Galaxy instance admin"
success, message, (redirect_url, user) = trans.app.authnz_manager.callback(
kwargs.get("state", " "),
except exceptions.AuthenticationFailed:
if success is False:
return trans.show_error_message(message)
if "?confirm" in redirect_url:
return trans.response.send_redirect(url_for(redirect_url))
if "?connect_external_provider" in redirect_url:
return trans.response.send_redirect(url_for(redirect_url))
elif redirect_url is None:
redirect_url = url_for("/")
user = user if user is not None else trans.user
if user is None:
return trans.show_error_message(
f"An unknown error occurred when handling the callback from `{provider}` "
"identity provider. Please try again, and if the problem persists, "
"contact the Galaxy instance admin."
# Record which idp provider was logged into, so we can logout of it later
trans.set_cookie(value=provider, name=PROVIDER_COOKIE_NAME)
return trans.response.send_redirect(url_for(redirect_url))
[docs] @web.expose
def create_user(self, trans, provider, **kwargs):
success, message, (redirect_url, user) = trans.app.authnz_manager.create_user(
provider, token=kwargs.get("token", " "), trans=trans, login_redirect_url=url_for("/")
except exceptions.AuthenticationFailed as e:
return trans.response.send_redirect(
f"{trans.request.url_path + url_for('/')}root/login?message={str(e) or 'Duplicate Email'}"
if success is False:
return trans.show_error_message(message)
user = user if user is not None else trans.user
if user is None:
return trans.show_error_message(
f"An unknown error occurred when handling the callback from `{provider}` "
"identity provider. Please try again, and if the problem persists, "
"contact the Galaxy instance admin."
# Record which idp provider was logged into, so we can logout of it later
trans.set_cookie(value=provider, name=PROVIDER_COOKIE_NAME)
if redirect_url is None:
redirect_url = url_for("/")
return trans.response.send_redirect(url_for(redirect_url))
[docs] @web.expose
@web.require_login("authenticate against the selected identity provider")
def disconnect(self, trans, provider, email=None, **kwargs):
if trans.user is None:
# Only logged in users are allowed here.
success, message, redirect_url = trans.app.authnz_manager.disconnect(
provider, trans, email, disconnect_redirect_url=url_for("/")
if success is False:
return trans.show_error_message(message)
if redirect_url is None:
redirect_url = url_for("/")
return trans.response.send_redirect(redirect_url)
[docs] @web.json
def logout(self, trans, provider, **kwargs):
post_user_logout_href = trans.app.config.post_user_logout_href
if post_user_logout_href is not None:
post_user_logout_href = trans.request.base + url_for(post_user_logout_href)
success, message, redirect_uri = trans.app.authnz_manager.logout(
provider, trans, post_user_logout_href=post_user_logout_href
if success:
return {"redirect_uri": redirect_uri}
return {"message": message}
[docs] @web.expose
def get_logout_url(self, trans, provider=None, **kwargs):
idp_provider = provider if provider else trans.get_cookie(name=PROVIDER_COOKIE_NAME)
if idp_provider:
return trans.response.send_redirect(url_for(controller="authnz", action="logout", provider=idp_provider))
[docs] @web.expose
def get_cilogon_idps(self, trans, **kwargs):
cilogon_idps = json.loads(url_get("https://cilogon.org/idplist/", params=dict(kwargs)))
except Exception as e:
raise Exception(f"Invalid server response. {str(e)}.")
if allowed_idps := trans.app.authnz_manager.get_allowed_idps():
validated_idps = list(filter(lambda idp: idp["EntityID"] in allowed_idps, cilogon_idps))
if not (len(validated_idps) == len(allowed_idps)):
validated_entity_ids = [entity["EntityID"] for entity in validated_idps]
for idp in allowed_idps:
if idp not in validated_entity_ids:
"Invalid EntityID entered: %s. Use https://cilogon.org/idplist/ to find desired institution's EntityID.",
return validated_idps
return cilogon_idps