Source code for galaxy.tool_util.provided_metadata
import json
import logging
import os
import re
from galaxy.util import stringify_dictionary_keys
log = logging.getLogger(__name__)
[docs]def parse_tool_provided_metadata(meta_file, provided_metadata_style=None, job_wrapper=None):
"""Return a ToolProvidedMetadata object for specified file path.
If meta_file is absent, return a NullToolProvidedMetadata. If provided_metadata_style is None
attempt to guess tool provided metadata type.
"""
if not os.path.exists(meta_file):
return NullToolProvidedMetadata()
if provided_metadata_style is None:
provided_metadata_style = _guess_tool_provided_metadata_style(meta_file)
assert provided_metadata_style in ["legacy", "default"]
if provided_metadata_style == "legacy":
return LegacyToolProvidedMetadata(meta_file, job_wrapper=job_wrapper)
elif provided_metadata_style == "default":
return ToolProvidedMetadata(meta_file)
def _guess_tool_provided_metadata_style(path):
try:
with open(path) as f:
metadata = json.load(f)
metadata_type = metadata.get("type", None)
return "legacy" if metadata_type in ["dataset", "new_primary_dataset"] else "default"
except ValueError:
# Either empty or multiple JSON lines, either way we can safely treat
# it as legacy style.
return "legacy"
[docs]class BaseToolProvidedMetadata:
[docs] def get_new_datasets(self, output_name):
"""Find new datasets for dataset discovery for specified output.
Return a list of such datasets.
Called only in the context of discovering datasets when
discover_via="tool_provided_metadata" is defined in the tool.
"""
return []
[docs] def has_failed_outputs(self):
"""Determine if generation of any of the outputs failed.
If True, this method should also log information about at least the first such failed output.
"""
return False
[docs] def get_new_dataset_meta_by_basename(self, output_name, basename):
"""For a discovered dataset, get the corresponding metadata entry.
The discovery may have been from explicit listing in this file (returned
from get_new_datasets) or via file regex, either way the basename of the
file is used to index the fetching of the metadata entry.
"""
return {}
[docs] def get_unnamed_outputs(self):
"""Return unnamed outputs dataset introduced for upload 2.0.
Needs more formal specification but see output_collect for how destinations,
types, elements, etc... are consumed.
"""
return []
[docs] def get_dataset_meta(self, output_name, dataset_id, dataset_uuid):
"""Return primary dataset metadata for specified output."""
return {}
[docs] def rewrite(self):
"""Write metadata back to the file system.
If metadata has not changed via outputs specified as mutable, the
implementation class may opt to not re-write the file.
"""
return None
[docs] def get_new_datasets_for_metadata_collection(self):
"""Return all datasets tracked that are not explicit primary outputs."""
return []
[docs]class LegacyToolProvidedMetadata(BaseToolProvidedMetadata):
[docs] def __init__(self, meta_file, job_wrapper=None):
self.meta_file = meta_file
self.tool_provided_job_metadata = []
with open(meta_file) as f:
for line in f:
try:
line_as_dict = stringify_dictionary_keys(json.loads(line))
assert "type" in line_as_dict
except Exception as e:
message = f'Got JSON data from tool, but line is improperly formatted or no "type" key in: [{line}]'
raise ValueError(message) from e
# Set the dataset id if it's a dataset entry and isn't set.
# This isn't insecure. We loop the job's output datasets in
# the finish method, so if a tool writes out metadata for a
# dataset id that it doesn't own, it'll just be ignored.
dataset_id_not_specified = line_as_dict["type"] == "dataset" and "dataset_id" not in line_as_dict
if dataset_id_not_specified:
dataset_basename = line_as_dict["dataset"]
if job_wrapper:
try:
line_as_dict["dataset_id"] = job_wrapper.job_io.get_output_file_id(dataset_basename)
except KeyError:
log.warning(
f"({job_wrapper.job_id}) Tool provided job dataset-specific metadata without specifying a dataset"
)
continue
else:
match = re.match(r"(galaxy_)?dataset_(.*)\.dat", dataset_basename)
if match is None:
raise Exception(
f"processing tool_provided_metadata (e.g. galaxy.json) entry with invalid dataset name [{dataset_basename}]"
)
dataset_id = match.group(2)
if dataset_id.isdigit():
line_as_dict["dataset_id"] = dataset_id
else:
line_as_dict["dataset_uuid"] = dataset_id
self.tool_provided_job_metadata.append(line_as_dict)
[docs] def get_dataset_meta(self, output_name, dataset_id, dataset_uuid):
for meta in self.tool_provided_job_metadata:
if meta["type"] == "dataset" and "dataset_id" in meta and int(meta["dataset_id"]) == dataset_id:
return meta
if meta["type"] == "dataset" and "dataset_uuid" in meta and meta["dataset_uuid"] == dataset_uuid:
return meta
return {}
[docs] def get_new_dataset_meta_by_basename(self, output_name, basename):
for meta in self.tool_provided_job_metadata:
if meta["type"] == "new_primary_dataset" and meta["filename"] == basename:
return meta
[docs] def get_new_datasets(self, output_name):
log.warning("Called get_new_datasets with legacy tool metadata provider - that is unimplemented.")
return []
[docs] def has_failed_outputs(self):
found_failed = False
for meta in self.tool_provided_job_metadata:
if meta.get("failed", False):
log.info(f"One or more tool outputs is marked as failed ({meta}).")
found_failed = True
return found_failed
[docs] def rewrite(self):
with open(self.meta_file, "w") as job_metadata_fh:
for meta in self.tool_provided_job_metadata:
job_metadata_fh.write(f"{json.dumps(meta)}\n")
[docs] def get_new_datasets_for_metadata_collection(self):
for meta in self.tool_provided_job_metadata:
if meta["type"] == "new_primary_dataset":
yield meta
[docs]class ToolProvidedMetadata(BaseToolProvidedMetadata):
[docs] def __init__(self, meta_file):
self.meta_file = meta_file
with open(meta_file) as f:
self.tool_provided_job_metadata = json.load(f)
[docs] def get_dataset_meta(self, output_name, dataset_id, dataset_uuid):
return self.tool_provided_job_metadata.get(output_name, {})
[docs] def get_new_dataset_meta_by_basename(self, output_name, basename):
datasets = self.tool_provided_job_metadata.get(output_name, {}).get("datasets", [])
for meta in datasets:
if meta["filename"] == basename:
return meta
[docs] def get_new_datasets(self, output_name):
datasets = self.tool_provided_job_metadata.get(output_name, {}).get("datasets", [])
if not datasets:
elements = self.tool_provided_job_metadata.get(output_name, {}).get("elements", [])
if elements:
datasets = self._elements_to_datasets(elements)
return datasets
def _elements_to_datasets(self, elements, level=0):
for element in elements:
extra_kwds = {"identifier_%d" % level: element["name"]}
if "elements" in element:
for inner_element in self._elements_to_datasets(element["elements"], level=level + 1):
dataset = extra_kwds.copy()
dataset.update(inner_element)
yield dataset
else:
dataset = extra_kwds
extra_kwds.update(element)
yield extra_kwds
[docs] def has_failed_outputs(self):
found_failed = False
for output_name, meta in self.tool_provided_job_metadata.items():
if output_name == "__unnamed_outputs":
continue
if meta.get("failed", False):
log.info(f"One or more tool outputs is marked as failed ({meta}).")
found_failed = True
return found_failed
[docs] def get_unnamed_outputs(self):
log.debug(f"unnamed outputs [{self.tool_provided_job_metadata}]")
return self.tool_provided_job_metadata.get("__unnamed_outputs", [])
[docs] def rewrite(self):
with open(self.meta_file, "w") as job_metadata_fh:
json.dump(self.tool_provided_job_metadata, job_metadata_fh)