Warning
This document is for an in-development version of Galaxy. You can alternatively view this page in the latest release if it exists or view the top of the latest release's documentation.
Source code for galaxy.jobs.metrics.instrumenters.cgroup
"""The module describes the ``cgroup`` job metrics plugin."""
import logging
import numbers
from collections import namedtuple
from galaxy.util import asbool, nice_size
from . import InstrumentPlugin
from .. import formatting
log = logging.getLogger(__name__)

# Display titles for cgroup parameters. The keys also form the default set of
# parameters CgroupPlugin keeps when no ``params`` option is configured.
TITLES = dict(
    [
        ("memory.memsw.max_usage_in_bytes", "Max memory usage (MEM+SWP)"),
        ("memory.max_usage_in_bytes", "Max memory usage (MEM)"),
        ("memory.limit_in_bytes", "Memory limit on cgroup (MEM)"),
        ("memory.memsw.limit_in_bytes", "Memory limit on cgroup (MEM+SWP)"),
        ("memory.soft_limit_in_bytes", "Memory softlimit on cgroup"),
        ("memory.failcnt", "Failed to allocate memory count"),
        ("memory.oom_control.oom_kill_disable", "OOM Control enabled"),
        ("memory.oom_control.under_oom", "Was OOM Killer active?"),
        ("cpuacct.usage", "CPU Time"),
    ]
)


def _oom_control_enabled(oom_kill_disable):
    # The flag is a *disable* switch, so invert it for the "enabled" title.
    if oom_kill_disable == 1:
        return "No"
    return "Yes"


def _under_oom(flag):
    # 1 means the cgroup was under OOM.
    if flag == 1:
        return "Yes"
    return "No"


def _cpu_time(nanoseconds):
    # cpuacct.usage is in nanoseconds; seconds_to_str expects seconds.
    return formatting.seconds_to_str(nanoseconds / 10**9)


# Per-parameter value converters used by the formatter.
CONVERSION = {
    "memory.oom_control.oom_kill_disable": _oom_control_enabled,
    "memory.oom_control.under_oom": _under_oom,
    "cpuacct.usage": _cpu_time,
}

# One parsed cgroup parameter: ``key`` is the top-level parameter name and
# ``subkey`` the name it is stored under ("<key>.<name>" for multi-line ones).
Metric = namedtuple("Metric", "key subkey value")
class CgroupPluginFormatter(formatting.JobMetricFormatter):
    """Render cgroup metrics (as produced by ``CgroupPlugin``) for display."""

    def format(self, key, value):
        """Return a ``(title, formatted_value)`` pair for one metric.

        :param key: cgroup (sub)parameter name, e.g. ``memory.max_usage_in_bytes``.
        :param value: parsed value; typically an int, float or str.
        :returns: the display title (from ``TITLES``, else the key itself) and
            the value, converted via ``CONVERSION``, rendered as a size for
            ``*_bytes`` keys, or with integral numbers shown as ints.
        """
        title = TITLES.get(key, key)
        if key in CONVERSION:
            return title, CONVERSION[key](value)
        if key.endswith("_bytes"):
            try:
                return title, nice_size(value)
            except ValueError:
                pass  # not renderable as a size; use the generic handling below
        if isinstance(value, numbers.Number):
            try:
                if value == int(value):
                    # Show integral floats such as 3.0 as 3.
                    value = int(value)
            except (TypeError, ValueError, OverflowError):
                # complex, NaN and +/-inf have no int equivalent; keep them unchanged
                pass
        return title, value
