Source code for galaxy.model.search

"""
The GQL (Galaxy Query Language) search engine parses a simple 'SQL-like' query
syntax to obtain items from a Galaxy installation.
Rather than allow/force the user to do queries on the Galaxy schema, it uses
a small set of 'Views', which are simple table representations of complex Galaxy ideas.
So while a history and its tags may exist in separate tables in the real schema, in
GQL they exist in the same view.

Example Queries:

select name, id, file_size from hda

select name from hda

select name, model_class from ldda

select * from history

select * from workflow

select id, name from history where name='Unnamed history'

select * from history where name='Unnamed history'

"""

import logging
import re
from json import dumps
from typing import Dict

import parsley
from sqlalchemy import and_
from sqlalchemy.orm import aliased

from galaxy.model import (
    ExtendedMetadata,
    ExtendedMetadataIndex,
    History,
    HistoryAnnotationAssociation,
    HistoryDatasetAssociation,
    HistoryDatasetAssociationTagAssociation,
    HistoryTagAssociation,
    Job,
    JobParameter,
    JobToInputDatasetAssociation,
    JobToInputLibraryDatasetAssociation,
    JobToOutputDatasetAssociation,
    Library,
    LibraryDataset,
    LibraryDatasetDatasetAssociation,
    LibraryFolder,
    Page,
    PageRevision,
    StoredWorkflow,
    StoredWorkflowTagAssociation,
)
from galaxy.model.tool_shed_install import ToolVersion

log = logging.getLogger(__name__)


class ViewField:
    """
    A ViewField describes one field of a view that filter operations can be
    applied to.

    A filter on the field is carried out by whichever of the following
    strategies was supplied (checked in this order by the view's ``filter``):

    sqlalchemy_field - Simple filtering on an existing table column. The value
                       is a ``(model class, attribute name)`` pair; the column
                       is looked up with ``getattr`` and compared against the
                       right hand value of the conditional.
    handler          - A callable ``handler(view, left, operator, right)`` for
                       filtering that needs more specialized code, usually a
                       table join.
    post_filter      - A callable ``post_filter(item, left, operator, right)``
                       applied to each loaded object when the filtering cannot
                       be expressed as a simple table filter. Methods available
                       on the object can therefore be used.
                       Example: a library folder must climb its chain of
                       parents to find out which library it belongs to.

    id_decode marks fields whose right hand value is an encoded id that has to
    be decoded before it is compared.
    """

    def __init__(self, name, sqlalchemy_field=None, handler=None, post_filter=None, id_decode=False):
        # Identity of the field and whether its values are encoded ids.
        self.name = name
        self.id_decode = id_decode
        # The three alternative filtering strategies (normally only one is set).
        self.sqlalchemy_field = sqlalchemy_field
        self.handler = handler
        self.post_filter = post_filter
