Source code for timescales.autoreg.spectral

"""Compute spectra from autoregssive models."""

import warnings
from functools import partial
from multiprocessing import Pool, cpu_count

import numpy as np
from scipy.fft import fftfreq

from statsmodels.regression.linear_model import yule_walker
from spectrum import arma2psd


[docs] def compute_ar_spectrum(sig, fs, order, f_range=None, method='burg', nfft=4096, n_jobs=1): """Compute an autoregressive power spectrum. Parameters ---------- sig : 1d or 2d array Voltage time series. fs : float Sampling rate, in Hz. order : int Number of autogressive coefficients to fit. f_range : tuple Lower and upper frequency bounds. method : str, {'burg', 'yule_walker'} Coefficient estimation method. nfft : int, optional, default: 4096 Window length. n_jobs : optional, int, default: 1 Number of jobs to run in parallel. Returns ------- freqs : 1d array Frequency definitions. powers : 1d array Power values. """ # 2d if sig.ndim == 2: n_jobs = cpu_count() if n_jobs == -1 else n_jobs with Pool(processes=n_jobs) as pool: pfunc = partial(compute_ar_spectrum, fs=fs, order=order, nfft=nfft, f_range=f_range, n_jobs=n_jobs) mapping = pool.map(pfunc, sig) results = np.array(mapping) freqs = results[0, 0] powers = results[:, 1, :] return freqs, powers # 1d if method == 'burg': ar = burg(sig, order=order) else: ar, _ = yule_walker(sig, order=order) freqs, powers = ar_to_psd(ar, fs, nfft, f_range) return freqs, powers
[docs] def ar_to_psd(ar_coeffs, fs, nfft, f_range=None): """Compute PSD from AR coefficients.""" powers = arma2psd(A=-ar_coeffs, rho=1., T=fs, NFFT=nfft) freqs = fftfreq(nfft, 1/fs) powers = powers[:len(freqs)//2] freqs = freqs[:len(freqs)//2] if f_range is not None: inds = np.where((freqs >= f_range[0]) & (freqs <= f_range[1])) freqs = freqs[inds] powers = powers[inds] return freqs, powers
[docs] def burg(sig, order, demean=True): if demean: sig = sig - sig.mean() # Initalize arrays for reflection coefficients # and ar coefficients k = np.zeros(order) ar = np.zeros(order) # Loop to solve reflection coefficients, k _f = sig _b = sig for i in range(order): # Forward and backward shifts f = _f[1:] b = _b[:-1] # Density, sum of squares if i == 0: den = (f@f) + (b@b) else: # Faster via recursion den = ( (1 - k[i-1]**2) * den - (_f[0]**2) - (_b[-1]**2) ) # Reflection coefficient k[i] = (-2 * b @ f) / den # AR coefficient ar[i] = -k[i] if i > 0: prev = ar[:i] ar[:i] = prev - ar[i] * prev[::-1] # Update forward and backward _f = f + k[i] * b _b = b + k[i] * f return ar