diff --git a/example/webui/index.html b/example/webui/chat.html similarity index 94% rename from example/webui/index.html rename to example/webui/chat.html index d67d566..360b14a 100644 --- a/example/webui/index.html +++ b/example/webui/chat.html @@ -42,8 +42,8 @@ } - -
+ +

Talk to '{{.Character.Name}}'

@@ -68,7 +68,6 @@
-
@@ -82,9 +81,10 @@
+
-
Loading...
+
diff --git a/example/webui/main.go b/example/webui/main.go index 102f566..5ff68af 100644 --- a/example/webui/main.go +++ b/example/webui/main.go @@ -12,7 +12,9 @@ import ( "github.com/donseba/go-htmx" "github.com/donseba/go-htmx/sse" + fiber "github.com/gofiber/fiber/v3" external "github.com/mudler/local-agent-framework/external" + "github.com/valyala/fasthttp/fasthttpadaptor" . "github.com/mudler/local-agent-framework/agent" ) @@ -24,7 +26,7 @@ type ( ) var ( - sseManager sse.Manager + sseManager Manager ) var testModel = os.Getenv("TEST_MODEL") var apiModel = os.Getenv("API_MODEL") @@ -94,7 +96,7 @@ func main() { defer agent.Stop() agentInstance = agent - sseManager = sse.NewManager(5) + sseManager = NewManager(5) go func() { for { @@ -105,40 +107,53 @@ func main() { } time.Sleep(1 * time.Second) // Send a message every seconds - sseManager.Send(sse.NewMessage(fmt.Sprintf("connected clients: %v", clientsStr)).WithEvent("clients")) + sseManager.Send(NewMessage(fmt.Sprintf("connected clients: %v", clientsStr)).WithEvent("clients")) } }() go func() { for { time.Sleep(1 * time.Second) // Send a message every seconds - sseManager.Send(sse.NewMessage( + sseManager.Send(NewMessage( htmlIfy(agent.State().String()), ).WithEvent("hud")) } }() - mux := http.NewServeMux() + // Initialize a new Fiber app + webapp := fiber.New() - mux.Handle("GET /", http.HandlerFunc(app.Home(agent))) + // Define a route for the GET method on the root path '/' + webapp.Get("/sse", func(c fiber.Ctx) error { + sseManager.Handle(c, NewClient(randStringRunes(10))) + return nil + }) + webapp.Get("/notify", wrapHandler(http.HandlerFunc(app.Notify))) + webapp.Post("/chat", wrapHandler(http.HandlerFunc(app.Chat(sseManager)))) + webapp.Get("/talk", wrapHandler(http.HandlerFunc(app.Home(agent)))) + log.Fatal(webapp.Listen(":3000")) - // External notifications (e.g. webhook) - mux.Handle("POST /notify", http.HandlerFunc(app.Notify)) + // mux := http.NewServeMux() - // User chat - mux.Handle("POST /chat", http.HandlerFunc(app.Chat(sseManager))) + // mux.Handle("GET /", http.HandlerFunc(app.Home(agent))) - // Server Sent Events - mux.Handle("GET /sse", http.HandlerFunc(app.SSE)) + // // External notifications (e.g. webhook) + // mux.Handle("POST /notify", http.HandlerFunc(app.Notify)) - fmt.Print("Server started at http://localhost:3210") - err = http.ListenAndServe(":3210", mux) - log.Fatal(err) + // // User chat + // mux.Handle("POST /chat", http.HandlerFunc(app.Chat(sseManager))) + + // // Server Sent Events + // //mux.Handle("GET /sse", http.HandlerFunc(app.SSE)) + + // fmt.Print("Server started at http://localhost:3210") + // err = http.ListenAndServe(":3210", mux) + // log.Fatal(err) } func (a *App) Home(agent *Agent) func(w http.ResponseWriter, r *http.Request) { return func(w http.ResponseWriter, r *http.Request) { - tmpl, err := template.ParseFiles("index.html") + tmpl, err := template.ParseFiles("chat.html") if err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) return @@ -153,11 +168,10 @@ func (a *App) Home(agent *Agent) func(w http.ResponseWriter, r *http.Request) { } } -func (a *App) SSE(w http.ResponseWriter, r *http.Request) { - cl := sse.NewClient(randStringRunes(10)) - - sseManager.Handle(w, r, cl) -} +// func (a *App) SSE(w http.ResponseWriter, r *http.Request) { +// cl := sse.NewClient(randStringRunes(10)) +// sseManager.Handle(w, r, cl) +// } func (a *App) Notify(w http.ResponseWriter, r *http.Request) { query := strings.ToLower(r.PostFormValue("message")) @@ -172,7 +186,14 @@ func (a *App) Notify(w http.ResponseWriter, r *http.Request) { _, _ = w.Write([]byte("Message sent")) } -func (a *App) Chat(m sse.Manager) func(w http.ResponseWriter, r *http.Request) { +func wrapHandler(f func(http.ResponseWriter, *http.Request)) func(ctx fiber.Ctx) error { + return func(ctx fiber.Ctx) error { + fasthttpadaptor.NewFastHTTPHandler(http.HandlerFunc(f))(ctx.Context()) + return nil + } +} + +func (a *App) Chat(m Manager) func(w http.ResponseWriter, r *http.Request) { return func(w http.ResponseWriter, r *http.Request) { query := strings.ToLower(r.PostFormValue("message")) if query == "" { @@ -180,7 +201,7 @@ func (a *App) Chat(m sse.Manager) func(w http.ResponseWriter, r *http.Request) { return } m.Send( - sse.NewMessage( + NewMessage( chatDiv(query, "gray"), ).WithEvent("messages")) diff --git a/example/webui/sse.go b/example/webui/sse.go new file mode 100644 index 0000000..391d65b --- /dev/null +++ b/example/webui/sse.go @@ -0,0 +1,224 @@ +package main + +import ( + "bufio" + "fmt" + "strings" + "sync" + "time" + + "github.com/gofiber/fiber/v3" + "github.com/valyala/fasthttp" +) + +type ( + // Listener defines the interface for the receiving end. + Listener interface { + ID() string + Chan() chan Envelope + } + + // Envelope defines the interface for content that can be broadcast to clients. + Envelope interface { + String() string // Represent the envelope contents as a string for transmission. + } + + // Manager defines the interface for managing clients and broadcasting messages. + Manager interface { + Send(message Envelope) + Handle(ctx fiber.Ctx, cl Listener) + Clients() []string + } + + History interface { + Add(message Envelope) // Add adds a message to the history. + Send(c Listener) // Send sends the history to a client. + } +) + +type Client struct { + id string + ch chan Envelope +} + +func NewClient(id string) Listener { + return &Client{ + id: id, + ch: make(chan Envelope, 50), + } +} + +func (c *Client) ID() string { return c.id } +func (c *Client) Chan() chan Envelope { return c.ch } + +// Message represents a simple message implementation. +type Message struct { + Event string + Time time.Time + Data string +} + +// NewMessage returns a new message instance. +func NewMessage(data string) *Message { + return &Message{ + Data: data, + Time: time.Now(), + } +} + +// String returns the message as a string. +func (m *Message) String() string { + sb := strings.Builder{} + + if m.Event != "" { + sb.WriteString(fmt.Sprintf("event: %s\n", m.Event)) + } + sb.WriteString(fmt.Sprintf("data: %v\n\n", m.Data)) + + return sb.String() +} + +// WithEvent sets the event name for the message. +func (m *Message) WithEvent(event string) Envelope { + m.Event = event + return m +} + +// broadcastManager manages the clients and broadcasts messages to them. +type broadcastManager struct { + clients sync.Map + broadcast chan Envelope + workerPoolSize int + messageHistory *history +} + +// NewManager initializes and returns a new Manager instance. +func NewManager(workerPoolSize int) Manager { + manager := &broadcastManager{ + broadcast: make(chan Envelope), + workerPoolSize: workerPoolSize, + messageHistory: newHistory(10), + } + + manager.startWorkers() + + return manager +} + +// Send broadcasts a message to all connected clients. +func (manager *broadcastManager) Send(message Envelope) { + manager.broadcast <- message +} + +// Handle sets up a new client and handles the connection. +func (manager *broadcastManager) Handle(c fiber.Ctx, cl Listener) { + + manager.register(cl) + ctx := c.Context() + + ctx.SetContentType("text/event-stream") + ctx.Response.Header.Set("Cache-Control", "no-cache") + ctx.Response.Header.Set("Connection", "keep-alive") + ctx.Response.Header.Set("Access-Control-Allow-Origin", "*") + ctx.Response.Header.Set("Access-Control-Allow-Headers", "Cache-Control") + ctx.Response.Header.Set("Access-Control-Allow-Credentials", "true") + + // Send history to the newly connected client + manager.messageHistory.Send(cl) + ctx.SetBodyStreamWriter(fasthttp.StreamWriter(func(w *bufio.Writer) { + for { + select { + case msg, ok := <-cl.Chan(): + if !ok { + // If the channel is closed, return from the function + return + } + _, err := fmt.Fprint(w, msg.String()) + if err != nil { + // If an error occurs (e.g., client has disconnected), return from the function + return + } + + w.Flush() + + case <-ctx.Done(): + manager.unregister(cl.ID()) + close(cl.Chan()) + return + } + } + })) +} + +// Clients method to list connected client IDs +func (manager *broadcastManager) Clients() []string { + var clients []string + manager.clients.Range(func(key, value any) bool { + id, ok := key.(string) + if ok { + clients = append(clients, id) + } + return true + }) + return clients +} + +// startWorkers starts worker goroutines for message broadcasting. +func (manager *broadcastManager) startWorkers() { + for i := 0; i < manager.workerPoolSize; i++ { + go func() { + for message := range manager.broadcast { + manager.clients.Range(func(key, value any) bool { + client, ok := value.(Listener) + if !ok { + return true // Continue iteration + } + select { + case client.Chan() <- message: + manager.messageHistory.Add(message) + default: + // If the client's channel is full, drop the message + } + return true // Continue iteration + }) + } + }() + } +} + +// register adds a client to the manager. +func (manager *broadcastManager) register(client Listener) { + manager.clients.Store(client.ID(), client) +} + +// unregister removes a client from the manager. +func (manager *broadcastManager) unregister(clientID string) { + manager.clients.Delete(clientID) +} + +type history struct { + messages []Envelope + maxSize int // Maximum number of messages to retain +} + +func newHistory(maxSize int) *history { + return &history{ + messages: []Envelope{}, + maxSize: maxSize, + } +} + +func (h *history) Add(message Envelope) { + h.messages = append(h.messages, message) + // Ensure history does not exceed maxSize + if len(h.messages) > h.maxSize { + // Remove the oldest messages to fit the maxSize + h.messages = h.messages[len(h.messages)-h.maxSize:] + } +} + +func (h *history) Send(c Listener) { + for _, msg := range h.messages { + c.Chan() <- msg + } +} diff --git a/go.mod b/go.mod index 8040f41..1b6b46f 100644 --- a/go.mod +++ b/go.mod @@ -5,22 +5,32 @@ go 1.22 toolchain go1.22.2 require ( + github.com/donseba/go-htmx v1.8.0 + github.com/gofiber/fiber/v3 v3.0.0-20240405062939-c8c51ee78331 github.com/onsi/ginkgo/v2 v2.15.0 github.com/onsi/gomega v1.31.1 + github.com/sap-nocops/duckduckgogo v0.0.0-20201102135645-176990152850 github.com/sashabaranov/go-openai v1.18.3 + github.com/valyala/fasthttp v1.52.0 ) require ( github.com/PuerkitoBio/goquery v1.6.0 // indirect + github.com/andybalholm/brotli v1.1.0 // indirect github.com/andybalholm/cascadia v1.1.0 // indirect - github.com/donseba/go-htmx v1.8.0 // indirect github.com/go-logr/logr v1.3.0 // indirect github.com/go-task/slim-sprig v0.0.0-20230315185526-52ccab3ef572 // indirect + github.com/gofiber/utils/v2 v2.0.0-beta.4 // indirect github.com/google/go-cmp v0.6.0 // indirect github.com/google/pprof v0.0.0-20210407192527-94a9f03dee38 // indirect - github.com/sap-nocops/duckduckgogo v0.0.0-20201102135645-176990152850 // indirect - golang.org/x/net v0.19.0 // indirect - golang.org/x/sys v0.15.0 // indirect + github.com/google/uuid v1.6.0 // indirect + github.com/klauspost/compress v1.17.7 // indirect + github.com/mattn/go-colorable v0.1.13 // indirect + github.com/mattn/go-isatty v0.0.20 // indirect + github.com/valyala/bytebufferpool v1.0.0 // indirect + github.com/valyala/tcplisten v1.0.0 // indirect + golang.org/x/net v0.21.0 // indirect + golang.org/x/sys v0.19.0 // indirect golang.org/x/text v0.14.0 // indirect golang.org/x/tools v0.16.1 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect diff --git a/go.sum b/go.sum index 1cdbed1..08547df 100644 --- a/go.sum +++ b/go.sum @@ -1,5 +1,7 @@ github.com/PuerkitoBio/goquery v1.6.0 h1:j7taAbelrdcsOlGeMenZxc2AWXD5fieT1/znArdnx94= github.com/PuerkitoBio/goquery v1.6.0/go.mod h1:GsLWisAFVj4WgDibEWF4pvYnkVQBpKBKeU+7zCJoLcc= +github.com/andybalholm/brotli v1.1.0 h1:eLKJA0d02Lf0mVpIDgYnqXcUn0GqVmEFny3VuID1U3M= +github.com/andybalholm/brotli v1.1.0/go.mod h1:sms7XGricyQI9K10gOSf56VKKWS4oLer58Q+mhRPtnY= github.com/andybalholm/cascadia v1.1.0 h1:BuuO6sSfQNFRu1LppgbD25Hr2vLYW25JvxHs5zzsLTo= github.com/andybalholm/cascadia v1.1.0/go.mod h1:GsXiBklL0woXo1j/WYWtSYYC4ouU9PqHO0sqidkEA4Y= github.com/chzyer/logex v1.1.10/go.mod h1:+Ywpsq7O8HXn0nuIou7OrIPyXbp3wmkHB+jjWRnGsAI= @@ -14,13 +16,26 @@ github.com/go-logr/logr v1.3.0 h1:2y3SDp0ZXuc6/cjLSZ+Q3ir+QB9T/iG5yYRXqsagWSY= github.com/go-logr/logr v1.3.0/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= github.com/go-task/slim-sprig v0.0.0-20230315185526-52ccab3ef572 h1:tfuBGBXKqDEevZMzYi5KSi8KkcZtzBcTgAUUtapy0OI= github.com/go-task/slim-sprig v0.0.0-20230315185526-52ccab3ef572/go.mod h1:9Pwr4B2jHnOSGXyyzV8ROjYa2ojvAY6HCGYYfMoC3Ls= +github.com/gofiber/fiber/v3 v3.0.0-20240405062939-c8c51ee78331 h1:kDxTNPKMIRz8q28+tJHL2p87Cjtmkfn/OsLfastmpaY= +github.com/gofiber/fiber/v3 v3.0.0-20240405062939-c8c51ee78331/go.mod h1:w7sdfTY0okjZ1oVH6rSOGvuACUIt0By1iK0HKUb3uqM= +github.com/gofiber/utils/v2 v2.0.0-beta.4 h1:1gjbVFFwVwUb9arPcqiB6iEjHBwo7cHsyS41NeIW3co= +github.com/gofiber/utils/v2 v2.0.0-beta.4/go.mod h1:sdRsPU1FXX6YiDGGxd+q2aPJRMzpsxdzCXo9dz+xtOY= github.com/golang/protobuf v1.5.3 h1:KhyjKVUg7Usr/dYsdSqoFveMYd5ko72D+zANwlG1mmg= github.com/golang/protobuf v1.5.3/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY= github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= github.com/google/pprof v0.0.0-20210407192527-94a9f03dee38 h1:yAJXTCF9TqKcTiHJAE8dj7HMvPfh66eeA2JYW7eFpSE= github.com/google/pprof v0.0.0-20210407192527-94a9f03dee38/go.mod h1:kpwsk12EmLew5upagYY7GY0pfYCcupk39gWOCRROcvE= +github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= +github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/ianlancetaylor/demangle v0.0.0-20200824232613-28f6c0f3b639/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc= +github.com/klauspost/compress v1.17.7 h1:ehO88t2UGzQK66LMdE8tibEd1ErmzZjNEqWkjLAKQQg= +github.com/klauspost/compress v1.17.7/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw= +github.com/mattn/go-colorable v0.1.13 h1:fFA4WZxdEF4tXPZVKMLwD8oUnCTTo08duU7wxecdEvA= +github.com/mattn/go-colorable v0.1.13/go.mod h1:7S9/ev0klgBDR4GtXTXX8a3vIGJpMovkB8vQcUbaXHg= +github.com/mattn/go-isatty v0.0.16/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM= +github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY= +github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y= github.com/onsi/ginkgo/v2 v2.15.0 h1:79HwNRBAZHOEwrczrgSOPy+eFTTlIGELKy5as+ClttY= github.com/onsi/ginkgo/v2 v2.15.0/go.mod h1:HlxMHtYF57y6Dpf+mc5529KKmSq9h2FpCF+/ZkwUxKM= github.com/onsi/gomega v1.31.1 h1:KYppCUK+bUgAZwHOu7EXVBKyQA6ILvOESHkn/tgoqvo= @@ -32,17 +47,26 @@ github.com/sap-nocops/duckduckgogo v0.0.0-20201102135645-176990152850/go.mod h1: github.com/sashabaranov/go-openai v1.18.3 h1:dspFGkmZbhjg1059KhqLYSV2GaCiRIn+bOu50TlXUq8= github.com/sashabaranov/go-openai v1.18.3/go.mod h1:lj5b/K+zjTSFxVLijLSTDZuP7adOgerWeFyZLUhAKRg= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= -github.com/stretchr/testify v1.6.1 h1:hDPOHmpOpP40lSULcqw7IrRb/u7w6RpDC9399XyoNd0= github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= +github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= +github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6KllzawFIhcdPw= +github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc= +github.com/valyala/fasthttp v1.52.0 h1:wqBQpxH71XW0e2g+Og4dzQM8pk34aFYlA1Ga8db7gU0= +github.com/valyala/fasthttp v1.52.0/go.mod h1:hf5C4QnVMkNXMspnsUlfM3WitlgYflyhHYoKol/szxQ= +github.com/valyala/tcplisten v1.0.0 h1:rBHj/Xf+E1tRGZyWIWwJDiRY0zc1Js+CV5DqwacVSA8= +github.com/valyala/tcplisten v1.0.0/go.mod h1:T0xQ8SeCZGxckz9qRXTfG43PvQ/mcWh7FwZEA7Ioqkc= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/net v0.0.0-20180218175443-cbe0f9307d01/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20200202094626-16171245cfb2/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= -golang.org/x/net v0.19.0 h1:zTwKpTd2XuCqf8huc7Fo2iSy+4RHPd10s4KzeTnVr1c= -golang.org/x/net v0.19.0/go.mod h1:CfAk/cbD4CthTvqiEl8NpboMuiuOYsAr/7NOjZJtv1U= +golang.org/x/net v0.21.0 h1:AQyQV4dYCvJ7vGmJyKki9+PBdyvhkSd8EIx/qb0AYv4= +golang.org/x/net v0.21.0/go.mod h1:bIjVDfnllIU7BJ2DNgfnXvpSvtn8VRwhlsaeUTyUS44= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20191204072324-ce4227a45e2e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.15.0 h1:h48lPFYpsTvQJZF4EKyI4aLHaev3CxivZmv7yZig9pc= -golang.org/x/sys v0.15.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.19.0 h1:q5f1RH2jigJ1MoAWp2KTp3gm5zAGFUTarQZ5U386+4o= +golang.org/x/sys v0.19.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ= golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU=