Warning

This document is for an old release of Galaxy. You can alternatively view this page in the latest release if it exists or view the top of the latest release's documentation.

Source code for galaxy.util.tool_shed.common_util

import errno
import json
import logging
import os
from urllib.parse import urljoin

from routes import url_for

from galaxy import util
from galaxy.util.tool_shed import encoding_util, xml_util

log = logging.getLogger(__name__)

# Owner passed to the Tool Shed when check_for_missing_tools() looks up migrated repositories.
REPOSITORY_OWNER = 'devteam'
# Absolute path of the tool migration directory, resolved relative to this module's location:
# three directory levels above this file's directory, then galaxy/tool_shed/galaxy_install/migrate.
# FIXME: this path is not valid for an installed Galaxy.
MIGRATE_DIR = os.path.abspath(os.path.join(
    os.path.dirname(__file__), os.pardir, os.pardir, os.pardir, 'galaxy', 'tool_shed', 'galaxy_install', 'migrate'))
# The 'scripts' and 'versions' sub-directories of MIGRATE_DIR.
TOOL_MIGRATION_SCRIPTS_DIR = os.path.join(MIGRATE_DIR, 'scripts')
TOOL_MIGRATION_VERSIONS_DIR = os.path.join(MIGRATE_DIR, 'versions')


def accumulate_tool_dependencies(tool_shed_accessible, tool_dependencies, all_tool_dependencies):
    """
    Append every not-yet-seen entry of tool_dependencies to all_tool_dependencies.

    Nothing is added when the tool shed is flagged as inaccessible or when tool_dependencies
    is empty / None.  The received all_tool_dependencies list is modified in place and returned.
    """
    if not (tool_shed_accessible and tool_dependencies):
        return all_tool_dependencies
    for candidate in tool_dependencies:
        already_known = candidate in all_tool_dependencies
        if not already_known:
            all_tool_dependencies.append(candidate)
    return all_tool_dependencies
