Source code for galaxy.model.search

"""
The GQL (Galaxy Query Language) search engine parsers a simple 'SQL-Like' query
syntax to obtain items from the Galaxy installations.
Rather then allow/force the user to do queries on the Galaxy schema, it uses
a small set of 'Views' which are simple table representations of complex galaxy ideas.
So while a history and its tags may exist in seperate tables in the real schema, in
GQL they exist in the same view

Example Queries:

select name, id, file_size from hda

select name from hda

select name, model_class from ldda

select * from history

select * from workflow

select id, name from history where name='Unnamed history'

select * from history where name='Unnamed history'

"""

import logging
import re
from json import dumps

import parsley
from sqlalchemy import and_
from sqlalchemy.orm import aliased

from galaxy.model import (
    ExtendedMetadata,
    ExtendedMetadataIndex,
    History,
    HistoryAnnotationAssociation,
    HistoryDatasetAssociation,
    HistoryDatasetAssociationTagAssociation,
    HistoryTagAssociation,
    Job,
    JobParameter,
    JobToInputDatasetAssociation,
    JobToInputLibraryDatasetAssociation,
    JobToOutputDatasetAssociation,
    Library,
    LibraryDataset,
    LibraryDatasetDatasetAssociation,
    LibraryFolder,
    Page,
    PageRevision,
    StoredWorkflow,
    StoredWorkflowTagAssociation
)
from galaxy.model.tool_shed_install import ToolVersion

log = logging.getLogger(__name__)


