Source code for galaxy.authnz.psa_authnz

import json
import logging
import time

import jwt
from msal import ConfidentialClientApplication
from social_core.actions import (
    do_auth,
    do_complete,
    do_disconnect,
)
from social_core.backends.utils import get_backend
from social_core.strategy import BaseStrategy
from social_core.utils import (
    module_member,
    setting_name,
)
from sqlalchemy.exc import IntegrityError

from galaxy.exceptions import MalformedContents
from galaxy.model import (
    PSAAssociation,
    PSACode,
    PSANonce,
    PSAPartial,
    UserAuthnzToken,
)
from galaxy.model.base import transaction
from galaxy.util import (
    DEFAULT_SOCKET_TIMEOUT,
    requests,
)
from . import IdentityProvider

log = logging.getLogger(__name__)

# key: a component name which PSA requests.
# value: is the name of a class associated with that key.
DEFAULTS = {"STRATEGY": "Strategy", "STORAGE": "Storage"}

BACKENDS = {
    "google": "social_core.backends.google_openidconnect.GoogleOpenIdConnect",
    "globus": "social_core.backends.globus.GlobusOpenIdConnect",
    "elixir": "social_core.backends.elixir.ElixirOpenIdConnect",
    "okta": "social_core.backends.okta_openidconnect.OktaOpenIdConnect",
    "azure": "social_core.backends.azuread_tenant.AzureADV2TenantOAuth2",
    "egi_checkin": "social_core.backends.egi_checkin.EGICheckinOpenIdConnect",
}

BACKENDS_NAME = {
    "google": "google-openidconnect",
    "globus": "globus",
    "elixir": "elixir",
    "okta": "okta-openidconnect",
    "azure": "azuread-v2-tenant-oauth2",
    "egi_checkin": "egi-checkin",
}

AUTH_PIPELINE = (
    # Get the information we can about the user and return it in a simple
    # format to create the user instance later. On some cases the details are
    # already part of the auth response from the provider, but sometimes this
    # could hit a provider API.
    "social_core.pipeline.social_auth.social_details",
    # Get the social uid from whichever service we're authing thru. The uid is
    # the unique identifier of the given user in the provider.
    "social_core.pipeline.social_auth.social_uid",
    # Verifies that the current auth process is valid within the current
    # project, this is where emails and domains allowlists are applied (if
    # defined).
    "social_core.pipeline.social_auth.auth_allowed",
    # Checks if the decoded response contains all the required fields such
    # as an ID token or a refresh token.
    "galaxy.authnz.psa_authnz.contains_required_data",
    "galaxy.authnz.psa_authnz.verify",
    # Checks if the current social-account is already associated in the site.
    "social_core.pipeline.social_auth.social_user",
    # Make up a username for this person, appends a random string at the end if
    # there's any collision.
    "social_core.pipeline.user.get_username",
    # Send a validation email to the user to verify its email address.
    # 'social_core.pipeline.mail.mail_validation',
    # Associates the current social details with another user account with
    # a similar email address.
    "social_core.pipeline.social_auth.associate_by_email",
    # Create a user account if we haven't found one yet.
    "social_core.pipeline.user.create_user",
    # Create the record that associated the social account with this user.
    "social_core.pipeline.social_auth.associate_user",
    # Populate the extra_data field in the social record with the values
    # specified by settings (and the default ones like access_token, etc).
    "social_core.pipeline.social_auth.load_extra_data",
    # Update the user record with any changed info from the auth service.
    "social_core.pipeline.user.user_details",
)

DISCONNECT_PIPELINE = ("galaxy.authnz.psa_authnz.allowed_to_disconnect", "galaxy.authnz.psa_authnz.disconnect")


