From b5a12a1da6b44e616fef7ffe9628b1067286c3e1 Mon Sep 17 00:00:00 2001 From: Richard Palethorpe Date: Fri, 18 Apr 2025 16:32:43 +0100 Subject: [PATCH] feat(ui): Structured observability/status view (#40) * refactor(ui): Make message status SSE name more specific Signed-off-by: Richard Palethorpe * feat(ui): Add structured observability events Signed-off-by: Richard Palethorpe --------- Signed-off-by: Richard Palethorpe --- core/action/state.go | 39 --- core/agent/actions.go | 89 +++++-- core/agent/agent.go | 110 ++++++-- core/agent/observer.go | 87 +++++++ core/agent/options.go | 9 + core/agent/state.go | 6 +- core/state/pool.go | 1 + core/types/job.go | 8 + core/types/observable.go | 61 +++++ core/types/state.go | 41 +++ webui/app.go | 4 +- webui/react-ui/bun.lock | 3 + webui/react-ui/package.json | 3 +- webui/react-ui/src/App.css | 71 +++++- webui/react-ui/src/hooks/useSSE.js | 4 +- webui/react-ui/src/pages/AgentStatus.jsx | 303 +++++++++++++++++++---- webui/routes.go | 28 ++- 17 files changed, 730 insertions(+), 137 deletions(-) create mode 100644 core/agent/observer.go create mode 100644 core/types/observable.go create mode 100644 core/types/state.go diff --git a/core/action/state.go b/core/action/state.go index 8ce77e9..d0ac586 100644 --- a/core/action/state.go +++ b/core/action/state.go @@ -2,7 +2,6 @@ package action import ( "context" - "fmt" "github.com/mudler/LocalAGI/core/types" "github.com/sashabaranov/go-openai/jsonschema" @@ -16,24 +15,6 @@ func NewState() *StateAction { type StateAction struct{} -// State is the structure -// that is used to keep track of the current state -// and the Agent's short memory that it can update -// Besides a long term memory that is accessible by the agent (With vector database), -// And a context memory (that is always powered by a vector database), -// this memory is the shorter one that the LLM keeps across conversation and across its -// reasoning process's and life time. -// TODO: A special action is then used to let the LLM itself update its memory -// periodically during self-processing, and the same action is ALSO exposed -// during the conversation to let the user put for example, a new goal to the agent. 
-type AgentInternalState struct { - NowDoing string `json:"doing_now"` - DoingNext string `json:"doing_next"` - DoneHistory []string `json:"done_history"` - Memories []string `json:"memories"` - Goal string `json:"goal"` -} - func (a *StateAction) Run(context.Context, types.ActionParams) (types.ActionResult, error) { return types.ActionResult{Result: "internal state has been updated"}, nil } @@ -76,23 +57,3 @@ func (a *StateAction) Definition() types.ActionDefinition { }, } } - -const fmtT = `===================== -NowDoing: %s -DoingNext: %s -Your current goal is: %s -You have done: %+v -You have a short memory with: %+v -===================== -` - -func (c AgentInternalState) String() string { - return fmt.Sprintf( - fmtT, - c.NowDoing, - c.DoingNext, - c.Goal, - c.DoneHistory, - c.Memories, - ) -} diff --git a/core/agent/actions.go b/core/agent/actions.go index 0ad7a61..0cae1b3 100644 --- a/core/agent/actions.go +++ b/core/agent/actions.go @@ -22,7 +22,7 @@ type decisionResult struct { // decision forces the agent to take one of the available actions func (a *Agent) decision( - ctx context.Context, + job *types.Job, conversation []openai.ChatCompletionMessage, tools []openai.Tool, toolchoice string, maxRetries int) (*decisionResult, error) { @@ -35,31 +35,63 @@ func (a *Agent) decision( } } + decision := openai.ChatCompletionRequest{ + Model: a.options.LLMAPI.Model, + Messages: conversation, + Tools: tools, + } + + if choice != nil { + decision.ToolChoice = *choice + } + + var obs *types.Observable + if job.Obs != nil { + obs = a.observer.NewObservable() + obs.Name = "decision" + obs.ParentID = job.Obs.ID + obs.Icon = "brain" + obs.Creation = &types.Creation{ + ChatCompletionRequest: &decision, + } + a.observer.Update(*obs) + } + var lastErr error for attempts := 0; attempts < maxRetries; attempts++ { - decision := openai.ChatCompletionRequest{ - Model: a.options.LLMAPI.Model, - Messages: conversation, - Tools: tools, - } - - if choice != nil { - decision.ToolChoice = *choice - } - - resp, err := a.client.CreateChatCompletion(ctx, decision) + resp, err := a.client.CreateChatCompletion(job.GetContext(), decision) if err != nil { lastErr = err xlog.Warn("Attempt to make a decision failed", "attempt", attempts+1, "error", err) + + if obs != nil { + obs.Progress = append(obs.Progress, types.Progress{ + Error: err.Error(), + }) + a.observer.Update(*obs) + } + continue } jsonResp, _ := json.Marshal(resp) xlog.Debug("Decision response", "response", string(jsonResp)) + if obs != nil { + obs.AddProgress(types.Progress{ + ChatCompletionResponse: &resp, + }) + } + if len(resp.Choices) != 1 { lastErr = fmt.Errorf("no choices: %d", len(resp.Choices)) xlog.Warn("Attempt to make a decision failed", "attempt", attempts+1, "error", lastErr) + + if obs != nil { + obs.Progress[len(obs.Progress)-1].Error = lastErr.Error() + a.observer.Update(*obs) + } + continue } @@ -68,6 +100,12 @@ func (a *Agent) decision( if err := a.saveConversation(append(conversation, msg), "decision"); err != nil { xlog.Error("Error saving conversation", "error", err) } + + if obs != nil { + obs.MakeLastProgressCompletion() + a.observer.Update(*obs) + } + return &decisionResult{message: msg.Content}, nil } @@ -75,6 +113,12 @@ func (a *Agent) decision( if err := params.Read(msg.ToolCalls[0].Function.Arguments); err != nil { lastErr = err xlog.Warn("Attempt to parse action parameters failed", "attempt", attempts+1, "error", err) + + if obs != nil { + obs.Progress[len(obs.Progress)-1].Error = lastErr.Error() + a.observer.Update(*obs) 
+ } + continue } @@ -82,6 +126,11 @@ func (a *Agent) decision( xlog.Error("Error saving conversation", "error", err) } + if obs != nil { + obs.MakeLastProgressCompletion() + a.observer.Update(*obs) + } + return &decisionResult{actionParams: params, actioName: msg.ToolCalls[0].Function.Name, message: msg.Content}, nil } @@ -173,7 +222,7 @@ func (m Messages) IsLastMessageFromRole(role string) bool { return m[len(m)-1].Role == role } -func (a *Agent) generateParameters(ctx context.Context, pickTemplate string, act types.Action, c []openai.ChatCompletionMessage, reasoning string, maxAttempts int) (*decisionResult, error) { +func (a *Agent) generateParameters(job *types.Job, pickTemplate string, act types.Action, c []openai.ChatCompletionMessage, reasoning string, maxAttempts int) (*decisionResult, error) { stateHUD, err := renderTemplate(pickTemplate, a.prepareHUD(), a.availableActions(), reasoning) if err != nil { return nil, err @@ -201,7 +250,7 @@ func (a *Agent) generateParameters(ctx context.Context, pickTemplate string, act var attemptErr error for attempts := 0; attempts < maxAttempts; attempts++ { - result, attemptErr = a.decision(ctx, + result, attemptErr = a.decision(job, cc, a.availableActions().ToTools(), act.Definition().Name.String(), @@ -263,7 +312,7 @@ func (a *Agent) handlePlanning(ctx context.Context, job *types.Job, chosenAction subTaskAction := a.availableActions().Find(subtask.Action) subTaskReasoning := fmt.Sprintf("%s Overall goal is: %s", subtask.Reasoning, planResult.Goal) - params, err := a.generateParameters(ctx, pickTemplate, subTaskAction, conv, subTaskReasoning, maxRetries) + params, err := a.generateParameters(job, pickTemplate, subTaskAction, conv, subTaskReasoning, maxRetries) if err != nil { xlog.Error("error generating action's parameters", "error", err) return conv, fmt.Errorf("error generating action's parameters: %w", err) @@ -293,7 +342,7 @@ func (a *Agent) handlePlanning(ctx context.Context, job *types.Job, chosenAction break } - result, err := a.runAction(ctx, subTaskAction, actionParams) + result, err := a.runAction(job, subTaskAction, actionParams) if err != nil { xlog.Error("error running action", "error", err) return conv, fmt.Errorf("error running action: %w", err) @@ -378,7 +427,7 @@ func (a *Agent) prepareHUD() (promptHUD *PromptHUD) { } // pickAction picks an action based on the conversation -func (a *Agent) pickAction(ctx context.Context, templ string, messages []openai.ChatCompletionMessage, maxRetries int) (types.Action, types.ActionParams, string, error) { +func (a *Agent) pickAction(job *types.Job, templ string, messages []openai.ChatCompletionMessage, maxRetries int) (types.Action, types.ActionParams, string, error) { c := messages xlog.Debug("[pickAction] picking action starts", "messages", messages) @@ -389,7 +438,7 @@ func (a *Agent) pickAction(ctx context.Context, templ string, messages []openai. xlog.Debug("not forcing reasoning") // We also could avoid to use functions here and get just a reply from the LLM // and then use the reply to get the action - thought, err := a.decision(ctx, + thought, err := a.decision(job, messages, a.availableActions().ToTools(), "", @@ -431,7 +480,7 @@ func (a *Agent) pickAction(ctx context.Context, templ string, messages []openai. }, c...) 
} - thought, err := a.decision(ctx, + thought, err := a.decision(job, c, types.Actions{action.NewReasoning()}.ToTools(), action.NewReasoning().Definition().Name.String(), maxRetries) @@ -467,7 +516,7 @@ func (a *Agent) pickAction(ctx context.Context, templ string, messages []openai. // to avoid hallucinations // Extract an action - params, err := a.decision(ctx, + params, err := a.decision(job, append(c, openai.ChatCompletionMessage{ Role: "system", Content: "Pick the relevant action given the following reasoning: " + originalReasoning, diff --git a/core/agent/agent.go b/core/agent/agent.go index e93996e..54a328a 100644 --- a/core/agent/agent.go +++ b/core/agent/agent.go @@ -30,7 +30,7 @@ type Agent struct { jobQueue chan *types.Job context *types.ActionContext - currentState *action.AgentInternalState + currentState *types.AgentInternalState selfEvaluationInProgress bool pause bool @@ -41,6 +41,8 @@ type Agent struct { subscriberMutex sync.Mutex newMessagesSubscribers []func(openai.ChatCompletionMessage) + + observer Observer } type RAGDB interface { @@ -69,12 +71,17 @@ func New(opts ...Option) (*Agent, error) { options: options, client: client, Character: options.character, - currentState: &action.AgentInternalState{}, + currentState: &types.AgentInternalState{}, context: types.NewActionContext(ctx, cancel), newConversations: make(chan openai.ChatCompletionMessage), newMessagesSubscribers: options.newConversationsSubscribers, } + // Initialize observer if provided + if options.observer != nil { + a.observer = options.observer + } + if a.options.statefile != "" { if _, err := os.Stat(a.options.statefile); err == nil { if err = a.LoadState(a.options.statefile); err != nil { @@ -146,6 +153,14 @@ func (a *Agent) Ask(opts ...types.JobOption) *types.JobResult { xlog.Debug("Agent has finished being asked", "agent", a.Character.Name) }() + if a.observer != nil { + obs := a.observer.NewObservable() + obs.Name = "job" + obs.Icon = "plug" + a.observer.Update(*obs) + opts = append(opts, types.WithObservable(obs)) + } + return a.Execute(types.NewJob( append( opts, @@ -163,6 +178,20 @@ func (a *Agent) Execute(j *types.Job) *types.JobResult { xlog.Debug("Agent has finished", "agent", a.Character.Name) }() + if j.Obs != nil { + j.Result.AddFinalizer(func(ccm []openai.ChatCompletionMessage) { + j.Obs.Completion = &types.Completion{ + Conversation: ccm, + } + + if j.Result.Error != nil { + j.Obs.Completion.Error = j.Result.Error.Error() + } + + a.observer.Update(*j.Obs) + }) + } + a.Enqueue(j) return j.Result.WaitResult() } @@ -237,41 +266,90 @@ func (a *Agent) Memory() RAGDB { return a.options.ragdb } -func (a *Agent) runAction(ctx context.Context, chosenAction types.Action, params types.ActionParams) (result types.ActionResult, err error) { +func (a *Agent) runAction(job *types.Job, chosenAction types.Action, params types.ActionParams) (result types.ActionResult, err error) { + var obs *types.Observable + if job.Obs != nil { + obs = a.observer.NewObservable() + obs.Name = "action" + obs.Icon = "bolt" + obs.ParentID = job.Obs.ID + obs.Creation = &types.Creation{ + FunctionDefinition: chosenAction.Definition().ToFunctionDefinition(), + FunctionParams: params, + } + a.observer.Update(*obs) + } + + xlog.Info("[runAction] Running action", "action", chosenAction.Definition().Name, "agent", a.Character.Name, "params", params.String()) + for _, act := range a.availableActions() { if act.Definition().Name == chosenAction.Definition().Name { - res, err := act.Run(ctx, params) + res, err := 
act.Run(job.GetContext(), params) if err != nil { + if obs != nil { + obs.Completion = &types.Completion{ + Error: err.Error(), + } + } + return types.ActionResult{}, fmt.Errorf("error running action: %w", err) } + if obs != nil { + obs.Progress = append(obs.Progress, types.Progress{ + ActionResult: res.Result, + }) + a.observer.Update(*obs) + } + result = res } } - xlog.Info("[runAction] Running action", "action", chosenAction.Definition().Name, "agent", a.Character.Name, "params", params.String()) - if chosenAction.Definition().Name.Is(action.StateActionName) { // We need to store the result in the state - state := action.AgentInternalState{} + state := types.AgentInternalState{} err = params.Unmarshal(&state) if err != nil { - return types.ActionResult{}, fmt.Errorf("error unmarshalling state of the agent: %w", err) + werr := fmt.Errorf("error unmarshalling state of the agent: %w", err) + if obs != nil { + obs.Completion = &types.Completion{ + Error: werr.Error(), + } + } + return types.ActionResult{}, werr } // update the current state with the one we just got from the action a.currentState = &state + if obs != nil { + obs.Progress = append(obs.Progress, types.Progress{ + AgentState: &state, + }) + a.observer.Update(*obs) + } // update the state file if a.options.statefile != "" { if err := a.SaveState(a.options.statefile); err != nil { + if obs != nil { + obs.Completion = &types.Completion{ + Error: err.Error(), + } + } + return types.ActionResult{}, err } } } xlog.Debug("[runAction] Action result", "action", chosenAction.Definition().Name, "params", params.String(), "result", result.Result) - + + if obs != nil { + obs.MakeLastProgressCompletion() + a.observer.Update(*obs) + } + return result, nil } @@ -468,7 +546,7 @@ func (a *Agent) consumeJob(job *types.Job, role string) { chosenAction = *action reasoning = reason if params == nil { - p, err := a.generateParameters(job.GetContext(), pickTemplate, chosenAction, conv, reasoning, maxRetries) + p, err := a.generateParameters(job, pickTemplate, chosenAction, conv, reasoning, maxRetries) if err != nil { xlog.Error("Error generating parameters, trying again", "error", err) // try again @@ -483,7 +561,7 @@ func (a *Agent) consumeJob(job *types.Job, role string) { job.ResetNextAction() } else { var err error - chosenAction, actionParams, reasoning, err = a.pickAction(job.GetContext(), pickTemplate, conv, maxRetries) + chosenAction, actionParams, reasoning, err = a.pickAction(job, pickTemplate, conv, maxRetries) if err != nil { xlog.Error("Error picking action", "error", err) job.Result.Finish(err) @@ -557,7 +635,7 @@ func (a *Agent) consumeJob(job *types.Job, role string) { "reasoning", reasoning, ) - params, err := a.generateParameters(job.GetContext(), pickTemplate, chosenAction, conv, reasoning, maxRetries) + params, err := a.generateParameters(job, pickTemplate, chosenAction, conv, reasoning, maxRetries) if err != nil { xlog.Error("Error generating parameters, trying again", "error", err) // try again @@ -652,7 +730,7 @@ func (a *Agent) consumeJob(job *types.Job, role string) { } if !chosenAction.Definition().Name.Is(action.PlanActionName) { - result, err := a.runAction(job.GetContext(), chosenAction, actionParams) + result, err := a.runAction(job, chosenAction, actionParams) if err != nil { //job.Result.Finish(fmt.Errorf("error running action: %w", err)) //return @@ -677,7 +755,7 @@ func (a *Agent) consumeJob(job *types.Job, role string) { } // given the result, we can now re-evaluate the conversation - followingAction, 
followingParams, reasoning, err := a.pickAction(job.GetContext(), reEvaluationTemplate, conv, maxRetries) + followingAction, followingParams, reasoning, err := a.pickAction(job, reEvaluationTemplate, conv, maxRetries) if err != nil { job.Result.Conversation = conv job.Result.Finish(fmt.Errorf("error picking action: %w", err)) @@ -955,3 +1033,7 @@ func (a *Agent) loop(timer *time.Timer, job *types.Job) { xlog.Debug("Agent is consuming a job", "agent", a.Character.Name, "job", job) a.consumeJob(job, UserRole) } + +func (a *Agent) Observer() Observer { + return a.observer +} diff --git a/core/agent/observer.go b/core/agent/observer.go new file mode 100644 index 0000000..a3283a6 --- /dev/null +++ b/core/agent/observer.go @@ -0,0 +1,87 @@ +package agent + +import ( + "encoding/json" + "sync" + "sync/atomic" + + "github.com/mudler/LocalAGI/core/sse" + "github.com/mudler/LocalAGI/core/types" + "github.com/mudler/LocalAGI/pkg/xlog" +) + +type Observer interface { + NewObservable() *types.Observable + Update(types.Observable) + History() []types.Observable +} + +type SSEObserver struct { + agent string + maxID int32 + manager sse.Manager + + mutex sync.Mutex + history []types.Observable + historyLast int +} + +func NewSSEObserver(agent string, manager sse.Manager) *SSEObserver { + return &SSEObserver{ + agent: agent, + maxID: 1, + manager: manager, + history: make([]types.Observable, 100), + } +} + +func (s *SSEObserver) NewObservable() *types.Observable { + id := atomic.AddInt32(&s.maxID, 1) + return &types.Observable{ + ID: id - 1, + Agent: s.agent, + } +} + +func (s *SSEObserver) Update(obs types.Observable) { + data, err := json.Marshal(obs) + if err != nil { + xlog.Error("Error marshaling observable", "error", err) + return + } + msg := sse.NewMessage(string(data)).WithEvent("observable_update") + s.manager.Send(msg) + + s.mutex.Lock() + defer s.mutex.Unlock() + + for i, o := range s.history { + if o.ID == obs.ID { + s.history[i] = obs + return + } + } + + s.history[s.historyLast] = obs + s.historyLast += 1 + if s.historyLast >= len(s.history) { + s.historyLast = 0 + } +} + +func (s *SSEObserver) History() []types.Observable { + h := make([]types.Observable, 0, 20) + + s.mutex.Lock() + defer s.mutex.Unlock() + + for _, obs := range s.history { + if obs.ID == 0 { + continue + } + + h = append(h, obs) + } + + return h +} diff --git a/core/agent/options.go b/core/agent/options.go index 831016a..d8a2b67 100644 --- a/core/agent/options.go +++ b/core/agent/options.go @@ -53,6 +53,8 @@ type options struct { mcpServers []MCPServer newConversationsSubscribers []func(openai.ChatCompletionMessage) + + observer Observer } func (o *options) SeparatedMultimodalModel() bool { @@ -336,3 +338,10 @@ func WithActions(actions ...types.Action) Option { return nil } } + +func WithObserver(observer Observer) Option { + return func(o *options) error { + o.observer = observer + return nil + } +} diff --git a/core/agent/state.go b/core/agent/state.go index 0b02af9..2c35306 100644 --- a/core/agent/state.go +++ b/core/agent/state.go @@ -6,7 +6,7 @@ import ( "os" "path/filepath" - "github.com/mudler/LocalAGI/core/action" + "github.com/mudler/LocalAGI/core/types" "github.com/sashabaranov/go-openai/jsonschema" ) @@ -15,7 +15,7 @@ import ( // in the prompts type PromptHUD struct { Character Character `json:"character"` - CurrentState action.AgentInternalState `json:"current_state"` + CurrentState types.AgentInternalState `json:"current_state"` PermanentGoal string `json:"permanent_goal"` ShowCharacter bool 
`json:"show_character"` } @@ -80,7 +80,7 @@ func Load(path string) (*Character, error) { return &c, nil } -func (a *Agent) State() action.AgentInternalState { +func (a *Agent) State() types.AgentInternalState { return *a.currentState } diff --git a/core/state/pool.go b/core/state/pool.go index 85217cf..0f4f96a 100644 --- a/core/state/pool.go +++ b/core/state/pool.go @@ -407,6 +407,7 @@ func (a *AgentPool) startAgentWithConfig(name string, config *AgentConfig) error c.AgentResultCallback()(state) } }), + WithObserver(NewSSEObserver(name, manager)), } if config.HUD { diff --git a/core/types/job.go b/core/types/job.go index 9b157fb..c699c2e 100644 --- a/core/types/job.go +++ b/core/types/job.go @@ -27,6 +27,8 @@ type Job struct { context context.Context cancel context.CancelFunc + + Obs *Observable } type ActionRequest struct { @@ -198,3 +200,9 @@ func (j *Job) Cancel() { func (j *Job) GetContext() context.Context { return j.context } + +func WithObservable(obs *Observable) JobOption { + return func(j *Job) { + j.Obs = obs + } +} diff --git a/core/types/observable.go b/core/types/observable.go new file mode 100644 index 0000000..74dd348 --- /dev/null +++ b/core/types/observable.go @@ -0,0 +1,61 @@ +package types + +import ( + "github.com/mudler/LocalAGI/pkg/xlog" + "github.com/sashabaranov/go-openai" +) + +type Creation struct { + ChatCompletionRequest *openai.ChatCompletionRequest `json:"chat_completion_request,omitempty"` + FunctionDefinition *openai.FunctionDefinition `json:"function_definition,omitempty"` + FunctionParams ActionParams `json:"function_params,omitempty"` +} + +type Progress struct { + Error string `json:"error,omitempty"` + ChatCompletionResponse *openai.ChatCompletionResponse `json:"chat_completion_response,omitempty"` + ActionResult string `json:"action_result,omitempty"` + AgentState *AgentInternalState `json:"agent_state"` +} + +type Completion struct { + Error string `json:"error,omitempty"` + ChatCompletionResponse *openai.ChatCompletionResponse `json:"chat_completion_response,omitempty"` + Conversation []openai.ChatCompletionMessage `json:"conversation,omitempty"` + ActionResult string `json:"action_result,omitempty"` + AgentState *AgentInternalState `json:"agent_state"` +} + +type Observable struct { + ID int32 `json:"id"` + ParentID int32 `json:"parent_id,omitempty"` + Agent string `json:"agent"` + Name string `json:"name"` + Icon string `json:"icon"` + + Creation *Creation `json:"creation,omitempty"` + Progress []Progress `json:"progress,omitempty"` + Completion *Completion `json:"completion,omitempty"` +} + +func (o *Observable) AddProgress(p Progress) { + if o.Progress == nil { + o.Progress = make([]Progress, 0) + } + o.Progress = append(o.Progress, p) +} + +func (o *Observable) MakeLastProgressCompletion() { + if len(o.Progress) == 0 { + xlog.Error("Observable completed without any progress", "id", o.ID, "name", o.Name) + return + } + p := o.Progress[len(o.Progress)-1] + o.Progress = o.Progress[:len(o.Progress)-1] + o.Completion = &Completion{ + Error: p.Error, + ChatCompletionResponse: p.ChatCompletionResponse, + ActionResult: p.ActionResult, + AgentState: p.AgentState, + } +} diff --git a/core/types/state.go b/core/types/state.go new file mode 100644 index 0000000..e3baa5e --- /dev/null +++ b/core/types/state.go @@ -0,0 +1,41 @@ +package types + +import "fmt" + +// State is the structure +// that is used to keep track of the current state +// and the Agent's short memory that it can update +// Besides a long term memory that is accessible by the agent (With 
vector database), +// And a context memory (that is always powered by a vector database), +// this memory is the shorter one that the LLM keeps across conversation and across its +// reasoning process's and life time. +// TODO: A special action is then used to let the LLM itself update its memory +// periodically during self-processing, and the same action is ALSO exposed +// during the conversation to let the user put for example, a new goal to the agent. +type AgentInternalState struct { + NowDoing string `json:"doing_now"` + DoingNext string `json:"doing_next"` + DoneHistory []string `json:"done_history"` + Memories []string `json:"memories"` + Goal string `json:"goal"` +} + +const fmtT = `===================== +NowDoing: %s +DoingNext: %s +Your current goal is: %s +You have done: %+v +You have a short memory with: %+v +===================== +` + +func (c AgentInternalState) String() string { + return fmt.Sprintf( + fmtT, + c.NowDoing, + c.DoingNext, + c.Goal, + c.DoneHistory, + c.Memories, + ) +} diff --git a/webui/app.go b/webui/app.go index 91cf899..ad9d362 100644 --- a/webui/app.go +++ b/webui/app.go @@ -370,7 +370,7 @@ func (a *App) Chat(pool *state.AgentPool) func(c *fiber.Ctx) error { xlog.Error("Error marshaling status message", "error", err) } else { manager.Send( - sse.NewMessage(string(statusData)).WithEvent("json_status")) + sse.NewMessage(string(statusData)).WithEvent("json_message_status")) } // Process the message asynchronously @@ -417,7 +417,7 @@ func (a *App) Chat(pool *state.AgentPool) func(c *fiber.Ctx) error { xlog.Error("Error marshaling completed status", "error", err) } else { manager.Send( - sse.NewMessage(string(completedData)).WithEvent("json_status")) + sse.NewMessage(string(completedData)).WithEvent("json_message_status")) } }() diff --git a/webui/react-ui/bun.lock b/webui/react-ui/bun.lock index 9347080..c7cf84f 100644 --- a/webui/react-ui/bun.lock +++ b/webui/react-ui/bun.lock @@ -4,6 +4,7 @@ "": { "name": "react-ui", "dependencies": { + "highlight.js": "^11.11.1", "react": "^19.1.0", "react-dom": "^19.1.0", }, @@ -300,6 +301,8 @@ "has-flag": ["has-flag@4.0.0", "", {}, "sha512-EykJT/Q1KjTWctppgIAgfSO0tKVuZUjhgMr17kqTumMl6Afv3EISleU7qZUzoXDFTAHTDC4NOoG/ZxU3EvlMPQ=="], + "highlight.js": ["highlight.js@11.11.1", "", {}, "sha512-Xwwo44whKBVCYoliBQwaPvtd/2tYFkRQtXDWj1nackaV2JPXx3L0+Jvd8/qCJ2p+ML0/XVkJ2q+Mr+UVdpJK5w=="], + "ignore": ["ignore@5.3.2", "", {}, "sha512-hsBTNUqQTDwkWtcdYI2i06Y/nUBEsNEDJKjWdigLvegy8kDuJAS8uRlpkkcQpyEXL0Z/pjDy5HBmMjRCJ2gq+g=="], "import-fresh": ["import-fresh@3.3.1", "", { "dependencies": { "parent-module": "^1.0.0", "resolve-from": "^4.0.0" } }, "sha512-TR3KfrTZTYLPB6jUjfx6MF9WcWrHL9su5TObK4ZkYgBdWKPOFoSoQIdEuTuR82pmtxH2spWG9h6etwfr1pLBqQ=="], diff --git a/webui/react-ui/package.json b/webui/react-ui/package.json index fff9a3f..48f5d42 100644 --- a/webui/react-ui/package.json +++ b/webui/react-ui/package.json @@ -11,7 +11,8 @@ }, "dependencies": { "react": "^19.1.0", - "react-dom": "^19.1.0" + "react-dom": "^19.1.0", + "highlight.js": "^11.11.1" }, "devDependencies": { "@eslint/js": "^9.24.0", diff --git a/webui/react-ui/src/App.css b/webui/react-ui/src/App.css index f09af27..953e2b1 100644 --- a/webui/react-ui/src/App.css +++ b/webui/react-ui/src/App.css @@ -1,4 +1,17 @@ /* Base styles */ +pre.hljs { + background-color: var(--medium-bg); + padding: 1rem; + border-radius: 8px; + overflow-x: auto; + font-family: 'JetBrains Mono', monospace; + line-height: 1.5; +} + +code.json { + display: block; +} + :root { --primary: #00ff95; 
--secondary: #ff00b1; @@ -1994,16 +2007,62 @@ select.form-control { text-decoration: none; } -.file-button:hover { - background: rgba(0, 255, 149, 0.8); - transform: translateY(-2px); - box-shadow: 0 4px 8px rgba(0, 0, 0, 0.2); -} - .file-button i { font-size: 16px; } +.card { + background: var(--medium-bg); + border: 1px solid var(--border); + border-radius: 8px; + padding: 15px; + margin-bottom: 15px; + transition: all 0.3s ease; + box-shadow: 0 2px 10px rgba(0, 0, 0, 0.1); + cursor: pointer; +} + +.card:hover { + transform: translateY(-2px); + box-shadow: 0 5px 15px rgba(0, 0, 0, 0.2); + background: var(--light-bg); +} + +.spinner { + width: 16px; + height: 16px; + border: 2px solid var(--primary); + border-radius: 50%; + border-top-color: transparent; + animation: spin 1s linear infinite; +} + +@keyframes spin { + 0% { transform: rotate(0deg); } + 100% { transform: rotate(360deg); } +} + +.expand-button { + background: none; + border: none; + color: var(--primary); + cursor: pointer; + font-size: 1.2em; + padding: 5px; + margin-left: 10px; + transition: all 0.3s ease; +} + +.expand-button:hover { + color: var(--success); + transform: scale(1.1); +} + +.expand-button:focus { + outline: none; + box-shadow: 0 0 0 2px var(--primary); +} + .selected-file-info { margin-top: 20px; padding: 20px; diff --git a/webui/react-ui/src/hooks/useSSE.js b/webui/react-ui/src/hooks/useSSE.js index 76409cc..c62c704 100644 --- a/webui/react-ui/src/hooks/useSSE.js +++ b/webui/react-ui/src/hooks/useSSE.js @@ -63,8 +63,8 @@ export function useSSE(agentName) { } }); - // Handle 'json_status' event - eventSource.addEventListener('json_status', (event) => { + // Handle 'json_message_status' event + eventSource.addEventListener('json_message_status', (event) => { try { const data = JSON.parse(event.data); const timestamp = data.timestamp || new Date().toISOString(); diff --git a/webui/react-ui/src/pages/AgentStatus.jsx b/webui/react-ui/src/pages/AgentStatus.jsx index 7f839a9..658aae8 100644 --- a/webui/react-ui/src/pages/AgentStatus.jsx +++ b/webui/react-ui/src/pages/AgentStatus.jsx @@ -1,13 +1,22 @@ import { useState, useEffect } from 'react'; import { useParams, Link } from 'react-router-dom'; +import hljs from 'highlight.js/lib/core'; +import json from 'highlight.js/lib/languages/json'; +import 'highlight.js/styles/monokai.css'; + +hljs.registerLanguage('json', json); function AgentStatus() { + const [showStatus, setShowStatus] = useState(true); const { name } = useParams(); const [statusData, setStatusData] = useState(null); const [loading, setLoading] = useState(true); const [error, setError] = useState(null); const [_eventSource, setEventSource] = useState(null); - const [liveUpdates, setLiveUpdates] = useState([]); + // Store all observables by id + const [observableMap, setObservableMap] = useState({}); + const [observableTree, setObservableTree] = useState([]); + const [expandedCards, setExpandedCards] = useState(new Map()); // Update document title useEffect(() => { @@ -39,17 +48,80 @@ function AgentStatus() { fetchStatusData(); + // Helper to build observable tree from map + function buildObservableTree(map) { + const nodes = Object.values(map); + const nodeMap = {}; + nodes.forEach(node => { nodeMap[node.id] = { ...node, children: [] }; }); + const roots = []; + nodes.forEach(node => { + if (!node.parent_id) { + roots.push(nodeMap[node.id]); + } else if (nodeMap[node.parent_id]) { + nodeMap[node.parent_id].children.push(nodeMap[node.id]); + } + }); + return roots; + } + + // Fetch initial observable 
history + const fetchObservables = async () => { + try { + const response = await fetch(`/api/agent/${name}/observables`); + if (!response.ok) return; + const data = await response.json(); + if (Array.isArray(data.History)) { + const map = {}; + data.History.forEach(obs => { + map[obs.id] = obs; + }); + setObservableMap(map); + setObservableTree(buildObservableTree(map)); + } + } catch (err) { + // Ignore errors for now + } + }; + fetchObservables(); + // Setup SSE connection for live updates const sse = new EventSource(`/sse/${name}`); setEventSource(sse); + sse.addEventListener('observable_update', (event) => { + const data = JSON.parse(event.data); + console.log(data); + setObservableMap(prevMap => { + const prev = prevMap[data.id] || {}; + const updated = { + ...prev, + ...data, + creation: data.creation, + progress: data.progress, + completion: data.completion, + // children are always built client-side + }; + const newMap = { ...prevMap, [data.id]: updated }; + setObservableTree(buildObservableTree(newMap)); + return newMap; + }); + }); + + // Listen for status events and append to statusData.History sse.addEventListener('status', (event) => { - try { - const data = JSON.parse(event.data); - setLiveUpdates(prev => [data, ...prev.slice(0, 19)]); // Keep last 20 updates - } catch (err) { - setLiveUpdates(prev => [event.data, ...prev.slice(0, 19)]); - } + const status = event.data; + setStatusData(prev => { + // If prev is null, start a new object + if (!prev || typeof prev !== 'object') { + return { History: [status] }; + } + // If History not present, add it + if (!Array.isArray(prev.History)) { + return { ...prev, History: [status] }; + } + // Otherwise, append + return { ...prev, History: [...prev.History, status] }; + }); }); sse.onerror = (err) => { @@ -69,7 +141,7 @@ function AgentStatus() { if (value === null || value === undefined) { return 'N/A'; } - + if (typeof value === 'object') { try { return JSON.stringify(value, null, 2); @@ -77,14 +149,14 @@ function AgentStatus() { return '[Complex Object]'; } } - + return String(value); }; if (loading) { return ( -
-
+
+

Loading agent status...

); @@ -92,56 +164,199 @@ function AgentStatus() { if (error) { return ( -
+

Error

{error}

- + Back to Agents
); } - // Combine live updates with history - const allUpdates = [...liveUpdates, ...(statusData?.History || [])]; - return ( -
-
-
-

- - - - Agent Status: {name} -

+
+

Agent Status: {name}

+
+ See what the agent is doing and thinking +
+ {error && ( +
+ {error}
-
+ )} + {loading &&
Loading...
} + {statusData && ( +
+
+
setShowStatus(prev => !prev)}> +

Current Status

+ +
+
+ Summary of the agent's thoughts and actions +
+ {showStatus && ( +
+ {(Array.isArray(statusData?.History) && statusData.History.length === 0) && ( +
No status history available.
+ )} + {Array.isArray(statusData?.History) && statusData.History.map((item, idx) => ( +
+ {/* Replace
<br /> tags with newlines, then render as pre-line */} + {typeof item === 'string' + ? item.replace(/<br\s*\/?>/gi, '\n') + : JSON.stringify(item)} +
+ ))} +
+ )} +
+ {observableTree.length > 0 && ( +
+

Observable Updates

+
+ Drill down into what the agent is doing and thinking when activated by a connector +
+
+ {observableTree.map((container, idx) => ( +
+
+
{ + const newExpanded = !expandedCards.get(container.id); + setExpandedCards(new Map(expandedCards).set(container.id, newExpanded)); + }} + > +
+ + + {container.name}#{container.id} + +
+
+ + {!container.completion && ( +
+ )} +
+
+
+ {container.children && container.children.length > 0 && ( -
- {/* Chat Messages */} -
- {allUpdates.length > 0 ? ( - allUpdates.map((item, index) => ( -
-
-

Agent Action:

-
-
- {index} - {formatValue(item)} +
+

Nested Observables

+ {container.children.map(child => { + const childKey = `child-${child.id}`; + const isExpanded = expandedCards.get(childKey); + return ( +
+
{ + const newExpanded = !expandedCards.get(childKey); + setExpandedCards(new Map(expandedCards).set(childKey, newExpanded)); + }} + > +
+ + + {child.name}#{child.id} + +
+
+ + {!child.completion && ( +
+ )} +
+
+
+ {child.creation && ( +
+
Creation:
+

+                                          
+
+
+ )} + {child.progress && child.progress.length > 0 && ( +
+
Progress:
+

+                                          
+
+
+ )} + {child.completion && ( +
+
Completion:
+

+                                          
+
+
+ )} +
+
+ ); + })} +
+ )} + {container.creation && ( +
+

Creation:

+

+                              
+
+
+ )} + {container.progress && container.progress.length > 0 && ( +
+

Progress:

+

+                              
+
+
+ )} + {container.completion && ( +
+

Completion:

+

+                              
+
+
+ )} +
-
+ ))}
- )) - ) : ( -
-

No status data available for this agent.

)}
-
+ )}
); } diff --git a/webui/routes.go b/webui/routes.go index 988ba07..6f647cb 100644 --- a/webui/routes.go +++ b/webui/routes.go @@ -241,13 +241,14 @@ func (app *App) registerRoutes(pool *state.AgentPool, webapp *fiber.App) { entries := []string{} for _, h := range Reverse(history.Results()) { - entries = append(entries, fmt.Sprintf( - "Result: %v Action: %v Params: %v Reasoning: %v", - h.Result, - h.Action.Definition().Name, - h.Params, + entries = append(entries, fmt.Sprintf(`Reasoning: %s + Action taken: %+v + Parameters: %+v + Result: %s`, h.Reasoning, - )) + h.ActionCurrentState.Action.Definition().Name, + h.ActionCurrentState.Params, + h.Result)) } return c.JSON(fiber.Map{ @@ -256,6 +257,21 @@ func (app *App) registerRoutes(pool *state.AgentPool, webapp *fiber.App) { }) }) + webapp.Get("/api/agent/:name/observables", func(c *fiber.Ctx) error { + name := c.Params("name") + agent := pool.GetAgent(name) + if agent == nil { + return c.Status(fiber.StatusNotFound).JSON(fiber.Map{ + "error": "Agent not found", + }) + } + + return c.JSON(fiber.Map{ + "Name": name, + "History": agent.Observer().History(), + }) + }) + webapp.Post("/settings/import", app.ImportAgent(pool)) webapp.Get("/settings/export/:name", app.ExportAgent(pool))