Source code for timescales.autoreg.spectral

"""Compute spectra from autoregssive models."""

import warnings
from functools import partial
from multiprocessing import Pool, cpu_count

import numpy as np
from scipy.fft import fftfreq

from statsmodels.regression.linear_model import yule_walker, burg
from spectrum import arma2psd


[docs]def compute_ar_spectrum(sig, fs, order, f_range=None, method='burg', nfft=4096, n_jobs=1): """Compute an autoregressive power spectrum. Parameters ---------- sig : 1d or 2d array Voltage time series. fs : float Sampling rate, in Hz. order : int Number of autogressive coefficients to fit. f_range : tuple Lower and upper frequency bounds. method : str, {'burg', 'yule_walker'} Coefficient estimation method. nfft : int, optional, default: 4096 Window length. n_jobs : optional, int, default: 1 Number of jobs to run in parallel. Returns ------- freqs : 1d array Frequency definitions. powers : 1d array Power values. """ # 2d if sig.ndim == 2: n_jobs = cpu_count() if n_jobs == -1 else n_jobs with Pool(processes=n_jobs) as pool: pfunc = partial(compute_ar_spectrum, fs=fs, order=order, nfft=nfft, f_range=f_range, n_jobs=n_jobs) mapping = pool.map(pfunc, sig) results = np.array(mapping) freqs = results[0, 0] powers = results[:, 1, :] return freqs, powers # 1d if method == 'burg': ar, rho = burg(sig, order=order) else: ar, rho = yule_walker(sig, order=order) powers = arma2psd(A=-ar, rho=rho, T=fs, NFFT=nfft) freqs = fftfreq(nfft, 1/fs) powers = powers[:len(freqs)//2] freqs = freqs[:len(freqs)//2] if f_range is not None: inds = np.where((freqs >= f_range[0]) & (freqs <= f_range[1])) freqs = freqs[inds] powers = powers[inds] return freqs, powers