Files
LocalAGI/core/state/pool.go
Richard Palethorpe f289e824df fix: Handle state on agent restart and update observables
Keep some agent start across restarts, such as the SSE manager and
observer. This allows restarts to be shown on the state page and also
allows avatars to be kept when reconfiguring the agent.

Also observable updates can happen out of order because SSE manager has
multiple workers. For now handle this in the client.

Finally fix an issue with the IRC client to make it disconnect and
handle being assigned a different nickname by the server.

Signed-off-by: Richard Palethorpe <io@richiejp.com>
2025-04-23 07:23:51 +01:00

689 lines
16 KiB
Go

package state
import (
"context"
"encoding/base64"
"encoding/json"
"fmt"
"os"
"path/filepath"
"sort"
"strings"
"sync"
"time"
. "github.com/mudler/LocalAGI/core/agent"
"github.com/mudler/LocalAGI/core/sse"
"github.com/mudler/LocalAGI/core/types"
"github.com/mudler/LocalAGI/pkg/llm"
"github.com/mudler/LocalAGI/pkg/localrag"
"github.com/mudler/LocalAGI/pkg/utils"
"github.com/sashabaranov/go-openai"
"github.com/sashabaranov/go-openai/jsonschema"
"github.com/mudler/LocalAGI/pkg/xlog"
)
type AgentPool struct {
sync.Mutex
file string
pooldir string
pool AgentPoolData
agents map[string]*Agent
managers map[string]sse.Manager
agentStatus map[string]*Status
apiURL, defaultModel, defaultMultimodalModel string
imageModel, localRAGAPI, localRAGKey, apiKey string
availableActions func(*AgentConfig) func(ctx context.Context, pool *AgentPool) []types.Action
connectors func(*AgentConfig) []Connector
dynamicPrompt func(*AgentConfig) []DynamicPrompt
timeout string
conversationLogs string
}
type Status struct {
ActionResults []types.ActionState
}
func (s *Status) addResult(result types.ActionState) {
// If we have more than 10 results, remove the oldest one
if len(s.ActionResults) > 10 {
s.ActionResults = s.ActionResults[1:]
}
s.ActionResults = append(s.ActionResults, result)
}
func (s *Status) Results() []types.ActionState {
return s.ActionResults
}
type AgentPoolData map[string]AgentConfig
func loadPoolFromFile(path string) (*AgentPoolData, error) {
data, err := os.ReadFile(path)
if err != nil {
return nil, err
}
poolData := &AgentPoolData{}
err = json.Unmarshal(data, poolData)
return poolData, err
}
func NewAgentPool(
defaultModel, defaultMultimodalModel, imageModel, apiURL, apiKey, directory string,
LocalRAGAPI string,
availableActions func(*AgentConfig) func(ctx context.Context, pool *AgentPool) []types.Action,
connectors func(*AgentConfig) []Connector,
promptBlocks func(*AgentConfig) []DynamicPrompt,
timeout string,
withLogs bool,
) (*AgentPool, error) {
// if file exists, try to load an existing pool.
// if file does not exist, create a new pool.
poolfile := filepath.Join(directory, "pool.json")
conversationPath := ""
if withLogs {
conversationPath = filepath.Join(directory, "conversations")
}
if _, err := os.Stat(poolfile); err != nil {
// file does not exist, create a new pool
return &AgentPool{
file: poolfile,
pooldir: directory,
apiURL: apiURL,
defaultModel: defaultModel,
defaultMultimodalModel: defaultMultimodalModel,
imageModel: imageModel,
localRAGAPI: LocalRAGAPI,
apiKey: apiKey,
agents: make(map[string]*Agent),
pool: make(map[string]AgentConfig),
agentStatus: make(map[string]*Status),
managers: make(map[string]sse.Manager),
connectors: connectors,
availableActions: availableActions,
dynamicPrompt: promptBlocks,
timeout: timeout,
conversationLogs: conversationPath,
}, nil
}
poolData, err := loadPoolFromFile(poolfile)
if err != nil {
return nil, err
}
return &AgentPool{
file: poolfile,
apiURL: apiURL,
pooldir: directory,
defaultModel: defaultModel,
defaultMultimodalModel: defaultMultimodalModel,
imageModel: imageModel,
apiKey: apiKey,
agents: make(map[string]*Agent),
managers: make(map[string]sse.Manager),
agentStatus: map[string]*Status{},
pool: *poolData,
connectors: connectors,
localRAGAPI: LocalRAGAPI,
dynamicPrompt: promptBlocks,
availableActions: availableActions,
timeout: timeout,
conversationLogs: conversationPath,
}, nil
}
func replaceInvalidChars(s string) string {
s = strings.ReplaceAll(s, "/", "_")
return strings.ReplaceAll(s, " ", "_")
}
// CreateAgent adds a new agent to the pool
// and starts it.
// It also saves the state to the file.
func (a *AgentPool) CreateAgent(name string, agentConfig *AgentConfig) error {
a.Lock()
defer a.Unlock()
name = replaceInvalidChars(name)
agentConfig.Name = name
if _, ok := a.pool[name]; ok {
return fmt.Errorf("agent %s already exists", name)
}
a.pool[name] = *agentConfig
if err := a.save(); err != nil {
return err
}
go func(ac AgentConfig) {
// Create the agent avatar
if err := createAgentAvatar(a.apiURL, a.apiKey, a.defaultModel, a.imageModel, a.pooldir, ac); err != nil {
xlog.Error("Failed to create agent avatar", "error", err)
}
}(a.pool[name])
return a.startAgentWithConfig(name, agentConfig, nil)
}
func (a *AgentPool) RecreateAgent(name string, agentConfig *AgentConfig) error {
a.Lock()
defer a.Unlock()
oldAgent := a.agents[name]
var o *types.Observable
obs := oldAgent.Observer()
if obs != nil {
o = obs.NewObservable()
o.Name = "Restarting Agent"
o.Icon = "sync"
o.Creation = &types.Creation{}
obs.Update(*o)
}
stateFile, characterFile := a.stateFiles(name)
os.Remove(stateFile)
os.Remove(characterFile)
oldAgent.Stop()
a.pool[name] = *agentConfig
delete(a.agents, name)
if err := a.save(); err != nil {
if obs != nil {
o.Completion = &types.Completion{Error: err.Error()}
obs.Update(*o)
}
return err
}
if err := a.startAgentWithConfig(name, agentConfig, obs); err != nil {
if obs != nil {
o.Completion = &types.Completion{Error: err.Error()}
obs.Update(*o)
}
return err
}
if obs != nil {
o.Completion = &types.Completion{}
obs.Update(*o)
}
return nil
}
func createAgentAvatar(APIURL, APIKey, model, imageModel, avatarDir string, agent AgentConfig) error {
client := llm.NewClient(APIKey, APIURL+"/v1", "10m")
if imageModel == "" {
return fmt.Errorf("image model not set")
}
if model == "" {
return fmt.Errorf("default model not set")
}
imagePath := filepath.Join(avatarDir, "avatars", fmt.Sprintf("%s.png", agent.Name))
if _, err := os.Stat(imagePath); err == nil {
// Image already exists
xlog.Debug("Avatar already exists", "path", imagePath)
return nil
}
var results struct {
ImagePrompt string `json:"image_prompt"`
}
err := llm.GenerateTypedJSON(
context.Background(),
llm.NewClient(APIKey, APIURL, "10m"),
"Generate a prompt that I can use to create a random avatar for the bot '"+agent.Name+"', the description of the bot is: "+agent.Description,
model,
jsonschema.Definition{
Type: jsonschema.Object,
Properties: map[string]jsonschema.Definition{
"image_prompt": {
Type: jsonschema.String,
Description: "The prompt to generate the image",
},
},
Required: []string{"image_prompt"},
}, &results)
if err != nil {
return fmt.Errorf("failed to generate image prompt: %w", err)
}
if results.ImagePrompt == "" {
xlog.Error("Failed to generate image prompt")
return fmt.Errorf("failed to generate image prompt")
}
req := openai.ImageRequest{
Prompt: results.ImagePrompt,
Model: imageModel,
Size: openai.CreateImageSize256x256,
ResponseFormat: openai.CreateImageResponseFormatB64JSON,
}
ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second)
defer cancel()
resp, err := client.CreateImage(ctx, req)
if err != nil {
return fmt.Errorf("failed to generate image: %w", err)
}
if len(resp.Data) == 0 {
return fmt.Errorf("failed to generate image")
}
imageJson := resp.Data[0].B64JSON
os.MkdirAll(filepath.Join(avatarDir, "avatars"), 0755)
// Save the image to the agent directory
imageData, err := base64.StdEncoding.DecodeString(imageJson)
if err != nil {
return err
}
return os.WriteFile(imagePath, imageData, 0644)
}
func (a *AgentPool) List() []string {
a.Lock()
defer a.Unlock()
var agents []string
for agent := range a.pool {
agents = append(agents, agent)
}
// return a sorted list
sort.SliceStable(agents, func(i, j int) bool {
return agents[i] < agents[j]
})
return agents
}
func (a *AgentPool) GetStatusHistory(name string) *Status {
a.Lock()
defer a.Unlock()
return a.agentStatus[name]
}
func (a *AgentPool) startAgentWithConfig(name string, config *AgentConfig, obs Observer) error {
var manager sse.Manager
if m, ok := a.managers[name]; ok {
manager = m
} else {
manager = sse.NewManager(5)
}
ctx := context.Background()
model := a.defaultModel
multimodalModel := a.defaultMultimodalModel
if config.MultimodalModel != "" {
multimodalModel = config.MultimodalModel
}
if config.Model != "" {
model = config.Model
}
if config.PeriodicRuns == "" {
config.PeriodicRuns = "10m"
}
if config.APIURL != "" {
a.apiURL = config.APIURL
}
if config.APIKey != "" {
a.apiKey = config.APIKey
}
if config.LocalRAGURL != "" {
a.localRAGAPI = config.LocalRAGURL
}
if config.LocalRAGAPIKey != "" {
a.localRAGKey = config.LocalRAGAPIKey
}
connectors := a.connectors(config)
promptBlocks := a.dynamicPrompt(config)
actions := a.availableActions(config)(ctx, a)
stateFile, characterFile := a.stateFiles(name)
actionsLog := []string{}
for _, action := range actions {
actionsLog = append(actionsLog, action.Definition().Name.String())
}
connectorLog := []string{}
for _, connector := range connectors {
connectorLog = append(connectorLog, fmt.Sprintf("%+v", connector))
}
xlog.Info(
"Creating agent",
"name", name,
"model", model,
"api_url", a.apiURL,
"actions", actionsLog,
"connectors", connectorLog,
)
// dynamicPrompts := []map[string]string{}
// for _, p := range config.DynamicPrompts {
// dynamicPrompts = append(dynamicPrompts, p.ToMap())
// }
if obs == nil {
obs = NewSSEObserver(name, manager)
}
opts := []Option{
WithModel(model),
WithLLMAPIURL(a.apiURL),
WithContext(ctx),
WithMCPServers(config.MCPServers...),
WithPeriodicRuns(config.PeriodicRuns),
WithPermanentGoal(config.PermanentGoal),
WithPrompts(promptBlocks...),
// WithDynamicPrompts(dynamicPrompts...),
WithCharacter(Character{
Name: name,
}),
WithActions(
actions...,
),
WithStateFile(stateFile),
WithCharacterFile(characterFile),
WithLLMAPIKey(a.apiKey),
WithTimeout(a.timeout),
WithRAGDB(localrag.NewWrappedClient(a.localRAGAPI, a.localRAGKey, name)),
WithAgentReasoningCallback(func(state types.ActionCurrentState) bool {
xlog.Info(
"Agent is thinking",
"agent", name,
"reasoning", state.Reasoning,
"action", state.Action.Definition().Name,
"params", state.Params,
)
manager.Send(
sse.NewMessage(
fmt.Sprintf(`Thinking: %s`, utils.HTMLify(state.Reasoning)),
).WithEvent("status"),
)
for _, c := range connectors {
if !c.AgentReasoningCallback()(state) {
return false
}
}
return true
}),
WithSystemPrompt(config.SystemPrompt),
WithMultimodalModel(multimodalModel),
WithAgentResultCallback(func(state types.ActionState) {
a.Lock()
if _, ok := a.agentStatus[name]; !ok {
a.agentStatus[name] = &Status{}
}
a.agentStatus[name].addResult(state)
a.Unlock()
xlog.Debug(
"Calling agent result callback",
)
text := fmt.Sprintf(`Reasoning: %s
Action taken: %+v
Parameters: %+v
Result: %s`,
state.Reasoning,
state.ActionCurrentState.Action.Definition().Name,
state.ActionCurrentState.Params,
state.Result)
manager.Send(
sse.NewMessage(
utils.HTMLify(
text,
),
).WithEvent("status"),
)
for _, c := range connectors {
c.AgentResultCallback()(state)
}
}),
WithObserver(obs),
}
if config.HUD {
opts = append(opts, EnableHUD)
}
if a.conversationLogs != "" {
opts = append(opts, WithConversationsPath(a.conversationLogs))
}
if config.StandaloneJob {
opts = append(opts, EnableStandaloneJob)
}
if config.LongTermMemory {
opts = append(opts, EnableLongTermMemory)
}
if config.SummaryLongTermMemory {
opts = append(opts, EnableSummaryMemory)
}
if config.CanStopItself {
opts = append(opts, CanStopItself)
}
if config.CanPlan {
opts = append(opts, EnablePlanning)
}
if config.InitiateConversations {
opts = append(opts, EnableInitiateConversations)
}
if config.RandomIdentity {
if config.IdentityGuidance != "" {
opts = append(opts, WithRandomIdentity(config.IdentityGuidance))
} else {
opts = append(opts, WithRandomIdentity())
}
}
if config.EnableKnowledgeBase {
opts = append(opts, EnableKnowledgeBase)
}
if config.EnableReasoning {
opts = append(opts, EnableForceReasoning)
}
if config.KnowledgeBaseResults > 0 {
opts = append(opts, EnableKnowledgeBaseWithResults(config.KnowledgeBaseResults))
}
if config.LoopDetectionSteps > 0 {
opts = append(opts, WithLoopDetectionSteps(config.LoopDetectionSteps))
}
xlog.Info("Starting agent", "name", name, "config", config)
agent, err := New(opts...)
if err != nil {
return err
}
a.agents[name] = agent
a.managers[name] = manager
go func() {
if err := agent.Run(); err != nil {
xlog.Error("Agent stopped", "error", err.Error(), "name", name)
}
}()
xlog.Info("Starting connectors", "name", name, "config", config)
for _, c := range connectors {
go c.Start(agent)
}
go func() {
for {
time.Sleep(1 * time.Second) // Send a message every seconds
manager.Send(sse.NewMessage(
utils.HTMLify(agent.State().String()),
).WithEvent("hud"))
}
}()
xlog.Info("Agent started", "name", name)
return nil
}
// Starts all the agents in the pool
func (a *AgentPool) StartAll() error {
a.Lock()
defer a.Unlock()
for name, config := range a.pool {
if a.agents[name] != nil { // Agent already started
continue
}
if err := a.startAgentWithConfig(name, &config, nil); err != nil {
xlog.Error("Failed to start agent", "name", name, "error", err)
}
}
return nil
}
func (a *AgentPool) StopAll() {
a.Lock()
defer a.Unlock()
for _, agent := range a.agents {
agent.Stop()
}
}
func (a *AgentPool) Stop(name string) {
a.Lock()
defer a.Unlock()
a.stop(name)
}
func (a *AgentPool) stop(name string) {
if agent, ok := a.agents[name]; ok {
agent.Stop()
}
}
func (a *AgentPool) Start(name string) error {
a.Lock()
defer a.Unlock()
if agent, ok := a.agents[name]; ok {
err := agent.Run()
if err != nil {
return fmt.Errorf("agent %s failed to start: %w", name, err)
}
xlog.Info("Agent started", "name", name)
return nil
}
if config, ok := a.pool[name]; ok {
return a.startAgentWithConfig(name, &config, nil)
}
return fmt.Errorf("agent %s not found", name)
}
func (a *AgentPool) stateFiles(name string) (string, string) {
stateFile := filepath.Join(a.pooldir, fmt.Sprintf("%s.state.json", name))
characterFile := filepath.Join(a.pooldir, fmt.Sprintf("%s.character.json", name))
return stateFile, characterFile
}
func (a *AgentPool) Remove(name string) error {
a.Lock()
defer a.Unlock()
// Cleanup character and state
stateFile, characterFile := a.stateFiles(name)
os.Remove(stateFile)
os.Remove(characterFile)
a.stop(name)
delete(a.agents, name)
delete(a.pool, name)
// remove avatar
os.Remove(filepath.Join(a.pooldir, "avatars", fmt.Sprintf("%s.png", name)))
if err := a.save(); err != nil {
return err
}
return nil
}
func (a *AgentPool) Save() error {
a.Lock()
defer a.Unlock()
return a.save()
}
func (a *AgentPool) save() error {
data, err := json.MarshalIndent(a.pool, "", " ")
if err != nil {
return err
}
return os.WriteFile(a.file, data, 0644)
}
func (a *AgentPool) GetAgent(name string) *Agent {
a.Lock()
defer a.Unlock()
return a.agents[name]
}
func (a *AgentPool) AllAgents() []string {
a.Lock()
defer a.Unlock()
var agents []string
for agent := range a.agents {
agents = append(agents, agent)
}
return agents
}
func (a *AgentPool) GetConfig(name string) *AgentConfig {
a.Lock()
defer a.Unlock()
agent, exists := a.pool[name]
if !exists {
return nil
}
return &agent
}
func (a *AgentPool) GetManager(name string) sse.Manager {
a.Lock()
defer a.Unlock()
return a.managers[name]
}