Warning

This document is for an in-development version of Galaxy. You can alternatively view this page in the latest release if it exists or view the top of the latest release's documentation.

Source code for galaxy.containers.docker_model

"""
Model objects for docker objects
"""
from __future__ import absolute_import

import logging

try:
    import docker
except ImportError:
    from galaxy.util.bunch import Bunch
    docker = Bunch(errors=Bunch(NotFound=None))
from six.moves import shlex_quote

from galaxy.containers import (
    Container,
    ContainerPort,
    ContainerVolume
)
from galaxy.util import (
    pretty_print_time_interval,
    unicodify,
)


# Names of the node labels used to tag swarm nodes; DockerService.set_cpus() and
# DockerService.set_image() pass these as the constraint name.
CPUS_LABEL = '_galaxy_cpus'
IMAGE_LABEL = '_galaxy_image'
# Fully qualified constraint names: the label names above prefixed with
# 'node.labels.', the same prefix DockerServiceConstraint.label strips off.
CPUS_CONSTRAINT = 'node.labels.' + CPUS_LABEL
IMAGE_CONSTRAINT = 'node.labels.' + IMAGE_LABEL

# Module-level logger shared by all model classes in this file.
log = logging.getLogger(__name__)


class DockerAttributeContainer(object):
    """Collection of attribute objects that can be looked up by ``name``.

    Members are stored as given (any iterable; an empty set by default).
    Equality compares the members as a frozenset, so members must be hashable
    for ``==`` and the ``members`` property to work.
    """

    def __init__(self, members=None):
        """Store ``members``, defaulting to a new empty set."""
        if members is None:
            members = set()
        self._members = members

    def __eq__(self, other):
        return self.members == other.members

    def __ne__(self, other):
        return not self.__eq__(other)

    def __hash__(self):
        # Sort the reprs so the hash does not depend on member ordering.
        return hash(tuple(sorted([repr(x) for x in self._members])))

    def __str__(self):
        return ', '.join([str(x) for x in self._members]) or 'None'

    def __iter__(self):
        return iter(self._members)

    def __getitem__(self, name):
        """Return the first member whose ``name`` equals ``name``.

        :raises KeyError: if no member has that name.
        """
        for member in self._members:
            if member.name == name:
                return member
        raise KeyError(name)

    def __contains__(self, item):
        return item in self._members

    @property
    def members(self):
        """An immutable snapshot (frozenset) of the members."""
        return frozenset(self._members)

    def hash(self):
        """Return ``__hash__()`` as a lowercase hexadecimal string.

        Negative hashes are mapped to their 64-bit two's complement so the
        result contains only hex digits (``hex(h)[2:]`` would produce a string
        starting with ``x`` for a negative ``h``). Non-negative hashes render
        exactly as before.

        NOTE(review): the value derives from ``hash()`` of strings, which is
        randomized per process on Python 3 unless PYTHONHASHSEED is fixed -
        presumably callers do not persist it; confirm.
        """
        return '%x' % (self.__hash__() & 0xFFFFFFFFFFFFFFFF)

    def get(self, name, default=None):
        """Return the member named ``name``, or ``default`` if there is none."""
        try:
            return self[name]
        except KeyError:
            return default
