Warning

This document is for an old release of Galaxy. You can alternatively view this page in the latest release, if it exists, or view the top of the latest release's documentation.

Source code for tool_shed.webapp.controllers.upload

import logging
import os
import shutil
import tarfile
import tempfile

import requests

from galaxy import (
    util,
    web,
)
from galaxy.util import checkers
from galaxy.webapps.base.controller import BaseUIController
from tool_shed.dependencies import attribute_handlers
from tool_shed.galaxy_install import dependency_display
from tool_shed.metadata import repository_metadata_manager
from tool_shed.repository_types import util as rt_util
from tool_shed.tools.data_table_manager import ShedToolDataTableManager
from tool_shed.util import (
    basic_util,
    commit_util,
    hg_util,
    repository_content_util,
    repository_util,
)
from tool_shed.util import shed_util_common as suc
from tool_shed.util import xml_util
from tool_shed.util.web_util import escape
from tool_shed.webapp.framework.decorators import require_login

log = logging.getLogger(__name__)


class UploadController(BaseUIController):
    """Tool Shed web controller that commits uploaded content to a repository."""

    @web.expose
    @require_login("upload", use_panels=True)
    def upload(self, trans, **kwd):
        """Handle the repository upload form.

        Request parameters read from ``kwd``: ``message``, ``status``,
        ``commit_message``, ``repository_id``, ``uncompress_file``,
        ``remove_repo_files_not_in_tar``, ``file_data``, ``url`` and
        ``upload_button``.

        When ``upload_button`` is set, the content comes from one of three
        sources, checked in this order:

        * a ``url`` starting with ``hg``: the prefix is rewritten to ``http``
          and the repository is cloned into a temporary directory;
        * any other ``url``: the response body is streamed to a temporary file;
        * ``file_data``: the file posted with the form.

        The content is then committed as a tar archive, as a directory, or as a
        single file. On success the browser is redirected to the repository's
        ``browse_repository`` page; otherwise the upload form is rendered again
        with ``message`` and ``status`` describing the outcome.
        """
        message = escape(kwd.get("message", ""))
        status = kwd.get("status", "done")
        commit_message = escape(kwd.get("commit_message", "Uploaded"))
        repository_id = kwd.get("repository_id", "")
        repository = repository_util.get_repository_in_tool_shed(trans.app, repository_id)
        repo_dir = repository.repo_path(trans.app)
        uncompress_file = util.string_as_bool(kwd.get("uncompress_file", "true"))
        remove_repo_files_not_in_tar = util.string_as_bool(kwd.get("remove_repo_files_not_in_tar", "true"))
        uploaded_file = None
        upload_point = commit_util.get_upload_point(repository, **kwd)
        # Remember the tip before the upload so an upload that changes nothing can be detected.
        tip = repository.tip()
        file_data = kwd.get("file_data", "")
        url = kwd.get("url", "")
        # Part of the upload process is sending email notification to those that have registered to
        # receive them. One scenario occurs when the first change set is produced for the repository.
        # See the suc.handle_email_alerts() method for the definition of the scenarios.
        new_repo_alert = repository.is_new()
        uploaded_directory = None
        if kwd.get("upload_button", False):
            if file_data == "" and url == "":
                message = "No files were entered on the upload form."
                status = "error"
                uploaded_file = None
            elif url and url.startswith("hg"):
                # Use mercurial clone to fetch repository, contents will then be copied over.
                uploaded_directory = tempfile.mkdtemp()
                repo_url = f"http{url[len('hg'):]}"
                cloned_ok, error_message = hg_util.clone_repository(repo_url, uploaded_directory)
                if not cloned_ok:
                    message = f"Error uploading via mercurial clone: {error_message}"
                    status = "error"
                    basic_util.remove_dir(uploaded_directory)
                    uploaded_directory = None
            elif url:
                valid_url = True
                stream = None
                try:
                    stream = requests.get(url, stream=True, timeout=util.DEFAULT_SOCKET_TIMEOUT)
                    # A 4xx/5xx answer still carries a body; without this check that error page
                    # would be committed to the repository as if it were the requested file.
                    stream.raise_for_status()
                except Exception as e:
                    valid_url = False
                    message = f"Error uploading file via http: {util.unicodify(e)}"
                    status = "error"
                    uploaded_file = None
                    if stream is not None:
                        stream.close()
                if valid_url:
                    try:
                        # delete=False: the file must outlive this ``with`` block because it is
                        # reopened by name below.
                        # NOTE(review): nothing in this method removes the file when it is not
                        # moved into the repository (archive uploads, rejected uploads) - confirm
                        # whether the helpers clean it up.
                        with tempfile.NamedTemporaryFile(mode="wb", delete=False) as uploaded_file:
                            uploaded_file_name = uploaded_file.name
                            for chunk in stream.iter_content(chunk_size=util.CHUNK_SIZE):
                                if chunk:
                                    uploaded_file.write(chunk)
                            uploaded_file.flush()
                            uploaded_file_filename = url.split("/")[-1]
                            isempty = os.path.getsize(os.path.abspath(uploaded_file_name)) == 0
                    finally:
                        # Release the streamed connection even if the download is interrupted.
                        stream.close()
            elif file_data not in ("", None):
                uploaded_file = file_data.file
                uploaded_file_name = uploaded_file.name
                uploaded_file_filename = os.path.split(file_data.filename)[-1]
                isempty = os.path.getsize(os.path.abspath(uploaded_file_name)) == 0
            if uploaded_file or uploaded_directory:
                rdah = attribute_handlers.RepositoryDependencyAttributeHandler(trans.app, unpopulate=False)
                tdah = attribute_handlers.ToolDependencyAttributeHandler(trans.app, unpopulate=False)
                stdtm = ShedToolDataTableManager(trans.app)
                ok = True
                isgzip = False
                isbz2 = False
                if uploaded_file:
                    if uncompress_file:
                        isgzip = checkers.is_gzip(uploaded_file_name)
                        if not isgzip:
                            isbz2 = checkers.is_bz2(uploaded_file_name)
                    if isempty:
                        tar = None
                        istar = False
                    else:
                        # Determine what we have - a single file or an archive
                        try:
                            if (isgzip or isbz2) and uncompress_file:
                                # Open for reading with transparent compression.
                                tar = tarfile.open(uploaded_file_name, "r:*")
                            else:
                                tar = tarfile.open(uploaded_file_name)
                            istar = True
                        except tarfile.ReadError:
                            tar = None
                            istar = False
                else:
                    # Uploaded directory
                    istar = False
                if istar:
                    (
                        ok,
                        message,
                        files_to_remove,
                        content_alert_str,
                        undesirable_dirs_removed,
                        undesirable_files_removed,
                    ) = repository_content_util.upload_tar(
                        trans,
                        rdah,
                        tdah,
                        repository,
                        tar,
                        uploaded_file,
                        upload_point,
                        remove_repo_files_not_in_tar,
                        commit_message,
                        new_repo_alert,
                    )
                elif uploaded_directory:
                    (
                        ok,
                        message,
                        files_to_remove,
                        content_alert_str,
                        undesirable_dirs_removed,
                        undesirable_files_removed,
                    ) = self.upload_directory(
                        trans,
                        rdah,
                        tdah,
                        repository,
                        uploaded_directory,
                        upload_point,
                        remove_repo_files_not_in_tar,
                        commit_message,
                        new_repo_alert,
                    )
                else:
                    # Single (non-archive) file upload.
                    if (isgzip or isbz2) and uncompress_file:
                        uploaded_file_filename = commit_util.uncompress(
                            repository, uploaded_file_name, uploaded_file_filename, isgzip=isgzip, isbz2=isbz2
                        )
                    if (
                        repository.type == rt_util.REPOSITORY_SUITE_DEFINITION
                        and uploaded_file_filename != rt_util.REPOSITORY_DEPENDENCY_DEFINITION_FILENAME
                    ):
                        ok = False
                        message = "Repositories of type <b>Repository suite definition</b> can only contain a single file named "
                        message += "<b>repository_dependencies.xml</b>."
                    elif (
                        repository.type == rt_util.TOOL_DEPENDENCY_DEFINITION
                        and uploaded_file_filename != rt_util.TOOL_DEPENDENCY_DEFINITION_FILENAME
                    ):
                        ok = False
                        message = "Repositories of type <b>Tool dependency definition</b> can only contain a single file named "
                        message += "<b>tool_dependencies.xml</b>."
                    if ok:
                        if upload_point is not None:
                            full_path = os.path.abspath(os.path.join(repo_dir, upload_point, uploaded_file_filename))
                        else:
                            full_path = os.path.abspath(os.path.join(repo_dir, uploaded_file_filename))
                        # Move some version of the uploaded file to the load_point within the repository hierarchy.
                        if uploaded_file_filename in [rt_util.REPOSITORY_DEPENDENCY_DEFINITION_FILENAME]:
                            # Inspect the contents of the file to see if toolshed or changeset_revision attributes
                            # are missing and if so, set them appropriately.
                            altered, root_elem, error_message = rdah.handle_tag_attributes(uploaded_file_name)
                            if error_message:
                                ok = False
                                message = error_message
                                status = "error"
                            elif altered:
                                tmp_filename = xml_util.create_and_write_tmp_file(root_elem)
                                shutil.move(tmp_filename, full_path)
                            else:
                                shutil.move(uploaded_file_name, full_path)
                        elif uploaded_file_filename in [rt_util.TOOL_DEPENDENCY_DEFINITION_FILENAME]:
                            # Inspect the contents of the file to see if changeset_revision values are
                            # missing and if so, set them appropriately.
                            altered, root_elem, error_message = tdah.handle_tag_attributes(uploaded_file_name)
                            if error_message:
                                ok = False
                                message = error_message
                                status = "error"
                            if ok:
                                if altered:
                                    tmp_filename = xml_util.create_and_write_tmp_file(root_elem)
                                    shutil.move(tmp_filename, full_path)
                                else:
                                    shutil.move(uploaded_file_name, full_path)
                        else:
                            shutil.move(uploaded_file_name, full_path)
                        if ok:
                            # See if any admin users have chosen to receive email alerts when a repository is updated.
                            # If so, check every uploaded file to ensure content is appropriate.
                            check_contents = commit_util.check_file_contents_for_email_alerts(trans.app)
                            if check_contents and os.path.isfile(full_path):
                                content_alert_str = commit_util.check_file_content_for_html_and_images(full_path)
                            else:
                                content_alert_str = ""
                            hg_util.add_changeset(repo_dir, full_path)
                            hg_util.commit_changeset(
                                repo_dir,
                                full_path_to_changeset=full_path,
                                username=trans.user.username,
                                message=commit_message,
                            )
                            if full_path.endswith("tool_data_table_conf.xml.sample"):
                                # Handle the special case where a tool_data_table_conf.xml.sample file is being uploaded
                                # by parsing the file and adding new entries to the in-memory trans.app.tool_data_tables
                                # dictionary.
                                error, error_message = stdtm.handle_sample_tool_data_table_conf_file(
                                    full_path, persist=False
                                )
                                if error:
                                    message = f"{message}<br/>{error_message}"
                            # See if the content of the change set was valid.
                            admin_only = len(repository.downloadable_revisions) != 1
                            suc.handle_email_alerts(
                                trans.app,
                                trans.request.host,
                                repository,
                                content_alert_str=content_alert_str,
                                new_repo_alert=new_repo_alert,
                                admin_only=admin_only,
                            )
                if ok:
                    # Update the repository files for browsing.
                    hg_util.update_repository(repo_dir)
                    # Get the new repository tip.
                    if tip == repository.tip():
                        message = "No changes to repository. "
                        status = "warning"
                    else:
                        if (isgzip or isbz2) and uncompress_file:
                            uncompress_str = " uncompressed and "
                        else:
                            uncompress_str = " "
                        if uploaded_directory:
                            source_type = "repository"
                            source = url
                        else:
                            source_type = "file"
                            source = uploaded_file_filename
                        message = f"The {source_type} <b>{escape(source)}</b> has been successfully{uncompress_str}uploaded to the repository. "
                        if istar and (undesirable_dirs_removed or undesirable_files_removed):
                            items_removed = undesirable_dirs_removed + undesirable_files_removed
                            message += (
                                " %d undesirable items (.hg .svn .git directories, .DS_Store, hgrc files, etc) "
                                % items_removed
                            )
                            message += "were removed from the archive. "
                        if istar and remove_repo_files_not_in_tar and files_to_remove:
                            if upload_point is not None:
                                message += (
                                    " %d files were removed from the repository relative to the selected upload point '%s'. "
                                    % (len(files_to_remove), upload_point)
                                )
                            else:
                                message += " %d files were removed from the repository root. " % len(files_to_remove)
                        rmm = repository_metadata_manager.RepositoryMetadataManager(
                            app=trans.app, user=trans.user, repository=repository
                        )
                        status, error_message = rmm.set_repository_metadata_due_to_new_tip(
                            trans.request.host, content_alert_str=content_alert_str, **kwd
                        )
                        if error_message:
                            message = error_message
                        kwd["message"] = message
                    if repository.metadata_revisions:
                        # A repository's metadata revisions are order descending by update_time, so the zeroth revision
                        # will be the tip just after an upload.
                        metadata_dict = repository.metadata_revisions[0].metadata
                    else:
                        metadata_dict = {}
                    dd = dependency_display.DependencyDisplayer(trans.app)
                    if str(repository.type) not in [
                        rt_util.REPOSITORY_SUITE_DEFINITION,
                        rt_util.TOOL_DEPENDENCY_DEFINITION,
                    ]:
                        change_repository_type_message = rt_util.generate_message_for_repository_type_change(
                            trans.app, repository
                        )
                        if change_repository_type_message:
                            message += change_repository_type_message
                            status = "warning"
                        else:
                            # Provide a warning message if a tool_dependencies.xml file is provided, but tool dependencies
                            # weren't loaded due to a requirement tag mismatch or some other problem. Tool dependency
                            # definitions can define orphan tool dependencies (no relationship to any tools contained in the
                            # repository), so warning messages are important because orphans are always valid. The repository
                            # owner must be warned in case they did not intend to define an orphan dependency, but simply
                            # provided incorrect information (tool shed, name owner, changeset_revision) for the definition.
                            orphan_message = dd.generate_message_for_orphan_tool_dependencies(repository, metadata_dict)
                            if orphan_message:
                                message += orphan_message
                                status = "warning"
                    # Handle messaging for invalid tool dependencies.
                    invalid_tool_dependencies_message = dd.generate_message_for_invalid_tool_dependencies(metadata_dict)
                    if invalid_tool_dependencies_message:
                        message += invalid_tool_dependencies_message
                        status = "error"
                    # Handle messaging for invalid repository dependencies.
                    invalid_repository_dependencies_message = dd.generate_message_for_invalid_repository_dependencies(
                        metadata_dict, error_from_tuple=True
                    )
                    if invalid_repository_dependencies_message:
                        message += invalid_repository_dependencies_message
                        status = "error"
                    # Reset the tool_data_tables by loading the empty tool_data_table_conf.xml file.
                    stdtm.reset_tool_data_tables()
                    if uploaded_directory:
                        basic_util.remove_dir(uploaded_directory)
                    trans.response.send_redirect(
                        web.url_for(
                            controller="repository",
                            action="browse_repository",
                            id=repository_id,
                            commit_message="Deleted selected files",
                            message=message,
                            status=status,
                        )
                    )
                else:
                    if uploaded_directory:
                        basic_util.remove_dir(uploaded_directory)
                    status = "error"
                # Reset the tool_data_tables by loading the empty tool_data_table_conf.xml file.
                stdtm.reset_tool_data_tables()
        return trans.fill_template(
            "/webapps/tool_shed/repository/upload.mako",
            repository=repository,
            changeset_revision=tip,
            url=url,
            commit_message=commit_message,
            uncompress_file=uncompress_file,
            remove_repo_files_not_in_tar=remove_repo_files_not_in_tar,
            message=message,
            status=status,
        )

    def upload_directory(
        self,
        trans,
        rdah,
        tdah,
        repository,
        uploaded_directory,
        upload_point,
        remove_repo_files_not_in_tar,
        commit_message,
        new_repo_alert,
    ):
        """Move the files under ``uploaded_directory`` into the repository and commit them.

        Files are skipped, and counted as removed, when their name is not
        acceptable for the repository type or when any component of their
        relative path is listed in ``commit_util.UNDESIRABLE_DIRS``. Repository
        and tool dependency definition files are passed through ``rdah`` /
        ``tdah`` first so that missing tag attributes can be filled in.

        Returns the six values the caller unpacks as ``(ok, message,
        files_to_remove, content_alert_str, undesirable_dirs_removed,
        undesirable_files_removed)``. If an attribute handler reports an error
        the result is ``(False, error_message, [], "", 0, 0)``; otherwise it is
        whatever ``commit_util.handle_directory_changes`` returns.
        """
        repo_dir = repository.repo_path(trans.app)
        undesirable_dirs_removed = 0
        undesirable_files_removed = 0
        if upload_point is not None:
            full_path = os.path.abspath(os.path.join(repo_dir, upload_point))
        else:
            full_path = os.path.abspath(repo_dir)
        # Definition files whose tag attributes are inspected (and possibly rewritten) before the move.
        definition_handlers = {
            rt_util.REPOSITORY_DEPENDENCY_DEFINITION_FILENAME: rdah,
            rt_util.TOOL_DEPENDENCY_DEFINITION_FILENAME: tdah,
        }
        filenames_in_archive = []
        for root, _dirs, files in os.walk(uploaded_directory):
            for uploaded_file in files:
                relative_path = os.path.normpath(os.path.join(os.path.relpath(root, uploaded_directory), uploaded_file))
                # os.walk() yields bare file names, so no basename() call is needed here.
                if repository.type == rt_util.REPOSITORY_SUITE_DEFINITION:
                    ok = uploaded_file == rt_util.REPOSITORY_DEPENDENCY_DEFINITION_FILENAME
                elif repository.type == rt_util.TOOL_DEPENDENCY_DEFINITION:
                    ok = uploaded_file == rt_util.TOOL_DEPENDENCY_DEFINITION_FILENAME
                else:
                    ok = uploaded_file not in commit_util.UNDESIRABLE_FILES
                if not ok:
                    undesirable_files_removed += 1
                    continue
                # normpath() emits the platform separator, so split on os.sep rather than a literal "/".
                if any(path_item in commit_util.UNDESIRABLE_DIRS for path_item in relative_path.split(os.sep)):
                    # The counter is bumped once per skipped file, not once per directory.
                    undesirable_dirs_removed += 1
                    continue
                uploaded_file_name = os.path.abspath(os.path.join(root, uploaded_file))
                handler = definition_handlers.get(uploaded_file)
                if handler is not None:
                    # Inspect the contents of the file to see if toolshed or changeset_revision
                    # attributes are missing and if so, set them appropriately.
                    altered, root_elem, error_message = handler.handle_tag_attributes(uploaded_file_name)
                    if error_message:
                        # Use integer counters here, matching their type on every other path.
                        return False, error_message, [], "", 0, 0
                    if altered:
                        tmp_filename = xml_util.create_and_write_tmp_file(root_elem)
                        shutil.move(tmp_filename, uploaded_file_name)
                repo_path = os.path.join(full_path, relative_path)
                repo_basedir = os.path.normpath(os.path.join(repo_path, os.path.pardir))
                os.makedirs(repo_basedir, exist_ok=True)
                # Whatever currently occupies the destination is replaced by the uploaded file.
                if os.path.exists(repo_path):
                    if os.path.isdir(repo_path):
                        shutil.rmtree(repo_path)
                    else:
                        os.remove(repo_path)
                shutil.move(os.path.join(uploaded_directory, relative_path), repo_path)
                filenames_in_archive.append(relative_path)
        return commit_util.handle_directory_changes(
            trans.app,
            trans.request.host,
            trans.user.username,
            repository,
            full_path,
            filenames_in_archive,
            remove_repo_files_not_in_tar,
            new_repo_alert,
            commit_message,
            undesirable_dirs_removed,
            undesirable_files_removed,
        )