import abc
import contextlib
import logging
import os
import shutil
import string
import tarfile
import tempfile
import time
from collections.abc import Iterator
from json import loads
from pathlib import Path
from typing import (
Any,
Optional,
Union,
)
from urllib.parse import (
quote_plus,
urlencode,
urlparse,
)
import pytest
import requests
from mercurial import (
commands,
hg,
ui,
)
from playwright.sync_api import Page
from sqlalchemy import (
false,
select,
)
import galaxy.model.tool_shed_install as galaxy_model
from galaxy.schema.schema import CheckForUpdatesResponse
from galaxy.security import idencoding
from galaxy.tool_shed.galaxy_install.install_manager import InstallRepositoryManager
from galaxy.tool_shed.galaxy_install.metadata.installed_repository_metadata_manager import (
InstalledRepositoryMetadataManager,
)
from galaxy.tool_shed.unittest_utils import (
StandaloneInstallationTarget,
ToolShedTarget,
)
from galaxy.tool_shed.util.dependency_display import build_manage_repository_dict
from galaxy.tool_shed.util.repository_util import check_for_updates
from galaxy.util import (
DEFAULT_SOCKET_TIMEOUT,
smart_str,
)
from galaxy.util.compression_utils import CompressedFile
from galaxy.util.resources import as_file
from galaxy_test.base.api_asserts import assert_status_code_is_ok
from galaxy_test.base.api_util import get_admin_api_key
from galaxy_test.base.populators import wait_on_assertion
from tool_shed.test.base.populators import TEST_DATA_REPO_FILES
from tool_shed.util import (
hg_util,
hgweb_config,
xml_util,
)
from tool_shed.webapp.model import Repository as DbRepository
from tool_shed_client.schema import (
Category,
Repository,
RepositoryMetadata,
)
from . import (
common,
test_db_util,
)
from .api import ShedApiTestCase
from .browser import ShedBrowser
from .playwrightbrowser import PlaywrightShedBrowser
from .twillbrowser import (
page_content,
visit_url,
)
# Timeout for repository installation: 600 seconds, i.e. 10 minutes.
repository_installation_timeout = 600
# Module-level logger, named after this module.
log = logging.getLogger(__name__)
[docs]
@pytest.mark.usefixtures("shed_browser")
class ShedTwillTestCase(ShedApiTestCase):
"""Class of FunctionalTestCase geared toward HTML interactions using the Twill library."""
requires_galaxy: bool = False
_installation_client: Optional[
Union[StandaloneToolShedInstallationClient, GalaxyInteractorToolShedInstallationClient]
] = None
__browser: Optional[ShedBrowser] = None
[docs]
def setUp(self):
    """Prepare per-test state from the environment.

    Sets up the id encoding helper, the hgweb config manager, the test
    directories and the installation client. When ``requires_galaxy`` is
    true and the class has no installation client yet, one is created,
    stored on the class and set up; the instance attribute then mirrors
    the class attribute.
    """
    super().setUp()
    environ = os.environ
    # Security helper
    self.security = idencoding.IdEncodingHelper(id_secret="changethisinproductiontoo")
    self.history_id = None
    self.hgweb_config_dir = environ.get("TEST_HG_WEB_CONFIG_DIR")
    self.hgweb_config_manager = hgweb_config.hgweb_config_manager
    self.hgweb_config_manager.hgweb_config_dir = self.hgweb_config_dir
    self.tool_shed_test_tmp_dir = environ.get("TOOL_SHED_TEST_TMP_DIR", None)
    self.file_dir = environ.get("TOOL_SHED_TEST_FILE_DIR", None)
    self.shed_tool_conf = environ.get("GALAXY_TEST_SHED_TOOL_CONF")
    self.test_db_util = test_db_util
    use_standalone = environ.get("TOOL_SHED_TEST_INSTALL_CLIENT") == "standalone"
    # TODO: once nose is out of the way - try to get away without
    # instantiating the unused Galaxy server here (standalone case).
    installation_client_class = (
        StandaloneToolShedInstallationClient if use_standalone else GalaxyInteractorToolShedInstallationClient
    )
    self.full_stack_galaxy = not use_standalone
    test_class = self.__class__
    if self.requires_galaxy and test_class._installation_client is None:
        test_class._installation_client = installation_client_class(self)
        test_class._installation_client.setup()
    self._installation_client = test_class._installation_client
[docs]
@pytest.fixture(autouse=True)
def inject_shed_browser(self, shed_browser: ShedBrowser):
    """Autouse fixture that stores the ``shed_browser`` fixture value on the test instance."""
    # The attribute is class-private (name-mangled); it is read back via ``_browser``.
    self.__browser = shed_browser
@property
def _browser(self) -> ShedBrowser:
assert self.__browser
return self.__browser
def _escape_page_content_if_needed(self, content: str) -> str:
# if twill browser is being used - replace spaces with " "
if self._browser.is_twill:
content = content.replace(" ", " ")
return content
[docs]
def check_for_strings(self, strings_displayed=None, strings_not_displayed=None):
    """Check the current page for expected and unexpected strings.

    Each entry of ``strings_displayed`` is passed to ``check_page_for_string``
    and then each entry of ``strings_not_displayed`` to
    ``check_string_not_in_page``. ``None`` or empty arguments check nothing.
    """
    for expected in strings_displayed or ():
        self.check_page_for_string(expected)
    for unexpected in strings_not_displayed or ():
        self.check_string_not_in_page(unexpected)
[docs]
def check_page_for_string(self, patt):
    """Delegate to the browser to look for ``patt`` in the current page."""
    browser = self._browser
    browser.check_page_for_string(patt)
[docs]
def check_string_not_in_page(self, patt):
    """Delegate to the browser to make sure ``patt`` is NOT in the current page."""
    browser = self._browser
    browser.check_string_not_in_page(patt)
# Functions associated with user accounts
def _submit_register_form(self, email: str, password: str, username: str, redirect: Optional[str] = None):
self._browser.fill_form_value("registration", "email", email)
if redirect is not None:
self._browser.fill_form_value("registration", "redirect", redirect)
self._browser.fill_form_value("registration", "password", password)
self._browser.fill_form_value("registration", "confirm", password)
self._browser.fill_form_value("registration", "username", username)
self._browser.submit_form_with_name("registration", "create_user_button")
@property
def invalid_tools_labels(self) -> str:
return "Invalid Tools" if self.is_v2 else "Invalid tools"
[docs]
def create(
    self,
    cntrller: str = "user",
    email: str = "test@bx.psu.edu",
    password: str = "testuser",
    username: str = "admin-user",
    redirect: Optional[str] = None,
) -> tuple[bool, bool, bool]:
    """Submit the user registration form and classify the outcome.

    Returns ``(previously_created, username_taken, invalid_username)``.
    When ``is_v2`` is true the resulting page is not inspected and all three
    flags are False. Otherwise the page is probed for known messages in a
    fixed order, and at most one flag is set.
    """

    def page_shows(message: str) -> bool:
        # check_page_for_string signals absence with an AssertionError.
        try:
            self.check_page_for_string(message)
        except AssertionError:
            return False
        return True

    # HACK: don't use panels because late_javascripts() messes up the twill browser and it
    # can't find form fields (and hence user can't be logged in).
    self.visit_url("/user/create", dict(cntrller=cntrller, use_panels=False))
    self._submit_register_form(email, password, username, redirect)
    previously_created = username_taken = invalid_username = False
    if not self.is_v2 and not page_shows("Created new user account"):
        if page_shows(f"User with email '{email}' already exists."):
            # May have created the account in a previous test run...
            previously_created = True
        elif page_shows("Public name is taken; please choose another"):
            username_taken = True
        elif page_shows("Public name must be at least 4 characters in length"):
            # Note that only the minimum-length message is checked here.
            invalid_username = True
    return previously_created, username_taken, invalid_username
[docs]
def last_page(self):
    """
    Return the content of the last visited page, as reported by the
    browser (usually HTML, but it can be binary data as well).
    """
    browser = self._browser
    return browser.page_content()
[docs]
def user_api_interactor(self, email="test@bx.psu.edu", password="testuser"):
    """Return the API interactor obtained for the given credentials."""
    interactor = self._api_interactor_by_credentials(email, password)
    return interactor
