Source code for galaxy.util.checkers

import bz2
import gzip
import lzma
import os
import re
import tarfile
import zipfile
from io import (
    BytesIO,
    StringIO,
)
from typing import (
    Dict,
    IO,
    Tuple,
)

from typing_extensions import Protocol

from galaxy import util
from galaxy.util.image_util import image_type

# Maximum number of lines check_html() reads before concluding there is no HTML.
HTML_CHECK_LINES = 100
# Read size used when sampling content: the per-line limit in check_html() and
# the amount of data the check_* functions read from an archive.
CHUNK_SIZE = 2**15  # 32 KiB
# Case-insensitive patterns; check_html() reports HTML as soon as one of them
# matches a line.
HTML_REGEXPS = (
    re.compile(r"<A\s+[^>]*HREF[^>]+>", re.I),  # anchor tag carrying an HREF attribute
    re.compile(r"<IFRAME[^>]*>", re.I),
    re.compile(r"<FRAMESET[^>]*>", re.I),
    re.compile(r"<META[\W][^>]*>", re.I),  # "<META" followed by a non-word character
    re.compile(r"<SCRIPT[^>]*>", re.I),
)


class CompressionChecker(Protocol):
    """Structural type for the compression checkers defined in this module.

    A conforming callable takes a file path plus a ``check_content`` flag and
    returns a pair of booleans; for the checkers in this module the pair is
    ``(is_compressed, is_valid)``.
    """

    def __call__(self, file_path: str, check_content: bool = True) -> Tuple[bool, bool]:
        """Inspect *file_path* and return a two-boolean result tuple."""


def check_html(name, file_path: bool = True) -> bool:
    """
    Returns True if the file/string contains HTML code.

    :param name: path of the file to scan when ``file_path`` is true,
        otherwise the text itself (converted with ``util.unicodify``).
    :param file_path: selects between the two interpretations of ``name``.
    :returns: True as soon as one of ``HTML_REGEXPS`` matches a line; False if
        none of the first ``HTML_CHECK_LINES`` lines match, or if the input
        cannot be decoded as UTF-8.
    """
    stream: IO[str] = open(name, encoding="utf-8") if file_path else StringIO(util.unicodify(name))
    # The context manager closes the stream on every exit path.
    with stream:
        try:
            lines_seen = 0
            while lines_seen < HTML_CHECK_LINES:
                # Cap each read so a file without newlines is not loaded whole.
                text = stream.readline(CHUNK_SIZE)
                if not text:
                    # End of input reached without a match.
                    return False
                for pattern in HTML_REGEXPS:
                    if pattern.search(text):
                        return True
                lines_seen += 1
        except UnicodeDecodeError:
            # Undecodable content is treated as "not HTML".
            return False
    return False
def check_binary(name, file_path: bool = True) -> bool:
    """
    Return True if the file/bytes look like binary data.

    The first 1024 bytes are tested with ``util.is_binary``. For files that are
    not gzip, zip or bz2 archives, 1024 bytes from the middle of the file are
    tested as well.

    :param name: path of the file to inspect when ``file_path`` is true,
        otherwise the bytes themselves.
    :param file_path: selects between the two interpretations of ``name``.
    """
    read_length = 1024
    stream: IO[bytes] = open(name, "rb") if file_path else BytesIO(name)
    # Everything after the open happens inside the context manager, so the
    # handle is released even if a later call raises.
    with stream:
        if util.is_binary(stream.read(read_length)):
            return True
        # Some binary files have text only within the first 1024
        # Read 1024 from the middle of the file if this is not
        # a gzip or zip compressed file (bzip are indexed),
        # to avoid issues with long txt headers on binary files.
        # file_path=False doesn't seem to be used in the codebase
        if not file_path or is_gzip(name) or is_zip(name) or is_bz2(name):
            return False
        # Take the size from the open handle (not a second path lookup) and
        # use integer division so the offset is exact for any file size.
        midpoint = os.fstat(stream.fileno()).st_size // 2
        stream.seek(midpoint)
        return util.is_binary(stream.read(read_length))
