Source code for galaxy.jobs.runners.util.cli.job

"""
Abstract base class for cli job plugins.
"""

from abc import (
    ABCMeta,
    abstractmethod,
)
from enum import Enum
from typing import (
    Dict,
    List,
)

from typing_extensions import TypeAlias

# Bind ``job_states`` at import time: use Galaxy's own job states when the
# ``galaxy.model`` package is importable, otherwise fall back to a local enum.
try:
    from galaxy.model import Job

    # Inside Galaxy: ``job_states`` is simply an alias for ``Job.states``.
    job_states: TypeAlias = Job.states
except ImportError:
    # Not in Galaxy, map Galaxy job states to Pulsar ones.
    class job_states(str, Enum):  # type: ignore[no-redef]
        # Members are ``str`` subclasses, so each compares equal to its plain
        # string value (e.g. ``job_states.OK == "complete"``).
        # NOTE(review): member names presumably mirror Galaxy's state names and
        # the values are the Pulsar spellings -- confirm against both projects.
        RUNNING = "running"
        OK = "complete"
        QUEUED = "queued"
        ERROR = "failed"


class BaseJobExec(metaclass=ABCMeta):
    """
    Abstract base class for CLI job plugins.

    Concrete plugins must implement every method marked abstract below; the
    remaining methods have default implementations that subclasses may
    override.
    """

    def __init__(self, **params):
        """
        Constructor for CLI job executor.

        :param params: arbitrary keyword options for the plugin. A shallow
            copy is stored on ``self.params``.
        """
        self.params = params.copy()

    def job_script_kwargs(self, ofile, efile, job_name):
        """
        Return extra keyword arguments for consumption by the job script
        module.

        The base implementation ignores its arguments and returns an empty
        dict.
        """
        return {}

    @abstractmethod
    def submit(self, script_file):
        """
        Given the specified script_file path, yield the command to submit it
        to the external job manager.
        """

    @abstractmethod
    def delete(self, job_id):
        """
        Given a job id, return the command to stop execution of or dequeue
        the specified job.
        """

    @abstractmethod
    def get_status(self, job_ids=None):
        """
        Return the command to get the statuses of the specified job ids.
        """

    @abstractmethod
    def get_single_status(self, job_id):
        """
        Return the command to get the status of a single, specified job.
        """

    @abstractmethod
    def parse_status(self, status: str, job_ids: List[str]) -> Dict[str, job_states]:
        """
        Parse the statuses from the output of the get_status command.

        :param status: output produced by the get_status command.
        :param job_ids: ids of the jobs whose statuses are wanted.
        :returns: mapping of job id to job state.
        """

    @abstractmethod
    def parse_single_status(self, status: str, job_id: str) -> job_states:
        """
        Parse the status from the output of the get_single_status command.

        :param status: output produced by the get_single_status command.
        :param job_id: id of the job the output refers to.
        :returns: the state of that job.
        """

    def get_failure_reason(self, job_id):
        """
        Return the failure reason for the given job_id.

        The base implementation returns ``None``.
        """
        return None

    def parse_failure_reason(self, reason, job_id):
        """
        Parse the failure reason reported for the given job_id.

        The base implementation ignores its arguments and returns ``None``.
        """
        return None
# Public names exported by this module.
__all__ = (
    "BaseJobExec",
    "job_states",
)