#!/usr/bin/env python
"""Retrieve data generated by collectd and stored in Elasticsearch
"""
import copy
from . import es
# Start from the generic Elasticsearch query template and restrict it with a
# 'prefix' clause on the hostname field (value 'bb').  Every query template
# below is derived from this one.
BASE_QUERY = copy.deepcopy(es.BASE_QUERY)
es.mutate_query(BASE_QUERY, term='prefix', field='hostname', value='bb')

# Each plugin-specific template begins as an independent deep copy so that
# mutating one never affects BASE_QUERY or its siblings.
QUERY_DISK_DATA = copy.deepcopy(BASE_QUERY)
QUERY_CPU_DATA = copy.deepcopy(BASE_QUERY)
QUERY_MEMORY_DATA = copy.deepcopy(BASE_QUERY)

# (query template, clause type, document field, value) -- applied in order.
for _query, _term, _field, _value in (
        (QUERY_DISK_DATA, 'term', 'plugin', 'disk'),
        (QUERY_DISK_DATA, 'prefix', 'plugin_instance', 'nvme'),
        (QUERY_DISK_DATA, 'regexp', 'collectd_type', 'disk_(octets|ops)'),
        (QUERY_CPU_DATA, 'term', 'plugin', 'cpu'),
        (QUERY_CPU_DATA, 'regexp', 'type_instance', '(idle|user|system)'),
        (QUERY_MEMORY_DATA, 'term', 'plugin', 'memory'),
):
    es.mutate_query(_query, term=_term, field=_field, value=_value)

# Do not leave the loop variables behind as module attributes.
del _query, _term, _field, _value
# Default ``_source`` filter: only these document fields are requested from
# Elasticsearch and, in the same order, used as the DataFrame columns.
SOURCE_FILTER = [
    '@timestamp', 'hostname', 'plugin', 'collectd_type',
    'type_instance', 'plugin_instance', 'value',
    'longterm', 'midterm', 'shortterm',
    'majflt', 'minflt',
    'if_octets', 'if_packets', 'if_errors',
    'rx', 'tx',
    'read', 'write', 'io_time',
]
class CollectdEs(es.EsConnection):
    """collectd-Elasticsearch connection handler

    Specializes :class:`es.EsConnection` with connection-wide defaults
    (``filter_function``, ``flush_every``, ``flush_function``) and with
    convenience queries for the collectd disk, memory, and cpu plugins.
    """

    def __init__(self, *args, **kwargs):
        """Initialize the parent connection handler and install the defaults.

        All positional and keyword arguments are passed through unchanged to
        the parent class constructor.
        """
        super(CollectdEs, self).__init__(*args, **kwargs)
        self._set_collectd_defaults(self)

    @classmethod
    def from_cache(cls, *args, **kwargs):
        """Create a connector through the parent's ``from_cache``.

        All arguments are passed through unchanged to the parent class's
        ``from_cache``; the same connection-wide defaults that ``__init__``
        installs are then set on the object it returns.

        Returns:
            The object produced by the parent's ``from_cache`` with the
            collectd defaults applied.
        """
        instance = super(CollectdEs, cls).from_cache(*args, **kwargs)
        cls._set_collectd_defaults(instance)
        return instance

    @staticmethod
    def _set_collectd_defaults(connection):
        """Install the connection-wide defaults used by ``query_timeseries``.

        Kept as a static helper that only assigns attributes so that it works
        on whatever object the parent's ``from_cache`` hands back.

        Args:
            connection: object on which to set ``filter_function``,
                ``flush_every``, and ``flush_function``.
        """
        # Keep only the list of matching documents from each results page.
        connection.filter_function = lambda x: x['hits']['hits']
        connection.flush_every = 50000
        # Identity function: pages are passed through untouched on flush.
        connection.flush_function = lambda x: x

    def query_disk(self, start, end):
        """Query Elasticsearch for collectd disk plugin data.

        Args:
            start (datetime.datetime): lower bound for query (inclusive)
            end (datetime.datetime): upper bound for query (exclusive)

        Returns:
            Whatever :meth:`query_timeseries` returns for this query.
        """
        return self.query_timeseries(QUERY_DISK_DATA, start, end)

    def query_memory(self, start, end):
        """Query Elasticsearch for collectd memory plugin data.

        Args:
            start (datetime.datetime): lower bound for query (inclusive)
            end (datetime.datetime): upper bound for query (exclusive)

        Returns:
            Whatever :meth:`query_timeseries` returns for this query.
        """
        return self.query_timeseries(QUERY_MEMORY_DATA, start, end)

    def query_cpu(self, start, end):
        """Query Elasticsearch for collectd cpu plugin data.

        Args:
            start (datetime.datetime): lower bound for query (inclusive)
            end (datetime.datetime): upper bound for query (exclusive)

        Returns:
            Whatever :meth:`query_timeseries` returns for this query.
        """
        return self.query_timeseries(QUERY_CPU_DATA, start, end)

    def query_timeseries(self, query_template, start, end, source_filter=None,
                         filter_function=None, flush_every=None,
                         flush_function=None):
        """Map connection-wide attributes to super(self).query_timeseries arguments

        Args:
            query_template (dict): a query object containing at least one
                ``@timestamp`` field
            start (datetime.datetime): lower bound for query (inclusive)
            end (datetime.datetime): upper bound for query (exclusive)
            source_filter (bool or list): Return all fields contained in each
                document's _source field if True; otherwise, only return source
                fields contained in the provided list of str.  If None, use
                the module-level ``SOURCE_FILTER``.
            filter_function (function, optional): Function to call before each
                set of results is appended to the ``scroll_pages`` attribute; if
                specified, return value of this function is what is appended.
                If None, use this connector's ``filter_function`` attribute.
            flush_every (int or None): trigger the flush function once the
                number of docs contained across all ``scroll_pages`` reaches
                this value.  If None, use this connector's ``flush_every``
                attribute.
            flush_function (function, optional): function to call when
                `flush_every` docs are retrieved.  If None, use this
                connector's ``flush_function`` attribute.

        Returns:
            The return value of the parent class's ``query_timeseries``.
        """
        # Explicit ``is None`` checks: a caller-supplied falsy value (e.g.
        # ``source_filter=False``) must not be replaced by the default.
        if source_filter is None:
            source_filter = SOURCE_FILTER
        if filter_function is None:
            filter_function = self.filter_function
        if flush_every is None:
            flush_every = self.flush_every
        if flush_function is None:
            flush_function = self.flush_function

        return super(CollectdEs, self).query_timeseries(
            query_template=query_template,
            start=start,
            end=end,
            source_filter=source_filter,
            filter_function=filter_function,
            flush_every=flush_every,
            flush_function=flush_function)

    def to_dataframe(self):
        """Converts self.scroll_pages to a DataFrame

        The columns requested from the parent implementation are the fields
        listed in the module-level ``SOURCE_FILTER``.

        Returns:
            pandas.DataFrame: Contents of the last query's pages
        """
        return super(CollectdEs, self).to_dataframe(fields=SOURCE_FILTER)