Warning

This document is for an in-development version of Galaxy. You can alternatively view this page in the latest release if it exists or view the top of the latest release's documentation.

Source code for galaxy.containers.docker_swarm

"""
Docker Swarm mode interface
"""
from __future__ import absolute_import

import logging
import os.path
import subprocess
from functools import partial

try:
    import docker.types
except ImportError:
    from galaxy.util.bunch import Bunch
    docker = Bunch(types=Bunch(
        ContainerSpec=None,
        RestartPolicy=None,
        Resources=None,
        Placement=None,
    ))

from galaxy.containers.docker import (
    DockerAPIInterface,
    DockerCLIInterface,
    DockerInterface
)
from galaxy.containers.docker_decorators import docker_columns, docker_json
from galaxy.containers.docker_model import (
    CPUS_CONSTRAINT,
    DockerNode,
    DockerService,
    DockerTask,
    IMAGE_CONSTRAINT
)
from galaxy.exceptions import ContainerRunError
from galaxy.util import unicodify
from galaxy.util.json import safe_dumps_formatted

log = logging.getLogger(__name__)

SWARM_MANAGER_PATH = os.path.abspath(
    os.path.join(
        os.path.dirname(__file__),
        os.path.pardir,
        os.path.pardir,
        os.path.pardir,
        'scripts',
        'docker_swarm_manager.py'))


