Source code for timescales.fit.acf

"""Autocorrelation estimation methods."""

from functools import partial
from multiprocessing import Pool, cpu_count

import matplotlib.pyplot as plt

import numpy as np
from scipy.optimize import curve_fit
from scipy.fft import ifft
from statsmodels.tsa.stattools import acf

from neurodsp.spectral import compute_spectrum
from timescales.autoreg import compute_ar_spectrum

from timescales.sim.acf import sim_acf_cos, sim_exp_decay, sim_damped_cos
from timescales.utils import normalize as normalize_acf
from timescales.conversions import convert_knee, psd_to_acf
from timescales.fit.utils import progress_bar, check_guess_and_bounds


[docs]class ACF: """Autocorrelation function class. Parameters ---------- lags : 1d array Time lag definitions. corrs : 1d or 2d array Autocorrelation coefficients. fs : float Sampling rate, in Hz. Attributes ---------- corrs_fit : 1d array Autocorrelation full fit. params : 1d array Optimized parameters. param_names : list of str Parameter names in order of params. rsq : float R-squared of the full fit. guess : list, optional, default: None Estimated parameters as either: [tau, height, offset] when with_cos is False, or [exp_tau, osc_tau, osc_gamma, osc_freq, amp_ratio, height, offset]. bounds : list, optional, default: None Parameters bounds as [(*lower_bounds), (*upper_bounds)]. Notes ----- Parameters may be set on initialization or using the compute_acf method. """
[docs] def __init__(self, lags=None, corrs=None, fs=None): """Initialize object.""" self.lags = lags self.corrs = corrs self.fs = fs # Set via other methods self.guess = None self.bounds = None self.params = None self.param_names = None self.param_exp = None self.params_cos = None self.tau = None self.knee_freq = None self.corrs_fit = None self.corrs_fit_exp = None self.corrs_fit_cos = None self.params_exp = None self.params_cos = None self.rsq = None # For comparison to PSD models self.tau = None self.knee_freq = None
[docs] def compute_acf(self, sig, fs, nlags=None, normalize=True, from_psd=False, psd_kwargs=None, n_jobs=-1, progress=None): """Compute autocorrelation. Parameters ---------- sig : 1d or 2d array Voltage time series or spike counts. fs : float Sampling rate, in Hz. nlags : int, optional, default: None Number of lags to compute. None defaults to the sampling rate, fs. normalize : bool, optional, default: True Normalizes from zero to one when True. from_psd : bool, optional, default: False Compute correlations from the inverse FFT of the PSD. psd_kwargs : dict, optional, default: None Compute spectrum kwargs. Only used if from_psd is True. n_jobs : int, optional, default: -1 Number of jobs to run in parralel, when corrs is 2d. Default is equal to multiprocessing's cpu_count(). progress : {None, 'tqdm', 'tqdm.notebook'} Specify whether to display a progress bar. Uses 'tqdm', if installed. """ self.fs = fs nlags = int(self.fs) if nlags is None else nlags if not from_psd: # Compute ACF if sig.ndim == 1: self.corrs = acf(sig, nlags=nlags, qstat=False, fft=True)[1:] self.lags = np.arange(1, len(self.corrs)+1) elif sig.ndim == 2: n_jobs = cpu_count() if n_jobs == -1 else n_jobs with Pool(processes=n_jobs) as pool: mapping = pool.map(partial(acf, nlags=nlags, qstat=False, fft=True), sig) results = list(progress_bar(mapping, progress, len(sig), 'Computing ACF')) self.corrs = np.array(results)[:, 1:] self.lags = np.arange(1, len(self.corrs[0])+1) else: raise ValueError('sig must be either 1d or 2d.') else: # Handle kwargs psd_kwargs = {} if psd_kwargs is None else psd_kwargs _psd_kwargs = psd_kwargs.copy() norm_range = _psd_kwargs.pop('norm_range', None) # Compute spectrum if 'ar_order' in psd_kwargs: ar_order = _psd_kwargs.pop('ar_order') freqs, powers = compute_ar_spectrum(sig, self.fs, ar_order, **_psd_kwargs) else: freqs, powers = compute_spectrum(sig, self.fs, **_psd_kwargs) # Normalize power if requested if norm_range is not None: powers = normalize_acf(powers, *norm_range) # Take inverse fft to get acf if sig.ndim == 2: for ind in range(len(powers)): self.lags, _corrs = psd_to_acf(freqs, powers[ind], fs, (0, 1)) if ind == 0: self.corrs = np.zeros((len(powers), len(_corrs))) self.corrs[ind] = _corrs self.lags = self.lags[:nlags] self.corrs = self.corrs[:, :nlags] else: self.lags, self.corrs = psd_to_acf(freqs, powers, fs, (0, 1)) self.lags = self.lags[:nlags] self.corrs = self.corrs[:nlags] if normalize: self.corrs = normalize_acf(self.corrs, 0, 1)
[docs] def fit(self, lags=None, corrs=None, gen_fits=True, gen_components=False, with_cos=False, guess=None, bounds=None, maxfev=1000, n_jobs=-1, progress=None): """Fit without an oscillitory component. Parameters ---------- lags : 1d array Time lag definitions. corrs : 1d or 2d array Autocorrelation coefficients. gen_fits : bool, optional, default: False Generates fit array and r-squared when True. Does not generate full fits when False to prevent OOM. gen_components : bool, optional, default: False When gen_fits and with_cos are True, the exponential decay and cosine components are be generated separately when this parameter is True. with_cos : bool, optional, default: False Includes oscillatory component as a damped cosine. guess : list, optional, default: None Estimated parameters as either: [tau, height, offset] when with_cos is False, or [exp_tau, osc_tau, osc_gamma, osc_freq, amp_ratio, height, offset]. bounds : list, optional, default: None Parameters bounds as [(*lower_bounds), (*upper_bounds)]. maxfev : int Maximum number of fitting iterations. n_jobs : int Number of jobs to run in parralel, when corrs is 2d. progress : {None, 'tqdm', 'tqdm.notebook'} Specify whether to display a progress bar. Uses 'tqdm', if installed. """ if lags is not None: self.lags = lags if corrs is not None: self.corrs = corrs if self.corrs is None or self.lags is None: raise ValueError('Initialize with lags, corrs, and fs. ' 'Or call compute_acf prior to fitting.') if not with_cos: # Non-oscillatory model self.param_names = ['tau', 'height', 'offset'] self.params, self.guess, self.bounds = fit_acf(self.corrs, self.fs, self.lags, guess=guess, bounds=bounds, maxfev=maxfev, n_jobs=n_jobs, progress=progress) else: # Oscillatory model self.param_names = ['exp_tau', 'osc_tau', 'osc_gamma', 'osc_freq', 'amp_ratio', 'height', 'offset'] self.params, self.guess, self.bounds = fit_acf_cos(self.corrs, self.fs, self.lags, guess=guess, bounds=bounds, maxfev=maxfev, n_jobs=n_jobs, progress=progress) if gen_fits and not np.isnan(self.params).any(): self.gen_corrs_fit(gen_components) self.tau = self.params[0] self.knee_freq = convert_knee(self.tau) else: self.tau = np.nan self.knee_freq = np.nan
[docs] def gen_corrs_fit(self, gen_components=False): """Generate fit and r-squared. Parameters ---------- gen_components : bool, optional, default: True Generates oscillatory and exponential components separately, in additon to combined, when True. """ sim_func = sim_exp_decay if self.params.shape[-1] == 3 else sim_acf_cos if self.corrs.ndim == 2: self.corrs_fit = np.zeros((len(self.params), len(self.corrs[0]))) self.rsq = np.zeros(len(self.params)) for ind in range(len(self.params)): self.corrs_fit[ind] = sim_func(self.lags, self.fs, *self.params[ind]) self.rsq[ind] = np.corrcoef(self.corrs[ind], self.corrs_fit[ind])[0][1] ** 2 else: self.corrs_fit = sim_func(self.lags, self.fs, *self.params) self.rsq = np.corrcoef(self.corrs, self.corrs_fit)[0][1] ** 2 # Separate oscillatory and exponential decay compoents n_params = len(self.params) if self.corrs.ndim == 1 else len(self.params[0]) n_corrs = 1 if self.corrs.ndim == 1 else len(self.corrs) if gen_components and n_params != 3: if n_corrs > 1: self.params_exp = np.zeros((n_corrs, 2)) self.params_cos = np.zeros((n_corrs, 4)) self.corrs_fit_exp = np.zeros((n_corrs, len(self.corrs[0]))) self.corrs_fit_cos = np.zeros((n_corrs, len(self.corrs[0]))) for ind in range(n_corrs): exp_tau, osc_tau, osc_gamma, osc_freq, amp_ratio, _, _ = self.params[ind] self.params_exp[ind] = np.array([exp_tau, amp_ratio]) self.params_cos[ind] = np.array([osc_tau, 1-amp_ratio, osc_gamma, osc_freq]) self.corrs_fit_exp[ind] = sim_exp_decay(self.lags, self.fs, exp_tau, amp_ratio) self.corrs_fit_cos[ind] = sim_damped_cos(self.lags, self.fs, osc_tau, 1-amp_ratio, osc_gamma, osc_freq) else: exp_tau, osc_tau, osc_gamma, osc_freq, amp_ratio, _, _ = self.params self.params_exp = np.array([exp_tau, amp_ratio]) self.params_cos = np.array([osc_tau, 1-amp_ratio, osc_gamma, osc_freq]) exp = sim_exp_decay(self.lags, self.fs, exp_tau, amp_ratio) osc = sim_damped_cos(self.lags, self.fs, osc_tau, 1-amp_ratio, osc_gamma, osc_freq) self.corrs_fit_exp = exp self.corrs_fit_cos = osc elif gen_components: raise ValueError('Call the fit method with fit_cos=True for separable components.')
[docs] def plot(self, ax=None, title=None): """Plot ACF. Parameters ---------- ax : AxesSubplot, optional, default: None Axis to plot on. """ if ax is None: _, ax = plt.subplots(figsize=(8, 6)) if self.corrs is None: raise ValueError('corrs and lags are undefined.') # Plot ACF if self.corrs.ndim == 1: ax.plot(self.lags, self.corrs, label='ACF', color='C0') elif self.corrs.ndim == 2: ax.plot(self.lags, self.corrs.mean(axis=0), label='ACF', color='C0') for corrs in self.corrs: ax.plot(self.lags, corrs, color='C0', alpha=.1) # Plot fits if self.corrs_fit is not None and self.corrs_fit.ndim == 1: ax.plot(self.lags, self.corrs_fit, label='Fit', ls='--', color='C1') elif self.corrs_fit is not None and self.corrs_fit.ndim == 2: ax.plot(self.lags, self.corrs_fit.mean(axis=0), ls='--', color='C1', label='Mean Fit') ax.legend() ax.set_ylabel('Correlation') ax.set_xlabel('Lag') title = 'ACF Model Fit' if title is None else title ax.set_title(title)
def fit_acf(corrs, fs, lags=None, guess=None, bounds=None, n_jobs=-1, maxfev=1000, progress=None): """Compute and fit ACF. Parameters ---------- corrs : 1d or 2d array Autocorrelation coefficients. fs : float Sampling rate, in Hz. lags : 1d array, optional, default: None Lags each coefficient was computed for. guess : list, optional, default: None Estimated parameters as [tau, height, offset]. bounds : list, optional, default: None Parameters bounds as [(*lower_bounds), (*upper_bounds)]. n_jobs : int Number of jobs to run in parralel, when corrs is 2d. maxfev : int Maximum number of fitting iterations. guess : 1d or 2d array Curve fit initial guess parameters. bounds : 1d or 2d array Curve fit parameter bounds. """ if corrs.ndim == 1: lags = np.arange(1, len(corrs)+1) if lags is None else lags params, guess, bounds = _fit_acf(corrs, lags, fs, guess, bounds, maxfev) elif corrs.ndim == 2: lags = np.arange(1, len(corrs[0])+1) if lags is None else lags n_jobs = cpu_count() if n_jobs == -1 else n_jobs # Ensure guess and bounds are zipable guess, bounds = check_guess_and_bounds(corrs, guess, bounds) # Proxy function to organize args with Pool(processes=n_jobs) as pool: mapping = pool.imap(partial(_acf_proxy, lags=lags, fs=fs, maxfev=maxfev), zip(corrs, guess, bounds)) params = list(progress_bar(mapping, progress, len(corrs))) guess = np.array([i[1] for i in params]) bounds = np.array([i[2] for i in params]) params = np.array([i[0] for i in params]) return params, guess, bounds def fit_acf_cos(corrs, fs, lags=None, guess=None, bounds=None, maxfev=1000, n_jobs=-1, progress=None): """Fit an autocorraltion as the sum of exponential and cosine components. Parameters ---------- corrs : 1d or 2d array Autocorrelation coefficients. fs : float Sampling rate, in Hz. lags : 1d array, optional, default: None Lags each coefficient was computed for. guess : list or list of list, optional, default: None Estimated parameters as [height, tau, offset]. bounds : list or list of list, optional, default: None Parameters bounds as [(*lower_bounds), (*upper_bounds)]. n_jobs : int Number of jobs to run in parralel, when corrs is 2d. maxfev : int Maximum number of fitting iterations. n_jobs : int Number of jobs to run in parralel, when corrs is 2d. progress : {None, 'tqdm', 'tqdm.notebook'} Specify whether to display a progress bar. Uses 'tqdm', if installed. Returns ------- params : 1d or 2d array Fit params as [exp_tau, exp_amp, osc_tau, osc_amp, osc_gamma, osc_freq, offset]. """ if corrs.ndim == 1: lags = np.arange(1, len(corrs)+1) if lags is None else lags params, guess, bounds = _fit_acf_cos(corrs, lags, fs, guess, bounds, maxfev) params = np.array(params) elif corrs.ndim == 2: lags = np.arange(1, len(corrs[0])+1) if lags is None else lags n_jobs = cpu_count() if n_jobs == -1 else n_jobs # Ensure guess and bounds are zipable guess, bounds = check_guess_and_bounds(corrs, guess, bounds) # Proxy function to organize args with Pool(processes=n_jobs) as pool: mapping = pool.imap(partial(_acf_cos_proxy, lags=lags, fs=fs, maxfev=maxfev), zip(corrs, guess, bounds)) params = list(progress_bar(mapping, progress, len(corrs))) guess = np.array([i[1] for i in params]) bounds = np.array([i[2] for i in params]) params = np.array([i[0] for i in params]) return params, guess, bounds def _fit_acf(corrs, lags, fs, guess=None, bounds=None, maxfev=1000): """Fit 1d ACF.""" if guess is not None: target_tau = guess[1] else: inds = np.argsort(np.abs(corrs - np.max(corrs) * (1/np.exp(1)))) target_tau = inds[1] if inds[0] == 0 else inds[0] target_tau /= fs if guess is None: guess = [target_tau, np.max(corrs), 0.] _bounds = [ (0, 0, -.5), (target_tau * 10, 2, .5) ] if bounds is None: bounds = _bounds else: _bounds = np.array(_bounds) bounds = np.array(bounds) xinds, yinds = np.where(bounds == None) if len(xinds) != 0: for x, y in zip(xinds, yinds): bounds[x, y] = _bounds[x, y] bounds = [tuple(b) for b in bounds.tolist()] # If guess is outside of bounds, # set to midpoint of bounds for ind, g in enumerate(guess): if g <= bounds[0][ind] or g >= bounds[1][ind]: guess[ind] = (bounds[0][ind] + bounds[1][ind]) / 2 try: params, _ = curve_fit( lambda lags, t, amp, off : sim_exp_decay(lags, fs, t, amp, off), lags, corrs, p0=guess, bounds=bounds, maxfev=maxfev ) except RuntimeError: params = np.nan return params, guess, bounds def _fit_acf_cos(corrs, lags, fs, guess=None, bounds=None, maxfev=1000): """Fit 1d ACF with cosine.""" if (guess is None or bounds is None) or (None in guess or None in bounds): # Compute spectrum of autocorrs to determine cos freq _, p = compute_spectrum(corrs, len(corrs)) freq = int(np.argmax(p)) # Tau estimation inds = np.where(np.diff(np.sign(np.diff(corrs))) <= 0)[0] + 1 if len(inds) == 0: inds = np.arange(len(corrs)) exp_est = corrs[inds].copy() exp_est -= np.min(exp_est) exp_est_interp = np.interp(np.arange(lags[inds][0], lags[inds][-1]+1), lags[inds], exp_est) exp_est_bl = exp_est_interp - exp_est_interp[0] / np.exp(1) _inds = np.where(exp_est_bl < 0)[0] if len(_inds) == 0: tau_guess = inds[0] / fs else: pts = [_inds[0]-2, _inds[0]-1] tau_guess = (pts[np.argmin(exp_est_bl[pts])] + inds[0]) / fs # Fit _guess = [tau_guess, 5, 0, freq, .5, np.max(corrs), 0] _bounds = [ (0, 0, 0, 0, 0, 0, -.5), (tau_guess * 10, 1, .1, 10, 1, 1, .5) ] if bounds is None: bounds = _bounds else: bounds = np.array(bounds) xinds, yinds = np.where(bounds == None) if len(xinds) != 0: for x, y in zip(xinds, yinds): bounds[x, y] = _bounds[x][y] bounds = [tuple(b) for b in bounds.tolist()] if guess is None: guess = _guess else: guess = np.array(guess) inds = np.where(guess == None) if len(inds) != 0: for ind in inds[0]: guess[ind] = _guess[ind] guess = guess.tolist() # If guess is outside of bounds, # set to midpoint of bounds for ind, g in enumerate(guess): if g <= bounds[0][ind] or g >= bounds[1][ind]: guess[ind] = (bounds[0][ind] + bounds[1][ind]) / 2 try: params, _ = curve_fit( lambda lags, exp_tau, osc_tau, osc_gamma, freq, amp_ratio, height, offset: \ sim_acf_cos(lags, fs, exp_tau, osc_tau, osc_gamma, freq, amp_ratio, height, offset), lags, corrs, p0=guess, bounds=bounds, maxfev=maxfev ) except RuntimeError: params = np.nan return params, guess, bounds def _acf_proxy(args, lags, fs, maxfev): corrs, guess, bounds = args params, guess, bounds = _fit_acf(corrs, lags, fs, guess, bounds, maxfev) return params, guess, bounds def _acf_cos_proxy(args, lags, fs, maxfev): corrs, guess, bounds = args params, guess, bounds = _fit_acf_cos(corrs, lags, fs, guess, bounds, maxfev) return params, guess, bounds