Warning
This document is for an in-development version of Galaxy. You can alternatively view this page in the latest release if it exists or view the top of the latest release's documentation.
Source code for galaxy.authnz.psa_authnz
import json
import requests
import six
from social_core.actions import do_auth, do_complete, do_disconnect
from social_core.backends.utils import get_backend
from social_core.strategy import BaseStrategy
from social_core.utils import module_member, setting_name
from sqlalchemy.exc import IntegrityError
from galaxy.exceptions import MalformedContents
from ..authnz import IdentityProvider
from ..model import PSAAssociation, PSACode, PSANonce, PSAPartial, UserAuthnzToken
# Fallback class names for the components PSA asks for.
# Each keyword is a PSA component name; its value is the name of the class
# that implements that component.
DEFAULTS = dict(
    STRATEGY='Strategy',
    STORAGE='Storage',
)
# Maps a provider name (as used in Galaxy's OIDC configuration) to the dotted
# path of the PSA backend class that handles it.
BACKENDS = dict(
    google="social_core.backends.google_openidconnect.GoogleOpenIdConnect",
    globus="social_core.backends.globus.GlobusOpenIdConnect",
    elixir="social_core.backends.elixir.ElixirOpenIdConnect",
    okta="social_core.backends.okta_openidconnect.OktaOpenIdConnect",
)
# Maps a provider name to the name under which the matching PSA backend is
# looked up (see `get_backend`) and under which its state is kept in the
# session.
BACKENDS_NAME = dict([
    ("google", "google-openidconnect"),
    ("globus", "globus"),
    ("elixir", "elixir"),
    ("okta", "okta-openidconnect"),
])
# Ordered steps PSA runs while authenticating a user. Order matters: each
# step may rely on values produced by the steps before it.
AUTH_PIPELINE = (
    # Collect whatever is known about the user into a simple structure that
    # later steps use to create the user instance. The details are often part
    # of the provider's auth response already, but obtaining them may require
    # a call to a provider API.
    "social_core.pipeline.social_auth.social_details",

    # Determine the social uid, i.e., the identifier that uniquely names this
    # user at the provider we are authenticating through.
    "social_core.pipeline.social_auth.social_uid",

    # Confirm the current auth process is permitted within this project;
    # email and domain whitelists (when defined) are enforced here.
    "social_core.pipeline.social_auth.auth_allowed",

    # Galaxy-specific: make sure the decoded response carries every required
    # field, such as an ID token or a refresh token.
    "galaxy.authnz.psa_authnz.contains_required_data",

    # Galaxy-specific: optional secondary authorization check.
    "galaxy.authnz.psa_authnz.verify",

    # Look up whether this social account is already associated in the site.
    "social_core.pipeline.social_auth.social_user",

    # Derive a username for the person; a random string is appended when the
    # name collides with an existing one.
    "social_core.pipeline.user.get_username",

    # Disabled step: send a validation email so the user can verify their
    # email address.
    # 'social_core.pipeline.mail.mail_validation',

    # Attach the current social details to an existing user account that has
    # a similar email address.
    "social_core.pipeline.social_auth.associate_by_email",

    # Create a user account when none has been found so far.
    "social_core.pipeline.user.create_user",

    # Create the record linking the social account to this user.
    "social_core.pipeline.social_auth.associate_user",

    # Fill the social record's extra_data field with the values named in the
    # settings (plus defaults such as access_token).
    "social_core.pipeline.social_auth.load_extra_data",

    # Refresh the user record with any info that changed at the auth service.
    "social_core.pipeline.user.user_details",
)
# Ordered steps PSA runs while disconnecting a user from an identity
# provider: first the permission check, then the actual disassociation.
DISCONNECT_PIPELINE = (
    "galaxy.authnz.psa_authnz.allowed_to_disconnect",
    "galaxy.authnz.psa_authnz.disconnect",
)
class PSAAuthnz(IdentityProvider):
    """
    Identity provider implemented on top of python-social-auth (PSA).

    An instance holds a PSA settings dictionary (``self.config``) built from
    the OIDC configuration, and drives the PSA ``do_auth``, ``do_complete``
    and ``do_disconnect`` actions with it.
    """

    def __init__(self, provider, oidc_config, oidc_backend_config):
        """
        Build the PSA settings for one identity provider.

        :type provider: str
        :param provider: name of the identity provider. It is lower-cased
            before use and must then be a key of ``BACKENDS``.

        :type oidc_config: dict
        :param oidc_config: general OIDC settings; every entry is copied into
            the config under its PSA setting name (see ``setting_name``).

        :type oidc_backend_config: dict
        :param oidc_backend_config: provider-specific settings, read by
            ``_setup_idp``.

        :raises KeyError: if the provider is not a key of ``BACKENDS``.
        """
        # Normalise once so that every lookup below agrees with the
        # lower-cased name stored in the config.
        provider = provider.lower()
        self.config = {'provider': provider}
        for key, value in oidc_config.items():
            self.config[setting_name(key)] = value

        self.config[setting_name('USER_MODEL')] = 'models.User'
        self.config['SOCIAL_AUTH_PIPELINE'] = AUTH_PIPELINE
        self.config['DISCONNECT_PIPELINE'] = DISCONNECT_PIPELINE
        self.config[setting_name('AUTHENTICATION_BACKENDS')] = (BACKENDS[provider],)

        self.config["VERIFY_SSL"] = oidc_config.get("VERIFY_SSL")
        self.config["REQUESTS_TIMEOUT"] = oidc_config.get("REQUESTS_TIMEOUT")
        self.config["ID_TOKEN_MAX_AGE"] = oidc_config.get("ID_TOKEN_MAX_AGE")

        # The following config sets PSA to call the `_login_user` function for
        # logging in a user. If this setting is set to false, the `_login_user`
        # would not be called, and as a result Galaxy would not know who is
        # the just logged-in user.
        self.config[setting_name('INACTIVE_USER_LOGIN')] = True

        if provider in BACKENDS_NAME:
            self._setup_idp(oidc_backend_config)

        # Secondary AuthZ is currently supported with Google identities only,
        # so drop its settings for every other provider.
        if provider != "google":
            self.config.pop("SOCIAL_AUTH_SECONDARY_AUTH_PROVIDER", None)
            self.config.pop("SOCIAL_AUTH_SECONDARY_AUTH_ENDPOINT", None)

    def _setup_idp(self, oidc_backend_config):
        """
        Copy the provider-specific settings into the config.

        Reads ``client_id``, ``client_secret`` and ``redirect_uri``, and the
        optional ``prompt``, ``api_url`` and ``url`` entries (the optional
        ones are applied only when they are not None).

        :type oidc_backend_config: dict
        :param oidc_backend_config: provider-specific settings.
        """
        extra_arguments = {'access_type': 'offline'}
        self.config[setting_name('AUTH_EXTRA_ARGUMENTS')] = extra_arguments
        self.config['KEY'] = oidc_backend_config.get('client_id')
        self.config['SECRET'] = oidc_backend_config.get('client_secret')
        self.config['redirect_uri'] = oidc_backend_config.get('redirect_uri')

        prompt = oidc_backend_config.get('prompt')
        if prompt is not None:
            extra_arguments['prompt'] = prompt

        api_url = oidc_backend_config.get('api_url')
        if api_url is not None:
            self.config[setting_name('API_URL')] = api_url

        url = oidc_backend_config.get('url')
        if url is not None:
            self.config[setting_name('URL')] = url

    def _get_helper(self, name, do_import=False):
        """
        Return the configured value of the PSA setting ``name``.

        Falls back to ``DEFAULTS[name]`` (or None) when the setting is not in
        the config.

        :type name: str
        :param name: un-prefixed PSA setting name.

        :type do_import: bool
        :param do_import: when true, resolve the value with ``module_member``
            and return the result instead of the raw value.
        """
        this_config = self.config.get(setting_name(name), DEFAULTS.get(name, None))
        # A conditional expression instead of `a and b or c`, which would
        # silently return the raw value whenever the resolved member is falsy.
        return module_member(this_config) if do_import else this_config

    def _load_backend(self, strategy, redirect_uri):
        """
        Instantiate the PSA backend configured for this provider.

        :param strategy: the strategy object handed to the backend.
        :param redirect_uri: the redirect URI handed to the backend.
        :return: a backend instance.
        """
        backends = self._get_helper('AUTHENTICATION_BACKENDS')
        backend = get_backend(backends, BACKENDS_NAME[self.config['provider']])
        return backend(strategy, redirect_uri)

    def _login_user(self, backend, user, social_user):
        """
        Login callback handed to ``do_complete``; remembers the user.

        The user is stored in the config under the ``user`` key, from where
        ``callback`` reads it.
        """
        # NOTE(review): the value stays in the config after `callback`
        # returns; if an instance is reused across requests a later callback
        # could see a stale user -- confirm instances are per-request.
        self.config['user'] = user

    def authenticate(self, trans):
        """
        Start the authentication flow for the current request.

        :param trans: Galaxy transaction; its ``sa_session``, ``request`` and
            ``session`` attributes are used.
        :return: the result of PSA's ``do_auth`` for the loaded backend.
        """
        on_the_fly_config(trans.sa_session)
        strategy = Strategy(trans.request, trans.session, Storage, self.config)
        backend = self._load_backend(strategy, self.config['redirect_uri'])
        # Compare by value: `is` on strings only works by accident of
        # interning.
        if backend.name == BACKENDS_NAME["google"] and \
                "SOCIAL_AUTH_SECONDARY_AUTH_PROVIDER" in self.config and \
                "SOCIAL_AUTH_SECONDARY_AUTH_ENDPOINT" in self.config:
            cloud_scope = "https://www.googleapis.com/auth/cloud-platform"
            # DEFAULT_SCOPE is mutated in place and is presumably shared
            # between backend instances (TODO confirm); guard so repeated
            # calls do not pile up duplicate entries.
            if cloud_scope not in backend.DEFAULT_SCOPE:
                backend.DEFAULT_SCOPE.append(cloud_scope)
        return do_auth(backend)

    def callback(self, state_token, authz_code, trans, login_redirect_url):
        """
        Complete the authentication flow after the provider calls back.

        :param state_token: state value; stored in the session under
            ``<backend name>_state`` and passed to ``do_complete``.
        :param authz_code: not used by this method.
        :param trans: Galaxy transaction.
        :param login_redirect_url: stored as the ``LOGIN_REDIRECT_URL`` setting.
        :rtype: tuple
        :return: ``(redirect_url, user)`` where ``redirect_url`` is the result
            of ``do_complete`` and ``user`` is whatever ``_login_user`` stored
            in the config (None if nothing was stored).
        """
        on_the_fly_config(trans.sa_session)
        self.config[setting_name('LOGIN_REDIRECT_URL')] = login_redirect_url
        strategy = Strategy(trans.request, trans.session, Storage, self.config)
        strategy.session_set(BACKENDS_NAME[self.config['provider']] + '_state', state_token)
        backend = self._load_backend(strategy, self.config['redirect_uri'])
        redirect_url = do_complete(
            backend,
            login=self._login_user,
            user=trans.user,
            state=state_token)
        return redirect_url, self.config.get('user', None)

    def disconnect(self, provider, trans, disconnect_redirect_url=None, association_id=None):
        """
        Disassociate the current user from this identity provider.

        :param provider: not used by this method.
        :param trans: Galaxy transaction.
        :param disconnect_redirect_url: stored as the
            ``DISCONNECT_REDIRECT_URL`` setting; an empty tuple is stored when
            it is None.
        :param association_id: passed through to ``do_disconnect``.
        :rtype: tuple
        :return: ``(success, message, redirect_url)``. When ``do_disconnect``
            returns a string it is reported as ``(True, "", <that string>)``;
            otherwise ``success`` and ``message`` are read from the returned
            mapping and the third item is an empty string.
        """
        on_the_fly_config(trans.sa_session)
        self.config[setting_name('DISCONNECT_REDIRECT_URL')] =\
            disconnect_redirect_url if disconnect_redirect_url is not None else ()
        strategy = Strategy(trans.request, trans.session, Storage, self.config)
        backend = self._load_backend(strategy, self.config['redirect_uri'])
        response = do_disconnect(backend, trans.user, association_id)
        if isinstance(response, six.string_types):
            return True, "", response
        return response.get('success', False), response.get('message', ""), ""
