Source code for tokio.cli.archive_lmtdb

"""
Retrieve the contents of an LMT database and cache it locally.
"""

import sys
import datetime
import argparse
import warnings
import tokio.debug
import tokio.timeseries
import tokio.connectors.lmtdb
import tokio.connectors.hdf5
from tokio.common import isstr

DATE_FMT = "%Y-%m-%dT%H:%M:%S"
DATE_FMT_PRINT = "YYYY-MM-DDTHH:MM:SS"

SCHEMA_VERSION = "1"

class DatasetDict(dict):
    """A dictionary containing TimeSeries objects

    Contains the TimeSeries objects being populated from an LMT database.
    Implemented as a class so that a single object can store all of the
    TimeSeries objects that are generated by multiple method calls.
    """
    def __init__(self, query_start, query_end, timestep, sort_hex=True, *args, **kwargs):
        super(DatasetDict, self).__init__(*args, **kwargs)
        self.query_start = query_start
        self.query_end = query_end
        self.timestep = timestep
        self.sort_hex = sort_hex

        self.config = {
            'datatargets/readbytes': {
                "units": "bytes",
                "delta": True,
                "column": "READ_BYTES",
            },
            'datatargets/writebytes': {
                "units": "bytes",
                "delta": True,
                "column": "WRITE_BYTES",
            },
            'fullness/bytes': {
                "units": "KiB",
                "delta": False,
                "column": "KBYTES_USED",
            },
            'fullness/bytestotal': {
                "units": "KiB",
                "delta": False,
            },
            'fullness/inodes': {
                "units": "inodes",
                "delta": False,
                "column": "INODES_USED",
            },
            'fullness/inodestotal': {
                "units": "inodes",
                "delta": False,
            },
            'dataservers/cpuload': {
                "units": "%",
                "delta": False,
                "column": "PCT_CPU",
            },
            'dataservers/memused': {
                "units": "%",
                "delta": False,
                "column": "PCT_MEMORY",
            },
            'mdservers/cpuload': {
                "units": "%",
                "delta": False,
                "column": "PCT_CPU",
            },
            'mdtargets/opens': {
                "units": "ops",
                "delta": True,
            },
            'mdtargets/closes': {
                "units": "ops",
                "delta": True,
            },
            'mdtargets/mknods': {
                "units": "ops",
                "delta": True,
            },
            'mdtargets/links': {
                "units": "ops",
                "delta": True,
            },
            'mdtargets/unlinks': {
                "units": "ops",
                "delta": True,
            },
            'mdtargets/mkdirs': {
                "units": "ops",
                "delta": True,
            },
            'mdtargets/rmdirs': {
                "units": "ops",
                "delta": True,
            },
            'mdtargets/renames': {
                "units": "ops",
                "delta": True,
            },
            'mdtargets/getxattrs': {
                "units": "ops",
                "delta": True,
            },
            'mdtargets/statfss': {
                "units": "ops",
                "delta": True,
            },
            'mdtargets/setattrs': {
                "units": "ops",
                "delta": True,
            },
            'mdtargets/getattrs': {
                "units": "ops",
                "delta": True,
            },
        }

        # Initialize raw datasets - extend query by one extra timestep so we can calculate deltas
        self.schema = tokio.connectors.hdf5.SCHEMA.get(SCHEMA_VERSION)
        if self.schema is None:
            raise KeyError("Schema version %s is not known by connectors.hdf5" % SCHEMA_VERSION)

        # We need to query past the intended query range in order to calculate
        # the differences between each row and its following row
        self.query_end_plusplus = self.query_end + datetime.timedelta(seconds=timestep)
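    # Worked example of the over-query above (illustrative values, not part of
    # the module): with timestep=5 and query_end = 2019-01-01 01:00:00, the
    # value of query_end_plusplus is 2019-01-01 01:00:05, so the row following
    # query_end is available when deltas are computed in convert_deltas().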
    def init_datasets(self, dataset_names, columns):
        """Populate empty datasets within self

        Creates and attaches TimeSeries objects to self based on a given column list

        Args:
            dataset_names (list of str): keys corresponding to self.config
                defining which datasets are being initialized
            columns (list of str): column names to use in the TimeSeries
                datasets being created
        """
        for dataset_name in dataset_names:
            hdf5_dataset_name = self.schema.get(dataset_name)
            if hdf5_dataset_name is None:
                warnings.warn("Skipping %s (not in schema)" % dataset_name)
            else:
                self[dataset_name] = tokio.timeseries.TimeSeries(dataset_name=hdf5_dataset_name,
                                                                 start=self.query_start,
                                                                 end=self.query_end_plusplus,
                                                                 timestep=self.timestep,
                                                                 num_columns=len(columns),
                                                                 column_names=columns,
                                                                 sort_hex=self.sort_hex)
    def finalize(self):
        """Convert datasets to deltas where necessary and tack on metadata

        Perform a few finishing actions to all datasets contained in self after
        they have been populated.  Such actions are configured entirely in
        self.config and require no external input.
        """
        self.convert_deltas(list(self.config.keys()))
        self.set_timeseries_metadata(list(self.config.keys()))
    def convert_deltas(self, dataset_names):
        """Convert datasets from absolute values to values per timestep

        Given a list of dataset names, determine if they need to be converted
        from monotonically increasing counters to counts per timestep, and
        convert those that do.  For those that don't, trim off the final row
        since it is not needed to calculate the difference between rows.

        Args:
            dataset_names (list of str): keys corresponding to self.config for
                the datasets to be converted/corrected
        """
        for dataset_name in dataset_names:
            if dataset_name in self:
                if self.config[dataset_name].get('delta'):
                    # Convert some datasets from absolute byte counts to deltas.
                    # Note that align='l' changes the semantics of the
                    # timestamps from right-aligned (which is what LMT provides)
                    # to left-aligned (which is what the TOKIO Time Series
                    # format assumes)
                    self[dataset_name].convert_to_deltas(align='l')
                else:
                    # Trim off the last row from the non-delta datasets to compensate
                    # for initial over-sizing of the time range by an extra timestamp
                    self[dataset_name].trim_rows(1)
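    # Worked example for convert_deltas() (illustrative numbers, not part of
    # the module): a monotonically increasing READ_BYTES counter sampled at
    # right-aligned timestamps
    #
    #   00:00:05 -> 1000, 00:00:10 -> 1500, 00:00:15 -> 1800
    #
    # becomes, after convert_to_deltas(align='l'), per-timestep deltas keyed by
    # the start of each interval:
    #
    #   00:00:05 -> 500, 00:00:10 -> 300
    #
    # which is one row shorter; non-delta datasets are trimmed by one row
    # (trim_rows(1)) so that all datasets end up with the same length.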
    def set_timeseries_metadata(self, dataset_names):
        """Set metadata constants (version, units, etc) on datasets and groups

        Args:
            dataset_names (list of str): keys corresponding to self.config for
                the datasets whose metadata should be set
        """
        for dataset_name in dataset_names:
            if dataset_name in self:
                self[dataset_name].dataset_metadata.update({
                    'version': SCHEMA_VERSION,
                    'units': self.config[dataset_name]['units']
                })
                self[dataset_name].group_metadata.update({'source': 'lmt'})
    def archive_mds_data(self, lmtdb):
        """Extract and encode data from LMT's MDS_DATA table

        Queries the LMT database, interprets resulting rows, and populates a
        dictionary of TimeSeries objects with those values.

        Args:
            lmtdb (LmtDb): database object
        """
        dataset_names = [
            'mdservers/cpuload',
        ]

        self.init_datasets(dataset_names, lmtdb.mds_names)

        # Now query the MDS_DATA table to get CPU load over the query time range
        results, columns = lmtdb.get_mds_data(self.query_start, self.query_end_plusplus)

        # Index the columns to speed up insertion of data
        col_map = {}
        try:
            for db_col in ['TIMESTAMP', 'MDS_ID', 'PCT_CPU']:
                col_map[db_col] = columns.index(db_col)
        except ValueError:
            raise ValueError("LMT database schema does not match expectation")

        # Loop through all the results of the timeseries query
        for row in results:
            if isstr(row[col_map['TIMESTAMP']]):
                # SQLite stores timestamps as a unicode string
                timestamp = datetime.datetime.strptime(row[col_map['TIMESTAMP']],
                                                       "%Y-%m-%d %H:%M:%S")
            else:
                # MySQL timestamps are automatically converted to datetime.datetime
                timestamp = row[col_map['TIMESTAMP']]
            target_name = lmtdb.mds_id_map[row[col_map['MDS_ID']]]
            for dataset_name in dataset_names:
                target_dbcol = self.config[dataset_name].get('column')
                # target_dbcol=PCT_CPU, target_name=snx11025n022
                if target_dbcol is not None:
                    self[dataset_name].insert_element(
                        timestamp,
                        target_name,
                        row[col_map[target_dbcol]])
                else:
                    errmsg = "%s in self.config but missing 'column' setting" % dataset_name
                    raise KeyError(errmsg)
    def archive_mds_ops_data(self, lmtdb):
        """Extract and encode data from LMT's MDS_OPS_DATA table

        Queries the LMT database, interprets resulting rows, and populates a
        dictionary of TimeSeries objects with those values.  Avoids JOINing the
        MDS_VARIABLE_INFO table and instead uses an internal mapping of
        OPERATION_IDs to demultiplex the data in MDS_OPS_DATA into different
        HDF5 datasets.

        Args:
            lmtdb (LmtDb): database object
        """
        # mapping from OPERATION_INFO.OPERATION_NAME to HDF5 dataset names
        opname_to_dataset_name = {
            'open': 'mdtargets/opens',
            'close': 'mdtargets/closes',
            'mknod': 'mdtargets/mknods',
            'link': 'mdtargets/links',
            'unlink': 'mdtargets/unlinks',
            'mkdir': 'mdtargets/mkdirs',
            'rmdir': 'mdtargets/rmdirs',
            'rename': 'mdtargets/renames',
            'getxattr': 'mdtargets/getxattrs',
            'statfs': 'mdtargets/statfss',
            'setattr': 'mdtargets/setattrs',
            'getattr': 'mdtargets/getattrs',
        }
        dataset_names = list(opname_to_dataset_name.values())

        self.init_datasets(dataset_names, lmtdb.mds_names)

        results, columns = lmtdb.get_mds_ops_data(self.query_start, self.query_end_plusplus)

        # Index the columns to speed up insertion of data
        col_map = {}
        try:
            for db_col in ['TIMESTAMP', 'MDS_ID', 'OPERATION_ID', 'SAMPLES']:
                col_map[db_col] = columns.index(db_col)
        except ValueError:
            raise ValueError("LMT database schema does not match expectation")

        # Loop through all the results of the timeseries query
        for row in results:
            if isstr(row[col_map['TIMESTAMP']]):
                # SQLite stores timestamps as a unicode string
                timestamp = datetime.datetime.strptime(row[col_map['TIMESTAMP']],
                                                       "%Y-%m-%d %H:%M:%S")
            else:
                # MySQL timestamps are automatically converted to datetime.datetime
                timestamp = row[col_map['TIMESTAMP']]

            # figure out the dataset this row's data will go into (this
            # implicitly filters out operations that aren't defined in
            # opname_to_dataset_name)
            op_name = lmtdb.mds_op_id_map[row[col_map['OPERATION_ID']]]
            dataset_name = opname_to_dataset_name.get(op_name)
            if dataset_name is None:
                continue

            # figure out which column (MDS name) this row's data will go into
            mds_name = lmtdb.mds_id_map.get(row[col_map['MDS_ID']])
            if not mds_name:
                errmsg = "unknown MDS_ID %s" % row[col_map['MDS_ID']]
                warnings.warn(errmsg)
                continue

            self[dataset_name].insert_element(
                timestamp,
                mds_name,
                row[col_map['SAMPLES']])
    def archive_oss_data(self, lmtdb):
        """Extract and encode data from LMT's OSS_DATA table

        Queries the LMT database, interprets resulting rows, and populates a
        dictionary of TimeSeries objects with those values.

        Args:
            lmtdb (LmtDb): database object
        """
        dataset_names = [
            'dataservers/cpuload',
            'dataservers/memused',
        ]

        self.init_datasets(dataset_names, lmtdb.oss_names)

        # Now query the OSS_DATA table to get CPU and memory utilization over the query time range
        results, columns = lmtdb.get_oss_data(self.query_start, self.query_end_plusplus)

        # Index the columns to speed up insertion of data
        col_map = {}
        try:
            for db_col in ['TIMESTAMP', 'OSS_ID', 'PCT_CPU', 'PCT_MEMORY']:
                col_map[db_col] = columns.index(db_col)
        except ValueError:
            raise ValueError("LMT database schema does not match expectation")

        # Loop through all the results of the timeseries query
        for row in results:
            if isstr(row[col_map['TIMESTAMP']]):
                # SQLite stores timestamps as a unicode string
                timestamp = datetime.datetime.strptime(row[col_map['TIMESTAMP']],
                                                       "%Y-%m-%d %H:%M:%S")
            else:
                # MySQL timestamps are automatically converted to datetime.datetime
                timestamp = row[col_map['TIMESTAMP']]
            target_name = lmtdb.oss_id_map[row[col_map['OSS_ID']]]
            for dataset_name in dataset_names:
                target_dbcol = self.config[dataset_name].get('column')
                # target_dbcol=PCT_CPU, target_name=snx11025n022
                if target_dbcol is not None:
                    self[dataset_name].insert_element(
                        timestamp,
                        target_name,
                        row[col_map[target_dbcol]])
                else:
                    errmsg = "%s in self.config but missing 'column' setting" % dataset_name
                    raise KeyError(errmsg)
    def archive_ost_data(self, lmtdb):
        """Extract and encode data from LMT's OST_DATA table

        Queries the LMT database, interprets resulting rows, and populates a
        dictionary of TimeSeries objects with those values.

        Args:
            lmtdb (LmtDb): database object
        """
        dataset_names = [
            'datatargets/readbytes',
            'datatargets/writebytes',
            'fullness/bytes',
            'fullness/bytestotal',
            'fullness/inodes',
            'fullness/inodestotal',
        ]

        self.init_datasets(dataset_names, lmtdb.ost_names)

        # Now query the OST_DATA table to get byte counts over the query time range
        results, columns = lmtdb.get_ost_data(self.query_start, self.query_end_plusplus)

        # Index the columns to speed up insertion of data
        col_map = {}
        try:
            for db_col in ['TIMESTAMP', 'OST_ID', 'READ_BYTES', 'WRITE_BYTES',
                           'KBYTES_USED', 'KBYTES_FREE', 'INODES_USED', 'INODES_FREE']:
                col_map[db_col] = columns.index(db_col)
        except ValueError:
            raise ValueError("LMT database schema does not match expectation")

        # Loop through all the results of the timeseries query
        for row in results:
            if isstr(row[col_map['TIMESTAMP']]):
                # SQLite stores timestamps as a unicode string
                timestamp = datetime.datetime.strptime(row[col_map['TIMESTAMP']],
                                                       "%Y-%m-%d %H:%M:%S")
            else:
                # MySQL timestamps are automatically converted to datetime.datetime
                timestamp = row[col_map['TIMESTAMP']]
            target_name = lmtdb.ost_id_map[row[col_map['OST_ID']]]
            for dataset_name in dataset_names:
                target_dbcol = self.config[dataset_name].get('column')
                if target_dbcol is not None:
                    self[dataset_name].insert_element(
                        timestamp,
                        target_name,
                        row[col_map[target_dbcol]])
                elif dataset_name == 'fullness/bytestotal':
                    self[dataset_name].insert_element(
                        timestamp,
                        target_name,
                        row[col_map['KBYTES_USED']] + row[col_map['KBYTES_FREE']])
                elif dataset_name == 'fullness/inodestotal':
                    self[dataset_name].insert_element(
                        timestamp,
                        target_name,
                        row[col_map['INODES_USED']] + row[col_map['INODES_FREE']])
                else:
                    errmsg = "%s in self.config but missing 'column' setting" % dataset_name
                    raise KeyError(errmsg)
def init_hdf5_file(datasets, init_start, init_end, hdf5_file):
    """
    Initialize the datasets at full dimensions in the HDF5 file if necessary
    """
    schema = tokio.connectors.hdf5.SCHEMA.get(SCHEMA_VERSION)
    for dataset_name, dataset in datasets.items():
        hdf5_dataset_name = schema.get(dataset_name)
        if hdf5_dataset_name is None:
            if '/_' not in dataset_name:
                warnings.warn("Dataset key %s is not in schema" % dataset_name)
            continue
        if hdf5_dataset_name not in hdf5_file:
            # attempt to convert dataset into a timeseries
            timeseries = hdf5_file.to_timeseries(dataset_name=hdf5_dataset_name)

            # if dataset -> timeseries failed, create and commit a new, empty timeseries
            if timeseries is None:
                timeseries = tokio.timeseries.TimeSeries(dataset_name=hdf5_dataset_name,
                                                         start=init_start,
                                                         end=init_end,
                                                         timestep=dataset.timestep,
                                                         num_columns=dataset.dataset.shape[1])
                hdf5_file.commit_timeseries(timeseries=timeseries)
            print("Initialized %s in %s with size %s" % (
                hdf5_dataset_name,
                hdf5_file.name,
                timeseries.dataset.shape))
def archive_lmtdb(lmtdb, init_start, init_end, timestep, output_file, query_start, query_end):
    """
    Given a start and end time, retrieve all of the relevant contents of an LMT
    database.
    """
    datasets = DatasetDict(query_start, query_end, timestep)

    datasets.archive_ost_data(lmtdb)
    datasets.archive_oss_data(lmtdb)
    datasets.archive_mds_data(lmtdb)
    datasets.archive_mds_ops_data(lmtdb)

    datasets.finalize()

    with tokio.connectors.hdf5.Hdf5(output_file) as hdf5_file:
        hdf5_file.attrs['version'] = SCHEMA_VERSION

        init_hdf5_file(datasets, init_start, init_end, hdf5_file)

        for dataset in datasets.values():
            print("Writing out %s" % dataset.dataset_name)
            hdf5_file.commit_timeseries(dataset)

    tokio.debug.debug_print("Wrote output to %s" % output_file)
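# Illustrative programmatic use of archive_lmtdb() (file names and times are
# hypothetical; this mirrors what main() below does with CLI arguments):
#
#   lmtdb = tokio.connectors.lmtdb.LmtDb(cache_file="lmtdb_cache.sqlite3")
#   start = datetime.datetime(2019, 1, 1, 0, 0, 0)
#   end = datetime.datetime(2019, 1, 2, 0, 0, 0)
#   archive_lmtdb(lmtdb=lmtdb, init_start=start, init_end=end, timestep=5,
#                 output_file="output.hdf5", query_start=start, query_end=end)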
def main(argv=None):
    """Entry point for the CLI interface
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("-i", "--input", type=str, default=None,
                        help="input cache db file")
    parser.add_argument("-o", "--output", type=str, default='output.hdf5',
                        help="output file (default: output.hdf5)")
    parser.add_argument('--init-start', type=str, default=None,
                        help='first timestamp (inclusive) when creating new output file,'
                             + ' in %s format (default: same as start)' % DATE_FMT_PRINT)
    parser.add_argument('--init-end', type=str, default=None,
                        help='final timestamp (exclusive) when creating new output file,'
                             + ' in %s format (default: same as end)' % DATE_FMT_PRINT)
    parser.add_argument('--debug', action='store_true', help="produce debug messages")
    parser.add_argument('--timestep', type=int, default=5,
                        help='collection frequency, in seconds (default: 5)')
    parser.add_argument("--host", type=str, default=None, help="database hostname")
    parser.add_argument("--user", type=str, default=None, help="database user")
    parser.add_argument("--password", type=str, default=None, help="database password")
    parser.add_argument("--database", type=str, default=None, help="database name")
    parser.add_argument("query_start", type=str,
                        help="start time in %s format" % DATE_FMT_PRINT)
    parser.add_argument("query_end", type=str,
                        help="end time in %s format" % DATE_FMT_PRINT)
    args = parser.parse_args(argv)

    if args.debug:
        tokio.debug.DEBUG = True

    # Convert CLI options into datetime
    try:
        query_start = datetime.datetime.strptime(args.query_start, DATE_FMT)
        query_end = datetime.datetime.strptime(args.query_end, DATE_FMT)
        init_start = query_start
        init_end = query_end
        if args.init_start:
            init_start = datetime.datetime.strptime(args.init_start, DATE_FMT)
        if args.init_end:
            init_end = datetime.datetime.strptime(args.init_end, DATE_FMT)
    except ValueError:
        sys.stderr.write("Start and end times must be in format %s\n" % DATE_FMT)
        raise

    # Basic input bounds checking
    if query_start >= query_end:
        raise Exception('query_start >= query_end')
    elif init_start >= init_end:
        raise Exception('init_start >= init_end')
    elif args.timestep < 1:
        raise Exception('--timestep must be > 0')

    if args.input is not None:
        lmtdb = tokio.connectors.lmtdb.LmtDb(cache_file=args.input)
    else:
        lmtdb = tokio.connectors.lmtdb.LmtDb(
            dbhost=args.host,
            dbuser=args.user,
            dbpassword=args.password,
            dbname=args.database)

    archive_lmtdb(lmtdb=lmtdb,
                  init_start=init_start,
                  init_end=init_end,
                  timestep=args.timestep,
                  output_file=args.output,
                  query_start=query_start,
                  query_end=query_end)
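# Hypothetical convenience guard, not necessarily present in the original
# module (the installed CLI wrapper is expected to call main() directly).
# With it in place, the archiver can be run as, for example:
#
#   python -m tokio.cli.archive_lmtdb --input lmtdb_cache.sqlite3 \
#       --output output.hdf5 2019-01-01T00:00:00 2019-01-02T00:00:00
if __name__ == "__main__":
    main()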