import logging
from enum import Enum
from tempfile import NamedTemporaryFile
from typing import (
Any,
Dict,
List,
Optional,
Tuple,
)
from pydantic import (
BaseModel,
Field,
)
from galaxy.celery.tasks import (
prepare_invocation_download,
write_invocation_to,
)
from galaxy.exceptions import (
AdminRequiredException,
ObjectNotFound,
)
from galaxy.managers.histories import HistoryManager
from galaxy.managers.workflows import WorkflowsManager
from galaxy.model import WorkflowInvocation
from galaxy.model.store import (
BcoExportOptions,
get_export_store_factory,
)
from galaxy.schema.fields import DecodedDatabaseIdField
from galaxy.schema.invocation import InvocationMessageResponseModel
from galaxy.schema.schema import (
AsyncFile,
AsyncTaskResultSummary,
BcoGenerationParametersMixin,
InvocationIndexQueryPayload,
StoreExportPayload,
WriteStoreToPayload,
)
from galaxy.schema.tasks import (
GenerateInvocationDownload,
WriteInvocationTo,
)
from galaxy.security.idencoding import IdEncodingHelper
from galaxy.web.short_term_storage import ShortTermStorageAllocator
from galaxy.webapps.galaxy.services.base import (
async_task_summary,
ensure_celery_tasks_enabled,
model_store_storage_target,
ServiceBase,
)
log = logging.getLogger(__name__)
[docs]class InvocationSerializationView(str, Enum):
element = "element"
collection = "collection"
[docs]class InvocationSerializationParams(BaseModel):
"""Contains common parameters for customizing model serialization."""
view: Optional[InvocationSerializationView] = Field(
default=None,
title="View",
description=(
"The name of the view used to serialize this item. "
"This will return a predefined set of attributes of the item."
),
example="element",
)
step_details: bool = Field(
default=False, title="Include step details", description="Include details for individual invocation steps."
)
legacy_job_state: bool = Field(
default=False,
deprecated=True,
)
[docs]class InvocationIndexPayload(InvocationIndexQueryPayload):
instance: bool = Field(default=False, description="Is provided workflow id for Workflow instead of StoredWorkflow?")
[docs]class PrepareStoreDownloadPayload(StoreExportPayload, BcoGenerationParametersMixin):
pass
[docs]class WriteInvocationStoreToPayload(WriteStoreToPayload, BcoGenerationParametersMixin):
pass
[docs]class InvocationsService(ServiceBase):
[docs] def __init__(
self,
security: IdEncodingHelper,
histories_manager: HistoryManager,
workflows_manager: WorkflowsManager,
short_term_storage_allocator: ShortTermStorageAllocator,
):
super().__init__(security=security)
self._histories_manager = histories_manager
self._workflows_manager = workflows_manager
self.short_term_storage_allocator = short_term_storage_allocator
[docs] def index(
self, trans, invocation_payload: InvocationIndexPayload, serialization_params: InvocationSerializationParams
) -> Tuple[List[Dict[str, Any]], int]:
workflow_id = invocation_payload.workflow_id
if invocation_payload.instance:
instance = invocation_payload.instance
invocation_payload.workflow_id = self._workflows_manager.get_stored_workflow(
trans, workflow_id, by_stored_id=not instance
).id
if invocation_payload.history_id:
# access check
self._histories_manager.get_accessible(
invocation_payload.history_id, trans.user, current_history=trans.history
)
if not trans.user_is_admin:
# We restrict the query to the current users' invocations
# Endpoint requires user login, so trans.user.id is never None
# TODO: user_id should be optional!
user_id = trans.user.id
if invocation_payload.user_id and invocation_payload.user_id != user_id:
raise AdminRequiredException("Only admins can index the invocations of others")
else:
# Get all invocations if user is admin (and user_id is None).
# xref https://github.com/galaxyproject/galaxy/pull/13862#discussion_r865732297
user_id = invocation_payload.user_id
invocations, total_matches = self._workflows_manager.build_invocations_query(
trans,
stored_workflow_id=invocation_payload.workflow_id,
history_id=invocation_payload.history_id,
job_id=invocation_payload.job_id,
user_id=user_id,
include_terminal=invocation_payload.include_terminal,
limit=invocation_payload.limit,
offset=invocation_payload.offset,
sort_by=invocation_payload.sort_by,
sort_desc=invocation_payload.sort_desc,
)
invocation_dict = self.serialize_workflow_invocations(invocations, serialization_params)
return invocation_dict, total_matches
[docs] def prepare_store_download(
self, trans, invocation_id: DecodedDatabaseIdField, payload: PrepareStoreDownloadPayload
) -> AsyncFile:
ensure_celery_tasks_enabled(trans.app.config)
model_store_format = payload.model_store_format
workflow_invocation = self._workflows_manager.get_invocation(trans, invocation_id, eager=True)
if not workflow_invocation:
raise ObjectNotFound()
try:
invocation_name = f"Invocation of {workflow_invocation.workflow.stored_workflow.name} at {workflow_invocation.create_time.isoformat()}"
except AttributeError:
invocation_name = f"Invocation of workflow at {workflow_invocation.create_time.isoformat()}"
short_term_storage_target = model_store_storage_target(
self.short_term_storage_allocator,
invocation_name,
model_store_format,
)
request = GenerateInvocationDownload(
short_term_storage_request_id=short_term_storage_target.request_id,
user=trans.async_request_user,
invocation_id=workflow_invocation.id,
galaxy_url=trans.request.url_path,
**payload.dict(),
)
result = prepare_invocation_download.delay(request=request)
return AsyncFile(storage_request_id=short_term_storage_target.request_id, task=async_task_summary(result))
[docs] def write_store(
self, trans, invocation_id: DecodedDatabaseIdField, payload: WriteInvocationStoreToPayload
) -> AsyncTaskResultSummary:
ensure_celery_tasks_enabled(trans.app.config)
workflow_invocation = self._workflows_manager.get_invocation(trans, invocation_id, eager=True)
if not workflow_invocation:
raise ObjectNotFound()
request = WriteInvocationTo(
galaxy_url=trans.request.url_path,
user=trans.async_request_user,
invocation_id=workflow_invocation.id,
**payload.dict(),
)
result = write_invocation_to.delay(request=request)
rval = async_task_summary(result)
return rval
[docs] def serialize_workflow_invocation(
self,
invocation: WorkflowInvocation,
params: InvocationSerializationParams,
default_view: InvocationSerializationView = InvocationSerializationView.element,
):
view = params.view or default_view
step_details = params.step_details
legacy_job_state = params.legacy_job_state
as_dict = invocation.to_dict(view.value, step_details=step_details, legacy_job_state=legacy_job_state)
as_dict = self.security.encode_all_ids(as_dict, recursive=True)
as_dict["messages"] = [
InvocationMessageResponseModel.parse_obj(message).__root__.dict() for message in invocation.messages
]
return as_dict
[docs] def serialize_workflow_invocations(
self,
invocations,
params: InvocationSerializationParams,
default_view: InvocationSerializationView = InvocationSerializationView.collection,
):
return list(
map(lambda i: self.serialize_workflow_invocation(i, params, default_view=default_view), invocations)
)
# TODO: remove this after 23.1 release
[docs] def deprecated_generate_invocation_bco(
self,
trans,
invocation_id: DecodedDatabaseIdField,
export_options: BcoExportOptions,
):
workflow_invocation = self._workflows_manager.get_invocation(trans, invocation_id, eager=True)
if not workflow_invocation:
raise ObjectNotFound()
with NamedTemporaryFile() as export_target:
with get_export_store_factory(trans.app, "bco.json", bco_export_options=export_options)(
export_target.name
) as export_store:
export_store.export_workflow_invocation(workflow_invocation)
export_target.seek(0)
return export_target.read()