Warning
This document is for an in-development version of Galaxy. You can alternatively view this page in the latest release if it exists or view the top of the latest release's documentation.
Source code for galaxy.webapps.galaxy.controllers.authnz
"""
OAuth 2.0 and OpenID Connect Authentication and Authorization Controller.
"""
from __future__ import absolute_import
import json
import logging
import jwt
from galaxy import exceptions
from galaxy import web
from galaxy.util import url_get
from galaxy.web import url_for
from galaxy.webapps.base.controller import JSAppLauncher
log = logging.getLogger(__name__)
PROVIDER_COOKIE_NAME = 'oidc-provider'
[docs]class OIDC(JSAppLauncher):
[docs] @web.json
@web.expose
@web.require_login("list third-party identities")
def index(self, trans, **kwargs):
"""
GET /authnz/
returns a list of third-party identities associated with the user.
:type trans: galaxy.web.framework.webapp.GalaxyWebTransaction
:param trans: Galaxy web transaction.
:param kwargs: empty dict
:rtype: list of dicts
:return: a list of third-party identities associated with the user account.
"""
rtv = []
for authnz in trans.user.social_auth:
rtv.append({'id': trans.app.security.encode_id(authnz.id), 'provider': authnz.provider, 'email': authnz.uid})
# Add cilogon and custos identities
for token in trans.user.custos_auth:
userinfo = jwt.decode(token.id_token, verify=False)
rtv.append({'id': trans.app.security.encode_id(token.id), 'provider': token.provider, 'email': userinfo['email']})
return rtv
[docs] @web.json
@web.expose
def login(self, trans, provider, idphint=None):
if not trans.app.config.enable_oidc:
msg = "Login to Galaxy using third-party identities is not enabled on this Galaxy instance."
log.debug(msg)
return trans.show_error_message(msg)
success, message, redirect_uri = trans.app.authnz_manager.authenticate(provider, trans, idphint=idphint)
if success:
return {"redirect_uri": redirect_uri}
else:
raise exceptions.AuthenticationFailed(message)
[docs] @web.expose
def callback(self, trans, provider, idphint=None, **kwargs):
user = trans.user.username if trans.user is not None else 'anonymous'
if not bool(kwargs):
log.error("OIDC callback received no data for provider `{}` and user `{}`".format(provider, user))
return trans.show_error_message(
'Did not receive any information from the `{}` identity provider to complete user `{}` authentication '
'flow. Please try again, and if the problem persists, contact the Galaxy instance admin. Also note '
'that this endpoint is to receive authentication callbacks only, and should not be called/reached by '
'a user.'.format(provider, user))
if 'error' in kwargs:
log.error("Error handling authentication callback from `{}` identity provider for user `{}` login request."
" Error message: {}".format(provider, user, kwargs.get('error', 'None')))
return trans.show_error_message('Failed to handle authentication callback from {}. '
'Please try again, and if the problem persists, contact '
'the Galaxy instance admin'.format(provider))
try:
success, message, (redirect_url, user) = trans.app.authnz_manager.callback(provider,
kwargs.get('state', ' '),
kwargs['code'],
trans,
login_redirect_url=url_for('/'),
idphint=idphint)
except exceptions.AuthenticationFailed as e:
return trans.response.send_redirect(trans.request.base + url_for('/') + 'root/login?message=' + (e.message or "Duplicate Email"))
if success is False:
return trans.show_error_message(message)
user = user if user is not None else trans.user
if user is None:
return trans.show_error_message("An unknown error occurred when handling the callback from `{}` "
"identity provider. Please try again, and if the problem persists, "
"contact the Galaxy instance admin.".format(provider))
trans.handle_user_login(user)
# Record which idp provider was logged into, so we can logout of it later
trans.set_cookie(value=provider, name=PROVIDER_COOKIE_NAME)
return trans.response.send_redirect(url_for('/'))
[docs] @web.expose
@web.require_login("authenticate against the selected identity provider")
def disconnect(self, trans, provider, **kwargs):
if trans.user is None:
# Only logged in users are allowed here.
return
success, message, redirect_url = trans.app.authnz_manager.disconnect(provider,
trans,
disconnect_redirect_url=url_for('/'))
if success is False:
return trans.show_error_message(message)
if redirect_url is None:
redirect_url = url_for('/')
return trans.response.send_redirect(redirect_url)
[docs] @web.json
def logout(self, trans, provider, **kwargs):
post_logout_redirect_url = trans.request.base + url_for('/') + 'root/login?is_logout_redirect=true'
success, message, redirect_uri = trans.app.authnz_manager.logout(provider,
trans,
post_logout_redirect_url=post_logout_redirect_url)
if success:
return {'redirect_uri': redirect_uri}
else:
return {'message': message}
[docs] @web.expose
def get_logout_url(self, trans, **kwargs):
idp_provider = trans.get_cookie(name=PROVIDER_COOKIE_NAME)
if idp_provider:
return trans.response.send_redirect(url_for(controller='authnz', action='logout', provider=idp_provider))
[docs] @web.expose
@web.json
def get_cilogon_idps(self, trans, **kwargs):
try:
return json.loads(url_get('https://cilogon.org/idplist/', params=dict(kwargs)))
except Exception as e:
raise Exception("Invalid server response. %s." % str(e))