From 29beee605782a74f51f396bfd0032440e3e437ef Mon Sep 17 00:00:00 2001 From: Richard Palethorpe Date: Fri, 28 Mar 2025 14:10:10 +0000 Subject: [PATCH] fix(ui): SSE in React chat --- webui/app.go | 40 ++++++++++++++--------------- webui/react-ui/src/hooks/useChat.js | 12 ++++++++- webui/react-ui/src/hooks/useSSE.js | 2 +- 3 files changed, 32 insertions(+), 22 deletions(-) diff --git a/webui/app.go b/webui/app.go index 1f5b0bb..fcbdc81 100644 --- a/webui/app.go +++ b/webui/app.go @@ -318,7 +318,7 @@ func (a *App) ChatAPI(pool *state.AgentPool) func(c *fiber.Ctx) error { // Get agent name from URL parameter agentName := c.Params("name") - + // Validate message message := strings.TrimSpace(payload.Message) if message == "" { @@ -337,15 +337,15 @@ func (a *App) ChatAPI(pool *state.AgentPool) func(c *fiber.Ctx) error { // Get the SSE manager for this agent manager := pool.GetManager(agentName) - + // Create a unique message ID messageID := fmt.Sprintf("%d", time.Now().UnixNano()) - + // Send user message event via SSE userMessageData, err := json.Marshal(map[string]interface{}{ - "id": messageID + "-user", - "sender": "user", - "content": message, + "id": messageID + "-user", + "sender": "user", + "content": message, "timestamp": time.Now().Format(time.RFC3339), }) if err != nil { @@ -354,29 +354,29 @@ func (a *App) ChatAPI(pool *state.AgentPool) func(c *fiber.Ctx) error { manager.Send( sse.NewMessage(string(userMessageData)).WithEvent("json_message")) } - + // Send processing status statusData, err := json.Marshal(map[string]interface{}{ - "status": "processing", + "status": "processing", "timestamp": time.Now().Format(time.RFC3339), }) if err != nil { xlog.Error("Error marshaling status message", "error", err) } else { manager.Send( - sse.NewMessage(string(statusData)).WithEvent("status")) + sse.NewMessage(string(statusData)).WithEvent("json_status")) } - + // Process the message asynchronously go func() { // Ask the agent for a response response := agent.Ask(coreTypes.WithText(message)) - + if response.Error != nil { // Send error message xlog.Error("Error asking agent", "agent", agentName, "error", response.Error) errorData, err := json.Marshal(map[string]interface{}{ - "error": response.Error.Error(), + "error": response.Error.Error(), "timestamp": time.Now().Format(time.RFC3339), }) if err != nil { @@ -389,9 +389,9 @@ func (a *App) ChatAPI(pool *state.AgentPool) func(c *fiber.Ctx) error { // Send agent response xlog.Info("Response from agent", "agent", agentName, "response", response.Response) responseData, err := json.Marshal(map[string]interface{}{ - "id": messageID + "-agent", - "sender": "agent", - "content": response.Response, + "id": messageID + "-agent", + "sender": "agent", + "content": response.Response, "timestamp": time.Now().Format(time.RFC3339), }) if err != nil { @@ -401,23 +401,23 @@ func (a *App) ChatAPI(pool *state.AgentPool) func(c *fiber.Ctx) error { sse.NewMessage(string(responseData)).WithEvent("json_message")) } } - + // Send completed status completedData, err := json.Marshal(map[string]interface{}{ - "status": "completed", + "status": "completed", "timestamp": time.Now().Format(time.RFC3339), }) if err != nil { xlog.Error("Error marshaling completed status", "error", err) } else { manager.Send( - sse.NewMessage(string(completedData)).WithEvent("status")) + sse.NewMessage(string(completedData)).WithEvent("json_status")) } }() - + // Return immediate success response return c.Status(fiber.StatusAccepted).JSON(map[string]interface{}{ - "status": "message_received", + "status": "message_received", "message_id": messageID, }) } diff --git a/webui/react-ui/src/hooks/useChat.js b/webui/react-ui/src/hooks/useChat.js index 3252fe4..759097b 100644 --- a/webui/react-ui/src/hooks/useChat.js +++ b/webui/react-ui/src/hooks/useChat.js @@ -12,6 +12,7 @@ export function useChat(agentName) { const [sending, setSending] = useState(false); const [error, setError] = useState(null); const processedMessageIds = useRef(new Set()); + const localMessageContents = useRef(new Set()); // Track locally added message contents // Use SSE hook to receive real-time messages const { messages: sseMessages, statusUpdates, errorMessages, isConnected } = useSSE(agentName); @@ -42,6 +43,11 @@ export function useChat(agentName) { // Add to processed set to avoid duplicates processedMessageIds.current.add(messageData.id); + // Skip user messages that we've already added locally + if (messageData.sender === 'user' && localMessageContents.current.has(messageData.content)) { + return; + } + // Add the message to our state setMessages(prev => [...prev, { id: messageData.id, @@ -115,6 +121,9 @@ export function useChat(agentName) { setMessages(prev => [...prev, userMessage]); + // Track this message content to avoid duplication from SSE + localMessageContents.current.add(content); + try { // Use the JSON-based API endpoint await chatApi.sendMessage(agentName, content); @@ -131,6 +140,7 @@ export function useChat(agentName) { const clearChat = useCallback(() => { setMessages([]); processedMessageIds.current.clear(); + localMessageContents.current.clear(); // Clear tracked local messages }, []); return { @@ -141,4 +151,4 @@ export function useChat(agentName) { sendMessage, clearChat, }; -} +} \ No newline at end of file diff --git a/webui/react-ui/src/hooks/useSSE.js b/webui/react-ui/src/hooks/useSSE.js index 2218c77..262b885 100644 --- a/webui/react-ui/src/hooks/useSSE.js +++ b/webui/react-ui/src/hooks/useSSE.js @@ -23,7 +23,7 @@ export function useSSE(agentName) { } // Create a new EventSource connection - const sseUrl = `${API_CONFIG.baseUrl}${API_CONFIG.endpoints.sse(agentName)}`; + const sseUrl = new URL(`${API_CONFIG.endpoints.sse(agentName)}`, window.location.origin).href; const eventSource = new EventSource(sseUrl); eventSourceRef.current = eventSource;