diff --git a/Makefile b/Makefile index e69f7f3..191e5f7 100644 --- a/Makefile +++ b/Makefile @@ -1,5 +1,6 @@ GOCMD?=go IMAGE_NAME?=webui +ROOT_DIR:=$(shell dirname $(realpath $(lastword $(MAKEFILE_LIST)))) prepare-tests: docker compose up -d @@ -13,12 +14,15 @@ tests: prepare-tests run-nokb: $(MAKE) run KBDISABLEINDEX=true +webui/react-ui/dist: + docker run --entrypoint /bin/bash -v $(ROOT_DIR):/app oven/bun:1 -c "cd /app/webui/react-ui && bun install && bun run build" + .PHONY: build -build: +build: webui/react-ui/dist $(GOCMD) build -o localagent ./ .PHONY: run -run: +run: webui/react-ui/dist $(GOCMD) run ./ build-image: diff --git a/services/connectors/slack.go b/services/connectors/slack.go index 7bf19fc..5521369 100644 --- a/services/connectors/slack.go +++ b/services/connectors/slack.go @@ -4,7 +4,6 @@ import ( "bytes" "encoding/base64" "fmt" - "io/ioutil" "log" "os" "strings" @@ -38,6 +37,11 @@ type Slack struct { placeholderMutex sync.RWMutex apiClient *slack.Client + // Track active jobs for cancellation + activeJobs map[string]bool // map[channelID]bool to track if a channel has active processing + activeJobsMutex sync.RWMutex + agent *agent.Agent // Reference to the agent to call StopAction + conversationTracker *ConversationTracker[string] } @@ -57,13 +61,20 @@ func NewSlack(config map[string]string) *Slack { alwaysReply: config["alwaysReply"] == "true", conversationTracker: NewConversationTracker[string](duration), placeholders: make(map[string]string), + activeJobs: make(map[string]bool), } } func (t *Slack) AgentResultCallback() func(state types.ActionState) { return func(state types.ActionState) { - // The final result callback is intentionally empty as we're handling - // the final update in the handleMention function directly + // Mark the job as completed when we get the final result + if state.ActionCurrentState.Job != nil && state.ActionCurrentState.Job.Metadata != nil { + if channel, ok := state.ActionCurrentState.Job.Metadata["channel"].(string); ok && channel != "" { + t.activeJobsMutex.Lock() + delete(t.activeJobs, channel) + t.activeJobsMutex.Unlock() + } + } } } @@ -102,6 +113,23 @@ func (t *Slack) AgentReasoningCallback() func(state types.ActionCurrentState) bo } } +// cancelActiveJobForChannel cancels any active job for the given channel +func (t *Slack) cancelActiveJobForChannel(channelID string) { + t.activeJobsMutex.RLock() + isActive := t.activeJobs[channelID] + t.activeJobsMutex.RUnlock() + + if isActive && t.agent != nil { + xlog.Info(fmt.Sprintf("Cancelling active job for channel: %s", channelID)) + t.agent.StopAction() + + // Mark the job as inactive + t.activeJobsMutex.Lock() + delete(t.activeJobs, channelID) + t.activeJobsMutex.Unlock() + } +} + func cleanUpUsernameFromMessage(message string, b *slack.AuthTestResponse) string { cleaned := strings.ReplaceAll(message, "<@"+b.UserID+">", "") cleaned = strings.ReplaceAll(cleaned, "<@"+b.BotID+">", "") @@ -214,11 +242,26 @@ func (t *Slack) handleChannelMessage( return } + // Cancel any active job for this channel before starting a new one + t.cancelActiveJobForChannel(ev.Channel) + currentConv := t.conversationTracker.GetConversation(t.channelID) message := replaceUserIDsWithNamesInMessage(api, cleanUpUsernameFromMessage(ev.Text, b)) go func() { + // Mark this channel as having an active job + t.activeJobsMutex.Lock() + t.activeJobs[ev.Channel] = true + t.activeJobsMutex.Unlock() + + defer func() { + // Mark job as complete + t.activeJobsMutex.Lock() + delete(t.activeJobs, ev.Channel) + t.activeJobsMutex.Unlock() + }() + imageBytes, mimeType := scanImagesInMessages(api, ev) agentOptions := []types.JobOption{ @@ -259,6 +302,12 @@ func (t *Slack) handleChannelMessage( agentOptions = append(agentOptions, types.WithConversationHistory(currentConv)) + // Add channel to metadata for tracking + metadata := map[string]interface{}{ + "channel": ev.Channel, + } + agentOptions = append(agentOptions, types.WithMetadata(metadata)) + res := a.Ask( agentOptions..., ) @@ -292,9 +341,6 @@ func (t *Slack) handleChannelMessage( // Function to download the image from a URL and encode it to base64 func encodeImageFromURL(imageBytes bytes.Buffer) (string, error) { - // WRITE THIS SOMEWHERE - ioutil.WriteFile("image.jpg", imageBytes.Bytes(), 0644) - // Encode the image data to base64 base64Image := base64.StdEncoding.EncodeToString(imageBytes.Bytes()) return base64Image, nil @@ -639,10 +685,18 @@ func (t *Slack) handleMention( } func (t *Slack) Start(a *agent.Agent) { +<<<<<<< Updated upstream +<<<<<<< Updated upstream postMessageParams := slack.PostMessageParameters{ LinkNames: 1, Markdown: true, } +======= +>>>>>>> Stashed changes +======= +>>>>>>> Stashed changes + // Store the agent reference for use in cancellation + t.agent = a api := slack.New( t.botToken, diff --git a/services/connectors/telegram.go b/services/connectors/telegram.go index 66e0e20..bfa620b 100644 --- a/services/connectors/telegram.go +++ b/services/connectors/telegram.go @@ -3,6 +3,7 @@ package connectors import ( "context" "errors" + "net/http" "os" "os/signal" "slices" @@ -99,12 +100,24 @@ func (t *Telegram) handleUpdate(ctx context.Context, b *bot.Bot, a *agent.Agent, // coming from the gen image actions if imagesUrls, exists := res.Metadata[actions.MetadataImages]; exists { for _, url := range xstrings.UniqueSlice(imagesUrls.([]string)) { - b.SendPhoto(ctx, &bot.SendPhotoParams{ + xlog.Debug("Sending photo", "url", url) + + resp, err := http.Get(url) + if err != nil { + xlog.Error("Error downloading image", "error", err.Error()) + continue + } + defer resp.Body.Close() + _, err = b.SendPhoto(ctx, &bot.SendPhotoParams{ ChatID: update.Message.Chat.ID, - Photo: models.InputFileString{ - Data: url, + Photo: models.InputFileUpload{ + Filename: "image.jpg", + Data: resp.Body, }, }) + if err != nil { + xlog.Error("Error sending photo", "error", err.Error()) + } } } diff --git a/webui/app.go b/webui/app.go index 364f71f..9f8fd3f 100644 --- a/webui/app.go +++ b/webui/app.go @@ -138,7 +138,7 @@ func (a *App) Create(pool *state.AgentPool) func(c *fiber.Ctx) error { if err := pool.CreateAgent(config.Name, &config); err != nil { return errorJSONMessage(c, err.Error()) } - + return statusJSONMessage(c, "ok") } }