class CgroupPlugin(InstrumentPlugin):
    """Plugin that collects memory and cpu utilization from within a cgroup.

    After the job runs, ``cgget`` dumps the cpu/cpuacct and memory cgroup
    parameters of the job's shell (as listed in ``/proc/$$/cgroup``) into a
    per-job metrics file; :meth:`job_properties` parses that file into a flat
    ``{parameter: value}`` dict.
    """

    plugin_type = "cgroup"
    formatter = CgroupPluginFormatter()

    def __init__(self, **kwargs):
        """Read plugin options.

        :param verbose: if true (parsed with ``asbool``), keep every parsed
            parameter instead of only those selected by ``params``.
        :param params: comma-separated parameter names to keep; when absent or
            empty, the keys of ``TITLES`` are used.
        """
        self.verbose = asbool(kwargs.get("verbose", False))
        params_str = kwargs.get("params", None)
        if params_str:
            params = [v.strip() for v in params_str.split(",")]
        else:
            params = TITLES.keys()
        self.params = params

    def post_execute_instrument(self, job_directory):
        """Return the shell commands that record the job's cgroup usage.

        Order matters: the cpu command truncates the metrics file and the
        memory command appends to it.
        """
        return [
            self.__record_cgroup_cpu_usage(job_directory),
            self.__record_cgroup_memory_usage(job_directory),
        ]

    def job_properties(self, job_id, job_directory):
        """Parse the metrics file and return ``{parameter: value}``.

        ``job_id`` is unused. Values are ints, floats or strings (see
        ``__type_value``).
        """
        return self.__read_metrics(self.__cgroup_metrics_file(job_directory))

    def __record_cgroup_cpu_usage(self, job_directory):
        # cpu and cpuacct are co-mounted, and /proc/<pid>/cgroup may list the
        # pair in either order ("cpu,cpuacct" is common), so accept both.
        # Uses ">" so this command starts a fresh metrics file.
        return self.__cgget_command(
            job_directory, '$2=="cpuacct,cpu" || $2=="cpu,cpuacct"', ">"
        )

    def __record_cgroup_memory_usage(self, job_directory):
        # Appends (">>") to the file started by the cpu command.
        return self.__cgget_command(job_directory, '$2=="memory"', ">>")

    def __cgget_command(self, job_directory, awk_condition, redirect):
        """Build a shell snippet that runs ``cgget`` on the matching cgroup.

        Each ``/proc/$$/cgroup`` line is split on ``:``; the line whose second
        field satisfies ``awk_condition`` is rewritten to ``<field2>:<field3>``
        and passed to ``cgget -g``. The output is written to the metrics file
        with ``redirect`` (``>`` or ``>>``). If ``cgget`` or ``/proc/$$/cgroup``
        is missing, the metrics file is overwritten with a blank line instead.
        """
        return """if [ `command -v cgget` ] && [ -e /proc/$$/cgroup ]; then cat /proc/$$/cgroup | awk -F':' '%(condition)s{print $2":"$3}' | xargs -I{} cgget -g {} %(redirect)s %(metrics)s ; else echo "" > %(metrics)s; fi""" % {
            "condition": awk_condition,
            "redirect": redirect,
            "metrics": self.__cgroup_metrics_file(job_directory),
        }

    def __cgroup_metrics_file(self, job_directory):
        # Per-job file holding the raw cgget output.
        return self._instrument_file_path(job_directory, "_metrics")

    def __read_metrics(self, path):
        """Parse raw ``cgget`` output from ``path`` into ``{subkey: value}``.

        A parameter is only final once the following line has been read,
        because that line may reveal it to be the first line of a multi-line
        parameter (see ``__fix_prev_metric``); hence each metric is stored one
        iteration late.
        """
        metrics = {}
        prev_metric = None
        with open(path, "r") as infile:
            for line in infile:
                try:
                    metric, prev_metric = self.__read_key_value(line, prev_metric)
                except Exception:
                    # Best effort: a malformed line (e.g. an indented line with
                    # no preceding parameter) is logged and skipped.
                    log.exception("Caught exception attempting to read metric from cgroup line: %s", line)
                    metric = None
                if not metric:
                    continue
                # prev_metric is now final (possibly re-keyed): store it and
                # hold back the current one.
                self.__add_metric(metrics, prev_metric)
                prev_metric = metric
        # Flush the last held-back parameter.
        self.__add_metric(metrics, prev_metric)
        return metrics

    def __add_metric(self, metrics, metric):
        # Store metric if it was requested via params or verbose mode is on.
        if metric and (metric.subkey in self.params or self.verbose):
            metrics[metric.subkey] = metric.value

    def __read_key_value(self, line, prev_metric):
        """Parse one line of ``cgget`` output.

        :returns: ``(metric, prev_metric)``. ``metric`` is the ``Metric`` for
            this line, or ``None`` if the line is not a parameter. ``prev_metric``
            is the previous metric, re-keyed if this line shows it was the first
            line of a multi-line parameter.
        """
        if not line.startswith('\t'):
            # line is a single-line param or the first line of a multi-line param
            try:
                subkey, value = line.strip().split(": ", 1)
                key = subkey
            except ValueError:
                # or not a param line at all (e.g. the blank line written when
                # cgget is unavailable), ignore
                return None, prev_metric
        else:
            # line is a subsequent (tab-indented) line of a multi-line param: "<name> <value>"
            subkey, value = line.strip().split(" ", 1)
            key = prev_metric.key
            subkey = ".".join((key, subkey))
            prev_metric = self.__fix_prev_metric(prev_metric)
        value = self.__type_value(value)
        return (Metric(key, subkey, value), prev_metric)

    def __fix_prev_metric(self, metric):
        # we can't determine whether a param is single-line or multi-line until we read the second line, after which, we
        # must go back and fix the first param to be subkeyed, e.g. value "user 10" of key "x" becomes subkey "x.user" = 10
        if metric.key == metric.subkey:
            try:
                subkey, value = metric.value.split(" ", 1)
                subkey = ".".join((metric.key, subkey))
                metric = Metric(metric.key, subkey, self.__type_value(value))
            except ValueError:
                pass
        return metric

    def __type_value(self, value):
        # Convert to int, else float, else keep the original string.
        try:
            try:
                return int(value)
            except ValueError:
                return float(value)
        except ValueError:
            return value
# The plugin class is the only name exported by ``from ... import *``.
__all__ = ("CgroupPlugin",)