This document is for an old release of Galaxy. Alternatively, you can view this page in the latest release, if it exists, or view the top of the latest release's documentation.

Source code for galaxy.util.lazy_process

import subprocess
import threading
import time

class LazyProcess:
    """Abstraction describing a command line launching a service.

    The command is not run when the object is created. The first call to
    :meth:`start_process` spawns it from a daemon thread; later calls are
    ignored. :meth:`shutdown` stops the child (if any) and disables further
    start requests.
    """

    def __init__(self, command_and_args):
        """Remember the command to launch without starting it.

        :param command_and_args: passed unchanged as the first argument of
            ``subprocess.Popen`` when the process is started.
        """
        self.command_and_args = command_and_args
        # Guards ``allow_process_request`` and the assignment of ``process``.
        self.thread_lock = threading.Lock()
        # True until the first start request or a shutdown; never reset.
        self.allow_process_request = True
        # ``subprocess.Popen`` handle, or None while nothing has been launched.
        self.process = None

    def start_process(self):
        """Request the service to be launched; only the first request counts.

        The actual ``Popen`` call happens on a daemon thread, so this method
        returns without waiting for the child to be spawned.
        """
        with self.thread_lock:
            if self.allow_process_request:
                self.allow_process_request = False
                t = threading.Thread(target=self.__start)
                t.daemon = True
                t.start()

    def __start(self):
        """Thread target: spawn the child and publish its handle."""
        # NOTE(review): a shutdown() that runs before this thread gets the
        # lock does not prevent the spawn below - confirm whether callers
        # can hit that ordering.
        with self.thread_lock:
            self.process = subprocess.Popen(self.command_and_args, close_fds=True)

    def shutdown(self):
        """Stop the child process and refuse any future start request.

        Sends ``terminate()``, waits briefly, then falls back to ``kill()``
        if the child is still alive. Does nothing to a child that was never
        started or that has already exited.
        """
        with self.thread_lock:
            self.allow_process_request = False
        # Read the handle once so the liveness checks and the signals all
        # target the same object.
        process = self.process
        if process is None or process.poll() is not None:
            return
        process.terminate()
        # Give the child a moment to exit on its own before forcing it.
        time.sleep(0.01)
        if process.poll() is None:
            process.kill()

    @property
    def running(self):
        """``True`` while a launched child process has not exited yet."""
        process = self.process
        # ``poll()`` returns None while the child is alive and its exit code
        # afterwards. Compare against None explicitly: an exit code of 0 is
        # falsy and must not be mistaken for "still running".
        return process is not None and process.poll() is None
class NoOpLazyProcess:
    """Do-nothing counterpart of LazyProcess for optional services.

    Use it in place of LazyProcess when the service is not configured or
    not valid: it exposes the same ``start_process`` / ``shutdown`` /
    ``running`` surface, but never launches anything.
    """

    def start_process(self):
        """Accept a start request and ignore it."""
        return None

    def shutdown(self):
        """Accept a shutdown request and ignore it."""
        return None

    @property
    def running(self):
        """Always ``False``: no process is ever launched."""
        return False