Source code for tokio.timeseries

#!/usr/bin/env python
"""
TimeSeries class to simplify updating and manipulating the in-memory
representation of time series data.
"""

import re
import math
import time
import datetime
import warnings
import numpy
from tokio.common import isstr

class TimeSeries(object):
    """In-memory representation of an HDF5 group in a TokioFile.  Can either
    initialize with no datasets, or initialize against an existing HDF5
    group.
    """
    def __init__(self, dataset_name=None,
                 start=None, end=None, timestep=None, num_columns=None,
                 column_names=None, timestamp_key=None, sort_hex=False):

        # numpy.ndarray of timestamp measurements
        self.timestamps = None

        # time, in seconds, between consecutive timestamps
        self.timestep = None

        # numpy.ndarray of the timeseries data itself
        self.dataset = None

        # string containing the fully qualified dataset name+path
        self.dataset_name = None

        # list of strings serving as column headings
        self.columns = []

        # map between strings in self.columns and their corresponding column
        # indices in self.dataset
        self.column_map = {}

        # key-value pairs of metadata to be stored in the HDF5 group.attrs
        self.group_metadata = {}

        # key-value pairs of metadata to be stored in the HDF5 dataset.attrs
        self.dataset_metadata = {}

        # path corresponding to the self.timestamps dataset in the HDF5 file
        self.timestamp_key = timestamp_key

        # True = natural-sort columns assuming hex-encoded numbers;
        # False = only recognize decimal numbers
        self.sort_hex = sort_hex

        # string describing the dataset version
        self.version = None

        # string describing the schema version
        self.global_version = None

        # attempt to initialize the object if the required fields are supplied
        if dataset_name is not None and start and end and timestep and num_columns:
            self.init(start, end, timestep, num_columns, dataset_name,
                      column_names, timestamp_key)

    def init(self, start, end, timestep, num_columns, dataset_name,
             column_names=None, timestamp_key=None):
        """Create a new TimeSeries dataset object

        Responsible for setting self.timestep, self.timestamp_key, and
        self.timestamps.

        Args:
            start (datetime): timestamp to correspond with the 0th index
            end (datetime): timestamp at which the timeseries will end
                (exclusive)
            timestep (int): seconds between consecutive timestamp indices
            num_columns (int): number of columns to initialize in the
                numpy.ndarray
            dataset_name (str): an HDF5-compatible name for this timeseries
            column_names (list of str, optional): strings by which each
                column should be indexed.  Must be less than or equal to
                num_columns in length; any remaining columns are left
                uninitialized
            timestamp_key (str, optional): an HDF5-compatible name for this
                timeseries' timestamp vector.  Default is
                /groupname/timestamps
        """
        if column_names is None:
            column_names = []

        # Attach the timestep
        self.timestep = timestep

        # Build the timestamp vector in epoch seconds, since Python's
        # datetime and timedelta do not understand DST
        time_list = []
        end_epoch = int(time.mktime(end.timetuple()))
        timestamp = int(time.mktime(start.timetuple()))
        while timestamp < end_epoch:
            time_list.append(timestamp)
            timestamp += timestep
        self.timestamps = numpy.array(time_list)

        # Attach the dataset itself; -0.0 marks uninitialized elements
        self.dataset_name = dataset_name
        self.dataset = numpy.full((len(self.timestamps), num_columns), -0.0)

        self.set_columns(column_names)
        self.set_timestamp_key(timestamp_key, safe=True)
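
A minimal usage sketch of init() (the dataset name, dates, and column labels
are arbitrary examples, and the row count assumes no DST transition falls
inside the window): a one-day series at a 30-second timestep allocates 2880
rows, and any allocated columns beyond the supplied names stay unnamed until
add_column() is called.

    import datetime
    from tokio.timeseries import TimeSeries

    ts = TimeSeries()
    ts.init(start=datetime.datetime(2019, 1, 1),
            end=datetime.datetime(2019, 1, 2),      # exclusive
            timestep=30,                            # seconds per row
            num_columns=3,
            dataset_name='/datatargets/readbytes',  # hypothetical HDF5 path
            column_names=['bb1', 'bb2'])            # third column left unnamed
    print(ts.dataset.shape)     # (2880, 3)
    print(ts.timestamps.shape)  # (2880,)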

    def set_timestamp_key(self, timestamp_key, safe=False):
        """Set the timestamp key

        Args:
            timestamp_key (str): The key for the timestamp dataset.
            safe (bool): If True, do not overwrite an existing timestamp key.
        """
        if not safe or self.timestamp_key is None:
            self.timestamp_key = timestamp_key

    def update_column_map(self):
        """Create the mapping of column names to column indices
        """
        self.column_map = {}
        for index, column_name in enumerate(self.columns):
            self.column_map[column_name] = index

    def set_columns(self, column_names):
        """Set the list of column names
        """
        num_columns = len(column_names)
        # handle the case where more column names are given than the dataset
        # has columns by truncating the given names
        if num_columns > self.dataset.shape[1]:
            truncated = column_names[self.dataset.shape[1]:]
            column_names = column_names[0:self.dataset.shape[1]]
            warnings.warn(
                "Dataset has %d column names but %d columns; dropping columns %s"
                % (num_columns, self.dataset.shape[1],
                   ', '.join(str(x) for x in truncated)))
        # ensure that elements are mutable strings, not unicode
        self.columns = [str(x) for x in column_names]
        self.update_column_map()

    def add_column(self, column_name):
        """Add a new column and update the column map
        """
        index = len(self.columns)
        if column_name in self.column_map:
            warnings.warn("Adding degenerate column '%s' at %d (exists at %d)"
                          % (column_name, index, self.column_map[column_name]))
        self.column_map[column_name] = index
        if index >= self.dataset.shape[1]:
            errmsg = "new index %d (%s) exceeds number of columns %d (%s) in %s" % (
                index, column_name, self.dataset.shape[1], self.columns[-1],
                self.dataset_name)
            raise IndexError(errmsg)
        self.columns.append(str(column_name))  # convert from unicode to str for numpy
        return index
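
Continuing the sketch above, add_column() names the next allocated slot and
raises IndexError once the dataset's allocated width is exhausted:

    index = ts.add_column('bb3')        # names the third allocated column
    print(index, ts.column_map['bb3'])  # 2 2
    # ts.add_column('bb4') would now raise IndexError, because the ndarray
    # was allocated with num_columns=3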

    def sort_columns(self):
        """Rearrange the dataset's columnar data by sorting the column headings
        """
        self.rearrange_columns(sorted_nodenames(self.columns, sort_hex=self.sort_hex))

    def rearrange_columns(self, new_order):
        """Rearrange the dataset's columnar data by an arbitrary column order
        given as an enumerable list
        """
        # validate the new order: new_order must contain at least all of the
        # elements in self.columns, but may contain more than that
        for column in self.columns:
            if column not in new_order:
                raise Exception("key %s in columns not in new_order" % column)

        # walk the new column order
        for new_index, new_column in enumerate(new_order):
            # new_order can contain elements that don't exist; this happens
            # when re-ordering a small dataset to be inserted into an
            # existing, larger dataset
            if new_column not in self.columns:
                warnings.warn("Column '%s' in new order not present in TimeSeries"
                              % new_column)
                continue

            old_index = self.column_map[new_column]
            self.swap_columns(old_index, new_index)

    def swap_columns(self, index1, index2):
        """Swap two columns of the dataset in place
        """
        # save the data from the column we're about to overwrite
        saved_column_data = self.dataset[:, index2].copy()
        saved_column_name = self.columns[index2]

        # swap the column data
        self.dataset[:, index2] = self.dataset[:, index1]
        self.dataset[:, index1] = saved_column_data[:]

        # swap the column names too
        self.columns[index2] = self.columns[index1]
        self.columns[index1] = saved_column_name

        # update the column map
        self.column_map[self.columns[index2]] = index2
        self.column_map[self.columns[index1]] = index1
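
A sketch of the three column-ordering methods working together (using the
hypothetical series from above): sort_columns() natural-sorts the headings
and then moves the underlying data via rearrange_columns() and
swap_columns(), so 'bb10' lands after 'bb2' rather than before it as a
lexical sort would place it.

    ts.set_columns(['bb10', 'bb2', 'bb1'])  # hypothetical unsorted headings
    ts.sort_columns()
    print(ts.columns)                       # ['bb1', 'bb2', 'bb10']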

    def get_insert_pos(self, timestamp, column_name, create_col=False):
        """Determine the row and column indices corresponding to a timestamp
        and column name

        Args:
            timestamp (datetime.datetime): Timestamp to map to a row index
            column_name (str): Name of column to map to a column index
            create_col (bool): If column_name does not exist, create it?

        Returns:
            (t_index, c_index), each an int or None
        """
        timestamp_epoch = int(time.mktime(timestamp.timetuple()))
        t_index = int((timestamp_epoch - self.timestamps[0]) // self.timestep)
        if t_index >= self.timestamps.shape[0] or t_index < 0:  # check bounds
            return None, None

        # create a new column label if necessary
        c_index = self.column_map.get(column_name)
        if c_index is None and create_col:
            c_index = self.add_column(column_name)

        return t_index, c_index
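
A sketch of the timestamp-to-index arithmetic, continuing the 30-second
example series: a timestamp 60 seconds past the start maps to row 2, and
out-of-range timestamps come back as (None, None).

    import datetime

    when = datetime.datetime(2019, 1, 1, 0, 1)  # 60 s past the series start
    t_index, c_index = ts.get_insert_pos(when, 'bb1')
    print(t_index)  # 2, i.e. 60 // 30
    print(c_index)  # column index of 'bb1', or None if the column is
                    # unknown and create_col=False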

    def insert_element(self, timestamp, column_name, value, reducer=None):
        """Inserts a value into a (timestamp, column) element

        Given a timestamp (datetime.datetime object) and a column name
        (string), update an element of the dataset.  If a reducer function is
        provided, use that function to reconcile any existing value in the
        element to be updated.

        Args:
            timestamp (datetime.datetime): Determines the row index into which
                `value` should be inserted
            column_name (str): Determines the column into which `value` should
                be inserted
            value: Value to insert into the dataset
            reducer (function or None): If a value already exists for the
                given (timestamp, column_name) coordinate, apply this function
                to the existing value and the input `value` and store the
                result.  If None, just overwrite the existing value.

        Returns:
            bool: True if insertion was successful, False if no action was
            taken
        """
        t_index, c_index = self.get_insert_pos(timestamp, column_name,
                                               create_col=True)
        if t_index is None or c_index is None:
            return False

        # actually copy the data point into the dataset; the dataset is
        # filled with -0.0, so the sign bit distinguishes negative zeros
        # from explicitly written (+0.0) zeros
        old_value = self.dataset[t_index, c_index]
        if (old_value != 0.0 or math.copysign(1, old_value) < 0.0) \
                and reducer is not None:
            self.dataset[t_index, c_index] = reducer(old_value, value)
        else:
            self.dataset[t_index, c_index] = value
        return True
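
A sketch of collision handling in insert_element(), continuing the example
series: without a reducer, the incoming value simply overwrites; with a
reducer, the existing and incoming values are combined.

    when = datetime.datetime(2019, 1, 1, 0, 1)
    ts.insert_element(when, 'bb1', 100.0)   # plain overwrite
    ts.insert_element(when, 'bb1', 50.0, reducer=lambda old, new: old + new)
    t_index, c_index = ts.get_insert_pos(when, 'bb1')
    print(ts.dataset[t_index, c_index])     # 150.0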

    def convert_to_deltas(self, align='l'):
        """Converts a matrix of monotonically increasing rows into deltas

        Replaces self.dataset with a matrix with the same number of columns
        but one fewer row (taken off the bottom of the matrix).  Also adjusts
        the timestamps dataset.

        Args:
            align (str): "left" or "right".  Determines whether the cell
                labeled with timestamp t0 contains the data between t0 and
                t0 + dt (left) or between t0 - dt and t0 (right).
        """
        self.dataset = timeseries_deltas(self.dataset)
        if align[0] == 'l':
            self.timestamps = self.timestamps[0:-1]
        elif align[0] == 'r':
            self.timestamps = self.timestamps[1:]
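
A sketch of the row bookkeeping in convert_to_deltas(): the dataset always
loses its last row, and the alignment decides which end of the timestamp
vector is dropped.

    nrows = ts.dataset.shape[0]
    ts.convert_to_deltas(align='left')  # 'right' drops timestamps[0] instead
    print(ts.dataset.shape[0] == nrows - 1)     # True
    print(ts.timestamps.shape[0] == nrows - 1)  # True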

    def trim_rows(self, num_rows=1):
        """Trim rows off the end of self.dataset and self.timestamps
        """
        self.dataset = self.dataset[0:-1 * num_rows]
        self.timestamps = self.timestamps[0:-1 * num_rows]

    def add_rows(self, num_rows=1):
        """Add additional rows to the end of self.dataset and self.timestamps
        """
        new_dataset_rows = numpy.full((num_rows, self.dataset.shape[1]), -0.0)
        new_timestamps = []
        for index in range(num_rows):
            new_timestamps.append(self.timestamps[-1] + (index + 1) * self.timestep)
        new_timestamp_rows = numpy.array(new_timestamps)

        self.dataset = numpy.vstack((self.dataset, new_dataset_rows))
        self.timestamps = numpy.hstack((self.timestamps, new_timestamp_rows))
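
A sketch of add_rows() extending the series: the timestamp vector grows at
the configured timestep, and the new data rows carry the -0.0
"uninitialized" fill value.

    last = ts.timestamps[-1]
    ts.add_rows(2)
    print(ts.timestamps[-1] == last + 2 * ts.timestep)  # True
    print(ts.dataset[-1])   # a row of -0.0 fill values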

def sorted_nodenames(nodenames, sort_hex=False):
    """Gnarly routine to sort nodenames naturally.  Required for nodes named
    things like 'bb23' and 'bb231'.
    """
    def extract_int(string):
        """Convert input into an int if possible; otherwise return it
        unmodified
        """
        try:
            if sort_hex:
                return int(string, 16)
            return int(string)
        except ValueError:
            return string

    def natural_compare(string):
        """Tokenize string into alternating strings/ints if possible
        """
        return list(map(extract_int, re.findall(r'(\d+|\D+)', string)))

    def natural_hex_compare(string):
        """Tokenize string into alternating strings/ints if possible.  Also
        recognizes hex, so be careful with ambiguous nodenames like "bb234",
        which is valid hex.
        """
        return list(map(extract_int, re.findall(r'([0-9a-fA-F]+|[^0-9a-fA-F]+)', string)))

    if sort_hex:
        return sorted(nodenames, key=natural_hex_compare)
    return sorted(nodenames, key=natural_compare)
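
A sketch of the two sorting modes (the node names are made up): decimal
natural sort compares embedded integers numerically, while sort_hex=True
treats runs of hex digits, including the letters a-f, as single numbers,
which can reorder ambiguous names.

    from tokio.timeseries import sorted_nodenames

    print(sorted_nodenames(['bb23', 'bb231', 'bb3']))
    # ['bb3', 'bb23', 'bb231'] -- a lexical sort would give
    # ['bb23', 'bb231', 'bb3']

    print(sorted_nodenames(['bb0a', 'bb1']))                 # ['bb0a', 'bb1']
    print(sorted_nodenames(['bb0a', 'bb1'], sort_hex=True))  # ['bb1', 'bb0a']
    # in hex mode the names parse whole, as 0xbb0a and 0xbb1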

def timeseries_deltas(dataset):
    """Convert monotonically increasing values into deltas

    Subtract each row of the dataset from the row that follows it to convert
    a matrix of monotonically increasing rows into deltas.  This is a lossy
    process because the delta for the final measurement of the time series
    cannot be calculated.

    Args:
        dataset (numpy.ndarray): The dataset to convert from absolute values
            into deltas.  Rows should correspond to time, and columns to
            individual components.

    Returns:
        numpy.ndarray: The deltas between each row in the given input
        dataset.  Will have the same number of columns as the input dataset
        and one fewer row.
    """
    diff_matrix = numpy.full((dataset.shape[0] - 1, dataset.shape[1]), -0.0)

    prev_nonzero = [None] * dataset.shape[1]  # the last known valid measurement
    searching = [True] * dataset.shape[1]     # are we spanning a gap in the data?
    for irow in range(dataset.shape[0]):
        for icol in range(dataset.shape[1]):
            this_element = dataset[irow, icol]

            if irow == 0:
                if this_element != 0.0:
                    prev_nonzero[icol] = this_element
            elif searching[icol]:
                if this_element != 0.0:
                    if prev_nonzero[icol] is not None \
                            and this_element >= prev_nonzero[icol]:
                        diff_matrix[irow - 1, icol] = this_element - prev_nonzero[icol]
                    searching[icol] = False
                    prev_nonzero[icol] = this_element
            else:
                if this_element < dataset[irow - 1, icol]:
                    # found a missing data point
                    searching[icol] = True
                else:
                    diff_matrix[irow - 1, icol] = this_element - dataset[irow - 1, icol]
                    prev_nonzero[icol] = this_element

    return diff_matrix
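
A small worked example of the gap-spanning behavior (the counter values are
made up): the zero in the third row is treated as missing data, so the delta
for that interval is left at the -0.0 fill value and the following delta is
taken against the preceding valid measurement.

    import numpy
    from tokio.timeseries import timeseries_deltas

    counters = numpy.array([[100.0],
                            [150.0],
                            [0.0],      # dropped measurement
                            [250.0]])
    print(timeseries_deltas(counters))
    # [[ 50.]
    #  [ -0.]    interval spanning the gap; no delta is computable
    #  [100.]]   250 - 150, taken against the last valid value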