Warning

This document is for an old release of Galaxy. You can alternatively view this page in the latest release if it exists or view the top of the latest release's documentation.

Source code for galaxy.authnz.psa_authnz

import json
import logging
import time

import jwt
import requests
from msal import ConfidentialClientApplication
from social_core.actions import (
    do_auth,
    do_complete,
    do_disconnect,
)
from social_core.backends.utils import get_backend
from social_core.strategy import BaseStrategy
from social_core.utils import (
    module_member,
    setting_name,
)
from sqlalchemy.exc import IntegrityError

from galaxy.exceptions import MalformedContents
from galaxy.model import (
    PSAAssociation,
    PSACode,
    PSANonce,
    PSAPartial,
    UserAuthnzToken,
)
from galaxy.model.base import transaction
from galaxy.util import DEFAULT_SOCKET_TIMEOUT
from . import IdentityProvider

log = logging.getLogger(__name__)

# key: a component name which PSA requests.
# value: is the name of a class associated with that key.
DEFAULTS = {"STRATEGY": "Strategy", "STORAGE": "Storage"}

BACKENDS = {
    "google": "social_core.backends.google_openidconnect.GoogleOpenIdConnect",
    "globus": "social_core.backends.globus.GlobusOpenIdConnect",
    "elixir": "social_core.backends.elixir.ElixirOpenIdConnect",
    "okta": "social_core.backends.okta_openidconnect.OktaOpenIdConnect",
    "azure": "social_core.backends.azuread_tenant.AzureADV2TenantOAuth2",
}

BACKENDS_NAME = {
    "google": "google-openidconnect",
    "globus": "globus",
    "elixir": "elixir",
    "okta": "okta-openidconnect",
    "azure": "azuread-v2-tenant-oauth2",
}

AUTH_PIPELINE = (
    # Get the information we can about the user and return it in a simple
    # format to create the user instance later. On some cases the details are
    # already part of the auth response from the provider, but sometimes this
    # could hit a provider API.
    "social_core.pipeline.social_auth.social_details",
    # Get the social uid from whichever service we're authing thru. The uid is
    # the unique identifier of the given user in the provider.
    "social_core.pipeline.social_auth.social_uid",
    # Verifies that the current auth process is valid within the current
    # project, this is where emails and domains allowlists are applied (if
    # defined).
    "social_core.pipeline.social_auth.auth_allowed",
    # Checks if the decoded response contains all the required fields such
    # as an ID token or a refresh token.
    "galaxy.authnz.psa_authnz.contains_required_data",
    "galaxy.authnz.psa_authnz.verify",
    # Checks if the current social-account is already associated in the site.
    "social_core.pipeline.social_auth.social_user",
    # Make up a username for this person, appends a random string at the end if
    # there's any collision.
    "social_core.pipeline.user.get_username",
    # Send a validation email to the user to verify its email address.
    # 'social_core.pipeline.mail.mail_validation',
    # Associates the current social details with another user account with
    # a similar email address.
    "social_core.pipeline.social_auth.associate_by_email",
    # Create a user account if we haven't found one yet.
    "social_core.pipeline.user.create_user",
    # Create the record that associated the social account with this user.
    "social_core.pipeline.social_auth.associate_user",
    # Populate the extra_data field in the social record with the values
    # specified by settings (and the default ones like access_token, etc).
    "social_core.pipeline.social_auth.load_extra_data",
    # Update the user record with any changed info from the auth service.
    "social_core.pipeline.user.user_details",
)

DISCONNECT_PIPELINE = ("galaxy.authnz.psa_authnz.allowed_to_disconnect", "galaxy.authnz.psa_authnz.disconnect")


