
#!/usr/bin/env python
"""
Connectors for the output of the GPFS ``mmperfmon query usage`` and
``mmperfmon query gpfsNumberOperations`` commands.

The typical output of ``mmperfmon query usage`` may look something like::

    Legend:
     1:	xxxxxxxx.nersc.gov|CPU|cpu_user
     2:	xxxxxxxx.nersc.gov|CPU|cpu_sys
     3:	xxxxxxxx.nersc.gov|Memory|mem_total
     4:	xxxxxxxx.nersc.gov|Memory|mem_free
     5:	xxxxxxxx.nersc.gov|Network|lo|net_r
     6:	xxxxxxxx.nersc.gov|Network|lo|net_s

    Row           Timestamp cpu_user cpu_sys   mem_total    mem_free     net_r     net_s
      1 2019-01-11-10:00:00      0.2    0.56  31371.0 MB  18786.5 MB    1.7 kB    1.7 kB
      2 2019-01-11-10:01:00     0.22    0.57  31371.0 MB  18785.6 MB    1.7 kB    1.7 kB
      3 2019-01-11-10:02:00     0.14    0.55  31371.0 MB  18785.1 MB    1.7 kB    1.7 kB

By contrast, the typical output of ``mmperfmon query gpfsnsdds`` is::

    Legend:
     1: xxxxxxxx.nersc.gov|GPFSNSDDisk|na07md01|gpfs_nsdds_bytes_read
     2: xxxxxxxx.nersc.gov|GPFSNSDDisk|na07md02|gpfs_nsdds_bytes_read
     3: xxxxxxxx.nersc.gov|GPFSNSDDisk|na07md03|gpfs_nsdds_bytes_read

    Row           Timestamp gpfs_nsdds_bytes_read gpfs_nsdds_bytes_read gpfs_nsdds_bytes_read 
      1 2019-03-04-16:01:00             203539391                     0                     0
      2 2019-03-04-16:02:00             175109739                     0                     0
      3 2019-03-04-16:03:00              57053762                     0                     0

In general, each Legend: entry has the format::

    col_number: hostname|subsystem[|device_id]|counter_name

where

* col_number is an arbitrary column number
* hostname is the fully qualified NSD server hostname
* subsystem is the type of component being measured (CPU, memory, network, disk)
* device_id is optional and represents the instance of the subsystem being
  measured (e.g., CPU core ID, network interface, or disk identifier)
* counter_name is the specific metric being measured
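
For example, the legend entry::

    2: xxxxxxxx.nersc.gov|GPFSNSDDisk|na07md02|gpfs_nsdds_bytes_read

corresponds to col_number ``2``, hostname ``xxxxxxxx.nersc.gov``, subsystem
``GPFSNSDDisk``, device_id ``na07md02``, and counter_name
``gpfs_nsdds_bytes_read``.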

It is also worth noting that mmperfmon treats a timestamp labeled as, for
example, ``2019-03-04-16:01:00`` as containing all data from the period between
2019-03-04-16:00:00 and 2019-03-04-16:01:00.
"""

import os
import re
import json
import gzip
import tarfile
import datetime
import warnings
import mimetypes

import pandas

from .common import SubprocessOutputDict, walk_file_collection
from ..common import to_epoch, recast_string, JSONEncoder

_REX_LEGEND = re.compile(r'^\s*(\d+):\s+([^|]+)\|([^|]+)\|(\S+)\s*$')
_REX_ROWHEAD = re.compile(r'^\s*Row\s+Timestamp')
_REX_ROW = re.compile(r'^\s*\d+\s+(\d{4}-\d\d-\d\d-\d\d:\d\d:\d\d)\s+')

MMPERFMON_DATE_FMT = "%Y-%m-%d-%H:%M:%S"
MMPERFMON_UNITS_TO_BYTES = {
    "MB": 1048576,
    "kB": 1024,
}

class Mmperfmon(SubprocessOutputDict):
    """Representation for the mmperfmon query command.  Generates a dict of
    form::

        {
            timestamp0: {
                "something0.nersc.gov": {
                    "key0": value0,
                    "key1": value1,
                    ...
                },
                "something1.nersc.gov": {
                    ...
                },
                ...
            },
            timestamp1: {
                ...
            },
            ...
        }
    """
    def __init__(self, *args, **kwargs):
        super(Mmperfmon, self).__init__(*args, **kwargs)
        self.subprocess_cmd = None
        self.legend = {}
        self.rows = {}
        self.col_offsets = []
        self.load()

    def __repr__(self):
        """Returns string representation of self

        This does not convert back into a format that attempts to resemble
        the mmperfmon output because the process of loading mmperfmon output
        is lossy.
        """
        return self.to_json()

    @classmethod
    def from_str(cls, input_str):
        """Instantiate from a string
        """
        return cls(from_string=input_str)

    @classmethod
    def from_file(cls, cache_file):
        """Instantiate from a cache file
        """
        return cls(cache_file=cache_file)

    def load(self, cache_file=None):
        """Load either a tarfile, directory, or single mmperfmon output file

        Tries to load self.cache_file; if it is a directory or tarfile, it is
        handled by self.load_multiple; otherwise falls through to the
        load_str code path.
        """
        if cache_file:
            self.cache_file = cache_file
        try:
            self.load_multiple(input_file=self.cache_file)
        except tarfile.ReadError:
            super(Mmperfmon, self).load()

    def load_multiple(self, input_file):
        """Load one or more input files from a directory or tarball

        Args:
            input_file (str): Path to either a directory or a tarfile
                containing multiple text files, each of which contains the
                output of a single mmperfmon invocation.
        """
        for (member_name, _, member_handle) in walk_file_collection(input_file):
            try:
                self.load_str(input_str=member_handle.read())
            except Exception:
                # identify the offending file before re-raising
                warnings.warn("Parsing error in %s" % member_name)
                raise

    def load_cache(self, cache_file=None):
        """Loads from one of two formats of cache files

        Because self.save_cache() outputs to a different format from
        self.load_str(), load_cache() must be able to ingest both formats.
        """
        if cache_file:
            self.cache_file = cache_file

        _, encoding = mimetypes.guess_type(self.cache_file)
        if encoding == 'gzip':
            input_fp = gzip.open(self.cache_file, 'rt')
        else:
            input_fp = open(self.cache_file, 'rt')

        try:
            loaded_json = json.load(input_fp)
        except ValueError:
            # not a JSON cache; fall back to parsing raw mmperfmon output
            input_fp.close()
            super(Mmperfmon, self).load_cache(cache_file=cache_file)
            return

        input_fp.close()
        for key, value in loaded_json.items():
            key = recast_string(key)
            self[key] = value

    def load_str(self, input_str):
        """Parses the output of the subprocess output to initialize self

        Args:
            input_str (str): Text output of the ``mmperfmon query`` command
        """
        # clean out state from any previous parsings
        self.legend = {}
        self.rows = {}
        self.col_offsets = []

        for line in input_str.splitlines():
            # tolerate bytes input (e.g., file members read from a tarfile)
            if not isinstance(line, str):
                line = line.decode()

            # decode the legend
            match = _REX_LEGEND.search(line)
            if match is not None:
                # extract values
                row, hostname, counter = (match.group(1),
                                          match.group(2),
                                          match.group(4))
                # counter will catch LUN names in the case of nsd-level counters
                device_id = None
                if '|' in counter:
                    # an optional device_id is specified; split it off the counter
                    device_id, counter = counter.rsplit('|', 1)
                self.legend[int(row)] = {
                    'host': hostname,
                    'counter': counter,
                    'device_id': device_id,
                }
                # the reverse map doesn't hurt to have
                # self.legend[hostname] = {'row': int(row), 'counter': counter}
                continue

            # find the row header to determine offsets
            match = _REX_ROWHEAD.search(line)
            if match is not None:
                self.col_offsets = get_col_pos(line, align='right')

            # divide data row into fixed-width fields based on row header format
            match = _REX_ROW.search(line)
            if match is not None and self.col_offsets:
                fields = [line[istart:istop] for istart, istop in self.col_offsets]
                rowid = int(fields[0].strip())
                if rowid not in self.rows:
                    self.rows[rowid] = fields
                else:
                    # skip the rowid and timestamp columns after the first
                    # time a row appears
                    self.rows[rowid] += fields[2:]

        # begin processing the fully tabularized data structure
        for rowid in sorted(self.rows.keys()):
            fields = self.rows[rowid]
            timestamp = to_epoch(
                datetime.datetime.strptime(fields[1], MMPERFMON_DATE_FMT))
            for index, val in enumerate(fields[2:]):
                orig_counter = self.legend[index + 1]['counter']
                hostname = self.legend[index + 1]['host']
                device_id = self.legend[index + 1].get('device_id')

                # turn number strings into numerics
                old_value = recast_string(val.strip())

                # skip null values entirely as if they never existed
                if old_value is None:
                    continue

                # attempt to turn human-readable byte quantities into numbers of bytes
                new_value = value_unit_to_bytes(old_value)
                # if a unit suffix was converted away, relabel the counter as bytes
                if type(old_value) != type(new_value):
                    new_counter = orig_counter + "_bytes"
                else:
                    new_counter = orig_counter

                if timestamp not in self:
                    self[timestamp] = {}
                to_update = self[timestamp].get(hostname, {})
                if device_id:
                    if new_counter not in to_update:
                        to_update[new_counter] = {}
                    to_update[new_counter].update({device_id: new_value})
                else:
                    to_update[new_counter] = new_value
                if hostname not in self[timestamp]:
                    self[timestamp][hostname] = to_update

    def to_dataframe_by_host(self, host):
        """Returns data from a specific host as a DataFrame

        Args:
            host (str): Hostname from which a DataFrame should be constructed

        Returns:
            pandas.DataFrame: All measurements from the given host.  Columns
            correspond to different metrics; indexed in time.
        """
        to_df = {}
        for timestamp, hosts in self.items():
            metrics = hosts.get(host)
            if metrics is not None:
                timestamp_o = datetime.datetime.fromtimestamp(timestamp)
                to_df[timestamp_o] = {}
                for key, value in metrics.items():
                    if isinstance(value, dict):
                        for deviceid, devicevalue in value.items():
                            new_key = key + ":" + deviceid
                            to_df[timestamp_o][new_key] = devicevalue
                    else:
                        to_df[timestamp_o][key] = value

        dataframe = pandas.DataFrame.from_dict(to_df, orient='index')
        dataframe.index.name = 'timestamp'
        return dataframe

    def to_dataframe_by_metric(self, metric):
        """Returns data for a specific metric as a DataFrame

        Args:
            metric (str): Metric from which a DataFrame should be constructed

        Returns:
            pandas.DataFrame: All measurements of the given metric for all
            hosts.  Columns represent hosts; indexed in time.
        """
        to_df = {}
        for timestamp, hosts in self.items():
            for hostname, counters in hosts.items():
                value = counters.get(metric)
                if value is None:
                    continue
                timestamp_o = datetime.datetime.fromtimestamp(timestamp)
                if timestamp_o not in to_df:
                    to_df[timestamp_o] = {}
                if isinstance(value, dict):
                    for deviceid, devicevalue in value.items():
                        key = hostname + ":" + deviceid
                        to_df[timestamp_o][key] = devicevalue
                else:
                    key = hostname
                    to_df[timestamp_o][key] = value

        dataframe = pandas.DataFrame.from_dict(to_df, orient='index')
        dataframe.index.name = 'timestamp'
        return dataframe

    def to_dataframe(self, by_host=None, by_metric=None):
        """Convert to a pandas.DataFrame
        """
        if (by_host is None and by_metric is None) \
        or (by_host is not None and by_metric is not None):
            raise RuntimeError("must specify exactly one of by_host or by_metric")
        elif by_host is not None:
            return self.to_dataframe_by_host(host=by_host)

        return self.to_dataframe_by_metric(metric=by_metric)

    def to_json(self, **kwargs):
        """Returns a json-encoded string representation of self

        Returns:
            str: JSON representation of self
        """
        return json.dumps(self, cls=JSONEncoder, **kwargs)


def get_col_pos(line, align=None):
    """Return column offsets of a text table

    For example, given the string::

        Row           Timestamp cpu_user cpu_sys   mem_total
        123456789x123456789x123456789x123456789x123456789x123456789x

    would return::

        [(0, 4), (15, 24), (25, 33), (34, 41), (44, 53)]

    for ``align=None``.

    Args:
        line (str): String from which offsets should be determined
        align (str or None): Expected column alignment; one of 'left',
            'right', or None (to return the exact start and stop of just the
            non-space text)

    Returns:
        list: List of tuples of integer offsets denoting the start index
        (inclusive) and stop index (exclusive) for each column.
    """
    col_pos = []
    last_start = None
    for index, char in enumerate(line):
        # if this is the first space after a word
        if char == ' ' and last_start is not None:
            col_pos.append((last_start, index))
            last_start = None
        # if this is the first letter in a word
        elif char != ' ' and last_start is None:
            last_start = index

    # catch a word that runs to the end of the line (last_start may be 0)
    if last_start is not None:
        col_pos.append((last_start, None))

    if len(col_pos) > 1:
        if align and (align[0] == 'l' or align[0] == 'L'):
            old_col_pos = col_pos
            col_pos = []
            for index, (start, stop) in enumerate(old_col_pos[:-1]):
                # change end point to start point of next column
                col_pos.append((start, old_col_pos[index + 1][0] - 1))
            # force last column out to the end of the line
            col_pos.append((col_pos[-1][1] + 1, None))
        elif align and (align[0] == 'r' or align[0] == 'R'):
            old_col_pos = col_pos
            # force the first column to the beginning of the line
            col_pos = [(0, col_pos[0][1])]
            for index, (start, stop) in enumerate(old_col_pos[1:]):
                # change start point to end point of previous column
                col_pos.append((old_col_pos[index][1] + 1, stop))

    return col_pos


def value_unit_to_bytes(value_unit):
    """Converts a value+unit string into bytes

    Converts a string containing both a numerical value and a unit of that
    value into a normalized value.  For example, "1 MB" will convert to
    1048576.

    Args:
        value_unit (str): Of the format "float str" where float is the value
            and str is the unit by which value is expressed.

    Returns:
        float: Number of bytes represented by value_unit, or value_unit
        unmodified if it could not be interpreted as a value+unit pair
    """
    try:
        val, unit = value_unit.split()
        val = float(val)
    except (AttributeError, ValueError):
        # occurs if value_unit isn't a string or isn't a "<value> <unit>" pair
        return value_unit

    multiple = MMPERFMON_UNITS_TO_BYTES.get(unit)
    if multiple is not None:
        return val * multiple

    return value_unit
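

if __name__ == "__main__":
    # Minimal usage sketch (not part of the upstream module): parse a captured
    # ``mmperfmon query`` output file and pivot it into DataFrames.  The input
    # path and the host/metric names below are hypothetical placeholders.
    import sys

    mmp = Mmperfmon.from_file(sys.argv[1] if len(sys.argv) > 1 else "mmperfmon.out")

    # one column per counter for a single host, indexed by timestamp
    print(mmp.to_dataframe(by_host="xxxxxxxx.nersc.gov"))

    # one column per host for a single counter, indexed by timestamp
    print(mmp.to_dataframe(by_metric="cpu_user"))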