Compare commits
18 Commits
obs-detail
...
mcp
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
9bea6c7322 | ||
|
|
0ab7a615f0 | ||
|
|
b4b77c564e | ||
|
|
24c3e41d27 | ||
|
|
ebf26ebb64 | ||
|
|
3e80bd9608 | ||
|
|
34d0954171 | ||
|
|
9b7344fbdf | ||
|
|
608d7aba85 | ||
|
|
bd903a3f33 | ||
|
|
0e1106eaf5 | ||
|
|
e9cd6a1073 | ||
|
|
4b727116cd | ||
|
|
3ebeb82f0d | ||
|
|
6d9c58e6c0 | ||
|
|
02490ea8a2 | ||
|
|
eec88d74fe | ||
|
|
33b8aaddfe |
74
.github/workflows/image.yml
vendored
74
.github/workflows/image.yml
vendored
@@ -84,3 +84,77 @@ jobs:
|
|||||||
#tags: ${{ steps.prep.outputs.tags }}
|
#tags: ${{ steps.prep.outputs.tags }}
|
||||||
tags: ${{ steps.meta.outputs.tags }}
|
tags: ${{ steps.meta.outputs.tags }}
|
||||||
labels: ${{ steps.meta.outputs.labels }}
|
labels: ${{ steps.meta.outputs.labels }}
|
||||||
|
mcpbox-build:
|
||||||
|
runs-on: ubuntu-latest
|
||||||
|
steps:
|
||||||
|
- name: Checkout
|
||||||
|
uses: actions/checkout@v4
|
||||||
|
|
||||||
|
- name: Prepare
|
||||||
|
id: prep
|
||||||
|
run: |
|
||||||
|
DOCKER_IMAGE=quay.io/mudler/localagi-mcpbox
|
||||||
|
# Use branch name as default
|
||||||
|
VERSION=${GITHUB_REF#refs/heads/}
|
||||||
|
BINARY_VERSION=$(git describe --always --tags --dirty)
|
||||||
|
SHORTREF=${GITHUB_SHA::8}
|
||||||
|
# If this is git tag, use the tag name as a docker tag
|
||||||
|
if [[ $GITHUB_REF == refs/tags/* ]]; then
|
||||||
|
VERSION=${GITHUB_REF#refs/tags/}
|
||||||
|
fi
|
||||||
|
TAGS="${DOCKER_IMAGE}:${VERSION},${DOCKER_IMAGE}:${SHORTREF}"
|
||||||
|
# If the VERSION looks like a version number, assume that
|
||||||
|
# this is the most recent version of the image and also
|
||||||
|
# tag it 'latest'.
|
||||||
|
if [[ $VERSION =~ ^[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}$ ]]; then
|
||||||
|
TAGS="$TAGS,${DOCKER_IMAGE}:latest"
|
||||||
|
fi
|
||||||
|
# Set output parameters.
|
||||||
|
echo ::set-output name=binary_version::${BINARY_VERSION}
|
||||||
|
echo ::set-output name=tags::${TAGS}
|
||||||
|
echo ::set-output name=docker_image::${DOCKER_IMAGE}
|
||||||
|
- name: Set up QEMU
|
||||||
|
uses: docker/setup-qemu-action@master
|
||||||
|
with:
|
||||||
|
platforms: all
|
||||||
|
|
||||||
|
- name: Set up Docker Buildx
|
||||||
|
id: buildx
|
||||||
|
uses: docker/setup-buildx-action@master
|
||||||
|
|
||||||
|
- name: Login to DockerHub
|
||||||
|
if: github.event_name != 'pull_request'
|
||||||
|
uses: docker/login-action@v3
|
||||||
|
with:
|
||||||
|
registry: quay.io
|
||||||
|
username: ${{ secrets.DOCKER_USERNAME }}
|
||||||
|
password: ${{ secrets.DOCKER_PASSWORD }}
|
||||||
|
- name: Extract metadata (tags, labels) for Docker
|
||||||
|
id: meta
|
||||||
|
uses: docker/metadata-action@902fa8ec7d6ecbf8d84d538b9b233a880e428804
|
||||||
|
with:
|
||||||
|
images: quay.io/mudler/localagi-mcpbox
|
||||||
|
tags: |
|
||||||
|
type=ref,event=branch,suffix=-{{date 'YYYYMMDDHHmmss'}}
|
||||||
|
type=semver,pattern={{raw}}
|
||||||
|
type=sha,suffix=-{{date 'YYYYMMDDHHmmss'}}
|
||||||
|
type=ref,event=branch
|
||||||
|
flavor: |
|
||||||
|
latest=auto
|
||||||
|
prefix=
|
||||||
|
suffix=
|
||||||
|
|
||||||
|
- name: Build
|
||||||
|
uses: docker/build-push-action@v6
|
||||||
|
with:
|
||||||
|
builder: ${{ steps.buildx.outputs.name }}
|
||||||
|
build-args: |
|
||||||
|
VERSION=${{ steps.prep.outputs.binary_version }}
|
||||||
|
context: ./
|
||||||
|
file: ./Dockerfile.mcpbox
|
||||||
|
#platforms: linux/amd64,linux/arm64
|
||||||
|
platforms: linux/amd64
|
||||||
|
push: true
|
||||||
|
#tags: ${{ steps.prep.outputs.tags }}
|
||||||
|
tags: ${{ steps.meta.outputs.tags }}
|
||||||
|
labels: ${{ steps.meta.outputs.labels }}
|
||||||
47
Dockerfile.mcpbox
Normal file
47
Dockerfile.mcpbox
Normal file
@@ -0,0 +1,47 @@
|
|||||||
|
# Build stage
|
||||||
|
FROM golang:1.24-alpine AS builder
|
||||||
|
|
||||||
|
# Install build dependencies
|
||||||
|
RUN apk add --no-cache git
|
||||||
|
|
||||||
|
# Set working directory
|
||||||
|
WORKDIR /app
|
||||||
|
|
||||||
|
# Copy go mod files
|
||||||
|
COPY go.mod go.sum ./
|
||||||
|
|
||||||
|
# Download dependencies
|
||||||
|
RUN go mod download
|
||||||
|
|
||||||
|
# Copy source code
|
||||||
|
COPY . .
|
||||||
|
|
||||||
|
# Build the application
|
||||||
|
RUN CGO_ENABLED=0 GOOS=linux go build -o mcpbox ./cmd/mcpbox
|
||||||
|
|
||||||
|
# Final stage
|
||||||
|
FROM alpine:3.19
|
||||||
|
|
||||||
|
# Install runtime dependencies
|
||||||
|
RUN apk add --no-cache ca-certificates tzdata docker
|
||||||
|
|
||||||
|
# Create non-root user
|
||||||
|
#RUN adduser -D -g '' appuser
|
||||||
|
|
||||||
|
# Set working directory
|
||||||
|
WORKDIR /app
|
||||||
|
|
||||||
|
# Copy binary from builder
|
||||||
|
COPY --from=builder /app/mcpbox .
|
||||||
|
|
||||||
|
# Use non-root user
|
||||||
|
#USER appuser
|
||||||
|
|
||||||
|
# Expose port
|
||||||
|
EXPOSE 8080
|
||||||
|
|
||||||
|
# Set entrypoint
|
||||||
|
ENTRYPOINT ["/app/mcpbox"]
|
||||||
|
|
||||||
|
# Default command
|
||||||
|
CMD ["-addr", ":8080"]
|
||||||
14
Makefile
14
Makefile
@@ -1,15 +1,17 @@
|
|||||||
GOCMD?=go
|
GOCMD?=go
|
||||||
IMAGE_NAME?=webui
|
IMAGE_NAME?=webui
|
||||||
|
MCPBOX_IMAGE_NAME?=mcpbox
|
||||||
ROOT_DIR:=$(shell dirname $(realpath $(lastword $(MAKEFILE_LIST))))
|
ROOT_DIR:=$(shell dirname $(realpath $(lastword $(MAKEFILE_LIST))))
|
||||||
|
|
||||||
prepare-tests:
|
prepare-tests: build-mcpbox
|
||||||
docker compose up -d --build
|
docker compose up -d --build
|
||||||
|
docker run -d -v /var/run/docker.sock:/var/run/docker.sock --privileged -p 9090:8080 --rm -ti $(MCPBOX_IMAGE_NAME)
|
||||||
|
|
||||||
cleanup-tests:
|
cleanup-tests:
|
||||||
docker compose down
|
docker compose down
|
||||||
|
|
||||||
tests: prepare-tests
|
tests: prepare-tests
|
||||||
LOCALAGI_MODEL="gemma-3-12b-it-qat" LOCALAI_API_URL="http://localhost:8081" LOCALAGI_API_URL="http://localhost:8080" $(GOCMD) run github.com/onsi/ginkgo/v2/ginkgo --fail-fast -v -r ./...
|
LOCALAGI_MCPBOX_URL="http://localhost:9090" LOCALAGI_MODEL="gemma-3-12b-it-qat" LOCALAI_API_URL="http://localhost:8081" LOCALAGI_API_URL="http://localhost:8080" $(GOCMD) run github.com/onsi/ginkgo/v2/ginkgo --fail-fast -v -r ./...
|
||||||
|
|
||||||
run-nokb:
|
run-nokb:
|
||||||
$(MAKE) run KBDISABLEINDEX=true
|
$(MAKE) run KBDISABLEINDEX=true
|
||||||
@@ -23,10 +25,16 @@ build: webui/react-ui/dist
|
|||||||
|
|
||||||
.PHONY: run
|
.PHONY: run
|
||||||
run: webui/react-ui/dist
|
run: webui/react-ui/dist
|
||||||
$(GOCMD) run ./
|
LOCALAGI_MCPBOX_URL="http://localhost:9090" $(GOCMD) run ./
|
||||||
|
|
||||||
build-image:
|
build-image:
|
||||||
docker build -t $(IMAGE_NAME) -f Dockerfile.webui .
|
docker build -t $(IMAGE_NAME) -f Dockerfile.webui .
|
||||||
|
|
||||||
image-push:
|
image-push:
|
||||||
docker push $(IMAGE_NAME)
|
docker push $(IMAGE_NAME)
|
||||||
|
|
||||||
|
build-mcpbox:
|
||||||
|
docker build -t $(MCPBOX_IMAGE_NAME) -f Dockerfile.mcpbox .
|
||||||
|
|
||||||
|
run-mcpbox:
|
||||||
|
docker run -v /var/run/docker.sock:/var/run/docker.sock --privileged -p 9090:8080 -ti mcpbox
|
||||||
38
cmd/mcpbox/main.go
Normal file
38
cmd/mcpbox/main.go
Normal file
@@ -0,0 +1,38 @@
|
|||||||
|
package main
|
||||||
|
|
||||||
|
import (
|
||||||
|
"flag"
|
||||||
|
"log"
|
||||||
|
"os"
|
||||||
|
"os/signal"
|
||||||
|
"syscall"
|
||||||
|
|
||||||
|
"github.com/mudler/LocalAGI/pkg/stdio"
|
||||||
|
)
|
||||||
|
|
||||||
|
func main() {
|
||||||
|
// Parse command line flags
|
||||||
|
addr := flag.String("addr", ":8080", "HTTP server address")
|
||||||
|
flag.Parse()
|
||||||
|
|
||||||
|
// Create and start the server
|
||||||
|
server := stdio.NewServer()
|
||||||
|
|
||||||
|
// Handle graceful shutdown
|
||||||
|
sigChan := make(chan os.Signal, 1)
|
||||||
|
signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
|
||||||
|
|
||||||
|
go func() {
|
||||||
|
log.Printf("Starting server on %s", *addr)
|
||||||
|
if err := server.Start(*addr); err != nil {
|
||||||
|
log.Fatalf("Failed to start server: %v", err)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
// Wait for shutdown signal
|
||||||
|
<-sigChan
|
||||||
|
log.Println("Shutting down server...")
|
||||||
|
|
||||||
|
// TODO: Implement graceful shutdown if needed
|
||||||
|
os.Exit(0)
|
||||||
|
}
|
||||||
@@ -241,6 +241,7 @@ func (a *Agent) Stop() {
|
|||||||
a.Lock()
|
a.Lock()
|
||||||
defer a.Unlock()
|
defer a.Unlock()
|
||||||
xlog.Debug("Stopping agent", "agent", a.Character.Name)
|
xlog.Debug("Stopping agent", "agent", a.Character.Name)
|
||||||
|
a.closeMCPSTDIOServers()
|
||||||
a.context.Cancel()
|
a.context.Cancel()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -3,12 +3,14 @@ package agent
|
|||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"errors"
|
|
||||||
|
|
||||||
mcp "github.com/metoro-io/mcp-golang"
|
mcp "github.com/metoro-io/mcp-golang"
|
||||||
"github.com/metoro-io/mcp-golang/transport/http"
|
"github.com/metoro-io/mcp-golang/transport/http"
|
||||||
|
stdioTransport "github.com/metoro-io/mcp-golang/transport/stdio"
|
||||||
"github.com/mudler/LocalAGI/core/types"
|
"github.com/mudler/LocalAGI/core/types"
|
||||||
|
"github.com/mudler/LocalAGI/pkg/stdio"
|
||||||
"github.com/mudler/LocalAGI/pkg/xlog"
|
"github.com/mudler/LocalAGI/pkg/xlog"
|
||||||
|
|
||||||
"github.com/sashabaranov/go-openai/jsonschema"
|
"github.com/sashabaranov/go-openai/jsonschema"
|
||||||
)
|
)
|
||||||
|
|
||||||
@@ -19,6 +21,12 @@ type MCPServer struct {
|
|||||||
Token string `json:"token"`
|
Token string `json:"token"`
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type MCPSTDIOServer struct {
|
||||||
|
Args []string `json:"args"`
|
||||||
|
Env []string `json:"env"`
|
||||||
|
Cmd string `json:"cmd"`
|
||||||
|
}
|
||||||
|
|
||||||
type mcpAction struct {
|
type mcpAction struct {
|
||||||
mcpClient *mcp.Client
|
mcpClient *mcp.Client
|
||||||
inputSchema ToolInputSchema
|
inputSchema ToolInputSchema
|
||||||
@@ -79,6 +87,68 @@ type ToolInputSchema struct {
|
|||||||
Required []string `json:"required,omitempty"`
|
Required []string `json:"required,omitempty"`
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (a *Agent) addTools(client *mcp.Client) (types.Actions, error) {
|
||||||
|
|
||||||
|
var generatedActions types.Actions
|
||||||
|
xlog.Debug("Initializing client")
|
||||||
|
// Initialize the client
|
||||||
|
response, e := client.Initialize(a.context)
|
||||||
|
if e != nil {
|
||||||
|
xlog.Error("Failed to initialize client", "error", e.Error())
|
||||||
|
return nil, e
|
||||||
|
}
|
||||||
|
|
||||||
|
xlog.Debug("Client initialized: %v", response.Instructions)
|
||||||
|
|
||||||
|
var cursor *string
|
||||||
|
for {
|
||||||
|
tools, err := client.ListTools(a.context, cursor)
|
||||||
|
if err != nil {
|
||||||
|
xlog.Error("Failed to list tools", "error", err.Error())
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, t := range tools.Tools {
|
||||||
|
desc := ""
|
||||||
|
if t.Description != nil {
|
||||||
|
desc = *t.Description
|
||||||
|
}
|
||||||
|
|
||||||
|
xlog.Debug("Tool", "name", t.Name, "description", desc)
|
||||||
|
|
||||||
|
dat, err := json.Marshal(t.InputSchema)
|
||||||
|
if err != nil {
|
||||||
|
xlog.Error("Failed to marshal input schema", "error", err.Error())
|
||||||
|
}
|
||||||
|
|
||||||
|
xlog.Debug("Input schema", "tool", t.Name, "schema", string(dat))
|
||||||
|
|
||||||
|
// XXX: This is a wild guess, to verify (data types might be incompatible)
|
||||||
|
var inputSchema ToolInputSchema
|
||||||
|
err = json.Unmarshal(dat, &inputSchema)
|
||||||
|
if err != nil {
|
||||||
|
xlog.Error("Failed to unmarshal input schema", "error", err.Error())
|
||||||
|
}
|
||||||
|
|
||||||
|
// Create a new action with Client + tool
|
||||||
|
generatedActions = append(generatedActions, &mcpAction{
|
||||||
|
mcpClient: client,
|
||||||
|
toolName: t.Name,
|
||||||
|
inputSchema: inputSchema,
|
||||||
|
toolDescription: desc,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
if tools.NextCursor == nil {
|
||||||
|
break // No more pages
|
||||||
|
}
|
||||||
|
cursor = tools.NextCursor
|
||||||
|
}
|
||||||
|
|
||||||
|
return generatedActions, nil
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
func (a *Agent) initMCPActions() error {
|
func (a *Agent) initMCPActions() error {
|
||||||
|
|
||||||
a.mcpActions = nil
|
a.mcpActions = nil
|
||||||
@@ -86,6 +156,7 @@ func (a *Agent) initMCPActions() error {
|
|||||||
|
|
||||||
generatedActions := types.Actions{}
|
generatedActions := types.Actions{}
|
||||||
|
|
||||||
|
// MCP HTTP Servers
|
||||||
for _, mcpServer := range a.options.mcpServers {
|
for _, mcpServer := range a.options.mcpServers {
|
||||||
transport := http.NewHTTPClientTransport("/mcp")
|
transport := http.NewHTTPClientTransport("/mcp")
|
||||||
transport.WithBaseURL(mcpServer.URL)
|
transport.WithBaseURL(mcpServer.URL)
|
||||||
@@ -95,70 +166,60 @@ func (a *Agent) initMCPActions() error {
|
|||||||
|
|
||||||
// Create a new client
|
// Create a new client
|
||||||
client := mcp.NewClient(transport)
|
client := mcp.NewClient(transport)
|
||||||
|
xlog.Debug("Adding tools for MCP server", "server", mcpServer)
|
||||||
|
actions, err := a.addTools(client)
|
||||||
|
if err != nil {
|
||||||
|
xlog.Error("Failed to add tools for MCP server", "server", mcpServer, "error", err.Error())
|
||||||
|
}
|
||||||
|
generatedActions = append(generatedActions, actions...)
|
||||||
|
}
|
||||||
|
|
||||||
xlog.Debug("Initializing client", "server", mcpServer.URL)
|
// MCP STDIO Servers
|
||||||
// Initialize the client
|
|
||||||
response, e := client.Initialize(a.context)
|
a.closeMCPSTDIOServers() // Make sure we stop all previous servers if any is active
|
||||||
if e != nil {
|
|
||||||
xlog.Error("Failed to initialize client", "error", e.Error(), "server", mcpServer)
|
if a.options.mcpPrepareScript != "" {
|
||||||
if err == nil {
|
xlog.Debug("Preparing MCP box", "script", a.options.mcpPrepareScript)
|
||||||
err = e
|
client := stdio.NewClient(a.options.mcpBoxURL)
|
||||||
} else {
|
client.RunProcess(a.context, "/bin/bash", []string{"-c", a.options.mcpPrepareScript}, []string{})
|
||||||
err = errors.Join(err, e)
|
}
|
||||||
}
|
|
||||||
|
for _, mcpStdioServer := range a.options.mcpStdioServers {
|
||||||
|
client := stdio.NewClient(a.options.mcpBoxURL)
|
||||||
|
p, err := client.CreateProcess(a.context,
|
||||||
|
mcpStdioServer.Cmd,
|
||||||
|
mcpStdioServer.Args,
|
||||||
|
mcpStdioServer.Env,
|
||||||
|
a.Character.Name)
|
||||||
|
if err != nil {
|
||||||
|
xlog.Error("Failed to create process", "error", err.Error())
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
read, writer, err := client.GetProcessIO(p.ID)
|
||||||
|
if err != nil {
|
||||||
|
xlog.Error("Failed to get process IO", "error", err.Error())
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
xlog.Debug("Client initialized: %v", response.Instructions)
|
transport := stdioTransport.NewStdioServerTransportWithIO(read, writer)
|
||||||
|
|
||||||
var cursor *string
|
// Create a new client
|
||||||
for {
|
mcpClient := mcp.NewClient(transport)
|
||||||
tools, err := client.ListTools(a.context, cursor)
|
|
||||||
if err != nil {
|
|
||||||
xlog.Error("Failed to list tools", "error", err.Error())
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
for _, t := range tools.Tools {
|
xlog.Debug("Adding tools for MCP server (stdio)", "server", mcpStdioServer)
|
||||||
desc := ""
|
actions, err := a.addTools(mcpClient)
|
||||||
if t.Description != nil {
|
if err != nil {
|
||||||
desc = *t.Description
|
xlog.Error("Failed to add tools for MCP server", "server", mcpStdioServer, "error", err.Error())
|
||||||
}
|
|
||||||
|
|
||||||
xlog.Debug("Tool", "mcpServer", mcpServer, "name", t.Name, "description", desc)
|
|
||||||
|
|
||||||
dat, err := json.Marshal(t.InputSchema)
|
|
||||||
if err != nil {
|
|
||||||
xlog.Error("Failed to marshal input schema", "error", err.Error())
|
|
||||||
}
|
|
||||||
|
|
||||||
xlog.Debug("Input schema", "mcpServer", mcpServer, "tool", t.Name, "schema", string(dat))
|
|
||||||
|
|
||||||
// XXX: This is a wild guess, to verify (data types might be incompatible)
|
|
||||||
var inputSchema ToolInputSchema
|
|
||||||
err = json.Unmarshal(dat, &inputSchema)
|
|
||||||
if err != nil {
|
|
||||||
xlog.Error("Failed to unmarshal input schema", "error", err.Error())
|
|
||||||
}
|
|
||||||
|
|
||||||
// Create a new action with Client + tool
|
|
||||||
generatedActions = append(generatedActions, &mcpAction{
|
|
||||||
mcpClient: client,
|
|
||||||
toolName: t.Name,
|
|
||||||
inputSchema: inputSchema,
|
|
||||||
toolDescription: desc,
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
if tools.NextCursor == nil {
|
|
||||||
break // No more pages
|
|
||||||
}
|
|
||||||
cursor = tools.NextCursor
|
|
||||||
}
|
}
|
||||||
|
generatedActions = append(generatedActions, actions...)
|
||||||
}
|
}
|
||||||
|
|
||||||
a.mcpActions = generatedActions
|
a.mcpActions = generatedActions
|
||||||
|
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (a *Agent) closeMCPSTDIOServers() {
|
||||||
|
client := stdio.NewClient(a.options.mcpBoxURL)
|
||||||
|
client.StopGroup(a.Character.Name)
|
||||||
|
}
|
||||||
|
|||||||
@@ -50,8 +50,10 @@ type options struct {
|
|||||||
|
|
||||||
conversationsPath string
|
conversationsPath string
|
||||||
|
|
||||||
mcpServers []MCPServer
|
mcpServers []MCPServer
|
||||||
|
mcpStdioServers []MCPSTDIOServer
|
||||||
|
mcpBoxURL string
|
||||||
|
mcpPrepareScript string
|
||||||
newConversationsSubscribers []func(openai.ChatCompletionMessage)
|
newConversationsSubscribers []func(openai.ChatCompletionMessage)
|
||||||
|
|
||||||
observer Observer
|
observer Observer
|
||||||
@@ -207,6 +209,27 @@ func WithMCPServers(servers ...MCPServer) Option {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func WithMCPSTDIOServers(servers ...MCPSTDIOServer) Option {
|
||||||
|
return func(o *options) error {
|
||||||
|
o.mcpStdioServers = servers
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func WithMCPBoxURL(url string) Option {
|
||||||
|
return func(o *options) error {
|
||||||
|
o.mcpBoxURL = url
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func WithMCPPrepareScript(script string) Option {
|
||||||
|
return func(o *options) error {
|
||||||
|
o.mcpPrepareScript = script
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func WithLLMAPIURL(url string) Option {
|
func WithLLMAPIURL(url string) Option {
|
||||||
return func(o *options) error {
|
return func(o *options) error {
|
||||||
o.LLMAPI.APIURL = url
|
o.LLMAPI.APIURL = url
|
||||||
|
|||||||
@@ -2,6 +2,8 @@ package state
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
|
"fmt"
|
||||||
|
"strings"
|
||||||
|
|
||||||
"github.com/mudler/LocalAGI/core/agent"
|
"github.com/mudler/LocalAGI/core/agent"
|
||||||
"github.com/mudler/LocalAGI/core/types"
|
"github.com/mudler/LocalAGI/core/types"
|
||||||
@@ -30,10 +32,13 @@ func (d DynamicPromptsConfig) ToMap() map[string]string {
|
|||||||
}
|
}
|
||||||
|
|
||||||
type AgentConfig struct {
|
type AgentConfig struct {
|
||||||
Connector []ConnectorConfig `json:"connectors" form:"connectors" `
|
Connector []ConnectorConfig `json:"connectors" form:"connectors" `
|
||||||
Actions []ActionsConfig `json:"actions" form:"actions"`
|
Actions []ActionsConfig `json:"actions" form:"actions"`
|
||||||
DynamicPrompts []DynamicPromptsConfig `json:"dynamic_prompts" form:"dynamic_prompts"`
|
DynamicPrompts []DynamicPromptsConfig `json:"dynamic_prompts" form:"dynamic_prompts"`
|
||||||
MCPServers []agent.MCPServer `json:"mcp_servers" form:"mcp_servers"`
|
MCPServers []agent.MCPServer `json:"mcp_servers" form:"mcp_servers"`
|
||||||
|
MCPSTDIOServers []agent.MCPSTDIOServer `json:"mcp_stdio_servers" form:"mcp_stdio_servers"`
|
||||||
|
MCPPrepareScript string `json:"mcp_prepare_script" form:"mcp_prepare_script"`
|
||||||
|
MCPBoxURL string `json:"mcp_box_url" form:"mcp_box_url"`
|
||||||
|
|
||||||
Description string `json:"description" form:"description"`
|
Description string `json:"description" form:"description"`
|
||||||
|
|
||||||
@@ -271,6 +276,22 @@ func NewAgentConfigMeta(
|
|||||||
HelpText: "Number of concurrent tasks that can run in parallel",
|
HelpText: "Number of concurrent tasks that can run in parallel",
|
||||||
Tags: config.Tags{Section: "AdvancedSettings"},
|
Tags: config.Tags{Section: "AdvancedSettings"},
|
||||||
},
|
},
|
||||||
|
{
|
||||||
|
Name: "mcp_stdio_servers",
|
||||||
|
Label: "MCP STDIO Servers",
|
||||||
|
Type: "textarea",
|
||||||
|
DefaultValue: "",
|
||||||
|
HelpText: "JSON configuration for MCP STDIO servers",
|
||||||
|
Tags: config.Tags{Section: "AdvancedSettings"},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
Name: "mcp_prepare_script",
|
||||||
|
Label: "MCP Prepare Script",
|
||||||
|
Type: "textarea",
|
||||||
|
DefaultValue: "",
|
||||||
|
HelpText: "Script to prepare the MCP box",
|
||||||
|
Tags: config.Tags{Section: "AdvancedSettings"},
|
||||||
|
},
|
||||||
},
|
},
|
||||||
MCPServers: []config.Field{
|
MCPServers: []config.Field{
|
||||||
{
|
{
|
||||||
@@ -297,3 +318,148 @@ type Connector interface {
|
|||||||
AgentReasoningCallback() func(state types.ActionCurrentState) bool
|
AgentReasoningCallback() func(state types.ActionCurrentState) bool
|
||||||
Start(a *agent.Agent)
|
Start(a *agent.Agent)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// UnmarshalJSON implements json.Unmarshaler for AgentConfig
|
||||||
|
func (a *AgentConfig) UnmarshalJSON(data []byte) error {
|
||||||
|
// Create a temporary type to avoid infinite recursion
|
||||||
|
type Alias AgentConfig
|
||||||
|
aux := &struct {
|
||||||
|
*Alias
|
||||||
|
MCPSTDIOServersConfig interface{} `json:"mcp_stdio_servers"`
|
||||||
|
}{
|
||||||
|
Alias: (*Alias)(a),
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := json.Unmarshal(data, &aux); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Handle MCP STDIO servers configuration
|
||||||
|
if aux.MCPSTDIOServersConfig != nil {
|
||||||
|
switch v := aux.MCPSTDIOServersConfig.(type) {
|
||||||
|
case string:
|
||||||
|
// Parse string configuration
|
||||||
|
var mcpConfig struct {
|
||||||
|
MCPServers map[string]struct {
|
||||||
|
Command string `json:"command"`
|
||||||
|
Args []string `json:"args"`
|
||||||
|
Env map[string]string `json:"env"`
|
||||||
|
} `json:"mcpServers"`
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := json.Unmarshal([]byte(v), &mcpConfig); err != nil {
|
||||||
|
return fmt.Errorf("failed to parse MCP STDIO servers configuration: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
a.MCPSTDIOServers = make([]agent.MCPSTDIOServer, 0, len(mcpConfig.MCPServers))
|
||||||
|
for _, server := range mcpConfig.MCPServers {
|
||||||
|
// Convert env map to slice of "KEY=VALUE" strings
|
||||||
|
envSlice := make([]string, 0, len(server.Env))
|
||||||
|
for k, v := range server.Env {
|
||||||
|
envSlice = append(envSlice, fmt.Sprintf("%s=%s", k, v))
|
||||||
|
}
|
||||||
|
|
||||||
|
a.MCPSTDIOServers = append(a.MCPSTDIOServers, agent.MCPSTDIOServer{
|
||||||
|
Cmd: server.Command,
|
||||||
|
Args: server.Args,
|
||||||
|
Env: envSlice,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
case []interface{}:
|
||||||
|
// Parse array configuration
|
||||||
|
a.MCPSTDIOServers = make([]agent.MCPSTDIOServer, 0, len(v))
|
||||||
|
for _, server := range v {
|
||||||
|
serverMap, ok := server.(map[string]interface{})
|
||||||
|
if !ok {
|
||||||
|
return fmt.Errorf("invalid server configuration format")
|
||||||
|
}
|
||||||
|
|
||||||
|
cmd, _ := serverMap["cmd"].(string)
|
||||||
|
args := make([]string, 0)
|
||||||
|
if argsInterface, ok := serverMap["args"].([]interface{}); ok {
|
||||||
|
for _, arg := range argsInterface {
|
||||||
|
if argStr, ok := arg.(string); ok {
|
||||||
|
args = append(args, argStr)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
env := make([]string, 0)
|
||||||
|
if envInterface, ok := serverMap["env"].([]interface{}); ok {
|
||||||
|
for _, e := range envInterface {
|
||||||
|
if envStr, ok := e.(string); ok {
|
||||||
|
env = append(env, envStr)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
a.MCPSTDIOServers = append(a.MCPSTDIOServers, agent.MCPSTDIOServer{
|
||||||
|
Cmd: cmd,
|
||||||
|
Args: args,
|
||||||
|
Env: env,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// MarshalJSON implements json.Marshaler for AgentConfig
|
||||||
|
func (a *AgentConfig) MarshalJSON() ([]byte, error) {
|
||||||
|
// Create a temporary type to avoid infinite recursion
|
||||||
|
type Alias AgentConfig
|
||||||
|
aux := &struct {
|
||||||
|
*Alias
|
||||||
|
MCPSTDIOServersConfig string `json:"mcp_stdio_servers,omitempty"`
|
||||||
|
}{
|
||||||
|
Alias: (*Alias)(a),
|
||||||
|
}
|
||||||
|
|
||||||
|
// Convert MCPSTDIOServers back to the expected JSON format
|
||||||
|
if len(a.MCPSTDIOServers) > 0 {
|
||||||
|
mcpConfig := struct {
|
||||||
|
MCPServers map[string]struct {
|
||||||
|
Command string `json:"command"`
|
||||||
|
Args []string `json:"args"`
|
||||||
|
Env map[string]string `json:"env"`
|
||||||
|
} `json:"mcpServers"`
|
||||||
|
}{
|
||||||
|
MCPServers: make(map[string]struct {
|
||||||
|
Command string `json:"command"`
|
||||||
|
Args []string `json:"args"`
|
||||||
|
Env map[string]string `json:"env"`
|
||||||
|
}),
|
||||||
|
}
|
||||||
|
|
||||||
|
// Convert each MCPSTDIOServer to the expected format
|
||||||
|
for i, server := range a.MCPSTDIOServers {
|
||||||
|
// Convert env slice back to map
|
||||||
|
envMap := make(map[string]string)
|
||||||
|
for _, env := range server.Env {
|
||||||
|
if parts := strings.SplitN(env, "=", 2); len(parts) == 2 {
|
||||||
|
envMap[parts[0]] = parts[1]
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
mcpConfig.MCPServers[fmt.Sprintf("server%d", i)] = struct {
|
||||||
|
Command string `json:"command"`
|
||||||
|
Args []string `json:"args"`
|
||||||
|
Env map[string]string `json:"env"`
|
||||||
|
}{
|
||||||
|
Command: server.Cmd,
|
||||||
|
Args: server.Args,
|
||||||
|
Env: envMap,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Marshal the MCP config to JSON string
|
||||||
|
mcpConfigJSON, err := json.Marshal(mcpConfig)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("failed to marshal MCP STDIO servers configuration: %w", err)
|
||||||
|
}
|
||||||
|
aux.MCPSTDIOServersConfig = string(mcpConfigJSON)
|
||||||
|
}
|
||||||
|
|
||||||
|
return json.Marshal(aux)
|
||||||
|
}
|
||||||
|
|||||||
@@ -33,6 +33,7 @@ type AgentPool struct {
|
|||||||
managers map[string]sse.Manager
|
managers map[string]sse.Manager
|
||||||
agentStatus map[string]*Status
|
agentStatus map[string]*Status
|
||||||
apiURL, defaultModel, defaultMultimodalModel string
|
apiURL, defaultModel, defaultMultimodalModel string
|
||||||
|
mcpBoxURL string
|
||||||
imageModel, localRAGAPI, localRAGKey, apiKey string
|
imageModel, localRAGAPI, localRAGKey, apiKey string
|
||||||
availableActions func(*AgentConfig) func(ctx context.Context, pool *AgentPool) []types.Action
|
availableActions func(*AgentConfig) func(ctx context.Context, pool *AgentPool) []types.Action
|
||||||
connectors func(*AgentConfig) []Connector
|
connectors func(*AgentConfig) []Connector
|
||||||
@@ -72,7 +73,7 @@ func loadPoolFromFile(path string) (*AgentPoolData, error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func NewAgentPool(
|
func NewAgentPool(
|
||||||
defaultModel, defaultMultimodalModel, imageModel, apiURL, apiKey, directory string,
|
defaultModel, defaultMultimodalModel, imageModel, apiURL, apiKey, directory, mcpBoxURL string,
|
||||||
LocalRAGAPI string,
|
LocalRAGAPI string,
|
||||||
availableActions func(*AgentConfig) func(ctx context.Context, pool *AgentPool) []types.Action,
|
availableActions func(*AgentConfig) func(ctx context.Context, pool *AgentPool) []types.Action,
|
||||||
connectors func(*AgentConfig) []Connector,
|
connectors func(*AgentConfig) []Connector,
|
||||||
@@ -98,6 +99,7 @@ func NewAgentPool(
|
|||||||
apiURL: apiURL,
|
apiURL: apiURL,
|
||||||
defaultModel: defaultModel,
|
defaultModel: defaultModel,
|
||||||
defaultMultimodalModel: defaultMultimodalModel,
|
defaultMultimodalModel: defaultMultimodalModel,
|
||||||
|
mcpBoxURL: mcpBoxURL,
|
||||||
imageModel: imageModel,
|
imageModel: imageModel,
|
||||||
localRAGAPI: LocalRAGAPI,
|
localRAGAPI: LocalRAGAPI,
|
||||||
apiKey: apiKey,
|
apiKey: apiKey,
|
||||||
@@ -123,6 +125,7 @@ func NewAgentPool(
|
|||||||
pooldir: directory,
|
pooldir: directory,
|
||||||
defaultModel: defaultModel,
|
defaultModel: defaultModel,
|
||||||
defaultMultimodalModel: defaultMultimodalModel,
|
defaultMultimodalModel: defaultMultimodalModel,
|
||||||
|
mcpBoxURL: mcpBoxURL,
|
||||||
imageModel: imageModel,
|
imageModel: imageModel,
|
||||||
apiKey: apiKey,
|
apiKey: apiKey,
|
||||||
agents: make(map[string]*Agent),
|
agents: make(map[string]*Agent),
|
||||||
@@ -336,6 +339,10 @@ func (a *AgentPool) startAgentWithConfig(name string, config *AgentConfig, obs O
|
|||||||
model = config.Model
|
model = config.Model
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if config.MCPBoxURL != "" {
|
||||||
|
a.mcpBoxURL = config.MCPBoxURL
|
||||||
|
}
|
||||||
|
|
||||||
if config.PeriodicRuns == "" {
|
if config.PeriodicRuns == "" {
|
||||||
config.PeriodicRuns = "10m"
|
config.PeriodicRuns = "10m"
|
||||||
}
|
}
|
||||||
@@ -396,7 +403,10 @@ func (a *AgentPool) startAgentWithConfig(name string, config *AgentConfig, obs O
|
|||||||
WithMCPServers(config.MCPServers...),
|
WithMCPServers(config.MCPServers...),
|
||||||
WithPeriodicRuns(config.PeriodicRuns),
|
WithPeriodicRuns(config.PeriodicRuns),
|
||||||
WithPermanentGoal(config.PermanentGoal),
|
WithPermanentGoal(config.PermanentGoal),
|
||||||
|
WithMCPSTDIOServers(config.MCPSTDIOServers...),
|
||||||
|
WithMCPBoxURL(a.mcpBoxURL),
|
||||||
WithPrompts(promptBlocks...),
|
WithPrompts(promptBlocks...),
|
||||||
|
WithMCPPrepareScript(config.MCPPrepareScript),
|
||||||
// WithDynamicPrompts(dynamicPrompts...),
|
// WithDynamicPrompts(dynamicPrompts...),
|
||||||
WithCharacter(Character{
|
WithCharacter(Character{
|
||||||
Name: name,
|
Name: name,
|
||||||
|
|||||||
@@ -46,12 +46,30 @@ services:
|
|||||||
image: busybox
|
image: busybox
|
||||||
command: ["sh", "-c", "until wget -q -O - http://localrecall:8080 > /dev/null 2>&1; do echo 'Waiting for localrecall...'; sleep 1; done; echo 'localrecall is up!'"]
|
command: ["sh", "-c", "until wget -q -O - http://localrecall:8080 > /dev/null 2>&1; do echo 'Waiting for localrecall...'; sleep 1; done; echo 'localrecall is up!'"]
|
||||||
|
|
||||||
|
mcpbox:
|
||||||
|
build:
|
||||||
|
context: .
|
||||||
|
dockerfile: Dockerfile.mcpbox
|
||||||
|
ports:
|
||||||
|
- "8080"
|
||||||
|
volumes:
|
||||||
|
- ./volumes/mcpbox:/app/data
|
||||||
|
# share docker socket if you want it to be able to run docker commands
|
||||||
|
- /var/run/docker.sock:/var/run/docker.sock
|
||||||
|
healthcheck:
|
||||||
|
test: ["CMD", "wget", "-q", "-O", "-", "http://localhost:8080/processes"]
|
||||||
|
interval: 30s
|
||||||
|
timeout: 10s
|
||||||
|
retries: 3
|
||||||
|
|
||||||
localagi:
|
localagi:
|
||||||
depends_on:
|
depends_on:
|
||||||
localai:
|
localai:
|
||||||
condition: service_healthy
|
condition: service_healthy
|
||||||
localrecall-healthcheck:
|
localrecall-healthcheck:
|
||||||
condition: service_completed_successfully
|
condition: service_completed_successfully
|
||||||
|
mcpbox:
|
||||||
|
condition: service_healthy
|
||||||
build:
|
build:
|
||||||
context: .
|
context: .
|
||||||
dockerfile: Dockerfile.webui
|
dockerfile: Dockerfile.webui
|
||||||
@@ -68,6 +86,7 @@ services:
|
|||||||
- LOCALAGI_STATE_DIR=/pool
|
- LOCALAGI_STATE_DIR=/pool
|
||||||
- LOCALAGI_TIMEOUT=5m
|
- LOCALAGI_TIMEOUT=5m
|
||||||
- LOCALAGI_ENABLE_CONVERSATIONS_LOGGING=false
|
- LOCALAGI_ENABLE_CONVERSATIONS_LOGGING=false
|
||||||
|
- LOCALAGI_MCPBOX_URL=http://mcpbox:8080
|
||||||
extra_hosts:
|
extra_hosts:
|
||||||
- "host.docker.internal:host-gateway"
|
- "host.docker.internal:host-gateway"
|
||||||
volumes:
|
volumes:
|
||||||
|
|||||||
2
main.go
2
main.go
@@ -23,6 +23,7 @@ var apiKeysEnv = os.Getenv("LOCALAGI_API_KEYS")
|
|||||||
var imageModel = os.Getenv("LOCALAGI_IMAGE_MODEL")
|
var imageModel = os.Getenv("LOCALAGI_IMAGE_MODEL")
|
||||||
var conversationDuration = os.Getenv("LOCALAGI_CONVERSATION_DURATION")
|
var conversationDuration = os.Getenv("LOCALAGI_CONVERSATION_DURATION")
|
||||||
var localOperatorBaseURL = os.Getenv("LOCALOPERATOR_BASE_URL")
|
var localOperatorBaseURL = os.Getenv("LOCALOPERATOR_BASE_URL")
|
||||||
|
var mcpboxURL = os.Getenv("LOCALAGI_MCPBOX_URL")
|
||||||
|
|
||||||
func init() {
|
func init() {
|
||||||
if baseModel == "" {
|
if baseModel == "" {
|
||||||
@@ -61,6 +62,7 @@ func main() {
|
|||||||
apiURL,
|
apiURL,
|
||||||
apiKey,
|
apiKey,
|
||||||
stateDir,
|
stateDir,
|
||||||
|
mcpboxURL,
|
||||||
localRAG,
|
localRAG,
|
||||||
services.Actions(map[string]string{
|
services.Actions(map[string]string{
|
||||||
"browser-agent-runner-base-url": localOperatorBaseURL,
|
"browser-agent-runner-base-url": localOperatorBaseURL,
|
||||||
|
|||||||
325
pkg/stdio/client.go
Normal file
325
pkg/stdio/client.go
Normal file
@@ -0,0 +1,325 @@
|
|||||||
|
package stdio
|
||||||
|
|
||||||
|
import (
|
||||||
|
"bytes"
|
||||||
|
"context"
|
||||||
|
"encoding/json"
|
||||||
|
"fmt"
|
||||||
|
"io"
|
||||||
|
"log"
|
||||||
|
"net/http"
|
||||||
|
"net/url"
|
||||||
|
"sync"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/gorilla/websocket"
|
||||||
|
)
|
||||||
|
|
||||||
|
// Client implements the transport.Interface for stdio processes
|
||||||
|
type Client struct {
|
||||||
|
baseURL string
|
||||||
|
processes map[string]*Process
|
||||||
|
groups map[string][]string
|
||||||
|
mu sync.RWMutex
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewClient creates a new stdio transport client
|
||||||
|
func NewClient(baseURL string) *Client {
|
||||||
|
return &Client{
|
||||||
|
baseURL: baseURL,
|
||||||
|
processes: make(map[string]*Process),
|
||||||
|
groups: make(map[string][]string),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// CreateProcess starts a new process in a group
|
||||||
|
func (c *Client) CreateProcess(ctx context.Context, command string, args []string, env []string, groupID string) (*Process, error) {
|
||||||
|
log.Printf("Creating process: command=%s, args=%v, groupID=%s", command, args, groupID)
|
||||||
|
|
||||||
|
req := struct {
|
||||||
|
Command string `json:"command"`
|
||||||
|
Args []string `json:"args"`
|
||||||
|
Env []string `json:"env"`
|
||||||
|
GroupID string `json:"group_id"`
|
||||||
|
}{
|
||||||
|
Command: command,
|
||||||
|
Args: args,
|
||||||
|
Env: env,
|
||||||
|
GroupID: groupID,
|
||||||
|
}
|
||||||
|
|
||||||
|
reqBody, err := json.Marshal(req)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("failed to marshal request: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
url := fmt.Sprintf("%s/processes", c.baseURL)
|
||||||
|
log.Printf("Sending POST request to %s", url)
|
||||||
|
|
||||||
|
resp, err := http.Post(url, "application/json", bytes.NewReader(reqBody))
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("failed to start process: %w", err)
|
||||||
|
}
|
||||||
|
defer resp.Body.Close()
|
||||||
|
|
||||||
|
log.Printf("Received response with status: %d", resp.StatusCode)
|
||||||
|
|
||||||
|
var result struct {
|
||||||
|
ID string `json:"id"`
|
||||||
|
}
|
||||||
|
|
||||||
|
body, _ := io.ReadAll(resp.Body)
|
||||||
|
|
||||||
|
if err := json.Unmarshal(body, &result); err != nil {
|
||||||
|
return nil, fmt.Errorf("failed to decode response: %w. body: %s", err, string(body))
|
||||||
|
}
|
||||||
|
|
||||||
|
log.Printf("Successfully created process with ID: %s", result.ID)
|
||||||
|
|
||||||
|
process := &Process{
|
||||||
|
ID: result.ID,
|
||||||
|
GroupID: groupID,
|
||||||
|
CreatedAt: time.Now(),
|
||||||
|
}
|
||||||
|
|
||||||
|
c.mu.Lock()
|
||||||
|
c.processes[process.ID] = process
|
||||||
|
if groupID != "" {
|
||||||
|
c.groups[groupID] = append(c.groups[groupID], process.ID)
|
||||||
|
}
|
||||||
|
c.mu.Unlock()
|
||||||
|
|
||||||
|
return process, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// GetProcess returns a process by ID
|
||||||
|
func (c *Client) GetProcess(id string) (*Process, error) {
|
||||||
|
c.mu.RLock()
|
||||||
|
process, exists := c.processes[id]
|
||||||
|
c.mu.RUnlock()
|
||||||
|
|
||||||
|
if !exists {
|
||||||
|
return nil, fmt.Errorf("process not found: %s", id)
|
||||||
|
}
|
||||||
|
|
||||||
|
return process, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// GetGroupProcesses returns all processes in a group
|
||||||
|
func (c *Client) GetGroupProcesses(groupID string) ([]*Process, error) {
|
||||||
|
c.mu.RLock()
|
||||||
|
processIDs, exists := c.groups[groupID]
|
||||||
|
if !exists {
|
||||||
|
c.mu.RUnlock()
|
||||||
|
return nil, fmt.Errorf("group not found: %s", groupID)
|
||||||
|
}
|
||||||
|
|
||||||
|
processes := make([]*Process, 0, len(processIDs))
|
||||||
|
for _, pid := range processIDs {
|
||||||
|
if process, exists := c.processes[pid]; exists {
|
||||||
|
processes = append(processes, process)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
c.mu.RUnlock()
|
||||||
|
|
||||||
|
return processes, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// StopProcess stops a single process
|
||||||
|
func (c *Client) StopProcess(id string) error {
|
||||||
|
c.mu.Lock()
|
||||||
|
process, exists := c.processes[id]
|
||||||
|
if !exists {
|
||||||
|
c.mu.Unlock()
|
||||||
|
return fmt.Errorf("process not found: %s", id)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Remove from group if it exists
|
||||||
|
if process.GroupID != "" {
|
||||||
|
groupProcesses := c.groups[process.GroupID]
|
||||||
|
for i, pid := range groupProcesses {
|
||||||
|
if pid == id {
|
||||||
|
c.groups[process.GroupID] = append(groupProcesses[:i], groupProcesses[i+1:]...)
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if len(c.groups[process.GroupID]) == 0 {
|
||||||
|
delete(c.groups, process.GroupID)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
delete(c.processes, id)
|
||||||
|
c.mu.Unlock()
|
||||||
|
|
||||||
|
req, err := http.NewRequest(
|
||||||
|
"DELETE",
|
||||||
|
fmt.Sprintf("%s/processes/%s", c.baseURL, id),
|
||||||
|
nil,
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("failed to create request: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
resp, err := http.DefaultClient.Do(req)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("failed to stop process: %w", err)
|
||||||
|
}
|
||||||
|
resp.Body.Close()
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// StopGroup stops all processes in a group
|
||||||
|
func (c *Client) StopGroup(groupID string) error {
|
||||||
|
c.mu.Lock()
|
||||||
|
processIDs, exists := c.groups[groupID]
|
||||||
|
if !exists {
|
||||||
|
c.mu.Unlock()
|
||||||
|
return fmt.Errorf("group not found: %s", groupID)
|
||||||
|
}
|
||||||
|
c.mu.Unlock()
|
||||||
|
|
||||||
|
for _, pid := range processIDs {
|
||||||
|
if err := c.StopProcess(pid); err != nil {
|
||||||
|
return fmt.Errorf("failed to stop process %s in group %s: %w", pid, groupID, err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// ListGroups returns all group IDs
|
||||||
|
func (c *Client) ListGroups() []string {
|
||||||
|
c.mu.RLock()
|
||||||
|
defer c.mu.RUnlock()
|
||||||
|
|
||||||
|
groups := make([]string, 0, len(c.groups))
|
||||||
|
for groupID := range c.groups {
|
||||||
|
groups = append(groups, groupID)
|
||||||
|
}
|
||||||
|
return groups
|
||||||
|
}
|
||||||
|
|
||||||
|
// GetProcessIO returns io.Reader and io.Writer for a process
|
||||||
|
func (c *Client) GetProcessIO(id string) (io.Reader, io.Writer, error) {
|
||||||
|
log.Printf("Getting IO for process: %s", id)
|
||||||
|
|
||||||
|
process, err := c.GetProcess(id)
|
||||||
|
if err != nil {
|
||||||
|
return nil, nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Parse the base URL to get the host
|
||||||
|
baseURL, err := url.Parse(c.baseURL)
|
||||||
|
if err != nil {
|
||||||
|
return nil, nil, fmt.Errorf("failed to parse base URL: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Connect to WebSocket
|
||||||
|
u := url.URL{
|
||||||
|
Scheme: "ws",
|
||||||
|
Host: baseURL.Host,
|
||||||
|
Path: fmt.Sprintf("/ws/%s", process.ID),
|
||||||
|
}
|
||||||
|
|
||||||
|
log.Printf("Connecting to WebSocket at: %s", u.String())
|
||||||
|
|
||||||
|
conn, _, err := websocket.DefaultDialer.Dial(u.String(), nil)
|
||||||
|
if err != nil {
|
||||||
|
return nil, nil, fmt.Errorf("failed to connect to WebSocket: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
log.Printf("Successfully connected to WebSocket for process: %s", id)
|
||||||
|
|
||||||
|
// Create reader and writer
|
||||||
|
reader := &websocketReader{conn: conn}
|
||||||
|
writer := &websocketWriter{conn: conn}
|
||||||
|
|
||||||
|
return reader, writer, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// websocketReader implements io.Reader for WebSocket
|
||||||
|
type websocketReader struct {
|
||||||
|
conn *websocket.Conn
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *websocketReader) Read(p []byte) (n int, err error) {
|
||||||
|
_, message, err := r.conn.ReadMessage()
|
||||||
|
if err != nil {
|
||||||
|
return 0, err
|
||||||
|
}
|
||||||
|
n = copy(p, message)
|
||||||
|
return n, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// websocketWriter implements io.Writer for WebSocket
|
||||||
|
type websocketWriter struct {
|
||||||
|
conn *websocket.Conn
|
||||||
|
}
|
||||||
|
|
||||||
|
func (w *websocketWriter) Write(p []byte) (n int, err error) {
|
||||||
|
// Use BinaryMessage type for better compatibility
|
||||||
|
err = w.conn.WriteMessage(websocket.BinaryMessage, p)
|
||||||
|
if err != nil {
|
||||||
|
return 0, fmt.Errorf("failed to write WebSocket message: %w", err)
|
||||||
|
}
|
||||||
|
return len(p), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Close closes all connections and stops all processes
|
||||||
|
func (c *Client) Close() error {
|
||||||
|
c.mu.Lock()
|
||||||
|
defer c.mu.Unlock()
|
||||||
|
|
||||||
|
// Stop all processes
|
||||||
|
for id := range c.processes {
|
||||||
|
if err := c.StopProcess(id); err != nil {
|
||||||
|
return fmt.Errorf("failed to stop process %s: %w", id, err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// RunProcess executes a command and returns its output
|
||||||
|
func (c *Client) RunProcess(ctx context.Context, command string, args []string, env []string) (string, error) {
|
||||||
|
log.Printf("Running one-time process: command=%s, args=%v", command, args)
|
||||||
|
|
||||||
|
req := struct {
|
||||||
|
Command string `json:"command"`
|
||||||
|
Args []string `json:"args"`
|
||||||
|
Env []string `json:"env"`
|
||||||
|
}{
|
||||||
|
Command: command,
|
||||||
|
Args: args,
|
||||||
|
Env: env,
|
||||||
|
}
|
||||||
|
|
||||||
|
reqBody, err := json.Marshal(req)
|
||||||
|
if err != nil {
|
||||||
|
return "", fmt.Errorf("failed to marshal request: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
url := fmt.Sprintf("%s/run", c.baseURL)
|
||||||
|
log.Printf("Sending POST request to %s", url)
|
||||||
|
|
||||||
|
resp, err := http.Post(url, "application/json", bytes.NewReader(reqBody))
|
||||||
|
if err != nil {
|
||||||
|
return "", fmt.Errorf("failed to execute process: %w", err)
|
||||||
|
}
|
||||||
|
defer resp.Body.Close()
|
||||||
|
|
||||||
|
log.Printf("Received response with status: %d", resp.StatusCode)
|
||||||
|
|
||||||
|
var result struct {
|
||||||
|
Output string `json:"output"`
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
|
||||||
|
body, _ := io.ReadAll(resp.Body)
|
||||||
|
return "", fmt.Errorf("failed to decode response: %w. body: %s", err, string(body))
|
||||||
|
}
|
||||||
|
|
||||||
|
log.Printf("Successfully executed process with output length: %d", len(result.Output))
|
||||||
|
return result.Output, nil
|
||||||
|
}
|
||||||
28
pkg/stdio/client_suite_test.go
Normal file
28
pkg/stdio/client_suite_test.go
Normal file
@@ -0,0 +1,28 @@
|
|||||||
|
package stdio
|
||||||
|
|
||||||
|
import (
|
||||||
|
"os"
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
. "github.com/onsi/ginkgo/v2"
|
||||||
|
. "github.com/onsi/gomega"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestSTDIOTransport(t *testing.T) {
|
||||||
|
RegisterFailHandler(Fail)
|
||||||
|
RunSpecs(t, "STDIOTransport test suite")
|
||||||
|
}
|
||||||
|
|
||||||
|
var baseURL string
|
||||||
|
|
||||||
|
func init() {
|
||||||
|
baseURL = os.Getenv("LOCALAGI_MCPBOX_URL")
|
||||||
|
if baseURL == "" {
|
||||||
|
baseURL = "http://localhost:8080"
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
var _ = AfterSuite(func() {
|
||||||
|
client := NewClient(baseURL)
|
||||||
|
client.StopGroup("test-group")
|
||||||
|
})
|
||||||
235
pkg/stdio/client_test.go
Normal file
235
pkg/stdio/client_test.go
Normal file
@@ -0,0 +1,235 @@
|
|||||||
|
package stdio
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
mcp "github.com/metoro-io/mcp-golang"
|
||||||
|
"github.com/metoro-io/mcp-golang/transport/stdio"
|
||||||
|
"github.com/mudler/LocalAGI/pkg/xlog"
|
||||||
|
. "github.com/onsi/ginkgo/v2"
|
||||||
|
. "github.com/onsi/gomega"
|
||||||
|
)
|
||||||
|
|
||||||
|
var _ = Describe("Client", func() {
|
||||||
|
var (
|
||||||
|
client *Client
|
||||||
|
)
|
||||||
|
|
||||||
|
BeforeEach(func() {
|
||||||
|
client = NewClient(baseURL)
|
||||||
|
})
|
||||||
|
|
||||||
|
AfterEach(func() {
|
||||||
|
if client != nil {
|
||||||
|
Expect(client.Close()).To(Succeed())
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
|
Context("Process Management", func() {
|
||||||
|
It("should create and stop a process", func() {
|
||||||
|
ctx := context.Background()
|
||||||
|
// Use a command that doesn't exit immediately
|
||||||
|
process, err := client.CreateProcess(ctx, "sh", []string{"-c", "echo 'Hello, World!'; sleep 10"}, []string{}, "test-group")
|
||||||
|
Expect(err).NotTo(HaveOccurred())
|
||||||
|
Expect(process).NotTo(BeNil())
|
||||||
|
Expect(process.ID).NotTo(BeEmpty())
|
||||||
|
|
||||||
|
// Get process IO
|
||||||
|
reader, writer, err := client.GetProcessIO(process.ID)
|
||||||
|
Expect(err).NotTo(HaveOccurred())
|
||||||
|
Expect(reader).NotTo(BeNil())
|
||||||
|
Expect(writer).NotTo(BeNil())
|
||||||
|
|
||||||
|
// Write to process
|
||||||
|
_, err = writer.Write([]byte("test input\n"))
|
||||||
|
Expect(err).NotTo(HaveOccurred())
|
||||||
|
|
||||||
|
// Read from process with timeout
|
||||||
|
buf := make([]byte, 1024)
|
||||||
|
readDone := make(chan struct{})
|
||||||
|
var readErr error
|
||||||
|
var readN int
|
||||||
|
|
||||||
|
go func() {
|
||||||
|
readN, readErr = reader.Read(buf)
|
||||||
|
close(readDone)
|
||||||
|
}()
|
||||||
|
|
||||||
|
// Wait for read with timeout
|
||||||
|
select {
|
||||||
|
case <-readDone:
|
||||||
|
Expect(readErr).NotTo(HaveOccurred())
|
||||||
|
Expect(readN).To(BeNumerically(">", 0))
|
||||||
|
Expect(string(buf[:readN])).To(ContainSubstring("Hello, World!"))
|
||||||
|
case <-time.After(5 * time.Second):
|
||||||
|
Fail("Timeout waiting for process output")
|
||||||
|
}
|
||||||
|
|
||||||
|
// Stop the process
|
||||||
|
err = client.StopProcess(process.ID)
|
||||||
|
Expect(err).NotTo(HaveOccurred())
|
||||||
|
})
|
||||||
|
|
||||||
|
It("should manage process groups", func() {
|
||||||
|
ctx := context.Background()
|
||||||
|
groupID := "test-group"
|
||||||
|
|
||||||
|
// Create multiple processes in the same group
|
||||||
|
process1, err := client.CreateProcess(ctx, "sh", []string{"-c", "echo 'Process 1'; sleep 1"}, []string{}, groupID)
|
||||||
|
Expect(err).NotTo(HaveOccurred())
|
||||||
|
Expect(process1).NotTo(BeNil())
|
||||||
|
|
||||||
|
process2, err := client.CreateProcess(ctx, "sh", []string{"-c", "echo 'Process 2'; sleep 1"}, []string{}, groupID)
|
||||||
|
Expect(err).NotTo(HaveOccurred())
|
||||||
|
Expect(process2).NotTo(BeNil())
|
||||||
|
|
||||||
|
// Get group processes
|
||||||
|
processes, err := client.GetGroupProcesses(groupID)
|
||||||
|
Expect(err).NotTo(HaveOccurred())
|
||||||
|
Expect(processes).To(HaveLen(2))
|
||||||
|
|
||||||
|
// List groups
|
||||||
|
groups := client.ListGroups()
|
||||||
|
Expect(groups).To(ContainElement(groupID))
|
||||||
|
|
||||||
|
// Stop the group
|
||||||
|
err = client.StopGroup(groupID)
|
||||||
|
Expect(err).NotTo(HaveOccurred())
|
||||||
|
})
|
||||||
|
|
||||||
|
It("should run a one-time process", func() {
|
||||||
|
ctx := context.Background()
|
||||||
|
output, err := client.RunProcess(ctx, "echo", []string{"One-time process"}, []string{})
|
||||||
|
Expect(err).NotTo(HaveOccurred())
|
||||||
|
Expect(output).To(ContainSubstring("One-time process"))
|
||||||
|
})
|
||||||
|
|
||||||
|
It("should handle process with environment variables", func() {
|
||||||
|
ctx := context.Background()
|
||||||
|
env := []string{"TEST_VAR=test_value"}
|
||||||
|
process, err := client.CreateProcess(ctx, "sh", []string{"-c", "env | grep TEST_VAR; sleep 1"}, env, "test-group")
|
||||||
|
Expect(err).NotTo(HaveOccurred())
|
||||||
|
Expect(process).NotTo(BeNil())
|
||||||
|
|
||||||
|
// Get process IO
|
||||||
|
reader, _, err := client.GetProcessIO(process.ID)
|
||||||
|
Expect(err).NotTo(HaveOccurred())
|
||||||
|
|
||||||
|
// Read environment variables with timeout
|
||||||
|
buf := make([]byte, 1024)
|
||||||
|
readDone := make(chan struct{})
|
||||||
|
var readErr error
|
||||||
|
var readN int
|
||||||
|
|
||||||
|
go func() {
|
||||||
|
readN, readErr = reader.Read(buf)
|
||||||
|
close(readDone)
|
||||||
|
}()
|
||||||
|
|
||||||
|
// Wait for read with timeout
|
||||||
|
select {
|
||||||
|
case <-readDone:
|
||||||
|
Expect(readErr).NotTo(HaveOccurred())
|
||||||
|
Expect(readN).To(BeNumerically(">", 0))
|
||||||
|
Expect(string(buf[:readN])).To(ContainSubstring("TEST_VAR=test_value"))
|
||||||
|
case <-time.After(5 * time.Second):
|
||||||
|
Fail("Timeout waiting for process output")
|
||||||
|
}
|
||||||
|
|
||||||
|
// Stop the process
|
||||||
|
err = client.StopProcess(process.ID)
|
||||||
|
Expect(err).NotTo(HaveOccurred())
|
||||||
|
})
|
||||||
|
|
||||||
|
It("should handle long-running processes", func() {
|
||||||
|
ctx := context.Background()
|
||||||
|
process, err := client.CreateProcess(ctx, "sh", []string{"-c", "echo 'Starting long process'; sleep 5"}, []string{}, "test-group")
|
||||||
|
Expect(err).NotTo(HaveOccurred())
|
||||||
|
Expect(process).NotTo(BeNil())
|
||||||
|
|
||||||
|
// Get process IO
|
||||||
|
reader, _, err := client.GetProcessIO(process.ID)
|
||||||
|
Expect(err).NotTo(HaveOccurred())
|
||||||
|
|
||||||
|
// Read initial output
|
||||||
|
buf := make([]byte, 1024)
|
||||||
|
readDone := make(chan struct{})
|
||||||
|
var readErr error
|
||||||
|
var readN int
|
||||||
|
|
||||||
|
go func() {
|
||||||
|
readN, readErr = reader.Read(buf)
|
||||||
|
close(readDone)
|
||||||
|
}()
|
||||||
|
|
||||||
|
// Wait for read with timeout
|
||||||
|
select {
|
||||||
|
case <-readDone:
|
||||||
|
Expect(readErr).NotTo(HaveOccurred())
|
||||||
|
Expect(readN).To(BeNumerically(">", 0))
|
||||||
|
Expect(string(buf[:readN])).To(ContainSubstring("Starting long process"))
|
||||||
|
case <-time.After(5 * time.Second):
|
||||||
|
Fail("Timeout waiting for process output")
|
||||||
|
}
|
||||||
|
|
||||||
|
// Wait a bit to ensure process is running
|
||||||
|
time.Sleep(time.Second)
|
||||||
|
|
||||||
|
// Stop the process
|
||||||
|
err = client.StopProcess(process.ID)
|
||||||
|
Expect(err).NotTo(HaveOccurred())
|
||||||
|
})
|
||||||
|
|
||||||
|
It("MCP", func() {
|
||||||
|
ctx := context.Background()
|
||||||
|
process, err := client.CreateProcess(ctx,
|
||||||
|
"docker", []string{"run", "-i", "--rm", "-e", "GITHUB_PERSONAL_ACCESS_TOKEN", "ghcr.io/github/github-mcp-server"},
|
||||||
|
[]string{"GITHUB_PERSONAL_ACCESS_TOKEN=test"}, "test-group")
|
||||||
|
Expect(err).NotTo(HaveOccurred())
|
||||||
|
Expect(process).NotTo(BeNil())
|
||||||
|
Expect(process.ID).NotTo(BeEmpty())
|
||||||
|
|
||||||
|
defer client.StopProcess(process.ID)
|
||||||
|
|
||||||
|
// MCP client
|
||||||
|
|
||||||
|
read, writer, err := client.GetProcessIO(process.ID)
|
||||||
|
Expect(err).NotTo(HaveOccurred())
|
||||||
|
Expect(read).NotTo(BeNil())
|
||||||
|
Expect(writer).NotTo(BeNil())
|
||||||
|
|
||||||
|
transport := stdio.NewStdioServerTransportWithIO(read, writer)
|
||||||
|
|
||||||
|
// Create a new client
|
||||||
|
mcpClient := mcp.NewClient(transport)
|
||||||
|
// Initialize the client
|
||||||
|
response, e := mcpClient.Initialize(ctx)
|
||||||
|
Expect(e).NotTo(HaveOccurred())
|
||||||
|
Expect(response).NotTo(BeNil())
|
||||||
|
|
||||||
|
Expect(mcpClient.Ping(ctx)).To(Succeed())
|
||||||
|
|
||||||
|
xlog.Debug("Client initialized: %v", response.Instructions)
|
||||||
|
|
||||||
|
alltools := []mcp.ToolRetType{}
|
||||||
|
var cursor *string
|
||||||
|
for {
|
||||||
|
tools, err := mcpClient.ListTools(ctx, cursor)
|
||||||
|
Expect(err).NotTo(HaveOccurred())
|
||||||
|
Expect(tools).NotTo(BeNil())
|
||||||
|
Expect(tools.Tools).NotTo(BeEmpty())
|
||||||
|
alltools = append(alltools, tools.Tools...)
|
||||||
|
|
||||||
|
if tools.NextCursor == nil {
|
||||||
|
break // No more pages
|
||||||
|
}
|
||||||
|
cursor = tools.NextCursor
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, tool := range alltools {
|
||||||
|
xlog.Debug("Tool: %v", tool)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
})
|
||||||
|
})
|
||||||
473
pkg/stdio/server.go
Normal file
473
pkg/stdio/server.go
Normal file
@@ -0,0 +1,473 @@
|
|||||||
|
package stdio
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"encoding/json"
|
||||||
|
"fmt"
|
||||||
|
"io"
|
||||||
|
"log"
|
||||||
|
"net/http"
|
||||||
|
"os"
|
||||||
|
"os/exec"
|
||||||
|
"sync"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/gorilla/websocket"
|
||||||
|
"github.com/mudler/LocalAGI/pkg/xlog"
|
||||||
|
)
|
||||||
|
|
||||||
|
// Process represents a running process with its stdio streams
|
||||||
|
type Process struct {
|
||||||
|
ID string
|
||||||
|
GroupID string
|
||||||
|
Cmd *exec.Cmd
|
||||||
|
Stdin io.WriteCloser
|
||||||
|
Stdout io.ReadCloser
|
||||||
|
Stderr io.ReadCloser
|
||||||
|
CreatedAt time.Time
|
||||||
|
}
|
||||||
|
|
||||||
|
// Server handles process management and stdio streaming
|
||||||
|
type Server struct {
|
||||||
|
processes map[string]*Process
|
||||||
|
groups map[string][]string // maps group ID to process IDs
|
||||||
|
mu sync.RWMutex
|
||||||
|
upgrader websocket.Upgrader
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewServer creates a new stdio server
|
||||||
|
func NewServer() *Server {
|
||||||
|
return &Server{
|
||||||
|
processes: make(map[string]*Process),
|
||||||
|
groups: make(map[string][]string),
|
||||||
|
upgrader: websocket.Upgrader{},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// StartProcess starts a new process and returns its ID
|
||||||
|
func (s *Server) StartProcess(ctx context.Context, command string, args []string, env []string, groupID string) (string, error) {
|
||||||
|
xlog.Debug("Starting process", "command", command, "args", args, "groupID", groupID)
|
||||||
|
|
||||||
|
cmd := exec.CommandContext(ctx, command, args...)
|
||||||
|
|
||||||
|
if len(env) > 0 {
|
||||||
|
cmd.Env = append(os.Environ(), env...)
|
||||||
|
xlog.Debug("Process environment", "env", cmd.Env)
|
||||||
|
}
|
||||||
|
|
||||||
|
stdin, err := cmd.StdinPipe()
|
||||||
|
if err != nil {
|
||||||
|
return "", fmt.Errorf("failed to create stdin pipe: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
stdout, err := cmd.StdoutPipe()
|
||||||
|
if err != nil {
|
||||||
|
return "", fmt.Errorf("failed to create stdout pipe: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
stderr, err := cmd.StderrPipe()
|
||||||
|
if err != nil {
|
||||||
|
return "", fmt.Errorf("failed to create stderr pipe: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := cmd.Start(); err != nil {
|
||||||
|
return "", fmt.Errorf("failed to start process: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
process := &Process{
|
||||||
|
ID: fmt.Sprintf("%d", time.Now().UnixNano()),
|
||||||
|
GroupID: groupID,
|
||||||
|
Cmd: cmd,
|
||||||
|
Stdin: stdin,
|
||||||
|
Stdout: stdout,
|
||||||
|
Stderr: stderr,
|
||||||
|
CreatedAt: time.Now(),
|
||||||
|
}
|
||||||
|
|
||||||
|
s.mu.Lock()
|
||||||
|
s.processes[process.ID] = process
|
||||||
|
if groupID != "" {
|
||||||
|
s.groups[groupID] = append(s.groups[groupID], process.ID)
|
||||||
|
}
|
||||||
|
s.mu.Unlock()
|
||||||
|
|
||||||
|
xlog.Debug("Successfully started process", "id", process.ID, "pid", cmd.Process.Pid)
|
||||||
|
return process.ID, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// StopProcess stops a running process
|
||||||
|
func (s *Server) StopProcess(id string) error {
|
||||||
|
s.mu.Lock()
|
||||||
|
process, exists := s.processes[id]
|
||||||
|
if !exists {
|
||||||
|
s.mu.Unlock()
|
||||||
|
return fmt.Errorf("process not found: %s", id)
|
||||||
|
}
|
||||||
|
|
||||||
|
xlog.Debug("Stopping process", "processID", id, "pid", process.Cmd.Process.Pid)
|
||||||
|
|
||||||
|
// Remove from group if it exists
|
||||||
|
if process.GroupID != "" {
|
||||||
|
groupProcesses := s.groups[process.GroupID]
|
||||||
|
for i, pid := range groupProcesses {
|
||||||
|
if pid == id {
|
||||||
|
s.groups[process.GroupID] = append(groupProcesses[:i], groupProcesses[i+1:]...)
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if len(s.groups[process.GroupID]) == 0 {
|
||||||
|
delete(s.groups, process.GroupID)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
delete(s.processes, id)
|
||||||
|
s.mu.Unlock()
|
||||||
|
|
||||||
|
if err := process.Cmd.Process.Kill(); err != nil {
|
||||||
|
xlog.Debug("Failed to kill process", "processID", id, "pid", process.Cmd.Process.Pid, "error", err)
|
||||||
|
return fmt.Errorf("failed to kill process: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
xlog.Debug("Successfully killed process", "processID", id, "pid", process.Cmd.Process.Pid)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// StopGroup stops all processes in a group
|
||||||
|
func (s *Server) StopGroup(groupID string) error {
|
||||||
|
s.mu.Lock()
|
||||||
|
processIDs, exists := s.groups[groupID]
|
||||||
|
if !exists {
|
||||||
|
s.mu.Unlock()
|
||||||
|
return fmt.Errorf("group not found: %s", groupID)
|
||||||
|
}
|
||||||
|
s.mu.Unlock()
|
||||||
|
|
||||||
|
for _, pid := range processIDs {
|
||||||
|
if err := s.StopProcess(pid); err != nil {
|
||||||
|
return fmt.Errorf("failed to stop process %s in group %s: %w", pid, groupID, err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// GetGroupProcesses returns all processes in a group
|
||||||
|
func (s *Server) GetGroupProcesses(groupID string) ([]*Process, error) {
|
||||||
|
s.mu.RLock()
|
||||||
|
processIDs, exists := s.groups[groupID]
|
||||||
|
if !exists {
|
||||||
|
s.mu.RUnlock()
|
||||||
|
return nil, fmt.Errorf("group not found: %s", groupID)
|
||||||
|
}
|
||||||
|
|
||||||
|
processes := make([]*Process, 0, len(processIDs))
|
||||||
|
for _, pid := range processIDs {
|
||||||
|
if process, exists := s.processes[pid]; exists {
|
||||||
|
processes = append(processes, process)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
s.mu.RUnlock()
|
||||||
|
|
||||||
|
return processes, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// ListGroups returns all group IDs
|
||||||
|
func (s *Server) ListGroups() []string {
|
||||||
|
s.mu.RLock()
|
||||||
|
defer s.mu.RUnlock()
|
||||||
|
|
||||||
|
groups := make([]string, 0, len(s.groups))
|
||||||
|
for groupID := range s.groups {
|
||||||
|
groups = append(groups, groupID)
|
||||||
|
}
|
||||||
|
return groups
|
||||||
|
}
|
||||||
|
|
||||||
|
// GetProcess returns a process by ID
|
||||||
|
func (s *Server) GetProcess(id string) (*Process, error) {
|
||||||
|
s.mu.RLock()
|
||||||
|
process, exists := s.processes[id]
|
||||||
|
s.mu.RUnlock()
|
||||||
|
|
||||||
|
if !exists {
|
||||||
|
return nil, fmt.Errorf("process not found: %s", id)
|
||||||
|
}
|
||||||
|
|
||||||
|
return process, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// ListProcesses returns all running processes
|
||||||
|
func (s *Server) ListProcesses() []*Process {
|
||||||
|
s.mu.RLock()
|
||||||
|
defer s.mu.RUnlock()
|
||||||
|
|
||||||
|
processes := make([]*Process, 0, len(s.processes))
|
||||||
|
for _, p := range s.processes {
|
||||||
|
processes = append(processes, p)
|
||||||
|
}
|
||||||
|
|
||||||
|
return processes
|
||||||
|
}
|
||||||
|
|
||||||
|
// RunProcess executes a command and returns its output
|
||||||
|
func (s *Server) RunProcess(ctx context.Context, command string, args []string, env []string) (string, error) {
|
||||||
|
cmd := exec.CommandContext(ctx, command, args...)
|
||||||
|
|
||||||
|
if len(env) > 0 {
|
||||||
|
cmd.Env = append(os.Environ(), env...)
|
||||||
|
}
|
||||||
|
|
||||||
|
output, err := cmd.CombinedOutput()
|
||||||
|
if err != nil {
|
||||||
|
return string(output), fmt.Errorf("process failed: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return string(output), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Start starts the HTTP server
|
||||||
|
func (s *Server) Start(addr string) error {
|
||||||
|
http.HandleFunc("/processes", s.handleProcesses)
|
||||||
|
http.HandleFunc("/processes/", s.handleProcess)
|
||||||
|
http.HandleFunc("/ws/", s.handleWebSocket)
|
||||||
|
http.HandleFunc("/groups", s.handleGroups)
|
||||||
|
http.HandleFunc("/groups/", s.handleGroup)
|
||||||
|
http.HandleFunc("/run", s.handleRun)
|
||||||
|
|
||||||
|
return http.ListenAndServe(addr, nil)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Server) handleProcesses(w http.ResponseWriter, r *http.Request) {
|
||||||
|
log.Printf("Handling /processes request: method=%s", r.Method)
|
||||||
|
|
||||||
|
switch r.Method {
|
||||||
|
case http.MethodGet:
|
||||||
|
processes := s.ListProcesses()
|
||||||
|
json.NewEncoder(w).Encode(processes)
|
||||||
|
case http.MethodPost:
|
||||||
|
var req struct {
|
||||||
|
Command string `json:"command"`
|
||||||
|
Args []string `json:"args"`
|
||||||
|
Env []string `json:"env"`
|
||||||
|
GroupID string `json:"group_id"`
|
||||||
|
}
|
||||||
|
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
|
||||||
|
http.Error(w, err.Error(), http.StatusBadRequest)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
id, err := s.StartProcess(context.Background(), req.Command, req.Args, req.Env, req.GroupID)
|
||||||
|
if err != nil {
|
||||||
|
http.Error(w, err.Error(), http.StatusInternalServerError)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
w.Header().Set("Content-Type", "application/json")
|
||||||
|
json.NewEncoder(w).Encode(map[string]string{"id": id})
|
||||||
|
default:
|
||||||
|
http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Server) handleProcess(w http.ResponseWriter, r *http.Request) {
|
||||||
|
id := r.URL.Path[len("/processes/"):]
|
||||||
|
|
||||||
|
switch r.Method {
|
||||||
|
case http.MethodGet:
|
||||||
|
process, err := s.GetProcess(id)
|
||||||
|
if err != nil {
|
||||||
|
http.Error(w, err.Error(), http.StatusNotFound)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
json.NewEncoder(w).Encode(process)
|
||||||
|
case http.MethodDelete:
|
||||||
|
if err := s.StopProcess(id); err != nil {
|
||||||
|
http.Error(w, err.Error(), http.StatusNotFound)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
w.WriteHeader(http.StatusNoContent)
|
||||||
|
default:
|
||||||
|
http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Server) handleWebSocket(w http.ResponseWriter, r *http.Request) {
|
||||||
|
id := r.URL.Path[len("/ws/"):]
|
||||||
|
xlog.Debug("Handling WebSocket connection", "processID", id)
|
||||||
|
|
||||||
|
process, err := s.GetProcess(id)
|
||||||
|
if err != nil {
|
||||||
|
http.Error(w, err.Error(), http.StatusNotFound)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
if process.Cmd.ProcessState != nil && process.Cmd.ProcessState.Exited() {
|
||||||
|
xlog.Debug("Process already exited", "processID", id)
|
||||||
|
http.Error(w, "Process already exited", http.StatusGone)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
xlog.Debug("Process is running", "processID", id, "pid", process.Cmd.Process.Pid)
|
||||||
|
|
||||||
|
conn, err := s.upgrader.Upgrade(w, r, nil)
|
||||||
|
if err != nil {
|
||||||
|
http.Error(w, err.Error(), http.StatusInternalServerError)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
defer conn.Close()
|
||||||
|
|
||||||
|
xlog.Debug("WebSocket connection established", "processID", id)
|
||||||
|
|
||||||
|
// Create a done channel to signal process completion
|
||||||
|
done := make(chan struct{})
|
||||||
|
|
||||||
|
// Handle stdin
|
||||||
|
go func() {
|
||||||
|
defer func() {
|
||||||
|
select {
|
||||||
|
case <-done:
|
||||||
|
xlog.Debug("Process stdin handler done", "processID", id)
|
||||||
|
default:
|
||||||
|
xlog.Debug("WebSocket stdin connection closed", "processID", id)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
for {
|
||||||
|
_, message, err := conn.ReadMessage()
|
||||||
|
if err != nil {
|
||||||
|
if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseNormalClosure) {
|
||||||
|
xlog.Debug("WebSocket stdin unexpected error", "processID", id, "error", err)
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
xlog.Debug("Received message", "processID", id, "message", string(message))
|
||||||
|
if _, err := process.Stdin.Write(message); err != nil {
|
||||||
|
if err != io.EOF {
|
||||||
|
xlog.Debug("WebSocket stdin write error", "processID", id, "error", err)
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
xlog.Debug("Message sent to process", "processID", id, "message", string(message))
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
// Handle stdout and stderr
|
||||||
|
go func() {
|
||||||
|
defer func() {
|
||||||
|
select {
|
||||||
|
case <-done:
|
||||||
|
xlog.Debug("Process output handler done", "processID", id)
|
||||||
|
default:
|
||||||
|
xlog.Debug("WebSocket output connection closed", "processID", id)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
// Create a buffer for reading
|
||||||
|
buf := make([]byte, 4096)
|
||||||
|
reader := io.MultiReader(process.Stdout, process.Stderr)
|
||||||
|
|
||||||
|
for {
|
||||||
|
n, err := reader.Read(buf)
|
||||||
|
if err != nil {
|
||||||
|
if err != io.EOF {
|
||||||
|
xlog.Debug("Read error", "processID", id, "error", err)
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
if n > 0 {
|
||||||
|
xlog.Debug("Sending message", "processID", id, "size", n)
|
||||||
|
if err := conn.WriteMessage(websocket.BinaryMessage, buf[:n]); err != nil {
|
||||||
|
if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseNormalClosure) {
|
||||||
|
xlog.Debug("WebSocket output write error", "processID", id, "error", err)
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
xlog.Debug("Message sent to client", "processID", id, "size", n)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
// Wait for process to exit
|
||||||
|
xlog.Debug("Waiting for process to exit", "processID", id)
|
||||||
|
err = process.Cmd.Wait()
|
||||||
|
close(done) // Signal that the process is done
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
xlog.Debug("Process exited with error",
|
||||||
|
"processID", id,
|
||||||
|
"pid", process.Cmd.Process.Pid,
|
||||||
|
"error", err)
|
||||||
|
} else {
|
||||||
|
xlog.Debug("Process exited successfully",
|
||||||
|
"processID", id,
|
||||||
|
"pid", process.Cmd.Process.Pid)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Add new handlers for group management
|
||||||
|
func (s *Server) handleGroups(w http.ResponseWriter, r *http.Request) {
|
||||||
|
switch r.Method {
|
||||||
|
case http.MethodGet:
|
||||||
|
groups := s.ListGroups()
|
||||||
|
json.NewEncoder(w).Encode(groups)
|
||||||
|
default:
|
||||||
|
http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Server) handleGroup(w http.ResponseWriter, r *http.Request) {
|
||||||
|
groupID := r.URL.Path[len("/groups/"):]
|
||||||
|
|
||||||
|
switch r.Method {
|
||||||
|
case http.MethodGet:
|
||||||
|
processes, err := s.GetGroupProcesses(groupID)
|
||||||
|
if err != nil {
|
||||||
|
http.Error(w, err.Error(), http.StatusNotFound)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
json.NewEncoder(w).Encode(processes)
|
||||||
|
case http.MethodDelete:
|
||||||
|
if err := s.StopGroup(groupID); err != nil {
|
||||||
|
http.Error(w, err.Error(), http.StatusNotFound)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
w.WriteHeader(http.StatusNoContent)
|
||||||
|
default:
|
||||||
|
http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Server) handleRun(w http.ResponseWriter, r *http.Request) {
|
||||||
|
if r.Method != http.MethodPost {
|
||||||
|
http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
log.Printf("Handling /run request")
|
||||||
|
|
||||||
|
var req struct {
|
||||||
|
Command string `json:"command"`
|
||||||
|
Args []string `json:"args"`
|
||||||
|
Env []string `json:"env"`
|
||||||
|
}
|
||||||
|
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
|
||||||
|
http.Error(w, err.Error(), http.StatusBadRequest)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
log.Printf("Executing one-time process: command=%s, args=%v", req.Command, req.Args)
|
||||||
|
|
||||||
|
output, err := s.RunProcess(r.Context(), req.Command, req.Args, req.Env)
|
||||||
|
if err != nil {
|
||||||
|
http.Error(w, err.Error(), http.StatusInternalServerError)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
log.Printf("One-time process completed with output length: %d", len(output))
|
||||||
|
|
||||||
|
w.Header().Set("Content-Type", "application/json")
|
||||||
|
json.NewEncoder(w).Encode(map[string]string{
|
||||||
|
"output": output,
|
||||||
|
})
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user