class DockerVolume(ContainerVolume):
    """A container volume expressed in Docker's ``host_path:path:mode`` terms."""

    @classmethod
    def from_str(cls, as_str):
        """Construct an instance from a string as would be passed to `docker run --volume`.

        A string in the format ``<host_path>:<mode>`` is supported for legacy purposes even though it is not valid
        Docker volume syntax.
        """
        if not as_str:
            raise ValueError("Failed to parse Docker volume from %s" % as_str)
        # maxsplit=2 means there are always between one and three fields
        fields = as_str.split(":", 2)
        field_count = len(fields)
        kwds = {"host_path": fields[0]}
        if field_count == 3:
            kwds["path"] = fields[1]
            kwds["mode"] = fields[2]
        elif field_count == 2:
            # /host_path:mode is not (or is no longer?) valid Docker volume syntax
            second = fields[1]
            if second in DockerVolume.valid_modes:
                kwds["mode"] = second
                kwds["path"] = kwds["host_path"]
            else:
                kwds["path"] = second
        else:
            # auto-generated volume
            kwds["path"] = kwds["host_path"]
        return cls(**kwds)

    def __str__(self):
        """Render the volume as a quoted argument for a shell command line."""
        present = [part for part in (self.host_path, self.path, self.mode) if part is not None]
        volume_str = ":".join(present)
        if "$" in volume_str:
            # e.g. $_GALAXY_JOB_TMP_DIR:$_GALAXY_JOB_TMP_DIR:rw so don't single quote.
            return '"%s"' % volume_str
        return shlex_quote(volume_str)

    def to_native(self):
        """Return ``(path, {host_path: {'bind': path, 'mode': mode}})``.

        ``host_path`` falls back to ``path`` when it is not set.
        """
        source = self.host_path or self.path
        bind_spec = {'bind': self.path, 'mode': self.mode}
        return (self.path, {source: bind_spec})
class DockerContainer(Container):
    """A single Docker container, backed by (lazily fetched) ``docker inspect`` data."""

    def __init__(self, interface, id, name=None, inspect=None):
        """
        :param interface: object providing ``inspect(id)`` and a ``host`` attribute
        :param id:        container id
        :param name:      optional container name
        :param inspect:   optional pre-fetched inspect dictionary for the container
        """
        super(DockerContainer, self).__init__(interface, id, name=name)
        self._inspect = inspect

    @classmethod
    def from_id(cls, interface, id):
        """Build an instance by inspecting container ``id`` through ``interface``."""
        inspect = interface.inspect(id)
        return cls(interface, id, name=inspect['Name'], inspect=inspect)

    @property
    def ports(self):
        """List of ``ContainerPort`` objects, one per published host binding.

        Returns ``None`` (after logging a warning) if the inspect data has no
        ``['NetworkSettings']['Ports']`` entry. Ports without any host binding
        contribute nothing to the list.
        """
        # {
        #     "NetworkSettings" : {
        #         "Ports" : {
        #             "3306/tcp" : [
        #                 {
        #                     "HostIp" : "127.0.0.1",
        #                     "HostPort" : "3306"
        #                 }
        #             ]
        rval = []
        try:
            port_mappings = self.inspect['NetworkSettings']['Ports']
        except KeyError:
            # The format string must have exactly one placeholder per argument,
            # otherwise logging cannot format the record; the exception itself
            # is attached through exc_info.
            log.warning("Failed to get ports for container %s from `docker inspect` output at "
                        "['NetworkSettings']['Ports']", self.id, exc_info=True)
            return None
        for port_name in port_mappings or {}:
            name_parts = port_name.split('/')
            # Docker reports null instead of a list for a port that is exposed
            # but not published.
            for binding in port_mappings[port_name] or []:
                rval.append(ContainerPort(
                    int(name_parts[0]),
                    name_parts[1],
                    self.address,
                    int(binding['HostPort']),
                ))
        return rval

    @property
    def address(self):
        """Host name taken from the interface's ``tcp://`` host, else ``'localhost'``."""
        if self._interface.host and self._interface.host.startswith('tcp://'):
            return self._interface.host.replace('tcp://', '').split(':', 1)[0]
        else:
            return 'localhost'

    def is_ready(self):
        """Return the ``['State']['Running']`` value from the inspect data."""
        return self.inspect['State']['Running']

    def __eq__(self, other):
        return self._id == other.id

    def __ne__(self, other):
        return not self.__eq__(other)

    def __hash__(self):
        return hash(self._id)

    @property
    def inspect(self):
        """Inspect dictionary for this container, fetched on first use and cached."""
        if not self._inspect:
            self._inspect = self._interface.inspect(self._id)
        return self._inspect
