Source code for galaxy.tool_util.provided_metadata
import json
import logging
import os
import re

from galaxy.util import stringify_dictionary_keys

log = logging.getLogger(__name__)
def parse_tool_provided_metadata(meta_file, provided_metadata_style=None, job_wrapper=None):
    """Return a ToolProvidedMetadata object for the specified file path.

    If meta_file is absent, return a NullToolProvidedMetadata. If
    provided_metadata_style is None, attempt to guess the tool provided metadata type.
    """
    if not os.path.exists(meta_file):
        return NullToolProvidedMetadata()

    if provided_metadata_style is None:
        provided_metadata_style = _guess_tool_provided_metadata_style(meta_file)

    assert provided_metadata_style in ["legacy", "default"]
    if provided_metadata_style == "legacy":
        return LegacyToolProvidedMetadata(meta_file, job_wrapper=job_wrapper)
    elif provided_metadata_style == "default":
        return ToolProvidedMetadata(meta_file)
def _guess_tool_provided_metadata_style(path):
    try:
        with open(path) as f:
            metadata = json.load(f)
        metadata_type = metadata.get("type", None)
        return "legacy" if metadata_type in ["dataset", "new_primary_dataset"] else "default"
    except ValueError:
        # Either empty or multiple JSON lines, either way we can safely treat
        # it as legacy style.
        return "legacy"
class BaseToolProvidedMetadata:
    def get_new_datasets(self, output_name):
        """Find new datasets for dataset discovery for the specified output.

        Return a list of such datasets.

        Called only in the context of discovering datasets when
        discover_via="tool_provided_metadata" is defined in the tool.
        """
        return []

    def has_failed_outputs(self):
        """Determine if generation of any of the outputs failed.

        If True, this method should also log information about at least the
        first such failed output.
        """
        return False

    def get_new_dataset_meta_by_basename(self, output_name, basename):
        """For a discovered dataset, get the corresponding metadata entry.

        The discovery may have been from explicit listing in this file (returned
        from get_new_datasets) or via file regex; either way the basename of the
        file is used to index the fetching of the metadata entry.
        """
        return {}

    def get_unnamed_outputs(self):
        """Return unnamed output datasets introduced for upload 2.0.

        Needs more formal specification, but see output_collect for how
        destinations, types, elements, etc. are consumed.
        """
        return []

    def get_dataset_meta(self, output_name, dataset_id, dataset_uuid):
        """Return primary dataset metadata for the specified output."""
        return {}

    def rewrite(self):
        """Write metadata back to the file system.

        If metadata has not changed via outputs specified as mutable, the
        implementing class may opt not to re-write the file.
        """
        return None

    def get_new_datasets_for_metadata_collection(self):
        """Return all tracked datasets that are not explicit primary outputs."""
        return []
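

# --- Illustrative sketch (not part of the upstream module) -------------------
# The base class above doubles as a "null object": every method returns an
# empty default, so concrete providers only override what their metadata
# format supports. The subclass below is a hypothetical example of that
# pattern, not a provider shipped with Galaxy.
class _ExampleStaticProvider(BaseToolProvidedMetadata):
    """Serve a fixed, in-memory metadata dict for a single named output."""

    def __init__(self, output_name, meta):
        self._output_name = output_name
        self._meta = meta

    def get_dataset_meta(self, output_name, dataset_id, dataset_uuid):
        # Everything not overridden (get_new_datasets, has_failed_outputs, ...)
        # keeps the empty defaults defined in BaseToolProvidedMetadata.
        return self._meta if output_name == self._output_name else {}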
class LegacyToolProvidedMetadata(BaseToolProvidedMetadata):
    def __init__(self, meta_file, job_wrapper=None):
        self.meta_file = meta_file
        self.tool_provided_job_metadata = []

        with open(meta_file) as f:
            for line in f:
                try:
                    line_as_dict = stringify_dictionary_keys(json.loads(line))
                    assert "type" in line_as_dict
                except Exception as e:
                    message = f'Got JSON data from tool, but line is improperly formatted or no "type" key in: [{line}]'
                    raise ValueError(message) from e

                # Set the dataset id if it's a dataset entry and isn't set.
                # This isn't insecure. We loop the job's output datasets in
                # the finish method, so if a tool writes out metadata for a
                # dataset id that it doesn't own, it'll just be ignored.
                dataset_id_not_specified = line_as_dict["type"] == "dataset" and "dataset_id" not in line_as_dict
                if dataset_id_not_specified:
                    dataset_basename = line_as_dict["dataset"]
                    if job_wrapper:
                        try:
                            line_as_dict["dataset_id"] = job_wrapper.job_io.get_output_file_id(dataset_basename)
                        except KeyError:
                            log.warning(
                                f"({job_wrapper.job_id}) Tool provided job dataset-specific metadata without specifying a dataset"
                            )
                            continue
                    else:
                        match = re.match(r"(galaxy_)?dataset_(.*)\.dat", dataset_basename)
                        if match is None:
                            raise Exception(
                                f"processing tool_provided_metadata (e.g. galaxy.json) entry with invalid dataset name [{dataset_basename}]"
                            )
                        dataset_id = match.group(2)
                        if dataset_id.isdigit():
                            line_as_dict["dataset_id"] = dataset_id
                        else:
                            line_as_dict["dataset_uuid"] = dataset_id

                self.tool_provided_job_metadata.append(line_as_dict)

    def get_dataset_meta(self, output_name, dataset_id, dataset_uuid):
        for meta in self.tool_provided_job_metadata:
            if meta["type"] == "dataset" and "dataset_id" in meta and int(meta["dataset_id"]) == dataset_id:
                return meta
            if meta["type"] == "dataset" and "dataset_uuid" in meta and meta["dataset_uuid"] == dataset_uuid:
                return meta
        return {}

    def get_new_dataset_meta_by_basename(self, output_name, basename):
        for meta in self.tool_provided_job_metadata:
            if meta["type"] == "new_primary_dataset" and meta["filename"] == basename:
                return meta

    def get_new_datasets(self, output_name):
        log.warning("Called get_new_datasets with legacy tool metadata provider - that is unimplemented.")
        return []

    def has_failed_outputs(self):
        found_failed = False
        for meta in self.tool_provided_job_metadata:
            if meta.get("failed", False):
                log.info(f"One or more tool outputs is marked as failed ({meta}).")
                found_failed = True
        return found_failed

    def rewrite(self):
        with open(self.meta_file, "w") as job_metadata_fh:
            for meta in self.tool_provided_job_metadata:
                job_metadata_fh.write(f"{json.dumps(meta)}\n")

    def get_new_datasets_for_metadata_collection(self):
        for meta in self.tool_provided_job_metadata:
            if meta["type"] == "new_primary_dataset":
                yield meta
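

# --- Illustrative sketch (not part of the upstream module) -------------------
# A hedged example of the legacy, line-delimited galaxy.json format consumed by
# LegacyToolProvidedMetadata: one JSON object per line, each carrying a "type"
# key. The sample entries and attribute values are assumptions for illustration
# only; the dataset id is parsed out of the "dataset" basename when no
# job_wrapper is supplied.
def _example_legacy_metadata():  # hypothetical helper, not shipped with Galaxy
    import tempfile

    lines = [
        # "dataset" entries describe primary outputs the tool wants to annotate.
        {"type": "dataset", "dataset": "galaxy_dataset_42.dat", "name": "Renamed output"},
        # "new_primary_dataset" entries describe discovered (extra) outputs.
        {"type": "new_primary_dataset", "filename": "split1.fastq", "ext": "fastqsanger"},
    ]
    with tempfile.NamedTemporaryFile("w", suffix=".json", delete=False) as fh:
        fh.write("\n".join(json.dumps(line) for line in lines) + "\n")
        path = fh.name

    provided = LegacyToolProvidedMetadata(path)
    assert provided.get_dataset_meta("output1", 42, None)["name"] == "Renamed output"
    assert provided.get_new_dataset_meta_by_basename("output1", "split1.fastq")["ext"] == "fastqsanger"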
class ToolProvidedMetadata(BaseToolProvidedMetadata):
    def __init__(self, meta_file):
        self.meta_file = meta_file
        with open(meta_file) as f:
            self.tool_provided_job_metadata = json.load(f)

    def get_dataset_meta(self, output_name, dataset_id, dataset_uuid):
        return self.tool_provided_job_metadata.get(output_name, {})

    def get_new_dataset_meta_by_basename(self, output_name, basename):
        datasets = self.tool_provided_job_metadata.get(output_name, {}).get("datasets", [])
        for meta in datasets:
            if meta["filename"] == basename:
                return meta

    def get_new_datasets(self, output_name):
        datasets = self.tool_provided_job_metadata.get(output_name, {}).get("datasets", [])
        if not datasets:
            elements = self.tool_provided_job_metadata.get(output_name, {}).get("elements", [])
            if elements:
                datasets = self._elements_to_datasets(elements)
        return datasets

    def _elements_to_datasets(self, elements, level=0):
        for element in elements:
            extra_kwds = {"identifier_%d" % level: element["name"]}
            if "elements" in element:
                for inner_element in self._elements_to_datasets(element["elements"], level=level + 1):
                    dataset = extra_kwds.copy()
                    dataset.update(inner_element)
                    yield dataset
            else:
                dataset = extra_kwds
                extra_kwds.update(element)
                yield extra_kwds

    def has_failed_outputs(self):
        found_failed = False
        for output_name, meta in self.tool_provided_job_metadata.items():
            if output_name == "__unnamed_outputs":
                continue

            if meta.get("failed", False):
                log.info(f"One or more tool outputs is marked as failed ({meta}).")
                found_failed = True
        return found_failed

    def get_unnamed_outputs(self):
        log.debug(f"unnamed outputs [{self.tool_provided_job_metadata}]")
        return self.tool_provided_job_metadata.get("__unnamed_outputs", [])

    def rewrite(self):
        with open(self.meta_file, "w") as job_metadata_fh:
            json.dump(self.tool_provided_job_metadata, job_metadata_fh)
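

# --- Illustrative sketch (not part of the upstream module) -------------------
# A hedged example of the default, single-document galaxy.json format consumed
# by ToolProvidedMetadata: one JSON object keyed by output name, with
# discovered datasets either listed flat under "datasets" or nested under
# "elements" (nesting levels become identifier_0, identifier_1, ... keys via
# _elements_to_datasets). The sample content is an assumption for illustration.
def _example_default_metadata():  # hypothetical helper, not shipped with Galaxy
    import tempfile

    sample = {
        "output1": {"name": "Renamed output", "ext": "tabular"},
        "list_output": {
            "elements": [
                {
                    "name": "sample1",
                    "elements": [
                        {"name": "forward", "filename": "s1_fwd.fastq", "ext": "fastqsanger"},
                        {"name": "reverse", "filename": "s1_rev.fastq", "ext": "fastqsanger"},
                    ],
                },
            ],
        },
    }
    with tempfile.NamedTemporaryFile("w", suffix=".json", delete=False) as fh:
        json.dump(sample, fh)
        path = fh.name

    provided = parse_tool_provided_metadata(path)  # guessed as "default"
    assert isinstance(provided, ToolProvidedMetadata)
    assert provided.get_dataset_meta("output1", None, None)["ext"] == "tabular"

    flattened = list(provided.get_new_datasets("list_output"))
    assert flattened[0]["identifier_0"] == "sample1"
    assert flattened[0]["identifier_1"] == "forward"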