diff --git a/lsl_security_helper.py b/lsl_security_helper.py
new file mode 100644
index 0000000..af4c178
--- /dev/null
+++ b/lsl_security_helper.py
@@ -0,0 +1,127 @@
+"""
+Helper module to add security API support to pylsl.
+
+This module ensures security_enabled() and security_fingerprint() methods
+are available on pylsl.StreamInfo, and provides local_security_enabled()
+to check if the local device has security credentials configured.
+
+Either uses native pylsl support (pylsl >= TBD with secure-lsl-support branch)
+or patches the class.
+
+Usage:
+ import lsl_security_helper # Import before using pylsl
+ import pylsl
+
+ # Check local security status
+ if lsl_security_helper.local_security_enabled():
+ print("Local security credentials are configured")
+
+ streams = pylsl.resolve_streams()
+ for s in streams:
+ if s.security_enabled():
+ print(f"Stream {s.name()} is encrypted: {s.security_fingerprint()}")
+"""
+
+import ctypes
+import pylsl
+from pylsl.lib import lib
+
+
+def _check_native_support():
+ """Check if pylsl has native security_enabled/security_fingerprint methods."""
+ return (
+ hasattr(pylsl.StreamInfo, "security_enabled")
+ and hasattr(pylsl.StreamInfo, "security_fingerprint")
+ and callable(getattr(pylsl.StreamInfo, "security_enabled", None))
+ )
+
+
+def _setup_security_functions():
+ """Set up ctypes declarations for security functions."""
+ try:
+ # lsl_get_security_enabled returns int32_t (1=enabled, 0=disabled, -1=error)
+ lib.lsl_get_security_enabled.restype = ctypes.c_int32
+ lib.lsl_get_security_enabled.argtypes = [ctypes.c_void_p]
+
+ # lsl_get_security_fingerprint returns const char*
+ lib.lsl_get_security_fingerprint.restype = ctypes.c_char_p
+ lib.lsl_get_security_fingerprint.argtypes = [ctypes.c_void_p]
+
+ return True
+ except AttributeError:
+ # Functions not available in this liblsl build
+ return False
+
+
+def _setup_local_security_function():
+ """Set up ctypes declaration for local security enabled check."""
+ try:
+ # lsl_local_security_enabled returns int32_t (1=enabled, 0=disabled)
+ lib.lsl_local_security_enabled.restype = ctypes.c_int32
+ lib.lsl_local_security_enabled.argtypes = []
+ return True
+ except AttributeError:
+ return False
+
+
+# Track if local security function is available
+_local_security_available = _setup_local_security_function()
+
+
+def local_security_enabled() -> bool:
+ """Check if the local device has security credentials configured.
+
+ Returns True if security credentials are loaded and enabled locally,
+ False otherwise (either not a secure build or no credentials configured).
+ """
+ if not _local_security_available:
+ return False
+ try:
+ return lib.lsl_local_security_enabled() == 1
+ except Exception:
+ return False
+
+
+def _security_enabled(self) -> bool:
+ """Check if the stream has security/encryption enabled.
+
+ Returns True if security is enabled, False otherwise.
+ """
+ try:
+ result = lib.lsl_get_security_enabled(self.obj)
+ return result == 1
+ except Exception:
+ return False
+
+
+def _security_fingerprint(self) -> str:
+ """Get the security fingerprint of the stream's public key.
+
+ Returns the fingerprint string (SHA256:xxxx...) or empty string if
+ security is not enabled.
+ """
+ try:
+ result = lib.lsl_get_security_fingerprint(self.obj)
+ if result:
+ return result.decode("utf-8")
+ return ""
+ except Exception:
+ return ""
+
+
+# Check for native support first (pylsl with secure-lsl-support branch)
+if _check_native_support():
+ # pylsl already has native support; no patching needed
+ _security_available = True
+else:
+ # No native support; try to patch StreamInfo class
+ _security_available = _setup_security_functions()
+
+ if _security_available:
+ pylsl.StreamInfo.security_enabled = _security_enabled
+ pylsl.StreamInfo.security_fingerprint = _security_fingerprint
+ else:
+ # Provide stub methods that always return False/empty
+ pylsl.StreamInfo.security_enabled = lambda self: False
+ pylsl.StreamInfo.security_fingerprint = lambda self: ""
+ print("Warning: Security API not available in liblsl. Using stub methods.")
diff --git a/paintwidget.py b/paintwidget.py
index 10a5967..c5117e7 100644
--- a/paintwidget.py
+++ b/paintwidget.py
@@ -1,6 +1,7 @@
from PyQt5.QtCore import QThread, Qt, pyqtSignal
from PyQt5.QtGui import QPalette, QPainter, QPen
-from PyQt5.QtWidgets import QWidget
+from PyQt5.QtWidgets import QWidget, QMessageBox
+import lsl_security_helper # Adds security methods to pylsl
import pylsl
import math
import copy
@@ -13,6 +14,7 @@ class DataThread(QThread):
updateStreamNames = pyqtSignal(list, int)
sendData = pyqtSignal(list, list, list, list)
changedStream = pyqtSignal()
+ securityMismatch = pyqtSignal(list, bool) # (stream_names, local_has_security)
def_stream_parms = {'chunk_idx': 0, 'metadata': {}, 'srate': None, 'chunkSize': None,
'downSampling': None, 'downSamplingFactor': None, 'downSamplingBuffer': None,
@@ -27,13 +29,49 @@ def __init__(self, parent):
self.sig_strm_idx = -1
def handle_stream_expanded(self, name):
+ if not self.stream_params:
+ return # No streams available (e.g., security mismatch)
stream_names = [_['metadata']['name'] for _ in self.stream_params]
- self.sig_strm_idx = stream_names.index(name)
- self.changedStream.emit()
+ if name in stream_names:
+ self.sig_strm_idx = stream_names.index(name)
+ self.changedStream.emit()
def update_streams(self):
if not self.streams:
self.streams = pylsl.resolve_streams(wait_time=1.0)
+
+ if not self.streams:
+ return # No streams found
+
+ # Check for security mismatches before creating inlets
+ local_security = lsl_security_helper.local_security_enabled()
+ mismatch_streams = []
+ for stream in self.streams:
+ stream_secure = stream.security_enabled()
+ if stream_secure != local_security:
+ suffix = " (insecure)" if not stream_secure else ""
+ mismatch_streams.append(stream.name() + suffix)
+
+ # Build metadata for all streams (to show in UI)
+ all_metadata = []
+ for stream in self.streams:
+ all_metadata.append({
+ "name": stream.name(),
+ "ch_count": stream.channel_count(),
+ "ch_format": stream.channel_format(),
+ "srate": stream.nominal_srate(),
+ "security_enabled": stream.security_enabled(),
+ "security_fingerprint": stream.security_fingerprint()
+ })
+
+ # Always emit stream names to show in UI (even if mismatched)
+ self.updateStreamNames.emit(all_metadata, 0 if all_metadata else -1)
+
+ if mismatch_streams:
+ self.securityMismatch.emit(mismatch_streams, local_security)
+ self.streams = [] # Clear so user can retry with Update button
+ return # Don't create inlets for mismatched streams
+
for k, stream in enumerate(self.streams):
n = stream.name()
stream_params = copy.deepcopy(self.def_stream_parms)
@@ -41,7 +79,9 @@ def update_streams(self):
"name": n,
"ch_count": stream.channel_count(),
"ch_format": stream.channel_format(),
- "srate": stream.nominal_srate()
+ "srate": stream.nominal_srate(),
+ "security_enabled": stream.security_enabled(),
+ "security_fingerprint": stream.security_fingerprint()
})
# ch = stream.desc().child("channels").child("channel")
# for ch_ix in range(stream.channel_count()):
@@ -63,7 +103,6 @@ def update_streams(self):
stream_params['downSamplingBuffer'] = [[0] * int(stream.channel_count())] * n_buff
self.stream_params.append(stream_params)
- self.updateStreamNames.emit([_['metadata'] for _ in self.stream_params], self.sig_strm_idx)
self.start()
def run(self):
@@ -110,6 +149,41 @@ def __init__(self, widget):
self.dataTr = DataThread(self)
self.dataTr.sendData.connect(self.get_data)
self.dataTr.changedStream.connect(self.reset)
+ self.dataTr.securityMismatch.connect(self.show_security_mismatch)
+
+ def show_security_mismatch(self, stream_names, local_has_security):
+ """Show a dialog when there's a security mismatch between streams and local config."""
+ stream_list = "
".join(
+ f" • {name}"
+ for name in stream_names
+ )
+
+ if not local_has_security:
+ error_msg = (
+ "The following streams require security, but SigVisualizer does not have "
+ "security credentials configured:
" + stream_list +
+ "
To fix this:
"
+ " 1. Run 'lsl-keygen' to generate credentials, or
"
+ " 2. Import shared credentials from an authorized device
"
+ "Cannot visualize streams with "
+ "mismatched security settings."
+ )
+ else:
+ error_msg = (
+ "Security mismatch detected for the following streams:
" +
+ stream_list +
+ "
All devices must have the same security configuration "
+ "(either all secure or all insecure).
"
+ "Cannot visualize streams with "
+ "mismatched security settings."
+ )
+
+ msg_box = QMessageBox(self)
+ msg_box.setWindowTitle("Security Mismatch")
+ msg_box.setIcon(QMessageBox.Critical)
+ msg_box.setTextFormat(Qt.RichText)
+ msg_box.setText(error_msg)
+ msg_box.exec_()
def reset(self):
self.chunk_idx = 0
@@ -210,17 +284,17 @@ def paintEvent(self, event):
chan_offset = (ch_idx + 0.5) * self.channelHeight
if self.lastY:
if not math.isnan(self.lastY[ch_idx]) and not math.isnan(self.dataBuffer[0][ch_idx]):
- painter.drawLine(x0 - self.px_per_samp,
- -self.lastY[ch_idx] + chan_offset,
- x0,
- -self.dataBuffer[0][ch_idx] + chan_offset)
+ painter.drawLine(int(x0 - self.px_per_samp),
+ int(-self.lastY[ch_idx] + chan_offset),
+ int(x0),
+ int(-self.dataBuffer[0][ch_idx] + chan_offset))
for m in range(n_samps - 1):
if not math.isnan(self.dataBuffer[m][ch_idx]) and not math.isnan(self.dataBuffer[m+1][ch_idx]):
- painter.drawLine(x0 + m * self.px_per_samp,
- -self.dataBuffer[m][ch_idx] + chan_offset,
- x0 + (m + 1) * self.px_per_samp,
- -self.dataBuffer[m+1][ch_idx] + chan_offset)
+ painter.drawLine(int(x0 + m * self.px_per_samp),
+ int(-self.dataBuffer[m][ch_idx] + chan_offset),
+ int(x0 + (m + 1) * self.px_per_samp),
+ int(-self.dataBuffer[m+1][ch_idx] + chan_offset))
# Reset for next iteration
self.chunk_idx = (self.chunk_idx + 1) % self.dataTr.chunksPerScreen # For next iteration
@@ -230,6 +304,6 @@ def paintEvent(self, event):
if self.markerBuffer is not None:
painter.setPen(QPen(Qt.red))
for px, mrk in self.markerBuffer:
- painter.drawLine(px, 0, px, self.height())
- painter.drawText(px - 2 * self.px_per_samp, 0.95 * self.height(), mrk)
+ painter.drawLine(int(px), 0, int(px), self.height())
+ painter.drawText(int(px - 2 * self.px_per_samp), int(0.95 * self.height()), mrk)
self.markerBuffer = None
diff --git a/sigvisualizer.py b/sigvisualizer.py
index e31062b..788b7b0 100644
--- a/sigvisualizer.py
+++ b/sigvisualizer.py
@@ -36,17 +36,21 @@ def __init__(self):
self.stream_expanded.connect(self.ui.widget.dataTr.handle_stream_expanded)
def tree_item_expanded(self, widget_item):
- name = widget_item.text(0)
+ display_name = widget_item.text(0)
+ # Remove lock emoji prefix if present for matching
+ name = display_name.lstrip("\U0001F512 ")
for it_ix in range(self.ui.treeWidget.topLevelItemCount()):
item = self.ui.treeWidget.topLevelItem(it_ix)
- if item.text(0) != name:
+ if item.text(0) != display_name:
item.setExpanded(False)
self.stream_expanded.emit(name)
def update_metadata_widget(self, metadata, default_idx):
for s_ix, s_meta in enumerate(metadata):
item = QTreeWidgetItem(self.ui.treeWidget)
- item.setText(0, s_meta["name"])
+ # Show lock emoji for encrypted streams
+ prefix = "\U0001F512 " if s_meta.get("security_enabled", False) else ""
+ item.setText(0, prefix + s_meta["name"])
for m in range(s_meta["ch_count"]):
channel_item = QTreeWidgetItem(item)
@@ -57,8 +61,11 @@ def update_metadata_widget(self, metadata, default_idx):
self.ui.treeWidget.addTopLevelItem(item)
self.ui.treeWidget.setAnimated(True)
+ # Show sampling rate and security status
+ security_status = "Encrypted" if metadata[default_idx].get("security_enabled", False) else "Unencrypted"
self.statusBar.showMessage(
- "Sampling rate: {}Hz".format(metadata[default_idx]["srate"]))
+ "Sampling rate: {}Hz | Security: {}".format(
+ metadata[default_idx]["srate"], security_status))
def toggle_panel(self):
if self.panelHidden: