Source code for tokio.connectors.esnet_snmp

"""Provides interfaces into ESnet's SNMP REST API

Documentation for the REST API is here:

    http://es.net/network-r-and-d/data-for-researchers/snmp-api/

Notes:
    This connector relies either on the ``esnet_snmp_url`` configuration value
    being set in the pytokio configuration or the ``PYTOKIO_ESNET_SNMP_URI``
    being defined in the runtime environment.

Examples:
    Retrieving the data of multiple endpoints (ESnet routers) and interfaces
    is a common pattern.  To do this, the ``EsnetSnmp`` object should be
    initialized with only the intended start/end times, and the object should
    be asynchronously populated using calls to
    ``EsnetSnmp.get_interface_counters``::

        import datetime
        import tokio.connectors.esnet_snmp

        ROUTER = 'sunn-cr5'
        INTERFACE = 'to_nersc_ip-d_v4'
        TARGET_DATE = datetime.datetime.today() - datetime.timedelta(days=1)

        # Because the ESnet API treats the end date as inclusive, we subtract
        # one second to avoid counting the first measurement of the following
        # day.
        esnetsnmp = tokio.connectors.esnet_snmp.EsnetSnmp(
            start=TARGET_DATE,
            end=TARGET_DATE + datetime.timedelta(days=1, seconds=-1))
        for direction in 'in', 'out':
            esnetsnmp.get_interface_counters(
                endpoint=ROUTER,
                interface=INTERFACE,
                direction=direction,
                agg_func='average')

        for direction in 'in', 'out':
            bytes_per_sec = list(esnetsnmp[ROUTER][INTERFACE][direction].values())
            total_bytes = sum(bytes_per_sec) * esnetsnmp.timestep
            print("%s:%s saw %.2f TiB %s" % (
                ROUTER,
                INTERFACE,
                total_bytes / 2**40,
                direction))

    For simple queries, it is sufficient to specify the endpoint, interface,
    and direction directly in the initialization::

        esnetsnmp = tokio.connectors.esnet_snmp.EsnetSnmp(
            start=TARGET_DATE,
            end=TARGET_DATE + datetime.timedelta(days=1, seconds=-1)
            endpoint=ROUTER,
            interface=INTERFACE,
            direction="in")
        print("Total bytes in: %.2f" % (
            sum(list(esnetsnmp[ROUTER][INTERFACE]["in"].values())) / 2**40))

"""

import time
import json
import datetime
import warnings

import requests
import pandas

from .. import config
from . import common

