Warning
This document describes an old release of Galaxy. You can instead view this page in the latest release, if it exists there, or go to the top of the latest release's documentation.
Source code for galaxy.metadata
"""Define abstraction for capturing the metadata of job's output datasets."""
import abc
import json
import os
import shutil
from logging import getLogger
import galaxy.model
from galaxy.model import store
from galaxy.model.metadata import (
FileParameter,
MetadataTempFile,
)
from galaxy.model.store import DirectoryModelExportStore
from galaxy.util import safe_makedirs
# Module-level logger; used below to report failed external metadata runs at debug level.
log = getLogger(__name__)
# Contents of the ``set.py`` script that PortableDirectoryMetadataGenerator.setup_external_metadata
# writes into the metadata directory when it is asked for a command and ``use_bin`` is false.
SET_METADATA_SCRIPT = "from galaxy_ext.metadata.set_metadata import set_metadata; set_metadata()"
def get_metadata_compute_strategy(config, job_id, metadata_strategy_override=None, tool_id=None):
    """Build the metadata collection strategy object for a job.

    The strategy name is ``metadata_strategy_override`` when that is truthy,
    otherwise ``config.metadata_strategy``.

    :param config: object exposing a ``metadata_strategy`` attribute.
    :param job_id: passed unchanged to the strategy constructor.
    :param metadata_strategy_override: optional strategy name taking precedence over the config.
    :param tool_id: id of the tool being run; the ``__SET_METADATA__`` tool never
        gets the extended strategy.
    :returns: an ``ExtendedDirectoryMetadataGenerator`` when the strategy name contains
        ``"extended"`` and the tool is not ``__SET_METADATA__``; otherwise a
        ``PortableDirectoryMetadataGenerator``.
    :raises Exception: if the selected strategy is ``"legacy"``.
    """
    strategy_name = metadata_strategy_override or config.metadata_strategy
    if strategy_name == "legacy":
        raise Exception("legacy metadata_strategy has been removed")

    wants_extended = "extended" in strategy_name and tool_id != "__SET_METADATA__"
    generator_class = ExtendedDirectoryMetadataGenerator if wants_extended else PortableDirectoryMetadataGenerator
    return generator_class(job_id)
class MetadataCollectionStrategy(metaclass=abc.ABCMeta):
    """Interface describing the abstract process of writing out and collecting output metadata."""

    extended = False
    # Read by ``_load_metadata_from_path`` to pick the directory remote paths are
    # rewritten into; subclasses that stage files under ``<working_directory>/metadata``
    # override this with True.
    portable = False

    def invalidate_external_metadata(self, datasets, sa_session):
        """Invalidate written files.

        The base implementation does nothing.
        """

    @abc.abstractmethod
    def setup_external_metadata(
        self,
        datasets_dict,
        out_collections,
        sa_session,
        exec_dir=None,
        tmp_dir=None,
        dataset_files_path=None,
        output_fnames=None,
        config_root=None,
        use_bin=False,
        config_file=None,
        datatypes_config=None,
        job_metadata=None,
        provided_metadata_style=None,
        compute_tmp_dir=None,
        include_command=True,
        max_metadata_value_size=0,
        max_discovered_files=None,
        object_store_conf=None,
        tool=None,
        job=None,
        kwds=None,
    ):
        """Setup files needed for external metadata collection.

        If include_command is True, return full Python command to externally compute metadata
        otherwise just the arguments to galaxy_ext.metadata.set_metadata required to build.
        """

    @abc.abstractmethod
    def external_metadata_set_successfully(self, dataset, name, sa_session, working_directory):
        """Return boolean indicating if metadata for specified dataset was written properly."""

    @abc.abstractmethod
    def load_metadata(self, dataset, name, sa_session, working_directory, remote_metadata_directory=None):
        """Load metadata calculated externally into specified dataset."""

    def _load_metadata_from_path(self, dataset, metadata_output_path, working_directory, remote_metadata_directory):
        """Load ``dataset.metadata`` from ``metadata_output_path``, rewriting remote paths.

        Paths handed to the rewriter that lie inside ``remote_metadata_directory`` are
        re-rooted under ``<working_directory>/metadata`` when ``self.portable`` is true,
        or under ``working_directory`` otherwise. Empty paths and paths outside that
        directory are returned untouched.
        """
        # Normalise once instead of on every rewriter call.
        normalized_remote = os.path.normpath(remote_metadata_directory) if remote_metadata_directory else None
        # Trailing separator so that ``/remote/metadata`` does not match ``/remote/metadata2``.
        remote_prefix = os.path.join(normalized_remote, "") if normalized_remote is not None else None

        def path_rewriter(path):
            if not path or normalized_remote is None:
                return path
            normalized_path = os.path.normpath(path)
            inside_remote = normalized_path == normalized_remote or normalized_path.startswith(remote_prefix)
            if not inside_remote:
                return path
            if self.portable:
                target_directory = os.path.join(working_directory, "metadata")
            else:
                target_directory = working_directory
            # The match is anchored at the start, so swap only the leading directory.
            return target_directory + normalized_path[len(normalized_remote):]

        dataset.metadata.from_JSON_dict(metadata_output_path, path_rewriter=path_rewriter)

    def _metadata_results_from_file(self, dataset, filename_results_code):
        """Return the success flag stored in a metadata results file.

        The file is expected to hold a JSON pair ``(flag, message)``. An unreadable or
        malformed file counts as failure. Failures are logged at debug level together
        with the dataset's class name and id.
        """
        try:
            with open(filename_results_code) as f:
                rval, rstring = json.load(f)
        except (OSError, ValueError):
            # ValueError covers invalid JSON (JSONDecodeError) and a payload that is not a pair.
            rval = False
            rstring = f"Metadata results could not be read from '{filename_results_code}'"
        if not rval:
            log.debug(
                "setting metadata externally failed for %s %s: %s",
                dataset.__class__.__name__,
                dataset.id,
                rstring,
            )
        return rval
class PortableDirectoryMetadataGenerator(MetadataCollectionStrategy):
    """Strategy that stages all metadata inputs and outputs under ``<tmp_dir>/metadata``."""

    portable = True
    # When true, the object store configuration and the tool/job description are
    # written next to the parameters (see ``setup_external_metadata``).
    write_object_store_conf = False

    def __init__(self, job_id):
        """Remember the id of the job this strategy works for.

        ``setup_external_metadata`` uses it to look the job up when no ``job`` is passed.
        """
        self.job_id = job_id

    def setup_external_metadata(
        self,
        datasets_dict,
        out_collections,
        sa_session,
        exec_dir=None,
        tmp_dir=None,
        dataset_files_path=None,
        output_fnames=None,
        config_root=None,
        use_bin=False,
        config_file=None,
        datatypes_config=None,
        job_metadata=None,
        provided_metadata_style=None,
        compute_tmp_dir=None,
        include_command=True,
        max_metadata_value_size=0,
        max_discovered_files=None,
        validate_outputs=False,
        object_store_conf=None,
        tool=None,
        job=None,
        link_data_only=False,
        kwds=None,
    ):
        """Write the files an external metadata run needs and return how to start it.

        Everything is written below ``<tmp_dir>/metadata``: per-output placeholder
        files, ``params.json``, a model export in ``outputs_new`` and, when
        ``write_object_store_conf`` is set, ``object_store_conf.json``.

        ``exec_dir``, ``dataset_files_path``, ``config_root``, ``config_file`` and
        ``compute_tmp_dir`` are accepted for interface compatibility and are not
        used by this implementation.

        :param datasets_dict: mapping of output name to dataset association.
        :param out_collections: mapping of output name to dataset collection.
        :param sa_session: session used to load the job when ``job`` is not given.
        :param tmp_dir: job directory; must not be None.
        :param job_metadata: path of the job metadata file; required.
        :param kwds: extra keyword arguments serialised for each output.
        :returns: ``"galaxy-set-metadata"`` or ``'python "metadata/set.py"'`` when
            ``include_command`` is true (depending on ``use_bin``), else ``""``.
        :raises AssertionError: if ``job_metadata`` is falsy, ``tmp_dir`` is None or
            an output name is None.
        """
        assert job_metadata, "setup_external_metadata must be supplied with job_metadata path"
        kwds = kwds or {}
        if not job:
            job = sa_session.query(galaxy.model.Job).get(self.job_id)
        tmp_dir = _init_tmp_dir(tmp_dir)

        metadata_dir = os.path.join(tmp_dir, "metadata")
        # may already exist (i.e. metadata collection in the job handler)
        safe_makedirs(metadata_dir)

        def job_relative_path(path):
            return os.path.relpath(path, tmp_dir)

        outputs = {}
        output_collections = {}

        for name, dataset in datasets_dict.items():
            assert name is not None
            assert name not in outputs

            # Bind the current output name now rather than relying on the loop variable.
            def _metadata_path(what, key=name):
                return os.path.join(metadata_dir, f"metadata_{what}_{key}")

            _initialize_metadata_inputs(
                dataset, _metadata_path, tmp_dir, kwds, real_metadata_object=self.write_object_store_conf
            )

            outputs[name] = {
                "filename_override": _get_filename_override(output_fnames, dataset.file_name),
                "validate": validate_outputs,
                "object_store_store_by": dataset.dataset.store_by,
                "id": dataset.id,
                "model_class": "LibraryDatasetDatasetAssociation"
                if isinstance(dataset, galaxy.model.LibraryDatasetDatasetAssociation)
                else "HistoryDatasetAssociation",
            }

        metadata_params_path = os.path.join(metadata_dir, "params.json")
        datatypes_config = job_relative_path(datatypes_config) if datatypes_config else None
        metadata_params = {
            "job_metadata": job_relative_path(job_metadata),
            "provided_metadata_style": provided_metadata_style,
            "datatypes_config": datatypes_config,
            "max_metadata_value_size": max_metadata_value_size,
            "max_discovered_files": max_discovered_files,
            "outputs": outputs,
        }

        # export model objects and object store configuration for extended metadata also.
        export_directory = os.path.join(metadata_dir, "outputs_new")
        with DirectoryModelExportStore(
            export_directory,
            for_edit=True,
            strip_metadata_files=False,
            serialize_dataset_objects=True,
            serialize_jobs=False,
        ) as export_store:
            export_store.export_job(job, tool=tool)
            for dataset in datasets_dict.values():
                export_store.add_dataset(dataset)

            for name, dataset_collection in out_collections.items():
                export_store.export_collection(dataset_collection)
                output_collections[name] = {
                    "id": dataset_collection.id,
                    "model_class": dataset_collection.__class__.__name__,
                }

        if self.write_object_store_conf:
            with open(os.path.join(metadata_dir, "object_store_conf.json"), "w") as f:
                json.dump(object_store_conf, f)

            # setup tool
            tool_as_dict = {
                "stdio_exit_codes": [e.to_dict() for e in tool.stdio_exit_codes],
                "stdio_regexes": [r.to_dict() for r in tool.stdio_regexes],
                "outputs": {name: output.to_dict() for name, output in tool.outputs.items()},
                "output_collections": {
                    name: output.to_dict() for name, output in tool.output_collections.items()
                },
            }

            # setup the rest
            metadata_params["tool"] = tool_as_dict
            metadata_params["link_data_only"] = link_data_only
            metadata_params["tool_path"] = tool.config_file
            metadata_params["job_id_tag"] = job.get_id_tag()
            metadata_params["implicit_collection_jobs_association_id"] = (
                job.implicit_collection_jobs_association and job.implicit_collection_jobs_association.id
            )
            metadata_params["job_params"] = job.raw_param_dict()
            metadata_params["output_collections"] = output_collections

        with open(metadata_params_path, "w") as f:
            json.dump(metadata_params, f)

        if not include_command:
            # return args to galaxy_ext.metadata.set_metadata required to build
            return ""

        # return command required to build
        if use_bin:
            return "galaxy-set-metadata"

        script_path = os.path.join(metadata_dir, "set.py")
        with open(script_path, "w") as f:
            f.write(SET_METADATA_SCRIPT)
        return 'python "metadata/set.py"'

    def load_metadata(self, dataset, name, sa_session, working_directory, remote_metadata_directory=None):
        """Load ``<working_directory>/metadata/metadata_out_<name>`` into ``dataset``."""
        metadata_output_path = os.path.join(working_directory, "metadata", f"metadata_out_{name}")
        self._load_metadata_from_path(dataset, metadata_output_path, working_directory, remote_metadata_directory)

    def external_metadata_set_successfully(self, dataset, name, sa_session, working_directory):
        """Return the flag stored in ``metadata_results_<name>``, or False on any error."""
        metadata_results_path = os.path.join(working_directory, "metadata", f"metadata_results_{name}")
        try:
            return self._metadata_results_from_file(dataset, metadata_results_path)
        except Exception:
            # if configured we need to try setting metadata internally
            log.debug("Could not interpret metadata results in '%s'", metadata_results_path, exc_info=True)
            return False