class Strategy(BaseStrategy):
    """
    PSA strategy that wraps a request object, a session and the settings
    dictionary built by ``PSAAuthnz``.
    """

    def __init__(self, request, session, storage, config, tpl=None):
        """
        :param request: the current request (may be falsy).
        :param session: the current session; an empty dict is used when falsy.
        :param storage: storage class handed to ``BaseStrategy``.
        :type config: dict
        :param config: PSA settings; it is kept by reference and two entries
            are (re)written here.
        :param tpl: template strategy handed to ``BaseStrategy``.
        """
        self.request = request
        self.session = session if session else {}
        self.config = config
        # NOTE(review): this tests the request *host* for an 'https:' prefix;
        # if `host` carries no URL scheme this is always False -- confirm.
        self.config['SOCIAL_AUTH_REDIRECT_IS_HTTPS'] = bool(
            self.request and self.request.host.startswith('https:'))
        self.config['SOCIAL_AUTH_GOOGLE_OPENIDCONNECT_EXTRA_DATA'] = ['id_token']
        super(Strategy, self).__init__(storage, tpl)

    def request_data(self, merge=True):
        """
        Return the parameters of the current request.

        :type merge: bool
        :param merge: when true, return a copy of the GET parameters updated
            with the POST parameters; otherwise return POST for a POST
            request and GET for anything else.
        :return: the request parameters, or an empty dict without a request.
        """
        if not self.request:
            return {}
        if merge:
            data = self.request.GET.copy()
            data.update(self.request.POST)
        elif self.request.method == 'POST':
            data = self.request.POST
        else:
            data = self.request.GET
        return data

    def build_absolute_uri(self, path=None):
        """
        Build the URI PSA should use for ``path``.

        :type path: str
        :param path: a path or URL. Values starting with ``http://`` or
            ``https://`` are returned unchanged.
        :rtype: str
        :return: with a request, ``<host>/authnz`` followed by
            ``/<provider>`` when a provider is configured (``path`` itself is
            not part of the result); without a request, ``path`` (or an empty
            string).
        """
        path = path or ''
        if path.startswith('http://') or path.startswith('https://'):
            return path
        if self.request:
            provider = self.config.get('provider', None)
            # The conditional must apply to the provider suffix only; applied
            # to the whole concatenation it would discard host and '/authnz'
            # whenever no provider is configured.
            suffix = ('/' + provider) if provider is not None else ''
            return self.request.host + '/authnz' + suffix
        return path

    def render_html(self, tpl=None, html=None, context=None):
        """Rendering HTML is not supported; always raises NotImplementedError."""
        raise NotImplementedError('Not implemented.')

    def start(self):
        """
        Begin the auth flow: clean any partial pipeline, then either redirect
        to the backend's auth URL or return the backend's auth HTML, depending
        on ``backend.uses_redirect()``.
        """
        self.clean_partial_pipeline()
        if self.backend.uses_redirect():
            return self.redirect(self.backend.auth_url())
        else:
            return self.html(self.backend.auth_html())

    def continue_pipeline(self, *args, **kwargs):
        """Delegate to the backend's ``continue_pipeline`` with the same arguments."""
        return self.backend.continue_pipeline(*args, **kwargs)
