Warning

This document is for an in-development version of Galaxy. You can alternatively view this page in the latest release if it exists or view the top of the latest release's documentation.

Source code for galaxy.auth.providers.ldap_ad

"""
Created on 15/07/2014

@author: Andrew Robinson
"""

import logging

from galaxy.exceptions import ConfigurationError
from galaxy.security.validate_user_input import transform_publicname
from galaxy.util import (
    string_as_bool,
    unicodify,
)
from ..providers import AuthProvider

try:
    import ldap
except ImportError as exc:
    ldap = None
    ldap_import_exc = exc

log = logging.getLogger(__name__)


def _get_subs(d, k, params):
    if k not in d or not d[k]:
        raise ConfigurationError("Missing '%s' parameter in LDAP options" % k)
    return str(d[k]).format(**params)


def _parse_ldap_options(options_unparsed):
    # Tag is defined in the XML but is empty
    if not options_unparsed:
        return []

    ldap_options = []

    # Valid options must start with this prefix. See help(ldap)
    prefix = "OPT_"

    for opt in options_unparsed.split(","):
        try:
            key, value = opt.split("=")
        except ValueError:
            log.warning("LDAP authenticate: Invalid syntax '%s' inside <ldap-options> element. Syntax should be option1=value1,option2=value2", opt)
            continue

        if not key.startswith(prefix):
            log.warning("LDAP authenticate: Invalid LDAP option '%s'. '%s' doesn't start with prefix '%s'", opt, key, prefix)
            continue
        try:
            key = getattr(ldap, key)
        except AttributeError:
            log.warning("LDAP authenticate: Invalid LDAP option '%s'. '%s' is not available in module ldap", opt, key)
            continue
        if value.startswith(prefix):
            try:
                value = getattr(ldap, value)
            except AttributeError:
                log.warning("LDAP authenticate: Invalid LDAP option '%s'. '%s' is not available in module ldap", opt, value)
                continue
        pair = (key, value)
        log.debug("LDAP authenticate: Valid LDAP option pair '%s' -> '%s=%s'", opt, *pair)
        ldap_options.append(pair)

    return ldap_options


[docs]class LDAP(AuthProvider): """ Attempts to authenticate users against an LDAP server. If options include search-fields then it will attempt to search LDAP for those fields first. After that it will bind to LDAP with the username (formatted as specified). """ plugin_type = 'ldap' role_search_option = 'auto-register-roles'
[docs] def __init__(self): super().__init__() self.auto_create_roles_or_groups = False self.role_search_attribute = None
[docs] def check_config(self, username, email, options): ok = True if options.get('continue-on-failure', 'False') == 'False': failure_mode = None # reject and do not continue else: failure_mode = False # reject but continue if string_as_bool(options.get('login-use-username', False)): if not username: log.debug('LDAP authenticate: username must be used to login, cannot be None') return ok, failure_mode else: if not email: log.debug('LDAP authenticate: email must be used to login, cannot be None') return ok, failure_mode auto_create_roles = string_as_bool(options.get('auto-create-roles', False)) auto_create_groups = string_as_bool(options.get('auto-create-groups', False)) self.auto_create_roles_or_groups = auto_create_roles or auto_create_groups auto_assign_roles_to_groups_only = string_as_bool(options.get('auto-assign-roles-to-groups-only', False)) if auto_assign_roles_to_groups_only and not (auto_create_roles and auto_create_groups): raise ConfigurationError("If 'auto-assign-roles-to-groups-only' is True, auto-create-roles and " "auto-create-groups have to be True as well.") self.role_search_attribute = options.get(self.role_search_option) if self.auto_create_roles_or_groups and self.role_search_attribute is None: raise ConfigurationError("If 'auto-create-roles' or 'auto-create-groups' is True, a '%s' attribute has to" " be provided." % self.role_search_option) return ok, failure_mode
[docs] def authenticate(self, email, username, password, options): """ See abstract method documentation. """ if not options['redact_username_in_logs']: log.debug("LDAP authenticate: email is %s", email) log.debug("LDAP authenticate: username is %s", username) log.debug("LDAP authenticate: options are %s", options) failure_mode, params = self.ldap_search(email, username, options) if not params: return failure_mode, '', '' # allow to skip authentication to allow for pre-populating users if not options.get('no_password_check', False): params['password'] = password if not self._authenticate(params, options): return failure_mode, '', '' # check whether the user is a member of a specified group/domain/... if 'search-memberof-filter' in options: search_filter = _get_subs(options, 'search-memberof-filter', params) if not any(search_filter in ad_node_name for ad_node_name in params['memberOf']): return failure_mode, '', '' attributes = {} if self.auto_create_roles_or_groups: attributes['roles'] = params[self.role_search_option] return (True, _get_subs(options, 'auto-register-email', params), transform_publicname(_get_subs(options, 'auto-register-username', params)), attributes)
def _authenticate(self, params, options): """ Do the actual authentication by binding as the user to check their credentials """ try: l = ldap.initialize(_get_subs(options, 'server', params)) l.protocol_version = 3 bind_user = _get_subs(options, 'bind-user', params) bind_password = _get_subs(options, 'bind-password', params) except Exception: log.exception('LDAP authenticate: initialize exception') return False try: l.simple_bind_s(bind_user, bind_password) try: whoami = l.whoami_s() except ldap.PROTOCOL_ERROR: # The "Who am I?" extended operation is not supported by this LDAP server pass else: if whoami is None: raise RuntimeError('LDAP authenticate: anonymous bind') if not options['redact_username_in_logs']: log.debug("LDAP authenticate: whoami is %s", whoami) except Exception as e: log.info('LDAP authenticate: bind exception: %s', unicodify(e)) return False log.debug('LDAP authentication successful') return True
[docs] def authenticate_user(self, user, password, options): """ See abstract method documentation. """ return self.authenticate(user.email, user.username, password, options)[0]
[docs]class ActiveDirectory(LDAP): """ Effectively just an alias for LDAP auth, but may contain active directory specific logic in the future. """ plugin_type = 'activedirectory'
__all__ = ('LDAP', 'ActiveDirectory')