# Source code for tokio.connectors.nersc_globuslogs

#!/usr/bin/env python
"""Retrieve Globus transfer logs from NERSC's Elasticsearch infrastructure

Connects to NERSC's OMNI service and retrieves Globus transfer logs.
"""

import copy
from . import es

# Template for the Elasticsearch query issued by this connector: a
# constant_score/bool filter whose "must" list holds two empty range clauses,
# one keyed on start_date and one on end_date.
# NOTE(review): the empty range bodies are presumably filled in by
# es.build_timeseries_query() -- confirm against that helper.
QUERY = {
    "query": {
        "constant_score": {
            "filter": {
                "bool": {
                    "must": [
                        {"range": {"start_date": {}}},
                        {"range": {"end_date": {}}},
                    ],
                },
            },
        },
    },
}

# Restrict the _source fields returned for each matching document to the
# names below.  The same list is also handed to to_dataframe() as the set of
# fields to extract, so order matters to callers that rely on it.
SOURCE_FILTER = [
    '@timestamp',
    'BLOCK', 'BUFFER', 'CODE', 'DATE', 'DEST', 'DESTIP', 'FILE', 'HOST',
    'NBYTES', 'START', 'STREAMS', 'STRIPES', 'TASKID', 'TYPE', 'USER',
    'VOLUME',
    'bandwidth_mbps', 'duration', 'start_date', 'end_date', 'host',
]

class NerscGlobusLogs(es.EsConnection):
    """Connection handler for NERSC Globus transfer logs

    Extends :class:`es.EsConnection` with the scrolling defaults (hit
    extraction, flush threshold, and ``_source`` filter) used for the Globus
    log index, plus convenience wrappers for common query restrictions.
    """

    def __init__(self, *args, **kwargs):
        """Initializes the connection and applies the Globus log defaults

        All positional and keyword arguments are forwarded unchanged to the
        parent class constructor.
        """
        super(NerscGlobusLogs, self).__init__(*args, **kwargs)
        self._apply_scroll_defaults(self)

    @staticmethod
    def _apply_scroll_defaults(target):
        """Sets the scrolling-related attributes used by this connector

        Shared by ``__init__`` and ``from_cache`` so the two construction
        paths cannot drift apart.

        Args:
            target: object on which to set ``filter_function``,
                ``flush_every``, ``flush_function``, and ``source_filter``
        """
        # keep only the list of hits from each page of results
        target.filter_function = lambda x: x['hits']['hits']
        target.flush_every = 50000
        # flushing is a pass-through; pages are kept as they are
        target.flush_function = lambda x: x
        target.source_filter = SOURCE_FILTER

    @classmethod
    def from_cache(cls, *args, **kwargs):
        """Initializes an instance from a cache via the parent class

        All arguments are forwarded unchanged to the parent's
        ``from_cache``.  The same defaults that ``__init__`` applies are then
        (re)applied, including ``source_filter``, which ``to_dataframe`` and
        ``query_timeseries`` both read.

        Returns:
            The object returned by the parent's ``from_cache`` with this
            connector's scrolling defaults set on it.
        """
        instance = super(NerscGlobusLogs, cls).from_cache(*args, **kwargs)
        cls._apply_scroll_defaults(instance)
        return instance

    def query(self, start, end, must=None, scroll=True):
        """Queries Elasticsearch for Globus logs

        Accepts a start time, end time, and an optional "must" field which can
        be used to apply additional term queries.  For example, ``must`` may
        be::

            [
                {
                    "term": {
                        "TASKID": "none"
                    }
                },
                {
                    "term": {
                        "TYPE": "STOR"
                    }
                }
            ]

        which would return only those records that have no associated TASKID
        and were sending (storing) data.

        Args:
            start (datetime.datetime): lower bound for query (inclusive)
            end (datetime.datetime): upper bound for query (exclusive)
            must (list or None): list of dictionaries to be inserted as
                additional term-level query parameters.
            scroll (bool): Use the scrolling interface if True.  If False,
                source_filter/filter_function/flush_every/flush_function are
                ignored.
        """
        # Always work on a private copy so the module-level QUERY template is
        # never handed to downstream code that might modify it.
        query = copy.deepcopy(QUERY)
        if must:
            query['query']['constant_score']['filter']['bool']['must'] += must

        self.query_timeseries(query, start, end, scroll=scroll)

    def query_user(self, start, end, user, scroll=True):
        """Wraps query() with a user restriction

        Convenience method to constrain a query to a specific user.

        Args:
            start (datetime.datetime): lower bound for query (inclusive)
            end (datetime.datetime): upper bound for query (exclusive)
            user (str): constrain results to this username
            scroll (bool): passed through to query(); defaults to True
        """
        self.query(
            start=start,
            end=end,
            must=[{"term": {"USER": user}}],
            scroll=scroll)

    def query_type(self, start, end, xfer_type, scroll=True):
        """Wraps query() with a type restriction

        Convenience method to constrain a query to a specific transfer type.

        Args:
            start (datetime.datetime): lower bound for query (inclusive)
            end (datetime.datetime): upper bound for query (exclusive)
            xfer_type (str): constrain results to this transfer type (STOR,
                RETR, etc).  Case sensitive.
            scroll (bool): passed through to query(); defaults to True
        """
        self.query(
            start=start,
            end=end,
            must=[{"term": {"TYPE": xfer_type}}],
            scroll=scroll)

    def query_timeseries(self, query_template, start, end, scroll=True):
        """Craft and issue query that returns all overlapping records

        Args:
            query_template (dict): a query object containing range clauses
                keyed on ``start_date`` and ``end_date``; those two names are
                passed to ``es.build_timeseries_query`` as the start and end
                keys
            start (datetime.datetime): lower bound for query (inclusive)
            end (datetime.datetime): upper bound for query (exclusive)
            scroll (bool): Use the scrolling interface if True.  If False,
                source_filter/filter_function/flush_every/flush_function are
                ignored.
        """
        query = es.build_timeseries_query(
            query_template,
            start,
            end,
            start_key='start_date',
            end_key='end_date')

        if scroll:
            self.query_and_scroll(
                query=query,
                source_filter=self.source_filter,
                filter_function=self.filter_function,
                flush_every=self.flush_every,
                flush_function=self.flush_function)
        else:
            # Call the parent's query(): this class's own query() has a
            # different signature and would route back into this method.
            super(NerscGlobusLogs, self).query(query=query)

    def to_dataframe(self):
        """Converts self.scroll_pages to a DataFrame

        Delegates to the parent class, requesting the fields listed in
        ``self.source_filter``.

        Returns:
            pandas.DataFrame: Contents of the last query's pages
        """
        return super(NerscGlobusLogs, self).to_dataframe(fields=self.source_filter)