Warning
This document is for an old release of Galaxy. You can alternatively view this page in the latest release if it exists or view the top of the latest release's documentation.
Source code for galaxy.containers.docker_model
"""
Model objects for docker objects
"""
import logging
import shlex
try:
import docker
except ImportError:
from galaxy.util.bunch import Bunch
docker = Bunch(errors=Bunch(NotFound=None))
from galaxy.containers import (
Container,
ContainerPort,
ContainerVolume
)
from galaxy.util import (
pretty_print_time_interval,
unicodify,
)
# Names of the node labels that the constraint constants below are built from.
CPUS_LABEL = '_galaxy_cpus'
IMAGE_LABEL = '_galaxy_image'
# The same labels carrying the 'node.labels.' prefix, which is the prefix
# DockerNode._constraints_to_label_args() looks for on constraint names.
CPUS_CONSTRAINT = 'node.labels.' + CPUS_LABEL
IMAGE_CONSTRAINT = 'node.labels.' + IMAGE_LABEL
# Module-level logger, named after this module.
log = logging.getLogger(__name__)
class DockerAttributeContainer:
    """Generic collection of Docker attribute objects (e.g. labels, constraints).

    Equality is set-based: two containers are equal when the frozensets of
    their members are equal, regardless of order or duplicates.
    """

    def __init__(self, members=None):
        """Store ``members`` (any iterable); defaults to a new empty set."""
        if members is None:
            members = set()
        self._members = members

    def __eq__(self, other):
        # Defer to Python's fallback for unrelated types instead of raising
        # AttributeError on the missing ``members`` attribute.
        if not isinstance(other, DockerAttributeContainer):
            return NotImplemented
        return self.members == other.members

    def __ne__(self, other):
        equal = self.__eq__(other)
        # ``not NotImplemented`` is deprecated, so pass it through unchanged.
        return equal if equal is NotImplemented else not equal

    def __hash__(self):
        # De-duplicate the reprs so that containers which compare equal (set
        # semantics) also hash equal even if one holds duplicate members.
        return hash(tuple(sorted({repr(x) for x in self._members})))

    def __str__(self):
        return ', '.join(str(x) for x in self._members) or 'None'

    def __iter__(self):
        return iter(self._members)

    def __getitem__(self, name):
        """Return the first member whose ``name`` attribute equals ``name``.

        :raises KeyError: if no member has that name.
        """
        for member in self._members:
            if member.name == name:
                return member
        raise KeyError(name)

    def __contains__(self, item):
        return item in self._members

    @property
    def members(self):
        """An immutable (frozenset) snapshot of the members."""
        return frozenset(self._members)
class DockerVolume(ContainerVolume):
    """A container volume expressed in Docker's ``--volume`` syntax."""

    @classmethod
    def from_str(cls, as_str):
        """Construct an instance from a string as would be passed to `docker run --volume`.

        A string in the format ``<host_path>:<mode>`` is supported for legacy purposes even though it is not valid
        Docker volume syntax.

        :raises ValueError: if ``as_str`` is empty.
        """
        if not as_str:
            raise ValueError("Failed to parse Docker volume from %s" % as_str)
        # At most three fields: host_path, path, mode.
        fields = as_str.split(":", 2)
        host_path = fields[0]
        if len(fields) == 3:
            return cls(host_path=host_path, path=fields[1], mode=fields[2])
        if len(fields) == 2:
            second = fields[1]
            # /host_path:mode is not (or is no longer?) valid Docker volume syntax
            if second in DockerVolume.valid_modes:
                return cls(host_path=host_path, mode=second, path=host_path)
            return cls(host_path=host_path, path=second)
        # Single field: auto-generated volume, same path on both sides.
        return cls(host_path=host_path, path=host_path)

    def __str__(self):
        """Render the volume as a command-line-safe ``host:path:mode`` string."""
        present = [part for part in (self.host_path, self.path, self.mode) if part is not None]
        joined = ":".join(present)
        if "$" in joined:
            # e.g. $_GALAXY_JOB_TMP_DIR:$_GALAXY_JOB_TMP_DIR:rw so don't single quote.
            return '"%s"' % joined
        return shlex.quote(joined)

    def to_native(self):
        """Return ``(path, {host_path: {'bind': path, 'mode': mode}})``.

        ``host_path`` falls back to ``path`` when it is not set.
        """
        source = self.host_path or self.path
        binding = {'bind': self.path, 'mode': self.mode}
        return (self.path, {source: binding})
