From 542c559918ff51842ab67e9898d38d97f7bcdead Mon Sep 17 00:00:00 2001 From: Om Gate Date: Wed, 24 Sep 2025 01:32:33 +0530 Subject: [PATCH 1/3] feat: emit messages to room --- backend/director/core/session.py | 21 ++++++++++++++------- 1 file changed, 14 insertions(+), 7 deletions(-) diff --git a/backend/director/core/session.py b/backend/director/core/session.py index cbcdaf24..6405caa6 100644 --- a/backend/director/core/session.py +++ b/backend/director/core/session.py @@ -220,28 +220,31 @@ class OutputMessage(BaseMessage): status: MsgStatus = MsgStatus.progress def update_status(self, status: MsgStatus): - """Update the status of the message and publish the message to the socket. for loading state.""" + """Update the status and broadcast to session room.""" self.status = status self._publish() def push_update(self): - """Publish the message to the socket.""" + """Push real-time update to session room.""" try: self._publish() except Exception as e: - print(f"Error in emitting message: {str(e)}") + print(f"Error in emitting update to session {self.session_id}: {str(e)}") def publish(self): - """Store the message in the database. for conversation history and publish the message to the socket.""" + """Store in database and broadcast final result to session room.""" self._publish() def _publish(self): try: - emit("chat", self.model_dump(), namespace="/chat") + emit("chat", self.model_dump(), + room=self.session_id, + namespace="/chat") + print(f"Emitted message to session room: {self.session_id}") except Exception as e: - print(f"Error in emitting message: {str(e)}") - self.db.add_or_update_msg_to_conv(**self.model_dump()) + print(f"Error emitting to session {self.session_id}: {str(e)}") + self.db.add_or_update_msg_to_conv(**self.model_dump()) def format_user_message(message: dict) -> dict: message_content = message.get("content") @@ -393,6 +396,10 @@ def delete(self): """Delete the session from the database.""" return self.db.delete_session(self.session_id) + def update(self, **kwargs) -> bool: + """Update the session in the database.""" + return self.db.update_session(self.session_id, **kwargs) + def emit_event(self, event: BaseEvent, namespace="/chat"): """Emits a structured WebSocket event to notify all clients about updates.""" From 6685f1dc9188482e73d0b0b797e692845420f0d4 Mon Sep 17 00:00:00 2001 From: Om Gate Date: Wed, 24 Sep 2025 01:34:19 +0530 Subject: [PATCH 2/3] feat: session room joining --- backend/director/entrypoint/api/socket_io.py | 34 ++++++++++++++++---- 1 file changed, 28 insertions(+), 6 deletions(-) diff --git a/backend/director/entrypoint/api/socket_io.py b/backend/director/entrypoint/api/socket_io.py index 857e3392..a60ca4aa 100644 --- a/backend/director/entrypoint/api/socket_io.py +++ b/backend/director/entrypoint/api/socket_io.py @@ -1,18 +1,40 @@ import os from flask import current_app as app -from flask_socketio import Namespace - +from flask_socketio import Namespace, join_room, emit from director.db import load_db from director.handler import ChatHandler - class ChatNamespace(Namespace): - """Chat namespace for socket.io""" - + """Chat namespace for socket.io with session-based rooms""" + + def on_connect(self): + """Handle client connection""" + print(f"Client connected to chat namespace") + def on_chat(self, message): - """Handle chat messages""" + """Handle chat messages and auto-join session room""" + session_id = message.get('session_id') + if not session_id: + emit('error', {'message': 'session_id is required'}) + return + + join_room(session_id) + print(f"Client joined session room {session_id}") + chat_handler = ChatHandler( db=load_db(os.getenv("SERVER_DB_TYPE", app.config["DB_TYPE"])) ) chat_handler.chat(message) + + def on_join_session(self, data): + """Explicitly join a session room (for reconnection)""" + session_id = data.get('session_id') + if session_id: + join_room(session_id) + emit('session_joined', {'session_id': session_id}) + print(f"Client explicitly joined session room {session_id}") + + def on_disconnect(self): + """Handle client disconnection""" + print(f"Client disconnected from chat namespace") From 798b828c3469855458ecb952208a42d8766c3585 Mon Sep 17 00:00:00 2001 From: Om Gate Date: Wed, 24 Sep 2025 17:02:10 +0530 Subject: [PATCH 3/3] feat: emit user message --- backend/director/core/session.py | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/backend/director/core/session.py b/backend/director/core/session.py index 6405caa6..12e37bb4 100644 --- a/backend/director/core/session.py +++ b/backend/director/core/session.py @@ -208,8 +208,15 @@ class InputMessage(BaseMessage): msg_type: MsgType = MsgType.input def publish(self): - """Store the message in the database. for conversation history.""" + """Store the message in the database and broadcast to session room.""" self.db.add_or_update_msg_to_conv(**self.model_dump(exclude={"db"})) + try: + emit("chat", self.model_dump(exclude={"db"}), + room=self.session_id, + namespace="/chat") + print(f"Broadcasted input message to session room: {self.session_id}") + except Exception as e: + print(f"Error broadcasting input message to session {self.session_id}: {str(e)}") class OutputMessage(BaseMessage):