From aa62d9ef9ef6f97123f13b770e02109fd388a186 Mon Sep 17 00:00:00 2001 From: mudler Date: Sun, 31 Mar 2024 17:20:06 +0200 Subject: [PATCH] refactor --- action/definition.go | 58 +++++++++++ action/intention.go | 35 +++++++ agent/actions.go | 243 ++++--------------------------------------- agent/agent.go | 79 ++++++++++++-- agent/agent_test.go | 58 +++++++++++ agent/jobs.go | 113 +++++++++++++++++++- 6 files changed, 350 insertions(+), 236 deletions(-) create mode 100644 action/definition.go create mode 100644 action/intention.go create mode 100644 agent/agent_test.go diff --git a/action/definition.go b/action/definition.go new file mode 100644 index 0000000..5ed3aae --- /dev/null +++ b/action/definition.go @@ -0,0 +1,58 @@ +package action + +import ( + "context" + "encoding/json" + + "github.com/sashabaranov/go-openai" + "github.com/sashabaranov/go-openai/jsonschema" +) + +type ActionContext struct { + context.Context + cancelFunc context.CancelFunc +} + +func (ac *ActionContext) Cancel() { + ac.cancelFunc() +} + +func NewContext(ctx context.Context, cancel context.CancelFunc) *ActionContext { + return &ActionContext{ + Context: ctx, + cancelFunc: cancel, + } +} + +type ActionParams map[string]string + +func (ap ActionParams) Read(s string) error { + err := json.Unmarshal([]byte(s), &ap) + return err +} + +func (ap ActionParams) String() string { + b, _ := json.Marshal(ap) + return string(b) +} + +//type ActionDefinition openai.FunctionDefinition + +type ActionDefinition struct { + Properties map[string]jsonschema.Definition + Required []string + Name string + Description string +} + +func (a ActionDefinition) ToFunctionDefinition() openai.FunctionDefinition { + return openai.FunctionDefinition{ + Name: a.Name, + Description: a.Description, + Parameters: jsonschema.Definition{ + Type: jsonschema.Object, + Properties: a.Properties, + Required: a.Required, + }, + } +} diff --git a/action/intention.go b/action/intention.go new file mode 100644 index 0000000..4d2cce1 --- /dev/null +++ b/action/intention.go @@ -0,0 +1,35 @@ +package action + +import ( + "github.com/sashabaranov/go-openai/jsonschema" +) + +func NewIntention(s ...string) *IntentAction { + return &IntentAction{tools: s} +} + +type IntentAction struct { + tools []string +} + +func (a *IntentAction) Run(ActionParams) (string, error) { + return "no-op", nil +} + +func (a *IntentAction) Definition() ActionDefinition { + return ActionDefinition{ + Name: "intent", + Description: "detect user intent", + Properties: map[string]jsonschema.Definition{ + "reasoning": { + Type: jsonschema.String, + Description: "The city and state, e.g. San Francisco, CA", + }, + "tool": { + Type: jsonschema.String, + Enum: a.tools, + }, + }, + Required: []string{"tool", "reasoning"}, + } +} diff --git a/agent/actions.go b/agent/actions.go index 713f01a..f276e62 100644 --- a/agent/actions.go +++ b/agent/actions.go @@ -6,119 +6,32 @@ import ( "encoding/json" "fmt" "html/template" - "time" - //"github.com/mudler/local-agent-framework/llm" + "github.com/mudler/local-agent-framework/action" "github.com/sashabaranov/go-openai" - "github.com/sashabaranov/go-openai/jsonschema" ) -type ActionContext struct { - context.Context - cancelFunc context.CancelFunc -} - -type ActionParams map[string]string - -func (ap ActionParams) Read(s string) error { - err := json.Unmarshal([]byte(s), &ap) - return err -} - -func (ap ActionParams) String() string { - b, _ := json.Marshal(ap) - return string(b) -} - -//type ActionDefinition openai.FunctionDefinition - // Actions is something the agent can do type Action interface { - Run(ActionParams) (string, error) - Definition() ActionDefinition + Run(action.ActionParams) (string, error) + Definition() action.ActionDefinition } -type ActionDefinition struct { - Properties map[string]jsonschema.Definition - Required []string - Name string - Description string -} +type Actions []Action -func (a ActionDefinition) ToFunctionDefinition() openai.FunctionDefinition { - return openai.FunctionDefinition{ - Name: a.Name, - Description: a.Description, - Parameters: jsonschema.Definition{ - Type: jsonschema.Object, - Properties: a.Properties, - Required: a.Required, - }, +func (a Actions) ToTools() []openai.Tool { + tools := []openai.Tool{} + for _, action := range a { + tools = append(tools, openai.Tool{ + Type: openai.ToolTypeFunction, + Function: action.Definition().ToFunctionDefinition(), + }) } + return tools } -var ErrContextCanceled = fmt.Errorf("context canceled") - -func (a *Agent) Stop() { - a.Lock() - defer a.Unlock() - a.context.cancelFunc() -} - -func (a *Agent) Run() error { - // The agent run does two things: - // picks up requests from a queue - // and generates a response/perform actions - - // It is also preemptive. - // That is, it can interrupt the current action - // if another one comes in. - - // If there is no action, periodically evaluate if it has to do something on its own. - - // Expose a REST API to interact with the agent to ask it things - - fmt.Println("Agent is running") - clearConvTimer := time.NewTicker(1 * time.Minute) - for { - fmt.Println("Agent loop") - - select { - case job := <-a.jobQueue: - fmt.Println("job from the queue") - - // Consume the job and generate a response - // TODO: Give a short-term memory to the agent - a.consumeJob(job) - case <-a.context.Done(): - fmt.Println("Context canceled, agent is stopping...") - - // Agent has been canceled, return error - return ErrContextCanceled - case <-clearConvTimer.C: - fmt.Println("Removing chat history...") - - // TODO: decide to do something on its own with the conversation result - // before clearing it out - - // Clear the conversation - a.currentConversation = []openai.ChatCompletionMessage{} - } - } -} - -// StopAction stops the current action -// if any. Can be called before adding a new job. -func (a *Agent) StopAction() { - a.Lock() - defer a.Unlock() - if a.actionContext != nil { - a.actionContext.cancelFunc() - } -} - -func (a *Agent) decision(ctx context.Context, conversation []openai.ChatCompletionMessage, tools []openai.Tool, toolchoice any) (ActionParams, error) { +func (a *Agent) decision(ctx context.Context, conversation []openai.ChatCompletionMessage, tools []openai.Tool, toolchoice any) (action.ActionParams, error) { decision := openai.ChatCompletionRequest{ Model: a.options.LLMAPI.Model, Messages: conversation, @@ -137,7 +50,7 @@ func (a *Agent) decision(ctx context.Context, conversation []openai.ChatCompleti return nil, fmt.Errorf("len(toolcalls): %v", len(msg.ToolCalls)) } - params := ActionParams{} + params := action.ActionParams{} if err := params.Read(msg.ToolCalls[0].Function.Arguments); err != nil { fmt.Println("can't read params", err) @@ -147,20 +60,7 @@ func (a *Agent) decision(ctx context.Context, conversation []openai.ChatCompleti return params, nil } -type Actions []Action - -func (a Actions) ToTools() []openai.Tool { - tools := []openai.Tool{} - for _, action := range a { - tools = append(tools, openai.Tool{ - Type: openai.ToolTypeFunction, - Function: action.Definition().ToFunctionDefinition(), - }) - } - return tools -} - -func (a *Agent) generateParameters(ctx context.Context, action Action, conversation []openai.ChatCompletionMessage) (ActionParams, error) { +func (a *Agent) generateParameters(ctx context.Context, action Action, conversation []openai.ChatCompletionMessage) (action.ActionParams, error) { return a.decision(ctx, conversation, a.options.actions.ToTools(), action.Definition().Name) } @@ -184,12 +84,12 @@ func (a *Agent) pickAction(ctx context.Context, messages []openai.ChatCompletion if err != nil { return nil, err } - definitions := []ActionDefinition{} + definitions := []action.ActionDefinition{} for _, m := range a.options.actions { definitions = append(definitions, m.Definition()) } err = tmpl.Execute(prompt, struct { - Actions []ActionDefinition + Actions []action.ActionDefinition Messages []openai.ChatCompletionMessage }{ Actions: definitions, @@ -205,7 +105,7 @@ func (a *Agent) pickAction(ctx context.Context, messages []openai.ChatCompletion for _, m := range a.options.actions { actionsID = append(actionsID, m.Definition().Name) } - intentionsTools := NewIntention(actionsID...) + intentionsTools := action.NewIntention(actionsID...) conversation := []openai.ChatCompletionMessage{ { @@ -254,110 +154,3 @@ func (a *Agent) pickAction(ctx context.Context, messages []openai.ChatCompletion return action, nil } - -func (a *Agent) consumeJob(job *Job) { - - // Consume the job and generate a response - a.Lock() - // Set the action context - ctx, cancel := context.WithCancel(context.Background()) - a.actionContext = &ActionContext{ - Context: ctx, - cancelFunc: cancel, - } - a.Unlock() - - if job.Image != "" { - // TODO: Use llava to explain the image content - } - - if job.Text == "" { - fmt.Println("no text!") - return - } - - messages := a.currentConversation - if job.Text != "" { - messages = append(messages, openai.ChatCompletionMessage{ - Role: "user", - Content: job.Text, - }) - } - - chosenAction, err := a.pickAction(ctx, messages) - if err != nil { - fmt.Printf("error picking action: %v\n", err) - return - } - params, err := a.generateParameters(ctx, chosenAction, messages) - if err != nil { - fmt.Printf("error generating parameters: %v\n", err) - return - } - - var result string - for _, action := range a.options.actions { - fmt.Println("Checking action: ", action.Definition().Name, chosenAction.Definition().Name) - if action.Definition().Name == chosenAction.Definition().Name { - fmt.Printf("Running action: %v\n", action.Definition().Name) - if result, err = action.Run(params); err != nil { - fmt.Printf("error running action: %v\n", err) - return - } - } - } - fmt.Printf("Action run result: %v\n", result) - - // calling the function - messages = append(messages, openai.ChatCompletionMessage{ - Role: "assistant", - FunctionCall: &openai.FunctionCall{ - Name: chosenAction.Definition().Name, - Arguments: params.String(), - }, - }) - - // result of calling the function - messages = append(messages, openai.ChatCompletionMessage{ - Role: openai.ChatMessageRoleTool, - Content: result, - Name: chosenAction.Definition().Name, - ToolCallID: chosenAction.Definition().Name, - }) - - resp, err := a.client.CreateChatCompletion(ctx, - openai.ChatCompletionRequest{ - Model: a.options.LLMAPI.Model, - Messages: messages, - // Tools: tools, - }, - ) - if err != nil || len(resp.Choices) != 1 { - fmt.Printf("2nd completion error: err:%v len(choices):%v\n", err, - len(resp.Choices)) - return - } - - // display OpenAI's response to the original question utilizing our function - msg := resp.Choices[0].Message - fmt.Printf("OpenAI answered the original request with: %v\n", - msg.Content) - - messages = append(messages, msg) - a.currentConversation = append(a.currentConversation, messages...) - - if len(msg.ToolCalls) != 0 { - fmt.Printf("OpenAI wants to call again functions: %v\n", msg) - // wants to call again an action (?) - job.Text = "" // Call the job with the current conversation - job.Result.SetResult(result) - a.jobQueue <- job - return - } - - // perform the action (if any) - // or reply with a result - // if there is an action... - job.Result.SetResult(result) - job.Result.Finish() -} diff --git a/agent/agent.go b/agent/agent.go index bf032d2..06f59ec 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -4,7 +4,9 @@ import ( "context" "fmt" "sync" + "time" + "github.com/mudler/local-agent-framework/action" "github.com/mudler/local-agent-framework/llm" "github.com/sashabaranov/go-openai" ) @@ -15,8 +17,8 @@ type Agent struct { Character Character client *openai.Client jobQueue chan *Job - actionContext *ActionContext - context *ActionContext + actionContext *action.ActionContext + context *action.ActionContext availableActions []Action currentConversation []openai.ChatCompletionMessage @@ -40,14 +42,11 @@ func New(opts ...Option) (*Agent, error) { ctx, cancel := context.WithCancel(c) a := &Agent{ - jobQueue: make(chan *Job), - options: options, - client: client, - Character: options.character, - context: &ActionContext{ - Context: ctx, - cancelFunc: cancel, - }, + jobQueue: make(chan *Job), + options: options, + client: client, + Character: options.character, + context: action.NewContext(ctx, cancel), availableActions: options.actions, } @@ -71,3 +70,63 @@ func (a *Agent) Ask(text, image string) []string { return j.Result.WaitResult() } + +var ErrContextCanceled = fmt.Errorf("context canceled") + +func (a *Agent) Stop() { + a.Lock() + defer a.Unlock() + a.context.Cancel() +} + +func (a *Agent) Run() error { + // The agent run does two things: + // picks up requests from a queue + // and generates a response/perform actions + + // It is also preemptive. + // That is, it can interrupt the current action + // if another one comes in. + + // If there is no action, periodically evaluate if it has to do something on its own. + + // Expose a REST API to interact with the agent to ask it things + + fmt.Println("Agent is running") + clearConvTimer := time.NewTicker(1 * time.Minute) + for { + fmt.Println("Agent loop") + + select { + case job := <-a.jobQueue: + fmt.Println("job from the queue") + + // Consume the job and generate a response + // TODO: Give a short-term memory to the agent + a.consumeJob(job) + case <-a.context.Done(): + fmt.Println("Context canceled, agent is stopping...") + + // Agent has been canceled, return error + return ErrContextCanceled + case <-clearConvTimer.C: + fmt.Println("Removing chat history...") + + // TODO: decide to do something on its own with the conversation result + // before clearing it out + + // Clear the conversation + a.currentConversation = []openai.ChatCompletionMessage{} + } + } +} + +// StopAction stops the current action +// if any. Can be called before adding a new job. +func (a *Agent) StopAction() { + a.Lock() + defer a.Unlock() + if a.actionContext != nil { + a.actionContext.Cancel() + } +} diff --git a/agent/agent_test.go b/agent/agent_test.go new file mode 100644 index 0000000..d088f57 --- /dev/null +++ b/agent/agent_test.go @@ -0,0 +1,58 @@ +package agent_test + +import ( + "fmt" + + "github.com/mudler/local-agent-framework/action" + . "github.com/mudler/local-agent-framework/agent" + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + "github.com/sashabaranov/go-openai/jsonschema" +) + +const testActionResult = "It's going to be windy" + +var _ Action = &TestAction{} + +type TestAction struct{} + +func (a *TestAction) Run(action.ActionParams) (string, error) { + return testActionResult, nil +} + +func (a *TestAction) Definition() action.ActionDefinition { + return action.ActionDefinition{ + Name: "get_weather", + Description: "get current weather", + Properties: map[string]jsonschema.Definition{ + "location": { + Type: jsonschema.String, + Description: "The city and state, e.g. San Francisco, CA", + }, + "unit": { + Type: jsonschema.String, + Enum: []string{"celsius", "fahrenheit"}, + }, + }, + + Required: []string{"location"}, + } +} + +var _ = Describe("Agent test", func() { + Context("jobs", func() { + FIt("pick the correct action", func() { + agent, err := New( + WithLLMAPIURL(apiModel), + WithModel(testModel), + // WithRandomIdentity(), + WithActions(&TestAction{}), + ) + Expect(err).ToNot(HaveOccurred()) + go agent.Run() + defer agent.Stop() + res := agent.Ask("can you get the weather in boston?", "") + Expect(res).To(ContainElement(testActionResult), fmt.Sprint(res)) + }) + }) +}) diff --git a/agent/jobs.go b/agent/jobs.go index f7ccdeb..8cc83f3 100644 --- a/agent/jobs.go +++ b/agent/jobs.go @@ -1,6 +1,13 @@ package agent -import "sync" +import ( + "context" + "fmt" + "sync" + + "github.com/mudler/local-agent-framework/action" + "github.com/sashabaranov/go-openai" +) // Job is a request to the agent to do something type Job struct { @@ -62,3 +69,107 @@ func (j *JobResult) WaitResult() []string { defer j.Unlock() return j.Data } + +func (a *Agent) consumeJob(job *Job) { + + // Consume the job and generate a response + a.Lock() + // Set the action context + ctx, cancel := context.WithCancel(context.Background()) + a.actionContext = action.NewContext(ctx, cancel) + a.Unlock() + + if job.Image != "" { + // TODO: Use llava to explain the image content + } + + if job.Text == "" { + fmt.Println("no text!") + return + } + + messages := a.currentConversation + if job.Text != "" { + messages = append(messages, openai.ChatCompletionMessage{ + Role: "user", + Content: job.Text, + }) + } + + chosenAction, err := a.pickAction(ctx, messages) + if err != nil { + fmt.Printf("error picking action: %v\n", err) + return + } + params, err := a.generateParameters(ctx, chosenAction, messages) + if err != nil { + fmt.Printf("error generating parameters: %v\n", err) + return + } + + var result string + for _, action := range a.options.actions { + fmt.Println("Checking action: ", action.Definition().Name, chosenAction.Definition().Name) + if action.Definition().Name == chosenAction.Definition().Name { + fmt.Printf("Running action: %v\n", action.Definition().Name) + if result, err = action.Run(params); err != nil { + fmt.Printf("error running action: %v\n", err) + return + } + } + } + fmt.Printf("Action run result: %v\n", result) + + // calling the function + messages = append(messages, openai.ChatCompletionMessage{ + Role: "assistant", + FunctionCall: &openai.FunctionCall{ + Name: chosenAction.Definition().Name, + Arguments: params.String(), + }, + }) + + // result of calling the function + messages = append(messages, openai.ChatCompletionMessage{ + Role: openai.ChatMessageRoleTool, + Content: result, + Name: chosenAction.Definition().Name, + ToolCallID: chosenAction.Definition().Name, + }) + + resp, err := a.client.CreateChatCompletion(ctx, + openai.ChatCompletionRequest{ + Model: a.options.LLMAPI.Model, + Messages: messages, + // Tools: tools, + }, + ) + if err != nil || len(resp.Choices) != 1 { + fmt.Printf("2nd completion error: err:%v len(choices):%v\n", err, + len(resp.Choices)) + return + } + + // display OpenAI's response to the original question utilizing our function + msg := resp.Choices[0].Message + fmt.Printf("OpenAI answered the original request with: %v\n", + msg.Content) + + messages = append(messages, msg) + a.currentConversation = append(a.currentConversation, messages...) + + if len(msg.ToolCalls) != 0 { + fmt.Printf("OpenAI wants to call again functions: %v\n", msg) + // wants to call again an action (?) + job.Text = "" // Call the job with the current conversation + job.Result.SetResult(result) + a.jobQueue <- job + return + } + + // perform the action (if any) + // or reply with a result + // if there is an action... + job.Result.SetResult(result) + job.Result.Finish() +}