import bz2
import gzip
import json
import logging
import os
import shutil
import tempfile
from collections import namedtuple
from typing import (
List,
Optional,
Tuple,
TYPE_CHECKING,
Union,
)
from sqlalchemy import select
from sqlalchemy.sql.expression import null
import tool_shed.repository_types.util as rt_util
from galaxy.util import checkers
from galaxy.util.path import safe_relpath
from tool_shed.tools.data_table_manager import ShedToolDataTableManager
from tool_shed.util import (
basic_util,
hg_util,
shed_util_common as suc,
)
from tool_shed.webapp.model import Repository
if TYPE_CHECKING:
from tool_shed.structured_app import ToolShedApp
log = logging.getLogger(__name__)
# Directory names that are never accepted into a repository: archive members with one of
# these as any path component are classified as undesirable, and matching directories are
# pruned (and counted) when walking an existing repository.
UNDESIRABLE_DIRS = [".hg", ".svn", ".git", ".cvs", ".idea"]
# File base names that are never accepted into a repository: archive members with one of
# these base names are classified as undesirable, and matching files are skipped (and
# counted) when walking an existing repository.
UNDESIRABLE_FILES = [".hg_archival.txt", "hgrc", ".DS_Store", "tool_test_output.html", "tool_test_output.json"]
def check_archive(repository: "Repository", archive):
    """
    Classify every member of an uploaded archive.

    Each member returned by ``archive.getmembers()`` lands in exactly one bucket:

    * ``invalid`` - the member is not a directory, regular file or hard link, its name is
      not a safe relative path, or it violates the single-file rule of a
      "repository suite definition" / "tool dependency definition" repository.  One
      message per rejected member is appended to ``errors``.
    * ``undesirable_files`` - the base name is listed in ``UNDESIRABLE_FILES``.
    * ``undesirable_dirs`` - some path component is listed in ``UNDESIRABLE_DIRS``.
    * ``valid`` - everything else.

    Returns an ``ArchiveCheckResults`` named tuple with the fields
    ``valid, invalid, undesirable_files, undesirable_dirs, errors``.
    """
    accepted = []
    rejected = []
    messages = []
    unwanted_files = []
    unwanted_dirs = []

    # Repository types that may hold exactly one, specifically named, file:
    # (repository type, required member name, message used when the name differs).
    single_file_rules = (
        (
            rt_util.REPOSITORY_SUITE_DEFINITION,
            rt_util.REPOSITORY_DEPENDENCY_DEFINITION_FILENAME,
            "Repositories of type <b>Repository suite definition</b> can contain only a single file named <b>repository_dependencies.xml</b>.",
        ),
        (
            rt_util.TOOL_DEPENDENCY_DEFINITION,
            rt_util.TOOL_DEPENDENCY_DEFINITION_FILENAME,
            "Repositories of type <b>Tool dependency definition</b> can contain only a single file named <b>tool_dependencies.xml</b>.",
        ),
    )

    for member in archive.getmembers():
        member_name = member.name

        # Directories, regular files and hard links are accepted; symbolic links,
        # devices and any other member type are rejected.
        acceptable_type = member.isdir() or member.isfile() or member.islnk()
        if not acceptable_type:
            messages.append(
                "Uploaded archives can only include regular directories and files (no symbolic links, devices, etc)."
            )
            rejected.append(member)
            continue

        if not safe_relpath(member_name):
            messages.append("Uploaded archives cannot contain files that would extract outside of the archive.")
            rejected.append(member)
            continue

        if os.path.basename(member_name) in UNDESIRABLE_FILES:
            unwanted_files.append(member)
            continue

        # Peel path components off the right-hand side, looking for an undesirable one.
        remainder = member_name
        inside_unwanted_dir = False
        while True:
            remainder, component = os.path.split(remainder)
            if not component:
                break
            if component in UNDESIRABLE_DIRS:
                inside_unwanted_dir = True
                break
        if inside_unwanted_dir:
            unwanted_dirs.append(member)
            continue

        # First single-file rule (if any) that this member breaks.
        violation = None
        for repository_type, required_name, message in single_file_rules:
            if repository.type == repository_type and member_name != required_name:
                violation = message
                break
        if violation is not None:
            messages.append(violation)
            rejected.append(member)
            continue

        accepted.append(member)

    ArchiveCheckResults = namedtuple(
        "ArchiveCheckResults", ["valid", "invalid", "undesirable_files", "undesirable_dirs", "errors"]
    )
    return ArchiveCheckResults(accepted, rejected, unwanted_files, unwanted_dirs, messages)
