Warning
This document is for an old release of Galaxy. You can alternatively view this page in the latest release if it exists or view the top of the latest release's documentation.
Source code for galaxy.tools.actions.model_operations
import logging
from typing import TYPE_CHECKING
from galaxy.tools.actions import (
DefaultToolAction,
OutputCollections,
ToolExecutionCache,
)
if TYPE_CHECKING:
from galaxy.managers.context import ProvidesUserContext
log = logging.getLogger(__name__)
class ModelOperationToolAction(DefaultToolAction):
    """Tool action whose outputs are produced in-process during ``execute``.

    Instead of queueing work, ``execute`` asks the tool to produce its outputs
    directly (``tool.produce_outputs``), records inputs and outputs on a newly
    created job, and immediately sets that job's state to ``OK`` (or
    ``SKIPPED`` when ``skip`` is true).
    """

    # NOTE(review): presumably read by callers to tell that no job is actually
    # dispatched for this action -- confirm against the consumers of this flag.
    produces_real_jobs = False

    def check_inputs_ready(self, tool, trans, incoming, history, execution_cache=None, collection_info=None):
        """Collect the tool's inputs and let the tool verify they are ready.

        :param tool: tool whose ``check_inputs_ready`` is invoked.
        :param trans: transaction/context used to build the execution cache
            and to collect inputs.
        :param incoming: incoming tool parameters.
        :param history: history passed through to input collection.
        :param execution_cache: optional ``ToolExecutionCache``; a new one is
            created from ``trans`` when omitted.
        :param collection_info: optional collection mapping information
            forwarded to input collection.
        """
        if execution_cache is None:
            execution_cache = ToolExecutionCache(trans)
        current_user_roles = execution_cache.current_user_roles
        # Only the input datasets and input collections are needed here; the
        # resolved history, tags and permissions are intentionally ignored.
        _, inp_data, inp_dataset_collections, _, _, _ = self._collect_inputs(
            tool, trans, incoming, history, current_user_roles, collection_info
        )
        tool.check_inputs_ready(inp_data, inp_dataset_collections)

    def execute(
        self,
        tool,
        trans,
        incoming=None,
        set_output_hid=False,
        overwrite=True,
        history=None,
        job_params=None,
        execution_cache=None,
        collection_info=None,
        job_callback=None,
        skip=False,
        **kwargs,
    ):
        """Produce the tool's outputs immediately and record them on a new job.

        :param tool: tool to execute; must provide ``produce_outputs``.
        :param trans: transaction/context (user activation check, session).
        :param incoming: incoming tool parameters; ``None`` is treated as an
            empty dict.
        :param set_output_hid: accepted for interface compatibility; not used
            by this method.
        :param overwrite: accepted for interface compatibility; not used by
            this method.
        :param history: history to operate on; the value returned by input
            collection replaces it.
        :param job_params: forwarded to ``OutputCollections``.
        :param execution_cache: optional ``ToolExecutionCache``; a new one is
            created from ``trans`` when omitted.
        :param collection_info: optional collection mapping information
            forwarded to input collection.
        :param job_callback: optional callable invoked with the job once
            inputs and outputs have been recorded.
        :param skip: when true the job is marked ``SKIPPED`` and output
            collections are marked populated and hidden.
        :param kwargs: only ``dataset_collection_elements`` is read.
        :returns: tuple ``(job, out_data, history)``.
        """
        incoming = incoming or {}
        trans.check_user_activation()

        if execution_cache is None:
            execution_cache = ToolExecutionCache(trans)
        current_user_roles = execution_cache.current_user_roles

        (
            history,
            inp_data,
            inp_dataset_collections,
            preserved_tags,
            preserved_hdca_tags,
            _all_permissions,  # not needed for model operations
        ) = self._collect_inputs(tool, trans, incoming, history, current_user_roles, collection_info)

        # Build name for output datasets based on tool name and input names
        on_text = self._get_on_text(inp_data)

        # wrapped params are used by change_format action and by output.label; only perform this wrapping once, as needed
        wrapped_params = self._wrapped_params(trans, tool, incoming)

        out_data = {}
        # Take the first element of the first entry recorded for each input
        # collection parameter.
        input_collections = {k: v[0][0] for k, v in inp_dataset_collections.items()}
        output_collections = OutputCollections(
            trans,
            history,
            tool=tool,
            tool_action=self,
            input_collections=input_collections,
            dataset_collection_elements=kwargs.get("dataset_collection_elements"),
            on_text=on_text,
            incoming=incoming,
            params=wrapped_params.params,
            job_params=job_params,
            tags=preserved_tags,
            hdca_tags=preserved_hdca_tags,
        )

        #
        # Create job.
        #
        job, _galaxy_session = self._new_job_for_session(trans, tool, history)

        # Log before the call so the message matches what is about to happen
        # and is still emitted if producing the outputs raises. Lazy %-args
        # avoid formatting ``tool`` when INFO logging is disabled.
        log.info("Calling produce_outputs, tool is %s", tool)
        self._produce_outputs(
            trans,
            tool,
            out_data,
            output_collections,
            incoming=incoming,
            history=history,
            tags=preserved_tags,
            hdca_tags=preserved_hdca_tags,
            skip=skip,
        )
        self._record_inputs(trans, tool, job, incoming, inp_data, inp_dataset_collections)
        self._record_outputs(job, out_data, output_collections)

        if job_callback:
            job_callback(job)

        # Nothing is queued: the outputs already exist, so the job is given
        # its final state right away.
        job.state = job.states.SKIPPED if skip else job.states.OK
        trans.sa_session.add(job)

        return job, out_data, history

    def _produce_outputs(
        self, trans: "ProvidesUserContext", tool, out_data, output_collections, incoming, history, tags, hdca_tags, skip
    ):
        """Have the tool produce its outputs and add them to the session.

        ``tool.produce_outputs`` fills ``out_data`` / ``output_collections``.
        Outputs whose name appears in
        ``output_collections.dataset_collection_elements`` are hidden and
        attached to the corresponding element as its ``hda``. When ``skip`` is
        true, every output collection is marked as populated and every output
        collection instance is hidden. Finally all values of ``out_data`` are
        added to the SQLAlchemy session.
        """
        tool.produce_outputs(
            trans,
            out_data,
            output_collections,
            incoming,
            history=history,
            tags=tags,
            hdca_tags=hdca_tags,
            tag_handler=trans.tag_handler,
        )

        if mapped_over_elements := output_collections.dataset_collection_elements:
            for name, value in out_data.items():
                if name in mapped_over_elements:
                    value.visible = False
                    mapped_over_elements[name].hda = value

        # We probably need to mark all outputs as skipped, not just the outputs of whatever the database op tools do ?
        # This is probably not exactly right, but it might also work in most cases
        if skip:
            for output_collection in output_collections.out_collections.values():
                output_collection.mark_as_populated()
            for hdca in output_collections.out_collection_instances.values():
                hdca.visible = False
            # Would we also need to replace the datasets with skipped datasets?

        trans.sa_session.add_all(out_data.values())