Source code for tool_shed.util.repository_content_util

import os
import shutil
import tarfile
import tempfile
from typing import (
    Optional,
    TYPE_CHECKING,
)

import tool_shed.repository_types.util as rt_util
from galaxy.tool_shed.util.hg_util import clone_repository
from galaxy.util import checkers
from tool_shed.dependencies.attribute_handlers import (
    RepositoryDependencyAttributeHandler,
    ToolDependencyAttributeHandler,
)
from tool_shed.util import xml_util
from tool_shed.util.commit_util import (
    ChangeResponseT,
    check_archive,
    handle_directory_changes,
)

if TYPE_CHECKING:
    from tool_shed.context import ProvidesRepositoriesContext
    from tool_shed.webapp.model import Repository


[docs]def tar_open(uploaded_file): isgzip = False isbz2 = False isgzip = checkers.is_gzip(uploaded_file) if not isgzip: isbz2 = checkers.is_bz2(uploaded_file) if isgzip or isbz2: # Open for reading with transparent compression. tar = tarfile.open(uploaded_file, "r:*") else: tar = tarfile.open(uploaded_file) return tar
[docs]def upload_tar( trans: "ProvidesRepositoriesContext", username: str, repository: "Repository", uploaded_file, commit_message: str, dry_run: bool = False, remove_repo_files_not_in_tar: bool = True, new_repo_alert: bool = False, tar=None, rdah: Optional[RepositoryDependencyAttributeHandler] = None, tdah: Optional[ToolDependencyAttributeHandler] = None, ) -> ChangeResponseT: host = trans.repositories_hostname app = trans.app if tar is None: tar = tar_open(uploaded_file) rdah = rdah or RepositoryDependencyAttributeHandler(trans, unpopulate=False) tdah = tdah or ToolDependencyAttributeHandler(trans, unpopulate=False) # Upload a tar archive of files. undesirable_dirs_removed = 0 undesirable_files_removed = 0 check_results = check_archive(repository, tar) if check_results.invalid: tar.close() try: uploaded_file.close() except AttributeError: pass message = "{} Invalid paths were: {}".format( " ".join(check_results.errors), ", ".join([i.name for i in check_results.invalid]) ) return False, message, [], "", undesirable_dirs_removed, undesirable_files_removed else: repo_dir = repository.repo_path(app) if dry_run: full_path = tempfile.mkdtemp() clone_repository(repo_dir, full_path) else: full_path = os.path.abspath(repo_dir) undesirable_files_removed = len(check_results.undesirable_files) undesirable_dirs_removed = len(check_results.undesirable_dirs) filenames_in_archive = [ti.name for ti in check_results.valid] # Extract the uploaded tar to the load_point within the repository hierarchy. tar.extractall(path=full_path, members=check_results.valid) tar.close() try: uploaded_file.close() except AttributeError: pass for filename in filenames_in_archive: uploaded_file_name = os.path.join(full_path, filename) if os.path.split(uploaded_file_name)[-1] == rt_util.REPOSITORY_DEPENDENCY_DEFINITION_FILENAME: # Inspect the contents of the file to see if toolshed or changeset_revision attributes # are missing and if so, set them appropriately. altered, root_elem, error_message = rdah.handle_tag_attributes(uploaded_file_name) if error_message: return False, error_message, [], "", 0, 0 elif altered: tmp_filename = xml_util.create_and_write_tmp_file(root_elem) shutil.move(tmp_filename, uploaded_file_name) elif os.path.split(uploaded_file_name)[-1] == rt_util.TOOL_DEPENDENCY_DEFINITION_FILENAME: # Inspect the contents of the file to see if toolshed or changeset_revision # attributes are missing and if so, set them appropriately. altered, root_elem, error_message = tdah.handle_tag_attributes(uploaded_file_name) if error_message: return False, error_message, [], "", 0, 0 if altered: tmp_filename = xml_util.create_and_write_tmp_file(root_elem) shutil.move(tmp_filename, uploaded_file_name) return handle_directory_changes( app, host, username, repository, full_path, filenames_in_archive, remove_repo_files_not_in_tar, new_repo_alert, commit_message, undesirable_dirs_removed, undesirable_files_removed, repo_path=full_path, )