Warning
This document is for an old release of Galaxy. You can alternatively view this page in the latest release if it exists or view the top of the latest release's documentation.
Source code for galaxy.authnz.managers
import importlib
import logging
import xml.etree.ElementTree as ET
from xml.etree.ElementTree import ParseError
from .psa_authnz import PSAAuthnz
log = logging.getLogger(__name__)
[docs]class AuthnzManager(object):
[docs] def __init__(self, app, oidc_config_file, oidc_backends_config_file):
"""
:type app: galaxy.app.UniverseApplication
:param app:
:type config: string
:param config: sets the path for OIDC configuration
file (e.g., oidc_backends_config.xml).
"""
self._parse_oidc_config(oidc_config_file)
self._parse_oidc_backends_config(oidc_backends_config_file)
def _parse_oidc_config(self, config_file):
self.oidc_config = {}
try:
tree = ET.parse(config_file)
root = tree.getroot()
if root.tag != 'OIDC':
raise ParseError("The root element in OIDC_Config xml file is expected to be `OIDC`, "
"found `{}` instead -- unable to continue.".format(root.tag))
for child in root:
if child.tag != 'Setter':
log.error("Expect a node with `Setter` tag, found a node with `{}` tag instead; "
"skipping this node.".format(child.tag))
continue
if 'Property' not in child.attrib or 'Value' not in child.attrib or 'Type' not in child.attrib:
log.error("Could not find the node attributes `Property` and/or `Value` and/or `Type`;"
" found these attributes: `{}`; skipping this node.".format(child.attrib))
continue
try:
func = getattr(importlib.import_module('__builtin__'), child.get('Type'))
except AttributeError:
log.error("The value of attribute `Type`, `{}`, is not a valid built-in type;"
" skipping this node").format(child.get('Type'))
continue
self.oidc_config[child.get('Property')] = func(child.get('Value'))
except ImportError:
raise
except ParseError as e:
raise ParseError("Invalid configuration at `{}`: {} -- unable to continue.".format(config_file, e.message))
def _parse_oidc_backends_config(self, config_file):
self.oidc_backends_config = {}
try:
tree = ET.parse(config_file)
root = tree.getroot()
if root.tag != 'OIDC':
raise ParseError("The root element in OIDC config xml file is expected to be `OIDC`, "
"found `{}` instead -- unable to continue.".format(root.tag))
for child in root:
if child.tag != 'provider':
log.error("Expect a node with `provider` tag, found a node with `{}` tag instead; "
"skipping the node.".format(child.tag))
continue
if 'name' not in child.attrib:
log.error("Could not find a node attribute 'name'; skipping the node '{}'.".format(child.tag))
continue
idp = child.get('name').lower()
if idp == 'google':
self.oidc_backends_config[idp] = self._parse_google_config(child)
if len(self.oidc_backends_config) == 0:
raise ParseError("No valid provider configuration parsed.")
except ImportError:
raise
except ParseError as e:
raise ParseError("Invalid configuration at `{}`: {} -- unable to continue.".format(config_file, e.message))
# except Exception as e:
# raise Exception("Malformed OIDC Configuration XML -- unable to continue. {}".format(e.message))
def _parse_google_config(self, config_xml):
rtv = {
'client_id': config_xml.find('client_id').text,
'client_secret': config_xml.find('client_secret').text,
'redirect_uri': config_xml.find('redirect_uri').text}
if config_xml.find('prompt') is not None:
rtv['prompt'] = config_xml.find('prompt').text
return rtv
def _get_authnz_backend(self, provider):
provider = provider.lower()
if provider in self.oidc_backends_config:
try:
return True, "", PSAAuthnz(provider, self.oidc_config, self.oidc_backends_config[provider])
except Exception as e:
log.exception('An error occurred when loading PSAAuthnz: ', str(e))
return False, str(e), None
else:
msg = 'The requested identity provider, `{}`, is not a recognized/expected provider'.format(provider)
log.debug(msg)
return False, msg, None
[docs] def authenticate(self, provider, trans):
"""
:type provider: string
:param provider: set the name of the identity provider to be
used for authentication flow.
:type trans: GalaxyWebTransaction
:param trans: Galaxy web transaction.
:return: an identity provider specific authentication redirect URI.
"""
try:
success, message, backend = self._get_authnz_backend(provider)
if success is False:
return False, message, None
return True, "Redirecting to the `{}` identity provider for authentication".format(provider), backend.authenticate(trans)
except Exception as e:
msg = 'An error occurred when authenticating a user on `{}` identity provider: {}'.format(provider, str(e))
log.exception(msg)
return False, msg, None
[docs] def callback(self, provider, state_token, authz_code, trans, login_redirect_url):
try:
success, message, backend = self._get_authnz_backend(provider)
if success is False:
return False, message, (None, None)
return True, message, backend.callback(state_token, authz_code, trans, login_redirect_url)
except Exception as e:
msg = 'An error occurred when handling callback from `{}` identity provider; {}'.format(provider, str(e))
log.exception(msg)
return False, msg, (None, None)
[docs] def disconnect(self, provider, trans, disconnect_redirect_url=None):
try:
success, message, backend = self._get_authnz_backend(provider)
if success is False:
return False, message, None
return backend.disconnect(provider, trans, disconnect_redirect_url)
except Exception as e:
msg = 'An error occurred when disconnecting authentication with `{}` identity provider for user `{}`; ' \
'{}'.format(provider, trans.user.username, str(e))
log.exception(msg)
return False, msg, None