"""
Retrieve the contents of an LMT database and archive them into a local HDF5 file.
"""
import sys
import datetime
import argparse
import warnings
import tokio.debug
import tokio.timeseries
import tokio.connectors.lmtdb
import tokio.connectors.hdf5
from tokio.common import isstr
# strptime format for the timestamps accepted on the command line
DATE_FMT = "%Y-%m-%dT%H:%M:%S"
# human-readable spelling of DATE_FMT, shown in --help and argument help text
DATE_FMT_PRINT = "YYYY-MM-DDTHH:MM:SS"
# key into tokio.connectors.hdf5.SCHEMA; also written to the output file's
# 'version' attribute
SCHEMA_VERSION = "1"
class DatasetDict(dict):
    """A dictionary containing TimeSeries objects

    Contains the TimeSeries objects being populated from an LMT database.
    Implemented as a class so that a single object can store all of the
    TimeSeries objects that are generated by multiple method calls.

    Keys are dataset names from ``self.config`` (e.g.,
    ``datatargets/readbytes``). Values are ``tokio.timeseries.TimeSeries``
    objects created by :meth:`init_datasets`.
    """
    def __init__(self, query_start, query_end, timestep, sort_hex=True, *args, **kwargs):
        """Record the query window and the per-dataset configuration

        Args:
            query_start (datetime.datetime): first timestamp to retrieve
            query_end (datetime.datetime): end of the time range to retrieve
            timestep (int): seconds between consecutive rows of each dataset
            sort_hex (bool): passed as ``sort_hex`` to every TimeSeries that
                init_datasets creates
            *args: passed through to ``dict.__init__``
            **kwargs: passed through to ``dict.__init__``

        Raises:
            KeyError: if SCHEMA_VERSION is not known by tokio.connectors.hdf5
        """
        super(DatasetDict, self).__init__(*args, **kwargs)
        self.query_start = query_start
        self.query_end = query_end
        self.timestep = timestep
        self.sort_hex = sort_hex

        # Per-dataset settings:
        #   units  - units of the stored values; attached to each TimeSeries
        #            by set_timeseries_metadata
        #   delta  - True if the source is a monotonically increasing counter
        #            that convert_deltas must turn into per-timestep counts
        #   column - database column feeding the dataset directly. Datasets
        #            without one are filled by dedicated logic in
        #            archive_ost_data (derived totals) or
        #            archive_mds_ops_data (OPERATION_ID demultiplexing).
        self.config = {
            'datatargets/readbytes': {
                "units": "bytes",
                "delta": True,
                "column": "READ_BYTES",
            },
            'datatargets/writebytes': {
                "units": "bytes",
                "delta": True,
                "column": "WRITE_BYTES",
            },
            'fullness/bytes': {
                "units": "KiB",
                "delta": False,
                "column": "KBYTES_USED",
            },
            'fullness/bytestotal': {
                "units": "KiB",
                "delta": False,
            },
            'fullness/inodes': {
                "units": "inodes",
                "delta": False,
                "column": "INODES_USED",
            },
            'fullness/inodestotal': {
                "units": "inodes",
                "delta": False,
            },
            'dataservers/cpuload': {
                "units": "%",
                "delta": False,
                "column": "PCT_CPU",
            },
            'dataservers/memused': {
                "units": "%",
                "delta": False,
                "column": "PCT_MEMORY",
            },
            'mdservers/cpuload': {
                "units": "%",
                "delta": False,
                "column": "PCT_CPU",
            },
            'mdtargets/opens': {
                "units": "ops",
                "delta": True,
            },
            'mdtargets/closes': {
                "units": "ops",
                "delta": True,
            },
            'mdtargets/mknods': {
                "units": "ops",
                "delta": True,
            },
            'mdtargets/links': {
                "units": "ops",
                "delta": True,
            },
            'mdtargets/unlinks': {
                "units": "ops",
                "delta": True,
            },
            'mdtargets/mkdirs': {
                "units": "ops",
                "delta": True,
            },
            'mdtargets/rmdirs': {
                "units": "ops",
                "delta": True,
            },
            'mdtargets/renames': {
                "units": "ops",
                "delta": True,
            },
            'mdtargets/getxattrs': {
                "units": "ops",
                "delta": True,
            },
            'mdtargets/statfss': {
                "units": "ops",
                "delta": True,
            },
            'mdtargets/setattrs': {
                "units": "ops",
                "delta": True,
            },
            'mdtargets/getattrs': {
                "units": "ops",
                "delta": True,
            },
        }

        self.schema = tokio.connectors.hdf5.SCHEMA.get(SCHEMA_VERSION)
        if self.schema is None:
            # SCHEMA_VERSION is a string, so it must be formatted with %s;
            # %d would raise TypeError and hide this error
            raise KeyError("Schema version %s is not known by connectors.hdf5" % SCHEMA_VERSION)

        # We need to query one timestep past the intended query range in
        # order to calculate the differences between each row and its
        # following row
        self.query_end_plusplus = self.query_end + datetime.timedelta(seconds=timestep)

    def init_datasets(self, dataset_names, columns):
        """Populate empty datasets within self

        Creates TimeSeries objects based on a given column list and attaches
        them to self. Each TimeSeries spans query_start through
        query_end_plusplus, so one extra row is available for delta
        calculation. Names that do not appear in the HDF5 schema are skipped
        with a warning and are not added to self.

        Args:
            dataset_names (list of str): keys corresponding to self.config
                defining which datasets are being initialized
            columns (list of str): column names to use in the TimeSeries
                datasets being created
        """
        for dataset_name in dataset_names:
            hdf5_dataset_name = self.schema.get(dataset_name)
            if hdf5_dataset_name is None:
                warnings.warn("Skipping %s (not in schema)" % dataset_name)
                continue
            self[dataset_name] = tokio.timeseries.TimeSeries(dataset_name=hdf5_dataset_name,
                                                             start=self.query_start,
                                                             end=self.query_end_plusplus,
                                                             timestep=self.timestep,
                                                             num_columns=len(columns),
                                                             column_names=columns,
                                                             sort_hex=self.sort_hex)

    def finalize(self):
        """Convert datasets to deltas where necessary and tack on metadata

        Perform a few finishing actions to all datasets contained in self after
        they have been populated. Such actions are configured entirely in
        self.config and require no external input.
        """
        self.convert_deltas(list(self.config.keys()))
        self.set_timeseries_metadata(list(self.config.keys()))

    def convert_deltas(self, dataset_names):
        """Convert datasets from absolute values to values per timestep

        Given a list of dataset names, determine if they need to be converted
        from monotonically increasing counters to counts per timestep, and
        convert those that do. For those that don't, trim off the final row
        since it is not needed to calculate the difference between rows.
        Names that are not present in self are ignored.

        Args:
            dataset_names (list of str): keys corresponding to self.config for
                the datasets to be converted/corrected
        """
        for dataset_name in dataset_names:
            if dataset_name not in self:
                continue
            if self.config[dataset_name].get('delta'):
                # Convert some datasets from absolute counts to deltas.
                # align='l' changes the semantics of the timestamps from
                # right-aligned (which is what LMT provides) to left-aligned
                # (which is what the TOKIO Time Series format assumes)
                self[dataset_name].convert_to_deltas(align='l')
            else:
                # Trim off the last row from the non-delta datasets to
                # compensate for initially over-sizing the time range by an
                # extra timestep
                self[dataset_name].trim_rows(1)

    def set_timeseries_metadata(self, dataset_names):
        """Attach per-dataset metadata (currently the units) from self.config

        Called by finalize. Names that are not present in self, or whose
        config has no ``units`` setting, are ignored.

        Args:
            dataset_names (list of str): keys corresponding to self.config for
                the datasets whose metadata should be set
        """
        for dataset_name in dataset_names:
            if dataset_name not in self:
                continue
            units = self.config[dataset_name].get('units')
            if units is None:
                continue
            # NOTE(review): assumes TimeSeries exposes a ``dataset_metadata``
            # dict that is persisted as HDF5 dataset attributes on commit.
            # Confirm against tokio.timeseries. If the attribute is absent,
            # the units are silently not recorded.
            metadata = getattr(self[dataset_name], 'dataset_metadata', None)
            if isinstance(metadata, dict):
                metadata['units'] = units

    @staticmethod
    def _map_columns(columns, db_cols):
        """Map each required database column to its index in a result row

        Args:
            columns (list of str): column names returned alongside the results
            db_cols (list of str): column names that must be present

        Returns:
            dict: ``{db_col: index}`` for every name in db_cols

        Raises:
            ValueError: if any of db_cols is missing from columns
        """
        col_map = {}
        try:
            for db_col in db_cols:
                col_map[db_col] = columns.index(db_col)
        except ValueError:
            raise ValueError("LMT database schema does not match expectation")
        return col_map

    @staticmethod
    def _parse_timestamp(value):
        """Normalize a TIMESTAMP cell to a datetime.datetime

        SQLite stores timestamps as a unicode string, which is parsed here.
        MySQL timestamps are automatically converted to datetime.datetime and
        are returned unchanged.
        """
        if isstr(value):
            return datetime.datetime.strptime(value, "%Y-%m-%d %H:%M:%S")
        return value

    @staticmethod
    def _lookup_name(id_map, db_id, id_kind):
        """Translate a database ID into a name, warning if it is unknown

        Args:
            id_map (dict): database ID -> name mapping provided by LmtDb
            db_id: ID value read from a result row
            id_kind (str): column name used in the warning text, e.g. 'MDS_ID'

        Returns:
            The mapped name. Returns None (after a warning) if db_id is not
            in id_map or maps to an empty name.
        """
        name = id_map.get(db_id)
        if not name:
            warnings.warn("unknown %s %s" % (id_kind, db_id))
            return None
        return name

    def _insert_row(self, dataset_names, timestamp, target_name, row, col_map, derived=None):
        """Insert one result row's values into each dataset in dataset_names

        Datasets with a ``column`` setting in self.config take that column's
        value directly. Datasets listed in ``derived`` take the sum of the
        two named columns.

        Args:
            dataset_names (list of str): datasets (already present in self)
            timestamp (datetime.datetime): row timestamp
            target_name (str): TimeSeries column (server/target name)
            row (sequence): one database result row
            col_map (dict): database column name -> index into row
            derived (dict or None): dataset name -> (column, column) pair to sum

        Raises:
            KeyError: if a dataset has neither a ``column`` setting nor a
                ``derived`` entry
        """
        for dataset_name in dataset_names:
            target_dbcol = self.config[dataset_name].get('column')
            if target_dbcol is not None:
                value = row[col_map[target_dbcol]]
            elif derived and dataset_name in derived:
                first_col, second_col = derived[dataset_name]
                value = row[col_map[first_col]] + row[col_map[second_col]]
            else:
                errmsg = "%s in self.config but missing 'column' setting" % dataset_name
                raise KeyError(errmsg)
            self[dataset_name].insert_element(timestamp, target_name, value)

    def archive_mds_data(self, lmtdb):
        """Extract and encode data from LMT's MDS_DATA table

        Queries the LMT database, interprets resulting rows, and populates a
        dictionary of TimeSeries objects with those values. Rows with an
        MDS_ID unknown to lmtdb are skipped with a warning.

        Args:
            lmtdb (LmtDb): database object
        """
        dataset_names = [
            'mdservers/cpuload',
        ]
        self.init_datasets(dataset_names, lmtdb.mds_names)
        # only fill datasets that init_datasets actually created
        dataset_names = [name for name in dataset_names if name in self]

        # Query MDS_DATA over the (one-timestep-extended) query range
        results, columns = lmtdb.get_mds_data(self.query_start, self.query_end_plusplus)
        col_map = self._map_columns(columns, ['TIMESTAMP', 'MDS_ID', 'PCT_CPU'])

        for row in results:
            timestamp = self._parse_timestamp(row[col_map['TIMESTAMP']])
            target_name = self._lookup_name(lmtdb.mds_id_map, row[col_map['MDS_ID']], 'MDS_ID')
            if target_name is None:
                continue
            self._insert_row(dataset_names, timestamp, target_name, row, col_map)

    def archive_mds_ops_data(self, lmtdb):
        """Extract and encode data from LMT's MDS_OPS_DATA table

        Queries the LMT database, interprets resulting rows, and populates a
        dictionary of TimeSeries objects with those values. Avoids JOINing the
        MDS_VARIABLE_INFO table and instead uses an internal mapping of
        OPERATION_IDs to demultiplex the data in MDS_OPS_DATA into different
        HDF5 datasets. Rows with an unknown OPERATION_ID or MDS_ID are skipped
        with a warning. Operations that are not mapped to a dataset are
        skipped silently.

        Args:
            lmtdb (LmtDb): database object
        """
        # mapping between OPERATION_INFO.OPERATION_NAME to HDF5 dataset names
        opname_to_dataset_name = {
            'open': 'mdtargets/opens',
            'close': 'mdtargets/closes',
            'mknod': 'mdtargets/mknods',
            'link': 'mdtargets/links',
            'unlink': 'mdtargets/unlinks',
            'mkdir': 'mdtargets/mkdirs',
            'rmdir': 'mdtargets/rmdirs',
            'rename': 'mdtargets/renames',
            'getxattr': 'mdtargets/getxattrs',
            'statfs': 'mdtargets/statfss',
            'setattr': 'mdtargets/setattrs',
            'getattr': 'mdtargets/getattrs',
        }
        dataset_names = list(opname_to_dataset_name.values())
        self.init_datasets(dataset_names, lmtdb.mds_names)

        results, columns = lmtdb.get_mds_ops_data(self.query_start, self.query_end_plusplus)
        col_map = self._map_columns(columns, ['TIMESTAMP', 'MDS_ID', 'OPERATION_ID', 'SAMPLES'])

        for row in results:
            timestamp = self._parse_timestamp(row[col_map['TIMESTAMP']])

            # figure out the dataset this row's data will go into. This filters
            # out operations not in opname_to_dataset_name and datasets that
            # init_datasets skipped because they are not in the schema.
            op_name = self._lookup_name(lmtdb.mds_op_id_map,
                                        row[col_map['OPERATION_ID']],
                                        'OPERATION_ID')
            dataset_name = opname_to_dataset_name.get(op_name)
            if dataset_name is None or dataset_name not in self:
                continue

            # figure out which column (MDS name) this row's data will go into
            mds_name = self._lookup_name(lmtdb.mds_id_map, row[col_map['MDS_ID']], 'MDS_ID')
            if mds_name is None:
                continue

            self[dataset_name].insert_element(
                timestamp,
                mds_name,
                row[col_map['SAMPLES']])

    def archive_oss_data(self, lmtdb):
        """Extract and encode data from LMT's OSS_DATA table

        Queries the LMT database, interprets resulting rows, and populates a
        dictionary of TimeSeries objects with those values. Rows with an
        OSS_ID unknown to lmtdb are skipped with a warning.

        Args:
            lmtdb (LmtDb): database object
        """
        dataset_names = [
            'dataservers/cpuload',
            'dataservers/memused',
        ]
        self.init_datasets(dataset_names, lmtdb.oss_names)
        # only fill datasets that init_datasets actually created
        dataset_names = [name for name in dataset_names if name in self]

        # Query OSS_DATA over the (one-timestep-extended) query range
        results, columns = lmtdb.get_oss_data(self.query_start, self.query_end_plusplus)
        col_map = self._map_columns(columns, ['TIMESTAMP', 'OSS_ID', 'PCT_CPU', 'PCT_MEMORY'])

        for row in results:
            timestamp = self._parse_timestamp(row[col_map['TIMESTAMP']])
            target_name = self._lookup_name(lmtdb.oss_id_map, row[col_map['OSS_ID']], 'OSS_ID')
            if target_name is None:
                continue
            self._insert_row(dataset_names, timestamp, target_name, row, col_map)

    def archive_ost_data(self, lmtdb):
        """Extract and encode data from LMT's OST_DATA table

        Queries the LMT database, interprets resulting rows, and populates a
        dictionary of TimeSeries objects with those values. The fullness
        totals are computed as used + free. Rows with an OST_ID unknown to
        lmtdb are skipped with a warning.

        Args:
            lmtdb (LmtDb): database object
        """
        dataset_names = [
            'datatargets/readbytes',
            'datatargets/writebytes',
            'fullness/bytes',
            'fullness/bytestotal',
            'fullness/inodes',
            'fullness/inodestotal'
        ]
        # datasets with no 'column' in self.config: sum of these two columns
        derived = {
            'fullness/bytestotal': ('KBYTES_USED', 'KBYTES_FREE'),
            'fullness/inodestotal': ('INODES_USED', 'INODES_FREE'),
        }
        self.init_datasets(dataset_names, lmtdb.ost_names)
        # only fill datasets that init_datasets actually created
        dataset_names = [name for name in dataset_names if name in self]

        # Query OST_DATA over the (one-timestep-extended) query range
        results, columns = lmtdb.get_ost_data(self.query_start, self.query_end_plusplus)
        col_map = self._map_columns(columns, ['TIMESTAMP', 'OST_ID', 'READ_BYTES',
                                              'WRITE_BYTES', 'KBYTES_USED', 'KBYTES_FREE',
                                              'INODES_USED', 'INODES_FREE'])

        for row in results:
            timestamp = self._parse_timestamp(row[col_map['TIMESTAMP']])
            target_name = self._lookup_name(lmtdb.ost_id_map, row[col_map['OST_ID']], 'OST_ID')
            if target_name is None:
                continue
            self._insert_row(dataset_names, timestamp, target_name, row, col_map,
                             derived=derived)