class DockerService(Container):
    """A Docker service together with the tasks that belong to it."""

    def __init__(self, interface, id, name=None, image=None, inspect=None):
        """
        :param interface: object providing ``service_inspect``, ``service_tasks``,
                          ``service_constraint_add`` and a ``host`` attribute
        :param id:        service id
        :param name:      optional service name (falls back to the inspect data)
        :param image:     optional image name (falls back to the inspect data)
        :param inspect:   optional pre-fetched service inspect dictionary
        """
        super(DockerService, self).__init__(interface, id, name=name)
        self._image = image
        self._inspect = inspect
        self._env = {}
        self._tasks = []
        if inspect:
            self._name = name or inspect['Spec']['Name']
            self._image = image or inspect['Spec']['TaskTemplate']['ContainerSpec']['Image']

    @classmethod
    def from_cli(cls, interface, s, task_list):
        """Build a service from CLI-style dictionaries (keys ``ID``, ``NAME``, ``IMAGE``)."""
        service = cls(interface, s['ID'], name=s['NAME'], image=s['IMAGE'])
        for task_dict in task_list:
            if task_dict['NAME'].strip().startswith(r'\_'):
                continue  # historical task
            service.task_add(DockerTask.from_cli(interface, task_dict, service=service))
        return service

    @classmethod
    def from_id(cls, interface, id):
        """Build a service by inspecting ``id`` and loading its tasks through ``interface``."""
        inspect = interface.service_inspect(id)
        service = cls(interface, id, inspect=inspect)
        for task in interface.service_tasks(service):
            service.task_add(task)
        return service

    @property
    def ports(self):
        """List of ``ContainerPort`` objects for the service's published ports.

        Returns ``None`` (after logging a warning) if the inspect data has no
        ``['Endpoint']['Ports']`` entry.
        """
        # {
        #     "Endpoint": {
        #         "Ports": [
        #             {
        #                 "Protocol": "tcp",
        #                 "TargetPort": 8888,
        #                 "PublishedPort": 30000,
        #                 "PublishMode": "ingress"
        #             }
        #         ]
        rval = []
        try:
            port_mappings = self.inspect['Endpoint']['Ports']
        except (IndexError, KeyError):
            # One placeholder per argument; the exception is attached via exc_info.
            log.warning("Failed to get ports for container %s from `docker service inspect` output at "
                        "['Endpoint']['Ports']", self.id, exc_info=True)
            return None
        for binding in port_mappings:
            rval.append(ContainerPort(
                binding['TargetPort'],
                binding['Protocol'],
                self.address,  # use the routing mesh
                binding['PublishedPort']
            ))
        return rval

    @property
    def address(self):
        """Host name taken from the interface's ``tcp://`` host, else ``'localhost'``."""
        if self._interface.host and self._interface.host.startswith('tcp://'):
            return self._interface.host.replace('tcp://', '').split(':', 1)[0]
        else:
            return 'localhost'

    def is_ready(self):
        """True if any task is desired-running and currently running."""
        return self.in_state('Running', 'Running')

    def __eq__(self, other):
        return self._id == other.id

    def __ne__(self, other):
        return not self.__eq__(other)

    def __hash__(self):
        return hash(self._id)

    def task_add(self, task):
        """Append ``task`` to this service's task list."""
        self._tasks.append(task)

    @property
    def inspect(self):
        """Service inspect dictionary, fetched on first use and cached."""
        if not self._inspect:
            self._inspect = self._interface.service_inspect(self._id)
        return self._inspect

    @property
    def state(self):
        """If one of this service's tasks desired state is running, return that task state, otherwise, return the
        state of a non-running task.

        This is imperfect because it doesn't attempt to provide useful information for replicas > 1 tasks, but it suits
        our purposes for now.
        """
        state = None
        for task in self.tasks:
            state = task.state
            if task.desired_state == 'running':
                break
        return state

    @property
    def env(self):
        """Dictionary of the service's container environment, parsed from the inspect data.

        Entries without an ``=`` are stored with a value of ``None``.
        """
        if not self._env:
            try:
                for env_str in self.inspect['Spec']['TaskTemplate']['ContainerSpec']['Env']:
                    try:
                        self._env.update([env_str.split('=', 1)])
                    except ValueError:
                        # no '=' in the entry, so there is no value to split off
                        self._env[env_str] = None
            except KeyError as exc:
                log.debug('Cannot retrieve container environment: KeyError: %s', unicodify(exc))
        return self._env

    @property
    def terminal(self):
        """Same caveats as :meth:`state`.
        """
        for task in self.tasks:
            if task.desired_state == 'running':
                return False
        return True

    @property
    def node(self):
        """Same caveats as :meth:`state`.
        """
        for task in self.tasks:
            if task.node is not None:
                return task.node
        return None

    @property
    def image(self):
        """Image name, read from the inspect data if it was not supplied."""
        if self._image is None:
            self._image = self.inspect['Spec']['TaskTemplate']['ContainerSpec']['Image']
        return self._image

    @property
    def cpus(self):
        """CPU limit converted from ``NanoCPUs`` (an int when whole); 0 if no limit is present."""
        try:
            cpus = self.inspect['Spec']['TaskTemplate']['Resources']['Limits']['NanoCPUs'] / 1000000000.0
            if cpus == int(cpus):
                cpus = int(cpus)
            return cpus
        except KeyError:
            return 0

    @property
    def constraints(self):
        """Placement constraints as a ``DockerServiceConstraints`` collection.

        A service with no (or a null) ``Placement`` / ``Constraints`` entry
        yields an empty collection.
        """
        placement = self.inspect['Spec']['TaskTemplate'].get('Placement') or {}
        constraints = placement.get('Constraints') or []
        return DockerServiceConstraints.from_constraint_string_list(constraints)

    @property
    def tasks(self):
        """A list of *all* tasks, including terminal ones.
        """
        if not self._tasks:
            self._tasks = []
            for task in self._interface.service_tasks(self):
                self.task_add(task)
        return self._tasks

    @property
    def task_count(self):
        """A count of *all* tasks, including terminal ones.
        """
        return len(self.tasks)

    def in_state(self, desired, current, tasks='any'):
        """Indicate if one of this service's tasks matches the desired state.

        With ``tasks='any'`` the result is True as soon as one task matches;
        with ``tasks='all'`` it is False as soon as one task does not match.
        A service with no tasks is False for ``'any'`` and True otherwise.
        """
        for task in self.tasks:
            if task.in_state(desired, current):
                if tasks == 'any':
                    # at least 1 task in desired state
                    return True
            elif tasks == 'all':
                # at least 1 task not in desired state
                return False
        else:
            return False if tasks == 'any' else True

    def constraint_add(self, name, op, value):
        """Add a placement constraint to this service through the interface."""
        self._interface.service_constraint_add(self.id, name, op, value)

    def set_cpus(self):
        """Add a ``CPUS_LABEL == <cpus>`` constraint to this service."""
        self.constraint_add(CPUS_LABEL, '==', self.cpus)

    def set_image(self):
        """Add an ``IMAGE_LABEL == <image>`` constraint to this service."""
        self.constraint_add(IMAGE_LABEL, '==', self.image)
