Warning
This document is for an in-development version of Galaxy. You can alternatively view this page in the latest release if it exists or view the top of the latest release's documentation.
Source code for tool_shed.util.commit_util
import gzip
import json
import logging
import os
import shutil
import sys
import tempfile
from collections import namedtuple
from sqlalchemy.sql.expression import null
import tool_shed.repository_types.util as rt_util
from galaxy.util import checkers
from galaxy.util.path import safe_relpath
from tool_shed.tools.data_table_manager import ShedToolDataTableManager
from tool_shed.util import basic_util, hg_util, shed_util_common as suc
if sys.version_info < (3, 3):
import bz2file as bz2
else:
import bz2
log = logging.getLogger(__name__)
UNDESIRABLE_DIRS = ['.hg', '.svn', '.git', '.cvs']
UNDESIRABLE_FILES = ['.hg_archival.txt', 'hgrc', '.DS_Store', 'tool_test_output.html', 'tool_test_output.json']
[docs]def check_archive(repository, archive):
valid = []
invalid = []
errors = []
undesirable_files = []
undesirable_dirs = []
for member in archive.getmembers():
# Allow regular files and directories only
if not (member.isdir() or member.isfile() or member.islnk()):
errors.append("Uploaded archives can only include regular directories and files (no symbolic links, devices, etc).")
invalid.append(member)
continue
if not safe_relpath(member.name):
errors.append("Uploaded archives cannot contain files that would extract outside of the archive.")
invalid.append(member)
continue
if os.path.basename(member.name) in UNDESIRABLE_FILES:
undesirable_files.append(member)
continue
head = tail = member.name
try:
while tail:
head, tail = os.path.split(head)
if tail in UNDESIRABLE_DIRS:
undesirable_dirs.append(member)
assert False
except AssertionError:
continue
if repository.type == rt_util.REPOSITORY_SUITE_DEFINITION and member.name != rt_util.REPOSITORY_DEPENDENCY_DEFINITION_FILENAME:
errors.append('Repositories of type <b>Repository suite definition</b> can contain only a single file named <b>repository_dependencies.xml</b>.')
invalid.append(member)
continue
if repository.type == rt_util.TOOL_DEPENDENCY_DEFINITION and member.name != rt_util.TOOL_DEPENDENCY_DEFINITION_FILENAME:
errors.append('Repositories of type <b>Tool dependency definition</b> can contain only a single file named <b>tool_dependencies.xml</b>.')
invalid.append(member)
continue
valid.append(member)
ArchiveCheckResults = namedtuple('ArchiveCheckResults', ['valid', 'invalid', 'undesirable_files', 'undesirable_dirs', 'errors'])
return ArchiveCheckResults(valid, invalid, undesirable_files, undesirable_dirs, errors)
[docs]def check_file_contents_for_email_alerts(app):
"""
See if any admin users have chosen to receive email alerts when a repository is updated.
If so, the file contents of the update must be checked for inappropriate content.
"""
sa_session = app.model.context.current
admin_users = app.config.get("admin_users", "").split(",")
for repository in sa_session.query(app.model.Repository) \
.filter(app.model.Repository.table.c.email_alerts != null()):
email_alerts = json.loads(repository.email_alerts)
for user_email in email_alerts:
if user_email in admin_users:
return True
return False
[docs]def check_file_content_for_html_and_images(file_path):
message = ''
if checkers.check_html(file_path):
message = 'The file "%s" contains HTML content.\n' % str(file_path)
elif checkers.check_image(file_path):
message = 'The file "%s" contains image content.\n' % str(file_path)
return message
[docs]def get_change_lines_in_file_for_tag(tag, change_dict):
"""
The received change_dict is the jsonified version of the changes to a file in a
changeset being pushed to the Tool Shed from the command line. This method cleans
and returns appropriate lines for inspection.
"""
cleaned_lines = []
data_list = change_dict.get('data', [])
for data_dict in data_list:
block = data_dict.get('block', '')
lines = block.split('\\n')
for line in lines:
index = line.find(tag)
if index > -1:
line = line[index:]
cleaned_lines.append(line)
return cleaned_lines
[docs]def get_upload_point(repository, **kwd):
upload_point = kwd.get('upload_point', None)
if upload_point is not None:
# The value of upload_point will be something like: database/community_files/000/repo_12/1.bed
if os.path.exists(upload_point):
if os.path.isfile(upload_point):
# Get the parent directory
upload_point, not_needed = os.path.split(upload_point)
# Now the value of uplaod_point will be something like: database/community_files/000/repo_12/
upload_point = upload_point.split('repo_%d' % repository.id)[1]
if upload_point:
upload_point = upload_point.lstrip('/')
upload_point = upload_point.rstrip('/')
# Now the value of uplaod_point will be something like: /
if upload_point == '/':
upload_point = None
else:
# Must have been an error selecting something that didn't exist, so default to repository root
upload_point = None
return upload_point
[docs]def handle_bz2(repository, uploaded_file_name):
fd, uncompressed = tempfile.mkstemp(prefix='repo_%d_upload_bunzip2_' % repository.id,
dir=os.path.dirname(uploaded_file_name),
text=False)
bzipped_file = bz2.BZ2File(uploaded_file_name, 'rb')
while 1:
try:
chunk = bzipped_file.read(basic_util.CHUNK_SIZE)
except IOError:
os.close(fd)
os.remove(uncompressed)
log.exception('Problem uncompressing bz2 data "%s"', uploaded_file_name)
return
if not chunk:
break
os.write(fd, chunk)
os.close(fd)
bzipped_file.close()
shutil.move(uncompressed, uploaded_file_name)
[docs]def handle_directory_changes(app, host, username, repository, full_path, filenames_in_archive, remove_repo_files_not_in_tar,
new_repo_alert, commit_message, undesirable_dirs_removed, undesirable_files_removed):
repo_path = repository.repo_path(app)
content_alert_str = ''
files_to_remove = []
filenames_in_archive = [os.path.join(full_path, name) for name in filenames_in_archive]
repo = repository.hg_repo
if remove_repo_files_not_in_tar and not repository.is_new():
# We have a repository that is not new (it contains files), so discover those files that are in the
# repository, but not in the uploaded archive.
for root, dirs, files in os.walk(full_path):
if root.find('.hg') < 0 and root.find('hgrc') < 0:
for undesirable_dir in UNDESIRABLE_DIRS:
if undesirable_dir in dirs:
dirs.remove(undesirable_dir)
undesirable_dirs_removed += 1
for undesirable_file in UNDESIRABLE_FILES:
if undesirable_file in files:
files.remove(undesirable_file)
undesirable_files_removed += 1
for name in files:
full_name = os.path.join(root, name)
if full_name not in filenames_in_archive:
files_to_remove.append(full_name)
for repo_file in files_to_remove:
# Remove files in the repository (relative to the upload point) that are not in
# the uploaded archive.
try:
hg_util.remove_file(repo_path, repo_file, force=True)
except Exception as e:
log.debug("Error removing files using the mercurial API, so trying a different approach, the error was: %s" % str(e))
relative_selected_file = repo_file.split('repo_%d' % repository.id)[1].lstrip('/')
repo.dirstate.remove(relative_selected_file)
repo.dirstate.write()
absolute_selected_file = os.path.abspath(repo_file)
if os.path.isdir(absolute_selected_file):
try:
os.rmdir(absolute_selected_file)
except OSError:
# The directory is not empty.
pass
elif os.path.isfile(absolute_selected_file):
os.remove(absolute_selected_file)
dir = os.path.split(absolute_selected_file)[0]
try:
os.rmdir(dir)
except OSError:
# The directory is not empty.
pass
# See if any admin users have chosen to receive email alerts when a repository is updated.
# If so, check every uploaded file to ensure content is appropriate.
check_contents = check_file_contents_for_email_alerts(app)
for filename_in_archive in filenames_in_archive:
# Check file content to ensure it is appropriate.
if check_contents and os.path.isfile(filename_in_archive):
content_alert_str += check_file_content_for_html_and_images(filename_in_archive)
hg_util.add_changeset(repo_path, filename_in_archive)
if filename_in_archive.endswith('tool_data_table_conf.xml.sample'):
# Handle the special case where a tool_data_table_conf.xml.sample file is being uploaded
# by parsing the file and adding new entries to the in-memory app.tool_data_tables
# dictionary.
stdtm = ShedToolDataTableManager(app)
error, message = stdtm.handle_sample_tool_data_table_conf_file(filename_in_archive, persist=False)
if error:
return False, message, files_to_remove, content_alert_str, undesirable_dirs_removed, undesirable_files_removed
hg_util.commit_changeset(repo_path,
full_path_to_changeset=full_path,
username=username,
message=commit_message)
admin_only = len(repository.downloadable_revisions) != 1
suc.handle_email_alerts(app,
host,
repository,
content_alert_str=content_alert_str,
new_repo_alert=new_repo_alert,
admin_only=admin_only)
return True, '', files_to_remove, content_alert_str, undesirable_dirs_removed, undesirable_files_removed
[docs]def handle_gzip(repository, uploaded_file_name):
fd, uncompressed = tempfile.mkstemp(prefix='repo_%d_upload_gunzip_' % repository.id,
dir=os.path.dirname(uploaded_file_name),
text=False)
gzipped_file = gzip.GzipFile(uploaded_file_name, 'rb')
while 1:
try:
chunk = gzipped_file.read(basic_util.CHUNK_SIZE)
except IOError:
os.close(fd)
os.remove(uncompressed)
log.exception('Problem uncompressing gz data "%s"', uploaded_file_name)
return
if not chunk:
break
os.write(fd, chunk)
os.close(fd)
gzipped_file.close()
shutil.move(uncompressed, uploaded_file_name)
[docs]def uncompress(repository, uploaded_file_name, uploaded_file_filename, isgzip=False, isbz2=False):
if isgzip:
handle_gzip(repository, uploaded_file_name)
return uploaded_file_filename.rstrip('.gz')
if isbz2:
handle_bz2(repository, uploaded_file_name)
return uploaded_file_filename.rstrip('.bz2')