"""
OAuth 2.0 and OpenID Connect Authentication and Authorization Controller.
"""
import datetime
import json
import logging
from urllib.parse import quote

import jwt

from galaxy import (
    exceptions,
    web,
)
from galaxy.util import url_get
from galaxy.web import url_for
from galaxy.webapps.base.controller import BaseUIController
# Module-level logger shared by every handler in this controller.
log = logging.getLogger(__name__)
# Cookie that records which identity provider the user logged in with. It is
# written after a successful login (``callback`` / ``create_user``) and read
# back by ``get_logout_url`` when no provider is passed explicitly.
PROVIDER_COOKIE_NAME = "galaxy-oidc-provider"
# Cookie that carries the post-login destination from ``login`` to ``callback``.
# It is reset to "/" when no destination is requested and after a completed login.
LOGIN_NEXT_COOKIE_NAME = "galaxy-oidc-login-next"
[docs]
class OIDC(BaseUIController):
    """Web controller exposing the third-party identity (OIDC / OAuth 2.0) endpoints.

    Most handlers hand the protocol work to ``trans.app.authnz_manager`` and
    only deal with cookies, redirects and error reporting themselves.
    """

    @web.json
    @web.expose
    @web.require_login("list third-party identities")
    def index(self, trans, **kwargs):
        """
        GET /authnz/
        Return the third-party identities associated with the current user.

        :type  trans: galaxy.webapps.base.webapp.GalaxyWebTransaction
        :param trans: Galaxy web transaction.

        :param kwargs: unused.

        :rtype:  list of dicts
        :return: one dict per entry of ``trans.user.social_auth`` holding the
                 keys ``id`` (encoded), ``provider``, ``email`` and
                 ``provider_label``; ``expiration`` is added when the stored
                 ``id_token`` can be decoded and carries an ``exp`` claim.
        """
        backends_config = trans.app.authnz_manager.oidc_backends_config
        identities = []
        for authnz in trans.user.social_auth:
            token_info = {
                "id": trans.app.security.encode_id(authnz.id),
                "provider": authnz.provider,
                "email": authnz.uid,
                # Fall back to the raw provider name when no label is configured.
                "provider_label": backends_config.get(authnz.provider, {}).get("label", authnz.provider),
            }
            extra_data = authnz.extra_data
            if extra_data and "id_token" in extra_data:
                try:
                    # The token is only inspected for display purposes, so no
                    # signature/audience/expiry verification is requested.
                    claims = jwt.decode(
                        extra_data["id_token"],
                        options={"verify_signature": False, "verify_aud": False, "verify_exp": False},
                    )
                    if "exp" in claims:
                        # Same naive-UTC rendering as the deprecated
                        # ``datetime.utcfromtimestamp``.
                        expires_at = datetime.datetime.fromtimestamp(claims["exp"], tz=datetime.timezone.utc)
                        token_info["expiration"] = str(expires_at.replace(tzinfo=None))
                    # Prefer the email claim of the token over the stored uid.
                    if "email" in claims:
                        token_info["email"] = claims["email"]
                except Exception:
                    # Best effort: the identity is still listed, just without
                    # the details that could not be read from the token.
                    log.debug(
                        "Could not read the stored id_token for provider `%s`; listing it without token details.",
                        authnz.provider,
                        exc_info=True,
                    )
            identities.append(token_info)
        return identities

    @web.json
    @web.expose
    def login(self, trans, provider, idphint=None, next=None, redirect=None):
        """
        Start the login flow against ``provider``.

        :param trans: Galaxy web transaction.
        :param provider: name of the identity provider to authenticate against.
        :param idphint: optional hint forwarded to ``authnz_manager.authenticate``.
        :param next: optional destination stored in the login-next cookie so
                     that ``callback`` can redirect there afterwards.
        :param redirect: when equal to ``"true"`` (case-insensitive) the
                         response is a redirect instead of a JSON payload.

        :return: an error page when OIDC is disabled; otherwise a redirect to
                 the provider or ``{"redirect_uri": ...}``.
        :raises exceptions.AuthenticationFailed: when the manager reports failure.
        """
        if not trans.app.config.enable_oidc:
            msg = "Login to Galaxy using third-party identities is not enabled on this Galaxy instance."
            log.debug(msg)
            return trans.show_error_message(msg)

        if next:
            # NOTE(review): the unit of ``age`` is defined by ``trans.set_cookie`` — confirm.
            trans.set_cookie(value=next, name=LOGIN_NEXT_COOKIE_NAME, age=1)
        else:
            # No destination requested: overwrite any stale cookie with the default.
            trans.set_cookie(value="/", name=LOGIN_NEXT_COOKIE_NAME)

        success, message, redirect_uri = trans.app.authnz_manager.authenticate(provider, trans, idphint)
        if not success:
            raise exceptions.AuthenticationFailed(message)
        if redirect and redirect.lower() == "true":
            return trans.response.send_redirect(redirect_uri)
        return {"redirect_uri": redirect_uri}

    @web.expose
    def callback(self, trans, provider, idphint=None, **kwargs):
        """
        Handle the authentication callback sent by the identity provider.

        :param trans: Galaxy web transaction.
        :param provider: name of the identity provider that called back.
        :param idphint: optional hint forwarded to ``authnz_manager.callback``.
        :param kwargs: query parameters of the callback; ``code`` is required,
                       ``state``, ``error`` and ``error_description`` are read
                       when present.

        :return: an error page, a redirect to the login page carrying the
                 provider's error, or a redirect to the post-login destination.
        :raises exceptions.AuthenticationFailed: propagated from the manager.
        """
        # Only used in log and error messages.
        username = trans.user.username if trans.user is not None else "anonymous"

        login_next_cookie = trans.get_cookie(name=LOGIN_NEXT_COOKIE_NAME)
        if login_next_cookie and login_next_cookie != "None":
            # The cookie can hold the literal string 'None', which is not a usable target.
            login_next = url_for(login_next_cookie)
        else:
            login_next = url_for("/")

        if not kwargs:
            log.warning("OIDC callback received no data for provider `%s` and user `%s`", provider, username)
            return trans.show_error_message(
                f"Did not receive any information from the `{provider}` identity provider to complete user `{username}` authentication "
                "flow. Please try again, and if the problem persists, contact the Galaxy instance admin. Also note "
                "that this endpoint is to receive authentication callbacks only, and should not be called/reached by "
                "a user."
            )

        if "error" in kwargs:
            log.warning(
                "Error handling authentication callback from `%s` identity provider for user `%s` login request."
                " Error message: %s",
                provider,
                username,
                kwargs.get("error", "None"),
            )
            error_msg = kwargs.get("error_description") or (
                f"Failed to handle authentication callback from {provider}. "
                "Please try again, and if the problem persists, contact "
                "the Galaxy instance admin."
            )
            redirect_to = trans.url_builder("/login/start", message=error_msg, status="danger")
            return trans.response.send_redirect(redirect_to)

        if "code" not in kwargs:
            # Without this guard a callback lacking ``code`` surfaced as an
            # unhandled KeyError instead of a readable error page.
            log.warning(
                "OIDC callback for provider `%s` and user `%s` did not include an authorization code",
                provider,
                username,
            )
            return trans.show_error_message(
                f"The `{provider}` identity provider did not return an authorization code to complete the "
                "authentication flow. Please try again, and if the problem persists, contact the Galaxy "
                "instance admin."
            )

        success, message, (redirect_url, user) = trans.app.authnz_manager.callback(
            provider,
            kwargs.get("state", " "),
            kwargs["code"],
            trans,
            login_redirect_url=login_next,
            idphint=idphint,
        )
        if success is False:
            return trans.show_error_message(message)

        # The None check must precede the substring tests: ``in`` on None raises TypeError.
        if redirect_url is None:
            redirect_url = url_for("/")
        elif "?confirm" in redirect_url or "?connect_external_provider" in redirect_url:
            # These destinations are followed without logging the user in here.
            return trans.response.send_redirect(url_for(redirect_url))

        user = user if user is not None else trans.user
        if user is None:
            return trans.show_error_message(
                f"An unknown error occurred when handling the callback from `{provider}` "
                "identity provider. Please try again, and if the problem persists, "
                "contact the Galaxy instance admin."
            )
        trans.handle_user_login(user)
        # Record which idp provider was logged into, so we can logout of it later
        trans.set_cookie(value=provider, name=PROVIDER_COOKIE_NAME)
        # Clear the login next cookie back to default.
        trans.set_cookie(value="/", name=LOGIN_NEXT_COOKIE_NAME)
        return trans.response.send_redirect(url_for(redirect_url))

    @web.expose
    def create_user(self, trans, provider, **kwargs):
        """
        Create a Galaxy user from a third-party identity and log them in.

        :param trans: Galaxy web transaction.
        :param provider: name of the identity provider.
        :param kwargs: request parameters; ``token`` is forwarded to
                       ``authnz_manager.create_user`` (default ``" "``).

        :return: a redirect to the login page carrying the failure message when
                 the manager raises ``AuthenticationFailed``, an error page, or
                 a redirect to the destination returned by the manager.
        """
        try:
            success, message, (redirect_url, user) = trans.app.authnz_manager.create_user(
                provider, token=kwargs.get("token", " "), trans=trans, login_redirect_url=url_for("/")
            )
        except exceptions.AuthenticationFailed as e:
            # URL-encode the message so spaces, '&' or '#' cannot corrupt the query string.
            failure_message = quote(str(e) or "Duplicate Email")
            return trans.response.send_redirect(
                f"{trans.request.url_path + url_for('/')}root/login?message={failure_message}"
            )
        if success is False:
            return trans.show_error_message(message)

        user = user if user is not None else trans.user
        if user is None:
            return trans.show_error_message(
                f"An unknown error occurred when handling the callback from `{provider}` "
                "identity provider. Please try again, and if the problem persists, "
                "contact the Galaxy instance admin."
            )
        trans.handle_user_login(user)
        # Record which idp provider was logged into, so we can logout of it later
        trans.set_cookie(value=provider, name=PROVIDER_COOKIE_NAME)
        if redirect_url is None:
            redirect_url = url_for("/")
        return trans.response.send_redirect(url_for(redirect_url))

    @web.expose
    @web.require_login("authenticate against the selected identity provider")
    def disconnect(self, trans, provider, email=None, **kwargs):
        """
        Disconnect a third-party identity from the current user.

        :param trans: Galaxy web transaction.
        :param provider: name of the identity provider to disconnect.
        :param email: optional email forwarded to ``authnz_manager.disconnect``.
        :param kwargs: unused.

        :return: ``None`` for anonymous requests, an error page on failure,
                 otherwise a redirect to the URL returned by the manager
                 (``/`` when it returns none).
        """
        if trans.user is None:
            # Only logged in users are allowed here.
            return
        success, message, redirect_url = trans.app.authnz_manager.disconnect(
            provider, trans, email, disconnect_redirect_url=url_for("/")
        )
        if success is False:
            return trans.show_error_message(message)
        if redirect_url is None:
            redirect_url = url_for("/")
        return trans.response.send_redirect(redirect_url)

    @web.json
    @web.expose
    def logout(self, trans, provider, **kwargs):
        """
        Log the user out of Galaxy and ask the manager for the provider logout URL.

        :param trans: Galaxy web transaction.
        :param provider: name of the identity provider to log out of.
        :param kwargs: unused.

        :rtype:  dict
        :return: ``{"redirect_uri": ...}`` on success, ``{"message": ...}`` otherwise.
        """
        post_user_logout_href = trans.app.config.post_user_logout_href
        if post_user_logout_href is not None:
            # Prefix the configured location with the request base.
            post_user_logout_href = trans.request.base + url_for(post_user_logout_href)
        success, message, redirect_uri = trans.app.authnz_manager.logout(
            provider, trans, post_user_logout_href=post_user_logout_href
        )
        # The Galaxy session is ended whether or not the manager reported success.
        trans.handle_user_logout()
        if success:
            return {"redirect_uri": redirect_uri}
        return {"message": message}

    @web.expose
    def get_logout_url(self, trans, provider=None, **kwargs):
        """
        Redirect to the ``logout`` action of the relevant identity provider.

        :param trans: Galaxy web transaction.
        :param provider: provider name; when empty, the provider cookie is used.
        :param kwargs: unused.

        :return: a redirect, or ``None`` when no provider can be determined.
        """
        idp_provider = provider or trans.get_cookie(name=PROVIDER_COOKIE_NAME)
        if idp_provider:
            return trans.response.send_redirect(url_for(controller="authnz", action="logout", provider=idp_provider))

    @web.expose
    @web.json
    def get_cilogon_idps(self, trans, **kwargs):
        """
        Return the CILogon identity providers, restricted to the allowed ones.

        :param trans: Galaxy web transaction.
        :param kwargs: forwarded as query parameters to the CILogon IdP list.

        :rtype:  list
        :return: the full list fetched from ``https://cilogon.org/idplist/``
                 when ``authnz_manager.get_allowed_idps()`` is empty, otherwise
                 only the entries whose ``EntityID`` is allowed.
        :raises Exception: when the list cannot be fetched or parsed.
        """
        try:
            cilogon_idps = json.loads(url_get("https://cilogon.org/idplist/", params=dict(kwargs)))
        except Exception as e:
            # Chain the cause so the underlying failure stays in the traceback.
            raise Exception(f"Invalid server response. {str(e)}.") from e

        allowed_idps = trans.app.authnz_manager.get_allowed_idps()
        if not allowed_idps:
            return cilogon_idps

        validated_idps = [idp for idp in cilogon_idps if idp["EntityID"] in allowed_idps]
        if len(validated_idps) != len(allowed_idps):
            # Report every allowed entry that CILogon does not list.
            validated_entity_ids = {idp["EntityID"] for idp in validated_idps}
            for entity_id in allowed_idps:
                if entity_id not in validated_entity_ids:
                    log.debug(
                        "Invalid EntityID entered: %s. Use https://cilogon.org/idplist/ to find desired institution's EntityID.",
                        entity_id,
                    )
        return validated_idps