class ViewQueryBaseClass:
    """
    Base class of all views.

    Subclasses provide the ``FIELDS`` that may appear in a ``where`` clause and
    a ``search`` method that stores the initial query in ``self.query``.
    """

    FIELDS: Dict[str, ViewField] = {}
    VIEW_NAME = "undefined"

    def __init__(self):
        self.query = None
        # Set once a filter has been applied; get_results only runs the query
        # when this is set or when explicitly forced.
        self.do_query = False
        # Scratch space for handlers (e.g. to remember that a join was added).
        self.state = {}
        # [callable, left, operator, right] entries applied to every loaded row.
        self.post_filter = []

    def decode_query_ids(self, trans, conditional):
        """
        Walk the conditional tree and replace, in place, the right hand value of
        every comparison on an ``id_decode`` field with its decoded id.
        """
        if conditional.operator == "and":
            for branch in (conditional.left, conditional.right):
                self.decode_query_ids(trans, branch)
            return

        # Only the part before the first '.' names the field.
        base_name = conditional.left.split(".")[0]
        if base_name not in self.FIELDS:
            return
        if self.FIELDS[base_name].id_decode:
            conditional.right = trans.security.decode_id(conditional.right)

    def filter(self, left, operator, right):
        """
        Apply one conditional to the view.

        For ``and`` both operands are conditionals and are applied recursively;
        otherwise ``left`` is a (possibly dotted) field name and ``right`` the
        value it is compared with. Raises GalaxyParseError for unknown fields,
        unsupported operators and fields without any filtering strategy.
        """
        if operator == "and":
            for clause in (left, right):
                self.filter(clause.left, clause.operator, clause.right)
            return

        base_name = left.split(".")[0]
        if base_name not in self.FIELDS:
            raise GalaxyParseError(f"Unknown field: {left}")

        self.do_query = True
        field = self.FIELDS[base_name]

        if field.sqlalchemy_field is not None:
            model_class, attribute_name = field.sqlalchemy_field
            column = getattr(model_class, attribute_name)
            if operator == "=":
                criterion = column == right
            elif operator == "!=":
                criterion = column != right
            elif operator == "like":
                criterion = column.like(right)
            else:
                raise GalaxyParseError(f"Invalid comparison operator: {operator}")
            self.query = self.query.filter(criterion)
        elif field.handler is not None:
            field.handler(self, left, operator, right)
        elif field.post_filter is not None:
            # Cannot be done in SQL; remembered and run against loaded rows.
            self.post_filter.append([field.post_filter, left, operator, right])
        else:
            raise GalaxyParseError(f"Unable to filter on field: {left}")

    def search(self, trans):
        """Set up ``self.query``; views that do not override this are not searchable."""
        raise GalaxyParseError(f"Unable to search view: {self.VIEW_NAME}")

    def get_results(self, force_query=False):
        """
        Yield the distinct rows of the query that pass every post filter.

        Nothing is yielded when no query was set up, or when no filter was
        applied and ``force_query`` is false.
        """
        if self.query is None or not (force_query or self.do_query):
            return
        for row in self.query.distinct().all():
            # Every post filter is evaluated for every row (no short-circuit).
            verdicts = [entry[0](row, entry[1], entry[2], entry[3]) for entry in self.post_filter]
            if all(verdicts):
                yield row
################## # Library Dataset Dataset Association (LDDA) Searching ##################
def library_extended_metadata_filter(view, left, operator, right):
    """
    Handler for ``extended_metadata.<path>`` conditionals on library datasets.

    The dotted parts of ``left`` after the first become the index path
    (``extended_metadata.a.b`` -> ``/a/b``) and the query is restricted to rows
    whose extended metadata has an index entry with that path and the string
    value of ``right``.

    Only ``=`` is supported. Other operators used to be silently treated as
    ``=``; they now raise GalaxyParseError like the other handlers do.
    """
    if operator != "=":
        raise GalaxyParseError(f"Invalid comparison operator: {operator}")
    view.do_query = True
    # Join the metadata table only once, however many conditionals are applied.
    if "extended_metadata_joined" not in view.state:
        view.query = view.query.join(ExtendedMetadata)
        view.state["extended_metadata_joined"] = True
    # A fresh alias per conditional lets several paths be matched in one query.
    alias = aliased(ExtendedMetadataIndex)
    field = f"/{'/'.join(left.split('.')[1:])}"
    view.query = view.query.filter(
        and_(ExtendedMetadata.id == alias.extended_metadata_id, alias.path == field, alias.value == str(right))
    )
def ldda_parent_library_filter(item, left, operator, right):
    """
    Post filter comparing ``right`` with the id of the library that contains
    the dataset's folder. Supports ``=`` and ``!=``; any other operator raises
    GalaxyParseError.
    """
    if operator not in ("=", "!="):
        raise GalaxyParseError(f"Invalid comparison operator: {operator}")
    parent_library_id = item.library_dataset.folder.parent_library.id
    return right == parent_library_id if operator == "=" else right != parent_library_id
