# NOTE: This file was extracted from rendered documentation for an old Galaxy
# release; consult the latest Galaxy release's documentation for the current
# version of this module.
# Source code for tool_shed.util.commit_util
import bz2
import gzip
import json
import logging
import os
import shutil
import tempfile
from collections import namedtuple
from sqlalchemy.sql.expression import null
import tool_shed.repository_types.util as rt_util
from galaxy.util import checkers
from galaxy.util.path import safe_relpath
from tool_shed.tools.data_table_manager import ShedToolDataTableManager
from tool_shed.util import basic_util, hg_util, shed_util_common as suc
log = logging.getLogger(__name__)
# Directory names that must never be committed to a repository (SCM/metadata dirs).
UNDESIRABLE_DIRS = ['.hg', '.svn', '.git', '.cvs']
# File names that are stripped from uploaded archives before committing.
UNDESIRABLE_FILES = ['.hg_archival.txt', 'hgrc', '.DS_Store', 'tool_test_output.html', 'tool_test_output.json']
# Result container for check_archive(); defined once at module scope instead of
# being re-created on every call.
ArchiveCheckResults = namedtuple('ArchiveCheckResults', ['valid', 'invalid', 'undesirable_files', 'undesirable_dirs', 'errors'])


def check_archive(repository, archive):
    """
    Classify the members of an uploaded archive before extraction.

    :param repository: the target Tool Shed repository (its ``type`` restricts
        what file names are allowed for definition-only repository types)
    :param archive: an open archive (tarfile-like) exposing ``getmembers()``
    :returns: an ``ArchiveCheckResults`` namedtuple of
        (valid, invalid, undesirable_files, undesirable_dirs, errors)
    """
    valid = []
    invalid = []
    errors = []
    undesirable_files = []
    undesirable_dirs = []
    for member in archive.getmembers():
        # Allow regular files, directories and hard links only (no symbolic
        # links, devices, FIFOs, etc).
        if not (member.isdir() or member.isfile() or member.islnk()):
            errors.append("Uploaded archives can only include regular directories and files (no symbolic links, devices, etc).")
            invalid.append(member)
            continue
        # Reject path-traversal members (absolute paths or '..' escapes).
        if not safe_relpath(member.name):
            errors.append("Uploaded archives cannot contain files that would extract outside of the archive.")
            invalid.append(member)
            continue
        if os.path.basename(member.name) in UNDESIRABLE_FILES:
            undesirable_files.append(member)
            continue
        # Walk the member's path components looking for an undesirable
        # directory anywhere in the path.
        head = tail = member.name
        found_undesirable_dir = False
        while tail:
            head, tail = os.path.split(head)
            if tail in UNDESIRABLE_DIRS:
                undesirable_dirs.append(member)
                found_undesirable_dir = True
                break
        if found_undesirable_dir:
            continue
        # Definition-only repository types may contain exactly one well-known file.
        if repository.type == rt_util.REPOSITORY_SUITE_DEFINITION and member.name != rt_util.REPOSITORY_DEPENDENCY_DEFINITION_FILENAME:
            errors.append('Repositories of type <b>Repository suite definition</b> can contain only a single file named <b>repository_dependencies.xml</b>.')
            invalid.append(member)
            continue
        if repository.type == rt_util.TOOL_DEPENDENCY_DEFINITION and member.name != rt_util.TOOL_DEPENDENCY_DEFINITION_FILENAME:
            errors.append('Repositories of type <b>Tool dependency definition</b> can contain only a single file named <b>tool_dependencies.xml</b>.')
            invalid.append(member)
            continue
        valid.append(member)
    return ArchiveCheckResults(valid, invalid, undesirable_files, undesirable_dirs, errors)
