Warning
This document is for an old release of Galaxy. You can alternatively view this page in the latest release if it exists or view the top of the latest release's documentation.
Source code for galaxy.util.lazy_process
import subprocess
import threading
import time
[docs]class LazyProcess:
"""Abstraction describing a command line launching a service - probably
as needed as functionality is accessed in Galaxy.
"""
[docs] def __init__(self, command_and_args):
self.command_and_args = command_and_args
self.thread_lock = threading.Lock()
self.allow_process_request = True
self.process = None
[docs] def start_process(self):
with self.thread_lock:
if self.allow_process_request:
self.allow_process_request = False
t = threading.Thread(target=self.__start)
t.daemon = True
t.start()
def __start(self):
with self.thread_lock:
self.process = subprocess.Popen(self.command_and_args, close_fds=True)
[docs] def shutdown(self):
with self.thread_lock:
self.allow_process_request = False
if self.running:
assert self.process # tell type checker it can not be None if self.running
self.process.terminate()
time.sleep(0.01)
if self.running:
self.process.kill()
@property
def running(self):
return self.process and not self.process.poll()
[docs]class NoOpLazyProcess:
"""LazyProcess abstraction meant to describe potentially optional
services, in those cases where one is not configured or valid, this
class can be used in place of LazyProcess.
"""
@property
def running(self):
return False