class LibraryDatasetDatasetView(ViewQueryBaseClass):
    """
    View over LibraryDatasetDatasetAssociation rows.
    """

    VIEW_NAME = "library_dataset_dataset"
    FIELDS = dict(
        # Needs a join, so it is delegated to a handler.
        extended_metadata=ViewField("extended_metadata", handler=library_extended_metadata_filter),
        name=ViewField("name", sqlalchemy_field=(LibraryDatasetDatasetAssociation, "name")),
        id=ViewField("id", sqlalchemy_field=(LibraryDatasetDatasetAssociation, "id"), id_decode=True),
        deleted=ViewField("deleted", sqlalchemy_field=(LibraryDatasetDatasetAssociation, "deleted")),
        # Evaluated on the loaded objects rather than in the query.
        parent_library_id=ViewField("parent_library_id", id_decode=True, post_filter=ldda_parent_library_filter),
        # Exposed as "data_type" but stored in the "extension" column.
        data_type=ViewField("data_type", sqlalchemy_field=(LibraryDatasetDatasetAssociation, "extension")),
    )

    def search(self, trans):
        """Start from a query over LibraryDatasetDatasetAssociation."""
        session = trans.sa_session
        self.query = session.query(LibraryDatasetDatasetAssociation)
################## # Library Searching ##################
class LibraryView(ViewQueryBaseClass):
    """
    View over Library rows.
    """

    VIEW_NAME = "library"
    FIELDS = dict(
        name=ViewField("name", sqlalchemy_field=(Library, "name")),
        id=ViewField("id", sqlalchemy_field=(Library, "id"), id_decode=True),
        deleted=ViewField("deleted", sqlalchemy_field=(Library, "deleted")),
    )

    def search(self, trans):
        """Start from a query over Library."""
        session = trans.sa_session
        self.query = session.query(Library)
################## # Library Folder Searching ##################
def library_folder_parent_library_id_filter(item, left, operator, right):
    """
    Post filter comparing the id of the folder's parent library with ``right``.
    Supports ``=`` and ``!=``; any other operator raises GalaxyParseError.
    """
    if operator not in ("=", "!="):
        raise GalaxyParseError(f"Invalid comparison operator: {operator}")
    library_id = item.parent_library.id
    return library_id == right if operator == "=" else library_id != right
def library_path_filter(item, left, operator, right):
    """
    Post filter comparing the item's library path, rendered as a
    ``/``-separated string with a leading ``/``, with ``right``.
    Supports ``=`` and ``!=``; any other operator raises GalaxyParseError.
    """
    rendered_path = "/" + "/".join(item.library_path)
    if operator == "=":
        return rendered_path == right
    elif operator == "!=":
        return rendered_path != right
    else:
        raise GalaxyParseError(f"Invalid comparison operator: {operator}")
class LibraryFolderView(ViewQueryBaseClass):
    """
    View over LibraryFolder rows.
    """

    VIEW_NAME = "library_folder"
    FIELDS = dict(
        name=ViewField("name", sqlalchemy_field=(LibraryFolder, "name")),
        id=ViewField("id", sqlalchemy_field=(LibraryFolder, "id"), id_decode=True),
        parent_id=ViewField("parent_id", sqlalchemy_field=(LibraryFolder, "parent_id"), id_decode=True),
        # The two fields below are evaluated on the loaded folder objects.
        parent_library_id=ViewField(
            "parent_library_id", post_filter=library_folder_parent_library_id_filter, id_decode=True
        ),
        library_path=ViewField("library_path", post_filter=library_path_filter),
    )

    def search(self, trans):
        """Start from a query over LibraryFolder."""
        session = trans.sa_session
        self.query = session.query(LibraryFolder)
################## # Library Dataset Searching ##################
def library_dataset_name_filter(item, left, operator, right):
    """
    Post filter comparing ``item.name`` with ``right``.
    Supports ``=`` and ``!=``; any other operator raises GalaxyParseError.
    """
    if operator not in ("=", "!="):
        raise GalaxyParseError(f"Invalid comparison operator: {operator}")
    dataset_name = item.name
    return dataset_name == right if operator == "=" else dataset_name != right
class LibraryDatasetView(ViewQueryBaseClass):
    """
    View over LibraryDataset rows.
    """

    VIEW_NAME = "library_dataset"
    FIELDS = dict(
        # The name is compared on the loaded object, not on a table column.
        name=ViewField("name", post_filter=library_dataset_name_filter),
        id=ViewField("id", sqlalchemy_field=(LibraryDataset, "id"), id_decode=True),
        folder_id=ViewField("folder_id", sqlalchemy_field=(LibraryDataset, "folder_id"), id_decode=True),
    )

    def search(self, trans):
        """Start from a query over LibraryDataset."""
        session = trans.sa_session
        self.query = session.query(LibraryDataset)
