Warning
This document is for an old release of Galaxy. You can alternatively view this page in the latest release if it exists or view the top of the latest release's documentation.
Source code for galaxy.authnz.psa_authnz
import json
import requests
from social_core.actions import do_auth, do_complete, do_disconnect
from social_core.backends.utils import get_backend
from social_core.strategy import BaseStrategy
from social_core.utils import module_member, setting_name
from sqlalchemy.exc import IntegrityError
from galaxy.exceptions import MalformedContents
from ..authnz import IdentityProvider
from ..model import PSAAssociation, PSACode, PSANonce, PSAPartial, UserAuthnzToken
# Component name requested by PSA -> name of the class that provides it.
DEFAULTS = {
    "STRATEGY": "Strategy",
    "STORAGE": "Storage",
}

# Provider key (as used in Galaxy's configuration) -> dotted path of the
# social-core backend class that implements it.
BACKENDS = {
    "google": "social_core.backends.google_openidconnect.GoogleOpenIdConnect",
    "globus": "social_core.backends.globus.GlobusOpenIdConnect",
    "elixir": "social_core.backends.elixir.ElixirOpenIdConnect",
    "okta": "social_core.backends.okta_openidconnect.OktaOpenIdConnect",
    "azure": "social_core.backends.azuread_tenant.AzureADTenantOAuth2",
}

# Provider key -> the backend's own `name`; used to select the backend and
# to build the session key that holds the state token.
BACKENDS_NAME = {
    "google": "google-openidconnect",
    "globus": "globus",
    "elixir": "elixir",
    "okta": "okta-openidconnect",
    "azure": "azuread-tenant-oauth2",
}
# Ordered steps PSA runs while authenticating a user. Each entry is the dotted
# path of a callable; the order matters.
AUTH_PIPELINE = (
    # Collect whatever user details are available and normalise them so the
    # user instance can be created later. Depending on the provider, this is
    # read from the auth response or fetched from a provider API.
    "social_core.pipeline.social_auth.social_details",

    # Resolve the social uid: the user's unique identifier at the provider.
    "social_core.pipeline.social_auth.social_uid",

    # Check that this auth attempt is permitted for this project; email and
    # domain allowlists (when defined) are enforced here.
    "social_core.pipeline.social_auth.auth_allowed",

    # Galaxy-specific checks: the decoded response must carry the required
    # fields (e.g., an ID token or a refresh token), followed by the optional
    # secondary authorization.
    "galaxy.authnz.psa_authnz.contains_required_data",
    "galaxy.authnz.psa_authnz.verify",

    # Look up whether this social account is already associated in the site.
    "social_core.pipeline.social_auth.social_user",

    # Derive a username; a random suffix is appended on collision.
    "social_core.pipeline.user.get_username",

    # The email-validation step ('social_core.pipeline.mail.mail_validation')
    # is intentionally left out of the pipeline.

    # Link the social details to an existing account that has a similar
    # email address.
    "social_core.pipeline.social_auth.associate_by_email",

    # Create a user account when none has been found so far.
    "social_core.pipeline.user.create_user",

    # Create the record tying the social account to the user.
    "social_core.pipeline.social_auth.associate_user",

    # Fill the social record's extra_data with the values named in settings
    # (plus defaults such as access_token).
    "social_core.pipeline.social_auth.load_extra_data",

    # Refresh the user record with any details that changed at the provider.
    "social_core.pipeline.user.user_details",
)

