Source code for galaxy.tools.actions.metadata

import logging
import os
from json import dumps
from typing import (
    Any,
    Dict,
    Optional,
)

from galaxy.job_execution.datasets import DatasetPath
from galaxy.metadata import get_metadata_compute_strategy
from galaxy.model import (
    History,
    Job,
    User,
)
from galaxy.model.base import transaction
from galaxy.model.dataset_collections.matching import MatchingCollections
from galaxy.tools._types import ToolStateJobInstancePopulatedT
from galaxy.tools.execute import (
    DatasetCollectionElementsSliceT,
    DEFAULT_DATASET_COLLECTION_ELEMENTS,
    DEFAULT_JOB_CALLBACK,
    DEFAULT_PREFERRED_OBJECT_STORE_ID,
    DEFAULT_RERUN_REMAP_JOB_ID,
    DEFAULT_SET_OUTPUT_HID,
    JobCallbackT,
)
from galaxy.tools.execution_helpers import ToolExecutionCache
from galaxy.util import asbool
from . import ToolAction

log = logging.getLogger(__name__)


class SetMetadataToolAction(ToolAction):
    """Tool action used for setting external metadata on an existing dataset"""

    produces_real_jobs: bool = False
    set_output_hid: bool = False
    def execute(
        self,
        tool,
        trans,
        incoming: Optional[ToolStateJobInstancePopulatedT] = None,
        history: Optional[History] = None,
        job_params=None,
        rerun_remap_job_id: Optional[int] = DEFAULT_RERUN_REMAP_JOB_ID,
        execution_cache: Optional[ToolExecutionCache] = None,
        dataset_collection_elements: Optional[DatasetCollectionElementsSliceT] = DEFAULT_DATASET_COLLECTION_ELEMENTS,
        completed_job: Optional[Job] = None,
        collection_info: Optional[MatchingCollections] = None,
        job_callback: Optional[JobCallbackT] = DEFAULT_JOB_CALLBACK,
        preferred_object_store_id: Optional[str] = DEFAULT_PREFERRED_OBJECT_STORE_ID,
        set_output_hid: bool = DEFAULT_SET_OUTPUT_HID,
        flush_job: bool = True,
        skip: bool = False,
    ):
        """
        Execute using a web transaction.
        """
        overwrite = True
        job, odict = self.execute_via_trans(
            tool,
            trans,
            incoming,
            overwrite,
            history,
            job_params,
        )
        # FIXME: can remove this when logging in execute_via_app method.
        trans.log_event(f"Added set external metadata job to the job queue, id: {str(job.id)}", tool_id=job.tool_id)
        return job, odict
    def execute_via_trans(
        self,
        tool,
        trans,
        incoming: Optional[Dict[str, Any]],
        overwrite: bool = True,
        history: Optional[History] = None,
        job_params: Optional[Dict[str, Any]] = None,
    ):
        trans.check_user_activation()
        session = trans.get_galaxy_session()
        session_id = session and session.id
        history_id = trans.history and trans.history.id
        incoming = incoming or {}
        return self.execute_via_app(
            tool,
            trans.app,
            session_id,
            history_id,
            trans.user,
            incoming,
            overwrite,
            history,
            job_params,
        )
    def execute_via_app(
        self,
        tool,
        app,
        session_id: Optional[int],
        history_id: Optional[int],
        user: Optional[User] = None,
        incoming: Optional[Dict[str, Any]] = None,
        overwrite: bool = True,
        history: Optional[History] = None,
        job_params: Optional[Dict[str, Any]] = None,
    ):
        """
        Execute using application.
        """
        incoming = incoming or {}
        for name, value in incoming.items():
            # Why are we looping here and not just using a fixed input name? Needed?
            if not name.startswith("input"):
                continue
            if isinstance(value, app.model.HistoryDatasetAssociation):
                dataset = value
                dataset_name = name
                type = "hda"
                break
            elif isinstance(value, app.model.LibraryDatasetDatasetAssociation):
                dataset = value
                dataset_name = name
                type = "ldda"
                break
        else:
            raise Exception("The dataset to set metadata on could not be determined.")

        sa_session = app.model.context

        # Create the job object
        job = app.model.Job()
        job.galaxy_version = app.config.version_major
        job.session_id = session_id
        job.history_id = history_id
        job.tool_id = tool.id
        if user:
            job.user_id = user.id
        if job_params:
            job.params = dumps(job_params)
        start_job_state = job.state  # should be job.states.NEW
        try:
            # For backward compatibility, some tools may not have versions yet.
            job.tool_version = tool.version
        except AttributeError:
            job.tool_version = "1.0.1"
        job.dynamic_tool = tool.dynamic_tool
        job.state = (
            job.states.WAITING
        )  # we need to set job state to something other than NEW, or else when tracking jobs in db it will be picked up before we have added input / output parameters
        sa_session.add(job)
        with transaction(sa_session):  # ensure job.id is available
            sa_session.commit()

        # add parameters to job_parameter table
        # Store original dataset state, so we can restore it. A separate table might be better (no chance of 'losing' the original state)?
        incoming["__ORIGINAL_DATASET_STATE__"] = dataset.state
        input_paths = [DatasetPath(dataset.id, real_path=dataset.get_file_name(), mutable=False)]
        app.object_store.create(job, base_dir="job_work", dir_only=True, extra_dir=str(job.id))
        job_working_dir = app.object_store.get_filename(job, base_dir="job_work", dir_only=True, extra_dir=str(job.id))
        datatypes_config = os.path.join(job_working_dir, "registry.xml")
        app.datatypes_registry.to_xml_file(path=datatypes_config)
        external_metadata_wrapper = get_metadata_compute_strategy(app.config, job.id, tool_id=tool.id)
        output_datatasets_dict = {
            dataset_name: dataset,
        }
        validate_outputs = asbool(incoming.get("validate", False))
        cmd_line = external_metadata_wrapper.setup_external_metadata(
            output_datatasets_dict,
            {},
            sa_session,
            exec_dir=None,
            tmp_dir=job_working_dir,
            dataset_files_path=app.model.Dataset.file_path,
            output_fnames=input_paths,
            config_root=app.config.root,
            config_file=app.config.config_file,
            datatypes_config=datatypes_config,
            job_metadata=os.path.join(job_working_dir, "working", tool.provided_metadata_file),
            include_command=False,
            max_metadata_value_size=app.config.max_metadata_value_size,
            max_discovered_files=app.config.max_discovered_files,
            validate_outputs=validate_outputs,
            job=job,
            kwds={"overwrite": overwrite},
        )
        incoming["__SET_EXTERNAL_METADATA_COMMAND_LINE__"] = cmd_line
        for name, value in tool.params_to_strings(incoming, app).items():
            job.add_parameter(name, value)
        # add the dataset to job_to_input_dataset table
        if type == "hda":
            job.add_input_dataset(dataset_name, dataset)
        elif type == "ldda":
            job.add_input_library_dataset(dataset_name, dataset)
        # Need a special state here to show that metadata is being set and also allow the job to run,
        # i.e. if state was set to 'running' the set metadata job would never run, as it would wait for
        # input (the dataset to set metadata on) to be in a ready state
        dataset.state = dataset.states.SETTING_METADATA
        job.state = start_job_state  # job inputs have been configured, restore initial job state
        with transaction(sa_session):
            sa_session.commit()

        # clear e.g. converted files
        dataset.datatype.before_setting_metadata(dataset)

        return job, {}
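For context, a minimal sketch of how this action might be driven outside a web transaction. It is not part of the module above; `app`, `tool`, and `hda` are placeholders for a configured Galaxy application object, the set-metadata tool, and an existing HistoryDatasetAssociation, and the input names are only an assumption.

# Hypothetical usage sketch; `app`, `tool`, and `hda` are assumed to already exist.
action = SetMetadataToolAction()
job, _ = action.execute_via_app(
    tool,
    app,
    session_id=None,
    history_id=hda.history.id,
    user=hda.history.user,
    incoming={"input1": hda, "validate": "false"},  # keys starting with "input" select the dataset
    overwrite=True,
)
# The returned Job carries the external metadata command line as a parameter,
# and the dataset is left in the SETTING_METADATA state until that command completes.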