feat(ui): Add structured observability events

Signed-off-by: Richard Palethorpe <io@richiejp.com>
Richard Palethorpe
2025-04-15 06:24:20 +01:00
parent 30cf87affe
commit 8090deb6f9
15 changed files with 726 additions and 133 deletions
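In outline: each unit of work is recorded as a types.Observable that moves through three phases — a Creation payload captured when it is opened, zero or more Progress entries appended as work proceeds, and a Completion that closes it — while an Observer (an SSE-backed one here) broadcasts every update to the UI. A minimal lifecycle sketch, using only the types and methods added below (observeDecision itself is a hypothetical helper):

package example

import (
	"github.com/mudler/LocalAGI/core/agent"
	"github.com/mudler/LocalAGI/core/types"
	"github.com/sashabaranov/go-openai"
)

// observeDecision walks one Observable through its three phases.
func observeDecision(o agent.Observer, req openai.ChatCompletionRequest, resp openai.ChatCompletionResponse) {
	obs := o.NewObservable()
	obs.Name = "decision"
	obs.Icon = "brain"
	obs.Creation = &types.Creation{ChatCompletionRequest: &req}
	o.Update(*obs) // emit the opening event

	obs.AddProgress(types.Progress{ChatCompletionResponse: &resp})
	o.Update(*obs) // emit an intermediate event

	obs.MakeLastProgressCompletion() // the last Progress entry becomes the Completion
	o.Update(*obs)                   // emit the closing event
}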


@@ -2,7 +2,6 @@ package action
import (
"context"
"fmt"
"github.com/mudler/LocalAGI/core/types"
"github.com/sashabaranov/go-openai/jsonschema"
@@ -16,24 +15,6 @@ func NewState() *StateAction {
type StateAction struct{}
// AgentInternalState is the structure used to keep track of the current state
// and the agent's short-term memory, which it can update.
// Besides the long-term memory accessible to the agent (via a vector database)
// and the context memory (also powered by a vector database),
// this is the shorter memory that the LLM keeps across conversations and
// throughout its reasoning process and lifetime.
// TODO: A special action lets the LLM update this memory periodically
// during self-processing; the same action is ALSO exposed during
// conversation so the user can, for example, give the agent a new goal.
type AgentInternalState struct {
NowDoing string `json:"doing_now"`
DoingNext string `json:"doing_next"`
DoneHistory []string `json:"done_history"`
Memories []string `json:"memories"`
Goal string `json:"goal"`
}
func (a *StateAction) Run(context.Context, types.ActionParams) (types.ActionResult, error) {
return types.ActionResult{Result: "internal state has been updated"}, nil
}
@@ -76,23 +57,3 @@ func (a *StateAction) Definition() types.ActionDefinition {
},
}
}
const fmtT = `=====================
NowDoing: %s
DoingNext: %s
Your current goal is: %s
You have done: %+v
You have a short memory with: %+v
=====================
`
func (c AgentInternalState) String() string {
return fmt.Sprintf(
fmtT,
c.NowDoing,
c.DoingNext,
c.Goal,
c.DoneHistory,
c.Memories,
)
}


@@ -22,7 +22,7 @@ type decisionResult struct {
// decision forces the agent to take one of the available actions
func (a *Agent) decision(
ctx context.Context,
job *types.Job,
conversation []openai.ChatCompletionMessage,
tools []openai.Tool, toolchoice string, maxRetries int) (*decisionResult, error) {
@@ -35,31 +35,63 @@ func (a *Agent) decision(
}
}
decision := openai.ChatCompletionRequest{
Model: a.options.LLMAPI.Model,
Messages: conversation,
Tools: tools,
}
if choice != nil {
decision.ToolChoice = *choice
}
var obs *types.Observable
if job.Obs != nil {
obs = a.observer.NewObservable()
obs.Name = "decision"
obs.ParentID = job.Obs.ID
obs.Icon = "brain"
obs.Creation = &types.Creation{
ChatCompletionRequest: &decision,
}
a.observer.Update(*obs)
}
var lastErr error
for attempts := 0; attempts < maxRetries; attempts++ {
decision := openai.ChatCompletionRequest{
Model: a.options.LLMAPI.Model,
Messages: conversation,
Tools: tools,
}
if choice != nil {
decision.ToolChoice = *choice
}
resp, err := a.client.CreateChatCompletion(ctx, decision)
resp, err := a.client.CreateChatCompletion(job.GetContext(), decision)
if err != nil {
lastErr = err
xlog.Warn("Attempt to make a decision failed", "attempt", attempts+1, "error", err)
if obs != nil {
obs.Progress = append(obs.Progress, types.Progress{
Error: err.Error(),
})
a.observer.Update(*obs)
}
continue
}
jsonResp, _ := json.Marshal(resp)
xlog.Debug("Decision response", "response", string(jsonResp))
if obs != nil {
obs.AddProgress(types.Progress{
ChatCompletionResponse: &resp,
})
}
if len(resp.Choices) != 1 {
lastErr = fmt.Errorf("no choices: %d", len(resp.Choices))
xlog.Warn("Attempt to make a decision failed", "attempt", attempts+1, "error", lastErr)
if obs != nil {
obs.Progress[len(obs.Progress)-1].Error = lastErr.Error()
a.observer.Update(*obs)
}
continue
}
@@ -68,6 +100,12 @@ func (a *Agent) decision(
if err := a.saveConversation(append(conversation, msg), "decision"); err != nil {
xlog.Error("Error saving conversation", "error", err)
}
if obs != nil {
obs.MakeLastProgressCompletion()
a.observer.Update(*obs)
}
return &decisionResult{message: msg.Content}, nil
}
@@ -75,6 +113,12 @@ func (a *Agent) decision(
if err := params.Read(msg.ToolCalls[0].Function.Arguments); err != nil {
lastErr = err
xlog.Warn("Attempt to parse action parameters failed", "attempt", attempts+1, "error", err)
if obs != nil {
obs.Progress[len(obs.Progress)-1].Error = lastErr.Error()
a.observer.Update(*obs)
}
continue
}
@@ -82,6 +126,11 @@ func (a *Agent) decision(
xlog.Error("Error saving conversation", "error", err)
}
if obs != nil {
obs.MakeLastProgressCompletion()
a.observer.Update(*obs)
}
return &decisionResult{actionParams: params, actioName: msg.ToolCalls[0].Function.Name, message: msg.Content}, nil
}
@@ -173,7 +222,7 @@ func (m Messages) IsLastMessageFromRole(role string) bool {
return m[len(m)-1].Role == role
}
func (a *Agent) generateParameters(ctx context.Context, pickTemplate string, act types.Action, c []openai.ChatCompletionMessage, reasoning string, maxAttempts int) (*decisionResult, error) {
func (a *Agent) generateParameters(job *types.Job, pickTemplate string, act types.Action, c []openai.ChatCompletionMessage, reasoning string, maxAttempts int) (*decisionResult, error) {
stateHUD, err := renderTemplate(pickTemplate, a.prepareHUD(), a.availableActions(), reasoning)
if err != nil {
return nil, err
@@ -201,7 +250,7 @@ func (a *Agent) generateParameters(ctx context.Context, pickTemplate string, act
var attemptErr error
for attempts := 0; attempts < maxAttempts; attempts++ {
result, attemptErr = a.decision(ctx,
result, attemptErr = a.decision(job,
cc,
a.availableActions().ToTools(),
act.Definition().Name.String(),
@@ -263,7 +312,7 @@ func (a *Agent) handlePlanning(ctx context.Context, job *types.Job, chosenAction
subTaskAction := a.availableActions().Find(subtask.Action)
subTaskReasoning := fmt.Sprintf("%s Overall goal is: %s", subtask.Reasoning, planResult.Goal)
params, err := a.generateParameters(ctx, pickTemplate, subTaskAction, conv, subTaskReasoning, maxRetries)
params, err := a.generateParameters(job, pickTemplate, subTaskAction, conv, subTaskReasoning, maxRetries)
if err != nil {
xlog.Error("error generating action's parameters", "error", err)
return conv, fmt.Errorf("error generating action's parameters: %w", err)
@@ -293,7 +342,7 @@ func (a *Agent) handlePlanning(ctx context.Context, job *types.Job, chosenAction
break
}
result, err := a.runAction(ctx, subTaskAction, actionParams)
result, err := a.runAction(job, subTaskAction, actionParams)
if err != nil {
xlog.Error("error running action", "error", err)
return conv, fmt.Errorf("error running action: %w", err)
@@ -378,7 +427,7 @@ func (a *Agent) prepareHUD() (promptHUD *PromptHUD) {
}
// pickAction picks an action based on the conversation
func (a *Agent) pickAction(ctx context.Context, templ string, messages []openai.ChatCompletionMessage, maxRetries int) (types.Action, types.ActionParams, string, error) {
func (a *Agent) pickAction(job *types.Job, templ string, messages []openai.ChatCompletionMessage, maxRetries int) (types.Action, types.ActionParams, string, error) {
c := messages
xlog.Debug("[pickAction] picking action starts", "messages", messages)
@@ -389,7 +438,7 @@ func (a *Agent) pickAction(ctx context.Context, templ string, messages []openai.
xlog.Debug("not forcing reasoning")
// We could also avoid using functions here, get just a reply from the LLM,
// and then use the reply to pick the action
thought, err := a.decision(ctx,
thought, err := a.decision(job,
messages,
a.availableActions().ToTools(),
"",
@@ -431,7 +480,7 @@ func (a *Agent) pickAction(ctx context.Context, templ string, messages []openai.
}, c...)
}
thought, err := a.decision(ctx,
thought, err := a.decision(job,
c,
types.Actions{action.NewReasoning()}.ToTools(),
action.NewReasoning().Definition().Name.String(), maxRetries)
@@ -467,7 +516,7 @@ func (a *Agent) pickAction(ctx context.Context, templ string, messages []openai.
// to avoid hallucinations
// Extract an action
params, err := a.decision(ctx,
params, err := a.decision(job,
append(c, openai.ChatCompletionMessage{
Role: "system",
Content: "Pick the relevant action given the following reasoning: " + originalReasoning,


@@ -30,7 +30,7 @@ type Agent struct {
jobQueue chan *types.Job
context *types.ActionContext
currentState *action.AgentInternalState
currentState *types.AgentInternalState
selfEvaluationInProgress bool
pause bool
@@ -41,6 +41,8 @@ type Agent struct {
subscriberMutex sync.Mutex
newMessagesSubscribers []func(openai.ChatCompletionMessage)
observer Observer
}
type RAGDB interface {
@@ -69,12 +71,17 @@ func New(opts ...Option) (*Agent, error) {
options: options,
client: client,
Character: options.character,
currentState: &action.AgentInternalState{},
currentState: &types.AgentInternalState{},
context: types.NewActionContext(ctx, cancel),
newConversations: make(chan openai.ChatCompletionMessage),
newMessagesSubscribers: options.newConversationsSubscribers,
}
// Initialize observer if provided
if options.observer != nil {
a.observer = options.observer
}
if a.options.statefile != "" {
if _, err := os.Stat(a.options.statefile); err == nil {
if err = a.LoadState(a.options.statefile); err != nil {
@@ -146,6 +153,14 @@ func (a *Agent) Ask(opts ...types.JobOption) *types.JobResult {
xlog.Debug("Agent has finished being asked", "agent", a.Character.Name)
}()
if a.observer != nil {
obs := a.observer.NewObservable()
obs.Name = "job"
obs.Icon = "plug"
a.observer.Update(*obs)
opts = append(opts, types.WithObservable(obs))
}
return a.Execute(types.NewJob(
append(
opts,
@@ -163,6 +178,20 @@ func (a *Agent) Execute(j *types.Job) *types.JobResult {
xlog.Debug("Agent has finished", "agent", a.Character.Name)
}()
if j.Obs != nil {
j.Result.AddFinalizer(func(ccm []openai.ChatCompletionMessage) {
j.Obs.Completion = &types.Completion{
Conversation: ccm,
}
if j.Result.Error != nil {
j.Obs.Completion.Error = j.Result.Error.Error()
}
a.observer.Update(*j.Obs)
})
}
a.Enqueue(j)
return j.Result.WaitResult()
}
@@ -237,41 +266,90 @@ func (a *Agent) Memory() RAGDB {
return a.options.ragdb
}
func (a *Agent) runAction(ctx context.Context, chosenAction types.Action, params types.ActionParams) (result types.ActionResult, err error) {
func (a *Agent) runAction(job *types.Job, chosenAction types.Action, params types.ActionParams) (result types.ActionResult, err error) {
var obs *types.Observable
if job.Obs != nil {
obs = a.observer.NewObservable()
obs.Name = "action"
obs.Icon = "bolt"
obs.ParentID = job.Obs.ID
obs.Creation = &types.Creation{
FunctionDefinition: chosenAction.Definition().ToFunctionDefinition(),
FunctionParams: params,
}
a.observer.Update(*obs)
}
xlog.Info("[runAction] Running action", "action", chosenAction.Definition().Name, "agent", a.Character.Name, "params", params.String())
for _, act := range a.availableActions() {
if act.Definition().Name == chosenAction.Definition().Name {
res, err := act.Run(ctx, params)
res, err := act.Run(job.GetContext(), params)
if err != nil {
if obs != nil {
obs.Completion = &types.Completion{
Error: err.Error(),
}
}
return types.ActionResult{}, fmt.Errorf("error running action: %w", err)
}
if obs != nil {
obs.Progress = append(obs.Progress, types.Progress{
ActionResult: res.Result,
})
a.observer.Update(*obs)
}
result = res
}
}
xlog.Info("[runAction] Running action", "action", chosenAction.Definition().Name, "agent", a.Character.Name, "params", params.String())
if chosenAction.Definition().Name.Is(action.StateActionName) {
// We need to store the result in the state
state := action.AgentInternalState{}
state := types.AgentInternalState{}
err = params.Unmarshal(&state)
if err != nil {
return types.ActionResult{}, fmt.Errorf("error unmarshalling state of the agent: %w", err)
werr := fmt.Errorf("error unmarshalling state of the agent: %w", err)
if obs != nil {
obs.Completion = &types.Completion{
Error: werr.Error(),
}
}
return types.ActionResult{}, werr
}
// update the current state with the one we just got from the action
a.currentState = &state
if obs != nil {
obs.Progress = append(obs.Progress, types.Progress{
AgentState: &state,
})
a.observer.Update(*obs)
}
// update the state file
if a.options.statefile != "" {
if err := a.SaveState(a.options.statefile); err != nil {
if obs != nil {
obs.Completion = &types.Completion{
Error: err.Error(),
}
}
return types.ActionResult{}, err
}
}
}
xlog.Debug("[runAction] Action result", "action", chosenAction.Definition().Name, "params", params.String(), "result", result.Result)
if obs != nil {
obs.MakeLastProgressCompletion()
a.observer.Update(*obs)
}
return result, nil
}
@@ -468,7 +546,7 @@ func (a *Agent) consumeJob(job *types.Job, role string) {
chosenAction = *action
reasoning = reason
if params == nil {
p, err := a.generateParameters(job.GetContext(), pickTemplate, chosenAction, conv, reasoning, maxRetries)
p, err := a.generateParameters(job, pickTemplate, chosenAction, conv, reasoning, maxRetries)
if err != nil {
xlog.Error("Error generating parameters, trying again", "error", err)
// try again
@@ -483,7 +561,7 @@ func (a *Agent) consumeJob(job *types.Job, role string) {
job.ResetNextAction()
} else {
var err error
chosenAction, actionParams, reasoning, err = a.pickAction(job.GetContext(), pickTemplate, conv, maxRetries)
chosenAction, actionParams, reasoning, err = a.pickAction(job, pickTemplate, conv, maxRetries)
if err != nil {
xlog.Error("Error picking action", "error", err)
job.Result.Finish(err)
@@ -557,7 +635,7 @@ func (a *Agent) consumeJob(job *types.Job, role string) {
"reasoning", reasoning,
)
params, err := a.generateParameters(job.GetContext(), pickTemplate, chosenAction, conv, reasoning, maxRetries)
params, err := a.generateParameters(job, pickTemplate, chosenAction, conv, reasoning, maxRetries)
if err != nil {
xlog.Error("Error generating parameters, trying again", "error", err)
// try again
@@ -652,7 +730,7 @@ func (a *Agent) consumeJob(job *types.Job, role string) {
}
if !chosenAction.Definition().Name.Is(action.PlanActionName) {
result, err := a.runAction(job.GetContext(), chosenAction, actionParams)
result, err := a.runAction(job, chosenAction, actionParams)
if err != nil {
//job.Result.Finish(fmt.Errorf("error running action: %w", err))
//return
@@ -677,7 +755,7 @@ func (a *Agent) consumeJob(job *types.Job, role string) {
}
// given the result, we can now re-evaluate the conversation
followingAction, followingParams, reasoning, err := a.pickAction(job.GetContext(), reEvaluationTemplate, conv, maxRetries)
followingAction, followingParams, reasoning, err := a.pickAction(job, reEvaluationTemplate, conv, maxRetries)
if err != nil {
job.Result.Conversation = conv
job.Result.Finish(fmt.Errorf("error picking action: %w", err))
@@ -955,3 +1033,7 @@ func (a *Agent) loop(timer *time.Timer, job *types.Job) {
xlog.Debug("Agent is consuming a job", "agent", a.Character.Name, "job", job)
a.consumeJob(job, UserRole)
}
func (a *Agent) Observer() Observer {
return a.observer
}

core/agent/observer.go (new file, 87 lines)

@@ -0,0 +1,87 @@
package agent
import (
"encoding/json"
"sync"
"sync/atomic"
"github.com/mudler/LocalAGI/core/sse"
"github.com/mudler/LocalAGI/core/types"
"github.com/mudler/LocalAGI/pkg/xlog"
)
type Observer interface {
NewObservable() *types.Observable
Update(types.Observable)
History() []types.Observable
}
type SSEObserver struct {
agent string
maxID int32
manager sse.Manager
mutex sync.Mutex
history []types.Observable
historyLast int
}
func NewSSEObserver(agent string, manager sse.Manager) *SSEObserver {
return &SSEObserver{
agent: agent,
maxID: 1,
manager: manager,
history: make([]types.Observable, 100),
}
}
func (s *SSEObserver) NewObservable() *types.Observable {
id := atomic.AddInt32(&s.maxID, 1)
return &types.Observable{
ID: id - 1,
Agent: s.agent,
}
}
func (s *SSEObserver) Update(obs types.Observable) {
data, err := json.Marshal(obs)
if err != nil {
xlog.Error("Error marshaling observable", "error", err)
return
}
msg := sse.NewMessage(string(data)).WithEvent("observable_update")
s.manager.Send(msg)
s.mutex.Lock()
defer s.mutex.Unlock()
for i, o := range s.history {
if o.ID == obs.ID {
s.history[i] = obs
return
}
}
s.history[s.historyLast] = obs
s.historyLast += 1
if s.historyLast >= len(s.history) {
s.historyLast = 0
}
}
func (s *SSEObserver) History() []types.Observable {
h := make([]types.Observable, 0, 20)
s.mutex.Lock()
defer s.mutex.Unlock()
for _, obs := range s.history {
if obs.ID == 0 {
continue
}
h = append(h, obs)
}
return h
}
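Update both broadcasts each change as an observable_update SSE message and records it in a fixed 100-slot ring: an entry with a matching ID is overwritten in place, otherwise it lands at historyLast and the cursor wraps. History skips empty (zero-ID) slots, which lets late subscribers backfill before following the live stream — for instance via a hypothetical HTTP endpoint like this (only History() comes from this diff):

package example

import (
	"encoding/json"
	"net/http"

	"github.com/mudler/LocalAGI/core/agent"
)

// historyHandler is a hypothetical backfill endpoint: it returns the
// observer's retained observables so a freshly connected UI can catch
// up before following the live observable_update SSE stream.
func historyHandler(o agent.Observer) http.HandlerFunc {
	return func(w http.ResponseWriter, r *http.Request) {
		w.Header().Set("Content-Type", "application/json")
		if err := json.NewEncoder(w).Encode(o.History()); err != nil {
			http.Error(w, err.Error(), http.StatusInternalServerError)
		}
	}
}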


@@ -53,6 +53,8 @@ type options struct {
mcpServers []MCPServer
newConversationsSubscribers []func(openai.ChatCompletionMessage)
observer Observer
}
func (o *options) SeparatedMultimodalModel() bool {
@@ -336,3 +338,10 @@ func WithActions(actions ...types.Action) Option {
return nil
}
}
func WithObserver(observer Observer) Option {
return func(o *options) error {
o.observer = observer
return nil
}
}
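WithObserver completes the wiring; the agent pool below passes one SSEObserver per agent. A construction sketch, assuming an sse.NewManager constructor that this diff does not show, with all other agent options elided:

package example

import (
	"github.com/mudler/LocalAGI/core/agent"
	"github.com/mudler/LocalAGI/core/sse"
)

// newObservedAgent wires an SSE-backed observer into a new agent.
func newObservedAgent(name string) (*agent.Agent, error) {
	manager := sse.NewManager() // assumption: exact constructor/signature may differ
	return agent.New(
		agent.WithObserver(agent.NewSSEObserver(name, manager)),
		// ...model, character, and other required options...
	)
}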


@@ -6,7 +6,7 @@ import (
"os"
"path/filepath"
"github.com/mudler/LocalAGI/core/action"
"github.com/mudler/LocalAGI/core/types"
"github.com/sashabaranov/go-openai/jsonschema"
)
@@ -15,7 +15,7 @@ import (
// in the prompts
type PromptHUD struct {
Character Character `json:"character"`
CurrentState action.AgentInternalState `json:"current_state"`
CurrentState types.AgentInternalState `json:"current_state"`
PermanentGoal string `json:"permanent_goal"`
ShowCharacter bool `json:"show_character"`
}
@@ -80,7 +80,7 @@ func Load(path string) (*Character, error) {
return &c, nil
}
func (a *Agent) State() action.AgentInternalState {
func (a *Agent) State() types.AgentInternalState {
return *a.currentState
}


@@ -407,6 +407,7 @@ func (a *AgentPool) startAgentWithConfig(name string, config *AgentConfig) error
c.AgentResultCallback()(state)
}
}),
WithObserver(NewSSEObserver(name, manager)),
}
if config.HUD {


@@ -27,6 +27,8 @@ type Job struct {
context context.Context
cancel context.CancelFunc
Obs *Observable
}
type ActionRequest struct {
@@ -198,3 +200,9 @@ func (j *Job) Cancel() {
func (j *Job) GetContext() context.Context {
return j.context
}
func WithObservable(obs *Observable) JobOption {
return func(j *Job) {
j.Obs = obs
}
}
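WithObservable threads the root observable through the job, so nested steps (decision, runAction) can parent their own observables to it via ParentID. A hypothetical helper in the agent package, mirroring Agent.Ask above:

// (package agent; imports core/types as in this diff)
// newObservedJob opens a root observable and attaches it to a fresh job.
func (a *Agent) newObservedJob(opts ...types.JobOption) *types.Job {
	obs := a.observer.NewObservable()
	obs.Name = "job"
	obs.Icon = "plug"
	a.observer.Update(*obs)
	// Nested steps will set ParentID = job.Obs.ID on their own observables.
	return types.NewJob(append(opts, types.WithObservable(obs))...)
}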

core/types/observable.go (new file, 61 lines)

@@ -0,0 +1,61 @@
package types
import (
"github.com/mudler/LocalAGI/pkg/xlog"
"github.com/sashabaranov/go-openai"
)
type Creation struct {
ChatCompletionRequest *openai.ChatCompletionRequest `json:"chat_completion_request,omitempty"`
FunctionDefinition *openai.FunctionDefinition `json:"function_definition,omitempty"`
FunctionParams ActionParams `json:"function_params,omitempty"`
}
type Progress struct {
Error string `json:"error,omitempty"`
ChatCompletionResponse *openai.ChatCompletionResponse `json:"chat_completion_response,omitempty"`
ActionResult string `json:"action_result,omitempty"`
AgentState *AgentInternalState `json:"agent_state"`
}
type Completion struct {
Error string `json:"error,omitempty"`
ChatCompletionResponse *openai.ChatCompletionResponse `json:"chat_completion_response,omitempty"`
Conversation []openai.ChatCompletionMessage `json:"conversation,omitempty"`
ActionResult string `json:"action_result,omitempty"`
AgentState *AgentInternalState `json:"agent_state"`
}
type Observable struct {
ID int32 `json:"id"`
ParentID int32 `json:"parent_id,omitempty"`
Agent string `json:"agent"`
Name string `json:"name"`
Icon string `json:"icon"`
Creation *Creation `json:"creation,omitempty"`
Progress []Progress `json:"progress,omitempty"`
Completion *Completion `json:"completion,omitempty"`
}
func (o *Observable) AddProgress(p Progress) {
if o.Progress == nil {
o.Progress = make([]Progress, 0)
}
o.Progress = append(o.Progress, p)
}
func (o *Observable) MakeLastProgressCompletion() {
if len(o.Progress) == 0 {
xlog.Error("Observable completed without any progress", "id", o.ID, "name", o.Name)
return
}
p := o.Progress[len(o.Progress)-1]
o.Progress = o.Progress[:len(o.Progress)-1]
o.Completion = &Completion{
Error: p.Error,
ChatCompletionResponse: p.ChatCompletionResponse,
ActionResult: p.ActionResult,
AgentState: p.AgentState,
}
}
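MakeLastProgressCompletion pops the final Progress entry and copies its fields into Completion, so the terminal payload is reported exactly once — under completion rather than progress. A small illustration:

package example

import "github.com/mudler/LocalAGI/core/types"

// exampleCompletion shows MakeLastProgressCompletion's effect.
func exampleCompletion() *types.Observable {
	obs := &types.Observable{ID: 7, Name: "action", Icon: "bolt"}
	obs.AddProgress(types.Progress{ActionResult: "file written"})
	obs.MakeLastProgressCompletion()
	// Now len(obs.Progress) == 0 and obs.Completion.ActionResult == "file written".
	return obs
}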

core/types/state.go (new file, 41 lines)

@@ -0,0 +1,41 @@
package types
import "fmt"
// AgentInternalState is the structure used to keep track of the current state
// and the agent's short-term memory, which it can update.
// Besides the long-term memory accessible to the agent (via a vector database)
// and the context memory (also powered by a vector database),
// this is the shorter memory that the LLM keeps across conversations and
// throughout its reasoning process and lifetime.
// TODO: A special action lets the LLM update this memory periodically
// during self-processing; the same action is ALSO exposed during
// conversation so the user can, for example, give the agent a new goal.
type AgentInternalState struct {
NowDoing string `json:"doing_now"`
DoingNext string `json:"doing_next"`
DoneHistory []string `json:"done_history"`
Memories []string `json:"memories"`
Goal string `json:"goal"`
}
const fmtT = `=====================
NowDoing: %s
DoingNext: %s
Your current goal is: %s
You have done: %+v
You have a short memory with: %+v
=====================
`
func (c AgentInternalState) String() string {
return fmt.Sprintf(
fmtT,
c.NowDoing,
c.DoingNext,
c.Goal,
c.DoneHistory,
c.Memories,
)
}