#!/usr/bin/env python
"""
Connectors for the output of the GPFS ``mmperfmon query usage`` and
``mmperfmon query gpfsNumberOperations`` commands.
The typical output of ``mmperfmon query usage`` may look something like::
Legend:
1: xxxxxxxx.nersc.gov|CPU|cpu_user
2: xxxxxxxx.nersc.gov|CPU|cpu_sys
3: xxxxxxxx.nersc.gov|Memory|mem_total
4: xxxxxxxx.nersc.gov|Memory|mem_free
5: xxxxxxxx.nersc.gov|Network|lo|net_r
6: xxxxxxxx.nersc.gov|Network|lo|net_s
Row Timestamp cpu_user cpu_sys mem_total mem_free net_r net_s
1 2019-01-11-10:00:00 0.2 0.56 31371.0 MB 18786.5 MB 1.7 kB 1.7 kB
2 2019-01-11-10:01:00 0.22 0.57 31371.0 MB 18785.6 MB 1.7 kB 1.7 kB
3 2019-01-11-10:02:00 0.14 0.55 31371.0 MB 18785.1 MB 1.7 kB 1.7 kB
Whereas the typical output of ``mmperfmon query gpfsnsdds`` is::
Legend:
1: xxxxxxxx.nersc.gov|GPFSNSDDisk|na07md01|gpfs_nsdds_bytes_read
2: xxxxxxxx.nersc.gov|GPFSNSDDisk|na07md02|gpfs_nsdds_bytes_read
3: xxxxxxxx.nersc.gov|GPFSNSDDisk|na07md03|gpfs_nsdds_bytes_read
Row Timestamp gpfs_nsdds_bytes_read gpfs_nsdds_bytes_read gpfs_nsdds_bytes_read
1 2019-03-04-16:01:00 203539391 0 0
2 2019-03-04-16:02:00 175109739 0 0
3 2019-03-04-16:03:00 57053762 0 0
In general, each Legend: entry has the format::
col_number: hostname|subsystem[|device_id]|counter_name
where
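* col_number identifies the data column the entry describes; this module assumes entries are numbered 1, 2, ... in the same order as the data columns that follow Row and Timestamp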
* hostname is the fully qualified NSD server hostname
* subsystem is the type of component being measured (CPU, memory, network, disk)
* device_id is optional and represents the instance of the subsystem being
measured (e.g., CPU core ID, network interface, or disk identifier)
* counter_name is the specific metric being measured
It is also worth noting that mmperfmon treats a timestamp labeled as, for
example, ``2019-03-04-16:01:00`` as containing all data from the period between
2019-03-04-16:00:00 and 2019-03-04-16:01:00.
"""
import os
import re
import json
import gzip
import tarfile
import datetime
import warnings
import mimetypes
import pandas
from .common import SubprocessOutputDict, walk_file_collection
from ..common import to_epoch, recast_string, JSONEncoder
# Legend line: "<col_number>: <hostname>|<subsystem>|[<device_id>|]<counter_name>"
# (group 4 captures "device_id|counter_name" when a device id is present)
_REX_LEGEND = re.compile(r'^\s*(\d+):\s+([^|]+)\|([^|]+)\|(\S+)\s*$')
# Header line ("Row  Timestamp ...") that introduces a table of data rows
_REX_ROWHEAD = re.compile(r'^\s*Row\s+Timestamp')
# Data row: a row number followed by a timestamp such as 2019-01-11-10:00:00
_REX_ROW = re.compile(r'^\s*\d+\s+(\d{4}-\d\d-\d\d-\d\d:\d\d:\d\d)\s+')

# strptime format of the values in the Timestamp column
MMPERFMON_DATE_FMT = "%Y-%m-%d-%H:%M:%S"

# Binary multipliers for the unit suffixes attached to byte quantities
# (e.g. "31371.0 MB", "1.7 kB")
MMPERFMON_UNITS_TO_BYTES = dict(
    MB=1 << 20,
    kB=1 << 10,
)
class Mmperfmon(SubprocessOutputDict):
    """
    Representation for the mmperfmon query command. Generates a dict of form::

        {
            timestamp0: {
                "something0.nersc.gov": {
                    "key0": value0,
                    "key1": value1,
                    ...
                },
                "something1.nersc.gov": {
                    ...
                },
                ...
            },
            timestamp1: {
                ...
            },
            ...
        }

    Timestamps are the values returned by ``to_epoch`` for each row's
    Timestamp column.  Counters whose legend entry includes a device id
    (e.g. ``host|Network|lo|net_r``) are stored as a nested dict keyed by
    device id, e.g. ``{"net_r": {"lo": value}}``.  Values carrying a
    recognized byte unit (e.g. ``1.7 kB``) are converted to a number of
    bytes and stored under ``<counter>_bytes`` instead of ``<counter>``.
    """
    def __init__(self, *args, **kwargs):
        super(Mmperfmon, self).__init__(*args, **kwargs)
        self.subprocess_cmd = None
        # parser state; reset at the start of every load_str() call
        self.legend = {}
        self.rows = {}
        self.col_offsets = []
        self.load()

    def __repr__(self):
        """Returns string representation of self

        This does not convert back into a format that attempts to resemble the
        mmperfmon output because the process of loading mmperfmon output is
        lossy.
        """
        return self.to_json()

    @classmethod
    def from_str(cls, input_str):
        """Instantiate from a string

        Args:
            input_str (str): Text output of ``mmperfmon query``
        """
        return cls(from_string=input_str)

    @classmethod
    def from_file(cls, cache_file):
        """Instantiate from a cache file

        Args:
            cache_file (str): Path to a file, directory, or tarball to load
        """
        return cls(cache_file=cache_file)

    def load(self, cache_file=None):
        """Load either a tarfile, directory, or single mmperfmon output file

        Tries to load self.cache_file via self.load_multiple; if that raises
        ``tarfile.ReadError``, falls back to the parent class's load(), which
        handles single files and strings.

        Args:
            cache_file (str or None): If given, replaces self.cache_file
                before loading.
        """
        if cache_file:
            self.cache_file = cache_file
        try:
            self.load_multiple(input_file=self.cache_file)
        except tarfile.ReadError:
            # NOTE(review): presumably raised by walk_file_collection when the
            # input is neither a directory nor a tarball -- confirm
            super(Mmperfmon, self).load()

    def load_multiple(self, input_file):
        """Load one or more input files from a directory or tarball

        Args:
            input_file (str): Path to either a directory or a tarfile containing
                multiple text files, each of which contains the output of a single
                mmperfmon invocation.

        Raises:
            Exception: Any parsing error is re-raised after a warning naming
                the member that failed.
        """
        for member_name, _, member_handle in walk_file_collection(input_file):
            try:
                self.load_str(input_str=member_handle.read())
            except Exception:
                warnings.warn("Parsing error in %s" % member_name)
                raise

    def load_cache(self, cache_file=None):
        """Loads from one of two formats of cache files

        Because self.save_cache() outputs to a different format from
        self.load_str(), load_cache() must be able to ingest both formats:
        JSON (optionally gzipped) is loaded directly; anything that fails
        to parse as JSON is handed to the parent class's load_cache().

        Args:
            cache_file (str or None): If given, replaces self.cache_file
                before loading.
        """
        if cache_file:
            self.cache_file = cache_file
        _, encoding = mimetypes.guess_type(self.cache_file)
        opener = gzip.open if encoding == 'gzip' else open
        try:
            # use a context manager so the handle is closed on success too
            with opener(self.cache_file, 'rt') as input_fp:
                loaded_json = json.load(input_fp)
            for key, value in loaded_json.items():
                # JSON object keys are always strings; restore numeric keys
                self[recast_string(key)] = value
        except ValueError:
            # not JSON; treat it as raw mmperfmon text output
            super(Mmperfmon, self).load_cache(cache_file=cache_file)

    def load_str(self, input_str):
        """Parses the output of the subprocess output to initialize self

        Args:
            input_str (str or bytes): Text output of the ``mmperfmon query``
                command
        """
        # clean out state from any previous parsings
        self.legend = {}
        self.rows = {}
        self.col_offsets = []

        for line in input_str.splitlines():
            # lines are bytes when input_str is bytes
            if not isinstance(line, str):
                line = line.decode()

            # decode the legend
            match = _REX_LEGEND.search(line)
            if match is not None:
                col_num, hostname, counter = match.group(1), match.group(2), match.group(4)
                # group 4 holds "device_id|counter" for per-device counters
                # (e.g. LUN names for nsd-level counters); keep the device id
                # separately from the counter name
                device_id = None
                if '|' in counter:
                    device_id, counter = counter.rsplit('|', 1)
                self.legend[int(col_num)] = {
                    'host': hostname,
                    'counter': counter,
                    'device_id': device_id,
                }
                continue

            # find the row header to determine offsets
            if _REX_ROWHEAD.search(line) is not None:
                self.col_offsets = get_col_pos(line, align='right')

            # divide data row into fixed-width fields based on row header format
            if _REX_ROW.search(line) is not None and self.col_offsets:
                fields = [line[istart:istop] for istart, istop in self.col_offsets]
                rowid = int(fields[0].strip())
                if rowid not in self.rows:
                    self.rows[rowid] = fields
                else:
                    # a row id seen again presumably comes from a later table
                    # in the same output; skip its rowid and timestamp columns
                    self.rows[rowid] += fields[2:]

        # begin processing the fully tabularized data structure
        for rowid in sorted(self.rows):
            fields = self.rows[rowid]
            # fixed-width slicing may leave padding, which strptime rejects
            timestamp = to_epoch(
                datetime.datetime.strptime(fields[1].strip(), MMPERFMON_DATE_FMT))
            for index, val in enumerate(fields[2:]):
                # data column N (after rowid/timestamp, 0-based) is described
                # by legend entry N + 1
                legend = self.legend[index + 1]
                orig_counter = legend['counter']
                hostname = legend['host']
                device_id = legend.get('device_id')

                # turn number strings into numerics
                old_value = recast_string(val.strip())
                # skip null values entirely as if they never existed
                if old_value is None:
                    continue

                # attempt to turn human-readable byte quantities into numbers of bytes;
                # value_unit_to_bytes returns its input unchanged when it cannot convert
                new_value = value_unit_to_bytes(old_value)
                if type(old_value) != type(new_value):
                    new_counter = orig_counter + "_bytes"
                else:
                    new_counter = orig_counter

                if timestamp not in self:
                    self[timestamp] = {}
                host_counters = self[timestamp].setdefault(hostname, {})
                if device_id:
                    host_counters.setdefault(new_counter, {})[device_id] = new_value
                else:
                    host_counters[new_counter] = new_value

    def to_dataframe_by_host(self, host):
        """Returns data from a specific host as a DataFrame

        Per-device counters become one column per device, named
        ``counter:device_id``.

        Args:
            host (str): Hostname from which a DataFrame should be constructed

        Returns:
            pandas.DataFrame: All measurements from the given host. Columns
            correspond to different metrics; indexed in time.
        """
        to_df = {}
        for timestamp, hosts in self.items():
            metrics = hosts.get(host)
            if metrics is None:
                continue
            timestamp_o = datetime.datetime.fromtimestamp(timestamp)
            to_df[timestamp_o] = {}
            for key, value in metrics.items():
                if isinstance(value, dict):
                    for deviceid, devicevalue in value.items():
                        to_df[timestamp_o][key + ":" + deviceid] = devicevalue
                else:
                    to_df[timestamp_o][key] = value
        dataframe = pandas.DataFrame.from_dict(to_df, orient='index')
        dataframe.index.name = 'timestamp'
        return dataframe

    def to_dataframe_by_metric(self, metric):
        """Returns data for a specific metric as a DataFrame

        Per-device counters become one column per host/device pair, named
        ``hostname:device_id``.

        Args:
            metric (str): Metric from which a DataFrame should be constructed

        Returns:
            pandas.DataFrame: All measurements of the given metric for all
            hosts. Columns represent hosts; indexed in time.
        """
        to_df = {}
        for timestamp, hosts in self.items():
            for hostname, counters in hosts.items():
                value = counters.get(metric)
                if value is None:
                    continue
                timestamp_o = datetime.datetime.fromtimestamp(timestamp)
                if timestamp_o not in to_df:
                    to_df[timestamp_o] = {}
                if isinstance(value, dict):
                    for deviceid, devicevalue in value.items():
                        to_df[timestamp_o][hostname + ":" + deviceid] = devicevalue
                else:
                    to_df[timestamp_o][hostname] = value
        dataframe = pandas.DataFrame.from_dict(to_df, orient='index')
        dataframe.index.name = 'timestamp'
        return dataframe

    def to_dataframe(self, by_host=None, by_metric=None):
        """Convert to a pandas.DataFrame

        Exactly one of the two arguments must be given.

        Args:
            by_host (str or None): Build via to_dataframe_by_host(by_host)
            by_metric (str or None): Build via to_dataframe_by_metric(by_metric)

        Returns:
            pandas.DataFrame: The requested view of the data

        Raises:
            RuntimeError: If neither or both arguments are given
        """
        if (by_host is None) == (by_metric is None):
            raise RuntimeError("must specify either by_host or by_metric")
        if by_host is not None:
            return self.to_dataframe_by_host(host=by_host)
        return self.to_dataframe_by_metric(metric=by_metric)

    def to_json(self, **kwargs):
        """Returns a json-encoded string representation of self.

        Args:
            **kwargs: Passed through to ``json.dumps``

        Returns:
            str: JSON representation of self
        """
        return json.dumps(self, cls=JSONEncoder, **kwargs)
def get_col_pos(line, align=None):
    """Return column offsets of a space-delimited text table header

    Columns are runs of non-space characters separated by one or more space
    characters (only ``' '`` counts as a separator).  For example, given the
    string::

        Row  Timestamp cpu_user

    this returns::

        [(0, 3), (5, 14), (15, None)]    # align=None
        [(0, 4), (5, 14), (15, None)]    # align='left'
        [(0, 3), (4, 14), (15, None)]    # align='right'

    A stop of ``None`` means the column runs to the end of the line; this is
    the case for the last column whenever ``line`` does not end in a space.

    Args:
        line (str): String from which offsets should be determined
        align (str or None): Expected column alignment.  A value starting with
            'l'/'L' sets each column's stop to the next column's start minus
            one (the last column runs to the end of the line).  A value
            starting with 'r'/'R' sets each column's start to the previous
            column's stop plus one (the first column starts at 0).  None
            returns the exact start and stop of just the non-space text.
            Alignment is only applied when there are at least two columns.

    Returns:
        list: List of tuples of integer offsets denoting the start index
        (inclusive) and stop index (exclusive, or None) for each column.
    """
    col_pos = []
    word_start = None
    for index, char in enumerate(line):
        if char == ' ':
            # the first space after a word closes that column
            if word_start is not None:
                col_pos.append((word_start, index))
                word_start = None
        elif word_start is None:
            # first character of a new word
            word_start = index
    # a word running to end of line; compare against None because a word
    # starting at index 0 must not be dropped
    if word_start is not None:
        col_pos.append((word_start, None))

    if len(col_pos) < 2 or not align:
        return col_pos

    if align[0] in ('l', 'L'):
        # change end point to (start point of next column) - 1
        aligned = [(start, nxt[0] - 1)
                   for (start, _), nxt in zip(col_pos, col_pos[1:])]
        # force last column out to the end of the line
        aligned.append((col_pos[-1][0], None))
        return aligned

    if align[0] in ('r', 'R'):
        # force the first column to the beginning of the line
        aligned = [(0, col_pos[0][1])]
        # change start point to (end point of previous column) + 1
        aligned.extend((prev[1] + 1, stop)
                       for prev, (_, stop) in zip(col_pos, col_pos[1:]))
        return aligned

    return col_pos
def value_unit_to_bytes(value_unit, units=None):
    """Converts a value+unit string into bytes

    Converts a string containing both a numerical value and a unit of that value
    into a normalized value. For example, "1 MB" will convert to 1048576.0.
    Anything that cannot be converted -- non-strings, strings that are not
    exactly two whitespace-separated tokens, non-numeric values, or unknown
    units -- is returned unchanged.

    Args:
        value_unit (str): Of the format "float str" where float is the value and
            str is the unit by which value is expressed.
        units (dict or None): Mapping of unit suffix to number of bytes.
            Defaults to MMPERFMON_UNITS_TO_BYTES.

    Returns:
        float: Number of bytes represented by value_unit, or value_unit
        itself if it could not be converted.
    """
    if units is None:
        units = MMPERFMON_UNITS_TO_BYTES
    try:
        val, unit = value_unit.split()
    except AttributeError:
        # occurs if value_unit isn't a string (e.g., already numeric)
        return value_unit
    except ValueError:
        # not exactly two tokens (e.g., "N/A" or "1 2 3")
        return value_unit
    multiple = units.get(unit)
    if multiple is None:
        return value_unit
    try:
        return float(val) * multiple
    except ValueError:
        # the value part is not numeric
        return value_unit