Warning

This document is for an old release of Galaxy. You can alternatively view this page in the latest release if it exists or view the top of the latest release's documentation.

Source code for galaxy.authnz.custos_authnz

import base64
import hashlib
import json
import logging
import os
import time
from datetime import (
    datetime,
    timedelta,
)
from urllib.parse import quote

import jwt
import requests
from oauthlib.common import generate_nonce
from requests_oauthlib import OAuth2Session

from galaxy import (
    exceptions,
    util,
)
from galaxy.model import (
    CustosAuthnzToken,
    User,
)
from galaxy.model.base import transaction
from galaxy.model.orm.util import add_object_to_object_session
from . import IdentityProvider

try:
    import pkce
except ImportError:
    pkce = None  # type: ignore[assignment]

log = logging.getLogger(__name__)
STATE_COOKIE_NAME = "galaxy-oidc-state"
NONCE_COOKIE_NAME = "galaxy-oidc-nonce"
VERIFIER_COOKIE_NAME = "galaxy-oidc-verifier"
KEYCLOAK_BACKENDS = {"custos", "cilogon", "keycloak"}


[docs]class InvalidAuthnzConfigException(Exception): pass
[docs]class CustosAuthnz(IdentityProvider):
[docs] def __init__(self, provider, oidc_config, oidc_backend_config, idphint=None): provider = provider.lower() self.config = {"provider": provider} self.config["verify_ssl"] = oidc_config["VERIFY_SSL"] self.config["url"] = oidc_backend_config["url"] self.config["label"] = oidc_backend_config.get("label", provider.capitalize()) self.config["client_id"] = oidc_backend_config["client_id"] self.config["client_secret"] = oidc_backend_config["client_secret"] self.config["require_create_confirmation"] = oidc_backend_config.get( "require_create_confirmation", provider == "custos" ) self.config["redirect_uri"] = oidc_backend_config["redirect_uri"] self.config["ca_bundle"] = oidc_backend_config.get("ca_bundle", None) self.config["pkce_support"] = oidc_backend_config.get("pkce_support", False) self.config["extra_params"] = { "kc_idp_hint": oidc_backend_config.get( "idphint", "oidc" if self.config["provider"] in ["custos", "keycloak"] else "cilogon" ) } if provider == "cilogon": self._load_config_for_cilogon() elif provider == "custos": self._load_config_for_custos() elif provider == "keycloak": self._load_config_for_keycloak()
def _decode_token_no_signature(self, token): return jwt.decode(token, audience=self.config["client_id"], options={"verify_signature": False})
[docs] def refresh(self, trans, custos_authnz_token): if custos_authnz_token is None: raise exceptions.AuthenticationFailed("cannot find authorized user while refreshing token") id_token_decoded = self._decode_token_no_signature(custos_authnz_token.id_token) # do not refresh tokens if they didn't reach their half lifetime if int(id_token_decoded["iat"]) + int(id_token_decoded["exp"]) > 2 * int(time.time()): return False log.info(custos_authnz_token.access_token) oauth2_session = self._create_oauth2_session() token_endpoint = self.config["token_endpoint"] if self.config.get("iam_client_secret"): client_secret = self.config["iam_client_secret"] else: client_secret = self.config["client_secret"] clientIdAndSec = f"{self.config['client_id']}:{self.config['client_secret']}" # for custos params = { "client_secret": client_secret, "refresh_token": custos_authnz_token.refresh_token, "headers": { "Authorization": f"Basic {util.unicodify(base64.b64encode(util.smart_str(clientIdAndSec)))}" }, # for custos } token = oauth2_session.refresh_token(token_endpoint, **params) processed_token = self._process_token(trans, oauth2_session, token, False) custos_authnz_token.access_token = processed_token["access_token"] custos_authnz_token.id_token = processed_token["id_token"] custos_authnz_token.refresh_token = processed_token["refresh_token"] custos_authnz_token.expiration_time = processed_token["expiration_time"] custos_authnz_token.refresh_expiration_time = processed_token["refresh_expiration_time"] trans.sa_session.add(custos_authnz_token) trans.sa_session.flush() return True
[docs] def authenticate(self, trans, idphint=None): base_authorize_url = self.config["authorization_endpoint"] scopes = ["openid", "email", "profile"] if self.config["provider"] in ["custos", "cilogon"]: scopes.append("org.cilogon.userinfo") oauth2_session = self._create_oauth2_session(scope=scopes) nonce = generate_nonce() nonce_hash = self._hash_nonce(nonce) extra_params = {"nonce": nonce_hash} if idphint is not None: extra_params["idphint"] = idphint if self.config["pkce_support"]: if not pkce: raise InvalidAuthnzConfigException( "The python 'pkce' library is not installed but Galaxy is configured to use it " "(see oidc_backends_config). Make sure pkce is installed correctly to proceed." ) code_verifier, code_challenge = pkce.generate_pkce_pair(96) extra_params["code_challenge"] = code_challenge extra_params["code_challenge_method"] = "S256" trans.set_cookie(value=code_verifier, name=VERIFIER_COOKIE_NAME) if "extra_params" in self.config: extra_params.update(self.config["extra_params"]) authorization_url, state = oauth2_session.authorization_url(base_authorize_url, **extra_params) trans.set_cookie(value=state, name=STATE_COOKIE_NAME) trans.set_cookie(value=nonce, name=NONCE_COOKIE_NAME) return authorization_url
def _process_token(self, trans, oauth2_session, token, validate_nonce=True): processed_token = {} processed_token["access_token"] = token["access_token"] processed_token["id_token"] = token["id_token"] processed_token["refresh_token"] = token["refresh_token"] if "refresh_token" in token else None processed_token["expiration_time"] = datetime.now() + timedelta(seconds=token.get("expires_in", 3600)) processed_token["refresh_expiration_time"] = ( (datetime.now() + timedelta(seconds=token["refresh_expires_in"])) if "refresh_expires_in" in token else None ) # Get nonce from token['id_token'] and validate. 'nonce' in the # id_token is a hash of the nonce stored in the NONCE_COOKIE_NAME # cookie. id_token_decoded = self._decode_token_no_signature(processed_token["id_token"]) if validate_nonce: nonce_hash = id_token_decoded["nonce"] self._validate_nonce(trans, nonce_hash) # Get userinfo and lookup/create Galaxy user record if id_token_decoded.get("email", None): userinfo = id_token_decoded else: userinfo = self._get_userinfo(oauth2_session) processed_token["email"] = userinfo["email"] processed_token["user_id"] = userinfo["sub"] processed_token["username"] = self._username_from_userinfo(trans, userinfo) return processed_token
[docs] def callback(self, state_token, authz_code, trans, login_redirect_url): # Take state value to validate from token. OAuth2Session.fetch_token # will validate that the state query parameter value on the URL matches # this value. state_cookie = trans.get_cookie(name=STATE_COOKIE_NAME) oauth2_session = self._create_oauth2_session(state=state_cookie) token = self._fetch_token(oauth2_session, trans) processed_token = self._process_token(trans, oauth2_session, token) user_id = processed_token["user_id"] email = processed_token["email"] username = processed_token["username"] access_token = processed_token["access_token"] id_token = processed_token["id_token"] refresh_token = processed_token["refresh_token"] expiration_time = processed_token["expiration_time"] refresh_expiration_time = processed_token["refresh_expiration_time"] # Create or update custos_authnz_token record custos_authnz_token = self._get_custos_authnz_token(trans.sa_session, user_id, self.config["provider"]) if custos_authnz_token is None: user = trans.user existing_user = trans.sa_session.query(User).filter_by(email=email).first() if not user: if existing_user: if trans.app.config.fixed_delegated_auth: user = existing_user else: message = f"There already exists a user with email {email}. To associate this external login, you must first be logged in as that existing account." log.info(message) login_redirect_url = ( f"{login_redirect_url}login/start" f"?connect_external_provider={self.config['provider']}" f"&connect_external_email={email}" f"&connect_external_label={self.config['label']}" ) return login_redirect_url, None elif self.config["require_create_confirmation"]: login_redirect_url = f"{login_redirect_url}login/start?confirm=true&provider_token={json.dumps(token)}&provider={self.config['provider']}" return login_redirect_url, None else: user = trans.app.user_manager.create(email=email, username=username) if trans.app.config.user_activation_on: trans.app.user_manager.send_activation_email(trans, email, username) # Create a token to link this identity with an existing account custos_authnz_token = CustosAuthnzToken( user=user, external_user_id=user_id, provider=self.config["provider"], access_token=access_token, id_token=id_token, refresh_token=refresh_token, expiration_time=expiration_time, refresh_expiration_time=refresh_expiration_time, ) label = self.config["label"] if trans.app.config.fixed_delegated_auth: redirect_url = login_redirect_url elif existing_user and existing_user != user: redirect_url = ( f"{login_redirect_url}user/external_ids" f"?email_exists={email}" f"&notification=Your%20{label}%20identity%20has%20been%20linked" "%20to%20your%20Galaxy%20account." ) else: redirect_url = ( f"{login_redirect_url}user/external_ids" f"?notification=Your%20{label}%20identity%20has%20been%20linked" "%20to%20your%20Galaxy%20account." ) else: # Identity is already linked to account - login as usual custos_authnz_token.access_token = access_token custos_authnz_token.id_token = id_token custos_authnz_token.refresh_token = refresh_token custos_authnz_token.expiration_time = expiration_time custos_authnz_token.refresh_expiration_time = refresh_expiration_time redirect_url = "/" trans.sa_session.add(custos_authnz_token) with transaction(trans.sa_session): trans.sa_session.commit() return redirect_url, custos_authnz_token.user
[docs] def create_user(self, token, trans, login_redirect_url): token_dict = json.loads(token) access_token = token_dict["access_token"] id_token = token_dict["id_token"] refresh_token = token_dict["refresh_token"] if "refresh_token" in token_dict else None expiration_time = datetime.now() + timedelta( seconds=token_dict.get("expires_in", 3600) ) # might be a problem cause times no long valid refresh_expiration_time = ( (datetime.now() + timedelta(seconds=token_dict["refresh_expires_in"])) if "refresh_expires_in" in token_dict else None ) # Get nonce from token['id_token'] and validate. 'nonce' in the # id_token is a hash of the nonce stored in the NONCE_COOKIE_NAME # cookie. userinfo = self._decode_token_no_signature(id_token) # Get userinfo and create Galaxy user record email = userinfo["email"] # Check if username if already taken username = self._username_from_userinfo(trans, userinfo) user_id = userinfo["sub"] user = trans.app.user_manager.create(email=email, username=username) if trans.app.config.user_activation_on: trans.app.user_manager.send_activation_email(trans, email, username) custos_authnz_token = CustosAuthnzToken( external_user_id=user_id, provider=self.config["provider"], access_token=access_token, id_token=id_token, refresh_token=refresh_token, expiration_time=expiration_time, refresh_expiration_time=refresh_expiration_time, ) add_object_to_object_session(custos_authnz_token, user) custos_authnz_token.user = user trans.sa_session.add(user) trans.sa_session.add(custos_authnz_token) with transaction(trans.sa_session): trans.sa_session.commit() return login_redirect_url, user
[docs] def disconnect(self, provider, trans, email=None, disconnect_redirect_url=None): try: user = trans.user index = 0 # Find CustosAuthnzToken record for this provider (should only be one) provider_tokens = [token for token in user.custos_auth if token.provider == self.config["provider"]] if len(provider_tokens) == 0: raise Exception(f"User is not associated with provider {self.config['provider']}") if len(provider_tokens) > 1: for idx, token in enumerate(provider_tokens): id_token_decoded = self._decode_token_no_signature(token.id_token) if id_token_decoded["email"] == email: index = idx trans.sa_session.delete(provider_tokens[index]) with transaction(trans.sa_session): trans.sa_session.commit() return True, "", disconnect_redirect_url except Exception as e: return False, f"Failed to disconnect provider {provider}: {util.unicodify(e)}", None
[docs] def logout(self, trans, post_user_logout_href=None): try: redirect_url = self.config["end_session_endpoint"] if post_user_logout_href is not None: redirect_url += f"?redirect_uri={quote(post_user_logout_href)}" return redirect_url except Exception as e: log.error("Failed to generate logout redirect_url", exc_info=e) return None
def _create_oauth2_session(self, state=None, scope=None): client_id = self.config["client_id"] redirect_uri = self.config["redirect_uri"] if redirect_uri.startswith("http://localhost") and os.environ.get("OAUTHLIB_INSECURE_TRANSPORT", None) != "1": log.warning("Setting OAUTHLIB_INSECURE_TRANSPORT to '1' to allow plain HTTP (non-SSL) callback") os.environ["OAUTHLIB_INSECURE_TRANSPORT"] = "1" session = OAuth2Session(client_id, scope=scope, redirect_uri=redirect_uri, state=state) session.verify = self._get_verify_param() return session def _fetch_token(self, oauth2_session, trans): if self.config.get("iam_client_secret"): # Custos uses the Keycloak client secret to get the token client_secret = self.config["iam_client_secret"] else: client_secret = self.config["client_secret"] token_endpoint = self.config["token_endpoint"] clientIdAndSec = f"{self.config['client_id']}:{self.config['client_secret']}" # for custos params = { "client_secret": client_secret, "authorization_response": trans.request.url, "headers": { "Authorization": f"Basic {util.unicodify(base64.b64encode(util.smart_str(clientIdAndSec)))}" }, # for custos "verify": self._get_verify_param(), } if self.config["pkce_support"]: code_verifier = trans.get_cookie(name=VERIFIER_COOKIE_NAME) trans.set_cookie("", name=VERIFIER_COOKIE_NAME, age=-1) params["code_verifier"] = code_verifier return oauth2_session.fetch_token(token_endpoint, **params) def _get_userinfo(self, oauth2_session): userinfo_endpoint = self.config["userinfo_endpoint"] return oauth2_session.get(userinfo_endpoint, verify=self._get_verify_param()).json() def _get_custos_authnz_token(self, sa_session, user_id, provider): return sa_session.query(CustosAuthnzToken).filter_by(external_user_id=user_id, provider=provider).one_or_none() def _hash_nonce(self, nonce): return hashlib.sha256(util.smart_str(nonce)).hexdigest() def _validate_nonce(self, trans, nonce_hash): nonce_cookie = trans.get_cookie(name=NONCE_COOKIE_NAME) # Delete the nonce cookie trans.set_cookie("", name=NONCE_COOKIE_NAME, age=-1) nonce_cookie_hash = self._hash_nonce(nonce_cookie) if nonce_hash != nonce_cookie_hash: raise Exception("Nonce mismatch. Check that configured redirect_uri matches the URL you are using.") def _load_config_for_cilogon(self): # Set cilogon endpoints self.config["authorization_endpoint"] = "https://cilogon.org/authorize" self.config["token_endpoint"] = "https://cilogon.org/oauth2/token" self.config["userinfo_endpoint"] = "https://cilogon.org/oauth2/userinfo" def _load_config_for_custos(self): self.config["well_known_oidc_config_uri"] = self._get_well_known_uri_from_url(self.config["provider"]) self.config["credential_url"] = f"{self.config['url'].rstrip('/')}/credentials" self._get_custos_credentials() # Set custos endpoints clientIdAndSec = f"{self.config['client_id']}:{self.config['client_secret']}" eps = requests.get( self.config["well_known_oidc_config_uri"], headers={"Authorization": f"Basic {util.unicodify(base64.b64encode(util.smart_str(clientIdAndSec)))}"}, verify=False, params={"client_id": self.config["client_id"]}, timeout=util.DEFAULT_SOCKET_TIMEOUT, ) well_known_oidc_config = eps.json() self._load_well_known_oidc_config(well_known_oidc_config) def _load_config_for_keycloak(self): self.config["well_known_oidc_config_uri"] = self._get_well_known_uri_from_url(self.config["provider"]) well_known_oidc_config = self._fetch_well_known_oidc_config(self.config["well_known_oidc_config_uri"]) self._load_well_known_oidc_config(well_known_oidc_config) def _get_custos_credentials(self): clientIdAndSec = f"{self.config['client_id']}:{self.config['client_secret']}" creds = requests.get( self.config["credential_url"], headers={"Authorization": f"Basic {util.unicodify(base64.b64encode(util.smart_str(clientIdAndSec)))}"}, verify=False, params={"client_id": self.config["client_id"]}, timeout=util.DEFAULT_SOCKET_TIMEOUT, ) credentials = creds.json() self.config["iam_client_secret"] = credentials["iam_client_secret"] def _get_well_known_uri_from_url(self, provider): # TODO: Look up this URL from a Python library if provider in ["custos", "keycloak"]: base_url = self.config["url"] # Remove potential trailing slash to avoid "//realms" base_url = base_url if base_url[-1] != "/" else base_url[:-1] return f"{base_url}/.well-known/openid-configuration" else: raise Exception(f"Unknown Custos provider name: {provider}") def _fetch_well_known_oidc_config(self, well_known_uri): try: return requests.get( well_known_uri, verify=self._get_verify_param(), timeout=util.DEFAULT_SOCKET_TIMEOUT ).json() except Exception: log.error(f"Failed to load well-known OIDC config URI: {well_known_uri}") raise def _load_well_known_oidc_config(self, well_known_oidc_config): self.config["authorization_endpoint"] = well_known_oidc_config["authorization_endpoint"] self.config["token_endpoint"] = well_known_oidc_config["token_endpoint"] self.config["userinfo_endpoint"] = well_known_oidc_config["userinfo_endpoint"] self.config["end_session_endpoint"] = well_known_oidc_config.get("end_session_endpoint") def _get_verify_param(self): """Return 'ca_bundle' if 'verify_ssl' is true and 'ca_bundle' is configured.""" # in requests_oauthlib, the verify param can either be a boolean or a CA bundle path if self.config["ca_bundle"] is not None and self.config["verify_ssl"]: return self.config["ca_bundle"] else: return self.config["verify_ssl"] def _username_from_userinfo(self, trans, userinfo): username = userinfo.get("preferred_username", userinfo["email"]) if "@" in username: username = username.split("@")[0] # username created from username portion of email username = util.ready_name_for_url(username) if trans.sa_session.query(trans.app.model.User).filter_by(username=username).first(): # if username already exists in database, append integer and iterate until unique username found count = 0 while trans.sa_session.query(trans.app.model.User).filter_by(username=(f"{username}{count}")).first(): count += 1 return f"{username}{count}" else: return username