Warning
This document is for an in-development version of Galaxy. You can alternatively view this page in the latest release if it exists or view the top of the latest release's documentation.
galaxy.jobs.runners package
Base classes for job runner plugins.
- class galaxy.jobs.runners.RunnerParams(specs=None, params=None)[source]
Bases:
ParamsWithSpecs
- class galaxy.jobs.runners.BaseJobRunner(app: GalaxyManagerApplication, nworkers: int, **kwargs)[source]
Bases:
object
- runner_name = 'BaseJobRunner'
- start_methods = ['_init_monitor_thread', '_init_worker_threads']
- DEFAULT_SPECS = {'recheck_missing_job_retries': {'default': 0, 'map': <class 'int'>, 'valid': <function BaseJobRunner.<lambda>>}}
- __init__(app: GalaxyManagerApplication, nworkers: int, **kwargs)[source]
Start the job runner
- put(job_wrapper: MinimalJobWrapper)[source]
Add a job to the queue (by job identifier), indicate that the job is ready to run.
- url_to_destination(url: str)[source]
Convert a legacy URL to a JobDestination.
Job runner URLs are deprecated, JobDestinations should be used instead. This base class method converts from a URL to a very basic JobDestination without destination params.
- parse_destination_params(params: Dict[str, Any])[source]
Parse the JobDestination
params
dict and return the runner’s native representation of those params.
- prepare_job(job_wrapper: MinimalJobWrapper, include_metadata: bool = False, include_work_dir_outputs: bool = True, modify_command_for_container: bool = True, stream_stdout_stderr: bool = False)[source]
Some sanity checks that all runners’ queue_job() methods are likely to want to do
- build_command_line(job_wrapper: MinimalJobWrapper, include_metadata: bool = False, include_work_dir_outputs: bool = True, modify_command_for_container: bool = True, stream_stdout_stderr=False)[source]
- get_work_dir_outputs(job_wrapper: MinimalJobWrapper, job_working_directory: str | None = None, tool_working_directory: str | None = None)[source]
Returns list of pairs (source_file, destination) describing path to work_dir output file and ultimate destination.
- class galaxy.jobs.runners.JobState(job_wrapper: JobWrapper, job_destination: JobDestination)[source]
Bases:
object
Encapsulate state of jobs.
- runner_states = <galaxy.util.bunch.Bunch object>
- __init__(job_wrapper: JobWrapper, job_destination: JobDestination)[source]
- class galaxy.jobs.runners.AsynchronousJobState(files_dir=None, job_wrapper=None, job_id=None, job_file=None, output_file=None, error_file=None, exit_code_file=None, job_name=None, job_destination=None)[source]
Bases:
JobState
Encapsulate the state of an asynchronous job, this should be subclassed as needed for various job runners to capture additional information needed to communicate with distributed resource manager.
- __init__(files_dir=None, job_wrapper=None, job_id=None, job_file=None, output_file=None, error_file=None, exit_code_file=None, job_name=None, job_destination=None)[source]
- property running
- class galaxy.jobs.runners.AsynchronousJobRunner(app, nworkers, **kwargs)[source]
Bases:
BaseJobRunner
,Monitors
Parent class for any job runner that runs jobs asynchronously (e.g. via a distributed resource manager). Provides general methods for having a thread to monitor the state of asynchronous jobs and submitting those jobs to the correct methods (queue, finish, cleanup) at appropriate times..
- monitor()[source]
Watches jobs currently in the monitor queue and deals with state changes (queued to running) and job completion.
- check_watched_items()[source]
This method is responsible for iterating over self.watched and handling state changes and updating self.watched with a new list of watched job states. Subclasses can opt to override this directly (as older job runners will initially) or just override check_watched_item and allow the list processing to reuse the logic here.
- finish_job(job_state: AsynchronousJobState)[source]
Get the output/error for a finished job, pass to job_wrapper.finish and cleanup all the job’s temporary files.
Subpackages
- galaxy.jobs.runners.state_handlers package
- galaxy.jobs.runners.util package
kill_pid()
- Subpackages
- Submodules
- galaxy.jobs.runners.util.env module
- galaxy.jobs.runners.util.external module
- galaxy.jobs.runners.util.kill module
- galaxy.jobs.runners.util.process_groups module
- galaxy.jobs.runners.util.pykube_util module
ensure_pykube()
find_service_object_by_name()
find_ingress_object_by_name()
find_job_object_by_name()
find_pod_object_by_name()
galaxy_instance_id()
is_pod_running()
is_pod_unschedulable()
job_object_dict()
service_object_dict()
ingress_object_dict()
produce_k8s_job_prefix()
pull_policy()
pykube_client_from_dict()
delete_job()
delete_service()
delete_ingress()
get_volume_mounts_for_job()
parse_pvc_param_line()
- galaxy.jobs.runners.util.sudo module
Submodules
galaxy.jobs.runners.chronos module
- class galaxy.jobs.runners.chronos.ChronosJobRunner(app, nworkers, **kwargs)[source]
Bases:
AsynchronousJobRunner
- runner_name = 'ChronosRunner'
- RUNNER_PARAM_SPEC_KEY = 'runner_param_specs'
- JOB_NAME_PREFIX = 'galaxy-chronos-'
- RUNNER_PARAM_SPEC = {'chronos': {'map': <class 'str'>}, 'insecure': {'default': True, 'map': <function ChronosJobRunner.<lambda>>}, 'owner': {'map': <class 'str'>}, 'password': {'map': <class 'str'>}, 'username': {'map': <class 'str'>}}
- DESTINATION_PARAMS_SPEC = {'docker_cpu': {'default': 0.1, 'map': <class 'float'>, 'map_name': 'cpus'}, 'docker_disk': {'default': 256, 'map': <class 'int'>, 'map_name': 'disk'}, 'docker_memory': {'default': 128, 'map': <class 'int'>, 'map_name': 'mem'}, 'max_retries': {'default': 2, 'map': <class 'int'>, 'map_name': 'retries'}, 'volumes': {'default': None, 'map': <function ChronosJobRunner.<lambda>>, 'map_name': 'container/volumes'}}
- finish_job(job_state)[source]
Get the output/error for a finished job, pass to job_wrapper.finish and cleanup all the job’s temporary files.
galaxy.jobs.runners.cli module
Job control via a command line interface (e.g. qsub/qstat), possibly over a remote connection (e.g. ssh).
- class galaxy.jobs.runners.cli.ShellJobRunner(app, nworkers)[source]
Bases:
AsynchronousJobRunner
Job runner backed by a finite pool of worker threads. FIFO scheduling
- runner_name = 'ShellRunner'
- url_to_destination(url)[source]
Convert a legacy URL to a JobDestination.
Job runner URLs are deprecated, JobDestinations should be used instead. This base class method converts from a URL to a very basic JobDestination without destination params.
- parse_destination_params(params)[source]
Parse the JobDestination
params
dict and return the runner’s native representation of those params.
- submit(shell, job_interface, job_file, galaxy_id_tag, retry=3, timeout=10)[source]
Handles actual job script submission.
If submission fails will retry retry time with a timeout of timeout seconds. Retuns the returncode of the submission and the stdout, which contains the external job_id.
galaxy.jobs.runners.condor module
Job control via the Condor DRM via CLI.
This plugin has been used in production and isn’t unstable but shouldn’t be taken as an example of how to write Galaxy job runners that interface with a DRM using command-line invocations. When writing new job runners that leverage command-line calls for submitting and checking the status of jobs please check out the CLI runner (cli.py in this directory) start by writing a new job plugin in for that (see examples in /galaxy/jobs/runners/util/cli/job). That approach will result in less boilerplate and allow greater reuse of the DRM specific hooks you’ll need to write. Ideally this plugin would have been written to target that framework, but we don’t have the bandwidth to rewrite it at this time.
- class galaxy.jobs.runners.condor.CondorJobRunner(app, nworkers, **kwargs)[source]
Bases:
AsynchronousJobRunner
Job runner backed by a finite pool of worker threads. FIFO scheduling
- runner_name = 'CondorRunner'
galaxy.jobs.runners.drmaa module
Job control via the DRMAA API.
- class galaxy.jobs.runners.drmaa.DRMAAJobRunner(app, nworkers, **kwargs)[source]
Bases:
AsynchronousJobRunner
Job runner backed by a finite pool of worker threads. FIFO scheduling
- runner_name = 'DRMAARunner'
- restrict_job_name_length = 15
- check_watched_item(ajs, new_watched)[source]
look at a single watched job, determine its state, and deal with errors that could happen in this process. to be called from check_watched_items() returns the state or None if exceptions occurred in the latter case the job is appended to new_watched if a
1 drmaa.InternalException, 2 drmaa.InvalidJobExceptionnot, or 3 drmaa.DrmCommunicationException occurred
(which causes the job to be tested again in the next iteration of check_watched_items)
the job is finished as errored if any other exception occurs
the job is finished OK or errored after the maximum number of retries depending on the exception
Note that None is returned in all cases where the loop in check_watched_items is to be continued
- check_watched_items()[source]
Called by the monitor thread to look at each watched job and deal with state changes.
- recover(job, job_wrapper)[source]
Recovers jobs stuck in the queued/running state when Galaxy started
galaxy.jobs.runners.godocker module
- class galaxy.jobs.runners.godocker.GodockerJobRunner(app, nworkers, **kwargs)[source]
Bases:
AsynchronousJobRunner
Job runner backed by a finite pool of worker threads. FIFO scheduling
- runner_name = 'GodockerJobRunner'
- __init__(app, nworkers, **kwargs)[source]
1: Get runner_param_specs from the job config 2: Initialise job runner parent object 3: Login to godocker and store the token 4: Start the worker and monitor threads
- check_watched_item(job_state)[source]
- Get the job current status from GoDocker
using job_id and update the status in galaxy.
- If the job execution is successful, call
mark_as_finished() and return ‘None’ to galaxy.
- else if the job failed, call mark_as_failed()
and return ‘None’ to galaxy.
- else if the job is running or in pending state, simply
return the ‘AsynchronousJobState object’ (job_state).
- recover(job, job_wrapper)[source]
Recovers jobs stuck in the queued/running state when Galaxy started
- create_log_file(job_state, job_status_god)[source]
Create log files in galaxy, namely error_file, output_file, exit_code_file Return true, if all the file creations are successful
- login(apikey, login, server, noCert=False)[source]
Login to GoDocker and return the token Create Login model schema of GoDocker and call the http_post_request method.
- post_task(job_wrapper)[source]
Sumbit job to GoDocker and return jobid Create Job model schema of GoDocker and call the http_post_request method.
- get_task(job_id)[source]
Get job details from GoDocker and return the job. Pass job_id to the http_get_request method.
- task_suspend(job_id)[source]
Suspend actively running job in galaxy. Pass job_id to the http_get_request method.
galaxy.jobs.runners.kubernetes module
Offload jobs to a Kubernetes cluster.
- class galaxy.jobs.runners.kubernetes.KubernetesJobRunner(app, nworkers, **kwargs)[source]
Bases:
AsynchronousJobRunner
Job runner backed by a finite pool of worker threads. FIFO scheduling
- runner_name = 'KubernetesRunner'
- LABEL_START = re.compile('^[A-Za-z0-9]')
- LABEL_END = re.compile('[A-Za-z0-9]$')
- LABEL_REGEX = re.compile('[^-A-Za-z0-9_.]')
- check_watched_item(job_state)[source]
Checks the state of a job already submitted on k8s. Job state is an AsynchronousJobState
galaxy.jobs.runners.local module
Job runner plugin for executing jobs on the local system via the command line.
- class galaxy.jobs.runners.local.LocalJobRunner(app, nworkers)[source]
Bases:
BaseJobRunner
Job runner backed by a finite pool of worker threads. FIFO scheduling
- runner_name = 'LocalRunner'
galaxy.jobs.runners.pbs module
- class galaxy.jobs.runners.pbs.PBSJobRunner(app, nworkers)[source]
Bases:
AsynchronousJobRunner
Job runner backed by a finite pool of worker threads. FIFO scheduling
- runner_name = 'PBSRunner'
- property default_pbs_server
- parse_destination_params(params)[source]
A wrapper method around __args_to_attrs() that allow administrators to define PBS params as either command-line options (as in
qsub(1B)
) or more human-readable “long” args (as inpbs_submit(3B)
).- Returns:
list of dicts – The dicts map directly to pbs attropl structs (see
pbs_submit(3B)
)
- check_watched_items()[source]
Called by the monitor thread to look at each watched job and deal with state changes.
- check_all_jobs()[source]
Returns a list of servers that failed to be contacted and a dict of “job_id : status” pairs (where status is a bunchified version of the API’s structure.
- check_single_job(pbs_server_name, job_id)[source]
Returns the state of a single job, used to make sure a job is really dead.
galaxy.jobs.runners.pulsar module
Job runner used to execute Galaxy jobs through Pulsar.
More information on Pulsar can be found at https://pulsar.readthedocs.io/ .
- class galaxy.jobs.runners.pulsar.PulsarLegacyJobRunner(app, nworkers, **kwds)[source]
Bases:
PulsarJobRunner
Flavor of Pulsar job runner mimicking behavior of old LWR runner.
- destination_defaults = {'dependency_resolution': 'local', 'rewrite_parameters': 'false'}
- class galaxy.jobs.runners.pulsar.PulsarRESTJobRunner(app, nworkers, **kwds)[source]
Bases:
PulsarJobRunner
Flavor of Pulsar job runner with sensible defaults for RESTful usage.
- destination_defaults = {'default_file_action': 'transfer', 'dependency_resolution': 'remote', 'rewrite_parameters': 'true', 'url': <object object>}
- class galaxy.jobs.runners.pulsar.PulsarMQJobRunner(app, nworkers, **kwds)[source]
Bases:
PulsarJobRunner
Flavor of Pulsar job runner with sensible defaults for message queue communication.
- use_mq = True
- poll = False
- destination_defaults = {'default_file_action': 'remote_transfer', 'dependency_resolution': 'remote', 'jobs_directory': <object object>, 'private_token': <object object>, 'rewrite_parameters': 'true', 'url': <object object>}
- class galaxy.jobs.runners.pulsar.PulsarEmbeddedJobRunner(app, nworkers, **kwds)[source]
Bases:
PulsarJobRunner
Flavor of Puslar job runner that runs Pulsar’s server code directly within Galaxy.
This is an appropriate job runner for when the desire is to use Pulsar staging but their is not need to run a remote service.
- destination_defaults = {'default_file_action': 'copy', 'dependency_resolution': 'remote', 'rewrite_parameters': 'true'}
- default_build_pulsar_app = True
- class galaxy.jobs.runners.pulsar.PulsarEmbeddedMQJobRunner(app, nworkers, **kwds)[source]
Bases:
PulsarMQJobRunner
- default_build_pulsar_app = True
galaxy.jobs.runners.slurm module
SLURM job control via the DRMAA API.
- class galaxy.jobs.runners.slurm.SlurmJobRunner(app, nworkers, **kwargs)[source]
Bases:
DRMAAJobRunner
- runner_name = 'SlurmRunner'
- restrict_job_name_length = False
galaxy.jobs.runners.state_handler_factory module
galaxy.jobs.runners.tasks module
- class galaxy.jobs.runners.tasks.TaskedJobRunner(app, nworkers)[source]
Bases:
BaseJobRunner
Job runner backed by a finite pool of worker threads. FIFO scheduling
- runner_name = 'TaskRunner'
galaxy.jobs.runners.univa module
Job control via the DRMAA API / qstat and qacct.
Known to work on the UNIVA grid engine.
known bugs/problems:
if a job runs longer than the time limits of the queue two things happen
at the soft limit (s_rt) SIGUSR1 is sent to the job
at the hard limit (h_rt) SIGKILL is sent to the job
The second case is covered by the runner – it’s the same mechanism that kills a job when the job time limit is reached. The first case is currently not covered. The good thing is that most programs ignore SIGUSR1. For the second case it seems that jobs are marked as failed (killed by the DRM) at random.
Possible solutions:
configure the job destinations such that the queue limits are never reached. the time limits can be determined with for i in `qconf -sql; do qconf -sq $i | grep -E ‘(h|s)_rt’; done`
extend the job runner to cover s_rt cases (some ideas):
I’m unsure if the programs reaction to SIGUSR1 can be determined easily because it depends on the program. It seems that exit code is in most cases 10.
The sheduler logs contains quite useful information.