Source code for galaxy.jobs.dynamic_tool_destination

import argparse
import collections
import copy
import json
import logging
import math
import os
import re
import sys
from functools import reduce
from typing import Set

import numpy as np
import yaml

from galaxy.util import parse_xml

__version__ = "1.1.0"

# log to galaxy's logger
log = logging.getLogger(__name__)

# does a lot more logging when set to true
verbose = True

"""
list of all valid priorities, inferred from the global
default_desinations section of the config
"""
priority_list: Set[str] = set()

"""
Instantiated to a list of all valid destinations in the job configuration file
if run directly to validate configs. Otherwise, remains None. We often check
to see if app is None, because if it is then we'll try using the
destination_list instead.
-"""
destination_list: Set[str] = set()

"""
The largest the edit distance can be for a word to be considered
A correction for another word.
"""
max_edit_dist = 2

"""
List of valid categories that can be expected in the configuration.
"""
valid_categories = ["verbose", "tools", "default_destination", "users", "default_priority"]

# --- destination validation error messages --- #
dest_err_default_dest = "Default destination '%s' does not appear in the job configuration."  # destination
dest_err_tool_default_dest = (
    "Default destination for '%s': '%s' does not appear in the job configuration."  # tool, destination
)
dest_err_tool_rule_dest = (
    "Destination for '%s', rule %s: '%s' does not exist in job configuration."  # tool, counter, destination
)


