Warning

This document is for an old release of Galaxy. You can alternatively view this page in the latest release if it exists or view the top of the latest release's documentation.

Source code for galaxy.jobs.manager

"""
Top-level Galaxy job manager, moves jobs to handler(s)
"""
import logging
from functools import partial

from galaxy.exceptions import (
    HandlerAssignmentError,
    ToolExecutionError,
)
from galaxy.jobs import (
    handler,
    NoopQueue,
)
from galaxy.structured_app import MinimalManagerApp
from galaxy.web_stack.message import JobHandlerMessage

log = logging.getLogger(__name__)


[docs]class JobManager: """ Highest level interface to job management. """ job_handler: handler.JobHandlerI
[docs] def __init__(self, app: MinimalManagerApp): self.app = app self.job_lock = False self.job_handler = NoopHandler()
def _check_jobs_at_startup(self): if not self.app.is_job_handler: self.__check_jobs_at_startup()
[docs] def start(self): if self.app.is_job_handler: log.debug("Initializing job handler") self.job_handler = handler.JobHandler(self.app) self.job_handler.start()
def _queue_callback(self, job, tool_id): self.job_handler.job_queue.put(job.id, tool_id) def _message_callback(self, job): return JobHandlerMessage(task="setup", job_id=job.id)
[docs] def enqueue(self, job, tool=None, flush=True): """Queue a job for execution. Due to the nature of some handler assignment methods which are wholly DB-based, the enqueue method will flush the job. Callers who create the job typically should not flush the job before handing it off to ``enqueue()``. If a job handler cannot be assigned, py:class:`ToolExecutionError` is raised. :param job: Job to enqueue. :type job: Instance of :class:`galaxy.model.Job`. :param tool: Tool that the job will execute. :type tool: Instance of :class:`galaxy.tools.Tool`. :raises ToolExecutionError: if a handler was unable to be assigned. :returns: str or None -- Handler ID, tag, or pool assigned to the job. """ tool_id = None configured_handler = None if tool: tool_id = tool.id configured_handler = tool.get_configured_job_handler(job.params) if configured_handler is not None: p = f" (with job params: {str(job.params)})" if job.params else "" log.debug( "(%s) Configured job handler for tool '%s'%s is: %s", job.log_str(), tool_id, p, configured_handler ) queue_callback = partial(self._queue_callback, job, tool_id) message_callback = partial(self._message_callback, job) try: return self.app.job_config.assign_handler( job, configured=configured_handler, queue_callback=queue_callback, message_callback=message_callback, flush=flush, ) except HandlerAssignmentError as exc: raise ToolExecutionError(exc.args[0], job=exc.obj)
[docs] def stop(self, job, message=None): """Stop a job that is currently executing. This can be safely called on jobs that have already terminated. :param job: Job to stop. :type job: Instance of :class:`galaxy.model.Job`. :param message: Message (if any) to be set on the job and output dataset(s) to explain the reason for stopping. :type message: str """ self.job_handler.job_stop_queue.put(job.id, error_msg=message)
[docs] def shutdown(self): self.job_handler.shutdown()
[docs]class NoopManager: """ Implements the JobManager interface but does nothing """
[docs] def __init__(self, *args, **kwargs): self.job_handler = NoopHandler()
[docs] def enqueue(self, *args, **kwargs): pass
[docs] def stop(self, *args, **kwargs): pass
[docs]class NoopHandler(handler.JobHandlerI): """ Implements the JobHandler interface but does nothing """
[docs] def __init__(self, *args, **kwargs): self.job_queue = NoopQueue() self.job_stop_queue = NoopQueue()
[docs] def start(self): pass
[docs] def shutdown(self, *args): pass