From d5495e281f7903074b0eae1aee442a1b71ee34bf Mon Sep 17 00:00:00 2001 From: jasper blues Date: Wed, 4 Feb 2026 07:20:04 +1100 Subject: [PATCH 1/5] Integrate embabel-version that uses chat-store directly --- pom.xml | 11 ++- .../guide/chat/event/MessageEventListener.kt | 86 +++++++++++++++++++ .../guide/chat/model/DeliveredMessage.kt | 2 +- .../guide/chat/service/ChatSessionService.kt | 69 +++++++++++---- .../guide/chat/service/JesseService.kt | 80 +++++++++++------ 5 files changed, 202 insertions(+), 46 deletions(-) create mode 100644 src/main/kotlin/com/embabel/guide/chat/event/MessageEventListener.kt diff --git a/pom.xml b/pom.xml index fe795b0..8d4fde2 100644 --- a/pom.xml +++ b/pom.xml @@ -18,7 +18,7 @@ 21 - 0.3.3-SNAPSHOT + 0.3.4-SNAPSHOT 2.2.0 @@ -53,7 +53,14 @@ com.embabel.agent embabel-chat-store - 0.1.0-SNAPSHOT + 0.2.0-SNAPSHOT + + + + + com.embabel.agent + embabel-agent-chatstore-autoconfigure + ${embabel-agent.version} diff --git a/src/main/kotlin/com/embabel/guide/chat/event/MessageEventListener.kt b/src/main/kotlin/com/embabel/guide/chat/event/MessageEventListener.kt new file mode 100644 index 0000000..0f6265c --- /dev/null +++ b/src/main/kotlin/com/embabel/guide/chat/event/MessageEventListener.kt @@ -0,0 +1,86 @@ +package com.embabel.guide.chat.event + +import com.embabel.chat.event.MessageEvent +import com.embabel.guide.chat.model.DeliveredMessage +import com.embabel.guide.chat.model.StatusMessage +import com.embabel.guide.chat.service.ChatService +import com.embabel.guide.domain.GuideUserRepository +import com.embabel.guide.util.UUIDv7 +import org.slf4j.LoggerFactory +import org.springframework.context.event.EventListener +import org.springframework.stereotype.Component + +/** + * Listens for MessageEvents and delivers messages to users via WebSocket. + * + * This decouples message persistence from WebSocket delivery: + * - ADDED: Message was added to conversation - send to recipient immediately + * - PERSISTENCE_FAILED: Log error for monitoring + */ +@Component +class MessageEventListener( + private val chatService: ChatService, + private val guideUserRepository: GuideUserRepository +) { + private val logger = LoggerFactory.getLogger(MessageEventListener::class.java) + + @EventListener(condition = "#event.status.name() == 'ADDED'") + fun onMessageAdded(event: MessageEvent) { + val toGuideUserId = event.toUserId + if (toGuideUserId == null) { + logger.debug("MessageEvent has no toUserId, skipping WebSocket delivery for session {}", event.conversationId) + return + } + + val message = event.message + if (message == null) { + logger.warn("MessageEvent ADDED has no message for session {}", event.conversationId) + return + } + + // Look up the GuideUser to get their webUserId for WebSocket routing + val guideUser = guideUserRepository.findById(toGuideUserId).orElse(null) + if (guideUser == null) { + logger.warn("GuideUser not found for id {}, skipping WebSocket delivery", toGuideUserId) + return + } + + val webUserId = guideUser.webUser?.id + if (webUserId == null) { + logger.debug("GuideUser {} has no webUser, skipping WebSocket delivery", toGuideUserId) + return + } + + logger.debug("Delivering message to webUser {} (guideUser {}) for session {}", + webUserId, toGuideUserId, event.conversationId) + + val delivered = DeliveredMessage( + id = UUIDv7.generateString(), + sessionId = event.conversationId, + role = message.role.name.lowercase(), + body = message.content, + ts = event.timestamp, + authorId = event.fromUserId, + title = event.title + ) + + chatService.sendToUser(webUserId, delivered) + + // Send status update to clear typing indicator + event.fromUserId?.let { fromUserId -> + chatService.sendStatusToUser(webUserId, StatusMessage(fromUserId = fromUserId)) + } + } + + @EventListener(condition = "#event.status.name() == 'PERSISTENCE_FAILED'") + fun onPersistenceFailed(event: MessageEvent) { + logger.error( + "Message persistence failed for session {}, role={}, error={}", + event.conversationId, + event.role, + event.error?.message, + event.error + ) + // Could notify user of failure, implement retry logic, etc. + } +} \ No newline at end of file diff --git a/src/main/kotlin/com/embabel/guide/chat/model/DeliveredMessage.kt b/src/main/kotlin/com/embabel/guide/chat/model/DeliveredMessage.kt index 0076b00..34e8e57 100644 --- a/src/main/kotlin/com/embabel/guide/chat/model/DeliveredMessage.kt +++ b/src/main/kotlin/com/embabel/guide/chat/model/DeliveredMessage.kt @@ -20,7 +20,7 @@ data class DeliveredMessage( return DeliveredMessage( id = msg.messageId, sessionId = sessionId, - role = msg.role, + role = msg.role.name.lowercase(), body = msg.content, ts = msg.createdAt, authorId = msg.author?.id, diff --git a/src/main/kotlin/com/embabel/guide/chat/service/ChatSessionService.kt b/src/main/kotlin/com/embabel/guide/chat/service/ChatSessionService.kt index 9da80f2..84b2c96 100644 --- a/src/main/kotlin/com/embabel/guide/chat/service/ChatSessionService.kt +++ b/src/main/kotlin/com/embabel/guide/chat/service/ChatSessionService.kt @@ -1,6 +1,13 @@ package com.embabel.guide.chat.service +import com.embabel.chat.AssistantMessage +import com.embabel.chat.ConversationFactoryProvider +import com.embabel.chat.ConversationStoreType +import com.embabel.chat.Role +import com.embabel.chat.UserMessage +import com.embabel.chat.store.adapter.StoredConversationFactory import com.embabel.chat.store.model.MessageData +import com.embabel.chat.store.model.SessionUser import com.embabel.chat.store.model.StoredMessage import com.embabel.chat.store.model.StoredSession import com.embabel.chat.store.repository.ChatSessionRepository @@ -15,15 +22,12 @@ import java.util.Optional @Service class ChatSessionService( private val chatSessionRepository: ChatSessionRepository, + private val conversationFactoryProvider: ConversationFactoryProvider, private val ragAdapter: RagServiceAdapter, private val guideUserRepository: GuideUserRepository ) { companion object { - const val ROLE_USER = "user" - const val ROLE_ASSISTANT = "assistant" - const val ROLE_TOOL = "tool" - const val DEFAULT_WELCOME_MESSAGE = "Welcome! How can I help you today?" const val WELCOME_PROMPT_TEMPLATE = "User %s has created a new account. Could you please greet and welcome them" } @@ -64,7 +68,7 @@ class ChatSessionService( ownerId: String, title: String? = null, message: String, - role: String, + role: Role, authorId: String? = null ): StoredSession { val sessionId = UUIDv7.generateString() @@ -124,7 +128,7 @@ class ChatSessionService( val messageData = MessageData( messageId = UUIDv7.generateString(), - role = ROLE_ASSISTANT, + role = Role.ASSISTANT, content = welcomeMessage, createdAt = Instant.now() ) @@ -149,7 +153,7 @@ class ChatSessionService( ownerId = ownerId, title = "Welcome", message = welcomeMessage, - role = ROLE_ASSISTANT, + role = Role.ASSISTANT, authorId = null ) } @@ -171,7 +175,7 @@ class ChatSessionService( ownerId = ownerId, title = title, message = content, - role = ROLE_USER, + role = Role.USER, authorId = ownerId ) } @@ -202,8 +206,8 @@ class ChatSessionService( ): SessionResult = withContext(Dispatchers.IO) { val existing = chatSessionRepository.findBySessionId(sessionId) if (existing.isPresent) { - // Session exists - just add the message - addMessage(sessionId, message, ROLE_USER, authorId) + // Session exists - just add the message (user messages don't need event delivery) + addMessageDirect(sessionId, message, Role.USER, authorId) SessionResult(existing.get(), created = false) } else { // Session doesn't exist - create with generated title @@ -214,7 +218,7 @@ class ChatSessionService( val messageData = MessageData( messageId = UUIDv7.generateString(), - role = ROLE_USER, + role = Role.USER, content = message, createdAt = Instant.now() ) @@ -233,18 +237,53 @@ class ChatSessionService( } /** - * Add a message to an existing session. + * Add a message to an existing session using the new ConversationFactory pattern. + * Messages are persisted asynchronously and delivered via MessageEvent. + * + * @param sessionId the session to add the message to + * @param text the message text + * @param role the message role + * @param user the human user participant (for routing USER messages from, ASSISTANT messages to) + * @param agent the AI/system user participant (for routing ASSISTANT messages from, USER messages to) + * @param title the session title (included in events for UI display) + */ + fun addMessage( + sessionId: String, + text: String, + role: Role, + user: SessionUser, + agent: SessionUser?, + title: String? = null + ) { + val factory = conversationFactoryProvider.getFactory(ConversationStoreType.STORED) + as StoredConversationFactory + + val conversation = factory.createForParticipants(sessionId, user, agent, title) + + val message = when (role) { + Role.USER -> UserMessage(text) + Role.ASSISTANT -> AssistantMessage(text) + else -> UserMessage(text) // Default to user message + } + + // This triggers MessageEvent(ADDED) synchronously, then persists async + conversation.addMessage(message) + } + + /** + * Add a message to an existing session (legacy method for backwards compatibility). + * Uses direct repository access - no events fired. * * @param sessionId the session to add the message to * @param text the message text - * @param role the message role (user, assistant, tool) + * @param role the message role * @param authorId optional author ID (null for system messages) * @return the created message */ - fun addMessage( + fun addMessageDirect( sessionId: String, text: String, - role: String, + role: Role, authorId: String? = null ): StoredMessage { val messageData = MessageData( diff --git a/src/main/kotlin/com/embabel/guide/chat/service/JesseService.kt b/src/main/kotlin/com/embabel/guide/chat/service/JesseService.kt index c1142c3..44e6a76 100644 --- a/src/main/kotlin/com/embabel/guide/chat/service/JesseService.kt +++ b/src/main/kotlin/com/embabel/guide/chat/service/JesseService.kt @@ -1,14 +1,18 @@ package com.embabel.guide.chat.service -import com.embabel.chat.store.model.StoredMessage -import com.embabel.guide.chat.model.DeliveredMessage +import com.embabel.chat.Role +import com.embabel.chat.store.model.SessionUser import com.embabel.guide.chat.model.StatusMessage +import com.embabel.guide.domain.GuideUserData +import com.embabel.guide.domain.GuideUserRepository import com.embabel.guide.domain.GuideUserService import com.embabel.guide.util.UUIDv7 import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.launch +import org.drivine.manager.GraphObjectManager import org.slf4j.LoggerFactory +import org.springframework.beans.factory.annotation.Qualifier import org.springframework.boot.context.event.ApplicationReadyEvent import org.springframework.context.event.EventListener import org.springframework.scheduling.annotation.Scheduled @@ -20,19 +24,39 @@ class JesseService( private val presenceService: PresenceService, private val ragAdapter: RagServiceAdapter, private val chatSessionService: ChatSessionService, - private val guideUserService: GuideUserService + private val guideUserService: GuideUserService, + private val guideUserRepository: GuideUserRepository, + @Qualifier("neoGraphObjectManager") private val graphObjectManager: GraphObjectManager ) { private val logger = LoggerFactory.getLogger(JesseService::class.java) private val coroutineScope = CoroutineScope(Dispatchers.IO) + // Jesse's GuideUserData - initialized on startup + private lateinit var jesseUser: GuideUserData + companion object { const val JESSE_USER_ID = "bot:jesse" const val JESSE_SESSION_ID = "jesse-bot-session" + const val JESSE_DISPLAY_NAME = "Jesse" } @EventListener(ApplicationReadyEvent::class) fun initializeJesse() { logger.info("Initializing Jesse bot") + + // Get or create Jesse as a GuideUser (SessionUser) for message authorship + jesseUser = guideUserRepository.findById(JESSE_USER_ID) + .map { it.core } + .orElseGet { + logger.info("Creating Jesse user in database") + val jesse = GuideUserData( + id = JESSE_USER_ID, + displayName = JESSE_DISPLAY_NAME + ) + graphObjectManager.save(jesse) + jesse + } + presenceService.touch(JESSE_USER_ID, JESSE_SESSION_ID, "active") logger.info("Jesse bot is now online with ID: {}", JESSE_USER_ID) } @@ -42,12 +66,10 @@ class JesseService( presenceService.touch(JESSE_USER_ID, JESSE_SESSION_ID, "active") } - private fun sendMessageToUser(toUserId: String, message: StoredMessage, sessionId: String, title: String? = null) { - logger.debug("Jesse sending message to user: {}", toUserId) - val deliveredMessage = DeliveredMessage.createFrom(message, sessionId, title) - chatService.sendToUser(toUserId, deliveredMessage) - chatService.sendStatusToUser(toUserId, status = StatusMessage(fromUserId = JESSE_USER_ID)) - } + /** + * Get Jesse's SessionUser for use as agent in conversations. + */ + fun getJesseUser(): SessionUser = jesseUser private fun sendStatusToUser(toUserId: String, status: String) { logger.debug("Jesse sending status to user {}: {}", toUserId, status) @@ -62,6 +84,8 @@ class JesseService( * Receive a message from a user, persist it, get AI response, and send back. * Creates the session lazily if it doesn't exist. * + * Assistant responses are delivered via MessageEvent -> MessageEventListener -> WebSocket. + * * @param sessionId the session to add messages to, or blank/empty to create a new session * @param fromWebUserId the WebUser ID from the JWT principal * @param message the message text @@ -112,7 +136,7 @@ class JesseService( // Load existing session messages for context (exclude the message we just added) val priorMessages = sessionResult.session.messages .dropLast(1) - .map { PriorMessage(it.role, it.content) } + .map { PriorMessage(it.role.name.lowercase(), it.content) } logger.info("[session={}] Loaded {} prior messages for context", effectiveSessionId, priorMessages.size) // Send status updates to the user while processing @@ -128,33 +152,33 @@ class JesseService( } logger.info("[session={}] RAG adapter returned response ({} chars)", effectiveSessionId, response.length) - // Save the assistant's response to the session - logger.info("[session={}] Saving assistant response to session", effectiveSessionId) - val assistantMessage = chatSessionService.addMessage( + // Save the assistant's response - WebSocket delivery happens via MessageEvent + logger.info("[session={}] Saving assistant response (event-based delivery)", effectiveSessionId) + chatSessionService.addMessage( sessionId = effectiveSessionId, text = response, - role = ChatSessionService.ROLE_ASSISTANT, - authorId = null // System-generated response + role = Role.ASSISTANT, + user = guideUser.core, // Human user (recipient of assistant messages) + agent = jesseUser, // Jesse (sender of assistant messages) + title = title // Session title for UI display ) - logger.info("[session={}] Assistant message saved", effectiveSessionId) - - // Send the response to the user via WebSocket (include title for new sessions) - logger.info("[session={}] Sending response to webUser {} via WebSocket", effectiveSessionId, fromWebUserId) - sendMessageToUser(fromWebUserId, assistantMessage, effectiveSessionId, title) - logger.info("[session={}] Response sent successfully", effectiveSessionId) + logger.info("[session={}] Assistant message queued for delivery", effectiveSessionId) } catch (e: Exception) { logger.error("[session={}] Error processing message from webUser {}: {}", effectiveSessionId, fromWebUserId, e.message, e) sendStatusToUser(fromWebUserId, "Error processing your request") // Try to save error message to session - may fail if session wasn't created try { - val errorMessage = chatSessionService.addMessage( - sessionId = effectiveSessionId, - text = "Sorry, I encountered an error while processing your message. Please try again!", - role = ChatSessionService.ROLE_ASSISTANT, - authorId = null - ) - sendMessageToUser(fromWebUserId, errorMessage, effectiveSessionId) + val guideUser = guideUserService.findByWebUserId(fromWebUserId).orElse(null) + if (guideUser != null) { + chatSessionService.addMessage( + sessionId = effectiveSessionId, + text = "Sorry, I encountered an error while processing your message. Please try again!", + role = Role.ASSISTANT, + user = guideUser.core, + agent = jesseUser + ) + } } catch (e2: Exception) { logger.error("[session={}] Failed to save error message: {}", effectiveSessionId, e2.message) } From a13523bff0a81581c2185969432f04cc371dc9db Mon Sep 17 00:00:00 2001 From: jasper blues Date: Fri, 6 Feb 2026 19:08:37 +1100 Subject: [PATCH 2/5] Integrate chat-store on agent --- pom.xml | 12 ++---------- .../java/com/embabel/guide/ChatActions.java | 6 ++++-- .../guide/chat/model/DeliveredMessage.kt | 6 +++--- .../guide/chat/service/ChatSessionService.kt | 18 +++++++++--------- .../embabel/guide/chat/service/JesseService.kt | 6 +++--- .../embabel/guide/config/ChatStoreConfig.kt | 10 +++++----- .../com/embabel/guide/domain/GuideUserData.kt | 10 ++++++---- 7 files changed, 32 insertions(+), 36 deletions(-) diff --git a/pom.xml b/pom.xml index 8d4fde2..21c7df5 100644 --- a/pom.xml +++ b/pom.xml @@ -49,21 +49,13 @@ 0.0.22 - + com.embabel.agent - embabel-chat-store + embabel-agent-starter-chat-store 0.2.0-SNAPSHOT - - - com.embabel.agent - embabel-agent-chatstore-autoconfigure - ${embabel-agent.version} - - - com.embabel.agent embabel-agent-starter-openai diff --git a/src/main/java/com/embabel/guide/ChatActions.java b/src/main/java/com/embabel/guide/ChatActions.java index 68242d2..6dc4c39 100644 --- a/src/main/java/com/embabel/guide/ChatActions.java +++ b/src/main/java/com/embabel/guide/ChatActions.java @@ -61,8 +61,10 @@ private GuideUser getGuideUser(@Nullable User user) { var guideUserData = new GuideUserData( java.util.UUID.randomUUID().toString(), displayName != null ? displayName : "", - null, - null + discordInfo.getUsername() != null ? discordInfo.getUsername() : "", + null, // email + null, // persona + null // customPrompt ); var discordUserInfo = new DiscordUserInfoData( discordInfo.getId(), diff --git a/src/main/kotlin/com/embabel/guide/chat/model/DeliveredMessage.kt b/src/main/kotlin/com/embabel/guide/chat/model/DeliveredMessage.kt index 34e8e57..b8e5790 100644 --- a/src/main/kotlin/com/embabel/guide/chat/model/DeliveredMessage.kt +++ b/src/main/kotlin/com/embabel/guide/chat/model/DeliveredMessage.kt @@ -1,6 +1,6 @@ package com.embabel.guide.chat.model -import com.embabel.chat.store.model.StoredMessage +import com.embabel.chat.store.model.SimpleStoredMessage import java.time.Instant /** @@ -16,13 +16,13 @@ data class DeliveredMessage( val title: String? = null ) { companion object { - fun createFrom(msg: StoredMessage, sessionId: String, title: String? = null): DeliveredMessage { + fun createFrom(msg: SimpleStoredMessage, sessionId: String, title: String? = null): DeliveredMessage { return DeliveredMessage( id = msg.messageId, sessionId = sessionId, role = msg.role.name.lowercase(), body = msg.content, - ts = msg.createdAt, + ts = msg.message.createdAt, authorId = msg.author?.id, title = title ) diff --git a/src/main/kotlin/com/embabel/guide/chat/service/ChatSessionService.kt b/src/main/kotlin/com/embabel/guide/chat/service/ChatSessionService.kt index 84b2c96..40716e0 100644 --- a/src/main/kotlin/com/embabel/guide/chat/service/ChatSessionService.kt +++ b/src/main/kotlin/com/embabel/guide/chat/service/ChatSessionService.kt @@ -5,16 +5,17 @@ import com.embabel.chat.ConversationFactoryProvider import com.embabel.chat.ConversationStoreType import com.embabel.chat.Role import com.embabel.chat.UserMessage -import com.embabel.chat.store.adapter.StoredConversationFactory +import com.embabel.chat.event.MessageEvent import com.embabel.chat.store.model.MessageData -import com.embabel.chat.store.model.SessionUser -import com.embabel.chat.store.model.StoredMessage +import com.embabel.chat.store.model.SimpleStoredMessage import com.embabel.chat.store.model.StoredSession +import com.embabel.chat.store.model.StoredUser import com.embabel.chat.store.repository.ChatSessionRepository import com.embabel.guide.domain.GuideUserRepository import com.embabel.guide.util.UUIDv7 import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.withContext +import org.springframework.context.ApplicationEventPublisher import org.springframework.stereotype.Service import java.time.Instant import java.util.Optional @@ -24,7 +25,8 @@ class ChatSessionService( private val chatSessionRepository: ChatSessionRepository, private val conversationFactoryProvider: ConversationFactoryProvider, private val ragAdapter: RagServiceAdapter, - private val guideUserRepository: GuideUserRepository + private val guideUserRepository: GuideUserRepository, + private val eventPublisher: ApplicationEventPublisher ) { companion object { @@ -251,13 +253,11 @@ class ChatSessionService( sessionId: String, text: String, role: Role, - user: SessionUser, - agent: SessionUser?, + user: StoredUser, + agent: StoredUser?, title: String? = null ) { val factory = conversationFactoryProvider.getFactory(ConversationStoreType.STORED) - as StoredConversationFactory - val conversation = factory.createForParticipants(sessionId, user, agent, title) val message = when (role) { @@ -285,7 +285,7 @@ class ChatSessionService( text: String, role: Role, authorId: String? = null - ): StoredMessage { + ): SimpleStoredMessage { val messageData = MessageData( messageId = UUIDv7.generateString(), role = role, diff --git a/src/main/kotlin/com/embabel/guide/chat/service/JesseService.kt b/src/main/kotlin/com/embabel/guide/chat/service/JesseService.kt index 44e6a76..10b507e 100644 --- a/src/main/kotlin/com/embabel/guide/chat/service/JesseService.kt +++ b/src/main/kotlin/com/embabel/guide/chat/service/JesseService.kt @@ -1,7 +1,7 @@ package com.embabel.guide.chat.service import com.embabel.chat.Role -import com.embabel.chat.store.model.SessionUser +import com.embabel.chat.store.model.StoredUser import com.embabel.guide.chat.model.StatusMessage import com.embabel.guide.domain.GuideUserData import com.embabel.guide.domain.GuideUserRepository @@ -67,9 +67,9 @@ class JesseService( } /** - * Get Jesse's SessionUser for use as agent in conversations. + * Get Jesse's StoredUser for use as agent in conversations. */ - fun getJesseUser(): SessionUser = jesseUser + fun getJesseUser(): StoredUser = jesseUser private fun sendStatusToUser(toUserId: String, status: String) { logger.debug("Jesse sending status to user {}: {}", toUserId, status) diff --git a/src/main/kotlin/com/embabel/guide/config/ChatStoreConfig.kt b/src/main/kotlin/com/embabel/guide/config/ChatStoreConfig.kt index b751bcd..359d5e6 100644 --- a/src/main/kotlin/com/embabel/guide/config/ChatStoreConfig.kt +++ b/src/main/kotlin/com/embabel/guide/config/ChatStoreConfig.kt @@ -1,6 +1,6 @@ package com.embabel.guide.config -import com.embabel.chat.store.model.SessionUser +import com.embabel.chat.store.model.StoredUser import com.embabel.chat.store.repository.ChatSessionRepository import com.embabel.chat.store.repository.ChatSessionRepositoryImpl import com.embabel.guide.domain.GuideUserData @@ -14,7 +14,7 @@ import org.springframework.context.annotation.Primary /** * Configuration for embabel-chat-store integration. - * Registers GuideUserData as the SessionUser implementation for polymorphic deserialization, + * Registers GuideUserData as the StoredUser implementation for polymorphic deserialization, * and wires up the ChatSessionRepository bean. */ @Configuration @@ -24,11 +24,11 @@ class ChatStoreConfig { @Primary fun persistenceManager(factory: PersistenceManagerFactory): PersistenceManager { val pm = factory.get("neo") - // Register GuideUserData as implementation of SessionUser interface. + // Register GuideUserData as implementation of StoredUser interface. // Composite label key is sorted alphabetically with pipe separator. pm.registerSubtype( - SessionUser::class.java, - "GuideUser|SessionUser", + StoredUser::class.java, + "GuideUser|User", GuideUserData::class.java ) return pm diff --git a/src/main/kotlin/com/embabel/guide/domain/GuideUserData.kt b/src/main/kotlin/com/embabel/guide/domain/GuideUserData.kt index 8f3a80f..b71f39f 100644 --- a/src/main/kotlin/com/embabel/guide/domain/GuideUserData.kt +++ b/src/main/kotlin/com/embabel/guide/domain/GuideUserData.kt @@ -1,23 +1,25 @@ package com.embabel.guide.domain -import com.embabel.chat.store.model.SessionUser +import com.embabel.chat.store.model.StoredUser import com.fasterxml.jackson.annotation.JsonIgnoreProperties import org.drivine.annotation.NodeFragment import org.drivine.annotation.NodeId /** * Node fragment representing a GuideUser in the graph. - * Implements SessionUser to enable integration with embabel-chat-store library. + * Implements StoredUser to enable integration with embabel-chat-store library. */ -@NodeFragment(labels = ["GuideUser", "SessionUser"]) +@NodeFragment(labels = ["GuideUser", "User"]) @JsonIgnoreProperties(ignoreUnknown = true) data class GuideUserData( @NodeId override var id: String, override var displayName: String = "", + override var username: String = displayName, + override var email: String? = null, var persona: String? = null, var customPrompt: String? = null -) : HasGuideUserData, SessionUser { +) : HasGuideUserData, StoredUser { override fun guideUserData(): GuideUserData = this } From 6f28b11369116180a831951d1d49ffe9c338c6cf Mon Sep 17 00:00:00 2001 From: jasper blues Date: Fri, 6 Feb 2026 19:33:45 +1100 Subject: [PATCH 3/5] Make sure createSession fires events. --- .../guide/chat/service/ChatSessionService.kt | 38 +++++++++++++++++-- 1 file changed, 34 insertions(+), 4 deletions(-) diff --git a/src/main/kotlin/com/embabel/guide/chat/service/ChatSessionService.kt b/src/main/kotlin/com/embabel/guide/chat/service/ChatSessionService.kt index 40716e0..f2ef70f 100644 --- a/src/main/kotlin/com/embabel/guide/chat/service/ChatSessionService.kt +++ b/src/main/kotlin/com/embabel/guide/chat/service/ChatSessionService.kt @@ -135,13 +135,28 @@ class ChatSessionService( createdAt = Instant.now() ) - chatSessionRepository.createSessionWithMessage( + val title = "Welcome" + val session = chatSessionRepository.createSessionWithMessage( sessionId = sessionId, owner = owner.guideUserData(), - title = "Welcome", + title = title, messageData = messageData, messageAuthor = null // System message - no author ) + + // Publish event so UI receives the welcome message with title + val persistedMessage = session.messages.last().toMessage() + eventPublisher.publishEvent( + MessageEvent.persisted( + conversationId = sessionId, + message = persistedMessage, + fromUserId = null, // System message + toUserId = ownerId, + title = title + ) + ) + + session } /** @@ -151,13 +166,28 @@ class ChatSessionService( ownerId: String, welcomeMessage: String = DEFAULT_WELCOME_MESSAGE ): StoredSession { - return createSession( + val title = "Welcome" + val session = createSession( ownerId = ownerId, - title = "Welcome", + title = title, message = welcomeMessage, role = Role.ASSISTANT, authorId = null ) + + // Publish event so UI receives the welcome message with title + val persistedMessage = session.messages.last().toMessage() + eventPublisher.publishEvent( + MessageEvent.persisted( + conversationId = session.session.sessionId, + message = persistedMessage, + fromUserId = null, // System message + toUserId = ownerId, + title = title + ) + ) + + return session } /** From 8fddd32158bb386442fa5752480b1883ad4413ec Mon Sep 17 00:00:00 2001 From: jasper blues Date: Sun, 8 Feb 2026 17:09:34 +1100 Subject: [PATCH 4/5] Integrate latest. --- .../java/com/embabel/guide/ChatConfig.java | 9 +- .../guide/chat/service/ChatSessionService.kt | 112 +++--------------- .../chat/service/FakeRagServiceAdapter.kt | 1 - .../chat/service/GuideRagServiceAdapter.kt | 29 ++--- .../guide/chat/service/JesseService.kt | 55 ++------- .../guide/chat/service/RagServiceAdapter.kt | 17 +-- 6 files changed, 48 insertions(+), 175 deletions(-) diff --git a/src/main/java/com/embabel/guide/ChatConfig.java b/src/main/java/com/embabel/guide/ChatConfig.java index 001070b..1046ef3 100644 --- a/src/main/java/com/embabel/guide/ChatConfig.java +++ b/src/main/java/com/embabel/guide/ChatConfig.java @@ -2,6 +2,9 @@ import com.embabel.agent.core.AgentPlatform; import com.embabel.chat.Chatbot; +import com.embabel.chat.ConversationFactory; +import com.embabel.chat.ConversationFactoryProvider; +import com.embabel.chat.ConversationStoreType; import com.embabel.chat.agent.AgentProcessChatbot; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @@ -9,12 +12,14 @@ /** * Create a chatbot backed by all actions on the AgentPlatform * that respond to UserMessages. Will use utility planning. + * Conversations are persisted to Neo4j via the STORED conversation factory. */ @Configuration class ChatConfig { @Bean - Chatbot chatbot(AgentPlatform agentPlatform) { - return AgentProcessChatbot.utilityFromPlatform(agentPlatform); + Chatbot chatbot(AgentPlatform agentPlatform, ConversationFactoryProvider conversationFactoryProvider) { + ConversationFactory factory = conversationFactoryProvider.getFactory(ConversationStoreType.STORED); + return AgentProcessChatbot.utilityFromPlatform(agentPlatform, factory); } } diff --git a/src/main/kotlin/com/embabel/guide/chat/service/ChatSessionService.kt b/src/main/kotlin/com/embabel/guide/chat/service/ChatSessionService.kt index f2ef70f..2432df4 100644 --- a/src/main/kotlin/com/embabel/guide/chat/service/ChatSessionService.kt +++ b/src/main/kotlin/com/embabel/guide/chat/service/ChatSessionService.kt @@ -1,15 +1,9 @@ package com.embabel.guide.chat.service -import com.embabel.chat.AssistantMessage -import com.embabel.chat.ConversationFactoryProvider -import com.embabel.chat.ConversationStoreType import com.embabel.chat.Role -import com.embabel.chat.UserMessage import com.embabel.chat.event.MessageEvent import com.embabel.chat.store.model.MessageData -import com.embabel.chat.store.model.SimpleStoredMessage import com.embabel.chat.store.model.StoredSession -import com.embabel.chat.store.model.StoredUser import com.embabel.chat.store.repository.ChatSessionRepository import com.embabel.guide.domain.GuideUserRepository import com.embabel.guide.util.UUIDv7 @@ -20,10 +14,13 @@ import org.springframework.stereotype.Service import java.time.Instant import java.util.Optional +/** + * Service for managing chat session metadata (titles, ownership, listing). + * Message persistence is handled by the chatbot via STORED conversations. + */ @Service class ChatSessionService( private val chatSessionRepository: ChatSessionRepository, - private val conversationFactoryProvider: ConversationFactoryProvider, private val ragAdapter: RagServiceAdapter, private val guideUserRepository: GuideUserRepository, private val eventPublisher: ApplicationEventPublisher @@ -119,9 +116,7 @@ class ChatSessionService( val welcomeMessage = ragAdapter.sendMessage( threadId = sessionId, message = prompt, - fromUserId = ownerId, - priorMessages = emptyList(), // No prior context for welcome session - onEvent = { } // No status updates needed for welcome message + fromUserId = ownerId ) val owner = guideUserRepository.findById(ownerId).orElseThrow { @@ -221,115 +216,38 @@ class ChatSessionService( ) /** - * Get an existing session or create a new one with the given message. + * Get an existing session or create a new one. * If the session doesn't exist, generates a title from the message content. * + * Note: This method only creates the session metadata (title, owner). + * Message persistence is handled by the chatbot via STORED conversations. + * * @param sessionId the session ID (client-provided) * @param ownerId the user who owns the session - * @param message the message text - * @param authorId the author of the message + * @param messageForTitle the message text (used only for title generation if new session) * @return SessionResult containing the session and whether it was created */ - suspend fun getOrCreateSessionWithMessage( + suspend fun getOrCreateSession( sessionId: String, ownerId: String, - message: String, - authorId: String + messageForTitle: String ): SessionResult = withContext(Dispatchers.IO) { val existing = chatSessionRepository.findBySessionId(sessionId) if (existing.isPresent) { - // Session exists - just add the message (user messages don't need event delivery) - addMessageDirect(sessionId, message, Role.USER, authorId) SessionResult(existing.get(), created = false) } else { // Session doesn't exist - create with generated title - val title = ragAdapter.generateTitle(message, ownerId) + val title = ragAdapter.generateTitle(messageForTitle, ownerId) val owner = guideUserRepository.findById(ownerId).orElseThrow { IllegalArgumentException("Owner not found: $ownerId") } - val messageData = MessageData( - messageId = UUIDv7.generateString(), - role = Role.USER, - content = message, - createdAt = Instant.now() - ) - - val messageAuthor = guideUserRepository.findById(authorId).orElse(null)?.guideUserData() - - val session = chatSessionRepository.createSessionWithMessage( + val session = chatSessionRepository.createSession( sessionId = sessionId, owner = owner.guideUserData(), - title = title, - messageData = messageData, - messageAuthor = messageAuthor + title = title ) SessionResult(session, created = true) } } - - /** - * Add a message to an existing session using the new ConversationFactory pattern. - * Messages are persisted asynchronously and delivered via MessageEvent. - * - * @param sessionId the session to add the message to - * @param text the message text - * @param role the message role - * @param user the human user participant (for routing USER messages from, ASSISTANT messages to) - * @param agent the AI/system user participant (for routing ASSISTANT messages from, USER messages to) - * @param title the session title (included in events for UI display) - */ - fun addMessage( - sessionId: String, - text: String, - role: Role, - user: StoredUser, - agent: StoredUser?, - title: String? = null - ) { - val factory = conversationFactoryProvider.getFactory(ConversationStoreType.STORED) - val conversation = factory.createForParticipants(sessionId, user, agent, title) - - val message = when (role) { - Role.USER -> UserMessage(text) - Role.ASSISTANT -> AssistantMessage(text) - else -> UserMessage(text) // Default to user message - } - - // This triggers MessageEvent(ADDED) synchronously, then persists async - conversation.addMessage(message) - } - - /** - * Add a message to an existing session (legacy method for backwards compatibility). - * Uses direct repository access - no events fired. - * - * @param sessionId the session to add the message to - * @param text the message text - * @param role the message role - * @param authorId optional author ID (null for system messages) - * @return the created message - */ - fun addMessageDirect( - sessionId: String, - text: String, - role: Role, - authorId: String? = null - ): SimpleStoredMessage { - val messageData = MessageData( - messageId = UUIDv7.generateString(), - role = role, - content = text, - createdAt = Instant.now() - ) - - // Look up the author if provided - val author = authorId?.let { id -> - guideUserRepository.findById(id).orElse(null)?.guideUserData() - } - - val updatedSession = chatSessionRepository.addMessage(sessionId, messageData, author) - // Return the last message (the one we just added) - return updatedSession.messages.last() - } } diff --git a/src/main/kotlin/com/embabel/guide/chat/service/FakeRagServiceAdapter.kt b/src/main/kotlin/com/embabel/guide/chat/service/FakeRagServiceAdapter.kt index 550ae9d..bea0b3b 100644 --- a/src/main/kotlin/com/embabel/guide/chat/service/FakeRagServiceAdapter.kt +++ b/src/main/kotlin/com/embabel/guide/chat/service/FakeRagServiceAdapter.kt @@ -24,7 +24,6 @@ class FakeRagServiceAdapter : RagServiceAdapter { threadId: String, message: String, fromUserId: String, - priorMessages: List, onEvent: (String) -> Unit ): String { logger.info("Processing fake RAG request from user: {} in thread: {}", fromUserId, threadId) diff --git a/src/main/kotlin/com/embabel/guide/chat/service/GuideRagServiceAdapter.kt b/src/main/kotlin/com/embabel/guide/chat/service/GuideRagServiceAdapter.kt index 38e52d2..00fc3d4 100644 --- a/src/main/kotlin/com/embabel/guide/chat/service/GuideRagServiceAdapter.kt +++ b/src/main/kotlin/com/embabel/guide/chat/service/GuideRagServiceAdapter.kt @@ -20,7 +20,8 @@ import java.util.concurrent.ConcurrentHashMap * This adapter bridges the WebSocket chat system with the Guide's RAG-powered chatbot, * enabling real-time AI responses through the web interface. * - * Sessions are cached per thread to maintain separate conversation contexts. + * The chatbot uses STORED conversations, so message history is automatically loaded + * when restoring a session by conversation ID. */ @Service @ConditionalOnProperty( @@ -35,7 +36,7 @@ class GuideRagServiceAdapter( private val logger = LoggerFactory.getLogger(GuideRagServiceAdapter::class.java) - // Session cache to maintain conversation continuity per thread + // Session cache to maintain AgentProcess continuity per thread private val threadSessions = ConcurrentHashMap() companion object { @@ -70,7 +71,6 @@ class GuideRagServiceAdapter( threadId: String, message: String, fromUserId: String, - priorMessages: List, onEvent: (String) -> Unit ): String = withContext(Dispatchers.IO) { logger.info("Processing Guide RAG request from user: {} in thread: {}", fromUserId, threadId) @@ -85,31 +85,18 @@ class GuideRagServiceAdapter( val guideUser = guideUserRepository.findById(fromUserId) .orElseThrow { RuntimeException("No user found with id: $fromUserId") } - // Get or create session context for this thread to maintain conversation continuity - var isNewSession = false + // Get or create session context for this thread + // The chatbot uses STORED conversations with conversationId=threadId, + // so message history is automatically loaded when restoring a session val sessionContext = threadSessions.computeIfAbsent(threadId) { - logger.info("Creating new chat session for thread: {} (user: {})", threadId, fromUserId) - isNewSession = true + logger.info("Creating/restoring chat session for thread: {} (user: {})", threadId, fromUserId) val dynamicChannel = DynamicOutputChannel() dynamicChannel.currentDelegate = messageOutputChannel - val session = chatbot.createSession(guideUser, dynamicChannel, null) + val session = chatbot.createSession(guideUser, dynamicChannel, null, threadId) SessionContext(session, dynamicChannel) } - // Load prior messages into the conversation if this is a new session - // Messages are added directly to the conversation without being processed by AI - if (isNewSession && priorMessages.isNotEmpty()) { - logger.info("Loading {} prior messages into conversation for thread: {}", priorMessages.size, threadId) - for (prior in priorMessages) { - when (prior.role) { - "user" -> sessionContext.session.conversation.addMessage(UserMessage(prior.content)) - "assistant" -> sessionContext.session.conversation.addMessage(AssistantMessage(prior.content)) - } - } - } - // Update the dynamic channel to point to this message's output channel - // (for existing sessions, or if this is a new session this ensures it's set) sessionContext.dynamicChannel.currentDelegate = messageOutputChannel // Process the message with the cached session (which maintains conversation history) diff --git a/src/main/kotlin/com/embabel/guide/chat/service/JesseService.kt b/src/main/kotlin/com/embabel/guide/chat/service/JesseService.kt index 10b507e..85d9c49 100644 --- a/src/main/kotlin/com/embabel/guide/chat/service/JesseService.kt +++ b/src/main/kotlin/com/embabel/guide/chat/service/JesseService.kt @@ -1,6 +1,5 @@ package com.embabel.guide.chat.service -import com.embabel.chat.Role import com.embabel.chat.store.model.StoredUser import com.embabel.guide.chat.model.StatusMessage import com.embabel.guide.domain.GuideUserData @@ -118,13 +117,13 @@ class JesseService( val guideUserId = guideUser.core.id logger.info("[session={}] Found guideUser {} for webUser {}", effectiveSessionId, guideUserId, fromWebUserId) - // Get or create session with the user's message (lazy creation) - logger.info("[session={}] Getting or creating session with user message", effectiveSessionId) - val sessionResult = chatSessionService.getOrCreateSessionWithMessage( + // Get or create session (lazy creation) + // Message persistence is handled by the chatbot via STORED conversations + logger.info("[session={}] Getting or creating session", effectiveSessionId) + val sessionResult = chatSessionService.getOrCreateSession( sessionId = effectiveSessionId, ownerId = guideUserId, - message = message, - authorId = guideUserId + messageForTitle = message ) val title = sessionResult.session.session.title if (sessionResult.created) { @@ -133,55 +132,27 @@ class JesseService( logger.info("[session={}] Added message to existing session", effectiveSessionId) } - // Load existing session messages for context (exclude the message we just added) - val priorMessages = sessionResult.session.messages - .dropLast(1) - .map { PriorMessage(it.role.name.lowercase(), it.content) } - logger.info("[session={}] Loaded {} prior messages for context", effectiveSessionId, priorMessages.size) - - // Send status updates to the user while processing + // Send message to RAG adapter - conversation history is auto-loaded by the chatbot logger.info("[session={}] Calling RAG adapter", effectiveSessionId) val response = ragAdapter.sendMessage( threadId = effectiveSessionId, message = message, - fromUserId = guideUserId, - priorMessages = priorMessages + fromUserId = guideUserId ) { event -> logger.debug("[session={}] RAG event for user {}: {}", effectiveSessionId, fromWebUserId, event) sendStatusToUser(fromWebUserId, event) } logger.info("[session={}] RAG adapter returned response ({} chars)", effectiveSessionId, response.length) - // Save the assistant's response - WebSocket delivery happens via MessageEvent - logger.info("[session={}] Saving assistant response (event-based delivery)", effectiveSessionId) - chatSessionService.addMessage( - sessionId = effectiveSessionId, - text = response, - role = Role.ASSISTANT, - user = guideUser.core, // Human user (recipient of assistant messages) - agent = jesseUser, // Jesse (sender of assistant messages) - title = title // Session title for UI display - ) - logger.info("[session={}] Assistant message queued for delivery", effectiveSessionId) + // Clear status now that response is complete + sendStatusToUser(fromWebUserId, "") + + // Message persistence and WebSocket delivery are handled automatically + // by the chatbot's STORED conversation factory (fires MessageEvent on persist) } catch (e: Exception) { logger.error("[session={}] Error processing message from webUser {}: {}", effectiveSessionId, fromWebUserId, e.message, e) sendStatusToUser(fromWebUserId, "Error processing your request") - - // Try to save error message to session - may fail if session wasn't created - try { - val guideUser = guideUserService.findByWebUserId(fromWebUserId).orElse(null) - if (guideUser != null) { - chatSessionService.addMessage( - sessionId = effectiveSessionId, - text = "Sorry, I encountered an error while processing your message. Please try again!", - role = Role.ASSISTANT, - user = guideUser.core, - agent = jesseUser - ) - } - } catch (e2: Exception) { - logger.error("[session={}] Failed to save error message: {}", effectiveSessionId, e2.message) - } + // Error messages are sent via status channel - no need to persist } } } diff --git a/src/main/kotlin/com/embabel/guide/chat/service/RagServiceAdapter.kt b/src/main/kotlin/com/embabel/guide/chat/service/RagServiceAdapter.kt index 89fa0b2..3dff08a 100644 --- a/src/main/kotlin/com/embabel/guide/chat/service/RagServiceAdapter.kt +++ b/src/main/kotlin/com/embabel/guide/chat/service/RagServiceAdapter.kt @@ -1,18 +1,13 @@ package com.embabel.guide.chat.service -/** - * Represents a prior message in a conversation thread for context loading. - */ -data class PriorMessage( - val role: String, // "user" or "assistant" - val content: String -) - /** * Interface for integrating with RAG (Retrieval-Augmented Generation) systems. * * This adapter provides a non-blocking way to send messages to a RAG system * and receive responses, with support for real-time status events during processing. + * + * Conversation history is automatically managed by the underlying chatbot using + * persistent conversation storage - callers don't need to track prior messages. */ interface RagServiceAdapter { @@ -23,15 +18,14 @@ interface RagServiceAdapter { /** * Sends a message to the RAG system and returns the response. - * Each thread maintains its own conversation context. + * Each thread maintains its own conversation context with automatic history persistence. * * This is a suspending function to avoid blocking threads during potentially * long-running RAG operations (document retrieval, LLM inference, etc.). * - * @param threadId The thread ID for maintaining separate conversation contexts + * @param threadId The thread/conversation ID - used for session persistence and restoration * @param message The user's message to process * @param fromUserId The ID of the user sending the message (for context/logging) - * @param priorMessages Prior messages to load into context if this is a new session for this thread * @param onEvent Callback function to receive real-time status updates during processing * (e.g., "Planning response", "Querying database", "Generating answer") * @return The RAG system's response message @@ -40,7 +34,6 @@ interface RagServiceAdapter { threadId: String, message: String, fromUserId: String, - priorMessages: List = emptyList(), onEvent: (String) -> Unit = {} ): String From 2ae4ab23c99f225c8b9976829cb8459f115f948a Mon Sep 17 00:00:00 2001 From: jasper blues Date: Mon, 9 Feb 2026 06:56:33 +1100 Subject: [PATCH 5/5] Build against latest --- src/main/java/com/embabel/guide/rag/DataManager.java | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/src/main/java/com/embabel/guide/rag/DataManager.java b/src/main/java/com/embabel/guide/rag/DataManager.java index bfb8dda..48da38a 100644 --- a/src/main/java/com/embabel/guide/rag/DataManager.java +++ b/src/main/java/com/embabel/guide/rag/DataManager.java @@ -1,8 +1,8 @@ package com.embabel.guide.rag; -import com.embabel.agent.api.common.LlmReference; -import com.embabel.agent.api.common.reference.LlmReferenceProviders; import com.embabel.agent.api.identity.User; +import com.embabel.agent.api.reference.LlmReference; +import com.embabel.agent.api.reference.LlmReferenceProviders; import com.embabel.agent.rag.ingestion.*; import com.embabel.agent.rag.ingestion.policy.UrlSpecificContentRefreshPolicy; import com.embabel.agent.rag.neo.drivine.DrivineStore; @@ -14,8 +14,6 @@ import org.springframework.lang.NonNull; import org.springframework.lang.Nullable; import org.springframework.stereotype.Service; -import org.springframework.transaction.annotation.Transactional; -import org.springframework.transaction.PlatformTransactionManager; import java.util.Collections; import java.util.List;