def check_for_missing_tools(app, tool_panel_configs, latest_tool_migration_script_number):
    """
    Find tools listed in a tool migration script that are still referenced by the tool panel configs.

    :param app: the application; its tool shed registry and ``config.tool_sheds_config`` are used
    :param tool_panel_configs: iterable of paths to the proprietary (non-shed) tool panel config files
    :param latest_tool_migration_script_number: integer used to select the ``%04d_tools.xml`` script
    :returns: a ``(tool_shed_accessible, missing_tool_configs_dict)`` tuple.  The dictionary maps the
        base name of each tool config file found in both the migration script and a tool panel config
        to ``dict(tool_dependencies=[...], repository_dependencies=[...])``.  ``(False, {})`` is
        returned when the migration script cannot be parsed.
    :raises Exception: when the tool shed named in the migration script is not in the tool shed registry
    """
    # Get the 000x_tools.xml file associated with the current migrate_tools version number.
    # NOTE(review): this path is built differently from the module-level MIGRATE_DIR /
    # TOOL_MIGRATION_SCRIPTS_DIR constants - confirm which location is the intended one.
    tools_xml_file_path = os.path.abspath(os.path.join(os.path.dirname(__file__),
                                                       os.pardir,
                                                       'galaxy_install',
                                                       'migrate',
                                                       'scripts',
                                                       '%04d_tools.xml' % latest_tool_migration_script_number))
    # Parse the XML and load the file attributes for later checking against the proprietary tool_panel_config.
    tree, error_message = xml_util.parse_xml(tools_xml_file_path)
    if tree is None:
        return False, {}
    root = tree.getroot()
    tool_shed = root.get('name')
    tool_shed_url = get_tool_shed_url_from_tool_shed_registry(app, tool_shed)
    if not tool_shed_url:
        exception_msg = f'\n\nThe entry for the main Galaxy tool shed at {tool_shed} is missing from the {app.config.tool_sheds_config} file. '
        exception_msg += 'The entry for this tool shed must always be available in this file, so re-add it before attempting to start your Galaxy server.\n'
        raise Exception(exception_msg)
    migrated_tool_configs_dict = {}
    missing_tool_configs_dict = {}
    # The default behavior is that the tool shed is down.
    tool_shed_accessible = False
    for elem in root:
        if elem.tag != 'repository':
            continue
        repository_dependencies = []
        all_tool_dependencies = []
        repository_name = elem.get('name')
        changeset_revision = elem.get('changeset_revision')
        tool_shed_accessible, repository_dependencies_dict = get_repository_dependencies(app,
                                                                                          tool_shed_url,
                                                                                          repository_name,
                                                                                          REPOSITORY_OWNER,
                                                                                          changeset_revision)
        if not tool_shed_accessible:
            # Stop at the first repository for which the tool shed cannot be reached.
            break
        # Accumulate all tool dependencies defined for repository dependencies for display to the user.
        for rd_key, rd_tups in repository_dependencies_dict.items():
            if rd_key in ('root_key', 'description'):
                continue
            for rd_tup in rd_tups:
                # Use dedicated names here: re-using changeset_revision would make the lookup for the
                # repository itself (below) query the revision of its last dependency instead of its own.
                _rd_tool_shed, rd_name, rd_owner, rd_changeset_revision, _rd_prior, _rd_only_if_compiling = \
                    parse_repository_dependency_tuple(rd_tup)
                tool_shed_accessible, tool_dependencies = get_tool_dependencies(app,
                                                                                tool_shed_url,
                                                                                rd_name,
                                                                                rd_owner,
                                                                                rd_changeset_revision)
                all_tool_dependencies = accumulate_tool_dependencies(tool_shed_accessible,
                                                                     tool_dependencies,
                                                                     all_tool_dependencies)
        # Add the tool dependencies of the migrated repository itself.
        tool_shed_accessible, tool_dependencies = get_tool_dependencies(app,
                                                                        tool_shed_url,
                                                                        repository_name,
                                                                        REPOSITORY_OWNER,
                                                                        changeset_revision)
        all_tool_dependencies = accumulate_tool_dependencies(tool_shed_accessible,
                                                             tool_dependencies,
                                                             all_tool_dependencies)
        for tool_elem in elem.findall('tool'):
            tool_config_file_name = tool_elem.get('file')
            if tool_config_file_name:
                # We currently do nothing with repository dependencies except install them (we do not display repositories that will be
                # installed to the user).  However, we'll store them in the following dictionary in case we choose to display them in the
                # future.
                dependencies_dict = dict(tool_dependencies=all_tool_dependencies,
                                         repository_dependencies=repository_dependencies)
                migrated_tool_configs_dict[tool_config_file_name] = dependencies_dict
    if tool_shed_accessible:
        # Parse the proprietary tool_panel_configs (the default is tool_conf.xml) and generate the list of missing tool config file names.
        for tool_panel_config in tool_panel_configs:
            panel_tree, error_message = xml_util.parse_xml(tool_panel_config)
            if not panel_tree:
                continue
            for panel_elem in panel_tree.getroot():
                if panel_elem.tag == 'tool':
                    missing_tool_configs_dict = check_tool_tag_set(panel_elem, migrated_tool_configs_dict, missing_tool_configs_dict)
                elif panel_elem.tag == 'section':
                    # Only tools directly inside a section are checked.
                    for section_elem in panel_elem:
                        if section_elem.tag == 'tool':
                            missing_tool_configs_dict = check_tool_tag_set(section_elem, migrated_tool_configs_dict, missing_tool_configs_dict)
    return tool_shed_accessible, missing_tool_configs_dict