[docs]
def user_populator(self, email="test@bx.psu.edu", password="testuser"):
    """Return a populator built on the API interactor for the given credentials."""
    interactor = self.user_api_interactor(email=email, password=password)
    return self._get_populator(interactor)
[docs]
def login(
    self,
    email: str = "test@bx.psu.edu",
    password: str = "testuser",
    username: str = "admin-user",
    redirect: Optional[str] = None,
    logout_first: bool = True,
    explicit_logout: bool = False,
):
    """Make sure the given account exists and, where needed, log in with it.

    Optionally logs out first, then attempts to create the account via
    ``create``. The login form is only submitted when the account already
    existed or when ``is_v2`` is true.
    """
    if self.is_v2:
        # old version had a logout URL, this one needs to check
        # page if logged in
        self.visit_url("/")
    # Clear cookies.
    if logout_first:
        self.logout(explicit=explicit_logout)
    # test@bx.psu.edu is configured as an admin user
    previously_created, _, _ = self.create(email=email, password=password, username=username, redirect=redirect)
    # v2 doesn't log you in on account creation... so force a login here
    if not previously_created and not self.is_v2:
        return
    # The account has previously been created, so just login.
    # HACK: don't use panels because late_javascripts() messes up the twill browser and it
    # can't find form fields (and hence user can't be logged in).
    self.visit_url("/user/login", params={"use_panels": False})
    self.submit_form(button="login_button", login=email, redirect=redirect, password=password)
@property
def is_v2(self) -> bool:
return self.api_interactor.api_version == "v2"
@property
def _playwright_browser(self) -> PlaywrightShedBrowser:
    """Return the current browser, asserting that it is a ``PlaywrightShedBrowser``."""
    # make sure self.is_v2
    candidate = self._browser
    assert isinstance(candidate, PlaywrightShedBrowser)
    return candidate
@property
def _page(self) -> Page:
return self._playwright_browser._page
[docs]
def logout(self, explicit: bool = False):
    """Log out of the current tool shed session.

    When ``is_v2`` is true this goes through the Playwright browser: by
    default ``logout_if_logged_in`` is used, and ``explicit_logout`` is used
    when ``explicit`` is True. Otherwise the ``/user/logout`` URL is visited
    and the page is checked for the logged-out message.
    """
    if not self.is_v2:
        self.visit_url("/user/logout")
        self.check_page_for_string("You have been logged out")
        return
    playwright_browser = self._playwright_browser
    if explicit:
        playwright_browser.explicit_logout()
    else:
        playwright_browser.logout_if_logged_in()
[docs]
def join_url_and_params(self, url: str, params, query=None) -> str:
    """Return ``url`` with ``params`` and the given query string appended.

    :param url: target URL. When ``query`` is None, the query string is
        taken from this URL and removed from it before the merged
        parameters are appended, so it is not duplicated.
    :param params: mapping of query parameters, or None. It is never
        modified; query-string entries are merged into a copy.
    :param query: raw ``key=value&...`` query string whose entries override
        same-named ``params``. Values are split on the first ``=`` only, and
        a segment without ``=`` maps to an empty value.
    :returns: the URL with the url-encoded parameters appended after ``?``,
        or the URL unchanged when there are no parameters.
    """
    if params is None:
        params = {}
    if query is None:
        parsed = urlparse(url)
        query = parsed.query
        if query:
            # The query is re-appended below; strip it so it does not appear twice.
            url = parsed._replace(query="").geturl()
    if query:
        # Work on a copy so the caller's dict is left untouched.
        params = dict(params)
        for query_parameter in query.split("&"):
            if not query_parameter:
                continue
            key, _, value = query_parameter.partition("=")
            params[key] = value
    if params:
        url += f"?{urlencode(params)}"
    return url
[docs]
def visit_url(self, url: str, params=None, allowed_codes: Optional[list[int]] = None) -> str:
    """Visit ``url`` with the browser and return the browser's result.

    A URL without a network location is resolved against ``self.host`` and
    ``self.port`` over http. The URL's own query string is merged with
    ``params`` through ``join_url_and_params``. ``allowed_codes`` defaults
    to ``[200]``.
    """
    parts = urlparse(url)
    if not parts.netloc:
        origin = f"http://{self.host}:{self.port}"
    else:
        origin = f"{parts.scheme}://{parts.netloc}"
    full_url = self.join_url_and_params(f"{origin}{parts.path}", params, query=parts.query)
    codes = [200] if allowed_codes is None else allowed_codes
    return self._browser.visit_url(full_url, allowed_codes=codes)
[docs]
def write_temp_file(self, content, suffix=".html"):
    """Write ``content`` (passed through ``smart_str``) to a new temporary file and return its name.

    The file is created with the prefix ``twilltestcase-`` and is not
    deleted when it is closed.
    """
    handle = tempfile.NamedTemporaryFile(suffix=suffix, prefix="twilltestcase-", delete=False)
    with handle:
        handle.write(smart_str(content))
        temp_name = handle.name
    return temp_name
[docs]
def assign_admin_role(self, repository: Repository, user):
    """Associate ``user`` with the repository's admin role by visiting the action URL directly."""
    # As elsewhere, twill limits the possibility of submitting the form, this time due to not executing the javascript
    # attached to the role selection form. Visit the action url directly with the necessary parameters.
    request_params = dict(id=repository.id, in_users=user.id, manage_role_associations_button="Save")
    self.visit_url("/repository/manage_repository_admins", params=request_params)
    expected = ["Role", "has been associated"]
    self.check_for_strings(strings_displayed=expected)
[docs]
def browse_category(self, category: Category, strings_displayed=None, strings_not_displayed=None):
    """Open the repository listing for ``category`` and check the page strings.

    The URL depends on ``is_v2``.
    """
    if not self.is_v2:
        legacy_params = dict(sort="name", operation="valid_repositories_by_category", id=category.id)
        self.visit_url("/repository/browse_valid_categories", params=legacy_params)
    else:
        self.visit_url(f"/repositories_by_category/{category.id}")
    self.check_for_strings(strings_displayed, strings_not_displayed)
[docs]
def browse_repository(self, repository: Repository, strings_displayed=None, strings_not_displayed=None):
    """Visit the browse page of ``repository`` and check the page strings."""
    self.visit_url("/repository/browse_repository", params=dict(id=repository.id))
    self.check_for_strings(strings_displayed, strings_not_displayed)
[docs]
def browse_repository_dependencies(self, strings_displayed=None, strings_not_displayed=None):
    """Visit the repository dependencies listing and check the page strings."""
    self.visit_url("/repository/browse_repository_dependencies")
    self.check_for_strings(strings_displayed, strings_not_displayed)
[docs]
def check_galaxy_repository_db_status(self, repository_name, owner, expected_status):
    """Assert that the installed repository's status equals ``expected_status``.

    The repository is looked up by name and owner and refreshed before its
    status is compared.
    """
    installed = self._get_installed_repository_by_name_owner(repository_name, owner)
    self._refresh_tool_shed_repository(installed)
    current_status = installed.status
    assert current_status == expected_status, f"Status in database is {current_status}, expected {expected_status}"
[docs]
def check_repository_changelog(self, repository: Repository, strings_displayed=None, strings_not_displayed=None):
    """Visit the changelog page of ``repository`` and check the page strings."""
    self.visit_url("/repository/view_changelog", params=dict(id=repository.id))
    self.check_for_strings(strings_displayed, strings_not_displayed)
[docs]
def check_repository_dependency(
    self,
    repository: Repository,
    depends_on_repository: Repository,
    depends_on_changeset_revision=None,
    changeset_revision=None,
):
    """Check that ``repository``'s manage page shows ``depends_on_repository``.

    Expects the dependency's name and owner, plus its changeset revision
    when one is given. Nothing is checked when ``is_v2`` is true.
    """
    if self.is_v2:
        # v2 doesn't display repository repository dependencies, they are deprecated
        return
    expected = [depends_on_repository.name, depends_on_repository.owner]
    if depends_on_changeset_revision:
        expected.append(depends_on_changeset_revision)
    self.display_manage_repository_page(repository, changeset_revision=changeset_revision, strings_displayed=expected)
