Warning
This document is for an old release of Galaxy. You can alternatively view this page in the latest release if it exists or view the top of the latest release's documentation.
Source code for galaxy.jobs.runners.util.cli.job.pbs
import json
from logging import getLogger
from .torque import Torque
log = getLogger(__name__)
[docs]class OpenPBS(Torque):
ERROR_MESSAGE_UNRECOGNIZED_ARG = "Unrecognized long argument passed to OpenPBS CLI plugin: %s"
[docs] def parse_status(self, status, job_ids):
try:
data = json.loads(status)
except Exception:
log.warning(f"No valid qstat JSON return from `qstat -f -F json`, got the following: {status}")
rval = {}
for job_id, job in data.get("Jobs", {}).items():
if job_id in job_ids:
# map PBS job states to Galaxy job states.
rval[id] = self._get_job_state(job["job_state"])
return rval
__all__ = ("OpenPBS",)