Warning
This document is for an old release of Galaxy. You can alternatively view this page in the latest release if it exists or view the top of the latest release's documentation.
Source code for galaxy.containers.docker_swarm
"""
Docker Swarm mode interface
"""
import logging
import os.path
import subprocess
from functools import partial
from typing import Any, Dict, Optional
try:
import docker.types
except ImportError:
from galaxy.util.bunch import Bunch
docker = Bunch(types=Bunch(
ContainerSpec=None,
RestartPolicy=None,
Resources=None,
Placement=None,
))
from galaxy.containers.docker import (
DockerAPIInterface,
DockerCLIInterface,
DockerInterface
)
from galaxy.containers.docker_decorators import docker_columns, docker_json
from galaxy.containers.docker_model import (
CPUS_CONSTRAINT,
DockerNode,
DockerService,
DockerTask,
IMAGE_CONSTRAINT
)
from galaxy.exceptions import ContainerRunError
from galaxy.util import unicodify
from galaxy.util.json import safe_dumps_formatted
# Module-level logger, named after this module.
log = logging.getLogger(__name__)

# Absolute path of ``scripts/docker_swarm_manager.py``, located three
# directory levels above the directory that contains this module.
SWARM_MANAGER_PATH = os.path.abspath(
    os.path.join(
        os.path.dirname(__file__),
        *([os.path.pardir] * 3),
        'scripts',
        'docker_swarm_manager.py',
    )
)
class DockerSwarmInterface(DockerInterface):
    """Shared behavior for container interfaces that target Docker swarm mode.

    Containers are run as single-replica swarm services. The docker
    subcommand methods used here (``service_create``, ``service_ls``,
    ``service_rm``, ``node_ls``, ...) are supplied by the concrete CLI/API
    subclasses.
    """

    container_class = DockerService
    conf_defaults: Dict[str, Optional[Any]] = {
        'ignore_volumes': False,
        'node_prefix': None,
        'service_create_image_constraint': False,
        'service_create_cpus_constraint': False,
        'resolve_image_digest': False,
        'managed': True,
        'manager_autostart': True,
    }
    publish_port_list_required = True
    supports_volumes = False

    def validate_config(self):
        """Validate the configuration and cache the configured node prefix."""
        super().validate_config()
        self._node_prefix = self._conf.node_prefix

    def run_in_container(self, command, image=None, **kwopts):
        """Run a service like a detached container

        :param command: command to run in the service
        :param image:   image id or name, passed through to ``service_create``
        :param kwopts:  service options; ``replicas`` and
                        ``restart_condition`` are always overridden
        :returns:       whatever ``service_create`` returns
        :raises ContainerRunError: if ``publish_all_ports`` is requested
            without ``publish_port_random`` or ``ports``, if an attached
            container is requested, or if ``volumes`` are given and
            ``ignore_volumes`` is not set in the configuration
        """
        kwopts['replicas'] = 1
        kwopts['restart_condition'] = 'none'
        if kwopts.get('publish_all_ports', False):
            # not supported for services
            # TODO: inspect image (or query registry if possible) for port list
            if kwopts.get('publish_port_random', False) or kwopts.get('ports', False):
                # assume this covers for publish_all_ports
                del kwopts['publish_all_ports']
            else:
                raise ContainerRunError(
                    "Publishing all ports is not supported in Docker swarm"
                    " mode, use `publish_port_random` or `ports`",
                    image=image,
                    command=command
                )
        if not kwopts.get('detach', True):
            raise ContainerRunError(
                "Running attached containers is not supported in Docker swarm mode",
                image=image,
                command=command
            )
        elif kwopts.get('detach', None):
            del kwopts['detach']
        volumes = kwopts.get('volumes', None)
        if volumes:
            if self._conf.ignore_volumes:
                # lazy logging args: also safe if ``volumes`` is a tuple
                log.warning(
                    "'volumes' kwopt is set and not supported in Docker swarm "
                    "mode, volumes will not be passed (set 'ignore_volumes: "
                    "False' in containers config to fail instead): %s", volumes
                )
            else:
                raise ContainerRunError(
                    "'volumes' kwopt is set and not supported in Docker swarm "
                    "mode (set 'ignore_volumes: True' in containers config to "
                    "warn instead): %s" % (volumes,),
                    image=image,
                    command=command
                )
        # ensure the volumes key is removed from kwopts
        kwopts.pop('volumes', None)
        service = self.service_create(command, image=image, **kwopts)
        self._run_swarm_manager()
        return service

    #
    # helpers
    #

    def _run_swarm_manager(self):
        """Launch the swarm manager script if the swarm is managed and
        manager autostart is enabled.

        Launch failures are logged, not raised: by the time this runs the
        service has already been created.
        """
        if not (self._conf.managed and self._conf.manager_autostart):
            return
        # sys.executable would be preferable to using $PATH, but sys.executable is probably uwsgi
        cmd = ['python', SWARM_MANAGER_PATH, '--containers-config-file',
               self.containers_config_file, '--swarm', self.key]
        try:
            subprocess.check_call(cmd)
        except (subprocess.CalledProcessError, OSError) as exc:
            # OSError covers the case where `python` cannot be found/executed on $PATH
            log.error('Failed to launch swarm manager: %s', unicodify(exc))

    def _get_image(self, image):
        """Get the image string, either from the argument, or from the
        configured interface default if ``image`` is ``None``. Optionally
        resolve the image to its digest if ``resolve_image_digest`` is set in
        the interface configuration.

        If the image has not been pulled, the repo digest cannot be determined
        and the image name will be returned.

        :type image:    str or None
        :param image:   image id or name
        :returns:       image name or image repo digest
        :raises AssertionError: if no image is given and none is configured
        """
        if not image:
            image = self._conf.image
            if image is None:
                # raised explicitly so the check survives `python -O`
                raise AssertionError(
                    "No image supplied as parameter and no image set as default in config, cannot create service"
                )
        if self._conf.resolve_image_digest:
            image = self.image_repodigest(image)
        return image

    def _objects_by_attribute(self, generator, attribute_name):
        """Group the objects yielded by ``generator`` into a dict of lists
        keyed on the value of each object's ``attribute_name`` attribute.
        """
        grouped = {}
        for obj in generator:
            grouped.setdefault(getattr(obj, attribute_name), []).append(obj)
        return grouped

    #
    # docker object generators
    #

    def services(self, id=None, name=None):
        """Yield a ``DockerService`` for each listed service whose name
        starts with this interface's name prefix.
        """
        for service_dict in self.service_ls(id=id, name=name):
            service = DockerService(self, service_dict['ID'], inspect=service_dict)
            if service.name.startswith(self._name_prefix):
                yield service

    def service(self, id=None, name=None):
        """Return the first matching service, or ``None`` if there is none."""
        return next(self.services(id=id, name=name), None)

    def services_in_state(self, desired, current, tasks='any'):
        """Yield the services for which ``in_state(desired, current, tasks=tasks)`` is true."""
        for service in self.services():
            if service.in_state(desired, current, tasks=tasks):
                yield service

    def service_tasks(self, service):
        """Yield a ``DockerTask`` for each task listed for ``service``."""
        for task_dict in self.service_ps(service.id):
            yield DockerTask.from_api(self, task_dict, service=service)

    def nodes(self, id=None, name=None):
        """Yield a ``DockerNode`` for each listed node; if a node prefix is
        configured, nodes whose name does not start with it are skipped.
        """
        for node_dict in self.node_ls(id=id, name=name):
            node = DockerNode(self, node_dict['ID'], inspect=node_dict)
            if self._node_prefix and not node.name.startswith(self._node_prefix):
                continue
            yield node

    def node(self, id=None, name=None):
        """Return the first matching node, or ``None`` if there is none."""
        return next(self.nodes(id=id, name=name), None)

    def nodes_in_state(self, status, availability):
        """Yield the nodes for which ``in_state(status, availability)`` is true."""
        for node in self.nodes():
            if node.in_state(status, availability):
                yield node

    def node_tasks(self, node):
        """Yield a ``DockerTask`` for each task listed for ``node``."""
        for task_dict in self.node_ps(node.id):
            yield DockerTask.from_api(self, task_dict, node=node)

    #
    # higher level queries
    #

    # NOTE(review): services_waiting(), services_terminal() and nodes_active()
    # are called below but are not defined in the portion of this class that is
    # visible here; presumably they are provided elsewhere -- confirm.

    def services_waiting_by_constraints(self):
        """Return the waiting services grouped by their ``constraints`` attribute."""
        return self._objects_by_attribute(self.services_waiting(), 'constraints')

    def services_completed(self):
        """Return a generator of services whose tasks are all in the desired
        state ``Shutdown`` and current state ``Complete``.
        """
        return self.services_in_state('Shutdown', 'Complete', tasks='all')

    def nodes_active_by_constraints(self):
        """Return the active nodes grouped by their ``labels_as_constraints`` attribute."""
        return self._objects_by_attribute(self.nodes_active(), 'labels_as_constraints')

    #
    # operations
    #

    def services_clean(self):
        """Remove completed services and services in a terminal state.

        :returns: an iterator over the services whose id was reported as
                  removed by ``service_rm``
        """
        cleaned_service_ids = set()
        # services_completed() returns a generator, so materialize it before reuse
        completed_services = list(self.services_completed())
        if completed_services:
            cleaned_service_ids.update(self.service_rm([x.id for x in completed_services]))
        terminal_services = list(self.services_terminal())
        for service in terminal_services:
            log.warning('cleaned service in abnormal terminal state: %s (%s). state: %s', service.name, service.id, service.state)
        if terminal_services:
            cleaned_service_ids.update(self.service_rm([x.id for x in terminal_services]))
        return filter(lambda x: x.id in cleaned_service_ids, completed_services + terminal_services)
