Make it working, expose MCP prepare script to UI

Signed-off-by: mudler <mudler@localai.io>
This commit is contained in:
mudler
2025-04-24 12:31:48 +02:00
parent 9b7344fbdf
commit 34d0954171
8 changed files with 121 additions and 61 deletions

View File

@@ -174,7 +174,15 @@ func (a *Agent) initMCPActions() error {
}
// MCP STDIO Servers
a.closeMCPSTDIOServers() // Make sure we stop all previous servers if any is active
if a.options.mcpPrepareScript != "" {
xlog.Debug("Preparing MCP box", "script", a.options.mcpPrepareScript)
client := stdio.NewClient(a.options.mcpBoxURL)
client.RunProcess(a.context, "/bin/bash", []string{"-c", a.options.mcpPrepareScript}, []string{})
}
for _, mcpStdioServer := range a.options.mcpStdioServers {
client := stdio.NewClient(a.options.mcpBoxURL)
p, err := client.CreateProcess(a.context,

View File

@@ -53,7 +53,7 @@ type options struct {
mcpServers []MCPServer
mcpStdioServers []MCPSTDIOServer
mcpBoxURL string
mcpPrepareScript string
newConversationsSubscribers []func(openai.ChatCompletionMessage)
observer Observer
@@ -223,6 +223,13 @@ func WithMCPBoxURL(url string) Option {
}
}
func WithMCPPrepareScript(script string) Option {
return func(o *options) error {
o.mcpPrepareScript = script
return nil
}
}
func WithLLMAPIURL(url string) Option {
return func(o *options) error {
o.LLMAPI.APIURL = url

View File

@@ -37,6 +37,7 @@ type AgentConfig struct {
DynamicPrompts []DynamicPromptsConfig `json:"dynamic_prompts" form:"dynamic_prompts"`
MCPServers []agent.MCPServer `json:"mcp_servers" form:"mcp_servers"`
MCPSTDIOServers []agent.MCPSTDIOServer `json:"mcp_stdio_servers" form:"mcp_stdio_servers"`
MCPPrepareScript string `json:"mcp_prepare_script" form:"mcp_prepare_script"`
MCPBoxURL string `json:"mcp_box_url" form:"mcp_box_url"`
Description string `json:"description" form:"description"`

View File

@@ -406,6 +406,7 @@ func (a *AgentPool) startAgentWithConfig(name string, config *AgentConfig, obs O
WithMCPSTDIOServers(config.MCPSTDIOServers...),
WithMCPBoxURL(a.mcpBoxURL),
WithPrompts(promptBlocks...),
WithMCPPrepareScript(config.MCPPrepareScript),
// WithDynamicPrompts(dynamicPrompts...),
WithCharacter(Character{
Name: name,

View File

@@ -58,8 +58,7 @@ func (c *Client) CreateProcess(ctx context.Context, command string, args []strin
resp, err := http.Post(url, "application/json", bytes.NewReader(reqBody))
if err != nil {
body, _ := io.ReadAll(resp.Body)
return nil, fmt.Errorf("failed to start process: %w. body: %s", err, string(body))
return nil, fmt.Errorf("failed to start process: %w", err)
}
defer resp.Body.Close()
@@ -259,9 +258,10 @@ type websocketWriter struct {
}
func (w *websocketWriter) Write(p []byte) (n int, err error) {
err = w.conn.WriteMessage(websocket.TextMessage, p)
// Use BinaryMessage type for better compatibility
err = w.conn.WriteMessage(websocket.BinaryMessage, p)
if err != nil {
return 0, err
return 0, fmt.Errorf("failed to write WebSocket message: %w", err)
}
return len(p), nil
}

View File

@@ -1,6 +1,7 @@
package stdio
import (
"os"
"testing"
. "github.com/onsi/ginkgo/v2"
@@ -11,3 +12,17 @@ func TestSTDIOTransport(t *testing.T) {
RegisterFailHandler(Fail)
RunSpecs(t, "STDIOTransport test suite")
}
var baseURL string
func init() {
baseURL = os.Getenv("STDIO_SERVER_URL")
if baseURL == "" {
baseURL = "http://localhost:8080"
}
}
var _ = AfterSuite(func() {
client := NewClient(baseURL)
client.StopGroup("test-group")
})

View File

@@ -2,9 +2,11 @@ package stdio
import (
"context"
"os"
"time"
mcp "github.com/metoro-io/mcp-golang"
"github.com/metoro-io/mcp-golang/transport/stdio"
"github.com/mudler/LocalAGI/pkg/xlog"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
)
@@ -12,14 +14,9 @@ import (
var _ = Describe("Client", func() {
var (
client *Client
baseURL string
)
BeforeEach(func() {
baseURL = os.Getenv("STDIO_SERVER_URL")
if baseURL == "" {
baseURL = "http://localhost:8080"
}
client = NewClient(baseURL)
})
@@ -183,5 +180,56 @@ var _ = Describe("Client", func() {
err = client.StopProcess(process.ID)
Expect(err).NotTo(HaveOccurred())
})
It("MCP", func() {
ctx := context.Background()
process, err := client.CreateProcess(ctx,
"docker", []string{"run", "-i", "--rm", "-e", "GITHUB_PERSONAL_ACCESS_TOKEN", "ghcr.io/github/github-mcp-server"},
[]string{"GITHUB_PERSONAL_ACCESS_TOKEN=test"}, "test-group")
Expect(err).NotTo(HaveOccurred())
Expect(process).NotTo(BeNil())
Expect(process.ID).NotTo(BeEmpty())
defer client.StopProcess(process.ID)
// MCP client
read, writer, err := client.GetProcessIO(process.ID)
Expect(err).NotTo(HaveOccurred())
Expect(read).NotTo(BeNil())
Expect(writer).NotTo(BeNil())
transport := stdio.NewStdioServerTransportWithIO(read, writer)
// Create a new client
mcpClient := mcp.NewClient(transport)
// Initialize the client
response, e := mcpClient.Initialize(ctx)
Expect(e).NotTo(HaveOccurred())
Expect(response).NotTo(BeNil())
Expect(mcpClient.Ping(ctx)).To(Succeed())
xlog.Debug("Client initialized: %v", response.Instructions)
alltools := []mcp.ToolRetType{}
var cursor *string
for {
tools, err := mcpClient.ListTools(ctx, cursor)
Expect(err).NotTo(HaveOccurred())
Expect(tools).NotTo(BeNil())
Expect(tools.Tools).NotTo(BeEmpty())
alltools = append(alltools, tools.Tools...)
if tools.NextCursor == nil {
break // No more pages
}
cursor = tools.NextCursor
}
for _, tool := range alltools {
xlog.Debug("Tool: %v", tool)
}
})
})
})

View File

@@ -1,7 +1,6 @@
package stdio
import (
"bufio"
"context"
"encoding/json"
"fmt"
@@ -41,9 +40,7 @@ func NewServer() *Server {
return &Server{
processes: make(map[string]*Process),
groups: make(map[string][]string),
upgrader: websocket.Upgrader{
CheckOrigin: func(r *http.Request) bool { return true },
},
upgrader: websocket.Upgrader{},
}
}
@@ -354,7 +351,7 @@ func (s *Server) handleWebSocket(w http.ResponseWriter, r *http.Request) {
}
}()
// Handle stdout and stderr using bufio.Scanner
// Handle stdout and stderr
go func() {
defer func() {
select {
@@ -365,46 +362,29 @@ func (s *Server) handleWebSocket(w http.ResponseWriter, r *http.Request) {
}
}()
// Create a scanner that reads from both stdout and stderr
scanner := bufio.NewScanner(io.MultiReader(process.Stdout, process.Stderr))
// Set a larger buffer size for JSON-RPC messages (10MB)
scanner.Buffer(make([]byte, 10*1024*1024), 10*1024*1024)
// Use a custom split function to handle JSON-RPC messages
scanner.Split(func(data []byte, atEOF bool) (advance int, token []byte, err error) {
if atEOF && len(data) == 0 {
return 0, nil, nil
// Create a buffer for reading
buf := make([]byte, 4096)
reader := io.MultiReader(process.Stdout, process.Stderr)
for {
n, err := reader.Read(buf)
if err != nil {
if err != io.EOF {
xlog.Debug("Read error", "processID", id, "error", err)
}
return
}
// Look for the end of a JSON-RPC message
for i := 0; i < len(data); i++ {
if data[i] == '\n' {
return i + 1, data[:i], nil
}
}
// If we're at EOF, return the remaining data
if atEOF {
return len(data), data, nil
}
// Request more data
return 0, nil, nil
})
for scanner.Scan() {
line := scanner.Text()
xlog.Debug("Sending message", "processID", id, "message", line)
if err := conn.WriteMessage(websocket.TextMessage, []byte(line)); err != nil {
if n > 0 {
xlog.Debug("Sending message", "processID", id, "size", n)
if err := conn.WriteMessage(websocket.BinaryMessage, buf[:n]); err != nil {
if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseNormalClosure) {
xlog.Debug("WebSocket output write error", "processID", id, "error", err)
}
return
}
xlog.Debug("Message sent to client", "processID", id, "message", line)
xlog.Debug("Message sent to client", "processID", id, "size", n)
}
if err := scanner.Err(); err != nil {
xlog.Debug("Scanner error", "processID", id, "error", err)
}
}()