################## # Tool Searching ##################
class ToolView(ViewQueryBaseClass):
    """
    View over ToolVersion rows of the tool shed install model.
    """

    VIEW_NAME = "tool"
    FIELDS = {
        # The field used to be registered under the name "name" although its key
        # (and the column it filters on) is "tool_id"; the two now agree.
        "tool_id": ViewField("tool_id", sqlalchemy_field=(ToolVersion, "tool_id")),
        "id": ViewField("id", sqlalchemy_field=(ToolVersion, "id")),
    }

    def search(self, trans):
        """Start from a query over ToolVersion (uses the install model's session)."""
        self.query = trans.install_model.context.query(ToolVersion)
################## # History Dataset Searching ##################
def history_dataset_handle_tag(view, left, operator, right):
    """
    Handler for ``tag='name'`` / ``tag='name:value'`` on history datasets.

    Only ``=`` is supported; any other operator raises GalaxyParseError.
    """
    if operator != "=":
        raise GalaxyParseError(f"Invalid comparison operator: {operator}")

    view.do_query = True
    # aliasing the tag association table, so multiple links to different tags
    # can be formed during a single query
    tag_alias = aliased(HistoryDatasetAssociationTagAssociation)
    view.query = view.query.filter(HistoryDatasetAssociation.id == tag_alias.history_dataset_association_id)
    # "name:value" -> name plus (optionally) the value following the first ':'.
    tag_name, *tag_rest = right.split(":")
    view.query = view.query.filter(tag_alias.user_tname == tag_name)
    if tag_rest:
        view.query = view.query.filter(tag_alias.user_value == tag_rest[0])
def history_dataset_extended_metadata_filter(view, left, operator, right):
    """
    Handler for ``extended_metadata.<path>`` conditionals on history datasets.

    The dotted parts of ``left`` after the first become the index path
    (``extended_metadata.a.b`` -> ``/a/b``) and the query is restricted to rows
    whose extended metadata has an index entry with that path and the string
    value of ``right``.

    Only ``=`` is supported. Other operators used to be silently treated as
    ``=``; they now raise GalaxyParseError like the tag handlers do.
    """
    if operator != "=":
        raise GalaxyParseError(f"Invalid comparison operator: {operator}")
    view.do_query = True
    # Join the metadata table only once, however many conditionals are applied.
    if "extended_metadata_joined" not in view.state:
        view.query = view.query.join(ExtendedMetadata)
        view.state["extended_metadata_joined"] = True
    # A fresh alias per conditional lets several paths be matched in one query.
    alias = aliased(ExtendedMetadataIndex)
    field = f"/{'/'.join(left.split('.')[1:])}"
    view.query = view.query.filter(
        and_(ExtendedMetadata.id == alias.extended_metadata_id, alias.path == field, alias.value == str(right))
    )
class HistoryDatasetView(ViewQueryBaseClass):
    """
    View over HistoryDatasetAssociation rows.
    """

    DOMAIN = "history_dataset"
    # The base class reports the view through VIEW_NAME; without this it would
    # stay "undefined". DOMAIN is kept for existing users of that attribute.
    VIEW_NAME = DOMAIN
    FIELDS = {
        "name": ViewField("name", sqlalchemy_field=(HistoryDatasetAssociation, "name")),
        "id": ViewField("id", sqlalchemy_field=(HistoryDatasetAssociation, "id"), id_decode=True),
        "history_id": ViewField(
            "history_id", sqlalchemy_field=(HistoryDatasetAssociation, "history_id"), id_decode=True
        ),
        "tag": ViewField("tag", handler=history_dataset_handle_tag),
        "copied_from_ldda_id": ViewField(
            "copied_from_ldda_id",
            sqlalchemy_field=(HistoryDatasetAssociation, "copied_from_library_dataset_dataset_association_id"),
            id_decode=True,
        ),
        "copied_from_hda_id": ViewField(
            "copied_from_hda_id",
            sqlalchemy_field=(HistoryDatasetAssociation, "copied_from_history_dataset_association_id"),
            id_decode=True,
        ),
        "deleted": ViewField("deleted", sqlalchemy_field=(HistoryDatasetAssociation, "deleted")),
        "extended_metadata": ViewField("extended_metadata", handler=history_dataset_extended_metadata_filter),
    }

    def search(self, trans):
        """Start from a query over HistoryDatasetAssociation."""
        self.query = trans.sa_session.query(HistoryDatasetAssociation)
