Integrate with LocalRAG, drop RAG functionalities

This commit is contained in:
Ettore Di Giacinto
2025-02-27 23:51:02 +01:00
parent bc431ea6ff
commit 371ea63f5a
7 changed files with 93 additions and 497 deletions

View File

@@ -1,212 +0,0 @@
package state
import (
"encoding/json"
"fmt"
"io"
"github.com/mudler/LocalAgent/pkg/xlog"
"net/http"
"os"
"strings"
"sync"
. "github.com/mudler/LocalAgent/core/agent"
"jaytaylor.com/html2text"
sitemap "github.com/oxffaa/gopher-parse-sitemap"
)
type InMemoryDatabase struct {
RAGDB
sync.Mutex
Database []string
path string
}
func loadDB(path string) ([]string, error) {
data, err := os.ReadFile(path)
if err != nil {
return nil, err
}
poolData := []string{}
err = json.Unmarshal(data, &poolData)
return poolData, err
}
func NewInMemoryDB(poolfile string, store RAGDB) (*InMemoryDatabase, error) {
// if file exists, try to load an existing pool.
// if file does not exist, create a new pool.
if _, err := os.Stat(poolfile); err != nil {
// file does not exist, return a new pool
return &InMemoryDatabase{
Database: []string{},
path: poolfile,
RAGDB: store,
}, nil
}
poolData, err := loadDB(poolfile)
if err != nil {
return nil, err
}
db := &InMemoryDatabase{
RAGDB: store,
Database: poolData,
path: poolfile,
}
if err := db.populateRAGDB(); err != nil {
return nil, fmt.Errorf("error populating RAGDB: %w", err)
}
return db, nil
}
func (db *InMemoryDatabase) Data() []string {
db.Lock()
defer db.Unlock()
return db.Database
}
func (db *InMemoryDatabase) populateRAGDB() error {
for _, d := range db.Database {
if d == "" {
// skip empty chunks
continue
}
err := db.RAGDB.Store(d)
if err != nil {
return fmt.Errorf("error storing in the KB: %w", err)
}
}
return nil
}
func (db *InMemoryDatabase) Reset() error {
db.Lock()
db.Database = []string{}
db.Unlock()
if err := db.RAGDB.Reset(); err != nil {
return err
}
return db.SaveDB()
}
func (db *InMemoryDatabase) save() error {
data, err := json.Marshal(db.Database)
if err != nil {
return err
}
return os.WriteFile(db.path, data, 0644)
}
func (db *InMemoryDatabase) Store(entry string) error {
db.Lock()
defer db.Unlock()
db.Database = append(db.Database, entry)
if err := db.RAGDB.Store(entry); err != nil {
return err
}
return db.save()
}
func (db *InMemoryDatabase) SaveDB() error {
db.Lock()
defer db.Unlock()
return db.save()
}
func getWebPage(url string) (string, error) {
resp, err := http.Get(url)
if err != nil {
return "", err
}
defer resp.Body.Close()
body, err := io.ReadAll(resp.Body)
if err != nil {
return "", err
}
return html2text.FromString(string(body), html2text.Options{PrettyTables: true})
}
func getWebSitemap(url string) (res []string, err error) {
err = sitemap.ParseFromSite(url, func(e sitemap.Entry) error {
xlog.Info("Sitemap page: " + e.GetLocation())
content, err := getWebPage(e.GetLocation())
if err == nil {
res = append(res, content)
}
return nil
})
return
}
func WebsiteToKB(website string, chunkSize int, db RAGDB) {
content, err := getWebSitemap(website)
if err != nil {
xlog.Info("Error walking sitemap for website", err)
}
xlog.Info("Found pages: ", len(content))
xlog.Info("ChunkSize: ", chunkSize)
StringsToKB(db, chunkSize, content...)
}
func StringsToKB(db RAGDB, chunkSize int, content ...string) {
for _, c := range content {
chunks := splitParagraphIntoChunks(c, chunkSize)
xlog.Info("chunks: ", len(chunks))
for _, chunk := range chunks {
xlog.Info("Chunk size: ", len(chunk))
db.Store(chunk)
}
}
}
// splitParagraphIntoChunks takes a paragraph and a maxChunkSize as input,
// and returns a slice of strings where each string is a chunk of the paragraph
// that is at most maxChunkSize long, ensuring that words are not split.
func splitParagraphIntoChunks(paragraph string, maxChunkSize int) []string {
if len(paragraph) <= maxChunkSize {
return []string{paragraph}
}
var chunks []string
var currentChunk strings.Builder
words := strings.Fields(paragraph) // Splits the paragraph into words.
for _, word := range words {
// If adding the next word would exceed maxChunkSize (considering a space if not the first word in a chunk),
// add the currentChunk to chunks, and reset currentChunk.
if currentChunk.Len() > 0 && currentChunk.Len()+len(word)+1 > maxChunkSize { // +1 for the space if not the first word
chunks = append(chunks, currentChunk.String())
currentChunk.Reset()
} else if currentChunk.Len() == 0 && len(word) > maxChunkSize { // Word itself exceeds maxChunkSize, split the word
chunks = append(chunks, word)
continue
}
// Add a space before the word if it's not the beginning of a new chunk.
if currentChunk.Len() > 0 {
currentChunk.WriteString(" ")
}
// Add the word to the current chunk.
currentChunk.WriteString(word)
}
// After the loop, add any remaining content in currentChunk to chunks.
if currentChunk.Len() > 0 {
chunks = append(chunks, currentChunk.String())
}
return chunks
}

