Warning
This document is for an old release of Galaxy. You can alternatively view this page in the latest release if it exists or view the top of the latest release's documentation.
Source code for tool_shed.util.shed_index
import logging
import os
from mercurial import (
hg,
ui,
)
from whoosh.writing import AsyncWriter
import tool_shed.webapp.model.mapping as ts_mapping
from galaxy.tool_util.loader_directory import load_tool_elements_from_path
from galaxy.tools.search import get_or_create_index
from galaxy.util import (
directory_hash_id,
ExecutionTimer,
pretty_print_time_interval,
unicodify,
)
from tool_shed.util.hgweb_config import hgweb_config_manager
from tool_shed.webapp import model
from tool_shed.webapp.search.repo_search import schema as repo_schema
from tool_shed.webapp.search.tool_search import schema as tool_schema
log = logging.getLogger(__name__)
def _get_or_create_index(whoosh_index_dir):
    """
    Open (creating them if necessary) the repository and tool search indexes.

    The repository index lives directly in ``whoosh_index_dir`` and the tool
    index lives in its ``tools`` subdirectory.

    Returns a ``(repo_index, tool_index)`` tuple.
    """
    tool_index_dir = os.path.join(whoosh_index_dir, "tools")
    # makedirs creates missing parents, so this single call also creates
    # whoosh_index_dir. exist_ok avoids the check-then-create race of testing
    # os.path.exists() first.
    os.makedirs(tool_index_dir, exist_ok=True)
    return get_or_create_index(whoosh_index_dir, repo_schema), get_or_create_index(tool_index_dir, tool_schema)
def build_index(whoosh_index_dir, file_path, hgweb_config_dir, hgweb_repo_prefix, dburi, **kwargs):
    """
    Build two search indexes simultaneously.

    One is for repositories and the other for tools. Repositories are visited
    in the order produced by ``get_repos`` and indexing stops at the first
    repository whose already-indexed ``full_last_updated`` value matches the
    freshly loaded one.

    Both index writers are used as context managers, so they are committed when
    indexing finishes normally and cancelled if an exception escapes. A failed
    run therefore does not leave an open, uncommitted writer behind.

    Returns a tuple with number of repos and tools that were indexed.
    """
    # Named ts_model so it does not shadow the module-level ``model`` import.
    ts_model = ts_mapping.init(dburi, engine_options={}, create_tables=False)
    sa_session = ts_model.session
    repo_index, tool_index = _get_or_create_index(whoosh_index_dir)

    repos_indexed = 0
    tools_indexed = 0
    execution_timer = ExecutionTimer()

    # Context managers exit in reverse order: the tool index is committed
    # first and the repo index second.
    with AsyncWriter(repo_index) as repo_index_writer, AsyncWriter(tool_index) as tool_index_writer:
        with repo_index.searcher() as searcher:
            for repo in get_repos(sa_session, file_path, hgweb_config_dir, hgweb_repo_prefix, **kwargs):
                # tools_list is not a field of the repo document; split it off.
                tools_list = repo.pop("tools_list")
                repo_id = repo["id"]
                indexed_document = searcher.document(id=repo_id)
                if indexed_document:
                    if indexed_document["full_last_updated"] == repo.get("full_last_updated"):
                        # We're done, since we sorted repos by update time
                        break
                    # Got an update, delete the previous document
                    repo_index_writer.delete_by_term("id", repo_id)

                repo_index_writer.add_document(**repo)

                # Tools get their own index; drop this repo's stale tool
                # documents before adding the current ones.
                tool_index_writer.delete_by_term("repo_id", repo_id)
                for tool in tools_list:
                    tool_contents = tool.copy()
                    tool_contents["repo_owner_username"] = repo.get("repo_owner_username")
                    tool_contents["repo_name"] = repo.get("name")
                    tool_contents["repo_id"] = repo_id
                    tool_index_writer.add_document(**tool_contents)
                    tools_indexed += 1

                repos_indexed += 1

    log.info("Indexed repos: %s, tools: %s", repos_indexed, tools_indexed)
    log.info("Toolbox index finished %s", execution_timer)
    return repos_indexed, tools_indexed
