Warning
This document is for an in-development version of Galaxy. You can alternatively view this page in the latest release if it exists or view the top of the latest release's documentation.
Source code for galaxy.managers.hdcas
"""
Manager and Serializer for HDCAs.
HistoryDatasetCollectionAssociations (HDCAs) are datasets contained or created in a
history.
"""
import logging
from typing import Dict
from galaxy import model
from galaxy.exceptions import RequestParameterInvalidException
from galaxy.managers import (
annotatable,
base,
deletable,
hdas,
secured,
taggable,
)
from galaxy.managers.collections_util import get_hda_and_element_identifiers
from galaxy.model.tags import GalaxyTagHandler
from galaxy.structured_app import (
MinimalManagerApp,
StructuredApp,
)
from galaxy.util.zipstream import ZipstreamWrapper
log = logging.getLogger(__name__)
[docs]def stream_dataset_collection(dataset_collection_instance, upstream_mod_zip=False, upstream_gzip=False):
archive_name = f"{dataset_collection_instance.hid}: {dataset_collection_instance.name}"
archive = ZipstreamWrapper(
archive_name=archive_name,
upstream_mod_zip=upstream_mod_zip,
upstream_gzip=upstream_gzip,
)
write_dataset_collection(dataset_collection_instance, archive)
return archive
[docs]def write_dataset_collection(dataset_collection_instance, archive):
if not dataset_collection_instance.collection.populated_optimized:
raise RequestParameterInvalidException("Attempt to write dataset collection that has not been populated yet")
names, hdas = get_hda_and_element_identifiers(dataset_collection_instance)
for name, hda in zip(names, hdas):
if hda.state != hda.states.OK or hda.purged or hda.dataset.purged:
continue
for file_path, relpath in hda.datatype.to_archive(dataset=hda, name=name):
archive.write(file_path, relpath)
return archive
[docs]def set_collection_attributes(dataset_element, *payload):
for attribute, value in payload:
setattr(dataset_element, attribute[1], value[1])
# TODO: to DatasetCollectionInstanceManager
[docs]class HDCAManager(
base.ModelManager,
secured.AccessibleManagerMixin,
secured.OwnableManagerMixin,
deletable.PurgableManagerMixin,
annotatable.AnnotatableManagerMixin,
):
"""
Interface/service object for interacting with HDCAs.
"""
model_class = model.HistoryDatasetCollectionAssociation
foreign_key_name = "history_dataset_collection_association"
tag_assoc = model.HistoryDatasetCollectionTagAssociation
annotation_assoc = model.HistoryDatasetCollectionAssociationAnnotationAssociation
[docs] def __init__(self, app: MinimalManagerApp):
"""
Set up and initialize other managers needed by hdas.
"""
super().__init__(app)
self.tag_handler = app[GalaxyTagHandler]
[docs] def map_datasets(self, content, fn, *parents):
"""
Iterate over the datasets of a given collection, recursing into collections, and
calling fn on each dataset.
Uses the same kwargs as `contents` above.
"""
returned = []
# lots of nesting going on within the nesting
collection = content.collection if hasattr(content, "collection") else content
this_parents = (content,) + parents
for element in collection.elements:
next_parents = (element,) + this_parents
if element.is_collection:
processed_list = self.map_datasets(element.child_collection, fn, *next_parents)
returned.extend(processed_list)
else:
processed = fn(element.dataset_instance, *next_parents)
returned.append(processed)
return returned
[docs] def update_attributes(self, content, payload: Dict):
# pre-requisite checked that attributes are valid
self.map_datasets(content, fn=lambda item, *args: set_collection_attributes(item, payload.items()))
# serializers
# -----------------------------------------------------------------------------
[docs]class DCESerializer(base.ModelSerializer):
"""
Serializer for DatasetCollectionElements.
"""
[docs] def __init__(self, app: StructuredApp):
super().__init__(app)
self.hda_serializer = hdas.HDASerializer(app)
self.dc_serializer = DCSerializer(app, dce_serializer=self)
self.default_view = "summary"
self.add_view("summary", ["id", "model_class", "element_index", "element_identifier", "element_type", "object"])
[docs] def add_serializers(self):
super().add_serializers()
self.serializers.update(
{"model_class": lambda *a, **c: "DatasetCollectionElement", "object": self.serialize_object}
)
[docs] def serialize_object(self, item, key, **context):
if item.hda:
return self.hda_serializer.serialize_to_view(item.hda, view="summary", **context)
if item.child_collection:
return self.dc_serializer.serialize_to_view(item.child_collection, view="detailed", **context)
return "object"
[docs]class DCSerializer(base.ModelSerializer):
"""
Serializer for DatasetCollections.
"""
[docs] def __init__(self, app: StructuredApp, dce_serializer=None):
super().__init__(app)
self.dce_serializer = dce_serializer or DCESerializer(app)
self.default_view = "summary"
self.add_view(
"summary",
[
"id",
"create_time",
"update_time",
"collection_type",
"populated_state",
"populated_state_message",
"element_count",
],
)
self.add_view(
"detailed",
[
"populated",
"elements",
],
include_keys_from="summary",
)
[docs] def add_serializers(self):
super().add_serializers()
self.serializers.update(
{
"model_class": lambda *a, **c: "DatasetCollection",
"elements": self.serialize_elements,
}
)
[docs] def serialize_elements(self, item, key, **context):
returned = []
for element in item.elements:
serialized = self.dce_serializer.serialize_to_view(element, view="summary", **context)
returned.append(serialized)
return returned
[docs]class DCASerializer(base.ModelSerializer):
"""
Base (abstract) Serializer class for HDCAs and LDCAs.
"""
app: StructuredApp
[docs] def __init__(self, app: StructuredApp, dce_serializer=None):
super().__init__(app)
self.dce_serializer = dce_serializer or DCESerializer(app)
self.default_view = "summary"
self.add_view(
"summary",
[
"id",
"create_time",
"update_time",
"collection_type",
"populated_state",
"populated_state_message",
"element_count",
],
)
self.add_view(
"detailed",
[
"populated",
"elements",
],
include_keys_from="summary",
)
[docs] def add_serializers(self):
super().add_serializers()
# most attributes are (kinda) proxied from DCs - we need a serializer to proxy to
self.dc_serializer = DCSerializer(self.app)
# then set the serializers to point to it for those attrs
collection_keys = [
"create_time",
"update_time",
"collection_type",
"populated",
"populated_state",
"populated_state_message",
"elements",
"element_count",
]
for key in collection_keys:
self.serializers[key] = self._proxy_to_dataset_collection(key=key)
def _proxy_to_dataset_collection(self, serializer=None, key=None):
# dataset_collection associations are (rough) proxies to datasets - access their serializer using this remapping fn
# remapping done by either kwarg key: IOW dataset attr key (e.g. populated_state)
# or by kwarg serializer: a function that's passed in (e.g. elements)
if key:
return lambda i, k, **c: self.dc_serializer.serialize(i.collection, [k], **c)[k]
if serializer:
return lambda i, k, **c: serializer(i.collection, key or k, **c)
raise TypeError("kwarg serializer or key needed")
[docs]class HDCASerializer(DCASerializer, taggable.TaggableSerializerMixin, annotatable.AnnotatableSerializerMixin):
"""
Serializer for HistoryDatasetCollectionAssociations.
"""
[docs] def __init__(self, app: StructuredApp):
super().__init__(app)
self.hdca_manager = HDCAManager(app)
self.default_view = "summary"
self.add_view(
"summary",
[
"id",
"type_id",
"name",
"history_id",
"collection_id",
"hid",
"history_content_type",
"collection_type",
"populated_state",
"populated_state_message",
"element_count",
"job_source_id",
"job_source_type",
"job_state_summary",
"name",
"deleted",
"visible",
"type",
"url",
"create_time",
"update_time",
"tags",
"contents_url",
],
)
self.add_view(
"detailed",
[
"populated",
"elements",
"elements_datatypes",
],
include_keys_from="summary",
)
[docs] def add_serializers(self):
super().add_serializers()
taggable.TaggableSerializerMixin.add_serializers(self)
annotatable.AnnotatableSerializerMixin.add_serializers(self)
serializers: Dict[str, base.Serializer] = {
"model_class": lambda item, key, **context: self.hdca_manager.model_class.__class__.__name__,
# TODO: remove
"type": lambda item, key, **context: "collection",
# part of a history and container
"history_id": self.serialize_id,
"history_content_type": lambda item, key, **context: self.hdca_manager.model_class.content_type,
"type_id": self.serialize_type_id,
"job_source_id": self.serialize_id,
"url": lambda item, key, **context: self.url_for(
"history_content_typed",
history_id=self.app.security.encode_id(item.history_id),
id=self.app.security.encode_id(item.id),
type=self.hdca_manager.model_class.content_type,
context=context,
),
"contents_url": self.generate_contents_url,
"job_state_summary": self.serialize_job_state_summary,
"elements_datatypes": self.serialize_elements_datatypes,
"collection_id": self.serialize_id,
}
self.serializers.update(serializers)
[docs] def generate_contents_url(self, item, key, **context):
encode_id = self.app.security.encode_id
trans = context.get("trans")
url_for = trans.url_builder if trans and trans.url_builder else self.url_for
contents_url = url_for(
"contents_dataset_collection", hdca_id=encode_id(item.id), parent_id=encode_id(item.collection_id)
)
return contents_url
[docs] def serialize_job_state_summary(self, item, key, **context):
return item.job_state_summary_dict
[docs] def serialize_elements_datatypes(self, item, key, **context):
extensions_set = item.dataset_dbkeys_and_extensions_summary[1]
return list(extensions_set)