fix: do not track an internal currentConversation (#91)

It is prone to races and does not reliably track the conversation of
each job

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>
Author:  Ettore Di Giacinto
Date:    2025-03-25 00:36:09 +01:00 (committed by GitHub)
Parent:  b09749dddb
Commit:  53c1554d55

5 changed files with 115 additions and 177 deletions
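For context before the diff: the race came from every job reading and writing the shared a.currentConversation field on the Agent. Even behind the agent's mutex, concurrent jobs interleave their accesses and clobber each other's history. The change below threads the conversation through each helper as a per-job value instead. A minimal self-contained sketch of the two patterns, using simplified stand-in types rather than the repo's actual ones:

package main

import "fmt"

// Message is a stand-in for openai.ChatCompletionMessage.
type Message struct{ Role, Content string }

// Before: one conversation field shared by every job. Even with a mutex
// around each access, two concurrent jobs interleave their reads and
// writes and corrupt each other's history.
type racyAgent struct {
	currentConversation []Message
}

// After: the conversation is a plain value, passed in and returned, so
// each job owns its own history end to end and there is nothing to race on.
func processUserInput(conv []Message, role, text string) []Message {
	return append(conv, Message{Role: role, Content: text})
}

func main() {
	conv := []Message{{Role: "system", Content: "you are a helpful agent"}}
	conv = processUserInput(conv, "user", "hello")
	fmt.Println(len(conv)) // 2: this job's history, shared with no other job
}

Each helper returns the updated slice and the caller reassigns it, which is the `conv = a.processPrompts(conv)` shape that recurs throughout the diff.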


@@ -34,7 +34,6 @@ type Agent struct {
 	currentState             *action.AgentInternalState
 	nextAction               types.Action
 	nextActionParams         *types.ActionParams
-	currentConversation      Messages
 	selfEvaluationInProgress bool
 	pause                    bool
@@ -149,18 +148,6 @@ func (a *Agent) Ask(opts ...types.JobOption) *types.JobResult {
 	return j.Result.WaitResult()
 }
 
-func (a *Agent) CurrentConversation() []openai.ChatCompletionMessage {
-	a.Lock()
-	defer a.Unlock()
-	return a.currentConversation
-}
-
-func (a *Agent) SetConversation(conv []openai.ChatCompletionMessage) {
-	a.Lock()
-	defer a.Unlock()
-	a.currentConversation = conv
-}
-
 func (a *Agent) askLLM(ctx context.Context, conversation []openai.ChatCompletionMessage) (openai.ChatCompletionMessage, error) {
 	resp, err := a.client.CreateChatCompletion(ctx,
 		openai.ChatCompletionRequest{
@@ -179,20 +166,6 @@ func (a *Agent) askLLM(ctx context.Context, conversation []openai.ChatCompletion
 	return resp.Choices[0].Message, nil
 }
 
-func (a *Agent) ResetConversation() {
-	a.Lock()
-	defer a.Unlock()
-	xlog.Info("Resetting conversation", "agent", a.Character.Name)
-	// store into memory the conversation before pruning it
-	// TODO: Shall we summarize the conversation into a bullet list of highlights
-	// using the LLM instead?
-	a.saveCurrentConversation()
-
-	a.currentConversation = []openai.ChatCompletionMessage{}
-}
-
 var ErrContextCanceled = fmt.Errorf("context canceled")
 
 func (a *Agent) Stop() {
@@ -261,7 +234,7 @@ func (a *Agent) runAction(chosenAction types.Action, params types.ActionParams)
 	return result, nil
 }
 
-func (a *Agent) processPrompts() {
+func (a *Agent) processPrompts(conversation Messages) Messages {
 	//if job.Image != "" {
 	// TODO: Use llava to explain the image content
 	//}
@@ -276,25 +249,27 @@
 			xlog.Debug("Prompt is empty, skipping", "agent", a.Character.Name)
 			continue
 		}
-		if !Messages(a.currentConversation).Exist(a.options.systemPrompt) {
-			a.currentConversation = append([]openai.ChatCompletionMessage{
+		if !conversation.Exist(a.options.systemPrompt) {
+			conversation = append([]openai.ChatCompletionMessage{
 				{
 					Role:    prompt.Role(),
 					Content: message,
-				}}, a.currentConversation...)
+				}}, conversation...)
 		}
 	}
 
 	// TODO: move to a Promptblock?
 	if a.options.systemPrompt != "" {
-		if !Messages(a.currentConversation).Exist(a.options.systemPrompt) {
-			a.currentConversation = append([]openai.ChatCompletionMessage{
+		if !conversation.Exist(a.options.systemPrompt) {
+			conversation = append([]openai.ChatCompletionMessage{
 				{
 					Role:    "system",
 					Content: a.options.systemPrompt,
-				}}, a.currentConversation...)
+				}}, conversation...)
 		}
 	}
+
+	return conversation
 }
 
 func (a *Agent) describeImage(ctx context.Context, model, imageURL string) (string, error) {
@@ -349,90 +324,43 @@ func extractImageContent(message openai.ChatCompletionMessage) (imageURL, text s
 	return
 }
 
-func (a *Agent) processUserInputs(job *types.Job, role string) {
-	noNewMessage := job.Text == "" && job.Image == ""
-	onlyText := job.Text != "" && job.Image == ""
+func (a *Agent) processUserInputs(job *types.Job, role string, conv Messages) Messages {
 	// walk conversation history, and check if last message from user contains image.
 	// If it does, we need to describe the image first with a model that supports image understanding (if the current model doesn't support it)
 	// and add it to the conversation context
-	if a.options.SeparatedMultimodalModel() && noNewMessage {
-		lastUserMessage := a.currentConversation.GetLatestUserMessage()
-		if lastUserMessage != nil && a.currentConversation.IsLastMessageFromRole(UserRole) {
-			imageURL, text, err := extractImageContent(*lastUserMessage)
-			if err == nil {
-				// We have an image, we need to describe it first
-				// and add it to the conversation context
-				imageDescription, err := a.describeImage(a.context.Context, a.options.LLMAPI.MultimodalModel, imageURL)
-				if err != nil {
-					xlog.Error("Error describing image", "error", err)
-				} else {
-					// We replace the user message with the image description
-					// and add the user text to the conversation
-					explainerMessage := openai.ChatCompletionMessage{
-						Role:    "system",
-						Content: fmt.Sprintf("The user shared an image which can be described as: %s", imageDescription),
-					}
-					// remove lastUserMessage from the conversation
-					a.currentConversation = a.currentConversation.RemoveLastUserMessage()
-					a.currentConversation = append(a.currentConversation, explainerMessage)
-					a.currentConversation = append(a.currentConversation, openai.ChatCompletionMessage{
-						Role:    role,
-						Content: text,
-					})
-				}
-			}
-		}
+	if !a.options.SeparatedMultimodalModel() {
+		return conv
 	}
-	if onlyText {
-		a.currentConversation = append(a.currentConversation, openai.ChatCompletionMessage{
-			Role:    role,
-			Content: job.Text,
-		})
-	}
-	if job.Image != "" {
-		// If an image is present with the text
-		// we have two cases: if the model supports both images and text, we can send both
-		// if the model supports only text, we can send the text only and we need to describe the image first with a model that support image understanding and add it to the conversation context
-		if a.options.SeparatedMultimodalModel() {
-			// We need to describe the image first
-			imageDescription, err := a.describeImage(a.context.Context, a.options.LLMAPI.Model, job.Image)
+	lastUserMessage := conv.GetLatestUserMessage()
+	if lastUserMessage != nil && conv.IsLastMessageFromRole(UserRole) {
+		imageURL, text, err := extractImageContent(*lastUserMessage)
+		if err == nil {
+			// We have an image, we need to describe it first
+			// and add it to the conversation context
+			imageDescription, err := a.describeImage(a.context.Context, a.options.LLMAPI.MultimodalModel, imageURL)
 			if err != nil {
 				xlog.Error("Error describing image", "error", err)
 			} else {
-				a.currentConversation = append(a.currentConversation, openai.ChatCompletionMessage{
+				// We replace the user message with the image description
+				// and add the user text to the conversation
+				explainerMessage := openai.ChatCompletionMessage{
 					Role:    "system",
 					Content: fmt.Sprintf("The user shared an image which can be described as: %s", imageDescription),
-				})
-				a.currentConversation = append(a.currentConversation, openai.ChatCompletionMessage{
+				}
+				// remove lastUserMessage from the conversation
+				conv = conv.RemoveLastUserMessage()
+				conv = append(conv, explainerMessage)
+				conv = append(conv, openai.ChatCompletionMessage{
 					Role:    role,
-					Content: job.Text,
+					Content: text,
 				})
 			}
-		} else {
-			// Just append to the message both the image and the text
-			a.currentConversation = append(a.currentConversation, openai.ChatCompletionMessage{
-				Role: role,
-				MultiContent: []openai.ChatMessagePart{
-					{
-						Type: openai.ChatMessagePartTypeText,
-						Text: job.Text,
-					},
-					{
-						Type: openai.ChatMessagePartTypeImageURL,
-						ImageURL: &openai.ChatMessageImageURL{
-							URL: job.Image,
-						},
-					},
-				},
-			})
-		}
 	}
+
+	return conv
 }
 
 func (a *Agent) consumeJob(job *types.Job, role string) {
@@ -449,14 +377,13 @@ func (a *Agent) consumeJob(job *types.Job, role string) {
 	// We are self evaluating if we consume the job as a system role
 	selfEvaluation := role == SystemRole
 
+	conv := job.ConversationHistory
+
 	a.Lock()
 	// Set the action context
 	ctx, cancel := context.WithCancel(context.Background())
 	a.actionContext = types.NewActionContext(ctx, cancel)
 	a.selfEvaluationInProgress = selfEvaluation
-	if len(job.ConversationHistory) != 0 {
-		a.currentConversation = job.ConversationHistory
-	}
 	a.Unlock()
 
 	defer func() {
@@ -476,11 +403,11 @@ func (a *Agent) consumeJob(job *types.Job, role string) {
 		}()
 	}
 
-	a.processPrompts()
-	a.processUserInputs(job, role)
+	conv = a.processPrompts(conv)
+	conv = a.processUserInputs(job, role, conv)
 
 	// RAG
-	a.knowledgeBaseLookup()
+	a.knowledgeBaseLookup(conv)
 
 	var pickTemplate string
 	var reEvaluationTemplate string
@@ -509,7 +436,7 @@ func (a *Agent) consumeJob(job *types.Job, role string) {
 		a.nextAction = nil
 	} else {
 		var err error
-		chosenAction, actionParams, reasoning, err = a.pickAction(ctx, pickTemplate, a.currentConversation)
+		chosenAction, actionParams, reasoning, err = a.pickAction(ctx, pickTemplate, conv)
 		if err != nil {
 			xlog.Error("Error picking action", "error", err)
 			job.Result.Finish(err)
@@ -525,12 +452,12 @@ func (a *Agent) consumeJob(job *types.Job, role string) {
 		//job.Result.Finish(fmt.Errorf("no action to do"))
 		xlog.Info("No action to do, just reply", "agent", a.Character.Name, "reasoning", reasoning)
-		a.currentConversation = append(a.currentConversation, openai.ChatCompletionMessage{
+		conv = append(conv, openai.ChatCompletionMessage{
 			Role:    "assistant",
 			Content: reasoning,
 		})
-		job.Result.Conversation = a.currentConversation
-		a.saveCurrentConversation()
+		job.Result.Conversation = conv
+		a.saveCurrentConversation(conv)
 		job.Result.SetResponse(reasoning)
 		job.Result.Finish(nil)
 		return
@@ -550,7 +477,7 @@ func (a *Agent) consumeJob(job *types.Job, role string) {
 			"reasoning", reasoning,
 		)
 
-		params, err := a.generateParameters(ctx, pickTemplate, chosenAction, a.currentConversation, reasoning)
+		params, err := a.generateParameters(ctx, pickTemplate, chosenAction, conv, reasoning)
 		if err != nil {
 			job.Result.Finish(fmt.Errorf("error generating action's parameters: %w", err))
 			return
@@ -572,7 +499,7 @@ func (a *Agent) consumeJob(job *types.Job, role string) {
 		return
 	}
 
-	if err := a.handlePlanning(ctx, job, chosenAction, actionParams, reasoning, pickTemplate); err != nil {
+	if err := a.handlePlanning(ctx, job, chosenAction, actionParams, reasoning, pickTemplate, conv); err != nil {
 		job.Result.Finish(fmt.Errorf("error running action: %w", err))
 		return
 	}
@@ -590,7 +517,7 @@ func (a *Agent) consumeJob(job *types.Job, role string) {
 				Reasoning: reasoning,
 			},
 			ActionResult: types.ActionResult{Result: "stopped by callback"}})
-		job.Result.Conversation = a.currentConversation
+		job.Result.Conversation = conv
 		job.Result.Finish(nil)
 		return
 	}
@@ -604,7 +531,7 @@ func (a *Agent) consumeJob(job *types.Job, role string) {
 			return
 		}
 
-		a.currentConversation = []openai.ChatCompletionMessage{
+		conv = []openai.ChatCompletionMessage{
 			{
 				Role:    "assistant",
 				Content: message.Message,
@@ -616,7 +543,7 @@ func (a *Agent) consumeJob(job *types.Job, role string) {
 				Content: message.Message,
 			}
 		}()
-		job.Result.Conversation = a.currentConversation
+		job.Result.Conversation = conv
 		job.Result.SetResponse("decided to initiate a new conversation")
 		job.Result.Finish(nil)
 		return
@@ -647,17 +574,17 @@ func (a *Agent) consumeJob(job *types.Job, role string) {
 			job.CallbackWithResult(stateResult)
 			xlog.Debug("Action executed", "agent", a.Character.Name, "action", chosenAction.Definition().Name, "result", result)
 
-			a.addFunctionResultToConversation(chosenAction, actionParams, result)
+			conv = a.addFunctionResultToConversation(chosenAction, actionParams, result, conv)
 		}
 
-		//a.currentConversation = append(a.currentConversation, messages...)
-		//a.currentConversation = messages
+		//conv = append(conv, messages...)
+		//conv = messages
 
 		// given the result, we can now ask OpenAI to complete the conversation or
 		// to continue using another tool given the result
-		followingAction, followingParams, reasoning, err := a.pickAction(ctx, reEvaluationTemplate, a.currentConversation)
+		followingAction, followingParams, reasoning, err := a.pickAction(ctx, reEvaluationTemplate, conv)
 		if err != nil {
-			job.Result.Conversation = a.currentConversation
+			job.Result.Conversation = conv
 			job.Result.Finish(fmt.Errorf("error picking action: %w", err))
 			return
 		}
@@ -673,7 +600,6 @@ func (a *Agent) consumeJob(job *types.Job, role string) {
 			a.currentReasoning = reasoning
 			a.nextAction = followingAction
 			a.nextActionParams = &followingParams
-			job.Text = ""
 			a.consumeJob(job, role)
 			return
 		} else if followingAction == nil {
@@ -687,17 +613,17 @@ func (a *Agent) consumeJob(job *types.Job, role string) {
 				Content: reasoning,
 			}
-			a.currentConversation = append(a.currentConversation, msg)
-			a.saveCurrentConversation()
+			conv = append(conv, msg)
+			a.saveCurrentConversation(conv)
 			job.Result.SetResponse(msg.Content)
-			job.Result.Conversation = a.currentConversation
+			job.Result.Conversation = conv
 			job.Result.Finish(nil)
 			return
 		}
 	}
 
-	job.Result.Conversation = a.currentConversation
+	job.Result.Conversation = conv
 
 	// At this point can only be a reply action
 	xlog.Info("Computing reply", "agent", a.Character.Name)
@@ -706,7 +632,7 @@ func (a *Agent) consumeJob(job *types.Job, role string) {
 	replyResponse := action.ReplyResponse{}
 	if err := actionParams.Unmarshal(&replyResponse); err != nil {
-		job.Result.Conversation = a.currentConversation
+		job.Result.Conversation = conv
 		job.Result.Finish(fmt.Errorf("error unmarshalling reply response: %w", err))
 		return
 	}
@@ -730,17 +656,17 @@ func (a *Agent) consumeJob(job *types.Job, role string) {
 	if a.options.enableHUD {
 		prompt, err := renderTemplate(hudTemplate, a.prepareHUD(), a.availableActions(), reasoning)
 		if err != nil {
-			job.Result.Conversation = a.currentConversation
+			job.Result.Conversation = conv
 			job.Result.Finish(fmt.Errorf("error renderTemplate: %w", err))
 			return
 		}
-		if !a.currentConversation.Exist(prompt) {
-			a.currentConversation = append([]openai.ChatCompletionMessage{
+		if !Messages(conv).Exist(prompt) {
+			conv = append([]openai.ChatCompletionMessage{
 				{
 					Role:    "system",
 					Content: prompt,
 				},
-			}, a.currentConversation...)
+			}, conv...)
 		}
 	}
@@ -748,7 +674,7 @@ func (a *Agent) consumeJob(job *types.Job, role string) {
 	// resp, err := a.client.CreateChatCompletion(ctx,
 	// 	openai.ChatCompletionRequest{
 	// 		Model: a.options.LLMAPI.Model,
-	// 		Messages: append(a.currentConversation,
+	// 		Messages: append(conv,
 	// 			openai.ChatCompletionMessage{
 	// 				Role:    "system",
 	// 				Content: "Assistant thought: " + replyResponse.Message,
@@ -765,19 +691,19 @@ func (a *Agent) consumeJob(job *types.Job, role string) {
 			Content: replyResponse.Message,
 		}
-		a.currentConversation = append(a.currentConversation, msg)
-		job.Result.Conversation = a.currentConversation
+		conv = append(conv, msg)
+		job.Result.Conversation = conv
 		job.Result.SetResponse(msg.Content)
-		a.saveCurrentConversation()
+		a.saveCurrentConversation(conv)
 		job.Result.Finish(nil)
 		return
 	}
 
 	xlog.Info("Reasoning, ask LLM for a reply", "agent", a.Character.Name)
-	xlog.Debug("Conversation", "conversation", fmt.Sprintf("%+v", a.currentConversation))
-	msg, err := a.askLLM(ctx, a.currentConversation)
+	xlog.Debug("Conversation", "conversation", fmt.Sprintf("%+v", conv))
+	msg, err := a.askLLM(ctx, conv)
 	if err != nil {
-		job.Result.Conversation = a.currentConversation
+		job.Result.Conversation = conv
 		job.Result.Finish(err)
 		xlog.Error("Error asking LLM for a reply", "error", err)
 		return
@@ -793,17 +719,17 @@ func (a *Agent) consumeJob(job *types.Job, role string) {
 		}
 	}
 
-	a.currentConversation = append(a.currentConversation, msg)
+	conv = append(conv, msg)
 	job.Result.SetResponse(msg.Content)
 	xlog.Info("Response from LLM", "response", msg.Content, "agent", a.Character.Name)
-	job.Result.Conversation = a.currentConversation
-	a.saveCurrentConversation()
+	job.Result.Conversation = conv
+	a.saveCurrentConversation(conv)
 	job.Result.Finish(nil)
 }
 
-func (a *Agent) addFunctionResultToConversation(chosenAction types.Action, actionParams types.ActionParams, result types.ActionResult) {
+func (a *Agent) addFunctionResultToConversation(chosenAction types.Action, actionParams types.ActionParams, result types.ActionResult, conv Messages) Messages {
 	// calling the function
-	a.currentConversation = append(a.currentConversation, openai.ChatCompletionMessage{
+	conv = append(conv, openai.ChatCompletionMessage{
 		Role: "assistant",
 		ToolCalls: []openai.ToolCall{
 			{
@@ -817,12 +743,14 @@ func (a *Agent) addFunctionResultToConversation(chosenAction types.Action, actio
 	})
 
 	// result of calling the function
-	a.currentConversation = append(a.currentConversation, openai.ChatCompletionMessage{
+	conv = append(conv, openai.ChatCompletionMessage{
 		Role:       openai.ChatMessageRoleTool,
 		Content:    result.Result,
 		Name:       chosenAction.Definition().Name.String(),
 		ToolCallID: chosenAction.Definition().Name.String(),
 	})
+
+	return conv
 }
 
 // This is running in the background.
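A note on the `return conv` added above: Go's `append` may allocate a new backing array, so appends made inside a function are invisible to the caller unless the updated slice is returned and reassigned. A tiny illustration of plain Go semantics (hypothetical helper names, not from this repo):

package main

import "fmt"

// The caller's slice header is copied in; the appended element is lost.
func appendNoReturn(s []int) { _ = append(s, 1) }

// Returning the result lets the caller adopt the (possibly new) backing array.
func appendReturn(s []int) []int { return append(s, 1) }

func main() {
	s := []int{}
	appendNoReturn(s)
	fmt.Println(len(s)) // 0: the element never reached the caller
	s = appendReturn(s)
	fmt.Println(len(s)) // 1: reassignment propagates the append
}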
@@ -839,9 +767,7 @@ func (a *Agent) periodicallyRun(timer *time.Timer) {
 	// This would be a special action that would be picked up by the agent
 	// and would be used to contact the user.
 	xlog.Info("START -- Periodically run is starting")
 
-	// if len(a.CurrentConversation()) != 0 {
+	// if len(conv()) != 0 {
 	// 	// Here the LLM could decide to store some part of the conversation too in the memory
 	// 	evaluateMemory := NewJob(
 	// 		WithText(
@@ -856,10 +782,9 @@ func (a *Agent) periodicallyRun(timer *time.Timer) {
 	// }
 
 	if !a.options.standaloneJob {
-		a.ResetConversation()
 		return
 	}
 
 	xlog.Info("Periodically running", "agent", a.Character.Name)
 
 	// Here we go in a loop of
 	// - asking the agent to do something
@@ -873,9 +798,8 @@ func (a *Agent) periodicallyRun(timer *time.Timer) {
 		types.WithResultCallback(a.options.resultCallback),
 	)
 	a.consumeJob(whatNext, SystemRole)
-	a.ResetConversation()
 
-	xlog.Info("STOP -- Periodically run is done")
+	xlog.Info("STOP -- Periodically run is done", "agent", a.Character.Name)
 
 	// Save results from state