class DockerContainer(Container):
    """Model of a single Docker container, described by its ``docker inspect`` output.

    NOTE(review): ``self._interface``, ``self._id`` and ``self.id`` are presumably
    provided by the ``Container`` base class, which is not visible here.
    """

    def __init__(self, interface, id, name=None, inspect=None):
        """
        :param interface: object used to talk to Docker (must offer ``inspect(id)``).
        :param id: container id.
        :param name: optional container name.
        :param inspect: optional pre-fetched inspect dictionary.
        """
        super().__init__(interface, id, name=name)
        self._inspect = inspect

    @classmethod
    def from_id(cls, interface, id):
        """Build an instance by inspecting container ``id`` through ``interface``."""
        inspect = interface.inspect(id)
        return cls(interface, id, name=inspect['Name'], inspect=inspect)

    @property
    def ports(self):
        """List of ``ContainerPort`` objects for every published port binding.

        Returns ``None`` (after logging a warning) when the inspect output has
        no ``['NetworkSettings']['Ports']`` entry.
        """
        # {
        #     "NetworkSettings" : {
        #         "Ports" : {
        #             "3306/tcp" : [
        #                 {
        #                     "HostIp" : "127.0.0.1",
        #                     "HostPort" : "3306"
        #                 }
        #             ]
        rval = []
        try:
            port_mappings = self.inspect['NetworkSettings']['Ports']
        except KeyError:
            # The format string must have exactly one placeholder per argument,
            # otherwise logging reports a formatting error instead of the message.
            log.warning("Failed to get ports for container %s from `docker inspect` output at "
                        "['NetworkSettings']['Ports']", self.id, exc_info=True)
            return None
        # Docker reports exposed-but-unpublished ports as null, so treat a null
        # mapping (or null binding list) as "no bindings" rather than iterating None.
        for port_name, bindings in (port_mappings or {}).items():
            name_parts = port_name.split('/')
            for binding in bindings or []:
                rval.append(ContainerPort(
                    int(name_parts[0]),
                    name_parts[1],
                    self.address,
                    int(binding['HostPort']),
                ))
        return rval

    @property
    def address(self):
        """Host part of the interface's ``tcp://`` URL, or ``'localhost'``."""
        host = self._interface.host
        if host and host.startswith('tcp://'):
            return host.replace('tcp://', '').split(':', 1)[0]
        return 'localhost'

    def __eq__(self, other):
        return self._id == other.id

    def __ne__(self, other):
        return not self.__eq__(other)

    def __hash__(self):
        return hash(self._id)

    @property
    def inspect(self):
        """Inspect dictionary, fetched through the interface on first use and cached."""
        if not self._inspect:
            self._inspect = self._interface.inspect(self._id)
        return self._inspect
