Warning

This document is for an in-development version of Galaxy. You can alternatively view this page in the latest release if it exists or view the top of the latest release's documentation.

Source code for galaxy.managers.history_contents

"""
Heterogeneous lists/contents are difficult to query properly since unions are
not easily made.
"""
import logging

from sqlalchemy import (
    asc,
    desc,
    false,
    func,
    literal,
    sql,
    true
)
from sqlalchemy.orm import (
    eagerload,
    undefer
)

from galaxy import (
    exceptions as glx_exceptions,
    model
)
from galaxy.managers import (
    annotatable,
    base,
    containers,
    deletable,
    hdas,
    hdcas,
    taggable,
    tools
)

log = logging.getLogger(__name__)


# into its own class to have its own filters, etc.
# TODO: but can't inherit from model manager (which assumes only one model)
class HistoryContentsManager(containers.ContainerManagerMixin):
    """
    Manager for the mixed contents of a history: datasets (the 'contained'
    class) and dataset collections (the 'subcontainer' class).

    Mixed listings are produced by a UNION ALL query over the columns listed
    in `common_columns`; the resulting ids are then used to load the full
    models of each class (see `_union_of_contents`).
    """
    root_container_class = model.History

    contained_class = model.HistoryDatasetAssociation
    contained_class_manager_class = hdas.HDAManager
    contained_class_type_name = 'dataset'

    subcontainer_class = model.HistoryDatasetCollectionAssociation
    subcontainer_class_manager_class = hdcas.HDCAManager
    subcontainer_class_type_name = 'dataset_collection'

    #: the columns which are common to both subcontainers and non-subcontainers.
    #  (Also the attributes that may be filtered or ordered by)
    # NOTE: `_get_union_type` and `_get_union_id` read union rows by position
    # (indices 1 and 2), so the order of the first three entries matters.
    common_columns = (
        "history_id",
        "history_content_type",
        "id",
        "type_id",
        "hid",
        # joining columns
        "extension",
        "dataset_id",
        "collection_id",
        "name",
        "state",
        "deleted",
        "purged",
        "visible",
        "create_time",
        "update_time",
    )
    default_order_by = 'hid'

    def __init__(self, app):
        """Store `app` and build one manager per content class."""
        self.app = app
        self.contained_manager = self.contained_class_manager_class(app)
        self.subcontainer_manager = self.subcontainer_class_manager_class(app)

    # ---- interface
    def contained(self, container, filters=None, limit=None, offset=None, order_by=None, **kwargs):
        """
        Returns non-subcontainer objects within `container`.

        The listing is delegated to the contained-class manager, with a filter
        restricting results to `container` added to `filters`.
        """
        filter_to_inside_container = self._get_filter_for_contained(container, self.contained_class)
        filters = base.munge_lists(filter_to_inside_container, filters)
        return self.contained_manager.list(filters=filters, limit=limit, offset=offset, order_by=order_by, **kwargs)

    def subcontainers(self, container, filters=None, limit=None, offset=None, order_by=None, **kwargs):
        """
        Returns only the containers within `container`.

        Note: `limit`, `offset` and `order_by` are accepted for signature
        parity with `contained` but are not applied to the query here.
        """
        filter_to_inside_container = self._get_filter_for_contained(container, self.subcontainer_class)
        filters = base.munge_lists(filter_to_inside_container, filters)
        # TODO: collections.DatasetCollectionManager doesn't have the list
        # return self.subcontainer_manager.list( filters=filters, limit=limit, offset=offset, order_by=order_by, **kwargs )
        # Query.filter takes its criteria as separate positional arguments, so
        # a combined list of filters must be unpacked rather than passed whole.
        if not isinstance(filters, (list, tuple)):
            filters = (filters, )
        return self._session().query(self.subcontainer_class).filter(*filters).all()

    def contents(self, container, filters=None, limit=None, offset=None, order_by=None, **kwargs):
        """
        Returns a list of both/all types of contents, filtered and in some order.
        """
        # TODO?: we could branch here based on 'if limit is None and offset is None' - to a simpler (non-union) query
        # for now, I'm just using this (even for non-limited/offset queries) to reduce code paths
        return self._union_of_contents(container,
            filters=filters, limit=limit, offset=offset, order_by=order_by, **kwargs)

    def contents_count(self, container, filters=None, limit=None, offset=None, order_by=None, **kwargs):
        """
        Returns a count of both/all types of contents, based on the given filters.
        """
        return self.contents_query(container,
            filters=filters, limit=limit, offset=offset, order_by=order_by, **kwargs).count()

    def contents_query(self, container, filters=None, limit=None, offset=None, order_by=None, **kwargs):
        """
        Returns the contents union query for subqueries, etc.
        """
        return self._union_of_contents_query(container,
            filters=filters, limit=limit, offset=offset, order_by=order_by, **kwargs)

    # order_by parsing - similar to FilterParser but not enough yet to warrant a class?
    def parse_order_by(self, order_by_string, default=None):
        """
        Return an ORM compatible order_by using the given string.

        A bare attribute name or `<attribute>-dsc` sorts descending and
        `<attribute>-asc` sorts ascending. If the string is not recognized and
        `default` is truthy, `default` is parsed instead; otherwise a
        RequestParameterInvalidException is raised.
        """
        available = ['create_time', 'extension', 'hid', 'history_id', 'name', 'update_time']
        for attribute in available:
            attribute_dsc = '%s-dsc' % attribute
            attribute_asc = '%s-asc' % attribute
            if order_by_string in (attribute, attribute_dsc):
                return desc(attribute)
            if order_by_string == attribute_asc:
                return asc(attribute)

        if default:
            # the recursive call passes no default, so an invalid default raises
            return self.parse_order_by(default)
        raise glx_exceptions.RequestParameterInvalidException('Unknown order_by', order_by=order_by_string,
            available=available)

    # history specific methods
    def state_counts(self, history):
        """
        Return a dictionary containing the counts of all contents in each state
        keyed by the distinct states.

        Note: does not include deleted/hidden contents.
        """
        filters = [
            base.ModelFilterParser.parsed_filter("orm", sql.column('deleted') == false()),
            base.ModelFilterParser.parsed_filter("orm", sql.column('visible') == true())
        ]
        contents_subquery = self._union_of_contents_query(history, filters=filters).subquery()
        statement = (sql.select([sql.column('state'), func.count('*')])
            .select_from(contents_subquery)
            .group_by(sql.column('state')))
        counts = self.app.model.context.execute(statement).fetchall()
        return dict(counts)

    def active_counts(self, history):
        """
        Return a dictionary keyed with 'deleted', 'hidden', and 'active' with values
        for each representing the count of contents in each state.

        Note: counts for deleted and hidden overlap; In other words, a dataset that's
        both deleted and hidden will be added to both totals.
        """
        returned = dict(deleted=0, hidden=0, active=0)
        contents_subquery = self._union_of_contents_query(history).subquery()
        columns = [
            sql.column('deleted'),
            sql.column('visible'),
            func.count('*')
        ]
        statement = (sql.select(columns)
            .select_from(contents_subquery)
            .group_by(sql.column('deleted'), sql.column('visible')))
        groups = self.app.model.context.execute(statement).fetchall()
        for deleted, visible, count in groups:
            if deleted:
                returned['deleted'] += count
            if not visible:
                returned['hidden'] += count
            if not deleted and visible:
                returned['active'] += count
        return returned

    def map_datasets(self, history, fn, **kwargs):
        """
        Iterate over the datasets of a given history, recursing into collections, and
        calling fn on each dataset.

        Uses the same kwargs as `contents` above.
        """
        returned = []
        contents = self.contents(history, **kwargs)
        for content in contents:
            if isinstance(content, self.subcontainer_class):
                # collections are delegated to the subcontainer manager
                processed_list = self.subcontainer_manager.map_datasets(content, fn)
                returned.extend(processed_list)
            else:
                processed = fn(content)
                returned.append(processed)
        return returned

    # ---- private
    def _session(self):
        """Return the model context (session) of the app."""
        return self.app.model.context

    def _filter_to_contents_query(self, container, content_class, **kwargs):
        """Return a query for `content_class` restricted to `container`."""
        # TODO: use list (or by_history etc.)
        container_filter = self._get_filter_for_contained(container, content_class)
        query = self._session().query(content_class).filter(container_filter)
        return query

    def _get_filter_for_contained(self, container, content_class):
        """Return the ORM criterion matching `content_class` rows whose history is `container`."""
        return content_class.history == container

    def _union_of_contents(self, container, expand_models=True, **kwargs):
        """
        Returns a limited and offset list of both types of contents, filtered
        and in some order.

        When `expand_models` is false, the raw rows of the union query are
        returned instead of full models.
        """
        contents_results = self._union_of_contents_query(container, **kwargs).all()
        if not expand_models:
            return contents_results

        # partition ids into a map of { component_class names -> list of ids } from the above union query
        id_map = {
            self.contained_class_type_name: [],
            self.subcontainer_class_type_name: [],
        }
        for result in contents_results:
            result_type = self._get_union_type(result)
            contents_id = self._get_union_id(result)
            if result_type in id_map:
                id_map[result_type].append(contents_id)
            else:
                raise TypeError('Unknown contents type:', result_type)

        # query 2 & 3: use the ids to query each component_class, returning an id->full component model map
        contained_ids = id_map[self.contained_class_type_name]
        id_map[self.contained_class_type_name] = self._contained_id_map(contained_ids)
        subcontainer_ids = id_map[self.subcontainer_class_type_name]
        serialization_params = kwargs.get('serialization_params', None)
        id_map[self.subcontainer_class_type_name] = self._subcontainer_id_map(subcontainer_ids,
            serialization_params=serialization_params)

        # cycle back over the union query to create an ordered list of the objects returned in queries 2 & 3 above
        contents = []
        filters = kwargs.get('filters') or []
        # TODO: or as generator?
        for result in contents_results:
            result_type = self._get_union_type(result)
            contents_id = self._get_union_id(result)
            content = id_map[result_type][contents_id]
            # 'function' type filters can only be applied to the loaded models
            if self.passes_filters(content, filters):
                contents.append(content)
        return contents

    @staticmethod
    def passes_filters(content, filters):
        """
        Return False if any filter of type 'function' in `filters` rejects
        `content`, True otherwise. Filters of other types are ignored here.
        """
        for filter_fn in filters:
            if filter_fn.filter_type == 'function':
                if not filter_fn.filter(content):
                    return False
        return True

    def _union_of_contents_query(self, container, filters=None, limit=None, offset=None, order_by=None, user_id=None, **kwargs):
        """
        Returns a query for a limited and offset list of both types of contents,
        filtered and in some order.

        If `container` is falsy, the component queries are restricted to
        histories owned by `user_id` instead of a single history.
        """
        order_by = order_by if order_by is not None else self.default_order_by
        order_by = order_by if isinstance(order_by, (tuple, list)) else (order_by, )

        # TODO: 3 queries and 3 iterations over results - this is undoubtedly better solved in the actual SQL layer
        # via one common table for contents, Some Yonder Resplendent and Fanciful Join, or ORM functionality
        # Here's the (bizarre) strategy:
        #   1. create a union of common columns between contents classes - filter, order, and limit/offset this
        #   2. extract the ids returned from 1 for each class, query each content class by that id list
        #   3. use the results/order from 1 to recombine/merge the 2+ query result lists from 2, return that

        # note: I'm trying to keep these private functions as generic as possible in order to move them toward base later

        # query 1: create a union of common columns for which the component_classes can be filtered/limited
        contained_query = self._contents_common_query_for_contained(
            history_id=container.id if container else None,
            user_id=user_id)
        subcontainer_query = self._contents_common_query_for_subcontainer(
            history_id=container.id if container else None,
            user_id=user_id)

        filters = filters or []
        # Apply filters that are specific to a model
        for orm_filter in filters:
            if orm_filter.filter_type == "orm_function":
                contained_query = contained_query.filter(orm_filter.filter(self.contained_class))
                subcontainer_query = subcontainer_query.filter(orm_filter.filter(self.subcontainer_class))
            elif orm_filter.filter_type == "orm":
                contained_query = self._apply_orm_filter(contained_query, orm_filter.filter)
                subcontainer_query = self._apply_orm_filter(subcontainer_query, orm_filter.filter)

        contents_query = contained_query.union_all(subcontainer_query)
        contents_query = contents_query.order_by(*order_by)

        if limit is not None:
            contents_query = contents_query.limit(limit)
        if offset is not None:
            contents_query = contents_query.offset(offset)
        return contents_query

    def _apply_orm_filter(self, qry, orm_filter):
        """
        Apply `orm_filter` to `qry` by re-targeting its left-hand side at the
        query column with the same name.

        Only binary expressions are handled; any other kind of filter leaves
        `qry` unchanged.
        """
        if isinstance(orm_filter, sql.elements.BinaryExpression):
            for match in filter(lambda col: col['name'] == orm_filter.left.name, qry.column_descriptions):
                column = match['expr']
                # clone so the caller's expression is not modified
                new_filter = orm_filter._clone()
                new_filter.left = column
                qry = qry.filter(new_filter)
        return qry

    def _contents_common_columns(self, component_class, **kwargs):
        """
        Return the list of labeled columns (one per entry in `common_columns`)
        for `component_class`; a keyword argument named after a column
        overrides the class attribute of that name.
        """
        columns = []
        # pull column from class by name or override with kwargs if listed there, then label
        for column_name in self.common_columns:
            if column_name in kwargs:
                column = kwargs.get(column_name, None)
            elif column_name == "model_class":
                column = literal(component_class.__name__)
            else:
                column = getattr(component_class, column_name)
            column = column.label(column_name)
            columns.append(column)
        return columns

    def _contents_common_query_for_contained(self, history_id, user_id):
        """
        Return the common-columns query for the contained (dataset) class,
        restricted to `history_id` if given, otherwise to histories owned by
        `user_id`.
        """
        component_class = self.contained_class
        # TODO: and now a join with Dataset - this is getting sad
        columns = self._contents_common_columns(component_class,
            history_content_type=literal('dataset'),
            state=model.Dataset.state,
            # do not have inner collections
            collection_id=literal(None)
        )
        subquery = self._session().query(*columns)
        # for the HDA's we need to join the Dataset since it has an actual state column
        subquery = subquery.join(model.Dataset, model.Dataset.id == component_class.dataset_id)
        if history_id:
            subquery = subquery.filter(component_class.history_id == history_id)
        else:
            # Make sure we only return items that are user-accessible by checking that they are in a history
            # owned by the current user.
            # TODO: move into filter mixin, and implement accessible logic as SQL query
            subquery = subquery.filter(
                component_class.history_id == model.History.table.c.id,
                model.History.table.c.user_id == user_id
            )
        return subquery

    def _contents_common_query_for_subcontainer(self, history_id, user_id):
        """
        Return the common-columns query for the subcontainer (dataset
        collection) class, restricted to `history_id` if given, otherwise to
        histories owned by `user_id`.
        """
        component_class = self.subcontainer_class
        columns = self._contents_common_columns(component_class,
            history_content_type=literal('dataset_collection'),
            # do not have datasets
            dataset_id=literal(None),
            state=model.DatasetCollection.populated_state,
            # TODO: should be purgable? fix
            purged=literal(False),
            extension=literal(None),
            # these are attached instead to the inner collection joined below
            create_time=model.DatasetCollection.create_time,
            update_time=model.DatasetCollection.update_time
        )
        subquery = self._session().query(*columns)
        # for the HDCA's we need to join the DatasetCollection since it has update/create times
        subquery = subquery.join(model.DatasetCollection,
            model.DatasetCollection.id == component_class.collection_id)
        if history_id:
            subquery = subquery.filter(component_class.history_id == history_id)
        else:
            subquery = subquery.filter(
                component_class.history_id == model.History.table.c.id,
                model.History.table.c.user_id == user_id
            )
        return subquery

    def _get_union_type(self, union):
        """Return the string name of the class for this row in the union results"""
        return str(union[1])

    def _get_union_id(self, union):
        """Return the id for this row in the union results"""
        return union[2]

    def _contained_id_map(self, id_list):
        """Return an id to model map of all contained-type models in the id_list."""
        if not id_list:
            # keep the return type a mapping, as the non-empty path and the docstring promise
            return {}
        component_class = self.contained_class
        query = (self._session().query(component_class)
            .filter(component_class.id.in_(id_list))
            .options(undefer('_metadata'))
            .options(eagerload('dataset.actions'))
            .options(eagerload('tags'))
            .options(eagerload('annotations')))
        return {row.id: row for row in query.all()}

    def _subcontainer_id_map(self, id_list, serialization_params=None):
        """Return an id to model map of all subcontainer-type models in the id_list."""
        if not id_list:
            # keep the return type a mapping, as the non-empty path and the docstring promise
            return {}
        component_class = self.subcontainer_class
        query = (self._session().query(component_class)
            .filter(component_class.id.in_(id_list))
            .options(eagerload('collection'))
            .options(eagerload('tags'))
            .options(eagerload('annotations')))

        # This will conditionally join a potentially costly job_state summary
        # All the paranoia if-checking makes me wonder if serialization_params
        # should really be a property of the manager class instance
        if serialization_params and serialization_params['keys']:
            if 'job_state_summary' in serialization_params['keys']:
                query = query.options(eagerload('job_state_summary'))

        return {row.id: row for row in query.all()}