class DockerSwarmCLIInterface(DockerSwarmInterface, DockerCLIInterface):
    """Docker swarm mode interface that drives the ``docker`` command line
    through ``_run_docker``.
    """

    container_type = 'docker_swarm_cli'
    # maps kwopt names to the command line flag and value type used to render them
    option_map = {
        # `service create` options
        'constraint': {'flag': '--constraint', 'type': 'list_of_kovtrips'},
        'replicas': {'flag': '--replicas', 'type': 'string'},
        'restart_condition': {'flag': '--restart-condition', 'type': 'string'},
        'environment': {'flag': '--env', 'type': 'list_of_kvpairs'},
        'name': {'flag': '--name', 'type': 'string'},
        'publish_port_random': {'flag': '--publish', 'type': 'string'},
        'cpu_limit': {'flag': '--limit-cpu', 'type': 'string'},
        'mem_limit': {'flag': '--limit-memory', 'type': 'string'},
        'cpu_reservation': {'flag': '--reserve-cpu', 'type': 'string'},
        'mem_reservation': {'flag': '--reserve-memory', 'type': 'string'},
        # `service update` options
        'label_add': {'flag': '--label-add', 'type': 'list_of_kvpairs'},
        'label_rm': {'flag': '--label-rm', 'type': 'list_of_kvpairs'},
        'availability': {'flag': '--availability', 'type': 'string'},
    }

    #
    # docker object generators
    #

    def services(self, id=None, name=None):
        """Yield a ``DockerService`` (built from CLI output) for each listed
        service whose name starts with this interface's name prefix.
        """
        for service_dict in self.service_ls(id=id, name=name):
            if not service_dict['NAME'].startswith(self._name_prefix):
                continue
            task_list = self.service_ps(service_dict['ID'])
            yield DockerService.from_cli(self, service_dict, task_list)

    def service_tasks(self, service):
        """Yield a ``DockerTask`` for each current task of ``service``."""
        for task_dict in self.service_ps(service.id):
            if task_dict['NAME'].strip().startswith(r'\_'):
                continue  # historical task
            yield DockerTask.from_cli(self, task_dict, service=service)

    def nodes(self, id=None, name=None):
        """Yield a ``DockerNode`` (built from CLI output) for each listed
        node; if a node prefix is configured, nodes whose hostname does not
        start with it are skipped.
        """
        for node_dict in self.node_ls(id=id, name=name):
            # strip padding and the `*` marker from the ID column
            node_id = node_dict['ID'].strip(' *')
            if self._node_prefix and not node_dict['HOSTNAME'].startswith(self._node_prefix):
                continue
            # only pass along tasks whose name carries this interface's name prefix
            task_list = filter(lambda x: x['NAME'].startswith(self._name_prefix), self.node_ps(node_id))
            yield DockerNode.from_cli(self, node_dict, task_list)

    #
    # docker subcommands
    #

    def service_create(self, command, image=None, **kwopts):
        """Create a service with ``docker service create``.

        :param command: command to run in the service, may be empty
        :param image:   image id or name; the configured default is used if unset
        :param kwopts:  options rendered to flags via ``option_map``
        :returns:       ``DockerService`` built from the id printed by docker
        """
        if ('service_create_image_constraint' in self._conf or 'service_create_cpus_constraint' in self._conf) and 'constraint' not in kwopts:
            kwopts['constraint'] = []
        image = self._get_image(image)
        if self._conf.service_create_image_constraint:
            kwopts['constraint'].append((IMAGE_CONSTRAINT, '==', image))
        if self._conf.service_create_cpus_constraint:
            # NOTE(review): 'reserve_cpus'/'limit_cpus' are not keys of option_map
            # (which uses 'cpu_reservation'/'cpu_limit'), so this looks like it
            # always falls back to '1' -- confirm the intended lookup.
            cpus = kwopts.get('reserve_cpus', kwopts.get('limit_cpus', '1'))
            kwopts['constraint'].append((CPUS_CONSTRAINT, '==', cpus))
        if self._conf.cpus:
            kwopts['cpu_limit'] = self._conf.cpus
            kwopts['cpu_reservation'] = self._conf.cpus
        if self._conf.memory:
            kwopts['mem_limit'] = self._conf.memory
            kwopts['mem_reservation'] = self._conf.memory
        self.set_kwopts_name(kwopts)
        args = '{kwopts} {image} {command}'.format(
            kwopts=self._stringify_kwopts(kwopts),
            image=image if image else '',
            command=command if command else '',
        ).strip()
        service_id = self._run_docker(subcommand='service create', args=args, verbose=True)
        return DockerService.from_id(self, service_id)

    # NOTE(review): the `[0]` in the *_inspect methods below is applied to the
    # return value of _run_docker before @docker_json sees it; whether that is
    # correct depends on what _run_docker returns -- confirm.

    @docker_json
    def service_inspect(self, service_id):
        """Run ``docker service inspect`` for ``service_id``."""
        return self._run_docker(subcommand='service inspect', args=service_id)[0]

    @docker_columns
    def service_ls(self, id=None, name=None):
        """Run ``docker service ls``, filtered by id or name."""
        return self._run_docker(subcommand='service ls', args=self._filter_by_id_or_name(id, name))

    @docker_columns
    def service_ps(self, service_id):
        """Run ``docker service ps --no-trunc`` for ``service_id``."""
        return self._run_docker(subcommand='service ps', args=f'--no-trunc {service_id}')

    def service_rm(self, service_ids):
        """Remove services with ``docker service rm``.

        :param service_ids: iterable of service ids
        :returns:           list of the lines printed by docker; an empty list
                            if ``service_ids`` is empty
        """
        service_ids = list(service_ids)
        if not service_ids:
            # nothing to remove: avoid invoking `docker service rm` with no
            # arguments, matching the API interface's behavior for empty input
            return []
        return self._run_docker(subcommand='service rm', args=' '.join(service_ids)).splitlines()

    @docker_json
    def node_inspect(self, node_id):
        """Run ``docker node inspect`` for ``node_id``."""
        return self._run_docker(subcommand='node inspect', args=node_id)[0]

    @docker_columns
    def node_ls(self, id=None, name=None):
        """Run ``docker node ls``, filtered by id or name."""
        return self._run_docker(subcommand='node ls', args=self._filter_by_id_or_name(id, name))

    @docker_columns
    def node_ps(self, node_id):
        """Run ``docker node ps --no-trunc`` for ``node_id``."""
        return self._run_docker(subcommand='node ps', args=f'--no-trunc {node_id}')

    def node_update(self, node_id, **kwopts):
        """Run ``docker node update`` on ``node_id`` with flags rendered from ``kwopts``."""
        return self._run_docker(subcommand='node update', args='{kwopts} {node_id}'.format(
            kwopts=self._stringify_kwopts(kwopts),
            node_id=node_id
        ))

    @docker_json
    def task_inspect(self, task_id):
        """Run ``docker inspect`` for ``task_id``."""
        return self._run_docker(subcommand="inspect", args=task_id)
