Source code for tokio.cli.archive_esnet_snmp

"""Retrieves ESnet SNMP counters and stores them in TOKIO Timeseries format
"""

import sys
import datetime
import argparse
import warnings
import tokio.debug
import tokio.config
import tokio.timeseries
import tokio.connectors.esnet_snmp
import tokio.connectors.hdf5

# strptime()-compatible format used to parse timestamps given on the command line
DATE_FMT = "%Y-%m-%dT%H:%M:%S"
# Human-readable rendering of DATE_FMT shown in help text and error messages
DATE_FMT_PRINT = "YYYY-MM-DDTHH:MM:SS"

# Key used to look up the schema in tokio.connectors.hdf5.SCHEMA; also recorded
# as the 'version' attribute of the output file and in each dataset's metadata
SCHEMA_VERSION = "1"

def endpoint_name(endpoint, interface):
    """Build the combined key that identifies one interface on one endpoint

    Args:
        endpoint (str): The name of an endpoint
        interface (str): The interface on the given endpoint

    Returns:
        str: ``endpoint`` and ``interface`` joined by a single colon
    """
    # !s forces str() conversion, matching what the %s conversion would do
    return "{0!s}:{1!s}".format(endpoint, interface)
class Archiver(dict):
    """A dictionary containing TimeSeries objects

    Contains the TimeSeries objects being populated from a remote data source.
    Implemented as a class so that a single object can store all of the
    TimeSeries objects that are generated by multiple method calls.  Keys are
    the dataset keys of ``self.config``; values are the TimeSeries objects
    created by :meth:`init_datasets`.
    """

    def __init__(self, query_start, query_end, interfaces, timestep, timeout=30.0, *args, **kwargs):
        """Initializes the archiver and stores its settings

        Args:
            query_start (datetime.datetime): Lower bound of time to be
                archived, inclusive
            query_end (datetime.datetime): Upper bound of time to be
                archived, inclusive
            interfaces (list of tuples): List of endpoints and interfaces to
                archive.  Each tuple is of the form (endpoint, interface).
            timestep (int): Number of seconds between successive data
                points.  The ESnet service may not honor this request.
            timeout (float): Seconds before HTTP connection times out
            *args: Passed through to ``dict.__init__``
            **kwargs: Passed through to ``dict.__init__``

        Raises:
            KeyError: if SCHEMA_VERSION is not a key of
                ``tokio.connectors.hdf5.SCHEMA``
        """
        super(Archiver, self).__init__(*args, **kwargs)
        self.query_start = query_start
        self.query_end = query_end
        self.timestep = timestep
        self.interfaces = interfaces
        self.timeout = timeout

        # Maps each dataset key to its units and to the traffic direction
        # ('in' or 'out') whose counters populate it
        self.config = {
            'datatargets/readrates': {
                "units": "bytes/sec",
                "direction": "in",
            },
            'datatargets/writerates': {
                "units": "bytes/sec",
                "direction": "out",
            },
        }

        # One column per (endpoint, interface) pair, in the order given
        self.columns = [endpoint_name(tup[0], tup[1]) for tup in self.interfaces]

        self.schema = tokio.connectors.hdf5.SCHEMA.get(SCHEMA_VERSION)
        if self.schema is None:
            # SCHEMA_VERSION is a string, so it must be formatted with %s;
            # %d would raise TypeError and mask this KeyError
            raise KeyError("Schema version %s is not known by connectors.hdf5" % SCHEMA_VERSION)

    def init_datasets(self, dataset_names):
        """Populate empty datasets within self

        Creates and attaches TimeSeries objects to self based on a given
        column list.  Dataset keys that the schema does not know are skipped
        with a warning and are not added to self.

        Args:
            dataset_names (list of str): keys corresponding to self.config
                defining which datasets are being initialized
        """
        for dataset_name in dataset_names:
            hdf5_dataset_name = self.schema.get(dataset_name)
            if hdf5_dataset_name is None:
                warnings.warn("Skipping %s (not in schema)" % dataset_name)
            else:
                self[dataset_name] = tokio.timeseries.TimeSeries(
                    dataset_name=hdf5_dataset_name,
                    start=self.query_start,
                    end=self.query_end,
                    timestep=self.timestep,
                    num_columns=len(self.columns),
                    column_names=self.columns)

    def finalize(self):
        """Tack metadata onto all datasets after they have been populated

        Performs the finishing actions on every dataset contained in self.
        Such actions are configured entirely in self.config and require no
        external input; at present this only sets the metadata.
        """
        self.set_timeseries_metadata(list(self.config.keys()))

    def set_timeseries_metadata(self, dataset_names):
        """Set metadata constants (version, units, etc) on datasets and groups

        Args:
            dataset_names (list of str): keys corresponding to self.config
                for the datasets whose metadata should be set.  Keys that are
                not present in self are silently ignored.
        """
        for dataset_name in dataset_names:
            if dataset_name in self:
                self[dataset_name].dataset_metadata.update({
                    'version': SCHEMA_VERSION,
                    'units': self.config[dataset_name]['units']
                })
                self[dataset_name].group_metadata.update({'source': 'esnet_snmp'})

    def archive(self, input_file=None):
        """Extract and encode data from ESnet's SNMP service

        Queries the ESnet SNMP REST service, interprets resulting data, and
        populates a dictionary of TimeSeries objects with those values.

        Args:
            input_file (str or None): Passed to the EsnetSnmp connector as
                its ``input_file``.  When None, the REST API is queried for
                the 'in' and 'out' counters of every configured interface;
                otherwise no queries are issued here and only the data
                already held by the connector is used.

        Raises:
            RuntimeError: if a query returns no data, or returns a different
                number of data points than the query before it
        """
        esnetsnmp = tokio.connectors.esnet_snmp.EsnetSnmp(
            start=self.query_start,
            end=self.query_end,
            input_file=input_file)

        self.init_datasets(self.config.keys())

        # Retrieve all counters from REST API if not loaded from cache
        if input_file is None:
            last_payload_ck = None
            for endpoint, interface in self.interfaces:
                for direction in 'in', 'out':
                    raw_result = esnetsnmp.get_interface_counters(
                        endpoint=endpoint,
                        interface=interface,
                        direction=direction,
                        agg_func='average',
                        interval=self.timestep,
                        timeout=self.timeout)

                    # verify that the payload returned data and didn't hit an API rate limit
                    payload = raw_result.get('data')
                    if not payload:
                        raise RuntimeError("%s:%s:%s returned no data"
                                           % (endpoint, interface, direction))

                    # verify that the payload returned a consistent number of data points
                    this_payload_ck = len(payload)
                    if last_payload_ck is not None and this_payload_ck != last_payload_ck:
                        raise RuntimeError(
                            "%s:%s:%s returned %d entries, but previous contained %d"
                            % (endpoint, interface, direction,
                               this_payload_ck, last_payload_ck))
                    # remember this length so that the next query is actually
                    # compared against it; without this the check never fires
                    last_payload_ck = this_payload_ck

        # Loop over all retrieved data
        for dataset_name, config in self.config.items():
            # init_datasets() skips (with a warning) datasets missing from
            # the schema, so there is nothing to populate for those
            if dataset_name not in self:
                continue
            direction = config['direction']
            for endpoint, interface in self.interfaces:
                target_name = endpoint_name(endpoint, interface)
                for timestamp_int, bytes_sec in esnetsnmp[endpoint][interface][direction].items():
                    timestamp = datetime.datetime.fromtimestamp(timestamp_int)
                    self[dataset_name].insert_element(timestamp, target_name, bytes_sec)
