Warning

This document is for an in-development version of Galaxy. You can alternatively view this page in the latest release if it exists or view the top of the latest release's documentation.

Source code for galaxy.jobs.runners.util.cli.job.pbs

import json
from logging import getLogger

from .torque import Torque

log = getLogger(__name__)


[docs]class OpenPBS(Torque): ERROR_MESSAGE_UNRECOGNIZED_ARG = "Unrecognized long argument passed to OpenPBS CLI plugin: %s"
[docs] def get_status(self, job_ids=None): return "qstat -f -F json"
[docs] def get_single_status(self, job_id): return f"qstat -f {job_id}"
[docs] def parse_status(self, status, job_ids): try: data = json.loads(status) except Exception: log.warning(f"No valid qstat JSON return from `qstat -f -F json`, got the following: {status}") rval = {} for job_id, job in data.get("Jobs", {}).items(): if job_id in job_ids: # map PBS job states to Galaxy job states. rval[id] = self._get_job_state(job["job_state"]) return rval
__all__ = ("OpenPBS",)