diff --git a/.github/workflows/image.yml b/.github/workflows/image.yml
index ca9f70c..1c61e75 100644
--- a/.github/workflows/image.yml
+++ b/.github/workflows/image.yml
@@ -84,3 +84,77 @@ jobs:
           #tags: ${{ steps.prep.outputs.tags }}
           tags: ${{ steps.meta.outputs.tags }}
           labels: ${{ steps.meta.outputs.labels }}
+  mcpbox-build:
+    runs-on: ubuntu-latest
+    steps:
+      - name: Checkout
+        uses: actions/checkout@v4
+
+      - name: Prepare
+        id: prep
+        run: |
+          DOCKER_IMAGE=quay.io/mudler/localagi-mcpbox
+          # Use branch name as default
+          VERSION=${GITHUB_REF#refs/heads/}
+          BINARY_VERSION=$(git describe --always --tags --dirty)
+          SHORTREF=${GITHUB_SHA::8}
+          # If this is git tag, use the tag name as a docker tag
+          if [[ $GITHUB_REF == refs/tags/* ]]; then
+            VERSION=${GITHUB_REF#refs/tags/}
+          fi
+          TAGS="${DOCKER_IMAGE}:${VERSION},${DOCKER_IMAGE}:${SHORTREF}"
+          # If the VERSION looks like a version number, assume that
+          # this is the most recent version of the image and also
+          # tag it 'latest'.
+          if [[ $VERSION =~ ^[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}$ ]]; then
+            TAGS="$TAGS,${DOCKER_IMAGE}:latest"
+          fi
+          # Set output parameters.
+          echo ::set-output name=binary_version::${BINARY_VERSION}
+          echo ::set-output name=tags::${TAGS}
+          echo ::set-output name=docker_image::${DOCKER_IMAGE}
+      - name: Set up QEMU
+        uses: docker/setup-qemu-action@master
+        with:
+          platforms: all
+
+      - name: Set up Docker Buildx
+        id: buildx
+        uses: docker/setup-buildx-action@master
+
+      - name: Login to DockerHub
+        if: github.event_name != 'pull_request'
+        uses: docker/login-action@v3
+        with:
+          registry: quay.io
+          username: ${{ secrets.DOCKER_USERNAME }}
+          password: ${{ secrets.DOCKER_PASSWORD }}
+      - name: Extract metadata (tags, labels) for Docker
+        id: meta
+        uses: docker/metadata-action@902fa8ec7d6ecbf8d84d538b9b233a880e428804
+        with:
+          images: quay.io/mudler/localagi-mcpbox
+          tags: |
+            type=ref,event=branch,suffix=-{{date 'YYYYMMDDHHmmss'}}
+            type=semver,pattern={{raw}}
+            type=sha,suffix=-{{date 'YYYYMMDDHHmmss'}}
+            type=ref,event=branch
+          flavor: |
+            latest=auto
+            prefix=
+            suffix=
+
+      - name: Build
+        uses: docker/build-push-action@v6
+        with:
+          builder: ${{ steps.buildx.outputs.name }}
+          build-args: |
+            VERSION=${{ steps.prep.outputs.binary_version }}
+          context: ./
+          file: ./Dockerfile.mcpbox
+          #platforms: linux/amd64,linux/arm64
+          platforms: linux/amd64
+          push: true
+          #tags: ${{ steps.prep.outputs.tags }}
+          tags: ${{ steps.meta.outputs.tags }}
+          labels: ${{ steps.meta.outputs.labels }}
\ No newline at end of file
diff --git a/Dockerfile.mcpbox b/Dockerfile.mcpbox
new file mode 100644
index 0000000..e5bea4a
--- /dev/null
+++ b/Dockerfile.mcpbox
@@ -0,0 +1,47 @@
+# Build stage
+FROM golang:1.24-alpine AS builder
+
+# Install build dependencies
+RUN apk add --no-cache git
+
+# Set working directory
+WORKDIR /app
+
+# Copy go mod files
+COPY go.mod go.sum ./
+
+# Download dependencies
+RUN go mod download
+
+# Copy source code
+COPY . .
+
+# Build the application
+RUN CGO_ENABLED=0 GOOS=linux go build -o mcpbox ./cmd/mcpbox
+
+# Final stage
+FROM alpine:3.19
+
+# Install runtime dependencies
+RUN apk add --no-cache ca-certificates tzdata docker
+
+# Create non-root user
+#RUN adduser -D -g '' appuser
+
+# Set working directory
+WORKDIR /app
+
+# Copy binary from builder
+COPY --from=builder /app/mcpbox .
+
+# Use non-root user
+#USER appuser
+
+# Expose port
+EXPOSE 8080
+
+# Set entrypoint
+ENTRYPOINT ["/app/mcpbox"]
+
+# Default command
+CMD ["-addr", ":8080"]
diff --git a/Makefile b/Makefile
index 6edd743..9c978c7 100644
--- a/Makefile
+++ b/Makefile
@@ -1,15 +1,17 @@
 GOCMD?=go
 IMAGE_NAME?=webui
+MCPBOX_IMAGE_NAME?=mcpbox
 ROOT_DIR:=$(shell dirname $(realpath $(lastword $(MAKEFILE_LIST))))
 
-prepare-tests:
+prepare-tests: build-mcpbox
	docker compose up -d --build
+	docker run -d -v /var/run/docker.sock:/var/run/docker.sock --privileged -p 9090:8080 --rm -ti $(MCPBOX_IMAGE_NAME)
 
 cleanup-tests:
	docker compose down
 
 tests: prepare-tests
-	LOCALAGI_MODEL="gemma-3-12b-it-qat" LOCALAI_API_URL="http://localhost:8081" LOCALAGI_API_URL="http://localhost:8080" $(GOCMD) run github.com/onsi/ginkgo/v2/ginkgo --fail-fast -v -r ./...
+	LOCALAGI_MCPBOX_URL="http://localhost:9090" LOCALAGI_MODEL="gemma-3-12b-it-qat" LOCALAI_API_URL="http://localhost:8081" LOCALAGI_API_URL="http://localhost:8080" $(GOCMD) run github.com/onsi/ginkgo/v2/ginkgo --fail-fast -v -r ./...
 
 run-nokb:
	$(MAKE) run KBDISABLEINDEX=true
@@ -23,10 +25,16 @@ build: webui/react-ui/dist
 
 .PHONY: run
 run: webui/react-ui/dist
-	$(GOCMD) run ./
+	LOCALAGI_MCPBOX_URL="http://localhost:9090" $(GOCMD) run ./
 
 build-image:
	docker build -t $(IMAGE_NAME) -f Dockerfile.webui .
 
 image-push:
	docker push $(IMAGE_NAME)
+
+build-mcpbox:
+	docker build -t $(MCPBOX_IMAGE_NAME) -f Dockerfile.mcpbox .
+
+run-mcpbox:
+	docker run -v /var/run/docker.sock:/var/run/docker.sock --privileged -p 9090:8080 -ti $(MCPBOX_IMAGE_NAME)
\ No newline at end of file
diff --git a/cmd/mcpbox/main.go b/cmd/mcpbox/main.go
new file mode 100644
index 0000000..b39b1f3
--- /dev/null
+++ b/cmd/mcpbox/main.go
@@ -0,0 +1,38 @@
+package main
+
+import (
+	"flag"
+	"log"
+	"os"
+	"os/signal"
+	"syscall"
+
+	"github.com/mudler/LocalAGI/pkg/stdio"
+)
+
+func main() {
+	// Parse command line flags
+	addr := flag.String("addr", ":8080", "HTTP server address")
+	flag.Parse()
+
+	// Create and start the server
+	server := stdio.NewServer()
+
+	// Handle graceful shutdown
+	sigChan := make(chan os.Signal, 1)
+	signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
+
+	go func() {
+		log.Printf("Starting server on %s", *addr)
+		if err := server.Start(*addr); err != nil {
+			log.Fatalf("Failed to start server: %v", err)
+		}
+	}()
+
+	// Wait for shutdown signal
+	<-sigChan
+	log.Println("Shutting down server...")
+
+	// TODO: Implement graceful shutdown if needed
+	os.Exit(0)
+}
diff --git a/core/agent/agent.go b/core/agent/agent.go
index 1e27617..1aa8fdf 100644
--- a/core/agent/agent.go
+++ b/core/agent/agent.go
@@ -241,6 +241,7 @@ func (a *Agent) Stop() {
	a.Lock()
	defer a.Unlock()
	xlog.Debug("Stopping agent", "agent", a.Character.Name)
+	a.closeMCPSTDIOServers()
	a.context.Cancel()
 }
 
diff --git a/core/agent/mcp.go b/core/agent/mcp.go
index ee67822..0d3987b 100644
--- a/core/agent/mcp.go
+++ b/core/agent/mcp.go
@@ -3,12 +3,14 @@ package agent
 import (
	"context"
	"encoding/json"
-	"errors"
 
	mcp "github.com/metoro-io/mcp-golang"
	"github.com/metoro-io/mcp-golang/transport/http"
+	stdioTransport "github.com/metoro-io/mcp-golang/transport/stdio"
	"github.com/mudler/LocalAGI/core/types"
+	"github.com/mudler/LocalAGI/pkg/stdio"
	"github.com/mudler/LocalAGI/pkg/xlog"
+
	"github.com/sashabaranov/go-openai/jsonschema"
 )
 
@@ -19,6 +21,12 @@ type MCPServer struct {
	URL   string `json:"url"`
	Token string `json:"token"`
 }
+type MCPSTDIOServer struct {
+	Args []string `json:"args"`
+	Env  []string `json:"env"`
+	Cmd  string   `json:"cmd"`
+}
+
 type mcpAction struct {
	mcpClient       *mcp.Client
	inputSchema     ToolInputSchema
@@ -79,6 +87,68 @@ type ToolInputSchema struct {
	Required   []string `json:"required,omitempty"`
 }
 
+func (a *Agent) addTools(client *mcp.Client) (types.Actions, error) {
+
+	var generatedActions types.Actions
+	xlog.Debug("Initializing client")
+	// Initialize the client
+	response, e := client.Initialize(a.context)
+	if e != nil {
+		xlog.Error("Failed to initialize client", "error", e.Error())
+		return nil, e
+	}
+
+	xlog.Debug("Client initialized", "instructions", response.Instructions)
+
+	var cursor *string
+	for {
+		tools, err := client.ListTools(a.context, cursor)
+		if err != nil {
+			xlog.Error("Failed to list tools", "error", err.Error())
+			return nil, err
+		}
+
+		for _, t := range tools.Tools {
+			desc := ""
+			if t.Description != nil {
+				desc = *t.Description
+			}
+
+			xlog.Debug("Tool", "name", t.Name, "description", desc)
+
+			dat, err := json.Marshal(t.InputSchema)
+			if err != nil {
+				xlog.Error("Failed to marshal input schema", "error", err.Error())
+			}
+
+			xlog.Debug("Input schema", "tool", t.Name, "schema", string(dat))
+
+			// XXX: This is a wild guess, to verify (data types might be incompatible)
+			var inputSchema ToolInputSchema
+			err = json.Unmarshal(dat, &inputSchema)
+			if err != nil {
+				xlog.Error("Failed to unmarshal input schema", "error", err.Error())
+			}
+
+			// Create a new action with Client + tool
+			generatedActions = append(generatedActions, &mcpAction{
+				mcpClient:       client,
+				toolName:        t.Name,
+				inputSchema:     inputSchema,
+				toolDescription: desc,
+			})
+		}
+
+		if tools.NextCursor == nil {
+			break // No more pages
+		}
+		cursor = tools.NextCursor
+	}
+
+	return generatedActions, nil
+
+}
+
 func (a *Agent) initMCPActions() error {
	a.mcpActions = nil
 
@@ -86,6 +156,7 @@ func (a *Agent) initMCPActions() error {
 
	generatedActions := types.Actions{}
 
+	// MCP HTTP Servers
	for _, mcpServer := range a.options.mcpServers {
		transport := http.NewHTTPClientTransport("/mcp")
		transport.WithBaseURL(mcpServer.URL)
@@ -95,70 +166,60 @@
		// Create a new client
		client := mcp.NewClient(transport)
 
+		xlog.Debug("Adding tools for MCP server", "server", mcpServer)
+		actions, err := a.addTools(client)
+		if err != nil {
+			xlog.Error("Failed to add tools for MCP server", "server", mcpServer, "error", err.Error())
+		}
+		generatedActions = append(generatedActions, actions...)
+	}
-		xlog.Debug("Initializing client", "server", mcpServer.URL)
-		// Initialize the client
-		response, e := client.Initialize(a.context)
-		if e != nil {
-			xlog.Error("Failed to initialize client", "error", e.Error(), "server", mcpServer)
-			if err == nil {
-				err = e
-			} else {
-				err = errors.Join(err, e)
-			}
+	// MCP STDIO Servers
+
+	a.closeMCPSTDIOServers() // Make sure we stop all previous servers if any is active
+
+	if a.options.mcpPrepareScript != "" {
+		xlog.Debug("Preparing MCP box", "script", a.options.mcpPrepareScript)
+		client := stdio.NewClient(a.options.mcpBoxURL)
+		client.RunProcess(a.context, "/bin/bash", []string{"-c", a.options.mcpPrepareScript}, []string{})
+	}
+
+	for _, mcpStdioServer := range a.options.mcpStdioServers {
+		client := stdio.NewClient(a.options.mcpBoxURL)
+		p, err := client.CreateProcess(a.context,
+			mcpStdioServer.Cmd,
+			mcpStdioServer.Args,
+			mcpStdioServer.Env,
+			a.Character.Name)
+		if err != nil {
+			xlog.Error("Failed to create process", "error", err.Error())
+			continue
+		}
+		read, writer, err := client.GetProcessIO(p.ID)
+		if err != nil {
+			xlog.Error("Failed to get process IO", "error", err.Error())
			continue
		}
 
-		xlog.Debug("Client initialized: %v", response.Instructions)
+		transport := stdioTransport.NewStdioServerTransportWithIO(read, writer)
 
-		var cursor *string
-		for {
-			tools, err := client.ListTools(a.context, cursor)
-			if err != nil {
-				xlog.Error("Failed to list tools", "error", err.Error())
-				return err
-			}
+		// Create a new client
+		mcpClient := mcp.NewClient(transport)
 
-			for _, t := range tools.Tools {
-				desc := ""
-				if t.Description != nil {
-					desc = *t.Description
-				}
-
-				xlog.Debug("Tool", "mcpServer", mcpServer, "name", t.Name, "description", desc)
-
-				dat, err := json.Marshal(t.InputSchema)
-				if err != nil {
-					xlog.Error("Failed to marshal input schema", "error", err.Error())
-				}
-
-				xlog.Debug("Input schema", "mcpServer", mcpServer, "tool", t.Name, "schema", string(dat))
-
-				// XXX: This is a wild guess, to verify (data types might be incompatible)
-				var inputSchema ToolInputSchema
-				err = json.Unmarshal(dat, &inputSchema)
-				if err != nil {
-					xlog.Error("Failed to unmarshal input schema", "error", err.Error())
-				}
-
-				// Create a new action with Client + tool
-				generatedActions = append(generatedActions, &mcpAction{
-					mcpClient:       client,
-					toolName:        t.Name,
-					inputSchema:     inputSchema,
-					toolDescription: desc,
-				})
-			}
-
-			if tools.NextCursor == nil {
-				break // No more pages
-			}
-			cursor = tools.NextCursor
+		xlog.Debug("Adding tools for MCP server (stdio)", "server", mcpStdioServer)
+		actions, err := a.addTools(mcpClient)
+		if err != nil {
+			xlog.Error("Failed to add tools for MCP server", "server", mcpStdioServer, "error", err.Error())
		}
-
+		generatedActions = append(generatedActions, actions...)
	}
 
	a.mcpActions = generatedActions
	return err
 }
+
+func (a *Agent) closeMCPSTDIOServers() {
+	client := stdio.NewClient(a.options.mcpBoxURL)
+	client.StopGroup(a.Character.Name)
+}
diff --git a/core/agent/options.go b/core/agent/options.go
index 942d062..9f3f250 100644
--- a/core/agent/options.go
+++ b/core/agent/options.go
@@ -50,8 +50,10 @@ type options struct {
 
	conversationsPath string
 
-	mcpServers []MCPServer
-
+	mcpServers       []MCPServer
+	mcpStdioServers  []MCPSTDIOServer
+	mcpBoxURL        string
+	mcpPrepareScript string
	newConversationsSubscribers []func(openai.ChatCompletionMessage)
 
	observer Observer
@@ -207,6 +209,27 @@ func WithMCPServers(servers ...MCPServer) Option {
	}
 }
 
+func WithMCPSTDIOServers(servers ...MCPSTDIOServer) Option {
+	return func(o *options) error {
+		o.mcpStdioServers = servers
+		return nil
+	}
+}
+
+func WithMCPBoxURL(url string) Option {
+	return func(o *options) error {
+		o.mcpBoxURL = url
+		return nil
+	}
+}
+
+func WithMCPPrepareScript(script string) Option {
+	return func(o *options) error {
+		o.mcpPrepareScript = script
+		return nil
+	}
+}
+
 func WithLLMAPIURL(url string) Option {
	return func(o *options) error {
		o.LLMAPI.APIURL = url
diff --git a/core/state/config.go b/core/state/config.go
index c510384..9da851c 100644
--- a/core/state/config.go
+++ b/core/state/config.go
@@ -2,6 +2,8 @@ package state
 
 import (
	"encoding/json"
+	"fmt"
+	"strings"
 
	"github.com/mudler/LocalAGI/core/agent"
	"github.com/mudler/LocalAGI/core/types"
@@ -30,10 +32,13 @@ func (d DynamicPromptsConfig) ToMap() map[string]string {
 }
 
 type AgentConfig struct {
-	Connector      []ConnectorConfig      `json:"connectors" form:"connectors" `
-	Actions        []ActionsConfig        `json:"actions" form:"actions"`
-	DynamicPrompts []DynamicPromptsConfig `json:"dynamic_prompts" form:"dynamic_prompts"`
-	MCPServers     []agent.MCPServer      `json:"mcp_servers" form:"mcp_servers"`
+	Connector        []ConnectorConfig      `json:"connectors" form:"connectors" `
+	Actions          []ActionsConfig        `json:"actions" form:"actions"`
+	DynamicPrompts   []DynamicPromptsConfig `json:"dynamic_prompts" form:"dynamic_prompts"`
+	MCPServers       []agent.MCPServer      `json:"mcp_servers" form:"mcp_servers"`
+	MCPSTDIOServers  []agent.MCPSTDIOServer `json:"mcp_stdio_servers" form:"mcp_stdio_servers"`
+	MCPPrepareScript string                 `json:"mcp_prepare_script" form:"mcp_prepare_script"`
+	MCPBoxURL        string                 `json:"mcp_box_url" form:"mcp_box_url"`
 
	Description string `json:"description" form:"description"`
 
@@ -271,6 +276,22 @@ func NewAgentConfigMeta(
			HelpText:     "Number of concurrent tasks that can run in parallel",
			Tags:         config.Tags{Section: "AdvancedSettings"},
		},
+		{
+			Name:         "mcp_stdio_servers",
+			Label:        "MCP STDIO Servers",
+			Type:         "textarea",
+			DefaultValue: "",
+			HelpText:     "JSON configuration for MCP STDIO servers",
+			Tags:         config.Tags{Section: "AdvancedSettings"},
+		},
+		{
+			Name:         "mcp_prepare_script",
+			Label:        "MCP Prepare Script",
+			Type:         "textarea",
+			DefaultValue: "",
+			HelpText:     "Script to prepare the MCP box",
+			Tags:         config.Tags{Section: "AdvancedSettings"},
+		},
	},
	MCPServers: []config.Field{
		{
@@ -297,3 +318,148 @@ type Connector interface {
	AgentReasoningCallback() func(state types.ActionCurrentState) bool
	Start(a *agent.Agent)
 }
+
+// UnmarshalJSON implements json.Unmarshaler for AgentConfig
+func (a *AgentConfig) UnmarshalJSON(data []byte) error {
+	// Create a temporary type to avoid infinite recursion
+	type Alias AgentConfig
+	aux := &struct {
+		*Alias
+		MCPSTDIOServersConfig interface{} `json:"mcp_stdio_servers"`
+	}{
+		Alias: (*Alias)(a),
+	}
+
+	if err :=
json.Unmarshal(data, &aux); err != nil { + return err + } + + // Handle MCP STDIO servers configuration + if aux.MCPSTDIOServersConfig != nil { + switch v := aux.MCPSTDIOServersConfig.(type) { + case string: + // Parse string configuration + var mcpConfig struct { + MCPServers map[string]struct { + Command string `json:"command"` + Args []string `json:"args"` + Env map[string]string `json:"env"` + } `json:"mcpServers"` + } + + if err := json.Unmarshal([]byte(v), &mcpConfig); err != nil { + return fmt.Errorf("failed to parse MCP STDIO servers configuration: %w", err) + } + + a.MCPSTDIOServers = make([]agent.MCPSTDIOServer, 0, len(mcpConfig.MCPServers)) + for _, server := range mcpConfig.MCPServers { + // Convert env map to slice of "KEY=VALUE" strings + envSlice := make([]string, 0, len(server.Env)) + for k, v := range server.Env { + envSlice = append(envSlice, fmt.Sprintf("%s=%s", k, v)) + } + + a.MCPSTDIOServers = append(a.MCPSTDIOServers, agent.MCPSTDIOServer{ + Cmd: server.Command, + Args: server.Args, + Env: envSlice, + }) + } + case []interface{}: + // Parse array configuration + a.MCPSTDIOServers = make([]agent.MCPSTDIOServer, 0, len(v)) + for _, server := range v { + serverMap, ok := server.(map[string]interface{}) + if !ok { + return fmt.Errorf("invalid server configuration format") + } + + cmd, _ := serverMap["cmd"].(string) + args := make([]string, 0) + if argsInterface, ok := serverMap["args"].([]interface{}); ok { + for _, arg := range argsInterface { + if argStr, ok := arg.(string); ok { + args = append(args, argStr) + } + } + } + + env := make([]string, 0) + if envInterface, ok := serverMap["env"].([]interface{}); ok { + for _, e := range envInterface { + if envStr, ok := e.(string); ok { + env = append(env, envStr) + } + } + } + + a.MCPSTDIOServers = append(a.MCPSTDIOServers, agent.MCPSTDIOServer{ + Cmd: cmd, + Args: args, + Env: env, + }) + } + } + } + + return nil +} + +// MarshalJSON implements json.Marshaler for AgentConfig +func (a *AgentConfig) MarshalJSON() ([]byte, error) { + // Create a temporary type to avoid infinite recursion + type Alias AgentConfig + aux := &struct { + *Alias + MCPSTDIOServersConfig string `json:"mcp_stdio_servers,omitempty"` + }{ + Alias: (*Alias)(a), + } + + // Convert MCPSTDIOServers back to the expected JSON format + if len(a.MCPSTDIOServers) > 0 { + mcpConfig := struct { + MCPServers map[string]struct { + Command string `json:"command"` + Args []string `json:"args"` + Env map[string]string `json:"env"` + } `json:"mcpServers"` + }{ + MCPServers: make(map[string]struct { + Command string `json:"command"` + Args []string `json:"args"` + Env map[string]string `json:"env"` + }), + } + + // Convert each MCPSTDIOServer to the expected format + for i, server := range a.MCPSTDIOServers { + // Convert env slice back to map + envMap := make(map[string]string) + for _, env := range server.Env { + if parts := strings.SplitN(env, "=", 2); len(parts) == 2 { + envMap[parts[0]] = parts[1] + } + } + + mcpConfig.MCPServers[fmt.Sprintf("server%d", i)] = struct { + Command string `json:"command"` + Args []string `json:"args"` + Env map[string]string `json:"env"` + }{ + Command: server.Cmd, + Args: server.Args, + Env: envMap, + } + } + + // Marshal the MCP config to JSON string + mcpConfigJSON, err := json.Marshal(mcpConfig) + if err != nil { + return nil, fmt.Errorf("failed to marshal MCP STDIO servers configuration: %w", err) + } + aux.MCPSTDIOServersConfig = string(mcpConfigJSON) + } + + return json.Marshal(aux) +} diff --git a/core/state/pool.go 
b/core/state/pool.go index 5511760..5f4f6ef 100644 --- a/core/state/pool.go +++ b/core/state/pool.go @@ -33,6 +33,7 @@ type AgentPool struct { managers map[string]sse.Manager agentStatus map[string]*Status apiURL, defaultModel, defaultMultimodalModel string + mcpBoxURL string imageModel, localRAGAPI, localRAGKey, apiKey string availableActions func(*AgentConfig) func(ctx context.Context, pool *AgentPool) []types.Action connectors func(*AgentConfig) []Connector @@ -72,7 +73,7 @@ func loadPoolFromFile(path string) (*AgentPoolData, error) { } func NewAgentPool( - defaultModel, defaultMultimodalModel, imageModel, apiURL, apiKey, directory string, + defaultModel, defaultMultimodalModel, imageModel, apiURL, apiKey, directory, mcpBoxURL string, LocalRAGAPI string, availableActions func(*AgentConfig) func(ctx context.Context, pool *AgentPool) []types.Action, connectors func(*AgentConfig) []Connector, @@ -98,6 +99,7 @@ func NewAgentPool( apiURL: apiURL, defaultModel: defaultModel, defaultMultimodalModel: defaultMultimodalModel, + mcpBoxURL: mcpBoxURL, imageModel: imageModel, localRAGAPI: LocalRAGAPI, apiKey: apiKey, @@ -123,6 +125,7 @@ func NewAgentPool( pooldir: directory, defaultModel: defaultModel, defaultMultimodalModel: defaultMultimodalModel, + mcpBoxURL: mcpBoxURL, imageModel: imageModel, apiKey: apiKey, agents: make(map[string]*Agent), @@ -336,6 +339,10 @@ func (a *AgentPool) startAgentWithConfig(name string, config *AgentConfig, obs O model = config.Model } + if config.MCPBoxURL != "" { + a.mcpBoxURL = config.MCPBoxURL + } + if config.PeriodicRuns == "" { config.PeriodicRuns = "10m" } @@ -396,7 +403,10 @@ func (a *AgentPool) startAgentWithConfig(name string, config *AgentConfig, obs O WithMCPServers(config.MCPServers...), WithPeriodicRuns(config.PeriodicRuns), WithPermanentGoal(config.PermanentGoal), + WithMCPSTDIOServers(config.MCPSTDIOServers...), + WithMCPBoxURL(a.mcpBoxURL), WithPrompts(promptBlocks...), + WithMCPPrepareScript(config.MCPPrepareScript), // WithDynamicPrompts(dynamicPrompts...), WithCharacter(Character{ Name: name, diff --git a/docker-compose.yaml b/docker-compose.yaml index ba10c72..9728b53 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -46,12 +46,30 @@ services: image: busybox command: ["sh", "-c", "until wget -q -O - http://localrecall:8080 > /dev/null 2>&1; do echo 'Waiting for localrecall...'; sleep 1; done; echo 'localrecall is up!'"] + mcpbox: + build: + context: . + dockerfile: Dockerfile.mcpbox + ports: + - "8080" + volumes: + - ./volumes/mcpbox:/app/data + # share docker socket if you want it to be able to run docker commands + - /var/run/docker.sock:/var/run/docker.sock + healthcheck: + test: ["CMD", "wget", "-q", "-O", "-", "http://localhost:8080/processes"] + interval: 30s + timeout: 10s + retries: 3 + localagi: depends_on: localai: condition: service_healthy localrecall-healthcheck: condition: service_completed_successfully + mcpbox: + condition: service_healthy build: context: . 
dockerfile: Dockerfile.webui @@ -68,6 +86,7 @@ services: - LOCALAGI_STATE_DIR=/pool - LOCALAGI_TIMEOUT=5m - LOCALAGI_ENABLE_CONVERSATIONS_LOGGING=false + - LOCALAGI_MCPBOX_URL=http://mcpbox:8080 extra_hosts: - "host.docker.internal:host-gateway" volumes: diff --git a/main.go b/main.go index d375486..c6bb66b 100644 --- a/main.go +++ b/main.go @@ -23,6 +23,7 @@ var apiKeysEnv = os.Getenv("LOCALAGI_API_KEYS") var imageModel = os.Getenv("LOCALAGI_IMAGE_MODEL") var conversationDuration = os.Getenv("LOCALAGI_CONVERSATION_DURATION") var localOperatorBaseURL = os.Getenv("LOCALOPERATOR_BASE_URL") +var mcpboxURL = os.Getenv("LOCALAGI_MCPBOX_URL") func init() { if baseModel == "" { @@ -61,6 +62,7 @@ func main() { apiURL, apiKey, stateDir, + mcpboxURL, localRAG, services.Actions(map[string]string{ "browser-agent-runner-base-url": localOperatorBaseURL, diff --git a/pkg/stdio/client.go b/pkg/stdio/client.go new file mode 100644 index 0000000..19f70c3 --- /dev/null +++ b/pkg/stdio/client.go @@ -0,0 +1,325 @@ +package stdio + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "io" + "log" + "net/http" + "net/url" + "sync" + "time" + + "github.com/gorilla/websocket" +) + +// Client implements the transport.Interface for stdio processes +type Client struct { + baseURL string + processes map[string]*Process + groups map[string][]string + mu sync.RWMutex +} + +// NewClient creates a new stdio transport client +func NewClient(baseURL string) *Client { + return &Client{ + baseURL: baseURL, + processes: make(map[string]*Process), + groups: make(map[string][]string), + } +} + +// CreateProcess starts a new process in a group +func (c *Client) CreateProcess(ctx context.Context, command string, args []string, env []string, groupID string) (*Process, error) { + log.Printf("Creating process: command=%s, args=%v, groupID=%s", command, args, groupID) + + req := struct { + Command string `json:"command"` + Args []string `json:"args"` + Env []string `json:"env"` + GroupID string `json:"group_id"` + }{ + Command: command, + Args: args, + Env: env, + GroupID: groupID, + } + + reqBody, err := json.Marshal(req) + if err != nil { + return nil, fmt.Errorf("failed to marshal request: %w", err) + } + + url := fmt.Sprintf("%s/processes", c.baseURL) + log.Printf("Sending POST request to %s", url) + + resp, err := http.Post(url, "application/json", bytes.NewReader(reqBody)) + if err != nil { + return nil, fmt.Errorf("failed to start process: %w", err) + } + defer resp.Body.Close() + + log.Printf("Received response with status: %d", resp.StatusCode) + + var result struct { + ID string `json:"id"` + } + + body, _ := io.ReadAll(resp.Body) + + if err := json.Unmarshal(body, &result); err != nil { + return nil, fmt.Errorf("failed to decode response: %w. 
body: %s", err, string(body)) + } + + log.Printf("Successfully created process with ID: %s", result.ID) + + process := &Process{ + ID: result.ID, + GroupID: groupID, + CreatedAt: time.Now(), + } + + c.mu.Lock() + c.processes[process.ID] = process + if groupID != "" { + c.groups[groupID] = append(c.groups[groupID], process.ID) + } + c.mu.Unlock() + + return process, nil +} + +// GetProcess returns a process by ID +func (c *Client) GetProcess(id string) (*Process, error) { + c.mu.RLock() + process, exists := c.processes[id] + c.mu.RUnlock() + + if !exists { + return nil, fmt.Errorf("process not found: %s", id) + } + + return process, nil +} + +// GetGroupProcesses returns all processes in a group +func (c *Client) GetGroupProcesses(groupID string) ([]*Process, error) { + c.mu.RLock() + processIDs, exists := c.groups[groupID] + if !exists { + c.mu.RUnlock() + return nil, fmt.Errorf("group not found: %s", groupID) + } + + processes := make([]*Process, 0, len(processIDs)) + for _, pid := range processIDs { + if process, exists := c.processes[pid]; exists { + processes = append(processes, process) + } + } + c.mu.RUnlock() + + return processes, nil +} + +// StopProcess stops a single process +func (c *Client) StopProcess(id string) error { + c.mu.Lock() + process, exists := c.processes[id] + if !exists { + c.mu.Unlock() + return fmt.Errorf("process not found: %s", id) + } + + // Remove from group if it exists + if process.GroupID != "" { + groupProcesses := c.groups[process.GroupID] + for i, pid := range groupProcesses { + if pid == id { + c.groups[process.GroupID] = append(groupProcesses[:i], groupProcesses[i+1:]...) + break + } + } + if len(c.groups[process.GroupID]) == 0 { + delete(c.groups, process.GroupID) + } + } + + delete(c.processes, id) + c.mu.Unlock() + + req, err := http.NewRequest( + "DELETE", + fmt.Sprintf("%s/processes/%s", c.baseURL, id), + nil, + ) + if err != nil { + return fmt.Errorf("failed to create request: %w", err) + } + + resp, err := http.DefaultClient.Do(req) + if err != nil { + return fmt.Errorf("failed to stop process: %w", err) + } + resp.Body.Close() + + return nil +} + +// StopGroup stops all processes in a group +func (c *Client) StopGroup(groupID string) error { + c.mu.Lock() + processIDs, exists := c.groups[groupID] + if !exists { + c.mu.Unlock() + return fmt.Errorf("group not found: %s", groupID) + } + c.mu.Unlock() + + for _, pid := range processIDs { + if err := c.StopProcess(pid); err != nil { + return fmt.Errorf("failed to stop process %s in group %s: %w", pid, groupID, err) + } + } + + return nil +} + +// ListGroups returns all group IDs +func (c *Client) ListGroups() []string { + c.mu.RLock() + defer c.mu.RUnlock() + + groups := make([]string, 0, len(c.groups)) + for groupID := range c.groups { + groups = append(groups, groupID) + } + return groups +} + +// GetProcessIO returns io.Reader and io.Writer for a process +func (c *Client) GetProcessIO(id string) (io.Reader, io.Writer, error) { + log.Printf("Getting IO for process: %s", id) + + process, err := c.GetProcess(id) + if err != nil { + return nil, nil, err + } + + // Parse the base URL to get the host + baseURL, err := url.Parse(c.baseURL) + if err != nil { + return nil, nil, fmt.Errorf("failed to parse base URL: %w", err) + } + + // Connect to WebSocket + u := url.URL{ + Scheme: "ws", + Host: baseURL.Host, + Path: fmt.Sprintf("/ws/%s", process.ID), + } + + log.Printf("Connecting to WebSocket at: %s", u.String()) + + conn, _, err := websocket.DefaultDialer.Dial(u.String(), nil) + if err != nil { + 
return nil, nil, fmt.Errorf("failed to connect to WebSocket: %w", err) + } + + log.Printf("Successfully connected to WebSocket for process: %s", id) + + // Create reader and writer + reader := &websocketReader{conn: conn} + writer := &websocketWriter{conn: conn} + + return reader, writer, nil +} + +// websocketReader implements io.Reader for WebSocket +type websocketReader struct { + conn *websocket.Conn +} + +func (r *websocketReader) Read(p []byte) (n int, err error) { + _, message, err := r.conn.ReadMessage() + if err != nil { + return 0, err + } + n = copy(p, message) + return n, nil +} + +// websocketWriter implements io.Writer for WebSocket +type websocketWriter struct { + conn *websocket.Conn +} + +func (w *websocketWriter) Write(p []byte) (n int, err error) { + // Use BinaryMessage type for better compatibility + err = w.conn.WriteMessage(websocket.BinaryMessage, p) + if err != nil { + return 0, fmt.Errorf("failed to write WebSocket message: %w", err) + } + return len(p), nil +} + +// Close closes all connections and stops all processes +func (c *Client) Close() error { + c.mu.Lock() + defer c.mu.Unlock() + + // Stop all processes + for id := range c.processes { + if err := c.StopProcess(id); err != nil { + return fmt.Errorf("failed to stop process %s: %w", id, err) + } + } + + return nil +} + +// RunProcess executes a command and returns its output +func (c *Client) RunProcess(ctx context.Context, command string, args []string, env []string) (string, error) { + log.Printf("Running one-time process: command=%s, args=%v", command, args) + + req := struct { + Command string `json:"command"` + Args []string `json:"args"` + Env []string `json:"env"` + }{ + Command: command, + Args: args, + Env: env, + } + + reqBody, err := json.Marshal(req) + if err != nil { + return "", fmt.Errorf("failed to marshal request: %w", err) + } + + url := fmt.Sprintf("%s/run", c.baseURL) + log.Printf("Sending POST request to %s", url) + + resp, err := http.Post(url, "application/json", bytes.NewReader(reqBody)) + if err != nil { + return "", fmt.Errorf("failed to execute process: %w", err) + } + defer resp.Body.Close() + + log.Printf("Received response with status: %d", resp.StatusCode) + + var result struct { + Output string `json:"output"` + } + + if err := json.NewDecoder(resp.Body).Decode(&result); err != nil { + body, _ := io.ReadAll(resp.Body) + return "", fmt.Errorf("failed to decode response: %w. body: %s", err, string(body)) + } + + log.Printf("Successfully executed process with output length: %d", len(result.Output)) + return result.Output, nil +} diff --git a/pkg/stdio/client_suite_test.go b/pkg/stdio/client_suite_test.go new file mode 100644 index 0000000..d24dba5 --- /dev/null +++ b/pkg/stdio/client_suite_test.go @@ -0,0 +1,28 @@ +package stdio + +import ( + "os" + "testing" + + . "github.com/onsi/ginkgo/v2" + . 
"github.com/onsi/gomega" +) + +func TestSTDIOTransport(t *testing.T) { + RegisterFailHandler(Fail) + RunSpecs(t, "STDIOTransport test suite") +} + +var baseURL string + +func init() { + baseURL = os.Getenv("LOCALAGI_MCPBOX_URL") + if baseURL == "" { + baseURL = "http://localhost:8080" + } +} + +var _ = AfterSuite(func() { + client := NewClient(baseURL) + client.StopGroup("test-group") +}) diff --git a/pkg/stdio/client_test.go b/pkg/stdio/client_test.go new file mode 100644 index 0000000..4b67d26 --- /dev/null +++ b/pkg/stdio/client_test.go @@ -0,0 +1,235 @@ +package stdio + +import ( + "context" + "time" + + mcp "github.com/metoro-io/mcp-golang" + "github.com/metoro-io/mcp-golang/transport/stdio" + "github.com/mudler/LocalAGI/pkg/xlog" + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" +) + +var _ = Describe("Client", func() { + var ( + client *Client + ) + + BeforeEach(func() { + client = NewClient(baseURL) + }) + + AfterEach(func() { + if client != nil { + Expect(client.Close()).To(Succeed()) + } + }) + + Context("Process Management", func() { + It("should create and stop a process", func() { + ctx := context.Background() + // Use a command that doesn't exit immediately + process, err := client.CreateProcess(ctx, "sh", []string{"-c", "echo 'Hello, World!'; sleep 10"}, []string{}, "test-group") + Expect(err).NotTo(HaveOccurred()) + Expect(process).NotTo(BeNil()) + Expect(process.ID).NotTo(BeEmpty()) + + // Get process IO + reader, writer, err := client.GetProcessIO(process.ID) + Expect(err).NotTo(HaveOccurred()) + Expect(reader).NotTo(BeNil()) + Expect(writer).NotTo(BeNil()) + + // Write to process + _, err = writer.Write([]byte("test input\n")) + Expect(err).NotTo(HaveOccurred()) + + // Read from process with timeout + buf := make([]byte, 1024) + readDone := make(chan struct{}) + var readErr error + var readN int + + go func() { + readN, readErr = reader.Read(buf) + close(readDone) + }() + + // Wait for read with timeout + select { + case <-readDone: + Expect(readErr).NotTo(HaveOccurred()) + Expect(readN).To(BeNumerically(">", 0)) + Expect(string(buf[:readN])).To(ContainSubstring("Hello, World!")) + case <-time.After(5 * time.Second): + Fail("Timeout waiting for process output") + } + + // Stop the process + err = client.StopProcess(process.ID) + Expect(err).NotTo(HaveOccurred()) + }) + + It("should manage process groups", func() { + ctx := context.Background() + groupID := "test-group" + + // Create multiple processes in the same group + process1, err := client.CreateProcess(ctx, "sh", []string{"-c", "echo 'Process 1'; sleep 1"}, []string{}, groupID) + Expect(err).NotTo(HaveOccurred()) + Expect(process1).NotTo(BeNil()) + + process2, err := client.CreateProcess(ctx, "sh", []string{"-c", "echo 'Process 2'; sleep 1"}, []string{}, groupID) + Expect(err).NotTo(HaveOccurred()) + Expect(process2).NotTo(BeNil()) + + // Get group processes + processes, err := client.GetGroupProcesses(groupID) + Expect(err).NotTo(HaveOccurred()) + Expect(processes).To(HaveLen(2)) + + // List groups + groups := client.ListGroups() + Expect(groups).To(ContainElement(groupID)) + + // Stop the group + err = client.StopGroup(groupID) + Expect(err).NotTo(HaveOccurred()) + }) + + It("should run a one-time process", func() { + ctx := context.Background() + output, err := client.RunProcess(ctx, "echo", []string{"One-time process"}, []string{}) + Expect(err).NotTo(HaveOccurred()) + Expect(output).To(ContainSubstring("One-time process")) + }) + + It("should handle process with environment variables", func() { + 
ctx := context.Background() + env := []string{"TEST_VAR=test_value"} + process, err := client.CreateProcess(ctx, "sh", []string{"-c", "env | grep TEST_VAR; sleep 1"}, env, "test-group") + Expect(err).NotTo(HaveOccurred()) + Expect(process).NotTo(BeNil()) + + // Get process IO + reader, _, err := client.GetProcessIO(process.ID) + Expect(err).NotTo(HaveOccurred()) + + // Read environment variables with timeout + buf := make([]byte, 1024) + readDone := make(chan struct{}) + var readErr error + var readN int + + go func() { + readN, readErr = reader.Read(buf) + close(readDone) + }() + + // Wait for read with timeout + select { + case <-readDone: + Expect(readErr).NotTo(HaveOccurred()) + Expect(readN).To(BeNumerically(">", 0)) + Expect(string(buf[:readN])).To(ContainSubstring("TEST_VAR=test_value")) + case <-time.After(5 * time.Second): + Fail("Timeout waiting for process output") + } + + // Stop the process + err = client.StopProcess(process.ID) + Expect(err).NotTo(HaveOccurred()) + }) + + It("should handle long-running processes", func() { + ctx := context.Background() + process, err := client.CreateProcess(ctx, "sh", []string{"-c", "echo 'Starting long process'; sleep 5"}, []string{}, "test-group") + Expect(err).NotTo(HaveOccurred()) + Expect(process).NotTo(BeNil()) + + // Get process IO + reader, _, err := client.GetProcessIO(process.ID) + Expect(err).NotTo(HaveOccurred()) + + // Read initial output + buf := make([]byte, 1024) + readDone := make(chan struct{}) + var readErr error + var readN int + + go func() { + readN, readErr = reader.Read(buf) + close(readDone) + }() + + // Wait for read with timeout + select { + case <-readDone: + Expect(readErr).NotTo(HaveOccurred()) + Expect(readN).To(BeNumerically(">", 0)) + Expect(string(buf[:readN])).To(ContainSubstring("Starting long process")) + case <-time.After(5 * time.Second): + Fail("Timeout waiting for process output") + } + + // Wait a bit to ensure process is running + time.Sleep(time.Second) + + // Stop the process + err = client.StopProcess(process.ID) + Expect(err).NotTo(HaveOccurred()) + }) + + It("MCP", func() { + ctx := context.Background() + process, err := client.CreateProcess(ctx, + "docker", []string{"run", "-i", "--rm", "-e", "GITHUB_PERSONAL_ACCESS_TOKEN", "ghcr.io/github/github-mcp-server"}, + []string{"GITHUB_PERSONAL_ACCESS_TOKEN=test"}, "test-group") + Expect(err).NotTo(HaveOccurred()) + Expect(process).NotTo(BeNil()) + Expect(process.ID).NotTo(BeEmpty()) + + defer client.StopProcess(process.ID) + + // MCP client + + read, writer, err := client.GetProcessIO(process.ID) + Expect(err).NotTo(HaveOccurred()) + Expect(read).NotTo(BeNil()) + Expect(writer).NotTo(BeNil()) + + transport := stdio.NewStdioServerTransportWithIO(read, writer) + + // Create a new client + mcpClient := mcp.NewClient(transport) + // Initialize the client + response, e := mcpClient.Initialize(ctx) + Expect(e).NotTo(HaveOccurred()) + Expect(response).NotTo(BeNil()) + + Expect(mcpClient.Ping(ctx)).To(Succeed()) + + xlog.Debug("Client initialized: %v", response.Instructions) + + alltools := []mcp.ToolRetType{} + var cursor *string + for { + tools, err := mcpClient.ListTools(ctx, cursor) + Expect(err).NotTo(HaveOccurred()) + Expect(tools).NotTo(BeNil()) + Expect(tools.Tools).NotTo(BeEmpty()) + alltools = append(alltools, tools.Tools...) 
+ + if tools.NextCursor == nil { + break // No more pages + } + cursor = tools.NextCursor + } + + for _, tool := range alltools { + xlog.Debug("Tool: %v", tool) + } + }) + }) +}) diff --git a/pkg/stdio/server.go b/pkg/stdio/server.go new file mode 100644 index 0000000..2941937 --- /dev/null +++ b/pkg/stdio/server.go @@ -0,0 +1,473 @@ +package stdio + +import ( + "context" + "encoding/json" + "fmt" + "io" + "log" + "net/http" + "os" + "os/exec" + "sync" + "time" + + "github.com/gorilla/websocket" + "github.com/mudler/LocalAGI/pkg/xlog" +) + +// Process represents a running process with its stdio streams +type Process struct { + ID string + GroupID string + Cmd *exec.Cmd + Stdin io.WriteCloser + Stdout io.ReadCloser + Stderr io.ReadCloser + CreatedAt time.Time +} + +// Server handles process management and stdio streaming +type Server struct { + processes map[string]*Process + groups map[string][]string // maps group ID to process IDs + mu sync.RWMutex + upgrader websocket.Upgrader +} + +// NewServer creates a new stdio server +func NewServer() *Server { + return &Server{ + processes: make(map[string]*Process), + groups: make(map[string][]string), + upgrader: websocket.Upgrader{}, + } +} + +// StartProcess starts a new process and returns its ID +func (s *Server) StartProcess(ctx context.Context, command string, args []string, env []string, groupID string) (string, error) { + xlog.Debug("Starting process", "command", command, "args", args, "groupID", groupID) + + cmd := exec.CommandContext(ctx, command, args...) + + if len(env) > 0 { + cmd.Env = append(os.Environ(), env...) + xlog.Debug("Process environment", "env", cmd.Env) + } + + stdin, err := cmd.StdinPipe() + if err != nil { + return "", fmt.Errorf("failed to create stdin pipe: %w", err) + } + + stdout, err := cmd.StdoutPipe() + if err != nil { + return "", fmt.Errorf("failed to create stdout pipe: %w", err) + } + + stderr, err := cmd.StderrPipe() + if err != nil { + return "", fmt.Errorf("failed to create stderr pipe: %w", err) + } + + if err := cmd.Start(); err != nil { + return "", fmt.Errorf("failed to start process: %w", err) + } + + process := &Process{ + ID: fmt.Sprintf("%d", time.Now().UnixNano()), + GroupID: groupID, + Cmd: cmd, + Stdin: stdin, + Stdout: stdout, + Stderr: stderr, + CreatedAt: time.Now(), + } + + s.mu.Lock() + s.processes[process.ID] = process + if groupID != "" { + s.groups[groupID] = append(s.groups[groupID], process.ID) + } + s.mu.Unlock() + + xlog.Debug("Successfully started process", "id", process.ID, "pid", cmd.Process.Pid) + return process.ID, nil +} + +// StopProcess stops a running process +func (s *Server) StopProcess(id string) error { + s.mu.Lock() + process, exists := s.processes[id] + if !exists { + s.mu.Unlock() + return fmt.Errorf("process not found: %s", id) + } + + xlog.Debug("Stopping process", "processID", id, "pid", process.Cmd.Process.Pid) + + // Remove from group if it exists + if process.GroupID != "" { + groupProcesses := s.groups[process.GroupID] + for i, pid := range groupProcesses { + if pid == id { + s.groups[process.GroupID] = append(groupProcesses[:i], groupProcesses[i+1:]...) 
+ break + } + } + if len(s.groups[process.GroupID]) == 0 { + delete(s.groups, process.GroupID) + } + } + + delete(s.processes, id) + s.mu.Unlock() + + if err := process.Cmd.Process.Kill(); err != nil { + xlog.Debug("Failed to kill process", "processID", id, "pid", process.Cmd.Process.Pid, "error", err) + return fmt.Errorf("failed to kill process: %w", err) + } + + xlog.Debug("Successfully killed process", "processID", id, "pid", process.Cmd.Process.Pid) + return nil +} + +// StopGroup stops all processes in a group +func (s *Server) StopGroup(groupID string) error { + s.mu.Lock() + processIDs, exists := s.groups[groupID] + if !exists { + s.mu.Unlock() + return fmt.Errorf("group not found: %s", groupID) + } + s.mu.Unlock() + + for _, pid := range processIDs { + if err := s.StopProcess(pid); err != nil { + return fmt.Errorf("failed to stop process %s in group %s: %w", pid, groupID, err) + } + } + + return nil +} + +// GetGroupProcesses returns all processes in a group +func (s *Server) GetGroupProcesses(groupID string) ([]*Process, error) { + s.mu.RLock() + processIDs, exists := s.groups[groupID] + if !exists { + s.mu.RUnlock() + return nil, fmt.Errorf("group not found: %s", groupID) + } + + processes := make([]*Process, 0, len(processIDs)) + for _, pid := range processIDs { + if process, exists := s.processes[pid]; exists { + processes = append(processes, process) + } + } + s.mu.RUnlock() + + return processes, nil +} + +// ListGroups returns all group IDs +func (s *Server) ListGroups() []string { + s.mu.RLock() + defer s.mu.RUnlock() + + groups := make([]string, 0, len(s.groups)) + for groupID := range s.groups { + groups = append(groups, groupID) + } + return groups +} + +// GetProcess returns a process by ID +func (s *Server) GetProcess(id string) (*Process, error) { + s.mu.RLock() + process, exists := s.processes[id] + s.mu.RUnlock() + + if !exists { + return nil, fmt.Errorf("process not found: %s", id) + } + + return process, nil +} + +// ListProcesses returns all running processes +func (s *Server) ListProcesses() []*Process { + s.mu.RLock() + defer s.mu.RUnlock() + + processes := make([]*Process, 0, len(s.processes)) + for _, p := range s.processes { + processes = append(processes, p) + } + + return processes +} + +// RunProcess executes a command and returns its output +func (s *Server) RunProcess(ctx context.Context, command string, args []string, env []string) (string, error) { + cmd := exec.CommandContext(ctx, command, args...) + + if len(env) > 0 { + cmd.Env = append(os.Environ(), env...) 
+ } + + output, err := cmd.CombinedOutput() + if err != nil { + return string(output), fmt.Errorf("process failed: %w", err) + } + + return string(output), nil +} + +// Start starts the HTTP server +func (s *Server) Start(addr string) error { + http.HandleFunc("/processes", s.handleProcesses) + http.HandleFunc("/processes/", s.handleProcess) + http.HandleFunc("/ws/", s.handleWebSocket) + http.HandleFunc("/groups", s.handleGroups) + http.HandleFunc("/groups/", s.handleGroup) + http.HandleFunc("/run", s.handleRun) + + return http.ListenAndServe(addr, nil) +} + +func (s *Server) handleProcesses(w http.ResponseWriter, r *http.Request) { + log.Printf("Handling /processes request: method=%s", r.Method) + + switch r.Method { + case http.MethodGet: + processes := s.ListProcesses() + json.NewEncoder(w).Encode(processes) + case http.MethodPost: + var req struct { + Command string `json:"command"` + Args []string `json:"args"` + Env []string `json:"env"` + GroupID string `json:"group_id"` + } + if err := json.NewDecoder(r.Body).Decode(&req); err != nil { + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + + id, err := s.StartProcess(context.Background(), req.Command, req.Args, req.Env, req.GroupID) + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + + w.Header().Set("Content-Type", "application/json") + json.NewEncoder(w).Encode(map[string]string{"id": id}) + default: + http.Error(w, "Method not allowed", http.StatusMethodNotAllowed) + } +} + +func (s *Server) handleProcess(w http.ResponseWriter, r *http.Request) { + id := r.URL.Path[len("/processes/"):] + + switch r.Method { + case http.MethodGet: + process, err := s.GetProcess(id) + if err != nil { + http.Error(w, err.Error(), http.StatusNotFound) + return + } + json.NewEncoder(w).Encode(process) + case http.MethodDelete: + if err := s.StopProcess(id); err != nil { + http.Error(w, err.Error(), http.StatusNotFound) + return + } + w.WriteHeader(http.StatusNoContent) + default: + http.Error(w, "Method not allowed", http.StatusMethodNotAllowed) + } +} + +func (s *Server) handleWebSocket(w http.ResponseWriter, r *http.Request) { + id := r.URL.Path[len("/ws/"):] + xlog.Debug("Handling WebSocket connection", "processID", id) + + process, err := s.GetProcess(id) + if err != nil { + http.Error(w, err.Error(), http.StatusNotFound) + return + } + + if process.Cmd.ProcessState != nil && process.Cmd.ProcessState.Exited() { + xlog.Debug("Process already exited", "processID", id) + http.Error(w, "Process already exited", http.StatusGone) + return + } + + xlog.Debug("Process is running", "processID", id, "pid", process.Cmd.Process.Pid) + + conn, err := s.upgrader.Upgrade(w, r, nil) + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + defer conn.Close() + + xlog.Debug("WebSocket connection established", "processID", id) + + // Create a done channel to signal process completion + done := make(chan struct{}) + + // Handle stdin + go func() { + defer func() { + select { + case <-done: + xlog.Debug("Process stdin handler done", "processID", id) + default: + xlog.Debug("WebSocket stdin connection closed", "processID", id) + } + }() + + for { + _, message, err := conn.ReadMessage() + if err != nil { + if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseNormalClosure) { + xlog.Debug("WebSocket stdin unexpected error", "processID", id, "error", err) + } + return + } + xlog.Debug("Received message", "processID", id, "message", string(message)) + if 
_, err := process.Stdin.Write(message); err != nil { + if err != io.EOF { + xlog.Debug("WebSocket stdin write error", "processID", id, "error", err) + } + return + } + xlog.Debug("Message sent to process", "processID", id, "message", string(message)) + } + }() + + // Handle stdout and stderr + go func() { + defer func() { + select { + case <-done: + xlog.Debug("Process output handler done", "processID", id) + default: + xlog.Debug("WebSocket output connection closed", "processID", id) + } + }() + + // Create a buffer for reading + buf := make([]byte, 4096) + reader := io.MultiReader(process.Stdout, process.Stderr) + + for { + n, err := reader.Read(buf) + if err != nil { + if err != io.EOF { + xlog.Debug("Read error", "processID", id, "error", err) + } + return + } + + if n > 0 { + xlog.Debug("Sending message", "processID", id, "size", n) + if err := conn.WriteMessage(websocket.BinaryMessage, buf[:n]); err != nil { + if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseNormalClosure) { + xlog.Debug("WebSocket output write error", "processID", id, "error", err) + } + return + } + xlog.Debug("Message sent to client", "processID", id, "size", n) + } + } + }() + + // Wait for process to exit + xlog.Debug("Waiting for process to exit", "processID", id) + err = process.Cmd.Wait() + close(done) // Signal that the process is done + + if err != nil { + xlog.Debug("Process exited with error", + "processID", id, + "pid", process.Cmd.Process.Pid, + "error", err) + } else { + xlog.Debug("Process exited successfully", + "processID", id, + "pid", process.Cmd.Process.Pid) + } +} + +// Add new handlers for group management +func (s *Server) handleGroups(w http.ResponseWriter, r *http.Request) { + switch r.Method { + case http.MethodGet: + groups := s.ListGroups() + json.NewEncoder(w).Encode(groups) + default: + http.Error(w, "Method not allowed", http.StatusMethodNotAllowed) + } +} + +func (s *Server) handleGroup(w http.ResponseWriter, r *http.Request) { + groupID := r.URL.Path[len("/groups/"):] + + switch r.Method { + case http.MethodGet: + processes, err := s.GetGroupProcesses(groupID) + if err != nil { + http.Error(w, err.Error(), http.StatusNotFound) + return + } + json.NewEncoder(w).Encode(processes) + case http.MethodDelete: + if err := s.StopGroup(groupID); err != nil { + http.Error(w, err.Error(), http.StatusNotFound) + return + } + w.WriteHeader(http.StatusNoContent) + default: + http.Error(w, "Method not allowed", http.StatusMethodNotAllowed) + } +} + +func (s *Server) handleRun(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodPost { + http.Error(w, "Method not allowed", http.StatusMethodNotAllowed) + return + } + + log.Printf("Handling /run request") + + var req struct { + Command string `json:"command"` + Args []string `json:"args"` + Env []string `json:"env"` + } + if err := json.NewDecoder(r.Body).Decode(&req); err != nil { + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + + log.Printf("Executing one-time process: command=%s, args=%v", req.Command, req.Args) + + output, err := s.RunProcess(r.Context(), req.Command, req.Args, req.Env) + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + + log.Printf("One-time process completed with output length: %d", len(output)) + + w.Header().Set("Content-Type", "application/json") + json.NewEncoder(w).Encode(map[string]string{ + "output": output, + }) +}
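
For reviewers: the sketch below shows how the pieces introduced in this PR are meant to compose. The `pkg/stdio` client spawns a process inside a running mcpbox, and the WebSocket-backed reader/writer pair it returns is handed to the mcp-golang stdio transport, which is what `initMCPActions` now does for `mcp_stdio_servers`. It only uses APIs added in this diff; the mcpbox URL (`http://localhost:9090`, as published by `make run-mcpbox`), the group name, and the placeholder token are assumptions, and the GitHub MCP server image is the one exercised in `client_test.go`.

```go
package main

import (
	"context"
	"fmt"
	"log"

	mcp "github.com/metoro-io/mcp-golang"
	stdiotransport "github.com/metoro-io/mcp-golang/transport/stdio"
	"github.com/mudler/LocalAGI/pkg/stdio"
)

func main() {
	ctx := context.Background()

	// Assumes an mcpbox started with `make run-mcpbox` (published on localhost:9090).
	client := stdio.NewClient("http://localhost:9090")
	defer client.Close()

	// Start an MCP server inside the box; the group ID lets the caller stop
	// everything it owns with a single StopGroup call (as the agent does on Stop()).
	process, err := client.CreateProcess(ctx,
		"docker",
		[]string{"run", "-i", "--rm", "-e", "GITHUB_PERSONAL_ACCESS_TOKEN", "ghcr.io/github/github-mcp-server"},
		[]string{"GITHUB_PERSONAL_ACCESS_TOKEN=<placeholder>"}, // placeholder token, not a real credential
		"example-group")
	if err != nil {
		log.Fatalf("create process: %v", err)
	}
	defer client.StopGroup("example-group")

	// The reader/writer pair is what the MCP stdio transport speaks over.
	reader, writer, err := client.GetProcessIO(process.ID)
	if err != nil {
		log.Fatalf("get process IO: %v", err)
	}

	transport := stdiotransport.NewStdioServerTransportWithIO(reader, writer)
	mcpClient := mcp.NewClient(transport)

	if _, err := mcpClient.Initialize(ctx); err != nil {
		log.Fatalf("initialize MCP client: %v", err)
	}

	// List the first page of tools exposed by the server.
	tools, err := mcpClient.ListTools(ctx, nil)
	if err != nil {
		log.Fatalf("list tools: %v", err)
	}
	for _, t := range tools.Tools {
		fmt.Println("tool:", t.Name)
	}
}
```

If the box is not reachable at that address, point the sketch (or the agent) at the right host via `LOCALAGI_MCPBOX_URL`, the same variable wired up in the Makefile and docker-compose changes above.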