Warning

This document is for an old release of Galaxy.

galaxy.jobs.runners package

Base classes for job runner plugins.

class galaxy.jobs.runners.RunnerParams(specs=None, params=None)

Bases: galaxy.util.ParamsWithSpecs

class galaxy.jobs.runners.BaseJobRunner(app, nworkers, **kwargs)

Bases: object

DEFAULT_SPECS = {'recheck_missing_job_retries': {'default': 0, 'map': <type 'int'>, 'valid': <function <lambda>>}}
__init__(app, nworkers, **kwargs)

Start the job runner

run_next()

Run the next item in the work queue (a job waiting to run)

put(job_wrapper)

Add a job to the queue (by job identifier) to indicate that the job is ready to run.

mark_as_queued(job_wrapper)
shutdown()

Attempts to gracefully shut down the worker threads
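The put(), run_next() and shutdown() methods above describe a FIFO work queue served by nworkers threads. The sketch below is a hypothetical, simplified model of that pattern using Python's queue and threading modules; MiniJobRunner, run_job and the sentinel-based shutdown are illustrative assumptions, not Galaxy's actual implementation.

```python
import queue
import threading

_STOP = object()  # sentinel that tells one worker thread to exit


class MiniJobRunner:
    """Hypothetical, simplified model of a FIFO work queue with worker threads."""

    def __init__(self, nworkers, run_job):
        self.work_queue = queue.Queue()
        self.run_job = run_job  # callable invoked once per queued job
        self.workers = [threading.Thread(target=self.run_next, daemon=True)
                        for _ in range(nworkers)]
        for worker in self.workers:
            worker.start()

    def put(self, job_id):
        # Queue the job identifier; it is now ready to run.
        self.work_queue.put(job_id)

    def run_next(self):
        # Each worker takes items in FIFO order until it sees the sentinel.
        while True:
            item = self.work_queue.get()
            if item is _STOP:
                return
            self.run_job(item)

    def shutdown(self):
        # Enqueue one sentinel per worker behind any pending jobs, then wait.
        for _ in self.workers:
            self.work_queue.put(_STOP)
        for worker in self.workers:
            worker.join()
```

Because the sentinels are queued behind any pending jobs, shutdown() in this sketch lets already-queued jobs finish before the threads exit.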

url_to_destination(url)

Convert a legacy URL to a JobDestination.

Job runner URLs are deprecated; JobDestinations should be used instead. This base class method converts a URL to a very basic JobDestination without destination params.
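A minimal sketch of the base-class conversion, assuming (as in legacy URLs such as local:/// or pbs:///) that the text before "://" names the runner. Destination is a hypothetical stand-in for JobDestination, not Galaxy's class.

```python
from dataclasses import dataclass, field


@dataclass
class Destination:
    """Hypothetical stand-in for a JobDestination."""
    runner: str
    params: dict = field(default_factory=dict)  # left empty by the base conversion
    url: str = ""


def url_to_destination(url):
    # "pbs:///" -> runner "pbs"; no destination params are derived.
    runner = url.split("://", 1)[0]
    return Destination(runner=runner, url=url)
```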

parse_destination_params(params)

Parse the JobDestination params dict and return the runner’s native representation of those params.

prepare_job(job_wrapper, include_metadata=False, include_work_dir_outputs=True, modify_command_for_container=True)

Perform the sanity checks that all runners’ queue_job() methods are likely to need.

queue_job(job_wrapper)
stop_job(job)
recover(job, job_wrapper)
build_command_line(job_wrapper, include_metadata=False, include_work_dir_outputs=True, modify_command_for_container=True)
get_work_dir_outputs(job_wrapper, job_working_directory=None, tool_working_directory=None)

Returns a list of (source_file, destination) pairs, each describing the path to a work_dir output file and its ultimate destination.

get_job_file(job_wrapper, **kwds)
write_executable_script(path, contents, mode=493)
fail_job(job_state, exception=False)
mark_as_resubmitted(job_state, info=None)
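The mode=493 default of write_executable_script() is the decimal form of the octal permission 0o755 (owner rwx, group and others r-x). A minimal sketch of writing such a script, assuming a POSIX file system; it illustrates the permission handling only and is not Galaxy's implementation.

```python
import os

DEFAULT_MODE = 0o755  # equals 493 in decimal


def write_executable_script(path, contents, mode=DEFAULT_MODE):
    # Write the script text, then set its permission bits explicitly
    # (os.chmod is not affected by the process umask).
    with open(path, "w") as fh:
        fh.write(contents)
    os.chmod(path, mode)
```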
class galaxy.jobs.runners.JobState(job_wrapper, job_destination)

Bases: object

Encapsulate state of jobs.

runner_states = <galaxy.util.bunch.Bunch object>
__init__(job_wrapper, job_destination)
set_defaults(files_dir)
static default_job_file(files_dir, id_tag)
static default_exit_code_file(files_dir, id_tag)
cleanup()
class galaxy.jobs.runners.AsynchronousJobState(files_dir=None, job_wrapper=None, job_id=None, job_file=None, output_file=None, error_file=None, exit_code_file=None, job_name=None, job_destination=None)

Bases: galaxy.jobs.runners.JobState

Encapsulate the state of an asynchronous job. This class should be subclassed as needed by individual job runners to capture any additional information required to communicate with the distributed resource manager.

__init__(files_dir=None, job_wrapper=None, job_id=None, job_file=None, output_file=None, error_file=None, exit_code_file=None, job_name=None, job_destination=None)
running
check_limits(runtime=None)
register_cleanup_file_attribute(attribute)
class galaxy.jobs.runners.AsynchronousJobRunner(app, nworkers, **kwargs)

Bases: galaxy.util.monitors.Monitors, galaxy.jobs.runners.BaseJobRunner

Parent class for any job runner that runs jobs asynchronously (e.g. via a distributed resource manager). Provides general methods for running a thread that monitors the state of asynchronous jobs and passes those jobs to the correct methods (queue, finish, cleanup) at the appropriate times.

__init__(app, nworkers, **kwargs)
handle_stop()
monitor()

Watches jobs currently in the monitor queue and deals with state changes (queued to running) and job completion.

monitor_job(job_state)
shutdown()

Attempts to gracefully shut down the monitor thread

check_watched_items()

This method is responsible for iterating over self.watched, handling state changes, and updating self.watched with a new list of watched job states. Subclasses can either override this method directly (as older job runners initially do) or override only check_watched_item() and reuse the list-processing logic here.
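A hypothetical sketch of the delegation pattern described above: check_watched_items() keeps only the job states for which check_watched_item() returns a non-None value. MonitorSketch and DemoRunner are illustrative names, and the dict-based job states stand in for AsynchronousJobState objects.

```python
class MonitorSketch:
    """Hypothetical monitor showing list processing delegated per item."""

    def __init__(self):
        self.watched = []

    def check_watched_item(self, job_state):
        # Subclasses query their resource manager here and return the state
        # to keep watching it, or None once the job has finished or failed.
        raise NotImplementedError

    def check_watched_items(self):
        new_watched = []
        for job_state in self.watched:
            result = self.check_watched_item(job_state)
            if result is not None:
                new_watched.append(result)
        self.watched = new_watched


class DemoRunner(MonitorSketch):
    def check_watched_item(self, job_state):
        # Stop watching completed jobs; keep the rest.
        return None if job_state["done"] else job_state
```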

check_watched_item(job_state)
finish_job(job_state)

Get the output/error for a finished job, pass it to job_wrapper.finish, and clean up all of the job’s temporary files.

mark_as_finished(job_state)
mark_as_failed(job_state)

Submodules

galaxy.jobs.runners.chronos module

galaxy.jobs.runners.cli module

Job control via a command line interface (e.g. qsub/qstat), possibly over a remote connection (e.g. ssh).

class galaxy.jobs.runners.cli.ShellJobRunner(app, nworkers)

Bases: galaxy.jobs.runners.AsynchronousJobRunner

Job runner backed by a finite pool of worker threads. FIFO scheduling

runner_name = 'ShellRunner'
__init__(app, nworkers)

Start the job runner

get_cli_plugins(shell_params, job_params)
url_to_destination(url)
parse_destination_params(params)
queue_job(job_wrapper)

Create job script and submit it to the DRM

submit(shell, job_interface, job_file, galaxy_id_tag, retry=3, timeout=10)

Handles actual job script submission.

If submission fails, it is retried up to retry times (default 3) with a timeout of timeout seconds (default 10). Returns the return code of the submission and its stdout, which contains the external job_id.
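A rough sketch of retry-on-failure submission. It assumes a local command run with subprocess; the real ShellJobRunner runs the submit command through a shell plugin that may be remote (e.g. ssh), so the function name and argument list here are illustrative only.

```python
import subprocess


def submit(argv, retry=3, timeout=10):
    """Hypothetical sketch: run a submit command, retrying on failure.

    Returns (returncode, stdout); stdout is expected to carry the external job id.
    """
    returncode, stdout = None, ""
    for _attempt in range(retry):
        try:
            proc = subprocess.run(argv, capture_output=True, text=True, timeout=timeout)
        except subprocess.TimeoutExpired:
            continue  # treat a timeout like a failed attempt and retry
        returncode, stdout = proc.returncode, proc.stdout
        if returncode == 0:
            break  # success: stop retrying
    return returncode, stdout
```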

check_watched_items()

Called by the monitor thread to look at each watched job and deal with state changes.

stop_job(job)

Attempts to delete a dispatched job

recover(job, job_wrapper)

Recovers jobs stuck in the queued/running state when Galaxy started

galaxy.jobs.runners.condor module

Job control via the Condor DRM.

class galaxy.jobs.runners.condor.CondorJobRunner(app, nworkers)

Bases: galaxy.jobs.runners.AsynchronousJobRunner

Job runner backed by a finite pool of worker threads. FIFO scheduling

runner_name = 'CondorRunner'
__init__(app, nworkers)

Initialize this job runner and start the monitor thread

queue_job(job_wrapper)

Create job script and submit it to the DRM

check_watched_items()

Called by the monitor thread to look at each watched job and deal with state changes.

stop_job(job)

Attempts to delete a job from the DRM queue

recover(job, job_wrapper)

Recovers jobs stuck in the queued/running state when Galaxy started

galaxy.jobs.runners.drmaa module

Job control via the DRMAA API.

class galaxy.jobs.runners.drmaa.DRMAAJobRunner(app, nworkers, **kwargs)

Bases: galaxy.jobs.runners.AsynchronousJobRunner

Job runner backed by a finite pool of worker threads. FIFO scheduling

runner_name = 'DRMAARunner'
restrict_job_name_length = 15
__init__(app, nworkers, **kwargs)

Start the job runner

url_to_destination(url)

Convert a legacy URL to a job destination

get_native_spec(url)

Get any native DRM arguments specified by the site configuration

queue_job(job_wrapper)

Create job script and submit it to the DRM

check_watched_items()

Called by the monitor thread to look at each watched job and deal with state changes.

stop_job(job)

Attempts to delete a job from the DRM queue

recover(job, job_wrapper)

Recovers jobs stuck in the queued/running state when Galaxy started

store_jobtemplate(job_wrapper, jt)

Stores the content of a DRMAA JobTemplate object in a file as a JSON string. The path is hard-coded, but it’s no worse than other paths in this module. The filename uses Galaxy’s job ID, so the file is expected to be unique.
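A hypothetical sketch of persisting job-template attributes as JSON. The files_dir argument, the .jt_json suffix and the plain-dict template are assumptions for illustration; the example keys are standard DRMAA JobTemplate attribute names.

```python
import json
import os


def store_jobtemplate(files_dir, galaxy_job_id, template):
    """Hypothetical sketch: write job-template attributes to a per-job JSON file."""
    # The Galaxy job ID makes the filename unique per job.
    filename = os.path.join(files_dir, "%s.jt_json" % galaxy_job_id)
    with open(filename, "w") as fh:
        json.dump(template, fh)
    return filename


# Example attributes, named after DRMAA JobTemplate fields.
example_template = {
    "remoteCommand": "/galaxy/jobs/42/galaxy_42.sh",
    "jobName": "g42",
    "nativeSpecification": "-q all.q",
}
```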

external_runjob(external_runjob_script, jobtemplate_filename, username)

Runs an external script that will qsub a new job. The external script needs to be run with sudo and will setuid() to the specified user. Effectively, the job is submitted with qsub as a different user than the one running Galaxy.
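A hypothetical sketch of how the command line for such a sudo-wrapped script might be assembled. The argument order (username, then template file) and the example script path are assumptions; nothing is executed here.

```python
import shlex


def build_external_runjob_command(external_runjob_script, jobtemplate_filename, username):
    """Hypothetical sketch: argv for a sudo-wrapped submit script."""
    # The configured script may already contain "sudo" plus options,
    # so split it shell-style before appending the per-job arguments.
    command = shlex.split(external_runjob_script)
    command.extend([str(username), jobtemplate_filename])
    return command
```

Splitting with shlex rather than joining a single string keeps the arguments suitable for passing to subprocess without shell=True.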

galaxy.jobs.runners.godocker module

class galaxy.jobs.runners.godocker.GodockerJobRunner(app, nworkers, **kwargs)

Bases: galaxy.jobs.runners.AsynchronousJobRunner

Job runner backed by a finite pool of worker threads. FIFO scheduling

runner_name = 'GodockerJobRunner'
__init__(app, nworkers, **kwargs)

1. Get runner_param_specs from job_conf.xml.
2. Initialise the job runner parent object.
3. Log in to GoDocker and store the token.
4. Start the worker and monitor threads.

queue_job(job_wrapper)

Create job script and submit it to godocker

check_watched_item(job_state)

Get the job’s current status from GoDocker using job_id and update the status in Galaxy. If the job execution is successful, call mark_as_finished() and return None to Galaxy. Otherwise, if the job failed, call mark_as_failed() and return None to Galaxy. If the job is running or pending, simply return the AsynchronousJobState object (job_state).
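The return contract above can be sketched as follows. The status strings and the injected callables are hypothetical placeholders for the GoDocker status lookup and the runner's mark_as_finished()/mark_as_failed() methods.

```python
def check_watched_item(job_state, get_status, mark_as_finished, mark_as_failed):
    """Hypothetical sketch of the per-job check described above."""
    status = get_status(job_state["job_id"])  # placeholder status lookup
    if status == "finished":
        mark_as_finished(job_state)
        return None  # stop watching this job
    if status == "failed":
        mark_as_failed(job_state)
        return None  # stop watching this job
    # Running or pending: keep the state so the monitor checks it again.
    return job_state
```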
stop_job(job)

Attempts to delete a dispatched, executing job in GoDocker

recover(job, job_wrapper)

Recovers jobs stuck in the queued/running state when Galaxy started

create_log_file(job_state, job_status_god)

Create log files in Galaxy, namely error_file, output_file and exit_code_file. Return True if all the files are created successfully.

login(apikey, login, server, noCert=False)

Log in to GoDocker and return the token. Create the GoDocker Login model schema and call the http_post_request method.

post_task(job_wrapper)

Submit a job to GoDocker and return the job ID. Create the GoDocker Job model schema and call the http_post_request method.

get_task(job_id)

Get job details from GoDocker and return the job. Pass job_id to the http_get_request method.

task_suspend(job_id)

Suspend an actively running job in Galaxy. Pass job_id to the http_get_request method.

get_task_status(job_id)

Get job status from GoDocker and return the status of job. Pass job_id to http_get_request method.

delete_task(job_id)

Delete a suspended task in GoDocker. Pass job_id to http_delete_request method.

galaxy.jobs.runners.kubernetes module

Offload jobs to a Kubernetes cluster.

class galaxy.jobs.runners.kubernetes.KubernetesJobRunner(app, nworkers, **kwargs)

Bases: galaxy.jobs.runners.AsynchronousJobRunner

Job runner backed by a finite pool of worker threads. FIFO scheduling

runner_name = 'KubernetesRunner'
__init__(app, nworkers, **kwargs)
queue_job(job_wrapper)

Create job script and submit it to Kubernetes cluster

check_watched_item(job_state)

Checks the state of a job already submitted to k8s. Job state is an AsynchronousJobState.

fail_job(job_state)

The Kubernetes runner overrides fail_job (called by mark_as_failed) to rescue the pod’s log files. These are left as stdout (pod logs are the natural stdout and stderr of the processes running inside the pods) and would otherwise be deleted by the parent implementation as part of failing the job.

Parameters: job_state
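A hypothetical sketch of the override pattern described above: the subclass reads the stdout file before delegating to the parent, whose cleanup would otherwise delete it. Both classes and the dict-based job state are illustrative stand-ins, not Galaxy's classes.

```python
import os


class BaseRunnerSketch:
    def fail_job(self, job_state):
        # Parent behaviour in this sketch: delete the job's temporary files.
        for path in job_state["cleanup"]:
            if os.path.exists(path):
                os.remove(path)


class K8sRunnerSketch(BaseRunnerSketch):
    def fail_job(self, job_state):
        # Rescue the pod log (left in the stdout file) before the parent deletes it.
        with open(job_state["output_file"]) as fh:
            job_state["rescued_stdout"] = fh.read()
        super().fail_job(job_state)
```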
stop_job(job)

Attempts to delete a job dispatched to the k8s cluster

recover(job, job_wrapper)

Recovers jobs stuck in the queued/running state when Galaxy started

galaxy.jobs.runners.local module

Job runner plugin for executing jobs on the local system via the command line.

class galaxy.jobs.runners.local.LocalJobRunner(app, nworkers)

Bases: galaxy.jobs.runners.BaseJobRunner

Job runner backed by a finite pool of worker threads. FIFO scheduling

runner_name = 'LocalRunner'
__init__(app, nworkers)

Start the job runner

queue_job(job_wrapper)
stop_job(job)
recover(job, job_wrapper)
shutdown()

galaxy.jobs.runners.pbs module

class galaxy.jobs.runners.pbs.PBSJobRunner(app, nworkers)

Bases: galaxy.jobs.runners.AsynchronousJobRunner

Job runner backed by a finite pool of worker threads. FIFO scheduling

runner_name = 'PBSRunner'
__init__(app, nworkers)

Start the job runner

default_pbs_server
url_to_destination(url)

Convert a legacy URL to a job destination

parse_destination_params(params)

A wrapper method around __args_to_attrs() that allows administrators to define PBS params either as command-line options (as in qsub(1B)) or as more human-readable “long” args (as in pbs_submit(3B)).

Returns: list of dicts. The dicts map directly to pbs attropl structs (see pbs_submit(3B)).
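A hypothetical sketch of normalising qsub-style options and long names into attropl-style dicts (name, resource, value). Only a small subset of option mappings is shown, and the function illustrates the idea rather than reproducing Galaxy's __args_to_attrs().

```python
# Hypothetical subset of qsub(1B) short options and their pbs_submit(3B) attribute names.
SHORT_TO_LONG = {
    "-N": "Job_Name",
    "-q": "destination",
    "-l": "Resource_List",
    "-o": "Output_Path",
    "-e": "Error_Path",
    "-A": "Account_Name",
}


def args_to_attrs(params):
    """Turn {option_or_long_name: value} into attropl-style dicts."""
    attrs = []
    for key, value in params.items():
        name = SHORT_TO_LONG.get(key, key)  # long names pass through unchanged
        if name == "Resource_List":
            # "walltime=01:00:00,nodes=1" becomes one dict per resource.
            for item in value.split(","):
                resource, _, res_value = item.partition("=")
                attrs.append({"name": name, "resource": resource, "value": res_value})
        else:
            attrs.append({"name": name, "resource": None, "value": value})
    return attrs
```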
queue_job(job_wrapper)

Create PBS script for a job and submit it to the PBS queue

check_watched_items()

Called by the monitor thread to look at each watched job and deal with state changes.

check_all_jobs()

Returns a list of servers that failed to be contacted and a dict of “job_id : status” pairs (where status is a bunchified version of the API’s structure).

convert_statjob_to_bunches(statjob_out)
check_single_job(pbs_server_name, job_id)

Returns the state of a single job, used to make sure a job is really dead.

fail_job(pbs_job_state)

Separated out so we can use the worker threads for it.

get_stage_in_out(fnames, symlink=False)

Convenience function to create a stagein/stageout list
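A minimal sketch of building such a list, assuming PBS's local_file@host:remote_file stage-in/stage-out syntax with comma-separated entries. The dataset_server parameter is hypothetical, and the file-access checks and symlink handling a real implementation might need are omitted.

```python
def get_stage_in_out(fnames, dataset_server):
    """Hypothetical sketch: comma-separated local@host:remote stage entries."""
    # Here the same path is used on both sides of each entry.
    return ",".join("%s@%s:%s" % (fname, dataset_server, fname) for fname in fnames)
```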

stop_job(job)

Attempts to delete a job from the PBS queue

recover(job, job_wrapper)

Recovers jobs stuck in the queued/running state when Galaxy started

galaxy.jobs.runners.pulsar module

Job runner used to execute Galaxy jobs through Pulsar.

More information on Pulsar can be found at http://pulsar.readthedocs.org/.

class galaxy.jobs.runners.pulsar.PulsarLegacyJobRunner(app, nworkers, **kwds)

Bases: galaxy.jobs.runners.pulsar.PulsarJobRunner

Flavor of Pulsar job runner mimicking behavior of old LWR runner.

destination_defaults = {'rewrite_parameters': 'false', 'dependency_resolution': 'local'}
class galaxy.jobs.runners.pulsar.PulsarMQJobRunner(app, nworkers, **kwds)

Bases: galaxy.jobs.runners.pulsar.PulsarJobRunner

Flavor of Pulsar job runner with sensible defaults for message queue communication.

destination_defaults = {'jobs_directory': <object object>, 'private_token': <object object>, 'dependency_resolution': 'remote', 'url': <object object>, 'rewrite_parameters': 'true', 'default_file_action': 'remote_transfer'}
class galaxy.jobs.runners.pulsar.PulsarRESTJobRunner(app, nworkers, **kwds)

Bases: galaxy.jobs.runners.pulsar.PulsarJobRunner

Flavor of Pulsar job runner with sensible defaults for RESTful usage.

destination_defaults = {'url': <object object>, 'rewrite_parameters': 'true', 'dependency_resolution': 'remote', 'default_file_action': 'transfer'}
class galaxy.jobs.runners.pulsar.PulsarEmbeddedJobRunner(app, nworkers, **kwds)

Bases: galaxy.jobs.runners.pulsar.PulsarJobRunner

Flavor of Pulsar job runner that runs Pulsar’s server code directly within Galaxy.

This is an appropriate job runner when the goal is to use Pulsar staging but there is no need to run a remote service.

destination_defaults = {'rewrite_parameters': 'true', 'dependency_resolution': 'remote', 'default_file_action': 'copy'}
default_build_pulsar_app = True
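The destination_defaults dicts above contain <object object> values. This sketch assumes those are sentinel objects meaning "must be supplied by the destination" or "not used by this flavor", and shows one plausible way such defaults could be merged into destination params. REQUIRED, IGNORED and populate_defaults are hypothetical names.

```python
REQUIRED = object()  # hypothetical sentinel: the destination must set this param
IGNORED = object()   # hypothetical sentinel: the param is not used by this flavor


def populate_defaults(params, destination_defaults):
    """Fill in missing destination params from a runner flavor's defaults."""
    merged = dict(params)
    for key, default in destination_defaults.items():
        if key in merged:
            continue  # explicit destination params win
        if default is REQUIRED:
            raise ValueError("destination param %r is required" % key)
        if default is IGNORED:
            continue  # nothing to fill in
        merged[key] = default
    return merged
```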

galaxy.jobs.runners.slurm module

SLURM job control via the DRMAA API.

class galaxy.jobs.runners.slurm.SlurmJobRunner(app, nworkers, **kwargs)

Bases: galaxy.jobs.runners.drmaa.DRMAAJobRunner

runner_name = 'SlurmRunner'
restrict_job_name_length = False
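DRMAAJobRunner sets restrict_job_name_length = 15, while SlurmJobRunner sets it to False. A minimal sketch of what that difference means, assuming the restriction truncates the name to its leading characters; restrict_name is a hypothetical helper.

```python
def restrict_name(job_name, limit):
    # A falsy limit (False, as on SlurmJobRunner) leaves the name untouched;
    # otherwise keep only the first `limit` characters (15 on DRMAAJobRunner).
    return job_name[:limit] if limit else job_name
```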

galaxy.jobs.runners.state_handler_factory module

galaxy.jobs.runners.state_handler_factory.build_state_handlers()

galaxy.jobs.runners.tasks module

class galaxy.jobs.runners.tasks.TaskedJobRunner(app, nworkers)

Bases: galaxy.jobs.runners.BaseJobRunner

Job runner backed by a finite pool of worker threads. FIFO scheduling

runner_name = 'TaskRunner'
__init__(app, nworkers)

Start the job runner with ‘nworkers’ worker threads

queue_job(job_wrapper)
stop_job(job)
recover(job, job_wrapper)