class DockerServiceConstraint(object):
    """One placement constraint of the form ``<name><op><value>`` (op is ``==`` or ``!=``)."""

    def __init__(self, name=None, op=None, value=None):
        self._name = name
        self._op = op
        self._value = value

    def __eq__(self, other):
        return self._name == other._name and \
            self._op == other._op and \
            self._value == other._value

    def __ne__(self, other):
        return not self.__eq__(other)

    def __hash__(self):
        return hash((self._name, self._op, self._value))

    def __repr__(self):
        return '%s(%s%s%s)' % (self.__class__.__name__, self._name, self._op, self._value)

    def __str__(self):
        return '%s%s%s' % (self._name, self._op, self._value)

    @staticmethod
    def split_constraint_string(constraint_str):
        """Split ``constraint_str`` at its first ``==`` or ``!=`` operator.

        :returns: ``[name, op, value]`` with surrounding whitespace stripped
        :raises ValueError: if the string contains neither operator
        """
        constraint = (constraint_str, '', '')
        for op in '==', '!=':
            t = constraint_str.partition(op)
            # the shortest left-hand side belongs to the operator that appears first
            if len(t[0]) < len(constraint[0]):
                constraint = t
        if constraint[0] == constraint_str:
            raise ValueError('Unable to parse constraint string: %s' % constraint_str)
        return [x.strip() for x in constraint]

    @classmethod
    def from_str(cls, constraint_str):
        """Construct an instance by parsing ``constraint_str``."""
        name, op, value = DockerServiceConstraint.split_constraint_string(constraint_str)
        return cls(name=name, op=op, value=value)

    @property
    def name(self):
        return self._name

    @property
    def op(self):
        return self._op

    @property
    def value(self):
        return self._value

    @property
    def label(self):
        """The ``DockerNodeLabel`` corresponding to this constraint.

        Only the first ``node.labels.`` occurrence is removed, so a label name
        that itself contains that text is left intact.
        """
        return DockerNodeLabel(
            name=self.name.replace('node.labels.', '', 1),
            value=self.value
        )
