import logging
import os
import subprocess
import time
from dataclasses import dataclass
from string import Template
from typing import (
Any,
Dict,
Optional,
)
from typing_extensions import Protocol
from galaxy.util import (
RWXR_XR_X,
unicodify,
)
from galaxy.util.resources import resource_string
from ..fork_safe_write import fork_safe_write
# Module-level logger, named after this module.
log = logging.getLogger(__name__)

# Default value of the "shell" template parameter (see OPTIONAL_TEMPLATE_PARAMS).
DEFAULT_SHELL = "/bin/bash"

# Templates and shell snippets obtained through resource_string() using this
# module's name and a resource file name.
DEFAULT_JOB_FILE_TEMPLATE = Template(resource_string(__name__, "DEFAULT_JOB_FILE_TEMPLATE.sh"))
SLOTS_STATEMENT_CLUSTER_DEFAULT = resource_string(__name__, "CLUSTER_SLOTS_STATEMENT.sh")
MEMORY_STATEMENT_DEFAULT_TEMPLATE = Template(resource_string(__name__, "MEMORY_STATEMENT_TEMPLATE.sh"))

# Alternative slots statement that pins GALAXY_SLOTS to "1".
SLOTS_STATEMENT_SINGLE = """
GALAXY_SLOTS="1"
"""

# Shell snippet used as the default "integrity_injection" template parameter.
# When ABC_TEST_JOB_SCRIPT_INTEGRITY_XYZ is non-empty the script exits with
# status 42; _handle_script_integrity() runs the script with that variable set
# and treats a return code of 42 as proof that the script is runnable.
INTEGRITY_INJECTION = """
# The following block can be used by the job system
# to ensure this script is runnable before actually attempting
# to run it.
if [ -n "$ABC_TEST_JOB_SCRIPT_INTEGRITY_XYZ" ]; then
exit 42
fi
"""

# Command executed after an integrity probe returned an unexpected code, to
# flush the filesystem before the next attempt.
INTEGRITY_SYNC_COMMAND = "/bin/sync"

# Default integrity-check settings (enabled flag, attempt count, sleep).
# NOTE(review): the names suggest these pair with the
# check_job_script_integrity* attributes — confirm against the callers.
DEFAULT_INTEGRITY_CHECK = True
DEFAULT_INTEGRITY_COUNT = 35
DEFAULT_INTEGRITY_SLEEP = 0.25

# Keyword arguments that job_script() insists on; it raises if one is absent.
REQUIRED_TEMPLATE_PARAMS = ["working_directory", "command"]

