Warning

This document is for an in-development version of Galaxy. You can alternatively view this page in the latest release if it exists or view the top of the latest release's documentation.

Source code for galaxy.authnz.psa_authnz

import json

import requests
from social_core.actions import do_auth, do_complete, do_disconnect
from social_core.backends.utils import get_backend
from social_core.strategy import BaseStrategy
from social_core.utils import module_member, setting_name
from sqlalchemy.exc import IntegrityError

from galaxy.exceptions import MalformedContents
from ..authnz import IdentityProvider
from ..model import PSAAssociation, PSACode, PSANonce, PSAPartial, UserAuthnzToken


# Fallback component classes used by `PSAAuthnz._get_helper` when the
# configuration does not override them.
#   key:   name of a component requested by PSA.
#   value: name of the class that provides that component.
DEFAULTS = {
    'STRATEGY': 'Strategy',
    'STORAGE': 'Storage',
}

# Supported identity providers.
#   key:   provider name as used by Galaxy.
#   value: dotted path of the PSA backend class handling that provider.
BACKENDS = {
    'google': 'social_core.backends.google_openidconnect.GoogleOpenIdConnect',
    'globus': 'social_core.backends.globus.GlobusOpenIdConnect',
    'elixir': 'social_core.backends.elixir.ElixirOpenIdConnect',
    'okta': 'social_core.backends.okta_openidconnect.OktaOpenIdConnect',
}

# Name under which each backend is looked up (see `PSAAuthnz._load_backend`)
# and which prefixes its `<name>_state` session key (see `PSAAuthnz.callback`).
#   key:   provider name as used by Galaxy (same keys as BACKENDS).
#   value: backend name.
BACKENDS_NAME = {
    'google': 'google-openidconnect',
    'globus': 'globus',
    'elixir': 'elixir',
    'okta': 'okta-openidconnect',
}

# Ordered steps PSA runs while authenticating a user. Each entry is the
# dotted path of a pipeline function; the order is significant.
AUTH_PIPELINE = (
    # Collect whatever is known about the user and hand it over in a simple
    # format so the user instance can be created later. In some cases the
    # details already come with the provider's auth response; in others this
    # step may have to call a provider API.
    'social_core.pipeline.social_auth.social_details',

    # Fetch the social uid, i.e. the identifier that is unique for this user
    # at the provider being used.
    'social_core.pipeline.social_auth.social_uid',

    # Check that this auth process is valid for the current project; email
    # and domain allowlists (when defined) are enforced here.
    'social_core.pipeline.social_auth.auth_allowed',

    # Galaxy step: make sure the decoded response carries every required
    # field, such as an ID token or a refresh token.
    'galaxy.authnz.psa_authnz.contains_required_data',

    # Galaxy step: optional secondary authorization (see `verify`).
    'galaxy.authnz.psa_authnz.verify',

    # Look up whether this social account is already associated in the site.
    'social_core.pipeline.social_auth.social_user',

    # Derive a username for the person; a random string is appended when the
    # name collides with an existing one.
    'social_core.pipeline.user.get_username',

    # Disabled step: send a validation email so the user can confirm their
    # email address.
    # 'social_core.pipeline.mail.mail_validation',

    # Attach the current social details to an existing user account that has
    # a similar email address.
    # NOTE(review): PSA documents this step as safe only when the provider
    # verifies email addresses - confirm for every enabled backend.
    'social_core.pipeline.social_auth.associate_by_email',

    # Create a user account when none has been found so far.
    'social_core.pipeline.user.create_user',

    # Store the record that links the social account to this user.
    'social_core.pipeline.social_auth.associate_user',

    # Fill the social record's extra_data field with the values named in the
    # settings (plus defaults such as access_token).
    'social_core.pipeline.social_auth.load_extra_data',

    # Refresh the user record with any details changed at the auth service.
    'social_core.pipeline.user.user_details',
)

# Ordered steps PSA runs while disconnecting a user from a provider; both
# are implemented in this module.
DISCONNECT_PIPELINE = (
    'galaxy.authnz.psa_authnz.allowed_to_disconnect',
    'galaxy.authnz.psa_authnz.disconnect',
)


