Warning

This document is for an old release of Galaxy. Alternatively, you can view this page in the latest release, if it exists there, or go to the top of the latest release's documentation.

Source code for galaxy.jobs.deferred

"""
Queue for running deferred code via plugins.
"""
import logging
import os
import threading
from Queue import Queue

from galaxy import model
from galaxy.util.bunch import Bunch
from galaxy.util.sleeper import Sleeper

log = logging.getLogger(__name__)


class DeferredJobQueue(object):
    """Run deferred jobs by polling the database and delegating to plugins.

    Plugins are classes exported through ``__all__`` by the non-underscore
    ``.py`` modules that live next to this file.  Each plugin must provide
    ``check_job(job)``, which returns one of ``job_states``, and
    ``run_job(job)``.  A monitor thread started by the constructor polls for
    new jobs roughly once per second until ``shutdown()`` is called.
    """
    # States a plugin's check_job() may return; also attached to every
    # plugin instance as ``plugin.job_states``.
    job_states = Bunch(READY='ready', WAIT='wait', INVALID='invalid')

    def __init__(self, app):
        """Load plugins, recover waiting jobs and start the monitor thread.

        :param app: application object; ``app.model.context.current`` is used
                    as the database session and ``app`` is passed to every
                    plugin constructor.
        """
        self.app = app
        self.sa_session = app.model.context.current
        self.queue = Queue()
        self.plugins = {}
        self._load_plugins()
        self.sleeper = Sleeper()
        self.running = True
        # Holds DeferredJob instances, or bare job IDs for jobs recovered at
        # startup (those are loaded lazily by the monitor thread).
        self.waiting_jobs = []
        self.__check_jobs_at_startup()
        self.monitor_thread = threading.Thread(target=self.__monitor)
        self.monitor_thread.start()
        log.info('Deferred job queue started')

    def _load_plugins(self):
        """Import every plugin module in this package and instantiate its plugins.

        Modules that fail to import, modules without ``__all__`` and classes
        missing a required method are logged and skipped.
        """
        for fname in os.listdir(os.path.dirname(__file__)):
            if fname.startswith('_') or not fname.endswith('.py'):
                continue
            module_name = 'galaxy.jobs.deferred.' + fname[:-3]
            try:
                module = __import__(module_name)
            except Exception:
                log.exception('Deferred job plugin appears to exist but is not loadable: %s', module_name)
                continue
            # __import__ returns the top-level package; walk down to the leaf.
            for comp in module_name.split(".")[1:]:
                module = getattr(module, comp)
            if not hasattr(module, '__all__'):
                log.error('Plugin "%s" does not contain a list of exported classes in __all__', module_name)
                continue
            for obj in module.__all__:
                display_name = ':'.join((module_name, obj))
                plugin = getattr(module, obj)
                for method_name in ('check_job', 'run_job'):
                    if not hasattr(plugin, method_name):
                        log.error('Plugin "%s" does not contain required method "%s()"', display_name, method_name)
                        break
                else:
                    # Only reached when no required method was missing.
                    self.plugins[obj] = plugin(self.app)
                    self.plugins[obj].job_states = self.job_states
                    log.debug('Loaded deferred job plugin: %s', display_name)

    def __apply_check_interval(self, job):
        """Copy the plugin's ``check_interval`` onto ``job`` if the plugin defines one."""
        plugin = self.plugins[job.plugin]
        if hasattr(plugin, 'check_interval'):
            job.check_interval = plugin.check_interval

    def __check_jobs_at_startup(self):
        """Queue the IDs of jobs that were already WAITING when the process started."""
        waiting_jobs = self.sa_session.query(model.DeferredJob) \
            .filter(model.DeferredJob.state == model.DeferredJob.states.WAITING).all()
        for job in waiting_jobs:
            if not self.__check_job_plugin(job):
                continue
            self.__apply_check_interval(job)
            log.info('Recovered deferred job (id: %s) at startup', job.id)
            # Pass the job ID as opposed to the job, since the monitor thread
            # needs to load it in its own threadlocal scoped session.
            self.waiting_jobs.append(job.id)

    def __monitor(self):
        """Monitor thread body: run a step, sleep a second, repeat until shutdown."""
        while self.running:
            try:
                self.__monitor_step()
            except Exception:
                # Keep the thread alive; a failed step is retried next pass.
                log.exception('Exception in monitor_step')
            self.sleeper.sleep(1)
        log.info('job queue stopped')

    def __monitor_step(self):
        """Promote NEW jobs to WAITING, then check (and possibly run) waiting jobs."""
        # TODO: Querying the database with this frequency is bad, we need message passing
        new_jobs = self.sa_session.query(model.DeferredJob) \
            .filter(model.DeferredJob.state == model.DeferredJob.states.NEW).all()
        for job in new_jobs:
            if not self.__check_job_plugin(job):
                continue
            job.state = model.DeferredJob.states.WAITING
            self.sa_session.add(job)
            self.sa_session.flush()
            self.__apply_check_interval(job)
            self.waiting_jobs.append(job)
        new_waiting = []
        for job in self.waiting_jobs:
            if not isinstance(job, model.DeferredJob):
                # Recovered jobs are passed in by ID
                job_id = job
                job = self.sa_session.query(model.DeferredJob).get(job_id)
                if job is None:
                    # The row vanished since startup; drop it instead of
                    # raising on every monitor pass.
                    log.error('Recovered deferred job (id: %s) no longer exists, dropping it', job_id)
                    continue
                # NOTE(review): the interval assigned at startup was set on the
                # instance loaded by the starting thread; re-apply it to the
                # instance loaded here so recovered jobs honour it too.
                self.__apply_check_interval(job)
            if not job.is_check_time:
                new_waiting.append(job)
                continue
            try:
                job_state = self.plugins[job.plugin].check_job(job)
            except Exception:
                self.__fail_job(job)
                log.exception('Set deferred job %s to error because of an exception in check_job()', job.id)
                continue
            if job_state == self.job_states.READY:
                try:
                    self.plugins[job.plugin].run_job(job)
                except Exception:
                    self.__fail_job(job)
                    log.exception('Set deferred job %s to error because of an exception in run_job()', job.id)
                    continue
            elif job_state == self.job_states.INVALID:
                self.__fail_job(job)
                log.error('Unable to run deferred job (id: %s): Plugin "%s" marked it as invalid', job.id, job.plugin)
                continue
            else:
                new_waiting.append(job)
            job.last_check = 'now'
        self.waiting_jobs = new_waiting

    def __check_job_plugin(self, job):
        """Return True if the job's plugin is loaded; otherwise mark the job ERROR.

        :returns: ``False`` (after flushing the ERROR state) when
                  ``job.plugin`` is not a loaded plugin, ``True`` otherwise.
        """
        if job.plugin not in self.plugins:
            # The argument must be passed to log.error(); applying ``%`` to its
            # return value (None) raises TypeError.
            log.error('Invalid deferred job plugin: %s', job.plugin)
            job.state = model.DeferredJob.states.ERROR
            self.sa_session.add(job)
            self.sa_session.flush()
            return False
        return True

    def __check_if_ready_to_run(self, job):
        """Return the state reported by the job's plugin ``check_job()``."""
        return self.plugins[job.plugin].check_job(job)

    def __fail_job(self, job):
        """Set the job's state to ERROR and flush it to the database."""
        job.state = model.DeferredJob.states.ERROR
        self.sa_session.add(job)
        self.sa_session.flush()

    def shutdown(self):
        """Ask the monitor thread to stop and wake it from its sleep."""
        self.running = False
        self.sleeper.wake()
class FakeTrans(object):
    """A fake trans for calling the external set metadata tool.

    Provides the handful of attributes and methods read from a transaction
    object (``app``, ``sa_session``, ``model``, ``history``, ``user``) without
    a real web request behind it.
    """

    def __init__(self, app, history=None, user=None):
        """Build the fake transaction.

        :param app: application object providing ``app.model`` and
                    ``app.model.context.current``.
        :param history: history to expose; a placeholder whose ``id`` is
                        ``None`` is used when omitted or falsy.
        :param user: user to expose; a placeholder whose ``id`` is ``None``
                     is used when omitted or falsy.
        """
        class Dummy(object):
            """Placeholder object exposing only ``id = None``."""

            def __init__(self):
                self.id = None

        self.app = app
        self.sa_session = app.model.context.current
        self.dummy = Dummy()
        self.history = history if history else Dummy()
        self.user = user if user else Dummy()
        self.model = app.model

    def get_galaxy_session(self):
        """Return the placeholder object standing in for a Galaxy session."""
        return self.dummy

    def log_event(self, message, tool_id=None):
        """Accept and discard an event; nothing is recorded."""
        pass

    def get_current_user_roles(self):
        """Return the user's roles, or ``[]`` when there is no real user.

        The placeholder user created by ``__init__`` is truthy but has no
        ``all_roles()`` method, so it is treated like a missing user here
        instead of raising ``AttributeError``.
        """
        all_roles = getattr(self.user, 'all_roles', None)
        if not self.user or all_roles is None:
            return []
        return all_roles()

    def db_dataset_for(self, dbkey):
        """Return the first non-deleted ``len`` dataset in the history matching ``dbkey``.

        :param dbkey: value compared against each candidate's ``dbkey``.
        :returns: the matching dataset association, or ``None`` if the
                  history is ``None`` or nothing matches.
        """
        if self.history is None:
            return None
        datasets = self.sa_session.query(self.app.model.HistoryDatasetAssociation) \
            .filter_by(deleted=False, history_id=self.history.id, extension="len")
        for ds in datasets:
            if dbkey == ds.dbkey:
                return ds
        return None