Warning
This document is for an in-development version of Galaxy. You can alternatively view this page in the latest release if it exists or view the top of the latest release's documentation.
Source code for galaxy.managers.history_contents
"""
Heterogeneous lists/contents are difficult to query properly since unions are
not easily made.
"""
import logging
from typing import (
Any,
Dict,
List,
)
from sqlalchemy import (
asc,
desc,
false,
func,
literal,
nullsfirst,
nullslast,
sql,
true,
)
from sqlalchemy.orm import (
joinedload,
undefer,
)
from galaxy import exceptions as glx_exceptions
from galaxy import model
from galaxy.managers import (
annotatable,
base,
deletable,
hdas,
hdcas,
taggable,
tools,
)
from galaxy.structured_app import MinimalManagerApp
from .base import (
parse_bool,
raise_filter_err,
Serializer,
)
log = logging.getLogger(__name__)
# into its own class to have its own filters, etc.
# TODO: but can't inherit from model manager (which assumes only one model)
class HistoryContentsManager(base.SortableManager):
    """
    Manager for the mixed contents of a history: datasets (``contained_class``)
    and dataset collections (``subcontainer_class``).

    Combined listings are built from a UNION ALL over the columns named in
    ``common_columns``; the full models are then loaded by id in follow-up
    queries and re-assembled in the order produced by the union.
    """

    root_container_class = model.History

    contained_class = model.HistoryDatasetAssociation
    contained_class_manager_class = hdas.HDAManager
    contained_class_type_name = "dataset"

    subcontainer_class = model.HistoryDatasetCollectionAssociation
    subcontainer_class_manager_class = hdcas.HDCAManager
    subcontainer_class_type_name = "dataset_collection"

    #: the columns which are common to both subcontainers and non-subcontainers.
    #  (Also the attributes that may be filtered or ordered_by)
    # NOTE: `_get_union_type` and `_get_union_id` read union rows by position (index 1 and 2),
    # so "history_content_type" and "id" must stay second and third in this tuple.
    common_columns = (
        "history_id",
        "history_content_type",
        "id",
        "type_id",
        "hid",
        # joining columns
        "extension",
        "dataset_id",
        "collection_id",
        "name",
        "state",
        "size",
        "deleted",
        "purged",
        "visible",
        "create_time",
        "update_time",
    )

    default_order_by = "hid"

    def __init__(self, app: MinimalManagerApp):
        """
        Store `app` and look up the managers for both content classes from it.
        """
        self.app = app
        self.contained_manager = app[self.contained_class_manager_class]
        self.subcontainer_manager = app[self.subcontainer_class_manager_class]

    # ---- interface
    def contained(self, container, filters=None, limit=None, offset=None, order_by=None, **kwargs):
        """
        Returns non-subcontainer objects within `container`.
        """
        filter_to_inside_container = self._get_filter_for_contained(container, self.contained_class)
        filters = base.munge_lists(filter_to_inside_container, filters)
        return self.contained_manager.list(filters=filters, limit=limit, offset=offset, order_by=order_by, **kwargs)

    def subcontainers(self, container, filters=None, limit=None, offset=None, order_by=None, **kwargs):
        """
        Returns only the containers within `container`.

        Note: `limit`, `offset` and `order_by` are accepted but not applied by
        this method (see the TODO below).
        """
        filter_to_inside_container = self._get_filter_for_contained(container, self.subcontainer_class)
        filters = base.munge_lists(filter_to_inside_container, filters)
        # `Query.filter` takes its criteria as positional arguments, not as a single list:
        # normalize whatever munge_lists produced into a sequence and unpack it.
        if not isinstance(filters, (list, tuple)):
            filters = [filters]
        # TODO: collections.DatasetCollectionManager doesn't have the list
        # return self.subcontainer_manager.list( filters=filters, limit=limit, offset=offset, order_by=order_by, **kwargs )
        return self._session().query(self.subcontainer_class).filter(*filters).all()

    def contents(self, container, filters=None, limit=None, offset=None, order_by=None, **kwargs):
        """
        Returns a list of both/all types of contents, filtered and in some order.
        """
        # TODO?: we could branch here based on 'if limit is None and offset is None' - to a simpler (non-union) query
        # for now, I'm just using this (even for non-limited/offset queries) to reduce code paths
        return self._union_of_contents(
            container, filters=filters, limit=limit, offset=offset, order_by=order_by, **kwargs
        )

    def contents_count(self, container, filters=None, limit=None, offset=None, order_by=None, **kwargs):
        """
        Returns a count of both/all types of contents, based on the given filters.

        Note: `limit` and `offset` are forwarded to the underlying query before counting.
        """
        return self.contents_query(
            container, filters=filters, limit=limit, offset=offset, order_by=order_by, **kwargs
        ).count()

    def contents_query(self, container, filters=None, limit=None, offset=None, order_by=None, **kwargs):
        """
        Returns the contents union query for subqueries, etc.
        """
        return self._union_of_contents_query(
            container, filters=filters, limit=limit, offset=offset, order_by=order_by, **kwargs
        )

    # order_by parsing - similar to FilterParser but not enough yet to warrant a class?
    def parse_order_by(self, order_by_string, default=None):
        """
        Return an ORM compatible order_by using the given string.

        `order_by_string` is one of `<attribute>`, `<attribute>-dsc` or `<attribute>-asc`;
        a bare attribute name sorts descending. Ordering by `size` places NULLs last when
        descending and first when ascending.

        If the string is not recognized and `default` is given, `default` is parsed instead.

        :raises RequestParameterInvalidException: if neither the string nor a default can be used.
        """
        available = ["create_time", "extension", "hid", "history_id", "name", "update_time", "size"]
        for attribute in available:
            attribute_dsc = f"{attribute}-dsc"
            attribute_asc = f"{attribute}-asc"
            if order_by_string in (attribute, attribute_dsc):
                order_by = desc(attribute)
                if attribute == "size":
                    return nullslast(order_by)
                return order_by
            if order_by_string == attribute_asc:
                order_by = asc(attribute)
                if attribute == "size":
                    return nullsfirst(order_by)
                return order_by

        if default:
            return self.parse_order_by(default)
        raise glx_exceptions.RequestParameterInvalidException(
            "Unknown order_by", order_by=order_by_string, available=available
        )

    # history specific methods
    def state_counts(self, history):
        """
        Return a dictionary containing the counts of all contents in each state
        keyed by the distinct states.

        Note: does not include deleted/hidden contents.
        """
        filters = [
            base.ModelFilterParser.parsed_filter("orm", sql.column("deleted") == false()),
            base.ModelFilterParser.parsed_filter("orm", sql.column("visible") == true()),
        ]
        contents_subquery = self._union_of_contents_query(history, filters=filters).subquery()
        statement = (
            sql.select([sql.column("state"), func.count("*")])
            .select_from(contents_subquery)
            .group_by(sql.column("state"))
        )
        # each row is a (state, count) pair
        counts = self.app.model.context.execute(statement).fetchall()
        return dict(counts)

    def active_counts(self, history):
        """
        Return a dictionary keyed with 'deleted', 'hidden', and 'active' with values
        for each representing the count of contents in each state.

        Note: counts for deleted and hidden overlap; In other words, a dataset that's
        both deleted and hidden will be added to both totals.
        """
        returned = dict(deleted=0, hidden=0, active=0)
        contents_subquery = self._union_of_contents_query(history).subquery()
        columns = [sql.column("deleted"), sql.column("visible"), func.count("*")]
        statement = (
            sql.select(columns).select_from(contents_subquery).group_by(sql.column("deleted"), sql.column("visible"))
        )
        groups = self.app.model.context.execute(statement).fetchall()
        for deleted, visible, count in groups:
            if deleted:
                returned["deleted"] += count
            if not visible:
                returned["hidden"] += count
            if not deleted and visible:
                returned["active"] += count
        return returned

    def map_datasets(self, history, fn, **kwargs):
        """
        Iterate over the datasets of a given history, recursing into collections, and
        calling fn on each dataset.

        Uses the same kwargs as `contents` above.
        """
        returned = []
        contents = self.contents(history, **kwargs)
        for content in contents:
            if isinstance(content, self.subcontainer_class):
                # collections return a list of results - flatten it into ours
                processed_list = self.subcontainer_manager.map_datasets(content, fn)
                returned.extend(processed_list)
            else:
                processed = fn(content)
                returned.append(processed)
        return returned

    # ---- private
    def _session(self):
        """Return the model context of the app, used here as the query session."""
        return self.app.model.context

    def _filter_to_contents_query(self, container, content_class, **kwargs):
        """Return a query for `content_class` rows belonging to `container` (`kwargs` are unused)."""
        # TODO: use list (or by_history etc.)
        container_filter = self._get_filter_for_contained(container, content_class)
        query = self._session().query(content_class).filter(container_filter)
        return query

    def _get_filter_for_contained(self, container, content_class):
        """Return the criterion restricting `content_class` to rows whose history is `container`."""
        return content_class.history == container

    def _union_of_contents(self, container, expand_models=True, **kwargs):
        """
        Returns a limited and offset list of both types of contents, filtered
        and in some order.

        When `expand_models` is false, the raw rows of the union query are
        returned instead of full models.
        """
        contents_results = self._union_of_contents_query(container, **kwargs).all()
        if not expand_models:
            return contents_results

        # partition ids into a map of { component_class names -> list of ids } from the above union query
        ids_by_type: Dict[str, List[int]] = {
            self.contained_class_type_name: [],
            self.subcontainer_class_type_name: [],
        }
        for result in contents_results:
            result_type = self._get_union_type(result)
            if result_type not in ids_by_type:
                raise TypeError(f"Unknown contents type: {result_type}")
            ids_by_type[result_type].append(self._get_union_id(result))

        # query 2 & 3: use the ids to query each component_class, returning an id->full component model map
        serialization_params = kwargs.get("serialization_params", None)
        models_by_type: Dict[str, Dict[int, Any]] = {
            self.contained_class_type_name: self._contained_id_map(ids_by_type[self.contained_class_type_name]),
            self.subcontainer_class_type_name: self._subcontainer_id_map(
                ids_by_type[self.subcontainer_class_type_name], serialization_params=serialization_params
            ),
        }

        # cycle back over the union query to create an ordered list of the objects returned in queries 2 & 3 above
        contents = []
        filters = kwargs.get("filters") or []
        # TODO: or as generator?
        for result in contents_results:
            result_type = self._get_union_type(result)
            contents_id = self._get_union_id(result)
            content = models_by_type[result_type][contents_id]
            # "function" type filters can only be evaluated against the loaded model
            if self.passes_filters(content, filters):
                contents.append(content)
        return contents

    @staticmethod
    def passes_filters(content, filters):
        """
        Return True if `content` satisfies every filter in `filters` whose
        `filter_type` is "function"; filters of any other type are ignored here.
        """
        return all(filter_fn.filter(content) for filter_fn in filters if filter_fn.filter_type == "function")

    def _union_of_contents_query(
        self, container, filters=None, limit=None, offset=None, order_by=None, user_id=None, **kwargs
    ):
        """
        Returns a query for a limited and offset list of both types of contents,
        filtered and in some order.

        If `container` is falsy, contents are instead restricted to histories
        whose `user_id` column equals `user_id`.
        """
        order_by = order_by if order_by is not None else self.default_order_by
        order_by = order_by if isinstance(order_by, (tuple, list)) else (order_by,)

        # TODO: 3 queries and 3 iterations over results - this is undoubtedly better solved in the actual SQL layer
        # via one common table for contents, Some Yonder Resplendent and Fanciful Join, or ORM functionality
        # Here's the (bizarre) strategy:
        #   1. create a union of common columns between contents classes - filter, order, and limit/offset this
        #   2. extract the ids returned from 1 for each class, query each content class by that id list
        #   3. use the results/order from 1 to recombine/merge the 2+ query result lists from 2, return that

        # note: I'm trying to keep these private functions as generic as possible in order to move them toward base later

        # query 1: create a union of common columns for which the component_classes can be filtered/limited
        contained_query = self._contents_common_query_for_contained(
            history_id=container.id if container else None, user_id=user_id
        )
        subcontainer_query = self._contents_common_query_for_subcontainer(
            history_id=container.id if container else None, user_id=user_id
        )

        filters = filters or []
        # Apply filters that are specific to a model
        for orm_filter in filters:
            if orm_filter.filter_type == "orm_function":
                contained_query = contained_query.filter(orm_filter.filter(self.contained_class))
                subcontainer_query = subcontainer_query.filter(orm_filter.filter(self.subcontainer_class))
            elif orm_filter.filter_type == "orm":
                contained_query = self._apply_orm_filter(contained_query, orm_filter.filter)
                subcontainer_query = self._apply_orm_filter(subcontainer_query, orm_filter.filter)

        contents_query = contained_query.union_all(subcontainer_query)
        contents_query = contents_query.order_by(*order_by)

        if limit is not None:
            contents_query = contents_query.limit(limit)
        if offset is not None:
            contents_query = contents_query.offset(offset)
        return contents_query

    def _apply_orm_filter(self, qry, orm_filter):
        """
        Apply a binary `orm_filter` to `qry` by re-targeting its left side onto
        each selected column of `qry` that has the same name.

        Expressions that are not a `BinaryExpression` leave `qry` unchanged.
        """
        if isinstance(orm_filter, sql.elements.BinaryExpression):
            for match in filter(lambda col: col["name"] == orm_filter.left.name, qry.column_descriptions):
                column = match["expr"]
                # clone so the caller's filter is not mutated (it is applied to both halves of the union)
                new_filter = orm_filter._clone()
                new_filter.left = column
                qry = qry.filter(new_filter)
        return qry

    def _contents_common_columns(self, component_class, **kwargs):
        """
        Return the labeled columns of `component_class` named in `common_columns`;
        any column passed by name in `kwargs` overrides the class attribute.
        """
        columns = []
        # pull column from class by name or override with kwargs if listed there, then label
        for column_name in self.common_columns:
            if column_name in kwargs:
                column = kwargs[column_name]
            elif column_name == "model_class":
                column = literal(component_class.__name__)
            else:
                column = getattr(component_class, column_name)
            column = column.label(column_name)
            columns.append(column)
        return columns

    def _contents_common_query_for_contained(self, history_id, user_id):
        """
        Return the common-columns query for the contained class, limited to
        `history_id` if truthy, otherwise to histories with the given `user_id`.
        """
        component_class = self.contained_class
        # TODO: and now a join with Dataset - this is getting sad
        columns = self._contents_common_columns(
            component_class,
            history_content_type=literal("dataset"),
            size=model.Dataset.file_size,
            state=model.Dataset.state,
            # do not have inner collections
            collection_id=literal(None),
        )
        subquery = self._session().query(*columns)
        # for the HDA's we need to join the Dataset since it has an actual state column
        subquery = subquery.join(model.Dataset, model.Dataset.id == component_class.table.c.dataset_id)
        if history_id:
            subquery = subquery.filter(component_class.table.c.history_id == history_id)
        else:
            # Make sure we only return items that are user-accessible by checking that they are in a history
            # owned by the current user.
            # TODO: move into filter mixin, and implement accessible logic as SQL query
            subquery = subquery.filter(
                component_class.table.c.history_id == model.History.table.c.id, model.History.table.c.user_id == user_id
            )
        return subquery

    def _contents_common_query_for_subcontainer(self, history_id, user_id):
        """
        Return the common-columns query for the subcontainer class, limited to
        `history_id` if truthy, otherwise to histories with the given `user_id`.
        """
        component_class = self.subcontainer_class
        columns = self._contents_common_columns(
            component_class,
            history_content_type=literal("dataset_collection"),
            # do not have datasets
            dataset_id=literal(None),
            size=literal(None),
            state=model.DatasetCollection.populated_state,
            # TODO: should be purgable? fix
            purged=literal(False),
            extension=literal(None),
        )
        subquery = self._session().query(*columns)
        # for the HDCA's we need to join the DatasetCollection since it has the populated_state
        subquery = subquery.join(model.DatasetCollection, model.DatasetCollection.id == component_class.collection_id)
        if history_id:
            subquery = subquery.filter(component_class.history_id == history_id)
        else:
            subquery = subquery.filter(
                component_class.history_id == model.History.table.c.id, model.History.table.c.user_id == user_id
            )
        return subquery

    def _get_union_type(self, union):
        """Return the string name of the class for this row in the union results"""
        # position 1 == "history_content_type" in common_columns
        return str(union[1])

    def _get_union_id(self, union):
        """Return the id for this row in the union results"""
        # position 2 == "id" in common_columns
        return union[2]

    def _contained_id_map(self, id_list):
        """Return an id to model map of all contained-type models in the id_list."""
        if not id_list:
            # keep the return type a mapping even when there is nothing to load
            return {}
        component_class = self.contained_class
        query = (
            self._session()
            .query(component_class)
            .filter(component_class.id.in_(id_list))
            .options(undefer(component_class._metadata))
            .options(joinedload("dataset.actions"))  # TODO: use class attr after moving Dataset to declarative mapping.
            .options(joinedload(component_class.tags))
            .options(joinedload(component_class.annotations))  # type: ignore[attr-defined]
        )
        return {row.id: row for row in query.all()}

    def _subcontainer_id_map(self, id_list, serialization_params=None):
        """Return an id to model map of all subcontainer-type models in the id_list."""
        if not id_list:
            # keep the return type a mapping even when there is nothing to load
            return {}
        component_class = self.subcontainer_class
        query = (
            self._session()
            .query(component_class)
            .filter(component_class.id.in_(id_list))
            .options(joinedload(component_class.collection))
            .options(joinedload(component_class.tags))
            .options(joinedload(component_class.annotations))
        )

        # This will conditionally join a potentially costly job_state summary
        # All the paranoia if-checking makes me wonder if serialization_params
        # should really be a property of the manager class instance
        # (`keys` is read as an attribute here, not called)
        if serialization_params and serialization_params.keys:
            if "job_state_summary" in serialization_params.keys:
                query = query.options(joinedload(component_class.job_state_summary))

        return {row.id: row for row in query.all()}