class PSAAuthnz(IdentityProvider):
    """Identity provider that delegates OIDC flows to Python Social Auth.

    The instance keeps a single `config` dictionary holding the PSA settings;
    that dictionary is handed to a new `Strategy` on every `authenticate`,
    `callback` and `disconnect` call.
    """

    def __init__(self, provider, oidc_config, oidc_backend_config):
        """Build the PSA settings for one identity provider.

        :type provider: str
        :param provider: provider name; must be a key of `BACKENDS`
            (compared case-insensitively).

        :type oidc_config: dict
        :param oidc_config: general OIDC settings; every entry is copied into
            the configuration under `setting_name(key)`.

        :type oidc_backend_config: dict
        :param oidc_backend_config: provider-specific settings, read by
            `_setup_idp` (client_id, client_secret, redirect_uri, prompt,
            api_url, url).

        :raises KeyError: if `provider` is not a key of `BACKENDS`.
        """
        # Normalize once so the BACKENDS / BACKENDS_NAME lookups and the
        # "google" comparison below agree with the lower-cased name that is
        # stored in the configuration.
        provider = provider.lower()
        self.config = {'provider': provider}
        for key, value in oidc_config.items():
            self.config[setting_name(key)] = value

        self.config[setting_name('USER_MODEL')] = 'models.User'
        self.config['SOCIAL_AUTH_PIPELINE'] = AUTH_PIPELINE
        self.config['DISCONNECT_PIPELINE'] = DISCONNECT_PIPELINE
        self.config[setting_name('AUTHENTICATION_BACKENDS')] = (BACKENDS[provider],)

        self.config["VERIFY_SSL"] = oidc_config.get("VERIFY_SSL")
        self.config["REQUESTS_TIMEOUT"] = oidc_config.get("REQUESTS_TIMEOUT")
        self.config["ID_TOKEN_MAX_AGE"] = oidc_config.get("ID_TOKEN_MAX_AGE")

        # The following config sets PSA to call the `_login_user` function for
        # logging in a user. If this setting is set to false, the `_login_user`
        # would not be called, and as a result Galaxy would not know who is
        # the just logged-in user.
        self.config[setting_name('INACTIVE_USER_LOGIN')] = True

        if provider in BACKENDS_NAME:
            self._setup_idp(oidc_backend_config)

        # Secondary authorization is currently supported with Google
        # identities only; drop its settings for every other provider.
        if provider != "google":
            self.config.pop("SOCIAL_AUTH_SECONDARY_AUTH_PROVIDER", None)
            self.config.pop("SOCIAL_AUTH_SECONDARY_AUTH_ENDPOINT", None)

    def _setup_idp(self, oidc_backend_config):
        """Copy the provider-specific settings into the configuration.

        :type oidc_backend_config: dict
        :param oidc_backend_config: may contain `client_id`, `client_secret`,
            `redirect_uri`, `prompt`, `api_url` and `url`; the last three are
            only stored when they are not None.
        """
        extra_arguments = {'access_type': 'offline'}
        self.config[setting_name('AUTH_EXTRA_ARGUMENTS')] = extra_arguments
        self.config['KEY'] = oidc_backend_config.get('client_id')
        self.config['SECRET'] = oidc_backend_config.get('client_secret')
        self.config['redirect_uri'] = oidc_backend_config.get('redirect_uri')

        prompt = oidc_backend_config.get('prompt')
        if prompt is not None:
            extra_arguments['prompt'] = prompt

        api_url = oidc_backend_config.get('api_url')
        if api_url is not None:
            self.config[setting_name('API_URL')] = api_url

        url = oidc_backend_config.get('url')
        if url is not None:
            self.config[setting_name('URL')] = url

    def _get_helper(self, name, do_import=False):
        """Return the configured value for the PSA component `name`.

        The value is read from the configuration under `setting_name(name)`
        and falls back to `DEFAULTS[name]` (or None).

        :type do_import: bool
        :param do_import: when true, the value is resolved with
            `module_member` and the resulting object is returned instead.
        """
        this_config = self.config.get(setting_name(name), DEFAULTS.get(name, None))
        # A conditional expression instead of `a and b or c`, so that the
        # imported member is returned as-is even if it happens to be falsy.
        return module_member(this_config) if do_import else this_config

    def _load_backend(self, strategy, redirect_uri):
        """Instantiate the PSA backend of the configured provider."""
        backends = self._get_helper('AUTHENTICATION_BACKENDS')
        backend = get_backend(backends, BACKENDS_NAME[self.config['provider']])
        return backend(strategy, redirect_uri)

    def _login_user(self, backend, user, social_user):
        """Login callback handed to PSA; remembers the logged-in user."""
        self.config['user'] = user

    def authenticate(self, trans):
        """Start the authentication flow.

        :param trans: Galaxy transaction; its `sa_session`, `request` and
            `session` attributes are used.
        :return: the result of PSA's `do_auth` for the configured backend.
        """
        on_the_fly_config(trans.sa_session)
        strategy = Strategy(trans.request, trans.session, Storage, self.config)
        backend = self._load_backend(strategy, self.config['redirect_uri'])
        # Compare by value: identity (`is`) of two equal strings is an
        # interpreter implementation detail and must not be relied upon.
        if backend.name == BACKENDS_NAME["google"] and \
                "SOCIAL_AUTH_SECONDARY_AUTH_PROVIDER" in self.config and \
                "SOCIAL_AUTH_SECONDARY_AUTH_ENDPOINT" in self.config:
            cloud_scope = "https://www.googleapis.com/auth/cloud-platform"
            # The scope list is appended to in place, so guard against
            # adding the same scope again on every authentication request.
            if cloud_scope not in backend.DEFAULT_SCOPE:
                backend.DEFAULT_SCOPE.append(cloud_scope)
        return do_auth(backend)

    def callback(self, state_token, authz_code, trans, login_redirect_url):
        """Complete the authentication flow after the provider calls back.

        :param state_token: state received from the identity provider; it is
            stored in the session under `<backend name>_state`.
        :param authz_code: not used by this method.
        :param trans: Galaxy transaction.
        :param login_redirect_url: stored as the `LOGIN_REDIRECT_URL` setting.
        :return: tuple of (result of PSA's `do_complete`, user passed to
            `_login_user` during this call or None).
        """
        on_the_fly_config(trans.sa_session)
        self.config[setting_name('LOGIN_REDIRECT_URL')] = login_redirect_url
        # The configuration outlives a single request; forget the user kept
        # by a previous login so that a callback which never reaches
        # `_login_user` cannot report somebody else as logged in.
        self.config.pop('user', None)
        strategy = Strategy(trans.request, trans.session, Storage, self.config)
        strategy.session_set(BACKENDS_NAME[self.config['provider']] + '_state', state_token)
        backend = self._load_backend(strategy, self.config['redirect_uri'])
        redirect_url = do_complete(
            backend,
            login=self._login_user,
            user=trans.user,
            state=state_token)
        return redirect_url, self.config.get('user', None)

    def disconnect(self, provider, trans, disconnect_redirect_url=None, association_id=None):
        """Disassociate the current user from the identity provider.

        :param provider: not used by this method; the provider given at
            construction time is used.
        :param trans: Galaxy transaction.
        :param disconnect_redirect_url: stored as the
            `DISCONNECT_REDIRECT_URL` setting (an empty tuple when None).
        :param association_id: forwarded to PSA's `do_disconnect`.
        :return: tuple of (success, message, redirect URL). When
            `do_disconnect` returns a string it is reported as a successful
            disconnect with that string as the redirect URL; otherwise the
            `success` and `message` entries of the response are returned.
        """
        on_the_fly_config(trans.sa_session)
        self.config[setting_name('DISCONNECT_REDIRECT_URL')] =\
            disconnect_redirect_url if disconnect_redirect_url is not None else ()
        strategy = Strategy(trans.request, trans.session, Storage, self.config)
        backend = self._load_backend(strategy, self.config['redirect_uri'])
        response = do_disconnect(backend, trans.user, association_id)
        if isinstance(response, str):
            return True, "", response
        return response.get('success', False), response.get('message', ""), ""
