This document is for an old release of Galaxy. You can alternatively view this page in the latest release if it exists or view the top of the latest release's documentation.

Source code for galaxy.webapps.galaxy.api.tasks

API Controller providing experimental access to Celery Task State.
import logging

from celery.result import AsyncResult

from . import Router

log = logging.getLogger(__name__)

router = Router(tags=["tasks"])

[docs]@router.cbv class FastAPITasks:
[docs] @router.get( "/api/tasks/{task_id}/state", summary="Determine state of task ID", response_description="String indicating task state.", ) def state(self, task_id: str) -> str: res = AsyncResult(str(task_id)) return str(res.state)