225 lines
5.1 KiB
Go
225 lines
5.1 KiB
Go
package main
|
|
|
|
import (
|
|
"bufio"
|
|
"fmt"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/gofiber/fiber/v3"
|
|
"github.com/valyala/fasthttp"
|
|
)
|
|
|
|
type (
|
|
// Listener defines the interface for the receiving end.
|
|
Listener interface {
|
|
ID() string
|
|
Chan() chan Envelope
|
|
}
|
|
|
|
// Envelope defines the interface for content that can be broadcast to clients.
|
|
Envelope interface {
|
|
String() string // Represent the envelope contents as a string for transmission.
|
|
}
|
|
|
|
// Manager defines the interface for managing clients and broadcasting messages.
|
|
Manager interface {
|
|
Send(message Envelope)
|
|
Handle(ctx fiber.Ctx, cl Listener)
|
|
Clients() []string
|
|
}
|
|
|
|
History interface {
|
|
Add(message Envelope) // Add adds a message to the history.
|
|
Send(c Listener) // Send sends the history to a client.
|
|
}
|
|
)
|
|
|
|
type Client struct {
|
|
id string
|
|
ch chan Envelope
|
|
}
|
|
|
|
func NewClient(id string) Listener {
|
|
return &Client{
|
|
id: id,
|
|
ch: make(chan Envelope, 50),
|
|
}
|
|
}
|
|
|
|
func (c *Client) ID() string { return c.id }
|
|
func (c *Client) Chan() chan Envelope { return c.ch }
|
|
|
|
// Message represents a simple message implementation.
|
|
type Message struct {
|
|
Event string
|
|
Time time.Time
|
|
Data string
|
|
}
|
|
|
|
// NewMessage returns a new message instance.
|
|
func NewMessage(data string) *Message {
|
|
return &Message{
|
|
Data: data,
|
|
Time: time.Now(),
|
|
}
|
|
}
|
|
|
|
// String returns the message as a string.
|
|
func (m *Message) String() string {
|
|
sb := strings.Builder{}
|
|
|
|
if m.Event != "" {
|
|
sb.WriteString(fmt.Sprintf("event: %s\n", m.Event))
|
|
}
|
|
sb.WriteString(fmt.Sprintf("data: %v\n\n", m.Data))
|
|
|
|
return sb.String()
|
|
}
|
|
|
|
// WithEvent sets the event name for the message.
|
|
func (m *Message) WithEvent(event string) Envelope {
|
|
m.Event = event
|
|
return m
|
|
}
|
|
|
|
// broadcastManager manages the clients and broadcasts messages to them.
|
|
type broadcastManager struct {
|
|
clients sync.Map
|
|
broadcast chan Envelope
|
|
workerPoolSize int
|
|
messageHistory *history
|
|
}
|
|
|
|
// NewManager initializes and returns a new Manager instance.
|
|
func NewManager(workerPoolSize int) Manager {
|
|
manager := &broadcastManager{
|
|
broadcast: make(chan Envelope),
|
|
workerPoolSize: workerPoolSize,
|
|
messageHistory: newHistory(10),
|
|
}
|
|
|
|
manager.startWorkers()
|
|
|
|
return manager
|
|
}
|
|
|
|
// Send broadcasts a message to all connected clients.
|
|
func (manager *broadcastManager) Send(message Envelope) {
|
|
manager.broadcast <- message
|
|
}
|
|
|
|
// Handle sets up a new client and handles the connection.
|
|
func (manager *broadcastManager) Handle(c fiber.Ctx, cl Listener) {
|
|
|
|
manager.register(cl)
|
|
ctx := c.Context()
|
|
|
|
ctx.SetContentType("text/event-stream")
|
|
ctx.Response.Header.Set("Cache-Control", "no-cache")
|
|
ctx.Response.Header.Set("Connection", "keep-alive")
|
|
ctx.Response.Header.Set("Access-Control-Allow-Origin", "*")
|
|
ctx.Response.Header.Set("Access-Control-Allow-Headers", "Cache-Control")
|
|
ctx.Response.Header.Set("Access-Control-Allow-Credentials", "true")
|
|
|
|
// Send history to the newly connected client
|
|
manager.messageHistory.Send(cl)
|
|
ctx.SetBodyStreamWriter(fasthttp.StreamWriter(func(w *bufio.Writer) {
|
|
for {
|
|
select {
|
|
case msg, ok := <-cl.Chan():
|
|
if !ok {
|
|
// If the channel is closed, return from the function
|
|
return
|
|
}
|
|
_, err := fmt.Fprint(w, msg.String())
|
|
if err != nil {
|
|
// If an error occurs (e.g., client has disconnected), return from the function
|
|
return
|
|
}
|
|
|
|
w.Flush()
|
|
|
|
case <-ctx.Done():
|
|
manager.unregister(cl.ID())
|
|
close(cl.Chan())
|
|
return
|
|
}
|
|
}
|
|
}))
|
|
}
|
|
|
|
// Clients method to list connected client IDs
|
|
func (manager *broadcastManager) Clients() []string {
|
|
var clients []string
|
|
manager.clients.Range(func(key, value any) bool {
|
|
id, ok := key.(string)
|
|
if ok {
|
|
clients = append(clients, id)
|
|
}
|
|
return true
|
|
})
|
|
return clients
|
|
}
|
|
|
|
// startWorkers starts worker goroutines for message broadcasting.
|
|
func (manager *broadcastManager) startWorkers() {
|
|
for i := 0; i < manager.workerPoolSize; i++ {
|
|
go func() {
|
|
for message := range manager.broadcast {
|
|
manager.clients.Range(func(key, value any) bool {
|
|
client, ok := value.(Listener)
|
|
if !ok {
|
|
return true // Continue iteration
|
|
}
|
|
select {
|
|
case client.Chan() <- message:
|
|
manager.messageHistory.Add(message)
|
|
default:
|
|
// If the client's channel is full, drop the message
|
|
}
|
|
return true // Continue iteration
|
|
})
|
|
}
|
|
}()
|
|
}
|
|
}
|
|
|
|
// register adds a client to the manager.
|
|
func (manager *broadcastManager) register(client Listener) {
|
|
manager.clients.Store(client.ID(), client)
|
|
}
|
|
|
|
// unregister removes a client from the manager.
|
|
func (manager *broadcastManager) unregister(clientID string) {
|
|
manager.clients.Delete(clientID)
|
|
}
|
|
|
|
type history struct {
|
|
messages []Envelope
|
|
maxSize int // Maximum number of messages to retain
|
|
}
|
|
|
|
func newHistory(maxSize int) *history {
|
|
return &history{
|
|
messages: []Envelope{},
|
|
maxSize: maxSize,
|
|
}
|
|
}
|
|
|
|
func (h *history) Add(message Envelope) {
|
|
h.messages = append(h.messages, message)
|
|
// Ensure history does not exceed maxSize
|
|
if len(h.messages) > h.maxSize {
|
|
// Remove the oldest messages to fit the maxSize
|
|
h.messages = h.messages[len(h.messages)-h.maxSize:]
|
|
}
|
|
}
|
|
|
|
func (h *history) Send(c Listener) {
|
|
for _, msg := range h.messages {
|
|
c.Chan() <- msg
|
|
}
|
|
}
|