def check_tool_tag_set(elem, migrated_tool_configs_dict, missing_tool_configs_dict):
    """
    Record the tool referenced by elem as missing if it is one of the migrated tool configs.

    The ``file`` attribute of elem is compared, both as given and reduced to its base name, against
    the keys of migrated_tool_configs_dict.  For each matching key the associated value is stored in
    missing_tool_configs_dict under the base name.  missing_tool_configs_dict is modified in place
    and returned.
    """
    file_path = elem.get('file', None)
    if not file_path:
        return missing_tool_configs_dict
    name = os.path.basename(file_path)
    accepted_keys = (file_path, name)
    for config_key, dependencies in migrated_tool_configs_dict.items():
        if config_key in accepted_keys:
            missing_tool_configs_dict[name] = dependencies
    return missing_tool_configs_dict
def generate_clone_url_for_installed_repository(app, repository):
    """
    Generate the URL for cloning a repository that has been installed into a Galaxy instance.

    The tool shed URL is looked up in the app's tool shed registry and combined with the path
    ``repos/<owner>/<name>``.
    """
    shed_name = str(repository.tool_shed)
    tool_shed_url = get_tool_shed_url_from_tool_shed_registry(app, shed_name)
    clone_pathspec = ['repos', str(repository.owner), str(repository.name)]
    return util.build_url(tool_shed_url, pathspec=clone_pathspec)
def generate_clone_url_for_repository_in_tool_shed(user, repository):
    """
    Generate the URL for cloning a repository that is in the tool shed.

    When a user is given, the user's name is embedded in the URL (``protocol://username@host/...``);
    otherwise the plain base URL is used.
    """
    base_url = url_for('/', qualified=True).rstrip('/')
    if not user:
        return f'{base_url}/repos/{repository.user.username}/{repository.name}'
    # Insert "<username>@" between the protocol and the rest of the base URL.
    protocol, base = base_url.split('://')
    username = '%s@' % user.username
    return f'{protocol}://{username}{base}/repos/{repository.user.username}/{repository.name}'
def generate_clone_url_from_repo_info_tup(app, repo_info_tup):
    """
    Generate the URL for cloning a repository given a tuple of toolshed, name, owner, changeset_revision.

    Example tuple: ['http://localhost:9009', 'blast_datatypes', 'test', '461a4216e8ab', False]
    """
    # Only the first three parsed fields are needed to build the clone URL.
    toolshed, name, owner, *_unused = parse_repository_dependency_tuple(repo_info_tup)
    tool_shed_url = get_tool_shed_url_from_tool_shed_registry(app, toolshed)
    # Don't include the changeset_revision in clone urls.
    clone_pathspec = ['repos', owner, name]
    return util.build_url(tool_shed_url, pathspec=clone_pathspec)
def get_non_shed_tool_panel_configs(app):
    """
    Get the non-shed related tool panel configs - there can be more than one, and the default is tool_conf.xml.

    A config file is treated as shed-related when its root tag carries a ``tool_path`` attribute, e.g.
    ``<toolbox tool_path="database/shed_tools">``.  Files that cannot be parsed into a tree are skipped.
    """
    non_shed_configs = []
    for config_filename in app.config.tool_configs:
        try:
            tree, error_message = xml_util.parse_xml(config_filename)
        except OSError as exc:
            # A non-existent shed_tool_conf file is tolerated as long as app.config.shed_tool_conf_set
            # is false (presumably meaning the path is only a default - confirm); anything else is re-raised.
            is_unset_shed_conf = config_filename == app.config.shed_tool_conf and not app.config.shed_tool_conf_set
            if is_unset_shed_conf and exc.errno == errno.ENOENT:
                continue
            raise
        if tree is None:
            continue
        if tree.getroot().get('tool_path', None) is None:
            non_shed_configs.append(config_filename)
    return non_shed_configs
