Source code for galaxy.workflow.resources

"""This package is something a placeholder for workflow resource parameters.

This file defines the baked in resource mapper types, and this package contains an
example of a more open, pluggable approach with greater control.
"""
import functools
import logging
import os
import sys
from copy import deepcopy

import yaml

import galaxy.util

log = logging.getLogger(__name__)


[docs]def get_resource_mapper_function(app): config = app.config mapper = getattr(config, "workflow_resource_params_mapper", None) if mapper is None: return _null_mapper_function elif ":" in mapper: raw_function = _import_resource_mapping_function(mapper) # Bind resource parameters here just to not re-parse over and over. workflow_resource_params = _read_defined_parameter_definitions(config) return functools.partial(raw_function, workflow_resource_params=workflow_resource_params) else: workflow_resource_params = _read_defined_parameter_definitions(config) with open(mapper) as f: mapper_definition = yaml.safe_load(f) if "by_group" in mapper_definition: by_group = mapper_definition["by_group"] return functools.partial( _resource_parameters_by_group, by_group=by_group, workflow_resource_params=workflow_resource_params ) else: raise Exception("Currently workflow parameter mapper definitions require a by_group definition.")
def _read_defined_parameter_definitions(config): params_file = getattr(config, "workflow_resource_params_file", None) if not params_file or not os.path.exists(params_file): # Just re-use job resource parameters. params_file = getattr(config, "job_resource_params_file", None) if not params_file or not os.path.exists(params_file): params_file = None log.debug(f"Loading workflow resource parameter definitions from {params_file}") if params_file: return galaxy.util.parse_resource_parameters(params_file) else: return {} def _resource_parameters_by_group(trans, **kwds): user = trans.user by_group = kwds["by_group"] workflow_resource_params = kwds["workflow_resource_params"] params = [] if validate_by_group_workflow_parameters_mapper(by_group, workflow_resource_params): user_permissions = {} user_groups = [] for g in user.groups: user_groups.append(g.group.name) default_group = by_group.get("default", None) for group_name, group_def in by_group.get("groups", {}).items(): if group_name == default_group or group_name in user_groups: for tag in group_def: if isinstance(tag, dict): if tag.get("name") not in user_permissions: user_permissions[tag.get("name")] = {} for option in tag.get("options"): user_permissions[tag.get("name")][option] = {} else: if tag not in user_permissions: user_permissions[tag] = {} # user_permissions is now set. params = get_workflow_parameter_list(workflow_resource_params, user_permissions) return params # returns an array of parameters that a users set of permissions can access.
[docs]def get_workflow_parameter_list(params, user_permissions): param_list = [] for param_elem in params.values(): attr = deepcopy(param_elem.attrib) if attr["name"] in user_permissions: # Allow 'select' type parameters to be used if attr["type"] == "select": option_data = [] reject_list = [] for option_elem in param_elem.findall("option"): if option_elem.attrib["value"] in user_permissions[attr["name"]]: option_data.append({"label": option_elem.attrib["label"], "value": option_elem.attrib["value"]}) else: reject_list.append(option_elem.attrib["label"]) attr["data"] = option_data attr_help = "" if "help" in attr: attr_help = attr["help"] if reject_list: attr_help += ( "<br/><br/>The following options are available but disabled.<br/>" + str(reject_list) + "<br/>If you believe this is a mistake, please contact your Galaxy admin." ) attr["help"] = attr_help param_list.append(attr) return param_list
[docs]def validate_by_group_workflow_parameters_mapper(by_group, workflow_resource_params): valid = True try: if "default" not in by_group: raise Exception("'workflow_resource_params_mapper' YAML file is malformed, 'default' attribute not found!") default_group = by_group["default"] if "groups" not in by_group: raise Exception("'workflow_resource_params_mapper' YAML file is malformed, 'groups' attribute not found!") if default_group not in by_group["groups"]: raise Exception( "'workflow_resource_params_mapper' YAML file is malformed, default group with title '" + default_group + "' not found in 'groups'!" ) for group in by_group["groups"]: for attrib in by_group["groups"][group]: if isinstance(attrib, dict): if "name" not in attrib: raise Exception( "'workflow_resource_params_mapper' YAML file is malformed, " "'name' attribute not found in attribute of group '" + group + "'!" ) if attrib["name"] not in workflow_resource_params: raise Exception( "'workflow_resource_params_mapper' YAML file is malformed, group with name '" + attrib["name"] + "' not found in 'workflow_resource_params'!" ) if "options" not in attrib: raise Exception( "'workflow_resource_params_mapper' YAML file is malformed, " "'options' attribute not found in attribute of group '" + group + "'!" ) valid_options = [] for param_option in workflow_resource_params[attrib["name"]]: valid_options.append(param_option.attrib["value"]) for option in attrib["options"]: if option not in valid_options: raise Exception( "'workflow_resource_params_mapper' YAML file is malformed, '" + option + "' in 'options' of '" + attrib["name"] + "' not found in attribute of group '" + group + "'!" ) else: if attrib not in workflow_resource_params: raise Exception( "'workflow_resource_params_mapper' YAML file is malformed, attribute with name " "'" + attrib + "' not found in 'workflow_resource_params'!" ) except Exception as e: log.exception(e) valid = False return valid
def _import_resource_mapping_function(qualified_function_path): full_module_name, function_name = qualified_function_path.split(":", 1) try: __import__(full_module_name) except ImportError: raise Exception(f"Failed to find workflow resource mapper module {full_module_name}") module = sys.modules[full_module_name] if hasattr(module, function_name): return getattr(module, function_name) else: raise Exception(f"Failed to find workflow resource mapper function {full_module_name}.{function_name}") def _null_mapper_function(*args, **kwds): return None