class DockerServiceConstraints(DockerAttributeContainer):
    """Collection of ``DockerServiceConstraint`` objects."""

    member_class = DockerServiceConstraint

    @classmethod
    def from_constraint_string_list(cls, inspect):
        """Build a collection by parsing each constraint string in ``inspect``."""
        parsed = [cls.member_class.from_str(constraint_str) for constraint_str in inspect]
        return cls(members=parsed)

    @property
    def labels(self):
        """The members' labels gathered into a ``DockerNodeLabels`` collection."""
        node_labels = [constraint.label for constraint in self.members]
        return DockerNodeLabels(members=node_labels)
class DockerNode(object):
    """A Docker node, its labels and the tasks assigned to it."""

    def __init__(self, interface, id=None, name=None, status=None, availability=None, manager=False, inspect=None):
        """
        :param interface:    object providing ``node_inspect``, ``node_tasks`` and ``node_update``
        :param inspect:      optional pre-fetched node inspect dictionary; when given it supplies the
                             name, status and manager flag if those were not passed, and always
                             supplies the availability
        """
        if inspect:
            name = name or inspect['Description']['Hostname']
            status = status or inspect['Status']['State']
            # NOTE: unlike the other fields, availability from inspect replaces any passed value
            availability = inspect['Spec']['Availability']
            manager = manager or inspect['Spec']['Role'] == 'manager'
        self._interface = interface
        self._id = id
        self._name = name
        self._status = status
        self._availability = availability
        self._manager = manager
        self._inspect = inspect
        self._tasks = []

    @classmethod
    def from_cli(cls, interface, n, task_list):
        """Build a node (and its tasks) from CLI-style dictionaries."""
        node = cls(
            interface,
            id=n['ID'],
            name=n['HOSTNAME'],
            status=n['STATUS'],
            availability=n['AVAILABILITY'],
            manager=bool(n['MANAGER STATUS']),
        )
        for task_dict in task_list:
            cli_task = DockerTask.from_cli(interface, task_dict, node=node)
            node.task_add(cli_task)
        return node

    @classmethod
    def from_id(cls, interface, id):
        """Build a node by inspecting ``id`` and loading its tasks through ``interface``."""
        node = cls(interface, id, inspect=interface.node_inspect(id))
        for node_task in interface.node_tasks(node):
            node.task_add(node_task)
        return node

    def task_add(self, task):
        """Append ``task`` to this node's task list."""
        self._tasks.append(task)

    @property
    def id(self):
        return self._id

    @property
    def name(self):
        return self._name

    @property
    def version(self):
        """The node's ``['Version']['Index']``, inspected afresh on every access."""
        # this changes on update so don't cache
        fresh = self._interface.node_inspect(self._id or self._name)
        return fresh['Version']['Index']

    @property
    def inspect(self):
        """Node inspect dictionary, fetched on first use and cached."""
        if self._inspect:
            return self._inspect
        self._inspect = self._interface.node_inspect(self._id or self._name)
        return self._inspect

    @property
    def state(self):
        """Lowercased ``<status>-<availability>`` string."""
        combined = '%s-%s' % (self._status, self._availability)
        return combined.lower()

    @property
    def cpus(self):
        """CPU count converted from the node's ``NanoCPUs`` resource value."""
        nano_cpus = self.inspect['Description']['Resources']['NanoCPUs']
        return nano_cpus / 1000000000

    @property
    def labels(self):
        """The node's labels as a ``DockerNodeLabels`` collection (empty if there are none)."""
        label_dict = self.inspect['Spec'].get('Labels') or {}
        return DockerNodeLabels.from_label_dictionary(label_dict)

    def label_add(self, label, value):
        """Set label ``label`` to ``value`` on this node through the interface."""
        self._interface.node_update(self.id, label_add={label: value})

    @property
    def labels_as_constraints(self):
        """The node's labels expressed as a ``DockerServiceConstraints`` collection."""
        as_strings = [node_label.constraint_string for node_label in self.labels]
        return DockerServiceConstraints.from_constraint_string_list(as_strings)

    def set_labels_for_constraints(self, constraints):
        """Add every label implied by ``constraints`` that the node does not already have."""
        for label in self._constraints_to_label_args(constraints):
            if label in self.labels:
                continue
            log.info("setting node '%s' label '%s' to '%s'", self.name, label.name, label.value)
            self.label_add(label.name, label.value)

    def _constraints_to_label_args(self, constraints):
        """Map ``node.labels.* == value`` constraints to ``DockerNodeLabel`` objects; others are dropped."""
        def is_label_equality(constraint):
            return constraint.name.startswith('node.labels.') and constraint.op == '=='

        def to_label(constraint):
            return DockerNodeLabel(name=constraint.name.replace('node.labels.', '', 1), value=constraint.value)

        return map(to_label, filter(is_label_equality, constraints))

    @property
    def tasks(self):
        """A list of *all* tasks, including terminal ones.
        """
        if self._tasks:
            return self._tasks
        self._tasks = []
        for node_task in self._interface.node_tasks(self):
            self.task_add(node_task)
        return self._tasks

    @property
    def non_terminal_tasks(self):
        """Tasks that are not terminal and still have a service."""
        # ensure the task has a service - it is possible for "phantom" tasks to exist (service is removed, no
        # container is running, but the task still shows up in the node's task list)
        return [task for task in self.tasks if not task.terminal and task.service is not None]

    @property
    def task_count(self):
        """A count of *all* tasks, including terminal ones.
        """
        return len(self.tasks)

    def in_state(self, status, availability):
        """Case-insensitively compare the node's status and availability with the given values."""
        status_matches = self._status.lower() == status.lower()
        return status_matches and self._availability.lower() == availability.lower()

    def is_ok(self):
        """True if the node is ``Ready`` and ``Active``."""
        return self.in_state('Ready', 'Active')

    def is_managed(self):
        """True if the node is not a manager."""
        return not self._manager

    def destroyable(self):
        """True for a non-manager node that is ok and has no tasks."""
        return not self._manager and self.is_ok() and self.task_count == 0

    def drain(self):
        """Set the node's availability to ``drain`` through the interface."""
        self._interface.node_update(self.id, availability='drain')