# Ordered steps PSA runs while disconnecting a user from an identity provider.
DISCONNECT_PIPELINE = (
    "galaxy.authnz.psa_authnz.allowed_to_disconnect",
    "galaxy.authnz.psa_authnz.disconnect",
)
class PSAAuthnz(IdentityProvider):
    """Identity provider that drives login/logout through Python Social Auth (PSA).

    The instance keeps a single ``self.config`` dictionary that is handed to
    every ``Strategy`` it creates; PSA reads its settings from that dictionary.
    """

    def __init__(self, provider, oidc_config, oidc_backend_config):
        """Build the PSA configuration for one identity provider.

        :type provider: str
        :param provider: provider key (a key of ``BACKENDS``); matched
                         case-insensitively.

        :type oidc_config: dict
        :param oidc_config: general settings; every entry is copied into the
                            configuration under ``setting_name(key)``.

        :type oidc_backend_config: dict
        :param oidc_backend_config: provider-specific settings (``client_id``,
                                    ``client_secret``, ``redirect_uri`` and
                                    optionally ``prompt``, ``api_url``, ``url``).

        :raises KeyError: if ``provider`` is not a key of ``BACKENDS``.
        """
        # Normalise once so that every lookup below agrees with the value
        # stored in `self.config['provider']` (which was always lower-cased).
        provider = provider.lower()
        self.config = {'provider': provider}
        for key, value in oidc_config.items():
            self.config[setting_name(key)] = value

        self.config[setting_name('USER_MODEL')] = 'models.User'
        self.config['SOCIAL_AUTH_PIPELINE'] = AUTH_PIPELINE
        self.config['DISCONNECT_PIPELINE'] = DISCONNECT_PIPELINE
        self.config[setting_name('AUTHENTICATION_BACKENDS')] = (BACKENDS[provider],)

        self.config["VERIFY_SSL"] = oidc_config.get("VERIFY_SSL")
        self.config["REQUESTS_TIMEOUT"] = oidc_config.get("REQUESTS_TIMEOUT")
        self.config["ID_TOKEN_MAX_AGE"] = oidc_config.get("ID_TOKEN_MAX_AGE")

        # The following config sets PSA to call the `_login_user` function for
        # logging in a user. If this setting is set to false, the `_login_user`
        # would not be called, and as a result Galaxy would not know who is
        # the just logged-in user.
        self.config[setting_name('INACTIVE_USER_LOGIN')] = True

        if provider in BACKENDS_NAME:
            self._setup_idp(oidc_backend_config)

        # Secondary AuthZ is currently supported with Google identities only,
        # so drop its settings for every other provider.
        if provider != "google":
            self.config.pop("SOCIAL_AUTH_SECONDARY_AUTH_PROVIDER", None)
            self.config.pop("SOCIAL_AUTH_SECONDARY_AUTH_ENDPOINT", None)

    def _setup_idp(self, oidc_backend_config):
        """Copy the provider-specific settings into ``self.config``."""
        extra_arguments = {'access_type': 'offline'}
        self.config[setting_name('AUTH_EXTRA_ARGUMENTS')] = extra_arguments
        self.config['KEY'] = oidc_backend_config.get('client_id')
        self.config['SECRET'] = oidc_backend_config.get('client_secret')
        self.config['redirect_uri'] = oidc_backend_config.get('redirect_uri')

        prompt = oidc_backend_config.get('prompt')
        if prompt is not None:
            extra_arguments['prompt'] = prompt

        api_url = oidc_backend_config.get('api_url')
        if api_url is not None:
            self.config[setting_name('API_URL')] = api_url

        url = oidc_backend_config.get('url')
        if url is not None:
            self.config[setting_name('URL')] = url

    def _get_helper(self, name, do_import=False):
        """Return the configured value for ``name``, falling back to ``DEFAULTS``.

        When ``do_import`` is true the value is treated as a dotted path and
        resolved with ``module_member``.
        """
        this_config = self.config.get(setting_name(name), DEFAULTS.get(name, None))
        # A conditional expression instead of `cond and a or b`, which would
        # silently fall through to `this_config` if the import were falsy.
        return module_member(this_config) if do_import else this_config

    def _load_backend(self, strategy, redirect_uri):
        """Instantiate the PSA backend class that belongs to this provider."""
        backends = self._get_helper('AUTHENTICATION_BACKENDS')
        backend = get_backend(backends, BACKENDS_NAME[self.config['provider']])
        return backend(strategy, redirect_uri)

    def _login_user(self, backend, user, social_user):
        """Login hook handed to PSA; remembers the user so `callback` can return it."""
        self.config['user'] = user

    def authenticate(self, trans):
        """Start the authentication flow and return the result of ``do_auth``."""
        on_the_fly_config(trans.sa_session)
        strategy = Strategy(trans.request, trans.session, Storage, self.config)
        backend = self._load_backend(strategy, self.config['redirect_uri'])
        # Compare by value: `is` on strings only works by accident of interning.
        if backend.name == BACKENDS_NAME["google"] and \
                "SOCIAL_AUTH_SECONDARY_AUTH_PROVIDER" in self.config and \
                "SOCIAL_AUTH_SECONDARY_AUTH_ENDPOINT" in self.config:
            cloud_platform_scope = "https://www.googleapis.com/auth/cloud-platform"
            # The scope list is mutated in place; when it is defined on the
            # backend class (as in social-core) it outlives this call, so
            # only add the scope once instead of on every login.
            if cloud_platform_scope not in backend.DEFAULT_SCOPE:
                backend.DEFAULT_SCOPE.append(cloud_platform_scope)
        return do_auth(backend)

    def callback(self, state_token, authz_code, trans, login_redirect_url):
        """Complete the authentication flow after the provider redirects back.

        :return: a tuple of the value returned by ``do_complete`` and the user
                 recorded by ``_login_user`` (``None`` if it was not called).
        """
        on_the_fly_config(trans.sa_session)
        self.config[setting_name('LOGIN_REDIRECT_URL')] = login_redirect_url
        # Forget any user recorded by an earlier call on this instance, so a
        # completion that never reaches `_login_user` cannot report a stale user.
        self.config.pop('user', None)
        strategy = Strategy(trans.request, trans.session, Storage, self.config)
        strategy.session_set(BACKENDS_NAME[self.config['provider']] + '_state', state_token)
        backend = self._load_backend(strategy, self.config['redirect_uri'])
        redirect_url = do_complete(
            backend,
            login=self._login_user,
            user=trans.user,
            state=state_token)
        return redirect_url, self.config.get('user', None)

    def disconnect(self, provider, trans, disconnect_redirect_url=None, association_id=None):
        """Run the disconnect pipeline for the current user.

        :return: a ``(success, message, redirect_url)`` tuple. When
                 ``do_disconnect`` returns a string, it is reported as the
                 redirect URL of a successful disconnect; otherwise ``success``
                 and ``message`` are read from the returned mapping.
        """
        on_the_fly_config(trans.sa_session)
        self.config[setting_name('DISCONNECT_REDIRECT_URL')] =\
            disconnect_redirect_url if disconnect_redirect_url is not None else ()
        strategy = Strategy(trans.request, trans.session, Storage, self.config)
        backend = self._load_backend(strategy, self.config['redirect_uri'])
        response = do_disconnect(backend, trans.user, association_id)
        if isinstance(response, str):
            return True, "", response
        return response.get('success', False), response.get('message', ""), ""
