Warning
This document is for an in-development version of Galaxy. You can alternatively view this page in the latest release if it exists or view the top of the latest release's documentation.
Source code for galaxy.util.checkers
import bz2
import gzip
import lzma
import os
import re
import tarfile
import zipfile
from io import (
BytesIO,
StringIO,
)
from typing import (
Dict,
IO,
Tuple,
)
from typing_extensions import Protocol
from galaxy import util
from galaxy.util.image_util import image_type
# Maximum number of lines check_html() reads before giving up.
HTML_CHECK_LINES = 100
# Upper bound for a single read()/readline() call: 32 KiB.
CHUNK_SIZE = 32 * 1024
# Case-insensitive patterns for the tags that mark a line as HTML.
HTML_REGEXPS = tuple(
    re.compile(pattern, re.I)
    for pattern in (
        r"<A\s+[^>]*HREF[^>]+>",
        r"<IFRAME[^>]*>",
        r"<FRAMESET[^>]*>",
        r"<META[\W][^>]*>",
        r"<SCRIPT[^>]*>",
    )
)
class CompressionChecker(Protocol):
    """Structural type shared by the ``check_*`` compression checkers.

    A conforming callable accepts a file path and a ``check_content`` flag
    (defaulting to ``True``) and returns a pair of booleans.
    """

    def __call__(
        self,
        file_path: str,
        check_content: bool = True,
    ) -> Tuple[bool, bool]:
        ...
def check_html(name, file_path: bool = True) -> bool:
    """
    Report whether a file or a piece of text contains HTML markup.

    :param name: path of the file to inspect when ``file_path`` is true,
        otherwise the text itself (converted with ``util.unicodify``).
    :param file_path: selects how ``name`` is interpreted.
    :returns: True as soon as one of the first ``HTML_CHECK_LINES`` lines
        matches any pattern in ``HTML_REGEXPS``; False otherwise, including
        when the file cannot be decoded as UTF-8.
    """
    stream: IO[str] = open(name, encoding="utf-8") if file_path else StringIO(util.unicodify(name))
    with stream:
        try:
            for _ in range(HTML_CHECK_LINES):
                # readline is capped so one very long line cannot exhaust memory.
                text = stream.readline(CHUNK_SIZE)
                if not text:
                    # End of input reached before the line budget ran out.
                    return False
                for pattern in HTML_REGEXPS:
                    if pattern.search(text):
                        return True
        except UnicodeDecodeError:
            return False
    return False
def check_binary(name, file_path: bool = True) -> bool:
    """
    Report whether a file or an in-memory bytes object looks like binary data.

    :param name: path of the file to inspect when ``file_path`` is true,
        otherwise the bytes to inspect.
    :param file_path: selects how ``name`` is interpreted.
    :returns: True if ``util.is_binary`` flags the first 1024 bytes, or, for
        files on disk that are not gzip/zip/bz2 compressed, the 1024 bytes
        starting at the middle of the file. False otherwise.
    """
    read_length = 1024
    stream: IO[bytes] = open(name, "rb") if file_path else BytesIO(name)
    # The context manager covers every statement after the open, so the handle
    # is released even if a later call (e.g. the stat) raises.
    with stream:
        if util.is_binary(stream.read(read_length)):
            return True
        # Some binary files have text only within the first 1024 bytes.
        # Sample 1024 bytes from the middle of the file as well, unless the
        # file is gzip, zip or bz2 compressed, to avoid being fooled by long
        # text headers on binary files. In-memory input is not re-sampled.
        if file_path and not (is_gzip(name) or is_zip(name) or is_bz2(name)):
            # Stat the open handle rather than the path so the size belongs to
            # the file actually being read; integer division keeps the offset exact.
            midpoint = os.fstat(stream.fileno()).st_size // 2
            stream.seek(midpoint)
            return util.is_binary(stream.read(read_length))
        return False
