Warning
This document applies to an old release of Galaxy. You can instead view this page in the latest release, if it exists there, or start from the top of the latest release's documentation.
Source code for galaxy.managers.hdas
"""
Manager and Serializer for HDAs.
HistoryDatasetAssociations (HDAs) are datasets contained or created in a
history.
"""
import gettext
import logging
import os
from typing import (
Any,
Dict,
List,
Optional,
Set,
)
from sqlalchemy import (
and_,
asc,
desc,
exists,
false,
func,
nulls_first,
nulls_last,
select,
true,
)
from sqlalchemy.orm.session import object_session
from galaxy import (
datatypes,
exceptions,
model,
)
from galaxy.managers import (
annotatable,
base,
datasets,
lddas,
secured,
taggable,
users,
)
from galaxy.model import (
Job,
JobStateHistory,
JobToOutputDatasetAssociation,
)
from galaxy.model.base import transaction
from galaxy.model.deferred import materializer_factory
from galaxy.schema.schema import DatasetSourceType
from galaxy.schema.storage_cleaner import (
CleanableItemsSummary,
StorageItemCleanupError,
StorageItemsCleanupResult,
StoredItem,
StoredItemOrderBy,
)
from galaxy.schema.tasks import (
MaterializeDatasetInstanceTaskRequest,
PurgeDatasetsTaskRequest,
RequestUser,
)
from galaxy.structured_app import (
MinimalManagerApp,
StructuredApp,
)
from galaxy.util.compression_utils import get_fileobj
log = logging.getLogger(__name__)
class HDAManager(
    datasets.DatasetAssociationManager,
    secured.OwnableManagerMixin,
    annotatable.AnnotatableManagerMixin,
):
    """
    Interface/service object for interacting with HDAs.

    An HDA (HistoryDatasetAssociation) ties a Dataset to a History. This
    manager layers history-based ownership checks, creation/copying,
    purging, and state/data helpers on top of the generic
    DatasetAssociationManager.
    """

    model_class = model.HistoryDatasetAssociation
    foreign_key_name = "history_dataset_association"
    tag_assoc = model.HistoryDatasetAssociationTagAssociation
    annotation_assoc = model.HistoryDatasetAssociationAnnotationAssociation
    app: MinimalManagerApp

    # TODO: move what makes sense into DatasetManager
    # TODO: which of these are common with LDDAs and can be pushed down into DatasetAssociationManager?

    def __init__(
        self,
        app: MinimalManagerApp,
        user_manager: users.UserManager,
        ldda_manager: lddas.LDDAManager,
    ):
        """
        Set up and initialize other managers needed by hdas.

        :param app: minimal application context providing model/session access
        :param user_manager: used for admin/anonymous checks and user lookup
        :param ldda_manager: used to resolve LDDA sources in :meth:`materialize`
        """
        super().__init__(app)
        self.user_manager = user_manager
        self.ldda_manager = ldda_manager

    def get_owned_ids(self, object_ids, history=None):
        """Get owned IDs.

        Return the HDAs among ``object_ids`` that belong to ``history``.
        NOTE: the filter dereferences ``history.id``, so a ``history`` must be
        provided despite the ``None`` default.
        """
        filters = [self.model_class.table.c.id.in_(object_ids), self.model_class.table.c.history_id == history.id]
        return self.list(filters=filters)

    # .... security and permissions
    def is_accessible(self, item: model.HistoryDatasetAssociation, user: Optional[model.User], **kwargs: Any) -> bool:
        """
        Override to allow owners (those that own the associated history).
        """
        # this, apparently, is not True:
        #   if I have a copy of a dataset and anyone who manages permissions on it revokes my access
        #   I can not access that dataset even if it's in my history
        # if self.is_owner( hda, user, **kwargs ):
        #     return True
        return super().is_accessible(item, user, **kwargs)

    def is_owner(self, item: model.Base, user: Optional[model.User], current_history=None, **kwargs: Any) -> bool:
        """
        Use history to see if current user owns HDA.

        :param current_history: lets an anonymous user own HDAs in their
            current (session-bound) history
        :raises TypeError: if ``item`` is not an HDA
        """
        if not isinstance(item, model.HistoryDatasetAssociation):
            raise TypeError('"item" must be of type HistoryDatasetAssociation.')
        # admins are treated as owners of everything
        if self.user_manager.is_admin(user, trans=kwargs.get("trans", None)):
            return True
        history = item.history
        if history is None:
            # exception class is defined elsewhere in this module
            raise HistoryDatasetAssociationNoHistoryException
        # allow anonymous user to access current history
        # TODO: some dup here with historyManager.is_owner but prevents circ import
        # TODO: awkward kwarg (which is my new band name); this may not belong here - move to controller?
        if self.user_manager.is_anonymous(user):
            if current_history and history == current_history:
                return True
            return False
        return history.user == user

    # .... create and copy
    def create(
        self, flush: bool = True, history=None, dataset=None, *args: Any, **kwargs: Any
    ) -> model.HistoryDatasetAssociation:
        """
        Create a new hda optionally passing in it's history and dataset.

        ..note: to explicitly set hid to `None` you must pass in `hid=None`, otherwise
        it will be automatically set.
        """
        if not dataset:
            # let the model constructor create a backing Dataset
            kwargs["create_dataset"] = True
        hda = model.HistoryDatasetAssociation(
            history=history, dataset=dataset, sa_session=self.app.model.context, **kwargs
        )
        if history:
            # only auto-assign an hid if the caller did not pass one
            history.add_dataset(hda, set_hid=("hid" not in kwargs))
        # TODO:?? some internal sanity check here (or maybe in add_dataset) to make sure hids are not duped?
        self.session().add(hda)
        if flush:
            session = self.session()
            with transaction(session):
                session.commit()
        return hda

    def materialize(self, request: MaterializeDatasetInstanceTaskRequest) -> None:
        """
        Materialize a deferred HDA or LDDA into a concrete HDA in the
        target history described by ``request``, then commit.
        """
        request_user: RequestUser = request.user
        materializer = materializer_factory(
            True,  # attached...
            object_store=self.app.object_store,
            file_sources=self.app.file_sources,
            sa_session=self.app.model.context,
        )
        user = self.user_manager.by_id(request_user.user_id)
        # resolve the source instance, enforcing accessibility for this user
        if request.source == DatasetSourceType.hda:
            dataset_instance = self.get_accessible(request.content, user)
        else:
            dataset_instance = self.ldda_manager.get_accessible(request.content, user)
        history = self.app.history_manager.by_id(request.history_id)
        new_hda = materializer.ensure_materialized(dataset_instance, target_history=history)
        history.add_dataset(new_hda, set_hid=True)
        session = self.session()
        with transaction(session):
            session.commit()

    def copy(
        self, item: Any, history=None, hide_copy: bool = False, flush: bool = True, **kwargs: Any
    ) -> model.HistoryDatasetAssociation:
        """
        Copy hda, including annotation and tags, add to history and return the new copy.

        :param hide_copy: if True, the copy is created with ``visible = False``
        :raises TypeError: if ``item`` is not an HDA
        """
        if not isinstance(item, model.HistoryDatasetAssociation):
            raise TypeError()
        hda = item
        copy = hda.copy(parent_id=kwargs.get("parent_id"), copy_hid=False, copy_tags=hda.tags, flush=False)
        if hide_copy:
            copy.visible = False
        if history:
            # defer hid assignment / flush until add_pending_items below
            history.stage_addition(copy)

        copy.set_size()

        # carry over the owner's annotation as well
        original_annotation = self.annotation(hda)
        self.annotate(copy, original_annotation, user=hda.user, flush=False)
        if flush:
            if history:
                history.add_pending_items()
            session = object_session(copy)
            with transaction(session):
                session.commit()
        return copy

    # .... deletion and purging
    def purge(self, hda, flush=True, **kwargs):
        """
        Purge the HDA, dispatching to a Celery task when tasks are enabled;
        otherwise purge synchronously via :meth:`_purge`.
        """
        if self.app.config.enable_celery_tasks:
            # imported lazily to avoid a hard dependency when tasks are disabled
            from galaxy.celery.tasks import purge_hda

            user = kwargs.get("user")
            return purge_hda.delay(hda_id=hda.id, task_user_id=getattr(user, "id", None))
        else:
            self._purge(hda, flush=flush)

    def _purge(self, hda, flush=True):
        """
        Purge this HDA and the dataset underlying it.
        """
        user = hda.history.user or None
        quota_amount_reduction = 0
        if user:
            # capture the quota charge before the purge releases the data
            quota_amount_reduction = hda.quota_amount(user)
        super().purge(hda, flush=flush)
        # decrease the user's space used
        quota_source_info = hda.dataset.quota_source_info
        if quota_amount_reduction and quota_source_info.use:
            user.adjust_total_disk_usage(-quota_amount_reduction, quota_source_info.label)
            # TODO: don't flush above if we're going to re-flush here
            session = object_session(user)
            with transaction(session):
                session.commit()

    # .... states
    def error_if_uploading(self, hda):
        """
        Raise error if HDA is still uploading.

        :raises exceptions.Conflict: while the dataset state is UPLOAD
        """
        # TODO: may be better added to an overridden get_accessible
        if hda.state == model.Dataset.states.UPLOAD:
            raise exceptions.Conflict("Please wait until this dataset finishes uploading")
        return hda

    def has_been_resubmitted(self, hda):
        """
        Return True if the hda's job was resubmitted at any point.
        """
        # EXISTS over the job state history of the job that produced this HDA
        stmt = select(
            exists()
            .where(JobToOutputDatasetAssociation.dataset_id == hda.id)
            .where(JobStateHistory.job_id == JobToOutputDatasetAssociation.job_id)
            .where(JobStateHistory.state == Job.states.RESUBMITTED)
        )
        return self.session().scalar(stmt)

    def data_conversion_status(self, hda):
        """
        Returns a message if an hda is not ready to be used in visualization.

        Returns ``None`` when the HDA is ready.
        """
        # this is a weird syntax and return val
        if not hda:
            return self.model_class.conversion_messages.NO_DATA
        if hda.state == model.Job.states.ERROR:
            return self.model_class.conversion_messages.ERROR
        if hda.state != model.Job.states.OK:
            return self.model_class.conversion_messages.PENDING
        return None

    # .... data
    # TODO: to data provider or Text datatype directly
    def text_data(self, hda, preview=True):
        """
        Get data from text file, truncating if necessary.

        :param preview: when True, cap the read at ``MAX_PEEK_SIZE`` bytes
        :returns: ``(truncated, hda_data)``; ``hda_data`` is ``None`` for
            non-text datatypes or missing files
        """
        # 1 MB
        MAX_PEEK_SIZE = 1000000

        truncated = False
        hda_data = None
        # For now, cannot get data from non-text datasets.
        if not isinstance(hda.datatype, datatypes.data.Text):
            return truncated, hda_data
        file_path = hda.get_file_name()
        if not os.path.exists(file_path):
            return truncated, hda_data

        truncated = preview and os.stat(file_path).st_size > MAX_PEEK_SIZE
        with get_fileobj(file_path) as fh:
            hda_data = fh.read(MAX_PEEK_SIZE)
        return truncated, hda_data

    # .... annotatable
    def annotation(self, hda):
        # override to scope to history owner
        return self._user_annotation(hda, hda.user)

    def _set_permissions(self, trans, hda, role_ids_dict):
        # The user associated the DATASET_ACCESS permission on the dataset with 1 or more roles. We
        # need to ensure that they did not associate roles that would cause accessibility problems.
        security_agent = trans.app.security_agent
        permissions, in_roles, error, message = security_agent.derive_roles_from_access(
            trans, hda.dataset.id, "root", **role_ids_dict
        )
        if error:
            # Keep the original role associations for the DATASET_ACCESS permission on the dataset.
            access_action = security_agent.get_action(security_agent.permitted_actions.DATASET_ACCESS.action)
            permissions[access_action] = hda.dataset.get_access_roles(security_agent)
            trans.sa_session.refresh(hda.dataset)
            raise exceptions.RequestParameterInvalidException(message)
        else:
            error = security_agent.set_all_dataset_permissions(hda.dataset, permissions)
            trans.sa_session.refresh(hda.dataset)
            if error:
                raise exceptions.RequestParameterInvalidException(error)
