Warning
This document is for an in-development version of Galaxy. You can alternatively view this page in the latest release if it exists or view the top of the latest release's documentation.
Source code for galaxy.tools.actions.model_operations
import logging
from typing import (
Optional,
TYPE_CHECKING,
)
from galaxy.model import (
History,
Job,
)
from galaxy.model.dataset_collections.matching import MatchingCollections
from galaxy.objectstore import ObjectStorePopulator
from galaxy.tools._types import ToolStateJobInstancePopulatedT
from galaxy.tools.actions import (
DefaultToolAction,
OutputCollections,
OutputDatasetsT,
ToolActionExecuteResult,
)
from galaxy.tools.execute import (
DatasetCollectionElementsSliceT,
DEFAULT_DATASET_COLLECTION_ELEMENTS,
DEFAULT_JOB_CALLBACK,
DEFAULT_PREFERRED_OBJECT_STORE_ID,
DEFAULT_RERUN_REMAP_JOB_ID,
DEFAULT_SET_OUTPUT_HID,
JobCallbackT,
)
from galaxy.tools.execution_helpers import ToolExecutionCache
if TYPE_CHECKING:
from galaxy.managers.context import ProvidesUserContext
log = logging.getLogger(__name__)
[docs]class ModelOperationToolAction(DefaultToolAction):
produces_real_jobs: bool = False
[docs] def check_inputs_ready(self, tool, trans, incoming, history, execution_cache=None, collection_info=None):
if execution_cache is None:
execution_cache = ToolExecutionCache(trans)
current_user_roles = execution_cache.current_user_roles
history, inp_data, inp_dataset_collections, _, _, _ = self._collect_inputs(
tool, trans, incoming, history, current_user_roles, collection_info
)
tool.check_inputs_ready(inp_data, inp_dataset_collections)
[docs] def execute(
self,
tool,
trans,
incoming: Optional[ToolStateJobInstancePopulatedT] = None,
history: Optional[History] = None,
job_params=None,
rerun_remap_job_id: Optional[int] = DEFAULT_RERUN_REMAP_JOB_ID,
execution_cache: Optional[ToolExecutionCache] = None,
dataset_collection_elements: Optional[DatasetCollectionElementsSliceT] = DEFAULT_DATASET_COLLECTION_ELEMENTS,
completed_job: Optional[Job] = None,
collection_info: Optional[MatchingCollections] = None,
job_callback: Optional[JobCallbackT] = DEFAULT_JOB_CALLBACK,
preferred_object_store_id: Optional[str] = DEFAULT_PREFERRED_OBJECT_STORE_ID,
set_output_hid: bool = DEFAULT_SET_OUTPUT_HID,
flush_job: bool = True,
skip: bool = False,
) -> ToolActionExecuteResult:
incoming = incoming or {}
trans.check_user_activation()
if execution_cache is None:
execution_cache = ToolExecutionCache(trans)
current_user_roles = execution_cache.current_user_roles
(
history,
inp_data,
inp_dataset_collections,
preserved_tags,
preserved_hdca_tags,
all_permissions,
) = self._collect_inputs(tool, trans, incoming, history, current_user_roles, collection_info)
# Build name for output datasets based on tool name and input names
on_text = self._get_on_text(inp_data)
# wrapped params are used by change_format action and by output.label; only perform this wrapping once, as needed
wrapped_params = self._wrapped_params(trans, tool, incoming)
out_data: OutputDatasetsT = {}
input_collections = {k: v[0][0] for k, v in inp_dataset_collections.items()}
output_collections = OutputCollections(
trans,
history,
tool=tool,
tool_action=self,
input_collections=input_collections,
dataset_collection_elements=dataset_collection_elements,
on_text=on_text,
incoming=incoming,
params=wrapped_params.params,
job_params=job_params,
tags=preserved_tags,
hdca_tags=preserved_hdca_tags,
)
#
# Create job.
#
job, galaxy_session = self._new_job_for_session(trans, tool, history)
self._produce_outputs(
trans,
tool,
out_data,
output_collections,
incoming=incoming,
history=history,
tags=preserved_tags,
hdca_tags=preserved_hdca_tags,
skip=skip,
)
self._record_inputs(trans, tool, job, incoming, inp_data, inp_dataset_collections)
self._record_outputs(job, out_data, output_collections)
if job_callback:
job_callback(job)
if skip:
job.state = job.states.SKIPPED
else:
job.state = job.states.OK
trans.sa_session.add(job)
# Queue the job for execution
# trans.app.job_manager.job_queue.put( job.id, tool.id )
# trans.log_event( "Added database job action to the job queue, id: %s" % str(job.id), tool_id=job.tool_id )
log.info(f"Calling produce_outputs, tool is {tool}")
return job, out_data, history
def _produce_outputs(
self, trans: "ProvidesUserContext", tool, out_data, output_collections, incoming, history, tags, hdca_tags, skip
):
tag_handler = trans.tag_handler
tool.produce_outputs(
trans,
out_data,
output_collections,
incoming,
history=history,
tags=tags,
hdca_tags=hdca_tags,
tag_handler=tag_handler,
)
if mapped_over_elements := output_collections.dataset_collection_elements:
for name, value in out_data.items():
if name in mapped_over_elements:
value.visible = False
mapped_over_elements[name].hda = value
# We probably need to mark all outputs as skipped, not just the outputs of whatever the database op tools do ?
# This is probably not exactly right, but it might also work in most cases
if skip:
for output_collection in output_collections.out_collections.values():
output_collection.mark_as_populated()
object_store_populator = ObjectStorePopulator(trans.app, trans.user)
for hdca in output_collections.out_collection_instances.values():
hdca.visible = False
# Would we also need to replace the datasets with skipped datasets?
for data in hdca.dataset_instances:
data.set_skipped(object_store_populator)
trans.sa_session.add_all(out_data.values())