def init_hdf5_file(datasets, init_start, init_end, hdf5_file):
    """Initialize the datasets at full dimensions in the HDF5 file if necessary

    For every key of ``datasets`` that the schema maps to an HDF5 dataset
    name not yet present in ``hdf5_file``, a TimeSeries is obtained from the
    file or, failing that, created empty over [init_start, init_end) and
    committed.

    Args:
        datasets (dict): dataset key -> populated TimeSeries; only the
            timestep and the column count of each value are read
        init_start (datetime.datetime): start time given to newly created
            TimeSeries objects
        init_end (datetime.datetime): end time given to newly created
            TimeSeries objects
        hdf5_file: open HDF5 connector object to initialize
    """
    known_names = tokio.connectors.hdf5.SCHEMA.get(SCHEMA_VERSION)

    for key, populated in datasets.items():
        target_name = known_names.get(key)

        if target_name is None:
            # keys containing '/_' are passed over without a warning
            if '/_' not in key:
                warnings.warn("Dataset key %s is not in schema" % key)
            continue

        # nothing to do when the file already holds this dataset
        if target_name in hdf5_file:
            continue

        # attempt to convert dataset into a timeseries
        series = hdf5_file.to_timeseries(dataset_name=target_name)

        # if dataset -> timeseries failed, create and commit a new, empty timeseries
        if series is None:
            series = tokio.timeseries.TimeSeries(
                dataset_name=target_name,
                start=init_start,
                end=init_end,
                timestep=populated.timestep,
                num_columns=populated.dataset.shape[1])
            hdf5_file.commit_timeseries(timeseries=series)

        print("Initialized %s in %s with size %s" % (
            target_name,
            hdf5_file.name,
            series.dataset.shape))