[docs]class ViewField(object): """ A ViewField defines a field in a view that filter operations can be applied to These filter operations are either handled with standard sqlalchemy filter calls, or passed to specialized handlers (such as when a table join would be needed to do the filtering) Parameters: sqlalchemy_field - Simple filtering using existing table columns, the argument is an sqlalchemy column that the right hand value will be compared against handler - Requires more specialized code to do filtering, usually requires a table join in order to process the conditional post_filter - Unable to do simple sqlalchemy based table filtering, filter is applied to loaded object Thus methods avalible to the object can be used for filtering. example: a library folder must climb its chain of parents to find out which library it belongs to """
[docs] def __init__(self, name, sqlalchemy_field=None, handler=None, post_filter=None, id_decode=False): self.name = name self.sqlalchemy_field = sqlalchemy_field self.handler = handler self.post_filter = post_filter self.id_decode = id_decode
[docs]class ViewQueryBaseClass(object): FIELDS = {} VIEW_NAME = "undefined"
[docs] def __init__(self): self.query = None self.do_query = False self.state = {} self.post_filter = []
[docs] def decode_query_ids(self, trans, conditional): if conditional.operator == 'and': self.decode_query_ids(trans, conditional.left) self.decode_query_ids(trans, conditional.right) else: left_base = conditional.left.split('.')[0] if left_base in self.FIELDS: field = self.FIELDS[left_base] if field.id_decode: conditional.right = trans.security.decode_id(conditional.right)
[docs] def filter(self, left, operator, right): if operator == 'and': self.filter(left.left, left.operator, left.right) self.filter(right.left, right.operator, right.right) else: left_base = left.split('.')[0] if left_base in self.FIELDS: self.do_query = True field = self.FIELDS[left_base] if field.sqlalchemy_field is not None: clazz, attribute = field.sqlalchemy_field sqlalchemy_field_value = getattr(clazz, attribute) if operator == "=": self.query = self.query.filter(sqlalchemy_field_value == right) elif operator == "!=": self.query = self.query.filter(sqlalchemy_field_value != right) elif operator == "like": self.query = self.query.filter(sqlalchemy_field_value.like(right)) else: raise GalaxyParseError("Invalid comparison operator: %s" % (operator)) elif field.handler is not None: field.handler(self, left, operator, right) elif field.post_filter is not None: self.post_filter.append([field.post_filter, left, operator, right]) else: raise GalaxyParseError("Unable to filter on field: %s" % (left)) else: raise GalaxyParseError("Unknown field: %s" % (left))
[docs] def search(self, trans): raise GalaxyParseError("Unable to search view: %s" % (self.VIEW_NAME))
[docs] def get_results(self, force_query=False): if self.query is not None and (force_query or self.do_query): for row in self.query.distinct().all(): selected = True for f in self.post_filter: if not f[0](row, f[1], f[2], f[3]): selected = False if selected: yield row
################## # Library Dataset Searching ##################
[docs]def library_extended_metadata_filter(view, left, operator, right): view.do_query = True if 'extended_metadata_joined' not in view.state: view.query = view.query.join(ExtendedMetadata) view.state['extended_metadata_joined'] = True alias = aliased(ExtendedMetadataIndex) field = "/%s" % ("/".join(left.split(".")[1:])) view.query = view.query.filter( and_( ExtendedMetadata.id == alias.extended_metadata_id, alias.path == field, alias.value == str(right) ) )
[docs]def ldda_parent_library_filter(item, left, operator, right): if operator == '=': return right == item.library_dataset.folder.parent_library.id elif operator == '!=': return right != item.library_dataset.folder.parent_library.id raise GalaxyParseError("Invalid comparison operator: %s" % (operator))
[docs]class LibraryDatasetDatasetView(ViewQueryBaseClass): VIEW_NAME = "library_dataset_dataset" FIELDS = { 'extended_metadata': ViewField('extended_metadata', handler=library_extended_metadata_filter), 'name': ViewField('name', sqlalchemy_field=(LibraryDatasetDatasetAssociation, "name")), 'id': ViewField('id', sqlalchemy_field=(LibraryDatasetDatasetAssociation, 'id'), id_decode=True), 'deleted': ViewField('deleted', sqlalchemy_field=(LibraryDatasetDatasetAssociation, "deleted")), 'parent_library_id': ViewField('parent_library_id', id_decode=True, post_filter=ldda_parent_library_filter), 'data_type': ViewField('data_type', sqlalchemy_field=(LibraryDatasetDatasetAssociation, "extension")), }
[docs] def search(self, trans): self.query = trans.sa_session.query(LibraryDatasetDatasetAssociation)
################## # Library Searching ##################
[docs]class LibraryView(ViewQueryBaseClass): VIEW_NAME = "library" FIELDS = { 'name': ViewField('name', sqlalchemy_field=(Library, "name")), 'id': ViewField('id', sqlalchemy_field=(Library, 'id'), id_decode=True), 'deleted': ViewField('deleted', sqlalchemy_field=(Library, "deleted")), }
[docs] def search(self, trans): self.query = trans.sa_session.query(Library)
################## # Library Folder Searching ##################
[docs]def library_folder_parent_library_id_filter(item, left, operator, right): if operator == '=': return item.parent_library.id == right if operator == '!=': return item.parent_library.id != right raise GalaxyParseError("Invalid comparison operator: %s" % (operator))
[docs]def library_path_filter(item, left, operator, right): lpath = "/" + "/".join(item.library_path) if operator == '=': return lpath == right if operator == '!=': return lpath != right raise GalaxyParseError("Invalid comparison operator: %s" % (operator))
[docs]class LibraryFolderView(ViewQueryBaseClass): VIEW_NAME = "library_folder" FIELDS = { 'name': ViewField('name', sqlalchemy_field=(LibraryFolder, "name")), 'id': ViewField('id', sqlalchemy_field=(LibraryFolder, "id"), id_decode=True), 'parent_id': ViewField('parent_id', sqlalchemy_field=(LibraryFolder, "parent_id"), id_decode=True), 'parent_library_id': ViewField('parent_library_id', post_filter=library_folder_parent_library_id_filter, id_decode=True), 'library_path': ViewField('library_path', post_filter=library_path_filter) }
[docs] def search(self, trans): self.query = trans.sa_session.query(LibraryFolder)
################## # Library Dataset Searching ##################
[docs]def library_dataset_name_filter(item, left, operator, right): if operator == '=': return item.name == right if operator == '!=': return item.name != right raise GalaxyParseError("Invalid comparison operator: %s" % (operator))
[docs]class LibraryDatasetView(ViewQueryBaseClass): VIEW_NAME = "library_dataset" FIELDS = { 'name': ViewField('name', post_filter=library_dataset_name_filter), 'id': ViewField('id', sqlalchemy_field=(LibraryDataset, "id"), id_decode=True), 'folder_id': ViewField('folder_id', sqlalchemy_field=(LibraryDataset, "folder_id"), id_decode=True) }
[docs] def search(self, trans): self.query = trans.sa_session.query(LibraryDataset)
################## # Tool Searching ##################
[docs]class ToolView(ViewQueryBaseClass): VIEW_NAME = "tool" FIELDS = { 'tool_id': ViewField('name', sqlalchemy_field=(ToolVersion, "tool_id")), 'id': ViewField('id', sqlalchemy_field=(ToolVersion, "id")), }
[docs] def search(self, trans): self.query = trans.install_model.context.query(ToolVersion)
################## # History Dataset Searching ##################
[docs]def history_dataset_handle_tag(view, left, operator, right): if operator == "=": view.do_query = True # aliasing the tag association table, so multiple links to different tags can be formed during a single query tag_table = aliased(HistoryDatasetAssociationTagAssociation) view.query = view.query.filter( HistoryDatasetAssociation.id == tag_table.history_dataset_association_id ) tmp = right.split(":") view.query = view.query.filter(tag_table.user_tname == tmp[0]) if len(tmp) > 1: view.query = view.query.filter(tag_table.user_value == tmp[1]) else: raise GalaxyParseError("Invalid comparison operator: %s" % (operator))
[docs]def history_dataset_extended_metadata_filter(view, left, operator, right): view.do_query = True if 'extended_metadata_joined' not in view.state: view.query = view.query.join(ExtendedMetadata) view.state['extended_metadata_joined'] = True alias = aliased(ExtendedMetadataIndex) field = "/%s" % ("/".join(left.split(".")[1:])) view.query = view.query.filter( and_( ExtendedMetadata.id == alias.extended_metadata_id, alias.path == field, alias.value == str(right) ) )
[docs]class HistoryDatasetView(ViewQueryBaseClass): DOMAIN = "history_dataset" FIELDS = { 'name': ViewField('name', sqlalchemy_field=(HistoryDatasetAssociation, "name")), 'id': ViewField('id', sqlalchemy_field=(HistoryDatasetAssociation, "id"), id_decode=True), 'history_id': ViewField('history_id', sqlalchemy_field=(HistoryDatasetAssociation, "history_id"), id_decode=True), 'tag': ViewField("tag", handler=history_dataset_handle_tag), 'copied_from_ldda_id': ViewField("copied_from_ldda_id", sqlalchemy_field=(HistoryDatasetAssociation, "copied_from_library_dataset_dataset_association_id"), id_decode=True), 'copied_from_hda_id': ViewField("copied_from_hda_id", sqlalchemy_field=(HistoryDatasetAssociation, "copied_from_history_dataset_association_id"), id_decode=True), 'deleted': ViewField('deleted', sqlalchemy_field=(HistoryDatasetAssociation, "deleted")), 'extended_metadata': ViewField('extended_metadata', handler=history_dataset_extended_metadata_filter) }
[docs] def search(self, trans): self.query = trans.sa_session.query(HistoryDatasetAssociation)
################## # History Searching ##################
[docs]def history_handle_tag(view, left, operator, right): if operator == "=": view.do_query = True tag_table = aliased(HistoryTagAssociation) view.query = view.query.filter( History.id == tag_table.history_id ) tmp = right.split(":") view.query = view.query.filter(tag_table.user_tname == tmp[0]) if len(tmp) > 1: view.query = view.query.filter(tag_table.user_value == tmp[1]) else: raise GalaxyParseError("Invalid comparison operator: %s" % (operator))
[docs]def history_handle_annotation(view, left, operator, right): if operator == "=": view.do_query = True view.query = view.query.filter(and_( HistoryAnnotationAssociation.history_id == History.id, HistoryAnnotationAssociation.annotation == right )) elif operator == "like": view.do_query = True view.query = view.query.filter(and_( HistoryAnnotationAssociation.history_id == History.id, HistoryAnnotationAssociation.annotation.like(right) )) else: raise GalaxyParseError("Invalid comparison operator: %s" % (operator))
[docs]class HistoryView(ViewQueryBaseClass): DOMAIN = "history" FIELDS = { 'name': ViewField('name', sqlalchemy_field=(History, "name")), 'id': ViewField('id', sqlalchemy_field=(History, "id"), id_decode=True), 'tag': ViewField("tag", handler=history_handle_tag), 'annotation': ViewField("annotation", handler=history_handle_annotation), 'deleted': ViewField('deleted', sqlalchemy_field=(History, "deleted")) }
[docs] def search(self, trans): self.query = trans.sa_session.query(History)
################## # Workflow Searching ##################
[docs]def workflow_tag_handler(view, left, operator, right): if operator == "=": view.do_query = True view.query = view.query.filter( StoredWorkflow.id == StoredWorkflowTagAssociation.stored_workflow_id ) tmp = right.split(":") view.query = view.query.filter(StoredWorkflowTagAssociation.user_tname == tmp[0]) if len(tmp) > 1: view.query = view.query.filter(StoredWorkflowTagAssociation.user_value == tmp[1]) else: raise GalaxyParseError("Invalid comparison operator: %s" % (operator))
[docs]class WorkflowView(ViewQueryBaseClass): DOMAIN = "workflow" FIELDS = { 'name': ViewField('name', sqlalchemy_field=(StoredWorkflow, "name")), 'id': ViewField('id', sqlalchemy_field=(StoredWorkflow, "id"), id_decode=True), 'tag': ViewField('tag', handler=workflow_tag_handler), 'deleted': ViewField('deleted', sqlalchemy_field=(StoredWorkflow, "deleted")), }
[docs] def search(self, trans): self.query = trans.sa_session.query(StoredWorkflow)
################## # Job Searching ##################
[docs]def job_param_filter(view, left, operator, right): view.do_query = True alias = aliased(JobParameter) param_name = re.sub(r'^param.', '', left) view.query = view.query.filter( and_( Job.id == alias.job_id, alias.name == param_name, alias.value == dumps(right) ) )
[docs]def job_input_hda_filter(view, left, operator, right): view.do_query = True alias = aliased(JobToInputDatasetAssociation) param_name = re.sub(r'^input_hda.', '', left) view.query = view.query.filter( and_( Job.id == alias.job_id, alias.name == param_name, alias.dataset_id == right ) )
[docs]def job_input_ldda_filter(view, left, operator, right): view.do_query = True alias = aliased(JobToInputLibraryDatasetAssociation) param_name = re.sub(r'^input_ldda.', '', left) view.query = view.query.filter( and_( Job.id == alias.job_id, alias.name == param_name, alias.ldda_id == right ) )
[docs]def job_output_hda_filter(view, left, operator, right): view.do_query = True alias = aliased(JobToOutputDatasetAssociation) param_name = re.sub(r'^output_hda.', '', left) view.query = view.query.filter( and_( Job.id == alias.job_id, alias.name == param_name, alias.dataset_id == right ) )
[docs]class JobView(ViewQueryBaseClass): DOMAIN = "job" FIELDS = { 'tool_name': ViewField('tool_name', sqlalchemy_field=(Job, "tool_id")), 'state': ViewField('state', sqlalchemy_field=(Job, "state")), 'param': ViewField('param', handler=job_param_filter), 'input_ldda': ViewField('input_ldda', handler=job_input_ldda_filter, id_decode=True), 'input_hda': ViewField('input_hda', handler=job_input_hda_filter, id_decode=True), 'output_hda': ViewField('output_hda', handler=job_output_hda_filter, id_decode=True) }
[docs] def search(self, trans): self.query = trans.sa_session.query(Job)
################## # Page Searching ##################
[docs]class PageView(ViewQueryBaseClass): DOMAIN = "page" FIELDS = { 'id': ViewField('id', sqlalchemy_field=(Page, "id"), id_decode=True), 'slug': ViewField('slug', sqlalchemy_field=(Page, "slug")), 'title': ViewField('title', sqlalchemy_field=(Page, "title")), 'deleted': ViewField('deleted', sqlalchemy_field=(Page, "deleted")) }
[docs] def search(self, trans): self.query = trans.sa_session.query(Page)
################## # Page Revision Searching ##################
[docs]class PageRevisionView(ViewQueryBaseClass): DOMAIN = "page_revision" FIELDS = { 'id': ViewField('id', sqlalchemy_field=(PageRevision, "id"), id_decode=True), 'title': ViewField('title', sqlalchemy_field=(PageRevision, "title")), 'page_id': ViewField('page_id', sqlalchemy_field=(PageRevision, "page_id"), id_decode=True), }
[docs] def search(self, trans): self.query = trans.sa_session.query(PageRevision)
# The view mapping takes a user's name for a table and maps it to a View class # that will handle queries. view_mapping = { 'library': LibraryView, 'library_folder': LibraryFolderView, 'library_dataset_dataset': LibraryDatasetDatasetView, 'library_dataset': LibraryDatasetView, 'lda': LibraryDatasetView, 'ldda': LibraryDatasetDatasetView, 'history_dataset': HistoryDatasetView, 'hda': HistoryDatasetView, 'history': HistoryView, 'workflow': WorkflowView, 'tool': ToolView, 'job': JobView, 'page': PageView, 'page_revision': PageRevisionView, } # The GQL gramar is defined in Parsley syntax ( https://parsley.readthedocs.io/ ) gqlGrammar = r""" expr = 'select' bs field_desc:f bs 'from' bs word:t ( bs 'where' bs conditional:c ws -> GalaxyQuery(f,t,c) | ws -> GalaxyQuery(f, t, None) ) bs = ' '+ ws = ' '* field_desc = ( '*' -> ['*'] | field_list ) field_list = field_name:x ( ws ',' ws field_list:y -> [x] + y | -> [x] ) conditional = logic_statement:x ( bs 'and' bs conditional:y -> GalaxyQueryAnd(x,y) | -> x ) word = alphanum+:x -> "".join(x) field_name = word:x ( '.' quote_word:y -> x + "." + y |-> x ) alphanum = anything:x ?(re.search(r'\w', x) is not None) -> x logic_statement = field_name:left ws comparison:comp ws value_word:right -> GalaxyQueryComparison(left, comp, right) value_word = ( 'false' -> False | 'False' -> False | 'true' -> True | 'True' -> True | 'None' -> None | quote_word ) comparison = ( '=' -> '=' | '>' -> '>' | '<' -> '<' | '!=' -> '!=' | '>=' -> '>=' | '<=' -> '<=' | 'like' -> 'like' ) quote_word = "'" not_quote*:x "'" -> "".join(x) not_quote = anything:x ?(x != "'") -> x not_dquote = anything:x ?(x != '"') -> x """
[docs]class GalaxyQuery(object): """ This class represents a data structure of a compiled GQL query """
[docs] def __init__(self, field_list, table_name, conditional): self.field_list = field_list self.table_name = table_name self.conditional = conditional
[docs]class GalaxyQueryComparison(object): """ This class represents the data structure of the comparison arguments of a compiled GQL query (ie where name='Untitled History') """
[docs] def __init__(self, left, operator, right): self.left = left self.operator = operator self.right = right
[docs]class GalaxyQueryAnd(object): """ This class represents the data structure of the comparison arguments of a compiled GQL query (ie where name='Untitled History') """
[docs] def __init__(self, left, right): self.left = left self.operator = 'and' self.right = right
[docs]class GalaxyParseError(Exception): pass
[docs]class SearchQuery(object):
[docs] def __init__(self, view, query): self.view = view self.query = query
[docs] def decode_query_ids(self, trans): if self.query.conditional is not None: self.view.decode_query_ids(trans, self.query.conditional)
[docs] def process(self, trans): self.view.search(trans) if self.query.conditional is not None: self.view.filter( self.query.conditional.left, self.query.conditional.operator, self.query.conditional.right ) return self.view.get_results(True)
[docs] def item_to_api_value(self, item): r = item.to_dict(view='element') if self.query.field_list.count("*"): return r o = {} for a in r: if a in self.query.field_list: o[a] = r[a] return o
[docs]class GalaxySearchEngine(object): """ Primary class for searching. Parses GQL (Galaxy Query Language) queries and returns a 'SearchQuery' class """
[docs] def __init__(self): self.parser = parsley.makeGrammar(gqlGrammar, { 're': re, 'GalaxyQuery': GalaxyQuery, 'GalaxyQueryComparison': GalaxyQueryComparison, 'GalaxyQueryAnd': GalaxyQueryAnd })
[docs] def query(self, query_text): q = self.parser(query_text).expr() if q.table_name in view_mapping: view = view_mapping[q.table_name]() return SearchQuery(view, q) raise GalaxyParseError("No such table %s" % (q.table_name))