class DockerSwarmAPIInterface(DockerSwarmInterface, DockerAPIInterface):
    """Docker swarm mode interface that talks to the Docker API through
    ``self._client``.

    The ``*_option_map`` attributes describe how kwopts are translated into
    the parameters of the corresponding spec objects by
    ``_create_docker_api_spec``.
    """

    container_type = 'docker_swarm'
    placement_option_map = {
        'constraint': {'param': 'constraints'},
    }
    service_mode_option_map = {
        'service_mode': {'param': 0, 'default': 'replicated'},
        'replicas': {'default': 1},
    }
    endpoint_spec_option_map: Dict[str, Dict] = {
        'ports': {},
    }
    resources_option_map = {
        # scaled by 1e9 (docker-py Resources takes CPU values in units of 1e-9
        # CPUs); float() first so a string value from the config is converted
        # numerically instead of being repeated a billion times
        'cpus': {'params': ('cpu_limit', 'cpu_reservation'), 'map': lambda x: int(float(x) * 1000000000)},
        'memory': {'params': ('mem_limit', 'mem_reservation')},
    }
    container_spec_option_map = {
        'image': {'param': 0},
        'command': {},
        'environment': {'param': 'env'},
        'labels': {},
    }
    restart_policy_option_map = {
        'restart_condition': {'param': 'condition', 'default': 'none'},
        'restart_delay': {'param': 'delay'},
        # docker.types.RestartPolicy names this parameter `max_attempts`
        'restart_max_attempts': {'param': 'max_attempts'},
    }
    task_template_option_map = {
        '_container_spec': {'spec_class': docker.types.ContainerSpec, 'required': True},
        '_resources': {'spec_class': docker.types.Resources},
        '_restart_policy': {'spec_class': docker.types.RestartPolicy},
        '_placement': {'spec_class': docker.types.Placement},
    }
    node_spec_option_map = {
        'availability': {'param': 'Availability'},
        'name': {'param': 'Name'},
        'role': {'param': 'Role'},
        'labels': {'param': 'Labels'},
    }

    @staticmethod
    def create_random_port_spec(port):
        """Return a TCP port spec dict targeting ``port`` with no fixed
        published port.
        """
        return {
            'Protocol': 'tcp',
            'PublishedPort': None,
            'TargetPort': port,
        }

    #
    # docker subcommands
    #

    def service_create(self, command, image=None, **kwopts):
        """Create a service through the Docker API.

        :param command: command for the service
        :param image:   image id or name; the configured default is used if unset
        :param kwopts:  service options, translated into docker API specs
        :returns:       ``DockerService`` for the id of the created service
        """
        # TODO: some of this should probably move to run_in_container when the CLI interface is removed
        # NOTE(review): `command` is only logged here and never placed in
        # kwopts, although container_spec_option_map has a 'command' entry --
        # confirm whether it is expected to be supplied by the caller.
        log.debug("Creating docker service with image '%s' for command: %s", image, command)
        # insert run kwopts from config
        for opt in self.conf_run_kwopts:
            if self._conf[opt]:
                kwopts[opt] = self._conf[opt]
        # image is part of the container spec; resolve it once so the image
        # constraint below uses the effective image rather than a possibly
        # unset (None) argument
        image = self._get_image(image)
        kwopts['image'] = image
        # service constraints
        kwopts['constraint'] = kwopts.get('constraint', [])
        if self._conf.service_create_image_constraint:
            kwopts['constraint'].append(f"{IMAGE_CONSTRAINT}=={image}")
        if self._conf.service_create_cpus_constraint:
            # NOTE(review): 'reserve_cpus'/'limit_cpus' do not appear in any
            # option map of this class (resources use 'cpus'), so this looks like
            # it always falls back to '1' -- confirm the intended lookup.
            cpus = kwopts.get('reserve_cpus', kwopts.get('limit_cpus', '1'))
            kwopts['constraint'].append(f"{CPUS_CONSTRAINT}=={cpus}")
        # ports
        if 'publish_port_random' in kwopts:
            kwopts['ports'] = [DockerSwarmAPIInterface.create_random_port_spec(kwopts.pop('publish_port_random'))]
        # create specs
        service_mode = self._create_docker_api_spec('service_mode', docker.types.ServiceMode, kwopts)
        endpoint_spec = self._create_docker_api_spec('endpoint_spec', docker.types.EndpointSpec, kwopts)
        task_template = self._create_docker_api_spec('task_template', docker.types.TaskTemplate, kwopts)
        self.set_kwopts_name(kwopts)
        log.debug("Docker service task template:\n%s", safe_dumps_formatted(task_template))
        log.debug("Docker service endpoint specification:\n%s", safe_dumps_formatted(endpoint_spec))
        log.debug("Docker service mode:\n%s", safe_dumps_formatted(service_mode))
        log.debug("Docker service creation parameters:\n%s", safe_dumps_formatted(kwopts))
        success_test = partial(self._first, self.service_ls, name=kwopts['name'])
        # this can raise exceptions, if necessary we could wrap them in a more generic "creation failed" exception class
        service = self._client.create_service(
            task_template,
            mode=service_mode,
            endpoint_spec=endpoint_spec,
            success_test=success_test,
            max_tries=5,
            **kwopts)
        service_id = service.get('ID')
        log.debug('Created service: %s (%s)', kwopts['name'], service_id)
        return DockerService.from_id(self, service_id)

    def service_ls(self, id=None, name=None):
        """List services through the API client, filtered by id or name."""
        return self._client.services(filters=self._filter_by_id_or_name(id, name))

    # roughly `docker service ps`

    def service_rm(self, service_ids):
        """Remove each service in ``service_ids`` through the API client.

        :returns: list of the ids passed to the client for removal
        """
        removed = []
        for service_id in service_ids:
            self._client.remove_service(service_id)
            removed.append(service_id)
        return removed

    def node_ls(self, id=None, name=None):
        """List nodes through the API client, filtered by id or name."""
        return self._client.nodes(filters=self._filter_by_id_or_name(id, name))

    # roughly `docker node ps`

    def node_update(self, node_id, **kwopts):
        """Update the spec of node ``node_id``.

        ``label_add`` (a dict) is merged into the node's existing labels;
        the remaining kwopts are translated via ``node_spec_option_map``.
        """
        node = DockerNode.from_id(self, node_id)
        spec = node.inspect['Spec']
        if 'label_add' in kwopts:
            # `or {}` also covers a node whose Labels value is present but None
            kwopts['labels'] = spec.get('Labels') or {}
            kwopts['labels'].update(kwopts.pop('label_add'))
        spec.update(self._create_docker_api_spec('node_spec', dict, kwopts))
        return self._client.update_node(node.id, node.version, node_spec=spec)