class DockerService(Container):
    """Model of a Docker Swarm service together with its tasks.

    NOTE(review): ``self._interface``, ``self._id``, ``self._name`` and ``self.id``
    are presumably provided by the ``Container`` base class, which is not visible here.
    """

    def __init__(self, interface, id, name=None, image=None, inspect=None):
        """
        :param interface: object used to talk to Docker.
        :param id: service id.
        :param name: optional service name; falls back to the inspect data.
        :param image: optional image name; falls back to the inspect data.
        :param inspect: optional pre-fetched ``docker service inspect`` dictionary.
        """
        super().__init__(interface, id, name=name)
        self._image = image
        self._inspect = inspect
        self._env = {}
        self._tasks = []
        if inspect:
            self._name = name or inspect['Spec']['Name']
            self._image = image or inspect['Spec']['TaskTemplate']['ContainerSpec']['Image']

    @classmethod
    def from_cli(cls, interface, s, task_list):
        """Build a service from parsed CLI rows (``s`` and ``task_list`` are dicts keyed by column name)."""
        service = cls(interface, s['ID'], name=s['NAME'], image=s['IMAGE'])
        for task_dict in task_list:
            if task_dict['NAME'].strip().startswith(r'\_'):
                continue  # historical task
            service.task_add(DockerTask.from_cli(interface, task_dict, service=service))
        return service

    @classmethod
    def from_id(cls, interface, id):
        """Build a service by inspecting ``id`` and loading its tasks through ``interface``."""
        inspect = interface.service_inspect(id)
        service = cls(interface, id, inspect=inspect)
        for task in interface.service_tasks(service):
            service.task_add(task)
        return service

    @property
    def ports(self):
        """List of ``ContainerPort`` objects for the service's endpoint ports.

        Returns ``None`` (after logging a warning) when the inspect output has
        no ``['Endpoint']['Ports']`` entry.
        """
        # {
        #     "Endpoint": {
        #         "Ports": [
        #             {
        #                 "Protocol": "tcp",
        #                 "TargetPort": 8888,
        #                 "PublishedPort": 30000,
        #                 "PublishMode": "ingress"
        #             }
        #         ]
        rval = []
        try:
            port_mappings = self.inspect['Endpoint']['Ports']
        except (IndexError, KeyError):
            # One placeholder per argument; surplus placeholders make logging
            # emit a formatting error instead of this message.
            log.warning("Failed to get ports for container %s from `docker service inspect` output at "
                        "['Endpoint']['Ports']", self.id, exc_info=True)
            return None
        for binding in port_mappings:
            rval.append(ContainerPort(
                binding['TargetPort'],
                binding['Protocol'],
                self.address,  # use the routing mesh
                binding['PublishedPort']
            ))
        return rval

    @property
    def address(self):
        """Host part of the interface's ``tcp://`` URL, or ``'localhost'``."""
        host = self._interface.host
        if host and host.startswith('tcp://'):
            return host.replace('tcp://', '').split(':', 1)[0]
        return 'localhost'

    def __eq__(self, other):
        return self._id == other.id

    def __ne__(self, other):
        return not self.__eq__(other)

    def __hash__(self):
        return hash(self._id)

    @property
    def inspect(self):
        """Service inspect dictionary, fetched through the interface on first use and cached."""
        if not self._inspect:
            self._inspect = self._interface.service_inspect(self._id)
        return self._inspect

    @property
    def state(self):
        """If one of this service's tasks desired state is running, return that task state, otherwise, return the state
        of a non-running task.

        This is imperfect because it doesn't attempt to provide useful information for replicas > 1 tasks, but it suits
        our purposes for now.
        """
        state = None
        for task in self.tasks:
            state = task.state
            if task.desired_state == 'running':
                break
        return state

    @property
    def env(self):
        """Environment of the service's container spec as a ``{name: value}`` dict.

        Entries without an ``=`` map to ``None``. A missing key in the inspect
        data is logged at debug level and yields whatever was collected so far.
        """
        if not self._env:
            try:
                for env_str in self.inspect['Spec']['TaskTemplate']['ContainerSpec']['Env']:
                    key, sep, value = env_str.partition('=')
                    self._env[key] = value if sep else None
            except KeyError as exc:
                log.debug('Cannot retrieve container environment: KeyError: %s', unicodify(exc))
        return self._env

    @property
    def terminal(self):
        """Same caveats as :meth:`state`.
        """
        return not any(task.desired_state == 'running' for task in self.tasks)

    @property
    def node(self):
        """Same caveats as :meth:`state`.
        """
        for task in self.tasks:
            if task.node is not None:
                return task.node
        return None

    @property
    def image(self):
        """Image name, read from the inspect data when it was not given at construction."""
        if self._image is None:
            self._image = self.inspect['Spec']['TaskTemplate']['ContainerSpec']['Image']
        return self._image

    @property
    def cpus(self):
        """CPU limit derived from ``NanoCPUs`` (an int when whole); ``0`` if no limit is present."""
        try:
            cpus = self.inspect['Spec']['TaskTemplate']['Resources']['Limits']['NanoCPUs'] / 1000000000.0
            if cpus == int(cpus):
                cpus = int(cpus)
            return cpus
        except KeyError:
            return 0

    @property
    def constraints(self):
        """Placement constraints of the service as a ``DockerServiceConstraints`` container."""
        constraints = self.inspect['Spec']['TaskTemplate']['Placement'].get('Constraints', [])
        return DockerServiceConstraints.from_constraint_string_list(constraints)

    @property
    def tasks(self):
        """A list of *all* tasks, including terminal ones.
        """
        if not self._tasks:
            self._tasks = []
            for task in self._interface.service_tasks(self):
                self.task_add(task)
        return self._tasks

    @property
    def task_count(self):
        """A count of *all* tasks, including terminal ones.
        """
        return len(self.tasks)

    def task_add(self, task):
        """Append ``task`` to this service's task list.

        ``from_cli``, ``from_id`` and :attr:`tasks` all call this method, so it
        is defined here next to the ``_tasks`` list it maintains.
        """
        self._tasks.append(task)

    def in_state(self, desired, current, tasks='any'):
        """Indicate if one of this service's tasks matches the desired state.

        :param desired: desired state to match.
        :param current: current state to match.
        :param tasks: ``'any'`` returns True as soon as one task matches;
            ``'all'`` returns False as soon as one task does not match.
        """
        for task in self.tasks:
            matches = task.in_state(desired, current)
            if matches and tasks == 'any':
                # at least 1 task in desired state
                return True
            if not matches and tasks == 'all':
                # at least 1 task not in desired state
                return False
        # Loop ran to completion: 'any' found no match, anything else found no mismatch.
        return tasks != 'any'

    def constraint_add(self, name, op, value):
        """Ask the interface to add a placement constraint to this service."""
        self._interface.service_constraint_add(self.id, name, op, value)
