Warning

This document is for an in-development version of Galaxy. You can alternatively view this page in the latest release if it exists or view the top of the latest release's documentation.

Source code for galaxy.jobs.command_factory

import json
import typing
from logging import getLogger
from os import (
    getcwd,
    makedirs,
)
from os.path import (
    abspath,
    join,
)
from typing import Optional

from galaxy import util
from galaxy.job_execution.output_collect import default_exit_code_file
from galaxy.jobs.runners.util.job_script import (
    INTEGRITY_INJECTION,
    ScriptIntegrityChecks,
    write_script,
)
from galaxy.tool_util.deps.container_classes import (
    Container,
    TRAP_KILL_CONTAINER,
)

if typing.TYPE_CHECKING:
    from galaxy.jobs import MinimalJobWrapper
    from galaxy.jobs.runners import BaseJobRunner

log = getLogger(__name__)

CAPTURE_RETURN_CODE = "return_code=$?"
YIELD_CAPTURED_CODE = 'sh -c "exit $return_code"'
SETUP_GALAXY_FOR_METADATA = """
[ "$GALAXY_VIRTUAL_ENV" = "None" ] && GALAXY_VIRTUAL_ENV="$_GALAXY_VIRTUAL_ENV"; _galaxy_setup_environment True"""
PREPARE_DIRS = """mkdir -p working outputs configs
if [ -d _working ]; then
    rm -rf working/ outputs/ configs/; cp -R _working working; cp -R _outputs outputs; cp -R _configs configs
else
    cp -R working _working; cp -R outputs _outputs; cp -R configs _configs
fi
cd working"""


