Source code for tool_shed.util.shed_index

import logging
import os

from mercurial import hg, ui
from whoosh.writing import AsyncWriter

import tool_shed.webapp.model.mapping as ts_mapping
from galaxy.tool_util.loader_directory import load_tool_elements_from_path
from galaxy.tools.search import get_or_create_index
from galaxy.util import (
    directory_hash_id,
    ExecutionTimer,
    pretty_print_time_interval,
    unicodify
)
from tool_shed.util.hgweb_config import hgweb_config_manager
from tool_shed.webapp import model
from tool_shed.webapp.search.repo_search import schema as repo_schema
from tool_shed.webapp.search.tool_search import schema as tool_schema

log = logging.getLogger(__name__)


def _get_or_create_index(whoosh_index_dir):
    tool_index_dir = os.path.join(whoosh_index_dir, 'tools')
    if not os.path.exists(whoosh_index_dir):
        os.makedirs(whoosh_index_dir)
    if not os.path.exists(tool_index_dir):
        os.makedirs(tool_index_dir)
    return get_or_create_index(whoosh_index_dir, repo_schema), get_or_create_index(tool_index_dir, tool_schema)


def build_index(whoosh_index_dir, file_path, hgweb_config_dir, dburi, **kwargs):
    """
    Build two search indexes simultaneously.
    One is for repositories and the other for tools.

    Returns a tuple with number of repos and tools that were indexed.
    """
    model = ts_mapping.init(file_path, dburi, engine_options={}, create_tables=False)
    sa_session = model.session
    repo_index, tool_index = _get_or_create_index(whoosh_index_dir)
    repo_index_writer = AsyncWriter(repo_index)
    tool_index_writer = AsyncWriter(tool_index)
    repos_indexed = 0
    tools_indexed = 0
    execution_timer = ExecutionTimer()
    with repo_index.searcher() as searcher:
        for repo in get_repos(sa_session, file_path, hgweb_config_dir, **kwargs):
            tools_list = repo.pop('tools_list')
            repo_id = repo['id']
            indexed_document = searcher.document(id=repo_id)
            if indexed_document:
                if indexed_document['full_last_updated'] == repo.get('full_last_updated'):
                    # We're done, since we sorted repos by update time
                    break
                else:
                    # Got an update, delete the previous document
                    repo_index_writer.delete_by_term('id', repo_id)
            repo_index_writer.add_document(**repo)

            # Tools get their own index
            for tool in tools_list:
                tool_index_writer.add_document(**tool)
                tools_indexed += 1

            repos_indexed += 1

    tool_index_writer.commit()
    repo_index_writer.commit()

    log.info("Indexed repos: %s, tools: %s", repos_indexed, tools_indexed)
    log.info("Toolbox index finished %s", execution_timer)
    return repos_indexed, tools_indexed

def get_repos(sa_session, file_path, hgweb_config_dir, **kwargs):
    """
    Load repos from DB and included tools from .xml configs.
    """
    hgwcm = hgweb_config_manager
    hgwcm.hgweb_config_dir = hgweb_config_dir
    # Do not index deleted, deprecated, or "tool_dependency_definition" type repositories.
    q = sa_session.query(model.Repository).filter_by(deleted=False).filter_by(deprecated=False).order_by(model.Repository.update_time.desc())
    q = q.filter(model.Repository.type != 'tool_dependency_definition')
    for repo in q:
        category_names = []
        for rca in sa_session.query(model.RepositoryCategoryAssociation).filter(model.RepositoryCategoryAssociation.repository_id == repo.id):
            for category in sa_session.query(model.Category).filter(model.Category.id == rca.category.id):
                category_names.append(category.name.lower())
        categories = (",").join(category_names)
        repo_id = repo.id
        name = repo.name
        description = repo.description
        long_description = repo.long_description
        homepage_url = repo.homepage_url
        remote_repository_url = repo.remote_repository_url

        times_downloaded = repo.times_downloaded or 0

        repo_owner_username = ''
        if repo.user_id is not None:
            user = sa_session.query(model.User).filter(model.User.id == repo.user_id).one()
            repo_owner_username = user.username.lower()

        approved = 'no'
        for review in repo.reviews:
            if review.approved == 'yes':
                approved = 'yes'
                break

        last_updated = pretty_print_time_interval(repo.update_time)
        full_last_updated = repo.update_time.strftime("%Y-%m-%d %I:%M %p")

        # Load all changesets of the repo for lineage.
        repo_path = os.path.join(hgweb_config_dir, hgwcm.get_entry(os.path.join("repos", repo.user.username, repo.name)))
        hg_repo = hg.repository(ui.ui(), repo_path.encode('utf-8'))
        lineage = []
        for changeset in hg_repo.changelog:
            lineage.append(f"{unicodify(changeset)}:{unicodify(hg_repo[changeset])}")
        repo_lineage = str(lineage)

        # Parse all the tools within repo for a separate index.
        tools_list = []
        path = os.path.join(file_path, *directory_hash_id(repo.id))
        path = os.path.join(path, "repo_%d" % repo.id)
        if os.path.exists(path):
            tools_list.extend(load_one_dir(path))
            for root, dirs, _files in os.walk(path):
                if '.hg' in dirs:
                    dirs.remove('.hg')
                for dirname in dirs:
                    tools_in_dir = load_one_dir(os.path.join(root, dirname))
                    tools_list.extend(tools_in_dir)

        yield (dict(id=unicodify(repo_id),
                    name=unicodify(name),
                    description=unicodify(description),
                    long_description=unicodify(long_description),
                    homepage_url=unicodify(homepage_url),
                    remote_repository_url=unicodify(remote_repository_url),
                    repo_owner_username=unicodify(repo_owner_username),
                    times_downloaded=unicodify(times_downloaded),
                    approved=unicodify(approved),
                    last_updated=unicodify(last_updated),
                    full_last_updated=unicodify(full_last_updated),
                    tools_list=tools_list,
                    repo_lineage=unicodify(repo_lineage),
                    categories=unicodify(categories)))

def debug_handler(path, exc_info):
    """
    By default the underlying tool parsing logs warnings for each exception.
    This is very chatty, hence this method lowers it to debug level.
    """
    log.debug(f"Failed to load tool with path {path}.", exc_info=exc_info)

def load_one_dir(path):
    tools_in_dir = []
    tool_elems = load_tool_elements_from_path(path, load_exception_handler=debug_handler)
    if tool_elems:
        for elem in tool_elems:
            root = elem[1].getroot()
            if root.tag == 'tool':
                tool = {}
                if root.find('help') is not None:
                    tool.update(dict(help=unicodify(root.find('help').text)))
                if root.find('description') is not None:
                    tool.update(dict(description=unicodify(root.find('description').text)))
                tool.update(dict(id=unicodify(root.attrib.get('id')),
                                 name=unicodify(root.attrib.get('name')),
                                 version=unicodify(root.attrib.get('version'))))
                tools_in_dir.append(tool)
    return tools_in_dir
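
For orientation, below is a minimal sketch of how build_index might be invoked directly against an existing Tool Shed instance, using the signature shown above. The directories and database URI are hypothetical placeholders, not values defined by this module; substitute the paths from your own Tool Shed configuration.

    from tool_shed.util.shed_index import build_index

    # Hypothetical paths and database URI for illustration only.
    repos_indexed, tools_indexed = build_index(
        whoosh_index_dir="/srv/toolshed/var/whoosh_index",  # where the repo and tool indexes are written
        file_path="/srv/toolshed/var/files",                # Tool Shed file_path holding repository data
        hgweb_config_dir="/srv/toolshed/var/hgweb",         # directory containing the hgweb config
        dburi="postgresql:///toolshed",                     # SQLAlchemy URI for the Tool Shed database
    )
    print(f"indexed {repos_indexed} repositories and {tools_indexed} tools")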