def check_gzip(file_path: str, check_content: bool = True) -> Tuple[bool, bool]:
    """
    Return a tuple of booleans representing ``(is_gzipped, is_valid)``.

    :param file_path: path of the file to inspect.
    :param check_content: when true, the start of the decompressed data is
        scanned with ``check_html`` and a match marks the file as invalid.
    :returns: ``(False, False)`` if the file cannot be read, lacks the gzip
        magic bytes, or fails to decompress its first bytes; ``(True, False)``
        if HTML is found in the content; ``(True, True)`` otherwise.

    Errors raised while reading the content sample are not caught here.
    """
    # Make sure we have a gzipped file
    try:
        with open(file_path, "rb") as raw:
            has_magic = raw.read(2) == util.gzip_magic
    except Exception:
        has_magic = False
    if not has_magic:
        return (False, False)

    # We support some binary data types, so check if the compressed binary file is valid
    # If the file is Bam, it should already have been detected as such, so we'll just check
    # for sff format.
    try:
        with gzip.open(file_path, "rb") as stream:
            is_sff = stream.read(4) == b".sff"
    except Exception:
        return (False, False)
    if is_sff or not check_content:
        return (True, True)

    with gzip.open(file_path, mode="rb") as stream:
        sample = stream.read(CHUNK_SIZE)
    # See if we have a compressed HTML file
    html_found = check_html(sample, file_path=False)
    return (True, not html_found)
def check_xz(file_path: str, check_content: bool = True) -> Tuple[bool, bool]:
    """
    Return a tuple of booleans representing ``(is_xz, is_valid)``.

    :param file_path: path of the file to inspect.
    :param check_content: when true, the start of the decompressed data is
        scanned with ``check_html`` and a match marks the file as invalid.
    :returns: ``(False, False)`` if the file cannot be read or lacks the xz
        magic bytes; ``(True, False)`` if HTML is found in the content;
        ``(True, True)`` otherwise.

    Errors raised while decompressing the content sample are not caught here.
    """
    try:
        with open(file_path, "rb") as raw:
            has_magic = raw.read(6) == util.xz_magic
    except Exception:
        has_magic = False
    if not has_magic:
        return (False, False)

    if check_content:
        with lzma.LZMAFile(file_path, mode="rb") as stream:
            sample = stream.read(CHUNK_SIZE)
        # See if we have a compressed HTML file
        if check_html(sample, file_path=False):
            return (True, False)
    return (True, True)
def check_bz2(file_path: str, check_content: bool = True) -> Tuple[bool, bool]:
    """
    Return a tuple of booleans representing ``(is_bz2, is_valid)``.

    :param file_path: path of the file to inspect.
    :param check_content: when true, the start of the decompressed data is
        scanned with ``check_html`` and a match marks the file as invalid.
    :returns: ``(False, False)`` if the file cannot be read or lacks the bz2
        magic bytes; ``(True, False)`` if HTML is found in the content;
        ``(True, True)`` otherwise.

    Errors raised while decompressing the content sample are not caught here.
    """
    try:
        with open(file_path, "rb") as raw:
            has_magic = raw.read(3) == util.bz2_magic
    except Exception:
        has_magic = False
    if not has_magic:
        return (False, False)

    if check_content:
        with bz2.BZ2File(file_path, mode="rb") as stream:
            sample = stream.read(CHUNK_SIZE)
        # See if we have a compressed HTML file
        if check_html(sample, file_path=False):
            return (True, False)
    return (True, True)
