Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 4 additions & 5 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@

<properties>
<java.version>21</java.version>
<embabel-agent.version>0.3.3-SNAPSHOT</embabel-agent.version>
<embabel-agent.version>0.3.4-SNAPSHOT</embabel-agent.version>
<kotlin.version>2.2.0</kotlin.version>
</properties>

Expand Down Expand Up @@ -49,14 +49,13 @@
<version>0.0.22</version>
</dependency>

<!-- Chat session storage library -->
<!-- Chat session storage with autoconfiguration -->
<dependency>
<groupId>com.embabel.agent</groupId>
<artifactId>embabel-chat-store</artifactId>
<version>0.1.0-SNAPSHOT</version>
<artifactId>embabel-agent-starter-chat-store</artifactId>
<version>0.2.0-SNAPSHOT</version>
</dependency>


<dependency>
<groupId>com.embabel.agent</groupId>
<artifactId>embabel-agent-starter-openai</artifactId>
Expand Down
6 changes: 4 additions & 2 deletions src/main/java/com/embabel/guide/ChatActions.java
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,10 @@ private GuideUser getGuideUser(@Nullable User user) {
var guideUserData = new GuideUserData(
java.util.UUID.randomUUID().toString(),
displayName != null ? displayName : "",
null,
null
discordInfo.getUsername() != null ? discordInfo.getUsername() : "",
null, // email
null, // persona
null // customPrompt
);
var discordUserInfo = new DiscordUserInfoData(
discordInfo.getId(),
Expand Down
9 changes: 7 additions & 2 deletions src/main/java/com/embabel/guide/ChatConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,24 @@

import com.embabel.agent.core.AgentPlatform;
import com.embabel.chat.Chatbot;
import com.embabel.chat.ConversationFactory;
import com.embabel.chat.ConversationFactoryProvider;
import com.embabel.chat.ConversationStoreType;
import com.embabel.chat.agent.AgentProcessChatbot;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
* Create a chatbot backed by all actions on the AgentPlatform
* that respond to UserMessages. Will use utility planning.
* Conversations are persisted to Neo4j via the STORED conversation factory.
*/
@Configuration
class ChatConfig {

@Bean
Chatbot chatbot(AgentPlatform agentPlatform) {
return AgentProcessChatbot.utilityFromPlatform(agentPlatform);
Chatbot chatbot(AgentPlatform agentPlatform, ConversationFactoryProvider conversationFactoryProvider) {
ConversationFactory factory = conversationFactoryProvider.getFactory(ConversationStoreType.STORED);
return AgentProcessChatbot.utilityFromPlatform(agentPlatform, factory);
}
}
6 changes: 2 additions & 4 deletions src/main/java/com/embabel/guide/rag/DataManager.java
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
package com.embabel.guide.rag;

import com.embabel.agent.api.common.LlmReference;
import com.embabel.agent.api.common.reference.LlmReferenceProviders;
import com.embabel.agent.api.identity.User;
import com.embabel.agent.api.reference.LlmReference;
import com.embabel.agent.api.reference.LlmReferenceProviders;
import com.embabel.agent.rag.ingestion.*;
import com.embabel.agent.rag.ingestion.policy.UrlSpecificContentRefreshPolicy;
import com.embabel.agent.rag.neo.drivine.DrivineStore;
Expand All @@ -14,8 +14,6 @@
import org.springframework.lang.NonNull;
import org.springframework.lang.Nullable;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.transaction.PlatformTransactionManager;

import java.util.Collections;
import java.util.List;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
package com.embabel.guide.chat.event

import com.embabel.chat.event.MessageEvent
import com.embabel.guide.chat.model.DeliveredMessage
import com.embabel.guide.chat.model.StatusMessage
import com.embabel.guide.chat.service.ChatService
import com.embabel.guide.domain.GuideUserRepository
import com.embabel.guide.util.UUIDv7
import org.slf4j.LoggerFactory
import org.springframework.context.event.EventListener
import org.springframework.stereotype.Component

/**
* Listens for MessageEvents and delivers messages to users via WebSocket.
*
* This decouples message persistence from WebSocket delivery:
* - ADDED: Message was added to conversation - send to recipient immediately
* - PERSISTENCE_FAILED: Log error for monitoring
*/
@Component
class MessageEventListener(
private val chatService: ChatService,
private val guideUserRepository: GuideUserRepository
) {
private val logger = LoggerFactory.getLogger(MessageEventListener::class.java)

@EventListener(condition = "#event.status.name() == 'ADDED'")
fun onMessageAdded(event: MessageEvent) {
val toGuideUserId = event.toUserId
if (toGuideUserId == null) {
logger.debug("MessageEvent has no toUserId, skipping WebSocket delivery for session {}", event.conversationId)
return
}

val message = event.message
if (message == null) {
logger.warn("MessageEvent ADDED has no message for session {}", event.conversationId)
return
}

// Look up the GuideUser to get their webUserId for WebSocket routing
val guideUser = guideUserRepository.findById(toGuideUserId).orElse(null)
if (guideUser == null) {
logger.warn("GuideUser not found for id {}, skipping WebSocket delivery", toGuideUserId)
return
}

val webUserId = guideUser.webUser?.id
if (webUserId == null) {
logger.debug("GuideUser {} has no webUser, skipping WebSocket delivery", toGuideUserId)
return
}

logger.debug("Delivering message to webUser {} (guideUser {}) for session {}",
webUserId, toGuideUserId, event.conversationId)

val delivered = DeliveredMessage(
id = UUIDv7.generateString(),
sessionId = event.conversationId,
role = message.role.name.lowercase(),
body = message.content,
ts = event.timestamp,
authorId = event.fromUserId,
title = event.title
)

chatService.sendToUser(webUserId, delivered)

// Send status update to clear typing indicator
event.fromUserId?.let { fromUserId ->
chatService.sendStatusToUser(webUserId, StatusMessage(fromUserId = fromUserId))
}
}

@EventListener(condition = "#event.status.name() == 'PERSISTENCE_FAILED'")
fun onPersistenceFailed(event: MessageEvent) {
logger.error(
"Message persistence failed for session {}, role={}, error={}",
event.conversationId,
event.role,
event.error?.message,
event.error
)
// Could notify user of failure, implement retry logic, etc.
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package com.embabel.guide.chat.model

import com.embabel.chat.store.model.StoredMessage
import com.embabel.chat.store.model.SimpleStoredMessage
import java.time.Instant

/**
Expand All @@ -16,13 +16,13 @@ data class DeliveredMessage(
val title: String? = null
) {
companion object {
fun createFrom(msg: StoredMessage, sessionId: String, title: String? = null): DeliveredMessage {
fun createFrom(msg: SimpleStoredMessage, sessionId: String, title: String? = null): DeliveredMessage {
return DeliveredMessage(
id = msg.messageId,
sessionId = sessionId,
role = msg.role,
role = msg.role.name.lowercase(),
body = msg.content,
ts = msg.createdAt,
ts = msg.message.createdAt,
authorId = msg.author?.id,
title = title
)
Expand Down
Loading