Source code for timescales.fit.psd

"""Spectral estimation methods."""

from itertools import repeat
from functools import partial
from multiprocessing import Pool, cpu_count

import matplotlib.pyplot as plt
import numpy as np
from scipy.optimize import curve_fit

from neurodsp.spectral import compute_spectrum
from timescales.autoreg import compute_ar_spectrum

from fooof import FOOOF, FOOOFGroup
from fooof.core.funcs import expo_const_function, expo_double_const_function

from timescales.conversions import convert_knee
from timescales.utils import normalize as normalize_psd
from timescales.utils import resample_logspace
from timescales.fit.utils import progress_bar


[docs]class PSD: """Power spectral density class. Parameters ---------- freqs : 1d array Frequencies at which the measure was calculated. powers : 1d or 2d array Power spectral density. Attributes ---------- powers_fit : 1d or 2d array Aperiodic fit. params : 1d or 2d array Parameters as [offset, knee_freq, exp, const]. param_names : list of str Parameter names in order of params. knee_freq : float or 1d array Knee frequency. rsq : float R-squared of the aperiodic fit. guess : list, optional, default: None Inital parameter estimates. bounds : list, optional, default: None Parameters bounds as [(*lower_bounds), (*upper_bounds)]. """
[docs] def __init__(self, freqs=None, powers=None): """Initialize object.""" self.freqs = freqs self.powers = powers self.powers_fit = None # Set via other methods self.params = None self.param_names = ['offset', 'knee_freq', 'exp', 'const'] self.knee_freq = None self.tau = None self.rsq = None self.rsq_full = None self.guess=None self.bounds = None # For comparison to PSD models self.tau = None self.knee_freq = None
[docs] def compute_spectrum(self, sig, fs, ar_order=None, f_range=None, norm_range=None, n_jobs=1, **kwargs): """Compute powers spectral density. Parameters ---------- sig : 1d or 2d array Voltage time series or spike counts. fs : float Sampling rate, in Hz. ar_order : int, optional, default: None Compute an autoregressive spectra when not None. f_range : tuple of (float, float) Frequency range of interest, inclusive. norm_range : tuple of (float, float), optional, default: None The lower and upper normalization range. n_jobs : int, optional, default: -1 Number of jobs to run in parralel, when powers is 2d. Only available when using an ar_order. **kwargs Additional keyword arguments to pass to compute_spectrum or compute_ar_spectrum. """ if ar_order is not None: self.freqs, self.powers = compute_ar_spectrum(sig, fs, ar_order, n_jobs=n_jobs, f_range=f_range, **kwargs) else: self.freqs, self.powers = compute_spectrum(sig, fs, f_range=f_range, **kwargs) if norm_range is not None: self.powers = normalize_psd(self.powers, *norm_range)
[docs] def fit(self, freqs=None, powers=None, f_range=None, ap_mode='single', method='huber', fooof_init=None, bounds=None, guess=None, n_resample=None, f_scale=.1, r_thresh=None, maxfev=1000, sigma=None, n_jobs=1, progress=None): """Fit power spectra. Parameters ---------- freqs : 1d array Frequencies at which the measure was calculated. powers : 1d or 2d array Power spectral density. f_range : tuple of (float, float) Frequency range of interest, inclusive. ap_mode : {'single', 'double'} Aperiodic mode as a single or double timescales (knee) process. Only availble for non-fooof methods. method : {'huber', 'cauchy', 'soft_l1', 'arctan', 'fooof'} Fit using a single scipy curve_fit call using robust regression or use the fooof model. fooof_init : dict, optional, default: None Fooof initialization arguments. Only used if method is 'fooof'. bounds : 2d array-like Parameter bounds. guess : 1d array-like Initial parameter estimates. n_resample : int, optional, default: None: Evenly resample in log-log space to improve robust regression. f_scale : float or 1d array-like, optional, default: 0.1 Value of soft margin between inlier and outlier residuals. Only used if method is in {'huber', 'cauchy', 'soft_l1', 'arctan'}. r_thresh : float, optional, default: None Minimum r-squared required to accept f_scale. Only used when f_scale is a 1d array. When None, all f_scale values are attempted and the highest resulting r-squared is accepted. maxfev : int Maximum number of fitting iterations. Only availble for huber method. sigma : dict, optional, default: None Target error per sample. n_jobs : int Number of jobs to run in parralel, when powers is 2d. progress : {None, 'tqdm', 'tqdm.notebook'} Specify whether to display a progress bar. Uses 'tqdm', if installed. """ if freqs is not None: self.freqs = freqs if powers is not None: self.powers = powers self.bounds = bounds self.guess = guess if f_range is not None: inds = np.where((self.freqs >= f_range[0]) & \ (self.freqs <= f_range[1]))[0] self.freqs = self.freqs[inds] self.powers = self.powers[:, inds] if self.powers.ndim == 2 else self.powers[inds] # Skip 0 hz if using fooof or resampling if (method == 'fooof' or n_resample is not None) and self.freqs[0] == 0: self.freqs = self.freqs[1:] self.powers = self.powers[1:] if self.powers.ndim == 1 else self.powers[:, 1:] # Fit if method != 'fooof' and fooof_init is None: # Robust regression (aperiodic only) self.params, self.powers_fit = fit_psd_robust( self.freqs, self.powers, ap_mode=ap_mode, loss=method, f_scale=f_scale, r_thresh=r_thresh, n_resample=n_resample, bounds=bounds, guess=guess, maxfev=maxfev, sigma=sigma, n_jobs=n_jobs, progress=progress ) elif method == 'fooof' or fooof_init is not None: # Aperiodic and periodic model self.params, self.powers_fit, self.rsq_full = fit_psd_fooof( self.freqs, self.powers, fooof_init=fooof_init, return_rsq=True, ap_bounds=bounds, ap_guess=guess, n_jobs=n_jobs, progress=progress ) else: raise ValueError('method must be in \{\'huber\', \'cauchy\', \'soft_l1\', \'arctan\', \'fooof\'\}.') # Get r-squared, knee freqs, and taus if self.powers_fit.ndim == 1: self.rsq = np.corrcoef(np.log10(self.powers), np.log10(self.powers_fit))[0][1] ** 2 self.knee_freq = self.params[1] self.tau = convert_knee(self.knee_freq) else: self.rsq = np.zeros(len(self.powers_fit)) for ind in range(len(self.powers_fit)): self.rsq[ind] = np.corrcoef(np.log10(self.powers[ind]), np.log10(self.powers_fit[ind]))[0][1] ** 2 self.knee_freq = self.params[:, 1] self.tau = convert_knee(self.knee_freq)
[docs] def plot(self, ax=None, title=None): """Plot spectra. Parameters ---------- ax : AxesSubplot, optional, default: None Axis to plot on. """ if ax is None: _, ax = plt.subplots(figsize=(8, 6)) if self.freqs is None or self.powers is None: raise ValueError('freqs and powers are undefined.') # Plot spectra if self.powers.ndim == 1: ax.loglog(self.freqs, self.powers, label='PSD', color='C0') elif self.powers.ndim == 2: ax.loglog(self.freqs, self.powers.mean(axis=0), label='PSD', color='C0') for power in self.powers: ax.loglog(self.freqs, power, color='C0', alpha=.1) # Plot fits if self.powers_fit is not None and self.powers_fit.ndim == 1: ax.loglog(self.freqs, self.powers_fit, label='Fit', ls='--', color='C1') elif self.powers_fit is not None and self.powers_fit.ndim == 2: ax.loglog(self.freqs, self.powers_fit.mean(axis=0), ls='--', color='C1', label='Mean Fit') ax.legend() ax.set_ylabel('Powers') ax.set_xlabel('Frequencies') title = 'Aperiodic Model Fit' if title is None else title ax.set_title(title)
def fit_psd_fooof(freqs, powers, f_range=None, fooof_init=None, return_rsq=False, ap_bounds=None, ap_guess=None, n_jobs=-1, progress=None): """Fit a PSD, using SpecParam, and estimate tau. Parameters ---------- freqs : 1d array Frequencies at which the measure was calculated. powers : 1d or 2d array Power spectral density. f_range : tuple of (float, float), optional, default: None Frequency range of interest. fooof_init : dict, optional, default: None Fooof initialization arguments. return_rsq, optional, default: False Returns the model's full r-squared if True. ap_bounds : 2d array-like Aperiodic bounds. ap_guess : 1d array-like Initial aperiodic parameter estimates. n_jobs : int Number of jobs to run in parralel, when powers is 2d. progress : {None, 'tqdm', 'tqdm.notebook'} Specify whether to display a progress bar. Uses 'tqdm', if installed. Returns ------- params : 1d or 2d array Parameters as [offset, knee_freq, exp, const]. powers_fit : 1d or 2d array Aperiodic fit. full_rsq : float or 1d array, optional Full model's r-squared. """ if fooof_init is None: fooof_init = {} fooof_init_cp = fooof_init.copy() ap_mode = fooof_init_cp.pop('aperiodic_mode', 'knee_constant') # Init FOOOF if powers.ndim == 1: fm = FOOOF(aperiodic_mode=ap_mode, verbose=False, **fooof_init_cp) elif powers.ndim == 2: fm = FOOOFGroup(aperiodic_mode=ap_mode, verbose=False, **fooof_init_cp) # Parameter bounds and guess if ap_bounds is None: ap_bounds = [[-np.inf, 1e-6, 0, 0], [ np.inf, freqs.max(), np.inf, np.inf]] if ap_guess is None: ap_guess = [0, 1, 1, 1e-6] if ap_mode == 'knee': ap_bounds[0] = ap_bounds[0][:-1] ap_bounds[1] = ap_bounds[1][:-1] ap_guess = ap_guess[:-1] fm._ap_guess = ap_guess fm._ap_bounds = ap_bounds # Fit if powers.ndim == 1: fm.fit(freqs, powers, f_range) else: fm.fit(freqs, powers, f_range, n_jobs, progress) if not fm.has_model: return np.nan, np.nan powers_fit = 10**fm._ap_fit if powers.ndim == 1 else \ np.array([10**fm.get_fooof(i)._ap_fit for i in range(len(fm))]) params = fm.get_params('aperiodic') if return_rsq: return params, powers_fit, fm.get_params('r_squared') else: return params, powers_fit def fit_psd_robust(freqs, powers, f_range=None, ap_mode='single', loss='huber', f_scale=.1, r_thresh=None, n_resample=None, bounds=None, guess=None, maxfev=1000, sigma=None, n_jobs=-1, progress=None): """Fit the aperiodic spectrum using robust regression. Parameters ---------- freqs : 1d array Frequencies at which the measure was calculated. powers : 1d or 2d array Power spectral density. f_range : tuple of (float, float) Frequency range of interest. ap_mode : {'single', 'double'} Aperiodic mode as a single or double timescales (knee) process. loss : {'huber', 'soft_l1', 'cauchy', 'arctan'} Loss function. f_scale : float or 1d array-like, optional, default: 0.1 Value of soft margin between inlier and outlier residuals. r_thresh : float, optional, default: None Minimum r-squared required to accept f_scale. When None, all f_scale values are attempted and the highest resulting r-squared is accepted. n_resample : int, optional, default: None: Evenly resample in log-log space to improve robust regression. bounds : 2d array-like Parameter bounds. guess : 1d array-like Initial parameter estimates. maxfev : int Maximum number of fitting iterations. sigma : dict, optional, default: None Target error per sample. n_jobs : int Number of jobs to run in parralel, when powers is 2d. progress : tqdm, optional, default: None Progress bar. Returns ------- params : 1d or 2d array Parameters as [offset, knee_freq, exp, const, (f_scale, r_squared)]. Note: f_scale and r_squared only returned when optimizing f_scale using an array input. powers_fit : 1d or 2d array Aperiodic fit. """ # Unpack when in a mp pool if powers is None: freqs, powers = freqs # Bound to freq range if f_range is not None: inds = np.where((freqs >= f_range[0]) & (freqs <= f_range[1]))[0] freqs = freqs[inds] powers = powers[:, inds] if powers.ndim == 2 else powers[inds] # Parameter bounds and guess fmax = freqs.max() if bounds is None and ap_mode == 'single': bounds = [[-np.inf, 1e-3, 0, 0], [ np.inf, fmax, 10, 10]] elif bounds is None and ap_mode == 'double': bounds = [[-np.inf, 1e-3, 0, 0, -np.inf, 1e-3, 0, 0], [ np.inf, fmax, 10, 10, np.inf, fmax, 10, 10]] if guess is None and ap_mode == 'single': guess = [1, 1, 2, 1e-3] elif guess is None and ap_mode == 'double': guess = [1, 1, 2, 1e-3, 0, 10, 2, 1e-3] if ap_mode == 'double': # Double knee model expo_func = expo_double_const_function else: # Single knee model expo_func = expo_const_function if powers.ndim == 2: # Recursively call 1d in parallel n_jobs = cpu_count() if n_jobs == -1 else n_jobs _freqs = repeat(freqs) with Pool(processes=n_jobs) as pool: mapping = pool.imap( partial(fit_psd_robust, powers=None, ap_mode=ap_mode, n_resample=n_resample, loss=loss, bounds=bounds, guess=guess, maxfev=maxfev, n_jobs=n_jobs), zip(_freqs, powers) ) results = list(progress_bar(mapping, progress, len(powers), pbar_desc='Fitting PSD')) params = np.array([r[0] for r in results]) powers_fit = np.array([r[1] for r in results]) else: # 1d if n_resample is not None: # Evenly resample in log-log space for improved robust regression freqs_orig = freqs.copy() freqs, powers = resample_logspace(freqs, powers, n_resample) if isinstance(f_scale, (float, int)): params, _ = curve_fit(expo_func, freqs, np.log10(powers), loss=loss, f_scale=f_scale, maxfev=maxfev, p0=guess, bounds=bounds, sigma=sigma) else: rsq = -np.inf for f in f_scale: # Catch non-convergence exceptions try: _params, _ = curve_fit(expo_func, freqs, np.log10(powers), loss=loss, f_scale=f, maxfev=maxfev, p0=guess, bounds=bounds, sigma=sigma) except RuntimeError: continue # Get fit _powers_fit = 10**expo_func(freqs, *_params) # Get r-squared from fit _rsq = np.corrcoef(np.log10(powers), np.log10(_powers_fit))[0][1] ** 2 if r_thresh is not None and _rsq > r_thresh: # Above r-squared threshold params = _params powers_fit = _powers_fit break elif _rsq > rsq: # Track the current best r-squared params = _params powers_fit = _powers_fit rsq = _rsq # Move back to original frequency space if resampling if n_resample is not None: powers_fit = 10**expo_func(freqs_orig, *params) else: powers_fit = 10**expo_func(freqs, *params) # Sort parameters using ascending knee frequency if ap_mode == 'double': if params[1] > params[5]: params = np.hstack((params[4:], params[:4])) return params, powers_fit