Warning
This document is for an in-development version of Galaxy. You can alternatively view this page in the latest release if it exists or view the top of the latest release's documentation.
Source code for tool_shed.test.base.twilltestcase
import logging
import os
import re
import string
import tempfile
import time
from json import loads
from urllib.parse import (
quote_plus,
urlencode,
urlparse,
)
import requests
import twill.commands as tc
from mercurial import commands, hg, ui
from twill.utils import ResultWrapper
import galaxy.model.tool_shed_install as galaxy_model
import galaxy.util
from galaxy.security import idencoding
from galaxy.util import (
DEFAULT_SOCKET_TIMEOUT,
smart_str,
unicodify,
)
from galaxy_test.base.api_util import get_admin_api_key
from galaxy_test.driver.testcase import DrivenFunctionalTestCase
from tool_shed.util import (
hg_util,
hgweb_config,
xml_util,
)
from . import common, test_db_util
# Set a 10 minute timeout for repository installation.
repository_installation_timeout = 600
# Dial ClientCookie logging down (very noisy)
logging.getLogger("ClientCookie.cookies").setLevel(logging.WARNING)
log = logging.getLogger(__name__)
tc.options['equiv_refresh_interval'] = 0
[docs]class ShedTwillTestCase(DrivenFunctionalTestCase):
"""Class of FunctionalTestCase geared toward HTML interactions using the Twill library."""
[docs] def setUp(self):
# Security helper
self.security = idencoding.IdEncodingHelper(id_secret='changethisinproductiontoo')
self.history_id = None
self.hgweb_config_dir = os.environ.get('TEST_HG_WEB_CONFIG_DIR')
self.hgweb_config_manager = hgweb_config.hgweb_config_manager
self.hgweb_config_manager.hgweb_config_dir = self.hgweb_config_dir
self.tool_shed_test_tmp_dir = os.environ.get('TOOL_SHED_TEST_TMP_DIR', None)
self.host = os.environ.get('TOOL_SHED_TEST_HOST')
self.port = os.environ.get('TOOL_SHED_TEST_PORT')
self.url = f"http://{self.host}:{self.port}"
self.galaxy_host = os.environ.get('GALAXY_TEST_HOST')
self.galaxy_port = os.environ.get('GALAXY_TEST_PORT')
self.galaxy_url = f"http://{self.galaxy_host}:{self.galaxy_port}"
self.shed_tool_data_table_conf = os.environ.get('TOOL_SHED_TEST_TOOL_DATA_TABLE_CONF')
self.file_dir = os.environ.get('TOOL_SHED_TEST_FILE_DIR', None)
self.tool_data_path = os.environ.get('GALAXY_TEST_TOOL_DATA_PATH')
self.shed_tool_conf = os.environ.get('GALAXY_TEST_SHED_TOOL_CONF')
self.test_db_util = test_db_util
# TODO: Figure out a way to alter these attributes during tests.
self.galaxy_tool_dependency_dir = os.environ.get('GALAXY_TEST_TOOL_DEPENDENCY_DIR')
[docs] def check_for_strings(self, strings_displayed=None, strings_not_displayed=None):
strings_displayed = strings_displayed or []
strings_not_displayed = strings_not_displayed or []
if strings_displayed:
for check_str in strings_displayed:
self.check_page_for_string(check_str)
if strings_not_displayed:
for check_str in strings_not_displayed:
self.check_string_not_in_page(check_str)
[docs] def check_page(self, strings_displayed, strings_displayed_count, strings_not_displayed):
"""Checks a page for strings displayed, not displayed and number of occurrences of a string"""
for check_str in strings_displayed:
self.check_page_for_string(check_str)
for check_str, count in strings_displayed_count:
self.check_string_count_in_page(check_str, count)
for check_str in strings_not_displayed:
self.check_string_not_in_page(check_str)
[docs] def check_page_for_string(self, patt):
"""Looks for 'patt' in the current browser page"""
page = unicodify(self.last_page())
if page.find(patt) == -1:
fname = self.write_temp_file(page)
errmsg = f"no match to '{patt}'\npage content written to '{fname}'\npage: [[{page}]]"
raise AssertionError(errmsg)
[docs] def check_string_not_in_page(self, patt):
"""Checks to make sure 'patt' is NOT in the page."""
page = self.last_page()
if page.find(patt) != -1:
fname = self.write_temp_file(page)
errmsg = f"string ({patt}) incorrectly displayed in page.\npage content written to '{fname}'"
raise AssertionError(errmsg)
# Functions associated with user accounts
[docs] def create(self, cntrller='user', email='test@bx.psu.edu', password='testuser', username='admin-user', redirect=''):
# HACK: don't use panels because late_javascripts() messes up the twill browser and it
# can't find form fields (and hence user can't be logged in).
params = dict(cntrller=cntrller, use_panels=False)
self.visit_url("/user/create", params)
tc.fv('registration', 'email', email)
tc.fv('registration', 'redirect', redirect)
tc.fv('registration', 'password', password)
tc.fv('registration', 'confirm', password)
tc.fv('registration', 'username', username)
tc.submit('create_user_button')
previously_created = False
username_taken = False
invalid_username = False
try:
self.check_page_for_string("Created new user account")
except AssertionError:
try:
# May have created the account in a previous test run...
self.check_page_for_string(f"User with email '{email}' already exists.")
previously_created = True
except AssertionError:
try:
self.check_page_for_string('Public name is taken; please choose another')
username_taken = True
except AssertionError:
# Note that we're only checking if the usr name is >< 4 chars here...
try:
self.check_page_for_string('Public name must be at least 4 characters in length')
invalid_username = True
except AssertionError:
pass
return previously_created, username_taken, invalid_username
[docs] def last_page(self):
"""
Return the last visited page (usually HTML, but can binary data as
well).
"""
return tc.browser.html
[docs] def login(self, email='test@bx.psu.edu', password='testuser', username='admin-user', redirect='', logout_first=True):
# Clear cookies.
if logout_first:
self.logout()
# test@bx.psu.edu is configured as an admin user
previously_created, username_taken, invalid_username = \
self.create(email=email, password=password, username=username, redirect=redirect)
if previously_created:
# The acount has previously been created, so just login.
# HACK: don't use panels because late_javascripts() messes up the twill browser and it
# can't find form fields (and hence user can't be logged in).
params = {
'use_panels': False
}
self.visit_url('/user/login', params=params)
self.submit_form(button='login_button', login=email, redirect=redirect, password=password)
[docs] def logout(self):
self.visit_url("/user/logout")
self.check_page_for_string("You have been logged out")
[docs] def showforms(self):
"""Shows form, helpful for debugging new tests"""
return tc.browser.forms
[docs] def submit_form(self, form_no=-1, button="runtool_btn", form=None, **kwd):
"""Populates and submits a form from the keyword arguments."""
# An HTMLForm contains a sequence of Controls. Supported control classes are:
# TextControl, FileControl, ListControl, RadioControl, CheckboxControl, SelectControl,
# SubmitControl, ImageControl
if form is None:
try:
form = self.showforms()[form_no]
except IndexError:
raise ValueError("No form to submit found")
controls = {c.name: c for c in form.inputs}
form_name = form.get('name')
for control_name, control_value in kwd.items():
if control_name not in controls:
continue # these cannot be handled safely - cause the test to barf out
if not isinstance(control_value, list):
control_value = [str(control_value)]
control = controls[control_name]
control_type = getattr(control, "type", None)
if control_type in ("text", "textfield", "submit", "password", "TextareaElement", "checkbox", "radio", None):
for cv in control_value:
tc.fv(form_name, control.name, cv)
else:
# Add conditions for other control types here when necessary.
pass
tc.submit(button)
[docs] def visit_url(self, url, params=None, doseq=False, allowed_codes=None):
if allowed_codes is None:
allowed_codes = [200]
if params is None:
params = dict()
parsed_url = urlparse(url)
if len(parsed_url.netloc) == 0:
url = f'http://{self.host}:{self.port}{parsed_url.path}'
else:
url = f'{parsed_url.scheme}://{parsed_url.netloc}{parsed_url.path}'
if parsed_url.query:
for query_parameter in parsed_url.query.split('&'):
key, value = query_parameter.split('=')
params[key] = value
if params:
url += f'?{urlencode(params, doseq=doseq)}'
new_url = tc.go(url)
return_code = tc.browser.code
assert return_code in allowed_codes, 'Invalid HTTP return code %s, allowed codes: %s' % \
(return_code, ', '.join(str(code) for code in allowed_codes))
return new_url
[docs] def write_temp_file(self, content, suffix='.html'):
with tempfile.NamedTemporaryFile(suffix=suffix, prefix='twilltestcase-', delete=False) as fh:
fh.write(smart_str(content))
return fh.name
[docs] def add_repository_review_component(self, **kwd):
params = {
'operation': 'create'
}
self.visit_url('/repository_review/create_component', params=params)
self.submit_form(button='create_component_button', **kwd)
[docs] def assign_admin_role(self, repository, user):
# As elsewhere, twill limits the possibility of submitting the form, this time due to not executing the javascript
# attached to the role selection form. Visit the action url directly with the necessary parameters.
params = {
'id': self.security.encode_id(repository.id),
'in_users': user.id,
'manage_role_associations_button': 'Save'
}
self.visit_url('/repository/manage_repository_admins', params=params)
self.check_for_strings(strings_displayed=['Role', 'has been associated'])
[docs] def browse_category(self, category, strings_displayed=None, strings_not_displayed=None):
params = {
'sort': 'name',
'operation': 'valid_repositories_by_category',
'id': self.security.encode_id(category.id)
}
self.visit_url('/repository/browse_valid_categories', params=params)
self.check_for_strings(strings_displayed, strings_not_displayed)
[docs] def browse_component_review(self, review, strings_displayed=None, strings_not_displayed=None):
params = {
'id': self.security.encode_id(review.id)
}
self.visit_url('/repository_review/browse_review', params=params)
self.check_for_strings(strings_displayed, strings_not_displayed)
[docs] def browse_custom_datatypes(self, strings_displayed=None, strings_not_displayed=None):
url = '/repository/browse_datatypes'
self.visit_url(url)
self.check_for_strings(strings_displayed, strings_not_displayed)
[docs] def browse_repository(self, repository, strings_displayed=None, strings_not_displayed=None):
params = {
'id': self.security.encode_id(repository.id)
}
self.visit_url('/repository/browse_repository', params=params)
self.check_for_strings(strings_displayed, strings_not_displayed)
[docs] def browse_repository_dependencies(self, strings_displayed=None, strings_not_displayed=None):
url = '/repository/browse_repository_dependencies'
self.visit_url(url)
self.check_for_strings(strings_displayed, strings_not_displayed)
[docs] def browse_tool_shed(self, url, strings_displayed=None, strings_not_displayed=None):
url = '/repository/browse_valid_categories'
self.visit_url(url)
self.check_for_strings(strings_displayed, strings_not_displayed)
[docs] def browse_tool_dependencies(self, strings_displayed=None, strings_not_displayed=None):
url = '/repository/browse_tool_dependencies'
self.visit_url(url)
self.check_for_strings(strings_displayed, strings_not_displayed)
[docs] def browse_tools(self, strings_displayed=None, strings_not_displayed=None):
url = '/repository/browse_tools'
self.visit_url(url)
self.check_for_strings(strings_displayed, strings_not_displayed)
[docs] def check_count_of_metadata_revisions_associated_with_repository(self, repository, metadata_count):
self.check_repository_changelog(repository)
self.check_string_count_in_page('Repository metadata is associated with this change set.', metadata_count)
[docs] def check_exported_repository_dependency(self, dependency_filename, repository_name, repository_owner):
root, error_message = xml_util.parse_xml(dependency_filename)
for elem in root.findall('repository'):
if 'changeset_revision' in elem:
raise AssertionError('Exported repository %s with owner %s has a dependency with a defined changeset revision.' %
(repository_name, repository_owner))
if 'toolshed' in elem:
raise AssertionError('Exported repository %s with owner %s has a dependency with a defined tool shed.' %
(repository_name, repository_owner))
[docs] def check_for_valid_tools(self, repository, strings_displayed=None, strings_not_displayed=None):
if strings_displayed is None:
strings_displayed = ['Valid tools']
else:
strings_displayed.append('Valid tools')
self.display_manage_repository_page(repository, strings_displayed, strings_not_displayed)
[docs] def check_galaxy_repository_db_status(self, repository_name, owner, expected_status):
installed_repository = test_db_util.get_installed_repository_by_name_owner(repository_name, owner)
assert installed_repository.status == expected_status, 'Status in database is %s, expected %s' % \
(installed_repository.status, expected_status)
[docs] def check_galaxy_repository_tool_panel_section(self, repository, expected_tool_panel_section):
metadata = repository.metadata_
assert 'tools' in metadata, f'Tools not found in repository metadata: {metadata}'
# If integrated_tool_panel.xml is to be tested, this test method will need to be enhanced to handle tools
# from the same repository in different tool panel sections. Getting the first tool guid is ok, because
# currently all tools contained in a single repository will be loaded into the same tool panel section.
if repository.status in [galaxy_model.ToolShedRepository.installation_status.UNINSTALLED,
galaxy_model.ToolShedRepository.installation_status.DEACTIVATED]:
tool_panel_section = self.get_tool_panel_section_from_repository_metadata(metadata)
else:
tool_panel_section = self.get_tool_panel_section_from_api(metadata)
assert tool_panel_section == expected_tool_panel_section, 'Expected to find tool panel section *%s*, but instead found *%s*\nMetadata: %s\n' % \
(expected_tool_panel_section, tool_panel_section, metadata)
[docs] def check_installed_repository_tool_dependencies(self,
installed_repository,
strings_displayed=None,
strings_not_displayed=None,
dependencies_installed=False):
# Tool dependencies are not being installed in these functional tests. If this is changed, the test method will also need to be updated.
if not dependencies_installed:
strings_displayed.append('Missing tool dependencies')
else:
strings_displayed.append('Tool dependencies')
if dependencies_installed:
strings_displayed.append('Installed')
else:
strings_displayed.append('Never installed')
params = {
'id': self.security.encode_id(installed_repository.id)
}
self.visit_galaxy_url('/admin_toolshed/manage_repository', params=params)
self.check_for_strings(strings_displayed, strings_not_displayed)
[docs] def check_repository_changelog(self, repository, strings_displayed=None, strings_not_displayed=None):
params = {
'id': self.security.encode_id(repository.id)
}
self.visit_url('/repository/view_changelog', params=params)
self.check_for_strings(strings_displayed, strings_not_displayed)
[docs] def check_repository_dependency(self, repository, depends_on_repository, depends_on_changeset_revision=None, changeset_revision=None):
strings_displayed = [depends_on_repository.name, depends_on_repository.user.username]
if depends_on_changeset_revision:
strings_displayed.append(depends_on_changeset_revision)
self.display_manage_repository_page(repository, changeset_revision=changeset_revision, strings_displayed=strings_displayed)
[docs] def check_repository_metadata(self, repository, tip_only=True):
if tip_only:
assert self.tip_has_metadata(repository) and len(self.get_repository_metadata_revisions(repository)) == 1, \
'Repository tip is not a metadata revision: Repository tip - %s, metadata revisions - %s.'
else:
assert len(self.get_repository_metadata_revisions(repository)) > 0, \
'Repository tip is not a metadata revision: Repository tip - %s, metadata revisions - %s.' % \
(self.get_repository_tip(repository), ', '.join(self.get_repository_metadata_revisions(repository)))
[docs] def check_repository_tools_for_changeset_revision(self, repository, changeset_revision, tool_metadata_strings_displayed=None, tool_page_strings_displayed=None):
'''
Loop through each tool dictionary in the repository metadata associated with the received changeset_revision.
For each of these, check for a tools attribute, and load the tool metadata page if it exists, then display that tool's page.
'''
test_db_util.refresh(repository)
repository_metadata = self.get_repository_metadata_by_changeset_revision(repository, changeset_revision)
metadata = repository_metadata.metadata
if 'tools' not in metadata:
raise AssertionError(f'No tools in {repository.name} revision {changeset_revision}.')
for tool_dict in metadata['tools']:
tool_id = tool_dict['id']
tool_xml = tool_dict['tool_config']
params = {
'repository_id': self.security.encode_id(repository.id),
'changeset_revision': changeset_revision,
'tool_id': tool_id
}
self.visit_url('/repository/view_tool_metadata', params=params)
self.check_for_strings(tool_metadata_strings_displayed)
self.load_display_tool_page(repository, tool_xml_path=tool_xml,
changeset_revision=changeset_revision,
strings_displayed=tool_page_strings_displayed,
strings_not_displayed=None)
[docs] def check_repository_invalid_tools_for_changeset_revision(self, repository, changeset_revision, strings_displayed=None, strings_not_displayed=None):
'''Load the invalid tool page for each invalid tool associated with this changeset revision and verify the received error messages.'''
repository_metadata = self.get_repository_metadata_by_changeset_revision(repository, changeset_revision)
metadata = repository_metadata.metadata
assert 'invalid_tools' in metadata, f'Metadata for changeset revision {changeset_revision} does not define invalid tools'
for tool_xml in metadata['invalid_tools']:
self.load_invalid_tool_page(repository,
tool_xml=tool_xml,
changeset_revision=changeset_revision,
strings_displayed=strings_displayed,
strings_not_displayed=strings_not_displayed)
[docs] def check_string_count_in_page(self, pattern, min_count, max_count=None):
"""Checks the number of 'pattern' occurrences in the current browser page"""
page = self.last_page()
pattern_count = page.count(pattern)
if max_count is None:
max_count = min_count
# The number of occurrences of pattern in the page should be between min_count
# and max_count, so show error if pattern_count is less than min_count or greater
# than max_count.
if pattern_count < min_count or pattern_count > max_count:
fname = self.write_temp_file(page)
errmsg = "%i occurrences of '%s' found (min. %i, max. %i).\npage content written to '%s' " % \
(pattern_count, pattern, min_count, max_count, fname)
raise AssertionError(errmsg)
[docs] def clone_repository(self, repository, destination_path):
url = f'{self.url}/repos/{repository.user.username}/{repository.name}'
success, message = hg_util.clone_repository(url, destination_path, self.get_repository_tip(repository))
assert success is True, message
[docs] def commit_and_push(self, repository, hgrepo, options, username, password):
url = f'http://{username}:{password}@{self.host}:{self.port}/repos/{repository.user.username}/{repository.name}'
commands.commit(ui.ui(), hgrepo, **options)
# Try pushing multiple times as it transiently fails on Jenkins.
# TODO: Figure out why that happens
for _ in range(5):
try:
commands.push(ui.ui(), hgrepo, dest=url)
except Exception as e:
if str(e).find('Pushing to Tool Shed is disabled') != -1:
return False
else:
return True
raise
[docs] def create_category(self, **kwd):
category = test_db_util.get_category_by_name(kwd['name'])
if category is None:
params = {
'operation': 'create'
}
self.visit_url('/admin/manage_categories', params=params)
self.submit_form(button="create_category_button", **kwd)
category = test_db_util.get_category_by_name(kwd['name'])
return category
[docs] def create_repository_dependency(self,
repository=None,
repository_tuples=None,
filepath=None,
prior_installation_required=False,
complex=False,
package=None,
version=None,
strings_displayed=None,
strings_not_displayed=None):
repository_tuples = repository_tuples or []
repository_names = []
if complex:
filename = 'tool_dependencies.xml'
self.generate_complex_dependency_xml(filename=filename, filepath=filepath, repository_tuples=repository_tuples, package=package, version=version)
else:
for _, name, _, _ in repository_tuples:
repository_names.append(name)
dependency_description = f"{repository.name} depends on {', '.join(repository_names)}."
filename = 'repository_dependencies.xml'
self.generate_simple_dependency_xml(repository_tuples=repository_tuples,
filename=filename,
filepath=filepath,
dependency_description=dependency_description,
prior_installation_required=prior_installation_required)
self.upload_file(repository,
filename=filename,
filepath=filepath,
valid_tools_only=False,
uncompress_file=False,
remove_repo_files_not_in_tar=False,
commit_message=f"Uploaded dependency on {', '.join(repository_names)}.",
strings_displayed=None,
strings_not_displayed=None)
[docs] def create_repository_review(self, repository, review_contents_dict, changeset_revision=None, copy_from=None):
strings_displayed = []
if not copy_from:
strings_displayed.append('Begin your review')
strings_not_displayed = []
if not changeset_revision:
changeset_revision = self.get_repository_tip(repository)
params = {
'changeset_revision': changeset_revision,
'id': self.security.encode_id(repository.id)
}
self.visit_url('/repository_review/create_review', params=params)
self.check_for_strings(strings_displayed, strings_not_displayed)
strings_displayed = []
if copy_from:
old_changeset_revision, review_id = copy_from
strings_displayed = ['You have elected to create a new review', 'Select previous revision', changeset_revision]
self.check_for_strings(strings_displayed)
strings_displayed = []
params = {
'changeset_revision': self.get_repository_tip(repository),
'id': self.security.encode_id(repository.id),
'previous_review_id': self.security.encode_id(review_id)
}
self.visit_url('/repository_review/create_review', params=params)
self.fill_review_form(review_contents_dict, strings_displayed, strings_not_displayed)
[docs] def create_user_in_galaxy(self, cntrller='user', email='test@bx.psu.edu', password='testuser', username='admin-user', redirect=''):
params = {
'username': username,
'email': email,
'password': password,
'confirm': password,
'session_csrf_token': self.galaxy_token()
}
self.visit_galaxy_url('/user/create', params=params, allowed_codes=[200, 400])
[docs] def deactivate_repository(self, installed_repository, strings_displayed=None, strings_not_displayed=None):
encoded_id = self.security.encode_id(installed_repository.id)
api_key = get_admin_api_key()
response = requests.delete(f"{self.galaxy_url}/api/tool_shed_repositories/{encoded_id}", data={'remove_from_disk': False, 'key': api_key}, timeout=DEFAULT_SOCKET_TIMEOUT)
assert response.status_code != 403, response.content
[docs] def delete_files_from_repository(self, repository, filenames=None, strings_displayed=None, strings_not_displayed=None):
filenames = filenames or []
files_to_delete = []
if strings_displayed is None:
strings_displayed = ['were deleted from the repository']
basepath = self.get_repo_path(repository)
repository_files = self.get_repository_file_list(repository=repository, base_path=basepath, current_path=None)
# Verify that the files to delete actually exist in the repository.
for filename in repository_files:
if filename in filenames:
files_to_delete.append(os.path.join(basepath, filename))
self.browse_repository(repository)
tc.fv("2", "selected_files_to_delete", ','.join(files_to_delete))
tc.submit('select_files_to_delete_button')
self.check_for_strings(strings_displayed, strings_not_displayed)
[docs] def delete_repository(self, repository):
repository_id = self.security.encode_id(repository.id)
self.visit_url('/admin/browse_repositories')
params = {
'operation': 'Delete',
'id': repository_id
}
self.visit_url('/admin/browse_repositories', params=params)
strings_displayed = ['Deleted 1 repository', repository.name]
strings_not_displayed = []
self.check_for_strings(strings_displayed, strings_not_displayed)
[docs] def display_galaxy_browse_repositories_page(self, strings_displayed=None, strings_not_displayed=None):
url = '/admin_toolshed/browse_repositories'
self.visit_galaxy_url(url)
self.check_for_strings(strings_displayed, strings_not_displayed)
[docs] def display_installed_jobs_list_page(self, installed_repository, data_manager_names=None, strings_displayed=None, strings_not_displayed=None):
data_managers = installed_repository.metadata_.get('data_manager', {}).get('data_managers', {})
if data_manager_names:
if not isinstance(data_manager_names, list):
data_manager_names = [data_manager_names]
for data_manager_name in data_manager_names:
assert data_manager_name in data_managers, f"The requested Data Manager '{data_manager_name}' was not found in repository metadata."
else:
data_manager_name = list(data_managers.keys())
for data_manager_name in data_manager_names:
params = {
'id': data_managers[data_manager_name]['guid']
}
self.visit_galaxy_url('/data_manager/jobs_list', params=params)
self.check_for_strings(strings_displayed, strings_not_displayed)
[docs] def display_installed_repository_manage_page(self, installed_repository, strings_displayed=None, strings_not_displayed=None):
if strings_displayed is None:
strings_displayed = []
if strings_not_displayed is None:
strings_not_displayed = []
params = {
'id': self.security.encode_id(installed_repository.id)
}
self.visit_galaxy_url('/admin_toolshed/manage_repository', params=params)
strings_displayed.append(str(installed_repository.installed_changeset_revision))
# Every place Galaxy's XXXX tool appears in attribute - need to quote.
strings_displayed = [x.replace("'", "'") for x in strings_displayed]
self.check_for_strings(strings_displayed, strings_not_displayed)
[docs] def display_manage_repository_page(self, repository, changeset_revision=None, strings_displayed=None, strings_not_displayed=None):
params = {
'id': self.security.encode_id(repository.id)
}
if changeset_revision:
params['changeset_revision'] = changeset_revision
self.visit_url('/repository/manage_repository', params=params)
self.check_for_strings(strings_displayed, strings_not_displayed)
[docs] def display_repository_clone_page(self, owner_name, repository_name, strings_displayed=None, strings_not_displayed=None):
url = f'/repos/{owner_name}/{repository_name}'
self.visit_url(url)
self.check_for_strings(strings_displayed, strings_not_displayed)
[docs] def display_repository_file_contents(self, repository, filename, filepath=None, strings_displayed=None, strings_not_displayed=None):
'''Find a file in the repository and display the contents.'''
basepath = self.get_repo_path(repository)
repository_file_list = []
if filepath:
relative_path = os.path.join(basepath, filepath)
else:
relative_path = basepath
repository_file_list = self.get_repository_file_list(repository=repository, base_path=relative_path, current_path=None)
assert filename in repository_file_list, f'File {filename} not found in the repository under {relative_path}.'
params = dict(file_path=os.path.join(relative_path, filename), repository_id=self.security.encode_id(repository.id))
url = '/repository/get_file_contents'
self.visit_url(url, params=params)
self.check_for_strings(strings_displayed, strings_not_displayed)
[docs] def display_reviewed_repositories_owned_by_user(self, strings_displayed=None, strings_not_displayed=None):
url = '/repository_review/reviewed_repositories_i_own'
self.visit_url(url)
self.check_for_strings(strings_displayed, strings_not_displayed)
[docs] def display_repository_reviews_by_user(self, user, strings_displayed=None, strings_not_displayed=None):
params = {
'id': self.security.encode_id(user.id)
}
self.visit_url('/repository_review/repository_reviews_by_user', params=params)
self.check_for_strings(strings_displayed, strings_not_displayed)
[docs] def edit_repository_categories(self, repository, categories_to_add=None, categories_to_remove=None, restore_original=True):
categories_to_add = categories_to_add or []
categories_to_remove = categories_to_remove or []
params = {
'id': self.security.encode_id(repository.id)
}
self.visit_url('/repository/manage_repository', params=params)
strings_displayed = []
strings_not_displayed = []
for category in categories_to_add:
tc.fv("2", "category_id", f'+{category}')
strings_displayed.append(f"selected>{category}")
for category in categories_to_remove:
tc.fv("2", "category_id", f'-{category}')
strings_not_displayed.append(f"selected>{category}")
tc.submit("manage_categories_button")
self.check_for_strings(strings_displayed, strings_not_displayed)
if restore_original:
strings_displayed = []
strings_not_displayed = []
for category in categories_to_remove:
tc.fv("2", "category_id", f'+{category}')
strings_displayed.append(f"selected>{category}")
for category in categories_to_add:
tc.fv("2", "category_id", f'-{category}')
strings_not_displayed.append(f"selected>{category}")
tc.submit("manage_categories_button")
self.check_for_strings(strings_displayed, strings_not_displayed)
[docs] def edit_repository_information(self, repository, revert=True, **kwd):
params = {
'id': self.security.encode_id(repository.id)
}
self.visit_url('/repository/manage_repository', params=params)
original_information = dict(repo_name=repository.name, description=repository.description, long_description=repository.long_description)
strings_displayed = []
for input_elem_name in ['repo_name', 'description', 'long_description', 'repository_type']:
if input_elem_name in kwd:
tc.fv("edit_repository", input_elem_name, kwd[input_elem_name])
strings_displayed.append(self.escape_html(kwd[input_elem_name]))
tc.submit("edit_repository_button")
self.check_for_strings(strings_displayed)
if revert:
strings_displayed = []
for input_elem_name in ['repo_name', 'description', 'long_description']:
tc.fv("edit_repository", input_elem_name, original_information[input_elem_name])
strings_displayed.append(self.escape_html(original_information[input_elem_name]))
tc.submit("edit_repository_button")
self.check_for_strings(strings_displayed)
[docs] def enable_email_alerts(self, repository, strings_displayed=None, strings_not_displayed=None):
repository_id = self.security.encode_id(repository.id)
params = dict(operation='Receive email alerts', id=repository_id)
self.visit_url('/repository/browse_repositories', params)
self.check_for_strings(strings_displayed)
[docs] def escape_html(self, string, unescape=False):
html_entities = [('&', 'X'), ("'", '''), ('"', '"')]
for character, replacement in html_entities:
if unescape:
string = string.replace(replacement, character)
else:
string = string.replace(character, replacement)
return string
[docs] def expect_repo_created_strings(self, name):
return [
f'Repository <b>{name}</b>',
f'Repository <b>{name}</b> has been created',
]
[docs] def fetch_repository_metadata(self, repository, strings_displayed=None, strings_not_displayed=None):
url = f'/api/repositories/{self.security.encode_id(repository.id)}/metadata'
self.visit_url(url)
self.check_for_strings(strings_displayed, strings_not_displayed)
[docs] def fill_review_form(self, review_contents_dict, strings_displayed=None, strings_not_displayed=None):
kwd = dict()
changed = False
for label, contents in review_contents_dict.items():
if contents:
changed = True
kwd[f'{label}__ESEP__comment'] = contents['comment']
kwd[f'{label}__ESEP__rating'] = contents['rating']
if 'private' in contents:
kwd[f'{label}__ESEP__private'] = contents['private']
kwd[f'{label}__ESEP__approved'] = contents['approved']
else:
kwd[f'{label}__ESEP__approved'] = 'not_applicable'
self.check_for_strings(strings_displayed, strings_not_displayed)
self.submit_form(button='Workflows__ESEP__review_button', **kwd)
if changed:
strings_displayed.append('Reviews were saved')
self.check_for_strings(strings_displayed, strings_not_displayed)
[docs] def galaxy_token(self):
self.visit_galaxy_url("/")
html = self.last_page()
token_def_index = html.find("session_csrf_token")
token_sep_index = html.find(":", token_def_index)
token_quote_start_index = html.find('"', token_sep_index)
token_quote_end_index = html.find('"', token_quote_start_index + 1)
token = html[(token_quote_start_index + 1):token_quote_end_index]
return token
[docs] def galaxy_login(self, email='test@bx.psu.edu', password='testuser', username='admin-user', redirect='', logout_first=True):
if logout_first:
self.galaxy_logout()
self.create_user_in_galaxy(email=email, password=password, username=username, redirect=redirect)
params = {
"login": email,
"password": password,
"session_csrf_token": self.galaxy_token()
}
self.visit_galaxy_url('/user/login', params=params)
[docs] def galaxy_logout(self):
self.visit_galaxy_url("/user/logout", params=dict(session_csrf_token=self.galaxy_token()))
[docs] def generate_complex_dependency_xml(self, filename, filepath, repository_tuples, package, version):
file_path = os.path.join(filepath, filename)
dependency_entries = []
template = string.Template(common.new_repository_dependencies_line)
for toolshed_url, name, owner, changeset_revision in repository_tuples:
dependency_entries.append(template.safe_substitute(toolshed_url=toolshed_url,
owner=owner,
repository_name=name,
changeset_revision=changeset_revision,
prior_installation_required=''))
if not os.path.exists(filepath):
os.makedirs(filepath)
dependency_template = string.Template(common.complex_repository_dependency_template)
repository_dependency_xml = dependency_template.safe_substitute(package=package, version=version, dependency_lines='\n'.join(dependency_entries))
# Save the generated xml to the specified location.
open(file_path, 'w').write(repository_dependency_xml)
[docs] def generate_simple_dependency_xml(self,
repository_tuples,
filename,
filepath,
dependency_description='',
complex=False,
package=None,
version=None,
prior_installation_required=False):
if not os.path.exists(filepath):
os.makedirs(filepath)
dependency_entries = []
if prior_installation_required:
prior_installation_value = ' prior_installation_required="True"'
else:
prior_installation_value = ''
for toolshed_url, name, owner, changeset_revision in repository_tuples:
template = string.Template(common.new_repository_dependencies_line)
dependency_entries.append(template.safe_substitute(toolshed_url=toolshed_url,
owner=owner,
repository_name=name,
changeset_revision=changeset_revision,
prior_installation_required=prior_installation_value))
if dependency_description:
description = f' description="{dependency_description}"'
else:
description = dependency_description
template_parser = string.Template(common.new_repository_dependencies_xml)
repository_dependency_xml = template_parser.safe_substitute(description=description, dependency_lines='\n'.join(dependency_entries))
# Save the generated xml to the specified location.
full_path = os.path.join(filepath, filename)
open(full_path, 'w').write(repository_dependency_xml)
[docs] def generate_temp_path(self, test_script_path, additional_paths=None):
additional_paths = additional_paths or []
temp_path = os.path.join(self.tool_shed_test_tmp_dir, test_script_path, os.sep.join(additional_paths))
if not os.path.exists(temp_path):
os.makedirs(temp_path)
return temp_path
[docs] def get_datatypes_count(self):
params = {
'upload_only': False
}
self.visit_galaxy_url('/api/datatypes', params=params)
html = self.last_page()
datatypes = loads(html)
return len(datatypes)
[docs] def get_env_sh_path(self, tool_dependency_name, tool_dependency_version, repository):
'''Return the absolute path to an installed repository's env.sh file.'''
env_sh_path = os.path.join(self.get_tool_dependency_path(tool_dependency_name, tool_dependency_version, repository),
'env.sh')
return env_sh_path
[docs] def get_filename(self, filename, filepath=None):
if filepath is not None:
return os.path.abspath(os.path.join(filepath, filename))
else:
return os.path.abspath(os.path.join(self.file_dir, filename))
[docs] def get_last_reviewed_revision_by_user(self, user, repository):
changelog_tuples = self.get_repository_changelog_tuples(repository)
reviews = test_db_util.get_reviews_ordered_by_changeset_revision(repository.id, changelog_tuples, reviewer_user_id=user.id)
if reviews:
last_review = reviews[-1]
else:
last_review = None
return last_review
[docs] def get_repositories_category_api(self, categories, strings_displayed=None, strings_not_displayed=None):
for category in categories:
url = f'/api/categories/{self.security.encode_id(category.id)}/repositories'
self.visit_url(url)
self.check_for_strings(strings_displayed, strings_not_displayed)
[docs] def get_tool_dependency_path(self, tool_dependency_name, tool_dependency_version, repository):
'''Return the absolute path for an installed tool dependency.'''
return os.path.join(self.galaxy_tool_dependency_dir,
tool_dependency_name,
tool_dependency_version,
repository.owner,
repository.name,
repository.installed_changeset_revision)
[docs] def get_or_create_repository(self, owner=None, strings_displayed=None, strings_not_displayed=None, **kwd):
# If not checking for a specific string, it should be safe to assume that
# we expect repository creation to be successful.
if strings_displayed is None:
strings_displayed = ['Repository', kwd['name'], 'has been created']
if strings_not_displayed is None:
strings_not_displayed = []
repository = test_db_util.get_repository_by_name_and_owner(kwd['name'], owner)
if repository is None:
self.visit_url('/repository/create_repository')
self.submit_form(button='create_repository_button', **kwd)
self.check_for_strings(strings_displayed, strings_not_displayed)
repository = test_db_util.get_repository_by_name_and_owner(kwd['name'], owner)
return repository
[docs] def get_repo_path(self, repository):
# An entry in the hgweb.config file looks something like: repos/test/mira_assembler = database/community_files/000/repo_123
lhs = f"repos/{repository.user.username}/{repository.name}"
try:
return self.hgweb_config_manager.get_entry(lhs)
except Exception:
raise Exception(f"Entry for repository {lhs} missing in hgweb config file {self.hgweb_config_manager.hgweb_config}.")
[docs] def get_repository_changelog_tuples(self, repository):
repo = self.get_hg_repo(self.get_repo_path(repository))
changelog_tuples = []
for changeset in repo.changelog:
ctx = repo[changeset]
changelog_tuples.append((ctx.rev(), ctx))
return changelog_tuples
[docs] def get_repository_datatypes_count(self, repository):
metadata = self.get_repository_metadata(repository)[0].metadata
if 'datatypes' not in metadata:
return 0
else:
return len(metadata['datatypes'])
[docs] def get_repository_file_list(self, repository, base_path, current_path=None):
'''Recursively load repository folder contents and append them to a list. Similar to os.walk but via /repository/open_folder.'''
if current_path is None:
request_param_path = base_path
else:
request_param_path = os.path.join(base_path, current_path)
# Get the current folder's contents.
params = dict(folder_path=request_param_path, repository_id=self.security.encode_id(repository.id))
url = '/repository/open_folder'
self.visit_url(url, params=params)
file_list = loads(self.last_page())
returned_file_list = []
if current_path is not None:
returned_file_list.append(current_path)
# Loop through the json dict returned by /repository/open_folder.
for file_dict in file_list:
if file_dict['isFolder']:
# This is a folder. Get the contents of the folder and append it to the list,
# prefixed with the path relative to the repository root, if any.
if current_path is None:
returned_file_list.extend(self.get_repository_file_list(repository=repository, base_path=base_path, current_path=file_dict['title']))
else:
sub_path = os.path.join(current_path, file_dict['title'])
returned_file_list.extend(self.get_repository_file_list(repository=repository, base_path=base_path, current_path=sub_path))
else:
# This is a regular file, prefix the filename with the current path and append it to the list.
if current_path is not None:
returned_file_list.append(os.path.join(current_path, file_dict['title']))
else:
returned_file_list.append(file_dict['title'])
return returned_file_list
[docs] def get_repository_metadata(self, repository):
return [metadata_revision for metadata_revision in repository.metadata_revisions]
[docs] def get_repository_metadata_by_changeset_revision(self, repository, changeset_revision):
return test_db_util.get_repository_metadata_for_changeset_revision(repository.id, changeset_revision)
[docs] def get_repository_metadata_revisions(self, repository):
return [str(repository_metadata.changeset_revision) for repository_metadata in repository.metadata_revisions]
[docs] def get_repository_tip(self, repository):
repo = self.get_hg_repo(self.get_repo_path(repository))
return str(repo[repo.changelog.tip()])
[docs] def get_sniffers_count(self):
url = '/api/datatypes/sniffers'
self.visit_galaxy_url(url)
html = self.last_page()
sniffers = loads(html)
return len(sniffers)
[docs] def get_tools_from_repository_metadata(self, repository, include_invalid=False):
'''Get a list of valid and (optionally) invalid tool dicts from the repository metadata.'''
valid_tools = []
invalid_tools = []
for repository_metadata in repository.metadata_revisions:
if 'tools' in repository_metadata.metadata:
valid_tools.append(dict(tools=repository_metadata.metadata['tools'], changeset_revision=repository_metadata.changeset_revision))
if include_invalid and 'invalid_tools' in repository_metadata.metadata:
invalid_tools.append(dict(tools=repository_metadata.metadata['invalid_tools'], changeset_revision=repository_metadata.changeset_revision))
return valid_tools, invalid_tools
[docs] def get_tool_panel_section_from_api(self, metadata):
tool_metadata = metadata['tools']
tool_guid = quote_plus(tool_metadata[0]['guid'], safe='')
api_url = f'/api/tools/{tool_guid}'
self.visit_galaxy_url(api_url)
tool_dict = loads(self.last_page())
tool_panel_section = tool_dict['panel_section_name']
return tool_panel_section
[docs] def get_tool_panel_section_from_repository_metadata(self, metadata):
tool_metadata = metadata['tools']
tool_guid = tool_metadata[0]['guid']
assert 'tool_panel_section' in metadata, f'Tool panel section not found in metadata: {metadata}'
tool_panel_section_metadata = metadata['tool_panel_section']
# tool_section_dict = dict( tool_config=guids_and_configs[ guid ],
# id=section_id,
# name=section_name,
# version=section_version )
# This dict is appended to tool_panel_section_metadata[ tool_guid ]
tool_panel_section = tool_panel_section_metadata[tool_guid][0]['name']
return tool_panel_section
[docs] def grant_role_to_user(self, user, role):
strings_displayed = [self.security.encode_id(role.id), role.name]
strings_not_displayed = []
self.visit_url('/admin/roles')
self.check_for_strings(strings_displayed, strings_not_displayed)
params = dict(operation='manage users and groups', id=self.security.encode_id(role.id))
url = '/admin/roles'
self.visit_url(url, params)
strings_displayed = [common.test_user_1_email, common.test_user_2_email]
self.check_for_strings(strings_displayed, strings_not_displayed)
# As elsewhere, twill limits the possibility of submitting the form, this time due to not executing the javascript
# attached to the role selection form. Visit the action url directly with the necessary parameters.
params = dict(id=self.security.encode_id(role.id),
in_users=user.id,
operation='manage users and groups',
role_members_edit_button='Save')
url = '/admin/manage_users_and_groups_for_role'
self.visit_url(url, params)
strings_displayed = [f"Role '{role.name}' has been updated"]
self.check_for_strings(strings_displayed, strings_not_displayed)
[docs] def grant_write_access(self,
repository,
usernames=None,
strings_displayed=None,
strings_not_displayed=None,
post_submit_strings_displayed=None,
post_submit_strings_not_displayed=None):
usernames = usernames or []
self.display_manage_repository_page(repository)
self.check_for_strings(strings_displayed, strings_not_displayed)
for username in usernames:
tc.fv("user_access", "allow_push", f'+{username}')
tc.submit('user_access_button')
self.check_for_strings(post_submit_strings_displayed, post_submit_strings_not_displayed)
[docs] def initiate_installation_process(self,
install_tool_dependencies=False,
install_repository_dependencies=True,
no_changes=True,
new_tool_panel_section_label=None):
html = self.last_page()
# Since the installation process is by necessity asynchronous, we have to get the parameters to 'manually' initiate the
# installation process. This regex will return the tool shed repository IDs in group(1), the encoded_kwd parameter in
# group(2), and the reinstalling flag in group(3) and pass them to the manage_repositories method in the Galaxy
# admin_toolshed controller.
install_parameters = re.search(r'initiate_repository_installation\( "([^"]+)", "([^"]+)", "([^"]+)" \);', html)
if install_parameters:
iri_ids = install_parameters.group(1)
# In some cases, the returned iri_ids are of the form: "[u'<encoded id>', u'<encoded id>']"
# This regex ensures that non-hex characters are stripped out of the list, so that galaxy.util.listify/decode_id
# will handle them correctly. It's safe to pass the cleaned list to manage_repositories, because it can parse
# comma-separated values.
repository_ids = str(iri_ids)
repository_ids = re.sub('[^a-fA-F0-9,]+', '', repository_ids)
encoded_kwd = install_parameters.group(2)
reinstalling = install_parameters.group(3)
params = {
'tool_shed_repository_ids': ','.join(galaxy.util.listify(repository_ids)),
'encoded_kwd': encoded_kwd,
'reinstalling': reinstalling
}
self.visit_galaxy_url('/admin_toolshed/install_repositories', params=params)
return galaxy.util.listify(repository_ids)
[docs] def install_repository(self, name, owner, category_name, install_resolver_dependencies=False, install_tool_dependencies=False,
install_repository_dependencies=True, changeset_revision=None,
strings_displayed=None, strings_not_displayed=None, preview_strings_displayed=None,
post_submit_strings_displayed=None, new_tool_panel_section_label=None, includes_tools_for_display_in_tool_panel=True,
**kwd):
self.browse_tool_shed(url=self.url)
self.browse_category(test_db_util.get_category_by_name(category_name))
self.preview_repository_in_tool_shed(name, owner, strings_displayed=preview_strings_displayed)
repository = test_db_util.get_repository_by_name_and_owner(name, owner)
repository_id = self.security.encode_id(repository.id)
if changeset_revision is None:
changeset_revision = self.get_repository_tip(repository)
params = {
'changeset_revisions': changeset_revision,
'repository_ids': repository_id,
'galaxy_url': self.galaxy_url
}
self.visit_url('/repository/install_repositories_by_revision', params=params)
self.check_for_strings(strings_displayed, strings_not_displayed)
form = tc.browser.form('select_tool_panel_section')
if form is None:
form = tc.browser.form('select_shed_tool_panel_config')
assert form is not None, 'Could not find form select_shed_tool_panel_config or select_tool_panel_section.'
kwds = {
'install_tool_dependencies': install_tool_dependencies,
'install_repository_dependencies': install_repository_dependencies,
'install_resolver_dependencies': install_resolver_dependencies,
'shed_tool_conf': self.shed_tool_conf,
}
if new_tool_panel_section_label is not None:
kwds['new_tool_panel_section_label'] = new_tool_panel_section_label
submit_button = [inp.name for inp in form.inputs if getattr(inp, 'type', None) == 'submit']
if len(submit_button) == 0:
# TODO: refactor, use regular TS install API
submit_kwargs = {inp.name: inp.value for inp in tc.browser.forms[0].inputs if getattr(inp, 'type', None) == 'submit'}
payload = {_: form.inputs[_].value for _ in form.fields.keys()}
payload.update(kwds)
payload.update(submit_kwargs)
r = tc.browser._session.post(
self.galaxy_url + form.action,
data=payload,
)
tc.browser.result = ResultWrapper(r)
else:
assert len(submit_button) == 1, f"Expected to find a single submit button, found {len(submit_button)} ({','.join(submit_button)})"
submit_button = submit_button[0]
self.submit_form(form=form, button=submit_button, **kwds)
self.check_for_strings(post_submit_strings_displayed, strings_not_displayed)
repository_ids = self.initiate_installation_process(new_tool_panel_section_label=new_tool_panel_section_label)
log.debug(f'Waiting for the installation of repository IDs: {str(repository_ids)}')
self.wait_for_repository_installation(repository_ids)
[docs] def load_citable_url(self,
username,
repository_name,
changeset_revision,
encoded_user_id,
encoded_repository_id,
strings_displayed=None,
strings_not_displayed=None,
strings_displayed_in_iframe=None,
strings_not_displayed_in_iframe=None):
strings_displayed_in_iframe = strings_displayed_in_iframe or []
strings_not_displayed_in_iframe = strings_not_displayed_in_iframe or []
url = f'{self.url}/view/{username}'
# If repository name is passed in, append that to the url.
if repository_name:
url += f'/{repository_name}'
if changeset_revision:
# Changeset revision should never be provided unless repository name also is.
assert repository_name is not None, 'Changeset revision is present, but repository name is not - aborting.'
url += f'/{changeset_revision}'
self.visit_url(url)
self.check_for_strings(strings_displayed, strings_not_displayed)
# Now load the page that should be displayed inside the iframe and check for strings.
if encoded_repository_id:
params = {
'id': encoded_repository_id,
'operation': 'view_or_manage_repository'
}
if changeset_revision:
params['changeset_revision'] = changeset_revision
self.visit_url('/repository/view_repository', params=params)
self.check_for_strings(strings_displayed_in_iframe, strings_not_displayed_in_iframe)
elif encoded_user_id:
params = {
'user_id': encoded_user_id,
'operation': 'repositories_by_user'
}
self.visit_url('/repository/browse_repositories', params=params)
self.check_for_strings(strings_displayed_in_iframe, strings_not_displayed_in_iframe)
[docs] def load_changeset_in_tool_shed(self, repository_id, changeset_revision, strings_displayed=None, strings_not_displayed=None):
params = {
'ctx_str': changeset_revision,
'id': repository_id
}
self.visit_url('/repository/view_changeset', params=params)
self.check_for_strings(strings_displayed, strings_not_displayed)
[docs] def load_checkable_revisions(self, strings_displayed=None, strings_not_displayed=None):
params = {
'do_not_test': 'false',
'downloadable': 'true',
'includes_tools': 'true',
'malicious': 'false',
'missing_test_components': 'false',
'skip_tool_test': 'false'
}
self.visit_url('/api/repository_revisions', params=params)
self.check_for_strings(strings_displayed, strings_not_displayed)
[docs] def load_display_tool_page(self, repository, tool_xml_path, changeset_revision, strings_displayed=None, strings_not_displayed=None):
params = {
'repository_id': self.security.encode_id(repository.id),
'tool_config': tool_xml_path,
'changeset_revision': changeset_revision
}
self.visit_url('/repository/display_tool', params=params)
self.check_for_strings(strings_displayed, strings_not_displayed)
[docs] def load_invalid_tool_page(self, repository, tool_xml, changeset_revision, strings_displayed=None, strings_not_displayed=None):
params = {
'repository_id': self.security.encode_id(repository.id),
'tool_config': tool_xml,
'changeset_revision': changeset_revision
}
self.visit_url('/repository/load_invalid_tool', params=params)
self.check_for_strings(strings_displayed, strings_not_displayed)
[docs] def load_page_for_installed_tool(self, tool_guid, strings_displayed=None, strings_not_displayed=None):
params = {
'tool_id': tool_guid
}
self.visit_galaxy_url('/tool_runner', params=params)
self.check_for_strings(strings_displayed, strings_not_displayed)
[docs] def manage_review_components(self, strings_displayed=None, strings_not_displayed=None):
url = '/repository_review/manage_components'
self.visit_url(url)
self.check_for_strings(strings_displayed, strings_not_displayed)
[docs] def preview_repository_in_tool_shed(self, name, owner, changeset_revision=None, strings_displayed=None, strings_not_displayed=None):
repository = test_db_util.get_repository_by_name_and_owner(name, owner)
if not changeset_revision:
changeset_revision = self.get_repository_tip(repository)
params = {
'repository_id': self.security.encode_id(repository.id),
'changeset_revision': changeset_revision
}
self.visit_url('/repository/preview_tools_in_changeset', params=params)
self.check_for_strings(strings_displayed, strings_not_displayed)
[docs] def reactivate_repository(self, installed_repository):
params = dict(id=self.security.encode_id(installed_repository.id))
url = '/admin_toolshed/restore_repository'
self.visit_galaxy_url(url, params=params)
[docs] def reinstall_repository(self,
installed_repository,
install_repository_dependencies=True,
install_tool_dependencies=False,
no_changes=True,
new_tool_panel_section_label='',
strings_displayed=None,
strings_not_displayed=None):
params = {
'id': self.security.encode_id(installed_repository.id)
}
self.visit_galaxy_url('/admin_toolshed/reselect_tool_panel_section', params=params)
self.check_for_strings(strings_displayed, strings_not_displayed=None)
# Build the url that will simulate a filled-out form being submitted. Due to a limitation in twill, the reselect_tool_panel_section
# form doesn't get parsed correctly.
encoded_repository_id = self.security.encode_id(installed_repository.id)
params = dict(id=encoded_repository_id, no_changes=no_changes, new_tool_panel_section_label=new_tool_panel_section_label)
doseq = False
if install_repository_dependencies:
params['install_repository_dependencies'] = True
doseq = True
else:
params['install_repository_dependencies'] = False
if install_tool_dependencies:
params['install_tool_dependencies'] = True
doseq = True
else:
params['install_tool_dependencies'] = False
url = '/admin_toolshed/reinstall_repository'
self.visit_galaxy_url(url, params=params, doseq=doseq)
# Manually initiate the install process, as with installing a repository. See comments in the
# initiate_installation_process method for details.
repository_ids = self.initiate_installation_process(install_tool_dependencies,
install_repository_dependencies,
no_changes,
new_tool_panel_section_label)
# Finally, wait until all repositories are in a final state (either Error or Installed) before returning.
self.wait_for_repository_installation(repository_ids)
[docs] def repository_is_new(self, repository):
repo = self.get_hg_repo(self.get_repo_path(repository))
tip_ctx = repo[repo.changelog.tip()]
return tip_ctx.rev() < 0
[docs] def reset_installed_repository_metadata(self, repository):
encoded_id = self.security.encode_id(repository.id)
api_key = get_admin_api_key()
response = requests.post(f"{self.galaxy_url}/api/tool_shed_repositories/reset_metadata_on_selected_installed_repositories", data={'repository_ids': [encoded_id], 'key': api_key}, timeout=DEFAULT_SOCKET_TIMEOUT)
assert response.status_code != 403, response.content
[docs] def reset_metadata_on_selected_repositories(self, repository_ids):
self.visit_url('/admin/reset_metadata_on_selected_repositories_in_tool_shed')
kwd = dict(repository_ids=repository_ids)
self.submit_form(button="reset_metadata_on_selected_repositories_button", **kwd)
[docs] def reset_metadata_on_selected_installed_repositories(self, repository_ids):
api_key = get_admin_api_key()
response = requests.post(f"{self.galaxy_url}/api/tool_shed_repositories/reset_metadata_on_selected_installed_repositories", data={'repository_ids': repository_ids, 'key': api_key}, timeout=DEFAULT_SOCKET_TIMEOUT)
assert response.status_code != 403, response.content
[docs] def reset_repository_metadata(self, repository):
params = {
'id': self.security.encode_id(repository.id)
}
self.visit_url('/repository/reset_all_metadata', params=params)
self.check_for_strings(['All repository metadata has been reset.'])
[docs] def review_repository(self, repository, review_contents_dict, user=None, changeset_revision=None):
strings_displayed = []
strings_not_displayed = []
if not changeset_revision:
changeset_revision = self.get_repository_tip(repository)
if user:
review = test_db_util.get_repository_review_by_user_id_changeset_revision(user.id, repository.id, changeset_revision)
params = {
'id': self.security.encode_id(review.id)
}
self.visit_url('/repository_review/edit_review', params=params)
self.fill_review_form(review_contents_dict, strings_displayed, strings_not_displayed)
[docs] def revoke_write_access(self, repository, username):
params = {
'user_access_button': 'Remove',
'id': self.security.encode_id(repository.id),
'remove_auth': username
}
self.visit_url('/repository/manage_repository', params=params)
[docs] def search_for_valid_tools(self, search_fields=None, exact_matches=False, strings_displayed=None, strings_not_displayed=None, from_galaxy=False):
params = {}
search_fields = search_fields or {}
if from_galaxy:
params['galaxy_url'] = self.galaxy_url
for field_name, search_string in search_fields.items():
self.visit_url('/repository/find_tools', params=params)
tc.fv("1", "exact_matches", exact_matches)
tc.fv("1", field_name, search_string)
tc.submit()
self.check_for_strings(strings_displayed, strings_not_displayed)
[docs] def send_message_to_repository_owner(self,
repository,
message,
strings_displayed=None,
strings_not_displayed=None,
post_submit_strings_displayed=None,
post_submit_strings_not_displayed=None):
params = {
'id': self.security.encode_id(repository.id)
}
self.visit_url('/repository/contact_owner', params=params)
self.check_for_strings(strings_displayed, strings_not_displayed)
tc.fv(1, 'message', message)
tc.submit()
self.check_for_strings(post_submit_strings_displayed, post_submit_strings_not_displayed)
[docs] def set_form_value(self, form, kwd, field_name, field_value):
'''
Set the form field field_name to field_value if it exists, and return the provided dict containing that value. If
the field does not exist in the provided form, return a dict without that index.
'''
form_id = form.attrib.get('id')
controls = [control for control in form.inputs if str(control.name) == field_name]
if len(controls) > 0:
log.debug(f'Setting field {field_name} of form {form_id} to {str(field_value)}.')
tc.formvalue(form_id, field_name, str(field_value))
kwd[field_name] = str(field_value)
else:
if field_name in kwd:
log.debug('No field %s in form %s, discarding from return value.', field_name, form_id)
del(kwd[field_name])
return kwd
[docs] def set_repository_deprecated(self, repository, set_deprecated=True, strings_displayed=None, strings_not_displayed=None):
params = {
'id': self.security.encode_id(repository.id),
'mark_deprecated': set_deprecated
}
self.visit_url('/repository/deprecate', params=params)
self.check_for_strings(strings_displayed, strings_not_displayed)
[docs] def set_repository_malicious(self, repository, set_malicious=True, strings_displayed=None, strings_not_displayed=None):
self.display_manage_repository_page(repository)
tc.fv("malicious", "malicious", set_malicious)
tc.submit("malicious_button")
self.check_for_strings(strings_displayed, strings_not_displayed)
[docs] def tip_has_metadata(self, repository):
tip = self.get_repository_tip(repository)
return test_db_util.get_repository_metadata_by_repository_id_changeset_revision(repository.id, tip)
[docs] def undelete_repository(self, repository):
params = {
'operation': 'Undelete',
'id': self.security.encode_id(repository.id)
}
self.visit_url('/admin/browse_repositories', params=params)
strings_displayed = ['Undeleted 1 repository', repository.name]
strings_not_displayed = []
self.check_for_strings(strings_displayed, strings_not_displayed)
[docs] def uninstall_repository(self, installed_repository, strings_displayed=None, strings_not_displayed=None):
encoded_id = self.security.encode_id(installed_repository.id)
api_key = get_admin_api_key()
response = requests.delete(f"{self.galaxy_url}/api/tool_shed_repositories/{encoded_id}", data={'remove_from_disk': True, 'key': api_key}, timeout=DEFAULT_SOCKET_TIMEOUT)
assert response.status_code != 403, response.content
[docs] def update_installed_repository(self, installed_repository, strings_displayed=None, strings_not_displayed=None):
params = {
'name': installed_repository.name,
'owner': installed_repository.owner,
'changeset_revision': installed_repository.installed_changeset_revision,
'galaxy_url': self.galaxy_url
}
self.visit_url('/repository/check_for_updates', params=params)
self.check_for_strings(strings_displayed, strings_not_displayed)
[docs] def update_tool_shed_status(self):
api_key = get_admin_api_key()
response = requests.get(f"{self.galaxy_url}/api/tool_shed_repositories/check_for_updates?key={api_key}", timeout=DEFAULT_SOCKET_TIMEOUT)
assert response.status_code != 403, response.content
[docs] def upload_file(self,
repository,
filename,
filepath,
valid_tools_only,
uncompress_file,
remove_repo_files_not_in_tar,
commit_message,
strings_displayed=None,
strings_not_displayed=None):
if strings_displayed is None:
strings_displayed = []
if strings_not_displayed is None:
strings_not_displayed = []
removed_message = 'files were removed from the repository'
if remove_repo_files_not_in_tar:
if not self.repository_is_new(repository):
if removed_message not in strings_displayed:
strings_displayed.append(removed_message)
else:
if removed_message not in strings_not_displayed:
strings_not_displayed.append(removed_message)
params = {
'repository_id': self.security.encode_id(repository.id)
}
self.visit_url('/upload/upload', params=params)
if valid_tools_only:
strings_displayed.extend(['has been successfully', 'uploaded to the repository.'])
tc.formfile("1", "file_data", self.get_filename(filename, filepath))
if uncompress_file:
tc.fv(1, 'uncompress_file', 'Yes')
else:
tc.fv(1, 'uncompress_file', 'No')
if not self.repository_is_new(repository):
if remove_repo_files_not_in_tar:
tc.fv(1, 'remove_repo_files_not_in_tar', 'Yes')
else:
tc.fv(1, 'remove_repo_files_not_in_tar', 'No')
tc.fv(1, 'commit_message', commit_message)
tc.submit("upload_button")
self.check_for_strings(strings_displayed, strings_not_displayed)
# Uncomment this if it becomes necessary to wait for an asynchronous process to complete after submitting an upload.
# for i in range( 5 ):
# try:
# self.check_for_strings( strings_displayed, strings_not_displayed )
# break
# except Exception as e:
# if i == 4:
# raise e
# else:
# time.sleep( 1 )
# continue
[docs] def upload_url(self,
repository,
url,
filepath,
valid_tools_only,
uncompress_file,
remove_repo_files_not_in_tar,
commit_message,
strings_displayed=None,
strings_not_displayed=None):
removed_message = 'files were removed from the repository'
if remove_repo_files_not_in_tar:
if not self.repository_is_new(repository):
if removed_message not in strings_displayed:
strings_displayed.append(removed_message)
else:
if removed_message not in strings_not_displayed:
strings_not_displayed.append(removed_message)
params = {
'repository_id': self.security.encode_id(repository.id)
}
self.visit_url('/upload/upload', params=params)
if valid_tools_only:
strings_displayed.extend(['has been successfully', 'uploaded to the repository.'])
tc.fv("1", "url", url)
if uncompress_file:
tc.fv(1, 'uncompress_file', 'Yes')
else:
tc.fv(1, 'uncompress_file', 'No')
if not self.repository_is_new(repository):
if remove_repo_files_not_in_tar:
tc.fv(1, 'remove_repo_files_not_in_tar', 'Yes')
else:
tc.fv(1, 'remove_repo_files_not_in_tar', 'No')
tc.fv(1, 'commit_message', commit_message)
tc.submit("upload_button")
self.check_for_strings(strings_displayed, strings_not_displayed)
[docs] def verify_installed_repositories(self, installed_repositories=None, uninstalled_repositories=None):
installed_repositories = installed_repositories or []
uninstalled_repositories = uninstalled_repositories or []
for repository_name, repository_owner in installed_repositories:
galaxy_repository = test_db_util.get_installed_repository_by_name_owner(repository_name, repository_owner)
if galaxy_repository:
assert galaxy_repository.status == 'Installed', \
f'Repository {repository_name} should be installed, but is {galaxy_repository.status}'
[docs] def verify_installed_repository_metadata_unchanged(self, name, owner):
installed_repository = test_db_util.get_installed_repository_by_name_owner(name, owner)
metadata = installed_repository.metadata_
self.reset_installed_repository_metadata(installed_repository)
new_metadata = installed_repository.metadata_
assert metadata == new_metadata, f'Metadata for installed repository {name} differs after metadata reset.'
[docs] def verify_installed_repository_no_tool_panel_section(self, repository):
'''Verify that there is no 'tool_panel_section' entry in the repository metadata.'''
metadata = repository.metadata_
assert 'tool_panel_section' not in metadata, f'Tool panel section incorrectly found in metadata: {metadata}'
[docs] def verify_installed_repository_data_table_entries(self, required_data_table_entries):
# The value of the received required_data_table_entries will be something like: [ 'sam_fa_indexes' ]
data_tables, error_message = xml_util.parse_xml(self.shed_tool_data_table_conf)
found = False
# With the tool shed, the "path" attribute that is hard-coded into the tool_data_tble_conf.xml
# file is ignored. This is because the tool shed requires the directory location to which this
# path points to be empty except when a specific tool is loaded. The default location for this
# directory configured for the tool shed is <Galaxy root>/shed-tool-data. When a tool is loaded
# in the tool shed, all contained .loc.sample files are copied to this directory and the
# ToolDataTableManager parses and loads the files in the same way that Galaxy does with a very
# important exception. When the tool shed loads a tool and parses and loads the copied ,loc.sample
# files, the ToolDataTableManager is already instantiated, and so its add_new_entries_from_config_file()
# method is called and the tool_data_path parameter is used to over-ride the hard-coded "tool-data"
# directory that Galaxy always uses.
#
# Tool data table xml structure:
# <tables>
# <table comment_char="#" name="sam_fa_indexes">
# <columns>line_type, value, path</columns>
# <file path="tool-data/sam_fa_indices.loc" />
# </table>
# </tables>
required_data_table_entry = None
for table_elem in data_tables.findall('table'):
# The value of table_elem will be something like: <table comment_char="#" name="sam_fa_indexes">
for required_data_table_entry in required_data_table_entries:
# The value of required_data_table_entry will be something like: 'sam_fa_indexes'
if 'name' in table_elem.attrib and table_elem.attrib['name'] == required_data_table_entry:
found = True
# We're processing something like: sam_fa_indexes
file_elem = table_elem.find('file')
# We have something like: <file path="tool-data/sam_fa_indices.loc" />
# The "path" attribute of the "file" tag is the location that Galaxy always uses because the
# Galaxy ToolDataTableManager was implemented in such a way that the hard-coded path is used
# rather than allowing the location to be a configurable setting like the tool shed requires.
file_path = file_elem.get('path', None)
# The value of file_path will be something like: "tool-data/all_fasta.loc"
assert file_path is not None, f'The "path" attribute is missing for the {required_data_table_entry} entry.'
# The following test is probably not necesary, but the tool-data directory should exist!
galaxy_tool_data_dir, loc_file_name = os.path.split(file_path)
assert galaxy_tool_data_dir is not None, f'The hard-coded Galaxy tool-data directory is missing for the {required_data_table_entry} entry.'
assert os.path.exists(galaxy_tool_data_dir), 'The Galaxy tool-data directory does not exist.'
# Make sure the loc_file_name was correctly copied into the configured directory location.
configured_file_location = os.path.join(self.tool_data_path, loc_file_name)
assert os.path.isfile(configured_file_location), f'The expected copied file "{configured_file_location}" is missing.'
# We've found the value of the required_data_table_entry in data_tables, which is the parsed
# shed_tool_data_table_conf.xml, so all is well!
break
if found:
break
# We better have an entry like: <table comment_char="#" name="sam_fa_indexes"> in our parsed data_tables
# or we know that the repository was not correctly installed!
assert found, f'No entry for {required_data_table_entry} in {self.shed_tool_data_table_conf}.'
[docs] def verify_repository_reviews(self, repository, reviewer=None, strings_displayed=None, strings_not_displayed=None):
changeset_revision = self.get_repository_tip(repository)
# Verify that the currently logged in user has a repository review for the specified repository, reviewer, and changeset revision.
strings_displayed = [repository.name, reviewer.username]
self.display_reviewed_repositories_owned_by_user(strings_displayed=strings_displayed)
# Verify that the reviewer has reviewed the specified repository's changeset revision.
strings_displayed = [repository.name, repository.description]
self.display_repository_reviews_by_user(reviewer, strings_displayed=strings_displayed)
# Load the review and check for the components passed in strings_displayed.
review = test_db_util.get_repository_review_by_user_id_changeset_revision(reviewer.id, repository.id, changeset_revision)
self.browse_component_review(review, strings_displayed=strings_displayed)
[docs] def verify_tool_metadata_for_installed_repository(self, installed_repository, strings_displayed=None, strings_not_displayed=None):
if strings_displayed is None:
strings_displayed = []
if strings_not_displayed is None:
strings_not_displayed = []
repository_id = self.security.encode_id(installed_repository.id)
for tool in installed_repository.metadata_['tools']:
strings = list(strings_displayed)
strings.extend([tool['id'], tool['description'], tool['version'], tool['guid'], tool['name']])
params = dict(repository_id=repository_id, tool_id=tool['id'])
url = '/admin_toolshed/view_tool_metadata'
self.visit_galaxy_url(url, params)
self.check_for_strings(strings, strings_not_displayed)
[docs] def verify_unchanged_repository_metadata(self, repository):
old_metadata = dict()
new_metadata = dict()
for metadata in self.get_repository_metadata(repository):
old_metadata[metadata.changeset_revision] = metadata.metadata
self.reset_repository_metadata(repository)
for metadata in self.get_repository_metadata(repository):
new_metadata[metadata.changeset_revision] = metadata.metadata
# Python's dict comparison recursively compares sorted key => value pairs and returns true if any key or value differs,
# or if the number of keys differs.
assert old_metadata == new_metadata, f'Metadata changed after reset on repository {repository.name}.'
[docs] def visit_galaxy_url(self, url, params=None, doseq=False, allowed_codes=None):
if allowed_codes is None:
allowed_codes = [200]
url = f'{self.galaxy_url}{url}'
self.visit_url(url, params=params, doseq=doseq, allowed_codes=allowed_codes)
[docs] def wait_for_repository_installation(self, repository_ids):
final_states = [galaxy_model.ToolShedRepository.installation_status.ERROR,
galaxy_model.ToolShedRepository.installation_status.INSTALLED]
# Wait until all repositories are in a final state before returning. This ensures that subsequent tests
# are running against an installed repository, and not one that is still in the process of installing.
if repository_ids:
for repository_id in repository_ids:
galaxy_repository = test_db_util.get_installed_repository_by_id(self.security.decode_id(repository_id))
timeout_counter = 0
while galaxy_repository.status not in final_states:
test_db_util.ga_refresh(galaxy_repository)
timeout_counter = timeout_counter + 1
# This timeout currently defaults to 10 minutes.
if timeout_counter > repository_installation_timeout:
raise AssertionError('Repository installation timed out, %d seconds elapsed, repository state is %s.' %
(timeout_counter, galaxy_repository.status))
break
time.sleep(1)