"""Retrieves ESnet SNMP counters and stores them in TOKIO TimeSeries format
"""
import sys
import datetime
import argparse
import warnings
import tokio.debug
import tokio.config
import tokio.timeseries
import tokio.connectors.esnet_snmp
import tokio.connectors.hdf5
# strptime format expected for every timestamp passed on the command line
DATE_FMT = "%Y-%m-%dT%H:%M:%S"
# Human-readable rendering of DATE_FMT shown in CLI help and error messages
DATE_FMT_PRINT = "YYYY-MM-DDTHH:MM:SS"
# Key into tokio.connectors.hdf5.SCHEMA; also stored as the output HDF5
# file's 'version' attribute
SCHEMA_VERSION = "1"
def endpoint_name(endpoint, interface):
    """Join an endpoint and one of its interfaces into a single key

    The resulting key is used as a column name in the generated TimeSeries.

    Args:
        endpoint (str): Name of the endpoint
        interface (str): Name of an interface on ``endpoint``

    Returns:
        str: ``endpoint`` and ``interface`` separated by a colon
    """
    pieces = (str(endpoint), str(interface))
    return ":".join(pieces)
class Archiver(dict):
    """A dictionary containing TimeSeries objects

    Contains the TimeSeries objects being populated from a remote data source.
    Implemented as a class so that a single object can store all of the
    TimeSeries objects that are generated by multiple method calls.  Keys are
    the dataset names defined in ``self.config``; values are
    ``tokio.timeseries.TimeSeries`` objects.
    """

    def __init__(self, query_start, query_end, interfaces, timestep, timeout=30.0, *args, **kwargs):
        """Initializes the archiver and stores its settings

        Args:
            query_start (datetime.datetime): Lower bound of time to be archived,
                inclusive
            query_end (datetime.datetime): Upper bound of time to be archived,
                inclusive
            interfaces (list of tuples): List of endpoints and interfaces to
                archive. Each tuple is of the form (endpoint, interface).
            timestep (int): Number of seconds between successive data points.
                The ESnet service may not honor this request.
            timeout (float): Seconds before HTTP connection times out

        Raises:
            KeyError: if SCHEMA_VERSION is not known by tokio.connectors.hdf5
        """
        super(Archiver, self).__init__(*args, **kwargs)
        self.query_start = query_start
        self.query_end = query_end
        self.timestep = timestep
        self.interfaces = interfaces
        self.timeout = timeout
        # Each dataset key maps to the units attached as metadata and the SNMP
        # counter direction whose values populate that dataset
        self.config = {
            'datatargets/readrates': {
                "units": "bytes/sec",
                "direction": "in",
            },
            'datatargets/writerates': {
                "units": "bytes/sec",
                "direction": "out",
            },
        }
        # one column per (endpoint, interface) pair, in the order given
        self.columns = [endpoint_name(tup[0], tup[1]) for tup in self.interfaces]
        self.schema = tokio.connectors.hdf5.SCHEMA.get(SCHEMA_VERSION)
        if self.schema is None:
            # SCHEMA_VERSION is a str, so it must be formatted with %s
            raise KeyError("Schema version %s is not known by connectors.hdf5" % SCHEMA_VERSION)

    def init_datasets(self, dataset_names):
        """Populate empty datasets within self

        Creates and attaches TimeSeries objects to self based on a given column
        list.  Datasets whose names are not in the schema are skipped with a
        warning.

        Args:
            dataset_names (list of str): keys corresponding to self.config
                defining which datasets are being initialized
        """
        for dataset_name in dataset_names:
            hdf5_dataset_name = self.schema.get(dataset_name)
            if hdf5_dataset_name is None:
                warnings.warn("Skipping %s (not in schema)" % dataset_name)
            else:
                self[dataset_name] = tokio.timeseries.TimeSeries(dataset_name=hdf5_dataset_name,
                                                                 start=self.query_start,
                                                                 end=self.query_end,
                                                                 timestep=self.timestep,
                                                                 num_columns=len(self.columns),
                                                                 column_names=self.columns)

    def finalize(self):
        """Tack metadata onto all datasets after they have been populated

        Perform a few finishing actions to all datasets contained in self after
        they have been populated.  Such actions are configured entirely in
        self.config and require no external input.
        """
        self.set_timeseries_metadata(list(self.config.keys()))

    def set_timeseries_metadata(self, dataset_names):
        """Attach the schema version and units to each populated dataset

        Args:
            dataset_names (list of str): keys corresponding to self.config
                defining which datasets should receive metadata
        """
        for dataset_name in dataset_names:
            # init_datasets may have skipped datasets that are not in the schema
            if dataset_name not in self:
                continue
            # NOTE(review): assumes TimeSeries exposes a ``dataset_metadata``
            # dict that Hdf5.commit_timeseries persists -- confirm
            self[dataset_name].dataset_metadata.update({
                'version': SCHEMA_VERSION,
                'units': self.config[dataset_name]['units'],
            })

    def archive(self, input_file=None):
        """Extract and encode data from ESnet's SNMP service

        Queries the ESnet SNMP REST service (or loads a cached copy of its
        output), interprets resulting data, and populates self with TimeSeries
        objects containing those values.

        Args:
            input_file (str or None): Path to a cached input.  If None, the
                counters are retrieved from the ESnet SNMP REST API.

        Raises:
            RuntimeError: if a REST query returns no data, or if queries return
                differing numbers of data points
        """
        esnetsnmp = tokio.connectors.esnet_snmp.EsnetSnmp(
            start=self.query_start,
            end=self.query_end,
            input_file=input_file)

        self.init_datasets(self.config.keys())

        # Retrieve all counters from REST API if not loaded from cache
        if input_file is None:
            self._fetch_counters(esnetsnmp)

        self._populate_datasets(esnetsnmp)

    def _fetch_counters(self, esnetsnmp):
        """Query every (endpoint, interface, direction) and validate each payload"""
        last_payload_ck = None
        for endpoint, interface in self.interfaces:
            for direction in ('in', 'out'):
                raw_result = esnetsnmp.get_interface_counters(
                    endpoint=endpoint,
                    interface=interface,
                    direction=direction,
                    agg_func='average',
                    interval=self.timestep,
                    timeout=self.timeout)

                # verify that the payload returned data and didn't hit an API rate limit
                payload = raw_result.get('data')
                if not payload:
                    raise RuntimeError("%s:%s:%s returned no data" % (endpoint, interface, direction))

                # verify that the payload returned a consistent number of data points
                this_payload_ck = len(payload)
                if last_payload_ck is not None and this_payload_ck != last_payload_ck:
                    raise RuntimeError(
                        "%s:%s:%s returned %d entries, but previous contained %d"
                        % (endpoint, interface, direction, this_payload_ck, last_payload_ck))
                # remember this count so the next payload is compared against it
                last_payload_ck = this_payload_ck

    def _populate_datasets(self, esnetsnmp):
        """Copy the counters held by the connector into the matching TimeSeries"""
        for dataset_name, config in self.config.items():
            # init_datasets skips (with a warning) datasets absent from the schema
            if dataset_name not in self:
                continue
            timeseries = self[dataset_name]
            direction = config['direction']
            for endpoint, interface in self.interfaces:
                # the column name does not vary by timestamp, so compute it once
                target_name = endpoint_name(endpoint, interface)
                for timestamp_int, bytes_sec in esnetsnmp[endpoint][interface][direction].items():
                    timeseries.insert_element(
                        datetime.datetime.fromtimestamp(timestamp_int),
                        target_name,
                        bytes_sec)