def check_file_contents_for_email_alerts(app):
    """
    See if any admin users have chosen to receive email alerts when a repository is updated.
    If so, the file contents of the update must be checked for inappropriate content.

    :param app: the Tool Shed application object
    :returns: True if at least one admin user subscribes to alerts for any repository
    """
    sa_session = app.model.session
    # Normalize the comma-separated admin_users config value: strip whitespace
    # around each address and drop empty entries so an empty/whitespace-only
    # config value can never match a subscriber email.
    admin_users = {email.strip() for email in app.config.get("admin_users", "").split(",") if email.strip()}
    if not admin_users:
        return False
    for repository in sa_session.query(app.model.Repository) \
                                .filter(app.model.Repository.table.c.email_alerts != null()):
        # email_alerts is stored as a JSON-encoded list of subscriber emails.
        for user_email in json.loads(repository.email_alerts):
            if user_email in admin_users:
                return True
    return False
def check_file_content_for_html_and_images(file_path):
    """
    Inspect the file at file_path and return a warning string if it holds
    HTML or image content; return an empty string otherwise.
    """
    # HTML takes precedence over the image check, matching the original order.
    if checkers.check_html(file_path):
        return 'The file "%s" contains HTML content.\n' % str(file_path)
    if checkers.check_image(file_path):
        return 'The file "%s" contains image content.\n' % str(file_path)
    return ''
def get_change_lines_in_file_for_tag(tag, change_dict):
    """
    Extract lines containing ``tag`` from a jsonified file diff.

    ``change_dict`` describes the changes to one file in a changeset being
    pushed to the Tool Shed from the command line.  Each data block is split
    on the literal two-character escape sequence ``\\n`` (not a real newline),
    and every line containing ``tag`` is returned trimmed so it begins at the
    tag itself.
    """
    cleaned_lines = []
    for data_dict in change_dict.get('data', []):
        # Blocks arrive with escaped newlines, hence the literal '\n' split.
        for raw_line in data_dict.get('block', '').split('\\n'):
            tag_start = raw_line.find(tag)
            if tag_start > -1:
                cleaned_lines.append(raw_line[tag_start:])
    return cleaned_lines
def get_upload_point(repository, **kwd):
    """
    Return the upload location relative to the repository root, or None for
    the repository root itself.

    :param repository: the target repository; only ``repository.id`` is used
        to locate the ``repo_<id>`` component in the path
    :param kwd: may contain ``upload_point``, an absolute path such as
        ``database/community_files/000/repo_12/1.bed``
    :returns: the relative directory under the repository root (e.g. ``sub``),
        or None when the upload targets the repository root or the path does
        not exist

    Note: the original implementation compared the already-stripped value to
    '/', a branch that could never fire, and so returned '' instead of the
    intended None for the repository root; this version returns None.
    """
    upload_point = kwd.get('upload_point', None)
    if upload_point is None:
        return None
    if not os.path.exists(upload_point):
        # Must have been an error selecting something that didn't exist, so
        # default to the repository root.
        return None
    if os.path.isfile(upload_point):
        # A file was selected; use its parent directory.
        upload_point = os.path.dirname(upload_point)
    # Now upload_point is something like: database/community_files/000/repo_12/sub
    # Keep only the part below the repo_<id> directory.
    upload_point = upload_point.split('repo_%d' % repository.id)[1]
    upload_point = upload_point.strip('/')
    # An empty remainder means the repository root.
    return upload_point or None
def handle_bz2(repository, uploaded_file_name):
    """
    Replace the bzip2-compressed upload at uploaded_file_name with its
    uncompressed content, in place.

    The data is decompressed in chunks into a temporary file created in the
    same directory, which is then moved over the original.  If a read fails,
    the partial temporary file is removed and the original compressed file is
    left untouched.
    """
    target_dir = os.path.dirname(uploaded_file_name)
    with tempfile.NamedTemporaryFile(
        mode='wb',
        prefix=f'repo_{repository.id}_upload_bunzip2_',
        dir=target_dir,
        delete=False,
    ) as out_file, bz2.BZ2File(uploaded_file_name, 'rb') as bz_file:
        while True:
            try:
                data = bz_file.read(basic_util.CHUNK_SIZE)
            except OSError:
                # Corrupt bz2 data: discard the partial output and bail out.
                os.remove(out_file.name)
                log.exception(f'Problem uncompressing bz2 data "{uploaded_file_name}"')
                return
            if not data:
                break
            out_file.write(data)
    shutil.move(out_file.name, uploaded_file_name)