class Storage(object):
    """
    Storage definition handed to PSA: names the model class used for each
    kind of record PSA persists.
    """
    user = UserAuthnzToken
    nonce = PSANonce
    association = PSAAssociation
    code = PSACode
    partial = PSAPartial

    @classmethod
    def is_integrity_error(cls, exception):
        """
        Tell whether ``exception`` is a database integrity error.

        :param exception: the exception to inspect.
        :rtype: bool
        :return: True for ``IntegrityError`` and its subclasses.
        """
        # isinstance (rather than an exact class identity test) also
        # recognises subclasses of IntegrityError.
        return isinstance(exception, IntegrityError)
def on_the_fly_config(sa_session):
    """
    Attach ``sa_session`` to every PSA model class.

    The session is stored as the class attribute ``sa_session`` on each of
    the model classes, so it is shared by everything that uses those classes.

    :param sa_session: the database session to attach.
    """
    psa_models = (PSACode, UserAuthnzToken, PSANonce, PSAPartial, PSAAssociation)
    for model in psa_models:
        model.sa_session = sa_session
def contains_required_data(response=None, is_new=False, **kwargs):
    """
    Pipeline step (see AUTH_PIPELINE) that checks the decoded response of the
    OIDC backend carries everything Galaxy needs for a user.

    It runs before the user is authenticated or authorized. Nothing is
    returned when the data is complete; an exception is raised otherwise.

    :type  response: dict
    :param response: decoded response from the OIDC backend, expected to hold
                     (among others) the following keys:
                     - id_token; see: http://openid.net/specs/openid-connect-core-1_0.html#IDToken
                     - access_token; see: https://tools.ietf.org/html/rfc6749#section-1.4
                     - refresh_token; see: https://tools.ietf.org/html/rfc6749#section-1.5
                     - token_type; see: https://tools.ietf.org/html/rfc6750#section-6.1.1
                     - scope; see: http://openid.net/specs/openid-connect-core-1_0.html#AuthRequest
                     - expires_in; expiration time of the access and ID
                       tokens, in seconds since the response was generated.

    :type  is_new: bool
    :param is_new: pipeline flag; a refresh token is required only when it
                   is true.

    :param kwargs: remaining pipeline values (unused here); may include:
                   - uid: user ID
                   - user: Galaxy user; if user is already authenticated
                   - backend: the backend that is used for user authentication.
                   - storage: an instance of Storage class.
                   - strategy: an instance of the Strategy class.
                   - state: the state code received from identity provider.
                   - details: details about the user's third-party identity
                     as requested in `scope`.

    :rtype: void
    :raises MalformedContents: if ``response`` is not a dict, has no ID
                               token, or (for ``is_new``) no refresh token.
    """
    hint_msg = (
        "Visit the identity provider's permitted applications page "
        "(e.g., visit `https://myaccount.google.com/u/0/permissions` "
        "for Google), then revoke the access of this Galaxy instance, "
        "and then retry to login. If the problem persists, contact "
        "the Admin of this Galaxy instance."
    )

    problem = None
    if not isinstance(response, dict):
        # Only possible when PSA could not decode the `authnz code` returned
        # by the identity provider. PSA deals with that internally; this
        # guard merely prevents an uncaught server-side error.
        problem = "`response` not found. {}"
    elif not response.get("id_token"):
        # Seen with backends that are not OIDC compliant, e.g., an
        # OAuth2.0-based backend that issues an access token only.
        problem = "Missing identity token. {}"
    elif is_new and not response.get("refresh_token"):
        # A provider (e.g., Google) hands out a refresh token the first time
        # the user consents to Galaxy's access, i.e., on the first login to a
        # Galaxy instance with that provider's credentials. It can be missing
        # for a variety of reasons, e.g., a manipulated Galaxy database where
        # the user's rows in galaxy_user and oidc_user_authnz_tokens were
        # deleted after consent was given, or during development. The remedy
        # is to revoke the consent on the provider's website and log in again.
        problem = "Missing refresh token. {}"

    if problem is not None:
        raise MalformedContents(err_msg=problem.format(hint_msg))
