Warning

This document is for an old release of Galaxy. You can alternatively view this page in the latest release if it exists or view the top of the latest release's documentation.

Source code for galaxy.webapps.galaxy.api.history_contents

"""
API operations on the contents of a history.
"""
import datetime
import json
import logging
import os
import re
from typing import (
    Any,
    Dict,
    List,
    Optional,
    Set,
    Union,
)

import dateutil.parser
from pydantic import Extra
from typing_extensions import Literal

from galaxy import (
    exceptions,
    util
)
from galaxy.managers import (
    folders,
    hdas,
    hdcas,
    histories,
    history_contents
)
from galaxy.managers.base import (
    ModelSerializer,
    ServiceBase,
)
from galaxy.managers.collections import DatasetCollectionManager
from galaxy.managers.collections_util import (
    api_payload_to_create_params,
    dictify_dataset_collection_instance,
)
from galaxy.managers.jobs import fetch_job_states, summarize_jobs_to_dict
from galaxy.managers.library_datasets import LibraryDatasetsManager
from galaxy.model import (
    History,
    HistoryDatasetAssociation,
    HistoryDatasetCollectionAssociation,
    LibraryDataset,
)
from galaxy.model.security import GalaxyRBACAgent
from galaxy.schema import FilterQueryParams
from galaxy.schema.fields import (
    EncodedDatabaseIdField,
    Field,
    OrderParamField,
)
from galaxy.schema.schema import (
    ColletionSourceType,
    DatasetAssociationRoles,
    DatasetPermissionAction,
    DeleteHDCAResult,
    HDABeta,
    HDADetailed,
    HDASummary,
    HDCABeta,
    HDCADetailed,
    HDCASummary,
    HistoryContentSource,
    HistoryContentType,
    ImplicitCollectionJobsStateSummary,
    JobSourceType,
    JobStateSummary,
    Model,
    UpdateDatasetPermissionsPayload,
    UpdateHistoryContentsBatchPayload,
    WorkflowInvocationStateSummary,
)
from galaxy.schema.types import SerializationParams
from galaxy.security.idencoding import IdEncodingHelper
from galaxy.util.json import safe_dumps
from galaxy.util.zipstream import ZipstreamWrapper
from galaxy.web import (
    expose_api,
    expose_api_anonymous,
    expose_api_raw,
    expose_api_raw_anonymous
)
from galaxy.webapps.base.controller import (
    UsesLibraryMixinItems,
    UsesTagsMixin
)
from galaxy.webapps.galaxy.api.common import parse_serialization_params
from . import BaseGalaxyAPIController, depends

log = logging.getLogger(__name__)

DatasetDetailsType = Union[Set[EncodedDatabaseIdField], Literal['all']]

AnyHDA = Union[HDASummary, HDADetailed, HDABeta]
AnyHistoryContentItem = Union[AnyHDA, HDCASummary, HDCADetailed, HDCABeta]

AnyJobStateSummary = Union[
    JobStateSummary,
    ImplicitCollectionJobsStateSummary,
    WorkflowInvocationStateSummary,
]


