Source code for galaxy.managers.hdcas

"""
Manager and Serializer for HDCAs.

HistoryDatasetCollectionAssociations (HDCAs) are datasets contained or created in a
history.
"""

import logging
from typing import Dict

from galaxy import model
from galaxy.exceptions import RequestParameterInvalidException
from galaxy.managers import (
    annotatable,
    base,
    deletable,
    hdas,
    secured,
    taggable,
)
from galaxy.managers.collections_util import get_hda_and_element_identifiers
from galaxy.model.tags import GalaxyTagHandler
from galaxy.structured_app import (
    MinimalManagerApp,
    StructuredApp,
)
from galaxy.util.zipstream import ZipstreamWrapper

log = logging.getLogger(__name__)


[docs]def stream_dataset_collection(dataset_collection_instance, upstream_mod_zip=False, upstream_gzip=False): archive_name = f"{dataset_collection_instance.hid}: {dataset_collection_instance.name}" archive = ZipstreamWrapper( archive_name=archive_name, upstream_mod_zip=upstream_mod_zip, upstream_gzip=upstream_gzip, ) write_dataset_collection(dataset_collection_instance, archive) return archive
[docs]def write_dataset_collection(dataset_collection_instance, archive): if not dataset_collection_instance.collection.populated_optimized: raise RequestParameterInvalidException("Attempt to write dataset collection that has not been populated yet") names, hdas = get_hda_and_element_identifiers(dataset_collection_instance) for name, hda in zip(names, hdas): if hda.state != hda.states.OK or hda.purged or hda.dataset.purged: continue for file_path, relpath in hda.datatype.to_archive(dataset=hda, name=name): archive.write(file_path, relpath) return archive
[docs]def set_collection_attributes(dataset_element, *payload): for attribute, value in payload: setattr(dataset_element, attribute[1], value[1])
# TODO: to DatasetCollectionInstanceManager
[docs]class HDCAManager( base.ModelManager, secured.AccessibleManagerMixin, secured.OwnableManagerMixin, deletable.PurgableManagerMixin, annotatable.AnnotatableManagerMixin, ): """ Interface/service object for interacting with HDCAs. """ model_class = model.HistoryDatasetCollectionAssociation foreign_key_name = "history_dataset_collection_association" tag_assoc = model.HistoryDatasetCollectionTagAssociation annotation_assoc = model.HistoryDatasetCollectionAssociationAnnotationAssociation
[docs] def __init__(self, app: MinimalManagerApp): """ Set up and initialize other managers needed by hdas. """ super().__init__(app) self.tag_handler = app[GalaxyTagHandler]
[docs] def map_datasets(self, content, fn, *parents): """ Iterate over the datasets of a given collection, recursing into collections, and calling fn on each dataset. Uses the same kwargs as `contents` above. """ returned = [] # lots of nesting going on within the nesting collection = content.collection if hasattr(content, "collection") else content this_parents = (content,) + parents for element in collection.elements: next_parents = (element,) + this_parents if element.is_collection: processed_list = self.map_datasets(element.child_collection, fn, *next_parents) returned.extend(processed_list) else: processed = fn(element.dataset_instance, *next_parents) returned.append(processed) return returned
[docs] def update_attributes(self, content, payload: Dict): # pre-requisite checked that attributes are valid self.map_datasets(content, fn=lambda item, *args: set_collection_attributes(item, payload.items()))
# serializers # -----------------------------------------------------------------------------
[docs]class DCESerializer(base.ModelSerializer): """ Serializer for DatasetCollectionElements. """
[docs] def __init__(self, app: StructuredApp): super().__init__(app) self.hda_serializer = hdas.HDASerializer(app) self.dc_serializer = DCSerializer(app, dce_serializer=self) self.default_view = "summary" self.add_view("summary", ["id", "model_class", "element_index", "element_identifier", "element_type", "object"])
[docs] def add_serializers(self): super().add_serializers() self.serializers.update( {"model_class": lambda *a, **c: "DatasetCollectionElement", "object": self.serialize_object} )
[docs] def serialize_object(self, item, key, **context): if item.hda: return self.hda_serializer.serialize_to_view(item.hda, view="summary", **context) if item.child_collection: return self.dc_serializer.serialize_to_view(item.child_collection, view="detailed", **context) return "object"
[docs]class DCSerializer(base.ModelSerializer): """ Serializer for DatasetCollections. """
[docs] def __init__(self, app: StructuredApp, dce_serializer=None): super().__init__(app) self.dce_serializer = dce_serializer or DCESerializer(app) self.default_view = "summary" self.add_view( "summary", [ "id", "create_time", "update_time", "collection_type", "populated_state", "populated_state_message", "element_count", ], ) self.add_view( "detailed", [ "populated", "elements", ], include_keys_from="summary", )
[docs] def add_serializers(self): super().add_serializers() self.serializers.update( { "model_class": lambda *a, **c: "DatasetCollection", "elements": self.serialize_elements, } )
[docs] def serialize_elements(self, item, key, **context): returned = [] for element in item.elements: serialized = self.dce_serializer.serialize_to_view(element, view="summary", **context) returned.append(serialized) return returned
[docs]class DCASerializer(base.ModelSerializer): """ Base (abstract) Serializer class for HDCAs and LDCAs. """ app: StructuredApp
[docs] def __init__(self, app: StructuredApp, dce_serializer=None): super().__init__(app) self.dce_serializer = dce_serializer or DCESerializer(app) self.default_view = "summary" self.add_view( "summary", [ "id", "create_time", "update_time", "collection_type", "populated_state", "populated_state_message", "element_count", ], ) self.add_view( "detailed", [ "populated", "elements", ], include_keys_from="summary", )
[docs] def add_serializers(self): super().add_serializers() # most attributes are (kinda) proxied from DCs - we need a serializer to proxy to self.dc_serializer = DCSerializer(self.app) # then set the serializers to point to it for those attrs collection_keys = [ "create_time", "update_time", "collection_type", "populated", "populated_state", "populated_state_message", "elements", "element_count", ] for key in collection_keys: self.serializers[key] = self._proxy_to_dataset_collection(key=key)
def _proxy_to_dataset_collection(self, serializer=None, key=None): # dataset_collection associations are (rough) proxies to datasets - access their serializer using this remapping fn # remapping done by either kwarg key: IOW dataset attr key (e.g. populated_state) # or by kwarg serializer: a function that's passed in (e.g. elements) if key: return lambda i, k, **c: self.dc_serializer.serialize(i.collection, [k], **c)[k] if serializer: return lambda i, k, **c: serializer(i.collection, key or k, **c) raise TypeError("kwarg serializer or key needed")
[docs]class HDCASerializer(DCASerializer, taggable.TaggableSerializerMixin, annotatable.AnnotatableSerializerMixin): """ Serializer for HistoryDatasetCollectionAssociations. """
[docs] def __init__(self, app: StructuredApp): super().__init__(app) self.hdca_manager = HDCAManager(app) self.default_view = "summary" self.add_view( "summary", [ "id", "type_id", "name", "history_id", "collection_id", "hid", "history_content_type", "collection_type", "populated_state", "populated_state_message", "element_count", "job_source_id", "job_source_type", "job_state_summary", "name", "deleted", "visible", "type", "url", "create_time", "update_time", "tags", "contents_url", ], ) self.add_view( "detailed", [ "populated", "elements", "elements_datatypes", ], include_keys_from="summary", )
[docs] def add_serializers(self): super().add_serializers() taggable.TaggableSerializerMixin.add_serializers(self) annotatable.AnnotatableSerializerMixin.add_serializers(self) serializers: Dict[str, base.Serializer] = { "model_class": lambda item, key, **context: self.hdca_manager.model_class.__class__.__name__, # TODO: remove "type": lambda item, key, **context: "collection", # part of a history and container "history_id": self.serialize_id, "history_content_type": lambda item, key, **context: self.hdca_manager.model_class.content_type, "type_id": self.serialize_type_id, "job_source_id": self.serialize_id, "url": lambda item, key, **context: self.url_for( "history_content_typed", history_id=self.app.security.encode_id(item.history_id), id=self.app.security.encode_id(item.id), type=self.hdca_manager.model_class.content_type, context=context, ), "contents_url": self.generate_contents_url, "job_state_summary": self.serialize_job_state_summary, "elements_datatypes": self.serialize_elements_datatypes, "collection_id": self.serialize_id, } self.serializers.update(serializers)
[docs] def generate_contents_url(self, item, key, **context): encode_id = self.app.security.encode_id trans = context.get("trans") url_for = trans.url_builder if trans and trans.url_builder else self.url_for contents_url = url_for( "contents_dataset_collection", hdca_id=encode_id(item.id), parent_id=encode_id(item.collection_id) ) return contents_url
[docs] def serialize_job_state_summary(self, item, key, **context): return item.job_state_summary_dict
[docs] def serialize_elements_datatypes(self, item, key, **context): extensions_set = item.dataset_dbkeys_and_extensions_summary[1] return list(extensions_set)