Warning

This document is for an old release of Galaxy. Alternatively, you can view this page in the latest release, if it exists there, or go to the top of the latest release's documentation.

Source code for galaxy.auth.providers.ldap_ad

"""
Created on 15/07/2014

@author: Andrew Robinson
"""

import logging

from galaxy.exceptions import ConfigurationError
from galaxy.util import string_as_bool
from ..providers import AuthProvider

# The ``ldap`` module is treated as optional: when importing it fails, the
# module-level name ``ldap`` is set to None and the ImportError is kept in
# ``ldap_import_exc`` (presumably so the failure can be reported later; the
# code that reads it is not visible in this part of the file).
try:
    import ldap
except ImportError as exc:
    ldap = None
    ldap_import_exc = exc

# Module-level logger, named after this module.
log = logging.getLogger(__name__)


def _get_subs(d, k, params):
    if k not in d or not d[k]:
        raise ConfigurationError("Missing '%s' parameter in LDAP options" % k)
    return str(d[k]).format(**params)


def _parse_ldap_options(options_unparsed):
    # Tag is defined in the XML but is empty
    if not options_unparsed:
        return []

    ldap_options = []

    # Valid options must start with this prefix. See help(ldap)
    prefix = "OPT_"

    for opt in options_unparsed.split(","):
        try:
            key, value = opt.split("=")
        except ValueError:
            log.warning("LDAP authenticate: Invalid syntax '%s' inside <ldap-options> element. Syntax should be option1=value1,option2=value2" % opt)
            continue

        if not key.startswith(prefix):
            log.warning("LDAP authenticate: Invalid LDAP option '%s'. '%s' doesn't start with prefix '%s'", opt, key, prefix)
            continue
        try:
            key = getattr(ldap, key)
        except AttributeError:
            log.warning("LDAP authenticate: Invalid LDAP option '%s'. '%s' is not available in module ldap", opt, key)
            continue
        if value.startswith(prefix):
            try:
                value = getattr(ldap, value)
            except AttributeError:
                log.warning("LDAP authenticate: Invalid LDAP option '%s'. '%s' is not available in module ldap", opt, value)
                continue
        pair = (key, value)
        log.debug("LDAP authenticate: Valid LDAP option pair '%s' -> '%s=%s'", opt, *pair)
        ldap_options.append(pair)

    return ldap_options


class LDAP(AuthProvider):
    """
    Attempts to authenticate users against an LDAP server.

    If options include search-fields then it will attempt to search LDAP for
    those fields first.  After that it will bind to LDAP with the username
    (formatted as specified).
    """
    plugin_type = 'ldap'

    def __init__(self):
        """Initialise the provider with role/group auto-creation disabled."""
        super(LDAP, self).__init__()
        # True once check_config() has seen 'auto-create-roles' or
        # 'auto-create-groups' enabled.
        self.auto_create_roles_or_groups = False
        # Value of the 'auto-register-roles' option, set by check_config().
        self.role_search_attribute = None
        # Name of the option (and of the params key read in authenticate())
        # that carries the role information.
        self.role_search_option = 'auto-register-roles'

    def check_config(self, username, email, options):
        """
        Validate the login identifier and the role/group related options.

        :param username: username supplied at login, may be None
        :param email: email supplied at login, may be None
        :param options: mapping of provider options
        :returns: tuple ``(ok, failure_mode)``.  ``ok`` is False when the
            identifier required by 'login-use-username' is missing, True
            otherwise.  ``failure_mode`` is None ("reject and do not
            continue") when 'continue-on-failure' is the string 'False'
            (the default) and False ("reject but continue") otherwise.
        :raises ConfigurationError: if 'auto-assign-roles-to-groups-only' is
            enabled without both 'auto-create-roles' and 'auto-create-groups'
        """
        failure_mode = False  # reject but continue
        if options.get('continue-on-failure', 'False') == 'False':
            failure_mode = None  # reject and do not continue

        # A missing identifier is reported as not-ok; previously True was
        # returned on these paths as well, which made the flag meaningless.
        if string_as_bool(options.get('login-use-username', False)):
            if not username:
                log.debug('LDAP authenticate: username must be used to login, cannot be None')
                return False, failure_mode
        elif not email:
            log.debug('LDAP authenticate: email must be used to login, cannot be None')
            return False, failure_mode

        auto_create_roles = string_as_bool(options.get('auto-create-roles', False))
        auto_create_groups = string_as_bool(options.get('auto-create-groups', False))
        self.auto_create_roles_or_groups = auto_create_roles or auto_create_groups
        auto_assign_roles_to_groups_only = string_as_bool(options.get('auto-assign-roles-to-groups-only', False))
        if auto_assign_roles_to_groups_only and not (auto_create_roles and auto_create_groups):
            raise ConfigurationError("If 'auto-assign-roles-to-groups-only' is True, auto-create-roles and "
                                     "auto-create-groups have to be True as well.")
        self.role_search_attribute = options.get(self.role_search_option, None)
        return True, failure_mode

    def authenticate(self, email, username, password, options):
        """
        See abstract method documentation.

        :returns: ``(failure_mode, '', '')`` when the search yields no
            parameters or the bind fails, otherwise ``(True, email, username,
            attributes)`` where email and username are built from the
            'auto-register-email' and 'auto-register-username' options.
        """
        if not options['redact_username_in_logs']:
            log.debug("LDAP authenticate: email is %s", email)
            log.debug("LDAP authenticate: username is %s", username)

        # NOTE(review): options may include credential settings (e.g.
        # 'bind-password'); consider masking them before logging.
        log.debug("LDAP authenticate: options are %s", options)

        # NOTE(review): ldap_search() is not defined in the visible part of
        # this class; it has to be provided elsewhere.
        failure_mode, params = self.ldap_search(email, username, options)
        if not params:
            return failure_mode, '', ''

        # allow to skip authentication to allow for pre-populating users
        if not options.get('no_password_check', False):
            params['password'] = password
            if not self._authenticate(params, options):
                return failure_mode, '', ''

        attributes = {}
        if self.auto_create_roles_or_groups:
            attributes['roles'] = params[self.role_search_option]
        return (True,
                _get_subs(options, 'auto-register-email', params),
                _get_subs(options, 'auto-register-username', params),
                attributes)

    def _authenticate(self, params, options):
        """
        Do the actual authentication by binding as the user to check their credentials

        :param params: substitution values for the option templates
        :param options: mapping of provider options; 'server', 'bind-user'
            and 'bind-password' are read here
        :returns: True if the bind succeeded and was not anonymous, False if
            anything went wrong (the exception is logged as a warning)
        """
        # Local import: raises ImportError here if the ldap module is missing.
        import ldap

        conn = None
        try:
            conn = ldap.initialize(_get_subs(options, 'server', params))
            conn.protocol_version = 3
            bind_password = _get_subs(options, 'bind-password', params)
            conn.simple_bind_s(_get_subs(options, 'bind-user', params), bind_password)
            try:
                whoami = conn.whoami_s()
            except ldap.PROTOCOL_ERROR:
                # The "Who am I?" extended operation is not supported by this LDAP server
                pass
            else:
                if not options['redact_username_in_logs']:
                    log.debug("LDAP authenticate: whoami is %s", whoami)
                if whoami is None:
                    raise RuntimeError('LDAP authenticate: anonymous bind')
        except Exception:
            log.warning('LDAP authenticate: bind exception', exc_info=True)
            return False
        finally:
            # Release the connection whatever the outcome; a failure while
            # closing must not change the authentication result.
            if conn is not None:
                try:
                    conn.unbind_s()
                except Exception:
                    log.debug('LDAP authenticate: error while closing connection', exc_info=True)

        log.debug('LDAP authentication successful')
        return True

    def authenticate_user(self, user, password, options):
        """
        See abstract method documentation.

        Delegates to authenticate() with the user's email and username and
        returns only the first element of its result.
        """
        return self.authenticate(user.email, user.username, password, options)[0]
class ActiveDirectory(LDAP):
    """
    Active Directory authentication provider.

    For now this is just the LDAP provider registered under a different
    plugin type; Active Directory specific logic may be added here in the
    future.
    """
    plugin_type = 'activedirectory'
# Names exported by ``from <this module> import *``.
__all__ = (
    'LDAP',
    'ActiveDirectory',
)