def check_gzip(file_path: str, check_content: bool = True) -> Tuple[bool, bool]:
    """
    Classify a file as gzip and, optionally, screen its content for HTML.

    :param file_path: path of the file to inspect.
    :param check_content: when true, the first ``CHUNK_SIZE`` decompressed
        bytes are passed to ``check_html``.
    :returns: ``(is_gzipped, is_valid)``. ``(False, False)`` if the file cannot
        be read, lacks the gzip magic bytes, or its first bytes cannot be
        decompressed; ``(True, False)`` if the decompressed content looks like
        HTML; ``(True, True)`` otherwise.
    """
    not_gzip = (False, False)
    valid_gzip = (True, True)

    # Step 1: the raw file must start with the gzip magic number.
    try:
        with open(file_path, "rb") as raw:
            if raw.read(2) != util.gzip_magic:
                return not_gzip
    except Exception:
        return not_gzip

    # Step 2: some compressed binary datatypes are supported. Bam should have
    # been detected before this point, so only the sff signature is checked.
    try:
        with gzip.open(file_path, "rb") as unzipped:
            is_sff = unzipped.read(4) == b".sff"
    except Exception:
        return not_gzip
    if is_sff or not check_content:
        return valid_gzip

    # Step 3: reject gzipped HTML.
    with gzip.open(file_path, mode="rb") as unzipped:
        head = unzipped.read(CHUNK_SIZE)
    return (True, not check_html(head, file_path=False))
def check_xz(file_path: str, check_content: bool = True) -> Tuple[bool, bool]:
    """
    Classify a file as xz and, optionally, screen its content for HTML.

    :param file_path: path of the file to inspect.
    :param check_content: when true, the first ``CHUNK_SIZE`` decompressed
        bytes are passed to ``check_html``.
    :returns: ``(is_xz, is_valid)``. ``(False, False)`` if the file cannot be
        read or lacks the xz magic bytes; ``(True, False)`` if the decompressed
        content looks like HTML; ``(True, True)`` otherwise.
    """
    try:
        with open(file_path, "rb") as raw:
            has_magic = raw.read(6) == util.xz_magic
    except Exception:
        return (False, False)
    if not has_magic:
        return (False, False)

    if check_content:
        with lzma.LZMAFile(file_path, mode="rb") as decompressed:
            head = decompressed.read(CHUNK_SIZE)
        # A compressed HTML file is recognised as xz but flagged invalid.
        if check_html(head, file_path=False):
            return (True, False)
    return (True, True)
def check_bz2(file_path: str, check_content: bool = True) -> Tuple[bool, bool]:
    """
    Classify a file as bz2 and, optionally, screen its content for HTML.

    :param file_path: path of the file to inspect.
    :param check_content: when true, the first ``CHUNK_SIZE`` decompressed
        bytes are passed to ``check_html``.
    :returns: ``(is_bz2, is_valid)``. ``(False, False)`` if the file cannot be
        read or lacks the bz2 magic bytes; ``(True, False)`` if the
        decompressed content looks like HTML; ``(True, True)`` otherwise.
    """
    rejected = (False, False)
    try:
        with open(file_path, "rb") as raw:
            signature = raw.read(3)
            if signature != util.bz2_magic:
                return rejected
    except Exception:
        return rejected

    if not check_content:
        return (True, True)

    with bz2.BZ2File(file_path, mode="rb") as decompressed:
        head = decompressed.read(CHUNK_SIZE)
    # A compressed HTML file is recognised as bz2 but flagged invalid.
    looks_like_html = check_html(head, file_path=False)
    return (True, False) if looks_like_html else (True, True)
