use go-fiber

This commit is contained in:
mudler
2024-04-07 20:13:40 +02:00
parent 59b91d1403
commit 8db36b4619
5 changed files with 315 additions and 36 deletions

View File

@@ -42,8 +42,8 @@
}
</style>
</head>
<body class="bg-gray-900 p-4 text-white">
<div class="chat-container bg-gray-800 shadow-lg rounded-lg" hx-ext="sse" sse-connect="/sse">
<body class="bg-gray-900 p-4 text-white" hx-ext="sse" sse-connect="/sse">
<div class="chat-container bg-gray-800 shadow-lg rounded-lg" >
<!-- Chat Header -->
<div class="border-b border-gray-700 p-4">
<h1 class="text-lg font-semibold">Talk to '{{.Character.Name}}'</h1>
@@ -68,7 +68,6 @@
</div>
</div>
<div sse-swap="messages" hx-swap="beforeend" id="messages" hx-on:htmx:after-settle="document.getElementById('messages').scrollIntoView(false)"></div>
<div sse-swap="message_status"></div>
</div>
<!-- Agent Status Box -->
@@ -82,9 +81,10 @@
<!-- Message Input -->
<div class="p-4 border-t border-gray-700">
<div sse-swap="message_status"></div>
<input id="inputMessage" name="message" type="text" hx-post="/chat" hx-target="#results" hx-indicator=".htmx-indicator"
class="p-2 border rounded w-full bg-gray-600 text-white placeholder-gray-300" placeholder="Type a message..." _="on htmx:afterRequest set my value to ''">
<div class="my-2 htmx-indicator" >Loading...</div>
<div class="my-2 htmx-indicator" ></div>
<div id="results" class="flex justify-center"></div>
</div>

View File

