Source code for galaxy.jobs.runners.util.cli.job.pbs

import json
from logging import getLogger

from .torque import Torque

log = getLogger(__name__)


[docs]class OpenPBS(Torque): ERROR_MESSAGE_UNRECOGNIZED_ARG = "Unrecognized long argument passed to OpenPBS CLI plugin: %s"
[docs] def get_status(self, job_ids=None): return "qstat -f -F json"
[docs] def get_single_status(self, job_id): return f"qstat -f {job_id}"
[docs] def parse_status(self, status, job_ids): try: data = json.loads(status) except Exception: log.warning(f"No valid qstat JSON return from `qstat -f -F json`, got the following: {status}") rval = {} for job_id, job in data.get("Jobs", {}).items(): if job_id in job_ids: # map PBS job states to Galaxy job states. rval[id] = self._get_job_state(job["job_state"]) return rval
__all__ = ("OpenPBS",)