Source code for galaxy.webapps.galaxy.services.history_contents

import logging
import os
import re
from typing import (
    Any,
    cast,
    Dict,
    Iterable,
    List,
    Optional,
    Set,
    Union,
)

from celery import chain
from pydantic import (
    Extra,
    Field,
)
from typing_extensions import (
    Literal,
    Protocol,
)

from galaxy import exceptions
from galaxy.celery.tasks import (
    change_datatype,
    materialize as materialize_task,
    prepare_dataset_collection_download,
    prepare_history_content_download,
    touch,
    write_history_content_to,
)
from galaxy.managers import (
    folders,
    hdas,
    hdcas,
    histories,
)
from galaxy.managers.base import ModelSerializer
from galaxy.managers.collections import DatasetCollectionManager
from galaxy.managers.collections_util import (
    api_payload_to_create_params,
    dictify_dataset_collection_instance,
)
from galaxy.managers.context import (
    ProvidesHistoryContext,
    ProvidesUserContext,
)
from galaxy.managers.genomes import GenomesManager
from galaxy.managers.history_contents import (
    HistoryContentsFilters,
    HistoryContentsManager,
)
from galaxy.managers.jobs import (
    fetch_job_states,
    summarize_jobs_to_dict,
)
from galaxy.managers.library_datasets import LibraryDatasetsManager
from galaxy.model import (
    History,
    HistoryDatasetAssociation,
    HistoryDatasetCollectionAssociation,
    LibraryDataset,
    User,
)
from galaxy.model.base import transaction
from galaxy.model.security import GalaxyRBACAgent
from galaxy.objectstore import BaseObjectStore
from galaxy.schema import (
    FilterQueryParams,
    SerializationParams,
    ValueFilterQueryParams,
)
from galaxy.schema.fields import (
    DecodedDatabaseIdField,
    LibraryFolderDatabaseIdField,
)
from galaxy.schema.schema import (
    AnyBulkOperationParams,
    AnyHistoryContentItem,
    AnyJobStateSummary,
    AsyncFile,
    AsyncTaskResultSummary,
    BulkOperationItemError,
    ChangeDatatypeOperationParams,
    ChangeDbkeyOperationParams,
    ColletionSourceType,
    CreateNewCollectionPayload,
    DatasetAssociationRoles,
    DeleteHistoryContentPayload,
    EncodedHistoryContentItem,
    HistoryContentBulkOperationPayload,
    HistoryContentBulkOperationResult,
    HistoryContentItem,
    HistoryContentItemOperation,
    HistoryContentsArchiveDryRunResult,
    HistoryContentSource,
    HistoryContentsResult,
    HistoryContentStats,
    HistoryContentsWithStatsResult,
    HistoryContentType,
    JobSourceType,
    MaterializeDatasetInstanceRequest,
    Model,
    StoreContentSource,
    StoreExportPayload,
    TagOperationParams,
    UpdateDatasetPermissionsPayload,
    UpdateHistoryContentsBatchPayload,
    WriteStoreToPayload,
)
from galaxy.schema.tasks import (
    GenerateHistoryContentDownload,
    MaterializeDatasetInstanceTaskRequest,
    PrepareDatasetCollectionDownload,
    WriteHistoryContentTo,
)
from galaxy.security.idencoding import IdEncodingHelper
from galaxy.util.zipstream import ZipstreamWrapper
from galaxy.web.short_term_storage import ShortTermStorageAllocator
from galaxy.webapps.galaxy.services.base import (
    async_task_summary,
    ConsumesModelStores,
    ensure_celery_tasks_enabled,
    model_store_storage_target,
    ServesExportStores,
    ServiceBase,
)

log = logging.getLogger(__name__)

DatasetDetailsType = Union[Set[DecodedDatabaseIdField], Literal["all"]]
HistoryItemModel = Union[HistoryDatasetAssociation, HistoryDatasetCollectionAssociation]


class HistoryContentsIndexParams(Model):
    """Query parameters exclusively used by the *new version* of `index` operation."""

    v: Optional[Literal["dev"]]
    dataset_details: Optional[DatasetDetailsType]

class LegacyHistoryContentsIndexParams(Model):
    """Query parameters exclusively used by the *legacy version* of `index` operation."""

    ids: Optional[List[DecodedDatabaseIdField]]
    types: List[HistoryContentType]
    dataset_details: Optional[DatasetDetailsType]
    deleted: Optional[bool]
    visible: Optional[bool]
    shareable: Optional[bool] = Field(
        default=None,
        title="Sharable",
        description="Whether to return only shareable or not shareable datasets. Leave unset for both.",
    )

class HistoryContentsIndexJobsSummaryParams(Model):
    """Query parameters exclusively used by the `index_jobs_summary` operation."""

    ids: List[DecodedDatabaseIdField] = []
    types: List[JobSourceType] = []

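# Illustrative sketch, not part of the upstream module: `ids` and `types` are
# parallel lists, and `index_jobs_summary` below rejects requests whose lengths
# differ. The id values here are hypothetical placeholders for decoded database ids.
_EXAMPLE_JOBS_SUMMARY_PARAMS = {
    "ids": ["<decoded HDA id>", "<decoded HDCA id>"],
    "types": [JobSourceType.Job, JobSourceType.ImplicitCollectionJobs],
}
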
class CreateHistoryContentPayloadBase(Model):
    type: Optional[HistoryContentType] = Field(
        HistoryContentType.dataset,
        title="Type",
        description="The type of content to be created in the history.",
    )

class CreateHistoryContentPayloadFromCopy(CreateHistoryContentPayloadBase):
    source: Optional[HistoryContentSource] = Field(
        None,
        title="Source",
        description="The source of the content. Can be other history element to be copied or library elements.",
    )
    content: Optional[Union[DecodedDatabaseIdField, LibraryFolderDatabaseIdField]] = Field(
        None,
        title="Content",
        description=(
            "Depending on the `source` it can be:\n"
            "- The encoded id from the library dataset\n"
            "- The encoded id from the library folder\n"
            "- The encoded id from the HDA\n"
            "- The encoded id from the HDCA\n"
        ),
    )

class CollectionElementIdentifier(Model):
    name: Optional[str] = Field(
        None,
        title="Name",
        description="The name of the element.",
    )
    src: ColletionSourceType = Field(
        ...,
        title="Source",
        description="The source of the element.",
    )
    id: Optional[DecodedDatabaseIdField] = Field(
        None,
        title="ID",
        description="The encoded ID of the element.",
    )
    tags: List[str] = Field(
        default=[],
        title="Tags",
        description="The list of tags associated with the element.",
    )
    element_identifiers: Optional[List["CollectionElementIdentifier"]] = Field(
        default=None,
        title="Element Identifiers",
        description="List of elements that should be in the new nested collection.",
    )
    collection_type: Optional[str] = Field(
        default=None,
        title="Collection Type",
        description="The type of the nested collection. For example, `list`, `paired`, `list:paired`.",
    )

# Required for self-referencing models
# See https://pydantic-docs.helpmanual.io/usage/postponed_annotations/#self-referencing-models
CollectionElementIdentifier.update_forward_refs()

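# Illustrative sketch, not part of the upstream module: because the model above
# references itself via `element_identifiers`, nested collections can be described
# in a single payload. Assuming `ColletionSourceType` accepts values such as "hda"
# and "new_collection", a `list:paired` collection built from existing HDAs could
# be described roughly like this (ids are hypothetical placeholders):
_EXAMPLE_NESTED_ELEMENT_IDENTIFIERS = [
    {
        "name": "sample1",
        "src": "new_collection",
        "collection_type": "paired",
        "element_identifiers": [
            {"name": "forward", "src": "hda", "id": "<encoded HDA id>"},
            {"name": "reverse", "src": "hda", "id": "<encoded HDA id>"},
        ],
    },
]
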
class CreateHistoryContentFromStore(StoreContentSource):
    pass

class CreateHistoryContentPayloadFromCollection(CreateHistoryContentPayloadFromCopy):
    dbkey: Optional[str] = Field(
        default=None,
        title="DBKey",
        description="TODO",
    )
    copy_elements: Optional[bool] = Field(
        default=True,
        title="Copy Elements",
        description=(
            "If the source is a collection, whether to copy child HDAs into the target "
            "history as well. Prior to the galaxy release 23.1 this defaulted to false."
        ),
    )

class CreateHistoryContentPayload(CreateHistoryContentPayloadFromCollection, CreateNewCollectionPayload):
    class Config:
        extra = Extra.allow

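# Illustrative sketch, not part of the upstream module: two payload shapes that
# `HistoriesContentsService.create` below is written to handle, assuming the enum
# members serialize to the lowercase strings used here and with hypothetical
# placeholder ids. Copy an existing HDA into the target history:
_EXAMPLE_COPY_HDA_PAYLOAD = {"type": "dataset", "source": "hda", "content": "<encoded HDA id>"}

# Copy an existing HDCA while changing the dbkey of the copied datasets
# (changing attributes requires copying the elements, see `__create_dataset_collection`):
_EXAMPLE_COPY_HDCA_PAYLOAD = {
    "type": "dataset_collection",
    "source": "hdca",
    "content": "<encoded HDCA id>",
    "dbkey": "hg38",
    "copy_elements": True,
}
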
class HistoriesContentsService(ServiceBase, ServesExportStores, ConsumesModelStores):
    """Common interface/service logic for interactions with histories contents in the context of the API.

    Provides the logic of the actions invoked by API controllers and uses type definitions
    and pydantic models to declare its parameters and return types.
    """

    def __init__(
        self,
        security: IdEncodingHelper,
        object_store: BaseObjectStore,
        history_manager: histories.HistoryManager,
        history_contents_manager: HistoryContentsManager,
        hda_manager: hdas.HDAManager,
        hdca_manager: hdcas.HDCAManager,
        dataset_collection_manager: DatasetCollectionManager,
        ldda_manager: LibraryDatasetsManager,
        folder_manager: folders.FolderManager,
        hda_serializer: hdas.HDASerializer,
        hda_deserializer: hdas.HDADeserializer,
        hdca_serializer: hdcas.HDCASerializer,
        history_contents_filters: HistoryContentsFilters,
        short_term_storage_allocator: ShortTermStorageAllocator,
        genomes_manager: GenomesManager,
    ):
        super().__init__(security)
        self.history_manager = history_manager
        self.history_contents_manager = history_contents_manager
        self.hda_manager = hda_manager
        self.hdca_manager = hdca_manager
        self.dataset_collection_manager = dataset_collection_manager
        self.ldda_manager = ldda_manager
        self.folder_manager = folder_manager
        self.hda_serializer = hda_serializer
        self.hda_deserializer = hda_deserializer
        self.hdca_serializer = hdca_serializer
        self.history_contents_filters = history_contents_filters
        self.item_operator = HistoryItemOperator(self.hda_manager, self.hdca_manager, self.dataset_collection_manager)
        self.short_term_storage_allocator = short_term_storage_allocator
        self.genomes_manager = genomes_manager
        self.object_store = object_store

    def index(
        self,
        trans,
        history_id: DecodedDatabaseIdField,
        params: HistoryContentsIndexParams,
        legacy_params: LegacyHistoryContentsIndexParams,
        serialization_params: SerializationParams,
        filter_query_params: FilterQueryParams,
        accept: str,
    ) -> Union[HistoryContentsResult, HistoryContentsWithStatsResult]:
        """
        Return a list of contents (HDAs and HDCAs) for the history with the given ``ID``.

        .. note:: Anonymous users are allowed to get their current history contents.
        """
        if params.v == "dev":
            return self.__index_v2(trans, history_id, params, serialization_params, filter_query_params, accept)
        return self.__index_legacy(trans, history_id, legacy_params)

[docs] def show( self, trans, id: DecodedDatabaseIdField, serialization_params: SerializationParams, contents_type: HistoryContentType, fuzzy_count: Optional[int] = None, ) -> AnyHistoryContentItem: """ Return detailed information about an HDA or HDCA within a history .. note:: Anonymous users are allowed to get their current history contents :param id: the encoded id of the HDA or HDCA to return :param contents_type: 'dataset' or 'dataset_collection' :param serialization_params.view: if fetching a dataset collection - the view style of the dataset collection to produce. 'collection' returns no element information, 'element' returns detailed element information for all datasets, 'element-reference' returns a minimal set of information about datasets (for instance id, type, and state but not metadata, peek, info, or name). The default is 'element'. :param fuzzy_count: this value can be used to broadly restrict the magnitude of the number of elements returned via the API for large collections. The number of actual elements returned may be "a bit" more than this number or "a lot" less - varying on the depth of nesting, balance of nesting at each level, and size of target collection. The consumer of this API should not expect a stable number or pre-calculable number of elements to be produced given this parameter - the only promise is that this API will not respond with an order of magnitude more elements estimated with this value. The UI uses this parameter to fetch a "balanced" concept of the "start" of large collections at every depth of the collection. :returns: dictionary containing detailed HDA or HDCA information """ if contents_type == HistoryContentType.dataset: return self.__show_dataset(trans, id, serialization_params) elif contents_type == HistoryContentType.dataset_collection: return self.__show_dataset_collection(trans, id, serialization_params, fuzzy_count) raise exceptions.UnknownContentsType(f"Unknown contents type: {contents_type}")
[docs] def prepare_store_download( self, trans: ProvidesHistoryContext, id: DecodedDatabaseIdField, payload: StoreExportPayload, contents_type: HistoryContentType = HistoryContentType.dataset, ) -> AsyncFile: model_store_format = payload.model_store_format if contents_type == HistoryContentType.dataset: hda = self.hda_manager.get_accessible(id, trans.user) content_id = hda.id content_name = hda.name elif contents_type == HistoryContentType.dataset_collection: dataset_collection_instance = self.__get_accessible_collection(trans, id) content_id = dataset_collection_instance.id content_name = dataset_collection_instance.name else: raise exceptions.UnknownContentsType(f"Unknown contents type: {contents_type}") short_term_storage_target = model_store_storage_target( self.short_term_storage_allocator, content_name, model_store_format, ) request = GenerateHistoryContentDownload( short_term_storage_request_id=short_term_storage_target.request_id, user=trans.async_request_user, content_type=contents_type, content_id=content_id, **payload.dict(), ) result = prepare_history_content_download.delay(request=request) return AsyncFile(storage_request_id=short_term_storage_target.request_id, task=async_task_summary(result))
[docs] def write_store( self, trans: ProvidesHistoryContext, id: DecodedDatabaseIdField, payload: WriteStoreToPayload, contents_type: HistoryContentType = HistoryContentType.dataset, ): ensure_celery_tasks_enabled(trans.app.config) if contents_type == HistoryContentType.dataset: hda = self.hda_manager.get_accessible(id, trans.user) content_id = hda.id elif contents_type == HistoryContentType.dataset_collection: dataset_collection_instance = self.__get_accessible_collection(trans, id) content_id = dataset_collection_instance.id else: raise exceptions.UnknownContentsType(f"Unknown contents type: {contents_type}") request = WriteHistoryContentTo( user=trans.async_request_user, content_id=content_id, contents_type=contents_type, **payload.dict() ) result = write_history_content_to.delay(request=request) return async_task_summary(result)
[docs] def index_jobs_summary( self, trans, params: HistoryContentsIndexJobsSummaryParams, ) -> List[AnyJobStateSummary]: """ Return job state summary info for jobs, implicit groups jobs for collections or workflow invocations Warning: We allow anyone to fetch job state information about any object they can guess an encoded ID for - it isn't considered protected data. This keeps polling IDs as part of state calculation for large histories and collections as efficient as possible. """ ids = params.ids types = params.types if len(ids) != len(types): raise exceptions.RequestParameterInvalidException( f"The number of ids ({len(ids)}) and types ({len(types)}) must match." ) return [self.encode_all_ids(job_state) for job_state in fetch_job_states(trans.sa_session, ids, types)]
[docs] def show_jobs_summary( self, trans, id: DecodedDatabaseIdField, contents_type: HistoryContentType, ) -> AnyJobStateSummary: """ Return detailed information about an HDA or HDCAs jobs Warning: We allow anyone to fetch job state information about any object they can guess an encoded ID for - it isn't considered protected data. This keeps polling IDs as part of state calculation for large histories and collections as efficient as possible. :param id: the encoded id of the HDA or HDCA to return :param contents_type: 'dataset' or 'dataset_collection' :returns: dictionary containing jobs summary object """ # At most one of job or implicit_collection_jobs should be found. job = None implicit_collection_jobs = None if contents_type == HistoryContentType.dataset: hda = self.hda_manager.get_accessible(id, trans.user) job = hda.creating_job elif contents_type == HistoryContentType.dataset_collection: dataset_collection_instance = self.__get_accessible_collection(trans, id) job_source_type = dataset_collection_instance.job_source_type if job_source_type == JobSourceType.Job: job = dataset_collection_instance.job elif job_source_type == JobSourceType.ImplicitCollectionJobs: implicit_collection_jobs = dataset_collection_instance.implicit_collection_jobs assert job is None or implicit_collection_jobs is None return self.encode_all_ids(summarize_jobs_to_dict(trans.sa_session, job or implicit_collection_jobs))
    def get_dataset_collection_archive_for_download(self, trans, id: DecodedDatabaseIdField):
        """
        Download the content of a HistoryDatasetCollection as a tgz archive
        while maintaining approximate collection structure.

        :param id: encoded HistoryDatasetCollectionAssociation (HDCA) id
        """
        dataset_collection_instance = self.__get_accessible_collection(trans, id)
        return self.__stream_dataset_collection(trans, dataset_collection_instance)

    def prepare_collection_download(self, trans, id: DecodedDatabaseIdField) -> AsyncFile:
        ensure_celery_tasks_enabled(trans.app.config)
        dataset_collection_instance = self.__get_accessible_collection(trans, id)
        archive_name = f"{dataset_collection_instance.hid}: {dataset_collection_instance.name}"
        short_term_storage_target = self.short_term_storage_allocator.new_target(
            filename=archive_name, mime_type="application/x-zip-compressed"
        )
        request = PrepareDatasetCollectionDownload(
            short_term_storage_request_id=short_term_storage_target.request_id,
            history_dataset_collection_association_id=dataset_collection_instance.id,
        )
        result = prepare_dataset_collection_download.delay(request=request)
        return AsyncFile(storage_request_id=short_term_storage_target.request_id, task=async_task_summary(result))

    def __stream_dataset_collection(self, trans, dataset_collection_instance):
        archive = hdcas.stream_dataset_collection(
            dataset_collection_instance=dataset_collection_instance, upstream_mod_zip=trans.app.config.upstream_mod_zip
        )
        return archive

[docs] def create( self, trans, history_id: DecodedDatabaseIdField, payload: CreateHistoryContentPayload, serialization_params: SerializationParams, ) -> Union[AnyHistoryContentItem, List[AnyHistoryContentItem]]: """ Create a new HDA or HDCA. ..note: Currently, a user can only copy an HDA from a history that the user owns. """ history = self.history_manager.get_mutable(history_id, trans.user, current_history=trans.history) serialization_params.default_view = "detailed" history_content_type = payload.type if history_content_type == HistoryContentType.dataset: source = payload.source if source == HistoryContentSource.library_folder: return self.__create_datasets_from_library_folder(trans, history, payload, serialization_params) else: return self.__create_dataset(trans, history, payload, serialization_params) elif history_content_type == HistoryContentType.dataset_collection: return self.__create_dataset_collection(trans, history, payload, serialization_params) raise exceptions.UnknownContentsType(f"Unknown contents type: {payload.type}")
[docs] def create_from_store( self, trans, history_id: DecodedDatabaseIdField, payload: CreateHistoryContentFromStore, serialization_params: SerializationParams, ) -> List[AnyHistoryContentItem]: history = self.history_manager.get_mutable(history_id, trans.user, current_history=trans.history) object_tracker = self.create_objects_from_store( trans, payload, history=history, ) rval: List[AnyHistoryContentItem] = [] serialization_params.default_view = "detailed" for hda in object_tracker.hdas_by_key.values(): if hda.visible: hda_dict = self.hda_serializer.serialize_to_view( hda, user=trans.user, trans=trans, **serialization_params.dict() ) rval.append(hda_dict) for hdca in object_tracker.hdcas_by_key.values(): hdca_dict = self.hdca_serializer.serialize_to_view( hdca, user=trans.user, trans=trans, **serialization_params.dict() ) rval.append(hdca_dict) return rval
[docs] def materialize( self, trans, request: MaterializeDatasetInstanceRequest, ) -> AsyncTaskResultSummary: # DO THIS JUST TO MAKE SURE IT IS OWNED... self.history_manager.get_mutable(request.history_id, trans.user, current_history=trans.history) assert trans.app.config.enable_celery_tasks task_request = MaterializeDatasetInstanceTaskRequest( history_id=request.history_id, source=request.source, content=request.content, user=trans.async_request_user, ) results = materialize_task.delay(request=task_request) return async_task_summary(results)
[docs] def update_permissions( self, trans, history_content_id: DecodedDatabaseIdField, payload: UpdateDatasetPermissionsPayload, ) -> DatasetAssociationRoles: """ Set permissions of the given dataset to the given role ids. :type payload: dict :param payload: dictionary structure containing: - action: (required) describes what action should be performed. Available actions: make_private, remove_restrictions, set_permissions - access_ids[]: list of Role.id defining roles that should have access permission on the dataset - manage_ids[]: list of Role.id defining roles that should have manage permission on the dataset - modify_ids[]: list of Role.id defining roles that should have modify permission on the library dataset item :raises: RequestParameterInvalidException, ObjectNotFound, InsufficientPermissionsException, InternalServerError RequestParameterMissingException """ payload_dict = payload.dict(by_alias=True) hda = self.hda_manager.get_owned(history_content_id, trans.user, current_history=trans.history, trans=trans) assert hda is not None self.history_manager.error_unless_mutable(hda.history) self.hda_manager.update_permissions(trans, hda, **payload_dict) roles = self.hda_manager.serialize_dataset_association_roles(trans, hda) return DatasetAssociationRoles.construct(**roles)
[docs] def update( self, trans, history_id: DecodedDatabaseIdField, id: DecodedDatabaseIdField, payload: Dict[str, Any], serialization_params: SerializationParams, contents_type: HistoryContentType, ): """ Updates the values for the history content item with the given ``id`` :param history_id: encoded id string of the items's History :param id: the encoded id of the history item to update :param payload: a dictionary containing any or all the fields for HDA or HDCA with values different than the defaults :returns: an error object if an error occurred or a dictionary containing any values that were different from the original and, therefore, updated """ history = self.history_manager.get_mutable(history_id, trans.user, current_history=trans.history) if contents_type == HistoryContentType.dataset: return self.__update_dataset(trans, history, id, payload, serialization_params) elif contents_type == HistoryContentType.dataset_collection: return self.__update_dataset_collection(trans, id, payload) else: raise exceptions.UnknownContentsType(f"Unknown contents type: {contents_type}")
[docs] def update_batch( self, trans, history_id: DecodedDatabaseIdField, payload: UpdateHistoryContentsBatchPayload, serialization_params: SerializationParams, ) -> List[AnyHistoryContentItem]: """ PUT /api/histories/{history_id}/contents :type history_id: str :param history_id: encoded id string of the history containing supplied items :type id: str :param id: the encoded id of the history to update :type payload: dict :param payload: a dictionary containing any or all the :rtype: dict :returns: an error object if an error occurred or a dictionary containing any values that were different from the original and, therefore, updated """ history = self.history_manager.get_mutable(history_id, trans.user, current_history=trans.history) items = payload.items hda_ids: List[DecodedDatabaseIdField] = [] hdca_ids: List[DecodedDatabaseIdField] = [] for item in items: contents_type = item.history_content_type if contents_type == HistoryContentType.dataset: hda_ids.append(item.id) else: hdca_ids.append(item.id) payload_dict = payload.dict(exclude_unset=True) hdas = self.__datasets_for_update(trans, history, hda_ids, payload_dict) rval = [] for hda in hdas: self.__deserialize_dataset(trans, hda, payload_dict) serialization_params.default_view = "summary" rval.append( self.hda_serializer.serialize_to_view(hda, user=trans.user, trans=trans, **serialization_params.dict()) ) for hdca_id in hdca_ids: self.__update_dataset_collection(trans, hdca_id, payload.dict(exclude_defaults=True)) dataset_collection_instance = self.__get_accessible_collection(trans, hdca_id) rval.append(self.__collection_dict(trans, dataset_collection_instance, view="summary")) return rval
[docs] def bulk_operation( self, trans: ProvidesHistoryContext, history_id: DecodedDatabaseIdField, filter_query_params: ValueFilterQueryParams, payload: HistoryContentBulkOperationPayload, ) -> HistoryContentBulkOperationResult: history = self.history_manager.get_mutable(history_id, trans.user, current_history=trans.history) filters = self.history_contents_filters.parse_query_filters(filter_query_params) self._validate_bulk_operation_params(payload, trans.user, trans) contents: List[HistoryItemModel] if payload.items: contents = self._get_contents_by_item_list( trans, history, payload.items, ) else: contents = self.history_contents_manager.contents( history, filters, ) errors = self._apply_bulk_operation(contents, payload.operation, payload.params, trans) with transaction(trans.sa_session): trans.sa_session.commit() success_count = len(contents) - len(errors) return HistoryContentBulkOperationResult.construct(success_count=success_count, errors=errors)
[docs] def validate(self, trans, history_id: DecodedDatabaseIdField, history_content_id: DecodedDatabaseIdField): """ Validates the metadata associated with a dataset within a History. :type history_id: str :param history_id: encoded id string of the items's History :type id: str :param id: the encoded id of the history item to validate :rtype: dict :returns: TODO """ decoded_id = history_content_id history = self.history_manager.get_mutable(history_id, trans.user, current_history=trans.history) hda = self.hda_manager.get_owned_ids([decoded_id], history=history)[0] if hda: self.hda_manager.set_metadata(trans, hda, overwrite=True, validate=True) return {}
[docs] def delete( self, trans, id: DecodedDatabaseIdField, serialization_params: SerializationParams, contents_type: HistoryContentType, payload: DeleteHistoryContentPayload, ): """ Delete the history content with the given ``id`` and specified type (defaults to dataset) """ if contents_type == HistoryContentType.dataset: return self.__delete_dataset(trans, id, payload.purge, payload.stop_job, serialization_params) elif contents_type == HistoryContentType.dataset_collection: async_result = self.dataset_collection_manager.delete( trans, "history", id, recursive=payload.recursive, purge=payload.purge ) return {"id": self.encode_id(id), "deleted": True, "async_result": async_result is not None} else: raise exceptions.UnknownContentsType(f"Unknown contents type: {contents_type}")
[docs] def archive( self, trans, history_id: DecodedDatabaseIdField, filter_query_params: FilterQueryParams, filename: Optional[str] = "", dry_run: Optional[bool] = True, ) -> Union[HistoryContentsArchiveDryRunResult, ZipstreamWrapper]: """ Build and return a compressed archive of the selected history contents :type filename: string :param filename: (optional) archive name (defaults to history name) :type dry_run: boolean :param dry_run: (optional) if True, return the archive and file paths only as json and not an archive file :returns: archive file for download or json in `dry run` mode """ # roughly from: http://stackoverflow.com/a/31976060 (windows, linux) invalid_filename_char_regex = re.compile(r'[:<>|\\\/\?\* "]') # path format string - dot separator between id and name id_name_format = "{}.{}" def name_to_filename(name, max_length=150, replace_with="_"): # TODO: seems like shortening unicode with [:] would cause unpredictable display strings return invalid_filename_char_regex.sub(replace_with, name)[0:max_length] # given a set of parents for a dataset (HDCAs, DC, DCEs, etc.) - build a directory structure that # (roughly) recreates the nesting in the contents using the parent names and ids def build_path_from_parents(parents): parent_names = [] for parent in parents: # an HDCA if hasattr(parent, "hid"): name = name_to_filename(parent.name) parent_names.append(id_name_format.format(parent.hid, name)) # a DCE elif hasattr(parent, "element_index"): name = name_to_filename(parent.element_identifier) parent_names.append(id_name_format.format(parent.element_index, name)) # NOTE: DCs are skipped and use the wrapping DCE info instead return parent_names # get the history used for the contents query and check for accessibility history = self.history_manager.get_accessible(history_id, trans.user) archive_base_name = filename or name_to_filename(history.name) # this is the fn applied to each dataset contained in the query paths_and_files = [] def build_archive_files_and_paths(content, *parents): archive_path = archive_base_name if not self.hda_manager.is_accessible(content, trans.user): # if the underlying dataset is not accessible, skip it silently return content_container_id = content.hid content_name = name_to_filename(content.name) if parents: if hasattr(parents[0], "element_index"): # if content is directly wrapped in a DCE, strip it from parents (and the resulting path) # and instead replace the content id and name with the DCE index and identifier parent_dce, parents = parents[0], parents[1:] content_container_id = parent_dce.element_index content_name = name_to_filename(parent_dce.element_identifier) # reverse for path from parents: oldest parent first archive_path = os.path.join(archive_path, *build_path_from_parents(parents)[::-1]) # TODO: this is brute force - building the path each time instead of re-using it # possibly cache # add the name as the last element in the archive path content_id_and_name = id_name_format.format(content_container_id, content_name) archive_path = os.path.join(archive_path, content_id_and_name) # ---- for composite files, we use id and name for a directory and, inside that, ... if self.hda_manager.is_composite(content): # ...save the 'main' composite file (gen. 
html) paths_and_files.append((content.file_name, os.path.join(archive_path, f"{content.name}.html"))) for extra_file in self.hda_manager.extra_files(content): extra_file_basename = os.path.basename(extra_file) archive_extra_file_path = os.path.join(archive_path, extra_file_basename) # ...and one for each file in the composite paths_and_files.append((extra_file, archive_extra_file_path)) # ---- for single files, we add the true extension to id and name and store that single filename else: # some dataset names can contain their original file extensions, don't repeat if not archive_path.endswith(f".{content.extension}"): archive_path += f".{content.extension}" paths_and_files.append((content.file_name, archive_path)) # filter the contents that contain datasets using any filters possible from index above and map the datasets filters = self.history_contents_filters.parse_query_filters(filter_query_params) self.history_contents_manager.map_datasets(history, build_archive_files_and_paths, filters=filters) # if dry_run, return the structure as json for debugging if dry_run: return HistoryContentsArchiveDryRunResult.construct(__root__=paths_and_files) # create the archive, add the dataset files, then stream the archive as a download archive = ZipstreamWrapper( archive_name=archive_base_name, upstream_mod_zip=trans.app.config.upstream_mod_zip, upstream_gzip=trans.app.config.upstream_gzip, ) for file_path, archive_path in paths_and_files: archive.write(file_path, archive_path) return archive
def __delete_dataset( self, trans, id: DecodedDatabaseIdField, purge: bool, stop_job: bool, serialization_params: SerializationParams ): hda = self.hda_manager.get_owned(id, trans.user, current_history=trans.history) self.history_manager.error_unless_mutable(hda.history) self.hda_manager.error_if_uploading(hda) async_result = None if purge: async_result = self.hda_manager.purge(hda) else: self.hda_manager.delete(hda, stop_job=stop_job) serialization_params.default_view = "detailed" rval = self.hda_serializer.serialize_to_view(hda, user=trans.user, trans=trans, **serialization_params.dict()) rval["async_result"] = async_result is not None return rval def __update_dataset_collection(self, trans, id: DecodedDatabaseIdField, payload: Dict[str, Any]): return self.dataset_collection_manager.update(trans, "history", id, payload) def __update_dataset( self, trans, history: History, id: DecodedDatabaseIdField, payload: Dict[str, Any], serialization_params: SerializationParams, ): # anon user: ensure that history ids match up and the history is the current, # check for uploading, and use only the subset of attribute keys manipulatable by anon users hda = self.__datasets_for_update(trans, history, [id], payload)[0] if hda: self.__deserialize_dataset(trans, hda, payload) serialization_params.default_view = "detailed" return self.hda_serializer.serialize_to_view( hda, user=trans.user, trans=trans, **serialization_params.dict() ) return {} def __datasets_for_update( self, trans, history: History, hda_ids: List[DecodedDatabaseIdField], payload: Dict[str, Any] ): anonymous_user = not trans.user_is_admin and trans.user is None if anonymous_user: anon_allowed_payload = {} if "deleted" in payload: anon_allowed_payload["deleted"] = payload["deleted"] if "visible" in payload: anon_allowed_payload["visible"] = payload["visible"] payload = anon_allowed_payload hdas = self.hda_manager.get_owned_ids(hda_ids, history=history) # only check_state if not deleting, otherwise cannot delete uploading files check_state = not payload.get("deleted", False) if check_state: for hda in hdas: hda = self.hda_manager.error_if_uploading(hda) return hdas def __deserialize_dataset(self, trans, hda, payload: Dict[str, Any]): # TODO: when used in batch it would be a lot faster if we set flush=false # and the caller flushes only at the end or when a given chunk size is reached. 
self.hda_deserializer.deserialize(hda, payload, user=trans.user, trans=trans, flush=True) # TODO: this should be an effect of deleting the hda if payload.get("deleted", False): self.hda_manager.stop_creating_job(hda, flush=True) def __index_legacy( self, trans, history_id: DecodedDatabaseIdField, legacy_params: LegacyHistoryContentsIndexParams, ) -> HistoryContentsResult: """Legacy implementation of the `index` action.""" history = self._get_history(trans, history_id) legacy_params_dict = legacy_params.dict(exclude_defaults=True) ids = legacy_params_dict.get("ids") if ids: legacy_params_dict["ids"] = self.decode_ids(ids) object_store_ids = None shareable = legacy_params.shareable if shareable is not None: object_store_ids = self.object_store.object_store_ids(private=not shareable) if object_store_ids: legacy_params_dict["object_store_ids"] = object_store_ids contents = history.contents_iter(**legacy_params_dict) items = [ self._serialize_legacy_content_item(trans, content, legacy_params_dict.get("dataset_details")) for content in contents ] return HistoryContentsResult.construct(__root__=items) def __index_v2( self, trans, history_id: DecodedDatabaseIdField, params: HistoryContentsIndexParams, serialization_params: SerializationParams, filter_query_params: FilterQueryParams, accept: str, ) -> Union[HistoryContentsResult, HistoryContentsWithStatsResult]: """ Latests implementation of the `index` action. Allows additional filtering of contents and custom serialization. """ history = self._get_history(trans, history_id) filters = self.history_contents_filters.parse_query_filters_with_relations(filter_query_params, history_id) stats_requested = accept == HistoryContentsWithStatsResult.__accept_type__ if stats_requested and self.history_contents_filters.contains_non_orm_filter(filters): non_orm_filter_keys = [*self.history_contents_filters.fn_filter_parsers] raise exceptions.RequestParameterInvalidException( f"Invalid filter found. 
When requesting stats, please avoid filtering by {non_orm_filter_keys}" ) serialization_params = self._handle_extra_serialization_for_media_type(serialization_params, accept) filter_query_params.order = filter_query_params.order or "hid-asc" order_by = self.build_order_by(self.history_contents_manager, filter_query_params.order) contents = self.history_contents_manager.contents( history, filters=filters, limit=filter_query_params.limit, offset=filter_query_params.offset, order_by=order_by, serialization_params=serialization_params, ) items = [ self._serialize_content_item( trans, content, dataset_details=params.dataset_details, serialization_params=serialization_params, ) for content in contents ] if stats_requested: total_matches = self.history_contents_manager.contents_count( history, filters=filters, ) stats = HistoryContentStats.construct(total_matches=total_matches) return HistoryContentsWithStatsResult.construct(contents=items, stats=stats) return HistoryContentsResult.construct(__root__=items) def _handle_extra_serialization_for_media_type( self, serialization_params: SerializationParams, request_media_type: str, ) -> SerializationParams: """According to the requested media type the response may include extra information.""" if request_media_type == HistoryContentsWithStatsResult.__accept_type__: if not serialization_params.keys: serialization_params.keys = [] serialization_params.keys.append("elements_datatypes") return serialization_params def _serialize_legacy_content_item( self, trans, content, dataset_details: Optional[DatasetDetailsType] = None, ): encoded_content_id = content.id detailed = dataset_details and (dataset_details == "all" or (encoded_content_id in dataset_details)) if isinstance(content, HistoryDatasetAssociation): view = "detailed" if detailed else "summary" return self.hda_serializer.serialize_to_view(content, view=view, user=trans.user, trans=trans) elif isinstance(content, HistoryDatasetCollectionAssociation): view = "element" if detailed else "collection" return self.__collection_dict(trans, content, view=view) def _serialize_content_item( self, trans, content, dataset_details: Optional[DatasetDetailsType], serialization_params: SerializationParams, default_view: str = "summary", ): """ Returns a dictionary with the appropriate values depending on the serialization parameters provided. 
""" serialization_params_dict = serialization_params.dict() view = serialization_params_dict.pop("view", default_view) or default_view serializer: Optional[ModelSerializer] = None if isinstance(content, HistoryDatasetAssociation): serializer = self.hda_serializer if dataset_details and (dataset_details == "all" or content.id in dataset_details): view = "detailed" elif isinstance(content, HistoryDatasetCollectionAssociation): serializer = self.hdca_serializer if serializer is None: raise exceptions.UnknownContentsType(f"Unknown contents type: {content.content_type}") rval = serializer.serialize_to_view( content, user=trans.user, trans=trans, view=view, **serialization_params_dict ) # Override URL generation to use UrlBuilder if trans.url_builder: if rval.get("url"): rval["url"] = trans.url_builder( "history_content_typed", history_id=rval["history_id"], id=rval["id"], type=rval["history_content_type"], ) if rval.get("contents_url"): rval["contents_url"] = trans.url_builder( "contents_dataset_collection", hdca_id=rval["id"], parent_id=self.encode_id(content.collection_id) ) return rval def __collection_dict(self, trans, dataset_collection_instance, **kwds): return dictify_dataset_collection_instance( dataset_collection_instance, security=trans.security, url_builder=trans.url_builder, parent=dataset_collection_instance.history, **kwds, ) def _get_history(self, trans, history_id: DecodedDatabaseIdField) -> History: """Retrieves the History with the given ID or raises an error if the current user cannot access it.""" history = self.history_manager.get_accessible(history_id, trans.user, current_history=trans.history) return history def __show_dataset( self, trans, id: DecodedDatabaseIdField, serialization_params: SerializationParams, ): serialization_params.default_view = "detailed" hda = self.hda_manager.get_accessible(id, trans.user) return self.hda_serializer.serialize_to_view(hda, user=trans.user, trans=trans, **serialization_params.dict()) def __show_dataset_collection( self, trans, id: DecodedDatabaseIdField, serialization_params: SerializationParams, fuzzy_count: Optional[int] = None, ): dataset_collection_instance = self.__get_accessible_collection(trans, id) view = serialization_params.view or "element" return self.__collection_dict(trans, dataset_collection_instance, view=view, fuzzy_count=fuzzy_count) def __get_accessible_collection(self, trans, id: DecodedDatabaseIdField): return self.dataset_collection_manager.get_dataset_collection_instance( trans=trans, instance_type="history", id=id, ) def __create_datasets_from_library_folder( self, trans, history: History, payload: CreateHistoryContentPayloadFromCopy, serialization_params: SerializationParams, ): rval = [] source = payload.source if source == HistoryContentSource.library_folder: content = payload.content if content is None: raise exceptions.RequestParameterMissingException("'content' id of lda or hda is missing") folder_id = content folder = self.folder_manager.get(trans, folder_id) current_user_roles = trans.get_current_user_roles() security_agent: GalaxyRBACAgent = trans.app.security_agent def traverse(folder): admin = trans.user_is_admin rval = [] for subfolder in folder.active_folders: if not admin: can_access, folder_ids = security_agent.check_folder_contents( trans.user, current_user_roles, subfolder ) if (admin or can_access) and not subfolder.deleted: rval.extend(traverse(subfolder)) for ld in folder.datasets: if not admin: can_access = security_agent.can_access_dataset( current_user_roles, 
ld.library_dataset_dataset_association.dataset ) if (admin or can_access) and not ld.deleted: rval.append(ld) return rval for ld in traverse(folder): hda = ld.library_dataset_dataset_association.to_history_dataset_association( history, add_to_history=True ) hda_dict = self.hda_serializer.serialize_to_view( hda, user=trans.user, trans=trans, **serialization_params.dict() ) rval.append(hda_dict) else: message = f"Invalid 'source' parameter in request: {source}" raise exceptions.RequestParameterInvalidException(message) with transaction(trans.sa_session): trans.sa_session.commit() return rval def __create_dataset( self, trans, history: History, payload: CreateHistoryContentPayloadFromCopy, serialization_params: SerializationParams, ): source = payload.source if source not in (HistoryContentSource.library, HistoryContentSource.hda): raise exceptions.RequestParameterInvalidException(f"'source' must be either 'library' or 'hda': {source}") content = payload.content if content is None: raise exceptions.RequestParameterMissingException("'content' id of lda or hda is missing") hda = None if source == HistoryContentSource.library: hda = self.__create_hda_from_ldda(trans, history, content) elif source == HistoryContentSource.hda: hda = self.__create_hda_from_copy(trans, history, content) if hda is None: return None with transaction(trans.sa_session): trans.sa_session.commit() return self.hda_serializer.serialize_to_view(hda, user=trans.user, trans=trans, **serialization_params.dict()) def __create_hda_from_ldda(self, trans, history: History, ldda_id: int): decoded_ldda_id = ldda_id ld = self.ldda_manager.get(trans, decoded_ldda_id) if type(ld) is not LibraryDataset: raise exceptions.RequestParameterInvalidException("Library content id is not a dataset") hda = ld.library_dataset_dataset_association.to_history_dataset_association(history, add_to_history=True) return hda def __create_hda_from_copy(self, trans, history: History, original_hda_id: int): original = self.hda_manager.get_accessible(original_hda_id, trans.user) # check for access on history that contains the original hda as well self.history_manager.error_unless_accessible(original.history, trans.user, current_history=trans.history) hda = self.hda_manager.copy(original, history=history) return hda def __create_dataset_collection( self, trans, history: History, payload: CreateHistoryContentPayloadFromCollection, serialization_params: SerializationParams, ): """Create hdca in a history from the list of element identifiers :param history: history the new hdca should be added to :param source: whether to create a new collection or copy existing one :type source: str :param payload: dictionary structure containing: :param collection_type: type (and depth) of the new collection :type name: str :param element_identifiers: list of elements that should be in the new collection :param element: one member of the collection :param name: name of the element :type name: str :param src: source of the element (hda/ldda) :type src: str :param id: identifier :type id: str :param id: tags :type id: list :type element: dict :type name: list :param name: name of the collection :type name: str :param hide_source_items: whether to mark the original hdas as hidden :type name: bool :param copy_elements: whether to copy HDAs when creating collection :type name: bool :type payload: dict .. 
note:: Elements may be nested depending on the collection_type :returns: dataset collection information :rtype: dict :raises: RequestParameterInvalidException, RequestParameterMissingException """ source = payload.source or HistoryContentSource.new_collection dataset_collection_manager = self.dataset_collection_manager if source == HistoryContentSource.new_collection: create_params = api_payload_to_create_params(payload.dict()) dataset_collection_instance = dataset_collection_manager.create( trans, parent=history, history=history, **create_params ) elif source == HistoryContentSource.hdca: content = payload.content if content is None: raise exceptions.RequestParameterMissingException("'content' id of target to copy is missing") dbkey = payload.dbkey copy_required = dbkey is not None copy_elements = payload.copy_elements or copy_required if copy_required and not copy_elements: raise exceptions.RequestParameterMissingException( "copy_elements passed as 'false' but it is required to change specified attributes" ) dataset_instance_attributes = {} if dbkey is not None: dataset_instance_attributes["dbkey"] = dbkey dataset_collection_instance = dataset_collection_manager.copy( trans=trans, parent=history, source=source, encoded_source_id=content, copy_elements=copy_elements, dataset_instance_attributes=dataset_instance_attributes, ) else: message = f"Invalid 'source' parameter in request: {source}" raise exceptions.RequestParameterInvalidException(message) # if the consumer specified keys or view, use the secondary serializer if serialization_params.view or serialization_params.keys: return self.hdca_serializer.serialize_to_view( dataset_collection_instance, user=trans.user, trans=trans, **serialization_params.dict() ) return self.__collection_dict(trans, dataset_collection_instance, view="element") def _validate_bulk_operation_params( self, payload: HistoryContentBulkOperationPayload, user: User, trans: ProvidesHistoryContext ): if payload.operation == HistoryContentItemOperation.change_dbkey: dbkey = cast(ChangeDbkeyOperationParams, payload.params).dbkey if not self.genomes_manager.is_registered_dbkey(dbkey, user): raise exceptions.RequestParameterInvalidException(f"Database/build '{dbkey}' is not registered") if payload.operation == HistoryContentItemOperation.change_datatype: ensure_celery_tasks_enabled(trans.app.config) datatype = cast(ChangeDatatypeOperationParams, payload.params).datatype existing_datatype = trans.app.datatypes_registry.get_datatype_by_extension(datatype) if not existing_datatype and not datatype == "auto": raise exceptions.RequestParameterInvalidException(f"Data type '{datatype}' is not registered") def _apply_bulk_operation( self, contents: Iterable[HistoryItemModel], operation: HistoryContentItemOperation, params: Optional[AnyBulkOperationParams], trans: ProvidesHistoryContext, ) -> List[BulkOperationItemError]: errors: List[BulkOperationItemError] = [] for item in contents: error = self._apply_operation_to_item(operation, item, params, trans) if error: errors.append(error) return errors def _apply_operation_to_item( self, operation: HistoryContentItemOperation, item: HistoryItemModel, params: Optional[AnyBulkOperationParams], trans: ProvidesHistoryContext, ) -> Optional[BulkOperationItemError]: try: self.item_operator.apply(operation, item, params, trans) return None except BaseException as exc: return BulkOperationItemError( item=EncodedHistoryContentItem(id=item.id, history_content_type=item.history_content_type), error=str(exc), ) def _get_contents_by_item_list( 
self, trans, history: History, items: List[HistoryContentItem] ) -> List[HistoryItemModel]: contents: List[HistoryItemModel] = [] dataset_items = filter(lambda item: item.history_content_type == HistoryContentType.dataset, items) datasets_ids = map(lambda dataset: dataset.id, dataset_items) contents.extend(self.hda_manager.get_owned_ids(datasets_ids, history)) collection_items = filter( lambda item: item.history_content_type == HistoryContentType.dataset_collection, items ) collections = [ self.dataset_collection_manager.get_dataset_collection_instance( trans, instance_type="history", id=collection_item.id, check_ownership=True ) for collection_item in collection_items ] contents.extend(collections) return contents
[docs]class ItemOperation(Protocol): def __call__( self, item: HistoryItemModel, params: Optional[AnyBulkOperationParams], trans: ProvidesHistoryContext ) -> None: ...
[docs]class HistoryItemOperator: """Defines operations on history items."""
[docs] def __init__( self, hda_manager: hdas.HDAManager, hdca_manager: hdcas.HDCAManager, dataset_collection_manager: DatasetCollectionManager, ): self.hda_manager = hda_manager self.hdca_manager = hdca_manager self.dataset_collection_manager = dataset_collection_manager self.flush = False self._operation_map: Dict[HistoryContentItemOperation, ItemOperation] = { HistoryContentItemOperation.hide: lambda item, params, trans: self._hide(item), HistoryContentItemOperation.unhide: lambda item, params, trans: self._unhide(item), HistoryContentItemOperation.delete: lambda item, params, trans: self._delete(item, trans), HistoryContentItemOperation.undelete: lambda item, params, trans: self._undelete(item), HistoryContentItemOperation.purge: lambda item, params, trans: self._purge(item, trans), HistoryContentItemOperation.change_datatype: lambda item, params, trans: self._change_datatype( item, params, trans ), HistoryContentItemOperation.change_dbkey: lambda item, params, trans: self._change_dbkey(item, params), HistoryContentItemOperation.add_tags: lambda item, params, trans: self._add_tags(trans, item, params), HistoryContentItemOperation.remove_tags: lambda item, params, trans: self._remove_tags(trans, item, params), }
[docs] def apply( self, operation: HistoryContentItemOperation, item: HistoryItemModel, params: Optional[AnyBulkOperationParams], trans: ProvidesHistoryContext, ): self._operation_map[operation](item, params, trans)
def _get_item_manager(self, item: HistoryItemModel): if isinstance(item, HistoryDatasetAssociation): return self.hda_manager return self.hdca_manager def _hide(self, item: HistoryItemModel): item.visible = False def _unhide(self, item: HistoryItemModel): item.visible = True def _delete(self, item: HistoryItemModel, trans: ProvidesHistoryContext): if isinstance(item, HistoryDatasetCollectionAssociation): self.dataset_collection_manager.delete(trans, "history", item.id, recursive=True, purge=False) else: self.hda_manager.delete(item, flush=self.flush) # In the edge case where all selected items are already deleted we need to force an update # otherwise the history will wait indefinitely for the items to be deleted item.update() def _undelete(self, item: HistoryItemModel): if getattr(item, "purged", False): raise exceptions.ItemDeletionException("This item has been permanently deleted and cannot be recovered.") manager = self._get_item_manager(item) manager.undelete(item, flush=self.flush) def _purge(self, item: HistoryItemModel, trans: ProvidesHistoryContext): if getattr(item, "purged", False): # TODO: remove this `update` when we can properly track the operation results to notify the history item.update() return if isinstance(item, HistoryDatasetCollectionAssociation): return self.dataset_collection_manager.delete(trans, "history", item.id, recursive=True, purge=True) self.hda_manager.purge(item, flush=True) def _change_datatype( self, item: HistoryItemModel, params: ChangeDatatypeOperationParams, trans: ProvidesHistoryContext ): if isinstance(item, HistoryDatasetAssociation): wrapped_task = self._change_item_datatype(item, params, trans) with transaction(trans.sa_session): trans.sa_session.commit() if wrapped_task: wrapped_task.delay() elif isinstance(item, HistoryDatasetCollectionAssociation): wrapped_tasks = [] for dataset_instance in item.dataset_instances: wrapped_task = self._change_item_datatype(dataset_instance, params, trans) if wrapped_task: wrapped_tasks.append(wrapped_task) with transaction(trans.sa_session): trans.sa_session.commit() # chain these for sequential execution. chord would be nice, but requires a non-RPC backend. 
chain(*wrapped_tasks, touch.si(item_id=item.id, model_class="HistoryDatasetCollectionAssociation")).delay() def _change_item_datatype( self, item: HistoryDatasetAssociation, params: ChangeDatatypeOperationParams, trans: ProvidesHistoryContext ): self.hda_manager.ensure_can_change_datatype(item) self.hda_manager.ensure_can_set_metadata(item) is_deferred = item.has_deferred_data item.dataset.state = item.dataset.states.SETTING_METADATA if is_deferred: if params.datatype == "auto": # if `auto` just keep the original guessed datatype item.update() # TODO: remove this `update` when we can properly track the operation results to notify the history else: trans.app.datatypes_registry.change_datatype(item, params.datatype) item.dataset.state = item.dataset.states.DEFERRED else: return change_datatype.si(dataset_id=item.id, datatype=params.datatype) def _change_dbkey(self, item: HistoryItemModel, params: ChangeDbkeyOperationParams): if isinstance(item, HistoryDatasetAssociation): item.set_dbkey(params.dbkey) elif isinstance(item, HistoryDatasetCollectionAssociation): for dataset_instance in item.dataset_instances: dataset_instance.set_dbkey(params.dbkey) def _add_tags(self, trans: ProvidesUserContext, item: HistoryItemModel, params: TagOperationParams): trans.tag_handler.add_tags_from_list(trans.user, item, params.tags, flush=self.flush) def _remove_tags(self, trans: ProvidesUserContext, item: HistoryItemModel, params: TagOperationParams): trans.tag_handler.remove_tags_from_list(trans.user, item, params.tags, flush=self.flush)