From bd903a3f33717da6171154e4ac339e72745c20d7 Mon Sep 17 00:00:00 2001 From: mudler Date: Wed, 23 Apr 2025 18:28:54 +0200 Subject: [PATCH] add one-time process, attach to UI the mcp server json configuration Signed-off-by: mudler --- Dockerfile.mcpbox | 8 +-- core/state/config.go | 163 +++++++++++++++++++++++++++++++++++++++++++ pkg/stdio/client.go | 77 ++++++++++++++++++-- pkg/stdio/server.go | 66 +++++++++++++++++- 4 files changed, 302 insertions(+), 12 deletions(-) diff --git a/Dockerfile.mcpbox b/Dockerfile.mcpbox index b79efe8..e5bea4a 100644 --- a/Dockerfile.mcpbox +++ b/Dockerfile.mcpbox @@ -1,5 +1,5 @@ # Build stage -FROM golang:1.21-alpine AS builder +FROM golang:1.24-alpine AS builder # Install build dependencies RUN apk add --no-cache git @@ -23,10 +23,10 @@ RUN CGO_ENABLED=0 GOOS=linux go build -o mcpbox ./cmd/mcpbox FROM alpine:3.19 # Install runtime dependencies -RUN apk add --no-cache ca-certificates tzdata +RUN apk add --no-cache ca-certificates tzdata docker # Create non-root user -RUN adduser -D -g '' appuser +#RUN adduser -D -g '' appuser # Set working directory WORKDIR /app @@ -35,7 +35,7 @@ WORKDIR /app COPY --from=builder /app/mcpbox . # Use non-root user -USER appuser +#USER appuser # Expose port EXPOSE 8080 diff --git a/core/state/config.go b/core/state/config.go index 77f9839..574be1d 100644 --- a/core/state/config.go +++ b/core/state/config.go @@ -2,6 +2,8 @@ package state import ( "encoding/json" + "fmt" + "strings" "github.com/mudler/LocalAGI/core/agent" "github.com/mudler/LocalAGI/core/types" @@ -273,6 +275,22 @@ func NewAgentConfigMeta( HelpText: "Number of concurrent tasks that can run in parallel", Tags: config.Tags{Section: "AdvancedSettings"}, }, + { + Name: "mcp_stdio_servers", + Label: "MCP STDIO Servers", + Type: "textarea", + DefaultValue: "", + HelpText: "JSON configuration for MCP STDIO servers", + Tags: config.Tags{Section: "AdvancedSettings"}, + }, + { + Name: "mcp_prepare_script", + Label: "MCP Prepare Script", + Type: "textarea", + DefaultValue: "", + HelpText: "Script to prepare the MCP box", + Tags: config.Tags{Section: "AdvancedSettings"}, + }, }, MCPServers: []config.Field{ { @@ -299,3 +317,148 @@ type Connector interface { AgentReasoningCallback() func(state types.ActionCurrentState) bool Start(a *agent.Agent) } + +// UnmarshalJSON implements json.Unmarshaler for AgentConfig +func (a *AgentConfig) UnmarshalJSON(data []byte) error { + // Create a temporary type to avoid infinite recursion + type Alias AgentConfig + aux := &struct { + *Alias + MCPSTDIOServersConfig interface{} `json:"mcp_stdio_servers"` + }{ + Alias: (*Alias)(a), + } + + if err := json.Unmarshal(data, &aux); err != nil { + return err + } + + // Handle MCP STDIO servers configuration + if aux.MCPSTDIOServersConfig != nil { + switch v := aux.MCPSTDIOServersConfig.(type) { + case string: + // Parse string configuration + var mcpConfig struct { + MCPServers map[string]struct { + Command string `json:"command"` + Args []string `json:"args"` + Env map[string]string `json:"env"` + } `json:"mcpServers"` + } + + if err := json.Unmarshal([]byte(v), &mcpConfig); err != nil { + return fmt.Errorf("failed to parse MCP STDIO servers configuration: %w", err) + } + + a.MCPSTDIOServers = make([]agent.MCPSTDIOServer, 0, len(mcpConfig.MCPServers)) + for _, server := range mcpConfig.MCPServers { + // Convert env map to slice of "KEY=VALUE" strings + envSlice := make([]string, 0, len(server.Env)) + for k, v := range server.Env { + envSlice = append(envSlice, fmt.Sprintf("%s=%s", k, v)) + } + + a.MCPSTDIOServers = append(a.MCPSTDIOServers, agent.MCPSTDIOServer{ + Cmd: server.Command, + Args: server.Args, + Env: envSlice, + }) + } + case []interface{}: + // Parse array configuration + a.MCPSTDIOServers = make([]agent.MCPSTDIOServer, 0, len(v)) + for _, server := range v { + serverMap, ok := server.(map[string]interface{}) + if !ok { + return fmt.Errorf("invalid server configuration format") + } + + cmd, _ := serverMap["cmd"].(string) + args := make([]string, 0) + if argsInterface, ok := serverMap["args"].([]interface{}); ok { + for _, arg := range argsInterface { + if argStr, ok := arg.(string); ok { + args = append(args, argStr) + } + } + } + + env := make([]string, 0) + if envInterface, ok := serverMap["env"].([]interface{}); ok { + for _, e := range envInterface { + if envStr, ok := e.(string); ok { + env = append(env, envStr) + } + } + } + + a.MCPSTDIOServers = append(a.MCPSTDIOServers, agent.MCPSTDIOServer{ + Cmd: cmd, + Args: args, + Env: env, + }) + } + } + } + + return nil +} + +// MarshalJSON implements json.Marshaler for AgentConfig +func (a *AgentConfig) MarshalJSON() ([]byte, error) { + // Create a temporary type to avoid infinite recursion + type Alias AgentConfig + aux := &struct { + *Alias + MCPSTDIOServersConfig string `json:"mcp_stdio_servers,omitempty"` + }{ + Alias: (*Alias)(a), + } + + // Convert MCPSTDIOServers back to the expected JSON format + if len(a.MCPSTDIOServers) > 0 { + mcpConfig := struct { + MCPServers map[string]struct { + Command string `json:"command"` + Args []string `json:"args"` + Env map[string]string `json:"env"` + } `json:"mcpServers"` + }{ + MCPServers: make(map[string]struct { + Command string `json:"command"` + Args []string `json:"args"` + Env map[string]string `json:"env"` + }), + } + + // Convert each MCPSTDIOServer to the expected format + for i, server := range a.MCPSTDIOServers { + // Convert env slice back to map + envMap := make(map[string]string) + for _, env := range server.Env { + if parts := strings.SplitN(env, "=", 2); len(parts) == 2 { + envMap[parts[0]] = parts[1] + } + } + + mcpConfig.MCPServers[fmt.Sprintf("server%d", i)] = struct { + Command string `json:"command"` + Args []string `json:"args"` + Env map[string]string `json:"env"` + }{ + Command: server.Cmd, + Args: server.Args, + Env: envMap, + } + } + + // Marshal the MCP config to JSON string + mcpConfigJSON, err := json.Marshal(mcpConfig) + if err != nil { + return nil, fmt.Errorf("failed to marshal MCP STDIO servers configuration: %w", err) + } + aux.MCPSTDIOServersConfig = string(mcpConfigJSON) + } + + return json.Marshal(aux) +} diff --git a/pkg/stdio/client.go b/pkg/stdio/client.go index c461565..01ff256 100644 --- a/pkg/stdio/client.go +++ b/pkg/stdio/client.go @@ -6,6 +6,7 @@ import ( "encoding/json" "fmt" "io" + "log" "net/http" "net/url" "sync" @@ -33,6 +34,8 @@ func NewClient(baseURL string) *Client { // CreateProcess starts a new process in a group func (c *Client) CreateProcess(ctx context.Context, command string, args []string, env []string, groupID string) (*Process, error) { + log.Printf("Creating process: command=%s, args=%v, groupID=%s", command, args, groupID) + req := struct { Command string `json:"command"` Args []string `json:"args"` @@ -50,23 +53,28 @@ func (c *Client) CreateProcess(ctx context.Context, command string, args []strin return nil, fmt.Errorf("failed to marshal request: %w", err) } - resp, err := http.Post( - fmt.Sprintf("%s/processes", c.baseURL), - "application/json", - bytes.NewReader(reqBody), - ) + url := fmt.Sprintf("%s/processes", c.baseURL) + log.Printf("Sending POST request to %s", url) + + resp, err := http.Post(url, "application/json", bytes.NewReader(reqBody)) if err != nil { return nil, fmt.Errorf("failed to start process: %w", err) } defer resp.Body.Close() + log.Printf("Received response with status: %d", resp.StatusCode) + var result struct { ID string `json:"id"` } + if err := json.NewDecoder(resp.Body).Decode(&result); err != nil { - return nil, fmt.Errorf("failed to decode response: %w", err) + body, _ := io.ReadAll(resp.Body) + return nil, fmt.Errorf("failed to decode response: %w. body: %s", err, string(body)) } + log.Printf("Successfully created process with ID: %s", result.ID) + process := &Process{ ID: result.ID, GroupID: groupID, @@ -193,23 +201,35 @@ func (c *Client) ListGroups() []string { // GetProcessIO returns io.Reader and io.Writer for a process func (c *Client) GetProcessIO(id string) (io.Reader, io.Writer, error) { + log.Printf("Getting IO for process: %s", id) + process, err := c.GetProcess(id) if err != nil { return nil, nil, err } + // Parse the base URL to get the host + baseURL, err := url.Parse(c.baseURL) + if err != nil { + return nil, nil, fmt.Errorf("failed to parse base URL: %w", err) + } + // Connect to WebSocket u := url.URL{ Scheme: "ws", - Host: c.baseURL, + Host: baseURL.Host, Path: fmt.Sprintf("/ws/%s", process.ID), } + log.Printf("Connecting to WebSocket at: %s", u.String()) + conn, _, err := websocket.DefaultDialer.Dial(u.String(), nil) if err != nil { return nil, nil, fmt.Errorf("failed to connect to WebSocket: %w", err) } + log.Printf("Successfully connected to WebSocket for process: %s", id) + // Create reader and writer reader := &websocketReader{conn: conn} writer := &websocketWriter{conn: conn} @@ -258,3 +278,46 @@ func (c *Client) Close() error { return nil } + +// RunProcess executes a command and returns its output +func (c *Client) RunProcess(ctx context.Context, command string, args []string, env []string) (string, error) { + log.Printf("Running one-time process: command=%s, args=%v", command, args) + + req := struct { + Command string `json:"command"` + Args []string `json:"args"` + Env []string `json:"env"` + }{ + Command: command, + Args: args, + Env: env, + } + + reqBody, err := json.Marshal(req) + if err != nil { + return "", fmt.Errorf("failed to marshal request: %w", err) + } + + url := fmt.Sprintf("%s/run", c.baseURL) + log.Printf("Sending POST request to %s", url) + + resp, err := http.Post(url, "application/json", bytes.NewReader(reqBody)) + if err != nil { + return "", fmt.Errorf("failed to execute process: %w", err) + } + defer resp.Body.Close() + + log.Printf("Received response with status: %d", resp.StatusCode) + + var result struct { + Output string `json:"output"` + } + + if err := json.NewDecoder(resp.Body).Decode(&result); err != nil { + body, _ := io.ReadAll(resp.Body) + return "", fmt.Errorf("failed to decode response: %w. body: %s", err, string(body)) + } + + log.Printf("Successfully executed process with output length: %d", len(result.Output)) + return result.Output, nil +} diff --git a/pkg/stdio/server.go b/pkg/stdio/server.go index 1c07578..bafd198 100644 --- a/pkg/stdio/server.go +++ b/pkg/stdio/server.go @@ -5,6 +5,7 @@ import ( "encoding/json" "fmt" "io" + "log" "net/http" "os" "os/exec" @@ -46,6 +47,8 @@ func NewServer() *Server { // StartProcess starts a new process and returns its ID func (s *Server) StartProcess(ctx context.Context, command string, args []string, env []string, groupID string) (string, error) { + log.Printf("Starting process: command=%s, args=%v, groupID=%s", command, args, groupID) + cmd := exec.CommandContext(ctx, command, args...) if len(env) > 0 { @@ -88,6 +91,7 @@ func (s *Server) StartProcess(ctx context.Context, command string, args []string } s.mu.Unlock() + log.Printf("Successfully started process with ID: %s", process.ID) return process.ID, nil } @@ -201,6 +205,22 @@ func (s *Server) ListProcesses() []*Process { return processes } +// RunProcess executes a command and returns its output +func (s *Server) RunProcess(ctx context.Context, command string, args []string, env []string) (string, error) { + cmd := exec.CommandContext(ctx, command, args...) + + if len(env) > 0 { + cmd.Env = append(os.Environ(), env...) + } + + output, err := cmd.CombinedOutput() + if err != nil { + return string(output), fmt.Errorf("process failed: %w", err) + } + + return string(output), nil +} + // Start starts the HTTP server func (s *Server) Start(addr string) error { http.HandleFunc("/processes", s.handleProcesses) @@ -208,11 +228,14 @@ func (s *Server) Start(addr string) error { http.HandleFunc("/ws/", s.handleWebSocket) http.HandleFunc("/groups", s.handleGroups) http.HandleFunc("/groups/", s.handleGroup) + http.HandleFunc("/run", s.handleRun) return http.ListenAndServe(addr, nil) } func (s *Server) handleProcesses(w http.ResponseWriter, r *http.Request) { + log.Printf("Handling /processes request: method=%s", r.Method) + switch r.Method { case http.MethodGet: processes := s.ListProcesses() @@ -266,6 +289,8 @@ func (s *Server) handleProcess(w http.ResponseWriter, r *http.Request) { func (s *Server) handleWebSocket(w http.ResponseWriter, r *http.Request) { id := r.URL.Path[len("/ws/"):] + log.Printf("Handling WebSocket connection for process: %s", id) + process, err := s.GetProcess(id) if err != nil { http.Error(w, err.Error(), http.StatusNotFound) @@ -279,11 +304,14 @@ func (s *Server) handleWebSocket(w http.ResponseWriter, r *http.Request) { } defer conn.Close() + log.Printf("WebSocket connection established for process: %s", id) + // Handle stdin go func() { for { _, message, err := conn.ReadMessage() if err != nil { + log.Printf("WebSocket stdin read error for process %s: %v", id, err) return } process.Stdin.Write(message) @@ -294,9 +322,9 @@ func (s *Server) handleWebSocket(w http.ResponseWriter, r *http.Request) { go func() { buf := make([]byte, 1024) for { - n, err := process.Stdout.Read(buf) if err != nil { + log.Printf("WebSocket stdout read error for process %s: %v", id, err) return } conn.WriteMessage(websocket.TextMessage, buf[:n]) @@ -309,6 +337,7 @@ func (s *Server) handleWebSocket(w http.ResponseWriter, r *http.Request) { for { n, err := process.Stderr.Read(buf) if err != nil { + log.Printf("WebSocket stderr read error for process %s: %v", id, err) return } conn.WriteMessage(websocket.TextMessage, buf[:n]) @@ -317,6 +346,7 @@ func (s *Server) handleWebSocket(w http.ResponseWriter, r *http.Request) { // Wait for process to exit process.Cmd.Wait() + log.Printf("Process %s exited", id) } // Add new handlers for group management @@ -351,3 +381,37 @@ func (s *Server) handleGroup(w http.ResponseWriter, r *http.Request) { http.Error(w, "Method not allowed", http.StatusMethodNotAllowed) } } + +func (s *Server) handleRun(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodPost { + http.Error(w, "Method not allowed", http.StatusMethodNotAllowed) + return + } + + log.Printf("Handling /run request") + + var req struct { + Command string `json:"command"` + Args []string `json:"args"` + Env []string `json:"env"` + } + if err := json.NewDecoder(r.Body).Decode(&req); err != nil { + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + + log.Printf("Executing one-time process: command=%s, args=%v", req.Command, req.Args) + + output, err := s.RunProcess(r.Context(), req.Command, req.Args, req.Env) + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + + log.Printf("One-time process completed with output length: %d", len(output)) + + w.Header().Set("Content-Type", "application/json") + json.NewEncoder(w).Encode(map[string]string{ + "output": output, + }) +}