Source code for tokio.cli.darshan_scoreboard

"""
Process the Darshan daily summary generated by either summarize_darshanlogs
or index_darshanlogs tools and generate a scoreboard of top sources of I/O based
on user, file system, and/or application.
"""

import re
import sys
import json
import sqlite3
import argparse
import collections

import tokio.config

# Skeleton of the aggregate query.  query_index_db() textually splices extra
# SQL in front of the FROM and ORDER keywords, which is why the SELECT list
# deliberately ends with a trailing comma: the grouping column is appended
# there before the query is executed.
BASE_QUERY = """
SELECT
    SUM(s.bytes_read) AS readbytes,
    SUM(s.bytes_written) AS writebytes,
    COUNT(DISTINCT h.filename) AS jobcount,
FROM
    summaries AS s
INNER JOIN
    headers AS h ON h.log_id = s.log_id,
    mounts AS m ON m.fs_id = s.fs_id
ORDER BY (readbytes+writebytes) DESC
"""

# One entry per scoreboard category, in the order they are queried.
#   'col'   - expression(s) appended to the SELECT list
#   'group' - optional GROUP BY expression; 'col' is used when absent
QUERY_PARAMS = collections.OrderedDict([
    ('per_user', {'col': 'h.username'}),
    ('per_fs', {
        'col': 'm.fsname, m.mountpt',
        'group': 'm.fsname',
    }),
    ('per_exe', {'col': 'h.exename'}),
    ('per_user_exe_fs', {
        'col': 'h.username || "|" || h.exename || "|" || m.fsname AS tuple',
        'group': 'tuple',
    }),
])

# Module-wide verbosity threshold consulted by vprint(); main() overwrites it
# from the command line.
VERBOSITY = 0

def _like_clause(columns, patterns, exclude=False):
    """Build one parenthesized, parameterized LIKE filter

    Args:
        columns (sequence of str): Column expressions to compare against
        patterns (iterable of str): SQL LIKE patterns supplied by the caller
        exclude (bool): If True, build a filter that rejects rows matching
            any pattern on any column; otherwise build one that accepts rows
            matching any pattern on any column

    Returns:
        tuple: ``(clause, params)`` where ``clause`` is an SQL fragment with
        one ``?`` placeholder per (pattern, column) pair and ``params`` is the
        list of values to bind, in placeholder order
    """
    patterns = list(patterns)
    if exclude:
        operator, joiner = "NOT LIKE", " AND "
    else:
        operator, joiner = "LIKE", " OR "
    per_pattern = "(" + joiner.join(
        "%s %s ?" % (column, operator) for column in columns) + ")"
    clause = "(" + joiner.join([per_pattern] * len(patterns)) + ")"
    # placeholders are laid out pattern-major, column-minor
    params = [pattern for pattern in patterns for _ in columns]
    return clause, params