def handle_directory_changes(app, host, username, repository, full_path, filenames_in_archive, remove_repo_files_not_in_tar,
                             new_repo_alert, commit_message, undesirable_dirs_removed, undesirable_files_removed):
    """
    Synchronize the mercurial working directory at full_path with the files
    extracted from an uploaded archive, commit the result, and trigger any
    configured email alerts.

    :param filenames_in_archive: archive member names, relative to full_path
    :param remove_repo_files_not_in_tar: when True and the repository already
        has content, delete repository files absent from the archive
    :param undesirable_dirs_removed / undesirable_files_removed: running
        counters, incremented here and returned
    :returns: a 6-tuple (ok, error_message, files_to_remove, content_alert_str,
        undesirable_dirs_removed, undesirable_files_removed); ok is False only
        when a tool_data_table_conf.xml.sample file fails to parse
    """
    repo_path = repository.repo_path(app)
    content_alert_str = ''
    files_to_remove = []
    # Convert archive-relative names to absolute paths under the upload point.
    filenames_in_archive = [os.path.join(full_path, name) for name in filenames_in_archive]
    repo = repository.hg_repo
    if remove_repo_files_not_in_tar and not repository.is_new():
        # We have a repository that is not new (it contains files), so discover those files that are in the
        # repository, but not in the uploaded archive.
        for root, dirs, files in os.walk(full_path):
            # Skip mercurial metadata paths entirely.
            if root.find('.hg') < 0 and root.find('hgrc') < 0:
                # Prune undesirable directories in place so os.walk does not descend into them.
                for undesirable_dir in UNDESIRABLE_DIRS:
                    if undesirable_dir in dirs:
                        dirs.remove(undesirable_dir)
                        undesirable_dirs_removed += 1
                for undesirable_file in UNDESIRABLE_FILES:
                    if undesirable_file in files:
                        files.remove(undesirable_file)
                        undesirable_files_removed += 1
                for name in files:
                    full_name = os.path.join(root, name)
                    if full_name not in filenames_in_archive:
                        files_to_remove.append(full_name)
        for repo_file in files_to_remove:
            # Remove files in the repository (relative to the upload point) that are not in
            # the uploaded archive.
            try:
                hg_util.remove_file(repo_path, repo_file, force=True)
            except Exception as e:
                # Fall back to manipulating the mercurial dirstate and the
                # filesystem directly when the hg API call fails.
                log.debug("Error removing files using the mercurial API, so trying a different approach, the error was: %s" % str(e))
                relative_selected_file = repo_file.split('repo_%d' % repository.id)[1].lstrip('/')
                repo.dirstate.remove(relative_selected_file)
                # NOTE(review): newer Mercurial versions require a transaction
                # argument for dirstate.write() — TODO confirm against the
                # pinned mercurial version.
                repo.dirstate.write()
                absolute_selected_file = os.path.abspath(repo_file)
                if os.path.isdir(absolute_selected_file):
                    try:
                        os.rmdir(absolute_selected_file)
                    except OSError:
                        # The directory is not empty.
                        pass
                elif os.path.isfile(absolute_selected_file):
                    os.remove(absolute_selected_file)
                    # Also remove the parent directory if it is now empty.
                    dir = os.path.split(absolute_selected_file)[0]
                    try:
                        os.rmdir(dir)
                    except OSError:
                        # The directory is not empty.
                        pass
    # See if any admin users have chosen to receive email alerts when a repository is updated.
    # If so, check every uploaded file to ensure content is appropriate.
    check_contents = check_file_contents_for_email_alerts(app)
    for filename_in_archive in filenames_in_archive:
        # Check file content to ensure it is appropriate.
        if check_contents and os.path.isfile(filename_in_archive):
            content_alert_str += check_file_content_for_html_and_images(filename_in_archive)
        hg_util.add_changeset(repo_path, filename_in_archive)
        if filename_in_archive.endswith('tool_data_table_conf.xml.sample'):
            # Handle the special case where a tool_data_table_conf.xml.sample file is being uploaded
            # by parsing the file and adding new entries to the in-memory app.tool_data_tables
            # dictionary.
            stdtm = ShedToolDataTableManager(app)
            error, message = stdtm.handle_sample_tool_data_table_conf_file(filename_in_archive, persist=False)
            if error:
                # Abort before committing; files added so far remain staged.
                return False, message, files_to_remove, content_alert_str, undesirable_dirs_removed, undesirable_files_removed
    hg_util.commit_changeset(repo_path,
                             full_path_to_changeset=full_path,
                             username=username,
                             message=commit_message)
    # Alert everyone only when this commit produced the repository's first
    # downloadable revision; otherwise restrict alerts to admins.
    admin_only = len(repository.downloadable_revisions) != 1
    suc.handle_email_alerts(app,
                            host,
                            repository,
                            content_alert_str=content_alert_str,
                            new_repo_alert=new_repo_alert,
                            admin_only=admin_only)
    return True, '', files_to_remove, content_alert_str, undesirable_dirs_removed, undesirable_files_removed