class HistoryContentsSerializer(base.ModelSerializer, deletable.PurgableSerializerMixin):
    """
    Interface/service object for serializing history contents into dictionaries.
    """

    model_manager_class = HistoryContentsManager

    def __init__(self, app: MinimalManagerApp, **kwargs):
        """
        Set up the serializer and register "summary" as the default view.
        """
        super().__init__(app, **kwargs)

        self.default_view = "summary"
        summary_keys = [
            "id",
            "type_id",
            "history_id",
            "hid",
            "history_content_type",
            "visible",
            "dataset_id",
            "collection_id",
            "name",
            "state",
            "deleted",
            "purged",
            "create_time",
            "update_time",
        ]
        self.add_view("summary", summary_keys)

    # assumes: outgoing to json.dumps and sanitized
    def add_serializers(self):
        """
        Register the serializers of the base classes, then the id-related ones
        specific to history contents.
        """
        super().add_serializers()
        deletable.PurgableSerializerMixin.add_serializers(self)

        id_serializers: Dict[str, Serializer] = dict(
            type_id=self.serialize_type_id,
            history_id=self.serialize_id,
            dataset_id=self.serialize_id_or_skip,
            collection_id=self.serialize_id_or_skip,
        )
        self.serializers.update(id_serializers)

    def serialize_id_or_skip(self, item: Any, key: str, **context):
        """Serialize the id stored under `key`, or raise SkipAttribute if `item` lacks that attribute."""
        if hasattr(item, key):
            return self.serialize_id(item, key, **context)
        raise base.SkipAttribute("no such attribute")