def check_file_contents_for_email_alerts(app: "ToolShedApp") -> bool:
    """
    See if any admin users have chosen to receive email alerts when a repository is updated.
    If so, the file contents of the update must be checked for inappropriate content.

    The ``admin_users`` configuration value is read as a comma-separated list of email
    addresses.  Whitespace around each address is ignored and empty entries are dropped,
    so ``"a@x.org, b@x.org"`` matches both users.

    Returns ``True`` as soon as one repository's ``email_alerts`` list contains an admin
    address, ``False`` otherwise (including when no admin users are configured).
    """
    configured_admins = app.config.get("admin_users", "") or ""
    admin_users = [email.strip() for email in configured_admins.split(",") if email.strip()]
    if not admin_users:
        # Nothing to match against, so no repository needs to be inspected.
        return False
    sa_session = app.model.session
    for repository in get_repositories_with_alerts(sa_session, app.model.Repository):
        # email_alerts is stored as a JSON-encoded collection of email addresses.
        email_alerts = json.loads(repository.email_alerts)
        if any(user_email in admin_users for user_email in email_alerts):
            return True
    return False
def check_file_content_for_html_and_images(file_path):
    """
    Return a one-line, newline-terminated warning when ``file_path`` is detected as HTML
    or, failing that, as an image; return an empty string when it is neither.
    """
    if checkers.check_html(file_path):
        return f'The file "{str(file_path)}" contains HTML content.\n'
    if checkers.check_image(file_path):
        return f'The file "{str(file_path)}" contains image content.\n'
    return ""
def get_change_lines_in_file_for_tag(tag, change_dict):
    """
    Collect the lines that mention ``tag`` from a jsonified file change.

    ``change_dict`` describes the changes to one file in a changeset pushed to the Tool
    Shed from the command line.  Each entry of its ``"data"`` list may carry a ``"block"``
    string whose lines are separated by the two-character sequence backslash + ``n``.
    Every line containing ``tag`` is returned with the text before the tag cut off, in
    the order encountered.
    """
    return [
        line[line.index(tag) :]
        for entry in change_dict.get("data", [])
        # Lines are delimited by an escaped newline (literal backslash followed by "n").
        for line in entry.get("block", "").split("\\n")
        if tag in line
    ]
def handle_bz2(repository: "Repository", uploaded_file_name):
    """
    Replace the bzip2-compressed upload at ``uploaded_file_name`` with its decompressed data.

    The stream is read in ``basic_util.CHUNK_SIZE`` pieces into a temporary file created
    in the same directory as the upload; once complete, that file is moved over
    ``uploaded_file_name``.

    If reading the compressed stream raises ``OSError`` the problem is logged, the upload
    is left untouched and ``None`` is returned.  Any other exception propagates to the
    caller.  Whenever the replacement does not happen, the temporary file is deleted, and
    only after it has been closed.
    """
    uncompressed = tempfile.NamedTemporaryFile(
        mode="wb",
        prefix=f"repo_{repository.id}_upload_bunzip2_",
        dir=os.path.dirname(uploaded_file_name),
        delete=False,
    )
    replaced = False
    try:
        with uncompressed, bz2.BZ2File(uploaded_file_name, "rb") as bzipped_file:
            while True:
                try:
                    chunk = bzipped_file.read(basic_util.CHUNK_SIZE)
                except OSError:
                    log.exception(f'Problem uncompressing bz2 data "{uploaded_file_name}"')
                    return
                if not chunk:
                    break
                uncompressed.write(chunk)
        shutil.move(uncompressed.name, uploaded_file_name)
        replaced = True
    finally:
        # The temp file was created with delete=False, so it must be removed explicitly
        # on every path that did not move it into place.  Both files are closed here.
        if not replaced and os.path.exists(uncompressed.name):
            os.remove(uncompressed.name)
