feat: retries (#110)

* feat(jobs): rework next actions

Also attempt a retry when generating parameters fails

Signed-off-by: mudler <mudler@localai.io>

* feat(retries): add retries for common operations

Signed-off-by: mudler <mudler@localai.io>

---------

Signed-off-by: mudler <mudler@localai.io>
This commit is contained in:
Ettore Di Giacinto
2025-03-28 21:27:34 +01:00
committed by GitHub
parent 62940a1a56
commit 0644daa477
4 changed files with 95 additions and 41 deletions

View File

@@ -135,18 +135,13 @@ func (m Messages) IsLastMessageFromRole(role string) bool {
return m[len(m)-1].Role == role return m[len(m)-1].Role == role
} }
func (a *Agent) generateParameters(ctx context.Context, pickTemplate string, act types.Action, c []openai.ChatCompletionMessage, reasoning string) (*decisionResult, error) { func (a *Agent) generateParameters(ctx context.Context, pickTemplate string, act types.Action, c []openai.ChatCompletionMessage, reasoning string, maxAttempts int) (*decisionResult, error) {
stateHUD, err := renderTemplate(pickTemplate, a.prepareHUD(), a.availableActions(), reasoning) stateHUD, err := renderTemplate(pickTemplate, a.prepareHUD(), a.availableActions(), reasoning)
if err != nil { if err != nil {
return nil, err return nil, err
} }
// check if there is already a message with the hud in the conversation already, otherwise
// add a message at the top with it
conversation := c conversation := c
if !Messages(c).Exist(stateHUD) && a.options.enableHUD { if !Messages(c).Exist(stateHUD) && a.options.enableHUD {
conversation = append([]openai.ChatCompletionMessage{ conversation = append([]openai.ChatCompletionMessage{
{ {
@@ -164,14 +159,25 @@ func (a *Agent) generateParameters(ctx context.Context, pickTemplate string, act
}) })
} }
return a.decision(ctx, var result *decisionResult
cc, var attemptErr error
a.availableActions().ToTools(),
openai.ToolChoice{ for attempts := 0; attempts < maxAttempts; attempts++ {
Type: openai.ToolTypeFunction, result, attemptErr = a.decision(ctx,
Function: openai.ToolFunction{Name: act.Definition().Name.String()}, cc,
}, a.availableActions().ToTools(),
) openai.ToolChoice{
Type: openai.ToolTypeFunction,
Function: openai.ToolFunction{Name: act.Definition().Name.String()},
},
)
if attemptErr == nil && result.actionParams != nil {
return result, nil
}
xlog.Warn("Attempt to generate parameters failed", "attempt", attempts+1, "error", attemptErr)
}
return nil, fmt.Errorf("failed to generate parameters after %d attempts: %w", maxAttempts, attemptErr)
} }
func (a *Agent) handlePlanning(ctx context.Context, job *types.Job, chosenAction types.Action, actionParams types.ActionParams, reasoning string, pickTemplate string, conv Messages) (Messages, error) { func (a *Agent) handlePlanning(ctx context.Context, job *types.Job, chosenAction types.Action, actionParams types.ActionParams, reasoning string, pickTemplate string, conv Messages) (Messages, error) {
@@ -221,7 +227,7 @@ func (a *Agent) handlePlanning(ctx context.Context, job *types.Job, chosenAction
subTaskAction := a.availableActions().Find(subtask.Action) subTaskAction := a.availableActions().Find(subtask.Action)
subTaskReasoning := fmt.Sprintf("%s Overall goal is: %s", subtask.Reasoning, planResult.Goal) subTaskReasoning := fmt.Sprintf("%s Overall goal is: %s", subtask.Reasoning, planResult.Goal)
params, err := a.generateParameters(ctx, pickTemplate, subTaskAction, conv, subTaskReasoning) params, err := a.generateParameters(ctx, pickTemplate, subTaskAction, conv, subTaskReasoning, maxRetries)
if err != nil { if err != nil {
return conv, fmt.Errorf("error generating action's parameters: %w", err) return conv, fmt.Errorf("error generating action's parameters: %w", err)

View File

@@ -19,6 +19,7 @@ const (
UserRole = "user" UserRole = "user"
AssistantRole = "assistant" AssistantRole = "assistant"
SystemRole = "system" SystemRole = "system"
maxRetries = 5
) )
type Agent struct { type Agent struct {
@@ -29,10 +30,8 @@ type Agent struct {
jobQueue chan *types.Job jobQueue chan *types.Job
context *types.ActionContext context *types.ActionContext
currentReasoning string currentState *action.AgentInternalState
currentState *action.AgentInternalState
nextAction types.Action
nextActionParams *types.ActionParams
selfEvaluationInProgress bool selfEvaluationInProgress bool
pause bool pause bool
@@ -175,19 +174,32 @@ func (a *Agent) Enqueue(j *types.Job) {
a.jobQueue <- j a.jobQueue <- j
} }
func (a *Agent) askLLM(ctx context.Context, conversation []openai.ChatCompletionMessage) (openai.ChatCompletionMessage, error) { func (a *Agent) askLLM(ctx context.Context, conversation []openai.ChatCompletionMessage, maxRetries int) (openai.ChatCompletionMessage, error) {
resp, err := a.client.CreateChatCompletion(ctx, var resp openai.ChatCompletionResponse
openai.ChatCompletionRequest{ var err error
Model: a.options.LLMAPI.Model,
Messages: conversation, for attempt := 0; attempt <= maxRetries; attempt++ {
}, resp, err = a.client.CreateChatCompletion(ctx,
) openai.ChatCompletionRequest{
Model: a.options.LLMAPI.Model,
Messages: conversation,
},
)
if err == nil && len(resp.Choices) == 1 && resp.Choices[0].Message.Content != "" {
break
}
xlog.Warn("Error asking LLM, retrying", "attempt", attempt+1, "error", err)
if attempt < maxRetries {
time.Sleep(2 * time.Second) // Optional: Add a delay between retries
}
}
if err != nil { if err != nil {
return openai.ChatCompletionMessage{}, err return openai.ChatCompletionMessage{}, err
} }
if len(resp.Choices) != 1 { if len(resp.Choices) != 1 {
return openai.ChatCompletionMessage{}, fmt.Errorf("no enough choices: %w", err) return openai.ChatCompletionMessage{}, fmt.Errorf("not enough choices: %w", err)
} }
return resp.Choices[0].Message, nil return resp.Choices[0].Message, nil
@@ -447,15 +459,26 @@ func (a *Agent) consumeJob(job *types.Job, role string) {
var reasoning string var reasoning string
var actionParams types.ActionParams var actionParams types.ActionParams
if a.nextAction != nil { if job.HasNextAction() {
// if we are being re-evaluated, we already have the action // if we are being re-evaluated, we already have the action
// and the reasoning. Consume it here and reset it // and the reasoning. Consume it here and reset it
chosenAction = a.nextAction action, params, reason := job.GetNextAction()
reasoning = a.currentReasoning chosenAction = *action
actionParams = *a.nextActionParams reasoning = reason
a.currentReasoning = "" if params == nil {
a.nextActionParams = nil p, err := a.generateParameters(job.GetContext(), pickTemplate, chosenAction, conv, reasoning, maxRetries)
a.nextAction = nil if err != nil {
xlog.Error("Error generating parameters, trying again", "error", err)
// try again
job.SetNextAction(&chosenAction, nil, reasoning)
a.consumeJob(job, role)
return
}
actionParams = p.actionParams
} else {
actionParams = *params
}
job.ResetNextAction()
} else { } else {
var err error var err error
chosenAction, actionParams, reasoning, err = a.pickAction(job.GetContext(), pickTemplate, conv) chosenAction, actionParams, reasoning, err = a.pickAction(job.GetContext(), pickTemplate, conv)
@@ -501,9 +524,12 @@ func (a *Agent) consumeJob(job *types.Job, role string) {
"reasoning", reasoning, "reasoning", reasoning,
) )
params, err := a.generateParameters(job.GetContext(), pickTemplate, chosenAction, conv, reasoning) params, err := a.generateParameters(job.GetContext(), pickTemplate, chosenAction, conv, reasoning, maxRetries)
if err != nil { if err != nil {
job.Result.Finish(fmt.Errorf("error generating action's parameters: %w", err)) xlog.Error("Error generating parameters, trying again", "error", err)
// try again
job.SetNextAction(&chosenAction, nil, reasoning)
a.consumeJob(job, role)
return return
} }
actionParams = params.actionParams actionParams = params.actionParams
@@ -625,9 +651,7 @@ func (a *Agent) consumeJob(job *types.Job, role string) {
// We need to do another action (?) // We need to do another action (?)
// The agent decided to do another action // The agent decided to do another action
// call ourselves again // call ourselves again
a.currentReasoning = reasoning job.SetNextAction(&followingAction, &followingParams, reasoning)
a.nextAction = followingAction
a.nextActionParams = &followingParams
a.consumeJob(job, role) a.consumeJob(job, role)
return return
} else if followingAction == nil { } else if followingAction == nil {
@@ -731,7 +755,7 @@ func (a *Agent) consumeJob(job *types.Job, role string) {
xlog.Info("Reasoning, ask LLM for a reply", "agent", a.Character.Name) xlog.Info("Reasoning, ask LLM for a reply", "agent", a.Character.Name)
xlog.Debug("Conversation", "conversation", fmt.Sprintf("%+v", conv)) xlog.Debug("Conversation", "conversation", fmt.Sprintf("%+v", conv))
msg, err := a.askLLM(job.GetContext(), conv) msg, err := a.askLLM(job.GetContext(), conv, maxRetries)
if err != nil { if err != nil {
job.Result.Conversation = conv job.Result.Conversation = conv
job.Result.Finish(err) job.Result.Finish(err)

View File

@@ -87,7 +87,7 @@ func (a *Agent) saveCurrentConversation(conv Messages) {
msg, err := a.askLLM(a.context.Context, []openai.ChatCompletionMessage{{ msg, err := a.askLLM(a.context.Context, []openai.ChatCompletionMessage{{
Role: "user", Role: "user",
Content: "Summarize the conversation below, keep the highlights as a bullet list:\n" + Messages(conv).String(), Content: "Summarize the conversation below, keep the highlights as a bullet list:\n" + Messages(conv).String(),
}}) }}, maxRetries)
if err != nil { if err != nil {
xlog.Error("Error summarizing conversation", "error", err) xlog.Error("Error summarizing conversation", "error", err)
} }

View File

@@ -21,6 +21,10 @@ type Job struct {
UUID string UUID string
Metadata map[string]interface{} Metadata map[string]interface{}
nextAction *Action
nextActionParams *ActionParams
nextActionReasoning string
context context.Context context context.Context
cancel context.CancelFunc cancel context.CancelFunc
} }
@@ -85,6 +89,26 @@ func (j *Job) CallbackWithResult(stateResult ActionState) {
j.ResultCallback(stateResult) j.ResultCallback(stateResult)
} }
func (j *Job) SetNextAction(action *Action, params *ActionParams, reasoning string) {
j.nextAction = action
j.nextActionParams = params
j.nextActionReasoning = reasoning
}
func (j *Job) GetNextAction() (*Action, *ActionParams, string) {
return j.nextAction, j.nextActionParams, j.nextActionReasoning
}
func (j *Job) HasNextAction() bool {
return j.nextAction != nil
}
func (j *Job) ResetNextAction() {
j.nextAction = nil
j.nextActionParams = nil
j.nextActionReasoning = ""
}
func WithTextImage(text, image string) JobOption { func WithTextImage(text, image string) JobOption {
return func(j *Job) { return func(j *Job) {
j.ConversationHistory = append(j.ConversationHistory, openai.ChatCompletionMessage{ j.ConversationHistory = append(j.ConversationHistory, openai.ChatCompletionMessage{