Warning

This document is for an old release of Galaxy. You can alternatively view this page in the latest release if it exists or view the top of the latest release's documentation.

Source code for galaxy.managers.history_contents

"""
Heterogeneous lists/contents are difficult to query properly since unions are
not easily made.
"""
import logging

from sqlalchemy import (
    asc,
    desc,
    false,
    func,
    literal,
    sql,
    true
)
from sqlalchemy.orm import (
    eagerload,
    undefer
)

from galaxy import (
    exceptions as glx_exceptions,
    model
)
from galaxy.managers import (
    annotatable,
    base,
    containers,
    deletable,
    hdas,
    hdcas,
    taggable,
    tools
)

log = logging.getLogger(__name__)


# into its own class to have its own filters, etc.
# TODO: but can't inherit from model manager (which assumes only one model)
class HistoryContentsManager(containers.ContainerManagerMixin):
    """
    Manager for the mixed contents of a history: 'contained' items
    (`model.HistoryDatasetAssociation`) and 'subcontainers'
    (`model.HistoryDatasetCollectionAssociation`).

    Both kinds are listed together by building a UNION ALL query over the
    columns named in `common_columns`, then (optionally) expanding the union
    rows into full model objects.
    """
    root_container_class = model.History

    contained_class = model.HistoryDatasetAssociation
    contained_class_manager_class = hdas.HDAManager
    contained_class_type_name = 'dataset'

    subcontainer_class = model.HistoryDatasetCollectionAssociation
    subcontainer_class_manager_class = hdcas.HDCAManager
    subcontainer_class_type_name = 'dataset_collection'

    #: the columns which are common to both subcontainers and non-subcontainers.
    #  (Also the attributes that may be filtered or ordered by)
    # NOTE: the position of the entries matters - `_get_union_type` reads index 1
    # ("history_content_type") and `_get_union_id` reads index 2 ("id") of each union row.
    common_columns = (
        "history_id",
        "history_content_type",
        "id",
        "type_id",
        "hid",
        # joining columns
        "extension",
        "dataset_id",
        "collection_id",
        "name",
        "state",
        "deleted",
        "purged",
        "visible",
        "create_time",
        "update_time",
    )

    default_order_by = 'hid'

    def __init__(self, app):
        """
        Store `app` and instantiate one manager for each of the two content classes.
        """
        self.app = app
        self.contained_manager = self.contained_class_manager_class(app)
        self.subcontainer_manager = self.subcontainer_class_manager_class(app)

    # ---- interface
    def contained(self, container, filters=None, limit=None, offset=None, order_by=None, **kwargs):
        """
        Returns non-subcontainer objects within `container`.

        The filter restricting results to `container` is combined with any given
        `filters` and the listing is delegated to `self.contained_manager.list`.
        """
        filter_to_inside_container = self._get_filter_for_contained(container, self.contained_class)
        filters = base.munge_lists(filter_to_inside_container, filters)
        return self.contained_manager.list(filters=filters, limit=limit, offset=offset, order_by=order_by, **kwargs)

    def subcontainers(self, container, filters=None, limit=None, offset=None, order_by=None, **kwargs):
        """
        Returns only the containers within `container`.

        Note: `limit`, `offset` and `order_by` are accepted for signature parity
        with `contained` but are not applied by this query.
        """
        filter_to_inside_container = self._get_filter_for_contained(container, self.subcontainer_class)
        filters = base.munge_lists(filter_to_inside_container, filters)
        # TODO: collections.DatasetCollectionManager doesn't have the list
        # return self.subcontainer_manager.list( filters=filters, limit=limit, offset=offset, order_by=order_by, **kwargs )

        # Query.filter() takes its criteria as positional arguments, so a list of
        # criteria has to be unpacked rather than passed as one argument.
        # NOTE(review): munge_lists presumably returns a lone criterion when no
        # `filters` were given and a list otherwise - both shapes are handled here.
        if not isinstance(filters, list):
            filters = [filters]
        return self._session().query(self.subcontainer_class).filter(*filters).all()

    def contents(self, container, filters=None, limit=None, offset=None, order_by=None, **kwargs):
        """
        Returns a list of both/all types of contents, filtered and in some order.
        """
        # TODO?: we could branch here based on 'if limit is None and offset is None' - to a simpler (non-union) query
        # for now, I'm just using this (even for non-limited/offset queries) to reduce code paths
        return self._union_of_contents(container,
            filters=filters, limit=limit, offset=offset, order_by=order_by, **kwargs)

    def contents_count(self, container, filters=None, limit=None, offset=None, order_by=None, **kwargs):
        """
        Returns a count of both/all types of contents, based on the given filters.
        """
        return self.contents_query(container,
            filters=filters, limit=limit, offset=offset, order_by=order_by, **kwargs).count()

    def contents_query(self, container, filters=None, limit=None, offset=None, order_by=None, **kwargs):
        """
        Returns the contents union query for subqueries, etc.
        """
        return self._union_of_contents_query(container,
            filters=filters, limit=limit, offset=offset, order_by=order_by, **kwargs)

    # order_by parsing - similar to FilterParser but not enough yet to warrant a class?
    def parse_order_by(self, order_by_string, default=None):
        """
        Return an ORM compatible order_by using the given string.

        Accepts `<attribute>`, `<attribute>-dsc` (both descending) or
        `<attribute>-asc` (ascending) for each attribute in `available`.
        If nothing matches and `default` is truthy, `default` is parsed instead.

        :raises RequestParameterInvalidException: if neither `order_by_string`
            nor `default` names a known ordering.
        """
        available = ['create_time', 'extension', 'hid', 'history_id', 'name', 'update_time']
        for attribute in available:
            attribute_dsc = '%s-dsc' % attribute
            attribute_asc = '%s-asc' % attribute
            # a bare attribute name sorts descending
            if order_by_string in (attribute, attribute_dsc):
                return desc(attribute)
            if order_by_string == attribute_asc:
                return asc(attribute)

        if default:
            return self.parse_order_by(default)
        raise glx_exceptions.RequestParameterInvalidException('Unknown order_by',
            order_by=order_by_string, available=available)

    # history specific methods
    def state_counts(self, history):
        """
        Return a dictionary containing the counts of all contents in each state
        keyed by the distinct states.

        Note: does not include deleted/hidden contents.
        """
        filters = [
            base.ModelFilterParser.parsed_filter("orm", sql.column('deleted') == false()),
            base.ModelFilterParser.parsed_filter("orm", sql.column('visible') == true())
        ]
        contents_subquery = self._union_of_contents_query(history, filters=filters).subquery()
        statement = (sql.select([sql.column('state'), func.count('*')])
            .select_from(contents_subquery)
            .group_by(sql.column('state')))
        # each fetched row is a (state, count) pair
        counts = self.app.model.context.execute(statement).fetchall()
        return dict(counts)

    def active_counts(self, history):
        """
        Return a dictionary keyed with 'deleted', 'hidden', and 'active' with values
        for each representing the count of contents in each state.

        Note: counts for deleted and hidden overlap; In other words, a dataset that's
        both deleted and hidden will be added to both totals.
        """
        returned = dict(deleted=0, hidden=0, active=0)
        contents_subquery = self._union_of_contents_query(history).subquery()
        columns = [
            sql.column('deleted'),
            sql.column('visible'),
            func.count('*')
        ]
        statement = (sql.select(columns)
            .select_from(contents_subquery)
            .group_by(sql.column('deleted'), sql.column('visible')))
        groups = self.app.model.context.execute(statement).fetchall()
        for deleted, visible, count in groups:
            if deleted:
                returned['deleted'] += count
            if not visible:
                returned['hidden'] += count
            if not deleted and visible:
                returned['active'] += count
        return returned

    def map_datasets(self, history, fn, **kwargs):
        """
        Iterate over the datasets of a given history, recursing into collections, and
        calling fn on each dataset.

        Uses the same kwargs as `contents` above. Returns the list of values
        produced by `fn` (results for a collection are spliced in flat).
        """
        returned = []
        for content in self.contents(history, **kwargs):
            if isinstance(content, self.subcontainer_class):
                # the subcontainer manager applies fn to the collection's datasets
                returned.extend(self.subcontainer_manager.map_datasets(content, fn))
            else:
                returned.append(fn(content))
        return returned

    # ---- private
    def _session(self):
        """Return the model context used to build and execute queries."""
        return self.app.model.context

    def _filter_to_contents_query(self, container, content_class, **kwargs):
        """Return a query for `content_class` rows restricted to `container`."""
        # TODO: use list (or by_history etc.)
        container_filter = self._get_filter_for_contained(container, content_class)
        query = self._session().query(content_class).filter(container_filter)
        return query

    def _get_filter_for_contained(self, container, content_class):
        """Return the criterion matching `content_class` rows whose history is `container`."""
        return content_class.history == container

    def _union_of_contents(self, container, expand_models=True, **kwargs):
        """
        Returns a limited and offset list of both types of contents, filtered
        and in some order.

        If `expand_models` is false, the raw rows of the union query are returned.
        Otherwise each row is replaced by its full model object (in union order)
        and any 'function' type filters in `kwargs['filters']` are applied.

        :raises TypeError: if a union row has an unrecognized content type.
        """
        contents_results = self._union_of_contents_query(container, **kwargs).all()
        if not expand_models:
            return contents_results

        # partition ids into a map of { component_class names -> list of ids } from the above union query
        id_map = {
            self.contained_class_type_name: [],
            self.subcontainer_class_type_name: [],
        }
        for result in contents_results:
            result_type = self._get_union_type(result)
            contents_id = self._get_union_id(result)
            if result_type in id_map:
                id_map[result_type].append(contents_id)
            else:
                raise TypeError('Unknown contents type:', result_type)

        # query 2 & 3: use the ids to query each component_class, returning an id->full component model map
        contained_ids = id_map[self.contained_class_type_name]
        id_map[self.contained_class_type_name] = self._contained_id_map(contained_ids)
        subcontainer_ids = id_map[self.subcontainer_class_type_name]
        serialization_params = kwargs.get('serialization_params', None)
        id_map[self.subcontainer_class_type_name] = self._subcontainer_id_map(subcontainer_ids, serialization_params=serialization_params)

        # cycle back over the union query to create an ordered list of the objects returned in queries 2 & 3 above
        contents = []
        filters = kwargs.get('filters') or []
        # TODO: or as generator?
        for result in contents_results:
            result_type = self._get_union_type(result)
            contents_id = self._get_union_id(result)
            content = id_map[result_type][contents_id]
            if self.passes_filters(content, filters):
                contents.append(content)
        return contents

    @staticmethod
    def passes_filters(content, filters):
        """
        Return True if `content` satisfies every filter in `filters` whose
        `filter_type` is 'function'; filters of other types are ignored here.
        """
        return all(
            filter_fn.filter(content)
            for filter_fn in filters
            if filter_fn.filter_type == 'function'
        )

    def _union_of_contents_query(self, container, filters=None, limit=None, offset=None, order_by=None, user_id=None, **kwargs):
        """
        Returns a query for a limited and offset list of both types of contents,
        filtered and in some order.

        When `container` is falsy, the component queries are instead restricted
        to histories whose `user_id` equals the given `user_id`.
        """
        order_by = order_by if order_by is not None else self.default_order_by
        order_by = order_by if isinstance(order_by, (tuple, list)) else (order_by, )

        # TODO: 3 queries and 3 iterations over results - this is undoubtedly better solved in the actual SQL layer
        # via one common table for contents, Some Yonder Resplendent and Fanciful Join, or ORM functionality
        # Here's the (bizarre) strategy:
        #   1. create a union of common columns between contents classes - filter, order, and limit/offset this
        #   2. extract the ids returned from 1 for each class, query each content class by that id list
        #   3. use the results/order from 1 to recombine/merge the 2+ query result lists from 2, return that

        # note: I'm trying to keep these private functions as generic as possible in order to move them toward base later

        # query 1: create a union of common columns for which the component_classes can be filtered/limited
        history_id = container.id if container else None
        contained_query = self._contents_common_query_for_contained(history_id=history_id, user_id=user_id)
        subcontainer_query = self._contents_common_query_for_subcontainer(history_id=history_id, user_id=user_id)

        filters = filters or []
        # Apply filters that are specific to a model
        for orm_filter in filters:
            if orm_filter.filter_type == "orm_function":
                contained_query = contained_query.filter(orm_filter.filter(self.contained_class))
                subcontainer_query = subcontainer_query.filter(orm_filter.filter(self.subcontainer_class))
            elif orm_filter.filter_type == "orm":
                contained_query = self._apply_orm_filter(contained_query, orm_filter.filter)
                subcontainer_query = self._apply_orm_filter(subcontainer_query, orm_filter.filter)

        contents_query = contained_query.union_all(subcontainer_query)
        contents_query = contents_query.order_by(*order_by)

        if limit is not None:
            contents_query = contents_query.limit(limit)
        if offset is not None:
            contents_query = contents_query.offset(offset)
        return contents_query

    def _apply_orm_filter(self, qry, orm_filter):
        """
        Apply `orm_filter` to `qry` by rebinding the filter's left-hand side to
        the query column of the same name.

        Only `BinaryExpression` filters are handled; anything else leaves `qry`
        unchanged, as does a filter whose left-hand name matches no query column.
        """
        if isinstance(orm_filter, sql.elements.BinaryExpression):
            # column_descriptions is read once, before qry is rebound below
            for description in qry.column_descriptions:
                if description['name'] != orm_filter.left.name:
                    continue
                # clone so the caller's expression is not mutated
                new_filter = orm_filter._clone()
                new_filter.left = description['expr']
                qry = qry.filter(new_filter)
        return qry

    def _contents_common_columns(self, component_class, **kwargs):
        """
        Return the list of labeled columns (one per `common_columns` entry) for
        `component_class`; a keyword argument named after a column overrides it.
        """
        columns = []
        # pull column from class by name or override with kwargs if listed there, then label
        for column_name in self.common_columns:
            if column_name in kwargs:
                column = kwargs.get(column_name, None)
            elif column_name == "model_class":
                column = literal(component_class.__name__)
            else:
                column = getattr(component_class, column_name)
            column = column.label(column_name)
            columns.append(column)
        return columns

    def _contents_common_query_for_contained(self, history_id, user_id):
        """
        Return the common-columns query for the contained class, restricted to
        `history_id` if truthy, otherwise to histories owned by `user_id`.
        """
        component_class = self.contained_class
        # TODO: and now a join with Dataset - this is getting sad
        columns = self._contents_common_columns(component_class,
            history_content_type=literal('dataset'),
            state=model.Dataset.state,
            # do not have inner collections
            collection_id=literal(None)
        )
        subquery = self._session().query(*columns)
        # for the HDA's we need to join the Dataset since it has an actual state column
        subquery = subquery.join(model.Dataset, model.Dataset.id == component_class.dataset_id)
        if history_id:
            subquery = subquery.filter(component_class.history_id == history_id)
        else:
            # Make sure we only return items that are user-accessible by checking that they are in a history
            # owned by the current user.
            # TODO: move into filter mixin, and implement accessible logic as SQL query
            subquery = subquery.filter(
                component_class.history_id == model.History.table.c.id,
                model.History.table.c.user_id == user_id
            )
        return subquery

    def _contents_common_query_for_subcontainer(self, history_id, user_id):
        """
        Return the common-columns query for the subcontainer class, restricted
        to `history_id` if truthy, otherwise to histories owned by `user_id`.
        """
        component_class = self.subcontainer_class
        columns = self._contents_common_columns(component_class,
            history_content_type=literal('dataset_collection'),
            # do not have datasets
            dataset_id=literal(None),
            state=model.DatasetCollection.populated_state,
            # TODO: should be purgable? fix
            purged=literal(False),
            extension=literal(None),
            # these are attached instead to the inner collection joined below
            create_time=model.DatasetCollection.create_time,
            update_time=model.DatasetCollection.update_time
        )
        subquery = self._session().query(*columns)
        # for the HDCA's we need to join the DatasetCollection since it has update/create times
        subquery = subquery.join(model.DatasetCollection,
            model.DatasetCollection.id == component_class.collection_id)
        if history_id:
            subquery = subquery.filter(component_class.history_id == history_id)
        else:
            subquery = subquery.filter(
                component_class.history_id == model.History.table.c.id,
                model.History.table.c.user_id == user_id
            )
        return subquery

    def _get_union_type(self, union):
        """Return the string name of the class for this row in the union results"""
        # index 1 is "history_content_type" in `common_columns`
        return str(union[1])

    def _get_union_id(self, union):
        """Return the id for this row in the union results"""
        # index 2 is "id" in `common_columns`
        return union[2]

    def _contained_id_map(self, id_list):
        """Return an id to model map of all contained-type models in the id_list."""
        if not id_list:
            # keep the return type a mapping even when there is nothing to load
            return {}
        component_class = self.contained_class
        query = (self._session().query(component_class)
            .filter(component_class.id.in_(id_list))
            .options(undefer('_metadata'))
            .options(eagerload('dataset.actions'))
            .options(eagerload('tags'))
            .options(eagerload('annotations')))
        return {row.id: row for row in query.all()}

    def _subcontainer_id_map(self, id_list, serialization_params=None):
        """Return an id to model map of all subcontainer-type models in the id_list."""
        if not id_list:
            # keep the return type a mapping even when there is nothing to load
            return {}
        component_class = self.subcontainer_class
        query = (self._session().query(component_class)
            .filter(component_class.id.in_(id_list))
            .options(eagerload('collection'))
            .options(eagerload('tags'))
            .options(eagerload('annotations')))

        # This will conditionally join a potentially costly job_state summary
        # All the paranoia if-checking makes me wonder if serialization_params
        # should really be a property of the manager class instance
        if serialization_params and serialization_params['keys']:
            if 'job_state_summary' in serialization_params['keys']:
                query = query.options(eagerload('job_state_summary'))

        return {row.id: row for row in query.all()}
