Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
127 changes: 127 additions & 0 deletions lsl_security_helper.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
"""
Helper module to add security API support to pylsl.

This module ensures security_enabled() and security_fingerprint() methods
are available on pylsl.StreamInfo, and provides local_security_enabled()
to check if the local device has security credentials configured.

Either uses native pylsl support (pylsl >= TBD with secure-lsl-support branch)
or patches the class.

Usage:
import lsl_security_helper # Import before using pylsl
import pylsl

# Check local security status
if lsl_security_helper.local_security_enabled():
print("Local security credentials are configured")

streams = pylsl.resolve_streams()
for s in streams:
if s.security_enabled():
print(f"Stream {s.name()} is encrypted: {s.security_fingerprint()}")
"""

import ctypes
import pylsl
from pylsl.lib import lib


def _check_native_support():
"""Check if pylsl has native security_enabled/security_fingerprint methods."""
return (
hasattr(pylsl.StreamInfo, "security_enabled")
and hasattr(pylsl.StreamInfo, "security_fingerprint")
and callable(getattr(pylsl.StreamInfo, "security_enabled", None))
)


def _setup_security_functions():
"""Set up ctypes declarations for security functions."""
try:
# lsl_get_security_enabled returns int32_t (1=enabled, 0=disabled, -1=error)
lib.lsl_get_security_enabled.restype = ctypes.c_int32
lib.lsl_get_security_enabled.argtypes = [ctypes.c_void_p]

# lsl_get_security_fingerprint returns const char*
lib.lsl_get_security_fingerprint.restype = ctypes.c_char_p
lib.lsl_get_security_fingerprint.argtypes = [ctypes.c_void_p]

return True
except AttributeError:
# Functions not available in this liblsl build
return False


def _setup_local_security_function():
"""Set up ctypes declaration for local security enabled check."""
try:
# lsl_local_security_enabled returns int32_t (1=enabled, 0=disabled)
lib.lsl_local_security_enabled.restype = ctypes.c_int32
lib.lsl_local_security_enabled.argtypes = []
return True
except AttributeError:
return False


# Track if local security function is available
_local_security_available = _setup_local_security_function()


def local_security_enabled() -> bool:
"""Check if the local device has security credentials configured.

Returns True if security credentials are loaded and enabled locally,
False otherwise (either not a secure build or no credentials configured).
"""
if not _local_security_available:
return False
try:
return lib.lsl_local_security_enabled() == 1
except Exception:
return False


def _security_enabled(self) -> bool:
"""Check if the stream has security/encryption enabled.

Returns True if security is enabled, False otherwise.
"""
try:
result = lib.lsl_get_security_enabled(self.obj)
return result == 1
except Exception:
return False


def _security_fingerprint(self) -> str:
"""Get the security fingerprint of the stream's public key.

Returns the fingerprint string (SHA256:xxxx...) or empty string if
security is not enabled.
"""
try:
result = lib.lsl_get_security_fingerprint(self.obj)
if result:
return result.decode("utf-8")
return ""
except Exception:
return ""


# Check for native support first (pylsl with secure-lsl-support branch)
if _check_native_support():
# pylsl already has native support; no patching needed
_security_available = True
else:
# No native support; try to patch StreamInfo class
_security_available = _setup_security_functions()

if _security_available:
pylsl.StreamInfo.security_enabled = _security_enabled
pylsl.StreamInfo.security_fingerprint = _security_fingerprint
else:
# Provide stub methods that always return False/empty
pylsl.StreamInfo.security_enabled = lambda self: False
pylsl.StreamInfo.security_fingerprint = lambda self: ""
print("Warning: Security API not available in liblsl. Using stub methods.")
104 changes: 89 additions & 15 deletions paintwidget.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from PyQt5.QtCore import QThread, Qt, pyqtSignal
from PyQt5.QtGui import QPalette, QPainter, QPen
from PyQt5.QtWidgets import QWidget
from PyQt5.QtWidgets import QWidget, QMessageBox
import lsl_security_helper # Adds security methods to pylsl
import pylsl
import math
import copy
Expand All @@ -13,6 +14,7 @@ class DataThread(QThread):
updateStreamNames = pyqtSignal(list, int)
sendData = pyqtSignal(list, list, list, list)
changedStream = pyqtSignal()
securityMismatch = pyqtSignal(list, bool) # (stream_names, local_has_security)