def init_hdf5_file(datasets, init_start, init_end, hdf5_file):
    """Initialize the datasets at full dimensions in the HDF5 file if necessary

    For every dataset whose schema-mapped name is not already in hdf5_file,
    first attempt to convert an existing dataset via
    ``hdf5_file.to_timeseries``. If that returns None, create and commit a
    new, empty TimeSeries spanning init_start to init_end.

    Args:
        datasets (DatasetDict): dataset names mapped to populated TimeSeries
        init_start (datetime.datetime): start of any newly created dataset
        init_end (datetime.datetime): end of any newly created dataset
        hdf5_file (tokio.connectors.hdf5.Hdf5): open output file

    Raises:
        KeyError: if SCHEMA_VERSION is not known by tokio.connectors.hdf5
    """
    schema = tokio.connectors.hdf5.SCHEMA.get(SCHEMA_VERSION)
    if schema is None:
        # fail with the same error DatasetDict raises rather than an
        # AttributeError on None.get below
        raise KeyError("Schema version %s is not known by connectors.hdf5" % SCHEMA_VERSION)

    for dataset_name, dataset in datasets.items():
        hdf5_dataset_name = schema.get(dataset_name)
        if hdf5_dataset_name is None:
            # keys containing '/_' are skipped without a warning
            if '/_' not in dataset_name:
                warnings.warn("Dataset key %s is not in schema" % dataset_name)
            continue

        if hdf5_dataset_name in hdf5_file:
            continue

        # attempt to convert dataset into a timeseries
        timeseries = hdf5_file.to_timeseries(dataset_name=hdf5_dataset_name)

        # if dataset -> timeseries failed, create and commit a new, empty timeseries
        if timeseries is None:
            timeseries = tokio.timeseries.TimeSeries(dataset_name=hdf5_dataset_name,
                                                     start=init_start,
                                                     end=init_end,
                                                     timestep=dataset.timestep,
                                                     num_columns=dataset.dataset.shape[1])
            hdf5_file.commit_timeseries(timeseries=timeseries)

        print("Initialized %s in %s with size %s" % (
            hdf5_dataset_name,
            hdf5_file.name,
            timeseries.dataset.shape))
