feat: local MCP server support (#61)
* wip Signed-off-by: Ettore Di Giacinto <mudler@localai.io> Signed-off-by: mudler <mudler@localai.io> * Add groups to mcpbox Signed-off-by: Ettore Di Giacinto <mudler@localai.io> Signed-off-by: mudler <mudler@localai.io> * Add mcpbox dockerfile and entrypoint Signed-off-by: mudler <mudler@localai.io> * Attach mcp stdio box to agent Signed-off-by: Ettore Di Giacinto <mudler@localai.io> Signed-off-by: mudler <mudler@localai.io> * Add to dockerfile Signed-off-by: mudler <mudler@localai.io> * Attach to config Signed-off-by: mudler <mudler@localai.io> * Attach to ui Signed-off-by: mudler <mudler@localai.io> * Revert "Attach to ui" This reverts commit 088d0c47e87ee8f84297e47d178fb7384bbe6d45. Signed-off-by: mudler <mudler@localai.io> * add one-time process, attach to UI the mcp server json configuration Signed-off-by: mudler <mudler@localai.io> * quality of life improvements Signed-off-by: mudler <mudler@localai.io> * fixes Signed-off-by: mudler <mudler@localai.io> * Make it working, expose MCP prepare script to UI Signed-off-by: mudler <mudler@localai.io> * Add container image to CI builds * Wire mcpbox to tests * Improve setup' * Not needed anymore, using tests Signed-off-by: mudler <mudler@localai.io> * fix: do not override actions Signed-off-by: mudler <mudler@localai.io> * chore(tests): fix env var Signed-off-by: mudler <mudler@localai.io> --------- Signed-off-by: Ettore Di Giacinto <mudler@localai.io> Signed-off-by: mudler <mudler@localai.io>
This commit is contained in:
committed by
GitHub
parent
ce997d2425
commit
eb8663ada1
325
pkg/stdio/client.go
Normal file
325
pkg/stdio/client.go
Normal file
@@ -0,0 +1,325 @@
|
||||
package stdio
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io"
|
||||
"log"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/gorilla/websocket"
|
||||
)
|
||||
|
||||
// Client implements the transport.Interface for stdio processes
|
||||
type Client struct {
|
||||
baseURL string
|
||||
processes map[string]*Process
|
||||
groups map[string][]string
|
||||
mu sync.RWMutex
|
||||
}
|
||||
|
||||
// NewClient creates a new stdio transport client
|
||||
func NewClient(baseURL string) *Client {
|
||||
return &Client{
|
||||
baseURL: baseURL,
|
||||
processes: make(map[string]*Process),
|
||||
groups: make(map[string][]string),
|
||||
}
|
||||
}
|
||||
|
||||
// CreateProcess starts a new process in a group
|
||||
func (c *Client) CreateProcess(ctx context.Context, command string, args []string, env []string, groupID string) (*Process, error) {
|
||||
log.Printf("Creating process: command=%s, args=%v, groupID=%s", command, args, groupID)
|
||||
|
||||
req := struct {
|
||||
Command string `json:"command"`
|
||||
Args []string `json:"args"`
|
||||
Env []string `json:"env"`
|
||||
GroupID string `json:"group_id"`
|
||||
}{
|
||||
Command: command,
|
||||
Args: args,
|
||||
Env: env,
|
||||
GroupID: groupID,
|
||||
}
|
||||
|
||||
reqBody, err := json.Marshal(req)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to marshal request: %w", err)
|
||||
}
|
||||
|
||||
url := fmt.Sprintf("%s/processes", c.baseURL)
|
||||
log.Printf("Sending POST request to %s", url)
|
||||
|
||||
resp, err := http.Post(url, "application/json", bytes.NewReader(reqBody))
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to start process: %w", err)
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
log.Printf("Received response with status: %d", resp.StatusCode)
|
||||
|
||||
var result struct {
|
||||
ID string `json:"id"`
|
||||
}
|
||||
|
||||
body, _ := io.ReadAll(resp.Body)
|
||||
|
||||
if err := json.Unmarshal(body, &result); err != nil {
|
||||
return nil, fmt.Errorf("failed to decode response: %w. body: %s", err, string(body))
|
||||
}
|
||||
|
||||
log.Printf("Successfully created process with ID: %s", result.ID)
|
||||
|
||||
process := &Process{
|
||||
ID: result.ID,
|
||||
GroupID: groupID,
|
||||
CreatedAt: time.Now(),
|
||||
}
|
||||
|
||||
c.mu.Lock()
|
||||
c.processes[process.ID] = process
|
||||
if groupID != "" {
|
||||
c.groups[groupID] = append(c.groups[groupID], process.ID)
|
||||
}
|
||||
c.mu.Unlock()
|
||||
|
||||
return process, nil
|
||||
}
|
||||
|
||||
// GetProcess returns a process by ID
|
||||
func (c *Client) GetProcess(id string) (*Process, error) {
|
||||
c.mu.RLock()
|
||||
process, exists := c.processes[id]
|
||||
c.mu.RUnlock()
|
||||
|
||||
if !exists {
|
||||
return nil, fmt.Errorf("process not found: %s", id)
|
||||
}
|
||||
|
||||
return process, nil
|
||||
}
|
||||
|
||||
// GetGroupProcesses returns all processes in a group
|
||||
func (c *Client) GetGroupProcesses(groupID string) ([]*Process, error) {
|
||||
c.mu.RLock()
|
||||
processIDs, exists := c.groups[groupID]
|
||||
if !exists {
|
||||
c.mu.RUnlock()
|
||||
return nil, fmt.Errorf("group not found: %s", groupID)
|
||||
}
|
||||
|
||||
processes := make([]*Process, 0, len(processIDs))
|
||||
for _, pid := range processIDs {
|
||||
if process, exists := c.processes[pid]; exists {
|
||||
processes = append(processes, process)
|
||||
}
|
||||
}
|
||||
c.mu.RUnlock()
|
||||
|
||||
return processes, nil
|
||||
}
|
||||
|
||||
// StopProcess stops a single process
|
||||
func (c *Client) StopProcess(id string) error {
|
||||
c.mu.Lock()
|
||||
process, exists := c.processes[id]
|
||||
if !exists {
|
||||
c.mu.Unlock()
|
||||
return fmt.Errorf("process not found: %s", id)
|
||||
}
|
||||
|
||||
// Remove from group if it exists
|
||||
if process.GroupID != "" {
|
||||
groupProcesses := c.groups[process.GroupID]
|
||||
for i, pid := range groupProcesses {
|
||||
if pid == id {
|
||||
c.groups[process.GroupID] = append(groupProcesses[:i], groupProcesses[i+1:]...)
|
||||
break
|
||||
}
|
||||
}
|
||||
if len(c.groups[process.GroupID]) == 0 {
|
||||
delete(c.groups, process.GroupID)
|
||||
}
|
||||
}
|
||||
|
||||
delete(c.processes, id)
|
||||
c.mu.Unlock()
|
||||
|
||||
req, err := http.NewRequest(
|
||||
"DELETE",
|
||||
fmt.Sprintf("%s/processes/%s", c.baseURL, id),
|
||||
nil,
|
||||
)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to create request: %w", err)
|
||||
}
|
||||
|
||||
resp, err := http.DefaultClient.Do(req)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to stop process: %w", err)
|
||||
}
|
||||
resp.Body.Close()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// StopGroup stops all processes in a group
|
||||
func (c *Client) StopGroup(groupID string) error {
|
||||
c.mu.Lock()
|
||||
processIDs, exists := c.groups[groupID]
|
||||
if !exists {
|
||||
c.mu.Unlock()
|
||||
return fmt.Errorf("group not found: %s", groupID)
|
||||
}
|
||||
c.mu.Unlock()
|
||||
|
||||
for _, pid := range processIDs {
|
||||
if err := c.StopProcess(pid); err != nil {
|
||||
return fmt.Errorf("failed to stop process %s in group %s: %w", pid, groupID, err)
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// ListGroups returns all group IDs
|
||||
func (c *Client) ListGroups() []string {
|
||||
c.mu.RLock()
|
||||
defer c.mu.RUnlock()
|
||||
|
||||
groups := make([]string, 0, len(c.groups))
|
||||
for groupID := range c.groups {
|
||||
groups = append(groups, groupID)
|
||||
}
|
||||
return groups
|
||||
}
|
||||
|
||||
// GetProcessIO returns io.Reader and io.Writer for a process
|
||||
func (c *Client) GetProcessIO(id string) (io.Reader, io.Writer, error) {
|
||||
log.Printf("Getting IO for process: %s", id)
|
||||
|
||||
process, err := c.GetProcess(id)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
// Parse the base URL to get the host
|
||||
baseURL, err := url.Parse(c.baseURL)
|
||||
if err != nil {
|
||||
return nil, nil, fmt.Errorf("failed to parse base URL: %w", err)
|
||||
}
|
||||
|
||||
// Connect to WebSocket
|
||||
u := url.URL{
|
||||
Scheme: "ws",
|
||||
Host: baseURL.Host,
|
||||
Path: fmt.Sprintf("/ws/%s", process.ID),
|
||||
}
|
||||
|
||||
log.Printf("Connecting to WebSocket at: %s", u.String())
|
||||
|
||||
conn, _, err := websocket.DefaultDialer.Dial(u.String(), nil)
|
||||
if err != nil {
|
||||
return nil, nil, fmt.Errorf("failed to connect to WebSocket: %w", err)
|
||||
}
|
||||
|
||||
log.Printf("Successfully connected to WebSocket for process: %s", id)
|
||||
|
||||
// Create reader and writer
|
||||
reader := &websocketReader{conn: conn}
|
||||
writer := &websocketWriter{conn: conn}
|
||||
|
||||
return reader, writer, nil
|
||||
}
|
||||
|
||||
// websocketReader implements io.Reader for WebSocket
|
||||
type websocketReader struct {
|
||||
conn *websocket.Conn
|
||||
}
|
||||
|
||||
func (r *websocketReader) Read(p []byte) (n int, err error) {
|
||||
_, message, err := r.conn.ReadMessage()
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
n = copy(p, message)
|
||||
return n, nil
|
||||
}
|
||||
|
||||
// websocketWriter implements io.Writer for WebSocket
|
||||
type websocketWriter struct {
|
||||
conn *websocket.Conn
|
||||
}
|
||||
|
||||
func (w *websocketWriter) Write(p []byte) (n int, err error) {
|
||||
// Use BinaryMessage type for better compatibility
|
||||
err = w.conn.WriteMessage(websocket.BinaryMessage, p)
|
||||
if err != nil {
|
||||
return 0, fmt.Errorf("failed to write WebSocket message: %w", err)
|
||||
}
|
||||
return len(p), nil
|
||||
}
|
||||
|
||||
// Close closes all connections and stops all processes
|
||||
func (c *Client) Close() error {
|
||||
c.mu.Lock()
|
||||
defer c.mu.Unlock()
|
||||
|
||||
// Stop all processes
|
||||
for id := range c.processes {
|
||||
if err := c.StopProcess(id); err != nil {
|
||||
return fmt.Errorf("failed to stop process %s: %w", id, err)
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// RunProcess executes a command and returns its output
|
||||
func (c *Client) RunProcess(ctx context.Context, command string, args []string, env []string) (string, error) {
|
||||
log.Printf("Running one-time process: command=%s, args=%v", command, args)
|
||||
|
||||
req := struct {
|
||||
Command string `json:"command"`
|
||||
Args []string `json:"args"`
|
||||
Env []string `json:"env"`
|
||||
}{
|
||||
Command: command,
|
||||
Args: args,
|
||||
Env: env,
|
||||
}
|
||||
|
||||
reqBody, err := json.Marshal(req)
|
||||
if err != nil {
|
||||
return "", fmt.Errorf("failed to marshal request: %w", err)
|
||||
}
|
||||
|
||||
url := fmt.Sprintf("%s/run", c.baseURL)
|
||||
log.Printf("Sending POST request to %s", url)
|
||||
|
||||
resp, err := http.Post(url, "application/json", bytes.NewReader(reqBody))
|
||||
if err != nil {
|
||||
return "", fmt.Errorf("failed to execute process: %w", err)
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
log.Printf("Received response with status: %d", resp.StatusCode)
|
||||
|
||||
var result struct {
|
||||
Output string `json:"output"`
|
||||
}
|
||||
|
||||
if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
|
||||
body, _ := io.ReadAll(resp.Body)
|
||||
return "", fmt.Errorf("failed to decode response: %w. body: %s", err, string(body))
|
||||
}
|
||||
|
||||
log.Printf("Successfully executed process with output length: %d", len(result.Output))
|
||||
return result.Output, nil
|
||||
}
|
||||
28
pkg/stdio/client_suite_test.go
Normal file
28
pkg/stdio/client_suite_test.go
Normal file
@@ -0,0 +1,28 @@
|
||||
package stdio
|
||||
|
||||
import (
|
||||
"os"
|
||||
"testing"
|
||||
|
||||
. "github.com/onsi/ginkgo/v2"
|
||||
. "github.com/onsi/gomega"
|
||||
)
|
||||
|
||||
func TestSTDIOTransport(t *testing.T) {
|
||||
RegisterFailHandler(Fail)
|
||||
RunSpecs(t, "STDIOTransport test suite")
|
||||
}
|
||||
|
||||
var baseURL string
|
||||
|
||||
func init() {
|
||||
baseURL = os.Getenv("LOCALAGI_MCPBOX_URL")
|
||||
if baseURL == "" {
|
||||
baseURL = "http://localhost:8080"
|
||||
}
|
||||
}
|
||||
|
||||
var _ = AfterSuite(func() {
|
||||
client := NewClient(baseURL)
|
||||
client.StopGroup("test-group")
|
||||
})
|
||||
235
pkg/stdio/client_test.go
Normal file
235
pkg/stdio/client_test.go
Normal file
@@ -0,0 +1,235 @@
|
||||
package stdio
|
||||
|
||||
import (
|
||||
"context"
|
||||
"time"
|
||||
|
||||
mcp "github.com/metoro-io/mcp-golang"
|
||||
"github.com/metoro-io/mcp-golang/transport/stdio"
|
||||
"github.com/mudler/LocalAGI/pkg/xlog"
|
||||
. "github.com/onsi/ginkgo/v2"
|
||||
. "github.com/onsi/gomega"
|
||||
)
|
||||
|
||||
var _ = Describe("Client", func() {
|
||||
var (
|
||||
client *Client
|
||||
)
|
||||
|
||||
BeforeEach(func() {
|
||||
client = NewClient(baseURL)
|
||||
})
|
||||
|
||||
AfterEach(func() {
|
||||
if client != nil {
|
||||
Expect(client.Close()).To(Succeed())
|
||||
}
|
||||
})
|
||||
|
||||
Context("Process Management", func() {
|
||||
It("should create and stop a process", func() {
|
||||
ctx := context.Background()
|
||||
// Use a command that doesn't exit immediately
|
||||
process, err := client.CreateProcess(ctx, "sh", []string{"-c", "echo 'Hello, World!'; sleep 10"}, []string{}, "test-group")
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
Expect(process).NotTo(BeNil())
|
||||
Expect(process.ID).NotTo(BeEmpty())
|
||||
|
||||
// Get process IO
|
||||
reader, writer, err := client.GetProcessIO(process.ID)
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
Expect(reader).NotTo(BeNil())
|
||||
Expect(writer).NotTo(BeNil())
|
||||
|
||||
// Write to process
|
||||
_, err = writer.Write([]byte("test input\n"))
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
|
||||
// Read from process with timeout
|
||||
buf := make([]byte, 1024)
|
||||
readDone := make(chan struct{})
|
||||
var readErr error
|
||||
var readN int
|
||||
|
||||
go func() {
|
||||
readN, readErr = reader.Read(buf)
|
||||
close(readDone)
|
||||
}()
|
||||
|
||||
// Wait for read with timeout
|
||||
select {
|
||||
case <-readDone:
|
||||
Expect(readErr).NotTo(HaveOccurred())
|
||||
Expect(readN).To(BeNumerically(">", 0))
|
||||
Expect(string(buf[:readN])).To(ContainSubstring("Hello, World!"))
|
||||
case <-time.After(5 * time.Second):
|
||||
Fail("Timeout waiting for process output")
|
||||
}
|
||||
|
||||
// Stop the process
|
||||
err = client.StopProcess(process.ID)
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
})
|
||||
|
||||
It("should manage process groups", func() {
|
||||
ctx := context.Background()
|
||||
groupID := "test-group"
|
||||
|
||||
// Create multiple processes in the same group
|
||||
process1, err := client.CreateProcess(ctx, "sh", []string{"-c", "echo 'Process 1'; sleep 1"}, []string{}, groupID)
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
Expect(process1).NotTo(BeNil())
|
||||
|
||||
process2, err := client.CreateProcess(ctx, "sh", []string{"-c", "echo 'Process 2'; sleep 1"}, []string{}, groupID)
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
Expect(process2).NotTo(BeNil())
|
||||
|
||||
// Get group processes
|
||||
processes, err := client.GetGroupProcesses(groupID)
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
Expect(processes).To(HaveLen(2))
|
||||
|
||||
// List groups
|
||||
groups := client.ListGroups()
|
||||
Expect(groups).To(ContainElement(groupID))
|
||||
|
||||
// Stop the group
|
||||
err = client.StopGroup(groupID)
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
})
|
||||
|
||||
It("should run a one-time process", func() {
|
||||
ctx := context.Background()
|
||||
output, err := client.RunProcess(ctx, "echo", []string{"One-time process"}, []string{})
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
Expect(output).To(ContainSubstring("One-time process"))
|
||||
})
|
||||
|
||||
It("should handle process with environment variables", func() {
|
||||
ctx := context.Background()
|
||||
env := []string{"TEST_VAR=test_value"}
|
||||
process, err := client.CreateProcess(ctx, "sh", []string{"-c", "env | grep TEST_VAR; sleep 1"}, env, "test-group")
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
Expect(process).NotTo(BeNil())
|
||||
|
||||
// Get process IO
|
||||
reader, _, err := client.GetProcessIO(process.ID)
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
|
||||
// Read environment variables with timeout
|
||||
buf := make([]byte, 1024)
|
||||
readDone := make(chan struct{})
|
||||
var readErr error
|
||||
var readN int
|
||||
|
||||
go func() {
|
||||
readN, readErr = reader.Read(buf)
|
||||
close(readDone)
|
||||
}()
|
||||
|
||||
// Wait for read with timeout
|
||||
select {
|
||||
case <-readDone:
|
||||
Expect(readErr).NotTo(HaveOccurred())
|
||||
Expect(readN).To(BeNumerically(">", 0))
|
||||
Expect(string(buf[:readN])).To(ContainSubstring("TEST_VAR=test_value"))
|
||||
case <-time.After(5 * time.Second):
|
||||
Fail("Timeout waiting for process output")
|
||||
}
|
||||
|
||||
// Stop the process
|
||||
err = client.StopProcess(process.ID)
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
})
|
||||
|
||||
It("should handle long-running processes", func() {
|
||||
ctx := context.Background()
|
||||
process, err := client.CreateProcess(ctx, "sh", []string{"-c", "echo 'Starting long process'; sleep 5"}, []string{}, "test-group")
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
Expect(process).NotTo(BeNil())
|
||||
|
||||
// Get process IO
|
||||
reader, _, err := client.GetProcessIO(process.ID)
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
|
||||
// Read initial output
|
||||
buf := make([]byte, 1024)
|
||||
readDone := make(chan struct{})
|
||||
var readErr error
|
||||
var readN int
|
||||
|
||||
go func() {
|
||||
readN, readErr = reader.Read(buf)
|
||||
close(readDone)
|
||||
}()
|
||||
|
||||
// Wait for read with timeout
|
||||
select {
|
||||
case <-readDone:
|
||||
Expect(readErr).NotTo(HaveOccurred())
|
||||
Expect(readN).To(BeNumerically(">", 0))
|
||||
Expect(string(buf[:readN])).To(ContainSubstring("Starting long process"))
|
||||
case <-time.After(5 * time.Second):
|
||||
Fail("Timeout waiting for process output")
|
||||
}
|
||||
|
||||
// Wait a bit to ensure process is running
|
||||
time.Sleep(time.Second)
|
||||
|
||||
// Stop the process
|
||||
err = client.StopProcess(process.ID)
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
})
|
||||
|
||||
It("MCP", func() {
|
||||
ctx := context.Background()
|
||||
process, err := client.CreateProcess(ctx,
|
||||
"docker", []string{"run", "-i", "--rm", "-e", "GITHUB_PERSONAL_ACCESS_TOKEN", "ghcr.io/github/github-mcp-server"},
|
||||
[]string{"GITHUB_PERSONAL_ACCESS_TOKEN=test"}, "test-group")
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
Expect(process).NotTo(BeNil())
|
||||
Expect(process.ID).NotTo(BeEmpty())
|
||||
|
||||
defer client.StopProcess(process.ID)
|
||||
|
||||
// MCP client
|
||||
|
||||
read, writer, err := client.GetProcessIO(process.ID)
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
Expect(read).NotTo(BeNil())
|
||||
Expect(writer).NotTo(BeNil())
|
||||
|
||||
transport := stdio.NewStdioServerTransportWithIO(read, writer)
|
||||
|
||||
// Create a new client
|
||||
mcpClient := mcp.NewClient(transport)
|
||||
// Initialize the client
|
||||
response, e := mcpClient.Initialize(ctx)
|
||||
Expect(e).NotTo(HaveOccurred())
|
||||
Expect(response).NotTo(BeNil())
|
||||
|
||||
Expect(mcpClient.Ping(ctx)).To(Succeed())
|
||||
|
||||
xlog.Debug("Client initialized: %v", response.Instructions)
|
||||
|
||||
alltools := []mcp.ToolRetType{}
|
||||
var cursor *string
|
||||
for {
|
||||
tools, err := mcpClient.ListTools(ctx, cursor)
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
Expect(tools).NotTo(BeNil())
|
||||
Expect(tools.Tools).NotTo(BeEmpty())
|
||||
alltools = append(alltools, tools.Tools...)
|
||||
|
||||
if tools.NextCursor == nil {
|
||||
break // No more pages
|
||||
}
|
||||
cursor = tools.NextCursor
|
||||
}
|
||||
|
||||
for _, tool := range alltools {
|
||||
xlog.Debug("Tool: %v", tool)
|
||||
}
|
||||
})
|
||||
})
|
||||
})
|
||||
473
pkg/stdio/server.go
Normal file
473
pkg/stdio/server.go
Normal file
@@ -0,0 +1,473 @@
|
||||
package stdio
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io"
|
||||
"log"
|
||||
"net/http"
|
||||
"os"
|
||||
"os/exec"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/gorilla/websocket"
|
||||
"github.com/mudler/LocalAGI/pkg/xlog"
|
||||
)
|
||||
|
||||
// Process represents a running process with its stdio streams
|
||||
type Process struct {
|
||||
ID string
|
||||
GroupID string
|
||||
Cmd *exec.Cmd
|
||||
Stdin io.WriteCloser
|
||||
Stdout io.ReadCloser
|
||||
Stderr io.ReadCloser
|
||||
CreatedAt time.Time
|
||||
}
|
||||
|
||||
// Server handles process management and stdio streaming
|
||||
type Server struct {
|
||||
processes map[string]*Process
|
||||
groups map[string][]string // maps group ID to process IDs
|
||||
mu sync.RWMutex
|
||||
upgrader websocket.Upgrader
|
||||
}
|
||||
|
||||
// NewServer creates a new stdio server
|
||||
func NewServer() *Server {
|
||||
return &Server{
|
||||
processes: make(map[string]*Process),
|
||||
groups: make(map[string][]string),
|
||||
upgrader: websocket.Upgrader{},
|
||||
}
|
||||
}
|
||||
|
||||
// StartProcess starts a new process and returns its ID
|
||||
func (s *Server) StartProcess(ctx context.Context, command string, args []string, env []string, groupID string) (string, error) {
|
||||
xlog.Debug("Starting process", "command", command, "args", args, "groupID", groupID)
|
||||
|
||||
cmd := exec.CommandContext(ctx, command, args...)
|
||||
|
||||
if len(env) > 0 {
|
||||
cmd.Env = append(os.Environ(), env...)
|
||||
xlog.Debug("Process environment", "env", cmd.Env)
|
||||
}
|
||||
|
||||
stdin, err := cmd.StdinPipe()
|
||||
if err != nil {
|
||||
return "", fmt.Errorf("failed to create stdin pipe: %w", err)
|
||||
}
|
||||
|
||||
stdout, err := cmd.StdoutPipe()
|
||||
if err != nil {
|
||||
return "", fmt.Errorf("failed to create stdout pipe: %w", err)
|
||||
}
|
||||
|
||||
stderr, err := cmd.StderrPipe()
|
||||
if err != nil {
|
||||
return "", fmt.Errorf("failed to create stderr pipe: %w", err)
|
||||
}
|
||||
|
||||
if err := cmd.Start(); err != nil {
|
||||
return "", fmt.Errorf("failed to start process: %w", err)
|
||||
}
|
||||
|
||||
process := &Process{
|
||||
ID: fmt.Sprintf("%d", time.Now().UnixNano()),
|
||||
GroupID: groupID,
|
||||
Cmd: cmd,
|
||||
Stdin: stdin,
|
||||
Stdout: stdout,
|
||||
Stderr: stderr,
|
||||
CreatedAt: time.Now(),
|
||||
}
|
||||
|
||||
s.mu.Lock()
|
||||
s.processes[process.ID] = process
|
||||
if groupID != "" {
|
||||
s.groups[groupID] = append(s.groups[groupID], process.ID)
|
||||
}
|
||||
s.mu.Unlock()
|
||||
|
||||
xlog.Debug("Successfully started process", "id", process.ID, "pid", cmd.Process.Pid)
|
||||
return process.ID, nil
|
||||
}
|
||||
|
||||
// StopProcess stops a running process
|
||||
func (s *Server) StopProcess(id string) error {
|
||||
s.mu.Lock()
|
||||
process, exists := s.processes[id]
|
||||
if !exists {
|
||||
s.mu.Unlock()
|
||||
return fmt.Errorf("process not found: %s", id)
|
||||
}
|
||||
|
||||
xlog.Debug("Stopping process", "processID", id, "pid", process.Cmd.Process.Pid)
|
||||
|
||||
// Remove from group if it exists
|
||||
if process.GroupID != "" {
|
||||
groupProcesses := s.groups[process.GroupID]
|
||||
for i, pid := range groupProcesses {
|
||||
if pid == id {
|
||||
s.groups[process.GroupID] = append(groupProcesses[:i], groupProcesses[i+1:]...)
|
||||
break
|
||||
}
|
||||
}
|
||||
if len(s.groups[process.GroupID]) == 0 {
|
||||
delete(s.groups, process.GroupID)
|
||||
}
|
||||
}
|
||||
|
||||
delete(s.processes, id)
|
||||
s.mu.Unlock()
|
||||
|
||||
if err := process.Cmd.Process.Kill(); err != nil {
|
||||
xlog.Debug("Failed to kill process", "processID", id, "pid", process.Cmd.Process.Pid, "error", err)
|
||||
return fmt.Errorf("failed to kill process: %w", err)
|
||||
}
|
||||
|
||||
xlog.Debug("Successfully killed process", "processID", id, "pid", process.Cmd.Process.Pid)
|
||||
return nil
|
||||
}
|
||||
|
||||
// StopGroup stops all processes in a group
|
||||
func (s *Server) StopGroup(groupID string) error {
|
||||
s.mu.Lock()
|
||||
processIDs, exists := s.groups[groupID]
|
||||
if !exists {
|
||||
s.mu.Unlock()
|
||||
return fmt.Errorf("group not found: %s", groupID)
|
||||
}
|
||||
s.mu.Unlock()
|
||||
|
||||
for _, pid := range processIDs {
|
||||
if err := s.StopProcess(pid); err != nil {
|
||||
return fmt.Errorf("failed to stop process %s in group %s: %w", pid, groupID, err)
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// GetGroupProcesses returns all processes in a group
|
||||
func (s *Server) GetGroupProcesses(groupID string) ([]*Process, error) {
|
||||
s.mu.RLock()
|
||||
processIDs, exists := s.groups[groupID]
|
||||
if !exists {
|
||||
s.mu.RUnlock()
|
||||
return nil, fmt.Errorf("group not found: %s", groupID)
|
||||
}
|
||||
|
||||
processes := make([]*Process, 0, len(processIDs))
|
||||
for _, pid := range processIDs {
|
||||
if process, exists := s.processes[pid]; exists {
|
||||
processes = append(processes, process)
|
||||
}
|
||||
}
|
||||
s.mu.RUnlock()
|
||||
|
||||
return processes, nil
|
||||
}
|
||||
|
||||
// ListGroups returns all group IDs
|
||||
func (s *Server) ListGroups() []string {
|
||||
s.mu.RLock()
|
||||
defer s.mu.RUnlock()
|
||||
|
||||
groups := make([]string, 0, len(s.groups))
|
||||
for groupID := range s.groups {
|
||||
groups = append(groups, groupID)
|
||||
}
|
||||
return groups
|
||||
}
|
||||
|
||||
// GetProcess returns a process by ID
|
||||
func (s *Server) GetProcess(id string) (*Process, error) {
|
||||
s.mu.RLock()
|
||||
process, exists := s.processes[id]
|
||||
s.mu.RUnlock()
|
||||
|
||||
if !exists {
|
||||
return nil, fmt.Errorf("process not found: %s", id)
|
||||
}
|
||||
|
||||
return process, nil
|
||||
}
|
||||
|
||||
// ListProcesses returns all running processes
|
||||
func (s *Server) ListProcesses() []*Process {
|
||||
s.mu.RLock()
|
||||
defer s.mu.RUnlock()
|
||||
|
||||
processes := make([]*Process, 0, len(s.processes))
|
||||
for _, p := range s.processes {
|
||||
processes = append(processes, p)
|
||||
}
|
||||
|
||||
return processes
|
||||
}
|
||||
|
||||
// RunProcess executes a command and returns its output
|
||||
func (s *Server) RunProcess(ctx context.Context, command string, args []string, env []string) (string, error) {
|
||||
cmd := exec.CommandContext(ctx, command, args...)
|
||||
|
||||
if len(env) > 0 {
|
||||
cmd.Env = append(os.Environ(), env...)
|
||||
}
|
||||
|
||||
output, err := cmd.CombinedOutput()
|
||||
if err != nil {
|
||||
return string(output), fmt.Errorf("process failed: %w", err)
|
||||
}
|
||||
|
||||
return string(output), nil
|
||||
}
|
||||
|
||||
// Start starts the HTTP server
|
||||
func (s *Server) Start(addr string) error {
|
||||
http.HandleFunc("/processes", s.handleProcesses)
|
||||
http.HandleFunc("/processes/", s.handleProcess)
|
||||
http.HandleFunc("/ws/", s.handleWebSocket)
|
||||
http.HandleFunc("/groups", s.handleGroups)
|
||||
http.HandleFunc("/groups/", s.handleGroup)
|
||||
http.HandleFunc("/run", s.handleRun)
|
||||
|
||||
return http.ListenAndServe(addr, nil)
|
||||
}
|
||||
|
||||
func (s *Server) handleProcesses(w http.ResponseWriter, r *http.Request) {
|
||||
log.Printf("Handling /processes request: method=%s", r.Method)
|
||||
|
||||
switch r.Method {
|
||||
case http.MethodGet:
|
||||
processes := s.ListProcesses()
|
||||
json.NewEncoder(w).Encode(processes)
|
||||
case http.MethodPost:
|
||||
var req struct {
|
||||
Command string `json:"command"`
|
||||
Args []string `json:"args"`
|
||||
Env []string `json:"env"`
|
||||
GroupID string `json:"group_id"`
|
||||
}
|
||||
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
|
||||
http.Error(w, err.Error(), http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
|
||||
id, err := s.StartProcess(context.Background(), req.Command, req.Args, req.Env, req.GroupID)
|
||||
if err != nil {
|
||||
http.Error(w, err.Error(), http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
json.NewEncoder(w).Encode(map[string]string{"id": id})
|
||||
default:
|
||||
http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
|
||||
}
|
||||
}
|
||||
|
||||
func (s *Server) handleProcess(w http.ResponseWriter, r *http.Request) {
|
||||
id := r.URL.Path[len("/processes/"):]
|
||||
|
||||
switch r.Method {
|
||||
case http.MethodGet:
|
||||
process, err := s.GetProcess(id)
|
||||
if err != nil {
|
||||
http.Error(w, err.Error(), http.StatusNotFound)
|
||||
return
|
||||
}
|
||||
json.NewEncoder(w).Encode(process)
|
||||
case http.MethodDelete:
|
||||
if err := s.StopProcess(id); err != nil {
|
||||
http.Error(w, err.Error(), http.StatusNotFound)
|
||||
return
|
||||
}
|
||||
w.WriteHeader(http.StatusNoContent)
|
||||
default:
|
||||
http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
|
||||
}
|
||||
}
|
||||
|
||||
func (s *Server) handleWebSocket(w http.ResponseWriter, r *http.Request) {
|
||||
id := r.URL.Path[len("/ws/"):]
|
||||
xlog.Debug("Handling WebSocket connection", "processID", id)
|
||||
|
||||
process, err := s.GetProcess(id)
|
||||
if err != nil {
|
||||
http.Error(w, err.Error(), http.StatusNotFound)
|
||||
return
|
||||
}
|
||||
|
||||
if process.Cmd.ProcessState != nil && process.Cmd.ProcessState.Exited() {
|
||||
xlog.Debug("Process already exited", "processID", id)
|
||||
http.Error(w, "Process already exited", http.StatusGone)
|
||||
return
|
||||
}
|
||||
|
||||
xlog.Debug("Process is running", "processID", id, "pid", process.Cmd.Process.Pid)
|
||||
|
||||
conn, err := s.upgrader.Upgrade(w, r, nil)
|
||||
if err != nil {
|
||||
http.Error(w, err.Error(), http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
defer conn.Close()
|
||||
|
||||
xlog.Debug("WebSocket connection established", "processID", id)
|
||||
|
||||
// Create a done channel to signal process completion
|
||||
done := make(chan struct{})
|
||||
|
||||
// Handle stdin
|
||||
go func() {
|
||||
defer func() {
|
||||
select {
|
||||
case <-done:
|
||||
xlog.Debug("Process stdin handler done", "processID", id)
|
||||
default:
|
||||
xlog.Debug("WebSocket stdin connection closed", "processID", id)
|
||||
}
|
||||
}()
|
||||
|
||||
for {
|
||||
_, message, err := conn.ReadMessage()
|
||||
if err != nil {
|
||||
if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseNormalClosure) {
|
||||
xlog.Debug("WebSocket stdin unexpected error", "processID", id, "error", err)
|
||||
}
|
||||
return
|
||||
}
|
||||
xlog.Debug("Received message", "processID", id, "message", string(message))
|
||||
if _, err := process.Stdin.Write(message); err != nil {
|
||||
if err != io.EOF {
|
||||
xlog.Debug("WebSocket stdin write error", "processID", id, "error", err)
|
||||
}
|
||||
return
|
||||
}
|
||||
xlog.Debug("Message sent to process", "processID", id, "message", string(message))
|
||||
}
|
||||
}()
|
||||
|
||||
// Handle stdout and stderr
|
||||
go func() {
|
||||
defer func() {
|
||||
select {
|
||||
case <-done:
|
||||
xlog.Debug("Process output handler done", "processID", id)
|
||||
default:
|
||||
xlog.Debug("WebSocket output connection closed", "processID", id)
|
||||
}
|
||||
}()
|
||||
|
||||
// Create a buffer for reading
|
||||
buf := make([]byte, 4096)
|
||||
reader := io.MultiReader(process.Stdout, process.Stderr)
|
||||
|
||||
for {
|
||||
n, err := reader.Read(buf)
|
||||
if err != nil {
|
||||
if err != io.EOF {
|
||||
xlog.Debug("Read error", "processID", id, "error", err)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
if n > 0 {
|
||||
xlog.Debug("Sending message", "processID", id, "size", n)
|
||||
if err := conn.WriteMessage(websocket.BinaryMessage, buf[:n]); err != nil {
|
||||
if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseNormalClosure) {
|
||||
xlog.Debug("WebSocket output write error", "processID", id, "error", err)
|
||||
}
|
||||
return
|
||||
}
|
||||
xlog.Debug("Message sent to client", "processID", id, "size", n)
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
// Wait for process to exit
|
||||
xlog.Debug("Waiting for process to exit", "processID", id)
|
||||
err = process.Cmd.Wait()
|
||||
close(done) // Signal that the process is done
|
||||
|
||||
if err != nil {
|
||||
xlog.Debug("Process exited with error",
|
||||
"processID", id,
|
||||
"pid", process.Cmd.Process.Pid,
|
||||
"error", err)
|
||||
} else {
|
||||
xlog.Debug("Process exited successfully",
|
||||
"processID", id,
|
||||
"pid", process.Cmd.Process.Pid)
|
||||
}
|
||||
}
|
||||
|
||||
// Add new handlers for group management
|
||||
func (s *Server) handleGroups(w http.ResponseWriter, r *http.Request) {
|
||||
switch r.Method {
|
||||
case http.MethodGet:
|
||||
groups := s.ListGroups()
|
||||
json.NewEncoder(w).Encode(groups)
|
||||
default:
|
||||
http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
|
||||
}
|
||||
}
|
||||
|
||||
func (s *Server) handleGroup(w http.ResponseWriter, r *http.Request) {
|
||||
groupID := r.URL.Path[len("/groups/"):]
|
||||
|
||||
switch r.Method {
|
||||
case http.MethodGet:
|
||||
processes, err := s.GetGroupProcesses(groupID)
|
||||
if err != nil {
|
||||
http.Error(w, err.Error(), http.StatusNotFound)
|
||||
return
|
||||
}
|
||||
json.NewEncoder(w).Encode(processes)
|
||||
case http.MethodDelete:
|
||||
if err := s.StopGroup(groupID); err != nil {
|
||||
http.Error(w, err.Error(), http.StatusNotFound)
|
||||
return
|
||||
}
|
||||
w.WriteHeader(http.StatusNoContent)
|
||||
default:
|
||||
http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
|
||||
}
|
||||
}
|
||||
|
||||
func (s *Server) handleRun(w http.ResponseWriter, r *http.Request) {
|
||||
if r.Method != http.MethodPost {
|
||||
http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
|
||||
return
|
||||
}
|
||||
|
||||
log.Printf("Handling /run request")
|
||||
|
||||
var req struct {
|
||||
Command string `json:"command"`
|
||||
Args []string `json:"args"`
|
||||
Env []string `json:"env"`
|
||||
}
|
||||
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
|
||||
http.Error(w, err.Error(), http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
|
||||
log.Printf("Executing one-time process: command=%s, args=%v", req.Command, req.Args)
|
||||
|
||||
output, err := s.RunProcess(r.Context(), req.Command, req.Args, req.Env)
|
||||
if err != nil {
|
||||
http.Error(w, err.Error(), http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
|
||||
log.Printf("One-time process completed with output length: %d", len(output))
|
||||
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
json.NewEncoder(w).Encode(map[string]string{
|
||||
"output": output,
|
||||
})
|
||||
}
|
||||
Reference in New Issue
Block a user