# Result tuple of handle_directory_changes:
#   (status, message, files_to_remove, content_alert_str,
#    undesirable_dirs_removed, undesirable_files_removed)
# status is True on success, False when a tool_data_table_conf.xml.sample file could not
# be handled, and the string "error" when removing a file from the repository failed.
ChangeResponseT = Tuple[Union[bool, str], str, List[str], str, int, int]
def handle_directory_changes(
    app: "ToolShedApp",
    host: str,
    username: str,
    repository: "Repository",
    full_path: str,
    filenames_in_archive,
    remove_repo_files_not_in_tar,
    new_repo_alert,
    commit_message: str,
    undesirable_dirs_removed: int,
    undesirable_files_removed: int,
    repo_path: Optional[str] = None,
    dry_run: bool = False,
) -> ChangeResponseT:
    """
    Record the files of an extracted upload in the repository and commit them.

    :param app: application object; used for the repository path, configuration and
        tool data table handling.
    :param host: passed through to the email alert handler.
    :param username: user recorded on the commit.
    :param repository: repository being updated.
    :param full_path: directory the archive was extracted into; archive names are
        resolved relative to it and it is the path that gets committed.
    :param filenames_in_archive: names of the uploaded files, relative to ``full_path``.
    :param remove_repo_files_not_in_tar: when true and the repository is not new, files
        found under ``full_path`` that are not part of the upload are removed.
    :param new_repo_alert: passed through to the email alert handler.
    :param commit_message: message recorded on the commit.
    :param undesirable_dirs_removed: running count, incremented for every undesirable
        directory pruned while walking ``full_path``.
    :param undesirable_files_removed: running count, incremented for every undesirable
        file skipped while walking ``full_path``.
    :param repo_path: repository location; defaults to ``repository.repo_path(app)``.
    :param dry_run: when true the email alerts are not sent.  The changeset is still
        committed.
    :returns: ``(status, message, files_to_remove, content_alert_str,
        undesirable_dirs_removed, undesirable_files_removed)``.  ``status`` is ``True`` on
        success, ``False`` if a ``tool_data_table_conf.xml.sample`` file could not be
        handled, and ``"error"`` (with both counts reported as 0) if a file removal failed.
    """
    repo_path = repo_path or repository.repo_path(app)
    content_alert_str = ""
    files_to_remove = []
    filenames_in_archive = [os.path.normpath(os.path.join(full_path, name)) for name in filenames_in_archive]
    if remove_repo_files_not_in_tar and not repository.is_new():
        # We have a repository that is not new (it contains files), so discover those files that are in the
        # repository, but not in the uploaded archive.
        # A set keeps each membership test O(1); the list above is kept for ordered iteration below.
        archive_paths = set(filenames_in_archive)
        for root, dirs, files in os.walk(full_path):
            # Leave anything whose path mentions mercurial metadata alone.
            if ".hg" in root or "hgrc" in root:
                continue
            for undesirable_dir in UNDESIRABLE_DIRS:
                if undesirable_dir in dirs:
                    # Removing the entry in place also stops os.walk from descending into it.
                    dirs.remove(undesirable_dir)
                    undesirable_dirs_removed += 1
            for undesirable_file in UNDESIRABLE_FILES:
                if undesirable_file in files:
                    files.remove(undesirable_file)
                    undesirable_files_removed += 1
            for name in files:
                full_name = os.path.join(root, name)
                if full_name not in archive_paths:
                    files_to_remove.append(full_name)
        for repo_file in files_to_remove:
            # Remove files in the repository (relative to the upload point) that are not in
            # the uploaded archive.
            try:
                hg_util.remove_path(repo_path, repo_file)
            except Exception as e:
                error_message = f"Error removing file {repo_file} in mercurial repo:\n{e}"
                log.debug(error_message)
                return "error", error_message, files_to_remove, content_alert_str, 0, 0
    # See if any admin users have chosen to receive email alerts when a repository is updated.
    # If so, check every uploaded file to ensure content is appropriate.
    check_contents = check_file_contents_for_email_alerts(app)
    for filename_in_archive in filenames_in_archive:
        # Check file content to ensure it is appropriate.
        if check_contents and os.path.isfile(filename_in_archive):
            content_alert_str += check_file_content_for_html_and_images(filename_in_archive)
        hg_util.add_changeset(repo_path, filename_in_archive)
        if filename_in_archive.endswith("tool_data_table_conf.xml.sample"):
            # Handle the special case where a tool_data_table_conf.xml.sample file is being uploaded
            # by parsing the file and adding new entries to the in-memory app.tool_data_tables
            # dictionary.
            stdtm = ShedToolDataTableManager(app)
            error, message = stdtm.handle_sample_tool_data_table_conf_file(filename_in_archive, persist=False)
            if error:
                return (
                    False,
                    message,
                    files_to_remove,
                    content_alert_str,
                    undesirable_dirs_removed,
                    undesirable_files_removed,
                )
    hg_util.commit_changeset(repo_path, full_path_to_changeset=full_path, username=username, message=commit_message)
    admin_only = len(repository.downloadable_revisions) != 1
    if not dry_run:
        suc.handle_email_alerts(
            app,
            host,
            repository,
            content_alert_str=content_alert_str,
            new_repo_alert=new_repo_alert,
            admin_only=admin_only,
        )
    return True, "", files_to_remove, content_alert_str, undesirable_dirs_removed, undesirable_files_removed