[docs]
def check_string_count_in_page(self, pattern, min_count: int, max_count: Optional[int] = None):
    """Check how often ``pattern`` occurs in the current browser page.

    The count must lie between ``min_count`` and ``max_count`` inclusive;
    ``max_count`` defaults to ``min_count``. On failure the page content is
    written to a temporary file and an AssertionError naming it is raised.
    """
    page = self.last_page()
    pattern_count = page.count(pattern)
    upper_bound = min_count if max_count is None else max_count
    if min_count <= pattern_count <= upper_bound:
        return
    fname = self.write_temp_file(page)
    raise AssertionError(
        f"{pattern_count} occurrences of '{pattern}' found (min. {min_count}, max. {upper_bound}).\npage content written to '{fname}' "
    )
[docs]
def clone_repository(self, repository: Repository, destination_path: str) -> None:
    """Clone ``repository`` at its tip revision into ``destination_path``; assert that the clone succeeded."""
    clone_url = f"{self.url}/repos/{repository.owner}/{repository.name}"
    tip_revision = self.get_repository_tip(repository)
    success, message = hg_util.clone_repository(clone_url, destination_path, tip_revision)
    assert success is True, message
[docs]
def commit_and_push(self, repository, hgrepo, options, username, password):
    """Commit to ``hgrepo`` and push the result to the tool shed over http.

    The push is attempted up to five times.

    :returns: True once a push succeeds, or False as soon as a push fails
        with an error containing "Pushing to Tool Shed is disabled".
    :raises RuntimeError: if all five attempts fail for another reason; the
        last push error is chained as the cause.
    """
    url = f"http://{username}:{password}@{self.host}:{self.port}/repos/{repository.user.username}/{repository.name}"
    commands.commit(ui.ui(), hgrepo, **options)
    attempts = 5
    last_error = None
    # Try pushing multiple times as it transiently fails on Jenkins.
    # TODO: Figure out why that happens
    for _ in range(attempts):
        try:
            commands.push(ui.ui(), hgrepo, dest=url)
        except Exception as e:
            if "Pushing to Tool Shed is disabled" in str(e):
                return False
            last_error = e
        else:
            return True
    # A bare ``raise`` here has no active exception, so it would only produce a
    # generic RuntimeError and hide the real failure. Raise the same exception
    # type explicitly and chain the last push error. The URL is left out of the
    # message because it embeds the password.
    raise RuntimeError(
        f"Pushing to repository {repository.name} failed after {attempts} attempts"
    ) from last_error
[docs]
def create_category(self, **kwd) -> Category:
    """Return the category named ``kwd["name"]``, creating it when the lookup raises ValueError."""
    wanted_name = kwd["name"]
    populator = self.populator
    try:
        return populator.get_category_with_name(wanted_name)
    except ValueError:
        # not recreating this functionality in the UI I don't think?
        return populator.new_category(wanted_name)
[docs]
def create_repository_dependency(
    self,
    repository: Repository,
    repository_tuples=None,
    filepath=None,
    prior_installation_required=False,
    complex=False,
    package=None,
    version=None,
    strings_displayed=None,
    strings_not_displayed=None,
):
    """Generate a dependency definition file and add it to ``repository``.

    With ``complex`` true a ``tool_dependencies.xml`` is generated via
    ``generate_complex_dependency_xml``; otherwise a
    ``repository_dependencies.xml`` is generated via
    ``generate_simple_dependency_xml``. ``strings_not_displayed`` is
    accepted but not used.
    """
    repository_tuples = repository_tuples or []
    if complex:
        filename = "tool_dependencies.xml"
        target = self.generate_complex_dependency_xml(
            filename=filename,
            filepath=filepath,
            repository_tuples=repository_tuples,
            package=package,
            version=version,
        )
    else:
        # Each tuple is (toolshed_url, name, owner, changeset_revision); only the name is listed.
        dependency_names = [name for _, name, _, _ in repository_tuples]
        description = f"{repository.name} depends on {', '.join(dependency_names)}."
        filename = "repository_dependencies.xml"
        target = self.generate_simple_dependency_xml(
            repository_tuples=repository_tuples,
            filename=filename,
            filepath=filepath,
            dependency_description=description,
            prior_installation_required=prior_installation_required,
        )
    self.add_file_to_repository(repository, target, filename, strings_displayed=strings_displayed)
[docs]
def deactivate_repository(self, installed_repository: galaxy_model.ToolShedRepository) -> None:
    """Deactivate ``installed_repository`` through the installation client, which must be set."""
    client = self._installation_client
    assert client
    client.deactivate_repository(installed_repository)
[docs]
@contextlib.contextmanager
def cloned_repo(self, repository: Repository) -> Iterator[str]:
    """Context manager yielding a temporary working copy of ``repository`` without its ``.hg`` directory.

    If the clone contains only a single entry named ``repo``, that entry's
    path is yielded instead of the clone root. The temporary directory is
    removed when the context exits.
    """
    clone_root = tempfile.mkdtemp(prefix="toolshedrepowithoutfiles")
    try:
        self.clone_repository(repository, clone_root)
        shutil.rmtree(os.path.join(clone_root, ".hg"))
        if os.listdir(clone_root) == ["repo"]:
            working_copy = os.path.join(clone_root, "repo")
        else:
            working_copy = clone_root
        yield working_copy
    finally:
        shutil.rmtree(clone_root)
[docs]
def setup_freebayes_0010_repo(self, repository: Repository):
    """Add the freebayes test files to ``repository`` one upload at a time.

    The first, second and fourth uploads also check for expected strings;
    the others only upload.
    """
    tool_messages = [
        "Metadata may have been defined",
        "This file requires an entry",
        "tool_data_table_conf",
    ]
    self.add_file_to_repository(repository, "freebayes/freebayes.xml", strings_displayed=tool_messages)
    self.add_file_to_repository(
        repository,
        "freebayes/tool_data_table_conf.xml.sample",
        strings_displayed=["Upload a file named <b>sam_fa_indices.loc.sample"],
    )
    self.add_file_to_repository(repository, "freebayes/sam_fa_indices.loc.sample")
    malformed = os.path.join("freebayes", "malformed_tool_dependencies", "tool_dependencies.xml")
    self.add_file_to_repository(
        repository, malformed, strings_displayed=["Exception attempting to parse", "invalid element name"]
    )
    # The check for "The settings for <b>name</b>, <b>version</b> and <b>type</b> from a
    # contained tool configuration" is disabled for this upload, so no strings are passed.
    invalid = os.path.join("freebayes", "invalid_tool_dependencies", "tool_dependencies.xml")
    self.add_file_to_repository(repository, invalid)
    valid = os.path.join("freebayes", "tool_dependencies.xml")
    self.add_file_to_repository(repository, valid)
[docs]
def add_file_to_repository(
    self,
    repository: Repository,
    source: str,
    target: Optional[str] = None,
    strings_displayed=None,
    commit_message: Optional[str] = None,
):
    """Copy a test data file into a clone of ``repository`` and upload the result as a new revision.

    ``source`` is resolved against ``TEST_DATA_REPO_FILES``. ``target`` is
    the destination path inside the clone and defaults to the base name of
    ``source``. ``commit_message`` defaults to
    "Uploaded revision with added file.".
    """
    destination_name = os.path.basename(source) if target is None else target
    message = commit_message or "Uploaded revision with added file."
    with self.cloned_repo(repository) as working_copy:
        destination = os.path.join(working_copy, destination_name)
        with as_file(TEST_DATA_REPO_FILES.joinpath(source)) as source_path:
            shutil.copyfile(source_path, destination)
        # Upload while the clone still exists; it is removed when the context exits.
        self._upload_dir_to_repository(
            repository, working_copy, commit_message=message, strings_displayed=strings_displayed
        )
[docs]
def add_tar_to_repository(self, repository: Repository, source: str, strings_displayed=None):
    """Extract a test data tar archive into a clone of ``repository`` and upload the result as a new revision.

    ``source`` is resolved against ``TEST_DATA_REPO_FILES``.
    """
    archive = TEST_DATA_REPO_FILES.joinpath(source)
    with self.cloned_repo(repository) as working_copy:
        with archive.open("rb") as archive_fileobj, CompressedFile.open_tar(archive_fileobj) as tar:
            tar.extractall(path=working_copy)
        self._upload_dir_to_repository(
            repository,
            working_copy,
            commit_message="Uploaded revision with added files from tar.",
            strings_displayed=strings_displayed,
        )
