Warning
This document is for an in-development version of Galaxy. You can alternatively view this page in the latest release if it exists, or view the top of the latest release's documentation.
Source code for galaxy.webapps.galaxy.services.invocations
import json
import logging
from typing import (
Any,
Dict,
List,
Tuple,
)
from pydantic import Field
from galaxy.celery.tasks import (
prepare_invocation_download,
write_invocation_to,
)
from galaxy.exceptions import (
AdminRequiredException,
InconsistentDatabase,
ObjectNotFound,
)
from galaxy.managers.context import (
ProvidesHistoryContext,
ProvidesUserContext,
)
from galaxy.managers.histories import HistoryManager
from galaxy.managers.jobs import (
fetch_job_states,
get_job_metrics_for_invocation,
invocation_job_source_iter,
summarize_metrics,
)
from galaxy.managers.workflows import WorkflowsManager
from galaxy.model import (
HistoryDatasetAssociation,
HistoryDatasetCollectionAssociation,
WorkflowInvocation,
WorkflowInvocationStep,
WorkflowRequestInputParameter,
)
from galaxy.schema.fields import DecodedDatabaseIdField
from galaxy.schema.invocation import (
CreateInvocationFromStore,
InvocationSerializationParams,
InvocationSerializationView,
InvocationStep,
WorkflowInvocationRequestModel,
WorkflowInvocationResponse,
)
from galaxy.schema.schema import (
AsyncFile,
AsyncTaskResultSummary,
BcoGenerationParametersMixin,
InvocationIndexQueryPayload,
StoreExportPayload,
WriteStoreToPayload,
)
from galaxy.schema.tasks import (
GenerateInvocationDownload,
WriteInvocationTo,
)
from galaxy.security.idencoding import IdEncodingHelper
from galaxy.short_term_storage import ShortTermStorageAllocator
from galaxy.webapps.galaxy.services.base import (
async_task_summary,
ConsumesModelStores,
ensure_celery_tasks_enabled,
model_store_storage_target,
ServiceBase,
)
log = logging.getLogger(__name__)
[docs]
class InvocationIndexPayload(InvocationIndexQueryPayload):
    """Index query payload extended with the Workflow/StoredWorkflow id disambiguator."""

    # When True, ``workflow_id`` refers to a Workflow row rather than a StoredWorkflow.
    instance: bool = Field(default=False, description="Is provided workflow id for Workflow instead of StoredWorkflow?")
[docs]
class InvocationsService(ServiceBase, ConsumesModelStores):
    """Service-layer operations for workflow invocations (index, show, cancel, export, import)."""

    def __init__(
        self,
        security: IdEncodingHelper,
        histories_manager: HistoryManager,
        workflows_manager: WorkflowsManager,
        short_term_storage_allocator: ShortTermStorageAllocator,
    ):
        # ServiceBase stores ``security`` for id encoding/decoding helpers.
        super().__init__(security=security)
        self._histories_manager = histories_manager
        self._workflows_manager = workflows_manager
        self.short_term_storage_allocator = short_term_storage_allocator
