Warning

This document is for an old release of Galaxy. You can alternatively view this page in the latest release if it exists or view the top of the latest release's documentation.

Source code for galaxy.tools.actions.model_operations

import logging
from typing import TYPE_CHECKING

from galaxy.tools.actions import (
    DefaultToolAction,
    OutputCollections,
    ToolExecutionCache,
)

if TYPE_CHECKING:
    from galaxy.managers.context import ProvidesUserContext

log = logging.getLogger(__name__)


[docs]class ModelOperationToolAction(DefaultToolAction): produces_real_jobs = False
[docs] def check_inputs_ready(self, tool, trans, incoming, history, execution_cache=None, collection_info=None): if execution_cache is None: execution_cache = ToolExecutionCache(trans) current_user_roles = execution_cache.current_user_roles history, inp_data, inp_dataset_collections, _, _, _ = self._collect_inputs( tool, trans, incoming, history, current_user_roles, collection_info ) tool.check_inputs_ready(inp_data, inp_dataset_collections)
[docs] def execute( self, tool, trans, incoming=None, set_output_hid=False, overwrite=True, history=None, job_params=None, execution_cache=None, collection_info=None, job_callback=None, skip=False, **kwargs, ): incoming = incoming or {} trans.check_user_activation() if execution_cache is None: execution_cache = ToolExecutionCache(trans) current_user_roles = execution_cache.current_user_roles ( history, inp_data, inp_dataset_collections, preserved_tags, preserved_hdca_tags, all_permissions, ) = self._collect_inputs(tool, trans, incoming, history, current_user_roles, collection_info) # Build name for output datasets based on tool name and input names on_text = self._get_on_text(inp_data) # wrapped params are used by change_format action and by output.label; only perform this wrapping once, as needed wrapped_params = self._wrapped_params(trans, tool, incoming) out_data = {} input_collections = {k: v[0][0] for k, v in inp_dataset_collections.items()} output_collections = OutputCollections( trans, history, tool=tool, tool_action=self, input_collections=input_collections, dataset_collection_elements=kwargs.get("dataset_collection_elements", None), on_text=on_text, incoming=incoming, params=wrapped_params.params, job_params=job_params, tags=preserved_tags, hdca_tags=preserved_hdca_tags, ) # # Create job. # job, galaxy_session = self._new_job_for_session(trans, tool, history) self._produce_outputs( trans, tool, out_data, output_collections, incoming=incoming, history=history, tags=preserved_tags, hdca_tags=preserved_hdca_tags, skip=skip, ) self._record_inputs(trans, tool, job, incoming, inp_data, inp_dataset_collections) self._record_outputs(job, out_data, output_collections) if job_callback: job_callback(job) if skip: job.state = job.states.SKIPPED else: job.state = job.states.OK trans.sa_session.add(job) # Queue the job for execution # trans.app.job_manager.job_queue.put( job.id, tool.id ) # trans.log_event( "Added database job action to the job queue, id: %s" % str(job.id), tool_id=job.tool_id ) log.info(f"Calling produce_outputs, tool is {tool}") return job, out_data, history
def _produce_outputs( self, trans: "ProvidesUserContext", tool, out_data, output_collections, incoming, history, tags, hdca_tags, skip ): tag_handler = trans.tag_handler tool.produce_outputs( trans, out_data, output_collections, incoming, history=history, tags=tags, hdca_tags=hdca_tags, tag_handler=tag_handler, ) if mapped_over_elements := output_collections.dataset_collection_elements: for name, value in out_data.items(): if name in mapped_over_elements: value.visible = False mapped_over_elements[name].hda = value # We probably need to mark all outputs as skipped, not just the outputs of whatever the database op tools do ? # This is probably not exactly right, but it might also work in most cases if skip: for output_collection in output_collections.out_collections.values(): output_collection.mark_as_populated() for hdca in output_collections.out_collection_instances.values(): hdca.visible = False # Would we also need to replace the datasets with skipped datasets? trans.sa_session.add_all(out_data.values())