Warning
This document is for an old release of Galaxy. You can alternatively view this page in the latest release if it exists or view the top of the latest release's documentation.
Source code for galaxy.workflow.resources
"""This package is something a placeholder for workflow resource parameters.
This file defines the baked in resource mapper types, and this package contains an
example of a more open, pluggable approach with greater control.
"""
import functools
import logging
import os
import sys
from copy import deepcopy
import yaml
import galaxy.util
log = logging.getLogger(__name__)
[docs]def get_resource_mapper_function(app):
config = app.config
mapper = getattr(config, "workflow_resource_params_mapper", None)
if mapper is None:
return _null_mapper_function
elif ":" in mapper:
raw_function = _import_resource_mapping_function(mapper)
# Bind resource parameters here just to not re-parse over and over.
workflow_resource_params = _read_defined_parameter_definitions(config)
return functools.partial(raw_function, workflow_resource_params=workflow_resource_params)
else:
workflow_resource_params = _read_defined_parameter_definitions(config)
with open(mapper) as f:
mapper_definition = yaml.safe_load(f)
if "by_group" in mapper_definition:
by_group = mapper_definition["by_group"]
return functools.partial(
_resource_parameters_by_group, by_group=by_group, workflow_resource_params=workflow_resource_params
)
else:
raise Exception("Currently workflow parameter mapper definitions require a by_group definition.")
def _read_defined_parameter_definitions(config):
params_file = getattr(config, "workflow_resource_params_file", None)
if not params_file or not os.path.exists(params_file):
# Just re-use job resource parameters.
params_file = getattr(config, "job_resource_params_file", None)
if not params_file or not os.path.exists(params_file):
params_file = None
log.debug(f"Loading workflow resource parameter definitions from {params_file}")
if params_file:
return galaxy.util.parse_resource_parameters(params_file)
else:
return {}
def _resource_parameters_by_group(trans, **kwds):
user = trans.user
by_group = kwds["by_group"]
workflow_resource_params = kwds["workflow_resource_params"]
params = []
if validate_by_group_workflow_parameters_mapper(by_group, workflow_resource_params):
user_permissions = {}
user_groups = []
for g in user.groups:
user_groups.append(g.group.name)
default_group = by_group.get("default", None)
for group_name, group_def in by_group.get("groups", {}).items():
if group_name == default_group or group_name in user_groups:
for tag in group_def:
if isinstance(tag, dict):
if tag.get("name") not in user_permissions:
user_permissions[tag.get("name")] = {}
for option in tag.get("options"):
user_permissions[tag.get("name")][option] = {}
else:
if tag not in user_permissions:
user_permissions[tag] = {}
# user_permissions is now set.
params = get_workflow_parameter_list(workflow_resource_params, user_permissions)
return params
# returns an array of parameters that a users set of permissions can access.
[docs]def get_workflow_parameter_list(params, user_permissions):
param_list = []
for param_elem in params.values():
attr = deepcopy(param_elem.attrib)
if attr["name"] in user_permissions:
# Allow 'select' type parameters to be used
if attr["type"] == "select":
option_data = []
reject_list = []
for option_elem in param_elem.findall("option"):
if option_elem.attrib["value"] in user_permissions[attr["name"]]:
option_data.append({"label": option_elem.attrib["label"], "value": option_elem.attrib["value"]})
else:
reject_list.append(option_elem.attrib["label"])
attr["data"] = option_data
attr_help = ""
if "help" in attr:
attr_help = attr["help"]
if reject_list:
attr_help += (
"<br/><br/>The following options are available but disabled.<br/>"
+ str(reject_list)
+ "<br/>If you believe this is a mistake, please contact your Galaxy admin."
)
attr["help"] = attr_help
param_list.append(attr)
return param_list
[docs]def validate_by_group_workflow_parameters_mapper(by_group, workflow_resource_params):
valid = True
try:
if "default" not in by_group:
raise Exception("'workflow_resource_params_mapper' YAML file is malformed, 'default' attribute not found!")
default_group = by_group["default"]
if "groups" not in by_group:
raise Exception("'workflow_resource_params_mapper' YAML file is malformed, 'groups' attribute not found!")
if default_group not in by_group["groups"]:
raise Exception(
"'workflow_resource_params_mapper' YAML file is malformed, default group with title '"
+ default_group
+ "' not found in 'groups'!"
)
for group in by_group["groups"]:
for attrib in by_group["groups"][group]:
if isinstance(attrib, dict):
if "name" not in attrib:
raise Exception(
"'workflow_resource_params_mapper' YAML file is malformed, "
"'name' attribute not found in attribute of group '" + group + "'!"
)
if attrib["name"] not in workflow_resource_params:
raise Exception(
"'workflow_resource_params_mapper' YAML file is malformed, group with name '"
+ attrib["name"]
+ "' not found in 'workflow_resource_params'!"
)
if "options" not in attrib:
raise Exception(
"'workflow_resource_params_mapper' YAML file is malformed, "
"'options' attribute not found in attribute of group '" + group + "'!"
)
valid_options = []
for param_option in workflow_resource_params[attrib["name"]]:
valid_options.append(param_option.attrib["value"])
for option in attrib["options"]:
if option not in valid_options:
raise Exception(
"'workflow_resource_params_mapper' YAML file is malformed, '"
+ option
+ "' in 'options' of '"
+ attrib["name"]
+ "' not found in attribute of group '"
+ group
+ "'!"
)
else:
if attrib not in workflow_resource_params:
raise Exception(
"'workflow_resource_params_mapper' YAML file is malformed, attribute with name "
"'" + attrib + "' not found in 'workflow_resource_params'!"
)
except Exception as e:
log.exception(e)
valid = False
return valid
def _import_resource_mapping_function(qualified_function_path):
full_module_name, function_name = qualified_function_path.split(":", 1)
try:
__import__(full_module_name)
except ImportError:
raise Exception(f"Failed to find workflow resource mapper module {full_module_name}")
module = sys.modules[full_module_name]
if hasattr(module, function_name):
return getattr(module, function_name)
else:
raise Exception(f"Failed to find workflow resource mapper function {full_module_name}.{function_name}")
def _null_mapper_function(*args, **kwds):
return None