"""Agent capabilities and their action definitions.

Each capability is a function taking a JSON-encoded argument string and
returning a text result for the agent.  ``agent_actions`` at the bottom of the
module maps action names to these functions together with the JSON-schema
signature advertised to the model.

``loop`` and ``channel`` are module-level globals initialised to ``None``;
the capabilities that post to Discord use them through :func:`call`, so they
are presumably assigned by the code that imports this module before any
action runs (NOTE(review): confirm against the caller).
"""
import asyncio
import json
import os
import sys
import threading
import urllib.request
import uuid
from datetime import datetime
from io import StringIO
from queue import Queue
from typing import Dict, List

import discord
import openai
from ascii_magic import AsciiArt
from duckduckgo_search import DDGS
from langchain.document_loaders import SitemapLoader
from langchain.embeddings import LocalAIEmbeddings
from langchain.text_splitter import RecursiveCharacterTextSplitter
from localagi import LocalAGI
from loguru import logger

from config import config

FILE_NAME_FORMAT = '%Y_%m_%d_%H_%M_%S'

EMBEDDINGS_MODEL = config["agent"]["embeddings_model"]
EMBEDDINGS_API_BASE = config["agent"]["embeddings_api_base"]
PERSISTENT_DIR = config["agent"]["persistent_dir"]
# An empty MILVUS_HOST selects the local Chroma store instead of Milvus.
MILVUS_HOST = config["agent"]["milvus_host"] if "milvus_host" in config["agent"] else ""
MILVUS_PORT = config["agent"]["milvus_port"] if "milvus_port" in config["agent"] else 0
MEMORY_COLLECTION = config["agent"]["memory_collection"]
DB_DIR = config["agent"]["db_dir"]
MEMORY_CHUNK_SIZE = int(config["agent"]["memory_chunk_size"])
MEMORY_CHUNK_OVERLAP = int(config["agent"]["memory_chunk_overlap"])
MEMORY_RESULTS = int(config["agent"]["memory_results"])
MEMORY_SEARCH_TYPE = config["agent"]["memory_search_type"]

if os.environ.get("PYSQL_HACK", "false") != "false":
    # these three lines swap the stdlib sqlite3 lib with the pysqlite3 package for chroma
    __import__('pysqlite3')
    import sys
    sys.modules['sqlite3'] = sys.modules.pop('pysqlite3')

# Only the vector store backend that is actually configured gets imported.
if MILVUS_HOST == "":
    from langchain.vectorstores import Chroma
else:
    from langchain.vectorstores import Milvus

embeddings = LocalAIEmbeddings(model=EMBEDDINGS_MODEL, openai_api_base=EMBEDDINGS_API_BASE)

loop = None
channel = None


def call(thing):
    """Schedule coroutine ``thing`` on the global ``loop`` and wait for its result.

    Uses ``asyncio.run_coroutine_threadsafe``, so it blocks the calling thread
    until the coroutine has finished and then returns (or raises) its outcome.
    """
    return asyncio.run_coroutine_threadsafe(thing, loop).result()


def _milvus_connection_args():
    """Return the connection arguments shared by every Milvus call."""
    return {"host": MILVUS_HOST, "port": MILVUS_PORT}


def _memory_store():
    """Open the configured memory collection (Chroma if MILVUS_HOST is empty, else Milvus)."""
    if MILVUS_HOST == "":
        return Chroma(
            collection_name=MEMORY_COLLECTION,
            embedding_function=embeddings,
            persist_directory=DB_DIR,
        )
    return Milvus(
        collection_name=MEMORY_COLLECTION,
        embedding_function=embeddings,
        connection_args=_milvus_connection_args(),
    )


def _persistent_path(file_name):
    """Return the path of ``file_name`` inside PERSISTENT_DIR, creating the directory if needed."""
    os.makedirs(PERSISTENT_DIR, exist_ok=True)
    # os.path.join works whether or not PERSISTENT_DIR ends with a separator.
    return os.path.join(PERSISTENT_DIR, file_name)


def _as_pixels(value):
    """Turn an integral float such as ``512.0`` into ``512``; return anything else unchanged."""
    if isinstance(value, float) and value.is_integer():
        return int(value)
    return value


def ingest(a, agent_actions=None, localagi=None):
    """Load the sitemap at ``url``, split its documents and store them in memory.

    Args:
        a: JSON string with a ``url`` key pointing at the sitemap.
        agent_actions: Unused; accepted for a uniform action signature.
        localagi: Unused; accepted for a uniform action signature.

    Returns:
        A confirmation message.
    """
    q = json.loads(a)
    logger.info(">>> ingesting: ")
    logger.info(q)

    documents = SitemapLoader(web_path=q["url"]).load()
    text_splitter = RecursiveCharacterTextSplitter(
        chunk_size=MEMORY_CHUNK_SIZE,
        chunk_overlap=MEMORY_CHUNK_OVERLAP,
    )
    texts = text_splitter.split_documents(documents)

    if MILVUS_HOST == "":
        db = Chroma.from_documents(
            texts,
            embeddings,
            collection_name=MEMORY_COLLECTION,
            persist_directory=DB_DIR,
        )
        db.persist()
    else:
        Milvus.from_documents(
            texts,
            embeddings,
            collection_name=MEMORY_COLLECTION,
            connection_args=_milvus_connection_args(),
        )
    return "Documents ingested"


