Compare commits


2 Commits

Author   SHA1         Message                                      Date
mudler   47492f890f   chore(tests): add timeout                    2025-04-22 21:46:25 +02:00
mudler   ddac344147   chore: better defaults for parallel jobs     2025-04-22 21:28:00 +02:00
                      Signed-off-by: mudler <mudler@localai.io>
28 changed files with 98 additions and 1729 deletions

View File

@@ -84,77 +84,3 @@ jobs:
#tags: ${{ steps.prep.outputs.tags }}
tags: ${{ steps.meta.outputs.tags }}
labels: ${{ steps.meta.outputs.labels }}
mcpbox-build:
runs-on: ubuntu-latest
steps:
- name: Checkout
uses: actions/checkout@v4
- name: Prepare
id: prep
run: |
DOCKER_IMAGE=quay.io/mudler/localagi-mcpbox
# Use branch name as default
VERSION=${GITHUB_REF#refs/heads/}
BINARY_VERSION=$(git describe --always --tags --dirty)
SHORTREF=${GITHUB_SHA::8}
# If this is git tag, use the tag name as a docker tag
if [[ $GITHUB_REF == refs/tags/* ]]; then
VERSION=${GITHUB_REF#refs/tags/}
fi
TAGS="${DOCKER_IMAGE}:${VERSION},${DOCKER_IMAGE}:${SHORTREF}"
# If the VERSION looks like a version number, assume that
# this is the most recent version of the image and also
# tag it 'latest'.
if [[ $VERSION =~ ^[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}$ ]]; then
TAGS="$TAGS,${DOCKER_IMAGE}:latest"
fi
# Set output parameters.
echo ::set-output name=binary_version::${BINARY_VERSION}
echo ::set-output name=tags::${TAGS}
echo ::set-output name=docker_image::${DOCKER_IMAGE}
- name: Set up QEMU
uses: docker/setup-qemu-action@master
with:
platforms: all
- name: Set up Docker Buildx
id: buildx
uses: docker/setup-buildx-action@master
- name: Login to DockerHub
if: github.event_name != 'pull_request'
uses: docker/login-action@v3
with:
registry: quay.io
username: ${{ secrets.DOCKER_USERNAME }}
password: ${{ secrets.DOCKER_PASSWORD }}
- name: Extract metadata (tags, labels) for Docker
id: meta
uses: docker/metadata-action@902fa8ec7d6ecbf8d84d538b9b233a880e428804
with:
images: quay.io/mudler/localagi-mcpbox
tags: |
type=ref,event=branch,suffix=-{{date 'YYYYMMDDHHmmss'}}
type=semver,pattern={{raw}}
type=sha,suffix=-{{date 'YYYYMMDDHHmmss'}}
type=ref,event=branch
flavor: |
latest=auto
prefix=
suffix=
- name: Build
uses: docker/build-push-action@v6
with:
builder: ${{ steps.buildx.outputs.name }}
build-args: |
VERSION=${{ steps.prep.outputs.binary_version }}
context: ./
file: ./Dockerfile.mcpbox
#platforms: linux/amd64,linux/arm64
platforms: linux/amd64
push: true
#tags: ${{ steps.prep.outputs.tags }}
tags: ${{ steps.meta.outputs.tags }}
labels: ${{ steps.meta.outputs.labels }}

View File

@@ -15,7 +15,7 @@ jobs:
runs-on: ubuntu-latest
steps:
- name: Checkout code
uses: actions/checkout@v4
uses: actions/checkout@v2
- run: |
# Add Docker's official GPG key:
sudo apt-get update

View File

@@ -1,47 +0,0 @@
# Build stage
FROM golang:1.24-alpine AS builder
# Install build dependencies
RUN apk add --no-cache git
# Set working directory
WORKDIR /app
# Copy go mod files
COPY go.mod go.sum ./
# Download dependencies
RUN go mod download
# Copy source code
COPY . .
# Build the application
RUN CGO_ENABLED=0 GOOS=linux go build -o mcpbox ./cmd/mcpbox
# Final stage
FROM ubuntu:22.04
# Install runtime dependencies
RUN apt-get update && apt-get install -y ca-certificates tzdata docker.io bash
# Create non-root user
#RUN adduser -D -g '' appuser
# Set working directory
WORKDIR /app
# Copy binary from builder
COPY --from=builder /app/mcpbox .
# Use non-root user
#USER appuser
# Expose port
EXPOSE 8080
# Set entrypoint
ENTRYPOINT ["/app/mcpbox"]
# Default command
CMD ["-addr", ":8080"]

View File

@@ -1,17 +1,15 @@
GOCMD?=go
IMAGE_NAME?=webui
MCPBOX_IMAGE_NAME?=mcpbox
ROOT_DIR:=$(shell dirname $(realpath $(lastword $(MAKEFILE_LIST))))
prepare-tests: build-mcpbox
prepare-tests:
docker compose up -d --build
docker run -d -v /var/run/docker.sock:/var/run/docker.sock --privileged -p 9090:8080 --rm -ti $(MCPBOX_IMAGE_NAME)
cleanup-tests:
docker compose down
tests: prepare-tests
LOCALAGI_MCPBOX_URL="http://localhost:9090" LOCALAGI_MODEL="gemma-3-12b-it-qat" LOCALAI_API_URL="http://localhost:8081" LOCALAGI_API_URL="http://localhost:8080" $(GOCMD) run github.com/onsi/ginkgo/v2/ginkgo --fail-fast -v -r ./...
LOCALAGI_MODEL="gemma-3-12b-it-qat" LOCALAI_API_URL="http://localhost:8081" LOCALAGI_API_URL="http://localhost:8080" $(GOCMD) run github.com/onsi/ginkgo/v2/ginkgo --fail-fast -v -r ./...
run-nokb:
$(MAKE) run KBDISABLEINDEX=true
@@ -25,16 +23,10 @@ build: webui/react-ui/dist
.PHONY: run
run: webui/react-ui/dist
LOCALAGI_MCPBOX_URL="http://localhost:9090" $(GOCMD) run ./
$(GOCMD) run ./
build-image:
docker build -t $(IMAGE_NAME) -f Dockerfile.webui .
image-push:
docker push $(IMAGE_NAME)
build-mcpbox:
docker build -t $(MCPBOX_IMAGE_NAME) -f Dockerfile.mcpbox .
run-mcpbox:
docker run -v /var/run/docker.sock:/var/run/docker.sock --privileged -p 9090:8080 -ti mcpbox

View File

@@ -2,7 +2,7 @@
<img src="./webui/react-ui/public/logo_1.png" alt="LocalAGI Logo" width="220"/>
</p>
<h3 align="center"><em>Your AI. Your Hardware. Your Rules</em></h3>
<h3 align="center"><em>Your AI. Your Hardware. Your Rules.</em></h3>
<div align="center">
@@ -13,9 +13,9 @@
</div>
Create customizable AI assistants, automations, chat bots and agents that run 100% locally. No need for agentic Python libraries or cloud service keys, just bring your GPU (or even just CPU) and a web browser.
We empower you building AI Agents that you can run locally, without coding.
**LocalAGI** is a powerful, self-hostable AI Agent platform that allows you to design AI automations without writing code. A complete drop-in replacement for OpenAI's Responses APIs with advanced agentic capabilities. No clouds. No data leaks. Just pure local AI that works on consumer-grade hardware (CPU and GPU).
**LocalAGI** is a powerful, self-hostable AI Agent platform designed for maximum privacy and flexibility. A complete drop-in replacement for OpenAI's Responses APIs with advanced agentic capabilities. No clouds. No data leaks. Just pure local AI that works on consumer-grade hardware (CPU and GPU).
## 🛡️ Take Back Your Privacy
@@ -37,7 +37,6 @@ LocalAGI ensures your data stays exactly where you want it—on your hardware. N
- 🖼 **Multimodal Support**: Ready for vision, text, and more.
- 🔧 **Extensible Custom Actions**: Easily script dynamic agent behaviors in Go (interpreted, no compilation!).
- 🛠 **Fully Customizable Models**: Use your own models or integrate seamlessly with [LocalAI](https://github.com/mudler/LocalAI).
- 📊 **Observability**: Monitor agent status and view detailed observable updates in real-time.
## 🛠️ Quickstart
@@ -195,8 +194,6 @@ LocalAGI is part of the powerful Local family of privacy-focused AI tools:
![Web UI Dashboard](https://github.com/user-attachments/assets/a40194f9-af3a-461f-8b39-5f4612fbf221)
![Web UI Agent Settings](https://github.com/user-attachments/assets/fb3c3e2a-cd53-4ca8-97aa-c5da51ff1f83)
![Web UI Create Group](https://github.com/user-attachments/assets/102189a2-0fba-4a1e-b0cb-f99268ef8062)
![Web UI Agent Observability](https://github.com/user-attachments/assets/f7359048-9d28-4cf1-9151-1f5556ce9235)
### Connectors Ready-to-Go

View File

@@ -1,38 +0,0 @@
package main
import (
"flag"
"log"
"os"
"os/signal"
"syscall"
"github.com/mudler/LocalAGI/pkg/stdio"
)
func main() {
// Parse command line flags
addr := flag.String("addr", ":8080", "HTTP server address")
flag.Parse()
// Create and start the server
server := stdio.NewServer()
// Handle graceful shutdown
sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
go func() {
log.Printf("Starting server on %s", *addr)
if err := server.Start(*addr); err != nil {
log.Fatalf("Failed to start server: %v", err)
}
}()
// Wait for shutdown signal
<-sigChan
log.Println("Shutting down server...")
// TODO: Implement graceful shutdown if needed
os.Exit(0)
}

View File

@@ -241,7 +241,6 @@ func (a *Agent) Stop() {
a.Lock()
defer a.Unlock()
xlog.Debug("Stopping agent", "agent", a.Character.Name)
a.closeMCPSTDIOServers()
a.context.Cancel()
}

View File

@@ -3,14 +3,12 @@ package agent
import (
"context"
"encoding/json"
"errors"
mcp "github.com/metoro-io/mcp-golang"
"github.com/metoro-io/mcp-golang/transport/http"
stdioTransport "github.com/metoro-io/mcp-golang/transport/stdio"
"github.com/mudler/LocalAGI/core/types"
"github.com/mudler/LocalAGI/pkg/stdio"
"github.com/mudler/LocalAGI/pkg/xlog"
"github.com/sashabaranov/go-openai/jsonschema"
)
@@ -21,12 +19,6 @@ type MCPServer struct {
Token string `json:"token"`
}
type MCPSTDIOServer struct {
Args []string `json:"args"`
Env []string `json:"env"`
Cmd string `json:"cmd"`
}
type mcpAction struct {
mcpClient *mcp.Client
inputSchema ToolInputSchema
@@ -87,68 +79,6 @@ type ToolInputSchema struct {
Required []string `json:"required,omitempty"`
}
func (a *Agent) addTools(client *mcp.Client) (types.Actions, error) {
var generatedActions types.Actions
xlog.Debug("Initializing client")
// Initialize the client
response, e := client.Initialize(a.context)
if e != nil {
xlog.Error("Failed to initialize client", "error", e.Error())
return nil, e
}
xlog.Debug("Client initialized: %v", response.Instructions)
var cursor *string
for {
tools, err := client.ListTools(a.context, cursor)
if err != nil {
xlog.Error("Failed to list tools", "error", err.Error())
return nil, err
}
for _, t := range tools.Tools {
desc := ""
if t.Description != nil {
desc = *t.Description
}
xlog.Debug("Tool", "name", t.Name, "description", desc)
dat, err := json.Marshal(t.InputSchema)
if err != nil {
xlog.Error("Failed to marshal input schema", "error", err.Error())
}
xlog.Debug("Input schema", "tool", t.Name, "schema", string(dat))
// XXX: This is a wild guess, to verify (data types might be incompatible)
var inputSchema ToolInputSchema
err = json.Unmarshal(dat, &inputSchema)
if err != nil {
xlog.Error("Failed to unmarshal input schema", "error", err.Error())
}
// Create a new action with Client + tool
generatedActions = append(generatedActions, &mcpAction{
mcpClient: client,
toolName: t.Name,
inputSchema: inputSchema,
toolDescription: desc,
})
}
if tools.NextCursor == nil {
break // No more pages
}
cursor = tools.NextCursor
}
return generatedActions, nil
}
func (a *Agent) initMCPActions() error {
a.mcpActions = nil
@@ -156,7 +86,6 @@ func (a *Agent) initMCPActions() error {
generatedActions := types.Actions{}
// MCP HTTP Servers
for _, mcpServer := range a.options.mcpServers {
transport := http.NewHTTPClientTransport("/mcp")
transport.WithBaseURL(mcpServer.URL)
@@ -166,60 +95,70 @@ func (a *Agent) initMCPActions() error {
// Create a new client
client := mcp.NewClient(transport)
xlog.Debug("Adding tools for MCP server", "server", mcpServer)
actions, err := a.addTools(client)
if err != nil {
xlog.Error("Failed to add tools for MCP server", "server", mcpServer, "error", err.Error())
}
generatedActions = append(generatedActions, actions...)
}
// MCP STDIO Servers
a.closeMCPSTDIOServers() // Make sure we stop all previous servers if any is active
if a.options.mcpPrepareScript != "" {
xlog.Debug("Preparing MCP box", "script", a.options.mcpPrepareScript)
client := stdio.NewClient(a.options.mcpBoxURL)
client.RunProcess(a.context, "/bin/bash", []string{"-c", a.options.mcpPrepareScript}, []string{})
}
for _, mcpStdioServer := range a.options.mcpStdioServers {
client := stdio.NewClient(a.options.mcpBoxURL)
p, err := client.CreateProcess(a.context,
mcpStdioServer.Cmd,
mcpStdioServer.Args,
mcpStdioServer.Env,
a.Character.Name)
if err != nil {
xlog.Error("Failed to create process", "error", err.Error())
continue
}
read, writer, err := client.GetProcessIO(p.ID)
if err != nil {
xlog.Error("Failed to get process IO", "error", err.Error())
xlog.Debug("Initializing client", "server", mcpServer.URL)
// Initialize the client
response, e := client.Initialize(a.context)
if e != nil {
xlog.Error("Failed to initialize client", "error", e.Error(), "server", mcpServer)
if err == nil {
err = e
} else {
err = errors.Join(err, e)
}
continue
}
transport := stdioTransport.NewStdioServerTransportWithIO(read, writer)
xlog.Debug("Client initialized: %v", response.Instructions)
// Create a new client
mcpClient := mcp.NewClient(transport)
var cursor *string
for {
tools, err := client.ListTools(a.context, cursor)
if err != nil {
xlog.Error("Failed to list tools", "error", err.Error())
return err
}
xlog.Debug("Adding tools for MCP server (stdio)", "server", mcpStdioServer)
actions, err := a.addTools(mcpClient)
if err != nil {
xlog.Error("Failed to add tools for MCP server", "server", mcpStdioServer, "error", err.Error())
for _, t := range tools.Tools {
desc := ""
if t.Description != nil {
desc = *t.Description
}
xlog.Debug("Tool", "mcpServer", mcpServer, "name", t.Name, "description", desc)
dat, err := json.Marshal(t.InputSchema)
if err != nil {
xlog.Error("Failed to marshal input schema", "error", err.Error())
}
xlog.Debug("Input schema", "mcpServer", mcpServer, "tool", t.Name, "schema", string(dat))
// XXX: This is a wild guess, to verify (data types might be incompatible)
var inputSchema ToolInputSchema
err = json.Unmarshal(dat, &inputSchema)
if err != nil {
xlog.Error("Failed to unmarshal input schema", "error", err.Error())
}
// Create a new action with Client + tool
generatedActions = append(generatedActions, &mcpAction{
mcpClient: client,
toolName: t.Name,
inputSchema: inputSchema,
toolDescription: desc,
})
}
if tools.NextCursor == nil {
break // No more pages
}
cursor = tools.NextCursor
}
generatedActions = append(generatedActions, actions...)
}
a.mcpActions = generatedActions
return err
}
func (a *Agent) closeMCPSTDIOServers() {
client := stdio.NewClient(a.options.mcpBoxURL)
client.StopGroup(a.Character.Name)
}

View File

@@ -36,8 +36,7 @@ func NewSSEObserver(agent string, manager sse.Manager) *SSEObserver {
}
func (s *SSEObserver) NewObservable() *types.Observable {
id := atomic.AddInt32(&s.maxID, 1)
id := atomic.AddInt32(&s.maxID, 1)
return &types.Observable{
ID: id - 1,
Agent: s.agent,

View File

@@ -50,10 +50,8 @@ type options struct {
conversationsPath string
mcpServers []MCPServer
mcpStdioServers []MCPSTDIOServer
mcpBoxURL string
mcpPrepareScript string
mcpServers []MCPServer
newConversationsSubscribers []func(openai.ChatCompletionMessage)
observer Observer
@@ -209,27 +207,6 @@ func WithMCPServers(servers ...MCPServer) Option {
}
}
func WithMCPSTDIOServers(servers ...MCPSTDIOServer) Option {
return func(o *options) error {
o.mcpStdioServers = servers
return nil
}
}
func WithMCPBoxURL(url string) Option {
return func(o *options) error {
o.mcpBoxURL = url
return nil
}
}
func WithMCPPrepareScript(script string) Option {
return func(o *options) error {
o.mcpPrepareScript = script
return nil
}
}
func WithLLMAPIURL(url string) Option {
return func(o *options) error {
o.LLMAPI.APIURL = url

View File

@@ -2,8 +2,6 @@ package state
import (
"encoding/json"
"fmt"
"strings"
"github.com/mudler/LocalAGI/core/agent"
"github.com/mudler/LocalAGI/core/types"
@@ -32,13 +30,10 @@ func (d DynamicPromptsConfig) ToMap() map[string]string {
}
type AgentConfig struct {
Connector []ConnectorConfig `json:"connectors" form:"connectors" `
Actions []ActionsConfig `json:"actions" form:"actions"`
DynamicPrompts []DynamicPromptsConfig `json:"dynamic_prompts" form:"dynamic_prompts"`
MCPServers []agent.MCPServer `json:"mcp_servers" form:"mcp_servers"`
MCPSTDIOServers []agent.MCPSTDIOServer `json:"mcp_stdio_servers" form:"mcp_stdio_servers"`
MCPPrepareScript string `json:"mcp_prepare_script" form:"mcp_prepare_script"`
MCPBoxURL string `json:"mcp_box_url" form:"mcp_box_url"`
Connector []ConnectorConfig `json:"connectors" form:"connectors" `
Actions []ActionsConfig `json:"actions" form:"actions"`
DynamicPrompts []DynamicPromptsConfig `json:"dynamic_prompts" form:"dynamic_prompts"`
MCPServers []agent.MCPServer `json:"mcp_servers" form:"mcp_servers"`
Description string `json:"description" form:"description"`
@@ -276,22 +271,6 @@ func NewAgentConfigMeta(
HelpText: "Number of concurrent tasks that can run in parallel",
Tags: config.Tags{Section: "AdvancedSettings"},
},
{
Name: "mcp_stdio_servers",
Label: "MCP STDIO Servers",
Type: "textarea",
DefaultValue: "",
HelpText: "JSON configuration for MCP STDIO servers",
Tags: config.Tags{Section: "AdvancedSettings"},
},
{
Name: "mcp_prepare_script",
Label: "MCP Prepare Script",
Type: "textarea",
DefaultValue: "",
HelpText: "Script to prepare the MCP box",
Tags: config.Tags{Section: "AdvancedSettings"},
},
},
MCPServers: []config.Field{
{
@@ -318,148 +297,3 @@ type Connector interface {
AgentReasoningCallback() func(state types.ActionCurrentState) bool
Start(a *agent.Agent)
}
// UnmarshalJSON implements json.Unmarshaler for AgentConfig
func (a *AgentConfig) UnmarshalJSON(data []byte) error {
// Create a temporary type to avoid infinite recursion
type Alias AgentConfig
aux := &struct {
*Alias
MCPSTDIOServersConfig interface{} `json:"mcp_stdio_servers"`
}{
Alias: (*Alias)(a),
}
if err := json.Unmarshal(data, &aux); err != nil {
return err
}
// Handle MCP STDIO servers configuration
if aux.MCPSTDIOServersConfig != nil {
switch v := aux.MCPSTDIOServersConfig.(type) {
case string:
// Parse string configuration
var mcpConfig struct {
MCPServers map[string]struct {
Command string `json:"command"`
Args []string `json:"args"`
Env map[string]string `json:"env"`
} `json:"mcpServers"`
}
if err := json.Unmarshal([]byte(v), &mcpConfig); err != nil {
return fmt.Errorf("failed to parse MCP STDIO servers configuration: %w", err)
}
a.MCPSTDIOServers = make([]agent.MCPSTDIOServer, 0, len(mcpConfig.MCPServers))
for _, server := range mcpConfig.MCPServers {
// Convert env map to slice of "KEY=VALUE" strings
envSlice := make([]string, 0, len(server.Env))
for k, v := range server.Env {
envSlice = append(envSlice, fmt.Sprintf("%s=%s", k, v))
}
a.MCPSTDIOServers = append(a.MCPSTDIOServers, agent.MCPSTDIOServer{
Cmd: server.Command,
Args: server.Args,
Env: envSlice,
})
}
case []interface{}:
// Parse array configuration
a.MCPSTDIOServers = make([]agent.MCPSTDIOServer, 0, len(v))
for _, server := range v {
serverMap, ok := server.(map[string]interface{})
if !ok {
return fmt.Errorf("invalid server configuration format")
}
cmd, _ := serverMap["cmd"].(string)
args := make([]string, 0)
if argsInterface, ok := serverMap["args"].([]interface{}); ok {
for _, arg := range argsInterface {
if argStr, ok := arg.(string); ok {
args = append(args, argStr)
}
}
}
env := make([]string, 0)
if envInterface, ok := serverMap["env"].([]interface{}); ok {
for _, e := range envInterface {
if envStr, ok := e.(string); ok {
env = append(env, envStr)
}
}
}
a.MCPSTDIOServers = append(a.MCPSTDIOServers, agent.MCPSTDIOServer{
Cmd: cmd,
Args: args,
Env: env,
})
}
}
}
return nil
}
// MarshalJSON implements json.Marshaler for AgentConfig
func (a *AgentConfig) MarshalJSON() ([]byte, error) {
// Create a temporary type to avoid infinite recursion
type Alias AgentConfig
aux := &struct {
*Alias
MCPSTDIOServersConfig string `json:"mcp_stdio_servers,omitempty"`
}{
Alias: (*Alias)(a),
}
// Convert MCPSTDIOServers back to the expected JSON format
if len(a.MCPSTDIOServers) > 0 {
mcpConfig := struct {
MCPServers map[string]struct {
Command string `json:"command"`
Args []string `json:"args"`
Env map[string]string `json:"env"`
} `json:"mcpServers"`
}{
MCPServers: make(map[string]struct {
Command string `json:"command"`
Args []string `json:"args"`
Env map[string]string `json:"env"`
}),
}
// Convert each MCPSTDIOServer to the expected format
for i, server := range a.MCPSTDIOServers {
// Convert env slice back to map
envMap := make(map[string]string)
for _, env := range server.Env {
if parts := strings.SplitN(env, "=", 2); len(parts) == 2 {
envMap[parts[0]] = parts[1]
}
}
mcpConfig.MCPServers[fmt.Sprintf("server%d", i)] = struct {
Command string `json:"command"`
Args []string `json:"args"`
Env map[string]string `json:"env"`
}{
Command: server.Cmd,
Args: server.Args,
Env: envMap,
}
}
// Marshal the MCP config to JSON string
mcpConfigJSON, err := json.Marshal(mcpConfig)
if err != nil {
return nil, fmt.Errorf("failed to marshal MCP STDIO servers configuration: %w", err)
}
aux.MCPSTDIOServersConfig = string(mcpConfigJSON)
}
return json.Marshal(aux)
}

View File

@@ -33,7 +33,6 @@ type AgentPool struct {
managers map[string]sse.Manager
agentStatus map[string]*Status
apiURL, defaultModel, defaultMultimodalModel string
mcpBoxURL string
imageModel, localRAGAPI, localRAGKey, apiKey string
availableActions func(*AgentConfig) func(ctx context.Context, pool *AgentPool) []types.Action
connectors func(*AgentConfig) []Connector
@@ -73,7 +72,7 @@ func loadPoolFromFile(path string) (*AgentPoolData, error) {
}
func NewAgentPool(
defaultModel, defaultMultimodalModel, imageModel, apiURL, apiKey, directory, mcpBoxURL string,
defaultModel, defaultMultimodalModel, imageModel, apiURL, apiKey, directory string,
LocalRAGAPI string,
availableActions func(*AgentConfig) func(ctx context.Context, pool *AgentPool) []types.Action,
connectors func(*AgentConfig) []Connector,
@@ -99,7 +98,6 @@ func NewAgentPool(
apiURL: apiURL,
defaultModel: defaultModel,
defaultMultimodalModel: defaultMultimodalModel,
mcpBoxURL: mcpBoxURL,
imageModel: imageModel,
localRAGAPI: LocalRAGAPI,
apiKey: apiKey,
@@ -125,7 +123,6 @@ func NewAgentPool(
pooldir: directory,
defaultModel: defaultModel,
defaultMultimodalModel: defaultMultimodalModel,
mcpBoxURL: mcpBoxURL,
imageModel: imageModel,
apiKey: apiKey,
agents: make(map[string]*Agent),
@@ -169,56 +166,7 @@ func (a *AgentPool) CreateAgent(name string, agentConfig *AgentConfig) error {
}
}(a.pool[name])
return a.startAgentWithConfig(name, agentConfig, nil)
}
func (a *AgentPool) RecreateAgent(name string, agentConfig *AgentConfig) error {
a.Lock()
defer a.Unlock()
oldAgent := a.agents[name]
var o *types.Observable
obs := oldAgent.Observer()
if obs != nil {
o = obs.NewObservable()
o.Name = "Restarting Agent"
o.Icon = "sync"
o.Creation = &types.Creation{}
obs.Update(*o)
}
stateFile, characterFile := a.stateFiles(name)
os.Remove(stateFile)
os.Remove(characterFile)
oldAgent.Stop()
a.pool[name] = *agentConfig
delete(a.agents, name)
if err := a.save(); err != nil {
if obs != nil {
o.Completion = &types.Completion{Error: err.Error()}
obs.Update(*o)
}
return err
}
if err := a.startAgentWithConfig(name, agentConfig, obs); err != nil {
if obs != nil {
o.Completion = &types.Completion{Error: err.Error()}
obs.Update(*o)
}
return err
}
if obs != nil {
o.Completion = &types.Completion{}
obs.Update(*o)
}
return nil
return a.startAgentWithConfig(name, agentConfig)
}
func createAgentAvatar(APIURL, APIKey, model, imageModel, avatarDir string, agent AgentConfig) error {
@@ -320,13 +268,8 @@ func (a *AgentPool) GetStatusHistory(name string) *Status {
return a.agentStatus[name]
}
func (a *AgentPool) startAgentWithConfig(name string, config *AgentConfig, obs Observer) error {
var manager sse.Manager
if m, ok := a.managers[name]; ok {
manager = m
} else {
manager = sse.NewManager(5)
}
func (a *AgentPool) startAgentWithConfig(name string, config *AgentConfig) error {
manager := sse.NewManager(5)
ctx := context.Background()
model := a.defaultModel
multimodalModel := a.defaultMultimodalModel
@@ -339,10 +282,6 @@ func (a *AgentPool) startAgentWithConfig(name string, config *AgentConfig, obs O
model = config.Model
}
if config.MCPBoxURL != "" {
a.mcpBoxURL = config.MCPBoxURL
}
if config.PeriodicRuns == "" {
config.PeriodicRuns = "10m"
}
@@ -392,10 +331,6 @@ func (a *AgentPool) startAgentWithConfig(name string, config *AgentConfig, obs O
// dynamicPrompts = append(dynamicPrompts, p.ToMap())
// }
if obs == nil {
obs = NewSSEObserver(name, manager)
}
opts := []Option{
WithModel(model),
WithLLMAPIURL(a.apiURL),
@@ -403,10 +338,7 @@ func (a *AgentPool) startAgentWithConfig(name string, config *AgentConfig, obs O
WithMCPServers(config.MCPServers...),
WithPeriodicRuns(config.PeriodicRuns),
WithPermanentGoal(config.PermanentGoal),
WithMCPSTDIOServers(config.MCPSTDIOServers...),
WithMCPBoxURL(a.mcpBoxURL),
WithPrompts(promptBlocks...),
WithMCPPrepareScript(config.MCPPrepareScript),
// WithDynamicPrompts(dynamicPrompts...),
WithCharacter(Character{
Name: name,
@@ -475,7 +407,7 @@ func (a *AgentPool) startAgentWithConfig(name string, config *AgentConfig, obs O
c.AgentResultCallback()(state)
}
}),
WithObserver(obs),
WithObserver(NewSSEObserver(name, manager)),
}
if config.HUD {
@@ -582,7 +514,7 @@ func (a *AgentPool) StartAll() error {
if a.agents[name] != nil { // Agent already started
continue
}
if err := a.startAgentWithConfig(name, &config, nil); err != nil {
if err := a.startAgentWithConfig(name, &config); err != nil {
xlog.Error("Failed to start agent", "name", name, "error", err)
}
}
@@ -620,7 +552,7 @@ func (a *AgentPool) Start(name string) error {
return nil
}
if config, ok := a.pool[name]; ok {
return a.startAgentWithConfig(name, &config, nil)
return a.startAgentWithConfig(name, &config)
}
return fmt.Errorf("agent %s not found", name)

View File

@@ -12,11 +12,6 @@ services:
- /dev/dri/card1
- /dev/dri/renderD129
mcpbox:
extends:
file: docker-compose.yaml
service: mcpbox
localrecall:
extends:
file: docker-compose.yaml

View File

@@ -17,11 +17,6 @@ services:
count: 1
capabilities: [gpu]
mcpbox:
extends:
file: docker-compose.yaml
service: mcpbox
localrecall:
extends:
file: docker-compose.yaml
@@ -35,4 +30,4 @@ services:
localagi:
extends:
file: docker-compose.yaml
service: localagi
service: localagi

View File

@@ -46,30 +46,12 @@ services:
image: busybox
command: ["sh", "-c", "until wget -q -O - http://localrecall:8080 > /dev/null 2>&1; do echo 'Waiting for localrecall...'; sleep 1; done; echo 'localrecall is up!'"]
mcpbox:
build:
context: .
dockerfile: Dockerfile.mcpbox
ports:
- "8080"
volumes:
- ./volumes/mcpbox:/app/data
# share docker socket if you want it to be able to run docker commands
- /var/run/docker.sock:/var/run/docker.sock
healthcheck:
test: ["CMD", "wget", "-q", "-O", "-", "http://localhost:8080/processes"]
interval: 30s
timeout: 10s
retries: 3
localagi:
depends_on:
localai:
condition: service_healthy
localrecall-healthcheck:
condition: service_completed_successfully
mcpbox:
condition: service_healthy
build:
context: .
dockerfile: Dockerfile.webui
@@ -86,7 +68,6 @@ services:
- LOCALAGI_STATE_DIR=/pool
- LOCALAGI_TIMEOUT=5m
- LOCALAGI_ENABLE_CONVERSATIONS_LOGGING=false
- LOCALAGI_MCPBOX_URL=http://mcpbox:8080
extra_hosts:
- "host.docker.internal:host-gateway"
volumes:

View File

@@ -23,7 +23,6 @@ var apiKeysEnv = os.Getenv("LOCALAGI_API_KEYS")
var imageModel = os.Getenv("LOCALAGI_IMAGE_MODEL")
var conversationDuration = os.Getenv("LOCALAGI_CONVERSATION_DURATION")
var localOperatorBaseURL = os.Getenv("LOCALOPERATOR_BASE_URL")
var mcpboxURL = os.Getenv("LOCALAGI_MCPBOX_URL")
func init() {
if baseModel == "" {
@@ -62,7 +61,6 @@ func main() {
apiURL,
apiKey,
stateDir,
mcpboxURL,
localRAG,
services.Actions(map[string]string{
"browser-agent-runner-base-url": localOperatorBaseURL,

View File

@@ -1,4 +1,4 @@
package localoperator
package api
import (
"bytes"

View File

@@ -1,325 +0,0 @@
package stdio
import (
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"log"
"net/http"
"net/url"
"sync"
"time"
"github.com/gorilla/websocket"
)
// Client implements the transport.Interface for stdio processes
type Client struct {
baseURL string
processes map[string]*Process
groups map[string][]string
mu sync.RWMutex
}
// NewClient creates a new stdio transport client
func NewClient(baseURL string) *Client {
return &Client{
baseURL: baseURL,
processes: make(map[string]*Process),
groups: make(map[string][]string),
}
}
// CreateProcess starts a new process in a group
func (c *Client) CreateProcess(ctx context.Context, command string, args []string, env []string, groupID string) (*Process, error) {
log.Printf("Creating process: command=%s, args=%v, groupID=%s", command, args, groupID)
req := struct {
Command string `json:"command"`
Args []string `json:"args"`
Env []string `json:"env"`
GroupID string `json:"group_id"`
}{
Command: command,
Args: args,
Env: env,
GroupID: groupID,
}
reqBody, err := json.Marshal(req)
if err != nil {
return nil, fmt.Errorf("failed to marshal request: %w", err)
}
url := fmt.Sprintf("%s/processes", c.baseURL)
log.Printf("Sending POST request to %s", url)
resp, err := http.Post(url, "application/json", bytes.NewReader(reqBody))
if err != nil {
return nil, fmt.Errorf("failed to start process: %w", err)
}
defer resp.Body.Close()
log.Printf("Received response with status: %d", resp.StatusCode)
var result struct {
ID string `json:"id"`
}
body, _ := io.ReadAll(resp.Body)
if err := json.Unmarshal(body, &result); err != nil {
return nil, fmt.Errorf("failed to decode response: %w. body: %s", err, string(body))
}
log.Printf("Successfully created process with ID: %s", result.ID)
process := &Process{
ID: result.ID,
GroupID: groupID,
CreatedAt: time.Now(),
}
c.mu.Lock()
c.processes[process.ID] = process
if groupID != "" {
c.groups[groupID] = append(c.groups[groupID], process.ID)
}
c.mu.Unlock()
return process, nil
}
// GetProcess returns a process by ID
func (c *Client) GetProcess(id string) (*Process, error) {
c.mu.RLock()
process, exists := c.processes[id]
c.mu.RUnlock()
if !exists {
return nil, fmt.Errorf("process not found: %s", id)
}
return process, nil
}
// GetGroupProcesses returns all processes in a group
func (c *Client) GetGroupProcesses(groupID string) ([]*Process, error) {
c.mu.RLock()
processIDs, exists := c.groups[groupID]
if !exists {
c.mu.RUnlock()
return nil, fmt.Errorf("group not found: %s", groupID)
}
processes := make([]*Process, 0, len(processIDs))
for _, pid := range processIDs {
if process, exists := c.processes[pid]; exists {
processes = append(processes, process)
}
}
c.mu.RUnlock()
return processes, nil
}
// StopProcess stops a single process
func (c *Client) StopProcess(id string) error {
c.mu.Lock()
process, exists := c.processes[id]
if !exists {
c.mu.Unlock()
return fmt.Errorf("process not found: %s", id)
}
// Remove from group if it exists
if process.GroupID != "" {
groupProcesses := c.groups[process.GroupID]
for i, pid := range groupProcesses {
if pid == id {
c.groups[process.GroupID] = append(groupProcesses[:i], groupProcesses[i+1:]...)
break
}
}
if len(c.groups[process.GroupID]) == 0 {
delete(c.groups, process.GroupID)
}
}
delete(c.processes, id)
c.mu.Unlock()
req, err := http.NewRequest(
"DELETE",
fmt.Sprintf("%s/processes/%s", c.baseURL, id),
nil,
)
if err != nil {
return fmt.Errorf("failed to create request: %w", err)
}
resp, err := http.DefaultClient.Do(req)
if err != nil {
return fmt.Errorf("failed to stop process: %w", err)
}
resp.Body.Close()
return nil
}
// StopGroup stops all processes in a group
func (c *Client) StopGroup(groupID string) error {
c.mu.Lock()
processIDs, exists := c.groups[groupID]
if !exists {
c.mu.Unlock()
return fmt.Errorf("group not found: %s", groupID)
}
c.mu.Unlock()
for _, pid := range processIDs {
if err := c.StopProcess(pid); err != nil {
return fmt.Errorf("failed to stop process %s in group %s: %w", pid, groupID, err)
}
}
return nil
}
// ListGroups returns all group IDs
func (c *Client) ListGroups() []string {
c.mu.RLock()
defer c.mu.RUnlock()
groups := make([]string, 0, len(c.groups))
for groupID := range c.groups {
groups = append(groups, groupID)
}
return groups
}
// GetProcessIO returns io.Reader and io.Writer for a process
func (c *Client) GetProcessIO(id string) (io.Reader, io.Writer, error) {
log.Printf("Getting IO for process: %s", id)
process, err := c.GetProcess(id)
if err != nil {
return nil, nil, err
}
// Parse the base URL to get the host
baseURL, err := url.Parse(c.baseURL)
if err != nil {
return nil, nil, fmt.Errorf("failed to parse base URL: %w", err)
}
// Connect to WebSocket
u := url.URL{
Scheme: "ws",
Host: baseURL.Host,
Path: fmt.Sprintf("/ws/%s", process.ID),
}
log.Printf("Connecting to WebSocket at: %s", u.String())
conn, _, err := websocket.DefaultDialer.Dial(u.String(), nil)
if err != nil {
return nil, nil, fmt.Errorf("failed to connect to WebSocket: %w", err)
}
log.Printf("Successfully connected to WebSocket for process: %s", id)
// Create reader and writer
reader := &websocketReader{conn: conn}
writer := &websocketWriter{conn: conn}
return reader, writer, nil
}
// websocketReader implements io.Reader for WebSocket
type websocketReader struct {
conn *websocket.Conn
}
func (r *websocketReader) Read(p []byte) (n int, err error) {
_, message, err := r.conn.ReadMessage()
if err != nil {
return 0, err
}
n = copy(p, message)
return n, nil
}
// websocketWriter implements io.Writer for WebSocket
type websocketWriter struct {
conn *websocket.Conn
}
func (w *websocketWriter) Write(p []byte) (n int, err error) {
// Use BinaryMessage type for better compatibility
err = w.conn.WriteMessage(websocket.BinaryMessage, p)
if err != nil {
return 0, fmt.Errorf("failed to write WebSocket message: %w", err)
}
return len(p), nil
}
// Close closes all connections and stops all processes
func (c *Client) Close() error {
c.mu.Lock()
defer c.mu.Unlock()
// Stop all processes
for id := range c.processes {
if err := c.StopProcess(id); err != nil {
return fmt.Errorf("failed to stop process %s: %w", id, err)
}
}
return nil
}
// RunProcess executes a command and returns its output
func (c *Client) RunProcess(ctx context.Context, command string, args []string, env []string) (string, error) {
log.Printf("Running one-time process: command=%s, args=%v", command, args)
req := struct {
Command string `json:"command"`
Args []string `json:"args"`
Env []string `json:"env"`
}{
Command: command,
Args: args,
Env: env,
}
reqBody, err := json.Marshal(req)
if err != nil {
return "", fmt.Errorf("failed to marshal request: %w", err)
}
url := fmt.Sprintf("%s/run", c.baseURL)
log.Printf("Sending POST request to %s", url)
resp, err := http.Post(url, "application/json", bytes.NewReader(reqBody))
if err != nil {
return "", fmt.Errorf("failed to execute process: %w", err)
}
defer resp.Body.Close()
log.Printf("Received response with status: %d", resp.StatusCode)
var result struct {
Output string `json:"output"`
}
if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
body, _ := io.ReadAll(resp.Body)
return "", fmt.Errorf("failed to decode response: %w. body: %s", err, string(body))
}
log.Printf("Successfully executed process with output length: %d", len(result.Output))
return result.Output, nil
}

View File

@@ -1,28 +0,0 @@
package stdio
import (
"os"
"testing"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
)
func TestSTDIOTransport(t *testing.T) {
RegisterFailHandler(Fail)
RunSpecs(t, "STDIOTransport test suite")
}
var baseURL string
func init() {
baseURL = os.Getenv("LOCALAGI_MCPBOX_URL")
if baseURL == "" {
baseURL = "http://localhost:8080"
}
}
var _ = AfterSuite(func() {
client := NewClient(baseURL)
client.StopGroup("test-group")
})

View File

@@ -1,235 +0,0 @@
package stdio
import (
"context"
"time"
mcp "github.com/metoro-io/mcp-golang"
"github.com/metoro-io/mcp-golang/transport/stdio"
"github.com/mudler/LocalAGI/pkg/xlog"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
)
var _ = Describe("Client", func() {
var (
client *Client
)
BeforeEach(func() {
client = NewClient(baseURL)
})
AfterEach(func() {
if client != nil {
Expect(client.Close()).To(Succeed())
}
})
Context("Process Management", func() {
It("should create and stop a process", func() {
ctx := context.Background()
// Use a command that doesn't exit immediately
process, err := client.CreateProcess(ctx, "sh", []string{"-c", "echo 'Hello, World!'; sleep 10"}, []string{}, "test-group")
Expect(err).NotTo(HaveOccurred())
Expect(process).NotTo(BeNil())
Expect(process.ID).NotTo(BeEmpty())
// Get process IO
reader, writer, err := client.GetProcessIO(process.ID)
Expect(err).NotTo(HaveOccurred())
Expect(reader).NotTo(BeNil())
Expect(writer).NotTo(BeNil())
// Write to process
_, err = writer.Write([]byte("test input\n"))
Expect(err).NotTo(HaveOccurred())
// Read from process with timeout
buf := make([]byte, 1024)
readDone := make(chan struct{})
var readErr error
var readN int
go func() {
readN, readErr = reader.Read(buf)
close(readDone)
}()
// Wait for read with timeout
select {
case <-readDone:
Expect(readErr).NotTo(HaveOccurred())
Expect(readN).To(BeNumerically(">", 0))
Expect(string(buf[:readN])).To(ContainSubstring("Hello, World!"))
case <-time.After(5 * time.Second):
Fail("Timeout waiting for process output")
}
// Stop the process
err = client.StopProcess(process.ID)
Expect(err).NotTo(HaveOccurred())
})
It("should manage process groups", func() {
ctx := context.Background()
groupID := "test-group"
// Create multiple processes in the same group
process1, err := client.CreateProcess(ctx, "sh", []string{"-c", "echo 'Process 1'; sleep 1"}, []string{}, groupID)
Expect(err).NotTo(HaveOccurred())
Expect(process1).NotTo(BeNil())
process2, err := client.CreateProcess(ctx, "sh", []string{"-c", "echo 'Process 2'; sleep 1"}, []string{}, groupID)
Expect(err).NotTo(HaveOccurred())
Expect(process2).NotTo(BeNil())
// Get group processes
processes, err := client.GetGroupProcesses(groupID)
Expect(err).NotTo(HaveOccurred())
Expect(processes).To(HaveLen(2))
// List groups
groups := client.ListGroups()
Expect(groups).To(ContainElement(groupID))
// Stop the group
err = client.StopGroup(groupID)
Expect(err).NotTo(HaveOccurred())
})
It("should run a one-time process", func() {
ctx := context.Background()
output, err := client.RunProcess(ctx, "echo", []string{"One-time process"}, []string{})
Expect(err).NotTo(HaveOccurred())
Expect(output).To(ContainSubstring("One-time process"))
})
It("should handle process with environment variables", func() {
ctx := context.Background()
env := []string{"TEST_VAR=test_value"}
process, err := client.CreateProcess(ctx, "sh", []string{"-c", "env | grep TEST_VAR; sleep 1"}, env, "test-group")
Expect(err).NotTo(HaveOccurred())
Expect(process).NotTo(BeNil())
// Get process IO
reader, _, err := client.GetProcessIO(process.ID)
Expect(err).NotTo(HaveOccurred())
// Read environment variables with timeout
buf := make([]byte, 1024)
readDone := make(chan struct{})
var readErr error
var readN int
go func() {
readN, readErr = reader.Read(buf)
close(readDone)
}()
// Wait for read with timeout
select {
case <-readDone:
Expect(readErr).NotTo(HaveOccurred())
Expect(readN).To(BeNumerically(">", 0))
Expect(string(buf[:readN])).To(ContainSubstring("TEST_VAR=test_value"))
case <-time.After(5 * time.Second):
Fail("Timeout waiting for process output")
}
// Stop the process
err = client.StopProcess(process.ID)
Expect(err).NotTo(HaveOccurred())
})
It("should handle long-running processes", func() {
ctx := context.Background()
process, err := client.CreateProcess(ctx, "sh", []string{"-c", "echo 'Starting long process'; sleep 5"}, []string{}, "test-group")
Expect(err).NotTo(HaveOccurred())
Expect(process).NotTo(BeNil())
// Get process IO
reader, _, err := client.GetProcessIO(process.ID)
Expect(err).NotTo(HaveOccurred())
// Read initial output
buf := make([]byte, 1024)
readDone := make(chan struct{})
var readErr error
var readN int
go func() {
readN, readErr = reader.Read(buf)
close(readDone)
}()
// Wait for read with timeout
select {
case <-readDone:
Expect(readErr).NotTo(HaveOccurred())
Expect(readN).To(BeNumerically(">", 0))
Expect(string(buf[:readN])).To(ContainSubstring("Starting long process"))
case <-time.After(5 * time.Second):
Fail("Timeout waiting for process output")
}
// Wait a bit to ensure process is running
time.Sleep(time.Second)
// Stop the process
err = client.StopProcess(process.ID)
Expect(err).NotTo(HaveOccurred())
})
It("MCP", func() {
ctx := context.Background()
process, err := client.CreateProcess(ctx,
"docker", []string{"run", "-i", "--rm", "-e", "GITHUB_PERSONAL_ACCESS_TOKEN", "ghcr.io/github/github-mcp-server"},
[]string{"GITHUB_PERSONAL_ACCESS_TOKEN=test"}, "test-group")
Expect(err).NotTo(HaveOccurred())
Expect(process).NotTo(BeNil())
Expect(process.ID).NotTo(BeEmpty())
defer client.StopProcess(process.ID)
// MCP client
read, writer, err := client.GetProcessIO(process.ID)
Expect(err).NotTo(HaveOccurred())
Expect(read).NotTo(BeNil())
Expect(writer).NotTo(BeNil())
transport := stdio.NewStdioServerTransportWithIO(read, writer)
// Create a new client
mcpClient := mcp.NewClient(transport)
// Initialize the client
response, e := mcpClient.Initialize(ctx)
Expect(e).NotTo(HaveOccurred())
Expect(response).NotTo(BeNil())
Expect(mcpClient.Ping(ctx)).To(Succeed())
xlog.Debug("Client initialized: %v", response.Instructions)
alltools := []mcp.ToolRetType{}
var cursor *string
for {
tools, err := mcpClient.ListTools(ctx, cursor)
Expect(err).NotTo(HaveOccurred())
Expect(tools).NotTo(BeNil())
Expect(tools.Tools).NotTo(BeEmpty())
alltools = append(alltools, tools.Tools...)
if tools.NextCursor == nil {
break // No more pages
}
cursor = tools.NextCursor
}
for _, tool := range alltools {
xlog.Debug("Tool: %v", tool)
}
})
})
})

View File

@@ -1,473 +0,0 @@
package stdio
import (
"context"
"encoding/json"
"fmt"
"io"
"log"
"net/http"
"os"
"os/exec"
"sync"
"time"
"github.com/gorilla/websocket"
"github.com/mudler/LocalAGI/pkg/xlog"
)
// Process represents a running process with its stdio streams
type Process struct {
ID string
GroupID string
Cmd *exec.Cmd
Stdin io.WriteCloser
Stdout io.ReadCloser
Stderr io.ReadCloser
CreatedAt time.Time
}
// Server handles process management and stdio streaming
type Server struct {
processes map[string]*Process
groups map[string][]string // maps group ID to process IDs
mu sync.RWMutex
upgrader websocket.Upgrader
}
// NewServer creates a new stdio server
func NewServer() *Server {
return &Server{
processes: make(map[string]*Process),
groups: make(map[string][]string),
upgrader: websocket.Upgrader{},
}
}
// StartProcess starts a new process and returns its ID
func (s *Server) StartProcess(ctx context.Context, command string, args []string, env []string, groupID string) (string, error) {
xlog.Debug("Starting process", "command", command, "args", args, "groupID", groupID)
cmd := exec.CommandContext(ctx, command, args...)
if len(env) > 0 {
cmd.Env = append(os.Environ(), env...)
xlog.Debug("Process environment", "env", cmd.Env)
}
stdin, err := cmd.StdinPipe()
if err != nil {
return "", fmt.Errorf("failed to create stdin pipe: %w", err)
}
stdout, err := cmd.StdoutPipe()
if err != nil {
return "", fmt.Errorf("failed to create stdout pipe: %w", err)
}
stderr, err := cmd.StderrPipe()
if err != nil {
return "", fmt.Errorf("failed to create stderr pipe: %w", err)
}
if err := cmd.Start(); err != nil {
return "", fmt.Errorf("failed to start process: %w", err)
}
process := &Process{
ID: fmt.Sprintf("%d", time.Now().UnixNano()),
GroupID: groupID,
Cmd: cmd,
Stdin: stdin,
Stdout: stdout,
Stderr: stderr,
CreatedAt: time.Now(),
}
s.mu.Lock()
s.processes[process.ID] = process
if groupID != "" {
s.groups[groupID] = append(s.groups[groupID], process.ID)
}
s.mu.Unlock()
xlog.Debug("Successfully started process", "id", process.ID, "pid", cmd.Process.Pid)
return process.ID, nil
}
// StopProcess stops a running process
func (s *Server) StopProcess(id string) error {
s.mu.Lock()
process, exists := s.processes[id]
if !exists {
s.mu.Unlock()
return fmt.Errorf("process not found: %s", id)
}
xlog.Debug("Stopping process", "processID", id, "pid", process.Cmd.Process.Pid)
// Remove from group if it exists
if process.GroupID != "" {
groupProcesses := s.groups[process.GroupID]
for i, pid := range groupProcesses {
if pid == id {
s.groups[process.GroupID] = append(groupProcesses[:i], groupProcesses[i+1:]...)
break
}
}
if len(s.groups[process.GroupID]) == 0 {
delete(s.groups, process.GroupID)
}
}
delete(s.processes, id)
s.mu.Unlock()
if err := process.Cmd.Process.Kill(); err != nil {
xlog.Debug("Failed to kill process", "processID", id, "pid", process.Cmd.Process.Pid, "error", err)
return fmt.Errorf("failed to kill process: %w", err)
}
xlog.Debug("Successfully killed process", "processID", id, "pid", process.Cmd.Process.Pid)
return nil
}
// StopGroup stops all processes in a group
func (s *Server) StopGroup(groupID string) error {
s.mu.Lock()
processIDs, exists := s.groups[groupID]
if !exists {
s.mu.Unlock()
return fmt.Errorf("group not found: %s", groupID)
}
s.mu.Unlock()
for _, pid := range processIDs {
if err := s.StopProcess(pid); err != nil {
return fmt.Errorf("failed to stop process %s in group %s: %w", pid, groupID, err)
}
}
return nil
}
// GetGroupProcesses returns all processes in a group
func (s *Server) GetGroupProcesses(groupID string) ([]*Process, error) {
s.mu.RLock()
processIDs, exists := s.groups[groupID]
if !exists {
s.mu.RUnlock()
return nil, fmt.Errorf("group not found: %s", groupID)
}
processes := make([]*Process, 0, len(processIDs))
for _, pid := range processIDs {
if process, exists := s.processes[pid]; exists {
processes = append(processes, process)
}
}
s.mu.RUnlock()
return processes, nil
}
// ListGroups returns all group IDs
func (s *Server) ListGroups() []string {
s.mu.RLock()
defer s.mu.RUnlock()
groups := make([]string, 0, len(s.groups))
for groupID := range s.groups {
groups = append(groups, groupID)
}
return groups
}
// GetProcess returns a process by ID
func (s *Server) GetProcess(id string) (*Process, error) {
s.mu.RLock()
process, exists := s.processes[id]
s.mu.RUnlock()
if !exists {
return nil, fmt.Errorf("process not found: %s", id)
}
return process, nil
}
// ListProcesses returns all running processes
func (s *Server) ListProcesses() []*Process {
s.mu.RLock()
defer s.mu.RUnlock()
processes := make([]*Process, 0, len(s.processes))
for _, p := range s.processes {
processes = append(processes, p)
}
return processes
}
// RunProcess executes a command and returns its output
func (s *Server) RunProcess(ctx context.Context, command string, args []string, env []string) (string, error) {
cmd := exec.CommandContext(ctx, command, args...)
if len(env) > 0 {
cmd.Env = append(os.Environ(), env...)
}
output, err := cmd.CombinedOutput()
if err != nil {
return string(output), fmt.Errorf("process failed: %w", err)
}
return string(output), nil
}
// Start starts the HTTP server
func (s *Server) Start(addr string) error {
http.HandleFunc("/processes", s.handleProcesses)
http.HandleFunc("/processes/", s.handleProcess)
http.HandleFunc("/ws/", s.handleWebSocket)
http.HandleFunc("/groups", s.handleGroups)
http.HandleFunc("/groups/", s.handleGroup)
http.HandleFunc("/run", s.handleRun)
return http.ListenAndServe(addr, nil)
}
func (s *Server) handleProcesses(w http.ResponseWriter, r *http.Request) {
log.Printf("Handling /processes request: method=%s", r.Method)
switch r.Method {
case http.MethodGet:
processes := s.ListProcesses()
json.NewEncoder(w).Encode(processes)
case http.MethodPost:
var req struct {
Command string `json:"command"`
Args []string `json:"args"`
Env []string `json:"env"`
GroupID string `json:"group_id"`
}
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
id, err := s.StartProcess(context.Background(), req.Command, req.Args, req.Env, req.GroupID)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(map[string]string{"id": id})
default:
http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
}
}
func (s *Server) handleProcess(w http.ResponseWriter, r *http.Request) {
id := r.URL.Path[len("/processes/"):]
switch r.Method {
case http.MethodGet:
process, err := s.GetProcess(id)
if err != nil {
http.Error(w, err.Error(), http.StatusNotFound)
return
}
json.NewEncoder(w).Encode(process)
case http.MethodDelete:
if err := s.StopProcess(id); err != nil {
http.Error(w, err.Error(), http.StatusNotFound)
return
}
w.WriteHeader(http.StatusNoContent)
default:
http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
}
}
func (s *Server) handleWebSocket(w http.ResponseWriter, r *http.Request) {
id := r.URL.Path[len("/ws/"):]
xlog.Debug("Handling WebSocket connection", "processID", id)
process, err := s.GetProcess(id)
if err != nil {
http.Error(w, err.Error(), http.StatusNotFound)
return
}
if process.Cmd.ProcessState != nil && process.Cmd.ProcessState.Exited() {
xlog.Debug("Process already exited", "processID", id)
http.Error(w, "Process already exited", http.StatusGone)
return
}
xlog.Debug("Process is running", "processID", id, "pid", process.Cmd.Process.Pid)
conn, err := s.upgrader.Upgrade(w, r, nil)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
defer conn.Close()
xlog.Debug("WebSocket connection established", "processID", id)
// Create a done channel to signal process completion
done := make(chan struct{})
// Handle stdin
go func() {
defer func() {
select {
case <-done:
xlog.Debug("Process stdin handler done", "processID", id)
default:
xlog.Debug("WebSocket stdin connection closed", "processID", id)
}
}()
for {
_, message, err := conn.ReadMessage()
if err != nil {
if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseNormalClosure) {
xlog.Debug("WebSocket stdin unexpected error", "processID", id, "error", err)
}
return
}
xlog.Debug("Received message", "processID", id, "message", string(message))
if _, err := process.Stdin.Write(message); err != nil {
if err != io.EOF {
xlog.Debug("WebSocket stdin write error", "processID", id, "error", err)
}
return
}
xlog.Debug("Message sent to process", "processID", id, "message", string(message))
}
}()
// Handle stdout and stderr
go func() {
defer func() {
select {
case <-done:
xlog.Debug("Process output handler done", "processID", id)
default:
xlog.Debug("WebSocket output connection closed", "processID", id)
}
}()
// Create a buffer for reading
buf := make([]byte, 4096)
reader := io.MultiReader(process.Stdout, process.Stderr)
for {
n, err := reader.Read(buf)
if err != nil {
if err != io.EOF {
xlog.Debug("Read error", "processID", id, "error", err)
}
return
}
if n > 0 {
xlog.Debug("Sending message", "processID", id, "size", n)
if err := conn.WriteMessage(websocket.BinaryMessage, buf[:n]); err != nil {
if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseNormalClosure) {
xlog.Debug("WebSocket output write error", "processID", id, "error", err)
}
return
}
xlog.Debug("Message sent to client", "processID", id, "size", n)
}
}
}()
// Wait for process to exit
xlog.Debug("Waiting for process to exit", "processID", id)
err = process.Cmd.Wait()
close(done) // Signal that the process is done
if err != nil {
xlog.Debug("Process exited with error",
"processID", id,
"pid", process.Cmd.Process.Pid,
"error", err)
} else {
xlog.Debug("Process exited successfully",
"processID", id,
"pid", process.Cmd.Process.Pid)
}
}
// Add new handlers for group management
func (s *Server) handleGroups(w http.ResponseWriter, r *http.Request) {
switch r.Method {
case http.MethodGet:
groups := s.ListGroups()
json.NewEncoder(w).Encode(groups)
default:
http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
}
}
func (s *Server) handleGroup(w http.ResponseWriter, r *http.Request) {
groupID := r.URL.Path[len("/groups/"):]
switch r.Method {
case http.MethodGet:
processes, err := s.GetGroupProcesses(groupID)
if err != nil {
http.Error(w, err.Error(), http.StatusNotFound)
return
}
json.NewEncoder(w).Encode(processes)
case http.MethodDelete:
if err := s.StopGroup(groupID); err != nil {
http.Error(w, err.Error(), http.StatusNotFound)
return
}
w.WriteHeader(http.StatusNoContent)
default:
http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
}
}
func (s *Server) handleRun(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodPost {
http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
return
}
log.Printf("Handling /run request")
var req struct {
Command string `json:"command"`
Args []string `json:"args"`
Env []string `json:"env"`
}
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
log.Printf("Executing one-time process: command=%s, args=%v", req.Command, req.Args)
output, err := s.RunProcess(r.Context(), req.Command, req.Args, req.Env)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
log.Printf("One-time process completed with output length: %d", len(output))
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(map[string]string{
"output": output,
})
}

View File

@@ -10,10 +10,6 @@ import (
"github.com/sashabaranov/go-openai/jsonschema"
)
const (
MetadataBrowserAgentHistory = "browser_agent_history"
)
type BrowserAgentRunner struct {
baseURL, customActionName string
client *api.Client
@@ -66,7 +62,7 @@ func (b *BrowserAgentRunner) Run(ctx context.Context, params types.ActionParams)
return types.ActionResult{
Result: fmt.Sprintf("Browser agent completed successfully. History:\n%s", historyStr),
Metadata: map[string]interface{}{MetadataBrowserAgentHistory: stateHistory},
Metadata: map[string]interface{}{"browser_agent_history": stateHistory},
}, nil
}

View File

@@ -30,15 +30,9 @@ func NewDiscord(config map[string]string) *Discord {
duration = 5 * time.Minute
}
token := config["token"]
if !strings.HasPrefix(token, "Bot ") {
token = "Bot " + token
}
return &Discord{
conversationTracker: NewConversationTracker[string](duration),
token: token,
token: config["token"],
defaultChannel: config["defaultChannel"],
}
}

View File

@@ -77,9 +77,8 @@ func (i *IRC) Start(a *agent.Agent) {
}
i.conn.UseTLS = false
i.conn.AddCallback("001", func(e *irc.Event) {
xlog.Info("Connected to IRC server", "server", i.server, "arguments", e.Arguments)
xlog.Info("Connected to IRC server", "server", i.server)
i.conn.Join(i.channel)
i.nickname = e.Arguments[0]
xlog.Info("Joined channel", "channel", i.channel)
})
@@ -208,13 +207,6 @@ func (i *IRC) Start(a *agent.Agent) {
// Start the IRC client in a goroutine
go i.conn.Loop()
go func() {
select {
case <-a.Context().Done():
i.conn.Quit()
return
}
}()
}
// IRCConfigMeta returns the metadata for IRC connector configuration fields

View File

@@ -11,7 +11,6 @@ import (
"time"
"github.com/mudler/LocalAGI/pkg/config"
"github.com/mudler/LocalAGI/pkg/localoperator"
"github.com/mudler/LocalAGI/pkg/xlog"
"github.com/mudler/LocalAGI/pkg/xstrings"
"github.com/mudler/LocalAGI/services/actions"
@@ -168,38 +167,8 @@ func replaceUserIDsWithNamesInMessage(api *slack.Client, message string) string
return message
}
func generateAttachmentsFromJobResponse(j *types.JobResult, api *slack.Client, channelID, ts string) (attachments []slack.Attachment) {
func generateAttachmentsFromJobResponse(j *types.JobResult) (attachments []slack.Attachment) {
for _, state := range j.State {
// coming from the browser agent
if history, exists := state.Metadata[actions.MetadataBrowserAgentHistory]; exists {
if historyStruct, ok := history.(*localoperator.StateHistory); ok {
state := historyStruct.States[len(historyStruct.States)-1]
// Decode base64 screenshot and upload to Slack
if state.Screenshot != "" {
screenshotData, err := base64.StdEncoding.DecodeString(state.Screenshot)
if err != nil {
xlog.Error(fmt.Sprintf("Error decoding screenshot: %v", err))
continue
}
data := string(screenshotData)
// Upload the file to Slack
_, err = api.UploadFileV2(slack.UploadFileV2Parameters{
Reader: bytes.NewReader(screenshotData),
FileSize: len(data),
ThreadTimestamp: ts,
Channel: channelID,
Filename: "screenshot.png",
InitialComment: "Browser Agent Screenshot",
})
if err != nil {
xlog.Error(fmt.Sprintf("Error uploading screenshot: %v", err))
continue
}
}
}
}
// coming from the search action
if urls, exists := state.Metadata[actions.MetadataUrls]; exists {
for _, url := range xstrings.UniqueSlice(urls.([]string)) {
@@ -406,7 +375,7 @@ func replyWithPostMessage(finalResponse string, api *slack.Client, ev *slackeven
slack.MsgOptionEnableLinkUnfurl(),
slack.MsgOptionText(message, true),
slack.MsgOptionPostMessageParameters(postMessageParams),
slack.MsgOptionAttachments(generateAttachmentsFromJobResponse(res, api, ev.Channel, "")...),
slack.MsgOptionAttachments(generateAttachmentsFromJobResponse(res)...),
)
if err != nil {
xlog.Error(fmt.Sprintf("Error posting message: %v", err))
@@ -418,7 +387,7 @@ func replyWithPostMessage(finalResponse string, api *slack.Client, ev *slackeven
slack.MsgOptionEnableLinkUnfurl(),
slack.MsgOptionText(res.Response, true),
slack.MsgOptionPostMessageParameters(postMessageParams),
slack.MsgOptionAttachments(generateAttachmentsFromJobResponse(res, api, ev.Channel, "")...),
slack.MsgOptionAttachments(generateAttachmentsFromJobResponse(res)...),
// slack.MsgOptionTS(ts),
)
if err != nil {
@@ -439,7 +408,7 @@ func replyToUpdateMessage(finalResponse string, api *slack.Client, ev *slackeven
slack.MsgOptionLinkNames(true),
slack.MsgOptionEnableLinkUnfurl(),
slack.MsgOptionText(messages[0], true),
slack.MsgOptionAttachments(generateAttachmentsFromJobResponse(res, api, ev.Channel, msgTs)...),
slack.MsgOptionAttachments(generateAttachmentsFromJobResponse(res)...),
)
if err != nil {
xlog.Error(fmt.Sprintf("Error updating final message: %v", err))
@@ -466,7 +435,7 @@ func replyToUpdateMessage(finalResponse string, api *slack.Client, ev *slackeven
slack.MsgOptionLinkNames(true),
slack.MsgOptionEnableLinkUnfurl(),
slack.MsgOptionText(finalResponse, true),
slack.MsgOptionAttachments(generateAttachmentsFromJobResponse(res, api, ev.Channel, msgTs)...),
slack.MsgOptionAttachments(generateAttachmentsFromJobResponse(res)...),
)
if err != nil {
xlog.Error(fmt.Sprintf("Error updating final message: %v", err))

View File

@@ -23,7 +23,6 @@ oauth_config:
- commands
- groups:history
- files:read
- files:write
- im:history
- im:read
- im:write

View File

@@ -176,7 +176,17 @@ func (a *App) UpdateAgentConfig(pool *state.AgentPool) func(c *fiber.Ctx) error
return errorJSONMessage(c, err.Error())
}
if err := pool.RecreateAgent(agentName, &newConfig); err != nil {
// Remove the agent first
if err := pool.Remove(agentName); err != nil {
return errorJSONMessage(c, "Error removing agent: "+err.Error())
}
// Create agent with new config
if err := pool.CreateAgent(agentName, &newConfig); err != nil {
// Try to restore the old configuration if update fails
if restoreErr := pool.CreateAgent(agentName, oldConfig); restoreErr != nil {
return errorJSONMessage(c, fmt.Sprintf("Failed to update agent and restore failed: %v, %v", err, restoreErr))
}
return errorJSONMessage(c, "Error updating agent: "+err.Error())
}

View File

@@ -99,17 +99,8 @@ function AgentStatus() {
creation: data.creation,
progress: data.progress,
completion: data.completion,
// children are always built client-side
};
// Events can be received out of order
if (data.creation)
updated.creation = data.creation;
if (data.completion)
updated.completion = data.completion;
if (data.parent_id && !prevMap[data.parent_id])
prevMap[data.parent_id] = {
id: data.parent_id,
name: "unknown",
};
const newMap = { ...prevMap, [data.id]: updated };
setObservableTree(buildObservableTree(newMap));
return newMap;