def query_index_db(db_filenames, limit_fs=None, limit_user=None, limit_exe=None,
                   exclude_fs=None, exclude_user=None, exclude_exe=None,
                   max_results=None):
    """Reduce Darshan log index by fs, user, and/or exe

    Runs one aggregate query per category in ``QUERY_PARAMS`` against every
    index database and concatenates the rows per category.  Rows coming from
    different databases are appended, not merged, and ``max_results`` is
    applied to each query individually.

    Filter values are bound as SQL parameters rather than pasted into the
    query text, so they may safely contain quote characters.  They are still
    evaluated with ``LIKE``/``NOT LIKE``, so ``%`` and ``_`` wildcards work.

    Args:
        db_filenames (iterable of str): Paths to SQLite index databases
        limit_fs (list of str or None): Only include records whose mount
            point or file system name matches any of these patterns
        limit_user (list of str or None): Only include records whose
            username matches any of these patterns
        limit_exe (list of str or None): Only include records whose
            executable name matches any of these patterns
        exclude_fs (list of str or None): Drop records whose mount point or
            file system name matches any of these patterns
        exclude_user (list of str or None): Drop records whose username
            matches any of these patterns
        exclude_exe (list of str or None): Drop records whose executable name
            matches any of these patterns
        max_results (int or None): If truthy, appended to each query as a
            ``LIMIT``

    Returns:
        collections.OrderedDict: Keyed by category name (the keys of
        ``QUERY_PARAMS``); each value is a list of row tuples
        ``(readbytes, writebytes, jobcount, <category column(s)>)``
    """
    fs_columns = ("m.mountpt", "m.fsname")
    filters = (
        (fs_columns, limit_fs, False),
        (fs_columns, exclude_fs, True),
        (("h.username",), limit_user, False),
        (("h.username",), exclude_user, True),
        (("h.exename",), limit_exe, False),
        (("h.exename",), exclude_exe, True),
    )

    where = []
    params = []
    for columns, patterns, exclude in filters:
        if not patterns:
            continue
        clause, clause_params = _like_clause(columns, patterns, exclude)
        where.append(clause)
        params.extend(clause_params)

    # The query text does not depend on the database, so build it only once
    queries = collections.OrderedDict()
    for category, config in QUERY_PARAMS.items():
        query = BASE_QUERY

        # insert the column to group by
        query = query.replace("FROM", "    %s\nFROM" % config['col'])
        query = query.replace(
            "ORDER",
            "GROUP BY %s\nORDER" % config.get('group', config['col']))

        # insert filter qualifiers
        if where:
            query = query.replace(
                "GROUP",
                "WHERE\n    " + "\n    AND ".join(where) + "\nGROUP")

        # insert max number of return items
        if max_results:
            query += "\nLIMIT %d" % max_results

        queries[category] = query

    results = collections.OrderedDict()
    for db_filename in db_filenames:
        conn = sqlite3.connect(db_filename)
        try:
            cursor = conn.cursor()
            try:
                for category, query in queries.items():
                    vprint(query, 1)
                    if params:
                        vprint("-- bound parameters: %r" % (params,), 1)
                    cursor.execute(query, params)
                    results.setdefault(category, []).extend(cursor.fetchall())
            finally:
                cursor.close()
        finally:
            # release the database handle even when a query fails
            conn.close()

    return results

def vprint(string, level):
    """Emit a message only when the global verbosity is high enough

    Args:
        string (str): Text to print
        level (int): Verbosity threshold; the message is printed when the
            module-level ``VERBOSITY`` is at least this value
    """
    if not (VERBOSITY >= level):
        return
    print(string)

def main(argv=None):
    """Command-line entry point

    Parses the command line, queries every index database given, and prints
    the outcome either as JSON or through ``print_top``.

    Args:
        argv (list of str or None): Arguments to parse; handed to
            ``ArgumentParser.parse_args`` unchanged
    """
    global VERBOSITY

    parser = argparse.ArgumentParser()
    parser.add_argument(
        "indexfile", type=str, nargs='+',
        help="path to index database created by index_darshanlogs")
    parser.add_argument(
        "--json", action='store_true',
        help="output in json format")
    parser.add_argument(
        "--max-show", type=int, default=10,
        help="show top N users, apps, file systems")

    # Each filterable dimension gets a --limit-X / --exclude-X pair that
    # cannot be combined with one another.
    filter_options = (
        ("fs",
         "only process data targeting this file system. MUST be a fully qualified path to the mount point or injected SQL",
         "exclude data targeting this file system. MUST be a fully qualified path to the mount point or injected SQL"),
        ("user",
         "only process logs generated by this user",
         "exclude logs generated by this user"),
        ("exe",
         "only process logs generated by this binary",
         "exclude logs generated by this binary"),
    )
    for suffix, limit_help, exclude_help in filter_options:
        pair = parser.add_mutually_exclusive_group()
        pair.add_argument("--limit-" + suffix, type=str, default=None,
                          help=limit_help)
        pair.add_argument("--exclude-" + suffix, type=str, default=None,
                          help=exclude_help)

    parser.add_argument(
        '-v', '--verbose', action='count', default=0,
        help="Verbosity level (default: none)")

    args = parser.parse_args(argv)
    VERBOSITY = args.verbose

    # Every filter option is a comma-separated list; a missing option turns
    # into an empty list.
    kwargs = {}
    for key in ('limit_user', 'limit_fs', 'limit_exe',
                'exclude_user', 'exclude_fs', 'exclude_exe'):
        raw_value = getattr(args, key)
        kwargs[key] = raw_value.split(',') if raw_value else []

    results = query_index_db(args.indexfile, **kwargs)

    if not args.json:
        print_top(results, max_show=args.max_show)
    else:
        print(json.dumps(results, indent=4, sort_keys=True))