def create_image(a, agent_actions=None, localagi=None):
    """Generate an image from a description and post it to the Discord channel.

    Args:
        a: JSON string with ``description``, ``width`` and ``height`` keys.
        agent_actions: Unused; accepted for a uniform action signature.
        localagi: Unused; accepted for a uniform action signature.

    Returns:
        A message containing the URL of the generated image.
    """
    q = json.loads(a)
    logger.info(">>> creating image: ")
    logger.info(q["description"])

    # The schema declares width/height as JSON numbers, so they may arrive as
    # floats; "512.0x512.0" is not a valid size string.
    size = f"{_as_pixels(q['width'])}x{_as_pixels(q['height'])}"
    response = openai.Image.create(prompt=q["description"], n=1, size=size)
    image_url = response["data"][0]["url"]

    image_name = download_image(image_url)
    image_path = os.path.join(PERSISTENT_DIR, image_name)

    file = discord.File(image_path, filename=image_name)
    embed = discord.Embed(title="Generated image")
    embed.set_image(url=f"attachment://{image_name}")

    call(channel.send(file=file, content="Here is what I have generated", embed=embed))

    return f"Image created: {image_url}"


def download_image(url: str):
    """Download ``url`` into PERSISTENT_DIR under a timestamped ``.jpg`` name.

    Returns:
        The file name (not the full path) of the downloaded image.
    """
    file_name = f"{datetime.now().strftime(FILE_NAME_FORMAT)}.jpg"
    urllib.request.urlretrieve(url, _persistent_path(file_name))
    return file_name


### Agent capabilities
### These functions are called by the agent to perform actions
###
def save(memory, agent_actions=None, localagi=None):
    """Store a piece of text in the memory collection.

    Args:
        memory: JSON string with a ``content`` key holding the text to store.
        agent_actions: Unused; accepted for a uniform action signature.
        localagi: Unused; accepted for a uniform action signature.

    Returns:
        A confirmation message.
    """
    q = json.loads(memory)
    logger.info(">>> saving to memories: ")
    logger.info(q["content"])

    store = _memory_store()
    store.add_texts([q["content"]], [{"id": str(uuid.uuid4())}])
    if MILVUS_HOST == "":
        # Only the Chroma backend is persisted explicitly.
        store.persist()
    return "The object was saved permanently to memory."


def search_memory(query, agent_actions=None, localagi=None):
    """Search the memory collection and format the hits as text.

    Args:
        query: JSON string with a ``keywords`` key holding the search terms.
        agent_actions: Unused; accepted for a uniform action signature.
        localagi: Unused; accepted for a uniform action signature.

    Returns:
        A header line, one ``- <content>`` line per document found, then one
        line per distinct ``http`` source referenced by those documents.
    """
    q = json.loads(query)
    retriever = _memory_store().as_retriever(
        search_type=MEMORY_SEARCH_TYPE,
        search_kwargs={"k": MEMORY_RESULTS},
    )
    docs = retriever.get_relevant_documents(q["keywords"])

    parts = ["Memories found in the database:\n"]
    # Collapse newlines and repeated whitespace so each memory is a single line.
    parts.extend(f"- {' '.join(doc.page_content.split())}\n" for doc in docs)

    sources = {doc.metadata["source"] for doc in docs if "source" in doc.metadata}
    # Sorted so the output is deterministic; only web sources are listed.
    parts.extend(f"{source}\n" for source in sorted(sources) if source.startswith("http"))
    return "".join(parts)


# write file to disk with content
def save_file(arg, agent_actions=None, localagi=None):
    """Write (or append) content to a file in PERSISTENT_DIR and post it to Discord.

    Args:
        arg: JSON string with ``filename`` and ``content`` keys.
        agent_actions: Unused; accepted for a uniform action signature.
        localagi: Unused; accepted for a uniform action signature.

    Returns:
        A confirmation message naming the path that was written.
    """
    arg = json.loads(arg)
    content = arg["content"]
    # The file name comes from the model: keep only its last component so an
    # absolute path or "../" cannot escape the persistent directory.
    filename = os.path.basename(arg["filename"])
    path = _persistent_path(filename)

    # 'a' appends to an existing file and creates it when it is missing.
    with open(path, 'a', encoding="utf-8") as f:
        f.write(content)

    attachment = discord.File(path, filename=filename)
    call(channel.send(file=attachment, content="Here is what I have generated"))
    return f"File {path} saved successfully."


def _to_metadata(result: Dict, backend: str) -> Dict[str, str]:
    """Map a raw DuckDuckGo result to the keys used by this module."""
    if backend == "news":
        return {
            "date": result["date"],
            "title": result["title"],
            "snippet": result["body"],
            "source": result["source"],
            "link": result["url"],
        }
    return {
        "snippet": result["body"],
        "title": result["title"],
        "link": result["href"],
    }


