diff --git a/core/agent/agent.go b/core/agent/agent.go index 7712ea7..7b11d71 100644 --- a/core/agent/agent.go +++ b/core/agent/agent.go @@ -71,6 +71,7 @@ func New(opts ...Option) (*Agent, error) { Character: options.character, currentState: &action.AgentInternalState{}, context: types.NewActionContext(ctx, cancel), + newConversations: make(chan openai.ChatCompletionMessage), newMessagesSubscribers: options.newConversationsSubscribers, } @@ -104,8 +105,6 @@ func New(opts ...Option) (*Agent, error) { "model", a.options.LLMAPI.Model, ) - a.startNewConversationsConsumer() - return a, nil } @@ -117,6 +116,7 @@ func (a *Agent) startNewConversationsConsumer() { return case msg := <-a.newConversations: + xlog.Debug("New conversation", "agent", a.Character.Name, "message", msg.Content) a.subscriberMutex.Lock() subs := a.newMessagesSubscribers a.subscriberMutex.Unlock() @@ -577,25 +577,28 @@ func (a *Agent) consumeJob(job *types.Job, role string) { if selfEvaluation && a.options.initiateConversations && chosenAction.Definition().Name.Is(action.ConversationActionName) { + xlog.Info("LLM decided to initiate a new conversation", "agent", a.Character.Name) + message := action.ConversationActionResponse{} if err := actionParams.Unmarshal(&message); err != nil { + xlog.Error("Error unmarshalling conversation response", "error", err) job.Result.Finish(fmt.Errorf("error unmarshalling conversation response: %w", err)) return } - conv = []openai.ChatCompletionMessage{ - { - Role: "assistant", - Content: message.Message, - }, + msg := openai.ChatCompletionMessage{ + Role: "assistant", + Content: message.Message, + } + + go func(agent *Agent) { + xlog.Info("Sending new conversation to channel", "agent", agent.Character.Name, "message", msg.Content) + agent.newConversations <- msg + }(a) + + job.Result.Conversation = []openai.ChatCompletionMessage{ + msg, } - go func() { - a.newConversations <- openai.ChatCompletionMessage{ - Role: "assistant", - Content: message.Message, - } - }() - job.Result.Conversation = conv job.Result.SetResponse("decided to initiate a new conversation") job.Result.Finish(nil) return @@ -881,6 +884,9 @@ func (a *Agent) periodicallyRun(timer *time.Timer) { } func (a *Agent) Run() error { + + a.startNewConversationsConsumer() + xlog.Debug("Agent is now running", "agent", a.Character.Name) // The agent run does two things: // picks up requests from a queue // and generates a response/perform actions diff --git a/core/agent/agent_test.go b/core/agent/agent_test.go index ebac839..45384d9 100644 --- a/core/agent/agent_test.go +++ b/core/agent/agent_test.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "strings" + "sync" "github.com/mudler/LocalAgent/pkg/xlog" "github.com/mudler/LocalAgent/services/actions" @@ -12,6 +13,7 @@ import ( "github.com/mudler/LocalAgent/core/types" . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" + "github.com/sashabaranov/go-openai" "github.com/sashabaranov/go-openai/jsonschema" ) @@ -235,6 +237,43 @@ var _ = Describe("Agent test", func() { }) + It("Can initiate conversations", func() { + + message := openai.ChatCompletionMessage{} + mu := &sync.Mutex{} + agent, err := New( + WithLLMAPIURL(apiURL), + WithModel(testModel), + WithLLMAPIKey(apiKeyURL), + WithNewConversationSubscriber(func(m openai.ChatCompletionMessage) { + mu.Lock() + message = m + mu.Unlock() + }), + WithActions( + actions.NewSearch(map[string]string{}), + ), + EnablePlanning, + EnableForceReasoning, + EnableInitiateConversations, + EnableStandaloneJob, + EnableHUD, + WithPeriodicRuns("1s"), + WithPermanentGoal("use the new_conversation tool"), + // EnableStandaloneJob, + // WithRandomIdentity(), + ) + Expect(err).ToNot(HaveOccurred()) + go agent.Run() + defer agent.Stop() + + Eventually(func() string { + mu.Lock() + defer mu.Unlock() + return message.Content + }, "10m", "10s").ShouldNot(BeEmpty()) + }) + /* It("it automatically performs things in the background", func() { agent, err := New(