@@ -12,7 +12,9 @@ import (
"github.com/donseba/go-htmx"
"github.com/donseba/go-htmx/sse"
fiber "github.com/gofiber/fiber/v3"
external "github.com/mudler/local-agent-framework/external"
"github.com/valyala/fasthttp/fasthttpadaptor"
. "github.com/mudler/local-agent-framework/agent"
)
@@ -24,7 +26,7 @@ type (
)
var (
sseManager sse.Manager
sseManager Manager
)
var testModel = os.Getenv("TEST_MODEL")
var apiModel = os.Getenv("API_MODEL")
@@ -94,7 +96,7 @@ func main() {
defer agent.Stop()
agentInstance = agent
sseManager = sse.NewManager(5)
sseManager = NewManager(5)
go func() {
for {
@@ -105,40 +107,53 @@ func main() {
}
time.Sleep(1 * time.Second) // Send a message every seconds
sseManager.Send(sse.NewMessage(fmt.Sprintf("connected clients: %v", clientsStr)).WithEvent("clients"))
sseManager.Send(NewMessage(fmt.Sprintf("connected clients: %v", clientsStr)).WithEvent("clients"))
}
}()
go func() {
for {
time.Sleep(1 * time.Second) // Send a message every seconds
sseManager.Send(sse.NewMessage(
sseManager.Send(NewMessage(
htmlIfy(agent.State().String()),
).WithEvent("hud"))
}
}()
mux := http.NewServeMux()
// Initialize a new Fiber app
webapp := fiber.New()
mux.Handle("GET /", http.HandlerFunc(app.Home(agent)))
// Define a route for the GET method on the root path '/'
webapp.Get("/sse", func(c fiber.Ctx) error {
sseManager.Handle(c, NewClient(randStringRunes(10)))
return nil
})
webapp.Get("/notify", wrapHandler(http.HandlerFunc(app.Notify)))
webapp.Post("/chat", wrapHandler(http.HandlerFunc(app.Chat(sseManager))))
webapp.Get("/talk", wrapHandler(http.HandlerFunc(app.Home(agent))))
log.Fatal(webapp.Listen(":3000"))
// External notifications (e.g. webhook)
mux.Handle("POST /notify", http.HandlerFunc(app.Notify))
// mux := http.NewServeMux()
// User chat
mux.Handle("POST /chat", http.HandlerFunc(app.Chat(sseManager)))
// mux.Handle("GET /", http.HandlerFunc(app.Home(agent)))
// Server Sent Events
mux.Handle("GET /sse", http.HandlerFunc(app.SSE))
// // External notifications (e.g. webhook)
// mux.Handle("POST /notify", http.HandlerFunc(app.Notify))
fmt.Print("Server started at http://localhost:3210")
err = http.ListenAndServe(":3210", mux)
log.Fatal(err)
// // User chat
// mux.Handle("POST /chat", http.HandlerFunc(app.Chat(sseManager)))
// // Server Sent Events
// //mux.Handle("GET /sse", http.HandlerFunc(app.SSE))
// fmt.Print("Server started at http://localhost:3210")
// err = http.ListenAndServe(":3210", mux)
// log.Fatal(err)
}
func (a *App) Home(agent *Agent) func(w http.ResponseWriter, r *http.Request) {
return func(w http.ResponseWriter, r *http.Request) {
tmpl, err := template.ParseFiles("index.html")
tmpl, err := template.ParseFiles("chat.html")
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
@@ -153,11 +168,10 @@ func (a *App) Home(agent *Agent) func(w http.ResponseWriter, r *http.Request) {
}
}
func (a *App) SSE(w http.ResponseWriter, r *http.Request) {
cl := sse.NewClient(randStringRunes(10))
sseManager.Handle(w, r, cl)
}
// func (a *App) SSE(w http.ResponseWriter, r *http.Request) {
// cl := sse.NewClient(randStringRunes(10))
// sseManager.Handle(w, r, cl)
// }
func (a *App) Notify(w http.ResponseWriter, r *http.Request) {
query := strings.ToLower(r.PostFormValue("message"))
@@ -172,7 +186,14 @@ func (a *App) Notify(w http.ResponseWriter, r *http.Request) {
_, _ = w.Write([]byte("Message sent"))
}
func (a *App) Chat(m sse.Manager) func(w http.ResponseWriter, r *http.Request) {
func wrapHandler(f func(http.ResponseWriter, *http.Request)) func(ctx fiber.Ctx) error {
return func(ctx fiber.Ctx) error {
fasthttpadaptor.NewFastHTTPHandler(http.HandlerFunc(f))(ctx.Context())
return nil
}
}
func (a *App) Chat(m Manager) func(w http.ResponseWriter, r *http.Request) {
return func(w http.ResponseWriter, r *http.Request) {
query := strings.ToLower(r.PostFormValue("message"))
if query == "" {
@@ -180,7 +201,7 @@ func (a *App) Chat(m sse.Manager) func(w http.ResponseWriter, r *http.Request) {
return
}
m.Send(
sse.NewMessage(
NewMessage(
chatDiv(query, "gray"),
).WithEvent("messages"))

224
example/webui/sse.go Normal file
View File

@@ -0,0 +1,224 @@
package main
import (
"bufio"
"fmt"
"strings"
"sync"
"time"
"github.com/gofiber/fiber/v3"
"github.com/valyala/fasthttp"
)
type (
// Listener defines the interface for the receiving end.
Listener interface {
ID() string
Chan() chan Envelope
}
// Envelope defines the interface for content that can be broadcast to clients.
Envelope interface {
String() string // Represent the envelope contents as a string for transmission.
}
// Manager defines the interface for managing clients and broadcasting messages.
Manager interface {
Send(message Envelope)
Handle(ctx fiber.Ctx, cl Listener)
Clients() []string
}
History interface {
Add(message Envelope) // Add adds a message to the history.
Send(c Listener) // Send sends the history to a client.
}
)
type Client struct {
id string
ch chan Envelope
}
func NewClient(id string) Listener {
return &Client{
id: id,
ch: make(chan Envelope, 50),
}
}
func (c *Client) ID() string { return c.id }
func (c *Client) Chan() chan Envelope { return c.ch }
// Message represents a simple message implementation.
type Message struct {
Event string
Time time.Time
Data string
}
// NewMessage returns a new message instance.
func NewMessage(data string) *Message {
return &Message{
Data: data,
Time: time.Now(),
}
}
// String returns the message as a string.
func (m *Message) String() string {
sb := strings.Builder{}
if m.Event != "" {
sb.WriteString(fmt.Sprintf("event: %s\n", m.Event))
}
sb.WriteString(fmt.Sprintf("data: %v\n\n", m.Data))
return sb.String()
}
// WithEvent sets the event name for the message.
func (m *Message) WithEvent(event string) Envelope {
m.Event = event
return m
}
// broadcastManager manages the clients and broadcasts messages to them.
type broadcastManager struct {
clients sync.Map
broadcast chan Envelope
workerPoolSize int
messageHistory *history
}
// NewManager initializes and returns a new Manager instance.
func NewManager(workerPoolSize int) Manager {
manager := &broadcastManager{
broadcast: make(chan Envelope),
workerPoolSize: workerPoolSize,
messageHistory: newHistory(10),
}
manager.startWorkers()
return manager
}
// Send broadcasts a message to all connected clients.
func (manager *broadcastManager) Send(message Envelope) {
manager.broadcast <- message
}
// Handle sets up a new client and handles the connection.
func (manager *broadcastManager) Handle(c fiber.Ctx, cl Listener) {
manager.register(cl)
ctx := c.Context()
ctx.SetContentType("text/event-stream")
ctx.Response.Header.Set("Cache-Control", "no-cache")
ctx.Response.Header.Set("Connection", "keep-alive")
ctx.Response.Header.Set("Access-Control-Allow-Origin", "*")
ctx.Response.Header.Set("Access-Control-Allow-Headers", "Cache-Control")
ctx.Response.Header.Set("Access-Control-Allow-Credentials", "true")
// Send history to the newly connected client
manager.messageHistory.Send(cl)
ctx.SetBodyStreamWriter(fasthttp.StreamWriter(func(w *bufio.Writer) {
for {
select {
case msg, ok := <-cl.Chan():
if !ok {
// If the channel is closed, return from the function
return
}
_, err := fmt.Fprint(w, msg.String())
if err != nil {
// If an error occurs (e.g., client has disconnected), return from the function
return
}
w.Flush()
case <-ctx.Done():
manager.unregister(cl.ID())
close(cl.Chan())
return
}
}
}))
}
// Clients method to list connected client IDs
func (manager *broadcastManager) Clients() []string {
var clients []string
manager.clients.Range(func(key, value any) bool {
id, ok := key.(string)
if ok {
clients = append(clients, id)
}
return true
})
return clients
}
// startWorkers starts worker goroutines for message broadcasting.
func (manager *broadcastManager) startWorkers() {
for i := 0; i < manager.workerPoolSize; i++ {
go func() {
for message := range manager.broadcast {
manager.clients.Range(func(key, value any) bool {
client, ok := value.(Listener)
if !ok {
return true // Continue iteration
}
select {
case client.Chan() <- message:
manager.messageHistory.Add(message)
default:
// If the client's channel is full, drop the message
}
return true // Continue iteration
})
}
}()
}
}
// register adds a client to the manager.
func (manager *broadcastManager) register(client Listener) {
manager.clients.Store(client.ID(), client)
}
// unregister removes a client from the manager.
func (manager *broadcastManager) unregister(clientID string) {
manager.clients.Delete(clientID)
}
type history struct {
messages []Envelope
maxSize int // Maximum number of messages to retain
}
func newHistory(maxSize int) *history {
return &history{
messages: []Envelope{},
maxSize: maxSize,
}
}
func (h *history) Add(message Envelope) {
h.messages = append(h.messages, message)
// Ensure history does not exceed maxSize
if len(h.messages) > h.maxSize {
// Remove the oldest messages to fit the maxSize
h.messages = h.messages[len(h.messages)-h.maxSize:]
}
}
func (h *history) Send(c Listener) {
for _, msg := range h.messages {
c.Chan() <- msg
}
}