# Source code for galaxy.jobs.manager

"""
Top-level Galaxy job manager, moves jobs to handler(s)
"""

import logging
from functools import partial

from galaxy.exceptions import (
    HandlerAssignmentError,
    ToolExecutionError,
)
from galaxy.jobs import (
    handler,
    NoopQueue,
)
from galaxy.structured_app import MinimalManagerApp
from galaxy.web_stack.message import JobHandlerMessage

log = logging.getLogger(__name__)


class JobManager:
    """
    Highest level interface to job management.

    A new manager holds a :class:`NoopHandler`; :meth:`start` replaces it with
    a real ``handler.JobHandler`` when ``app.is_job_handler`` is true.
    """

    job_handler: handler.JobHandlerI

    def __init__(self, app: MinimalManagerApp):
        """Store ``app`` and install a no-op job handler.

        :param app: Application object; ``is_job_handler`` and ``job_config``
            are read from it by the other methods of this class.
        """
        self.app = app
        self.job_lock = False
        self.job_handler = NoopHandler()

    def _check_jobs_at_startup(self):
        if not self.app.is_job_handler:
            # NOTE(review): inside a class body a double-underscore attribute
            # is name-mangled, so this resolves to
            # ``self._JobManager__check_jobs_at_startup``. No such method is
            # defined on this class in the visible source, so reaching this
            # line would raise AttributeError. Left unchanged because the
            # intended target cannot be determined from here -- confirm and
            # either fix the call or remove this method.
            self.__check_jobs_at_startup()

    def start(self):
        """Create and start the real job handler if this process is a job handler.

        When ``app.is_job_handler`` is false this is a no-op and the handler
        installed by ``__init__`` stays in place.
        """
        if self.app.is_job_handler:
            log.debug("Initializing job handler")
            self.job_handler = handler.JobHandler(self.app)
            self.job_handler.start()

    def _queue_callback(self, job, tool_id):
        """Put the job's id and tool id on the current handler's job queue."""
        self.job_handler.job_queue.put(job.id, tool_id)

    def _message_callback(self, job):
        """Build the ``setup`` handler message for ``job``."""
        return JobHandlerMessage(task="setup", job_id=job.id)

    def enqueue(self, job, tool=None, flush=True):
        """Queue a job for execution.

        Due to the nature of some handler assignment methods which are wholly DB-based, the enqueue method will flush
        the job. Callers who create the job typically should not flush the job before handing it off to ``enqueue()``.
        If a job handler cannot be assigned, :py:class:`ToolExecutionError` is raised.

        :param job:     Job to enqueue.
        :type job:      Instance of :class:`galaxy.model.Job`.
        :param tool:    Tool that the job will execute.
        :type tool:     Instance of :class:`galaxy.tools.Tool`.
        :param flush:   Passed through unchanged to ``app.job_config.assign_handler``.
        :type flush:    bool

        :raises ToolExecutionError: if a handler was unable to be assigned.
        :returns: str or None -- Handler ID, tag, or pool assigned to the job.
        """
        tool_id = None
        configured_handler = None
        if tool:
            tool_id = tool.id
            configured_handler = tool.get_configured_job_handler(job.params)
            if configured_handler is not None:
                p = f" (with job params: {job.params!s})" if job.params else ""
                log.debug(
                    "(%s) Configured job handler for tool '%s'%s is: %s", job.log_str(), tool_id, p, configured_handler
                )
        # Bind the job now so the assignment code can invoke the callbacks
        # without knowing about this manager's handler.
        queue_callback = partial(self._queue_callback, job, tool_id)
        message_callback = partial(self._message_callback, job)
        try:
            return self.app.job_config.assign_handler(
                job,
                configured=configured_handler,
                queue_callback=queue_callback,
                message_callback=message_callback,
                flush=flush,
            )
        except HandlerAssignmentError as exc:
            # Chain explicitly so the traceback shows the assignment failure
            # as the direct cause of the tool execution error.
            raise ToolExecutionError(exc.args[0], job=exc.obj) from exc

    def stop(self, job, message=None):
        """Stop a job that is currently executing.

        This can be safely called on jobs that have already terminated.

        :param job:     Job to stop.
        :type job:      Instance of :class:`galaxy.model.Job`.
        :param message: Message (if any) to be set on the job and output dataset(s) to explain the reason for stopping.
        :type message:  str
        """
        self.job_handler.job_stop_queue.put(job.id, error_msg=message)

    def shutdown(self):
        """Delegate shutdown to the current job handler."""
        self.job_handler.shutdown()
class NoopManager:
    """
    Implements the JobManager interface but does nothing
    """

    def __init__(self, *args, **kwargs):
        """Accept and ignore any arguments; install a no-op job handler."""
        self.job_handler = NoopHandler()

    def start(self, *args, **kwargs):
        """Do nothing; present so callers can treat this like ``JobManager``."""

    def enqueue(self, *args, **kwargs):
        """Do nothing and return ``None``; no job is queued."""

    def stop(self, *args, **kwargs):
        """Do nothing and return ``None``; no job is stopped."""

    def shutdown(self, *args, **kwargs):
        """Do nothing; present so callers can treat this like ``JobManager``."""
class NoopHandler(handler.JobHandlerI):
    """
    Implements the JobHandler interface but does nothing
    """

    def __init__(self, *args, **kwargs):
        """Ignore all arguments and give each queue attribute its own ``NoopQueue``."""
        self.job_queue = NoopQueue()
        self.job_stop_queue = NoopQueue()

    def start(self):
        """Nothing to start."""
        return None

    def shutdown(self, *args):
        """Nothing to shut down; positional arguments are ignored."""
        return None