Warning
This document is for an old release of Galaxy. You can alternatively view this page in the latest release if it exists or view the top of the latest release's documentation.
Source code for galaxy.webapps.galaxy.services.histories
import glob
import logging
import os
import shutil
from pathlib import Path
from tempfile import (
NamedTemporaryFile,
SpooledTemporaryFile,
)
from typing import (
cast,
List,
Optional,
Tuple,
Union,
)
from sqlalchemy import (
false,
select,
true,
)
from sqlalchemy.orm import Session
from galaxy import (
exceptions as glx_exceptions,
model,
)
from galaxy.celery.tasks import (
import_model_store,
prepare_history_download,
write_history_to,
)
from galaxy.files.uris import validate_uri_access
from galaxy.managers.citations import CitationsManager
from galaxy.managers.context import ProvidesHistoryContext
from galaxy.managers.histories import (
HistoryDeserializer,
HistoryExportManager,
HistoryFilters,
HistoryManager,
HistorySerializer,
)
from galaxy.managers.notification import NotificationManager
from galaxy.managers.users import UserManager
from galaxy.model import HistoryDatasetAssociation
from galaxy.model.base import transaction
from galaxy.model.store import payload_to_source_uri
from galaxy.schema import (
FilterQueryParams,
SerializationParams,
)
from galaxy.schema.fields import DecodedDatabaseIdField
from galaxy.schema.history import HistoryIndexQueryPayload
from galaxy.schema.schema import (
AnyArchivedHistoryView,
AnyHistoryView,
ArchiveHistoryRequestPayload,
AsyncFile,
AsyncTaskResultSummary,
CreateHistoryFromStore,
CreateHistoryPayload,
CustomBuildsMetadataResponse,
ExportHistoryArchivePayload,
HistoryArchiveExportResult,
HistoryImportArchiveSourceType,
JobExportHistoryArchiveModel,
JobIdResponse,
JobImportHistoryResponse,
LabelValuePair,
ShareHistoryWithStatus,
ShareWithPayload,
StoreExportPayload,
WriteStoreToPayload,
)
from galaxy.schema.tasks import (
GenerateHistoryDownload,
ImportModelStoreTaskRequest,
WriteHistoryTo,
)
from galaxy.schema.types import LatestLiteral
from galaxy.security.idencoding import IdEncodingHelper
from galaxy.short_term_storage import ShortTermStorageAllocator
from galaxy.util import restore_text
from galaxy.webapps.galaxy.services.base import (
async_task_summary,
ConsumesModelStores,
model_store_storage_target,
ServesExportStores,
ServiceBase,
)
from galaxy.webapps.galaxy.services.sharable import ShareableService
log = logging.getLogger(__name__)
DEFAULT_ORDER_BY = "create_time-dsc"
[docs]class HistoriesService(ServiceBase, ConsumesModelStores, ServesExportStores):
"""Common interface/service logic for interactions with histories in the context of the API.
Provides the logic of the actions invoked by API controllers and uses type definitions
and pydantic models to declare its parameters and return types.
"""
[docs] def __init__(
self,
security: IdEncodingHelper,
manager: HistoryManager,
user_manager: UserManager,
serializer: HistorySerializer,
deserializer: HistoryDeserializer,
citations_manager: CitationsManager,
history_export_manager: HistoryExportManager,
filters: HistoryFilters,
short_term_storage_allocator: ShortTermStorageAllocator,
notification_manager: NotificationManager,
):
super().__init__(security)
self.manager = manager
self.user_manager = user_manager
self.serializer = serializer
self.deserializer = deserializer
self.citations_manager = citations_manager
self.history_export_manager = history_export_manager
self.filters = filters
self.shareable_service = ShareableHistoryService(self.manager, self.serializer, notification_manager)
self.short_term_storage_allocator = short_term_storage_allocator
[docs] def index(
self,
trans: ProvidesHistoryContext,
serialization_params: SerializationParams,
filter_query_params: FilterQueryParams,
deleted_only: Optional[bool] = False,
all_histories: Optional[bool] = False,
):
"""
Return a collection of histories for the current user. Additional filters can be applied.
:type deleted_only: optional boolean
:param deleted_only: if True, show only deleted histories, if False, non-deleted
.. note:: Anonymous users are allowed to get their current history
"""
# bail early with current history if user is anonymous
current_user = self.user_manager.current_user(trans)
if self.user_manager.is_anonymous(current_user):
current_history = self.manager.get_current(trans)
if not current_history:
return []
# note: ignores filters, limit, offset
return [self._serialize_history(trans, current_history, serialization_params)]
filter_params = self.filters.build_filter_params(filter_query_params)
filters = []
# support the old default of not-returning/filtering-out deleted_only histories
filters += self._get_deleted_filter(deleted_only, filter_params)
# if parameter 'all_histories' is true, throw exception if not admin
# else add current user filter to query (default behaviour)
if all_histories:
if not trans.user_is_admin:
message = "Only admins can query all histories"
raise glx_exceptions.AdminRequiredException(message)
else:
filters += [model.History.user == current_user]
# exclude archived histories
filters += [model.History.archived == false()]
# and apply any other filters
filters += self.filters.parse_filters(filter_params)
order_by = self._build_order_by(filter_query_params.order)
histories = self.manager.list(
filters=filters, order_by=order_by, limit=filter_query_params.limit, offset=filter_query_params.offset
)
rval = [
self._serialize_history(trans, history, serialization_params, default_view="summary")
for history in histories
]
return rval
def _get_deleted_filter(self, deleted: Optional[bool], filter_params: List[Tuple[str, str, str]]):
# TODO: this should all be removed (along with the default) in v2
# support the old default of not-returning/filtering-out deleted histories
try:
# the consumer must explicitly ask for both deleted and non-deleted
# but pull it from the parsed params (as the filter system will error on None)
deleted_filter_index = filter_params.index(("deleted", "eq", "None"))
filter_params.pop(deleted_filter_index)
return []
except ValueError:
pass
# the deleted string bool was also used as an 'include deleted' flag
if deleted is True:
return [model.History.deleted == true()]
# the third option not handled here is 'return only deleted'
# if this is passed in (in the form below), simply return and let the filter system handle it
if ("deleted", "eq", "True") in filter_params:
return []
# otherwise, do the default filter of removing the deleted histories
return [model.History.deleted == false()]
[docs] def index_query(
self,
trans,
payload: HistoryIndexQueryPayload,
serialization_params: SerializationParams,
include_total_count: bool = False,
) -> Tuple[List[AnyHistoryView], int]:
"""Return a list of History accessible by the user
:rtype: list
:returns: dictionaries containing History details
"""
entries, total_matches = self.manager.index_query(trans, payload, include_total_count)
return (
[self._serialize_history(trans, entry, serialization_params, default_view="summary") for entry in entries],
total_matches,
)
[docs] def create(
self,
trans: ProvidesHistoryContext,
payload: CreateHistoryPayload,
serialization_params: SerializationParams,
):
"""Create a new history from scratch, by copying an existing one or by importing
from URL or File depending on the provided parameters in the payload.
"""
copy_this_history_id = payload.history_id
if trans.anonymous and not copy_this_history_id: # Copying/Importing histories is allowed for anonymous users
raise glx_exceptions.AuthenticationRequired("You need to be logged in to create histories.")
if trans.user and trans.user.bootstrap_admin_user:
raise glx_exceptions.RealUserRequiredException("Only real users can create histories.")
hist_name = None
if payload.name is not None:
hist_name = restore_text(payload.name)
if payload.archive_source is not None or hasattr(payload.archive_file, "file"):
archive_source = payload.archive_source
archive_file = payload.archive_file
if archive_source:
archive_type = payload.archive_type
elif archive_file is not None and hasattr(archive_file, "file"):
archive_source = archive_file.file.name
archive_type = HistoryImportArchiveSourceType.file
if isinstance(archive_file.file, SpooledTemporaryFile):
archive_source = self._save_upload_file_tmp(archive_file)
else:
raise glx_exceptions.MessageException("Please provide a url or file.")
if archive_type == HistoryImportArchiveSourceType.url:
assert archive_source
validate_uri_access(archive_source, trans.user_is_admin, trans.app.config.fetch_url_allowlist_ips)
job = self.manager.queue_history_import(trans, archive_type=archive_type, archive_source=archive_source)
job_dict = job.to_dict()
job_dict["message"] = (
f"Importing history from source '{archive_source}'. This history will be visible when the import is complete."
)
return JobImportHistoryResponse(**job_dict)
new_history = None
# if a history id was passed, copy that history
if copy_this_history_id:
original_history = self.manager.get_accessible(
copy_this_history_id, trans.user, current_history=trans.history
)
hist_name = hist_name or (f"Copy of '{original_history.name}'")
new_history = original_history.copy(
name=hist_name, target_user=trans.user, all_datasets=payload.all_datasets
)
# otherwise, create a new empty history
else:
new_history = self.manager.create(user=trans.user, name=hist_name)
trans.app.security_agent.history_set_default_permissions(new_history)
trans.sa_session.add(new_history)
with transaction(trans.sa_session):
trans.sa_session.commit()
# an anonymous user can only have one history
if self.user_manager.is_anonymous(trans.user):
self.manager.set_current(trans, new_history)
return self._serialize_history(trans, new_history, serialization_params)
[docs] def create_from_store(
self,
trans,
payload: CreateHistoryFromStore,
serialization_params: SerializationParams,
) -> AnyHistoryView:
self._ensure_can_create_history(trans)
object_tracker = self.create_objects_from_store(
trans,
payload,
)
return self._serialize_history(trans, object_tracker.new_history, serialization_params)
[docs] def create_from_store_async(
self,
trans,
payload: CreateHistoryFromStore,
) -> AsyncTaskResultSummary:
self._ensure_can_create_history(trans)
source_uri = payload_to_source_uri(payload)
request = ImportModelStoreTaskRequest(
user=trans.async_request_user,
source_uri=source_uri,
for_library=False,
model_store_format=payload.model_store_format,
)
result = import_model_store.delay(request=request, task_user_id=getattr(trans.user, "id", None))
return async_task_summary(result)
def _ensure_can_create_history(self, trans):
if trans.anonymous:
raise glx_exceptions.AuthenticationRequired("You need to be logged in to create histories.")
if trans.user and trans.user.bootstrap_admin_user:
raise glx_exceptions.RealUserRequiredException("Only real users can create histories.")
def _save_upload_file_tmp(self, upload_file) -> str:
try:
suffix = Path(upload_file.filename).suffix
with NamedTemporaryFile(delete=False, suffix=suffix) as tmp:
shutil.copyfileobj(upload_file.file, tmp)
tmp_path = Path(tmp.name)
finally:
upload_file.file.close()
return str(tmp_path)
[docs] def show(
self,
trans: ProvidesHistoryContext,
serialization_params: SerializationParams,
history_id: Optional[DecodedDatabaseIdField] = None,
):
"""
Returns detailed information about the history with the given encoded `id`. If no `id` is
provided, then the most recently used history will be returned.
:param history_id: the encoded id of the history to query or None to use the most recently used
:param serialization_params: contains the optional `view`, `keys` and `default_view` for serialization
:returns: detailed history information
"""
if history_id is None: # By default display the most recent history
history = self.manager.most_recent(
trans.user, filters=(model.History.deleted == false()), current_history=trans.history
)
else:
history = self.manager.get_accessible(history_id, trans.user, current_history=trans.history)
return self._serialize_history(trans, history, serialization_params)
[docs] def prepare_download(
self, trans: ProvidesHistoryContext, history_id: DecodedDatabaseIdField, payload: StoreExportPayload
) -> AsyncFile:
history = self.manager.get_accessible(history_id, trans.user, current_history=trans.history)
short_term_storage_target = model_store_storage_target(
self.short_term_storage_allocator,
history.name,
payload.model_store_format,
)
export_association = self.history_export_manager.create_export_association(history.id)
request = GenerateHistoryDownload(
history_id=history.id,
short_term_storage_request_id=short_term_storage_target.request_id,
duration=short_term_storage_target.duration,
user=trans.async_request_user,
export_association_id=export_association.id,
**payload.dict(),
)
result = prepare_history_download.delay(request=request, task_user_id=getattr(trans.user, "id", None))
task_summary = async_task_summary(result)
export_association.task_uuid = task_summary.id
with transaction(trans.sa_session):
trans.sa_session.commit()
return AsyncFile(storage_request_id=short_term_storage_target.request_id, task=task_summary)
[docs] def write_store(
self, trans: ProvidesHistoryContext, history_id: DecodedDatabaseIdField, payload: WriteStoreToPayload
) -> AsyncTaskResultSummary:
history = self.manager.get_accessible(history_id, trans.user, current_history=trans.history)
export_association = self.history_export_manager.create_export_association(history.id)
request = WriteHistoryTo(
user=trans.async_request_user,
history_id=history.id,
export_association_id=export_association.id,
**payload.dict(),
)
result = write_history_to.delay(request=request, task_user_id=getattr(trans.user, "id", None))
task_summary = async_task_summary(result)
export_association.task_uuid = task_summary.id
with transaction(trans.sa_session):
trans.sa_session.commit()
return task_summary
[docs] def update(
self,
trans: ProvidesHistoryContext,
history_id: DecodedDatabaseIdField,
payload,
serialization_params: SerializationParams,
):
"""Updates the values for the history with the given ``id``
:param history_id: the encoded id of the history to update
:param payload: a dictionary containing any or all the
fields in :func:`galaxy.model.History.to_dict` and/or the following:
* annotation: an annotation for the history
:param serialization_params: contains the optional `view`, `keys` and `default_view` for serialization
:returns: an error object if an error occurred or a dictionary containing
any values that were different from the original and, therefore, updated
"""
# TODO: PUT /api/histories/{encoded_history_id} payload = { rating: rating } (w/ no security checks)
history = self.manager.get_mutable(history_id, trans.user, current_history=trans.history)
self.deserializer.deserialize(history, payload, user=trans.user, trans=trans)
return self._serialize_history(trans, history, serialization_params)
[docs] def delete(
self,
trans: ProvidesHistoryContext,
history_id: DecodedDatabaseIdField,
serialization_params: SerializationParams,
purge: bool = False,
):
"""Delete the history with the given ``id``
.. note:: Stops all active jobs in the history if purge is set.
You can purge a history, removing all it's datasets from disk (if unshared),
by passing in ``purge=True`` in the url.
"""
history = self.manager.get_mutable(history_id, trans.user, current_history=trans.history)
if purge:
self.manager.purge(history)
else:
self.manager.delete(history)
return self._serialize_history(trans, history, serialization_params)
[docs] def undelete(
self,
trans: ProvidesHistoryContext,
history_id: DecodedDatabaseIdField,
serialization_params: SerializationParams,
):
"""Undelete history (that hasn't been purged) with the given ``id``
:param history_id: the encoded id of the history to undelete
:param serialization_params: contains the optional `view`, `keys` and `default_view` for serialization
:returns: the undeleted history
"""
history = self.manager.get_mutable(history_id, trans.user, current_history=trans.history)
self.manager.undelete(history)
return self._serialize_history(trans, history, serialization_params)
[docs] def count(
self,
trans: ProvidesHistoryContext,
):
"""
Returns number of histories for the current user.
"""
current_user = self.user_manager.current_user(trans)
if self.user_manager.is_anonymous(current_user):
current_history = self.manager.get_current(trans)
return 1 if current_history else 0
return self.manager.get_active_count(current_user)
[docs] def published(
self,
trans: ProvidesHistoryContext,
serialization_params: SerializationParams,
filter_query_params: FilterQueryParams,
):
"""
Return all histories that are published. The results can be filtered.
"""
filters = self.filters.parse_query_filters(filter_query_params)
order_by = self._build_order_by(filter_query_params.order)
histories = self.manager.list_published(
filters=filters,
order_by=order_by,
limit=filter_query_params.limit,
offset=filter_query_params.offset,
)
rval = [
self._serialize_history(trans, history, serialization_params, default_view="summary")
for history in histories
]
return rval
[docs] def citations(self, trans: ProvidesHistoryContext, history_id: DecodedDatabaseIdField):
"""
Return all the citations for the tools used to produce the datasets in
the history.
"""
history = self.manager.get_accessible(history_id, trans.user, current_history=trans.history)
tool_ids = set()
for dataset in history.datasets:
job = dataset.creating_job
if not job:
continue
tool_id = job.tool_id
if not tool_id:
continue
tool_ids.add(tool_id)
return [citation.to_dict("bibtex") for citation in self.citations_manager.citations_for_tool_ids(tool_ids)]
[docs] def index_exports(
self,
trans: ProvidesHistoryContext,
history_id: DecodedDatabaseIdField,
use_tasks: bool = False,
limit: Optional[int] = None,
offset: Optional[int] = None,
):
if use_tasks:
return self.history_export_manager.get_task_exports(trans, history_id, limit, offset)
return self.history_export_manager.get_exports(trans, history_id)
[docs] def archive_export(
self,
trans,
history_id: DecodedDatabaseIdField,
payload: Optional[ExportHistoryArchivePayload] = None,
) -> Tuple[HistoryArchiveExportResult, bool]:
"""
start job (if needed) to create history export for corresponding
history.
:param history_id: the encoded id of the history to export
:returns: object containing url to fetch export from.
"""
if payload is None:
payload = ExportHistoryArchivePayload()
history = self.manager.get_accessible(history_id, trans.user, current_history=trans.history)
jeha = history.latest_export
exporting_to_uri = payload.directory_uri
# always just issue a new export when exporting to a URI.
up_to_date = not payload.force and not exporting_to_uri and (jeha and jeha.up_to_date)
job = None
if not up_to_date:
# Need to create new JEHA + job.
job = self.manager.queue_history_export(
trans,
history,
gzip=payload.gzip,
include_hidden=payload.include_hidden,
include_deleted=payload.include_deleted,
directory_uri=payload.directory_uri,
file_name=payload.file_name,
)
else:
job = jeha.job
ready = bool((up_to_date and jeha.ready) or exporting_to_uri)
if exporting_to_uri:
# we don't have a jeha, there will never be a download_url. Just let
# the client poll on the created job_id to determine when the file has been
# written.
return (JobIdResponse(job_id=job.id), ready)
if up_to_date and jeha.ready:
serialized_jeha = self.history_export_manager.serialize(trans, history_id, jeha)
return (JobExportHistoryArchiveModel(**serialized_jeha), ready)
else:
# Valid request, just resource is not ready yet.
if jeha:
serialized_jeha = self.history_export_manager.serialize(trans, history_id, jeha)
return (JobExportHistoryArchiveModel(**serialized_jeha), ready)
else:
assert job is not None, "logic error, don't have a jeha or a job"
return (JobIdResponse(job_id=job.id), ready)
[docs] def get_ready_history_export(
self,
trans: ProvidesHistoryContext,
history_id: DecodedDatabaseIdField,
jeha_id: Union[DecodedDatabaseIdField, LatestLiteral],
) -> model.JobExportHistoryArchive:
"""Returns the exported history archive information if it's ready
or raises an exception if not."""
return self.history_export_manager.get_ready_jeha(trans, history_id, jeha_id)
[docs] def get_archive_download_path(
self,
trans: ProvidesHistoryContext,
jeha: model.JobExportHistoryArchive,
) -> str:
"""
If ready and available, return raw contents of exported history
using a generator function.
"""
return self.manager.get_ready_history_export_file_path(trans, jeha)
[docs] def get_archive_media_type(self, jeha: model.JobExportHistoryArchive):
media_type = "application/x-tar"
if jeha.compressed:
media_type = "application/x-gzip"
return media_type
# TODO: remove this function and HistoryManager.legacy_serve_ready_history_export when
# removing the legacy HistoriesController
[docs] def legacy_archive_download(
self,
trans: ProvidesHistoryContext,
history_id: DecodedDatabaseIdField,
jeha_id: DecodedDatabaseIdField,
):
"""
If ready and available, return raw contents of exported history.
"""
jeha = self.history_export_manager.get_ready_jeha(trans, history_id, jeha_id)
return self.manager.legacy_serve_ready_history_export(trans, jeha)
[docs] def get_custom_builds_metadata(
self,
trans: ProvidesHistoryContext,
history_id: DecodedDatabaseIdField,
) -> CustomBuildsMetadataResponse:
"""
Returns metadata for custom builds.
"""
history = self.manager.get_accessible(history_id, trans.user, current_history=trans.history)
installed_builds = []
for build in glob.glob(os.path.join(trans.app.config.len_file_path, "*.len")):
installed_builds.append(os.path.basename(build).split(".len")[0])
fasta_hdas = get_fasta_hdas_by_history(trans.sa_session, history.id)
return CustomBuildsMetadataResponse(
installed_builds=[LabelValuePair(label=ins, value=ins) for ins in installed_builds],
fasta_hdas=[
LabelValuePair(label=f"{hda.hid}: {hda.name}", value=trans.security.encode_id(hda.id))
for hda in fasta_hdas
],
)
def _serialize_history(
self,
trans: ProvidesHistoryContext,
history: model.History,
serialization_params: SerializationParams,
default_view: str = "detailed",
) -> AnyHistoryView:
"""
Returns a dictionary with the corresponding values depending on the
serialization parameters provided.
"""
serialization_params.default_view = default_view
serialized_history = self.serializer.serialize_to_view(
history, user=trans.user, trans=trans, encode_id=False, **serialization_params.model_dump()
)
return serialized_history
def _build_order_by(self, order: Optional[str]):
return self.build_order_by(self.manager, order or DEFAULT_ORDER_BY)
[docs] def archive_history(
self,
trans: ProvidesHistoryContext,
history_id: DecodedDatabaseIdField,
payload: Optional[ArchiveHistoryRequestPayload] = None,
) -> AnyArchivedHistoryView:
"""Marks the history with the given id as archived and optionally associates it with the given archive export record in the payload.
Archived histories are not part of the active histories of the user, so they won't be shown to the user by default.
"""
if trans.anonymous:
raise glx_exceptions.AuthenticationRequired("Only registered users can archive histories.")
history = self.manager.get_owned(history_id, trans.user)
if history.archived:
raise glx_exceptions.Conflict("History is already archived.")
archive_export_id = payload.archive_export_id if payload else None
if archive_export_id:
export_record = self.history_export_manager.get_task_export_by_id(archive_export_id)
self._ensure_export_record_can_be_associated_with_history_archival(history_id, export_record)
# After this point, the export record is valid and can be associated with the history archival
purge_history = payload.purge_history if payload else False
if purge_history:
if archive_export_id is None:
raise glx_exceptions.RequestParameterMissingException(
"Cannot purge history without an export record. A valid archive_export_id is required."
)
self.manager.purge(history)
history = self.manager.archive_history(history, archive_export_id=archive_export_id)
return self._serialize_archived_history(trans, history)
def _ensure_export_record_can_be_associated_with_history_archival(
self, history_id: int, export_record: model.StoreExportAssociation
):
if export_record.object_id != history_id or export_record.object_type != "history":
raise glx_exceptions.RequestParameterInvalidException(
"The given archive export record does not belong to this history."
)
export_metadata = self.history_export_manager.get_record_metadata(export_record)
if export_metadata is None:
log.error(
f"Trying to archive history [{history_id}] with an export record. "
f"But the given archive export record [{export_record.id}] does not have the required metadata."
)
raise glx_exceptions.RequestParameterInvalidException(
"The given archive export record does not have the required metadata."
)
if not export_metadata.is_ready():
raise glx_exceptions.RequestParameterInvalidException(
"The given archive export record must be ready before it can be used to archive a history. "
"Please wait for the export to finish and try again."
)
if export_metadata.is_short_term():
raise glx_exceptions.RequestParameterInvalidException(
"The given archive export record is temporal, only persistent sources can be used to archive a history."
)
# TODO: should we also ensure the export was requested to include files with `include_files`, `include_hidden`, etc.?
[docs] def restore_archived_history(
self,
trans: ProvidesHistoryContext,
history_id: DecodedDatabaseIdField,
force: Optional[bool] = False,
) -> AnyHistoryView:
if trans.anonymous:
raise glx_exceptions.AuthenticationRequired("Only registered users can access archived histories.")
history = self.manager.get_owned(history_id, trans.user)
history = self.manager.restore_archived_history(history, force=force or False)
return self._serialize_archived_history(trans, history)
[docs] def get_archived_histories(
self,
trans: ProvidesHistoryContext,
serialization_params: SerializationParams,
filter_query_params: FilterQueryParams,
include_total_matches: bool = False,
) -> Tuple[List[AnyArchivedHistoryView], Optional[int]]:
if trans.anonymous:
raise glx_exceptions.AuthenticationRequired("Only registered users can have or access archived histories.")
filters = self.filters.parse_query_filters(filter_query_params)
filters += [
model.History.user == trans.user,
model.History.archived == true(),
]
total_matches = self.manager.count(filters=filters) if include_total_matches else None
order_by = self._build_order_by(filter_query_params.order)
histories = self.manager.list(
filters=filters, order_by=order_by, limit=filter_query_params.limit, offset=filter_query_params.offset
)
histories = [self._serialize_archived_history(trans, history, serialization_params) for history in histories]
return histories, total_matches
def _serialize_archived_history(
self,
trans: ProvidesHistoryContext,
history: model.History,
serialization_params: Optional[SerializationParams] = None,
):
if serialization_params is None:
serialization_params = SerializationParams(default_view="summary")
archived_history = self.serializer.serialize_to_view(
history, user=trans.user, trans=trans, **serialization_params.dict()
)
export_record_data = self._get_export_record_data(history)
archived_history["export_record_data"] = export_record_data.dict() if export_record_data else None
return archived_history
def _get_export_record_data(self, history: model.History) -> Optional[WriteStoreToPayload]:
if history.archive_export_id:
export_record = self.history_export_manager.get_task_export_by_id(history.archive_export_id)
export_metadata = self.history_export_manager.get_record_metadata(export_record)
if export_metadata and isinstance(export_metadata.request_data.payload, WriteStoreToPayload):
return export_metadata.request_data.payload
return None
[docs]def get_fasta_hdas_by_history(session: Session, history_id: int):
stmt = (
select(HistoryDatasetAssociation)
.filter_by(history_id=history_id, extension="fasta", deleted=False)
.order_by(HistoryDatasetAssociation.hid.desc())
)
return session.scalars(stmt).all()