Source code for tokio.cli.summarize_h5lmt

"""
Generate summary metrics from an h5lmt file.  This will eventually be replaced
by the summarize_tts command-line tool.
"""

import os
import sys
import json
import time
import math
import datetime
import argparse
import warnings
import tokio.common
import tokio.connectors.hdf5
import tokio.tools.hdf5

DATASETS_TO_BIN_KEYS = {
    'datatargets/readbytes': 'ost_read',
    'datatargets/writebytes': 'ost_write',
    'dataservers/cpuload': 'oss_cpu',
    'mdservers/cpuload': 'mds_cpu',
}

BYTES_TO_GIB = 2.0**30
DATE_FMT = "%Y-%m-%d"
DATE_FMT_STR = "YYYY-MM-DD"
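
# Illustrative sketch (not part of the original module): how a dataset path
# maps to its short "bin key" and the per-bin counter names that bin_dataset()
# below derives from that key.
def _example_bin_key_names(dataset_name='datatargets/readbytes'):
    """Show the aggregate counter names derived for one dataset"""
    base_key = DATASETS_TO_BIN_KEYS[dataset_name]  # 'ost_read'
    return ['sum_' + base_key, 'ave_' + base_key, 'max_' + base_key,
            'min_' + base_key, 'missing_' + base_key, 'num_' + base_key]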

def summarize_reduced_data(data):
    """
    Take a dictionary of binned LMT data, keyed by timestamp, and return
    summaries of each relevant key
    """
    totals = {
        'n': 0,
        'sum_bytes_read': 0,
        'sum_bytes_write': 0,
        'oss_ave': -0.0,
        'oss_max': -0.0,
        'mds_ave': -0.0,
        'mds_max': -0.0,
        'missing_ost_read': 0,
        'missing_ost_write': 0,
        'missing_oss_cpu': 0,
        'missing_mds_cpu': 0,
        'num_ost_read': 0,
        'num_ost_write': 0,
        'num_oss_cpu': 0,
        'num_mds_cpu': 0,
    }

    for timestamp in sorted(data.keys()):
        datum = data[timestamp]
        points_in_bin = datum['indexf'] - datum['index0']
        totals['sum_bytes_read'] += datum['sum_ost_read']
        totals['sum_bytes_write'] += datum['sum_ost_write']
        totals['n'] += points_in_bin
        if 'ave_oss_cpu' in datum:
            totals['oss_ave'] += datum['ave_oss_cpu'] * points_in_bin
        if 'ave_mds_cpu' in datum:
            totals['mds_ave'] += datum['ave_mds_cpu'] * points_in_bin
        if 'max_oss_cpu' in datum and datum['max_oss_cpu'] > totals['oss_max']:
            totals['oss_max'] = datum['max_oss_cpu']
        if 'max_mds_cpu' in datum and datum['max_mds_cpu'] > totals['mds_max']:
            totals['mds_max'] = datum['max_mds_cpu']
        totals['missing_ost_read'] += datum.get('missing_ost_read', 0)
        totals['missing_ost_write'] += datum.get('missing_ost_write', 0)
        totals['missing_oss_cpu'] += datum.get('missing_oss_cpu', 0)
        totals['missing_mds_cpu'] += datum.get('missing_mds_cpu', 0)
        totals['num_ost_read'] += datum.get('num_ost_read', 0)
        totals['num_ost_write'] += datum.get('num_ost_write', 0)
        totals['num_oss_cpu'] += datum.get('num_oss_cpu', 0)
        totals['num_mds_cpu'] += datum.get('num_mds_cpu', 0)

    # Derived values
    totals['ave_bytes_read_per_dt'] = totals['sum_bytes_read'] / totals['n']
    totals['ave_bytes_write_per_dt'] = totals['sum_bytes_write'] / totals['n']
    totals['sum_kibs_read'] = totals['sum_bytes_read'] / 1024.0
    totals['sum_kibs_write'] = totals['sum_bytes_write'] / 1024.0
    totals['sum_mibs_read'] = totals['sum_kibs_read'] / 1024.0
    totals['sum_mibs_write'] = totals['sum_kibs_write'] / 1024.0
    totals['sum_gibs_read'] = totals['sum_mibs_read'] / 1024.0
    totals['sum_gibs_write'] = totals['sum_mibs_write'] / 1024.0
    totals['sum_tibs_read'] = totals['sum_gibs_read'] / 1024.0
    totals['sum_tibs_write'] = totals['sum_gibs_write'] / 1024.0
    totals['ave_gibs_read_per_dt'] = totals['sum_gibs_read'] / totals['n']
    totals['ave_gibs_write_per_dt'] = totals['sum_gibs_write'] / totals['n']
    totals['oss_ave'] = totals['oss_ave'] / totals['n']
    totals['mds_ave'] = totals['mds_ave'] / totals['n']

    # Missing fractions
    for base in ['ost_read', 'ost_write', 'oss_cpu', 'mds_cpu']:
        if totals['num_%s' % base] > 0:
            totals['frac_missing_%s' % base] = float(totals['missing_%s' % base]) \
                                               / totals['num_%s' % base]

    # For convenience
    totals['ave_tibs_read_per_dt'] = totals['sum_tibs_read'] / totals['n']
    totals['ave_tibs_write_per_dt'] = totals['sum_tibs_write'] / totals['n']

    return totals
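
# Usage sketch (not part of the original module): bin one day of LMT data into
# hourly totals and summarize it.  The h5lmt path below is a placeholder and is
# assumed to contain the datasets named in DATASETS_TO_BIN_KEYS.
def _example_summarize_day(h5lmt_path='snx11025.h5lmt'):
    """Summarize a single h5lmt file using hourly bins"""
    hdf5_file = tokio.connectors.hdf5.Hdf5(h5lmt_path, 'r')
    hourly_bins = bin_datasets(hdf5_file=hdf5_file,
                               dataset_names=list(DATASETS_TO_BIN_KEYS.keys()),
                               orient='index',
                               num_bins=24)
    return summarize_reduced_data(hourly_bins)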
def bin_datasets(hdf5_file, dataset_names, orient='columns', num_bins=24):
    """Group many timeseries datasets into bins

    Takes a TOKIO HDF file and converts it into bins of reduced data (e.g.,
    bin by hourly totals)

    Args:
        hdf5_file (connectors.Hdf5): HDF5 file from which data should be
            retrieved
        dataset_names (list of str): dataset names to be aggregated
        orient (str): either 'columns' or 'index'; same semantic meaning as
            pandas.DataFrame.from_dict
        num_bins (int): number of bins to generate per day

    Returns:
        If orient is 'columns', a dictionary of lists: keys are metrics, and
        values (lists) are the aggregated value of that metric in each
        timestep bin.  For example::

            {
                "sum_some_metric":      [  0,   2,   3,   1],
                "sum_someother_metric": [9.9, 2.3, 5.1, 0.2],
            }

        If orient is 'index', a dictionary of dictionaries: keys are the epoch
        timestamps of each bin's start time, and values are the metrics
        aggregated over that bin.
    """
    bins_by_columns = {}
    bins_by_index = {}
    found_metrics = {}

    for dataset_key in dataset_names:
        # binned_dataset is a list of dictionaries.  Each list element
        # corresponds to a single time bin, and its key-value pairs are
        # individual metrics aggregated over that bin.
        #
        # binned_dataset = [
        #     {
        #         "ave_mds_cpu": 0.86498,
        #         "ave_oss_cpu": 39.7725,
        #         ...
        #     },
        #     {
        #         "ave_mds_cpu": 0.80356,
        #         "ave_oss_cpu": 39.2170,
        #         ...
        #     },
        #     ...
        # ]
        binned_dataset = bin_dataset(hdf5_file, dataset_key, num_bins)

        # loop over time bins
        for index, counters in enumerate(binned_dataset):
            # loop over aggregate metrics in a single bin, e.g.,
            #   key = "ave_mds_cpu", value = 0.86498
            for key, value in counters.items():
                # look for keys that appear in multiple datasets.  only record
                # values for the first dataset in which the key is encountered
                if key in found_metrics:
                    if found_metrics[key] != dataset_key:
                        if bins_by_columns[key][index] != value:
                            raise Exception("Not all timesteps are the same in HDF5")
                        continue
                else:
                    found_metrics[key] = dataset_key

                # append the value to its metric counter in bins_by_columns
                if key in bins_by_columns:
                    bins_by_columns[key].append(value)
                else:
                    bins_by_columns[key] = [value]

            # add the whole bin's counters to bins_by_index, keyed by the
            # epoch timestamp of the bin's start time
            if 'tstart' in counters:
                index_key = int(time.mktime(counters['tstart'].timetuple()))
                if index_key not in bins_by_index:
                    bins_by_index[index_key] = {}
                bins_by_index[index_key].update(counters)

    if orient == 'columns':
        return bins_by_columns
    elif orient == 'index':
        return bins_by_index
    else:
        raise ValueError('only recognize index or columns for orient')
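
# Usage sketch (not part of the original module): because orient mirrors
# pandas.DataFrame.from_dict, the binned output can be loaded directly into a
# DataFrame.  Assumes pandas is installed; the h5lmt path is a placeholder.
def _example_bins_to_dataframe(h5lmt_path='snx11025.h5lmt', num_bins=24):
    """Return hourly bins of every known dataset as a pandas DataFrame"""
    import pandas
    hdf5_file = tokio.connectors.hdf5.Hdf5(h5lmt_path, 'r')
    bins = bin_datasets(hdf5_file=hdf5_file,
                        dataset_names=list(DATASETS_TO_BIN_KEYS.keys()),
                        orient='columns',
                        num_bins=num_bins)
    return pandas.DataFrame.from_dict(bins, orient='columns')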
def bin_dataset(hdf5_file, dataset_name, num_bins):
    """Group a timeseries dataset into bins

    Args:
        hdf5_file (connectors.Hdf5): HDF5 file from which data should be
            retrieved
        dataset_name (str): name of the dataset to be binned
        num_bins (int): number of bins to generate per day

    Returns:
        List of dictionaries corresponding to bins.  Each dictionary contains
        data summarized over that bin's time interval.
    """
    base_key = DATASETS_TO_BIN_KEYS.get(dataset_name.lstrip('/'))
    if not base_key:
        raise KeyError("Cannot bin unknown dataset %s" % dataset_name)
    missing_key = "missing_" + base_key
    total_key = "num_" + base_key

    dataset = hdf5_file.get(dataset_name)
    if dataset is None:
        return []

    timestamps = hdf5_file.get_timestamps(dataset_name)[...]
    columns = hdf5_file.get_columns(dataset_name)

    if hdf5_file.schema:
        if not (timestamps.shape[0] % num_bins) == 0:
            warnings.warn("Bin count %d does not evenly divide into FSStepsDataSet size %d"
                          % (num_bins, timestamps.shape[0]))
        dt_per_bin = int(timestamps.shape[0] / num_bins)
    else:
        # no schema means h5lmt file, which carries one extra timestep
        if not ((timestamps.shape[0] - 1) % num_bins) == 0:
            warnings.warn("Bin count %d does not evenly divide into FSStepsDataSet size %d"
                          % (num_bins, timestamps.shape[0] - 1))
        dt_per_bin = int((timestamps.shape[0] - 1) / num_bins)

    # we generate the missing data matrix for each dataset only once--it's very
    # expensive to do this multiple times
    missing_dataset = hdf5_file.get_missing(dataset_name)

    # create a list of dictionaries, where each list element is a bin
    binned_data = []
    for bin_num in range(num_bins):
        index0 = bin_num * dt_per_bin
        indexf = (bin_num + 1) * dt_per_bin
        bin_datum = {
            "index0": index0,
            "indexf": indexf,
            "tstart": datetime.datetime.fromtimestamp(timestamps[index0]),
            "tend": datetime.datetime.fromtimestamp(timestamps[indexf - 1]),
        }
        bin_datum["max_" + base_key] = dataset[index0:indexf, :].max()
        bin_datum["min_" + base_key] = dataset[index0:indexf, :].min()
        bin_datum["sum_" + base_key] = dataset[index0:indexf, :].sum()
        bin_datum["ave_" + base_key] = bin_datum["sum_" + base_key] / float(indexf - index0)
        bin_datum["ave_" + base_key] /= columns.shape[0]

        bin_datum[missing_key] = missing_dataset[index0:indexf, :].sum()
        bin_datum[total_key] = (indexf - index0) * columns.shape[0]
        if bin_datum[total_key]:
            bin_datum["frac_" + missing_key] = float(bin_datum[missing_key]) / bin_datum[total_key]
        else:
            bin_datum["frac_" + missing_key] = 1.0

        if base_key.startswith('ost_'):
            for agg_key in 'max', 'min', 'ave', 'sum':
                root = "%s_%s" % (agg_key, base_key)
                bin_datum['%s_bytes' % root] = bin_datum[root]
                bin_datum['%s_kibs' % root] = bin_datum[root] / 1024.0
                bin_datum['%s_mibs' % root] = bin_datum['%s_kibs' % root] / 1024.0
                bin_datum['%s_gibs' % root] = bin_datum['%s_mibs' % root] / 1024.0
                bin_datum['%s_tibs' % root] = bin_datum['%s_gibs' % root] / 1024.0

        binned_data.append(bin_datum)

    return binned_data
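
# Usage sketch (not part of the original module): bin a single dataset and pull
# out the per-bin write totals in GiB.  The h5lmt path is a placeholder.
def _example_hourly_write_gibs(h5lmt_path='snx11025.h5lmt'):
    """Return (tstart, GiB written) tuples, one per hourly bin"""
    hdf5_file = tokio.connectors.hdf5.Hdf5(h5lmt_path, 'r')
    bins = bin_dataset(hdf5_file, 'datatargets/writebytes', num_bins=24)
    return [(bin_datum['tstart'], bin_datum['sum_ost_write_gibs'])
            for bin_datum in bins]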
def main(argv=None):
    """Entry point for the CLI interface
    """
    parser = argparse.ArgumentParser(
        description='aggregate bytes in/out from h5lmt file every hour')
    parser.add_argument('h5lmt', type=str, nargs='+',
                        help='h5lmt file or basename to examine')
    parser.add_argument('--summary', action='store_true',
                        help='print a summary of all output')
    group = parser.add_mutually_exclusive_group()
    group.add_argument('--bytes', action='store_true', help='print bytes')
    group.add_argument('--kibs', action='store_true', help='print in units of KiBs')
    group.add_argument('--mibs', action='store_true', help='print in units of MiBs')
    group.add_argument('--gibs', action='store_true', help='print in units of GiBs (default)')
    group.add_argument('--tibs', action='store_true', help='print in units of TiBs')
    parser.add_argument('--bins', type=int, default=24,
                        help="number of bins per day")
    parser.add_argument('--start', type=str, default=None,
                        help="date/time to start in %s format" % DATE_FMT_STR)
    parser.add_argument('--end', type=str, default=None,
                        help="date/time to end in %s format" % DATE_FMT_STR)
    parser.add_argument('--json', action='store_true', help='return json output')
    args = parser.parse_args(argv)

    if args.bytes:
        units = 'byte'
    elif args.kibs:
        units = 'KiB'
    elif args.mibs:
        units = 'MiB'
    elif args.tibs:
        units = 'TiB'
    else:
        # default units
        units = 'GiB'

    # Figure out the list of HDF5 files to open
    if args.start and args.end:
        hdf5_basename = os.path.basename(args.h5lmt[0])
        if len(args.h5lmt) > 1:
            warnings.warn("multiple h5lmt files specified with --start/--end; only using "
                          + hdf5_basename)
        hdf5_filenames = [x[0] for x in tokio.tools.hdf5.get_files_and_indices(
            hdf5_basename,
            list(DATASETS_TO_BIN_KEYS.keys())[0],
            datetime.datetime.strptime(args.start, DATE_FMT),
            datetime.datetime.strptime(args.end, DATE_FMT))]
    else:
        hdf5_filenames = args.h5lmt

    # Process each HDF5 file and append its bins to a global dictionary
    all_binned_data = {}
    if not args.json:
        sys.stdout.write(print_datum(None, units=units))
    for hdf5_filename in hdf5_filenames:
        hdf5_file = tokio.connectors.hdf5.Hdf5(hdf5_filename, 'r')
        binned_data = bin_datasets(hdf5_file=hdf5_file,
                                   dataset_names=list(DATASETS_TO_BIN_KEYS.keys()),
                                   orient='index',
                                   num_bins=args.bins)
        if not args.json:
            for timestamp in sorted(binned_data.keys()):
                print(print_datum(binned_data[timestamp], units=units).strip())
        all_binned_data.update(binned_data)

    # Print binned results in the desired format
    if args.json:
        if args.summary:
            to_serialize = {
                'bins': all_binned_data,
                'summary': summarize_reduced_data(all_binned_data),
            }
        else:
            to_serialize = {'bins': all_binned_data}
        print(json.dumps(to_serialize, sort_keys=True, indent=4,
                         cls=tokio.common.JSONEncoder))
    elif args.summary:
        sys.stdout.write(print_data_summary(all_binned_data, units=units))
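
# Usage sketch (not part of the original module): the CLI can also be driven
# programmatically by passing argv to main().  The file name is a placeholder.
def _example_cli_summary():
    """Print hourly GiB totals plus a summary for one h5lmt file"""
    main(argv=['snx11025.h5lmt', '--summary', '--gibs'])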