[docs]
def index(
self, trans, invocation_payload: InvocationIndexPayload, serialization_params: InvocationSerializationParams
) -> Tuple[List[WorkflowInvocationResponse], int]:
workflow_id = invocation_payload.workflow_id
if invocation_payload.instance:
instance = invocation_payload.instance
invocation_payload.workflow_id = self._workflows_manager.get_stored_workflow(
trans, workflow_id, by_stored_id=not instance
).id
if invocation_payload.history_id:
# access check
self._histories_manager.get_accessible(
invocation_payload.history_id, trans.user, current_history=trans.history
)
if not trans.user_is_admin:
# We restrict the query to the current users' invocations
# Endpoint requires user login, so trans.user.id is never None
# TODO: user_id should be optional!
user_id = trans.user.id
if invocation_payload.user_id and invocation_payload.user_id != user_id:
raise AdminRequiredException("Only admins can index the invocations of others")
else:
# Get all invocations if user is admin (and user_id is None).
# xref https://github.com/galaxyproject/galaxy/pull/13862#discussion_r865732297
user_id = invocation_payload.user_id
invocations, total_matches = self._workflows_manager.build_invocations_query(
trans,
stored_workflow_id=invocation_payload.workflow_id,
history_id=invocation_payload.history_id,
job_id=invocation_payload.job_id,
user_id=user_id,
include_terminal=invocation_payload.include_terminal,
limit=invocation_payload.limit,
offset=invocation_payload.offset,
sort_by=invocation_payload.sort_by,
sort_desc=invocation_payload.sort_desc,
include_nested_invocations=invocation_payload.include_nested_invocations,
check_ownership=False,
)
invocation_dict = self.serialize_workflow_invocations(invocations, serialization_params)
return invocation_dict, total_matches
[docs]
def show(self, trans, invocation_id, serialization_params, eager=False):
wfi = self._workflows_manager.get_invocation(
trans, invocation_id, eager, check_ownership=False, check_accessible=True
)
return self.serialize_workflow_invocation(wfi, serialization_params)
[docs]
def as_request(self, trans: ProvidesUserContext, invocation_id) -> WorkflowInvocationRequestModel:
wfi = self._workflows_manager.get_invocation(
trans, invocation_id, True, check_ownership=True, check_accessible=True
)
return self.serialize_workflow_invocation_to_request(trans, wfi)
[docs]
def cancel(self, trans, invocation_id, serialization_params):
wfi = self._workflows_manager.request_invocation_cancellation(trans, invocation_id)
return self.serialize_workflow_invocation(wfi, serialization_params)
[docs]
def show_invocation_report(self, trans, invocation_id, format="json"):
wfi_report = self._workflows_manager.get_invocation_report(trans, invocation_id, format=format)
return wfi_report
[docs]
def show_invocation_step(self, trans, step_id) -> InvocationStep:
wfi_step = self._workflows_manager.get_invocation_step(
trans, step_id, check_ownership=False, check_accessible=True
)
return self.serialize_workflow_invocation_step(wfi_step)
[docs]
def show_invocation_metrics(self, trans: ProvidesHistoryContext, invocation_id: int):
extended_job_metrics = get_job_metrics_for_invocation(trans.sa_session, invocation_id)
job_metrics = []
job_ids = []
tool_ids = []
step_indexes = []
step_labels = []
for row in extended_job_metrics:
step_indexes.append(row[0])
job_ids.append(row[1])
tool_ids.append(row[2])
step_labels.append(row[3])
job_metrics.append(row[4])
metrics_dict_list = summarize_metrics(trans, job_metrics)
for tool_id, job_id, step_index, step_label, metrics_dict in zip(
tool_ids, job_ids, step_indexes, step_labels, metrics_dict_list
):
metrics_dict["tool_id"] = tool_id
metrics_dict["job_id"] = trans.security.encode_id(job_id)
metrics_dict["step_index"] = step_index
metrics_dict["step_label"] = step_label
return metrics_dict_list
[docs]
def update_invocation_step(self, trans, step_id, action):
wfi_step = self._workflows_manager.update_invocation_step(trans, step_id, action)
return self.serialize_workflow_invocation_step(wfi_step)
[docs]
def show_invocation_step_jobs_summary(self, trans, invocation_id) -> List[Dict[str, Any]]:
ids = []
types = []
for job_source_type, job_source_id, _ in invocation_job_source_iter(trans.sa_session, invocation_id):
ids.append(job_source_id)
types.append(job_source_type)
return fetch_job_states(trans.sa_session, ids, types)
[docs]
def show_invocation_jobs_summary(self, trans, invocation_id) -> Dict[str, Any]:
ids = [invocation_id]
types = ["WorkflowInvocation"]
return fetch_job_states(trans.sa_session, ids, types)[0]
[docs]
def prepare_store_download(
self, trans, invocation_id: DecodedDatabaseIdField, payload: PrepareStoreDownloadPayload
) -> AsyncFile:
ensure_celery_tasks_enabled(trans.app.config)
model_store_format = payload.model_store_format
workflow_invocation = self._workflows_manager.get_invocation(
trans, invocation_id, eager=True, check_ownership=False, check_accessible=True
)
if not workflow_invocation:
raise ObjectNotFound()
try:
invocation_name = f"Invocation of {workflow_invocation.workflow.stored_workflow.name} at {workflow_invocation.create_time.isoformat()}"
except AttributeError:
invocation_name = f"Invocation of workflow at {workflow_invocation.create_time.isoformat()}"
short_term_storage_target = model_store_storage_target(
self.short_term_storage_allocator,
invocation_name,
model_store_format,
)
request = GenerateInvocationDownload(
short_term_storage_request_id=short_term_storage_target.request_id,
user=trans.async_request_user,
invocation_id=workflow_invocation.id,
galaxy_url=trans.request.url_path,
**payload.model_dump(),
)
result = prepare_invocation_download.delay(request=request, task_user_id=getattr(trans.user, "id", None))
return AsyncFile(storage_request_id=short_term_storage_target.request_id, task=async_task_summary(result))
[docs]
def write_store(
self, trans, invocation_id: DecodedDatabaseIdField, payload: WriteInvocationStoreToPayload
) -> AsyncTaskResultSummary:
ensure_celery_tasks_enabled(trans.app.config)
workflow_invocation = self._workflows_manager.get_invocation(
trans, invocation_id, eager=True, check_ownership=False, check_accessible=True
)
if not workflow_invocation:
raise ObjectNotFound()
request = WriteInvocationTo(
galaxy_url=trans.request.url_path,
user=trans.async_request_user,
invocation_id=workflow_invocation.id,
**payload.model_dump(),
)
result = write_invocation_to.delay(request=request, task_user_id=getattr(trans.user, "id", None))
rval = async_task_summary(result)
return rval
[docs]
def serialize_workflow_invocation(
self,
invocation: WorkflowInvocation,
params: InvocationSerializationParams,
default_view: InvocationSerializationView = InvocationSerializationView.element,
):
view = params.view or default_view
step_details = params.step_details
legacy_job_state = params.legacy_job_state
as_dict = invocation.to_dict(view.value, step_details=step_details, legacy_job_state=legacy_job_state)
as_dict["messages"] = invocation.messages
return WorkflowInvocationResponse(**as_dict)
[docs]
def serialize_workflow_invocations(
self,
invocations,
params: InvocationSerializationParams,
default_view: InvocationSerializationView = InvocationSerializationView.collection,
):
return [self.serialize_workflow_invocation(i, params, default_view=default_view) for i in invocations]
[docs]
def serialize_workflow_invocation_step(
self,
invocation_step: WorkflowInvocationStep,
):
return invocation_step.to_dict("element")
[docs]
def create_from_store(
self,
trans: ProvidesHistoryContext,
payload: CreateInvocationFromStore,
serialization_params: InvocationSerializationParams,
):
history = self._histories_manager.get_owned(payload.history_id, trans.user, current_history=trans.history)
object_tracker = self.create_objects_from_store(
trans,
payload,
history=history,
)
return self.serialize_workflow_invocations(object_tracker.invocations_by_key.values(), serialization_params)
[docs]
def serialize_workflow_invocation_to_request(
self, trans: ProvidesUserContext, invocation: WorkflowInvocation
) -> WorkflowInvocationRequestModel:
history_id = trans.security.encode_id(invocation.history.id)
workflow_id = trans.security.encode_id(invocation.workflow.id)
inputs, inputs_by = invocation.recover_inputs()
export_inputs = {}
for key, value in inputs.items():
if isinstance(value, HistoryDatasetAssociation):
export_inputs[key] = {"src": "hda", "id": trans.security.encode_id(value.id)}
elif isinstance(value, HistoryDatasetCollectionAssociation):
export_inputs[key] = {"src": "hdca", "id": trans.security.encode_id(value.id)}
else:
export_inputs[key] = value
param_types = WorkflowRequestInputParameter.types
steps_by_id = invocation.workflow.steps_by_id
replacement_dict = {}
resource_params = {}
use_cached_job = False
preferred_object_store_id = None
preferred_intermediate_object_store_id = None
preferred_outputs_object_store_id = None
step_param_map: Dict[str, Dict] = {}
for parameter in invocation.input_parameters:
parameter_type = parameter.type
if parameter_type == param_types.REPLACEMENT_PARAMETERS:
replacement_dict[parameter.name] = parameter.value
elif parameter_type == param_types.META_PARAMETERS:
# copy_inputs_to_history is being skipped here sort of intentionally,
# we wouldn't want to include this on re-run.
if parameter.name == "use_cached_job":
use_cached_job = parameter.value == "true"
if parameter.name == "preferred_object_store_id":
preferred_object_store_id = parameter.value
if parameter.name == "preferred_intermediate_object_store_id":
preferred_intermediate_object_store_id = parameter.value
if parameter.name == "preferred_outputs_object_store_id":
preferred_outputs_object_store_id = parameter.value
elif parameter_type == param_types.RESOURCE_PARAMETERS:
resource_params[parameter.name] = parameter.value
elif parameter_type == param_types.STEP_PARAMETERS:
# TODO: test subworkflows and ensure this works
step_id: int
try:
step_id = int(parameter.name)
except TypeError:
raise InconsistentDatabase("saved workflow step parameters not in the format expected")
step_param_map[str(steps_by_id[step_id].order_index)] = json.loads(parameter.value)
return WorkflowInvocationRequestModel(
history_id=history_id,
workflow_id=workflow_id,
instance=True, # this is a workflow ID and not a stored workflow ID
inputs=export_inputs,
inputs_by=inputs_by,
replacement_params=None if not replacement_dict else replacement_dict,
resource_params=None if not resource_params else resource_params,
use_cached_job=use_cached_job,
preferred_object_store_id=preferred_object_store_id,
preferred_intermediate_object_store_id=preferred_intermediate_object_store_id,
preferred_outputs_object_store_id=preferred_outputs_object_store_id,
parameters_normalized=True,
parameters=None if not step_param_map else step_param_map,
)