[docs]class DockerSwarmInterface(DockerInterface): container_class = DockerService conf_defaults = { 'ignore_volumes': False, 'node_prefix': None, 'service_create_image_constraint': False, 'service_create_cpus_constraint': False, 'resolve_image_digest': False, 'managed': True, 'manager_autostart': True, } publish_port_list_required = True supports_volumes = False
[docs] def validate_config(self): super(DockerSwarmInterface, self).validate_config() self._node_prefix = self._conf.node_prefix
[docs] def run_in_container(self, command, image=None, **kwopts): """Run a service like a detached container """ kwopts['replicas'] = 1 kwopts['restart_condition'] = 'none' if kwopts.get('publish_all_ports', False): # not supported for services # TODO: inspect image (or query registry if possible) for port list if kwopts.get('publish_port_random', False) or kwopts.get('ports', False): # assume this covers for publish_all_ports del kwopts['publish_all_ports'] else: raise ContainerRunError( "Publishing all ports is not supported in Docker swarm" " mode, use `publish_port_random` or `ports`", image=image, command=command ) if not kwopts.get('detach', True): raise ContainerRunError( "Running attached containers is not supported in Docker swarm mode", image=image, command=command ) elif kwopts.get('detach', None): del kwopts['detach'] if kwopts.get('volumes', None): if self._conf.ignore_volumes: log.warning( "'volumes' kwopt is set and not supported in Docker swarm " "mode, volumes will not be passed (set 'ignore_volumes: " "False' in containers config to fail instead): %s" % kwopts['volumes'] ) else: raise ContainerRunError( "'volumes' kwopt is set and not supported in Docker swarm " "mode (set 'ignore_volumes: True' in containers config to " "warn instead): %s" % kwopts['volumes'], image=image, command=command ) # ensure the volumes key is removed from kwopts kwopts.pop('volumes', None) service = self.service_create(command, image=image, **kwopts) self._run_swarm_manager() return service
# # helpers # def _run_swarm_manager(self): if self._conf.managed and self._conf.manager_autostart: try: # sys.exectuable would be preferable to using $PATH, but sys.executable is probably uwsgi subprocess.check_call(['python', SWARM_MANAGER_PATH, '--containers-config-file', self.containers_config_file, '--swarm', self.key]) except subprocess.CalledProcessError as exc: log.error('Failed to launch swarm manager: %s', unicodify(exc)) def _get_image(self, image): """Get the image string, either from the argument, or from the configured interface default if ``image`` is ``None``. Optionally resolve the image to its digest if ``resolve_image_digest`` is set in the interface configuration. If the image has not been pulled, the repo digest cannot be determined and the image name will be returned. :type image: str or None :param image: image id or name :returns: image name or image repo digest """ if not image: image = self._conf.image assert image is not None, "No image supplied as parameter and no image set as default in config, cannot create service" if self._conf.resolve_image_digest: image = self.image_repodigest(image) return image def _objects_by_attribute(self, generator, attribute_name): rval = {} for obj in generator: attr = getattr(obj, attribute_name) if attr not in rval: rval[attr] = [] rval[attr].append(obj) return rval # # docker object generators #
[docs] def services(self, id=None, name=None): for service_dict in self.service_ls(id=id, name=name): service_id = service_dict['ID'] service = DockerService(self, service_id, inspect=service_dict) if service.name.startswith(self._name_prefix): yield service
[docs] def service(self, id=None, name=None): try: return self.services(id=id, name=name).next() except StopIteration: return None
[docs] def services_in_state(self, desired, current, tasks='any'): for service in self.services(): if service.in_state(desired, current, tasks=tasks): yield service
[docs] def service_tasks(self, service): for task_dict in self.service_ps(service.id): yield DockerTask.from_api(self, task_dict, service=service)
[docs] def nodes(self, id=None, name=None): for node_dict in self.node_ls(id=id, name=name): node_id = node_dict['ID'] node = DockerNode(self, node_id, inspect=node_dict) if self._node_prefix and not node.name.startswith(self._node_prefix): continue yield node
[docs] def node(self, id=None, name=None): try: return self.nodes(id=id, name=name).next() except StopIteration: return None
[docs] def nodes_in_state(self, status, availability): for node in self.nodes(): if node.in_state(status, availability): yield node
[docs] def node_tasks(self, node): for task_dict in self.node_ps(node.id): yield DockerTask.from_api(self, task_dict, node=node)
# # higher level queries #
[docs] def services_waiting(self): return self.services_in_state('Running', 'Pending')
[docs] def services_waiting_by_constraints(self): return self._objects_by_attribute(self.services_waiting(), 'constraints')
[docs] def services_completed(self): return self.services_in_state('Shutdown', 'Complete', tasks='all')
[docs] def services_terminal(self): return [s for s in self.services() if s.terminal]
[docs] def nodes_active(self): return self.nodes_in_state('Ready', 'Active')
[docs] def nodes_active_by_constraints(self): return self._objects_by_attribute(self.nodes_active(), 'labels_as_constraints')
# # operations #
[docs] def services_clean(self): cleaned_service_ids = [] completed_services = list(self.services_completed()) # returns a generator, should probably fix this if completed_services: cleaned_service_ids.extend(self.service_rm([x.id for x in completed_services])) terminal_services = list(self.services_terminal()) for service in terminal_services: log.warning('cleaned service in abnormal terminal state: %s (%s). state: %s', service.name, service.id, service.state) if terminal_services: cleaned_service_ids.extend(self.service_rm([x.id for x in terminal_services])) return filter(lambda x: x.id in cleaned_service_ids, completed_services + terminal_services)
[docs]class DockerSwarmCLIInterface(DockerSwarmInterface, DockerCLIInterface): container_type = 'docker_swarm_cli' option_map = { # `service create` options 'constraint': {'flag': '--constraint', 'type': 'list_of_kovtrips'}, 'replicas': {'flag': '--replicas', 'type': 'string'}, 'restart_condition': {'flag': '--restart-condition', 'type': 'string'}, 'environment': {'flag': '--env', 'type': 'list_of_kvpairs'}, 'name': {'flag': '--name', 'type': 'string'}, 'publish_port_random': {'flag': '--publish', 'type': 'string'}, 'cpu_limit': {'flag': '--limit-cpu', 'type': 'string'}, 'mem_limit': {'flag': '--limit-memory', 'type': 'string'}, 'cpu_reservation': {'flag': '--reserve-cpu', 'type': 'string'}, 'mem_reservation': {'flag': '--reserve-memory', 'type': 'string'}, # `service update` options 'label_add': {'flag': '--label-add', 'type': 'list_of_kvpairs'}, 'label_rm': {'flag': '--label-rm', 'type': 'list_of_kvpairs'}, 'availability': {'flag': '--availability', 'type': 'string'}, } # # docker object generators #
[docs] def services(self, id=None, name=None): for service_dict in self.service_ls(id=id, name=name): service_id = service_dict['ID'] service_name = service_dict['NAME'] if not service_name.startswith(self._name_prefix): continue task_list = self.service_ps(service_id) yield DockerService.from_cli(self, service_dict, task_list)
[docs] def service_tasks(self, service): for task_dict in self.service_ps(service.id): if task_dict['NAME'].strip().startswith(r'\_'): continue # historical task yield DockerTask.from_cli(self, task_dict, service=service)
[docs] def nodes(self, id=None, name=None): for node_dict in self.node_ls(id=id, name=name): node_id = node_dict['ID'].strip(' *') node_name = node_dict['HOSTNAME'] if self._node_prefix and not node_name.startswith(self._node_prefix): continue task_list = filter(lambda x: x['NAME'].startswith(self._name_prefix), self.node_ps(node_id)) yield DockerNode.from_cli(self, node_dict, task_list)
# # docker subcommands #
[docs] def service_create(self, command, image=None, **kwopts): if ('service_create_image_constraint' in self._conf or 'service_create_cpus_constraint' in self._conf) and 'constraint' not in kwopts: kwopts['constraint'] = [] image = self._get_image(image) if self._conf.service_create_image_constraint: kwopts['constraint'].append((IMAGE_CONSTRAINT, '==', image)) if self._conf.service_create_cpus_constraint: cpus = kwopts.get('reserve_cpus', kwopts.get('limit_cpus', '1')) kwopts['constraint'].append((CPUS_CONSTRAINT, '==', cpus)) if self._conf.cpus: kwopts['cpu_limit'] = self._conf.cpus kwopts['cpu_reservation'] = self._conf.cpus if self._conf.memory: kwopts['mem_limit'] = self._conf.memory kwopts['mem_reservation'] = self._conf.memory self.set_kwopts_name(kwopts) args = '{kwopts} {image} {command}'.format( kwopts=self._stringify_kwopts(kwopts), image=image if image else '', command=command if command else '', ).strip() service_id = self._run_docker(subcommand='service create', args=args, verbose=True) return DockerService.from_id(self, service_id)
[docs] @docker_json def service_inspect(self, service_id): return self._run_docker(subcommand='service inspect', args=service_id)[0]
[docs] @docker_columns def service_ls(self, id=None, name=None): return self._run_docker(subcommand='service ls', args=self._filter_by_id_or_name(id, name))
[docs] @docker_columns def service_ps(self, service_id): return self._run_docker(subcommand='service ps', args='--no-trunc {}'.format(service_id))
[docs] def service_rm(self, service_ids): service_ids = ' '.join(service_ids) return self._run_docker(subcommand='service rm', args=service_ids).splitlines()
[docs] @docker_json def node_inspect(self, node_id): return self._run_docker(subcommand='node inspect', args=node_id)[0]
[docs] @docker_columns def node_ls(self, id=None, name=None): return self._run_docker(subcommand='node ls', args=self._filter_by_id_or_name(id, name))
[docs] @docker_columns def node_ps(self, node_id): return self._run_docker(subcommand='node ps', args='--no-trunc {}'.format(node_id))
[docs] def node_update(self, node_id, **kwopts): return self._run_docker(subcommand='node update', args='{kwopts} {node_id}'.format( kwopts=self._stringify_kwopts(kwopts), node_id=node_id ))
[docs] @docker_json def task_inspect(self, task_id): return self._run_docker(subcommand="inspect", args=task_id)
[docs]class DockerSwarmAPIInterface(DockerSwarmInterface, DockerAPIInterface): container_type = 'docker_swarm' placement_option_map = { 'constraint': {'param': 'constraints'}, } service_mode_option_map = { 'service_mode': {'param': 0, 'default': 'replicated'}, 'replicas': {'default': 1}, } endpoint_spec_option_map = { 'ports': {}, } resources_option_map = { 'cpus': {'params': ('cpu_limit', 'cpu_reservation'), 'map': lambda x: int(x * 1000000000)}, 'memory': {'params': ('mem_limit', 'mem_reservation')}, } container_spec_option_map = { 'image': {'param': 0}, 'command': {}, 'environment': {'param': 'env'}, 'labels': {}, } restart_policy_option_map = { 'restart_condition': {'param': 'condition', 'default': 'none'}, 'restart_delay': {'param': 'delay'}, 'restart_max_attempts': {'param': 'max_attemps'}, } task_template_option_map = { '_container_spec': {'spec_class': docker.types.ContainerSpec, 'required': True}, '_resources': {'spec_class': docker.types.Resources}, '_restart_policy': {'spec_class': docker.types.RestartPolicy}, '_placement': {'spec_class': docker.types.Placement}, } node_spec_option_map = { 'availability': {'param': 'Availability'}, 'name': {'param': 'Name'}, 'role': {'param': 'Role'}, 'labels': {'param': 'Labels'}, }
[docs] @staticmethod def create_random_port_spec(port): return { 'Protocol': 'tcp', 'PublishedPort': None, 'TargetPort': port, }
# # docker subcommands #
[docs] def service_create(self, command, image=None, **kwopts): # TODO: some of this should probably move to run_in_container when the CLI interface is removed log.debug("Creating docker service with image '%s' for command: %s", image, command) # insert run kwopts from config for opt in self.conf_run_kwopts: if self._conf[opt]: kwopts[opt] = self._conf[opt] # image is part of the container spec kwopts['image'] = self._get_image(image) # service constraints kwopts['constraint'] = kwopts.get('constraint', []) if self._conf.service_create_image_constraint: kwopts['constraint'].append((IMAGE_CONSTRAINT + '==' + image)) if self._conf.service_create_cpus_constraint: cpus = kwopts.get('reserve_cpus', kwopts.get('limit_cpus', '1')) kwopts['constraint'].append((CPUS_CONSTRAINT + '==' + cpus)) # ports if 'publish_port_random' in kwopts: kwopts['ports'] = [DockerSwarmAPIInterface.create_random_port_spec(kwopts.pop('publish_port_random'))] # create specs service_mode = self._create_docker_api_spec('service_mode', docker.types.ServiceMode, kwopts) endpoint_spec = self._create_docker_api_spec('endpoint_spec', docker.types.EndpointSpec, kwopts) task_template = self._create_docker_api_spec('task_template', docker.types.TaskTemplate, kwopts) self.set_kwopts_name(kwopts) log.debug("Docker service task template:\n%s", safe_dumps_formatted(task_template)) log.debug("Docker service endpoint specification:\n%s", safe_dumps_formatted(endpoint_spec)) log.debug("Docker service mode:\n%s", safe_dumps_formatted(service_mode)) log.debug("Docker service creation parameters:\n%s", safe_dumps_formatted(kwopts)) success_test = partial(self._first, self.service_ls, name=kwopts['name']) # this can raise exceptions, if necessary we could wrap them in a more generic "creation failed" exception class service = self._client.create_service( task_template, mode=service_mode, endpoint_spec=endpoint_spec, success_test=success_test, max_tries=5, **kwopts) service_id = service.get('ID') log.debug('Created service: %s (%s)', kwopts['name'], service_id) return DockerService.from_id(self, service_id)
[docs] def service_inspect(self, service_id): return self._client.inspect_service(service_id)
[docs] def service_ls(self, id=None, name=None): return self._client.services(filters=self._filter_by_id_or_name(id, name))
# roughly `docker service ps`
[docs] def service_ps(self, service_id): return self.task_ls(filters={'service': service_id})
[docs] def service_rm(self, service_ids): r = [] for service_id in service_ids: self._client.remove_service(service_id) r.append(service_id) return r
[docs] def node_inspect(self, node_id): return self._client.inspect_node(node_id)
[docs] def node_ls(self, id=None, name=None): return self._client.nodes(filters=self._filter_by_id_or_name(id, name))
# roughly `docker node ps`
[docs] def node_ps(self, node_id): return self.task_ls(filters={'node': node_id})
[docs] def node_update(self, node_id, **kwopts): node = DockerNode.from_id(self, node_id) spec = node.inspect['Spec'] if 'label_add' in kwopts: kwopts['labels'] = spec.get('Labels', {}) kwopts['labels'].update(kwopts.pop('label_add')) spec.update(self._create_docker_api_spec('node_spec', dict, kwopts)) return self._client.update_node(node.id, node.version, node_spec=spec)
[docs] def task_inspect(self, task_id): return self._client.inspect_task(task_id)
[docs] def task_ls(self, filters=None): return self._client.tasks(filters=filters)