#!/usr/bin/env python
"""
Provide a TOKIO-aware HDF5 class that knows how to interpret schema versions
encoded in a TOKIO HDF5 file and translate a universal schema into file-specific
schemas. Also supports dynamically mapping static HDF5 datasets into new
derived datasets.
"""
import math
import time
import datetime
import warnings
import h5py
import numpy
import pandas
import tokio.common
import tokio.timeseries
from tokio.connectors._hdf5 import (convert_counts_rates, #pylint: disable=unused-import
map_dataset,
demux_column,
get_timestamps,
get_timestamps_key,
reduce_dataset_name,
DEFAULT_TIMESTAMP_DATASET,
TIMESTAMP_KEY,
COLUMN_NAME_KEY)
SCHEMA = {
None: {},
"1": {
"datatargets/readbytes": "/datatargets/readbytes",
"datatargets/writebytes": "/datatargets/writebytes",
"datatargets/readrates": "/datatargets/readrates",
"datatargets/writerates": "/datatargets/writerates",
"datatargets/readops": "/datatargets/readops",
"datatargets/writeops": "/datatargets/writeops",
"datatargets/readoprates": "/datatargets/readoprates",
"datatargets/writeoprates": "/datatargets/writeoprates",
"mdtargets/readbytes": "/mdtargets/readbytes",
"mdtargets/writebytes": "/mdtargets/writebytes",
"mdtargets/readrates": "/mdtargets/readrates",
"mdtargets/writerates": "/mdtargets/writerates",
"mdtargets/readops": "/mdtargets/readops",
"mdtargets/writeops": "/mdtargets/writeops",
"mdtargets/readoprates": "/mdtargets/readoprates",
"mdtargets/writeoprates": "/mdtargets/writeoprates",
"mdtargets/opens": "/mdtargets/opens",
"mdtargets/openrates": "/mdtargets/openrates",
"mdtargets/closes": "/mdtargets/closes",
"mdtargets/closerates": "/mdtargets/closerates",
"mdtargets/mknods": "/mdtargets/mknods",
"mdtargets/mknodrates": "/mdtargets/mknodrates",
"mdtargets/links": "/mdtargets/links",
"mdtargets/linkrates": "/mdtargets/linkrates",
"mdtargets/unlinks": "/mdtargets/unlinks",
"mdtargets/unlinkrates": "/mdtargets/unlinkrates",
"mdtargets/mkdirs": "/mdtargets/mkdirs",
"mdtargets/mkdirrates": "/mdtargets/mkdirrates",
"mdtargets/rmdirs": "/mdtargets/rmdirs",
"mdtargets/rmdirrates": "/mdtargets/rmdirrates",
"mdtargets/renames": "/mdtargets/renames",
"mdtargets/renamerates": "/mdtargets/renamerates",
"mdtargets/getxattrs": "/mdtargets/getxattrs",
"mdtargets/getxattrrates": "/mdtargets/getxattrrates",
"mdtargets/statfss": "/mdtargets/statfss",
"mdtargets/statfsrates": "/mdtargets/statfsrates",
"mdtargets/setattrs": "/mdtargets/setattrs",
"mdtargets/setattrrates": "/mdtargets/setattrrates",
"mdtargets/getattrs": "/mdtargets/getattrs",
"mdtargets/getattrrates": "/mdtargets/getattrrates",
"mdservers/cpuuser": "/mdservers/cpuuser",
"mdservers/cpusys": "/mdservers/cpusys",
"mdservers/cpuidle": "/mdservers/cpuidle",
"mdservers/cpuload": "/mdservers/cpuload",
"mdservers/memfree": "/mdservers/memfree",
"mdservers/memused": "/mdservers/memused",
"mdservers/memcached": "/mdservers/memcached",
"mdservers/membuffered": "/mdservers/membuffered",
"mdservers/memslab": "/mdservers/memslab",
"mdservers/memslab_unrecl": "/mdservers/memslab_unrecl",
"mdservers/memtotal": "/mdservers/memtotal",
"mdservers/netinbytes": "/mdservers/netinbytes",
"mdservers/netoutbytes": "/mdservers/netoutbytes",
"dataservers/cpuuser": "/dataservers/cpuuser",
"dataservers/cpusys": "/dataservers/cpusys",
"dataservers/cpuidle": "/dataservers/cpuidle",
"dataservers/cpuload": "/dataservers/cpuload",
"dataservers/memfree": "/dataservers/memfree",
"dataservers/memused": "/dataservers/memused",
"dataservers/memcached": "/dataservers/memcached",
"dataservers/membuffered": "/dataservers/membuffered",
"dataservers/memslab": "/dataservers/memslab",
"dataservers/memslab_unrecl": "/dataservers/memslab_unrecl",
"dataservers/memtotal": "/dataservers/memtotal",
"dataservers/netinbytes": "/dataservers/netinbytes",
"dataservers/netoutbytes": "/dataservers/netoutbytes",
"fullness/bytes": "/fullness/bytes",
"fullness/bytestotal": "/fullness/bytestotal",
"fullness/inodes": "/fullness/inodes",
"fullness/inodestotal": "/fullness/inodestotal",
"failover/datatargets": "/failover/datatargets",
"failover/mdtargets": "/failover/mdtargets",
},
}
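# For illustration: with a version-1 file, a logical key resolves to a literal
# dataset path via a simple lookup, e.g.
#
#     SCHEMA["1"]["datatargets/readbytes"]  # -> "/datatargets/readbytes"
#
# whereas version-None (H5LMT) files define no direct mappings and rely
# entirely on the provider table below.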
# Map logical dataset names that do not exist as datasets in the underlying
# HDF5 file, but which can be calculated from datasets that _do_ exist, to the
# functions that perform those conversions.  This table is only consulted when
# a dataset is not found directly in the underlying HDF5 _and_ a mapping from
# the SCHEMA table above does not return a match, so this table contains what
# appear to be circular references (e.g., both datatargets/readbytes and
# datatargets/readrates).  This allows any valid HDF5 file to contain either
# bytes or rates but present the same logical datasets to the downstream
# application.
SCHEMA_DATASET_PROVIDERS = {
None: {
"datatargets/readbytes": {
'func': convert_counts_rates,
'args': {
'from_key': 'OSTReadGroup/OSTBulkReadDataSet',
'to_rates': False,
'transpose': True,
},
},
"datatargets/writebytes": {
'func': convert_counts_rates,
'args': {
'from_key': 'OSTWriteGroup/OSTBulkWriteDataSet',
'to_rates': False,
'transpose': True,
},
},
"datatargets/readrates": {
'func': map_dataset,
'args': {
'from_key': "/OSTReadGroup/OSTBulkReadDataSet",
'transpose': True,
},
},
"datatargets/writerates": {
'func': map_dataset,
'args': {
'from_key': "/OSTWriteGroup/OSTBulkWriteDataSet",
'transpose': True,
},
},
"dataservers/cpuload": {
'func': map_dataset,
'args': {
'from_key': "/OSSCPUGroup/OSSCPUDataSet",
'transpose': True,
},
},
"mdservers/cpuload": {
'func': map_dataset,
'args': {
'from_key': "/MDSCPUGroup/MDSCPUDataSet",
'transpose': True,
'force2d': True,
},
},
### MDSOpsGroup, as counts per timestep
"mdtargets/opens": {
'func': demux_column,
'args': {
'from_key': "/MDSOpsGroup/MDSOpsDataSet",
'column': 'open',
'apply_timestep_func': lambda x, timestep: x * timestep,
'transpose': True
},
},
"mdtargets/closes": {
'func': demux_column,
'args': {
'from_key': '/MDSOpsGroup/MDSOpsDataSet',
'column': 'close',
'apply_timestep_func': lambda x, timestep: x * timestep,
'transpose': True,
},
},
"mdtargets/mknods": {
'func': demux_column,
'args': {
'from_key': '/MDSOpsGroup/MDSOpsDataSet',
'column': 'mknod',
'apply_timestep_func': lambda x, timestep: x * timestep,
'transpose': True,
},
},
"mdtargets/links": {
'func': demux_column,
'args': {
'from_key': '/MDSOpsGroup/MDSOpsDataSet',
'column': 'link',
'apply_timestep_func': lambda x, timestep: x * timestep,
'transpose': True,
},
},
"mdtargets/unlinks": {
'func': demux_column,
'args': {
'from_key': '/MDSOpsGroup/MDSOpsDataSet',
'column': 'unlink',
'apply_timestep_func': lambda x, timestep: x * timestep,
'transpose': True,
},
},
"mdtargets/mkdirs": {
'func': demux_column,
'args': {
'from_key': '/MDSOpsGroup/MDSOpsDataSet',
'column': 'mkdir',
'apply_timestep_func': lambda x, timestep: x * timestep,
'transpose': True,
},
},
"mdtargets/rmdirs": {
'func': demux_column,
'args': {
'from_key': '/MDSOpsGroup/MDSOpsDataSet',
'column': 'rmdir',
'apply_timestep_func': lambda x, timestep: x * timestep,
'transpose': True,
},
},
"mdtargets/renames": {
'func': demux_column,
'args': {
'from_key': '/MDSOpsGroup/MDSOpsDataSet',
'column': 'rename',
'apply_timestep_func': lambda x, timestep: x * timestep,
'transpose': True,
},
},
"mdtargets/getxattrs": {
'func': demux_column,
'args': {
'from_key': '/MDSOpsGroup/MDSOpsDataSet',
'column': 'getxattr',
'apply_timestep_func': lambda x, timestep: x * timestep,
'transpose': True,
},
},
"mdtargets/statfss": {
'func': demux_column,
'args': {
'from_key': '/MDSOpsGroup/MDSOpsDataSet',
'column': 'statfs',
'apply_timestep_func': lambda x, timestep: x * timestep,
'transpose': True,
},
},
"mdtargets/setattrs": {
'func': demux_column,
'args': {
'from_key': '/MDSOpsGroup/MDSOpsDataSet',
'column': 'setattr',
'apply_timestep_func': lambda x, timestep: x * timestep,
'transpose': True,
},
},
"mdtargets/getattrs": {
'func': demux_column,
'args': {
'from_key': '/MDSOpsGroup/MDSOpsDataSet',
'column': 'getattr',
'apply_timestep_func': lambda x, timestep: x * timestep,
'transpose': True,
},
},
### MDSOpsGroup, as counts per second
"mdtargets/openrates": {
'func': demux_column,
'args': {
'from_key': "/MDSOpsGroup/MDSOpsDataSet",
'column': 'open',
'transpose': True
},
},
"mdtargets/closerates": {
'func': demux_column,
'args': {
'from_key': '/MDSOpsGroup/MDSOpsDataSet',
'column': 'close',
'transpose': True,
},
},
"mdtargets/mknodrates": {
'func': demux_column,
'args': {
'from_key': '/MDSOpsGroup/MDSOpsDataSet',
'column': 'mknod',
'transpose': True,
},
},
"mdtargets/linkrates": {
'func': demux_column,
'args': {
'from_key': '/MDSOpsGroup/MDSOpsDataSet',
'column': 'link',
'transpose': True,
},
},
"mdtargets/unlinkrates": {
'func': demux_column,
'args': {
'from_key': '/MDSOpsGroup/MDSOpsDataSet',
'column': 'unlink',
'transpose': True,
},
},
"mdtargets/mkdirrates": {
'func': demux_column,
'args': {
'from_key': '/MDSOpsGroup/MDSOpsDataSet',
'column': 'mkdir',
'transpose': True,
},
},
"mdtargets/rmdirrates": {
'func': demux_column,
'args': {
'from_key': '/MDSOpsGroup/MDSOpsDataSet',
'column': 'rmdir',
'transpose': True,
},
},
"mdtargets/renamerates": {
'func': demux_column,
'args': {
'from_key': '/MDSOpsGroup/MDSOpsDataSet',
'column': 'rename',
'transpose': True,
},
},
"mdtargets/getxattrrates": {
'func': demux_column,
'args': {
'from_key': '/MDSOpsGroup/MDSOpsDataSet',
'column': 'getxattr',
'transpose': True,
},
},
"mdtargets/statfsrates": {
'func': demux_column,
'args': {
'from_key': '/MDSOpsGroup/MDSOpsDataSet',
'column': 'statfs',
'transpose': True,
},
},
"mdtargets/setattrrates": {
'func': demux_column,
'args': {
'from_key': '/MDSOpsGroup/MDSOpsDataSet',
'column': 'setattr',
'transpose': True,
},
},
"mdtargets/getattrrates": {
'func': demux_column,
'args': {
'from_key': '/MDSOpsGroup/MDSOpsDataSet',
'column': 'getattr',
'transpose': True,
},
},
},
"1": {
"datatargets/readbytes": {
'func': convert_counts_rates,
'args': {
'from_key': 'datatargets/readrates',
'to_rates': False,
},
},
"datatargets/writebytes": {
'func': convert_counts_rates,
'args': {
'from_key': 'datatargets/writerates',
'to_rates': False,
},
},
"datatargets/readrates": {
'func': convert_counts_rates,
'args': {
'from_key': 'datatargets/readbytes',
'to_rates': True,
},
},
"datatargets/writerates": {
'func': convert_counts_rates,
'args': {
'from_key': 'datatargets/writebytes',
'to_rates': True,
},
},
"mdtargets/opens": {
'func': convert_counts_rates,
'args': {
'from_key': "mdtargets/openrates",
'to_rates': False,
},
},
"mdtargets/closes": {
'func': convert_counts_rates,
'args': {
'from_key': "mdtargets/closerates",
'to_rates': False,
},
},
"mdtargets/mknods": {
'func': convert_counts_rates,
'args': {
'from_key': "mdtargets/mknodrates",
'to_rates': False,
},
},
"mdtargets/links": {
'func': convert_counts_rates,
'args': {
'from_key': "mdtargets/linkrates",
'to_rates': False,
},
},
"mdtargets/unlinks": {
'func': convert_counts_rates,
'args': {
'from_key': "mdtargets/unlinkrates",
'to_rates': False,
},
},
"mdtargets/mkdirs": {
'func': convert_counts_rates,
'args': {
'from_key': "mdtargets/mkdirrates",
'to_rates': False,
},
},
"mdtargets/rmdirs": {
'func': convert_counts_rates,
'args': {
'from_key': "mdtargets/rmdirrates",
'to_rates': False,
},
},
"mdtargets/renames": {
'func': convert_counts_rates,
'args': {
'from_key': "mdtargets/renamerates",
'to_rates': False,
},
},
"mdtargets/getxattrs": {
'func': convert_counts_rates,
'args': {
'from_key': "mdtargets/getxattrrates",
'to_rates': False,
},
},
"mdtargets/statfss": {
'func': convert_counts_rates,
'args': {
'from_key': "mdtargets/statfsrates",
'to_rates': False,
},
},
"mdtargets/setattrs": {
'func': convert_counts_rates,
'args': {
'from_key': "mdtargets/setattrrates",
'to_rates': False,
},
},
"mdtargets/getattrs": {
'func': convert_counts_rates,
'args': {
'from_key': "mdtargets/getattrrates",
'to_rates': False,
},
},
"mdtargets/openrates": {
'func': convert_counts_rates,
'args': {
'from_key': "mdtargets/opens",
'to_rates': True,
},
},
"mdtargets/closerates": {
'func': convert_counts_rates,
'args': {
'from_key': "mdtargets/closes",
'to_rates': True,
},
},
"mdtargets/mknodrates": {
'func': convert_counts_rates,
'args': {
'from_key': "mdtargets/mknods",
'to_rates': True
},
},
"mdtargets/linkrates": {
'func': convert_counts_rates,
'args': {
'from_key': "mdtargets/links",
'to_rates': True,
},
},
"mdtargets/unlinkrates": {
'func': convert_counts_rates,
'args': {
'from_key': "mdtargets/unlinks",
'to_rates': True,
},
},
"mdtargets/mkdirrates": {
'func': convert_counts_rates,
'args': {
'from_key': "mdtargets/mkdirs",
'to_rates': True,
},
},
"mdtargets/rmdirrates": {
'func': convert_counts_rates,
'args': {
'from_key': "mdtargets/rmdirs",
'to_rates': True,
},
},
"mdtargets/renamerates": {
'func': convert_counts_rates,
'args': {
'from_key': "mdtargets/renames",
'to_rates': True,
},
},
"mdtargets/getxattrrates": {
'func': convert_counts_rates,
'args': {
'from_key': "mdtargets/getxattrs",
'to_rates': True,
},
},
"mdtargets/statfsrates": {
'func': convert_counts_rates,
'args': {
'from_key': "mdtargets/statfss",
'to_rates': True,
},
},
"mdtargets/setattrrates": {
'func': convert_counts_rates,
'args': {
'from_key': "mdtargets/setattrs",
'to_rates': True,
},
},
"mdtargets/getattrrates": {
'func': convert_counts_rates,
'args': {
'from_key': "mdtargets/getattrs",
'to_rates': True,
},
},
},
}
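# For illustration (hypothetical file handle): when a version-1 file stores
# "datatargets/readrates" but not "datatargets/readbytes", dereferencing the
# latter falls through to the provider table above, which is roughly
# equivalent to
#
#     provider = SCHEMA_DATASET_PROVIDERS["1"]["datatargets/readbytes"]
#     readbytes = provider['func'](hdf5_file, **provider['args'])
#
# This is what Hdf5.__getitem__ does when a key cannot be resolved to a
# literal dataset.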
H5LMT_COLUMN_ATTRS = {
'MDSOpsGroup/MDSOpsDataSet': 'OpNames',
'OSTReadGroup/OSTBulkReadDataSet': 'OSTNames',
'OSTWriteGroup/OSTBulkWriteDataSet': 'OSTNames',
'OSSCPUGroup/OSSCPUDataSet': 'OSSNames',
}
class Hdf5(h5py.File):
    """Hdf5 file class with extra hooks to parse different schemas

    Provides an h5py.File-like class with added methods to provide a generic
    API that can decode different schemata used to store file system load
    data.

    Attributes:
        always_translate (bool): If True, looking up datasets by keys will
            always attempt to map that key to a new dataset according to the
            schema, even if the key matches the name of an existing dataset.
        dataset_providers (dict): Map of logical dataset names (keys) to
            dicts that describe the functions used to convert underlying
            literal dataset data into the format expected when dereferencing
            the logical dataset name.
        schema (dict): Map of logical dataset names (keys) to the literal
            dataset names in the underlying file (values).
        _version (str): Defined and used at initialization time to determine
            what schema to apply to map the HDF5 connector API to the
            underlying HDF5 file.
        _timesteps (dict): Keyed by dataset name (str); values are the
            timestep (in seconds) between each sampled datum in that dataset.
    """
    def __init__(self, *args, **kwargs):
        """Initialize an HDF5 file

        This is just an HDF5 file object; the magic is in the additional
        methods and indexing provided by the TOKIO Time Series-specific HDF5
        object.

        Args:
            ignore_version (bool): If True, do not throw KeyError if the HDF5
                file does not contain a valid version.
        """
ignore_version = kwargs.pop('ignore_version', False)
super(Hdf5, self).__init__(*args, **kwargs)
# If True, always translate __getitem__ requests according to the
# schema, even if __getitem__ requests a dataset that exists
self.always_translate = False
self._version = self.attrs.get('version')
if isinstance(self._version, bytes):
self._version = self._version.decode()
self._timesteps = {}
# Connect the schema map to this object
if self._version in SCHEMA:
self.schema = SCHEMA[self._version]
elif self._version is None:
self.schema = {}
elif not ignore_version:
raise KeyError("Unknown schema version %s" % self._version)
# Connect the schema dataset providers to this object
if self._version in SCHEMA_DATASET_PROVIDERS:
self.dataset_providers = SCHEMA_DATASET_PROVIDERS[self._version]
else:
self.dataset_providers = {}
    def __getitem__(self, key):
        """Resolve dataset names into actual data

        Provides a single interface through which standard keys can be
        dereferenced and a semantically consistent view of data is returned
        regardless of the schema of the underlying HDF5 file.

        Returns the underlying h5py.Dataset via direct access or a 1:1
        mapping between the standardized key and an underlying dataset name,
        or a numpy array if an underlying h5py.Dataset must be transformed to
        match the structure and semantics of the data requested.

        Can also suffix datasets with special meta-dataset names (e.g.,
        "/missing") to access data that is related to the root dataset.

        Args:
            key (str): The standard name of a dataset to be accessed.

        Returns:
            h5py.Dataset or numpy.ndarray:

            * h5py.Dataset if key is a literal dataset name
            * h5py.Dataset if key maps directly to a literal dataset name
              given the file schema version
            * numpy.ndarray if key maps to a provider function that can
              calculate the requested data
        """
if not self.always_translate and super(Hdf5, self).__contains__(key):
# If the dataset exists in the underlying HDF5 file, just return it
return super(Hdf5, self).__getitem__(key)
# Quirky way to access the missing data of a dataset through the
# __getitem__ API via recursion
base_key, modifier = reduce_dataset_name(key)
if modifier and modifier == 'missing':
return self.get_missing(base_key)
resolved_key, provider = self._resolve_schema_key(key)
if resolved_key:
# Otherwise, attempt to map the logical key to a literal key
return super(Hdf5, self).__getitem__(resolved_key)
elif provider:
# Or run the value through a key provider
provider_func = provider.get('func')
provider_args = provider.get('args', {})
if provider_func is None:
errmsg = "No provider function for %s" % key
raise KeyError(errmsg)
else:
return provider_func(self, **provider_args)
else:
# This should never be hit based on the possible outputs of _resolve_schema_key
errmsg = "_resolve_schema_key: undefined output from %s" % key
raise KeyError(errmsg)
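    # Example (hypothetical file path and keys): all three access modes share
    # one interface:
    #
    #     f = Hdf5('tokio_timeseries.h5', 'r')
    #     f['/datatargets/readbytes']          # literal dataset -> h5py.Dataset
    #     f['datatargets/readbytes']           # schema-mapped dataset, or a
    #                                          # numpy.ndarray via a provider
    #     f['datatargets/readbytes/missing']   # derived missing-data matrix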
    def _resolve_schema_key(self, key):
        """Resolve a logical key into a literal key or a provider

        Given a key, either return a key that can be used to index self
        directly, or return a provider function and arguments to generate the
        dataset dynamically.
        """
if super(Hdf5, self).__contains__(key):
# If the dataset exists in the underlying HDF5 file, just return it
return key, None
# Straight mapping between the key and a dataset
key = key.lstrip('/') if tokio.common.isstr(key) else key
if key in self.schema:
hdf5_key = self.schema.get(key)
if super(Hdf5, self).__contains__(hdf5_key):
return hdf5_key, None
# Key maps to a transformation
if key in self.dataset_providers:
return None, self.dataset_providers[key]
errmsg = "Unknown key %s in %s" % (key, self.filename)
raise KeyError(errmsg)
    def get_version(self, dataset_name=None):
        """Get the version attribute from an HDF5 file dataset

        Args:
            dataset_name (str): Name of dataset whose version should be
                retrieved. If None, return the file's global version.

        Returns:
            str: The version string for the specified dataset
        """
if dataset_name is None:
return self._version
else:
# resolve dataset name
dataset = self.__getitem__(dataset_name)
try:
# dataset can be either an HDF5 dataset or numpy.ndarray
version = dataset.attrs.get("version")
except AttributeError:
version = None
if version is None:
version = self._version
if isinstance(version, bytes):
return version.decode() # for python3
return version
    def set_version(self, version, dataset_name=None):
        """Set the version attribute of an HDF5 file dataset

        Provides a portable way to set the global schema version or the
        version of a specific dataset.

        Args:
            version (str): The new version to be set
            dataset_name (str): Name of dataset whose version should be set.
                If None, set the file's global version.
        """
if dataset_name is None:
self._version = version
return self._version
# resolve dataset name
dataset = self.__getitem__(dataset_name)
if dataset is None:
raise KeyError("Dataset %s does not exist" % dataset_name)
dataset.attrs["version"] = version
return version
    def get_columns(self, dataset_name):
        """Get the column names of a dataset

        Args:
            dataset_name (str): name of dataset whose columns will be
                retrieved

        Returns:
            numpy.ndarray: Array of column names, or empty if no columns
            are defined
        """
# Look for special 'missing' dataset hack
reduced_dataset_name, _ = reduce_dataset_name(dataset_name)
if reduced_dataset_name != dataset_name:
dataset_name = reduced_dataset_name
if self.get_version(dataset_name=dataset_name) is None:
return self._get_columns_h5lmt(dataset_name)
return self.__getitem__(dataset_name).attrs.get(COLUMN_NAME_KEY).astype('U')
    def _get_columns_h5lmt(self, dataset_name):
        """Get the column names of an h5lmt dataset
        """
dataset = self.__getitem__(dataset_name)
orig_dataset_name = dataset_name.lstrip('/')
dataset_name = dataset.name.lstrip('/')
if dataset_name == 'MDSOpsGroup/MDSOpsDataSet' and orig_dataset_name != dataset_name:
return numpy.array([SCHEMA_DATASET_PROVIDERS[None][orig_dataset_name]['args']['column']])
elif dataset_name in H5LMT_COLUMN_ATTRS:
return dataset.attrs[H5LMT_COLUMN_ATTRS[dataset_name]].astype('U')
elif dataset_name == 'MDSCPUGroup/MDSCPUDataSet':
return numpy.array(['_unknown'])
elif dataset_name == 'FSMissingGroup/FSMissingDataSet':
return numpy.array(['_unknown%04d' % i for i in range(dataset.shape[1])])
else:
raise KeyError('Unknown h5lmt dataset %s' % dataset_name)
    def get_timestep(self, dataset_name, timestamps=None):
        """Cache or calculate the timestep for a dataset
        """
if dataset_name not in self._timesteps:
if timestamps is None:
timestamps = self.get_timestamps(dataset_name)[0:2]
self._timesteps[dataset_name] = timestamps[1] - timestamps[0]
return self._timesteps[dataset_name]
    def get_index(self, dataset_name, target_datetime):
        """Turn a datetime object into an integer that can be used to
        reference specific times in datasets.
        """
timestamps = self.get_timestamps(dataset_name)[0:2]
timestep = self.get_timestep(dataset_name, timestamps)
t_start = datetime.datetime.fromtimestamp(timestamps[0])
return int((target_datetime - t_start).total_seconds() / timestep)
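    # For example, with a dataset whose timestamps start at t0 with a
    # 10-second timestep, get_index(name, t0 + 25 seconds) returns
    # int(25 / 10) == 2.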
    def get_timestamps(self, dataset_name):
        """Return the timestamps dataset corresponding to a given dataset name

        This method returns a dataset, not a numpy array, so iterating
        directly over the return value can incur severe performance penalties.
        To iterate over timestamps, it is almost always better to dereference
        the dataset to get a numpy array and iterate over that in memory.

        Args:
            dataset_name (str): Logical name of dataset whose timestamps
                should be retrieved

        Returns:
            h5py.Dataset: The dataset containing the timestamps corresponding
            to dataset_name.
        """
return get_timestamps(self, dataset_name)
    def get_missing(self, dataset_name, inverse=False):
        """Convert a dataset into a matrix indicating the absence of data

        Args:
            dataset_name (str): name of dataset to access
            inverse (bool): return 0 for missing and 1 for present if True

        Returns:
            numpy.ndarray: Array of numpy.int8 of 1 and 0 to indicate the
            presence or absence of specific elements
        """
if self.get_version(dataset_name=dataset_name) is None:
return self._get_missing_h5lmt(dataset_name, inverse=inverse)
return missing_values(self[dataset_name][:], inverse)
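    # Example (hypothetical key): equivalent to indexing with the "/missing"
    # suffix:
    #
    #     m1 = hdf5_file.get_missing('datatargets/readbytes')
    #     m2 = hdf5_file['datatargets/readbytes/missing']  # same result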
    def _get_missing_h5lmt(self, dataset_name, inverse=False):
        """Return the FSMissingGroup dataset from an H5LMT file

        Encodes a hot mess of hacks to return something that looks like what
        get_missing() would return for a real dataset.

        Args:
            dataset_name (str): name of dataset to access
            inverse (bool): return 0 for missing and 1 for present if True

        Returns:
            numpy.ndarray: Array of numpy.int8 of 1 and 0 to indicate the
            presence or absence of specific elements
        """
dataset = self.__getitem__(dataset_name)
missing_dataset = self.get('/FSMissingGroup/FSMissingDataSet')
if len(dataset.shape) == 1:
result = numpy.zeros((dataset.shape[0], 1), dtype=numpy.int8)
elif dataset.shape == missing_dataset.shape:
result = missing_dataset[:, :].astype('i8').T
else:
result = numpy.zeros(dataset[:, :].shape, dtype=numpy.int8).T
if inverse:
return (~result.astype(bool)).astype('i8')
return result
    def to_dataframe(self, dataset_name):
        """Convert a dataset into a dataframe

        Args:
            dataset_name (str): dataset name to convert to DataFrame

        Returns:
            pandas.DataFrame: DataFrame indexed by datetime objects
            corresponding to timestamps, columns labeled appropriately, and
            values from the dataset
        """
if self.get_version(dataset_name=dataset_name) is None:
return self._to_dataframe_h5lmt(dataset_name)
return self._to_dataframe(dataset_name)
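    # Example (hypothetical key): a minimal sketch of DataFrame conversion:
    #
    #     df = hdf5_file.to_dataframe('datatargets/readrates')
    #     total_rates = df.sum(axis=1)  # aggregate across all data targets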
    def _to_dataframe(self, dataset_name):
        """Convert a dataset into a dataframe via TOKIO HDF5 schema
        """
values = self[dataset_name][:]
columns = self.get_columns(dataset_name)
timestamps = self.get_timestamps(dataset_name)[...]
if len(columns) < values.shape[1]:
columns.resize(values.shape[1])
# transform missing data into NaNs
mask = missing_values(values) != 0
try:
values[mask] = numpy.nan
except ValueError: # ValueError: cannot convert float NaN to integer
# don't bother converting non-float arrays' -0.0 into NaNs
pass
dataframe = pandas.DataFrame(data=values,
index=[datetime.datetime.fromtimestamp(t) for t in timestamps],
columns=columns)
return dataframe
    def _to_dataframe_h5lmt(self, dataset_name):
        """Convert a dataset into a dataframe via H5LMT native schema
        """
normed_name, modifier = reduce_dataset_name(dataset_name)
if not modifier:
normed_name = dataset_name.lstrip('/')
else:
normed_name = normed_name.lstrip('/')
col_header_key = H5LMT_COLUMN_ATTRS.get(normed_name)
# Hack around datasets that lack column headers to retrieve column names
if col_header_key is not None:
columns = self[dataset_name].attrs[col_header_key]
elif normed_name == 'FSMissingGroup/FSMissingDataSet':
columns = self['/OSSCPUGroup/OSSCPUDataSet'].attrs['OSSNames']
elif normed_name == 'MDSCPUGroup/MDSCPUDataSet':
columns = ['unknown_mds']
else:
columns = None
# Get timestamps through regular API
timestamps = self.get_timestamps(normed_name)[...]
# Retrieve and transform data using H5LMT schema directly
if normed_name == 'FSStepsGroup/FSStepsDataSet':
values = None
else:
num_dims = len(self[dataset_name].shape)
if num_dims == 1:
values = self[dataset_name][:]
num_indices_expected = self[dataset_name].shape[0]
elif num_dims == 2:
# only transpose if dataset_name refers to a native type
if normed_name in SCHEMA_DATASET_PROVIDERS[None]:
values = self[dataset_name][:]
columns = self.get_columns(normed_name)
else:
values = self[dataset_name][:].T
num_indices_expected = values.shape[0]
elif num_dims > 2:
raise Exception("Can only convert 1d or 2d datasets to dataframe")
indices = [datetime.datetime.fromtimestamp(t) for t in timestamps]
num_indices = len(indices)
        # if an HDF5 file was initialized but not fully populated, the number
        # of timestamps (indices) might be less than the size of the matrix
        # being stored.  this is OK as long as there are no valid values in
        # the part of the matrix that lacks corresponding indices.
        if values is not None and num_indices_expected != num_indices:
warning_msg = "dataset size and timestamps are inconsistent (%d vs %d values; missing sum is %f)" \
% (num_indices_expected, num_indices, values[num_indices:, :].sum())
missings = self.get_missing(normed_name)
# print("shape of defined range: (%d, %d)" % missings[:num_indices, :].shape)
# print("shape of undefined range: (%d, %d)" % missings[num_indices:, :].shape)
# print("missing elements in defined range:", missings[:num_indices, :].sum())
# print("missing elements in undefined range:", missings[num_indices:, :].sum())
# if we have more indices than values, this is unrecoverable because
# we don't know where in `values` data was written twice
if num_indices > num_indices_expected:
raise IndexError(warning_msg)
# h5lmt format doesn't support distinguishing uninitialized elements
# from initialized-but-zero, so we assume that a zero indicates
# missing here
            if (self.get_version() is None and values[num_indices:, :].sum() != 0.0) \
                    or (self.get_version() is not None
                        and (~missings[num_indices:, :].astype(bool)).sum() != 0):
raise IndexError(warning_msg)
if num_dims == 1:
values = values[:num_indices]
else:
values = values[:num_indices, :]
warnings.warn(warning_msg)
return pandas.DataFrame(data=values,
index=indices,
columns=columns)
    def to_timeseries(self, dataset_name, light=False):
        """Create a TimeSeries representation of a dataset

        Creates a TimeSeries object populated with the data from an existing
        HDF5 dataset.

        Responsible for setting timeseries.dataset_name, timeseries.columns,
        timeseries.dataset, timeseries.dataset_metadata,
        timeseries.group_metadata, and timeseries.timestamp_key.

        Args:
            dataset_name (str): Name of existing dataset in self to convert
                into a TimeSeries object
            light (bool): If True, don't actually load datasets into memory;
                reference them directly in the HDF5 file

        Returns:
            tokio.timeseries.TimeSeries: The in-memory representation of the
            given dataset.
        """
timeseries = tokio.timeseries.TimeSeries()
timeseries.dataset_name = dataset_name
try:
dataset = self[dataset_name]
except KeyError:
# can't attach because dataset doesn't exist; pass this back to caller so it can init
return None
timeseries.dataset = dataset if light else dataset[:, :]
# load and decode version of dataset and file schema
timeseries.global_version = self['/'].attrs.get('version')
timeseries.version = self.get_version(dataset_name)
if isinstance(timeseries.version, bytes):
timeseries.version = timeseries.version.decode()
# copy columns into memory
columns = self.get_columns(dataset_name)
timeseries.set_columns(columns)
# copy metadata into memory
for key, value in dataset.attrs.items():
if isinstance(value, bytes):
timeseries.dataset_metadata[key] = value.decode()
else:
timeseries.dataset_metadata[key] = value
for key, value in dataset.parent.attrs.items():
if isinstance(value, bytes):
timeseries.group_metadata[key] = value.decode()
else:
timeseries.group_metadata[key] = value
timeseries.timestamp_key = get_timestamps_key(self, dataset_name)
timeseries.timestamps = self[timeseries.timestamp_key]
timeseries.timestamps = timeseries.timestamps if light else timeseries.timestamps[:]
timeseries.timestep = timeseries.timestamps[1] - timeseries.timestamps[0]
return timeseries
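    # Example (hypothetical key): attach to a dataset without loading it into
    # memory, then materialize it later:
    #
    #     ts = hdf5_file.to_timeseries('datatargets/readrates', light=True)
    #     if ts is not None:
    #         in_memory = ts.dataset[:, :]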
    def commit_timeseries(self, timeseries, **kwargs):
        """Writes contents of a TimeSeries object into a group

        Args:
            timeseries (tokio.timeseries.TimeSeries): the time series to save
                as a dataset within self
            kwargs (dict): Extra arguments to pass to self.create_dataset()
        """
extra_dataset_args = {
'dtype': 'f8',
'chunks': True,
'compression': 'gzip',
}
extra_dataset_args.update(kwargs)
# Create the dataset in the HDF5 file (if necessary)
if timeseries.dataset_name in self:
dataset_hdf5 = self[timeseries.dataset_name]
else:
dataset_hdf5 = self.create_dataset(name=timeseries.dataset_name,
shape=timeseries.dataset.shape,
**extra_dataset_args)
# when timestamp_key has been left empty, use the default
timestamp_key = timeseries.timestamp_key
if timestamp_key is None:
timestamp_key = '/'.join(timeseries.dataset_name.split('/')[0:-1] \
+ [DEFAULT_TIMESTAMP_DATASET])
# Create the timestamps in the HDF5 file (if necessary) and calculate
# where to insert our data into the HDF5's dataset
if timestamp_key not in self:
timestamps_hdf5 = self.create_dataset(name=timestamp_key,
shape=timeseries.timestamps.shape,
dtype='i8')
# Copy the in-memory timestamp dataset into the HDF5 file
timestamps_hdf5[:] = timeseries.timestamps[:]
t_start = 0
t_end = timeseries.timestamps.shape[0]
start_timestamp = timeseries.timestamps[0]
end_timestamp = timeseries.timestamps[-1] + timeseries.timestep
else:
existing_timestamps = self.get_timestamps(timeseries.dataset_name)
t_start, t_end = get_insert_indices(timeseries.timestamps, existing_timestamps)
if t_start < 0 \
or t_start > (len(existing_timestamps) - 2) \
or t_end < 1 \
or t_end > len(existing_timestamps):
raise IndexError("cannot commit dataset that is not a subset of existing data")
start_timestamp = existing_timestamps[0]
end_timestamp = existing_timestamps[-1] + timeseries.timestep
# Make sure that the start/end timestamps are consistent with the HDF5
# file's global time range
if 'start' not in self.attrs:
self.attrs['start'] = start_timestamp
self.attrs['end'] = end_timestamp
else:
if self.attrs['start'] != start_timestamp \
or self.attrs['end'] != end_timestamp:
# warnings.warn(
raise IndexError("Mismatched start or end values: %d != %d or %d != %d" % (
start_timestamp, self.attrs['start'],
end_timestamp, self.attrs['end']))
# If we're updating an existing dataset, use its column names and ordering.
# Otherwise sort the columns before committing them.
if COLUMN_NAME_KEY in dataset_hdf5.attrs:
timeseries.rearrange_columns(timeseries.columns)
else:
timeseries.sort_columns()
# Copy the in-memory dataset into the HDF5 file
dataset_hdf5[t_start:t_end, :] = timeseries.dataset[:, :]
# Copy column names into metadata before committing metadata
timeseries.dataset_metadata[COLUMN_NAME_KEY] = timeseries.columns
timeseries.dataset_metadata['updated'] = int(time.mktime(datetime.datetime.now().timetuple()))
# If timeseries.version was never set, don't set a dataset-level version in the HDF5
if timeseries.version is not None:
self.set_version(timeseries.version, dataset_name=timeseries.dataset_name)
# Set the file's global version to indicate its schema
if timeseries.global_version is not None:
self['/'].attrs['version'] = timeseries.global_version
# Insert/update dataset metadata
for key, value in timeseries.dataset_metadata.items():
# special hack for column names
if key == COLUMN_NAME_KEY:
# note: the behavior of numpy.string_(x) where
# type(x) == numpy.array is _different_ in python2 vs. python3.
# Python3 happily converts each element to a numpy.string_,
# while Python2 first calls a.__repr__ to turn it into a single
# string, then converts that to numpy.string_.
dataset_hdf5.attrs[key] = numpy.array([numpy.string_(x) for x in value])
elif tokio.common.isstr(value):
dataset_hdf5.attrs[key] = numpy.string_(value)
elif value is None:
warnings.warn("Skipping attribute %s (null value) for %s" % (key, timeseries.dataset_name))
else:
dataset_hdf5.attrs[key] = value
# Insert/update group metadata
for key, value in timeseries.group_metadata.items():
if tokio.common.isstr(value):
dataset_hdf5.parent.attrs[key] = numpy.string_(value)
else:
dataset_hdf5.parent.attrs[key] = value
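    # Example (hypothetical key): a sketch of the read-modify-write cycle this
    # method supports, assuming the file was opened in a writable mode:
    #
    #     with Hdf5('tokio_timeseries.h5', 'a') as f:
    #         ts = f.to_timeseries('datatargets/readrates')
    #         ts.dataset[0, :] = 0.0
    #         f.commit_timeseries(ts)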
def missing_values(dataset, inverse=False):
    """Identify matrix values that are missing

    Because we initialize datasets with -0.0, we can scan the sign bit of
    every element of an array to determine how many data were never
    populated.  This converts negative zeros to ones and all other data into
    zeros so the number of missing elements in the array can be counted.

    Args:
        dataset: dataset to access
        inverse (bool): return 0 for missing and 1 for present if True

    Returns:
        numpy.ndarray: Array of numpy.int8 of 1 and 0 to indicate the
        presence or absence of specific elements
    """
zero = numpy.int8(0)
one = numpy.int8(1)
if inverse:
converter = numpy.vectorize(lambda x:
zero if (x == 0.0 and math.copysign(1, x) < 0.0) else one)
else:
converter = numpy.vectorize(lambda x:
one if (x == 0.0 and math.copysign(1, x) < 0.0) else zero)
return converter(dataset)
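# A minimal demonstration of the negative-zero convention (values here are
# illustrative only):
#
#     >>> data = numpy.array([[-0.0, 1.5], [0.0, -0.0]])
#     >>> missing_values(data)
#     array([[1, 0],
#            [0, 1]], dtype=int8)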
def get_insert_indices(my_timestamps, existing_timestamps):
    """Find the overlap between new and existing timestamps

    Given new timestamps and an existing series of timestamps, find the
    indices of overlap so that new data can be inserted into the middle of an
    existing dataset.
    """
existing_timestep = existing_timestamps[1] - existing_timestamps[0]
my_timestep = my_timestamps[1] - my_timestamps[0]
# make sure the time delta is ok
if existing_timestep != my_timestep:
raise Exception("Existing dataset has different timestep (mine=%d, existing=%d)"
% (my_timestep, existing_timestep))
my_offset = (my_timestamps[0] - existing_timestamps[0]) // existing_timestep
my_end = my_offset + len(my_timestamps)
return my_offset, my_end
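# Sketch of the index arithmetic with illustrative timestamp arrays:
#
#     >>> existing = numpy.array([100, 110, 120, 130, 140])
#     >>> new = numpy.array([110, 120])
#     >>> get_insert_indices(new, existing)
#     (1, 3)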