def get_repository_dependencies(app, tool_shed_url, repository_name, repository_owner, changeset_revision):
    """
    Request the repository dependencies of a repository revision from the tool shed.

    :returns: a ``(tool_shed_accessible, repository_dependencies_dict)`` tuple.  The flag is False
        (and the dictionary empty) when the request raised any exception; the exception is logged
        as a warning rather than propagated.
    """
    params = dict(name=repository_name, owner=repository_owner, changeset_revision=changeset_revision)
    pathspec = ['repository', 'get_repository_dependencies']
    try:
        raw_text = util.url_get(tool_shed_url, auth=app.tool_shed_registry.url_auth(tool_shed_url), pathspec=pathspec, params=params)
    except Exception as e:
        log.warning("The URL\n%s\nraised the exception:\n%s\n", util.build_url(tool_shed_url, pathspec=pathspec, params=params), e)
        return False, {}
    # Responses of two characters or fewer are treated as "no dependencies"
    # (presumably an empty JSON value - confirm against the tool shed API).
    if len(raw_text) <= 2:
        return True, {}
    encoded_text = json.loads(util.unicodify(raw_text))
    return True, encoding_util.tool_shed_decode(encoded_text)
def get_protocol_from_tool_shed_url(tool_shed_url):
    """
    Return the lower-cased protocol from the received tool_shed_url if it exists.

    'http' is returned when the URL carries no protocol or cannot be inspected.
    """
    try:
        separator_index = tool_shed_url.find('://')
        if separator_index > 0:
            return tool_shed_url[:separator_index].lower()
    except Exception:
        # We receive a lot of calls here where the tool_shed_url is None.  The container_util uses
        # that value when creating a header row.  If the tool_shed_url is not None, we have a problem.
        if tool_shed_url is not None:
            log.exception("Handled exception getting the protocol from Tool Shed URL %s", str(tool_shed_url))
    # Default to HTTP protocol.
    return 'http'
def get_tool_dependencies(app, tool_shed_url, repository_name, repository_owner, changeset_revision):
    """
    Request the tool dependencies of a repository revision from the tool shed.

    :returns: a ``(tool_shed_accessible, tool_dependencies)`` tuple, where tool_dependencies is a list
        of ``(name, version, type)`` tuples.  The flag is False (and the list empty) when the request
        raised any exception; the exception is logged as a warning rather than propagated.
    """
    params = dict(name=repository_name, owner=repository_owner, changeset_revision=changeset_revision)
    pathspec = ['repository', 'get_tool_dependencies']
    try:
        text = util.url_get(tool_shed_url, auth=app.tool_shed_registry.url_auth(tool_shed_url), pathspec=pathspec, params=params)
    except Exception as e:
        log.warning("The URL\n%s\nraised the exception:\n%s\n", util.build_url(tool_shed_url, pathspec=pathspec, params=params), e)
        return False, []
    if not text:
        return True, []
    tool_dependencies_dict = encoding_util.tool_shed_decode(text)
    tool_dependencies = [
        (requirements['name'], requirements['version'], requirements['type'])
        for requirements in tool_dependencies_dict.values()
    ]
    return True, tool_dependencies
def get_tool_shed_repository_ids(as_string=False, **kwd):
    """
    Collect tool shed repository ids from the received keyword arguments.

    ``tool_shed_repository_ids`` is used if it yields a non-empty list, otherwise ``id``;
    ``tool_shed_repository_id`` is appended when it is not already included.

    :param as_string: when true, return the ids joined with commas instead of a list
    """
    def _render(ids):
        return ','.join(ids) if as_string else ids

    tsrid = kwd.get('tool_shed_repository_id', None)
    tsridslist = util.listify(kwd.get('tool_shed_repository_ids', None))
    if not tsridslist:
        tsridslist = util.listify(kwd.get('id', None))
    if tsridslist is not None:
        if tsrid is not None and tsrid not in tsridslist:
            tsridslist.append(tsrid)
        return _render(tsridslist)
    # NOTE(review): the code below only runs when util.listify() returns None.  If listify always
    # returns a list, the 'ordered_tsr_ids' fallback is unreachable - confirm the intended behavior.
    tsridslist = util.listify(kwd.get('ordered_tsr_ids', None))
    if tsridslist is not None:
        return _render(tsridslist)
    return '' if as_string else []
