Warning
This document is for an in-development version of Galaxy. You can alternatively view this page in the latest release if it exists or view the top of the latest release's documentation.
Source code for galaxy.webapps.galaxy.controllers.authnz
"""
OAuth 2.0 and OpenID Connect Authentication and Authorization Controller.
"""
import datetime
import json
import logging
from typing import TYPE_CHECKING
import jwt
from galaxy import (
exceptions,
web,
)
from galaxy.util import url_get
from galaxy.web import url_for
from galaxy.webapps.base.controller import BaseUIController
if TYPE_CHECKING:
from galaxy.webapps.base.webapp import GalaxyWebTransaction
log = logging.getLogger(__name__)
PROVIDER_COOKIE_NAME = "galaxy-oidc-provider"
LOGIN_NEXT_COOKIE_NAME = "galaxy-oidc-login-next"
[docs]
class OIDC(BaseUIController):
[docs]
@web.json
@web.expose
@web.require_login("list third-party identities")
def index(self, trans, **kwargs):
"""
GET /authnz/
returns a list of third-party identities associated with the user.
:type trans: galaxy.webapps.base.webapp.GalaxyWebTransaction
:param trans: Galaxy web transaction.
:param kwargs: empty dict
:rtype: list of dicts
:return: a list of third-party identities associated with the user account.
"""
rtv = []
# Process PSA tokens (unified authentication system)
for authnz in trans.user.social_auth:
token_info = {
"id": trans.app.security.encode_id(authnz.id),
"provider": authnz.provider,
"email": authnz.uid,
}
# Add provider label if available
provider_label = trans.app.authnz_manager.oidc_backends_config.get(authnz.provider, {}).get(
"label", authnz.provider
)
token_info["provider_label"] = provider_label
# Try to extract expiration from id_token if available
if authnz.extra_data and "id_token" in authnz.extra_data:
try:
userinfo = jwt.decode(
authnz.extra_data["id_token"],
options={"verify_signature": False, "verify_aud": False, "verify_exp": False},
)
if "exp" in userinfo:
token_info["expiration"] = str(datetime.datetime.utcfromtimestamp(userinfo["exp"]))
# Update email from token if available and different
if "email" in userinfo:
token_info["email"] = userinfo["email"]
except Exception:
# If token decoding fails, continue without expiration info
pass
rtv.append(token_info)
return rtv
[docs]
@web.json
@web.expose
def login(self, trans, provider, idphint=None, next=None, redirect=None):
if not trans.app.config.enable_oidc:
msg = "Login to Galaxy using third-party identities is not enabled on this Galaxy instance."
log.debug(msg)
return trans.show_error_message(msg)
if next:
trans.set_cookie(value=next, name=LOGIN_NEXT_COOKIE_NAME, age=1)
else:
# If no next parameter is provided, ensure we unset any existing next cookie.
trans.set_cookie(value="/", name=LOGIN_NEXT_COOKIE_NAME)
success, message, redirect_uri = trans.app.authnz_manager.authenticate(provider, trans, idphint)
if success:
if redirect and redirect.lower() == "true":
return trans.response.send_redirect(redirect_uri)
else:
return {"redirect_uri": redirect_uri}
else:
raise exceptions.AuthenticationFailed(message)
[docs]
@web.expose
def callback(self, trans, provider, idphint=None, **kwargs):
user = trans.user.username if trans.user is not None else "anonymous"
login_next_cookie = trans.get_cookie(name=LOGIN_NEXT_COOKIE_NAME)
if login_next_cookie and login_next_cookie != "None":
# This cookie can sometimes be set to a literal string 'None', which we don't want to use as a redirect.
login_next = url_for(login_next_cookie)
else:
# Fallback to default redirect if no login_next cookie is found.
login_next = url_for("/")
if not bool(kwargs):
log.warning(f"OIDC callback received no data for provider `{provider}` and user `{user}`")
return trans.show_error_message(
f"Did not receive any information from the `{provider}` identity provider to complete user `{user}` authentication "
"flow. Please try again, and if the problem persists, contact the Galaxy instance admin. Also note "
"that this endpoint is to receive authentication callbacks only, and should not be called/reached by "
"a user."
)
if "error" in kwargs:
log.warning(
"Error handling authentication callback from `{}` identity provider for user `{}` login request."
" Error message: {}".format(provider, user, kwargs.get("error", "None"))
)
error_description = kwargs.get("error_description")
if error_description:
error_msg = error_description
else:
error_msg = (
f"Failed to handle authentication callback from {provider}. "
"Please try again, and if the problem persists, contact "
"the Galaxy instance admin."
)
redirect_to = trans.url_builder("/login/start", message=error_msg, status="danger")
return trans.response.send_redirect(redirect_to)
try:
success, message, (redirect_url, user) = trans.app.authnz_manager.callback(
provider,
kwargs.get("state", " "),
kwargs["code"],
trans,
login_redirect_url=login_next,
idphint=idphint,
)
except exceptions.AuthenticationFailed:
raise
if success is False:
return trans.show_error_message(message)
if "?confirm" in redirect_url:
return trans.response.send_redirect(url_for(redirect_url))
if "?connect_external_provider" in redirect_url:
return trans.response.send_redirect(url_for(redirect_url))
elif redirect_url is None:
redirect_url = url_for("/")
user = user if user is not None else trans.user
if user is None:
return trans.show_error_message(
f"An unknown error occurred when handling the callback from `{provider}` "
"identity provider. Please try again, and if the problem persists, "
"contact the Galaxy instance admin."
)
trans.handle_user_login(user)
# Record which idp provider was logged into, so we can logout of it later
trans.set_cookie(value=provider, name=PROVIDER_COOKIE_NAME)
# Clear the login next cookie back to default.
trans.set_cookie(value="/", name=LOGIN_NEXT_COOKIE_NAME)
return trans.response.send_redirect(url_for(redirect_url))
[docs]
@web.expose
def create_user(self, trans: "GalaxyWebTransaction", provider: str, **kwargs):
try:
success, message, (redirect_url, user) = trans.app.authnz_manager.create_user(
provider, token=kwargs.get("token", " "), trans=trans, login_redirect_url=url_for("/")
)
except exceptions.AuthenticationFailed as e:
return trans.response.send_redirect(
f"{trans.request.url_path + url_for('/')}root/login?message={str(e) or 'Duplicate Email'}"
)
if success is False:
return trans.show_error_message(message)
user = user if user is not None else trans.user
if user is None:
return trans.show_error_message(
f"An unknown error occurred when handling the callback from `{provider}` "
"identity provider. Please try again, and if the problem persists, "
"contact the Galaxy instance admin."
)
trans.handle_user_login(user)
# Record which idp provider was logged into, so we can logout of it later
trans.set_cookie(value=provider, name=PROVIDER_COOKIE_NAME)
if redirect_url is None:
redirect_url = url_for("/")
return trans.response.send_redirect(url_for(redirect_url))
[docs]
@web.expose
@web.require_login("authenticate against the selected identity provider")
def disconnect(self, trans, provider, email=None, **kwargs):
if trans.user is None:
# Only logged in users are allowed here.
return
success, message, redirect_url = trans.app.authnz_manager.disconnect(
provider, trans, email, disconnect_redirect_url=url_for("/")
)
if success is False:
return trans.show_error_message(message)
if redirect_url is None:
redirect_url = url_for("/")
return trans.response.send_redirect(redirect_url)
[docs]
@web.json
@web.expose
def logout(self, trans, provider, **kwargs):
post_user_logout_href = trans.app.config.post_user_logout_href
if post_user_logout_href is not None:
post_user_logout_href = trans.request.base + url_for(post_user_logout_href)
success, message, redirect_uri = trans.app.authnz_manager.logout(
provider, trans, post_user_logout_href=post_user_logout_href
)
trans.handle_user_logout()
if success:
return {"redirect_uri": redirect_uri}
else:
return {"message": message}
[docs]
@web.expose
def get_logout_url(self, trans, provider=None, **kwargs):
idp_provider = provider if provider else trans.get_cookie(name=PROVIDER_COOKIE_NAME)
if idp_provider:
return trans.response.send_redirect(url_for(controller="authnz", action="logout", provider=idp_provider))
[docs]
@web.expose
@web.json
def get_cilogon_idps(self, trans, **kwargs):
try:
cilogon_idps = json.loads(url_get("https://cilogon.org/idplist/", params=dict(kwargs)))
except Exception as e:
raise Exception(f"Invalid server response. {str(e)}.")
if allowed_idps := trans.app.authnz_manager.get_allowed_idps():
validated_idps = list(filter(lambda idp: idp["EntityID"] in allowed_idps, cilogon_idps))
if not (len(validated_idps) == len(allowed_idps)):
validated_entity_ids = [entity["EntityID"] for entity in validated_idps]
for idp in allowed_idps:
if idp not in validated_entity_ids:
log.debug(
"Invalid EntityID entered: %s. Use https://cilogon.org/idplist/ to find desired institution's EntityID.",
idp,
)
return validated_idps
else:
return cilogon_idps