[docs]class PSAAuthnz(IdentityProvider):
[docs] def __init__(self, provider, oidc_config, oidc_backend_config): self.config = {"provider": provider.lower()} for key, value in oidc_config.items(): self.config[setting_name(key)] = value self.config[setting_name("USER_MODEL")] = "models.User" self.config["SOCIAL_AUTH_PIPELINE"] = AUTH_PIPELINE self.config["DISCONNECT_PIPELINE"] = DISCONNECT_PIPELINE self.config[setting_name("AUTHENTICATION_BACKENDS")] = (BACKENDS[provider],) self.config["VERIFY_SSL"] = oidc_config.get("VERIFY_SSL") self.config["REQUESTS_TIMEOUT"] = oidc_config.get("REQUESTS_TIMEOUT") self.config["ID_TOKEN_MAX_AGE"] = oidc_config.get("ID_TOKEN_MAX_AGE") # The following config sets PSA to call the `_login_user` function for # logging in a user. If this setting is set to false, the `_login_user` # would not be called, and as a result Galaxy would not know who is # the just logged-in user. self.config[setting_name("INACTIVE_USER_LOGIN")] = True if provider in BACKENDS_NAME: self._setup_idp(oidc_backend_config) # Secondary AuthZ with Google identities is currently supported if provider != "google": if "SOCIAL_AUTH_SECONDARY_AUTH_PROVIDER" in self.config: del self.config["SOCIAL_AUTH_SECONDARY_AUTH_PROVIDER"] if "SOCIAL_AUTH_SECONDARY_AUTH_ENDPOINT" in self.config: del self.config["SOCIAL_AUTH_SECONDARY_AUTH_ENDPOINT"]
def _setup_idp(self, oidc_backend_config): self.config[setting_name("AUTH_EXTRA_ARGUMENTS")] = {"access_type": "offline"} self.config["KEY"] = oidc_backend_config.get("client_id") self.config["SECRET"] = oidc_backend_config.get("client_secret") self.config["TENANT_ID"] = oidc_backend_config.get("tenant_id") self.config["redirect_uri"] = oidc_backend_config.get("redirect_uri") self.config["EXTRA_SCOPES"] = oidc_backend_config.get("extra_scopes") if oidc_backend_config.get("prompt") is not None: self.config[setting_name("AUTH_EXTRA_ARGUMENTS")]["prompt"] = oidc_backend_config.get("prompt") if oidc_backend_config.get("api_url") is not None: self.config[setting_name("API_URL")] = oidc_backend_config.get("api_url") if oidc_backend_config.get("url") is not None: self.config[setting_name("URL")] = oidc_backend_config.get("url") def _get_helper(self, name, do_import=False): this_config = self.config.get(setting_name(name), DEFAULTS.get(name, None)) return do_import and module_member(this_config) or this_config def _load_backend(self, strategy, redirect_uri): backends = self._get_helper("AUTHENTICATION_BACKENDS") backend = get_backend(backends, BACKENDS_NAME[self.config["provider"]]) return backend(strategy, redirect_uri) def _login_user(self, backend, user, social_user): self.config["user"] = user
[docs] def refresh_azure(self, user_authnz_token): logging.getLogger("msal").setLevel(logging.WARN) old_extra_data = user_authnz_token.extra_data app = ConfidentialClientApplication( self.config["KEY"], self.config["SECRET"], authority="https://login.microsoftonline.com/" + self.config["TENANT_ID"], ) extra_data = app.acquire_token_by_refresh_token( old_extra_data["refresh_token"], scopes=["https://graph.microsoft.com/.default"] ) decoded_token = jwt.decode(extra_data["id_token"], options={"verify_signature": False}) if "auth_time" not in extra_data: extra_data["auth_time"] = decoded_token["iat"] expires = decoded_token["exp"] extra_data["expires"] = int(expires - time.time()) user_authnz_token.set_extra_data(extra_data)
[docs] def refresh(self, trans, user_authnz_token): if not user_authnz_token or not user_authnz_token.extra_data: return False # refresh tokens if they reached their half lifetime if "expires" in user_authnz_token.extra_data: expires = user_authnz_token.extra_data["expires"] elif "expires_in" in user_authnz_token.extra_data: expires = user_authnz_token.extra_data["expires_in"] else: log.debug("No `expires` or `expires_in` key found in token extra data, cannot refresh") return False if int(user_authnz_token.extra_data["auth_time"]) + int(expires) / 2 <= int(time.time()): on_the_fly_config(trans.sa_session) if self.config["provider"] == "azure": self.refresh_azure(user_authnz_token) else: strategy = Strategy(trans.request, trans.session, Storage, self.config) user_authnz_token.refresh_token(strategy) return True return False
[docs] def authenticate(self, trans): on_the_fly_config(trans.sa_session) strategy = Strategy(trans.request, trans.session, Storage, self.config) backend = self._load_backend(strategy, self.config["redirect_uri"]) if ( backend.name is BACKENDS_NAME["google"] and "SOCIAL_AUTH_SECONDARY_AUTH_PROVIDER" in self.config and "SOCIAL_AUTH_SECONDARY_AUTH_ENDPOINT" in self.config ): backend.DEFAULT_SCOPE.append("https://www.googleapis.com/auth/cloud-platform") if self.config["EXTRA_SCOPES"] is not None: backend.DEFAULT_SCOPE.extend(self.config["EXTRA_SCOPES"]) return do_auth(backend)
[docs] def callback(self, state_token, authz_code, trans, login_redirect_url): on_the_fly_config(trans.sa_session) self.config[setting_name("LOGIN_REDIRECT_URL")] = login_redirect_url strategy = Strategy(trans.request, trans.session, Storage, self.config) strategy.session_set(f"{BACKENDS_NAME[self.config['provider']]}_state", state_token) backend = self._load_backend(strategy, self.config["redirect_uri"]) redirect_url = do_complete( backend, login=lambda backend, user, social_user: self._login_user(backend, user, social_user), user=trans.user, state=state_token, ) return redirect_url, self.config.get("user", None)
[docs] def disconnect(self, provider, trans, disconnect_redirect_url=None, association_id=None): on_the_fly_config(trans.sa_session) self.config[setting_name("DISCONNECT_REDIRECT_URL")] = ( disconnect_redirect_url if disconnect_redirect_url is not None else () ) strategy = Strategy(trans.request, trans.session, Storage, self.config) backend = self._load_backend(strategy, self.config["redirect_uri"]) response = do_disconnect(backend, trans.user, association_id) if isinstance(response, str): return True, "", response return response.get("success", False), response.get("message", ""), ""
[docs]class Strategy(BaseStrategy):
[docs] def __init__(self, request, session, storage, config, tpl=None): self.request = request self.session = session if session else {} self.config = config self.config["SOCIAL_AUTH_REDIRECT_IS_HTTPS"] = ( True if self.request and self.request.host.startswith("https:") else False ) self.config["SOCIAL_AUTH_GOOGLE_OPENIDCONNECT_EXTRA_DATA"] = ["id_token"] super().__init__(storage, tpl)
[docs] def get_setting(self, name): return self.config[name]
[docs] def session_get(self, name, default=None): return self.session.get(name, default)
[docs] def session_set(self, name, value): self.session[name] = value
[docs] def session_pop(self, name): raise NotImplementedError("Not implemented.")
[docs] def request_data(self, merge=True): if not self.request: return {} if merge: data = self.request.GET.copy() data.update(self.request.POST) elif self.request.method == "POST": data = self.request.POST else: data = self.request.GET return data
[docs] def request_host(self): if self.request: return self.request.host
[docs] def build_absolute_uri(self, path=None): path = path or "" if path.startswith("http://") or path.startswith("https://"): return path if self.request: return ( self.request.host + "/authnz" + ("/" + self.config.get("provider")) if self.config.get("provider", None) is not None else "" ) return path
[docs] def redirect(self, url): return url
[docs] def html(self, content): raise NotImplementedError("Not implemented.")
[docs] def render_html(self, tpl=None, html=None, context=None): raise NotImplementedError("Not implemented.")
[docs] def start(self): self.clean_partial_pipeline() if self.backend.uses_redirect(): return self.redirect(self.backend.auth_url()) else: return self.html(self.backend.auth_html())
[docs] def complete(self, *args, **kwargs): return self.backend.auth_complete(*args, **kwargs)
[docs] def continue_pipeline(self, *args, **kwargs): return self.backend.continue_pipeline(*args, **kwargs)
[docs]class Storage: user = UserAuthnzToken nonce = PSANonce association = PSAAssociation code = PSACode partial = PSAPartial
[docs] @classmethod def is_integrity_error(cls, exception): return exception.__class__ is IntegrityError
[docs]def on_the_fly_config(sa_session): PSACode.sa_session = sa_session UserAuthnzToken.sa_session = sa_session PSANonce.sa_session = sa_session PSAPartial.sa_session = sa_session PSAAssociation.sa_session = sa_session
[docs]def contains_required_data(response=None, is_new=False, **kwargs): """ This function is called as part of authentication and authorization pipeline before user is authenticated or authorized (see AUTH_PIPELINE). This function asserts if all the data required by Galaxy for a user is provided. It raises an exception if any of the required data is missing, and returns void if otherwise. :type response: dict :param response: a dictionary containing decoded response from OIDC backend that contain the following keys among others: - id_token; see: http://openid.net/specs/openid-connect-core-1_0.html#IDToken - access_token; see: https://tools.ietf.org/html/rfc6749#section-1.4 - refresh_token; see: https://tools.ietf.org/html/rfc6749#section-1.5 - token_type; see: https://tools.ietf.org/html/rfc6750#section-6.1.1 - scope; see: http://openid.net/specs/openid-connect-core-1_0.html#AuthRequest - expires_in; is the expiration time of the access and ID tokens in seconds since the response was generated. :type is_new: bool :param is_new: has the user been authenticated? :param kwargs: may contain the following keys among others: - uid: user ID - user: Galaxy user; if user is already authenticated - backend: the backend that is used for user authentication. - storage: an instance of Storage class. - strategy: an instance of the Strategy class. - state: the state code received from identity provider. - details: details about the user's third-party identity as requested in `scope`. :rtype: void :return: Raises an exception if any of the required arguments is missing, and pass if all are given. """ hint_msg = ( "Visit the identity provider's permitted applications page " "(e.g., visit `https://myaccount.google.com/u/0/permissions` " "for Google), then revoke the access of this Galaxy instance, " "and then retry to login. If the problem persists, contact " "the Admin of this Galaxy instance." ) if response is None or not isinstance(response, dict): # This can happen only if PSA is not able to decode the `authnz code` # sent back from the identity provider. PSA internally handles such # scenarios; however, this case is implemented to prevent uncaught # server-side errors. raise MalformedContents(err_msg=f"`response` not found. {hint_msg}") if not response.get("id_token"): # This can happen if a non-OIDC compliant backend is used; # e.g., an OAuth2.0-based backend that only generates access token. raise MalformedContents(err_msg=f"Missing identity token. {hint_msg}") if is_new and not response.get("refresh_token"): # An identity provider (e.g., Google) sends a refresh token the first # time user consents Galaxy's access (i.e., the first time user logs in # to a galaxy instance using their credentials with the identity provider). # There could be variety of scenarios under which a refresh token might # be missing; e.g., a manipulated Galaxy's database, where a user's records # from galaxy_user and oidc_user_authnz_tokens tables deleted after the # user has provided consent. This can also happen under dev efforts. # The solution is to revoke the consent by visiting the identity provider's # website, and then retry the login process. raise MalformedContents(err_msg=f"Missing refresh token. {hint_msg}")
[docs]def verify(strategy=None, response=None, details=None, **kwargs): provider = strategy.config.get("SOCIAL_AUTH_SECONDARY_AUTH_PROVIDER") endpoint = strategy.config.get("SOCIAL_AUTH_SECONDARY_AUTH_ENDPOINT") if provider is None or endpoint is None: # Either the secondary authorization is not configured or OIDC IdP # is not compatible, so allow user login. return if provider.lower() == "gcp": result = requests.post( f"https://iam.googleapis.com/v1/projects/-/serviceAccounts/{endpoint}:getIamPolicy", headers={ "Authorization": f"Bearer {response.get('access_token')}", "Accept": "application/json", }, timeout=DEFAULT_SOCKET_TIMEOUT, ) res = json.loads(result.content) if result.status_code == requests.codes.ok: email_addresses = res["bindings"][0]["members"] email_addresses = [x.lower().replace("user:", "").strip() for x in email_addresses] if details.get("email") in email_addresses: # Secondary authorization successful, so allow user login. pass else: raise Exception("Not authorized by GCP IAM.") else: # The message of the raised exception is shown to the user; hence, # the following way of handling exception is better than using # result.raise_for_status(), since raise_for_status may report # sensitive information that should not be exposed to users. raise Exception(res["error"]["message"]) else: raise Exception(f"`{provider}` is an unsupported secondary authorization provider, contact admin.")
[docs]def allowed_to_disconnect( name=None, user=None, user_storage=None, strategy=None, backend=None, request=None, details=None, **kwargs ): """ Disconnect is the process of disassociating a Galaxy user and a third-party authnz. In other words, it is the process of removing any access and/or ID tokens of a user. This function should raise an exception if disconnection is NOT permitted. Do NOT return any value (except an empty dictionary) if disconnect is allowed. Because, at least until PSA social_core v.1.5.0, any returned value (e.g., Boolean) will result in ignoring the rest of the disconnect pipeline. See the following condition in `run_pipeline` function: https://github.com/python-social-auth/social-core/blob/master/social_core/backends/base.py#L114 :param name: name of the backend (e.g., google-openidconnect) :type user: galaxy.model.User :type user_storage: galaxy.model.UserAuthnzToken :type strategy: galaxy.authnz.psa_authnz.Strategy :type backend: PSA backend object (e.g., social_core.backends.google_openidconnect.GoogleOpenIdConnect) :type request: webob.multidict.MultiDict :type details: dict :return: empty dict """
[docs]def disconnect( name=None, user=None, user_storage=None, strategy=None, backend=None, request=None, details=None, **kwargs ): """ Disconnect is the process of disassociating a Galaxy user and a third-party authnz. In other words, it is the process of removing any access and/or ID tokens of a user. :param name: name of the backend (e.g., google-openidconnect) :type user: galaxy.model.User :type user_storage: galaxy.model.UserAuthnzToken :type strategy: galaxy.authnz.psa_authnz.Strategy :type backend: PSA backend object (e.g., social_core.backends.google_openidconnect.GoogleOpenIdConnect) :type request: webob.multidict.MultiDict :type details: dict :return: void or empty dict. Any key-value pair inside the dictionary will be available inside PSA only, and will be passed to the next step in the disconnect pipeline. However, the key-value pair will not be returned as a result of calling the `do_disconnect` function. Additionally, returning any value except for a(n) (empty) dictionary, will break the disconnect pipeline, and that value will be returned as a result of calling the `do_disconnect` function. """ sa_session = user_storage.sa_session user_authnz = ( sa_session.query(user_storage) .filter(user_storage.table.c.user_id == user.id, user_storage.table.c.provider == name) .first() ) if user_authnz is None: return {"success": False, "message": "Not authenticated by any identity providers."} # option A sa_session.delete(user_authnz) # option B # user_authnz.extra_data = None with transaction(sa_session): sa_session.commit()