feat: allow the agent to perform things concurrently

Signed-off-by: mudler <mudler@localai.io>
This commit is contained in:
mudler
2025-04-22 12:06:20 +02:00
parent 9e52438877
commit 7fcd5b86b9
4 changed files with 57 additions and 15 deletions

View File

@@ -1006,34 +1006,53 @@ func (a *Agent) Run() error {
//todoTimer := time.NewTicker(a.options.periodicRuns) //todoTimer := time.NewTicker(a.options.periodicRuns)
timer := time.NewTimer(a.options.periodicRuns) timer := time.NewTimer(a.options.periodicRuns)
if a.options.parallelJobs > 1 {
// we fire the periodicalRunner only once.
go a.periodicalRunRunner(timer)
for range a.options.parallelJobs {
go a.run(timer)
}
select {}
} else {
go a.periodicalRunRunner(timer)
return a.run(timer)
}
}
func (a *Agent) run(timer *time.Timer) error {
for { for {
xlog.Debug("Agent is now waiting for a new job", "agent", a.Character.Name) xlog.Debug("Agent is now waiting for a new job", "agent", a.Character.Name)
select { select {
case job := <-a.jobQueue: case job := <-a.jobQueue:
a.loop(timer, job) if !timer.Stop() {
<-timer.C
}
xlog.Debug("Agent is consuming a job", "agent", a.Character.Name, "job", job)
a.consumeJob(job, UserRole)
timer.Reset(a.options.periodicRuns)
case <-a.context.Done(): case <-a.context.Done():
// Agent has been canceled, return error // Agent has been canceled, return error
xlog.Warn("Agent has been canceled", "agent", a.Character.Name) xlog.Warn("Agent has been canceled", "agent", a.Character.Name)
return ErrContextCanceled return ErrContextCanceled
}
}
}
func (a *Agent) periodicalRunRunner(timer *time.Timer) {
for {
xlog.Debug("Agent is now waiting for a new job", "agent", a.Character.Name)
select {
case <-a.context.Done():
// Agent has been canceled, return error
xlog.Warn("Agent has been canceled", "agent", a.Character.Name)
return
case <-timer.C: case <-timer.C:
a.periodicallyRun(timer) a.periodicallyRun(timer)
} }
} }
} }
func (a *Agent) loop(timer *time.Timer, job *types.Job) {
// Remember always to reset the timer - if we don't the agent will stop..
defer timer.Reset(a.options.periodicRuns)
// Consume the job and generate a response
// TODO: Give a short-term memory to the agent
// stop and drain the timer
if !timer.Stop() {
<-timer.C
}
xlog.Debug("Agent is consuming a job", "agent", a.Character.Name, "job", job)
a.consumeJob(job, UserRole)
}
func (a *Agent) Observer() Observer { func (a *Agent) Observer() Observer {
return a.observer return a.observer
} }

View File

@@ -55,6 +55,7 @@ type options struct {
newConversationsSubscribers []func(openai.ChatCompletionMessage) newConversationsSubscribers []func(openai.ChatCompletionMessage)
observer Observer observer Observer
parallelJobs int
} }
func (o *options) SeparatedMultimodalModel() bool { func (o *options) SeparatedMultimodalModel() bool {
@@ -138,6 +139,13 @@ func EnableKnowledgeBaseWithResults(results int) Option {
} }
} }
func WithParallelJobs(jobs int) Option {
return func(o *options) error {
o.parallelJobs = jobs
return nil
}
}
func WithNewConversationSubscriber(sub func(openai.ChatCompletionMessage)) Option { func WithNewConversationSubscriber(sub func(openai.ChatCompletionMessage)) Option {
return func(o *options) error { return func(o *options) error {
o.newConversationsSubscribers = append(o.newConversationsSubscribers, sub) o.newConversationsSubscribers = append(o.newConversationsSubscribers, sub)

View File

@@ -61,6 +61,7 @@ type AgentConfig struct {
SystemPrompt string `json:"system_prompt" form:"system_prompt"` SystemPrompt string `json:"system_prompt" form:"system_prompt"`
LongTermMemory bool `json:"long_term_memory" form:"long_term_memory"` LongTermMemory bool `json:"long_term_memory" form:"long_term_memory"`
SummaryLongTermMemory bool `json:"summary_long_term_memory" form:"summary_long_term_memory"` SummaryLongTermMemory bool `json:"summary_long_term_memory" form:"summary_long_term_memory"`
ParallelJobs int `json:"parallel_jobs" form:"parallel_jobs"`
} }
type AgentConfigMeta struct { type AgentConfigMeta struct {
@@ -260,6 +261,16 @@ func NewAgentConfigMeta(
Step: 1, Step: 1,
Tags: config.Tags{Section: "AdvancedSettings"}, Tags: config.Tags{Section: "AdvancedSettings"},
}, },
{
Name: "parallel_jobs",
Label: "Parallel Jobs",
Type: "number",
DefaultValue: 5,
Min: 1,
Step: 1,
HelpText: "Number of concurrent tasks that can run in parallel",
Tags: config.Tags{Section: "AdvancedSettings"},
},
}, },
MCPServers: []config.Field{ MCPServers: []config.Field{
{ {

View File

@@ -466,6 +466,10 @@ func (a *AgentPool) startAgentWithConfig(name string, config *AgentConfig) error
opts = append(opts, WithLoopDetectionSteps(config.LoopDetectionSteps)) opts = append(opts, WithLoopDetectionSteps(config.LoopDetectionSteps))
} }
if config.ParallelJobs > 0 {
opts = append(opts, WithParallelJobs(config.ParallelJobs))
}
xlog.Info("Starting agent", "name", name, "config", config) xlog.Info("Starting agent", "name", name, "config", config)
agent, err := New(opts...) agent, err := New(opts...)