feat(ui): Structured observability/status view (#40)
* refactor(ui): Make message status SSE name more specific Signed-off-by: Richard Palethorpe <io@richiejp.com> * feat(ui): Add structured observability events Signed-off-by: Richard Palethorpe <io@richiejp.com> --------- Signed-off-by: Richard Palethorpe <io@richiejp.com>
This commit is contained in:
committed by
GitHub
parent
70e749b53a
commit
b5a12a1da6
@@ -30,7 +30,7 @@ type Agent struct {
|
||||
jobQueue chan *types.Job
|
||||
context *types.ActionContext
|
||||
|
||||
currentState *action.AgentInternalState
|
||||
currentState *types.AgentInternalState
|
||||
|
||||
selfEvaluationInProgress bool
|
||||
pause bool
|
||||
@@ -41,6 +41,8 @@ type Agent struct {
|
||||
|
||||
subscriberMutex sync.Mutex
|
||||
newMessagesSubscribers []func(openai.ChatCompletionMessage)
|
||||
|
||||
observer Observer
|
||||
}
|
||||
|
||||
type RAGDB interface {
|
||||
@@ -69,12 +71,17 @@ func New(opts ...Option) (*Agent, error) {
|
||||
options: options,
|
||||
client: client,
|
||||
Character: options.character,
|
||||
currentState: &action.AgentInternalState{},
|
||||
currentState: &types.AgentInternalState{},
|
||||
context: types.NewActionContext(ctx, cancel),
|
||||
newConversations: make(chan openai.ChatCompletionMessage),
|
||||
newMessagesSubscribers: options.newConversationsSubscribers,
|
||||
}
|
||||
|
||||
// Initialize observer if provided
|
||||
if options.observer != nil {
|
||||
a.observer = options.observer
|
||||
}
|
||||
|
||||
if a.options.statefile != "" {
|
||||
if _, err := os.Stat(a.options.statefile); err == nil {
|
||||
if err = a.LoadState(a.options.statefile); err != nil {
|
||||
@@ -146,6 +153,14 @@ func (a *Agent) Ask(opts ...types.JobOption) *types.JobResult {
|
||||
xlog.Debug("Agent has finished being asked", "agent", a.Character.Name)
|
||||
}()
|
||||
|
||||
if a.observer != nil {
|
||||
obs := a.observer.NewObservable()
|
||||
obs.Name = "job"
|
||||
obs.Icon = "plug"
|
||||
a.observer.Update(*obs)
|
||||
opts = append(opts, types.WithObservable(obs))
|
||||
}
|
||||
|
||||
return a.Execute(types.NewJob(
|
||||
append(
|
||||
opts,
|
||||
@@ -163,6 +178,20 @@ func (a *Agent) Execute(j *types.Job) *types.JobResult {
|
||||
xlog.Debug("Agent has finished", "agent", a.Character.Name)
|
||||
}()
|
||||
|
||||
if j.Obs != nil {
|
||||
j.Result.AddFinalizer(func(ccm []openai.ChatCompletionMessage) {
|
||||
j.Obs.Completion = &types.Completion{
|
||||
Conversation: ccm,
|
||||
}
|
||||
|
||||
if j.Result.Error != nil {
|
||||
j.Obs.Completion.Error = j.Result.Error.Error()
|
||||
}
|
||||
|
||||
a.observer.Update(*j.Obs)
|
||||
})
|
||||
}
|
||||
|
||||
a.Enqueue(j)
|
||||
return j.Result.WaitResult()
|
||||
}
|
||||
@@ -237,41 +266,90 @@ func (a *Agent) Memory() RAGDB {
|
||||
return a.options.ragdb
|
||||
}
|
||||
|
||||
func (a *Agent) runAction(ctx context.Context, chosenAction types.Action, params types.ActionParams) (result types.ActionResult, err error) {
|
||||
func (a *Agent) runAction(job *types.Job, chosenAction types.Action, params types.ActionParams) (result types.ActionResult, err error) {
|
||||
var obs *types.Observable
|
||||
if job.Obs != nil {
|
||||
obs = a.observer.NewObservable()
|
||||
obs.Name = "action"
|
||||
obs.Icon = "bolt"
|
||||
obs.ParentID = job.Obs.ID
|
||||
obs.Creation = &types.Creation{
|
||||
FunctionDefinition: chosenAction.Definition().ToFunctionDefinition(),
|
||||
FunctionParams: params,
|
||||
}
|
||||
a.observer.Update(*obs)
|
||||
}
|
||||
|
||||
xlog.Info("[runAction] Running action", "action", chosenAction.Definition().Name, "agent", a.Character.Name, "params", params.String())
|
||||
|
||||
for _, act := range a.availableActions() {
|
||||
if act.Definition().Name == chosenAction.Definition().Name {
|
||||
res, err := act.Run(ctx, params)
|
||||
res, err := act.Run(job.GetContext(), params)
|
||||
if err != nil {
|
||||
if obs != nil {
|
||||
obs.Completion = &types.Completion{
|
||||
Error: err.Error(),
|
||||
}
|
||||
}
|
||||
|
||||
return types.ActionResult{}, fmt.Errorf("error running action: %w", err)
|
||||
}
|
||||
|
||||
if obs != nil {
|
||||
obs.Progress = append(obs.Progress, types.Progress{
|
||||
ActionResult: res.Result,
|
||||
})
|
||||
a.observer.Update(*obs)
|
||||
}
|
||||
|
||||
result = res
|
||||
}
|
||||
}
|
||||
|
||||
xlog.Info("[runAction] Running action", "action", chosenAction.Definition().Name, "agent", a.Character.Name, "params", params.String())
|
||||
|
||||
if chosenAction.Definition().Name.Is(action.StateActionName) {
|
||||
// We need to store the result in the state
|
||||
state := action.AgentInternalState{}
|
||||
state := types.AgentInternalState{}
|
||||
|
||||
err = params.Unmarshal(&state)
|
||||
if err != nil {
|
||||
return types.ActionResult{}, fmt.Errorf("error unmarshalling state of the agent: %w", err)
|
||||
werr := fmt.Errorf("error unmarshalling state of the agent: %w", err)
|
||||
if obs != nil {
|
||||
obs.Completion = &types.Completion{
|
||||
Error: werr.Error(),
|
||||
}
|
||||
}
|
||||
return types.ActionResult{}, werr
|
||||
}
|
||||
// update the current state with the one we just got from the action
|
||||
a.currentState = &state
|
||||
if obs != nil {
|
||||
obs.Progress = append(obs.Progress, types.Progress{
|
||||
AgentState: &state,
|
||||
})
|
||||
a.observer.Update(*obs)
|
||||
}
|
||||
|
||||
// update the state file
|
||||
if a.options.statefile != "" {
|
||||
if err := a.SaveState(a.options.statefile); err != nil {
|
||||
if obs != nil {
|
||||
obs.Completion = &types.Completion{
|
||||
Error: err.Error(),
|
||||
}
|
||||
}
|
||||
|
||||
return types.ActionResult{}, err
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
xlog.Debug("[runAction] Action result", "action", chosenAction.Definition().Name, "params", params.String(), "result", result.Result)
|
||||
|
||||
|
||||
if obs != nil {
|
||||
obs.MakeLastProgressCompletion()
|
||||
a.observer.Update(*obs)
|
||||
}
|
||||
|
||||
return result, nil
|
||||
}
|
||||
|
||||
@@ -468,7 +546,7 @@ func (a *Agent) consumeJob(job *types.Job, role string) {
|
||||
chosenAction = *action
|
||||
reasoning = reason
|
||||
if params == nil {
|
||||
p, err := a.generateParameters(job.GetContext(), pickTemplate, chosenAction, conv, reasoning, maxRetries)
|
||||
p, err := a.generateParameters(job, pickTemplate, chosenAction, conv, reasoning, maxRetries)
|
||||
if err != nil {
|
||||
xlog.Error("Error generating parameters, trying again", "error", err)
|
||||
// try again
|
||||
@@ -483,7 +561,7 @@ func (a *Agent) consumeJob(job *types.Job, role string) {
|
||||
job.ResetNextAction()
|
||||
} else {
|
||||
var err error
|
||||
chosenAction, actionParams, reasoning, err = a.pickAction(job.GetContext(), pickTemplate, conv, maxRetries)
|
||||
chosenAction, actionParams, reasoning, err = a.pickAction(job, pickTemplate, conv, maxRetries)
|
||||
if err != nil {
|
||||
xlog.Error("Error picking action", "error", err)
|
||||
job.Result.Finish(err)
|
||||
@@ -557,7 +635,7 @@ func (a *Agent) consumeJob(job *types.Job, role string) {
|
||||
"reasoning", reasoning,
|
||||
)
|
||||
|
||||
params, err := a.generateParameters(job.GetContext(), pickTemplate, chosenAction, conv, reasoning, maxRetries)
|
||||
params, err := a.generateParameters(job, pickTemplate, chosenAction, conv, reasoning, maxRetries)
|
||||
if err != nil {
|
||||
xlog.Error("Error generating parameters, trying again", "error", err)
|
||||
// try again
|
||||
@@ -652,7 +730,7 @@ func (a *Agent) consumeJob(job *types.Job, role string) {
|
||||
}
|
||||
|
||||
if !chosenAction.Definition().Name.Is(action.PlanActionName) {
|
||||
result, err := a.runAction(job.GetContext(), chosenAction, actionParams)
|
||||
result, err := a.runAction(job, chosenAction, actionParams)
|
||||
if err != nil {
|
||||
//job.Result.Finish(fmt.Errorf("error running action: %w", err))
|
||||
//return
|
||||
@@ -677,7 +755,7 @@ func (a *Agent) consumeJob(job *types.Job, role string) {
|
||||
}
|
||||
|
||||
// given the result, we can now re-evaluate the conversation
|
||||
followingAction, followingParams, reasoning, err := a.pickAction(job.GetContext(), reEvaluationTemplate, conv, maxRetries)
|
||||
followingAction, followingParams, reasoning, err := a.pickAction(job, reEvaluationTemplate, conv, maxRetries)
|
||||
if err != nil {
|
||||
job.Result.Conversation = conv
|
||||
job.Result.Finish(fmt.Errorf("error picking action: %w", err))
|
||||
@@ -955,3 +1033,7 @@ func (a *Agent) loop(timer *time.Timer, job *types.Job) {
|
||||
xlog.Debug("Agent is consuming a job", "agent", a.Character.Name, "job", job)
|
||||
a.consumeJob(job, UserRole)
|
||||
}
|
||||
|
||||
func (a *Agent) Observer() Observer {
|
||||
return a.observer
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user