Source code for ndspflow.workflows.bids
"""BIDS input."""
import os
import warnings
import numpy as np
from mne_bids import BIDSPath, read_raw_bids
[docs]class BIDS:
"""BIDS interface.
Attributes
----------
bids_path : str, optional, default: None
Path to BIDS directory.
subjects : list of str, optional, default: None
Subset of subjects to include.
fs : float, optional, default: None
Sampling rate, in Hertz.
y_array : ndarray
Array input.
"""
[docs] def __init__(self, bids_path=None, subjects=None, fs=None, **bids_kwargs):
"""Initalize BIDS object.
Parameters
----------
bids_path : str, optional, default: None
Path to BIDS directory.
subjects : list of str, optional, default: None
Subset of subjects to include.
fs : float, optional, default: None
Sampling rate, in Hertz.
**bids_kwargs
Additional keyword arguments to pass to mne_bids.path.BIDSPath
initalization. Examples include: session, task, acquisition, run, etc.
"""
# Path to bids
self.bids_path = bids_path
# Allows subject sub-selection
if subjects is None and bids_path is not None:
self.subjects = sorted([sub.strip('sub-') for sub in os.listdir(bids_path)
if 'sub-' in sub])
elif subjects is not None:
self.subjects = [sub.strip('sub-') for sub in subjects]
else:
self.subjects = subjects
# MNE BIDSPath initalization kwargs
self.bids_kwargs = bids_kwargs
# Ensure unpackable
self.bids_kwargs = {} if self.bids_kwargs is None else self.bids_kwargs
# Sampling rate
self.fs = fs
# Channel names
self.ch_names = None
# Output array
self.y_array = None
self.nodes = []
[docs] def read_bids(self, subject=None, allow_ragged=False, queue=True):
"""Read the BIDS directory into memory.
Parameters
----------
subject : int, optional, default: None
Read a single subject into memory. If None, the entire BIDS dataset is read
into memory at once.
allow_ragged : bool, optional, default: True
Allow and use ragged arrays if True. Otherwise assumes non-ragged and sets max
output length to min raw length. Only used if ind is None.
queue : bool, optional, default: True
Queue's reading into nodes if True. Otherwise reads y_array in.
"""
if queue:
# Queue for later execution
self.nodes.append(['read_bids', allow_ragged, False])
elif subject is not None:
# Read single subject
subject = subject.strip('sub-')
with warnings.catch_warnings():
warnings.simplefilter('ignore')
bids_path = BIDSPath(root=self.bids_path, subject=subject,
**self.bids_kwargs)
raw = read_raw_bids(bids_path, verbose=False)
if self.fs is None:
self.fs = int(raw.info['sfreq'])
self.y_array = raw.get_data()
self.ch_names = raw.ch_names
del raw
else:
# Read all subjects
for ind, sub in enumerate(self.subjects):
# Raw bids
with warnings.catch_warnings():
warnings.simplefilter('ignore')
bids_path = BIDSPath(root=self.bids_path, subject=sub, **self.bids_kwargs)
raw = read_raw_bids(bids_path, verbose=False)
# Sampling rate
fs = int(raw.info['sfreq'])
if self.fs is None:
self.fs = fs
elif self.fs != fs:
raise ValueError('Resample subjects data to the same sampling rates.')
# Channel names
self.ch_names = raw.ch_names
# Get array
arr = raw.get_data()
del raw
# Initalize array
if ind == 0 and not allow_ragged:
self.y_array = np.zeros((len(self.subjects), *arr.shape))
elif ind == 0 and allow_ragged:
self.y_array = []
if allow_ragged:
self.y_array.append(arr)
else:
# Trim array if needed
y_len = len(self.y_array[ind])
if len(arr) < y_len:
self.y_array = self.y_array[:, :len(arr)]
elif len(arr) > y_len:
arr = arr[:y_len]
self.y_array[ind] = arr
if allow_ragged:
self.y_array = np.array(self.y_array, dtype=object)