package connectors

import (
	"context"
	"errors"
	"os"
	"os/signal"
	"slices"
	"strings"
	"time"

	"github.com/go-telegram/bot"
	"github.com/go-telegram/bot/models"
	"github.com/mudler/LocalAgent/core/agent"
	"github.com/mudler/LocalAgent/core/types"
	"github.com/mudler/LocalAgent/pkg/xlog"
	"github.com/sashabaranov/go-openai"
)

// Telegram is a connector that forwards Telegram messages to an agent and
// sends the agent's response back to the chat the message came from.
type Telegram struct {
	// Token is the bot token handed to bot.New in Start.
	Token string

	// bot and agent are assigned by Start; they are nil before that.
	bot   *bot.Bot
	agent *agent.Agent

	// currentconversation and lastMessageTime are initialised by
	// NewTelegramConnector but not referenced elsewhere in this file.
	currentconversation map[int64][]openai.ChatCompletionMessage
	lastMessageTime     map[int64]time.Time

	// lastMessageDuration is the duration passed to the conversation tracker.
	lastMessageDuration time.Duration

	// admins holds the usernames allowed to talk to the bot. When it is
	// empty, no username filtering is applied.
	admins []string

	// conversationTracker stores the message history, keyed by the sender's
	// Telegram user ID.
	conversationTracker *ConversationTracker[int64]
}

// updateDescription sets the bot description to the given reasoning text.
// It is a no-op until Start has assigned the bot and the agent, so callbacks
// fired before that point do not dereference nil pointers.
func (t *Telegram) updateDescription(reasoning string) {
	if t.bot == nil || t.agent == nil {
		return
	}

	_, err := t.bot.SetMyDescription(t.agent.Context(), &bot.SetMyDescriptionParams{
		Description: reasoning,
	})
	if err != nil {
		xlog.Error("Error setting Telegram bot description", "error", err)
	}
}

// AgentResultCallback returns a callback that publishes the reasoning carried
// by the received action state as the bot description.
func (t *Telegram) AgentResultCallback() func(state types.ActionState) {
	return func(state types.ActionState) {
		t.updateDescription(state.Reasoning)
	}
}

// AgentReasoningCallback returns a callback that publishes the reasoning
// carried by the received state as the bot description. The callback always
// returns true.
func (t *Telegram) AgentReasoningCallback() func(state types.ActionCurrentState) bool {
	return func(state types.ActionCurrentState) bool {
		t.updateDescription(state.Reasoning)
		return true
	}
}

// handleUpdate processes a single Telegram update: it checks the sender
// against the admin list, asks the agent with the conversation history plus
// the new message, records both sides of the exchange in the conversation
// tracker and sends the agent's response to the originating chat.
//
// Updates that carry no message or no sender are ignored. An empty agent
// response produces no reply.
func (t *Telegram) handleUpdate(ctx context.Context, b *bot.Bot, a *agent.Agent, update *models.Update) {
	// The default handler receives every kind of update; only plain messages
	// with a known sender can be processed here.
	if update == nil || update.Message == nil || update.Message.From == nil {
		return
	}

	msg := update.Message
	username := msg.From.Username

	if len(t.admins) > 0 && !slices.Contains(t.admins, username) {
		xlog.Info("Unauthorized user", "username", username)
		return
	}

	userMessage := openai.ChatCompletionMessage{
		Content: msg.Text,
		Role:    "user",
	}

	currentConv := t.conversationTracker.GetConversation(msg.From.ID)
	currentConv = append(currentConv, userMessage)

	// Store the user's side of the exchange as well; otherwise the tracked
	// history would contain assistant replies only.
	t.conversationTracker.AddMessage(msg.From.ID, userMessage)

	res := a.Ask(
		types.WithConversationHistory(currentConv),
	)

	if res.Response == "" {
		return
	}

	t.conversationTracker.AddMessage(
		msg.From.ID,
		openai.ChatCompletionMessage{
			Content: res.Response,
			Role:    "assistant",
		},
	)

	_, err := b.SendMessage(ctx, &bot.SendMessageParams{
		ParseMode: models.ParseModeMarkdown,
		ChatID:    msg.Chat.ID,
		Text:      res.Response,
	})
	if err != nil {
		xlog.Error("Error sending Telegram message", "error", err, "chat_id", msg.Chat.ID)
	}
}

// func (t *Telegram) handleNewMessage(ctx context.Context, b *bot.Bot, m openai.ChatCompletionMessage) {
// 	if t.lastChatID == 0 {
// 		return
// 	}
// 	b.SendMessage(ctx, &bot.SendMessageParams{
// 		ChatID: t.lastChatID,
// 		Text:   m.Content,
// 	})
// }

// Start creates the Telegram bot for the given agent and runs it.
//
// The bot runs with a context derived from the agent's context that is also
// cancelled on os.Interrupt. If the bot cannot be created the error is logged
// and Start returns without running anything.
func (t *Telegram) Start(a *agent.Agent) {
	ctx, cancel := signal.NotifyContext(a.Context(), os.Interrupt)
	defer cancel()

	opts := []bot.Option{
		bot.WithDefaultHandler(func(ctx context.Context, b *bot.Bot, update *models.Update) {
			// Run in a separate goroutine so the handler returns without
			// waiting for the agent to answer.
			go t.handleUpdate(ctx, b, a, update)
		}),
	}

	b, err := bot.New(t.Token, opts...)
	if err != nil {
		// A connector that cannot start must not take the whole process down.
		xlog.Error("Error creating Telegram bot", "error", err)
		return
	}

	t.bot = b
	t.agent = a

	// go func() {
	// 	for m := range a.ConversationChannel() {
	// 		t.handleNewMessage(ctx, b, m)
	// 	}
	// }()

	b.Start(ctx)
}

// NewTelegramConnector builds a Telegram connector from a string map.
//
// Recognised keys:
//   - "token": the bot token; required and must not be blank.
//   - "lastMessageDuration": a time.ParseDuration string; when missing or
//     invalid, 5 minutes is used.
//   - "admins": comma-separated usernames allowed to use the bot. Entries are
//     trimmed of surrounding whitespace and blank entries are dropped; when
//     no entry remains, no username filtering is applied.
//
// It returns an error when the token is missing or blank.
func NewTelegramConnector(config map[string]string) (*Telegram, error) {
	token := strings.TrimSpace(config["token"])
	if token == "" {
		return nil, errors.New("token is required")
	}

	duration, err := time.ParseDuration(config["lastMessageDuration"])
	if err != nil {
		duration = 5 * time.Minute
	}

	admins := []string{}
	for _, name := range strings.Split(config["admins"], ",") {
		// Blank entries must be dropped: an empty admin name would match
		// every Telegram user that has no username set.
		if name = strings.TrimSpace(name); name != "" {
			admins = append(admins, name)
		}
	}

	return &Telegram{
		Token:               token,
		lastMessageDuration: duration,
		admins:              admins,
		currentconversation: map[int64][]openai.ChatCompletionMessage{},
		lastMessageTime:     map[int64]time.Time{},
		conversationTracker: NewConversationTracker[int64](duration),
	}, nil
}