Warning

This document is for an old release of Galaxy. You can alternatively view this page in the latest release if it exists or view the top of the latest release's documentation.

Source code for galaxy.util.zipstream

import os
from urllib.parse import quote

import zipstream

from .path import safe_walk


[docs]class ZipstreamWrapper:
[docs] def __init__(self, archive_name=None, upstream_mod_zip=False, upstream_gzip=False): self.upstream_mod_zip = upstream_mod_zip self.archive_name = archive_name if not self.upstream_mod_zip: self.archive = zipstream.ZipFile(allowZip64=True, compression=zipstream.ZIP_STORED if upstream_gzip else zipstream.ZIP_DEFLATED) self.files = [] self.size = 0
[docs] def response(self): if self.upstream_mod_zip: yield "\n".join(self.files).encode() else: yield from iter(self.archive)
[docs] def get_headers(self): headers = {} if self.archive_name: headers['Content-Disposition'] = f'attachment; filename="{self.archive_name}.zip"' if self.upstream_mod_zip: headers['X-Archive-Files'] = 'zip' else: headers['Content-Type'] = 'application/x-zip-compressed' return headers
[docs] def add_path(self, path, archive_name): size = int(os.stat(path).st_size) if self.upstream_mod_zip: # calculating crc32 would defeat the point of using mod-zip, but if we ever calculate hashsums we should consider this crc32 = "-" line = f"{crc32} {size} {quote(path)} {archive_name}" self.files.append(line) else: self.size += size self.archive.write(path, archive_name)
[docs] def write(self, path, archive_name=None): if os.path.isdir(path): pardir = os.path.join(path, os.pardir) for root, directories, files in safe_walk(path): for directory in directories: dir_path = os.path.join(root, directory) self.add_path(dir_path, os.path.relpath(dir_path, pardir)) for file in files: file_path = os.path.join(root, file) self.add_path(file_path, os.path.relpath(file_path, pardir)) else: self.add_path(path, archive_name or os.path.basename(path))