################## # History Searching ##################
def history_handle_tag(view, left, operator, right):
    """
    Handler for ``tag='name'`` / ``tag='name:value'`` on histories.

    Only ``=`` is supported; any other operator raises GalaxyParseError.
    """
    if operator != "=":
        raise GalaxyParseError(f"Invalid comparison operator: {operator}")

    view.do_query = True
    # One alias per conditional, so several tags can be required at once.
    tag_alias = aliased(HistoryTagAssociation)
    view.query = view.query.filter(History.id == tag_alias.history_id)
    # "name:value" -> name plus (optionally) the value following the first ':'.
    tag_name, *tag_rest = right.split(":")
    view.query = view.query.filter(tag_alias.user_tname == tag_name)
    if tag_rest:
        view.query = view.query.filter(tag_alias.user_value == tag_rest[0])
def history_handle_annotation(view, left, operator, right):
    """
    Handler for ``annotation`` conditionals on histories.

    Supports ``=`` and ``like``; any other operator raises GalaxyParseError.
    """
    annotation_column = HistoryAnnotationAssociation.annotation
    if operator == "=":
        annotation_matches = annotation_column == right
    elif operator == "like":
        annotation_matches = annotation_column.like(right)
    else:
        raise GalaxyParseError(f"Invalid comparison operator: {operator}")

    view.do_query = True
    # Tie the annotation row to the history and constrain its text in one clause.
    view.query = view.query.filter(
        and_(HistoryAnnotationAssociation.history_id == History.id, annotation_matches)
    )
class HistoryView(ViewQueryBaseClass):
    """
    View over History rows.
    """

    DOMAIN = "history"
    # The base class reports the view through VIEW_NAME; without this it would
    # stay "undefined". DOMAIN is kept for existing users of that attribute.
    VIEW_NAME = DOMAIN
    FIELDS = {
        "name": ViewField("name", sqlalchemy_field=(History, "name")),
        "id": ViewField("id", sqlalchemy_field=(History, "id"), id_decode=True),
        "tag": ViewField("tag", handler=history_handle_tag),
        "annotation": ViewField("annotation", handler=history_handle_annotation),
        "deleted": ViewField("deleted", sqlalchemy_field=(History, "deleted")),
    }

    def search(self, trans):
        """Start from a query over History."""
        self.query = trans.sa_session.query(History)
################## # Workflow Searching ##################
def workflow_tag_handler(view, left, operator, right):
    """
    Handler for ``tag='name'`` / ``tag='name:value'`` on workflows.

    Only ``=`` is supported; any other operator raises GalaxyParseError.
    """
    if operator == "=":
        view.do_query = True
        # Alias the tag association table, as the history tag handlers do, so
        # that several tag conditionals in one query each match their own tag
        # row. Without the alias, "tag='a' and tag='b'" would require a single
        # row to carry both names and could never match.
        tag_table = aliased(StoredWorkflowTagAssociation)
        view.query = view.query.filter(StoredWorkflow.id == tag_table.stored_workflow_id)
        tmp = right.split(":")
        view.query = view.query.filter(tag_table.user_tname == tmp[0])
        if len(tmp) > 1:
            view.query = view.query.filter(tag_table.user_value == tmp[1])
    else:
        raise GalaxyParseError(f"Invalid comparison operator: {operator}")
