Fix race conditions
This commit is contained in:
@@ -134,11 +134,13 @@ func NewAgentPool(
|
|||||||
// and starts it.
|
// and starts it.
|
||||||
// It also saves the state to the file.
|
// It also saves the state to the file.
|
||||||
func (a *AgentPool) CreateAgent(name string, agentConfig *AgentConfig) error {
|
func (a *AgentPool) CreateAgent(name string, agentConfig *AgentConfig) error {
|
||||||
|
a.Lock()
|
||||||
|
defer a.Unlock()
|
||||||
if _, ok := a.pool[name]; ok {
|
if _, ok := a.pool[name]; ok {
|
||||||
return fmt.Errorf("agent %s already exists", name)
|
return fmt.Errorf("agent %s already exists", name)
|
||||||
}
|
}
|
||||||
a.pool[name] = *agentConfig
|
a.pool[name] = *agentConfig
|
||||||
if err := a.Save(); err != nil {
|
if err := a.save(); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -146,6 +148,8 @@ func (a *AgentPool) CreateAgent(name string, agentConfig *AgentConfig) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (a *AgentPool) List() []string {
|
func (a *AgentPool) List() []string {
|
||||||
|
a.Lock()
|
||||||
|
defer a.Unlock()
|
||||||
var agents []string
|
var agents []string
|
||||||
for agent := range a.pool {
|
for agent := range a.pool {
|
||||||
agents = append(agents, agent)
|
agents = append(agents, agent)
|
||||||
@@ -365,6 +369,8 @@ func (a *AgentPool) startAgentWithConfig(name string, config *AgentConfig) error
|
|||||||
|
|
||||||
// Starts all the agents in the pool
|
// Starts all the agents in the pool
|
||||||
func (a *AgentPool) StartAll() error {
|
func (a *AgentPool) StartAll() error {
|
||||||
|
a.Lock()
|
||||||
|
defer a.Unlock()
|
||||||
for name, config := range a.pool {
|
for name, config := range a.pool {
|
||||||
if a.agents[name] != nil { // Agent already started
|
if a.agents[name] != nil { // Agent already started
|
||||||
continue
|
continue
|
||||||
@@ -377,18 +383,27 @@ func (a *AgentPool) StartAll() error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (a *AgentPool) StopAll() {
|
func (a *AgentPool) StopAll() {
|
||||||
|
a.Lock()
|
||||||
|
defer a.Unlock()
|
||||||
for _, agent := range a.agents {
|
for _, agent := range a.agents {
|
||||||
agent.Stop()
|
agent.Stop()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (a *AgentPool) Stop(name string) {
|
func (a *AgentPool) Stop(name string) {
|
||||||
|
a.Lock()
|
||||||
|
defer a.Unlock()
|
||||||
|
a.stop(name)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (a *AgentPool) stop(name string) {
|
||||||
if agent, ok := a.agents[name]; ok {
|
if agent, ok := a.agents[name]; ok {
|
||||||
agent.Stop()
|
agent.Stop()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (a *AgentPool) Start(name string) error {
|
func (a *AgentPool) Start(name string) error {
|
||||||
|
a.Lock()
|
||||||
|
defer a.Unlock()
|
||||||
if agent, ok := a.agents[name]; ok {
|
if agent, ok := a.agents[name]; ok {
|
||||||
err := agent.Run()
|
err := agent.Run()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@@ -412,35 +427,43 @@ func (a *AgentPool) stateFiles(name string) (string, string) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (a *AgentPool) Remove(name string) error {
|
func (a *AgentPool) Remove(name string) error {
|
||||||
|
a.Lock()
|
||||||
|
defer a.Unlock()
|
||||||
// Cleanup character and state
|
// Cleanup character and state
|
||||||
stateFile, characterFile := a.stateFiles(name)
|
stateFile, characterFile := a.stateFiles(name)
|
||||||
|
|
||||||
os.Remove(stateFile)
|
os.Remove(stateFile)
|
||||||
os.Remove(characterFile)
|
os.Remove(characterFile)
|
||||||
|
|
||||||
a.Stop(name)
|
a.stop(name)
|
||||||
delete(a.agents, name)
|
delete(a.agents, name)
|
||||||
delete(a.pool, name)
|
delete(a.pool, name)
|
||||||
if err := a.Save(); err != nil {
|
if err := a.save(); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (a *AgentPool) Save() error {
|
func (a *AgentPool) Save() error {
|
||||||
|
a.Lock()
|
||||||
|
defer a.Unlock()
|
||||||
|
return a.save()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (a *AgentPool) save() error {
|
||||||
data, err := json.MarshalIndent(a.pool, "", " ")
|
data, err := json.MarshalIndent(a.pool, "", " ")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
return os.WriteFile(a.file, data, 0644)
|
return os.WriteFile(a.file, data, 0644)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (a *AgentPool) GetAgent(name string) *Agent {
|
func (a *AgentPool) GetAgent(name string) *Agent {
|
||||||
return a.agents[name]
|
return a.agents[name]
|
||||||
}
|
}
|
||||||
|
|
||||||
func (a *AgentPool) GetConfig(name string) *AgentConfig {
|
func (a *AgentPool) GetConfig(name string) *AgentConfig {
|
||||||
|
a.Lock()
|
||||||
|
defer a.Unlock()
|
||||||
agent, exists := a.pool[name]
|
agent, exists := a.pool[name]
|
||||||
if !exists {
|
if !exists {
|
||||||
return nil
|
return nil
|
||||||
@@ -449,5 +472,7 @@ func (a *AgentPool) GetConfig(name string) *AgentConfig {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (a *AgentPool) GetManager(name string) sse.Manager {
|
func (a *AgentPool) GetManager(name string) sse.Manager {
|
||||||
|
a.Lock()
|
||||||
|
defer a.Unlock()
|
||||||
return a.managers[name]
|
return a.managers[name]
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -14,6 +14,7 @@ import (
|
|||||||
"github.com/mudler/LocalAgent/core/agent"
|
"github.com/mudler/LocalAgent/core/agent"
|
||||||
"github.com/mudler/LocalAgent/core/sse"
|
"github.com/mudler/LocalAgent/core/sse"
|
||||||
"github.com/mudler/LocalAgent/core/state"
|
"github.com/mudler/LocalAgent/core/state"
|
||||||
|
"github.com/mudler/LocalAgent/pkg/xlog"
|
||||||
"github.com/mudler/LocalAgent/services"
|
"github.com/mudler/LocalAgent/services"
|
||||||
)
|
)
|
||||||
|
|
||||||
@@ -51,7 +52,12 @@ func (app *App) registerRoutes(pool *state.AgentPool, webapp *fiber.App) {
|
|||||||
webapp.Get("/agents", func(c *fiber.Ctx) error {
|
webapp.Get("/agents", func(c *fiber.Ctx) error {
|
||||||
statuses := map[string]bool{}
|
statuses := map[string]bool{}
|
||||||
for _, a := range pool.List() {
|
for _, a := range pool.List() {
|
||||||
statuses[a] = !pool.GetAgent(a).Paused()
|
agent := pool.GetAgent(a)
|
||||||
|
if agent == nil {
|
||||||
|
xlog.Error("Agent not found", "name", a)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
statuses[a] = !agent.Paused()
|
||||||
}
|
}
|
||||||
return c.Render("views/agents", fiber.Map{
|
return c.Render("views/agents", fiber.Map{
|
||||||
"Agents": pool.List(),
|
"Agents": pool.List(),
|
||||||
|
|||||||
Reference in New Issue
Block a user