# Defaults for the remaining template parameters. job_script() takes a shallow
# copy of this dict and overlays the caller's keyword arguments, so the mutable
# values in here (the "env_setup_commands" list) are shared and must not be
# mutated in place.
OPTIONAL_TEMPLATE_PARAMS: Dict[str, Any] = {
    "galaxy_lib": None,
    "galaxy_virtual_env": None,
    "headers": "",
    "env_setup_commands": [],
    "slots_statement": SLOTS_STATEMENT_CLUSTER_DEFAULT,
    "instrument_pre_commands": "",
    "instrument_post_commands": "",
    "integrity_injection": INTEGRITY_INJECTION,
    "shell": DEFAULT_SHELL,
    "preserve_python_environment": True,
    # The default is the two-character string `""`, not an empty Python string.
    "tmp_dir_creation_statement": '""',
}
def job_script(template=DEFAULT_JOB_FILE_TEMPLATE, **kwds):
    """
    Render a job script by substituting template parameters into ``template``.

    ``working_directory`` and ``command`` must be supplied as keyword
    arguments. Every other parameter falls back to the value found in
    ``OPTIONAL_TEMPLATE_PARAMS`` unless it is derived here:

    * ``job_instrumenter`` - if given and truthy, its
      ``pre_execute_commands(metadata_directory)`` and
      ``post_execute_commands(metadata_directory)`` results (or ``""``)
      become ``instrument_pre_commands`` / ``instrument_post_commands``.
      The instrumenter itself is never passed on to the template.
    * ``metadata_directory`` - defaults to ``working_directory`` for the
      instrumenter calls and the default memory statement.
    * ``memory_statement`` - defaults to ``MEMORY_STATEMENT_DEFAULT_TEMPLATE``
      rendered with the metadata directory.
    * ``home_directory`` - defaults to ``<working_directory>/home``.
    * ``env_setup_commands`` - an iterable of strings, joined with newlines.

    All values are passed through ``unicodify`` before substitution.

    :param template: a ``string.Template`` or a plain string, which is wrapped
        in a ``string.Template``.
    :param kwds: template parameters as described above.
    :returns: the rendered script. ``safe_substitute`` is used, so
        placeholders without a matching parameter are left in place.
    :raises Exception: if a required parameter is missing.

    >>> has_exception = False
    >>> try: job_script()
    ... except Exception as e: has_exception = True
    >>> has_exception
    True
    >>> script = job_script(working_directory='wd', command='uptime', exit_code_path='ec')
    >>> '\\nuptime\\n' in script
    True
    >>> 'GALAXY_LIB="None"' in script
    True
    >>> script.startswith('#!/bin/sh\\n#PBS -test\\n')
    False
    >>> script = job_script(working_directory='wd', command='uptime', exit_code_path='ec', headers='#PBS -test')
    >>> script.startswith('#!/bin/bash\\n\\n#PBS -test\\n')
    True
    >>> script = job_script(working_directory='wd', command='uptime', exit_code_path='ec', slots_statement='GALAXY_SLOTS="$SLURM_JOB_NUM_NODES"')
    >>> script.find('GALAXY_SLOTS="$SLURM_JOB_NUM_NODES"\\n') > 0
    True
    >>> script = job_script(working_directory='wd', command='uptime', exit_code_path='ec', memory_statement='GALAXY_MEMORY_MB="32768"')
    >>> script.find('GALAXY_MEMORY_MB="32768"\\n') > 0
    True
    """
    missing_params = [param for param in REQUIRED_TEMPLATE_PARAMS if param not in kwds]
    if missing_params:
        # Name the offending parameter(s) so the failure is actionable.
        raise Exception(
            f"Failed to create job_script, a required parameter is missing: {', '.join(missing_params)}."
        )

    # Always take the instrumenter out of kwds so it never ends up among the
    # template parameters, even when it was explicitly passed as None.
    job_instrumenter = kwds.pop("job_instrumenter", None)
    metadata_directory = kwds.get("metadata_directory", kwds["working_directory"])
    if job_instrumenter:
        kwds["instrument_pre_commands"] = job_instrumenter.pre_execute_commands(metadata_directory) or ""
        kwds["instrument_post_commands"] = job_instrumenter.post_execute_commands(metadata_directory) or ""

    if "memory_statement" not in kwds:
        kwds["memory_statement"] = MEMORY_STATEMENT_DEFAULT_TEMPLATE.safe_substitute(
            metadata_directory=metadata_directory
        )

    # Setup home directory var
    kwds.setdefault("home_directory", os.path.join(kwds["working_directory"], "home"))

    # Caller-supplied values win over the module defaults.
    template_params = {**OPTIONAL_TEMPLATE_PARAMS, **kwds}
    template_params["env_setup_commands"] = "\n".join(template_params["env_setup_commands"])
    template_params = {key: unicodify(value) for key, value in template_params.items()}

    if not isinstance(template, Template):
        template = Template(template)
    return template.safe_substitute(template_params)
class DescribesScriptIntegrityChecks(Protocol):
    """Structural type for objects that carry script integrity check settings.

    ``write_script`` accepts any object exposing these three attributes as its
    ``job_io`` argument.
    """

    # Whether write_script() verifies the script after writing it.
    check_job_script_integrity: bool
    # Number of verification attempts; declared Optional, so it may be None.
    check_job_script_integrity_count: Optional[int]
    # Seconds to sleep after an unsuccessful attempt; declared Optional, so it
    # may be None.
    check_job_script_integrity_sleep: Optional[float]