class Strategy(BaseStrategy):
    """PSA strategy backed by a Galaxy request, a session and a config dictionary."""

    def __init__(self, request, session, storage, config, tpl=None):
        """Store the request/session/config and initialise the base strategy.

        Note that ``config`` is kept by reference and two entries are written
        into it here, so the caller's dictionary is modified.
        """
        self.request = request
        self.session = session if session else {}
        self.config = config
        # NOTE(review): this tests whether `request.host` starts with
        # 'https:'. If `host` holds only "hostname[:port]" this is never true;
        # confirm against the request type before relying on it.
        self.config['SOCIAL_AUTH_REDIRECT_IS_HTTPS'] = bool(
            self.request and self.request.host.startswith('https:'))
        self.config['SOCIAL_AUTH_GOOGLE_OPENIDCONNECT_EXTRA_DATA'] = ['id_token']
        super().__init__(storage, tpl)

    def request_data(self, merge=True):
        """Return the request parameters.

        With ``merge`` true, a copy of the GET parameters updated with the POST
        parameters is returned; otherwise only the parameters matching the
        request method. An empty dict is returned when there is no request.
        """
        if not self.request:
            return {}
        if merge:
            data = self.request.GET.copy()
            data.update(self.request.POST)
        elif self.request.method == 'POST':
            data = self.request.POST
        else:
            data = self.request.GET
        return data

    def build_absolute_uri(self, path=None):
        """Return an absolute URI for ``path``.

        A ``path`` that already is an http(s) URL is returned unchanged. With a
        request available, the result is ``<host>/authnz[/<provider>]`` (the
        given ``path`` is not used in that case); without one, ``path`` itself
        (or ``''``) is returned.
        """
        path = path or ''
        if path.startswith(('http://', 'https://')):
            return path
        if self.request:
            provider = self.config.get('provider', None)
            # Only the provider segment is optional. Previously the conditional
            # expression swallowed the whole concatenation and yielded '' when
            # no provider was configured.
            provider_segment = '/' + provider if provider is not None else ''
            return self.request.host + '/authnz' + provider_segment
        return path

    def render_html(self, tpl=None, html=None, context=None):
        """HTML rendering is not supported by this strategy."""
        raise NotImplementedError('Not implemented.')

    def start(self):
        """Clean any partial pipeline, then redirect to (or render) the backend's auth page."""
        self.clean_partial_pipeline()
        if self.backend.uses_redirect():
            return self.redirect(self.backend.auth_url())
        else:
            return self.html(self.backend.auth_html())

    def continue_pipeline(self, *args, **kwargs):
        """Delegate to the backend's ``continue_pipeline``."""
        return self.backend.continue_pipeline(*args, **kwargs)
