Files
LocalAGI/main.py
2023-08-01 22:47:31 +02:00

757 lines
29 KiB
Python

import openai
#from langchain.embeddings import HuggingFaceEmbeddings
from langchain.embeddings import LocalAIEmbeddings
import uuid
import requests
import sys
from loguru import logger
from ascii_magic import AsciiArt
from duckduckgo_search import DDGS
from typing import Dict, List, Optional
# these three lines swap the stdlib sqlite3 lib with the pysqlite3 package for chroma
__import__('pysqlite3')
import sys
sys.modules['sqlite3'] = sys.modules.pop('pysqlite3')
from langchain.vectorstores import Chroma
from chromadb.config import Settings
import json
import os
# Parse arguments such as system prompt and batch mode
import argparse
parser = argparse.ArgumentParser(description='μAGI')
parser.add_argument('--system-prompt', dest='system_prompt', action='store',
help='System prompt to use')
parser.add_argument('--prompt', dest='prompt', action='store', default=False,
help='Prompt mode')
# skip avatar creation
parser.add_argument('--skip-avatar', dest='skip_avatar', action='store_true', default=False,
help='Skip avatar creation')
# Reevaluate
parser.add_argument('--re-evaluate', dest='re_evaluate', action='store_true', default=False,
help='Reevaluate if another action is needed or we have completed the user request')
# Postprocess
parser.add_argument('--postprocess', dest='postprocess', action='store_true', default=False,
help='Postprocess the reasoning')
# Subtask context
parser.add_argument('--subtask-context', dest='subtaskContext', action='store_true', default=False,
help='Include context in subtasks')
args = parser.parse_args()
FUNCTIONS_MODEL = os.environ.get("FUNCTIONS_MODEL", "functions")
EMBEDDINGS_MODEL = os.environ.get("EMBEDDINGS_MODEL", "all-MiniLM-L6-v2")
LLM_MODEL = os.environ.get("LLM_MODEL", "gpt-4")
VOICE_MODEL= os.environ.get("TTS_MODEL","en-us-kathleen-low.onnx")
DEFAULT_SD_MODEL = os.environ.get("DEFAULT_SD_MODEL", "stablediffusion")
DEFAULT_SD_PROMPT = os.environ.get("DEFAULT_SD_PROMPT", "floating hair, portrait, ((loli)), ((one girl)), cute face, hidden hands, asymmetrical bangs, beautiful detailed eyes, eye shadow, hair ornament, ribbons, bowties, buttons, pleated skirt, (((masterpiece))), ((best quality)), colorful|((part of the head)), ((((mutated hands and fingers)))), deformed, blurry, bad anatomy, disfigured, poorly drawn face, mutation, mutated, extra limb, ugly, poorly drawn hands, missing limb, blurry, floating limbs, disconnected limbs, malformed hands, blur, out of focus, long neck, long body, Octane renderer, lowres, bad anatomy, bad hands, text")
PERSISTENT_DIR = os.environ.get("PERSISTENT_DIR", "/data")
## Constants
REPLY_ACTION = "reply"
PLAN_ACTION = "multitask_action"
embeddings = LocalAIEmbeddings(model=EMBEDDINGS_MODEL)
chroma_client = Chroma(collection_name="memories", persist_directory="db", embedding_function=embeddings)
# Function to create images with LocalAI
def display_avatar(input_text=DEFAULT_SD_PROMPT, model=DEFAULT_SD_MODEL):
response = openai.Image.create(
prompt=input_text,
n=1,
size="128x128",
api_base=os.environ.get("OPENAI_API_BASE", "http://api:8080")+"/v1"
)
image_url = response['data'][0]['url']
# convert the image to ascii art
my_art = AsciiArt.from_url(image_url)
my_art.to_terminal()
# Function to create audio with LocalAI
def tts(input_text, model=VOICE_MODEL):
# strip newlines from text
input_text = input_text.replace("\n", ".")
# Create a temp file to store the audio output
output_file_path = '/tmp/output.wav'
# get from OPENAI_API_BASE env var
url = os.environ.get("OPENAI_API_BASE", "http://api:8080") + '/tts'
headers = {'Content-Type': 'application/json'}
data = {
"input": input_text,
"model": model
}
response = requests.post(url, headers=headers, data=json.dumps(data))
if response.status_code == 200:
with open(output_file_path, 'wb') as f:
f.write(response.content)
logger.info('Audio file saved successfully:', output_file_path)
else:
logger.info('Request failed with status code', response.status_code)
# Use aplay to play the audio
os.system('aplay ' + output_file_path)
# remove the audio file
os.remove(output_file_path)
# Function to analyze the user input and pick the next action to do
def needs_to_do_action(user_input,agent_actions={}):
# Get the descriptions and the actions name (the keys)
descriptions=action_description("", agent_actions)
messages = [
{"role": "user",
"content": f"""Transcript of AI assistant responding to user requests. Replies with the action to perform, including reasoning, and the confidence interval from 0 to 100.
{descriptions}"""},
{"role": "user",
"content": f"""{user_input}
Function call: """
}
]
functions = [
{
"name": "intent",
"description": """Decide to do an action.""",
"parameters": {
"type": "object",
"properties": {
"confidence": {
"type": "number",
"description": "confidence of the action"
},
"action": {
"type": "string",
"enum": list(agent_actions.keys()),
"description": "user intent"
},
"reasoning": {
"type": "string",
"description": "reasoning behind the intent"
},
},
"required": ["action"]
}
},
]
response = openai.ChatCompletion.create(
#model="gpt-3.5-turbo",
model=FUNCTIONS_MODEL,
messages=messages,
request_timeout=1200,
functions=functions,
stop=None,
temperature=0.1,
#function_call="auto"
function_call={"name": "intent"},
)
response_message = response["choices"][0]["message"]
if response_message.get("function_call"):
function_name = response.choices[0].message["function_call"].name
function_parameters = response.choices[0].message["function_call"].arguments
# read the json from the string
res = json.loads(function_parameters)
logger.info(">>> function name: "+function_name)
logger.info(">>> function parameters: "+function_parameters)
return res
return {"action": REPLY_ACTION}
# This is used to collect the descriptions of the agent actions, used to populate the LLM prompt
def action_description(action, agent_actions):
descriptions=""
# generate descriptions of actions that the agent can pick
for a in agent_actions:
if ( action != "" and action == a ) or (action == ""):
descriptions+=agent_actions[a]["description"]+"\n"
return descriptions
## This function is called to ask the user if does agree on the action to take and execute
def ask_user_confirmation(action_name, action_parameters):
logger.info("==> Ask user confirmation")
logger.info("==> action_name: {action_name}", action_name=action_name)
logger.info("==> action_parameters: {action_parameters}", action_parameters=action_parameters)
# Ask via stdin
logger.info("==> Do you want to execute the action? (y/n)")
user_input = input()
if user_input == "y":
logger.info("==> Executing action")
return True
else:
logger.info("==> Skipping action")
return False
### This function is used to process the functions given a user input.
### It picks a function, executes it and returns the list of messages containing the result.
def process_functions(user_input, action="", agent_actions={}):
descriptions=action_description(action, agent_actions)
messages = [
# {"role": "system", "content": "You are a helpful assistant."},
{"role": "user",
"content": f"""Transcript of AI assistant responding to user requests. Replies with the action to perform, including reasoning, and the confidence interval from 0 to 100.
{descriptions}"""},
{"role": "user",
"content": f"""{user_input}
Function call: """
}
]
response = function_completion(messages, action=action,agent_actions=agent_actions)
response_message = response["choices"][0]["message"]
response_result = ""
function_result = {}
if response_message.get("function_call"):
function_name = response.choices[0].message["function_call"].name
function_parameters = response.choices[0].message["function_call"].arguments
logger.info("==> function name: ")
logger.info(function_name)
logger.info("==> function parameters: ")
logger.info(function_parameters)
function_to_call = agent_actions[function_name]["function"]
function_result = function_to_call(function_parameters, agent_actions=agent_actions)
logger.info("==> function result: ")
logger.info(function_result)
messages.append(
{
"role": "assistant",
"content": None,
"function_call": {"name": function_name, "arguments": function_parameters,},
}
)
messages.append(
{
"role": "function",
"name": function_name,
"content": str(function_result)
}
)
return messages, function_result
### function_completion is used to autocomplete functions given a list of messages
def function_completion(messages, action="", agent_actions={}):
function_call = "auto"
if action != "":
function_call={"name": action}
logger.info("==> function_call: ")
logger.info(function_call)
# get the functions from the signatures of the agent actions, if exists
functions = []
for action in agent_actions:
if agent_actions[action].get("signature"):
functions.append(agent_actions[action]["signature"])
response = openai.ChatCompletion.create(
#model="gpt-3.5-turbo",
model=FUNCTIONS_MODEL,
messages=messages,
functions=functions,
request_timeout=1200,
stop=None,
temperature=0.1,
function_call=function_call
)
return response
# Rework the content of each message in the history in a way that is understandable by the LLM
# TODO: switch to templates (?)
def process_history(conversation_history):
messages = ""
for message in conversation_history:
# if there is content append it
if message.get("content") and message["role"] == "function":
messages+="Function result: " + message["content"]+"\n"
elif message.get("function_call"):
# encode message["function_call" to json and appends it
fcall = json.dumps(message["function_call"])
messages+="Assistant calls function: " +fcall+"\n"
elif message.get("content") and message["role"] == "user":
messages+="User message: "+message["content"]+"\n"
elif message.get("content") and message["role"] == "assistant":
messages+="Assistant message: "+message["content"]+"\n"
return messages
### Main evaluate function
### This function evaluates in a continuous loop the user input and the conversation history.
### It returns the conversation history with the latest response from the assistant.
def evaluate(user_input, conversation_history = [],re_evaluate=False, agent_actions={},re_evaluation_in_progress=False, postprocess=False, subtaskContext=False):
messages = [
{
"role": "user",
"content": user_input,
}
]
conversation_history.extend(messages)
# pulling the old history make the context grow exponentially
# and most importantly it repeates the first message with the commands again and again.
# it needs a bit of cleanup and process the messages and piggyback more LocalAI functions templates
# old_history = process_history(conversation_history)
# action_picker_message = "Conversation history:\n"+old_history
# action_picker_message += "\n"
action_picker_message = "Request: "+user_input
if re_evaluation_in_progress:
action_picker_message+="\nRe-evaluation if another action is needed or we have completed the user request."
action_picker_message+="\nReasoning: If no action is needed, I will use "+REPLY_ACTION+" to reply to the user."
try:
action = needs_to_do_action(action_picker_message,agent_actions=agent_actions)
except Exception as e:
logger.error("==> error: ")
logger.error(e)
action = {"action": REPLY_ACTION}
if action["action"] != REPLY_ACTION:
logger.info("==> μAGI wants to call '{action}'", action=action["action"])
logger.info("==> Reasoning '{reasoning}'", reasoning=action["reasoning"])
if action["action"] == PLAN_ACTION:
logger.info("==> It's a plan <==: ")
if postprocess:
action["reasoning"] = post_process(action["reasoning"])
#function_completion_message = "Conversation history:\n"+old_history+"\n"+
function_completion_message = "Request: "+user_input+"\nReasoning: "+action["reasoning"]
responses, function_results = process_functions(function_completion_message, action=action["action"], agent_actions=agent_actions)
# if there are no subtasks, we can just reply,
# otherwise we execute the subtasks
# First we check if it's an object
if isinstance(function_results, dict) and function_results.get("subtasks") and len(function_results["subtasks"]) > 0:
# cycle subtasks and execute functions
subtask_result=""
for subtask in function_results["subtasks"]:
logger.info("==> subtask: ")
logger.info(subtask)
#ctr="Context: "+user_input+"\nThought: "+action["reasoning"]+ "\nRequest: "+subtask["reasoning"]
cr="Context: "+user_input+"\n"
#cr=""
if subtask_result != "" and subtaskContext:
# Include cumulative results of previous subtasks
# TODO: this grows context, maybe we should use a different approach or summarize
if postprocess:
cr+= "Subtask results: "+post_process(subtask_result)+"\n"
else:
cr+="Subtask results: "+subtask_result+"\n"
if postprocess:
cr+= "Request: "+post_process(subtask["reasoning"])
else:
cr+= "Request: "+subtask["reasoning"]
subtask_response, function_results = process_functions(cr, subtask["function"],agent_actions=agent_actions)
subtask_result+=process_history(subtask_response)
responses.extend(subtask_response)
if re_evaluate:
## Better output or this infinite loops..
logger.info("-> Re-evaluate if another action is needed")
## ? conversation history should go after the user_input maybe?
re_eval = user_input +"\n"
re_eval += "Conversation history: \n"
if postprocess:
re_eval+= post_process(process_history(responses[1:])) +"\n"
else:
re_eval+= process_history(responses[1:]) +"\n"
responses = evaluate(re_eval, responses, re_evaluate,agent_actions=agent_actions,re_evaluation_in_progress=True)
if re_evaluation_in_progress:
conversation_history.extend(responses)
return conversation_history
responses.append(
{
"role": "system",
"content": "Return an appropriate answer to the user given the context above."
}
)
response = openai.ChatCompletion.create(
model=LLM_MODEL,
messages=responses,
stop=None,
request_timeout=1200,
temperature=0.1,
)
responses.append(
{
"role": "assistant",
"content": response.choices[0].message["content"],
}
)
# add responses to conversation history by extending the list
conversation_history.extend(responses)
# logger.info the latest response from the conversation history
logger.info(conversation_history[-1]["content"])
tts(conversation_history[-1]["content"])
else:
logger.info("==> no action needed")
if re_evaluation_in_progress:
logger.info("==> μAGI has completed the user request")
logger.info("==> μAGI will reply to the user")
return conversation_history
# get the response from the model
response = openai.ChatCompletion.create(
model=LLM_MODEL,
messages=conversation_history,
stop=None,
temperature=0.1,
request_timeout=1200,
)
# add the response to the conversation history by extending the list
conversation_history.append({ "role": "assistant", "content": response.choices[0].message["content"]})
# logger.info the latest response from the conversation history
logger.info(conversation_history[-1]["content"])
tts(conversation_history[-1]["content"])
return conversation_history
### Fine tune a string before feeding into the LLM
def post_process(string):
messages = [
{
"role": "user",
"content": f"""Summarize the following text, keeping the relevant information:
```
{string}
```
""",
}
]
logger.info("==> Post processing: {string}", string=string)
# get the response from the model
response = openai.ChatCompletion.create(
model=LLM_MODEL,
messages=messages,
stop=None,
temperature=0.1,
request_timeout=1200,
)
result = response["choices"][0]["message"]["content"]
logger.info("==> Processed: {string}", string=result)
return result
### Agent capabilities
### These functions are called by the agent to perform actions
###
def save(memory, agent_actions={}):
q = json.loads(memory)
logger.info(">>> saving to memories: ")
logger.info(q["thought"])
chroma_client.add_texts([q["thought"]],[{"id": str(uuid.uuid4())}])
chroma_client.persist()
return f"The object was saved permanently to memory."
def search(query, agent_actions={}):
q = json.loads(query)
docs = chroma_client.similarity_search(q["reasoning"])
text_res="Memories found in the database:\n"
for doc in docs:
text_res+="- "+doc.page_content+"\n"
return text_res
def calculate_plan(user_input, agent_actions={}):
res = json.loads(user_input)
logger.info("--> Calculating plan: {description}", description=res["description"])
descriptions=action_description("",agent_actions)
messages = [
{"role": "user",
"content": f"""Transcript of AI assistant responding to user requests.
{descriptions}
Request: {res["description"]}
The assistant replies with a plan to answer the request with a list of subtasks with logical steps. The reasoning includes a self-contained, detailed and descriptive instruction to fullfill the task.
Function call: """
}
]
# get list of plannable actions
plannable_actions = []
for action in agent_actions:
if agent_actions[action]["plannable"]:
# append the key of the dict to plannable_actions
plannable_actions.append(action)
functions = [
{
"name": "plan",
"description": """Decide to do an action.""",
"parameters": {
"type": "object",
"properties": {
"subtasks": {
"type": "array",
"items": {
"type": "object",
"properties": {
"reasoning": {
"type": "string",
"description": "subtask list",
},
"function": {
"type": "string",
"enum": plannable_actions,
},
},
},
},
},
"required": ["subtasks"]
}
},
]
response = openai.ChatCompletion.create(
#model="gpt-3.5-turbo",
model=FUNCTIONS_MODEL,
messages=messages,
functions=functions,
stop=None,
temperature=0.1,
#function_call="auto"
function_call={"name": "plan"},
)
response_message = response["choices"][0]["message"]
if response_message.get("function_call"):
function_name = response.choices[0].message["function_call"].name
function_parameters = response.choices[0].message["function_call"].arguments
# read the json from the string
res = json.loads(function_parameters)
logger.info("<<< function name: {function_name} >>>> parameters: {parameters}", function_name=function_name,parameters=function_parameters)
return res
return {"action": REPLY_ACTION}
# write file to disk with content
def write_file(arg, agent_actions={}):
arg = json.loads(arg)
filename = arg["filename"]
content = arg["content"]
# create persistent dir if does not exist
if not os.path.exists(PERSISTENT_DIR):
os.makedirs(PERSISTENT_DIR)
# write the file in the directory specified
filename = os.path.join(PERSISTENT_DIR, filename)
with open(filename, 'w') as f:
f.write(content)
return f"File {filename} saved successfully."
def ddg(query: str, num_results: int, backend: str = "api") -> List[Dict[str, str]]:
"""Run query through DuckDuckGo and return metadata.
Args:
query: The query to search for.
num_results: The number of results to return.
Returns:
A list of dictionaries with the following keys:
snippet - The description of the result.
title - The title of the result.
link - The link to the result.
"""
with DDGS() as ddgs:
results = ddgs.text(
query,
backend=backend,
)
if results is None:
return [{"Result": "No good DuckDuckGo Search Result was found"}]
def to_metadata(result: Dict) -> Dict[str, str]:
if backend == "news":
return {
"date": result["date"],
"title": result["title"],
"snippet": result["body"],
"source": result["source"],
"link": result["url"],
}
return {
"snippet": result["body"],
"title": result["title"],
"link": result["href"],
}
formatted_results = []
for i, res in enumerate(results, 1):
if res is not None:
formatted_results.append(to_metadata(res))
if len(formatted_results) == num_results:
break
return formatted_results
## Search on duckduckgo
def search_duckduckgo(args, agent_actions={}):
args = json.loads(args)
list=ddg(args["query"], 5)
l = json.dumps(list)
return l
### End Agent capabilities
###
### Agent action definitions
agent_actions = {
"search_internet": {
"function": search_duckduckgo,
"plannable": True,
"description": 'For searching the internet with a query, the assistant replies with the action "search_internet" and the query to search.',
"signature": {
"name": "search_internet",
"description": """For searching internet.""",
"parameters": {
"type": "object",
"properties": {
"query": {
"type": "string",
"description": "information to save"
},
},
}
},
},
"write_file": {
"function": write_file,
"plannable": True,
"description": 'The assistant replies with the action "write_file", the filename and content to save for writing a file to disk permanently. This can be used to store the result of complex actions locally.',
"signature": {
"name": "write_file",
"description": """For saving a file to disk with content.""",
"parameters": {
"type": "object",
"properties": {
"filename": {
"type": "string",
"description": "information to save"
},
"content": {
"type": "string",
"description": "information to save"
},
},
}
},
},
"save_memory": {
"function": save,
"plannable": True,
"description": 'The assistant replies with the action "save_memory" and the string to remember or store an information that thinks it is relevant permanently.',
"signature": {
"name": "save_memory",
"description": """Save or store informations into memory.""",
"parameters": {
"type": "object",
"properties": {
"thought": {
"type": "string",
"description": "information to save"
},
},
"required": ["thought"]
}
},
},
"search_memory": {
"function": search,
"plannable": True,
"description": 'The assistant replies with the action "search_memory" for searching between its memories with a query term.',
"signature": {
"name": "search_memory",
"description": """Search in memory""",
"parameters": {
"type": "object",
"properties": {
"reasoning": {
"type": "string",
"description": "reasoning behind the intent"
},
},
"required": ["reasoning"]
}
},
},
PLAN_ACTION: {
"function": calculate_plan,
"plannable": False,
"description": 'The assistant for solving complex tasks that involves more than one action or planning actions in sequence, replies with the action "'+PLAN_ACTION+'" and a detailed list of all the subtasks.',
"signature": {
"name": PLAN_ACTION,
"description": """Plan complex tasks.""",
"parameters": {
"type": "object",
"properties": {
"description": {
"type": "string",
"description": "reasoning behind the planning"
},
},
"required": ["description"]
}
},
},
REPLY_ACTION: {
"function": None,
"plannable": False,
"description": 'For replying to the user, the assistant replies with the action "'+REPLY_ACTION+'" and the reply to the user directly when there is nothing to do.',
},
}
conversation_history = []
# Set a system prompt if SYSTEM_PROMPT is set
if os.environ.get("SYSTEM_PROMPT") or args.system_prompt:
sprompt = os.environ.get("SYSTEM_PROMPT", args.system_prompt)
conversation_history.append({
"role": "system",
"content": sprompt
})
logger.info("Welcome to μAGI")
# Skip avatar creation if --skip-avatar is set
if not args.skip_avatar:
logger.info("Creating avatar, please wait...")
display_avatar()
if not args.prompt:
logger.info("μAGI has the following actions available at its disposal:")
for action in agent_actions:
logger.info("{action} - {description}", action=action, description=agent_actions[action]["description"])
else:
logger.info(">>> Prompt mode <<<")
logger.info(args.prompt)
# IF in prompt mode just evaluate, otherwise loop
if args.prompt:
evaluate(
args.prompt,
conversation_history,
re_evaluate=args.re_evaluate,
agent_actions=agent_actions,
# Enable to lower context usage but increases LLM calls
postprocess=args.postprocess,
subtaskContext=args.subtaskContext,
)
else:
# TODO: process functions also considering the conversation history? conversation history + input
while True:
user_input = input("> ")
# we are going to use the args to change the evaluation behavior
conversation_history=evaluate(
user_input,
conversation_history,
re_evaluate=args.re_evaluate,
agent_actions=agent_actions,
# Enable to lower context usage but increases LLM calls
postprocess=args.postprocess,
subtaskContext=args.subtaskContext,
)