Warning
This document is for an old release of Galaxy. You can alternatively view this page in the latest release if it exists or view the top of the latest release's documentation.
Source code for galaxy.tool_util.verify.asserts.archive
import io
import re
import tarfile
import tempfile
import zipfile
from typing import (
Union,
Optional,
)
from galaxy.util import asbool
from ._util import _assert_presence_number
def _extract_from_tar(output_bytes, fn):
with io.BytesIO(output_bytes) as temp:
with tarfile.open(fileobj=temp, mode="r") as tar_temp:
ti = tar_temp.getmember(fn)
# zip treats directories like empty files.
# so make this consistent for tar
if ti.isdir():
return ""
with tar_temp.extractfile(fn) as member_fh:
return member_fh.read()
def _list_from_tar(output_bytes, path):
lst = list()
with io.BytesIO(output_bytes) as temp:
with tarfile.open(fileobj=temp, mode="r") as tar_temp:
for fn in tar_temp.getnames():
if not re.match(path, fn):
continue
lst.append(fn)
return sorted(lst)
def _extract_from_zip(output_bytes, fn):
with io.BytesIO(output_bytes) as temp:
with zipfile.ZipFile(temp, mode="r") as zip_temp:
with zip_temp.open(fn) as member_fh:
return member_fh.read()
def _list_from_zip(output_bytes, path):
lst = list()
with io.BytesIO(output_bytes) as temp:
with zipfile.ZipFile(temp, mode="r") as zip_temp:
for fn in zip_temp.namelist():
if not re.match(path, fn):
continue
lst.append(fn)
return sorted(lst)
[docs]def assert_has_archive_member(
output_bytes: bytes,
path: str,
verify_assertions_function,
children,
all: Union[bool, str] = False,
n: Optional[Union[int, str]] = None,
delta: Union[int, str] = 0,
min: Optional[Union[int, str]] = None,
max: Optional[Union[int, str]] = None,
negate: Union[bool, str] = False,
) -> None:
"""Recursively checks the specified children assertions against the text of
the first element matching the specified path found within the archive.
Currently supported formats: .zip, .tar, .tar.gz."""
all = asbool(all)
extract_foo = None
# from python 3.9 is_tarfile supports file like objects then we do not need
# the tempfile detour but can use io.BytesIO(output_bytes)
with tempfile.NamedTemporaryFile() as tmp:
tmp.write(output_bytes)
tmp.flush()
if zipfile.is_zipfile(tmp.name):
extract_foo = _extract_from_zip
list_foo = _list_from_zip
elif tarfile.is_tarfile(tmp.name):
extract_foo = _extract_from_tar
list_foo = _list_from_tar
assert extract_foo is not None, f"Expected path '{path}' to be an archive"
# get list of matching file names in archive and check against n, delta,
# min, max (slightly abusing the output and text as well as the function
# parameters)
fns = list_foo(output_bytes, path)
_assert_presence_number(
None,
path,
n,
delta,
min,
max,
negate,
lambda o, t: len(fns) > 0,
lambda o, t: len(fns),
"{expected} path '{text}' in archive",
"{expected} {n}+-{delta} matches for path '{text}' in archive",
"{expected} that the number of matches for path '{text}' in archive is in [{min}:{max}]",
)
# check sub-assertions on members matching path
for fn in fns:
contents = extract_foo(output_bytes, fn)
try:
verify_assertions_function(contents, children)
except AssertionError as e:
raise AssertionError(f"Archive member '{path}': {str(e)}")
if not all:
break