Source code for

Module for building and searching the index of tools
installed within this Galaxy. Before changing index-building
or searching related parts it is deeply recommended to read
through the library docs at
import logging
import os
import re

from whoosh import (
from whoosh.analysis import StandardAnalyzer
from whoosh.fields import (
from whoosh.qparser import (
from whoosh.scoring import (
from whoosh.writing import AsyncWriter

from galaxy.util import ExecutionTimer
from galaxy.web.framework.helpers import to_unicode

log = logging.getLogger(__name__)

[docs]def get_or_create_index(index_dir, schema): if not os.path.exists(index_dir): os.makedirs(index_dir) if index.exists_in(index_dir): idx = index.open_dir(index_dir) try: assert idx.schema == schema return idx except AssertionError: log.warning("Index at '%s' uses outdated schema, creating new index", index_dir) return index.create_in(index_dir, schema=schema)
[docs]class ToolBoxSearch: """ Support searching tools in a toolbox. This implementation uses the Whoosh search library. """
[docs] def __init__(self, toolbox, index_dir=None, index_help=True): self.schema = Schema(id=ID(stored=True, unique=True), old_id=ID, stub=KEYWORD, name=TEXT(analyzer=analysis.SimpleAnalyzer()), description=TEXT, section=TEXT, help=TEXT, labels=KEYWORD) self.rex = analysis.RegexTokenizer() self.index_dir = index_dir self.toolbox = toolbox self.index = self._index_setup() # We keep track of how many times the tool index has been rebuilt. # We start at -1, so that after the first index the count is at 0, # which is the same as the toolbox reload count. This way we can skip # reindexing if the index count is equal to the toolbox reload count. self.index_count = -1
def _index_setup(self): return get_or_create_index(index_dir=self.index_dir, schema=self.schema)
[docs] def build_index(self, tool_cache, index_help=True): """ Prepare search index for tools loaded in toolbox. Use `tool_cache` to determine which tools need indexing and which tools should be expired. """ log.debug('Starting to build toolbox index.') self.index_count += 1 execution_timer = ExecutionTimer() with self.index.reader() as reader: # Index ocasionally contains empty stored fields indexed_tool_ids = {f['id'] for f in reader.all_stored_fields() if f} tool_ids_to_remove = (indexed_tool_ids - set(tool_cache._tool_paths_by_id.keys())).union(tool_cache._removed_tool_ids) for indexed_tool_id in indexed_tool_ids: indexed_tool = tool_cache.get_tool_by_id(indexed_tool_id) if indexed_tool: if indexed_tool.is_latest_version: continue latest_version = indexed_tool.latest_version if latest_version and latest_version.hidden: continue tool_ids_to_remove.add(indexed_tool_id) with AsyncWriter(self.index) as writer: for tool_id in tool_ids_to_remove: writer.delete_by_term('id', tool_id) for tool_id in tool_cache._new_tool_ids - indexed_tool_ids: tool = tool_cache.get_tool_by_id(tool_id) if tool and tool.is_latest_version: if tool.hidden: # we check if there is an older tool we can return if tool.lineage: for tool_version in reversed(tool.lineage.get_versions()): tool = tool_cache.get_tool_by_id( if tool and not tool.hidden: tool_id = break else: continue else: continue add_doc_kwds = self._create_doc(tool_id=tool_id, tool=tool, index_help=index_help) writer.update_document(**add_doc_kwds) log.debug("Toolbox index finished %s", execution_timer)
def _create_doc(self, tool_id, tool, index_help=True): # Do not add data managers to the public index if tool.tool_type == 'manage_data': return {} add_doc_kwds = { "id": tool_id, "description": to_unicode(tool.description), "section": to_unicode(tool.get_panel_section()[1] if len(tool.get_panel_section()) == 2 else ''), "help": to_unicode("") } if'-') != -1: # Replace hyphens, since they are wildcards in Whoosh causing false positives add_doc_kwds['name'] = (' ').join(token.text for token in self.rex(to_unicode( else: add_doc_kwds['name'] = to_unicode( if tool.guid: # Create a stub consisting of owner, repo, and tool from guid slash_indexes = [m.start() for m in re.finditer('/', tool.guid)] id_stub = tool.guid[(slash_indexes[1] + 1): slash_indexes[4]] add_doc_kwds['stub'] = (' ').join(token.text for token in self.rex(to_unicode(id_stub))) else: add_doc_kwds['stub'] = to_unicode(id) if tool.labels: add_doc_kwds['labels'] = to_unicode(" ".join(tool.labels)) if index_help: raw_help = tool.raw_help if raw_help: try: add_doc_kwds['help'] = to_unicode(raw_help) except Exception: # Don't fail to build index just because help can't be converted. pass return add_doc_kwds
[docs] def search(self, q, tool_name_boost, tool_id_boost, tool_section_boost, tool_description_boost, tool_label_boost, tool_stub_boost, tool_help_boost, tool_search_limit, tool_enable_ngram_search, tool_ngram_minsize, tool_ngram_maxsize): """ Perform search on the in-memory index. Weight in the given boosts. """ # Change field boosts for searcher self.searcher = self.index.searcher( weighting=MultiWeighting(BM25F(), old_id=BM25F(old_id_B=float(tool_id_boost)), name=BM25F(name_B=float(tool_name_boost)), section=BM25F(section_B=float(tool_section_boost)), description=BM25F(description_B=float(tool_description_boost)), labels=BM25F(labels_B=float(tool_label_boost)), stub=BM25F(stub_B=float(tool_stub_boost)), help=BM25F(help_B=float(tool_help_boost)) ) ) # Use OrGroup to change the default operation for joining multiple terms to logical OR. # This means e.g. for search 'bowtie of king arthur' a document that only has 'bowtie' will be a match. # # However this changes scoring i.e. searching 'bowtie of king arthur' a document with 'arthur arthur arthur' # would have a higher score than a document with 'bowtie arthur' which is usually unexpected for a user. # Hence we introduce a bonus on multi-hits using the 'factory()' method using a scaling factor between 0-1. # # Adding the FuzzyTermPlugin to account for misspellings and typos, using a max distance of 2 og = OrGroup.factory(0.9) self.parser = MultifieldParser(['name', 'old_id', 'description', 'section', 'help', 'labels', 'stub'], schema=self.schema, group=og) cleaned_query = q.lower() if tool_enable_ngram_search is True: rval = self._search_ngrams(cleaned_query, tool_ngram_minsize, tool_ngram_maxsize, tool_search_limit) return rval else: cleaned_query = ' '.join(token.text for token in self.rex(cleaned_query)) # Use asterisk Whoosh wildcard so e.g. 'bow' easily matches 'bowtie' parsed_query = self.parser.parse('*' + cleaned_query + '*') hits =, limit=float(tool_search_limit), sortedby='') return [hit['id'] for hit in hits]
def _search_ngrams(self, cleaned_query, tool_ngram_minsize, tool_ngram_maxsize, tool_search_limit): """ Break tokens into ngrams and search on those instead. This should make searching more resistant to typos and unfinished words. See docs at """ hits_with_score = {} token_analyzer = StandardAnalyzer() | analysis.NgramFilter(minsize=int(tool_ngram_minsize), maxsize=int(tool_ngram_maxsize)) ngrams = [token.text for token in token_analyzer(cleaned_query)] for query in ngrams: # Get the tool list with respective scores for each qgram curr_hits ='*' + query + '*'), limit=float(tool_search_limit)) for i, curr_hit in enumerate(curr_hits): is_present = False for prev_hit in hits_with_score: # Check if the tool appears again for the next qgram search if curr_hit['id'] == prev_hit: is_present = True # Add the current score with the previous one if the # tool appears again for the next qgram hits_with_score[prev_hit] = curr_hits.score(i) + hits_with_score[prev_hit] # Add the tool if not present to the collection with its score if not is_present: hits_with_score[curr_hit['id']] = curr_hits.score(i) # Sort the results based on aggregated BM25 score in decreasing order of scores hits_with_score = sorted(hits_with_score.items(), key=lambda x: x[1], reverse=True) # Return the tool ids return [item[0] for item in hits_with_score[0:int(tool_search_limit)]]