# -*- coding: UTF-8 -*-
# Copyright (c) 2018, Thomas Hartmann & Dirk Gütlin
#
# This file is part of the fieldtrip2mne Project, see: https://gitlab.com/obob/fieldtrip2mne
#
# fieldtrip2mne is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# fieldtrip2mne is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with fieldtrip2mne. If not, see <http://www.gnu.org/licenses/>.
import mne
import numpy
import pymatreader
__all__ = ['read_raw', 'read_epoched', 'read_avg']
"""
Purpose of this module is to read MEG or EEG data, created in MATLABs 'FieldTrip' toolbox and saved in .mat files,
into equivalent data structures of the MNE python module.
"""
def read_raw(ft_structure_path, data_name='data'):
    """Load a FieldTrip raw data structure (FT_DATATYPE_RAW) as an MNE RawArray.

    The variable ``data_name`` is read from the .mat file (skipping any
    ``previous`` fields), its ``trial`` field becomes the data array and the
    measurement info is derived from the same structure.

    Parameters
    ----------
    ft_structure_path: str
        Path and filename of the .mat file containing the data.
    data_name: str, optional
        Name of the variable under which the data was originally saved
        in MATLAB. Default is 'data'.

    Returns
    -------
    mne.io.RawArray
        A MNE RawArray built from the data array and the measurement info.

    Examples
    --------
    >>> read_raw("/home/usr/Documents/FieldTripRawFile.mat")
    <RawArray | None, n_channels x n_times : 402 x 60001 (60.0 sec), ~185.0 MB, data loaded>
    >>> read_raw("FieldTripRawFile2.mat", data_name="rawdata")
    <RawArray | None, n_channels x n_times : 323 x 35001 (35.0 sec), ~117.0 MB, data loaded>
    """
    # Only the requested variable is loaded from the file.
    mat_contents = pymatreader.read_mat(
        ft_structure_path,
        ignore_fields=['previous'],
        variable_names=[data_name],
    )
    ft_struct = mat_contents[data_name]

    raw_data = numpy.array(ft_struct["trial"])
    measurement_info = _create_info(ft_struct)
    return mne.io.RawArray(raw_data, measurement_info)
def read_epoched(ft_structure_path, data_name='data', trialinfo_map=None, omit_trialinfo_index=True, omit_non_unique_trialinfo_index=True):
    """Load a multi-trial FieldTrip raw structure (FT_DATATYPE_RAW) as an MNE EpochsArray.

    .. warning:: Only epochs with the same amount of channels and samples are supported!

    The data itself is read as it is. Events, however, are represented very
    differently in FieldTrip and MNE. In FieldTrip every epoch corresponds to
    one row of the ``trialinfo`` field, which can have one or more columns.
    Columns are first removed according to the two ``omit_*`` parameters; every
    unique combination of the remaining column values then receives an
    automatically created integer event value. Use ``trialinfo_map`` to attach
    condition names to these combinations.

    Parameters
    ----------
    ft_structure_path: str
        Path and filename of the .mat file containing the data.
    data_name: str, optional
        Name of the variable under which the data was originally saved
        in MATLAB. Default is 'data'.
    trialinfo_map: dict, optional
        A dictionary mapping condition strings (MNE's event_ids) to the
        trialinfo column values. The values should be 1D numpy arrays.
        See examples for details.
    omit_trialinfo_index: bool, optional
        Omit trialinfo columns that look like an index of the trials,
        i.e. in which every row is the row before + 1.
    omit_non_unique_trialinfo_index: bool, optional
        Omit trialinfo columns that contain a different value for each row.
        These are most likely additional data like reaction times that
        cannot be represented in MNE.

    Returns
    -------
    mne.EpochsArray
        A MNE EpochsArray built from the epochs array, the event matrix,
        the start time returned by ``_set_tmin`` and the measurement info.

    Examples
    --------
    >>> read_epoched("/home/usr/Documents/FieldTripEpochsFile.mat")  # doctest: +SKIP
    >>> read_epoched("FieldTripEpochsFile2.mat", data_name="epoched")  # doctest: +SKIP
    >>> read_epoched('FieldTripEpochsFile', trialinfo_map={
    ...     'audio/attend': numpy.array([0, 1]),
    ...     'visual/attend': numpy.array([1, 1]),
    ...     'audio/non_attend': numpy.array([0, 0]),
    ...     'visual/non_attend': numpy.array([1, 0])})  # doctest: +SKIP
    """
    # Only the requested variable is loaded from the file.
    mat_contents = pymatreader.read_mat(
        ft_structure_path,
        ignore_fields=['previous'],
        variable_names=[data_name],
    )
    ft_struct = mat_contents[data_name]

    epochs_data = numpy.array(ft_struct["trial"])
    events, event_id = _create_events(
        ft_struct,
        trialinfo_map,
        omit_trialinfo_index,
        omit_non_unique_trialinfo_index,
    )
    start_time = _set_tmin(ft_struct)
    measurement_info = _create_info(ft_struct)

    return mne.EpochsArray(
        data=epochs_data,
        info=measurement_info,
        tmin=start_time,
        events=events,
        event_id=event_id,
    )