class HistoryContentsSerializer(base.ModelSerializer, deletable.PurgableSerializerMixin):
    """
    Serializer service for history contents.

    Registers a single 'summary' view (also the default view) and the
    per-key serializers needed for the id-like attributes in that view.
    """
    model_manager_class = HistoryContentsManager

    def __init__(self, app, **kwargs):
        """Initialize the base serializer, then declare the 'summary' view as default."""
        super().__init__(app, **kwargs)

        summary_keys = [
            "id",
            "type_id",
            "history_id",
            "hid",
            "history_content_type",
            "visible",
            "dataset_id",
            "collection_id",
            "name",
            "state",
            "deleted",
            "purged",
            "create_time",
            "update_time",
        ]
        self.default_view = 'summary'
        self.add_view('summary', summary_keys)

    # assumes: outgoing to json.dumps and sanitized
    def add_serializers(self):
        """Register the inherited serializers, then the ones for the id-like keys."""
        super().add_serializers()
        deletable.PurgableSerializerMixin.add_serializers(self)

        id_serializers = {
            'type_id' : self.serialize_type_id,
            'history_id' : self.serialize_id,
            'dataset_id' : self.serialize_id_or_skip,
            'collection_id' : self.serialize_id_or_skip,
        }
        self.serializers.update(id_serializers)

    def serialize_id_or_skip(self, content, key, **context):
        """
        Serialize the id stored under `key`, or raise `base.SkipAttribute`
        when `content` has no attribute named `key`.
        """
        if hasattr(content, key):
            return self.serialize_id(content, key, **context)
        raise base.SkipAttribute('no such attribute')