class HDAStorageCleanerManager(base.StorageCleanerManager):
    """
    Storage-cleanup operations for HDAs: summarize, list, and purge a
    user's discarded (deleted but not yet purged) datasets.
    """

    def __init__(self, hda_manager: HDAManager, dataset_manager: datasets.DatasetManager):
        self.hda_manager = hda_manager
        self.dataset_manager = dataset_manager
        # maps the API's ordering enum to concrete SQLAlchemy order-by clauses;
        # NULL sizes sort first ascending / last descending so they group together
        self.sort_map = {
            StoredItemOrderBy.NAME_ASC: asc(model.HistoryDatasetAssociation.name),
            StoredItemOrderBy.NAME_DSC: desc(model.HistoryDatasetAssociation.name),
            StoredItemOrderBy.SIZE_ASC: nulls_first(asc(model.Dataset.total_size)),
            StoredItemOrderBy.SIZE_DSC: nulls_last(desc(model.Dataset.total_size)),
            StoredItemOrderBy.UPDATE_TIME_ASC: asc(model.HistoryDatasetAssociation.update_time),
            StoredItemOrderBy.UPDATE_TIME_DSC: desc(model.HistoryDatasetAssociation.update_time),
        }

    def get_discarded_summary(self, user: model.User) -> CleanableItemsSummary:
        """
        Return total size and count of the user's deleted-but-not-purged HDAs.
        """
        stmt = (
            select(func.sum(model.Dataset.total_size), func.count(model.HistoryDatasetAssociation.id))
            .select_from(model.HistoryDatasetAssociation)
            .join(model.Dataset, model.HistoryDatasetAssociation.table.c.dataset_id == model.Dataset.id)
            .join(model.History, model.HistoryDatasetAssociation.table.c.history_id == model.History.id)
            .where(
                and_(
                    model.HistoryDatasetAssociation.deleted == true(),
                    model.HistoryDatasetAssociation.purged == false(),
                    model.History.user_id == user.id,
                )
            )
        )
        result = self.hda_manager.session().execute(stmt).fetchone()
        # SUM over zero rows yields NULL; normalize to 0
        total_size = 0 if result[0] is None else result[0]
        return CleanableItemsSummary(total_size=total_size, total_items=result[1])

    def get_discarded(
        self,
        user: model.User,
        offset: Optional[int],
        limit: Optional[int],
        order: Optional[StoredItemOrderBy],
    ) -> List[StoredItem]:
        """
        List the user's deleted-but-not-purged HDAs as ``StoredItem``s,
        with optional pagination and ordering (see ``self.sort_map``).
        """
        stmt = (
            select(
                model.HistoryDatasetAssociation.id,
                model.HistoryDatasetAssociation.name,
                model.HistoryDatasetAssociation.update_time,
                model.Dataset.total_size,
            )
            .select_from(model.HistoryDatasetAssociation)
            .join(model.Dataset, model.HistoryDatasetAssociation.table.c.dataset_id == model.Dataset.id)
            .join(model.History, model.HistoryDatasetAssociation.table.c.history_id == model.History.id)
            .where(
                and_(
                    model.HistoryDatasetAssociation.deleted == true(),
                    model.HistoryDatasetAssociation.purged == false(),
                    model.History.user_id == user.id,
                )
            )
        )
        if offset:
            stmt = stmt.offset(offset)
        if limit:
            stmt = stmt.limit(limit)
        if order:
            stmt = stmt.order_by(self.sort_map[order])
        result = self.hda_manager.session().execute(stmt)
        discarded = [
            StoredItem(id=row.id, name=row.name, type="dataset", size=row.total_size or 0, update_time=row.update_time)
            for row in result
        ]
        return discarded

    def cleanup_items(self, user: model.User, item_ids: Set[int]) -> StorageItemsCleanupResult:
        """
        Purge the given HDAs owned by ``user``, adjusting quota usage.

        Per-item failures are collected into the result's ``errors`` rather
        than aborting the batch; the underlying datasets are deleted in a
        second pass (possibly asynchronously).
        """
        success_item_count = 0
        total_free_bytes = 0
        errors: List[StorageItemCleanupError] = []
        dataset_ids_to_remove: Set[int] = set()

        for hda_id in item_ids:
            try:
                hda: model.HistoryDatasetAssociation = self.hda_manager.get_owned(hda_id, user)
                hda.deleted = True
                # record the quota charge before marking purged
                quota_amount = int(hda.quota_amount(user))
                hda.purge_usage_from_quota(user, hda.dataset.quota_source_info)
                hda.purged = True
                dataset_ids_to_remove.add(hda.dataset.id)
                success_item_count += 1
                total_free_bytes += quota_amount
            except Exception as e:
                errors.append(StorageItemCleanupError(item_id=hda_id, error=str(e)))

        if success_item_count:
            session = self.hda_manager.session()
            with transaction(session):
                session.commit()
            self._request_full_delete_all(dataset_ids_to_remove, user)

        return StorageItemsCleanupResult(
            total_item_count=len(item_ids),
            success_item_count=success_item_count,
            total_free_bytes=total_free_bytes,
            errors=errors,
        )

    def _request_full_delete_all(self, dataset_ids_to_remove: Set[int], user: Optional[model.User]):
        # delegate actual dataset purging to a Celery task when enabled,
        # otherwise do it inline
        use_tasks = self.dataset_manager.app.config.enable_celery_tasks
        request = PurgeDatasetsTaskRequest(dataset_ids=list(dataset_ids_to_remove))
        if use_tasks:
            from galaxy.celery.tasks import purge_datasets

            purge_datasets.delay(request=request, task_user_id=getattr(user, "id", None))
        else:
            self.dataset_manager.purge_datasets(request)