View File

@@ -13,6 +13,7 @@ import (
"github.com/mudler/LocalAgent/core/agent"
. "github.com/mudler/LocalAgent/core/agent"
"github.com/mudler/LocalAgent/core/sse"
"github.com/mudler/LocalAgent/pkg/localrag"
"github.com/mudler/LocalAgent/pkg/utils"
"github.com/mudler/LocalAgent/pkg/xlog"
@@ -20,18 +21,16 @@ import (
type AgentPool struct {
sync.Mutex
file string
pooldir string
pool AgentPoolData
agents map[string]*Agent
managers map[string]sse.Manager
agentStatus map[string]*Status
agentMemory map[string]*InMemoryDatabase
apiURL, model string
ragDB RAGDB
availableActions func(*AgentConfig) func(ctx context.Context) []Action
connectors func(*AgentConfig) []Connector
timeout string
file string
pooldir string
pool AgentPoolData
agents map[string]*Agent
managers map[string]sse.Manager
agentStatus map[string]*Status
apiURL, model, localRAGAPI, apiKey string
availableActions func(*AgentConfig) func(ctx context.Context) []Action
connectors func(*AgentConfig) []Connector
timeout string
}
type Status struct {
@@ -65,8 +64,8 @@ func loadPoolFromFile(path string) (*AgentPoolData, error) {
}
func NewAgentPool(
model, apiURL, directory string,
RagDB RAGDB,
model, apiURL, apiKey, directory string,
LocalRAGAPI string,
availableActions func(*AgentConfig) func(ctx context.Context) []agent.Action,
connectors func(*AgentConfig) []Connector,
timeout string,
@@ -83,12 +82,12 @@ func NewAgentPool(
pooldir: directory,
apiURL: apiURL,
model: model,
ragDB: RagDB,
localRAGAPI: LocalRAGAPI,
apiKey: apiKey,
agents: make(map[string]*Agent),
pool: make(map[string]AgentConfig),
agentStatus: make(map[string]*Status),
managers: make(map[string]sse.Manager),
agentMemory: make(map[string]*InMemoryDatabase),
connectors: connectors,
availableActions: availableActions,
timeout: timeout,
@@ -103,14 +102,14 @@ func NewAgentPool(
file: poolfile,
apiURL: apiURL,
pooldir: directory,
ragDB: RagDB,
model: model,
apiKey: apiKey,
agents: make(map[string]*Agent),
managers: make(map[string]sse.Manager),
agentStatus: map[string]*Status{},
agentMemory: map[string]*InMemoryDatabase{},
pool: *poolData,
connectors: connectors,
localRAGAPI: LocalRAGAPI,
availableActions: availableActions,
timeout: timeout,
}, nil
@@ -164,14 +163,7 @@ func (a *AgentPool) startAgentWithConfig(name string, config *AgentConfig) error
actions := a.availableActions(config)(ctx)
stateFile, characterFile, knowledgeBase := a.stateFiles(name)
agentDB, err := NewInMemoryDB(knowledgeBase, a.ragDB)
if err != nil {
return err
}
a.agentMemory[name] = agentDB
stateFile, characterFile := a.stateFiles(name)
actionsLog := []string{}
for _, action := range actions {
@@ -205,8 +197,9 @@ func (a *AgentPool) startAgentWithConfig(name string, config *AgentConfig) error
),
WithStateFile(stateFile),
WithCharacterFile(characterFile),
WithLLMAPIKey(a.apiKey),
WithTimeout(a.timeout),
WithRAGDB(agentDB),
WithRAGDB(localrag.NewWrappedClient(a.localRAGAPI, name)),
WithAgentReasoningCallback(func(state ActionCurrentState) bool {
xlog.Info(
"Agent is thinking",
@@ -382,22 +375,20 @@ func (a *AgentPool) Start(name string) error {
return fmt.Errorf("agent %s not found", name)
}
func (a *AgentPool) stateFiles(name string) (string, string, string) {
func (a *AgentPool) stateFiles(name string) (string, string) {
stateFile := filepath.Join(a.pooldir, fmt.Sprintf("%s.state.json", name))
characterFile := filepath.Join(a.pooldir, fmt.Sprintf("%s.character.json", name))
knowledgeBaseFile := filepath.Join(a.pooldir, fmt.Sprintf("%s.knowledgebase.json", name))
return stateFile, characterFile, knowledgeBaseFile
return stateFile, characterFile
}
func (a *AgentPool) Remove(name string) error {
// Cleanup character and state
stateFile, characterFile, knowledgeBaseFile := a.stateFiles(name)
stateFile, characterFile := a.stateFiles(name)
os.Remove(stateFile)
os.Remove(characterFile)
os.Remove(knowledgeBaseFile)
a.Stop(name)
delete(a.agents, name)
@@ -420,10 +411,6 @@ func (a *AgentPool) GetAgent(name string) *Agent {
return a.agents[name]
}
func (a *AgentPool) GetAgentMemory(name string) *InMemoryDatabase {
return a.agentMemory[name]
}
func (a *AgentPool) GetConfig(name string) *AgentConfig {
agent, exists := a.pool[name]
if !exists {