add one-time process, attach to UI the mcp server json configuration

Signed-off-by: mudler <mudler@localai.io>
This commit is contained in:
mudler
2025-04-23 18:28:54 +02:00
parent 0e1106eaf5
commit bd903a3f33
4 changed files with 302 additions and 12 deletions

View File

@@ -1,5 +1,5 @@
# Build stage # Build stage
FROM golang:1.21-alpine AS builder FROM golang:1.24-alpine AS builder
# Install build dependencies # Install build dependencies
RUN apk add --no-cache git RUN apk add --no-cache git
@@ -23,10 +23,10 @@ RUN CGO_ENABLED=0 GOOS=linux go build -o mcpbox ./cmd/mcpbox
FROM alpine:3.19 FROM alpine:3.19
# Install runtime dependencies # Install runtime dependencies
RUN apk add --no-cache ca-certificates tzdata RUN apk add --no-cache ca-certificates tzdata docker
# Create non-root user # Create non-root user
RUN adduser -D -g '' appuser #RUN adduser -D -g '' appuser
# Set working directory # Set working directory
WORKDIR /app WORKDIR /app
@@ -35,7 +35,7 @@ WORKDIR /app
COPY --from=builder /app/mcpbox . COPY --from=builder /app/mcpbox .
# Use non-root user # Use non-root user
USER appuser #USER appuser
# Expose port # Expose port
EXPOSE 8080 EXPOSE 8080

View File

