Warning
This document is for an in-development version of Galaxy. You can alternatively view this page in the latest release if it exists or view the top of the latest release's documentation.
Source code for galaxy.job_metrics.collectl.processes
""" Modules will run collectl in playback mode and collect various process
statistics for a given pid's process and process ancestors.
"""
import collections
import csv
import logging
import sys
import tempfile
from galaxy import util
from ..collectl import stats
if sys.version_info > (3,):
long = int
log = logging.getLogger(__name__)
# Collectl process information cheat sheet:
#
# Record process information for current user.
# % collectl -sZ -f./__instrument_collectl -i 10:10 --procfilt U$USER
#
# TSV Replay of processing information in plottable mode...
#
# % collectl -sZ -P --sep=9 -p __instrument_collectl-jlaptop13-20140322-120919.raw.gz
#
# Has following columns:
# Date Time PID User PR PPID THRD S VmSize VmLck VmRSS VmData VmStk VmExe VmLib CPU SysT UsrT PCT AccumT RKB WKB RKBC WKBC RSYS WSYS CNCL MajF MinF Command
#
# Process data dumped one row per process per interval.
# http://collectl.sourceforge.net/Data-detail.html
PROCESS_COLUMNS = [
"#Date", # Date of interval - e.g. 20140322
"Time", # Time of interval - 12:18:58
"PID", # Process pid.
"User", # Process user.
"PR", # Priority of process.
"PPID", # Parent PID of process.
"THRD", # Thread???
"S", # Process state - S - Sleeping, D - Uninterruptable Sleep, R - Running, Z - Zombie or T - Stopped/Traced
# Memory options - http://ewx.livejournal.com/579283.html
"VmSize",
"VmLck",
"VmRSS",
"VmData",
"VmStk",
"VmExe",
"VmLib",
"CPU", # CPU number of process
"SysT", # Amount of system time consumed during interval
"UsrT", # Amount user time consumed during interval
"PCT", # Percentage of current interval consumed by task
"AccumT", # Total accumulated System and User time since the process began execution
# kilobytes read/written - requires I/O level monitoring to be enabled in kernel.
"RKB", # kilobytes read by process - requires I/O monitoring in kernel
"WKB",
"RKBC",
"WKBC",
"RSYS", # Number of read system calls
"WSYS", # Number of write system calls
"CNCL",
"MajF", # Number of major page faults
"MinF", # Number of minor page faults
"Command", # Command executed
]
# Types of statistics this module can summarize
STATISTIC_TYPES = ["max", "min", "sum", "count", "avg"]
COLUMN_INDICES = dict([(col, i) for i, col in enumerate(PROCESS_COLUMNS)])
PID_INDEX = COLUMN_INDICES["PID"]
PARENT_PID_INDEX = COLUMN_INDICES["PPID"]
DEFAULT_STATISTICS = [
("max", "VmSize"),
("avg", "VmSize"),
("max", "VmRSS"),
("avg", "VmRSS"),
("sum", "SysT"),
("sum", "UsrT"),
("max", "PCT"),
("avg", "PCT"),
("max", "AccumT"),
("sum", "RSYS"),
("sum", "WSYS"),
]
def parse_process_statistics(statistics):
""" Turn string or list of strings into list of tuples in format ( stat,
resource ) where stat is a value from STATISTIC_TYPES and resource is a
value from PROCESS_COLUMNS.
"""
if statistics is None:
statistics = DEFAULT_STATISTICS
statistics = util.listify(statistics)
statistics = [_tuplize_statistic(_) for _ in statistics]
# Check for validity...
for statistic in statistics:
if statistic[0] not in STATISTIC_TYPES:
raise Exception("Unknown statistic type encountered %s" % statistic[0])
if statistic[1] not in PROCESS_COLUMNS:
raise Exception("Unknown process column encountered %s" % statistic[1])
return statistics
[docs]def generate_process_statistics(collectl_playback_cli, pid, statistics=DEFAULT_STATISTICS):
""" Playback collectl file and generate summary statistics.
"""
with tempfile.NamedTemporaryFile() as tmp_tsv:
collectl_playback_cli.run(stdout=tmp_tsv)
with open(tmp_tsv.name, "r") as tsv_file:
return _read_process_statistics(tsv_file, pid, statistics)
def _read_process_statistics(tsv_file, pid, statistics):
process_summarizer = CollectlProcessSummarizer(pid, statistics)
current_interval = None
for row in csv.reader(tsv_file, dialect="excel-tab"):
if current_interval is None:
for header, expected_header in zip(row, PROCESS_COLUMNS):
if header.lower() != expected_header.lower():
raise Exception("Unknown header value encountered while processing collectl playback - %s" % header)
# First row, check contains correct header.
current_interval = CollectlProcessInterval()
continue
if current_interval.row_is_in(row):
current_interval.add_row(row)
else:
process_summarizer.handle_interval(current_interval)
current_interval = CollectlProcessInterval()
# Do we have unsummarized rows...
if current_interval and current_interval.rows:
process_summarizer.handle_interval(current_interval)
return process_summarizer.get_statistics()
class CollectlProcessSummarizer(object):
def __init__(self, pid, statistics):
self.pid = pid
self.statistics = statistics
self.columns_of_interest = set([s[1] for s in statistics])
self.tree_statistics = collections.defaultdict(stats.StatisticsTracker)
self.process_accum_statistics = collections.defaultdict(stats.StatisticsTracker)
self.interval_count = 0
def handle_interval(self, interval):
self.interval_count += 1
rows = self.__rows_for_process(interval.rows, self.pid)
for column_name in self.columns_of_interest:
column_index = COLUMN_INDICES[column_name]
if column_name == "AccumT":
# Should not sum this across pids each interval, sum max at end...
for r in rows:
pid_seconds = self.__time_to_seconds(r[column_index])
self.process_accum_statistics[r[PID_INDEX]].track(pid_seconds)
else:
# All other stastics should be summed across whole process tree
# at each interval I guess.
if column_name in ["SysT", "UsrT", "PCT"]:
to_num = float
else:
to_num = long
interval_stat = sum(to_num(r[column_index]) for r in rows)
self.tree_statistics[column_name].track(interval_stat)
def get_statistics(self):
if self.interval_count == 0:
return []
computed_statistics = []
for statistic in self.statistics:
statistic_type, column = statistic
if column == "AccumT":
# Only thing that makes sense is sum
if statistic_type != "max":
log.warning("Only statistic max makes sense for AccumT")
continue
value = sum(v.max for v in self.process_accum_statistics.values())
else:
statistics_tracker = self.tree_statistics[column]
value = getattr(statistics_tracker, statistic_type)
computed_statistic = (statistic, value)
computed_statistics.append(computed_statistic)
return computed_statistics
def __rows_for_process(self, rows, pid):
process_rows = []
pids = self.__all_child_pids(rows, pid)
for row in rows:
if row[PID_INDEX] in pids:
process_rows.append(row)
return process_rows
def __all_child_pids(self, rows, pid):
pids_in_process_tree = set([str(self.pid)])
added = True
while added:
added = False
for row in rows:
pid = row[PID_INDEX]
parent_pid = row[PARENT_PID_INDEX]
if parent_pid in pids_in_process_tree and pid not in pids_in_process_tree:
pids_in_process_tree.add(pid)
added = True
return pids_in_process_tree
def __time_to_seconds(self, minutes_str):
parts = minutes_str.split(":")
seconds = 0.0
for i, val in enumerate(parts):
seconds += float(val) * (60 ** (len(parts) - (i + 1)))
return seconds
class CollectlProcessInterval(object):
""" Represent all rows in collectl playback file for given time slice with
ability to filter out just rows corresponding to the process tree
corresponding to a given pid.
"""
def __init__(self):
self.rows = []
def row_is_in(self, row):
if not self.rows: # No rows, this row defines interval.
return True
first_row = self.rows[0]
return first_row[0] == row[0] and first_row[1] == row[1]
def add_row(self, row):
self.rows.append(row)
def _tuplize_statistic(statistic):
if not isinstance(statistic, tuple):
statistic_split = statistic.split("_", 1)
statistic = (statistic_split[0].lower(), statistic_split[1])
return statistic
__all__ = ('generate_process_statistics', )