diff --git a/services/actions.go b/services/actions.go index c0748ec..f7bc8e8 100644 --- a/services/actions.go +++ b/services/actions.go @@ -27,6 +27,7 @@ const ( ActionScraper = "scraper" ActionWikipedia = "wikipedia" ActionBrowse = "browse" + ActionTwitterPost = "twitter-post" ActionSendMail = "send_mail" ActionGenerateImage = "generate_image" ActionCounter = "counter" @@ -48,6 +49,7 @@ var AvailableActions = []string{ ActionWikipedia, ActionSendMail, ActionGenerateImage, + ActionTwitterPost, ActionCounter, } @@ -98,6 +100,8 @@ func Actions(a *state.AgentConfig) func(ctx context.Context) []agent.Action { allActions = append(allActions, actions.NewBrowse(config)) case ActionSendMail: allActions = append(allActions, actions.NewSendMail(config)) + case ActionTwitterPost: + allActions = append(allActions, actions.NewPostTweet(config)) case ActionCounter: allActions = append(allActions, actions.NewCounter(config)) } diff --git a/services/actions/twitter_post.go b/services/actions/twitter_post.go new file mode 100644 index 0000000..26c09c9 --- /dev/null +++ b/services/actions/twitter_post.go @@ -0,0 +1,60 @@ +package actions + +import ( + "context" + "fmt" + + "github.com/mudler/LocalAgent/core/action" + "github.com/mudler/LocalAgent/services/connectors/twitter" + "github.com/sashabaranov/go-openai/jsonschema" +) + +func NewPostTweet(config map[string]string) *PostTweetAction { + return &PostTweetAction{ + token: config["token"], + noCharacterLimit: config["noCharacterLimits"] == "true", + } +} + +type PostTweetAction struct { + token string + noCharacterLimit bool +} + +func (a *PostTweetAction) Run(ctx context.Context, params action.ActionParams) (action.ActionResult, error) { + result := struct { + Text string `json:"text"` + }{} + err := params.Unmarshal(&result) + if err != nil { + fmt.Printf("error: %v", err) + + return action.ActionResult{}, err + } + + if !a.noCharacterLimit && len(result.Text) > 280 { + return action.ActionResult{}, fmt.Errorf("tweet is too long, max 280 characters") + } + + client := twitter.NewTwitterClient(a.token) + + if err := client.Post(result.Text); err != nil { + return action.ActionResult{}, err + } + + return action.ActionResult{Result: fmt.Sprintf("twitter post created")}, nil +} + +func (a *PostTweetAction) Definition() action.ActionDefinition { + return action.ActionDefinition{ + Name: "post_tweet", + Description: "Post a tweet", + Properties: map[string]jsonschema.Definition{ + "text": { + Type: jsonschema.String, + Description: "The text to send.", + }, + }, + Required: []string{"text"}, + } +} diff --git a/services/connectors.go b/services/connectors.go index ae1aff4..50317b0 100644 --- a/services/connectors.go +++ b/services/connectors.go @@ -17,6 +17,7 @@ const ( ConnectorDiscord = "discord" ConnectorGithubIssues = "github-issues" ConnectorGithubPRs = "github-prs" + ConnectorTwitter = "twitter" ) var AvailableConnectors = []string{ @@ -26,6 +27,7 @@ var AvailableConnectors = []string{ ConnectorDiscord, ConnectorGithubIssues, ConnectorGithubPRs, + ConnectorTwitter, } func Connectors(a *state.AgentConfig) []state.Connector { @@ -56,6 +58,13 @@ func Connectors(a *state.AgentConfig) []state.Connector { conns = append(conns, connectors.NewGithubPRWatcher(config)) case ConnectorIRC: conns = append(conns, connectors.NewIRC(config)) + case ConnectorTwitter: + cc, err := connectors.NewTwitterConnector(config) + if err != nil { + xlog.Info("Error creating twitter connector", err) + continue + } + conns = append(conns, cc) } } return conns diff --git a/services/connectors/twitter.go b/services/connectors/twitter.go new file mode 100644 index 0000000..32b9850 --- /dev/null +++ b/services/connectors/twitter.go @@ -0,0 +1,135 @@ +package connectors + +import ( + "context" + "fmt" + "os" + "os/signal" + + "github.com/mudler/LocalAgent/core/agent" + "github.com/mudler/LocalAgent/pkg/xlog" + "github.com/mudler/LocalAgent/services/connectors/twitter" + "github.com/sashabaranov/go-openai" +) + +type Twitter struct { + token string + botUsername string + client *twitter.TwitterClient + noCharacterLimit bool +} + +func (t *Twitter) AgentResultCallback() func(state agent.ActionState) { + return func(state agent.ActionState) { + + } +} + +func (t *Twitter) AgentReasoningCallback() func(state agent.ActionCurrentState) bool { + return func(state agent.ActionCurrentState) bool { + + return true + } +} + +func NewTwitterConnector(config map[string]string) (*Twitter, error) { + return &Twitter{ + token: config["token"], + botUsername: config["botUsername"], + client: twitter.NewTwitterClient(config["token"]), + noCharacterLimit: config["noCharacterLimit"] == "true", + }, nil +} + +func (t *Twitter) Start(a *agent.Agent) { + ctx, cancel := signal.NotifyContext(a.Context(), os.Interrupt) + defer cancel() + + // Step 1: Setup stream rules + xlog.Info("Setting up stream rules...") + err := t.client.AddStreamRule(t.botUsername) + if err != nil { + xlog.Error("Failed to add stream rule:", err) + } + + // Step 2: Listen for mentions and respond + fmt.Println("Listening for mentions...") + + go t.loop(ctx, a) + +} + +func (t *Twitter) loop(ctx context.Context, a *agent.Agent) { + + for { + select { + case <-ctx.Done(): + xlog.Info("Shutting down Twitter connector...") + return + + default: + if err := t.run(a); err != nil { + xlog.Error("Error running Twitter connector", "err", err) + return + } + } + } + +} + +func (t *Twitter) run(a *agent.Agent) error { + tweet, err := t.client.ListenForMentions() + if err != nil { + xlog.Error("Error getting mention", "error", err) + return nil + } + + xlog.Info("Got mention", "tweet", tweet) + // Check if bot has already replied + hasReplied, err := t.client.HasReplied(tweet.ID, t.botUsername) + if err != nil { + xlog.Error("Error checking if bot has replied", "error", err) + return nil + } + + if hasReplied { + xlog.Info("Bot has already replied to this tweet") + return nil + } + + res := a.Ask( + agent.WithConversationHistory( + []openai.ChatCompletionMessage{ + { + Role: "system", + Content: "You are replying to a twitter mention, keep answer short", + }, + { + Role: "user", + Content: tweet.Text, + }, + }, + ), + ) + + if res.Error != nil { + xlog.Error("Error getting response from agent", "error", res.Error) + return nil + } + + if len(res.Response) > 280 && !t.noCharacterLimit { + xlog.Error("Tweet is too long, max 280 characters") + return nil + } + + // Reply to tweet + err = t.client.ReplyToTweet(tweet.ID, res.Response) + if err != nil { + xlog.Error("Error replying to tweet", "error", err) + return nil + } + + xlog.Debug("Replied successfully!") + + return nil +} diff --git a/services/connectors/twitter/client.go b/services/connectors/twitter/client.go new file mode 100644 index 0000000..36e6cd4 --- /dev/null +++ b/services/connectors/twitter/client.go @@ -0,0 +1,171 @@ +package twitter + +import ( + "bytes" + "encoding/json" + "fmt" + "io/ioutil" + "net/http" + "time" +) + +// TwitterAPIBase is the base URL for Twitter API v2 +const TwitterAPIBase = "https://api.twitter.com/2" + +// TwitterClient represents a Twitter API client +type TwitterClient struct { + BearerToken string + Client *http.Client +} + +// NewTwitterClient initializes a new Twitter API client +func NewTwitterClient(bearerToken string) *TwitterClient { + return &TwitterClient{ + BearerToken: bearerToken, + Client: &http.Client{Timeout: 10 * time.Second}, + } +} + +// makeRequest is a helper for making authenticated HTTP requests +func (t *TwitterClient) makeRequest(method, url string, body map[string]interface{}) ([]byte, error) { + var req *http.Request + var err error + + if body != nil { + jsonBody, _ := json.Marshal(body) + req, err = http.NewRequest(method, url, bytes.NewBuffer(jsonBody)) + req.Header.Set("Content-Type", "application/json") + } else { + req, err = http.NewRequest(method, url, nil) + } + + if err != nil { + return nil, err + } + + req.Header.Set("Authorization", "Bearer "+t.BearerToken) + resp, err := t.Client.Do(req) + if err != nil { + return nil, err + } + defer resp.Body.Close() + + if resp.StatusCode < 200 || resp.StatusCode >= 300 { + body, _ := ioutil.ReadAll(resp.Body) + return nil, fmt.Errorf("Twitter API error: %s", string(body)) + } + + return ioutil.ReadAll(resp.Body) +} + +// GetStreamRules fetches existing stream rules +func (t *TwitterClient) GetStreamRules() ([]byte, error) { + url := TwitterAPIBase + "/tweets/search/stream/rules" + return t.makeRequest("GET", url, nil) +} + +// AddStreamRule adds a rule to listen for mentions +func (t *TwitterClient) AddStreamRule(username string) error { + url := TwitterAPIBase + "/tweets/search/stream/rules" + body := map[string]interface{}{ + "add": []map[string]string{ + {"value": "@" + username, "tag": "Listen for mentions"}, + }, + } + + _, err := t.makeRequest("POST", url, body) + return err +} + +// DeleteStreamRules removes specific stream rules +func (t *TwitterClient) DeleteStreamRules(ruleIDs []string) error { + url := TwitterAPIBase + "/tweets/search/stream/rules" + body := map[string]interface{}{ + "delete": map[string]interface{}{ + "ids": ruleIDs, + }, + } + + _, err := t.makeRequest("POST", url, body) + return err +} + +// ListenForMentions listens to the stream for mentions +func (t *TwitterClient) ListenForMentions() (*Tweet, error) { + url := TwitterAPIBase + "/tweets/search/stream" + resp, err := t.makeRequest("GET", url, nil) + if err != nil { + return nil, err + } + + var tweetResponse struct { + Data Tweet `json:"data"` + } + + err = json.Unmarshal(resp, &tweetResponse) + if err != nil { + return nil, err + } + + return &tweetResponse.Data, nil +} + +// GetReplies fetches all replies to a tweet +func (t *TwitterClient) GetReplies(tweetID, botUsername string) ([]Tweet, error) { + url := fmt.Sprintf("%s/tweets/search/recent?query=conversation_id:%s from:%s", TwitterAPIBase, tweetID, botUsername) + resp, err := t.makeRequest("GET", url, nil) + if err != nil { + return nil, err + } + + var result struct { + Data []Tweet `json:"data"` + } + + err = json.Unmarshal(resp, &result) + if err != nil { + return nil, err + } + + return result.Data, nil +} + +// HasReplied checks if the bot has already replied to a tweet +func (t *TwitterClient) HasReplied(tweetID, botUsername string) (bool, error) { + replies, err := t.GetReplies(tweetID, botUsername) + if err != nil { + return false, err + } + + return len(replies) > 0, nil +} + +// ReplyToTweet replies to a given tweet +func (t *TwitterClient) ReplyToTweet(tweetID, message string) error { + url := TwitterAPIBase + "/tweets" + body := map[string]interface{}{ + "text": message, + "reply": map[string]string{ + "in_reply_to_tweet_id": tweetID, + }, + } + + _, err := t.makeRequest("POST", url, body) + return err +} + +func (t *TwitterClient) Post(message string) error { + url := TwitterAPIBase + "/tweets" + body := map[string]interface{}{ + "text": message, + } + + _, err := t.makeRequest("POST", url, body) + return err +} + +// Tweet represents a tweet object +type Tweet struct { + ID string `json:"id"` + Text string `json:"text"` +}