# Source code for galaxy.jobs.mapper

import importlib
import logging
from inspect import getfullargspec
from types import ModuleType

import galaxy.jobs.rules
from galaxy.jobs import stock_rules
from galaxy.jobs.dynamic_tool_destination import map_tool_to_destination
from galaxy.util.submodules import import_submodules
from .rule_helper import RuleHelper

# Module-level logger, named after this module.
log = logging.getLogger(__name__)

# Runner name that marks a destination as dynamic: destinations whose
# ``runner`` equals this value are resolved through a rule function.
DYNAMIC_RUNNER_NAME = "dynamic"
# Destination id assigned to destinations converted from a legacy runner URL.
DYNAMIC_DESTINATION_ID = "dynamic_legacy_from_url"

# Error templates; the ``%s`` placeholders are filled with the destination or
# the configured function name before the message is raised.
ERROR_MESSAGE_NO_RULE_FUNCTION = "Galaxy misconfigured - cannot find dynamic rule function name for destination %s."
ERROR_MESSAGE_RULE_FUNCTION_NOT_FOUND = (
    "Galaxy misconfigured - no rule function named %s found in dynamic rule modules."
)
# Generic message used when an unexpected exception is converted into a
# JobMappingException while caching the job destination.
ERROR_MESSAGE_RULE_EXCEPTION = "Encountered an unhandled exception while caching job destination dynamic rule."


class JobMappingConfigurationException(Exception):
    """Raised when the dynamic destination configuration cannot be resolved.

    In this module it is raised when no rule function can be found for a
    destination, or when the destination names an unhandled dynamic ``type``.
    """
class JobMappingException(Exception):
    """Raised when a job cannot be mapped to a destination.

    The human-readable reason is available as ``failure_message``.
    """

    def __init__(self, failure_message):
        """Store the reason for the failure.

        :param failure_message: description of why mapping failed.
        """
        # Forward the message to Exception so str(exc) and tracebacks show it
        # even when the argument is passed by keyword.
        super().__init__(failure_message)
        self.failure_message = failure_message
class JobNotReadyException(Exception):
    """Exception carrying an optional job state and message.

    NOTE(review): by its name this signals that a job is not ready to be
    mapped yet; how ``job_state`` and ``message`` are consumed is decided by
    callers outside this module.
    """

    def __init__(self, job_state=None, message=None):
        """Store the optional state and message on the exception.

        :param job_state: optional job state to hand to the caller.
        :param message: optional explanatory message.
        """
        self.job_state = job_state
        self.message = message
