galaxy.workflow package

Subpackages

Submodules

galaxy.workflow.extract module

This module contains functionality to aid in extracting workflows from histories.

galaxy.workflow.extract.summarize(trans, history=None)[source]

Return a mapping of job descriptions to datasets for active items in the supplied history; needed for building a workflow from a history.

Formerly called get_job_dict in the workflow web controller.

galaxy.workflow.extract.extract_workflow(trans, user, history=None, job_ids=None, dataset_ids=None, dataset_collection_ids=None, workflow_name=None, dataset_names=None, dataset_collection_names=None)[source]

galaxy.workflow.modules module

Modules used in building workflows

class galaxy.workflow.modules.NoReplacement[source]

Bases: object

class galaxy.workflow.modules.ConditionalStepWhen(tool, input_source)[source]

Bases: BooleanToolParameter

name: str
galaxy.workflow.modules.to_cwl(value, hda_references, step)[source]
galaxy.workflow.modules.from_cwl(value, hda_references, progress: WorkflowProgress)[source]
galaxy.workflow.modules.evaluate_value_from_expressions(progress, step, execution_state, extra_step_state)[source]
class galaxy.workflow.modules.WorkflowModule(trans, content_id=None, **kwds)[source]

Bases: object

label: str
type: str
name: str
__init__(trans, content_id=None, **kwds)[source]
classmethod from_dict(trans, d, **kwds)[source]
classmethod from_workflow_step(trans, step, **kwds)[source]
save_to_step(step, detached: bool = False) → None[source]
get_type()[source]
get_name()[source]
get_version()[source]
get_content_id()[source]

If this component has an identifier external to the step (such as a tool or another workflow) return the identifier for that content.

get_tooltip(static_path='')[source]
get_state(nested=True)[source]

Return a serializable representation of the persistable state of the step.

get_export_state()[source]
get_tool_state()[source]
recover_state(state, **kwds)[source]

Recover tool state (as self.state and self.state.inputs) from dictionary describing configuration state (potentially from persisted step state).

Sub-classes should supply a default_state method which contains the initial state dict with key, value pairs for all available attributes.

Input parameter state may be a typed dictionary or a tool state dictionary generated by the web client (if from_tool_form in kwds is True).

If a class needs to distinguish between a clean, typed dictionary representation of state and the state generated for the tool form (with extra nesting logic for conditionals, untyped string parameters, etc.), it should override the step_state_to_tool_state method to perform this transformation.
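
The relationship between recover_state, default_state and step_state_to_tool_state can be sketched with a stand-in class. This is a minimal illustration, not Galaxy's implementation: SketchModule, its state keys, and the choice of JSON strings as the "form" encoding are all assumptions made for the example.

```python
import json
from typing import Any, Dict


class SketchModule:
    """Stand-in for a WorkflowModule subclass; not Galaxy's implementation."""

    def default_state(self) -> Dict[str, Any]:
        # Initial state with a key/value pair for every available attribute.
        return {"optional": False, "tag": ""}

    def step_state_to_tool_state(self, state: Dict[str, Any]) -> Dict[str, str]:
        # Hypothetical transformation: the form-style state holds each value
        # as an untyped JSON string.
        return {key: json.dumps(value) for key, value in state.items()}

    def recover_state(self, state: Dict[str, Any], **kwds: Any) -> None:
        if not kwds.get("from_tool_form", False):
            # Typed state (e.g. persisted step state) is converted first.
            state = self.step_state_to_tool_state(state)
        recovered = self.default_state()
        recovered.update({key: json.loads(value) for key, value in state.items()})
        self.state = recovered


module = SketchModule()
module.recover_state({"optional": True})  # typed dictionary
typed_result = dict(module.state)
module.recover_state({"tag": '"group:x"'}, from_tool_form=True)  # form-style strings
form_result = dict(module.state)
```

Both calls end in the same typed representation; only the entry path differs, which is the distinction the from_tool_form flag draws.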

step_state_to_tool_state(state)[source]
get_errors(**kwargs)[source]

This returns a step-related error message as a string, or None.

get_inputs()[source]

This returns inputs displayed in the workflow editor

get_all_inputs(data_only=False, connectable_only=False)[source]
get_data_inputs()[source]

Get configure time data input descriptions.

get_all_outputs(data_only=False)[source]
get_data_outputs()[source]
get_post_job_actions(incoming)[source]
check_and_update_state()[source]

If the state is not in sync with the current implementation of the module, try to update it. Returns a list of messages to be displayed.

add_dummy_datasets(connections=None, steps=None)[source]

Replace connected inputs with placeholder/dummy values.

get_config_form(step=None)[source]

Serializes input parameters of a module into input dictionaries.

get_runtime_state() → DefaultToolState[source]
get_runtime_inputs(step, connections: Iterable[WorkflowStepConnection] | None = None)[source]

Used internally by modules and when displaying inputs in workflow editor and run workflow templates.

compute_runtime_state(trans, step=None, step_updates=None)[source]

Determine the runtime state (potentially different from self.state which describes configuration state). This (again unlike self.state) is currently always a DefaultToolState object.

If step is not None, it will be used to search for default values defined in workflow input steps.

If step_updates is None (likely when rendering the run form, for instance), no runtime properties are available and the state must be determined solely by the default runtime state described by the step.

If step_updates is available, it describes the runtime properties supplied by the workflow runner.

encode_runtime_state(step, runtime_state)[source]

Takes the computed runtime state and serializes it during run request creation.

decode_runtime_state(step, runtime_state)[source]

Takes the serialized runtime state and decodes it when running the workflow.

execute(trans, progress: WorkflowProgress, invocation_step, use_cached_job: bool = False) → bool | None[source]

Execute the given workflow invocation step.

Use the supplied workflow progress object to track outputs, find inputs, etc.

Return False if additional processing is required on subsequent workflow scheduling runs; None or True means the workflow step executed properly.
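
Because None counts as success, a caller has to test the return value for identity with False rather than for truthiness. The following is a minimal sketch of that convention; steps_needing_another_run and the zero-argument callables are hypothetical stand-ins, not part of Galaxy's scheduler.

```python
from typing import Callable, List, Optional


def steps_needing_another_run(executors: List[Callable[[], Optional[bool]]]) -> List[int]:
    """Return the indices of steps that asked for another scheduling run."""
    pending = []
    for index, execute in enumerate(executors):
        result = execute()
        # Only an explicit False requests more processing; None and True
        # both mean the step executed properly.
        if result is False:
            pending.append(index)
    return pending


pending = steps_needing_another_run([lambda: None, lambda: False, lambda: True])
```

Writing `if not result` instead would wrongly reschedule every step whose execute returns None.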

do_invocation_step_action(step, action)[source]

Update or set the workflow invocation state action. This is a generic extension point meant to allow users to interact with interactive workflow modules. The action object returned from this method will be attached to the WorkflowInvocationStep and will be available the next time the workflow scheduler visits the workflow.

recover_mapping(invocation_step, progress)[source]

Re-populate progress object with information about connections from previously executed steps recorded via invocation_steps.

get_informal_replacement_parameters(step) → List[str][source]

Return a list of informal replacement parameters.

If a replacement is handled via formal workflow inputs, do not include it in this list.
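
A minimal sketch of the filtering rule above, assuming informal parameters appear as `${name}` placeholders inside action argument strings. The placeholder syntax, the function name, and the argument shapes are assumptions for illustration; the source does not specify them.

```python
import re
from typing import Dict, Iterable, List, Set

# Assumed placeholder syntax: ${name}
PLACEHOLDER = re.compile(r"\$\{(.+?)\}")


def informal_replacement_parameters(
    action_arguments: Iterable[Dict[str, str]], formal_inputs: Set[str]
) -> List[str]:
    """Collect placeholder names not already covered by formal workflow inputs."""
    found: List[str] = []
    for arguments in action_arguments:
        for value in arguments.values():
            for name in PLACEHOLDER.findall(value):
                # Skip names handled by formal inputs, and avoid duplicates.
                if name not in formal_inputs and name not in found:
                    found.append(name)
    return found


names = informal_replacement_parameters(
    [{"newname": "${sample}_${run}.bam"}, {"newname": "${sample}.log"}],
    formal_inputs={"run"},
)
```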

compute_collection_info(progress: WorkflowProgress, step, all_inputs)[source]

Use get_all_inputs (if implemented) to determine collection mapping for execution.

class galaxy.workflow.modules.SubWorkflowModule(trans, content_id=None, **kwds)[source]

Bases: WorkflowModule

type: str = 'subworkflow'
name: str = 'Subworkflow'
subworkflow: Workflow
__init__(trans, content_id=None, **kwds)[source]
classmethod from_dict(trans, d, **kwds)[source]
classmethod from_workflow_step(trans, step, **kwds)[source]
save_to_step(step, detached=False)[source]
get_name()[source]
get_all_inputs(data_only=False, connectable_only=False)[source]

Get configure time data input descriptions.

get_modules()[source]
property version_changes
check_and_update_state()[source]

If the state is not in sync with the current implementation of the module, try to update it. Returns a list of messages to be displayed.

get_errors(**kwargs)[source]

This returns a step-related error message as a string, or None.

get_all_outputs(data_only=False)[source]
get_post_job_actions(incoming)[source]
get_content_id()[source]

If this component has an identifier external to the step (such as a tool or another workflow) return the identifier for that content.

execute(trans, progress: WorkflowProgress, invocation_step: WorkflowInvocationStep, use_cached_job: bool = False) → bool | None[source]

Execute the given workflow step in the given workflow invocation. Use the supplied workflow progress object to track outputs, find inputs, etc.

get_runtime_state()[source]
get_runtime_inputs(step, connections: Iterable[WorkflowStepConnection] | None = None)[source]

Used internally by modules and when displaying inputs in workflow editor and run workflow templates.

get_informal_replacement_parameters(step) → List[str][source]

Return a list of replacement parameters.

class galaxy.workflow.modules.InputProxy(input, prefixed_name)[source]

Bases: object

Provide InputParameter interfaces over inputs, renamed for the workflow context.

__init__(input, prefixed_name)[source]
property name
property label
to_dict(*args, **kwds)[source]
galaxy.workflow.modules.optional_param(optional)[source]
galaxy.workflow.modules.format_param(trans, formats)[source]
class galaxy.workflow.modules.InputModuleState[source]

Bases: TypedDict

optional: bool
format: List[str]
tag: str
class galaxy.workflow.modules.InputModule(trans, content_id=None, **kwds)[source]

Bases: WorkflowModule

default_optional = False
get_runtime_state()[source]
get_all_inputs(data_only=False, connectable_only=False)[source]
execute(trans, progress: WorkflowProgress, invocation_step, use_cached_job: bool = False) → bool | None[source]

Execute the given workflow invocation step.

Use the supplied workflow progress object to track outputs, find inputs, etc.

Return False if additional processing is required on subsequent workflow scheduling runs; None or True means the workflow step executed properly.

recover_mapping(invocation_step: WorkflowInvocationStep, progress: WorkflowProgress)[source]

Re-populate progress object with information about connections from previously executed steps recorded via invocation_steps.

get_export_state()[source]
step_state_to_tool_state(state)[source]
save_to_step(step, detached=False)[source]
label: str
type: str
name: str
class galaxy.workflow.modules.InputDataModule(trans, content_id=None, **kwds)[source]

Bases: InputModule

type: str = 'data_input'
name: str = 'Input dataset'
get_all_outputs(data_only=False)[source]
get_filter_set(connections=None)[source]
get_runtime_inputs(step, connections: Iterable[WorkflowStepConnection] | None = None)[source]

Used internally by modules and when displaying inputs in workflow editor and run workflow templates.

get_inputs()[source]

This returns inputs displayed in the workflow editor

label: str
class galaxy.workflow.modules.InputDataCollectionModule(trans, content_id=None, **kwds)[source]

Bases: InputModule

type: str = 'data_collection_input'
name: str = 'Input dataset collection'
default_collection_type = 'list'
collection_type = 'list'
get_inputs()[source]

This returns inputs displayed in the workflow editor

get_runtime_inputs(step, connections: Iterable[WorkflowStepConnection] | None = None)[source]

Used internally by modules and when displaying inputs in workflow editor and run workflow templates.

get_all_outputs(data_only=False)[source]
label: str
class galaxy.workflow.modules.InputParameterModule(trans, content_id=None, **kwds)[source]

Bases: WorkflowModule

POSSIBLE_PARAMETER_TYPES = ['text', 'integer', 'float', 'boolean', 'color']
type: str = 'parameter_input'
name: str = 'Input parameter'
default_parameter_type = 'text'
default_optional = False
default_default_value = None
parameter_type = 'text'
optional = False
default_value = None
get_inputs()[source]

This returns inputs displayed in the workflow editor

restrict_options(step, connections: Iterable[WorkflowStepConnection], default_value)[source]
get_runtime_inputs(step, connections: Iterable[WorkflowStepConnection] | None = None)[source]

Used internally by modules and when displaying inputs in workflow editor and run workflow templates.

get_runtime_state()[source]
get_all_outputs(data_only=False)[source]
execute(trans, progress: WorkflowProgress, invocation_step, use_cached_job: bool = False) → bool | None[source]

Execute the given workflow invocation step.

Use the supplied workflow progress object to track outputs, find inputs, etc.

Return False if additional processing is required on subsequent workflow scheduling runs; None or True means the workflow step executed properly.

step_state_to_tool_state(state)[source]
get_export_state()[source]
save_to_step(step, detached=False)[source]
label: str
class galaxy.workflow.modules.PauseModule(trans, content_id=None, **kwds)[source]

Bases: WorkflowModule

Initially this module unconditionally pauses a workflow; the aim is to allow conditional pausing later on.

type: str = 'pause'
name: str = 'Pause for dataset review'
get_all_inputs(data_only=False, connectable_only=False)[source]
get_all_outputs(data_only=False)[source]
get_runtime_state()[source]
execute(trans, progress: WorkflowProgress, invocation_step, use_cached_job: bool = False) → bool | None[source]

Execute the given workflow invocation step.

Use the supplied workflow progress object to track outputs, find inputs, etc.

Return False if additional processing is required on subsequent workflow scheduling runs; None or True means the workflow step executed properly.

recover_mapping(invocation_step, progress)[source]

Re-populate progress object with information about connections from previously executed steps recorded via invocation_steps.

do_invocation_step_action(step, action)[source]

Update or set the workflow invocation state action. This is a generic extension point meant to allow users to interact with interactive workflow modules. The action object returned from this method will be attached to the WorkflowInvocationStep and will be available the next time the workflow scheduler visits the workflow.

label: str
class galaxy.workflow.modules.ToolModule(trans, tool_id, tool_version=None, exact_tools=True, tool_uuid=None, **kwds)[source]

Bases: WorkflowModule

type: str = 'tool'
name: str = 'Tool'
__init__(trans, tool_id, tool_version=None, exact_tools=True, tool_uuid=None, **kwds)[source]
classmethod from_dict(trans, d, **kwds)[source]
classmethod from_workflow_step(trans, step, **kwds)[source]
save_to_step(step, detached=False)[source]
get_name()[source]
get_content_id()[source]

If this component has an identifier external to the step (such as a tool or another workflow) return the identifier for that content.

get_version()[source]
get_tooltip(static_path=None)[source]
get_errors(include_tool_id=False, **kwargs)[source]

This returns a step-related error message as a string, or None.

get_inputs()[source]

This returns inputs displayed in the workflow editor

get_all_inputs(data_only=False, connectable_only=False)[source]
get_all_outputs(data_only=False)[source]
get_config_form(step=None)[source]

Serializes input parameters of a module into input dictionaries.

check_and_update_state()[source]

If the state is not in sync with the current implementation of the module, try to update it. Returns a list of messages to be displayed.

add_dummy_datasets(connections=None, steps=None)[source]

Replace connected inputs with placeholder/dummy values.

get_post_job_actions(incoming)[source]
recover_state(state, **kwds)[source]

Recover state dict from simple dictionary describing configuration state (potentially from persisted step state).

Sub-classes should supply a default_state method which contains the initial state dict with key, value pairs for all available attributes.

augment_tool_state_for_input_connections(**kwds)[source]

Update tool state to accommodate specified input connections.

Top-level and conditional inputs are automatically populated with connected data outputs at runtime. If the tool state does not contain enough repeat instances, however, the runtime replacement code never visits the input elements it needs in order to connect the data parameters to the tool state. This code therefore populates the required repeat instances in the tool state so that they are visited and their inputs properly connected at runtime. I believe this should be run before check_and_update_param_values in recover_state so that non-data parameters are properly populated with default values. The need to populate defaults is why this is done here instead of at runtime, although it might also be needed at runtime at some point (for workflows installed before their corresponding tools?).

See the test case test_inputs_to_steps for an example of a workflow test case that exercises this code.
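
The idea of padding repeat instances can be sketched as follows. This is an illustration under stated assumptions, not the method's actual code: it assumes connected inputs inside a repeat are named `<repeat>_<index>|<child>`, and ensure_repeat_instances, the state layout, and default_instance are hypothetical.

```python
import re
from typing import Any, Dict, Iterable

# Assumed naming scheme for inputs nested in a repeat: "<repeat>_<index>|<child>"
REPEAT_KEY = re.compile(r"^(?P<repeat>\w+?)_(?P<index>\d+)\|")


def ensure_repeat_instances(
    tool_state: Dict[str, Any],
    connected_inputs: Iterable[str],
    default_instance: Dict[str, Any],
) -> Dict[str, Any]:
    """Append default repeat instances until every connected index exists."""
    for name in connected_inputs:
        match = REPEAT_KEY.match(name)
        if match is None:
            # Top-level input: nothing to pad.
            continue
        instances = tool_state.setdefault(match.group("repeat"), [])
        needed = int(match.group("index")) + 1
        while len(instances) < needed:
            instances.append(dict(default_instance))
    return tool_state


state = {"queries": [{"input2": None}]}
ensure_repeat_instances(state, ["input1", "queries_2|input2"], {"input2": None})
```

After the call, `state["queries"]` holds three instances, so code that walks the state at runtime reaches index 2 and can attach the connection there.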

get_runtime_state()[source]
get_runtime_inputs(step, connections: Iterable[WorkflowStepConnection] | None = None)[source]

Used internally by modules and when displaying inputs in workflow editor and run workflow templates.

compute_runtime_state(trans, step=None, step_updates=None)[source]

Determine the runtime state (potentially different from self.state which describes configuration state). This (again unlike self.state) is currently always a DefaultToolState object.

If step is not None, it will be used to search for default values defined in workflow input steps.

If step_updates is None (likely when rendering the run form, for instance), no runtime properties are available and the state must be determined solely by the default runtime state described by the step.

If step_updates is available, it describes the runtime properties supplied by the workflow runner.

decode_runtime_state(step, runtime_state)[source]

Take runtime state from persisted invocation and convert it into a DefaultToolState object for use during workflow invocation.

execute(trans, progress: WorkflowProgress, invocation_step, use_cached_job: bool = False) → bool | None[source]

Execute the given workflow invocation step.

Use the supplied workflow progress object to track outputs, find inputs, etc.

Return False if additional processing is required on subsequent workflow scheduling runs; None or True means the workflow step executed properly.

get_informal_replacement_parameters(step: WorkflowStep) → List[str][source]

Return a list of replacement parameters.

label: str
class galaxy.workflow.modules.WorkflowModuleFactory(module_types: Dict[str, Type[WorkflowModule]])[source]

Bases: object

__init__(module_types: Dict[str, Type[WorkflowModule]])[source]
from_dict(trans, d, **kwargs) → WorkflowModule[source]

Return module initialized from the data in dictionary d.

from_workflow_step(trans, step: WorkflowStep, **kwargs) → WorkflowModule[source]

Return module initialized from the WorkflowStep object step.
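
A factory built from a module_types mapping can dispatch on a type name, as in the sketch below. The classes here are stand-ins, and reading the type from a "type" key of the dictionary is an assumption suggested by the type attributes listed above, not something this page states.

```python
from typing import Any, Dict, Type


class ModuleSketch:
    """Stand-in for WorkflowModule."""

    type = "base"

    def __init__(self, state: Dict[str, Any]):
        self.state = state

    @classmethod
    def from_dict(cls, trans, d, **kwds):
        return cls(d.get("state", {}))


class PauseSketch(ModuleSketch):
    type = "pause"


class ToolSketch(ModuleSketch):
    type = "tool"


class ModuleFactorySketch:
    def __init__(self, module_types: Dict[str, Type[ModuleSketch]]):
        self.module_types = module_types

    def from_dict(self, trans, d, **kwargs) -> ModuleSketch:
        step_type = d["type"]  # assumed key
        if step_type not in self.module_types:
            raise ValueError(f"Unexpected workflow step type [{step_type}]")
        # Delegate construction to the class registered for this type.
        return self.module_types[step_type].from_dict(trans, d, **kwargs)


factory = ModuleFactorySketch({cls.type: cls for cls in (PauseSketch, ToolSketch)})
module = factory.from_dict(None, {"type": "pause", "state": {}})
```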

galaxy.workflow.modules.load_module_sections(trans)[source]

Get abstract description of the workflow modules this Galaxy instance is configured with.

exception galaxy.workflow.modules.DelayedWorkflowEvaluation(why=None)[source]

Bases: Exception

__init__(why=None)[source]
exception galaxy.workflow.modules.CancelWorkflowEvaluation(why: InvocationMessageUnion)[source]

Bases: Exception

__init__(why: InvocationMessageUnion)[source]
exception galaxy.workflow.modules.FailWorkflowEvaluation(why: InvocationMessageUnion)[source]

Bases: Exception

__init__(why: InvocationMessageUnion)[source]
exception galaxy.workflow.modules.SkipWorkflowStepEvaluation[source]

Bases: Exception

class galaxy.workflow.modules.WorkflowModuleInjector(trans, allow_tool_state_corrections=False)[source]

Bases: object

Injects workflow step objects from the ORM with appropriate module and module generated/influenced state.

__init__(trans, allow_tool_state_corrections=False)[source]
inject(step: WorkflowStep, step_args=None, steps=None, **kwargs)[source]

Pre-condition: step is an ORM object coming from the database. If supplied, step_args is the representation of the inputs for that step supplied via web form.

Post-condition: The supplied step has new non-persistent attributes useful during workflow invocation. These include ‘upgrade_messages’, ‘state’, ‘input_connections_by_name’, and ‘module’.

If step_args is provided from a web form, it is applied to generate ‘state’; otherwise ‘state’ is obtained from the database.
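
The pre- and post-conditions can be pictured with plain objects. Everything in this sketch is hypothetical except the four attribute names taken from the text: the step is a SimpleNamespace rather than an ORM object, persisted_state stands in for whatever the database holds, and the module is a placeholder.

```python
from collections import defaultdict
from types import SimpleNamespace
from typing import Any, Dict, Optional


def inject_sketch(step: SimpleNamespace, step_args: Optional[Dict[str, Any]] = None) -> SimpleNamespace:
    """Attach transient, non-persistent attributes to a step-like object."""
    by_name = defaultdict(list)
    for connection in step.input_connections:
        by_name[connection.input_name].append(connection)
    step.input_connections_by_name = dict(by_name)
    step.upgrade_messages = {}
    # Web form arguments win; otherwise fall back to the persisted state.
    step.state = dict(step_args) if step_args is not None else dict(step.persisted_state)
    step.module = SimpleNamespace(type=step.type)  # placeholder module
    return step


step = SimpleNamespace(
    type="tool",
    persisted_state={"threshold": 1},
    input_connections=[
        SimpleNamespace(input_name="input1"),
        SimpleNamespace(input_name="input1"),
    ],
)
inject_sketch(step)
state_from_database = dict(step.state)
inject_sketch(step, {"threshold": 2})
state_from_form = dict(step.state)
```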

inject_all(workflow: Workflow, param_map=None, ignore_tool_missing_exception=False, **kwargs)[source]
compute_runtime_state(step: WorkflowStep, step_args=None)[source]
galaxy.workflow.modules.populate_module_and_state(trans, workflow: Workflow, param_map, allow_tool_state_corrections=False, module_injector=None)[source]

Used by the API but not the web controller. Walks through a workflow’s steps and populates transient module and state attributes on each.

galaxy.workflow.render module

class galaxy.workflow.render.WorkflowCanvas[source]

Bases: object

__init__()[source]
finish(for_embed=False)[source]
add_boxes(step_dict, width, name_fill)[source]
add_text(module_data_inputs, module_data_outputs, step, module_name)[source]
add_connection(step_dict, conn, output_dict)[source]
add_steps(highlight_errors=False)[source]
populate_data_for_step(step, module_name, module_data_inputs, module_data_outputs, tool_errors=None)[source]

galaxy.workflow.run module

galaxy.workflow.run.queue_invoke(trans: GalaxyWebTransaction, workflow: Workflow, workflow_run_config: WorkflowRunConfig, request_params: Dict[str, Any] | None = None, populate_state: bool = True, flush: bool = True) → WorkflowInvocation[source]
class galaxy.workflow.run.WorkflowRunConfig(target_history: History, replacement_dict: Dict[str, Any] | None = None, inputs: Dict[int, Any] | None = None, param_map: Dict[int, Any] | None = None, allow_tool_state_corrections: bool = False, copy_inputs_to_history: bool = False, use_cached_job: bool = False, resource_params: Dict[int, Any] | None = None, preferred_object_store_id: str | None = None, preferred_outputs_object_store_id: str | None = None, preferred_intermediate_object_store_id: str | None = None, effective_outputs: List[EffectiveOutput] | None = None)[source]

Bases: object

Wrapper around all the ways a workflow execution can be parameterized.

Parameters:
  • target_history (galaxy.model.History) – History to execute workflow in.

  • replacement_dict (dict) – Workflow level parameters used for renaming post job actions.

  • copy_inputs_to_history (bool) – Whether input data parameters should be copied to target_history. (Defaults to False)

  • inputs (dict) – Map from step ids to dicts containing the HDA for these steps.

  • inputs_by (str) – How inputs (datasets/collections) map to workflow steps: by unencoded database id (‘step_id’), by index in the workflow (‘step_index’, independent of the database), or by input name for that step (‘name’).

  • param_map (dict) – Override step parameters. Should be a dict with step id keys and tool param name-value dicts as values.
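
The three inputs_by modes amount to choosing which step attribute keys the supplied inputs. A minimal sketch of resolving them to step ids follows; the step dictionaries, their field names, and inputs_keyed_by_step_id are hypothetical and only illustrate the mapping described above.

```python
from typing import Any, Dict, List


def key_for_step(step: Dict[str, Any], inputs_by: str) -> str:
    """Return the key under which inputs for this step would be supplied."""
    if inputs_by == "step_id":
        return str(step["id"])
    if inputs_by == "step_index":
        return str(step["index"])
    if inputs_by == "name":
        return step["name"]
    raise ValueError(f"Unknown inputs_by value [{inputs_by}]")


def inputs_keyed_by_step_id(
    inputs: Dict[str, Any], inputs_by: str, steps: List[Dict[str, Any]]
) -> Dict[int, Any]:
    """Re-key supplied inputs by step id, whatever scheme the caller used."""
    resolved = {}
    for step in steps:
        key = key_for_step(step, inputs_by)
        if key in inputs:
            resolved[step["id"]] = inputs[key]
    return resolved


steps = [
    {"id": 42, "index": 0, "name": "reads"},
    {"id": 43, "index": 1, "name": "reference"},
]
by_index = inputs_keyed_by_step_id({"0": "hda-a", "1": "hda-b"}, "step_index", steps)
by_name = inputs_keyed_by_step_id({"reference": "hda-b"}, "name", steps)
```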

__init__(target_history: History, replacement_dict: Dict[str, Any] | None = None, inputs: Dict[int, Any] | None = None, param_map: Dict[int, Any] | None = None, allow_tool_state_corrections: bool = False, copy_inputs_to_history: bool = False, use_cached_job: bool = False, resource_params: Dict[int, Any] | None = None, preferred_object_store_id: str | None = None, preferred_outputs_object_store_id: str | None = None, preferred_intermediate_object_store_id: str | None = None, effective_outputs: List[EffectiveOutput] | None = None) → None[source]

galaxy.workflow.run_request module

class galaxy.workflow.run_request.WorkflowRunConfig(target_history: History, replacement_dict: Dict[str, Any] | None = None, inputs: Dict[int, Any] | None = None, param_map: Dict[int, Any] | None = None, allow_tool_state_corrections: bool = False, copy_inputs_to_history: bool = False, use_cached_job: bool = False, resource_params: Dict[int, Any] | None = None, preferred_object_store_id: str | None = None, preferred_outputs_object_store_id: str | None = None, preferred_intermediate_object_store_id: str | None = None, effective_outputs: List[EffectiveOutput] | None = None)[source]

Bases: object

Wrapper around all the ways a workflow execution can be parameterized.

Parameters:
  • target_history (galaxy.model.History) – History to execute workflow in.

  • replacement_dict (dict) – Workflow level parameters used for renaming post job actions.

  • copy_inputs_to_history (bool) – Whether input data parameters should be copied to target_history. (Defaults to False)

  • inputs (dict) – Map from step ids to dicts containing the HDA for these steps.

  • inputs_by (str) – How inputs (datasets/collections) map to workflow steps: by unencoded database id (‘step_id’), by index in the workflow (‘step_index’, independent of the database), or by input name for that step (‘name’).

  • param_map (dict) – Override step parameters. Should be a dict with step id keys and tool param name-value dicts as values.

__init__(target_history: History, replacement_dict: Dict[str, Any] | None = None, inputs: Dict[int, Any] | None = None, param_map: Dict[int, Any] | None = None, allow_tool_state_corrections: bool = False, copy_inputs_to_history: bool = False, use_cached_job: bool = False, resource_params: Dict[int, Any] | None = None, preferred_object_store_id: str | None = None, preferred_outputs_object_store_id: str | None = None, preferred_intermediate_object_store_id: str | None = None, effective_outputs: List[EffectiveOutput] | None = None) → None[source]
galaxy.workflow.run_request.build_workflow_run_configs(trans: GalaxyWebTransaction, workflow: Workflow, payload: Dict[str, Any]) → List[WorkflowRunConfig][source]
galaxy.workflow.run_request.workflow_run_config_to_request(trans: GalaxyWebTransaction, run_config: WorkflowRunConfig, workflow: Workflow) → WorkflowInvocation[source]
galaxy.workflow.run_request.workflow_request_to_run_config(workflow_invocation: WorkflowInvocation, use_cached_job: bool = False) → WorkflowRunConfig[source]

galaxy.workflow.scheduling_manager module

class galaxy.workflow.scheduling_manager.WorkflowSchedulingManager(app)[source]

Bases: ConfiguresHandlers

A workflow scheduling manager based loosely on the pattern established by galaxy.manager.JobManager. Only schedules workflows on handler processes.

DEFAULT_BASE_HANDLER_POOLS: Tuple[str, ...] = ('workflow-schedulers', 'job-handlers')
__init__(app)[source]
shutdown()[source]
queue(workflow_invocation, request_params, flush=True)[source]
class galaxy.workflow.scheduling_manager.WorkflowRequestMonitor(app, workflow_scheduling_manager)[source]

Bases: Monitors

__init__(app, workflow_scheduling_manager)[source]
start()[source]
shutdown()[source]

galaxy.workflow.steps module

This module contains utility methods for reasoning about and ordering workflow steps.

galaxy.workflow.steps.attach_ordered_steps(workflow)[source]

Attempt to topologically order steps and comments, and attach them to the workflow. If ordering the steps fails, the workflow contains cycles, so mark it as such.

galaxy.workflow.steps.order_workflow_steps(steps, comments)[source]

Perform a topological sort of the steps and comments; return (ordered steps, ordered comments), or (None, ordered comments) if the steps cannot be ordered.
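
A topological sort that reports failure on cyclic input can be sketched with Kahn's algorithm. This is an illustration only: the source does not say which algorithm Galaxy uses, and topological_order, with its node and edge arguments, is a hypothetical helper that orders plain labels rather than WorkflowStep objects.

```python
from collections import deque
from typing import Dict, Hashable, List, Optional, Sequence, Tuple


def topological_order(
    nodes: Sequence[Hashable], edges: Sequence[Tuple[Hashable, Hashable]]
) -> Optional[List[Hashable]]:
    """Return nodes in dependency order, or None if the graph has a cycle."""
    indegree: Dict[Hashable, int] = {node: 0 for node in nodes}
    successors: Dict[Hashable, List[Hashable]] = {node: [] for node in nodes}
    for source, target in edges:
        successors[source].append(target)
        indegree[target] += 1
    # Start from nodes with no unmet dependencies.
    ready = deque(node for node in nodes if indegree[node] == 0)
    ordered: List[Hashable] = []
    while ready:
        node = ready.popleft()
        ordered.append(node)
        for target in successors[node]:
            indegree[target] -= 1
            if indegree[target] == 0:
                ready.append(target)
    # Nodes left unordered can only be part of a cycle.
    return ordered if len(ordered) == len(nodes) else None


acyclic = topological_order(["c", "a", "b"], [("a", "b"), ("b", "c")])
cyclic = topological_order(["a", "b"], [("a", "b"), ("b", "a")])
```

Returning None rather than raising mirrors the (None, ordered comments) result described above, which lets a caller mark the workflow as cyclic.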

galaxy.workflow.steps.has_cycles(workflow)[source]
galaxy.workflow.steps.edgelist_for_workflow_steps(steps)[source]

Create a list of tuples representing edges between WorkflowSteps based on associated WorkflowStepConnections.
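
Deriving an edge list from step connections might look like the sketch below. StepSketch and ConnectionSketch are hypothetical stand-ins for WorkflowStep and WorkflowStepConnection, and representing each edge as (index of the producing step, index of the consuming step) is an assumption.

```python
from dataclasses import dataclass, field
from typing import List, Tuple


@dataclass
class ConnectionSketch:
    output_step_index: int  # step whose output feeds this connection


@dataclass
class StepSketch:
    index: int
    input_connections: List[ConnectionSketch] = field(default_factory=list)


def edgelist_for_steps(steps: List[StepSketch]) -> List[Tuple[int, int]]:
    """Return (producer index, consumer index) pairs, one per connection."""
    edges = []
    for step in steps:
        for connection in step.input_connections:
            edges.append((connection.output_step_index, step.index))
    return edges


steps = [
    StepSketch(0),
    StepSketch(1, [ConnectionSketch(0)]),
    StepSketch(2, [ConnectionSketch(0), ConnectionSketch(1)]),
]
edges = edgelist_for_steps(steps)
```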

galaxy.workflow.steps.order_workflow_steps_with_levels(steps)[source]

galaxy.workflow.trs_proxy module

galaxy.workflow.trs_proxy.parse_search_kwds(search_query)[source]
class galaxy.workflow.trs_proxy.TrsProxy(config=None)[source]

Bases: object

__init__(config=None)[source]
get_servers()[source]
get_server(trs_server)[source]
server_from_url(trs_url)[source]
match_url(url, ip_allowlist: List[IPv4Address | IPv6Address | IPv4Network | IPv6Network])[source]
class galaxy.workflow.trs_proxy.TrsServer(trs_url)[source]

Bases: object

__init__(trs_url)[source]
get_tools(**kwd)[source]
get_tool(tool_id, **kwd)[source]
get_versions(tool_id, **kwd)[source]
get_version(tool_id, version_id, **kwd)[source]
get_version_descriptor(tool_id, version_id, **kwd)[source]