import io
import re
import tarfile
import tempfile
import zipfile
from typing import (
Optional,
Union,
)
from galaxy.util import asbool
from ._util import _assert_presence_number
def _extract_from_tar(output_bytes, fn):
    """Return the raw content of member ``fn`` from an in-memory tar archive.

    :param output_bytes: complete content of the archive; it is opened with
        ``tarfile`` mode ``"r"``.
    :param fn: exact name of the member as stored in the archive.
    :returns: the member's content as ``bytes``; ``b""`` for directories.
    :raises KeyError: if the archive has no member named ``fn``.
    :raises AssertionError: if ``tarfile`` provides no data stream for the
        member.
    """
    with io.BytesIO(output_bytes) as temp:
        with tarfile.open(fileobj=temp, mode="r") as tar_temp:
            ti = tar_temp.getmember(fn)
            # zip treats directories like empty files, so make this
            # consistent for tar: return empty *bytes* (the type returned
            # for regular members and by the zip counterpart).
            if ti.isdir():
                return b""
            # Reuse the TarInfo already looked up instead of resolving the
            # name a second time.
            tar_file = tar_temp.extractfile(ti)
            if tar_file is None:
                # Raised explicitly so the check is not stripped under -O.
                raise AssertionError(f"Archive member '{fn}' has no extractable content")
            with tar_file as member_fh:
                return member_fh.read()
def _list_from_tar(output_bytes, path):
    """List the tar archive members whose name matches the regex ``path``.

    The pattern is applied with ``re.match``, i.e. it has to match at the
    beginning of the member name but need not cover the whole name.

    :param output_bytes: complete content of the tar archive.
    :param path: regular expression selecting member names.
    :returns: alphabetically sorted list of the matching member names.
    """
    with io.BytesIO(output_bytes) as buffer, tarfile.open(fileobj=buffer, mode="r") as archive:
        return sorted(name for name in archive.getnames() if re.match(path, name))
def _extract_from_zip(output_bytes, fn):
    """Return the raw content of member ``fn`` from an in-memory zip archive.

    :param output_bytes: complete content of the zip archive.
    :param fn: exact name of the member as stored in the archive.
    :returns: the member's content as ``bytes``.
    :raises KeyError: if the archive has no member named ``fn``.
    """
    with io.BytesIO(output_bytes) as buffer, zipfile.ZipFile(buffer, mode="r") as archive:
        return archive.read(fn)
def _list_from_zip(output_bytes, path):
    """List the zip archive members whose name matches the regex ``path``.

    The pattern is applied with ``re.match``, i.e. it has to match at the
    beginning of the member name but need not cover the whole name.

    :param output_bytes: complete content of the zip archive.
    :param path: regular expression selecting member names.
    :returns: alphabetically sorted list of the matching member names.
    """
    with io.BytesIO(output_bytes) as buffer, zipfile.ZipFile(buffer, mode="r") as archive:
        return sorted(name for name in archive.namelist() if re.match(path, name))
def assert_has_archive_member(
    output_bytes: bytes,
    path: str,
    verify_assertions_function,
    children,
    all: Union[bool, str] = False,
    n: Optional[Union[int, str]] = None,
    delta: Union[int, str] = 0,
    min: Optional[Union[int, str]] = None,
    max: Optional[Union[int, str]] = None,
    negate: Union[bool, str] = False,
) -> None:
    """Check the children assertions against archive members matching ``path``.

    The names of the archive members are matched against the regular
    expression ``path``; the number of matches is validated against ``n``,
    ``delta``, ``min``, ``max`` and ``negate``. Afterwards the children
    assertions are verified on the content of the first matching member (in
    sorted name order) or, if ``all`` is true, on every matching member.
    Currently supported formats: .zip, .tar, .tar.gz.

    :param output_bytes: complete content of the archive.
    :param path: regular expression selecting the member name(s).
    :param verify_assertions_function: callable invoked as
        ``verify_assertions_function(contents, children)`` per checked member.
    :param children: the nested assertions handed to
        ``verify_assertions_function``.
    :param all: check every matching member instead of only the first one
        (converted with ``asbool``).
    :param n: passed to ``_assert_presence_number`` together with ``delta``,
        ``min``, ``max`` and ``negate`` to validate the number of matches.
    :raises AssertionError: if ``output_bytes`` is neither a zip nor a tar
        archive, if the number of matches is rejected, or if a child
        assertion fails for a checked member.
    """
    # Do not rebind the parameter that shadows the ``all`` builtin.
    check_all = asbool(all)

    # Detect the archive type directly on the in-memory data. This avoids
    # re-opening a still open NamedTemporaryFile by name (not possible on
    # Windows) and works the same on every Python version:
    # zipfile.is_zipfile accepts file-like objects and opening with tarfile
    # is exactly what tarfile.is_tarfile does internally.
    extract_foo = None
    list_foo = None
    if zipfile.is_zipfile(io.BytesIO(output_bytes)):
        extract_foo = _extract_from_zip
        list_foo = _list_from_zip
    else:
        try:
            with tarfile.open(fileobj=io.BytesIO(output_bytes), mode="r"):
                pass
        except tarfile.TarError:
            pass
        else:
            extract_foo = _extract_from_tar
            list_foo = _list_from_tar
    if extract_foo is None or list_foo is None:
        # Raised explicitly so the check is not stripped under -O.
        raise AssertionError(f"Expected path '{path}' to be an archive")

    # get list of matching file names in archive and check against n, delta,
    # min, max (slightly abusing the output and text as well as the function
    # parameters)
    fns = list_foo(output_bytes, path)
    _assert_presence_number(
        None,
        path,
        n,
        delta,
        min,
        max,
        negate,
        lambda o, t: len(fns) > 0,
        lambda o, t: len(fns),
        "{expected} path '{text}' in archive",
        "{expected} {n}+-{delta} matches for path '{text}' in archive",
        "{expected} that the number of matches for path '{text}' in archive is in [{min}:{max}]",
    )

    # check sub-assertions on members matching path
    for fn in fns:
        contents = extract_foo(output_bytes, fn)
        try:
            verify_assertions_function(contents, children)
        except AssertionError as e:
            # Keep the original failure attached as the explicit cause.
            raise AssertionError(f"Archive member '{path}': {str(e)}") from e
        if not check_all:
            break