Warning

This document is for an in-development version of Galaxy. You can alternatively view this page in the latest release if it exists or view the top of the latest release's documentation.

Source code for tool_shed.webapp.controllers.upload

import logging
import os
import shutil
import tarfile
import tempfile

import requests

from galaxy import (
    util,
    web,
)
from galaxy.tool_shed.util import dependency_display
from galaxy.util import checkers
from galaxy.webapps.base.controller import BaseUIController
from tool_shed.dependencies import attribute_handlers
from tool_shed.metadata import repository_metadata_manager
from tool_shed.repository_types import util as rt_util
from tool_shed.tools.data_table_manager import ShedToolDataTableManager
from tool_shed.util import (
    basic_util,
    commit_util,
    hg_util,
    repository_content_util,
    repository_util,
    shed_util_common as suc,
    xml_util,
)
from tool_shed.util.web_util import escape
from tool_shed.webapp.framework.decorators import require_login

log = logging.getLogger(__name__)


class UploadController(BaseUIController):
    """Tool Shed web controller that uploads content into a repository.

    Content may arrive as a file posted with the upload form, as a file fetched from a
    URL, or as a mercurial repository cloned from a URL whose scheme starts with ``hg``.
    """

    @web.expose
    @require_login("upload", use_panels=True)
    def upload(self, trans, **kwd):
        """Handle the repository upload form.

        Values read from ``kwd``: ``message``, ``status``, ``commit_message``,
        ``repository_id``, ``uncompress_file``, ``remove_repo_files_not_in_tar``,
        ``file_data``, ``url`` and ``upload_button``.  ``kwd`` is also forwarded to
        ``commit_util.get_upload_point`` and to
        ``RepositoryMetadataManager.set_repository_metadata_due_to_new_tip``.

        When ``upload_button`` is set, the uploaded content is classified as a tar archive,
        a cloned directory, or a single file, and is committed to the repository through the
        matching helper.  On success the response is redirected to the ``browse_repository``
        action of the ``repository`` controller.  In every path that falls through, the
        ``upload.mako`` template is rendered with the current ``message`` and ``status``.

        :param trans: the current web transaction (provides ``app``, ``user``, ``request``
            and ``response``).
        :returns: the rendered ``/webapps/tool_shed/repository/upload.mako`` template.
        """
        message = escape(kwd.get("message", ""))
        status = kwd.get("status", "done")
        commit_message = escape(kwd.get("commit_message", "Uploaded"))
        repository_id = kwd.get("repository_id", "")
        repository = repository_util.get_repository_in_tool_shed(trans.app, repository_id)
        repo_dir = repository.repo_path(trans.app)
        uncompress_file = util.string_as_bool(kwd.get("uncompress_file", "true"))
        remove_repo_files_not_in_tar = util.string_as_bool(kwd.get("remove_repo_files_not_in_tar", "true"))
        uploaded_file = None
        upload_point = commit_util.get_upload_point(repository, **kwd)
        # Remember the tip before the upload so an unchanged tip can be reported afterwards.
        tip = repository.tip()
        file_data = kwd.get("file_data", "")
        url = kwd.get("url", "")
        # Part of the upload process is sending email notification to those that have registered to
        # receive them. One scenario occurs when the first change set is produced for the repository.
        # See the suc.handle_email_alerts() method for the definition of the scenarios.
        new_repo_alert = repository.is_new()
        uploaded_directory = None
        if kwd.get("upload_button", False):
            if file_data == "" and url == "":
                message = "No files were entered on the upload form."
                status = "error"
                uploaded_file = None
            elif url and url.startswith("hg"):
                # Use mercurial clone to fetch repository, contents will then be copied over.
                uploaded_directory = tempfile.mkdtemp()
                # Replace the leading "hg" with "http" (so "hgs://" becomes "https://").
                repo_url = f"http{url[len('hg'):]}"
                cloned_ok, error_message = hg_util.clone_repository(repo_url, uploaded_directory)
                if not cloned_ok:
                    message = f"Error uploading via mercurial clone: {error_message}"
                    status = "error"
                    basic_util.remove_dir(uploaded_directory)
                    uploaded_directory = None
            elif url:
                valid_url = True
                try:
                    stream = requests.get(url, stream=True, timeout=util.DEFAULT_SOCKET_TIMEOUT)
                except Exception as e:
                    valid_url = False
                    message = f"Error uploading file via http: {util.unicodify(e)}"
                    status = "error"
                    uploaded_file = None
                if valid_url:
                    try:
                        # delete=False keeps the file on disk after the handle is closed so it can
                        # be inspected and moved by the code below.
                        with tempfile.NamedTemporaryFile(mode="wb", delete=False) as uploaded_file:
                            uploaded_file_name = uploaded_file.name
                            for chunk in stream.iter_content(chunk_size=util.CHUNK_SIZE):
                                if chunk:
                                    uploaded_file.write(chunk)
                            uploaded_file.flush()
                    finally:
                        # A streamed response holds its connection until it is closed; release it
                        # whether or not the download completed.
                        stream.close()
                    uploaded_file_filename = url.split("/")[-1]
                    isempty = os.path.getsize(os.path.abspath(uploaded_file_name)) == 0
            elif file_data not in ("", None):
                uploaded_file = file_data.file
                uploaded_file_name = uploaded_file.name
                uploaded_file_filename = os.path.split(file_data.filename)[-1]
                isempty = os.path.getsize(os.path.abspath(uploaded_file_name)) == 0
            if uploaded_file or uploaded_directory:
                rdah = attribute_handlers.RepositoryDependencyAttributeHandler(trans.app, unpopulate=False)
                tdah = attribute_handlers.ToolDependencyAttributeHandler(trans.app, unpopulate=False)
                stdtm = ShedToolDataTableManager(trans.app)
                ok = True
                isgzip = False
                isbz2 = False
                if uploaded_file:
                    if uncompress_file:
                        isgzip = checkers.is_gzip(uploaded_file_name)
                        if not isgzip:
                            isbz2 = checkers.is_bz2(uploaded_file_name)
                    if isempty:
                        tar = None
                        istar = False
                    else:
                        # Determine what we have - a single file or an archive
                        try:
                            if (isgzip or isbz2) and uncompress_file:
                                # Open for reading with transparent compression.
                                tar = tarfile.open(uploaded_file_name, "r:*")
                            else:
                                tar = tarfile.open(uploaded_file_name)
                            istar = True
                        except tarfile.ReadError:
                            tar = None
                            istar = False
                else:
                    # Uploaded directory
                    istar = False
                if istar:
                    (
                        ok,
                        message,
                        files_to_remove,
                        content_alert_str,
                        undesirable_dirs_removed,
                        undesirable_files_removed,
                    ) = repository_content_util.upload_tar(
                        trans,
                        rdah,
                        tdah,
                        repository,
                        tar,
                        uploaded_file,
                        upload_point,
                        remove_repo_files_not_in_tar,
                        commit_message,
                        new_repo_alert,
                    )
                elif uploaded_directory:
                    (
                        ok,
                        message,
                        files_to_remove,
                        content_alert_str,
                        undesirable_dirs_removed,
                        undesirable_files_removed,
                    ) = self.upload_directory(
                        trans,
                        rdah,
                        tdah,
                        repository,
                        uploaded_directory,
                        upload_point,
                        remove_repo_files_not_in_tar,
                        commit_message,
                        new_repo_alert,
                    )
                else:
                    # A single (non-archive) file was uploaded.
                    if (isgzip or isbz2) and uncompress_file:
                        uploaded_file_filename = commit_util.uncompress(
                            repository, uploaded_file_name, uploaded_file_filename, isgzip=isgzip, isbz2=isbz2
                        )
                    if (
                        repository.type == rt_util.REPOSITORY_SUITE_DEFINITION
                        and uploaded_file_filename != rt_util.REPOSITORY_DEPENDENCY_DEFINITION_FILENAME
                    ):
                        ok = False
                        message = "Repositories of type <b>Repository suite definition</b> can only contain a single file named "
                        message += "<b>repository_dependencies.xml</b>."
                    elif (
                        repository.type == rt_util.TOOL_DEPENDENCY_DEFINITION
                        and uploaded_file_filename != rt_util.TOOL_DEPENDENCY_DEFINITION_FILENAME
                    ):
                        ok = False
                        message = "Repositories of type <b>Tool dependency definition</b> can only contain a single file named "
                        message += "<b>tool_dependencies.xml</b>."
                    if ok:
                        if upload_point is not None:
                            full_path = os.path.abspath(os.path.join(repo_dir, upload_point, uploaded_file_filename))
                        else:
                            full_path = os.path.abspath(os.path.join(repo_dir, uploaded_file_filename))
                        # Move some version of the uploaded file to the load_point within the repository hierarchy.
                        if uploaded_file_filename in [rt_util.REPOSITORY_DEPENDENCY_DEFINITION_FILENAME]:
                            # Inspect the contents of the file to see if toolshed or changeset_revision attributes
                            # are missing and if so, set them appropriately.
                            altered, root_elem, error_message = rdah.handle_tag_attributes(uploaded_file_name)
                            if error_message:
                                ok = False
                                message = error_message
                                status = "error"
                            elif altered:
                                tmp_filename = xml_util.create_and_write_tmp_file(root_elem)
                                shutil.move(tmp_filename, full_path)
                            else:
                                shutil.move(uploaded_file_name, full_path)
                        elif uploaded_file_filename in [rt_util.TOOL_DEPENDENCY_DEFINITION_FILENAME]:
                            # Inspect the contents of the file to see if changeset_revision values are
                            # missing and if so, set them appropriately.
                            altered, root_elem, error_message = tdah.handle_tag_attributes(uploaded_file_name)
                            if error_message:
                                ok = False
                                message = error_message
                                status = "error"
                            if ok:
                                if altered:
                                    tmp_filename = xml_util.create_and_write_tmp_file(root_elem)
                                    shutil.move(tmp_filename, full_path)
                                else:
                                    shutil.move(uploaded_file_name, full_path)
                        else:
                            shutil.move(uploaded_file_name, full_path)
                        if ok:
                            # See if any admin users have chosen to receive email alerts when a repository is updated.
                            # If so, check every uploaded file to ensure content is appropriate.
                            check_contents = commit_util.check_file_contents_for_email_alerts(trans.app)
                            if check_contents and os.path.isfile(full_path):
                                content_alert_str = commit_util.check_file_content_for_html_and_images(full_path)
                            else:
                                content_alert_str = ""
                            hg_util.add_changeset(repo_dir, full_path)
                            hg_util.commit_changeset(
                                repo_dir,
                                full_path_to_changeset=full_path,
                                username=trans.user.username,
                                message=commit_message,
                            )
                            if full_path.endswith("tool_data_table_conf.xml.sample"):
                                # Handle the special case where a tool_data_table_conf.xml.sample file is being uploaded
                                # by parsing the file and adding new entries to the in-memory trans.app.tool_data_tables
                                # dictionary.
                                error, error_message = stdtm.handle_sample_tool_data_table_conf_file(
                                    full_path, persist=False
                                )
                                if error:
                                    message = f"{message}<br/>{error_message}"
                            # See if the content of the change set was valid.
                            admin_only = len(repository.downloadable_revisions) != 1
                            suc.handle_email_alerts(
                                trans.app,
                                trans.request.host,
                                repository,
                                content_alert_str=content_alert_str,
                                new_repo_alert=new_repo_alert,
                                admin_only=admin_only,
                            )
                if ok:
                    # Update the repository files for browsing.
                    hg_util.update_repository(repo_dir)
                    # Get the new repository tip.
                    if tip == repository.tip():
                        message = "No changes to repository. "
                        status = "warning"
                    else:
                        if (isgzip or isbz2) and uncompress_file:
                            uncompress_str = " uncompressed and "
                        else:
                            uncompress_str = " "
                        if uploaded_directory:
                            source_type = "repository"
                            source = url
                        else:
                            source_type = "file"
                            source = uploaded_file_filename
                        message = f"The {source_type} <b>{escape(source)}</b> has been successfully{uncompress_str}uploaded to the repository. "
                        if istar and (undesirable_dirs_removed or undesirable_files_removed):
                            items_removed = undesirable_dirs_removed + undesirable_files_removed
                            message += (
                                " %d undesirable items (.hg .svn .git directories, .DS_Store, hgrc files, etc) "
                                % items_removed
                            )
                            message += "were removed from the archive. "
                        if istar and remove_repo_files_not_in_tar and files_to_remove:
                            if upload_point is not None:
                                message += (
                                    " %d files were removed from the repository relative to the selected upload point '%s'. "
                                    % (len(files_to_remove), upload_point)
                                )
                            else:
                                message += " %d files were removed from the repository root. " % len(files_to_remove)
                        rmm = repository_metadata_manager.RepositoryMetadataManager(
                            app=trans.app, user=trans.user, repository=repository
                        )
                        status, error_message = rmm.set_repository_metadata_due_to_new_tip(
                            trans.request.host, content_alert_str=content_alert_str, **kwd
                        )
                        if error_message:
                            message = error_message
                        kwd["message"] = message
                    if repository.metadata_revisions:
                        # A repository's metadata revisions are order descending by update_time, so the zeroth revision
                        # will be the tip just after an upload.
                        metadata_dict = repository.metadata_revisions[0].metadata
                    else:
                        metadata_dict = {}
                    dd = dependency_display.DependencyDisplayer(trans.app)
                    if str(repository.type) not in [
                        rt_util.REPOSITORY_SUITE_DEFINITION,
                        rt_util.TOOL_DEPENDENCY_DEFINITION,
                    ]:
                        change_repository_type_message = rt_util.generate_message_for_repository_type_change(
                            trans.app, repository
                        )
                        if change_repository_type_message:
                            message += change_repository_type_message
                            status = "warning"
                        else:
                            # NOTE(review): this else is paired with the type-change check above - confirm
                            # against the upstream file, the nesting was reconstructed.
                            # Provide a warning message if a tool_dependencies.xml file is provided, but tool dependencies
                            # weren't loaded due to a requirement tag mismatch or some other problem. Tool dependency
                            # definitions can define orphan tool dependencies (no relationship to any tools contained in the
                            # repository), so warning messages are important because orphans are always valid. The repository
                            # owner must be warned in case they did not intend to define an orphan dependency, but simply
                            # provided incorrect information (tool shed, name owner, changeset_revision) for the definition.
                            orphan_message = dd.generate_message_for_orphan_tool_dependencies(repository, metadata_dict)
                            if orphan_message:
                                message += orphan_message
                                status = "warning"
                    # Handle messaging for invalid tool dependencies.
                    invalid_tool_dependencies_message = dd.generate_message_for_invalid_tool_dependencies(metadata_dict)
                    if invalid_tool_dependencies_message:
                        message += invalid_tool_dependencies_message
                        status = "error"
                    # Handle messaging for invalid repository dependencies.
                    invalid_repository_dependencies_message = dd.generate_message_for_invalid_repository_dependencies(
                        metadata_dict, error_from_tuple=True
                    )
                    if invalid_repository_dependencies_message:
                        message += invalid_repository_dependencies_message
                        status = "error"
                    # Reset the tool_data_tables by loading the empty tool_data_table_conf.xml file.
                    stdtm.reset_tool_data_tables()
                    if uploaded_directory:
                        basic_util.remove_dir(uploaded_directory)
                    # NOTE(review): the commit_message value below does not describe an upload and looks
                    # copied from a delete action - confirm before changing, it is sent as-is.
                    trans.response.send_redirect(
                        web.url_for(
                            controller="repository",
                            action="browse_repository",
                            id=repository_id,
                            commit_message="Deleted selected files",
                            message=message,
                            status=status,
                        )
                    )
                else:
                    if uploaded_directory:
                        basic_util.remove_dir(uploaded_directory)
                    status = "error"
                # Reset the tool_data_tables by loading the empty tool_data_table_conf.xml file.
                stdtm.reset_tool_data_tables()
        return trans.fill_template(
            "/webapps/tool_shed/repository/upload.mako",
            repository=repository,
            changeset_revision=tip,
            url=url,
            commit_message=commit_message,
            uncompress_file=uncompress_file,
            remove_repo_files_not_in_tar=remove_repo_files_not_in_tar,
            message=message,
            status=status,
        )

    def upload_directory(
        self,
        trans,
        rdah,
        tdah,
        repository,
        uploaded_directory,
        upload_point,
        remove_repo_files_not_in_tar,
        commit_message,
        new_repo_alert,
    ):
        """Move the files found under ``uploaded_directory`` into the repository and commit them.

        Every file below ``uploaded_directory`` is filtered first: repositories of type
        ``REPOSITORY_SUITE_DEFINITION`` / ``TOOL_DEPENDENCY_DEFINITION`` accept only their
        respective definition file name, other repositories reject names listed in
        ``commit_util.UNDESIRABLE_FILES``; accepted files are then rejected if any path
        component is listed in ``commit_util.UNDESIRABLE_DIRS``.  Dependency definition files
        are passed through ``rdah`` / ``tdah`` and rewritten in place when the handler reports
        them as altered.  Surviving files replace whatever exists at the same relative path
        under the upload point (or the repository root when ``upload_point`` is ``None``).

        :param trans: the current web transaction (``app``, ``request.host`` and
            ``user.username`` are read).
        :param rdah: handler whose ``handle_tag_attributes`` is applied to repository
            dependency definition files.
        :param tdah: handler whose ``handle_tag_attributes`` is applied to tool dependency
            definition files.
        :param repository: the target repository.
        :param uploaded_directory: directory tree holding the content to upload.
        :param upload_point: path relative to the repository root, or ``None``.
        :param remove_repo_files_not_in_tar: forwarded to ``commit_util.handle_directory_changes``.
        :param commit_message: forwarded to ``commit_util.handle_directory_changes``.
        :param new_repo_alert: forwarded to ``commit_util.handle_directory_changes``.
        :returns: ``(False, error_message, [], "", [], [])`` as soon as an attribute handler
            reports an error, otherwise the result of ``commit_util.handle_directory_changes``
            (the caller in ``upload`` unpacks six values from it).
        """
        repo_dir = repository.repo_path(trans.app)
        undesirable_dirs_removed = 0
        undesirable_files_removed = 0
        if upload_point is not None:
            full_path = os.path.abspath(os.path.join(repo_dir, upload_point))
        else:
            full_path = os.path.abspath(repo_dir)
        filenames_in_archive = []
        for root, _dirs, files in os.walk(uploaded_directory):
            for uploaded_file in files:
                relative_path = os.path.normpath(os.path.join(os.path.relpath(root, uploaded_directory), uploaded_file))
                if repository.type == rt_util.REPOSITORY_SUITE_DEFINITION:
                    ok = os.path.basename(uploaded_file) == rt_util.REPOSITORY_DEPENDENCY_DEFINITION_FILENAME
                elif repository.type == rt_util.TOOL_DEPENDENCY_DEFINITION:
                    ok = os.path.basename(uploaded_file) == rt_util.TOOL_DEPENDENCY_DEFINITION_FILENAME
                else:
                    ok = os.path.basename(uploaded_file) not in commit_util.UNDESIRABLE_FILES
                if ok:
                    # relative_path was normalised with os.path.normpath, so its components are
                    # separated by os.sep rather than a hard-coded "/".
                    for file_path_item in relative_path.split(os.sep):
                        if file_path_item in commit_util.UNDESIRABLE_DIRS:
                            undesirable_dirs_removed += 1
                            ok = False
                            break
                else:
                    undesirable_files_removed += 1
                if ok:
                    uploaded_file_name = os.path.abspath(os.path.join(root, uploaded_file))
                    if os.path.split(uploaded_file_name)[-1] == rt_util.REPOSITORY_DEPENDENCY_DEFINITION_FILENAME:
                        # Inspect the contents of the file to see if toolshed or changeset_revision
                        # attributes are missing and if so, set them appropriately.
                        altered, root_elem, error_message = rdah.handle_tag_attributes(uploaded_file_name)
                        if error_message:
                            return False, error_message, [], "", [], []
                        elif altered:
                            tmp_filename = xml_util.create_and_write_tmp_file(root_elem)
                            shutil.move(tmp_filename, uploaded_file_name)
                    elif os.path.split(uploaded_file_name)[-1] == rt_util.TOOL_DEPENDENCY_DEFINITION_FILENAME:
                        # Inspect the contents of the file to see if toolshed or changeset_revision
                        # attributes are missing and if so, set them appropriately.
                        altered, root_elem, error_message = tdah.handle_tag_attributes(uploaded_file_name)
                        if error_message:
                            return False, error_message, [], "", [], []
                        if altered:
                            tmp_filename = xml_util.create_and_write_tmp_file(root_elem)
                            shutil.move(tmp_filename, uploaded_file_name)
                    repo_path = os.path.join(full_path, relative_path)
                    repo_basedir = os.path.normpath(os.path.join(repo_path, os.path.pardir))
                    if not os.path.exists(repo_basedir):
                        os.makedirs(repo_basedir)
                    # Clear whatever currently occupies the destination so the move below replaces it.
                    if os.path.exists(repo_path):
                        if os.path.isdir(repo_path):
                            shutil.rmtree(repo_path)
                        else:
                            os.remove(repo_path)
                    shutil.move(os.path.join(uploaded_directory, relative_path), repo_path)
                    filenames_in_archive.append(relative_path)
        return commit_util.handle_directory_changes(
            trans.app,
            trans.request.host,
            trans.user.username,
            repository,
            full_path,
            filenames_in_archive,
            remove_repo_files_not_in_tar,
            new_repo_alert,
            commit_message,
            undesirable_dirs_removed,
            undesirable_files_removed,
        )