#!/usr/bin/env python
"""
Tools to parse and index the outputs of Lustre's ``lfs`` and ``lctl`` commands
to quantify Lustre fullness and health. Assumes inputs are generated by NERSC's
Lustre health monitoring cron jobs which periodically issue the following::
echo "BEGIN $(date +%s)" >> osts.txt
/usr/bin/lfs df >> osts.txt
echo "BEGIN $(date +%s)" >> ost-map.txt
/usr/sbin/lctl dl -t >> ost-map.txt
Accepts ASCII text files, or gzip-compressed text files.
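
A minimal usage sketch follows; ``ost-map.txt`` and ``osts.txt`` are
hypothetical cache files produced by the cron jobs shown above::

    ost_map = NerscLfsOstMap(cache_file='ost-map.txt')
    failovers = ost_map.get_failovers()

    fullness = NerscLfsOstFullness(cache_file='osts.txt')
    print(fullness)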
"""
import sys
import gzip
import re
import warnings
import mimetypes
import tokio.connectors.lfshealth
from tokio.common import isstr
_REX_OST_MAP = re.compile(r'^\s*(\d+)\s+(\S+)\s+(\S+)\s+(snx\d+-\S+)\s+(\S+)\s+(\d+)\s+(\S+@\S+)\s*$')
"""Regular expression to match OSC/MDC lines
Matches output of ``lctl dl -t`` which takes the form::
351 UP osc snx11025-OST0007-osc-ffff8875ac1e7c00 3f30f170-90e6-b332-b141-a6d4a94a1829 5 10.100.100.12@o2ib1
Intentionally skips MGC, LOV, and LMV lines.
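
As an illustrative sketch (``line`` here is assumed to be one such line of
``lctl dl -t`` output), the seven capture groups could be unpacked as::

    match = _REX_OST_MAP.search(line)
    if match is not None:
        index, status, role, role_id, uuid, ref_count, nid = match.groups()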
"""

_REX_LFS_DF = re.compile(r'^\s*(snx\d+-\S+)\s+(\d+)\s+(\d+)\s+(\d+)\s+(\d+)%\s+(\S+)\[([^:]+):(\d+)\]\s*$')
"""Regular expression to extract OST fullness levels
Matches output of ``lfs df`` which takes the form::
snx11035-OST0000_UUID 90767651352 54512631228 35277748388 61% /scratch2[OST:0]
where the columns are
* OST/MDT UID
* kibibytes total
* kibibytes in use
* kibibytes available
* percent fullness
* file system mount, role, and ID
Carries the implicit assumption that all OSTs are prefixed with `snx`.
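
As an illustrative sketch (``line`` again assumed to be one such line of
``lfs df`` output), the eight capture groups could be unpacked as::

    match = _REX_LFS_DF.search(line)
    if match is not None:
        (target, total_kib, used_kib, remaining_kib,
         pct_used, mount_pt, role, target_index) = match.groups()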
"""


class NerscLfsOstMap(dict):
    """Subclass of dictionary that self-populates with Lustre OST-OSS mapping.
    """
    def __init__(self, cache_file=None):
        """Load the mapping of OSTs to OSSes.

        Args:
            cache_file (str, optional): Path to a cache file to load instead of
                issuing the ``lctl dl -t`` command
        """
        super(NerscLfsOstMap, self).__init__(self)
        self.cache_file = cache_file
        self.load_ost_map_file()

    def __repr__(self):
        """Serialize OST map into a format that resembles ``lctl dl -t``.

        Returns:
            str: Serialization of the OST to OSS mapping in a format similar to
            ``lctl dl -t``.  Fixed-width columns are

                * index: OST/MDT index
                * status: up/down status
                * role: ``osc``, ``mdc``, etc
                * role_id: name with unique identifier for target
                * uuid: UUID of target
                * ref_count: number of references to target
                * nid: LNET identifier of the target
        """
        repr_result = ""
        # Iterate over time steps ("BEGIN" lines)
        for timestamp in sorted(self.keys()):
            fs_data = self[timestamp]
            repr_result += "BEGIN %d\n" % timestamp
            repr_result += str(fs_data)
        return repr_result

    def load_ost_map_file(self):
        """Parse the cached output of an OST map generated by ``lctl dl -t``.

        Reads the input OST map as given by the ``cache_file`` attribute and
        populates self with keys of the form::

            { timestamp(int) : { file_system: { ost_name : { keys: values } } } }
        """
        _, encoding = mimetypes.guess_type(self.cache_file)
        if encoding == 'gzip':
            input_file = gzip.open(self.cache_file, 'rt')
        else:
            input_file = open(self.cache_file, 'r')

        this_timestamp = None
        degenerate_keys = 0
        load_str = []
        for line in input_file:
            if line.startswith('BEGIN'):
                if degenerate_keys > 0:
                    warnings.warn("%d degenerate keys found for timestamp %d"
                                  % (degenerate_keys, this_timestamp))
                degenerate_keys = 0

                # append previous data
                if load_str:
                    self.__setitem__(
                        this_timestamp,
                        tokio.connectors.lfshealth.LfsOstMap(from_string='\n'.join(load_str)))

                # initialize new timestamp
                this_timestamp = int(line.split()[1])
                if this_timestamp in self:
                    warnings.warn("degenerate timestamp %d found" % this_timestamp)
                load_str = []
            else:
                load_str.append(line)
        input_file.close()

        # append the data following the final BEGIN marker; without this, the
        # last (or only) snapshot in the cache file would be silently dropped
        if load_str:
            self.__setitem__(
                this_timestamp,
                tokio.connectors.lfshealth.LfsOstMap(from_string='\n'.join(load_str)))

    def save_cache(self, output_file=None):
        """Serialize object into a form resembling the output of ``lctl dl -t``.

        Args:
            output_file (str): Path to a file to which the serialized output
                should be written.  If None, print to stdout.
        """
        if output_file is None:
            self._save_cache(sys.stdout)
        else:
            with open(output_file, 'w') as output:
                self._save_cache(output)

    def _save_cache(self, output):
        """Serialize object into a form resembling the output of ``lctl dl -t``.

        Args:
            output (file): File-like object into which resulting text should be
                written.
        """
        output.write(str(self))

    def get_failovers(self):
        """Determine OSSes which are likely affected by a failover.

        Figure out the OSTs that are probably failed over and, for each time
        stamp and file system, return a list of abnormal OSSes and the expected
        number of OSTs per OSS.

        Returns:
            dict: Dictionary keyed by timestamps and whose values are dicts of
            the form::

                {
                    'mode': int,
                    'abnormal_ips': [list of str]
                }

            where ``mode`` refers to the statistical mode of OSTs per OSS, and
            ``abnormal_ips`` is a list of strings containing the IP addresses
            of OSSes whose OST counts are not equal to the ``mode`` for that
            time stamp.
        """
        resulting_data = {}
        for timestamp, fs_data in self.items():
            resulting_data[timestamp] = fs_data.get_failovers()
        return resulting_data


class NerscLfsOstFullness(dict):
    """Subclass of dictionary that self-populates with Lustre OST fullness.
    """
    def __init__(self, cache_file=None):
        """Load the fullness of OSTs.

        Args:
            cache_file (str, optional): Path to a cache file to load instead of
                issuing the ``lfs df`` command
        """
        super(NerscLfsOstFullness, self).__init__(self)
        self.cache_file = cache_file
        self.load_ost_fullness_file()

    def __repr__(self):
        """Serialize OST fullness into a format that resembles ``lfs df``.

        Returns:
            str: Serialization of the OST fullness in a format similar to
            ``lfs df``.  Columns are

                * Name of OST (e.g., snx11025-OST0001_UUID)
                * Total kibibytes on OST
                * Used kibibytes on OST
                * Available kibibytes on OST
                * Percent capacity used
                * Mount point, role, and OST ID
        """
        repr_result = ""
        for timestamp in sorted(self.keys()):
            repr_result += "BEGIN %d\n" % timestamp
            fs_data = self[timestamp]
            for target_name in sorted(fs_data.keys()):
                obd_data = fs_data[target_name]
                for obd_name in sorted(list(obd_data.keys()),
                                        key=lambda x, y=obd_data: y[x]['target_index']):
                    keyvalues = obd_data[obd_name]
                    repr_result += "%s-%s_UUID %ld %ld %ld %3d%% %s[%s:%d]\n" % (
                        target_name,
                        obd_name,
                        keyvalues['total_kib'],
                        keyvalues['used_kib'],
                        keyvalues['remaining_kib'],
                        # Note that lfs df computes its percentages from
                        # used / (used + remaining) rather than used / total
                        round(100.0 * keyvalues['used_kib'] /
                              (keyvalues['remaining_kib'] + keyvalues['used_kib'])),
                        keyvalues['mount_pt'],
                        keyvalues['role'].upper(),
                        keyvalues['target_index'])
        return repr_result

    def load_ost_fullness_file(self):
        """Parse the cached output of OST fullness generated by ``lfs df``.

        Parses a file containing concatenated outputs of ``lfs df`` separated
        by lines of the form ``BEGIN 0000``, where ``0000`` is the UNIX epoch
        time at which that output was collected.
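
        After parsing, self is populated with a structure of the form::

            { timestamp(int): { file_system: { target_name: {
                'total_kib': int,
                'used_kib': int,
                'remaining_kib': int,
                'mount_pt': str,
                'role': str,
                'target_index': int,
            } } } }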
"""
_, encoding = mimetypes.guess_type(self.cache_file)
if encoding == 'gzip':
input_file = gzip.open(self.cache_file, 'rt')
else:
input_file = open(self.cache_file, 'r')
this_timestamp = None
degenerate_keys = 0
for line in input_file:
if line.startswith('BEGIN'):
if degenerate_keys > 0:
warnings.warn("%d degenerate keys found for timestamp %d"
% (degenerate_keys, this_timestamp))
this_timestamp = int(line.split()[1])
if this_timestamp in self:
warnings.warn("degenerate timestamp %d found" % this_timestamp)
self[this_timestamp] = {}
degenerate_keys = 0
else:
match = _REX_LFS_DF.search(line)
if match is not None:
file_system, target_name = re.findall('[^-_]+', match.group(1))[0:2]
if file_system not in self[this_timestamp]:
self[this_timestamp][file_system] = {}
# Duplicates can happen if a file system is doubly mounted
if target_name in self[this_timestamp][file_system]:
degenerate_keys += 1
self[this_timestamp][file_system][target_name] = {
'total_kib': int(match.group(2)),
'used_kib': int(match.group(3)),
'remaining_kib': int(match.group(4)),
'mount_pt': match.group(6),
'role': match.group(7).lower(),
'target_index': int(match.group(8)),
}
input_file.close()

    def save_cache(self, output_file=None):
        """Serialize object into a form resembling the output of ``lfs df``.

        Args:
            output_file (str): Path to a file to which the serialized output
                should be written.  If None, print to stdout.
        """
        if output_file is None:
            self._save_cache(sys.stdout)
        else:
            with open(output_file, 'w') as output:
                self._save_cache(output)

    def _save_cache(self, output):
        """Serialize object into a form resembling the output of ``lfs df``.

        Args:
            output (file): File-like object into which resulting text should be
                written.
        """
        output.write(str(self))
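

if __name__ == "__main__":
    # Minimal demonstration sketch, not part of the library API.  The file
    # names match the caches written by the cron jobs shown in the module
    # docstring and are assumed to exist in the current working directory.
    ost_map = NerscLfsOstMap(cache_file='ost-map.txt')
    for demo_timestamp, demo_failovers in ost_map.get_failovers().items():
        print(demo_timestamp, demo_failovers)

    ost_fullness = NerscLfsOstFullness(cache_file='osts.txt')
    ost_fullness.save_cache()  # re-serialize to stdout in an lfs df-like form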