From 37aa532cc2bf077522d964eac95473246e205ff1 Mon Sep 17 00:00:00 2001 From: Ettore Di Giacinto Date: Mon, 26 May 2025 09:40:41 +0200 Subject: [PATCH] feat(telegram): Add support for groups (#183) Add support for groups Signed-off-by: mudler --- README.md | 19 ++- services/connectors/telegram.go | 278 +++++++++++++++++++++++++++++++- 2 files changed, 291 insertions(+), 6 deletions(-) diff --git a/README.md b/README.md index 206c5ac..2b549e8 100644 --- a/README.md +++ b/README.md @@ -491,9 +491,26 @@ Get a token from @botfather, then: ```json { - "token": "your-bot-father-token" + "token": "your-bot-father-token", + "group_mode": "true", + "mention_only": "true", + "admins": "username1,username2" } ``` + +Configuration options: +- `token`: Your bot token from BotFather +- `group_mode`: Enable/disable group chat functionality +- `mention_only`: When enabled, bot only responds when mentioned in groups +- `admins`: Comma-separated list of Telegram usernames allowed to use the bot in private chats +- `channel_id`: Optional channel ID for the bot to send messages to + +> **Important**: For group functionality to work properly: +> 1. Go to @BotFather +> 2. Select your bot +> 3. Go to "Bot Settings" > "Group Privacy" +> 4. Select "Turn off" to allow the bot to read all messages in groups +> 5. Restart your bot after changing this setting
diff --git a/services/connectors/telegram.go b/services/connectors/telegram.go index 0884c68..8d9e406 100644 --- a/services/connectors/telegram.go +++ b/services/connectors/telegram.go @@ -44,7 +44,240 @@ type Telegram struct { activeJobs map[int64][]*types.Job // map[chatID]bool to track if a chat has active processing activeJobsMutex sync.RWMutex - channelID string + channelID string + groupMode bool + mentionOnly bool +} + +// isBotMentioned checks if the bot is mentioned in the message +func (t *Telegram) isBotMentioned(message string, botUsername string) bool { + return strings.Contains(message, "@"+botUsername) +} + +// handleGroupMessage handles messages in group chats +func (t *Telegram) handleGroupMessage(ctx context.Context, b *bot.Bot, a *agent.Agent, update *models.Update) { + xlog.Debug("Handling group message", "update", update) + if !t.groupMode { + xlog.Debug("Group mode is disabled, skipping group message", "chatID", update.Message.Chat.ID) + return + } + + // Get bot info to check username + botInfo, err := b.GetMe(ctx) + if err != nil { + xlog.Error("Error getting bot info", "error", err) + return + } + + // Skip messages from ourselves + if update.Message.From.Username == botInfo.Username { + return + } + + // If mention-only mode is enabled, check if bot is mentioned + if t.mentionOnly && !t.isBotMentioned(update.Message.Text, botInfo.Username) { + xlog.Debug("Bot not mentioned in message, skipping", "chatID", update.Message.Chat.ID) + return + } + + // Cancel any active job for this chat before starting a new one + t.cancelActiveJobForChat(update.Message.Chat.ID) + + currentConv := a.SharedState().ConversationTracker.GetConversation(fmt.Sprintf("telegram:%d", update.Message.Chat.ID)) + + // Clean up the message by removing bot mentions + message := strings.ReplaceAll(update.Message.Text, "@"+botInfo.Username, "") + message = strings.TrimSpace(message) + + // Send initial placeholder message + msg, err := b.SendMessage(ctx, &bot.SendMessageParams{ + ChatID: update.Message.Chat.ID, + Text: bot.EscapeMarkdown(telegramThinkingMessage), + ParseMode: models.ParseModeMarkdown, + ReplyParameters: &models.ReplyParameters{ + MessageID: update.Message.ID, + }, + }) + if err != nil { + xlog.Error("Error sending initial message", "error", err) + return + } + + // Store the UUID->placeholder message mapping + jobUUID := fmt.Sprintf("%d", msg.ID) + + t.placeholderMutex.Lock() + t.placeholders[jobUUID] = msg.ID + t.placeholderMutex.Unlock() + + // Add chat ID to metadata for tracking + metadata := map[string]interface{}{ + "chatID": update.Message.Chat.ID, + } + + // Handle images if present + if len(update.Message.Photo) > 0 { + // Get the largest photo + photo := update.Message.Photo[len(update.Message.Photo)-1] + + // Download the photo + file, err := b.GetFile(ctx, &bot.GetFileParams{ + FileID: photo.FileID, + }) + if err != nil { + xlog.Error("Error getting file", "error", err) + } else { + // Download the file content + resp, err := http.Get(file.FilePath) + if err != nil { + xlog.Error("Error downloading file", "error", err) + } else { + defer resp.Body.Close() + imageBytes, err := io.ReadAll(resp.Body) + if err != nil { + xlog.Error("Error reading image", "error", err) + } else { + // Encode to base64 + imgBase64 := base64.StdEncoding.EncodeToString(imageBytes) + + // Add to conversation as multi-content message + currentConv = append(currentConv, openai.ChatCompletionMessage{ + Role: "user", + MultiContent: []openai.ChatMessagePart{ + { + Text: message, + Type: openai.ChatMessagePartTypeText, + }, + { + Type: openai.ChatMessagePartTypeImageURL, + ImageURL: &openai.ChatMessageImageURL{ + URL: fmt.Sprintf("data:image/jpeg;base64,%s", imgBase64), + }, + }, + }, + }) + } + } + } + } else { + currentConv = append(currentConv, openai.ChatCompletionMessage{ + Content: message, + Role: "user", + }) + } + + a.SharedState().ConversationTracker.AddMessage( + fmt.Sprintf("telegram:%d", update.Message.Chat.ID), + currentConv[len(currentConv)-1], + ) + + // Create a new job with the conversation history and metadata + job := types.NewJob( + types.WithConversationHistory(currentConv), + types.WithUUID(jobUUID), + types.WithMetadata(metadata), + ) + + // Mark this chat as having an active job + t.activeJobsMutex.Lock() + t.activeJobs[update.Message.Chat.ID] = append(t.activeJobs[update.Message.Chat.ID], job) + t.activeJobsMutex.Unlock() + + defer func() { + // Mark job as complete + t.activeJobsMutex.Lock() + job.Cancel() + for i, j := range t.activeJobs[update.Message.Chat.ID] { + if j.UUID == job.UUID { + t.activeJobs[update.Message.Chat.ID] = append(t.activeJobs[update.Message.Chat.ID][:i], t.activeJobs[update.Message.Chat.ID][i+1:]...) + break + } + } + t.activeJobsMutex.Unlock() + + // Clean up the placeholder map + t.placeholderMutex.Lock() + delete(t.placeholders, jobUUID) + t.placeholderMutex.Unlock() + }() + + res := a.Ask( + types.WithConversationHistory(currentConv), + types.WithUUID(jobUUID), + types.WithMetadata(metadata), + ) + + if res.Response == "" { + xlog.Error("Empty response from agent") + _, err := b.EditMessageText(ctx, &bot.EditMessageTextParams{ + ChatID: update.Message.Chat.ID, + MessageID: msg.ID, + Text: "there was an internal error. try again!", + }) + if err != nil { + xlog.Error("Error updating error message", "error", err) + } + return + } + + a.SharedState().ConversationTracker.AddMessage( + fmt.Sprintf("telegram:%d", update.Message.Chat.ID), + openai.ChatCompletionMessage{ + Content: res.Response, + Role: "assistant", + }, + ) + + // Handle any multimedia content in the response and collect URLs + urls, err := t.handleMultimediaContent(ctx, update.Message.Chat.ID, res) + if err != nil { + xlog.Error("Error handling multimedia content", "error", err) + } + + // Update the message with the final response + formattedResponse := formatResponseWithURLs(res.Response, urls) + + // Split the message if it's too long + messages := xstrings.SplitParagraph(formattedResponse, telegramMaxMessageLength) + + if len(messages) == 0 { + _, err := b.EditMessageText(ctx, &bot.EditMessageTextParams{ + ChatID: update.Message.Chat.ID, + MessageID: msg.ID, + Text: "there was an internal error. try again!", + }) + if err != nil { + xlog.Error("Error updating error message", "error", err) + } + return + } + + // Update the first message + _, err = b.EditMessageText(ctx, &bot.EditMessageTextParams{ + ChatID: update.Message.Chat.ID, + MessageID: msg.ID, + Text: messages[0], + ParseMode: models.ParseModeMarkdown, + }) + if err != nil { + xlog.Error("Error updating message", "error", err) + return + } + + // Send additional chunks as new messages + for i := 1; i < len(messages); i++ { + _, err = b.SendMessage(ctx, &bot.SendMessageParams{ + ChatID: update.Message.Chat.ID, + Text: messages[i], + ParseMode: models.ParseModeMarkdown, + ReplyParameters: &models.ReplyParameters{ + MessageID: update.Message.ID, + }, + }) + if err != nil { + xlog.Error("Error sending additional message", "error", err) + } + } } // Send any text message to the bot after the bot has been started @@ -212,10 +445,14 @@ func formatResponseWithURLs(response string, urls []string) string { } func (t *Telegram) handleUpdate(ctx context.Context, b *bot.Bot, a *agent.Agent, update *models.Update) { + if update.Message == nil || update.Message.From == nil { + xlog.Debug("Message or user is nil", "update", update) + return + } + username := update.Message.From.Username xlog.Debug("Received message from user", "username", username, "chatID", update.Message.Chat.ID, "message", update.Message.Text) - internalError := func(err error, msg *models.Message) { xlog.Error("Error updating final message", "error", err) b.EditMessageText(ctx, &bot.EditMessageTextParams{ @@ -224,6 +461,15 @@ func (t *Telegram) handleUpdate(ctx context.Context, b *bot.Bot, a *agent.Agent, Text: "there was an internal error. try again!", }) } + + xlog.Debug("Handling message", "update", update) + // Handle group messages + if update.Message.Chat.Type == "group" || update.Message.Chat.Type == "supergroup" { + t.handleGroupMessage(ctx, b, a, update) + return + } + + // Handle private messages if len(t.admins) > 0 && !slices.Contains(t.admins, username) { xlog.Info("Unauthorized user", "username", username, "admins", t.admins) _, err := b.SendMessage(ctx, &bot.SendMessageParams{ @@ -346,7 +592,15 @@ func (t *Telegram) handleUpdate(ctx context.Context, b *bot.Bot, a *agent.Agent, messages := xstrings.SplitParagraph(formattedResponse, telegramMaxMessageLength) if len(messages) == 0 { - internalError(errors.New("empty response from agent"), msg) + _, err := b.EditMessageText(ctx, &bot.EditMessageTextParams{ + ChatID: update.Message.Chat.ID, + MessageID: msg.ID, + Text: "there was an internal error. try again!", + }) + if err != nil { + xlog.Error("Error updating error message", "error", err) + internalError(fmt.Errorf("error updating error message: %w", err), msg) + } return } @@ -358,7 +612,7 @@ func (t *Telegram) handleUpdate(ctx context.Context, b *bot.Bot, a *agent.Agent, ParseMode: models.ParseModeMarkdown, }) if err != nil { - internalError(fmt.Errorf("internal error: %w", err), msg) + xlog.Error("Error updating message", "error", err) return } @@ -370,7 +624,7 @@ func (t *Telegram) handleUpdate(ctx context.Context, b *bot.Bot, a *agent.Agent, ParseMode: models.ParseModeMarkdown, }) if err != nil { - internalError(fmt.Errorf("internal error: %w", err), msg) + xlog.Error("Error sending additional message", "error", err) } } } @@ -454,6 +708,8 @@ func NewTelegramConnector(config map[string]string) (*Telegram, error) { placeholders: make(map[string]int), activeJobs: make(map[int64][]*types.Job), channelID: config["channel_id"], + groupMode: config["group_mode"] == "true", + mentionOnly: config["mention_only"] == "true", }, nil } @@ -478,5 +734,17 @@ func TelegramConfigMeta() []config.Field { Type: config.FieldTypeText, HelpText: "Telegram channel ID to send messages to if the agent needs to initiate a conversation", }, + { + Name: "group_mode", + Label: "Group Mode", + Type: config.FieldTypeCheckbox, + HelpText: "Enable bot to respond in group chats", + }, + { + Name: "mention_only", + Label: "Mention Only", + Type: config.FieldTypeCheckbox, + HelpText: "Bot will only respond when mentioned in group chats", + }, } }