# Source code for tokio.cli.cache_nersc_globuslogs

"""Command-line interface into the nersc_globuslogs connector
"""

import sys
import gzip
import json
import time
import datetime
import argparse
import warnings
import mimetypes

import tokio.debug
import tokio.connectors.nersc_globuslogs

# Timestamp format required for the query_start/query_end CLI arguments,
# e.g. "2019-01-01T00:00:00" (parsed with datetime.strptime)
DATE_FMT = "%Y-%m-%dT%H:%M:%S"

def main(argv=None):
    """Entry point for the CLI interface.

    Parses command-line options, loads NERSC Globus transfer logs either by
    querying an ElasticSearch endpoint directly or from a previously cached
    query result, then writes the results as a JSON cache or CSV.

    Args:
        argv (list of str or None): Command-line arguments to parse; if
            None, ``sys.argv[1:]`` is used (argparse default).

    Raises:
        ValueError: If ``query_start``/``query_end`` do not match
            ``DATE_FMT``, or if ``query_start`` is not strictly before
            ``query_end``.
    """
    warnings.simplefilter('always', UserWarning)  # one warning per invalid file

    # Parse CLI options.  add_help=False frees up -h to mean --host below.
    parser = argparse.ArgumentParser(add_help=False)
    parser.add_argument("query_start", type=str,
                        help="start time of query in %s format" % DATE_FMT)
    parser.add_argument("query_end", type=str,
                        help="end time of query in %s format" % DATE_FMT)
    parser.add_argument('--debug', action='store_true',
                        help="produce debug messages")
    parser.add_argument('--timeout', type=int, default=30,
                        help='ElasticSearch timeout time (default: 30)')
    parser.add_argument('--input', type=str, default=None,
                        help="use cached output from previous ES query")
    parser.add_argument("-o", "--output", type=str, default=None,
                        help="output file")
    parser.add_argument('-h', '--host', type=str, default="localhost",
                        help="hostname of ElasticSearch endpoint (default: localhost)")
    parser.add_argument('-p', '--port', type=int, default=9200,
                        help="port of ElasticSearch endpoint (default: 9200)")
    parser.add_argument('-i', '--index', type=str, default='dtn-dtn-log*',
                        help='ElasticSearch index to query (default:dtn-dtn-log*)')
    parser.add_argument("-c", "--csv", action="store_true",
                        help="return output in CSV format")
    parser.add_argument('--user', type=str, default=None,
                        help='limit results to transfers owned by this user')
    parser.add_argument('--type', type=str, default=None,
                        help='limit results to either STOR, RETR, or NLST')
    args = parser.parse_args(argv)

    if args.debug:
        tokio.DEBUG = True

    # Convert CLI options into datetime objects; report the expected format
    # on stderr before re-raising so the CLI user sees a helpful message.
    try:
        query_start = datetime.datetime.strptime(args.query_start, DATE_FMT)
        query_end = datetime.datetime.strptime(args.query_end, DATE_FMT)
    except ValueError:
        sys.stderr.write("Start and end times must be in format %s\n" % DATE_FMT)
        raise

    # Basic input bounds checking.  ValueError is more precise than the
    # generic Exception previously raised here and remains backward-
    # compatible with callers that catch Exception.
    if query_start >= query_end:
        raise ValueError("query_start (%s) must be earlier than query_end (%s)"
                         % (args.query_start, args.query_end))

    # Read input from a cached json file (generated previously) or by
    # querying ElasticSearch directly
    must = []
    if args.input is None:
        # Try to connect
        esdb = tokio.connectors.nersc_globuslogs.NerscGlobusLogs(
            host=args.host,
            port=args.port,
            index=args.index,
            timeout=args.timeout)
        if args.user:
            must.append({"term": {"USER": args.user}})
        if args.type:
            must.append({"term": {"TYPE": args.type.upper()}})
        tokio.debug.debug_print("Loaded results from %s:%s" % (args.host, args.port))
    else:
        # NOTE(review): --user/--type filters are built only in the live
        # query branch above, so they are silently ignored when replaying
        # a cached --input file — confirm whether that is intentional.
        esdb = tokio.connectors.nersc_globuslogs.NerscGlobusLogs.from_cache(args.input)
        tokio.debug.debug_print("Loaded results from %s" % args.input)

    esdb.query(query_start, query_end, must=must)

    # Write output
    cache_file = args.output
    if cache_file is not None:
        print("Caching to %s" % cache_file)

    if args.csv:
        if cache_file is None:
            print(esdb.to_dataframe().to_csv())
        else:
            esdb.to_dataframe().to_csv(cache_file)
    else:
        # save_cache handles cache_file=None by writing to stdout or its
        # own default; behavior unchanged from the original.
        esdb.save_cache(cache_file)