class Storage:
    """Names the Galaxy model class PSA should use for each of its storage roles."""

    # PSA storage role -> Galaxy model class.
    user = UserAuthnzToken
    nonce = PSANonce
    association = PSAAssociation
    code = PSACode
    partial = PSAPartial

    @classmethod
    def is_integrity_error(cls, exception):
        """Return True only when ``exception`` is exactly an SQLAlchemy ``IntegrityError``."""
        return type(exception) is IntegrityError
def on_the_fly_config(sa_session):
    """Set ``sa_session`` as a class attribute on every PSA-related model class.

    :param sa_session: the SQLAlchemy session the model classes should use.
    """
    psa_models = (PSACode, UserAuthnzToken, PSANonce, PSAPartial, PSAAssociation)
    for model in psa_models:
        model.sa_session = sa_session
def contains_required_data(response=None, is_new=False, **kwargs):
    """
    Pipeline step (see AUTH_PIPELINE) that runs before the user is
    authenticated or authorized, and checks that the identity provider's
    decoded response carries everything Galaxy needs.

    :type  response: dict
    :param response: the decoded response from the OIDC backend, which is
                     expected to contain, among others:
        - id_token;       see: http://openid.net/specs/openid-connect-core-1_0.html#IDToken
        - access_token;   see: https://tools.ietf.org/html/rfc6749#section-1.4
        - refresh_token;  see: https://tools.ietf.org/html/rfc6749#section-1.5
        - token_type;     see: https://tools.ietf.org/html/rfc6750#section-6.1.1
        - scope;          see: http://openid.net/specs/openid-connect-core-1_0.html#AuthRequest
        - expires_in;     the expiration time of the access and ID tokens, in
                          seconds since the response was generated.

    :type  is_new: bool
    :param is_new: has the user been authenticated?

    :param kwargs: may contain the following keys among others:
        - uid:      user ID
        - user:     Galaxy user; if user is already authenticated
        - backend:  the backend that is used for user authentication.
        - storage:  an instance of Storage class.
        - strategy: an instance of the Strategy class.
        - state:    the state code received from identity provider.
        - details:  details about the user's third-party identity as requested in `scope`.

    :rtype: void
    :return: nothing when all required data is present.
    :raises MalformedContents: when `response` is not a dictionary, when the
                               ID token is missing, or when `is_new` is set and
                               the refresh token is missing.
    """
    hint_msg = (
        "Visit the identity provider's permitted applications page "
        "(e.g., visit `https://myaccount.google.com/u/0/permissions` "
        "for Google), then revoke the access of this Galaxy instance, "
        "and then retry to login. If the problem persists, contact "
        "the Admin of this Galaxy instance."
    )

    # A non-dict response (this includes None) can only occur if PSA could not
    # decode the `authnz code` returned by the identity provider. PSA deals
    # with that internally; the guard is here so that it never surfaces as an
    # uncaught server-side error.
    if not isinstance(response, dict):
        raise MalformedContents(err_msg=f"`response` not found. {hint_msg}")

    # No ID token: typically a backend that is not OIDC compliant, e.g. an
    # OAuth2.0-based backend that issues an access token only.
    has_id_token = bool(response.get("id_token"))
    if not has_id_token:
        raise MalformedContents(err_msg=f"Missing identity token. {hint_msg}")

    # Identity providers (e.g., Google) issue a refresh token the first time a
    # user consents to Galaxy's access. It can be missing for a new user when,
    # for instance, the user's rows in galaxy_user and oidc_user_authnz_tokens
    # were deleted after consent was given (database manipulation, dev work).
    # The remedy is to revoke the consent at the identity provider and log in
    # again.
    if is_new:
        has_refresh_token = bool(response.get("refresh_token"))
        if not has_refresh_token:
            raise MalformedContents(err_msg=f"Missing refresh token. {hint_msg}")
