Warning

This document is for an in-development version of Galaxy. You can alternatively view this page in the latest release if it exists or view the top of the latest release's documentation.

Source code for galaxy.jobs.runners.util.cli.job.torque

from logging import getLogger

from galaxy.util import parse_xml_string
from . import (
    BaseJobExec,
    job_states,
)

log = getLogger(__name__)


argmap = {
    "destination": "-q",
    "Execution_Time": "-a",
    "Account_Name": "-A",
    "Checkpoint": "-c",
    "Error_Path": "-e",
    "Group_List": "-g",
    "Hold_Types": "-h",
    "Join_Paths": "-j",
    "Keep_Files": "-k",
    "Resource_List": "-l",
    "Mail_Points": "-m",
    "Mail_Users": "-M",
    "Job_Name": "-N",
    "Output_Path": "-o",
    "Priority": "-p",
    "Rerunable": "-r",
    "Shell_Path_List": "-S",
    "job_array_request": "-t",
    "User_List": "-u",
    "Variable_List": "-v",
}


[docs]class Torque(BaseJobExec): ERROR_MESSAGE_UNRECOGNIZED_ARG = "Unrecognized long argument passed to Torque CLI plugin: %s"
[docs] def job_script_kwargs(self, ofile, efile, job_name): pbsargs = {"-o": ofile, "-e": efile, "-N": job_name} for k, v in self.params.items(): if k == "plugin": continue try: if not k.startswith("-"): k = argmap[k] pbsargs[k] = v except KeyError: log.warning(self.ERROR_MESSAGE_UNRECOGNIZED_ARG, k) template_pbsargs = "" for k, v in pbsargs.items(): template_pbsargs += f"#PBS {k} {v}\n" return dict(headers=template_pbsargs)
[docs] def submit(self, script_file): return f"qsub {script_file}"
[docs] def delete(self, job_id): return f"qdel {job_id}"
[docs] def get_status(self, job_ids=None): return "qstat -x"
[docs] def get_single_status(self, job_id): return f"qstat -f {job_id}"
[docs] def parse_status(self, status, job_ids): # in case there's noise in the output, find the big blob 'o xml tree = None rval = {} for line in status.strip().splitlines(): try: tree = parse_xml_string(line.strip()) assert tree.tag == "Data" break except Exception: tree = None if tree is None: log.warning(f"No valid qstat XML return from `qstat -x`, got the following: {status}") return {} else: for job in tree.findall("Job"): job_id_elem = job.find("Job_Id") assert job_id_elem is not None id_ = job_id_elem.text if id_ in job_ids: job_state_elem = job.find("job_state") assert job_state_elem is not None state = job_state_elem.text assert state # map PBS job states to Galaxy job states. rval[id_] = self._get_job_state(state) return rval
[docs] def parse_single_status(self, status, job_id): for line in status.splitlines(): line = line.split(" = ") if line[0].strip() == "job_state": return self._get_job_state(line[1].strip()) # no state found, job has exited return job_states.OK
def _get_job_state(self, state: str) -> job_states: try: return {"E": job_states.RUNNING, "R": job_states.RUNNING, "Q": job_states.QUEUED, "C": job_states.OK}[state] except KeyError: raise KeyError(f"Failed to map torque status code [{state}] to job state.")
__all__ = ("Torque",)