def init_hdf5_file(datasets, init_start, init_end, hdf5_file):
    """Initialize the datasets at full dimensions in the HDF5 file if necessary

    For every dataset whose schema-mapped name is not yet present in
    ``hdf5_file``, first try to convert an existing dataset into a TimeSeries
    via ``hdf5_file.to_timeseries``; if that returns None, create an empty
    TimeSeries spanning ``init_start`` to ``init_end`` and commit it.

    Args:
        datasets (dict): Maps dataset keys to TimeSeries objects whose
            ``timestep`` and column count size any newly created dataset
        init_start (datetime.datetime): The first timestamp to be included in
            a newly created dataset
        init_end (datetime.datetime): The timestamp following the last
            timestamp to be included in a newly created dataset
        hdf5_file (tokio.connectors.hdf5.Hdf5): Open file to initialize

    Raises:
        KeyError: if SCHEMA_VERSION is not known by tokio.connectors.hdf5
    """
    schema = tokio.connectors.hdf5.SCHEMA.get(SCHEMA_VERSION)
    if schema is None:
        # fail clearly rather than with an AttributeError on schema.get below
        raise KeyError("Schema version %s is not known by connectors.hdf5" % SCHEMA_VERSION)

    for dataset_name, dataset in datasets.items():
        hdf5_dataset_name = schema.get(dataset_name)
        if hdf5_dataset_name is None:
            # keys containing '/_' are skipped silently (presumably internal
            # datasets -- confirm); anything else earns a warning
            if '/_' not in dataset_name:
                warnings.warn("Dataset key %s is not in schema" % dataset_name)
            continue

        if hdf5_dataset_name in hdf5_file:
            continue

        # attempt to convert dataset into a timeseries
        timeseries = hdf5_file.to_timeseries(dataset_name=hdf5_dataset_name)

        # if dataset -> timeseries failed, create and commit a new, empty timeseries
        if timeseries is None:
            timeseries = tokio.timeseries.TimeSeries(dataset_name=hdf5_dataset_name,
                                                     start=init_start,
                                                     end=init_end,
                                                     timestep=dataset.timestep,
                                                     num_columns=dataset.dataset.shape[1])
            hdf5_file.commit_timeseries(timeseries=timeseries)

        print("Initialized %s in %s with size %s" % (
            hdf5_dataset_name,
            hdf5_file.name,
            timeseries.dataset.shape))
