From 3db5c6d1a5dec3055ec3e1237d4a45cde97530b8 Mon Sep 17 00:00:00 2001 From: agricolab Date: Fri, 3 Nov 2023 11:31:52 +0100 Subject: [PATCH 01/14] Update to most recent tip of main --- .gitignore | 2 +- pyxdf/pyxdf.py | 231 +++++++++++++++++++- pyxdf/test/conftest.py | 16 ++ pyxdf/test/test_data.py | 64 ++++-- pyxdf/test/test_limit_on_synced_stamps.py | 94 ++++++++ pyxdf/test/test_limit_streams_to_overlap.py | 53 +++++ pyxdf/test/test_sync_timestamps.py | 157 +++++++++++++ 7 files changed, 589 insertions(+), 28 deletions(-) create mode 100644 pyxdf/test/conftest.py create mode 100644 pyxdf/test/test_limit_on_synced_stamps.py create mode 100644 pyxdf/test/test_limit_streams_to_overlap.py create mode 100644 pyxdf/test/test_sync_timestamps.py diff --git a/.gitignore b/.gitignore index e168bc2..296a331 100644 --- a/.gitignore +++ b/.gitignore @@ -2,7 +2,7 @@ __pycache__/ *.py[cod] *$py.class - +example-files # C extensions *.so diff --git a/pyxdf/pyxdf.py b/pyxdf/pyxdf.py index e1f16d5..7eb37f8 100644 --- a/pyxdf/pyxdf.py +++ b/pyxdf/pyxdf.py @@ -17,7 +17,7 @@ from collections import OrderedDict, defaultdict import logging from pathlib import Path - +import warnings import numpy as np @@ -81,7 +81,9 @@ def load_xdf( clock_reset_threshold_offset_seconds=1, clock_reset_threshold_offset_stds=10, winsor_threshold=0.0001, - verbose=None + verbose=None, + sync_timestamps=False, + overlap_timestamps=False, ): """Import an XDF file. @@ -122,6 +124,27 @@ def load_xdf( dejitter_timestamps : Whether to perform jitter removal for regularly sampled streams. (default: true) + sync_timestamps: {bool str} + sync timestamps of all streams sample-wise with the stream to the + highest effective sampling rate. Using sync_timestamps with any + method other than linear has dependency on scipy, which is not a + hard requirement of pyxdf. If scipy is not installed in your + environment, the method supports linear interpolation with + numpy. + + False -> no syncing + True -> linear syncing + str:<'linear’, ‘nearest’, ‘zero’, ‘slinear’, ‘quadratic’, ‘cubic’, + ‘previous’, ‘next’> for method inherited from + scipy.interpolate.interp1d. + + overlap_timestamps: bool + If true, return only overlapping streams, i.e. all streams + are limited to periods where all streams have data. (default: True) + If false, depends on whether sync_timestamps is set. If set, it + expands all streams to include the earliest and latest timestamp of + any stream, if not set it simply return streams independently. + ` on_chunk : Function that is called for each chunk of data as it is being retrieved from the file; the function is allowed to modify the data (for example, sub-sample it). The four input arguments @@ -402,6 +425,17 @@ def load_xdf( stream["time_series"] = tmp.time_series stream["time_stamps"] = tmp.time_stamps + # sync sampling with the fastest timeseries by interpolation / shifting + if sync_timestamps: + if type(sync_timestamps) is not str: + sync_timestamps = "linear" + logger.warning('sync_timestamps defaults to "linear"') + streams = _sync_timestamps(streams, kind=sync_timestamps) + + # limit streams to their overlapping periods + if overlap_timestamps: + streams = _limit_streams_to_overlap(streams) + streams = [s for s in streams.values()] return streams, fileheader @@ -501,7 +535,7 @@ def _xml2dict(t): def _scan_forward(f): """Scan forward through file object until after the next boundary chunk.""" - blocklen = 2 ** 20 + blocklen = 2**20 signature = bytes( [ 0x43, @@ -859,3 +893,194 @@ def _read_chunks(f): def _parse_streamheader(xml): """Parse stream header XML.""" return {el.tag: el.text for el in xml if el.tag != "desc"} + + +def _interpolate( + x: np.ndarray, y: np.ndarray, new_x: np.ndarray, kind="linear" +) -> np.ndarray: + """Perform interpolation for _sync_timestamps + + If scipy is not installed, the method falls back to numpy, and then only + supports linear interpolation. + """ + try: + from scipy.interpolate import interp1d + + f = interp1d( + x, + y, + kind=kind, + axis=0, + assume_sorted=True, # speed up + bounds_error=False, + ) + return f(new_x) + except ImportError as e: + if kind != "linear": + raise e + else: + return np.interp(new_x, xp=x, fp=y, left=np.NaN, right=np.NaN) + + +def _sync_timestamps(streams, kind="linear"): + """Sync all streams to the fastest sampling rate by shifting or upsampling. + + Depending on a streams channel-format, extrapolation is performed using + with NaNs (numerical formats) or with [''] (string format). + + Interpolation is only performed for numeric values, and depending on the + kind argument which is inherited from scipy.interpolate.interp1d. Consider + that If the channel format is an integer type (i.e. 'int8', 'int16', + 'int32', or 'int64'), integer output is enforced by rounding the values. + Additionally, consider that not all interpolation methods are convex, i.e. + for some kinds, you might receive values above or below the desired + integer type. There is no correction implemented for this, as it is assumed + this is a desired behavior if you give the appropriate argument. + + For string formats, events are shifted towards the nearest feasible + timepoint. Any time-stamps without a marker get then assigned an empty + marker, i.e. ['']. + """ + # selecting the stream with the highest effective sampling rate + srate_key = "effective_srate" + srates = [stream["info"][srate_key] for stream in streams.values()] + max_fs = max(srates, default=0) + + if max_fs == 0: # either no valid stream or all streams are async + return streams + if srates.count(max_fs) > 1: + # highly unlikely, with floating point precision and sampling noise + # but anyways: better safe than sorry + logger.warning( + "I found at least two streams with identical effective " + "srate. I select one at random for syncing timestamps." + ) + + # selecting maximal time range of the whole recording + # streams with fs=0 might are not dejittered be default, and therefore + # indexing first and last might miss the earliest/latest + # we therefore take the min and max timestamp + stamps = [stream["time_stamps"] for stream in streams.values()] + ts_first = min((min(s) for s in stamps)) + ts_last = max((max(s) for s in stamps)) + + # generate new timestamps + # based on extrapolation of the fastest timestamps towards the maximal + # time range of the whole recording + fs_step = 1.0 / max_fs + new_timestamps = stamps[srates.index(max_fs)] + num_steps = int((new_timestamps[0] - ts_first) / fs_step) + 1 + front_stamps = np.linspace(ts_first, new_timestamps[0], num_steps) + num_steps = int((ts_last - new_timestamps[-1]) / fs_step) + 1 + end_stamps = np.linspace(new_timestamps[-1], ts_last, num_steps) + + new_timestamps = np.concatenate( + (front_stamps, new_timestamps[1:-1], end_stamps), axis=0 + ) + + # interpolate or shift all streams to the new timestamps + for stream in streams.values(): + channel_format = stream["info"]["channel_format"][0] + + if (channel_format == "string") and (stream["info"][srate_key] == 0): + # you can't really interpolate strings; and streams with srate=0 + # don't have a real sampling rate. One approach to sync them is to + # shift their events to the nearest timestamp of the new + # timestamps + shifted_x = [] + for x in stream["time_stamps"]: + argmin = (abs(new_timestamps - x)).argmin() + shifted_x.append(new_timestamps[argmin]) + + shifted_y = [] + for x in new_timestamps: + try: + idx = shifted_x.index(x) + y = stream["time_series"][idx] + shifted_y.append([y]) + except ValueError: + shifted_y.append([""]) + + stream["time_series"] = np.asanyarray((shifted_y)) + stream["time_stamps"] = new_timestamps + + elif channel_format in [ + "float32", + "double64", + "int8", + "int16", + "int32", + "int64", + ]: + # continuous interpolation is possible using interp1d + # discrete interpolation requires some finetuning + # bounds_error=False replaces everything outside of interpolation + # zone with NaNs + y = stream["time_series"] + x = stream["time_stamps"] + + stream["time_series"] = _interpolate(x, y, new_timestamps, kind) + stream["time_stamps"] = new_timestamps + + if channel_format in ["int8", "int16", "int32", "int64"]: + # i am stuck with float64s, as integers have no nans + # therefore i round to the nearest integer instead + stream["time_series"] = np.around(stream["time_series"], 0) + elif (channel_format == "string") and (stream["info"][srate_key] != 0): + warnings.warn( + "Can't interpolate a channel with channel format string and an effective sampling rate != 0" + ) + else: + raise NotImplementedError( + "Don't know how to sync sampling for " + "channel_format=" + "{}".format(channel_format) + ) + stream["info"]["effective_srate"] = max_fs + + return streams + + +def _limit_streams_to_overlap(streams): + """takes streams, returns streams limited to time periods overlapping + between all streams + + The overlapping periods start and end for each streams with the first and + last sample completely within the overlapping period. + If time_stamps have been synced, these are the same time-points for all + streams. Consider that in the case of unsynced time-stamps, the time-stamps + can not be exactly equal! + """ + ts_first, ts_last = [], [] + for stream in streams.values(): + # skip streams with fs=0 or if they send strings, because they might + # just not yet have send anything on purpose (i.e. markers) + # while other data was already being recorded. + if ( + stream["info"]["effective_srate"] != 0 + and stream["info"]["channel_format"][0] != "string" + ): + # extrapolation in _sync_timestamps is done with NaNs + not_extrapolated = np.where(~np.isnan(stream["time_series"]))[0] + ts_first.append(min(stream["time_stamps"][not_extrapolated])) + ts_last.append(max(stream["time_stamps"][not_extrapolated])) + + ts_first = max(ts_first) + ts_last = min(ts_last) + for stream in streams.values(): + # use np.around to prevent floating point hickups + around = np.around(stream["time_stamps"], 15) + a = np.where(around >= ts_first)[0] + b = np.where(around <= ts_last)[0] + select = np.intersect1d(a, b) + if type(stream["time_stamps"]) is list: + stream["time_stamps"] = [stream["time_stamps"][s] for s in select] + else: + stream["time_stamps"] = stream["time_stamps"][select] + + if type(stream["time_series"]) is list: + stream["time_series"] = [stream["time_series"][s] for s in select] + else: + stream["time_series"] = stream["time_series"][select] + + return streams diff --git a/pyxdf/test/conftest.py b/pyxdf/test/conftest.py new file mode 100644 index 0000000..f5f7d7b --- /dev/null +++ b/pyxdf/test/conftest.py @@ -0,0 +1,16 @@ +from pytest import fixture + + +@fixture(scope="session") +def example_files(): + from pathlib import Path + + # requires git clone https://github.com/xdf-modules/example-files.git + # into the root xdf-python folder + path = Path("example-files") + extensions = ["*.xdf", "*.xdfz", "*.xdf.gz"] + files = [] + for ext in extensions: + files.extend(path.glob(ext)) + files = [str(file) for file in files] + yield files diff --git a/pyxdf/test/test_data.py b/pyxdf/test/test_data.py index b8005d0..ce782b1 100644 --- a/pyxdf/test/test_data.py +++ b/pyxdf/test/test_data.py @@ -29,16 +29,21 @@ def test_load_file(file): assert streams[0]["info"]["channel_format"][0] == "int16" assert streams[0]["info"]["stream_id"] == 0 - s = np.array([[192, 255, 238], - [12, 22, 32], - [13, 23, 33], - [14, 24, 34], - [15, 25, 35], - [12, 22, 32], - [13, 23, 33], - [14, 24, 34], - [15, 25, 35]], dtype=np.int16) - t = np.array([5., 5.1, 5.2, 5.3, 5.4, 5.5, 5.6, 5.7, 5.8]) + s = np.array( + [ + [192, 255, 238], + [12, 22, 32], + [13, 23, 33], + [14, 24, 34], + [15, 25, 35], + [12, 22, 32], + [13, 23, 33], + [14, 24, 34], + [15, 25, 35], + ], + dtype=np.int16, + ) + t = np.array([5.0, 5.1, 5.2, 5.3, 5.4, 5.5, 5.6, 5.7, 5.8]) np.testing.assert_array_equal(streams[0]["time_series"], s) np.testing.assert_array_almost_equal(streams[0]["time_stamps"], t) @@ -49,20 +54,31 @@ def test_load_file(file): assert streams[1]["info"]["channel_format"][0] == "string" assert streams[1]["info"]["stream_id"] == 0x02C0FFEE - s = [['LabRecorder xdfwriter' - '5.1' - '5.99' - '-.01' - '-.02' - ''], - ['Hello'], - ['World'], - ['from'], - ['LSL'], - ['Hello'], - ['World'], - ['from'], - ['LSL']] + s = [ + [ + 'LabRecorder xdfwriter' + "5.1" + "5.99" + "-.01" + "-.02" + "" + ], + ["Hello"], + ["World"], + ["from"], + ["LSL"], + ["Hello"], + ["World"], + ["from"], + ["LSL"], + ] t = np.array([5.1, 5.2, 5.3, 5.4, 5.5, 5.6, 5.7, 5.8, 5.9]) assert streams[1]["time_series"] == s np.testing.assert_array_almost_equal(streams[1]["time_stamps"], t) + + +def test_smoketest_sync(example_files): + for file in files: + streams, header = load_xdf( + file, sync_timestamps="linear", overlap_timestamps=False + ) diff --git a/pyxdf/test/test_limit_on_synced_stamps.py b/pyxdf/test/test_limit_on_synced_stamps.py new file mode 100644 index 0000000..a0276d5 --- /dev/null +++ b/pyxdf/test/test_limit_on_synced_stamps.py @@ -0,0 +1,94 @@ +from pyxdf.pyxdf import _sync_timestamps, _limit_streams_to_overlap +import numpy as np + + +# %% +def streams(): + # generate mock-streams + class MockStream(dict): + def __init__( + self, timestamps, timeseries, effective_srate, channel_format + ): + self["time_series"] = timeseries + self["time_stamps"] = timestamps + self["info"] = {} + self["info"]["effective_srate"] = effective_srate + self["info"]["channel_format"] = channel_format + + streams = {} + # fastest stream, latest timestamp + streams[1] = MockStream( + np.linspace(1, 2, 1001), np.linspace(1, 2, 1001), 1000, ["float32"] + ) + + # slowest stream, earliest timestamp + streams[2] = MockStream( + np.linspace(0.5, 1.5, 251), + np.linspace(0.5, 1.5, 251), + 250, + ["float32"], + ) + + # marker stream + streams[3] = MockStream( + [0.2, 1.1071, 1.2, 1.9, 2.5], + ["mark_" + str(n) for n in range(0, 5, 1)], + 0, + ["string"], + ) + # integer datatype stream, + streams[4] = MockStream( + np.linspace(0.4, 1.4, 251), + np.linspace(4, 140, 251, dtype="int32"), + 250, + ["int32"], + ) + + return streams + + +def mock(): + synced = _sync_timestamps(streams()) + return _limit_streams_to_overlap(synced) + + +# %% test +# earliest overlapping timestamp is 1.0 from stream 1 +# latest overlapping timestamp is 1.4 from stream 4 +# fastest strteam is stream 1 with fs 1000 +def test_timestamps(): + "check timestamps streams" + for s in mock().values(): + assert np.all(np.isclose(s["time_stamps"], np.linspace(1, 1.4, 401))) + + +def test_first_timeseries(): + assert np.all( + np.isclose(mock()[1]["time_series"], np.linspace(1, 1.4, 401)) + ) + + +def test_second_timeseries(): + assert np.all( + np.isclose(mock()[2]["time_series"], np.linspace(1, 1.4, 401)) + ) + + +def test_third_timeseries(): + s = mock()[3]["time_series"] + idx = [] + for i, _s in enumerate(s): + if _s != "": + idx.append(i) + assert np.all(idx == [107, 200]) + assert mock()[3]["time_stamps"][idx[0]] == 1.107 # shifted to closest fit + assert mock()[3]["time_stamps"][idx[1]] == 1.2 # fits with fs 1000 + + +def test_fourth_timeseries(): + # interpolation is tricky, as it depends on np.around and linear + # interpolation, which can not be approximated with np.linspace + # we therefore only test first and last value of the series + s = mock()[4]["time_series"] + assert np.isclose(s[0], 85) + assert np.isclose(s[-1], 140) diff --git a/pyxdf/test/test_limit_streams_to_overlap.py b/pyxdf/test/test_limit_streams_to_overlap.py new file mode 100644 index 0000000..791cc10 --- /dev/null +++ b/pyxdf/test/test_limit_streams_to_overlap.py @@ -0,0 +1,53 @@ +from pyxdf.pyxdf import _limit_streams_to_overlap +import numpy as np +# %% +def streams(): + #generate mock-streams + class MockStream(dict): + + def __init__(self, timestamps, timeseries, effective_srate, channel_format): + self['time_series'] = timeseries + self['time_stamps'] = timestamps + self['info'] = {} + self['info']['effective_srate'] = effective_srate + self['info']['channel_format'] = channel_format + + streams = {} + # fastest stream, latest timestamp + streams[1] = MockStream(np.linspace(1,2,1001), + np.linspace(1,2,1001), + 1000, ['float32']) + + # slowest stream, earliest timestamp + streams[2] = MockStream(np.linspace(0.4,1.4,251), + np.linspace(0.4,1.4,251), + 250, ['float32']) + + # marker stream + streams[3] = MockStream([0.2, 1.1071, 1.2, 1.9, 2.5], + ['mark_' + str(n) for n in range(0,5,1)], + 0, ['string']) + return streams + +# %% test + +def test_timestamps(): + 'test whether the first and last timestamps have been selected as expected' + olap = _limit_streams_to_overlap(streams()) + for s,v in zip(olap.values(), + [(1, 1.4), (1, 1.4), (1.1071, 1.2)]): + assert np.isclose(s['time_stamps'][0], v[0]) + assert np.isclose(s['time_stamps'][-1], v[-1]) + +def test_timeseries(): + 'test whether the first and last value are as expected' + olap = _limit_streams_to_overlap(streams()) + for s,v in zip(olap.values(), + [(1, 1.4), (1, 1.4), ('mark_1', 'mark_2')]): + if s['info']['channel_format'] != ['string']: + assert np.isclose(s['time_series'][0], v[0]) + assert np.isclose(s['time_series'][-1], v[-1]) + else: + assert s['time_series'][0] == v[0] + assert s['time_series'][-1] == v[-1] + diff --git a/pyxdf/test/test_sync_timestamps.py b/pyxdf/test/test_sync_timestamps.py new file mode 100644 index 0000000..952cb1c --- /dev/null +++ b/pyxdf/test/test_sync_timestamps.py @@ -0,0 +1,157 @@ +from pyxdf.pyxdf import _sync_timestamps +import numpy as np +from copy import deepcopy +import pytest + + +# %% +@pytest.fixture(scope="session") +def streams(): + # generate mock-streams + class MockStream(dict): + def __init__( + self, timestamps, timeseries, effective_srate, channel_format + ): + self["time_series"] = timeseries + self["time_stamps"] = timestamps + self["info"] = {} + self["info"]["effective_srate"] = effective_srate + self["info"]["channel_format"] = channel_format + + streams = {} + # fastest stream, latest timestamp + streams[1] = MockStream( + np.linspace(1, 2, 1001), np.linspace(1, 2, 1001), 1000, ["float32"] + ) + + # slowest stream, earliest timestamp + streams[2] = MockStream( + np.linspace(0.1, 1.1, 251), np.linspace(2, 1, 251), 250, ["float32"] + ) + + # marker stream + streams[3] = MockStream( + [0.2, 1.1071, 1.2, 1.9, 2.5], + ["mark_" + str(n) for n in range(0, 5, 1)], + 0, + ["string"], + ) + # integer datatype stream, + streams[4] = MockStream( + np.linspace(0.1, 1.1, 251), + np.linspace(0, 250, 251, dtype="int32"), + 250, + ["int32"], + ) + + return streams + + +@pytest.fixture(scope="session") +def synced(streams): + synced = _sync_timestamps(deepcopy(streams)) + return synced + + +# %% test +def test_samplingrate(synced): + "check that for all streams the sampling rate is identical to the fastest" + for s in synced.values(): + assert s["info"]["effective_srate"] == 1000 + + +def test_timestamps(synced): + """test whether all timestamps are identical and as expected + earliest timestamp is 0.1 from stream 1,4; latest is 2.5 from stream 3""" + for s in synced.values(): + assert np.all( + np.isclose(s["time_stamps"], np.linspace(0.1, 2.5, 2401)) + ) + + +def test_identical_timestamp_steps(synced): + "all steps between samples should be identical within float precision" + for stream in synced.values(): + uq = np.diff(stream["time_stamps"]) + assert np.all([np.isclose(u, uq) for u in uq]) + + +def test_markerstream(synced, streams): + """check markers, they should be identical to the original but the second, + which should be shifted towards the next valid sample of the fastest + stream""" + idx = [] + for marker in streams[3]["time_series"]: + idx.append(synced[3]["time_series"].tolist().index([marker])) + assert np.all( + np.equal( + synced[3]["time_stamps"][idx], + np.array( + [0.2, 1.107, 1.2, 1.9, 2.5] # identical # shifted # shifted + ), + ) + ) + + +def test_interpolation(synced): + """the linearly to 1000 Hz interpolated data from stream 2 should + be identical with a linspace with 1001 samples.""" + idx = np.where(~np.isnan(synced[2]["time_series"]))[0] + assert np.all( + np.isclose(synced[2]["time_series"][idx], np.linspace(2, 1, 1001)) + ) + + +def test_extrapolation(synced, streams): + """extrapolated data should be nans or "" for markers""" + # stream 1 is extrapolated from 0.1 up to 1 and after 2 to 2.5 + idx = np.where(np.isnan(synced[1]["time_series"]))[0] + values = synced[1]["time_stamps"][idx] + expectation = np.hstack( + ( + np.linspace(0.1, 1, 900, endpoint=False), + np.linspace(2.001, 2.5, 500, endpoint=True), + ) + ) + assert np.all(np.isclose(values, expectation)) + + # stream 2 is extrapolated from after 1.1 to 2.5 + idx = np.where(np.isnan(synced[2]["time_series"]))[0] + values = synced[2]["time_stamps"][idx] + expectation = np.linspace(1.101, 2.5, num=1400, endpoint=True) + assert np.all(np.isclose(values, expectation)) + + # stream 3 is extrapolated everywhere but at the marker stamps + # can be tested easier by checking whether the count of [''] + # equals the total samples minus the original markers + values = np.where(synced[3]["time_series"] == [""])[0].shape[0] + expectation = synced[3]["time_series"].shape[0] - len( + streams[3]["time_series"] + ) + assert values == expectation + + # stream 4 is extrapolated from after 1.1 to 2.5 + idx = np.where(np.isnan(synced[4]["time_series"]))[0] + values = synced[4]["time_stamps"][idx] + expectation = np.linspace(1.101, 2.5, num=1400, endpoint=True) + assert np.all(np.isclose(values, expectation)) + + +def test_identity_after_interpolation(synced, streams): + "the data for stream 1 should be identical to original" + idx = np.where(~np.isnan(synced[1]["time_series"]))[0] + assert np.all( + np.isclose(synced[1]["time_series"][idx], streams[1]["time_series"]) + ) + + +def test_integer_interpolation(synced, streams): + """the data of stream 4 should be all integers. + + .. note:: + interpolation is tricky, as it depends on np.around and linear + interpolation, which can not be approximated with np.linspace + """ + u = np.unique(synced[4]["time_series"]) + u = np.int64(np.compress(~np.isnan(u), u)) + assert np.all(streams[4]["time_series"] == u) From 60d99684f94ee7a116cd86b923e9dbad40c1b5cd Mon Sep 17 00:00:00 2001 From: agricolab Date: Fri, 3 Nov 2023 12:00:48 +0100 Subject: [PATCH 02/14] Accept a use_samplingrate argument --- pyxdf/pyxdf.py | 55 ++++++++++++++++++++---------- pyxdf/test/test_sync_timestamps.py | 16 ++++++++- 2 files changed, 52 insertions(+), 19 deletions(-) diff --git a/pyxdf/pyxdf.py b/pyxdf/pyxdf.py index 7eb37f8..9b997db 100644 --- a/pyxdf/pyxdf.py +++ b/pyxdf/pyxdf.py @@ -83,6 +83,7 @@ def load_xdf( winsor_threshold=0.0001, verbose=None, sync_timestamps=False, + use_samplingrate="highest", overlap_timestamps=False, ): """Import an XDF file. @@ -125,8 +126,8 @@ def load_xdf( sampled streams. (default: true) sync_timestamps: {bool str} - sync timestamps of all streams sample-wise with the stream to the - highest effective sampling rate. Using sync_timestamps with any + sync timestamps of all streams sample-wise with the stream of the + highest effective sampling rate (or as set in use_samplingrate). Using sync_timestamps with any method other than linear has dependency on scipy, which is not a hard requirement of pyxdf. If scipy is not installed in your environment, the method supports linear interpolation with @@ -138,6 +139,11 @@ def load_xdf( ‘previous’, ‘next’> for method inherited from scipy.interpolate.interp1d. + use_samplingrate: {int str} = "highest" + resamples all channels to an effective sampling rate as given by this argument, using the method as defined by sync_timestamps. Will do nothing if sync_timestamps is False or the sampling rate is not larger than zero. + int>0: resample to this sampling rate. + str:<'highest'> use the highest + overlap_timestamps: bool If true, return only overlapping streams, i.e. all streams are limited to periods where all streams have data. (default: True) @@ -430,7 +436,9 @@ def load_xdf( if type(sync_timestamps) is not str: sync_timestamps = "linear" logger.warning('sync_timestamps defaults to "linear"') - streams = _sync_timestamps(streams, kind=sync_timestamps) + streams = _sync_timestamps( + streams, kind=sync_timestamps, use_samplingrate=use_samplingrate + ) # limit streams to their overlapping periods if overlap_timestamps: @@ -922,7 +930,7 @@ def _interpolate( return np.interp(new_x, xp=x, fp=y, left=np.NaN, right=np.NaN) -def _sync_timestamps(streams, kind="linear"): +def _sync_timestamps(streams, kind="linear", use_samplingrate="highest"): """Sync all streams to the fastest sampling rate by shifting or upsampling. Depending on a streams channel-format, extrapolation is performed using @@ -945,7 +953,6 @@ def _sync_timestamps(streams, kind="linear"): srate_key = "effective_srate" srates = [stream["info"][srate_key] for stream in streams.values()] max_fs = max(srates, default=0) - if max_fs == 0: # either no valid stream or all streams are async return streams if srates.count(max_fs) > 1: @@ -961,22 +968,34 @@ def _sync_timestamps(streams, kind="linear"): # indexing first and last might miss the earliest/latest # we therefore take the min and max timestamp stamps = [stream["time_stamps"] for stream in streams.values()] - ts_first = min((min(s) for s in stamps)) - ts_last = max((max(s) for s in stamps)) + ts_first = min((min(s) for s in stamps)) # earliest sample of all streams + ts_last = max((max(s) for s in stamps)) # latest sample of all streams # generate new timestamps # based on extrapolation of the fastest timestamps towards the maximal # time range of the whole recording - fs_step = 1.0 / max_fs - new_timestamps = stamps[srates.index(max_fs)] - num_steps = int((new_timestamps[0] - ts_first) / fs_step) + 1 - front_stamps = np.linspace(ts_first, new_timestamps[0], num_steps) - num_steps = int((ts_last - new_timestamps[-1]) / fs_step) + 1 - end_stamps = np.linspace(new_timestamps[-1], ts_last, num_steps) - - new_timestamps = np.concatenate( - (front_stamps, new_timestamps[1:-1], end_stamps), axis=0 - ) + if use_samplingrate == "highest": + new_srate = max_fs + elif type(use_samplingrate) == int: + new_srate = use_samplingrate + else: + raise NotImplementedError( + f"Unknown parameter for use_samplingrate: {use_samplingrate}" + ) + fs_step = 1.0 / new_srate + if new_srate == max_fs: + new_timestamps = stamps[ + srates.index(max_fs) + ] # pick fastest stream regardless of finally used srate + num_steps = int((new_timestamps[0] - ts_first) / fs_step) + 1 + front_stamps = np.linspace(ts_first, new_timestamps[0], num_steps) + num_steps = int((ts_last - new_timestamps[-1]) / fs_step) + 1 + end_stamps = np.linspace(new_timestamps[-1], ts_last, num_steps) + new_timestamps = np.concatenate( + (front_stamps, new_timestamps[1:-1], end_stamps), axis=0 + ) + else: + new_timestamps = np.arange(ts_first, ts_last, fs_step) # interpolate or shift all streams to the new timestamps for stream in streams.values(): @@ -1036,7 +1055,7 @@ def _sync_timestamps(streams, kind="linear"): "channel_format=" "{}".format(channel_format) ) - stream["info"]["effective_srate"] = max_fs + stream["info"]["effective_srate"] = new_srate return streams diff --git a/pyxdf/test/test_sync_timestamps.py b/pyxdf/test/test_sync_timestamps.py index 952cb1c..9b39031 100644 --- a/pyxdf/test/test_sync_timestamps.py +++ b/pyxdf/test/test_sync_timestamps.py @@ -53,8 +53,22 @@ def synced(streams): return synced +@pytest.fixture(scope="session") +def resampled(streams): + resampled = _sync_timestamps(deepcopy(streams), use_samplingrate=256) + return resampled + + # %% test -def test_samplingrate(synced): +def test_samplingrate_resampled(resampled): + "check that for all streams the sampling rate is identical to the fastest" + for s in resampled.values(): + assert s["info"]["effective_srate"] == 256 + efs = 1 / np.min(np.diff(s["time_stamps"])) + assert (efs - 256) < 0.001 + + +def test_samplingrate_highest(synced): "check that for all streams the sampling rate is identical to the fastest" for s in synced.values(): assert s["info"]["effective_srate"] == 1000 From a487058f1ca4551e85d229c54a4792f782ee2223 Mon Sep 17 00:00:00 2001 From: agricolab Date: Thu, 9 Nov 2023 09:57:47 +0100 Subject: [PATCH 03/14] Update unit tests to new interface --- pyxdf/pyxdf.py | 100 +++++++++++--------- pyxdf/test/test_limit_on_synced_stamps.py | 40 ++++---- pyxdf/test/test_limit_streams_to_overlap.py | 34 +++---- pyxdf/test/test_sync_timestamps.py | 63 ++++++------ 4 files changed, 122 insertions(+), 115 deletions(-) diff --git a/pyxdf/pyxdf.py b/pyxdf/pyxdf.py index 9b997db..04b6f14 100644 --- a/pyxdf/pyxdf.py +++ b/pyxdf/pyxdf.py @@ -416,6 +416,24 @@ def load_xdf( else: stream.effective_srate = 0.0 + # sync sampling with the fastest timeseries by interpolation / shifting + for k in streams.keys(): + stream = streams[k] + tmp = temp[k] + tmp.channel_format = stream["info"]["channel_format"][0] + + if sync_timestamps: + if type(sync_timestamps) is not str: + sync_timestamps = "linear" + logger.warning('sync_timestamps defaults to "linear"') + temp = _sync_timestamps( + temp, kind=sync_timestamps, use_samplingrate=use_samplingrate + ) + + # limit streams to their overlapping periods + if overlap_timestamps: + temp = _limit_streams_to_overlap(temp) + for k in streams.keys(): stream = streams[k] tmp = temp[k] @@ -430,20 +448,6 @@ def load_xdf( stream["info"]["effective_srate"] = tmp.effective_srate stream["time_series"] = tmp.time_series stream["time_stamps"] = tmp.time_stamps - - # sync sampling with the fastest timeseries by interpolation / shifting - if sync_timestamps: - if type(sync_timestamps) is not str: - sync_timestamps = "linear" - logger.warning('sync_timestamps defaults to "linear"') - streams = _sync_timestamps( - streams, kind=sync_timestamps, use_samplingrate=use_samplingrate - ) - - # limit streams to their overlapping periods - if overlap_timestamps: - streams = _limit_streams_to_overlap(streams) - streams = [s for s in streams.values()] return streams, fileheader @@ -701,7 +705,8 @@ def _jitter_removal(streams, threshold_seconds=1, threshold_samples=500): # 0th sample is a segment start and last sample is a segment stop seg_starts = np.hstack(([0], break_inds)) seg_stops = np.hstack((break_inds - 1, nsamples - 1)) # inclusive - + stream.seg_starts = seg_starts + stream.seg_stops = seg_stops # Process each segment separately for start_ix, stop_ix in zip(seg_starts, seg_stops): # Calculate time stamps assuming constant intervals within each @@ -950,8 +955,7 @@ def _sync_timestamps(streams, kind="linear", use_samplingrate="highest"): marker, i.e. ['']. """ # selecting the stream with the highest effective sampling rate - srate_key = "effective_srate" - srates = [stream["info"][srate_key] for stream in streams.values()] + srates = [stream.effective_srate for stream in streams.values()] max_fs = max(srates, default=0) if max_fs == 0: # either no valid stream or all streams are async return streams @@ -967,7 +971,7 @@ def _sync_timestamps(streams, kind="linear", use_samplingrate="highest"): # streams with fs=0 might are not dejittered be default, and therefore # indexing first and last might miss the earliest/latest # we therefore take the min and max timestamp - stamps = [stream["time_stamps"] for stream in streams.values()] + stamps = [stream.time_stamps for stream in streams.values()] ts_first = min((min(s) for s in stamps)) # earliest sample of all streams ts_last = max((max(s) for s in stamps)) # latest sample of all streams @@ -999,15 +1003,15 @@ def _sync_timestamps(streams, kind="linear", use_samplingrate="highest"): # interpolate or shift all streams to the new timestamps for stream in streams.values(): - channel_format = stream["info"]["channel_format"][0] - - if (channel_format == "string") and (stream["info"][srate_key] == 0): + if (stream.channel_format == "string") and ( + stream.effective_srate == 0 + ): # you can't really interpolate strings; and streams with srate=0 # don't have a real sampling rate. One approach to sync them is to # shift their events to the nearest timestamp of the new # timestamps shifted_x = [] - for x in stream["time_stamps"]: + for x in stream.time_stamps: argmin = (abs(new_timestamps - x)).argmin() shifted_x.append(new_timestamps[argmin]) @@ -1015,15 +1019,15 @@ def _sync_timestamps(streams, kind="linear", use_samplingrate="highest"): for x in new_timestamps: try: idx = shifted_x.index(x) - y = stream["time_series"][idx] + y = stream.time_series[idx] shifted_y.append([y]) except ValueError: shifted_y.append([""]) - stream["time_series"] = np.asanyarray((shifted_y)) - stream["time_stamps"] = new_timestamps + stream.time_series = np.asanyarray((shifted_y)) + stream.time_stamps = new_timestamps - elif channel_format in [ + elif stream.channel_format in [ "float32", "double64", "int8", @@ -1035,17 +1039,19 @@ def _sync_timestamps(streams, kind="linear", use_samplingrate="highest"): # discrete interpolation requires some finetuning # bounds_error=False replaces everything outside of interpolation # zone with NaNs - y = stream["time_series"] - x = stream["time_stamps"] + y = stream.time_series + x = stream.time_stamps - stream["time_series"] = _interpolate(x, y, new_timestamps, kind) - stream["time_stamps"] = new_timestamps + stream.time_series = _interpolate(x, y, new_timestamps, kind) + stream.time_stamps = new_timestamps - if channel_format in ["int8", "int16", "int32", "int64"]: + if stream.channel_format in ["int8", "int16", "int32", "int64"]: # i am stuck with float64s, as integers have no nans # therefore i round to the nearest integer instead - stream["time_series"] = np.around(stream["time_series"], 0) - elif (channel_format == "string") and (stream["info"][srate_key] != 0): + stream.time_series = np.around(stream.time_series, 0) + elif (stream.channel_format == "string") and ( + stream.effective_srate != 0 + ): warnings.warn( "Can't interpolate a channel with channel format string and an effective sampling rate != 0" ) @@ -1053,9 +1059,9 @@ def _sync_timestamps(streams, kind="linear", use_samplingrate="highest"): raise NotImplementedError( "Don't know how to sync sampling for " "channel_format=" - "{}".format(channel_format) + "{}".format(stream.channel_format) ) - stream["info"]["effective_srate"] = new_srate + stream.effective_srate = new_srate return streams @@ -1076,30 +1082,30 @@ def _limit_streams_to_overlap(streams): # just not yet have send anything on purpose (i.e. markers) # while other data was already being recorded. if ( - stream["info"]["effective_srate"] != 0 - and stream["info"]["channel_format"][0] != "string" + stream.effective_srate != 0 + and stream.channel_format != "string" ): # extrapolation in _sync_timestamps is done with NaNs - not_extrapolated = np.where(~np.isnan(stream["time_series"]))[0] - ts_first.append(min(stream["time_stamps"][not_extrapolated])) - ts_last.append(max(stream["time_stamps"][not_extrapolated])) + not_extrapolated = np.where(~np.isnan(stream.time_series))[0] + ts_first.append(min(stream.time_stamps[not_extrapolated])) + ts_last.append(max(stream.time_stamps[not_extrapolated])) ts_first = max(ts_first) ts_last = min(ts_last) for stream in streams.values(): # use np.around to prevent floating point hickups - around = np.around(stream["time_stamps"], 15) + around = np.around(stream.time_stamps, 15) a = np.where(around >= ts_first)[0] b = np.where(around <= ts_last)[0] select = np.intersect1d(a, b) - if type(stream["time_stamps"]) is list: - stream["time_stamps"] = [stream["time_stamps"][s] for s in select] + if type(stream.time_stamps) is list: + stream.time_stamps = [stream.time_stamps[s] for s in select] else: - stream["time_stamps"] = stream["time_stamps"][select] + stream.time_stamps = stream.time_stamps[select] - if type(stream["time_series"]) is list: - stream["time_series"] = [stream["time_series"][s] for s in select] + if type(stream.time_series) is list: + stream.time_series = [stream.time_series[s] for s in select] else: - stream["time_series"] = stream["time_series"][select] + stream.time_series = stream.time_series[select] return streams diff --git a/pyxdf/test/test_limit_on_synced_stamps.py b/pyxdf/test/test_limit_on_synced_stamps.py index a0276d5..7ff04e0 100644 --- a/pyxdf/test/test_limit_on_synced_stamps.py +++ b/pyxdf/test/test_limit_on_synced_stamps.py @@ -5,20 +5,20 @@ # %% def streams(): # generate mock-streams - class MockStream(dict): - def __init__( - self, timestamps, timeseries, effective_srate, channel_format - ): - self["time_series"] = timeseries - self["time_stamps"] = timestamps - self["info"] = {} - self["info"]["effective_srate"] = effective_srate - self["info"]["channel_format"] = channel_format + class MockStream(): + + def __init__(self, timestamps, timeseries, effective_srate, channel_format): + self.time_series = timeseries + self.time_stamps = timestamps + self.effective_srate = effective_srate + self.channel_format = channel_format streams = {} # fastest stream, latest timestamp streams[1] = MockStream( - np.linspace(1, 2, 1001), np.linspace(1, 2, 1001), 1000, ["float32"] + np.linspace(1, 2, 1001), np.linspace(1, 2, 1001), + 1000, + "float32" ) # slowest stream, earliest timestamp @@ -26,7 +26,7 @@ def __init__( np.linspace(0.5, 1.5, 251), np.linspace(0.5, 1.5, 251), 250, - ["float32"], + "float32", ) # marker stream @@ -34,14 +34,14 @@ def __init__( [0.2, 1.1071, 1.2, 1.9, 2.5], ["mark_" + str(n) for n in range(0, 5, 1)], 0, - ["string"], + "string", ) # integer datatype stream, streams[4] = MockStream( np.linspace(0.4, 1.4, 251), np.linspace(4, 140, 251, dtype="int32"), 250, - ["int32"], + "int32", ) return streams @@ -59,36 +59,36 @@ def mock(): def test_timestamps(): "check timestamps streams" for s in mock().values(): - assert np.all(np.isclose(s["time_stamps"], np.linspace(1, 1.4, 401))) + assert np.all(np.isclose(s.time_stamps, np.linspace(1, 1.4, 401))) def test_first_timeseries(): assert np.all( - np.isclose(mock()[1]["time_series"], np.linspace(1, 1.4, 401)) + np.isclose(mock()[1].time_series, np.linspace(1, 1.4, 401)) ) def test_second_timeseries(): assert np.all( - np.isclose(mock()[2]["time_series"], np.linspace(1, 1.4, 401)) + np.isclose(mock()[2].time_series, np.linspace(1, 1.4, 401)) ) def test_third_timeseries(): - s = mock()[3]["time_series"] + s = mock()[3].time_series idx = [] for i, _s in enumerate(s): if _s != "": idx.append(i) assert np.all(idx == [107, 200]) - assert mock()[3]["time_stamps"][idx[0]] == 1.107 # shifted to closest fit - assert mock()[3]["time_stamps"][idx[1]] == 1.2 # fits with fs 1000 + assert mock()[3].time_stamps[idx[0]] == 1.107 # shifted to closest fit + assert mock()[3].time_stamps[idx[1]] == 1.2 # fits with fs 1000 def test_fourth_timeseries(): # interpolation is tricky, as it depends on np.around and linear # interpolation, which can not be approximated with np.linspace # we therefore only test first and last value of the series - s = mock()[4]["time_series"] + s = mock()[4].time_series assert np.isclose(s[0], 85) assert np.isclose(s[-1], 140) diff --git a/pyxdf/test/test_limit_streams_to_overlap.py b/pyxdf/test/test_limit_streams_to_overlap.py index 791cc10..c15bfcd 100644 --- a/pyxdf/test/test_limit_streams_to_overlap.py +++ b/pyxdf/test/test_limit_streams_to_overlap.py @@ -3,30 +3,32 @@ # %% def streams(): #generate mock-streams - class MockStream(dict): + class MockStream(): def __init__(self, timestamps, timeseries, effective_srate, channel_format): - self['time_series'] = timeseries - self['time_stamps'] = timestamps - self['info'] = {} - self['info']['effective_srate'] = effective_srate - self['info']['channel_format'] = channel_format + self.time_series = timeseries + self.time_stamps = timestamps + self.effective_srate = effective_srate + self.channel_format = channel_format streams = {} # fastest stream, latest timestamp streams[1] = MockStream(np.linspace(1,2,1001), np.linspace(1,2,1001), - 1000, ['float32']) + 1000, + 'float32') # slowest stream, earliest timestamp streams[2] = MockStream(np.linspace(0.4,1.4,251), np.linspace(0.4,1.4,251), - 250, ['float32']) + 250, + 'float32') # marker stream streams[3] = MockStream([0.2, 1.1071, 1.2, 1.9, 2.5], ['mark_' + str(n) for n in range(0,5,1)], - 0, ['string']) + 0, + 'string') return streams # %% test @@ -36,18 +38,18 @@ def test_timestamps(): olap = _limit_streams_to_overlap(streams()) for s,v in zip(olap.values(), [(1, 1.4), (1, 1.4), (1.1071, 1.2)]): - assert np.isclose(s['time_stamps'][0], v[0]) - assert np.isclose(s['time_stamps'][-1], v[-1]) + assert np.isclose(s.time_stamps[0], v[0]) + assert np.isclose(s.time_stamps[-1], v[-1]) def test_timeseries(): 'test whether the first and last value are as expected' olap = _limit_streams_to_overlap(streams()) for s,v in zip(olap.values(), [(1, 1.4), (1, 1.4), ('mark_1', 'mark_2')]): - if s['info']['channel_format'] != ['string']: - assert np.isclose(s['time_series'][0], v[0]) - assert np.isclose(s['time_series'][-1], v[-1]) + if s.channel_format != 'string': + assert np.isclose(s.time_series[0], v[0]) + assert np.isclose(s.time_series[-1], v[-1]) else: - assert s['time_series'][0] == v[0] - assert s['time_series'][-1] == v[-1] + assert s.time_series[0] == v[0] + assert s.time_series[-1] == v[-1] diff --git a/pyxdf/test/test_sync_timestamps.py b/pyxdf/test/test_sync_timestamps.py index 9b39031..5991171 100644 --- a/pyxdf/test/test_sync_timestamps.py +++ b/pyxdf/test/test_sync_timestamps.py @@ -12,21 +12,20 @@ class MockStream(dict): def __init__( self, timestamps, timeseries, effective_srate, channel_format ): - self["time_series"] = timeseries - self["time_stamps"] = timestamps - self["info"] = {} - self["info"]["effective_srate"] = effective_srate - self["info"]["channel_format"] = channel_format + self.time_series = timeseries + self.time_stamps = timestamps + self.effective_srate = effective_srate + self.channel_format = channel_format streams = {} # fastest stream, latest timestamp streams[1] = MockStream( - np.linspace(1, 2, 1001), np.linspace(1, 2, 1001), 1000, ["float32"] + np.linspace(1, 2, 1001), np.linspace(1, 2, 1001), 1000, "float32" ) # slowest stream, earliest timestamp streams[2] = MockStream( - np.linspace(0.1, 1.1, 251), np.linspace(2, 1, 251), 250, ["float32"] + np.linspace(0.1, 1.1, 251), np.linspace(2, 1, 251), 250, "float32" ) # marker stream @@ -34,14 +33,14 @@ def __init__( [0.2, 1.1071, 1.2, 1.9, 2.5], ["mark_" + str(n) for n in range(0, 5, 1)], 0, - ["string"], + "string", ) # integer datatype stream, streams[4] = MockStream( np.linspace(0.1, 1.1, 251), np.linspace(0, 250, 251, dtype="int32"), 250, - ["int32"], + "int32", ) return streams @@ -63,15 +62,15 @@ def resampled(streams): def test_samplingrate_resampled(resampled): "check that for all streams the sampling rate is identical to the fastest" for s in resampled.values(): - assert s["info"]["effective_srate"] == 256 - efs = 1 / np.min(np.diff(s["time_stamps"])) + assert s.effective_srate == 256 + efs = 1 / np.min(np.diff(s.time_stamps)) assert (efs - 256) < 0.001 def test_samplingrate_highest(synced): "check that for all streams the sampling rate is identical to the fastest" for s in synced.values(): - assert s["info"]["effective_srate"] == 1000 + assert s.effective_srate == 1000 def test_timestamps(synced): @@ -79,14 +78,14 @@ def test_timestamps(synced): earliest timestamp is 0.1 from stream 1,4; latest is 2.5 from stream 3""" for s in synced.values(): assert np.all( - np.isclose(s["time_stamps"], np.linspace(0.1, 2.5, 2401)) + np.isclose(s.time_stamps, np.linspace(0.1, 2.5, 2401)) ) def test_identical_timestamp_steps(synced): "all steps between samples should be identical within float precision" for stream in synced.values(): - uq = np.diff(stream["time_stamps"]) + uq = np.diff(stream.time_stamps) assert np.all([np.isclose(u, uq) for u in uq]) @@ -95,11 +94,11 @@ def test_markerstream(synced, streams): which should be shifted towards the next valid sample of the fastest stream""" idx = [] - for marker in streams[3]["time_series"]: - idx.append(synced[3]["time_series"].tolist().index([marker])) + for marker in streams[3].time_series: + idx.append(synced[3].time_series.tolist().index([marker])) assert np.all( np.equal( - synced[3]["time_stamps"][idx], + synced[3].time_stamps[idx], np.array( [0.2, 1.107, 1.2, 1.9, 2.5] # identical # shifted # shifted ), @@ -110,17 +109,17 @@ def test_markerstream(synced, streams): def test_interpolation(synced): """the linearly to 1000 Hz interpolated data from stream 2 should be identical with a linspace with 1001 samples.""" - idx = np.where(~np.isnan(synced[2]["time_series"]))[0] + idx = np.where(~np.isnan(synced[2].time_series))[0] assert np.all( - np.isclose(synced[2]["time_series"][idx], np.linspace(2, 1, 1001)) + np.isclose(synced[2].time_series[idx], np.linspace(2, 1, 1001)) ) def test_extrapolation(synced, streams): """extrapolated data should be nans or "" for markers""" # stream 1 is extrapolated from 0.1 up to 1 and after 2 to 2.5 - idx = np.where(np.isnan(synced[1]["time_series"]))[0] - values = synced[1]["time_stamps"][idx] + idx = np.where(np.isnan(synced[1].time_series))[0] + values = synced[1].time_stamps[idx] expectation = np.hstack( ( np.linspace(0.1, 1, 900, endpoint=False), @@ -130,32 +129,32 @@ def test_extrapolation(synced, streams): assert np.all(np.isclose(values, expectation)) # stream 2 is extrapolated from after 1.1 to 2.5 - idx = np.where(np.isnan(synced[2]["time_series"]))[0] - values = synced[2]["time_stamps"][idx] + idx = np.where(np.isnan(synced[2].time_series))[0] + values = synced[2].time_stamps[idx] expectation = np.linspace(1.101, 2.5, num=1400, endpoint=True) assert np.all(np.isclose(values, expectation)) # stream 3 is extrapolated everywhere but at the marker stamps # can be tested easier by checking whether the count of [''] # equals the total samples minus the original markers - values = np.where(synced[3]["time_series"] == [""])[0].shape[0] - expectation = synced[3]["time_series"].shape[0] - len( - streams[3]["time_series"] + values = np.where(synced[3].time_series == [""])[0].shape[0] + expectation = synced[3].time_series.shape[0] - len( + streams[3].time_series ) assert values == expectation # stream 4 is extrapolated from after 1.1 to 2.5 - idx = np.where(np.isnan(synced[4]["time_series"]))[0] - values = synced[4]["time_stamps"][idx] + idx = np.where(np.isnan(synced[4].time_series))[0] + values = synced[4].time_stamps[idx] expectation = np.linspace(1.101, 2.5, num=1400, endpoint=True) assert np.all(np.isclose(values, expectation)) def test_identity_after_interpolation(synced, streams): "the data for stream 1 should be identical to original" - idx = np.where(~np.isnan(synced[1]["time_series"]))[0] + idx = np.where(~np.isnan(synced[1].time_series))[0] assert np.all( - np.isclose(synced[1]["time_series"][idx], streams[1]["time_series"]) + np.isclose(synced[1].time_series[idx], streams[1].time_series) ) @@ -166,6 +165,6 @@ def test_integer_interpolation(synced, streams): interpolation is tricky, as it depends on np.around and linear interpolation, which can not be approximated with np.linspace """ - u = np.unique(synced[4]["time_series"]) + u = np.unique(synced[4].time_series) u = np.int64(np.compress(~np.isnan(u), u)) - assert np.all(streams[4]["time_series"] == u) + assert np.all(streams[4].time_series == u) From 90cfdf7a21878562eba29e7606b9110bf3ba3573 Mon Sep 17 00:00:00 2001 From: agricolab Date: Mon, 20 Nov 2023 00:12:30 +0100 Subject: [PATCH 04/14] Implement align by shifting [WIP] --- pyxdf/__init__.py | 4 +- pyxdf/pyxdf.py | 202 ++++++++++++++------ pyxdf/test/test_data.py | 12 +- pyxdf/test/test_limit_on_synced_stamps.py | 94 --------- pyxdf/test/test_limit_streams_to_overlap.py | 55 ------ pyxdf/test/test_shift_align.py | 13 ++ pyxdf/test/test_sync_timestamps.py | 170 ---------------- 7 files changed, 165 insertions(+), 385 deletions(-) delete mode 100644 pyxdf/test/test_limit_on_synced_stamps.py delete mode 100644 pyxdf/test/test_limit_streams_to_overlap.py create mode 100644 pyxdf/test/test_shift_align.py delete mode 100644 pyxdf/test/test_sync_timestamps.py diff --git a/pyxdf/__init__.py b/pyxdf/__init__.py index 50ecda7..dd4683b 100644 --- a/pyxdf/__init__.py +++ b/pyxdf/__init__.py @@ -8,6 +8,6 @@ __version__ = get_distribution(__name__).version except DistributionNotFound: # package is not installed __version__ = None -from .pyxdf import load_xdf, resolve_streams, match_streaminfos +from .pyxdf import load_xdf, resolve_streams, match_streaminfos, align_streams -__all__ = [load_xdf, resolve_streams, match_streaminfos] +__all__ = [load_xdf, resolve_streams, match_streaminfos, align_streams] diff --git a/pyxdf/pyxdf.py b/pyxdf/pyxdf.py index 04b6f14..d06c471 100644 --- a/pyxdf/pyxdf.py +++ b/pyxdf/pyxdf.py @@ -14,18 +14,17 @@ import itertools import gzip from xml.etree.ElementTree import fromstring -from collections import OrderedDict, defaultdict +from collections import OrderedDict, defaultdict, Counter import logging from pathlib import Path import warnings import numpy as np -__all__ = ["load_xdf"] +__all__ = ["load_xdf", "align_streams"] logger = logging.getLogger(__name__) - class StreamData: """Temporary per-stream data.""" @@ -82,9 +81,9 @@ def load_xdf( clock_reset_threshold_offset_stds=10, winsor_threshold=0.0001, verbose=None, - sync_timestamps=False, + align_timestamps=False, use_samplingrate="highest", - overlap_timestamps=False, + only_overlapping=False, ): """Import an XDF file. @@ -125,32 +124,31 @@ def load_xdf( dejitter_timestamps : Whether to perform jitter removal for regularly sampled streams. (default: true) - sync_timestamps: {bool str} + align_timestamps: {bool, str} sync timestamps of all streams sample-wise with the stream of the - highest effective sampling rate (or as set in use_samplingrate). Using sync_timestamps with any + highest effective sampling rate (or as set in use_samplingrate). Using align_timestamps with any method other than linear has dependency on scipy, which is not a hard requirement of pyxdf. If scipy is not installed in your environment, the method supports linear interpolation with numpy. - False -> no syncing - True -> linear syncing - str:<'linear’, ‘nearest’, ‘zero’, ‘slinear’, ‘quadratic’, ‘cubic’, - ‘previous’, ‘next’> for method inherited from - scipy.interpolate.interp1d. + False -> do not align samples + True -> align samples using linear interpolation + str: align data using the method <'linear’, ‘nearest’, ‘zero’, ‘slinear’, ‘quadratic’, ‘cubic’, ‘previous’, ‘next’> as implemented in scipy.interpolate.interp1d. - use_samplingrate: {int str} = "highest" - resamples all channels to an effective sampling rate as given by this argument, using the method as defined by sync_timestamps. Will do nothing if sync_timestamps is False or the sampling rate is not larger than zero. + use_samplingrate: {int, str} = "highest" + resamples all channels to an effective sampling rate as given by this argument, using the method as defined by align_timestamps. Will do nothing if align_timestamps is False or the sampling rate is not larger than zero. + Plase note that setting a sampling rate smaller than the fastest sampling rate of all streams can result in aliasing. int>0: resample to this sampling rate. str:<'highest'> use the highest - overlap_timestamps: bool + only_overlapping: bool = False If true, return only overlapping streams, i.e. all streams - are limited to periods where all streams have data. (default: True) - If false, depends on whether sync_timestamps is set. If set, it + are limited to periods where all streams have data. (default: False) + If false, depends on whether align_timestamps is set. If set, it expands all streams to include the earliest and latest timestamp of - any stream, if not set it simply return streams independently. - ` + any stream, if not set it simply return streams as they are. + on_chunk : Function that is called for each chunk of data as it is being retrieved from the file; the function is allowed to modify the data (for example, sub-sample it). The four input arguments @@ -422,16 +420,16 @@ def load_xdf( tmp = temp[k] tmp.channel_format = stream["info"]["channel_format"][0] - if sync_timestamps: - if type(sync_timestamps) is not str: - sync_timestamps = "linear" - logger.warning('sync_timestamps defaults to "linear"') - temp = _sync_timestamps( - temp, kind=sync_timestamps, use_samplingrate=use_samplingrate + if align_timestamps: + if type(align_timestamps) is not str: + align_timestamps = "linear" + logger.warning('align_timestamps defaults to "linear"') + temp = _align_timestamps( + temp, kind=align_timestamps, use_samplingrate=use_samplingrate ) # limit streams to their overlapping periods - if overlap_timestamps: + if only_overlapping: temp = _limit_streams_to_overlap(temp) for k in streams.keys(): @@ -449,7 +447,7 @@ def load_xdf( stream["time_series"] = tmp.time_series stream["time_stamps"] = tmp.time_stamps streams = [s for s in streams.values()] - return streams, fileheader + return list(streams), fileheader def open_xdf(file): @@ -911,7 +909,7 @@ def _parse_streamheader(xml): def _interpolate( x: np.ndarray, y: np.ndarray, new_x: np.ndarray, kind="linear" ) -> np.ndarray: - """Perform interpolation for _sync_timestamps + """Perform interpolation for _align_timestamps If scipy is not installed, the method falls back to numpy, and then only supports linear interpolation. @@ -935,7 +933,9 @@ def _interpolate( return np.interp(new_x, xp=x, fp=y, left=np.NaN, right=np.NaN) -def _sync_timestamps(streams, kind="linear", use_samplingrate="highest"): + + +def _align_timestamps(streams, kind="linear", use_samplingrate="highest"): """Sync all streams to the fastest sampling rate by shifting or upsampling. Depending on a streams channel-format, extrapolation is performed using @@ -974,7 +974,7 @@ def _sync_timestamps(streams, kind="linear", use_samplingrate="highest"): stamps = [stream.time_stamps for stream in streams.values()] ts_first = min((min(s) for s in stamps)) # earliest sample of all streams ts_last = max((max(s) for s in stamps)) # latest sample of all streams - + full_dur = ts_last-ts_first # generate new timestamps # based on extrapolation of the fastest timestamps towards the maximal # time range of the whole recording @@ -987,28 +987,15 @@ def _sync_timestamps(streams, kind="linear", use_samplingrate="highest"): f"Unknown parameter for use_samplingrate: {use_samplingrate}" ) fs_step = 1.0 / new_srate - if new_srate == max_fs: - new_timestamps = stamps[ - srates.index(max_fs) - ] # pick fastest stream regardless of finally used srate - num_steps = int((new_timestamps[0] - ts_first) / fs_step) + 1 - front_stamps = np.linspace(ts_first, new_timestamps[0], num_steps) - num_steps = int((ts_last - new_timestamps[-1]) / fs_step) + 1 - end_stamps = np.linspace(new_timestamps[-1], ts_last, num_steps) - new_timestamps = np.concatenate( - (front_stamps, new_timestamps[1:-1], end_stamps), axis=0 - ) - else: - new_timestamps = np.arange(ts_first, ts_last, fs_step) + new_timestamps = np.linspace(ts_first, ts_last, + num=int(full_dur*new_srate)+1, + endpoint=True) # interpolate or shift all streams to the new timestamps for stream in streams.values(): - if (stream.channel_format == "string") and ( - stream.effective_srate == 0 - ): - # you can't really interpolate strings; and streams with srate=0 - # don't have a real sampling rate. One approach to sync them is to - # shift their events to the nearest timestamp of the new + if (stream.channel_format == "string"): + # you can't really interpolate strings. One approach to sync them + # is to shift their events to the nearest timestamp of the new # timestamps shifted_x = [] for x in stream.time_stamps: @@ -1020,11 +1007,11 @@ def _sync_timestamps(streams, kind="linear", use_samplingrate="highest"): try: idx = shifted_x.index(x) y = stream.time_series[idx] - shifted_y.append([y]) + shifted_y.append(y) except ValueError: - shifted_y.append([""]) - - stream.time_series = np.asanyarray((shifted_y)) + shifted_y.append("") + + stream.time_series = shifted_y stream.time_stamps = new_timestamps elif stream.channel_format in [ @@ -1049,12 +1036,6 @@ def _sync_timestamps(streams, kind="linear", use_samplingrate="highest"): # i am stuck with float64s, as integers have no nans # therefore i round to the nearest integer instead stream.time_series = np.around(stream.time_series, 0) - elif (stream.channel_format == "string") and ( - stream.effective_srate != 0 - ): - warnings.warn( - "Can't interpolate a channel with channel format string and an effective sampling rate != 0" - ) else: raise NotImplementedError( "Don't know how to sync sampling for " @@ -1085,7 +1066,7 @@ def _limit_streams_to_overlap(streams): stream.effective_srate != 0 and stream.channel_format != "string" ): - # extrapolation in _sync_timestamps is done with NaNs + # extrapolation in _align_timestamps is done with NaNs not_extrapolated = np.where(~np.isnan(stream.time_series))[0] ts_first.append(min(stream.time_stamps[not_extrapolated])) ts_last.append(max(stream.time_stamps[not_extrapolated])) @@ -1109,3 +1090,104 @@ def _limit_streams_to_overlap(streams): stream.time_series = stream.time_series[select] return streams + +def _shift_align(old_timestamps, old_timeseries, new_timestamps): + old_timestamps = np.array(old_timestamps) + old_timeseries = np.array(old_timeseries) + new_timestamps = np.array(new_timestamps) + ts_last = old_timestamps[-1] + ts_first = old_timestamps[0] + source = list() + target = list() + new_timeseries = np.empty(( + new_timestamps.shape[0], # new sample count + old_timeseries.shape[1], # old channel count + ), dtype=object) + new_timeseries.fill(np.nan) + too_old = list() + too_young = list() + for nix, nts in enumerate(new_timestamps): + closest = (np.abs(old_timestamps - nts)).argmin() + # remember the edge cases, + if (nts>ts_last): + too_young.append((nix, nts)) + elif (nts < ts_first): + too_old.append((nix,nts)) + else: + closest = (np.abs(old_timestamps - nts)).argmin() + source.append(closest) + target.append(nix) + # check the edge cases, + for nix, nts in reversed(too_old): + closest = (np.abs(old_timestamps - nts)).argmin() + if (closest not in source): + source.append(closest) + target.append(nix) + break + for nix, nts in too_young: + closest = (np.abs(old_timestamps - nts)).argmin() + if (closest not in source): + source.append(closest) + target.append(nix) + break + + if len(set(source)) != len(source): #non-unique mapping + cnt = Counter(source) + toomany = defaultdict(list) + for v,n in zip(source, target): + if cnt[v] != 1: + toomany[old_timestamps[source[v]]].append(new_timestamps[target[n]]) + for k,v in toomany.items(): + print("The old time_stamp ", k, + "is a closest neighbor of", len(v) ,"new time_stamps:", v) + raise RuntimeError("Can not align streams. Could not create an unique mapping") + for chan in range(old_timeseries.shape[1]): + new_timeseries[target, chan] = old_timeseries[source,chan] + return new_timeseries + + +def align_streams(streams, # List[defaultdict] + align_foo=dict(), # defaultdict[int, Callable] + time_stamps=None, # List[float] + sampling_rate=None # float +): + if sampling_rate is not None and time_stamps is not None: + raise ValueError("You can not specify time_stamps and sampling_rate at the same time") + + if sampling_rate is None: + # we pick the effective sampling rate from the fastest stream + srates = [stream["info"]["effective_srate"] for stream in streams] + sampling_rate = max(srates, default=0) + if sampling_rate <= 0: # either no valid stream or all streams are async + warnings.warn("Can not align streams: Fastest effective sampling rate was 0 or smaller.") + return streams + + if time_stamps is None: + # we pick the oldest and youngest timestamp of all streams + stamps = [stream["time_stamps"] for stream in streams] + ts_first = min((min(s) for s in stamps)) + ts_last = max((max(s) for s in stamps)) + full_dur = ts_last-ts_first + n_samples = int(full_dur * sampling_rate)+1 + # we create new regularized timestamps + time_stamps = np.linspace(ts_first, ts_last, n_samples) + + + channels = 0 + for stream in streams: + print(stream) + channels += int(stream["info"]["channel_count"][0]) + # https://stackoverflow.com/questions/1704823/create-numpy-matrix-filled-with-nans The timings show a preference for ndarray.fill(..) as the faster alternative. + aligned_timeseries = np.empty((len(time_stamps), + channels,), dtype=object) + aligned_timeseries.fill(np.nan) + + where = 0 + to = 0 + for stream in streams: + sid = stream["info"]["stream_id"] + new_timeseries = align_foo.get(sid, _shift_align)(stream["time_stamps"], stream["time_series"], time_stamps) + where = to + to += int(stream["info"]["channel_count"][0]) + aligned_timeseries[:, where:to] = new_timeseries + return aligned_timeseries diff --git a/pyxdf/test/test_data.py b/pyxdf/test/test_data.py index ce782b1..08d4108 100644 --- a/pyxdf/test/test_data.py +++ b/pyxdf/test/test_data.py @@ -1,5 +1,5 @@ from pathlib import Path -from pyxdf import load_xdf +from pyxdf import load_xdf, align_streams import pytest import numpy as np @@ -78,7 +78,11 @@ def test_load_file(file): def test_smoketest_sync(example_files): - for file in files: + for file in example_files: streams, header = load_xdf( - file, sync_timestamps="linear", overlap_timestamps=False - ) + file) + if file.endswith("minimal.xdf"): + align_streams(streams) + + + diff --git a/pyxdf/test/test_limit_on_synced_stamps.py b/pyxdf/test/test_limit_on_synced_stamps.py deleted file mode 100644 index 7ff04e0..0000000 --- a/pyxdf/test/test_limit_on_synced_stamps.py +++ /dev/null @@ -1,94 +0,0 @@ -from pyxdf.pyxdf import _sync_timestamps, _limit_streams_to_overlap -import numpy as np - - -# %% -def streams(): - # generate mock-streams - class MockStream(): - - def __init__(self, timestamps, timeseries, effective_srate, channel_format): - self.time_series = timeseries - self.time_stamps = timestamps - self.effective_srate = effective_srate - self.channel_format = channel_format - - streams = {} - # fastest stream, latest timestamp - streams[1] = MockStream( - np.linspace(1, 2, 1001), np.linspace(1, 2, 1001), - 1000, - "float32" - ) - - # slowest stream, earliest timestamp - streams[2] = MockStream( - np.linspace(0.5, 1.5, 251), - np.linspace(0.5, 1.5, 251), - 250, - "float32", - ) - - # marker stream - streams[3] = MockStream( - [0.2, 1.1071, 1.2, 1.9, 2.5], - ["mark_" + str(n) for n in range(0, 5, 1)], - 0, - "string", - ) - # integer datatype stream, - streams[4] = MockStream( - np.linspace(0.4, 1.4, 251), - np.linspace(4, 140, 251, dtype="int32"), - 250, - "int32", - ) - - return streams - - -def mock(): - synced = _sync_timestamps(streams()) - return _limit_streams_to_overlap(synced) - - -# %% test -# earliest overlapping timestamp is 1.0 from stream 1 -# latest overlapping timestamp is 1.4 from stream 4 -# fastest strteam is stream 1 with fs 1000 -def test_timestamps(): - "check timestamps streams" - for s in mock().values(): - assert np.all(np.isclose(s.time_stamps, np.linspace(1, 1.4, 401))) - - -def test_first_timeseries(): - assert np.all( - np.isclose(mock()[1].time_series, np.linspace(1, 1.4, 401)) - ) - - -def test_second_timeseries(): - assert np.all( - np.isclose(mock()[2].time_series, np.linspace(1, 1.4, 401)) - ) - - -def test_third_timeseries(): - s = mock()[3].time_series - idx = [] - for i, _s in enumerate(s): - if _s != "": - idx.append(i) - assert np.all(idx == [107, 200]) - assert mock()[3].time_stamps[idx[0]] == 1.107 # shifted to closest fit - assert mock()[3].time_stamps[idx[1]] == 1.2 # fits with fs 1000 - - -def test_fourth_timeseries(): - # interpolation is tricky, as it depends on np.around and linear - # interpolation, which can not be approximated with np.linspace - # we therefore only test first and last value of the series - s = mock()[4].time_series - assert np.isclose(s[0], 85) - assert np.isclose(s[-1], 140) diff --git a/pyxdf/test/test_limit_streams_to_overlap.py b/pyxdf/test/test_limit_streams_to_overlap.py deleted file mode 100644 index c15bfcd..0000000 --- a/pyxdf/test/test_limit_streams_to_overlap.py +++ /dev/null @@ -1,55 +0,0 @@ -from pyxdf.pyxdf import _limit_streams_to_overlap -import numpy as np -# %% -def streams(): - #generate mock-streams - class MockStream(): - - def __init__(self, timestamps, timeseries, effective_srate, channel_format): - self.time_series = timeseries - self.time_stamps = timestamps - self.effective_srate = effective_srate - self.channel_format = channel_format - - streams = {} - # fastest stream, latest timestamp - streams[1] = MockStream(np.linspace(1,2,1001), - np.linspace(1,2,1001), - 1000, - 'float32') - - # slowest stream, earliest timestamp - streams[2] = MockStream(np.linspace(0.4,1.4,251), - np.linspace(0.4,1.4,251), - 250, - 'float32') - - # marker stream - streams[3] = MockStream([0.2, 1.1071, 1.2, 1.9, 2.5], - ['mark_' + str(n) for n in range(0,5,1)], - 0, - 'string') - return streams - -# %% test - -def test_timestamps(): - 'test whether the first and last timestamps have been selected as expected' - olap = _limit_streams_to_overlap(streams()) - for s,v in zip(olap.values(), - [(1, 1.4), (1, 1.4), (1.1071, 1.2)]): - assert np.isclose(s.time_stamps[0], v[0]) - assert np.isclose(s.time_stamps[-1], v[-1]) - -def test_timeseries(): - 'test whether the first and last value are as expected' - olap = _limit_streams_to_overlap(streams()) - for s,v in zip(olap.values(), - [(1, 1.4), (1, 1.4), ('mark_1', 'mark_2')]): - if s.channel_format != 'string': - assert np.isclose(s.time_series[0], v[0]) - assert np.isclose(s.time_series[-1], v[-1]) - else: - assert s.time_series[0] == v[0] - assert s.time_series[-1] == v[-1] - diff --git a/pyxdf/test/test_shift_align.py b/pyxdf/test/test_shift_align.py new file mode 100644 index 0000000..d1f153f --- /dev/null +++ b/pyxdf/test/test_shift_align.py @@ -0,0 +1,13 @@ +from pyxdf.pyxdf import _shift_align +import numpy as np + +def test_shift_align(): + old_timestamps = np.linspace(1, 1.5, 51) + old_timeseries = np.empty((51,1)) + old_timeseries[:,0] = np.linspace(0, 50, 51) + new_timestamps = np.linspace(1.001, 1.5001, 51) + new_timeseries = _shift_align(old_timestamps, old_timeseries, new_timestamps) + for x, y, xhat in zip(old_timestamps, old_timeseries, new_timestamps): + print(x, xhat, y[0]) + new_timestamps = np.linspace(0.99, 1.499, 51) + _shift_align(old_timestamps, old_timeseries, new_timestamps) \ No newline at end of file diff --git a/pyxdf/test/test_sync_timestamps.py b/pyxdf/test/test_sync_timestamps.py deleted file mode 100644 index 5991171..0000000 --- a/pyxdf/test/test_sync_timestamps.py +++ /dev/null @@ -1,170 +0,0 @@ -from pyxdf.pyxdf import _sync_timestamps -import numpy as np -from copy import deepcopy -import pytest - - -# %% -@pytest.fixture(scope="session") -def streams(): - # generate mock-streams - class MockStream(dict): - def __init__( - self, timestamps, timeseries, effective_srate, channel_format - ): - self.time_series = timeseries - self.time_stamps = timestamps - self.effective_srate = effective_srate - self.channel_format = channel_format - - streams = {} - # fastest stream, latest timestamp - streams[1] = MockStream( - np.linspace(1, 2, 1001), np.linspace(1, 2, 1001), 1000, "float32" - ) - - # slowest stream, earliest timestamp - streams[2] = MockStream( - np.linspace(0.1, 1.1, 251), np.linspace(2, 1, 251), 250, "float32" - ) - - # marker stream - streams[3] = MockStream( - [0.2, 1.1071, 1.2, 1.9, 2.5], - ["mark_" + str(n) for n in range(0, 5, 1)], - 0, - "string", - ) - # integer datatype stream, - streams[4] = MockStream( - np.linspace(0.1, 1.1, 251), - np.linspace(0, 250, 251, dtype="int32"), - 250, - "int32", - ) - - return streams - - -@pytest.fixture(scope="session") -def synced(streams): - synced = _sync_timestamps(deepcopy(streams)) - return synced - - -@pytest.fixture(scope="session") -def resampled(streams): - resampled = _sync_timestamps(deepcopy(streams), use_samplingrate=256) - return resampled - - -# %% test -def test_samplingrate_resampled(resampled): - "check that for all streams the sampling rate is identical to the fastest" - for s in resampled.values(): - assert s.effective_srate == 256 - efs = 1 / np.min(np.diff(s.time_stamps)) - assert (efs - 256) < 0.001 - - -def test_samplingrate_highest(synced): - "check that for all streams the sampling rate is identical to the fastest" - for s in synced.values(): - assert s.effective_srate == 1000 - - -def test_timestamps(synced): - """test whether all timestamps are identical and as expected - earliest timestamp is 0.1 from stream 1,4; latest is 2.5 from stream 3""" - for s in synced.values(): - assert np.all( - np.isclose(s.time_stamps, np.linspace(0.1, 2.5, 2401)) - ) - - -def test_identical_timestamp_steps(synced): - "all steps between samples should be identical within float precision" - for stream in synced.values(): - uq = np.diff(stream.time_stamps) - assert np.all([np.isclose(u, uq) for u in uq]) - - -def test_markerstream(synced, streams): - """check markers, they should be identical to the original but the second, - which should be shifted towards the next valid sample of the fastest - stream""" - idx = [] - for marker in streams[3].time_series: - idx.append(synced[3].time_series.tolist().index([marker])) - assert np.all( - np.equal( - synced[3].time_stamps[idx], - np.array( - [0.2, 1.107, 1.2, 1.9, 2.5] # identical # shifted # shifted - ), - ) - ) - - -def test_interpolation(synced): - """the linearly to 1000 Hz interpolated data from stream 2 should - be identical with a linspace with 1001 samples.""" - idx = np.where(~np.isnan(synced[2].time_series))[0] - assert np.all( - np.isclose(synced[2].time_series[idx], np.linspace(2, 1, 1001)) - ) - - -def test_extrapolation(synced, streams): - """extrapolated data should be nans or "" for markers""" - # stream 1 is extrapolated from 0.1 up to 1 and after 2 to 2.5 - idx = np.where(np.isnan(synced[1].time_series))[0] - values = synced[1].time_stamps[idx] - expectation = np.hstack( - ( - np.linspace(0.1, 1, 900, endpoint=False), - np.linspace(2.001, 2.5, 500, endpoint=True), - ) - ) - assert np.all(np.isclose(values, expectation)) - - # stream 2 is extrapolated from after 1.1 to 2.5 - idx = np.where(np.isnan(synced[2].time_series))[0] - values = synced[2].time_stamps[idx] - expectation = np.linspace(1.101, 2.5, num=1400, endpoint=True) - assert np.all(np.isclose(values, expectation)) - - # stream 3 is extrapolated everywhere but at the marker stamps - # can be tested easier by checking whether the count of [''] - # equals the total samples minus the original markers - values = np.where(synced[3].time_series == [""])[0].shape[0] - expectation = synced[3].time_series.shape[0] - len( - streams[3].time_series - ) - assert values == expectation - - # stream 4 is extrapolated from after 1.1 to 2.5 - idx = np.where(np.isnan(synced[4].time_series))[0] - values = synced[4].time_stamps[idx] - expectation = np.linspace(1.101, 2.5, num=1400, endpoint=True) - assert np.all(np.isclose(values, expectation)) - - -def test_identity_after_interpolation(synced, streams): - "the data for stream 1 should be identical to original" - idx = np.where(~np.isnan(synced[1].time_series))[0] - assert np.all( - np.isclose(synced[1].time_series[idx], streams[1].time_series) - ) - - -def test_integer_interpolation(synced, streams): - """the data of stream 4 should be all integers. - - .. note:: - interpolation is tricky, as it depends on np.around and linear - interpolation, which can not be approximated with np.linspace - """ - u = np.unique(synced[4].time_series) - u = np.int64(np.compress(~np.isnan(u), u)) - assert np.all(streams[4].time_series == u) From c53c95f78a6182c94565a8b2ee474c9f1a5c60f0 Mon Sep 17 00:00:00 2001 From: agricolab Date: Wed, 29 Nov 2023 21:51:23 +0100 Subject: [PATCH 05/14] Raise when not all old samples were assigned Test slightly delayed, slightly earlier, and jittered timestamps --- pyxdf/pyxdf.py | 5 +++- pyxdf/test/test_shift_align.py | 43 +++++++++++++++++++++++++++------- 2 files changed, 38 insertions(+), 10 deletions(-) diff --git a/pyxdf/pyxdf.py b/pyxdf/pyxdf.py index d06c471..7a46860 100644 --- a/pyxdf/pyxdf.py +++ b/pyxdf/pyxdf.py @@ -1130,7 +1130,10 @@ def _shift_align(old_timestamps, old_timeseries, new_timestamps): source.append(closest) target.append(nix) break - + + if len(set(source)) != len(old_timestamps): + missed = len(old_timestamps)-len(set(source)) + raise RuntimeError(f"Too few new timestamps. {missed} of {len(old_timestamps)} old samples could not be assigned.") if len(set(source)) != len(source): #non-unique mapping cnt = Counter(source) toomany = defaultdict(list) diff --git a/pyxdf/test/test_shift_align.py b/pyxdf/test/test_shift_align.py index d1f153f..834d76d 100644 --- a/pyxdf/test/test_shift_align.py +++ b/pyxdf/test/test_shift_align.py @@ -1,13 +1,38 @@ from pyxdf.pyxdf import _shift_align import numpy as np +import pytest +old_timestamps = np.linspace(1.0, 1.5, 51) +old_timeseries = np.empty((51,1)) +old_timeseries[:,0] = np.linspace(0, 50, 51) -def test_shift_align(): - old_timestamps = np.linspace(1, 1.5, 51) - old_timeseries = np.empty((51,1)) - old_timeseries[:,0] = np.linspace(0, 50, 51) - new_timestamps = np.linspace(1.001, 1.5001, 51) +def test_shift_align_too_few_new_stamps(): + # not all old samples were assigned + new_timestamps = np.linspace(1.001, 1.5001, 50) + with pytest.raises(RuntimeError): + new_timeseries = _shift_align(old_timestamps, old_timeseries, new_timestamps) + +def test_shift_align_slightly_later(): + print("\n==================") + new_timestamps = np.arange(1.001, 1.5011, 0.01) new_timeseries = _shift_align(old_timestamps, old_timeseries, new_timestamps) - for x, y, xhat in zip(old_timestamps, old_timeseries, new_timestamps): - print(x, xhat, y[0]) - new_timestamps = np.linspace(0.99, 1.499, 51) - _shift_align(old_timestamps, old_timeseries, new_timestamps) \ No newline at end of file + for x, y, xhat, yhat in zip(old_timestamps, old_timeseries, new_timestamps, new_timeseries): + print(f"{x:3.4f} -> {xhat:3.4f} = {y[0]:3.0f} / {yhat[0]:3.0f} ") + assert y == yhat + +def test_shift_align_slightly_earlier(): + print("\n==================") + new_timestamps = np.arange(0.999, 1.499, 0.01) + new_timeseries= _shift_align(old_timestamps, old_timeseries, new_timestamps) + + for x, y, xhat, yhat in zip(old_timestamps, old_timeseries, new_timestamps, new_timeseries): + print(f"{x:3.4f} -> {xhat:3.4f} = {y[0]:3.0f} / {yhat[0]:3.0f} ") + assert y == yhat + +def test_shift_align_jittered(): + print("\n==================") + jittered_timestamps = np.random.randn(*old_timestamps.shape)*0.0005 + jittered_timestamps += old_timestamps + new_timeseries = _shift_align(jittered_timestamps, old_timeseries, old_timestamps) + for x, y, xhat, yhat in zip(jittered_timestamps, old_timeseries, old_timestamps, new_timeseries): + print(f"{x:3.4f} -> {xhat:3.4f} = {y[0]:3.0f} / {yhat[0]:3.0f} ") + assert y == yhat \ No newline at end of file From 1050094817c0723cb420419a243a9d213299632c Mon Sep 17 00:00:00 2001 From: agricolab Date: Thu, 30 Nov 2023 09:16:43 +0100 Subject: [PATCH 06/14] Test edges --- pyxdf/test/test_shift_align.py | 22 +++++++++++++++++++++- 1 file changed, 21 insertions(+), 1 deletion(-) diff --git a/pyxdf/test/test_shift_align.py b/pyxdf/test/test_shift_align.py index 834d76d..6e6a9b9 100644 --- a/pyxdf/test/test_shift_align.py +++ b/pyxdf/test/test_shift_align.py @@ -35,4 +35,24 @@ def test_shift_align_jittered(): new_timeseries = _shift_align(jittered_timestamps, old_timeseries, old_timestamps) for x, y, xhat, yhat in zip(jittered_timestamps, old_timeseries, old_timestamps, new_timeseries): print(f"{x:3.4f} -> {xhat:3.4f} = {y[0]:3.0f} / {yhat[0]:3.0f} ") - assert y == yhat \ No newline at end of file + assert y == yhat + +def test_shift_align_edges(): + print("\n==================") + new_timestamps = np.arange(0.5, 1.7, 0.01) + # idx = np.argwhere(old_timestamps<1.2)[-1][0]+1 + idx = len(old_timestamps) + new_timeseries = _shift_align(old_timestamps[:idx], old_timeseries, new_timestamps) + for xhat, yhat in zip(new_timestamps, new_timeseries): + delta = xhat-old_timestamps + idx_old = np.argmin(np.abs(delta)) + x = old_timestamps[idx_old] + if xhat < 1.00 or xhat > 1.5001: + y = np.nan + else: + y = old_timeseries[idx_old,0] + print(f"{x:3.4f} -> {xhat:3.4f} = {y:3.0f} / {yhat[0]:3.0f} ") + if (np.isnan(y)): + assert np.isnan(yhat[0]) + else: + assert y == yhat[0] \ No newline at end of file From 7727dae598b52606454d97d1105214ce7857da1d Mon Sep 17 00:00:00 2001 From: agricolab Date: Thu, 30 Nov 2023 12:33:19 +0100 Subject: [PATCH 07/14] Propagate segments to ["info"]["segments"] --- pyxdf/pyxdf.py | 49 +++++++++++++++++++------------------------------ 1 file changed, 19 insertions(+), 30 deletions(-) diff --git a/pyxdf/pyxdf.py b/pyxdf/pyxdf.py index 7a46860..cbe529f 100644 --- a/pyxdf/pyxdf.py +++ b/pyxdf/pyxdf.py @@ -81,10 +81,7 @@ def load_xdf( clock_reset_threshold_offset_stds=10, winsor_threshold=0.0001, verbose=None, - align_timestamps=False, - use_samplingrate="highest", - only_overlapping=False, -): + ): """Import an XDF file. This is an importer for multi-stream XDF (Extensible Data Format) @@ -399,6 +396,10 @@ def load_xdf( ) # perform jitter removal if requested + for stream in temp.values(): + #initialize segment list in case jitter_removal was not selected + stream.segments = [(0, len(stream.time_series)-1)] #inclusive + if dejitter_timestamps: logger.info(" performing jitter removal...") temp = _jitter_removal( @@ -413,24 +414,8 @@ def load_xdf( stream.effective_srate = len(stream.time_stamps) / duration else: stream.effective_srate = 0.0 + - # sync sampling with the fastest timeseries by interpolation / shifting - for k in streams.keys(): - stream = streams[k] - tmp = temp[k] - tmp.channel_format = stream["info"]["channel_format"][0] - - if align_timestamps: - if type(align_timestamps) is not str: - align_timestamps = "linear" - logger.warning('align_timestamps defaults to "linear"') - temp = _align_timestamps( - temp, kind=align_timestamps, use_samplingrate=use_samplingrate - ) - - # limit streams to their overlapping periods - if only_overlapping: - temp = _limit_streams_to_overlap(temp) for k in streams.keys(): stream = streams[k] @@ -444,6 +429,7 @@ def load_xdf( ) stream["info"]["stream_id"] = k stream["info"]["effective_srate"] = tmp.effective_srate + stream["info"]["segments"] = tmp.segments stream["time_series"] = tmp.time_series stream["time_stamps"] = tmp.time_stamps streams = [s for s in streams.values()] @@ -689,22 +675,25 @@ def _clock_sync( def _jitter_removal(streams, threshold_seconds=1, threshold_samples=500): for stream_id, stream in streams.items(): stream.effective_srate = 0 # will be recalculated if possible - nsamples = len(stream.time_stamps) + stream.segments = [] + nsamples = len(stream.time_stamps) if nsamples > 0 and stream.srate > 0: # Identify breaks in the time_stamps diffs = np.diff(stream.time_stamps) - b_breaks = diffs > np.max( + threshold = np.max( (threshold_seconds, threshold_samples * stream.tdiff) ) + b_breaks = diffs > threshold # find indices (+ 1 to compensate for lost sample in np.diff) - break_inds = np.where(b_breaks)[0] + 1 - + break_inds = np.where(b_breaks)[0] + 1 # Get indices delimiting segments without breaks # 0th sample is a segment start and last sample is a segment stop seg_starts = np.hstack(([0], break_inds)) seg_stops = np.hstack((break_inds - 1, nsamples - 1)) # inclusive - stream.seg_starts = seg_starts - stream.seg_stops = seg_stops + for a,b in zip(seg_starts, seg_stops): + stream.segments.append((a,b)) + + stream.seg_stops = seg_stops.tolist() # Process each segment separately for start_ix, stop_ix in zip(seg_starts, seg_stops): # Calculate time stamps assuming constant intervals within each @@ -724,15 +713,15 @@ def _jitter_removal(streams, threshold_seconds=1, threshold_samples=500): stream.time_stamps[seg_stops] + stream.tdiff ) - stream.time_stamps[seg_starts] stream.effective_srate = np.sum(counts) / np.sum(durations) - + else: + stream.segments = [0, nsamples-1] srate, effective_srate = stream.srate, stream.effective_srate if srate != 0 and np.abs(srate - effective_srate) / srate > 0.1: msg = ( "Stream %d: Calculated effective sampling rate %.4f Hz is" " different from specified rate %.4f Hz." ) - logger.warning(msg, stream_id, effective_srate, srate) - + logger.warning(msg, stream_id, effective_srate, srate) return streams From 80e90b994b486214358a5831aaefcc9d9b80bbba Mon Sep 17 00:00:00 2001 From: agricolab Date: Tue, 5 Dec 2023 23:15:09 +0100 Subject: [PATCH 08/14] Account for segments in old_stream aligned_timestamps can not have gaps when data-driven, as it is created by np.linspace. Only when an aligned_timestamps is supported by the user, can this list have 'gaps' --- pyxdf/pyxdf.py | 57 ++++++++++++++++++++++++++--------------- pyxdf/test/test_data.py | 21 ++++++++++++--- 2 files changed, 55 insertions(+), 23 deletions(-) diff --git a/pyxdf/pyxdf.py b/pyxdf/pyxdf.py index cbe529f..fee19ac 100644 --- a/pyxdf/pyxdf.py +++ b/pyxdf/pyxdf.py @@ -73,7 +73,7 @@ def load_xdf( synchronize_clocks=True, handle_clock_resets=True, dejitter_timestamps=True, - jitter_break_threshold_seconds=1, + jitter_break_threshold_seconds=1.0, jitter_break_threshold_samples=500, clock_reset_threshold_seconds=5, clock_reset_threshold_stds=5, @@ -1140,11 +1140,11 @@ def _shift_align(old_timestamps, old_timeseries, new_timestamps): def align_streams(streams, # List[defaultdict] align_foo=dict(), # defaultdict[int, Callable] - time_stamps=None, # List[float] + aligned_timestamps=None, # List[float] sampling_rate=None # float ): - if sampling_rate is not None and time_stamps is not None: - raise ValueError("You can not specify time_stamps and sampling_rate at the same time") + if sampling_rate is not None and aligned_timestamps is not None: + raise ValueError("You can not specify aligned_timestamps and sampling_rate at the same time") if sampling_rate is None: # we pick the effective sampling rate from the fastest stream @@ -1154,32 +1154,49 @@ def align_streams(streams, # List[defaultdict] warnings.warn("Can not align streams: Fastest effective sampling rate was 0 or smaller.") return streams - if time_stamps is None: + + if aligned_timestamps is None: # we pick the oldest and youngest timestamp of all streams stamps = [stream["time_stamps"] for stream in streams] ts_first = min((min(s) for s in stamps)) ts_last = max((max(s) for s in stamps)) - full_dur = ts_last-ts_first - n_samples = int(full_dur * sampling_rate)+1 + full_dur = ts_last-ts_first + n_samples = int(np.round((full_dur * sampling_rate),0))+1 # we create new regularized timestamps - time_stamps = np.linspace(ts_first, ts_last, n_samples) - - + aligned_timestamps = np.linspace(ts_first, ts_last, n_samples) + channels = 0 for stream in streams: - print(stream) + # print(stream) channels += int(stream["info"]["channel_count"][0]) # https://stackoverflow.com/questions/1704823/create-numpy-matrix-filled-with-nans The timings show a preference for ndarray.fill(..) as the faster alternative. - aligned_timeseries = np.empty((len(time_stamps), + aligned_timeseries = np.empty((len(aligned_timestamps), channels,), dtype=object) aligned_timeseries.fill(np.nan) - where = 0 - to = 0 + chan_start = 0 + chan_end = 0 for stream in streams: - sid = stream["info"]["stream_id"] - new_timeseries = align_foo.get(sid, _shift_align)(stream["time_stamps"], stream["time_series"], time_stamps) - where = to - to += int(stream["info"]["channel_count"][0]) - aligned_timeseries[:, where:to] = new_timeseries - return aligned_timeseries + sid = stream["info"]["stream_id"] + align = align_foo.get(sid, _shift_align) + chan_cnt = int(stream["info"]["channel_count"][0]) + new_timeseries = np.empty((len(aligned_timestamps), chan_cnt), dtype=object) + new_timeseries.fill(np.nan) + for seg_start, seg_stop in stream["info"]["segments"]: + _new_timeseries = align( + stream["time_stamps"][seg_start:seg_stop+1], + stream["time_series"][seg_start:seg_stop+1], + aligned_timestamps) + # pick indices of the NEW timestamps closest to when segments start and stop + a = stream["time_stamps"][seg_start] + b = stream["time_stamps"][seg_stop] + aix = np.argmin(np.abs(aligned_timestamps-a)) + bix = np.argmin(np.abs(aligned_timestamps-b)) + # and store only this aligned segment, leaving the rest as nans (or aligned as other segments) + new_timeseries[aix:bix+1] = _new_timeseries[aix:bix+1] + + # store the new timeseries at the respective channel indices in the 2D array + chan_start = chan_end + chan_end += chan_cnt + aligned_timeseries[:, chan_start:chan_end] = new_timeseries + return aligned_timeseries, aligned_timestamps diff --git a/pyxdf/test/test_data.py b/pyxdf/test/test_data.py index 08d4108..d5cfa7f 100644 --- a/pyxdf/test/test_data.py +++ b/pyxdf/test/test_data.py @@ -77,12 +77,27 @@ def test_load_file(file): np.testing.assert_array_almost_equal(streams[1]["time_stamps"], t) -def test_smoketest_sync(example_files): +def test_smoketest_sync_unsegmented(example_files): + for file in example_files: + streams, header = load_xdf(file) + if file.endswith("minimal.xdf"): + a_series, a_stamps = align_streams(streams) + print("unsegmented") + for d, s in zip(a_series, a_stamps): + print(f"{s:5.3f} : {d}") + +def test_smoketest_sync_with_gaps(example_files): for file in example_files: streams, header = load_xdf( - file) + file, + jitter_break_threshold_seconds=0.001, jitter_break_threshold_samples=1 + ) if file.endswith("minimal.xdf"): - align_streams(streams) + a_series, a_stamps = align_streams(streams) + print("segmented") + #TODO swallows some samples + for d, s in zip(a_series, a_stamps): + print(f"{s:5.3f} : {d}") From 0fcac6beaff106196991019a03ef6c02bbf88c6d Mon Sep 17 00:00:00 2001 From: agricolab Date: Tue, 5 Dec 2023 23:18:29 +0100 Subject: [PATCH 09/14] Add test for forced aligned_timestamps --- pyxdf/test/test_data.py | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/pyxdf/test/test_data.py b/pyxdf/test/test_data.py index d5cfa7f..c69a4a0 100644 --- a/pyxdf/test/test_data.py +++ b/pyxdf/test/test_data.py @@ -85,6 +85,15 @@ def test_smoketest_sync_unsegmented(example_files): print("unsegmented") for d, s in zip(a_series, a_stamps): print(f"{s:5.3f} : {d}") + +def test_smoketest_sync_forced(example_files): + for file in example_files: + streams, header = load_xdf(file) + if file.endswith("minimal.xdf"): + a_series, a_stamps = align_streams(streams, aligned_timestamps=np.arange(5.001, 5.92, 0.1) ) + print("forced") + for d, s in zip(a_series, a_stamps): + print(f"{s:5.3f} : {d}") def test_smoketest_sync_with_gaps(example_files): for file in example_files: @@ -94,8 +103,7 @@ def test_smoketest_sync_with_gaps(example_files): ) if file.endswith("minimal.xdf"): a_series, a_stamps = align_streams(streams) - print("segmented") - #TODO swallows some samples + print("segmented") for d, s in zip(a_series, a_stamps): print(f"{s:5.3f} : {d}") From b673ea7e8d6309c32237b19d5653dd5d1b1487be Mon Sep 17 00:00:00 2001 From: agricolab Date: Tue, 5 Dec 2023 23:29:07 +0100 Subject: [PATCH 10/14] Switch to arange to handle enforced sampling_rate --- pyxdf/pyxdf.py | 9 +++++++-- pyxdf/test/test_data.py | 10 ++++++++-- 2 files changed, 15 insertions(+), 4 deletions(-) diff --git a/pyxdf/pyxdf.py b/pyxdf/pyxdf.py index fee19ac..68f0119 100644 --- a/pyxdf/pyxdf.py +++ b/pyxdf/pyxdf.py @@ -1161,9 +1161,14 @@ def align_streams(streams, # List[defaultdict] ts_first = min((min(s) for s in stamps)) ts_last = max((max(s) for s in stamps)) full_dur = ts_last-ts_first - n_samples = int(np.round((full_dur * sampling_rate),0))+1 + step = 1/sampling_rate # we create new regularized timestamps - aligned_timestamps = np.linspace(ts_first, ts_last, n_samples) + aligned_timestamps = np.arange(ts_first, ts_last+step/2, step) + # using np.linspace only differs in step if n_samples is different (as n_samples must be an integer number (see implementation below). + # therefore we stick with np.arange (in spite of possible floating point error accumulation, but to make sure that ts_last is included, we add a half-step. This therefore comes at the cost of a overshoot, but i consider this acceptable considering this stamp would only be from one stream, and not part of all other and therefore is kind of arbitray anyways. + # linspace implementation: + # n_samples = int(np.round((full_dur * sampling_rate),0))+1 + # aligned_timestamps = np.linspace(ts_first, ts_last, n_samples) channels = 0 for stream in streams: diff --git a/pyxdf/test/test_data.py b/pyxdf/test/test_data.py index c69a4a0..4d68ff1 100644 --- a/pyxdf/test/test_data.py +++ b/pyxdf/test/test_data.py @@ -90,8 +90,14 @@ def test_smoketest_sync_forced(example_files): for file in example_files: streams, header = load_xdf(file) if file.endswith("minimal.xdf"): - a_series, a_stamps = align_streams(streams, aligned_timestamps=np.arange(5.001, 5.92, 0.1) ) - print("forced") + print("forced_stamps") + a_series, a_stamps = align_streams(streams, aligned_timestamps=np.arange(5.001, 5.92, 0.1) ) + for d, s in zip(a_series, a_stamps): + print(f"{s:5.3f} : {d}") + + print("forced_rate") + a_series, a_stamps = align_streams(streams, + sampling_rate=10.15) for d, s in zip(a_series, a_stamps): print(f"{s:5.3f} : {d}") From eb06c6cc885318ff023c8a0e432fc043128f573c Mon Sep 17 00:00:00 2001 From: agricolab Date: Tue, 5 Dec 2023 23:35:59 +0100 Subject: [PATCH 11/14] Test align_foo dict --- pyxdf/pyxdf.py | 160 +--------------------------------------- pyxdf/test/test_data.py | 11 ++- 2 files changed, 11 insertions(+), 160 deletions(-) diff --git a/pyxdf/pyxdf.py b/pyxdf/pyxdf.py index 68f0119..985cb0c 100644 --- a/pyxdf/pyxdf.py +++ b/pyxdf/pyxdf.py @@ -901,7 +901,7 @@ def _interpolate( """Perform interpolation for _align_timestamps If scipy is not installed, the method falls back to numpy, and then only - supports linear interpolation. + supports linear interpolation. Otherwise, it supports ‘linear’, ‘nearest’, ‘nearest-up’, ‘zero’, ‘slinear’, ‘quadratic’, ‘cubic’, ‘previous’, or ‘next’. """ try: from scipy.interpolate import interp1d @@ -922,164 +922,6 @@ def _interpolate( return np.interp(new_x, xp=x, fp=y, left=np.NaN, right=np.NaN) - - -def _align_timestamps(streams, kind="linear", use_samplingrate="highest"): - """Sync all streams to the fastest sampling rate by shifting or upsampling. - - Depending on a streams channel-format, extrapolation is performed using - with NaNs (numerical formats) or with [''] (string format). - - Interpolation is only performed for numeric values, and depending on the - kind argument which is inherited from scipy.interpolate.interp1d. Consider - that If the channel format is an integer type (i.e. 'int8', 'int16', - 'int32', or 'int64'), integer output is enforced by rounding the values. - Additionally, consider that not all interpolation methods are convex, i.e. - for some kinds, you might receive values above or below the desired - integer type. There is no correction implemented for this, as it is assumed - this is a desired behavior if you give the appropriate argument. - - For string formats, events are shifted towards the nearest feasible - timepoint. Any time-stamps without a marker get then assigned an empty - marker, i.e. ['']. - """ - # selecting the stream with the highest effective sampling rate - srates = [stream.effective_srate for stream in streams.values()] - max_fs = max(srates, default=0) - if max_fs == 0: # either no valid stream or all streams are async - return streams - if srates.count(max_fs) > 1: - # highly unlikely, with floating point precision and sampling noise - # but anyways: better safe than sorry - logger.warning( - "I found at least two streams with identical effective " - "srate. I select one at random for syncing timestamps." - ) - - # selecting maximal time range of the whole recording - # streams with fs=0 might are not dejittered be default, and therefore - # indexing first and last might miss the earliest/latest - # we therefore take the min and max timestamp - stamps = [stream.time_stamps for stream in streams.values()] - ts_first = min((min(s) for s in stamps)) # earliest sample of all streams - ts_last = max((max(s) for s in stamps)) # latest sample of all streams - full_dur = ts_last-ts_first - # generate new timestamps - # based on extrapolation of the fastest timestamps towards the maximal - # time range of the whole recording - if use_samplingrate == "highest": - new_srate = max_fs - elif type(use_samplingrate) == int: - new_srate = use_samplingrate - else: - raise NotImplementedError( - f"Unknown parameter for use_samplingrate: {use_samplingrate}" - ) - fs_step = 1.0 / new_srate - new_timestamps = np.linspace(ts_first, ts_last, - num=int(full_dur*new_srate)+1, - endpoint=True) - - # interpolate or shift all streams to the new timestamps - for stream in streams.values(): - if (stream.channel_format == "string"): - # you can't really interpolate strings. One approach to sync them - # is to shift their events to the nearest timestamp of the new - # timestamps - shifted_x = [] - for x in stream.time_stamps: - argmin = (abs(new_timestamps - x)).argmin() - shifted_x.append(new_timestamps[argmin]) - - shifted_y = [] - for x in new_timestamps: - try: - idx = shifted_x.index(x) - y = stream.time_series[idx] - shifted_y.append(y) - except ValueError: - shifted_y.append("") - - stream.time_series = shifted_y - stream.time_stamps = new_timestamps - - elif stream.channel_format in [ - "float32", - "double64", - "int8", - "int16", - "int32", - "int64", - ]: - # continuous interpolation is possible using interp1d - # discrete interpolation requires some finetuning - # bounds_error=False replaces everything outside of interpolation - # zone with NaNs - y = stream.time_series - x = stream.time_stamps - - stream.time_series = _interpolate(x, y, new_timestamps, kind) - stream.time_stamps = new_timestamps - - if stream.channel_format in ["int8", "int16", "int32", "int64"]: - # i am stuck with float64s, as integers have no nans - # therefore i round to the nearest integer instead - stream.time_series = np.around(stream.time_series, 0) - else: - raise NotImplementedError( - "Don't know how to sync sampling for " - "channel_format=" - "{}".format(stream.channel_format) - ) - stream.effective_srate = new_srate - - return streams - - -def _limit_streams_to_overlap(streams): - """takes streams, returns streams limited to time periods overlapping - between all streams - - The overlapping periods start and end for each streams with the first and - last sample completely within the overlapping period. - If time_stamps have been synced, these are the same time-points for all - streams. Consider that in the case of unsynced time-stamps, the time-stamps - can not be exactly equal! - """ - ts_first, ts_last = [], [] - for stream in streams.values(): - # skip streams with fs=0 or if they send strings, because they might - # just not yet have send anything on purpose (i.e. markers) - # while other data was already being recorded. - if ( - stream.effective_srate != 0 - and stream.channel_format != "string" - ): - # extrapolation in _align_timestamps is done with NaNs - not_extrapolated = np.where(~np.isnan(stream.time_series))[0] - ts_first.append(min(stream.time_stamps[not_extrapolated])) - ts_last.append(max(stream.time_stamps[not_extrapolated])) - - ts_first = max(ts_first) - ts_last = min(ts_last) - for stream in streams.values(): - # use np.around to prevent floating point hickups - around = np.around(stream.time_stamps, 15) - a = np.where(around >= ts_first)[0] - b = np.where(around <= ts_last)[0] - select = np.intersect1d(a, b) - if type(stream.time_stamps) is list: - stream.time_stamps = [stream.time_stamps[s] for s in select] - else: - stream.time_stamps = stream.time_stamps[select] - - if type(stream.time_series) is list: - stream.time_series = [stream.time_series[s] for s in select] - else: - stream.time_series = stream.time_series[select] - - return streams - def _shift_align(old_timestamps, old_timeseries, new_timestamps): old_timestamps = np.array(old_timestamps) old_timeseries = np.array(old_timeseries) diff --git a/pyxdf/test/test_data.py b/pyxdf/test/test_data.py index 4d68ff1..3ed7b93 100644 --- a/pyxdf/test/test_data.py +++ b/pyxdf/test/test_data.py @@ -81,10 +81,13 @@ def test_smoketest_sync_unsegmented(example_files): for file in example_files: streams, header = load_xdf(file) if file.endswith("minimal.xdf"): - a_series, a_stamps = align_streams(streams) print("unsegmented") + a_series, a_stamps = align_streams(streams) for d, s in zip(a_series, a_stamps): print(f"{s:5.3f} : {d}") + + + def test_smoketest_sync_forced(example_files): for file in example_files: @@ -101,6 +104,12 @@ def test_smoketest_sync_forced(example_files): for d, s in zip(a_series, a_stamps): print(f"{s:5.3f} : {d}") + from pyxdf.pyxdf import _interpolate + print("forced cubic") + a_series, a_stamps = align_streams(streams, align_foo={0:lambda x,y,xh: _interpolate(x,y,xh, "cubic")}) + for d, s in zip(a_series, a_stamps): + print(f"{s:5.3f} : {d}") + def test_smoketest_sync_with_gaps(example_files): for file in example_files: streams, header = load_xdf( From 8fa4645f50904dff6f3fe00ada57c06d1c87d6dc Mon Sep 17 00:00:00 2001 From: agricolab Date: Wed, 13 Dec 2023 21:16:45 +0100 Subject: [PATCH 12/14] Refactor into own file Remove docstrings for unused arguments in load_xdf --- pyxdf/align.py | 181 ++++++++++++++++++++++++++++++++ pyxdf/pyxdf.py | 186 +-------------------------------- pyxdf/test/test_data.py | 2 +- pyxdf/test/test_shift_align.py | 2 +- 4 files changed, 186 insertions(+), 185 deletions(-) create mode 100644 pyxdf/align.py diff --git a/pyxdf/align.py b/pyxdf/align.py new file mode 100644 index 0000000..83f0ab9 --- /dev/null +++ b/pyxdf/align.py @@ -0,0 +1,181 @@ +import numpy as np +import warnings +from collections import defaultdict, Counter + + +def _interpolate( + x: np.ndarray, y: np.ndarray, new_x: np.ndarray, kind="linear" +) -> np.ndarray: + """Perform interpolation for _align_timestamps + + If scipy is not installed, the method falls back to numpy, and then only + supports linear interpolation. Otherwise, it supports ‘linear’, ‘nearest’, ‘nearest-up’, ‘zero’, ‘slinear’, ‘quadratic’, ‘cubic’, ‘previous’, or ‘next’. + """ + try: + from scipy.interpolate import interp1d + + f = interp1d( + x, + y, + kind=kind, + axis=0, + assume_sorted=True, # speed up + bounds_error=False, + ) + return f(new_x) + except ImportError as e: + if kind != "linear": + raise e + else: + return np.interp(new_x, xp=x, fp=y, left=np.NaN, right=np.NaN) + + +def _shift_align(old_timestamps, old_timeseries, new_timestamps): + old_timestamps = np.array(old_timestamps) + old_timeseries = np.array(old_timeseries) + new_timestamps = np.array(new_timestamps) + ts_last = old_timestamps[-1] + ts_first = old_timestamps[0] + source = list() + target = list() + new_timeseries = np.empty(( + new_timestamps.shape[0], # new sample count + old_timeseries.shape[1], # old channel count + ), dtype=object) + new_timeseries.fill(np.nan) + too_old = list() + too_young = list() + for nix, nts in enumerate(new_timestamps): + closest = (np.abs(old_timestamps - nts)).argmin() + # remember the edge cases, + if (nts>ts_last): + too_young.append((nix, nts)) + elif (nts < ts_first): + too_old.append((nix,nts)) + else: + closest = (np.abs(old_timestamps - nts)).argmin() + source.append(closest) + target.append(nix) + # check the edge cases, + for nix, nts in reversed(too_old): + closest = (np.abs(old_timestamps - nts)).argmin() + if (closest not in source): + source.append(closest) + target.append(nix) + break + for nix, nts in too_young: + closest = (np.abs(old_timestamps - nts)).argmin() + if (closest not in source): + source.append(closest) + target.append(nix) + break + + if len(set(source)) != len(old_timestamps): + missed = len(old_timestamps)-len(set(source)) + raise RuntimeError(f"Too few new timestamps. {missed} of {len(old_timestamps)} old samples could not be assigned.") + if len(set(source)) != len(source): #non-unique mapping + cnt = Counter(source) + toomany = defaultdict(list) + for v,n in zip(source, target): + if cnt[v] != 1: + toomany[old_timestamps[source[v]]].append(new_timestamps[target[n]]) + for k,v in toomany.items(): + print("The old time_stamp ", k, + "is a closest neighbor of", len(v) ,"new time_stamps:", v) + raise RuntimeError("Can not align streams. Could not create an unique mapping") + for chan in range(old_timeseries.shape[1]): + new_timeseries[target, chan] = old_timeseries[source,chan] + return new_timeseries + + +def align_streams(streams, # List[defaultdict] + align_foo=dict(), # defaultdict[int, Callable] + aligned_timestamps=None, # Optional[List[float]] + sampling_rate=None # Optional[float|int] +): # -> Tuple[np.ndarray, List[float]] + """ + A function to + + + Args: + + streams: a list of defaultdicts (i.e. streams) as returned by + load_xdf + align_foo: a dictionary mapping streamIDs (i.e. int) to interpolation + callables. These callables must have the signature + `interpolate(old_timestamps, old_timeseries, new_timestamps)` and return a np.ndarray. See `_shift_align` and `_interpolate` for examples. + aligned_timestamps (optional): a list of floats with the new + timestamps to be used for alignment/interpolation. This list of timestamps can be irregular and have gaps. + sampling_rate (optional): a float defining the sampling rate which + will be used to calculate aligned_timestamps. + + Return: + (aligned_timeseries, aligned_timestamps): tuple + + + THe user can define either aligned_timestamps or sampling_rate or neither. If neither is defined, the algorithm will take the sampling_rate of the fastest stream and create aligned_timestamps from the oldest sample of all streams to the youngest. + + """ + + if sampling_rate is not None and aligned_timestamps is not None: + raise ValueError("You can not specify aligned_timestamps and sampling_rate at the same time") + + if sampling_rate is None: + # we pick the effective sampling rate from the fastest stream + srates = [stream["info"]["effective_srate"] for stream in streams] + sampling_rate = max(srates, default=0) + if sampling_rate <= 0: # either no valid stream or all streams are async + warnings.warn("Can not align streams: Fastest effective sampling rate was 0 or smaller.") + return streams + + + if aligned_timestamps is None: + # we pick the oldest and youngest timestamp of all streams + stamps = [stream["time_stamps"] for stream in streams] + ts_first = min((min(s) for s in stamps)) + ts_last = max((max(s) for s in stamps)) + full_dur = ts_last-ts_first + step = 1/sampling_rate + # we create new regularized timestamps + aligned_timestamps = np.arange(ts_first, ts_last+step/2, step) + # using np.linspace only differs in step if n_samples is different (as n_samples must be an integer number (see implementation below). + # therefore we stick with np.arange (in spite of possible floating point error accumulation, but to make sure that ts_last is included, we add a half-step. This therefore comes at the cost of a overshoot, but i consider this acceptable considering this stamp would only be from one stream, and not part of all other and therefore is kind of arbitray anyways. + # linspace implementation: + # n_samples = int(np.round((full_dur * sampling_rate),0))+1 + # aligned_timestamps = np.linspace(ts_first, ts_last, n_samples) + + channels = 0 + for stream in streams: + # print(stream) + channels += int(stream["info"]["channel_count"][0]) + # https://stackoverflow.com/questions/1704823/create-numpy-matrix-filled-with-nans The timings show a preference for ndarray.fill(..) as the faster alternative. + aligned_timeseries = np.empty((len(aligned_timestamps), + channels,), dtype=object) + aligned_timeseries.fill(np.nan) + + chan_start = 0 + chan_end = 0 + for stream in streams: + sid = stream["info"]["stream_id"] + align = align_foo.get(sid, _shift_align) + chan_cnt = int(stream["info"]["channel_count"][0]) + new_timeseries = np.empty((len(aligned_timestamps), chan_cnt), dtype=object) + new_timeseries.fill(np.nan) + for seg_start, seg_stop in stream["info"]["segments"]: + _new_timeseries = align( + stream["time_stamps"][seg_start:seg_stop+1], + stream["time_series"][seg_start:seg_stop+1], + aligned_timestamps) + # pick indices of the NEW timestamps closest to when segments start and stop + a = stream["time_stamps"][seg_start] + b = stream["time_stamps"][seg_stop] + aix = np.argmin(np.abs(aligned_timestamps-a)) + bix = np.argmin(np.abs(aligned_timestamps-b)) + # and store only this aligned segment, leaving the rest as nans (or aligned as other segments) + new_timeseries[aix:bix+1] = _new_timeseries[aix:bix+1] + + # store the new timeseries at the respective channel indices in the 2D array + chan_start = chan_end + chan_end += chan_cnt + aligned_timeseries[:, chan_start:chan_end] = new_timeseries + return aligned_timeseries, aligned_timestamps diff --git a/pyxdf/pyxdf.py b/pyxdf/pyxdf.py index 985cb0c..76e923d 100644 --- a/pyxdf/pyxdf.py +++ b/pyxdf/pyxdf.py @@ -14,13 +14,11 @@ import itertools import gzip from xml.etree.ElementTree import fromstring -from collections import OrderedDict, defaultdict, Counter +from collections import OrderedDict, defaultdict import logging from pathlib import Path -import warnings import numpy as np - - +from pyxdf.align import align_streams __all__ = ["load_xdf", "align_streams"] logger = logging.getLogger(__name__) @@ -119,32 +117,7 @@ def load_xdf( ClockOffset chunks. (default: true) dejitter_timestamps : Whether to perform jitter removal for regularly - sampled streams. (default: true) - - align_timestamps: {bool, str} - sync timestamps of all streams sample-wise with the stream of the - highest effective sampling rate (or as set in use_samplingrate). Using align_timestamps with any - method other than linear has dependency on scipy, which is not a - hard requirement of pyxdf. If scipy is not installed in your - environment, the method supports linear interpolation with - numpy. - - False -> do not align samples - True -> align samples using linear interpolation - str: align data using the method <'linear’, ‘nearest’, ‘zero’, ‘slinear’, ‘quadratic’, ‘cubic’, ‘previous’, ‘next’> as implemented in scipy.interpolate.interp1d. - - use_samplingrate: {int, str} = "highest" - resamples all channels to an effective sampling rate as given by this argument, using the method as defined by align_timestamps. Will do nothing if align_timestamps is False or the sampling rate is not larger than zero. - Plase note that setting a sampling rate smaller than the fastest sampling rate of all streams can result in aliasing. - int>0: resample to this sampling rate. - str:<'highest'> use the highest - - only_overlapping: bool = False - If true, return only overlapping streams, i.e. all streams - are limited to periods where all streams have data. (default: False) - If false, depends on whether align_timestamps is set. If set, it - expands all streams to include the earliest and latest timestamp of - any stream, if not set it simply return streams as they are. + sampled streams. (default: true) on_chunk : Function that is called for each chunk of data as it is being retrieved from the file; the function is allowed to modify @@ -894,156 +867,3 @@ def _parse_streamheader(xml): """Parse stream header XML.""" return {el.tag: el.text for el in xml if el.tag != "desc"} - -def _interpolate( - x: np.ndarray, y: np.ndarray, new_x: np.ndarray, kind="linear" -) -> np.ndarray: - """Perform interpolation for _align_timestamps - - If scipy is not installed, the method falls back to numpy, and then only - supports linear interpolation. Otherwise, it supports ‘linear’, ‘nearest’, ‘nearest-up’, ‘zero’, ‘slinear’, ‘quadratic’, ‘cubic’, ‘previous’, or ‘next’. - """ - try: - from scipy.interpolate import interp1d - - f = interp1d( - x, - y, - kind=kind, - axis=0, - assume_sorted=True, # speed up - bounds_error=False, - ) - return f(new_x) - except ImportError as e: - if kind != "linear": - raise e - else: - return np.interp(new_x, xp=x, fp=y, left=np.NaN, right=np.NaN) - - -def _shift_align(old_timestamps, old_timeseries, new_timestamps): - old_timestamps = np.array(old_timestamps) - old_timeseries = np.array(old_timeseries) - new_timestamps = np.array(new_timestamps) - ts_last = old_timestamps[-1] - ts_first = old_timestamps[0] - source = list() - target = list() - new_timeseries = np.empty(( - new_timestamps.shape[0], # new sample count - old_timeseries.shape[1], # old channel count - ), dtype=object) - new_timeseries.fill(np.nan) - too_old = list() - too_young = list() - for nix, nts in enumerate(new_timestamps): - closest = (np.abs(old_timestamps - nts)).argmin() - # remember the edge cases, - if (nts>ts_last): - too_young.append((nix, nts)) - elif (nts < ts_first): - too_old.append((nix,nts)) - else: - closest = (np.abs(old_timestamps - nts)).argmin() - source.append(closest) - target.append(nix) - # check the edge cases, - for nix, nts in reversed(too_old): - closest = (np.abs(old_timestamps - nts)).argmin() - if (closest not in source): - source.append(closest) - target.append(nix) - break - for nix, nts in too_young: - closest = (np.abs(old_timestamps - nts)).argmin() - if (closest not in source): - source.append(closest) - target.append(nix) - break - - if len(set(source)) != len(old_timestamps): - missed = len(old_timestamps)-len(set(source)) - raise RuntimeError(f"Too few new timestamps. {missed} of {len(old_timestamps)} old samples could not be assigned.") - if len(set(source)) != len(source): #non-unique mapping - cnt = Counter(source) - toomany = defaultdict(list) - for v,n in zip(source, target): - if cnt[v] != 1: - toomany[old_timestamps[source[v]]].append(new_timestamps[target[n]]) - for k,v in toomany.items(): - print("The old time_stamp ", k, - "is a closest neighbor of", len(v) ,"new time_stamps:", v) - raise RuntimeError("Can not align streams. Could not create an unique mapping") - for chan in range(old_timeseries.shape[1]): - new_timeseries[target, chan] = old_timeseries[source,chan] - return new_timeseries - - -def align_streams(streams, # List[defaultdict] - align_foo=dict(), # defaultdict[int, Callable] - aligned_timestamps=None, # List[float] - sampling_rate=None # float -): - if sampling_rate is not None and aligned_timestamps is not None: - raise ValueError("You can not specify aligned_timestamps and sampling_rate at the same time") - - if sampling_rate is None: - # we pick the effective sampling rate from the fastest stream - srates = [stream["info"]["effective_srate"] for stream in streams] - sampling_rate = max(srates, default=0) - if sampling_rate <= 0: # either no valid stream or all streams are async - warnings.warn("Can not align streams: Fastest effective sampling rate was 0 or smaller.") - return streams - - - if aligned_timestamps is None: - # we pick the oldest and youngest timestamp of all streams - stamps = [stream["time_stamps"] for stream in streams] - ts_first = min((min(s) for s in stamps)) - ts_last = max((max(s) for s in stamps)) - full_dur = ts_last-ts_first - step = 1/sampling_rate - # we create new regularized timestamps - aligned_timestamps = np.arange(ts_first, ts_last+step/2, step) - # using np.linspace only differs in step if n_samples is different (as n_samples must be an integer number (see implementation below). - # therefore we stick with np.arange (in spite of possible floating point error accumulation, but to make sure that ts_last is included, we add a half-step. This therefore comes at the cost of a overshoot, but i consider this acceptable considering this stamp would only be from one stream, and not part of all other and therefore is kind of arbitray anyways. - # linspace implementation: - # n_samples = int(np.round((full_dur * sampling_rate),0))+1 - # aligned_timestamps = np.linspace(ts_first, ts_last, n_samples) - - channels = 0 - for stream in streams: - # print(stream) - channels += int(stream["info"]["channel_count"][0]) - # https://stackoverflow.com/questions/1704823/create-numpy-matrix-filled-with-nans The timings show a preference for ndarray.fill(..) as the faster alternative. - aligned_timeseries = np.empty((len(aligned_timestamps), - channels,), dtype=object) - aligned_timeseries.fill(np.nan) - - chan_start = 0 - chan_end = 0 - for stream in streams: - sid = stream["info"]["stream_id"] - align = align_foo.get(sid, _shift_align) - chan_cnt = int(stream["info"]["channel_count"][0]) - new_timeseries = np.empty((len(aligned_timestamps), chan_cnt), dtype=object) - new_timeseries.fill(np.nan) - for seg_start, seg_stop in stream["info"]["segments"]: - _new_timeseries = align( - stream["time_stamps"][seg_start:seg_stop+1], - stream["time_series"][seg_start:seg_stop+1], - aligned_timestamps) - # pick indices of the NEW timestamps closest to when segments start and stop - a = stream["time_stamps"][seg_start] - b = stream["time_stamps"][seg_stop] - aix = np.argmin(np.abs(aligned_timestamps-a)) - bix = np.argmin(np.abs(aligned_timestamps-b)) - # and store only this aligned segment, leaving the rest as nans (or aligned as other segments) - new_timeseries[aix:bix+1] = _new_timeseries[aix:bix+1] - - # store the new timeseries at the respective channel indices in the 2D array - chan_start = chan_end - chan_end += chan_cnt - aligned_timeseries[:, chan_start:chan_end] = new_timeseries - return aligned_timeseries, aligned_timestamps diff --git a/pyxdf/test/test_data.py b/pyxdf/test/test_data.py index 3ed7b93..efd939e 100644 --- a/pyxdf/test/test_data.py +++ b/pyxdf/test/test_data.py @@ -104,7 +104,7 @@ def test_smoketest_sync_forced(example_files): for d, s in zip(a_series, a_stamps): print(f"{s:5.3f} : {d}") - from pyxdf.pyxdf import _interpolate + from pyxdf.align import _interpolate print("forced cubic") a_series, a_stamps = align_streams(streams, align_foo={0:lambda x,y,xh: _interpolate(x,y,xh, "cubic")}) for d, s in zip(a_series, a_stamps): diff --git a/pyxdf/test/test_shift_align.py b/pyxdf/test/test_shift_align.py index 6e6a9b9..ac7007c 100644 --- a/pyxdf/test/test_shift_align.py +++ b/pyxdf/test/test_shift_align.py @@ -1,4 +1,4 @@ -from pyxdf.pyxdf import _shift_align +from pyxdf.align import _shift_align import numpy as np import pytest old_timestamps = np.linspace(1.0, 1.5, 51) From 19b6b58a18b8de93dac0929e682b8a77c362392e Mon Sep 17 00:00:00 2001 From: agricolab Date: Mon, 7 Oct 2024 06:30:35 +0200 Subject: [PATCH 13/14] Attempt to address https://github.com/xdf-modules/pyxdf/pull/1 --- pyxdf/__init__.py | 10 ++- pyxdf/align.py | 202 ++++++++++++++++++++++++++-------------------- test.py | 15 ++++ 3 files changed, 138 insertions(+), 89 deletions(-) create mode 100644 test.py diff --git a/pyxdf/__init__.py b/pyxdf/__init__.py index dd4683b..81b6354 100644 --- a/pyxdf/__init__.py +++ b/pyxdf/__init__.py @@ -3,10 +3,14 @@ # # License: BSD (2-clause) -from pkg_resources import get_distribution, DistributionNotFound try: - __version__ = get_distribution(__name__).version -except DistributionNotFound: # package is not installed + from pkg_resources import get_distribution, DistributionNotFound + + try: + __version__ = get_distribution(__name__).version + except DistributionNotFound: # package is not installed + __version__ = None +except ImportError: # pkg_resources is not available __version__ = None from .pyxdf import load_xdf, resolve_streams, match_streaminfos, align_streams diff --git a/pyxdf/align.py b/pyxdf/align.py index 83f0ab9..2fb3115 100644 --- a/pyxdf/align.py +++ b/pyxdf/align.py @@ -31,148 +31,178 @@ def _interpolate( def _shift_align(old_timestamps, old_timeseries, new_timestamps): + # Convert inputs to numpy arrays old_timestamps = np.array(old_timestamps) old_timeseries = np.array(old_timeseries) new_timestamps = np.array(new_timestamps) + ts_last = old_timestamps[-1] - ts_first = old_timestamps[0] - source = list() - target = list() - new_timeseries = np.empty(( - new_timestamps.shape[0], # new sample count - old_timeseries.shape[1], # old channel count - ), dtype=object) - new_timeseries.fill(np.nan) - too_old = list() - too_young = list() + ts_first = old_timestamps[0] + + # Initialize variables + source = [] + target = [] + + new_timeseries = np.full((new_timestamps.shape[0], old_timeseries.shape[1]), np.nan) + + too_old = [] + too_young = [] + + # Loop through new timestamps to find the closest old timestamp + # Handle timestamps outside of the segment (too young or too old) different from stamnps from within the segment for nix, nts in enumerate(new_timestamps): - closest = (np.abs(old_timestamps - nts)).argmin() - # remember the edge cases, - if (nts>ts_last): + if nts > ts_last: too_young.append((nix, nts)) - elif (nts < ts_first): - too_old.append((nix,nts)) + elif nts < ts_first: + too_old.append((nix, nts)) else: - closest = (np.abs(old_timestamps - nts)).argmin() - source.append(closest) - target.append(nix) - # check the edge cases, - for nix, nts in reversed(too_old): - closest = (np.abs(old_timestamps - nts)).argmin() - if (closest not in source): + closest = np.abs(old_timestamps - nts).argmin() + if closest not in source: # Ensure unique mapping + source.append(closest) + target.append(nix) + else: + raise RuntimeError( + f"Non-unique mapping. Closest old timestamp for {new_timestamps[nix]} is {old_timestamps[closest]} but that one was already assigned to {new_timestamps[source.index(closest)]}" + ) + + # Handle too old timestamps (those before the first old timestamp) + for nix, nts in too_old: + closest = 0 # Assign to the first timestamp + if closest not in source: # Ensure unique mapping source.append(closest) target.append(nix) - break + break # only one, because we only need the edge + + # Handle too young timestamps (those after the last old timestamp) for nix, nts in too_young: - closest = (np.abs(old_timestamps - nts)).argmin() - if (closest not in source): + closest = len(old_timestamps) - 1 # Assign to the last timestamp + if closest not in source: # Ensure unique mapping source.append(closest) target.append(nix) - break - - if len(set(source)) != len(old_timestamps): - missed = len(old_timestamps)-len(set(source)) - raise RuntimeError(f"Too few new timestamps. {missed} of {len(old_timestamps)} old samples could not be assigned.") - if len(set(source)) != len(source): #non-unique mapping - cnt = Counter(source) - toomany = defaultdict(list) - for v,n in zip(source, target): - if cnt[v] != 1: - toomany[old_timestamps[source[v]]].append(new_timestamps[target[n]]) - for k,v in toomany.items(): - print("The old time_stamp ", k, - "is a closest neighbor of", len(v) ,"new time_stamps:", v) - raise RuntimeError("Can not align streams. Could not create an unique mapping") + break # only one, because we only need the edge + + # Sanity check: all old timestamps should be assigned to at least one new timestamp + missed = len(old_timestamps) - len(set(source)) + if missed > 0: + unassigned_old = [i for i in range(len(old_timestamps)) if i not in source] + raise RuntimeError( + f"Too few new timestamps. {missed} old timestamps ({old_timestamps[unassigned_old]}) found no corresponding new timestamp because it was already taken by another old timestamp. If your stream has multiple segments, this might be caused by small differences in effective srate between segments. Try different dejittering thresholds or support your own aligned_timestamps." + ) + + # Populate new timeseries with aligned values from old_timeseries for chan in range(old_timeseries.shape[1]): - new_timeseries[target, chan] = old_timeseries[source,chan] + new_timeseries[target, chan] = old_timeseries[source, chan] + return new_timeseries -def align_streams(streams, # List[defaultdict] - align_foo=dict(), # defaultdict[int, Callable] - aligned_timestamps=None, # Optional[List[float]] - sampling_rate=None # Optional[float|int] -): # -> Tuple[np.ndarray, List[float]] +def align_streams( + streams, # List[defaultdict] + align_foo=dict(), # defaultdict[int, Callable] + aligned_timestamps=None, # Optional[List[float]] + sampling_rate=None, # Optional[float|int] +): # -> Tuple[np.ndarray, List[float]] """ - A function to + A function to Args: - streams: a list of defaultdicts (i.e. streams) as returned by + streams: a list of defaultdicts (i.e. streams) as returned by load_xdf - align_foo: a dictionary mapping streamIDs (i.e. int) to interpolation - callables. These callables must have the signature + align_foo: a dictionary mapping streamIDs (i.e. int) to interpolation + callables. These callables must have the signature `interpolate(old_timestamps, old_timeseries, new_timestamps)` and return a np.ndarray. See `_shift_align` and `_interpolate` for examples. - aligned_timestamps (optional): a list of floats with the new + aligned_timestamps (optional): a list of floats with the new timestamps to be used for alignment/interpolation. This list of timestamps can be irregular and have gaps. - sampling_rate (optional): a float defining the sampling rate which + sampling_rate (optional): a float defining the sampling rate which will be used to calculate aligned_timestamps. - + Return: (aligned_timeseries, aligned_timestamps): tuple - THe user can define either aligned_timestamps or sampling_rate or neither. If neither is defined, the algorithm will take the sampling_rate of the fastest stream and create aligned_timestamps from the oldest sample of all streams to the youngest. - + THe user can define either aligned_timestamps or sampling_rate or neither. If neither is defined, the algorithm will take the sampling_rate of the fastest stream and create aligned_timestamps from the oldest sample of all streams to the youngest. + """ - + if sampling_rate is not None and aligned_timestamps is not None: - raise ValueError("You can not specify aligned_timestamps and sampling_rate at the same time") - + raise ValueError( + "You can not specify aligned_timestamps and sampling_rate at the same time" + ) + if sampling_rate is None: - # we pick the effective sampling rate from the fastest stream + # we pick the effective sampling rate from the fastest stream srates = [stream["info"]["effective_srate"] for stream in streams] sampling_rate = max(srates, default=0) if sampling_rate <= 0: # either no valid stream or all streams are async - warnings.warn("Can not align streams: Fastest effective sampling rate was 0 or smaller.") + warnings.warn( + "Can not align streams: Fastest effective sampling rate was 0 step = 1 / sampling_rateor smaller." + ) return streams - - - if aligned_timestamps is None: + + if aligned_timestamps is None: # we pick the oldest and youngest timestamp of all streams - stamps = [stream["time_stamps"] for stream in streams] - ts_first = min((min(s) for s in stamps)) - ts_last = max((max(s) for s in stamps)) - full_dur = ts_last-ts_first - step = 1/sampling_rate + stamps = [stream["time_stamps"] for stream in streams] + ts_first = min((min(s) for s in stamps)) + ts_last = max((max(s) for s in stamps)) + full_dur = ts_last - ts_first + # Use np.linspace for precise control over the number of points and guaranteed inclusion of the stop value. + # np.arange is better when you need direct control over step size but may exclude the stop value and accumulate floating-point errors. + # Choose np.linspace for better precision and np.arange for efficiency with fixed steps. # we create new regularized timestamps - aligned_timestamps = np.arange(ts_first, ts_last+step/2, step) - # using np.linspace only differs in step if n_samples is different (as n_samples must be an integer number (see implementation below). - # therefore we stick with np.arange (in spite of possible floating point error accumulation, but to make sure that ts_last is included, we add a half-step. This therefore comes at the cost of a overshoot, but i consider this acceptable considering this stamp would only be from one stream, and not part of all other and therefore is kind of arbitray anyways. + # arange implementation: + # step = 1 / sampling_rate + # aligned_timestamps = np.arange(ts_first, ts_last + step / 2, step) # linspace implementation: - # n_samples = int(np.round((full_dur * sampling_rate),0))+1 - # aligned_timestamps = np.linspace(ts_first, ts_last, n_samples) - + # add 1 to the number of samples to include the last sample + n_samples = int(np.round((full_dur * sampling_rate), 0)) + 1 + aligned_timestamps = np.linspace(ts_first, ts_last, n_samples) + channels = 0 for stream in streams: # print(stream) channels += int(stream["info"]["channel_count"][0]) # https://stackoverflow.com/questions/1704823/create-numpy-matrix-filled-with-nans The timings show a preference for ndarray.fill(..) as the faster alternative. - aligned_timeseries = np.empty((len(aligned_timestamps), - channels,), dtype=object) + aligned_timeseries = np.empty( + ( + len(aligned_timestamps), + channels, + ), + dtype=object, + ) aligned_timeseries.fill(np.nan) - chan_start = 0 + chan_start = 0 chan_end = 0 for stream in streams: sid = stream["info"]["stream_id"] - align = align_foo.get(sid, _shift_align) + align = align_foo.get(sid, _shift_align) chan_cnt = int(stream["info"]["channel_count"][0]) new_timeseries = np.empty((len(aligned_timestamps), chan_cnt), dtype=object) new_timeseries.fill(np.nan) - for seg_start, seg_stop in stream["info"]["segments"]: - _new_timeseries = align( - stream["time_stamps"][seg_start:seg_stop+1], - stream["time_series"][seg_start:seg_stop+1], - aligned_timestamps) + print("Stream #", sid, " has ", len(stream["info"]["segments"]), "segments") + for seg_idx, (seg_start, seg_stop) in enumerate(stream["info"]["segments"]): + print(seg_idx, ": from index ", seg_start, "to ", seg_stop + 1) + # segments have been created including the stop index, so we need to add 1 to include the last sample + segment_old_timestamps = stream["time_stamps"][seg_start : seg_stop + 1] + segment_old_timeseries = stream["time_series"][seg_start : seg_stop + 1] + # Sanity check for duplicate timestamps + if len(np.unique(segment_old_timestamps)) != len(segment_old_timestamps): + raise RuntimeError("Duplicate timestamps found in old_timestamps") + # apply align function as defined by the user (or default) + segment_new_timeseries = align( + segment_old_timestamps, + segment_old_timeseries, + aligned_timestamps, + ) # pick indices of the NEW timestamps closest to when segments start and stop a = stream["time_stamps"][seg_start] b = stream["time_stamps"][seg_stop] - aix = np.argmin(np.abs(aligned_timestamps-a)) - bix = np.argmin(np.abs(aligned_timestamps-b)) + aix = np.argmin(np.abs(aligned_timestamps - a)) + bix = np.argmin(np.abs(aligned_timestamps - b)) # and store only this aligned segment, leaving the rest as nans (or aligned as other segments) - new_timeseries[aix:bix+1] = _new_timeseries[aix:bix+1] + new_timeseries[aix : bix + 1] = segment_new_timeseries[aix : bix + 1] # store the new timeseries at the respective channel indices in the 2D array chan_start = chan_end diff --git a/test.py b/test.py new file mode 100644 index 0000000..b0897b4 --- /dev/null +++ b/test.py @@ -0,0 +1,15 @@ +import matplotlib.pyplot as plt +import pyxdf + +if __name__ == "__main__": + fname = "/home/rtgugg/Downloads/sub-13_ses-S001_task-HCT_run-001_eeg.xdf" + # streams, header = pyxdf.load_xdf( + # fname, select_streams=[2, 5] + # ) # EEG and ACC streams + + # pyxdf.align_streams(streams) + + streams, header = pyxdf.load_xdf(fname, select_streams=[2]) # EEG stream + plt.plot(streams[0]["time_stamps"]) + plt.show() + pyxdf.align_streams(streams) From b98248fce460e95646d48420aa324eb9b51ab369 Mon Sep 17 00:00:00 2001 From: agricolab Date: Tue, 8 Oct 2024 04:29:43 +0200 Subject: [PATCH 14/14] Improve exception messages --- pyxdf/align.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/pyxdf/align.py b/pyxdf/align.py index 2fb3115..a9929ce 100644 --- a/pyxdf/align.py +++ b/pyxdf/align.py @@ -86,7 +86,7 @@ def _shift_align(old_timestamps, old_timeseries, new_timestamps): if missed > 0: unassigned_old = [i for i in range(len(old_timestamps)) if i not in source] raise RuntimeError( - f"Too few new timestamps. {missed} old timestamps ({old_timestamps[unassigned_old]}) found no corresponding new timestamp because it was already taken by another old timestamp. If your stream has multiple segments, this might be caused by small differences in effective srate between segments. Try different dejittering thresholds or support your own aligned_timestamps." + f"Too few new timestamps. {missed} old timestamps ({unassigned_old}:{old_timestamps[unassigned_old]}) found no corresponding new timestamp because all were already taken by other old timestamps. If your stream has multiple segments, this might be caused by small differences in effective srate between segments. Try different dejittering thresholds or support your own aligned_timestamps." ) # Populate new timeseries with aligned values from old_timeseries @@ -146,7 +146,9 @@ def align_streams( stamps = [stream["time_stamps"] for stream in streams] ts_first = min((min(s) for s in stamps)) ts_last = max((max(s) for s in stamps)) - full_dur = ts_last - ts_first + full_dur = ( + ts_last - ts_first + (1 / sampling_rate) + ) # add one sample to include the last sample (see _jitter_removal) # Use np.linspace for precise control over the number of points and guaranteed inclusion of the stop value. # np.arange is better when you need direct control over step size but may exclude the stop value and accumulate floating-point errors. # Choose np.linspace for better precision and np.arange for efficiency with fixed steps.