Source code for tokio.connectors.nersc_lfsstate

#!/usr/bin/env python
"""
Tools to parse and index the outputs of Lustre's ``lfs`` and ``lctl`` commands
to quantify Lustre fullness and health.  Assumes inputs are generated by NERSC's
Lustre health monitoring cron jobs which periodically issue the following::

    echo "BEGIN $(date +%s)" >> osts.txt
    /usr/bin/lfs df >> osts.txt

    echo "BEGIN $(date +%s)" >> ost-map.txt
    /usr/sbin/lctl dl -t >> ost-map.txt

Accepts either ASCII text files or gzip-compressed text files.
"""

import sys
import gzip
import re
import warnings
import mimetypes
import tokio.connectors.lfshealth

_REX_OST_MAP = re.compile(r'^\s*(\d+)\s+(\S+)\s+(\S+)\s+(snx\d+-\S+)\s+(\S+)\s+(\d+)\s+(\S+@\S+)\s*$')
"""Regular expression to match OSC/MDC lines

Matches output of ``lctl dl -t`` which takes the form::

    351 UP osc snx11025-OST0007-osc-ffff8875ac1e7c00 3f30f170-90e6-b332-b141-a6d4a94a1829 5 10.100.100.12@o2ib1

Intentionally skips MGC, LOV, and LMV lines.
"""

_REX_LFS_DF = re.compile(r'^\s*(snx\d+-\S+)\s+(\d+)\s+(\d+)\s+(\d+)\s+(\d+).\s+(\S+)\[([^:]+):(\d+)\]\s*$')
"""Regular expression to extract OST fullness levels

Matches output of ``lfs df`` which takes the form::

    snx11035-OST0000_UUID 90767651352 54512631228 35277748388  61% /scratch2[OST:0]

where the columns are

    * OST/MDT UUID
    * kibibytes total
    * kibibytes in use
    * kibibytes available
    * percent fullness
    * file system mount, role, and ID

Carries the implicit assumption that all OSTs are prefixed with ``snx``.
"""


class NerscLfsOstMap(dict):
    """Subclass of dictionary that self-populates with Lustre OST-OSS mapping.
    """

    def __init__(self, cache_file=None):
        """Load the mapping of OSTs to OSSes.

        Args:
            cache_file (str, optional): Path to a cache file to load instead
                of issuing the ``lctl dl -t`` command
        """
        super(NerscLfsOstMap, self).__init__()
        self.cache_file = cache_file
        self.load_ost_map_file()

    def __repr__(self):
        """Serialize OST map into a format that resembles ``lctl dl -t``.

        Returns:
            str: Serialization of the OST to OSS mapping in a format similar
            to ``lctl dl -t``.  Fixed-width columns are

                * index: OST/MDT index
                * status: up/down status
                * role: ``osc``, ``mdc``, etc.
                * role_id: name with unique identifier for target
                * uuid: UUID of target
                * ref_count: number of references to target
                * nid: LNET identifier of the target
        """
        repr_result = ""
        # Iterate over time steps ("BEGIN" lines)
        for timestamp in sorted(self.keys()):
            fs_data = self[timestamp]
            repr_result += "BEGIN %d\n" % timestamp
            repr_result += str(fs_data)
        return repr_result

    def load_ost_map_file(self):
        """Parse the cached output of an OST map generated by ``lctl dl -t``.

        Reads the input OST map as given by the ``cache_file`` attribute and
        populates self with keys of the form::

            {
                timestamp(int): {
                    file_system: {
                        ost_name: {
                            keys: values
                        }
                    }
                }
            }
        """
        _, encoding = mimetypes.guess_type(self.cache_file)
        if encoding == 'gzip':
            input_file = gzip.open(self.cache_file, 'rt')
        else:
            input_file = open(self.cache_file, 'r')

        this_timestamp = None
        degenerate_keys = 0
        load_str = []
        for line in input_file:
            if line.startswith('BEGIN'):
                if degenerate_keys > 0:
                    warnings.warn("%d degenerate keys found for timestamp %d"
                                  % (degenerate_keys, this_timestamp))
                degenerate_keys = 0

                # commit the previous timestamp's block of text
                if load_str:
                    self[this_timestamp] = tokio.connectors.lfshealth.LfsOstMap(
                        from_string='\n'.join(load_str))

                # initialize new timestamp
                this_timestamp = int(line.split()[1])
                if this_timestamp in self:
                    warnings.warn("degenerate timestamp %d found" % this_timestamp)
                load_str = []
            else:
                load_str.append(line)
        input_file.close()

        # commit the final timestamp's block, which has no subsequent BEGIN
        # line to trigger the commit above
        if load_str:
            self[this_timestamp] = tokio.connectors.lfshealth.LfsOstMap(
                from_string='\n'.join(load_str))

    def save_cache(self, output_file=None):
        """Serialize object into a form resembling the output of ``lctl dl -t``.

        Args:
            output_file (str): Path to a file to which the serialized output
                should be written.  If None, print to stdout.
        """
        if output_file is None:
            self._save_cache(sys.stdout)
        else:
            with open(output_file, 'w') as output:
                self._save_cache(output)

    def _save_cache(self, output):
        """Serialize object into a form resembling the output of ``lctl dl -t``.

        Args:
            output (file): File-like object into which resulting text should
                be written.
        """
        output.write(str(self))

    def get_failovers(self):
        """Determine OSSes which are likely affected by a failover.

        Figure out the OSTs that are probably failed over and, for each time
        stamp and file system, return a list of abnormal OSSes and the
        expected number of OSTs per OSS.

        Returns:
            dict: Dictionary keyed by timestamps and whose values are dicts
            of the form::

                {
                    'mode': int,
                    'abnormal_ips': [list of str]
                }

            where ``mode`` refers to the statistical mode of OSTs per OSS,
            and ``abnormal_ips`` is a list of strings containing the IP
            addresses of OSSes whose OST counts are not equal to the ``mode``
            for that time stamp.
        """
        resulting_data = {}
        for timestamp, fs_data in self.items():
            resulting_data[timestamp] = fs_data.get_failovers()
        return resulting_data
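
# A minimal usage sketch for NerscLfsOstMap ('ost-map.txt' is a hypothetical
# cache file produced by the ``lctl dl -t`` cron job shown in the module
# docstring):
#
#     ost_map = NerscLfsOstMap(cache_file='ost-map.txt')
#     failovers = ost_map.get_failovers()  # keyed by timestamp; see docstring
#     for timestamp, info in failovers.items():
#         print(timestamp, info)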


class NerscLfsOstFullness(dict):
    """Subclass of dictionary that self-populates with Lustre OST fullness.
    """

    def __init__(self, cache_file=None):
        """Load the fullness of OSTs.

        Args:
            cache_file (str, optional): Path to a cache file to load instead
                of issuing the ``lfs df`` command
        """
        super(NerscLfsOstFullness, self).__init__()
        self.cache_file = cache_file
        self.load_ost_fullness_file()

    def __repr__(self):
        """Serialize OST fullness into a format that resembles ``lfs df``.

        Returns:
            str: Serialization of the OST fullness in a format similar to
            ``lfs df``.  Columns are

                * Name of OST (e.g., snx11025-OST0001_UUID)
                * Total kibibytes on OST
                * Used kibibytes on OST
                * Available kibibytes on OST
                * Percent capacity used
                * Mount point, role, and OST ID
        """
        repr_result = ""
        for timestamp in sorted(self.keys()):
            repr_result += "BEGIN %d\n" % timestamp
            fs_data = self[timestamp]
            for target_name in sorted(fs_data.keys()):
                obd_data = fs_data[target_name]
                for obd_name in sorted(list(obd_data.keys()),
                                       key=lambda x, y=obd_data: y[x]['target_index']):
                    keyvalues = obd_data[obd_name]
                    repr_result += "%s-%s_UUID %ld %ld %ld %3d%% %s[%s:%d]\n" % (
                        target_name,
                        obd_name,
                        keyvalues['total_kib'],
                        keyvalues['used_kib'],
                        keyvalues['remaining_kib'],
                        # Note that lfs df's percents are not divided by
                        # total_kib, but rather by the sum of used and
                        # remaining kibibytes.
                        round(100.0 * keyvalues['used_kib']
                              / (keyvalues['remaining_kib'] + keyvalues['used_kib'])),
                        keyvalues['mount_pt'],
                        keyvalues['role'].upper(),
                        keyvalues['target_index'],
                    )
        return repr_result

    def load_ost_fullness_file(self):
        """Parse the cached output of OST fullness generated by ``lfs df``.

        Parses the output of a file containing concatenated outputs of
        ``lfs df`` separated by lines of the form ``BEGIN 0000``, where
        ``0000`` is the UNIX epoch time.
        """
        _, encoding = mimetypes.guess_type(self.cache_file)
        if encoding == 'gzip':
            input_file = gzip.open(self.cache_file, 'rt')
        else:
            input_file = open(self.cache_file, 'r')

        this_timestamp = None
        degenerate_keys = 0
        for line in input_file:
            if line.startswith('BEGIN'):
                if degenerate_keys > 0:
                    warnings.warn("%d degenerate keys found for timestamp %d"
                                  % (degenerate_keys, this_timestamp))
                this_timestamp = int(line.split()[1])
                if this_timestamp in self:
                    warnings.warn("degenerate timestamp %d found" % this_timestamp)
                self[this_timestamp] = {}
                degenerate_keys = 0
            else:
                match = _REX_LFS_DF.search(line)
                if match is not None:
                    file_system, target_name = re.findall('[^-_]+', match.group(1))[0:2]

                    if file_system not in self[this_timestamp]:
                        self[this_timestamp][file_system] = {}

                    # Duplicates can happen if a file system is doubly mounted
                    if target_name in self[this_timestamp][file_system]:
                        degenerate_keys += 1

                    self[this_timestamp][file_system][target_name] = {
                        'total_kib': int(match.group(2)),
                        'used_kib': int(match.group(3)),
                        'remaining_kib': int(match.group(4)),
                        'mount_pt': match.group(6),
                        'role': match.group(7).lower(),
                        'target_index': int(match.group(8)),
                    }
        input_file.close()

    def save_cache(self, output_file=None):
        """Serialize object into a form resembling the output of ``lfs df``.

        Args:
            output_file (str): Path to a file to which the serialized output
                should be written.  If None, print to stdout.
        """
        if output_file is None:
            self._save_cache(sys.stdout)
        else:
            with open(output_file, 'w') as output:
                self._save_cache(output)

    def _save_cache(self, output):
        """Serialize object into a form resembling the output of ``lfs df``.

        Args:
            output (file): File-like object into which resulting text should
                be written.
        """
        output.write(str(self))
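
# A minimal usage sketch for NerscLfsOstFullness ('osts.txt' is a hypothetical
# cache file produced by the ``lfs df`` cron job shown in the module
# docstring); the keys below follow load_ost_fullness_file(), and the percent
# calculation mirrors the one in __repr__:
#
#     fullness = NerscLfsOstFullness(cache_file='osts.txt')
#     for timestamp in sorted(fullness):
#         for file_system, targets in fullness[timestamp].items():
#             for target, info in targets.items():
#                 pct_full = 100.0 * info['used_kib'] / (
#                     info['used_kib'] + info['remaining_kib'])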