Warning
This document is for an in-development version of Galaxy. You can alternatively view this page in the latest release if it exists or view the top of the latest release's documentation.
Source code for galaxy.datatypes.qiime2
import ast
import html
import io
import uuid as _uuid
import zipfile
from typing import (
Dict,
List,
Optional,
)
import yaml
from galaxy.datatypes.binary import CompressedZipArchive
from galaxy.datatypes.metadata import MetadataElement
from galaxy.datatypes.protocols import (
DatasetProtocol,
HasMetadata,
)
from galaxy.datatypes.sniff import (
build_sniff_from_prefix,
FilePrefix,
)
from galaxy.datatypes.tabular import Tabular
class _QIIME2ResultBase(CompressedZipArchive):
"""Base class for QIIME2Artifact and QIIME2Visualization"""
MetadataElement(name="semantic_type", readonly=True)
MetadataElement(name="semantic_type_simple", readonly=True, visible=False)
MetadataElement(name="uuid", readonly=True)
MetadataElement(name="format", optional=True, no_value="", readonly=True)
MetadataElement(name="version", readonly=True)
def set_meta(self, dataset: DatasetProtocol, overwrite: bool = True, **kwd) -> None:
metadata = _get_metadata_from_archive(dataset.get_file_name())
for key, value in metadata.items():
if value:
setattr(dataset.metadata, key, value)
dataset.metadata.semantic_type_simple = _strip_properties(dataset.metadata.semantic_type)
def set_peek(self, dataset: DatasetProtocol, **kwd) -> None:
if dataset.metadata.semantic_type == "Visualization":
dataset.blurb = "QIIME 2 Visualization"
else:
dataset.blurb = "QIIME 2 Artifact"
dataset.peek = "\n".join(map(": ".join, self._peek(dataset)))
def display_peek(self, dataset: DatasetProtocol) -> str:
if dataset.metadata.semantic_type is None:
# Proxy for metadata elements not (yet) set
return "Peek unavailable"
def make_row(pair):
return f"<tr><th>{pair[0]}</th><td>{html.escape(pair[1])}</td></tr>"
table = ['<table cellspacing="0" cellpadding="2">']
table += [make_row(pair) for pair in self._peek(dataset, simple=True)]
table += ["</table>"]
return "".join(table)
def _peek(self, dataset: HasMetadata, simple: bool = False) -> List:
peek = [("Type", dataset.metadata.semantic_type), ("UUID", dataset.metadata.uuid)]
if not simple:
if dataset.metadata.semantic_type != "Visualization":
peek.append(("Format", dataset.metadata.format))
peek.append(("Version", dataset.metadata.version))
return peek
def _sniff(self, filename: str) -> Optional[Dict]:
"""Helper method for use in inherited datatypes"""
try:
if not zipfile.is_zipfile(filename):
raise Exception()
return _get_metadata_from_archive(filename)
except Exception:
return None
[docs]class QIIME2Artifact(_QIIME2ResultBase):
file_ext = "qza"
[docs] def sniff(self, filename: str) -> bool:
metadata = self._sniff(filename)
return bool(metadata) and metadata["semantic_type"] != "Visualization" # type: ignore[index]
[docs]class QIIME2Visualization(_QIIME2ResultBase):
file_ext = "qzv"
[docs] def sniff(self, filename: str) -> bool:
metadata = self._sniff(filename)
return bool(metadata) and metadata["semantic_type"] == "Visualization" # type: ignore[index]
[docs]@build_sniff_from_prefix
class QIIME2Metadata(Tabular):
"""
QIIME 2 supports overriding the type of a column to Categorical when
a specific directive `#q2:types` is present under the ID row.
Galaxy already understands column types quite well, however we sometimes
want to override its inferred type.
For Galaxy, we are going to require that if a directive occurs, it happens
on the second line (after the header). This is the most typical location
and interacts best with the current implementation of Tabular.
"""
file_ext = "qiime2.tabular"
_TYPES_DIRECTIVE = "#q2:types"
_search_lines = 2
[docs] def get_column_names(self, first_line: str) -> Optional[List[str]]:
return first_line.strip().split("\t")
[docs] def set_meta(self, dataset: DatasetProtocol, overwrite: bool = True, **kwd) -> None:
"""
Let Galaxy's Tabular format handle most of this. We will just jump
in at the last minute to (potentially) override some column types.
"""
super().set_meta(dataset, overwrite=overwrite, **kwd)
if dataset.has_data():
with open(dataset.get_file_name()) as dataset_fh:
line = None
for line, _ in zip(dataset_fh, range(self._search_lines)):
if line.startswith(self._TYPES_DIRECTIVE):
break
if line is None:
return
q2_types = line.strip().split("\t")
# The first column (q2:types) is always the IDs
q2_types[0] = "index"
if len(q2_types) < dataset.metadata.columns:
# this is probably malformed, but easy to fix
q2_types.extend([""] * (dataset.metadata.columns - len(q2_types)))
for idx, (q2_type, col_type) in enumerate(zip(q2_types, dataset.metadata.column_types)):
if q2_type == "":
if col_type in ("float", "int"):
q2_types[idx] = "numeric"
else:
q2_types[idx] = "categorical"
else:
if q2_type == "categorical" and col_type in ("float", "int", "list"):
dataset.metadata.column_types[idx] = "str"
[docs] def sniff_prefix(self, file_prefix: FilePrefix) -> bool:
for _, line in zip(range(self._search_lines), file_prefix.line_iterator()):
if line.startswith(self._TYPES_DIRECTIVE):
return True
return False
##############################################################################
# Helpers
##############################################################################
def _strip_properties(expression):
# This is necessary because QIIME 2's semantic types include a limited
# form of intersection type, which means that `A & B` is a subtype of `A`
# as well as a subtype of `B`. This means it is not generally speaking
# possible or practical to enumerate all valid subtypes and then do an
# exact match using <options options_filter_attribute="Some[Type]">
# So instead filter out 90% of the invalid inputs and let QIIME 2 raise an
# error on the finer details such as these "properties".
try:
expression_tree = ast.parse(expression)
reconstructer = _PredicateRemover()
reconstructer.visit(expression_tree)
return reconstructer.expression
# If we have any problems stripping properties just use the full expression
# this punts the error off to q2galaxy so if we error we do so there and
# not here
except Exception:
return expression
# Python 3.9 has a built in unparse. We can probably use this in the future
# when we are using 3.9
# https://docs.python.org/3.9/library/ast.html#ast.unparse
class _PredicateRemover(ast.NodeVisitor):
binops = {
ast.Add: " + ",
ast.Sub: " - ",
ast.Mult: " * ",
ast.Div: " / ",
ast.FloorDiv: " // ",
ast.Pow: " ** ",
ast.LShift: " << ",
ast.RShift: " >> ",
ast.BitOr: " | ",
ast.BitXor: " ^ ",
ast.BitAnd: " & ",
ast.MatMult: " @ ",
}
def __init__(self):
self.expression = ""
def visit_Name(self, node):
self.expression += node.id
def visit_Subscript(self, node):
self.visit(node.value)
self.expression += "["
self.visit(node.slice)
self.expression += "]"
def visit_Tuple(self, node):
trailing_comma = ""
for n in node.elts:
self.expression += trailing_comma
self.visit(n)
trailing_comma = ", "
def visit_BinOp(self, node):
self.visit(node.left)
if not isinstance(node.op, ast.Mod):
self.expression += self.binops[node.op.__class__]
self.visit(node.right)
def _get_metadata_from_archive(archive):
uuid = _get_uuid(archive)
archive_version, framework_version = _get_versions(archive, uuid)
metadata_contents = _get_metadata_contents(archive, uuid)
return {
"uuid": uuid,
"version": framework_version,
"semantic_type": metadata_contents["type"],
"format": metadata_contents["format"] or "",
}
def _get_metadata_contents(path, uuid):
with _open_file_in_archive(path, "metadata.yaml", uuid) as fh:
return yaml.safe_load(fh.read())
def _get_uuid(path):
roots = set()
for relpath in _iter_zip_root(path):
if not relpath.startswith("."):
roots.add(relpath)
if len(roots) == 0:
raise ValueError("Archive does not have a visible root directory.")
if len(roots) > 1:
raise ValueError(f"Archive has multiple root directories: {roots!r}")
uuid = roots.pop()
if not _is_uuid4(uuid):
raise ValueError(f"Archive root directory name {uuid!r} is not a valid version 4 UUID.")
return uuid
def _get_versions(path, uuid):
try:
with _open_file_in_archive(path, "VERSION", uuid) as fh:
header, version_line, framework_version_line, eof = fh.read().split("\n")
if header.strip() != "QIIME 2":
raise Exception() # GOTO except Exception
version = version_line.split(":")[1].strip()
framework_version = framework_version_line.split(":")[1].strip()
return version, framework_version
except Exception:
raise ValueError("Archive does not contain a correctly formatted VERSION file.")
def _open_file_in_archive(zip_path, path, uuid):
relpath = "/".join([uuid, path])
with zipfile.ZipFile(zip_path, mode="r") as zf:
return io.TextIOWrapper(zf.open(relpath))
def _iter_zip_root(path):
seen = set()
with zipfile.ZipFile(path, mode="r") as zf:
for name in zf.namelist():
parts = name.split("/") # zip is always / for seperators
if len(parts) > 0:
result = parts[0]
if result not in seen:
seen.add(result)
yield result
def _is_uuid4(uuid_str):
# Adapted from https://gist.github.com/ShawnMilo/7777304
try:
uuid = _uuid.UUID(hex=uuid_str, version=4)
except ValueError:
# The string is not a valid hex code for a UUID.
return False
# If uuid_str is a valid hex code, but an invalid uuid4, UUID.__init__
# will convert it to a valid uuid4.
return str(uuid) == uuid_str