class DockerServiceConstraint:
    """A single service placement constraint of the form ``<name><op><value>``."""

    def __init__(self, name=None, op=None, value=None):
        """
        :param name: left-hand side of the constraint.
        :param op: operator string (``split_constraint_string`` recognises ``==`` and ``!=``).
        :param value: right-hand side of the constraint.
        """
        self._name = name
        self._op = op
        self._value = value

    def __eq__(self, other):
        # Comparing with an unrelated object (e.g. None) should be "not equal",
        # not an AttributeError on the missing private attributes.
        if not isinstance(other, DockerServiceConstraint):
            return NotImplemented
        return (self._name, self._op, self._value) == (other._name, other._op, other._value)

    def __ne__(self, other):
        equal = self.__eq__(other)
        # ``not NotImplemented`` is deprecated, so pass it through unchanged.
        return equal if equal is NotImplemented else not equal

    def __hash__(self):
        return hash((self._name, self._op, self._value))

    def __repr__(self):
        return f'{self.__class__.__name__}({self._name}{self._op}{self._value})'

    def __str__(self):
        return f'{self._name}{self._op}{self._value}'

    @staticmethod
    def split_constraint_string(constraint_str):
        """Split ``constraint_str`` into ``[name, op, value]`` with whitespace stripped.

        The operator that occurs earliest in the string is used as the split point.

        :raises Exception: if the string contains neither ``==`` nor ``!=``.
        """
        best = (constraint_str, '', '')
        for op in ('==', '!='):
            candidate = constraint_str.partition(op)
            # A shorter head means this operator occurs earlier in the string.
            if len(candidate[0]) < len(best[0]):
                best = candidate
        if best[0] == constraint_str:
            raise Exception('Unable to parse constraint string: %s' % constraint_str)
        return [part.strip() for part in best]

    @classmethod
    def from_str(cls, constraint_str):
        """Parse ``constraint_str`` with :meth:`split_constraint_string` into an instance."""
        name, op, value = DockerServiceConstraint.split_constraint_string(constraint_str)
        return cls(name=name, op=op, value=value)

    @property
    def name(self):
        return self._name

    @property
    def op(self):
        return self._op

    @property
    def value(self):
        return self._value

    @property
    def label(self):
        """The constraint as a ``DockerNodeLabel``, with ``node.labels.`` removed from the name."""
        return DockerNodeLabel(
            name=self.name.replace('node.labels.', ''),
            value=self.value
        )
class DockerServiceConstraints(DockerAttributeContainer):
    """Container whose members are ``DockerServiceConstraint`` objects."""

    member_class = DockerServiceConstraint

    @classmethod
    def from_constraint_string_list(cls, inspect):
        """Build a container by parsing each constraint string in ``inspect``."""
        parsed = [cls.member_class.from_str(constraint_str) for constraint_str in inspect]
        return cls(members=parsed)

    @property
    def labels(self):
        """The ``label`` of every member, wrapped in a ``DockerNodeLabels`` container."""
        member_labels = [constraint.label for constraint in self.members]
        return DockerNodeLabels(members=member_labels)