class Strategy(BaseStrategy):
    """PSA strategy that adapts Galaxy's request, session and settings."""

    def __init__(self, request, session, storage, config, tpl=None):
        """
        :param request: current web request (may be None).
        :param session: mapping used as session store; an empty dict is used
            when it is falsy.
        :param storage: storage class, forwarded to `BaseStrategy`.
        :type config: dict
        :param config: PSA settings. NOTE: the dictionary is kept by
            reference and two settings are written into it here.
        :param tpl: forwarded to `BaseStrategy`.
        """
        self.request = request
        self.session = session if session else {}
        self.config = config
        self.config['SOCIAL_AUTH_REDIRECT_IS_HTTPS'] = bool(
            self.request and self.request.host.startswith('https:'))
        self.config['SOCIAL_AUTH_GOOGLE_OPENIDCONNECT_EXTRA_DATA'] = ['id_token']
        super().__init__(storage, tpl)

    def get_setting(self, name):
        """Return the setting `name`; raises KeyError when it is not set."""
        return self.config[name]

    def session_get(self, name, default=None):
        """Return the session value stored under `name`, or `default`."""
        return self.session.get(name, default)

    def session_set(self, name, value):
        """Store `value` in the session under `name`."""
        self.session[name] = value

    def session_pop(self, name):
        """Not supported by this strategy."""
        raise NotImplementedError('Not implemented.')

    def request_data(self, merge=True):
        """Return the request parameters.

        :type merge: bool
        :param merge: when true, a copy of the GET parameters updated with
            the POST parameters is returned; otherwise the parameters that
            match the request method are returned.
        :return: the parameters, or an empty dict when there is no request.
        """
        if not self.request:
            return {}
        if merge:
            data = self.request.GET.copy()
            data.update(self.request.POST)
            return data
        if self.request.method == 'POST':
            return self.request.POST
        return self.request.GET

    def request_host(self):
        """Return the host of the request, or None when there is no request."""
        if self.request:
            return self.request.host

    def build_absolute_uri(self, path=None):
        """Return the absolute URI PSA should use.

        An absolute `path` (http/https) is returned unchanged. Otherwise,
        when a request is available, the result is
        `<request host>/authnz[/<provider>]` and `path` is not used; without
        a request `path` (or an empty string) is returned.
        """
        path = path or ''
        if path.startswith(('http://', 'https://')):
            return path
        if self.request:
            provider = self.config.get('provider', None)
            # Only the provider segment is optional. Written as a single
            # expression, the conditional used to apply to the whole
            # concatenation, yielding '' when no provider was configured.
            provider_part = '/' + provider if provider is not None else ''
            return self.request.host + '/authnz' + provider_part
        return path

    def redirect(self, url):
        """Return `url` unchanged."""
        return url

    def html(self, content):
        """Not supported by this strategy."""
        raise NotImplementedError('Not implemented.')

    def render_html(self, tpl=None, html=None, context=None):
        """Not supported by this strategy."""
        raise NotImplementedError('Not implemented.')

    def start(self):
        """Clean the partial pipeline and begin the backend's auth flow."""
        self.clean_partial_pipeline()
        if self.backend.uses_redirect():
            return self.redirect(self.backend.auth_url())
        else:
            return self.html(self.backend.auth_html())

    def complete(self, *args, **kwargs):
        """Delegate to the backend's `auth_complete`."""
        return self.backend.auth_complete(*args, **kwargs)

    def continue_pipeline(self, *args, **kwargs):
        """Delegate to the backend's `continue_pipeline`."""
        return self.backend.continue_pipeline(*args, **kwargs)
