Source code for

Abstract base class for cli job plugins.
from abc import (
from enum import Enum

    from galaxy.model import Job

    job_states = Job.states
except ImportError:

    # Not in Galaxy, map Galaxy job states to Pulsar ones.
    class job_states(str, Enum):  # type: ignore[no-redef]
        RUNNING = "running"
        OK = "complete"
        QUEUED = "queued"
        ERROR = "failed"

[docs]class BaseJobExec(metaclass=ABCMeta):
[docs] def __init__(self, **params): """ Constructor for CLI job executor. """ self.params = params.copy()
[docs] def job_script_kwargs(self, ofile, efile, job_name): """Return extra keyword argument for consumption by job script module. """ return {}
[docs] @abstractmethod def submit(self, script_file): """ Given specified script_file path, yield command to submit it to external job manager. """
[docs] @abstractmethod def delete(self, job_id): """ Given job id, return command to stop execution or dequeue specified job. """
[docs] @abstractmethod def get_status(self, job_ids=None): """ Return command to get statuses of specified job ids. """
[docs] @abstractmethod def get_single_status(self, job_id): """ Return command to get the status of a single, specified job. """
[docs] @abstractmethod def parse_status(self, status, job_ids): """ Parse the statuses of output from get_status command. """
[docs] @abstractmethod def parse_single_status(self, status, job_id): """ Parse the status of output from get_single_status command. """
[docs] def get_failure_reason(self, job_id): """ Return the failure reason for the given job_id. """ return None
[docs] def parse_failure_reason(self, reason, job_id): """ Parses the failure reason, assigning it against a """ return None
__all__ = ( "BaseJobExec", "job_states", )