class DockerNode:
    """Model of a Docker Swarm node and the tasks assigned to it."""

    def __init__(self, interface, id=None, name=None, status=None,
                 availability=None, manager=False, inspect=None):
        """
        :param interface: object used to talk to Docker.
        :param id: node id.
        :param name: node hostname; falls back to the inspect data.
        :param status: node status; falls back to the inspect data.
        :param availability: node availability.
        :param manager: whether the node is a manager.
        :param inspect: optional pre-fetched ``docker node inspect`` dictionary.
        """
        self._interface = interface
        self._id = id
        self._name = name
        self._status = status
        self._availability = availability
        self._manager = manager
        self._inspect = inspect
        if inspect:
            self._name = name or inspect['Description']['Hostname']
            self._status = status or inspect['Status']['State']
            # NOTE(review): unlike name/status, availability is always taken
            # from the inspect data when it is supplied.
            self._availability = inspect['Spec']['Availability']
            self._manager = manager or inspect['Spec']['Role'] == 'manager'
        self._tasks = []

    @classmethod
    def from_cli(cls, interface, n, task_list):
        """Build a node from parsed CLI rows (``n`` and ``task_list`` are dicts keyed by column name)."""
        node = cls(interface, id=n['ID'], name=n['HOSTNAME'], status=n['STATUS'],
                   availability=n['AVAILABILITY'], manager=bool(n['MANAGER STATUS']))
        for task_dict in task_list:
            node.task_add(DockerTask.from_cli(interface, task_dict, node=node))
        return node

    @classmethod
    def from_id(cls, interface, id):
        """Build a node by inspecting ``id`` and loading its tasks through ``interface``."""
        inspect = interface.node_inspect(id)
        node = cls(interface, id, inspect=inspect)
        for task in interface.node_tasks(node):
            node.task_add(task)
        return node

    @property
    def id(self):
        return self._id

    @property
    def name(self):
        return self._name

    @property
    def version(self):
        # this changes on update so don't cache
        return self._interface.node_inspect(self._id or self._name)['Version']['Index']

    @property
    def inspect(self):
        """Node inspect dictionary, fetched through the interface on first use and cached."""
        if not self._inspect:
            self._inspect = self._interface.node_inspect(self._id or self._name)
        return self._inspect

    @property
    def state(self):
        """Lower-cased ``<status>-<availability>`` string."""
        return (f'{self._status}-{self._availability}').lower()

    @property
    def cpus(self):
        """CPU count derived from the node's ``NanoCPUs`` resource."""
        return self.inspect['Description']['Resources']['NanoCPUs'] / 1000000000

    @property
    def labels(self):
        """Labels from the node spec as a ``DockerNodeLabels`` container (empty if none)."""
        labels = self.inspect['Spec'].get('Labels', {}) or {}
        return DockerNodeLabels.from_label_dictionary(labels)

    def label_add(self, label, value):
        """Ask the interface to set ``label`` to ``value`` on this node."""
        self._interface.node_update(self.id, label_add={label: value})

    @property
    def labels_as_constraints(self):
        """The node's labels expressed as a ``DockerServiceConstraints`` container."""
        constraints_strings = [x.constraint_string for x in self.labels]
        return DockerServiceConstraints.from_constraint_string_list(constraints_strings)

    def set_labels_for_constraints(self, constraints):
        """Add every label implied by ``constraints`` that the node does not already have."""
        for label in self._constraints_to_label_args(constraints):
            if label not in self.labels:
                log.info("setting node '%s' label '%s' to '%s'", self.name, label.name, label.value)
                self.label_add(label.name, label.value)

    def _constraints_to_label_args(self, constraints):
        """Yield a ``DockerNodeLabel`` for each ``node.labels.* == value`` constraint."""
        return (
            DockerNodeLabel(name=c.name.replace('node.labels.', '', 1), value=c.value)
            for c in constraints
            if c.name.startswith('node.labels.') and c.op == '=='
        )

    @property
    def tasks(self):
        """A list of *all* tasks, including terminal ones.
        """
        if not self._tasks:
            self._tasks = []
            for task in self._interface.node_tasks(self):
                self.task_add(task)
        return self._tasks

    @property
    def non_terminal_tasks(self):
        """Tasks that are not terminal and still belong to a service."""
        # ensure the task has a service - it is possible for "phantom" tasks to exist (service is removed, no
        # container is running, but the task still shows up in the node's task list)
        return [task for task in self.tasks if not task.terminal and task.service is not None]

    @property
    def task_count(self):
        """A count of *all* tasks, including terminal ones.
        """
        return len(self.tasks)

    def task_add(self, task):
        """Append ``task`` to this node's task list.

        ``from_cli``, ``from_id`` and :attr:`tasks` all call this method; without
        it they fail with ``AttributeError``.
        """
        self._tasks.append(task)

    def in_state(self, status, availability):
        """Case-insensitive check of the node's status and availability."""
        return self._status.lower() == status.lower() and self._availability.lower() == availability.lower()