[docs]
def commit_tar_to_repository(
    self, repository: Repository, source: str, commit_message=None, strings_displayed=None
):
    """Upload the tar file ``source`` to ``repository`` as a new revision.

    ``source`` is resolved against ``TEST_DATA_REPO_FILES`` and must be a
    file. When ``strings_displayed`` is None the upload is assumed to be
    valid. Otherwise the raw response is used: its JSON ``message`` (or the
    raw text if that cannot be read) must contain every expected string, or
    an AssertionError is raised.
    """
    full_source = TEST_DATA_REPO_FILES.joinpath(source)
    assert full_source.is_file(), f"Attempting to upload {full_source} as a tar which is not a file"
    populator = self.user_populator()
    if strings_displayed is None:
        # Just assume this is a valid upload...
        populator.upload_revision(repository, full_source, commit_message=commit_message)
        return
    response = populator.upload_revision_raw(repository, full_source, commit_message=commit_message)
    try:
        text = response.json()["message"]
    except Exception:
        text = response.text
    for expected in strings_displayed:
        if expected in text:
            continue
        raise AssertionError(f"Failed to find {expected} in JSON response {text}")
[docs]
def delete_files_from_repository(self, repository: Repository, filenames: list[str]):
    """Remove ``filenames`` from a clone of ``repository`` and upload the result as a new revision."""
    with self.cloned_repo(repository) as working_copy:
        for relative_name in filenames:
            os.remove(os.path.join(working_copy, relative_name))
        self._upload_dir_to_repository(
            repository, working_copy, commit_message="Uploaded revision with deleted files."
        )
def _upload_dir_to_repository(self, repository: Repository, target, commit_message, strings_displayed=None):
tf = tempfile.NamedTemporaryFile()
with tarfile.open(tf.name, "w:gz") as tar:
tar.add(target, arcname=".")
target = os.path.abspath(tf.name)
self.commit_tar_to_repository(
repository, target, commit_message=commit_message, strings_displayed=strings_displayed
)
[docs]
def delete_repository(self, repository: Repository) -> None:
    """Delete ``repository`` through the admin repository grid and check the confirmation strings."""
    grid_url = "/admin/browse_repositories"
    self.visit_url(grid_url)
    self.visit_url(grid_url, params=dict(operation="Delete", id=repository.id))
    expected = ["Deleted 1 repository", repository.name]
    unexpected: list[str] = []
    self.check_for_strings(expected, unexpected)
[docs]
def display_installed_jobs_list_page(self, installed_repository, data_manager_names=None, strings_displayed=None):
    """Delegate to the installation client, which must be set."""
    client = self._installation_client
    assert client
    client.display_installed_jobs_list_page(installed_repository, data_manager_names, strings_displayed)
[docs]
def display_installed_repository_manage_json(self, installed_repository):
    """Return the installation client's extended info for ``installed_repository``; the client must be set."""
    client = self._installation_client
    assert client
    return client.installed_repository_extended_info(installed_repository)
[docs]
def display_manage_repository_page(
    self, repository: Repository, changeset_revision=None, strings_displayed=None, strings_not_displayed=None
):
    """Visit the manage page of ``repository`` and check the page strings.

    ``changeset_revision`` is added to the request parameters when given.
    The URL depends on ``is_v2``.
    """
    request_params = {"id": repository.id}
    if changeset_revision:
        request_params["changeset_revision"] = changeset_revision
    page_url = f"/repositories/{repository.id}" if self.is_v2 else "/repository/manage_repository"
    self.visit_url(page_url, params=request_params)
    self.check_for_strings(strings_displayed, strings_not_displayed)
[docs]
def display_repository_clone_page(
    self, owner_name, repository_name, strings_displayed=None, strings_not_displayed=None
):
    """Visit ``/repos/<owner>/<repository>`` and check the page strings."""
    self.visit_url(f"/repos/{owner_name}/{repository_name}")
    self.check_for_strings(strings_displayed, strings_not_displayed)
[docs]
def display_repository_file_contents(
    self, repository: Repository, filename, filepath=None, strings_displayed=None, strings_not_displayed=None
):
    """Find a file in the repository and display its contents.

    ``filename`` is looked up in the file list gathered under the
    repository path (joined with ``filepath`` when given); an assertion
    fails if it is not there. The file contents page is then visited and
    the page strings are checked.
    """
    search_root = self.get_repo_path(repository)
    if filepath:
        search_root = os.path.join(search_root, filepath)
    known_files = self.get_repository_file_list(repository=repository, base_path=search_root, current_path=None)
    assert filename in known_files, f"File {filename} not found in the repository under {search_root}."
    request_params = dict(file_path=os.path.join(search_root, filename), repository_id=repository.id)
    self.visit_url("/repository/get_file_contents", params=request_params)
    self.check_for_strings(strings_displayed, strings_not_displayed)
[docs]
def edit_repository_categories(
    self,
    repository: Repository,
    categories_to_add: list[str],
    categories_to_remove: list[str],
    restore_original=True,
) -> None:
    """Open the manage page of ``repository`` and let the browser edit its categories.

    ``restore_original`` is accepted but not used here.
    """
    self.visit_url("/repository/manage_repository", params=dict(id=repository.id))
    browser = self._browser
    browser.edit_repository_categories(categories_to_add, categories_to_remove)
[docs]
def enable_email_alerts(self, repository: Repository, strings_displayed=None, strings_not_displayed=None) -> None:
    """Request email alerts for ``repository`` and check the page strings.

    Both ``strings_displayed`` and ``strings_not_displayed`` are checked, in
    line with the other page helpers; ``strings_not_displayed`` is no longer
    dropped silently.
    """
    params = dict(operation="Receive email alerts", id=repository.id)
    self.visit_url("/repository/browse_repositories", params)
    self.check_for_strings(strings_displayed, strings_not_displayed)
[docs]
def escape_html(self, string, unescape=False):
    """Apply, or with ``unescape`` reverse, a fixed set of character replacements.

    Single and double quotes map to their numeric HTML entities. The
    ampersand maps to the placeholder ``X``, not to an entity.

    NOTE(review): because ``&`` maps to ``X``, unescaping turns every literal
    ``X`` into ``&``. This looks deliberate for the pages being matched;
    confirm before changing it.
    """
    # The entities must be spelled out as literal text. A decoded quote
    # character in their place breaks the string literal.
    html_entities = [("&", "X"), ("'", "&#39;"), ('"', "&#34;")]
    for character, replacement in html_entities:
        if unescape:
            string = string.replace(replacement, character)
        else:
            string = string.replace(character, replacement)
    return string
[docs]
def expect_repo_created_strings(self, name):
    """Return the two strings expected on the page after repository ``name`` has been created."""
    prefix = f"Repository <b>{name}</b>"
    return [prefix, f"Repository <b>{name}</b> has been created"]
[docs]
def generate_complex_dependency_xml(self, filename, filepath, repository_tuples, package, version):
    """Write a complex repository dependency XML file and return its path.

    :param filename: name of the file to create inside ``filepath``.
    :param filepath: directory to write into; created if it does not exist.
    :param repository_tuples: iterable of
        ``(toolshed_url, name, owner, changeset_revision)`` tuples, one per
        dependency line.
    :param package: value substituted for ``package`` in the template.
    :param version: value substituted for ``version`` in the template.
    :returns: the path of the written file.
    """
    file_path = os.path.join(filepath, filename)
    line_template = string.Template(common.new_repository_dependencies_line)
    dependency_entries = [
        line_template.safe_substitute(
            toolshed_url=toolshed_url,
            owner=owner,
            repository_name=name,
            changeset_revision=changeset_revision,
            prior_installation_required="",
        )
        for toolshed_url, name, owner, changeset_revision in repository_tuples
    ]
    os.makedirs(filepath, exist_ok=True)
    dependency_template = string.Template(common.complex_repository_dependency_template)
    repository_dependency_xml = dependency_template.safe_substitute(
        package=package, version=version, dependency_lines="\n".join(dependency_entries)
    )
    # Save the generated xml to the specified location. The context manager
    # closes the handle, so the content is flushed before the path is returned.
    with open(file_path, "w") as xml_file:
        xml_file.write(repository_dependency_xml)
    return file_path