def read_avg(ft_structure_path, condition='unspecified condition', data_name='data'):
    """Load a FieldTrip timelock structure (FT_DATATYPE_TIMELOCK) as an MNE EvokedArray.

    The ``avg`` field of the loaded structure is used as the evoked data and
    ``condition`` is stored as the comment of the resulting object.

    Parameters
    ----------
    ft_structure_path: str
        Path and filename of the .mat file containing the data.
    condition: str, optional
        Name or comment on the evoked condition. Default is 'unspecified condition'
    data_name: str, optional
        Name of the variable under which the data was originally saved
        in MATLAB. Default is 'data'.

    Returns
    -------
    mne.EvokedArray
        A MNE EvokedArray built from the averaged data array,
        the comment/condition and the measurement info.

    Examples
    --------
    >>> read_avg("/home/usr/Documents/FieldTripTimelockFile.mat")
    <Evoked | comment : 'unspecified condition', kind : average, time : [0.000000, 3.990000], n_epochs : 1, n_channels x n_times : 306 x 400, ~1.7 MB>
    >>> read_avg("FieldTripTimelockFile2.mat", data_name="evoked")
    <Evoked | comment : 'unspecified condition', kind : average, time : [0.000000, 2.490000], n_epochs : 1, n_channels x n_times : 306 x 250, ~1.1 MB>
    >>> read_avg("FieldTripTimelockFile.mat", condition= "visual stimulus")
    <Evoked | comment : 'visual stimulus', kind : average, time : [0.000000, 3.990000], n_epochs : 1, n_channels x n_times : 306 x 400, ~1.7 MB>
    """
    # Only the requested variable is loaded from the file.
    mat_contents = pymatreader.read_mat(
        ft_structure_path,
        ignore_fields=['previous'],
        variable_names=[data_name],
    )
    ft_struct = mat_contents[data_name]

    averaged_data = ft_struct["avg"]
    measurement_info = _create_info(ft_struct)
    return mne.EvokedArray(averaged_data, measurement_info, comment=condition)
def _create_info(ft_struct):
    """private function which builds an MNE info structure from a preloaded FieldTrip structure

    Channel names come from the ``label`` field; sampling frequency, channel
    types and montage are delegated to the corresponding private helpers.
    The arguments are evaluated in the order: names, sfreq, types, montage.
    """
    return mne.create_info(
        list(ft_struct["label"]),  # copy of the channel labels
        _set_sfreq(ft_struct),
        _set_ch_types(ft_struct),
        _create_montage(ft_struct),
    )
