Correction de la migration MCP - Utilisation correcte des transports
- Ajout de l'import client/transport pour accéder aux transports - Correction de initMCPActions pour utiliser NewStreamableHTTP avec support des headers d'autorisation - Correction de la partie STDIO pour utiliser NewStdio avec transport.NewStdio() - Ajout de l'appel Start() sur les clients avant utilisation (requis par la nouvelle API) - Correction des types: utilisation de *client.Client au lieu de client.MCPClient - La migration corrige l'erreur 'transport not started yet' observée dans les logs
This commit is contained in:
@@ -4,9 +4,9 @@ import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
|
||||
mcp "github.com/metoro-io/mcp-golang"
|
||||
"github.com/metoro-io/mcp-golang/transport/http"
|
||||
stdioTransport "github.com/metoro-io/mcp-golang/transport/stdio"
|
||||
"github.com/mark3labs/mcp-go/client"
|
||||
"github.com/mark3labs/mcp-go/client/transport"
|
||||
"github.com/mark3labs/mcp-go/mcp"
|
||||
"github.com/mudler/LocalAGI/core/types"
|
||||
"github.com/mudler/LocalAGI/pkg/stdio"
|
||||
"github.com/mudler/LocalAGI/pkg/xlog"
|
||||
@@ -28,7 +28,7 @@ type MCPSTDIOServer struct {
|
||||
}
|
||||
|
||||
type mcpAction struct {
|
||||
mcpClient *mcp.Client
|
||||
mcpClient *client.Client
|
||||
inputSchema ToolInputSchema
|
||||
toolName string
|
||||
toolDescription string
|
||||
@@ -39,23 +39,41 @@ func (a *mcpAction) Plannable() bool {
|
||||
}
|
||||
|
||||
func (m *mcpAction) Run(ctx context.Context, sharedState *types.AgentSharedState, params types.ActionParams) (types.ActionResult, error) {
|
||||
resp, err := m.mcpClient.CallTool(ctx, m.toolName, params)
|
||||
// Convertir params en format attendu par mark3labs/mcp-go
|
||||
args := make(map[string]interface{})
|
||||
if err := params.Unmarshal(&args); err != nil {
|
||||
return types.ActionResult{}, err
|
||||
}
|
||||
|
||||
// Créer une requête d'appel d'outil
|
||||
request := mcp.CallToolRequest{
|
||||
Params: mcp.CallToolParams{
|
||||
Name: m.toolName,
|
||||
Arguments: args,
|
||||
},
|
||||
}
|
||||
|
||||
// Appeler l'outil
|
||||
result, err := m.mcpClient.CallTool(ctx, request)
|
||||
if err != nil {
|
||||
xlog.Error("Failed to call tool", "error", err.Error())
|
||||
return types.ActionResult{}, err
|
||||
}
|
||||
|
||||
xlog.Debug("MCP response", "response", resp)
|
||||
xlog.Debug("MCP response", "response", result)
|
||||
|
||||
// Traiter le résultat
|
||||
textResult := ""
|
||||
for _, c := range resp.Content {
|
||||
switch c.Type {
|
||||
case mcp.ContentTypeText:
|
||||
textResult += c.TextContent.Text + "\n"
|
||||
case mcp.ContentTypeImage:
|
||||
xlog.Error("Image content not supported yet")
|
||||
case mcp.ContentTypeEmbeddedResource:
|
||||
xlog.Error("Embedded resource content not supported yet")
|
||||
if result.IsError {
|
||||
return types.ActionResult{}, err
|
||||
}
|
||||
|
||||
// Extraire le texte du résultat selon le format de mark3labs/mcp-go
|
||||
for _, content := range result.Content {
|
||||
if textContent, ok := content.(*mcp.TextContent); ok {
|
||||
textResult += textContent.Text + "\n"
|
||||
} else {
|
||||
xlog.Error("Unsupported content type", "type", content)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -87,12 +105,24 @@ type ToolInputSchema struct {
|
||||
Required []string `json:"required,omitempty"`
|
||||
}
|
||||
|
||||
func (a *Agent) addTools(client *mcp.Client) (types.Actions, error) {
|
||||
func (a *Agent) addTools(mcpClient *client.Client) (types.Actions, error) {
|
||||
|
||||
var generatedActions types.Actions
|
||||
xlog.Debug("Initializing client")
|
||||
|
||||
// Initialize the client
|
||||
response, e := client.Initialize(a.context)
|
||||
initRequest := mcp.InitializeRequest{
|
||||
Params: mcp.InitializeParams{
|
||||
ProtocolVersion: mcp.LATEST_PROTOCOL_VERSION,
|
||||
Capabilities: mcp.ClientCapabilities{},
|
||||
ClientInfo: mcp.Implementation{
|
||||
Name: "LocalAGI",
|
||||
Version: "1.0.0",
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
response, e := mcpClient.Initialize(a.context, initRequest)
|
||||
if e != nil {
|
||||
xlog.Error("Failed to initialize client", "error", e.Error())
|
||||
return nil, e
|
||||
@@ -100,49 +130,40 @@ func (a *Agent) addTools(client *mcp.Client) (types.Actions, error) {
|
||||
|
||||
xlog.Debug("Client initialized: %v", response.Instructions)
|
||||
|
||||
var cursor *string
|
||||
for {
|
||||
tools, err := client.ListTools(a.context, cursor)
|
||||
// List tools using the new API
|
||||
listRequest := mcp.ListToolsRequest{}
|
||||
tools, err := mcpClient.ListTools(a.context, listRequest)
|
||||
if err != nil {
|
||||
xlog.Error("Failed to list tools", "error", err.Error())
|
||||
return nil, err
|
||||
}
|
||||
|
||||
for _, t := range tools.Tools {
|
||||
desc := t.Description
|
||||
|
||||
xlog.Debug("Tool", "name", t.Name, "description", desc)
|
||||
|
||||
dat, err := json.Marshal(t.InputSchema)
|
||||
if err != nil {
|
||||
xlog.Error("Failed to list tools", "error", err.Error())
|
||||
return nil, err
|
||||
xlog.Error("Failed to marshal input schema", "error", err.Error())
|
||||
}
|
||||
|
||||
for _, t := range tools.Tools {
|
||||
desc := ""
|
||||
if t.Description != nil {
|
||||
desc = *t.Description
|
||||
}
|
||||
xlog.Debug("Input schema", "tool", t.Name, "schema", string(dat))
|
||||
|
||||
xlog.Debug("Tool", "name", t.Name, "description", desc)
|
||||
|
||||
dat, err := json.Marshal(t.InputSchema)
|
||||
if err != nil {
|
||||
xlog.Error("Failed to marshal input schema", "error", err.Error())
|
||||
}
|
||||
|
||||
xlog.Debug("Input schema", "tool", t.Name, "schema", string(dat))
|
||||
|
||||
// XXX: This is a wild guess, to verify (data types might be incompatible)
|
||||
var inputSchema ToolInputSchema
|
||||
err = json.Unmarshal(dat, &inputSchema)
|
||||
if err != nil {
|
||||
xlog.Error("Failed to unmarshal input schema", "error", err.Error())
|
||||
}
|
||||
|
||||
// Create a new action with Client + tool
|
||||
generatedActions = append(generatedActions, &mcpAction{
|
||||
mcpClient: client,
|
||||
toolName: t.Name,
|
||||
inputSchema: inputSchema,
|
||||
toolDescription: desc,
|
||||
})
|
||||
// XXX: This is a wild guess, to verify (data types might be incompatible)
|
||||
var inputSchema ToolInputSchema
|
||||
err = json.Unmarshal(dat, &inputSchema)
|
||||
if err != nil {
|
||||
xlog.Error("Failed to unmarshal input schema", "error", err.Error())
|
||||
}
|
||||
|
||||
if tools.NextCursor == nil {
|
||||
break // No more pages
|
||||
}
|
||||
cursor = tools.NextCursor
|
||||
// Create a new action with Client + tool
|
||||
generatedActions = append(generatedActions, &mcpAction{
|
||||
mcpClient: mcpClient,
|
||||
toolName: t.Name,
|
||||
inputSchema: inputSchema,
|
||||
toolDescription: desc,
|
||||
})
|
||||
}
|
||||
|
||||
return generatedActions, nil
|
||||
@@ -158,16 +179,36 @@ func (a *Agent) initMCPActions() error {
|
||||
|
||||
// MCP HTTP Servers
|
||||
for _, mcpServer := range a.options.mcpServers {
|
||||
transport := http.NewHTTPClientTransport("/mcp")
|
||||
transport.WithBaseURL(mcpServer.URL)
|
||||
// Créer un transport HTTP avec les options appropriées
|
||||
var httpTransport *transport.StreamableHTTP
|
||||
var err error
|
||||
|
||||
if mcpServer.Token != "" {
|
||||
transport.WithHeader("Authorization", "Bearer "+mcpServer.Token)
|
||||
// Utiliser les headers avec token
|
||||
headers := map[string]string{
|
||||
"Authorization": "Bearer " + mcpServer.Token,
|
||||
}
|
||||
httpTransport, err = transport.NewStreamableHTTP(mcpServer.URL, transport.WithHTTPHeaders(headers))
|
||||
} else {
|
||||
httpTransport, err = transport.NewStreamableHTTP(mcpServer.URL)
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
xlog.Error("Failed to create HTTP transport", "server", mcpServer, "error", err.Error())
|
||||
continue
|
||||
}
|
||||
|
||||
// Créer le client avec le transport
|
||||
mcpClient := client.NewClient(httpTransport)
|
||||
|
||||
// Démarrer le client
|
||||
if err := mcpClient.Start(a.context); err != nil {
|
||||
xlog.Error("Failed to start MCP client", "server", mcpServer, "error", err.Error())
|
||||
continue
|
||||
}
|
||||
|
||||
// Create a new client
|
||||
client := mcp.NewClient(transport)
|
||||
xlog.Debug("Adding tools for MCP server", "server", mcpServer)
|
||||
actions, err := a.addTools(client)
|
||||
actions, err := a.addTools(mcpClient)
|
||||
if err != nil {
|
||||
xlog.Error("Failed to add tools for MCP server", "server", mcpServer, "error", err.Error())
|
||||
}
|
||||
@@ -185,26 +226,17 @@ func (a *Agent) initMCPActions() error {
|
||||
}
|
||||
|
||||
for _, mcpStdioServer := range a.options.mcpStdioServers {
|
||||
client := stdio.NewClient(a.options.mcpBoxURL)
|
||||
p, err := client.CreateProcess(a.context,
|
||||
mcpStdioServer.Cmd,
|
||||
mcpStdioServer.Args,
|
||||
mcpStdioServer.Env,
|
||||
a.Character.Name)
|
||||
if err != nil {
|
||||
xlog.Error("Failed to create process", "error", err.Error())
|
||||
// Créer un transport STDIO
|
||||
stdioTransport := transport.NewStdio(mcpStdioServer.Cmd, mcpStdioServer.Env, mcpStdioServer.Args...)
|
||||
|
||||
// Créer le client avec le transport
|
||||
mcpClient := client.NewClient(stdioTransport)
|
||||
|
||||
// Démarrer le client
|
||||
if err := mcpClient.Start(a.context); err != nil {
|
||||
xlog.Error("Failed to start MCP STDIO client", "error", err.Error())
|
||||
continue
|
||||
}
|
||||
read, writer, err := client.GetProcessIO(p.ID)
|
||||
if err != nil {
|
||||
xlog.Error("Failed to get process IO", "error", err.Error())
|
||||
continue
|
||||
}
|
||||
|
||||
transport := stdioTransport.NewStdioServerTransportWithIO(read, writer)
|
||||
|
||||
// Create a new client
|
||||
mcpClient := mcp.NewClient(transport)
|
||||
|
||||
xlog.Debug("Adding tools for MCP server (stdio)", "server", mcpStdioServer)
|
||||
actions, err := a.addTools(mcpClient)
|
||||
|
||||
Reference in New Issue
Block a user