def archive_esnet_snmp(init_start, init_end, interfaces, timestep, output_file, query_start, query_end, input_file=None, **kwargs):
    """Fetch ESnet SNMP data and write it to a TOKIO time series HDF5 file

    Builds an Archiver over the query window, populates and finalizes it, then
    opens ``output_file``, makes sure every dataset exists at its full
    dimensions, and commits the populated time series into it.

    Args:
        init_start (datetime.datetime): The first timestamp to be included in
            the HDF5 file
        init_end (datetime.datetime): The timestamp following the last timestamp
            to be included in the HDF5 file.
        interfaces (list of tuples): List of (endpoint, interface) elements to
            query.
        timestep (int): Number of seconds between successive entries in the HDF5
            file to be created.
        output_file (str): Path to the file to be created.
        query_start (datetime.datetime): Time after which remote data should be
            retrieved, inclusive.
        query_end (datetime.datetime): Time before which remote data should be
            retrieved, inclusive.
        input_file (str or None): Path to a cached input. If specified, the
            remote REST API will not be contacted and the contents of this file
            will be instead loaded.
        kwargs (dict): Extra arguments to be passed to Archiver.__init__()
    """
    archiver = Archiver(query_start=query_start,
                        query_end=query_end,
                        interfaces=interfaces,
                        timestep=timestep,
                        **kwargs)
    archiver.archive(input_file=input_file)
    archiver.finalize()

    with tokio.connectors.hdf5.Hdf5(output_file) as hdf5_file:
        hdf5_file.attrs['version'] = SCHEMA_VERSION
        init_hdf5_file(archiver, init_start, init_end, hdf5_file)
        for timeseries in archiver.values():
            print("Writing out %s" % timeseries.dataset_name)
            hdf5_file.commit_timeseries(timeseries)

    tokio.debug.debug_print("Wrote output to %s" % output_file)