def verify(strategy=None, response=None, details=None, **kwargs):
    """
    Pipeline step (see AUTH_PIPELINE) performing the optional secondary
    authorization of a user.

    The step is a no-op unless both ``SOCIAL_AUTH_SECONDARY_AUTH_PROVIDER``
    and ``SOCIAL_AUTH_SECONDARY_AUTH_ENDPOINT`` are present in the strategy's
    config. The only supported provider is ``gcp``: the IAM policy of the
    configured service account is requested with the user's access token,
    and the user is let in only if their email is a member of the first
    binding of that policy (compared case-insensitively).

    :param strategy: object whose ``config`` mapping holds the settings.
    :type response: dict
    :param response: decoded provider response; its ``access_token`` is used.
    :type details: dict
    :param details: user details; its ``email`` is checked.
    :rtype: void
    :raises Exception: if the provider is unsupported, the IAM request fails,
        or the user is not authorized. The message is shown to the user.
    """
    provider = strategy.config.get("SOCIAL_AUTH_SECONDARY_AUTH_PROVIDER")
    endpoint = strategy.config.get("SOCIAL_AUTH_SECONDARY_AUTH_ENDPOINT")
    if provider is None or endpoint is None:
        # Either the secondary authorization is not configured or OIDC IdP
        # is not compatible, so allow user login.
        return

    if provider.lower() != "gcp":
        raise Exception("`{}` is an unsupported secondary authorization provider, contact admin.".format(provider))

    result = requests.post(
        "https://iam.googleapis.com/v1/projects/-/serviceAccounts/{}:getIamPolicy".format(endpoint),
        headers={
            'Authorization': 'Bearer {}'.format(response.get("access_token")),
            'Accept': 'application/json'})
    try:
        res = json.loads(result.content)
    except ValueError:
        # A body that is not JSON must not surface as a decoder error.
        res = None

    if result.status_code != requests.codes.ok:
        # The message of the raised exception is shown to the user; hence,
        # the following way of handling exception is better than using
        # result.raise_for_status(), since raise_for_status may report
        # sensitive information that should not be exposed to users.
        try:
            message = res["error"]["message"]
        except (KeyError, TypeError):
            message = "Secondary authorization request failed with status {}.".format(result.status_code)
        raise Exception(message)

    try:
        members = res["bindings"][0]["members"]
    except (KeyError, IndexError, TypeError):
        # A policy without bindings authorizes nobody.
        members = []
    email_addresses = {x.lower().replace("user:", "").strip() for x in members}

    # Members are lower-cased above, so the user's email must be lower-cased
    # too for the comparison to be case-insensitive.
    email = (details or {}).get("email")
    if not email or email.lower() not in email_addresses:
        raise Exception("Not authorized by GCP IAM.")
    # Secondary authorization successful, so allow user login.
