Update summarization logic

Vasilije 2023-12-15 18:12:02 +01:00
parent 9c8927b79f
commit 7d0ee16d46
11 changed files with 1507 additions and 974 deletions

View file

@@ -79,7 +79,17 @@ async def add_memory(
async with session_scope(session=AsyncSessionLocal()) as session:
from main import load_documents_to_vectorstore
output = await load_documents_to_vectorstore(session, decoded_payload['user_id'], loader_settings=decoded_payload['settings'])
settings_for_loader = decoded_payload.get('settings')
content = decoded_payload.get('content')
output = await load_documents_to_vectorstore(session, decoded_payload['user_id'], content=content, loader_settings=settings_for_loader)
return JSONResponse(content={"response": output}, status_code=200)
except Exception as e:
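For reference, a minimal sketch of a request body the updated endpoint accepts; the wrapper key and field names are inferred from the keys read out of decoded_payload above, and the values are placeholders:

# Hypothetical request body; shape inferred from the handler above.
payload = {
    "payload": {
        "user_id": "user-123",
        "content": "Raw text to embed and store.",  # optional
        "settings": None,  # optional; or a loader_settings dict, e.g. {"path": ".data"}
    }
}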
@@ -107,11 +117,8 @@ async def user_query_to_graph(payload: Payload):
async def document_to_graph_db(payload: Payload):
try:
decoded_payload = payload.payload
# Execute the query - replace this with the actual execution method
async with session_scope(session=AsyncSessionLocal()) as session:
# Assuming you have a method in Neo4jGraphDB to execute the query
result = await add_documents_to_graph_db(postgres_session =session, user_id = decoded_payload['user_id'], loader_settins =decoded_payload['settings'])
result = await add_documents_to_graph_db(session=session, user_id=decoded_payload['user_id'], loader_settings=decoded_payload['settings'])
return result
except Exception as e:

View file

@@ -23,9 +23,9 @@ from langchain.document_loaders import TextLoader
from langchain.document_loaders import DirectoryLoader
async def classify_documents(query:str, document_id:str, loader_settings:dict):
async def classify_documents(query:str, document_id:str, content:str):
document_context = await _document_loader(query, loader_settings)
document_context = content
logging.info("This is the document context", document_context)
llm = ChatOpenAI(temperature=0, model=config.model)
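With this change the classifier receives pre-fetched text instead of loading documents itself; a minimal sketch of the new call shape (all values are placeholders):

# classify_documents now takes content directly; no loader_settings needed.
classification = await classify_documents(
    query="document title or question",
    document_id="doc-uuid-0001",
    content="full document text fetched from the vector store",
)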

View file

@@ -34,6 +34,9 @@ class Config:
graph_database_url: str = os.getenv('GRAPH_DB_URL')
graph_database_username: str = os.getenv('GRAPH_DB_USER')
graph_database_password: str = os.getenv('GRAPH_DB_PW')
weaviate_url: str = os.getenv('WEAVIATE_URL')
weaviate_api_key: str = os.getenv('WEAVIATE_API_KEY')
# Client ID
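The two new settings come from the environment; a sketch of wiring them up, assuming config.load() (used elsewhere in this commit) populates fields from the environment, with placeholder values:

import os

# Placeholder values; use your actual Weaviate cluster credentials.
os.environ["WEAVIATE_URL"] = "https://my-cluster.weaviate.network"
os.environ["WEAVIATE_API_KEY"] = "wv-secret-key"

config = Config()
config.load()  # assumed to (re)populate fields from the environment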

View file

@@ -1,6 +1,6 @@
from datetime import datetime
from sqlalchemy import Column, String, DateTime, ForeignKey
from sqlalchemy import Column, String, DateTime, ForeignKey, Boolean
from sqlalchemy.orm import relationship
import os
import sys
@@ -11,6 +11,7 @@ class DocsModel(Base):
id = Column(String, primary_key=True)
operation_id = Column(String, ForeignKey('operations.id'), index=True)
doc_name = Column(String, nullable=True)
graph_summary = Column(Boolean, nullable=True)
created_at = Column(DateTime, default=datetime.utcnow)
updated_at = Column(DateTime, onupdate=datetime.utcnow)
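The new graph_summary flag marks documents whose summaries have been written to the graph; a sketch of selecting the unprocessed ones (the column is nullable, so NULL is treated as not-yet-summarized):

from sqlalchemy.future import select

async def docs_pending_graph_summary(session):
    # NULL or False both mean "not yet summarized into the graph".
    result = await session.execute(
        select(DocsModel).where(
            (DocsModel.graph_summary.is_(None)) | (DocsModel.graph_summary == False)
        )
    )
    return result.scalars().all()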

View file

@@ -231,41 +231,41 @@ class BaseMemory:
embeddings: Optional[str] = None,
):
from ast import literal_eval
class DynamicSchema(Schema):
pass
default_version = 'current_timestamp'
version_in_params = params.get("version", default_version)
# Check and update metadata version in DB.
schema_fields = params
def create_field(field_type, **kwargs):
field_mapping = {
"Str": fields.Str,
"Int": fields.Int,
"Float": fields.Float,
"Bool": fields.Bool,
}
return field_mapping[field_type](**kwargs)
# Dynamic Schema Creation
params['user_id'] = self.user_id
schema_instance = self.create_dynamic_schema(params) # Always creating Str field, adjust as needed
logging.info(f"params : {params}")
# Schema Validation
schema_instance = schema_instance
print("Schema fields: ", [field for field in schema_instance._declared_fields])
loaded_params = schema_instance.load(params)
# from ast import literal_eval
# class DynamicSchema(Schema):
# pass
#
# default_version = 'current_timestamp'
# version_in_params = params.get("version", default_version)
#
# # Check and update metadata version in DB.
# schema_fields = params
#
# def create_field(field_type, **kwargs):
# field_mapping = {
# "Str": fields.Str,
# "Int": fields.Int,
# "Float": fields.Float,
# "Bool": fields.Bool,
# }
# return field_mapping[field_type](**kwargs)
#
# # Dynamic Schema Creation
# params['user_id'] = self.user_id
#
#
# schema_instance = self.create_dynamic_schema(params) # Always creating Str field, adjust as needed
#
# logging.info(f"params : {params}")
#
# # Schema Validation
# schema_instance = schema_instance
# print("Schema fields: ", [field for field in schema_instance._declared_fields])
# loaded_params = schema_instance.load(params)
return await self.vector_db.add_memories(
observation=observation, loader_settings=loader_settings,
params=loaded_params, namespace=namespace, metadata_schema_class = schema_instance, embeddings=embeddings
params=params, namespace=namespace, metadata_schema_class = None, embeddings=embeddings
)
# Add other db_type conditions if necessary
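Since the Marshmallow schema machinery above is now bypassed, params flow to the vector store unmodified; a hedged sketch of the simplified call, assuming the surrounding method is exposed as add_memories on the memory instance (all values are placeholders):

# Sketch: params are forwarded as-is; no dynamic schema is built any more.
result = await memory.add_memories(
    observation="raw document text",
    params={"user_id": "user-123", "version": "current_timestamp"},
    namespace="MYNAMESPACE_SEMANTICMEMORY",
)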

View file

@@ -20,7 +20,10 @@ from langchain.schema import Document
import weaviate
load_dotenv()
from ...config import Config
config = Config()
config.load()
LTM_MEMORY_ID_DEFAULT = "00000"
ST_MEMORY_ID_DEFAULT = "0000"
@@ -153,18 +156,26 @@ class WeaviateVectorDB(VectorDB):
# Assuming _document_loader returns a list of documents
documents = await _document_loader(observation, loader_settings)
logging.info("here are the docs %s", str(documents))
chunk_count = 0
for doc in documents[0]:
document_to_load = self._stuct(doc.page_content, params, metadata_schema_class)
chunk_count += 1
params['chunk_order'] = chunk_count
# document_to_load = self._stuct(doc.page_content, params, metadata_schema_class)
logging.info("Loading document with provided loader settings %s", str(document_to_load))
# logging.info("Loading document with provided loader settings %s", str(document_to_load))
retriever.add_documents([
Document(metadata=document_to_load[0]['metadata'], page_content=document_to_load[0]['page_content'])])
Document(metadata=params, page_content=doc.page_content)])
else:
document_to_load = self._stuct(observation, params, metadata_schema_class)
chunk_count = 0
documents = await _document_loader(observation, loader_settings)
for doc in documents[0]:
chunk_count += 1
params['chunk_order'] = chunk_count
# document_to_load = self._stuct(observation, params, metadata_schema_class)
logging.info("Loading document with defautl loader settings %s", str(document_to_load))
retriever.add_documents([
Document(metadata=document_to_load[0]['metadata'], page_content=document_to_load[0]['page_content'])])
# logging.info("Loading document with defautl loader settings %s", str(document_to_load))
retriever.add_documents([
Document(metadata=params, page_content=doc)])
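Each stored object now carries its position via chunk_order in params; illustratively (keys besides chunk_order depend on what the caller passed in):

# Illustrative only: how chunk_order accumulates across chunks.
params = {"user_id": "user-123"}
for chunk_order, chunk in enumerate(["first chunk", "second chunk"], start=1):
    params["chunk_order"] = chunk_order
    # each Weaviate object is written with the current chunk_order in its metadata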
async def fetch_memories(self, observation: str, namespace: str = None, search_type: str = 'hybrid', **kwargs):
"""
@@ -185,7 +196,22 @@ class WeaviateVectorDB(VectorDB):
client = self.init_weaviate(namespace =self.namespace)
if search_type is None:
search_type = 'hybrid'
logging.info("The search type is 2 %", search_type)
logging.info("The search type is s%", search_type)
if search_type == 'summary':
from weaviate.classes import Filter
client = weaviate.connect_to_wcs(
cluster_url=config.weaviate_url,
auth_credentials=weaviate.AuthApiKey(config.weaviate_api_key)
)
summary_collection = client.collections.get(self.namespace)
response = summary_collection.query.fetch_objects(
filters=Filter("user_id").equal(self.user_id) &
Filter("chunk_order").less_than(25),
limit=15
)
return response
if not namespace:
namespace = self.namespace
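A sketch of exercising the new 'summary' branch; the call shape follows the code above, the instance name and property keys are placeholders:

# 'summary' short-circuits before the usual hybrid/vector search.
response = await weaviate_db.fetch_memories(
    observation="placeholder", search_type="summary"
)
for obj in response.objects:  # v4 fetch_objects responses expose .objects
    print(obj.uuid, obj.properties)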
@@ -280,7 +306,6 @@ class WeaviateVectorDB(VectorDB):
)
else:
# Delete all objects
print("HERE IS THE USER ID", self.user_id)
return client.batch.delete_objects(
class_name=namespace,
where={

View file

@@ -4,6 +4,8 @@ import string
import uuid
from graphviz import Digraph
from sqlalchemy.orm import contains_eager
# from graph_database.graph import KnowledgeGraph
@@ -125,4 +127,137 @@ def generate_letter_uuid(length=8):
letters = string.ascii_uppercase # A-Z
return "".join(random.choice(letters) for _ in range(length))
from cognitive_architecture.database.postgres.models.operation import Operation
from cognitive_architecture.database.postgres.database_crud import session_scope, add_entity, update_entity, fetch_job_id
from cognitive_architecture.database.postgres.models.metadatas import MetaDatas
from cognitive_architecture.database.postgres.models.docs import DocsModel
from cognitive_architecture.database.postgres.models.memory import MemoryModel
from cognitive_architecture.database.postgres.models.user import User
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.future import select
import logging
async def get_vectordb_namespace(session: AsyncSession, user_id: str):
try:
result = await session.execute(
select(MemoryModel.memory_name).where(MemoryModel.user_id == user_id).order_by(MemoryModel.created_at.desc())
)
namespace = [row[0] for row in result.fetchall()]
return namespace
except Exception as e:
logging.error(f"An error occurred while retrieving the Vectordb_namespace: {str(e)}")
return None
async def get_vectordb_document_name(session: AsyncSession, user_id: str):
try:
result = await session.execute(
select(DocsModel.doc_name).where(DocsModel.user_id == user_id).order_by(DocsModel.created_at.desc())
)
doc_names = [row[0] for row in result.fetchall()]
return doc_names
except Exception as e:
logging.error(f"An error occurred while retrieving the Vectordb_namespace: {str(e)}")
return None
async def get_model_id_name(session: AsyncSession, id: str):
try:
result = await session.execute(
select(MemoryModel.memory_name).where(MemoryModel.id == id).order_by(MemoryModel.created_at.desc())
)
doc_names = [row[0] for row in result.fetchall()]
return doc_names
except Exception as e:
logging.error(f"An error occurred while retrieving the Vectordb_namespace: {str(e)}")
return None
async def get_unsumarized_vector_db_namespace(session: AsyncSession, user_id: str):
"""
Asynchronously retrieves the latest memory names and document details for a given user.
This function executes a database query to fetch memory names and document details
associated with operations performed by a specific user. It leverages explicit joins
with the 'docs' and 'memories' tables and applies eager loading to optimize performance.
Parameters:
- session (AsyncSession): The database session for executing the query.
- user_id (str): The unique identifier of the user.
Returns:
- Tuple[List[str], List[Tuple[str, str]]]: A tuple containing a list of memory names and
a list of tuples with document names and their corresponding IDs.
Returns None if an exception occurs.
Raises:
- Exception: Propagates any exceptions that occur during query execution.
"""
try:
result = await session.execute(
select(Operation)
.join(Operation.docs) # Explicit join with docs table
.join(Operation.memories) # Explicit join with memories table
.options(
contains_eager(Operation.docs), # Informs ORM of the join for docs
contains_eager(Operation.memories) # Informs ORM of the join for memories
)
.where(
(Operation.user_id == user_id) & # Filter by user_id
(DocsModel.graph_summary == False) # Only docs not yet summarized into the graph
)
.order_by(Operation.created_at.desc()) # Order by creation date
)
operations = result.unique().scalars().all()
# Extract memory names and document names and IDs
memory_names = [memory.memory_name for op in operations for memory in op.memories]
docs = [(doc.doc_name, doc.id) for op in operations for doc in op.docs]
return memory_names, docs
except Exception as e:
logging.error(f"An error occurred while retrieving unsummarized namespaces: {str(e)}")
return None
async def get_memory_name_by_doc_id(session: AsyncSession, docs_id: str):
"""
Asynchronously retrieves memory names associated with a specific document ID.
This function executes a database query to fetch memory names linked to a document
through operations. The query is filtered based on a given document ID and retrieves
only the memory names without loading the entire Operation entity.
Parameters:
- session (AsyncSession): The database session for executing the query.
- docs_id (str): The unique identifier of the document.
Returns:
- List[str]: A list of memory names associated with the given document ID.
Returns None if an exception occurs.
Raises:
- Exception: Propagates any exceptions that occur during query execution.
"""
try:
result = await session.execute(
select(MemoryModel.memory_name)
.join(Operation, Operation.id == MemoryModel.operation_id) # Join with Operation
.join(DocsModel, DocsModel.operation_id == Operation.id) # Join with DocsModel
.where(DocsModel.id == docs_id) # Filtering based on the passed document ID
.distinct() # To avoid duplicate memory names
)
memory_names = [row[0] for row in result.fetchall()]
return memory_names
except Exception as e:
logging.error(f"An error occurred while retrieving memory names by doc id: {str(e)}")
return None
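A sketch of how these helpers chain together inside a session scope; the AsyncSessionLocal factory name is an assumption, the rest follows the functions above:

async def log_pending_summaries(user_id: str):
    async with session_scope(session=AsyncSessionLocal()) as session:  # factory assumed
        pending = await get_unsumarized_vector_db_namespace(session, user_id)
        if pending is None:
            return
        memory_names, docs = pending
        for (doc_name, doc_id), memory_name in zip(docs, memory_names):
            names = await get_memory_name_by_doc_id(session, doc_id)
            logging.info("Doc %s (memory %s) maps to %s", doc_name, memory_name, names)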

View file

@@ -29,6 +29,7 @@ services:
- 8000:8000
- 443:443
- 80:80
- 50051:50051
depends_on:
- postgres
- neo4j

View file

@@ -35,132 +35,37 @@ from sqlalchemy.orm import selectinload, joinedload, contains_eager
import logging
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.future import select
from cognitive_architecture.utils import get_document_names, generate_letter_uuid, get_memory_name_by_doc_id, get_unsumarized_vector_db_namespace, get_vectordb_namespace, get_vectordb_document_name
async def get_vectordb_namespace(session: AsyncSession, user_id: str):
try:
result = await session.execute(
select(MemoryModel.memory_name).where(MemoryModel.user_id == user_id).order_by(MemoryModel.created_at.desc())
)
namespace = [row[0] for row in result.fetchall()]
return namespace
except Exception as e:
logging.error(f"An error occurred while retrieving the Vectordb_namespace: {str(e)}")
return None
async def get_vectordb_document_name(session: AsyncSession, user_id: str):
try:
result = await session.execute(
select(DocsModel.doc_name).where(DocsModel.user_id == user_id).order_by(DocsModel.created_at.desc())
)
doc_names = [row[0] for row in result.fetchall()]
return doc_names
except Exception as e:
logging.error(f"An error occurred while retrieving the Vectordb_namespace: {str(e)}")
return None
async def fetch_document_vectordb_namespace(session: AsyncSession, user_id: str, namespace_id:str):
memory = await Memory.create_memory(user_id, session, namespace=namespace_id, memory_label=namespace_id)
async def get_model_id_name(session: AsyncSession, id: str):
try:
result = await session.execute(
select(MemoryModel.memory_name).where(MemoryModel.id == id).order_by(MemoryModel.created_at.desc())
)
doc_names = [row[0] for row in result.fetchall()]
return doc_names
except Exception as e:
logging.error(f"An error occurred while retrieving the Vectordb_namespace: {str(e)}")
return None
# Managing memory attributes
existing_user = await Memory.check_existing_user(user_id, session)
print("here is the existing user", existing_user)
await memory.manage_memory_attributes(existing_user)
print("Namespace id is %s", namespace_id)
await memory.add_dynamic_memory_class(namespace_id.lower(), namespace_id)
dynamic_memory_class = getattr(memory, namespace_id.lower(), None)
methods_to_add = ["add_memories", "fetch_memories", "delete_memories"]
async def get_vectordb_data(session: AsyncSession, user_id: str):
"""
Asynchronously retrieves the latest memory names and document details for a given user.
if dynamic_memory_class is not None:
for method_name in methods_to_add:
await memory.add_method_to_class(dynamic_memory_class, method_name)
print(f"Memory method {method_name} has been added")
else:
print(f"No attribute named in memory.")
This function executes a database query to fetch memory names and document details
associated with operations performed by a specific user. It leverages explicit joins
with the 'docs' and 'memories' tables and applies eager loading to optimize performance.
print("Available memory classes:", await memory.list_memory_classes())
result = await memory.dynamic_method_call(dynamic_memory_class, 'fetch_memories',
observation="placeholder", search_type="summary")
Parameters:
- session (AsyncSession): The database session for executing the query.
- user_id (str): The unique identifier of the user.
return result, namespace_id
Returns:
- Tuple[List[str], List[Tuple[str, str]]]: A tuple containing a list of memory names and
a list of tuples with document names and their corresponding IDs.
Returns None if an exception occurs.
Raises:
- Exception: Propagates any exceptions that occur during query execution.
"""
try:
result = await session.execute(
select(Operation)
.join(Operation.docs) # Explicit join with docs table
.join(Operation.memories) # Explicit join with memories table
.options(
contains_eager(Operation.docs), # Informs ORM of the join for docs
contains_eager(Operation.memories) # Informs ORM of the join for memories
)
.where(
(Operation.user_id == user_id) # Filter by user_id
# Optionally, you can add more filters here
)
.order_by(Operation.created_at.desc()) # Order by creation date
)
operations = result.unique().scalars().all()
# Extract memory names and document names and IDs
memory_names = [memory.memory_name for op in operations for memory in op.memories]
docs = [(doc.doc_name, doc.id) for op in operations for doc in op.docs]
return memory_names, docs
except Exception as e:
# Handle the exception as needed
print(f"An error occurred: {e}")
return None
async def get_memory_name_by_doc_id(session: AsyncSession, docs_id: str):
"""
Asynchronously retrieves memory names associated with a specific document ID.
This function executes a database query to fetch memory names linked to a document
through operations. The query is filtered based on a given document ID and retrieves
only the memory names without loading the entire Operation entity.
Parameters:
- session (AsyncSession): The database session for executing the query.
- docs_id (str): The unique identifier of the document.
Returns:
- List[str]: A list of memory names associated with the given document ID.
Returns None if an exception occurs.
Raises:
- Exception: Propagates any exceptions that occur during query execution.
"""
try:
result = await session.execute(
select(MemoryModel.memory_name)
.join(Operation, Operation.id == MemoryModel.operation_id) # Join with Operation
.join(DocsModel, DocsModel.operation_id == Operation.id) # Join with DocsModel
.where(DocsModel.id == docs_id) # Filtering based on the passed document ID
.distinct() # To avoid duplicate memory names
)
memory_names = [row[0] for row in result.fetchall()]
return memory_names
except Exception as e:
# Handle the exception as needed
print(f"An error occurred: {e}")
return None
async def load_documents_to_vectorstore(session: AsyncSession, user_id: str, job_id:str=None, loader_settings:dict=None):
async def load_documents_to_vectorstore(session: AsyncSession, user_id: str, content:str=None, job_id:str=None, loader_settings:dict=None):
namespace_id = str(generate_letter_uuid()) + "_" + "SEMANTICMEMORY"
namespace_class = namespace_id + "_class"
@@ -184,7 +89,11 @@ async def load_documents_to_vectorstore(session: AsyncSession, user_id: str, job
),
)
memory = await Memory.create_memory(user_id, session, namespace=namespace_id, job_id=job_id, memory_label=namespace_id)
document_names = get_document_names(loader_settings.get("path", "None"))
if content is not None:
document_names = [content[:30]]
elif loader_settings is not None:
document_names = get_document_names(loader_settings.get("path", "None"))
else:
document_names = []
for doc in document_names:
await add_entity(
session,
@@ -227,10 +136,10 @@ async def load_documents_to_vectorstore(session: AsyncSession, user_id: str, job
print("Available memory classes:", await memory.list_memory_classes())
result = await memory.dynamic_method_call(dynamic_memory_class, 'add_memories',
observation='some_observation', params=params, loader_settings=loader_settings)
observation=content, params=params, loader_settings=loader_settings)
await update_entity(session, Operation, job_id, "SUCCESS")
return result
return result, namespace_id
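With the new content parameter, raw text can now be ingested without a file loader; a minimal sketch (user id and settings values are placeholders):

# Path A: ingest raw text directly.
result, namespace = await load_documents_to_vectorstore(
    session, user_id="user-123", content="Some raw text to index."
)

# Path B: file-based loading, as before.
result, namespace = await load_documents_to_vectorstore(
    session, user_id="user-123", loader_settings={"path": ".data"}
)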
async def user_query_to_graph_db(session: AsyncSession, user_id: str, query_input: str):
@@ -264,16 +173,15 @@ async def user_query_to_graph_db(session: AsyncSession, user_id: str, query_inpu
async def add_documents_to_graph_db(postgres_session: AsyncSession, user_id: str, loader_settings:dict=None, stupid_local_testing_flag=False): #clean this up Vasilije, don't be sloppy
async def add_documents_to_graph_db(session: AsyncSession, user_id: str= None, loader_settings:dict=None, stupid_local_testing_flag=False): #clean this up Vasilije, don't be sloppy
""""""
try:
# await update_document_vectordb_namespace(postgres_session, user_id)
memory_names, docs = await get_vectordb_data(postgres_session, user_id)
memory_names, docs = await get_unsumarized_vector_db_namespace(session, user_id)
logging.info("Memory names are", memory_names)
logging.info("Docs are", docs)
for doc, memory_name in zip(docs, memory_names):
doc_name, doc_id = doc
logging.info("hereee %s", doc_name)
if stupid_local_testing_flag:
classification = [{
"DocumentCategory": "Literature",
@@ -316,7 +224,12 @@ async def add_documents_to_graph_db(postgres_session: AsyncSession, user_id: str
# select doc from the store
neo4j_graph_db.update_document_node_with_namespace(user_id, vectordb_namespace=memory_name, document_id=doc_id)
else:
classification = await classify_documents(doc_name, document_id =doc_id, loader_settings=loader_settings)
try:
classification_content = await fetch_document_vectordb_namespace(session, user_id, memory_name)
except Exception:
classification_content = "None"
classification = await classify_documents(doc_name, document_id=doc_id, content=classification_content)
logging.info("Classification is", str(classification))
neo4j_graph_db = Neo4jGraphDB(url=config.graph_database_url, username=config.graph_database_username,
@@ -327,6 +240,7 @@ async def add_documents_to_graph_db(postgres_session: AsyncSession, user_id: str
# select doc from the store
neo4j_graph_db.update_document_node_with_namespace(user_id, vectordb_namespace=memory_name,
document_id=doc_id)
await update_entity(session, DocsModel, doc_id, True)
except Exception:
# Keep processing the remaining documents even if one fails.
pass
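End to end, the updated pipeline now looks roughly like this; a sketch, with the session factory and user id as assumptions:

async with session_scope(session=AsyncSessionLocal()) as session:  # factory assumed
    # 1. Load content into the vector store; a fresh namespace is returned.
    _, namespace = await load_documents_to_vectorstore(
        session, user_id="user-123", content="raw text to summarize"
    )
    # 2. Classify and summarize pending docs into the graph, then flag them done.
    await add_documents_to_graph_db(session=session, user_id="user-123")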

level_4/poetry.lock (generated, 2046 changed lines)

File diff suppressed because it is too large.

View file

@@ -38,7 +38,7 @@ pypdf = "^3.12.0"
fastjsonschema = "^2.18.0"
marvin = "^1.3.0"
dlt = { version ="^0.3.8", extras = ["duckdb"]}
weaviate-client = "^3.22.1"
weaviate-client = "4.*"
python-multipart = "^0.0.6"
deepeval = "^0.20.12"
pymupdf = "^1.23.3"
@@ -58,6 +58,7 @@ networkx = "^3.2.1"
graphviz = "^0.20.1"
greenlet = "^3.0.1"
neo4j = "^5.14.1"
grpcio = "^1.60.0"