Warning
This document is for an in-development version of Galaxy.
galaxy.jobs.runners package¶
Base classes for job runner plugins.
-
class
galaxy.jobs.runners.
RunnerParams
(specs=None, params=None)[source]¶ Bases:
galaxy.util.ParamsWithSpecs
-
class
galaxy.jobs.runners.
BaseJobRunner
(app, nworkers, **kwargs)[source]¶ Bases:
object
-
DEFAULT_SPECS
= {'recheck_missing_job_retries': {'map': <class 'int'>, 'valid': <function BaseJobRunner.<lambda> at 0x7f2f25cf51e0>, 'default': 0}}¶
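As a rough illustration of how a spec dict with 'map', 'valid' and 'default' keys could be applied to raw parameters, here is a minimal standalone sketch. The helper apply_specs and the validity check (non-negative integer) are assumptions made for this example; the actual lambda used by BaseJobRunner is not shown on this page, and this is not the galaxy.util.ParamsWithSpecs implementation.

```python
def apply_specs(specs, params):
    """Hypothetical helper: convert and validate raw params against a spec dict."""
    resolved = {}
    for name, value in params.items():
        if name not in specs:
            raise KeyError(f"unknown parameter: {name}")
        spec = specs[name]
        if "map" in spec:
            # Convert the raw (often string) value to its target type.
            value = spec["map"](value)
        if "valid" in spec and not spec["valid"](value):
            raise ValueError(f"invalid value for {name}: {value!r}")
        resolved[name] = value
    # Fill in defaults for anything the caller did not supply.
    for name, spec in specs.items():
        if name not in resolved and "default" in spec:
            resolved[name] = spec["default"]
    return resolved


# Assumed validity rule for illustration only: retries must be >= 0.
EXAMPLE_SPECS = {
    "recheck_missing_job_retries": {
        "map": int,
        "valid": lambda x: x >= 0,
        "default": 0,
    }
}

print(apply_specs(EXAMPLE_SPECS, {"recheck_missing_job_retries": "3"}))
```

Converting before validating keeps the validity check simple, since it only ever sees values of the mapped type.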
-
put
(job_wrapper)[source]¶ Add a job to the queue (by job identifier), indicate that the job is ready to run.
-
url_to_destination
(url)[source]¶ Convert a legacy URL to a JobDestination.
Job runner URLs are deprecated, JobDestinations should be used instead. This base class method converts from a URL to a very basic JobDestination without destination params.
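The conversion described above might look roughly like the following. This is a sketch under the assumption that the runner name is whatever precedes the first colon of the legacy URL; SketchDestination is a stand-in class for this example, not Galaxy's JobDestination.

```python
from dataclasses import dataclass, field


@dataclass
class SketchDestination:
    """Stand-in for a job destination: a runner name plus (empty) params."""
    runner: str
    params: dict = field(default_factory=dict)


def url_to_destination_sketch(url):
    # Assumption: the part before the first ":" names the runner plugin.
    runner = url.split(":", 1)[0]
    # No destination params are derived from the URL in this basic form.
    return SketchDestination(runner=runner)


print(url_to_destination_sketch("pbs://server/queue"))
```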
-
parse_destination_params
(params)[source]¶ Parse the JobDestination
params
dict and return the runner’s native representation of those params.
-
prepare_job
(job_wrapper, include_metadata=False, include_work_dir_outputs=True, modify_command_for_container=True, stdout_file=None, stderr_file=None)[source]¶ Perform sanity checks that all runners’ queue_job() methods are likely to need.
-
build_command_line
(job_wrapper, include_metadata=False, include_work_dir_outputs=True, modify_command_for_container=True, stdout_file=None, stderr_file=None)[source]¶
-
-
class
galaxy.jobs.runners.
JobState
(job_wrapper, job_destination)[source]¶ Bases:
object
Encapsulate state of jobs.
-
runner_states
= <galaxy.util.bunch.Bunch object>¶
-
-
class
galaxy.jobs.runners.
AsynchronousJobState
(files_dir=None, job_wrapper=None, job_id=None, job_file=None, output_file=None, error_file=None, exit_code_file=None, job_name=None, job_destination=None)[source]¶ Bases:
galaxy.jobs.runners.JobState
Encapsulate the state of an asynchronous job. This class should be subclassed as needed by individual job runners to capture the additional information they need to communicate with a distributed resource manager.
-
__init__
(files_dir=None, job_wrapper=None, job_id=None, job_file=None, output_file=None, error_file=None, exit_code_file=None, job_name=None, job_destination=None)[source]¶
-
running
¶
-
-
class
galaxy.jobs.runners.
AsynchronousJobRunner
(app, nworkers, **kwargs)[source]¶ Bases:
galaxy.jobs.runners.BaseJobRunner
,galaxy.util.monitors.Monitors
Parent class for any job runner that runs jobs asynchronously (e.g. via a distributed resource manager). Provides general methods for having a thread monitor the state of asynchronous jobs and for submitting those jobs to the correct methods (queue, finish, cleanup) at appropriate times.
-
monitor
()[source]¶ Watches jobs currently in the monitor queue and deals with state changes (queued to running) and job completion.
-
check_watched_items
()[source]¶ This method is responsible for iterating over self.watched and handling state changes and updating self.watched with a new list of watched job states. Subclasses can opt to override this directly (as older job runners will initially) or just override check_watched_item and allow the list processing to reuse the logic here.
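The division of labour between check_watched_items and check_watched_item can be sketched as follows. This is a simplified standalone illustration, not Galaxy's code: the classes WatcherSketch and FinishOnDone are hypothetical, job states are plain dicts, and the per-item method is assumed to return the job state to keep watching it or None to drop it.

```python
class WatcherSketch:
    """Hypothetical base class holding the shared list-processing logic."""

    def __init__(self):
        self.watched = []

    def check_watched_items(self):
        # Rebuild the watched list from whatever each per-item check keeps.
        new_watched = []
        for job_state in self.watched:
            kept = self.check_watched_item(job_state)
            if kept is not None:
                new_watched.append(kept)
        self.watched = new_watched

    def check_watched_item(self, job_state):
        raise NotImplementedError


class FinishOnDone(WatcherSketch):
    """Subclass that only overrides the per-item check."""

    def check_watched_item(self, job_state):
        # Drop finished jobs; keep watching everything else.
        return None if job_state["status"] == "done" else job_state


watcher = FinishOnDone()
watcher.watched = [{"id": 1, "status": "running"}, {"id": 2, "status": "done"}]
watcher.check_watched_items()
print([job["id"] for job in watcher.watched])
```

Overriding only the per-item method keeps the loop in one place, so each runner needs to supply just its own state lookup.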
-
Subpackages¶
- galaxy.jobs.runners.state_handlers package
- galaxy.jobs.runners.util package
- Subpackages
- Submodules
- galaxy.jobs.runners.util.env module
- galaxy.jobs.runners.util.external module
- galaxy.jobs.runners.util.kill module
- galaxy.jobs.runners.util.process_groups module
- galaxy.jobs.runners.util.pykube_util module
- galaxy.jobs.runners.util.sudo module
Submodules¶
galaxy.jobs.runners.chronos module¶
-
class
galaxy.jobs.runners.chronos.
ChronosJobRunner
(app, nworkers, **kwargs)[source]¶ Bases:
galaxy.jobs.runners.AsynchronousJobRunner
-
runner_name
= 'ChronosRunner'¶
-
RUNNER_PARAM_SPEC_KEY
= 'runner_param_specs'¶
-
JOB_NAME_PREFIX
= 'galaxy-chronos-'¶
-
RUNNER_PARAM_SPEC
= {'chronos': {'map': <class 'str'>}, 'insecure': {'map': <function ChronosJobRunner.<lambda> at 0x7f2f13e38ae8>, 'default': True}, 'owner': {'map': <class 'str'>}, 'password': {'map': <class 'str'>}, 'username': {'map': <class 'str'>}}¶
-
DESTINATION_PARAMS_SPEC
= {'docker_cpu': {'default': 0.1, 'map_name': 'cpus', 'map': <class 'float'>}, 'docker_disk': {'default': 256, 'map_name': 'disk', 'map': <class 'int'>}, 'docker_memory': {'default': 128, 'map_name': 'mem', 'map': <class 'int'>}, 'max_retries': {'default': 2, 'map_name': 'retries', 'map': <class 'int'>}, 'volumes': {'default': None, 'map_name': 'container/volumes', 'map': <function ChronosJobRunner.<lambda> at 0x7f2f13e38b70>}}¶
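One way to read DESTINATION_PARAMS_SPEC is that each destination param is converted with 'map' and then stored under 'map_name', where a slash denotes nesting. The sketch below illustrates that reading only. The helper build_payload, the comma-splitting volumes mapper, and the choice to skip None values are all assumptions for this example; the real volumes lambda is not shown on this page.

```python
def build_payload(spec, params):
    """Hypothetical helper: turn destination params into a nested payload."""
    payload = {}
    for name, entry in spec.items():
        value = params.get(name, entry["default"])
        if value is None:
            # Assumption: params without a value are left out entirely.
            continue
        value = entry["map"](value)
        # "container/volumes" means payload["container"]["volumes"].
        *parents, leaf = entry["map_name"].split("/")
        target = payload
        for key in parents:
            target = target.setdefault(key, {})
        target[leaf] = value
    return payload


SPEC = {
    "docker_cpu": {"default": 0.1, "map_name": "cpus", "map": float},
    "docker_disk": {"default": 256, "map_name": "disk", "map": int},
    "docker_memory": {"default": 128, "map_name": "mem", "map": int},
    "max_retries": {"default": 2, "map_name": "retries", "map": int},
    # Assumed mapper for illustration: split a comma-separated string.
    "volumes": {
        "default": None,
        "map_name": "container/volumes",
        "map": lambda v: v.split(","),
    },
}

print(build_payload(SPEC, {"docker_memory": "512", "volumes": "/data,/tmp"}))
```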
-
galaxy.jobs.runners.cli module¶
Job control via a command line interface (e.g. qsub/qstat), possibly over a remote connection (e.g. ssh).
-
class
galaxy.jobs.runners.cli.
ShellJobRunner
(app, nworkers)[source]¶ Bases:
galaxy.jobs.runners.AsynchronousJobRunner
Job runner backed by a finite pool of worker threads. FIFO scheduling
-
runner_name
= 'ShellRunner'¶
-
submit
(shell, job_interface, job_file, galaxy_id_tag, retry=3, timeout=10)[source]¶ Handles actual job script submission.
If submission fails, it is retried up to retry times with a timeout of timeout seconds. Returns the returncode of the submission and the stdout, which contains the external job_id.
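The retry behaviour of submit can be sketched in isolation as below. This is an illustration under stated assumptions, not the runner's code: submit_with_retry and run_command are hypothetical names, timeout is assumed to be a pause between attempts, and retry is assumed to count retries after the first attempt.

```python
import time


def submit_with_retry(run_command, retry=3, timeout=10, sleep=time.sleep):
    """Call run_command() until it succeeds or the retries are used up.

    run_command is a hypothetical callable returning (returncode, stdout).
    """
    returncode, stdout = run_command()
    while returncode != 0 and retry > 0:
        # Wait before trying again, then consume one retry.
        sleep(timeout)
        retry -= 1
        returncode, stdout = run_command()
    return returncode, stdout


# Fake submission that fails twice, then reports an external job id.
attempts = []


def fake_submit():
    attempts.append(1)
    return (0, "12345") if len(attempts) == 3 else (1, "")


# Pass a no-op sleep so the example runs instantly.
print(submit_with_retry(fake_submit, sleep=lambda seconds: None))
```

Injecting the sleep function keeps the pause out of tests while leaving the default behaviour unchanged.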
-
galaxy.jobs.runners.condor module¶
Job control via the Condor DRM via CLI.
This plugin has been used in production and isn’t unstable, but it shouldn’t be taken as an example of how to write Galaxy job runners that interface with a DRM using command-line invocations. When writing new job runners that leverage command-line calls for submitting and checking the status of jobs, please check out the CLI runner (cli.py in this directory) and start by writing a new job plugin for it (see examples in /galaxy/jobs/runners/util/cli/job). That approach will result in less boilerplate and allow greater reuse of the DRM-specific hooks you’ll need to write. Ideally this plugin would have been written to target that framework, but we don’t have the bandwidth to rewrite it at this time.
-
class
galaxy.jobs.runners.condor.
CondorJobRunner
(app, nworkers)[source]¶ Bases:
galaxy.jobs.runners.AsynchronousJobRunner
Job runner backed by a finite pool of worker threads. FIFO scheduling
-
runner_name
= 'CondorRunner'¶
-
galaxy.jobs.runners.drmaa module¶
Job control via the DRMAA API.
-
class
galaxy.jobs.runners.drmaa.
DRMAAJobRunner
(app, nworkers, **kwargs)[source]¶ Bases:
galaxy.jobs.runners.AsynchronousJobRunner
Job runner backed by a finite pool of worker threads. FIFO scheduling
-
runner_name
= 'DRMAARunner'¶
-
restrict_job_name_length
= 15¶
-
check_watched_item
(ajs, new_watched)[source]¶ Look at a single watched job, determine its state, and deal with errors that could happen in this process. To be called from check_watched_items(). Returns the state, or None if exceptions occurred. In the latter case:
- the job is appended to new_watched if a drmaa.InternalException, drmaa.InvalidJobException, or drmaa.DrmCommunicationException occurred (which causes the job to be tested again in the next iteration of check_watched_items)
- the job is finished as errored if any other exception occurs
- the job is finished OK or errored after the maximum number of retries, depending on the exception
Note that None is returned in all cases where the loop in check_watched_items is to be continued.
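The error handling described for check_watched_item can be sketched roughly as follows. This is a simplified standalone illustration: RetryableError stands in for the drmaa exceptions that trigger a recheck, jobs are plain dicts, check_item_sketch and get_state are hypothetical names, and the sketch always marks a job as errored once the retries are exhausted, whereas the description says the outcome depends on the exception.

```python
class RetryableError(Exception):
    """Stand-in for the drmaa exceptions that cause a job to be rechecked."""


def check_item_sketch(job, new_watched, get_state, max_retries=2):
    """Return the job's state, or None if the caller's loop should continue."""
    try:
        state = get_state(job["id"])
    except RetryableError:
        job["retries"] = job.get("retries", 0) + 1
        if job["retries"] > max_retries:
            # Simplification: give up and mark the job as errored.
            job["result"] = "error"
        else:
            # Keep the job so it is tested again on the next iteration.
            new_watched.append(job)
        return None
    except Exception:
        # Any other exception finishes the job as errored.
        job["result"] = "error"
        return None
    return state


def unreachable(job_id):
    raise RetryableError()


job = {"id": "1"}
new_watched = []
print(check_item_sketch(job, new_watched, unreachable), new_watched)
```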
-
check_watched_items
()[source]¶ Called by the monitor thread to look at each watched job and deal with state changes.
-
recover
(job, job_wrapper)[source]¶ Recovers jobs stuck in the queued/running state when Galaxy started
-
galaxy.jobs.runners.godocker module¶
-
class
galaxy.jobs.runners.godocker.
GodockerJobRunner
(app, nworkers, **kwargs)[source]¶ Bases:
galaxy.jobs.runners.AsynchronousJobRunner
Job runner backed by a finite pool of worker threads. FIFO scheduling
-
runner_name
= 'GodockerJobRunner'¶
-
__init__
(app, nworkers, **kwargs)[source]¶ Set up the runner in four steps:
1. Get runner_param_specs from job_conf.xml
2. Initialise the job runner parent object
3. Log in to GoDocker and store the token
4. Start the worker and monitor threads
-
check_watched_item
(job_state)[source]¶ Get the job’s current status from GoDocker using job_id and update the status in Galaxy.
- If the job execution is successful, call mark_as_finished() and return None to Galaxy.
- If the job failed, call mark_as_failed() and return None to Galaxy.
- If the job is running or pending, simply return the AsynchronousJobState object (job_state).
-
recover
(job, job_wrapper)[source]¶ Recovers jobs stuck in the queued/running state when Galaxy started
-
create_log_file
(job_state, job_status_god)[source]¶ Create the log files in Galaxy, namely error_file, output_file and exit_code_file. Returns True if all the files were created successfully.
-
login
(apikey, login, server, noCert=False)[source]¶ Log in to GoDocker and return the token. Creates the Login model schema of GoDocker and calls the http_post_request method.
-
post_task
(job_wrapper)[source]¶ Submit the job to GoDocker and return the job id. Creates the Job model schema of GoDocker and calls the http_post_request method.
-
get_task
(job_id)[source]¶ Get job details from GoDocker and return the job. Pass job_id to the http_get_request method.
-
task_suspend
(job_id)[source]¶ Suspend actively running job in galaxy. Pass job_id to the http_get_request method.
-
galaxy.jobs.runners.kubernetes module¶
Offload jobs to a Kubernetes cluster.
-
class
galaxy.jobs.runners.kubernetes.
KubernetesJobRunner
(app, nworkers, **kwargs)[source]¶ Bases:
galaxy.jobs.runners.AsynchronousJobRunner
Job runner backed by a finite pool of worker threads. FIFO scheduling
-
runner_name
= 'KubernetesRunner'¶
-
check_watched_item
(job_state)[source]¶ Checks the state of a job already submitted on k8s. Job state is an AsynchronousJobState
-
galaxy.jobs.runners.local module¶
Job runner plugin for executing jobs on the local system via the command line.
-
class
galaxy.jobs.runners.local.
LocalJobRunner
(app, nworkers)[source]¶ Bases:
galaxy.jobs.runners.BaseJobRunner
Job runner backed by a finite pool of worker threads. FIFO scheduling
-
runner_name
= 'LocalRunner'¶
-
galaxy.jobs.runners.pbs module¶
-
class
galaxy.jobs.runners.pbs.
PBSJobRunner
(app, nworkers)[source]¶ Bases:
galaxy.jobs.runners.AsynchronousJobRunner
Job runner backed by a finite pool of worker threads. FIFO scheduling
-
runner_name
= 'PBSRunner'¶
-
default_pbs_server
¶
-
parse_destination_params
(params)[source]¶ A wrapper method around __args_to_attrs() that allows administrators to define PBS params as either command-line options (as in qsub(1B)) or more human-readable “long” args (as in pbs_submit(3B)).
Returns: list of dicts. The dicts map directly to pbs attropl structs (see pbs_submit(3B)).
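To make the two input styles concrete, here is a minimal sketch of turning either short options or long attribute names into a list of attribute dicts. It is an illustration only: params_to_attrs is a hypothetical helper, the option table is a small assumed subset, and the dict keys (name, resource, value) are modelled on the fields of a PBS attropl struct rather than taken from Galaxy's code.

```python
# Assumed subset of qsub-style short options and their long attribute names.
SHORT_TO_LONG = {
    "-N": "Job_Name",
    "-A": "Account_Name",
    "-p": "Priority",
    "-l": "Resource_List",
}


def params_to_attrs(params):
    """Hypothetical helper: normalise params into attropl-like dicts."""
    attrs = []
    for key, value in params.items():
        # Accept either the short option or the long name directly.
        name = SHORT_TO_LONG.get(key, key)
        if name == "Resource_List":
            # Each comma-separated resource becomes its own entry.
            for item in value.split(","):
                resource, _, resource_value = item.partition("=")
                attrs.append(
                    {"name": name, "resource": resource, "value": resource_value}
                )
        else:
            attrs.append({"name": name, "value": value})
    return attrs


print(params_to_attrs({"-l": "walltime=01:00:00,nodes=1", "Priority": "5"}))
```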
-
check_watched_items
()[source]¶ Called by the monitor thread to look at each watched job and deal with state changes.
-
check_all_jobs
()[source]¶ Returns a list of servers that failed to be contacted and a dict of “job_id : status” pairs (where status is a bunchified version of the API’s structure).
-
check_single_job
(pbs_server_name, job_id)[source]¶ Returns the state of a single job, used to make sure a job is really dead.
-
galaxy.jobs.runners.pulsar module¶
Job runner used to execute Galaxy jobs through Pulsar.
More information on Pulsar can be found at https://pulsar.readthedocs.io/ .
-
class
galaxy.jobs.runners.pulsar.
PulsarLegacyJobRunner
(app, nworkers, **kwds)[source]¶ Bases:
galaxy.jobs.runners.pulsar.PulsarJobRunner
Flavor of Pulsar job runner mimicking behavior of old LWR runner.
-
destination_defaults
= {'dependency_resolution': 'local', 'rewrite_parameters': 'false'}¶
-
-
class
galaxy.jobs.runners.pulsar.
PulsarMQJobRunner
(app, nworkers, **kwds)[source]¶ Bases:
galaxy.jobs.runners.pulsar.PulsarJobRunner
Flavor of Pulsar job runner with sensible defaults for message queue communication.
-
use_mq
= True¶
-
poll
= False¶
-
destination_defaults
= {'default_file_action': 'remote_transfer', 'dependency_resolution': 'remote', 'jobs_directory': <object object at 0x7f2f1518c1a0>, 'private_token': <object object at 0x7f2f1518c1b0>, 'rewrite_parameters': 'true', 'url': <object object at 0x7f2f1518c1b0>}¶
-
-
class
galaxy.jobs.runners.pulsar.
PulsarRESTJobRunner
(app, nworkers, **kwds)[source]¶ Bases:
galaxy.jobs.runners.pulsar.PulsarJobRunner
Flavor of Pulsar job runner with sensible defaults for RESTful usage.
-
destination_defaults
= {'default_file_action': 'transfer', 'dependency_resolution': 'remote', 'rewrite_parameters': 'true', 'url': <object object at 0x7f2f1518c1a0>}¶
-
-
class
galaxy.jobs.runners.pulsar.
PulsarEmbeddedJobRunner
(app, nworkers, **kwds)[source]¶ Bases:
galaxy.jobs.runners.pulsar.PulsarJobRunner
Flavor of Pulsar job runner that runs Pulsar’s server code directly within Galaxy.
This is an appropriate job runner when the desire is to use Pulsar staging but there is no need to run a remote service.
-
destination_defaults
= {'default_file_action': 'copy', 'dependency_resolution': 'remote', 'rewrite_parameters': 'true'}¶
-
default_build_pulsar_app
= True¶
-
-
class
galaxy.jobs.runners.pulsar.
PulsarEmbeddedMQJobRunner
(app, nworkers, **kwds)[source]¶ Bases:
galaxy.jobs.runners.pulsar.PulsarMQJobRunner
-
default_build_pulsar_app
= True¶
-
galaxy.jobs.runners.slurm module¶
SLURM job control via the DRMAA API.
-
class
galaxy.jobs.runners.slurm.
SlurmJobRunner
(app, nworkers, **kwargs)[source]¶ Bases:
galaxy.jobs.runners.drmaa.DRMAAJobRunner
-
runner_name
= 'SlurmRunner'¶
-
restrict_job_name_length
= False¶
-
galaxy.jobs.runners.state_handler_factory module¶
galaxy.jobs.runners.tasks module¶
-
class
galaxy.jobs.runners.tasks.
TaskedJobRunner
(app, nworkers)[source]¶ Bases:
galaxy.jobs.runners.BaseJobRunner
Job runner backed by a finite pool of worker threads. FIFO scheduling
-
runner_name
= 'TaskRunner'¶
-
galaxy.jobs.runners.univa module¶
Job control via the DRMAA API / qstat and qacct.
Known to work on the UNIVA grid engine.
Known bugs/problems:
- If a job runs longer than the time limits of the queue, two things happen:
  1. at the soft limit (s_rt) SIGUSR1 is sent to the job
  2. at the hard limit (h_rt) SIGKILL is sent to the job
The second case is covered by the runner: it’s the same mechanism that kills a job when the job time limit is reached. The first case is currently not covered. The good thing is that most programs ignore SIGUSR1. For the second case it seems that jobs are marked as failed (killed by the DRM) at random.
Possible solutions:
- Configure the job destinations such that the queue limits are never reached. The time limits can be determined with: for i in `qconf -sql`; do qconf -sq $i | grep -E '(h|s)_rt'; done
- Extend the job runner to cover s_rt cases (some ideas):
  - It is unclear whether a program’s reaction to SIGUSR1 can be determined easily, because it depends on the program. It seems that the exit code is in most cases 10.
  - The scheduler logs contain quite useful information.
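The queue time limits mentioned above could also be pulled out of qconf output in Python. The following is a minimal sketch that only parses text; it assumes each qconf -sq line has an attribute name followed by whitespace and a value, and both the helper parse_runtime_limits and the sample text are made up for illustration.

```python
import re


def parse_runtime_limits(qconf_output):
    """Hypothetical helper: extract the s_rt and h_rt values from queue config text."""
    limits = {}
    for line in qconf_output.splitlines():
        # Match lines such as "h_rt    24:00:00" and ignore all others.
        match = re.match(r"^([hs]_rt)\s+(\S+)", line)
        if match:
            limits[match.group(1)] = match.group(2)
    return limits


# Made-up sample in the assumed "name  value" layout.
SAMPLE = """qname                 all.q
s_rt                  INFINITY
h_rt                  24:00:00
"""

print(parse_runtime_limits(SAMPLE))
```

Comparing these values with the time limits configured on a job destination is one way to check that the queue limits are never reached first.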
-
class
galaxy.jobs.runners.univa.
UnivaJobRunner
(app, nworkers, **kwargs)[source]¶ Bases:
galaxy.jobs.runners.drmaa.DRMAAJobRunner
-
runner_name
= 'UnivaJobRunner'¶
-