diff --git a/core/agent/agent.go b/core/agent/agent.go index 2caa50d..44b91eb 100644 --- a/core/agent/agent.go +++ b/core/agent/agent.go @@ -2,6 +2,7 @@ package agent import ( "context" + "errors" "fmt" "os" "sync" @@ -1004,20 +1005,29 @@ func (a *Agent) Run() error { // Expose a REST API to interact with the agent to ask it things - //todoTimer := time.NewTicker(a.options.periodicRuns) timer := time.NewTimer(a.options.periodicRuns) - if a.options.parallelJobs > 1 { - // we fire the periodicalRunner only once. - go a.periodicalRunRunner(timer) - for i := 0; i < a.options.parallelJobs; i++ { - go a.run(timer) - } - select {} - } else { - go a.periodicalRunRunner(timer) - return a.run(timer) + // we fire the periodicalRunner only once. + go a.periodicalRunRunner(timer) + var errs []error + var muErr sync.Mutex + var wg sync.WaitGroup + + for i := 0; i <= a.options.parallelJobs; i++ { + xlog.Debug("Starting agent worker", "worker", i) + wg.Add(1) + go func() { + e := a.run(timer) + muErr.Lock() + errs = append(errs, e) + muErr.Unlock() + wg.Done() + }() } + + wg.Wait() + + return errors.Join(errs...) } func (a *Agent) run(timer *time.Timer) error { @@ -1041,11 +1051,10 @@ func (a *Agent) run(timer *time.Timer) error { func (a *Agent) periodicalRunRunner(timer *time.Timer) { for { - xlog.Debug("Agent is now waiting for a new job", "agent", a.Character.Name) select { case <-a.context.Done(): // Agent has been canceled, return error - xlog.Warn("Agent has been canceled", "agent", a.Character.Name) + xlog.Warn("periodicalRunner has been canceled", "agent", a.Character.Name) return case <-timer.C: a.periodicallyRun(timer)