From f289e824df54a0062de24952718a27da0b23902d Mon Sep 17 00:00:00 2001 From: Richard Palethorpe Date: Tue, 22 Apr 2025 16:38:49 +0100 Subject: [PATCH] fix: Handle state on agent restart and update observables Keep some agent start across restarts, such as the SSE manager and observer. This allows restarts to be shown on the state page and also allows avatars to be kept when reconfiguring the agent. Also observable updates can happen out of order because SSE manager has multiple workers. For now handle this in the client. Finally fix an issue with the IRC client to make it disconnect and handle being assigned a different nickname by the server. Signed-off-by: Richard Palethorpe --- core/agent/observer.go | 3 +- core/state/pool.go | 70 ++++++++++++++++++++++-- services/connectors/irc.go | 10 +++- webui/app.go | 12 +--- webui/react-ui/src/pages/AgentStatus.jsx | 11 +++- 5 files changed, 86 insertions(+), 20 deletions(-) diff --git a/core/agent/observer.go b/core/agent/observer.go index a3283a6..c01aa8a 100644 --- a/core/agent/observer.go +++ b/core/agent/observer.go @@ -36,7 +36,8 @@ func NewSSEObserver(agent string, manager sse.Manager) *SSEObserver { } func (s *SSEObserver) NewObservable() *types.Observable { - id := atomic.AddInt32(&s.maxID, 1) + id := atomic.AddInt32(&s.maxID, 1) + return &types.Observable{ ID: id - 1, Agent: s.agent, diff --git a/core/state/pool.go b/core/state/pool.go index 0f4f96a..f3eeef7 100644 --- a/core/state/pool.go +++ b/core/state/pool.go @@ -166,7 +166,56 @@ func (a *AgentPool) CreateAgent(name string, agentConfig *AgentConfig) error { } }(a.pool[name]) - return a.startAgentWithConfig(name, agentConfig) + return a.startAgentWithConfig(name, agentConfig, nil) +} + +func (a *AgentPool) RecreateAgent(name string, agentConfig *AgentConfig) error { + a.Lock() + defer a.Unlock() + + oldAgent := a.agents[name] + var o *types.Observable + obs := oldAgent.Observer() + if obs != nil { + o = obs.NewObservable() + o.Name = "Restarting Agent" + o.Icon = "sync" + o.Creation = &types.Creation{} + obs.Update(*o) + } + + stateFile, characterFile := a.stateFiles(name) + + os.Remove(stateFile) + os.Remove(characterFile) + + oldAgent.Stop() + + a.pool[name] = *agentConfig + delete(a.agents, name) + + if err := a.save(); err != nil { + if obs != nil { + o.Completion = &types.Completion{Error: err.Error()} + obs.Update(*o) + } + return err + } + + if err := a.startAgentWithConfig(name, agentConfig, obs); err != nil { + if obs != nil { + o.Completion = &types.Completion{Error: err.Error()} + obs.Update(*o) + } + return err + } + + if obs != nil { + o.Completion = &types.Completion{} + obs.Update(*o) + } + + return nil } func createAgentAvatar(APIURL, APIKey, model, imageModel, avatarDir string, agent AgentConfig) error { @@ -268,8 +317,13 @@ func (a *AgentPool) GetStatusHistory(name string) *Status { return a.agentStatus[name] } -func (a *AgentPool) startAgentWithConfig(name string, config *AgentConfig) error { - manager := sse.NewManager(5) +func (a *AgentPool) startAgentWithConfig(name string, config *AgentConfig, obs Observer) error { + var manager sse.Manager + if m, ok := a.managers[name]; ok { + manager = m + } else { + manager = sse.NewManager(5) + } ctx := context.Background() model := a.defaultModel multimodalModel := a.defaultMultimodalModel @@ -331,6 +385,10 @@ func (a *AgentPool) startAgentWithConfig(name string, config *AgentConfig) error // dynamicPrompts = append(dynamicPrompts, p.ToMap()) // } + if obs == nil { + obs = NewSSEObserver(name, manager) + } + opts := []Option{ WithModel(model), WithLLMAPIURL(a.apiURL), @@ -407,7 +465,7 @@ func (a *AgentPool) startAgentWithConfig(name string, config *AgentConfig) error c.AgentResultCallback()(state) } }), - WithObserver(NewSSEObserver(name, manager)), + WithObserver(obs), } if config.HUD { @@ -510,7 +568,7 @@ func (a *AgentPool) StartAll() error { if a.agents[name] != nil { // Agent already started continue } - if err := a.startAgentWithConfig(name, &config); err != nil { + if err := a.startAgentWithConfig(name, &config, nil); err != nil { xlog.Error("Failed to start agent", "name", name, "error", err) } } @@ -548,7 +606,7 @@ func (a *AgentPool) Start(name string) error { return nil } if config, ok := a.pool[name]; ok { - return a.startAgentWithConfig(name, &config) + return a.startAgentWithConfig(name, &config, nil) } return fmt.Errorf("agent %s not found", name) diff --git a/services/connectors/irc.go b/services/connectors/irc.go index 60d2da6..819c6cc 100644 --- a/services/connectors/irc.go +++ b/services/connectors/irc.go @@ -77,8 +77,9 @@ func (i *IRC) Start(a *agent.Agent) { } i.conn.UseTLS = false i.conn.AddCallback("001", func(e *irc.Event) { - xlog.Info("Connected to IRC server", "server", i.server) + xlog.Info("Connected to IRC server", "server", i.server, "arguments", e.Arguments) i.conn.Join(i.channel) + i.nickname = e.Arguments[0] xlog.Info("Joined channel", "channel", i.channel) }) @@ -207,6 +208,13 @@ func (i *IRC) Start(a *agent.Agent) { // Start the IRC client in a goroutine go i.conn.Loop() + go func() { + select { + case <-a.Context().Done(): + i.conn.Quit() + return + } + }() } // IRCConfigMeta returns the metadata for IRC connector configuration fields diff --git a/webui/app.go b/webui/app.go index ec95f2a..9fecefa 100644 --- a/webui/app.go +++ b/webui/app.go @@ -176,17 +176,7 @@ func (a *App) UpdateAgentConfig(pool *state.AgentPool) func(c *fiber.Ctx) error return errorJSONMessage(c, err.Error()) } - // Remove the agent first - if err := pool.Remove(agentName); err != nil { - return errorJSONMessage(c, "Error removing agent: "+err.Error()) - } - - // Create agent with new config - if err := pool.CreateAgent(agentName, &newConfig); err != nil { - // Try to restore the old configuration if update fails - if restoreErr := pool.CreateAgent(agentName, oldConfig); restoreErr != nil { - return errorJSONMessage(c, fmt.Sprintf("Failed to update agent and restore failed: %v, %v", err, restoreErr)) - } + if err := pool.RecreateAgent(agentName, &newConfig); err != nil { return errorJSONMessage(c, "Error updating agent: "+err.Error()) } diff --git a/webui/react-ui/src/pages/AgentStatus.jsx b/webui/react-ui/src/pages/AgentStatus.jsx index 658aae8..ae787b6 100644 --- a/webui/react-ui/src/pages/AgentStatus.jsx +++ b/webui/react-ui/src/pages/AgentStatus.jsx @@ -99,8 +99,17 @@ function AgentStatus() { creation: data.creation, progress: data.progress, completion: data.completion, - // children are always built client-side }; + // Events can be received out of order + if (data.creation) + updated.creation = data.creation; + if (data.completion) + updated.completion = data.completion; + if (data.parent_id && !prevMap[data.parent_id]) + prevMap[data.parent_id] = { + id: data.parent_id, + name: "unknown", + }; const newMap = { ...prevMap, [data.id]: updated }; setObservableTree(buildObservableTree(newMap)); return newMap;