def handle_gzip(repository: "Repository", uploaded_file_name):
    """
    Replace the gzip-compressed upload at ``uploaded_file_name`` with its decompressed data.

    The stream is read in ``basic_util.CHUNK_SIZE`` pieces into a temporary file created
    in the same directory as the upload; once complete, that file is moved over
    ``uploaded_file_name``.

    If reading the compressed stream raises ``OSError`` the problem is logged, the upload
    is left untouched and ``None`` is returned.  Any other exception propagates to the
    caller.  Whenever the replacement does not happen, the temporary file is deleted, and
    only after it has been closed.
    """
    uncompressed = tempfile.NamedTemporaryFile(
        mode="wb",
        prefix=f"repo_{repository.id}_upload_gunzip_",
        dir=os.path.dirname(uploaded_file_name),
        delete=False,
    )
    replaced = False
    try:
        with uncompressed, gzip.GzipFile(uploaded_file_name, "rb") as gzipped_file:
            while True:
                try:
                    chunk = gzipped_file.read(basic_util.CHUNK_SIZE)
                except OSError:
                    log.exception(f'Problem uncompressing gz data "{uploaded_file_name}"')
                    return
                if not chunk:
                    break
                uncompressed.write(chunk)
        shutil.move(uncompressed.name, uploaded_file_name)
        replaced = True
    finally:
        # The temp file was created with delete=False, so it must be removed explicitly
        # on every path that did not move it into place.  Both files are closed here.
        if not replaced and os.path.exists(uncompressed.name):
            os.remove(uncompressed.name)
def uncompress(repository, uploaded_file_name, uploaded_file_filename, isgzip=False, isbz2=False):
    """
    Decompress an uploaded file in place and return its display name without the
    compression extension.

    :param repository: repository the upload belongs to (forwarded to the handlers).
    :param uploaded_file_name: path of the uploaded file on disk; decompressed in place.
    :param uploaded_file_filename: original name of the upload as provided by the user.
    :param isgzip: the upload is gzip-compressed; takes precedence over ``isbz2``.
    :param isbz2: the upload is bzip2-compressed.
    :returns: ``uploaded_file_filename`` with one trailing ``.gz`` / ``.bz2`` removed when
        present (unchanged otherwise), or ``None`` when neither flag is set.
    """
    if isgzip:
        handle_gzip(repository, uploaded_file_name)
        extension = ".gz"
    elif isbz2:
        handle_bz2(repository, uploaded_file_name)
        extension = ".bz2"
    else:
        return None
    # Drop the extension as a suffix.  str.rstrip() would instead strip any trailing run
    # of the characters in the extension, mangling names such as "bugz.gz" into "bu".
    if uploaded_file_filename.endswith(extension):
        return uploaded_file_filename[: -len(extension)]
    return uploaded_file_filename
def get_repositories_with_alerts(session, repository_model):
    """
    Return the scalar results of selecting every ``repository_model`` row whose
    ``email_alerts`` column is not NULL, executed on ``session``.
    """
    has_email_alerts = repository_model.email_alerts != null()
    query = select(repository_model).where(has_email_alerts)
    return session.scalars(query)