def get_repos(sa_session, file_path, hgweb_config_dir, hgweb_repo_prefix, **kwargs):
    """
    Load repos from DB and included tools from .xml configs.

    Repositories that are deleted, deprecated, or of type
    ``tool_dependency_definition`` are skipped. The rest are yielded ordered
    by ``update_time`` descending, one dict per repository. Each dict holds
    the repository's searchable fields plus a ``tools_list`` entry with the
    tool dicts collected by ``load_one_dir`` from the repository's directory
    tree on disk.

    ``**kwargs`` is accepted but not used.
    """
    hgwcm = hgweb_config_manager
    hgwcm.hgweb_config_dir = hgweb_config_dir
    # Do not index deleted, deprecated, or "tool_dependency_definition" type repositories.
    q = (
        sa_session.query(model.Repository)
        .filter_by(deleted=False)
        .filter_by(deprecated=False)
        .order_by(model.Repository.update_time.desc())
    )
    q = q.filter(model.Repository.type != "tool_dependency_definition")
    for repo in q:
        associations = sa_session.query(model.RepositoryCategoryAssociation).filter(
            model.RepositoryCategoryAssociation.repository_id == repo.id
        )
        # Each association already exposes its category, so read the name
        # from it instead of issuing another Category query per association.
        categories = ",".join(rca.category.name.lower() for rca in associations)

        repo_owner_username = ""
        if repo.user_id is not None:
            # repo.user is needed below for the hgweb path anyway; reuse it
            # rather than querying User separately.
            repo_owner_username = repo.user.username.lower()

        last_updated = pretty_print_time_interval(repo.update_time)
        full_last_updated = repo.update_time.strftime("%Y-%m-%d %I:%M %p")

        # Load all changesets of the repo for lineage.
        # NOTE(review): repo.user is dereferenced unconditionally here, so a
        # repository without an owner would fail at this point - confirm that
        # user_id can never actually be NULL.
        repo_path = os.path.join(
            hgweb_config_dir, hgwcm.get_entry(os.path.join(hgweb_repo_prefix, repo.user.username, repo.name))
        )
        hg_repo = hg.repository(ui.ui(), repo_path.encode("utf-8"))
        lineage = [f"{unicodify(changeset)}:{unicodify(hg_repo[changeset])}" for changeset in hg_repo.changelog]
        repo_lineage = str(lineage)

        # Parse all the tools within repo for a separate index.
        tools_list = []
        path = os.path.join(file_path, *directory_hash_id(repo.id))
        path = os.path.join(path, "repo_%d" % repo.id)
        if os.path.exists(path):
            tools_list.extend(load_one_dir(path))
            for root, dirs, _files in os.walk(path):
                # Removing ".hg" in place stops os.walk from descending into it.
                if ".hg" in dirs:
                    dirs.remove(".hg")
                for dirname in dirs:
                    tools_list.extend(load_one_dir(os.path.join(root, dirname)))

        yield dict(
            id=unicodify(repo.id),
            name=unicodify(repo.name),
            description=unicodify(repo.description),
            long_description=unicodify(repo.long_description),
            homepage_url=unicodify(repo.homepage_url),
            remote_repository_url=unicodify(repo.remote_repository_url),
            repo_owner_username=unicodify(repo_owner_username),
            times_downloaded=unicodify(repo.times_downloaded or 0),
            approved=unicodify("no"),
            last_updated=unicodify(last_updated),
            full_last_updated=unicodify(full_last_updated),
            tools_list=tools_list,
            repo_lineage=unicodify(repo_lineage),
            categories=unicodify(categories),
        )
def debug_handler(path, exc_info):
    """
    Log a tool loading failure for ``path`` at debug level.

    By default the underlying tool parsing logs warnings for each exception.
    This is very chatty, hence this method changes it to debug level.
    """
    # Pass the path as a logging argument so the message is only formatted
    # when debug logging is actually enabled.
    log.debug("Failed to load tool with path %s.", path, exc_info=exc_info)
def load_one_dir(path):
    """
    Collect index metadata for the tools loaded from ``path``.

    Elements are loaded with ``load_tool_elements_from_path``, with load
    failures routed to ``debug_handler``. Only elements whose root tag is
    ``tool`` are kept. Each kept tool becomes a dict with ``id``, ``name`` and
    ``version`` taken from the root attributes, plus ``help`` and
    ``description`` when the corresponding child element exists.

    Returns a list of those dicts (empty when nothing was loaded).
    """
    found_tools = []
    loaded = load_tool_elements_from_path(path, load_exception_handler=debug_handler)
    if not loaded:
        return found_tools

    for entry in loaded:
        # entry[1] is the parsed document; presumably entry[0] is its path.
        tool_root = entry[1].getroot()
        if tool_root.tag != "tool":
            continue

        tool = {}
        # Optional child elements are recorded only when present.
        for child_tag in ("help", "description"):
            child = tool_root.find(child_tag)
            if child is not None:
                tool[child_tag] = unicodify(child.text)
        # Root attributes are always recorded, even when missing (None).
        for attr_name in ("id", "name", "version"):
            tool[attr_name] = unicodify(tool_root.attrib.get(attr_name))
        found_tools.append(tool)

    return found_tools