import ast
import html
import io
import uuid as _uuid
import zipfile
from typing import (
Dict,
List,
Optional,
)
import yaml
from galaxy.datatypes.binary import CompressedZipArchive
from galaxy.datatypes.metadata import MetadataElement
from galaxy.datatypes.protocols import (
DatasetProtocol,
HasMetadata,
)
from galaxy.datatypes.sniff import (
build_sniff_from_prefix,
FilePrefix,
)
from galaxy.datatypes.tabular import Tabular
class _QIIME2ResultBase(CompressedZipArchive):
"""Base class for QIIME2Artifact and QIIME2Visualization"""
MetadataElement(name="semantic_type", readonly=True)
MetadataElement(name="semantic_type_simple", readonly=True, visible=False)
MetadataElement(name="uuid", readonly=True)
MetadataElement(name="format", optional=True, no_value="", readonly=True)
MetadataElement(name="version", readonly=True)
def set_meta(self, dataset: DatasetProtocol, overwrite: bool = True, **kwd) -> None:
metadata = _get_metadata_from_archive(dataset.get_file_name())
for key, value in metadata.items():
if value:
setattr(dataset.metadata, key, value)
dataset.metadata.semantic_type_simple = _strip_properties(dataset.metadata.semantic_type)
def set_peek(self, dataset: DatasetProtocol, **kwd) -> None:
if dataset.metadata.semantic_type == "Visualization":
dataset.blurb = "QIIME 2 Visualization"
else:
dataset.blurb = "QIIME 2 Artifact"
dataset.peek = "\n".join(map(": ".join, self._peek(dataset)))
def display_peek(self, dataset: DatasetProtocol) -> str:
if dataset.metadata.semantic_type is None:
# Proxy for metadata elements not (yet) set
return "Peek unavailable"
def make_row(pair):
return f"<tr><th>{pair[0]}</th><td>{html.escape(pair[1])}</td></tr>"
table = ['<table cellspacing="0" cellpadding="2">']
table += [make_row(pair) for pair in self._peek(dataset, simple=True)]
table += ["</table>"]
return "".join(table)
def _peek(self, dataset: HasMetadata, simple: bool = False) -> List:
peek = [("Type", dataset.metadata.semantic_type), ("UUID", dataset.metadata.uuid)]
if not simple:
if dataset.metadata.semantic_type != "Visualization":
peek.append(("Format", dataset.metadata.format))
peek.append(("Version", dataset.metadata.version))
return peek
def _sniff(self, filename: str) -> Optional[Dict]:
"""Helper method for use in inherited datatypes"""
try:
if not zipfile.is_zipfile(filename):
raise Exception()
return _get_metadata_from_archive(filename)
except Exception:
return None
[docs]class QIIME2Artifact(_QIIME2ResultBase):
file_ext = "qza"
[docs] def sniff(self, filename: str) -> bool:
metadata = self._sniff(filename)
return bool(metadata) and metadata["semantic_type"] != "Visualization" # type: ignore[index]
[docs]class QIIME2Visualization(_QIIME2ResultBase):
file_ext = "qzv"
[docs] def sniff(self, filename: str) -> bool:
metadata = self._sniff(filename)
return bool(metadata) and metadata["semantic_type"] == "Visualization" # type: ignore[index]
##############################################################################
# Helpers
##############################################################################
def _strip_properties(expression):
# This is necessary because QIIME 2's semantic types include a limited
# form of intersection type, which means that `A & B` is a subtype of `A`
# as well as a subtype of `B`. This means it is not generally speaking
# possible or practical to enumerate all valid subtypes and then do an
# exact match using <options options_filter_attribute="Some[Type]">
# So instead filter out 90% of the invalid inputs and let QIIME 2 raise an
# error on the finer details such as these "properties".
try:
expression_tree = ast.parse(expression)
reconstructer = _PredicateRemover()
reconstructer.visit(expression_tree)
return reconstructer.expression
# If we have any problems stripping properties just use the full expression
# this punts the error off to q2galaxy so if we error we do so there and
# not here
except Exception:
return expression
# Python 3.9 has a built in unparse. We can probably use this in the future
# when we are using 3.9
# https://docs.python.org/3.9/library/ast.html#ast.unparse
class _PredicateRemover(ast.NodeVisitor):
binops = {
ast.Add: " + ",
ast.Sub: " - ",
ast.Mult: " * ",
ast.Div: " / ",
ast.FloorDiv: " // ",
ast.Pow: " ** ",
ast.LShift: " << ",
ast.RShift: " >> ",
ast.BitOr: " | ",
ast.BitXor: " ^ ",
ast.BitAnd: " & ",
ast.MatMult: " @ ",
}
def __init__(self):
self.expression = ""
def visit_Name(self, node):
self.expression += node.id
def visit_Subscript(self, node):
self.visit(node.value)
self.expression += "["
self.visit(node.slice)
self.expression += "]"
def visit_Tuple(self, node):
trailing_comma = ""
for n in node.elts:
self.expression += trailing_comma
self.visit(n)
trailing_comma = ", "
def visit_BinOp(self, node):
self.visit(node.left)
if not isinstance(node.op, ast.Mod):
self.expression += self.binops[node.op.__class__]
self.visit(node.right)
def _get_metadata_from_archive(archive):
uuid = _get_uuid(archive)
archive_version, framework_version = _get_versions(archive, uuid)
metadata_contents = _get_metadata_contents(archive, uuid)
return {
"uuid": uuid,
"version": framework_version,
"semantic_type": metadata_contents["type"],
"format": metadata_contents["format"] or "",
}
def _get_metadata_contents(path, uuid):
with _open_file_in_archive(path, "metadata.yaml", uuid) as fh:
return yaml.safe_load(fh.read())
def _get_uuid(path):
roots = set()
for relpath in _iter_zip_root(path):
if not relpath.startswith("."):
roots.add(relpath)
if len(roots) == 0:
raise ValueError("Archive does not have a visible root directory.")
if len(roots) > 1:
raise ValueError(f"Archive has multiple root directories: {roots!r}")
uuid = roots.pop()
if not _is_uuid4(uuid):
raise ValueError(f"Archive root directory name {uuid!r} is not a valid version 4 UUID.")
return uuid
def _get_versions(path, uuid):
try:
with _open_file_in_archive(path, "VERSION", uuid) as fh:
header, version_line, framework_version_line, eof = fh.read().split("\n")
if header.strip() != "QIIME 2":
raise Exception() # GOTO except Exception
version = version_line.split(":")[1].strip()
framework_version = framework_version_line.split(":")[1].strip()
return version, framework_version
except Exception:
raise ValueError("Archive does not contain a correctly formatted VERSION file.")
def _open_file_in_archive(zip_path, path, uuid):
relpath = "/".join([uuid, path])
with zipfile.ZipFile(zip_path, mode="r") as zf:
return io.TextIOWrapper(zf.open(relpath))
def _iter_zip_root(path):
seen = set()
with zipfile.ZipFile(path, mode="r") as zf:
for name in zf.namelist():
parts = name.split("/") # zip is always / for seperators
if len(parts) > 0:
result = parts[0]
if result not in seen:
seen.add(result)
yield result
def _is_uuid4(uuid_str):
# Adapted from https://gist.github.com/ShawnMilo/7777304
try:
uuid = _uuid.UUID(hex=uuid_str, version=4)
except ValueError:
# The string is not a valid hex code for a UUID.
return False
# If uuid_str is a valid hex code, but an invalid uuid4, UUID.__init__
# will convert it to a valid uuid4.
return str(uuid) == uuid_str