diff --git a/unilabos/devices/SII/cameraSII/__init__.py b/unilabos/devices/SII/cameraSII/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/unilabos/devices/SII/cameraSII/cameraUSB.py b/unilabos/devices/SII/cameraSII/cameraUSB.py new file mode 100644 index 00000000..505cc94d --- /dev/null +++ b/unilabos/devices/SII/cameraSII/cameraUSB.py @@ -0,0 +1,468 @@ +#!/usr/bin/env python3 +import asyncio +import json +import subprocess +import sys +import threading +from typing import Optional, Dict, Any + +import requests +import websockets + + +class CameraController: + """ + Uni-Lab-OS 摄像头驱动(Linux USB 摄像头版,无 PTZ) + + - WebSocket 信令:signal_backend_url 连接到后端 + 例如: wss://sciol.ac.cn/api/realtime/signal/host/ + - 媒体服务器:RTMP 推流到 rtmp_url;WebRTC offer 转发到 SRS 的 webrtc_api + - 视频源:本地 USB 摄像头(V4L2,默认 /dev/video0) + """ + + def __init__( + self, + host_id: str = "demo-host", + signal_backend_url: str = "wss://sciol.ac.cn/api/realtime/signal/host", + rtmp_url: str = "rtmp://srs.sciol.ac.cn:4499/live/camera-01", + webrtc_api: str = "https://srs.sciol.ac.cn/rtc/v1/play/", + webrtc_stream_url: str = "webrtc://srs.sciol.ac.cn:4500/live/camera-01", + video_device: str = "/dev/video0", + width: int = 1280, + height: int = 720, + fps: int = 30, + video_bitrate: str = "1500k", + audio_device: Optional[str] = None, # 比如 "hw:1,0",没有音频就保持 None + audio_bitrate: str = "64k", + auto_start: bool = False, + ): + self.host_id = host_id + + # 拼接最终 WebSocket URL:.../host/ + signal_backend_url = signal_backend_url.rstrip("/") + if not signal_backend_url.endswith("/host"): + signal_backend_url = signal_backend_url + "/host" + self.signal_backend_url = f"{signal_backend_url}/{host_id}" + + # 媒体服务器配置 + self.rtmp_url = rtmp_url + self.webrtc_api = webrtc_api + self.webrtc_stream_url = webrtc_stream_url + + # 本地采集配置 + self.video_device = video_device + self.width = int(width) + self.height = int(height) + self.fps = int(fps) + self.video_bitrate = video_bitrate + self.audio_device = audio_device + self.audio_bitrate = audio_bitrate + + # 运行时状态(保证所有 status 字段存在) + self._ws: Optional[object] = None + self._ffmpeg_process: Optional[subprocess.Popen] = None + + self._running: bool = False + self._websocket_connected: bool = False + + self._loop_task: Optional[asyncio.Future] = None + + # 事件循环 & 线程 + self._loop: Optional[asyncio.AbstractEventLoop] = None + self._loop_thread: Optional[threading.Thread] = None + + if auto_start: + try: + self.start() + except Exception as e: + print(f"[CameraController] __init__ auto start failed: {e}", file=sys.stderr) + + # --------------------------------------------------------------------- + # 对外方法 + # --------------------------------------------------------------------- + + def start(self, config: Optional[Dict[str, Any]] = None): + if self._running: + return {"status": "already_running", "host_id": self.host_id} + + # 启动前复位状态(避免上次残留) + self._running = True + self._websocket_connected = False + + # 应用 config 覆盖(如果有) + if config: + cfg_host_id = config.get("host_id") + if cfg_host_id: + self.host_id = cfg_host_id + + signal_backend_url = config.get("signal_backend_url") + if signal_backend_url: + signal_backend_url = signal_backend_url.rstrip("/") + if not signal_backend_url.endswith("/host"): + signal_backend_url = signal_backend_url + "/host" + self.signal_backend_url = f"{signal_backend_url}/{self.host_id}" + + self.rtmp_url = config.get("rtmp_url", self.rtmp_url) + self.webrtc_api = config.get("webrtc_api", self.webrtc_api) + self.webrtc_stream_url = config.get("webrtc_stream_url", self.webrtc_stream_url) + + self.video_device = config.get("video_device", self.video_device) + self.width = int(config.get("width", self.width)) + self.height = int(config.get("height", self.height)) + self.fps = int(config.get("fps", self.fps)) + self.video_bitrate = config.get("video_bitrate", self.video_bitrate) + self.audio_device = config.get("audio_device", self.audio_device) + self.audio_bitrate = config.get("audio_bitrate", self.audio_bitrate) + + print("[CameraController] start(): starting FFmpeg streaming...", file=sys.stderr) + self._start_ffmpeg() + + self._loop = asyncio.new_event_loop() + + def loop_runner(loop: asyncio.AbstractEventLoop): + asyncio.set_event_loop(loop) + try: + loop.run_forever() + except Exception as e: + print(f"[CameraController] event loop error: {e}", file=sys.stderr) + + self._loop_thread = threading.Thread(target=loop_runner, args=(self._loop,), daemon=True) + self._loop_thread.start() + + self._loop_task = asyncio.run_coroutine_threadsafe(self._run_main_loop(), self._loop) + + return { + "status": "started", + "host_id": self.host_id, + "signal_backend_url": self.signal_backend_url, + "rtmp_url": self.rtmp_url, + "webrtc_api": self.webrtc_api, + "webrtc_stream_url": self.webrtc_stream_url, + "video_device": self.video_device, + "width": self.width, + "height": self.height, + "fps": self.fps, + "video_bitrate": self.video_bitrate, + "audio_device": self.audio_device, + "running": self.running, + "websocket_connected": self.websocket_connected, + "ffmpeg_running": self.ffmpeg_running, + } + + def stop(self) -> Dict[str, Any]: + # 立刻置状态,便于状态发布端读到正确值 + self._running = False + self._websocket_connected = False + + # 先取消主任务(让 ws connect/sleep 尽快退出) + if self._loop_task is not None and not self._loop_task.done(): + self._loop_task.cancel() + + # 停止推流 + self._stop_ffmpeg() + + # 关闭 WebSocket(在 loop 中执行) + if self._ws and self._loop is not None: + + async def close_ws(): + try: + await self._ws.close() + except Exception as e: + print(f"[CameraController] error closing WebSocket: {e}", file=sys.stderr) + + try: + asyncio.run_coroutine_threadsafe(close_ws(), self._loop) + except Exception: + pass + + # 停止事件循环 + if self._loop is not None: + try: + self._loop.call_soon_threadsafe(self._loop.stop) + except Exception as e: + print(f"[CameraController] error stopping loop: {e}", file=sys.stderr) + + # 等待线程退出 + if self._loop_thread is not None: + try: + self._loop_thread.join(timeout=5) + except Exception as e: + print(f"[CameraController] error joining loop thread: {e}", file=sys.stderr) + + self._ws = None + self._loop_task = None + self._loop = None + self._loop_thread = None + + return { + "status": "stopped", + "host_id": self.host_id, + "running": self.running, + "websocket_connected": self.websocket_connected, + "ffmpeg_running": self.ffmpeg_running, + } + + def get_status(self) -> Dict[str, Any]: + ws_closed = None + if self._ws is not None: + ws_closed = getattr(self._ws, "closed", None) + + if ws_closed is None: + websocket_connected = self._ws is not None + else: + websocket_connected = (self._ws is not None) and (not ws_closed) + + return { + "host_id": self.host_id, + "running": self.running, + "websocket_connected": websocket_connected, + "ffmpeg_running": self.ffmpeg_running, + "signal_backend_url": self.signal_backend_url, + "rtmp_url": self.rtmp_url, + "video_device": self.video_device, + "width": self.width, + "height": self.height, + "fps": self.fps, + "video_bitrate": self.video_bitrate, + } + + # --------------------------------------------------------------------- + # WebSocket / 信令 + # --------------------------------------------------------------------- + + async def _run_main_loop(self): + print("[CameraController] main loop started", file=sys.stderr) + try: + while self._running: + try: + async with websockets.connect(self.signal_backend_url) as ws: + self._ws = ws + self._websocket_connected = True + print(f"[CameraController] WebSocket connected: {self.signal_backend_url}", file=sys.stderr) + + try: + await self._recv_loop() + finally: + self._websocket_connected = False + self._ws = None + + except asyncio.CancelledError: + raise + except Exception as e: + self._websocket_connected = False + self._ws = None + if self._running: + print(f"[CameraController] WebSocket connection error: {e}", file=sys.stderr) + await asyncio.sleep(3) + except asyncio.CancelledError: + pass + finally: + self._websocket_connected = False + self._ws = None + print("[CameraController] main loop exited", file=sys.stderr) + + async def _recv_loop(self): + assert self._ws is not None + ws = self._ws + + async for message in ws: + try: + data = json.loads(message) + except json.JSONDecodeError: + print(f"[CameraController] non-JSON message: {message}", file=sys.stderr) + continue + + try: + await self._handle_message(data) + except Exception as e: + print(f"[CameraController] error handling message {data}: {e}", file=sys.stderr) + + async def _handle_message(self, data: Dict[str, Any]): + cmd = data.get("command") + + if cmd == "start_stream": + self._start_ffmpeg() + return + + if cmd == "stop_stream": + self._stop_ffmpeg() + return + + if data.get("type") == "offer": + offer_sdp = data.get("sdp", "") + camera_id = data.get("cameraId", "camera-01") + + answer_sdp = await self._handle_webrtc_offer(offer_sdp) + + if self._ws: + answer_payload = { + "type": "answer", + "sdp": answer_sdp, + "cameraId": camera_id, + "hostId": self.host_id, + } + await self._ws.send(json.dumps(answer_payload)) + + # --------------------------------------------------------------------- + # FFmpeg 推流(V4L2 USB 摄像头) + # --------------------------------------------------------------------- + + def _start_ffmpeg(self): + if self._ffmpeg_process and self._ffmpeg_process.poll() is None: + return + + video_size = f"{self.width}x{self.height}" + + cmd = [ + "ffmpeg", + "-hide_banner", + "-loglevel", + "warning", + "-f", + "v4l2", + "-framerate", + str(self.fps), + "-video_size", + video_size, + "-i", + self.video_device, + ] + + if self.audio_device: + cmd += [ + "-f", + "alsa", + "-i", + self.audio_device, + "-c:a", + "aac", + "-b:a", + self.audio_bitrate, + "-ar", + "44100", + "-ac", + "1", + ] + else: + cmd += ["-an"] + + cmd += [ + "-c:v", + "libx264", + "-preset", + "ultrafast", + "-tune", + "zerolatency", + "-profile:v", + "baseline", + "-pix_fmt", + "yuv420p", + "-b:v", + self.video_bitrate, + "-maxrate", + self.video_bitrate, + "-bufsize", + "2M", + "-g", + str(max(self.fps, 10)), + "-keyint_min", + str(max(self.fps, 10)), + "-sc_threshold", + "0", + "-x264-params", + "bframes=0", + "-f", + "flv", + self.rtmp_url, + ] + + print(f"[CameraController] starting FFmpeg: {' '.join(cmd)}", file=sys.stderr) + + try: + self._ffmpeg_process = subprocess.Popen( + cmd, + stdout=subprocess.DEVNULL, + stderr=sys.stderr, + shell=False, + ) + except Exception as e: + self._ffmpeg_process = None + print(f"[CameraController] failed to start FFmpeg: {e}", file=sys.stderr) + + def _stop_ffmpeg(self): + proc = self._ffmpeg_process + if proc and proc.poll() is None: + try: + proc.terminate() + try: + proc.wait(timeout=5) + except subprocess.TimeoutExpired: + proc.kill() + except Exception as e: + print(f"[CameraController] error stopping FFmpeg: {e}", file=sys.stderr) + self._ffmpeg_process = None + + # --------------------------------------------------------------------- + # WebRTC offer -> SRS + # --------------------------------------------------------------------- + + async def _handle_webrtc_offer(self, offer_sdp: str) -> str: + payload = { + "api": self.webrtc_api, + "streamurl": self.webrtc_stream_url, + "sdp": offer_sdp, + } + headers = {"Content-Type": "application/json"} + + def _do_post(): + return requests.post(self.webrtc_api, json=payload, headers=headers, timeout=10) + + loop = asyncio.get_running_loop() + resp = await loop.run_in_executor(None, _do_post) + + resp.raise_for_status() + data = resp.json() + answer_sdp = data.get("sdp", "") + if not answer_sdp: + raise RuntimeError(f"empty SDP from media server: {data}") + return answer_sdp + + # --------------------------------------------------------------------- + # status fields for UniLabOS publisher(关键:这些名字要和 YAML 里的 status_types 对上) + # --------------------------------------------------------------------- + + @property + def running(self) -> bool: + return bool(self._running) + + @property + def websocket_connected(self) -> bool: + return bool(self._websocket_connected) + + @property + def ffmpeg_running(self) -> bool: + p = self._ffmpeg_process + try: + return p is not None and p.poll() is None + except Exception: + return False + + +if __name__ == "__main__": + # 直接运行用于手动测试 + c = CameraController( + host_id="demo-host", + video_device="/dev/video0", + width=1280, + height=720, + fps=30, + video_bitrate="1500k", + audio_device=None, + auto_start=False, + ) + try: + while True: + # 这里必须 await,否则啥也不会睡 + asyncio.run(asyncio.sleep(1)) + except KeyboardInterrupt: + c.stop() \ No newline at end of file diff --git a/unilabos/devices/SII/cameraSII/cameraUSB_test.py b/unilabos/devices/SII/cameraSII/cameraUSB_test.py new file mode 100644 index 00000000..30b0ac24 --- /dev/null +++ b/unilabos/devices/SII/cameraSII/cameraUSB_test.py @@ -0,0 +1,51 @@ +#!/usr/bin/env python3 +import time +import json + +from cameraUSB import CameraController + + +def main(): + # 按你的实际情况改 + cfg = dict( + host_id="demo-host", + signal_backend_url="wss://sciol.ac.cn/api/realtime/signal/host", + rtmp_url="rtmp://srs.sciol.ac.cn:4499/live/camera-01", + webrtc_api="https://srs.sciol.ac.cn/rtc/v1/play/", + webrtc_stream_url="webrtc://srs.sciol.ac.cn:4500/live/camera-01", + video_device="/dev/video0", + width=1280, + height=720, + fps=30, + video_bitrate="1500k", + audio_device=None, + ) + + c = CameraController(**cfg) + + # 可选:如果你不想依赖 __init__ 自动 start,可以这样显式调用: + # c = CameraController(host_id=cfg["host_id"]) + # c.start(cfg) + + run_seconds = 30 # 测试运行时长 + t0 = time.time() + + try: + while True: + st = c.get_status() + print(json.dumps(st, ensure_ascii=False, indent=2)) + + if time.time() - t0 >= run_seconds: + break + + time.sleep(2) + except KeyboardInterrupt: + print("Interrupted, stopping...") + finally: + print("Stopping controller...") + c.stop() + print("Done.") + + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/unilabos/devices/SII/cameraSII/cameraWifi.py b/unilabos/devices/SII/cameraSII/cameraWifi.py new file mode 100644 index 00000000..d3797c75 --- /dev/null +++ b/unilabos/devices/SII/cameraSII/cameraWifi.py @@ -0,0 +1,720 @@ +#!/usr/bin/env python3 +import asyncio +import json +import subprocess +import sys +import threading +from typing import Optional, Dict, Any +import logging + +import requests +import websockets + +logging.getLogger("zeep").setLevel(logging.WARNING) +logging.getLogger("zeep.xsd.schema").setLevel(logging.WARNING) +logging.getLogger("zeep.xsd.schema.schema").setLevel(logging.WARNING) +from onvif import ONVIFCamera # 新增:ONVIF PTZ 控制 + + +# ======================= 独立的 PTZController ======================= +class PTZController: + def __init__(self, host: str, port: int, user: str, password: str): + """ + :param host: 摄像机 IP 或域名(和 RTSP 的一样即可) + :param port: ONVIF 端口(多数为 80,看你的设备) + :param user: 摄像机用户名 + :param password: 摄像机密码 + """ + self.host = host + self.port = port + self.user = user + self.password = password + + self.cam: Optional[ONVIFCamera] = None + self.media_service = None + self.ptz_service = None + self.profile = None + + def connect(self) -> bool: + """ + 建立 ONVIF 连接并初始化 PTZ 能力,失败返回 False(不抛异常) + Note: 首先 pip install onvif-zeep + """ + try: + self.cam = ONVIFCamera(self.host, self.port, self.user, self.password) + self.media_service = self.cam.create_media_service() + self.ptz_service = self.cam.create_ptz_service() + profiles = self.media_service.GetProfiles() + if not profiles: + print("[PTZ] No media profiles found on camera.", file=sys.stderr) + return False + self.profile = profiles[0] + return True + except Exception as e: + print(f"[PTZ] Failed to init ONVIF PTZ: {e}", file=sys.stderr) + return False + + def _continuous_move(self, pan: float, tilt: float, zoom: float, duration: float) -> bool: + """ + 连续移动一段时间(秒),之后自动停止。 + 此函数为阻塞模式:只有在 Stop 调用结束后,才返回 True/False。 + """ + if not self.ptz_service or not self.profile: + print("[PTZ] _continuous_move: ptz_service or profile not ready", file=sys.stderr) + return False + + # 进入前先强行停一下,避免前一次残留动作 + self._force_stop() + + req = self.ptz_service.create_type("ContinuousMove") + req.ProfileToken = self.profile.token + + req.Velocity = { + "PanTilt": {"x": pan, "y": tilt}, + "Zoom": {"x": zoom}, + } + + try: + print(f"[PTZ] ContinuousMove start: pan={pan}, tilt={tilt}, zoom={zoom}, duration={duration}", file=sys.stderr) + self.ptz_service.ContinuousMove(req) + except Exception as e: + print(f"[PTZ] ContinuousMove failed: {e}", file=sys.stderr) + return False + + # 阻塞等待:这里决定“运动时间” + import time + wait_seconds = max(2 * duration, 0.0) + time.sleep(wait_seconds) + + # 运动完成后强制停止 + return self._force_stop() + + def stop(self) -> bool: + """ + 阻塞调用 Stop(带重试),成功 True,失败 False。 + """ + return self._force_stop() + + # ------- 对外动作接口(给 CameraController 调用) ------- + # 所有接口都为“阻塞模式”:只有在运动 + Stop 完成后才返回 True/False + + def move_up(self, speed: float = 0.5, duration: float = 1.0) -> bool: + print(f"[PTZ] move_up called, speed={speed}, duration={duration}", file=sys.stderr) + return self._continuous_move(pan=0.0, tilt=+speed, zoom=0.0, duration=duration) + + def move_down(self, speed: float = 0.5, duration: float = 1.0) -> bool: + print(f"[PTZ] move_down called, speed={speed}, duration={duration}", file=sys.stderr) + return self._continuous_move(pan=0.0, tilt=-speed, zoom=0.0, duration=duration) + + def move_left(self, speed: float = 0.2, duration: float = 1.0) -> bool: + print(f"[PTZ] move_left called, speed={speed}, duration={duration}", file=sys.stderr) + return self._continuous_move(pan=-speed, tilt=0.0, zoom=0.0, duration=duration) + + def move_right(self, speed: float = 0.2, duration: float = 1.0) -> bool: + print(f"[PTZ] move_right called, speed={speed}, duration={duration}", file=sys.stderr) + return self._continuous_move(pan=+speed, tilt=0.0, zoom=0.0, duration=duration) + + # ------- 占位的变倍接口(当前设备不支持) ------- + def zoom_in(self, speed: float = 0.2, duration: float = 1.0) -> bool: + """ + 当前设备不支持变倍;保留方法只是避免上层调用时报错。 + """ + print("[PTZ] zoom_in is disabled for this device.", file=sys.stderr) + return False + + def zoom_out(self, speed: float = 0.2, duration: float = 1.0) -> bool: + """ + 当前设备不支持变倍;保留方法只是避免上层调用时报错。 + """ + print("[PTZ] zoom_out is disabled for this device.", file=sys.stderr) + return False + + def _force_stop(self, retries: int = 3, delay: float = 0.1) -> bool: + """ + 尝试多次调用 Stop,作为“强制停止”手段。 + :param retries: 重试次数 + :param delay: 每次重试间隔(秒) + """ + if not self.ptz_service or not self.profile: + print("[PTZ] _force_stop: ptz_service or profile not ready", file=sys.stderr) + return False + + import time + last_error = None + for i in range(retries): + try: + print(f"[PTZ] _force_stop: calling Stop(), attempt={i+1}", file=sys.stderr) + self.ptz_service.Stop({"ProfileToken": self.profile.token}) + print("[PTZ] _force_stop: Stop() returned OK", file=sys.stderr) + return True + except Exception as e: + last_error = e + print(f"[PTZ] _force_stop: Stop() failed at attempt {i+1}: {e}", file=sys.stderr) + time.sleep(delay) + + print(f"[PTZ] _force_stop: all {retries} attempts failed, last error: {last_error}", file=sys.stderr) + return False + +# ======================= CameraController(加入 PTZ) ======================= + +class CameraController: + """ + Uni-Lab-OS 摄像头驱动(driver 形式) + 启动 Uni-Lab-OS 后,立即开始推流 + + - WebSocket 信令:通过 signal_backend_url 连接到后端 + 例如: wss://sciol.ac.cn/api/realtime/signal/host/ + - 媒体服务器:通过 rtmp_url / webrtc_api / webrtc_stream_url + 当前配置为 SRS,与独立 HostSimulator 独立运行脚本保持一致。 + """ + + def __init__( + self, + host_id: str = "demo-host", + + # (1)信令后端(WebSocket) + signal_backend_url: str = "wss://sciol.ac.cn/api/realtime/signal/host", + + # (2)媒体后端(RTMP + WebRTC API) + rtmp_url: str = "rtmp://srs.sciol.ac.cn:4499/live/camera-01", + webrtc_api: str = "https://srs.sciol.ac.cn/rtc/v1/play/", + webrtc_stream_url: str = "webrtc://srs.sciol.ac.cn:4500/live/camera-01", + camera_rtsp_url: str = "", + + # (3)PTZ 控制相关(ONVIF) + ptz_host: str = "", # 一般就是摄像头 IP,比如 "192.168.31.164" + ptz_port: int = 80, # ONVIF 端口,不一定是 80,按实际情况改 + ptz_user: str = "", # admin + ptz_password: str = "", # admin123 + ): + self.host_id = host_id + self.camera_rtsp_url = camera_rtsp_url + + # 拼接最终的 WebSocket URL:.../host/ + signal_backend_url = signal_backend_url.rstrip("/") + if not signal_backend_url.endswith("/host"): + signal_backend_url = signal_backend_url + "/host" + self.signal_backend_url = f"{signal_backend_url}/{host_id}" + + # 媒体服务器配置 + self.rtmp_url = rtmp_url + self.webrtc_api = webrtc_api + self.webrtc_stream_url = webrtc_stream_url + + # PTZ 控制 + self.ptz_host = ptz_host + self.ptz_port = ptz_port + self.ptz_user = ptz_user + self.ptz_password = ptz_password + self._ptz: Optional[PTZController] = None + self._init_ptz_if_possible() + + # 运行时状态 + self._ws: Optional[object] = None + self._ffmpeg_process: Optional[subprocess.Popen] = None + self._running = False + self._loop_task: Optional[asyncio.Future] = None + + # 事件循环 & 线程 + self._loop: Optional[asyncio.AbstractEventLoop] = None + self._loop_thread: Optional[threading.Thread] = None + + try: + self.start() + except Exception as e: + print(f"[CameraController] __init__ auto start failed: {e}", file=sys.stderr) + + # ------------------------ PTZ 初始化 ------------------------ + + # ------------------------ PTZ 公开动作方法(一个动作一个函数) ------------------------ + + def ptz_move_up(self, speed: float = 0.5, duration: float = 1.0) -> bool: + print(f"[CameraController] ptz_move_up called, speed={speed}, duration={duration}") + return self._ptz.move_up(speed=speed, duration=duration) + + def ptz_move_down(self, speed: float = 0.5, duration: float = 1.0) -> bool: + print(f"[CameraController] ptz_move_down called, speed={speed}, duration={duration}") + return self._ptz.move_down(speed=speed, duration=duration) + + def ptz_move_left(self, speed: float = 0.2, duration: float = 1.0) -> bool: + print(f"[CameraController] ptz_move_left called, speed={speed}, duration={duration}") + return self._ptz.move_left(speed=speed, duration=duration) + + def ptz_move_right(self, speed: float = 0.2, duration: float = 1.0) -> bool: + print(f"[CameraController] ptz_move_right called, speed={speed}, duration={duration}") + return self._ptz.move_right(speed=speed, duration=duration) + + def zoom_in(self, speed: float = 0.2, duration: float = 1.0) -> bool: + """ + 当前设备不支持变倍;保留方法只是避免上层调用时报错。 + """ + print("[PTZ] zoom_in is disabled for this device.", file=sys.stderr) + return False + + def zoom_out(self, speed: float = 0.2, duration: float = 1.0) -> bool: + """ + 当前设备不支持变倍;保留方法只是避免上层调用时报错。 + """ + print("[PTZ] zoom_out is disabled for this device.", file=sys.stderr) + return False + + def ptz_stop(self): + if self._ptz is None: + print("[CameraController] PTZ not initialized.", file=sys.stderr) + return + self._ptz.stop() + + def _init_ptz_if_possible(self): + """ + 根据 ptz_host / user / password 初始化 PTZ; + 如果配置信息不全则不启用 PTZ(静默)。 + """ + if not (self.ptz_host and self.ptz_user and self.ptz_password): + return + ctrl = PTZController( + host=self.ptz_host, + port=self.ptz_port, + user=self.ptz_user, + password=self.ptz_password, + ) + if ctrl.connect(): + self._ptz = ctrl + else: + self._ptz = None + + # --------------------------------------------------------------------- + # 对外暴露的方法:供 Uni-Lab-OS 调用 + # --------------------------------------------------------------------- + + def start(self, config: Optional[Dict[str, Any]] = None): + """ + 启动 Camera 连接 & 消息循环,并在启动时就开启 FFmpeg 推流, + """ + + if self._running: + return {"status": "already_running", "host_id": self.host_id} + + # 应用 config 覆盖(如果有) + if config: + self.camera_rtsp_url = config.get("camera_rtsp_url", self.camera_rtsp_url) + cfg_host_id = config.get("host_id") + if cfg_host_id: + self.host_id = cfg_host_id + + signal_backend_url = config.get("signal_backend_url") + if signal_backend_url: + signal_backend_url = signal_backend_url.rstrip("/") + if not signal_backend_url.endswith("/host"): + signal_backend_url = signal_backend_url + "/host" + self.signal_backend_url = f"{signal_backend_url}/{self.host_id}" + + self.rtmp_url = config.get("rtmp_url", self.rtmp_url) + self.webrtc_api = config.get("webrtc_api", self.webrtc_api) + self.webrtc_stream_url = config.get( + "webrtc_stream_url", self.webrtc_stream_url + ) + + # PTZ 相关配置也允许通过 config 注入 + self.ptz_host = config.get("ptz_host", self.ptz_host) + self.ptz_port = int(config.get("ptz_port", self.ptz_port)) + self.ptz_user = config.get("ptz_user", self.ptz_user) + self.ptz_password = config.get("ptz_password", self.ptz_password) + self._init_ptz_if_possible() + + self._running = True + + # === start 时启动 FFmpeg 推流 === + self._start_ffmpeg() + + # 创建新的事件循环和线程(用于 WebSocket 信令) + self._loop = asyncio.new_event_loop() + + def loop_runner(loop: asyncio.AbstractEventLoop): + asyncio.set_event_loop(loop) + try: + loop.run_forever() + except Exception as e: + print(f"[CameraController] event loop error: {e}", file=sys.stderr) + + self._loop_thread = threading.Thread( + target=loop_runner, args=(self._loop,), daemon=True + ) + self._loop_thread.start() + + self._loop_task = asyncio.run_coroutine_threadsafe( + self._run_main_loop(), self._loop + ) + + return { + "status": "started", + "host_id": self.host_id, + "signal_backend_url": self.signal_backend_url, + "rtmp_url": self.rtmp_url, + "webrtc_api": self.webrtc_api, + "webrtc_stream_url": self.webrtc_stream_url, + } + + def stop(self) -> Dict[str, Any]: + """ + 停止推流 & 断开 WebSocket,并关闭事件循环线程。 + """ + self._running = False + + self._stop_ffmpeg() + + if self._ws and self._loop is not None: + async def close_ws(): + try: + await self._ws.close() + except Exception as e: + print( + f"[CameraController] error when closing WebSocket: {e}", + file=sys.stderr, + ) + + asyncio.run_coroutine_threadsafe(close_ws(), self._loop) + + if self._loop_task is not None: + if not self._loop_task.done(): + self._loop_task.cancel() + try: + self._loop_task.result() + except asyncio.CancelledError: + pass + except Exception as e: + print( + f"[CameraController] main loop task error in stop(): {e}", + file=sys.stderr, + ) + finally: + self._loop_task = None + + if self._loop is not None: + try: + self._loop.call_soon_threadsafe(self._loop.stop) + except Exception as e: + print( + f"[CameraController] error when stopping event loop: {e}", + file=sys.stderr, + ) + + if self._loop_thread is not None: + try: + self._loop_thread.join(timeout=5) + except Exception as e: + print( + f"[CameraController] error when joining loop thread: {e}", + file=sys.stderr, + ) + finally: + self._loop_thread = None + + self._ws = None + self._loop = None + + return {"status": "stopped", "host_id": self.host_id} + + def get_status(self) -> Dict[str, Any]: + """ + 查询当前状态,方便在 Uni-Lab-OS 中做监控。 + """ + ws_closed = None + if self._ws is not None: + ws_closed = getattr(self._ws, "closed", None) + + if ws_closed is None: + websocket_connected = self._ws is not None + else: + websocket_connected = (self._ws is not None) and (not ws_closed) + + return { + "host_id": self.host_id, + "running": self._running, + "websocket_connected": websocket_connected, + "ffmpeg_running": bool( + self._ffmpeg_process and self._ffmpeg_process.poll() is None + ), + "signal_backend_url": self.signal_backend_url, + "rtmp_url": self.rtmp_url, + } + + # --------------------------------------------------------------------- + # 内部实现逻辑:WebSocket 循环 / FFmpeg / WebRTC Offer 处理 + # --------------------------------------------------------------------- + + async def _run_main_loop(self): + try: + while self._running: + try: + async with websockets.connect(self.signal_backend_url) as ws: + self._ws = ws + await self._recv_loop() + except asyncio.CancelledError: + raise + except Exception as e: + if self._running: + print( + f"[CameraController] WebSocket connection error: {e}", + file=sys.stderr, + ) + await asyncio.sleep(3) + except asyncio.CancelledError: + pass + + async def _recv_loop(self): + assert self._ws is not None + ws = self._ws + + async for message in ws: + try: + data = json.loads(message) + except json.JSONDecodeError: + print( + f"[CameraController] received non-JSON message: {message}", + file=sys.stderr, + ) + continue + + try: + await self._handle_message(data) + except Exception as e: + print( + f"[CameraController] error while handling message {data}: {e}", + file=sys.stderr, + ) + + async def _handle_message(self, data: Dict[str, Any]): + """ + 处理来自信令后端的消息: + - command: start_stream / stop_stream / ptz_xxx + - type: offer (WebRTC) + """ + cmd = data.get("command") + + # ---------- 推流控制 ---------- + if cmd == "start_stream": + try: + self._start_ffmpeg() + except Exception as e: + print( + f"[CameraController] error when starting FFmpeg on start_stream: {e}", + file=sys.stderr, + ) + return + + if cmd == "stop_stream": + try: + self._stop_ffmpeg() + except Exception as e: + print( + f"[CameraController] error when stopping FFmpeg on stop_stream: {e}", + file=sys.stderr, + ) + return + + # # ---------- PTZ 控制 ---------- + # # 例如信令可以发: + # # {"command": "ptz_move", "direction": "down", "speed": 0.5, "duration": 0.5} + # if cmd == "ptz_move": + # if self._ptz is None: + # # 没有初始化 PTZ,静默忽略或打印一条 + # print("[CameraController] PTZ not initialized.", file=sys.stderr) + # return + + # direction = data.get("direction", "") + # speed = float(data.get("speed", 0.5)) + # duration = float(data.get("duration", 0.5)) + + # try: + # if direction == "up": + # self._ptz.move_up(speed=speed, duration=duration) + # elif direction == "down": + # self._ptz.move_down(speed=speed, duration=duration) + # elif direction == "left": + # self._ptz.move_left(speed=speed, duration=duration) + # elif direction == "right": + # self._ptz.move_right(speed=speed, duration=duration) + # elif direction == "zoom_in": + # self._ptz.zoom_in(speed=speed, duration=duration) + # elif direction == "zoom_out": + # self._ptz.zoom_out(speed=speed, duration=duration) + # elif direction == "stop": + # self._ptz.stop() + # else: + # # 未知方向,忽略 + # pass + # except Exception as e: + # print( + # f"[CameraController] error when handling PTZ move: {e}", + # file=sys.stderr, + # ) + # return + + # ---------- WebRTC Offer ---------- + if data.get("type") == "offer": + offer_sdp = data.get("sdp", "") + camera_id = data.get("cameraId", "camera-01") + try: + answer_sdp = await self._handle_webrtc_offer(offer_sdp) + except Exception as e: + print( + f"[CameraController] error when handling WebRTC offer: {e}", + file=sys.stderr, + ) + return + + if self._ws: + answer_payload = { + "type": "answer", + "sdp": answer_sdp, + "cameraId": camera_id, + "hostId": self.host_id, + } + try: + await self._ws.send(json.dumps(answer_payload)) + except Exception as e: + print( + f"[CameraController] error when sending WebRTC answer: {e}", + file=sys.stderr, + ) + + # ------------------------ FFmpeg 相关 ------------------------ + + def _start_ffmpeg(self): + if self._ffmpeg_process and self._ffmpeg_process.poll() is None: + return + + cmd = [ + "ffmpeg", + "-rtsp_transport", "tcp", + "-i", self.camera_rtsp_url, + + "-c:v", "libx264", + "-preset", "ultrafast", + "-tune", "zerolatency", + "-profile:v", "baseline", + "-b:v", "1M", + "-maxrate", "1M", + "-bufsize", "2M", + "-g", "10", + "-keyint_min", "10", + "-sc_threshold", "0", + "-pix_fmt", "yuv420p", + "-x264-params", "bframes=0", + + "-c:a", "aac", + "-ar", "44100", + "-ac", "1", + "-b:a", "64k", + + "-f", "flv", + self.rtmp_url, + ] + + logf = open("/tmp/ffmpeg_camera.log", "ab", buffering=0) + self._ffmpeg_process = subprocess.Popen( + cmd, + stdout=logf, + stderr=logf, + shell=False, + ) + + try: + self._ffmpeg_process = subprocess.Popen( + cmd, + stdout=subprocess.DEVNULL, + stderr=subprocess.STDOUT, + shell=False, + ) + except Exception as e: + print(f"[CameraController] failed to start FFmpeg: {e}", file=sys.stderr) + self._ffmpeg_process = None + raise + + def _stop_ffmpeg(self): + proc = self._ffmpeg_process + + if proc and proc.poll() is None: + try: + proc.terminate() + try: + proc.wait(timeout=5) + except subprocess.TimeoutExpired: + try: + proc.kill() + try: + proc.wait(timeout=2) + except subprocess.TimeoutExpired: + print( + f"[CameraController] FFmpeg process did not exit even after kill (pid={proc.pid})", + file=sys.stderr, + ) + except Exception as e: + print( + f"[CameraController] failed to kill FFmpeg process: {e}", + file=sys.stderr, + ) + except Exception as e: + print( + f"[CameraController] error when stopping FFmpeg: {e}", + file=sys.stderr, + ) + + self._ffmpeg_process = None + + # ------------------------ WebRTC Offer 相关 ------------------------ + + async def _handle_webrtc_offer(self, offer_sdp: str) -> str: + payload = { + "api": self.webrtc_api, + "streamurl": self.webrtc_stream_url, + "sdp": offer_sdp, + } + + headers = {"Content-Type": "application/json"} + + def _do_request(): + return requests.post( + self.webrtc_api, + json=payload, + headers=headers, + timeout=10, + ) + + try: + loop = asyncio.get_running_loop() + resp = await loop.run_in_executor(None, _do_request) + except Exception as e: + print( + f"[CameraController] failed to send offer to media server: {e}", + file=sys.stderr, + ) + raise + + try: + resp.raise_for_status() + except Exception as e: + print( + f"[CameraController] media server HTTP error: {e}, " + f"status={resp.status_code}, body={resp.text[:200]}", + file=sys.stderr, + ) + raise + + try: + data = resp.json() + except Exception as e: + print( + f"[CameraController] failed to parse media server JSON: {e}, " + f"raw={resp.text[:200]}", + file=sys.stderr, + ) + raise + + answer_sdp = data.get("sdp", "") + if not answer_sdp: + msg = f"empty SDP from media server: {data}" + print(f"[CameraController] {msg}", file=sys.stderr) + raise RuntimeError(msg) + + return answer_sdp \ No newline at end of file diff --git a/unilabos/devices/SII/cameraSII/demo_camera_pic.py b/unilabos/devices/SII/cameraSII/demo_camera_pic.py new file mode 100644 index 00000000..6e117a97 --- /dev/null +++ b/unilabos/devices/SII/cameraSII/demo_camera_pic.py @@ -0,0 +1,36 @@ +import cv2 + +# 推荐把 @ 进行 URL 编码:@ -> %40 +RTSP_URL = "rtsp://admin:admin123@192.168.31.164:554/stream1" +OUTPUT_IMAGE = "rtsp_test_frame.jpg" + +def main(): + print(f"尝试连接 RTSP 流: {RTSP_URL}") + cap = cv2.VideoCapture(RTSP_URL) + + if not cap.isOpened(): + print("错误:无法打开 RTSP 流,请检查:") + print(" 1. IP/端口是否正确") + print(" 2. 账号密码(尤其是 @ 是否已转成 %40)是否正确") + print(" 3. 摄像头是否允许当前主机访问(同一网段、防火墙等)") + return + + print("连接成功,开始读取一帧...") + ret, frame = cap.read() + + if not ret or frame is None: + print("错误:已连接但未能读取到帧数据(可能是码流未开启或网络抖动)") + cap.release() + return + + # 保存当前帧 + success = cv2.imwrite(OUTPUT_IMAGE, frame) + cap.release() + + if success: + print(f"成功截取一帧并保存为: {OUTPUT_IMAGE}") + else: + print("错误:写入图片失败,请检查磁盘权限/路径") + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/unilabos/devices/SII/cameraSII/demo_camera_push.py b/unilabos/devices/SII/cameraSII/demo_camera_push.py new file mode 100644 index 00000000..5321e713 --- /dev/null +++ b/unilabos/devices/SII/cameraSII/demo_camera_push.py @@ -0,0 +1,21 @@ +# run_camera_push.py +import time +from cameraWifi import CameraController # 这里根据你的文件名调整 + +if __name__ == "__main__": + controller = CameraController( + host_id="demo-host", + signal_backend_url="wss://sciol.ac.cn/api/realtime/signal/host", + rtmp_url="rtmp://srs.sciol.ac.cn:4499/live/camera-01", + webrtc_api="https://srs.sciol.ac.cn/rtc/v1/play/", + webrtc_stream_url="webrtc://srs.sciol.ac.cn:4500/live/camera-01", + camera_rtsp_url="rtsp://admin:admin123@192.168.31.164:554/stream1", + ) + + try: + while True: + status = controller.get_status() + print(status) + time.sleep(5) + except KeyboardInterrupt: + controller.stop() \ No newline at end of file diff --git a/unilabos/devices/SII/cameraSII/ptz_cameracontroller_test.py b/unilabos/devices/SII/cameraSII/ptz_cameracontroller_test.py new file mode 100644 index 00000000..2d788b62 --- /dev/null +++ b/unilabos/devices/SII/cameraSII/ptz_cameracontroller_test.py @@ -0,0 +1,78 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- + +""" +使用 CameraController 来测试 PTZ: +让摄像头按顺序向下、向上、向左、向右运动几次。 +""" + +import time +import sys + +# 根据你的工程结构修改导入路径: +# 假设 CameraController 定义在 cameraController.py 里 +from cameraDriver import CameraController + + +def main(): + # === 根据你的实际情况填 IP、端口、账号密码 === + ptz_host = "192.168.31.164" + ptz_port = 2020 # 注意要和你单独测试 PTZController 时保持一致 + ptz_user = "admin" + ptz_password = "admin123" + + # 1. 创建 CameraController 实例 + cam = CameraController( + # 其他摄像机相关参数按你类的 __init__ 来补充 + ptz_host=ptz_host, + ptz_port=ptz_port, + ptz_user=ptz_user, + ptz_password=ptz_password, + ) + + # 2. 启动 / 初始化(如果你的 CameraController 有 start(config) 之类的接口) + # 这里给一个最小的 config,重点是 PTZ 相关字段 + config = { + "ptz_host": ptz_host, + "ptz_port": ptz_port, + "ptz_user": ptz_user, + "ptz_password": ptz_password, + } + + try: + cam.start(config) + except Exception as e: + print(f"[TEST] CameraController start() 失败: {e}", file=sys.stderr) + return + + # 这里可以判断一下内部 _ptz 是否初始化成功(如果你对 CameraController 做了封装) + if getattr(cam, "_ptz", None) is None: + print("[TEST] CameraController 内部 PTZ 未初始化成功,请检查 ptz_host/port/user/password 配置。", file=sys.stderr) + return + + # 3. 依次调用 CameraController 的 PTZ 方法 + # 这里假设你在 CameraController 中提供了这几个对外方法: + # ptz_move_down / ptz_move_up / ptz_move_left / ptz_move_right + # 如果你命名不一样,把下面调用名改成你的即可。 + + print("向下移动(通过 CameraController)...") + cam.ptz_move_down(speed=0.5, duration=1.0) + time.sleep(1) + + print("向上移动(通过 CameraController)...") + cam.ptz_move_up(speed=0.5, duration=1.0) + time.sleep(1) + + print("向左移动(通过 CameraController)...") + cam.ptz_move_left(speed=0.5, duration=1.0) + time.sleep(1) + + print("向右移动(通过 CameraController)...") + cam.ptz_move_right(speed=0.5, duration=1.0) + time.sleep(1) + + print("测试结束。") + + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/unilabos/devices/SII/cameraSII/ptz_test.py b/unilabos/devices/SII/cameraSII/ptz_test.py new file mode 100644 index 00000000..018c3d29 --- /dev/null +++ b/unilabos/devices/SII/cameraSII/ptz_test.py @@ -0,0 +1,50 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- + +""" +测试 cameraDriver.py中的 PTZController 类,让摄像头按顺序运动几次 +""" + +import time + +from cameraDriver import PTZController + + +def main(): + # 根据你的实际情况填 IP、端口、账号密码 + host = "192.168.31.164" + port = 80 + user = "admin" + password = "admin123" + + ptz = PTZController(host=host, port=port, user=user, password=password) + + # 1. 连接摄像头 + if not ptz.connect(): + print("连接 PTZ 失败,检查 IP/用户名/密码/端口。") + return + + # 2. 依次测试几个动作 + # 每个动作之间 sleep 一下方便观察 + + print("向下移动...") + ptz.move_down(speed=0.5, duration=1.0) + time.sleep(1) + + print("向上移动...") + ptz.move_up(speed=0.5, duration=1.0) + time.sleep(1) + + print("向左移动...") + ptz.move_left(speed=0.5, duration=1.0) + time.sleep(1) + + print("向右移动...") + ptz.move_right(speed=0.5, duration=1.0) + time.sleep(1) + + print("测试结束。") + + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/unilabos/devices/SII/laiyu_xyz_pipette/XYZStepperController.py b/unilabos/devices/SII/laiyu_xyz_pipette/XYZStepperController.py new file mode 100644 index 00000000..216b2f83 --- /dev/null +++ b/unilabos/devices/SII/laiyu_xyz_pipette/XYZStepperController.py @@ -0,0 +1,267 @@ +# xyz_stepper_controller.py +import time +import logging +from enum import Enum +from dataclasses import dataclass +from typing import Optional, Dict, List + +from .drivers.XYZModbus import XYZModbus, MotorStatus, ModbusException +from .drivers.SharedRS485Bus import SharedRS485Bus + +logger = logging.getLogger("XYZStepper") + + +class MotorAxis(Enum): + X = 1 + Y = 2 + Z = 3 + + +@dataclass +class MotorPosition: + steps: int + speed: int + current: int + status: MotorStatus + + +class XYZStepperController: + """ + XYZ 三轴步进控制器(基于 SharedRS485Bus + XYZModbus) + 保持和你原版大体一致的接口。 + """ + + STEPS_PER_REV = 16384 + LEAD_MM_X, LEAD_MM_Y, LEAD_MM_Z = 80.0, 80.0, 5.0 + STEPS_PER_MM_X = STEPS_PER_REV / LEAD_MM_X + STEPS_PER_MM_Y = STEPS_PER_REV / LEAD_MM_Y + STEPS_PER_MM_Z = STEPS_PER_REV / LEAD_MM_Z + + # 寄存器映射沿用你原来的 + REG_STATUS, REG_POS_HIGH, REG_POS_LOW = 0x00, 0x01, 0x02 + REG_ACTUAL_SPEED, REG_CURRENT, REG_ENABLE = 0x03, 0x05, 0x06 + REG_ZERO_CMD, REG_TARGET_HIGH, REG_TARGET_LOW = 0x0F, 0x10, 0x11 + REG_SPEED, REG_ACCEL, REG_PRECISION, REG_START = 0x13, 0x14, 0x15, 0x16 + REG_COMMAND = 0x60 + + def __init__( + self, + bus: SharedRS485Bus, + origin_path: str = "unilabos/devices/laiyu_xyz_pipette/work_origin.json", + ): + """ + 这里不再自己开串口,而是复用上层传入的 SharedRS485Bus。 + """ + self.bus = bus + self.modbus = XYZModbus(bus) + self.axis_addr: Dict[MotorAxis, int] = { + MotorAxis.X: 1, + MotorAxis.Y: 2, + MotorAxis.Z: 3, + } + self.work_origin_steps = {"x": 0, "y": 0, "z": 0} + self.is_homed = False + self._load_work_origin(origin_path) + + # --------- 工具函数 ---------- + @staticmethod + def s16(v: int) -> int: + return v - 0x10000 if v & 0x8000 else v + + @staticmethod + def s32(h: int, l: int) -> int: + v = (h << 16) | l + return v - 0x100000000 if v & 0x80000000 else v + + @classmethod + def mm_to_steps(cls, axis: str, mm: float = 0.0) -> int: + axis = axis.upper() + if axis == "X": + return int(mm * cls.STEPS_PER_MM_X) + elif axis == "Y": + return int(mm * cls.STEPS_PER_MM_Y) + elif axis == "Z": + return int(mm * cls.STEPS_PER_MM_Z) + raise ValueError(f"未知轴: {axis}") + + @classmethod + def steps_to_mm(cls, axis: str, steps: int) -> float: + axis = axis.upper() + if axis == "X": + return steps / cls.STEPS_PER_MM_X + elif axis == "Y": + return steps / cls.STEPS_PER_MM_Y + elif axis == "Z": + return steps / cls.STEPS_PER_MM_Z + raise ValueError(f"未知轴: {axis}") + + # --------- 状态与控制 ---------- + def get_status(self, axis: str = "Z") -> List[int]: + """ + 返回简化数组格式: [steps, speed, current, status_value] + 读 6 个寄存器:status, posH, posL, speed, current, enable + """ + if isinstance(axis, MotorAxis): + axis_enum = axis + elif isinstance(axis, str): + axis_enum = MotorAxis[axis.upper()] + else: + raise TypeError("axis 参数必须为 str 或 MotorAxis") + + slave = self.axis_addr[axis_enum] + try: + vals = self.modbus.read_regs(slave, self.REG_STATUS, 6) + except Exception as e: + logger.warning("get_status fail axis=%s: %s", axis, e) + return [0, 0, 0, MotorStatus.STANDBY.value] + + if vals is None or len(vals) < 5: + return [0, 0, 0, MotorStatus.STANDBY.value] + + steps = self.s32(vals[1], vals[2]) + speed = self.s16(vals[3]) + current = vals[4] + st_val = int(MotorStatus(vals[0]).value) + return [steps, speed, current, st_val] + + def enable(self, axis: str, state: bool) -> bool: + a = MotorAxis[axis.upper()] + return self.modbus.write_reg(self.axis_addr[a], self.REG_ENABLE, 1 if state else 0) + + def wait_complete(self, axis: str, timeout: float = 30.0) -> bool: + a = axis.upper() + start = time.time() + while time.time() - start < timeout: + try: + vals = self.get_status(a) + except Exception: + time.sleep(0.1) + continue + + if len(vals) <= 3: + time.sleep(0.1) + continue + + try: + st = MotorStatus(vals[3]) + except Exception: + time.sleep(0.1) + continue + + if st == MotorStatus.STANDBY: + return True + + if st in ( + MotorStatus.COLLISION_STOP, + MotorStatus.FORWARD_LIMIT_STOP, + MotorStatus.REVERSE_LIMIT_STOP, + ): + logger.warning("%s 轴异常停止: %s", a, st.name) + return False + + time.sleep(0.1) + return False + + # --------- 控制命令 ---------- + def move_to(self, axis: str, steps: int, + speed: int = 2000, acc: int = 500, precision: int = 50) -> bool: + """ + 单轴绝对运动(机器坐标系步数)。 + """ + a = MotorAxis[axis.upper()] + addr = self.axis_addr[a] + hi, lo = (steps >> 16) & 0xFFFF, steps & 0xFFFF + values = [hi, lo, speed, acc, precision] + ok = self.modbus.write_regs(addr, self.REG_TARGET_HIGH, values) + if ok: + self.modbus.write_reg(addr, self.REG_START, 1) + return ok + + def move_xyz_work( + self, + x: Optional[float] = 0.0, + y: Optional[float] = 0.0, + z: Optional[float] = 0.0, + speed: int = 100, + acc: int = 1500, + ): + """ + 按你原来的安全顺序:Z 抬起 -> XY 平面移动 -> Z 下探。 + x/y/z 为 “工作坐标” mm。 + """ + if z is not None: + safe_z = self._to_machine_steps("Z", 0.0) + self.move_to("Z", safe_z, speed, acc) + self.wait_complete("Z") + + if x is not None or y is not None: + if x is not None: + self.move_to("X", self._to_machine_steps("X", x), speed, acc) + if y is not None: + self.move_to("Y", self._to_machine_steps("Y", y), speed, acc) + if x is not None: + self.wait_complete("X") + if y is not None: + self.wait_complete("Y") + + if z is not None: + self.move_to("Z", self._to_machine_steps("Z", z), speed, acc) + self.wait_complete("Z") + + # --------- 坐标与零点 ---------- + def _to_machine_steps(self, axis: str, mm: float) -> int: + """ + 工作坐标 (mm) -> 机器坐标步数(= 工件软零点 + 位移)。 + """ + base = self.work_origin_steps.get(axis.lower(), 0) + return base + self.mm_to_steps(axis, mm) + + def define_current_as_zero(self, save_path: str = "work_origin.json"): + import json + from datetime import datetime + + origin = {} + for axis in ["X", "Y", "Z"]: + vals = self.get_status(axis) + origin[axis.lower()] = int(vals[0]) + with open(save_path, "w", encoding="utf-8") as f: + json.dump( + { + "work_origin_steps": origin, + "timestamp": datetime.now().isoformat(), + }, + f, + indent=2, + ensure_ascii=False, + ) + self.work_origin_steps = origin + self.is_homed = True + logger.info("软零点已定义并保存到 %s", save_path) + + def _load_work_origin(self, path: str) -> bool: + import json, os + + if not os.path.exists(path): + logger.warning("未找到软零点文件: %s", path) + return False + with open(path, "r", encoding="utf-8") as f: + data = json.load(f) + self.work_origin_steps = data.get("work_origin_steps", {"x": 0, "y": 0, "z": 0}) + self.is_homed = True + logger.info("软零点已加载: %s", self.work_origin_steps) + return True + + def return_to_work_origin(self, speed: int = 200, acc: int = 800): + """ + 回工件软零点:Z 抬起 -> X/Y -> Z。 + """ + self.move_to("Z", self._to_machine_steps("Z", 0.0), speed, acc) + self.wait_complete("Z") + + self.move_to("X", self.work_origin_steps.get("x", 0), speed, acc) + self.move_to("Y", self.work_origin_steps.get("y", 0), speed, acc) + self.wait_complete("X") + self.wait_complete("Y") + + self.move_to("Z", self.work_origin_steps.get("z", 0), speed, acc) + self.wait_complete("Z") \ No newline at end of file diff --git a/unilabos/devices/SII/laiyu_xyz_pipette/__init__.py b/unilabos/devices/SII/laiyu_xyz_pipette/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/unilabos/devices/SII/laiyu_xyz_pipette/demo_pipette.py b/unilabos/devices/SII/laiyu_xyz_pipette/demo_pipette.py new file mode 100644 index 00000000..ab05e960 --- /dev/null +++ b/unilabos/devices/SII/laiyu_xyz_pipette/demo_pipette.py @@ -0,0 +1,77 @@ +# demo_pipette_station.py +import logging +import time + +from station_simple import Station + + +def wash_pipette(st: Station, + wash_volume_ul: float = 200.0, + cycles: int = 3, + delay_s: float = 0.5): + """ + 简单洗液: + 反复执行 “吸 wash_volume_ul → 等待 → 排 wash_volume_ul → 等待” + 通过 Station 的 aspirate/dispense 间接调用移液枪。 + """ + logging.info(f"开始洗液: 体积={wash_volume_ul} uL, 循环={cycles}") + + for i in range(1, cycles + 1): + logging.info(f"洗液循环 {i}/{cycles} - 吸液 {wash_volume_ul} uL") + st.aspirate(wash_volume_ul) + time.sleep(delay_s) + + logging.info(f"洗液循环 {i}/{cycles} - 排液 {wash_volume_ul} uL") + st.dispense(wash_volume_ul) + time.sleep(delay_s) + + logging.info("洗液完成") + + +def purge_liquid(st: Station, + purge_volume_ul: float = 500.0, + delay_s: float = 0.5): + """ + 排空:向外排出一大段液体,尽量把枪头里的液体排尽。 + 通过 Station 的 dispense 调用底层移液枪。 + """ + logging.info(f"开始排空: 体积={purge_volume_ul} uL") + st.dispense(purge_volume_ul) + time.sleep(delay_s) + logging.info("排空完成") + + +def eject_tip(st: Station): + """ + 弹枪头:调用 Station 的 eject_tip 方法。 + """ + logging.info("准备弹出枪头") + st.eject_tip() + # Station 里已经打印日志,这里不再判断返回值 + + +if __name__ == "__main__": + logging.basicConfig( + level=logging.INFO, + format="%(asctime)s [%(levelname)s] %(name)s: %(message)s", + ) + + # 1. 创建 Station(里面会自己 new SharedRS485Bus、XYZ 和 Pipette) + st = Station(port="/dev/ttyUSB1", baudrate=115200) + + # 2. 连接 Station(打开总线,初始化 XYZ 和 移液枪) + st.connect() + + try: + # 3. 洗液(例如 200 uL 洗 3 次) + wash_pipette(st, wash_volume_ul=200.0, cycles=3, delay_s=0.5) + + # 4. 排空(例如再排 500 uL,把残液尽量排出) + purge_liquid(st, purge_volume_ul=500.0, delay_s=0.5) + + # 5. 弹枪头 + eject_tip(st) + + finally: + # 6. 断开 Station(关闭总线) + st.disconnect() \ No newline at end of file diff --git a/unilabos/devices/SII/laiyu_xyz_pipette/demo_points.py b/unilabos/devices/SII/laiyu_xyz_pipette/demo_points.py new file mode 100644 index 00000000..24c06dad --- /dev/null +++ b/unilabos/devices/SII/laiyu_xyz_pipette/demo_points.py @@ -0,0 +1,35 @@ +# demo_points.py +import logging +import time + +from station_simple import Station + +if __name__ == "__main__": + logging.basicConfig( + level=logging.INFO, + format="%(asctime)s [%(levelname)s] %(name)s: %(message)s", + ) + + st = Station( + port="/dev/ttyUSB1", + baudrate=115200, + points_file="points.json", # 如果不在同目录,改为实际路径 + ) + st.connect() + + try: + # 移动到 C1 原始高度 + st.move_to_point("C1") + + time.sleep(1.0) + + # 在 C2 上方 10 mm + st.move_to_point("C2", z_offset=+10.0) + + time.sleep(1.0) + + # 在 C3 点下插 2 mm + st.move_to_point("C3", z_offset=-2.0) + + finally: + st.disconnect() \ No newline at end of file diff --git a/unilabos/devices/SII/laiyu_xyz_pipette/demo_xyz.py b/unilabos/devices/SII/laiyu_xyz_pipette/demo_xyz.py new file mode 100644 index 00000000..38535560 --- /dev/null +++ b/unilabos/devices/SII/laiyu_xyz_pipette/demo_xyz.py @@ -0,0 +1,24 @@ +# demo.py +import logging +import time + +from .station_simple import Station + +if __name__ == "__main__": + logging.basicConfig( + level=logging.INFO, + format="%(asctime)s [%(levelname)s] %(name)s: %(message)s", + ) + + st = Station(port="/dev/ttyUSB0", baudrate=115200) + st.connect() + + # 先移动到工件坐标 (x=100, y=100, z=0) + st.move_xyz(100.0, 100.0, 0.0, speed=500, acc=1000) + time.sleep(1.0) + + # 再回到工件原点 (x=0, y=0, z=0) + st.move_xyz(0.0, 0.0, 0.0, speed=500, acc=1000) + time.sleep(1.0) + + st.disconnect() \ No newline at end of file diff --git a/unilabos/devices/SII/laiyu_xyz_pipette/drivers/SOPAPipetteYYQ.py b/unilabos/devices/SII/laiyu_xyz_pipette/drivers/SOPAPipetteYYQ.py new file mode 100644 index 00000000..50659669 --- /dev/null +++ b/unilabos/devices/SII/laiyu_xyz_pipette/drivers/SOPAPipetteYYQ.py @@ -0,0 +1,134 @@ +# pipette.py +import time +import logging +from dataclasses import dataclass + +from .SharedRS485Bus import SharedRS485Bus + +logger = logging.getLogger("liquid_station.pipette_driver") + + +@dataclass +class SOPAConfig: + address: int = 4 # 固定设备地址 + timeout: float = 2.0 + + +class SOPAPipetteYYQ: + """SOPA 移液枪 YYQ 驱动(共享 RS485 总线,文本协议)。""" + + def __init__(self, bus: SharedRS485Bus, config: SOPAConfig = SOPAConfig()): + self.bus = bus + self.config = config + + def _send_and_read(self, cmd: str, delay_before_read: float = 0.2) -> str: + """ + 在一把锁内发送一条命令并读取响应,保证事务原子。 + cmd: 不含地址/结束符的正文,如 "HE" / "P100" + """ + address = str(self.config.address) + full_cmd = f"/{address}{cmd}E".encode("ascii") + checksum = bytes([sum(full_cmd) & 0xFF]) + payload = full_cmd + checksum + + with self.bus.lock: + self.bus.reset_input() + self.bus.write(payload) + logger.debug("[YYQ] TX: %r", payload) + time.sleep(delay_before_read) + data = b"" + if self.bus.serial and self.bus.serial.in_waiting: + data = self.bus.serial.read_all() + txt = data.decode(errors="ignore") + if txt: + logger.debug("[YYQ] RX: %r", txt) + return txt + + def initialize(self) -> bool: + try: + logger.info("初始化移液枪中...") + self._send_and_read("HE") + time.sleep(10) + self.is_initialized = True + logger.info("移液枪初始化完成") + return True + except Exception as e: + logger.error("初始化失败: %s", e) + return False + + def eject_tip(self) -> bool: + try: + self._send_and_read("RE") + time.sleep(1) + logger.info("枪头已弹出") + return True + except Exception as e: + logger.error("弹出枪头失败: %s", e) + return False + + def aspirate(self, volume_uL: float): + try: + vol = int(volume_uL) + logger.info("吸液 %d µL...", vol) + self._send_and_read(f"P{vol}") + time.sleep(max(0.2, vol / 200.0)) + logger.info("吸液完成") + except Exception as e: + logger.error("吸液失败: %s", e) + + def dispense(self, volume_uL: float): + try: + vol = int(volume_uL) + logger.info("排液 %d µL...", vol) + self._send_and_read(f"D{vol}") + time.sleep(max(0.2, vol / 200.0)) + logger.info("排液完成") + except Exception as e: + logger.error("排液失败: %s", e) + + def set_max_speed(self, speed: int): + try: + self._send_and_read(f"s{speed}") + time.sleep(1) + logger.info("设置最高速度完成") + except Exception as e: + logger.error("设置最高速度失败: %s", e) + + def set_start_speed(self, speed: int): + try: + self._send_and_read(f"b{speed}") + time.sleep(1) + logger.info("设置启动速度完成") + except Exception as e: + logger.error("设置启动速度失败: %s", e) + + def set_cutoff_speed(self, speed: int): + try: + self._send_and_read(f"c{speed}") + time.sleep(1) + logger.info("设置断流速度完成") + except Exception as e: + logger.error("设置断流速度失败: %s", e) + + def set_acceleration(self, accel: int): + try: + self._send_and_read(f"a{accel}") + time.sleep(1) + logger.info("设置加速度完成") + except Exception as e: + logger.error("设置加速度失败: %s", e) + + def get_status(self) -> str: + return self._send_and_read("Q", delay_before_read=0.1) + + def get_tip_status(self) -> bool: + """ + True: 有枪头;False: 无枪头或通信失败。 + """ + resp = self._send_and_read("Q28", delay_before_read=0.1) + if resp: + if "T1" in resp: + return True + if "T0" in resp: + return False + return False \ No newline at end of file diff --git a/unilabos/devices/SII/laiyu_xyz_pipette/drivers/SharedRS485Bus.py b/unilabos/devices/SII/laiyu_xyz_pipette/drivers/SharedRS485Bus.py new file mode 100644 index 00000000..3e06e597 --- /dev/null +++ b/unilabos/devices/SII/laiyu_xyz_pipette/drivers/SharedRS485Bus.py @@ -0,0 +1,81 @@ +# bus_rs485.py +import threading +import time +try: + import serial +except Exception as e: + raise RuntimeError("Please install pyserial: pip install pyserial") from e +import logging + +logger = logging.getLogger("liquid_station.rs485") + + +class SharedRS485Bus: + """ + 所有设备(XYZ 轴 + 移液枪)使用同一个 485 接口, + 加入线程锁保证同一时刻只有一个请求在进行。 + """ + + def __init__(self, port: str = "COM3", baudrate: int = 115200, timeout: float = 0.2): + self.port = port + self.baudrate = baudrate + self.timeout = timeout + self.serial: serial.Serial | None = None + self.lock = threading.Lock() + + def open(self) -> bool: + """开启串口,8N1,无校验。""" + if self.serial and self.serial.is_open: + return True + self.serial = serial.Serial( + port=self.port, + baudrate=self.baudrate, + bytesize=serial.EIGHTBITS, + parity=serial.PARITY_NONE, + stopbits=serial.STOPBITS_ONE, + timeout=self.timeout, + ) + logger.info(f"Opened RS485 bus on {self.port}") + return True + + def close(self): + if self.serial and self.serial.is_open: + self.serial.close() + logger.info("Closed RS485 bus") + + def reset_input(self): + """清空接收缓冲。""" + if self.serial: + self.serial.reset_input_buffer() + + def write(self, data: bytes): + """发送数据。""" + if not self.serial or not self.serial.is_open: + raise RuntimeError("RS485 bus not open") + self.serial.write(data) + + def read(self, n: int = 256) -> bytes: + """读取最多 n 字节(受 timeout 影响)。""" + if not self.serial or not self.serial.is_open: + raise RuntimeError("RS485 bus not open") + return self.serial.read(n) + + def read_exact(self, n: int, overall_timeout: float = 0.3) -> bytes: + """ + 精确读取 n 字节,带整体超时。 + 超时返回已读到的内容(可能 < n)。 + """ + if n <= 0: + return b"" + buf = b"" + deadline = time.time() + overall_timeout + while len(buf) < n: + if time.time() > deadline: + break + need = n - len(buf) + chunk = self.read(need) + if chunk: + buf += chunk + else: + time.sleep(0.001) + return buf \ No newline at end of file diff --git a/unilabos/devices/SII/laiyu_xyz_pipette/drivers/XYZModbus.py b/unilabos/devices/SII/laiyu_xyz_pipette/drivers/XYZModbus.py new file mode 100644 index 00000000..9d5dde5a --- /dev/null +++ b/unilabos/devices/SII/laiyu_xyz_pipette/drivers/XYZModbus.py @@ -0,0 +1,230 @@ +import time +import logging +from dataclasses import dataclass +from enum import Enum +from typing import List + +from .SharedRS485Bus import SharedRS485Bus + +logger = logging.getLogger("liquid_station.xyz_modbus") + + +class ModbusException(Exception): + pass + + +class MotorStatus(Enum): + STANDBY = 0x0000 + RUNNING = 0x0001 + COLLISION_STOP = 0x0002 + FORWARD_LIMIT_STOP = 0x0003 + REVERSE_LIMIT_STOP = 0x0004 + + +@dataclass +class MotorPosition: + steps: int + speed: int + current: int + status: MotorStatus + + +class XYZModbus: + """ + 基于 SharedRS485Bus 的 Modbus RTU 客户端。 + """ + + def __init__(self, bus: SharedRS485Bus, ignore_crc_error: bool = False): + self.bus = bus + self.ignore_crc_error = ignore_crc_error + + def set_ignore_crc(self, flag: bool): + self.ignore_crc_error = bool(flag) + + @staticmethod + def _crc16(data: bytes) -> bytes: + """ + 使用你原始代码那套 CRC 算法,但这里直接返回2字节小端形式。 + """ + crc = 0xFFFF + for b in data: + crc ^= b + for _ in range(8): + if crc & 0x0001: + crc >>= 1 + crc ^= 0xA001 + else: + crc >>= 1 + return crc.to_bytes(2, "little") + + def _xfer( + self, + slave: int, + payload: bytes, + retries: int = 3, + delay_before_read: float = 0.02, + ) -> bytes: + """ + 发送一帧 Modbus 请求并接收响应。 + 采用“先读一整块,再在其中寻找匹配帧 + CRC 校验”的策略。 + 如果在所有重试中都没有找到合法帧,则抛出 ModbusException。 + """ + def hx(data: bytes) -> str: + return " ".join(f"{b:02X}" for b in data) + + req = bytes([slave]) + payload + frame = req + self._crc16(req) + fn_req = payload[0] + # logger.debug("TX slave=%d fn=0x%02X frame=%s", slave, fn_req, hx(frame)) + + last_error: Exception | None = None + + for attempt in range(1, retries + 1): + try: + with self.bus.lock: + if not self.bus.serial or not self.bus.serial.is_open: + raise ModbusException("RS485 bus not open") + + # 清空旧数据,发送新帧 + self.bus.reset_input() + self.bus.write(frame) + time.sleep(delay_before_read) + + # 一次性读一大块原始数据 + raw = self.bus.read(256) + if not raw: + raise ModbusException("No response") + + # logger.debug("RAW RX attempt %d/%d: %s", attempt, retries, hx(raw)) + + # 在 raw 里搜索对齐的帧头 + for i in range(0, len(raw) - 4): + if raw[i] != slave: + continue + fn = raw[i + 1] + + # 只接受期望功能码或其异常码 + if fn != fn_req and fn != (fn_req | 0x80): + continue + + # 根据功能码推断长度 + if fn == 0x03: + # 读多个寄存器: addr fn bc data... crc + if i + 3 >= len(raw): + continue + bc = raw[i + 2] + total_len = 3 + bc + 2 + elif fn in (0x06, 0x10): + # 写单/多寄存器:固定 8 字节响应 + total_len = 8 + else: + # 其他功能码,保守一些:至少 5 字节 + total_len = 5 + + if i + total_len > len(raw): + continue + + frame_candidate = raw[i : i + total_len] + + # CRC 校验 + frame_wo_crc = frame_candidate[:-2] + crc_recv = frame_candidate[-2:] + crc_calc = self._crc16(frame_wo_crc) + if crc_recv != crc_calc: + # CRC 不一致,跳过这个 candidate + continue + + # 异常响应 + if (fn & 0x80) != 0: + logger.warning("Modbus exception frame: %s", hx(frame_candidate)) + raise ModbusException("Modbus exception response") + + # logger.debug( + # "Parsed frame OK attempt %d/%d: %s", + # attempt, + # retries, + # hx(frame_candidate), + # ) + return frame_candidate + + # 如果这一轮没有找到任何合法帧 + raise ModbusException("No valid frame found in raw data") + + except Exception as e: + last_error = e + logger.warning( + "Modbus xfer attempt %d/%d failed (slave=%s fn=0x%02X): %s", + attempt, + retries, + slave, + fn_req, + e, + ) + # 下一轮重试 + continue + + # 所有重试都失败 + if last_error is not None: + raise ModbusException( + f"Modbus transfer failed after {retries} retries: {last_error}" + ) + else: + raise ModbusException(f"Modbus transfer failed after {retries} retries") + + def read_regs(self, slave: int, addr: int, count: int) -> List[int]: + fn = 0x03 + payload = bytes([fn]) + addr.to_bytes(2, "big") + count.to_bytes(2, "big") + resp = self._xfer(slave, payload) + # resp: [addr, fn, byte_count, data..., crc_lo, crc_hi] + if len(resp) < 5: + raise ModbusException(f"read_regs: response too short: {resp!r}") + + byte_count = resp[2] + if len(resp) != 3 + byte_count + 2: + raise ModbusException( + f"read_regs: byte_count mismatch: byte_count={byte_count}, resp_len={len(resp)}" + ) + + vals: List[int] = [] + for i in range(0, byte_count, 2): + hi = resp[3 + i] + lo = resp[3 + i + 1] + vals.append((hi << 8) | lo) + return vals + + def write_reg(self, slave: int, addr: int, val: int) -> bool: + fn = 0x06 + payload = bytes([fn]) + addr.to_bytes(2, "big") + val.to_bytes(2, "big") + try: + resp = self._xfer(slave, payload) + except Exception as e: + logger.warning("write_reg error (slave=%s, addr=%s, val=%s): %s", + slave, addr, val, e) + return False + # 正常响应: addr fn hi(lo_addr) lo(lo_addr) hi(val) lo(val) crc_lo crc_hi + return len(resp) >= 6 and resp[1] == fn + + def write_regs(self, slave: int, start: int, values: List[int]) -> bool: + fn = 0x10 + bc = len(values) * 2 + payload = ( + bytes([fn]) + + start.to_bytes(2, "big") + + len(values).to_bytes(2, "big") + + bytes([bc]) + ) + for v in values: + payload += v.to_bytes(2, "big") + try: + resp = self._xfer(slave, payload) + except Exception as e: + logger.warning( + "write_regs error (slave=%s, start=%s, len=%s): %s", + slave, + start, + len(values), + e, + ) + return False + # 正常响应: addr fn start_hi start_lo qty_hi qty_lo crc_lo crc_hi + return len(resp) >= 6 and resp[1] == fn \ No newline at end of file diff --git a/unilabos/devices/SII/laiyu_xyz_pipette/drivers/__init__.py b/unilabos/devices/SII/laiyu_xyz_pipette/drivers/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/unilabos/devices/SII/laiyu_xyz_pipette/points.json b/unilabos/devices/SII/laiyu_xyz_pipette/points.json new file mode 100644 index 00000000..51321376 --- /dev/null +++ b/unilabos/devices/SII/laiyu_xyz_pipette/points.json @@ -0,0 +1,7 @@ +{ + "G1": { + "x": 176.0, + "y": 118.0, + "z": 134.0 + } +} \ No newline at end of file diff --git a/unilabos/devices/SII/laiyu_xyz_pipette/print_xyz_steps.py b/unilabos/devices/SII/laiyu_xyz_pipette/print_xyz_steps.py new file mode 100644 index 00000000..2f355c04 --- /dev/null +++ b/unilabos/devices/SII/laiyu_xyz_pipette/print_xyz_steps.py @@ -0,0 +1,19 @@ +from laiyu_xyz_pipette.XYZStepperController import XYZStepperController +from laiyu_xyz_pipette.drivers.SharedRS485Bus import SharedRS485Bus + +if __name__ == "__main__": + # 根据你实际串口和波特率修改 + bus = SharedRS485Bus(port="/dev/ttyUSB0", baudrate=115200) + + ctrl = XYZStepperController( + bus, + origin_path="unilabos/devices/laiyu_xyz_pipette/work_origin.json", + ) + + x_steps, *_ = ctrl.get_status("X") + y_steps, *_ = ctrl.get_status("Y") + z_steps, *_ = ctrl.get_status("Z") + + print(f"X steps: {x_steps}") + print(f"Y steps: {y_steps}") + print(f"Z steps: {z_steps}") \ No newline at end of file diff --git a/unilabos/devices/SII/laiyu_xyz_pipette/station.py b/unilabos/devices/SII/laiyu_xyz_pipette/station.py new file mode 100644 index 00000000..2cbf430d --- /dev/null +++ b/unilabos/devices/SII/laiyu_xyz_pipette/station.py @@ -0,0 +1,235 @@ +# station_simple.py +import logging +import json +import os +from typing import Optional + +from .drivers.SharedRS485Bus import SharedRS485Bus +from .XYZStepperController import XYZStepperController +from .drivers.SOPAPipetteYYQ import SOPAPipetteYYQ + +logger = logging.getLogger("liquid_station.simple_station") + + +class Station: + """ + 简化版 Station: + - 共享 RS485 总线 + - XYZ 三轴控制器 (XYZStepperController) + - SOPA 移液枪 (SOPAPipetteYYQ) + """ + + def __init__( + self, + port: str = "/dev/ttyUSB0", + baudrate: int = 115200, + points_file: str = "unilabos/devices/SII/laiyu_xyz_pipette/points.json", + origin_file: str = "unilabos/devices/SII/laiyu_xyz_pipette/work_origin.json", + ): + self.port = port + self.baudrate = baudrate + + self.bus: SharedRS485Bus = SharedRS485Bus(port=self.port, baudrate=self.baudrate) + self.xyz: Optional[XYZStepperController] = None + self.pip: Optional[SOPAPipetteYYQ] = None + + self.points_file = points_file + self.points: dict[str, dict[str, float]] = {} + self._load_points() + + self.origin_file = origin_file + + # 新增:记录最近一次 move_xyz 的目标工件坐标 + # 注意:这里的 x/y/z 是“工件坐标系的 mm”,和 move_xyz / move_to_point 使用的坐标体系一致 + self._last_target_work: dict[str, float] = {"x": 0.0, "y": 0.0, "z": 0.0} + + # ===== 点位加载 ===== + def _load_points(self): + """从 JSON 文件加载预定义点位""" + if not os.path.exists(self.points_file): + logger.warning("点位文件不存在: %s", self.points_file) + self.points = {} + return + + try: + with open(self.points_file, "r", encoding="utf-8") as f: + data = json.load(f) + except Exception as e: + logger.error("加载点位文件失败 %s: %s", self.points_file, e) + self.points = {} + return + + # 简单校验一下结构 + ok_points: dict[str, dict[str, float]] = {} + for name, p in data.items(): + try: + x = float(p["x"]) + y = float(p["y"]) + z = float(p["z"]) + ok_points[name] = {"x": x, "y": y, "z": z} + except Exception: + logger.warning("点位 %s 格式不正确,已忽略: %s", name, p) + + self.points = ok_points + logger.info("已加载点位 %d 个", len(self.points)) + + # ===== 连接与断开 ===== + def connect(self): + logger.info("Connecting station on %s ...", self.port) + self.bus.open() + # 把 origin_file 传进去,这样 XYZStepperController 会用这个路径加载/保存软零点 + self.xyz = XYZStepperController(self.bus, origin_path=self.origin_file) + self.pip = SOPAPipetteYYQ(self.bus) + logger.info("Station connected.") + + def disconnect(self): + self.bus.close() + + def _ensure_connected(self): + """如果尚未连接设备,则自动连接""" + if self.xyz is None or self.pip is None: + self.connect() + + # ===== 对外简单 API ===== + def move_xyz( + self, + x: float | None, + y: float | None, + z: float | None, + speed: int = 50, + acc: int = 60, + ): + self._ensure_connected() + + def _to_float_or_none(v): + if v is None: + return None + return float(v) + + x_f = _to_float_or_none(x) + y_f = _to_float_or_none(y) + z_f = _to_float_or_none(z) + + # 记录这次的目标工件坐标(None 时保持上一次的值) + # 这样 save_current_position_as_point 可以“保存最后一次 move_xyz 的目标” + if x_f is not None: + self._last_target_work["x"] = x_f + if y_f is not None: + self._last_target_work["y"] = y_f + if z_f is not None: + self._last_target_work["z"] = z_f + + # 仍然调用底层的工件坐标运动函数 + self.xyz.move_xyz_work(x_f, y_f, z_f, speed, acc) + + def aspirate(self, volume_uL: float): + self._ensure_connected() + if not self.pip: + raise RuntimeError("Pipette not initialized") + self.pip.aspirate(volume_uL) + + def dispense(self, volume_uL: float): + self._ensure_connected() + if not self.pip: + raise RuntimeError("Pipette not initialized") + self.pip.dispense(volume_uL) + + def eject_tip(self): + self._ensure_connected() + if not self.pip: + raise RuntimeError("Pipette not initialized") + self.pip.eject_tip() + + # ===== 新增:按点位名移动 ===== + def move_to_point( + self, + name: str, + speed: int = 50, + acc: int = 60, + z_offset: float = 0.0, + ): + """ + 根据点位名,从 self.points 中取出工件坐标 (x, y, z), + 然后直接调用 move_xyz 来完成运动并更新 _last_target_work。 + """ + self._ensure_connected() + + if name not in self.points: + raise ValueError(f"未知点位: {name},已加载点位数量={len(self.points)}") + + p = self.points[name] + x = float(p["x"]) + y = float(p["y"]) + z = float(p["z"]) + float(z_offset) + + logger.info( + "Move to point %s: x=%.3f, y=%.3f, z=%.3f (z_offset=%.3f)", + name, x, y, z, z_offset, + ) + # 关键:改为通过 move_xyz,而不是直接调 xyz.move_xyz_work + self.move_xyz(x, y, z, speed, acc) + + def save_point(self, name: str, x: float, y: float, z: float) -> None: + """ + 将一个点位保存/更新到 points_file 对应的 JSON 文件,并更新内存中的 self.points。 + + 这里假定 x, y, z 已经是“工件坐标系下的目标坐标(单位 mm)”, + 即直接保存 move_xyz/move_to_point 所用的坐标。 + """ + x = float(x) + y = float(y) + z = float(z) + + if self.points is None: + self.points = {} + self.points[name] = {"x": x, "y": y, "z": z} + + points_dir = os.path.dirname(self.points_file) + if points_dir and not os.path.exists(points_dir): + os.makedirs(points_dir, exist_ok=True) + + try: + with open(self.points_file, "w", encoding="utf-8") as f: + json.dump(self.points, f, indent=4, ensure_ascii=False) + logger.info( + "点位已保存到 %s: %s -> x=%.6f, y=%.6f, z=%.6f", + self.points_file, name, x, y, z, + ) + except Exception as e: + logger.error("保存点位到文件失败 %s: %s", self.points_file, e) + raise + + def save_current_position_as_point(self, name: str) -> None: + """ + 不再从电机读“当前位置”,而是把最近一次 move_xyz / move_to_point + 的目标工件坐标 (x, y, z) 直接保存为点位。 + + 这样即使某个轴通讯不正常,也能保证点位文件里的坐标和 + 上层调用 move_xyz 时的目标坐标一致。 + """ + self._ensure_connected() + + x_work = float(self._last_target_work.get("x", 0.0)) + y_work = float(self._last_target_work.get("y", 0.0)) + z_work = float(self._last_target_work.get("z", 0.0)) + + self.save_point(name, x_work, y_work, z_work) + logger.info( + "最近一次目标坐标已保存为点位 %s: x=%.6f, y=%.6f, z=%.6f (工件坐标 mm)", + name, x_work, y_work, z_work, + ) + + # ===== 新增:重置工件软零点 ===== + def define_current_as_zero(self): + """ + 以当前 XYZ 三轴的位置为新的工件软零点,并写入 origin_file 指定的 JSON。 + + 等价于直接调用 XYZStepperController.define_current_as_zero(origin_file)。 + """ + self._ensure_connected() + if not self.xyz: + raise RuntimeError("XYZ controller not initialized") + + logger.info("正在以当前步数重置工件软零点到: %s", self.origin_file) + self.xyz.define_current_as_zero(save_path=self.origin_file) + logger.info("软零点重置完成: %s", self.xyz.work_origin_steps) \ No newline at end of file diff --git a/unilabos/devices/SII/laiyu_xyz_pipette/test_save_points_x.py b/unilabos/devices/SII/laiyu_xyz_pipette/test_save_points_x.py new file mode 100644 index 00000000..7304f3dc --- /dev/null +++ b/unilabos/devices/SII/laiyu_xyz_pipette/test_save_points_x.py @@ -0,0 +1,81 @@ +# laiyu_xyz_pipette/test_save_points_x.py +import os +import json +import time + +from laiyu_xyz_pipette.station_simple import Station + + +def main(): + base_dir = os.path.dirname(os.path.abspath(__file__)) + points_path = os.path.join(base_dir, "points.json") + origin_path = os.path.join(base_dir, "work_origin.json") + + print("Using points_file:", points_path) + print("Using origin_file:", origin_path) + + # 初始化 Station(如端口不同,请修改 port) + st = Station( + port="/dev/ttyUSB0", + baudrate=115200, + points_file=points_path, + origin_file=origin_path, + ) + + st.connect() + + try: + # 打印一下当前 X 状态,看看能不能正常读到 + if st.xyz: + print("\nCurrent X status:", st.xyz.get_status("X")) + + # 1) 移动到 x = 50,y/z 先用 0(你可以按需要改) + print("\n=== Step 1: move to x = 50, save as x_test1 ===") + st.move_xyz(x=50, y=0, z=0, speed=500, acc=6000) + time.sleep(1.0) + st.save_current_position_as_point("x_test1") + + # 2) 移动到 x = 100,保存为 x_test2 + print("\n=== Step 2: move to x = 100, save as x_test2 ===") + st.move_xyz(x=100, y=0, z=0, speed=500, acc=6000) + time.sleep(1.0) + st.save_current_position_as_point("x_test2") + + # 再打印一次 X 状态 + if st.xyz: + print("\nAfter moves, X status:", st.xyz.get_status("X")) + + finally: + st.disconnect() + + # 3) 读取 points.json,对比 x_test1 / x_test2 的 x + print("\n=== Step 3: load points.json and compare x ===") + if not os.path.exists(points_path): + print("points.json 不存在:", points_path) + return + + with open(points_path, "r", encoding="utf-8") as f: + data = json.load(f) + + p1 = data.get("x_test1") + p2 = data.get("x_test2") + + print("x_test1 point:", p1) + print("x_test2 point:", p2) + + if p1 is None or p2 is None: + print("x_test1 或 x_test2 未找到,请检查 save_current_position_as_point 是否被正常调用。") + return + + x1 = float(p1["x"]) + x2 = float(p2["x"]) + + print(f"\nSaved x values: x_test1.x = {x1}, x_test2.x = {x2}") + if x1 == x2: + print("WARNING: x_test1.x 和 x_test2.x 完全相同,说明 X 轴位置可能没有随实际运动变化,需要检查通信/控制逻辑。") + else: + print("OK: x_test1.x 和 x_test2.x 不同,说明 X 轴记录是随位置变化的。") + + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/unilabos/devices/SII/laiyu_xyz_pipette/test_save_points_y.py b/unilabos/devices/SII/laiyu_xyz_pipette/test_save_points_y.py new file mode 100644 index 00000000..498d6ca6 --- /dev/null +++ b/unilabos/devices/SII/laiyu_xyz_pipette/test_save_points_y.py @@ -0,0 +1,81 @@ +import os +import json +import time + +from laiyu_xyz_pipette.station_simple import Station + + +def main(): + # 根据你的 device yaml 默认值,points_file / origin_file 都是相对路径 + # 这里用 __file__ 做一个相对路径到当前包目录的映射 + base_dir = os.path.dirname(os.path.abspath(__file__)) + points_path = os.path.join(base_dir, "points.json") + origin_path = os.path.join(base_dir, "work_origin.json") + + print("Using points_file:", points_path) + print("Using origin_file:", origin_path) + + # 初始化 Station + st = Station( + port="/dev/ttyUSB0", # 如有不同串口自行修改 + baudrate=115200, + points_file=points_path, + origin_file=origin_path, + ) + + # 连接设备 + st.connect() + + try: + # 可选:先确保已经有软零点;若不放心,可以先手动跑一次 define_current_as_zero + # 这里不自动设零点,避免影响你现有标定 + + # 1) 移动到 y = 50,x/z 先保持当前 + # 为了简单,这里把 x/z 都设为 0(工件坐标),你根据需求可以改 + print("\n=== Step 1: move to y = 50, save as test1 ===") + st.move_xyz(x=0, y=50, z=0, speed=500, acc=6000) + # 稍微等一下电机停止(保险起见) + time.sleep(1.0) + st.save_current_position_as_point("test1") + + # 2) 移动到 y = 100,保存为 test2 + print("\n=== Step 2: move to y = 100, save as test2 ===") + st.move_xyz(x=0, y=100, z=0, speed=500, acc=6000) + time.sleep(1.0) + st.save_current_position_as_point("test2") + + finally: + # 断开设备 + st.disconnect() + + # 3) 读取 points.json,打印 test1/test2 的 y 值 + print("\n=== Step 3: load points.json and compare y ===") + if not os.path.exists(points_path): + print("points.json 不存在:", points_path) + return + + with open(points_path, "r", encoding="utf-8") as f: + data = json.load(f) + + p1 = data.get("test1") + p2 = data.get("test2") + + print("test1 point:", p1) + print("test2 point:", p2) + + if p1 is None or p2 is None: + print("test1 或 test2 未找到,请检查 save_current_position_as_point 是否被正常调用。") + return + + y1 = float(p1["y"]) + y2 = float(p2["y"]) + + print(f"\nSaved y values: test1.y = {y1}, test2.y = {y2}") + if y1 == y2: + print("WARNING: test1.y 和 test2.y 完全相同,说明 y 可能没有随实际位置变化,需检查 save_current_position_as_point 逻辑。") + else: + print("OK: test1.y 和 test2.y 不同,说明 y 记录是随位置变化的。") + + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/unilabos/devices/SII/laiyu_xyz_pipette/test_scan_slaves.py b/unilabos/devices/SII/laiyu_xyz_pipette/test_scan_slaves.py new file mode 100644 index 00000000..de712a2c --- /dev/null +++ b/unilabos/devices/SII/laiyu_xyz_pipette/test_scan_slaves.py @@ -0,0 +1,21 @@ +import time +from laiyu_xyz_pipette.drivers.SharedRS485Bus import SharedRS485Bus +from laiyu_xyz_pipette.drivers.XYZModbus import XYZModbus, ModbusException # 按你的模块名改 + +def try_read(slave): + modbus = XYZModbus(bus) + try: + vals = modbus.read_regs(slave, 0x00, 6) + print(f"slave={slave}: OK, vals={vals}") + except Exception as e: + print(f"slave={slave}: ERROR -> {e}") + +if __name__ == "__main__": + bus = SharedRS485Bus(port="/dev/ttyUSB0", baudrate=115200) # 按实际修改 + bus.open() + try: + for sid in range(1, 10): + try_read(sid) + time.sleep(0.5) + finally: + bus.close() \ No newline at end of file diff --git a/unilabos/devices/SII/laiyu_xyz_pipette/work_origin.json b/unilabos/devices/SII/laiyu_xyz_pipette/work_origin.json new file mode 100644 index 00000000..935c3e3b --- /dev/null +++ b/unilabos/devices/SII/laiyu_xyz_pipette/work_origin.json @@ -0,0 +1,8 @@ +{ + "work_origin_steps": { + "x": 11799, + "y": 11476, + "z": 3312 + }, + "timestamp": "2025-11-04T15:31:09.802155" +} \ No newline at end of file diff --git a/unilabos/registry/devices/SII.yaml b/unilabos/registry/devices/SII.yaml new file mode 100644 index 00000000..aa51e9a6 --- /dev/null +++ b/unilabos/registry/devices/SII.yaml @@ -0,0 +1,759 @@ +camera_usb: + category: + - camera + - SII + class: + action_value_mappings: + get_status: + feedback: {} + goal: {} + goal_default: {} + handles: {} + result: {} + schema: + description: 获取摄像头运行状态(running/websocket_connected/ffmpeg_running 等) + properties: + feedback: {} + goal: + properties: {} + required: [] + type: object + result: + properties: {} + required: [] + type: object + required: + - goal + title: get_status参数 + type: object + type: UniLabJsonCommand + start: + feedback: {} + goal: + config: config + goal_default: + config: {} + handles: {} + result: {} + schema: + description: 启动摄像头(内部启动 FFmpeg + WebSocket 信令循环) + properties: + feedback: {} + goal: + properties: + config: + type: object + required: + - config + type: object + result: + properties: {} + required: [] + type: object + required: + - goal + title: start参数 + type: object + type: UniLabJsonCommand + stop: + feedback: {} + goal: {} + goal_default: {} + handles: {} + result: {} + schema: + description: 停止摄像头(停止 FFmpeg,关闭 WebSocket,停止事件循环线程) + properties: + feedback: {} + goal: + properties: {} + required: [] + type: object + result: + properties: {} + required: [] + type: object + required: + - goal + title: stop参数 + type: object + type: UniLabJsonCommand + module: unilabos.devices.SII.cameraSII.cameraUSB:CameraController + status_types: + ffmpeg_running: bool + running: bool + status: dict + websocket_connected: bool + type: python + config_info: [] + description: Linux USB 摄像头驱动(WebSocket 信令 + FFmpeg 推流 + WebRTC 转发) + handles: [] + icon: '' + init_param_schema: + config: + properties: + audio_bitrate: + default: 64k + type: string + audio_device: + type: string + auto_start: + default: false + type: boolean + fps: + default: 30 + type: integer + height: + default: 720 + type: integer + host_id: + default: demo-host + type: string + rtmp_url: + default: rtmp://srs.sciol.ac.cn:4499/live/camera-01 + type: string + signal_backend_url: + default: wss://sciol.ac.cn/api/realtime/signal/host + type: string + video_bitrate: + default: 1500k + type: string + video_device: + default: /dev/video0 + type: string + webrtc_api: + default: https://srs.sciol.ac.cn/rtc/v1/play/ + type: string + webrtc_stream_url: + default: webrtc://srs.sciol.ac.cn:4500/live/camera-01 + type: string + width: + default: 1280 + type: integer + required: [] + type: object + data: + properties: + ffmpeg_running: + type: boolean + running: + type: boolean + status: + type: object + websocket_connected: + type: boolean + required: + - status + - running + - websocket_connected + - ffmpeg_running + type: object + version: 1.0.0 +cameracontroller_device: + category: + - cameraSII + - SII + class: + action_value_mappings: + auto-ptz_move_down: + feedback: {} + goal: {} + goal_default: + duration: 1.0 + speed: 0.5 + handles: {} + placeholder_keys: {} + result: {} + schema: + description: '' + properties: + feedback: {} + goal: + properties: + duration: + default: 1.0 + type: number + speed: + default: 0.5 + type: number + required: [] + type: object + result: {} + required: + - goal + title: ptz_move_down参数 + type: object + type: UniLabJsonCommand + auto-ptz_move_left: + feedback: {} + goal: {} + goal_default: + duration: 1.0 + speed: 0.2 + handles: {} + placeholder_keys: {} + result: {} + schema: + description: '' + properties: + feedback: {} + goal: + properties: + duration: + default: 1.0 + type: number + speed: + default: 0.2 + type: number + required: [] + type: object + result: {} + required: + - goal + title: ptz_move_left参数 + type: object + type: UniLabJsonCommand + auto-ptz_move_right: + feedback: {} + goal: {} + goal_default: + duration: 1.0 + speed: 0.2 + handles: {} + placeholder_keys: {} + result: {} + schema: + description: '' + properties: + feedback: {} + goal: + properties: + duration: + default: 1.0 + type: number + speed: + default: 0.2 + type: number + required: [] + type: object + result: {} + required: + - goal + title: ptz_move_right参数 + type: object + type: UniLabJsonCommand + auto-ptz_move_up: + feedback: {} + goal: {} + goal_default: + duration: 1.0 + speed: 0.5 + handles: {} + placeholder_keys: {} + result: {} + schema: + description: '' + properties: + feedback: {} + goal: + properties: + duration: + default: 1.0 + type: number + speed: + default: 0.5 + type: number + required: [] + type: object + result: {} + required: + - goal + title: ptz_move_up参数 + type: object + type: UniLabJsonCommand + auto-ptz_stop: + feedback: {} + goal: {} + goal_default: {} + handles: {} + placeholder_keys: {} + result: {} + schema: + description: '' + properties: + feedback: {} + goal: + properties: {} + required: [] + type: object + result: {} + required: + - goal + title: ptz_stop参数 + type: object + type: UniLabJsonCommand + auto-start: + feedback: {} + goal: {} + goal_default: + config: null + handles: {} + placeholder_keys: {} + result: {} + schema: + description: '' + properties: + feedback: {} + goal: + properties: + config: + type: string + required: [] + type: object + result: {} + required: + - goal + title: start参数 + type: object + type: UniLabJsonCommand + auto-stop: + feedback: {} + goal: {} + goal_default: {} + handles: {} + placeholder_keys: {} + result: {} + schema: + description: '' + properties: + feedback: {} + goal: + properties: {} + required: [] + type: object + result: {} + required: + - goal + title: stop参数 + type: object + type: UniLabJsonCommand + auto-zoom_in: + feedback: {} + goal: {} + goal_default: + duration: 1.0 + speed: 0.2 + handles: {} + placeholder_keys: {} + result: {} + schema: + description: '' + properties: + feedback: {} + goal: + properties: + duration: + default: 1.0 + type: number + speed: + default: 0.2 + type: number + required: [] + type: object + result: {} + required: + - goal + title: zoom_in参数 + type: object + type: UniLabJsonCommand + auto-zoom_out: + feedback: {} + goal: {} + goal_default: + duration: 1.0 + speed: 0.2 + handles: {} + placeholder_keys: {} + result: {} + schema: + description: '' + properties: + feedback: {} + goal: + properties: + duration: + default: 1.0 + type: number + speed: + default: 0.2 + type: number + required: [] + type: object + result: {} + required: + - goal + title: zoom_out参数 + type: object + type: UniLabJsonCommand + module: unilabos.devices.SII.cameraSII.cameraWifi:CameraController + status_types: + status: dict + type: python + config_info: [] + description: 'camera driver: display and actions' + handles: [] + icon: '' + init_param_schema: + config: + properties: + camera_rtsp_url: + default: '' + type: string + host_id: + default: demo-host + type: string + ptz_host: + default: '' + type: string + ptz_password: + default: '' + type: string + ptz_port: + default: 80 + type: integer + ptz_user: + default: '' + type: string + rtmp_url: + default: rtmp://srs.sciol.ac.cn:4499/live/camera-01 + type: string + signal_backend_url: + default: wss://sciol.ac.cn/api/realtime/signal/host + type: string + webrtc_api: + default: https://srs.sciol.ac.cn/rtc/v1/play/ + type: string + webrtc_stream_url: + default: webrtc://srs.sciol.ac.cn:4500/live/camera-01 + type: string + required: [] + type: object + data: + properties: + status: + type: object + required: + - status + type: object + registry_type: device + version: 1.0.0 +xyz_pipette_device: + category: + - laiyu_xyz_pipette + - SII + class: + action_value_mappings: + auto-aspirate: + feedback: {} + goal: {} + goal_default: + volume_uL: null + handles: {} + placeholder_keys: {} + result: {} + schema: + description: '' + properties: + feedback: {} + goal: + properties: + volume_uL: + type: number + required: + - volume_uL + type: object + result: {} + required: + - goal + title: aspirate参数 + type: object + type: UniLabJsonCommand + auto-connect: + feedback: {} + goal: {} + goal_default: {} + handles: {} + placeholder_keys: {} + result: {} + schema: + description: '' + properties: + feedback: {} + goal: + properties: {} + required: [] + type: object + result: {} + required: + - goal + title: connect参数 + type: object + type: UniLabJsonCommand + auto-define_current_as_zero: + feedback: {} + goal: {} + goal_default: {} + handles: {} + placeholder_keys: {} + result: {} + schema: + description: 以当前 XYZ 位置重置工件软零点 + properties: + feedback: {} + goal: + properties: {} + required: [] + type: object + result: {} + required: + - goal + title: define_current_as_zero参数 + type: object + type: UniLabJsonCommand + auto-disconnect: + feedback: {} + goal: {} + goal_default: {} + handles: {} + placeholder_keys: {} + result: {} + schema: + description: '' + properties: + feedback: {} + goal: + properties: {} + required: [] + type: object + result: {} + required: + - goal + title: disconnect参数 + type: object + type: UniLabJsonCommand + auto-dispense: + feedback: {} + goal: {} + goal_default: + volume_uL: null + handles: {} + placeholder_keys: {} + result: {} + schema: + description: '' + properties: + feedback: {} + goal: + properties: + volume_uL: + type: number + required: + - volume_uL + type: object + result: {} + required: + - goal + title: dispense参数 + type: object + type: UniLabJsonCommand + auto-eject_tip: + feedback: {} + goal: {} + goal_default: {} + handles: {} + placeholder_keys: {} + result: {} + schema: + description: '' + properties: + feedback: {} + goal: + properties: {} + required: [] + type: object + result: {} + required: + - goal + title: eject_tip参数 + type: object + type: UniLabJsonCommand + auto-move_to_point: + feedback: {} + goal: {} + goal_default: + acc: 60 + name: null + speed: 50 + z_offset: 0.0 + handles: {} + placeholder_keys: {} + result: {} + schema: + description: '' + properties: + feedback: {} + goal: + properties: + acc: + default: 60 + type: integer + name: + type: string + speed: + default: 50 + type: integer + z_offset: + default: 0.0 + type: number + required: + - name + type: object + result: {} + required: + - goal + title: move_to_point参数 + type: object + type: UniLabJsonCommand + auto-move_xyz: + feedback: {} + goal: {} + goal_default: + acc: 60 + speed: 50 + x: null + y: null + z: null + handles: {} + placeholder_keys: {} + result: {} + schema: + description: '' + properties: + feedback: {} + goal: + properties: + acc: + default: 60 + type: integer + speed: + default: 50 + type: integer + x: + type: string + y: + type: string + z: + type: string + required: + - x + - y + - z + type: object + result: {} + required: + - goal + title: move_xyz参数 + type: object + type: UniLabJsonCommand + auto-save_current_position_as_point: + feedback: {} + goal: {} + goal_default: + name: null + handles: {} + placeholder_keys: {} + result: {} + schema: + description: 使用最近一次目标坐标保存点位 + properties: + feedback: {} + goal: + properties: + name: + type: string + required: + - name + type: object + result: {} + required: + - goal + title: save_current_position_as_point参数 + type: object + type: UniLabJsonCommand + auto-save_point: + feedback: {} + goal: {} + goal_default: + name: null + x: null + y: null + z: null + handles: {} + placeholder_keys: {} + result: {} + schema: + description: 保存/更新一个点位到 points.json + properties: + feedback: {} + goal: + properties: + name: + type: string + x: + type: number + y: + type: number + z: + type: number + required: + - name + - x + - y + - z + type: object + result: {} + required: + - goal + title: save_point参数 + type: object + type: UniLabJsonCommand + module: unilabos.devices.SII.laiyu_xyz_pipette.station:Station + status_types: {} + type: python + config_info: [] + description: xyz_pipette_device + handles: [] + icon: '' + init_param_schema: + config: + properties: + baudrate: + default: 115200 + type: integer + origin_file: + default: unilabos/devices/SII/laiyu_xyz_pipette/work_origin.json + type: string + points_file: + default: unilabos/devices/SII/laiyu_xyz_pipette/points.json + type: string + port: + default: /dev/ttyUSB0 + type: string + required: [] + type: object + data: + properties: {} + required: [] + type: object + registry_type: device + version: 1.0.0 diff --git a/unilabos/registry/devices/characterization_chromatic.yaml b/unilabos/registry/devices/characterization_chromatic.yaml index f3059b58..37446bc3 100644 --- a/unilabos/registry/devices/characterization_chromatic.yaml +++ b/unilabos/registry/devices/characterization_chromatic.yaml @@ -1,231 +1,3 @@ -hplc.agilent: - category: - - characterization_chromatic - class: - action_value_mappings: - auto-check_status: - feedback: {} - goal: {} - goal_default: {} - handles: {} - placeholder_keys: {} - result: {} - schema: - description: 检查安捷伦HPLC设备状态的函数。用于监控设备的运行状态、连接状态、错误信息等关键指标。该函数定期查询设备状态,确保系统稳定运行,及时发现和报告设备异常。适用于自动化流程中的设备监控、故障诊断、系统维护等场景。 - properties: - feedback: {} - goal: - properties: {} - required: [] - type: object - result: {} - required: - - goal - title: check_status参数 - type: object - type: UniLabJsonCommand - auto-extract_data_from_txt: - feedback: {} - goal: {} - goal_default: - file_path: null - handles: {} - placeholder_keys: {} - result: {} - schema: - description: 从文本文件中提取分析数据的函数。用于解析安捷伦HPLC生成的结果文件,提取峰面积、保留时间、浓度等关键分析数据。支持多种文件格式的自动识别和数据结构化处理,为后续数据分析和报告生成提供标准化的数据格式。适用于批量数据处理、结果验证、质量控制等分析工作流程。 - properties: - feedback: {} - goal: - properties: - file_path: - type: string - required: - - file_path - type: object - result: {} - required: - - goal - title: extract_data_from_txt参数 - type: object - type: UniLabJsonCommand - auto-start_sequence: - feedback: {} - goal: {} - goal_default: - params: null - resource: null - wf_name: null - handles: {} - placeholder_keys: {} - result: {} - schema: - description: 启动安捷伦HPLC分析序列的函数。用于执行预定义的分析方法序列,包括样品进样、色谱分离、检测等完整的分析流程。支持参数配置、资源分配、工作流程管理等功能,实现全自动的样品分析。适用于批量样品处理、标准化分析、质量检测等需要连续自动分析的应用场景。 - properties: - feedback: {} - goal: - properties: - params: - type: string - resource: - type: object - wf_name: - type: string - required: - - wf_name - type: object - result: {} - required: - - goal - title: start_sequence参数 - type: object - type: UniLabJsonCommand - auto-try_close_sub_device: - feedback: {} - goal: {} - goal_default: - device_name: null - handles: {} - placeholder_keys: {} - result: {} - schema: - description: 尝试关闭HPLC子设备的函数。用于安全地关闭泵、检测器、进样器等各个子模块,确保设备正常断开连接并保护硬件安全。该函数提供错误处理和状态确认机制,避免强制关闭可能造成的设备损坏。适用于设备维护、系统重启、紧急停机等需要安全关闭设备的场景。 - properties: - feedback: {} - goal: - properties: - device_name: - type: string - required: [] - type: object - result: {} - required: - - goal - title: try_close_sub_device参数 - type: object - type: UniLabJsonCommand - auto-try_open_sub_device: - feedback: {} - goal: {} - goal_default: - device_name: null - handles: {} - placeholder_keys: {} - result: {} - schema: - description: 尝试打开HPLC子设备的函数。用于初始化和连接泵、检测器、进样器等各个子模块,建立设备通信并进行自检。该函数提供连接验证和错误恢复机制,确保子设备正常启动并准备就绪。适用于设备初始化、系统启动、设备重连等需要建立设备连接的场景。 - properties: - feedback: {} - goal: - properties: - device_name: - type: string - required: [] - type: object - result: {} - required: - - goal - title: try_open_sub_device参数 - type: object - type: UniLabJsonCommand - execute_command_from_outer: - feedback: {} - goal: - command: command - goal_default: - command: '' - handles: {} - result: - success: success - schema: - description: '' - properties: - feedback: - properties: - status: - type: string - required: - - status - title: SendCmd_Feedback - type: object - goal: - properties: - command: - type: string - required: - - command - title: SendCmd_Goal - type: object - result: - properties: - return_info: - type: string - success: - type: boolean - required: - - return_info - - success - title: SendCmd_Result - type: object - required: - - goal - title: SendCmd - type: object - type: SendCmd - module: unilabos.devices.hplc.AgilentHPLC:HPLCDriver - status_types: - could_run: bool - data_file: String - device_status: str - driver_init_ok: bool - finish_status: str - is_running: bool - status_text: str - success: bool - type: python - config_info: [] - description: 安捷伦高效液相色谱(HPLC)分析设备,用于复杂化合物的分离、检测和定量分析。该设备通过UI自动化技术控制安捷伦ChemStation软件,实现全自动的样品分析流程。具备序列启动、设备状态监控、数据文件提取、结果处理等功能。支持多样品批量处理和实时状态反馈,适用于药物分析、环境检测、食品安全、化学研究等需要高精度色谱分析的实验室应用。 - handles: [] - icon: '' - init_param_schema: - config: - properties: - driver_debug: - default: false - type: string - required: [] - type: object - data: - properties: - could_run: - type: boolean - data_file: - items: - type: string - type: array - device_status: - type: string - driver_init_ok: - type: boolean - finish_status: - type: string - is_running: - type: boolean - status_text: - type: string - success: - type: boolean - required: - - status_text - - device_status - - could_run - - driver_init_ok - - is_running - - success - - finish_status - - data_file - type: object - version: 1.0.0 hplc.agilent-zhida: category: - characterization_chromatic diff --git a/unilabos/registry/devices/characterization_optic.yaml b/unilabos/registry/devices/characterization_optic.yaml index 80dcf93d..0967ef42 100644 --- a/unilabos/registry/devices/characterization_optic.yaml +++ b/unilabos/registry/devices/characterization_optic.yaml @@ -1,194 +1 @@ -raman.home_made: - category: - - characterization_optic - class: - action_value_mappings: - auto-ccd_time: - feedback: {} - goal: {} - goal_default: - int_time: null - handles: {} - placeholder_keys: {} - result: {} - schema: - description: 设置CCD检测器积分时间的函数。用于配置拉曼光谱仪的信号采集时间,控制光谱数据的质量和信噪比。较长的积分时间可获得更高的信号强度和更好的光谱质量,但会增加测量时间。该函数允许根据样品特性和测量要求动态调整检测参数,优化测量效果。 - properties: - feedback: {} - goal: - properties: - int_time: - type: string - required: - - int_time - type: object - result: {} - required: - - goal - title: ccd_time参数 - type: object - type: UniLabJsonCommand - auto-laser_on_power: - feedback: {} - goal: {} - goal_default: - output_voltage_laser: null - handles: {} - placeholder_keys: {} - result: {} - schema: - description: 设置激光器输出功率的函数。用于控制拉曼光谱仪激光器的功率输出,调节激光强度以适应不同样品的测量需求。适当的激光功率能够获得良好的拉曼信号同时避免样品损伤。该函数支持精确的功率控制,确保测量结果的稳定性和重现性。 - properties: - feedback: {} - goal: - properties: - output_voltage_laser: - type: string - required: - - output_voltage_laser - type: object - result: {} - required: - - goal - title: laser_on_power参数 - type: object - type: UniLabJsonCommand - auto-raman_without_background: - feedback: {} - goal: {} - goal_default: - int_time: null - laser_power: null - handles: {} - placeholder_keys: {} - result: {} - schema: - description: 执行无背景扣除的拉曼光谱测量函数。用于直接采集样品的拉曼光谱信号,不进行背景校正处理。该函数配置积分时间和激光功率参数,获取原始光谱数据用于后续的数据处理分析。适用于对光谱数据质量要求较高或需要自定义背景处理流程的测量场景。 - properties: - feedback: {} - goal: - properties: - int_time: - type: string - laser_power: - type: string - required: - - int_time - - laser_power - type: object - result: {} - required: - - goal - title: raman_without_background参数 - type: object - type: UniLabJsonCommand - auto-raman_without_background_average: - feedback: {} - goal: {} - goal_default: - average: null - int_time: null - laser_power: null - sample_name: null - handles: {} - placeholder_keys: {} - result: {} - schema: - description: 执行多次平均的无背景拉曼光谱测量函数。通过多次测量取平均值来提高光谱数据的信噪比和测量精度,减少随机噪声影响。该函数支持自定义平均次数、积分时间、激光功率等参数,并可为样品指定名称便于数据管理。适用于对测量精度要求较高的定量分析和研究应用。 - properties: - feedback: {} - goal: - properties: - average: - type: string - int_time: - type: string - laser_power: - type: string - sample_name: - type: string - required: - - sample_name - - int_time - - laser_power - - average - type: object - result: {} - required: - - goal - title: raman_without_background_average参数 - type: object - type: UniLabJsonCommand - raman_cmd: - feedback: {} - goal: - command: command - goal_default: - command: '' - handles: {} - result: - success: success - schema: - description: '' - properties: - feedback: - properties: - status: - type: string - required: - - status - title: SendCmd_Feedback - type: object - goal: - properties: - command: - type: string - required: - - command - title: SendCmd_Goal - type: object - result: - properties: - return_info: - type: string - success: - type: boolean - required: - - return_info - - success - title: SendCmd_Result - type: object - required: - - goal - title: SendCmd - type: object - type: SendCmd - module: unilabos.devices.raman_uv.home_made_raman:RamanObj - status_types: {} - type: python - config_info: [] - description: 拉曼光谱分析设备,用于物质的分子结构和化学成分表征。该设备集成激光器和CCD检测器,通过串口通信控制激光功率和光谱采集。具备背景扣除、多次平均、自动数据处理等功能,支持高精度的拉曼光谱测量。适用于材料表征、化学分析、质量控制、研究开发等需要分子指纹识别和结构分析的实验应用。 - handles: [] - icon: '' - init_param_schema: - config: - properties: - baudrate_ccd: - default: 921600 - type: string - baudrate_laser: - default: 9600 - type: string - port_ccd: - type: string - port_laser: - type: string - required: - - port_laser - - port_ccd - type: object - data: - properties: {} - required: [] - type: object - version: 1.0.0 +{} diff --git a/unilabos/registry/devices/opcua_example.yaml b/unilabos/registry/devices/opcua_example.yaml index a7e6b4e3..0967ef42 100644 --- a/unilabos/registry/devices/opcua_example.yaml +++ b/unilabos/registry/devices/opcua_example.yaml @@ -1,176 +1 @@ -opcua_example: - category: - - opcua_example - class: - action_value_mappings: - auto-disconnect: - feedback: {} - goal: {} - goal_default: {} - handles: {} - placeholder_keys: {} - result: {} - schema: - description: '' - properties: - feedback: {} - goal: - properties: {} - required: [] - type: object - result: {} - required: - - goal - title: disconnect参数 - type: object - type: UniLabJsonCommand - auto-load_config: - feedback: {} - goal: {} - goal_default: - config_path: null - handles: {} - placeholder_keys: {} - result: {} - schema: - description: '' - properties: - feedback: {} - goal: - properties: - config_path: - type: string - required: - - config_path - type: object - result: {} - required: - - goal - title: load_config参数 - type: object - type: UniLabJsonCommand - auto-refresh_node_values: - feedback: {} - goal: {} - goal_default: {} - handles: {} - placeholder_keys: {} - result: {} - schema: - description: '' - properties: - feedback: {} - goal: - properties: {} - required: [] - type: object - result: {} - required: - - goal - title: refresh_node_values参数 - type: object - type: UniLabJsonCommand - auto-set_node_value: - feedback: {} - goal: {} - goal_default: - name: null - value: null - handles: {} - placeholder_keys: {} - result: {} - schema: - description: '' - properties: - feedback: {} - goal: - properties: - name: - type: string - value: - type: string - required: - - name - - value - type: object - result: {} - required: - - goal - title: set_node_value参数 - type: object - type: UniLabJsonCommand - auto-start_node_refresh: - feedback: {} - goal: {} - goal_default: {} - handles: {} - placeholder_keys: {} - result: {} - schema: - description: '' - properties: - feedback: {} - goal: - properties: {} - required: [] - type: object - result: {} - required: - - goal - title: start_node_refresh参数 - type: object - type: UniLabJsonCommand - auto-stop_node_refresh: - feedback: {} - goal: {} - goal_default: {} - handles: {} - placeholder_keys: {} - result: {} - schema: - description: '' - properties: - feedback: {} - goal: - properties: {} - required: [] - type: object - result: {} - required: - - goal - title: stop_node_refresh参数 - type: object - type: UniLabJsonCommand - module: unilabos.device_comms.opcua_client.client:OpcUaClient - status_types: - node_value: String - type: python - config_info: [] - description: null - handles: [] - icon: '' - init_param_schema: - config: - properties: - config_path: - type: string - password: - type: string - refresh_interval: - default: 1.0 - type: number - url: - type: string - username: - type: string - required: - - url - type: object - data: - properties: - node_value: - type: string - required: - - node_value - type: object - version: 1.0.0 +{} diff --git a/unilabos/registry/devices/robot_linear_motion.yaml b/unilabos/registry/devices/robot_linear_motion.yaml index 0f8506e9..7d9de362 100644 --- a/unilabos/registry/devices/robot_linear_motion.yaml +++ b/unilabos/registry/devices/robot_linear_motion.yaml @@ -834,174 +834,3 @@ linear_motion.toyo_xyz.sim: mesh: toyo_xyz type: device version: 1.0.0 -motor.iCL42: - category: - - robot_linear_motion - class: - action_value_mappings: - auto-execute_run_motor: - feedback: {} - goal: {} - goal_default: - mode: null - position: null - velocity: null - handles: {} - placeholder_keys: {} - result: {} - schema: - description: 步进电机执行运动函数。直接执行电机运动命令,包括位置设定、速度控制和路径规划。该函数处理底层的电机控制协议,消除警告信息,设置运动参数并启动电机运行。适用于需要直接控制电机运动的应用场景。 - properties: - feedback: {} - goal: - properties: - mode: - type: string - position: - type: number - velocity: - type: integer - required: - - mode - - position - - velocity - type: object - result: {} - required: - - goal - title: execute_run_motor参数 - type: object - type: UniLabJsonCommand - auto-init_device: - feedback: {} - goal: {} - goal_default: {} - handles: {} - placeholder_keys: {} - result: {} - schema: - description: iCL42电机设备初始化函数。建立与iCL42步进电机驱动器的串口通信连接,配置通信参数包括波特率、数据位、校验位等。该函数是电机使用前的必要步骤,确保驱动器处于可控状态并准备接收运动指令。 - properties: - feedback: {} - goal: - properties: {} - required: [] - type: object - result: {} - required: - - goal - title: init_device参数 - type: object - type: UniLabJsonCommand - auto-run_motor: - feedback: {} - goal: {} - goal_default: - mode: null - position: null - velocity: null - handles: {} - placeholder_keys: {} - result: {} - schema: - description: 步进电机运动控制函数。根据指定的运动模式、目标位置和速度参数控制电机运动。支持多种运动模式和精确的位置控制,自动处理运动轨迹规划和执行。该函数提供异步执行和状态反馈,确保运动的准确性和可靠性。 - properties: - feedback: {} - goal: - properties: - mode: - type: string - position: - type: number - velocity: - type: integer - required: - - mode - - position - - velocity - type: object - result: {} - required: - - goal - title: run_motor参数 - type: object - type: UniLabJsonCommand - execute_command_from_outer: - feedback: {} - goal: - command: command - goal_default: - command: '' - handles: {} - result: - success: success - schema: - description: '' - properties: - feedback: - properties: - status: - type: string - required: - - status - title: SendCmd_Feedback - type: object - goal: - properties: - command: - type: string - required: - - command - title: SendCmd_Goal - type: object - result: - properties: - return_info: - type: string - success: - type: boolean - required: - - return_info - - success - title: SendCmd_Result - type: object - required: - - goal - title: SendCmd - type: object - type: SendCmd - module: unilabos.devices.motor.iCL42:iCL42Driver - status_types: - is_executing_run: bool - motor_position: int - success: bool - type: python - config_info: [] - description: iCL42步进电机驱动器,用于实验室设备的精密线性运动控制。该设备通过串口通信控制iCL42型步进电机驱动器,支持多种运动模式和精确的位置、速度控制。具备位置反馈、运行状态监控和故障检测功能。适用于自动进样器、样品传送、精密定位平台等需要准确线性运动控制的实验室自动化设备。 - handles: [] - icon: '' - init_param_schema: - config: - properties: - device_address: - default: 1 - type: integer - device_com: - default: COM9 - type: string - required: [] - type: object - data: - properties: - is_executing_run: - type: boolean - motor_position: - type: integer - success: - type: boolean - required: - - motor_position - - is_executing_run - - success - type: object - version: 1.0.0