feat: allow the agent to perform things concurrently (#74)
* feat: allow the agent to perform things concurrently Signed-off-by: mudler <mudler@localai.io> * Apply suggestions from code review Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> * collect errors Signed-off-by: mudler <mudler@localai.io> --------- Signed-off-by: mudler <mudler@localai.io> Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
This commit is contained in:
committed by
GitHub
parent
9e52438877
commit
25bb3fb123
@@ -2,6 +2,7 @@ package agent
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"os"
|
||||
"sync"
|
||||
@@ -1004,36 +1005,63 @@ func (a *Agent) Run() error {
|
||||
|
||||
// Expose a REST API to interact with the agent to ask it things
|
||||
|
||||
//todoTimer := time.NewTicker(a.options.periodicRuns)
|
||||
timer := time.NewTimer(a.options.periodicRuns)
|
||||
|
||||
// we fire the periodicalRunner only once.
|
||||
go a.periodicalRunRunner(timer)
|
||||
var errs []error
|
||||
var muErr sync.Mutex
|
||||
var wg sync.WaitGroup
|
||||
|
||||
for i := 0; i <= a.options.parallelJobs; i++ {
|
||||
xlog.Debug("Starting agent worker", "worker", i)
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
e := a.run(timer)
|
||||
muErr.Lock()
|
||||
errs = append(errs, e)
|
||||
muErr.Unlock()
|
||||
wg.Done()
|
||||
}()
|
||||
}
|
||||
|
||||
wg.Wait()
|
||||
|
||||
return errors.Join(errs...)
|
||||
}
|
||||
|
||||
func (a *Agent) run(timer *time.Timer) error {
|
||||
for {
|
||||
xlog.Debug("Agent is now waiting for a new job", "agent", a.Character.Name)
|
||||
select {
|
||||
case job := <-a.jobQueue:
|
||||
a.loop(timer, job)
|
||||
if !timer.Stop() {
|
||||
<-timer.C
|
||||
}
|
||||
xlog.Debug("Agent is consuming a job", "agent", a.Character.Name, "job", job)
|
||||
a.consumeJob(job, UserRole)
|
||||
timer.Reset(a.options.periodicRuns)
|
||||
case <-a.context.Done():
|
||||
// Agent has been canceled, return error
|
||||
xlog.Warn("Agent has been canceled", "agent", a.Character.Name)
|
||||
return ErrContextCanceled
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (a *Agent) periodicalRunRunner(timer *time.Timer) {
|
||||
for {
|
||||
select {
|
||||
case <-a.context.Done():
|
||||
// Agent has been canceled, return error
|
||||
xlog.Warn("periodicalRunner has been canceled", "agent", a.Character.Name)
|
||||
return
|
||||
case <-timer.C:
|
||||
a.periodicallyRun(timer)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (a *Agent) loop(timer *time.Timer, job *types.Job) {
|
||||
// Remember always to reset the timer - if we don't the agent will stop..
|
||||
defer timer.Reset(a.options.periodicRuns)
|
||||
// Consume the job and generate a response
|
||||
// TODO: Give a short-term memory to the agent
|
||||
// stop and drain the timer
|
||||
if !timer.Stop() {
|
||||
<-timer.C
|
||||
}
|
||||
xlog.Debug("Agent is consuming a job", "agent", a.Character.Name, "job", job)
|
||||
a.consumeJob(job, UserRole)
|
||||
}
|
||||
|
||||
func (a *Agent) Observer() Observer {
|
||||
return a.observer
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user