From b4a90288f77ba3583deac8eb29c8db34db7b3d72 Mon Sep 17 00:00:00 2001 From: Robert Guggenberger Date: Tue, 4 Dec 2018 14:25:40 +0100 Subject: [PATCH 01/12] Implement sync_timestamps Allows to resample (using interpolation or shifting) all streams to have their timestamps sample-wise in sync with the fastest stream --- Python/pyxdf/pyxdf.py | 89 ++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 87 insertions(+), 2 deletions(-) diff --git a/Python/pyxdf/pyxdf.py b/Python/pyxdf/pyxdf.py index 57cdc2e..bb7be2e 100644 --- a/Python/pyxdf/pyxdf.py +++ b/Python/pyxdf/pyxdf.py @@ -12,7 +12,7 @@ import xml.etree.ElementTree as ET from collections import OrderedDict, defaultdict import logging - +from scipy.interpolate import interp1d import numpy as np __all__ = ['load_xdf'] @@ -26,6 +26,7 @@ def load_xdf(filename, synchronize_clocks=True, handle_clock_resets=True, dejitter_timestamps=True, + sync_timestamps=False, jitter_break_threshold_seconds=1, jitter_break_threshold_samples=500, clock_reset_threshold_seconds=5, @@ -55,6 +56,16 @@ def load_xdf(filename, dejitter_timestamps : Whether to perform jitter removal for regularly sampled streams. (default: true) + sync_timestamps: {bool str} + sync timestamps of all streams sample-wise with the stream to the + highest effective sampling rate + + False -> no syncing + True -> linear syncing + str:<'linear’, ‘nearest’, ‘zero’, ‘slinear’, ‘quadratic’, ‘cubic’, + ‘previous’, ‘next’> for method inherited from + scipy.interpolate.interp1d + on_chunk : Function that is called for each chunk of data as it is being retrieved from the file; the function is allowed to modify the data (for example, sub-sample it). The four input arguments @@ -372,7 +383,14 @@ def __init__(self, xml): stream['info']['effective_srate'] = tmp.effective_srate stream['time_series'] = tmp.time_series stream['time_stamps'] = tmp.time_stamps - + + # sync sampling with the fastest timeseries by interpolation / shifting + if sync_timestamps: + if type(sync_timestamps) is not str: + sync_timestamps = 'linear' + logger.warning('sync_timestamps defaults to "linear"') + streams = _sync_timestamps(streams, kind=sync_timestamps) + streams = [s for s in streams.values()] sort_data = [s['info']['name'][0] for s in streams] streams = [x for _, x in sorted(zip(sort_data, streams))] @@ -553,6 +571,73 @@ def _jitter_removal(streams, return streams +def _sync_timestamps(streams, kind='linear'): + srate_key = 'effective_srate' + + snames = [stream['info']['source_id'] for stream in streams.values()] + stamps = [stream['time_stamps'] for stream in streams.values()] + srates = [stream['info'][srate_key] for stream in streams.values()] + max_fs = max(srates, default=0) + + if max_fs == 0: #either no valid stream or all streams are async + return streams + if srates.count(max_fs) > 1: + # highly unlikely, with floating point precision and sampling noise + # but anyways: better safe than sorry + logger.warning('I found at least two streams with identical effective ' + 'srate. I select one at random for syncing timestamps.') + + fastest_stream = snames[srates.index(max_fs)] + new_timestamps = stamps[srates.index(max_fs)] + + for stream in streams.values(): + channel_format = stream['info']['channel_format'][0] + + if stream['info']['source_id'] == fastest_stream: + # no need to interpolate the fastest stream + continue + if ( (channel_format == 'string') and + (stream['info'][srate_key]== 0) ): + # you can't really interpolate strings; and streams with srate=0 + # don't have a real sampling rate. One approach to sync them is to + # shift their events to the nearest timestamp of the fastest + # timeseries + shifted_x = [] + for x in stream['time_stamps']: + argmin = (abs(new_timestamps-x)).argmin() + shifted_x.append(new_timestamps[argmin]) + + shifted_y = [] + for x in new_timestamps: + try: + idx = shifted_x.index(x) + y = stream['time_series'][idx] + shifted_y.append(y) + except ValueError: + shifted_y.append(['']) + + stream['time_series'] = np.asanyarray((shifted_y)) + stream['time_stamps'] = new_timestamps + + elif channel_format == 'float32': + #a float32 can be interpolated with interp1d + # bounds_error=False replaces everything outside of interpolation + # zone with NaNs + y = stream['time_series'] + x = stream['time_stamps'] + f = interp1d(x, y, kind=kind, axis=0, + assume_sorted=True, #speed up + bounds_error=False) + + stream['time_series'] = f(new_timestamps) + stream['time_stamps'] = new_timestamps + stream['info']['effective_srate'] = max_fs + else: + raise NotImplementedError("Don't know how to sync sampling for " + + 'channel_format={}'.format( + channel_format)) + return streams + # noinspection PyTypeChecker def _robust_fit(A, y, rho=1, iters=1000): """Perform a robust linear regression using the Huber loss function. From 860f44c8b07c278b60fed5cbe91dc377734107e1 Mon Sep 17 00:00:00 2001 From: agricolab Date: Thu, 13 Dec 2018 10:17:37 +0100 Subject: [PATCH 02/12] Extrapolate timestamps for sync The timestamps of the fastest sampled stream are extrapolated to include the earliert and latest timestamp of any stream --- Python/pyxdf/pyxdf.py | 39 ++++++++++++++++++++++++++------------- 1 file changed, 26 insertions(+), 13 deletions(-) diff --git a/Python/pyxdf/pyxdf.py b/Python/pyxdf/pyxdf.py index bb7be2e..1216c24 100644 --- a/Python/pyxdf/pyxdf.py +++ b/Python/pyxdf/pyxdf.py @@ -14,6 +14,7 @@ import logging from scipy.interpolate import interp1d import numpy as np +from collections import deque __all__ = ['load_xdf'] __version__ = '1.14.0' @@ -383,7 +384,7 @@ def __init__(self, xml): stream['info']['effective_srate'] = tmp.effective_srate stream['time_series'] = tmp.time_series stream['time_stamps'] = tmp.time_stamps - + # sync sampling with the fastest timeseries by interpolation / shifting if sync_timestamps: if type(sync_timestamps) is not str: @@ -572,10 +573,9 @@ def _jitter_removal(streams, def _sync_timestamps(streams, kind='linear'): - srate_key = 'effective_srate' - snames = [stream['info']['source_id'] for stream in streams.values()] - stamps = [stream['time_stamps'] for stream in streams.values()] + # selecting the stream with the highest effective sampling rate + srate_key = 'effective_srate' srates = [stream['info'][srate_key] for stream in streams.values()] max_fs = max(srates, default=0) @@ -586,22 +586,35 @@ def _sync_timestamps(streams, kind='linear'): # but anyways: better safe than sorry logger.warning('I found at least two streams with identical effective ' 'srate. I select one at random for syncing timestamps.') - - fastest_stream = snames[srates.index(max_fs)] - new_timestamps = stamps[srates.index(max_fs)] - + + # selecting maximal time range of the whole recording + # streams with fs=0 might are not dejittered be default, and therefore + # indexing first and last might miss the earliest/latest + # we therefore take the min and max timestamp + stamps = [stream['time_stamps'] for stream in streams.values()] + ts_first = min((min(s) for s in stamps)) + ts_last = max((max(s) for s in stamps)) + + # generate new timestamps + # based on extrapolation of the fastest timestamps towards the maximal + # time range of the whole recording + new_timestamps = deque(stamps[srates.index(max_fs)]) + fs_step = 1.0/max_fs + while new_timestamps[0] > ts_first: + new_timestamps.appendleft(new_timestamps[0]-fs_step) + while new_timestamps[-1] < ts_last: + new_timestamps.append(new_timestamps[-1]+fs_step) + + # interpolate or shift all streams to the new timestamps for stream in streams.values(): channel_format = stream['info']['channel_format'][0] - if stream['info']['source_id'] == fastest_stream: - # no need to interpolate the fastest stream - continue if ( (channel_format == 'string') and (stream['info'][srate_key]== 0) ): # you can't really interpolate strings; and streams with srate=0 # don't have a real sampling rate. One approach to sync them is to - # shift their events to the nearest timestamp of the fastest - # timeseries + # shift their events to the nearest timestamp of the new + # timestamps shifted_x = [] for x in stream['time_stamps']: argmin = (abs(new_timestamps-x)).argmin() From ead7568433116f970e4f820d7258f3e9211e2df4 Mon Sep 17 00:00:00 2001 From: agricolab Date: Thu, 13 Dec 2018 11:16:37 +0100 Subject: [PATCH 03/12] Implement limit to overlap Additional argument which either limits all streams to overlapping recordings, or not --- Python/pyxdf/pyxdf.py | 56 +++++++++++++++++++++++++++++++++++++++---- 1 file changed, 52 insertions(+), 4 deletions(-) diff --git a/Python/pyxdf/pyxdf.py b/Python/pyxdf/pyxdf.py index 1216c24..6e5fd6d 100644 --- a/Python/pyxdf/pyxdf.py +++ b/Python/pyxdf/pyxdf.py @@ -28,6 +28,7 @@ def load_xdf(filename, handle_clock_resets=True, dejitter_timestamps=True, sync_timestamps=False, + overlap_timestamps=False, jitter_break_threshold_seconds=1, jitter_break_threshold_samples=500, clock_reset_threshold_seconds=5, @@ -65,8 +66,16 @@ def load_xdf(filename, True -> linear syncing str:<'linear’, ‘nearest’, ‘zero’, ‘slinear’, ‘quadratic’, ‘cubic’, ‘previous’, ‘next’> for method inherited from - scipy.interpolate.interp1d - + scipy.interpolate.interp1d + + overlap_timestamps: bool + If true, return only overlapping streams, i.e. all streams + are limited to periods where all streams have data. (default: True) + If false, depends on whether sync_timestamps is set. If set, it + expands all streams to include the earliest and latest timestamp of + any stream, if not set it simply return streams independently. + + on_chunk : Function that is called for each chunk of data as it is being retrieved from the file; the function is allowed to modify the data (for example, sub-sample it). The four input arguments @@ -358,7 +367,8 @@ def __init__(self, xml): else: stream.time_series = np.zeros((stream.nchns, 0)) - # perform (fault-tolerant) clock synchronization if requested + + # perform (fault-tolerant) clock synchronization if requested if synchronize_clocks: logger.info(' performing clock synchronization...') temp = _clock_sync(temp, handle_clock_resets, @@ -385,6 +395,8 @@ def __init__(self, xml): stream['time_series'] = tmp.time_series stream['time_stamps'] = tmp.time_stamps + + return streams, fileheader # sync sampling with the fastest timeseries by interpolation / shifting if sync_timestamps: if type(sync_timestamps) is not str: @@ -392,6 +404,12 @@ def __init__(self, xml): logger.warning('sync_timestamps defaults to "linear"') streams = _sync_timestamps(streams, kind=sync_timestamps) + + # limit streams to their overlapping periods + if overlap_timestamps: + streams = _limit_streams_to_overlap(streams) + + streams = [s for s in streams.values()] sort_data = [s['info']['name'][0] for s in streams] streams = [x for _, x in sorted(zip(sort_data, streams))] @@ -604,7 +622,8 @@ def _sync_timestamps(streams, kind='linear'): new_timestamps.appendleft(new_timestamps[0]-fs_step) while new_timestamps[-1] < ts_last: new_timestamps.append(new_timestamps[-1]+fs_step) - + + new_timestamps = np.asanyarray(new_timestamps) # interpolate or shift all streams to the new timestamps for stream in streams.values(): channel_format = stream['info']['channel_format'][0] @@ -651,6 +670,35 @@ def _sync_timestamps(streams, kind='linear'): channel_format)) return streams + +def _limit_streams_to_overlap(streams): + + ts_first, ts_last = [], [] + for stream in streams.values(): + # skip streams with fs=0, because they might just not have + # send anything on purpose (e.g. markers) - while data is already + # being recorded. wouldNät make sense to throw that away + if stream['info']['effective_srate'] !=0: + # extrapolation in _sync_timestamps is done with NaNs + not_extrapolated = np.where(~np.isnan(stream['time_series']))[0] + ts_first.append(min(stream['time_stamps'][not_extrapolated])) + ts_last.append(max(stream['time_stamps'][not_extrapolated])) + + ts_first = max(ts_first) + ts_last = min(ts_last) + + for stream in streams.values(): + a = np.where(stream['time_stamps']>=ts_first)[0] + b = np.where(stream['time_stamps']<=ts_last)[0] + select = np.intersect1d(a,b) + stream['time_stamps'] = stream['time_stamps'][select] + if type(stream['time_series']) is list: + stream['time_series'] = [stream['time_series'][s] for s in select] + else: + stream['time_series'] = stream['time_series'][select] + + return streams + # noinspection PyTypeChecker def _robust_fit(A, y, rho=1, iters=1000): """Perform a robust linear regression using the Huber loss function. From 9a8692a73505500b5bf18d54f8e238fc81a77349 Mon Sep 17 00:00:00 2001 From: agricolab Date: Sun, 16 Dec 2018 21:54:50 +0100 Subject: [PATCH 04/12] Write test for _sync_timestamps Using pytest Fix bug in pyxdf where i forgot to remove a return statement used for develop,ment --- Python/pyxdf/pyxdf.py | 20 ++--- .../test/test_limit_streams_to_overlap.py | 4 + Python/pyxdf/test/test_sync_timestamps.py | 78 +++++++++++++++++++ 3 files changed, 93 insertions(+), 9 deletions(-) create mode 100644 Python/pyxdf/test/test_limit_streams_to_overlap.py create mode 100644 Python/pyxdf/test/test_sync_timestamps.py diff --git a/Python/pyxdf/pyxdf.py b/Python/pyxdf/pyxdf.py index 6e5fd6d..fdf48e0 100644 --- a/Python/pyxdf/pyxdf.py +++ b/Python/pyxdf/pyxdf.py @@ -14,7 +14,6 @@ import logging from scipy.interpolate import interp1d import numpy as np -from collections import deque __all__ = ['load_xdf'] __version__ = '1.14.0' @@ -396,7 +395,6 @@ def __init__(self, xml): stream['time_stamps'] = tmp.time_stamps - return streams, fileheader # sync sampling with the fastest timeseries by interpolation / shifting if sync_timestamps: if type(sync_timestamps) is not str: @@ -616,14 +614,18 @@ def _sync_timestamps(streams, kind='linear'): # generate new timestamps # based on extrapolation of the fastest timestamps towards the maximal # time range of the whole recording - new_timestamps = deque(stamps[srates.index(max_fs)]) fs_step = 1.0/max_fs - while new_timestamps[0] > ts_first: - new_timestamps.appendleft(new_timestamps[0]-fs_step) - while new_timestamps[-1] < ts_last: - new_timestamps.append(new_timestamps[-1]+fs_step) + new_timestamps = stamps[srates.index(max_fs)] + num_steps = int((new_timestamps[0]-ts_first)/fs_step) + 1 + front_stamps = np.linspace(ts_first, new_timestamps[0], num_steps) + num_steps = int((ts_last-new_timestamps[-1])/fs_step) + 1 + end_stamps = np.linspace(new_timestamps[-1], ts_last, num_steps) + + new_timestamps = np.concatenate((front_stamps, + new_timestamps[1:-1], + end_stamps), + axis=0) - new_timestamps = np.asanyarray(new_timestamps) # interpolate or shift all streams to the new timestamps for stream in streams.values(): channel_format = stream['info']['channel_format'][0] @@ -644,7 +646,7 @@ def _sync_timestamps(streams, kind='linear'): try: idx = shifted_x.index(x) y = stream['time_series'][idx] - shifted_y.append(y) + shifted_y.append([y]) except ValueError: shifted_y.append(['']) diff --git a/Python/pyxdf/test/test_limit_streams_to_overlap.py b/Python/pyxdf/test/test_limit_streams_to_overlap.py new file mode 100644 index 0000000..6500c63 --- /dev/null +++ b/Python/pyxdf/test/test_limit_streams_to_overlap.py @@ -0,0 +1,4 @@ +from pyxdf.pyxdf import _limit_streams_to_overlap + + + diff --git a/Python/pyxdf/test/test_sync_timestamps.py b/Python/pyxdf/test/test_sync_timestamps.py new file mode 100644 index 0000000..595af6f --- /dev/null +++ b/Python/pyxdf/test/test_sync_timestamps.py @@ -0,0 +1,78 @@ +from pyxdf.pyxdf import _sync_timestamps +import numpy as np +from copy import deepcopy +import pytest +# %% +@pytest.fixture(scope='session') +def streams(): + #generate mock-streams + class MockStream(dict): + + def __init__(self, timestamps, timeseries, effective_srate, channel_format): + self['time_series'] = timeseries + self['time_stamps'] = timestamps + self['info'] = {} + self['info']['effective_srate'] = effective_srate + self['info']['channel_format'] = channel_format + + streams = {} + # fastest stream, latest timestamp + streams[1] = MockStream(np.linspace(1,2,1001), + np.linspace(1,2,1001), + 1000, ['float32']) + + # slowest stream, earliest timestamp + streams[2] = MockStream(np.linspace(0.1,1.1,251), + np.linspace(2,1,251), + 250, ['float32']) + + # marker stream + streams[3] = MockStream([0.2, 1.1071, 1.2, 1.9, 2.5], + ['mark_' + str(n) for n in range(0,5,1)], + 0, ['string']) + return streams + +@pytest.fixture(scope='session') +def synced(streams): + synced = _sync_timestamps(deepcopy(streams)) + return synced + +#%% test +def test_earliest(synced): + 'should expand to the earliest timestamp, which is 0.1 from stream 2' + for stream in synced.values(): + assert np.isclose(stream['time_stamps'][0], 0.1) + +def test_lastest(synced): + 'should expand to the latest timestamp, which is 2.5 from stream 3' + for stream in synced.values(): + assert np.isclose(stream['time_stamps'][-1], 2.5) + +def test_identical_steps(synced): + for stream in synced.values(): + 'all steos between samples should be identical within float precision' + uq = np.diff(stream['time_stamps']) + assert np.all([np.isclose(u, uq) for u in uq]) + +def test_markerstream(synced, streams): + '''check markers, they should be identical to the original but the second, + which should be shifted towards the next valid sample of the fastest + stream''' + idx = [] + for marker in streams[3]['time_series']: + idx.append(synced[3]['time_series'].tolist().index([marker])) + assert np.all(np.equal(synced[3]['time_stamps'][idx], + np.array([0.2 , 1.107, 1.2 , 1.9 , 2.5 ]))) + +def test_interpolation(synced): + '''the linearly to 1000 Hz interpolated data from stream 2 should + be identical with a linspace with 1001 samples''' + idx = np.where(~np.isnan(synced[2]['time_series']))[0] + assert np.all(np.isclose(synced[2]['time_series'][idx], + np.linspace(2, 1, 1001))) + +def test_identity_after_interpolation(synced, streams): + 'the data for stream 1 should be identical to original' + idx = np.where(~np.isnan(synced[1]['time_series']))[0] + assert np.all(np.isclose(synced[1]['time_series'][idx], + streams[1]['time_series'])) From ed8fe6dfa5e792b5d984cf3f089058e875a92c75 Mon Sep 17 00:00:00 2001 From: agricolab Date: Mon, 17 Dec 2018 22:18:09 +0100 Subject: [PATCH 05/12] Write test for _limit_to_overlap for streams with unsynced timestamps Fix a bug caused by floating point unprecision --- Python/pyxdf/pyxdf.py | 13 ++- .../test/test_limit_streams_to_overlap.py | 81 ++++++++++++++++++- 2 files changed, 89 insertions(+), 5 deletions(-) diff --git a/Python/pyxdf/pyxdf.py b/Python/pyxdf/pyxdf.py index fdf48e0..b147b7f 100644 --- a/Python/pyxdf/pyxdf.py +++ b/Python/pyxdf/pyxdf.py @@ -688,12 +688,17 @@ def _limit_streams_to_overlap(streams): ts_first = max(ts_first) ts_last = min(ts_last) - for stream in streams.values(): - a = np.where(stream['time_stamps']>=ts_first)[0] - b = np.where(stream['time_stamps']<=ts_last)[0] + # use np.around to prevent floating point hickups + around = np.around(stream['time_stamps'], 15) + a = np.where(around>=ts_first)[0] + b = np.where(around<=ts_last)[0] select = np.intersect1d(a,b) - stream['time_stamps'] = stream['time_stamps'][select] + if type(stream['time_stamps']) is list: + stream['time_stamps'] = [stream['time_stamps'][s] for s in select] + else: + stream['time_stamps'] = stream['time_stamps'][select] + if type(stream['time_series']) is list: stream['time_series'] = [stream['time_series'][s] for s in select] else: diff --git a/Python/pyxdf/test/test_limit_streams_to_overlap.py b/Python/pyxdf/test/test_limit_streams_to_overlap.py index 6500c63..4d84916 100644 --- a/Python/pyxdf/test/test_limit_streams_to_overlap.py +++ b/Python/pyxdf/test/test_limit_streams_to_overlap.py @@ -1,4 +1,83 @@ from pyxdf.pyxdf import _limit_streams_to_overlap +import numpy as np +# %% +def streams(): + #generate mock-streams + class MockStream(dict): + + def __init__(self, timestamps, timeseries, effective_srate, channel_format): + self['time_series'] = timeseries + self['time_stamps'] = timestamps + self['info'] = {} + self['info']['effective_srate'] = effective_srate + self['info']['channel_format'] = channel_format + + streams = {} + # fastest stream, latest timestamp + streams[1] = MockStream(np.linspace(1,2,1001), + np.linspace(1,2,1001), + 1000, ['float32']) + + # slowest stream, earliest timestamp + streams[2] = MockStream(np.linspace(0.4,1.4,251), + np.linspace(0.4,1.4,251), + 250, ['float32']) + + # marker stream + streams[3] = MockStream([0.2, 1.1071, 1.2, 1.9, 2.5], + ['mark_' + str(n) for n in range(0,5,1)], + 0, ['string']) + return streams +# %% test + +def test_earliest_marker_stamp(): + 'check validity of first element in marker stream' + olap = _limit_streams_to_overlap(streams()) + assert np.isclose(olap[3]['time_stamps'][0], 1.1071) + +def test_latest_marker_stamp(): + 'check validity of first element in marker stream' + olap = _limit_streams_to_overlap(streams()) + assert np.isclose(olap[3]['time_stamps'][-1], 1.2) - +def test_earliest_marker_series(): + olap = _limit_streams_to_overlap(streams()) + assert olap[3]['time_series'][0] == 'mark_1' + +def test_latest_marker_series(): + olap = _limit_streams_to_overlap(streams()) + assert olap[3]['time_series'][-1] == 'mark_2' + +def test_earliest_fast_stamp(): + olap = _limit_streams_to_overlap(streams()) + assert np.isclose(olap[1]['time_stamps'][0], 1.) + +def test_earliest_fast_series(): + olap = _limit_streams_to_overlap(streams()) + assert np.isclose(olap[1]['time_stamps'][0], 1.) + +def test_latest_fast_stamp(): + olap = _limit_streams_to_overlap(streams()) + assert np.isclose(olap[1]['time_stamps'][-1], 1.4) + +def test_latest_fast_series(): + olap = _limit_streams_to_overlap(streams()) + assert np.isclose(olap[1]['time_stamps'][-1], 1.4) + +def test_earliest_slow_stamp(): + olap = _limit_streams_to_overlap(streams()) + assert np.isclose(olap[1]['time_stamps'][0], 1.) + +def test_earliest_slow_series(): + olap = _limit_streams_to_overlap(streams()) + assert np.isclose(olap[1]['time_stamps'][0], 1.) + +def test_latest_slow_stamp(): + olap = _limit_streams_to_overlap(streams()) + assert np.isclose(olap[1]['time_stamps'][-1], 1.4) + +def test_latest_slow_series(): + olap = _limit_streams_to_overlap(streams()) + assert np.isclose(olap[1]['time_stamps'][-1], 1.4) + From f55ad95874e67bb8dee67e808ee6d3543b8142a0 Mon Sep 17 00:00:00 2001 From: agricolab Date: Thu, 20 Dec 2018 18:33:13 +0100 Subject: [PATCH 06/12] Interpolate also integer channel_formats --- Python/pyxdf/pyxdf.py | 16 ++++++++++++++-- Python/pyxdf/test/test_sync_timestamps.py | 14 +++++++++++++- 2 files changed, 27 insertions(+), 3 deletions(-) diff --git a/Python/pyxdf/pyxdf.py b/Python/pyxdf/pyxdf.py index b147b7f..c6ff454 100644 --- a/Python/pyxdf/pyxdf.py +++ b/Python/pyxdf/pyxdf.py @@ -589,6 +589,9 @@ def _jitter_removal(streams, def _sync_timestamps(streams, kind='linear'): + '''syncs all streams to the fastest sampling rate by shifting or + upsampling + ''' # selecting the stream with the highest effective sampling rate srate_key = 'effective_srate' @@ -653,8 +656,10 @@ def _sync_timestamps(streams, kind='linear'): stream['time_series'] = np.asanyarray((shifted_y)) stream['time_stamps'] = new_timestamps - elif channel_format == 'float32': - #a float32 can be interpolated with interp1d + elif channel_format in ['float32', 'double64', 'int8', 'int16', + 'int32', 'int64']: + # continuous interpolation is possible using interp1d + # discrete interpolation requires some finetuning # bounds_error=False replaces everything outside of interpolation # zone with NaNs y = stream['time_series'] @@ -666,6 +671,13 @@ def _sync_timestamps(streams, kind='linear'): stream['time_series'] = f(new_timestamps) stream['time_stamps'] = new_timestamps stream['info']['effective_srate'] = max_fs + + if channel_format in ['int8', 'int16', 'int32', 'int64']: + # i am stuck with float64s, as integers have no nans + # therefore i round to the nearest integer instead + stream['time_series'] = np.around(stream['time_series'], 0) + + else: raise NotImplementedError("Don't know how to sync sampling for " + 'channel_format={}'.format( diff --git a/Python/pyxdf/test/test_sync_timestamps.py b/Python/pyxdf/test/test_sync_timestamps.py index 595af6f..5a3eb5d 100644 --- a/Python/pyxdf/test/test_sync_timestamps.py +++ b/Python/pyxdf/test/test_sync_timestamps.py @@ -30,6 +30,11 @@ def __init__(self, timestamps, timeseries, effective_srate, channel_format): streams[3] = MockStream([0.2, 1.1071, 1.2, 1.9, 2.5], ['mark_' + str(n) for n in range(0,5,1)], 0, ['string']) + # integer datatype stream, + streams[4] = MockStream(np.linspace(0.1,1.1,251), + np.linspace(0, 250, 251, dtype='int32'), + 250, ['int32']) + return streams @pytest.fixture(scope='session') @@ -70,9 +75,16 @@ def test_interpolation(synced): idx = np.where(~np.isnan(synced[2]['time_series']))[0] assert np.all(np.isclose(synced[2]['time_series'][idx], np.linspace(2, 1, 1001))) - + + def test_identity_after_interpolation(synced, streams): 'the data for stream 1 should be identical to original' idx = np.where(~np.isnan(synced[1]['time_series']))[0] assert np.all(np.isclose(synced[1]['time_series'][idx], streams[1]['time_series'])) + +def test_integer_interpolation(synced, streams): + 'the data of stream 4 should be all integers' + u = np.unique(synced[4]['time_series']) + u = np.int64(np.compress(~np.isnan(u), u)) + assert np.all(streams[4]['time_series'] == u) From b66c9248cfdb20617fee44b1e86686603a408e6b Mon Sep 17 00:00:00 2001 From: agricolab Date: Mon, 31 Dec 2018 14:46:20 +0100 Subject: [PATCH 07/12] Write tests for overlapping synced streams Fix bug with marker streams where sampling rate is not set after upsampling --- Python/pyxdf/pyxdf.py | 14 ++-- .../pyxdf/test/test_limit_on_synced_stamps.py | 71 +++++++++++++++++++ Python/pyxdf/test/test_sync_timestamps.py | 6 ++ 3 files changed, 85 insertions(+), 6 deletions(-) create mode 100644 Python/pyxdf/test/test_limit_on_synced_stamps.py diff --git a/Python/pyxdf/pyxdf.py b/Python/pyxdf/pyxdf.py index c6ff454..f302b26 100644 --- a/Python/pyxdf/pyxdf.py +++ b/Python/pyxdf/pyxdf.py @@ -670,8 +670,7 @@ def _sync_timestamps(streams, kind='linear'): stream['time_series'] = f(new_timestamps) stream['time_stamps'] = new_timestamps - stream['info']['effective_srate'] = max_fs - + if channel_format in ['int8', 'int16', 'int32', 'int64']: # i am stuck with float64s, as integers have no nans # therefore i round to the nearest integer instead @@ -682,6 +681,8 @@ def _sync_timestamps(streams, kind='linear'): raise NotImplementedError("Don't know how to sync sampling for " + 'channel_format={}'.format( channel_format)) + stream['info']['effective_srate'] = max_fs + return streams @@ -689,10 +690,11 @@ def _limit_streams_to_overlap(streams): ts_first, ts_last = [], [] for stream in streams.values(): - # skip streams with fs=0, because they might just not have - # send anything on purpose (e.g. markers) - while data is already - # being recorded. wouldNät make sense to throw that away - if stream['info']['effective_srate'] !=0: + # skip streams with fs=0 or if they send strings, because they might + # just not yet have send anything on purpose (i.e. markers) + # while other data was already being recorded. + if (stream['info']['effective_srate'] !=0 and + stream['info']['channel_format'][0] != 'string'): # extrapolation in _sync_timestamps is done with NaNs not_extrapolated = np.where(~np.isnan(stream['time_series']))[0] ts_first.append(min(stream['time_stamps'][not_extrapolated])) diff --git a/Python/pyxdf/test/test_limit_on_synced_stamps.py b/Python/pyxdf/test/test_limit_on_synced_stamps.py new file mode 100644 index 0000000..514473b --- /dev/null +++ b/Python/pyxdf/test/test_limit_on_synced_stamps.py @@ -0,0 +1,71 @@ +from pyxdf.pyxdf import _sync_timestamps, _limit_streams_to_overlap +import numpy as np +# %% +def streams(): + #generate mock-streams + class MockStream(dict): + + def __init__(self, timestamps, timeseries, effective_srate, channel_format): + self['time_series'] = timeseries + self['time_stamps'] = timestamps + self['info'] = {} + self['info']['effective_srate'] = effective_srate + self['info']['channel_format'] = channel_format + + streams = {} + # fastest stream, latest timestamp + streams[1] = MockStream(np.linspace(1,2,1001), + np.linspace(1,2,1001), + 1000, ['float32']) + + # slowest stream, earliest timestamp + streams[2] = MockStream(np.linspace(0.5,1.5,251), + np.linspace(.5, 1.5, 251), + 250, ['float32']) + + # marker stream + streams[3] = MockStream([0.2, 1.1071, 1.2, 1.9, 2.5], + ['mark_' + str(n) for n in range(0,5,1)], + 0, ['string']) + # integer datatype stream, + streams[4] = MockStream(np.linspace(0.4,1.4,251), + np.linspace(4, 140, 251, dtype='int32'), + 250, ['int32']) + + return streams + +def mock(): + synced = _sync_timestamps(streams()) + return _limit_streams_to_overlap(synced) + +#%% test +# earliest overlapping timestamp is 1.0 from stream 1 +# latest overlapping timestamp is 1.4 from stream 4 +# fastest strteam is stream 1 with fs 1000 +def test_timestamps(): + 'check timestamps streams' + for s in mock().values(): + assert np.all(np.isclose(s['time_stamps'], np.linspace(1, 1.4, 401))) + +def test_first_timeseries(): + assert np.all(np.isclose(mock()[1]['time_series'], + np.linspace(1, 1.4, 401))) + +def test_second_timeseries(): + assert np.all(np.isclose(mock()[2]['time_series'], + np.linspace(1, 1.4, 401))) + +def test_third_timeseries(): + s = mock()[3]['time_series'] + idx = np.where(s!='')[0] + assert np.all(idx == [107, 200]) + assert mock()[3]['time_stamps'][idx[0]] == 1.107 # shifted to closest fit + assert mock()[3]['time_stamps'][idx[1]] == 1.2 # fits with fs 1000 + +def test_fourth_timeseries(): + # interpolation is tricky, as it depends on np.around and linear + # interpolation, which can not be approximated with np.linspace + # we therefore only test first and last value of the series + s = mock()[4]['time_series'] + assert np.isclose(s[0], 85) + assert np.isclose(s[-1], 140) \ No newline at end of file diff --git a/Python/pyxdf/test/test_sync_timestamps.py b/Python/pyxdf/test/test_sync_timestamps.py index 5a3eb5d..7504327 100644 --- a/Python/pyxdf/test/test_sync_timestamps.py +++ b/Python/pyxdf/test/test_sync_timestamps.py @@ -43,6 +43,11 @@ def synced(streams): return synced #%% test +def test_samplingrate(synced): + 'check that for all streams the sampling rate is identical to the fastest' + for s in synced.values(): + assert(s['info']['effective_srate']==1000) + def test_earliest(synced): 'should expand to the earliest timestamp, which is 0.1 from stream 2' for stream in synced.values(): @@ -88,3 +93,4 @@ def test_integer_interpolation(synced, streams): u = np.unique(synced[4]['time_series']) u = np.int64(np.compress(~np.isnan(u), u)) assert np.all(streams[4]['time_series'] == u) + \ No newline at end of file From 22b4beb085fb3b877585c7344be739f1934773de Mon Sep 17 00:00:00 2001 From: agricolab Date: Mon, 31 Dec 2018 20:47:23 +0100 Subject: [PATCH 08/12] Refactor tests and add extrapolation test --- .../test/test_limit_streams_to_overlap.py | 68 ++++++------------- Python/pyxdf/test/test_sync_timestamps.py | 66 +++++++++++++----- 2 files changed, 68 insertions(+), 66 deletions(-) diff --git a/Python/pyxdf/test/test_limit_streams_to_overlap.py b/Python/pyxdf/test/test_limit_streams_to_overlap.py index 4d84916..791cc10 100644 --- a/Python/pyxdf/test/test_limit_streams_to_overlap.py +++ b/Python/pyxdf/test/test_limit_streams_to_overlap.py @@ -30,54 +30,24 @@ def __init__(self, timestamps, timeseries, effective_srate, channel_format): return streams # %% test - -def test_earliest_marker_stamp(): - 'check validity of first element in marker stream' - olap = _limit_streams_to_overlap(streams()) - assert np.isclose(olap[3]['time_stamps'][0], 1.1071) - -def test_latest_marker_stamp(): - 'check validity of first element in marker stream' - olap = _limit_streams_to_overlap(streams()) - assert np.isclose(olap[3]['time_stamps'][-1], 1.2) -def test_earliest_marker_series(): - olap = _limit_streams_to_overlap(streams()) - assert olap[3]['time_series'][0] == 'mark_1' - -def test_latest_marker_series(): - olap = _limit_streams_to_overlap(streams()) - assert olap[3]['time_series'][-1] == 'mark_2' - -def test_earliest_fast_stamp(): - olap = _limit_streams_to_overlap(streams()) - assert np.isclose(olap[1]['time_stamps'][0], 1.) - -def test_earliest_fast_series(): - olap = _limit_streams_to_overlap(streams()) - assert np.isclose(olap[1]['time_stamps'][0], 1.) - -def test_latest_fast_stamp(): - olap = _limit_streams_to_overlap(streams()) - assert np.isclose(olap[1]['time_stamps'][-1], 1.4) - -def test_latest_fast_series(): - olap = _limit_streams_to_overlap(streams()) - assert np.isclose(olap[1]['time_stamps'][-1], 1.4) - -def test_earliest_slow_stamp(): - olap = _limit_streams_to_overlap(streams()) - assert np.isclose(olap[1]['time_stamps'][0], 1.) - -def test_earliest_slow_series(): - olap = _limit_streams_to_overlap(streams()) - assert np.isclose(olap[1]['time_stamps'][0], 1.) - -def test_latest_slow_stamp(): - olap = _limit_streams_to_overlap(streams()) - assert np.isclose(olap[1]['time_stamps'][-1], 1.4) - -def test_latest_slow_series(): - olap = _limit_streams_to_overlap(streams()) - assert np.isclose(olap[1]['time_stamps'][-1], 1.4) +def test_timestamps(): + 'test whether the first and last timestamps have been selected as expected' + olap = _limit_streams_to_overlap(streams()) + for s,v in zip(olap.values(), + [(1, 1.4), (1, 1.4), (1.1071, 1.2)]): + assert np.isclose(s['time_stamps'][0], v[0]) + assert np.isclose(s['time_stamps'][-1], v[-1]) + +def test_timeseries(): + 'test whether the first and last value are as expected' + olap = _limit_streams_to_overlap(streams()) + for s,v in zip(olap.values(), + [(1, 1.4), (1, 1.4), ('mark_1', 'mark_2')]): + if s['info']['channel_format'] != ['string']: + assert np.isclose(s['time_series'][0], v[0]) + assert np.isclose(s['time_series'][-1], v[-1]) + else: + assert s['time_series'][0] == v[0] + assert s['time_series'][-1] == v[-1] diff --git a/Python/pyxdf/test/test_sync_timestamps.py b/Python/pyxdf/test/test_sync_timestamps.py index 7504327..fd00602 100644 --- a/Python/pyxdf/test/test_sync_timestamps.py +++ b/Python/pyxdf/test/test_sync_timestamps.py @@ -48,19 +48,15 @@ def test_samplingrate(synced): for s in synced.values(): assert(s['info']['effective_srate']==1000) -def test_earliest(synced): - 'should expand to the earliest timestamp, which is 0.1 from stream 2' - for stream in synced.values(): - assert np.isclose(stream['time_stamps'][0], 0.1) - -def test_lastest(synced): - 'should expand to the latest timestamp, which is 2.5 from stream 3' - for stream in synced.values(): - assert np.isclose(stream['time_stamps'][-1], 2.5) +def test_timestamps(synced): + '''test whether all timestamps are identical and as expected + earliest timestamp is 0.1 from stream 1,4; latest is 2.5 from stream 3''' + for s in synced.values(): + assert np.all(np.isclose(s['time_stamps'], np.linspace(.1, 2.5, 2401))) -def test_identical_steps(synced): - for stream in synced.values(): - 'all steos between samples should be identical within float precision' +def test_identical_timestamp_steps(synced): + 'all steps between samples should be identical within float precision' + for stream in synced.values(): uq = np.diff(stream['time_stamps']) assert np.all([np.isclose(u, uq) for u in uq]) @@ -72,16 +68,47 @@ def test_markerstream(synced, streams): for marker in streams[3]['time_series']: idx.append(synced[3]['time_series'].tolist().index([marker])) assert np.all(np.equal(synced[3]['time_stamps'][idx], - np.array([0.2 , 1.107, 1.2 , 1.9 , 2.5 ]))) + np.array([0.2 , #identical + 1.107, #shifted + 1.2 , 1.9 , 2.5 ]#shifted + ))) def test_interpolation(synced): '''the linearly to 1000 Hz interpolated data from stream 2 should - be identical with a linspace with 1001 samples''' + be identical with a linspace with 1001 samples.''' idx = np.where(~np.isnan(synced[2]['time_series']))[0] assert np.all(np.isclose(synced[2]['time_series'][idx], np.linspace(2, 1, 1001))) - - + +def test_extrapolation(synced, streams): + '''extrapolated data should be nans or "" for markers ''' + # stream 1 is extrapolated from 0.1 up to 1 and after 2 to 2.5 + idx = np.where(np.isnan(synced[1]['time_series']))[0] + values = synced[1]['time_stamps'][idx] + expectation = np.hstack((np.linspace(.1, 1, 900, endpoint=False), + np.linspace(2.001, 2.5, 500, endpoint=True))) + assert np.all(np.isclose(values, expectation)) + + # stream 2 is extrapolated from after 1.1 to 2.5 + idx = np.where(np.isnan(synced[2]['time_series']))[0] + values = synced[2]['time_stamps'][idx] + expectation = np.linspace(1.101, 2.5, num=1400, endpoint=True) + assert np.all(np.isclose(values, expectation)) + + # stream 3 is extrapolated everywhere but at the marker stamps + # can be tested easier by checking whether the count of [''] + # equals the total samples minus the original markers + values = np.where(synced[3]['time_series']==[''])[0].shape[0] + expectation = (synced[3]['time_series'].shape[0] - + len(streams[3]['time_series'])) + assert values == expectation + + # stream 4 is extrapolated from after 1.1 to 2.5 + idx = np.where(np.isnan(synced[4]['time_series']))[0] + values = synced[4]['time_stamps'][idx] + expectation = np.linspace(1.101, 2.5, num=1400, endpoint=True) + assert np.all(np.isclose(values, expectation)) + def test_identity_after_interpolation(synced, streams): 'the data for stream 1 should be identical to original' idx = np.where(~np.isnan(synced[1]['time_series']))[0] @@ -89,7 +116,12 @@ def test_identity_after_interpolation(synced, streams): streams[1]['time_series'])) def test_integer_interpolation(synced, streams): - 'the data of stream 4 should be all integers' + '''the data of stream 4 should be all integers. + + .. note:: + interpolation is tricky, as it depends on np.around and linear + interpolation, which can not be approximated with np.linspace + ''' u = np.unique(synced[4]['time_series']) u = np.int64(np.compress(~np.isnan(u), u)) assert np.all(streams[4]['time_series'] == u) From 94bcb11d617ad90971ab4fec6d054f53d81407c2 Mon Sep 17 00:00:00 2001 From: agricolab Date: Mon, 31 Dec 2018 21:23:15 +0100 Subject: [PATCH 09/12] Add docstrings to private functions Add docstring for _sync_timestamps Add docstring for _limit_streams_to_overlap --- Python/pyxdf/pyxdf.py | 24 ++++++++++++++++++++++++ 1 file changed, 24 insertions(+) diff --git a/Python/pyxdf/pyxdf.py b/Python/pyxdf/pyxdf.py index f302b26..49f5f0c 100644 --- a/Python/pyxdf/pyxdf.py +++ b/Python/pyxdf/pyxdf.py @@ -591,6 +591,22 @@ def _jitter_removal(streams, def _sync_timestamps(streams, kind='linear'): '''syncs all streams to the fastest sampling rate by shifting or upsampling + + Depending on a streams channel-format, extrapolation is performed using + with NaNs (numerical formats) or with [''] (string format). + + Interpolation is only performed for numeric values, and depending on the + kind argument which is inherited from scipy.interpolate.interp1d. Consider + that If the channel format is an integer type (i.e. 'int8', 'int16', + 'int32', or 'int64'), integer output is enforced by rounding the values. + Additionally, consider that not all interpolation methods are convex, i.e. + for some kinds, you might receive values above or below the desired + integer type. There is no correction implemented for this, as it is assumed + this is a desired behavior if you give the appropriate argument. + + For string formats, events are shifted towards the nearest feasible + timepoint. Any time-stamps without a marker get then assigned an empty + marker, i.e. ['']. ''' # selecting the stream with the highest effective sampling rate @@ -687,7 +703,15 @@ def _sync_timestamps(streams, kind='linear'): def _limit_streams_to_overlap(streams): + '''takes streams, returns streams limited to time periods overlapping + between all streams + The overlapping periods start and end for each streams with the first and + last sample completely within the overlapping period. + If time_stamps have been synced, these are the same time-points for all + streams. Consider that in the case of unsynced time-stamps, the time-stamps + can not be exactly equal! + ''' ts_first, ts_last = [], [] for stream in streams.values(): # skip streams with fs=0 or if they send strings, because they might From 2a606478bfc91e3ea7a34393c2aa8cc06ce383a4 Mon Sep 17 00:00:00 2001 From: agricolab Date: Sat, 12 Jan 2019 08:51:51 +0100 Subject: [PATCH 10/12] Move scipy import into sync_timestamp --- Python/pyxdf/pyxdf.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/Python/pyxdf/pyxdf.py b/Python/pyxdf/pyxdf.py index 49f5f0c..a9b8126 100644 --- a/Python/pyxdf/pyxdf.py +++ b/Python/pyxdf/pyxdf.py @@ -12,7 +12,6 @@ import xml.etree.ElementTree as ET from collections import OrderedDict, defaultdict import logging -from scipy.interpolate import interp1d import numpy as np __all__ = ['load_xdf'] @@ -59,13 +58,14 @@ def load_xdf(filename, sync_timestamps: {bool str} sync timestamps of all streams sample-wise with the stream to the - highest effective sampling rate + highest effective sampling rate. Using sync_timestamps method has + a dependency on scipy, which is not a hard requirement of pyxdf. False -> no syncing True -> linear syncing str:<'linear’, ‘nearest’, ‘zero’, ‘slinear’, ‘quadratic’, ‘cubic’, ‘previous’, ‘next’> for method inherited from - scipy.interpolate.interp1d + scipy.interpolate.interp1d. overlap_timestamps: bool If true, return only overlapping streams, i.e. all streams @@ -608,7 +608,7 @@ def _sync_timestamps(streams, kind='linear'): timepoint. Any time-stamps without a marker get then assigned an empty marker, i.e. ['']. ''' - + from scipy.interpolate import interp1d # selecting the stream with the highest effective sampling rate srate_key = 'effective_srate' srates = [stream['info'][srate_key] for stream in streams.values()] From 91547cba9e8ad0b1c31a31ba6deaa118f79d8a94 Mon Sep 17 00:00:00 2001 From: agricolab Date: Sat, 12 Jan 2019 09:19:47 +0100 Subject: [PATCH 11/12] Move interpolation into private function Check dependencies for scipy/numpy and run at least minimal interpolation if scipy is not installed --- Python/pyxdf/pyxdf.py | 25 +++++++++++++++++-------- 1 file changed, 17 insertions(+), 8 deletions(-) diff --git a/Python/pyxdf/pyxdf.py b/Python/pyxdf/pyxdf.py index a9b8126..0ecde76 100644 --- a/Python/pyxdf/pyxdf.py +++ b/Python/pyxdf/pyxdf.py @@ -587,7 +587,20 @@ def _jitter_removal(streams, stream.effective_srate = 0 return streams - +def _interpolate(x:np.ndarray, y:np.ndarray, new_x:np.ndarray, + kind='linear') -> np.ndarray: + try: + from scipy.interpolate import interp1d + f = interp1d(x, y, kind=kind, axis=0, + assume_sorted=True, #speed up + bounds_error=False) + return f(new_x) + except ImportError as e: + if kind != 'linear': + raise e + else: + return np.interp(new_x, xp=x, fp=y, left=np.NaN, right=np.NaN) + def _sync_timestamps(streams, kind='linear'): '''syncs all streams to the fastest sampling rate by shifting or upsampling @@ -607,8 +620,7 @@ def _sync_timestamps(streams, kind='linear'): For string formats, events are shifted towards the nearest feasible timepoint. Any time-stamps without a marker get then assigned an empty marker, i.e. ['']. - ''' - from scipy.interpolate import interp1d + ''' # selecting the stream with the highest effective sampling rate srate_key = 'effective_srate' srates = [stream['info'][srate_key] for stream in streams.values()] @@ -680,11 +692,8 @@ def _sync_timestamps(streams, kind='linear'): # zone with NaNs y = stream['time_series'] x = stream['time_stamps'] - f = interp1d(x, y, kind=kind, axis=0, - assume_sorted=True, #speed up - bounds_error=False) - - stream['time_series'] = f(new_timestamps) + + stream['time_series'] = _interpolate(x, y, new_timestamps, kind) stream['time_stamps'] = new_timestamps if channel_format in ['int8', 'int16', 'int32', 'int64']: From cf8e3f7b9fe8d078b889f9dffc0b158f1897aeaa Mon Sep 17 00:00:00 2001 From: agricolab Date: Sat, 12 Jan 2019 09:54:59 +0100 Subject: [PATCH 12/12] Update docstrings for conditional interpolation --- Python/pyxdf/pyxdf.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/Python/pyxdf/pyxdf.py b/Python/pyxdf/pyxdf.py index 0ecde76..23471ce 100644 --- a/Python/pyxdf/pyxdf.py +++ b/Python/pyxdf/pyxdf.py @@ -58,8 +58,11 @@ def load_xdf(filename, sync_timestamps: {bool str} sync timestamps of all streams sample-wise with the stream to the - highest effective sampling rate. Using sync_timestamps method has - a dependency on scipy, which is not a hard requirement of pyxdf. + highest effective sampling rate. Using sync_timestamps with any + method other than linear has dependency on scipy, which is not a + hard requirement of pyxdf. If scipy is not installed in your + environment, the method supports linear interpolation with + numpy. False -> no syncing True -> linear syncing