Warning
This document is for an in-development version of Galaxy. You can alternatively view this page in the latest release, if it exists, or view the top of the latest release's documentation.
Source code for galaxy.jobs.mapper
import importlib
import logging
from inspect import getfullargspec
from types import ModuleType
from typing import (
    Callable,
    TYPE_CHECKING,
    Union,
)
import galaxy.jobs.rules
from galaxy.jobs import stock_rules
from galaxy.jobs.dynamic_tool_destination import map_tool_to_destination
from galaxy.jobs.job_destination import JobDestination
from galaxy.util.submodules import import_submodules
from .rule_helper import RuleHelper
if TYPE_CHECKING:
    from galaxy.jobs import (
        JobConfiguration,
        JobWrapper,
    )
log = logging.getLogger(__name__)

# Runner name marking a destination whose real target is computed at mapping time by a rule.
DYNAMIC_RUNNER_NAME = "dynamic"
# Id given to destinations built from legacy job runner URLs returned by dynamic rules.
DYNAMIC_DESTINATION_ID = "dynamic_legacy_from_url"

# Error templates; the ``%s`` placeholders are filled with a destination or a function name.
ERROR_MESSAGE_NO_RULE_FUNCTION = (
    "Galaxy misconfigured - cannot find dynamic rule function name for destination %s."
)
ERROR_MESSAGE_RULE_FUNCTION_NOT_FOUND = (
    "Galaxy misconfigured - no rule function named %s "
    "found in dynamic rule modules."
)
ERROR_MESSAGE_RULE_EXCEPTION = (
    "Encountered an unhandled exception while caching job destination dynamic rule."
)
[docs]
class JobMappingConfigurationException(Exception):
    """Raised when a dynamic destination's rule configuration is invalid (e.g. rule function not found)."""


class JobMappingException(Exception):
    """
    Raised when a job cannot be mapped to a destination.

    :param failure_message: human-readable reason; kept on ``failure_message`` and also
        passed to ``Exception`` so ``str(exc)`` and tracebacks show it.
    """

    def __init__(self, failure_message):
        super().__init__(failure_message)
        self.failure_message = failure_message


class JobNotReadyException(Exception):
    """
    Signals that the job is not ready to be mapped yet.

    The mapper re-raises it unchanged instead of wrapping it in ``JobMappingException``.
    Presumably raised by dynamic rule functions (nothing in this module raises it) — TODO confirm.

    :param job_state: optional job state accompanying the signal (semantics defined by the caller).
    :param message: optional human-readable explanation.
    """

    def __init__(self, job_state=None, message=None):
        # Hand the message to Exception so str(exc) and logged tracebacks are informative.
        if message is None:
            super().__init__()
        else:
            super().__init__(message)
        self.job_state = job_state
        self.message = message