class DockerNodeLabel:
    """A single node label: a ``name`` / ``value`` pair."""

    def __init__(self, name=None, value=None):
        """Store the label's name and value.

        Other code in this module constructs labels with ``name=`` and
        ``value=`` keyword arguments, and every method here reads ``_name``
        and ``_value``, so both must be set at construction.
        """
        self._name = name
        self._value = value

    def __eq__(self, other):
        return self._name == other._name and \
            self._value == other._value

    def __ne__(self, other):
        return not self.__eq__(other)

    def __hash__(self):
        return hash((self._name, self._value))

    def __repr__(self):
        return f'{self.__class__.__name__}({self._name}: {self._value})'

    def __str__(self):
        return f'{self._name}: {self._value}'

    @property
    def name(self):
        return self._name

    @property
    def value(self):
        return self._value

    @property
    def constraint_string(self):
        """The label as a ``node.labels.<name>==<value>`` constraint string."""
        return f'node.labels.{self.name}=={self.value}'

    @property
    def constraint(self):
        """The label as a ``DockerServiceConstraint`` using the ``==`` operator."""
        return DockerServiceConstraint(
            name=f'node.labels.{self.name}',
            op='==',
            value=self.value
        )
class DockerNodeLabels(DockerAttributeContainer):
    """Container whose members are ``DockerNodeLabel`` objects."""

    member_class = DockerNodeLabel

    @classmethod
    def from_label_dictionary(cls, inspect):
        """Build a container with one label per ``name: value`` item of ``inspect``."""
        built = [cls.member_class(name=label_name, value=label_value)
                 for label_name, label_value in inspect.items()]
        return cls(members=built)

    @property
    def constraints(self):
        """The ``constraint`` of every member, wrapped in a ``DockerServiceConstraints`` container."""
        member_constraints = [label.constraint for label in self.members]
        return DockerServiceConstraints(members=member_constraints)