class HDASerializer(  # datasets._UnflattenedMetadataDatasetAssociationSerializer,
    datasets.DatasetAssociationSerializer[HDAManager],
    taggable.TaggableSerializerMixin,
    annotatable.AnnotatableSerializerMixin,
):
    """
    Serializes HDAs into dictionaries for the API, with ``summary``,
    ``detailed``, ``extended``, and ``inaccessible`` views.
    """

    model_manager_class = HDAManager
    app: StructuredApp

    def __init__(self, app: StructuredApp):
        super().__init__(app)
        self.hda_manager = self.manager

        self.default_view = "summary"
        self.add_view(
            "summary",
            [
                "id",
                "type_id",
                "name",
                "history_id",
                "hid",
                "history_content_type",
                "dataset_id",
                "genome_build",
                "state",
                "extension",
                "deleted",
                "purged",
                "visible",
                "tags",
                "type",
                "url",
                "create_time",
                "update_time",
                "object_store_id",
                "quota_source_label",
            ],
        )
        self.add_view(
            "detailed",
            [
                "model_class",
                "history_id",
                "hid",
                # why include if model_class is there?
                "hda_ldda",
                "copied_from_ldda_id",
                # TODO: accessible needs to go away
                "accessible",
                # remapped
                "misc_info",
                "misc_blurb",
                "file_ext",
                "file_size",
                "resubmitted",
                "metadata",
                "meta_files",
                "data_type",
                "peek",
                "creating_job",
                "rerunnable",
                "uuid",
                "permissions",
                "file_name",
                "display_apps",
                "display_types",
                "validated_state",
                "validated_state_message",
                # 'url',
                "download_url",
                "annotation",
                "api_type",
                "created_from_basename",
                "hashes",
                "sources",
                "drs_id",
            ],
            include_keys_from="summary",
        )
        self.add_view(
            "extended",
            [
                "tool_version",
                "parent_id",
                "designation",
            ],
            include_keys_from="detailed",
        )
        # keyset returned to create show a dataset where the owner has no access
        self.add_view(
            "inaccessible",
            ["accessible", "id", "name", "history_id", "hid", "history_content_type", "state", "deleted", "visible"],
        )

    def serialize_copied_from_ldda_id(self, item, key, **context):
        """
        Serialize an id attribute of `item`.

        Returns the encoded id of the LDDA this HDA was copied from, or
        ``None`` when there is no such association.
        """
        if item.copied_from_library_dataset_dataset_association is not None:
            return self.app.security.encode_id(item.copied_from_library_dataset_dataset_association.id)
        return None

    def add_serializers(self):
        """
        Register HDA-specific serializers on top of those from the base
        serializer and the taggable/annotatable mixins.
        """
        super().add_serializers()
        taggable.TaggableSerializerMixin.add_serializers(self)
        annotatable.AnnotatableSerializerMixin.add_serializers(self)
        serializers: Dict[str, base.Serializer] = {
            # hid of None is rendered as -1 for API consumers
            "hid": lambda item, key, **context: item.hid if item.hid is not None else -1,
            "model_class": lambda item, key, **context: "HistoryDatasetAssociation",
            "history_content_type": lambda item, key, **context: "dataset",
            "hda_ldda": lambda item, key, **context: "hda",
            "type_id": self.serialize_type_id,
            "copied_from_ldda_id": self.serialize_copied_from_ldda_id,
            "history_id": self.serialize_id,
            # remapped
            "misc_info": self._remap_from("info"),
            "misc_blurb": self._remap_from("blurb"),
            "file_ext": self._remap_from("extension"),
            "file_path": self._remap_from("file_name"),
            "resubmitted": lambda item, key, **context: self.hda_manager.has_been_resubmitted(item),
            "display_apps": self.serialize_display_apps,
            "display_types": self.serialize_old_display_applications,
            # 'url' : url_for( 'history_content_typed', history_id=encoded_history_id, id=encoded_id, type="dataset" ),
            # TODO: this intermittently causes a routes.GenerationException - temp use the legacy route to prevent this
            # see also: https://trello.com/c/5d6j4X5y
            # see also: https://sentry.galaxyproject.org/galaxy/galaxy-main/group/20769/events/9352883/
            "url": lambda item, key, **context: self.url_for(
                "history_content",
                history_id=self.app.security.encode_id(item.history_id),
                id=self.app.security.encode_id(item.id),
                context=context,
            ),
            "urls": self.serialize_urls,
            # TODO: backwards compat: need to go away
            "download_url": lambda item, key, **context: self.url_for(
                "history_contents_display",
                history_id=self.app.security.encode_id(item.history.id),
                history_content_id=self.app.security.encode_id(item.id),
                context=context,
            ),
            "parent_id": self.serialize_id,
            # TODO: to DatasetAssociationSerializer
            "accessible": lambda item, key, user=None, **c: self.manager.is_accessible(item, user, **c),
            "api_type": lambda item, key, **context: "file",
            "type": lambda item, key, **context: "file",
            "created_from_basename": lambda item, key, **context: item.created_from_basename,
            "object_store_id": lambda item, key, **context: item.object_store_id,
            "quota_source_label": lambda item, key, **context: item.dataset.quota_source_label,
            "hashes": lambda item, key, **context: [h.to_dict() for h in item.hashes],
            "sources": lambda item, key, **context: [s.to_dict() for s in item.sources],
            "drs_id": lambda item, key, **context: f"hda-{self.app.security.encode_id(item.id, kind='drs')}",
        }
        self.serializers.update(serializers)

    def serialize(self, hda, keys, user=None, **context):
        """
        Override to hide information to users not able to access.
        """
        # TODO: to DatasetAssociationSerializer
        if not self.manager.is_accessible(hda, user, **context):
            # swap in the restricted key set for inaccessible datasets
            keys = self._view_to_keys("inaccessible")
        return super().serialize(hda, keys, user=user, **context)

    def serialize_display_apps(self, item, key, trans=None, **context):
        """
        Return dictionary containing new-style display app urls.

        Only populated for OK, non-deleted HDAs.
        """
        hda = item
        display_apps: List[Dict[str, Any]] = []
        if hda.state == model.HistoryDatasetAssociation.states.OK and not hda.deleted:
            for display_app in hda.get_display_applications(trans).values():
                app_links = []
                for link_app in display_app.links.values():
                    app_links.append(
                        {
                            "target": link_app.url.get("target_frame", "_blank"),
                            "href": link_app.get_display_url(hda, trans),
                            "text": gettext.gettext(link_app.name),
                        }
                    )
                if app_links:
                    display_apps.append(dict(label=display_app.name, links=app_links))
        return display_apps

    def serialize_old_display_applications(self, item, key, trans=None, **context):
        """
        Return dictionary containing old-style display app urls.

        Only populated when old display applications are enabled in config
        and the HDA is OK and not deleted.
        """
        hda = item
        display_apps: List[Dict[str, Any]] = []
        if (
            self.app.config.enable_old_display_applications
            and hda.state == model.HistoryDatasetAssociation.states.OK
            and not hda.deleted
        ):
            display_link_fn = hda.datatype.get_display_links
            for display_app in hda.datatype.get_display_types():
                target_frame, display_links = display_link_fn(
                    hda,
                    display_app,
                    self.app,
                    trans.request.base,
                )
                if len(display_links) > 0:
                    display_label = hda.datatype.get_display_label(display_app)
                    app_links = []
                    for display_name, display_link in display_links:
                        app_links.append(
                            {"target": target_frame, "href": display_link, "text": gettext.gettext(display_name)}
                        )
                    if app_links:
                        display_apps.append(dict(label=display_label, links=app_links))
        return display_apps

    def serialize_urls(self, item, key, **context):
        """
        Return web controller urls useful for this HDA.
        """
        hda = item
        url_for = self.url_for
        encoded_id = self.app.security.encode_id(hda.id)
        urls = {
            "purge": url_for(controller="dataset", action="purge_async", dataset_id=encoded_id),
            "display": url_for(controller="dataset", action="display", dataset_id=encoded_id, preview=True),
            "edit": url_for(controller="dataset", action="edit", dataset_id=encoded_id),
            "download": url_for(controller="dataset", action="display", dataset_id=encoded_id, to_ext=hda.extension),
            "report_error": url_for(controller="dataset", action="errors", id=encoded_id),
            "rerun": url_for(controller="tool_runner", action="rerun", id=encoded_id),
            "show_params": url_for(controller="dataset", action="details", dataset_id=encoded_id),
            "visualization": url_for(
                controller="visualization", action="index", id=encoded_id, model="HistoryDatasetAssociation"
            ),
            "meta_download": url_for(
                controller="dataset", action="get_metadata_file", hda_id=encoded_id, metadata_name=""
            ),
        }
        return urls