def check_zip(file_path: str, check_content: bool = True, files=1) -> Tuple[bool, bool]:
    """
    Classify a file as zip and, optionally, screen its members for HTML.

    :param file_path: path of the file to inspect.
    :param check_content: when true, the first ``CHUNK_SIZE`` bytes of the
        leading members are passed to ``check_html``.
    :param files: index of the last member to inspect. The counter is compared
        after a member has been examined, so up to ``files + 1`` members are
        inspected.
    :returns: ``(is_zip, is_valid)``. ``(False, False)`` if
        ``zipfile.is_zipfile`` rejects the file; ``(True, False)`` if an
        inspected member looks like HTML; ``(True, True)`` otherwise.
    """
    if not zipfile.is_zipfile(file_path):
        return (False, False)
    if not check_content:
        return (True, True)
    members = iter_zip(file_path)
    try:
        for filect, (handle, _member_name) in enumerate(members):
            # Close each member handle as soon as its leading chunk is read;
            # iter_zip hands ownership of the handle to the caller.
            with handle:
                chunk = handle.read(CHUNK_SIZE)
            if chunk and check_html(chunk, file_path=False):
                return (True, False)
            if filect >= files:
                break
    finally:
        # Leaving the loop early suspends the generator with the archive still
        # open; closing it releases the ZipFile deterministically.
        members.close()
    return (True, True)
def is_bz2(file_path: str) -> bool:
    """Return the "is bz2" flag of ``check_bz2`` without inspecting the file's content."""
    return check_bz2(file_path, check_content=False)[0]
def is_gzip(file_path: str) -> bool:
    """Return the "is gzipped" flag of ``check_gzip`` with the HTML content check disabled."""
    return check_gzip(file_path, check_content=False)[0]
def is_xz(file_path: str) -> bool:
    """Return the "is xz" flag of ``check_xz`` without inspecting the file's content."""
    return check_xz(file_path, check_content=False)[0]
def is_zip(file_path: str) -> bool:
    """Return the "is zip" flag of ``check_zip`` without inspecting the archive's members."""
    return check_zip(file_path, check_content=False)[0]
def is_single_file_zip(file_path: str) -> bool:
    """
    Report whether a zip archive holds at most one file.

    Directory entries (names ending in ``/``) are not counted. An archive with
    no file entries is also reported as single-file.

    :param file_path: path of the zip archive.
    :returns: True if the archive has zero or one file entries, False if it
        has two or more.
    :raises zipfile.BadZipFile: if ``file_path`` is not a valid zip archive.
    """
    # Count names only: no member needs to be opened to answer the question,
    # so no member handles are created that would have to be closed.
    with zipfile.ZipFile(file_path) as archive:
        file_count = sum(1 for member in archive.namelist() if not member.endswith("/"))
    return file_count <= 1
def is_tar(file_path: str) -> bool:
    """Report whether ``tarfile.is_tarfile`` recognises ``file_path`` as a tar archive."""
    recognised = tarfile.is_tarfile(file_path)
    return recognised
def iter_zip(file_path: str):
    """
    Iterate over the file members of a zip archive.

    Yields ``(handle, member_name)`` tuples, where ``handle`` is the object
    returned by ``ZipFile.open`` for that member. Entries whose name ends in
    ``/`` (directories) are skipped. The generator does not close the yielded
    handles, and the archive stays open while the generator is suspended.
    """
    with zipfile.ZipFile(file_path) as archive:
        for member_name in archive.namelist():
            if member_name.endswith("/"):
                continue
            yield (archive.open(member_name), member_name)
def check_image(file_path: str) -> bool:
    """Collapse the result of ``image_type`` for ``file_path`` into a True/False verdict."""
    detected_type = image_type(file_path)
    return True if detected_type else False
# Registry mapping a compression format name to the checker function for that
# format; every value follows the CompressionChecker call signature.
COMPRESSION_CHECK_FUNCTIONS: Dict[str, CompressionChecker] = {
    "gzip": check_gzip,
    "bz2": check_bz2,
    "xz": check_xz,
    "zip": check_zip,
}
# Public API of this module. check_xz is listed alongside the other check_*
# functions: it is registered in COMPRESSION_CHECK_FUNCTIONS and its is_xz
# counterpart is exported here as well.
__all__ = (
    "check_binary",
    "check_bz2",
    "check_gzip",
    "check_html",
    "check_image",
    "check_xz",
    "check_zip",
    "COMPRESSION_CHECK_FUNCTIONS",
    "is_gzip",
    "is_bz2",
    "is_xz",
    "is_zip",
)