diff --git a/core/state/memory.go b/core/state/memory.go deleted file mode 100644 index c54d0e3..0000000 --- a/core/state/memory.go +++ /dev/null @@ -1,212 +0,0 @@ -package state - -import ( - "encoding/json" - "fmt" - "io" - - "github.com/mudler/LocalAgent/pkg/xlog" - - "net/http" - "os" - "strings" - "sync" - - . "github.com/mudler/LocalAgent/core/agent" - "jaytaylor.com/html2text" - - sitemap "github.com/oxffaa/gopher-parse-sitemap" -) - -type InMemoryDatabase struct { - RAGDB - sync.Mutex - Database []string - path string -} - -func loadDB(path string) ([]string, error) { - data, err := os.ReadFile(path) - if err != nil { - return nil, err - } - - poolData := []string{} - err = json.Unmarshal(data, &poolData) - return poolData, err -} - -func NewInMemoryDB(poolfile string, store RAGDB) (*InMemoryDatabase, error) { - // if file exists, try to load an existing pool. - // if file does not exist, create a new pool. - - if _, err := os.Stat(poolfile); err != nil { - // file does not exist, return a new pool - return &InMemoryDatabase{ - Database: []string{}, - path: poolfile, - RAGDB: store, - }, nil - } - - poolData, err := loadDB(poolfile) - if err != nil { - return nil, err - } - db := &InMemoryDatabase{ - RAGDB: store, - Database: poolData, - path: poolfile, - } - - if err := db.populateRAGDB(); err != nil { - return nil, fmt.Errorf("error populating RAGDB: %w", err) - } - - return db, nil -} - -func (db *InMemoryDatabase) Data() []string { - db.Lock() - defer db.Unlock() - return db.Database -} - -func (db *InMemoryDatabase) populateRAGDB() error { - for _, d := range db.Database { - if d == "" { - // skip empty chunks - continue - } - err := db.RAGDB.Store(d) - if err != nil { - return fmt.Errorf("error storing in the KB: %w", err) - } - } - - return nil -} - -func (db *InMemoryDatabase) Reset() error { - db.Lock() - db.Database = []string{} - db.Unlock() - if err := db.RAGDB.Reset(); err != nil { - return err - } - return db.SaveDB() -} - -func (db *InMemoryDatabase) save() error { - data, err := json.Marshal(db.Database) - if err != nil { - return err - } - - return os.WriteFile(db.path, data, 0644) -} - -func (db *InMemoryDatabase) Store(entry string) error { - db.Lock() - defer db.Unlock() - db.Database = append(db.Database, entry) - if err := db.RAGDB.Store(entry); err != nil { - return err - } - return db.save() -} - -func (db *InMemoryDatabase) SaveDB() error { - db.Lock() - defer db.Unlock() - return db.save() -} - -func getWebPage(url string) (string, error) { - resp, err := http.Get(url) - if err != nil { - return "", err - } - defer resp.Body.Close() - - body, err := io.ReadAll(resp.Body) - if err != nil { - return "", err - } - return html2text.FromString(string(body), html2text.Options{PrettyTables: true}) -} - -func getWebSitemap(url string) (res []string, err error) { - err = sitemap.ParseFromSite(url, func(e sitemap.Entry) error { - xlog.Info("Sitemap page: " + e.GetLocation()) - content, err := getWebPage(e.GetLocation()) - if err == nil { - res = append(res, content) - } - return nil - }) - return -} - -func WebsiteToKB(website string, chunkSize int, db RAGDB) { - content, err := getWebSitemap(website) - if err != nil { - xlog.Info("Error walking sitemap for website", err) - } - xlog.Info("Found pages: ", len(content)) - xlog.Info("ChunkSize: ", chunkSize) - - StringsToKB(db, chunkSize, content...) -} - -func StringsToKB(db RAGDB, chunkSize int, content ...string) { - for _, c := range content { - chunks := splitParagraphIntoChunks(c, chunkSize) - xlog.Info("chunks: ", len(chunks)) - for _, chunk := range chunks { - xlog.Info("Chunk size: ", len(chunk)) - db.Store(chunk) - } - } -} - -// splitParagraphIntoChunks takes a paragraph and a maxChunkSize as input, -// and returns a slice of strings where each string is a chunk of the paragraph -// that is at most maxChunkSize long, ensuring that words are not split. -func splitParagraphIntoChunks(paragraph string, maxChunkSize int) []string { - if len(paragraph) <= maxChunkSize { - return []string{paragraph} - } - - var chunks []string - var currentChunk strings.Builder - - words := strings.Fields(paragraph) // Splits the paragraph into words. - - for _, word := range words { - // If adding the next word would exceed maxChunkSize (considering a space if not the first word in a chunk), - // add the currentChunk to chunks, and reset currentChunk. - if currentChunk.Len() > 0 && currentChunk.Len()+len(word)+1 > maxChunkSize { // +1 for the space if not the first word - chunks = append(chunks, currentChunk.String()) - currentChunk.Reset() - } else if currentChunk.Len() == 0 && len(word) > maxChunkSize { // Word itself exceeds maxChunkSize, split the word - chunks = append(chunks, word) - continue - } - - // Add a space before the word if it's not the beginning of a new chunk. - if currentChunk.Len() > 0 { - currentChunk.WriteString(" ") - } - - // Add the word to the current chunk. - currentChunk.WriteString(word) - } - - // After the loop, add any remaining content in currentChunk to chunks. - if currentChunk.Len() > 0 { - chunks = append(chunks, currentChunk.String()) - } - - return chunks -} diff --git a/core/state/pool.go b/core/state/pool.go index 92e435c..ac15ae8 100644 --- a/core/state/pool.go +++ b/core/state/pool.go @@ -13,6 +13,7 @@ import ( "github.com/mudler/LocalAgent/core/agent" . "github.com/mudler/LocalAgent/core/agent" "github.com/mudler/LocalAgent/core/sse" + "github.com/mudler/LocalAgent/pkg/localrag" "github.com/mudler/LocalAgent/pkg/utils" "github.com/mudler/LocalAgent/pkg/xlog" @@ -20,18 +21,16 @@ import ( type AgentPool struct { sync.Mutex - file string - pooldir string - pool AgentPoolData - agents map[string]*Agent - managers map[string]sse.Manager - agentStatus map[string]*Status - agentMemory map[string]*InMemoryDatabase - apiURL, model string - ragDB RAGDB - availableActions func(*AgentConfig) func(ctx context.Context) []Action - connectors func(*AgentConfig) []Connector - timeout string + file string + pooldir string + pool AgentPoolData + agents map[string]*Agent + managers map[string]sse.Manager + agentStatus map[string]*Status + apiURL, model, localRAGAPI, apiKey string + availableActions func(*AgentConfig) func(ctx context.Context) []Action + connectors func(*AgentConfig) []Connector + timeout string } type Status struct { @@ -65,8 +64,8 @@ func loadPoolFromFile(path string) (*AgentPoolData, error) { } func NewAgentPool( - model, apiURL, directory string, - RagDB RAGDB, + model, apiURL, apiKey, directory string, + LocalRAGAPI string, availableActions func(*AgentConfig) func(ctx context.Context) []agent.Action, connectors func(*AgentConfig) []Connector, timeout string, @@ -83,12 +82,12 @@ func NewAgentPool( pooldir: directory, apiURL: apiURL, model: model, - ragDB: RagDB, + localRAGAPI: LocalRAGAPI, + apiKey: apiKey, agents: make(map[string]*Agent), pool: make(map[string]AgentConfig), agentStatus: make(map[string]*Status), managers: make(map[string]sse.Manager), - agentMemory: make(map[string]*InMemoryDatabase), connectors: connectors, availableActions: availableActions, timeout: timeout, @@ -103,14 +102,14 @@ func NewAgentPool( file: poolfile, apiURL: apiURL, pooldir: directory, - ragDB: RagDB, model: model, + apiKey: apiKey, agents: make(map[string]*Agent), managers: make(map[string]sse.Manager), agentStatus: map[string]*Status{}, - agentMemory: map[string]*InMemoryDatabase{}, pool: *poolData, connectors: connectors, + localRAGAPI: LocalRAGAPI, availableActions: availableActions, timeout: timeout, }, nil @@ -164,14 +163,7 @@ func (a *AgentPool) startAgentWithConfig(name string, config *AgentConfig) error actions := a.availableActions(config)(ctx) - stateFile, characterFile, knowledgeBase := a.stateFiles(name) - - agentDB, err := NewInMemoryDB(knowledgeBase, a.ragDB) - if err != nil { - return err - } - - a.agentMemory[name] = agentDB + stateFile, characterFile := a.stateFiles(name) actionsLog := []string{} for _, action := range actions { @@ -205,8 +197,9 @@ func (a *AgentPool) startAgentWithConfig(name string, config *AgentConfig) error ), WithStateFile(stateFile), WithCharacterFile(characterFile), + WithLLMAPIKey(a.apiKey), WithTimeout(a.timeout), - WithRAGDB(agentDB), + WithRAGDB(localrag.NewWrappedClient(a.localRAGAPI, name)), WithAgentReasoningCallback(func(state ActionCurrentState) bool { xlog.Info( "Agent is thinking", @@ -382,22 +375,20 @@ func (a *AgentPool) Start(name string) error { return fmt.Errorf("agent %s not found", name) } -func (a *AgentPool) stateFiles(name string) (string, string, string) { +func (a *AgentPool) stateFiles(name string) (string, string) { stateFile := filepath.Join(a.pooldir, fmt.Sprintf("%s.state.json", name)) characterFile := filepath.Join(a.pooldir, fmt.Sprintf("%s.character.json", name)) - knowledgeBaseFile := filepath.Join(a.pooldir, fmt.Sprintf("%s.knowledgebase.json", name)) - return stateFile, characterFile, knowledgeBaseFile + return stateFile, characterFile } func (a *AgentPool) Remove(name string) error { // Cleanup character and state - stateFile, characterFile, knowledgeBaseFile := a.stateFiles(name) + stateFile, characterFile := a.stateFiles(name) os.Remove(stateFile) os.Remove(characterFile) - os.Remove(knowledgeBaseFile) a.Stop(name) delete(a.agents, name) @@ -420,10 +411,6 @@ func (a *AgentPool) GetAgent(name string) *Agent { return a.agents[name] } -func (a *AgentPool) GetAgentMemory(name string) *InMemoryDatabase { - return a.agentMemory[name] -} - func (a *AgentPool) GetConfig(name string) *AgentConfig { agent, exists := a.pool[name] if !exists { diff --git a/main.go b/main.go index e5f1e60..1bbd82e 100644 --- a/main.go +++ b/main.go @@ -5,20 +5,16 @@ import ( "os" "path/filepath" - "github.com/mudler/LocalAgent/core/agent" "github.com/mudler/LocalAgent/core/state" - "github.com/mudler/LocalAgent/pkg/llm" - rag "github.com/mudler/LocalAgent/pkg/vectorstore" "github.com/mudler/LocalAgent/webui" ) var testModel = os.Getenv("TEST_MODEL") var apiURL = os.Getenv("API_URL") var apiKey = os.Getenv("API_KEY") -var vectorStore = os.Getenv("VECTOR_STORE") var timeout = os.Getenv("TIMEOUT") -var embeddingModel = os.Getenv("EMBEDDING_MODEL") var stateDir = os.Getenv("STATE_DIR") +var localRAG = os.Getenv("LOCAL_RAG") func init() { if testModel == "" { @@ -40,33 +36,12 @@ func init() { } } -func ragDB() (ragDB agent.RAGDB) { - lai := llm.NewClient(apiKey, apiURL+"/v1", timeout) - - switch vectorStore { - case "localai": - laiStore := rag.NewStoreClient(apiURL, apiKey) - ragDB = rag.NewLocalAIRAGDB(laiStore, lai) - default: - var err error - ragDB, err = rag.NewChromemDB("local-agent-framework", stateDir, lai, embeddingModel) - if err != nil { - panic(err) - } - } - - return -} - func main() { // make sure state dir exists os.MkdirAll(stateDir, 0755) - // Initialize rag DB connection - ragDB := ragDB() - // Create the agent pool - pool, err := state.NewAgentPool(testModel, apiURL, stateDir, ragDB, webui.Actions, webui.Connectors, timeout) + pool, err := state.NewAgentPool(testModel, apiURL, apiKey, stateDir, localRAG, webui.Actions, webui.Connectors, timeout) if err != nil { panic(err) } diff --git a/pkg/localrag/client.go b/pkg/localrag/client.go index 4a0165a..63c82c5 100644 --- a/pkg/localrag/client.go +++ b/pkg/localrag/client.go @@ -1,8 +1,10 @@ // TODO: this is a duplicate of LocalRAG/pkg/client -package client +package localrag import ( "bytes" + "crypto/md5" + "encoding/hex" "encoding/json" "errors" "fmt" @@ -10,8 +12,73 @@ import ( "mime/multipart" "net/http" "os" + "path/filepath" + + "github.com/mudler/LocalAgent/core/agent" ) +var _ agent.RAGDB = &WrappedClient{} + +type WrappedClient struct { + *Client + collection string +} + +func NewWrappedClient(baseURL, collection string) *WrappedClient { + return &WrappedClient{ + Client: NewClient(baseURL), + collection: collection, + } +} + +func (c *WrappedClient) Count() int { + entries, err := c.ListEntries(c.collection) + if err != nil { + return 0 + } + return len(entries) +} + +func (c *WrappedClient) Reset() error { + return c.Client.Reset(c.collection) +} + +func (c *WrappedClient) Search(s string, similarity int) ([]string, error) { + results, err := c.Client.Search(c.collection, s, similarity) + if err != nil { + return nil, err + } + var res []string + for _, r := range results { + res = append(res, fmt.Sprintf("%s (%+v)", r.Content, r.Metadata)) + } + return res, nil +} + +func (c *WrappedClient) Store(s string) error { + // the Client API of LocalRAG takes only files at the moment. + // So we take the string that we want to store, write it to a file, and then store the file. + + hash := md5.Sum([]byte(s)) + fileName := hex.EncodeToString(hash[:]) + ".txt" + + tempdir, err := os.MkdirTemp("", "localrag") + if err != nil { + return err + } + + defer os.RemoveAll(tempdir) + + f := filepath.Join(tempdir, fileName) + err = os.WriteFile(f, []byte(s), 0644) + if err != nil { + return err + } + + defer os.Remove(f) + return c.Client.Store(c.collection, f) +} + // Result represents a single result from a query. type Result struct { ID string diff --git a/webui/app.go b/webui/app.go index 69cb8e1..04f86f4 100644 --- a/webui/app.go +++ b/webui/app.go @@ -49,146 +49,6 @@ func NewApp(opts ...Option) *App { return a } -func (a *App) KnowledgeBaseReset(pool *state.AgentPool) func(c *fiber.Ctx) error { - return func(c *fiber.Ctx) error { - db := pool.GetAgentMemory(c.Params("name")) - db.Reset() - return c.Redirect("/knowledgebase/" + c.Params("name")) - } -} - -func (a *App) KnowledgeBaseExport(pool *state.AgentPool) func(c *fiber.Ctx) error { - return func(c *fiber.Ctx) error { - db := pool.GetAgentMemory(c.Params("name")) - knowledgeBase := db.Data() - - c.Set("Content-Disposition", fmt.Sprintf("attachment; filename=%s.knowledgebase.json", c.Params("name"))) - return c.JSON(knowledgeBase) - } -} - -func (a *App) KnowledgeBaseImport(pool *state.AgentPool) func(c *fiber.Ctx) error { - return func(c *fiber.Ctx) error { - file, err := c.FormFile("file") - if err != nil { - // Handle error - return err - } - - os.MkdirAll("./uploads", os.ModePerm) - - destination := fmt.Sprintf("./uploads/%s", file.Filename) - if err := c.SaveFile(file, destination); err != nil { - // Handle error - return err - } - - data, err := os.ReadFile(destination) - if err != nil { - return err - } - - knowledge := []string{} - if err := json.Unmarshal(data, &knowledge); err != nil { - return err - } - - if len(knowledge) > 0 { - xlog.Info("Importing agent KB") - db := pool.GetAgentMemory(c.Params("name")) - db.Reset() - - for _, k := range knowledge { - db.Store(k) - } - - } else { - return fmt.Errorf("Empty knowledge base") - } - - return c.Redirect("/agents") - } -} - -func (a *App) KnowledgeBaseFile(pool *state.AgentPool) func(c *fiber.Ctx) error { - return func(c *fiber.Ctx) error { - agent := pool.GetAgent(c.Params("name")) - db := agent.Memory() - - // https://golang.withcodeexample.com/blog/file-upload-handling-golang-fiber-guide/ - file, err := c.FormFile("file") - if err != nil { - // Handle error - return err - } - - payload := struct { - ChunkSize int `form:"chunk_size"` - }{} - - if err := c.BodyParser(&payload); err != nil { - return err - } - - os.MkdirAll("./uploads", os.ModePerm) - - destination := fmt.Sprintf("./uploads/%s", file.Filename) - if err := c.SaveFile(file, destination); err != nil { - // Handle error - return err - } - - xlog.Info("File uploaded to: " + destination) - fmt.Printf("Payload: %+v\n", payload) - - content, err := readPdf(destination) // Read local pdf file - if err != nil { - panic(err) - } - - xlog.Info("Content is", content) - chunkSize := a.config.DefaultChunkSize - if payload.ChunkSize > 0 { - chunkSize = payload.ChunkSize - } - - go state.StringsToKB(db, chunkSize, content) - - _, err = c.WriteString(chatDiv("File uploaded", "gray")) - - return err - } -} - -func (a *App) KnowledgeBase(pool *state.AgentPool) func(c *fiber.Ctx) error { - return func(c *fiber.Ctx) error { - agent := pool.GetAgent(c.Params("name")) - db := agent.Memory() - - payload := struct { - URL string `form:"url"` - ChunkSize int `form:"chunk_size"` - }{} - - if err := c.BodyParser(&payload); err != nil { - return err - } - - website := payload.URL - if website == "" { - return fmt.Errorf("please enter a URL") - } - chunkSize := a.config.DefaultChunkSize - if payload.ChunkSize > 0 { - chunkSize = payload.ChunkSize - } - - go state.WebsiteToKB(website, chunkSize, db) - - return c.Redirect("/knowledgebase/" + c.Params("name")) - } -} - func (a *App) Notify(pool *state.AgentPool) func(c *fiber.Ctx) error { return func(c *fiber.Ctx) error { payload := struct { diff --git a/webui/routes.go b/webui/routes.go index c835a0f..803d071 100644 --- a/webui/routes.go +++ b/webui/routes.go @@ -50,17 +50,6 @@ func (app *App) registerRoutes(pool *state.AgentPool, webapp *fiber.App) { }) }) - webapp.Get("/knowledgebase/:name", func(c *fiber.Ctx) error { - db := pool.GetAgentMemory(c.Params("name")) - return c.Render( - "views/knowledgebase", - fiber.Map{ - "KnowledgebaseItemsCount": db.Count(), - "Name": c.Params("name"), - }, - ) - }) - // Define a route for the GET method on the root path '/' webapp.Get("/sse/:name", func(c *fiber.Ctx) error { m := pool.GetManager(c.Params("name")) @@ -92,12 +81,6 @@ func (app *App) registerRoutes(pool *state.AgentPool, webapp *fiber.App) { webapp.Put("/pause/:name", app.Pause(pool)) webapp.Put("/start/:name", app.Start(pool)) - webapp.Post("/knowledgebase/:name", app.KnowledgeBase(pool)) - webapp.Post("/knowledgebase/:name/upload", app.KnowledgeBaseFile(pool)) - webapp.Delete("/knowledgebase/:name/reset", app.KnowledgeBaseReset(pool)) - webapp.Post("/knowledgebase/:name/import", app.KnowledgeBaseImport(pool)) - webapp.Get("/knowledgebase/:name/export", app.KnowledgeBaseExport(pool)) - webapp.Get("/talk/:name", func(c *fiber.Ctx) error { return c.Render("views/chat", fiber.Map{ // "Character": agent.Character, diff --git a/webui/views/knowledgebase.html b/webui/views/knowledgebase.html deleted file mode 100644 index c2cea87..0000000 --- a/webui/views/knowledgebase.html +++ /dev/null @@ -1,64 +0,0 @@ - - - - Knowledgebase for {{.Name}} - {{template "views/partials/header"}} - - - {{template "views/partials/menu"}} -
-

Knowledgebase (items: {{.KnowledgebaseItemsCount}})

-
-
- -
-
-

Add sites to KB

- - - - - -
-
- - -
-
-

Upload File

- - - - - - -
-
- - -
- -
- -
-

Export

- Export -
- -
-
-

Import

- - - - -
-
-
- - -