Warning
This document is for an in-development version of Galaxy. You can alternatively view this page in the latest release if it exists or view the top of the latest release's documentation.
Source code for tool_shed.util.shed_index
import logging
import os
from mercurial import hg, ui
from whoosh.writing import AsyncWriter
import tool_shed.webapp.model.mapping as ts_mapping
from galaxy.tool_util.loader_directory import load_tool_elements_from_path
from galaxy.tools.search import get_or_create_index
from galaxy.util import (
directory_hash_id,
ExecutionTimer,
pretty_print_time_interval,
unicodify
)
from tool_shed.util.hgweb_config import hgweb_config_manager
from tool_shed.webapp import model
from tool_shed.webapp.search.repo_search import schema as repo_schema
from tool_shed.webapp.search.tool_search import schema as tool_schema
# Module-level logger, named after this module; used by the indexing functions below.
log = logging.getLogger(__name__)
def _get_or_create_index(whoosh_index_dir):
    """
    Open (creating if necessary) the repository and tool search indexes.

    The repository index lives directly in ``whoosh_index_dir`` and the tool
    index in its ``tools`` subdirectory. Missing directories are created
    before the indexes are opened.

    Returns a ``(repo_index, tool_index)`` tuple.
    """
    tools_subdir = os.path.join(whoosh_index_dir, 'tools')
    # Parent directory first, so the child is created inside it.
    for directory in (whoosh_index_dir, tools_subdir):
        if not os.path.exists(directory):
            os.makedirs(directory)
    repo_index = get_or_create_index(whoosh_index_dir, repo_schema)
    tool_index = get_or_create_index(tools_subdir, tool_schema)
    return repo_index, tool_index
def build_index(whoosh_index_dir, file_path, hgweb_config_dir, dburi, **kwargs):
    """
    Build the repository and the tool search indexes in a single pass.

    Repositories come from ``get_repos``, which yields them most recently
    updated first. Indexing stops at the first repository whose stored
    ``full_last_updated`` value matches the freshly computed one.

    Extra keyword arguments are forwarded to ``get_repos``.

    Returns a ``(repos_indexed, tools_indexed)`` tuple.
    """
    mapping = ts_mapping.init(file_path, dburi, engine_options={}, create_tables=False)
    session = mapping.context.current
    repo_index, tool_index = _get_or_create_index(whoosh_index_dir)
    repo_writer = AsyncWriter(repo_index)
    tool_writer = AsyncWriter(tool_index)
    repo_count = 0
    tool_count = 0
    timer = ExecutionTimer()
    with repo_index.searcher() as searcher:
        for repo_doc in get_repos(session, file_path, hgweb_config_dir, **kwargs):
            tool_docs = repo_doc.pop('tools_list')
            doc_id = repo_doc['id']
            previous = searcher.document(id=doc_id)
            if previous:
                if previous['full_last_updated'] == repo_doc.get('full_last_updated'):
                    # Repos arrive sorted by update time, so everything
                    # after this one is already up to date in the index.
                    break
                # The repo changed since it was indexed: drop the stale document.
                repo_writer.delete_by_term('id', doc_id)
            repo_writer.add_document(**repo_doc)
            # Tools are stored in their own, separate index.
            for tool_doc in tool_docs:
                tool_writer.add_document(**tool_doc)
            tool_count += len(tool_docs)
            repo_count += 1
    tool_writer.commit()
    repo_writer.commit()
    log.info("Indexed repos: %s, tools: %s", repo_count, tool_count)
    log.info("Toolbox index finished %s", timer)
    return repo_count, tool_count
def _get_repo_lineage(repo_path):
    """
    Open the Mercurial repository at ``repo_path`` and return one string per
    changelog entry: the entry and its changeset context joined by ``:``.
    """
    hg_repo = hg.repository(ui.ui(), repo_path.encode('utf-8'))
    return [unicodify(changeset) + ":" + unicodify(hg_repo[changeset]) for changeset in hg_repo.changelog]


def _load_repo_tools(path):
    """
    Collect tool documents from ``path`` and from every directory below it,
    skipping Mercurial's ``.hg`` metadata directory.

    Returns an empty list when ``path`` does not exist.
    """
    tools_list = []
    if os.path.exists(path):
        tools_list.extend(load_one_dir(path))
        for root, dirs, files in os.walk(path):
            # Pruning ``dirs`` in place keeps os.walk from descending into .hg.
            if '.hg' in dirs:
                dirs.remove('.hg')
            for dirname in dirs:
                tools_list.extend(load_one_dir(os.path.join(root, dirname)))
    return tools_list