class Storage:
    """Maps the storage roles PSA expects onto Galaxy's model classes."""

    user = UserAuthnzToken
    nonce = PSANonce
    association = PSAAssociation
    code = PSACode
    partial = PSAPartial

    @classmethod
    def is_integrity_error(cls, exception):
        """Tell whether `exception` is an SQLAlchemy `IntegrityError`.

        `isinstance` is used so that subclasses of `IntegrityError` are
        recognised as well, not only the exact class.
        """
        return isinstance(exception, IntegrityError)
def on_the_fly_config(sa_session):
    """Attach `sa_session` to every PSA-related model class.

    The session is stored as the class attribute `sa_session` of `PSACode`,
    `UserAuthnzToken`, `PSANonce`, `PSAPartial` and `PSAAssociation`.

    :param sa_session: the SQLAlchemy session to use for these models.
    """
    psa_models = (PSACode, UserAuthnzToken, PSANonce, PSAPartial, PSAAssociation)
    for model in psa_models:
        model.sa_session = sa_session
def contains_required_data(response=None, is_new=False, **kwargs):
    """
    Pipeline step (see AUTH_PIPELINE) that runs before the user is
    authenticated or authorized and checks that the identity provider sent
    all the data Galaxy requires. It returns nothing when the data is
    complete and raises otherwise.

    :type  response: dict
    :param response:  decoded response from the OIDC backend, which contains
        the following keys among others:

        -   id_token;       see: http://openid.net/specs/openid-connect-core-1_0.html#IDToken
        -   access_token;   see: https://tools.ietf.org/html/rfc6749#section-1.4
        -   refresh_token;  see: https://tools.ietf.org/html/rfc6749#section-1.5
        -   token_type;     see: https://tools.ietf.org/html/rfc6750#section-6.1.1
        -   scope;          see: http://openid.net/specs/openid-connect-core-1_0.html#AuthRequest
        -   expires_in;     is the expiration time of the access and ID tokens in seconds since
                            the response was generated.

    :type  is_new: bool
    :param is_new: when true, a refresh token is required in addition to the
        ID token.

    :param kwargs:      may contain the following keys among others:

        -   uid:        user ID
        -   user:       Galaxy user; if user is already authenticated
        -   backend:    the backend that is used for user authentication.
        -   storage:    an instance of Storage class.
        -   strategy:   an instance of the Strategy class.
        -   state:      the state code received from identity provider.
        -   details:    details about the user's third-party identity as requested in `scope`.

    :rtype:  void
    :return: nothing.

    :raises MalformedContents: if `response` is not a dictionary, if it has
        no ID token, or if `is_new` is set and it has no refresh token.
    """
    hint_msg = (
        "Visit the identity provider's permitted applications page "
        "(e.g., visit `https://myaccount.google.com/u/0/permissions` "
        "for Google), then revoke the access of this Galaxy instance, "
        "and then retry to login. If the problem persists, contact "
        "the Admin of this Galaxy instance."
    )

    if not isinstance(response, dict):
        # Only expected when PSA could not decode the `authnz code` returned
        # by the identity provider. PSA deals with that internally; the
        # check is kept to avoid an uncaught server-side error.
        problem = "`response` not found. {}"
    elif not response.get("id_token"):
        # Typical for a backend that is not OIDC compliant, e.g., an
        # OAuth2.0-based backend that issues an access token only.
        problem = "Missing identity token. {}"
    elif is_new and not response.get("refresh_token"):
        # An identity provider (e.g., Google) sends a refresh token the first
        # time the user consents to Galaxy's access, that is, on the first
        # login to a Galaxy instance with that provider's credentials. The
        # token can be missing for several reasons, e.g., a manipulated
        # Galaxy database where the user's rows in the galaxy_user and
        # oidc_user_authnz_tokens tables were removed after consent was
        # given; this can also happen during development. The remedy is to
        # revoke the consent on the identity provider's website and log in
        # again.
        problem = "Missing refresh token. {}"
    else:
        return

    raise MalformedContents(err_msg=problem.format(hint_msg))