[docs]
def generate_simple_dependency_xml(
    self,
    repository_tuples,
    filename,
    filepath,
    dependency_description="",
    complex=False,
    package=None,
    version=None,
    prior_installation_required=False,
):
    """Write a simple repository dependency XML file and return its path.

    :param repository_tuples: iterable of
        ``(toolshed_url, name, owner, changeset_revision)`` tuples, one per
        dependency line.
    :param filename: name of the file to create inside ``filepath``.
    :param filepath: directory to write into; created if it does not exist.
    :param dependency_description: when non-empty, emitted as a
        ``description`` attribute.
    :param prior_installation_required: when true, every dependency line
        carries ``prior_installation_required="True"``.
    :returns: the path of the written file.

    ``complex``, ``package`` and ``version`` are accepted but not used.
    """
    os.makedirs(filepath, exist_ok=True)
    if prior_installation_required:
        prior_installation_value = ' prior_installation_required="True"'
    else:
        prior_installation_value = ""
    # The line template is the same for every dependency, so build it once.
    line_template = string.Template(common.new_repository_dependencies_line)
    dependency_entries = [
        line_template.safe_substitute(
            toolshed_url=toolshed_url,
            owner=owner,
            repository_name=name,
            changeset_revision=changeset_revision,
            prior_installation_required=prior_installation_value,
        )
        for toolshed_url, name, owner, changeset_revision in repository_tuples
    ]
    if dependency_description:
        description = f' description="{dependency_description}"'
    else:
        description = dependency_description
    template_parser = string.Template(common.new_repository_dependencies_xml)
    repository_dependency_xml = template_parser.safe_substitute(
        description=description, dependency_lines="\n".join(dependency_entries)
    )
    # Save the generated xml to the specified location. The context manager
    # closes the handle, so the content is flushed before the path is returned.
    full_path = os.path.join(filepath, filename)
    with open(full_path, "w") as xml_file:
        xml_file.write(repository_dependency_xml)
    return full_path
[docs]
def generate_temp_path(self, test_script_path, additional_paths=None):
    """Return a directory under ``self.tool_shed_test_tmp_dir``, creating it if it does not exist.

    The path is ``<tmp dir>/<test_script_path>/<additional_paths joined by os.sep>``.
    """
    trailing = os.sep.join(additional_paths or [])
    temp_path = os.path.join(self.tool_shed_test_tmp_dir, test_script_path, trailing)
    already_there = os.path.exists(temp_path)
    if not already_there:
        os.makedirs(temp_path)
    return temp_path
[docs]
def get_all_installed_repositories(self) -> list[galaxy_model.ToolShedRepository]:
    """Return all installed repositories reported by the installation client, which must be set."""
    client = self._installation_client
    assert client
    return client.get_all_installed_repositories()
[docs]
def get_filename(self, filename, filepath=None):
    """Return the absolute path of ``filename`` inside ``filepath``, or inside ``self.file_dir`` when ``filepath`` is None."""
    directory = self.file_dir if filepath is None else filepath
    return os.path.abspath(os.path.join(directory, filename))
[docs]
def get_hg_repo(self, path):
    """Open the Mercurial repository at ``path``; the path is passed to Mercurial as UTF-8 bytes."""
    encoded_path = path.encode("utf-8")
    return hg.repository(ui.ui(), encoded_path)
[docs]
def get_repositories_category_api(
    self, categories: list[Category], strings_displayed=None, strings_not_displayed=None
):
    """Visit the repositories API URL of each category and check the page strings after each visit."""
    for current in categories:
        self.visit_url(f"/api/categories/{current.id}/repositories")
        self.check_for_strings(strings_displayed, strings_not_displayed)
[docs]
def get_or_create_repository(
    self, category: Category, owner: str, name: str, strings_displayed=None, strings_not_displayed=None, **kwd
) -> Repository:
    """Return the repository ``owner``/``name``, creating it through the web form when it does not exist.

    Extra keyword arguments are passed on as form values. After creation
    the page strings are checked and the repository is looked up again.
    """
    # If not checking for a specific string, it should be safe to assume that
    # we expect repository creation to be successful.
    expected = ["Repository", name, "has been created"] if strings_displayed is None else strings_displayed
    unexpected = [] if strings_not_displayed is None else strings_not_displayed
    populator = self.populator
    found = populator.get_repository_for(owner, name)
    if found is None:
        category_id = category.id
        assert category_id
        self.visit_url("/repository/create_repository")
        self.submit_form(button="create_repository_button", name=name, category_id=category_id, **kwd)
        self.check_for_strings(expected, unexpected)
        found = populator.get_repository_for(owner, name)
    assert found
    return found
[docs]
def get_repo_path(self, repository: Repository) -> str:
    """Return the hgweb config entry for ``repository``.

    Raises an Exception naming the config file when the lookup fails.
    """
    # An entry in the hgweb.config file looks something like: repos/test/mira_assembler = database/community_files/000/repo_123
    entry_key = f"repos/{repository.owner}/{repository.name}"
    manager = self.hgweb_config_manager
    try:
        entry = manager.get_entry(entry_key)
    except Exception:
        raise Exception(f"Entry for repository {entry_key} missing in hgweb config file {manager.hgweb_config}.")
    return entry
[docs]
def get_repository_changelog_tuples(self, repository):
    """Return a list of ``(ctx.rev(), ctx)`` tuples, one per changeset in the repository's changelog."""
    repo = self.get_hg_repo(self.get_repo_path(repository))
    contexts = (repo[changeset] for changeset in repo.changelog)
    return [(ctx.rev(), ctx) for ctx in contexts]
[docs]
def get_repository_file_list(self, repository: Repository, base_path: str, current_path=None) -> list[str]:
    """Recursively collect the repository's folder contents. Similar to os.walk but via /repository/open_folder.

    Returned paths are relative to ``base_path``. For every folder below the
    root, the folder itself is listed before its contents.
    """
    at_root = current_path is None
    folder_path = base_path if at_root else os.path.join(base_path, current_path)
    # Get the current folder's contents.
    self.visit_url("/repository/open_folder", params=dict(folder_path=folder_path, repository_id=repository.id))
    entries = loads(self.last_page())
    collected = [] if at_root else [current_path]
    # Loop through the json dict returned by /repository/open_folder.
    for entry in entries:
        is_folder = entry["isFolder"]
        # Prefix the entry with the path relative to the repository root, if any.
        title = entry["title"]
        relative = title if at_root else os.path.join(current_path, title)
        if is_folder:
            # A folder: recurse and append everything found below it.
            collected.extend(
                self.get_repository_file_list(repository=repository, base_path=base_path, current_path=relative)
            )
        else:
            # A regular file.
            collected.append(relative)
    return collected
def _db_repository(self, repository: Repository) -> DbRepository:
return self.test_db_util.get_repository_by_name_and_owner(repository.name, repository.owner)
def _get_repository_by_name_and_owner(self, name: str, owner: str) -> Repository:
repo = self.populator.get_repository_for(owner, name)
if repo is None:
repo = self.populator.get_repository_for(owner, name, deleted="true")
assert repo
return repo
[docs]
def get_repository_tip(self, repository: Repository) -> str:
    """Return the string form of the changeset at the tip of the repository's changelog."""
    repo_path = self.get_repo_path(repository)
    repo = self.get_hg_repo(repo_path)
    tip = repo.changelog.tip()
    return str(repo[tip])
[docs]
def get_repository_first_revision(self, repository: Repository) -> str:
    """Return the string form of the repository's changeset at index 0."""
    repo_path = self.get_repo_path(repository)
    repo = self.get_hg_repo(repo_path)
    return str(repo[0])
def _get_metadata_revision_count(self, repository: Repository) -> int:
repository_metadata: RepositoryMetadata = self.populator.get_metadata(repository, downloadable_only=False)
return len(repository_metadata.root)
[docs]
def grant_role_to_user(self, user, role):
    """Grant ``role`` to ``user`` through the admin role pages.

    Checks that the role is listed, opens its users-and-groups page, then
    visits the save URL directly and checks for the update message.
    """
    encoded_role_id = self.security.encode_id(role.id)
    unexpected = []
    self.visit_url("/admin/roles")
    self.check_for_strings([encoded_role_id, role.name], unexpected)
    self.visit_url("/admin/roles", dict(operation="manage users and groups", id=encoded_role_id))
    self.check_for_strings([common.test_user_1_email, common.test_user_2_email], unexpected)
    # As elsewhere, twill limits the possibility of submitting the form, this time due to not executing the javascript
    # attached to the role selection form. Visit the action url directly with the necessary parameters.
    save_params = dict(
        id=encoded_role_id,
        in_users=user.id,
        operation="manage users and groups",
        role_members_edit_button="Save",
    )
    self.visit_url("/admin/manage_users_and_groups_for_role", save_params)
    self.check_for_strings([f"Role '{role.name}' has been updated"], unexpected)