[docs]class PSAAuthnz(IdentityProvider):
[docs] def __init__(self, provider, oidc_config, oidc_backend_config): self.config = {"provider": provider.lower()} for key, value in oidc_config.items(): self.config[setting_name(key)] = value self.config[setting_name("USER_MODEL")] = "models.User" self.config["SOCIAL_AUTH_PIPELINE"] = AUTH_PIPELINE self.config["DISCONNECT_PIPELINE"] = DISCONNECT_PIPELINE self.config[setting_name("AUTHENTICATION_BACKENDS")] = (BACKENDS[provider],) self.config["VERIFY_SSL"] = oidc_config.get("VERIFY_SSL") self.config["REQUESTS_TIMEOUT"] = oidc_config.get("REQUESTS_TIMEOUT") self.config["ID_TOKEN_MAX_AGE"] = oidc_config.get("ID_TOKEN_MAX_AGE") # The following config sets PSA to call the `_login_user` function for # logging in a user. If this setting is set to false, the `_login_user` # would not be called, and as a result Galaxy would not know who is # the just logged-in user. self.config[setting_name("INACTIVE_USER_LOGIN")] = True if provider in BACKENDS_NAME: self._setup_idp(oidc_backend_config) # Secondary AuthZ with Google identities is currently supported if provider != "google": if "SOCIAL_AUTH_SECONDARY_AUTH_PROVIDER" in self.config: del self.config["SOCIAL_AUTH_SECONDARY_AUTH_PROVIDER"] if "SOCIAL_AUTH_SECONDARY_AUTH_ENDPOINT" in self.config: del self.config["SOCIAL_AUTH_SECONDARY_AUTH_ENDPOINT"]
def _setup_idp(self, oidc_backend_config): self.config[setting_name("AUTH_EXTRA_ARGUMENTS")] = {"access_type": "offline"} self.config["KEY"] = oidc_backend_config.get("client_id") self.config["SECRET"] = oidc_backend_config.get("client_secret") self.config["TENANT_ID"] = oidc_backend_config.get("tenant_id") self.config["redirect_uri"] = oidc_backend_config.get("redirect_uri") self.config["EXTRA_SCOPES"] = oidc_backend_config.get("extra_scopes") if oidc_backend_config.get("prompt") is not None: self.config[setting_name("AUTH_EXTRA_ARGUMENTS")]["prompt"] = oidc_backend_config.get("prompt") if oidc_backend_config.get("api_url") is not None: self.config[setting_name("API_URL")] = oidc_backend_config.get("api_url") if oidc_backend_config.get("url") is not None: self.config[setting_name("URL")] = oidc_backend_config.get("url") def _get_helper(self, name, do_import=False): this_config = self.config.get(setting_name(name), DEFAULTS.get(name, None)) return do_import and module_member(this_config) or this_config def _load_backend(self, strategy, redirect_uri): backends = self._get_helper("AUTHENTICATION_BACKENDS") backend = get_backend(backends, BACKENDS_NAME[self.config["provider"]]) return backend(strategy, redirect_uri) def _login_user(self, backend, user, social_user): self.config["user"] = user
[docs] def refresh_azure(self, user_authnz_token): logging.getLogger("msal").setLevel(logging.WARN) old_extra_data = user_authnz_token.extra_data app = ConfidentialClientApplication( self.config["KEY"], self.config["SECRET"], authority="https://login.microsoftonline.com/" + self.config["TENANT_ID"], ) extra_data = app.acquire_token_by_refresh_token( old_extra_data["refresh_token"], scopes=["https://graph.microsoft.com/.default"] ) decoded_token = jwt.decode(extra_data["id_token"], options={"verify_signature": False}) if "auth_time" not in extra_data: extra_data["auth_time"] = decoded_token["iat"] expires = decoded_token["exp"] extra_data["expires"] = int(expires - time.time()) user_authnz_token.set_extra_data(extra_data)
[docs] def refresh(self, trans, user_authnz_token): if not user_authnz_token or not user_authnz_token.extra_data: return False # refresh tokens if they reached their half lifetime if "expires" in user_authnz_token.extra_data: expires = user_authnz_token.extra_data["expires"] elif "expires_in" in user_authnz_token.extra_data: expires = user_authnz_token.extra_data["expires_in"] else: log.debug("No `expires` or `expires_in` key found in token extra data, cannot refresh") return False if int(user_authnz_token.extra_data["auth_time"]) + int(expires) / 2 <= int(time.time()): on_the_fly_config(trans.sa_session) if self.config["provider"] == "azure": self.refresh_azure(user_authnz_token) else: strategy = Strategy(trans.request, trans.session, Storage, self.config) user_authnz_token.refresh_token(strategy) return True return False
[docs] def authenticate(self, trans): on_the_fly_config(trans.sa_session) strategy = Strategy(trans.request, trans.session, Storage, self.config) backend = self._load_backend(strategy, self.config["redirect_uri"]) if ( backend.name is BACKENDS_NAME["google"] and "SOCIAL_AUTH_SECONDARY_AUTH_PROVIDER" in self.config and "SOCIAL_AUTH_SECONDARY_AUTH_ENDPOINT" in self.config ): backend.DEFAULT_SCOPE.append("https://www.googleapis.com/auth/cloud-platform") if self.config["EXTRA_SCOPES"] is not None: backend.DEFAULT_SCOPE.extend(self.config["EXTRA_SCOPES"]) return do_auth(backend)
[docs] def callback(self, state_token, authz_code, trans, login_redirect_url): on_the_fly_config(trans.sa_session) self.config[setting_name("LOGIN_REDIRECT_URL")] = login_redirect_url strategy = Strategy(trans.request, trans.session, Storage, self.config) strategy.session_set(f"{BACKENDS_NAME[self.config['provider']]}_state", state_token) backend = self._load_backend(strategy, self.config["redirect_uri"]) redirect_url = do_complete( backend, login=lambda backend, user, social_user: self._login_user(backend, user, social_user), user=trans.user, state=state_token, ) return redirect_url, self.config.get("user", None)
[docs] def disconnect(self, provider, trans, disconnect_redirect_url=None, association_id=None): on_the_fly_config(trans.sa_session) self.config[setting_name("DISCONNECT_REDIRECT_URL")] = ( disconnect_redirect_url if disconnect_redirect_url is not None else () ) strategy = Strategy(trans.request, trans.session, Storage, self.config) backend = self._load_backend(strategy, self.config["redirect_uri"]) response = do_disconnect(backend, trans.user, association_id) if isinstance(response, str): return True, "", response return response.get("success", False), response.get("message", ""), ""
[docs]class Strategy(BaseStrategy):
[docs] def __init__(self, request, session, storage, config, tpl=None): self.request = request self.session = session if session else {} self.config = config self.config["SOCIAL_AUTH_REDIRECT_IS_HTTPS"] = ( True if self.request and self.request.host.startswith("https:") else False ) self.config["SOCIAL_AUTH_GOOGLE_OPENIDCONNECT_EXTRA_DATA"] = ["id_token"] super().__init__(storage, tpl)
[docs] def get_setting(self, name): return self.config[name]
[docs] def session_get(self, name, default=None): return self.session.get(name, default)
[docs] def session_set(self, name, value): self.session[name] = value
[docs] def session_pop(self, name): raise NotImplementedError("Not implemented.")
[docs] def request_data(self, merge=True): if not self.request: return {} if merge: data = self.request.GET.copy() data.update(self.request.POST) elif self.request.method == "POST": data = self.request.POST else: data = self.request.GET return data
[docs] def request_host(self): if self.request: return self.request.host
[docs] def build_absolute_uri(self, path=None): path = path or "" if path.startswith("http://") or path.startswith("https://"): return path if self.request: return ( self.request.host + "/authnz" + ("/" + self.config.get("provider")) if self.config.get("provider", None) is not None else "" ) return path
[docs] def redirect(self, url): return url
[docs] def html(self, content): raise NotImplementedError("Not implemented.")
[docs] def render_html(self, tpl=None, html=None, context=None): raise NotImplementedError("Not implemented.")
[docs] def start(self): self.clean_partial_pipeline() if self.backend.uses_redirect(): return self.redirect(self.backend.auth_url()) else: return self.html(self.backend.auth_html())
[docs] def complete(self, *args, **kwargs): return self.backend.auth_complete(*args, **kwargs)
[docs] def continue_pipeline(self, *args, **kwargs): return self.backend.continue_pipeline(*args, **kwargs)
[docs]class Storage: user = UserAuthnzToken nonce = PSANonce association = PSAAssociation code = PSACode partial = PSAPartial
[docs] @classmethod def is_integrity_error(cls, exception): return exception.__class__ is IntegrityError
[docs]def on_the_fly_config(sa_session): PSACode.sa_session = sa_session UserAuthnzToken.sa_session = sa_session PSANonce.sa_session = sa_session PSAPartial.sa_session = sa_session PSAAssociation.sa_session = sa_session
[docs]def contains_required_data(response=None, is_new=False, **kwargs): """ This function is called as part of authentication and authorization pipeline before user is authenticated or authorized (see AUTH_PIPELINE). This function asserts if all the data required by Galaxy for a user is provided. It raises an exception if any of the required data is missing, and returns void if otherwise. :type response: dict :param response: a dictionary containing decoded response from OIDC backend that contain the following keys among others: - id_token; see: http://openid.net/specs/openid-connect-core-1_0.html#IDToken - access_token; see: https://tools.ietf.org/html/rfc6749#section-1.4 - refresh_token; see: https://tools.ietf.org/html/rfc6749#section-1.5 - token_type; see: https://tools.ietf.org/html/rfc6750#section-6.1.1 - scope; see: http://openid.net/specs/openid-connect-core-1_0.html#AuthRequest - expires_in; is the expiration time of the access and ID tokens in seconds since the response was generated. :type is_new: bool :param is_new: has the user been authenticated? :param kwargs: may contain the following keys among others: - uid: user ID - user: Galaxy user; if user is already authenticated - backend: the backend that is used for user authentication. - storage: an instance of Storage class. - strategy: an instance of the Strategy class. - state: the state code received from identity provider. - details: details about the user's third-party identity as requested in `scope`. :rtype: void :return: Raises an exception if any of the required arguments is missing, and pass if all are given. """ hint_msg = ( "Visit the identity provider's permitted applications page " "(e.g., visit `https://myaccount.google.com/u/0/permissions` " "for Google), then revoke the access of this Galaxy instance, " "and then retry to login. If the problem persists, contact " "the Admin of this Galaxy instance." ) if response is None or not isinstance(response, dict): # This can happen only if PSA is not able to decode the `authnz code` # sent back from the identity provider. PSA internally handles such # scenarios; however, this case is implemented to prevent uncaught # server-side errors. raise MalformedContents(err_msg=f"`response` not found. {hint_msg}") if not response.get("id_token"): # This can happen if a non-OIDC compliant backend is used; # e.g., an OAuth2.0-based backend that only generates access token. raise MalformedContents(err_msg=f"Missing identity token. {hint_msg}") if is_new and not response.get("refresh_token"): # An identity provider (e.g., Google) sends a refresh token the first # time user consents Galaxy's access (i.e., the first time user logs in # to a galaxy instance using their credentials with the identity provider). # There could be variety of scenarios under which a refresh token might # be missing; e.g., a manipulated Galaxy's database, where a user's records # from galaxy_user and oidc_user_authnz_tokens tables deleted after the # user has provided consent. This can also happen under dev efforts. # The solution is to revoke the consent by visiting the identity provider's # website, and then retry the login process. raise MalformedContents(err_msg=f"Missing refresh token. {hint_msg}")
[docs]def verify(strategy=None, response=None, details=None, **kwargs): provider = strategy.config.get("SOCIAL_AUTH_SECONDARY_AUTH_PROVIDER") endpoint = strategy.config.get("SOCIAL_AUTH_SECONDARY_AUTH_ENDPOINT") if provider is None or endpoint is None: # Either the secondary authorization is not configured or OIDC IdP # is not compatible, so allow user login. return if provider.lower() == "gcp": result = requests.post( f"https://iam.googleapis.com/v1/projects/-/serviceAccounts/{endpoint}:getIamPolicy", headers={ "Authorization": f"Bearer {response.get('access_token')}", "Accept": "application/json", }, timeout=DEFAULT_SOCKET_TIMEOUT, ) res = json.loads(result.content) if result.status_code == requests.codes.ok: email_addresses = res["bindings"][0]["members"] email_addresses = [x.lower().replace("user:", "").strip() for x in email_addresses] if details.get("email") in email_addresses: # Secondary authorization successful, so allow user login. pass else: raise Exception("Not authorized by GCP IAM.") else: # The message of the raised exception is shown to the user; hence, # the following way of handling exception is better than using # result.raise_for_status(), since raise_for_status may report # sensitive information that should not be exposed to users. raise Exception(res["error"]["message"]) else: raise Exception(f"`{provider}` is an unsupported secondary authorization provider, contact admin.")
[docs]def allowed_to_disconnect( name=None, user=None, user_storage=None, strategy=None, backend=None, request=None, details=None, **kwargs ): """ Disconnect is the process of disassociating a Galaxy user and a third-party authnz. In other words, it is the process of removing any access and/or ID tokens of a user. This function should raise an exception if disconnection is NOT permitted. Do NOT return any value (except an empty dictionary) if disconnect is allowed. Because, at least until PSA social_core v.1.5.0, any returned value (e.g., Boolean) will result in ignoring the rest of the disconnect pipeline. See the following condition in `run_pipeline` function: https://github.com/python-social-auth/social-core/blob/master/social_core/backends/base.py#L114 :param name: name of the backend (e.g., google-openidconnect) :type user: galaxy.model.User :type user_storage: galaxy.model.UserAuthnzToken :type strategy: galaxy.authnz.psa_authnz.Strategy :type backend: PSA backend object (e.g., social_core.backends.google_openidconnect.GoogleOpenIdConnect) :type request: webob.multidict.MultiDict :type details: dict :return: empty dict """
[docs]def disconnect( name=None, user=None, user_storage=None, strategy=None, backend=None, request=None, details=None, **kwargs ): """ Disconnect is the process of disassociating a Galaxy user and a third-party authnz. In other words, it is the process of removing any access and/or ID tokens of a user. :param name: name of the backend (e.g., google-openidconnect) :type user: galaxy.model.User :type user_storage: galaxy.model.UserAuthnzToken :type strategy: galaxy.authnz.psa_authnz.Strategy :type backend: PSA backend object (e.g., social_core.backends.google_openidconnect.GoogleOpenIdConnect) :type request: webob.multidict.MultiDict :type details: dict :return: void or empty dict. Any key-value pair inside the dictionary will be available inside PSA only, and will be passed to the next step in the disconnect pipeline. However, the key-value pair will not be returned as a result of calling the `do_disconnect` function. Additionally, returning any value except for a(n) (empty) dictionary, will break the disconnect pipeline, and that value will be returned as a result of calling the `do_disconnect` function. """ sa_session = user_storage.sa_session user_authnz = ( sa_session.query(user_storage) .filter(user_storage.table.c.user_id == user.id, user_storage.table.c.provider == name) .first() ) if user_authnz is None: return {"success": False, "message": "Not authenticated by any identity providers."} # option A sa_session.delete(user_authnz) # option B # user_authnz.extra_data = None with transaction(sa_session): sa_session.commit()