import json
import logging
from typing import (
Any,
Dict,
List,
Tuple,
)
from pydantic import Field
from galaxy.celery.tasks import (
prepare_invocation_download,
write_invocation_to,
)
from galaxy.exceptions import (
AdminRequiredException,
InconsistentDatabase,
ObjectNotFound,
)
from galaxy.managers.context import (
ProvidesHistoryContext,
ProvidesUserContext,
)
from galaxy.managers.histories import HistoryManager
from galaxy.managers.jobs import (
fetch_job_states,
get_job_metrics_for_invocation,
invocation_job_source_iter,
summarize_metrics,
)
from galaxy.managers.workflows import WorkflowsManager
from galaxy.model import (
HistoryDatasetAssociation,
HistoryDatasetCollectionAssociation,
WorkflowInvocation,
WorkflowInvocationStep,
WorkflowRequestInputParameter,
)
from galaxy.schema.fields import DecodedDatabaseIdField
from galaxy.schema.invocation import (
CreateInvocationFromStore,
InvocationSerializationParams,
InvocationSerializationView,
InvocationStep,
WorkflowInvocationRequestModel,
WorkflowInvocationResponse,
)
from galaxy.schema.schema import (
AsyncFile,
AsyncTaskResultSummary,
BcoGenerationParametersMixin,
InvocationIndexQueryPayload,
StoreExportPayload,
WriteStoreToPayload,
)
from galaxy.schema.tasks import (
GenerateInvocationDownload,
WriteInvocationTo,
)
from galaxy.security.idencoding import IdEncodingHelper
from galaxy.short_term_storage import ShortTermStorageAllocator
from galaxy.webapps.galaxy.services.base import (
async_task_summary,
ConsumesModelStores,
ensure_celery_tasks_enabled,
model_store_storage_target,
ServiceBase,
)
log = logging.getLogger(__name__)
[docs]
class InvocationIndexPayload(InvocationIndexQueryPayload):
instance: bool = Field(default=False, description="Is provided workflow id for Workflow instead of StoredWorkflow?")
[docs]
class PrepareStoreDownloadPayload(StoreExportPayload, BcoGenerationParametersMixin):
pass
[docs]
class WriteInvocationStoreToPayload(WriteStoreToPayload, BcoGenerationParametersMixin):
pass
[docs]
class InvocationsService(ServiceBase, ConsumesModelStores):
[docs]
def __init__(
self,
security: IdEncodingHelper,
histories_manager: HistoryManager,
workflows_manager: WorkflowsManager,
short_term_storage_allocator: ShortTermStorageAllocator,
):
super().__init__(security=security)
self._histories_manager = histories_manager
self._workflows_manager = workflows_manager
self.short_term_storage_allocator = short_term_storage_allocator
[docs]
def index(
self, trans, invocation_payload: InvocationIndexPayload, serialization_params: InvocationSerializationParams
) -> Tuple[List[WorkflowInvocationResponse], int]:
workflow_id = invocation_payload.workflow_id
if invocation_payload.instance:
instance = invocation_payload.instance
invocation_payload.workflow_id = self._workflows_manager.get_stored_workflow(
trans, workflow_id, by_stored_id=not instance
).id
if invocation_payload.history_id:
# access check
self._histories_manager.get_accessible(
invocation_payload.history_id, trans.user, current_history=trans.history
)
if not trans.user_is_admin:
# We restrict the query to the current users' invocations
# Endpoint requires user login, so trans.user.id is never None
# TODO: user_id should be optional!
user_id = trans.user.id
if invocation_payload.user_id and invocation_payload.user_id != user_id:
raise AdminRequiredException("Only admins can index the invocations of others")
else:
# Get all invocations if user is admin (and user_id is None).
# xref https://github.com/galaxyproject/galaxy/pull/13862#discussion_r865732297
user_id = invocation_payload.user_id
invocations, total_matches = self._workflows_manager.build_invocations_query(
trans,
stored_workflow_id=invocation_payload.workflow_id,
history_id=invocation_payload.history_id,
job_id=invocation_payload.job_id,
user_id=user_id,
include_terminal=invocation_payload.include_terminal,
limit=invocation_payload.limit,
offset=invocation_payload.offset,
sort_by=invocation_payload.sort_by,
sort_desc=invocation_payload.sort_desc,
include_nested_invocations=invocation_payload.include_nested_invocations,
check_ownership=False,
)
invocation_dict = self.serialize_workflow_invocations(invocations, serialization_params)
return invocation_dict, total_matches
[docs]
def show(self, trans, invocation_id, serialization_params):
wfi = self._workflows_manager.get_invocation(trans, invocation_id, check_ownership=False, check_accessible=True)
return self.serialize_workflow_invocation(wfi, serialization_params)
[docs]
def as_request(self, trans: ProvidesUserContext, invocation_id) -> WorkflowInvocationRequestModel:
wfi = self._workflows_manager.get_invocation(trans, invocation_id, check_ownership=True, check_accessible=True)
return self.serialize_workflow_invocation_to_request(trans, wfi)
[docs]
def cancel(self, trans, invocation_id, serialization_params):
wfi = self._workflows_manager.request_invocation_cancellation(trans, invocation_id)
return self.serialize_workflow_invocation(wfi, serialization_params)
[docs]
def show_invocation_report(self, trans, invocation_id, format="json"):
wfi_report = self._workflows_manager.get_invocation_report(trans, invocation_id, format=format)
return wfi_report
[docs]
def show_invocation_step(self, trans, step_id) -> InvocationStep:
wfi_step = self._workflows_manager.get_invocation_step(
trans, step_id, check_ownership=False, check_accessible=True
)
return self.serialize_workflow_invocation_step(wfi_step)
[docs]
def show_invocation_metrics(self, trans: ProvidesHistoryContext, invocation_id: int):
extended_job_metrics = get_job_metrics_for_invocation(trans.sa_session, invocation_id)
job_metrics = []
job_ids = []
tool_ids = []
step_indexes = []
step_labels = []
for row in extended_job_metrics:
step_indexes.append(row[0])
job_ids.append(row[1])
tool_ids.append(row[2])
step_labels.append(row[3])
job_metrics.append(row[4])
metrics_dict_list = summarize_metrics(trans, job_metrics)
for tool_id, job_id, step_index, step_label, metrics_dict in zip(
tool_ids, job_ids, step_indexes, step_labels, metrics_dict_list
):
metrics_dict["tool_id"] = tool_id
metrics_dict["job_id"] = trans.security.encode_id(job_id)
metrics_dict["step_index"] = step_index
metrics_dict["step_label"] = step_label
return metrics_dict_list
[docs]
def update_invocation_step(self, trans, step_id, action):
wfi_step = self._workflows_manager.update_invocation_step(trans, step_id, action)
return self.serialize_workflow_invocation_step(wfi_step)
[docs]
def show_invocation_step_jobs_summary(self, trans, invocation_id) -> List[Dict[str, Any]]:
ids = []
types = []
for job_source_type, job_source_id, _ in invocation_job_source_iter(trans.sa_session, invocation_id):
ids.append(job_source_id)
types.append(job_source_type)
return fetch_job_states(trans.sa_session, ids, types)
[docs]
def show_invocation_jobs_summary(self, trans, invocation_id) -> Dict[str, Any]:
ids = [invocation_id]
types = ["WorkflowInvocation"]
return fetch_job_states(trans.sa_session, ids, types)[0]
[docs]
def prepare_store_download(
self, trans, invocation_id: DecodedDatabaseIdField, payload: PrepareStoreDownloadPayload
) -> AsyncFile:
ensure_celery_tasks_enabled(trans.app.config)
model_store_format = payload.model_store_format
workflow_invocation = self._workflows_manager.get_invocation(
trans, invocation_id, check_ownership=False, check_accessible=True
)
if not workflow_invocation:
raise ObjectNotFound()
try:
invocation_name = f"Invocation of {workflow_invocation.workflow.stored_workflow.name} at {workflow_invocation.create_time.isoformat()}"
except AttributeError:
invocation_name = f"Invocation of workflow at {workflow_invocation.create_time.isoformat()}"
short_term_storage_target = model_store_storage_target(
self.short_term_storage_allocator,
invocation_name,
model_store_format,
)
request = GenerateInvocationDownload(
short_term_storage_request_id=short_term_storage_target.request_id,
user=trans.async_request_user,
invocation_id=workflow_invocation.id,
galaxy_url=trans.request.url_path,
**payload.model_dump(),
)
result = prepare_invocation_download.delay(request=request, task_user_id=getattr(trans.user, "id", None))
return AsyncFile(storage_request_id=short_term_storage_target.request_id, task=async_task_summary(result))
[docs]
def write_store(
self, trans, invocation_id: DecodedDatabaseIdField, payload: WriteInvocationStoreToPayload
) -> AsyncTaskResultSummary:
ensure_celery_tasks_enabled(trans.app.config)
workflow_invocation = self._workflows_manager.get_invocation(
trans, invocation_id, check_ownership=False, check_accessible=True
)
if not workflow_invocation:
raise ObjectNotFound()
request = WriteInvocationTo(
galaxy_url=trans.request.url_path,
user=trans.async_request_user,
invocation_id=workflow_invocation.id,
**payload.model_dump(),
)
result = write_invocation_to.delay(request=request, task_user_id=getattr(trans.user, "id", None))
rval = async_task_summary(result)
return rval
[docs]
def serialize_workflow_invocation(
self,
invocation: WorkflowInvocation,
params: InvocationSerializationParams,
default_view: InvocationSerializationView = InvocationSerializationView.element,
):
view = params.view or default_view
step_details = params.step_details
legacy_job_state = params.legacy_job_state
as_dict = invocation.to_dict(view.value, step_details=step_details, legacy_job_state=legacy_job_state)
as_dict["messages"] = invocation.messages
return WorkflowInvocationResponse(**as_dict)
[docs]
def serialize_workflow_invocations(
self,
invocations,
params: InvocationSerializationParams,
default_view: InvocationSerializationView = InvocationSerializationView.collection,
):
return [self.serialize_workflow_invocation(i, params, default_view=default_view) for i in invocations]
[docs]
def serialize_workflow_invocation_step(
self,
invocation_step: WorkflowInvocationStep,
):
return invocation_step.to_dict("element")
[docs]
def create_from_store(
self,
trans: ProvidesHistoryContext,
payload: CreateInvocationFromStore,
serialization_params: InvocationSerializationParams,
):
history = self._histories_manager.get_owned(payload.history_id, trans.user, current_history=trans.history)
object_tracker = self.create_objects_from_store(
trans,
payload,
history=history,
)
return self.serialize_workflow_invocations(object_tracker.invocations_by_key.values(), serialization_params)
[docs]
def serialize_workflow_invocation_to_request(
self, trans: ProvidesUserContext, invocation: WorkflowInvocation
) -> WorkflowInvocationRequestModel:
history_id = trans.security.encode_id(invocation.history.id)
workflow_id = trans.security.encode_id(invocation.workflow.id)
inputs, inputs_by = invocation.recover_inputs()
export_inputs = {}
for key, value in inputs.items():
if isinstance(value, HistoryDatasetAssociation):
export_inputs[key] = {"src": "hda", "id": trans.security.encode_id(value.id)}
elif isinstance(value, HistoryDatasetCollectionAssociation):
export_inputs[key] = {"src": "hdca", "id": trans.security.encode_id(value.id)}
else:
export_inputs[key] = value
param_types = WorkflowRequestInputParameter.types
steps_by_id = invocation.workflow.steps_by_id
replacement_dict = {}
resource_params = {}
use_cached_job = False
preferred_object_store_id = None
preferred_intermediate_object_store_id = None
preferred_outputs_object_store_id = None
step_param_map: Dict[str, Dict] = {}
for parameter in invocation.input_parameters:
parameter_type = parameter.type
if parameter_type == param_types.REPLACEMENT_PARAMETERS:
replacement_dict[parameter.name] = parameter.value
elif parameter_type == param_types.META_PARAMETERS:
# copy_inputs_to_history is being skipped here sort of intentionally,
# we wouldn't want to include this on re-run.
if parameter.name == "use_cached_job":
use_cached_job = parameter.value == "true"
if parameter.name == "preferred_object_store_id":
preferred_object_store_id = parameter.value
if parameter.name == "preferred_intermediate_object_store_id":
preferred_intermediate_object_store_id = parameter.value
if parameter.name == "preferred_outputs_object_store_id":
preferred_outputs_object_store_id = parameter.value
elif parameter_type == param_types.RESOURCE_PARAMETERS:
resource_params[parameter.name] = parameter.value
elif parameter_type == param_types.STEP_PARAMETERS:
# TODO: test subworkflows and ensure this works
step_id: int
try:
step_id = int(parameter.name)
except TypeError:
raise InconsistentDatabase("saved workflow step parameters not in the format expected")
step_param_map[str(steps_by_id[step_id].order_index)] = json.loads(parameter.value)
return WorkflowInvocationRequestModel(
history_id=history_id,
workflow_id=workflow_id,
instance=True, # this is a workflow ID and not a stored workflow ID
inputs=export_inputs,
inputs_by=inputs_by,
replacement_params=None if not replacement_dict else replacement_dict,
resource_params=None if not resource_params else resource_params,
use_cached_job=use_cached_job,
preferred_object_store_id=preferred_object_store_id,
preferred_intermediate_object_store_id=preferred_intermediate_object_store_id,
preferred_outputs_object_store_id=preferred_outputs_object_store_id,
parameters_normalized=True,
parameters=None if not step_param_map else step_param_map,
)