[docs]
def grant_write_access(
    self,
    repository: Repository,
    usernames=None,
    strings_displayed=None,
    strings_not_displayed=None,
    post_submit_strings_displayed=None,
    post_submit_strings_not_displayed=None,
):
    """Grant access on ``repository`` to ``usernames`` through the browser.

    The manage-repository page is displayed and checked against
    ``strings_displayed`` / ``strings_not_displayed``; after the browser
    grants access, the page is checked again against the ``post_submit_*``
    string lists.  A falsy ``usernames`` is treated as an empty list.
    """
    users_to_grant = usernames if usernames else []
    # State of the page before submitting.
    self.display_manage_repository_page(repository)
    self.check_for_strings(strings_displayed, strings_not_displayed)
    # State of the page after submitting.
    self._browser.grant_users_access(users_to_grant)
    self.check_for_strings(post_submit_strings_displayed, post_submit_strings_not_displayed)
def _install_repository(
    self,
    name: str,
    owner: str,
    category_name: str,
    install_tool_dependencies: bool = False,
    install_repository_dependencies: bool = True,
    changeset_revision: Optional[str] = None,
    preview_strings_displayed: Optional[list[str]] = None,
    new_tool_panel_section_label: Optional[str] = None,
) -> None:
    """Browse to a repository in the tool shed and install it via the installation client.

    The tool shed, the named category and the repository preview are visited
    first.  When ``changeset_revision`` is ``None`` the repository tip is
    installed instead.
    """
    # Walk through the tool shed pages leading to the repository.
    self.browse_tool_shed(url=self.url)
    category = self.populator.get_category_with_name(category_name)
    self.browse_category(category)
    self.preview_repository_in_tool_shed(name, owner, strings_displayed=preview_strings_displayed)

    repository = self._get_repository_by_name_and_owner(name, owner)
    assert repository
    revision = changeset_revision
    if revision is None:
        # No explicit revision requested: fall back to the tip.
        revision = self.get_repository_tip(repository)

    client = self._installation_client
    assert client
    install_args = (
        name,
        owner,
        revision,
        install_tool_dependencies,
        install_repository_dependencies,
        new_tool_panel_section_label,
    )
    client.install_repository(*install_args)
[docs]
def load_citable_url(
    self,
    username,
    repository_name,
    changeset_revision,
    encoded_user_id,
    encoded_repository_id,
    strings_displayed=None,
    strings_not_displayed=None,
    strings_displayed_in_iframe=None,
    strings_not_displayed_in_iframe=None,
):
    """Visit a citable ``/view/<user>[/<repo>[/<revision>]]`` URL and check its content.

    The URL is built from ``self.url`` plus whichever of ``username``,
    ``repository_name`` and ``changeset_revision`` are truthy.  When
    ``self.is_v2`` is true, ``?rows_per_page=250`` is appended and the
    "iframe" strings are checked on the same page.  Otherwise the page that
    would be shown inside the iframe is visited separately (the repository
    view if ``encoded_repository_id`` is truthy, else the user's repository
    listing if ``encoded_user_id`` is truthy) and checked there.
    """
    iframe_shown = strings_displayed_in_iframe or []
    iframe_hidden = strings_not_displayed_in_iframe or []

    segments = [f"{self.url}/view/{username}"]
    # If repository name is passed in, append that to the url.
    if repository_name:
        segments.append(f"{repository_name}")
    if changeset_revision:
        # Changeset revision should never be provided unless repository name also is.
        assert repository_name is not None, "Changeset revision is present, but repository name is not - aborting."
        segments.append(f"{changeset_revision}")
    url = "/".join(segments)
    if self.is_v2:
        # I think pagination broke this legacy test - so I added this
        url = url + "?rows_per_page=250"

    self.visit_url(url)
    self.check_for_strings(strings_displayed, strings_not_displayed)

    if self.is_v2:
        self.check_for_strings(iframe_shown, iframe_hidden)
        return

    # Now load the page that should be displayed inside the iframe and check for strings.
    if encoded_repository_id:
        iframe_url = "/repository/view_repository"
        params = {"id": encoded_repository_id, "operation": "view_or_manage_repository"}
        if changeset_revision:
            params["changeset_revision"] = changeset_revision
    elif encoded_user_id:
        iframe_url = "/repository/browse_repositories"
        params = {"user_id": encoded_user_id, "operation": "repositories_by_user"}
    else:
        # Neither id was supplied, so there is no iframe page to load.
        return
    self.visit_url(iframe_url, params=params)
    self.check_for_strings(iframe_shown, iframe_hidden)
[docs]
def load_checkable_revisions(self, strings_displayed=None, strings_not_displayed=None):
    """Visit ``/api/repository_revisions`` with a fixed filter set and check the response text."""
    revision_filters = dict(
        do_not_test="false",
        downloadable="true",
        includes_tools="true",
        malicious="false",
        missing_test_components="false",
        skip_tool_test="false",
    )
    self.visit_url("/api/repository_revisions", params=revision_filters)
    self.check_for_strings(strings_displayed, strings_not_displayed)
[docs]
def load_display_tool_page(
    self,
    repository: Repository,
    tool_xml_path,
    changeset_revision,
    strings_displayed=None,
    strings_not_displayed=None,
):
    """Visit ``/repository/display_tool`` for a tool config and check the page strings."""
    query = dict(
        repository_id=repository.id,
        tool_config=tool_xml_path,
        changeset_revision=changeset_revision,
    )
    self.visit_url("/repository/display_tool", params=query)
    self.check_for_strings(strings_displayed, strings_not_displayed)
[docs]
def load_invalid_tool_page(
    self, repository: Repository, tool_xml, changeset_revision, strings_displayed=None, strings_not_displayed=None
):
    """Visit ``/repository/load_invalid_tool`` for a tool config and check the page strings."""
    query = dict(
        repository_id=repository.id,
        tool_config=tool_xml,
        changeset_revision=changeset_revision,
    )
    self.visit_url("/repository/load_invalid_tool", params=query)
    self.check_for_strings(strings_displayed, strings_not_displayed)
[docs]
def reactivate_repository(self, installed_repository):
    """Hand ``installed_repository`` to the installation client for reactivation."""
    client = self._installation_client
    client.reactivate_repository(installed_repository)
[docs]
def reinstall_repository_api(
    self,
    installed_repository,
    install_repository_dependencies=True,
    install_tool_dependencies=False,
    new_tool_panel_section_label="",
):
    """Install ``installed_repository`` again at its ``installed_changeset_revision``.

    The name, owner and revision are read from ``installed_repository`` and
    passed to the installation client together with the dependency flags and
    the tool panel section label.
    """
    install_args = (
        installed_repository.name,
        installed_repository.owner,
        installed_repository.installed_changeset_revision,
        install_tool_dependencies,
        install_repository_dependencies,
        new_tool_panel_section_label,
    )
    self._installation_client.install_repository(*install_args)
[docs]
def repository_is_new(self, repository: Repository) -> bool:
    """Return ``True`` when the tip of the repository's hg repo reports a negative ``rev()``."""
    hg_repo = self.get_hg_repo(self.get_repo_path(repository))
    tip_revision = hg_repo[hg_repo.changelog.tip()].rev()
    return tip_revision < 0
[docs]
def revoke_write_access(self, repository, username):
    """Visit the manage-repository URL with the parameters that remove ``username``'s access."""
    removal_params = dict(user_access_button="Remove", id=repository.id, remove_auth=username)
    self.visit_url("/repository/manage_repository", params=removal_params)
[docs]
def set_repository_deprecated(
    self, repository: Repository, set_deprecated=True, strings_displayed=None, strings_not_displayed=None
):
    """Visit ``/repository/deprecate`` with ``mark_deprecated=set_deprecated`` and check the page strings."""
    deprecate_params = dict(id=repository.id, mark_deprecated=set_deprecated)
    self.visit_url("/repository/deprecate", params=deprecate_params)
    self.check_for_strings(strings_displayed, strings_not_displayed)