def allowed_to_disconnect(name=None, user=None, user_storage=None, strategy=None,
                          backend=None, request=None, details=None, **kwargs):
    """
    Disconnect-pipeline step deciding whether a disconnect may proceed.

    Disconnecting means disassociating a Galaxy user from a third-party
    authnz, i.e., removing the user's access and/or ID tokens.

    To forbid a disconnect, raise an exception. To allow it, return nothing
    (an empty dictionary is also acceptable): at least until PSA social_core
    v.1.5.0, any other returned value (e.g., a Boolean) makes PSA skip the
    remaining steps of the disconnect pipeline. See the condition in the
    `run_pipeline` function:
    https://github.com/python-social-auth/social-core/blob/master/social_core/backends/base.py#L114

    This implementation allows every disconnect.

    :param name: name of the backend (e.g., google-openidconnect)
    :type user: galaxy.model.User
    :type user_storage: galaxy.model.UserAuthnzToken
    :type strategy: galaxy.authnz.psa_authnz.Strategy
    :type backend: PSA backend object (e.g., social_core.backends.google_openidconnect.GoogleOpenIdConnect)
    :type request: webob.multidict.MultiDict
    :type details: dict
    :return: nothing
    """
def disconnect(name=None, user=None, user_storage=None, strategy=None,
               backend=None, request=None, details=None, **kwargs):
    """
    Disconnect-pipeline step that removes the user's association with an
    identity provider.

    Disconnecting means disassociating a Galaxy user from a third-party
    authnz, i.e., removing the user's access and/or ID tokens. The record of
    ``user`` for provider ``name`` is looked up through the session attached
    to ``user_storage`` and deleted.

    :param name: name of the backend (e.g., google-openidconnect)
    :type user: galaxy.model.User
    :type user_storage: galaxy.model.UserAuthnzToken
    :type strategy: galaxy.authnz.psa_authnz.Strategy
    :type backend: PSA backend object (e.g., social_core.backends.google_openidconnect.GoogleOpenIdConnect)
    :type request: webob.multidict.MultiDict
    :type details: dict
    :return: nothing when a record was deleted; a dictionary with ``success``
             set to False and a ``message`` when the user has no record for
             this provider. Per PSA's pipeline rules, key-value pairs of a
             returned dictionary are available inside PSA only and are passed
             to the next pipeline step, not returned by `do_disconnect`;
             returning anything other than a(n) (empty) dictionary breaks the
             pipeline and becomes the result of `do_disconnect`.
    """
    sa_session = user_storage.sa_session
    user_authnz = sa_session.query(user_storage).filter(user_storage.table.c.user_id == user.id,
                                                        user_storage.table.c.provider == name).first()
    if user_authnz is None:
        return {'success': False, 'message': 'Not authenticated by any identity providers.'}
    # Use the session the record was loaded from. The Strategy class in this
    # module never sets a `trans` attribute, so going through
    # `strategy.trans` would fail.
    # option A
    sa_session.delete(user_authnz)
    # option B
    # user_authnz.extra_data = None
    sa_session.flush()