from logging import getLogger

from galaxy.util import parse_xml_string
from ..job import (
    BaseJobExec,
    job_states,
)

log = getLogger(__name__)

# Translation table from long, human-readable Torque/PBS attribute names to
# the short command-line flags emitted in `#PBS` directives.
argmap = {
    "destination": "-q",
    "Execution_Time": "-a",
    "Account_Name": "-A",
    "Checkpoint": "-c",
    "Error_Path": "-e",
    "Group_List": "-g",
    "Hold_Types": "-h",
    "Join_Paths": "-j",
    "Keep_Files": "-k",
    "Resource_List": "-l",
    "Mail_Points": "-m",
    "Mail_Users": "-M",
    "Job_Name": "-N",
    "Output_Path": "-o",
    "Priority": "-p",
    "Rerunable": "-r",
    "Shell_Path_List": "-S",
    "job_array_request": "-t",
    "User_List": "-u",
    "Variable_List": "-v",
}

class Torque(BaseJobExec):
    """Torque/PBS flavour of the CLI job plugin.

    Produces the ``qsub`` / ``qdel`` / ``qstat`` command strings and turns
    ``qstat`` output into job-state constants from ``job_states``.
    """

    ERROR_MESSAGE_UNRECOGNIZED_ARG = "Unrecognized long argument passed to Torque CLI plugin: %s"

    def job_script_kwargs(self, ofile, efile, job_name):
        """Build the ``#PBS`` header block for a job script.

        :param ofile: value for the ``-o`` directive.
        :param efile: value for the ``-e`` directive.
        :param job_name: value for the ``-N`` directive.
        :returns: ``{"headers": <str>}`` with one ``#PBS <flag> <value>`` line
            per directive.

        Entries of ``self.params`` are layered on top of the three defaults
        and may override them. The ``"plugin"`` key is skipped. Keys that do
        not start with ``-`` are translated through ``argmap``; unknown long
        names are logged and dropped.
        """
        pbsargs = {"-o": ofile, "-e": efile, "-N": job_name}
        for name, value in self.params.items():
            if name == "plugin":
                continue
            flag = name
            if not name.startswith("-"):
                # Only the table lookup can fail, so keep the try minimal.
                try:
                    flag = argmap[name]
                except KeyError:
                    log.warning(self.ERROR_MESSAGE_UNRECOGNIZED_ARG, name)
                    continue
            pbsargs[flag] = value
        headers = "".join(f"#PBS {flag} {value}\n" for flag, value in pbsargs.items())
        return dict(headers=headers)

    def submit(self, script_file):
        """Return the command that submits ``script_file``."""
        return f"qsub {script_file}"

    def delete(self, job_id):
        """Return the command that deletes job ``job_id``."""
        return f"qdel {job_id}"

    def get_status(self, job_ids=None):
        """Return the command that reports all jobs as XML.

        ``job_ids`` is accepted but not used when building the command.
        """
        return "qstat -x"

    def get_single_status(self, job_id):
        """Return the command that reports the full status of ``job_id``."""
        return f"qstat -f {job_id}"

    def parse_status(self, status, job_ids):
        """Parse ``qstat -x`` output into a mapping of job id to job state.

        :param status: raw text output of the status command.
        :param job_ids: collection of job ids to report on; other jobs found
            in the output are ignored.
        :returns: ``{job_id: state}`` for the matching jobs, or ``None`` when
            no line of ``status`` parses as XML with a ``Data`` root element.
        :raises KeyError: if a matching job carries an unknown state code.
        """
        # The output may contain noise, so probe line by line for the XML blob.
        tree = None
        for line in status.strip().splitlines():
            try:
                candidate = parse_xml_string(line.strip())
            except Exception:
                # Not XML; keep looking. The exceptions raised by the parser
                # helper are not visible from here, hence the broad catch.
                continue
            # Explicit check rather than `assert`, which vanishes under -O.
            if getattr(candidate, "tag", None) == "Data":
                tree = candidate
                break

        if tree is None:
            log.warning("No valid qstat XML return from `qstat -x`, got the following: %s", status)
            return None

        rval = {}
        for job in tree.findall("Job"):
            job_id = job.find("Job_Id").text
            if job_id in job_ids:
                state = job.find("job_state").text
                # Map PBS job states to Galaxy job states.
                rval[job_id] = self._get_job_state(state)
        return rval

    def parse_single_status(self, status, job_id):
        """Parse ``qstat -f`` output for one job into a job state.

        :param status: raw text output of the single-job status command.
        :param job_id: accepted but not used while parsing.
        :returns: the mapped state of the first ``job_state = X`` line, or
            ``job_states.OK`` when there is no such line.
        :raises KeyError: if the reported state code is unknown.
        """
        for line in status.splitlines():
            fields = line.split(" = ")
            # Require a value field so a bare "job_state" line cannot raise
            # IndexError.
            if len(fields) > 1 and fields[0].strip() == "job_state":
                return self._get_job_state(fields[1].strip())
        # No state found, job has exited.
        return job_states.OK

    def _get_job_state(self, state):
        """Translate a one-letter Torque state code into a job state.

        :raises KeyError: if ``state`` is not one of ``E``, ``R``, ``Q``, ``C``.
        """
        state_map = {
            "E": job_states.RUNNING,
            "R": job_states.RUNNING,
            "Q": job_states.QUEUED,
            "C": job_states.OK,
        }
        try:
            # Subscript, not .get(): .get() returns None for unknown codes, so
            # the error below could never be raised.
            return state_map[state]
        except KeyError:
            raise KeyError(f"Failed to map torque status code [{state}] to job state.") from None
# Names exported by `from <module> import *`.
__all__ = ("Torque",)