def verify(strategy=None, response=None, details=None, **kwargs):
    """
    Pipeline step (see AUTH_PIPELINE) performing the optional secondary
    authorization of a user who is logging in.

    When both `SOCIAL_AUTH_SECONDARY_AUTH_PROVIDER` and
    `SOCIAL_AUTH_SECONDARY_AUTH_ENDPOINT` are present in the strategy's
    config and the provider is `gcp`, the IAM policy of the service account
    named by the endpoint is requested with the user's access token, and the
    login is allowed only if the user's email address is listed among the
    members of the policy's first binding.

    :type  strategy: galaxy.authnz.psa_authnz.Strategy
    :param strategy: the strategy whose `config` holds the settings.

    :type  response: dict
    :param response: decoded response from the identity provider; its
                     `access_token` is used as the bearer token.

    :type  details: dict
    :param details: details of the user's identity; `email` is read from it.

    :rtype: void
    :return: nothing when the login is allowed, or when secondary
             authorization is not configured.
    :raises Exception: if the user is not authorized, if the IAM request
                       fails, or if the configured provider is unsupported.
                       The exception message is shown to the user.
    """
    provider = strategy.config.get("SOCIAL_AUTH_SECONDARY_AUTH_PROVIDER")
    endpoint = strategy.config.get("SOCIAL_AUTH_SECONDARY_AUTH_ENDPOINT")
    if provider is None or endpoint is None:
        # Either the secondary authorization is not configured or OIDC IdP
        # is not compatible, so allow user login.
        return

    if provider.lower() != "gcp":
        raise Exception(f"`{provider}` is an unsupported secondary authorization provider, contact admin.")

    result = requests.post(
        f"https://iam.googleapis.com/v1/projects/-/serviceAccounts/{endpoint}:getIamPolicy",
        headers={
            'Authorization': 'Bearer {}'.format(response.get("access_token")),
            'Accept': 'application/json'},
        # Bound the call with the configured timeout so that an unresponsive
        # endpoint cannot block the login; None (unset) keeps requests' default.
        timeout=strategy.config.get("REQUESTS_TIMEOUT"))

    # The body is not guaranteed to be a JSON object (e.g., a proxy error
    # page); treat anything else as an empty payload rather than crashing.
    try:
        res = json.loads(result.content)
    except ValueError:
        res = {}
    if not isinstance(res, dict):
        res = {}

    if result.status_code != requests.codes.ok:
        # The message of the raised exception is shown to the user; hence,
        # the following way of handling exception is better than using
        # result.raise_for_status(), since raise_for_status may report
        # sensitive information that should not be exposed to users.
        error = res.get("error")
        message = error.get("message") if isinstance(error, dict) else None
        raise Exception(message or "Secondary authorization request failed.")

    # Only the first binding is consulted (unchanged behavior); a policy
    # without bindings or members simply authorizes nobody.
    bindings = res.get("bindings") or []
    members = bindings[0].get("members", []) if bindings else []
    email_addresses = {x.lower().replace("user:", "").strip() for x in members}

    # Member addresses are lower-cased above, so normalise the user's email
    # the same way; an absent/empty email can never be authorized.
    email = ((details or {}).get("email") or "").lower()
    if not email or email not in email_addresses:
        raise Exception("Not authorized by GCP IAM.")
    # Secondary authorization successful, so allow user login.
