fix(ui): SSE in React chat

This commit is contained in:
Richard Palethorpe
2025-03-28 14:10:10 +00:00
parent d672842a81
commit 29beee6057
3 changed files with 32 additions and 22 deletions

View File

@@ -318,7 +318,7 @@ func (a *App) ChatAPI(pool *state.AgentPool) func(c *fiber.Ctx) error {
// Get agent name from URL parameter // Get agent name from URL parameter
agentName := c.Params("name") agentName := c.Params("name")
// Validate message // Validate message
message := strings.TrimSpace(payload.Message) message := strings.TrimSpace(payload.Message)
if message == "" { if message == "" {
@@ -337,15 +337,15 @@ func (a *App) ChatAPI(pool *state.AgentPool) func(c *fiber.Ctx) error {
// Get the SSE manager for this agent // Get the SSE manager for this agent
manager := pool.GetManager(agentName) manager := pool.GetManager(agentName)
// Create a unique message ID // Create a unique message ID
messageID := fmt.Sprintf("%d", time.Now().UnixNano()) messageID := fmt.Sprintf("%d", time.Now().UnixNano())
// Send user message event via SSE // Send user message event via SSE
userMessageData, err := json.Marshal(map[string]interface{}{ userMessageData, err := json.Marshal(map[string]interface{}{
"id": messageID + "-user", "id": messageID + "-user",
"sender": "user", "sender": "user",
"content": message, "content": message,
"timestamp": time.Now().Format(time.RFC3339), "timestamp": time.Now().Format(time.RFC3339),
}) })
if err != nil { if err != nil {
@@ -354,29 +354,29 @@ func (a *App) ChatAPI(pool *state.AgentPool) func(c *fiber.Ctx) error {
manager.Send( manager.Send(
sse.NewMessage(string(userMessageData)).WithEvent("json_message")) sse.NewMessage(string(userMessageData)).WithEvent("json_message"))
} }
// Send processing status // Send processing status
statusData, err := json.Marshal(map[string]interface{}{ statusData, err := json.Marshal(map[string]interface{}{
"status": "processing", "status": "processing",
"timestamp": time.Now().Format(time.RFC3339), "timestamp": time.Now().Format(time.RFC3339),
}) })
if err != nil { if err != nil {
xlog.Error("Error marshaling status message", "error", err) xlog.Error("Error marshaling status message", "error", err)
} else { } else {
manager.Send( manager.Send(
sse.NewMessage(string(statusData)).WithEvent("status")) sse.NewMessage(string(statusData)).WithEvent("json_status"))
} }
// Process the message asynchronously // Process the message asynchronously
go func() { go func() {
// Ask the agent for a response // Ask the agent for a response
response := agent.Ask(coreTypes.WithText(message)) response := agent.Ask(coreTypes.WithText(message))
if response.Error != nil { if response.Error != nil {
// Send error message // Send error message
xlog.Error("Error asking agent", "agent", agentName, "error", response.Error) xlog.Error("Error asking agent", "agent", agentName, "error", response.Error)
errorData, err := json.Marshal(map[string]interface{}{ errorData, err := json.Marshal(map[string]interface{}{
"error": response.Error.Error(), "error": response.Error.Error(),
"timestamp": time.Now().Format(time.RFC3339), "timestamp": time.Now().Format(time.RFC3339),
}) })
if err != nil { if err != nil {
@@ -389,9 +389,9 @@ func (a *App) ChatAPI(pool *state.AgentPool) func(c *fiber.Ctx) error {
// Send agent response // Send agent response
xlog.Info("Response from agent", "agent", agentName, "response", response.Response) xlog.Info("Response from agent", "agent", agentName, "response", response.Response)
responseData, err := json.Marshal(map[string]interface{}{ responseData, err := json.Marshal(map[string]interface{}{
"id": messageID + "-agent", "id": messageID + "-agent",
"sender": "agent", "sender": "agent",
"content": response.Response, "content": response.Response,
"timestamp": time.Now().Format(time.RFC3339), "timestamp": time.Now().Format(time.RFC3339),
}) })
if err != nil { if err != nil {
@@ -401,23 +401,23 @@ func (a *App) ChatAPI(pool *state.AgentPool) func(c *fiber.Ctx) error {
sse.NewMessage(string(responseData)).WithEvent("json_message")) sse.NewMessage(string(responseData)).WithEvent("json_message"))
} }
} }
// Send completed status // Send completed status
completedData, err := json.Marshal(map[string]interface{}{ completedData, err := json.Marshal(map[string]interface{}{
"status": "completed", "status": "completed",
"timestamp": time.Now().Format(time.RFC3339), "timestamp": time.Now().Format(time.RFC3339),
}) })
if err != nil { if err != nil {
xlog.Error("Error marshaling completed status", "error", err) xlog.Error("Error marshaling completed status", "error", err)
} else { } else {
manager.Send( manager.Send(
sse.NewMessage(string(completedData)).WithEvent("status")) sse.NewMessage(string(completedData)).WithEvent("json_status"))
} }
}() }()
// Return immediate success response // Return immediate success response
return c.Status(fiber.StatusAccepted).JSON(map[string]interface{}{ return c.Status(fiber.StatusAccepted).JSON(map[string]interface{}{
"status": "message_received", "status": "message_received",
"message_id": messageID, "message_id": messageID,
}) })
} }

View File

@@ -12,6 +12,7 @@ export function useChat(agentName) {
const [sending, setSending] = useState(false); const [sending, setSending] = useState(false);
const [error, setError] = useState(null); const [error, setError] = useState(null);
const processedMessageIds = useRef(new Set()); const processedMessageIds = useRef(new Set());
const localMessageContents = useRef(new Set()); // Track locally added message contents
// Use SSE hook to receive real-time messages // Use SSE hook to receive real-time messages
const { messages: sseMessages, statusUpdates, errorMessages, isConnected } = useSSE(agentName); const { messages: sseMessages, statusUpdates, errorMessages, isConnected } = useSSE(agentName);
@@ -42,6 +43,11 @@ export function useChat(agentName) {
// Add to processed set to avoid duplicates // Add to processed set to avoid duplicates
processedMessageIds.current.add(messageData.id); processedMessageIds.current.add(messageData.id);
// Skip user messages that we've already added locally
if (messageData.sender === 'user' && localMessageContents.current.has(messageData.content)) {
return;
}
// Add the message to our state // Add the message to our state
setMessages(prev => [...prev, { setMessages(prev => [...prev, {
id: messageData.id, id: messageData.id,
@@ -115,6 +121,9 @@ export function useChat(agentName) {
setMessages(prev => [...prev, userMessage]); setMessages(prev => [...prev, userMessage]);
// Track this message content to avoid duplication from SSE
localMessageContents.current.add(content);
try { try {
// Use the JSON-based API endpoint // Use the JSON-based API endpoint
await chatApi.sendMessage(agentName, content); await chatApi.sendMessage(agentName, content);
@@ -131,6 +140,7 @@ export function useChat(agentName) {
const clearChat = useCallback(() => { const clearChat = useCallback(() => {
setMessages([]); setMessages([]);
processedMessageIds.current.clear(); processedMessageIds.current.clear();
localMessageContents.current.clear(); // Clear tracked local messages
}, []); }, []);
return { return {
@@ -141,4 +151,4 @@ export function useChat(agentName) {
sendMessage, sendMessage,
clearChat, clearChat,
}; };
} }

View File

@@ -23,7 +23,7 @@ export function useSSE(agentName) {
} }
// Create a new EventSource connection // Create a new EventSource connection
const sseUrl = `${API_CONFIG.baseUrl}${API_CONFIG.endpoints.sse(agentName)}`; const sseUrl = new URL(`${API_CONFIG.endpoints.sse(agentName)}`, window.location.origin).href;
const eventSource = new EventSource(sseUrl); const eventSource = new EventSource(sseUrl);
eventSourceRef.current = eventSource; eventSourceRef.current = eventSource;