import ast
import html
import io
import uuid as _uuid
import zipfile
from typing import (
    Dict,
    List,
    Optional,
)
import yaml
from galaxy.datatypes.binary import CompressedZipArchive
from galaxy.datatypes.metadata import MetadataElement
from galaxy.datatypes.protocols import (
    DatasetProtocol,
    HasMetadata,
)
from galaxy.datatypes.sniff import (
    build_sniff_from_prefix,
    FilePrefix,
)
from galaxy.datatypes.tabular import Tabular
class _QIIME2ResultBase(CompressedZipArchive):
    """Shared implementation for the QIIME2Artifact and QIIME2Visualization datatypes."""

    MetadataElement(name="semantic_type", readonly=True)
    MetadataElement(name="semantic_type_simple", readonly=True, visible=False)
    MetadataElement(name="uuid", readonly=True)
    MetadataElement(name="format", optional=True, no_value="", readonly=True)
    MetadataElement(name="version", readonly=True)

    def set_meta(self, dataset: DatasetProtocol, overwrite: bool = True, **kwd) -> None:
        """Copy the archive's metadata onto ``dataset.metadata``.

        Only truthy values are assigned, so an empty ``format`` is left
        untouched. ``semantic_type_simple`` is then derived from the semantic
        type with its properties stripped. ``overwrite`` and ``kwd`` are not
        consulted here.
        """
        archive_metadata = _get_metadata_from_archive(dataset.get_file_name())
        for name, value in archive_metadata.items():
            if not value:
                continue
            setattr(dataset.metadata, name, value)
        dataset.metadata.semantic_type_simple = _strip_properties(dataset.metadata.semantic_type)

    def set_peek(self, dataset: DatasetProtocol, **kwd) -> None:
        """Set the dataset blurb and a plain-text ``label: value`` peek."""
        is_visualization = dataset.metadata.semantic_type == "Visualization"
        dataset.blurb = "QIIME 2 Visualization" if is_visualization else "QIIME 2 Artifact"
        dataset.peek = "\n".join(": ".join(pair) for pair in self._peek(dataset))

    def display_peek(self, dataset: DatasetProtocol) -> str:
        """Return the simple peek rendered as an HTML table.

        Values are HTML-escaped; labels are emitted as-is.
        """
        if dataset.metadata.semantic_type is None:
            # Proxy for metadata elements not (yet) set
            return "Peek unavailable"
        rows = [
            f"<tr><th>{label}</th><td>{html.escape(value)}</td></tr>"
            for label, value in self._peek(dataset, simple=True)
        ]
        return '<table cellspacing="0" cellpadding="2">' + "".join(rows) + "</table>"

    def _peek(self, dataset: HasMetadata, simple: bool = False) -> List:
        """Build the ``(label, value)`` pairs shown in a peek.

        The simple form contains only the type and UUID. The full form adds
        the version, and the format for anything that is not a Visualization.
        """
        meta = dataset.metadata
        pairs = [("Type", meta.semantic_type), ("UUID", meta.uuid)]
        if simple:
            return pairs
        if meta.semantic_type != "Visualization":
            pairs.append(("Format", meta.format))
        pairs.append(("Version", meta.version))
        return pairs

    def _sniff(self, filename: str) -> Optional[Dict]:
        """Helper method for use in inherited datatypes.

        Returns the archive metadata when ``filename`` is a zip file whose
        metadata can be read, and ``None`` in every other case (including
        any error raised while reading).
        """
        try:
            if zipfile.is_zipfile(filename):
                return _get_metadata_from_archive(filename)
        except Exception:
            pass
        return None
[docs]
class QIIME2Artifact(_QIIME2ResultBase):
    """Datatype for QIIME 2 result archives that are not visualizations."""

    file_ext = "qza"

    def sniff(self, filename: str) -> bool:
        """Return True when the archive metadata is readable and its semantic type is not "Visualization"."""
        info = self._sniff(filename)
        if not info:
            return False
        return info["semantic_type"] != "Visualization"
 
[docs]
class QIIME2Visualization(_QIIME2ResultBase):
    """Datatype for QIIME 2 result archives whose semantic type is "Visualization"."""

    file_ext = "qzv"

    def sniff(self, filename: str) -> bool:
        """Return True when the archive metadata is readable and its semantic type is "Visualization"."""
        info = self._sniff(filename)
        if not info:
            return False
        return info["semantic_type"] == "Visualization"
 
##############################################################################
# Helpers
##############################################################################
def _strip_properties(expression):
    """Return ``expression`` with its ``%`` predicates ("properties") removed.

    QIIME 2's semantic types include a limited form of intersection type:
    ``A & B`` is a subtype of ``A`` as well as of ``B``. Enumerating every
    valid subtype for an exact match via
    ``<options options_filter_attribute="Some[Type]">`` is therefore not
    generally possible or practical. Instead, filter out most invalid inputs
    here and let QIIME 2 raise an error on finer details such as properties.

    If anything goes wrong while stripping, the full expression is returned
    unchanged; this punts the error off to q2galaxy so that any failure
    happens there and not here.
    """
    try:
        parsed = ast.parse(expression)
        remover = _PredicateRemover()
        remover.visit(parsed)
    except Exception:
        return expression
    return remover.expression
