Warning
This document is for an old release of Galaxy. You can alternatively view this page in the latest release if it exists or view the top of the latest release's documentation.
Source code for galaxy.util.tool_shed.common_util
import errno
import json
import logging
import os
from urllib.parse import urljoin
from routes import url_for
from galaxy import util
from galaxy.util.tool_shed import encoding_util, xml_util
log = logging.getLogger(__name__)
REPOSITORY_OWNER = 'devteam'
# not valid for installed Galaxy, fix
MIGRATE_DIR = os.path.abspath(os.path.join(
os.path.dirname(__file__), os.pardir, os.pardir, os.pardir, 'galaxy', 'tool_shed', 'galaxy_install', 'migrate'))
TOOL_MIGRATION_SCRIPTS_DIR = os.path.join(MIGRATE_DIR, 'scripts')
TOOL_MIGRATION_VERSIONS_DIR = os.path.join(MIGRATE_DIR, 'versions')
[docs]def accumulate_tool_dependencies(tool_shed_accessible, tool_dependencies, all_tool_dependencies):
if tool_shed_accessible:
if tool_dependencies:
for tool_dependency in tool_dependencies:
if tool_dependency not in all_tool_dependencies:
all_tool_dependencies.append(tool_dependency)
return all_tool_dependencies
[docs]def check_for_missing_tools(app, tool_panel_configs, latest_tool_migration_script_number):
# Get the 000x_tools.xml file associated with the current migrate_tools version number.
tools_xml_file_path = os.path.abspath(os.path.join(os.path.dirname(__file__),
os.pardir, 'galaxy_install',
'migrate', 'scripts',
'%04d_tools.xml' % latest_tool_migration_script_number))
# Parse the XML and load the file attributes for later checking against the proprietary tool_panel_config.
migrated_tool_configs_dict = {}
tree, error_message = xml_util.parse_xml(tools_xml_file_path)
if tree is None:
return False, {}
root = tree.getroot()
tool_shed = root.get('name')
tool_shed_url = get_tool_shed_url_from_tool_shed_registry(app, tool_shed)
# The default behavior is that the tool shed is down.
tool_shed_accessible = False
missing_tool_configs_dict = {}
if tool_shed_url:
for elem in root:
if elem.tag == 'repository':
repository_dependencies = []
all_tool_dependencies = []
repository_name = elem.get('name')
changeset_revision = elem.get('changeset_revision')
tool_shed_accessible, repository_dependencies_dict = get_repository_dependencies(app,
tool_shed_url,
repository_name,
REPOSITORY_OWNER,
changeset_revision)
if tool_shed_accessible:
# Accumulate all tool dependencies defined for repository dependencies for display to the user.
for rd_key, rd_tups in repository_dependencies_dict.items():
if rd_key in ['root_key', 'description']:
continue
for rd_tup in rd_tups:
tool_shed, name, owner, changeset_revision, prior_installation_required, only_if_compiling_contained_td = \
parse_repository_dependency_tuple(rd_tup)
tool_shed_accessible, tool_dependencies = get_tool_dependencies(app,
tool_shed_url,
name,
owner,
changeset_revision)
all_tool_dependencies = accumulate_tool_dependencies(tool_shed_accessible, tool_dependencies, all_tool_dependencies)
tool_shed_accessible, tool_dependencies = get_tool_dependencies(app,
tool_shed_url,
repository_name,
REPOSITORY_OWNER,
changeset_revision)
all_tool_dependencies = accumulate_tool_dependencies(tool_shed_accessible, tool_dependencies, all_tool_dependencies)
for tool_elem in elem.findall('tool'):
tool_config_file_name = tool_elem.get('file')
if tool_config_file_name:
# We currently do nothing with repository dependencies except install them (we do not display repositories that will be
# installed to the user). However, we'll store them in the following dictionary in case we choose to display them in the
# future.
dependencies_dict = dict(tool_dependencies=all_tool_dependencies,
repository_dependencies=repository_dependencies)
migrated_tool_configs_dict[tool_config_file_name] = dependencies_dict
else:
break
if tool_shed_accessible:
# Parse the proprietary tool_panel_configs (the default is tool_conf.xml) and generate the list of missing tool config file names.
for tool_panel_config in tool_panel_configs:
tree, error_message = xml_util.parse_xml(tool_panel_config)
if tree:
root = tree.getroot()
for elem in root:
if elem.tag == 'tool':
missing_tool_configs_dict = check_tool_tag_set(elem, migrated_tool_configs_dict, missing_tool_configs_dict)
elif elem.tag == 'section':
for section_elem in elem:
if section_elem.tag == 'tool':
missing_tool_configs_dict = check_tool_tag_set(section_elem, migrated_tool_configs_dict, missing_tool_configs_dict)
else:
exception_msg = f'\n\nThe entry for the main Galaxy tool shed at {tool_shed} is missing from the {app.config.tool_sheds_config} file. '
exception_msg += 'The entry for this tool shed must always be available in this file, so re-add it before attempting to start your Galaxy server.\n'
raise Exception(exception_msg)
return tool_shed_accessible, missing_tool_configs_dict
[docs]def check_tool_tag_set(elem, migrated_tool_configs_dict, missing_tool_configs_dict):
file_path = elem.get('file', None)
if file_path:
name = os.path.basename(file_path)
for migrated_tool_config in migrated_tool_configs_dict.keys():
if migrated_tool_config in [file_path, name]:
missing_tool_configs_dict[name] = migrated_tool_configs_dict[migrated_tool_config]
return missing_tool_configs_dict
[docs]def generate_clone_url_for_installed_repository(app, repository):
"""Generate the URL for cloning a repository that has been installed into a Galaxy instance."""
tool_shed_url = get_tool_shed_url_from_tool_shed_registry(app, str(repository.tool_shed))
return util.build_url(tool_shed_url, pathspec=['repos', str(repository.owner), str(repository.name)])
[docs]def generate_clone_url_for_repository_in_tool_shed(user, repository):
"""Generate the URL for cloning a repository that is in the tool shed."""
base_url = url_for('/', qualified=True).rstrip('/')
if user:
protocol, base = base_url.split('://')
username = '%s@' % user.username
return f'{protocol}://{username}{base}/repos/{repository.user.username}/{repository.name}'
else:
return f'{base_url}/repos/{repository.user.username}/{repository.name}'
[docs]def generate_clone_url_from_repo_info_tup(app, repo_info_tup):
"""Generate the URL for cloning a repository given a tuple of toolshed, name, owner, changeset_revision."""
# Example tuple: ['http://localhost:9009', 'blast_datatypes', 'test', '461a4216e8ab', False]
toolshed, name, owner, changeset_revision, prior_installation_required, only_if_compiling_contained_td = \
parse_repository_dependency_tuple(repo_info_tup)
tool_shed_url = get_tool_shed_url_from_tool_shed_registry(app, toolshed)
# Don't include the changeset_revision in clone urls.
return util.build_url(tool_shed_url, pathspec=['repos', owner, name])
[docs]def get_non_shed_tool_panel_configs(app):
"""Get the non-shed related tool panel configs - there can be more than one, and the default is tool_conf.xml."""
config_filenames = []
for config_filename in app.config.tool_configs:
# Any config file that includes a tool_path attribute in the root tag set like the following is shed-related.
# <toolbox tool_path="database/shed_tools">
try:
tree, error_message = xml_util.parse_xml(config_filename)
except OSError as exc:
if (config_filename == app.config.shed_tool_conf and not
app.config.shed_tool_conf_set and
exc.errno == errno.ENOENT):
continue
raise
if tree is None:
continue
root = tree.getroot()
tool_path = root.get('tool_path', None)
if tool_path is None:
config_filenames.append(config_filename)
return config_filenames
[docs]def get_repository_dependencies(app, tool_shed_url, repository_name, repository_owner, changeset_revision):
repository_dependencies_dict = {}
tool_shed_accessible = True
params = dict(name=repository_name, owner=repository_owner, changeset_revision=changeset_revision)
pathspec = ['repository', 'get_repository_dependencies']
try:
raw_text = util.url_get(tool_shed_url, auth=app.tool_shed_registry.url_auth(tool_shed_url), pathspec=pathspec, params=params)
tool_shed_accessible = True
except Exception as e:
tool_shed_accessible = False
log.warning("The URL\n%s\nraised the exception:\n%s\n", util.build_url(tool_shed_url, pathspec=pathspec, params=params), e)
if tool_shed_accessible:
if len(raw_text) > 2:
encoded_text = json.loads(util.unicodify(raw_text))
repository_dependencies_dict = encoding_util.tool_shed_decode(encoded_text)
return tool_shed_accessible, repository_dependencies_dict
[docs]def get_protocol_from_tool_shed_url(tool_shed_url):
"""Return the protocol from the received tool_shed_url if it exists."""
try:
if tool_shed_url.find('://') > 0:
return tool_shed_url.split('://')[0].lower()
except Exception:
# We receive a lot of calls here where the tool_shed_url is None. The container_util uses
# that value when creating a header row. If the tool_shed_url is not None, we have a problem.
if tool_shed_url is not None:
log.exception("Handled exception getting the protocol from Tool Shed URL %s", str(tool_shed_url))
# Default to HTTP protocol.
return 'http'
[docs]def get_tool_dependencies(app, tool_shed_url, repository_name, repository_owner, changeset_revision):
tool_dependencies = []
tool_shed_accessible = True
params = dict(name=repository_name, owner=repository_owner, changeset_revision=changeset_revision)
pathspec = ['repository', 'get_tool_dependencies']
try:
text = util.url_get(tool_shed_url, auth=app.tool_shed_registry.url_auth(tool_shed_url), pathspec=pathspec, params=params)
tool_shed_accessible = True
except Exception as e:
tool_shed_accessible = False
log.warning("The URL\n%s\nraised the exception:\n%s\n", util.build_url(tool_shed_url, pathspec=pathspec, params=params), e)
if tool_shed_accessible:
if text:
tool_dependencies_dict = encoding_util.tool_shed_decode(text)
for requirements_dict in tool_dependencies_dict.values():
tool_dependency_name = requirements_dict['name']
tool_dependency_version = requirements_dict['version']
tool_dependency_type = requirements_dict['type']
tool_dependencies.append((tool_dependency_name, tool_dependency_version, tool_dependency_type))
return tool_shed_accessible, tool_dependencies
[docs]def get_tool_shed_repository_ids(as_string=False, **kwd):
tsrid = kwd.get('tool_shed_repository_id', None)
tsridslist = util.listify(kwd.get('tool_shed_repository_ids', None))
if not tsridslist:
tsridslist = util.listify(kwd.get('id', None))
if tsridslist is not None:
if tsrid is not None and tsrid not in tsridslist:
tsridslist.append(tsrid)
if as_string:
return ','.join(tsridslist)
return tsridslist
else:
tsridslist = util.listify(kwd.get('ordered_tsr_ids', None))
if tsridslist is not None:
if as_string:
return ','.join(tsridslist)
return tsridslist
if as_string:
return ''
return []
[docs]def get_tool_shed_url_from_tool_shed_registry(app, tool_shed):
"""
The value of tool_shed is something like: toolshed.g2.bx.psu.edu. We need the URL to this tool shed, which is
something like: http://toolshed.g2.bx.psu.edu/
"""
cleaned_tool_shed = remove_protocol_from_tool_shed_url(tool_shed)
for shed_url in app.tool_shed_registry.tool_sheds.values():
if shed_url.find(cleaned_tool_shed) >= 0:
if shed_url.endswith('/'):
shed_url = shed_url.rstrip('/')
return shed_url
# The tool shed from which the repository was originally installed must no longer be configured in tool_sheds_conf.xml.
return None
[docs]def get_tool_shed_repository_url(app, tool_shed, owner, name):
tool_shed_url = get_tool_shed_url_from_tool_shed_registry(app, tool_shed)
if tool_shed_url:
# Append a slash to the tool shed URL, because urlparse.urljoin will eliminate
# the last part of a URL if it does not end with a forward slash.
tool_shed_url = '%s/' % tool_shed_url
return urljoin(tool_shed_url, f'view/{owner}/{name}')
return tool_shed_url
[docs]def get_user_by_username(app, username):
"""Get a user from the database by username."""
sa_session = app.model.context.current
try:
user = sa_session.query(app.model.User) \
.filter(app.model.User.table.c.username == username) \
.one()
return user
except Exception:
return None
[docs]def handle_galaxy_url(trans, **kwd):
galaxy_url = kwd.get('galaxy_url', None)
if galaxy_url:
trans.set_cookie(galaxy_url, name='toolshedgalaxyurl')
else:
galaxy_url = trans.get_cookie(name='toolshedgalaxyurl')
return galaxy_url
[docs]def handle_tool_shed_url_protocol(app, shed_url):
"""Handle secure and insecure HTTP protocol since they may change over time."""
try:
if app.name == 'galaxy':
url = remove_protocol_from_tool_shed_url(shed_url)
tool_shed_url = get_tool_shed_url_from_tool_shed_registry(app, url)
else:
tool_shed_url = str(url_for('/', qualified=True)).rstrip('/')
return tool_shed_url
except Exception:
# We receive a lot of calls here where the tool_shed_url is None. The container_util uses
# that value when creating a header row. If the tool_shed_url is not None, we have a problem.
if shed_url is not None:
log.exception("Handled exception removing protocol from URL %s", str(shed_url))
return shed_url
[docs]def parse_repository_dependency_tuple(repository_dependency_tuple, contains_error=False):
# Default both prior_installation_required and only_if_compiling_contained_td to False in cases where metadata should be reset on the
# repository containing the repository_dependency definition.
prior_installation_required = 'False'
only_if_compiling_contained_td = 'False'
if contains_error:
if len(repository_dependency_tuple) == 5:
tool_shed, name, owner, changeset_revision, error = repository_dependency_tuple
elif len(repository_dependency_tuple) == 6:
tool_shed, name, owner, changeset_revision, prior_installation_required, error = repository_dependency_tuple
elif len(repository_dependency_tuple) == 7:
tool_shed, name, owner, changeset_revision, prior_installation_required, only_if_compiling_contained_td, error = \
repository_dependency_tuple
return tool_shed, name, owner, changeset_revision, prior_installation_required, only_if_compiling_contained_td, error
else:
if len(repository_dependency_tuple) == 4:
tool_shed, name, owner, changeset_revision = repository_dependency_tuple
elif len(repository_dependency_tuple) == 5:
tool_shed, name, owner, changeset_revision, prior_installation_required = repository_dependency_tuple
elif len(repository_dependency_tuple) == 6:
tool_shed, name, owner, changeset_revision, prior_installation_required, only_if_compiling_contained_td = repository_dependency_tuple
return tool_shed, name, owner, changeset_revision, prior_installation_required, only_if_compiling_contained_td
[docs]def remove_port_from_tool_shed_url(tool_shed_url):
"""Return a partial Tool Shed URL, eliminating the port if it exists."""
try:
if tool_shed_url.find(':') > 0:
# Eliminate the port, if any, since it will result in an invalid directory name.
new_tool_shed_url = tool_shed_url.split(':')[0]
else:
new_tool_shed_url = tool_shed_url
return new_tool_shed_url.rstrip('/')
except Exception:
# We receive a lot of calls here where the tool_shed_url is None. The container_util uses
# that value when creating a header row. If the tool_shed_url is not None, we have a problem.
if tool_shed_url is not None:
log.exception("Handled exception removing the port from Tool Shed URL %s", str(tool_shed_url))
return tool_shed_url
[docs]def remove_protocol_and_port_from_tool_shed_url(tool_shed_url):
"""Return a partial Tool Shed URL, eliminating the protocol and/or port if either exists."""
tool_shed = remove_protocol_from_tool_shed_url(tool_shed_url)
tool_shed = remove_port_from_tool_shed_url(tool_shed)
return tool_shed
[docs]def remove_protocol_and_user_from_clone_url(repository_clone_url):
"""Return a URL that can be used to clone a repository, eliminating the protocol and user if either exists."""
if repository_clone_url.find('@') > 0:
# We have an url that includes an authenticated user, something like:
# http://test@bx.psu.edu:9009/repos/some_username/column
items = repository_clone_url.split('@')
tmp_url = items[1]
elif repository_clone_url.find('//') > 0:
# We have an url that includes only a protocol, something like:
# http://bx.psu.edu:9009/repos/some_username/column
items = repository_clone_url.split('//')
tmp_url = items[1]
else:
tmp_url = repository_clone_url
return tmp_url.rstrip('/')
[docs]def remove_protocol_from_tool_shed_url(tool_shed_url):
"""Return a partial Tool Shed URL, eliminating the protocol if it exists."""
return util.remove_protocol_from_url(tool_shed_url)