Source code for timescales.autoreg.fit

"""Spectral AR fitting."""
import warnings
import numpy as np
from scipy.optimize import curve_fit
import matplotlib.pyplot as plt
from timescales.sim import sim_ar

[docs] class ARPSD: """Fits AR(p) model to PSD."""
[docs] def __init__(self, order, fs, bounds=None, ar_bounds=None, guess=None, maxfev=100, loss_fn='linear', scaling="log", f_scale=None, curve_fit_kwargs=None): """Intialize object. Parameters ---------- order : int Autoregressive order. fs : float Sampling rate, in Hertz. bounds : 2d tuple or list, optional, default: None Bounds on the AR weights as (lower, uper). Defaults to (-0.9999, 0.9999). In some cases, (0, 0.9999) may be more appropriate. ar_bounds : tuple of (float, float): Sets bounds across all AR weights. guess : list, optional, default: None Inital AR weights. Defaults to zeros. maxfev : int, optional, default: None Max number of optimization iterations. loss_fn : str, optional, default: 'linear' Name of loss function supported by curve_fit. scaling : str, {"log", "linear"} Scale of power to fit. f_scale : float, optional, default: None Robust regression. Determines inliers/outliers. Between [0, 1]. curve_fit_kwargs : dict, optional, default: None Additonal kwargs to pass to curve_fit. """ self.order = order self.fs = fs self.freqs = None self.powers = None self.bounds = bounds self.ar_bounds = ar_bounds self.guess = guess self.f_scale = f_scale self.maxfev = maxfev self.loss_fn = loss_fn self.scaling = scaling self.params = None self.param_names = [f"phi_{i}" for i in range(order)] self.param_names.append("offset") self.curve_fit_kwargs = {} if curve_fit_kwargs is None else curve_fit_kwargs
[docs] def fit(self, freqs, powers): """Fit PSD. Parameters ---------- freqs : 1d array Frequencies. powers : 1d or 2d array Power. """ # Constants self.freqs = freqs self.powers = powers k = np.arange(1, self.order+1) self._exp = np.exp(-2j * np.pi * np.outer(freqs, k) / self.fs).T # Inital parameters and bounds if self.bounds is None: if self.ar_bounds is not None: l = [self.ar_bounds[0]] * self.order u = [self.ar_bounds[1]] * self.order else: l = [-1+1e-9] * self.order u = [1-1e-9] * self.order self.bounds = [ [*l, 1e-16], [*u, 1e16], ] if self.guess is None: guess = [0] * self.order self.guess = [*guess, 1.] # Fit with warnings.catch_warnings(): warnings.simplefilter("ignore") # log of negative may occur if self.scaling == "log": t = np.log10 f = lambda freqs, *params : t(_ar_spectrum(self._exp, *params)) else: t = lambda i : i # identity f = lambda freqs, *params : t(_ar_spectrum(self._exp, *params)) if powers.ndim == 1: self.params, _ = curve_fit( f, freqs, t(powers), p0=self.guess, bounds=self.bounds, maxfev=self.maxfev, f_scale=self.f_scale, loss=self.loss_fn, **self.curve_fit_kwargs ) self.powers_fit = _ar_spectrum(self._exp, *self.params) else: self.params = np.zeros((len(powers), self.order+1)) self.powers_fit = np.zeros_like(powers) for i, p in enumerate(powers): self.params[i], _ = curve_fit( f, freqs, t(p), p0=self.guess, bounds=self.bounds, maxfev=self.maxfev, f_scale=self.f_scale, loss=self.loss_fn, **self.curve_fit_kwargs ) self.powers_fit[i] = _ar_spectrum(self._exp, *self.params[i])
[docs] def plot(self, ax=None): """Plot model fit.""" if ax is None: _, ax = plt.subplots(figsize=(8, 6)) # Plot spectra if self.powers.ndim == 1: ax.loglog(self.freqs, self.powers, label='PSD', color='C0') elif self.powers.ndim == 2: ax.loglog(self.freqs, self.powers.mean(axis=0), label='PSD', color='C0') for power in self.powers: ax.loglog(self.freqs, power, color='C0', alpha=.1) # Plot fits if self.powers_fit is not None and self.powers_fit.ndim == 1: ax.loglog(self.freqs, self.powers_fit, label='Fit', ls='--', color='C1') elif self.powers_fit is not None and self.powers_fit.ndim == 2: ax.loglog(self.freqs, self.powers_fit.mean(axis=0), ls='--', color='C1', label='Mean Fit') ax.legend() ax.set_ylabel('Powers') ax.set_xlabel('Frequencies') ax.set_title('Aperiodic Model Fit')
[docs] def simulate(self, n_seconds, fs, init=None, error=None, index=None): """Simulate a signal based on learned parameters.""" if self.params is not None and index is None: return sim_ar(n_seconds, fs, self.params[:-1][::-1], init=init, error=error) elif self.params is not None and index is not None: return sim_ar(n_seconds, fs, self.params[index][:-1][::-1], init=init, error=error) else: raise ValueError("Must call .fit prior to simulating.")
@property def is_stationary(self): """Determines if the learned coefficients give a stationary process.""" if self.params is not None and self.params.ndim == 1: roots = np.polynomial.Polynomial(np.insert(-self.params[:-1], 0, 1.)).roots() return np.all(np.abs(roots) > 1.) elif self.params is not None and self.params.ndim == 2: _is_stationary = np.zeros(len(self.params), dtype=bool) for i in range(len(self.params)): roots = np.polynomial.Polynomial(np.insert(-self.params[i][:-1], 0, 1.)).roots() _is_stationary[i] = np.all(np.abs(roots) > 1.) return _is_stationary else: raise ValueError("Must call .fit to check stationarity.")
def _ar_spectrum(exp, *params): """Spectral form of an AR(p) model. Notes ----- This func is for fitting efficiency. Use timescales.sim.autoreg.sim_ar_spectrum otherwise. """ phi = params[:-1] offset = params[-1] denom = 1 - (phi @ exp) powers_fit = offset / np.abs(denom)**2 return powers_fit