feat(telegram): Add support for groups (#183)
Add support for groups Signed-off-by: mudler <mudler@localai.io>
This commit is contained in:
committed by
GitHub
parent
9a90153dc6
commit
37aa532cc2
19
README.md
19
README.md
@@ -491,9 +491,26 @@ Get a token from @botfather, then:
|
|||||||
|
|
||||||
```json
|
```json
|
||||||
{
|
{
|
||||||
"token": "your-bot-father-token"
|
"token": "your-bot-father-token",
|
||||||
|
"group_mode": "true",
|
||||||
|
"mention_only": "true",
|
||||||
|
"admins": "username1,username2"
|
||||||
}
|
}
|
||||||
```
|
```
|
||||||
|
|
||||||
|
Configuration options:
|
||||||
|
- `token`: Your bot token from BotFather
|
||||||
|
- `group_mode`: Enable/disable group chat functionality
|
||||||
|
- `mention_only`: When enabled, bot only responds when mentioned in groups
|
||||||
|
- `admins`: Comma-separated list of Telegram usernames allowed to use the bot in private chats
|
||||||
|
- `channel_id`: Optional channel ID for the bot to send messages to
|
||||||
|
|
||||||
|
> **Important**: For group functionality to work properly:
|
||||||
|
> 1. Go to @BotFather
|
||||||
|
> 2. Select your bot
|
||||||
|
> 3. Go to "Bot Settings" > "Group Privacy"
|
||||||
|
> 4. Select "Turn off" to allow the bot to read all messages in groups
|
||||||
|
> 5. Restart your bot after changing this setting
|
||||||
</details>
|
</details>
|
||||||
|
|
||||||
<details>
|
<details>
|
||||||
|
|||||||
@@ -45,6 +45,239 @@ type Telegram struct {
|
|||||||
activeJobsMutex sync.RWMutex
|
activeJobsMutex sync.RWMutex
|
||||||
|
|
||||||
channelID string
|
channelID string
|
||||||
|
groupMode bool
|
||||||
|
mentionOnly bool
|
||||||
|
}
|
||||||
|
|
||||||
|
// isBotMentioned checks if the bot is mentioned in the message
|
||||||
|
func (t *Telegram) isBotMentioned(message string, botUsername string) bool {
|
||||||
|
return strings.Contains(message, "@"+botUsername)
|
||||||
|
}
|
||||||
|
|
||||||
|
// handleGroupMessage handles messages in group chats
|
||||||
|
func (t *Telegram) handleGroupMessage(ctx context.Context, b *bot.Bot, a *agent.Agent, update *models.Update) {
|
||||||
|
xlog.Debug("Handling group message", "update", update)
|
||||||
|
if !t.groupMode {
|
||||||
|
xlog.Debug("Group mode is disabled, skipping group message", "chatID", update.Message.Chat.ID)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// Get bot info to check username
|
||||||
|
botInfo, err := b.GetMe(ctx)
|
||||||
|
if err != nil {
|
||||||
|
xlog.Error("Error getting bot info", "error", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// Skip messages from ourselves
|
||||||
|
if update.Message.From.Username == botInfo.Username {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// If mention-only mode is enabled, check if bot is mentioned
|
||||||
|
if t.mentionOnly && !t.isBotMentioned(update.Message.Text, botInfo.Username) {
|
||||||
|
xlog.Debug("Bot not mentioned in message, skipping", "chatID", update.Message.Chat.ID)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// Cancel any active job for this chat before starting a new one
|
||||||
|
t.cancelActiveJobForChat(update.Message.Chat.ID)
|
||||||
|
|
||||||
|
currentConv := a.SharedState().ConversationTracker.GetConversation(fmt.Sprintf("telegram:%d", update.Message.Chat.ID))
|
||||||
|
|
||||||
|
// Clean up the message by removing bot mentions
|
||||||
|
message := strings.ReplaceAll(update.Message.Text, "@"+botInfo.Username, "")
|
||||||
|
message = strings.TrimSpace(message)
|
||||||
|
|
||||||
|
// Send initial placeholder message
|
||||||
|
msg, err := b.SendMessage(ctx, &bot.SendMessageParams{
|
||||||
|
ChatID: update.Message.Chat.ID,
|
||||||
|
Text: bot.EscapeMarkdown(telegramThinkingMessage),
|
||||||
|
ParseMode: models.ParseModeMarkdown,
|
||||||
|
ReplyParameters: &models.ReplyParameters{
|
||||||
|
MessageID: update.Message.ID,
|
||||||
|
},
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
xlog.Error("Error sending initial message", "error", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// Store the UUID->placeholder message mapping
|
||||||
|
jobUUID := fmt.Sprintf("%d", msg.ID)
|
||||||
|
|
||||||
|
t.placeholderMutex.Lock()
|
||||||
|
t.placeholders[jobUUID] = msg.ID
|
||||||
|
t.placeholderMutex.Unlock()
|
||||||
|
|
||||||
|
// Add chat ID to metadata for tracking
|
||||||
|
metadata := map[string]interface{}{
|
||||||
|
"chatID": update.Message.Chat.ID,
|
||||||
|
}
|
||||||
|
|
||||||
|
// Handle images if present
|
||||||
|
if len(update.Message.Photo) > 0 {
|
||||||
|
// Get the largest photo
|
||||||
|
photo := update.Message.Photo[len(update.Message.Photo)-1]
|
||||||
|
|
||||||
|
// Download the photo
|
||||||
|
file, err := b.GetFile(ctx, &bot.GetFileParams{
|
||||||
|
FileID: photo.FileID,
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
xlog.Error("Error getting file", "error", err)
|
||||||
|
} else {
|
||||||
|
// Download the file content
|
||||||
|
resp, err := http.Get(file.FilePath)
|
||||||
|
if err != nil {
|
||||||
|
xlog.Error("Error downloading file", "error", err)
|
||||||
|
} else {
|
||||||
|
defer resp.Body.Close()
|
||||||
|
imageBytes, err := io.ReadAll(resp.Body)
|
||||||
|
if err != nil {
|
||||||
|
xlog.Error("Error reading image", "error", err)
|
||||||
|
} else {
|
||||||
|
// Encode to base64
|
||||||
|
imgBase64 := base64.StdEncoding.EncodeToString(imageBytes)
|
||||||
|
|
||||||
|
// Add to conversation as multi-content message
|
||||||
|
currentConv = append(currentConv, openai.ChatCompletionMessage{
|
||||||
|
Role: "user",
|
||||||
|
MultiContent: []openai.ChatMessagePart{
|
||||||
|
{
|
||||||
|
Text: message,
|
||||||
|
Type: openai.ChatMessagePartTypeText,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
Type: openai.ChatMessagePartTypeImageURL,
|
||||||
|
ImageURL: &openai.ChatMessageImageURL{
|
||||||
|
URL: fmt.Sprintf("data:image/jpeg;base64,%s", imgBase64),
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
currentConv = append(currentConv, openai.ChatCompletionMessage{
|
||||||
|
Content: message,
|
||||||
|
Role: "user",
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
a.SharedState().ConversationTracker.AddMessage(
|
||||||
|
fmt.Sprintf("telegram:%d", update.Message.Chat.ID),
|
||||||
|
currentConv[len(currentConv)-1],
|
||||||
|
)
|
||||||
|
|
||||||
|
// Create a new job with the conversation history and metadata
|
||||||
|
job := types.NewJob(
|
||||||
|
types.WithConversationHistory(currentConv),
|
||||||
|
types.WithUUID(jobUUID),
|
||||||
|
types.WithMetadata(metadata),
|
||||||
|
)
|
||||||
|
|
||||||
|
// Mark this chat as having an active job
|
||||||
|
t.activeJobsMutex.Lock()
|
||||||
|
t.activeJobs[update.Message.Chat.ID] = append(t.activeJobs[update.Message.Chat.ID], job)
|
||||||
|
t.activeJobsMutex.Unlock()
|
||||||
|
|
||||||
|
defer func() {
|
||||||
|
// Mark job as complete
|
||||||
|
t.activeJobsMutex.Lock()
|
||||||
|
job.Cancel()
|
||||||
|
for i, j := range t.activeJobs[update.Message.Chat.ID] {
|
||||||
|
if j.UUID == job.UUID {
|
||||||
|
t.activeJobs[update.Message.Chat.ID] = append(t.activeJobs[update.Message.Chat.ID][:i], t.activeJobs[update.Message.Chat.ID][i+1:]...)
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
t.activeJobsMutex.Unlock()
|
||||||
|
|
||||||
|
// Clean up the placeholder map
|
||||||
|
t.placeholderMutex.Lock()
|
||||||
|
delete(t.placeholders, jobUUID)
|
||||||
|
t.placeholderMutex.Unlock()
|
||||||
|
}()
|
||||||
|
|
||||||
|
res := a.Ask(
|
||||||
|
types.WithConversationHistory(currentConv),
|
||||||
|
types.WithUUID(jobUUID),
|
||||||
|
types.WithMetadata(metadata),
|
||||||
|
)
|
||||||
|
|
||||||
|
if res.Response == "" {
|
||||||
|
xlog.Error("Empty response from agent")
|
||||||
|
_, err := b.EditMessageText(ctx, &bot.EditMessageTextParams{
|
||||||
|
ChatID: update.Message.Chat.ID,
|
||||||
|
MessageID: msg.ID,
|
||||||
|
Text: "there was an internal error. try again!",
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
xlog.Error("Error updating error message", "error", err)
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
a.SharedState().ConversationTracker.AddMessage(
|
||||||
|
fmt.Sprintf("telegram:%d", update.Message.Chat.ID),
|
||||||
|
openai.ChatCompletionMessage{
|
||||||
|
Content: res.Response,
|
||||||
|
Role: "assistant",
|
||||||
|
},
|
||||||
|
)
|
||||||
|
|
||||||
|
// Handle any multimedia content in the response and collect URLs
|
||||||
|
urls, err := t.handleMultimediaContent(ctx, update.Message.Chat.ID, res)
|
||||||
|
if err != nil {
|
||||||
|
xlog.Error("Error handling multimedia content", "error", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Update the message with the final response
|
||||||
|
formattedResponse := formatResponseWithURLs(res.Response, urls)
|
||||||
|
|
||||||
|
// Split the message if it's too long
|
||||||
|
messages := xstrings.SplitParagraph(formattedResponse, telegramMaxMessageLength)
|
||||||
|
|
||||||
|
if len(messages) == 0 {
|
||||||
|
_, err := b.EditMessageText(ctx, &bot.EditMessageTextParams{
|
||||||
|
ChatID: update.Message.Chat.ID,
|
||||||
|
MessageID: msg.ID,
|
||||||
|
Text: "there was an internal error. try again!",
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
xlog.Error("Error updating error message", "error", err)
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// Update the first message
|
||||||
|
_, err = b.EditMessageText(ctx, &bot.EditMessageTextParams{
|
||||||
|
ChatID: update.Message.Chat.ID,
|
||||||
|
MessageID: msg.ID,
|
||||||
|
Text: messages[0],
|
||||||
|
ParseMode: models.ParseModeMarkdown,
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
xlog.Error("Error updating message", "error", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// Send additional chunks as new messages
|
||||||
|
for i := 1; i < len(messages); i++ {
|
||||||
|
_, err = b.SendMessage(ctx, &bot.SendMessageParams{
|
||||||
|
ChatID: update.Message.Chat.ID,
|
||||||
|
Text: messages[i],
|
||||||
|
ParseMode: models.ParseModeMarkdown,
|
||||||
|
ReplyParameters: &models.ReplyParameters{
|
||||||
|
MessageID: update.Message.ID,
|
||||||
|
},
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
xlog.Error("Error sending additional message", "error", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Send any text message to the bot after the bot has been started
|
// Send any text message to the bot after the bot has been started
|
||||||
@@ -212,10 +445,14 @@ func formatResponseWithURLs(response string, urls []string) string {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (t *Telegram) handleUpdate(ctx context.Context, b *bot.Bot, a *agent.Agent, update *models.Update) {
|
func (t *Telegram) handleUpdate(ctx context.Context, b *bot.Bot, a *agent.Agent, update *models.Update) {
|
||||||
|
if update.Message == nil || update.Message.From == nil {
|
||||||
|
xlog.Debug("Message or user is nil", "update", update)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
username := update.Message.From.Username
|
username := update.Message.From.Username
|
||||||
|
|
||||||
xlog.Debug("Received message from user", "username", username, "chatID", update.Message.Chat.ID, "message", update.Message.Text)
|
xlog.Debug("Received message from user", "username", username, "chatID", update.Message.Chat.ID, "message", update.Message.Text)
|
||||||
|
|
||||||
internalError := func(err error, msg *models.Message) {
|
internalError := func(err error, msg *models.Message) {
|
||||||
xlog.Error("Error updating final message", "error", err)
|
xlog.Error("Error updating final message", "error", err)
|
||||||
b.EditMessageText(ctx, &bot.EditMessageTextParams{
|
b.EditMessageText(ctx, &bot.EditMessageTextParams{
|
||||||
@@ -224,6 +461,15 @@ func (t *Telegram) handleUpdate(ctx context.Context, b *bot.Bot, a *agent.Agent,
|
|||||||
Text: "there was an internal error. try again!",
|
Text: "there was an internal error. try again!",
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
xlog.Debug("Handling message", "update", update)
|
||||||
|
// Handle group messages
|
||||||
|
if update.Message.Chat.Type == "group" || update.Message.Chat.Type == "supergroup" {
|
||||||
|
t.handleGroupMessage(ctx, b, a, update)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// Handle private messages
|
||||||
if len(t.admins) > 0 && !slices.Contains(t.admins, username) {
|
if len(t.admins) > 0 && !slices.Contains(t.admins, username) {
|
||||||
xlog.Info("Unauthorized user", "username", username, "admins", t.admins)
|
xlog.Info("Unauthorized user", "username", username, "admins", t.admins)
|
||||||
_, err := b.SendMessage(ctx, &bot.SendMessageParams{
|
_, err := b.SendMessage(ctx, &bot.SendMessageParams{
|
||||||
@@ -346,7 +592,15 @@ func (t *Telegram) handleUpdate(ctx context.Context, b *bot.Bot, a *agent.Agent,
|
|||||||
messages := xstrings.SplitParagraph(formattedResponse, telegramMaxMessageLength)
|
messages := xstrings.SplitParagraph(formattedResponse, telegramMaxMessageLength)
|
||||||
|
|
||||||
if len(messages) == 0 {
|
if len(messages) == 0 {
|
||||||
internalError(errors.New("empty response from agent"), msg)
|
_, err := b.EditMessageText(ctx, &bot.EditMessageTextParams{
|
||||||
|
ChatID: update.Message.Chat.ID,
|
||||||
|
MessageID: msg.ID,
|
||||||
|
Text: "there was an internal error. try again!",
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
xlog.Error("Error updating error message", "error", err)
|
||||||
|
internalError(fmt.Errorf("error updating error message: %w", err), msg)
|
||||||
|
}
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -358,7 +612,7 @@ func (t *Telegram) handleUpdate(ctx context.Context, b *bot.Bot, a *agent.Agent,
|
|||||||
ParseMode: models.ParseModeMarkdown,
|
ParseMode: models.ParseModeMarkdown,
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
internalError(fmt.Errorf("internal error: %w", err), msg)
|
xlog.Error("Error updating message", "error", err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -370,7 +624,7 @@ func (t *Telegram) handleUpdate(ctx context.Context, b *bot.Bot, a *agent.Agent,
|
|||||||
ParseMode: models.ParseModeMarkdown,
|
ParseMode: models.ParseModeMarkdown,
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
internalError(fmt.Errorf("internal error: %w", err), msg)
|
xlog.Error("Error sending additional message", "error", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -454,6 +708,8 @@ func NewTelegramConnector(config map[string]string) (*Telegram, error) {
|
|||||||
placeholders: make(map[string]int),
|
placeholders: make(map[string]int),
|
||||||
activeJobs: make(map[int64][]*types.Job),
|
activeJobs: make(map[int64][]*types.Job),
|
||||||
channelID: config["channel_id"],
|
channelID: config["channel_id"],
|
||||||
|
groupMode: config["group_mode"] == "true",
|
||||||
|
mentionOnly: config["mention_only"] == "true",
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -478,5 +734,17 @@ func TelegramConfigMeta() []config.Field {
|
|||||||
Type: config.FieldTypeText,
|
Type: config.FieldTypeText,
|
||||||
HelpText: "Telegram channel ID to send messages to if the agent needs to initiate a conversation",
|
HelpText: "Telegram channel ID to send messages to if the agent needs to initiate a conversation",
|
||||||
},
|
},
|
||||||
|
{
|
||||||
|
Name: "group_mode",
|
||||||
|
Label: "Group Mode",
|
||||||
|
Type: config.FieldTypeCheckbox,
|
||||||
|
HelpText: "Enable bot to respond in group chats",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
Name: "mention_only",
|
||||||
|
Label: "Mention Only",
|
||||||
|
Type: config.FieldTypeCheckbox,
|
||||||
|
HelpText: "Bot will only respond when mentioned in group chats",
|
||||||
|
},
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user