Warning

This document is for an old release of Galaxy. You can alternatively view this page in the latest release if it exists or view the top of the latest release's documentation.

Source code for galaxy.containers.docker_swarm

"""
Docker Swarm mode interface
"""

import logging
import os.path
import subprocess
from functools import partial
from typing import Any, Dict, Optional

try:
    import docker.types
except ImportError:
    from galaxy.util.bunch import Bunch
    docker = Bunch(types=Bunch(
        ContainerSpec=None,
        RestartPolicy=None,
        Resources=None,
        Placement=None,
    ))

from galaxy.containers.docker import (
    DockerAPIInterface,
    DockerCLIInterface,
    DockerInterface
)
from galaxy.containers.docker_decorators import docker_columns, docker_json
from galaxy.containers.docker_model import (
    CPUS_CONSTRAINT,
    DockerNode,
    DockerService,
    DockerTask,
    IMAGE_CONSTRAINT
)
from galaxy.exceptions import ContainerRunError
from galaxy.util import unicodify
from galaxy.util.json import safe_dumps_formatted

log = logging.getLogger(__name__)

SWARM_MANAGER_PATH = os.path.abspath(
    os.path.join(
        os.path.dirname(__file__),
        os.path.pardir,
        os.path.pardir,
        os.path.pardir,
        'scripts',
        'docker_swarm_manager.py'))