class WorkflowView(ViewQueryBaseClass):
    """
    View over StoredWorkflow rows.
    """

    DOMAIN = "workflow"
    # The base class reports the view through VIEW_NAME; without this it would
    # stay "undefined". DOMAIN is kept for existing users of that attribute.
    VIEW_NAME = DOMAIN
    FIELDS = {
        "name": ViewField("name", sqlalchemy_field=(StoredWorkflow, "name")),
        "id": ViewField("id", sqlalchemy_field=(StoredWorkflow, "id"), id_decode=True),
        "tag": ViewField("tag", handler=workflow_tag_handler),
        "deleted": ViewField("deleted", sqlalchemy_field=(StoredWorkflow, "deleted")),
    }

    def search(self, trans):
        """Start from a query over StoredWorkflow."""
        self.query = trans.sa_session.query(StoredWorkflow)
################## # Job Searching ##################
def job_param_filter(view, left, operator, right):
    """
    Handler for ``param.<name>`` conditionals on jobs.

    Restricts the query to jobs having a parameter called ``<name>`` whose
    stored value equals the JSON encoding of ``right``.

    Only ``=`` is supported. Other operators used to be silently treated as
    ``=``; they now raise GalaxyParseError like the tag handlers do.
    """
    if operator != "=":
        raise GalaxyParseError(f"Invalid comparison operator: {operator}")
    view.do_query = True
    # A fresh alias per conditional lets several parameters be matched at once.
    alias = aliased(JobParameter)
    # The dot is escaped so that only the literal "param." prefix is removed.
    param_name = re.sub(r"^param\.", "", left)
    view.query = view.query.filter(and_(Job.id == alias.job_id, alias.name == param_name, alias.value == dumps(right)))
def job_input_hda_filter(view, left, operator, right):
    """
    Handler for ``input_hda.<name>`` conditionals on jobs.

    Restricts the query to jobs having an input dataset association called
    ``<name>`` whose ``dataset_id`` equals ``right``.

    Only ``=`` is supported. Other operators used to be silently treated as
    ``=``; they now raise GalaxyParseError like the tag handlers do.
    """
    if operator != "=":
        raise GalaxyParseError(f"Invalid comparison operator: {operator}")
    view.do_query = True
    # A fresh alias per conditional lets several inputs be matched at once.
    alias = aliased(JobToInputDatasetAssociation)
    # The dot is escaped so that only the literal "input_hda." prefix is removed.
    param_name = re.sub(r"^input_hda\.", "", left)
    view.query = view.query.filter(and_(Job.id == alias.job_id, alias.name == param_name, alias.dataset_id == right))
def job_input_ldda_filter(view, left, operator, right):
    """
    Handler for ``input_ldda.<name>`` conditionals on jobs.

    Restricts the query to jobs having an input library dataset association
    called ``<name>`` whose ``ldda_id`` equals ``right``.

    Only ``=`` is supported. Other operators used to be silently treated as
    ``=``; they now raise GalaxyParseError like the tag handlers do.
    """
    if operator != "=":
        raise GalaxyParseError(f"Invalid comparison operator: {operator}")
    view.do_query = True
    # A fresh alias per conditional lets several inputs be matched at once.
    alias = aliased(JobToInputLibraryDatasetAssociation)
    # The dot is escaped so that only the literal "input_ldda." prefix is removed.
    param_name = re.sub(r"^input_ldda\.", "", left)
    view.query = view.query.filter(and_(Job.id == alias.job_id, alias.name == param_name, alias.ldda_id == right))
def job_output_hda_filter(view, left, operator, right):
    """
    Handler for ``output_hda.<name>`` conditionals on jobs.

    Restricts the query to jobs having an output dataset association called
    ``<name>`` whose ``dataset_id`` equals ``right``.

    Only ``=`` is supported. Other operators used to be silently treated as
    ``=``; they now raise GalaxyParseError like the tag handlers do.
    """
    if operator != "=":
        raise GalaxyParseError(f"Invalid comparison operator: {operator}")
    view.do_query = True
    # A fresh alias per conditional lets several outputs be matched at once.
    alias = aliased(JobToOutputDatasetAssociation)
    # The dot is escaped so that only the literal "output_hda." prefix is removed.
    param_name = re.sub(r"^output_hda\.", "", left)
    view.query = view.query.filter(and_(Job.id == alias.job_id, alias.name == param_name, alias.dataset_id == right))
class JobView(ViewQueryBaseClass):
    """
    View over Job rows.
    """

    DOMAIN = "job"
    # The base class reports the view through VIEW_NAME; without this it would
    # stay "undefined". DOMAIN is kept for existing users of that attribute.
    VIEW_NAME = DOMAIN
    FIELDS = {
        # Exposed as "tool_name" but filtered on the job's "tool_id" column.
        "tool_name": ViewField("tool_name", sqlalchemy_field=(Job, "tool_id")),
        "state": ViewField("state", sqlalchemy_field=(Job, "state")),
        "param": ViewField("param", handler=job_param_filter),
        "input_ldda": ViewField("input_ldda", handler=job_input_ldda_filter, id_decode=True),
        "input_hda": ViewField("input_hda", handler=job_input_hda_filter, id_decode=True),
        "output_hda": ViewField("output_hda", handler=job_output_hda_filter, id_decode=True),
    }

    def search(self, trans):
        """Start from a query over Job."""
        self.query = trans.sa_session.query(Job)
################## # Page Searching ##################
class PageView(ViewQueryBaseClass):
    """
    View over Page rows.
    """

    DOMAIN = "page"
    # The base class reports the view through VIEW_NAME; without this it would
    # stay "undefined". DOMAIN is kept for existing users of that attribute.
    VIEW_NAME = DOMAIN
    FIELDS = {
        "id": ViewField("id", sqlalchemy_field=(Page, "id"), id_decode=True),
        "slug": ViewField("slug", sqlalchemy_field=(Page, "slug")),
        "title": ViewField("title", sqlalchemy_field=(Page, "title")),
        "deleted": ViewField("deleted", sqlalchemy_field=(Page, "deleted")),
    }

    def search(self, trans):
        """Start from a query over Page."""
        self.query = trans.sa_session.query(Page)
################## # Page Revision Searching ##################
class PageRevisionView(ViewQueryBaseClass):
    """
    View over PageRevision rows.
    """

    DOMAIN = "page_revision"
    # The base class reports the view through VIEW_NAME; without this it would
    # stay "undefined". DOMAIN is kept for existing users of that attribute.
    VIEW_NAME = DOMAIN
    FIELDS = {
        "id": ViewField("id", sqlalchemy_field=(PageRevision, "id"), id_decode=True),
        "title": ViewField("title", sqlalchemy_field=(PageRevision, "title")),
        "page_id": ViewField("page_id", sqlalchemy_field=(PageRevision, "page_id"), id_decode=True),
    }

    def search(self, trans):
        """Start from a query over PageRevision."""
        self.query = trans.sa_session.query(PageRevision)
# The view mapping takes a user's name for a table and maps it to a View class
# that will handle queries. Several names may map to the same view
# (e.g. "hda" and "history_dataset").
view_mapping = {
    "library": LibraryView,
    "library_folder": LibraryFolderView,
    "library_dataset_dataset": LibraryDatasetDatasetView,
    "library_dataset": LibraryDatasetView,
    "lda": LibraryDatasetView,
    "ldda": LibraryDatasetDatasetView,
    "history_dataset": HistoryDatasetView,
    "hda": HistoryDatasetView,
    "history": HistoryView,
    "workflow": WorkflowView,
    "tool": ToolView,
    "job": JobView,
    "page": PageView,
    "page_revision": PageRevisionView,
}

# The GQL grammar is defined in Parsley syntax ( https://parsley.readthedocs.io/ )
#
# NOTE: alternatives are tried in order and the first match wins, so in the
# `comparison` rule the two-character operators ('>=', '<=') must be listed
# before their one-character prefixes ('>', '<'); otherwise they can never be
# recognized.
gqlGrammar = r"""
expr = 'select' bs field_desc:f bs 'from' bs word:t (
    bs 'where' bs conditional:c ws -> GalaxyQuery(f,t,c)
    | ws -> GalaxyQuery(f, t, None) )
bs = ' '+
ws = ' '*
field_desc = ( '*' -> ['*']
    | field_list )
field_list = field_name:x (
    ws ',' ws field_list:y -> [x] + y
    | -> [x]
    )
conditional = logic_statement:x (
    bs 'and' bs conditional:y -> GalaxyQueryAnd(x,y)
    | -> x
    )
word = alphanum+:x -> "".join(x)
field_name = word:x (
    '.' quote_word:y -> x + "." + y
    |-> x
    )
alphanum = anything:x ?(re.search(r'\w', x) is not None) -> x
logic_statement = field_name:left ws comparison:comp ws value_word:right -> GalaxyQueryComparison(left, comp, right)
value_word = (
    'false' -> False
    | 'False' -> False
    | 'true' -> True
    | 'True' -> True
    | 'None' -> None
    | quote_word )
comparison = ( '=' -> '='
    | '!=' -> '!='
    | '>=' -> '>='
    | '<=' -> '<='
    | '>' -> '>'
    | '<' -> '<'
    | 'like' -> 'like'
    )
quote_word = "'" not_quote*:x "'" -> "".join(x)
not_quote = anything:x ?(x != "'") -> x
not_dquote = anything:x ?(x != '"') -> x
"""
class GalaxyQuery:
    """
    Data structure of a compiled GQL query: the selected fields, the name of
    the table (view) they are selected from, and the ``where`` conditional
    (None when the query has no ``where`` clause).
    """

    def __init__(self, field_list, table_name, conditional):
        self.field_list, self.table_name, self.conditional = field_list, table_name, conditional
class GalaxyQueryComparison:
    """
    Data structure of a single comparison in a compiled GQL query
    (ie where name='Untitled History'): the field name on the left, the
    comparison operator, and the value on the right.
    """

    def __init__(self, left, operator, right):
        self.left, self.operator, self.right = left, operator, right
class GalaxyQueryAnd:
    """
    Data structure of an ``and`` in a compiled GQL query
    (ie where name='Untitled History' and deleted=False). ``left`` and
    ``right`` are the two joined conditionals; ``operator`` is always "and".
    """

    def __init__(self, left, right):
        self.left, self.right = left, right
        # Fixed marker that lets consumers tell a conjunction from a comparison.
        self.operator = "and"
class GalaxyParseError(Exception):
    """
    Raised when a GQL query refers to an unknown table or field, or uses a
    comparison that the targeted field does not support.
    """
class SearchQuery:
    """
    Pairs a compiled GQL query with the view instance that will execute it.
    """

    def __init__(self, view, query):
        self.view, self.query = view, query

    def decode_query_ids(self, trans):
        """Let the view decode the encoded ids of the conditional, if there is one."""
        if self.query.conditional is None:
            return
        self.view.decode_query_ids(trans, self.query.conditional)

    def process(self, trans):
        """Set up the view's query, apply the conditional and return the (forced) results."""
        self.view.search(trans)
        conditional = self.query.conditional
        if conditional is not None:
            self.view.filter(conditional.left, conditional.operator, conditional.right)
        return self.view.get_results(True)

    def item_to_api_value(self, item):
        """
        Turn a result item into a dictionary holding only the selected fields
        (or every field of the item's "element" view for ``select *``).
        """
        element = item.to_dict(view="element")
        selected_fields = self.query.field_list
        if "*" in selected_fields:
            return element
        return {key: element[key] for key in element if key in selected_fields}
class GalaxySearchEngine:
    """
    Primary class for searching. Parses GQL (Galaxy Query Language) queries
    and returns a 'SearchQuery' class
    """

    def __init__(self):
        # Names the grammar's semantic actions are allowed to reference.
        grammar_bindings = {
            "re": re,
            "GalaxyQuery": GalaxyQuery,
            "GalaxyQueryComparison": GalaxyQueryComparison,
            "GalaxyQueryAnd": GalaxyQueryAnd,
        }
        self.parser = parsley.makeGrammar(gqlGrammar, grammar_bindings)

    def query(self, query_text):
        """
        Parse ``query_text`` and bind it to a fresh instance of the view that
        serves the queried table. Raises GalaxyParseError for unknown tables.
        """
        parsed = self.parser(query_text).expr()
        view_class = view_mapping.get(parsed.table_name)
        if view_class is None:
            raise GalaxyParseError(f"No such table {parsed.table_name}")
        return SearchQuery(view_class(), parsed)