class HistoryContentsSerializer(base.ModelSerializer, deletable.PurgableSerializerMixin):
    """
    Interface/service object for serializing history contents into dictionaries.
    """
    model_manager_class = HistoryContentsManager

    def __init__(self, app, **kwargs):
        """Set up the serializer and register 'summary' as its default view."""
        super().__init__(app, **kwargs)

        summary_keys = [
            "id",
            "type_id",
            "history_id",
            "hid",
            "history_content_type",
            "visible",
            "dataset_id",
            "collection_id",
            "name",
            "state",
            "deleted",
            "purged",
            "create_time",
            "update_time",
        ]
        self.default_view = 'summary'
        self.add_view('summary', summary_keys)

    # assumes: outgoing to json.dumps and sanitized
    def add_serializers(self):
        """Register the inherited serializers, then the id-related ones for contents."""
        super().add_serializers()
        deletable.PurgableSerializerMixin.add_serializers(self)

        id_serializers = {
            'type_id': self.serialize_type_id,
            'history_id': self.serialize_id,
            'dataset_id': self.serialize_id_or_skip,
            'collection_id': self.serialize_id_or_skip,
        }
        self.serializers.update(id_serializers)

    def serialize_id_or_skip(self, content, key, **context):
        """
        Serialize the id stored under `key`, raising `base.SkipAttribute` if
        `content` has no attribute named `key`.
        """
        if hasattr(content, key):
            return self.serialize_id(content, key, **context)
        raise base.SkipAttribute('no such attribute')