[docs]
def set_repository_malicious(
    self, repository: Repository, set_malicious=True, strings_displayed=None, strings_not_displayed=None
) -> None:
    """Set the ``malicious`` form field on the manage-repository page and submit it.

    After the form named ``malicious`` is submitted through its
    ``malicious_button``, the page is checked against the given strings.
    """
    form_name = "malicious"
    self.display_manage_repository_page(repository)
    browser = self._browser
    browser.fill_form_value(form_name, "malicious", set_malicious)
    browser.submit_form_with_name(form_name, "malicious_button")
    self.check_for_strings(strings_displayed, strings_not_displayed)
[docs]
def undelete_repository(self, repository: Repository) -> None:
    """Issue the admin ``Undelete`` operation for ``repository`` and check the confirmation text."""
    undelete_params = dict(operation="Undelete", id=repository.id)
    self.visit_url("/admin/browse_repositories", params=undelete_params)
    expected: list[str] = ["Undeleted 1 repository", repository.name]
    unexpected: list[str] = []
    self.check_for_strings(expected, unexpected)
def _uninstall_repository(self, installed_repository: galaxy_model.ToolShedRepository) -> None:
    """Uninstall ``installed_repository`` through the installation client (asserted to be set)."""
    client = self._installation_client
    assert client
    client.uninstall_repository(installed_repository)
[docs]
def update_installed_repository(
    self, installed_repository: galaxy_model.ToolShedRepository, verify_no_updates: bool = False
) -> dict[str, Any]:
    """Update ``installed_repository`` through the installation client.

    Args:
        installed_repository: the installed repository to update.
        verify_no_updates: forwarded unchanged to the client's
            ``update_installed_repository``.  It was previously ignored and
            ``False`` was always sent.

    Returns:
        Whatever the installation client's ``update_installed_repository`` returns.

    Raises:
        AssertionError: if no installation client is configured.
    """
    assert self._installation_client
    return self._installation_client.update_installed_repository(
        installed_repository, verify_no_updates=verify_no_updates
    )
[docs]
def verify_installed_repositories(self, installed_repositories=None, uninstalled_repositories=None):
    """Assert that each listed ``(name, owner)`` pair has status ``"Installed"``.

    Pairs for which no installed repository record is found are skipped
    silently.  ``uninstalled_repositories`` is accepted but not inspected
    by this method.
    """
    for repository_name, repository_owner in installed_repositories or []:
        galaxy_repository = self._get_installed_repository_by_name_owner(repository_name, repository_owner)
        if not galaxy_repository:
            continue
        current_status = galaxy_repository.status
        assert (
            current_status == "Installed"
        ), f"Repository {repository_name} should be installed, but is {galaxy_repository.status}"
@property
def shed_tool_data_table_conf(self):
    """The ``shed_tool_data_table_conf`` value exposed by the installation client."""
    client = self._installation_client
    return client.shed_tool_data_table_conf
@property
def tool_data_path(self):
    """The ``tool_data_path`` value exposed by the installation client."""
    client = self._installation_client
    return client.tool_data_path
def _refresh_tool_shed_repository(self, repo: galaxy_model.ToolShedRepository) -> None:
    """Ask the installation client (asserted to be set) to refresh ``repo``."""
    client = self._installation_client
    assert client
    client.refresh_tool_shed_repository(repo)
[docs]
def verify_installed_repository_data_table_entries(self, required_data_table_entries):
    """Assert that a required data table is present in the shed tool data table config.

    The file at ``self.shed_tool_data_table_conf`` is parsed and searched for a
    ``<table>`` element whose ``name`` attribute equals one of
    ``required_data_table_entries`` (something like ``['sam_fa_indexes']``).
    For the first match, the method checks that the table has a ``<file>``
    child with a ``path`` attribute, that the directory part of that path
    exists, and that a file with the same base name exists under
    ``self.tool_data_path``.

    NOTE(review): the search stops at the first matching table, so when several
    names are passed only one of them is verified - confirm this is intended.

    Raises:
        AssertionError: if the config cannot be parsed, if no matching table is
            found, or if any of the checks on the matching table fails.
    """
    shed_tool_data_table_conf = self.shed_tool_data_table_conf
    data_tables, error_message = xml_util.parse_xml(shed_tool_data_table_conf)
    # The raw contents are only used to make the assertion messages below more helpful.
    with open(shed_tool_data_table_conf) as f:
        shed_tool_data_table_conf_contents = f.read()
    assert (
        not error_message
    ), f"Failed to parse {shed_tool_data_table_conf} properly. File contents [{shed_tool_data_table_conf_contents}]"
    found = False
    # With the tool shed, the "path" attribute that is hard-coded into the tool_data_table_conf.xml
    # file is ignored. This is because the tool shed requires the directory location to which this
    # path points to be empty except when a specific tool is loaded. The default location for this
    # directory configured for the tool shed is <Galaxy root>/shed-tool-data. When a tool is loaded
    # in the tool shed, all contained .loc.sample files are copied to this directory and the
    # ToolDataTableManager parses and loads the files in the same way that Galaxy does with a very
    # important exception. When the tool shed loads a tool and parses and loads the copied .loc.sample
    # files, the ToolDataTableManager is already instantiated, and so its add_new_entries_from_config_file()
    # method is called and the tool_data_path parameter is used to override the hard-coded "tool-data"
    # directory that Galaxy always uses.
    #
    # Tool data table xml structure:
    # <tables>
    #     <table comment_char="#" name="sam_fa_indexes">
    #        <columns>line_type, value, path</columns>
    #        <file path="tool-data/sam_fa_indices.loc" />
    #     </table>
    # </tables>
    required_data_table_entry = None
    for table_elem in data_tables.findall("table"):
        # The value of table_elem will be something like: <table comment_char="#" name="sam_fa_indexes">
        for required_data_table_entry in required_data_table_entries:
            # The value of required_data_table_entry will be something like: 'sam_fa_indexes'
            if "name" in table_elem.attrib and table_elem.attrib["name"] == required_data_table_entry:
                found = True
                # We're processing something like: sam_fa_indexes
                file_elem = table_elem.find("file")
                # find() returns None when the table has no <file> child; fail with a clear
                # message instead of an AttributeError on the next line.
                assert (
                    file_elem is not None
                ), f'The "file" element is missing for the {required_data_table_entry} entry.'
                # We have something like: <file path="tool-data/sam_fa_indices.loc" />
                # The "path" attribute of the "file" tag is the location that Galaxy always uses because the
                # Galaxy ToolDataTableManager was implemented in such a way that the hard-coded path is used
                # rather than allowing the location to be a configurable setting like the tool shed requires.
                file_path = file_elem.get("path", None)
                # The value of file_path will be something like: "tool-data/all_fasta.loc"
                assert (
                    file_path is not None
                ), f'The "path" attribute is missing for the {required_data_table_entry} entry.'
                # The following test is probably not necessary, but the tool-data directory should exist!
                galaxy_tool_data_dir, loc_file_name = os.path.split(file_path)
                # os.path.split() always returns strings, so test for an empty directory part
                # rather than for None (which could never occur).
                assert (
                    galaxy_tool_data_dir
                ), f"The hard-coded Galaxy tool-data directory is missing for the {required_data_table_entry} entry."
                assert os.path.exists(galaxy_tool_data_dir), "The Galaxy tool-data directory does not exist."
                # Make sure the loc_file_name was correctly copied into the configured directory location.
                configured_file_location = os.path.join(self.tool_data_path, loc_file_name)
                assert os.path.isfile(
                    configured_file_location
                ), f'The expected copied file "{configured_file_location}" is missing.'
                # We've found the value of the required_data_table_entry in data_tables, which is the parsed
                # shed_tool_data_table_conf.xml, so all is well!
                break
        if found:
            break
    # We better have an entry like: <table comment_char="#" name="sam_fa_indexes"> in our parsed data_tables
    # or we know that the repository was not correctly installed!
    if not found:
        # required_data_table_entry is still None only when the inner loop never ran.
        if required_data_table_entry is None:
            raise AssertionError(
                f"No tables found in {shed_tool_data_table_conf}. File contents {shed_tool_data_table_conf_contents}"
            )
        else:
            raise AssertionError(
                f"No entry for {required_data_table_entry} in {shed_tool_data_table_conf}. File contents {shed_tool_data_table_conf_contents}"
            )
