diff --git a/lsl_security_helper.py b/lsl_security_helper.py new file mode 100644 index 0000000..af4c178 --- /dev/null +++ b/lsl_security_helper.py @@ -0,0 +1,127 @@ +""" +Helper module to add security API support to pylsl. + +This module ensures security_enabled() and security_fingerprint() methods +are available on pylsl.StreamInfo, and provides local_security_enabled() +to check if the local device has security credentials configured. + +Either uses native pylsl support (pylsl >= TBD with secure-lsl-support branch) +or patches the class. + +Usage: + import lsl_security_helper # Import before using pylsl + import pylsl + + # Check local security status + if lsl_security_helper.local_security_enabled(): + print("Local security credentials are configured") + + streams = pylsl.resolve_streams() + for s in streams: + if s.security_enabled(): + print(f"Stream {s.name()} is encrypted: {s.security_fingerprint()}") +""" + +import ctypes +import pylsl +from pylsl.lib import lib + + +def _check_native_support(): + """Check if pylsl has native security_enabled/security_fingerprint methods.""" + return ( + hasattr(pylsl.StreamInfo, "security_enabled") + and hasattr(pylsl.StreamInfo, "security_fingerprint") + and callable(getattr(pylsl.StreamInfo, "security_enabled", None)) + ) + + +def _setup_security_functions(): + """Set up ctypes declarations for security functions.""" + try: + # lsl_get_security_enabled returns int32_t (1=enabled, 0=disabled, -1=error) + lib.lsl_get_security_enabled.restype = ctypes.c_int32 + lib.lsl_get_security_enabled.argtypes = [ctypes.c_void_p] + + # lsl_get_security_fingerprint returns const char* + lib.lsl_get_security_fingerprint.restype = ctypes.c_char_p + lib.lsl_get_security_fingerprint.argtypes = [ctypes.c_void_p] + + return True + except AttributeError: + # Functions not available in this liblsl build + return False + + +def _setup_local_security_function(): + """Set up ctypes declaration for local security enabled check.""" + try: + # lsl_local_security_enabled returns int32_t (1=enabled, 0=disabled) + lib.lsl_local_security_enabled.restype = ctypes.c_int32 + lib.lsl_local_security_enabled.argtypes = [] + return True + except AttributeError: + return False + + +# Track if local security function is available +_local_security_available = _setup_local_security_function() + + +def local_security_enabled() -> bool: + """Check if the local device has security credentials configured. + + Returns True if security credentials are loaded and enabled locally, + False otherwise (either not a secure build or no credentials configured). + """ + if not _local_security_available: + return False + try: + return lib.lsl_local_security_enabled() == 1 + except Exception: + return False + + +def _security_enabled(self) -> bool: + """Check if the stream has security/encryption enabled. + + Returns True if security is enabled, False otherwise. + """ + try: + result = lib.lsl_get_security_enabled(self.obj) + return result == 1 + except Exception: + return False + + +def _security_fingerprint(self) -> str: + """Get the security fingerprint of the stream's public key. + + Returns the fingerprint string (SHA256:xxxx...) or empty string if + security is not enabled. + """ + try: + result = lib.lsl_get_security_fingerprint(self.obj) + if result: + return result.decode("utf-8") + return "" + except Exception: + return "" + + +# Check for native support first (pylsl with secure-lsl-support branch) +if _check_native_support(): + # pylsl already has native support; no patching needed + _security_available = True +else: + # No native support; try to patch StreamInfo class + _security_available = _setup_security_functions() + + if _security_available: + pylsl.StreamInfo.security_enabled = _security_enabled + pylsl.StreamInfo.security_fingerprint = _security_fingerprint + else: + # Provide stub methods that always return False/empty + pylsl.StreamInfo.security_enabled = lambda self: False + pylsl.StreamInfo.security_fingerprint = lambda self: "" + print("Warning: Security API not available in liblsl. Using stub methods.") diff --git a/paintwidget.py b/paintwidget.py index 10a5967..c5117e7 100644 --- a/paintwidget.py +++ b/paintwidget.py @@ -1,6 +1,7 @@ from PyQt5.QtCore import QThread, Qt, pyqtSignal from PyQt5.QtGui import QPalette, QPainter, QPen -from PyQt5.QtWidgets import QWidget +from PyQt5.QtWidgets import QWidget, QMessageBox +import lsl_security_helper # Adds security methods to pylsl import pylsl import math import copy @@ -13,6 +14,7 @@ class DataThread(QThread): updateStreamNames = pyqtSignal(list, int) sendData = pyqtSignal(list, list, list, list) changedStream = pyqtSignal() + securityMismatch = pyqtSignal(list, bool) # (stream_names, local_has_security) def_stream_parms = {'chunk_idx': 0, 'metadata': {}, 'srate': None, 'chunkSize': None, 'downSampling': None, 'downSamplingFactor': None, 'downSamplingBuffer': None, @@ -27,13 +29,49 @@ def __init__(self, parent): self.sig_strm_idx = -1 def handle_stream_expanded(self, name): + if not self.stream_params: + return # No streams available (e.g., security mismatch) stream_names = [_['metadata']['name'] for _ in self.stream_params] - self.sig_strm_idx = stream_names.index(name) - self.changedStream.emit() + if name in stream_names: + self.sig_strm_idx = stream_names.index(name) + self.changedStream.emit() def update_streams(self): if not self.streams: self.streams = pylsl.resolve_streams(wait_time=1.0) + + if not self.streams: + return # No streams found + + # Check for security mismatches before creating inlets + local_security = lsl_security_helper.local_security_enabled() + mismatch_streams = [] + for stream in self.streams: + stream_secure = stream.security_enabled() + if stream_secure != local_security: + suffix = " (insecure)" if not stream_secure else "" + mismatch_streams.append(stream.name() + suffix) + + # Build metadata for all streams (to show in UI) + all_metadata = [] + for stream in self.streams: + all_metadata.append({ + "name": stream.name(), + "ch_count": stream.channel_count(), + "ch_format": stream.channel_format(), + "srate": stream.nominal_srate(), + "security_enabled": stream.security_enabled(), + "security_fingerprint": stream.security_fingerprint() + }) + + # Always emit stream names to show in UI (even if mismatched) + self.updateStreamNames.emit(all_metadata, 0 if all_metadata else -1) + + if mismatch_streams: + self.securityMismatch.emit(mismatch_streams, local_security) + self.streams = [] # Clear so user can retry with Update button + return # Don't create inlets for mismatched streams + for k, stream in enumerate(self.streams): n = stream.name() stream_params = copy.deepcopy(self.def_stream_parms) @@ -41,7 +79,9 @@ def update_streams(self): "name": n, "ch_count": stream.channel_count(), "ch_format": stream.channel_format(), - "srate": stream.nominal_srate() + "srate": stream.nominal_srate(), + "security_enabled": stream.security_enabled(), + "security_fingerprint": stream.security_fingerprint() }) # ch = stream.desc().child("channels").child("channel") # for ch_ix in range(stream.channel_count()): @@ -63,7 +103,6 @@ def update_streams(self): stream_params['downSamplingBuffer'] = [[0] * int(stream.channel_count())] * n_buff self.stream_params.append(stream_params) - self.updateStreamNames.emit([_['metadata'] for _ in self.stream_params], self.sig_strm_idx) self.start() def run(self): @@ -110,6 +149,41 @@ def __init__(self, widget): self.dataTr = DataThread(self) self.dataTr.sendData.connect(self.get_data) self.dataTr.changedStream.connect(self.reset) + self.dataTr.securityMismatch.connect(self.show_security_mismatch) + + def show_security_mismatch(self, stream_names, local_has_security): + """Show a dialog when there's a security mismatch between streams and local config.""" + stream_list = "
".join( + f"  • {name}" + for name in stream_names + ) + + if not local_has_security: + error_msg = ( + "The following streams require security, but SigVisualizer does not have " + "security credentials configured:

" + stream_list + + "

To fix this:
" + "  1. Run 'lsl-keygen' to generate credentials, or
" + "  2. Import shared credentials from an authorized device

" + "Cannot visualize streams with " + "mismatched security settings." + ) + else: + error_msg = ( + "Security mismatch detected for the following streams:

" + + stream_list + + "

All devices must have the same security configuration " + "(either all secure or all insecure).

" + "Cannot visualize streams with " + "mismatched security settings." + ) + + msg_box = QMessageBox(self) + msg_box.setWindowTitle("Security Mismatch") + msg_box.setIcon(QMessageBox.Critical) + msg_box.setTextFormat(Qt.RichText) + msg_box.setText(error_msg) + msg_box.exec_() def reset(self): self.chunk_idx = 0 @@ -210,17 +284,17 @@ def paintEvent(self, event): chan_offset = (ch_idx + 0.5) * self.channelHeight if self.lastY: if not math.isnan(self.lastY[ch_idx]) and not math.isnan(self.dataBuffer[0][ch_idx]): - painter.drawLine(x0 - self.px_per_samp, - -self.lastY[ch_idx] + chan_offset, - x0, - -self.dataBuffer[0][ch_idx] + chan_offset) + painter.drawLine(int(x0 - self.px_per_samp), + int(-self.lastY[ch_idx] + chan_offset), + int(x0), + int(-self.dataBuffer[0][ch_idx] + chan_offset)) for m in range(n_samps - 1): if not math.isnan(self.dataBuffer[m][ch_idx]) and not math.isnan(self.dataBuffer[m+1][ch_idx]): - painter.drawLine(x0 + m * self.px_per_samp, - -self.dataBuffer[m][ch_idx] + chan_offset, - x0 + (m + 1) * self.px_per_samp, - -self.dataBuffer[m+1][ch_idx] + chan_offset) + painter.drawLine(int(x0 + m * self.px_per_samp), + int(-self.dataBuffer[m][ch_idx] + chan_offset), + int(x0 + (m + 1) * self.px_per_samp), + int(-self.dataBuffer[m+1][ch_idx] + chan_offset)) # Reset for next iteration self.chunk_idx = (self.chunk_idx + 1) % self.dataTr.chunksPerScreen # For next iteration @@ -230,6 +304,6 @@ def paintEvent(self, event): if self.markerBuffer is not None: painter.setPen(QPen(Qt.red)) for px, mrk in self.markerBuffer: - painter.drawLine(px, 0, px, self.height()) - painter.drawText(px - 2 * self.px_per_samp, 0.95 * self.height(), mrk) + painter.drawLine(int(px), 0, int(px), self.height()) + painter.drawText(int(px - 2 * self.px_per_samp), int(0.95 * self.height()), mrk) self.markerBuffer = None diff --git a/sigvisualizer.py b/sigvisualizer.py index e31062b..788b7b0 100644 --- a/sigvisualizer.py +++ b/sigvisualizer.py @@ -36,17 +36,21 @@ def __init__(self): self.stream_expanded.connect(self.ui.widget.dataTr.handle_stream_expanded) def tree_item_expanded(self, widget_item): - name = widget_item.text(0) + display_name = widget_item.text(0) + # Remove lock emoji prefix if present for matching + name = display_name.lstrip("\U0001F512 ") for it_ix in range(self.ui.treeWidget.topLevelItemCount()): item = self.ui.treeWidget.topLevelItem(it_ix) - if item.text(0) != name: + if item.text(0) != display_name: item.setExpanded(False) self.stream_expanded.emit(name) def update_metadata_widget(self, metadata, default_idx): for s_ix, s_meta in enumerate(metadata): item = QTreeWidgetItem(self.ui.treeWidget) - item.setText(0, s_meta["name"]) + # Show lock emoji for encrypted streams + prefix = "\U0001F512 " if s_meta.get("security_enabled", False) else "" + item.setText(0, prefix + s_meta["name"]) for m in range(s_meta["ch_count"]): channel_item = QTreeWidgetItem(item) @@ -57,8 +61,11 @@ def update_metadata_widget(self, metadata, default_idx): self.ui.treeWidget.addTopLevelItem(item) self.ui.treeWidget.setAnimated(True) + # Show sampling rate and security status + security_status = "Encrypted" if metadata[default_idx].get("security_enabled", False) else "Unencrypted" self.statusBar.showMessage( - "Sampling rate: {}Hz".format(metadata[default_idx]["srate"])) + "Sampling rate: {}Hz | Security: {}".format( + metadata[default_idx]["srate"], security_status)) def toggle_panel(self): if self.panelHidden: