Source code for galaxy.containers.docker_swarm_manager
#!/usr/bin/env python
"""
Docker Swarm mode management
"""
from __future__ import absolute_import
import argparse
import errno
import logging
import os
import subprocess
import sys
import time
try:
import daemon
import daemon.pidfile
import lockfile
except ImportError:
daemon = None
try:
import galaxy # noqa: F401 this is a test import
except ImportError:
sys.path.insert(0, os.path.abspath(os.path.join(
os.path.dirname(__file__),
os.pardir,
os.pardir)))
from galaxy.containers import (
build_container_interfaces,
ContainerInterfaceConfig,
parse_containers_config,
)
from galaxy.containers.docker_model import (
CPUS_CONSTRAINT,
DockerServiceConstraints,
IMAGE_CONSTRAINT,
)
DESCRIPTION = "Daemon to manage a Docker Swarm (running in Docker Swarm mode)."
SWARM_MANAGER_CONF_DEFAULTS = {
'pid_file': '{xdg_data_home}/galaxy_swarm_manager.pid',
'log_file': '{xdg_data_home}/galaxy_swarm_manager.log',
'service_wait_count_limit': 0,
'service_wait_time_limit': 5,
'slots_min_limit': 0,
'slots_max_limit': sys.maxsize,
'slots_min_spare': 0,
'node_idle_limit': 120,
'limits': [],
'spawn_wait_time': 30,
'spawn_command': '/bin/true',
'destroy_command': '/bin/true',
'command_failure_command': '/bin/true',
'command_retries': 0,
'command_retry_wait': 10,
'terminate_when_idle': True,
}
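# These defaults can be overridden per swarm through the 'manager_conf' section of
# containers_conf.yml (see _swarm_manager_conf() below, which layers manager_conf
# over SWARM_MANAGER_CONF_DEFAULTS). A purely illustrative override -- the key
# names come from the defaults above, but the surrounding YAML layout and the
# script paths are assumptions:
#
#   _default_:
#     type: docker_swarm
#     manager_conf:
#       slots_min_spare: 1
#       node_idle_limit: 300
#       spawn_command: /usr/local/bin/spawn_nodes.sh      # hypothetical script
#       destroy_command: /usr/local/bin/destroy_nodes.sh  # hypothetical script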
log = logging.getLogger(__name__)
class SwarmManager(object):
    def __init__(self, conf, docker_interface):
self._conf = conf
self._cpus = docker_interface._conf.cpus
self._docker_interface = docker_interface
self._state = SwarmState(conf, docker_interface._conf)
self._log_interval = 60
self._last_log = time.time() - self._log_interval
    def run(self):
while True:
self._maintain_pool()
self._check_for_new_nodes()
self._clean_services()
self._log_state()
self._terminate_if_idle()
time.sleep(1)
def _run_command(self, command, command_retries=None, returncodes=None, **kwargs):
stdout = None
attempt = 0
        if command_retries is None:  # 0 explicitly disables retries and must not fall back to the default
            command_retries = self._conf.command_retries
allowed_returncodes = (0,)
if returncodes:
allowed_returncodes = returncodes
raw_cmd = command.format(**kwargs)
log.debug('running command: %s', raw_cmd)
success = False
while not success and attempt < command_retries + 1:
attempt += 1
p = subprocess.Popen(raw_cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, close_fds=True, shell=True)
stdout, stderr = p.communicate()
if p.returncode not in allowed_returncodes:
msg = "error running '%s': returned %s" % (raw_cmd, p.returncode)
if attempt < command_retries + 1:
msg += ', waiting %s seconds' % self._conf.command_retry_wait
time.sleep(self._conf.command_retry_wait)
log.warning(msg + "\nstdout: %s\nstderr: %s\n", stdout, stderr)
else:
msg += ' (final attempt)'
log.error(msg + "\nstdout: %s\nstderr: %s\n", stdout, stderr)
self._run_command(self._conf.command_failure_command.format(failed_command=raw_cmd), command_retries=0)
stdout = None
else:
stdout = stdout.strip()
success = True
if returncodes:
return p.returncode, stdout
else:
return stdout
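    # Usage note: callers normally consume the command's stdout directly; passing
    # 'returncodes' also returns the exit status, which _spawn_nodes() below uses
    # to treat exit code 2 from spawn_command as "retry later". A minimal sketch:
    #
    #   output = self._run_command(self._conf.destroy_command.format(nodes=names))
    #   rc, output = self._run_command(command, returncodes=(0, 2))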
# main loop methods
def _maintain_pool(self):
waiting = self._docker_interface.services_waiting_by_constraints()
active = self._docker_interface.nodes_active_by_constraints()
for constraints, needed_dict in self._state.slots_needed(waiting, active).items():
services = needed_dict['services']
nodes = needed_dict['nodes']
slots_needed = needed_dict['slots_needed']
if slots_needed > 0:
self._spawn_nodes(constraints, services, slots_needed)
elif slots_needed < 0:
self._destroy_nodes(constraints, nodes, slots_needed)
def _check_for_new_nodes(self):
nodes = None
for node_state in self._state.spawning_nodes():
name = node_state['name']
elapsed = node_state['elapsed']
constraints = node_state['constraints']
state = node_state['state']
if not nodes:
nodes = self._docker_interface.nodes()
node = ([x for x in nodes if x.name == name] + [None])[0]
if not node:
if elapsed > self._conf.spawn_wait_time:
log.warning("spawning node '%s' not found in `docker node ls` and spawn_wait_time exceeded! %d seconds have elapsed", name, elapsed)
self._run_command(self._conf.command_failure_command.format(failed_command='wait_for_spawning_node %s' % name), command_retries=0)
                    self._state.mark_spawning_node_timeout(name)
elif node.is_ok():
node.set_labels_for_constraints(constraints)
self._state.mark_spawning_node_ready(name)
log.info("spawning node '%s' is ready!", name)
elif node.state != state:
log.info("spawning node '%s' state changed from '%s' to '%s'", name, state, node.state)
node.set_labels_for_constraints(constraints)
self._state.mark_spawning_node_state(name, node.state)
elif elapsed > self._conf.spawn_wait_time:
log.warning("spawning node '%s' state is '%s' after %s seconds", name, node.state, elapsed)
def _clean_services(self):
cleaned_services = self._docker_interface.services_clean()
if cleaned_services:
self._state.clean_services(cleaned_services)
log.info("cleaned services: %s", ', '.join([x.id for x in cleaned_services]))
def _log_state(self):
if self._last_log < (time.time() - self._log_interval):
services = self._docker_interface.services()
nodes = self._docker_interface.nodes()
log.info('current service states: %s', ', '.join(['%s: %s' % (x.name, x.state) for x in services]))
log.info('current node states: %s', ', '.join(['%s: %s' % (x.name, x.state) for x in nodes]))
# TODO: log services waiting due to limits, idle nodes alive due to limits
self._last_log = time.time()
def _terminate_if_idle(self):
if not self._conf.terminate_when_idle:
return
for node in self._docker_interface.nodes():
if node.task_count > 0:
return # nodes are running a galaxy service
# no tasks running, check that all nodes are destroyed that are going to be destroyed
waiting = self._docker_interface.services_waiting_by_constraints()
active = self._docker_interface.nodes_active_by_constraints()
for constraints, nodes in active.items():
services = waiting.get(constraints, [])
needed, total = self._state.slots_delta(constraints, services, nodes)
if needed > 0:
return # services are waiting
if needed < 0:
extra_slots = max(
self._state.get_limit(constraints, 'slots_min_limit'),
self._state.get_limit(constraints, 'slots_min_spare')
)
if total + needed != extra_slots:
return # otherwise, nodes remaining are for configured minimums
# FIXME: there's a race condition here
log.info('nothing to manage, shutting down')
sys.exit(0)
# other methods
def _get_spawn_property(self, constraints, constraint_name, services):
if services:
# this isn't very nice
if constraint_name == IMAGE_CONSTRAINT:
return services[0].image
elif constraint_name == CPUS_CONSTRAINT:
return services[0].cpus
for constraint in constraints:
if constraint.name == constraint_name:
return constraint.value
return None
def _spawn_nodes(self, constraints, services, slots_needed):
service_ids = [x.id for x in services]
if service_ids:
log.info("requesting node(s) for services needing %s slots with constraints [%s]: %s",
slots_needed, constraints, ', '.join(service_ids))
else:
log.info("requesting node(s) for %s slots (due to minimum limits with constraints [%s]",
slots_needed, constraints)
command = self._conf.spawn_command.format(
service_ids=','.join(service_ids),
service_count=len(services),
image=self._get_spawn_property(constraints, IMAGE_CONSTRAINT, services) or '',
cpus=self._get_spawn_property(constraints, CPUS_CONSTRAINT, services) or '',
slots=slots_needed,
)
rc, output = self._run_command(command, returncodes=(0, 2))
if rc == 2:
log.info('spawn_command indicated that spawning should be retried: %s', output)
elif not output:
log.warning('spawn_command returned no new nodes, cannot manage nodes')
self._state.mark_services_handled(services)
else:
log.info("node allocator will spawn: %s", output)
self._state.nodes_requested(constraints, output.split())
self._state.mark_services_handled(services)
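    # spawn_command is a shell template expanded with str.format(); the placeholders
    # supplied here are {service_ids}, {service_count}, {image}, {cpus} and {slots}.
    # It is expected to print the names of the nodes it will create (see
    # SwarmState.nodes_requested() for the "name[:slots]" output format). A
    # hypothetical template for containers_conf.yml:
    #
    #   spawn_command: "spawn_nodes.sh --slots {slots} --cpus {cpus} --image {image}"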
def _destroy_nodes(self, constraints, nodes, slots_needed):
# nodes here is active nodes only, should be fine
destroy_nodes = []
destroyed_slots = 0
for node in nodes:
node_slots = node.cpus / self._cpus
if self._node_ready_for_destruction(node) and (destroyed_slots + node_slots) <= abs(slots_needed):
node.drain()
destroy_nodes.append(node)
destroyed_slots += node_slots
if destroy_nodes:
command = self._conf.destroy_command.format(
nodes=' '.join([x.name for x in destroy_nodes]))
destroyed_nodes = self._run_command(command)
if not destroyed_nodes:
log.warning('destroy_command returned no destroyed nodes')
else:
log.info("destroyed nodes: %s", destroyed_nodes)
def _node_ready_for_destruction(self, node):
ready = False
if node.destroyable():
if self._state.mark_node_idle(node.name):
log.debug("node '%s' is now idle", node.name)
ready = self._state.is_destruction_time(node)
elif self._state.clear_node_idle(node.name):
log.debug("node '%s' is no longer idle", node.name)
return ready
class SwarmState(object):
    def __init__(self, conf, interface_conf):
self._conf = conf
self._cpus = interface_conf.cpus # this is effectively the slot size
self._service_create_image_constraint = interface_conf.service_create_image_constraint
self._service_create_cpus_constraint = interface_conf.service_create_cpus_constraint
self._handled_services = set()
self._waiting_since = {}
self._spawning_nodes = {}
self._surplus_nodes = {}
self._limits = {}
for limit in conf.limits:
constraints = DockerServiceConstraints.from_constraint_string_list(limit.get('constraints', []))
self._limits[constraints] = self._make_limit_dict(limit)
def _make_limit_dict(self, limit):
return {
'slots_min_limit': limit.get('slots_min_limit', self._conf.slots_min_limit),
'slots_max_limit': limit.get('slots_max_limit', self._conf.slots_max_limit),
'slots_min_spare': limit.get('slots_min_spare', self._conf.slots_min_spare),
'node_idle_limit': limit.get('node_idle_limit', self._conf.node_idle_limit),
}
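    # Per-constraint limits come from the 'limits' list in 'manager_conf' (see
    # __init__ above); each entry may carry a 'constraints' string list plus any of
    # the keys returned here. An illustrative entry -- the constraint string syntax
    # is whatever DockerServiceConstraints.from_constraint_string_list() accepts,
    # and the values shown are assumptions:
    #
    #   limits:
    #     - constraints:
    #         - node.labels.image==my/tool-image
    #       slots_min_limit: 2
    #       slots_min_spare: 1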
    def slots_needed(self, waiting_services, active_nodes):
        """Given the waiting services and the active nodes for each constraint set, return the number of
        slots needed per constraint set once the maximum wait thresholds have been reached, constrained by
        the configured limits.
        """
rval = {}
services_constraints = set(waiting_services.keys())
nodes_constraints = set(active_nodes.keys())
limits_constraints = set(self._limits.keys())
all_constraints = services_constraints.union(nodes_constraints).union(limits_constraints)
if not all_constraints and (self._conf.slots_min_spare or self._conf.slots_min_limit):
if self._service_create_image_constraint or self._service_create_cpus_constraint:
raise Exception("Global 'slots_min_limit' and/or 'slots_min_spare' are set and "
"'service_create_image_constraint' and/or 'service_create_cpus_constraint' are set but "
"constraint-specific limits are unset, minimum nodes cannot be started since the constraints are not "
"known until service creation time. Either disable 'service_create_*_constraint' or create "
"constraint-specific limits in the 'limits' section of 'manager_conf' in containers_conf.yml")
all_constraints.add(DockerServiceConstraints.from_constraint_string_list([]))
for constraints in all_constraints:
services = waiting_services.get(constraints, [])
nodes = active_nodes.get(constraints, [])
# filter out any services that have already been handled
services = [s for s in services if s not in self._handled_services]
# calculate slots needed
slots_needed, total_slots = self.slots_delta(constraints, services, nodes)
# set or clear wait time
if services and constraints not in self._waiting_since:
self._waiting_since[constraints] = time.time()
elif not services and constraints in self._waiting_since:
del self._waiting_since[constraints]
rval[constraints] = {
'services': services,
'nodes': nodes,
'slots_needed': slots_needed,
}
return rval
    def slots_delta(self, constraints, services, nodes):
total = 0
used = 0
for node in nodes:
used += sum([t.cpus for t in node.tasks]) / self._cpus
total += node.cpus / self._cpus
# need at least this many slots
needed = used + self.get_limit(constraints, 'slots_min_spare')
if (len(services) > self._conf.service_wait_count_limit and
time.time() - self._waiting_since.get(constraints, time.time()) > self._conf.service_wait_time_limit):
# add slots for waiting services that have exceeded limits
needed += sum([s.cpus for s in services]) / self._cpus
# subtract slots for spawning nodes
        needed -= sum([n.get('slots', 0) for n in self._spawning_nodes.get(constraints, {}).values()])
# ensure no less than slots_min_limit slots will exist (free or used)
needed = max(needed, self.get_limit(constraints, 'slots_min_limit'))
# ensure no more than slots_max_limit slots will exist
needed = min(needed, self.get_limit(constraints, 'slots_max_limit'))
# need to add/remove this many slots
return int(needed - total), total
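    # Worked example of the arithmetic above, assuming a slot size (self._cpus) of 2
    # CPUs, one active node with 8 CPUs (total = 4 slots) whose tasks use 4 CPUs
    # (used = 2 slots), slots_min_spare = 1, no nodes currently spawning, limits not
    # binding, and two waiting services of 2 CPUs each that have exceeded the wait
    # thresholds:
    #
    #   needed = 2 + 1       # used + slots_min_spare           -> 3
    #   needed += 4 / 2      # waiting service CPUs / slot size -> 5
    #   return 5 - 4, 4      # spawn 1 more slot; 4 slots exist now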
    def get_limit(self, constraints, limit):
limits = self._limits.get(constraints, {})
return limits.get(limit, self._conf[limit])
    def spawning_nodes(self):
now = time.time()
for constraints in self._spawning_nodes.keys():
for name, node in self._spawning_nodes[constraints].items():
yval = {
'name': name,
'elapsed': now - node['time_requested'],
'constraints': constraints,
}
yval.update(node)
yield yval
    def nodes_requested(self, constraints, nodes):
if constraints not in self._spawning_nodes:
self._spawning_nodes[constraints] = {}
for node in nodes:
name = node.split(':')[0]
try:
slots = int(node.split(':')[1])
except IndexError:
slots = int(1 / self._cpus)
self._spawning_nodes[constraints][name] = {
'state': 'requested',
'time_requested': time.time(),
'slots': slots,
}
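    # Each element of 'nodes' is a "name" or "name:slots" token as printed by
    # spawn_command and split on whitespace in _spawn_nodes(), e.g. (hypothetical
    # node names): swarm-node-1:4 swarm-node-2:4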
def _delete_spawning_node(self, node_name):
for constraints in self._spawning_nodes.keys():
if node_name in self._spawning_nodes[constraints]:
del self._spawning_nodes[constraints][node_name]
    def mark_spawning_node_state(self, node_name, state):
for constraints in self._spawning_nodes.keys():
if node_name in self._spawning_nodes[constraints]:
self._spawning_nodes[constraints][node_name]['state'] = state
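    # NOTE: mark_spawning_node_ready() and mark_spawning_node_timeout() are called
    # from SwarmManager._check_for_new_nodes() but do not appear in this listing.
    # A minimal sketch of the assumed behavior -- forget the node once it is ready
    # or has timed out -- is shown below; the actual implementation may differ.
    def mark_spawning_node_ready(self, node_name):
        # assumed: a ready node no longer needs to be tracked as "spawning"
        self._delete_spawning_node(node_name)

    def mark_spawning_node_timeout(self, node_name):
        # assumed: a timed-out node is dropped from the spawning set
        self._delete_spawning_node(node_name)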
    def is_destruction_time(self, node):
now = time.time()
limit = self.get_limit(node.labels_as_constraints, 'node_idle_limit')
return now - self._surplus_nodes.get(node.name, now) > limit
    def mark_node_idle(self, node_name):
if node_name not in self._surplus_nodes:
self._surplus_nodes[node_name] = time.time()
return True
return False
    def clear_node_idle(self, node_name):
if node_name in self._surplus_nodes:
del self._surplus_nodes[node_name]
return True
return False
def main(argv=None, fork=False):
if not daemon:
log.warning('The daemon module is required to use the swarm manager, install it with `pip install python-daemon`')
return
if argv is None:
argv = sys.argv[1:]
if fork:
p = subprocess.Popen([sys.executable, __file__] + argv)
p.wait()
else:
args = _arg_parser().parse_args(argv)
_run_swarm_manager(args)
def _arg_parser():
parser = argparse.ArgumentParser(description=DESCRIPTION)
parser.add_argument("-c", "--containers-config-file", default=None)
parser.add_argument("-f", "--foreground", action="store_true", default=False)
parser.add_argument("-d", "--debug", action="store_true", default=False)
parser.add_argument("-s", "--swarm", default="_default_",
help='Swarm name in containers config to manage')
return parser
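# An illustrative foreground invocation for debugging -- the script path is an
# assumption about where the module lives on disk:
#
#   python galaxy/containers/docker_swarm_manager.py \
#       -c config/containers_conf.yml -s _default_ -f -d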
def _run_swarm_manager(args):
containers_config_file = _containers_config_file(args)
containers_conf = parse_containers_config(containers_config_file)
container_conf = _container_conf(containers_conf, args.swarm)
swarm_manager_conf = _swarm_manager_conf(container_conf)
_configure_logging(args, swarm_manager_conf)
docker_interface = build_container_interfaces(containers_config_file, containers_conf=containers_conf)[args.swarm]
pidfile = _swarm_manager_pidfile(swarm_manager_conf)
if not args.foreground:
_swarm_manager_daemon(pidfile, swarm_manager_conf['log_file'], swarm_manager_conf, docker_interface)
else:
if swarm_manager_conf['terminate_when_idle']:
log.info('running in the foreground, disabling automatic swarm manager termination')
swarm_manager_conf['terminate_when_idle'] = False
else:
log.info("running in the foreground")
pidfile.acquire()
try:
_swarm_manager(swarm_manager_conf, docker_interface)
finally:
pidfile.release()
def _containers_config_file(args):
containers_config_file = args.containers_config_file
if not containers_config_file:
for path in ('./config', '.'):
testf = os.path.join(path, 'containers_conf.yml')
if os.path.exists(testf):
containers_config_file = testf
assert containers_config_file, \
"containers_conf.yml cannot be found, please set with '-c' or '--containers-config-file'"
return containers_config_file
def _container_conf(containers_conf, swarm):
assert swarm in containers_conf, \
"invalid container configuration name: %s" % swarm
assert containers_conf[swarm]['type'] == 'docker_swarm', \
"'%s' container configuration is not 'docker_swarm' type" % swarm
assert containers_conf[swarm].get('managed', True), \
"'%s' swarm is not managed" % swarm
return containers_conf[swarm]
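# The managed swarm must therefore appear in containers_conf.yml roughly as follows
# (only the keys checked above are shown; 'managed' defaults to true):
#
#   _default_:
#     type: docker_swarm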
def _swarm_manager_conf(new_conf):
conf = ContainerInterfaceConfig()
conf.update(SWARM_MANAGER_CONF_DEFAULTS)
conf.update(new_conf.get('manager_conf', {}))
xdg_env = _load_xdg_environment()
for opt in ('pid_file', 'log_file'):
conf[opt] = conf[opt].format(xdg_data_home=xdg_env['data_home'])
return conf
def _configure_logging(args, conf):
global log
if args.debug:
log_level = logging.DEBUG
else:
log_level = logging.getLevelName(conf.get('log_level', 'INFO').upper())
assert int(log_level), 'invalid log level: %s' % conf['log_level']
log = logging.getLogger(__name__)
log.setLevel(log_level)
log_format = conf.get('log_format', '%(name)s %(levelname)s %(asctime)s %(message)s')
formatter = logging.Formatter(log_format)
# file logging is handled by daemon
handler = logging.StreamHandler(sys.stdout)
handler.setFormatter(formatter)
log.addHandler(handler)
def _load_xdg_environment():
return dict(
data_home=os.path.expanduser(os.environ.get('XDG_DATA_HOME', '~/.local/share')),
)
def _swarm_manager_pidfile(conf):
try:
os.makedirs(os.path.dirname(conf['pid_file']))
except (IOError, OSError) as exc:
if exc.errno != errno.EEXIST:
raise
return daemon.pidfile.PIDLockFile(conf['pid_file'])
def _swarm_manager_daemon(pidfile, logfile, swarm_manager_conf, docker_interface):
log.debug("daemonizing, logs will be written to '%s'", logfile)
with open(logfile, 'a') as logfh:
try:
with daemon.DaemonContext(
pidfile=pidfile,
stdout=logfh,
stderr=logfh,
):
_swarm_manager(swarm_manager_conf, docker_interface)
except lockfile.AlreadyLocked:
log.debug("attempt to daemonize with swarm manager already running ignored")
def _swarm_manager(conf, docker_interface):
swarm_manager = SwarmManager(conf, docker_interface)
log.debug("swarm manager loaded, running...")
swarm_manager.run()
if __name__ == '__main__':
__name__ = 'swarm_manager'
main()