Warning

This document is for an in-development version of Galaxy. You can alternatively view this page in the latest release if it exists or view the top of the latest release's documentation.

Source code for galaxy.jobs.runners.util.cli.shell.local

import os
from logging import getLogger
from subprocess import (
    PIPE,
    Popen,
)
from tempfile import TemporaryFile
from time import sleep

from galaxy.util.bunch import Bunch
from . import BaseShellExec
from ....util.process_groups import (
    check_pg,
    kill_pg,
)

# Module-level logger named after this module.
log = getLogger(__name__)

# Values LocalShell.execute() places in the result's ``stderr`` and
# ``returncode`` when it stops waiting for a command and kills it.
TIMEOUT_ERROR_MESSAGE = "Execution timed out"
TIMEOUT_RETURN_CODE = -1
# Defaults for execute()'s ``timeout`` and ``timeout_check_interval``
# arguments. The interval is passed to time.sleep() (seconds); the timeout is
# divided by the interval to get the number of liveness checks to perform.
DEFAULT_TIMEOUT = 60
DEFAULT_TIMEOUT_CHECK_INTERVAL = 3


class LocalShell(BaseShellExec):
    """Execute commands on the local machine and capture their output.

    >>> shell = LocalShell()
    >>> def exec_python(script, **kwds): return shell.execute(['python', '-c', script], **kwds)
    >>> exec_result = exec_python("from __future__ import print_function; print('Hello World')")
    >>> exec_result.stderr == u''
    True
    >>> exec_result.stdout.strip() == u'Hello World'
    True
    >>> exec_result.returncode
    0
    >>> exec_result = exec_python("import time; time.sleep(10)", timeout=1, timeout_check_interval=.1)
    >>> exec_result.stdout == u''
    True
    >>> exec_result.stderr == 'Execution timed out'
    True
    >>> exec_result.returncode == TIMEOUT_RETURN_CODE
    True
    >>> shell.execute('echo hi').stdout == "hi\\n"
    True
    """

    def __init__(self, **kwds):
        """Accept and ignore any keyword arguments; no state is stored."""
        pass

    def execute(
        self, cmd, persist=False, timeout=DEFAULT_TIMEOUT, timeout_check_interval=DEFAULT_TIMEOUT_CHECK_INTERVAL, **kwds
    ):
        """Run ``cmd`` locally and return its captured output.

        :param cmd: command to run. A ``str`` is run through the shell
            (``shell=True``); any other value (e.g. a list of arguments) is
            passed to ``Popen`` as-is without a shell.
        :param persist: accepted for interface compatibility; not used here.
        :param timeout: used together with ``timeout_check_interval`` to
            compute how many liveness checks are made before giving up.
        :param timeout_check_interval: seconds slept between liveness checks.
            Must be non-zero, since ``timeout`` is divided by it.
        :param kwds: accepted for interface compatibility; not used here.
        :returns: a ``Bunch`` with ``stdout``, ``stderr`` and ``returncode``.
            If the command is still running after the last check, its process
            group is killed and the result is ``stdout=""``,
            ``stderr=TIMEOUT_ERROR_MESSAGE``,
            ``returncode=TIMEOUT_RETURN_CODE``.
        """
        is_cmd_string = isinstance(cmd, str)
        # Both streams go to temporary files rather than pipes: nothing reads
        # from the child while we wait, so a pipe would fill up and block a
        # chatty child until it was wrongly reported as timed out. The ``with``
        # block also guarantees the files are closed on every return path.
        with TemporaryFile() as outf, TemporaryFile() as errf:
            # os.setpgrp makes the child the leader of a new process group, so
            # p.pid is also the id of the group handed to check_pg / kill_pg.
            p = Popen(cmd, stdin=None, stdout=outf, stderr=errf, shell=is_cmd_string, preexec_fn=os.setpgrp)
            # Always check at least once: when timeout < timeout_check_interval
            # the quotient truncates to 0, which would otherwise skip the loop
            # and kill the command immediately.
            check_count = max(1, int(timeout / timeout_check_interval))
            # check process group until timeout
            for _ in range(check_count):
                sleep(0.1)  # For fast returning commands
                if not check_pg(p.pid):
                    break
                sleep(timeout_check_interval)
            else:
                # Every check found the group still alive: give up on it.
                kill_pg(p.pid)
                return Bunch(stdout="", stderr=TIMEOUT_ERROR_MESSAGE, returncode=TIMEOUT_RETURN_CODE)
            # The child wrote through the same open files, so rewind before
            # reading back what it produced.
            outf.seek(0)
            errf.seek(0)
            # Need to poll once to establish return code
            p.poll()
            return Bunch(stdout=_read_str(outf), stderr=_read_str(errf), returncode=p.returncode)
def _read_str(stream):
    """Read ``stream`` to its end and return the contents as text.

    ``bytes`` contents are decoded as UTF-8; anything else is returned
    exactly as ``stream.read()`` produced it.
    """
    data = stream.read()
    if not isinstance(data, bytes):
        return data
    return data.decode("UTF-8")


__all__ = ("LocalShell",)