[docs]class DockerSwarmInterface(DockerInterface): container_class = DockerService conf_defaults: Dict[str, Optional[Any]] = { 'ignore_volumes': False, 'node_prefix': None, 'service_create_image_constraint': False, 'service_create_cpus_constraint': False, 'resolve_image_digest': False, 'managed': True, 'manager_autostart': True, } publish_port_list_required = True supports_volumes = False
[docs] def validate_config(self): super().validate_config() self._node_prefix = self._conf.node_prefix
[docs] def run_in_container(self, command, image=None, **kwopts): """Run a service like a detached container """ kwopts['replicas'] = 1 kwopts['restart_condition'] = 'none' if kwopts.get('publish_all_ports', False): # not supported for services # TODO: inspect image (or query registry if possible) for port list if kwopts.get('publish_port_random', False) or kwopts.get('ports', False): # assume this covers for publish_all_ports del kwopts['publish_all_ports'] else: raise ContainerRunError( "Publishing all ports is not supported in Docker swarm" " mode, use `publish_port_random` or `ports`", image=image, command=command ) if not kwopts.get('detach', True): raise ContainerRunError( "Running attached containers is not supported in Docker swarm mode", image=image, command=command ) elif kwopts.get('detach', None): del kwopts['detach'] if kwopts.get('volumes', None): if self._conf.ignore_volumes: log.warning( "'volumes' kwopt is set and not supported in Docker swarm " "mode, volumes will not be passed (set 'ignore_volumes: " "False' in containers config to fail instead): %s" % kwopts['volumes'] ) else: raise ContainerRunError( "'volumes' kwopt is set and not supported in Docker swarm " "mode (set 'ignore_volumes: True' in containers config to " "warn instead): %s" % kwopts['volumes'], image=image, command=command ) # ensure the volumes key is removed from kwopts kwopts.pop('volumes', None) service = self.service_create(command, image=image, **kwopts) self._run_swarm_manager() return service
# # helpers # def _run_swarm_manager(self): if self._conf.managed and self._conf.manager_autostart: try: # sys.exectuable would be preferable to using $PATH, but sys.executable is probably uwsgi subprocess.check_call(['python', SWARM_MANAGER_PATH, '--containers-config-file', self.containers_config_file, '--swarm', self.key]) except subprocess.CalledProcessError as exc: log.error('Failed to launch swarm manager: %s', unicodify(exc)) def _get_image(self, image): """Get the image string, either from the argument, or from the configured interface default if ``image`` is ``None``. Optionally resolve the image to its digest if ``resolve_image_digest`` is set in the interface configuration. If the image has not been pulled, the repo digest cannot be determined and the image name will be returned. :type image: str or None :param image: image id or name :returns: image name or image repo digest """ if not image: image = self._conf.image assert image is not None, "No image supplied as parameter and no image set as default in config, cannot create service" if self._conf.resolve_image_digest: image = self.image_repodigest(image) return image def _objects_by_attribute(self, generator, attribute_name): rval = {} for obj in generator: attr = getattr(obj, attribute_name) if attr not in rval: rval[attr] = [] rval[attr].append(obj) return rval # # docker object generators #
[docs] def services(self, id=None, name=None): for service_dict in self.service_ls(id=id, name=name): service_id = service_dict['ID'] service = DockerService(self, service_id, inspect=service_dict) if service.name.startswith(self._name_prefix): yield service
[docs] def service(self, id=None, name=None): try: return next(self.services(id=id, name=name)) except StopIteration: return None
[docs] def services_in_state(self, desired, current, tasks='any'): for service in self.services(): if service.in_state(desired, current, tasks=tasks): yield service
[docs] def service_tasks(self, service): for task_dict in self.service_ps(service.id): yield DockerTask.from_api(self, task_dict, service=service)
[docs] def nodes(self, id=None, name=None): for node_dict in self.node_ls(id=id, name=name): node_id = node_dict['ID'] node = DockerNode(self, node_id, inspect=node_dict) if self._node_prefix and not node.name.startswith(self._node_prefix): continue yield node
[docs] def node(self, id=None, name=None): try: return next(self.nodes(id=id, name=name)) except StopIteration: return None
[docs] def nodes_in_state(self, status, availability): for node in self.nodes(): if node.in_state(status, availability): yield node
[docs] def node_tasks(self, node): for task_dict in self.node_ps(node.id): yield DockerTask.from_api(self, task_dict, node=node)
# # higher level queries #
[docs] def services_waiting(self): return self.services_in_state('Running', 'Pending')
[docs] def services_waiting_by_constraints(self): return self._objects_by_attribute(self.services_waiting(), 'constraints')
[docs] def services_completed(self): return self.services_in_state('Shutdown', 'Complete', tasks='all')
[docs] def services_terminal(self): return [s for s in self.services() if s.terminal]
[docs] def nodes_active(self): return self.nodes_in_state('Ready', 'Active')
[docs] def nodes_active_by_constraints(self): return self._objects_by_attribute(self.nodes_active(), 'labels_as_constraints')
# # operations #
[docs] def services_clean(self): cleaned_service_ids = [] completed_services = list(self.services_completed()) # returns a generator, should probably fix this if completed_services: cleaned_service_ids.extend(self.service_rm([x.id for x in completed_services])) terminal_services = list(self.services_terminal()) for service in terminal_services: log.warning('cleaned service in abnormal terminal state: %s (%s). state: %s', service.name, service.id, service.state) if terminal_services: cleaned_service_ids.extend(self.service_rm([x.id for x in terminal_services])) return filter(lambda x: x.id in cleaned_service_ids, completed_services + terminal_services)
[docs]class DockerSwarmCLIInterface(DockerSwarmInterface, DockerCLIInterface): container_type = 'docker_swarm_cli' option_map = { # `service create` options 'constraint': {'flag': '--constraint', 'type': 'list_of_kovtrips'}, 'replicas': {'flag': '--replicas', 'type': 'string'}, 'restart_condition': {'flag': '--restart-condition', 'type': 'string'}, 'environment': {'flag': '--env', 'type': 'list_of_kvpairs'}, 'name': {'flag': '--name', 'type': 'string'}, 'publish_port_random': {'flag': '--publish', 'type': 'string'}, 'cpu_limit': {'flag': '--limit-cpu', 'type': 'string'}, 'mem_limit': {'flag': '--limit-memory', 'type': 'string'}, 'cpu_reservation': {'flag': '--reserve-cpu', 'type': 'string'}, 'mem_reservation': {'flag': '--reserve-memory', 'type': 'string'}, # `service update` options 'label_add': {'flag': '--label-add', 'type': 'list_of_kvpairs'}, 'label_rm': {'flag': '--label-rm', 'type': 'list_of_kvpairs'}, 'availability': {'flag': '--availability', 'type': 'string'}, } # # docker object generators #
[docs] def services(self, id=None, name=None): for service_dict in self.service_ls(id=id, name=name): service_id = service_dict['ID'] service_name = service_dict['NAME'] if not service_name.startswith(self._name_prefix): continue task_list = self.service_ps(service_id) yield DockerService.from_cli(self, service_dict, task_list)
[docs] def service_tasks(self, service): for task_dict in self.service_ps(service.id): if task_dict['NAME'].strip().startswith(r'\_'): continue # historical task yield DockerTask.from_cli(self, task_dict, service=service)
[docs] def nodes(self, id=None, name=None): for node_dict in self.node_ls(id=id, name=name): node_id = node_dict['ID'].strip(' *') node_name = node_dict['HOSTNAME'] if self._node_prefix and not node_name.startswith(self._node_prefix): continue task_list = filter(lambda x: x['NAME'].startswith(self._name_prefix), self.node_ps(node_id)) yield DockerNode.from_cli(self, node_dict, task_list)
# # docker subcommands #
[docs] def service_create(self, command, image=None, **kwopts): if ('service_create_image_constraint' in self._conf or 'service_create_cpus_constraint' in self._conf) and 'constraint' not in kwopts: kwopts['constraint'] = [] image = self._get_image(image) if self._conf.service_create_image_constraint: kwopts['constraint'].append((IMAGE_CONSTRAINT, '==', image)) if self._conf.service_create_cpus_constraint: cpus = kwopts.get('reserve_cpus', kwopts.get('limit_cpus', '1')) kwopts['constraint'].append((CPUS_CONSTRAINT, '==', cpus)) if self._conf.cpus: kwopts['cpu_limit'] = self._conf.cpus kwopts['cpu_reservation'] = self._conf.cpus if self._conf.memory: kwopts['mem_limit'] = self._conf.memory kwopts['mem_reservation'] = self._conf.memory self.set_kwopts_name(kwopts) args = '{kwopts} {image} {command}'.format( kwopts=self._stringify_kwopts(kwopts), image=image if image else '', command=command if command else '', ).strip() service_id = self._run_docker(subcommand='service create', args=args, verbose=True) return DockerService.from_id(self, service_id)
[docs] @docker_json def service_inspect(self, service_id): return self._run_docker(subcommand='service inspect', args=service_id)[0]
[docs] @docker_columns def service_ls(self, id=None, name=None): return self._run_docker(subcommand='service ls', args=self._filter_by_id_or_name(id, name))
[docs] @docker_columns def service_ps(self, service_id): return self._run_docker(subcommand='service ps', args=f'--no-trunc {service_id}')
[docs] def service_rm(self, service_ids): service_ids = ' '.join(service_ids) return self._run_docker(subcommand='service rm', args=service_ids).splitlines()
[docs] @docker_json def node_inspect(self, node_id): return self._run_docker(subcommand='node inspect', args=node_id)[0]
[docs] @docker_columns def node_ls(self, id=None, name=None): return self._run_docker(subcommand='node ls', args=self._filter_by_id_or_name(id, name))
[docs] @docker_columns def node_ps(self, node_id): return self._run_docker(subcommand='node ps', args=f'--no-trunc {node_id}')
[docs] def node_update(self, node_id, **kwopts): return self._run_docker(subcommand='node update', args='{kwopts} {node_id}'.format( kwopts=self._stringify_kwopts(kwopts), node_id=node_id ))
[docs] @docker_json def task_inspect(self, task_id): return self._run_docker(subcommand="inspect", args=task_id)
[docs]class DockerSwarmAPIInterface(DockerSwarmInterface, DockerAPIInterface): container_type = 'docker_swarm' placement_option_map = { 'constraint': {'param': 'constraints'}, } service_mode_option_map = { 'service_mode': {'param': 0, 'default': 'replicated'}, 'replicas': {'default': 1}, } endpoint_spec_option_map: Dict[str, Dict] = { 'ports': {}, } resources_option_map = { 'cpus': {'params': ('cpu_limit', 'cpu_reservation'), 'map': lambda x: int(x * 1000000000)}, 'memory': {'params': ('mem_limit', 'mem_reservation')}, } container_spec_option_map = { 'image': {'param': 0}, 'command': {}, 'environment': {'param': 'env'}, 'labels': {}, } restart_policy_option_map = { 'restart_condition': {'param': 'condition', 'default': 'none'}, 'restart_delay': {'param': 'delay'}, 'restart_max_attempts': {'param': 'max_attemps'}, } task_template_option_map = { '_container_spec': {'spec_class': docker.types.ContainerSpec, 'required': True}, '_resources': {'spec_class': docker.types.Resources}, '_restart_policy': {'spec_class': docker.types.RestartPolicy}, '_placement': {'spec_class': docker.types.Placement}, } node_spec_option_map = { 'availability': {'param': 'Availability'}, 'name': {'param': 'Name'}, 'role': {'param': 'Role'}, 'labels': {'param': 'Labels'}, }
[docs] @staticmethod def create_random_port_spec(port): return { 'Protocol': 'tcp', 'PublishedPort': None, 'TargetPort': port, }
# # docker subcommands #
[docs] def service_create(self, command, image=None, **kwopts): # TODO: some of this should probably move to run_in_container when the CLI interface is removed log.debug("Creating docker service with image '%s' for command: %s", image, command) # insert run kwopts from config for opt in self.conf_run_kwopts: if self._conf[opt]: kwopts[opt] = self._conf[opt] # image is part of the container spec kwopts['image'] = self._get_image(image) # service constraints kwopts['constraint'] = kwopts.get('constraint', []) if self._conf.service_create_image_constraint: kwopts['constraint'].append(f"{IMAGE_CONSTRAINT}=={image}") if self._conf.service_create_cpus_constraint: cpus = kwopts.get('reserve_cpus', kwopts.get('limit_cpus', '1')) kwopts['constraint'].append(f"{CPUS_CONSTRAINT}=={cpus}") # ports if 'publish_port_random' in kwopts: kwopts['ports'] = [DockerSwarmAPIInterface.create_random_port_spec(kwopts.pop('publish_port_random'))] # create specs service_mode = self._create_docker_api_spec('service_mode', docker.types.ServiceMode, kwopts) endpoint_spec = self._create_docker_api_spec('endpoint_spec', docker.types.EndpointSpec, kwopts) task_template = self._create_docker_api_spec('task_template', docker.types.TaskTemplate, kwopts) self.set_kwopts_name(kwopts) log.debug("Docker service task template:\n%s", safe_dumps_formatted(task_template)) log.debug("Docker service endpoint specification:\n%s", safe_dumps_formatted(endpoint_spec)) log.debug("Docker service mode:\n%s", safe_dumps_formatted(service_mode)) log.debug("Docker service creation parameters:\n%s", safe_dumps_formatted(kwopts)) success_test = partial(self._first, self.service_ls, name=kwopts['name']) # this can raise exceptions, if necessary we could wrap them in a more generic "creation failed" exception class service = self._client.create_service( task_template, mode=service_mode, endpoint_spec=endpoint_spec, success_test=success_test, max_tries=5, **kwopts) service_id = service.get('ID') log.debug('Created service: %s (%s)', kwopts['name'], service_id) return DockerService.from_id(self, service_id)
[docs] def service_inspect(self, service_id): return self._client.inspect_service(service_id)
[docs] def service_ls(self, id=None, name=None): return self._client.services(filters=self._filter_by_id_or_name(id, name))
# roughly `docker service ps`
[docs] def service_ps(self, service_id): return self.task_ls(filters={'service': service_id})
[docs] def service_rm(self, service_ids): r = [] for service_id in service_ids: self._client.remove_service(service_id) r.append(service_id) return r
[docs] def node_inspect(self, node_id): return self._client.inspect_node(node_id)
[docs] def node_ls(self, id=None, name=None): return self._client.nodes(filters=self._filter_by_id_or_name(id, name))
# roughly `docker node ps`
[docs] def node_ps(self, node_id): return self.task_ls(filters={'node': node_id})
[docs] def node_update(self, node_id, **kwopts): node = DockerNode.from_id(self, node_id) spec = node.inspect['Spec'] if 'label_add' in kwopts: kwopts['labels'] = spec.get('Labels', {}) kwopts['labels'].update(kwopts.pop('label_add')) spec.update(self._create_docker_api_spec('node_spec', dict, kwopts)) return self._client.update_node(node.id, node.version, node_spec=spec)
[docs] def task_inspect(self, task_id): return self._client.inspect_task(task_id)
[docs] def task_ls(self, filters=None): return self._client.tasks(filters=filters)