Warning
This document is for an in-development version of Galaxy. You can alternatively view this page in the latest release if it exists or view the top of the latest release's documentation.
Source code for galaxy.containers.docker_model
"""
Model objects for docker objects
"""
from __future__ import absolute_import
import logging
try:
import docker
except ImportError:
from galaxy.util.bunch import Bunch
docker = Bunch(errors=Bunch(NotFound=None))
from six.moves import shlex_quote
from galaxy.containers import (
Container,
ContainerPort,
ContainerVolume
)
from galaxy.util import (
pretty_print_time_interval,
unicodify,
)
CPUS_LABEL = '_galaxy_cpus'
IMAGE_LABEL = '_galaxy_image'
CPUS_CONSTRAINT = 'node.labels.' + CPUS_LABEL
IMAGE_CONSTRAINT = 'node.labels.' + IMAGE_LABEL
log = logging.getLogger(__name__)
[docs]class DockerAttributeContainer(object):
[docs] def __init__(self, members=None):
if members is None:
members = set()
self._members = members
def __eq__(self, other):
return self.members == other.members
def __ne__(self, other):
return not self.__eq__(other)
def __hash__(self):
return hash(tuple(sorted([repr(x) for x in self._members])))
def __str__(self):
return ', '.join(str(x) for x in self._members) or 'None'
def __iter__(self):
return iter(self._members)
def __getitem__(self, name):
for member in self._members:
if member.name == name:
return member
else:
raise KeyError(name)
def __contains__(self, item):
return item in self._members
@property
def members(self):
return frozenset(self._members)
[docs]class DockerVolume(ContainerVolume):
[docs] @classmethod
def from_str(cls, as_str):
"""Construct an instance from a string as would be passed to `docker run --volume`.
A string in the format ``<host_path>:<mode>`` is supported for legacy purposes even though it is not valid
Docker volume syntax.
"""
if not as_str:
raise ValueError("Failed to parse Docker volume from %s" % as_str)
parts = as_str.split(":", 2)
kwds = dict(host_path=parts[0])
if len(parts) == 1:
# auto-generated volume
kwds["path"] = kwds["host_path"]
elif len(parts) == 2:
# /host_path:mode is not (or is no longer?) valid Docker volume syntax
if parts[1] in DockerVolume.valid_modes:
kwds["mode"] = parts[1]
kwds["path"] = kwds["host_path"]
else:
kwds["path"] = parts[1]
elif len(parts) == 3:
kwds["path"] = parts[1]
kwds["mode"] = parts[2]
return cls(**kwds)
def __str__(self):
volume_str = ":".join(filter(lambda x: x is not None, (self.host_path, self.path, self.mode)))
if "$" not in volume_str:
volume_for_cmd_line = shlex_quote(volume_str)
else:
# e.g. $_GALAXY_JOB_TMP_DIR:$_GALAXY_JOB_TMP_DIR:rw so don't single quote.
volume_for_cmd_line = '"%s"' % volume_str
return volume_for_cmd_line
[docs] def to_native(self):
host_path = self.host_path or self.path
return (self.path, {host_path: {'bind': self.path, 'mode': self.mode}})
[docs]class DockerContainer(Container):
[docs] def __init__(self, interface, id, name=None, inspect=None):
super(DockerContainer, self).__init__(interface, id, name=name)
self._inspect = inspect
[docs] @classmethod
def from_id(cls, interface, id):
inspect = interface.inspect(id)
return cls(interface, id, name=inspect['Name'], inspect=inspect)
@property
def ports(self):
# {
# "NetworkSettings" : {
# "Ports" : {
# "3306/tcp" : [
# {
# "HostIp" : "127.0.0.1",
# "HostPort" : "3306"
# }
# ]
rval = []
try:
port_mappings = self.inspect['NetworkSettings']['Ports']
except KeyError:
log.warning("Failed to get ports for container %s from `docker inspect` output at "
"['NetworkSettings']['Ports']: %s: %s", self.id, exc_info=True)
return None
for port_name in port_mappings:
for binding in port_mappings[port_name]:
rval.append(ContainerPort(
int(port_name.split('/')[0]),
port_name.split('/')[1],
self.address,
int(binding['HostPort']),
))
return rval
@property
def address(self):
if self._interface.host and self._interface.host.startswith('tcp://'):
return self._interface.host.replace('tcp://', '').split(':', 1)[0]
else:
return 'localhost'
def __eq__(self, other):
return self._id == other.id
def __ne__(self, other):
return not self.__eq__(other)
def __hash__(self):
return hash(self._id)
@property
def inspect(self):
if not self._inspect:
self._inspect = self._interface.inspect(self._id)
return self._inspect
[docs]class DockerService(Container):
[docs] def __init__(self, interface, id, name=None, image=None, inspect=None):
super(DockerService, self).__init__(interface, id, name=name)
self._image = image
self._inspect = inspect
self._env = {}
self._tasks = []
if inspect:
self._name = name or inspect['Spec']['Name']
self._image = image or inspect['Spec']['TaskTemplate']['ContainerSpec']['Image']
[docs] @classmethod
def from_cli(cls, interface, s, task_list):
service = cls(interface, s['ID'], name=s['NAME'], image=s['IMAGE'])
for task_dict in task_list:
if task_dict['NAME'].strip().startswith(r'\_'):
continue # historical task
service.task_add(DockerTask.from_cli(interface, task_dict, service=service))
return service
[docs] @classmethod
def from_id(cls, interface, id):
inspect = interface.service_inspect(id)
service = cls(interface, id, inspect=inspect)
for task in interface.service_tasks(service):
service.task_add(task)
return service
@property
def ports(self):
# {
# "Endpoint": {
# "Ports": [
# {
# "Protocol": "tcp",
# "TargetPort": 8888,
# "PublishedPort": 30000,
# "PublishMode": "ingress"
# }
# ]
rval = []
try:
port_mappings = self.inspect['Endpoint']['Ports']
except (IndexError, KeyError):
log.warning("Failed to get ports for container %s from `docker service inspect` output at "
"['Endpoint']['Ports']: %s: %s", self.id, exc_info=True)
return None
for binding in port_mappings:
rval.append(ContainerPort(
binding['TargetPort'],
binding['Protocol'],
self.address, # use the routing mesh
binding['PublishedPort']
))
return rval
@property
def address(self):
if self._interface.host and self._interface.host.startswith('tcp://'):
return self._interface.host.replace('tcp://', '').split(':', 1)[0]
else:
return 'localhost'
def __eq__(self, other):
return self._id == other.id
def __ne__(self, other):
return not self.__eq__(other)
def __hash__(self):
return hash(self._id)
@property
def inspect(self):
if not self._inspect:
self._inspect = self._interface.service_inspect(self._id)
return self._inspect
@property
def state(self):
"""If one of this service's tasks desired state is running, return that task state, otherwise, return the state
of a non-running task.
This is imperfect because it doesn't attempt to provide useful information for replicas > 1 tasks, but it suits
our purposes for now.
"""
state = None
for task in self.tasks:
state = task.state
if task.desired_state == 'running':
break
return state
@property
def env(self):
if not self._env:
try:
for env_str in self.inspect['Spec']['TaskTemplate']['ContainerSpec']['Env']:
try:
self._env.update([env_str.split('=', 1)])
except ValueError:
self._env[env_str] = None
except KeyError as exc:
log.debug('Cannot retrieve container environment: KeyError: %s', unicodify(exc))
return self._env
@property
def terminal(self):
"""Same caveats as :meth:`state`.
"""
for task in self.tasks:
if task.desired_state == 'running':
return False
return True
@property
def node(self):
"""Same caveats as :meth:`state`.
"""
for task in self.tasks:
if task.node is not None:
return task.node
return None
@property
def image(self):
if self._image is None:
self._image = self.inspect['Spec']['TaskTemplate']['ContainerSpec']['Image']
return self._image
@property
def cpus(self):
try:
cpus = self.inspect['Spec']['TaskTemplate']['Resources']['Limits']['NanoCPUs'] / 1000000000.0
if cpus == int(cpus):
cpus = int(cpus)
return cpus
except KeyError:
return 0
@property
def constraints(self):
constraints = self.inspect['Spec']['TaskTemplate']['Placement'].get('Constraints', [])
return DockerServiceConstraints.from_constraint_string_list(constraints)
@property
def tasks(self):
"""A list of *all* tasks, including terminal ones.
"""
if not self._tasks:
self._tasks = []
for task in self._interface.service_tasks(self):
self.task_add(task)
return self._tasks
@property
def task_count(self):
"""A count of *all* tasks, including terminal ones.
"""
return len(self.tasks)
[docs] def in_state(self, desired, current, tasks='any'):
"""Indicate if one of this service's tasks matches the desired state.
"""
for task in self.tasks:
if task.in_state(desired, current):
if tasks == 'any':
# at least 1 task in desired state
return True
elif tasks == 'all':
# at least 1 task not in desired state
return False
else:
return False if tasks == 'any' else True
[docs] def constraint_add(self, name, op, value):
self._interface.service_constraint_add(self.id, name, op, value)
[docs]class DockerServiceConstraint(object):
[docs] def __init__(self, name=None, op=None, value=None):
self._name = name
self._op = op
self._value = value
def __eq__(self, other):
return self._name == other._name and \
self._op == other._op and \
self._value == other._value
def __ne__(self, other):
return not self.__eq__(other)
def __hash__(self):
return hash((self._name, self._op, self._value))
def __repr__(self):
return '%s(%s%s%s)' % (self.__class__.__name__, self._name, self._op, self._value)
def __str__(self):
return '%s%s%s' % (self._name, self._op, self._value)
[docs] @staticmethod
def split_constraint_string(constraint_str):
constraint = (constraint_str, '', '')
for op in '==', '!=':
t = constraint_str.partition(op)
if len(t[0]) < len(constraint[0]):
constraint = t
if constraint[0] == constraint_str:
raise Exception('Unable to parse constraint string: %s' % constraint_str)
return [x.strip() for x in constraint]
[docs] @classmethod
def from_str(cls, constraint_str):
name, op, value = DockerServiceConstraint.split_constraint_string(constraint_str)
return cls(name=name, op=op, value=value)
@property
def name(self):
return self._name
@property
def op(self):
return self._op
@property
def value(self):
return self._value
@property
def label(self):
return DockerNodeLabel(
name=self.name.replace('node.labels.', ''),
value=self.value
)
[docs]class DockerServiceConstraints(DockerAttributeContainer):
member_class = DockerServiceConstraint
[docs] @classmethod
def from_constraint_string_list(cls, inspect):
members = []
for member_str in inspect:
members.append(cls.member_class.from_str(member_str))
return cls(members=members)
@property
def labels(self):
return DockerNodeLabels(members=[x.label for x in self.members])
[docs]class DockerNode(object):
[docs] def __init__(self, interface, id=None, name=None, status=None,
availability=None, manager=False, inspect=None):
self._interface = interface
self._id = id
self._name = name
self._status = status
self._availability = availability
self._manager = manager
self._inspect = inspect
if inspect:
self._name = name or inspect['Description']['Hostname']
self._status = status or inspect['Status']['State']
self._availability = inspect['Spec']['Availability']
self._manager = manager or inspect['Spec']['Role'] == 'manager'
self._tasks = []
[docs] @classmethod
def from_cli(cls, interface, n, task_list):
node = cls(interface, id=n['ID'], name=n['HOSTNAME'], status=n['STATUS'],
availability=n['AVAILABILITY'], manager=True if n['MANAGER STATUS'] else False)
for task_dict in task_list:
node.task_add(DockerTask.from_cli(interface, task_dict, node=node))
return node
[docs] @classmethod
def from_id(cls, interface, id):
inspect = interface.node_inspect(id)
node = cls(interface, id, inspect=inspect)
for task in interface.node_tasks(node):
node.task_add(task)
return node
@property
def id(self):
return self._id
@property
def name(self):
return self._name
@property
def version(self):
# this changes on update so don't cache
return self._interface.node_inspect(self._id or self._name)['Version']['Index']
@property
def inspect(self):
if not self._inspect:
self._inspect = self._interface.node_inspect(self._id or self._name)
return self._inspect
@property
def state(self):
return ('%s-%s' % (self._status, self._availability)).lower()
@property
def cpus(self):
return self.inspect['Description']['Resources']['NanoCPUs'] / 1000000000
@property
def labels(self):
labels = self.inspect['Spec'].get('Labels', {}) or {}
return DockerNodeLabels.from_label_dictionary(labels)
[docs] def label_add(self, label, value):
self._interface.node_update(self.id, label_add={label: value})
@property
def labels_as_constraints(self):
constraints_strings = [x.constraint_string for x in self.labels]
return DockerServiceConstraints.from_constraint_string_list(constraints_strings)
[docs] def set_labels_for_constraints(self, constraints):
for label in self._constraints_to_label_args(constraints):
if label not in self.labels:
log.info("setting node '%s' label '%s' to '%s'", self.name, label.name, label.value)
self.label_add(label.name, label.value)
def _constraints_to_label_args(self, constraints):
constraints = filter(lambda x: x.name.startswith('node.labels.') and x.op == '==', constraints)
labels = map(lambda x: DockerNodeLabel(name=x.name.replace('node.labels.', '', 1), value=x.value), constraints)
return labels
@property
def tasks(self):
"""A list of *all* tasks, including terminal ones.
"""
if not self._tasks:
self._tasks = []
for task in self._interface.node_tasks(self):
self.task_add(task)
return self._tasks
@property
def non_terminal_tasks(self):
r = []
for task in self.tasks:
# ensure the task has a service - it is possible for "phantom" tasks to exist (service is removed, no
# container is running, but the task still shows up in the node's task list)
if not task.terminal and task.service is not None:
r.append(task)
return r
@property
def task_count(self):
"""A count of *all* tasks, including terminal ones.
"""
return len(self.tasks)
[docs] def in_state(self, status, availability):
return self._status.lower() == status.lower() and self._availability.lower() == availability.lower()
[docs]class DockerNodeLabel(object):
def __eq__(self, other):
return self._name == other._name and \
self._value == other._value
def __ne__(self, other):
return not self.__eq__(other)
def __hash__(self):
return hash((self._name, self._value))
def __repr__(self):
return '%s(%s: %s)' % (self.__class__.__name__, self._name, self._value)
def __str__(self):
return '%s: %s' % (self._name, self._value)
@property
def name(self):
return self._name
@property
def value(self):
return self._value
@property
def constraint_string(self):
return 'node.labels.{name}=={value}'.format(name=self.name, value=self.value)
@property
def constraint(self):
return DockerServiceConstraint(
name='node.labels.{name}'.format(name=self.name),
op='==',
value=self.value
)
[docs]class DockerNodeLabels(DockerAttributeContainer):
member_class = DockerNodeLabel
[docs] @classmethod
def from_label_dictionary(cls, inspect):
members = []
for k, v in inspect.items():
members.append(cls.member_class(name=k, value=v))
return cls(members=members)
@property
def constraints(self):
return DockerServiceConstraints(members=[x.constraint for x in self.members])
[docs]class DockerTask(object):
# these are the possible *current* state terminal states
terminal_states = (
'shutdown', # this is normally only a desired state but I've seen a task with it as current as well
'complete',
'failed',
'rejected',
'orphaned',
)
[docs] def __init__(self, interface, id=None, name=None, image=None, desired_state=None,
state=None, error=None, ports=None, service=None, node=None):
self._interface = interface
self._id = id
self._name = name
self._image = image
self._desired_state = desired_state
self._state = state
self._error = error
self._ports = ports
self._service = service
self._node = node
self._inspect = None
[docs] @classmethod
def from_cli(cls, interface, t, service=None, node=None):
state = t['CURRENT STATE'].split()[0]
return cls(interface, id=t['ID'], name=t['NAME'], image=t['IMAGE'],
desired_state=t['DESIRED STATE'], state=state, error=t['ERROR'],
ports=t['PORTS'], service=service, node=node)
[docs] @classmethod
def from_api(cls, interface, t, service=None, node=None):
service = service or interface.service(id=t.get('ServiceID'))
node = node or interface.node(id=t.get('NodeID'))
if service:
name = service.name + '.' + str(t['Slot'])
else:
name = t['ID']
image = t['Spec']['ContainerSpec']['Image'].split('@', 1)[0], # remove pin
return cls(interface, id=t['ID'], name=name, image=image, desired_state=t['DesiredState'],
state=t['Status']['State'], ports=t['Status']['PortStatus'], error=t['Status']['Message'],
service=service, node=node)
@property
def id(self):
return self._id
@property
def name(self):
return self._name
@property
def inspect(self):
if not self._inspect:
try:
self._inspect = self._interface.task_inspect(self._id)
except docker.errors.NotFound:
# This shouldn't be possible, appears to be some kind of Swarm bug (the node claims to have a task that
# does not actually exist anymore, nor does its service exist).
log.error('Task could not be inspected because Docker claims it does not exist: %s (%s)',
self.name, self.id)
return None
return self._inspect
@property
def slot(self):
try:
return self.inspect['Slot']
except TypeError:
return None
@property
def node(self):
if not self._node:
try:
self._node = self._interface.node(id=self.inspect['NodeID'])
except TypeError:
return None
return self._node
@property
def service(self):
if not self._service:
try:
self._service = self._interface.service(id=self.inspect['ServiceID'])
except TypeError:
return None
return self._service
@property
def cpus(self):
try:
cpus = self.inspect['Spec']['Resources']['Reservations']['NanoCPUs'] / 1000000000.0
if cpus == int(cpus):
cpus = int(cpus)
return cpus
except TypeError:
return None
except KeyError:
return 0
@property
def state(self):
return ('%s-%s' % (self._desired_state, self._state)).lower()
@property
def current_state(self):
try:
return self._state.lower()
except TypeError:
log.warning("Current state of %s (%s) is not a string: %s", self.name, self.id, str(self._state))
return None
@property
def current_state_time(self):
# Docker API returns a stamp w/ higher second precision than Python takes
try:
stamp = self.inspect['Status']['Timestamp']
except TypeError:
return None
return pretty_print_time_interval(time=stamp[:stamp.index('.') + 7], precise=True, utc=stamp[-1] == 'Z')
@property
def desired_state(self):
try:
return self._desired_state.lower()
except TypeError:
log.warning("Desired state of %s (%s) is not a string: %s", self.name, self.id, str(self._desired_state))
return None
@property
def terminal(self):
return self.desired_state == 'shutdown' and self.current_state in self.terminal_states
[docs] def in_state(self, desired, current):
return self.desired_state == desired.lower() and self.current_state == current.lower()