import logging
import os
import time
import traceback
from datetime import timedelta
try:
import pbs
PBS_IMPORT_MESSAGE = None
except ImportError as exc:
pbs = None
PBS_IMPORT_MESSAGE = (
"The Python pbs-python package is required to use "
"this feature, please install it or correct the "
f"following error:\nImportError {exc}"
)
from galaxy import (
model,
util,
)
from galaxy.jobs import JobDestination
from galaxy.jobs.runners import (
AsynchronousJobRunner,
AsynchronousJobState,
)
from galaxy.util.bunch import Bunch
log = logging.getLogger(__name__)
__all__ = ("PBSJobRunner",)
CLUSTER_ERROR_MESSAGE = "Job cannot be completed due to a cluster error, please retry it later: %s"
# Staging template prepended to the job script when pbs_stage_path is set: link each
# dataset path to its staged copy on the execution host and make sure the job's
# working directory exists before the command runs.
pbs_symlink_template = """
for dataset in %s; do
dir=`dirname $dataset`
file=`basename $dataset`
[ ! -d $dir ] && mkdir -p $dir
[ ! -e $dataset ] && ln -s %s/$file $dataset
done
mkdir -p %s
"""
PBS_ARGMAP = {
"destination": "-q",
"Execution_Time": "-a",
"Account_Name": "-A",
"Checkpoint": "-c",
"Error_Path": "-e",
"Group_List": "-g",
"Hold_Types": "-h",
"Join_Paths": "-j",
"Keep_Files": "-k",
"Resource_List": "-l",
"Mail_Points": "-m",
"Mail_Users": "-M",
"Job_Name": "-N",
"Output_Path": "-o",
"Priority": "-p",
"Rerunable": "-r",
"Shell_Path_List": "-S",
"job_array_request": "-t",
"User_List": "-u",
"Variable_List": "-v",
}
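# Illustrative sketch (hypothetical value): destination parameters may be spelled
# with either the long attribute name or the qsub-style flag; the long form is
# normalized through PBS_ARGMAP by parse_destination_params() below.
_long_form = {"Priority": "10"}
_short_form = {PBS_ARGMAP["Priority"]: "10"}  # {"-p": "10"}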
# From pbs' pbs_job.h
JOB_EXIT_STATUS = {
0: "job exec successful",
-1: "job exec failed, before files, no retry",
-2: "job exec failed, after files, no retry",
-3: "job execution failed, do retry",
-4: "job aborted on MOM initialization",
-5: "job aborted on MOM init, chkpt, no migrate",
-6: "job aborted on MOM init, chkpt, ok migrate",
-7: "job restart failed",
-8: "exec() of user command failed",
-9: "could not create/open stdout stderr files",
-10: "job exceeded a memory limit",
-11: "job exceeded a walltime limit",
-12: "job exceeded a cpu time limit",
}
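# Illustrative sketch: check_watched_items() below maps a failed job's exit status
# to one of these messages, falling back to a generic "Unknown error" string for
# codes that are not in the table.
_example_error = JOB_EXIT_STATUS.get(-11, "Unknown error: -11")  # "job exceeded a walltime limit"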
class PBSJobRunner(AsynchronousJobRunner):
"""
Job runner backed by a finite pool of worker threads; jobs are scheduled FIFO.
"""
runner_name = "PBSRunner"
def __init__(self, app, nworkers):
"""Start the job runner"""
# Check if PBS was importable, fail if not
assert pbs is not None, PBS_IMPORT_MESSAGE
if app.config.pbs_application_server and app.config.outputs_to_working_directory:
raise Exception(
"pbs_application_server (file staging) and outputs_to_working_directory options are mutually exclusive"
)
# Set the default server during startup
self.__default_pbs_server = None
self.default_pbs_server  # noqa: B018 -- accessing the property sets the default server as a side effect
# Proceed with general initialization
super().__init__(app, nworkers)
@property
def default_pbs_server(self):
if self.__default_pbs_server is None:
self.__default_pbs_server = pbs.pbs_default()
log.debug(f"Set default PBS server to {self.default_pbs_server}")
return self.__default_pbs_server
def url_to_destination(self, url):
"""Convert a legacy URL to a job destination"""
if not url:
return
# Determine the PBS server
url_split = url.split("/")
server = url_split[2]
if server == "":
server = self.default_pbs_server
if server is None:
raise Exception("Could not find TORQUE server")
# Determine the queue, set the PBS destination (not the same thing as a Galaxy job destination)
pbs_destination = f"@{server}"
if (pbs_queue := url_split[3] or None) is not None:
pbs_destination = f"{pbs_queue}{pbs_destination}"
params = dict(destination=pbs_destination)
# Determine the args (long-format args were never supported in URLs so they are not supported here)
try:
opts = url.split("/")[4].strip().lstrip("-").split(" -")
assert opts != [""]
# stripping the - comes later (in parse_destination_params)
for i, opt in enumerate(opts):
opts[i] = f"-{opt}"
except Exception:
opts = []
for opt in opts:
param, value = opt.split(None, 1)
params[param] = value
log.debug(f"Converted URL '{url}' to destination runner=pbs, params={params}")
# Create a dynamic JobDestination
return JobDestination(runner="pbs", params=params)
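# Illustrative sketch (hypothetical server, queue and options): the legacy URL
# format handled above is pbs://<server>/<queue>/<qsub options>; the parsing below
# mirrors url_to_destination() without needing a runner instance.
_example_url = "pbs://pbs.example.org/longq/-l walltime=24:00:00 -m e"
_parts = _example_url.split("/")
_example_params = {"destination": f"{_parts[3]}@{_parts[2]}"}
for _opt in _parts[4].lstrip("-").split(" -"):
    _flag, _value = _opt.split(None, 1)
    _example_params[f"-{_flag}"] = _value
# _example_params == {"destination": "longq@pbs.example.org",
#                     "-l": "walltime=24:00:00", "-m": "e"}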
def parse_destination_params(self, params):
"""A wrapper method around __args_to_attrs() that allow administrators to define PBS
params as either command-line options (as in ``qsub(1B)``) or more human-readable "long"
args (as in ``pbs_submit(3B)``).
:returns: list of dicts -- The dicts map directly to pbs attropl structs (see ``pbs_submit(3B)``)
"""
args = {}
for arg, value in params.items():
try:
if not arg.startswith("-"):
arg = PBS_ARGMAP[arg]
arg = arg.lstrip("-")
args[arg] = value
except Exception:
log.warning(f"Unrecognized long argument in destination params: {arg}")
return self.__args_to_attrs(args)
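# Illustrative sketch (hypothetical value): either spelling below produces the same
# attropl-style dict from parse_destination_params(), namely
# [{"name": pbs.ATTR_l, "value": "24:00:00", "resource": "walltime"}].
_example_destination_params = {"Resource_List": "walltime=24:00:00"}
_equivalent_destination_params = {"-l": "walltime=24:00:00"}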
# Internal stuff
def __args_to_attrs(self, args):
"""Convert a list of PBS command-line args (as in ``qsub(1B)``) to PBS' internal attribute representations.
:returns: list of dicts -- The dicts map directly to pbs attropl structs (see ``pbs_submit(3B)``)
"""
rval = []
for arg, value in args.items():
if arg == "l":
resource_attrs = value.split(",")
for res, val in [a.split("=", 1) for a in resource_attrs]:
rval.append(dict(name=pbs.ATTR_l, value=val, resource=res))
else:
try:
rval.append(dict(name=getattr(pbs, f"ATTR_{arg}"), value=value))
except AttributeError as e:
raise Exception(f"Invalid parameter '{arg}': {e}")
return rval
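# Illustrative sketch (hypothetical values): a Resource_List ("l") value carrying
# several comma-separated resources expands into one attropl dict per resource.
_example_resources = dict(
    pair.split("=", 1) for pair in "walltime=24:00:00,nodes=1:ppn=4".split(",")
)
# -> {"walltime": "24:00:00", "nodes": "1:ppn=4"}; each entry becomes
#    {"name": pbs.ATTR_l, "resource": <key>, "value": <value>}.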
def __get_pbs_server(self, job_destination_params):
if job_destination_params is None:
return None
return job_destination_params["destination"].split("@")[-1]
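# Illustrative sketch (hypothetical host): persisted destinations have the form
# "<queue>@<server>" (the queue part may be empty), so the server name is whatever
# follows the last "@".
_example_server = "longq@pbs.example.org".split("@")[-1]  # "pbs.example.org"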
def queue_job(self, job_wrapper):
"""Create PBS script for a job and submit it to the PBS queue"""
# prepare the job
if not self.prepare_job(job_wrapper, include_metadata=not (self.app.config.pbs_stage_path)):
return
job_destination = job_wrapper.job_destination
# Determine the job's PBS destination (server/queue) and options from the job destination definition
pbs_queue_name = None
pbs_server_name = self.default_pbs_server
pbs_options = []
if "-q" in job_destination.params and "destination" not in job_destination.params:
job_destination.params["destination"] = job_destination.params.pop("-q")
if "destination" in job_destination.params:
if "@" in job_destination.params["destination"]:
# Destination includes a server
pbs_queue_name, pbs_server_name = job_destination.params["destination"].split("@")
if pbs_queue_name == "":
# e.g. `qsub -q @server`
pbs_queue_name = None
else:
# Destination is just a queue
pbs_queue_name = job_destination.params["destination"]
job_destination.params.pop("destination")
# Parse PBS params
pbs_options = self.parse_destination_params(job_destination.params)
# Explicitly set the determined PBS destination in the persisted job destination for recovery
job_destination.params["destination"] = f"{pbs_queue_name or ''}@{pbs_server_name}"
c = pbs.pbs_connect(util.smart_str(pbs_server_name))
if c <= 0:
errno, text = pbs.error()
job_wrapper.fail("Unable to queue job for execution. Resubmitting the job may succeed.")
log.error(f"Connection to PBS server for submit failed: {errno}: {text}")
return
# define job attributes
ofile = f"{job_wrapper.working_directory}/{job_wrapper.job_id}.o"
efile = f"{job_wrapper.working_directory}/{job_wrapper.job_id}.e"
ecfile = f"{job_wrapper.working_directory}/{job_wrapper.job_id}.ec"
output_fnames = job_wrapper.job_io.get_output_fnames()
# If an application server is set, we're staging
if self.app.config.pbs_application_server:
pbs_ofile = f"{self.app.config.pbs_application_server}:{ofile}"
pbs_efile = f"{self.app.config.pbs_application_server}:{efile}"
output_files = [str(o) for o in output_fnames]
output_files.append(ecfile)
stagein = self.get_stage_in_out(job_wrapper.job_io.get_input_fnames() + output_files, symlink=True)
stageout = self.get_stage_in_out(output_files)
attrs = [
dict(name=pbs.ATTR_o, value=pbs_ofile),
dict(name=pbs.ATTR_e, value=pbs_efile),
dict(name=pbs.ATTR_stagein, value=stagein),
dict(name=pbs.ATTR_stageout, value=stageout),
]
# If not, we're using NFS
else:
attrs = [
dict(name=pbs.ATTR_o, value=ofile),
dict(name=pbs.ATTR_e, value=efile),
]
# define PBS job options
attrs.append(dict(name=pbs.ATTR_N, value=f"{job_wrapper.job_id}_{job_wrapper.tool.id}_{job_wrapper.user}"))
job_attrs = pbs.new_attropl(len(attrs) + len(pbs_options))
for i, attr in enumerate(attrs + pbs_options):
job_attrs[i].name = attr["name"]
job_attrs[i].value = attr["value"]
if "resource" in attr:
job_attrs[i].resource = attr["resource"]
exec_dir = os.path.abspath(job_wrapper.working_directory)
# write the job script
if self.app.config.pbs_stage_path != "":
# touch the ecfile so that it gets staged
with open(ecfile, "a"):
os.utime(ecfile, None)
stage_commands = pbs_symlink_template % (
" ".join(job_wrapper.job_io.get_input_fnames() + output_files),
self.app.config.pbs_stage_path,
exec_dir,
)
else:
stage_commands = ""
env_setup_commands = [stage_commands]
script = self.get_job_file(
job_wrapper, exit_code_path=ecfile, env_setup_commands=env_setup_commands, shell=job_wrapper.shell
)
job_file = f"{job_wrapper.working_directory}/{job_wrapper.job_id}.sh"
self.write_executable_script(job_file, script, job_io=job_wrapper.job_io)
# job was deleted while we were preparing it
if job_wrapper.get_state() in (model.Job.states.DELETED, model.Job.states.STOPPED):
log.debug(f"Job {job_wrapper.job_id} deleted/stopped by user before it entered the PBS queue")
pbs.pbs_disconnect(c)
if job_wrapper.cleanup_job in ("always", "onsuccess"):
self.cleanup((ofile, efile, ecfile, job_file))
job_wrapper.cleanup()
return
# submit
# The job tag includes the job and the task identifier
# (if a TaskWrapper was passed in):
galaxy_job_id = job_wrapper.get_id_tag()
log.debug(f"({galaxy_job_id}) submitting file {job_file}")
tries = 0
while tries < 5:
job_id = pbs.pbs_submit(c, job_attrs, job_file, pbs_queue_name, None)
tries += 1
if job_id:
pbs.pbs_disconnect(c)
break
errno, text = pbs.error()
log.warning("(%s) pbs_submit failed (try %d/5), PBS error %d: %s" % (galaxy_job_id, tries, errno, text))
time.sleep(2)
else:
log.error(f"({galaxy_job_id}) All attempts to submit job failed")
job_wrapper.fail("Unable to run this job due to a cluster error, please retry it later")
return
if pbs_queue_name is None:
log.debug(f"({galaxy_job_id}) queued in default queue as {job_id}")
else:
log.debug(f"({galaxy_job_id}) queued in {pbs_queue_name} queue as {job_id}")
# persist destination
job_wrapper.set_job_destination(job_destination, job_id)
# Store PBS related state information for job
job_state = AsynchronousJobState(
job_wrapper=job_wrapper,
job_id=job_id,
exit_code_file=ecfile,
job_destination=job_destination,
job_file=job_file,
output_file=ofile,
error_file=efile,
)
job_state.old_state = "N"
job_state.running = False
# Add to our 'queue' of jobs to monitor
self.monitor_queue.put(job_state)
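# Illustrative sketch (hypothetical working directory and job id): queue_job()
# leaves four files per job in the working directory -- the submitted script plus
# the stdout, stderr and exit-code files that are collected when the job finishes.
_example_job_files = [f"/galaxy/jobs/000/42/42{ext}" for ext in (".sh", ".o", ".e", ".ec")]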
def check_watched_items(self):
"""
Called by the monitor thread to look at each watched job and deal
with state changes.
"""
new_watched = []
# reduce pbs load by batching status queries
(failures, statuses) = self.check_all_jobs()
for pbs_job_state in self.watched:
job_id = pbs_job_state.job_id
galaxy_job_id = pbs_job_state.job_wrapper.get_id_tag()
old_state = pbs_job_state.old_state
pbs_server_name = self.__get_pbs_server(pbs_job_state.job_destination.params)
if pbs_server_name in failures:
log.debug(f"({galaxy_job_id}/{job_id}) Skipping state check because PBS server connection failed")
new_watched.append(pbs_job_state)
continue
try:
status = statuses[job_id]
except KeyError:
if pbs_job_state.job_wrapper.get_state() == model.Job.states.DELETED:
continue
try:
# Recheck to make sure it wasn't a communication problem
self.check_single_job(pbs_server_name, job_id)
log.warning(
f"({galaxy_job_id}/{job_id}) PBS job was not in state check list, but was found with individual state check"
)
new_watched.append(pbs_job_state)
except Exception:
errno, text = pbs.error()
if errno == 15001:
# 15001 == job not in queue
log.debug(f"({galaxy_job_id}/{job_id}) PBS job has left queue")
self.work_queue.put((self.finish_job, pbs_job_state))
else:
# Unhandled error, continue to monitor
log.info(
"(%s/%s) PBS state check resulted in error (%d): %s" % (galaxy_job_id, job_id, errno, text)
)
new_watched.append(pbs_job_state)
continue
if status.job_state != old_state:
log.debug(f"({galaxy_job_id}/{job_id}) PBS job state changed from {old_state} to {status.job_state}")
if status.job_state == "R" and not pbs_job_state.running:
pbs_job_state.running = True
pbs_job_state.job_wrapper.change_state(model.Job.states.RUNNING)
if status.job_state == "R" and status.get("resources_used", False):
# resources_used may not be in the status for new jobs
h, m, s = (int(i) for i in status.resources_used.walltime.split(":"))
runtime = timedelta(hours=h, minutes=m, seconds=s)
if pbs_job_state.check_limits(runtime=runtime):
self.work_queue.put((self.fail_job, pbs_job_state))
continue
elif status.job_state == "C":
# "keep_completed" is enabled in PBS, so try to check exit status
try:
assert (
int(status.exit_status) == 0
or pbs_job_state.job_wrapper.get_state() == model.Job.states.STOPPED
)
log.debug(f"({galaxy_job_id}/{job_id}) PBS job has completed successfully")
except AssertionError:
exit_status = int(status.exit_status)
error_message = JOB_EXIT_STATUS.get(exit_status, f"Unknown error: {status.exit_status}")
pbs_job_state.fail_message = CLUSTER_ERROR_MESSAGE % error_message
log.error(f"({galaxy_job_id}/{job_id}) PBS job failed: {error_message}")
pbs_job_state.stop_job = False
self.work_queue.put((self.fail_job, pbs_job_state))
continue
except AttributeError:
# No exit_status, can't verify proper completion so we just have to assume success.
log.debug(f"({galaxy_job_id}/{job_id}) PBS job has completed")
self.work_queue.put((self.finish_job, pbs_job_state))
continue
pbs_job_state.old_state = status.job_state
new_watched.append(pbs_job_state)
# Replace the watch list with the updated version
self.watched = new_watched
def check_all_jobs(self):
"""
Returns a list of servers that failed to be contacted and a dict
of "job_id : status" pairs (where status is a bunchified version
of the API's structure).
"""
servers = []
failures = []
statuses = {}
for pbs_job_state in self.watched:
pbs_server_name = self.__get_pbs_server(pbs_job_state.job_destination.params)
if pbs_server_name not in servers:
servers.append(pbs_server_name)
pbs_job_state.check_count += 1
for pbs_server_name in servers:
c = pbs.pbs_connect(util.smart_str(pbs_server_name))
if c <= 0:
log.debug(f"connection to PBS server {pbs_server_name} for state check failed")
failures.append(pbs_server_name)
continue
stat_attrl = pbs.new_attrl(3)
stat_attrl[0].name = pbs.ATTR_state
stat_attrl[1].name = pbs.ATTR_used
stat_attrl[2].name = pbs.ATTR_exitstat
jobs = pbs.pbs_statjob(c, None, stat_attrl, None)
pbs.pbs_disconnect(c)
statuses.update(self.convert_statjob_to_bunches(jobs))
return (failures, statuses)
def convert_statjob_to_bunches(self, statjob_out):
statuses = {}
for job in statjob_out:
status = {}
for attrib in job.attribs:
if attrib.resource is None:
status[attrib.name] = attrib.value
else:
if attrib.name not in status:
status[attrib.name] = Bunch()
status[attrib.name][attrib.resource] = attrib.value
statuses[job.name] = Bunch(**status)
return statuses
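# Illustrative sketch (hand-built records standing in for pbs_statjob() output):
# plain attributes become top-level entries, while resource-qualified attributes
# such as resources_used/walltime are grouped into a nested Bunch keyed by resource.
from collections import namedtuple
_FakeAttr = namedtuple("_FakeAttr", "name resource value")
_FakeJob = namedtuple("_FakeJob", "name attribs")
_example_statjob = [
    _FakeJob("123.pbs.example.org", [
        _FakeAttr("job_state", None, "R"),
        _FakeAttr("resources_used", "walltime", "01:02:03"),
    ])
]
# convert_statjob_to_bunches(_example_statjob) would return
# {"123.pbs.example.org": Bunch(job_state="R", resources_used=<Bunch with walltime="01:02:03">)}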
def check_single_job(self, pbs_server_name, job_id):
"""
Returns the state of a single job, used to make sure a job is
really dead.
"""
c = pbs.pbs_connect(util.smart_str(pbs_server_name))
if c <= 0:
log.debug(f"connection to PBS server {pbs_server_name} for state check failed")
return None
stat_attrl = pbs.new_attrl(1)
stat_attrl[0].name = pbs.ATTR_state
jobs = pbs.pbs_statjob(c, job_id, stat_attrl, None)
pbs.pbs_disconnect(c)
return jobs[0].attribs[0].value
def get_stage_in_out(self, fnames, symlink=False):
"""Convenience function to create a stagein/stageout list"""
stage = ""
for fname in fnames:
if os.access(fname, os.R_OK):
if stage:
stage += ","
# pathnames are now absolute
if symlink and self.app.config.pbs_stage_path:
stage_name = os.path.join(self.app.config.pbs_stage_path, os.path.split(fname)[1])
else:
stage_name = fname
stage += f"{stage_name}@{self.app.config.pbs_dataset_server}:{fname}"
return stage
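# Illustrative sketch (hypothetical hosts and paths): each stagein/stageout entry
# has the form "<path on the execution host>@<pbs_dataset_server>:<Galaxy path>",
# and multiple entries are joined with commas. With symlink=True and a stage path
# of "/scratch/galaxy_stage", an input dataset would be staged in as:
_example_stagein = "/scratch/galaxy_stage/dataset_1.dat@galaxy.example.org:/galaxy/files/dataset_1.dat"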
def stop_job(self, job_wrapper):
"""Attempts to delete a job from the PBS queue"""
job = job_wrapper.get_job()
job_id = job.get_job_runner_external_id().encode("utf-8")
job_tag = f"({job.get_id_tag()}/{job_id})"
log.debug(f"{job_tag} Stopping PBS job")
# Declare the connection handle c so that it can be cleaned up:
c = None
try:
pbs_server_name = self.__get_pbs_server(job.destination_params)
if pbs_server_name is None:
log.debug("(%s) Job queued but no destination stored in job params, cannot delete", job_tag)
return
c = pbs.pbs_connect(util.smart_str(pbs_server_name))
if c <= 0:
log.debug(f"({job_tag}) Connection to PBS server for job delete failed")
return
pbs.pbs_deljob(c, job_id, "")
log.debug(f"{job_tag} Removed from PBS queue before job completion")
except Exception:
e = traceback.format_exc()
log.debug(f"{job_tag} Unable to stop job: {e}")
finally:
# Cleanup: disconnect from the server.
if c is not None:
pbs.pbs_disconnect(c)
def recover(self, job, job_wrapper):
"""Recovers jobs stuck in the queued/running state when Galaxy started"""
job_id = job.get_job_runner_external_id()
pbs_job_state = AsynchronousJobState(
job_wrapper=job_wrapper,
job_id=job_id,
job_file=f"{job_wrapper.working_directory}/{job.id}.sh",
output_file=f"{job_wrapper.working_directory}/{job.id}.o",
error_file=f"{job_wrapper.working_directory}/{job.id}.e",
exit_code_file=f"{job_wrapper.working_directory}/{job.id}.ec",
job_destination=job_wrapper.job_destination,
)
pbs_job_state.runner_url = job_wrapper.get_job_runner_url()
job_wrapper.command_line = job.command_line
if job.state in (model.Job.states.RUNNING, model.Job.states.STOPPED):
log.debug(
f"({job.id}/{job.get_job_runner_external_id()}) is still in {job.state} state, adding to the PBS queue"
)
pbs_job_state.old_state = "R"
pbs_job_state.running = True
self.monitor_queue.put(pbs_job_state)
elif job.state == model.Job.states.QUEUED:
log.debug(
f"({job.id}/{job.get_job_runner_external_id()}) is still in PBS queued state, adding to the PBS queue"
)
pbs_job_state.old_state = "Q"
pbs_job_state.running = False
self.monitor_queue.put(pbs_job_state)