@dataclass
class ScriptIntegrityChecks:
    """Minimal class implementing the DescribesScriptIntegrityChecks protocol.

    Only ``check_job_script_integrity`` has to be given; the attempt count and
    the sleep interval default to ``None``.
    """

    # Whether the written script should be verified.
    check_job_script_integrity: bool
    # Number of verification attempts, or None when not configured.
    check_job_script_integrity_count: Optional[int] = None
    # Seconds to sleep after an unsuccessful attempt, or None when not configured.
    check_job_script_integrity_sleep: Optional[float] = None
def write_script(path: str, contents, job_io: DescribesScriptIntegrityChecks, mode: int = RWXR_XR_X) -> None:
    """
    Write ``contents`` to ``path``, set its permissions and optionally verify it.

    The parent directory of ``path`` is created when it does not exist yet.
    The file is written with ``fork_safe_write`` and its mode is then set with
    ``os.chmod``. If ``job_io.check_job_script_integrity`` is true, the script
    is probed with ``_handle_script_integrity``.

    :param path: destination of the script.
    :param contents: script text, handed to ``fork_safe_write`` unchanged.
    :param job_io: integrity check settings. A ``None`` attempt count or sleep
        interval falls back to ``DEFAULT_INTEGRITY_COUNT`` /
        ``DEFAULT_INTEGRITY_SLEEP``.
    :param mode: permission bits applied to the written file.
    :raises Exception: if the integrity check is enabled and never succeeds.
    """
    directory = os.path.dirname(path)
    # dirname() is "" for a bare file name: nothing has to be created then, and
    # os.makedirs("") would raise.
    if directory:
        # exist_ok avoids a failure when something else creates the directory
        # between an existence check and the creation.
        os.makedirs(directory, exist_ok=True)

    fork_safe_write(path, contents)
    os.chmod(path, mode)

    if job_io.check_job_script_integrity:
        # The protocol allows None for both settings; use the module defaults
        # rather than asserting (asserts disappear under ``python -O``).
        attempts = job_io.check_job_script_integrity_count
        if attempts is None:
            attempts = DEFAULT_INTEGRITY_COUNT
        pause = job_io.check_job_script_integrity_sleep
        if pause is None:
            pause = DEFAULT_INTEGRITY_SLEEP
        _handle_script_integrity(path, attempts, pause)
def _handle_script_integrity(
    path: str, check_job_script_integrity_count: int, check_job_script_integrity_sleep: float
) -> None:
    """
    Probe the script at ``path`` until it proves runnable or attempts run out.

    The script is executed with an environment containing only
    ``ABC_TEST_JOB_SCRIPT_INTEGRITY_XYZ``; a return code of 42 counts as
    success. After any other return code the filesystem is synced with
    ``INTEGRITY_SYNC_COMMAND``. Every unsuccessful attempt is followed by a
    sleep of ``check_job_script_integrity_sleep`` seconds.

    :param path: script to execute.
    :param check_job_script_integrity_count: maximum number of attempts.
    :param check_job_script_integrity_sleep: seconds to sleep after a failed attempt.
    :raises Exception: if no attempt returned 42.
    """
    probe_env = {"ABC_TEST_JOB_SCRIPT_INTEGRITY_XYZ": "1"}
    for _attempt in range(check_job_script_integrity_count):
        try:
            exit_status = subprocess.call([path], env=probe_env)
            if exit_status == 42:
                # The injected integrity block answered: the script is usable.
                return

            log.debug("Script integrity error for file '%s': returncode was %d", path, exit_status)

            # Unexpected return code: flush the filesystem and try again later.
            try:
                # Syncing works around "Text file busy" problems, which have
                # occurred both in Docker containers and on EC2 clusters under
                # high load.
                subprocess.check_call(INTEGRITY_SYNC_COMMAND)
            except Exception as sync_error:
                log.debug("Error syncing the filesystem: %s", unicodify(sync_error))
        except Exception as launch_error:
            log.debug("Script not available yet: %s", unicodify(launch_error))

        time.sleep(check_job_script_integrity_sleep)

    # Reached only when every attempt failed, or no attempt was allowed.
    raise Exception(f"Failed to write job script '{path}', could not verify job script integrity.")
# Public names of this module, i.e. what a star-import exposes.
__all__ = (
    "job_script",
    "write_script",
    "INTEGRITY_INJECTION",
)