Source code for galaxy.managers.history_contents

"""
Heterogeneous lists/contents are difficult to query properly since unions are
not easily made.
"""

import json
import logging
from typing import (
    Any,
    Dict,
    List,
)

from sqlalchemy import (
    asc,
    cast,
    desc,
    false,
    func,
    Integer,
    literal,
    nullsfirst,
    nullslast,
    select,
    Select,
    sql,
    true,
    UnaryExpression,
)
from sqlalchemy.orm import (
    joinedload,
    undefer,
)

from galaxy import (
    exceptions as glx_exceptions,
    model,
)
from galaxy.managers import (
    annotatable,
    base,
    deletable,
    genomes,
    hdas,
    hdcas,
    taggable,
    tools,
)
from galaxy.managers.job_connections import JobConnectionsManager
from galaxy.schema import ValueFilterQueryParams
from galaxy.structured_app import MinimalManagerApp
from galaxy.util import listify
from .base import (
    parse_bool,
    raise_filter_err,
    Serializer,
)

log = logging.getLogger(__name__)
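
# Illustrative sketch only (not part of the original module): the manager below makes
# heterogeneous contents queryable by selecting identically labeled columns from each
# content class and UNION ALLing the two selects. A minimal version of that idea,
# assuming a SQLAlchemy `session` is available:
def _example_union_of_common_columns(session):
    hda_q = session.query(
        model.HistoryDatasetAssociation.id.label("id"),
        model.HistoryDatasetAssociation.hid.label("hid"),
        literal("dataset").label("history_content_type"),
    )
    hdca_q = session.query(
        model.HistoryDatasetCollectionAssociation.id.label("id"),
        model.HistoryDatasetCollectionAssociation.hid.label("hid"),
        literal("dataset_collection").label("history_content_type"),
    )
    # one ordered, pageable result set spanning both content types
    return hda_q.union_all(hdca_q).order_by("hid")
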


# into its own class to have its own filters, etc.
# TODO: but can't inherit from model manager (which assumes only one model)
class HistoryContentsManager(base.SortableManager):
    root_container_class = model.History

    contained_class = model.HistoryDatasetAssociation
    contained_class_manager_class = hdas.HDAManager
    contained_class_type_name = "dataset"

    subcontainer_class = model.HistoryDatasetCollectionAssociation
    subcontainer_class_manager_class = hdcas.HDCAManager
    subcontainer_class_type_name = "dataset_collection"

    #: the columns which are common to both subcontainers and non-subcontainers.
    #  (Also the attributes that may be filtered or ordered_by)
    common_columns = (
        "history_id",
        "history_content_type",
        "id",
        "type_id",
        "hid",
        # joining columns
        "extension",
        "dataset_id",
        "collection_id",
        "name",
        "state",
        "object_store_id",
        "size",
        "deleted",
        "purged",
        "visible",
        "create_time",
        "update_time",
    )
    default_order_by = "hid"

    def __init__(self, app: MinimalManagerApp):
        self.app = app
        self.contained_manager = app[self.contained_class_manager_class]
        self.subcontainer_manager = app[self.subcontainer_class_manager_class]

    # ---- interface
    def contents(self, container, filters=None, limit=None, offset=None, order_by=None, **kwargs):
        """
        Returns a list of both/all types of contents, filtered and in some order.
        """
        # TODO?: we could branch here based on 'if limit is None and offset is None' - to a simpler (non-union) query
        # for now, I'm just using this (even for non-limited/offset queries) to reduce code paths
        return self._union_of_contents(
            container, filters=filters, limit=limit, offset=offset, order_by=order_by, **kwargs
        )

    def contents_count(self, container, filters=None, limit=None, offset=None, order_by=None, **kwargs):
        """
        Returns a count of both/all types of contents, based on the given filters.
        """
        return self.contents_query(
            container, filters=filters, limit=limit, offset=offset, order_by=order_by, **kwargs
        ).count()

    def contents_query(self, container, filters=None, limit=None, offset=None, order_by=None, **kwargs):
        """
        Returns the contents union query for subqueries, etc.
        """
        return self._union_of_contents_query(
            container, filters=filters, limit=limit, offset=offset, order_by=order_by, **kwargs
        )

    # order_by parsing - similar to FilterParser but not enough yet to warrant a class?
    def parse_order_by(self, order_by_string, default=None):
        """Return an ORM compatible order_by using the given string"""
        available = ["create_time", "extension", "hid", "history_id", "name", "update_time", "size"]
        for attribute in available:
            attribute_dsc = f"{attribute}-dsc"
            attribute_asc = f"{attribute}-asc"
            if order_by_string in (attribute, attribute_dsc):
                order_by: UnaryExpression = desc(attribute)
                if attribute == "size":
                    return nullslast(order_by)
                return order_by
            if order_by_string == attribute_asc:
                order_by = asc(attribute)
                if attribute == "size":
                    return nullsfirst(order_by)
                return order_by
        if default:
            return self.parse_order_by(default)
        raise glx_exceptions.RequestParameterInvalidException(
            "Unknown order_by", order_by=order_by_string, available=available
        )

    # history specific methods
    def state_counts(self, history):
        """
        Return a dictionary containing the counts of all contents in each state
        keyed by the distinct states.

        Note: does not include deleted/hidden contents.
        """
        filters = [
            base.ModelFilterParser.parsed_filter("orm", sql.column("deleted") == false()),
            base.ModelFilterParser.parsed_filter("orm", sql.column("visible") == true()),
        ]
        contents_subquery = self._union_of_contents_query(history, filters=filters).subquery()
        statement: Select = (
            select(sql.column("state"), func.count()).select_from(contents_subquery).group_by(sql.column("state"))
        )
        counts = self.app.model.session().execute(statement).fetchall()
        return dict(counts)

    def active_counts(self, history):
        """
        Return a dictionary keyed with 'deleted', 'hidden', and 'active' with values
        for each representing the count of contents in each state.

        Note: counts for deleted and hidden overlap; in other words, a dataset that's
        both deleted and hidden will be added to both totals.
        """
        hda_select = self._active_counts_statement(model.HistoryDatasetAssociation, history.id)
        hdca_select = self._active_counts_statement(model.HistoryDatasetCollectionAssociation, history.id)
        subquery = hda_select.union_all(hdca_select).subquery()
        statement = select(
            cast(func.coalesce(func.sum(subquery.c.deleted), 0), Integer).label("deleted"),
            cast(func.coalesce(func.sum(subquery.c.hidden), 0), Integer).label("hidden"),
            cast(func.coalesce(func.sum(subquery.c.active), 0), Integer).label("active"),
        )
        returned = self.app.model.context.execute(statement).one()
        return dict(returned._mapping)

    def _active_counts_statement(self, model_class, history_id):
        deleted_attr = model_class.deleted
        visible_attr = model_class.visible
        table_attr = model_class.table
        return (
            select(
                func.sum(cast(deleted_attr, Integer)).label("deleted"),
                func.sum(cast(visible_attr == false(), Integer)).label("hidden"),
                func.sum(func.abs(cast(visible_attr, Integer) * (cast(deleted_attr, Integer) - 1))).label("active"),
            )
            .select_from(table_attr)
            .filter_by(history_id=history_id)
        )

    def map_datasets(self, history, fn, **kwargs):
        """
        Iterate over the datasets of a given history, recursing into collections,
        and calling fn on each dataset.

        Uses the same kwargs as `contents` above.
        """
        returned = []
        contents = self.contents(history, **kwargs)
        for content in contents:
            if isinstance(content, self.subcontainer_class):
                processed_list = self.subcontainer_manager.map_datasets(content, fn)
                returned.extend(processed_list)
            else:
                processed = fn(content)
                returned.append(processed)
        return returned

    # ---- private
    def _session(self):
        return self.app.model.context

    def _union_of_contents(self, container, expand_models=True, **kwargs):
        """
        Returns a limited and offset list of both types of contents, filtered
        and in some order.
        """
        contents_results = self._union_of_contents_query(container, **kwargs).all()
        if not expand_models:
            return contents_results

        # partition ids into a map of { component_class names -> list of ids } from the above union query
        id_map: Dict[str, List[int]] = {self.contained_class_type_name: [], self.subcontainer_class_type_name: []}
        for result in contents_results:
            result_type = self._get_union_type(result)
            contents_id = self._get_union_id(result)
            if result_type in id_map:
                id_map[result_type].append(contents_id)
            else:
                raise TypeError("Unknown contents type:", result_type)

        # query 2 & 3: use the ids to query each component_class, returning an id->full component model map
        contained_ids = id_map[self.contained_class_type_name]
        id_map[self.contained_class_type_name] = self._contained_id_map(contained_ids)
        subcontainer_ids = id_map[self.subcontainer_class_type_name]
        serialization_params = kwargs.get("serialization_params", None)
        id_map[self.subcontainer_class_type_name] = self._subcontainer_id_map(
            subcontainer_ids, serialization_params=serialization_params
        )

        # cycle back over the union query to create an ordered list of the objects returned in queries 2 & 3 above
        contents = []
        filters = kwargs.get("filters") or []
        # TODO: or as generator?
        for result in contents_results:
            result_type = self._get_union_type(result)
            contents_id = self._get_union_id(result)
            content = id_map[result_type][contents_id]
            if self.passes_filters(content, filters):
                contents.append(content)
        return contents

    @staticmethod
    def passes_filters(content, filters):
        for filter_fn in filters:
            if filter_fn.filter_type == "function":
                if not filter_fn.filter(content):
                    return False
        return True

    def _union_of_contents_query(
        self, container, filters=None, limit=None, offset=None, order_by=None, user_id=None, **kwargs
    ):
        """
        Returns a query for a limited and offset list of both types of contents,
        filtered and in some order.
        """
        order_by = order_by if order_by is not None else self.default_order_by
        order_by = order_by if isinstance(order_by, (tuple, list)) else (order_by,)

        # TODO: 3 queries and 3 iterations over results - this is undoubtedly better solved in the actual SQL layer
        # via one common table for contents, Some Yonder Resplendent and Fanciful Join, or ORM functionality
        # Here's the (bizarre) strategy:
        #   1. create a union of common columns between contents classes - filter, order, and limit/offset this
        #   2. extract the ids returned from 1 for each class, query each content class by that id list
        #   3. use the results/order from 1 to recombine/merge the 2+ query result lists from 2, return that
        # note: I'm trying to keep these private functions as generic as possible in order to move them toward base later

        # query 1: create a union of common columns for which the component_classes can be filtered/limited
        contained_query = self._contents_common_query_for_contained(
            history_id=container.id if container else None, user_id=user_id
        )
        subcontainer_query = self._contents_common_query_for_subcontainer(
            history_id=container.id if container else None, user_id=user_id
        )

        filters = filters or []
        # Apply filters that are specific to a model
        for orm_filter in filters:
            if orm_filter.filter_type == "orm_function":
                contained_query = contained_query.filter(orm_filter.filter(self.contained_class))
                subcontainer_query = subcontainer_query.filter(orm_filter.filter(self.subcontainer_class))
            elif orm_filter.filter_type == "orm":
                contained_query = self._apply_orm_filter(contained_query, orm_filter)
                subcontainer_query = self._apply_orm_filter(subcontainer_query, orm_filter)

        contents_query = contained_query.union_all(subcontainer_query)
        contents_query = contents_query.order_by(*order_by)

        if limit is not None:
            contents_query = contents_query.limit(limit)
        if offset is not None:
            contents_query = contents_query.offset(offset)
        return contents_query

    def _apply_orm_filter(self, qry, orm_filter):
        if isinstance(orm_filter.filter, sql.elements.BinaryExpression):
            for match in filter(lambda col: col["name"] == orm_filter.filter.left.name, qry.column_descriptions):
                column = match["expr"]
                new_filter = orm_filter.filter._clone()
                if orm_filter.case_insensitive:
                    column = func.lower(column)
                new_filter.left = column
                qry = qry.filter(new_filter)
        return qry

    def _contents_common_columns(self, component_class, **kwargs):
        columns = []
        # pull column from class by name or override with kwargs if listed there, then label
        for column_name in self.common_columns:
            if column_name in kwargs:
                column = kwargs.get(column_name, None)
            elif column_name == "model_class":
                column = literal(component_class.__name__)
            else:
                column = getattr(component_class, column_name)
            column = column.label(column_name)
            columns.append(column)
        return columns

    def _contents_common_query_for_contained(self, history_id, user_id):
        component_class = self.contained_class
        # TODO: and now a join with Dataset - this is getting sad
        columns = self._contents_common_columns(
            component_class,
            history_content_type=literal("dataset"),
            size=model.Dataset.file_size,
            state=model.Dataset.state,
            object_store_id=model.Dataset.object_store_id,
            quota_source_label=model.Dataset.object_store_id,
            # do not have inner collections
            collection_id=literal(None),
        )
        subquery = self._session().query(*columns)
        # for the HDA's we need to join the Dataset since it has an actual state column
        subquery = subquery.join(model.Dataset, model.Dataset.id == component_class.table.c.dataset_id)
        if history_id:
            subquery = subquery.filter(component_class.table.c.history_id == history_id)
        else:
            # Make sure we only return items that are user-accessible by checking that they are in a history
            # owned by the current user.
            # TODO: move into filter mixin, and implement accessible logic as SQL query
            subquery = subquery.filter(
                component_class.table.c.history_id == model.History.table.c.id,
                model.History.table.c.user_id == user_id,
            )
        return subquery

    def _contents_common_query_for_subcontainer(self, history_id, user_id):
        component_class = self.subcontainer_class
        columns = self._contents_common_columns(
            component_class,
            history_content_type=literal("dataset_collection"),
            # do not have datasets
            dataset_id=literal(None),
            size=literal(None),
            state=model.DatasetCollection.populated_state,
            object_store_id=literal(None),
            quota_source_label=literal(None),
            # TODO: should be purgable? fix
            purged=literal(False),
            extension=literal(None),
        )
        subquery = self._session().query(*columns)
        # for the HDCA's we need to join the DatasetCollection since it has the populated_state
        subquery = subquery.join(model.DatasetCollection, model.DatasetCollection.id == component_class.collection_id)
        if history_id:
            subquery = subquery.filter(component_class.history_id == history_id)
        else:
            subquery = subquery.filter(
                component_class.history_id == model.History.table.c.id,
                model.History.table.c.user_id == user_id,
            )
        return subquery

    def _get_union_type(self, union):
        """Return the string name of the class for this row in the union results"""
        return str(union[1])

    def _get_union_id(self, union):
        """Return the id for this row in the union results"""
        return union[2]

    def _contained_id_map(self, id_list):
        """Return an id to model map of all contained-type models in the id_list."""
        if not id_list:
            return []
        component_class = self.contained_class
        stmt = (
            select(component_class)
            .where(component_class.id.in_(id_list))
            .options(undefer(component_class._metadata))
            .options(joinedload(component_class.dataset).joinedload(model.Dataset.actions))
            .options(joinedload(component_class.tags))  # type: ignore[attr-defined]
            .options(joinedload(component_class.annotations))  # type: ignore[attr-defined]
        )
        result = self._session().scalars(stmt).unique()
        return {row.id: row for row in result}

    def _subcontainer_id_map(self, id_list, serialization_params=None):
        """Return an id to model map of all subcontainer-type models in the id_list."""
        if not id_list:
            return []
        component_class = self.subcontainer_class
        stmt = (
            select(component_class)
            .where(component_class.id.in_(id_list))
            .options(joinedload(component_class.collection))
            .options(joinedload(component_class.tags))
            .options(joinedload(component_class.annotations))
        )
        result = self._session().scalars(stmt).unique()
        return {row.id: row for row in result}

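# Illustrative usage sketch only (not part of the original module); `app` is assumed
# to be a configured MinimalManagerApp and `history` a model.History owned by the
# requesting user.
def _example_contents_usage(app: MinimalManagerApp, history: model.History):
    manager = HistoryContentsManager(app)
    # order by size, largest first; NULL sizes (e.g. collections) sort last
    order_by = manager.parse_order_by("size-dsc")
    # one page of mixed HDAs and HDCAs in a single ordered list
    page = manager.contents(history, limit=25, offset=0, order_by=order_by)
    # counts of visible, non-deleted contents keyed by dataset state
    state_counts = manager.state_counts(history)
    return page, state_counts
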
class HistoryContentsSerializer(base.ModelSerializer, deletable.PurgableSerializerMixin):
    """
    Interface/service object for serializing history contents into dictionaries.
    """

    model_manager_class = HistoryContentsManager

    def __init__(self, app: MinimalManagerApp, **kwargs):
        super().__init__(app, **kwargs)

        self.default_view = "summary"
        self.add_view(
            "summary",
            [
                "id",
                "type_id",
                "history_id",
                "hid",
                "history_content_type",
                "visible",
                "dataset_id",
                "collection_id",
                "name",
                "state",
                "deleted",
                "purged",
                "create_time",
                "update_time",
            ],
        )

    # assumes: outgoing to json.dumps and sanitized
    def add_serializers(self):
        super().add_serializers()
        deletable.PurgableSerializerMixin.add_serializers(self)
        serializers: Dict[str, Serializer] = {
            "type_id": self.serialize_type_id,
            "history_id": self.serialize_id,
            "dataset_id": self.serialize_id_or_skip,
            "collection_id": self.serialize_id_or_skip,
        }
        self.serializers.update(serializers)

    def serialize_id_or_skip(self, item: Any, key: str, **context):
        """Serialize id or skip if attribute with `key` is not present."""
        if not hasattr(item, key):
            raise base.SkipAttribute("no such attribute")
        return self.serialize_id(item, key, **context)

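# Illustrative usage sketch only (not part of the original module); `app` is assumed
# to be a configured MinimalManagerApp, `content` an HDA or HDCA, and
# `serialize_to_view` is the view-based serialization entry point inherited from
# base.ModelSerializer.
def _example_serializer_usage(app: MinimalManagerApp, content, user=None):
    serializer = HistoryContentsSerializer(app)
    # restricts output to the keys registered for the default "summary" view
    return serializer.serialize_to_view(content, view="summary", user=user)
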
class HistoryContentsFilters(
    base.ModelFilterParser,
    annotatable.AnnotatableFilterMixin,
    deletable.PurgableFiltersMixin,
    genomes.GenomeFilterMixin,
    taggable.TaggableFilterMixin,
    tools.ToolFilterMixin,
):
    # surprisingly (but ominously), this works for both content classes in the union that's filtered
    model_class = model.HistoryDatasetAssociation

    def parse_query_filters_with_relations(self, query_filters: ValueFilterQueryParams, history_id):
        """Parse query filters but consider case where related filter is included."""
        has_related_q = [q for q in ("related-eq", "related") if query_filters.q and q in query_filters.q]
        if query_filters.q and query_filters.qv and has_related_q:
            qv_index = query_filters.q.index(has_related_q[0])
            qv_hid = query_filters.qv[qv_index]
            # Type check whether hid is int
            if not qv_hid.isdigit():
                raise glx_exceptions.RequestParameterInvalidException(
                    "unparsable value for related filter",
                    column="related",
                    operation="eq",
                    value=qv_hid,
                    ValueError="invalid type in filter",
                )
            query_filters_with_relations = self.get_query_filters_with_relations(
                query_filters=query_filters, related_q=has_related_q[0], history_id=history_id
            )
            return super().parse_query_filters(query_filters_with_relations)
        return super().parse_query_filters(query_filters)

    def _parse_orm_filter(self, attr, op, val):
        # we need to use some manual/text/column fu here since some where clauses on the union don't work
        # using the model_class defined above - they need to be wrapped in their own .column()
        # (and some of these are *not* normal columns (especially 'state') anyway)
        # TODO: genericize these - can probably extract a _get_column( attr, ... ) or something

        # special cases...special cases everywhere
        def get_filter(attr, op, val):
            if attr == "history_content_type" and op == "eq":
                if val in ("dataset", "dataset_collection"):
                    return sql.column("history_content_type") == val
                raise_filter_err(attr, op, val, "bad op in filter")
            if attr == "related":
                if op == "eq":
                    # unclear if multiple related values make sense, maybe this should be `.eq_` instead
                    return sql.column("hid").in_(listify(json.loads(val)))
                raise_filter_err(attr, op, val, "bad op in filter")
            if attr == "type_id":
                if op == "eq":
                    return sql.column("type_id") == val
                if op == "in":
                    return sql.column("type_id").in_(self.parse_type_id_list(val))
                raise_filter_err(attr, op, val, "bad op in filter")
            if attr in ("update_time", "create_time"):
                if op == "ge":
                    return sql.column(attr) >= self.parse_date(val)
                if op == "le":
                    return sql.column(attr) <= self.parse_date(val)
                if op == "gt":
                    return sql.column(attr) > self.parse_date(val)
                if op == "lt":
                    return sql.column(attr) < self.parse_date(val)
                raise_filter_err(attr, op, val, "bad op in filter")
            if attr == "state":
                valid_states = model.Dataset.states.values()
                if op == "eq":
                    if val not in valid_states:
                        raise_filter_err(attr, op, val, "invalid state in filter")
                    return sql.column("state") == val
                if op == "in":
                    states = [s for s in val.split(",") if s]
                    for state in states:
                        if state not in valid_states:
                            raise_filter_err(attr, op, state, "invalid state in filter")
                    return sql.column("state").in_(states)
                raise_filter_err(attr, op, val, "bad op in filter")
            if attr == "object_store_id":
                if op == "eq":
                    return sql.column("object_store_id") == val
                if op == "in":
                    object_store_ids = [s for s in val.split(",") if s]
                    return sql.column("object_store_id").in_(object_store_ids)
                raise_filter_err(attr, op, val, "bad op in filter")
            if attr == "quota_source_label":
                if op == "eq":
                    ids = self.app.object_store.get_quota_source_map().ids_per_quota_source(
                        include_default_quota_source=True
                    )
                    if val == "__null__":
                        val = None
                    if val not in ids:
                        raise ValueError(f"Could not find key {val} in object store keys {list(ids.keys())}")
                    object_store_ids = ids[val]
                    return sql.column("object_store_id").in_(object_store_ids)
                raise_filter_err(attr, op, val, "bad op in filter")

        if (column_filter := get_filter(attr, op, val)) is not None:
            return self.parsed_filter(filter_type="orm", filter=column_filter)
        return super()._parse_orm_filter(attr, op, val)

    def get_query_filters_with_relations(self, query_filters: ValueFilterQueryParams, related_q: str, history_id):
        """Return `query_filters_with_relations` changing `related:hid` to `related:[hid1, hid2, ...]`."""
        if query_filters.q and query_filters.qv:
            qv_index = query_filters.q.index(related_q)
            qv_hid = query_filters.qv[qv_index]

            # Make new q and qv excluding related filter
            new_q = [x for i, x in enumerate(query_filters.q) if i != qv_index]
            new_qv = [x for i, x in enumerate(query_filters.qv) if i != qv_index]

            # Get list of related item hids from job_connections manager
            job_connections_manager = JobConnectionsManager(self.app.model.session)
            related = job_connections_manager.get_related_hids(history_id, int(qv_hid))

            # Make new query_filters with updated list of related hids for given hid
            new_q.append("related-eq")
            new_qv.append(json.dumps(related))
            query_filters_with_relations = ValueFilterQueryParams(
                q=new_q,
                qv=new_qv,
            )
            return query_filters_with_relations
        return query_filters

    def decode_type_id(self, type_id):
        TYPE_ID_SEP = "-"
        split = type_id.split(TYPE_ID_SEP, 1)
        return TYPE_ID_SEP.join((split[0], str(self.app.security.decode_id(split[1]))))

    def parse_type_id_list(self, type_id_list_string, sep=","):
        """
        Split `type_id_list_string` at `sep`.
        """
        return [self.decode_type_id(type_id) for type_id in type_id_list_string.split(sep)]

    def _add_parsers(self):
        super()._add_parsers()
        database_connection: str = self.app.config.database_connection
        annotatable.AnnotatableFilterMixin._add_parsers(self)
        genomes.GenomeFilterMixin._add_parsers(self, database_connection)
        deletable.PurgableFiltersMixin._add_parsers(self)
        taggable.TaggableFilterMixin._add_parsers(self)
        tools.ToolFilterMixin._add_parsers(self)
        self.orm_filter_parsers.update(
            {
                "history_content_type": {"op": ("eq")},
                "type_id": {"op": ("eq", "in"), "val": self.parse_type_id_list},
                "hid": {"op": ("eq", "ge", "le", "gt", "lt"), "val": int},
                # TODO: needs a different val parser - but no way to add to the above
                # 'hid-in' : { 'op': ( 'in' ), 'val': self.parse_int_list },
                "name": {"op": ("eq", "contains", "like")},
                "state": {"op": ("eq", "in")},
                "object_store_id": {"op": ("eq", "in")},
                "quota_source_label": {"op": ("eq")},
                "visible": {"op": ("eq"), "val": parse_bool},
                "create_time": {"op": ("le", "ge", "lt", "gt"), "val": self.parse_date},
                "update_time": {"op": ("le", "ge", "lt", "gt"), "val": self.parse_date},
            }
        )

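
# Illustrative usage sketch only (not part of the original module); `app` is assumed
# to be a configured MinimalManagerApp. The q/qv pairs mirror the operators
# registered in _add_parsers above.
def _example_filter_usage(app: MinimalManagerApp, history_id: int):
    filters = HistoryContentsFilters(app)
    query_filters = ValueFilterQueryParams(
        q=["deleted-eq", "state-in", "name-contains"],
        qv=["False", "ok,error", "tutorial"],
    )
    # returns parsed ORM/function filters ready to pass to HistoryContentsManager.contents
    return filters.parse_query_filters_with_relations(query_filters, history_id)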