Source code for tool_shed.util.shed_index

import logging
import os

from mercurial import (
    hg,
    ui,
)
from sqlalchemy import (
    false,
    select,
)
from whoosh.writing import AsyncWriter

import tool_shed.webapp.model.mapping as ts_mapping
from galaxy.tool_util.loader_directory import load_tool_elements_from_path
from galaxy.tools.search import get_or_create_index
from galaxy.util import (
    directory_hash_id,
    ExecutionTimer,
    pretty_print_time_interval,
    unicodify,
)
from tool_shed.util.hgweb_config import hgweb_config_manager
from tool_shed.webapp import model
from tool_shed.webapp.search.repo_search import schema as repo_schema
from tool_shed.webapp.search.tool_search import schema as tool_schema

log = logging.getLogger(__name__)


def _get_or_create_index(whoosh_index_dir):
    tool_index_dir = os.path.join(whoosh_index_dir, "tools")
    if not os.path.exists(whoosh_index_dir):
        os.makedirs(whoosh_index_dir)
    if not os.path.exists(tool_index_dir):
        os.makedirs(tool_index_dir)
    return get_or_create_index(whoosh_index_dir, repo_schema), get_or_create_index(tool_index_dir, tool_schema)
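

# A minimal usage sketch (the root path below is hypothetical): the
# repository index is created directly under `whoosh_index_dir`, while the
# tool index lives in its "tools" subdirectory, so a single root path
# addresses both:
#
#   repo_index, tool_index = _get_or_create_index("/srv/toolshed/whoosh")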


def build_index(whoosh_index_dir, file_path, hgweb_config_dir, hgweb_repo_prefix, dburi, **kwargs):
    """
    Build two search indexes simultaneously. One is for repositories and the
    other for tools.

    Returns a tuple with the number of repos and tools that were indexed.
    """
    model = ts_mapping.init(dburi, engine_options={}, create_tables=False)
    sa_session = model.session
    repo_index, tool_index = _get_or_create_index(whoosh_index_dir)
    repo_index_writer = AsyncWriter(repo_index)
    tool_index_writer = AsyncWriter(tool_index)
    repos_indexed = 0
    tools_indexed = 0
    execution_timer = ExecutionTimer()
    with repo_index.searcher() as searcher:
        for repo in get_repos(sa_session, file_path, hgweb_config_dir, hgweb_repo_prefix, **kwargs):
            tools_list = repo.pop("tools_list")
            repo_id = repo["id"]
            indexed_document = searcher.document(id=repo_id)
            if indexed_document:
                if indexed_document["full_last_updated"] == repo.get("full_last_updated"):
                    # We're done, since we sorted repos by update time
                    break
                else:
                    # Got an update, delete the previous document
                    repo_index_writer.delete_by_term("id", repo_id)
            repo_index_writer.add_document(**repo)

            # Tools get their own index
            tool_index_writer.delete_by_term("repo_id", repo_id)
            for tool in tools_list:
                tool_contents = tool.copy()
                tool_contents["repo_owner_username"] = repo.get("repo_owner_username")
                tool_contents["repo_name"] = repo.get("name")
                tool_contents["repo_id"] = repo_id
                tool_index_writer.add_document(**tool_contents)
                tools_indexed += 1
            repos_indexed += 1
    tool_index_writer.commit()
    repo_index_writer.commit()
    log.info("Indexed repos: %s, tools: %s", repos_indexed, tools_indexed)
    log.info("Toolbox index finished %s", execution_timer)
    return repos_indexed, tools_indexed
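

# A minimal invocation sketch; every path and the database URI below are
# hypothetical and depend on the Tool Shed deployment:
#
#   repos, tools = build_index(
#       whoosh_index_dir="/srv/toolshed/whoosh",
#       file_path="/srv/toolshed/files",
#       hgweb_config_dir="/srv/toolshed/hgweb",
#       hgweb_repo_prefix="repos/",
#       dburi="postgresql://user:secret@localhost/toolshed",
#   )
#   log.info("Indexed %d repositories and %d tools", repos, tools)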


def get_repos(sa_session, file_path, hgweb_config_dir, hgweb_repo_prefix, **kwargs):
    """
    Load repos from the DB and included tools from .xml configs.
    """
    hgwcm = hgweb_config_manager
    hgwcm.hgweb_config_dir = hgweb_config_dir
    for repo in get_repositories_for_indexing(sa_session):
        category_names = []
        for rca in get_repo_cat_associations(sa_session, repo.id):
            category = sa_session.get(model.Category, rca.category.id)
            category_names.append(category.name.lower())
        categories = ",".join(category_names)
        repo_id = repo.id
        name = repo.name
        description = repo.description
        long_description = repo.long_description
        homepage_url = repo.homepage_url
        remote_repository_url = repo.remote_repository_url
        times_downloaded = repo.times_downloaded or 0
        repo_owner_username = ""
        if repo.user_id is not None:
            user = sa_session.get(model.User, repo.user_id)
            repo_owner_username = user.username.lower()
        last_updated = pretty_print_time_interval(repo.update_time)
        full_last_updated = repo.update_time.strftime("%Y-%m-%d %I:%M %p")

        # Load all changesets of the repo for lineage.
        repo_path = os.path.join(
            hgweb_config_dir, hgwcm.get_entry(os.path.join(hgweb_repo_prefix, repo.user.username, repo.name))
        )
        hg_repo = hg.repository(ui.ui(), repo_path.encode("utf-8"))
        lineage = []
        for changeset in hg_repo.changelog:
            lineage.append(f"{unicodify(changeset)}:{unicodify(hg_repo[changeset])}")
        repo_lineage = str(lineage)

        # Parse all the tools within repo for a separate index.
        tools_list = []
        path = os.path.join(file_path, *directory_hash_id(repo.id))
        path = os.path.join(path, "repo_%d" % repo.id)
        if os.path.exists(path):
            tools_list.extend(load_one_dir(path))
            for root, dirs, _files in os.walk(path):
                if ".hg" in dirs:
                    dirs.remove(".hg")
                for dirname in dirs:
                    tools_in_dir = load_one_dir(os.path.join(root, dirname))
                    tools_list.extend(tools_in_dir)

        yield (
            dict(
                id=unicodify(repo_id),
                name=unicodify(name),
                description=unicodify(description),
                long_description=unicodify(long_description),
                homepage_url=unicodify(homepage_url),
                remote_repository_url=unicodify(remote_repository_url),
                repo_owner_username=unicodify(repo_owner_username),
                times_downloaded=unicodify(times_downloaded),
                approved=unicodify("no"),
                last_updated=unicodify(last_updated),
                full_last_updated=unicodify(full_last_updated),
                tools_list=tools_list,
                repo_lineage=unicodify(repo_lineage),
                categories=unicodify(categories),
            )
        )
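

# Sketch of how build_index() above consumes this generator (the
# "repos/" prefix is hypothetical): each yielded dict maps onto repo_schema
# once the nested "tools_list" is popped off.
#
#   for repo_doc in get_repos(sa_session, file_path, hgweb_config_dir, "repos/"):
#       tools = repo_doc.pop("tools_list")
#       # repo_doc now matches repo_schema; each entry in `tools` feeds tool_schema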


def debug_handler(path, exc_info):
    """
    By default the underlying tool parsing logs a warning for each exception.
    This is very chatty, hence this method lowers it to debug level.
    """
    log.debug(f"Failed to load tool with path {path}.", exc_info=exc_info)


def load_one_dir(path):
    tools_in_dir = []
    tool_elems = load_tool_elements_from_path(path, load_exception_handler=debug_handler)
    if tool_elems:
        for elem in tool_elems:
            root = elem[1].getroot()
            if root.tag == "tool":
                tool = {}
                if root.find("help") is not None:
                    tool.update(dict(help=unicodify(root.find("help").text)))
                if root.find("description") is not None:
                    tool.update(dict(description=unicodify(root.find("description").text)))
                tool.update(
                    dict(
                        id=unicodify(root.attrib.get("id")),
                        name=unicodify(root.attrib.get("name")),
                        version=unicodify(root.attrib.get("version")),
                    )
                )
                tools_in_dir.append(tool)
    return tools_in_dir
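

# Illustrative return value (the tool shown is hypothetical): only elements
# whose root tag is <tool> are collected, and "help"/"description" are
# present only when the corresponding XML elements exist.
#
#   [{"id": "my_tool", "name": "My Tool", "version": "1.0",
#     "description": "...", "help": "..."}]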


def get_repositories_for_indexing(session):
    # Do not index deleted, deprecated, or "tool_dependency_definition" type repositories.
    Repository = model.Repository
    stmt = (
        select(Repository)
        .where(Repository.deleted == false())
        .where(Repository.deprecated == false())
        .where(Repository.type != "tool_dependency_definition")
        .order_by(Repository.update_time.desc())
    )
    return session.scalars(stmt)
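

# Note: the descending update_time ordering is what lets build_index() stop
# early once it reaches a repository whose indexed "full_last_updated"
# already matches the stored value.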


def get_repo_cat_associations(session, repository_id):
    stmt = select(model.RepositoryCategoryAssociation).where(
        model.RepositoryCategoryAssociation.repository_id == repository_id
    )
    return session.scalars(stmt)