Source code for tokio.connectors.mmperfmon

#!/usr/bin/env python
"""
Connectors for the GPFS ``mmperfmon query usage`` and
``mmperfmon query gpfsNumberOperations`` commands.

The typical output of ``mmperfmon query usage`` may look something like::

    Legend:
     1:	xxxxxxxx.nersc.gov|CPU|cpu_user
     2:	xxxxxxxx.nersc.gov|CPU|cpu_sys
     3:	xxxxxxxx.nersc.gov|Memory|mem_total
     4:	xxxxxxxx.nersc.gov|Memory|mem_free
     5:	xxxxxxxx.nersc.gov|Network|lo|net_r
     6:	xxxxxxxx.nersc.gov|Network|lo|net_s

    Row           Timestamp cpu_user cpu_sys   mem_total    mem_free     net_r     net_s
      1 2019-01-11-10:00:00      0.2    0.56  31371.0 MB  18786.5 MB    1.7 kB    1.7 kB
      2 2019-01-11-10:01:00     0.22    0.57  31371.0 MB  18785.6 MB    1.7 kB    1.7 kB
      3 2019-01-11-10:02:00     0.14    0.55  31371.0 MB  18785.1 MB    1.7 kB    1.7 kB

Whereas the typical output of ``mmperfmon query gpfsnsdds`` is::

    Legend:
     1: xxxxxxxx.nersc.gov|GPFSNSDDisk|na07md01|gpfs_nsdds_bytes_read
     2: xxxxxxxx.nersc.gov|GPFSNSDDisk|na07md02|gpfs_nsdds_bytes_read
     3: xxxxxxxx.nersc.gov|GPFSNSDDisk|na07md03|gpfs_nsdds_bytes_read

    Row           Timestamp gpfs_nsdds_bytes_read gpfs_nsdds_bytes_read gpfs_nsdds_bytes_read 
      1 2019-03-04-16:01:00             203539391                     0                     0
      2 2019-03-04-16:02:00             175109739                     0                     0
      3 2019-03-04-16:03:00              57053762                     0                     0

In general, each Legend: entry has the format::

    col_number: hostname|subsystem[|device_id]|counter_name

where

* col_number is an arbitrary number
* hostname is the fully qualified NSD server hostname
* subsystem is the type of component being measured (CPU, memory, network, disk)
* device_id is optional and represents the instance of the subsystem being
  measured (e.g., CPU core ID, network interface, or disk identifier)
* counter_name is the specific metric being measured
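
For example, the legend entry::

    5: xxxxxxxx.nersc.gov|Network|lo|net_r

decomposes into hostname ``xxxxxxxx.nersc.gov``, subsystem ``Network``,
device_id ``lo``, and counter_name ``net_r``.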

"""

import os
import re
import json
import gzip
import tarfile
import datetime
import warnings
import mimetypes

import pandas

from .common import SubprocessOutputDict, walk_file_collection
from ..common import to_epoch, recast_string, JSONEncoder

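# _REX_LEGEND matches legend entries such as " 1: host.nersc.gov|CPU|cpu_user",
# capturing the column number, hostname, subsystem, and the remainder (an
# optional device_id and the counter name).  _REX_ROWHEAD matches the
# "Row Timestamp ..." header line, and _REX_ROW matches individual data rows
# by their leading row id, capturing the timestamp field.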
_REX_LEGEND = re.compile(r'^\s*(\d+):\s+([^|]+)\|([^|]+)\|(\S+)\s*$')
_REX_ROWHEAD = re.compile(r'^\s*Row\s+Timestamp')
_REX_ROW = re.compile(r'^\s*\d+\s+(\d{4}-\d\d-\d\d-\d\d:\d\d:\d\d)\s+')

MMPERFMON_DATE_FMT = "%Y-%m-%d-%H:%M:%S"
MMPERFMON_UNITS_TO_BYTES = {
    "MB": 1048576,
    "kB": 1024,
}
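# For example, "1.7 kB" is treated as 1.7 * 1024 bytes and "31371.0 MB" as
# 31371.0 * 1048576 bytes by value_unit_to_bytes() below.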

class Mmperfmon(SubprocessOutputDict):
    """Representation for the mmperfmon query command

    Generates a dict of form::

        {
            timestamp0: {
                "something0.nersc.gov": {
                    "key0": value0,
                    "key1": value1,
                    ...
                },
                "something1.nersc.gov": {
                    ...
                },
                ...
            },
            timestamp1: {
                ...
            },
            ...
        }

    """
    def __init__(self, *args, **kwargs):
        super(Mmperfmon, self).__init__(*args, **kwargs)
        self.subprocess_cmd = None
        self.legend = {}
        self.rows = {}
        self.col_offsets = []
        self.load()

    def __repr__(self):
        """Returns string representation of self

        This does not convert back into a format that attempts to resemble
        the mmperfmon output because the process of loading mmperfmon output
        is lossy.
        """
        return self.to_json()

    @classmethod
    def from_str(cls, input_str):
        """Instantiate from a string
        """
        return cls(from_string=input_str)

    @classmethod
    def from_file(cls, cache_file):
        """Instantiate from a cache file
        """
        return cls(cache_file=cache_file)

    def load(self, cache_file=None):
        """Load either a tarfile, directory, or single mmperfmon output file

        Tries to load self.cache_file; if it is a directory or tarfile, it
        is handled by self.load_multiple; otherwise it falls through to the
        load_str code path.
        """
        if cache_file:
            self.cache_file = cache_file

        try:
            self.load_multiple(input_file=self.cache_file)
        except tarfile.ReadError:
            super(Mmperfmon, self).load()

    def load_multiple(self, input_file):
        """Load one or more input files from a directory or tarball

        Args:
            input_file (str): Path to either a directory or a tarfile
                containing multiple text files, each of which contains the
                output of a single mmperfmon invocation.
        """
        for (member_name, _, member_handle) in walk_file_collection(input_file):
            try:
                self.load_str(input_str=member_handle.read())
            except Exception:
                warnings.warn("Parsing error in %s" % member_name)
                raise

    def load_cache(self, cache_file=None):
        """Loads from one of two formats of cache files

        Because self.save_cache() outputs to a different format from
        self.load_str(), load_cache() must be able to ingest both formats.
        """
        if cache_file:
            self.cache_file = cache_file

        _, encoding = mimetypes.guess_type(self.cache_file)
        if encoding == 'gzip':
            input_fp = gzip.open(self.cache_file, 'rt')
        else:
            input_fp = open(self.cache_file, 'r')

        try:
            loaded_json = json.load(input_fp)
            for key, value in loaded_json.items():
                key = recast_string(key)
                self[key] = value
        except ValueError:
            input_fp.close()
            super(Mmperfmon, self).load_cache(cache_file=cache_file)

    def load_str(self, input_str):
        """Parses the output of the subprocess output to initialize self

        Args:
            input_str (str): Text output of the ``mmperfmon query`` command
        """
        # clean out state from any previous parsings
        self.legend = {}
        self.rows = {}
        self.col_offsets = []

        for line in input_str.splitlines():
            if not isinstance(line, str):
                line = line.decode()

            # decode the legend
            match = _REX_LEGEND.search(line)
            if match is not None:
                # extract values
                row, hostname, counter = (match.group(1), match.group(2), match.group(4))
                # counter will catch LUN names in the case of nsd-level counters
                device_id = None
                if '|' in counter:
                    # an optional device_id is specified; split it off of the counter
                    device_id, counter = counter.rsplit('|', 1)
                self.legend[int(row)] = {'host': hostname, 'counter': counter, 'device_id': device_id}
                # the reverse map doesn't hurt to have
                # self.legend[hostname] = {'row': int(row), 'counter': counter}
                continue

            # find the row header to determine offsets
            match = _REX_ROWHEAD.search(line)
            if match is not None:
                self.col_offsets = get_col_pos(line, align='right')

            # divide data row into fixed-width fields based on row header format
            match = _REX_ROW.search(line)
            if match is not None and self.col_offsets:
                fields = [line[istart:istop] for istart, istop in self.col_offsets]
                rowid = int(fields[0].strip())
                if rowid not in self.rows:
                    self.rows[rowid] = fields
                else:
                    # skip the rowid and timestamp columns after the first time a row appears
                    self.rows[rowid] += fields[2:]

        # begin processing the fully tabularized data structure
        for rowid in sorted(self.rows.keys()):
            fields = self.rows[rowid]
            timestamp = to_epoch(datetime.datetime.strptime(fields[1], MMPERFMON_DATE_FMT))
            for index, val in enumerate(fields[2:]):
                orig_counter = self.legend[index + 1]['counter']
                hostname = self.legend[index + 1]['host']
                device_id = self.legend[index + 1].get('device_id')

                # turn number strings into numerics
                old_value = recast_string(val.strip())

                # skip null values entirely as if they never existed
                if old_value is None:
                    continue

                # attempt to turn human-readable byte quantities into numbers of bytes
                new_value = value_unit_to_bytes(old_value)
                new_counter = orig_counter + "_bytes" if type(old_value) != type(new_value) else orig_counter

                if timestamp not in self:
                    self[timestamp] = {}
                to_update = self[timestamp].get(hostname, {})

                if device_id:
                    if new_counter not in to_update:
                        to_update[new_counter] = {}
                    to_update[new_counter].update({device_id: new_value})
                else:
                    to_update[new_counter] = new_value

                if hostname not in self[timestamp]:
                    self[timestamp][hostname] = to_update

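    # A sketch of what load_str produces (hostname and values taken from the
    # "query usage" example in the module docstring; the epoch key depends on
    # the local timezone applied by to_epoch):
    #
    #     self[<epoch of 2019-01-11-10:00:00>]["xxxxxxxx.nersc.gov"] == {
    #         "cpu_user": 0.2,
    #         "cpu_sys": 0.56,
    #         "mem_total_bytes": 31371.0 * 1048576,
    #         ...
    #     }
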
    def to_dataframe_by_host(self, host):
        """Returns data from a specific host as a DataFrame

        Args:
            host (str): Hostname from which a DataFrame should be constructed

        Returns:
            pandas.DataFrame: All measurements from the given host.  Columns
            correspond to different metrics; indexed in time.
        """
        to_df = {}
        for timestamp, hosts in self.items():
            metrics = hosts.get(host)
            if metrics is not None:
                timestamp_o = datetime.datetime.fromtimestamp(timestamp)
                to_df[timestamp_o] = {}
                for key, value in metrics.items():
                    if isinstance(value, dict):
                        for deviceid, devicevalue in value.items():
                            new_key = key + ":" + deviceid
                            to_df[timestamp_o][new_key] = devicevalue
                    else:
                        to_df[timestamp_o][key] = value

        dataframe = pandas.DataFrame.from_dict(to_df, orient='index')
        dataframe.index.name = 'timestamp'
        return dataframe

    def to_dataframe_by_metric(self, metric):
        """Returns data for a specific metric as a DataFrame

        Args:
            metric (str): Metric from which a DataFrame should be constructed

        Returns:
            pandas.DataFrame: All measurements of the given metric for all
            hosts.  Columns represent hosts; indexed in time.
        """
        to_df = {}
        for timestamp, hosts in self.items():
            for hostname, counters in hosts.items():
                value = counters.get(metric)
                if value is None:
                    continue
                timestamp_o = datetime.datetime.fromtimestamp(timestamp)
                if timestamp_o not in to_df:
                    to_df[timestamp_o] = {}
                if isinstance(value, dict):
                    for deviceid, devicevalue in value.items():
                        key = hostname + ":" + deviceid
                        to_df[timestamp_o][key] = devicevalue
                else:
                    key = hostname
                    to_df[timestamp_o][key] = value

        dataframe = pandas.DataFrame.from_dict(to_df, orient='index')
        dataframe.index.name = 'timestamp'
        return dataframe

    def to_dataframe(self, by_host=None, by_metric=None):
        """Convert to a pandas.DataFrame
        """
        if (by_host is None and by_metric is None) \
                or (by_host is not None and by_metric is not None):
            raise RuntimeError("must specify either by_host or by_metric")
        elif by_host is not None:
            return self.to_dataframe_by_host(host=by_host)

        return self.to_dataframe_by_metric(metric=by_metric)

    def to_json(self, **kwargs):
        """Returns a json-encoded string representation of self.

        Returns:
            str: JSON representation of self
        """
        return json.dumps(self, cls=JSONEncoder, **kwargs)

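# Example usage (a hedged sketch; the file name is hypothetical and the
# hostname comes from the module docstring example):
#
#     mon = Mmperfmon.from_file("mmperfmon_query_usage.txt")
#     df_host = mon.to_dataframe(by_host="xxxxxxxx.nersc.gov")  # metrics as columns
#     df_cpu = mon.to_dataframe(by_metric="cpu_user")           # hosts as columns
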
def get_col_pos(line, align=None):
    """Return column offsets of a text table

    For example, given the string::

        Row           Timestamp cpu_user cpu_sys   mem_total
        123456789x123456789x123456789x123456789x123456789x123456789x

    would return::

        [(0, 4), (15, 24), (25, 33), (34, 41), (44, 53)]

    for ``align=None``.

    Args:
        line (str): String from which offsets should be determined
        align (str or None): Expected column alignment; one of 'left',
            'right', or None (to return the exact start and stop of just the
            non-space text)

    Returns:
        list: List of tuples of integer offsets denoting the start index
        (inclusive) and stop index (exclusive) for each column.
    """
    col_pos = []
    last_start = None
    for index, char in enumerate(line):
        # if this is the first space after a word
        if char == ' ' and last_start is not None:
            col_pos.append((last_start, index))
            last_start = None
        # if this is the first letter in a word
        elif char != ' ' and last_start is None:
            last_start = index

    if last_start:
        col_pos.append((last_start, None))

    if len(col_pos) > 1:
        if align and (align[0] == 'l' or align[0] == 'L'):
            old_col_pos = col_pos
            col_pos = []
            for index, (start, stop) in enumerate(old_col_pos[:-1]):
                # change end point to start point of next column
                col_pos.append((start, old_col_pos[index + 1][0] - 1))
            # force last column out to the end of the line
            col_pos.append((col_pos[-1][1] + 1, None))
        elif align and (align[0] == 'r' or align[0] == 'R'):
            old_col_pos = col_pos
            # force the first column to the beginning of the line
            col_pos = [(0, col_pos[0][1])]
            for index, (start, stop) in enumerate(old_col_pos[1:]):
                # change start point to end point of previous column
                col_pos.append((old_col_pos[index][1] + 1, stop))

    return col_pos

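# A quick illustration (a sketch, not part of the module): for
# line = "ab  cd", get_col_pos(line) returns [(0, 2), (4, None)], while
# get_col_pos(line, align='right') extends each column's start back to the
# previous column's stop, returning [(0, 2), (3, None)] so that right-aligned
# values wider than their header text are still captured whole.
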
def value_unit_to_bytes(value_unit):
    """Converts a value+unit string into bytes

    Converts a string containing both a numerical value and a unit of that
    value into a normalized value.  For example, "1 MB" will convert to
    1048576.

    Args:
        value_unit (str): Of the format "float str" where float is the value
            and str is the unit by which value is expressed.

    Returns:
        int: Number of bytes represented by value_unit
    """
    try:
        val, unit = value_unit.split()
    except AttributeError:
        # occurs if value_unit isn't a string
        return value_unit

    val = float(val)
    multiple = MMPERFMON_UNITS_TO_BYTES.get(unit)
    if multiple is not None:
        return val * multiple

    return value_unit
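
# Example behavior (a sketch): value_unit_to_bytes("18786.5 MB") returns
# 18786.5 * 1048576 = 19699073024.0, while a value with an unrecognized unit
# (or a non-string input) is returned unchanged.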