def get_tool_shed_url_from_tool_shed_registry(app, tool_shed):
    """
    Return the registered URL of the received tool shed, without trailing slashes.

    The value of tool_shed is something like: toolshed.g2.bx.psu.edu.  We need the URL to this tool shed,
    which is something like: http://toolshed.g2.bx.psu.edu/

    A registered URL whose protocol-less form equals the protocol-less tool_shed is preferred.  If there
    is none, the first registered URL that merely contains it is returned.  Preferring the exact match
    keeps one shed from being mistaken for another whose host name contains it.

    :returns: the matching URL, or None when no registered tool shed matches
    """
    cleaned_tool_shed = remove_protocol_from_tool_shed_url(tool_shed)
    first_partial_match = None
    for shed_url in app.tool_shed_registry.tool_sheds.values():
        if cleaned_tool_shed not in shed_url:
            continue
        candidate = shed_url.rstrip('/')
        # Assumes remove_protocol_from_tool_shed_url() normalizes both values the same way; if it
        # does not, no exact match is found and the partial match below is used.
        if remove_protocol_from_tool_shed_url(candidate) == cleaned_tool_shed:
            return candidate
        if first_partial_match is None:
            first_partial_match = candidate
    # None here means that the tool shed from which the repository was originally installed must no
    # longer be configured in tool_sheds_conf.xml.
    return first_partial_match
def get_tool_shed_repository_url(app, tool_shed, owner, name):
    """
    Return the ``view/<owner>/<name>`` URL of a repository in the received tool shed.

    When the tool shed is not found in the registry, the falsy lookup result is returned unchanged.
    """
    tool_shed_url = get_tool_shed_url_from_tool_shed_registry(app, tool_shed)
    if not tool_shed_url:
        return tool_shed_url
    # Append a slash to the tool shed URL, because urlparse.urljoin will eliminate
    # the last part of a URL if it does not end with a forward slash.
    base_with_slash = '%s/' % tool_shed_url
    return urljoin(base_with_slash, f'view/{owner}/{name}')
def get_user_by_username(app, username):
    """
    Get a user from the database by username.

    None is returned when the query raises for any reason.
    """
    sa_session = app.model.context.current
    try:
        user_query = sa_session.query(app.model.User)
        matching_users = user_query.filter(app.model.User.table.c.username == username)
        return matching_users.one()
    except Exception:
        return None
def handle_galaxy_url(trans, **kwd):
    """
    Return the Galaxy URL, keeping it in the 'toolshedgalaxyurl' cookie.

    A ``galaxy_url`` keyword argument is stored in the cookie and returned; without one, the value
    currently held by the cookie is returned.
    """
    galaxy_url = kwd.get('galaxy_url', None)
    if not galaxy_url:
        return trans.get_cookie(name='toolshedgalaxyurl')
    trans.set_cookie(galaxy_url, name='toolshedgalaxyurl')
    return galaxy_url
def handle_tool_shed_url_protocol(app, shed_url):
    """
    Handle secure and insecure HTTP protocol since they may change over time.

    For an app named 'galaxy' the URL is looked up in the tool shed registry; for any other app the
    qualified root URL of the current application is used.  If anything raises, shed_url is returned
    unchanged.
    """
    try:
        if app.name != 'galaxy':
            return str(url_for('/', qualified=True)).rstrip('/')
        url = remove_protocol_from_tool_shed_url(shed_url)
        return get_tool_shed_url_from_tool_shed_registry(app, url)
    except Exception:
        # We receive a lot of calls here where the tool_shed_url is None.  The container_util uses
        # that value when creating a header row.  If the tool_shed_url is not None, we have a problem.
        if shed_url is not None:
            log.exception("Handled exception removing protocol from URL %s", str(shed_url))
        return shed_url
