galaxy.jobs.runners package

Base classes for job runner plugins.

class galaxy.jobs.runners.RunnerParams(specs=None, params=None)[source]

Bases: ParamsWithSpecs

class galaxy.jobs.runners.BaseJobRunner(app: GalaxyManagerApplication, nworkers: int, **kwargs)[source]

Bases: object

runner_name = 'BaseJobRunner'
start_methods = ['_init_monitor_thread', '_init_worker_threads']
DEFAULT_SPECS = {'recheck_missing_job_retries': {'default': 0, 'map': <class 'int'>, 'valid': <function BaseJobRunner.<lambda>>}}
__init__(app: GalaxyManagerApplication, nworkers: int, **kwargs)[source]

Start the job runner

start()[source]
run_next()[source]

Run the next item in the work queue (a job waiting to run)

put(job_wrapper: MinimalJobWrapper)[source]

Add a job to the queue (by job identifier) and indicate that the job is ready to run.

mark_as_queued(job_wrapper: MinimalJobWrapper)[source]
shutdown()[source]

Attempts to gracefully shut down the worker threads
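The interplay of put(), run_next() and shutdown() can be illustrated with a minimal sketch of a worker-thread pool fed by a FIFO queue. TinyRunner and the STOP sentinel are hypothetical names used only for illustration; this is not Galaxy's implementation, and real runners process job wrappers rather than plain identifiers.

```python
import queue
import threading

STOP = object()  # hypothetical sentinel telling a worker to exit


class TinyRunner:
    """Illustrative stand-in, not Galaxy's BaseJobRunner."""

    def __init__(self, nworkers):
        self.work_queue = queue.Queue()
        self.done = []
        self.done_lock = threading.Lock()
        self.workers = [
            threading.Thread(target=self.run_next, daemon=True)
            for _ in range(nworkers)
        ]
        for worker in self.workers:
            worker.start()

    def put(self, job_id):
        # Queue the job; idle workers dequeue items in FIFO order.
        self.work_queue.put(job_id)

    def run_next(self):
        # Each worker loops until it receives the stop sentinel.
        while True:
            item = self.work_queue.get()
            if item is STOP:
                return
            with self.done_lock:
                self.done.append(item)

    def shutdown(self):
        # One sentinel per worker, then wait for the threads to exit.
        for _ in self.workers:
            self.work_queue.put(STOP)
        for worker in self.workers:
            worker.join(timeout=5)


runner = TinyRunner(nworkers=2)
for job_id in (1, 2, 3):
    runner.put(job_id)
runner.shutdown()
processed = sorted(runner.done)
```

Because the sentinels are queued after the jobs, every job is dequeued before any worker is told to stop.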

url_to_destination(url: str)[source]

Convert a legacy URL to a JobDestination.

Job runner URLs are deprecated; JobDestinations should be used instead. This base class method converts from a URL to a very basic JobDestination without destination params.
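As a rough sketch of such a conversion, the example below builds a bare destination from a URL. SimpleDestination is a hypothetical stand-in for JobDestination, the example URL is made up, and taking the runner name from the text before the first colon is an assumption rather than a statement about Galaxy's code.

```python
from dataclasses import dataclass, field


@dataclass
class SimpleDestination:
    """Hypothetical stand-in for JobDestination."""

    runner: str
    url: str
    params: dict = field(default_factory=dict)


def url_to_destination(url):
    # Assumption: the runner name is the part before the first colon.
    runner = url.split(":", 1)[0]
    # No destination params are derived from the URL in this basic form.
    return SimpleDestination(runner=runner, url=url)


dest = url_to_destination("drmaa://-q long/")
```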

parse_destination_params(params: Dict[str, Any])[source]

Parse the JobDestination params dict and return the runner’s native representation of those params.

prepare_job(job_wrapper: MinimalJobWrapper, include_metadata: bool = False, include_work_dir_outputs: bool = True, modify_command_for_container: bool = True, stream_stdout_stderr: bool = False)[source]

Some sanity checks that all runners’ queue_job() methods are likely to want to do

queue_job(job_wrapper: MinimalJobWrapper) None[source]
stop_job(job_wrapper)[source]
recover(job, job_wrapper)[source]
build_command_line(job_wrapper: MinimalJobWrapper, include_metadata: bool = False, include_work_dir_outputs: bool = True, modify_command_for_container: bool = True, stream_stdout_stderr=False)[source]
get_work_dir_outputs(job_wrapper: MinimalJobWrapper, job_working_directory: str | None = None, tool_working_directory: str | None = None)[source]

Returns list of pairs (source_file, destination) describing path to work_dir output file and ultimate destination.
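A caller might consume such a list of pairs roughly as follows. This is a sketch only: move_outputs is a hypothetical helper, the file names are invented, and skipping missing source files is an assumption about desirable behaviour, not a description of what Galaxy does.

```python
import os
import shutil
import tempfile


def move_outputs(pairs):
    """Move each existing source file to its destination; return what moved."""
    moved = []
    for source_file, destination in pairs:
        if os.path.exists(source_file):
            shutil.move(source_file, destination)
            moved.append(destination)
    return moved


# Build a throwaway working directory with one real output file.
workdir = tempfile.mkdtemp()
outdir = tempfile.mkdtemp()
src = os.path.join(workdir, "result.txt")
with open(src, "w") as handle:
    handle.write("ok\n")
dst = os.path.join(outdir, "dataset_1.dat")

pairs = [
    (src, dst),
    (os.path.join(workdir, "missing.txt"), os.path.join(outdir, "dataset_2.dat")),
]
moved = move_outputs(pairs)
```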

get_job_file(job_wrapper: MinimalJobWrapper, **kwds) str[source]
write_executable_script(path: str, contents: str, job_io: DescribesScriptIntegrityChecks) None[source]
fail_job(job_state: JobState, exception=False, message='Job failed', full_status=None)[source]
mark_as_resubmitted(job_state: JobState, info: str | None = None)[source]
class galaxy.jobs.runners.JobState(job_wrapper: JobWrapper, job_destination: JobDestination)[source]

Bases: object

Encapsulate state of jobs.

runner_states = <galaxy.util.bunch.Bunch object>
__init__(job_wrapper: JobWrapper, job_destination: JobDestination)[source]
property exit_code_file: str
set_defaults(files_dir)[source]
static default_job_file(files_dir, id_tag)[source]
read_exit_code()[source]
cleanup()[source]
class galaxy.jobs.runners.AsynchronousJobState(files_dir=None, job_wrapper=None, job_id=None, job_file=None, output_file=None, error_file=None, exit_code_file=None, job_name=None, job_destination=None)[source]

Bases: JobState

Encapsulate the state of an asynchronous job. This should be subclassed as needed for various job runners to capture additional information needed to communicate with the distributed resource manager.

__init__(files_dir=None, job_wrapper=None, job_id=None, job_file=None, output_file=None, error_file=None, exit_code_file=None, job_name=None, job_destination=None)[source]
property running
check_limits(runtime=None)[source]
register_cleanup_file_attribute(attribute)[source]
class galaxy.jobs.runners.AsynchronousJobRunner(app, nworkers, **kwargs)[source]

Bases: BaseJobRunner, Monitors

Parent class for any job runner that runs jobs asynchronously (e.g. via a distributed resource manager). Provides general methods for having a thread to monitor the state of asynchronous jobs and submitting those jobs to the correct methods (queue, finish, cleanup) at appropriate times.

__init__(app, nworkers, **kwargs)[source]

Start the job runner

handle_stop()[source]
monitor()[source]

Watches jobs currently in the monitor queue and deals with state changes (queued to running) and job completion.

monitor_job(job_state)[source]
shutdown()[source]

Attempts to gracefully shut down the monitor thread

check_watched_items()[source]

This method is responsible for iterating over self.watched and handling state changes and updating self.watched with a new list of watched job states. Subclasses can opt to override this directly (as older job runners will initially) or just override check_watched_item and allow the list processing to reuse the logic here.
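The division of labour described here can be sketched as a list-processing loop that delegates to a per-item check. WatchingRunner is a hypothetical stand-in, and the dict-based job states are invented for illustration. The convention that the per-item check returns the state to keep watching it and None to drop it follows the descriptions of check_watched_item elsewhere on this page.

```python
class WatchingRunner:
    """Illustrative stand-in, not Galaxy's AsynchronousJobRunner."""

    def __init__(self):
        self.watched = []
        self.finished = []

    def check_watched_items(self):
        # Rebuild the watch list from whatever each per-item check returns.
        new_watched = []
        for job_state in self.watched:
            still_watched = self.check_watched_item(job_state)
            if still_watched is not None:
                new_watched.append(still_watched)
        self.watched = new_watched

    def check_watched_item(self, job_state):
        # Subclasses would query their resource manager here.
        if job_state["status"] == "done":
            self.finished.append(job_state["id"])
            return None
        return job_state


runner = WatchingRunner()
runner.watched = [
    {"id": "a", "status": "running"},
    {"id": "b", "status": "done"},
]
runner.check_watched_items()
remaining = [state["id"] for state in runner.watched]
```

A subclass that only overrides check_watched_item inherits the loop unchanged, which is the reuse the paragraph above refers to.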

check_watched_item(job_state)[source]
finish_job(job_state: AsynchronousJobState)[source]

Get the output/error for a finished job, pass to job_wrapper.finish and cleanup all the job’s temporary files.

mark_as_finished(job_state)[source]
mark_as_failed(job_state)[source]

Subpackages

Submodules

galaxy.jobs.runners.chronos module

class galaxy.jobs.runners.chronos.ChronosJobRunner(app, nworkers, **kwargs)[source]

Bases: AsynchronousJobRunner

runner_name = 'ChronosRunner'
RUNNER_PARAM_SPEC_KEY = 'runner_param_specs'
JOB_NAME_PREFIX = 'galaxy-chronos-'
RUNNER_PARAM_SPEC = {'chronos': {'map': <class 'str'>}, 'insecure': {'default': True, 'map': <function ChronosJobRunner.<lambda>>}, 'owner': {'map': <class 'str'>}, 'password': {'map': <class 'str'>}, 'username': {'map': <class 'str'>}}
DESTINATION_PARAMS_SPEC = {'docker_cpu': {'default': 0.1, 'map': <class 'float'>, 'map_name': 'cpus'}, 'docker_disk': {'default': 256, 'map': <class 'int'>, 'map_name': 'disk'}, 'docker_memory': {'default': 128, 'map': <class 'int'>, 'map_name': 'mem'}, 'max_retries': {'default': 2, 'map': <class 'int'>, 'map_name': 'retries'}, 'volumes': {'default': None, 'map': <function ChronosJobRunner.<lambda>>, 'map_name': 'container/volumes'}}
__init__(app, nworkers, **kwargs)[source]

Initialize this job runner and start the monitor thread

queue_job(job_wrapper)[source]
stop_job(job_wrapper)[source]
recover(job, job_wrapper)[source]
check_watched_item(job_state)[source]
finish_job(job_state)[source]

Get the output/error for a finished job, pass to job_wrapper.finish and cleanup all the job’s temporary files.

parse_destination_params(params)[source]

Parse the JobDestination params dict and return the runner’s native representation of those params.
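One plausible reading of DESTINATION_PARAMS_SPEC is that each entry supplies a default, a conversion function (map) and the native key name (map_name). The sketch below applies that reading to a subset of the spec. parse_params is a hypothetical helper, and the nested 'container/volumes' key is deliberately left out because its handling is not documented here.

```python
# Subset of the documented spec; the volumes entry is omitted on purpose.
SPEC = {
    "docker_cpu": {"default": 0.1, "map": float, "map_name": "cpus"},
    "docker_memory": {"default": 128, "map": int, "map_name": "mem"},
    "max_retries": {"default": 2, "map": int, "map_name": "retries"},
}


def parse_params(params, spec):
    """Translate destination params into native names (assumed semantics)."""
    native = {}
    for name, rule in spec.items():
        raw = params.get(name, rule["default"])  # fall back to the default
        native[rule["map_name"]] = rule["map"](raw)  # convert and rename
    return native


native = parse_params({"docker_memory": "512"}, SPEC)
```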

write_command(job_wrapper)[source]

galaxy.jobs.runners.cli module

Job control via a command line interface (e.g. qsub/qstat), possibly over a remote connection (e.g. ssh).

class galaxy.jobs.runners.cli.ShellJobRunner(app, nworkers)[source]

Bases: AsynchronousJobRunner

Job runner backed by a finite pool of worker threads. FIFO scheduling

runner_name = 'ShellRunner'
__init__(app, nworkers)[source]

Start the job runner

get_cli_plugins(shell_params, job_params)[source]
url_to_destination(url)[source]

Convert a legacy URL to a JobDestination.

Job runner URLs are deprecated; JobDestinations should be used instead. This base class method converts from a URL to a very basic JobDestination without destination params.

parse_destination_params(params)[source]

Parse the JobDestination params dict and return the runner’s native representation of those params.

queue_job(job_wrapper)[source]

Create job script and submit it to the DRM

submit(shell, job_interface, job_file, galaxy_id_tag, retry=3, timeout=10)[source]

Handles actual job script submission.

If submission fails, it is retried up to retry times with a timeout of timeout seconds. Returns the returncode of the submission and the stdout, which contains the external job_id.
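The retry behaviour can be sketched in isolation. In this example run_command is a hypothetical callable returning (returncode, stdout), retry is treated as the total number of attempts, and timeout is treated as the pause between attempts. These are assumptions, and the actual method may count attempts differently.

```python
import time


def submit_with_retry(run_command, retry=3, timeout=10, sleep=time.sleep):
    """Call run_command until it returns exit code 0 or attempts run out."""
    returncode, stdout = None, ""
    for attempt in range(retry):
        returncode, stdout = run_command()
        if returncode == 0:
            break
        if attempt < retry - 1:
            sleep(timeout)  # wait before the next attempt
    return returncode, stdout.strip()


attempts = []


def flaky_submit():
    # Fails twice, then reports an external job id on stdout.
    attempts.append(1)
    if len(attempts) < 3:
        return 1, ""
    return 0, "12345\n"


waits = []
# Record the waits instead of actually sleeping.
result = submit_with_retry(flaky_submit, retry=3, timeout=10, sleep=waits.append)
```

Passing the sleep function as a parameter keeps the sketch testable without real delays.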

check_watched_items()[source]

Called by the monitor thread to look at each watched job and deal with state changes.

handle_metadata_externally(ajs)[source]
stop_job(job_wrapper)[source]

Attempts to delete a dispatched job

recover(job, job_wrapper)[source]

Recovers jobs stuck in the queued/running state when Galaxy started

galaxy.jobs.runners.condor module

Job control via the Condor DRM via CLI.

This plugin has been used in production and isn’t unstable, but it shouldn’t be taken as an example of how to write Galaxy job runners that interface with a DRM using command-line invocations. When writing new job runners that leverage command-line calls for submitting and checking the status of jobs, please check out the CLI runner (cli.py in this directory) and start by writing a new job plugin for it (see examples in /galaxy/jobs/runners/util/cli/job). That approach will result in less boilerplate and allow greater reuse of the DRM-specific hooks you’ll need to write. Ideally this plugin would have been written to target that framework, but we don’t have the bandwidth to rewrite it at this time.

class galaxy.jobs.runners.condor.CondorJobRunner(app, nworkers, **kwargs)[source]

Bases: AsynchronousJobRunner

Job runner backed by a finite pool of worker threads. FIFO scheduling

runner_name = 'CondorRunner'
queue_job(job_wrapper)[source]

Create job script and submit it to the DRM

check_watched_items()[source]

Called by the monitor thread to look at each watched job and deal with state changes.

stop_job(job_wrapper)[source]

Attempts to delete a job from the DRM queue

recover(job, job_wrapper)[source]

Recovers jobs stuck in the queued/running state when Galaxy started

galaxy.jobs.runners.drmaa module

Job control via the DRMAA API.

class galaxy.jobs.runners.drmaa.DRMAAJobRunner(app, nworkers, **kwargs)[source]

Bases: AsynchronousJobRunner

Job runner backed by a finite pool of worker threads. FIFO scheduling

runner_name = 'DRMAARunner'
restrict_job_name_length = 15
__init__(app, nworkers, **kwargs)[source]

Start the job runner

url_to_destination(url)[source]

Convert a legacy URL to a job destination

get_native_spec(url)[source]

Get any native DRM arguments specified by the site configuration

queue_job(job_wrapper)[source]

Create job script and submit it to the DRM

check_watched_item(ajs, new_watched)[source]

Look at a single watched job, determine its state, and deal with errors that could happen in this process. To be called from check_watched_items(). Returns the state, or None if exceptions occurred. In the latter case:

  • the job is appended to new_watched if a drmaa.InternalException, drmaa.InvalidJobException, or drmaa.DrmCommunicationException occurred (which causes the job to be tested again in the next iteration of check_watched_items)

  • the job is finished as errored if any other exception occurs

  • the job is finished OK or errored after the maximum number of retries, depending on the exception

Note that None is returned in all cases where the loop in check_watched_items is to be continued.
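The error handling described for this method can be sketched without the drmaa library. InternalError and FatalError are hypothetical stand-ins for the transient DRMAA exceptions and for any other exception, query_state is an invented callable, and the retry counting is left out for brevity.

```python
class InternalError(Exception):
    """Hypothetical stand-in for a transient DRM error."""


class FatalError(Exception):
    """Hypothetical stand-in for any other error."""


TRANSIENT = (InternalError,)


def check_watched_item(job, new_watched, query_state):
    """Return the job state, or None if the caller's loop should continue."""
    try:
        return query_state(job)
    except TRANSIENT:
        # Keep watching: the job is tested again in the next iteration.
        new_watched.append(job)
        return None
    except Exception:
        # Any other exception: treat the job as errored and stop watching.
        job["failed"] = True
        return None


def query_state(job):
    if job["id"] == "flaky":
        raise InternalError("DRM busy")
    if job["id"] == "broken":
        raise FatalError("unknown job")
    return "running"


new_watched = []
jobs = [{"id": "ok"}, {"id": "flaky"}, {"id": "broken"}]
states = [check_watched_item(job, new_watched, query_state) for job in jobs]
```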

check_watched_items()[source]

Called by the monitor thread to look at each watched job and deal with state changes.

stop_job(job_wrapper)[source]

Attempts to delete a job from the DRM queue

recover(job, job_wrapper)[source]

Recovers jobs stuck in the queued/running state when Galaxy started

store_jobtemplate(job_wrapper, jt)[source]

Stores the content of a DRMAA JobTemplate object in a file as a JSON string.
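A minimal sketch of this idea, assuming the template content has already been copied into a plain dict: store_template is a hypothetical helper, and the field names and file suffix shown are illustrative rather than taken from the runner.

```python
import json
import os
import tempfile


def store_template(template, directory):
    """Write the template dict as JSON to a new file and return its path."""
    fd, path = tempfile.mkstemp(suffix=".jt_json", dir=directory)
    with os.fdopen(fd, "w") as handle:
        json.dump(template, handle)
    return path


# Illustrative template content only.
template = {"remoteCommand": "/tmp/job.sh", "jobName": "g1_example"}
path = store_template(template, tempfile.mkdtemp())

# An external script could later read the file back.
with open(path) as handle:
    restored = json.load(handle)
```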

external_runjob(external_runjob_script, jobtemplate_filename, username)[source]

Runs an external script that will QSUB a new job. The external script needs to be run with sudo and will setuid() to the specified user. Effectively, the job is submitted via QSUB as a different user than the one used by Galaxy.

galaxy.jobs.runners.godocker module

class galaxy.jobs.runners.godocker.GodockerJobRunner(app, nworkers, **kwargs)[source]

Bases: AsynchronousJobRunner

Job runner backed by a finite pool of worker threads. FIFO scheduling

runner_name = 'GodockerJobRunner'
__init__(app, nworkers, **kwargs)[source]

  1. Get runner_param_specs from the job config

  2. Initialise job runner parent object

  3. Login to godocker and store the token

  4. Start the worker and monitor threads

queue_job(job_wrapper)[source]

Create job script and submit it to godocker

check_watched_item(job_state)[source]

Get the job's current status from GoDocker using job_id and update the status in Galaxy.

  • If the job execution is successful, call mark_as_finished() and return None to Galaxy.

  • If the job failed, call mark_as_failed() and return None to Galaxy.

  • If the job is running or in pending state, simply return the AsynchronousJobState object (job_state).

stop_job(job_wrapper)[source]

Attempts to delete a dispatched executing Job in GoDocker

recover(job, job_wrapper)[source]

Recovers jobs stuck in the queued/running state when Galaxy started

create_log_file(job_state, job_status_god)[source]

Create log files in Galaxy, namely error_file, output_file and exit_code_file. Return True if all the file creations are successful.

login(apikey, login, server, noCert=False)[source]

Log in to GoDocker and return the token. Creates the Login model schema of GoDocker and calls the http_post_request method.

post_task(job_wrapper)[source]

Submit job to GoDocker and return the job ID. Creates the Job model schema of GoDocker and calls the http_post_request method.

get_task(job_id)[source]

Get job details from GoDocker and return the job. Pass job_id to the http_get_request method.

task_suspend(job_id)[source]

Suspend actively running job in galaxy. Pass job_id to the http_get_request method.

get_task_status(job_id)[source]

Get job status from GoDocker and return the status of job. Pass job_id to http_get_request method.

delete_task(job_id)[source]

Delete a suspended task in GoDocker. Pass job_id to http_delete_request method.

galaxy.jobs.runners.kubernetes module

Offload jobs to a Kubernetes cluster.

class galaxy.jobs.runners.kubernetes.KubernetesJobRunner(app, nworkers, **kwargs)[source]

Bases: AsynchronousJobRunner

Job runner backed by a finite pool of worker threads. FIFO scheduling

runner_name = 'KubernetesRunner'
LABEL_START = re.compile('^[A-Za-z0-9]')
LABEL_END = re.compile('[A-Za-z0-9]$')
LABEL_REGEX = re.compile('[^-A-Za-z0-9_.]')
__init__(app, nworkers, **kwargs)[source]

Start the job runner

setup_base_volumes()[source]
queue_job(job_wrapper)[source]

Create job script and submit it to Kubernetes cluster

check_watched_item(job_state)[source]

Checks the state of a job already submitted on k8s. Job state is an AsynchronousJobState

stop_job(job_wrapper)[source]

Attempts to delete a dispatched job to the k8s cluster

recover(job, job_wrapper)[source]

Recovers jobs stuck in the queued/running state when Galaxy started

fail_job(job_state: JobState, exception=False, message='Job failed', full_status=None)[source]
finish_job(job_state)[source]

Get the output/error for a finished job, pass to job_wrapper.finish and cleanup all the job’s temporary files.
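The LABEL_START, LABEL_END and LABEL_REGEX patterns listed for this class suggest a label-sanitising step. The sketch below shows one way the three patterns could be combined. force_label_conformity is a hypothetical name, and replacing characters with an underscore and padding with "x" are assumptions, not confirmed behaviour of the runner.

```python
import re

# Patterns as documented on KubernetesJobRunner.
LABEL_START = re.compile("^[A-Za-z0-9]")
LABEL_END = re.compile("[A-Za-z0-9]$")
LABEL_REGEX = re.compile("[^-A-Za-z0-9_.]")


def force_label_conformity(value):
    # Replace every disallowed character with an underscore.
    label = LABEL_REGEX.sub("_", value)
    # Pad with a letter if the first or last character is not alphanumeric.
    if not LABEL_START.search(label):
        label = "x" + label
    if not LABEL_END.search(label):
        label = label + "x"
    return label


clean = force_label_conformity("_my tool/1.0_")
unchanged = force_label_conformity("abc")
```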

galaxy.jobs.runners.local module

Job runner plugin for executing jobs on the local system via the command line.

class galaxy.jobs.runners.local.LocalJobRunner(app, nworkers)[source]

Bases: BaseJobRunner

Job runner backed by a finite pool of worker threads. FIFO scheduling

runner_name = 'LocalRunner'
__init__(app, nworkers)[source]

Start the job runner

queue_job(job_wrapper)[source]
stop_job(job_wrapper)[source]
recover(job, job_wrapper)[source]
shutdown()[source]

Attempts to gracefully shut down the worker threads

galaxy.jobs.runners.pbs module

class galaxy.jobs.runners.pbs.PBSJobRunner(app, nworkers)[source]

Bases: AsynchronousJobRunner

Job runner backed by a finite pool of worker threads. FIFO scheduling

runner_name = 'PBSRunner'
__init__(app, nworkers)[source]

Start the job runner

property default_pbs_server
url_to_destination(url)[source]

Convert a legacy URL to a job destination

parse_destination_params(params)[source]

A wrapper method around __args_to_attrs() that allows administrators to define PBS params as either command-line options (as in qsub(1B)) or more human-readable “long” args (as in pbs_submit(3B)).

Returns:

list of dicts – The dicts map directly to pbs attropl structs (see pbs_submit(3B))

queue_job(job_wrapper)[source]

Create PBS script for a job and submit it to the PBS queue

check_watched_items()[source]

Called by the monitor thread to look at each watched job and deal with state changes.

check_all_jobs()[source]

Returns a list of servers that failed to be contacted and a dict of “job_id : status” pairs (where status is a bunchified version of the API’s structure).

convert_statjob_to_bunches(statjob_out)[source]
check_single_job(pbs_server_name, job_id)[source]

Returns the state of a single job, used to make sure a job is really dead.

get_stage_in_out(fnames, symlink=False)[source]

Convenience function to create a stagein/stageout list

stop_job(job_wrapper)[source]

Attempts to delete a job from the PBS queue

recover(job, job_wrapper)[source]

Recovers jobs stuck in the queued/running state when Galaxy started

galaxy.jobs.runners.pulsar module

Job runner used to execute Galaxy jobs through Pulsar.

More information on Pulsar can be found at https://pulsar.readthedocs.io/ .

class galaxy.jobs.runners.pulsar.PulsarLegacyJobRunner(app, nworkers, **kwds)[source]

Bases: PulsarJobRunner

Flavor of Pulsar job runner mimicking behavior of old LWR runner.

destination_defaults = {'dependency_resolution': 'local', 'rewrite_parameters': 'false'}
class galaxy.jobs.runners.pulsar.PulsarRESTJobRunner(app, nworkers, **kwds)[source]

Bases: PulsarJobRunner

Flavor of Pulsar job runner with sensible defaults for RESTful usage.

destination_defaults = {'default_file_action': 'transfer', 'dependency_resolution': 'remote', 'rewrite_parameters': 'true', 'url': <object object>}
class galaxy.jobs.runners.pulsar.PulsarMQJobRunner(app, nworkers, **kwds)[source]

Bases: PulsarJobRunner

Flavor of Pulsar job runner with sensible defaults for message queue communication.

use_mq = True
poll = False
destination_defaults = {'default_file_action': 'remote_transfer', 'dependency_resolution': 'remote', 'jobs_directory': <object object>, 'private_token': <object object>, 'rewrite_parameters': 'true', 'url': <object object>}
class galaxy.jobs.runners.pulsar.PulsarEmbeddedJobRunner(app, nworkers, **kwds)[source]

Bases: PulsarJobRunner

Flavor of Pulsar job runner that runs Pulsar’s server code directly within Galaxy.

This is an appropriate job runner when the desire is to use Pulsar staging but there is no need to run a remote service.

destination_defaults = {'default_file_action': 'copy', 'dependency_resolution': 'remote', 'rewrite_parameters': 'true'}
default_build_pulsar_app = True
class galaxy.jobs.runners.pulsar.PulsarEmbeddedMQJobRunner(app, nworkers, **kwds)[source]

Bases: PulsarMQJobRunner

default_build_pulsar_app = True

galaxy.jobs.runners.slurm module

SLURM job control via the DRMAA API.

class galaxy.jobs.runners.slurm.SlurmJobRunner(app, nworkers, **kwargs)[source]

Bases: DRMAAJobRunner

runner_name = 'SlurmRunner'
restrict_job_name_length = False

galaxy.jobs.runners.state_handler_factory module

galaxy.jobs.runners.state_handler_factory.build_state_handlers()[source]

galaxy.jobs.runners.tasks module

class galaxy.jobs.runners.tasks.TaskedJobRunner(app, nworkers)[source]

Bases: BaseJobRunner

Job runner backed by a finite pool of worker threads. FIFO scheduling

runner_name = 'TaskRunner'
__init__(app, nworkers)[source]

Start the job runner with ‘nworkers’ worker threads

queue_job(job_wrapper)[source]
stop_job(job_wrapper)[source]
recover(job, job_wrapper)[source]

galaxy.jobs.runners.univa module

Job control via the DRMAA API / qstat and qacct.

Known to work on the UNIVA grid engine.

known bugs/problems:

  • if a job runs longer than the time limits of the queue two things happen

    1. at the soft limit (s_rt) SIGUSR1 is sent to the job

    2. at the hard limit (h_rt) SIGKILL is sent to the job

    The second case is covered by the runner – it’s the same mechanism that kills a job when the job time limit is reached. The first case is currently not covered. The good thing is that most programs ignore SIGUSR1. For the second case it seems that jobs are marked as failed (killed by the DRM) at random.

    Possible solutions:

    • Configure the job destinations such that the queue limits are never reached. The time limits can be determined with: for i in `qconf -sql`; do qconf -sq $i | grep -E '(h|s)_rt'; done

    • extend the job runner to cover s_rt cases (some ideas):

      • I’m unsure whether the program’s reaction to SIGUSR1 can be determined easily, because it depends on the program. It seems that the exit code is 10 in most cases.

      • The scheduler logs contain quite useful information.

class galaxy.jobs.runners.univa.UnivaJobRunner(app, nworkers, **kwargs)[source]

Bases: DRMAAJobRunner

runner_name = 'UnivaJobRunner'
check_watched_item(ajs, new_watched)[source]

Get the job state with job_status/qstat.

Since qstat returns undetermined for finished jobs, DONE is returned here.