Warning
This document is for an old release of Galaxy. You can alternatively view this page in the latest release if it exists or view the top of the latest release's documentation.
Source code for galaxy.managers.hdas
"""
Manager and Serializer for HDAs.
HistoryDatasetAssociations (HDAs) are datasets contained or created in a
history.
"""
import gettext
import logging
import os
from galaxy import (
datatypes,
exceptions,
model
)
from galaxy.managers import (
annotatable,
datasets,
secured,
taggable,
users
)
log = logging.getLogger(__name__)
[docs]class HDAManager(datasets.DatasetAssociationManager,
secured.OwnableManagerMixin,
taggable.TaggableManagerMixin,
annotatable.AnnotatableManagerMixin):
"""
Interface/service object for interacting with HDAs.
"""
model_class = model.HistoryDatasetAssociation
foreign_key_name = 'history_dataset_association'
tag_assoc = model.HistoryDatasetAssociationTagAssociation
annotation_assoc = model.HistoryDatasetAssociationAnnotationAssociation
# TODO: move what makes sense into DatasetManager
# TODO: which of these are common with LDDAs and can be pushed down into DatasetAssociationManager?
[docs] def __init__(self, app):
"""
Set up and initialize other managers needed by hdas.
"""
super().__init__(app)
self.user_manager = users.UserManager(app)
[docs] def get_owned_ids(self, object_ids, history=None):
"""Get owned IDs.
"""
filters = [self.model_class.id.in_(object_ids), self.model_class.history_id == history.id]
return self.list(filters=filters)
# .... security and permissions
[docs] def is_accessible(self, hda, user, **kwargs):
"""
Override to allow owners (those that own the associated history).
"""
# this, apparently, is not True:
# if I have a copy of a dataset and anyone who manages permissions on it revokes my access
# I can not access that dataset even if it's in my history
# if self.is_owner( hda, user, **kwargs ):
# return True
return super().is_accessible(hda, user, **kwargs)
[docs] def is_owner(self, hda, user, current_history=None, **kwargs):
"""
Use history to see if current user owns HDA.
"""
if self.user_manager.is_admin(user, trans=kwargs.get("trans", None)):
return True
history = hda.history
if history is None:
raise HistoryDatasetAssociationNoHistoryException
# allow anonymous user to access current history
# TODO: some dup here with historyManager.is_owner but prevents circ import
# TODO: awkward kwarg (which is my new band name); this may not belong here - move to controller?
if self.user_manager.is_anonymous(user):
if current_history and history == current_history:
return True
return False
return history.user == user
# .... create and copy
[docs] def create(self, history=None, dataset=None, flush=True, **kwargs):
"""
Create a new hda optionally passing in it's history and dataset.
..note: to explicitly set hid to `None` you must pass in `hid=None`, otherwise
it will be automatically set.
"""
if not dataset:
kwargs['create_dataset'] = True
hda = model.HistoryDatasetAssociation(history=history, dataset=dataset,
sa_session=self.app.model.context, **kwargs)
if history:
history.add_dataset(hda, set_hid=('hid' not in kwargs))
# TODO:?? some internal sanity check here (or maybe in add_dataset) to make sure hids are not duped?
self.session().add(hda)
if flush:
self.session().flush()
return hda
[docs] def copy(self, hda, history=None, hide_copy=False, **kwargs):
"""
Copy hda, including annotation and tags, add to history and return the given HDA.
"""
copy = hda.copy(parent_id=kwargs.get('parent_id'), copy_hid=False)
if hide_copy:
copy.visible = False
# add_dataset will update the hid to the next avail. in history
if history:
history.add_dataset(copy)
copy.copied_from_history_dataset_association = hda
copy.set_size()
original_annotation = self.annotation(hda)
self.annotate(copy, original_annotation, user=hda.history.user)
# these use a session flush
original_tags = self.get_tags(hda)
self.set_tags(copy, original_tags, user=hda.history.user)
return copy
[docs] def copy_ldda(self, history, ldda, **kwargs):
"""
Copy this HDA as a LDDA and return.
"""
return ldda.to_history_dataset_association(history, add_to_history=True)
# .... deletion and purging
[docs] def purge(self, hda, flush=True):
"""
Purge this HDA and the dataset underlying it.
"""
user = hda.history.user or None
quota_amount_reduction = 0
if user:
quota_amount_reduction = hda.quota_amount(user)
super().purge(hda, flush=flush)
# decrease the user's space used
if quota_amount_reduction:
user.adjust_total_disk_usage(-quota_amount_reduction)
return hda
# .... states
[docs] def error_if_uploading(self, hda):
"""
Raise error if HDA is still uploading.
"""
# TODO: may be better added to an overridden get_accessible
if hda.state == model.Dataset.states.UPLOAD:
raise exceptions.Conflict("Please wait until this dataset finishes uploading")
return hda
[docs] def has_been_resubmitted(self, hda):
"""
Return True if the hda's job was resubmitted at any point.
"""
job_states = model.Job.states
query = (self._job_state_history_query(hda)
.filter(model.JobStateHistory.state == job_states.RESUBMITTED))
return self.app.model.context.query(query.exists()).scalar()
def _job_state_history_query(self, hda):
"""
Return a query of the job's state history for the job that created this hda.
"""
session = self.app.model.context
JobToOutputDatasetAssociation = model.JobToOutputDatasetAssociation
JobStateHistory = model.JobStateHistory
# TODO: this does not play well with copied hdas
# NOTE: don't eagerload (JODA will load the hda were using!)
hda_id = hda.id
query = (session.query(JobToOutputDatasetAssociation, JobStateHistory)
.filter(JobToOutputDatasetAssociation.dataset_id == hda_id)
.filter(JobStateHistory.job_id == JobToOutputDatasetAssociation.job_id)
.enable_eagerloads(False))
return query
[docs] def data_conversion_status(self, hda):
"""
Returns a message if an hda is not ready to be used in visualization.
"""
# this is a weird syntax and return val
if not hda:
return self.model_class.conversion_messages.NO_DATA
if hda.state == model.Job.states.ERROR:
return self.model_class.conversion_messages.ERROR
if hda.state != model.Job.states.OK:
return self.model_class.conversion_messages.PENDING
return None
# .... data
# TODO: to data provider or Text datatype directly
[docs] def text_data(self, hda, preview=True):
"""
Get data from text file, truncating if necessary.
"""
# 1 MB
MAX_PEEK_SIZE = 1000000
truncated = False
hda_data = None
# For now, cannot get data from non-text datasets.
if not isinstance(hda.datatype, datatypes.data.Text):
return truncated, hda_data
if not os.path.exists(hda.file_name):
return truncated, hda_data
truncated = preview and os.stat(hda.file_name).st_size > MAX_PEEK_SIZE
hda_data = open(hda.file_name).read(MAX_PEEK_SIZE)
return truncated, hda_data
# .... annotatable
[docs] def annotation(self, hda):
# override to scope to history owner
return self._user_annotation(hda, hda.history.user)
def _set_permissions(self, trans, hda, role_ids_dict):
# The user associated the DATASET_ACCESS permission on the dataset with 1 or more roles. We
# need to ensure that they did not associate roles that would cause accessibility problems.
security_agent = trans.app.security_agent
permissions, in_roles, error, message = \
security_agent.derive_roles_from_access(trans, hda.dataset.id, 'root', **role_ids_dict)
if error:
# Keep the original role associations for the DATASET_ACCESS permission on the dataset.
access_action = security_agent.get_action(security_agent.permitted_actions.DATASET_ACCESS.action)
permissions[access_action] = hda.dataset.get_access_roles(security_agent)
trans.sa_session.refresh(hda.dataset)
raise exceptions.RequestParameterInvalidException(message)
else:
error = security_agent.set_all_dataset_permissions(hda.dataset, permissions)
trans.sa_session.refresh(hda.dataset)
if error:
raise exceptions.RequestParameterInvalidException(error)
[docs]class HDASerializer( # datasets._UnflattenedMetadataDatasetAssociationSerializer,
datasets.DatasetAssociationSerializer,
taggable.TaggableSerializerMixin,
annotatable.AnnotatableSerializerMixin):
model_manager_class = HDAManager
[docs] def __init__(self, app):
super().__init__(app)
self.hda_manager = self.manager
self.default_view = 'summary'
self.add_view('summary', [
'id',
'type_id',
'name',
'history_id',
'hid',
'history_content_type',
'dataset_id',
'state',
'extension',
'deleted', 'purged', 'visible',
'tags',
'type',
'url',
'create_time',
'update_time',
])
self.add_view('detailed', [
'model_class',
'history_id', 'hid',
# why include if model_class is there?
'hda_ldda',
# TODO: accessible needs to go away
'accessible',
# remapped
'genome_build', 'misc_info', 'misc_blurb',
'file_ext', 'file_size',
'resubmitted',
'metadata', 'meta_files', 'data_type',
'peek',
'creating_job',
'rerunnable',
'uuid',
'permissions',
'file_name',
'display_apps',
'display_types',
'visualizations',
'validated_state',
'validated_state_message',
# 'url',
'download_url',
'annotation',
'api_type',
'created_from_basename',
], include_keys_from='summary')
self.add_view('extended', [
'tool_version', 'parent_id', 'designation',
], include_keys_from='detailed')
# keyset returned to create show a dataset where the owner has no access
self.add_view('inaccessible', [
'accessible',
'id', 'name', 'history_id', 'hid', 'history_content_type',
'state', 'deleted', 'visible'
])
# fields for new beta web client, there is no summary/detailed split any more
self.add_view('betawebclient', [
# common to hdca
'create_time',
'deleted',
'hid',
'history_content_type',
'history_id',
'id',
'name',
'tags',
'type',
'type_id',
'update_time',
'url',
'visible',
# dataset only
'accessible',
'api_type',
'annotation',
'created_from_basename',
'creating_job',
'dataset_id',
'data_type',
'display_apps',
'display_types',
'download_url',
'extension',
'file_ext',
'file_name',
'file_size',
'genome_build',
'hda_ldda',
'meta_files',
'misc_blurb',
'misc_info',
'model_class',
'peek',
'purged',
'rerunnable',
'resubmitted',
'state',
'uuid',
'validated_state',
'validated_state_message',
])
[docs] def add_serializers(self):
super().add_serializers()
taggable.TaggableSerializerMixin.add_serializers(self)
annotatable.AnnotatableSerializerMixin.add_serializers(self)
self.serializers.update({
'model_class' : lambda *a, **c: 'HistoryDatasetAssociation',
'history_content_type': lambda *a, **c: 'dataset',
'hda_ldda' : lambda *a, **c: 'hda',
'type_id' : self.serialize_type_id,
'history_id' : self.serialize_id,
# remapped
'misc_info' : self._remap_from('info'),
'misc_blurb' : self._remap_from('blurb'),
'file_ext' : self._remap_from('extension'),
'file_path' : self._remap_from('file_name'),
'resubmitted' : lambda i, k, **c: self.hda_manager.has_been_resubmitted(i),
'display_apps' : self.serialize_display_apps,
'display_types' : self.serialize_old_display_applications,
'visualizations': self.serialize_visualization_links,
# 'url' : url_for( 'history_content_typed', history_id=encoded_history_id, id=encoded_id, type="dataset" ),
# TODO: this intermittently causes a routes.GenerationException - temp use the legacy route to prevent this
# see also: https://trello.com/c/5d6j4X5y
# see also: https://sentry.galaxyproject.org/galaxy/galaxy-main/group/20769/events/9352883/
'url' : lambda i, k, **c: self.url_for('history_content',
history_id=self.app.security.encode_id(i.history_id),
id=self.app.security.encode_id(i.id)),
'urls' : self.serialize_urls,
# TODO: backwards compat: need to go away
'download_url' : lambda i, k, **c: self.url_for('history_contents_display',
history_id=self.app.security.encode_id(i.history.id),
history_content_id=self.app.security.encode_id(i.id)),
'parent_id' : self.serialize_id,
# TODO: to DatasetAssociationSerializer
'accessible' : lambda i, k, user=None, **c: self.manager.is_accessible(i, user, **c),
'api_type' : lambda *a, **c: 'file',
'type' : lambda *a, **c: 'file',
'created_from_basename' : lambda i, k, **c: i.created_from_basename,
})
[docs] def serialize(self, hda, keys, user=None, **context):
"""
Override to hide information to users not able to access.
"""
# TODO: to DatasetAssociationSerializer
if not self.manager.is_accessible(hda, user, **context):
keys = self._view_to_keys('inaccessible')
return super().serialize(hda, keys, user=user, **context)
[docs] def serialize_display_apps(self, hda, key, trans=None, **context):
"""
Return dictionary containing new-style display app urls.
"""
display_apps = []
for display_app in hda.get_display_applications(trans).values():
app_links = []
for link_app in display_app.links.values():
app_links.append({
'target': link_app.url.get('target_frame', '_blank'),
'href': link_app.get_display_url(hda, trans),
'text': gettext.gettext(link_app.name)
})
if app_links:
display_apps.append(dict(label=display_app.name, links=app_links))
return display_apps
[docs] def serialize_old_display_applications(self, hda, key, trans=None, **context):
"""
Return dictionary containing old-style display app urls.
"""
display_apps = []
if not self.app.config.enable_old_display_applications:
return display_apps
display_link_fn = hda.datatype.get_display_links
for display_app in hda.datatype.get_display_types():
target_frame, display_links = display_link_fn(hda, display_app, self.app, trans.request.base)
if len(display_links) > 0:
display_label = hda.datatype.get_display_label(display_app)
app_links = []
for display_name, display_link in display_links:
app_links.append({
'target': target_frame,
'href': display_link,
'text': gettext.gettext(display_name)
})
if app_links:
display_apps.append(dict(label=display_label, links=app_links))
return display_apps
[docs] def serialize_visualization_links(self, hda, key, trans=None, **context):
"""
Return a list of dictionaries with links to visualization pages
for those visualizations that apply to this hda.
"""
# use older system if registry is off in the config
if not self.app.visualizations_registry:
return hda.get_visualizations()
return self.app.visualizations_registry.get_visualizations(trans, hda)
[docs] def serialize_urls(self, hda, key, **context):
"""
Return web controller urls useful for this HDA.
"""
url_for = self.url_for
encoded_id = self.app.security.encode_id(hda.id)
urls = {
'purge' : url_for(controller='dataset', action='purge_async', dataset_id=encoded_id),
'display' : url_for(controller='dataset', action='display', dataset_id=encoded_id, preview=True),
'edit' : url_for(controller='dataset', action='edit', dataset_id=encoded_id),
'download' : url_for(controller='dataset', action='display',
dataset_id=encoded_id, to_ext=hda.extension),
'report_error' : url_for(controller='dataset', action='errors', id=encoded_id),
'rerun' : url_for(controller='tool_runner', action='rerun', id=encoded_id),
'show_params' : url_for(controller='dataset', action='show_params', dataset_id=encoded_id),
'visualization' : url_for(controller='visualization', action='index',
id=encoded_id, model='HistoryDatasetAssociation'),
'meta_download' : url_for(controller='dataset', action='get_metadata_file',
hda_id=encoded_id, metadata_name=''),
}
return urls
[docs]class HDADeserializer(datasets.DatasetAssociationDeserializer,
taggable.TaggableDeserializerMixin,
annotatable.AnnotatableDeserializerMixin):
"""
Interface/service object for validating and deserializing dictionaries into histories.
"""
model_manager_class = HDAManager
[docs] def add_deserializers(self):
super().add_deserializers()
taggable.TaggableDeserializerMixin.add_deserializers(self)
annotatable.AnnotatableDeserializerMixin.add_deserializers(self)
self.deserializers.update({
'visible' : self.deserialize_bool,
# remapped
'genome_build' : lambda i, k, v, **c: self.deserialize_genome_build(i, 'dbkey', v),
'misc_info' : lambda i, k, v, **c: self.deserialize_basestring(i, 'info', v,
convert_none_to_empty=True),
})
self.deserializable_keyset.update(self.deserializers.keys())
[docs]class HDAFilterParser(datasets.DatasetAssociationFilterParser,
taggable.TaggableFilterMixin,
annotatable.AnnotatableFilterMixin):
model_manager_class = HDAManager
model_class = model.HistoryDatasetAssociation
def _add_parsers(self):
super()._add_parsers()
taggable.TaggableFilterMixin._add_parsers(self)
annotatable.AnnotatableFilterMixin._add_parsers(self)