def parse_repository_dependency_tuple(repository_dependency_tuple, contains_error=False):
    """
    Expand a repository dependency tuple to its full set of fields.

    The tuple always starts with tool_shed, name, owner, changeset_revision, optionally followed by
    prior_installation_required and then only_if_compiling_contained_td.  With contains_error, the
    last item of the tuple is an additional error value.

    :param repository_dependency_tuple: sequence of 4 to 6 items (5 to 7 with contains_error)
    :param contains_error: whether the last item of the sequence is an error value
    :returns: ``(tool_shed, name, owner, changeset_revision, prior_installation_required,
        only_if_compiling_contained_td)``, with ``error`` appended when contains_error is set
    :raises ValueError: when the sequence has an unsupported number of items
    """
    fields = list(repository_dependency_tuple)
    trailing = ()
    if contains_error:
        if not fields:
            raise ValueError('Cannot parse an empty repository dependency tuple')
        trailing = (fields.pop(),)
    if not 4 <= len(fields) <= 6:
        raise ValueError('Unsupported repository dependency tuple length (contains_error=%s): %s'
                         % (contains_error, str(repository_dependency_tuple)))
    # Default both prior_installation_required and only_if_compiling_contained_td to False in cases where
    # metadata should be reset on the repository containing the repository_dependency definition.
    defaults = ['False', 'False']
    fields.extend(defaults[len(fields) - 4:])
    return tuple(fields) + trailing
def remove_port_from_tool_shed_url(tool_shed_url):
    """
    Return a partial Tool Shed URL, eliminating the port if it exists.

    Everything from the first colon on is dropped (unless the colon is the first character), then
    trailing slashes are removed.  If the value cannot be processed it is returned unchanged.
    """
    try:
        colon_index = tool_shed_url.find(':')
        # Eliminate the port, if any, since it will result in an invalid directory name.
        host_part = tool_shed_url[:colon_index] if colon_index > 0 else tool_shed_url
        return host_part.rstrip('/')
    except Exception:
        # We receive a lot of calls here where the tool_shed_url is None.  The container_util uses
        # that value when creating a header row.  If the tool_shed_url is not None, we have a problem.
        if tool_shed_url is not None:
            log.exception("Handled exception removing the port from Tool Shed URL %s", str(tool_shed_url))
        return tool_shed_url
def remove_protocol_and_port_from_tool_shed_url(tool_shed_url):
    """
    Return a partial Tool Shed URL, eliminating the protocol and/or port if either exists.

    The protocol is removed first, then the port.
    """
    without_protocol = remove_protocol_from_tool_shed_url(tool_shed_url)
    return remove_port_from_tool_shed_url(without_protocol)
def remove_protocol_and_user_from_clone_url(repository_clone_url):
    """
    Return a URL that can be used to clone a repository, eliminating the protocol and user if either exists.

    Trailing slashes are removed from the result.  Only the first '@' (or, without one, the first
    '//') is treated as a delimiter, so later occurrences in the remainder of the URL are preserved.

    NOTE: any '@' beyond the first character is taken as the end of the user part, so a URL that
    has no user but contains '@' in its path is not handled.
    """
    if repository_clone_url.find('@') > 0:
        # We have an url that includes an authenticated user, something like:
        # http://test@bx.psu.edu:9009/repos/some_username/column
        # Split once only, so that an '@' further along the URL does not truncate the result.
        tmp_url = repository_clone_url.split('@', 1)[1]
    elif repository_clone_url.find('//') > 0:
        # We have an url that includes only a protocol, something like:
        # http://bx.psu.edu:9009/repos/some_username/column
        # Split once only, so that a '//' further along the URL does not truncate the result.
        tmp_url = repository_clone_url.split('//', 1)[1]
    else:
        tmp_url = repository_clone_url
    return tmp_url.rstrip('/')
def remove_protocol_from_tool_shed_url(tool_shed_url):
    """
    Return a partial Tool Shed URL, eliminating the protocol if it exists.

    This delegates to ``util.remove_protocol_from_url``.
    """
    partial_url = util.remove_protocol_from_url(tool_shed_url)
    return partial_url