Source code for mewarpx.diags_store.timeseries

from builtins import object
import collections
import math

import matplotlib.pyplot as plt
import numpy as np
from scipy import ndimage


[docs]def concat_crop_timeseries(timeseries_list, step_begin=None, step_end=None, keys=None, dt=None, debug=False): """Combine or crop timeseries object(s). Note: dt must be a multiple of all individual timeseries' dt. Timeseries that do not share the final dt will be resampled with default smoothing. Arguments: timeseries_list (list of Timeseries): List of objects to concatenate. If empty, return None. step_begin (int): Step to begin the array with, cropping timeseries if needed. If None (default), take all timesteps from inputs. step_end (int): Step to end the array with + 1, cropping timeseries if needed. If None (default), take all timesteps from inputs. dt (float): The timestep to use. If None, use the maximum dt among the series. In all cases, this must be a multiple of each timeseries dt. keys (list of str): If specified, concatenate only these keys and ignore any others held by the objects. debug (bool): If True, print out lots of diagnostic info. Default False. These print statements are commented out by default to prevent frequent if statement checks; so uncomment to enable debug functionality. Returns: timeseries (Timeseries): The concatenated, cropped, Timeseries object. """ if len(timeseries_list) < 1: # debug_print(debug, "No timeseries to concatenate") return None # Get dt. If it doesn't work for resampling, error will be thrown during # resampling. if dt is None: dt = max([x.dt for x in timeseries_list]) # debug_print(debug, "dt auto-calculated as ", dt) ts_resampled = [] for ts in timeseries_list: # debug_print( # debug, # "ts has {} elements".format(ts.step_end - ts.step_begin) # ) ts_resampled.append(ts.resample(new_dt=dt, smooth=True)) # debug_print( # debug, # "resampled ts has step_begin {}, step_end {}".format( # ts_resampled[-1].step_begin, ts_resampled[-1].step_end) # ) # Get step begin/end if step_begin is None: step_begin = min(x.step_begin for x in ts_resampled) # debug_print(debug, "step_begin is ", step_begin) if step_end is None: step_end = max(x.step_end for x in ts_resampled) # debug_print(debug, "step_end is ", step_end) # Create new object final_ts = Timeseries(step_begin=step_begin, step_end=step_end, dt=dt) # Iterate through Timeseries and each key. if keys is None: keys = set([y for x in timeseries_list for y in list(x._array_dict.keys())]) # debug_print(debug, "keys", keys) for key in keys: final_ts._array_dict[key] = np.zeros(step_end - step_begin) for ts in ts_resampled: # Figure out which parts of arrays to transfer by finding begin and # ending indices for each. if ts.step_begin >= step_begin: ts_beginidx = 0 self_beginidx = ts.step_begin - step_begin else: ts_beginidx = step_begin - ts.step_begin self_beginidx = 0 if ts.step_end >= step_end: ts_endidx = step_end - ts.step_begin self_endidx = step_end - step_begin else: ts_endidx = ts.step_end - ts.step_begin self_endidx = ts.step_end - step_begin # debug_print(debug, "Found ts idxs", ts_beginidx, ts_endidx) # debug_print(debug, "Found self idxs", self_beginidx, self_endidx) # In cases where a timeseries falls entirely outside the new range, we # skip it completely. This avoids a bug where you can get eg # ts_beginidx = 0 and ts_endidx = -1; the concatenation will fail. if ts_endidx <= ts_beginidx: continue for key in keys: # debug_print(debug, "Concat", key) if key not in ts._array_dict: continue # debug_print(debug, "Data", # ts._array_dict[key][ts_beginidx:ts_endidx]) final_ts._array_dict[key][self_beginidx:self_endidx] += ( ts._array_dict[key][ts_beginidx:ts_endidx] ) return final_ts
[docs]class Timeseries(object): """Hold a regularly-spaced array of values that we can then manipulate.""" def __init__(self, step_begin, step_end, dt, array_dict=None): """Initialize timeseries object with the arrays for it to hold. Arguments: step_begin (int): First timestep stored in the arrays step_end (int): Last timestep stored in the arrays dt (float): Time increment between steps in seconds array_dict (dict of np.ndarray): A dictionary with keys that are strings, and values that are one-dimensional arrays with length (step_end - step_begin)/stride, containing the value of that string's quantity at each step = step_begin + i*stride. If None, initialize an empty dictionary that can be used later. All quantities should already be rates for resampling to work (eg current, not charge, at a given timestep). """ self.step_begin = int(round(step_begin)) self.step_end = int(round(step_end)) self.dt = dt self._array_dict = array_dict if self._array_dict is None: self._array_dict = collections.OrderedDict() self._check_input() def _check_input(self): """Perform error checking on data.""" if self.step_end <= self.step_begin: raise ValueError("Timeseries must have step_end > step_begin") if self.dt <= 0.: raise ValueError("dt must be greater than 0.") for key, val in self._array_dict.items(): if len(val) != self.n_elements: raise ValueError( f"All arrays must have correct number of elements {self.n_elements} " f"based on step values, but array {key} has {len(val)} elements." ) @property def n_elements(self): """A property so that it automatically adapts to resampling.""" return self.step_end - self.step_begin
[docs] def keys(self): """A user accessible list of keys.""" return list(self._array_dict.keys())
[docs] def set_array(self, key, array): """Add or set an array in the dictionary. Arguments: key (str): Key to add array (np.ndarray): Array to add """ self._array_dict[key] = array self._check_input()
[docs] def get_timeseries_by_key(self, key, include_times=True, default=None): """Get timeseries, including an array of times if requested. Arguments: key (str): Key to look up. include_times (bool): If True, return an n_elements x 2 array that has time in seconds in first column; values in second. If False, return an n_elements array with values only. default (None or float): Value to fill timeseries array with if the given key does not exist in the dictionary or if the dictionary is empty. If None, AttributeError will be raised for an invalid query. """ if self._array_dict is None: array_vals = default else: array_vals = self._array_dict.get(key, default) if array_vals is None: raise AttributeError( '%s not found in the dictionary or the dictionary is None.'%key ) if not include_times: return array_vals timeseries_array = np.empty((self.n_elements, 2)) timeseries_array[:, 0] = ( np.arange(self.step_begin, self.step_end) * self.dt ) timeseries_array[:, 1] = array_vals return timeseries_array
[docs] def get_averagevalue_by_key(self, key, default=None): """Get the rate value of a timeseries. Arguments: key (str): Key to look up. default (float): Value to return if the given key does not exist in the dictionary or if the dictionary is empty. If None, AttributeError will be raised for an invalid query. Returns: value (float): Average value """ if self._array_dict is None: array_vals = default else: array_vals = self._array_dict.get(key, default) if array_vals is None: raise AttributeError( '%s not found in the dictionary or the dictionary is None.'%key ) return np.mean(array_vals)
[docs] def resample(self, new_dt, inplace=False, smooth=True): """Resample the time series at a longer timescale. Arguments: new_dt (float): Must be a multiple of existing dt! inplace (bool): If True, modify this Timeseries. If False, return a new one. Note that if resample is the identity operation, the current Timeseries is returned regardless of this setting. Default False. smooth (bool): If True, first apply Gaussian smoothing with sigma = 0.5*new_dt to the timeseries before sampling from the timeseries. Default True. Note smoothing is never applied if new_dt is equal to current dt. If resampling by 500x or more, smooth using mean values instead. Returns: timeseries (Timeseries): Object with new dt. """ if new_dt < self.dt: raise ValueError( f"new_dt ({new_dt}) must be greater or equal to existing dt ({self.dt})." ) dt_factor = int(round(new_dt / self.dt)) if not np.isclose(new_dt / self.dt, dt_factor): raise ValueError( f"new_dt ({new_dt}) must be a multiple of existing dt ({self.dt})." ) if dt_factor == 1: return self step_begin = self.step_begin // dt_factor step_begin_remainder = self.step_begin % dt_factor if step_begin_remainder > 0: step_begin += 1 idx_offset = dt_factor - step_begin_remainder else: idx_offset = 0 # This calculation makes sure that when there are not an evenly # divisible number of elements, we get the correct step_end - otherwise # there are off-by-one errors with step_end one too small. Eg # step_begin=1, step_end=11, dt_factor=2 should give new_step_begin=1, # new_step_end=6, and sample elements 2, 4, 6, 8, 10 from original. n_elements = int(math.ceil( (self.n_elements - idx_offset) / float(dt_factor) )) step_end = step_begin + n_elements if inplace: target = self self.dt = new_dt self.step_begin = step_begin self.step_end = step_end else: target = Timeseries(step_begin=step_begin, step_end=step_end, dt=new_dt, ) for key, val in self._array_dict.items(): if smooth: if dt_factor < 500: target._array_dict[key] = ( self.gauss_smooth( val, dt_factor/2. )[idx_offset::dt_factor] ) else: target._array_dict[key] = ( self.mean_smooth(val, dt_factor, idx_offset) ) else: target._array_dict[key] = val[idx_offset::dt_factor] target._check_input() return target
[docs] @staticmethod def gauss_smooth(array, sigma): """Apply Gaussian smoothing to the given array. This function is based on minerva.util.gaussian_smoothing, but since it operates in a given way on 1D arrays only, with no resampling, it's re-implemented in simplified form here. Arguments: array (np.ndarray): The 1D array to smooth. sigma (float): The sigma, in units of steps, to use for smoothing. Returns: smoothed_array (np.ndarray): 1D array with smoothing applied. """ # Both truncate=4 and mode=reflect are the default values for this # function. smoothed_array = ndimage.gaussian_filter( array, sigma=sigma, truncate=4.0, mode='reflect' ) return smoothed_array
[docs] @staticmethod def mean_smooth(array, dt_factor, idx_offset=0): """Apply smoothing and resampling to the given array. This function is a simplified smoothing scheme appropriate for cases where many data points will be combined into a single point. Smoothing is done by simply averaging the points to be combined. Arguments: array (np.ndarray): The 1D array to smooth. dt_factor (int): The units of steps to use for smoothing. If 1 or smaller, then no smoothing is performed and the original array is returned as is. If len(array) or larger, then the entire array is smoothed down to a single point. idx_offset (int): Smoothing calculation is performed on the slice array[idx_offset:]. Entries 0:idx_offset are ignored in the input and not returned in the output. Defaults to 0. Returns: smoothed_array (np.ndarray): 1D array with smoothing applied. """ array_length = len(array) - idx_offset # Handle edge cases if dt_factor < 1: dt_factor = 1 if dt_factor > array_length: dt_factor = array_length new_array_length = int(np.ceil(array_length / dt_factor)) full_intervals = int(array_length / dt_factor) end_offset = idx_offset + array_length - array_length % dt_factor smoothed_array = np.zeros(new_array_length) smoothed_array[:full_intervals] = np.mean( np.split(array[idx_offset:end_offset], full_intervals), axis=1 ) if new_array_length > full_intervals: smoothed_array[-1] = np.mean( array[idx_offset + full_intervals * dt_factor:] ) return smoothed_array
[docs]class TimeseriesPlot(object): """Handle plotting of arbitrary timeseries.""" # Start with default params default_timeseries_params = { "xlabel": r'Time ($\mu$s)', "ylabel": 'Timeseries output', "title": 'Timeseries plot', "labelsize": 24, "legendsize": 14, "titlesize": 32, "alpha": 1.0, "yfactor": 1.0, "legend_loc": 'best', "legend_bbox_to_anchor": None, "xoffset": 0.0, } def __init__(self, array_list, ax=None, **kwargs): r"""Plot fluxes throughout the simulation. Arguments: array_list (list of tuples of (name, timeseries_array)): timeseries_arrays are those returned by ``Timeseries.get_timeseries_by_key(key, include_times=True)``. name is the label for the plot legend for that timeseries. List order determines plotting order. Smoothing should be performed before being passed to this function. ax (matplotlib.Axes): If specified, plot on this axes object. If unspecified, get current axes. xlabel (string): abscissa label. Must include 'ns' or '$\mu$s' to specify units in this string. ylabel (string): ordinate label title (string) labelsize (int): default 24 legendsize (int): default 14 titlesize (int): default 32 alpha (float): Transparency of lines to avoid obscuring lines, if desired. Default 1 (disabled). yfactor (float): Multiply y-axis by this value. Eg for correcting the effective area in currents. legend_loc (str): 'loc' argument in legend call legend_bbox_to_anchor (tuple of float): bbox_to_anchor argument in legend call xoffset (float): Offset to apply to times, in seconds """ self.timeseries_params = self.default_timeseries_params.copy() self.timeseries_params.update(kwargs) if ax is None: ax = plt.gca() # determine scaling factor for time-axis if 'ns' in self.timeseries_params["xlabel"]: scaling_factor = 1e9 elif r'$\mu$s' in self.timeseries_params["xlabel"]: scaling_factor = 1e6 else: raise ValueError( r"Unrecognized time in xlabel. Specify 'ns' or '$\mu$s'." ) for (name, ts_array) in array_list: ax.plot( (scaling_factor * (ts_array[:, 0] + self.timeseries_params["xoffset"])), self.timeseries_params["yfactor"]*ts_array[:, 1], alpha=self.timeseries_params["alpha"], label=name ) if kwargs.get('legend', True): ax.legend(fontsize=self.timeseries_params["legendsize"], loc=self.timeseries_params["legend_loc"], bbox_to_anchor=self.timeseries_params[ "legend_bbox_to_anchor"]) ax.set_xlabel(self.timeseries_params["xlabel"], fontsize=self.timeseries_params["labelsize"]) ax.set_ylabel(self.timeseries_params["ylabel"], fontsize=self.timeseries_params["labelsize"]) ax.set_title(self.timeseries_params["title"], fontsize=self.timeseries_params["titlesize"])