def_stream_parms = {'chunk_idx': 0, 'metadata': {}, 'srate': None, 'chunkSize': None,
'downSampling': None, 'downSamplingFactor': None, 'downSamplingBuffer': None,
Expand All @@ -27,21 +29,59 @@ def __init__(self, parent):
self.sig_strm_idx = -1

def handle_stream_expanded(self, name):
if not self.stream_params:
return # No streams available (e.g., security mismatch)
stream_names = [_['metadata']['name'] for _ in self.stream_params]
self.sig_strm_idx = stream_names.index(name)
self.changedStream.emit()
if name in stream_names:
self.sig_strm_idx = stream_names.index(name)
self.changedStream.emit()

def update_streams(self):
if not self.streams:
self.streams = pylsl.resolve_streams(wait_time=1.0)

if not self.streams:
return # No streams found

# Check for security mismatches before creating inlets
local_security = lsl_security_helper.local_security_enabled()
mismatch_streams = []
for stream in self.streams:
stream_secure = stream.security_enabled()
if stream_secure != local_security:
suffix = " (insecure)" if not stream_secure else ""
mismatch_streams.append(stream.name() + suffix)

# Build metadata for all streams (to show in UI)
all_metadata = []
for stream in self.streams:
all_metadata.append({
"name": stream.name(),
"ch_count": stream.channel_count(),
"ch_format": stream.channel_format(),
"srate": stream.nominal_srate(),
"security_enabled": stream.security_enabled(),
"security_fingerprint": stream.security_fingerprint()
})

# Always emit stream names to show in UI (even if mismatched)
self.updateStreamNames.emit(all_metadata, 0 if all_metadata else -1)

if mismatch_streams:
self.securityMismatch.emit(mismatch_streams, local_security)
self.streams = [] # Clear so user can retry with Update button
return # Don't create inlets for mismatched streams

for k, stream in enumerate(self.streams):
n = stream.name()
stream_params = copy.deepcopy(self.def_stream_parms)
stream_params['metadata'].update({
"name": n,
"ch_count": stream.channel_count(),
"ch_format": stream.channel_format(),
"srate": stream.nominal_srate()
"srate": stream.nominal_srate(),
"security_enabled": stream.security_enabled(),
"security_fingerprint": stream.security_fingerprint()
})
# ch = stream.desc().child("channels").child("channel")
# for ch_ix in range(stream.channel_count()):
Expand All @@ -63,7 +103,6 @@ def update_streams(self):
stream_params['downSamplingBuffer'] = [[0] * int(stream.channel_count())] * n_buff
self.stream_params.append(stream_params)

self.updateStreamNames.emit([_['metadata'] for _ in self.stream_params], self.sig_strm_idx)
self.start()

def run(self):
Expand Down Expand Up @@ -110,6 +149,41 @@ def __init__(self, widget):
self.dataTr = DataThread(self)
self.dataTr.sendData.connect(self.get_data)
self.dataTr.changedStream.connect(self.reset)
self.dataTr.securityMismatch.connect(self.show_security_mismatch)

def show_security_mismatch(self, stream_names, local_has_security):
"""Show a dialog when there's a security mismatch between streams and local config."""
stream_list = "<br>".join(
f"&nbsp;&nbsp;&bull; <span style='color: #0066cc;'>{name}</span>"
for name in stream_names
)

if not local_has_security:
error_msg = (
"The following streams require security, but SigVisualizer does not have "
"security credentials configured:<br><br>" + stream_list +
"<br><br>To fix this:<br>"
"&nbsp;&nbsp;1. Run 'lsl-keygen' to generate credentials, or<br>"
"&nbsp;&nbsp;2. Import shared credentials from an authorized device<br><br>"
"<span style='color: red; font-weight: bold;'>Cannot visualize streams with "
"mismatched security settings.</span>"
)
else:
error_msg = (
"Security mismatch detected for the following streams:<br><br>" +
stream_list +
"<br><br>All devices must have the same security configuration "
"(either all secure or all insecure).<br><br>"
"<span style='color: red; font-weight: bold;'>Cannot visualize streams with "
"mismatched security settings.</span>"
)

msg_box = QMessageBox(self)
msg_box.setWindowTitle("Security Mismatch")
msg_box.setIcon(QMessageBox.Critical)
msg_box.setTextFormat(Qt.RichText)
msg_box.setText(error_msg)
msg_box.exec_()

def reset(self):
self.chunk_idx = 0
Expand Down Expand Up @@ -210,17 +284,17 @@ def paintEvent(self, event):
chan_offset = (ch_idx + 0.5) * self.channelHeight
if self.lastY:
if not math.isnan(self.lastY[ch_idx]) and not math.isnan(self.dataBuffer[0][ch_idx]):
painter.drawLine(x0 - self.px_per_samp,
-self.lastY[ch_idx] + chan_offset,
x0,
-self.dataBuffer[0][ch_idx] + chan_offset)
painter.drawLine(int(x0 - self.px_per_samp),
int(-self.lastY[ch_idx] + chan_offset),
int(x0),
int(-self.dataBuffer[0][ch_idx] + chan_offset))

for m in range(n_samps - 1):
if not math.isnan(self.dataBuffer[m][ch_idx]) and not math.isnan(self.dataBuffer[m+1][ch_idx]):
painter.drawLine(x0 + m * self.px_per_samp,
-self.dataBuffer[m][ch_idx] + chan_offset,
x0 + (m + 1) * self.px_per_samp,
-self.dataBuffer[m+1][ch_idx] + chan_offset)
painter.drawLine(int(x0 + m * self.px_per_samp),
int(-self.dataBuffer[m][ch_idx] + chan_offset),
int(x0 + (m + 1) * self.px_per_samp),
int(-self.dataBuffer[m+1][ch_idx] + chan_offset))

# Reset for next iteration
self.chunk_idx = (self.chunk_idx + 1) % self.dataTr.chunksPerScreen # For next iteration
Expand All @@ -230,6 +304,6 @@ def paintEvent(self, event):
if self.markerBuffer is not None:
painter.setPen(QPen(Qt.red))
for px, mrk in self.markerBuffer:
painter.drawLine(px, 0, px, self.height())
painter.drawText(px - 2 * self.px_per_samp, 0.95 * self.height(), mrk)
painter.drawLine(int(px), 0, int(px), self.height())
painter.drawText(int(px - 2 * self.px_per_samp), int(0.95 * self.height()), mrk)
self.markerBuffer = None
15 changes: 11 additions & 4 deletions sigvisualizer.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,17 +36,21 @@ def __init__(self):
self.stream_expanded.connect(self.ui.widget.dataTr.handle_stream_expanded)

def tree_item_expanded(self, widget_item):
name = widget_item.text(0)
display_name = widget_item.text(0)
# Remove lock emoji prefix if present for matching
name = display_name.lstrip("\U0001F512 ")
for it_ix in range(self.ui.treeWidget.topLevelItemCount()):
item = self.ui.treeWidget.topLevelItem(it_ix)
if item.text(0) != name:
if item.text(0) != display_name:
item.setExpanded(False)
self.stream_expanded.emit(name)

def update_metadata_widget(self, metadata, default_idx):
for s_ix, s_meta in enumerate(metadata):
item = QTreeWidgetItem(self.ui.treeWidget)
item.setText(0, s_meta["name"])
# Show lock emoji for encrypted streams
prefix = "\U0001F512 " if s_meta.get("security_enabled", False) else ""
item.setText(0, prefix + s_meta["name"])

for m in range(s_meta["ch_count"]):
channel_item = QTreeWidgetItem(item)
Expand All @@ -57,8 +61,11 @@ def update_metadata_widget(self, metadata, default_idx):
self.ui.treeWidget.addTopLevelItem(item)

self.ui.treeWidget.setAnimated(True)
# Show sampling rate and security status
security_status = "Encrypted" if metadata[default_idx].get("security_enabled", False) else "Unencrypted"
self.statusBar.showMessage(
"Sampling rate: {}Hz".format(metadata[default_idx]["srate"]))
"Sampling rate: {}Hz | Security: {}".format(
metadata[default_idx]["srate"], security_status))

def toggle_panel(self):
if self.panelHidden:
Expand Down