Source code for galaxy.jobs.runners.util.cli.job.torque

from logging import getLogger

from galaxy.util import parse_xml_string
from . import (
    BaseJobExec,
    job_states,
)

log = getLogger(__name__)


argmap = {
    "destination": "-q",
    "Execution_Time": "-a",
    "Account_Name": "-A",
    "Checkpoint": "-c",
    "Error_Path": "-e",
    "Group_List": "-g",
    "Hold_Types": "-h",
    "Join_Paths": "-j",
    "Keep_Files": "-k",
    "Resource_List": "-l",
    "Mail_Points": "-m",
    "Mail_Users": "-M",
    "Job_Name": "-N",
    "Output_Path": "-o",
    "Priority": "-p",
    "Rerunable": "-r",
    "Shell_Path_List": "-S",
    "job_array_request": "-t",
    "User_List": "-u",
    "Variable_List": "-v",
}


[docs]class Torque(BaseJobExec): ERROR_MESSAGE_UNRECOGNIZED_ARG = "Unrecognized long argument passed to Torque CLI plugin: %s"
[docs] def job_script_kwargs(self, ofile, efile, job_name): pbsargs = {"-o": ofile, "-e": efile, "-N": job_name} for k, v in self.params.items(): if k == "plugin": continue try: if not k.startswith("-"): k = argmap[k] pbsargs[k] = v except KeyError: log.warning(self.ERROR_MESSAGE_UNRECOGNIZED_ARG, k) template_pbsargs = "" for k, v in pbsargs.items(): template_pbsargs += f"#PBS {k} {v}\n" return dict(headers=template_pbsargs)
[docs] def submit(self, script_file): return f"qsub {script_file}"
[docs] def delete(self, job_id): return f"qdel {job_id}"
[docs] def get_status(self, job_ids=None): return "qstat -x"
[docs] def get_single_status(self, job_id): return f"qstat -f {job_id}"
[docs] def parse_status(self, status, job_ids): # in case there's noise in the output, find the big blob 'o xml tree = None rval = {} for line in status.strip().splitlines(): try: tree = parse_xml_string(line.strip()) assert tree.tag == "Data" break except Exception: tree = None if tree is None: log.warning(f"No valid qstat XML return from `qstat -x`, got the following: {status}") return {} else: for job in tree.findall("Job"): job_id_elem = job.find("Job_Id") assert job_id_elem is not None id_ = job_id_elem.text if id_ in job_ids: job_state_elem = job.find("job_state") assert job_state_elem is not None state = job_state_elem.text assert state # map PBS job states to Galaxy job states. rval[id_] = self._get_job_state(state) return rval
[docs] def parse_single_status(self, status, job_id): for line in status.splitlines(): line = line.split(" = ") if line[0].strip() == "job_state": return self._get_job_state(line[1].strip()) # no state found, job has exited return job_states.OK
def _get_job_state(self, state: str) -> job_states: try: return {"E": job_states.RUNNING, "R": job_states.RUNNING, "Q": job_states.QUEUED, "C": job_states.OK}[state] except KeyError: raise KeyError(f"Failed to map torque status code [{state}] to job state.")
__all__ = ("Torque",)