"""Compute spectra from autoregssive models."""importwarningsfromfunctoolsimportpartialfrommultiprocessingimportPool,cpu_countimportnumpyasnpfromscipy.fftimportfftfreqfromstatsmodels.regression.linear_modelimportyule_walkerfromspectrumimportarma2psd
[docs]defcompute_ar_spectrum(sig,fs,order,f_range=None,method='burg',nfft=4096,n_jobs=1):"""Compute an autoregressive power spectrum. Parameters ---------- sig : 1d or 2d array Voltage time series. fs : float Sampling rate, in Hz. order : int Number of autogressive coefficients to fit. f_range : tuple Lower and upper frequency bounds. method : str, {'burg', 'yule_walker'} Coefficient estimation method. nfft : int, optional, default: 4096 Window length. n_jobs : optional, int, default: 1 Number of jobs to run in parallel. Returns ------- freqs : 1d array Frequency definitions. powers : 1d array Power values. """# 2difsig.ndim==2:n_jobs=cpu_count()ifn_jobs==-1elsen_jobswithPool(processes=n_jobs)aspool:pfunc=partial(compute_ar_spectrum,fs=fs,order=order,nfft=nfft,f_range=f_range,n_jobs=n_jobs)mapping=pool.map(pfunc,sig)results=np.array(mapping)freqs=results[0,0]powers=results[:,1,:]returnfreqs,powers# 1difmethod=='burg':ar=burg(sig,order=order)else:ar,_=yule_walker(sig,order=order)freqs,powers=ar_to_psd(ar,fs,nfft,f_range)returnfreqs,powers
[docs]defar_to_psd(ar_coeffs,fs,nfft,f_range=None):"""Compute PSD from AR coefficients."""powers=arma2psd(A=-ar_coeffs,rho=1.,T=fs,NFFT=nfft)freqs=fftfreq(nfft,1/fs)powers=powers[:len(freqs)//2]freqs=freqs[:len(freqs)//2]iff_rangeisnotNone:inds=np.where((freqs>=f_range[0])&(freqs<=f_range[1]))freqs=freqs[inds]powers=powers[inds]returnfreqs,powers
[docs]defburg(sig,order,demean=True):ifdemean:sig=sig-sig.mean()# Initalize arrays for reflection coefficients# and ar coefficientsk=np.zeros(order)ar=np.zeros(order)# Loop to solve reflection coefficients, k_f=sig_b=sigforiinrange(order):# Forward and backward shiftsf=_f[1:]b=_b[:-1]# Density, sum of squaresifi==0:den=(f@f)+(b@b)else:# Faster via recursionden=((1-k[i-1]**2)*den-(_f[0]**2)-(_b[-1]**2))# Reflection coefficientk[i]=(-2*b@f)/den# AR coefficientar[i]=-k[i]ifi>0:prev=ar[:i]ar[:i]=prev-ar[i]*prev[::-1]# Update forward and backward_f=f+k[i]*b_b=b+k[i]*freturnar