def get_repos(sa_session, file_path, hgweb_config_dir, **kwargs):
    """
    Load repos from DB and included tools from .xml configs.

    Generator yielding one dict per repository, most recently updated first.
    Deleted, deprecated and "tool_dependency_definition" type repositories
    are not yielded.

    Each dict holds the text fields ``id``, ``name``, ``description``,
    ``long_description``, ``homepage_url``, ``remote_repository_url``,
    ``repo_owner_username``, ``times_downloaded``, ``approved``,
    ``last_updated``, ``full_last_updated``, ``repo_lineage`` and
    ``categories``, plus ``tools_list``: a list of tool dicts as produced by
    ``load_one_dir``.

    ``**kwargs`` is accepted but not used by this function.
    """
    hgwcm = hgweb_config_manager
    hgwcm.hgweb_config_dir = hgweb_config_dir
    # Do not index deleted, deprecated, or "tool_dependency_definition" type repositories.
    q = sa_session.query(model.Repository).filter_by(deleted=False).filter_by(deprecated=False).order_by(model.Repository.update_time.desc())
    q = q.filter(model.Repository.type != 'tool_dependency_definition')
    for repo in q:
        rcas = sa_session.query(model.RepositoryCategoryAssociation).filter(
            model.RepositoryCategoryAssociation.repository_id == repo.id)
        # The association already exposes its category, so read the name from
        # it instead of issuing another Category query per association.
        categories = ",".join(rca.category.name.lower() for rca in rcas)

        repo_owner_username = ''
        if repo.user_id is not None:
            user = sa_session.query(model.User).filter(model.User.id == repo.user_id).one()
            repo_owner_username = user.username.lower()

        # A single approving review marks the repository as approved.
        approved = 'yes' if any(review.approved == 'yes' for review in repo.reviews) else 'no'

        last_updated = pretty_print_time_interval(repo.update_time)
        # Minute-resolution timestamp.
        full_last_updated = repo.update_time.strftime("%Y-%m-%d %I:%M %p")

        # Load all changesets of the repo for lineage.
        repo_path = os.path.join(hgweb_config_dir, hgwcm.get_entry(os.path.join("repos", repo.user.username, repo.name)))
        repo_lineage = str(_get_repo_lineage(repo_path))

        # Parse all the tools within repo for a separate index.
        path = os.path.join(file_path, *directory_hash_id(repo.id))
        path = os.path.join(path, "repo_%d" % repo.id)
        tools_list = _load_repo_tools(path)

        yield dict(id=unicodify(repo.id),
                   name=unicodify(repo.name),
                   description=unicodify(repo.description),
                   long_description=unicodify(repo.long_description),
                   homepage_url=unicodify(repo.homepage_url),
                   remote_repository_url=unicodify(repo.remote_repository_url),
                   repo_owner_username=unicodify(repo_owner_username),
                   times_downloaded=unicodify(repo.times_downloaded or 0),
                   approved=unicodify(approved),
                   last_updated=unicodify(last_updated),
                   full_last_updated=unicodify(full_last_updated),
                   tools_list=tools_list,
                   repo_lineage=unicodify(repo_lineage),
                   categories=unicodify(categories))
def debug_handler(path, exc_info):
    """
    By default the underlying tool parsing logs warnings for each exception.
    This is very chatty hence this method changes it to debug level.

    :param path: path of the tool file that failed to load
    :param exc_info: exception information forwarded to the logger
    """
    # Pass ``path`` as a logging argument: the message is only formatted when
    # DEBUG is enabled, and a tuple-valued ``path`` cannot break the formatting.
    log.debug("Failed to load tool with path %s.", path, exc_info=exc_info)
def load_one_dir(path):
    """
    Load the tool configs found at ``path`` and describe each as a dict.

    Only documents whose root element is ``<tool>`` are kept. Each dict holds
    the root's ``id``, ``name`` and ``version`` attributes, plus the text of
    the ``help`` and ``description`` child elements when they are present.
    Load failures are reported through ``debug_handler``.

    Returns a list of such dicts (empty when nothing was loaded).
    """
    found_tools = []
    loaded = load_tool_elements_from_path(path, load_exception_handler=debug_handler)
    for loaded_item in loaded or ():
        # The parsed document is the second member of each loaded item.
        xml_root = loaded_item[1].getroot()
        if xml_root.tag != 'tool':
            continue
        tool_doc = {}
        help_elem = xml_root.find('help')
        if help_elem is not None:
            tool_doc['help'] = unicodify(help_elem.text)
        description_elem = xml_root.find('description')
        if description_elem is not None:
            tool_doc['description'] = unicodify(description_elem.text)
        attributes = xml_root.attrib
        tool_doc['id'] = unicodify(attributes.get('id'))
        tool_doc['name'] = unicodify(attributes.get('name'))
        tool_doc['version'] = unicodify(attributes.get('version'))
        found_tools.append(tool_doc)
    return found_tools