def check_zip(file_path: str, check_content: bool = True, files=1) -> Tuple[bool, bool]:
    """
    Return a tuple of booleans representing ``(is_zip, is_valid)``.

    :param file_path: path of the file to inspect.
    :param check_content: when true, the start of the leading archive members
        is scanned with ``check_html`` and a match marks the archive invalid.
    :param files: zero-based index of the last member to inspect; the loop
        stops after the member at this index, so up to ``files + 1`` members
        are read.
    :returns: ``(False, False)`` if ``zipfile.is_zipfile`` rejects the file;
        ``(True, False)`` if HTML is found in an inspected member;
        ``(True, True)`` otherwise.
    """
    if not zipfile.is_zipfile(file_path):
        return (False, False)
    if not check_content:
        return (True, True)

    members = iter_zip(file_path)
    try:
        for index, (handle, _member_name) in enumerate(members):
            # Close each member handle as soon as its sample has been read.
            with handle:
                chunk = handle.read(CHUNK_SIZE)
            if chunk and check_html(chunk, file_path=False):
                return (True, False)
            # NOTE(review): the limit is tested after the member was checked,
            # so the default files=1 inspects two members - confirm intended.
            if index >= files:
                break
    finally:
        # Closing the generator releases the archive it keeps open.
        members.close()
    return (True, True)
def is_bz2(file_path: str) -> bool:
    """Return the ``is_bz2`` half of ``check_bz2``'s result, skipping the content check."""
    return check_bz2(file_path, check_content=False)[0]
def is_gzip(file_path: str) -> bool:
    """Return the ``is_gzipped`` half of ``check_gzip``'s result, skipping the content check."""
    return check_gzip(file_path, check_content=False)[0]
def is_xz(file_path: str) -> bool:
    """Return the ``is_xz`` half of ``check_xz``'s result, skipping the content check."""
    return check_xz(file_path, check_content=False)[0]
def is_zip(file_path: str) -> bool:
    """Return the ``is_zip`` half of ``check_zip``'s result, skipping the content check."""
    return check_zip(file_path, check_content=False)[0]
def _zip_file_member_names(archive: zipfile.ZipFile):
    """Return the names of *archive*'s members that are not directory entries."""
    return [member for member in archive.namelist() if not member.endswith("/")]


def is_single_file_zip(file_path: str) -> bool:
    """
    Return True if the zip archive holds no more than one file.

    Directory entries (names ending in ``/``) are not counted. An archive
    without any file members also yields True.

    :param file_path: path of the zip archive.
    """
    # Counting names avoids opening every member just to discard the handle.
    with zipfile.ZipFile(file_path) as archive:
        return len(_zip_file_member_names(archive)) <= 1


def is_tar(file_path: str) -> bool:
    """Return True if ``tarfile.is_tarfile`` recognises *file_path*."""
    return tarfile.is_tarfile(file_path)


def iter_zip(file_path: str):
    """
    Yield ``(handle, name)`` for every non-directory member of a zip archive.

    ``handle`` is the object returned by ``ZipFile.open`` for the member; this
    generator does not close it. The archive itself stays open until the
    generator finishes or is closed.

    :param file_path: path of the zip archive.
    """
    with zipfile.ZipFile(file_path) as archive:
        for member in _zip_file_member_names(archive):
            yield (archive.open(member), member)
def check_image(file_path: str) -> bool:
    """Simple wrapper around image_type to yield a True/False verdict"""
    detected = image_type(file_path)
    return bool(detected)
# Maps a compression format name to the checker function for that format.
COMPRESSION_CHECK_FUNCTIONS: Dict[str, CompressionChecker] = {
    "gzip": check_gzip,
    "bz2": check_bz2,
    "xz": check_xz,
    "zip": check_zip,
}

# Names exported by a star-import of this module.
# NOTE(review): check_xz is registered in COMPRESSION_CHECK_FUNCTIONS but is
# not listed here, unlike check_gzip, check_bz2 and check_zip - confirm
# whether that omission is intentional.
__all__ = (
    "check_binary",
    "check_bz2",
    "check_gzip",
    "check_html",
    "check_image",
    "check_zip",
    "COMPRESSION_CHECK_FUNCTIONS",
    "is_gzip",
    "is_bz2",
    "is_xz",
    "is_zip",
)