Warning

This document is for an old release of Galaxy. You can alternatively view this page in the latest release if it exists or view the top of the latest release's documentation.

Source code for galaxy.managers.hdcas

"""
Manager and Serializer for HDCAs.

HistoryDatasetCollectionAssociations (HDCAs) are datasets contained or created in a
history.
"""
import logging
from typing import Dict

from galaxy import model
from galaxy.managers import (
    annotatable,
    base,
    deletable,
    hdas,
    secured,
    taggable
)
from galaxy.managers.collections_util import get_hda_and_element_identifiers
from galaxy.structured_app import MinimalManagerApp
from galaxy.util.zipstream import ZipstreamWrapper


log = logging.getLogger(__name__)


[docs]def stream_dataset_collection(dataset_collection_instance, upstream_mod_zip=False, upstream_gzip=False): archive_name = f"{dataset_collection_instance.hid}: {dataset_collection_instance.name}" archive = ZipstreamWrapper( archive_name=archive_name, upstream_mod_zip=upstream_mod_zip, upstream_gzip=upstream_gzip, ) names, hdas = get_hda_and_element_identifiers(dataset_collection_instance) for name, hda in zip(names, hdas): if hda.state != hda.states.OK: continue for file_path, relpath in hda.datatype.to_archive(dataset=hda, name=name): archive.write(file_path, relpath) return archive
[docs]def set_collection_attributes(dataset_element, *payload): for attribute, value in payload: setattr(dataset_element, attribute[1], value[1])
# TODO: to DatasetCollectionInstanceManager
[docs]class HDCAManager( base.ModelManager, secured.AccessibleManagerMixin, secured.OwnableManagerMixin, deletable.PurgableManagerMixin, taggable.TaggableManagerMixin, annotatable.AnnotatableManagerMixin): """ Interface/service object for interacting with HDCAs. """ model_class = model.HistoryDatasetCollectionAssociation foreign_key_name = 'history_dataset_collection_association' tag_assoc = model.HistoryDatasetCollectionTagAssociation annotation_assoc = model.HistoryDatasetCollectionAssociationAnnotationAssociation
[docs] def map_datasets(self, content, fn, *parents): """ Iterate over the datasets of a given collection, recursing into collections, and calling fn on each dataset. Uses the same kwargs as `contents` above. """ returned = [] # lots of nesting going on within the nesting collection = content.collection if hasattr(content, 'collection') else content this_parents = (content, ) + parents for element in collection.elements: next_parents = (element, ) + this_parents if element.is_collection: processed_list = self.map_datasets(element.child_collection, fn, *next_parents) returned.extend(processed_list) else: processed = fn(element.dataset_instance, *next_parents) returned.append(processed) return returned
[docs] def update_attributes(self, content, payload: Dict): # pre-requisite checked that attributes are valid self.map_datasets(content, fn=lambda item, *args: set_collection_attributes(item, payload.items()))
# serializers # -----------------------------------------------------------------------------
[docs]class DCESerializer(base.ModelSerializer): """ Serializer for DatasetCollectionElements. """
[docs] def __init__(self, app: MinimalManagerApp): super().__init__(app) self.hda_serializer = hdas.HDASerializer(app) self.dc_serializer = DCSerializer(app, dce_serializer=self) self.default_view = 'summary' self.add_view('summary', [ 'id', 'model_class', 'element_index', 'element_identifier', 'element_type', 'object' ])
[docs] def add_serializers(self): super().add_serializers() self.serializers.update({ 'model_class': lambda *a, **c: 'DatasetCollectionElement', 'object': self.serialize_object })
[docs] def serialize_object(self, item, key, **context): if item.hda: return self.hda_serializer.serialize_to_view(item.hda, view='summary', **context) if item.child_collection: return self.dc_serializer.serialize_to_view(item.child_collection, view='detailed', **context) return 'object'
[docs]class DCSerializer(base.ModelSerializer): """ Serializer for DatasetCollections. """
[docs] def __init__(self, app: MinimalManagerApp, dce_serializer=None): super().__init__(app) self.dce_serializer = dce_serializer or DCESerializer(app) self.default_view = 'summary' self.add_view('summary', [ 'id', 'create_time', 'update_time', 'collection_type', 'populated_state', 'populated_state_message', 'element_count', ]) self.add_view('detailed', [ 'populated', 'elements', ], include_keys_from='summary')
[docs] def add_serializers(self): super().add_serializers() self.serializers.update({ 'model_class': lambda *a, **c: 'DatasetCollection', 'elements': self.serialize_elements, })
[docs] def serialize_elements(self, item, key, **context): returned = [] for element in item.elements: serialized = self.dce_serializer.serialize_to_view(element, view='summary', **context) returned.append(serialized) return returned
[docs]class DCASerializer(base.ModelSerializer): """ Base (abstract) Serializer class for HDCAs and LDCAs. """
[docs] def __init__(self, app: MinimalManagerApp, dce_serializer=None): super().__init__(app) self.dce_serializer = dce_serializer or DCESerializer(app) self.default_view = 'summary' self.add_view('summary', [ 'id', 'create_time', 'update_time', 'collection_type', 'populated_state', 'populated_state_message', 'element_count', ]) self.add_view('detailed', [ 'populated', 'elements', ], include_keys_from='summary')
[docs] def add_serializers(self): super().add_serializers() # most attributes are (kinda) proxied from DCs - we need a serializer to proxy to self.dc_serializer = DCSerializer(self.app) # then set the serializers to point to it for those attrs collection_keys = [ 'create_time', 'update_time', 'collection_type', 'populated', 'populated_state', 'populated_state_message', 'elements', 'element_count', ] for key in collection_keys: self.serializers[key] = self._proxy_to_dataset_collection(key=key)
def _proxy_to_dataset_collection(self, serializer=None, key=None): # dataset_collection associations are (rough) proxies to datasets - access their serializer using this remapping fn # remapping done by either kwarg key: IOW dataset attr key (e.g. populated_state) # or by kwarg serializer: a function that's passed in (e.g. elements) if key: return lambda i, k, **c: self.dc_serializer.serialize(i.collection, [k], **c)[k] if serializer: return lambda i, k, **c: serializer(i.collection, key or k, **c) raise TypeError('kwarg serializer or key needed')
[docs]class HDCASerializer( DCASerializer, taggable.TaggableSerializerMixin, annotatable.AnnotatableSerializerMixin): """ Serializer for HistoryDatasetCollectionAssociations. """
[docs] def __init__(self, app: MinimalManagerApp): super().__init__(app) self.hdca_manager = HDCAManager(app) self.default_view = 'summary' self.add_view('summary', [ 'id', 'type_id', 'name', 'history_id', 'hid', 'history_content_type', 'collection_type', 'populated_state', 'populated_state_message', 'element_count', 'job_source_id', 'job_source_type', 'name', 'type_id', 'deleted', # 'purged', 'visible', 'type', 'url', 'create_time', 'update_time', 'tags', # TODO: detail view only (maybe), 'contents_url' ]) self.add_view('detailed', [ 'populated', 'elements' ], include_keys_from='summary') # fields for new beta web client, there is no summary/detailed split any more self.add_view('betawebclient', [ # common to hda 'create_time', 'deleted', 'hid', 'history_content_type', 'history_id', 'id', 'name', 'tags', 'type', 'type_id', 'update_time', 'url', 'visible', # hdca only 'collection_id', 'collection_type', 'contents_url', 'element_count', 'job_source_id', 'job_source_type', 'job_state_summary', 'populated', 'populated_state', 'populated_state_message', ])
[docs] def add_serializers(self): super().add_serializers() taggable.TaggableSerializerMixin.add_serializers(self) annotatable.AnnotatableSerializerMixin.add_serializers(self) self.serializers.update({ 'model_class': lambda *a, **c: self.hdca_manager.model_class.__class__.__name__, # TODO: remove 'type': lambda *a, **c: 'collection', # part of a history and container 'history_id': self.serialize_id, 'history_content_type': lambda *a, **c: self.hdca_manager.model_class.content_type, 'type_id': self.serialize_type_id, 'job_source_id': self.serialize_id, 'url': lambda i, k, **c: self.url_for('history_content_typed', history_id=self.app.security.encode_id(i.history_id), id=self.app.security.encode_id(i.id), type=self.hdca_manager.model_class.content_type), 'contents_url': self.generate_contents_url, 'job_state_summary': self.serialize_job_state_summary })
[docs] def generate_contents_url(self, hdca, key, **context): encode_id = self.app.security.encode_id contents_url = self.url_for('contents_dataset_collection', hdca_id=encode_id(hdca.id), parent_id=encode_id(hdca.collection_id)) return contents_url
[docs] def serialize_job_state_summary(self, hdca, key, **context): states = hdca.job_state_summary.__dict__.copy() del states['_sa_instance_state'] del states['hdca_id'] return states