[docs]class MalformedYMLException(Exception): pass
[docs]class ScannerError(Exception): pass
[docs]def get_keys_from_dict(dl, keys_list): """ This function builds a list using the keys from nest dictionaries """ if isinstance(dl, dict): keys_list.extend(dl.keys()) for x in dl.values(): get_keys_from_dict(x, keys_list) elif isinstance(dl, list): for x in dl: get_keys_from_dict(x, keys_list)
[docs]class RuleValidator: """ This class is the primary facility for validating configs. It's always called in map_tool_to_destination and it's called for validating config directly through DynamicToolDestination.py """
[docs] @classmethod def validate_rule(cls, rule_type: str, app, return_bool: bool = False, *args, **kwargs): """ This function is responsible for passing each rule to its relevant function. :param rule_type: the current rule's type :param return_bool: True when we are only interested in the result of the validation, and not the validated rule itself. :rtype: bool, dict (depending on return_bool) :returns: validated rule or result of validation (depending on return_bool) """ if rule_type == "file_size": return cls.__validate_file_size_rule(app, return_bool, *args, **kwargs) elif rule_type == "num_input_datasets": return cls.__validate_num_input_datasets_rule(app, return_bool, *args, **kwargs) elif rule_type == "records": return cls.__validate_records_rule(app, return_bool, *args, **kwargs) elif rule_type == "arguments": return cls.__validate_arguments_rule(app, return_bool, *args, **kwargs)
@classmethod def __validate_file_size_rule(cls, app, return_bool, original_rule, counter, tool): """ This function is responsible for validating 'file_size' rules. @type return_bool: bool @param return_bool: True when we are only interested in the result of the validation, and not the validated rule itself. @type original_rule: dict @param original_rule: contains the original received rule @type counter: int @param counter: this counter is used to identify what rule # is currently being validated. Necessary for log output. @type tool: str @param tool: the name of the current tool. Necessary for log output. @rtype: bool, dict (depending on return_bool) @return: validated rule or result of validation (depending on return_bool) """ rule = copy.deepcopy(original_rule) valid_rule = True # Users Verification # if rule is not None: valid_rule, rule = cls.__validate_users(valid_rule, return_bool, rule, tool, counter) # Nice_value Verification # if rule is not None: valid_rule, rule = cls.__validate_nice_value(valid_rule, return_bool, rule, tool, counter) # Destination Verification # if rule is not None: valid_rule, rule = cls.__validate_destination(valid_rule, app, return_bool, rule, tool, counter) # Bounds Verification # if rule is not None: valid_rule, rule = cls.__validate_bounds(valid_rule, return_bool, rule, tool, counter) if return_bool: return valid_rule else: return rule @classmethod def __validate_num_input_datasets_rule(cls, app, return_bool, original_rule, counter, tool): """ This function is responsible for validating 'num_input_datasets' rules. @type return_bool: bool @param return_bool: True when we are only interested in the result of the validation, and not the validated rule itself. @type original_rule: dict @param original_rule: contains the original received rule @type counter: int @param counter: this counter is used to identify what rule # is currently being validated. Necessary for log output. @type tool: str @param tool: the name of the current tool. Necessary for log output. @rtype: bool, dict (depending on return_bool) @return: validated rule or result of validation (depending on return_bool) """ rule = copy.deepcopy(original_rule) valid_rule = True # Users Verification # if rule is not None: valid_rule, rule = cls.__validate_users(valid_rule, return_bool, rule, tool, counter) # Nice_value Verification # if rule is not None: valid_rule, rule = cls.__validate_nice_value(valid_rule, return_bool, rule, tool, counter) # Destination Verification # if rule is not None: valid_rule, rule = cls.__validate_destination(valid_rule, app, return_bool, rule, tool, counter) # Bounds Verification # if rule is not None: valid_rule, rule = cls.__validate_bounds(valid_rule, return_bool, rule, tool, counter) if return_bool: return valid_rule else: return rule @classmethod def __validate_records_rule(cls, app, return_bool, original_rule, counter, tool): """ This function is responsible for validating 'records' rules. @type return_bool: bool @param return_bool: True when we are only interested in the result of the validation, and not the validated rule itself. @type original_rule: dict @param original_rule: contains the original received rule @type counter: int @param counter: this counter is used to identify what rule # is currently being validated. Necessary for log output. @type tool: str @param tool: the name of the current tool. Necessary for log output. @rtype: bool, dict (depending on return_bool) @return: validated rule or result of validation (depending on return_bool) """ rule = copy.deepcopy(original_rule) valid_rule = True # Users Verification # if rule is not None: valid_rule, rule = cls.__validate_users(valid_rule, return_bool, rule, tool, counter) # Nice_value Verification # if rule is not None: valid_rule, rule = cls.__validate_nice_value(valid_rule, return_bool, rule, tool, counter) # Destination Verification # if rule is not None: valid_rule, rule = cls.__validate_destination(valid_rule, app, return_bool, rule, tool, counter) # Bounds Verification # if rule is not None: valid_rule, rule = cls.__validate_bounds(valid_rule, return_bool, rule, tool, counter) if return_bool: return valid_rule else: return rule @classmethod def __validate_arguments_rule(cls, app, return_bool, original_rule, counter, tool): """ This is responsible for validating 'arguments' rules. @type return_bool: bool @param return_bool: True when we are only interested in the result of the validation, and not the validated rule itself. @type original_rule: dict @param original_rule: contains the original received rule @type counter: int @param counter: this counter is used to identify what rule # is currently being validated. Necessary for log output. @type tool: str @param tool: the name of the current tool. Necessary for log output. @rtype: bool, dict (depending on return_bool) @return: validated rule or result of validation (depending on return_bool) """ rule = copy.deepcopy(original_rule) valid_rule = True # Users Verification # if rule is not None: valid_rule, rule = cls.__validate_users(valid_rule, return_bool, rule, tool, counter) # Nice_value Verification # if rule is not None: valid_rule, rule = cls.__validate_nice_value(valid_rule, return_bool, rule, tool, counter) # Destination Verification # if rule is not None: valid_rule, rule = cls.__validate_destination(valid_rule, app, return_bool, rule, tool, counter) # Arguments Verification (for rule_type arguments; read comment block at top # of function for clarification. if rule is not None: valid_rule, rule = cls.__validate_arguments(valid_rule, return_bool, rule, tool, counter) if return_bool: return valid_rule else: return rule @classmethod def __validate_nice_value(cls, valid_rule, return_bool, rule, tool, counter): """ This function is responsible for validating nice_value. @type valid_rule: bool @param valid_rule: returns True if everything is valid. False if it encounters any abnormalities in the config. @type return_bool: bool @param return_bool: True when we are only interested in the result of the validation, and not the validated rule itself. @type rule: dict @param rule: contains the original received rule @type tool: str @param tool: the name of the current tool. Necessary for log output. @type counter: int @param counter: this counter is used to identify what rule # is currently being validated. Necessary for log output. @rtype: bool, dict (tuple) @return: validated rule and result of validation """ if "nice_value" in rule: if rule["nice_value"] < -20 or rule["nice_value"] > 20: error = f"nice_value goes from -20 to 20; rule {str(counter)}" error += f" in '{str(tool)}' has a nice_value of '" error += f"{str(rule['nice_value'])}'." if not return_bool: error += " Setting nice_value to 0." rule["nice_value"] = 0 if verbose: log.debug(error) valid_rule = False else: error = f"No nice_value found for rule {str(counter)} in '" error += f"{str(tool)}'." if not return_bool: error += " Setting nice_value to 0." rule["nice_value"] = 0 if verbose: log.debug(error) valid_rule = False return valid_rule, rule @classmethod def __validate_destination(cls, valid_rule: bool, app, return_bool: bool, rule: dict, tool: str, counter: int): """ This function is responsible for validating destination. :param valid_rule: returns True if everything is valid. False if it encounters any abnormalities in the config. :param return_bool: True when we are only interested in the result of the validation, and not the validated rule itself. :param rule: contains the original received rule :param tool: the name of the current tool. Necessary for log output. :param counter: this counter is used to identify what rule # is currently being validated. Necessary for log output. :rtype: bool, dict (tuple) :returns: validated rule and result of validation """ if "fail_message" in rule: if "destination" not in rule or rule["destination"] != "fail": error = f"Found a fail_message for rule {str(counter)}" error += f" in '{str(tool)}', but destination is not 'fail'!" if not return_bool: error += " Setting destination to 'fail'." if verbose: log.debug(error) valid_rule = False rule["destination"] = "fail" if "destination" in rule: suggestion = None if isinstance(rule["destination"], str): if rule["destination"] == "fail" and "fail_message" not in rule: error = f"Missing a fail_message for rule {str(counter)}" error += f" in '{str(tool)}'." if not return_bool: error += " Adding generic fail_message." message = f"Invalid parameters for rule {str(counter)}" message += f" in '{str(tool)}'." rule["fail_message"] = message if verbose: log.debug(error) valid_rule = False else: is_valid = validate_destination( app, rule["destination"], dest_err_tool_rule_dest, (tool, counter, rule["destination"]), return_bool, ) if not is_valid: valid_rule = False elif isinstance(rule["destination"], dict): if "priority" in rule["destination"] and isinstance(rule["destination"]["priority"], dict): for priority in rule["destination"]["priority"]: if priority not in priority_list: error = "Invalid priority '" error += f"{str(priority)}' for rule " error += f"{str(counter)} in '{str(tool)}'." suggestion = get_typo_correction(priority, priority_list, max_edit_dist) if suggestion: error += f" Did you mean '{str(suggestion)}'?" if not return_bool: error += " Ignoring..." if verbose: log.debug(error) valid_rule = False elif not isinstance(rule["destination"]["priority"][priority], str): error = "Cannot parse tool destination '" error += str(rule["destination"]["priority"][priority]) error += f"' for rule {str(counter)}" error += f" in '{str(tool)}'." if not return_bool: error += " Ignoring..." if verbose: log.debug(error) valid_rule = False else: is_valid = validate_destination( app, rule["destination"]["priority"][priority], dest_err_tool_rule_dest, (tool, counter, rule["destination"]["priority"][priority]), return_bool, ) if not is_valid: valid_rule = False else: error = f"No destination specified for rule {str(counter)}" error += f" in '{str(tool)}'." if not return_bool: error += " Ignoring..." if verbose: log.debug(error) valid_rule = False else: error = f"No destination specified for rule {str(counter)}" error += f" in '{str(tool)}'." if not return_bool: error += " Ignoring..." if verbose: log.debug(error) valid_rule = False else: error = f"No destination specified for rule {str(counter)}" error += f" in '{str(tool)}'." if not return_bool: error += " Ignoring..." if verbose: log.debug(error) valid_rule = False return valid_rule, rule @classmethod def __validate_bounds(cls, valid_rule, return_bool, rule, tool, counter): """ This function is responsible for validating bounds. @type valid_rule: bool @param valid_rule: returns True if everything is valid. False if it encounters any abnormalities in the config. @type return_bool: bool @param return_bool: True when we are only interested in the result of the validation, and not the validated rule itself. @type rule: dict @param rule: contains the original received rule @type tool: str @param tool: the name of the current tool. Necessary for log output. @type counter: int @param counter: this counter is used to identify what rule # is currently being validated. Necessary for log output. @rtype: bool/None, dict (tuple) @return: validated rule (or None if invalid) and result of validation """ if "upper_bound" in rule and "lower_bound" in rule: if rule["rule_type"] in ("file_size", "records"): upper_bound = str_to_bytes(rule["upper_bound"]) lower_bound = str_to_bytes(rule["lower_bound"]) else: upper_bound = rule["upper_bound"] lower_bound = rule["lower_bound"] if lower_bound == "Infinity": error = "Error: lower_bound is set to Infinity, but must be " error += "lower than upper_bound!" if not return_bool: error += " Setting lower_bound to 0!" lower_bound = 0 rule["lower_bound"] = 0 else: lower_bound = math.inf if verbose: log.debug(error) valid_rule = False if upper_bound == "Infinity": upper_bound = -1 if upper_bound != -1 and lower_bound > upper_bound: error = f"lower_bound exceeds upper_bound for rule {str(counter)}" error += f" in '{str(tool)}'." if not return_bool: error += " Reversing bounds." temp_upper_bound = rule["upper_bound"] temp_lower_bound = rule["lower_bound"] rule["upper_bound"] = temp_lower_bound rule["lower_bound"] = temp_upper_bound if verbose: log.debug(error) valid_rule = False else: error = f"Missing bounds for rule {str(counter)}" error += f" in '{str(tool)}'." if not return_bool: error += " Ignoring rule." rule = None if verbose: log.debug(error) valid_rule = False return valid_rule, rule @classmethod def __validate_arguments(cls, valid_rule, return_bool, rule, tool, counter): """ This function is responsible for validating arguments. @type valid_rule: bool @param valid_rule: returns True if everything is valid. False if it encounters any abnormalities in the config. @type return_bool: bool @param return_bool: True when we are only interested in the result of the validation, and not the validated rule itself. @type rule: dict @param rule: contains the original received rule @type tool: str @param tool: the name of the current tool. Necessary for log output. @type counter: int @param counter: this counter is used to identify what rule # is currently being validated. Necessary for log output. @rtype: bool/None, dict (tuple) @return: validated rule (or None if invalid) and result of validation """ if "arguments" not in rule or not isinstance(rule["arguments"], dict): error = f"No arguments found for rule {str(counter)} in '" error += f"{str(tool)}' despite being of type arguments." if not return_bool: error += " Ignoring rule." rule = None if verbose: log.debug(error) valid_rule = False return valid_rule, rule @classmethod def __validate_users(cls, valid_rule, return_bool, rule, tool, counter): """ This function is responsible for validating users (if present). @type return_bool: bool @param return_bool: True when we are only interested in the result of the validation, and not the validated rule itself. @type valid_rule: bool @param valid_rule: returns True if everything is valid. False if it encounters any abnormalities in the config. @type rule: dict @param rule: contains the original received rule @type counter: int @param counter: this counter is used to identify what rule # is currently being validated. Necessary for log output. @type tool: str @param tool: the name of the current tool. Necessary for log output. @rtype: bool, dict (tuple) @return: validated rule and result of validation """ emailregex = r"^[A-Za-z0-9\.\+_-]+@[A-Za-z0-9\._-]+\.[a-zA-Z]*$" if "users" in rule: if isinstance(rule["users"], list): for user in reversed(rule["users"]): if not isinstance(user, str): error = f"Entry '{str(user)}' in users for rule " error += f"{str(counter)} in tool '{str(tool)}" error += "' is in an invalid format!" if not return_bool: error += " Ignoring entry." if verbose: log.debug(error) valid_rule = False rule["users"].remove(user) else: if re.match(emailregex, user) is None: error = f"Supplied email '{str(user)}" error += f"' for rule {str(counter)} in tool '" error += f"{str(tool)}' is in an invalid format!" if not return_bool: error += " Ignoring email." if verbose: log.debug(error) valid_rule = False rule["users"].remove(user) else: error = "Couldn't find a list under 'users:'!" if not return_bool: error += " Ignoring rule." rule = None if verbose: log.debug(error) valid_rule = False # post-processing checking to make sure we didn't just remove all the users # if we did, we should ignore the rule if rule is not None and rule["users"] is not None and len(rule["users"]) == 0: error = f"No valid user emails were specified for rule {str(counter)}" error += f" in tool '{str(tool)}'!" if not return_bool: error += " Ignoring rule." rule = None if verbose: log.debug(error) valid_rule = False return valid_rule, rule
[docs]def parse_yaml( path: str = "/config/tool_destinations.yml", job_conf_path: str = "/config/job_conf.xml", app=None, test: bool = False, return_bool: bool = False, ): """ Get a yaml file from path and send it to validate_config for validation. :param path: the path to the tool destinations config file :param job_conf_path: the path to the job config file :param test: indicates whether to run in test mode or production mode :type return_bool: bool :param return_bool: True when we are only interested in the result of the validation, and not the validated rule itself. :rtype: bool, dict (depending on return_bool) :returns: validated rule or result of validation (depending on return_bool) """ if app is None: global destination_list destination_list = get_destination_list_from_job_config(job_conf_path) # Import file from path try: if test: config = yaml.safe_load(path) else: if path == "/config/tool_destinations.yml": # os.path.realpath gets the path of DynamicToolDestination.py # and then os.path.join is used to go back four directories config_directory = os.path.join( os.path.dirname(os.path.realpath(__file__)), os.pardir, os.pardir, os.pardir, os.pardir ) opt_file = config_directory + path else: opt_file = path with open(opt_file) as stream: config = yaml.safe_load(stream) # Test imported file try: if return_bool: valid_config = validate_config(config, app, return_bool) else: config = validate_config(config, app) except MalformedYMLException: if verbose: log.exception("Failed to parse YAML for dynamic job destination") raise except ScannerError: if verbose: log.error("Config is too malformed to fix!") raise if return_bool: return valid_config else: return config
[docs]def validate_destination(app, destination: str, err_message: str, err_message_contents, return_bool: bool = True): """ Validate received destination id. :param app: Current app :param destination: string containing the destination id that is being validated :param err_message: Error message to be formatted with the contents of `err_message_contents` upon the event of invalid destination :type err_message_contents: tuple :param err_message_contents: A tuple of strings to be placed in ``err_message`` :param return_bool: Whether or not the calling function has been told to return a boolean value or not. Determines whether or not to print 'Ignoring...' after error messages. :rtype: bool :returns: True if the destination is valid and False otherwise. """ valid_destination = False suggestion = None if ( destination == "fail" and err_message is dest_err_tool_rule_dest ): # It's a tool rule that is set to fail. It's valid valid_destination = True elif app is None: if destination in destination_list: valid_destination = True else: suggestion = get_typo_correction(destination, destination_list, max_edit_dist) elif app.job_config.get_destination(destination): valid_destination = True if not valid_destination: error = err_message % err_message_contents if suggestion: error += f" Did you mean '{suggestion}'?" if not return_bool: error += " Ignoring..." if verbose: log.debug(error) return valid_destination
[docs]def validate_config(obj: dict, app=None, return_bool: bool = False): """ Validate received config. :param obj: the entire contents of the config :param return_bool: True when we are only interested in the result of the validation, and not the validated rule itself. :rtype: bool, dict (depending on return_bool) :returns: validated rule or result of validation (depending on return_bool) """ global priority_list priority_list = set() def infinite_defaultdict(): return collections.defaultdict(infinite_defaultdict) # Allow new_config to expand automatically when adding values to new levels new_config = infinite_defaultdict() global verbose verbose = False valid_config = True valid_rule = True tool_has_default = False if return_bool: verbose = True elif obj is not None and "verbose" in obj and isinstance(obj["verbose"], bool): verbose = obj["verbose"] else: valid_config = False if obj: log.debug(f"Verbose value '{str(obj['verbose'])}' is not True or False! Falling back to verbose...") verbose = True if not return_bool and verbose: log.debug("Running config validation...") # if this is false, then it's definitely because of verbose missing if not valid_config and return_bool: log.debug("Missing mandatory field 'verbose' in config!") # a list with the available rule_types. Can be expanded on easily in the future available_rule_types = ["file_size", "num_input_datasets", "records", "arguments"] if obj is not None: # in obj, there should always be only 5 categories: tools, default_destination, # default_priority, users, and verbose if "default_destination" in obj: suggestion = None if isinstance(obj["default_destination"], str): is_valid = validate_destination( app, obj["default_destination"], dest_err_default_dest, (obj["default_destination"]) ) if is_valid: new_config["default_destination"] = obj["default_destination"] else: valid_config = False elif isinstance(obj["default_destination"], dict): if "priority" in obj["default_destination"] and isinstance( obj["default_destination"]["priority"], dict ): for priority in obj["default_destination"]["priority"]: if isinstance(obj["default_destination"]["priority"][priority], str): priority_list.add(priority) is_valid = validate_destination( app, obj["default_destination"]["priority"][priority], dest_err_default_dest, (obj["default_destination"]["priority"][priority]), ) if is_valid: new_config["default_destination"]["priority"][priority] = obj["default_destination"][ "priority" ][priority] else: valid_config = False if len(priority_list) < 1: error = "No valid priorities found!" if verbose: log.debug(error) valid_config = False else: if "default_priority" in obj: if isinstance(obj["default_priority"], str): if obj["default_priority"] in priority_list: new_config["default_priority"] = obj["default_priority"] else: error = ( "Default priority '" + str(obj["default_priority"]) + "' is not a valid priority." ) suggestion = get_typo_correction( obj["default_priority"], priority_list, max_edit_dist ) if suggestion: error += f" Did you mean '{str(suggestion)}'?" if verbose: log.debug(error) else: error = "default_priority in config is not valid." if verbose: log.debug(error) valid_config = False else: error = "No default_priority section found in config." if "med" in priority_list: # set 'med' as fallback default priority, so # old tool_destination.yml configs still work error += " Setting 'med' as default priority." new_config["default_priority"] = "med" else: error += " Things may not run as expected!" valid_config = False if verbose: log.debug(error) else: error = "No global default destinations specified in config!" if verbose: log.debug(error) valid_config = False else: error = "No global default destination specified in config!" if verbose: log.debug(error) valid_config = False else: error = "No global default destination specified in config!" if verbose: log.debug(error) valid_config = False if "users" in obj: if isinstance(obj["users"], dict): for user in obj["users"]: curr = obj["users"][user] if isinstance(curr, dict): if "priority" in curr and isinstance(curr["priority"], str): if curr["priority"] in priority_list: new_config["users"][user]["priority"] = curr["priority"] else: error = ( "User '" + user + "', priority '" + str(curr["priority"]) + "' is not defined " + "in the global default_destination section" ) suggestion = get_typo_correction(curr["priority"], priority_list, max_edit_dist) if suggestion: error += f" Did you mean '{str(suggestion)}'?" if verbose: log.debug(error) valid_config = False else: error = f"User '{user}' is missing a priority!" if verbose: log.debug(error) valid_config = False else: error = f"User '{user}' is missing a priority!" if verbose: log.debug(error) valid_config = False else: error = "Users option is not a dictionary!" if verbose: log.debug(error) valid_config = False if "tools" in obj: for tool in obj["tools"]: curr = obj["tools"][tool] # This check is to make sure we have a tool name, and not just # rules right way. if not isinstance(curr, list): curr_tool_rules = [] if curr is not None: # in each tool, there should always be only 2 sub-categories: # default_destination (not mandatory) and rules (mandatory) if "default_destination" in curr: suggestion = None if isinstance(curr["default_destination"], str): is_valid = validate_destination( app, curr["default_destination"], dest_err_tool_default_dest, (tool, curr["default_destination"]), ) if is_valid: new_config["tools"][tool]["default_destination"] = curr["default_destination"] tool_has_default = True else: valid_config = False elif isinstance(curr["default_destination"], dict): if "priority" in curr["default_destination"] and isinstance( curr["default_destination"]["priority"], dict ): for priority in curr["default_destination"]["priority"]: destination = curr["default_destination"]["priority"][priority] if priority in priority_list: if isinstance(destination, str): is_valid = validate_destination( app, destination, dest_err_tool_default_dest, (tool, curr["default_destination"]["priority"][priority]), ) if is_valid: new_config["tools"][tool]["default_destination"]["priority"][ priority ] = destination tool_has_default = True else: valid_config = False else: error = ( "No default '" + str(priority) + "' priority destination for tool " + str(tool) + " in config!" ) if verbose: log.debug(error) valid_config = False else: error = ( "Invalid default destination priority '" + str(priority) + "' for '" + str(tool) + "'." ) suggestion = get_typo_correction(priority, priority_list, max_edit_dist) if suggestion: error += f" Did you mean '{str(suggestion)}'?" if verbose: log.debug(error) valid_config = False else: error = "No default priority destinations specified" error += f" for {str(tool)} in config!" if verbose: log.debug(error) valid_config = False if "rules" in curr and isinstance(curr["rules"], list): # under rules, there should only be a list of rules curr_tool = curr counter = 0 for rule in curr_tool["rules"]: if "rule_type" in rule: if rule["rule_type"] in available_rule_types: validated_rule = None counter += 1 # if we're only interested in the result of # the validation, then only retrieve the # result if return_bool: valid_rule = RuleValidator.validate_rule( rule["rule_type"], app, return_bool, rule, counter, tool ) # otherwise, retrieve the processed rule else: validated_rule = RuleValidator.validate_rule( rule["rule_type"], app, return_bool, rule, counter, tool ) # if the result we get is False, then # indicate that the whole config is invalid if not valid_rule: valid_config = False # if we got a rule back that seems to be # valid (or was fixable) then append it to # list of ready-to-use tools if not return_bool and validated_rule is not None: curr_tool_rules.append(copy.deepcopy(validated_rule)) # if rule['rule_type'] in available_rule_types else: error = "Unrecognized rule_type '" error += f"{rule['rule_type']}' " error += f"found in '{str(tool)}'. " if not return_bool: error += "Ignoring..." if verbose: log.debug(error) valid_config = False # if "rule_type" in rule else: counter += 1 error = "No rule_type found for rule " error += str(counter) error += f" in '{str(tool)}'." if verbose: log.debug(error) valid_config = False # if "rules" in curr and isinstance(curr['rules'], list): elif not tool_has_default: valid_config = False error = f"Tool '{str(tool)}' does not have" error += " rules nor a default_destination!" if verbose: log.debug(error) # if obj['tools'][tool] is not None: else: valid_config = False error = f"Config section for tool '{str(tool)}' is blank!" if verbose: log.debug(error) if curr_tool_rules: new_config["tools"][str(tool)]["rules"] = curr_tool_rules # if not isinstance(curr, list) else: error = "Malformed YML; expected job name, " error += "but found a list instead!" if verbose: log.debug(error) valid_config = False # quickly run through categories to detect unrecognized types for category in obj.keys(): if category not in valid_categories: error = f"Unrecognized category '{category}" error += "' found in config file!" if verbose: log.debug(error) valid_config = False # if obj is not None else: if verbose: log.debug("No (or empty) config file supplied!") valid_config = False if not return_bool: if verbose: log.debug("Finished config validation.") if return_bool: return valid_config else: return new_config
[docs]def bytes_to_str(size, unit="YB"): """ Uses the bi convention: 1024 B = 1 KB since this method primarily has inputs of bytes for RAM @type size: int @param size: the size in int (bytes) to be converted to str @rtype: str @return return_str: the resulting string """ # converts size in bytes to most readable unit units = ["B", "KB", "MB", "GB", "TB", "PB", "EB", "ZB", "YB"] i = 0 # mostly called in order to convert "infinity" try: size_changer = int(size) except ValueError: error = "bytes_to_str passed uncastable non numeric value " raise ValueError(error + str(size)) try: upto = units.index(unit.strip().upper()) except ValueError: upto = 9 while size_changer >= 1024 and i < upto: size_changer = size_changer / 1024.0 i += 1 if size_changer == -1: size_changer = "Infinity" i = 0 try: return_str = f"{size_changer:.2f} {units[i]}" except (ValueError, TypeError): return_str = f"{size_changer}" return return_str
[docs]def str_to_bytes(size): """ Uses the bi convention: 1024 B = 1 KB since this method primarily has inputs of bytes for RAM @type size: str @param size: the size in str to be converted to int (bytes) @rtype: int @return curr_size: the resulting size converted from str """ units = ["", "b", "kb", "mb", "gb", "tb", "pb", "eb", "zb", "yb"] curr_size = size try: if size.lower() != "infinity": # Get the number try: curr_item = size.strip().split(" ") curr_size = "".join(curr_item) curr_size = int(curr_size) except ValueError: curr_item = size.strip().split(" ") curr_unit = curr_item[-1].strip().lower() curr_item = curr_item[0:-1] curr_size = "".join(curr_item) try: curr_size = float(curr_size) except ValueError: error = f"Unable to convert size {str(size)}" raise MalformedYMLException(error) # Get the unit and convert to bytes try: pos = units.index(curr_unit) for _ in range(pos, 1, -1): curr_size *= 1024 except ValueError: error = f"Unable to convert size {str(size)}" raise MalformedYMLException(error) except NameError: pass else: curr_size = -1 except AttributeError: # If size is not a string (doesn't have .lower()) pass return curr_size
[docs]def importer(test): """ Uses Mock galaxy for testing or real galaxy for production @type test: bool @param test: True when being run from a test """ global JobDestination global JobMappingException if test: class JobDestination: def __init__(self, *kwd): self.id = kwd.get("id") self.nativeSpec = kwd.get("params")["nativeSpecification"] self.runner = kwd.get("runner") from galaxy.jobs.mapper import JobMappingException else: from galaxy.jobs import JobDestination from galaxy.jobs.mapper import JobMappingException
[docs]def map_tool_to_destination(job, app, tool, user_email, test=False, path=None, job_conf_path=None): """ Dynamically allocate resources @param job: galaxy job @param app: current app @param tool: current tool @type test: bool @param test: True when running in test mode @type path: str @param path: path to tool_destinations.yml @type job_conf_path: str @param job_conf_path: path to job_conf.xml """ importer(test) # set verbose to True by default, just in case (some tests fail without # this due to how the tests apparently work) global verbose verbose = True filesize_rule_present = False num_input_datasets_rule_present = False records_rule_present = False # Get configuration from tool_destinations.yml and job_conf.xml if path is None: path = app.config.tool_destinations_config_file if job_conf_path is None: job_conf_path = app.config.job_config_file try: config = parse_yaml(path, job_conf_path, app) except MalformedYMLException as e: raise JobMappingException(e) # Get all inputs from tool and databases inp_data = {da.name: da.dataset for da in job.input_datasets} inp_data.update([(da.name, da.dataset) for da in job.input_library_datasets]) if config is not None and str(tool.old_id) in config["tools"]: if "rules" in config["tools"][str(tool.old_id)]: for rule in config["tools"][str(tool.old_id)]["rules"]: if rule["rule_type"] == "file_size": filesize_rule_present = True if rule["rule_type"] == "num_input_datasets": num_input_datasets_rule_present = True if rule["rule_type"] == "records": records_rule_present = True file_size = 0 records = 0 num_input_datasets = 0 if filesize_rule_present or records_rule_present or num_input_datasets_rule_present: # Loop through the database and look for amount of records try: for line in inp_db: if line[0] == ">": records += 1 except NameError: pass # Loops through each input file and adds the size to the total # or looks through db for records for da in inp_data: try: # If the input is a file, check and add the size if inp_data[da] is not None and os.path.isfile(inp_data[da].get_file_name()): num_input_datasets += 1 if verbose: message = f"Loading file: {str(da)}" message += str(inp_data[da].get_file_name()) log.debug(message) # Add to records if the file type is fasta if inp_data[da].ext == "fasta": if records_rule_present: inp_db = open(inp_data[da].get_file_name()) # Try to find automatically computed sequences metadata = inp_data[da].get_metadata() try: records += int(metadata.get("sequences")) except (TypeError, KeyError): for line in inp_db: if line[0] == ">": records += 1 if filesize_rule_present: query_file = str(inp_data[da].get_file_name()) file_size += os.path.getsize(query_file) except AttributeError: # Otherwise, say that input isn't a file if verbose: log.debug(f"Not a file: {str(inp_data[da])}") if verbose: if filesize_rule_present: log.debug(f"Total size: {bytes_to_str(file_size)}") if records_rule_present: log.debug(f"Total amount of records: {str(records)}") if num_input_datasets_rule_present: log.debug(f"Total number of files: {str(num_input_datasets)}") matched_rule = None user_authorized = None rule_counter = 0 # For each different rule for the tool that's running fail_message = None if fail_message is not None: destination = "fail" elif config is not None: # Get the default priority from the config if necessary. # If there isn't one, choose an arbitrary one as a fallback if "default_destination" in config: if isinstance(config["default_destination"], dict): if "default_priority" in config: default_priority = config["default_priority"] priority = default_priority else: if len(priority_list) > 0: default_priority = next(iter(priority_list)) priority = default_priority error = ( "No default priority found, arbitrarily setting '" + default_priority + "' as the default priority." + " Things may not work as expected!" ) if verbose: log.debug(error) # fetch priority information from workflow/job parameters job_parameter_list = job.get_parameters() workflow_params = None job_params = None if job_parameter_list is not None: for param in job_parameter_list: if param.name == "__workflow_resource_params__": workflow_params = param.value if param.name == "__job_resource": job_params = param.value # Priority coming from workflow invocation takes precedence over job specific priorities if workflow_params is not None: resource_params = json.loads(workflow_params) if "priority" in resource_params: # For by_group mapping, this priority has already been validated when the # request was created. if resource_params["priority"] is not None: priority = resource_params["priority"] elif job_params is not None: resource_params = json.loads(job_params) if "priority" in resource_params: if resource_params["priority"] is not None: priority = resource_params["priority"] # get the user's priority if "users" in config: if user_email in config["users"]: priority = config["users"][user_email]["priority"] if "default_destination" in config: if isinstance(config["default_destination"], str): destination = config["default_destination"] else: if priority in config["default_destination"]["priority"]: destination = config["default_destination"]["priority"][priority] elif default_priority in config["default_destination"]["priority"]: destination = config["default_destination"]["priority"][default_priority] config = config["tools"] if str(tool.old_id) in config: if "rules" in config[str(tool.old_id)]: for rule in config[str(tool.old_id)]["rules"]: rule_counter += 1 user_authorized = False if "users" in rule and isinstance(rule["users"], list): if user_email in rule["users"]: user_authorized = True else: user_authorized = True if user_authorized: matched = False if rule["rule_type"] == "file_size": # bounds comparisons upper_bound = str_to_bytes(rule["upper_bound"]) lower_bound = str_to_bytes(rule["lower_bound"]) if upper_bound == -1: if lower_bound <= file_size: matched = True else: if lower_bound <= file_size and file_size < upper_bound: matched = True elif rule["rule_type"] == "num_input_datasets": # bounds comparisons upper_bound = rule["upper_bound"] lower_bound = rule["lower_bound"] if upper_bound == "Infinity": if lower_bound <= num_input_datasets: matched = True else: if lower_bound <= num_input_datasets and num_input_datasets < upper_bound: matched = True elif rule["rule_type"] == "records": # bounds comparisons upper_bound = str_to_bytes(rule["upper_bound"]) lower_bound = str_to_bytes(rule["lower_bound"]) if upper_bound == -1: if lower_bound <= records: matched = True else: if lower_bound <= records and records < upper_bound: matched = True elif rule["rule_type"] == "arguments": options = job.get_param_values(app) matched = True # check if the args in the config file are available for arg in rule["arguments"]: arg_dict = {arg: rule["arguments"][arg]} arg_keys_list = [] get_keys_from_dict(arg_dict, arg_keys_list) try: options_value = reduce(dict.__getitem__, arg_keys_list, options) arg_value = reduce(dict.__getitem__, arg_keys_list, arg_dict) if arg_value != options_value: matched = False except KeyError: matched = False if verbose: error = f"Argument '{str(arg)}" error += "' not recognized!" log.debug(error) # if we matched a rule if matched: if matched_rule is None or rule["nice_value"] < matched_rule["nice_value"]: matched_rule = rule # if user_authorized else: if verbose: error = f"User email '{str(user_email)}' not " error += "specified in list of authorized users for " error += f"rule {str(rule_counter)} in tool '" error += f"{str(tool.old_id)}'! Ignoring rule." log.debug(error) # if str(tool.old_id) in config else: error = f"Tool '{str(tool.old_id)}' not specified in config. " error += "Using default destination." if verbose: log.debug(error) if matched_rule is None: if "default_destination" in config[str(tool.old_id)]: default_tool_destination = config[str(tool.old_id)]["default_destination"] if isinstance(default_tool_destination, str): destination = default_tool_destination else: if priority in default_tool_destination["priority"]: destination = default_tool_destination["priority"][priority] elif default_priority in default_tool_destination["priority"]: destination = default_tool_destination["priority"][default_priority] # else global default destination is used else: if isinstance(matched_rule["destination"], str): destination = matched_rule["destination"] else: if priority in matched_rule["destination"]["priority"]: destination = matched_rule["destination"]["priority"][priority] elif default_priority in matched_rule["destination"]["priority"]: destination = matched_rule["destination"]["priority"][default_priority] # else global default destination is used # if "default_destination" in config else: destination = "fail" fail_message = f"Job '{str(tool.old_id)}' failed; " fail_message += "no global default destination specified in config!" # if fail_message is not None # elif config is not None else: destination = "fail" fail_message = "No config file supplied!" if destination == "fail": if fail_message: raise JobMappingException(fail_message) else: raise JobMappingException(matched_rule["fail_message"]) if config is not None: if destination == "fail": output = f"An error occurred: {fail_message}" log.debug(output) else: output = f"Running '{str(tool.old_id)}' with '" output += f"{destination}'." log.debug(output) return destination
[docs]def get_destination_list_from_job_config(job_config_location) -> set: """ returns A list of all destination IDs declared in the job configuration :type job_config_location: str :param job_config_location: The location of the job config file relative to the galaxy root directory. If NoneType, defaults to galaxy/config/job_conf.xml, galaxy/config/job_conf.xml.sample_advanced, or galaxy/config/job_conf.xml.sample_basic (first one that exists) :return: A list of all of the destination IDs declared in the job configuration file. """ global destination_list # os.path.realpath gets the path of DynamicToolDestination.py # and then os.path.join is used to go back four directories config_location = os.path.join(os.path.dirname(os.path.realpath(__file__)), os.pardir, os.pardir, os.pardir) if job_config_location: local_path = re.compile("^/config/.+$") if local_path.match(job_config_location): job_config_location = os.path.join(config_location, job_config_location) else: # Pick one of the default ones message = "* No job config specified, " possible_job_conf_files = [ "config/job_conf.xml", "config/job_conf.xml.sample_advanced", "config/job_conf.xml.sample_basic", ] for f in possible_job_conf_files: possible_job_conf_path = os.path.join(config_location, f) if os.path.isfile(possible_job_conf_path): job_config_location = possible_job_conf_path message += f"using '{f}'. *" break else: message += "and no default job configs in 'config/'. " + "Expect lots of failures. *" if verbose: log.debug(message) if job_config_location: job_conf = parse_xml(job_config_location, strip_whitespace=False) # Add all destination IDs from the job configuration xml file for destination in job_conf.getroot().iter("destination"): destination_id = destination.get("id") if destination_id: destination_list.add(destination_id) else: error = f"Destination ID '{str(destination)}" error += "' in job configuration file cannot be" error += " parsed. Things may not work as expected!" log.warning(error) return destination_list
[docs]def get_edit_distance(source, target): """ returns the edit distance (levenshtein distance) between two strings. code from: en.wikibooks.org/wiki/Algorithm_Implementation/Strings/Levenshtein_distance @type str1: str @param str1: The first string @type str2: str @param str2: The second string @rtype: int @return: The edit distance between str1 and str2 """ if len(source) < len(target): return get_edit_distance(target, source) # So now we have len(source) >= len(target). if len(target) == 0: return len(source) # We call tuple() to force strings to be used as sequences # ('c', 'a', 't', 's') - numpy uses them as values by default. source = np.array(tuple(source)) target = np.array(tuple(target)) # We use a dynamic programming algorithm, but with the # added optimization that we only need the last two rows # of the matrix. previous_row = np.arange(target.size + 1) for s in source: # Insertion (target grows longer than source): current_row = previous_row + 1 # Substitution or matching: # Target and source items are aligned, and either # are different (cost of 1), or are the same (cost of 0). current_row[1:] = np.minimum(current_row[1:], np.add(previous_row[:-1], target != s)) # Deletion (target grows shorter than source): current_row[1:] = np.minimum(current_row[1:], current_row[0:-1] + 1) previous_row = current_row return previous_row[-1]
[docs]def get_typo_correction(typo_str, word_set, max_dist): """ returns the string in a set that closest matches the input string, as long as the edit distance between them is equal to or smaller than a value, or the words are the same when case is not considered. If there are no appropriate matches, nothing is returned instead. :type typo_str: str :param typo_str: The string to be compared :type word_set: set of str :param word_set: The set of strings to compare to :type max_dist: int :param max_dist: the largest allowed edit distance between the word and the result. If nothing is within this range, nothing is returned :rtype: str or NoneType :returns: The closest matching string, or None, if no strings being compared to are within max_dist edit distance. """ # Start curr_best out as the largest # edit distance we will tolerate plus one curr_best = max_dist + 1 suggestion = None for valid_word in word_set: # If we've already found a best match, # don't bother checking anything else. if curr_best > 0: if typo_str.lower() == valid_word.lower(): # if something matches when case insensitive, # it is automatically set as the best suggestion = valid_word curr_best = 0 else: edit_distance = get_edit_distance(typo_str, valid_word) if edit_distance < curr_best: suggestion = valid_word curr_best = edit_distance return suggestion
if __name__ == "__main__": """ This function is responsible for running the app if directly run through the commandline. It offers the ability to specify a config through the commandline for checking whether or not it is a valid config. It's to be run from within Galaxy, assuming it is installed correctly within the proper directories in Galaxy, and it looks for the config file in galaxy/config/. It can also be run with a path pointing to a config file if not being run directly from inside Galaxy install directory. """ verbose = True parser = argparse.ArgumentParser() logging.basicConfig(stream=sys.stdout, level=logging.DEBUG) parser.add_argument( "-c", "--check-config", dest="check_config", nargs="?", help=( "Use this option to validate tool_destinations.yml." " Optionally, provide the path to the tool_destinations.yml" " that you would like to check, and/or the path to the related" " job_conf.xml. Default: galaxy/config/tool_destinations.yml" "and galaxy/config/job_conf.xml" ), ) parser.add_argument("-j", "--job-config", dest="job_config") parser.add_argument("-V", "--version", action="version", version=f"%(prog)s {__version__}") args = parser.parse_args() # if run with no arguments, display the help message if len(sys.argv) == 1: parser.print_help() sys.exit(1) job_config_location = args.job_config if args.check_config: valid_config = parse_yaml(path=args.check_config, job_conf_path=job_config_location, return_bool=True) else: valid_config = parse_yaml( path="/config/tool_destinations.yml", job_conf_path=job_config_location, return_bool=True ) if valid_config: print("Configuration is valid!") else: print("Errors detected; config not valid!")