class HistoryContentsFilters(base.ModelFilterParser,
                             annotatable.AnnotatableFilterMixin,
                             deletable.PurgableFiltersMixin,
                             taggable.TaggableFilterMixin,
                             tools.ToolFilterMixin):
    """
    Filter parser for history contents, covering both content classes of the
    contents union query.
    """
    # surprisingly (but ominously), this works for both content classes in the union that's filtered
    model_class = model.HistoryDatasetAssociation
    subcontainer_model_class = model.HistoryDatasetCollectionAssociation

    def _parse_orm_filter(self, attr, op, val):
        """
        Return a parsed 'orm' filter for `attr`, `op`, `val`.

        A few attributes are special-cased as plain named columns of the
        union; everything else is handed to the parent parser.
        """
        # we need to use some manual/text/column fu here since some where clauses on the union don't work
        # using the model_class defined above - they need to be wrapped in their own .column()
        # (and some of these are *not* a normal columns (especially 'state') anyway)
        # TODO: genericize these - can probably extract a _get_column( attr, ... ) or something
        # special cases...special cases everywhere

        def get_filter(attr, op, val):
            # returns a column expression for the special-cased attributes and
            # (implicitly) None for any attribute not handled here
            if attr == 'history_content_type' and op == 'eq':
                if val in ('dataset', 'dataset_collection'):
                    return sql.column('history_content_type') == val
                self.raise_filter_err(attr, op, val, 'bad op in filter')

            if attr == 'type_id':
                if op == 'eq':
                    return sql.column('type_id') == val
                if op == 'in':
                    return sql.column('type_id').in_(self.parse_type_id_list(val))
                self.raise_filter_err(attr, op, val, 'bad op in filter')

            if attr in ('update_time', 'create_time'):
                if op == 'ge':
                    return sql.column(attr) >= self.parse_date(val)
                if op == 'le':
                    return sql.column(attr) <= self.parse_date(val)
                if op == 'gt':
                    return sql.column(attr) > self.parse_date(val)
                if op == 'lt':
                    return sql.column(attr) < self.parse_date(val)
                self.raise_filter_err(attr, op, val, 'bad op in filter')

            if attr == 'state':
                valid_states = model.Dataset.states.values()
                if op == 'eq':
                    if val not in valid_states:
                        self.raise_filter_err(attr, op, val, 'invalid state in filter')
                    return sql.column('state') == val
                if op == 'in':
                    # empty entries (e.g. from a trailing comma) are dropped
                    states = [s for s in val.split(',') if s]
                    for state in states:
                        if state not in valid_states:
                            self.raise_filter_err(attr, op, state, 'invalid state in filter')
                    return sql.column('state').in_(states)
                self.raise_filter_err(attr, op, val, 'bad op in filter')

        column_filter = get_filter(attr, op, val)
        if column_filter is not None:
            return self.parsed_filter(filter_type='orm', filter=column_filter)
        return super()._parse_orm_filter(attr, op, val)

    def decode_type_id(self, type_id):
        """
        Return `type_id` with the part after the first '-' decoded via
        `self.app.security.decode_id`.

        Note: a `type_id` that contains no '-' raises an IndexError.
        """
        TYPE_ID_SEP = '-'
        split = type_id.split(TYPE_ID_SEP, 1)
        return TYPE_ID_SEP.join((split[0], str(self.app.security.decode_id(split[1]))))

    def parse_type_id_list(self, type_id_list_string, sep=','):
        """
        Split `type_id_list_string` at `sep` and decode each resulting type id.
        """
        return [self.decode_type_id(type_id) for type_id in type_id_list_string.split(sep)]

    def _add_parsers(self):
        """Register the parsers of the parent and mixins, then those specific to contents."""
        super()._add_parsers()
        annotatable.AnnotatableFilterMixin._add_parsers(self)
        deletable.PurgableFiltersMixin._add_parsers(self)
        taggable.TaggableFilterMixin._add_parsers(self)
        tools.ToolFilterMixin._add_parsers(self)
        self.orm_filter_parsers.update({
            # NOTE: single-op entries need the trailing comma - ('eq') is just the string 'eq'
            'history_content_type': {'op': ('eq',)},
            'type_id': {'op': ('eq', 'in'), 'val': self.parse_type_id_list},
            'hid': {'op': ('eq', 'ge', 'le'), 'val': int},
            # TODO: needs a different val parser - but no way to add to the above
            # 'hid-in'        : { 'op': ( 'in' ), 'val': self.parse_int_list },
            'name': {'op': ('eq', 'contains', 'like')},
            'state': {'op': ('eq', 'in')},
            'visible': {'op': ('eq',), 'val': self.parse_bool},
            'create_time': {'op': ('le', 'ge', 'lt', 'gt'), 'val': self.parse_date},
            'update_time': {'op': ('le', 'ge', 'lt', 'gt'), 'val': self.parse_date},
        })