def allowed_to_disconnect(name=None, user=None, user_storage=None, strategy=None,
                          backend=None, request=None, details=None, **kwargs):
    """
    First step of the disconnect pipeline (see DISCONNECT_PIPELINE): decides
    whether disconnecting is permitted.

    Disconnecting means disassociating a Galaxy user from a third-party
    authnz, i.e., removing any access and/or ID tokens of that user.

    To forbid a disconnect, this function should raise an exception. To allow
    it, it must return nothing (or an empty dictionary): at least until PSA
    social_core v.1.5.0, any other returned value (e.g., a Boolean) causes the
    remaining steps of the disconnect pipeline to be skipped. See the
    condition in the `run_pipeline` function:
    https://github.com/python-social-auth/social-core/blob/master/social_core/backends/base.py#L114

    As written, the function performs no check and therefore always allows
    the disconnect.

    :param name: name of the backend (e.g., google-openidconnect)
    :type user: galaxy.model.User
    :type user_storage: galaxy.model.UserAuthnzToken
    :type strategy: galaxy.authnz.psa_authnz.Strategy
    :type backend: PSA backend object (e.g., social_core.backends.google_openidconnect.GoogleOpenIdConnect)
    :type request: webob.multidict.MultiDict
    :type details: dict
    :return: empty dict
    """
def disconnect(name=None, user=None, user_storage=None, strategy=None,
               backend=None, request=None, details=None, **kwargs):
    """
    Second step of the disconnect pipeline (see DISCONNECT_PIPELINE): removes
    the stored association between a Galaxy user and a third-party authnz,
    i.e., removes any access and/or ID tokens of that user.

    :param name: name of the backend (e.g., google-openidconnect)
    :type user: galaxy.model.User
    :type user_storage: galaxy.model.UserAuthnzToken
    :type strategy: galaxy.authnz.psa_authnz.Strategy
    :type backend: PSA backend object (e.g., social_core.backends.google_openidconnect.GoogleOpenIdConnect)
    :type request: webob.multidict.MultiDict
    :type details: dict
    :return: void or empty dict. Key-value pairs placed in a returned
    dictionary stay inside PSA: they are passed to the next step of the
    disconnect pipeline but are not part of what `do_disconnect` returns.
    Returning anything other than a (possibly empty) dictionary breaks the
    disconnect pipeline, and that value becomes the result of `do_disconnect`.
    """
    session = user_storage.sa_session
    query = session.query(user_storage)
    association = query.filter(
        user_storage.table.c.user_id == user.id,
        user_storage.table.c.provider == name,
    ).first()

    if association is None:
        return {'success': False, 'message': 'Not authenticated by any identity providers.'}

    # The association row is deleted outright. The alternative would be to
    # keep the row and only clear its `extra_data`.
    session.delete(association)
    session.flush()