diff --git a/core/agent/agent.go b/core/agent/agent.go index 54a328a..9061f8f 100644 --- a/core/agent/agent.go +++ b/core/agent/agent.go @@ -1006,34 +1006,53 @@ func (a *Agent) Run() error { //todoTimer := time.NewTicker(a.options.periodicRuns) timer := time.NewTimer(a.options.periodicRuns) + + if a.options.parallelJobs > 1 { + // we fire the periodicalRunner only once. + go a.periodicalRunRunner(timer) + for range a.options.parallelJobs { + go a.run(timer) + } + select {} + } else { + go a.periodicalRunRunner(timer) + return a.run(timer) + } +} + +func (a *Agent) run(timer *time.Timer) error { for { xlog.Debug("Agent is now waiting for a new job", "agent", a.Character.Name) select { case job := <-a.jobQueue: - a.loop(timer, job) + if !timer.Stop() { + <-timer.C + } + xlog.Debug("Agent is consuming a job", "agent", a.Character.Name, "job", job) + a.consumeJob(job, UserRole) + timer.Reset(a.options.periodicRuns) case <-a.context.Done(): // Agent has been canceled, return error xlog.Warn("Agent has been canceled", "agent", a.Character.Name) return ErrContextCanceled + } + } +} + +func (a *Agent) periodicalRunRunner(timer *time.Timer) { + for { + xlog.Debug("Agent is now waiting for a new job", "agent", a.Character.Name) + select { + case <-a.context.Done(): + // Agent has been canceled, return error + xlog.Warn("Agent has been canceled", "agent", a.Character.Name) + return case <-timer.C: a.periodicallyRun(timer) } } } -func (a *Agent) loop(timer *time.Timer, job *types.Job) { - // Remember always to reset the timer - if we don't the agent will stop.. - defer timer.Reset(a.options.periodicRuns) - // Consume the job and generate a response - // TODO: Give a short-term memory to the agent - // stop and drain the timer - if !timer.Stop() { - <-timer.C - } - xlog.Debug("Agent is consuming a job", "agent", a.Character.Name, "job", job) - a.consumeJob(job, UserRole) -} - func (a *Agent) Observer() Observer { return a.observer } diff --git a/core/agent/options.go b/core/agent/options.go index d8a2b67..d27896a 100644 --- a/core/agent/options.go +++ b/core/agent/options.go @@ -54,7 +54,8 @@ type options struct { newConversationsSubscribers []func(openai.ChatCompletionMessage) - observer Observer + observer Observer + parallelJobs int } func (o *options) SeparatedMultimodalModel() bool { @@ -138,6 +139,13 @@ func EnableKnowledgeBaseWithResults(results int) Option { } } +func WithParallelJobs(jobs int) Option { + return func(o *options) error { + o.parallelJobs = jobs + return nil + } +} + func WithNewConversationSubscriber(sub func(openai.ChatCompletionMessage)) Option { return func(o *options) error { o.newConversationsSubscribers = append(o.newConversationsSubscribers, sub) diff --git a/core/state/config.go b/core/state/config.go index 2d7a110..c510384 100644 --- a/core/state/config.go +++ b/core/state/config.go @@ -61,6 +61,7 @@ type AgentConfig struct { SystemPrompt string `json:"system_prompt" form:"system_prompt"` LongTermMemory bool `json:"long_term_memory" form:"long_term_memory"` SummaryLongTermMemory bool `json:"summary_long_term_memory" form:"summary_long_term_memory"` + ParallelJobs int `json:"parallel_jobs" form:"parallel_jobs"` } type AgentConfigMeta struct { @@ -260,6 +261,16 @@ func NewAgentConfigMeta( Step: 1, Tags: config.Tags{Section: "AdvancedSettings"}, }, + { + Name: "parallel_jobs", + Label: "Parallel Jobs", + Type: "number", + DefaultValue: 5, + Min: 1, + Step: 1, + HelpText: "Number of concurrent tasks that can run in parallel", + Tags: config.Tags{Section: "AdvancedSettings"}, + }, }, MCPServers: []config.Field{ { diff --git a/core/state/pool.go b/core/state/pool.go index 0f4f96a..3081556 100644 --- a/core/state/pool.go +++ b/core/state/pool.go @@ -466,6 +466,10 @@ func (a *AgentPool) startAgentWithConfig(name string, config *AgentConfig) error opts = append(opts, WithLoopDetectionSteps(config.LoopDetectionSteps)) } + if config.ParallelJobs > 0 { + opts = append(opts, WithParallelJobs(config.ParallelJobs)) + } + xlog.Info("Starting agent", "name", name, "config", config) agent, err := New(opts...)