def _convert_ch_types(ch_type_array):
"""private function which converts the channel type names from filedtrip style (e.g. megmag)
to mne style (e.g. mag)"""
for index, name in enumerate(ch_type_array):
if name in ("megplanar", "meggrad"):
ch_type_array[index] = "grad"
elif name == "megmag":
ch_type_array[index] = "mag"
elif name == "eeg":
ch_type_array[index] = "eeg"
elif name in ("refmag", "refgrad"): # is it correct to put both of them here??
ch_type_array[index] = "ref_meg"
elif name in ("unknown", "clock"):
ch_type_array[index] = "misc"
elif name in ("analog trigger", "digital trigger", "trigger"):
ch_type_array[index] = "stim"
elif name.startswith('MEG'):
if name.endswith(('2', '3')):
ch_type_array[index] = 'grad'
elif name.endswith('1'):
ch_type_array[index] = 'mag'
else:
raise ValueError('Unknown MEG channel')
elif name.startswith('EEG'):
ch_type_array[index] = 'eeg'
else:
ch_type_array[index] = 'misc'
return ch_type_array
def _set_ch_types(ft_struct):
"""private function which finds the channel types for every channel"""
if 'hdr' in ft_struct and 'chantype' in ft_struct['hdr']:
available_channels = numpy.where(numpy.in1d(ft_struct["hdr"]['label'], ft_struct['label']))
ch_types = _convert_ch_types(ft_struct["hdr"]["chantype"][available_channels])
elif 'grad' in ft_struct and 'chantype' in ft_struct['grad']:
available_channels = numpy.where(numpy.in1d(ft_struct["grad"]['label'], ft_struct['label']))
ch_types = _convert_ch_types(ft_struct["grad"]["chantype"][available_channels])
elif 'elec' in ft_struct and 'chantype' in ft_struct['grad']:
available_channels = numpy.where(numpy.in1d(ft_struct["elec"]['label'], ft_struct['label']))
ch_types = _convert_ch_types(ft_struct["elec"]["chantype"][available_channels])
elif 'label' in ft_struct:
ch_types = _convert_ch_types(ft_struct['label'])
else:
raise ValueError('Cannot find channel types')
return ch_types
def _create_montage(ft_struct):
"""private function which creates a montage from the FieldTrip data"""
# try to create a montage
montage_pos, montage_ch_names = list(), list()
# see if there is a grad field in the structure
try:
available_channels = numpy.where(numpy.in1d(ft_struct["grad"]['label'], ft_struct['label']))
montage_ch_names.extend(ft_struct['grad']['label'][available_channels])
montage_pos.extend(ft_struct['grad']['chanpos'][available_channels])
except KeyError:
pass
# see if there is a elec field in the structure
try:
available_channels = numpy.where(numpy.in1d(ft_struct["elec"]['label'], ft_struct['label']))
montage_ch_names.extend(ft_struct['elec']['label'][available_channels])
montage_pos.extend(ft_struct['elec']['chanpos'][available_channels])
except KeyError:
pass
montage = None
if len(montage_ch_names) > 0 and len(montage_pos) > 0 and len(montage_ch_names) == len(montage_pos):
montage = mne.channels.DigMontage(dig_ch_pos=dict(zip(montage_ch_names, montage_pos)))
return montage
def _set_sfreq(ft_struct):
"""private function which sets the sample frequency"""
try:
sfreq = ft_struct["fsample"]
except KeyError:
try:
t1 = ft_struct["time"][0]
t2 = ft_struct["time"][1]
difference = abs(t1 - t2)
sfreq = 1 / difference
except KeyError:
raise ValueError("No Source for sfreq found")
return sfreq
def _set_tmin(ft_struct):
"""private function which sets the start time before the event in evoked data if possible"""
times = ft_struct["time"]
time_check = all(times[i][0] == times[i - 1][0] for i, x in enumerate(times))
if time_check == True:
tmin = times[0][0]
else:
tmin = None
return tmin
def _create_events(ft_struct, trialinfo_map, omit_trialinfo_index, omit_non_unique_trialinfo_index):
"""private function which creates an event matrix from the FieldTrip structure"""
# sanitize trialinfo_map
if trialinfo_map:
for (key, value) in trialinfo_map.items():
trialinfo_map[key] = numpy.array(value)
event_type = ft_struct["trialinfo"]
event_number = range(len(event_type))
# events_trans_val: Event Transition values. I have no idea where to take them from,
# but they appear to be mostly 0, so i used numpy.zeros
event_trans_val = numpy.zeros(len(event_type))
if omit_trialinfo_index:
index_columns = numpy.all(numpy.diff(event_type, axis=0) == 1, axis=0)
event_type = event_type[:, numpy.logical_not(index_columns)]
if omit_non_unique_trialinfo_index:
unique_columns = numpy.any(numpy.diff(numpy.sort(event_type, axis=0), axis=0) == 0, axis=0)
event_type = event_type[:, unique_columns]
(unique_events, unique_events_index) = numpy.unique(event_type, axis=0, return_inverse=True)
if event_type.ndim == 1:
final_event_types = event_type
else:
final_event_types = unique_events_index + 1
events = numpy.vstack([numpy.array(event_number), event_trans_val, final_event_types]).astype('int').T
event_id = dict()
if not trialinfo_map:
idx = 1
trialinfo_map = dict()
for cur_unique_event in unique_events:
trialinfo_map['condition %02d' % (idx, )] = cur_unique_event
idx = idx + 1
for (cur_event_id, cur_trialinfo_mat) in trialinfo_map.items():
event_index = numpy.where(numpy.all(unique_events == cur_trialinfo_mat, axis=1))[0][0] + 1
event_id[cur_event_id] = event_index
return (events, event_id)