"""
Process the Darshan daily summary generated by either summarize_darshanlogs
or index_darshanlogs tools and generate a scoreboard of top sources of I/O based
on user, file system, and/or application.
"""
import re
import sys
import json
import sqlite3
import argparse
import collections
import tokio.config
# Skeleton of the aggregate query.  query_index_db() specializes this text with
# str.replace(): the per-category column list is spliced in immediately before
# the "FROM" keyword (which is why the SELECT list ends with a trailing comma),
# a GROUP BY clause is spliced in before "ORDER", and an optional WHERE clause
# is spliced in before "GROUP".  Each of those keywords must therefore appear
# exactly once in this string.
BASE_QUERY = """
SELECT
SUM(s.bytes_read) AS readbytes,
SUM(s.bytes_written) AS writebytes,
COUNT(DISTINCT h.filename) AS jobcount,
FROM
summaries AS s
INNER JOIN
headers AS h ON h.log_id = s.log_id,
mounts AS m ON m.fs_id = s.fs_id
ORDER BY (readbytes+writebytes) DESC
"""

# One entry per scoreboard category, in the order the categories are queried.
#   'col'   - SQL expression(s) appended to the SELECT list
#   'group' - expression used for GROUP BY; when absent, 'col' is used instead
QUERY_PARAMS = collections.OrderedDict([
    ('per_user', {'col': 'h.username'}),
    ('per_fs', {
        'col': 'm.fsname, m.mountpt',
        'group': 'm.fsname',
    }),
    ('per_exe', {'col': 'h.exename'}),
    ('per_user_exe_fs', {
        'col': 'h.username || "|" || h.exename || "|" || m.fsname AS tuple',
        'group': 'tuple',
    }),
])

# Module-wide verbosity threshold consulted by vprint(); main() overwrites it
# with the number of -v flags given on the command line.
VERBOSITY = 0
def _like_filter(columns, patterns, exclude=False):
    """Build one parenthesized LIKE filter using ``?`` placeholders

    Args:
        columns (sequence of str): Column expressions to match against.  These
            are spliced into the SQL verbatim and must be trusted.
        patterns (sequence of str): LIKE patterns to match.  These are never
            spliced into the SQL; they are returned as bound values.
        exclude (bool): If False, a row passes when *any* column matches *any*
            pattern.  If True, a row passes only when *no* column matches
            *any* pattern.

    Returns:
        tuple: ``(sql, values)`` where ``sql`` is the parenthesized filter
        expression and ``values`` lists the bound values in placeholder order.
    """
    if exclude:
        comparison, joiner = "%s NOT LIKE ?", " AND "
    else:
        comparison, joiner = "%s LIKE ?", " OR "

    terms = []
    values = []
    for pattern in patterns:
        term = joiner.join(comparison % column for column in columns)
        if len(columns) > 1:
            term = "(" + term + ")"
        terms.append(term)
        # one bound value per placeholder, i.e., one per column
        values.extend([pattern] * len(columns))
    return "(" + joiner.join(terms) + ")", values


def query_index_db(db_filenames,
                   limit_fs=None, limit_user=None, limit_exe=None,
                   exclude_fs=None, exclude_user=None, exclude_exe=None,
                   max_results=None):
    """Reduce Darshan log index by fs, user, and/or exe

    Runs one aggregate query per category in ``QUERY_PARAMS`` against every
    index database given.  Filter values are SQL LIKE patterns and are passed
    to SQLite as bound parameters, so they may safely contain quote characters.

    Args:
        db_filenames (list of str): Paths to SQLite index databases
        limit_fs (list of str): Only include records whose mount point or file
            system name matches at least one of these patterns
        limit_user (list of str): Only include records whose user name matches
            at least one of these patterns
        limit_exe (list of str): Only include records whose executable name
            matches at least one of these patterns
        exclude_fs (list of str): Drop records whose mount point or file system
            name matches any of these patterns
        exclude_user (list of str): Drop records whose user name matches any of
            these patterns
        exclude_exe (list of str): Drop records whose executable name matches
            any of these patterns
        max_results (int): If truthy, appended as a LIMIT to every individual
            query (i.e., applied per category and per database)

    Returns:
        collections.OrderedDict: Keyed by category name.  Each value is a list
        of row tuples ``(readbytes, writebytes, jobcount, ...)`` followed by
        the category's own column(s).  Rows from different databases are
        concatenated, not merged.
    """
    # same clause order as the filters are listed here; params must follow it
    filters = (
        (('m.mountpt', 'm.fsname'), limit_fs, False),
        (('m.mountpt', 'm.fsname'), exclude_fs, True),
        (('h.username',), limit_user, False),
        (('h.username',), exclude_user, True),
        (('h.exename',), limit_exe, False),
        (('h.exename',), exclude_exe, True),
    )
    where = []
    params = []
    for columns, patterns, exclude in filters:
        if patterns:
            clause, values = _like_filter(columns, patterns, exclude)
            where.append(clause)
            params.extend(values)

    results = collections.OrderedDict()
    for db_filename in db_filenames:
        conn = sqlite3.connect(db_filename)
        try:
            cursor = conn.cursor()
            for category, config in QUERY_PARAMS.items():
                query = BASE_QUERY

                # insert the column to group by
                query = query.replace("FROM", " %s\nFROM" % config['col'])
                query = query.replace("ORDER", "GROUP BY %s\nORDER" % config.get('group', config['col']))

                # insert filter qualifiers
                if where:
                    query = query.replace("GROUP",
                                          "WHERE\n " + "\n AND ".join(where) + "\nGROUP")

                # insert max number of return items
                if max_results:
                    query += "\nLIMIT %d" % max_results

                vprint(query, 1)
                if params:
                    # the query text only shows placeholders, so show values too
                    vprint("bound parameters: %r" % (params,), 1)

                cursor.execute(query, params)
                results.setdefault(category, []).extend(cursor.fetchall())
            cursor.close()
        finally:
            # release the database even if a query fails
            conn.close()

    return results
def print_top(categorized_data, max_show=10):
    """Print the biggest I/O {users, exes, file systems}

    Writes one ranked table per non-empty category to standard output.  Tables
    after the first are preceded by a blank line.

    Args:
        categorized_data (dict): Keyed by category name.  Each value is a list
            of rows whose first four elements are bytes read, bytes written,
            job count, and the display name.  Any of the byte counts or the
            name may be None.
        max_show (int): Maximum number of rows to print per category
    """
    names = {
        'per_fs': "File Systems",
        'per_user': "Users",
        'per_exe': "Applications",
        'per_user_exe_fs': "User/App/FS",
    }
    gibibyte = 2.0**30

    categories = 0
    for category, rankings in categorized_data.items():
        # Rank by total bytes moved; a missing (None) byte count counts as zero
        # so that it neither breaks the sort nor the unit conversion below.
        ordered = sorted(rankings,
                         key=lambda row: (row[0] or 0) + (row[1] or 0),
                         reverse=True)
        if not ordered:
            # nothing to report for this category, so print no header either
            continue

        print_buffer = ""
        if categories > 0:
            print_buffer += "\n"
        # Two spaces after the rank column mirror the ". " of the data rows so
        # the header lines up with the rows and the 75-character rule.
        print_buffer += "%2s  %40s %10s %10s %8s\n" % (
            '#', names.get(category, category), 'Read(GiB)', 'Write(GiB)', '# Jobs')
        print_buffer += '=' * 75 + "\n"

        for rank, winner in enumerate(ordered[:max(max_show, 0)], start=1):
            winner_str = winner[3]
            if not winner_str:
                # Darshan logs without POSIX/STDIO counters and without
                # filename-encoded metadata can result in null strings for
                # the name column (winner[3])
                winner_str = "UNKNOWN|UNKNOWN|UNKNOWN"
            if '|' in winner_str:
                winner_str = winner_str.replace('|', ', ')
            if len(winner_str) > 40:
                # keep both ends of an overlong name: 19 + 3 + 18 == 40 chars
                winner_str = winner_str[:19] + "..." + winner_str[-18:]
            print_buffer += "%2d. %40.40s %10.1f %10.1f %8d\n" % (
                rank,
                winner_str,
                (winner[0] or 0) / gibibyte,
                (winner[1] or 0) / gibibyte,
                winner[2])

        sys.stdout.write(print_buffer)
        categories += 1
def vprint(string, level):
    """Emit a message only when the module verbosity is high enough

    Args:
        string (str): Text to print
        level (int): Lowest value of ``VERBOSITY`` at which the text is printed
    """
    if VERBOSITY < level:
        return
    print(string)
def main(argv=None):
    """Command-line entry point

    Parses the arguments, stores the requested verbosity in the module-level
    ``VERBOSITY``, queries every index database given, and prints the result
    either as JSON or as ranked tables.

    Args:
        argv (list of str): Arguments to parse; passed straight to
            ``ArgumentParser.parse_args`` (None means use the process's
            command line)
    """
    global VERBOSITY

    def split_csv(value):
        """Turn a comma-separated option value into a list ([] if unset)"""
        if not value:
            return []
        return value.split(',')

    parser = argparse.ArgumentParser()
    parser.add_argument("indexfile", type=str, nargs='+',
                        help="path to index database created by index_darshanlogs")
    parser.add_argument("--json", action='store_true',
                        help="output in json format")
    parser.add_argument("--max-show", type=int, default=10,
                        help="show top N users, apps, file systems")

    # each dimension gets a --limit-X/--exclude-X pair that cannot be combined
    exclusive_pairs = (
        ("fs",
         "only process data targeting this file system. MUST be a fully qualified path to the mount point or injected SQL",
         "exclude data targeting this file system. MUST be a fully qualified path to the mount point or injected SQL"),
        ("user",
         "only process logs generated by this user",
         "exclude logs generated by this user"),
        ("exe",
         "only process logs generated by this binary",
         "exclude logs generated by this binary"),
    )
    for dimension, limit_help, exclude_help in exclusive_pairs:
        pair = parser.add_mutually_exclusive_group()
        pair.add_argument("--limit-" + dimension, type=str, default=None,
                          help=limit_help)
        pair.add_argument("--exclude-" + dimension, type=str, default=None,
                          help=exclude_help)

    parser.add_argument('-v', '--verbose', action='count', default=0, help="Verbosity level (default: none)")

    args = parser.parse_args(argv)
    VERBOSITY = args.verbose

    filter_names = ('limit_user', 'limit_fs', 'limit_exe',
                    'exclude_user', 'exclude_fs', 'exclude_exe')
    kwargs = {name: split_csv(getattr(args, name)) for name in filter_names}

    results = query_index_db(args.indexfile, **kwargs)

    if not args.json:
        print_top(results, max_show=args.max_show)
        return
    print(json.dumps(results, indent=4, sort_keys=True))