import openai

#from langchain.embeddings import HuggingFaceEmbeddings
from langchain.embeddings import LocalAIEmbeddings

from langchain.document_loaders import (
    SitemapLoader,
    # GitHubIssuesLoader,
    # GitLoader,
)

import uuid
import sys
from config import config

from queue import Queue
import asyncio
import threading
from localagi import LocalAGI
from loguru import logger
from ascii_magic import AsciiArt
from duckduckgo_search import DDGS
from typing import Dict, List
import os
from langchain.text_splitter import RecursiveCharacterTextSplitter
import discord
import urllib.request
from datetime import datetime
import json
from io import StringIO

FILE_NAME_FORMAT = '%Y_%m_%d_%H_%M_%S'

EMBEDDINGS_MODEL = config["agent"]["embeddings_model"]
EMBEDDINGS_API_BASE = config["agent"]["embeddings_api_base"]
PERSISTENT_DIR = config["agent"]["persistent_dir"]
MILVUS_HOST = config["agent"]["milvus_host"] if "milvus_host" in config["agent"] else ""
MILVUS_PORT = config["agent"]["milvus_port"] if "milvus_port" in config["agent"] else 0
MEMORY_COLLECTION = config["agent"]["memory_collection"]
DB_DIR = config["agent"]["db_dir"]
MEMORY_CHUNK_SIZE = int(config["agent"]["memory_chunk_size"])
MEMORY_CHUNK_OVERLAP = int(config["agent"]["memory_chunk_overlap"])
MEMORY_RESULTS = int(config["agent"]["memory_results"])
MEMORY_SEARCH_TYPE = config["agent"]["memory_search_type"]
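
# The config entries consumed above are expected to look roughly like this
# (illustrative sketch only; the real keys and values are defined by config.py
# and the deployment's config file):
#
#   agent:
#     embeddings_model: "text-embedding-ada-002"
#     embeddings_api_base: "http://localhost:8080/v1"
#     persistent_dir: "/data/"    # trailing slash matters: some paths below
#                                 # are built by string concatenation
#     memory_collection: "memories"
#     db_dir: "/data/db"
#     memory_chunk_size: 600
#     memory_chunk_overlap: 110
#     memory_results: 3
#     memory_search_type: "mmr"
#     # milvus_host / milvus_port are optional; when unset, Chroma is used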

if os.environ.get("PYSQL_HACK", "false") != "false":
    # swap the stdlib sqlite3 module with the pysqlite3 package for chroma
    __import__('pysqlite3')
    sys.modules['sqlite3'] = sys.modules.pop('pysqlite3')

# Pick the vector store backend: local Chroma by default, Milvus when a host
# is configured.
if MILVUS_HOST == "":
    from langchain.vectorstores import Chroma
else:
    from langchain.vectorstores import Milvus

embeddings = LocalAIEmbeddings(model=EMBEDDINGS_MODEL, openai_api_base=EMBEDDINGS_API_BASE)

# Assigned elsewhere in the bot before any agent action runs: the Discord
# client's asyncio event loop and the channel currently being replied to.
loop = None
channel = None

def call(thing):
    # Schedule a coroutine on the bot's event loop from a worker thread
    # and block until it completes.
    return asyncio.run_coroutine_threadsafe(thing, loop).result()
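
# Example (illustrative): agent actions run outside the event loop, so Discord
# coroutines are funneled through call(), e.g.:
#   call(channel.send(content="hello"))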


def ingest(a, agent_actions={}, localagi=None):
    q = json.loads(a)
    chunk_size = MEMORY_CHUNK_SIZE
    chunk_overlap = MEMORY_CHUNK_OVERLAP
    logger.info(">>> ingesting: ")
    logger.info(q)
    documents = []
    # Load every page referenced by the sitemap, then split it into chunks
    sitemap_loader = SitemapLoader(web_path=q["url"])
    text_splitter = RecursiveCharacterTextSplitter(chunk_size=chunk_size, chunk_overlap=chunk_overlap)
    documents.extend(sitemap_loader.load())
    texts = text_splitter.split_documents(documents)
    if MILVUS_HOST == "":
        db = Chroma.from_documents(texts, embeddings, collection_name=MEMORY_COLLECTION, persist_directory=DB_DIR)
        db.persist()
        db = None
    else:
        Milvus.from_documents(texts, embeddings, collection_name=MEMORY_COLLECTION, connection_args={"host": MILVUS_HOST, "port": MILVUS_PORT})
    return "Documents ingested"
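
# Example (illustrative): action arguments arrive as a JSON string, so a
# direct call looks like:
#   ingest('{"url": "https://example.com/sitemap.xml"}')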


def create_image(a, agent_actions={}, localagi=None):
    q = json.loads(a)
    logger.info(">>> creating image: ")
    logger.info(q["caption"])
    size = f"{q['width']}x{q['height']}"
    response = openai.Image.create(prompt=q["caption"], n=1, size=size)
    image_url = response["data"][0]["url"]
    image_name = download_image(image_url)
    image_path = f"{PERSISTENT_DIR}{image_name}"

    # Send the downloaded image back to the Discord channel as an attachment
    file = discord.File(image_path, filename=image_name)
    embed = discord.Embed(title="Generated image")
    embed.set_image(url=f"attachment://{image_name}")

    call(channel.send(file=file, content="Here is what I have generated", embed=embed))

    return f"Image created: {image_url}"
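
# Example (illustrative):
#   create_image('{"caption": "a lighthouse at dusk", "width": 256, "height": 256}')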


def download_image(url: str):
    # Fetch the image into the persistent directory, named by timestamp
    file_name = f"{datetime.now().strftime(FILE_NAME_FORMAT)}.jpg"
    full_path = f"{PERSISTENT_DIR}{file_name}"
    urllib.request.urlretrieve(url, full_path)
    return file_name


### Agent capabilities
### These functions are called by the agent to perform actions
###


def save(memory, agent_actions={}, localagi=None):
    q = json.loads(memory)
    logger.info(">>> saving to memories: ")
    logger.info(q["content"])
    if MILVUS_HOST == "":
        chroma_client = Chroma(collection_name=MEMORY_COLLECTION, embedding_function=embeddings, persist_directory=DB_DIR)
    else:
        chroma_client = Milvus(collection_name=MEMORY_COLLECTION, embedding_function=embeddings, connection_args={"host": MILVUS_HOST, "port": MILVUS_PORT})
    chroma_client.add_texts([q["content"]], [{"id": str(uuid.uuid4())}])
    if MILVUS_HOST == "":
        # Only Chroma needs an explicit persist; Milvus writes server-side
        chroma_client.persist()
    chroma_client = None
    return "The object was saved permanently to memory."
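
# Example (illustrative):
#   save('{"content": "The user prefers concise answers"}')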


def search_memory(query, agent_actions={}, localagi=None):
    q = json.loads(query)
    if MILVUS_HOST == "":
        chroma_client = Chroma(collection_name=MEMORY_COLLECTION, embedding_function=embeddings, persist_directory=DB_DIR)
    else:
        chroma_client = Milvus(collection_name=MEMORY_COLLECTION, embedding_function=embeddings, connection_args={"host": MILVUS_HOST, "port": MILVUS_PORT})
    #docs = chroma_client.search(q["keywords"], "mmr")
    retriever = chroma_client.as_retriever(search_type=MEMORY_SEARCH_TYPE, search_kwargs={"k": MEMORY_RESULTS})

    docs = retriever.get_relevant_documents(q["keywords"])
    text_res = "Memories found in the database:\n"

    # Collect unique sources
    sources = set()
    for document in docs:
        if "source" in document.metadata:
            sources.add(document.metadata["source"])

    for doc in docs:
        # drop newlines and collapse whitespace in page_content
        content = doc.page_content.replace("\n", " ")
        content = " ".join(content.split())
        text_res += "- " + content + "\n"

    # Append the relevant sources used for the answer
    for source in sources:
        if source.startswith("http"):
            text_res += source + "\n"

    chroma_client = None
    #if args.postprocess:
    #    return post_process(text_res)
    return text_res
    #return localagi.post_process(text_res)
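
# Example (illustrative):
#   search_memory('{"keywords": "sitemap ingestion"}')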


# write file to disk with content
def save_file(arg, agent_actions={}, localagi=None):
    arg = json.loads(arg)
    filename = arg["filename"]
    content = arg["content"]
    # create the persistent dir if it does not exist
    if not os.path.exists(PERSISTENT_DIR):
        os.makedirs(PERSISTENT_DIR)
    # write the file in the directory specified
    file_path = os.path.join(PERSISTENT_DIR, filename)

    # Append if the file already exists, otherwise create it
    mode = 'a' if os.path.exists(file_path) else 'w'

    with open(file_path, mode) as f:
        f.write(content)

    file = discord.File(file_path, filename=filename)
    call(channel.send(file=file, content="Here is what I have generated"))
    return f"File {file_path} saved successfully."
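
# Example (illustrative):
#   save_file('{"filename": "notes.txt", "content": "hello world"}')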


def ddg(query: str, num_results: int, backend: str = "api") -> List[Dict[str, str]]:
    """Run a query through DuckDuckGo and return result metadata.

    Args:
        query: The query to search for.
        num_results: The maximum number of results to return.
        backend: The duckduckgo_search backend to use ("api" or "news").

    Returns:
        A list of dictionaries with the following keys:
            snippet - The description of the result.
            title - The title of the result.
            link - The link to the result.
    """
    ddgs = DDGS()
    try:
        results = ddgs.text(
            query,
            backend=backend,
        )
        if results is None:
            return [{"Result": "No good DuckDuckGo Search Result was found"}]

        def to_metadata(result: Dict) -> Dict[str, str]:
            if backend == "news":
                return {
                    "date": result["date"],
                    "title": result["title"],
                    "snippet": result["body"],
                    "source": result["source"],
                    "link": result["url"],
                }
            return {
                "snippet": result["body"],
                "title": result["title"],
                "link": result["href"],
            }

        formatted_results = []
        for res in results:
            if res is not None:
                formatted_results.append(to_metadata(res))
            if len(formatted_results) == num_results:
                break
    except Exception as e:
        logger.error(e)
        return []
    return formatted_results
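
# Example (illustrative): fetch up to three results with the default backend:
#   ddg("LocalAI", 3)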


## Search on duckduckgo
def search_duckduckgo(a, agent_actions={}, localagi=None):
    a = json.loads(a)
    results = ddg(a["query"], 2)

    text_res = ""
    for doc in results:
        text_res += f"""{doc["link"]}: {doc["title"]} {doc["snippet"]}\n"""

    #if args.postprocess:
    #    return post_process(text_res)
    return text_res
    #l = json.dumps(results)
    #return l
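
# Example (illustrative):
#   search_duckduckgo('{"query": "LocalAI embeddings"}')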


### End Agent capabilities
###

### Agent action definitions
agent_actions = {
    "generate_picture": {
        "function": create_image,
        "plannable": True,
        "description": 'For creating a picture, the assistant replies with "generate_picture" and a detailed caption, enhancing it with as much detail as possible.',
        "signature": {
            "name": "generate_picture",
            "parameters": {
                "type": "object",
                "properties": {
                    "caption": {
                        "type": "string",
                    },
                    "width": {
                        "type": "number",
                    },
                    "height": {
                        "type": "number",
                    },
                },
            }
        },
    },
    "search_internet": {
        "function": search_duckduckgo,
        "plannable": True,
        "description": 'For searching the internet with a query, the assistant replies with the action "search_internet" and the query to search.',
        "signature": {
            "name": "search_internet",
            "description": """For searching the internet.""",
            "parameters": {
                "type": "object",
                "properties": {
                    "query": {
                        "type": "string",
                        "description": "the query to search for"
                    },
                },
            }
        },
    },
    "save_file": {
        "function": save_file,
        "plannable": True,
        "description": 'The assistant replies with the action "save_file", the filename and content to save, for writing a file to disk permanently. This can be used to store the result of complex actions locally.',
        "signature": {
            "name": "save_file",
            "description": """For saving a file to disk with content.""",
            "parameters": {
                "type": "object",
                "properties": {
                    "filename": {
                        "type": "string",
                        "description": "the name of the file to write"
                    },
                    "content": {
                        "type": "string",
                        "description": "the content to write to the file"
                    },
                },
            }
        },
    },
    "ingest": {
        "function": ingest,
        "plannable": True,
        "description": 'The assistant replies with the action "ingest" when there is a URL to a sitemap to ingest memories from.',
        "signature": {
            "name": "ingest",
            "description": """Ingest the pages of a sitemap URL into memory.""",
            "parameters": {
                "type": "object",
                "properties": {
                    "url": {
                        "type": "string",
                        "description": "the URL of the sitemap to ingest"
                    },
                },
                "required": ["url"]
            }
        },
    },
    "save_memory": {
        "function": save,
        "plannable": True,
        "description": 'The assistant replies with the action "save_memory" and the string to remember, to permanently store information it considers relevant.',
        "signature": {
            "name": "save_memory",
            "description": """Save or store information into memory.""",
            "parameters": {
                "type": "object",
                "properties": {
                    "content": {
                        "type": "string",
                        "description": "information to save"
                    },
                },
                "required": ["content"]
            }
        },
    },
    "search_memory": {
        "function": search_memory,
        "plannable": True,
        "description": 'The assistant replies with the action "search_memory" for searching its memories with a query term.',
        "signature": {
            "name": "search_memory",
            "description": """Search in memory""",
            "parameters": {
                "type": "object",
                "properties": {
                    "keywords": {
                        "type": "string",
                        "description": "keywords to search for in memory"
                    },
                },
                "required": ["keywords"]
            }
        },
    },
}
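
# A minimal sketch of how a registry like this can be dispatched (illustrative;
# the actual wiring happens inside LocalAGI and may differ):
#
#   def dispatch(action_name, json_args):
#       action = agent_actions[action_name]
#       return action["function"](json_args, agent_actions=agent_actions)
#
#   dispatch("search_internet", '{"query": "LocalAGI"}')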