def verify(strategy=None, response=None, details=None, **kwargs):
    """
    Pipeline step (see AUTH_PIPELINE) performing the optional secondary
    authorization of a user.

    The step is a no-op unless both the `SOCIAL_AUTH_SECONDARY_AUTH_PROVIDER`
    and `SOCIAL_AUTH_SECONDARY_AUTH_ENDPOINT` settings are present. The only
    supported provider is `gcp`: the IAM policy of the service account named
    by the endpoint setting is fetched with the user's access token, and the
    user is authorized when their email is among the members of the first
    binding of that policy.

    :type  strategy: galaxy.authnz.psa_authnz.Strategy
    :param strategy: strategy whose `config` holds the settings.

    :type  response: dict
    :param response: decoded response from the OIDC backend; its
        `access_token` is used as bearer token.

    :type  details: dict
    :param details: details about the user's identity; its `email` is
        matched (case-insensitively) against the policy members.

    :rtype:  void
    :return: nothing when the user is authorized or when secondary
        authorization is not configured.

    :raises Exception: if the provider is unsupported, if the IAM request
        fails, or if the user is not authorized. The message is shown to
        the user.
    """
    provider = strategy.config.get("SOCIAL_AUTH_SECONDARY_AUTH_PROVIDER")
    endpoint = strategy.config.get("SOCIAL_AUTH_SECONDARY_AUTH_ENDPOINT")
    if provider is None or endpoint is None:
        # Either the secondary authorization is not configured or OIDC IdP
        # is not compatible, so allow user login.
        return

    if provider.lower() != "gcp":
        raise Exception("`{}` is an unsupported secondary authorization provider, contact admin.".format(provider))

    result = requests.post(
        "https://iam.googleapis.com/v1/projects/-/serviceAccounts/{}:getIamPolicy".format(endpoint),
        headers={
            'Authorization': 'Bearer {}'.format(response.get("access_token")),
            'Accept': 'application/json'},
        # Without a timeout an unresponsive endpoint would block the login
        # request indefinitely. None (setting absent) keeps the previous
        # behaviour of waiting without a limit.
        timeout=strategy.config.get("REQUESTS_TIMEOUT"))
    try:
        res = json.loads(result.content)
    except ValueError:
        # Body is not JSON (e.g., an HTML error page); handled below.
        res = None
    if not isinstance(res, dict):
        res = {}

    if result.status_code != requests.codes.ok:
        # The message of the raised exception is shown to the user; hence,
        # the following way of handling exception is better than using
        # result.raise_for_status(), since raise_for_status may report
        # sensitive information that should not be exposed to users.
        error = res.get("error")
        message = error.get("message") if isinstance(error, dict) else None
        raise Exception(message or "Secondary authorization with GCP IAM failed.")

    # Only the first binding is consulted (as before); a policy that has no
    # bindings at all authorizes nobody instead of failing with a KeyError.
    bindings = res.get("bindings") or []
    members = bindings[0].get("members", []) if bindings else []
    email_addresses = {x.lower().replace("user:", "").strip() for x in members}

    # The members are lower-cased above, so the user's email has to be
    # normalized the same way for the comparison to be meaningful.
    email = (details or {}).get("email")
    if not email or email.lower().strip() not in email_addresses:
        raise Exception("Not authorized by GCP IAM.")
    # Secondary authorization successful, so allow user login.
