galaxy.jobs package

Support for running a tool in Galaxy via an internal job management system

class galaxy.jobs.ComputeEnvironment[source]

Bases: object

Definition of the job as it will be run on the (potentially) remote compute server.

config_directory()[source]

Directory containing config files (potentially remote)

input_paths()[source]

Input DatasetPaths defined by job.

new_file_path()[source]

Absolute path to dump new files for this job on compute server.

output_paths()[source]

Output DatasetPaths defined by job.

sep()[source]

os.path.sep for the platform this job will execute in.

tool_directory()[source]

Absolute path to tool files for this job on compute server.

unstructured_path_rewriter()[source]

Return a function that takes a value and determines whether it is a path to be rewritten. The function will be passed non-path values as well; the onus is on it to determine both whether its input is a path and whether that path should be rewritten.
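
As a sketch of that contract, such a rewriter might look like the following. The prefix-swapping scheme and the `make_path_rewriter` name are illustrative, not Galaxy's actual implementation:

```python
def make_path_rewriter(local_prefix, remote_prefix):
    """Build a rewriter that maps paths under local_prefix to remote_prefix."""
    def rewrite(value):
        # Non-string values cannot be paths; pass them through unchanged.
        if not isinstance(value, str):
            return value
        # Only rewrite values that look like paths under the local prefix.
        if value.startswith(local_prefix):
            return remote_prefix + value[len(local_prefix):]
        return value
    return rewrite
```

The returned function can then safely be called on every value the tool evaluator encounters, path or not.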

version_path()[source]

Location of the version file for the underlying tool.

working_directory()[source]

Job working directory (potentially remote)

class galaxy.jobs.HasResourceParameters[source]
get_resource_parameters(job=None)[source]
class galaxy.jobs.JobConfiguration(app)[source]

Bases: object

A parser and interface to advanced job management features.

These features are configured in the job configuration file, job_conf.xml by default.

DEFAULT_NWORKERS = 4
JOB_RESOURCE_CONDITIONAL_XML = '<conditional name="__job_resource">\n <param name="__job_resource__select" type="select" label="Job Resource Parameters">\n <option value="no">Use default job resource parameters</option>\n <option value="yes">Specify job resource parameters</option>\n </param>\n <when value="no"/>\n <when value="yes"/>\n </conditional>'
__init__(app)[source]

Parse the job configuration XML.

convert_legacy_destinations(job_runners)[source]

Converts legacy (from a URL) destinations to contain the appropriate runner params defined in the URL.

Parameters:job_runners (list of job runner plugins) – All loaded job runner plugins.
default_job_tool_configuration

The default JobToolConfiguration, used if a tool does not have an explicit definition in the configuration. It consists of a reference to the default handler and default destination.

Returns:JobToolConfiguration – a representation of a <tool> element that uses the default handler and destination
get_destination(id_or_tag)[source]

Given a destination ID or tag, return the JobDestination matching the provided ID or tag

Parameters:id_or_tag (str) – A destination ID or tag.
Returns:JobDestination – A valid destination

Destinations are deepcopied because they are expected to be passed to job runners, which may modify them in order to persist params set at runtime.

get_destinations(id_or_tag)[source]

Given a destination ID or tag, return all JobDestinations matching the provided ID or tag

Parameters:id_or_tag (str) – A destination ID or tag.
Returns:list or tuple of JobDestinations

Destinations are not deepcopied, so they should not be passed to anything which might modify them.

get_handler(id_or_tag)[source]

Given a handler ID or tag, return the provided ID or an ID matching the provided tag

Parameters:id_or_tag (str) – A handler ID or tag.
Returns:str – A valid job handler ID.
get_job_runner_plugins(handler_id)[source]

Load all configured job runner plugins

Returns:list of job runner plugins
get_job_tool_configurations(ids)[source]

Get all configured JobToolConfigurations for a tool ID, or, if given a list of IDs, the JobToolConfigurations for the first id in ids matching a tool definition.

Note

You should not mix tool shed tool IDs, versionless tool shed IDs, and tool config tool IDs that refer to the same tool.

Parameters:ids (list or str.) – Tool ID or IDs to fetch the JobToolConfiguration of.
Returns:list – JobToolConfiguration Bunches representing <tool> elements matching the specified ID(s).

Example tool ID strings include:

  • Full tool shed id: toolshed.example.org/repos/nate/filter_tool_repo/filter_tool/1.0.0
  • Tool shed id less version: toolshed.example.org/repos/nate/filter_tool_repo/filter_tool
  • Tool config tool id: filter_tool
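
A minimal sketch of the first-match lookup described above; the `_tool_configs` dict and `lookup` name are illustrative, not Galaxy's internals:

```python
# Hypothetical mapping from tool IDs to their configured <tool> elements.
_tool_configs = {
    "toolshed.example.org/repos/nate/filter_tool_repo/filter_tool/1.0.0": ["versioned tool config"],
    "filter_tool": ["tool config by short id"],
}

def lookup(ids):
    """Return configurations for the first ID in ids matching a definition."""
    if isinstance(ids, str):
        ids = [ids]
    for tool_id in ids:
        if tool_id in _tool_configs:
            return _tool_configs[tool_id]
    return []
```
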
get_tool_resource_xml(tool_id, tool_type)[source]

Given a tool id, return XML elements describing parameters to insert into job resources.

Tool id:A tool ID (a string)
Tool type:A tool type (a string)
Returns:List of parameter elements.
is_handler(server_name)[source]

Given a server name, indicate whether the server is a job handler

Parameters:server_name (str) – The name to check
Returns:bool
is_id(collection)[source]

Given a collection of handlers or destinations, indicate whether the collection represents a tag or a real ID

Parameters:collection (tuple or list) – A representation of a destination or handler
Returns:bool
is_tag(collection)[source]

Given a collection of handlers or destinations, indicate whether the collection represents a tag or a real ID

Parameters:collection (tuple or list) – A representation of a destination or handler
Returns:bool
class galaxy.jobs.JobDestination(**kwds)[source]

Bases: galaxy.util.bunch.Bunch

Provides details about where a job runs

__init__(**kwds)[source]
class galaxy.jobs.JobToolConfiguration(**kwds)[source]

Bases: galaxy.util.bunch.Bunch

Provides details on what handler and destination a tool should use

A JobToolConfiguration will have the required attribute ‘id’ and optional attributes ‘handler’, ‘destination’, and ‘params’

__init__(**kwds)[source]
get_resource_group()[source]
class galaxy.jobs.JobWrapper(job, queue, use_persisted_destination=False)[source]

Bases: object, galaxy.jobs.HasResourceParameters

Wraps a ‘model.Job’ with convenience methods for running processes and state management.

__init__(job, queue, use_persisted_destination=False)[source]
can_split()[source]
change_ownership_for_run()[source]
change_state(state, info=False, flush=True, job=None)[source]
check_limits(runtime=None)[source]
check_tool_output(stdout, stderr, tool_exit_code, job)[source]
cleanup(delete_files=True)[source]
cleanup_job

Remove the job after it is complete; should return “always”, “onsuccess”, or “never”.

clear_working_directory()[source]
commands_in_new_shell
compute_outputs()[source]
default_compute_environment(job=None)[source]
disable_commands_in_new_shell()[source]

Provide an extension point to disable this isolation. Pulsar builds its own job script, so this is not needed for remote jobs.

fail(message, exception=False, stdout='', stderr='', exit_code=None)[source]

Indicate job failure by setting state and message on all output datasets.

finish(stdout, stderr, tool_exit_code=None, remote_working_directory=None, remote_metadata_directory=None)[source]

Called to indicate that the associated command has been run. Updates the output datasets based on stderr and stdout from the command, and the contents of the output files.

galaxy_lib_dir
galaxy_system_pwent
galaxy_virtual_env
get_command_line()[source]
get_dataset_finish_context(job_context, dataset)[source]
get_destination_configuration(key, default=None)[source]

Get a destination parameter that can be defaulted back in app.config if it needs to be applied globally.

get_env_setup_clause()[source]
get_id_tag()[source]
get_input_dataset_fnames(ds)[source]
get_input_fnames()[source]
get_input_paths(job=None)[source]
get_job()[source]
get_job_runner()
get_job_runner_url()[source]
get_mutable_output_fnames()[source]
get_output_destination(output_path)[source]

Destination for outputs marked as from_work_dir. This is the normal case: just copy these files directly to the ultimate destination.

get_output_file_id(file)[source]
get_output_fnames()[source]
get_output_hdas_and_fnames()[source]
get_output_sizes()[source]
get_parallelism()[source]
get_param_dict()[source]

Restore the dictionary of parameters from the database.

get_session_id()[source]
get_state()[source]
get_tool_provided_job_metadata()[source]
get_version_string_path()[source]
has_limits()[source]
invalidate_external_metadata()[source]
is_ready_for_resubmission(job=None)[source]
job_destination

Return the JobDestination that this job will use to run. This will either be a configured destination, a randomly selected destination if the configured destination was a tag, or a dynamically generated destination from the dynamic runner.

Calling this method for the first time causes the dynamic runner to do its calculation, if any.

Returns:JobDestination
mark_as_resubmitted(info=None)[source]
pause(job=None, message=None)[source]
prepare(compute_environment=None)[source]

Prepare the job to run by creating the working directory and the config files.

reclaim_ownership()[source]
requires_containerization
requires_setting_metadata
set_job_destination(job_destination, external_id=None, flush=True, job=None)[source]

Persist job destination params in the database for recovery.

self.job_destination is not used because a runner may choose to rewrite parts of the destination (e.g. the params).

set_runner(runner_url, external_id)[source]
setup_external_metadata(exec_dir=None, tmp_dir=None, dataset_files_path=None, config_root=None, config_file=None, datatypes_config=None, resolve_metadata_dependencies=False, set_extension=True, **kwds)[source]
shell
strict_shell
user
user_system_pwent
class galaxy.jobs.NoopQueue[source]

Bases: object

Implements the JobQueue / JobStopQueue interface but does nothing

put(*args, **kwargs)[source]
put_stop(*args)[source]
shutdown()[source]
class galaxy.jobs.ParallelismInfo(tag)[source]

Bases: object

Stores the information (if any) for running multiple instances of the tool in parallel on the same set of inputs.

__init__(tag)[source]
class galaxy.jobs.SharedComputeEnvironment(job_wrapper, job)[source]

Bases: galaxy.jobs.SimpleComputeEnvironment

Default ComputeEnvironment for job and task wrappers to pass to ToolEvaluator - valid when Galaxy and the compute resource share all the relevant file systems.

__init__(job_wrapper, job)[source]
input_paths()[source]
new_file_path()[source]
output_paths()[source]
tool_directory()[source]
version_path()[source]
working_directory()[source]
class galaxy.jobs.SimpleComputeEnvironment[source]

Bases: object

config_directory()[source]
sep()[source]
unstructured_path_rewriter()[source]
class galaxy.jobs.TaskWrapper(task, queue)[source]

Bases: galaxy.jobs.JobWrapper

Extension of JobWrapper intended for running tasks. Should be refactored into a generalized executable-unit wrapper parent, with jobs and tasks as subclasses.

__init__(task, queue)[source]
can_split()[source]
change_state(state, info=False, flush=True, job=None)[source]
cleanup(delete_files=True)[source]
fail(message, exception=False)[source]
finish(stdout, stderr, tool_exit_code=None)[source]

Called to indicate that the associated command has been run. Updates the output datasets based on stderr and stdout from the command, and the contents of the output files.

get_command_line()[source]
get_dataset_finish_context(job_context, dataset)[source]
get_exit_code()[source]
get_id_tag()[source]
get_job()[source]
get_output_destination(output_path)[source]

Destination for outputs marked as from_work_dir. These must be copied with the same basename as the path for the ultimate output destination. This is required in the task case so that they can be merged.

get_output_file_id(file)[source]
get_param_dict()[source]

Restore the dictionary of parameters from the database.

get_session_id()[source]
get_state()[source]
get_task()[source]
get_tool_provided_job_metadata()[source]
prepare(compute_environment=None)[source]

Prepare the job to run by creating the working directory and the config files.

set_runner(runner_url, external_id)[source]
setup_external_metadata(exec_dir=None, tmp_dir=None, dataset_files_path=None, config_root=None, config_file=None, datatypes_config=None, set_extension=True, **kwds)[source]
galaxy.jobs.config_exception(e, file)[source]

Subpackages

Submodules

galaxy.jobs.command_factory module

galaxy.jobs.command_factory.build_command(runner, job_wrapper, container=None, modify_command_for_container=True, include_metadata=False, include_work_dir_outputs=True, create_tool_working_directory=True, remote_command_params={}, metadata_directory=None)[source]

Compose the sequence of commands necessary to execute a job. This will currently include:

  • environment settings corresponding to any requirement tags
  • preparing input files
  • command line taken from job wrapper
  • commands to set metadata (if include_metadata is True)
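
The composition order above can be sketched as follows. This is a simplified stand-in for build_command; the parameter names and the use of "; " as a joiner are assumptions, not the actual implementation:

```python
def compose_command(env_setup, prepare_inputs, tool_command, metadata_command=None):
    """Join job script parts in the order build_command composes them."""
    parts = list(env_setup)          # environment settings for requirement tags
    parts.extend(prepare_inputs)     # input file preparation
    parts.append(tool_command)       # command line taken from the job wrapper
    if metadata_command:             # only when include_metadata is True
        parts.append(metadata_command)
    return "; ".join(parts)
```
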

galaxy.jobs.datasets module

Utility classes allowing Job interface to reason about datasets.

class galaxy.jobs.datasets.DatasetPath(dataset_id, real_path, false_path=None, false_extra_files_path=None, mutable=True)[source]

Bases: object

__init__(dataset_id, real_path, false_path=None, false_extra_files_path=None, mutable=True)[source]
with_path_for_job(false_path, false_extra_files_path=None)[source]

Clone the dataset path but with a new false_path.

class galaxy.jobs.datasets.DatasetPathRewriter[source]

Bases: object

Used by runner to rewrite paths.

rewrite_dataset_path(dataset, dataset_type)[source]

Dataset type is ‘input’ or ‘output’. Return None to indicate not to rewrite this path.
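
A hypothetical rewriter following this contract; the class name, prefix scheme, and the output-only policy are illustrative:

```python
class PrefixDatasetPathRewriter:
    """Rewrite output dataset paths under a fixed prefix; leave inputs alone."""

    def __init__(self, prefix):
        self.prefix = prefix

    def rewrite_dataset_path(self, dataset, dataset_type):
        # dataset_type is 'input' or 'output'; returning None means
        # the dataset's original path is kept.
        if dataset_type == "output":
            return "%s/dataset_%s.dat" % (self.prefix, dataset.id)
        return None
```
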

class galaxy.jobs.datasets.NullDatasetPathRewriter[source]

Bases: object

Used by default for jobwrapper, do not rewrite anything.

rewrite_dataset_path(dataset, dataset_type)[source]

Keep path the same.

class galaxy.jobs.datasets.OutputsToWorkingDirectoryPathRewriter(working_directory)[source]

Bases: object

Rewrites all paths to place them in the specified working directory for normal jobs when Galaxy is configured with app.config.outputs_to_working_directory. Job runner base class is responsible for copying these out after job is complete.

__init__(working_directory)[source]
rewrite_dataset_path(dataset, dataset_type)[source]

Keep path the same.

class galaxy.jobs.datasets.TaskPathRewriter(working_directory, job_dataset_path_rewriter)[source]

Bases: object

Rewrites all paths to place them in the specified working directory for TaskWrapper. TaskWrapper is responsible for putting them there and pulling them out.

__init__(working_directory, job_dataset_path_rewriter)[source]
rewrite_dataset_path(dataset, dataset_type)[source]
galaxy.jobs.datasets.dataset_path_rewrites(dataset_paths)[source]

galaxy.jobs.error_level module

class galaxy.jobs.error_level.StdioErrorLevel[source]

Bases: object

FATAL = 3
LOG = 1
MAX = 3
NO_ERROR = 0
WARNING = 2
static desc(error_level)[source]
descs = {0: 'No error', 1: 'Log', 2: 'Warning', 3: 'Fatal error'}

galaxy.jobs.handler module

Galaxy job handler, prepares, runs, tracks, and finishes Galaxy jobs

class galaxy.jobs.handler.DefaultJobDispatcher(app)[source]

Bases: object

__init__(app)[source]
put(job_wrapper)[source]
recover(job, job_wrapper)[source]
shutdown()[source]
stop(job)[source]

Stop the given job. The input variable job may be either a Job or a Task.

url_to_destination(url)[source]

This is used by the runner mapper (a.k.a. dynamic runner) and recovery methods to have runners convert URLs to destinations.

New-style runner plugin IDs must match the URL’s scheme for this to work.

class galaxy.jobs.handler.JobHandler(app)[source]

Bases: object

Handle the preparation, running, tracking, and finishing of jobs

__init__(app)[source]
shutdown()[source]
start()[source]
class galaxy.jobs.handler.JobHandlerQueue(app, dispatcher)[source]

Bases: object

The job handler’s internal queue; this is what actually implements waiting for jobs to become runnable and dispatching them to a JobRunner.

STOP_SIGNAL = <object object>
__init__(app, dispatcher)[source]

Initializes the Job Handler Queue and creates the (unstarted) monitoring thread.

get_total_job_count_per_destination()[source]
get_user_job_count(user_id)[source]
get_user_job_count_per_destination(user_id)[source]
increase_running_job_count(user_id, destination_id)[source]
job_pair_for_id(id)[source]
job_wrapper(job, use_persisted_destination=False)[source]
put(job_id, tool_id)[source]

Add a job to the queue (by job identifier)

shutdown()[source]

Attempts to gracefully shut down the worker thread

start()[source]

Starts the JobHandler’s thread after checking for any unhandled jobs.

class galaxy.jobs.handler.JobHandlerStopQueue(app, dispatcher)[source]

Bases: object

A queue for jobs which need to be terminated prematurely.

STOP_SIGNAL = <object object>
__init__(app, dispatcher)[source]
monitor()[source]

Continually iterate over the waiting jobs, stopping any that are found.

monitor_step()[source]

Called repeatedly by monitor to stop jobs.

put(job_id, error_msg=None)[source]
shutdown()[source]

Attempts to gracefully shut down the worker thread

galaxy.jobs.manager module

Top-level Galaxy job manager, moves jobs to handler(s)

class galaxy.jobs.manager.JobManager(app)[source]

Bases: object

Highest level interface to job management.

TODO: Currently the app accesses “job_queue” and “job_stop_queue” directly.
This should be decoupled.
__init__(app)[source]
shutdown()[source]
start()[source]
class galaxy.jobs.manager.NoopHandler(*args, **kwargs)[source]

Bases: object

__init__(*args, **kwargs)[source]
shutdown(*args)[source]
start()[source]

galaxy.jobs.mapper module

exception galaxy.jobs.mapper.JobMappingException(failure_message)[source]

Bases: exceptions.Exception

__init__(failure_message)[source]
exception galaxy.jobs.mapper.JobNotReadyException(job_state=None, message=None)[source]

Bases: exceptions.Exception

__init__(job_state=None, message=None)[source]
class galaxy.jobs.mapper.JobRunnerMapper(job_wrapper, url_to_destination, job_config)[source]

Bases: object

This class is responsible for managing the mapping of jobs (in the form of job_wrappers) to job runner URL strings.

__init__(job_wrapper, url_to_destination, job_config)[source]
cache_job_destination(raw_job_destination)[source]
get_job_destination(params)[source]

Cache the job_destination to avoid recalculation.

galaxy.jobs.output_checker module

galaxy.jobs.output_checker.check_output(tool, stdout, stderr, tool_exit_code, job)[source]

Check the output of a tool - given the stdout, stderr, and the tool’s exit code, return True if the tool exited successfully and False otherwise. No exceptions should be thrown. If this code encounters an exception, it returns True so that the workflow can continue; otherwise, a bug in this code could halt workflow progress.

Note that, if the tool did not define any exit code handling or any stdio/stderr handling, then it reverts to the previous behavior: if stderr contains anything, False is returned.
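
That legacy fallback can be sketched as follows; this is an illustrative stand-in, not the actual check_output code:

```python
def legacy_check_output(stdout, stderr):
    """Legacy behavior: any content on stderr marks the job as failed."""
    return not stderr
```
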

Note that the job id is just for messages.

galaxy.jobs.rule_helper module

class galaxy.jobs.rule_helper.RuleHelper(app)[source]

Bases: object

Utility to allow job rules to interface cleanly with the rest of Galaxy and shield them from low-level details of models, metrics, etc.

Currently focus is on figuring out job statistics for a given user, but could interface with other stuff as well.

__init__(app)[source]
choose_one(lst, hash_value=None)[source]

Choose a random value from the supplied list. If hash_value is passed in, then every request with that same hash_value will produce the same choice from the supplied list.

job_count(**kwds)[source]
job_hash(job, hash_by=None)[source]

Produce a reproducible hash for the given job based on various criteria. For instance, if hash_by is “workflow_invocation,history”, all jobs within the same workflow invocation will receive the same hash; for jobs outside of workflows, all jobs within the same history will receive the same hash; other jobs will be hashed on the job’s ID.

Primarily intended for use with choose_one above, to consistently route or schedule related jobs.
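
Combined with choose_one, this yields consistent routing: jobs sharing a hash value always map to the same destination. A sketch of the idea follows; the SHA-256 scheme here is an assumption for illustration, not Galaxy's actual hashing:

```python
import hashlib
import random

def choose_one(lst, hash_value=None):
    """Pick from lst: random without a hash_value, deterministic with one."""
    if hash_value is None:
        return random.choice(lst)
    # The same hash_value always selects the same index.
    digest = hashlib.sha256(str(hash_value).encode()).hexdigest()
    return lst[int(digest, 16) % len(lst)]
```

For example, routing every job in a history with `choose_one(destinations, hash_value="history_%s" % history_id)` sends related jobs to the same destination.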

metric_query(select, metric_name, plugin, numeric=True)[source]
query(select_expression)[source]
should_burst(destination_ids, num_jobs, job_states=None)[source]

Check whether the specified destinations destination_ids have at least num_jobs assigned to them; pass job_states as “queued” to limit this check to the number of jobs queued.

See stock_rules for a simple example of using this function. To get the most out of it, it should probably be used with custom job rules that can respond to bursting by allocating resources, launching cloud nodes, etc.
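
A hypothetical custom rule built on should_burst; the destination IDs and the threshold of 50 are made-up values:

```python
def burst_rule(rule_helper, job):
    """Route to a cloud destination once the local queue is saturated."""
    # If the (hypothetical) local_cluster destination already has 50 or
    # more queued jobs, burst this job to the (hypothetical) cloud.
    if rule_helper.should_burst(["local_cluster"], num_jobs=50, job_states="queued"):
        return "cloud_destination"
    return "local_cluster"
```
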

sum_job_runtime(**kwds)[source]
supports_docker(job_or_tool)[source]

Job rules can pass this function a job, job_wrapper, or tool to determine whether the underlying tool believes it can be run in a container.

galaxy.jobs.stock_rules module

Stock job ‘dynamic’ rules for use in job_conf.xml. These may cover some simple use cases, but they just proxy into functions in rule_helper, so similar - but more tailored and composable - functionality can be utilized in custom rules.

galaxy.jobs.stock_rules.burst(rule_helper, job, from_destination_ids, to_destination_id, num_jobs, job_states=None)[source]
galaxy.jobs.stock_rules.choose_one(rule_helper, job, destination_ids, hash_by='job')[source]
galaxy.jobs.stock_rules.docker_dispatch(rule_helper, tool, docker_destination_id, default_destination_id)[source]

galaxy.jobs.transfer_manager module

Manage transfers from arbitrary URLs to temporary files. Socket interface for IPC with multiple process configurations.

class galaxy.jobs.transfer_manager.TransferManager(app)[source]

Bases: object

Manage simple data transfers from URLs to temporary locations.

__init__(app)[source]
get_state(transfer_jobs, via_socket=False)[source]
new(path=None, **kwd)[source]
run(transfer_jobs)[source]

This method blocks, so if invoking the transfer manager ever starts taking too long, we should move it to a thread. However, the transfer_manager will either daemonize or return after submitting to a running daemon, so it should be fairly quick to return.

shutdown()[source]