Source code for galaxy.auth.providers.ldap_ad

"""
Created on 15/07/2014

@author: Andrew Robinson
"""

import logging

from galaxy.exceptions import ConfigurationError
from galaxy.security.validate_user_input import transform_publicname
from galaxy.util import (
    string_as_bool,
    unicodify,
)
from ..providers import AuthProvider

try:
    import ldap
except ImportError as exc:
    ldap = None
    ldap_import_exc = exc

log = logging.getLogger(__name__)


def _get_subs(d, k, params):
    if k not in d or not d[k]:
        raise ConfigurationError(f"Missing '{k}' parameter in LDAP options")
    return str(d[k]).format(**params)


def _parse_ldap_options(options_unparsed):
    # Tag is defined in the XML but is empty
    if not options_unparsed:
        return []

    ldap_options = []

    # Valid options must start with this prefix. See help(ldap)
    prefix = "OPT_"

    for opt in options_unparsed.split(","):
        try:
            key, value = opt.split("=")
        except ValueError:
            log.warning(
                "LDAP authenticate: Invalid syntax '%s' inside <ldap-options> element. Syntax should be option1=value1,option2=value2",
                opt,
            )
            continue

        if not key.startswith(prefix):
            log.warning(
                "LDAP authenticate: Invalid LDAP option '%s'. '%s' doesn't start with prefix '%s'", opt, key, prefix
            )
            continue
        try:
            key = getattr(ldap, key)
        except AttributeError:
            log.warning("LDAP authenticate: Invalid LDAP option '%s'. '%s' is not available in module ldap", opt, key)
            continue
        if value.startswith(prefix):
            try:
                value = getattr(ldap, value)
            except AttributeError:
                log.warning(
                    "LDAP authenticate: Invalid LDAP option '%s'. '%s' is not available in module ldap", opt, value
                )
                continue
        pair = (key, value)
        log.debug("LDAP authenticate: Valid LDAP option pair '%s' -> '%s=%s'", opt, *pair)
        ldap_options.append(pair)

    return ldap_options


[docs]class LDAP(AuthProvider): """ Attempts to authenticate users against an LDAP server. If options include search-fields then it will attempt to search LDAP for those fields first. After that it will bind to LDAP with the username (formatted as specified). """ plugin_type = "ldap" role_search_option = "auto-register-roles"
[docs] def __init__(self): super().__init__() self.auto_create_roles_or_groups = False self.role_search_attribute = None
[docs] def check_config(self, username, email, options): ok = True if options.get("continue-on-failure", "False") == "False": failure_mode = None # reject and do not continue else: failure_mode = False # reject but continue if string_as_bool(options.get("login-use-username", False)): if not username: log.debug("LDAP authenticate: username must be used to login, cannot be None") return ok, failure_mode else: if not email: log.debug("LDAP authenticate: email must be used to login, cannot be None") return ok, failure_mode auto_create_roles = string_as_bool(options.get("auto-create-roles", False)) auto_create_groups = string_as_bool(options.get("auto-create-groups", False)) self.auto_create_roles_or_groups = auto_create_roles or auto_create_groups auto_assign_roles_to_groups_only = string_as_bool(options.get("auto-assign-roles-to-groups-only", False)) if auto_assign_roles_to_groups_only and not (auto_create_roles and auto_create_groups): raise ConfigurationError( "If 'auto-assign-roles-to-groups-only' is True, auto-create-roles and " "auto-create-groups have to be True as well." ) self.role_search_attribute = options.get(self.role_search_option) if self.auto_create_roles_or_groups and self.role_search_attribute is None: raise ConfigurationError( "If 'auto-create-roles' or 'auto-create-groups' is True, a '%s' attribute has to" " be provided." % self.role_search_option ) return ok, failure_mode
[docs] def authenticate(self, email, username, password, options): """ See abstract method documentation. """ if not options["redact_username_in_logs"]: log.debug("LDAP authenticate: email is %s", email) log.debug("LDAP authenticate: username is %s", username) log.debug("LDAP authenticate: options are %s", options) failure_mode, params = self.ldap_search(email, username, options) if not params: return failure_mode, "", "" # allow to skip authentication to allow for pre-populating users if not options.get("no_password_check", False): params["password"] = password if not self._authenticate(params, options): return failure_mode, "", "" # check whether the user is a member of a specified group/domain/... if "search-memberof-filter" in options: search_filter = _get_subs(options, "search-memberof-filter", params) if not any(search_filter in ad_node_name for ad_node_name in params["memberOf"]): return failure_mode, "", "" attributes = {} if self.auto_create_roles_or_groups: attributes["roles"] = params[self.role_search_option] return ( True, _get_subs(options, "auto-register-email", params), transform_publicname(_get_subs(options, "auto-register-username", params)), attributes, )
def _authenticate(self, params, options): """ Do the actual authentication by binding as the user to check their credentials """ try: conn = ldap.initialize(_get_subs(options, "server", params)) conn.protocol_version = 3 bind_user = _get_subs(options, "bind-user", params) bind_password = _get_subs(options, "bind-password", params) except Exception: log.exception("LDAP authenticate: initialize exception") return False try: conn.simple_bind_s(bind_user, bind_password) try: whoami = conn.whoami_s() except ldap.PROTOCOL_ERROR: # The "Who am I?" extended operation is not supported by this LDAP server pass else: if whoami is None: raise RuntimeError("LDAP authenticate: anonymous bind") if not options["redact_username_in_logs"]: log.debug("LDAP authenticate: whoami is %s", whoami) except Exception as e: log.info("LDAP authenticate: bind exception: %s", unicodify(e)) return False log.debug("LDAP authentication successful") return True
[docs] def authenticate_user(self, user, password, options): """ See abstract method documentation. """ return self.authenticate(user.email, user.username, password, options)[0]
[docs]class ActiveDirectory(LDAP): """Effectively just an alias for LDAP auth, but may contain active directory specific logic in the future.""" plugin_type = "activedirectory"
__all__ = ("LDAP", "ActiveDirectory")