def allowed_to_disconnect(name=None, user=None, user_storage=None, strategy=None,
                          backend=None, request=None, details=None, **kwargs):
    """
    First step of DISCONNECT_PIPELINE: decide whether the disconnect may
    proceed. Disconnecting means disassociating a Galaxy user from a
    third-party authnz, i.e., removing the user's access and/or ID tokens.

    To forbid a disconnect this function has to raise an exception. To allow
    it, nothing (or at most an empty dictionary) may be returned: at least
    until PSA social_core v.1.5.0, any other returned value (e.g., a Boolean)
    makes PSA skip the remaining steps of the disconnect pipeline. See the
    condition in the `run_pipeline` function:
    https://github.com/python-social-auth/social-core/blob/master/social_core/backends/base.py#L114

    The current implementation allows every disconnect.

    :param name: name of the backend (e.g., google-openidconnect)
    :type user: galaxy.model.User
    :type user_storage: galaxy.model.UserAuthnzToken
    :type strategy: galaxy.authnz.psa_authnz.Strategy
    :type backend: PSA backend object (e.g., social_core.backends.google_openidconnect.GoogleOpenIdConnect)
    :type request: webob.multidict.MultiDict
    :type details: dict
    :return: nothing (None).
    """
    return None
def disconnect(name=None, user=None, user_storage=None, strategy=None,
               backend=None, request=None, details=None, **kwargs):
    """
    Second step of DISCONNECT_PIPELINE: disassociate a Galaxy user from a
    third-party authnz by deleting the stored association, which removes the
    user's access and/or ID tokens.

    The first record of `user_storage` that matches both the user's ID and
    the backend name is deleted and the session is flushed.

    :param name: name of the backend (e.g., google-openidconnect)
    :type user: galaxy.model.User
    :type user_storage: galaxy.model.UserAuthnzToken
    :type strategy: galaxy.authnz.psa_authnz.Strategy
    :type backend: PSA backend object (e.g., social_core.backends.google_openidconnect.GoogleOpenIdConnect)
    :type request: webob.multidict.MultiDict
    :type details: dict
    :return: nothing after a successful delete; a dictionary with `success`
             set to False and an explanatory `message` when no matching
             record exists. Regarding return values in general: key-value
             pairs of a returned dictionary are available inside PSA only
             and are passed to the next step of the disconnect pipeline, but
             they are not part of the result of `do_disconnect`. Returning
             anything other than a(n) (empty) dictionary breaks the
             disconnect pipeline, and that value becomes the result of
             `do_disconnect`.
    """
    session = user_storage.sa_session
    records = session.query(user_storage)
    columns = user_storage.table.c
    association = records.filter(
        columns.user_id == user.id,
        columns.provider == name,
    ).first()

    if association is None:
        return {'success': False, 'message': 'Not authenticated by any identity providers.'}

    # The whole record is removed (rather than only clearing its extra_data).
    session.delete(association)
    session.flush()