@@ -2,6 +2,8 @@ package state
import ( import (
"encoding/json" "encoding/json"
"fmt"
"strings"
"github.com/mudler/LocalAGI/core/agent" "github.com/mudler/LocalAGI/core/agent"
"github.com/mudler/LocalAGI/core/types" "github.com/mudler/LocalAGI/core/types"
@@ -273,6 +275,22 @@ func NewAgentConfigMeta(
HelpText: "Number of concurrent tasks that can run in parallel", HelpText: "Number of concurrent tasks that can run in parallel",
Tags: config.Tags{Section: "AdvancedSettings"}, Tags: config.Tags{Section: "AdvancedSettings"},
}, },
{
Name: "mcp_stdio_servers",
Label: "MCP STDIO Servers",
Type: "textarea",
DefaultValue: "",
HelpText: "JSON configuration for MCP STDIO servers",
Tags: config.Tags{Section: "AdvancedSettings"},
},
{
Name: "mcp_prepare_script",
Label: "MCP Prepare Script",
Type: "textarea",
DefaultValue: "",
HelpText: "Script to prepare the MCP box",
Tags: config.Tags{Section: "AdvancedSettings"},
},
}, },
MCPServers: []config.Field{ MCPServers: []config.Field{
{ {
@@ -299,3 +317,148 @@ type Connector interface {
AgentReasoningCallback() func(state types.ActionCurrentState) bool AgentReasoningCallback() func(state types.ActionCurrentState) bool
Start(a *agent.Agent) Start(a *agent.Agent)
} }
// UnmarshalJSON implements json.Unmarshaler for AgentConfig
func (a *AgentConfig) UnmarshalJSON(data []byte) error {
// Create a temporary type to avoid infinite recursion
type Alias AgentConfig
aux := &struct {
*Alias
MCPSTDIOServersConfig interface{} `json:"mcp_stdio_servers"`
}{
Alias: (*Alias)(a),
}
if err := json.Unmarshal(data, &aux); err != nil {
return err
}
// Handle MCP STDIO servers configuration
if aux.MCPSTDIOServersConfig != nil {
switch v := aux.MCPSTDIOServersConfig.(type) {
case string:
// Parse string configuration
var mcpConfig struct {
MCPServers map[string]struct {
Command string `json:"command"`
Args []string `json:"args"`
Env map[string]string `json:"env"`
} `json:"mcpServers"`
}
if err := json.Unmarshal([]byte(v), &mcpConfig); err != nil {
return fmt.Errorf("failed to parse MCP STDIO servers configuration: %w", err)
}
a.MCPSTDIOServers = make([]agent.MCPSTDIOServer, 0, len(mcpConfig.MCPServers))
for _, server := range mcpConfig.MCPServers {
// Convert env map to slice of "KEY=VALUE" strings
envSlice := make([]string, 0, len(server.Env))
for k, v := range server.Env {
envSlice = append(envSlice, fmt.Sprintf("%s=%s", k, v))
}
a.MCPSTDIOServers = append(a.MCPSTDIOServers, agent.MCPSTDIOServer{
Cmd: server.Command,
Args: server.Args,
Env: envSlice,
})
}
case []interface{}:
// Parse array configuration
a.MCPSTDIOServers = make([]agent.MCPSTDIOServer, 0, len(v))
for _, server := range v {
serverMap, ok := server.(map[string]interface{})
if !ok {
return fmt.Errorf("invalid server configuration format")
}
cmd, _ := serverMap["cmd"].(string)
args := make([]string, 0)
if argsInterface, ok := serverMap["args"].([]interface{}); ok {
for _, arg := range argsInterface {
if argStr, ok := arg.(string); ok {
args = append(args, argStr)
}
}
}
env := make([]string, 0)
if envInterface, ok := serverMap["env"].([]interface{}); ok {
for _, e := range envInterface {
if envStr, ok := e.(string); ok {
env = append(env, envStr)
}
}
}
a.MCPSTDIOServers = append(a.MCPSTDIOServers, agent.MCPSTDIOServer{
Cmd: cmd,
Args: args,
Env: env,
})
}
}
}
return nil
}
// MarshalJSON implements json.Marshaler for AgentConfig
func (a *AgentConfig) MarshalJSON() ([]byte, error) {
// Create a temporary type to avoid infinite recursion
type Alias AgentConfig
aux := &struct {
*Alias
MCPSTDIOServersConfig string `json:"mcp_stdio_servers,omitempty"`
}{
Alias: (*Alias)(a),
}
// Convert MCPSTDIOServers back to the expected JSON format
if len(a.MCPSTDIOServers) > 0 {
mcpConfig := struct {
MCPServers map[string]struct {
Command string `json:"command"`
Args []string `json:"args"`
Env map[string]string `json:"env"`
} `json:"mcpServers"`
}{
MCPServers: make(map[string]struct {
Command string `json:"command"`
Args []string `json:"args"`
Env map[string]string `json:"env"`
}),
}
// Convert each MCPSTDIOServer to the expected format
for i, server := range a.MCPSTDIOServers {
// Convert env slice back to map
envMap := make(map[string]string)
for _, env := range server.Env {
if parts := strings.SplitN(env, "=", 2); len(parts) == 2 {
envMap[parts[0]] = parts[1]
}
}
mcpConfig.MCPServers[fmt.Sprintf("server%d", i)] = struct {
Command string `json:"command"`
Args []string `json:"args"`
Env map[string]string `json:"env"`
}{
Command: server.Cmd,
Args: server.Args,
Env: envMap,
}
}
// Marshal the MCP config to JSON string
mcpConfigJSON, err := json.Marshal(mcpConfig)
if err != nil {
return nil, fmt.Errorf("failed to marshal MCP STDIO servers configuration: %w", err)
}
aux.MCPSTDIOServersConfig = string(mcpConfigJSON)
}
return json.Marshal(aux)
}

View File

@@ -6,6 +6,7 @@ import (
"encoding/json" "encoding/json"
"fmt" "fmt"
"io" "io"
"log"
"net/http" "net/http"
"net/url" "net/url"
"sync" "sync"
@@ -33,6 +34,8 @@ func NewClient(baseURL string) *Client {
// CreateProcess starts a new process in a group // CreateProcess starts a new process in a group
func (c *Client) CreateProcess(ctx context.Context, command string, args []string, env []string, groupID string) (*Process, error) { func (c *Client) CreateProcess(ctx context.Context, command string, args []string, env []string, groupID string) (*Process, error) {
log.Printf("Creating process: command=%s, args=%v, groupID=%s", command, args, groupID)
req := struct { req := struct {
Command string `json:"command"` Command string `json:"command"`
Args []string `json:"args"` Args []string `json:"args"`
@@ -50,23 +53,28 @@ func (c *Client) CreateProcess(ctx context.Context, command string, args []strin
return nil, fmt.Errorf("failed to marshal request: %w", err) return nil, fmt.Errorf("failed to marshal request: %w", err)
} }
resp, err := http.Post( url := fmt.Sprintf("%s/processes", c.baseURL)
fmt.Sprintf("%s/processes", c.baseURL), log.Printf("Sending POST request to %s", url)
"application/json",
bytes.NewReader(reqBody), resp, err := http.Post(url, "application/json", bytes.NewReader(reqBody))
)
if err != nil { if err != nil {
return nil, fmt.Errorf("failed to start process: %w", err) return nil, fmt.Errorf("failed to start process: %w", err)
} }
defer resp.Body.Close() defer resp.Body.Close()
log.Printf("Received response with status: %d", resp.StatusCode)
var result struct { var result struct {
ID string `json:"id"` ID string `json:"id"`
} }
if err := json.NewDecoder(resp.Body).Decode(&result); err != nil { if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
return nil, fmt.Errorf("failed to decode response: %w", err) body, _ := io.ReadAll(resp.Body)
return nil, fmt.Errorf("failed to decode response: %w. body: %s", err, string(body))
} }
log.Printf("Successfully created process with ID: %s", result.ID)
process := &Process{ process := &Process{
ID: result.ID, ID: result.ID,
GroupID: groupID, GroupID: groupID,
@@ -193,23 +201,35 @@ func (c *Client) ListGroups() []string {
// GetProcessIO returns io.Reader and io.Writer for a process // GetProcessIO returns io.Reader and io.Writer for a process
func (c *Client) GetProcessIO(id string) (io.Reader, io.Writer, error) { func (c *Client) GetProcessIO(id string) (io.Reader, io.Writer, error) {
log.Printf("Getting IO for process: %s", id)
process, err := c.GetProcess(id) process, err := c.GetProcess(id)
if err != nil { if err != nil {
return nil, nil, err return nil, nil, err
} }
// Parse the base URL to get the host
baseURL, err := url.Parse(c.baseURL)
if err != nil {
return nil, nil, fmt.Errorf("failed to parse base URL: %w", err)
}
// Connect to WebSocket // Connect to WebSocket
u := url.URL{ u := url.URL{
Scheme: "ws", Scheme: "ws",
Host: c.baseURL, Host: baseURL.Host,
Path: fmt.Sprintf("/ws/%s", process.ID), Path: fmt.Sprintf("/ws/%s", process.ID),
} }
log.Printf("Connecting to WebSocket at: %s", u.String())
conn, _, err := websocket.DefaultDialer.Dial(u.String(), nil) conn, _, err := websocket.DefaultDialer.Dial(u.String(), nil)
if err != nil { if err != nil {
return nil, nil, fmt.Errorf("failed to connect to WebSocket: %w", err) return nil, nil, fmt.Errorf("failed to connect to WebSocket: %w", err)
} }
log.Printf("Successfully connected to WebSocket for process: %s", id)
// Create reader and writer // Create reader and writer
reader := &websocketReader{conn: conn} reader := &websocketReader{conn: conn}
writer := &websocketWriter{conn: conn} writer := &websocketWriter{conn: conn}
@@ -258,3 +278,46 @@ func (c *Client) Close() error {
return nil return nil
} }
// RunProcess executes a command and returns its output
func (c *Client) RunProcess(ctx context.Context, command string, args []string, env []string) (string, error) {
log.Printf("Running one-time process: command=%s, args=%v", command, args)
req := struct {
Command string `json:"command"`
Args []string `json:"args"`
Env []string `json:"env"`
}{
Command: command,
Args: args,
Env: env,
}
reqBody, err := json.Marshal(req)
if err != nil {
return "", fmt.Errorf("failed to marshal request: %w", err)
}
url := fmt.Sprintf("%s/run", c.baseURL)
log.Printf("Sending POST request to %s", url)
resp, err := http.Post(url, "application/json", bytes.NewReader(reqBody))
if err != nil {
return "", fmt.Errorf("failed to execute process: %w", err)
}
defer resp.Body.Close()
log.Printf("Received response with status: %d", resp.StatusCode)
var result struct {
Output string `json:"output"`
}
if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
body, _ := io.ReadAll(resp.Body)
return "", fmt.Errorf("failed to decode response: %w. body: %s", err, string(body))
}
log.Printf("Successfully executed process with output length: %d", len(result.Output))
return result.Output, nil
}

View File

@@ -5,6 +5,7 @@ import (
"encoding/json" "encoding/json"
"fmt" "fmt"
"io" "io"
"log"
"net/http" "net/http"
"os" "os"
"os/exec" "os/exec"
@@ -46,6 +47,8 @@ func NewServer() *Server {
// StartProcess starts a new process and returns its ID // StartProcess starts a new process and returns its ID
func (s *Server) StartProcess(ctx context.Context, command string, args []string, env []string, groupID string) (string, error) { func (s *Server) StartProcess(ctx context.Context, command string, args []string, env []string, groupID string) (string, error) {
log.Printf("Starting process: command=%s, args=%v, groupID=%s", command, args, groupID)
cmd := exec.CommandContext(ctx, command, args...) cmd := exec.CommandContext(ctx, command, args...)
if len(env) > 0 { if len(env) > 0 {
@@ -88,6 +91,7 @@ func (s *Server) StartProcess(ctx context.Context, command string, args []string
} }
s.mu.Unlock() s.mu.Unlock()
log.Printf("Successfully started process with ID: %s", process.ID)
return process.ID, nil return process.ID, nil
} }
@@ -201,6 +205,22 @@ func (s *Server) ListProcesses() []*Process {
return processes return processes
} }
// RunProcess executes a command and returns its output
func (s *Server) RunProcess(ctx context.Context, command string, args []string, env []string) (string, error) {
cmd := exec.CommandContext(ctx, command, args...)
if len(env) > 0 {
cmd.Env = append(os.Environ(), env...)
}
output, err := cmd.CombinedOutput()
if err != nil {
return string(output), fmt.Errorf("process failed: %w", err)
}
return string(output), nil
}
// Start starts the HTTP server // Start starts the HTTP server
func (s *Server) Start(addr string) error { func (s *Server) Start(addr string) error {
http.HandleFunc("/processes", s.handleProcesses) http.HandleFunc("/processes", s.handleProcesses)
@@ -208,11 +228,14 @@ func (s *Server) Start(addr string) error {
http.HandleFunc("/ws/", s.handleWebSocket) http.HandleFunc("/ws/", s.handleWebSocket)
http.HandleFunc("/groups", s.handleGroups) http.HandleFunc("/groups", s.handleGroups)
http.HandleFunc("/groups/", s.handleGroup) http.HandleFunc("/groups/", s.handleGroup)
http.HandleFunc("/run", s.handleRun)
return http.ListenAndServe(addr, nil) return http.ListenAndServe(addr, nil)
} }
func (s *Server) handleProcesses(w http.ResponseWriter, r *http.Request) { func (s *Server) handleProcesses(w http.ResponseWriter, r *http.Request) {
log.Printf("Handling /processes request: method=%s", r.Method)
switch r.Method { switch r.Method {
case http.MethodGet: case http.MethodGet:
processes := s.ListProcesses() processes := s.ListProcesses()
@@ -266,6 +289,8 @@ func (s *Server) handleProcess(w http.ResponseWriter, r *http.Request) {
func (s *Server) handleWebSocket(w http.ResponseWriter, r *http.Request) { func (s *Server) handleWebSocket(w http.ResponseWriter, r *http.Request) {
id := r.URL.Path[len("/ws/"):] id := r.URL.Path[len("/ws/"):]
log.Printf("Handling WebSocket connection for process: %s", id)
process, err := s.GetProcess(id) process, err := s.GetProcess(id)
if err != nil { if err != nil {
http.Error(w, err.Error(), http.StatusNotFound) http.Error(w, err.Error(), http.StatusNotFound)
@@ -279,11 +304,14 @@ func (s *Server) handleWebSocket(w http.ResponseWriter, r *http.Request) {
} }
defer conn.Close() defer conn.Close()
log.Printf("WebSocket connection established for process: %s", id)
// Handle stdin // Handle stdin
go func() { go func() {
for { for {
_, message, err := conn.ReadMessage() _, message, err := conn.ReadMessage()
if err != nil { if err != nil {
log.Printf("WebSocket stdin read error for process %s: %v", id, err)
return return
} }
process.Stdin.Write(message) process.Stdin.Write(message)
@@ -294,9 +322,9 @@ func (s *Server) handleWebSocket(w http.ResponseWriter, r *http.Request) {
go func() { go func() {
buf := make([]byte, 1024) buf := make([]byte, 1024)
for { for {
n, err := process.Stdout.Read(buf) n, err := process.Stdout.Read(buf)
if err != nil { if err != nil {
log.Printf("WebSocket stdout read error for process %s: %v", id, err)
return return
} }
conn.WriteMessage(websocket.TextMessage, buf[:n]) conn.WriteMessage(websocket.TextMessage, buf[:n])
@@ -309,6 +337,7 @@ func (s *Server) handleWebSocket(w http.ResponseWriter, r *http.Request) {
for { for {
n, err := process.Stderr.Read(buf) n, err := process.Stderr.Read(buf)
if err != nil { if err != nil {
log.Printf("WebSocket stderr read error for process %s: %v", id, err)
return return
} }
conn.WriteMessage(websocket.TextMessage, buf[:n]) conn.WriteMessage(websocket.TextMessage, buf[:n])
@@ -317,6 +346,7 @@ func (s *Server) handleWebSocket(w http.ResponseWriter, r *http.Request) {
// Wait for process to exit // Wait for process to exit
process.Cmd.Wait() process.Cmd.Wait()
log.Printf("Process %s exited", id)
} }
// Add new handlers for group management // Add new handlers for group management
@@ -351,3 +381,37 @@ func (s *Server) handleGroup(w http.ResponseWriter, r *http.Request) {
http.Error(w, "Method not allowed", http.StatusMethodNotAllowed) http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
} }
} }
func (s *Server) handleRun(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodPost {
http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
return
}
log.Printf("Handling /run request")
var req struct {
Command string `json:"command"`
Args []string `json:"args"`
Env []string `json:"env"`
}
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
log.Printf("Executing one-time process: command=%s, args=%v", req.Command, req.Args)
output, err := s.RunProcess(r.Context(), req.Command, req.Args, req.Env)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
log.Printf("One-time process completed with output length: %d", len(output))
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(map[string]string{
"output": output,
})
}