Compare commits
3 Commits
obs-fixes
...
chore/para
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
47492f890f | ||
|
|
ddac344147 | ||
|
|
25bb3fb123 |
@@ -2,6 +2,7 @@ package agent
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"os"
|
||||
"sync"
|
||||
@@ -989,7 +990,6 @@ func (a *Agent) periodicallyRun(timer *time.Timer) {
|
||||
}
|
||||
|
||||
func (a *Agent) Run() error {
|
||||
|
||||
a.startNewConversationsConsumer()
|
||||
xlog.Debug("Agent is now running", "agent", a.Character.Name)
|
||||
// The agent run does two things:
|
||||
@@ -1004,36 +1004,68 @@ func (a *Agent) Run() error {
|
||||
|
||||
// Expose a REST API to interact with the agent to ask it things
|
||||
|
||||
//todoTimer := time.NewTicker(a.options.periodicRuns)
|
||||
timer := time.NewTimer(a.options.periodicRuns)
|
||||
|
||||
// we fire the periodicalRunner only once.
|
||||
go a.periodicalRunRunner(timer)
|
||||
var errs []error
|
||||
var muErr sync.Mutex
|
||||
var wg sync.WaitGroup
|
||||
|
||||
parallelJobs := a.options.parallelJobs
|
||||
if a.options.parallelJobs == 0 {
|
||||
parallelJobs = 1
|
||||
}
|
||||
|
||||
for i := 0; i < parallelJobs; i++ {
|
||||
xlog.Debug("Starting agent worker", "worker", i)
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
e := a.run(timer)
|
||||
muErr.Lock()
|
||||
errs = append(errs, e)
|
||||
muErr.Unlock()
|
||||
wg.Done()
|
||||
}()
|
||||
}
|
||||
|
||||
wg.Wait()
|
||||
|
||||
return errors.Join(errs...)
|
||||
}
|
||||
|
||||
func (a *Agent) run(timer *time.Timer) error {
|
||||
for {
|
||||
xlog.Debug("Agent is now waiting for a new job", "agent", a.Character.Name)
|
||||
select {
|
||||
case job := <-a.jobQueue:
|
||||
a.loop(timer, job)
|
||||
if !timer.Stop() {
|
||||
<-timer.C
|
||||
}
|
||||
xlog.Debug("Agent is consuming a job", "agent", a.Character.Name, "job", job)
|
||||
a.consumeJob(job, UserRole)
|
||||
timer.Reset(a.options.periodicRuns)
|
||||
case <-a.context.Done():
|
||||
// Agent has been canceled, return error
|
||||
xlog.Warn("Agent has been canceled", "agent", a.Character.Name)
|
||||
return ErrContextCanceled
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (a *Agent) periodicalRunRunner(timer *time.Timer) {
|
||||
for {
|
||||
select {
|
||||
case <-a.context.Done():
|
||||
// Agent has been canceled, return error
|
||||
xlog.Warn("periodicalRunner has been canceled", "agent", a.Character.Name)
|
||||
return
|
||||
case <-timer.C:
|
||||
a.periodicallyRun(timer)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (a *Agent) loop(timer *time.Timer, job *types.Job) {
|
||||
// Remember always to reset the timer - if we don't the agent will stop..
|
||||
defer timer.Reset(a.options.periodicRuns)
|
||||
// Consume the job and generate a response
|
||||
// TODO: Give a short-term memory to the agent
|
||||
// stop and drain the timer
|
||||
if !timer.Stop() {
|
||||
<-timer.C
|
||||
}
|
||||
xlog.Debug("Agent is consuming a job", "agent", a.Character.Name, "job", job)
|
||||
a.consumeJob(job, UserRole)
|
||||
}
|
||||
|
||||
func (a *Agent) Observer() Observer {
|
||||
return a.observer
|
||||
}
|
||||
|
||||
@@ -54,7 +54,8 @@ type options struct {
|
||||
|
||||
newConversationsSubscribers []func(openai.ChatCompletionMessage)
|
||||
|
||||
observer Observer
|
||||
observer Observer
|
||||
parallelJobs int
|
||||
}
|
||||
|
||||
func (o *options) SeparatedMultimodalModel() bool {
|
||||
@@ -63,6 +64,7 @@ func (o *options) SeparatedMultimodalModel() bool {
|
||||
|
||||
func defaultOptions() *options {
|
||||
return &options{
|
||||
parallelJobs: 1,
|
||||
periodicRuns: 15 * time.Minute,
|
||||
LLMAPI: llmOptions{
|
||||
APIURL: "http://localhost:8080",
|
||||
@@ -138,6 +140,13 @@ func EnableKnowledgeBaseWithResults(results int) Option {
|
||||
}
|
||||
}
|
||||
|
||||
func WithParallelJobs(jobs int) Option {
|
||||
return func(o *options) error {
|
||||
o.parallelJobs = jobs
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
func WithNewConversationSubscriber(sub func(openai.ChatCompletionMessage)) Option {
|
||||
return func(o *options) error {
|
||||
o.newConversationsSubscribers = append(o.newConversationsSubscribers, sub)
|
||||
|
||||
@@ -25,6 +25,7 @@ var _ = Describe("Agent test", func() {
|
||||
agent, err = New(
|
||||
WithLLMAPIURL(apiURL),
|
||||
WithModel(testModel),
|
||||
WithTimeout("10m"),
|
||||
WithRandomIdentity(),
|
||||
)
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
|
||||
@@ -61,6 +61,7 @@ type AgentConfig struct {
|
||||
SystemPrompt string `json:"system_prompt" form:"system_prompt"`
|
||||
LongTermMemory bool `json:"long_term_memory" form:"long_term_memory"`
|
||||
SummaryLongTermMemory bool `json:"summary_long_term_memory" form:"summary_long_term_memory"`
|
||||
ParallelJobs int `json:"parallel_jobs" form:"parallel_jobs"`
|
||||
}
|
||||
|
||||
type AgentConfigMeta struct {
|
||||
@@ -260,6 +261,16 @@ func NewAgentConfigMeta(
|
||||
Step: 1,
|
||||
Tags: config.Tags{Section: "AdvancedSettings"},
|
||||
},
|
||||
{
|
||||
Name: "parallel_jobs",
|
||||
Label: "Parallel Jobs",
|
||||
Type: "number",
|
||||
DefaultValue: 5,
|
||||
Min: 1,
|
||||
Step: 1,
|
||||
HelpText: "Number of concurrent tasks that can run in parallel",
|
||||
Tags: config.Tags{Section: "AdvancedSettings"},
|
||||
},
|
||||
},
|
||||
MCPServers: []config.Field{
|
||||
{
|
||||
|
||||
@@ -466,6 +466,10 @@ func (a *AgentPool) startAgentWithConfig(name string, config *AgentConfig) error
|
||||
opts = append(opts, WithLoopDetectionSteps(config.LoopDetectionSteps))
|
||||
}
|
||||
|
||||
if config.ParallelJobs > 0 {
|
||||
opts = append(opts, WithParallelJobs(config.ParallelJobs))
|
||||
}
|
||||
|
||||
xlog.Info("Starting agent", "name", name, "config", config)
|
||||
|
||||
agent, err := New(opts...)
|
||||
|
||||
Reference in New Issue
Block a user