class HistoryContentsFilters(
    base.ModelFilterParser,
    annotatable.AnnotatableFilterMixin,
    deletable.PurgableFiltersMixin,
    taggable.TaggableFilterMixin,
    tools.ToolFilterMixin,
):
    """
    Filter parser for the union of history contents (datasets and dataset collections).
    """

    # surprisingly (but ominously), this works for both content classes in the union that's filtered
    model_class = model.HistoryDatasetAssociation

    def _parse_orm_filter(self, attr, op, val):
        """
        Parse an ORM filter for `attr`, `op` and `val`.

        `history_content_type`, `type_id`, `create_time`, `update_time` and
        `state` are special-cased into filters on bare (text) columns of the
        union; everything else is delegated to the parent class.
        """
        # we need to use some manual/text/column fu here since some where clauses on the union don't work
        # using the model_class defined above - they need to be wrapped in their own .column()
        # (and some of these are *not* a normal columns (especially 'state') anyway)
        # TODO: genericize these - can probably extract a _get_column( attr, ... ) or something
        # special cases...special cases everywhere

        def get_filter(attr, op, val):
            # returns a column expression for the special-cased attributes, None (implicitly) otherwise
            if attr == "history_content_type" and op == "eq":
                if val in ("dataset", "dataset_collection"):
                    return sql.column("history_content_type") == val
                raise_filter_err(attr, op, val, "bad op in filter")

            if attr == "type_id":
                if op == "eq":
                    # NOTE(review): unlike the `in` branch below, `val` is compared as given
                    # (it is not passed through `decode_type_id`) - confirm this is intended
                    return sql.column("type_id") == val
                if op == "in":
                    return sql.column("type_id").in_(self.parse_type_id_list(val))
                raise_filter_err(attr, op, val, "bad op in filter")

            if attr in ("update_time", "create_time"):
                if op == "ge":
                    return sql.column(attr) >= self.parse_date(val)
                if op == "le":
                    return sql.column(attr) <= self.parse_date(val)
                if op == "gt":
                    return sql.column(attr) > self.parse_date(val)
                if op == "lt":
                    return sql.column(attr) < self.parse_date(val)
                raise_filter_err(attr, op, val, "bad op in filter")

            if attr == "state":
                valid_states = model.Dataset.states.values()
                if op == "eq":
                    if val not in valid_states:
                        raise_filter_err(attr, op, val, "invalid state in filter")
                    return sql.column("state") == val
                if op == "in":
                    # comma separated list of states; empty entries are dropped
                    states = [s for s in val.split(",") if s]
                    for state in states:
                        if state not in valid_states:
                            raise_filter_err(attr, op, state, "invalid state in filter")
                    return sql.column("state").in_(states)
                raise_filter_err(attr, op, val, "bad op in filter")

        column_filter = get_filter(attr, op, val)
        if column_filter is not None:
            return self.parsed_filter(filter_type="orm", filter=column_filter)
        return super()._parse_orm_filter(attr, op, val)

    def decode_type_id(self, type_id):
        """
        Decode the id portion of a `<type>-<encoded id>` string and return
        `<type>-<decoded id>`.

        Only the first "-" is treated as the separator; a string without one
        raises an IndexError.
        """
        TYPE_ID_SEP = "-"
        split = type_id.split(TYPE_ID_SEP, 1)
        return TYPE_ID_SEP.join((split[0], str(self.app.security.decode_id(split[1]))))

    def parse_type_id_list(self, type_id_list_string, sep=","):
        """
        Split `type_id_list_string` at `sep` and decode each resulting type_id.
        """
        return [self.decode_type_id(type_id) for type_id in type_id_list_string.split(sep)]

    def _add_parsers(self):
        """
        Register the parsers of all parent classes/mixins, then the ORM filter
        parsers specific to history contents.
        """
        super()._add_parsers()
        annotatable.AnnotatableFilterMixin._add_parsers(self)
        deletable.PurgableFiltersMixin._add_parsers(self)
        taggable.TaggableFilterMixin._add_parsers(self)
        tools.ToolFilterMixin._add_parsers(self)
        self.orm_filter_parsers.update(
            {
                # NOTE: single-op entries need the trailing comma - ("eq") is just the string "eq"
                "history_content_type": {"op": ("eq",)},
                "type_id": {"op": ("eq", "in"), "val": self.parse_type_id_list},
                "hid": {"op": ("eq", "ge", "le", "gt", "lt"), "val": int},
                # TODO: needs a different val parser - but no way to add to the above
                # 'hid-in' : { 'op': ( 'in' ), 'val': self.parse_int_list },
                "name": {"op": ("eq", "contains", "like")},
                "state": {"op": ("eq", "in")},
                "visible": {"op": ("eq",), "val": parse_bool},
                "create_time": {"op": ("le", "ge", "lt", "gt"), "val": self.parse_date},
                "update_time": {"op": ("le", "ge", "lt", "gt"), "val": self.parse_date},
            }
        )