diff --git a/core/types/job.go b/core/types/job.go index 167e2e6..5c4fd52 100644 --- a/core/types/job.go +++ b/core/types/job.go @@ -20,6 +20,7 @@ type Job struct { resultCallback func(ActionState) ConversationHistory []openai.ChatCompletionMessage UUID string + Metadata map[string]interface{} } // JobResult is the result of a job @@ -54,6 +55,12 @@ func WithResultCallback(f func(ActionState)) JobOption { } } +func WithMetadata(metadata map[string]interface{}) JobOption { + return func(j *Job) { + j.Metadata = metadata + } +} + // NewJobResult creates a new job result func NewJobResult() *JobResult { r := &JobResult{ diff --git a/services/connectors/slack.go b/services/connectors/slack.go index f84e56c..5555ff1 100644 --- a/services/connectors/slack.go +++ b/services/connectors/slack.go @@ -8,6 +8,7 @@ import ( "log" "os" "strings" + "sync" "github.com/mudler/LocalAgent/pkg/xlog" "github.com/mudler/LocalAgent/services/actions" @@ -29,26 +30,63 @@ type Slack struct { botToken string channelID string alwaysReply bool + + // To track placeholder messages + placeholders map[string]string // map[jobUUID]messageTS + placeholderMutex sync.RWMutex + apiClient *slack.Client } +const thinkingMessage = "thinking..." + func NewSlack(config map[string]string) *Slack { return &Slack{ - appToken: config["appToken"], - botToken: config["botToken"], - channelID: config["channelID"], - alwaysReply: config["alwaysReply"] == "true", + appToken: config["appToken"], + botToken: config["botToken"], + channelID: config["channelID"], + alwaysReply: config["alwaysReply"] == "true", + placeholders: make(map[string]string), } } func (t *Slack) AgentResultCallback() func(state types.ActionState) { return func(state types.ActionState) { - // Send the result to the bot + // The final result callback is intentionally empty as we're handling + // the final update in the handleMention function directly } } func (t *Slack) AgentReasoningCallback() func(state types.ActionCurrentState) bool { return func(state types.ActionCurrentState) bool { - // Send the reasoning to the bot + // Check if we have a placeholder message for this job + t.placeholderMutex.RLock() + msgTs, exists := t.placeholders[state.Job.UUID] + channel := "" + if state.Job.Metadata != nil { + if ch, ok := state.Job.Metadata["channel"].(string); ok { + channel = ch + } + } + t.placeholderMutex.RUnlock() + + if !exists || msgTs == "" || channel == "" || t.apiClient == nil { + return true // Skip if we don't have a message to update + } + + thought := thinkingMessage + "\n\n" + if state.Reasoning != "" { + thought += "Current thought process:\n" + state.Reasoning + } + + // Update the placeholder message with the current reasoning + _, _, _, err := t.apiClient.UpdateMessage( + channel, + msgTs, + slack.MsgOptionText(githubmarkdownconvertergo.Slack(thought), false), + ) + if err != nil { + xlog.Error(fmt.Sprintf("Error updating reasoning message: %v", err)) + } return true } } @@ -149,7 +187,10 @@ func (t *Slack) handleChannelMessage( } } - agentOptions := []types.JobOption{types.WithText(message)} + agentOptions := []types.JobOption{ + types.WithText(message), + types.WithUUID(ev.ThreadTimeStamp), + } // If the last message has an image, we send it as a multi content message if len(imageBytes.Bytes()) > 0 { @@ -207,6 +248,49 @@ func (t *Slack) handleMention( go func() { ts := ev.ThreadTimeStamp + var msgTs string // Timestamp of our placeholder message + var err error + + // Store the API client for use in the callbacks + t.apiClient = api + + // Send initial placeholder message + if ts != "" { + // If we're in a thread, post the placeholder there + _, respTs, err := api.PostMessage(ev.Channel, + slack.MsgOptionText(thinkingMessage, false), + slack.MsgOptionPostMessageParameters(postMessageParams), + slack.MsgOptionTS(ts)) + if err != nil { + xlog.Error(fmt.Sprintf("Error posting initial message: %v", err)) + } else { + msgTs = respTs + } + } else { + // Starting a new thread + _, respTs, err := api.PostMessage(ev.Channel, + slack.MsgOptionText(thinkingMessage, false), + slack.MsgOptionPostMessageParameters(postMessageParams), + slack.MsgOptionTS(ev.TimeStamp)) + if err != nil { + xlog.Error(fmt.Sprintf("Error posting initial message: %v", err)) + } else { + msgTs = respTs + // We're creating a new thread, so use this as our thread timestamp + ts = ev.TimeStamp + } + } + + // Store the UUID->placeholder message mapping + // We'll use the thread timestamp as our UUID + jobUUID := ts + if jobUUID == "" { + jobUUID = ev.TimeStamp + } + + t.placeholderMutex.Lock() + t.placeholders[jobUUID] = msgTs + t.placeholderMutex.Unlock() var threadMessages []openai.ChatCompletionMessage @@ -222,6 +306,11 @@ func (t *Slack) handleMention( xlog.Error(fmt.Sprintf("Error fetching thread messages: %v", err)) } else { for i, msg := range messages { + // Skip our placeholder message + if msg.Timestamp == msgTs { + continue + } + role := "assistant" if msg.User != b.UserID { role = "user" @@ -350,32 +439,41 @@ func (t *Slack) handleMention( } } + // Add channel to job metadata for use in callbacks + metadata := map[string]interface{}{ + "channel": ev.Channel, + } + + // Call the agent with the conversation history res := a.Ask( - // types.WithText(message), types.WithConversationHistory(threadMessages), + types.WithUUID(jobUUID), + types.WithMetadata(metadata), ) - res.Response = githubmarkdownconvertergo.Slack(res.Response) - var err error - if ts != "" { - _, _, err = api.PostMessage(ev.Channel, - slack.MsgOptionText(res.Response, true), - slack.MsgOptionPostMessageParameters( - postMessageParams, - ), + // Format the final response + finalResponse := githubmarkdownconvertergo.Slack(res.Response) + + // Update the placeholder message with the final result + t.placeholderMutex.RLock() + msgTs, exists := t.placeholders[jobUUID] + t.placeholderMutex.RUnlock() + + if exists && msgTs != "" { + _, _, _, err = api.UpdateMessage( + ev.Channel, + msgTs, + slack.MsgOptionText(finalResponse, true), slack.MsgOptionAttachments(generateAttachmentsFromJobResponse(res)...), - slack.MsgOptionTS(ts)) - } else { - _, _, err = api.PostMessage(ev.Channel, - slack.MsgOptionText(res.Response, true), - slack.MsgOptionAttachments(generateAttachmentsFromJobResponse(res)...), - slack.MsgOptionPostMessageParameters( - postMessageParams, - ), - slack.MsgOptionTS(ev.TimeStamp)) - } - if err != nil { - xlog.Error(fmt.Sprintf("Error posting message: %v", err)) + ) + if err != nil { + xlog.Error(fmt.Sprintf("Error updating final message: %v", err)) + } + + // Clean up the placeholder map + t.placeholderMutex.Lock() + delete(t.placeholders, jobUUID) + t.placeholderMutex.Unlock() } }() } @@ -388,6 +486,8 @@ func (t *Slack) Start(a *agent.Agent) { slack.OptionAppLevelToken(t.appToken), ) + t.apiClient = api + postMessageParams := slack.PostMessageParameters{ LinkNames: 1, Markdown: true,