# Source code for tokio.connectors.collectd_es

#!/usr/bin/env python
"""Retrieve data generated by collectd and stored in Elasticsearch
"""

import copy
from . import es

def _derive_query(parent, *constraints):
    """Build a new query template from ``parent`` plus extra constraints.

    Args:
        parent (dict): query template to start from; it is deep-copied and
            never modified.
        *constraints: ``(term, field, value)`` tuples, each forwarded in order
            to ``es.mutate_query`` as keyword arguments.

    Returns:
        dict: the deep copy of ``parent`` after all constraints were applied.
    """
    derived = copy.deepcopy(parent)
    for term, field, value in constraints:
        es.mutate_query(derived, term=term, field=field, value=value)
    return derived


# Every collectd query starts from the generic Elasticsearch template with an
# additional ``prefix`` constraint of 'bb' on the ``hostname`` field.
BASE_QUERY = _derive_query(
    es.BASE_QUERY,
    ('prefix', 'hostname', 'bb'),
)

# collectd ``disk`` plugin documents
QUERY_DISK_DATA = _derive_query(
    BASE_QUERY,
    ('term', 'plugin', 'disk'),
    ('prefix', 'plugin_instance', 'nvme'),
    ('regexp', 'collectd_type', 'disk_(octets|ops)'),
)

# collectd ``cpu`` plugin documents
QUERY_CPU_DATA = _derive_query(
    BASE_QUERY,
    ('term', 'plugin', 'cpu'),
    ('regexp', 'type_instance', '(idle|user|system)'),
)

# collectd ``memory`` plugin documents
QUERY_MEMORY_DATA = _derive_query(
    BASE_QUERY,
    ('term', 'plugin', 'memory'),
)

# Default ``_source`` fields requested from Elasticsearch; also the field list
# handed to ``to_dataframe``.
SOURCE_FILTER = [
    '@timestamp', 'hostname',
    'plugin', 'collectd_type', 'type_instance', 'plugin_instance',
    'value',
    'longterm', 'midterm', 'shortterm',
    'majflt', 'minflt',
    'if_octets', 'if_packets', 'if_errors',
    'rx', 'tx',
    'read', 'write', 'io_time',
]


class CollectdEs(es.EsConnection):
    """collectd-Elasticsearch connection handler

    Extends ``es.EsConnection`` with connection-wide defaults for how scrolled
    pages are filtered and flushed, plus convenience methods that run the
    module's pre-built disk, memory, and cpu query templates.

    Attributes set on every instance:
        filter_function: reduces each page to ``page['hits']['hits']``.
        flush_every (int): 50000.
        flush_function: identity function (returns its argument unchanged).
    """

    def __init__(self, *args, **kwargs):
        """Initialize the parent connection, then install connector defaults.

        All positional and keyword arguments are forwarded unchanged to
        ``es.EsConnection.__init__``.
        """
        super(CollectdEs, self).__init__(*args, **kwargs)
        self._apply_connector_defaults(self)

    @staticmethod
    def _apply_connector_defaults(connection):
        """Set the default filter/flush attributes on ``connection``.

        Shared by ``__init__`` and ``from_cache`` so both construction paths
        always end up with identical defaults.
        """
        connection.filter_function = lambda x: x['hits']['hits']
        connection.flush_every = 50000
        connection.flush_function = lambda x: x

    @classmethod
    def from_cache(cls, *args, **kwargs):
        """Create a connection via the parent's ``from_cache``.

        All arguments are forwarded unchanged to
        ``es.EsConnection.from_cache``; the connector defaults are then
        (re)applied to the object it returns.

        Returns:
            The object returned by the parent ``from_cache`` with
            ``filter_function``, ``flush_every`` and ``flush_function`` set.
        """
        instance = super(CollectdEs, cls).from_cache(*args, **kwargs)
        cls._apply_connector_defaults(instance)
        return instance

    def query_disk(self, start, end):
        """Query Elasticsearch for collectd disk plugin data.

        Args:
            start (datetime.datetime): lower bound for query (inclusive)
            end (datetime.datetime): upper bound for query (exclusive)

        Returns:
            Whatever :meth:`query_timeseries` returns.
        """
        return self.query_timeseries(QUERY_DISK_DATA, start, end)

    def query_memory(self, start, end):
        """Query Elasticsearch for collectd memory plugin data.

        Args:
            start (datetime.datetime): lower bound for query (inclusive)
            end (datetime.datetime): upper bound for query (exclusive)

        Returns:
            Whatever :meth:`query_timeseries` returns.
        """
        return self.query_timeseries(QUERY_MEMORY_DATA, start, end)

    def query_cpu(self, start, end):
        """Query Elasticsearch for collectd cpu plugin data.

        Args:
            start (datetime.datetime): lower bound for query (inclusive)
            end (datetime.datetime): upper bound for query (exclusive)

        Returns:
            Whatever :meth:`query_timeseries` returns.
        """
        return self.query_timeseries(QUERY_CPU_DATA, start, end)

    def query_timeseries(self, query_template, start, end, source_filter=None,
                         filter_function=None, flush_every=None,
                         flush_function=None):
        """Map connection-wide attributes to super(self).query_timeseries arguments

        Any optional argument left as None is replaced by this connector's
        default before the call is delegated to the parent class.

        Args:
            query_template (dict): a query object containing at least one
                ``@timestamp`` field
            start (datetime.datetime): lower bound for query (inclusive)
            end (datetime.datetime): upper bound for query (exclusive)
            source_filter (bool or list): Return all fields contained in each
                document's _source field if True; otherwise, only return
                source fields contained in the provided list of str.  If
                None, use the module-level ``SOURCE_FILTER``.
            filter_function (function, optional): Function to call before
                each set of results is appended to the ``scroll_pages``
                attribute; if specified, return value of this function is
                what is appended.  If None, use ``self.filter_function``.
            flush_every (int or None): trigger the flush function once the
                number of docs contained across all ``scroll_pages`` reaches
                this value.  If None, use ``self.flush_every``.
            flush_function (function, optional): function to call when
                `flush_every` docs are retrieved.  If None, use
                ``self.flush_function``.

        Returns:
            The return value of the parent class's ``query_timeseries``.
        """
        if source_filter is None:
            source_filter = SOURCE_FILTER
        if filter_function is None:
            filter_function = self.filter_function
        if flush_every is None:
            flush_every = self.flush_every
        if flush_function is None:
            flush_function = self.flush_function

        return super(CollectdEs, self).query_timeseries(
            query_template=query_template,
            start=start,
            end=end,
            source_filter=source_filter,
            filter_function=filter_function,
            flush_every=flush_every,
            flush_function=flush_function)

    def to_dataframe(self):
        """Converts self.scroll_pages to a DataFrame

        The parent implementation is called with ``fields=SOURCE_FILTER``.

        Returns:
            pandas.DataFrame: Contents of the last query's pages
        """
        return super(CollectdEs, self).to_dataframe(fields=SOURCE_FILTER)