Warning
This document is for an old release of Galaxy. You can alternatively view this page in the latest release if it exists or view the top of the latest release's documentation.
Source code for galaxy.jobs.manager
"""
Top-level Galaxy job manager, moves jobs to handler(s)
"""
import logging
from functools import partial
from sqlalchemy.sql.expression import null
from galaxy.exceptions import HandlerAssignmentError, ToolExecutionError
from galaxy.jobs import handler, NoopQueue
from galaxy.model import Job
from galaxy.structured_app import MinimalManagerApp
from galaxy.web_stack.message import JobHandlerMessage
log = logging.getLogger(__name__)
[docs]class JobManager:
"""
Highest level interface to job management.
"""
job_handler: handler.JobHandlerI
[docs] def __init__(self, app: MinimalManagerApp):
self.app = app
self.job_lock = False
self.job_handler = NoopHandler()
def _check_jobs_at_startup(self):
if not self.app.is_job_handler:
self.__check_jobs_at_startup()
def __check_jobs_at_startup(self):
if self.app.job_config.use_messaging:
jobs_at_startup = self.app.model.context.query(Job).enable_eagerloads(False) \
.filter((Job.state == Job.states.NEW) & (Job.handler == null())).all()
if jobs_at_startup:
log.info(
'No handler assigned at startup for the following jobs, will dispatch via message: %s',
', '.join(str(j.id) for j in jobs_at_startup))
for job in jobs_at_startup:
tool = self.app.toolbox.get_tool(job.tool_id, job.tool_version, exact=True)
self.enqueue(job, tool)
[docs] def start(self):
if self.app.is_job_handler:
log.debug("Initializing job handler")
self.job_handler = handler.JobHandler(self.app)
self.job_handler.start()
def _queue_callback(self, job, tool_id):
self.job_handler.job_queue.put(job.id, tool_id)
def _message_callback(self, job):
return JobHandlerMessage(task='setup', job_id=job.id)
[docs] def enqueue(self, job, tool=None, flush=True):
"""Queue a job for execution.
Due to the nature of some handler assignment methods which are wholly DB-based, the enqueue method will flush
the job. Callers who create the job typically should not flush the job before handing it off to ``enqueue()``.
If a job handler cannot be assigned, py:class:`ToolExecutionError` is raised.
:param job: Job to enqueue.
:type job: Instance of :class:`galaxy.model.Job`.
:param tool: Tool that the job will execute.
:type tool: Instance of :class:`galaxy.tools.Tool`.
:raises ToolExecutionError: if a handler was unable to be assigned.
:returns: str or None -- Handler ID, tag, or pool assigned to the job.
"""
tool_id = None
configured_handler = None
if tool:
tool_id = tool.id
configured_handler = tool.get_configured_job_handler(job.params)
if configured_handler is not None:
p = " (with job params: %s)" % str(job.params) if job.params else ""
log.debug("(%s) Configured job handler for tool '%s'%s is: %s", job.log_str(), tool_id, p, configured_handler)
queue_callback = partial(self._queue_callback, job, tool_id)
message_callback = partial(self._message_callback, job)
try:
return self.app.job_config.assign_handler(
job, configured=configured_handler, queue_callback=queue_callback, message_callback=message_callback, flush=flush)
except HandlerAssignmentError as exc:
raise ToolExecutionError(exc.args[0], job=exc.obj)
[docs] def stop(self, job, message=None):
"""Stop a job that is currently executing.
This can be safely called on jobs that have already terminated.
:param job: Job to stop.
:type job: Instance of :class:`galaxy.model.Job`.
:param message: Message (if any) to be set on the job and output dataset(s) to explain the reason for stopping.
:type message: str
"""
self.job_handler.job_stop_queue.put(job.id, error_msg=message)