diff --git a/lasthdr.txt b/lasthdr.txt new file mode 100644 index 00000000000..64a14bdcbb0 --- /dev/null +++ b/lasthdr.txt @@ -0,0 +1 @@ +D:\data\anc\1anc.hdr \ No newline at end of file diff --git a/mne/io/boxy/boxy.py b/mne/io/boxy/boxy.py index 7e775074008..17944615103 100644 --- a/mne/io/boxy/boxy.py +++ b/mne/io/boxy/boxy.py @@ -539,8 +539,14 @@ def _read_segment_file(self, data, idx, fi, start, stop, cals, mult): # Save our data based on data type. data_[index_loc, :] = boxy_array[:, channel] + + ###need to fix the first few bad points in the recording### + n_bad_points = 12 + for i_point in range(n_bad_points): + data_[:,i_point] = data_[:,n_bad_points] # Phase unwrapping. + if i_data == 'Ph': print('Fixing phase wrap') # Accounts for sharp, sudden changes in phase @@ -558,7 +564,7 @@ def _read_segment_file(self, data, idx, fi, start, stop, cals, mult): print('Detrending phase data') # Remove trends and drifts that occur over time. - y = np.linspace(0, np.size(data_, axis=1) - 1, + y = np.linspace(1, np.size(data_, axis=1), np.size(data_, axis=1)) x = np.transpose(y) for i_chan in range(np.size(data_, axis=0)): @@ -580,7 +586,7 @@ def _read_segment_file(self, data, idx, fi, start, stop, cals, mult): ph_out_thr = 3 # Set ddof to 1 to mimic matlab. - sdph = np.std(data_, 1, ddof=1) + sdph = np.std(data_[:, n_bad_points - 1:], 1, ddof=1) n_ph_out = np.zeros(np.size(data_, axis=0), dtype=np.int8) @@ -591,18 +597,27 @@ def _read_segment_file(self, data, idx, fi, start, stop, cals, mult): if len(outliers) > 0: if outliers[0] == 0: outliers = outliers[1:] - if len(outliers) > 0: - if (outliers[-1] == np.size(data_, - axis=1) - 1): - outliers = outliers[:-1] - n_ph_out[i_chan] = int(len(outliers)) - for i_pt in range(n_ph_out[i_chan]): - j_pt = outliers[i_pt] - data_[i_chan, j_pt] = ( - (data_[i_chan, j_pt - 1] + - data_[i_chan, j_pt + 1]) / 2) - - # Convert phase to pico seconds. + if (outliers[-1] == np.size(data_, + axis=1) - 1): + outliers = outliers[:-1] + n_ph_out[i_chan] = int(len(outliers)) + for i_pt in range(n_ph_out[i_chan]): + j_pt = outliers[i_pt] + data_[i_chan, j_pt] = ( + (data_[i_chan, j_pt - 1] + + data_[i_chan, j_pt + 1]) / 2) + + # now let's normalise our data # + for i_chan in range(len(data_)): + if i_data == 'Ph': + data_[i_chan,:] = (data_[i_chan,:] - + np.mean(data_[i_chan,:])) + else: + data_[i_chan,:] = (data_[i_chan,:] / + np.mean(data_[i_chan,:]) - 1) + + # Convert phase to pico seconds. + if i_data == 'Ph': for i_chan in range(np.size(data_, axis=0)): data_[i_chan, :] = ((1e12 * data_[i_chan, :]) / (360 * mtg_mdf[i_mtg][i_chan])) diff --git a/mne/io/boxy/tests/test_boxy.py b/mne/io/boxy/tests/test_boxy.py index ed4a75014fb..ef5566e1a42 100644 --- a/mne/io/boxy/tests/test_boxy.py +++ b/mne/io/boxy/tests/test_boxy.py @@ -1,226 +1,336 @@ -# -*- coding: utf-8 -*- -# Authors: Robert Luke -# Eric Larson -# simplified BSD-3 license +# Authors: Kyle Mathewson, Jonathan Kuziek +# +# License: BSD (3-clause) -import os.path as op -import shutil +import os +import re as re +import numpy as np +import scipy.io as spio import pytest -from numpy.testing import assert_allclose, assert_array_equal +from numpy.testing import assert_allclose +import mne from mne.datasets.testing import data_path, requires_testing_data -from mne.io import read_raw_nirx -from mne.io.tests.test_raw import _test_raw_reader -from mne.transforms import apply_trans, _get_trans -from mne.utils import run_tests_if_main -from mne.preprocessing.nirs import source_detector_distances,\ - short_channels +from mne.transforms import apply_trans, get_ras_to_neuromag_trans -fname_nirx_15_0 = op.join(data_path(download=False), - 'NIRx', 'nirx_15_0_recording') -fname_nirx_15_2 = op.join(data_path(download=False), - 'NIRx', 'nirx_15_2_recording') -fname_nirx_15_2_short = op.join(data_path(download=False), - 'NIRx', 'nirx_15_2_recording_w_short') +# Load AC, DC, and Phase data. +boxy_raw_dir = os.path.join(data_path(download=False), + 'BOXY', 'boxy_short_recording') +raw_intensity_dc = mne.io.read_raw_boxy(boxy_raw_dir, 'DC', + verbose=True).load_data() +raw_intensity_ac = mne.io.read_raw_boxy(boxy_raw_dir, 'AC', + verbose=True).load_data() +raw_intensity_ph = mne.io.read_raw_boxy(boxy_raw_dir, 'Ph', + verbose=True).load_data() -@requires_testing_data -def test_nirx_15_2_short(): - """Test reading NIRX files.""" - raw = read_raw_nirx(fname_nirx_15_2_short, preload=True) - - # Test data import - assert raw._data.shape == (26, 145) - assert raw.info['sfreq'] == 12.5 - - # Test channel naming - assert raw.info['ch_names'][:4] == ["S1_D1 760", "S1_D1 850", - "S1_D9 760", "S1_D9 850"] - assert raw.info['ch_names'][24:26] == ["S5_D13 760", "S5_D13 850"] - - # Test frequency encoding - assert raw.info['chs'][0]['loc'][9] == 760 - assert raw.info['chs'][1]['loc'][9] == 850 - - # Test info import - assert raw.info['subject_info'] == dict(sex=1, first_name="MNE", - middle_name="Test", - last_name="Recording") - - # Test distance between optodes matches values from - # nirsite https://github.com/mne-tools/mne-testing-data/pull/51 - # step 4 figure 2 - allowed_distance_error = 0.0002 - distances = source_detector_distances(raw.info) - assert_allclose(distances[::2], [ - 0.0304, 0.0078, 0.0310, 0.0086, 0.0416, - 0.0072, 0.0389, 0.0075, 0.0558, 0.0562, - 0.0561, 0.0565, 0.0077], atol=allowed_distance_error) - - # Test which channels are short - # These are the ones marked as red at - # https://github.com/mne-tools/mne-testing-data/pull/51 step 4 figure 2 - is_short = short_channels(raw.info) - assert_array_equal(is_short[:9:2], [False, True, False, True, False]) - is_short = short_channels(raw.info, threshold=0.003) - assert_array_equal(is_short[:3:2], [False, False]) - is_short = short_channels(raw.info, threshold=50) - assert_array_equal(is_short[:3:2], [True, True]) - - # Test trigger events - assert_array_equal(raw.annotations.description, ['3.0', '2.0', '1.0']) - - # Test location of detectors - # The locations of detectors can be seen in the first - # figure on this page... - # https://github.com/mne-tools/mne-testing-data/pull/51 - # And have been manually copied below - # These values were reported in mm, but according to this page... - # https://mne.tools/stable/auto_tutorials/intro/plot_40_sensor_locations.html - # 3d locations should be specified in meters, so that's what's tested below - # Detector locations are stored in the third three loc values - allowed_dist_error = 0.0002 - locs = [ch['loc'][6:9] for ch in raw.info['chs']] - head_mri_t, _ = _get_trans('fsaverage', 'head', 'mri') - mni_locs = apply_trans(head_mri_t, locs) - - assert raw.info['ch_names'][0][3:5] == 'D1' - assert_allclose( - mni_locs[0], [-0.0841, -0.0464, -0.0129], atol=allowed_dist_error) - - assert raw.info['ch_names'][4][3:5] == 'D3' - assert_allclose( - mni_locs[4], [0.0846, -0.0142, -0.0156], atol=allowed_dist_error) - - assert raw.info['ch_names'][8][3:5] == 'D2' - assert_allclose( - mni_locs[8], [0.0207, -0.1062, 0.0484], atol=allowed_dist_error) - - assert raw.info['ch_names'][12][3:5] == 'D4' - assert_allclose( - mni_locs[12], [-0.0196, 0.0821, 0.0275], atol=allowed_dist_error) - - assert raw.info['ch_names'][16][3:5] == 'D5' - assert_allclose( - mni_locs[16], [-0.0360, 0.0276, 0.0778], atol=allowed_dist_error) - - assert raw.info['ch_names'][19][3:5] == 'D6' - assert_allclose( - mni_locs[19], [0.0352, 0.0283, 0.0780], atol=allowed_dist_error) - - assert raw.info['ch_names'][21][3:5] == 'D7' - assert_allclose( - mni_locs[21], [0.0388, -0.0477, 0.0932], atol=allowed_dist_error) +# Get channel indices for our two montages. +mtg_a = [raw_intensity_ac.ch_names[i_index] for i_index, i_label + in enumerate(raw_intensity_ac.info['ch_names']) + if re.search(r'_D[1-8] ', i_label)] +mtg_b = [raw_intensity_ac.ch_names[i_index] for i_index, i_label + in enumerate(raw_intensity_ac.info['ch_names']) + if re.search(r'_D(9|1[0-6]) ', i_label)] +mtg_list = [mtg_a, mtg_b] -@requires_testing_data -def test_encoding(tmpdir): - """Test NIRx encoding.""" - fname = str(tmpdir.join('latin')) - shutil.copytree(fname_nirx_15_2, fname) - hdr_fname = op.join(fname, 'NIRS-2019-10-02_003.hdr') - hdr = list() - with open(hdr_fname, 'rb') as fid: - hdr.extend(line for line in fid) - hdr[2] = b'Date="jeu. 13 f\xe9vr. 2020"\r\n' - with open(hdr_fname, 'wb') as fid: - for line in hdr: - fid.write(line) - # smoke test - read_raw_nirx(fname) +# Determine to which decimal place we will compare. +thresh = 1e-10 +allowed_dist_error = 0.000001 @requires_testing_data -def test_nirx_15_2(): - """Test reading NIRX files.""" - raw = read_raw_nirx(fname_nirx_15_2, preload=True) - - # Test data import - assert raw._data.shape == (64, 67) - assert raw.info['sfreq'] == 3.90625 - - # Test channel naming - assert raw.info['ch_names'][:4] == ["S1_D1 760", "S1_D1 850", - "S1_D10 760", "S1_D10 850"] +@pytest.mark.parametrize('datatype,data', [('ac', raw_intensity_ac), + ('dc', raw_intensity_dc), + ('ph', raw_intensity_ph)]) +def test_boxy_load(datatype, data): + assert data._data.shape == (162, 376) + assert data.info['sfreq'] == 62.5 + + # Check channel names for montages and markers + assert data.info['ch_names'][:4] == ["S1_D1 690", "S1_D1 830", + "S2_D1 690", "S2_D1 830"] + assert data.info['ch_names'][80:84] == ["S6_D9 690", "S6_D9 830", + "S7_D9 690", "S7_D9 830"] + assert data.info['ch_names'][160:162] == ["Markers a", "Markers b"] + + # Check wavelengths for montages + assert data.info['chs'][0]['loc'][9] == 690 + assert data.info['chs'][1]['loc'][9] == 830 + assert data.info['chs'][2]['loc'][9] == 690 + assert data.info['chs'][3]['loc'][9] == 830 + assert data.info['chs'][80]['loc'][9] == 690 + assert data.info['chs'][81]['loc'][9] == 830 + assert data.info['chs'][82]['loc'][9] == 690 + assert data.info['chs'][83]['loc'][9] == 830 + + # Check our markers + all_events = mne.find_events(data, stim_channel=['Markers a']) + assert np.unique(all_events[:, 2]).tolist() == [1, 2, 1000, 2000] + all_events = mne.find_events(data, stim_channel=['Markers b']) + assert np.unique(all_events[:, 2]).tolist() == [1, 2, 1000, 2000] + + # Check location of sources and detectors + # We'll check the first two unique sources and detectors for each montage + # These values were taken from the .elp file + fiducials = [[0.912775E-01, 0, 0], + [0.599716E-03, 0.784103E-01, -0.231296E-17], + [-0.606755E-02, -0.709034E-01, 0]] + + native_head_t = get_ras_to_neuromag_trans(fiducials[0], + fiducials[1], + fiducials[2]) + + # Sources + assert_allclose(data.info['chs'][0]['loc'][3:6], + apply_trans(native_head_t, + [-0.818852E-01, -0.464419E-01, 0.880970E-01]), + atol=allowed_dist_error) + + assert_allclose(data.info['chs'][2]['loc'][3:6], + apply_trans(native_head_t, + [-0.898024E-01, -0.456557E-01, 0.600431E-01]), + atol=allowed_dist_error) + + assert_allclose(data.info['chs'][80]['loc'][3:6], + apply_trans(native_head_t, + [-0.878098E-01, -0.348737E-01, 0.907238E-01]), + atol=allowed_dist_error) + + assert_allclose(data.info['chs'][82]['loc'][3:6], + apply_trans(native_head_t, + [-0.971674E-01, -0.360763E-01, 0.599644E-01]), + atol=allowed_dist_error) + + # Detectors + assert_allclose(data.info['chs'][0]['loc'][6:9], + apply_trans(native_head_t, + [-0.966161E-01, 0.338437E-01, 0.558559E-01]), + atol=allowed_dist_error) + + assert_allclose(data.info['chs'][10]['loc'][6:9], + apply_trans(native_head_t, + [-0.965480E-01, 0.295639E-01, 0.802247E-01]), + atol=allowed_dist_error) + + assert_allclose(data.info['chs'][80]['loc'][6:9], + apply_trans(native_head_t, + [-0.958327E-01, 0.451337E-01, 0.541672E-01]), + atol=allowed_dist_error) + + assert_allclose(data.info['chs'][90]['loc'][6:9], + apply_trans(native_head_t, + [-0.932942E-01, 0.408094E-01, 0.775681E-01]), + atol=allowed_dist_error) + + # Check channel types + chan_type = dict(ac='302 (FIFFV_COIL_FNIRS_CW_AMPLITUDE)', + dc='302 (FIFFV_COIL_FNIRS_CW_AMPLITUDE)', + ph='305 (FIFFV_COIL_FNIRS_FD_PHASE)') + + assert str(data.info['chs'][0]['kind']) == '1100 (FIFFV_FNIRS_CH)' + assert str(data.info['chs'][0]['coil_type']) == chan_type[datatype] + + assert str(data.info['chs'][160]['kind']) == '3 (FIFFV_STIM_CH)' + assert str(data.info['chs'][160]['coil_type']) == '0 (FIFFV_COIL_NONE)' + + assert str(data.info['chs'][161]['kind']) == '3 (FIFFV_STIM_CH)' + assert str(data.info['chs'][161]['coil_type']) == '0 (FIFFV_COIL_NONE)' - # Test info import - assert raw.info['subject_info'] == dict(sex=1, first_name="TestRecording") - # Test trigger events - assert_array_equal(raw.annotations.description, ['4.0', '6.0', '2.0']) - - # Test location of detectors - allowed_dist_error = 0.0002 - locs = [ch['loc'][6:9] for ch in raw.info['chs']] - head_mri_t, _ = _get_trans('fsaverage', 'head', 'mri') - mni_locs = apply_trans(head_mri_t, locs) - - assert raw.info['ch_names'][0][3:5] == 'D1' - assert_allclose( - mni_locs[0], [-0.0292, 0.0852, -0.0142], atol=allowed_dist_error) - - assert raw.info['ch_names'][15][3:5] == 'D4' - assert_allclose( - mni_locs[15], [-0.0739, -0.0756, -0.0075], atol=allowed_dist_error) +@requires_testing_data +@pytest.mark.parametrize('datatype,data', [('ac', raw_intensity_ac), + ('dc', raw_intensity_dc), + ('ph', raw_intensity_ph)]) +def test_boxy_raw(datatype, data): + for m_num, i_mtg in enumerate(['a', 'b']): + for b_num, i_blk in enumerate(['001', '002']): + + # Load our p_pod files. + filename = os.path.join(data_path(download=False), 'BOXY', + 'boxy_short_recording', 'boxy_p_pod_files', + '1anc071' + i_mtg + '.' + i_blk + + 'raw_data.mat') + ppod_raw = spio.loadmat(filename) + + # Grab the appropriate python data based on which block we're on. + data_len = int((np.shape(raw_intensity_ac._data)[1])/2) + py_data = np.transpose(data.copy().pick(mtg_list[m_num]). + _data[:, data_len * b_num:data_len * + (b_num + 1)]) + + # Swap channels back to original. + for i_chan in range(0, np.size(py_data, axis=1), 2): + py_data[:, [i_chan, i_chan + 1]] = (py_data[:, + [i_chan + 1, i_chan]]) + + # Compare our raw data between p_pod and python. + assert (abs(ppod_raw[datatype] - py_data) <= thresh).all() @requires_testing_data -def test_nirx_15_0(): - """Test reading NIRX files.""" - raw = read_raw_nirx(fname_nirx_15_0, preload=True) - - # Test data import - assert raw._data.shape == (20, 92) - assert raw.info['sfreq'] == 6.25 - - # Test channel naming - assert raw.info['ch_names'][:12] == ["S1_D1 760", "S1_D1 850", - "S2_D2 760", "S2_D2 850", - "S3_D3 760", "S3_D3 850", - "S4_D4 760", "S4_D4 850", - "S5_D5 760", "S5_D5 850", - "S6_D6 760", "S6_D6 850"] - - # Test info import - assert raw.info['subject_info'] == {'first_name': 'NIRX', - 'last_name': 'Test', 'sex': '0'} - - # Test trigger events - assert_array_equal(raw.annotations.description, ['1.0', '2.0', '2.0']) - - # Test location of detectors - allowed_dist_error = 0.0002 - locs = [ch['loc'][6:9] for ch in raw.info['chs']] - head_mri_t, _ = _get_trans('fsaverage', 'head', 'mri') - mni_locs = apply_trans(head_mri_t, locs) - - assert raw.info['ch_names'][0][3:5] == 'D1' - assert_allclose( - mni_locs[0], [0.0287, -0.1143, -0.0332], atol=allowed_dist_error) - - assert raw.info['ch_names'][15][3:5] == 'D8' - assert_allclose( - mni_locs[15], [-0.0693, -0.0480, 0.0657], atol=allowed_dist_error) - - # Test distance between optodes matches values from - allowed_distance_error = 0.0002 - distances = source_detector_distances(raw.info) - assert_allclose(distances[::2], [ - 0.0301, 0.0315, 0.0343, 0.0368, 0.0408, - 0.0399, 0.0393, 0.0367, 0.0336, 0.0447], atol=allowed_distance_error) +@pytest.mark.parametrize('datatype,data', [('ac', raw_intensity_ac), + ('dc', raw_intensity_dc), + ('ph', raw_intensity_ph)]) +def test_boxy_epochs(datatype, data): + # Montage A Events. + mtg_a_events = (mne.find_events(raw_intensity_ac.copy().pick( + mtg_a + ['Markers a']), stim_channel=['Markers a'])) + + mtg_a_event_dict = {'Montage_A/Event_1': 1, + 'Montage_A/Event_2': 2} + + # Montage B Events. + mtg_b_events = (mne.find_events(raw_intensity_ac.copy().pick( + mtg_b + ['Markers b']), stim_channel=['Markers b'])) + + mtg_b_event_dict = {'Montage_B/Event_1': 1, + 'Montage_B/Event_2': 2} + + reject_criteria = None + tmin, tmax = -0.032, 0.208 + + # BOXY epochs. + epochs_a = mne.Epochs(data.copy().pick(mtg_a), mtg_a_events, + event_id=mtg_a_event_dict, tmin=tmin, + tmax=tmax, reject=reject_criteria, + reject_by_annotation=False, proj=True, + baseline=None, preload=True, detrend=None, + verbose=True, event_repeated='drop') + + epochs_b = mne.Epochs(data.copy().pick(mtg_b), mtg_b_events, + event_id=mtg_b_event_dict, tmin=tmin, + tmax=tmax, reject=reject_criteria, + reject_by_annotation=False, proj=True, + baseline=None, preload=True, detrend=None, + verbose=True, event_repeated='drop') + + # Swap channels back to original. + for i_epoch in range(np.size(epochs_a, axis=0)): + for i_chan in range(0, np.size(epochs_a, axis=1), 2): + epochs_a._data[i_epoch, [i_chan, i_chan + 1], :] =\ + epochs_a._data[i_epoch, [i_chan + 1, i_chan], :] + + for i_epoch in range(np.size(epochs_b, axis=0)): + for i_chan in range(0, np.size(epochs_b, axis=1), 2): + epochs_b._data[i_epoch, [i_chan, i_chan + 1], :] =\ + epochs_b._data[i_epoch, [i_chan + 1, i_chan], :] + + # Now compare epochs. + epoch_dict = dict(a=epochs_a, + b=epochs_b) + + for m_num, i_mtg in enumerate(['a', 'b']): + + # Load our p_pod files, both blocks. + filename = os.path.join(data_path(download=False), 'BOXY', + 'boxy_short_recording', 'boxy_p_pod_files', + '1anc071' + i_mtg + '.001' + + 'epoch_data.mat') + ppod_epoch1 = spio.loadmat(filename) + + filename = os.path.join(data_path(download=False), 'BOXY', + 'boxy_short_recording', 'boxy_p_pod_files', + '1anc071' + i_mtg + '.002' + + 'epoch_data.mat') + ppod_epoch2 = spio.loadmat(filename) + + # Create empty array to store our combined blocks. + epoch_num = (len(ppod_epoch1[datatype + '_epochs']) + + len(ppod_epoch2[datatype + '_epochs'])) + ppod_epochs = np.zeros((epoch_num, 80, 16)) + + # Loop through both blocks. + for epoch_num, i_epoch in enumerate(ppod_epoch1[datatype + '_epochs']): + ppod_epochs[epoch_num, :, :] = np.transpose(i_epoch[0]) + + last_blk = epoch_num + for epoch_num, i_epoch in enumerate(ppod_epoch2[datatype + '_epochs']): + ppod_epochs[epoch_num + last_blk + 1, :, :] =\ + np.transpose(i_epoch[0]) + + py_epochs = epoch_dict[i_mtg] + + # Compare our epochs between p_pod and python. + assert (abs(ppod_epochs - py_epochs) <= thresh).all() @requires_testing_data -@pytest.mark.parametrize('fname, boundary_decimal', ( - [fname_nirx_15_2_short, 1], - [fname_nirx_15_2, 0], - [fname_nirx_15_0, 0] -)) -def test_nirx_standard(fname, boundary_decimal): - """Test standard operations.""" - _test_raw_reader(read_raw_nirx, fname=fname, - boundary_decimal=boundary_decimal) # low fs - - -run_tests_if_main() +@pytest.mark.parametrize('datatype,data', [('ac', raw_intensity_ac), + ('dc', raw_intensity_dc), + ('ph', raw_intensity_ph)]) +def test_boxy_evoked(datatype, data): + # Montage A Events. + mtg_a_events = (mne.find_events(raw_intensity_ac.copy().pick( + mtg_a + ['Markers a']), stim_channel=['Markers a'])) + + mtg_a_event_dict = {'Montage_A/Event_1': 1, + 'Montage_A/Event_2': 2} + + # Montage B Events. + mtg_b_events = (mne.find_events(raw_intensity_ac.copy().pick( + mtg_b + ['Markers b']), stim_channel=['Markers b'])) + + mtg_b_event_dict = {'Montage_B/Event_1': 1, + 'Montage_B/Event_2': 2} + + reject_criteria = None + tmin, tmax = -0.032, 0.208 + + epochs_a = mne.Epochs(data.copy().pick(mtg_a), mtg_a_events, + event_id=mtg_a_event_dict, tmin=tmin, + tmax=tmax, reject=reject_criteria, + reject_by_annotation=False, proj=True, + baseline=None, preload=True, detrend=None, + verbose=True, event_repeated='drop') + + epochs_b = mne.Epochs(data.copy().pick(mtg_b), mtg_b_events, + event_id=mtg_b_event_dict, tmin=tmin, + tmax=tmax, reject=reject_criteria, + reject_by_annotation=False, proj=True, + baseline=None, preload=True, detrend=None, + verbose=True, event_repeated='drop') + + # Swap channels back to original. + for i_epoch in range(np.size(epochs_a, axis=0)): + for i_chan in range(0, np.size(epochs_a, axis=1), 2): + epochs_a._data[i_epoch, [i_chan, i_chan + 1], :] =\ + epochs_a._data[i_epoch, [i_chan + 1, i_chan], :] + + for i_epoch in range(np.size(epochs_b, axis=0)): + for i_chan in range(0, np.size(epochs_b, axis=1), 2): + epochs_b._data[i_epoch, [i_chan, i_chan + 1], :] =\ + epochs_b._data[i_epoch, [i_chan + 1, i_chan], :] + + evoked_a1 = epochs_a['Montage_A/Event_1'].average() + evoked_a2 = epochs_a['Montage_A/Event_2'].average() + + evoked_b1 = epochs_b['Montage_B/Event_1'].average() + evoked_b2 = epochs_b['Montage_B/Event_2'].average() + + evoke_dict = dict(a1=evoked_a1, + a2=evoked_a2, + b1=evoked_b1, + b2=evoked_b2) + + # Loop through our montages. + for m_num, i_mtg in enumerate(['a', 'b']): + + # Load in our p_pod files. + filename = os.path.join(data_path(download=False), 'BOXY', + 'boxy_short_recording', 'boxy_p_pod_files', + '1anc071' + i_mtg + '.002' + + 'evoke_data.mat') + ppod_evoke_all = spio.loadmat(filename) + + # Just get the relevant p_pod data. + ppod_evoke = ppod_evoke_all['a' + datatype].swapaxes(0, 2) + + # Grab the relevant python data, and combine blocks. + py_evoke = np.zeros(np.shape(ppod_evoke)) + py_evoke[0, :, :] = evoke_dict[i_mtg + "1"]._data + py_evoke[1, :, :] = evoke_dict[i_mtg + "2"]._data + + # Compare p_pod and python evoked. + assert (abs(ppod_evoke - py_evoke) <= thresh).all()