From 8e9b87bcb1fd5a2b1976d2a7e314688b992361a6 Mon Sep 17 00:00:00 2001 From: mudler Date: Wed, 26 Mar 2025 16:57:46 +0100 Subject: [PATCH 1/4] chore(Makefile): build react dist if missing Signed-off-by: mudler --- Makefile | 8 +++- services/connectors/slack.go | 66 ++++++++++++++++++++++++++++++--- services/connectors/telegram.go | 19 ++++++++-- webui/app.go | 2 +- 4 files changed, 83 insertions(+), 12 deletions(-) diff --git a/Makefile b/Makefile index e69f7f3..191e5f7 100644 --- a/Makefile +++ b/Makefile @@ -1,5 +1,6 @@ GOCMD?=go IMAGE_NAME?=webui +ROOT_DIR:=$(shell dirname $(realpath $(lastword $(MAKEFILE_LIST)))) prepare-tests: docker compose up -d @@ -13,12 +14,15 @@ tests: prepare-tests run-nokb: $(MAKE) run KBDISABLEINDEX=true +webui/react-ui/dist: + docker run --entrypoint /bin/bash -v $(ROOT_DIR):/app oven/bun:1 -c "cd /app/webui/react-ui && bun install && bun run build" + .PHONY: build -build: +build: webui/react-ui/dist $(GOCMD) build -o localagent ./ .PHONY: run -run: +run: webui/react-ui/dist $(GOCMD) run ./ build-image: diff --git a/services/connectors/slack.go b/services/connectors/slack.go index 7bf19fc..5521369 100644 --- a/services/connectors/slack.go +++ b/services/connectors/slack.go @@ -4,7 +4,6 @@ import ( "bytes" "encoding/base64" "fmt" - "io/ioutil" "log" "os" "strings" @@ -38,6 +37,11 @@ type Slack struct { placeholderMutex sync.RWMutex apiClient *slack.Client + // Track active jobs for cancellation + activeJobs map[string]bool // map[channelID]bool to track if a channel has active processing + activeJobsMutex sync.RWMutex + agent *agent.Agent // Reference to the agent to call StopAction + conversationTracker *ConversationTracker[string] } @@ -57,13 +61,20 @@ func NewSlack(config map[string]string) *Slack { alwaysReply: config["alwaysReply"] == "true", conversationTracker: NewConversationTracker[string](duration), placeholders: make(map[string]string), + activeJobs: make(map[string]bool), } } func (t *Slack) AgentResultCallback() func(state types.ActionState) { return func(state types.ActionState) { - // The final result callback is intentionally empty as we're handling - // the final update in the handleMention function directly + // Mark the job as completed when we get the final result + if state.ActionCurrentState.Job != nil && state.ActionCurrentState.Job.Metadata != nil { + if channel, ok := state.ActionCurrentState.Job.Metadata["channel"].(string); ok && channel != "" { + t.activeJobsMutex.Lock() + delete(t.activeJobs, channel) + t.activeJobsMutex.Unlock() + } + } } } @@ -102,6 +113,23 @@ func (t *Slack) AgentReasoningCallback() func(state types.ActionCurrentState) bo } } +// cancelActiveJobForChannel cancels any active job for the given channel +func (t *Slack) cancelActiveJobForChannel(channelID string) { + t.activeJobsMutex.RLock() + isActive := t.activeJobs[channelID] + t.activeJobsMutex.RUnlock() + + if isActive && t.agent != nil { + xlog.Info(fmt.Sprintf("Cancelling active job for channel: %s", channelID)) + t.agent.StopAction() + + // Mark the job as inactive + t.activeJobsMutex.Lock() + delete(t.activeJobs, channelID) + t.activeJobsMutex.Unlock() + } +} + func cleanUpUsernameFromMessage(message string, b *slack.AuthTestResponse) string { cleaned := strings.ReplaceAll(message, "<@"+b.UserID+">", "") cleaned = strings.ReplaceAll(cleaned, "<@"+b.BotID+">", "") @@ -214,11 +242,26 @@ func (t *Slack) handleChannelMessage( return } + // Cancel any active job for this channel before starting a new one + t.cancelActiveJobForChannel(ev.Channel) + currentConv := t.conversationTracker.GetConversation(t.channelID) message := replaceUserIDsWithNamesInMessage(api, cleanUpUsernameFromMessage(ev.Text, b)) go func() { + // Mark this channel as having an active job + t.activeJobsMutex.Lock() + t.activeJobs[ev.Channel] = true + t.activeJobsMutex.Unlock() + + defer func() { + // Mark job as complete + t.activeJobsMutex.Lock() + delete(t.activeJobs, ev.Channel) + t.activeJobsMutex.Unlock() + }() + imageBytes, mimeType := scanImagesInMessages(api, ev) agentOptions := []types.JobOption{ @@ -259,6 +302,12 @@ func (t *Slack) handleChannelMessage( agentOptions = append(agentOptions, types.WithConversationHistory(currentConv)) + // Add channel to metadata for tracking + metadata := map[string]interface{}{ + "channel": ev.Channel, + } + agentOptions = append(agentOptions, types.WithMetadata(metadata)) + res := a.Ask( agentOptions..., ) @@ -292,9 +341,6 @@ func (t *Slack) handleChannelMessage( // Function to download the image from a URL and encode it to base64 func encodeImageFromURL(imageBytes bytes.Buffer) (string, error) { - // WRITE THIS SOMEWHERE - ioutil.WriteFile("image.jpg", imageBytes.Bytes(), 0644) - // Encode the image data to base64 base64Image := base64.StdEncoding.EncodeToString(imageBytes.Bytes()) return base64Image, nil @@ -639,10 +685,18 @@ func (t *Slack) handleMention( } func (t *Slack) Start(a *agent.Agent) { +<<<<<<< Updated upstream +<<<<<<< Updated upstream postMessageParams := slack.PostMessageParameters{ LinkNames: 1, Markdown: true, } +======= +>>>>>>> Stashed changes +======= +>>>>>>> Stashed changes + // Store the agent reference for use in cancellation + t.agent = a api := slack.New( t.botToken, diff --git a/services/connectors/telegram.go b/services/connectors/telegram.go index 66e0e20..bfa620b 100644 --- a/services/connectors/telegram.go +++ b/services/connectors/telegram.go @@ -3,6 +3,7 @@ package connectors import ( "context" "errors" + "net/http" "os" "os/signal" "slices" @@ -99,12 +100,24 @@ func (t *Telegram) handleUpdate(ctx context.Context, b *bot.Bot, a *agent.Agent, // coming from the gen image actions if imagesUrls, exists := res.Metadata[actions.MetadataImages]; exists { for _, url := range xstrings.UniqueSlice(imagesUrls.([]string)) { - b.SendPhoto(ctx, &bot.SendPhotoParams{ + xlog.Debug("Sending photo", "url", url) + + resp, err := http.Get(url) + if err != nil { + xlog.Error("Error downloading image", "error", err.Error()) + continue + } + defer resp.Body.Close() + _, err = b.SendPhoto(ctx, &bot.SendPhotoParams{ ChatID: update.Message.Chat.ID, - Photo: models.InputFileString{ - Data: url, + Photo: models.InputFileUpload{ + Filename: "image.jpg", + Data: resp.Body, }, }) + if err != nil { + xlog.Error("Error sending photo", "error", err.Error()) + } } } diff --git a/webui/app.go b/webui/app.go index 364f71f..9f8fd3f 100644 --- a/webui/app.go +++ b/webui/app.go @@ -138,7 +138,7 @@ func (a *App) Create(pool *state.AgentPool) func(c *fiber.Ctx) error { if err := pool.CreateAgent(config.Name, &config); err != nil { return errorJSONMessage(c, err.Error()) } - + return statusJSONMessage(c, "ok") } } From d5df14a714eea6fcc76486a8189f708eac58abd4 Mon Sep 17 00:00:00 2001 From: mudler Date: Wed, 26 Mar 2025 16:58:11 +0100 Subject: [PATCH 2/4] fix(planning): don't loose results Signed-off-by: mudler --- core/agent/actions.go | 14 +++++++------- core/agent/agent.go | 4 +++- 2 files changed, 10 insertions(+), 8 deletions(-) diff --git a/core/agent/actions.go b/core/agent/actions.go index 9b7279c..fe02fbe 100644 --- a/core/agent/actions.go +++ b/core/agent/actions.go @@ -174,17 +174,17 @@ func (a *Agent) generateParameters(ctx context.Context, pickTemplate string, act ) } -func (a *Agent) handlePlanning(ctx context.Context, job *types.Job, chosenAction types.Action, actionParams types.ActionParams, reasoning string, pickTemplate string, conv Messages) error { +func (a *Agent) handlePlanning(ctx context.Context, job *types.Job, chosenAction types.Action, actionParams types.ActionParams, reasoning string, pickTemplate string, conv Messages) (Messages, error) { // Planning: run all the actions in sequence if !chosenAction.Definition().Name.Is(action.PlanActionName) { xlog.Debug("no plan action") - return nil + return conv, nil } xlog.Debug("[planning]...") planResult := action.PlanResult{} if err := actionParams.Unmarshal(&planResult); err != nil { - return fmt.Errorf("error unmarshalling plan result: %w", err) + return conv, fmt.Errorf("error unmarshalling plan result: %w", err) } stateResult := types.ActionState{ @@ -207,7 +207,7 @@ func (a *Agent) handlePlanning(ctx context.Context, job *types.Job, chosenAction } if len(planResult.Subtasks) == 0 { - return fmt.Errorf("no subtasks") + return conv, fmt.Errorf("no subtasks") } // Execute all subtasks in sequence @@ -223,7 +223,7 @@ func (a *Agent) handlePlanning(ctx context.Context, job *types.Job, chosenAction params, err := a.generateParameters(ctx, pickTemplate, subTaskAction, conv, subTaskReasoning) if err != nil { - return fmt.Errorf("error generating action's parameters: %w", err) + return conv, fmt.Errorf("error generating action's parameters: %w", err) } actionParams = params.actionParams @@ -252,7 +252,7 @@ func (a *Agent) handlePlanning(ctx context.Context, job *types.Job, chosenAction result, err := a.runAction(subTaskAction, actionParams) if err != nil { - return fmt.Errorf("error running action: %w", err) + return conv, fmt.Errorf("error running action: %w", err) } stateResult := types.ActionState{ @@ -270,7 +270,7 @@ func (a *Agent) handlePlanning(ctx context.Context, job *types.Job, chosenAction conv = a.addFunctionResultToConversation(subTaskAction, actionParams, result, conv) } - return nil + return conv, nil } func (a *Agent) availableActions() types.Actions { diff --git a/core/agent/agent.go b/core/agent/agent.go index 5780bf9..57d9adb 100644 --- a/core/agent/agent.go +++ b/core/agent/agent.go @@ -528,7 +528,9 @@ func (a *Agent) consumeJob(job *types.Job, role string) { return } - if err := a.handlePlanning(ctx, job, chosenAction, actionParams, reasoning, pickTemplate, conv); err != nil { + var err error + conv, err = a.handlePlanning(ctx, job, chosenAction, actionParams, reasoning, pickTemplate, conv) + if err != nil { job.Result.Finish(fmt.Errorf("error running action: %w", err)) return } From 9d6b81d9c24be9f42b1e1c123945889b58aabf74 Mon Sep 17 00:00:00 2001 From: mudler Date: Wed, 26 Mar 2025 16:58:25 +0100 Subject: [PATCH 3/4] fix(slack): track user messages when writing on channel Signed-off-by: mudler --- services/connectors/slack.go | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/services/connectors/slack.go b/services/connectors/slack.go index 5521369..61631e0 100644 --- a/services/connectors/slack.go +++ b/services/connectors/slack.go @@ -300,6 +300,10 @@ func (t *Slack) handleChannelMessage( }) } + t.conversationTracker.AddMessage( + t.channelID, currentConv[len(currentConv)-1], + ) + agentOptions = append(agentOptions, types.WithConversationHistory(currentConv)) // Add channel to metadata for tracking @@ -685,16 +689,12 @@ func (t *Slack) handleMention( } func (t *Slack) Start(a *agent.Agent) { -<<<<<<< Updated upstream -<<<<<<< Updated upstream + postMessageParams := slack.PostMessageParameters{ LinkNames: 1, Markdown: true, } -======= ->>>>>>> Stashed changes -======= ->>>>>>> Stashed changes + // Store the agent reference for use in cancellation t.agent = a From dd6739cbbf2ff0a19880c1a74a6a5574509a396c Mon Sep 17 00:00:00 2001 From: mudler Date: Wed, 26 Mar 2025 17:11:09 +0100 Subject: [PATCH 4/4] fix: consistently track user message in connector Signed-off-by: mudler --- services/connectors/discord.go | 16 ++++++++++------ services/connectors/irc.go | 6 ++++++ services/connectors/telegram.go | 8 ++++++++ 3 files changed, 24 insertions(+), 6 deletions(-) diff --git a/services/connectors/discord.go b/services/connectors/discord.go index 4ed17e5..15b5e08 100644 --- a/services/connectors/discord.go +++ b/services/connectors/discord.go @@ -122,6 +122,11 @@ func (d *Discord) handleChannelMessage(a *agent.Agent, s *discordgo.Session, m * conv := d.conversationTracker.GetConversation(m.ChannelID) + d.conversationTracker.AddMessage(m.ChannelID, openai.ChatCompletionMessage{ + Role: "user", + Content: m.Content, + }) + jobResult := a.Ask( types.WithConversationHistory(conv), ) @@ -131,16 +136,15 @@ func (d *Discord) handleChannelMessage(a *agent.Agent, s *discordgo.Session, m * return } + d.conversationTracker.AddMessage(m.ChannelID, openai.ChatCompletionMessage{ + Role: "assistant", + Content: jobResult.Response, + }) + _, err := s.ChannelMessageSend(m.ChannelID, jobResult.Response) if err != nil { xlog.Info("error sending message,", err) } - - d.conversationTracker.AddMessage(m.ChannelID, openai.ChatCompletionMessage{ - Role: "user", - Content: m.Content, - }) - } // This function will be called (due to AddHandler above) every time a new diff --git a/services/connectors/irc.go b/services/connectors/irc.go index 026aea0..58bc54e 100644 --- a/services/connectors/irc.go +++ b/services/connectors/irc.go @@ -122,6 +122,12 @@ func (i *IRC) Start(a *agent.Agent) { }, ) + // Update the conversation history + i.conversationTracker.AddMessage(channel, openai.ChatCompletionMessage{ + Content: cleanedMessage, + Role: "user", + }) + res := a.Ask( types.WithConversationHistory(conv), ) diff --git a/services/connectors/telegram.go b/services/connectors/telegram.go index bfa620b..65e378b 100644 --- a/services/connectors/telegram.go +++ b/services/connectors/telegram.go @@ -67,6 +67,14 @@ func (t *Telegram) handleUpdate(ctx context.Context, b *bot.Bot, a *agent.Agent, Role: "user", }) + t.conversationTracker.AddMessage( + update.Message.From.ID, + openai.ChatCompletionMessage{ + Content: update.Message.Text, + Role: "user", + }, + ) + xlog.Info("New message", "username", username, "conversation", currentConv) res := a.Ask( types.WithConversationHistory(currentConv),