class DockerNodeLabel(object):
    """A single ``name``/``value`` label on a Docker node."""

    def __init__(self, name=None, value=None):
        self._name, self._value = name, value

    def __eq__(self, other):
        return (self._name == other._name
                and self._value == other._value)

    def __ne__(self, other):
        return not self.__eq__(other)

    def __hash__(self):
        identity = (self._name, self._value)
        return hash(identity)

    def __repr__(self):
        class_name = self.__class__.__name__
        return '%s(%s: %s)' % (class_name, self._name, self._value)

    def __str__(self):
        return '%s: %s' % (self._name, self._value)

    @property
    def name(self):
        return self._name

    @property
    def value(self):
        return self._value

    @property
    def constraint_string(self):
        """The label written as a ``node.labels.<name>==<value>`` constraint string."""
        template = 'node.labels.{name}=={value}'
        return template.format(name=self.name, value=self.value)

    @property
    def constraint(self):
        """The label as an equality ``DockerServiceConstraint``."""
        qualified_name = 'node.labels.{name}'.format(name=self.name)
        return DockerServiceConstraint(name=qualified_name, op='==', value=self.value)
class DockerNodeLabels(DockerAttributeContainer):
    """Collection of ``DockerNodeLabel`` objects."""

    member_class = DockerNodeLabel

    @classmethod
    def from_label_dictionary(cls, inspect):
        """Build a collection with one label per key/value pair of ``inspect``."""
        built = [cls.member_class(name=key, value=val) for key, val in inspect.items()]
        return cls(members=built)

    @property
    def constraints(self):
        """The members' constraints gathered into a ``DockerServiceConstraints`` collection."""
        as_constraints = [node_label.constraint for node_label in self.members]
        return DockerServiceConstraints(members=as_constraints)