def _get_installed_repository_by_name_owner(
    self, repository_name: str, repository_owner: str
) -> galaxy_model.ToolShedRepository:
    """Return the installation client's installed repository for the given name and owner."""
    client = self._installation_client
    assert client
    return client.get_installed_repository_by_name_owner(repository_name, repository_owner)
def _get_installed_repositories_by_name_owner(
    self, repository_name: str, repository_owner: str
) -> list[galaxy_model.ToolShedRepository]:
    """Return the installation client's list of installed repositories for the given name and owner."""
    client = self._installation_client
    assert client
    return client.get_installed_repositories_by_name_owner(repository_name, repository_owner)
def _get_installed_repository_for(
    self, owner: Optional[str] = None, name: Optional[str] = None, changeset: Optional[str] = None
):
    """Forward the ``owner`` / ``name`` / ``changeset`` criteria to the installation client's lookup."""
    client = self._installation_client
    assert client
    criteria = {"owner": owner, "name": name, "changeset": changeset}
    return client.get_installed_repository_for(**criteria)
def _assert_has_installed_repos_with_names(self, *names):
    """Assert that a lookup by name returns a truthy result for every given name."""
    # all() stops at the first falsy lookup, just like a failing per-name assert would.
    assert all(self._get_installed_repository_for(name=repo_name) for repo_name in names)
def _assert_has_no_installed_repos_with_names(self, *names):
    """Assert that a lookup by name returns a falsy result for every given name."""
    # any() stops at the first truthy lookup, just like a failing per-name assert would.
    assert not any(self._get_installed_repository_for(name=repo_name) for repo_name in names)
def _assert_has_missing_dependency(
    self, installed_repository: galaxy_model.ToolShedRepository, repository_name: str
) -> None:
    """Assert that ``repository_name`` is reported as a missing repository dependency.

    Only the first entry of ``missing_repository_dependencies["folders"]`` in
    the repository's manage JSON is inspected.
    """
    manage_json = self.display_installed_repository_manage_json(installed_repository)
    assert (
        "missing_repository_dependencies" in manage_json
    ), f"Expecting missing dependency {repository_name} but no missing dependencies found."
    first_folder = manage_json["missing_repository_dependencies"]["folders"][0]
    assert "repository_dependencies" in first_folder
    # Collect every missing name so the failure message can list them.
    missing_repos = {rd["repository_name"] for rd in first_folder["repository_dependencies"]}
    assert (
        repository_name in missing_repos
    ), f"Expecting missing dependency {repository_name} but the missing repositories were {missing_repos}."
def _assert_has_installed_repository_dependency(
    self,
    installed_repository: galaxy_model.ToolShedRepository,
    repository_name: str,
    changeset: Optional[str] = None,
) -> None:
    """Assert that ``repository_name`` appears among the repository's dependencies.

    Every folder under ``repository_dependencies["folders"]`` of the manage
    JSON is searched.  When ``changeset`` is truthy the dependency's
    ``changeset_revision`` must match it as well.

    Raises:
        AssertionError: if the manage JSON has no ``repository_dependencies``
            entry, or if no matching dependency is found (including when the
            entry has no folders at all).
    """
    manage_json = self.display_installed_repository_manage_json(installed_repository)
    if "repository_dependencies" not in manage_json:
        name = installed_repository.name
        raise AssertionError(f"No repository dependencies were defined in {name}. manage json is {manage_json}")
    # A missing "folders" key is treated as "no dependencies" so the assertion below reports
    # the problem, rather than a TypeError from iterating None.
    folders = manage_json["repository_dependencies"].get("folders") or []
    # any() stops scanning as soon as a match is found, across folders as well.
    found = any(
        rd["repository_name"] == repository_name and (not changeset or rd["changeset_revision"] == changeset)
        for folder in folders
        for rd in folder["repository_dependencies"]
    )
    assert found, f"Failed to find target repository dependency in {manage_json}"
def _assert_is_not_missing_dependency(
    self, installed_repository: galaxy_model.ToolShedRepository, repository_name: str
) -> None:
    """Assert that ``repository_name`` is not listed as a missing repository dependency.

    Passes immediately when the manage JSON has no
    ``missing_repository_dependencies`` entry; otherwise only the first
    folder of that entry is inspected.
    """
    manage_json = self.display_installed_repository_manage_json(installed_repository)
    if "missing_repository_dependencies" in manage_json:
        first_folder = manage_json["missing_repository_dependencies"]["folders"][0]
        assert "repository_dependencies" in first_folder
        missing_names = [rd["repository_name"] for rd in first_folder["repository_dependencies"]]
        assert repository_name not in missing_names
def _assert_has_valid_tool_with_name(self, tool_name: str) -> None:
    """Wait until ``tool_name`` shows up in the installation client's tool names."""

    def tool_is_listed():
        client = self._installation_client
        assert client
        assert tool_name in client.get_tool_names()

    # May need to wait on toolbox reload.
    wait_on_assertion(tool_is_listed, f"toolbox to contain {tool_name}", 10)
def _assert_repo_has_tool_with_id(
    self, installed_repository: galaxy_model.ToolShedRepository, tool_id: str
) -> None:
    """Assert that the repository metadata lists a tool whose ``id`` equals ``tool_id``.

    Tool entries without an ``"id"`` key are ignored.
    """
    assert "tools" in installed_repository.metadata_, f"No valid tools were defined in {installed_repository.name}."
    tools = installed_repository.metadata_["tools"]
    # any() short-circuits on the first tool with a matching id.
    found_it = any(
        "id" in tool and tool["id"] == tool_id for tool in tools  # type:ignore[attr-defined]
    )
    assert found_it, f"Did not find valid tool with name {tool_id} in {tools}"
def _assert_repo_has_invalid_tool_in_file(
    self, installed_repository: galaxy_model.ToolShedRepository, name: str
) -> None:
    """Assert that ``name`` is contained in the repository metadata's ``invalid_tools``."""
    metadata = installed_repository.metadata_
    assert "invalid_tools" in metadata, f"No invalid tools were defined in {installed_repository.name}."
    invalid_tools = metadata["invalid_tools"]
    assert name in invalid_tools, f"Did not find invalid tool file {name} in {invalid_tools}"
def _wait_for_installation(repository: galaxy_model.ToolShedRepository, refresh):
    """Poll ``repository`` until its status is ``ERROR`` or ``INSTALLED``.

    Each iteration calls ``refresh(repository)`` and then sleeps for one
    second.  Once the iteration count exceeds
    ``repository_installation_timeout`` an ``AssertionError`` is raised.
    """
    statuses = galaxy_model.ToolShedRepository.installation_status
    final_states = [statuses.ERROR, statuses.INSTALLED]
    # Wait until the repository is in a final state before returning. This ensures that subsequent
    # tests are running against an installed repository, and not one that is still installing.
    polls = 0
    while repository.status not in final_states:
        refresh(repository)
        polls += 1
        # This timeout currently defaults to 10 minutes.
        if polls > repository_installation_timeout:
            raise AssertionError(
                f"Repository installation timed out, {polls} seconds elapsed, repository state is {repository.status}."
            )
        time.sleep(1)
def _get_tool_panel_section_from_repository_metadata(metadata):
    """Return the name of the first tool panel section recorded for the first tool.

    The guid of ``metadata["tools"][0]`` is used as the key into
    ``metadata["tool_panel_section"]``; the ``"name"`` of the first section
    listed under that guid is returned.
    """
    first_tool_guid = metadata["tools"][0]["guid"]
    assert "tool_panel_section" in metadata, f"Tool panel section not found in metadata: {metadata}"
    sections_for_tool = metadata["tool_panel_section"][first_tool_guid]
    return sections_for_tool[0]["name"]
[docs]
def get_installed_repository(session, name, owner, changeset):
    """Query for one ``ToolShedRepository`` that is neither deleted nor uninstalled.

    ``name``, ``owner`` and ``changeset`` each add an equality filter only
    when they are not ``None``.  The result comes from ``one_or_none()`` on
    the session's scalar result.
    """
    model = galaxy_model.ToolShedRepository
    optional_filters = (
        (name, model.name),
        (owner, model.owner),
        (changeset, model.changeset_revision),
    )
    stmt = select(model)
    for wanted, column in optional_filters:
        if wanted is not None:
            stmt = stmt.where(column == wanted)
    stmt = stmt.where(model.deleted == false()).where(model.uninstalled == false())
    return session.scalars(stmt).one_or_none()