[docs]class EsnetSnmp(common.CacheableDict): """Container for ESnet SNMP counters Dictionary with structure:: { "endpoint0": { "interface_x": { "in": { timestamp1: value1, timestamp2: value2, timestamp3: value3, ... }, "out": { ... } }, "interface_y": { ... } }, "endpoint1": { ... } } Various methods are provided to access the data of interest. """
[docs] def __init__(self, start, end, endpoint=None, interface=None, direction=None, agg_func=None, interval=None, **kwargs): """Retrieves data rate data for an ESnet endpoint Initializes the object with a start and end time. Optionally runs a REST API query if endpoint, interface, and direction are provided. Assumes that once the start/end time have been specified, they should not be changed. Args: start (datetime.datetime): Start of interval to retrieve, inclusive end (datetime.datetime): End of interval to retrieve, inclusive endpoint (str, optional): Name of the ESnet endpoint whose data is being retrieved interface (str, optional): Name of the ESnet endpoint interface on the specified endpoint direction (str, optional): "in" or "out" to signify data input into ESnet or data output from ESnet agg_func (str, optional ): Specifies the reduction operator to be applied over each interval; must be one of "average," "min," or "max." If None, uses the ESnet default. interval (int, optional ): Resolution, in seconds, of the data to be returned. If None, uses the ESnet default. kwargs (dict): arguments to pass to super.__init__() Attributes: start (datetime.datetime): Start of interval represented by this object, inclusive end (datetime.datetime): End of interval represented by this object, inclusive start_epoch (int): Seconds since epoch for self.start end_epoch (int): Seconds since epoch for self.end """ super(EsnetSnmp, self).__init__(**kwargs) if start >= end: raise RuntimeError("Start must be greater than or equal to end") # input parameters self.start = start self.end = end self.start_epoch = int(time.mktime(self.start.timetuple())) self.end_epoch = int(time.mktime(self.end.timetuple())) self.requested_timestep = interval self.requested_endpoint = endpoint self.requested_interface = interface self.requested_direction = direction self.requested_agg_func = agg_func # output parameters self.last_response = None self.timestep = None if endpoint and interface and direction: self.get_interface_counters(endpoint=endpoint, interface=interface, direction=direction, agg_func=agg_func, interval=interval)
[docs] def load_json(self, *args, **kwargs): """Loads input from serialized JSON Need to coerce timestamp keys back into ints from strings """ super(EsnetSnmp, self).load_json(*args, **kwargs) for endpoint, interfaces in self.items(): for interface, directions in interfaces.items(): for direction, data in directions.items(): if isinstance(data, dict): keys = list(data.keys()) # make a copy of the key as an int for key in keys: data[int(key)] = data[key] # delete the old non-int key for key in keys: del data[key]
[docs] def _insert_result(self): """Parse the raw output of the REST API and update self ESnet's REST API will return an object like:: { "agg": "30", "begin_time": 1517471100, "end_time": 1517471910, "cf": "average", "data": [ [ 1517471100, 5997486471.266666 ], ... [ 1517471910, 189300026.8 ] ] } Args: result (dict): JSON structure returned by the ESnet REST API Returns: bool: True if data inserted without errors; False otherwise """ result = self.last_response if not result: print("no result; bailing") return False # set the timestep if not previously set interval = _get_interval_result(result) if interval: if not self.timestep: self.timestep = interval elif self.timestep != interval: warnings.warn("received timestep %d from an object with timestep %d" % (interval, self.timestep)) if self.requested_endpoint not in self: self[self.requested_endpoint] = {} if self.requested_interface not in self[self.requested_endpoint]: self[self.requested_endpoint][self.requested_interface] = {} if self.requested_direction not in self[self.requested_endpoint][self.requested_interface]: self[self.requested_endpoint][self.requested_interface][self.requested_direction] = {} data = result.get('data') if not data: warnings.warn("No data in result") for timestamp, value in data: self[self.requested_endpoint][self.requested_interface][self.requested_direction][timestamp] = value self[self.requested_endpoint][self.requested_interface]['units'] = 'bytes/sec' return True
[docs] def gen_url(self, endpoint, interface, direction, agg_func=None, interval=None, **kwargs): """Builds parameters to be passed to requests.get() to retrieve data from the REST API. This is factored out so that the URI generation can be tested without a working REST endpoint. Args: endpoint (str): Name of ESnet endpoint (usually a router identifier) interface (str): Name of the ESnet endpoint interface direction (str): "in" or "out" to signify data input into ESnet or data output from ESnet agg_func (str or None): Specifies the reduction operator to be applied over each interval; must be one of "average," "min," or "max." If None, uses the ESnet default. interval (int or None): Resolution, in seconds, of the data to be returned. If None, uses the ESnet default. Returns: dict: kwargs to be passed directly to requests.get() """ if direction not in ['in', 'out']: raise RuntimeError("direction must be either in or out") self.requested_endpoint = endpoint self.requested_interface = interface self.requested_direction = direction self.requested_agg_func = agg_func self.requested_timestep = interval if self.requested_timestep is not None \ and self.timestep is not None \ and self.requested_timestep != self.timestep: warnings.warn("received timestep %d from an object with timestep %d" % (self.requested_timestep, self.timestep)) url = config.CONFIG.get('esnet_snmp_url') if url is None: raise requests.exceptions.ConnectionError("no esnet_snmp_url configured") url += '/%s/interface/%s/%s' % (endpoint, interface, direction) params = { 'begin': self.start_epoch, 'end': self.end_epoch, } if agg_func is not None: params['calc_func'] = agg_func if interval is not None: params['calc'] = interval return { "url": url, "params": params }
[docs] def get_interface_counters(self, endpoint, interface, direction, agg_func=None, interval=None, **kwargs): """Retrieves data rate data for an ESnet endpoint This is factored the way it is so that it can be subclassed to use a faked remote REST endpoint for testing. Args: endpoint (str): Name of ESnet endpoint (usually a router identifier) interface (str): Name of the ESnet endpoint interface direction (str): "in" or "out" to signify data input into ESnet or data output from ESnet agg_func (str or None): Specifies the reduction operator to be applied over each interval; must be one of "average," "min," or "max." If None, uses the ESnet default. interval (int or None): Resolution, in seconds, of the data to be returned. If None, uses the ESnet default. kwargs (dict): Extra parameters to pass to requests.get() Returns: dict: raw return from the REST API call """ request_args = self.gen_url(endpoint, interface, direction, agg_func=None, interval=None, **kwargs) request_args.update(kwargs) self.last_response = _get_url(**request_args) self._insert_result() return self.last_response
[docs] def to_dataframe(self, multiindex=False): """Return data as a Pandas DataFrame Args: multiindex (bool): If True, return a DataFrame indexed by timestamp, endpoint, interface, and direction """ to_df = [] for endpoint, interfaces in self.items(): for interface, directions in interfaces.items(): for direction, data in directions.items(): if isinstance(data, dict): for timestamp, value in data.items(): to_df.append({ 'endpoint': endpoint, 'interface': interface, 'direction': direction, 'timestamp': datetime.datetime.fromtimestamp(timestamp), 'data_rate': value, }) dataframe = pandas.DataFrame.from_records(to_df) if multiindex: return dataframe.set_index(['timestamp', 'endpoint', 'interface', 'direction']) return dataframe
[docs]def _get_interval_result(result): """Parse the raw output of the REST API output and return the timestep Args: result (dict): the raw JSON output of the ESnet REST API Returns: int or None """ value = result.get('agg') if value: return int(value) value = result.get('calc') if 'calc' in result: return int(value) return None
[docs]def _get_url(url, params, **kwargs): """Wraps the requests.get() call and returns the results as a decoded JSON object. Args: url (str): URL to request params (dict): Same as requests.get(params) kwargs: Passed directly to requests.get() Returns: dict: Result of the remote query """ request = requests.get(url, params=params, **kwargs) request.raise_for_status() return request.json()