class ExtendedDirectoryMetadataGenerator(PortableDirectoryMetadataGenerator):
    """Portable directory strategy with ``extended`` and ``write_object_store_conf`` switched on."""

    write_object_store_conf = True
    extended = True

    def setup_external_metadata(self, datasets_dict, out_collections, sa_session, **kwd):
        """Delegate to the parent implementation and return its command unchanged."""
        return super().setup_external_metadata(datasets_dict, out_collections, sa_session, **kwd)

    def load_metadata(self, dataset, name, sa_session, working_directory, remote_metadata_directory=None):
        """Copy metadata for ``dataset`` out of the ``outputs_populated`` model store.

        Returns the same ``dataset`` object with its ``metadata`` replaced.
        """
        # This method shouldn't really be called one-at-a-time dataset-wise like this and
        # isn't in job_wrapper.finish, instead finish just executes perform_import() on
        # the target model store within the context of a session to bring in all the changed objects.
        # However, this method is part of the metadata interface and is used by unit tests,
        # so we allow a sessionless import and loading of individual dataset as below.
        populated_directory = os.path.join(working_directory, "metadata", "outputs_populated")
        import_model_store = store.imported_store_for_metadata(populated_directory)
        hda_query = import_model_store.sa_session.query(galaxy.model.HistoryDatasetAssociation)
        dataset.metadata = hda_query.find(dataset.id).metadata
        return dataset
