From 6d9c58e6c0e566f14192093fe971d2ba7b0c8a48 Mon Sep 17 00:00:00 2001 From: Ettore Di Giacinto Date: Tue, 22 Apr 2025 23:07:54 +0200 Subject: [PATCH] Attach mcp stdio box to agent Signed-off-by: Ettore Di Giacinto Signed-off-by: mudler --- core/agent/agent.go | 1 + core/agent/mcp.go | 161 +++++++++++++++++++++++++++--------------- core/agent/options.go | 18 ++++- main.go | 1 + 4 files changed, 125 insertions(+), 56 deletions(-) diff --git a/core/agent/agent.go b/core/agent/agent.go index 1e27617..1aa8fdf 100644 --- a/core/agent/agent.go +++ b/core/agent/agent.go @@ -241,6 +241,7 @@ func (a *Agent) Stop() { a.Lock() defer a.Unlock() xlog.Debug("Stopping agent", "agent", a.Character.Name) + a.closeMCPSTDIOServers() a.context.Cancel() } diff --git a/core/agent/mcp.go b/core/agent/mcp.go index ee67822..0dc19a1 100644 --- a/core/agent/mcp.go +++ b/core/agent/mcp.go @@ -3,12 +3,14 @@ package agent import ( "context" "encoding/json" - "errors" mcp "github.com/metoro-io/mcp-golang" "github.com/metoro-io/mcp-golang/transport/http" + stdioTransport "github.com/metoro-io/mcp-golang/transport/stdio" "github.com/mudler/LocalAGI/core/types" + "github.com/mudler/LocalAGI/pkg/stdio" "github.com/mudler/LocalAGI/pkg/xlog" + "github.com/sashabaranov/go-openai/jsonschema" ) @@ -19,6 +21,12 @@ type MCPServer struct { Token string `json:"token"` } +type MCPSTDIOServer struct { + Args []string `json:"args"` + Env []string `json:"env"` + Cmd string `json:"cmd"` +} + type mcpAction struct { mcpClient *mcp.Client inputSchema ToolInputSchema @@ -79,6 +87,68 @@ type ToolInputSchema struct { Required []string `json:"required,omitempty"` } +func (a *Agent) addTools(client *mcp.Client) (types.Actions, error) { + + var generatedActions types.Actions + xlog.Debug("Initializing client") + // Initialize the client + response, e := client.Initialize(a.context) + if e != nil { + xlog.Error("Failed to initialize client", "error", e.Error()) + return nil, e + } + + xlog.Debug("Client initialized: %v", response.Instructions) + + var cursor *string + for { + tools, err := client.ListTools(a.context, cursor) + if err != nil { + xlog.Error("Failed to list tools", "error", err.Error()) + return nil, err + } + + for _, t := range tools.Tools { + desc := "" + if t.Description != nil { + desc = *t.Description + } + + xlog.Debug("Tool", "name", t.Name, "description", desc) + + dat, err := json.Marshal(t.InputSchema) + if err != nil { + xlog.Error("Failed to marshal input schema", "error", err.Error()) + } + + xlog.Debug("Input schema", "tool", t.Name, "schema", string(dat)) + + // XXX: This is a wild guess, to verify (data types might be incompatible) + var inputSchema ToolInputSchema + err = json.Unmarshal(dat, &inputSchema) + if err != nil { + xlog.Error("Failed to unmarshal input schema", "error", err.Error()) + } + + // Create a new action with Client + tool + generatedActions = append(generatedActions, &mcpAction{ + mcpClient: client, + toolName: t.Name, + inputSchema: inputSchema, + toolDescription: desc, + }) + } + + if tools.NextCursor == nil { + break // No more pages + } + cursor = tools.NextCursor + } + + return generatedActions, nil + +} + func (a *Agent) initMCPActions() error { a.mcpActions = nil @@ -86,6 +156,7 @@ func (a *Agent) initMCPActions() error { generatedActions := types.Actions{} + // MCP HTTP Servers for _, mcpServer := range a.options.mcpServers { transport := http.NewHTTPClientTransport("/mcp") transport.WithBaseURL(mcpServer.URL) @@ -95,70 +166,50 @@ func (a *Agent) initMCPActions() error { // Create a new client client := mcp.NewClient(transport) + xlog.Debug("Adding tools for MCP server", "server", mcpServer) + generatedActions, err = a.addTools(client) + if err != nil { + xlog.Error("Failed to add tools for MCP server", "server", mcpServer, "error", err.Error()) + } + } - xlog.Debug("Initializing client", "server", mcpServer.URL) - // Initialize the client - response, e := client.Initialize(a.context) - if e != nil { - xlog.Error("Failed to initialize client", "error", e.Error(), "server", mcpServer) - if err == nil { - err = e - } else { - err = errors.Join(err, e) - } + // MCP STDIO Servers + a.closeMCPSTDIOServers() // Make sure we stop all previous servers if any is active + for _, mcpStdioServer := range a.options.mcpStdioServers { + client := stdio.NewClient(a.options.mcpBoxURL) + p, err := client.CreateProcess(a.context, + mcpStdioServer.Cmd, + mcpStdioServer.Args, + mcpStdioServer.Env, + a.Character.Name) + if err != nil { + xlog.Error("Failed to create process", "error", err.Error()) + continue + } + read, writer, err := client.GetProcessIO(p.ID) + if err != nil { + xlog.Error("Failed to get process IO", "error", err.Error()) continue } - xlog.Debug("Client initialized: %v", response.Instructions) + transport := stdioTransport.NewStdioServerTransportWithIO(read, writer) - var cursor *string - for { - tools, err := client.ListTools(a.context, cursor) - if err != nil { - xlog.Error("Failed to list tools", "error", err.Error()) - return err - } + // Create a new client + mcpClient := mcp.NewClient(transport) - for _, t := range tools.Tools { - desc := "" - if t.Description != nil { - desc = *t.Description - } - - xlog.Debug("Tool", "mcpServer", mcpServer, "name", t.Name, "description", desc) - - dat, err := json.Marshal(t.InputSchema) - if err != nil { - xlog.Error("Failed to marshal input schema", "error", err.Error()) - } - - xlog.Debug("Input schema", "mcpServer", mcpServer, "tool", t.Name, "schema", string(dat)) - - // XXX: This is a wild guess, to verify (data types might be incompatible) - var inputSchema ToolInputSchema - err = json.Unmarshal(dat, &inputSchema) - if err != nil { - xlog.Error("Failed to unmarshal input schema", "error", err.Error()) - } - - // Create a new action with Client + tool - generatedActions = append(generatedActions, &mcpAction{ - mcpClient: client, - toolName: t.Name, - inputSchema: inputSchema, - toolDescription: desc, - }) - } - - if tools.NextCursor == nil { - break // No more pages - } - cursor = tools.NextCursor + xlog.Debug("Adding tools for MCP server (stdio)", "server", mcpStdioServer) + generatedActions, err = a.addTools(mcpClient) + if err != nil { + xlog.Error("Failed to add tools for MCP server", "server", mcpStdioServer, "error", err.Error()) } - } a.mcpActions = generatedActions return err } + +func (a *Agent) closeMCPSTDIOServers() { + client := stdio.NewClient(a.options.mcpBoxURL) + client.StopGroup(a.Character.Name) +} diff --git a/core/agent/options.go b/core/agent/options.go index 942d062..22d358f 100644 --- a/core/agent/options.go +++ b/core/agent/options.go @@ -50,7 +50,9 @@ type options struct { conversationsPath string - mcpServers []MCPServer + mcpServers []MCPServer + mcpStdioServers []MCPSTDIOServer + mcpBoxURL string newConversationsSubscribers []func(openai.ChatCompletionMessage) @@ -207,6 +209,20 @@ func WithMCPServers(servers ...MCPServer) Option { } } +func WithMCPSTDIOServers(servers ...MCPSTDIOServer) Option { + return func(o *options) error { + o.mcpStdioServers = servers + return nil + } +} + +func WithMCPBoxURL(url string) Option { + return func(o *options) error { + o.mcpBoxURL = url + return nil + } +} + func WithLLMAPIURL(url string) Option { return func(o *options) error { o.LLMAPI.APIURL = url diff --git a/main.go b/main.go index d375486..6e00b07 100644 --- a/main.go +++ b/main.go @@ -23,6 +23,7 @@ var apiKeysEnv = os.Getenv("LOCALAGI_API_KEYS") var imageModel = os.Getenv("LOCALAGI_IMAGE_MODEL") var conversationDuration = os.Getenv("LOCALAGI_CONVERSATION_DURATION") var localOperatorBaseURL = os.Getenv("LOCALOPERATOR_BASE_URL") +var mcpboxURL = os.Getenv("LOCALAGI_MCPBOX_URL") func init() { if baseModel == "" {