def archive_lmtdb(lmtdb, init_start, init_end, timestep, output_file, query_start, query_end):
    """Retrieve LMT data for a time range and write it to an HDF5 file

    Given a start and end time, retrieve all of the relevant contents of an
    LMT database. OST, OSS, MDS and MDS-operation data are loaded into a
    DatasetDict, finalized, and then committed to output_file after any
    missing datasets have been initialized.

    Args:
        lmtdb (LmtDb): database object to read from
        init_start (datetime.datetime): start of newly created HDF5 datasets
        init_end (datetime.datetime): end of newly created HDF5 datasets
        timestep (int): seconds between consecutive rows
        output_file (str): path of the HDF5 file to write
        query_start (datetime.datetime): first timestamp to retrieve
        query_end (datetime.datetime): end of the range to retrieve
    """
    datasets = DatasetDict(query_start, query_end, timestep)
    loaders = (datasets.archive_ost_data,
               datasets.archive_oss_data,
               datasets.archive_mds_data,
               datasets.archive_mds_ops_data)
    for load in loaders:
        load(lmtdb)
    datasets.finalize()

    with tokio.connectors.hdf5.Hdf5(output_file) as hdf5_file:
        hdf5_file.attrs['version'] = SCHEMA_VERSION
        init_hdf5_file(datasets, init_start, init_end, hdf5_file)
        for timeseries in datasets.values():
            print("Writing out %s" % timeseries.dataset_name)
            hdf5_file.commit_timeseries(timeseries)

    tokio.debug.debug_print("Wrote output to %s" % output_file)