def ddg(query: str, num_results: int, backend: str = "api") -> List[Dict[str, str]]:
    """Run query through DuckDuckGo and return metadata.

    Args:
        query: The query to search for.
        num_results: The number of results to return.
        backend: Passed through to ``DDGS.text``; ``"news"`` selects the
            news-shaped result mapping.

    Returns:
        A list of dictionaries with the following keys:
            snippet - The description of the result.
            title - The title of the result.
            link - The link to the result.
        A single ``{"Result": message}`` entry when the search returns
        ``None``, or an empty list when the search raises.
    """
    ddgs = DDGS()
    try:
        results = ddgs.text(
            query,
            backend=backend,
        )
        if results is None:
            return [{"Result": "No good DuckDuckGo Search Result was found"}]

        formatted_results = []
        for res in results:
            if res is not None:
                formatted_results.append(_to_metadata(res, backend))
                if len(formatted_results) == num_results:
                    break
    except Exception as e:
        # Best effort: a failed search is logged and reported as "no results".
        logger.error(e)
        return []
    return formatted_results


## Search on duckduckgo
def search_duckduckgo(a, agent_actions=None, localagi=None):
    """Search DuckDuckGo and format the first two results as text.

    Args:
        a: JSON string with a ``query`` key.
        agent_actions: Unused; accepted for a uniform action signature.
        localagi: Unused; accepted for a uniform action signature.

    Returns:
        One ``<link>: <title> <snippet>`` line per result.
    """
    a = json.loads(a)
    lines = []
    for doc in ddg(a["query"], 2):
        if "Result" in doc:
            # ddg() signals "nothing found" with a {"Result": message} entry
            # that has no link/title/snippet keys.
            lines.append(f"{doc['Result']}\n")
        else:
            lines.append(f"{doc['link']}: {doc['title']} {doc['snippet']}\n")
    return "".join(lines)

### End Agent capabilities
###


### Agent action definitions
agent_actions = {
    "generate_picture": {
        "function": create_image,
        "plannable": True,
        "description": 'For creating a picture, the assistant replies with "generate_picture" and a detailed description, enhancing it with as much detail as possible.',
        "signature": {
            "name": "generate_picture",
            "parameters": {
                "type": "object",
                "properties": {
                    "description": {
                        "type": "string",
                    },
                    "width": {
                        "type": "number",
                    },
                    "height": {
                        "type": "number",
                    },
                },
                # create_image reads all three keys unconditionally.
                "required": ["description", "width", "height"]
            }
        },
    },
    "search_internet": {
        "function": search_duckduckgo,
        "plannable": True,
        "description": 'For searching the internet with a query, the assistant replies with the action "search_internet" and the query to search.',
        "signature": {
            "name": "search_internet",
            "description": """For searching internet.""",
            "parameters": {
                "type": "object",
                "properties": {
                    "query": {
                        "type": "string",
                        "description": "the query to search for"
                    },
                },
                "required": ["query"]
            }
        },
    },
    "save_file": {
        "function": save_file,
        "plannable": True,
        "description": 'The assistant replies with the action "save_file", the filename and content to save for writing a file to disk permanently. This can be used to store the result of complex actions locally.',
        "signature": {
            "name": "save_file",
            "description": """For saving a file to disk with content.""",
            "parameters": {
                "type": "object",
                "properties": {
                    "filename": {
                        "type": "string",
                        "description": "name of the file to write"
                    },
                    "content": {
                        "type": "string",
                        "description": "content to write into the file"
                    },
                },
                "required": ["filename", "content"]
            }
        },
    },
    "ingest": {
        "function": ingest,
        "plannable": True,
        "description": 'The assistant replies with the action "ingest" when there is an url to a sitemap to ingest memories from.',
        "signature": {
            "name": "ingest",
            "description": """Save or store informations into memory.""",
            "parameters": {
                "type": "object",
                "properties": {
                    "url": {
                        "type": "string",
                        "description": "url of the sitemap to ingest"
                    },
                },
                "required": ["url"]
            }
        },
    },
    "save_memory": {
        "function": save,
        "plannable": True,
        "description": 'The assistant replies with the action "save_memory" and the string to remember or store an information that thinks it is relevant permanently.',
        "signature": {
            "name": "save_memory",
            "description": """Save or store informations into memory.""",
            "parameters": {
                "type": "object",
                "properties": {
                    "content": {
                        "type": "string",
                        "description": "information to save"
                    },
                },
                "required": ["content"]
            }
        },
    },
    "search_memory": {
        "function": search_memory,
        "plannable": True,
        "description": 'The assistant replies with the action "search_memory" for searching between its memories with a query term.',
        "signature": {
            "name": "search_memory",
            "description": """Search in memory""",
            "parameters": {
                "type": "object",
                "properties": {
                    "keywords": {
                        "type": "string",
                        "description": "keywords to search for in memory"
                    },
                },
                "required": ["keywords"]
            }
        },
    },
}