# Built-in rules, selected by a dynamic destination's ``type`` param (anything other than "python").
STOCK_RULES: dict[str, Callable] = {
    "choose_one": stock_rules.choose_one,
    "burst": stock_rules.burst,
    "docker_dispatch": stock_rules.docker_dispatch,
    "dtd": map_tool_to_destination,
}
[docs]
class JobRunnerMapper:
    """
    This class is responsible for managing the mapping of jobs
    (in the form of job_wrappers) to job destinations.

    Non-dynamic destinations are returned as-is. Destinations whose runner is
    ``dynamic`` are resolved by invoking a rule function (a Python function found
    in a rules module, or one of ``STOCK_RULES``). The rule may return a
    JobDestination, a destination id, or a legacy job runner URL. The mapped
    destination is cached on ``cached_job_destination``.
    """

    rules_module: ModuleType

    def __init__(
        self,
        job_wrapper: "JobWrapper",
        url_to_destination: Callable[[str], JobDestination],
        job_config: "JobConfiguration",
    ):
        """
        :param job_wrapper: wrapper of the job to be mapped.
        :param url_to_destination: converts a legacy job runner URL into a JobDestination.
        :param job_config: job configuration used to look up destinations by id. When its
            ``dynamic_params`` is set, ``dynamic_params["rules_module"]`` replaces the default
            rules module (``galaxy.jobs.rules``).
        """
        self.job_wrapper = job_wrapper
        self.url_to_destination = url_to_destination
        self.job_config = job_config
        self.rules_module = galaxy.jobs.rules
        if job_config.dynamic_params is not None:
            module_name = job_config.dynamic_params["rules_module"]
            self.rules_module = importlib.import_module(module_name)

    def __tool_unavailable_error(self) -> "JobMappingException":
        """Build the error raised whenever the job's tool could not be loaded."""
        return JobMappingException(
            f"Can't map job to destination, tool '{self.job_wrapper.get_job().tool_id}' is unavailable"
        )

    def __invoke_expand_function(
        self, expand_function: Callable, destination: JobDestination, resource_params_from_job_state: bool = True
    ):
        """
        Call ``expand_function`` with keyword arguments matching the names it declares.

        Each declared argument is filled, in order of precedence, from
        ``destination.params``, then from values available without a database query
        (job id, tool, app, ...), then from values that require loading the job.
        Declared names that match none of these are not passed.

        :param expand_function: the rule function to call.
        :param destination: the dynamic destination being resolved (passed as ``referrer``).
        :param resource_params_from_job_state: when False, ``resource_params`` is passed as
            ``{}`` instead of being read from the job.
        :returns: whatever the rule function returns.
        :raises JobMappingException: if the job's tool is unavailable.
        """
        function_arg_names = getfullargspec(expand_function).args
        app = self.job_wrapper.app
        tool = self.job_wrapper.tool
        if tool is None:
            raise self.__tool_unavailable_error()
        possible_args = {
            "job_id": self.job_wrapper.job_id,
            "tool": tool,
            "tool_id": tool.id,
            "job_wrapper": self.job_wrapper,
            "rule_helper": RuleHelper(app),
            "app": app,
            "referrer": destination,
        }
        # Don't hit the DB to load the job object if not needed
        db_arg_names = (
            "job",
            "user",
            "user_email",
            "resource_params",
            "workflow_invocation_uuid",
            "workflow_resource_params",
        )
        require_db = any(name in function_arg_names for name in db_arg_names)
        db_param_mapping: dict = {}
        if require_db:
            job = self.job_wrapper.get_job()
            user = job.user
            db_param_mapping = {
                "job": job,
                "user": user,
                "user_email": user and str(user.email),
            }
        actual_args = {}
        for arg in function_arg_names:
            if arg in destination.params:
                # Job-config-defined params override every computed value.
                actual_args[arg] = destination.params[arg]
            elif arg in possible_args:
                actual_args[arg] = possible_args[arg]
            elif arg in db_param_mapping:
                actual_args[arg] = db_param_mapping[arg]
            # The names below are all in db_arg_names, so ``job`` has been loaded by now.
            elif arg == "resource_params":
                actual_args[arg] = (
                    self.job_wrapper.get_resource_parameters(job) if resource_params_from_job_state else {}
                )
            elif arg == "workflow_invocation_uuid":
                actual_args[arg] = job.raw_param_dict().get("__workflow_invocation_uuid__", None)
            elif arg == "workflow_resource_params":
                actual_args[arg] = job.raw_param_dict().get("__workflow_resource_params__", None)
        return expand_function(**actual_args)

    def __convert_url_to_destination(self, url: str):
        """
        Job runner URLs are deprecated, but dynamic mapper functions may still
        be returning them.  Runners are expected to be able to convert these to
        destinations.

        This method calls
        JobHandlerQueue.DefaultJobDispatcher.url_to_destination, which in turn
        calls the url_to_destination method for the appropriate runner. The
        resulting destination's id is set to ``DYNAMIC_DESTINATION_ID``.
        """
        dest = self.url_to_destination(url)
        dest.id = DYNAMIC_DESTINATION_ID
        return dest

    def __find_function_by_tool_id(self, rule_modules: list[ModuleType]) -> Union[Callable, None]:
        """
        Return the rule function named after one of the tool's ids, or None.

        :raises JobMappingException: if the job's tool is unavailable.
        """
        tool = self.job_wrapper.tool
        if tool is None:
            # Reachable via cache_job_destination(), which passes a raw destination and so
            # bypasses the tool check in __determine_job_destination.
            raise self.__tool_unavailable_error()
        # default look for function with name matching an id of tool, unless one specified
        for tool_id in tool.all_ids:
            matching_func = self.__last_matching_function_in_modules(rule_modules, tool_id)
            if matching_func:
                return matching_func
        return None

    def __get_expand_function(self, destination: JobDestination) -> Callable:
        """
        Returns the function that matches the rule. If a rules_module override
        is specified, search within that rules_module, or default to the plugin's
        top level rules_module.

        The function is named by the destination's ``function`` param, or, when that
        is absent, by one of the tool's ids.

        :raises JobMappingConfigurationException: if no matching function is found.
        """
        rules_module_name = destination.params.get("rules_module")
        rule_modules = self.__get_rule_modules_or_defaults(rules_module_name)
        if expand_function_name := destination.params.get("function"):
            expand_function = self.__last_matching_function_in_modules(rule_modules, expand_function_name)
            if not expand_function:
                message = ERROR_MESSAGE_RULE_FUNCTION_NOT_FOUND % expand_function_name
                raise JobMappingConfigurationException(message)
        else:
            expand_function = self.__find_function_by_tool_id(rule_modules)
            if not expand_function:
                message = ERROR_MESSAGE_NO_RULE_FUNCTION % destination
                raise JobMappingConfigurationException(message)
        return expand_function

    def __get_rule_modules_or_defaults(self, rules_module_name: Union[str, None]) -> list[ModuleType]:
        """
        Returns the rules under the given rules_module_name or default
        to returning the rules of the top-level rules module for the plugin
        """
        if rules_module_name:
            rules_module = importlib.import_module(rules_module_name)
        else:
            rules_module = self.rules_module
        return import_submodules(rules_module, ordered=True)

    def __last_matching_function_in_modules(
        self, rule_modules: list[ModuleType], function_name: str
    ) -> Union[Callable, None]:
        """
        Return ``function_name`` from the first module in ``rule_modules`` defining it.

        ``rule_modules`` is expected in reverse order, so the first module found holds
        the "last" match. Returns None when no module has a truthy attribute of that name.

        :raises JobMappingConfigurationException: if the matching attribute is not callable.
        """
        for rule_module in rule_modules:
            candidate = getattr(rule_module, function_name, None)
            if not candidate:
                continue
            if not callable(candidate):
                # Explicit check rather than ``assert`` so it also holds under ``python -O``.
                raise JobMappingConfigurationException(
                    f"Galaxy misconfigured - {function_name} in dynamic rule module "
                    f"{rule_module.__name__} is not callable."
                )
            return candidate
        return None

    def __handle_dynamic_job_destination(self, destination: JobDestination) -> JobDestination:
        """
        Resolve a dynamic destination by selecting its rule function and running it.

        The destination's ``type`` param (default ``"python"``) selects either a Python rule
        function from the rules modules or an entry of ``STOCK_RULES``.

        :raises JobMappingConfigurationException: for an unknown ``type`` or a missing rule.
        """
        expand_type = destination.params.get("type", "python")
        expand_function: Callable
        if expand_type == "python":
            expand_function = self.__get_expand_function(destination)
        elif expand_type in STOCK_RULES:
            expand_function = STOCK_RULES[expand_type]
        else:
            raise JobMappingConfigurationException(f"Unhandled dynamic job runner type specified - {expand_type}")
        return self.__handle_rule(expand_function, destination)

    def __handle_rule(self, rule_function: Callable, destination: JobDestination) -> JobDestination:
        """
        Run ``rule_function`` and normalize its result into a JobDestination.

        A non-JobDestination result is stringified. A string containing ``://`` is treated as
        a legacy runner URL; anything else is looked up as a destination id in the job config.
        """
        try:
            job_destination = self.__invoke_expand_function(rule_function, destination)
        except Exception as e:
            # Rules have varying quality and don't raise a consistent set of standard exceptions.
            # so ... if we get an error here let's try again without resource params encoded
            # in the job state. They're evil anyway.
            try:
                job_destination = self.__invoke_expand_function(
                    rule_function, destination, resource_params_from_job_state=False
                )
            except Exception:
                # raise original exception, dropping resource param from job state didn't help.
                raise e
            else:
                log.warning(
                    "Ignored user-specified invalid resource parameter request because it failed with %s", e
                )
        if not isinstance(job_destination, JobDestination):
            job_destination_rep = str(job_destination)  # Should be either id or url
            if "://" in job_destination_rep:
                job_destination = self.__convert_url_to_destination(job_destination_rep)
            else:
                job_destination = self.job_config.get_destination(job_destination_rep)
        return job_destination

    def __determine_job_destination(
        self, params: Union[dict, None], raw_job_destination: Union[JobDestination, None] = None
    ) -> JobDestination:
        """
        Compute the final destination, resolving (possibly chained) dynamic destinations.

        :param params: passed to the tool's ``get_job_destination`` when no raw destination is given.
        :param raw_job_destination: starting destination; defaults to the tool's destination.
        :raises JobMappingException: if the tool is needed but unavailable.
        """
        if raw_job_destination is None:
            tool = self.job_wrapper.tool
            if tool is None:
                raise self.__tool_unavailable_error()
            raw_job_destination = tool.get_job_destination(params)
        if raw_job_destination.runner != DYNAMIC_RUNNER_NAME:
            return raw_job_destination
        job_destination = self.__handle_dynamic_job_destination(raw_job_destination)
        log.debug("(%s) Mapped job to destination id: %s", self.job_wrapper.job_id, job_destination.id)
        # Recursively handle chained dynamic destinations. Recursion (rather than a loop) means a
        # rule cycle ends in RecursionError, which __cache_job_destination converts to a
        # JobMappingException instead of hanging.
        if job_destination.runner == DYNAMIC_RUNNER_NAME:
            return self.__determine_job_destination(params, raw_job_destination=job_destination)
        return job_destination

    def __cache_job_destination(
        self, params: Union[dict, None], raw_job_destination: Union[JobDestination, None] = None
    ) -> JobDestination:
        """
        Determine the destination, store it on ``cached_job_destination`` and return it.

        Mapping exceptions propagate unchanged; any other exception is logged and
        replaced by a ``JobMappingException``.
        """
        try:
            self.cached_job_destination = self.__determine_job_destination(
                params, raw_job_destination=raw_job_destination
            )
        except (JobMappingConfigurationException, JobMappingException, JobNotReadyException):
            raise
        except Exception:
            # Other exceptions should not bubble up to the job wrapper since they can occur during the fail() method,
            # causing jobs to become permanently stuck in a non-terminal state.
            log.exception("Caught unhandled exception while attempting to cache job destination:")
            raise JobMappingException(ERROR_MESSAGE_RULE_EXCEPTION)
        return self.cached_job_destination

    def get_job_destination(self, params: Union[dict, None]) -> JobDestination:
        """
        cached_job_destination is a public property that is sometimes
        externally set to short-circuit the mapper, such as during resubmits.
        get_job_destination will respect that and not run the mapper if so.
        """
        if not hasattr(self, "cached_job_destination"):
            return self.__cache_job_destination(params)
        return self.cached_job_destination

    def cache_job_destination(self, raw_job_destination: Union[JobDestination, None]) -> JobDestination:
        """
        Force update of cached_job_destination to mapper determined job
        destination, overwriting any externally set cached_job_destination
        """
        return self.__cache_job_destination(None, raw_job_destination=raw_job_destination)