def main(argv=None):
    """Entry point for the CLI interface

    Parses command-line arguments and validates the requested time ranges.
    It then opens an LMT database, either a cache file given by --input or a
    database given by --host/--user/--password/--database, and archives the
    query range into the --output HDF5 file.

    Args:
        argv (list of str or None): arguments to parse; None means sys.argv[1:]

    Raises:
        ValueError: if a timestamp is not in DATE_FMT format
        Exception: if a start time is not before its end time, or if
            --timestep is less than 1
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("-i", "--input", type=str, default=None,
                        help="input cache db file")
    parser.add_argument("-o", "--output", type=str, default='output.hdf5',
                        help="output file (default: output.hdf5)")
    parser.add_argument('--init-start', type=str, default=None,
                        help='first timestamp (inclusive) when creating new output file,' +
                        ' in %s format (default: same as start)' % DATE_FMT_PRINT)
    parser.add_argument('--init-end', type=str, default=None,
                        help='final timestamp (exclusive) when creating new output file,' +
                        ' in %s format (default: same as end)' % DATE_FMT_PRINT)
    parser.add_argument('--debug', action='store_true', help="produce debug messages")
    parser.add_argument('--timestep', type=int, default=5,
                        help='collection frequency, in seconds (default: 5)')
    parser.add_argument("--host", type=str, default=None, help="database hostname")
    parser.add_argument("--user", type=str, default=None, help="database user")
    parser.add_argument("--password", type=str, default=None, help="database password")
    parser.add_argument("--database", type=str, default=None, help="database name")
    parser.add_argument("query_start", type=str, help="start time in %s format" % DATE_FMT_PRINT)
    parser.add_argument("query_end", type=str, help="end time in %s format" % DATE_FMT_PRINT)
    args = parser.parse_args(argv)

    if args.debug:
        tokio.debug.DEBUG = True

    # Convert CLI options into datetime. The init range defaults to the
    # query range.
    try:
        query_start = datetime.datetime.strptime(args.query_start, DATE_FMT)
        query_end = datetime.datetime.strptime(args.query_end, DATE_FMT)
        init_start = query_start
        init_end = query_end
        if args.init_start:
            init_start = datetime.datetime.strptime(args.init_start, DATE_FMT)
        if args.init_end:
            init_end = datetime.datetime.strptime(args.init_end, DATE_FMT)
    except ValueError:
        sys.stderr.write("Start and end times must be in format %s\n" % DATE_FMT)
        raise

    # Basic input bounds checking
    if query_start >= query_end:
        raise Exception('query_start >= query_end')
    elif init_start >= init_end:
        # report the init range that actually failed (was a copy of the
        # query_start message)
        raise Exception('init_start >= init_end')
    elif args.timestep < 1:
        raise Exception('--timestep must be > 0')

    if args.input is not None:
        lmtdb = tokio.connectors.lmtdb.LmtDb(cache_file=args.input)
    else:
        lmtdb = tokio.connectors.lmtdb.LmtDb(
            dbhost=args.host,
            dbuser=args.user,
            dbpassword=args.password,
            dbname=args.database)

    archive_lmtdb(lmtdb=lmtdb,
                  init_start=init_start,
                  init_end=init_end,
                  timestep=args.timestep,
                  output_file=args.output,
                  query_start=query_start,
                  query_end=query_end)