# Python 3.9 added a built-in ast.unparse. We can probably switch to it once
# Python 3.9 is the minimum version in use.
# https://docs.python.org/3.9/library/ast.html#ast.unparse
class _PredicateRemover(ast.NodeVisitor):
    """Rebuild a type expression as text, dropping the right side of every ``%``.

    The rebuilt text accumulates in ``self.expression``. Node types without a
    dedicated ``visit_*`` method fall through to ``NodeVisitor.generic_visit``.
    """

    # Source text emitted for each binary operator that is kept.
    binops = {
        ast.Add: " + ",
        ast.Sub: " - ",
        ast.Mult: " * ",
        ast.Div: " / ",
        ast.FloorDiv: " // ",
        ast.Pow: " ** ",
        ast.LShift: " << ",
        ast.RShift: " >> ",
        ast.BitOr: " | ",
        ast.BitXor: " ^ ",
        ast.BitAnd: " & ",
        ast.MatMult: " @ ",
    }

    def __init__(self):
        self.expression = ""

    def _emit(self, text):
        """Append ``text`` to the expression built so far."""
        self.expression += text

    def visit_Name(self, node):
        self._emit(node.id)

    def visit_Subscript(self, node):
        self.visit(node.value)
        self._emit("[")
        self.visit(node.slice)
        self._emit("]")

    def visit_Tuple(self, node):
        for position, element in enumerate(node.elts):
            if position:
                self._emit(", ")
            self.visit(element)

    def visit_BinOp(self, node):
        self.visit(node.left)
        if isinstance(node.op, ast.Mod):
            # `left % right`: keep only the left operand, discarding the predicate.
            return
        self._emit(self.binops[type(node.op)])
        self.visit(node.right)
def _get_metadata_from_archive(archive):
    """Collect the UUID, framework version, semantic type and format of an archive.

    Returns a dict with the keys ``uuid``, ``version``, ``semantic_type`` and
    ``format``; a falsy format in ``metadata.yaml`` is reported as ``""``.
    """
    root_uuid = _get_uuid(archive)
    # Only the framework version is reported; the archive version is discarded.
    _, framework_version = _get_versions(archive, root_uuid)
    contents = _get_metadata_contents(archive, root_uuid)
    result = {
        "uuid": root_uuid,
        "version": framework_version,
        "semantic_type": contents["type"],
        "format": contents["format"] or "",
    }
    return result
def _get_metadata_contents(path, uuid):
    """Return the parsed contents of ``<uuid>/metadata.yaml`` from the archive at ``path``."""
    with _open_file_in_archive(path, "metadata.yaml", uuid) as handle:
        raw_yaml = handle.read()
    return yaml.safe_load(raw_yaml)
def _get_uuid(path):
    """Return the archive's single visible root directory name, which must be a UUID4.

    Root entries whose name starts with "." are ignored.

    Raises:
        ValueError: if there is no visible root, more than one visible root,
            or the root name is not a valid version 4 UUID.
    """
    visible_roots = {name for name in _iter_zip_root(path) if not name.startswith(".")}
    if not visible_roots:
        raise ValueError("Archive does not have a visible root directory.")
    if len(visible_roots) > 1:
        raise ValueError(f"Archive has multiple root directories: {visible_roots!r}")
    (candidate,) = visible_roots
    if _is_uuid4(candidate):
        return candidate
    raise ValueError(f"Archive root directory name {candidate!r} is not a valid version 4 UUID.")
def _get_versions(path, uuid):
    """Read ``<uuid>/VERSION`` and return ``(version, framework_version)``.

    The file must split on newlines into exactly four pieces: a ``QIIME 2``
    header, two ``key: value`` lines and a final piece after the last
    newline. Each version is the text after the first ":" on its line.

    Raises:
        ValueError: on any failure to read or parse the file.
    """
    try:
        with _open_file_in_archive(path, "VERSION", uuid) as handle:
            pieces = handle.read().split("\n")
        header, archive_line, framework_line, _trailer = pieces
        if header.strip() != "QIIME 2":
            raise Exception()  # handled by the except clause below
        archive_version = archive_line.split(":")[1].strip()
        framework_version = framework_line.split(":")[1].strip()
        return archive_version, framework_version
    except Exception:
        raise ValueError("Archive does not contain a correctly formatted VERSION file.")
def _open_file_in_archive(zip_path, path, uuid):
    """Open ``<uuid>/<path>`` inside the zip at ``zip_path`` as a text stream.

    The ``ZipFile`` object is closed before returning; the returned wrapper
    around the member stream is handed to the caller, who should close it.
    """
    member = "/".join((uuid, path))
    archive = zipfile.ZipFile(zip_path, mode="r")
    try:
        return io.TextIOWrapper(archive.open(member))
    finally:
        archive.close()
def _iter_zip_root(path):
    """Yield each distinct first path component of the zip's member names.

    Components are yielded in the order they are first encountered.
    """
    emitted = set()
    with zipfile.ZipFile(path, mode="r") as archive:
        for member in archive.namelist():
            # Zip member names always use "/" as the separator.
            root = member.partition("/")[0]
            if root in emitted:
                continue
            emitted.add(root)
            yield root
def _is_uuid4(uuid_str):
    """Return True only if ``uuid_str`` is the canonical text of a version 4 UUID.

    Adapted from https://gist.github.com/ShawnMilo/7777304
    """
    try:
        parsed = _uuid.UUID(hex=uuid_str, version=4)
    except ValueError:
        # Not a valid hex code for a UUID at all.
        return False
    else:
        # UUID.__init__ coerces valid hex that is not a uuid4 into a valid
        # uuid4, so only an unchanged round trip proves the input was one.
        return uuid_str == str(parsed)