def handle_gzip(repository, uploaded_file_name):
    """
    Replace the gzip-compressed upload at uploaded_file_name with its
    uncompressed content, in place.

    The data is decompressed in chunks into a temporary file created in the
    same directory, which is then moved over the original.  If a read fails,
    the partial temporary file is removed and the original compressed file is
    left untouched.
    """
    target_dir = os.path.dirname(uploaded_file_name)
    with tempfile.NamedTemporaryFile(
        mode='wb',
        prefix=f'repo_{repository.id}_upload_gunzip_',
        dir=target_dir,
        delete=False
    ) as out_file, gzip.GzipFile(uploaded_file_name, 'rb') as gz_file:
        while True:
            try:
                data = gz_file.read(basic_util.CHUNK_SIZE)
            except OSError:
                # Corrupt gzip data: discard the partial output and bail out.
                os.remove(out_file.name)
                log.exception(f'Problem uncompressing gz data "{uploaded_file_name}"')
                return
            if not data:
                break
            out_file.write(data)
    shutil.move(out_file.name, uploaded_file_name)
def uncompress(repository, uploaded_file_name, uploaded_file_filename, isgzip=False, isbz2=False):
    """
    Uncompress the upload at uploaded_file_name in place and return the
    display filename with the compression suffix removed.

    :param uploaded_file_name: on-disk path of the uploaded (compressed) file
    :param uploaded_file_filename: the user-facing filename
    :returns: the filename without its '.gz'/'.bz2' suffix, or None when
        neither flag is set (matching the original behavior)

    Bug fix: the original used str.rstrip('.gz') / str.rstrip('.bz2'), which
    strips a *set of characters* from the end rather than the suffix string —
    e.g. 'agg.gz'.rstrip('.gz') == 'a'.  Suffixes are now removed exactly.
    """
    if isgzip:
        handle_gzip(repository, uploaded_file_name)
        if uploaded_file_filename.endswith('.gz'):
            return uploaded_file_filename[:-len('.gz')]
        return uploaded_file_filename
    if isbz2:
        handle_bz2(repository, uploaded_file_name)
        if uploaded_file_filename.endswith('.bz2'):
            return uploaded_file_filename[:-len('.bz2')]
        return uploaded_file_filename