From 25bb3fb1230973ac1d2b290cfb6013ca1dedb693 Mon Sep 17 00:00:00 2001 From: Ettore Di Giacinto Date: Tue, 22 Apr 2025 16:49:28 +0200 Subject: [PATCH] feat: allow the agent to perform things concurrently (#74) * feat: allow the agent to perform things concurrently Signed-off-by: mudler * Apply suggestions from code review Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> * collect errors Signed-off-by: mudler --------- Signed-off-by: mudler Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- core/agent/agent.go | 58 ++++++++++++++++++++++++++++++++----------- core/agent/options.go | 10 +++++++- core/state/config.go | 11 ++++++++ core/state/pool.go | 4 +++ 4 files changed, 67 insertions(+), 16 deletions(-) diff --git a/core/agent/agent.go b/core/agent/agent.go index 54a328a..44b91eb 100644 --- a/core/agent/agent.go +++ b/core/agent/agent.go @@ -2,6 +2,7 @@ package agent import ( "context" + "errors" "fmt" "os" "sync" @@ -1004,36 +1005,63 @@ func (a *Agent) Run() error { // Expose a REST API to interact with the agent to ask it things - //todoTimer := time.NewTicker(a.options.periodicRuns) timer := time.NewTimer(a.options.periodicRuns) + + // we fire the periodicalRunner only once. + go a.periodicalRunRunner(timer) + var errs []error + var muErr sync.Mutex + var wg sync.WaitGroup + + for i := 0; i <= a.options.parallelJobs; i++ { + xlog.Debug("Starting agent worker", "worker", i) + wg.Add(1) + go func() { + e := a.run(timer) + muErr.Lock() + errs = append(errs, e) + muErr.Unlock() + wg.Done() + }() + } + + wg.Wait() + + return errors.Join(errs...) +} + +func (a *Agent) run(timer *time.Timer) error { for { xlog.Debug("Agent is now waiting for a new job", "agent", a.Character.Name) select { case job := <-a.jobQueue: - a.loop(timer, job) + if !timer.Stop() { + <-timer.C + } + xlog.Debug("Agent is consuming a job", "agent", a.Character.Name, "job", job) + a.consumeJob(job, UserRole) + timer.Reset(a.options.periodicRuns) case <-a.context.Done(): // Agent has been canceled, return error xlog.Warn("Agent has been canceled", "agent", a.Character.Name) return ErrContextCanceled + } + } +} + +func (a *Agent) periodicalRunRunner(timer *time.Timer) { + for { + select { + case <-a.context.Done(): + // Agent has been canceled, return error + xlog.Warn("periodicalRunner has been canceled", "agent", a.Character.Name) + return case <-timer.C: a.periodicallyRun(timer) } } } -func (a *Agent) loop(timer *time.Timer, job *types.Job) { - // Remember always to reset the timer - if we don't the agent will stop.. - defer timer.Reset(a.options.periodicRuns) - // Consume the job and generate a response - // TODO: Give a short-term memory to the agent - // stop and drain the timer - if !timer.Stop() { - <-timer.C - } - xlog.Debug("Agent is consuming a job", "agent", a.Character.Name, "job", job) - a.consumeJob(job, UserRole) -} - func (a *Agent) Observer() Observer { return a.observer } diff --git a/core/agent/options.go b/core/agent/options.go index d8a2b67..d27896a 100644 --- a/core/agent/options.go +++ b/core/agent/options.go @@ -54,7 +54,8 @@ type options struct { newConversationsSubscribers []func(openai.ChatCompletionMessage) - observer Observer + observer Observer + parallelJobs int } func (o *options) SeparatedMultimodalModel() bool { @@ -138,6 +139,13 @@ func EnableKnowledgeBaseWithResults(results int) Option { } } +func WithParallelJobs(jobs int) Option { + return func(o *options) error { + o.parallelJobs = jobs + return nil + } +} + func WithNewConversationSubscriber(sub func(openai.ChatCompletionMessage)) Option { return func(o *options) error { o.newConversationsSubscribers = append(o.newConversationsSubscribers, sub) diff --git a/core/state/config.go b/core/state/config.go index 2d7a110..c510384 100644 --- a/core/state/config.go +++ b/core/state/config.go @@ -61,6 +61,7 @@ type AgentConfig struct { SystemPrompt string `json:"system_prompt" form:"system_prompt"` LongTermMemory bool `json:"long_term_memory" form:"long_term_memory"` SummaryLongTermMemory bool `json:"summary_long_term_memory" form:"summary_long_term_memory"` + ParallelJobs int `json:"parallel_jobs" form:"parallel_jobs"` } type AgentConfigMeta struct { @@ -260,6 +261,16 @@ func NewAgentConfigMeta( Step: 1, Tags: config.Tags{Section: "AdvancedSettings"}, }, + { + Name: "parallel_jobs", + Label: "Parallel Jobs", + Type: "number", + DefaultValue: 5, + Min: 1, + Step: 1, + HelpText: "Number of concurrent tasks that can run in parallel", + Tags: config.Tags{Section: "AdvancedSettings"}, + }, }, MCPServers: []config.Field{ { diff --git a/core/state/pool.go b/core/state/pool.go index 0f4f96a..3081556 100644 --- a/core/state/pool.go +++ b/core/state/pool.go @@ -466,6 +466,10 @@ func (a *AgentPool) startAgentWithConfig(name string, config *AgentConfig) error opts = append(opts, WithLoopDetectionSteps(config.LoopDetectionSteps)) } + if config.ParallelJobs > 0 { + opts = append(opts, WithParallelJobs(config.ParallelJobs)) + } + xlog.Info("Starting agent", "name", name, "config", config) agent, err := New(opts...)