Warning
This document is for an old release of Galaxy. You can alternatively view this page in the latest release if it exists or view the top of the latest release's documentation.
Source code for galaxy.jobs.runners.chronos
import functools
import logging
from galaxy import model
from galaxy.jobs.runners import AsynchronousJobRunner, AsynchronousJobState
from galaxy.util import unicodify
CHRONOS_IMPORT_MSG = ('The Python \'chronos\' package is required to use '
'this feature, please install it or correct the '
'following error:\nImportError {msg!s}')
try:
import chronos
chronos_exceptions = (
chronos.ChronosAPIError,
chronos.UnauthorizedError,
chronos.MissingFieldError,
chronos.OneOfViolationError,
)
except ImportError as e:
chronos = None
CHRONOS_IMPORT_MSG.format(msg=unicodify(e))
__all__ = ('ChronosJobRunner',)
LOGGER = logging.getLogger(__name__)
class ChronosRunnerException(Exception):
pass
def handle_exception_call(func):
# Catch chronos exceptions. The latest version of chronos-python does
# support a hierarchy over the exceptions.
@functools.wraps(func)
def wrapper(*args, **kwargs):
try:
return func(*args, **kwargs)
except chronos_exceptions as e:
LOGGER.error(unicodify(e))
return wrapper
def to_dict(segments, v):
if len(segments) == 0:
return v
return {segments[0]: to_dict(segments[1:], v)}
def _write_logfile(logfile, msg):
with open(logfile, 'w') as fil:
fil.write(msg)
[docs]class ChronosJobRunner(AsynchronousJobRunner):
runner_name = 'ChronosRunner'
RUNNER_PARAM_SPEC_KEY = 'runner_param_specs'
JOB_NAME_PREFIX = 'galaxy-chronos-'
RUNNER_PARAM_SPEC = {
'chronos': {
'map': str,
},
'insecure': {
'map': lambda x: x in ['true', 'True', 'TRUE'],
'default': True,
},
'username': {
'map': str,
},
'password': {
'map': str,
},
'owner': {
'map': str
},
}
DESTINATION_PARAMS_SPEC = {
'docker_cpu': {
'default': 0.1,
'map_name': 'cpus',
'map': float,
},
'docker_memory': {
'default': 128,
'map_name': 'mem',
'map': int,
},
'docker_disk': {
'default': 256,
'map_name': 'disk',
'map': int,
},
'volumes': {
'default': None,
'map_name': 'container/volumes',
'map': (
lambda x: [{'containerPath': x, 'hostPath': x, 'mode': 'RW'}]
if x is not None else [])
},
'max_retries': {
'default': 2,
'map_name': 'retries',
'map': int,
},
}
[docs] def __init__(self, app, nworkers, **kwargs):
"""Initialize this job runner and start the monitor thread"""
assert chronos, CHRONOS_IMPORT_MSG
if self.RUNNER_PARAM_SPEC_KEY not in kwargs:
kwargs[self.RUNNER_PARAM_SPEC_KEY] = {}
kwargs[self.RUNNER_PARAM_SPEC_KEY].update(self.RUNNER_PARAM_SPEC)
super().__init__(app, nworkers, **kwargs)
protocol = 'http' if self.runner_params.get('insecure', True) else 'https'
self._chronos_client = chronos.connect(
self.runner_params['chronos'],
username=self.runner_params.get('username'),
password=self.runner_params.get('password'),
proto=protocol)
self._init_monitor_thread()
self._init_worker_threads()
[docs] @handle_exception_call
def queue_job(self, job_wrapper):
LOGGER.debug("Starting queue_job for job " + job_wrapper.get_id_tag())
if not self.prepare_job(job_wrapper, include_metadata=False,
modify_command_for_container=False):
LOGGER.debug("Not ready " + job_wrapper.get_id_tag())
return
job_destination = job_wrapper.job_destination
chronos_job_spec = self._get_job_spec(job_wrapper)
job_name = chronos_job_spec['name']
self._chronos_client.add(chronos_job_spec)
ajs = AsynchronousJobState(files_dir=job_wrapper.working_directory,
job_wrapper=job_wrapper,
job_id=job_name,
job_destination=job_destination)
self.monitor_queue.put(ajs)
[docs] @handle_exception_call
def stop_job(self, job_wrapper):
job_id = job_wrapper.get_id_tag()
job_name = self.JOB_NAME_PREFIX + job_id
job = self._retrieve_job(job_name)
if job:
msg = 'Job {name!r} is terminated'
self._chronos_client.delete(job_name)
LOGGER.debug(msg.format(name=job_name))
else:
msg = 'Job {name!r} not found. It cannot be terminated.'
LOGGER.error(msg.format(name=job_name))
[docs] def recover(self, job, job_wrapper):
msg = ('(name!r/runner!r) is still in {state!s} state, adding to'
' the runner monitor queue')
job_id = job.get_job_runner_external_id()
ajs = AsynchronousJobState(files_dir=job_wrapper.working_directory,
job_wrapper=job_wrapper)
ajs.job_id = self.JOB_NAME_PREFIX + str(job_id)
ajs.command_line = job.command_line
ajs.job_wrapper = job_wrapper
ajs.job_destination = job_wrapper.job_destination
if job.state in (model.Job.states.RUNNING, model.Job.states.STOPPED):
LOGGER.debug(msg.format(
name=job.id, runner=job.job_runner_external_id,
state=job.state))
ajs.old_state = model.Job.states.RUNNING
ajs.running = True
self.monitor_queue.put(ajs)
elif job.state == model.Job.states.QUEUED:
LOGGER.debug(msg.format(
name=job.id, runner=job.job_runner_external_id,
state='queued'))
ajs.old_state = model.Job.states.QUEUED
ajs.running = False
self.monitor_queue.put(ajs)
[docs] @handle_exception_call
def check_watched_item(self, job_state):
job_name = job_state.job_id
job = self._retrieve_job(job_name)
# TODO: how can stopped GxIT jobs be handled here?
if job:
succeeded = job['successCount']
errors = job['errorCount']
if succeeded > 0:
return self._mark_as_successful(job_state)
elif not succeeded and not errors:
return self._mark_as_active(job_state)
elif errors:
max_retries = job['retries']
msg = 'Job {name!r} failed more than {retries!s} times'
reason = msg.format(name=job_name, retries=str(max_retries))
return self._mark_as_failed(job_state, reason)
reason = f'Job {job_name!r} not found'
return self._mark_as_failed(job_state, reason)
def _mark_as_successful(self, job_state):
msg = 'Job {name!r} finished successfully'
_write_logfile(job_state.output_file,
msg.format(name=job_state.job_id))
_write_logfile(job_state.error_file, '')
job_state.running = False
job_state.job_wrapper.change_state(model.Job.states.OK)
self.mark_as_finished(job_state)
return None
def _mark_as_active(self, job_state):
job_state.running = True
job_state.job_wrapper.change_state(model.Job.states.RUNNING)
return job_state
def _mark_as_failed(self, job_state, reason):
_write_logfile(job_state.error_file, reason)
job_state.running = False
job_state.stop_job = True
job_state.job_wrapper.change_state(model.Job.states.ERROR)
job_state.fail_message = reason
self.mark_as_failed(job_state)
return None
[docs] @handle_exception_call
def finish_job(self, job_state):
super().finish_job(job_state)
self._chronos_client.delete(job_state.job_id)
[docs] def parse_destination_params(self, params):
parsed_params = {}
for k, spec in self.DESTINATION_PARAMS_SPEC.items():
value = params.get(k, spec.get('default'))
map_to = spec.get('map_name')
mapper = spec.get('map')
segments = map_to.split('/')
parsed_params.update(to_dict(segments, mapper(value)))
return parsed_params
def _get_job_spec(self, job_wrapper):
job_name = self.JOB_NAME_PREFIX + job_wrapper.get_id_tag()
job_destination = job_wrapper.job_destination
template = {
'async': False,
'command': job_wrapper.runner_command_line,
'owner': self.runner_params['owner'],
'disabled': False,
'schedule': 'R1//PT1S',
'name': job_name,
}
if not job_destination.params.get('docker_enabled'):
raise ChronosRunnerException(
'ChronosJobRunner needs \'docker_enabled\' to be set as True')
destination_params = self.parse_destination_params(
job_destination.params)
template.update(destination_params)
template['container']['type'] = 'DOCKER'
template['container']['image'] = self._find_container(
job_wrapper).container_id
return template
def _retrieve_job(self, job_id):
jobs = self._chronos_client.list()
job = [x for x in jobs if x['name'] == job_id]
if len(job) > 1:
msg = f'Multiple jobs found with name {job_id!r}'
LOGGER.error(msg)
raise ChronosRunnerException(msg)
return job[0] if job else None