def _initialize_metadata_inputs(dataset, path_for_part, tmp_dir, kwds, real_metadata_object=True):
    """Create the per-output files an external metadata run reads and writes.

    Four files are created, at the paths ``path_for_part`` returns for the parts
    ``"out"``, ``"results"``, ``"kwds"`` and ``"override"``:

    * ``out`` is an empty placeholder;
    * ``results`` is pre-filled with ``(False, "External set_meta() not called")``;
    * ``kwds`` holds ``kwds`` as JSON;
    * ``override`` holds a JSON list of ``(key, MetadataTempFile JSON)`` pairs, one per
      ``FileParameter`` metadata element that currently has a value. The list is only
      populated when ``real_metadata_object`` is false; each such file is copied into
      a ``MetadataTempFile`` whose ``tmp_dir`` is set to ``tmp_dir``.

    Every file is closed before returning, so its content is flushed to disk.

    :returns: tuple of the four paths in the order listed above.
    """
    filename_out = path_for_part("out")
    filename_results_code = path_for_part("results")
    filename_kwds = path_for_part("kwds")
    filename_override_metadata = path_for_part("override")

    # create the file on disk, so it cannot be reused by tempfile (unlikely, but possible)
    with open(filename_out, "wt+"):
        pass
    with open(filename_results_code, "wt+") as results_fh:
        json.dump((False, "External set_meta() not called"), results_fh)
    with open(filename_kwds, "wt+") as kwds_fh:
        json.dump(kwds, kwds_fh, ensure_ascii=True)

    override_metadata = []
    for meta_key, spec_value in dataset.metadata.spec.items():
        if not isinstance(spec_value.param, FileParameter):
            continue
        current_value = dataset.metadata.get(meta_key, None)
        if current_value is None or real_metadata_object:
            continue
        metadata_temp = MetadataTempFile()
        metadata_temp.tmp_dir = tmp_dir
        shutil.copy(current_value.file_name, metadata_temp.file_name)
        override_metadata.append((meta_key, metadata_temp.to_JSON()))

    with open(filename_override_metadata, "wt+") as override_fh:
        json.dump(override_metadata, override_fh)
    return filename_out, filename_results_code, filename_kwds, filename_override_metadata
def _get_filename_override(output_fnames, file_name):
    """Return the path to use in place of ``file_name``.

    Scans ``output_fnames`` for the first entry whose ``real_path`` equals
    ``file_name`` and returns its ``false_path`` if truthy, else its ``real_path``.
    Returns ``""`` when ``output_fnames`` is empty/None or nothing matches.
    """
    for candidate in output_fnames or ():
        if candidate.real_path != file_name:
            continue
        return candidate.false_path or candidate.real_path
    return ""
def _init_tmp_dir(tmp_dir):
    """Make sure ``tmp_dir`` exists (via ``safe_makedirs``) and return it.

    :raises AssertionError: if ``tmp_dir`` is None.
    """
    assert tmp_dir is not None
    directory = tmp_dir
    safe_makedirs(directory)
    return directory