Warning
This document is for an in-development version of Galaxy. You can alternatively view this page in the latest release if it exists or view the top of the latest release's documentation.
Source code for galaxy.tools.actions.data_manager
import logging
from typing import Optional
from galaxy.model import (
History,
Job,
)
from galaxy.model.base import transaction
from galaxy.model.dataset_collections.matching import MatchingCollections
from galaxy.tools._types import ToolStateJobInstancePopulatedT
from galaxy.tools.execute import (
DatasetCollectionElementsSliceT,
DEFAULT_DATASET_COLLECTION_ELEMENTS,
DEFAULT_JOB_CALLBACK,
DEFAULT_PREFERRED_OBJECT_STORE_ID,
DEFAULT_RERUN_REMAP_JOB_ID,
DEFAULT_SET_OUTPUT_HID,
JobCallbackT,
)
from galaxy.tools.execution_helpers import ToolExecutionCache
from . import (
DefaultToolAction,
ToolActionExecuteResult,
)
log = logging.getLogger(__name__)
[docs]class DataManagerToolAction(DefaultToolAction):
"""Tool action used for Data Manager Tools"""
[docs] def execute(
self,
tool,
trans,
incoming: Optional[ToolStateJobInstancePopulatedT] = None,
history: Optional[History] = None,
job_params=None,
rerun_remap_job_id: Optional[int] = DEFAULT_RERUN_REMAP_JOB_ID,
execution_cache: Optional[ToolExecutionCache] = None,
dataset_collection_elements: Optional[DatasetCollectionElementsSliceT] = DEFAULT_DATASET_COLLECTION_ELEMENTS,
completed_job: Optional[Job] = None,
collection_info: Optional[MatchingCollections] = None,
job_callback: Optional[JobCallbackT] = DEFAULT_JOB_CALLBACK,
preferred_object_store_id: Optional[str] = DEFAULT_PREFERRED_OBJECT_STORE_ID,
set_output_hid: bool = DEFAULT_SET_OUTPUT_HID,
flush_job: bool = True,
skip: bool = False,
) -> ToolActionExecuteResult:
rval = super().execute(
tool,
trans,
incoming=incoming,
history=history,
job_params=job_params,
rerun_remap_job_id=rerun_remap_job_id,
execution_cache=execution_cache,
dataset_collection_elements=dataset_collection_elements,
completed_job=completed_job,
collection_info=collection_info,
job_callback=job_callback,
preferred_object_store_id=preferred_object_store_id,
set_output_hid=set_output_hid,
flush_job=flush_job,
skip=skip,
)
if isinstance(rval, tuple) and len(rval) >= 2 and isinstance(rval[0], trans.app.model.Job):
assoc = trans.app.model.DataManagerJobAssociation(job=rval[0], data_manager_id=tool.data_manager_id)
trans.sa_session.add(assoc)
with transaction(trans.sa_session):
trans.sa_session.commit()
else:
log.error(f"Got bad return value from DefaultToolAction.execute(): {rval}")
return rval