[docs]class HistoryContentsFilterQueryParams(FilterQueryParams): order: Optional[str] = OrderParamField(default_order="hid-asc")
HistoryContentFilter = List[Any] # Lists with [attribute:str, operator:str, value:Any] HistoryContentsFilterList = List[HistoryContentFilter]
[docs]class HistoryContentsIndexLegacyParamsBase(Model): deleted: Optional[bool] = Field( default=None, title="Deleted", description="Whether to return deleted or undeleted datasets only. Leave unset for both.", ) visible: Optional[bool] = Field( default=None, title="Visible", description="Whether to return visible or hidden datasets only. Leave unset for both.", )
[docs]class HistoryContentsIndexLegacyParams(HistoryContentsIndexLegacyParamsBase): v: Optional[str] = Field( # Should this be deprecated at some point and directly use the latest version by default? default=None, title="Version", description="Only `dev` value is allowed. Set it to use the latest version of this endpoint.", ) ids: Optional[str] = Field( default=None, title="IDs", description=( "A comma separated list of encoded `HDA` IDs. If this list is provided, only information about the " "specific datasets will be provided. Also, setting this value will set `dataset_details` to `all`." ), ) types: Optional[Union[List[HistoryContentType], str]] = Field( default=None, alias="type", # Legacy alias title="Types", description=( "A list or comma separated list of kinds of contents to return " "(currently just `dataset` and `dataset_collection` are available)." ), ) dataset_details: Optional[str] = Field( default=None, alias="details", # Legacy alias title="Dataset Details", description=( "A comma separated list of encoded dataset IDs that will return additional (full) details " "instead of the *summary* default information." ), )
[docs]class ParsedHistoryContentsLegacyParams(HistoryContentsIndexLegacyParamsBase): ids: Optional[List[int]] = Field( default=None, title="IDs", description="A list of (decoded) `HDA` IDs to return detailed information. Only these items will be returned.", ) types: List[HistoryContentType] = Field( default=[], title="Types", description="A list with the types of contents to return.", ) dataset_details: Optional[DatasetDetailsType] = Field( default=None, title="Dataset Details", description=( "A set of encoded IDs for the datasets that will be returned with detailed " "information or the value `all` to return (full) details for all datasets." ), )
[docs]class CreateHistoryContentPayloadBase(Model): type: Optional[HistoryContentType] = Field( HistoryContentType.dataset, title="Type", description="The type of content to be created in the history.", )
[docs]class CreateHistoryContentPayloadFromCopy(CreateHistoryContentPayloadBase): source: Optional[HistoryContentSource] = Field( None, title="Source", description="The source of the content. Can be other history element to be copied or library elements.", ) content: Optional[EncodedDatabaseIdField] = Field( None, title="Content", description=( "Depending on the `source` it can be:\n" "- The encoded id from the library dataset\n" "- The encoded id from the library folder\n" "- The encoded id from the HDA\n" "- The encoded id from the HDCA\n" ), )
[docs]class CollectionElementIdentifier(Model): name: Optional[str] = Field( None, title="Name", description="The name of the element.", ) src: ColletionSourceType = Field( ..., title="Source", description="The source of the element.", ) id: EncodedDatabaseIdField = Field( ..., title="ID", description="The encoded ID of the element.", ) tags: List[str] = Field( default=[], title="Tags", description="The list of tags associated with the element.", )
[docs]class CreateNewCollectionPayload(Model): collection_type: str = Field( ..., title="Collection Type", description="The type of the collection. For example, `list`, `paired`, `list:paired`.", ) element_identifiers: List[CollectionElementIdentifier] = Field( ..., title="Element Identifiers", description="List of elements that should be in the new collection.", ) name: Optional[str] = Field( default=None, title="Name", description="The name of the new collection.", ) hide_source_items: Optional[bool] = Field( default=False, title="Hide Source Items", description="Whether to mark the original HDAs as hidden.", )
[docs]class CreateHistoryContentPayloadFromCollection(CreateHistoryContentPayloadFromCopy): dbkey: Optional[str] = Field( default=None, title="DBKey", description="TODO", ) copy_elements: Optional[bool] = Field( default=False, title="Copy Elements", description=( "If the source is a collection, whether to copy child HDAs into the target " "history as well, defaults to False but this is less than ideal and may " "be changed in future releases." ), )
[docs]class CreateHistoryContentPayload(CreateHistoryContentPayloadFromCollection):
[docs] class Config: extra = Extra.allow
[docs]class HistoriesContentsService(ServiceBase): """Common interface/service logic for interactions with histories contents in the context of the API. Provides the logic of the actions invoked by API controllers and uses type definitions and pydantic models to declare its parameters and return types. """
[docs] def __init__( self, security: IdEncodingHelper, history_manager: histories.HistoryManager, history_contents_manager: history_contents.HistoryContentsManager, hda_manager: hdas.HDAManager, dataset_collection_manager: DatasetCollectionManager, ldda_manager: LibraryDatasetsManager, folder_manager: folders.FolderManager, hda_serializer: hdas.HDASerializer, hda_deserializer: hdas.HDADeserializer, hdca_serializer: hdcas.HDCASerializer, history_contents_filters: history_contents.HistoryContentsFilters, ): super().__init__(security) self.history_manager = history_manager self.history_contents_manager = history_contents_manager self.hda_manager = hda_manager self.dataset_collection_manager = dataset_collection_manager self.ldda_manager = ldda_manager self.folder_manager = folder_manager self.hda_serializer = hda_serializer self.hda_deserializer = hda_deserializer self.hdca_serializer = hdca_serializer self.history_contents_filters = history_contents_filters
[docs] def index( self, trans, history_id: EncodedDatabaseIdField, legacy_params: HistoryContentsIndexLegacyParams, serialization_params: SerializationParams, filter_query_params: HistoryContentsFilterQueryParams, ) -> List[AnyHistoryContentItem]: """ Return a list of contents (HDAs and HDCAs) for the history with the given ``id`` .. note:: Anonymous users are allowed to get their current history contents """ if legacy_params.v == 'dev': return self.__index_v2( trans, history_id, legacy_params, serialization_params, filter_query_params ) return self.__index_legacy(trans, history_id, legacy_params)
[docs] def show( self, trans, id: EncodedDatabaseIdField, serialization_params: SerializationParams, contents_type: HistoryContentType = HistoryContentType.dataset, fuzzy_count: Optional[int] = None, ) -> AnyHistoryContentItem: """ Return detailed information about an HDA or HDCA within a history .. note:: Anonymous users are allowed to get their current history contents :param id: the encoded id of the HDA or HDCA to return :param contents_type: 'dataset' or 'dataset_collection' :param serialization_params.view: if fetching a dataset collection - the view style of the dataset collection to produce. 'collection' returns no element information, 'element' returns detailed element information for all datasets, 'element-reference' returns a minimal set of information about datasets (for instance id, type, and state but not metadata, peek, info, or name). The default is 'element'. :param fuzzy_count: this value can be used to broadly restrict the magnitude of the number of elements returned via the API for large collections. The number of actual elements returned may be "a bit" more than this number or "a lot" less - varying on the depth of nesting, balance of nesting at each level, and size of target collection. The consumer of this API should not expect a stable number or pre-calculable number of elements to be produced given this parameter - the only promise is that this API will not respond with an order of magnitude more elements estimated with this value. The UI uses this parameter to fetch a "balanced" concept of the "start" of large collections at every depth of the collection. :returns: dictionary containing detailed HDA or HDCA information """ if contents_type == HistoryContentType.dataset: return self.__show_dataset(trans, id, serialization_params) elif contents_type == HistoryContentType.dataset_collection: return self.__show_dataset_collection(trans, id, serialization_params, fuzzy_count) raise exceptions.UnknownContentsType(f'Unknown contents type: {contents_type}')
[docs] def index_jobs_summary( self, trans, ids: List[EncodedDatabaseIdField], types: List[JobSourceType], ) -> List[AnyJobStateSummary]: """ Return job state summary info for jobs, implicit groups jobs for collections or workflow invocations Warning: We allow anyone to fetch job state information about any object they can guess an encoded ID for - it isn't considered protected data. This keeps polling IDs as part of state calculation for large histories and collections as efficient as possible. :param ids: the encoded ids of job summary objects to return - if ids is specified types must also be specified and have same length. :param types: type of object represented by elements in the ids array - any of Job, ImplicitCollectionJob, or WorkflowInvocation. :returns: an array of job summary object dictionaries. """ if len(ids) != len(types): raise exceptions.RequestParameterInvalidException( f"The number of ids ({len(ids)}) and types ({len(types)}) must match." ) decoded_ids = self.decode_ids(ids) return [ self.encode_all_ids(job_state) for job_state in fetch_job_states(trans.sa_session, decoded_ids, types) ]
[docs] def show_jobs_summary( self, trans, id: EncodedDatabaseIdField, contents_type: HistoryContentType = HistoryContentType.dataset, ) -> AnyJobStateSummary: """ Return detailed information about an HDA or HDCAs jobs Warning: We allow anyone to fetch job state information about any object they can guess an encoded ID for - it isn't considered protected data. This keeps polling IDs as part of state calculation for large histories and collections as efficient as possible. :param id: the encoded id of the HDA or HDCA to return :param contents_type: 'dataset' or 'dataset_collection' :returns: dictionary containing jobs summary object """ # At most one of job or implicit_collection_jobs should be found. job = None implicit_collection_jobs = None if contents_type == HistoryContentType.dataset: hda = self.hda_manager.get_accessible(self.decode_id(id), trans.user) job = hda.creating_job elif contents_type == HistoryContentType.dataset_collection: dataset_collection_instance = self.__get_accessible_collection(trans, id) job_source_type = dataset_collection_instance.job_source_type if job_source_type == JobSourceType.Job: job = dataset_collection_instance.job elif job_source_type == JobSourceType.ImplicitCollectionJobs: implicit_collection_jobs = dataset_collection_instance.implicit_collection_jobs assert job is None or implicit_collection_jobs is None return self.encode_all_ids(summarize_jobs_to_dict(trans.sa_session, job or implicit_collection_jobs))
[docs] def download_dataset_collection(self, trans, id: EncodedDatabaseIdField): """ Download the content of a HistoryDatasetCollection as a tgz archive while maintaining approximate collection structure. :param id: encoded HistoryDatasetCollectionAssociation (HDCA) id """ try: dataset_collection_instance = self.__get_accessible_collection(trans, id) return self.__stream_dataset_collection(trans, dataset_collection_instance) except Exception as e: log.exception("Error in API while creating dataset collection archive") trans.response.status = 500 return {'error': util.unicodify(e)}
def __stream_dataset_collection(self, trans, dataset_collection_instance): archive = hdcas.stream_dataset_collection(dataset_collection_instance=dataset_collection_instance, upstream_mod_zip=trans.app.config.upstream_mod_zip) trans.response.headers.update(archive.get_headers()) return archive.response()
[docs] def create( self, trans, history_id: EncodedDatabaseIdField, payload: CreateHistoryContentPayload, serialization_params: SerializationParams, ) -> AnyHistoryContentItem: """ Create a new HDA or HDCA. ..note: Currently, a user can only copy an HDA from a history that the user owns. """ history = self.history_manager.get_owned( self.decode_id(history_id), trans.user, current_history=trans.history ) type = payload.type if type == HistoryContentType.dataset: source = payload.source if source == HistoryContentSource.library_folder: return self.__create_datasets_from_library_folder(trans, history, payload, serialization_params) else: return self.__create_dataset(trans, history, payload, serialization_params) elif type == HistoryContentType.dataset_collection: return self.__create_dataset_collection(trans, history, payload, serialization_params) raise exceptions.UnknownContentsType(f'Unknown contents type: {payload.type}')
[docs] def update_permissions( self, trans, history_content_id: EncodedDatabaseIdField, payload: UpdateDatasetPermissionsPayload, ) -> DatasetAssociationRoles: """ Set permissions of the given dataset to the given role ids. :param payload: dictionary structure containing: :param action: (required) describes what action should be performed available actions: make_private, remove_restrictions, set_permissions :type action: string :param access_ids[]: list of Role.id defining roles that should have access permission on the dataset :type access_ids[]: string or list :param manage_ids[]: list of Role.id defining roles that should have manage permission on the dataset :type manage_ids[]: string or list :param modify_ids[]: list of Role.id defining roles that should have modify permission on the library dataset item :type modify_ids[]: string or list :raises: RequestParameterInvalidException, ObjectNotFound, InsufficientPermissionsException, InternalServerError RequestParameterMissingException """ payload_dict = payload.dict(by_alias=True) hda = self.hda_manager.get_owned(self.decode_id(history_content_id), trans.user, current_history=trans.history, trans=trans) assert hda is not None self.hda_manager.update_permissions(trans, hda, **payload_dict) roles = self.hda_manager.serialize_dataset_association_roles(trans, hda) return DatasetAssociationRoles.parse_obj(roles)
[docs] def update( self, trans, history_id: EncodedDatabaseIdField, id: EncodedDatabaseIdField, payload: Dict[str, Any], serialization_params: SerializationParams, contents_type: HistoryContentType = HistoryContentType.dataset, ): """ Updates the values for the history content item with the given ``id`` :param history_id: encoded id string of the items's History :param id: the encoded id of the history item to update :param payload: a dictionary containing any or all the fields for HDA or HDCA with values different than the defaults :returns: an error object if an error occurred or a dictionary containing any values that were different from the original and, therefore, updated """ if contents_type == HistoryContentType.dataset: return self.__update_dataset(trans, history_id, id, payload, serialization_params) elif contents_type == HistoryContentType.dataset_collection: return self.__update_dataset_collection(trans, id, payload) else: raise exceptions.UnknownContentsType(f'Unknown contents type: {contents_type}')
[docs] def update_batch( self, trans, history_id: EncodedDatabaseIdField, payload: UpdateHistoryContentsBatchPayload, serialization_params: SerializationParams, ): """ PUT /api/histories/{history_id}/contents :type history_id: str :param history_id: encoded id string of the history containing supplied items :type id: str :param id: the encoded id of the history to update :type payload: dict :param payload: a dictionary containing any or all the :rtype: dict :returns: an error object if an error occurred or a dictionary containing any values that were different from the original and, therefore, updated """ history = self.history_manager.get_owned( self.decode_id(history_id), trans.user, current_history=trans.history ) items = payload.items hda_ids = [] hdca_ids = [] for item in items: contents_type = item.history_content_type if contents_type == HistoryContentType.dataset: decoded_id = self.decode_id(item.id) hda_ids.append(decoded_id) else: hdca_ids.append(item.id) payload_dict = payload.dict(exclude_unset=True) hdas = self.__datasets_for_update(trans, history, hda_ids, payload_dict) rval = [] for hda in hdas: self.__deserialize_dataset(trans, hda, payload_dict) serialization_params["default_view"] = "summary" rval.append(self.hda_serializer.serialize_to_view( hda, user=trans.user, trans=trans, **serialization_params )) for hdca_id in hdca_ids: self.__update_dataset_collection(trans, hdca_id, payload.dict(exclude_defaults=True)) dataset_collection_instance = self.__get_accessible_collection(trans, hdca_id) rval.append(self.__collection_dict(trans, dataset_collection_instance, view="summary")) return rval
[docs] def validate( self, trans, history_id: EncodedDatabaseIdField, history_content_id: EncodedDatabaseIdField ): """ Updates the values for the history content item with the given ``id`` :type history_id: str :param history_id: encoded id string of the items's History :type id: str :param id: the encoded id of the history item to validate :rtype: dict :returns: TODO """ decoded_id = self.decode_id(history_content_id) history = self.history_manager.get_owned( self.decode_id(history_id), trans.user, current_history=trans.history ) hda = self.hda_manager.get_owned_ids([decoded_id], history=history)[0] if hda: self.hda_manager.set_metadata(trans, hda, overwrite=True, validate=True) return {}
[docs] def delete( self, trans, id, serialization_params: SerializationParams, contents_type: HistoryContentType = HistoryContentType.dataset, purge: bool = False, recursive: bool = False, ) -> Union[AnyHDA, DeleteHDCAResult]: """ Delete the history content with the given ``id`` and specified type (defaults to dataset) .. note:: Currently does not stop any active jobs for which this dataset is an output. :param id: the encoded id of the history item to delete :type recursive: bool :param recursive: if True, and deleted an HDCA also delete containing HDAs :type purge: bool :param purge: if True, purge the target HDA or child HDAs of the target HDCA :rtype: dict :returns: an error object if an error occurred or a dictionary containing: * id: the encoded id of the history, * deleted: if the history content was marked as deleted, * purged: if the history content was purged """ if contents_type == HistoryContentType.dataset: return self.__delete_dataset(trans, id, purge, serialization_params) elif contents_type == HistoryContentType.dataset_collection: self.dataset_collection_manager.delete(trans, "history", id, recursive=recursive, purge=purge) return DeleteHDCAResult(id=id, deleted=True) else: raise exceptions.UnknownContentsType(f'Unknown contents type: {contents_type}')
[docs] def archive( self, trans, history_id: EncodedDatabaseIdField, filter_query_params: HistoryContentsFilterQueryParams, filename: str = '', dry_run: bool = True, ): """ Build and return a compressed archive of the selected history contents :type filename: string :param filename: (optional) archive name (defaults to history name) :type dry_run: boolean :param dry_run: (optional) if True, return the archive and file paths only as json and not an archive file :returns: archive file for download or json in `dry run` mode """ # roughly from: http://stackoverflow.com/a/31976060 (windows, linux) invalid_filename_char_regex = re.compile(r'[:<>|\\\/\?\* "]') # path format string - dot separator between id and name id_name_format = '{}.{}' def name_to_filename(name, max_length=150, replace_with='_'): # TODO: seems like shortening unicode with [:] would cause unpredictable display strings return invalid_filename_char_regex.sub(replace_with, name)[0:max_length] # given a set of parents for a dataset (HDCAs, DC, DCEs, etc.) - build a directory structure that # (roughly) recreates the nesting in the contents using the parent names and ids def build_path_from_parents(parents): parent_names = [] for parent in parents: # an HDCA if hasattr(parent, 'hid'): name = name_to_filename(parent.name) parent_names.append(id_name_format.format(parent.hid, name)) # a DCE elif hasattr(parent, 'element_index'): name = name_to_filename(parent.element_identifier) parent_names.append(id_name_format.format(parent.element_index, name)) # NOTE: DCs are skipped and use the wrapping DCE info instead return parent_names # get the history used for the contents query and check for accessibility history = self.history_manager.get_accessible(trans.security.decode_id(history_id), trans.user) archive_base_name = filename or name_to_filename(history.name) # this is the fn applied to each dataset contained in the query paths_and_files = [] def build_archive_files_and_paths(content, *parents): archive_path = archive_base_name if not self.hda_manager.is_accessible(content, trans.user): # if the underlying dataset is not accessible, skip it silently return content_container_id = content.hid content_name = name_to_filename(content.name) if parents: if hasattr(parents[0], 'element_index'): # if content is directly wrapped in a DCE, strip it from parents (and the resulting path) # and instead replace the content id and name with the DCE index and identifier parent_dce, parents = parents[0], parents[1:] content_container_id = parent_dce.element_index content_name = name_to_filename(parent_dce.element_identifier) # reverse for path from parents: oldest parent first archive_path = os.path.join(archive_path, *build_path_from_parents(parents)[::-1]) # TODO: this is brute force - building the path each time instead of re-using it # possibly cache # add the name as the last element in the archive path content_id_and_name = id_name_format.format(content_container_id, content_name) archive_path = os.path.join(archive_path, content_id_and_name) # ---- for composite files, we use id and name for a directory and, inside that, ... if self.hda_manager.is_composite(content): # ...save the 'main' composite file (gen. html) paths_and_files.append((content.file_name, os.path.join(archive_path, f"{content.name}.html"))) for extra_file in self.hda_manager.extra_files(content): extra_file_basename = os.path.basename(extra_file) archive_extra_file_path = os.path.join(archive_path, extra_file_basename) # ...and one for each file in the composite paths_and_files.append((extra_file, archive_extra_file_path)) # ---- for single files, we add the true extension to id and name and store that single filename else: # some dataset names can contain their original file extensions, don't repeat if not archive_path.endswith(f".{content.extension}"): archive_path += f".{content.extension}" paths_and_files.append((content.file_name, archive_path)) # filter the contents that contain datasets using any filters possible from index above and map the datasets filters = self.history_contents_filters.parse_query_filters(filter_query_params) self.history_contents_manager.map_datasets(history, build_archive_files_and_paths, filters=filters) # if dry_run, return the structure as json for debugging if dry_run: trans.response.headers['Content-Type'] = 'application/json' return safe_dumps(paths_and_files) # create the archive, add the dataset files, then stream the archive as a download archive = ZipstreamWrapper( archive_name=archive_base_name, upstream_mod_zip=trans.app.config.upstream_mod_zip, upstream_gzip=trans.app.config.upstream_gzip, ) for file_path, archive_path in paths_and_files: archive.write(file_path, archive_path) trans.response.headers.update(archive.get_headers()) return archive.response()
[docs] def contents_near( self, trans, history_id: EncodedDatabaseIdField, serialization_params: SerializationParams, filter_params: HistoryContentsFilterList, hid: int, limit: int, since: Optional[datetime.datetime] = None, ): """ This endpoint provides random access to a large history without having to know exactly how many pages are in the final query. Pick a target HID and filters, and the endpoint will get LIMIT counts above and below that target regardless of how many gaps may exist in the HID due to filtering. It does 2 queries, one up and one down from the target hid with a result size of limit. Additional counts for total matches of both seeks provided in the http headers. I've also abandoned the wierd q/qv syntax. """ history: History = self.history_manager.get_accessible( self.decode_id(history_id), trans.user, current_history=trans.history ) # while polling, check to see if the history has changed # if it hasn't then we can short-circuit the poll request if since: # sqlalchemy DateTime columns are not timezone aware, but `since` may be a timezone aware # DateTime object if a timezone offset is provided (https://github.com/samuelcolvin/pydantic/blob/5ccbdcb5904f35834300b01432a665c75dc02296/pydantic/datetime_parse.py#L179). # If a timezone is provided (since.tzinfo is not None) we convert to UTC and remove tzinfo so that comparison with history.update_time is correct. since = since if since.tzinfo is None else since.astimezone(datetime.timezone.utc).replace(tzinfo=None) if history.update_time <= since: trans.response.status = 204 return # parse content params view = serialization_params.pop('view') # SEEK UP, contents > hid up_params = filter_params + self._hid_greater_than(hid) up_order = 'hid-asc' contents_up, up_count = self._seek(history, up_params, up_order, limit, serialization_params) # SEEK DOWN, contents <= hid down_params = filter_params + self._hid_less_or_equal_than(hid) down_order = 'hid-dsc' contents_down, down_count = self._seek(history, down_params, down_order, limit, serialization_params) # min/max hid values min_hid, max_hid = self._get_filtered_extrema(history, filter_params) # results up = self._expand_contents(trans, contents_up, serialization_params, view) up.reverse() down = self._expand_contents(trans, contents_down, serialization_params, view) contents = up + down # Put stats in http headers trans.response.headers['matches_up'] = len(contents_up) trans.response.headers['matches_down'] = len(contents_down) trans.response.headers['total_matches_up'] = up_count trans.response.headers['total_matches_down'] = down_count trans.response.headers['max_hid'] = max_hid trans.response.headers['min_hid'] = min_hid trans.response.headers['history_size'] = str(history.disk_size) trans.response.headers['history_empty'] = json.dumps(history.empty) # convert to proper bool return json.dumps(contents)
def _hid_greater_than(self, hid: int) -> HistoryContentsFilterList: return [["hid", "gt", hid]] def _hid_less_or_equal_than(self, hid: int) -> HistoryContentsFilterList: return [["hid", "le", hid]] # Perform content query and matching count def _seek(self, history, filter_params, order_by_string, limit, serialization_params): filters = self.history_contents_filters.parse_filters(filter_params) order_by = self.build_order_by(self.history_contents_manager, order_by_string) # actual contents contents = self.history_contents_manager.contents(history, filters=filters, limit=limit, offset=0, order_by=order_by, serialization_params=serialization_params) # count of same query count_filter_params = [f for f in filter_params if f[0] != 'update_time'] count_filters = self.history_contents_filters.parse_filters(count_filter_params) contents_count = self.history_contents_manager.contents_count(history, count_filters) return contents, contents_count # Adds subquery details to initial contents results, perhaps better realized # as a proc or view. def _expand_contents(self, trans, contents, serialization_params, view): rval = [] for content in contents: if isinstance(content, HistoryDatasetAssociation): dataset = self.hda_serializer.serialize_to_view(content, user=trans.user, trans=trans, view=view, **serialization_params) rval.append(dataset) elif isinstance(content, HistoryDatasetCollectionAssociation): collection = self.hdca_serializer.serialize_to_view(content, user=trans.user, trans=trans, view=view, **serialization_params) rval.append(collection) return rval def _get_filtered_extrema(self, history, filter_params): extrema_params = parse_serialization_params(keys='hid', default_view='summary') extrema_filter_params = [f for f in filter_params if f[0] != 'update_time'] extrema_filters = self.history_contents_filters.parse_filters(extrema_filter_params) order_by_dsc = self.build_order_by(self.history_contents_manager, 'hid-dsc') order_by_asc = self.build_order_by(self.history_contents_manager, 'hid-asc') max_row_result = self.history_contents_manager.contents(history, limit=1, filters=extrema_filters, order_by=order_by_dsc, serialization_params=extrema_params) max_row = max_row_result.pop() if len(max_row_result) else None min_row_result = self.history_contents_manager.contents(history, limit=1, filters=extrema_filters, order_by=order_by_asc, serialization_params=extrema_params) min_row = min_row_result.pop() if len(min_row_result) else None max_hid = max_row.hid if max_row else None min_hid = min_row.hid if min_row else None return min_hid, max_hid def __delete_dataset( self, trans, id: EncodedDatabaseIdField, purge: bool, serialization_params: SerializationParams ): hda = self.hda_manager.get_owned(self.decode_id(id), trans.user, current_history=trans.history) self.hda_manager.error_if_uploading(hda) if purge: self.hda_manager.purge(hda) else: self.hda_manager.delete(hda) serialization_params["default_view"] = 'detailed' return self.hda_serializer.serialize_to_view( hda, user=trans.user, trans=trans, **serialization_params ) def __update_dataset_collection(self, trans, id: EncodedDatabaseIdField, payload: Dict[str, Any]): return self.dataset_collection_manager.update(trans, "history", id, payload) def __update_dataset( self, trans, history_id: EncodedDatabaseIdField, id: EncodedDatabaseIdField, payload: Dict[str, Any], serialization_params: SerializationParams ): # anon user: ensure that history ids match up and the history is the current, # check for uploading, and use only the subset of attribute keys manipulatable by anon users decoded_id = self.decode_id(id) history = self.history_manager.get_owned( self.decode_id(history_id), trans.user, current_history=trans.history ) hda = self.__datasets_for_update(trans, history, [decoded_id], payload)[0] if hda: self.__deserialize_dataset(trans, hda, payload) serialization_params["default_view"] = 'detailed' return self.hda_serializer.serialize_to_view( hda, user=trans.user, trans=trans, **serialization_params ) return {} def __datasets_for_update( self, trans, history: History, hda_ids: List[int], payload: Dict[str, Any] ): anonymous_user = not trans.user_is_admin and trans.user is None if anonymous_user: anon_allowed_payload = {} if 'deleted' in payload: anon_allowed_payload['deleted'] = payload['deleted'] if 'visible' in payload: anon_allowed_payload['visible'] = payload['visible'] payload = anon_allowed_payload hdas = self.hda_manager.get_owned_ids(hda_ids, history=history) # only check_state if not deleting, otherwise cannot delete uploading files check_state = not payload.get('deleted', False) if check_state: for hda in hdas: hda = self.hda_manager.error_if_uploading(hda) return hdas def __deserialize_dataset(self, trans, hda, payload: Dict[str, Any]): self.hda_deserializer.deserialize(hda, payload, user=trans.user, trans=trans) # TODO: this should be an effect of deleting the hda if payload.get('deleted', False): self.hda_manager.stop_creating_job(hda) def __index_legacy( self, trans, history_id: EncodedDatabaseIdField, legacy_params: HistoryContentsIndexLegacyParams, ) -> List[AnyHistoryContentItem]: """Legacy implementation of the `index` action.""" history = self._get_history(trans, history_id) parsed_legacy_params = self._parse_legacy_contents_params(legacy_params) contents = history.contents_iter(**parsed_legacy_params) return [ self._serialize_legacy_content_item(trans, content, parsed_legacy_params.get("dataset_details")) for content in contents ] def __index_v2( self, trans, history_id: EncodedDatabaseIdField, legacy_params: HistoryContentsIndexLegacyParams, serialization_params: SerializationParams, filter_query_params: HistoryContentsFilterQueryParams, ) -> List[AnyHistoryContentItem]: """ Latests implementation of the `index` action. Allows additional filtering of contents and custom serialization. """ history = self._get_history(trans, history_id) filters = self.history_contents_filters.parse_query_filters(filter_query_params) order_by = self.build_order_by(self.history_contents_manager, filter_query_params.order) # TODO: > 16.04: remove these # TODO: remove 'dataset_details' and the following section when the UI doesn't need it parsed_legacy_params = self._parse_legacy_contents_params(legacy_params) contents = self.history_contents_manager.contents( history, filters=filters, limit=filter_query_params.limit, offset=filter_query_params.offset, order_by=order_by, serialization_params=serialization_params ) return [ self._serialize_content_item( trans, content, dataset_details=parsed_legacy_params.get("dataset_details"), serialization_params=serialization_params, ) for content in contents ] def _serialize_legacy_content_item( self, trans, content, dataset_details: Optional[DatasetDetailsType] = None, ): encoded_content_id = self.encode_id(content.id) detailed = dataset_details and (dataset_details == 'all' or (encoded_content_id in dataset_details)) if isinstance(content, HistoryDatasetAssociation): view = 'detailed' if detailed else 'summary' return self.hda_serializer.serialize_to_view(content, view=view, user=trans.user, trans=trans) elif isinstance(content, HistoryDatasetCollectionAssociation): view = 'element' if detailed else 'collection' return self.__collection_dict(trans, content, view=view) def _serialize_content_item( self, trans, content, dataset_details: Optional[DatasetDetailsType], serialization_params: SerializationParams, default_view: str = "summary", ): """ Returns a dictionary with the appropriate values depending on the serialization parameters provided. """ view = serialization_params.pop("view", default_view) or default_view serializer: Optional[ModelSerializer] = None if isinstance(content, HistoryDatasetAssociation): serializer = self.hda_serializer if dataset_details and (dataset_details == 'all' or self.encode_id(content.id) in dataset_details): view = "detailed" elif isinstance(content, HistoryDatasetCollectionAssociation): serializer = self.hdca_serializer if serializer is None: raise exceptions.UnknownContentsType(f'Unknown contents type: {content.content_type}') return serializer.serialize_to_view( content, user=trans.user, trans=trans, view=view, **serialization_params ) def _parse_legacy_contents_params(self, params: HistoryContentsIndexLegacyParams): if params.types: types = util.listify(params.types) else: types = [e.value for e in HistoryContentType] details: Any = params.dataset_details ids = None if params.ids: ids = [self.decode_id(EncodedDatabaseIdField(id)) for id in params.ids.split(',')] # If explicit ids given, always used detailed result. details = 'all' else: if details and details != 'all': details = set(util.listify(details)) return ParsedHistoryContentsLegacyParams( types=types, ids=ids, deleted=params.deleted, visible=params.visible, dataset_details=details, ).dict(exclude_defaults=True) def __collection_dict(self, trans, dataset_collection_instance, **kwds): return dictify_dataset_collection_instance(dataset_collection_instance, security=trans.security, parent=dataset_collection_instance.history, **kwds) def _get_history(self, trans, history_id: EncodedDatabaseIdField) -> History: """Retrieves the History with the given ID or raises an error if the current user cannot access it.""" history = self.history_manager.get_accessible(self.decode_id(history_id), trans.user, current_history=trans.history) return history def __show_dataset( self, trans, id: EncodedDatabaseIdField, serialization_params: SerializationParams, ): serialization_params["default_view"] = "detailed" hda = self.hda_manager.get_accessible(self.decode_id(id), trans.user) return self.hda_serializer.serialize_to_view( hda, user=trans.user, trans=trans, **serialization_params ) def __show_dataset_collection( self, trans, id: EncodedDatabaseIdField, serialization_params: SerializationParams, fuzzy_count: Optional[int] = None, ): dataset_collection_instance = self.__get_accessible_collection(trans, id) view = serialization_params.get("view") or "element" return self.__collection_dict(trans, dataset_collection_instance, view=view, fuzzy_count=fuzzy_count) def __get_accessible_collection(self, trans, id: EncodedDatabaseIdField): return self.dataset_collection_manager.get_dataset_collection_instance( trans=trans, instance_type="history", id=id, ) def __create_datasets_from_library_folder( self, trans, history: History, payload: CreateHistoryContentPayloadFromCopy, serialization_params: SerializationParams, ): rval = [] source = payload.source if source == HistoryContentSource.library_folder: content = payload.content if content is None: raise exceptions.RequestParameterMissingException("'content' id of lda or hda is missing") folder_id = self.folder_manager.cut_and_decode(trans, content) folder = self.folder_manager.get(trans, folder_id) current_user_roles = trans.get_current_user_roles() security_agent: GalaxyRBACAgent = trans.app.security_agent def traverse(folder): admin = trans.user_is_admin rval = [] for subfolder in folder.active_folders: if not admin: can_access, folder_ids = security_agent.check_folder_contents(trans.user, current_user_roles, subfolder) if (admin or can_access) and not subfolder.deleted: rval.extend(traverse(subfolder)) for ld in folder.datasets: if not admin: can_access = security_agent.can_access_dataset( current_user_roles, ld.library_dataset_dataset_association.dataset ) if (admin or can_access) and not ld.deleted: rval.append(ld) return rval for ld in traverse(folder): hda = ld.library_dataset_dataset_association.to_history_dataset_association(history, add_to_history=True) hda_dict = self.hda_serializer.serialize_to_view( hda, user=trans.user, trans=trans, default_view='detailed', **serialization_params ) rval.append(hda_dict) else: message = f"Invalid 'source' parameter in request: {source}" raise exceptions.RequestParameterInvalidException(message) trans.sa_session.flush() return rval def __create_dataset( self, trans, history: History, payload: CreateHistoryContentPayloadFromCopy, serialization_params: SerializationParams, ): source = payload.source if source not in (HistoryContentSource.library, HistoryContentSource.hda): raise exceptions.RequestParameterInvalidException( f"'source' must be either 'library' or 'hda': {source}") content = payload.content if content is None: raise exceptions.RequestParameterMissingException("'content' id of lda or hda is missing") hda = None if source == HistoryContentSource.library: hda = self.__create_hda_from_ldda(trans, history, content) elif source == HistoryContentSource.hda: hda = self.__create_hda_from_copy(trans, history, content) if hda is None: return None trans.sa_session.flush() serialization_params["default_view"] = 'detailed' return self.hda_serializer.serialize_to_view( hda, user=trans.user, trans=trans, **serialization_params ) def __create_hda_from_ldda(self, trans, history: History, ldda_id: EncodedDatabaseIdField): decoded_ldda_id = self.decode_id(ldda_id) ld = self.ldda_manager.get(trans, decoded_ldda_id) if type(ld) is not LibraryDataset: raise exceptions.RequestParameterInvalidException( f"Library content id ( {ldda_id} ) is not a dataset") hda = ld.library_dataset_dataset_association.to_history_dataset_association(history, add_to_history=True) return hda def __create_hda_from_copy(self, trans, history: History, original_hda_id: EncodedDatabaseIdField): unencoded_hda_id = self.decode_id(original_hda_id) original = self.hda_manager.get_accessible(unencoded_hda_id, trans.user) # check for access on history that contains the original hda as well self.history_manager.error_unless_accessible(original.history, trans.user, current_history=trans.history) hda = self.hda_manager.copy(original, history=history) return hda def __create_dataset_collection( self, trans, history: History, payload: CreateHistoryContentPayloadFromCollection, serialization_params: SerializationParams, ): """Create hdca in a history from the list of element identifiers :param history: history the new hdca should be added to :param source: whether to create a new collection or copy existing one :type source: str :param payload: dictionary structure containing: :param collection_type: type (and depth) of the new collection :type name: str :param element_identifiers: list of elements that should be in the new collection :param element: one member of the collection :param name: name of the element :type name: str :param src: source of the element (hda/ldda) :type src: str :param id: identifier :type id: str :param id: tags :type id: list :type element: dict :type name: list :param name: name of the collection :type name: str :param hide_source_items: whether to mark the original hdas as hidden :type name: bool :param copy_elements: whether to copy HDAs when creating collection :type name: bool :type payload: dict .. note:: Elements may be nested depending on the collection_type :returns: dataset collection information :rtype: dict :raises: RequestParameterInvalidException, RequestParameterMissingException """ source = payload.source or HistoryContentSource.new_collection dataset_collection_manager = self.dataset_collection_manager if source == HistoryContentSource.new_collection: create_params = api_payload_to_create_params(payload.dict()) dataset_collection_instance = dataset_collection_manager.create( trans, parent=history, history=history, **create_params ) elif source == HistoryContentSource.hdca: content = payload.content if content is None: raise exceptions.RequestParameterMissingException("'content' id of target to copy is missing") dbkey = payload.dbkey copy_required = dbkey is not None copy_elements = payload.copy_elements or copy_required if copy_required and not copy_elements: raise exceptions.RequestParameterMissingException("copy_elements passed as 'false' but it is required to change specified attributes") dataset_instance_attributes = {} if dbkey is not None: dataset_instance_attributes["dbkey"] = dbkey dataset_collection_instance = dataset_collection_manager.copy( trans=trans, parent=history, source=source, encoded_source_id=content, copy_elements=copy_elements, dataset_instance_attributes=dataset_instance_attributes, ) else: message = f"Invalid 'source' parameter in request: {source}" raise exceptions.RequestParameterInvalidException(message) # if the consumer specified keys or view, use the secondary serializer if serialization_params.get('view') or serialization_params.get('keys'): serialization_params["default_view"] = 'detailed' return self.hdca_serializer.serialize_to_view( dataset_collection_instance, user=trans.user, trans=trans, **serialization_params ) return self.__collection_dict(trans, dataset_collection_instance, view="element")
[docs]class HistoryContentsController(BaseGalaxyAPIController, UsesLibraryMixinItems, UsesTagsMixin): hda_manager: hdas.HDAManager = depends(hdas.HDAManager) history_manager: histories.HistoryManager = depends(histories.HistoryManager) history_contents_manager: history_contents.HistoryContentsManager = depends(history_contents.HistoryContentsManager) hda_serializer: hdas.HDASerializer = depends(hdas.HDASerializer) hdca_serializer: hdcas.HDCASerializer = depends(hdcas.HDCASerializer) history_contents_filters: history_contents.HistoryContentsFilters = depends(history_contents.HistoryContentsFilters) service: HistoriesContentsService = depends(HistoriesContentsService)
[docs] @expose_api_anonymous def index(self, trans, history_id, **kwd): """ GET /api/histories/{history_id}/contents return a list of HDA data for the history with the given ``id`` .. note:: Anonymous users are allowed to get their current history contents If Ids is not given, index returns a list of *summary* objects for every HDA associated with the given `history_id`. If ids is given, index returns a *more complete* json object for each HDA in the ids list. :type history_id: str :param history_id: encoded id string of the HDA's History :type ids: str :param ids: (optional) a comma separated list of encoded `HDA` ids :param types: (optional) kinds of contents to index (currently just dataset, but dataset_collection will be added shortly). :type types: str :rtype: list :returns: dictionaries containing summary or detailed HDA information """ legacy_params = HistoryContentsIndexLegacyParams(**kwd) serialization_params = parse_serialization_params(**kwd) filter_parameters = HistoryContentsFilterQueryParams(**kwd) return self.service.index( trans, history_id, legacy_params, serialization_params, filter_parameters )
[docs] @expose_api_anonymous def show(self, trans, id, history_id, **kwd): """ GET /api/histories/{history_id}/contents/{id} GET /api/histories/{history_id}/contents/{type}/{id} return detailed information about an HDA or HDCA within a history .. note:: Anonymous users are allowed to get their current history contents :type id: str :param id: the encoded id of the HDA or HDCA to return :type type: str :param id: 'dataset' or 'dataset_collection' :type history_id: str :param history_id: encoded id string of the HDA's or HDCA's History :type view: str :param view: if fetching a dataset collection - the view style of the dataset collection to produce. 'collection' returns no element information, 'element' returns detailed element information for all datasets, 'element-reference' returns a minimal set of information about datasets (for instance id, type, and state but not metadata, peek, info, or name). The default is 'element'. :type fuzzy_count: int :param fuzzy_count: this value can be used to broadly restrict the magnitude of the number of elements returned via the API for large collections. The number of actual elements returned may be "a bit" more than this number or "a lot" less - varying on the depth of nesting, balance of nesting at each level, and size of target collection. The consumer of this API should not expect a stable number or pre-calculable number of elements to be produced given this parameter - the only promise is that this API will not respond with an order of magnitude more elements estimated with this value. The UI uses this parameter to fetch a "balanced" concept of the "start" of large collections at every depth of the collection. :rtype: dict :returns: dictionary containing detailed HDA or HDCA information """ serialization_params = parse_serialization_params(**kwd) contents_type = self.__get_contents_type(kwd) fuzzy_count = kwd.get("fuzzy_count", None) if fuzzy_count: fuzzy_count = int(fuzzy_count) return self.service.show(trans, id, serialization_params, contents_type, fuzzy_count)
[docs] @expose_api_anonymous def index_jobs_summary(self, trans, history_id, **kwd): """ * GET /api/histories/{history_id}/jobs_summary return job state summary info for jobs, implicit groups jobs for collections or workflow invocations Warning: We allow anyone to fetch job state information about any object they can guess an encoded ID for - it isn't considered protected data. This keeps polling IDs as part of state calculation for large histories and collections as efficient as possible. :type history_id: str :param history_id: encoded id string of the target history :type ids: str[] :param ids: the encoded ids of job summary objects to return - if ids is specified types must also be specified and have same length. :type types: str[] :param types: type of object represented by elements in the ids array - any of Job, ImplicitCollectionJob, or WorkflowInvocation. :rtype: dict[] :returns: an array of job summary object dictionaries. """ ids = util.listify(kwd.get("ids", None)) types = util.listify(kwd.get("types", None)) return self.service.index_jobs_summary(trans, ids, types)
[docs] @expose_api_anonymous def show_jobs_summary(self, trans, id, history_id, **kwd): """ * GET /api/histories/{history_id}/contents/{type}/{id}/jobs_summary return detailed information about an HDA or HDCAs jobs Warning: We allow anyone to fetch job state information about any object they can guess an encoded ID for - it isn't considered protected data. This keeps polling IDs as part of state calculation for large histories and collections as efficient as possible. :type id: str :param id: the encoded id of the HDA to return :type history_id: str :param history_id: encoded id string of the HDA's or the HDCA's History :rtype: dict :returns: dictionary containing jobs summary object """ contents_type = self.__get_contents_type(kwd) return self.service.show_jobs_summary(trans, id, contents_type)
def __get_contents_type(self, kwd: dict) -> HistoryContentType: contents_type = kwd.get('type', 'dataset') return HistoryContentType(contents_type)
[docs] @expose_api_raw_anonymous def download_dataset_collection(self, trans, id, history_id=None, **kwd): """ GET /api/histories/{history_id}/contents/dataset_collections/{id}/download GET /api/dataset_collection/{id}/download Download the content of a HistoryDatasetCollection as a tgz archive while maintaining approximate collection structure. :param id: encoded HistoryDatasetCollectionAssociation (HDCA) id :param history_id: encoded id string of the HDCA's History """ return self.service.download_dataset_collection(trans, id)
[docs] @expose_api_anonymous def create(self, trans, history_id, payload, **kwd): """ POST /api/histories/{history_id}/contents/{type}s POST /api/histories/{history_id}/contents create a new HDA or HDCA :type history_id: str :param history_id: encoded id string of the new HDA's History :type type: str :param type: Type of history content - 'dataset' (default) or 'dataset_collection'. This can be passed in via payload or parsed from the route. :type payload: dict :param payload: dictionary structure containing: copy from library (for type 'dataset'): 'source' = 'library' 'content' = [the encoded id from the library dataset] copy from library folder 'source' = 'library_folder' 'content' = [the encoded id from the library folder] copy from history dataset (for type 'dataset'): 'source' = 'hda' 'content' = [the encoded id from the HDA] copy from history dataset collection (for type 'dataset_collection') 'source' = 'hdca' 'content' = [the encoded id from the HDCA] 'copy_elements' Copy child HDAs into the target history as well, defaults to False but this is less than ideal and may be changed in future releases. create new history dataset collection (for type 'dataset_collection') 'source' 'new_collection' (default 'source' if type is 'dataset_collection' - no need to specify this) 'collection_type' For example, "list", "paired", "list:paired". 'copy_elements' Copy child HDAs when creating new collection, defaults to False in the API but is set to True in the UI, so that we can modify HDAs with tags when creating collections. 'name' Name of new dataset collection. 'element_identifiers' Recursive list structure defining collection. Each element must have 'src' which can be 'hda', 'ldda', 'hdca', or 'new_collection', as well as a 'name' which is the name of element (e.g. "forward" or "reverse" for paired datasets, or arbitrary sample names for instance for lists). For all src's except 'new_collection' - a encoded 'id' attribute must be included wiht element as well. 'new_collection' sources must defined a 'collection_type' and their own list of (potentially) nested 'element_identifiers'. ..note: Currently, a user can only copy an HDA from a history that the user owns. :rtype: dict :returns: dictionary containing detailed information for the new HDA """ serialization_params = parse_serialization_params(**kwd) create_payload = CreateHistoryContentPayload(**payload) return self.service.create(trans, history_id, create_payload, serialization_params)
[docs] @expose_api def show_roles(self, trans, encoded_dataset_id, **kwd): """ Display information about current or available roles for a given dataset permission. * GET /api/histories/{history_id}/contents/datasets/{encoded_dataset_id}/permissions :param encoded_dataset_id: the encoded id of the dataset to query :type encoded_dataset_id: an encoded id string :returns: either dict of current roles for all permission types or dict of available roles to choose from (is the same for any permission type) :rtype: dictionary :raises: InsufficientPermissionsException """ raise NotImplementedError() # TODO: This endpoint doesn't seem to be in use, remove?
# hda = self.hda_manager.get_owned(self.decode_id(encoded_dataset_id), trans.user, current_history=trans.history, trans=trans) # return self.hda_manager.serialize_dataset_association_roles(trans, hda)
[docs] @expose_api def update_permissions(self, trans, history_id, history_content_id, payload: Dict[str, Any] = None, **kwd): """ Set permissions of the given library dataset to the given role ids. PUT /api/histories/{history_id}/contents/datasets/{encoded_dataset_id}/permissions :param encoded_dataset_id: the encoded id of the dataset to update permissions of :type encoded_dataset_id: an encoded id string :param payload: dictionary structure containing: :param action: (required) describes what action should be performed available actions: make_private, remove_restrictions, set_permissions :type action: string :param access_ids[]: list of Role.id defining roles that should have access permission on the dataset :type access_ids[]: string or list :param manage_ids[]: list of Role.id defining roles that should have manage permission on the dataset :type manage_ids[]: string or list :param modify_ids[]: list of Role.id defining roles that should have modify permission on the library dataset item :type modify_ids[]: string or list :type: dictionary :returns: dict of current roles for all available permission types :rtype: dictionary :raises: RequestParameterInvalidException, ObjectNotFound, InsufficientPermissionsException, InternalServerError RequestParameterMissingException """ if payload: kwd.update(payload) update_payload = self._get_update_permission_payload(kwd) return self.service.update_permissions(trans, history_content_id, update_payload)
def _get_update_permission_payload(self, payload: Dict[str, Any]) -> UpdateDatasetPermissionsPayload: """Coverts the payload dictionary into a UpdateDatasetPermissionsPayload model with custom parsing. This is an attempt on supporting multiple aliases for the permissions params.""" # There are several allowed names for the same role list parameter, i.e.: `access`, `access_ids`, `access_ids[]` # The `access_ids[]` name is not pydantic friendly, so this will be modelled as an alias but we can only set one alias # TODO: Maybe we should choose only one way and deprecate the others? payload["access_ids[]"] = payload.get("access_ids[]") or payload.get("access") payload["manage_ids[]"] = payload.get("manage_ids[]") or payload.get("manage") payload["modify_ids[]"] = payload.get("modify_ids[]") or payload.get("modify") # The action is required, so the default will be used if none is specified payload["action"] = payload.get("action", DatasetPermissionAction.set_permissions) update_payload = UpdateDatasetPermissionsPayload(**payload) return update_payload
[docs] @expose_api_anonymous def update_batch(self, trans, history_id, payload, **kwd): """ PUT /api/histories/{history_id}/contents :type history_id: str :param history_id: encoded id string of the history containing supplied items :type id: str :param id: the encoded id of the history to update :type payload: dict :param payload: a dictionary containing any or all the :rtype: dict :returns: an error object if an error occurred or a dictionary containing any values that were different from the original and, therefore, updated """ serialization_params = parse_serialization_params(**kwd) update_payload = UpdateHistoryContentsBatchPayload.parse_obj(payload) return self.service.update_batch(trans, history_id, update_payload, serialization_params)
[docs] @expose_api_anonymous def update(self, trans, history_id, id, payload, **kwd): """ update( self, trans, history_id, id, payload, **kwd ) * PUT /api/histories/{history_id}/contents/{id} updates the values for the history content item with the given ``id`` :type history_id: str :param history_id: encoded id string of the items's History :type id: str :param id: the encoded id of the history item to update :type payload: dict :param payload: a dictionary containing any or all the fields in :func:`galaxy.model.HistoryDatasetAssociation.to_dict` and/or the following: * annotation: an annotation for the HDA :rtype: dict :returns: an error object if an error occurred or a dictionary containing any values that were different from the original and, therefore, updated """ serialization_params = parse_serialization_params(**kwd) contents_type = self.__get_contents_type(kwd) return self.service.update( trans, history_id, id, payload, serialization_params, contents_type )
[docs] @expose_api_anonymous def validate(self, trans, history_id, history_content_id, payload=None, **kwd): """ PUT /api/histories/{history_id}/contents/{id}/validate updates the values for the history content item with the given ``id`` :type history_id: str :param history_id: encoded id string of the items's History :type id: str :param id: the encoded id of the history item to validate :rtype: dict :returns: TODO """ return self.service.validate(trans, history_id, history_content_id)
[docs] @expose_api_anonymous def delete(self, trans, history_id, id, purge=False, recursive=False, **kwd): """ DELETE /api/histories/{history_id}/contents/{id} DELETE /api/histories/{history_id}/contents/{type}s/{id} delete the history content with the given ``id`` and specified type (defaults to dataset) .. note:: Currently does not stop any active jobs for which this dataset is an output. :type id: str :param id: the encoded id of the history to delete :type recursive: bool :param recursive: if True, and deleted an HDCA also delete containing HDAs :type purge: bool :param purge: if True, purge the target HDA or child HDAs of the target HDCA :type kwd: dict :param kwd: (optional) dictionary structure containing: * payload: a dictionary itself containing: * purge: if True, purge the HDA * recursive: if True, see above. .. note:: that payload optionally can be placed in the query string of the request. This allows clients that strip the request body to still purge the dataset. :rtype: dict :returns: an error object if an error occurred or a dictionary containing: * id: the encoded id of the history, * deleted: if the history content was marked as deleted, * purged: if the history content was purged """ serialization_params = parse_serialization_params(**kwd) contents_type = self.__get_contents_type(kwd) purge = util.string_as_bool(purge) recursive = util.string_as_bool(recursive) if kwd.get('payload', None): # payload takes priority purge = util.string_as_bool(kwd['payload'].get('purge', purge)) recursive = util.string_as_bool(kwd['payload'].get('recursive', recursive)) return self.service.delete(trans, id, serialization_params, contents_type, purge, recursive)
[docs] @expose_api_raw def archive(self, trans, history_id, filename='', format='zip', dry_run=True, **kwd): """ archive( self, trans, history_id, filename='', format='zip', dry_run=True, **kwd ) * GET /api/histories/{history_id}/contents/archive/{id} * GET /api/histories/{history_id}/contents/archive/{filename}.{format} build and return a compressed archive of the selected history contents :type filename: string :param filename: (optional) archive name (defaults to history name) :type dry_run: boolean :param dry_run: (optional) if True, return the archive and file paths only as json and not an archive file :returns: archive file for download .. note:: this is a volatile endpoint and settings and behavior may change. """ dry_run = util.string_as_bool(dry_run) filter_parameters = HistoryContentsFilterQueryParams(**kwd) return self.service.archive(trans, history_id, filter_parameters, filename, dry_run)
[docs] @expose_api_raw_anonymous def contents_near(self, trans, history_id, hid, limit, **kwd): """ This endpoint provides random access to a large history without having to know exactly how many pages are in the final query. Pick a target HID and filters, and the endpoint will get LIMIT counts above and below that target regardless of how many gaps may exist in the HID due to filtering. It does 2 queries, one up and one down from the target hid with a result size of limit. Additional counts for total matches of both seeks provided in the http headers. I've also abandoned the wierd q/qv syntax. * GET /api/histories/{history_id}/contents/near/{hid}/{limit} """ serialization_params = parse_serialization_params(default_view='betawebclient', **kwd) since_str = kwd.pop('since', None) if since_str: since = dateutil.parser.isoparse(since_str) else: since = None filter_params = self._parse_rest_params(kwd) hid = int(hid) limit = int(limit) return self.service.contents_near( trans, history_id, serialization_params, filter_params, hid, limit, since, )
# Parsing query string according to REST standards. def _parse_rest_params(self, qdict: Dict[str, Any]) -> HistoryContentsFilterList: DEFAULT_OP = 'eq' splitchar = '-' result = [] for key, val in qdict.items(): attr = key op = DEFAULT_OP if splitchar in key: attr, op = key.rsplit(splitchar, 1) result.append([attr, op, val]) return result