Source code for galaxy.tools.actions.metadata
import logging
import os
from json import dumps

from galaxy.job_execution.datasets import DatasetPath
from galaxy.metadata import get_metadata_compute_strategy
from galaxy.model.base import transaction
from galaxy.util import asbool

from . import ToolAction

log = logging.getLogger(__name__)


class SetMetadataToolAction(ToolAction):
"""Tool action used for setting external metadata on an existing dataset"""
produces_real_jobs = False

    def execute(
self, tool, trans, incoming=None, set_output_hid=False, overwrite=True, history=None, job_params=None, **kwargs
):
"""
Execute using a web transaction.
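
        A minimal sketch of a typical invocation (assuming the standard
        wiring, where this action is attached to the ``__SET_METADATA__``
        tool, with ``hda`` an existing HistoryDatasetAssociation)::

            tool = app.toolbox.get_tool("__SET_METADATA__")
            job, _ = tool.tool_action.execute(tool, trans, incoming={"input1": hda})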
"""
trans.check_user_activation()
session = trans.get_galaxy_session()
session_id = session and session.id
history_id = trans.history and trans.history.id
incoming = incoming or {}
job, odict = self.execute_via_app(
tool,
trans.app,
session_id,
history_id,
trans.user,
incoming,
set_output_hid,
overwrite,
history,
job_params,
)
# FIXME: can remove this when logging in execute_via_app method.
        trans.log_event(f"Added set external metadata job to the job queue, id: {job.id}", tool_id=job.tool_id)
return job, odict

    def execute_via_app(
self,
tool,
app,
session_id,
history_id,
user=None,
incoming=None,
set_output_hid=False,
overwrite=True,
history=None,
job_params=None,
):
"""
Execute using application.
"""
incoming = incoming or {}
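        # The dataset to operate on is expected under a key beginning with
        # "input", e.g. incoming == {"input1": <HistoryDatasetAssociation>};
        # the first such value found determines the dataset and its type.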
for name, value in incoming.items():
# Why are we looping here and not just using a fixed input name? Needed?
if not name.startswith("input"):
continue
            if isinstance(value, app.model.HistoryDatasetAssociation):
                dataset = value
                dataset_name = name
                dataset_type = "hda"
                break
            elif isinstance(value, app.model.LibraryDatasetDatasetAssociation):
                dataset = value
                dataset_name = name
                dataset_type = "ldda"
                break
else:
raise Exception("The dataset to set metadata on could not be determined.")
sa_session = app.model.context
# Create the job object
job = app.model.Job()
job.galaxy_version = app.config.version_major
job.session_id = session_id
job.history_id = history_id
job.tool_id = tool.id
if user:
job.user_id = user.id
if job_params:
job.params = dumps(job_params)
start_job_state = job.state # should be job.states.NEW
try:
# For backward compatibility, some tools may not have versions yet.
job.tool_version = tool.version
except AttributeError:
job.tool_version = "1.0.1"
job.dynamic_tool = tool.dynamic_tool
        # Set the job state to something other than NEW; otherwise, when jobs are
        # tracked in the database, this job could be picked up before its input
        # and output parameters have been added.
        job.state = job.states.WAITING
sa_session.add(job)
with transaction(sa_session): # ensure job.id is available
sa_session.commit()
# add parameters to job_parameter table
# Store original dataset state, so we can restore it. A separate table might be better (no chance of 'losing' the original state)?
incoming["__ORIGINAL_DATASET_STATE__"] = dataset.state
input_paths = [DatasetPath(dataset.id, real_path=dataset.get_file_name(), mutable=False)]
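        # The dataset's existing file is wrapped as an immutable DatasetPath;
        # it is passed as output_fnames below because the metadata command
        # updates the dataset in place rather than producing a new file.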
app.object_store.create(job, base_dir="job_work", dir_only=True, extra_dir=str(job.id))
job_working_dir = app.object_store.get_filename(job, base_dir="job_work", dir_only=True, extra_dir=str(job.id))
datatypes_config = os.path.join(job_working_dir, "registry.xml")
app.datatypes_registry.to_xml_file(path=datatypes_config)
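        # Serialize the datatypes registry into the job working directory so
        # the external metadata process can load datatype definitions without
        # a running Galaxy application.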
external_metadata_wrapper = get_metadata_compute_strategy(app.config, job.id, tool_id=tool.id)
        output_datasets_dict = {
            dataset_name: dataset,
        }
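        # An optional "validate" flag in incoming (parsed leniently via asbool)
        # asks the metadata command to also validate the outputs.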
validate_outputs = asbool(incoming.get("validate", False))
cmd_line = external_metadata_wrapper.setup_external_metadata(
            output_datasets_dict,
{},
sa_session,
exec_dir=None,
tmp_dir=job_working_dir,
dataset_files_path=app.model.Dataset.file_path,
output_fnames=input_paths,
config_root=app.config.root,
config_file=app.config.config_file,
datatypes_config=datatypes_config,
job_metadata=os.path.join(job_working_dir, "working", tool.provided_metadata_file),
include_command=False,
max_metadata_value_size=app.config.max_metadata_value_size,
max_discovered_files=app.config.max_discovered_files,
validate_outputs=validate_outputs,
job=job,
kwds={"overwrite": overwrite},
)
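        # The returned command line is stored as a job parameter below; the job
        # runner later executes it to perform the actual metadata collection
        # outside the Galaxy process.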
incoming["__SET_EXTERNAL_METADATA_COMMAND_LINE__"] = cmd_line
for name, value in tool.params_to_strings(incoming, app).items():
job.add_parameter(name, value)
# add the dataset to job_to_input_dataset table
        if dataset_type == "hda":
            job.add_input_dataset(dataset_name, dataset)
        elif dataset_type == "ldda":
            job.add_input_library_dataset(dataset_name, dataset)
        # A special state is needed here to show that metadata is being set while
        # still allowing the job to run: if the dataset state were 'running', the
        # set-metadata job would never start, since it would wait for its input
        # (the dataset whose metadata is being set) to reach a ready state.
        dataset.state = dataset.states.SETTING_METADATA
job.state = start_job_state # job inputs have been configured, restore initial job state
with transaction(sa_session):
sa_session.commit()
        # Let the datatype clean up before metadata is regenerated, e.g. clear
        # stale converted files.
        dataset.datatype.before_setting_metadata(dataset)
return job, {}