class DockerTask(object):
    """A task belonging to a Docker service and (optionally) assigned to a node."""

    # these are the possible *current* state terminal states
    terminal_states = (
        'shutdown',     # this is normally only a desired state but I've seen a task with it as current as well
        'complete',
        'failed',
        'rejected',
        'orphaned',
    )

    def __init__(self, interface, id=None, name=None, image=None, desired_state=None, state=None, error=None,
                 ports=None, service=None, node=None):
        """
        :param interface:     object providing ``task_inspect``, ``service`` and ``node`` lookups
        :param desired_state: the task's desired state
        :param state:         the task's current state
        :param service:       owning service object, looked up lazily if omitted
        :param node:          node object, looked up lazily if omitted
        """
        self._interface = interface
        self._id = id
        self._name = name
        self._image = image
        self._desired_state = desired_state
        self._state = state
        self._error = error
        self._ports = ports
        self._service = service
        self._node = node
        self._inspect = None

    @classmethod
    def from_cli(cls, interface, t, service=None, node=None):
        """Build a task from a CLI-style dictionary (keys such as ``ID``, ``CURRENT STATE``)."""
        # only the first word of the current state column is the state itself
        state = t['CURRENT STATE'].split()[0]
        return cls(interface, id=t['ID'], name=t['NAME'], image=t['IMAGE'],
                   desired_state=t['DESIRED STATE'], state=state, error=t['ERROR'],
                   ports=t['PORTS'], service=service, node=node)

    @classmethod
    def from_api(cls, interface, t, service=None, node=None):
        """Build a task from an API task dictionary.

        The service and node are looked up through ``interface`` when not
        supplied. The task is named ``<service name>.<slot>`` when a service is
        available, otherwise it is named by its id.
        """
        service = service or interface.service(id=t.get('ServiceID'))
        node = node or interface.node(id=t.get('NodeID'))
        if service:
            name = service.name + '.' + str(t['Slot'])
        else:
            name = t['ID']
        # remove pin (a trailing comma here would turn the image into a 1-tuple)
        image = t['Spec']['ContainerSpec']['Image'].split('@', 1)[0]
        return cls(interface, id=t['ID'], name=name, image=image, desired_state=t['DesiredState'],
                   state=t['Status']['State'], ports=t['Status']['PortStatus'], error=t['Status']['Message'],
                   service=service, node=node)

    @property
    def id(self):
        return self._id

    @property
    def name(self):
        return self._name

    @property
    def inspect(self):
        """Task inspect dictionary (cached), or ``None`` if Docker reports the task as not found."""
        if not self._inspect:
            try:
                self._inspect = self._interface.task_inspect(self._id)
            except docker.errors.NotFound:
                # This shouldn't be possible, appears to be some kind of Swarm bug (the node claims to have a task that
                # does not actually exist anymore, nor does its service exist).
                log.error('Task could not be inspected because Docker claims it does not exist: %s (%s)',
                          self.name, self.id)
                return None
        return self._inspect

    @property
    def slot(self):
        """The task's ``Slot`` from the inspect data, or ``None`` if it cannot be inspected."""
        try:
            return self.inspect['Slot']
        except TypeError:
            # inspect is None
            return None

    @property
    def node(self):
        """The node object for this task, looked up on first use; ``None`` if the task cannot be inspected."""
        if not self._node:
            try:
                self._node = self._interface.node(id=self.inspect['NodeID'])
            except TypeError:
                return None
        return self._node

    @property
    def service(self):
        """The service object for this task, looked up on first use; ``None`` if the task cannot be inspected."""
        if not self._service:
            try:
                self._service = self._interface.service(id=self.inspect['ServiceID'])
            except TypeError:
                return None
        return self._service

    @property
    def cpus(self):
        """CPU reservation converted from ``NanoCPUs`` (an int when whole).

        ``None`` if the task cannot be inspected, 0 if no reservation is present.
        """
        try:
            cpus = self.inspect['Spec']['Resources']['Reservations']['NanoCPUs'] / 1000000000.0
            if cpus == int(cpus):
                cpus = int(cpus)
            return cpus
        except TypeError:
            return None
        except KeyError:
            return 0

    @property
    def state(self):
        """Lowercased ``<desired state>-<current state>`` string."""
        return ('%s-%s' % (self._desired_state, self._state)).lower()

    @property
    def current_state(self):
        """Lowercased current state, or ``None`` (with a warning) if it is not a string."""
        try:
            return self._state.lower()
        except (AttributeError, TypeError):
            # a non-string such as None has no usable lower() and raises AttributeError
            log.warning("Current state of %s (%s) is not a string: %s", self.name, self.id, str(self._state))
            return None

    @property
    def current_state_time(self):
        """Human-readable age of the current state, or ``None`` if the task cannot be inspected."""
        # Docker API returns a stamp w/ higher second precision than Python takes
        try:
            stamp = self.inspect['Status']['Timestamp']
        except TypeError:
            return None
        return pretty_print_time_interval(time=stamp[:stamp.index('.') + 7], precise=True, utc=stamp[-1] == 'Z')

    @property
    def desired_state(self):
        """Lowercased desired state, or ``None`` (with a warning) if it is not a string."""
        try:
            return self._desired_state.lower()
        except (AttributeError, TypeError):
            # a non-string such as None has no usable lower() and raises AttributeError
            log.warning("Desired state of %s (%s) is not a string: %s", self.name, self.id, str(self._desired_state))
            return None

    @property
    def terminal(self):
        """True if the desired state is ``shutdown`` and the current state is a terminal one."""
        return self.desired_state == 'shutdown' and self.current_state in self.terminal_states

    def in_state(self, desired, current):
        """Case-insensitively compare the desired and current states with the given values."""
        return self.desired_state == desired.lower() and self.current_state == current.lower()