tokio.connectors.es module
Retrieve data stored in Elasticsearch
This module provides a wrapper around the Elasticsearch connection handler and methods to query, scroll, and process pages of scrolling data.
class tokio.connectors.es.EsConnection(host, port, index=None, scroll_size='1m', page_size=10000, timeout=30, **kwargs)
Bases: object
Elasticsearch connection handler.
Wrapper around an Elasticsearch connection context. It simplifies scrolling through very large result sets and runs callback functions after each page is retrieved.
__init__(host, port, index=None, scroll_size='1m', page_size=10000, timeout=30, **kwargs)
Configure and connect to an Elasticsearch endpoint.
Parameters:
- host (str) – hostname for the Elasticsearch REST endpoint
- port (int) – port to which the Elasticsearch REST endpoint is bound
- index (str) – name of the index against which queries will be issued
- scroll_size (str) – how long to keep the scroll search context open (e.g., “1m” for 1 minute)
- page_size (int) – how many documents should be returned per scrolled page (e.g., 10000 for 10k docs per scroll)
- timeout (int) – how many seconds to wait for a response from Elasticsearch before the query times out
- kwargs (dict) – passed to elasticsearch.Elasticsearch.__init__ if host and port are defined
Variables:
- client – Elasticsearch connection handler
- page (dict) – last page retrieved by a query
- scroll_pages (list) – list of pages retrieved by the query
- index (str) – name of the index against which queries will be issued
- connect_host (str) – hostname for the Elasticsearch REST endpoint
- connect_port (int) – port to which the Elasticsearch REST endpoint is bound
- connect_timeout (int) – seconds before a query times out
- page_size (int) – maximum number of documents returned per page
- scroll_size (str) – how long to keep the scroll search context open
- scroll_id – identifier for the scroll search context currently in use
- sort_by (str) – field by which Elasticsearch should sort results before returning them as query results
- fake_pages (list) – a list of page structures that self.scroll() should return when the elasticsearch module is not actually available. Used only for debugging.
- local_mode (bool) – if True, retrieve query results from self.fake_pages instead of attempting to contact an Elasticsearch server
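The fake_pages and local_mode pair lets the scrolling path run without an Elasticsearch server. Below is a minimal sketch of that idea. It assumes a hypothetical LocalPageSource class, which is not part of tokio. The class serves canned page dicts in order and returns None once they are exhausted.

```python
from typing import List, Optional


class LocalPageSource:
    """Hypothetical stand-in for an Elasticsearch client in local mode.

    Serves pre-recorded page dicts in order instead of contacting a server.
    """

    def __init__(self, fake_pages: List[dict]) -> None:
        # Copy so that consuming pages does not mutate the caller's list.
        self._pages = list(fake_pages)

    def scroll(self) -> Optional[dict]:
        """Return the next canned page, or None when none remain."""
        if not self._pages:
            return None
        return self._pages.pop(0)


pages = [
    {"_scroll_id": "abc", "hits": {"hits": [{"_source": {"x": 1}}]}},
    {"_scroll_id": "abc", "hits": {"hits": []}},
]
source = LocalPageSource(pages)
first = source.scroll()
second = source.scroll()
third = source.scroll()
```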
_process_page()
Remove a page from the incoming queue and append it to self.scroll_pages.
Takes the last received page (self.page), updates the internal state of the scroll operation, updates some internal counters, calls the flush function if applicable, and applies the filter function. It then appends the results to self.scroll_pages.
Returns: True if hits were appended, False otherwise
Return type: bool
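Below is a minimal sketch of the per-page bookkeeping described above, using only the standard library. ScrollState and process_page are hypothetical names. Several behaviors are our own assumptions:

- filter_function receives the page's list of hits.
- Docs are counted as the length of each retained result.
- The buffer is emptied after a flush.

The order of steps here (filter, append, then flush) may also differ from tokio's own implementation.

```python
from typing import Callable, List, Optional


class ScrollState:
    """Hypothetical holder for the pages retained between scroll requests."""

    def __init__(self) -> None:
        self.scroll_pages: List[list] = []


def process_page(state: ScrollState,
                 page: dict,
                 filter_function: Optional[Callable[[list], list]] = None,
                 flush_every: Optional[int] = None,
                 flush_function: Optional[Callable[[List[list]], None]] = None) -> bool:
    """Append one page's hits to state.scroll_pages; return True if any were appended."""
    hits = page.get("hits", {}).get("hits", [])
    if not hits:
        return False
    # The filter's return value, not the raw hits, is what gets retained.
    state.scroll_pages.append(filter_function(hits) if filter_function else hits)
    # Flush once the docs retained across all pages reach the threshold.
    if flush_every is not None and flush_function is not None:
        retained = sum(len(p) for p in state.scroll_pages)
        if retained >= flush_every:
            flush_function(state.scroll_pages)
            state.scroll_pages = []  # start a fresh buffer after flushing
    return True


def to_numbers(hits: list) -> list:
    """Example filter: keep only the "n" field of each document."""
    return [h["_source"]["n"] for h in hits]


flushed: List[List[list]] = []
state = ScrollState()
page = {"hits": {"hits": [{"_source": {"n": i}} for i in range(3)]}}
first_ok = process_page(state, page, to_numbers, flush_every=5,
                        flush_function=flushed.append)
second_ok = process_page(state, page, to_numbers, flush_every=5,
                         flush_function=flushed.append)
empty_ok = process_page(state, {"hits": {"hits": []}})
```

The first page retains 3 docs, which is below the threshold. The second page brings the total to 6, so the accumulated pages are handed to flush_function and the buffer starts over.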
connect(**kwargs)
Instantiate a connection and retain the connection context.
Parameters: kwargs (dict) – passed to elasticsearch.Elasticsearch.__init__
classmethod from_cache(cache_file)
Initializes an EsConnection object from a cache file.
This path is designed to be used for testing.
Parameters: cache_file (str) – path to the JSON-formatted list of pages
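from_cache expects a JSON-formatted list of pages. Below is a minimal sketch of reading such a file with only the standard library. load_cached_pages is a hypothetical helper. The assumption that each page is an Elasticsearch-style response dict is ours.

```python
import json
import os
import tempfile
from typing import List


def load_cached_pages(cache_file: str) -> List[dict]:
    """Hypothetical helper: read a JSON list of page dicts from disk."""
    with open(cache_file, "r") as fp:
        pages = json.load(fp)
    if not isinstance(pages, list):
        raise ValueError("cache file must contain a JSON list of pages")
    return pages


# Write a tiny two-page cache file, then read it back.
pages_out = [{"hits": {"hits": [{"_source": {"a": 1}}]}}, {"hits": {"hits": []}}]
fd, path = tempfile.mkstemp(suffix=".json")
with os.fdopen(fd, "w") as fp:
    json.dump(pages_out, fp)
pages_in = load_cached_pages(path)
os.remove(path)
```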
query(query)
Issue an Elasticsearch query.
Issues a query and returns the resulting page. If the query included a scrolling request, the scroll_id attribute is set so that scrolling can continue.
Parameters: query (dict) – dictionary representing the query to issue
Returns: the page resulting from the issued query
Return type: dict
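An Elasticsearch search response carries the matching documents under page["hits"]["hits"]. It includes a "_scroll_id" key only when a scroll context was requested. Below is a sketch of pulling both out of a page. summarize_page is a hypothetical helper, and the example page is illustrative data.

```python
from typing import List, Optional, Tuple


def summarize_page(page: dict) -> Tuple[Optional[str], List[dict]]:
    """Hypothetical helper: return (scroll id or None, list of document sources)."""
    scroll_id = page.get("_scroll_id")
    sources = [hit.get("_source", {}) for hit in page.get("hits", {}).get("hits", [])]
    return scroll_id, sources


example_page = {
    "_scroll_id": "example-scroll-id",
    "hits": {"hits": [{"_id": "1", "_source": {"hostname": "nid00001"}}]},
}
scroll_id, sources = summarize_page(example_page)
```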
query_and_scroll(query, source_filter=True, filter_function=None, flush_every=None, flush_function=None)
Issue a query and retain all results.
Issues a query and scrolls through every resulting page, optionally applying in situ logic for filtering and flushing. All resulting pages are appended to the scroll_pages attribute of this object. The scroll_pages attribute must be wiped by whatever is consuming it. If it is not, query_and_scroll() will keep appending results to those of previous queries.
Parameters:
- query (dict) – dictionary representing the query to issue
- source_filter (bool or list) – if True, return all fields contained in each document’s _source field; otherwise, return only the source fields named in the provided list of str
- filter_function (function, optional) – function to call before each set of results is appended to the scroll_pages attribute; if specified, the return value of this function is what gets appended
- flush_every (int or None) – trigger the flush function once the number of docs contained across all scroll_pages reaches this value. If None, do not apply flush_function.
- flush_function (function, optional) – function to call when flush_every docs have been retrieved
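Below is a minimal sketch of the scroll loop, written against a hypothetical fetch_next(scroll_id) callable rather than a real client. It follows _scroll_id until a page comes back with no hits. scroll_all is a hypothetical name. Like the behavior described above, it never clears the caller's result list.

```python
from typing import Callable, List, Optional


def scroll_all(first_page: dict,
               fetch_next: Callable[[str], dict],
               scroll_pages: List,
               filter_function: Optional[Callable[[list], list]] = None) -> int:
    """Hypothetical scroll loop; returns the number of pages appended.

    Results are appended to the caller-supplied scroll_pages list, which is
    deliberately NOT cleared here.
    """
    page = first_page
    appended = 0
    while True:
        hits = page.get("hits", {}).get("hits", [])
        if not hits:
            break  # an empty page means the scroll is exhausted
        scroll_pages.append(filter_function(hits) if filter_function else hits)
        appended += 1
        page = fetch_next(page["_scroll_id"])
    return appended


def to_numbers(hits: list) -> list:
    return [h["_source"]["n"] for h in hits]


canned = iter([
    {"_scroll_id": "s1", "hits": {"hits": [{"_source": {"n": 3}}]}},
    {"_scroll_id": "s1", "hits": {"hits": []}},
])
first = {"_scroll_id": "s1", "hits": {"hits": [{"_source": {"n": 1}}, {"_source": {"n": 2}}]}}
results: list = []
count = scroll_all(first, lambda sid: next(canned), results, to_numbers)

# A second query without wiping `results` keeps appending to it.
scroll_all({"_scroll_id": "s2", "hits": {"hits": [{"_source": {"n": 9}}]}},
           lambda sid: {"hits": {"hits": []}}, results, to_numbers)
```

After both calls, results holds pages from both queries. This is why the consumer is responsible for wiping it between queries.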
query_timeseries(query_template, start, end, source_filter=True, filter_function=None, flush_every=None, flush_function=None)
Craft and issue a query bounded by time.
Parameters:
- query_template (dict) – a query object containing at least one @timestamp field
- start (datetime.datetime) – lower bound for the query (inclusive)
- end (datetime.datetime) – upper bound for the query (exclusive)
- source_filter (bool or list) – if True, return all fields contained in each document’s _source field; otherwise, return only the source fields named in the provided list of str
- filter_function (function, optional) – function to call before each set of results is appended to the scroll_pages attribute; if specified, the return value of this function is what gets appended
- flush_every (int or None) – trigger the flush function once the number of docs contained across all scroll_pages reaches this value. If None, do not apply flush_function.
- flush_function (function, optional) – function to call when flush_every docs have been retrieved
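query_template must contain at least one @timestamp field. Below is a small example of a template that satisfies this requirement. It comes with a hypothetical contains_key helper for checking a template before issuing it. The template's layout, a bool filter holding a range clause, is an illustrative assumption and not a format tokio mandates.

```python
from typing import Any


def contains_key(node: Any, key: str) -> bool:
    """Return True if `key` appears as a dict key anywhere in a nested query."""
    if isinstance(node, dict):
        return key in node or any(contains_key(v, key) for v in node.values())
    if isinstance(node, list):
        return any(contains_key(item, key) for item in node)
    return False


query_template = {
    "query": {
        "bool": {
            "filter": [
                {"range": {"@timestamp": {}}},  # placeholder to be bounded later
                {"term": {"hostname": "nid00001"}},
            ]
        }
    }
}
has_timestamp = contains_key(query_template, "@timestamp")
```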
save_cache(output_file=None)
Persist the response of the last query to a file.
This differs from other connectors’ save_cache() methods in that it saves only the state of the last query’s results. It does not save any connection information and does not restore the state of a previous EsConnection object.
Its principal intended use is testing.
Parameters: output_file (str or None) – path to the file to which JSON should be written. If None, write to stdout. Default is None.
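Below is a sketch of the "file, or stdout when None" dispatch described above. save_pages is a hypothetical helper. It serializes whatever list of pages it is given and makes no claim about which attribute tokio actually saves.

```python
import io
import json
import sys
from contextlib import redirect_stdout
from typing import Optional


def save_pages(pages: list, output_file: Optional[str] = None) -> None:
    """Hypothetical helper: dump pages as JSON to a file, or to stdout if None."""
    if output_file is None:
        # sys.stdout is looked up at call time, so redirection is honored.
        json.dump(pages, sys.stdout, sort_keys=True)
        sys.stdout.write("\n")
    else:
        with open(output_file, "w") as fp:
            json.dump(pages, fp, sort_keys=True)


buf = io.StringIO()
with redirect_stdout(buf):
    save_pages([{"hits": {"hits": []}}])
captured = buf.getvalue()
```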
tokio.connectors.es.build_timeseries_query(orig_query, start, end, start_key='@timestamp', end_key=None)
Create a query object with bounded time ranges.
Given a query dict and start and end datetime objects, return a new query object with the correct time ranges bounded. Relies on orig_query containing at least one @timestamp field to indicate where the time ranges should be inserted.
If orig_query is querying records that contain both a “start” and an “end” time (e.g., a job) rather than a discrete point in time (e.g., a sampled metric), start_key and end_key can be used to modify the query to return all records that overlapped with the interval specified by start and end.
Parameters:
- orig_query (dict) – a query object containing at least one @timestamp field
- start (datetime.datetime) – lower bound for the query (inclusive)
- end (datetime.datetime) – upper bound for the query (exclusive)
- start_key (str) – the key containing a timestamp against which a time range query should be applied
- end_key (str) – the key containing a timestamp against which the upper bound of the time range should be applied. If None, treat start_key as a single point in time rather than the start of a recorded process.
Returns: a query object with all instances of @timestamp bounded by start and end
Return type: dict
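Below is a minimal sketch of the bounding logic described above. bound_time_ranges is a hypothetical name. Several choices are assumptions, and tokio's own implementation may differ:

- Timestamps are encoded as ISO 8601 strings.
- Any existing bounds in a matched range clause are replaced.
- With end_key, the range clause is swapped for a two-clause bool filter, because an Elasticsearch range query accepts only one field.

An interval record overlaps [start, end) when it began before end and ended at or after start.

```python
import copy
import datetime
from typing import List, Optional


def bound_time_ranges(orig_query: dict, start: datetime.datetime,
                      end: datetime.datetime, start_key: str = "@timestamp",
                      end_key: Optional[str] = None) -> dict:
    """Hypothetical sketch: return a copy of orig_query with time ranges bounded."""
    query = copy.deepcopy(orig_query)  # never mutate the caller's template
    found: List[dict] = []

    def walk(node):
        # Collect every clause of the form {"range": {start_key: {...}}}.
        if isinstance(node, dict):
            rng = node.get("range")
            if isinstance(rng, dict) and start_key in rng:
                found.append(node)
            for value in node.values():
                walk(value)
        elif isinstance(node, list):
            for item in node:
                walk(item)

    walk(query)
    if not found:
        raise ValueError("query has no range clause on %r" % start_key)
    for clause in found:
        if end_key is None:
            # Point-in-time records: start <= t < end.
            clause["range"][start_key] = {"gte": start.isoformat(), "lt": end.isoformat()}
        else:
            # Interval records: started before end AND ended at or after start.
            del clause["range"]
            clause["bool"] = {"filter": [
                {"range": {start_key: {"lt": end.isoformat()}}},
                {"range": {end_key: {"gte": start.isoformat()}}},
            ]}
    return query


template = {"query": {"bool": {"filter": [{"range": {"@timestamp": {}}}]}}}
t0 = datetime.datetime(2019, 1, 1)
t1 = datetime.datetime(2019, 1, 2)
point = bound_time_ranges(template, t0, t1)
jobs = bound_time_ranges({"query": {"bool": {"filter": [{"range": {"start": {}}}]}}},
                         t0, t1, start_key="start", end_key="end")
```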
tokio.connectors.es.mutate_query(mutable_query, field, value, term='term')
Inserts a new condition into a query object.
See https://www.elastic.co/guide/en/elasticsearch/reference/current/term-level-queries.html for complete documentation.
Parameters:
Returns: Nothing. mutable_query is updated in place.
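Below is a sketch of inserting a term-level condition in place, in the spirit of mutate_query. add_term_condition is a hypothetical name. It assumes the query already has a top-level bool query and appends {term: {field: value}} to its filter list. Where tokio actually inserts the condition is not stated above.

```python
from typing import Any


def add_term_condition(mutable_query: dict, field: str, value: Any,
                       term: str = "term") -> None:
    """Hypothetical sketch: append {term: {field: value}} to the bool filter in place.

    Raises KeyError if the query has no top-level bool query.
    """
    bool_query = mutable_query["query"]["bool"]
    filters = bool_query.setdefault("filter", [])
    if isinstance(filters, dict):
        # A single filter clause may be written as a dict; normalize to a list.
        filters = [filters]
        bool_query["filter"] = filters
    filters.append({term: {field: value}})


q = {"query": {"bool": {"filter": [{"range": {"@timestamp": {"gte": "now-1h"}}}]}}}
result = add_term_condition(q, "hostname", "nid00001")
add_term_condition(q, "hostname", ["nid00001", "nid00002"], term="terms")
```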