Source code for

Top-level Galaxy job manager, moves jobs to handler(s)

import logging
from functools import partial

from galaxy.exceptions import (
from import (
from galaxy.structured_app import MinimalManagerApp
from galaxy.web_stack.message import JobHandlerMessage

log = logging.getLogger(__name__)

[docs]class JobManager: """ Highest level interface to job management. """ job_handler: handler.JobHandlerI
[docs] def __init__(self, app: MinimalManagerApp): = app self.job_lock = False self.job_handler = NoopHandler()
def _check_jobs_at_startup(self): if not self.__check_jobs_at_startup()
[docs] def start(self): if log.debug("Initializing job handler") self.job_handler = handler.JobHandler( self.job_handler.start()
def _queue_callback(self, job, tool_id): self.job_handler.job_queue.put(, tool_id) def _message_callback(self, job): return JobHandlerMessage(task="setup",
[docs] def enqueue(self, job, tool=None, flush=True): """Queue a job for execution. Due to the nature of some handler assignment methods which are wholly DB-based, the enqueue method will flush the job. Callers who create the job typically should not flush the job before handing it off to ``enqueue()``. If a job handler cannot be assigned, py:class:`ToolExecutionError` is raised. :param job: Job to enqueue. :type job: Instance of :class:`galaxy.model.Job`. :param tool: Tool that the job will execute. :type tool: Instance of :class:``. :raises ToolExecutionError: if a handler was unable to be assigned. :returns: str or None -- Handler ID, tag, or pool assigned to the job. """ tool_id = None configured_handler = None if tool: tool_id = configured_handler = tool.get_configured_job_handler(job.params) if configured_handler is not None: p = f" (with job params: {str(job.params)})" if job.params else "" log.debug( "(%s) Configured job handler for tool '%s'%s is: %s", job.log_str(), tool_id, p, configured_handler ) queue_callback = partial(self._queue_callback, job, tool_id) message_callback = partial(self._message_callback, job) try: return job, configured=configured_handler, queue_callback=queue_callback, message_callback=message_callback, flush=flush, ) except HandlerAssignmentError as exc: raise ToolExecutionError(exc.args[0], job=exc.obj)
[docs] def stop(self, job, message=None): """Stop a job that is currently executing. This can be safely called on jobs that have already terminated. :param job: Job to stop. :type job: Instance of :class:`galaxy.model.Job`. :param message: Message (if any) to be set on the job and output dataset(s) to explain the reason for stopping. :type message: str """ self.job_handler.job_stop_queue.put(, error_msg=message)
[docs] def shutdown(self): self.job_handler.shutdown()
class NoopManager:
    """
    Implements the JobManager interface but does nothing.

    Every method accepts arbitrary positional and keyword arguments, performs
    no work and returns ``None``, so an instance can stand in wherever a
    ``JobManager`` is expected but no job handling should take place.
    """

    def __init__(self, *args, **kwargs):
        """Ignore all arguments and install a ``NoopHandler`` as ``job_handler``."""
        self.job_handler = NoopHandler()

    def start(self, *args, **kwargs):
        """Do nothing; mirrors ``JobManager.start`` so callers need no special case."""

    def enqueue(self, *args, **kwargs):
        """Do nothing with the request and return ``None``."""

    def stop(self, *args, **kwargs):
        """Do nothing with the request and return ``None``."""

    def shutdown(self, *args, **kwargs):
        """Do nothing; mirrors ``JobManager.shutdown`` so callers need no special case."""
class NoopHandler(handler.JobHandlerI):
    """
    Implements the JobHandler interface but does nothing.
    """

    def __init__(self, *args, **kwargs):
        """Ignore all arguments and create the two queue attributes.

        ``job_queue`` and ``job_stop_queue`` are the attributes ``JobManager``
        calls ``put()`` on; here both are ``NoopQueue`` instances.
        """
        self.job_queue = NoopQueue()
        self.job_stop_queue = NoopQueue()

    def start(self):
        """Do nothing."""

    def shutdown(self, *args):
        """Do nothing; any positional arguments are ignored."""