"""Connect to various outputs made available by HPSS
"""

import re
import copy
import time
import datetime
from tokio.common import to_epoch
from tokio.connectors.common import SubprocessOutputDict

REX_HEADING_LINE = re.compile(r"^[= ]+$")
REX_EMPTY_LINE = re.compile(r"^\s*$")
REX_TIMEDELTA = re.compile(r"^(\d+)-(\d+):(\d+):(\d+)$")

FLOAT_KEYS = set([
    'io_gb',
    'write_gb',
    'read_gb',
    'copy_gb',
    'mig (gb)',
    'purge(gb)',
    'lock%'
])
INT_KEYS = set([
    'users',
    'ops',
    'w_ops',
    'r_ops',
    'c_ops',
    'migfiles',
    'purfiles',
    'count',
    'cleans',
    'locks',
    'mounts',
])
DELTIM_KEYS = set([
    'migtime',
    'purgetime',
    'availtime',
    'locktime',
    'mounttime',
])

REKEY_TABLES = {
    'io totals by client application': 'client',
    'io totals by client host': 'host',
    'io totals by hpss client gateway (ui) host': 'host',
    # 'largest users': 'user', # degenerate users will occur if one user uses multiple client apps
    'migration purge report': 'sc',
    # 'tape drive report': 'drivetyp',
}

class FtpLog(SubprocessOutputDict):
    """Provides an interface for log files containing HPSS FTP transactions

    This connector parses FTP logs generated by HPSS 7.3.  Older versions
    are not supported.

    HPSS FTP log files contain transfer records that look something like::

        #0  1   2  3        4    5     6 7 8 9 10 11 12 13 14 15 16
        Mon Dec 31 00:06:46 2018 dtn01-int.nersc.gov /home/o/operator/.check_ftp.25651 b POPN_Cmd r r ftp operator fd 0
        Mon Dec 31 00:06:46 2018 0.010 dtn01-int.nersc.gov 33 /home/o/opera... b o PRTR_Cmd r ftp operator fd 0
        Mon Dec 31 00:06:48 2018 0.430 sgn-pub-01.nersc.gov 0 /home/g/glock... b o RETR_Cmd r ftp wwwhpss
        Mon Feb 4 16:45:04 2019 457.800 sgn-pub-01.nersc.gov 7184842752 /home/g/glock... b o RETR_Cmd r ftp wwwhpss
        Fri Jul 12 15:32:43 2019 2.080 gert01-224.nersc.gov 2147483647 /home/n/nickb... b i PSTO_Cmd r ftp nickb fd 0
        Mon Jul 29 15:44:22 2019 0.800 dtn02.nersc.gov 464566784 /home/n/nickb... b o PRTR_Cmd r ftp nickb fd 0

    which this class deserializes and represents as a dictionary-like object
    of the form::

        {
            "ftp": [
                {
                    "bytes": 0,
                    "bytes_sec": 0.0,
                    "duration_sec": 0.43,
                    "end_timestamp": 1546243608.0,
                    "hpss_path": "/home/g/glock...",
                    "hpss_uid": "wwwhpss",
                    "opname": "HL",
                    "remote_host": "sgn-pub-01.nersc.gov",
                    "start_timestamp": 1546243607.57
                },
                ...
            ],
            "pftp": [
                {
                    "bytes": 33,
                    "bytes_sec": 3300.0,
                    "duration_sec": 0.01,
                    "end_timestamp": 1546243606.0,
                    "hpss_path": "/home/o/opera...",
                    "hpss_uid": "operator",
                    "opname": "HL",
                    "remote_host": "dtn01-int.nersc.gov",
                    "start_timestamp": 1546243605.99
                },
                ...
            ]
        }

    where the top-level keys are either "ftp" or "pftp", and their values are
    lists containing every FTP or parallel FTP transaction, respectively.
    """

    def __init__(self, *args, **kwargs):
        super(FtpLog, self).__init__(*args, **kwargs)
        self.load()

    @classmethod
    def from_str(cls, input_str):
        """Instantiate from a string
        """
        return cls(from_string=input_str)

    @classmethod
    def from_file(cls, cache_file):
        """Instantiate from a cache file
        """
        return cls(cache_file=cache_file)

    def load_str(self, input_str):
        """Parse text from an HPSS FTP log
        """
        skip_recs = 0
        for line in input_str.splitlines():
            args = line.split()
            rec = {
                'end_timestamp': time.mktime(
                    datetime.datetime.strptime(
                        " ".join(args[0:5]),
                        "%a %b %d %H:%M:%S %Y").timetuple()),
            }

            # HPSS 7.4 POPN command can be skipped
            if args[11] == "ftp" and args[8] == "POPN_Cmd":
                skip_recs += 1
                continue

            # this is the signature of HPSS 7.3 log files
            app = None
            if args[13] == "ftp" and args[11].endswith('_Cmd'):
                # STOR = serial ftp; PSTO = pftp; LFSTO = "local file command"
                # determine directionality
                if args[11] in {"STOR_Cmd", "PSTO_Cmd", "LFSTO_Cmd"}:
                    rec['opname'] = "LH"    # putting data into HPSS
                elif args[11] in {"RETR_Cmd", "PRTR_Cmd", "LFRTR_Cmd"}:
                    rec['opname'] = "HL"    # getting data from HPSS

                # determine protocol
                if args[11].startswith("P"):
                    app = "pftp"
                elif args[11].startswith('LF'):
                    # local-file commands are also issued by the pftp client
                    app = "pftp"
                else:
                    app = "ftp"

            if not app:
                skip_recs += 1
                continue

            # this is a low-resolution representation
            rec['duration_sec'] = float(args[5])
            rec['duration_sec_resolution'] = _get_ascii_resolution(args[5])
            rec['remote_host'] = args[6]
            rec['bytes'] = int(args[7])
            rec['hpss_path'] = args[8]
            rec['hpss_uid'] = args[14]
            # access_latency is unknown

            if app not in self:
                self[app] = []
            self[app].append(rec)
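
# A minimal usage sketch: parsing a single HPSS 7.3-style record lifted from
# the FtpLog docstring above.  The sample line and the "_example_ftplog" name
# are illustrative assumptions; the function is not used by the connector.
def _example_ftplog():
    sample = ("Mon Dec 31 00:06:48 2018 0.430 sgn-pub-01.nersc.gov 0 "
              "/home/g/glock... b o RETR_Cmd r ftp wwwhpss")
    log = FtpLog.from_str(sample)
    rec = log['ftp'][0]                  # a serial (non-parallel) transfer
    assert rec['opname'] == 'HL'         # RETR_Cmd reads data out of HPSS
    assert rec['bytes'] == 0
    assert rec['duration_sec'] == 0.43
    return rec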

class HsiLog(SubprocessOutputDict):
    """Provides an interface for log files containing HSI and HTAR transactions

    This connector receives input from an HSI log file which takes the form::

        Sat Aug 10 00:05:26 2019 dtn01.nersc.gov hsi 57074 31117 LH 0 0.02 543608 12356.7 4 /global/project/projectdir... /home/g/glock/... 57074
        Sat Aug 10 00:05:28 2019 cori02-224.nersc.gov htar 58888 14301 create LH 0 58178668032 397.20 146472.0 /nersc/projects/blah.tar 5 58888
        Sat Aug 10 00:05:29 2019 myuniversity.edu hsi 35136 1391 LH -1 0.03 0 0.0 0 xyz.bin /home/g/glock/xyz.bin 35136

    but uses both tabs and spaces to denote different fields.  This connector
    then presents this data in a dictionary-like form::

        {
            "hsi": [
                {
                    "access_latency_sec": 0.03,
                    "account_id": 35136,
                    "bytes": 0,
                    "bytes_sec": 0.0,
                    "client_pid": 1391,
                    "cos_id": 0,
                    "dest_path": "/home/g/glock/xyz.bin",
                    "hpss_uid": 35136,
                    "opname": "LH",
                    "remote_host": "myuniversity.edu",
                    "return_code": -1,
                    "source_path": "xyz.bin",
                    "end_timestamp": 1565420729
                },
                ...
            ],
            "htar": [
                {
                    "account_id": 58888,
                    "bytes": 58178668032,
                    "bytes_sec": 146472.0,
                    "client_pid": 14301,
                    "cos_id": 5,
                    "duration_sec": 397.2,
                    "hpss_path": "/nersc/projects/blah.tar",
                    "hpss_uid": 58888,
                    "htar_op": "create",
                    "opname": "LH",
                    "remote_ftp_host": "",
                    "remote_host": "cori02-224.nersc.gov",
                    "return_code": 0,
                    "end_timestamp": 1565420728
                }
            ]
        }

    where the top-level keys are either "hsi" or "htar", and their values are
    lists containing every HSI or HTAR transaction, respectively.

    The keys generally follow the raw nomenclature used in the HSI logs,
    which is documented on `Mike Gleicher's website
    <http://pal.mgleicher.us/HSI_Admin/log_files.html>`_.  Perhaps most
    relevant are the opnames, which can be one of

    * FU - file unlink.  Has no destination filename field or account id.
    * FR - file rename.  Has no account id.
    * LH - transfer into HPSS ("Local to HPSS")
    * HL - transfer out of HPSS ("HPSS to Local")
    * HH - internal file copy ("HPSS-to-HPSS")

    For posterity,

    - ``access_latency_sec`` is the time to open the file.  This includes the
      latency to pull the tape and insert it into the drive.
    - ``bytes`` and ``bytes_sec`` are the size and rate of data transfer
    - ``duration_sec`` is the time to complete the transfer
    - ``return_code`` is zero on success, nonzero otherwise
    """

    def __init__(self, *args, **kwargs):
        super(HsiLog, self).__init__(*args, **kwargs)
        self.load()

    @classmethod
    def from_str(cls, input_str):
        """Instantiate from a string
        """
        return cls(from_string=input_str)

    @classmethod
    def from_file(cls, cache_file):
        """Instantiate from a cache file
        """
        return cls(cache_file=cache_file)

    def load_str(self, input_str):
        """Parse an HSI log file containing HSI and HTAR transactions
        """
        bad_recs = 0
        for line in input_str.splitlines():
            # log lines use spaces and tabs (^I) to delimit different fields, e.g.,
            # Sat Aug 10 00:05:13 2019 xyz.berkeley.edu hsi 69615 1360^ILH^I-1^I0.03^I0^I0.0^I0^Is6c0.out.20190727_085506.1^I/home/g/glock/blah.20190727_085506.1^I69615
            args = re.split(pattern=' +', string=line, maxsplit=8)
            app = args[6]   # hsi or htar
            app_args = args.pop()
            args += app_args.split('\t')
            if app == 'hsi':
                rec = {
                    'client_pid': int(args[8]),
                    'opname': args[9],
                    'return_code': int(args[10]),
                    'access_latency_sec': float(args[11]),
                    'access_latency_sec_resolution': _get_ascii_resolution(args[11]),
                    'bytes': int(args[12]),
                    'bytes_sec': 1000.0 * float(args[13]),
                    'bytes_sec_resolution': 1000.0 * _get_ascii_resolution(args[13]),
                    'cos_id': int(args[14]),
                    'source_path': args[15],
                }
                # not all operations (e.g., FU) have the final two fields
                if args[9] != 'FU':
                    rec['dest_path'] = args[16]
                    if args[9] != 'FR':
                        rec['account_id'] = int(args[17])
            elif app == 'htar':
                rec = {
                    'client_pid': int(args[8]),
                    'htar_op': args[9],
                    'opname': args[10],
                    'return_code': int(args[11]),
                    'bytes': int(args[12]),
                    'duration_sec': float(args[13]),
                    'duration_sec_resolution': _get_ascii_resolution(args[13]),
                    'bytes_sec': float(args[14]),
                    'bytes_sec_resolution': _get_ascii_resolution(args[14]),
                    'hpss_path': args[15],
                    'cos_id': int(args[-2]),
                    'account_id': int(args[-1]),
                }
                # the htar column count is variable; only 19-column records
                # include the remote ftp host
                if len(args) == 19:
                    rec['remote_ftp_host'] = args[-3]
            else:
                bad_recs += 1
                continue

            rec['hpss_uid'] = int(args[7])
            rec['remote_host'] = args[5]
            rec['end_timestamp'] = to_epoch(
                datetime.datetime.strptime(
                    " ".join(args[0:5]), "%a %b %d %H:%M:%S %Y"),
                float)

            if app not in self:
                self[app] = []
            self[app].append(rec)
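
# A minimal usage sketch: parsing a single tab-delimited HSI record like the
# first line shown in the HsiLog docstring.  The sample line and the
# "_example_hsilog" name are hypothetical; the function is not used by the
# connector.
def _example_hsilog():
    sample = ("Sat Aug 10 00:05:26 2019 dtn01.nersc.gov hsi 57074 "
              "31117\tLH\t0\t0.02\t543608\t12356.7\t4\t"
              "/global/project/projectdir...\t/home/g/glock/...\t57074")
    log = HsiLog.from_str(sample)
    rec = log['hsi'][0]
    assert rec['opname'] == 'LH'                    # transfer into HPSS
    assert rec['bytes'] == 543608
    assert rec['bytes_sec'] == 12356.7 * 1000.0     # load_str scales the logged rate by 1000
    assert rec['hpss_uid'] == 57074
    return rec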

class HpssDailyReport(SubprocessOutputDict):
    """Representation for the daily report that HPSS can generate
    """

    def __init__(self, *args, **kwargs):
        super(HpssDailyReport, self).__init__(*args, **kwargs)
        self.date = None
        self.load()

    @classmethod
    def from_str(cls, input_str):
        """Instantiate from a string
        """
        return cls(from_string=input_str)

    @classmethod
    def from_file(cls, cache_file):
        """Instantiate from a cache file
        """
        return cls(cache_file=cache_file)

    def load_str(self, input_str):
        """Parse the HPSS daily report text
        """
        lines = input_str.splitlines()
        num_lines = len(lines)
        start_line = 0

        # Look for the header for the whole report to get the report date
        for start_line, line in enumerate(lines):
            if line.startswith("HPSS Report for Date"):
                self.date = datetime.datetime.strptime(line.split()[-1], "%Y-%m-%d")
                break

        if not self.date:
            raise IndexError("No report date found")

        # Try to find tables encoded in the remainder of the report
        while start_line < num_lines:
            parsed_table, finish_line = _parse_section(lines, start_line)
            if finish_line != start_line and 'records' in parsed_table:
                if parsed_table['system'] not in self:
                    self[parsed_table['system']] = {}

                # convert a list of records into a dict of indices
                if parsed_table['title'] in REKEY_TABLES:
                    parsed_table = _rekey_table(
                        parsed_table,
                        key=REKEY_TABLES[parsed_table['title']])

                self[parsed_table['system']][parsed_table['title']] = parsed_table['records']
            start_line += 1
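
# A minimal usage sketch: a tiny, synthetic daily report that exercises the
# date header and one table whose title appears in REKEY_TABLES.  The report
# text, its column alignment, and the "_example_daily_report" name are
# assumptions modeled on the _parse_section docstring below.
def _example_daily_report():
    heading = "Host" + " " * 11 + "Users" + " " + "IO_GB" + " " * 5 + "Ops"
    ruler = "=" * 15 + " " + "=" * 5 + " " + "=" * 9 + " " + "=" * 9
    row = "heart" + " " * 14 + "53" + "  " + "148740.6" + " " * 5 + "27991"
    report = HpssDailyReport.from_str("\n".join([
        "HPSS Report for Date 2019-02-22",
        "",
        "Archive : IO Totals by HPSS Client Gateway (UI) Host",
        heading,
        ruler,
        row,
    ]))
    # this table is rekeyed by host because its title is in REKEY_TABLES
    records = report['archive']['io totals by hpss client gateway (ui) host']
    assert records['heart'] == {'users': 53, 'io_gb': 148740.6, 'ops': 27991}
    return records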

def _parse_section(lines, start_line=0):
    """Parse a single table of the HPSS daily report

    Converts a table from the HPSS daily report into a dictionary.  For
    example, a table may appear as::

        Archive : IO Totals by HPSS Client Gateway (UI) Host
        Host             Users      IO_GB        Ops
        ===============  =====  =========  ========
        heart               53   148740.6     27991
        dtn11                5    29538.6      1694
        Total               58   178279.2     29685
        HPSS ACCOUNTING:         224962.6

    which will return a dict of the form::

        {
            "system": "archive",
            "title": "io totals by hpss client gateway (ui) host",
            "records": [
                {
                    "host": "heart",
                    "io_gb": "148740.6",
                    "ops": "27991",
                    "users": "53",
                },
                ...
            ]
        }

    This function is robust to invalid data, and any lines that do not appear
    to be a valid table will be treated as the end of the table.

    Args:
        lines (list of str): Text of the HPSS report
        start_line (int): Index of ``lines`` defined such that

            * ``lines[start_line]`` is the table title
            * ``lines[start_line + 1]`` is the table heading row
            * ``lines[start_line + 2]`` is the line separating the table
              heading and the first row of data
            * ``lines[start_line + 3:]`` are the rows of the table

    Returns:
        tuple: Tuple of (dict, int) where

        * dict contains the parsed contents of the table
        * int is the index of the last line of the table + 1
    """
    results = {}

    # Skip any initial whitespace
    num_lines = len(lines)
    while start_line < num_lines and REX_EMPTY_LINE.match(lines[start_line]):
        start_line += 1

    # Did we skip past the end of the input data?
    if start_line >= num_lines:
        return results, start_line

    # Parse table title (if available).  This can pick up times (0:00:00) so
    # do not treat system, title as legitimate values until we also identify
    # the line below the column headings.
    if ':' not in lines[start_line]:
        return results, start_line
    system, title = lines[start_line].split(':', 1)

    # Determine column delimiters
    separator_line = lines[start_line + 2]
    col_extents = _find_columns(separator_line)
    if len(col_extents) == 0:
        return results, start_line

    # At this point, we are reasonably confident we have found a table.
    # Populate results so that this function returns some indicator of
    # success.
    results['system'] = system.strip().lower()
    results['title'] = title.strip().lower()

    # Determine column headers
    heading_line = lines[start_line + 1]
    headings = []
    for start_pos, str_len in col_extents:
        headings.append(heading_line[start_pos:start_pos + str_len].strip())

    records = []
    index = 0
    for index, line in enumerate(lines[start_line + 3:]):
        # check for end of record (empty line)
        if REX_EMPTY_LINE.match(line):
            # an empty line denotes the end of the table
            break
        elif len(line) < (col_extents[-1][0] + col_extents[-1][1] - 1):
            # line is malformed; this happens for table summaries
            break

        record = {}
        for heading_idx, (start_pos, str_len) in enumerate(col_extents):
            col_name = headings[heading_idx].lower()
            col_val = line[start_pos:start_pos + str_len].lower().strip()
            if col_name in FLOAT_KEYS:
                record[col_name] = float(col_val)
            elif col_name in INT_KEYS:
                record[col_name] = int(col_val)
            elif col_name in DELTIM_KEYS:
                record[col_name] = col_val
                record[col_name + "secs"] = _hpss_timedelta_to_secs(col_val)
            else:
                record[col_name] = col_val
        records.append(record)

    if records:
        results['records'] = records

    return (results, index + 1)
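
# A minimal sketch of _parse_section applied directly to a tiny table.  The
# ruler/heading/row strings and the "_example_parse_section" name are
# assumptions; note that records come back as a list that still carries the
# "host" column (rekeying happens separately in _rekey_table).
def _example_parse_section():
    lines = [
        "Archive : IO Totals by HPSS Client Gateway (UI) Host",
        "Host" + " " * 11 + "Users" + " " + "IO_GB" + " " * 5 + "Ops",
        "=" * 15 + " " + "=" * 5 + " " + "=" * 9 + " " + "=" * 9,
        "heart" + " " * 14 + "53" + "  " + "148740.6" + " " * 5 + "27991",
    ]
    table, _ = _parse_section(lines, 0)
    assert table['system'] == 'archive'
    # io_gb, users, and ops are converted per FLOAT_KEYS and INT_KEYS
    assert table['records'] == [
        {'host': 'heart', 'users': 53, 'io_gb': 148740.6, 'ops': 27991}]
    return table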

def _find_columns(line, sep="=", gap=' ', strict=False):
    """Determine the column extents from a header line separator

    Takes a line separator such as the one denoted below::

        Host             Users      IO_GB
        ===============  =====  =========
        heart               53   148740.6

    and returns a list of (start offset, length) tuples that can be used to
    slice table rows into column entries.

    Args:
        line (str): Text comprised of separator characters and spaces that
            define the extents of columns
        sep (str): The character used to draw the column lines
        gap (str): The character separating ``sep`` characters
        strict (bool): If True, restrict column extents to only include
            ``sep`` characters and not the spaces that follow them.

    Returns:
        list of tuples: Each tuple is the (start offset, length) of a column
    """
    columns = []

    # if line is not comprised exclusively of separators and gaps, it is not
    # a valid heading line
    if line.replace(sep, 'X').replace(gap, 'X').strip('X') != "":
        return columns

    if strict:
        col_start = None
    else:
        col_start = 0

    for index, char in enumerate(line):
        if strict:
            # col_start == None means we are looking for the start of a new column
            if col_start is None and char == sep:
                col_start = index
            # if this is the end of an inter-column gap
            elif index > 0 and char == gap and line[index - 1] == sep:
                columns.append((col_start, index - col_start))
                col_start = None
        else:
            # if this is the end of an inter-column gap
            if index > 0 and char == gap and line[index - 1] == sep:
                columns.append((col_start, index - col_start))
                col_start = index

    if line and line[-1] == sep and col_start is not None:
        columns.append((col_start, len(line) - col_start))

    return columns
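
# A minimal sketch of the extents _find_columns returns for a short ruler, in
# both modes.  The "_example_find_columns" name is hypothetical.
def _example_find_columns():
    ruler = "===== ===== ====="
    # by default, each inter-column gap is folded into the column after it
    assert _find_columns(ruler) == [(0, 5), (5, 6), (11, 6)]
    # strict extents cover only the separator characters themselves
    assert _find_columns(ruler, strict=True) == [(0, 5), (6, 5), (12, 5)]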

def _rekey_table(table, key):
    """Converts a list of records into a dict of records

    Converts a table of records as returned by _parse_section() of the form::

        {
            "records": [
                {
                    "host": "heart",
                    "io_gb": "148740.6",
                    "ops": "27991",
                    "users": "53",
                },
                ...
            ]
        }

    into a table of key-value pairs of the form::

        {
            "records": {
                "heart": {
                    "io_gb": "148740.6",
                    "ops": "27991",
                    "users": "53",
                },
                ...
            }
        }

    Does not handle degenerate keys when re-keying, so only tables with a
    uniquely identifying key can be rekeyed.

    Args:
        table (dict): Output of the _parse_section() function
        key (str): Key to pull out of each element of table['records'] to use
            as the key for each record

    Returns:
        dict: Table with records expressed as key-value pairs instead of a
        list
    """
    new_table = copy.deepcopy(table)
    new_records = {}
    for record in new_table['records']:
        new_key = record.pop(key)
        if new_key in new_records:
            raise KeyError("Degenerate key %s=%s" % (key, new_key))
        new_records[new_key] = record

    new_table['records'] = new_records
    return new_table
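
# A minimal sketch of rekeying a list of records by their "host" field.  The
# input dict and the "_example_rekey_table" name are hypothetical.
def _example_rekey_table():
    table = {'records': [{'host': 'heart', 'users': 53},
                         {'host': 'dtn11', 'users': 5}]}
    rekeyed = _rekey_table(table, key='host')
    # the key field is popped out of each record and used as the dict key
    assert rekeyed['records'] == {'heart': {'users': 53},
                                  'dtn11': {'users': 5}}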

def _hpss_timedelta_to_secs(timedelta_str):
    """Convert an HPSS-encoded timedelta string into seconds

    Args:
        timedelta_str (str): String in the form d-HH:MM:SS where d is the
            number of days, HH is hours, MM minutes, and SS seconds

    Returns:
        int: Number of seconds represented by timedelta_str, or -1 if
        timedelta_str does not match the expected format
    """
    match = REX_TIMEDELTA.match(timedelta_str)
    if match:
        seconds = int(match.group(1)) * 86400
        seconds += int(match.group(2)) * 3600
        seconds += int(match.group(3)) * 60
        seconds += int(match.group(4))
    else:
        seconds = -1

    return seconds
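
# A minimal sketch: 1 day + 2 hours + 3 minutes + 4 seconds, plus the -1
# sentinel for unparseable input.  The "_example_timedelta" name is
# hypothetical.
def _example_timedelta():
    assert _hpss_timedelta_to_secs("1-02:03:04") == 86400 + 7200 + 180 + 4
    assert _hpss_timedelta_to_secs("garbage") == -1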

def _get_ascii_resolution(numeric_str):
    """Determines the maximum resolution of an ascii-encoded numerical value

    Necessary because HPSS logs contain numeric values at different and
    often-insufficient resolutions.  For example, tiny but finite transfers
    can show up as taking 0.000 seconds, which results in infinitely fast
    transfers when rates are calculated naively.  This function gives us a
    means to guess at what the real value might have been.

    Does not work with scientific notation.

    Args:
        numeric_str (str): An ascii-encoded integer or float

    Returns:
        float: The smallest number that can be expressed using the resolution
        provided with ``numeric_str``
    """
    if '.' in numeric_str:
        return 1.0 / 10**(float(len(numeric_str.rsplit('.', 1)[-1])))
    return 1.0
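
# A minimal sketch of resolutions inferred from decimal places: a transfer
# logged as "0.000" seconds is bounded below by 0.001 seconds, which keeps
# naive rate calculations finite.  The "_example_resolution" name is
# hypothetical.
def _example_resolution():
    assert _get_ascii_resolution("0.000") == 0.001    # three decimal places
    assert _get_ascii_resolution("457.800") == 0.001
    assert _get_ascii_resolution("12") == 1.0         # no decimal point at all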