# Source code for galaxy.webapps.galaxy.api.tasks

"""
API Controller providing experimental access to Celery Task State.
"""

import logging
from uuid import UUID

from galaxy.managers.tasks import (
    AsyncTasksManager,
    TaskState,
)
from . import (
    depends,
    Router,
)

# Module-level logger, named after this module via __name__.
log = logging.getLogger(__name__)

# Router instance whose decorators (router.cbv / router.get) are applied to the
# definitions in this module; it is created with the single tag "tasks".
router = Router(tags=["tasks"])


@router.cbv
class FastAPITasks:
    """Class-based view exposing the state of asynchronous tasks.

    The view delegates to an ``AsyncTasksManager`` obtained through
    ``depends``; it holds no other state of its own.
    """

    # The manager is supplied via ``depends`` rather than constructed here.
    # The ``type: ignore`` silences the type checker's "type-abstract"
    # complaint about passing ``AsyncTasksManager`` itself as the argument.
    manager: AsyncTasksManager = depends(AsyncTasksManager)  # type: ignore[type-abstract]

    @router.get(
        "/api/tasks/{task_id}/state",
        public=True,
        summary="Determine state of task ID",
        response_description="String indicating task state.",
    )
    def state(self, task_id: UUID) -> TaskState:
        # Return the state the manager reports for ``task_id``.
        #
        # ``task_id`` is taken from the ``{task_id}`` segment of the route path
        # and is annotated as a UUID; the result is whatever
        # ``AsyncTasksManager.get_state`` returns, annotated as ``TaskState``.
        #
        # NOTE(review): this is deliberately a comment, not a docstring.
        # FastAPI-style routers typically publish an endpoint's docstring as
        # the operation description, which would alter the generated API
        # schema -- confirm against ``Router`` before converting it.
        # NOTE(review): ``public=True`` presumably marks the route as usable
        # without authentication -- confirm against ``Router.get``.
        return self.manager.get_state(task_id)