Added initial API logic and CRUD for the graph; connected the vector DB and the graph

This commit is contained in:
Vasilije 2023-11-16 00:07:16 +01:00
parent 35f46e6b28
commit 6914c86c72
5 changed files with 275 additions and 27 deletions

View file

@ -1,10 +1,67 @@
from langchain.prompts import ChatPromptTemplate
import json
# TODO: add all classifiers here
from langchain.chains import create_extraction_chain
from langchain.chat_models import ChatOpenAI
from ..config import Config
config = Config()
config.load()
OPENAI_API_KEY = config.openai_key
from langchain.document_loaders import TextLoader
from langchain.document_loaders import DirectoryLoader
async def classify_documents(query):
    """Summarize and classify a document via an OpenAI function call.

    Args:
        query: Free-form text identifying the document (e.g. a book title).

    Returns:
        dict | None: The model's function-call arguments with keys
        ``DocumentCategory``, ``Title`` and ``Summary``, or ``None`` when the
        model returns malformed JSON.
    """
    llm = ChatOpenAI(temperature=0, model=config.model)
    prompt_classify = ChatPromptTemplate.from_template(
        """You are a summarizer and classifier. Determine what book this is and where does it belong in the output : {query}"""
    )
    # JSON-schema function definition the model is forced to call (see `bind` below).
    json_structure = [{
        "name": "summarizer",
        "description": "Summarization and classification",
        "parameters": {
            "type": "object",
            "properties": {
                "DocumentCategory": {
                    "type": "string",
                    "description": "The classification of documents in groups such as legal, medical, etc."
                },
                "Title": {
                    "type": "string",
                    "description": "The title of the document"
                },
                "Summary": {
                    "type": "string",
                    "description": "The summary of the document"
                }
            }, "required": ["DocumentCategory", "Title", "Summary"]}
    }]
    chain_filter = prompt_classify | llm.bind(function_call={"name": "summarizer"}, functions=json_structure)
    classifier_output = await chain_filter.ainvoke({"query": query})
    arguments_str = classifier_output.additional_kwargs['function_call']['arguments']
    print("This is the arguments string", arguments_str)
    # Guard against the model emitting invalid JSON instead of crashing the caller.
    try:
        arguments_dict = json.loads(arguments_str)
    except json.JSONDecodeError as e:
        print("Classifier returned malformed JSON arguments:", e)
        return None
    # FIX: function-call `arguments` ARE the schema properties
    # ({"DocumentCategory": ..., "Title": ..., "Summary": ...}); they are not
    # nested under the function name, so .get('summarizer') always returned None.
    print("This is the classifier value", arguments_dict)
    return arguments_dict
# classify retrievals according to type of retrieval
def classify_retrieval():
    """Stub: retrieval-type classification is not implemented yet."""
    return None
@ -12,4 +69,4 @@ def classify_retrieval():
# classify documents according to type of document
def classify_call():
    """Stub: not implemented yet."""
    return None

View file

@ -485,23 +485,14 @@ class Neo4jGraphDB(AbstractGraphDB):
def update_document_node_with_namespace(self, user_id: str, vectordb_namespace: str, document_title: str):
    """Build a Cypher query that stores the vector-DB namespace on a user's Document node.

    Matches ``(User)-[:HAS_SEMANTIC_MEMORY]->(SemanticMemory)-[:HAS_DOCUMENT]->(Document)``
    for the given user and title, sets ``document.vectordbNamespace`` and returns
    the document.

    Args:
        user_id: Graph id of the owning user.
        vectordb_namespace: Namespace string to persist on the document node.
        document_title: Exact title of the Document node to update.

    Returns:
        str: The Cypher query text; the caller is responsible for executing it.
    """
    def _esc(value: str) -> str:
        # SECURITY: values are interpolated into the query text, so escape
        # backslashes and single quotes to prevent breaking out of the string
        # literal. Prefer driver parameters ($user_id, ...) where callers can
        # pass them alongside the query.
        return str(value).replace("\\", "\\\\").replace("'", "\\'")

    cypher_query = f'''
    MATCH (user:User {{userId: '{_esc(user_id)}' }})-[:HAS_SEMANTIC_MEMORY]->(semantic:SemanticMemory)-[:HAS_DOCUMENT]->(document:Document {{title: '{_esc(document_title)}'}})
    SET document.vectordbNamespace = '{_esc(vectordb_namespace)}'
    RETURN document
    '''
    return cypher_query

View file