def archive_esnet_snmp(init_start, init_end, interfaces, timestep, output_file,
                       query_start, query_end, input_file=None, **kwargs):
    """Retrieves remote data and stores it in TOKIO time series format

    Builds an Archiver for the query window, has it collect and finalize its
    datasets, then writes every dataset into the HDF5 file at
    ``output_file``.

    Args:
        init_start (datetime.datetime): The first timestamp to be included
            in the HDF5 file
        init_end (datetime.datetime): The timestamp following the last
            timestamp to be included in the HDF5 file.
        interfaces (list of tuples): List of (endpoint, interface) elements
            to query.
        timestep (int): Number of seconds between successive entries in the
            HDF5 file to be created.
        output_file (str): Path to the file to be created.
        query_start (datetime.datetime): Time after which remote data should
            be retrieved, inclusive.
        query_end (datetime.datetime): Time before which remote data should
            be retrieved, inclusive.
        input_file (str or None): Path to a cached input.  If specified, the
            remote REST API will not be contacted and the contents of this
            file will be instead loaded.
        kwargs (dict): Extra arguments to be passed to Archiver.__init__()
    """
    archiver = Archiver(
        query_start=query_start,
        query_end=query_end,
        interfaces=interfaces,
        timestep=timestep,
        **kwargs)
    archiver.archive(input_file=input_file)
    archiver.finalize()

    with tokio.connectors.hdf5.Hdf5(output_file) as output:
        output.attrs['version'] = SCHEMA_VERSION

        # make sure every dataset exists at its full size before writing
        init_hdf5_file(archiver, init_start, init_end, output)

        for timeseries in archiver.values():
            print("Writing out %s" % timeseries.dataset_name)
            output.commit_timeseries(timeseries)

    tokio.debug.debug_print("Wrote output to %s" % output_file)
def main(argv=None):
    """Entry point for the CLI interface

    Parses the command line, validates the time bounds and timestep, expands
    the requested endpoints into (endpoint, interface) pairs, and hands
    everything to archive_esnet_snmp().

    Args:
        argv (list of str or None): Command-line arguments to parse; when
            None, argparse falls back to ``sys.argv``.

    Raises:
        ValueError: if a timestamp does not match DATE_FMT, if a start time
            is not strictly before its end time, if the timestep is less
            than one, or if an explicit endpoint list contains an item that
            is not of the form ``endpoint:interface``
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("-i", "--input", type=str, default=None,
                        help="input cache db file")
    parser.add_argument("-o", "--output", type=str, default='output.hdf5',
                        help="output file (default: output.hdf5)")
    parser.add_argument('--init-start', type=str, default=None,
                        help='first timestamp (inclusive) when creating new output file,'
                        + ' in %s format (default: same as start)' % DATE_FMT_PRINT)
    parser.add_argument('--init-end', type=str, default=None,
                        help='final timestamp (exclusive) when creating new output file,'
                        + ' in %s format (default: same as end)' % DATE_FMT_PRINT)
    parser.add_argument('--debug', action='store_true',
                        help="produce debug messages")
    parser.add_argument('--timestep', type=int, default=30,
                        help='collection frequency, in seconds (default: 30)')
    parser.add_argument("--timeout", type=float, default=30.0,
                        help="connection timeout, in seconds (default: 30 sec)")
    parser.add_argument("query_start", type=str,
                        help="start time in %s format" % DATE_FMT_PRINT)
    parser.add_argument("query_end", type=str,
                        help="end time in %s format" % DATE_FMT_PRINT)
    parser.add_argument("endpoints", type=str,
                        help='endpoint(s) to process, either a center name'
                        + ' ("nersc") or comma-separated list of format'
                        + ' "endpoint0:if0,endpoint1:if1,..." etc')
    args = parser.parse_args(argv)

    if args.debug:
        tokio.debug.DEBUG = True

    # Convert CLI options into datetime
    try:
        query_start = datetime.datetime.strptime(args.query_start, DATE_FMT)
        query_end = datetime.datetime.strptime(args.query_end, DATE_FMT)
        # the initialization window defaults to the query window
        init_start = query_start
        init_end = query_end
        if args.init_start:
            init_start = datetime.datetime.strptime(args.init_start, DATE_FMT)
        if args.init_end:
            init_end = datetime.datetime.strptime(args.init_end, DATE_FMT)
    except ValueError:
        sys.stderr.write("Start and end times must be in format %s\n" % DATE_FMT_PRINT)
        raise

    # Basic input bounds checking
    if query_start >= query_end:
        raise ValueError('query_start >= query_end')
    if init_start >= init_end:
        # report the bounds that actually failed, not the query bounds
        raise ValueError('init_start >= init_end')
    if args.timestep < 1:
        raise ValueError('--timestep must be > 0')

    # Build list of desired interfaces
    if ':' not in args.endpoints:
        # a bare name selects a preconfigured set of endpoints/interfaces
        query_args = tokio.config.CONFIG.get('esnet_snmp_interfaces', {}).get(args.endpoints, {}).copy()
    else:
        query_args = {}
        for kvpair in args.endpoints.split(","):
            if ':' not in kvpair:
                # fail with a readable message instead of an unpacking error
                raise ValueError(
                    "endpoint '%s' is not in endpoint:interface format" % kvpair)
            key, value = kvpair.split(":", 1)
            query_args.setdefault(key, []).append(value)

    interfaces = []
    for endpoint, ifs in query_args.items():
        interfaces += [(endpoint, interface) for interface in ifs]

    archive_esnet_snmp(
        init_start=init_start,
        init_end=init_end,
        timestep=args.timestep,
        interfaces=interfaces,
        output_file=args.output,
        query_start=query_start,
        query_end=query_end,
        input_file=args.input,
        timeout=args.timeout)