Warning

This document is for an old release of Galaxy. You can alternatively view this page in the latest release if it exists or view the top of the latest release's documentation.

Source code for galaxy.tools.actions

import json
import logging
import os
import re
from json import dumps


from galaxy import model
from galaxy.exceptions import ItemAccessibilityException
from galaxy.jobs.actions.post import ActionBox
from galaxy.model import LibraryDatasetDatasetAssociation, WorkflowRequestInputParameter
from galaxy.model.dataset_collections.builder import CollectionBuilder
from galaxy.model.none_like import NoneDataset
from galaxy.tools.parameters import update_dataset_ids
from galaxy.tools.parameters.basic import DataCollectionToolParameter, DataToolParameter, RuntimeValue
from galaxy.tools.parameters.wrapped import WrappedParameters
from galaxy.util import ExecutionTimer
from galaxy.util.template import fill_template
from galaxy.web import url_for

log = logging.getLogger(__name__)


[docs]class ToolExecutionCache: """ An object mean to cache calculation caused by repeatedly evaluting the same tool by the same user with slightly different parameters. """
[docs] def __init__(self, trans): self.trans = trans self.current_user_roles = trans.get_current_user_roles() self.chrom_info = {} self.cached_collection_elements = {}
[docs] def get_chrom_info(self, tool_id, input_dbkey): genome_builds = self.trans.app.genome_builds custom_build_hack_get_len_from_fasta_conversion = tool_id != 'CONVERTER_fasta_to_len' if custom_build_hack_get_len_from_fasta_conversion and input_dbkey in self.chrom_info: return self.chrom_info[input_dbkey] chrom_info_pair = genome_builds.get_chrom_info(input_dbkey, trans=self.trans, custom_build_hack_get_len_from_fasta_conversion=custom_build_hack_get_len_from_fasta_conversion) if custom_build_hack_get_len_from_fasta_conversion: self.chrom_info[input_dbkey] = chrom_info_pair return chrom_info_pair
[docs]class ToolAction: """ The actions to be taken when a tool is run (after parameters have been converted and validated). """
[docs] def execute(self, tool, trans, incoming=None, set_output_hid=True): incoming = incoming or {} raise TypeError("Abstract method")
[docs]class DefaultToolAction: """Default tool action is to run an external command""" produces_real_jobs = True def _collect_input_datasets(self, tool, param_values, trans, history, current_user_roles=None, dataset_collection_elements=None, collection_info=None): """ Collect any dataset inputs from incoming. Returns a mapping from parameter name to Dataset instance for each tool parameter that is of the DataToolParameter type. """ if current_user_roles is None: current_user_roles = trans.get_current_user_roles() input_datasets = {} all_permissions = {} def record_permission(action, role_id): if action not in all_permissions: all_permissions[action] = set() all_permissions[action].add(role_id) def visitor(input, value, prefix, parent=None, **kwargs): def process_dataset(data, formats=None): if not data or isinstance(data, RuntimeValue): return None if formats is None: formats = input.formats direct_match, target_ext, converted_dataset = data.find_conversion_destination(formats) if not direct_match and target_ext: if converted_dataset: data = converted_dataset else: data = data.get_converted_dataset(trans, target_ext, target_context=parent, history=history) input_name = prefix + input.name # Checked security of whole collection all at once if mapping over this input, else # fetch dataset details for this input from the database. if collection_info and collection_info.is_mapped_over(input_name): action_tuples = collection_info.map_over_action_tuples(input_name) if not trans.app.security_agent.can_access_datasets(current_user_roles, action_tuples): raise ItemAccessibilityException("User does not have permission to use a dataset provided for input.") for action, role_id in action_tuples: record_permission(action, role_id) else: if not trans.app.security_agent.can_access_dataset(current_user_roles, data.dataset): raise ItemAccessibilityException(f"User does not have permission to use a dataset ({data.id}) provided for input.") permissions = trans.app.security_agent.get_permissions(data.dataset) for action, roles in permissions.items(): for role in roles: record_permission(action.action, model.cached_id(role)) return data if isinstance(input, DataToolParameter): if isinstance(value, list): # If there are multiple inputs with the same name, they # are stored as name1, name2, ... for i, v in enumerate(value): processed_dataset = process_dataset(v) if i == 0: # Allow copying metadata to output, first item will be source. input_datasets[prefix + input.name] = processed_dataset input_datasets[prefix + input.name + str(i + 1)] = processed_dataset conversions = [] for conversion_name, conversion_extensions, conversion_datatypes in input.conversions: new_data = process_dataset(input_datasets[prefix + input.name + str(i + 1)], conversion_datatypes) if not new_data or new_data.datatype.matches_any(conversion_datatypes): input_datasets[prefix + conversion_name + str(i + 1)] = new_data conversions.append((conversion_name, new_data)) else: raise Exception(f'A path for explicit datatype conversion has not been found: {input_datasets[prefix + input.name + str(i + 1)].extension} --/--> {conversion_extensions}') if parent: parent[input.name][i] = input_datasets[prefix + input.name + str(i + 1)] for conversion_name, conversion_data in conversions: # allow explicit conversion to be stored in job_parameter table parent[conversion_name][i] = conversion_data.id # a more robust way to determine JSONable value is desired else: param_values[input.name][i] = input_datasets[prefix + input.name + str(i + 1)] for conversion_name, conversion_data in conversions: # allow explicit conversion to be stored in job_parameter table param_values[conversion_name][i] = conversion_data.id # a more robust way to determine JSONable value is desired else: input_datasets[prefix + input.name] = process_dataset(value) conversions = [] for conversion_name, conversion_extensions, conversion_datatypes in input.conversions: new_data = process_dataset(input_datasets[prefix + input.name], conversion_datatypes) if not new_data or new_data.datatype.matches_any(conversion_datatypes): input_datasets[prefix + conversion_name] = new_data conversions.append((conversion_name, new_data)) else: raise Exception(f'A path for explicit datatype conversion has not been found: {input_datasets[prefix + input.name].extension} --/--> {conversion_extensions}') target_dict = parent if not target_dict: target_dict = param_values target_dict[input.name] = input_datasets[prefix + input.name] for conversion_name, conversion_data in conversions: # allow explicit conversion to be stored in job_parameter table target_dict[conversion_name] = conversion_data.id # a more robust way to determine JSONable value is desired elif isinstance(input, DataCollectionToolParameter): if not value: return collection = None child_collection = False if hasattr(value, 'child_collection'): # if we are mapping a collection over a tool, we only require the child_collection child_collection = True collection = value.child_collection else: # else the tool takes a collection as input so we need everything collection = value.collection action_tuples = collection.dataset_action_tuples if not trans.app.security_agent.can_access_datasets(current_user_roles, action_tuples): raise ItemAccessibilityException("User does not have permission to use a dataset provided for input.") for action, role_id in action_tuples: record_permission(action, role_id) _, extensions = collection.dataset_states_and_extensions_summary conversion_required = False for ext in extensions: if ext: datatype = trans.app.datatypes_registry.get_datatype_by_extension(ext) if not datatype.matches_any(input.formats): conversion_required = True break processed_dataset_dict = {} for i, v in enumerate(collection.dataset_instances): processed_dataset = None if conversion_required: processed_dataset = process_dataset(v) if processed_dataset is not v: processed_dataset_dict[v] = processed_dataset input_datasets[prefix + input.name + str(i + 1)] = processed_dataset or v if conversion_required: collection_type_description = trans.app.dataset_collection_manager.collection_type_descriptions.for_collection_type(collection.collection_type) collection_builder = CollectionBuilder(collection_type_description) collection_builder.replace_elements_in_collection( template_collection=collection, replacement_dict=processed_dataset_dict, ) new_collection = collection_builder.build() if child_collection: value.child_collection = new_collection else: value.collection = new_collection tool.visit_inputs(param_values, visitor) return input_datasets, all_permissions
[docs] def collect_input_dataset_collections(self, tool, param_values): def append_to_key(the_dict, key, value): if key not in the_dict: the_dict[key] = [] the_dict[key].append(value) input_dataset_collections = dict() def visitor(input, value, prefix, parent=None, prefixed_name=None, **kwargs): if isinstance(input, DataToolParameter): values = value if not isinstance(values, list): values = [value] for i, value in enumerate(values): if isinstance(value, model.HistoryDatasetCollectionAssociation) or isinstance(value, model.DatasetCollectionElement): append_to_key(input_dataset_collections, prefixed_name, (value, True)) target_dict = parent if not target_dict: target_dict = param_values # This is just a DataToolParameter, so replace this # collection with individual datasets. Database will still # record collection which should be enought for workflow # extraction and tool rerun. if hasattr(value, 'child_collection'): # if we are mapping a collection over a tool, we only require the child_collection dataset_instances = value.child_collection.dataset_instances else: # else the tool takes a collection as input so we need everything dataset_instances = value.collection.dataset_instances if i == 0: target_dict[input.name] = [] target_dict[input.name].extend(dataset_instances) elif isinstance(input, DataCollectionToolParameter): append_to_key(input_dataset_collections, prefix + input.name, (value, False)) tool.visit_inputs(param_values, visitor) return input_dataset_collections
def _check_access(self, tool, trans): assert tool.allow_user_access(trans.user), f"User ({trans.user}) is not allowed to access this tool." def _collect_inputs(self, tool, trans, incoming, history, current_user_roles, collection_info): """ Collect history as well as input datasets and collections. """ # Set history. if not history: history = tool.get_default_history_by_trans(trans, create=True) # Track input dataset collections - but replace with simply lists so collect # input datasets can process these normally. inp_dataset_collections = self.collect_input_dataset_collections(tool, incoming) # Collect any input datasets from the incoming parameters inp_data, all_permissions = self._collect_input_datasets(tool, incoming, trans, history=history, current_user_roles=current_user_roles, collection_info=collection_info) preserved_tags = {} preserved_hdca_tags = {} # grab tags from incoming HDAs for data in inp_data.values(): if not data: continue for tag in data.auto_propagated_tags: preserved_tags[tag.value] = tag # grab tags from incoming HDCAs for collection_pairs in inp_dataset_collections.values(): for collection, _ in collection_pairs: # if sub-collection mapping, this will be an DC not an HDCA # (e.g. part of collection not a collection instance) and thus won't have tags. if hasattr(collection, "tags"): for tag in collection.auto_propagated_tags: preserved_hdca_tags[tag.value] = tag preserved_tags.update(preserved_hdca_tags) return history, inp_data, inp_dataset_collections, preserved_tags, preserved_hdca_tags, all_permissions
[docs] def execute(self, tool, trans, incoming=None, return_job=False, set_output_hid=True, history=None, job_params=None, rerun_remap_job_id=None, execution_cache=None, dataset_collection_elements=None, completed_job=None, collection_info=None, job_callback=None, flush_job=True ): """ Executes a tool, creating job and tool outputs, associating them, and submitting the job to the job queue. If history is not specified, use trans.history as destination for tool's output datasets. """ trans.check_user_activation() incoming = incoming or {} self._check_access(tool, trans) app = trans.app if execution_cache is None: execution_cache = ToolExecutionCache(trans) current_user_roles = execution_cache.current_user_roles history, inp_data, inp_dataset_collections, preserved_tags, preserved_hdca_tags, all_permissions = self._collect_inputs(tool, trans, incoming, history, current_user_roles, collection_info) # Build name for output datasets based on tool name and input names on_text = self._get_on_text(inp_data) # format='input" previously would give you a random extension from # the input extensions, now it should just give "input" as the output # format. input_ext = 'data' if tool.profile < 16.04 else "input" input_dbkey = incoming.get("dbkey", "?") for name, data in reversed(list(inp_data.items())): if not data: data = NoneDataset(datatypes_registry=app.datatypes_registry) continue # Convert LDDA to an HDA. if isinstance(data, LibraryDatasetDatasetAssociation) and not completed_job: data = data.to_history_dataset_association(None) inp_data[name] = data if tool.profile < 16.04: input_ext = data.ext if data.dbkey not in [None, '?']: input_dbkey = data.dbkey identifier = getattr(data, "element_identifier", None) if identifier is not None: incoming[f"{name}|__identifier__"] = identifier # Collect chromInfo dataset and add as parameters to incoming (chrom_info, db_dataset) = execution_cache.get_chrom_info(tool.id, input_dbkey) if db_dataset: inp_data.update({"chromInfo": db_dataset}) incoming["chromInfo"] = chrom_info if not completed_job: # Determine output dataset permission/roles list if all_permissions: output_permissions = app.security_agent.guess_derived_permissions(all_permissions) else: # No valid inputs, we will use history defaults output_permissions = app.security_agent.history_get_default_permissions(history) # Add the dbkey to the incoming parameters incoming["dbkey"] = input_dbkey incoming["__input_ext"] = input_ext # wrapped params are used by change_format action and by output.label; only perform this wrapping once, as needed wrapped_params = self._wrapped_params(trans, tool, incoming, inp_data) out_data = {} input_collections = {k: v[0][0] for k, v in inp_dataset_collections.items()} output_collections = OutputCollections( trans, history, tool=tool, tool_action=self, input_collections=input_collections, dataset_collection_elements=dataset_collection_elements, on_text=on_text, incoming=incoming, params=wrapped_params.params, job_params=job_params, tags=preserved_tags, hdca_tags=preserved_hdca_tags, ) # Keep track of parent / child relationships, we'll create all the # datasets first, then create the associations parent_to_child_pairs = [] child_dataset_names = set() async_tool = tool.tool_type == 'data_source_async' def handle_output(name, output, hidden=None): if output.parent: parent_to_child_pairs.append((output.parent, name)) child_dataset_names.add(name) if async_tool and name in incoming: # HACK: output data has already been created as a result of the async controller dataid = incoming[name] data = trans.sa_session.query(app.model.HistoryDatasetAssociation).get(dataid) assert data is not None out_data[name] = data else: ext = determine_output_format( output, wrapped_params.params, inp_data, inp_dataset_collections, input_ext, python_template_version=tool.python_template_version, execution_cache=execution_cache, ) create_datasets = True dataset = None if completed_job: for output_dataset in completed_job.output_datasets: if output_dataset.name == name: create_datasets = False completed_data = output_dataset.dataset dataset = output_dataset.dataset.dataset break data = app.model.HistoryDatasetAssociation(extension=ext, dataset=dataset, create_dataset=create_datasets, flush=False) if create_datasets: from_work_dir = output.from_work_dir if from_work_dir is not None: data.dataset.created_from_basename = os.path.basename(from_work_dir) if hidden is None: hidden = output.hidden if not hidden and dataset_collection_elements is not None: # Mapping over a collection - hide datasets hidden = True if hidden: data.visible = False if dataset_collection_elements is not None and name in dataset_collection_elements: dataset_collection_elements[name].hda = data trans.sa_session.add(data) if not completed_job: trans.app.security_agent.set_all_dataset_permissions(data.dataset, output_permissions, new=True, flush=False) data.copy_tags_to(preserved_tags.values()) # This may not be neccesary with the new parent/child associations data.designation = name # Copy metadata from one of the inputs if requested. # metadata source can be either a string referencing an input # or an actual object to copy. metadata_source = output.metadata_source if metadata_source: if isinstance(metadata_source, str): metadata_source = inp_data.get(metadata_source) if metadata_source is not None: data.init_meta(copy_from=metadata_source) else: data.init_meta() # Take dbkey from LAST input data.dbkey = str(input_dbkey) # Set state if completed_job: data.blurb = completed_data.blurb data.peek = completed_data.peek data._metadata = completed_data._metadata else: data.blurb = "queued" # Set output label data.name = self.get_output_name(output, data, tool, on_text, trans, incoming, history, wrapped_params.params, job_params) # Store output out_data[name] = data if output.actions: # Apply pre-job tool-output-dataset actions; e.g. setting metadata, changing format output_action_params = dict(out_data) output_action_params.update(incoming) output.actions.apply_action(data, output_action_params) # Also set the default values of actions of type metadata self.set_metadata_defaults(output, data, tool, on_text, trans, incoming, history, wrapped_params.params, job_params) # Flush all datasets at once. return data for name, output in tool.outputs.items(): if not filter_output(tool, output, incoming): handle_output_timer = ExecutionTimer() if output.collection: if completed_job and dataset_collection_elements and name in dataset_collection_elements: # Output collection is mapped over and has already been copied from original job continue collections_manager = app.dataset_collection_manager element_identifiers = [] known_outputs = output.known_outputs(input_collections, collections_manager.type_registry) # Just to echo TODO elsewhere - this should be restructured to allow # nested collections. for output_part_def in known_outputs: # Add elements to top-level collection, unless nested... current_element_identifiers = element_identifiers current_collection_type = output.structure.collection_type for parent_id in (output_part_def.parent_ids or []): # TODO: replace following line with formal abstractions for doing this. current_collection_type = ":".join(current_collection_type.split(":")[1:]) name_to_index = {value["name"]: index for (index, value) in enumerate(current_element_identifiers)} if parent_id not in name_to_index: if parent_id not in current_element_identifiers: index = len(current_element_identifiers) current_element_identifiers.append(dict( name=parent_id, collection_type=current_collection_type, src="new_collection", element_identifiers=[], )) else: index = name_to_index[parent_id] current_element_identifiers = current_element_identifiers[index]["element_identifiers"] effective_output_name = output_part_def.effective_output_name element = handle_output(effective_output_name, output_part_def.output_def, hidden=True) history.stage_addition(element) # TODO: this shouldn't exist in the top-level of the history at all # but for now we are still working around that by hiding the contents # there. # Following hack causes dataset to no be added to history... child_dataset_names.add(effective_output_name) trans.sa_session.add(element) current_element_identifiers.append({ "__object__": element, "name": output_part_def.element_identifier, }) if output.dynamic_structure: assert not element_identifiers # known_outputs must have been empty element_kwds = dict(elements=collections_manager.ELEMENTS_UNINITIALIZED) else: element_kwds = dict(element_identifiers=element_identifiers) output_collections.create_collection( output=output, name=name, completed_job=completed_job, **element_kwds ) log.info(f"Handled collection output named {name} for tool {tool.id} {handle_output_timer}") else: handle_output(name, output) log.info(f"Handled output named {name} for tool {tool.id} {handle_output_timer}") add_datasets_timer = tool.app.execution_timer_factory.get_timer( 'internals.galaxy.tools.actions.add_datasets', 'Added output datasets to history', ) # Add all the top-level (non-child) datasets to the history unless otherwise specified for name, data in out_data.items(): if name not in child_dataset_names and name not in incoming: # don't add children; or already existing datasets, i.e. async created history.stage_addition(data) history.add_pending_items(set_output_hid=set_output_hid) # Add all the children to their parents for parent_name, child_name in parent_to_child_pairs: parent_dataset = out_data[parent_name] child_dataset = out_data[child_name] parent_dataset.children.append(child_dataset) log.info(add_datasets_timer) job_setup_timer = ExecutionTimer() # Create the job object job, galaxy_session = self._new_job_for_session(trans, tool, history) self._record_inputs(trans, tool, job, incoming, inp_data, inp_dataset_collections) self._record_outputs(job, out_data, output_collections) # execute immediate post job actions and associate post job actions that are to be executed after the job is complete if job_callback: job_callback(job) if job_params: job.params = dumps(job_params) if completed_job: job.set_copied_from_job_id(completed_job.id) trans.sa_session.add(job) # Remap any outputs if this is a rerun and the user chose to continue dependent jobs # This functionality requires tracking jobs in the database. if app.config.track_jobs_in_database and rerun_remap_job_id is not None: # Need to flush here so that referencing outputs by id works session = trans.sa_session() try: session.expire_on_commit = False session.flush() finally: session.expire_on_commit = True self._remap_job_on_rerun(trans=trans, galaxy_session=galaxy_session, rerun_remap_job_id=rerun_remap_job_id, current_job=job, out_data=out_data) log.info(f"Setup for job {job.log_str()} complete, ready to be enqueued {job_setup_timer}") # Some tools are not really executable, but jobs are still created for them ( for record keeping ). # Examples include tools that redirect to other applications ( epigraph ). These special tools must # include something that can be retrieved from the params ( e.g., REDIRECT_URL ) to keep the job # from being queued. if 'REDIRECT_URL' in incoming: # Get the dataset - there should only be 1 for name in inp_data.keys(): dataset = inp_data[name] redirect_url = tool.parse_redirect_url(dataset, incoming) # GALAXY_URL should be include in the tool params to enable the external application # to send back to the current Galaxy instance GALAXY_URL = incoming.get('GALAXY_URL', None) assert GALAXY_URL is not None, "GALAXY_URL parameter missing in tool config." redirect_url += f"&GALAXY_URL={GALAXY_URL}" # Job should not be queued, so set state to ok job.set_state(app.model.Job.states.OK) job.info = f"Redirected to: {redirect_url}" trans.sa_session.add(job) trans.sa_session.flush() trans.response.send_redirect(url_for(controller='tool_runner', action='redirect', redirect_url=redirect_url)) else: if flush_job: # Set HID and add to history. job_flush_timer = ExecutionTimer() trans.sa_session.flush() log.info(f"Flushed transaction for job {job.log_str()} {job_flush_timer}") return job, out_data, history
def _remap_job_on_rerun(self, trans, galaxy_session, rerun_remap_job_id, current_job, out_data): """ Re-connect dependent datasets for a job that is being rerun (because it failed initially). If a job fails, the user has the option to try the job again with changed parameters. To be able to resume jobs that depend on this jobs output datasets we change the dependent's job input datasets to be those of the job that is being rerun. """ try: old_job = trans.sa_session.query(trans.app.model.Job).get(rerun_remap_job_id) assert old_job is not None, f'({rerun_remap_job_id}/{current_job.id}): Old job id is invalid' assert old_job.tool_id == current_job.tool_id, f'({old_job.id}/{current_job.id}): Old tool id ({old_job.tool_id}) does not match rerun tool id ({current_job.tool_id})' if trans.user is not None: assert old_job.user_id == trans.user.id, f'({old_job.id}/{current_job.id}): Old user id ({old_job.user_id}) does not match rerun user id ({trans.user.id})' elif trans.user is None and type(galaxy_session) == trans.model.GalaxySession: assert old_job.session_id == galaxy_session.id, f'({old_job.id}/{current_job.id}): Old session id ({old_job.session_id}) does not match rerun session id ({galaxy_session.id})' else: raise Exception(f'({old_job.id}/{current_job.id}): Remapping via the API is not (yet) supported') # Start by hiding current job outputs before taking over the old job's (implicit) outputs. current_job.hide_outputs(flush=False) # Duplicate PJAs before remap. for pjaa in old_job.post_job_actions: current_job.add_post_job_action(pjaa.post_job_action) if old_job.workflow_invocation_step: replacement_dict = {} for parameter in old_job.workflow_invocation_step.workflow_invocation.input_parameters: if parameter.type == WorkflowRequestInputParameter.types.REPLACEMENT_PARAMETERS: replacement_dict[parameter.name] = parameter.value for pja in old_job.workflow_invocation_step.workflow_step.post_job_actions: # execute immediate actions here, with workflow context. if pja.action_type in ActionBox.immediate_actions: ActionBox.execute(trans.app, trans.sa_session, pja, current_job, replacement_dict) for p in old_job.parameters: if p.name.endswith('|__identifier__'): current_job.parameters.append(p.copy()) remapped_hdas = self.__remap_data_inputs(old_job=old_job, current_job=current_job) for jtod in old_job.output_datasets: for (job_to_remap, jtid) in [(jtid.job, jtid) for jtid in jtod.dataset.dependent_jobs]: if (trans.user is not None and job_to_remap.user_id == trans.user.id) or ( trans.user is None and job_to_remap.session_id == galaxy_session.id): self.__remap_parameters(job_to_remap, jtid, jtod, out_data) trans.sa_session.add(job_to_remap) trans.sa_session.add(jtid) job_to_remap.resume() jtod.dataset.visible = False trans.sa_session.add(jtod) for jtodc in old_job.output_dataset_collection_instances: # Update JobToOutputDatasetCollectionAssociation to the current job jtodc.job = current_job hdca = jtodc.dataset_collection_instance hdca.collection.replace_failed_elements(remapped_hdas) if hdca.implicit_collection_jobs: for job in hdca.implicit_collection_jobs.jobs: if job.job_id == old_job.id: job.job_id = current_job.id hdca.update() for jtoidca in old_job.output_dataset_collections: jtoidca.dataset_collection.replace_failed_elements(remapped_hdas) except Exception: log.exception('Cannot remap rerun dependencies.') def __remap_data_inputs(self, old_job, current_job): """Record output datasets from old_job and build a dictionary that maps the old output HDAs to the new output HDAs.""" remapped_hdas = {} old_output_datasets = {jtod.name: jtod.dataset for jtod in old_job.output_datasets} for jtod in current_job.output_datasets: remapped_hdas[old_output_datasets[jtod.name]] = jtod.dataset return remapped_hdas def __remap_parameters(self, job_to_remap, jtid, jtod, out_data): input_values = {p.name: json.loads(p.value) for p in job_to_remap.parameters if p.value is not None} old_dataset_id = jtod.dataset_id new_dataset_id = out_data[jtod.name].id input_values = update_dataset_ids(input_values, {old_dataset_id: new_dataset_id}, src='hda') for p in job_to_remap.parameters: if p.name in input_values: p.value = json.dumps(input_values[p.name]) jtid.dataset = out_data[jtod.name] jtid.dataset.hid = jtod.dataset.hid log.info(f'Job {job_to_remap.id} input HDA {jtod.dataset.id} remapped to new HDA {jtid.dataset.id}') def _wrapped_params(self, trans, tool, incoming, input_datasets=None): wrapped_params = WrappedParameters(trans, tool, incoming, input_datasets=input_datasets) return wrapped_params def _get_on_text(self, inp_data): input_names = [] for data in reversed(list(inp_data.values())): if getattr(data, "hid", None): input_names.append(f'data {data.hid}') return on_text_for_names(input_names) def _new_job_for_session(self, trans, tool, history): job = trans.app.model.Job() job.galaxy_version = trans.app.config.version_major galaxy_session = None if hasattr(trans, "get_galaxy_session"): galaxy_session = trans.get_galaxy_session() # If we're submitting from the API, there won't be a session. if type(galaxy_session) == trans.model.GalaxySession: job.session_id = model.cached_id(galaxy_session) if trans.user is not None: job.user_id = model.cached_id(trans.user) if history: job.history_id = model.cached_id(history) job.tool_id = tool.id try: # For backward compatibility, some tools may not have versions yet. job.tool_version = tool.version except AttributeError: job.tool_version = "1.0.0" job.dynamic_tool = tool.dynamic_tool return job, galaxy_session def _record_inputs(self, trans, tool, job, incoming, inp_data, inp_dataset_collections): # FIXME: Don't need all of incoming here, just the defined parameters # from the tool. We need to deal with tools that pass all post # parameters to the command as a special case. reductions = {} for name, dataset_collection_info_pairs in inp_dataset_collections.items(): for (dataset_collection, reduced) in dataset_collection_info_pairs: if reduced: if name not in reductions: reductions[name] = [] reductions[name].append(dataset_collection) # TODO: verify can have multiple with same name, don't want to lose traceability if isinstance(dataset_collection, model.HistoryDatasetCollectionAssociation): job.add_input_dataset_collection(name, dataset_collection) elif isinstance(dataset_collection, model.DatasetCollectionElement): job.add_input_dataset_collection_element(name, dataset_collection) # If this an input collection is a reduction, we expanded it for dataset security, type # checking, and such, but the persisted input must be the original collection # so we can recover things like element identifier during tool command evaluation. def restore_reduction_visitor(input, value, prefix, parent=None, prefixed_name=None, **kwargs): if prefixed_name in reductions and isinstance(input, DataToolParameter): target_dict = parent if not target_dict: target_dict = incoming target_dict[input.name] = [] for reduced_collection in reductions[prefixed_name]: if hasattr(reduced_collection, "child_collection"): target_dict[input.name].append({'id': model.cached_id(reduced_collection), 'src': 'dce'}) else: target_dict[input.name].append({'id': model.cached_id(reduced_collection), 'src': 'hdca'}) if reductions: tool.visit_inputs(incoming, restore_reduction_visitor) for name, value in tool.params_to_strings(incoming, trans.app).items(): job.add_parameter(name, value) self._record_input_datasets(trans, job, inp_data) def _record_outputs(self, job, out_data, output_collections): out_collections = output_collections.out_collections out_collection_instances = output_collections.out_collection_instances for name, dataset in out_data.items(): job.add_output_dataset(name, dataset) for name, dataset_collection in out_collections.items(): job.add_implicit_output_dataset_collection(name, dataset_collection) for name, dataset_collection_instance in out_collection_instances.items(): job.add_output_dataset_collection(name, dataset_collection_instance) dataset_collection_instance.job = job def _record_input_datasets(self, trans, job, inp_data): for name, dataset in inp_data.items(): # TODO: figure out why can't pass dataset_id here. job.add_input_dataset(name, dataset=dataset)
[docs] def get_output_name(self, output, dataset=None, tool=None, on_text=None, trans=None, incoming=None, history=None, params=None, job_params=None): if output.label: params['tool'] = tool params['on_string'] = on_text return fill_template(output.label, context=params, python_template_version=tool.python_template_version) else: return self._get_default_data_name(dataset, tool, on_text=on_text, trans=trans, incoming=incoming, history=history, params=params, job_params=job_params)
[docs] def set_metadata_defaults(self, output, dataset, tool, on_text, trans, incoming, history, params, job_params): """ This allows to map names of input files to metadata default values. Example: .. code-block:: <data format="tabular" name="output" label="Tabular output, aggregates data from individual_inputs" > <actions> <action name="column_names" type="metadata" default="${','.join(input.name for input in $individual_inputs)}" /> </actions> </data> """ if output.actions: for action in output.actions.actions: if action.tag == "metadata" and action.default: metadata_new_value = fill_template(action.default, context=params, python_template_version=tool.python_template_version).split(",") dataset.metadata.__setattr__(str(action.name), metadata_new_value)
def _get_default_data_name(self, dataset, tool, on_text=None, trans=None, incoming=None, history=None, params=None, job_params=None, **kwd): name = tool.name if on_text: name += f" on {on_text}" return name
[docs]class OutputCollections: """ Keeps track of collections (DC or HDCA) created by actions. Actions do fairly different things depending on whether we are creating just part of an collection or a whole output collection (mapping_over_collection parameter). """
[docs] def __init__(self, trans, history, tool, tool_action, input_collections, dataset_collection_elements, on_text, incoming, params, job_params, tags, hdca_tags): self.trans = trans self.tag_handler = trans.app.tag_handler.create_tag_handler_session() self.history = history self.tool = tool self.tool_action = tool_action self.input_collections = input_collections self.dataset_collection_elements = dataset_collection_elements self.on_text = on_text self.incoming = incoming self.params = params self.job_params = job_params self.out_collections = {} self.out_collection_instances = {} self.tags = tags # all inherited tags self.hdca_tags = hdca_tags # only tags inherited from input HDCAs
[docs] def create_collection(self, output, name, collection_type=None, completed_job=None, propagate_hda_tags=True, **element_kwds): input_collections = self.input_collections collections_manager = self.trans.app.dataset_collection_manager collection_type = collection_type or output.structure.collection_type if collection_type is None: collection_type_source = output.structure.collection_type_source if collection_type_source is None: # TODO: Not a new problem, but this should be determined # sooner. raise Exception("Could not determine collection type to create.") if collection_type_source not in input_collections: raise Exception(f"Could not find collection type source with name [{collection_type_source}].") # Using the collection_type_source string we get the DataCollectionToolParameter data_param = self.tool.inputs groups = collection_type_source.split('|') for group in groups: values = group.split('_') if values[-1].isdigit(): key = ("_".join(values[0:-1])) # We don't care about the repeat index, we just need to find the correct DataCollectionToolParameter else: key = group if isinstance(data_param, dict): data_param = data_param.get(key) else: data_param = data_param.inputs.get(key) collection_type_description = data_param._history_query(self.trans).can_map_over(input_collections[collection_type_source]) if collection_type_description: collection_type = collection_type_description.collection_type else: collection_type = input_collections[collection_type_source].collection.collection_type if "elements" in element_kwds: def check_elements(elements): if hasattr(elements, "items"): # else it is ELEMENTS_UNINITIALIZED object. for value in elements.values(): # Either a HDA (if) or a DatasetCollection or a recursive dict. if getattr(value, "history_content_type", None) == "dataset": assert value.history is not None or value.history_id is not None elif hasattr(value, "dataset_instances"): for dataset in value.dataset_instances: assert dataset.history is not None or dataset.history_id is not None else: assert value["src"] == "new_collection" check_elements(value["elements"]) elements = element_kwds["elements"] check_elements(elements) if self.dataset_collection_elements is not None: dc = collections_manager.create_dataset_collection( self.trans, collection_type=collection_type, **element_kwds ) if name in self.dataset_collection_elements: self.dataset_collection_elements[name].child_collection = dc # self.trans.sa_session.add(self.dataset_collection_elements[name]) self.out_collections[name] = dc else: hdca_name = self.tool_action.get_output_name( output, None, self.tool, self.on_text, self.trans, self.incoming, self.history, self.params, self.job_params, ) hdca = collections_manager.create( self.trans, self.history, name=hdca_name, collection_type=collection_type, trusted_identifiers=True, tags=self.tags if propagate_hda_tags else self.hdca_tags, set_hid=False, flush=False, completed_job=completed_job, output_name=name, **element_kwds ) # name here is name of the output element - not name # of the hdca. self.history.stage_addition(hdca) self.out_collection_instances[name] = hdca
[docs]def on_text_for_names(input_names): # input_names may contain duplicates... this is because the first value in # multiple input dataset parameters will appear twice once as param_name # and once as param_name1. unique_names = [] for name in input_names: if name not in unique_names: unique_names.append(name) input_names = unique_names # Build name for output datasets based on tool name and input names if len(input_names) == 1: on_text = input_names[0] elif len(input_names) == 2: on_text = '%s and %s' % tuple(input_names[0:2]) elif len(input_names) == 3: on_text = '%s, %s, and %s' % tuple(input_names[0:3]) elif len(input_names) > 3: on_text = '%s, %s, and others' % tuple(input_names[0:2]) else: on_text = "" return on_text
[docs]def filter_output(tool, output, incoming): for filter in output.filters: try: if not eval(filter.text.strip(), globals(), incoming): return True # do not create this dataset except Exception as e: log.debug(f'Tool {tool.id} output {output.name}: dataset output filter ({filter.text}) failed: {e}') return False
[docs]def get_ext_or_implicit_ext(hda): if hda.implicitly_converted_parent_datasets: # implicitly_converted_parent_datasets is a list of ImplicitlyConvertedDatasetAssociation # objects, and their type is the target_ext, so this should be correct even if there # are multiple ImplicitlyConvertedDatasetAssociation objects (meaning 2 datasets had been converted # to produce a dataset with the required datatype) return hda.implicitly_converted_parent_datasets[0].type return hda.ext
[docs]def determine_output_format(output, parameter_context, input_datasets, input_dataset_collections, random_input_ext, python_template_version='3', execution_cache=None): """ Determines the output format for a dataset based on an abstract description of the output (galaxy.tool_util.parser.ToolOutput), the parameter wrappers, a map of the input datasets (name => HDA), and the last input extensions in the tool form. TODO: Don't deal with XML here - move this logic into ToolOutput. TODO: Make the input extension used deterministic instead of random. """ # the type should match the input ext = output.format if ext == "input": if input_datasets and random_input_ext in {'data', 'auto'}: # Probably dealing with an implicitly converted dataset try: first_input_dataset = next(iter(input_datasets.values())) random_input_ext = get_ext_or_implicit_ext(first_input_dataset) except Exception: pass ext = random_input_ext format_source = output.format_source if format_source is not None and format_source in input_datasets: try: input_dataset = input_datasets[output.format_source] ext = get_ext_or_implicit_ext(input_dataset) except Exception: pass elif format_source is not None: element_index = None collection_name = format_source if re.match(r"^[^\[\]]*\[[^\[\]]*\]$", format_source): collection_name, element_index = format_source[0:-1].split("[") # Treat as json to interpret "forward" vs 0 with type # Make it feel more like Python, single quote better in XML also. element_index = element_index.replace("'", '"') element_index = json.loads(element_index) if collection_name in input_dataset_collections: try: input_collection = input_dataset_collections[collection_name][0][0] input_collection_collection = input_collection.collection if element_index is None: # just pick the first HDA input_dataset = input_collection_collection.dataset_instances[0] else: try: input_element = input_collection_collection[element_index] except KeyError: if execution_cache: dataset_elements = execution_cache.cached_collection_elements.get(input_collection_collection.id) if dataset_elements is None: dataset_elements = execution_cache.cached_collection_elements[input_collection_collection.id] = input_collection_collection.dataset_elements else: dataset_elements = input_collection_collection.dataset_elements for element in dataset_elements: if element.element_identifier == element_index: input_element = element break input_dataset = input_element.element_object ext = get_ext_or_implicit_ext(input_dataset) except Exception as e: log.debug("Exception while trying to determine format_source: %s", e) # process change_format tags if output.change_format is not None: new_format_set = False for change_elem in output.change_format: for when_elem in change_elem.findall('when'): check = when_elem.get('input', None) if check is not None: try: if '$' not in check: # allow a simple name or more complex specifications check = '${%s}' % check if fill_template(check, context=parameter_context, python_template_version=python_template_version) == when_elem.get('value', None): ext = when_elem.get('format', ext) except Exception: # bad tag input value; possibly referencing a param within a different conditional when block or other nonexistent grouping construct continue else: check = when_elem.get('input_dataset', None) if check is not None: check = input_datasets.get(check, None) # At this point check is a HistoryDatasetAssociation object. check_format = when_elem.get('format', ext) check_value = when_elem.get('value', None) check_attribute = when_elem.get('attribute', None) if check is not None and check_value is not None and check_attribute is not None: # See if the attribute to be checked belongs to the HistoryDatasetAssociation object. if hasattr(check, check_attribute): if str(getattr(check, check_attribute)) == str(check_value): ext = check_format new_format_set = True break # See if the attribute to be checked belongs to the metadata associated with the # HistoryDatasetAssociation object. if check.metadata is not None: metadata_value = check.metadata.get(check_attribute, None) if metadata_value is not None: if str(metadata_value) == str(check_value): ext = check_format new_format_set = True break if new_format_set: break return ext