Files
LocalAGI/pkg/stdio/server.go
mudler 608d7aba85 quality of life improvements
Signed-off-by: mudler <mudler@localai.io>
2025-04-24 12:46:47 +02:00

482 lines
12 KiB
Go

package stdio
import (
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"log"
"net/http"
"os"
"os/exec"
"sync"
"time"
"github.com/gorilla/websocket"
)
// Process represents a running process with its stdio streams
type Process struct {
ID string
GroupID string
Cmd *exec.Cmd
Stdin io.WriteCloser
Stdout io.ReadCloser
Stderr io.ReadCloser
CreatedAt time.Time
}
// Server handles process management and stdio streaming
type Server struct {
processes map[string]*Process
groups map[string][]string // maps group ID to process IDs
mu sync.RWMutex
upgrader websocket.Upgrader
}
// NewServer creates a new stdio server
func NewServer() *Server {
return &Server{
processes: make(map[string]*Process),
groups: make(map[string][]string),
upgrader: websocket.Upgrader{
CheckOrigin: func(r *http.Request) bool { return true },
},
}
}
// StartProcess starts a new process and returns its ID
func (s *Server) StartProcess(ctx context.Context, command string, args []string, env []string, groupID string) (string, error) {
log.Printf("Starting process: command=%s, args=%v, groupID=%s", command, args, groupID)
cmd := exec.CommandContext(ctx, command, args...)
if len(env) > 0 {
cmd.Env = append(os.Environ(), env...)
}
stdin, err := cmd.StdinPipe()
if err != nil {
return "", fmt.Errorf("failed to create stdin pipe: %w", err)
}
stdout, err := cmd.StdoutPipe()
if err != nil {
return "", fmt.Errorf("failed to create stdout pipe: %w", err)
}
stderr, err := cmd.StderrPipe()
if err != nil {
return "", fmt.Errorf("failed to create stderr pipe: %w", err)
}
if err := cmd.Start(); err != nil {
return "", fmt.Errorf("failed to start process: %w", err)
}
process := &Process{
ID: fmt.Sprintf("%d", time.Now().UnixNano()),
GroupID: groupID,
Cmd: cmd,
Stdin: stdin,
Stdout: stdout,
Stderr: stderr,
CreatedAt: time.Now(),
}
s.mu.Lock()
s.processes[process.ID] = process
if groupID != "" {
s.groups[groupID] = append(s.groups[groupID], process.ID)
}
s.mu.Unlock()
log.Printf("Successfully started process with ID: %s", process.ID)
return process.ID, nil
}
// StopProcess stops a running process
func (s *Server) StopProcess(id string) error {
s.mu.Lock()
process, exists := s.processes[id]
if !exists {
s.mu.Unlock()
return fmt.Errorf("process not found: %s", id)
}
// Remove from group if it exists
if process.GroupID != "" {
groupProcesses := s.groups[process.GroupID]
for i, pid := range groupProcesses {
if pid == id {
s.groups[process.GroupID] = append(groupProcesses[:i], groupProcesses[i+1:]...)
break
}
}
if len(s.groups[process.GroupID]) == 0 {
delete(s.groups, process.GroupID)
}
}
delete(s.processes, id)
s.mu.Unlock()
if err := process.Cmd.Process.Kill(); err != nil {
return fmt.Errorf("failed to kill process: %w", err)
}
return nil
}
// StopGroup stops all processes in a group
func (s *Server) StopGroup(groupID string) error {
s.mu.Lock()
processIDs, exists := s.groups[groupID]
if !exists {
s.mu.Unlock()
return fmt.Errorf("group not found: %s", groupID)
}
s.mu.Unlock()
for _, pid := range processIDs {
if err := s.StopProcess(pid); err != nil {
return fmt.Errorf("failed to stop process %s in group %s: %w", pid, groupID, err)
}
}
return nil
}
// GetGroupProcesses returns all processes in a group
func (s *Server) GetGroupProcesses(groupID string) ([]*Process, error) {
s.mu.RLock()
processIDs, exists := s.groups[groupID]
if !exists {
s.mu.RUnlock()
return nil, fmt.Errorf("group not found: %s", groupID)
}
processes := make([]*Process, 0, len(processIDs))
for _, pid := range processIDs {
if process, exists := s.processes[pid]; exists {
processes = append(processes, process)
}
}
s.mu.RUnlock()
return processes, nil
}
// ListGroups returns all group IDs
func (s *Server) ListGroups() []string {
s.mu.RLock()
defer s.mu.RUnlock()
groups := make([]string, 0, len(s.groups))
for groupID := range s.groups {
groups = append(groups, groupID)
}
return groups
}
// GetProcess returns a process by ID
func (s *Server) GetProcess(id string) (*Process, error) {
s.mu.RLock()
process, exists := s.processes[id]
s.mu.RUnlock()
if !exists {
return nil, fmt.Errorf("process not found: %s", id)
}
return process, nil
}
// ListProcesses returns all running processes
func (s *Server) ListProcesses() []*Process {
s.mu.RLock()
defer s.mu.RUnlock()
processes := make([]*Process, 0, len(s.processes))
for _, p := range s.processes {
processes = append(processes, p)
}
return processes
}
// RunProcess executes a command and returns its output
func (s *Server) RunProcess(ctx context.Context, command string, args []string, env []string) (string, error) {
cmd := exec.CommandContext(ctx, command, args...)
if len(env) > 0 {
cmd.Env = append(os.Environ(), env...)
}
output, err := cmd.CombinedOutput()
if err != nil {
return string(output), fmt.Errorf("process failed: %w", err)
}
return string(output), nil
}
// Start starts the HTTP server
func (s *Server) Start(addr string) error {
http.HandleFunc("/processes", s.handleProcesses)
http.HandleFunc("/processes/", s.handleProcess)
http.HandleFunc("/ws/", s.handleWebSocket)
http.HandleFunc("/groups", s.handleGroups)
http.HandleFunc("/groups/", s.handleGroup)
http.HandleFunc("/run", s.handleRun)
return http.ListenAndServe(addr, nil)
}
func (s *Server) handleProcesses(w http.ResponseWriter, r *http.Request) {
log.Printf("Handling /processes request: method=%s", r.Method)
switch r.Method {
case http.MethodGet:
processes := s.ListProcesses()
json.NewEncoder(w).Encode(processes)
case http.MethodPost:
var req struct {
Command string `json:"command"`
Args []string `json:"args"`
Env []string `json:"env"`
GroupID string `json:"group_id"`
}
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
id, err := s.StartProcess(r.Context(), req.Command, req.Args, req.Env, req.GroupID)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(map[string]string{"id": id})
default:
http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
}
}
func (s *Server) handleProcess(w http.ResponseWriter, r *http.Request) {
id := r.URL.Path[len("/processes/"):]
switch r.Method {
case http.MethodGet:
process, err := s.GetProcess(id)
if err != nil {
http.Error(w, err.Error(), http.StatusNotFound)
return
}
json.NewEncoder(w).Encode(process)
case http.MethodDelete:
if err := s.StopProcess(id); err != nil {
http.Error(w, err.Error(), http.StatusNotFound)
return
}
w.WriteHeader(http.StatusNoContent)
default:
http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
}
}
func (s *Server) handleWebSocket(w http.ResponseWriter, r *http.Request) {
id := r.URL.Path[len("/ws/"):]
log.Printf("Handling WebSocket connection for process: %s", id)
process, err := s.GetProcess(id)
if err != nil {
http.Error(w, err.Error(), http.StatusNotFound)
return
}
conn, err := s.upgrader.Upgrade(w, r, nil)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
defer conn.Close()
log.Printf("WebSocket connection established for process: %s", id)
// Create a done channel to signal process completion
done := make(chan struct{})
// Create buffers to capture output
var stdoutBuf, stderrBuf bytes.Buffer
stdoutTee := io.TeeReader(process.Stdout, &stdoutBuf)
stderrTee := io.TeeReader(process.Stderr, &stderrBuf)
// Handle stdin
go func() {
defer func() {
select {
case <-done:
// Process already done, this is expected
default:
log.Printf("WebSocket stdin connection closed for process %s", id)
}
}()
for {
_, message, err := conn.ReadMessage()
if err != nil {
if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseNormalClosure) {
log.Printf("WebSocket stdin unexpected error for process %s: %v", id, err)
}
return
}
if _, err := process.Stdin.Write(message); err != nil {
if err != io.EOF {
log.Printf("WebSocket stdin write error for process %s: %v", id, err)
}
return
}
}
}()
// Handle stdout
go func() {
defer func() {
select {
case <-done:
// Process already done, this is expected
default:
log.Printf("WebSocket stdout connection closed for process %s", id)
}
}()
buf := make([]byte, 1024)
for {
n, err := stdoutTee.Read(buf)
if err != nil {
if err != io.EOF {
log.Printf("WebSocket stdout read error for process %s: %v", id, err)
}
return
}
if err := conn.WriteMessage(websocket.TextMessage, buf[:n]); err != nil {
if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseNormalClosure) {
log.Printf("WebSocket stdout write error for process %s: %v", id, err)
}
return
}
}
}()
// Handle stderr
go func() {
defer func() {
select {
case <-done:
// Process already done, this is expected
default:
log.Printf("WebSocket stderr connection closed for process %s", id)
}
}()
buf := make([]byte, 1024)
for {
n, err := stderrTee.Read(buf)
if err != nil {
if err != io.EOF {
log.Printf("WebSocket stderr read error for process %s: %v", id, err)
}
return
}
if err := conn.WriteMessage(websocket.TextMessage, buf[:n]); err != nil {
if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseNormalClosure) {
log.Printf("WebSocket stderr write error for process %s: %v", id, err)
}
return
}
}
}()
// Wait for process to exit
err = process.Cmd.Wait()
close(done) // Signal that the process is done
if err != nil {
log.Printf("Process %s exited with error: %v\nstdout: %s\nstderr: %s",
id, err, stdoutBuf.String(), stderrBuf.String())
} else {
log.Printf("Process %s exited successfully", id)
}
}
// Add new handlers for group management
func (s *Server) handleGroups(w http.ResponseWriter, r *http.Request) {
switch r.Method {
case http.MethodGet:
groups := s.ListGroups()
json.NewEncoder(w).Encode(groups)
default:
http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
}
}
func (s *Server) handleGroup(w http.ResponseWriter, r *http.Request) {
groupID := r.URL.Path[len("/groups/"):]
switch r.Method {
case http.MethodGet:
processes, err := s.GetGroupProcesses(groupID)
if err != nil {
http.Error(w, err.Error(), http.StatusNotFound)
return
}
json.NewEncoder(w).Encode(processes)
case http.MethodDelete:
if err := s.StopGroup(groupID); err != nil {
http.Error(w, err.Error(), http.StatusNotFound)
return
}
w.WriteHeader(http.StatusNoContent)
default:
http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
}
}
func (s *Server) handleRun(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodPost {
http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
return
}
log.Printf("Handling /run request")
var req struct {
Command string `json:"command"`
Args []string `json:"args"`
Env []string `json:"env"`
}
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
log.Printf("Executing one-time process: command=%s, args=%v", req.Command, req.Args)
output, err := s.RunProcess(r.Context(), req.Command, req.Args, req.Env)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
log.Printf("One-time process completed with output length: %d", len(output))
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(map[string]string{
"output": output,
})
}