From 7c679ead94981547ca7676b86de04a0e21219a33 Mon Sep 17 00:00:00 2001 From: mudler Date: Mon, 1 Apr 2024 20:02:25 +0200 Subject: [PATCH] use options --- agent/agent.go | 15 ++-------- agent/agent_test.go | 11 ++++++-- agent/jobs.go | 69 ++++++++++++++++++++++++++++++++------------- 3 files changed, 61 insertions(+), 34 deletions(-) diff --git a/agent/agent.go b/agent/agent.go index 84a89e0..ff01696 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -63,12 +63,11 @@ func New(opts ...Option) (*Agent, error) { // Ask is a pre-emptive, blocking call that returns the response as soon as it's ready. // It discards any other computation. -func (a *Agent) Ask(text, image string) []string { +func (a *Agent) Ask(opts ...JobOption) []string { //a.StopAction() - j := NewJob(text, image) - fmt.Println("Job created", text) + j := NewJob(opts...) + // fmt.Println("Job created", text) a.jobQueue <- j - fmt.Println("Waiting for result") return j.Result.WaitResult() } @@ -94,26 +93,18 @@ func (a *Agent) Run() error { // Expose a REST API to interact with the agent to ask it things - fmt.Println("Agent is running") clearConvTimer := time.NewTicker(1 * time.Minute) for { - fmt.Println("Agent loop") - select { case job := <-a.jobQueue: - fmt.Println("job from the queue") // Consume the job and generate a response // TODO: Give a short-term memory to the agent a.consumeJob(job) case <-a.context.Done(): - fmt.Println("Context canceled, agent is stopping...") - // Agent has been canceled, return error return ErrContextCanceled case <-clearConvTimer.C: - fmt.Println("Removing chat history...") - // TODO: decide to do something on its own with the conversation result // before clearing it out diff --git a/agent/agent_test.go b/agent/agent_test.go index 910a086..aa5af1e 100644 --- a/agent/agent_test.go +++ b/agent/agent_test.go @@ -61,7 +61,12 @@ var _ = Describe("Agent test", func() { Expect(err).ToNot(HaveOccurred()) go agent.Run() defer agent.Stop() - res := agent.Ask("can you get the weather in boston, and afterward of Milano, Italy?", "") + res := agent.Ask( + WithReasoningCallback(func(a Action, ap action.ActionParams, s string) { + fmt.Println("Reasoning", s) + }), + WithText("can you get the weather in boston, and afterward of Milano, Italy?"), + ) Expect(res).To(ContainElement(testActionResult), fmt.Sprint(res)) Expect(res).To(ContainElement(testActionResult2), fmt.Sprint(res)) }) @@ -75,7 +80,9 @@ var _ = Describe("Agent test", func() { Expect(err).ToNot(HaveOccurred()) go agent.Run() defer agent.Stop() - res := agent.Ask("can you get the weather in boston?", "") + res := agent.Ask( + WithText("can you get the weather in boston?"), + ) Expect(res).To(ContainElement(testActionResult), fmt.Sprint(res)) }) }) diff --git a/agent/jobs.go b/agent/jobs.go index e97d019..5f463de 100644 --- a/agent/jobs.go +++ b/agent/jobs.go @@ -14,52 +14,82 @@ type Job struct { // The job is a request to the agent to do something // It can be a question, a command, or a request to do something // The agent will try to do it, and return a response - Text string - Image string // base64 encoded image - Result *JobResult + Text string + Image string // base64 encoded image + Result *JobResult + reasoningCallback func(Action, action.ActionParams, string) + resultCallback func(Action, action.ActionParams, string, string) } // JobResult is the result of a job type JobResult struct { sync.Mutex // The result of a job - Data []string - reasoningCallback func(Action, action.ActionParams, string) - resultCallback func(Action, action.ActionParams, string, string) - ready chan bool + Data []string + Error error + ready chan bool +} + +type JobOption func(*Job) + +func WithReasoningCallback(f func(Action, action.ActionParams, string)) JobOption { + return func(r *Job) { + r.reasoningCallback = f + } +} + +func WithResultCallback(f func(Action, action.ActionParams, string, string)) JobOption { + return func(r *Job) { + r.resultCallback = f + } } // NewJobResult creates a new job result func NewJobResult() *JobResult { - return &JobResult{ + r := &JobResult{ ready: make(chan bool), } + return r } -func (j *JobResult) Callback(a Action, p action.ActionParams, s string) { +func (j *Job) Callback(a Action, p action.ActionParams, s string) { if j.reasoningCallback == nil { return } j.reasoningCallback(a, p, s) } -func (j *JobResult) CallbackWithResult(a Action, p action.ActionParams, s, r string) { +func (j *Job) CallbackWithResult(a Action, p action.ActionParams, s, r string) { if j.resultCallback == nil { return } j.resultCallback(a, p, s, r) } +func WithImage(image string) JobOption { + return func(j *Job) { + j.Image = image + } +} + +func WithText(text string) JobOption { + return func(j *Job) { + j.Text = text + } +} + // NewJob creates a new job // It is a request to the agent to do something // It has a JobResult to get the result asynchronously // To wait for a Job result, use JobResult.WaitResult() -func NewJob(text, image string) *Job { - return &Job{ - Text: text, - Image: image, +func NewJob(opts ...JobOption) *Job { + j := &Job{ Result: NewJobResult(), } + for _, o := range opts { + o(j) + } + return j } // SetResult sets the result of a job @@ -71,10 +101,11 @@ func (j *JobResult) SetResult(text string) { } // SetResult sets the result of a job -func (j *JobResult) Finish() { +func (j *JobResult) Finish(e error) { j.Lock() defer j.Unlock() + j.Error = e close(j.ready) } @@ -165,7 +196,7 @@ func (a *Agent) consumeJob(job *Job) { return } - job.Result.Callback(chosenAction, params.actionParams, reasoning) + job.Callback(chosenAction, params.actionParams, reasoning) if params.actionParams == nil { fmt.Println("no parameters") @@ -186,7 +217,7 @@ func (a *Agent) consumeJob(job *Job) { } fmt.Printf("Action run result: %v\n", result) job.Result.SetResult(result) - job.Result.CallbackWithResult(chosenAction, params.actionParams, reasoning, result) + job.CallbackWithResult(chosenAction, params.actionParams, reasoning, result) // calling the function messages = append(messages, openai.ChatCompletionMessage{ @@ -220,8 +251,6 @@ func (a *Agent) consumeJob(job *Job) { } else if !chosenAction.Definition().Name.Is(action.ReplyActionName) { // We need to do another action (?) // The agent decided to do another action - fmt.Println("Another action to do: ", followingAction.Definition().Name) - fmt.Println("Reasoning: ", reasoning) // call ourselves again a.currentReasoning = reasoning a.nextAction = followingAction @@ -249,5 +278,5 @@ func (a *Agent) consumeJob(job *Job) { msg.Content) a.currentConversation = append(a.currentConversation, msg) - job.Result.Finish() + job.Result.Finish(nil) }