class DockerTask:
    """Model of a single Docker Swarm task (one scheduled instance of a service)."""

    # these are the possible *current* state terminal states
    terminal_states = (
        'shutdown',  # this is normally only a desired state but I've seen a task with it as current as well
        'complete',
        'failed',
        'rejected',
        'orphaned',
    )

    def __init__(self, interface, id=None, name=None, image=None, desired_state=None,
                 state=None, error=None, ports=None, service=None, node=None):
        """
        :param interface: object used to talk to Docker.
        :param id: task id.
        :param name: task name.
        :param image: image name.
        :param desired_state: the task's desired state.
        :param state: the task's current state.
        :param error: error message reported for the task, if any.
        :param ports: port information reported for the task.
        :param service: owning service object, if already known.
        :param node: node object the task is on, if already known.
        """
        self._interface = interface
        self._id = id
        self._name = name
        self._image = image
        self._desired_state = desired_state
        self._state = state
        self._error = error
        self._ports = ports
        self._service = service
        self._node = node
        self._inspect = None

    @classmethod
    def from_cli(cls, interface, t, service=None, node=None):
        """Build a task from a parsed CLI row (``t`` is a dict keyed by column name)."""
        # Only the first word of the column is the state itself.
        state = t['CURRENT STATE'].split()[0]
        return cls(interface, id=t['ID'], name=t['NAME'], image=t['IMAGE'],
                   desired_state=t['DESIRED STATE'], state=state, error=t['ERROR'],
                   ports=t['PORTS'], service=service, node=node)

    @classmethod
    def from_api(cls, interface, t, service=None, node=None):
        """Build a task from an API task dictionary ``t``.

        ``service`` and ``node`` are looked up through ``interface`` when not supplied.
        The task name is ``<service name>.<slot>`` when a service is known, else the task id.
        """
        service = service or interface.service(id=t.get('ServiceID'))
        node = node or interface.node(id=t.get('NodeID'))
        if service:
            name = service.name + '.' + str(t['Slot'])
        else:
            name = t['ID']
        # remove pin; no trailing comma here, which would turn the image into a 1-tuple
        image = t['Spec']['ContainerSpec']['Image'].split('@', 1)[0]
        return cls(interface, id=t['ID'], name=name, image=image, desired_state=t['DesiredState'],
                   state=t['Status']['State'], ports=t['Status']['PortStatus'], error=t['Status']['Message'],
                   service=service, node=node)

    @property
    def id(self):
        return self._id

    @property
    def name(self):
        return self._name

    @property
    def inspect(self):
        """Task inspect dictionary (cached), or ``None`` if Docker reports the task as not found."""
        if not self._inspect:
            try:
                self._inspect = self._interface.task_inspect(self._id)
            except docker.errors.NotFound:
                # This shouldn't be possible, appears to be some kind of Swarm bug (the node claims to have a task that
                # does not actually exist anymore, nor does its service exist).
                log.error('Task could not be inspected because Docker claims it does not exist: %s (%s)',
                          self.name, self.id)
                return None
        return self._inspect

    @property
    def slot(self):
        """The task's slot, or ``None`` when the task cannot be inspected."""
        try:
            return self.inspect['Slot']
        except TypeError:
            return None

    @property
    def node(self):
        """The node object for this task, or ``None`` when the task cannot be inspected."""
        if not self._node:
            try:
                self._node = self._interface.node(id=self.inspect['NodeID'])
            except TypeError:
                return None
        return self._node

    @property
    def service(self):
        """The service object for this task, or ``None`` when the task cannot be inspected."""
        if not self._service:
            try:
                self._service = self._interface.service(id=self.inspect['ServiceID'])
            except TypeError:
                return None
        return self._service

    @property
    def cpus(self):
        """CPU reservation derived from ``NanoCPUs`` (an int when whole).

        ``None`` when the task cannot be inspected, ``0`` when no reservation is present.
        """
        try:
            cpus = self.inspect['Spec']['Resources']['Reservations']['NanoCPUs'] / 1000000000.0
            if cpus == int(cpus):
                cpus = int(cpus)
            return cpus
        except TypeError:
            return None
        except KeyError:
            return 0

    @property
    def state(self):
        """Lower-cased ``<desired state>-<current state>`` string."""
        return (f'{self._desired_state}-{self._state}').lower()

    @property
    def current_state(self):
        """Lower-cased current state, or ``None`` (with a warning) if it is not a string."""
        try:
            return self._state.lower()
        except (AttributeError, TypeError):
            # A non-string such as None has no .lower(), which raises AttributeError.
            log.warning("Current state of %s (%s) is not a string: %s", self.name, self.id, str(self._state))
            return None

    @property
    def current_state_time(self):
        """Human-readable age of the current state, or ``None`` when the task cannot be inspected."""
        # Docker API returns a stamp w/ higher second precision than Python takes
        try:
            stamp = self.inspect['Status']['Timestamp']
        except TypeError:
            return None
        return pretty_print_time_interval(time=stamp[:stamp.index('.') + 7], precise=True, utc=stamp[-1] == 'Z')

    @property
    def desired_state(self):
        """Lower-cased desired state, or ``None`` (with a warning) if it is not a string."""
        try:
            return self._desired_state.lower()
        except (AttributeError, TypeError):
            # A non-string such as None has no .lower(), which raises AttributeError.
            log.warning("Desired state of %s (%s) is not a string: %s", self.name, self.id, str(self._desired_state))
            return None

    @property
    def terminal(self):
        """True when the desired state is ``shutdown`` and the current state is a terminal one."""
        return self.desired_state == 'shutdown' and self.current_state in self.terminal_states

    def in_state(self, desired, current):
        """Case-insensitive check of the task's desired and current states."""
        return self.desired_state == desired.lower() and self.current_state == current.lower()