# Built-in rule functions, keyed by the value a dynamic destination may give
# in its ``type`` param instead of the default ``python``.
STOCK_RULES = {
    "choose_one": stock_rules.choose_one,
    "burst": stock_rules.burst,
    "docker_dispatch": stock_rules.docker_dispatch,
    "dtd": map_tool_to_destination,
}
class JobRunnerMapper:
    """
    This class is responsible for managing the mapping of jobs
    (in the form of job_wrappers) to job runner url strings.
    """

    rules_module: ModuleType

    def __init__(self, job_wrapper, url_to_destination, job_config):
        """Bind the mapper to a job wrapper and select the rules package.

        :param job_wrapper: wrapper of the job being mapped.
        :param url_to_destination: callable converting a legacy runner URL
            into a destination.
        :param job_config: job configuration; when its ``dynamic_params`` is
            not ``None`` the ``rules_module`` entry names the package that
            replaces the default ``galaxy.jobs.rules``.
        """
        self.job_wrapper = job_wrapper
        self.url_to_destination = url_to_destination
        self.job_config = job_config

        self.rules_module = galaxy.jobs.rules
        if job_config.dynamic_params is not None:
            module_name = job_config.dynamic_params["rules_module"]
            self.rules_module = importlib.import_module(module_name)

    def __invoke_expand_function(self, expand_function, destination, resource_params_from_job_state=True):
        """Call ``expand_function`` with exactly the arguments its signature names.

        For every argument name, the value is taken from (in order of
        precedence) the destination's params, the values available from the
        job wrapper, and finally values that require loading the job.

        :param expand_function: rule function to invoke.
        :param destination: dynamic destination being expanded; also passed
            to the rule as ``referrer``.
        :param resource_params_from_job_state: when false, ``resource_params``
            is passed as an empty dict instead of being read from the job.
        :returns: whatever the rule function returns.
        """
        function_arg_names = getfullargspec(expand_function).args
        app = self.job_wrapper.app
        possible_args = {
            "job_id": self.job_wrapper.job_id,
            "tool": self.job_wrapper.tool,
            "tool_id": self.job_wrapper.tool.id,
            "job_wrapper": self.job_wrapper,
            "rule_helper": RuleHelper(app),
            "app": app,
            "referrer": destination,
        }

        # Don't hit the DB to load the job object if not needed
        db_backed_args = (
            "job",
            "user",
            "user_email",
            "resource_params",
            "workflow_invocation_uuid",
            "workflow_resource_params",
        )
        require_db = any(name in function_arg_names for name in db_backed_args)

        job = None
        db_param_mapping = {}
        if require_db:
            job = self.job_wrapper.get_job()
            user = job.user
            db_param_mapping = {
                "job": job,
                "user": user,
                "user_email": user and str(user.email),
            }

        actual_args = {}
        for arg in function_arg_names:
            # Send through any job config defined args to function
            if arg in destination.params:
                actual_args[arg] = destination.params[arg]
            # Populate needed args
            elif arg in possible_args:
                actual_args[arg] = possible_args[arg]
            elif arg in db_param_mapping:
                actual_args[arg] = db_param_mapping[arg]
            # The remaining names are all DB-backed, so ``job`` is loaded here.
            elif arg == "resource_params":
                actual_args["resource_params"] = (
                    self.job_wrapper.get_resource_parameters(job) if resource_params_from_job_state else {}
                )
            elif arg == "workflow_invocation_uuid":
                param_values = job.raw_param_dict()
                actual_args["workflow_invocation_uuid"] = param_values.get("__workflow_invocation_uuid__", None)
            elif arg == "workflow_resource_params":
                param_values = job.raw_param_dict()
                actual_args["workflow_resource_params"] = param_values.get("__workflow_resource_params__", None)

        return expand_function(**actual_args)

    def __job_params(self, job):
        """Return the job's parameter values, ignoring evaluation errors."""
        app = self.job_wrapper.app
        param_values = job.get_param_values(app, ignore_errors=True)
        return param_values

    def __convert_url_to_destination(self, url):
        """
        Job runner URLs are deprecated, but dynamic mapper functions may still
        be returning them.  Runners are expected to be able to convert these to
        destinations.

        This method calls
        JobHandlerQueue.DefaultJobDispatcher.url_to_destination, which in turn
        calls the url_to_destination method for the appropriate runner.
        """
        dest = self.url_to_destination(url)
        dest["id"] = DYNAMIC_DESTINATION_ID
        return dest

    def __find_function_by_tool_id(self, rule_modules):
        """Return the first rule function named after one of the tool's ids, or ``None``."""
        # default look for function with name matching an id of tool, unless one specified
        for tool_id in self.job_wrapper.tool.all_ids:
            matching_func = self.__last_matching_function_in_modules(rule_modules, tool_id)
            if matching_func:
                return matching_func
        return None

    def __get_expand_function(self, destination):
        """
        Returns the function that matches the rule. If a rules_module override
        is specified, search within that rules_module, or default to the
        plugin's top level rules_module.

        :raises JobMappingConfigurationException: when the configured function
            name is not found, or no function matches any of the tool's ids.
        """
        rules_module_name = destination.params.get("rules_module")
        rule_modules = self.__get_rule_modules_or_defaults(rules_module_name)
        expand_function = None
        if expand_function_name := destination.params.get("function"):
            expand_function = self.__last_matching_function_in_modules(rule_modules, expand_function_name)
            if not expand_function:
                message = ERROR_MESSAGE_RULE_FUNCTION_NOT_FOUND % expand_function_name
                raise JobMappingConfigurationException(message)
        else:
            expand_function = self.__find_function_by_tool_id(rule_modules)
            if not expand_function:
                message = ERROR_MESSAGE_NO_RULE_FUNCTION % destination
                raise JobMappingConfigurationException(message)
        return expand_function

    def __get_rule_modules_or_defaults(self, rules_module_name):
        """
        Returns the rules under the given rules_module_name or default to
        returning the rules of the top-level rules module for the plugin
        """
        if rules_module_name:
            rules_module = importlib.import_module(rules_module_name)
        else:
            rules_module = self.rules_module
        return import_submodules(rules_module, ordered=True)

    def __last_matching_function_in_modules(self, rule_modules, function_name):
        """Return ``function_name`` from the first module defining it, or ``None``."""
        # self.rule_modules is sorted in reverse order, so find first
        # with function
        for rule_module in rule_modules:
            if hasattr(rule_module, function_name):
                return getattr(rule_module, function_name)
        return None

    def __handle_dynamic_job_destination(self, destination):
        """Pick the rule function for a dynamic destination and run it.

        The destination's ``type`` param (default ``python``) selects either a
        function looked up in the rule modules or one of ``STOCK_RULES``.

        :raises JobMappingConfigurationException: for an unknown ``type``.
        """
        expand_type = destination.params.get("type", "python")
        expand_function = None
        if expand_type == "python":
            expand_function = self.__get_expand_function(destination)
        elif expand_type in STOCK_RULES:
            expand_function = STOCK_RULES[expand_type]
        else:
            raise JobMappingConfigurationException(f"Unhandled dynamic job runner type specified - {expand_type}")

        return self.__handle_rule(expand_function, destination)

    def __handle_rule(self, rule_function, destination):
        """Run a rule function and normalise its result to a destination.

        If the rule raises, it is run a second time without resource params
        read from the job state; when that also fails the original exception
        is raised.  A result that is not a ``JobDestination`` is treated as a
        runner URL (if it contains ``://``) or as a destination id.
        """
        try:
            job_destination = self.__invoke_expand_function(rule_function, destination)
        except Exception as e:
            # Rules have varying quality and don't raise a consistent set of standard exceptions.
            # so ... if we get an error here let's try again without resource params encoded
            # in the job state. They're evil anyway.
            try:
                job_destination = self.__invoke_expand_function(
                    rule_function, destination, resource_params_from_job_state=False
                )
            except Exception:
                # raise original exception, dropping resource param from job state didn't help.
                raise e
            else:
                log.warning(
                    "Ignored user-specified invalid resource parameter request because it failed with %s", e
                )
        if not isinstance(job_destination, galaxy.jobs.JobDestination):
            job_destination_rep = str(job_destination)  # Should be either id or url
            if "://" in job_destination_rep:
                job_destination = self.__convert_url_to_destination(job_destination_rep)
            else:
                job_destination = self.job_config.get_destination(job_destination_rep)
        return job_destination

    def __determine_job_destination(self, params, raw_job_destination=None):
        """Resolve the final (non-dynamic) destination for the job.

        :param params: job parameters handed to the tool to choose the raw
            destination; may be ``None`` when ``raw_job_destination`` is given.
        :param raw_job_destination: destination to start from instead of
            asking the tool.
        :returns: the resolved destination.
        :raises JobMappingException: when the job's tool is unavailable.
        :raises AssertionError: when a rule asserts and there is no
            ``job_resource`` entry left in ``params`` to drop.
        """
        if self.job_wrapper.tool is None:
            raise JobMappingException(
                f"Can't map job to destination, tool '{self.job_wrapper.get_job().tool_id}' is unavailable"
            )
        if raw_job_destination is None:
            raw_job_destination = self.job_wrapper.tool.get_job_destination(params)

        if raw_job_destination.runner != DYNAMIC_RUNNER_NAME:
            job_destination = raw_job_destination
            log.debug("(%s) Mapped job to destination id: %s", self.job_wrapper.job_id, job_destination.id)
            return job_destination

        try:
            job_destination = self.__handle_dynamic_job_destination(raw_job_destination)
        except AssertionError:
            if not (params and "job_resource" in params):
                # Nothing left to drop: surface the real error instead of
                # falling through with no destination bound.
                raise
            # Retry once from the tool's destination without the job resource
            # parameters; work on a copy so the caller's dict is untouched.
            params = params.copy()
            del params["job_resource"]
            return self.__determine_job_destination(params)

        log.debug("(%s) Mapped job to destination id: %s", self.job_wrapper.job_id, job_destination.id)
        # Recursively handle chained dynamic destinations
        if job_destination.runner == DYNAMIC_RUNNER_NAME:
            return self.__determine_job_destination(params, raw_job_destination=job_destination)
        return job_destination

    def __cache_job_destination(self, params, raw_job_destination=None):
        """Determine the destination and store it in ``cached_job_destination``.

        Mapping exceptions defined in this module propagate unchanged; any
        other exception is logged and re-raised as ``JobMappingException``.
        """
        try:
            self.cached_job_destination = self.__determine_job_destination(
                params, raw_job_destination=raw_job_destination
            )
        except (JobMappingConfigurationException, JobMappingException, JobNotReadyException):
            raise
        except Exception as exc:
            # Other exceptions should not bubble up to the job wrapper since they can occur during the fail() method,
            # causing jobs to become permanently stuck in a non-terminal state.
            log.exception("Caught unhandled exception while attempting to cache job destination:")
            raise JobMappingException(ERROR_MESSAGE_RULE_EXCEPTION) from exc
        return self.cached_job_destination

    def get_job_destination(self, params):
        """
        cached_job_destination is a public property that is sometimes
        externally set to short-circuit the mapper, such as during resubmits.
        get_job_destination will respect that and not run the mapper if so.
        """
        if not hasattr(self, "cached_job_destination"):
            return self.__cache_job_destination(params)
        return self.cached_job_destination

    def cache_job_destination(self, raw_job_destination):
        """
        Force update of cached_job_destination to mapper determined job
        destination, overwriting any externally set cached_job_destination
        """
        return self.__cache_job_destination(None, raw_job_destination=raw_job_destination)