class HDADeserializer(
    datasets.DatasetAssociationDeserializer,
    taggable.TaggableDeserializerMixin,
    annotatable.AnnotatableDeserializerMixin,
):
    """
    Interface/service object for validating and deserializing dictionaries into histories.
    """

    model_manager_class = HDAManager

    def __init__(self, app: MinimalManagerApp):
        super().__init__(app)
        # convenience alias for the manager created by the base class
        self.hda_manager = self.manager

    def add_deserializers(self):
        # gather deserializers from the parent class and both mixins first
        super().add_deserializers()
        taggable.TaggableDeserializerMixin.add_deserializers(self)
        annotatable.AnnotatableDeserializerMixin.add_deserializers(self)

        def _deserialize_genome_build(item, key, val, **context):
            # incoming 'genome_build' is stored on the model as 'dbkey'
            return self.deserialize_genome_build(item, "dbkey", val)

        def _deserialize_misc_info(item, key, val, **context):
            # incoming 'misc_info' is stored on the model as 'info'
            return self.deserialize_basestring(item, "info", val, convert_none_to_empty=True)

        hda_deserializers = {
            "visible": self.deserialize_bool,
            # remapped
            "genome_build": _deserialize_genome_build,
            "misc_info": _deserialize_misc_info,
        }
        self.deserializers.update(hda_deserializers)
        self.deserializable_keyset.update(self.deserializers.keys())
class HDAFilterParser(
    datasets.DatasetAssociationFilterParser, taggable.TaggableFilterMixin, annotatable.AnnotatableFilterMixin
):
    """Parses API filter query parameters for HDA listings."""

    model_manager_class = HDAManager
    model_class = model.HistoryDatasetAssociation

    def _add_parsers(self):
        """Register base parsers, then tag and annotation filter parsers."""
        super()._add_parsers()
        # the mixin parsers are not picked up via MRO; invoke them explicitly,
        # keeping tag parsers registered before annotation parsers
        for mixin in (taggable.TaggableFilterMixin, annotatable.AnnotatableFilterMixin):
            mixin._add_parsers(self)