def main(argv=None):
    """Entry point for the CLI interface

    Parses command-line arguments, validates the requested time ranges and
    timestep, resolves the (endpoint, interface) pairs to query, and then
    calls archive_esnet_snmp().

    Args:
        argv (list of str or None): Command-line arguments excluding the
            program name; if None, argparse reads them from sys.argv.

    Raises:
        ValueError: if a timestamp is malformed, a time range is empty or
            inverted, the timestep is not positive, an endpoint specification
            is malformed, or no interfaces could be resolved
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("-i", "--input", type=str, default=None,
                        help="input cache db file")
    parser.add_argument("-o", "--output", type=str, default='output.hdf5',
                        help="output file (default: output.hdf5)")
    parser.add_argument('--init-start', type=str, default=None,
                        help='first timestamp (inclusive) when creating new output file,' +
                        ' in %s format (default: same as start)' % DATE_FMT_PRINT)
    parser.add_argument('--init-end', type=str, default=None,
                        help='final timestamp (exclusive) when creating new output file,' +
                        ' in %s format (default: same as end)' % DATE_FMT_PRINT)
    parser.add_argument('--debug', action='store_true', help="produce debug messages")
    parser.add_argument('--timestep', type=int, default=30,
                        help='collection frequency, in seconds (default: 30)')
    parser.add_argument("--timeout", type=float, default=30.0,
                        help="connection timeout, in seconds (default: 30 sec)")
    parser.add_argument("query_start", type=str,
                        help="start time in %s format" % DATE_FMT_PRINT)
    parser.add_argument("query_end", type=str,
                        help="end time in %s format" % DATE_FMT_PRINT)
    parser.add_argument("endpoints", type=str,
                        help='endpoint(s) to process, either a center name'
                        + ' ("nersc") or comma-separated list of format'
                        + ' "endpoint0:if0,endpoint1:if1,..." etc')
    args = parser.parse_args(argv)

    if args.debug:
        tokio.debug.DEBUG = True

    # Convert CLI options into datetime
    try:
        query_start = datetime.datetime.strptime(args.query_start, DATE_FMT)
        query_end = datetime.datetime.strptime(args.query_end, DATE_FMT)
        init_start = query_start
        init_end = query_end
        if args.init_start:
            init_start = datetime.datetime.strptime(args.init_start, DATE_FMT)
        if args.init_end:
            init_end = datetime.datetime.strptime(args.init_end, DATE_FMT)
    except ValueError:
        sys.stderr.write("Start and end times must be in format %s\n" % DATE_FMT_PRINT)
        raise

    # Basic input bounds checking
    if query_start >= query_end:
        raise ValueError('query_start >= query_end')
    elif init_start >= init_end:
        raise ValueError('init_start >= init_end')
    elif args.timestep < 1:
        raise ValueError('--timestep must be > 0')

    # Build list of desired interfaces
    if ':' not in args.endpoints:
        # a bare center name is resolved through the site configuration
        query_args = tokio.config.CONFIG.get('esnet_snmp_interfaces', {}).get(args.endpoints, {}).copy()
    else:
        query_args = {}
        for kvpair in args.endpoints.split(","):
            endpoint, sep, interface = kvpair.partition(":")
            if not sep:
                raise ValueError("Endpoint %r is not of the form endpoint:interface" % kvpair)
            query_args.setdefault(endpoint, []).append(interface)

    interfaces = [(endpoint, interface)
                  for endpoint, ifs in query_args.items()
                  for interface in ifs]
    if not interfaces:
        # e.g., an unknown center name; archiving zero columns is never useful
        raise ValueError("No interfaces found for endpoint(s) %s" % args.endpoints)

    archive_esnet_snmp(
        init_start=init_start,
        init_end=init_end,
        timestep=args.timestep,
        interfaces=interfaces,
        output_file=args.output,
        query_start=query_start,
        query_end=query_end,
        input_file=args.input,
        timeout=args.timeout)