Keep some agent state across restarts, such as the SSE manager and observer. This allows restarts to be shown on the state page and also allows avatars to be kept when reconfiguring the agent. Also, observable updates can happen out of order because the SSE manager has multiple workers. For now, handle this in the client. Finally, fix an issue with the IRC client to make it disconnect and handle being assigned a different nickname by the server. Signed-off-by: Richard Palethorpe <io@richiejp.com>
89 lines
1.5 KiB
Go
89 lines
1.5 KiB
Go
package agent
|
|
|
|
import (
|
|
"encoding/json"
|
|
"sync"
|
|
"sync/atomic"
|
|
|
|
"github.com/mudler/LocalAGI/core/sse"
|
|
"github.com/mudler/LocalAGI/core/types"
|
|
"github.com/mudler/LocalAGI/pkg/xlog"
|
|
)
|
|
|
|
type Observer interface {
|
|
NewObservable() *types.Observable
|
|
Update(types.Observable)
|
|
History() []types.Observable
|
|
}
|
|
|
|
type SSEObserver struct {
|
|
agent string
|
|
maxID int32
|
|
manager sse.Manager
|
|
|
|
mutex sync.Mutex
|
|
history []types.Observable
|
|
historyLast int
|
|
}
|
|
|
|
func NewSSEObserver(agent string, manager sse.Manager) *SSEObserver {
|
|
return &SSEObserver{
|
|
agent: agent,
|
|
maxID: 1,
|
|
manager: manager,
|
|
history: make([]types.Observable, 100),
|
|
}
|
|
}
|
|
|
|
func (s *SSEObserver) NewObservable() *types.Observable {
|
|
id := atomic.AddInt32(&s.maxID, 1)
|
|
|
|
return &types.Observable{
|
|
ID: id - 1,
|
|
Agent: s.agent,
|
|
}
|
|
}
|
|
|
|
func (s *SSEObserver) Update(obs types.Observable) {
|
|
data, err := json.Marshal(obs)
|
|
if err != nil {
|
|
xlog.Error("Error marshaling observable", "error", err)
|
|
return
|
|
}
|
|
msg := sse.NewMessage(string(data)).WithEvent("observable_update")
|
|
s.manager.Send(msg)
|
|
|
|
s.mutex.Lock()
|
|
defer s.mutex.Unlock()
|
|
|
|
for i, o := range s.history {
|
|
if o.ID == obs.ID {
|
|
s.history[i] = obs
|
|
return
|
|
}
|
|
}
|
|
|
|
s.history[s.historyLast] = obs
|
|
s.historyLast += 1
|
|
if s.historyLast >= len(s.history) {
|
|
s.historyLast = 0
|
|
}
|
|
}
|
|
|
|
func (s *SSEObserver) History() []types.Observable {
|
|
h := make([]types.Observable, 0, 20)
|
|
|
|
s.mutex.Lock()
|
|
defer s.mutex.Unlock()
|
|
|
|
for _, obs := range s.history {
|
|
if obs.ID == 0 {
|
|
continue
|
|
}
|
|
|
|
h = append(h, obs)
|
|
}
|
|
|
|
return h
|
|
}
|