From 4c1c601417664406a715ebe657897ff28756b3a1 Mon Sep 17 00:00:00 2001
From: mudler
Date: Sat, 19 Aug 2023 23:47:26 +0200
Subject: [PATCH] move logic to a class that can be imported

---
 Dockerfile               |   2 +-
 main.py                  | 689 ++++++---------------------------------
 src/localagi/__init__.py |   1 +
 src/localagi/localagi.py | 592 +++++++++++++++++++++++++++++++++
 4 files changed, 686 insertions(+), 598 deletions(-)
 create mode 100644 src/localagi/__init__.py
 create mode 100644 src/localagi/localagi.py

diff --git a/Dockerfile b/Dockerfile
index 421b35d..8411242 100644
--- a/Dockerfile
+++ b/Dockerfile
@@ -14,5 +14,5 @@ RUN apt-get update -y && \
     apt-get clean
 
 COPY . /app
-
+RUN pip install .
 ENTRYPOINT [ "python", "./main.py" ];
\ No newline at end of file
diff --git a/main.py b/main.py
index f1b2f9a..014bded 100644
--- a/main.py
+++ b/main.py
@@ -2,15 +2,14 @@
 import openai
 #from langchain.embeddings import HuggingFaceEmbeddings
 from langchain.embeddings import LocalAIEmbeddings
 import uuid
-import requests
-import ast
 import sys
-from contextlib import redirect_stdout
+
+from localagi import LocalAGI
 from loguru import logger
 from ascii_magic import AsciiArt
 from duckduckgo_search import DDGS
-from typing import Dict, List, Optional
-import subprocess
+from typing import Dict, List
+import os
 
 # these three lines swap the stdlib sqlite3 lib with the pysqlite3 package for chroma
 __import__('pysqlite3')
@@ -97,6 +96,24 @@ parser.add_argument('--debug', dest='debug',
                     action='store_true', default=False)
 
 # Parse arguments
 args = parser.parse_args()
 
+STABLEDIFFUSION_MODEL = os.environ.get("STABLEDIFFUSION_MODEL", args.stablediffusion_model)
+STABLEDIFFUSION_PROMPT = os.environ.get("STABLEDIFFUSION_PROMPT", args.stablediffusion_prompt)
+FUNCTIONS_MODEL = os.environ.get("FUNCTIONS_MODEL", args.functions_model)
+EMBEDDINGS_MODEL = os.environ.get("EMBEDDINGS_MODEL", args.embeddings_model)
+LLM_MODEL = os.environ.get("LLM_MODEL", args.llm_model)
+VOICE_MODEL = os.environ.get("TTS_MODEL", args.tts_model)
+PERSISTENT_DIR = os.environ.get("PERSISTENT_DIR", "/data")
+SYSTEM_PROMPT = ""
+if os.environ.get("SYSTEM_PROMPT") or args.system_prompt:
+    SYSTEM_PROMPT = os.environ.get("SYSTEM_PROMPT", args.system_prompt)
+
+LOCALAI_API_BASE = args.localai_api_base
+TTS_API_BASE = args.tts_api_base
+IMAGE_API_BASE = args.images_api_base
+EMBEDDINGS_API_BASE = args.embeddings_api_base
+
 # Set log level
 LOG_LEVEL = "INFO"
@@ -134,125 +151,12 @@ embeddings = LocalAIEmbeddings(model=EMBEDDINGS_MODEL,openai_api_base=EMBEDDINGS
 chroma_client = Chroma(collection_name="memories", persist_directory="db", embedding_function=embeddings)
 
 # Function to create images with LocalAI
-def display_avatar(input_text=STABLEDIFFUSION_PROMPT, model=STABLEDIFFUSION_MODEL):
-    response = openai.Image.create(
-        prompt=input_text,
-        n=1,
-        size="128x128",
-        api_base=IMAGE_API_BASE+"/v1"
-    )
-    image_url = response['data'][0]['url']
+def display_avatar(agi, input_text=STABLEDIFFUSION_PROMPT, model=STABLEDIFFUSION_MODEL):
+    image_url = agi.get_avatar(input_text, model)
     # convert the image to ascii art
     my_art = AsciiArt.from_url(image_url)
     my_art.to_terminal()
 
-# Function to create audio with LocalAI
-def tts(input_text, model=VOICE_MODEL):
-    # strip newlines from text
-    input_text = input_text.replace("\n", ".")
-    # Create a temp file to store the audio output
-    output_file_path = '/tmp/output.wav'
-    # get from OPENAI_API_BASE env var
-    url = TTS_API_BASE + '/tts'
-    headers = {'Content-Type': 'application/json'}
-    data = {
-        "input": input_text,
-        "model": model
-    }
-
-    response = requests.post(url, headers=headers, data=json.dumps(data))
-
-    if response.status_code == 200:
-        with open(output_file_path, 'wb') as f:
-            f.write(response.content)
-        logger.info('Audio file saved successfully:', output_file_path)
-    else:
-        logger.info('Request failed with status code', response.status_code)
-
-    try:
-        # Use aplay to play the audio
-        os.system('aplay ' + output_file_path)
-        # remove the audio file
-        os.remove(output_file_path)
-    except:
-        logger.info('Unable to play audio')
-
-# Function to analyze the user input and pick the next action to do
-def needs_to_do_action(user_input,agent_actions={}):
-
-    # Get the descriptions and the actions name (the keys)
-    descriptions=action_description("", agent_actions)
-
-    messages = [
-            {"role": "user",
-             "content": f"""Transcript of AI assistant responding to user requests. Replies with the action to perform and the reasoning.
-{descriptions}"""},
-            {"role": "user",
-             "content": f"""{user_input}
-Function call: """
-             }
-        ]
-    functions = [
-        {
-        "name": "intent",
-        "description": """Decide to do an action.""",
-        "parameters": {
-            "type": "object",
-            "properties": {
-            "confidence": {
-                "type": "number",
-                "description": "confidence of the action"
-            },
-            "reasoning": {
-                "type": "string",
-                "description": "reasoning behind the intent"
-            },
-            # "observation": {
-            #     "type": "string",
-            #     "description": "reasoning behind the intent"
-            # },
-            "action": {
-                "type": "string",
-                "enum": list(agent_actions.keys()),
-                "description": "user intent"
-            },
-            },
-            "required": ["action"]
-        }
-        },
-    ]
-    response = openai.ChatCompletion.create(
-        #model="gpt-3.5-turbo",
-        model=FUNCTIONS_MODEL,
-        messages=messages,
-        request_timeout=1200,
-        functions=functions,
-        api_base=LOCALAI_API_BASE+"/v1",
-        stop=None,
-        temperature=0.1,
-        #function_call="auto"
-        function_call={"name": "intent"},
-    )
-    response_message = response["choices"][0]["message"]
-    if response_message.get("function_call"):
-        function_name = response.choices[0].message["function_call"].name
-        function_parameters = response.choices[0].message["function_call"].arguments
-        # read the json from the string
-        res = json.loads(function_parameters)
-        logger.debug(">>> function name: "+function_name)
-        logger.debug(">>> function parameters: "+function_parameters)
-        return res
-    return {"action": REPLY_ACTION}
-
-# This is used to collect the descriptions of the agent actions, used to populate the LLM prompt
-def action_description(action, agent_actions):
-    descriptions=""
-    # generate descriptions of actions that the agent can pick
-    for a in agent_actions:
-        if ( action != "" and action == a ) or (action == ""):
-            descriptions+=agent_actions[a]["description"]+"\n"
-    return descriptions
-
 ## This function is called to ask the user if does agree on the action to take and execute
 def ask_user_confirmation(action_name, action_parameters):
     logger.info("==> Ask user confirmation")
@@ -268,186 +172,11 @@ def ask_user_confirmation(action_name, action_parameters):
         logger.info("==> Skipping action")
         return False
 
-### This function is used to process the functions given a user input.
-### It picks a function, executes it and returns the list of messages containing the result.
-def process_functions(user_input, action="", agent_actions={}):
-
-    descriptions=action_description(action, agent_actions)
-
-    messages = [
-        #  {"role": "system", "content": "You are a helpful assistant."},
-            {"role": "user",
-             "content": f"""Transcript of AI assistant responding to user requests. Replies with the action to perform, including reasoning, and the confidence interval from 0 to 100.
-{descriptions}"""},
-            {"role": "user",
-             "content": f"""{user_input}
-Function call: """
-             }
-        ]
-    response = function_completion(messages, action=action,agent_actions=agent_actions)
-    response_message = response["choices"][0]["message"]
-    response_result = ""
-    function_result = {}
-    if response_message.get("function_call"):
-        function_name = response.choices[0].message["function_call"].name
-        function_parameters = response.choices[0].message["function_call"].arguments
-        logger.debug("==> function parameters: {function_parameters}",function_parameters=function_parameters)
-        function_to_call = agent_actions[function_name]["function"]
-
-        function_result = function_to_call(function_parameters, agent_actions=agent_actions)
-        logger.info("==> function result: {function_result}", function_result=function_result)
-        messages.append(
-            {
-                "role": "assistant",
-                "content": None,
-                "function_call": {"name": function_name, "arguments": function_parameters,},
-            }
-        )
-        messages.append(
-            {
-                "role": "function",
-                "name": function_name,
-                "content": str(function_result)
-            }
-        )
-    return messages, function_result
-
-### function_completion is used to autocomplete functions given a list of messages
-def function_completion(messages, action="", agent_actions={}):
-    function_call = "auto"
-    if action != "":
-        function_call={"name": action}
-    logger.debug("==> function name: {function_call}", function_call=function_call)
-    # get the functions from the signatures of the agent actions, if exists
-    functions = []
-    for action in agent_actions:
-        if agent_actions[action].get("signature"):
-            functions.append(agent_actions[action]["signature"])
-    response = openai.ChatCompletion.create(
-        #model="gpt-3.5-turbo",
-        model=FUNCTIONS_MODEL,
-        messages=messages,
-        functions=functions,
-        request_timeout=1200,
-        stop=None,
-        api_base=LOCALAI_API_BASE+"/v1",
-        temperature=0.1,
-        function_call=function_call
-    )
-
-    return response
-
-# Rework the content of each message in the history in a way that is understandable by the LLM
-# TODO: switch to templates (?)
-def process_history(conversation_history):
-    messages = ""
-    for message in conversation_history:
-        # if there is content append it
-        if message.get("content") and message["role"] == "function":
-            messages+="Function result: \n" + message["content"]+"\n"
-        elif message.get("function_call"):
-            # encode message["function_call" to json and appends it
-            fcall = json.dumps(message["function_call"])
-            parameters = "calling " + message["function_call"]["name"]+" with arguments:"
-            args=json.loads(message["function_call"]["arguments"])
-            for arg in args:
-                logger.debug(arg)
-                logger.debug(args)
-                v=args[arg]
-                parameters+=f""" {arg}=\"{v}\""""
-            messages+= parameters+"\n"
-        elif message.get("content") and message["role"] == "user":
-            messages+=message["content"]+"\n"
-        elif message.get("content") and message["role"] == "assistant":
-            messages+="Assistant message: "+message["content"]+"\n"
-    return messages
-
-def converse(responses):
-    response = openai.ChatCompletion.create(
-        model=LLM_MODEL,
-        messages=responses,
-        stop=None,
-        api_base=LOCALAI_API_BASE+"/v1",
-        request_timeout=1200,
-        temperature=0.1,
-    )
-    responses.append(
-        {
-            "role": "assistant",
-            "content": response.choices[0].message["content"],
-        }
-    )
-    return responses
-
-### Fine tune a string before feeding into the LLM
-
-def analyze(responses, prefix="Analyze the following text highlighting the relevant information and identify a list of actions to take if there are any. If there are errors, suggest solutions to fix them", suffix=""):
-    string = process_history(responses)
-    messages = []
-
-    if prefix != "":
-        messages = [
-            {
-            "role": "user",
-            "content": f"""{prefix}:
-
-            ```
-            {string}
-            ```
-            """,
-            }
-        ]
-    else:
-        messages = [
-            {
-            "role": "user",
-            "content": f"""{string}""",
-            }
-        ]
-
-    if suffix != "":
-        messages[0]["content"]+=f"""{suffix}"""
-
-    response = openai.ChatCompletion.create(
-        model=LLM_MODEL,
-        messages=messages,
-        stop=None,
-        api_base=LOCALAI_API_BASE+"/v1",
-        request_timeout=1200,
-        temperature=0.1,
-    )
-    return response.choices[0].message["content"]
-
-def post_process(string):
-    messages = [
-        {
-        "role": "user",
-        "content": f"""Summarize the following text, keeping the relevant information:
-
-```
-{string}
-```
-""",
-        }
-    ]
-    logger.info("==> Post processing: {string}", string=string)
-    # get the response from the model
-    response = openai.ChatCompletion.create(
-        model=LLM_MODEL,
-        messages=messages,
-        api_base=LOCALAI_API_BASE+"/v1",
-        stop=None,
-        temperature=0.1,
-        request_timeout=1200,
-    )
-    result = response["choices"][0]["message"]["content"]
-    logger.info("==> Processed: {string}", string=result)
-    return result
 
 ### Agent capabilities
 ### These functions are called by the agent to perform actions
 ###
-def save(memory, agent_actions={}):
+def save(memory, agent_actions={}, localagi=None):
     q = json.loads(memory)
     logger.info(">>> saving to memories: ")
     logger.info(q["content"])
@@ -455,7 +184,7 @@ def save(memory, agent_actions={}):
     chroma_client.persist()
     return f"The object was saved permanently to memory."
 
-def search_memory(query, agent_actions={}):
+def search_memory(query, agent_actions={}, localagi=None):
     q = json.loads(query)
     docs = chroma_client.similarity_search(q["reasoning"])
     text_res="Memories found in the database:\n"
@@ -465,87 +194,11 @@ def search_memory(query, agent_actions={}):
     #if args.postprocess:
     #    return post_process(text_res)
     #return text_res
-    return post_process(text_res)
-
-def generate_plan(user_input, agent_actions={}):
-    res = json.loads(user_input)
-    logger.info("--> Calculating plan: {description}", description=res["description"])
-    descriptions=action_description("",agent_actions)
-
-    plan_message = "The assistant replies with a plan to answer the request with a list of subtasks with logical steps. The reasoning includes a self-contained, detailed and descriptive instruction to fullfill the task."
-    if args.plan_message:
-        plan_message = args.plan_message
-    # plan_message = "The assistant replies with a plan of 3 steps to answer the request with a list of subtasks with logical steps. The reasoning includes a self-contained, detailed and descriptive instruction to fullfill the task."
-
-    messages = [
-            {"role": "user",
-             "content": f"""Transcript of AI assistant responding to user requests.
-{descriptions}
-
-Request: {plan_message}
-Thought: {res["description"]}
-Function call: """
-             }
-        ]
-    # get list of plannable actions
-    plannable_actions = []
-    for action in agent_actions:
-        if agent_actions[action]["plannable"]:
-            # append the key of the dict to plannable_actions
-            plannable_actions.append(action)
-
-    functions = [
-        {
-        "name": "plan",
-        "description": """Decide to do an action.""",
-        "parameters": {
-            "type": "object",
-            "properties": {
-            "subtasks": {
-                "type": "array",
-                "items": {
-                    "type": "object",
-                    "properties": {
-                        "reasoning": {
-                            "type": "string",
-                            "description": "subtask list",
-                        },
-                        "function": {
-                            "type": "string",
-                            "enum": plannable_actions,
-                        },
-                    },
-                },
-            },
-            },
-            "required": ["subtasks"]
-        }
-        },
-    ]
-    response = openai.ChatCompletion.create(
-        #model="gpt-3.5-turbo",
-        model=FUNCTIONS_MODEL,
-        messages=messages,
-        functions=functions,
-        api_base=LOCALAI_API_BASE+"/v1",
-        stop=None,
-        temperature=0.1,
-        #function_call="auto"
-        function_call={"name": "plan"},
-    )
-    response_message = response["choices"][0]["message"]
-    if response_message.get("function_call"):
-        function_name = response.choices[0].message["function_call"].name
-        function_parameters = response.choices[0].message["function_call"].arguments
-        # read the json from the string
-        res = json.loads(function_parameters)
-        logger.debug("<<< function name: {function_name} >>>> parameters: {parameters}", function_name=function_name,parameters=function_parameters)
-        return res
-    return {"action": REPLY_ACTION}
+    return localagi.post_process(text_res)
 
 # write file to disk with content
-def write_file(arg, agent_actions={}):
+def write_file(arg, agent_actions={}, localagi=None):
     arg = json.loads(arg)
     filename = arg["filename"]
     content = arg["content"]
@@ -605,7 +258,7 @@ def ddg(query: str, num_results: int, backend: str = "api") -> List[Dict[str, st
     return formatted_results
 
 ## Search on duckduckgo
-def search_duckduckgo(a, agent_actions={}):
+def search_duckduckgo(a, agent_actions={}, localagi=None):
     a = json.loads(a)
     list=ddg(a["query"], args.search_results)
@@ -622,153 +275,6 @@ def search_duckduckgo(a, agent_actions={}):
 
 ### End Agent capabilities
 ###
 
-### Main evaluate function
-### This function evaluates the user input and the conversation history.
-### It returns the conversation history with the latest response from the assistant.
-def evaluate(user_input, conversation_history = [],re_evaluate=False, agent_actions={},re_evaluation_in_progress=False, postprocess=False, subtaskContext=False, processed_messages=0):
-
-    messages = [
-        {
-        "role": "user",
-        "content": user_input,
-        }
-    ]
-
-    conversation_history.extend(messages)
-
-    # pulling the old history make the context grow exponentially
-    # and most importantly it repeates the first message with the commands again and again.
-    # it needs a bit of cleanup and process the messages and piggyback more LocalAI functions templates
-    # old_history = process_history(conversation_history)
-    # action_picker_message = "Conversation history:\n"+old_history
-    # action_picker_message += "\n"
-    action_picker_message = "Request: "+user_input
-
-    picker_actions = agent_actions
-    if args.force_action:
-        aa = {}
-        aa[args.force_action] = agent_actions[args.force_action]
-        picker_actions = aa
-        logger.info("==> Forcing action to '{action}' as requested by the user", action=args.force_action)
-
-    #if re_evaluate and not re_evaluation_in_progress:
-    #    observation = analyze(conversation_history, prefix=True)
-    #    action_picker_message+="\n\Thought: "+observation[-1]["content"]
-    if re_evaluation_in_progress:
-        observation = analyze(conversation_history)
-        action_picker_message="Decide from the output below if we have to do another action:\n"
-        action_picker_message+="```\n"+user_input+"\n```"
-        action_picker_message+="\n\nObservation: "+observation
-    # if there is no action to do, we can just reply to the user with REPLY_ACTION
-    try:
-        action = needs_to_do_action(action_picker_message,agent_actions=picker_actions)
-    except Exception as e:
-        logger.error("==> error: ")
-        logger.error(e)
-        action = {"action": REPLY_ACTION}
-
-    if action["action"] != REPLY_ACTION:
-        logger.info("==> LocalAGI wants to call '{action}'", action=action["action"])
-        #logger.info("==> Observation '{reasoning}'", reasoning=action["observation"])
-        logger.info("==> Reasoning '{reasoning}'", reasoning=action["reasoning"])
-        # Force executing a plan instead
-
-        reasoning = action["reasoning"]
-        if action["action"] == PLAN_ACTION:
-            logger.info("==> LocalAGI wants to create a plan that involves more actions ")
-
-        #if postprocess:
-        #    reasoning = post_process(reasoning)
-        function_completion_message=""
-        if processed_messages > 0:
-            function_completion_message += process_history(conversation_history)+"\n"
-        function_completion_message += "Request: "+user_input+"\nReasoning: "+reasoning
-        responses, function_results = process_functions(function_completion_message, action=action["action"], agent_actions=agent_actions)
-        # if there are no subtasks, we can just reply,
-        # otherwise we execute the subtasks
-        # First we check if it's an object
-        if isinstance(function_results, dict) and function_results.get("subtasks") and len(function_results["subtasks"]) > 0:
-            # cycle subtasks and execute functions
-            subtask_result=""
-            for subtask in function_results["subtasks"]:
-                #ctr="Context: "+user_input+"\nThought: "+action["reasoning"]+ "\nRequest: "+subtask["reasoning"]
-                #cr="Request: "+user_input+"\n"
-                cr=""
-                if subtask_result != "" and subtaskContext:
-                    # Include cumulative results of previous subtasks
-                    # TODO: this grows context, maybe we should use a different approach or summarize
-                    ##if postprocess:
-                    ##    cr+= "Subtask results: "+post_process(subtask_result)+"\n"
-                    ##else:
-                    cr+="\n"+subtask_result+"\n"
-                subtask_reasoning = subtask["reasoning"]
subtask["reasoning"] - cr+="Reasoning: "+action["reasoning"]+ "\n" - cr+="\nFunction to call:" +subtask["function"]+"\n" - logger.info("==> subtask '{subtask}' ({reasoning})", subtask=subtask["function"], reasoning=subtask_reasoning) - if postprocess: - cr+= "Assistant: "+post_process(subtask_reasoning) - else: - cr+= "Assistant: "+subtask_reasoning - subtask_response, function_results = process_functions(cr, subtask["function"],agent_actions=agent_actions) - subtask_result+=str(function_results)+"\n" - # if postprocess: - # subtask_result=post_process(subtask_result) - responses.append(subtask_response[-1]) - if re_evaluate: - ## Better output or this infinite loops.. - logger.info("-> Re-evaluate if another action is needed") - ## ? conversation history should go after the user_input maybe? - re_eval = "" - # This is probably not needed as already in the history: - #re_eval = user_input +"\n" - #re_eval += "Conversation history: \n" - if postprocess: - re_eval+= post_process(process_history(responses[1:])) +"\n" - else: - re_eval+= process_history(responses[1:]) +"\n" - responses = evaluate(re_eval, responses, re_evaluate,agent_actions=agent_actions,re_evaluation_in_progress=True) - - if re_evaluation_in_progress: - conversation_history.extend(responses) - return conversation_history - - # unwrap the list of responses - conversation_history.append(responses[-1]) - - #responses = converse(responses) - - # TODO: this needs to be optimized - responses = analyze(responses[1:], suffix=f"Return an appropriate answer given the context above\n") - - # add responses to conversation history by extending the list - conversation_history.append( - { - "role": "assistant", - "content": responses, - } - ) - # logger.info the latest response from the conversation history - logger.info(conversation_history[-1]["content"]) - tts(conversation_history[-1]["content"]) - else: - logger.info("==> no action needed") - - if re_evaluation_in_progress: - logger.info("==> LocalAGI has completed the user request") - logger.info("==> LocalAGI will reply to the user") - return conversation_history - - # get the response from the model - response = converse(conversation_history) - - # add the response to the conversation history by extending the list - conversation_history.extend(response) - # logger.info the latest response from the conversation history - logger.info(conversation_history[-1]["content"]) - tts(conversation_history[-1]["content"]) - return conversation_history - - ### Agent action definitions agent_actions = { "search_internet": { @@ -849,89 +355,78 @@ agent_actions = { } }, }, - PLAN_ACTION: { - "function": generate_plan, - "plannable": False, - "description": 'The assistant for solving complex tasks that involves calling more functions in sequence, replies with the action "'+PLAN_ACTION+'".', - "signature": { - "name": PLAN_ACTION, - "description": """Plan complex tasks.""", - "parameters": { - "type": "object", - "properties": { - "description": { - "type": "string", - "description": "reasoning behind the planning" - }, - }, - "required": ["description"] - } - }, - }, - REPLY_ACTION: { - "function": None, - "plannable": False, - "description": 'For replying to the user, the assistant replies with the action "'+REPLY_ACTION+'" and the reply to the user directly when there is nothing to do.', - }, } -conversation_history = [] +if __name__ == "__main__": + conversation_history = [] -# Set a system prompt if SYSTEM_PROMPT is set -if SYSTEM_PROMPT != "": - conversation_history.append({ - "role": "system", 
- "content": SYSTEM_PROMPT - }) - -logger.info("Welcome to LocalAGI") - -# Skip avatar creation if --skip-avatar is set -if not args.skip_avatar: - logger.info("Creating avatar, please wait...") - display_avatar() - -actions = "" -for action in agent_actions: - actions+=" '"+action+"'" -logger.info("LocalAGI internally can do the following actions:{actions}", actions=actions) - -if not args.prompt: - logger.info(">>> Interactive mode <<<") -else: - logger.info(">>> Prompt mode <<<") - logger.info(args.prompt) - -processed_messages = 0 -# IF in prompt mode just evaluate, otherwise loop -if args.prompt: - conversation_history=evaluate( - args.prompt, - conversation_history, - re_evaluate=args.re_evaluate, + # Create a LocalAGI instance + logger.info("Creating LocalAGI instance") + localagi = LocalAGI( agent_actions=agent_actions, - # Enable to lower context usage but increases LLM calls - postprocess=args.postprocess, - subtaskContext=args.subtaskContext, - processed_messages=processed_messages, - ) - processed_messages+=1 + embeddings_model=EMBEDDINGS_MODEL, + embeddings_api_base=EMBEDDINGS_API_BASE, + llm_model=LLM_MODEL, + tts_model=VOICE_MODEL, + tts_api_base=TTS_API_BASE, + functions_model=FUNCTIONS_MODEL, + api_base=LOCALAI_API_BASE, + stablediffusion_api_base=IMAGE_API_BASE, + stablediffusion_model=STABLEDIFFUSION_MODEL, + force_action=args.force_action, + plan_message=args.plan_message, + ) -if not args.prompt or args.interactive: - # TODO: process functions also considering the conversation history? conversation history + input - logger.info(">>> Ready! What can I do for you? ( try with: plan a roadtrip to San Francisco ) <<<") + # Set a system prompt if SYSTEM_PROMPT is set + if SYSTEM_PROMPT != "": + conversation_history.append({ + "role": "system", + "content": SYSTEM_PROMPT + }) - while True: - user_input = input(">>> ") - # we are going to use the args to change the evaluation behavior - conversation_history=evaluate( - user_input, + logger.info("Welcome to LocalAGI") + + # Skip avatar creation if --skip-avatar is set + if not args.skip_avatar: + logger.info("Creating avatar, please wait...") + display_avatar(localagi) + + actions = "" + for action in agent_actions: + actions+=" '"+action+"'" + logger.info("LocalAGI internally can do the following actions:{actions}", actions=actions) + + if not args.prompt: + logger.info(">>> Interactive mode <<<") + else: + logger.info(">>> Prompt mode <<<") + logger.info(args.prompt) + + # IF in prompt mode just evaluate, otherwise loop + if args.prompt: + conversation_history=localagi.evaluate( + args.prompt, conversation_history, re_evaluate=args.re_evaluate, - agent_actions=agent_actions, # Enable to lower context usage but increases LLM calls postprocess=args.postprocess, subtaskContext=args.subtaskContext, - processed_messages=processed_messages, ) - processed_messages+=1 \ No newline at end of file + localagi.tts_play(conversation_history[-1]["content"]) + + if not args.prompt or args.interactive: + # TODO: process functions also considering the conversation history? conversation history + input + logger.info(">>> Ready! What can I do for you? 
+
+        while True:
+            user_input = input(">>> ")
+            # we are going to use the args to change the evaluation behavior
+            conversation_history=localagi.evaluate(
+                user_input,
+                conversation_history,
+                re_evaluate=args.re_evaluate,
+                # Enable to lower context usage but increases LLM calls
+                postprocess=args.postprocess,
+                subtaskContext=args.subtaskContext,
+            )
+            localagi.tts_play(conversation_history[-1]["content"])
diff --git a/src/localagi/__init__.py b/src/localagi/__init__.py
new file mode 100644
index 0000000..5bb5e2c
--- /dev/null
+++ b/src/localagi/__init__.py
@@ -0,0 +1 @@
+from .localagi import *
\ No newline at end of file
diff --git a/src/localagi/localagi.py b/src/localagi/localagi.py
new file mode 100644
index 0000000..be53e3a
--- /dev/null
+++ b/src/localagi/localagi.py
@@ -0,0 +1,592 @@
+import os
+import openai
+import requests
+from loguru import logger
+import json
+
+DEFAULT_API_BASE = "http://api:8080"
+VOICE_MODEL = "en-us-kathleen-low.onnx"
+STABLEDIFFUSION_MODEL = "stablediffusion"
+FUNCTIONS_MODEL = "functions"
+EMBEDDINGS_MODEL = "all-MiniLM-L6-v2"
+LLM_MODEL = "gpt-4"
+
+# LocalAGI class
+class LocalAGI:
+    # Constructor
+    def __init__(self,
+                 plan_action="plan",
+                 reply_action="reply",
+                 force_action="",
+                 agent_actions={},
+                 plan_message="",
+                 api_base=DEFAULT_API_BASE,
+                 tts_api_base="",
+                 stablediffusion_api_base="",
+                 embeddings_api_base="",
+                 tts_model=VOICE_MODEL,
+                 stablediffusion_model=STABLEDIFFUSION_MODEL,
+                 functions_model=FUNCTIONS_MODEL,
+                 embeddings_model=EMBEDDINGS_MODEL,
+                 llm_model=LLM_MODEL,
+                 tts_player="aplay",
+                 ):
+        self.api_base = api_base
+        self.agent_actions = agent_actions
+        self.plan_message = plan_message
+        self.force_action = force_action
+        self.processed_messages=0
+        self.tts_player = tts_player
+        self.agent_actions[plan_action] = {
+            "function": self.generate_plan,
+            "plannable": False,
+            "description": 'The assistant for solving complex tasks that involves calling more functions in sequence, replies with the action "'+plan_action+'".',
+            "signature": {
+                "name": plan_action,
+                "description": """Plan complex tasks.""",
+                "parameters": {
+                    "type": "object",
+                    "properties": {
+                        "description": {
+                            "type": "string",
+                            "description": "reasoning behind the planning"
+                        },
+                    },
+                    "required": ["description"]
+                }
+            },
+        }
+        self.agent_actions[reply_action] = {
+            "function": None,
+            "plannable": False,
+            "description": 'For replying to the user, the assistant replies with the action "'+reply_action+'" and the reply to the user directly when there is nothing to do.',
+        }
+        self.tts_api_base = tts_api_base if tts_api_base else self.api_base
+        self.stablediffusion_api_base = stablediffusion_api_base if stablediffusion_api_base else self.api_base
+        self.embeddings_api_base = embeddings_api_base if embeddings_api_base else self.api_base
+        self.tts_model = tts_model
+        self.stablediffusion_model = stablediffusion_model
+        self.functions_model = functions_model
+        self.embeddings_model = embeddings_model
+        self.llm_model = llm_model
+        # keep both built-in action names around; evaluate() needs them
+        self.plan_action = plan_action
+        self.reply_action = reply_action
+
+    # Function to create images with LocalAI.
+    # "model" is accepted for compatibility with callers such as main.py's
+    # display_avatar(); the served model is configured on the endpoint side.
+    def get_avatar(self, input_text, model=None):
+        response = openai.Image.create(
+            prompt=input_text,
+            n=1,
+            size="128x128",
+            api_base=self.stablediffusion_api_base+"/v1"
+        )
+        return response['data'][0]['url']
+
+    def tts_play(self, input_text):
+        output_file_path = '/tmp/output.wav'
+        self.tts(input_text, output_file_path)
+        try:
+            # Use aplay to play the audio
+            os.system(f"{self.tts_player} {output_file_path}")
+            # remove the audio file
+            os.remove(output_file_path)
+        except Exception:
+            logger.info('Unable to play audio')
+
+    # Function to create audio with LocalAI
+    def tts(self, input_text, output_file_path):
+        # strip newlines from text
+        input_text = input_text.replace("\n", ".")
+
+        # get from OPENAI_API_BASE env var
+        url = self.tts_api_base + '/tts'
+        headers = {'Content-Type': 'application/json'}
+        data = {
+            "input": input_text,
+            "model": self.tts_model,
+        }
+
+        response = requests.post(url, headers=headers, data=json.dumps(data))
+
+        if response.status_code == 200:
+            with open(output_file_path, 'wb') as f:
+                f.write(response.content)
+            logger.info('Audio file saved successfully: {}', output_file_path)
+        else:
+            logger.info('Request failed with status code {}', response.status_code)
+
+    # Function to analyze the user input and pick the next action to do
+    def needs_to_do_action(self, user_input, agent_actions={}):
+        if len(agent_actions) == 0:
+            agent_actions = self.agent_actions
+        # Get the descriptions and the actions name (the keys)
+        descriptions=self.action_description("", agent_actions)
+
+        messages = [
+                {"role": "user",
+                 "content": f"""Transcript of AI assistant responding to user requests. Replies with the action to perform and the reasoning.
+                {descriptions}"""},
+                {"role": "user",
+                 "content": f"""{user_input}
+                Function call: """
+                 }
+            ]
+        functions = [
+            {
+            "name": "intent",
+            "description": """Decide to do an action.""",
+            "parameters": {
+                "type": "object",
+                "properties": {
+                "confidence": {
+                    "type": "number",
+                    "description": "confidence of the action"
+                },
+                "reasoning": {
+                    "type": "string",
+                    "description": "reasoning behind the intent"
+                },
+                # "observation": {
+                #     "type": "string",
+                #     "description": "reasoning behind the intent"
+                # },
+                "action": {
+                    "type": "string",
+                    "enum": list(agent_actions.keys()),
+                    "description": "user intent"
+                },
+                },
+                "required": ["action"]
+            }
+            },
+        ]
+        response = openai.ChatCompletion.create(
+            #model="gpt-3.5-turbo",
+            model=self.functions_model,
+            messages=messages,
+            request_timeout=1200,
+            functions=functions,
+            api_base=self.api_base+"/v1",
+            stop=None,
+            temperature=0.1,
+            #function_call="auto"
+            function_call={"name": "intent"},
+        )
+        response_message = response["choices"][0]["message"]
+        if response_message.get("function_call"):
+            function_name = response.choices[0].message["function_call"].name
+            function_parameters = response.choices[0].message["function_call"].arguments
+            # read the json from the string
+            res = json.loads(function_parameters)
+            logger.debug(">>> function name: "+function_name)
+            logger.debug(">>> function parameters: "+function_parameters)
+            return res
+        return {"action": self.reply_action}
+
+    # This is used to collect the descriptions of the agent actions, used to populate the LLM prompt
+    def action_description(self, action, agent_actions):
+        descriptions=""
+        # generate descriptions of actions that the agent can pick
+        for a in agent_actions:
+            if ( action != "" and action == a ) or (action == ""):
+                descriptions+=agent_actions[a]["description"]+"\n"
+        return descriptions
+
+    ### This function is used to process the functions given a user input.
+    ### It picks a function, executes it and returns the list of messages containing the result.
+    def process_functions(self, user_input, action=""):
+
+        descriptions=self.action_description(action, self.agent_actions)
+
+        messages = [
+            #  {"role": "system", "content": "You are a helpful assistant."},
+                {"role": "user",
+                 "content": f"""Transcript of AI assistant responding to user requests. Replies with the action to perform, including reasoning, and the confidence interval from 0 to 100.
+                {descriptions}"""},
+                {"role": "user",
+                 "content": f"""{user_input}
+                Function call: """
+                 }
+            ]
+        response = self.function_completion(messages, action=action)
+        response_message = response["choices"][0]["message"]
+        response_result = ""
+        function_result = {}
+        if response_message.get("function_call"):
+            function_name = response.choices[0].message["function_call"].name
+            function_parameters = response.choices[0].message["function_call"].arguments
+            logger.debug("==> function parameters: {function_parameters}",function_parameters=function_parameters)
+            function_to_call = self.agent_actions[function_name]["function"]
+
+            function_result = function_to_call(function_parameters, agent_actions=self.agent_actions, localagi=self)
+            logger.info("==> function result: {function_result}", function_result=function_result)
+            messages.append(
+                {
+                    "role": "assistant",
+                    "content": None,
+                    "function_call": {"name": function_name, "arguments": function_parameters,},
+                }
+            )
+            messages.append(
+                {
+                    "role": "function",
+                    "name": function_name,
+                    "content": str(function_result)
+                }
+            )
+        return messages, function_result
+
+    ### function_completion is used to autocomplete functions given a list of messages
+    def function_completion(self, messages, action=""):
+        function_call = "auto"
+        if action != "":
+            function_call={"name": action}
+        logger.debug("==> function name: {function_call}", function_call=function_call)
+        # get the functions from the signatures of the agent actions, if exists
+        functions = []
+        for action in self.agent_actions:
+            if self.agent_actions[action].get("signature"):
+                functions.append(self.agent_actions[action]["signature"])
+        response = openai.ChatCompletion.create(
+            #model="gpt-3.5-turbo",
+            model=self.functions_model,
+            messages=messages,
+            functions=functions,
+            request_timeout=1200,
+            stop=None,
+            api_base=self.api_base+"/v1",
+            temperature=0.1,
+            function_call=function_call
+        )
+
+        return response
+
+    # Rework the content of each message in the history in a way that is understandable by the LLM
+    # TODO: switch to templates (?)
+    def process_history(self, conversation_history):
+        messages = ""
+        for message in conversation_history:
+            # if there is content append it
+            if message.get("content") and message["role"] == "function":
+                messages+="Function result: \n" + message["content"]+"\n"
+            elif message.get("function_call"):
+                # encode message["function_call"] to JSON and append it
+                fcall = json.dumps(message["function_call"])
+                parameters = "calling " + message["function_call"]["name"]+" with arguments:"
+                args=json.loads(message["function_call"]["arguments"])
+                for arg in args:
+                    logger.debug(arg)
+                    logger.debug(args)
+                    v=args[arg]
+                    parameters+=f""" {arg}=\"{v}\""""
+                messages+= parameters+"\n"
+            elif message.get("content") and message["role"] == "user":
+                messages+=message["content"]+"\n"
+            elif message.get("content") and message["role"] == "assistant":
+                messages+="Assistant message: "+message["content"]+"\n"
+        return messages
+
+    def converse(self, responses):
+        response = openai.ChatCompletion.create(
+            model=self.llm_model,
+            messages=responses,
+            stop=None,
+            api_base=self.api_base+"/v1",
+            request_timeout=1200,
+            temperature=0.1,
+        )
+        responses.append(
+            {
+                "role": "assistant",
+                "content": response.choices[0].message["content"],
+            }
+        )
+        return responses
+
+    ### Fine-tune a string before feeding it into the LLM
+
+    def analyze(self, responses, prefix="Analyze the following text highlighting the relevant information and identify a list of actions to take if there are any. If there are errors, suggest solutions to fix them", suffix=""):
+        string = self.process_history(responses)
+        messages = []
+
+        if prefix != "":
+            messages = [
+                {
+                "role": "user",
+                "content": f"""{prefix}:
+
+                ```
+                {string}
+                ```
+                """,
+                }
+            ]
+        else:
+            messages = [
+                {
+                "role": "user",
+                "content": f"""{string}""",
+                }
+            ]
+
+        if suffix != "":
+            messages[0]["content"]+=f"""{suffix}"""
+
+        response = openai.ChatCompletion.create(
+            model=self.llm_model,
+            messages=messages,
+            stop=None,
+            api_base=self.api_base+"/v1",
+            request_timeout=1200,
+            temperature=0.1,
+        )
+        return response.choices[0].message["content"]
+
+    def post_process(self, string):
+        messages = [
+            {
+            "role": "user",
+            "content": f"""Summarize the following text, keeping the relevant information:
+
+            ```
+            {string}
+            ```
+            """,
+            }
+        ]
+        logger.info("==> Post processing: {string}", string=string)
+        # get the response from the model
+        response = openai.ChatCompletion.create(
+            model=self.llm_model,
+            messages=messages,
+            api_base=self.api_base+"/v1",
+            stop=None,
+            temperature=0.1,
+            request_timeout=1200,
+        )
+        result = response["choices"][0]["message"]["content"]
+        logger.info("==> Processed: {string}", string=result)
+        return result
+
+    def generate_plan(self, user_input, agent_actions={}, localagi=None):
+        res = json.loads(user_input)
+        logger.info("--> Calculating plan: {description}", description=res["description"])
+        descriptions=self.action_description("",agent_actions)
+
+        plan_message = "The assistant replies with a plan to answer the request with a list of subtasks with logical steps. The reasoning includes a self-contained, detailed and descriptive instruction to fulfill the task."
+        if self.plan_message:
+            plan_message = self.plan_message
+        # plan_message = "The assistant replies with a plan of 3 steps to answer the request with a list of subtasks with logical steps. The reasoning includes a self-contained, detailed and descriptive instruction to fulfill the task."
+
+        messages = [
+                {"role": "user",
+                 "content": f"""Transcript of AI assistant responding to user requests.
+                {descriptions}
+
+                Request: {plan_message}
+                Thought: {res["description"]}
+                Function call: """
+                 }
+            ]
+        # get list of plannable actions
+        plannable_actions = []
+        for action in agent_actions:
+            if agent_actions[action]["plannable"]:
+                # append the key of the dict to plannable_actions
+                plannable_actions.append(action)
+
+        functions = [
+            {
+            "name": "plan",
+            "description": """Decide to do an action.""",
+            "parameters": {
+                "type": "object",
+                "properties": {
+                "subtasks": {
+                    "type": "array",
+                    "items": {
+                        "type": "object",
+                        "properties": {
+                            "reasoning": {
+                                "type": "string",
+                                "description": "subtask list",
+                            },
+                            "function": {
+                                "type": "string",
+                                "enum": plannable_actions,
+                            },
+                        },
+                    },
+                },
+                },
+                "required": ["subtasks"]
+            }
+            },
+        ]
+        response = openai.ChatCompletion.create(
+            #model="gpt-3.5-turbo",
+            model=self.functions_model,
+            messages=messages,
+            functions=functions,
+            api_base=self.api_base+"/v1",
+            stop=None,
+            temperature=0.1,
+            #function_call="auto"
+            function_call={"name": "plan"},
+        )
+        response_message = response["choices"][0]["message"]
+        if response_message.get("function_call"):
+            function_name = response.choices[0].message["function_call"].name
+            function_parameters = response.choices[0].message["function_call"].arguments
+            # read the json from the string
+            res = json.loads(function_parameters)
+            logger.debug("<<< function name: {function_name} >>>> parameters: {parameters}", function_name=function_name,parameters=function_parameters)
+            return res
+        return {"action": self.reply_action}
+
+    def evaluate(self, user_input, conversation_history = [], re_evaluate=False, re_evaluation_in_progress=False, postprocess=False, subtaskContext=False):
+        messages = [
+            {
+            "role": "user",
+            "content": user_input,
+            }
+        ]
+
+        conversation_history.extend(messages)
+
+        # pulling the old history makes the context grow exponentially
+        # and most importantly it repeats the first message with the commands again and again.
+        # it needs a bit of cleanup and process the messages and piggyback more LocalAI functions templates
+        # old_history = process_history(conversation_history)
+        # action_picker_message = "Conversation history:\n"+old_history
+        # action_picker_message += "\n"
+        action_picker_message = "Request: "+user_input
+
+        picker_actions = self.agent_actions
+        if self.force_action:
+            aa = {}
+            aa[self.force_action] = self.agent_actions[self.force_action]
+            picker_actions = aa
+            logger.info("==> Forcing action to '{action}' as requested by the user", action=self.force_action)
+
+        #if re_evaluate and not re_evaluation_in_progress:
+        #    observation = analyze(conversation_history, prefix=True)
+        #    action_picker_message+="\n\Thought: "+observation[-1]["content"]
+        if re_evaluation_in_progress:
+            observation = self.analyze(conversation_history)
+            action_picker_message="Decide from the output below if we have to do another action:\n"
+            action_picker_message+="```\n"+user_input+"\n```"
+            action_picker_message+="\n\nObservation: "+observation
+        # if there is no action to do, we can just reply to the user with the reply action
+        try:
+            action = self.needs_to_do_action(action_picker_message,agent_actions=picker_actions)
+        except Exception as e:
+            logger.error("==> error: ")
+            logger.error(e)
+            action = {"action": self.reply_action}
+
+        if action["action"] != self.reply_action:
+            logger.info("==> LocalAGI wants to call '{action}'", action=action["action"])
+            #logger.info("==> Observation '{reasoning}'", reasoning=action["observation"])
+            logger.info("==> Reasoning '{reasoning}'", reasoning=action["reasoning"])
+            # Force executing a plan instead
+
+            reasoning = action["reasoning"]
+            if action["action"] == self.plan_action:
+                logger.info("==> LocalAGI wants to create a plan that involves more actions ")
+
+            #if postprocess:
+            #    reasoning = post_process(reasoning)
+            function_completion_message=""
+            if self.processed_messages > 0:
+                function_completion_message += self.process_history(conversation_history)+"\n"
+            function_completion_message += "Request: "+user_input+"\nReasoning: "+reasoning
+            responses, function_results = self.process_functions(function_completion_message, action=action["action"])
+            # if there are no subtasks, we can just reply,
+            # otherwise we execute the subtasks
+            # First we check if it's an object
+            if isinstance(function_results, dict) and function_results.get("subtasks") and len(function_results["subtasks"]) > 0:
+                # cycle subtasks and execute functions
+                subtask_result=""
+                for subtask in function_results["subtasks"]:
+                    #ctr="Context: "+user_input+"\nThought: "+action["reasoning"]+ "\nRequest: "+subtask["reasoning"]
+                    #cr="Request: "+user_input+"\n"
+                    cr=""
+                    if subtask_result != "" and subtaskContext:
+                        # Include cumulative results of previous subtasks
+                        # TODO: this grows context, maybe we should use a different approach or summarize
+                        ##if postprocess:
+                        ##    cr+= "Subtask results: "+post_process(subtask_result)+"\n"
+                        ##else:
+                        cr+="\n"+subtask_result+"\n"
+                    subtask_reasoning = subtask["reasoning"]
+                    cr+="Reasoning: "+action["reasoning"]+ "\n"
+                    cr+="\nFunction to call:" +subtask["function"]+"\n"
+                    logger.info("==> subtask '{subtask}' ({reasoning})", subtask=subtask["function"], reasoning=subtask_reasoning)
+                    if postprocess:
+                        cr+= "Assistant: "+self.post_process(subtask_reasoning)
+                    else:
+                        cr+= "Assistant: "+subtask_reasoning
+                    subtask_response, function_results = self.process_functions(cr, subtask["function"])
+                    subtask_result+=str(function_results)+"\n"
+                # if postprocess:
+                #     subtask_result=post_process(subtask_result)
+                responses.append(subtask_response[-1])
+            if re_evaluate:
+                ## Better output or this infinite loops..
+                logger.info("-> Re-evaluate if another action is needed")
+                ## ? conversation history should go after the user_input maybe?
+                re_eval = ""
+                # This is probably not needed as already in the history:
+                #re_eval = user_input +"\n"
+                #re_eval += "Conversation history: \n"
+                if postprocess:
+                    re_eval+= self.post_process(self.process_history(responses[1:])) +"\n"
+                else:
+                    re_eval+= self.process_history(responses[1:]) +"\n"
+                responses = self.evaluate(re_eval,
+                    responses,
+                    re_evaluate,
+                    re_evaluation_in_progress=True)
+
+            if re_evaluation_in_progress:
+                conversation_history.extend(responses)
+                return conversation_history
+
+            # unwrap the list of responses
+            conversation_history.append(responses[-1])
+
+            #responses = converse(responses)
+
+            # TODO: this needs to be optimized
+            responses = self.analyze(responses[1:], suffix=f"Return an appropriate answer given the context above\n")
+
+            # add responses to conversation history by extending the list
+            conversation_history.append(
+                {
+                    "role": "assistant",
+                    "content": responses,
+                }
+            )
+
+            self.processed_messages+=1
+            # logger.info the latest response from the conversation history
+            logger.info(conversation_history[-1]["content"])
+            #self.tts(conversation_history[-1]["content"])
+        else:
+            logger.info("==> no action needed")
+
+            if re_evaluation_in_progress:
+                logger.info("==> LocalAGI has completed the user request")
+                logger.info("==> LocalAGI will reply to the user")
+                return conversation_history
+
+            # get the response from the model
+            response = self.converse(conversation_history)
+            self.processed_messages+=1
+
+            # add the response to the conversation history by extending the list
+            conversation_history.extend(response)
+            # logger.info the latest response from the conversation history
+            logger.info(conversation_history[-1]["content"])
+            #self.tts(conversation_history[-1]["content"])
+        return conversation_history
\ No newline at end of file
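
Usage: with the package installed (`pip install .`, which the new Dockerfile layer now runs), the class can be imported from any script rather than only from main.py. A minimal sketch of driving it directly; the endpoint URL and the API key value are placeholders (LocalAI does not validate the key, but the openai client expects one to be set), and custom `agent_actions` entries would follow the same dict shape as main.py's "search_internet" definition:

    import openai
    from localagi import LocalAGI

    openai.api_key = "sk-dummy"  # placeholder; LocalAI ignores it

    # No custom actions here: the constructor still registers the built-in
    # "plan" and "reply" actions on top of whatever is passed in.
    agi = LocalAGI(
        agent_actions={},
        api_base="http://localhost:8080",  # placeholder; defaults to http://api:8080
        llm_model="gpt-4",
    )

    # evaluate() appends to and returns the conversation history.
    history = agi.evaluate("What is the capital of France?", [])
    print(history[-1]["content"])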