class HistoryContentsFilters(base.ModelFilterParser,
                             annotatable.AnnotatableFilterMixin,
                             deletable.PurgableFiltersMixin,
                             taggable.TaggableFilterMixin,
                             tools.ToolFilterMixin):
    """
    Filter parser for the history contents union query.

    A handful of attributes are turned into filters on bare, named SQL columns
    (`sql.column(...)`) so they can be applied to the union; all other
    attributes are handed to the parent parser.
    """
    # surprisingly (but ominously), this works for both content classes in the union that's filtered
    model_class = model.HistoryDatasetAssociation

    def _parse_orm_filter(self, attr, op, val):
        """
        Return a parsed 'orm' filter for `attr`/`op`/`val`.

        The special-cased attributes ('history_content_type', 'type_id',
        'create_time', 'update_time', 'state') are handled here; anything else
        falls through to the parent class implementation.
        """
        # we need to use some manual/text/column fu here since some where clauses on the union don't work
        # using the model_class defined above - they need to be wrapped in their own .column()
        # (and some of these are *not* a normal columns (especially 'state') anyway)
        # TODO: genericize these - can probably extract a _get_column( attr, ... ) or something
        # special cases...special cases everywhere

        def get_filter(attr, op, val):
            # Returns a column expression for the special cases, or None (implicitly)
            # when `attr` is not one of them.
            if attr == 'history_content_type' and op == 'eq':
                if val in ('dataset', 'dataset_collection'):
                    return sql.column('history_content_type') == val
                self.raise_filter_err(attr, op, val, 'bad op in filter')

            if attr == 'type_id':
                if op == 'eq':
                    return sql.column('type_id') == val
                if op == 'in':
                    return sql.column('type_id').in_(self.parse_type_id_list(val))
                self.raise_filter_err(attr, op, val, 'bad op in filter')

            if attr in ('update_time', 'create_time'):
                if op == 'ge':
                    return sql.column(attr) >= self.parse_date(val)
                if op == 'le':
                    return sql.column(attr) <= self.parse_date(val)
                if op == 'gt':
                    return sql.column(attr) > self.parse_date(val)
                if op == 'lt':
                    return sql.column(attr) < self.parse_date(val)
                self.raise_filter_err(attr, op, val, 'bad op in filter')

            if attr == 'state':
                valid_states = model.Dataset.states.values()
                if op == 'eq':
                    if val not in valid_states:
                        self.raise_filter_err(attr, op, val, 'invalid state in filter')
                    return sql.column('state') == val
                if op == 'in':
                    # comma separated list; empty entries are dropped
                    states = [s for s in val.split(',') if s]
                    for state in states:
                        if state not in valid_states:
                            self.raise_filter_err(attr, op, state, 'invalid state in filter')
                    return sql.column('state').in_(states)
                self.raise_filter_err(attr, op, val, 'bad op in filter')

        column_filter = get_filter(attr, op, val)
        if column_filter is not None:
            return self.parsed_filter(filter_type='orm', filter=column_filter)
        return super()._parse_orm_filter(attr, op, val)

    def decode_type_id(self, type_id):
        """
        Decode the id portion of a `<type>-<encoded id>` string and return
        `<type>-<decoded id>`.

        The string is split at the first '-' only; an IndexError results if
        `type_id` contains no '-'.
        """
        TYPE_ID_SEP = '-'
        split = type_id.split(TYPE_ID_SEP, 1)
        return TYPE_ID_SEP.join((split[0], str(self.app.security.decode_id(split[1]))))

    def parse_type_id_list(self, type_id_list_string, sep=','):
        """
        Split `type_id_list_string` at `sep` and decode each resulting type_id.
        """
        return [self.decode_type_id(type_id) for type_id in type_id_list_string.split(sep)]

    def _add_parsers(self):
        """
        Register the parsers of the parent class and each mixin, then the
        column/operator/value-parser map specific to history contents.
        """
        super()._add_parsers()
        annotatable.AnnotatableFilterMixin._add_parsers(self)
        deletable.PurgableFiltersMixin._add_parsers(self)
        taggable.TaggableFilterMixin._add_parsers(self)
        tools.ToolFilterMixin._add_parsers(self)

        # NOTE: single-op entries need the trailing comma - ('eq') is just the
        # string 'eq', against which a membership test does substring matching.
        self.orm_filter_parsers.update({
            'history_content_type' : {'op': ('eq',)},
            'type_id' : {'op': ('eq', 'in'), 'val': self.parse_type_id_list},
            'hid' : {'op': ('eq', 'ge', 'le', 'gt', 'lt'), 'val': int},
            # TODO: needs a different val parser - but no way to add to the above
            # 'hid-in' : { 'op': ( 'in', ), 'val': self.parse_int_list },
            'name' : {'op': ('eq', 'contains', 'like')},
            'state' : {'op': ('eq', 'in')},
            'visible' : {'op': ('eq',), 'val': self.parse_bool},
            'create_time' : {'op': ('le', 'ge', 'lt', 'gt'), 'val': self.parse_date},
            'update_time' : {'op': ('le', 'ge', 'lt', 'gt'), 'val': self.parse_date},
        })