@ -13,8 +13,6 @@ class Operation(Base):
# Owning user (FK to users.id); indexed for per-user operation lookups.
user_id = Column(String, ForeignKey('users.id'), index=True) # Link to User
# Kind of operation, e.g. "DATA_LOAD" or "USER_QUERY_TO_GRAPH_DB".
operation_type = Column(String, nullable=True)
# Lifecycle status, e.g. "RUNNING", "FINISHED", "SUCCESS".
operation_status = Column(String, nullable=True)
# Presumably serialized operation parameters — TODO confirm the expected format.
operation_params = Column(String, nullable=True)
# Number of files handled by this operation — assumed; verify against writers.
number_of_files = Column(Integer, nullable=True)
# Optional link to a test set; indexed for test-set lookups.
test_set_id = Column(String, ForeignKey('test_sets.id'), index=True)
created_at = Column(DateTime, default=datetime.utcnow)
# Only populated on UPDATE (no insert default).
updated_at = Column(DateTime, onupdate=datetime.utcnow)

View file

@ -1,3 +1,6 @@
import os
import random
import string
import uuid
from graphviz import Digraph
@ -32,7 +35,44 @@ class Edge:
# dot.render("knowledge_graph.gv", view=True)
#
#
def get_document_names(doc_input):
    """Normalize *doc_input* into a list of document names.

    Accepts a list (returned unchanged), a folder path (names of the files it
    contains), a single document file path (its base name), or a bare document
    name given as a string.

    Args:
        doc_input: Folder path, file path, document name, or list of names.

    Returns:
        list: Document names; an empty list for unrecognized input.

    Example usage:
        - Folder path: get_document_names(".data")
        - Single document file path: get_document_names(".data/example.pdf")
        - Document name provided as a string: get_document_names("example.docx")
    """
    if isinstance(doc_input, list):
        return doc_input
    if os.path.isdir(doc_input):
        # Folder: keep regular files only (sub-directories are skipped).
        return [
            entry
            for entry in os.listdir(doc_input)
            if os.path.isfile(os.path.join(doc_input, entry))
        ]
    if os.path.isfile(doc_input):
        # Single document file path.
        return [os.path.basename(doc_input)]
    if isinstance(doc_input, str):
        # Not a path on disk: treat the string as a literal document name.
        return [doc_input]
    # Unrecognized input.
    return []
def format_dict(d):
# Initialize an empty list to store formatted items
@ -76,4 +116,13 @@ def create_edge_variable_mapping(edges):
# Construct a unique identifier for the edge
variable_name = f"edge{edge['source']}to{edge['target']}".lower()
mapping[(edge['source'], edge['target'])] = variable_name
return mapping
return mapping
def generate_letter_uuid(length=8):
    """Generate a random string of uppercase letters with the specified length."""
    # random.choices draws `length` letters from A-Z in one call.
    return "".join(random.choices(string.ascii_uppercase, k=length))

View file

@ -8,10 +8,12 @@ from dotenv import load_dotenv
from level_4.cognitive_architecture.database.postgres.database_crud import session_scope
from cognitive_architecture.database.postgres.database import AsyncSessionLocal
from cognitive_architecture.utils import generate_letter_uuid
import instructor
from openai import OpenAI
from level_4.cognitive_architecture.vectorstore_manager import Memory
# Adds response_model to ChatCompletion
# Allows the return of Pydantic model rather than raw JSON
instructor.patch(OpenAI())
@ -28,7 +30,7 @@ config.load()
print(config.model)
print(config.openai_key)
from cognitive_architecture.utils import get_document_names
import logging
@ -46,6 +48,17 @@ async def get_vectordb_namespace(session: AsyncSession, user_id: str):
logging.error(f"An error occurred while retrieving the Vectordb_namespace: {str(e)}")
return None
async def get_vectordb_document_name(session: AsyncSession, user_id: str):
    """Return the user's document names, newest first.

    Args:
        session: Async SQLAlchemy session.
        user_id: Id of the owning user.

    Returns:
        list | None: Document names ordered by ``created_at`` descending, or
        ``None`` when the lookup fails.
    """
    try:
        result = await session.execute(
            select(DocsModel.doc_name).where(DocsModel.user_id == user_id).order_by(DocsModel.created_at.desc())
        )
        doc_names = [row[0] for row in result.fetchall()]
        return doc_names
    except Exception as e:
        # FIX: this helper fetches document names, not the vector-DB namespace;
        # the previous message was copy-pasted from get_vectordb_namespace.
        logging.error(f"An error occurred while retrieving the document names: {str(e)}")
        return None
# async def retrieve_job_by_id(session, user_id, job_id):
# try:
# result = await session.execute(
@ -61,7 +74,7 @@ async def get_vectordb_namespace(session: AsyncSession, user_id: str):
async def update_document_vectordb_namespace(postgres_session: AsyncSession, user_id: str, namespace: str = None):
async def update_document_vectordb_namespace(postgres_session: AsyncSession, user_id: str, namespace: str = None, job_id:str=None):
"""
Update the Document node with the Vectordb_namespace for the given user. If the namespace is not provided,
it will be retrieved from the PostgreSQL database.
@ -93,6 +106,124 @@ async def update_document_vectordb_namespace(postgres_session: AsyncSession, use
async def load_documents_to_vectorstore(session: AsyncSession, user_id: str, job_id:str=None, loader_settings:dict=None):
    """Load the user's documents into the vector store and record the operation.

    Registers a RUNNING Operation row, records one DocsModel row per document
    found under ``loader_settings["path"]``, creates a per-run semantic-memory
    namespace, ingests the documents through a dynamically attached
    ``add_memories`` method, then records a FINISHED Operation row.

    Args:
        session: Async SQLAlchemy session used for bookkeeping rows.
        user_id: Owner of the documents.
        job_id: Operation id; a fresh UUID is generated when omitted.
        loader_settings: Loader configuration; ``path`` selects the documents.
            NOTE(review): assumed non-None here — ``loader_settings.get`` raises
            AttributeError otherwise; confirm all callers pass it.
    """
    # Reuse the caller-supplied job id or mint a fresh one for this run.
    if job_id is None:
        job_id = str(uuid.uuid4())
    # Record the operation as RUNNING before any work starts.
    await add_entity(
        session,
        Operation(
            id=job_id,
            user_id=user_id,
            operation_status="RUNNING",
            operation_type="DATA_LOAD",
            test_set_id="none",
        ),
    )
    # Resolve the documents referenced by the loader settings.
    document_names = get_document_names(loader_settings.get("path", "None"))
    # One DocsModel bookkeeping row per document, linked to this operation.
    for doc in document_names:
        await add_entity(
            session,
            DocsModel(
                id=str(uuid.uuid4()),
                operation_id=job_id,
                doc_name=doc
            )
        )
    # Fresh random namespace for this run's semantic memory.
    namespace_id = str(generate_letter_uuid()) + "_" + "SEMANTICMEMORY"
    namespace_class = namespace_id + "_class"
    memory = await Memory.create_memory(user_id, session, namespace=namespace_class)
    # Managing memory attributes
    existing_user = await Memory.check_existing_user(user_id, session)
    print("here is the existing user", existing_user)
    await memory.manage_memory_attributes(existing_user)
    # Provenance metadata attached to the ingested memories.
    # NOTE(review): these look like hard-coded placeholder values — confirm
    # before relying on them in production.
    params = {
        "version": "1.0",
        "agreement_id": "AG123456",
        "privacy_policy": "https://example.com/privacy",
        "terms_of_service": "https://example.com/terms",
        "format": "json",
        "schema_version": "1.1",
        "checksum": "a1b2c3d4e5f6",
        "owner": "John Doe",
        "license": "MIT",
        "validity_start": "2023-08-01",
        "validity_end": "2024-07-31",
    }
    # Look up the dynamically created memory class for this namespace and
    # attach the ingestion method to it.
    dynamic_memory_class = getattr(memory, namespace_class.lower(), None)
    await memory.add_dynamic_memory_class(dynamic_memory_class, namespace_class)
    await memory.add_method_to_class(dynamic_memory_class, "add_memories")
    # await memory.add_method_to_class(memory.semanticmemory_class, "fetch_memories")
    # Ingest the documents via the dynamically attached "add_memories" method.
    sss = await memory.dynamic_method_call(dynamic_memory_class, 'add_memories',
                                           observation='some_observation', params=params, loader_settings=loader_settings)
    # Record completion. NOTE(review): this inserts a second Operation row with
    # the same id rather than updating the RUNNING row — verify add_entity
    # performs an upsert, otherwise this may violate the primary key.
    await add_entity(
        session,
        Operation(
            id=job_id,
            user_id=user_id,
            operation_status="FINISHED",
            operation_type="DATA_LOAD",
            test_set_id="none",
        ),
    )
async def user_query_to_graph_db(session: AsyncSession, user_id: str, query_input: str):
    """Decompose a user query into graph statements and run them against Neo4j.

    Records an Operation row as RUNNING, executes the generated Cypher, then
    marks the operation SUCCESS. On failure the operation is marked FAILED and
    the exception is re-raised to the caller.

    Args:
        session: Async SQLAlchemy session for operation bookkeeping.
        user_id: Id of the user issuing the query.
        query_input: Free-form user utterance to decompose into the graph.

    Returns:
        The Neo4j query result.
    """
    job_id = str(uuid.uuid4())
    await add_entity(
        session,
        Operation(
            id=job_id,
            user_id=user_id,
            operation_status="RUNNING",
            operation_type="USER_QUERY_TO_GRAPH_DB",
            test_set_id="none",
        ),
    )
    try:
        neo4j_graph_db = Neo4jGraphDB(url=config.graph_database_url, username=config.graph_database_username, password=config.graph_database_password)
        cypher_query = await neo4j_graph_db.generate_cypher_query_for_user_prompt_decomposition(user_id, query_input)
        result = neo4j_graph_db.query(cypher_query)
    except Exception:
        # FIX: don't leave the operation stuck in RUNNING when the graph call
        # fails — record FAILED, then propagate the error.
        await add_entity(
            session,
            Operation(
                id=job_id,
                user_id=user_id,
                operation_status="FAILED",
                operation_type="USER_QUERY_TO_GRAPH_DB",
                test_set_id="none",
            ),
        )
        raise
    await add_entity(
        session,
        Operation(
            id=job_id,
            user_id=user_id,
            operation_status="SUCCESS",
            operation_type="USER_QUERY_TO_GRAPH_DB",
            test_set_id="none",
        ),
    )
    return result
async def add_documents_to_graph_db(postgres_session: AsyncSession, user_id: str):
    """Classify the user's documents and link them into the Neo4j graph.

    Best-effort: failures are logged rather than raised so graph maintenance
    never interrupts the caller.

    Args:
        postgres_session: Async SQLAlchemy session.
        user_id: Owner of the documents.
    """
    try:
        await update_document_vectordb_namespace(postgres_session, user_id)
        from cognitive_architecture.classifiers.classifier import classify_documents
        # TODO(review): the document title is hard-coded; derive it from the
        # user's stored documents instead.
        classification = await classify_documents("Lord of the Rings")
        neo4j_graph_db = Neo4jGraphDB(url=config.graph_database_url, username=config.graph_database_username,
                                      password=config.graph_database_password)
        rs = neo4j_graph_db.create_document_node_cypher(classification, user_id)
        neo4j_graph_db.query(rs, classification)
        # FIX: the previous revision had a syntax error (`namespace=,`) and
        # assigned the helper function itself instead of awaiting it.
        namespaces = await get_vectordb_namespace(postgres_session, user_id)
        vectordb_namespace = namespaces[0] if namespaces else "None"
        neo4j_graph_db.update_document_node_with_namespace(
            user_id,
            vectordb_namespace=vectordb_namespace,
            document_title="Lord of the Rings",
        )
    except Exception:
        # FIX: keep best-effort semantics, but log instead of a bare
        # `except: pass` that silently swallowed every error.
        logging.exception("add_documents_to_graph_db failed for user %s", user_id)
# query_input = "I walked in the forest yesterday and added to my list I need to buy some milk in the store"
#
@ -199,7 +330,7 @@ async def main():
neo4j_graph_db = Neo4jGraphDB(url='bolt://localhost:7687', username='neo4j', password='pleaseletmein')
# # Generate the Cypher query for a specific user
# user_id = 'user123' # Replace with the actual user ID
cypher_query = await neo4j_graph_db.generate_cypher_query_for_user_prompt_decomposition(user_id,"I walked in the forest yesterday and added to my list I need to buy some milk in the store")
#cypher_query = await neo4j_graph_db.generate_cypher_query_for_user_prompt_decomposition(user_id,"I walked in the forest yesterday and added to my list I need to buy some milk in the store")
# result = neo4j_graph_db.query(cypher_query)
call_of_the_wild_summary = {
"user_id": user_id,
@ -220,12 +351,30 @@ async def main():
"answering the primal call of nature."
)
}
rs = neo4j_graph_db.create_document_node_cypher(call_of_the_wild_summary, user_id)
# rs = neo4j_graph_db.create_document_node_cypher(call_of_the_wild_summary, user_id)
#
# neo4j_graph_db.query(rs, call_of_the_wild_summary)
# print(cypher_query)
neo4j_graph_db.query(rs, call_of_the_wild_summary)
print(cypher_query)
from cognitive_architecture.classifiers.classifier import classify_documents
neo4j_graph_db.update_document_node_with_namespace(user_id, document_title="The Call of the Wild")
ff = await classify_documents("Lord of the Rings")
print(ff)
# vector_db_namespaces = await get_vectordb_namespace(session, user_id)
#
# if vector_db_namespaces == []:
# vector_db_namespaces = ["None"]
#
#
# print(vector_db_namespaces)
# for value in vector_db_namespaces:
# print(value)
#
# oo = neo4j_graph_db.update_document_node_with_namespace(user_id,vectordb_namespace= value,document_title="The Call of the Wild")
# logging.info("gg", oo)
# neo4j_graph_db.query(oo)
@ -283,3 +432,7 @@ if __name__ == "__main__":
asyncio.run(main())
#1. decompose query
#2. add document to vectordb
#3. add document to graph
#4. fetch relevant memories from semantic, episodic