[docs]def build_command( runner: "BaseJobRunner", job_wrapper: "MinimalJobWrapper", container: Optional[Container] = None, modify_command_for_container: bool = True, include_metadata: bool = False, include_work_dir_outputs: bool = True, create_tool_working_directory: bool = True, remote_command_params=None, remote_job_directory=None, stream_stdout_stderr: bool = False, ): """ Compose the sequence of commands necessary to execute a job. This will currently include: - environment settings corresponding to any requirement tags - preparing input files - command line taken from job wrapper - commands to set metadata (if include_metadata is True) """ remote_command_params = remote_command_params or {} shell = job_wrapper.shell base_command_line = job_wrapper.get_command_line() # job_id = job_wrapper.job_id # log.debug( 'Tool evaluation for job (%s) produced command-line: %s' % ( job_id, base_command_line ) ) commands_builder = CommandsBuilder(base_command_line) # Dependency resolution and task splitting are prepended to the # command - so they need to appear in the following order to ensure that # the underlying application used by version command is available in the # environment after dependency resolution, but the task splitting command # is still executed in Galaxy's Python environment. # One could imagine also allowing dependencies inside of the container but # that is too sophisticated for a first crack at this - build your # containers ready to go! if not container or container.resolve_dependencies: __handle_dependency_resolution(commands_builder, job_wrapper, remote_command_params) __handle_task_splitting(commands_builder, job_wrapper) for_pulsar = "pulsar_version" in remote_command_params if container: if core_job_metric_plugin := runner.app.job_metrics.default_job_instrumenter.get_configured_plugin("core"): directory = join(job_wrapper.working_directory, "metadata") if for_pulsar else job_wrapper.working_directory makedirs(directory, exist_ok=True) container_file_path = core_job_metric_plugin.get_container_file_path(directory) with open(container_file_path, "w") as container_file: container_file.write( json.dumps({"container_id": container.container_id, "container_type": container.container_type}) ) if (container and modify_command_for_container) or job_wrapper.commands_in_new_shell: if container and modify_command_for_container: # Many Docker containers do not have /bin/bash. external_command_shell = container.shell else: external_command_shell = shell externalized_commands = __externalize_commands( job_wrapper, external_command_shell, commands_builder, remote_command_params, container=container ) if container and modify_command_for_container: # Stop now and build command before handling metadata and copying # working directory files back. These should always happen outside # of docker container - no security implications when generating # metadata and means no need for Galaxy to be available to container # and not copying workdir outputs back means on can be more restrictive # of where container can write to in some circumstances. run_in_container_command = container.containerize_command(externalized_commands) commands_builder = CommandsBuilder(run_in_container_command) else: commands_builder = CommandsBuilder(externalized_commands) # Galaxy writes I/O files to outputs, Pulsar uses metadata. metadata seems like # it should be preferred - at least if the directory exists. io_directory = "../metadata" if for_pulsar else "../outputs" commands_builder.capture_stdout_stderr( f"{io_directory}/tool_stdout", f"{io_directory}/tool_stderr", stream_stdout_stderr=stream_stdout_stderr ) # Don't need to create a separate tool working directory for Pulsar # jobs - that is handled by Pulsar. if create_tool_working_directory: # usually working will already exist, but it will not for task # split jobs. # Copy working and outputs before job submission so that these can be restored on resubmission # xref https://github.com/galaxyproject/galaxy/issues/3289 commands_builder.prepend_command(PREPARE_DIRS) __handle_remote_command_line_building(commands_builder, job_wrapper, for_pulsar=for_pulsar) if container_monitor_command := job_wrapper.container_monitor_command(container): commands_builder.prepend_command(container_monitor_command) working_directory = remote_job_directory or job_wrapper.working_directory commands_builder.capture_return_code(default_exit_code_file(working_directory, job_wrapper.job_id)) if job_wrapper.is_cwl_job: # Minimal metadata needed by the relocate script assert job_wrapper.tool cwl_metadata_params = { "job_metadata": join("working", job_wrapper.tool.provided_metadata_file), "job_id_tag": job_wrapper.get_id_tag(), } cwl_metadata_params_path = join(job_wrapper.working_directory, "cwl_params.json") with open(cwl_metadata_params_path, "w") as f: json.dump(cwl_metadata_params, f) relocate_script_file = join(job_wrapper.working_directory, "relocate_dynamic_outputs.py") relocate_contents = ( "from galaxy_ext.cwl.handle_outputs import relocate_dynamic_outputs; relocate_dynamic_outputs()" ) write_script(relocate_script_file, relocate_contents, ScriptIntegrityChecks(check_job_script_integrity=False)) commands_builder.append_command(SETUP_GALAXY_FOR_METADATA) commands_builder.append_command(f"python '{relocate_script_file}'") if include_work_dir_outputs: __handle_work_dir_outputs(commands_builder, job_wrapper, runner, remote_command_params) if include_metadata and job_wrapper.requires_setting_metadata: commands_builder.append_command(f"cd '{working_directory}'") __handle_metadata(commands_builder, job_wrapper, runner, remote_command_params) return commands_builder.build()
def __externalize_commands( job_wrapper: "MinimalJobWrapper", shell, commands_builder, remote_command_params, script_name="tool_script.sh", container: Optional[Container] = None, ): local_container_script = join(job_wrapper.working_directory, script_name) tool_commands = commands_builder.build() integrity_injection = "" # Setting shell to none in the job config disables creating a tool command script, # set -e doesn't work for composite commands but this is necessary for Windows jobs # for instance. if shell and shell.lower() == "none": return tool_commands if job_wrapper.job_io.check_job_script_integrity: integrity_injection = INTEGRITY_INJECTION set_e = "" if job_wrapper.strict_shell: set_e = "set -e\n" source_command = "" if container: source_command = container.source_environment script_contents = f"#!{shell}\n{integrity_injection}{set_e}{source_command}{tool_commands}" write_script( local_container_script, script_contents, job_io=job_wrapper.job_io, ) commands = f"{shell} {local_container_script}" # TODO: Cleanup for_pulsar hack. # - Integrate Pulsar sending tool_stdout/tool_stderr back # https://github.com/galaxyproject/pulsar/pull/202 # *and* # - Get Galaxy to write these files to an output directory so the container itself # doesn't need to mount the job directory (rw) and then eliminate this hack # (or restrict to older Pulsar versions). # https://github.com/galaxyproject/galaxy/pull/8449 for_pulsar = "pulsar_version" in remote_command_params if for_pulsar: commands = f"{shell} {join(remote_command_params['script_directory'], script_name)}" log.info(f"Built script [{local_container_script}] for tool command [{tool_commands}]") return commands def __handle_remote_command_line_building(commands_builder, job_wrapper: "MinimalJobWrapper", for_pulsar=False): if job_wrapper.remote_command_line: sep = "" if for_pulsar else "&&" command = 'PYTHONPATH="$GALAXY_LIB:$PYTHONPATH" python "$GALAXY_LIB"/galaxy/tools/remote_tool_eval.py' if for_pulsar: # TODO: that's not how to do this, pulsar doesn't execute an externalized script by default. # This also breaks rewriting paths etc, so it doesn't really work if there are no shared paths command = f"{command} && bash ../tool_script.sh" commands_builder.prepend_command(command, sep=sep) def __handle_task_splitting(commands_builder, job_wrapper: "MinimalJobWrapper"): # prepend getting input files (if defined) prepare_input_files_cmds = getattr(job_wrapper, "prepare_input_files_cmds", None) if prepare_input_files_cmds: commands_builder.prepend_commands(prepare_input_files_cmds) def __handle_dependency_resolution(commands_builder, job_wrapper: "MinimalJobWrapper", remote_command_params): local_dependency_resolution = remote_command_params.get("dependency_resolution", "local") == "local" # Prepend dependency injection if local_dependency_resolution and job_wrapper.dependency_shell_commands: commands_builder.prepend_commands(job_wrapper.dependency_shell_commands) def __handle_work_dir_outputs( commands_builder, job_wrapper: "MinimalJobWrapper", runner: "BaseJobRunner", remote_command_params ): # Append commands to copy job outputs based on from_work_dir attribute. work_dir_outputs_kwds = {} if "working_directory" in remote_command_params: work_dir_outputs_kwds["job_working_directory"] = remote_command_params["working_directory"] work_dir_outputs = runner.get_work_dir_outputs(job_wrapper, **work_dir_outputs_kwds) if work_dir_outputs: copy_commands = map(__copy_if_exists_command, work_dir_outputs) commands_builder.append_commands(copy_commands) def __handle_metadata( commands_builder, job_wrapper: "MinimalJobWrapper", runner: "BaseJobRunner", remote_command_params ): # Append metadata setting commands, we don't want to overwrite metadata # that was copied over in init_meta(), as per established behavior metadata_kwds = remote_command_params.get("metadata_kwds", {}) exec_dir = metadata_kwds.get("exec_dir", abspath(getcwd())) tmp_dir = metadata_kwds.get("tmp_dir", job_wrapper.working_directory) dataset_files_path = metadata_kwds.get("dataset_files_path", runner.app.model.Dataset.file_path) output_fnames = metadata_kwds.get("output_fnames", job_wrapper.job_io.get_output_fnames()) config_root = metadata_kwds.get("config_root", None) config_file = metadata_kwds.get("config_file", None) datatypes_config = metadata_kwds.get("datatypes_config", None) compute_tmp_dir = metadata_kwds.get("compute_tmp_dir", None) version_path = job_wrapper.job_io.version_path resolve_metadata_dependencies = job_wrapper.commands_in_new_shell metadata_command = ( job_wrapper.setup_external_metadata( exec_dir=exec_dir, tmp_dir=tmp_dir, dataset_files_path=dataset_files_path, output_fnames=output_fnames, set_extension=False, config_root=config_root, config_file=config_file, datatypes_config=datatypes_config, compute_tmp_dir=compute_tmp_dir, compute_version_path=version_path, resolve_metadata_dependencies=resolve_metadata_dependencies, use_bin=job_wrapper.use_metadata_binary, kwds={"overwrite": False}, ) or "" ) metadata_command = metadata_command.strip() if metadata_command: # Place Galaxy and its dependencies in environment for metadata regardless of tool. if not job_wrapper.is_cwl_job: commands_builder.append_command(SETUP_GALAXY_FOR_METADATA) commands_builder.append_command(metadata_command) def __copy_if_exists_command(work_dir_output): source_file, destination = work_dir_output if "?" in source_file or "*" in source_file: source_file = source_file.replace("*", '"*"').replace("?", '"?"') # Check if source and destination exist. # Users can purge outputs before the job completes, # in that case we don't want to copy the output to a purged path. # Static, non work_dir_output files are handled in job_finish code. return f'\nif [ -f "{source_file}" -a -f "{destination}" ] ; then cp "{source_file}" "{destination}" ; fi' class CommandsBuilder: def __init__(self, initial_command=""): # Remove trailing semi-colon so we can start hacking up this command. # TODO: Refactor to compose a list and join with ';', would be more clean. self.raw_command = initial_command initial_command = util.unicodify(initial_command or "") commands = initial_command.rstrip("; ") self.commands = commands # Coping work dir outputs or setting metadata will mask return code of # tool command. If these are used capture the return code and ensure # the last thing that happens is an exit with return code. self.return_code_captured = False def prepend_command(self, command, sep=";"): if command: self.commands = f"{command}{sep} {self.commands}" return self def prepend_commands(self, commands): return self.prepend_command("; ".join(c for c in commands if c)) def append_command(self, command, sep=";"): if command: self.commands = f"{self.commands}{sep} {command}" return self def append_commands(self, commands): self.append_command("; ".join(c for c in commands if c)) def capture_stdout_stderr(self, stdout_file, stderr_file, stream_stdout_stderr=False): if not stream_stdout_stderr: self.append_command(f"> '{stdout_file}' 2> '{stderr_file}'", sep="") return trap_command = """trap 'rm -f "$__out" "$__err"' EXIT""" if TRAP_KILL_CONTAINER in self.commands: # We need to replace the container kill trap with one that removes the named pipes and kills the container self.commands = self.commands.replace(TRAP_KILL_CONTAINER, "") trap_command = """trap 'rm -f "$__out" "$__err"; _on_exit' EXIT""" self.prepend_command( f"""__out="${{TMPDIR:-.}}/out.$$" __err="${{TMPDIR:-.}}/err.$$" mkfifo "$__out" "$__err" {trap_command} tee -a '{stdout_file}' < "$__out" & tee -a '{stderr_file}' < "$__err" >&2 &""", sep="", ) self.append_command('> "$__out" 2> "$__err"', sep="") def capture_return_code(self, exit_code_path): self.append_command(CAPTURE_RETURN_CODE) self.append_command(f"echo $return_code > {exit_code_path}") self.return_code_captured = True def build(self): if self.return_code_captured: self.append_command(YIELD_CAPTURED_CODE) return self.commands __all__ = ("build_command",)