tokio.connectors.es module

Retrieve data stored in Elasticsearch

This module provides a wrapper around the Elasticsearch connection handler and methods to query, scroll, and process pages of scrolling data.

class tokio.connectors.es.EsConnection(host, port, index=None, scroll_size='1m', page_size=10000, timeout=30, **kwargs)

Bases: object

Elasticsearch connection handler.

Wrapper around an Elasticsearch connection context that simplifies scrolling through queries that match very many documents and supports callback functions that run after each page is retrieved.

__init__(host, port, index=None, scroll_size='1m', page_size=10000, timeout=30, **kwargs)

Configure and connect to an Elasticsearch endpoint.

Parameters:
  • host (str) – hostname for the Elasticsearch REST endpoint
  • port (int) – port to which the Elasticsearch REST endpoint is bound
  • index (str) – name of index against which queries will be issued
  • scroll_size (str) – how long to keep the scroll search context open (e.g., “1m” for 1 minute)
  • page_size (int) – how many documents should be returned per scrolled page (e.g., 10000 for 10k docs per scroll)
  • timeout (int) – how many seconds to wait for a response from Elasticsearch before the query times out
  • kwargs (dict) – Passed to elasticsearch.Elasticsearch.__init__ if host and port are defined
Variables:
  • client – Elasticsearch connection handler
  • page (dict) – last page retrieved by a query
  • scroll_pages (list) – list of pages retrieved by queries
  • index (str) – name of index against which queries will be issued
  • connect_host (str) – hostname for Elasticsearch REST endpoint
  • connect_port (int) – port where Elasticsearch REST endpoint is bound
  • connect_timeout (int) – seconds before query should time out
  • page_size (int) – max number of documents returned per page
  • scroll_size (str) – duration to keep scroll search context open
  • scroll_id – identifier for the scroll search context currently in use
  • sort_by (str) – field by which Elasticsearch should sort results before returning them as query results
  • fake_pages (list) – A list of page structures that should be returned by self.scroll() when the elasticsearch module is not actually available. Used only for debugging.
  • local_mode (bool) – If True, retrieve query results from self.fake_pages instead of attempting to contact an Elasticsearch server
_process_page()

Remove a page from the incoming queue and append it to self.scroll_pages.

Takes the last received page (self.page), updates the internal state of the scroll operation, updates some internal counters, calls the flush function if applicable, and applies the filter function, then appends the results to self.scroll_pages.

Returns: True if hits were appended; False otherwise
Return type: bool
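The page-handling steps above can be illustrated with a small standalone model. This is a minimal sketch, not pytokio's code: the ScrollState class and its process_page() method are hypothetical, pages are assumed to follow the standard Elasticsearch response layout (a `_scroll_id` string and a `hits.hits` list), filter_function is assumed to take and return a list of hits, and flush_function is assumed to receive the state object and wipe scroll_pages itself.

```python
class ScrollState:
    """Hypothetical model of the scroll bookkeeping kept by EsConnection."""

    def __init__(self, filter_function=None, flush_every=None, flush_function=None):
        self.page = None           # last page received, waiting to be processed
        self.scroll_id = None      # identifier of the active scroll context
        self.scroll_pages = []     # one (possibly filtered) list of hits per page
        self.num_pages = 0         # counter of non-empty pages seen so far
        self.filter_function = filter_function
        self.flush_every = flush_every
        self.flush_function = flush_function

    def process_page(self):
        """Consume self.page; return True if hits were appended, else False."""
        page, self.page = self.page, None
        if not page:
            return False

        # Remember the scroll context so the next scroll request can continue it
        self.scroll_id = page.get("_scroll_id", self.scroll_id)
        hits = page.get("hits", {}).get("hits", [])
        if not hits:
            return False
        self.num_pages += 1

        # Flush once enough documents have accumulated; the flush function is
        # assumed to consume and wipe scroll_pages
        if self.flush_every is not None and self.flush_function is not None:
            num_docs = sum(len(p) for p in self.scroll_pages)
            if num_docs >= self.flush_every:
                self.flush_function(self)

        if self.filter_function is not None:
            hits = self.filter_function(hits)
        self.scroll_pages.append(hits)
        return True
```

In this model the flush check runs before the current page is appended, mirroring the order described above (flush, then filter, then append).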
close()

Close and invalidate the connection context.

connect(**kwargs)

Instantiate a connection and retain the connection context.

Parameters: kwargs (dict) – Passed to elasticsearch.Elasticsearch.__init__
classmethod from_cache(cache_file)

Initializes an EsConnection object from a cache file.

This path is designed to be used for testing.

Parameters: cache_file (str) – Path to the JSON formatted list of pages
query(query)

Issue an Elasticsearch query.

Issues a query and returns the resulting page. If the query included a scrolling request, the scroll_id attribute is set so that scrolling can continue.

Parameters: query (dict) – Dictionary representing the query to issue
Returns: The page resulting from the issued query.
Return type: dict
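For orientation, the sketch below shows a query body in the Elasticsearch query DSL and how the two parts of a returned page that matter for scrolling can be pulled out. It assumes the standard search response layout, in which `_scroll_id` is present only when a scroll was requested and matching documents live under `hits.hits`; `example_query` and `summarize_page` are illustrative names, not part of tokio.connectors.es.

```python
# An illustrative query body: match documents from a one-day window
example_query = {
    "query": {
        "bool": {
            "filter": [
                {"range": {"@timestamp": {"gte": "2019-01-01T00:00:00",
                                          "lt": "2019-01-02T00:00:00"}}}
            ]
        }
    }
}


def summarize_page(page):
    """Return (scroll_id, sources) for one search or scroll response page."""
    # _scroll_id is only present if the original search requested a scroll
    scroll_id = page.get("_scroll_id")
    # Each hit wraps the stored document in its _source field
    sources = [hit.get("_source", {}) for hit in page["hits"]["hits"]]
    return scroll_id, sources
```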
query_and_scroll(query, source_filter=True, filter_function=None, flush_every=None, flush_function=None)

Issue a query and retain all results.

Issues a query and scrolls through every resulting page, optionally applying in situ logic for filtering and flushing. All resulting pages are appended to the scroll_pages attribute of this object.

The scroll_pages attribute must be wiped by whatever is consuming it; if this does not happen, query_and_scroll() will continue appending results to the results of previous queries.

Parameters:
  • query (dict) – Dictionary representing the query to issue
  • source_filter (bool or list) – Return all fields contained in each document’s _source field if True; otherwise, only return source fields contained in the provided list of str.
  • filter_function (function, optional) – Function to call before each set of results is appended to the scroll_pages attribute; if specified, return value of this function is what is appended.
  • flush_every (int or None) – trigger the flush function once the number of docs contained across all scroll_pages reaches this value. If None, do not apply flush_function.
  • flush_function (function, optional) – function to call when flush_every docs are retrieved.
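As an example of in situ processing, the callbacks below reduce each hit to a few fields and stream accumulated results out before memory grows too large. This is a sketch under assumptions the section does not spell out: filter_function is assumed to receive a page's list of hits and return what should be appended, and flush_function is assumed to receive the EsConnection object itself; keep_fields and make_flusher are hypothetical helpers. Because scroll_pages must be wiped by its consumer, the flusher empties it after writing.

```python
import json


def keep_fields(hits, fields=("@timestamp", "hostname")):
    """Hypothetical filter_function: keep only selected _source fields of each hit."""
    return [{field: hit["_source"].get(field) for field in fields} for hit in hits]


def make_flusher(out):
    """Build a hypothetical flush_function that writes JSON lines to `out`.

    `out` is any object with a write() method; the connection passed to the
    returned function is assumed to expose a mutable scroll_pages list.
    """
    def flush(conn):
        for page in conn.scroll_pages:
            for record in page:
                out.write(json.dumps(record) + "\n")
        # Wipe consumed pages so later results are not appended to old ones
        conn.scroll_pages = []
    return flush
```

Under those assumptions, a call might look like `esdb.query_and_scroll(query, source_filter=["@timestamp", "hostname"], filter_function=keep_fields, flush_every=50000, flush_function=make_flusher(fp))`, where `esdb` is an EsConnection and `fp` an open file (both hypothetical names). Any pages still in scroll_pages after the last flush remain the caller's responsibility.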
query_timeseries(query_template, start, end, source_filter=True, filter_function=None, flush_every=None, flush_function=None)

Craft and issue a query bounded by time.

Parameters:
  • query_template (dict) – a query object containing at least one @timestamp field
  • start (datetime.datetime) – lower bound for query (inclusive)
  • end (datetime.datetime) – upper bound for query (exclusive)
  • source_filter (bool or list) – Return all fields contained in each document’s _source field if True; otherwise, only return source fields contained in the provided list of str.
  • filter_function (function, optional) – Function to call before each set of results is appended to the scroll_pages attribute; if specified, return value of this function is what is appended.
  • flush_every (int or None) – trigger the flush function once the number of docs contained across all scroll_pages reaches this value. If None, do not apply flush_function.
  • flush_function (function, optional) – function to call when flush_every docs are retrieved.
save_cache(output_file=None)

Persist the response of the last query to a file.

This differs from other connectors’ save_cache() methods in that it only saves the state of the last query’s results. It does not save any connection information and does not restore the state of a previous EsConnection object.

It is intended principally for testing.

Parameters: output_file (str or None) – Path to the file to which JSON should be written. If None, write to stdout. Default is None.
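Saving and restoring a cache amounts to serializing a list of pages as JSON. The sketch below imitates that round trip with hypothetical save_pages() and load_pages() helpers; consistent with from_cache() taking a "JSON formatted list of pages", it assumes the cache holds raw page dicts. It is an illustration, not the connector's actual code.

```python
import json
import os
import sys
import tempfile


def save_pages(pages, output_file=None):
    """Write a list of pages as JSON to output_file, or to stdout if None."""
    if output_file is None:
        json.dump(pages, sys.stdout)
        sys.stdout.write("\n")
    else:
        with open(output_file, "w") as fp:
            json.dump(pages, fp)


def load_pages(cache_file):
    """Read back a JSON list of pages written by save_pages()."""
    with open(cache_file, "r") as fp:
        pages = json.load(fp)
    if not isinstance(pages, list):
        raise ValueError("cache file must contain a JSON list of pages")
    return pages


# Usage: round-trip one fake page through a temporary file
pages = [{"_scroll_id": "abc", "hits": {"hits": [{"_source": {"x": 1}}]}}]
with tempfile.TemporaryDirectory() as tmpdir:
    path = os.path.join(tmpdir, "cache.json")
    save_pages(pages, path)
    restored = load_pages(path)
```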
scroll()

Request the next page of results.

Requests the next page of results for a query that is scrolling. This can only be performed if the scroll_id attribute is set (e.g., by the query() method).

Returns: The next page in the scrolling context.
Return type: dict
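The query-then-scroll cycle can be sketched end to end without a server. FakeClient is a hypothetical in-memory stand-in whose search() and scroll() keyword arguments loosely mirror those of the elasticsearch-py client; it is not that library, and scroll_all() is an illustrative driver rather than EsConnection's implementation. Scrolling stops at the first page that comes back with no hits.

```python
class FakeClient:
    """Hypothetical in-memory stand-in for an Elasticsearch client."""

    def __init__(self, docs):
        self._docs = list(docs)
        self._cursors = {}  # scroll_id -> (next offset, page size)

    def search(self, body, scroll, size):
        # body and scroll are accepted to mirror the real call but ignored here
        scroll_id = "scroll-%d" % len(self._cursors)
        self._cursors[scroll_id] = (0, size)
        return self._next_page(scroll_id)

    def scroll(self, scroll_id, scroll):
        return self._next_page(scroll_id)

    def _next_page(self, scroll_id):
        offset, size = self._cursors[scroll_id]
        hits = self._docs[offset:offset + size]
        self._cursors[scroll_id] = (offset + len(hits), size)
        return {"_scroll_id": scroll_id, "hits": {"hits": hits}}


def scroll_all(client, body, page_size=10000, scroll="1m"):
    """Collect every page of hits for `body`, scrolling until a page is empty."""
    pages = []
    page = client.search(body=body, scroll=scroll, size=page_size)
    while page["hits"]["hits"]:
        pages.append(page["hits"]["hits"])
        # Each response carries the scroll_id needed to request the next page
        page = client.scroll(scroll_id=page["_scroll_id"], scroll=scroll)
    return pages
```

Against a real cluster, the scroll context should also be released once it is no longer needed; elasticsearch-py exposes clear_scroll() for this.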
to_dataframe(fields)

Converts self.scroll_pages to CSV.

Returns: Contents of the last query’s pages in CSV format
Return type: str
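The conversion amounts to flattening every hit in scroll_pages into one row per document. The sketch below assumes each element of scroll_pages is a list of hits carrying a `_source` dict and that `fields` names the `_source` keys to keep; rows_from_pages() and rows_to_csv() are hypothetical helpers. The same list of row dicts could instead be passed to pandas.DataFrame if a DataFrame rather than CSV text is wanted.

```python
import csv
import io


def rows_from_pages(pages, fields):
    """Flatten pages of hits into one dict per document, keeping only `fields`."""
    rows = []
    for page in pages:
        for hit in page:
            source = hit.get("_source", {})
            # Missing fields become None so every row has the same columns
            rows.append({field: source.get(field) for field in fields})
    return rows


def rows_to_csv(rows, fields):
    """Render rows as CSV text with a header line; None is written as empty."""
    buf = io.StringIO()
    writer = csv.DictWriter(buf, fieldnames=fields, lineterminator="\n")
    writer.writeheader()
    writer.writerows(rows)
    return buf.getvalue()
```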
tokio.connectors.es.build_timeseries_query(orig_query, start, end, start_key='@timestamp', end_key=None)

Create a query object with time ranges bounded.

Given a query dict and start and end datetime objects, return a new query object with the correct time ranges bounded. Relies on orig_query containing at least one @timestamp field to indicate where the time ranges should be inserted.

If orig_query is querying records that contain both a “start” and “end” time (e.g., a job) rather than a discrete point in time (e.g., a sampled metric), start_key and end_key can be used to modify the query to return all records that overlap the interval specified by start and end.

Parameters:
  • orig_query (dict) – A query object containing at least one @timestamp field.
  • start (datetime.datetime) – lower bound for query (inclusive)
  • end (datetime.datetime) – upper bound for query (exclusive)
  • start_key (str) – The key containing a timestamp against which a time range query should be applied.
  • end_key (str) – The key containing a timestamp against which the upper bound of the time range should be applied. If None, treat start_key as a single point in time rather than the start of a recorded process.
Returns: A query object with all instances of @timestamp bounded by start and end.
Return type: dict
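The sketch below illustrates the idea behind this function rather than its exact code. bound_timestamps() (hypothetical) deep-copies the query and overwrites every `range` clause on the timestamp key with an inclusive `gte` and exclusive `lt` bound; overlap_clauses() (also hypothetical) shows one common way to express the interval-overlap case, matching records that start before `end` and finish at or after `start`. Both assume date fields that accept ISO 8601 strings.

```python
import copy
import datetime


def bound_timestamps(orig_query, start, end, key="@timestamp"):
    """Return a copy of orig_query with every range on `key` set to [start, end)."""
    query = copy.deepcopy(orig_query)  # leave the caller's template untouched

    def walk(node):
        if isinstance(node, dict):
            for name, child in node.items():
                if name == "range" and isinstance(child, dict) and key in child:
                    # inclusive lower bound, exclusive upper bound
                    child[key] = {"gte": start.isoformat(), "lt": end.isoformat()}
                else:
                    walk(child)
        elif isinstance(node, list):
            for item in node:
                walk(item)

    walk(query)
    return query


def overlap_clauses(start, end, start_key, end_key):
    """Range clauses matching records whose [start_key, end_key] overlaps [start, end)."""
    return [
        {"range": {start_key: {"lt": end.isoformat()}}},
        {"range": {end_key: {"gte": start.isoformat()}}},
    ]


# Usage: bound a template that both filters and sorts on @timestamp
template = {
    "query": {"bool": {"filter": [{"range": {"@timestamp": {}}}]}},
    "sort": [{"@timestamp": {"order": "asc"}}],
}
bounded = bound_timestamps(template,
                           datetime.datetime(2019, 1, 1),
                           datetime.datetime(2019, 1, 2))
```

Restricting the rewrite to `range` clauses keeps other uses of the same key, such as the sort clause in the usage example, intact.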

tokio.connectors.es.mutate_query(mutable_query, field, value, term='term')

Inserts a new condition into a query object.

See https://www.elastic.co/guide/en/elasticsearch/reference/current/term-level-queries.html for complete documentation.

Parameters:
  • mutable_query (dict) – a query object to be modified
  • field (str) – the field to which a term query will be applied
  • value – the value to match for the term query
  • term (str) – one of the following: term, terms, terms_set, range, exists, prefix, wildcard, regexp, fuzzy, type, ids.
Returns: Nothing. mutable_query is updated in place.
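To make the in-place mutation concrete, here is a sketch that appends a `{term: {field: value}}` clause to a bool query's filter list. The text above does not say where the real function places the new condition, so add_condition() is a hypothetical illustration that assumes the query either already uses a top-level `bool` query or has no query yet. The `{term: {field: value}}` shape fits term, terms, prefix, and range clauses (for range, value is the dict of bounds); exists and ids take differently shaped bodies, for example `{"exists": {"field": ...}}`.

```python
def add_condition(mutable_query, field, value, term="term"):
    """Append {term: {field: value}} to the query's bool filter, in place.

    Assumes the query is empty or already uses a top-level bool query;
    returns None because the caller's dict is modified directly.
    """
    bool_query = mutable_query.setdefault("query", {}).setdefault("bool", {})
    filters = bool_query.setdefault("filter", [])
    if isinstance(filters, dict):
        # A single filter clause is valid DSL too; normalize it to a list
        filters = [filters]
        bool_query["filter"] = filters
    filters.append({term: {field: value}})
```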