diff --git a/level_2/chunkers/chunkers.py b/level_2/chunkers/chunkers.py index e96a0d251..b5da83c34 100644 --- a/level_2/chunkers/chunkers.py +++ b/level_2/chunkers/chunkers.py @@ -1,6 +1,7 @@ from langchain.document_loaders import PyPDFLoader - -from level_2.shared.chunk_strategy import ChunkStrategy +import sys, os +sys.path.append(os.path.dirname(os.path.abspath(__file__))) +from shared.chunk_strategy import ChunkStrategy import re def chunk_data(chunk_strategy=None, source_data=None, chunk_size=None, chunk_overlap=None): diff --git a/level_2/database/database.py b/level_2/database/database.py new file mode 100644 index 000000000..178f1811b --- /dev/null +++ b/level_2/database/database.py @@ -0,0 +1,59 @@ +import os +from sqlalchemy import create_engine +from sqlalchemy.ext.declarative import declarative_base +from sqlalchemy.orm import sessionmaker +from contextlib import contextmanager +from sqlalchemy.exc import OperationalError +from time import sleep +import sys +from dotenv import load_dotenv +load_dotenv() + +# this is needed to import classes from other modules +script_dir = os.path.dirname(os.path.abspath(__file__)) +# Get the parent directory of your script and add it to sys.path +parent_dir = os.path.dirname(script_dir) +sys.path.append(parent_dir) + + +# in seconds +MAX_RETRIES = 3 +RETRY_DELAY = 5 + +username = os.getenv('POSTGRES_USER') +password = os.getenv('POSTGRES_PASSWORD') +database_name = os.getenv('POSTGRES_DB') +host = os.getenv('POSTGRES_HOST') + + + +SQLALCHEMY_DATABASE_URL = f"postgresql://{username}:{password}@{host}:5432/{database_name}" + +engine = create_engine( + SQLALCHEMY_DATABASE_URL, + pool_recycle=3600, # recycle connections after 1 hour + pool_pre_ping=True # test the connection for liveness upon each checkout +) +SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine) + +Base = declarative_base() + +@contextmanager +def get_db(): + db = SessionLocal() + try: + yield db + finally: + db.close() + +def 
safe_db_operation(db_op, *args, **kwargs): + for attempt in range(MAX_RETRIES): + with get_db() as db: + try: + return db_op(db, *args, **kwargs) + except OperationalError as e: + db.rollback() + if "server closed the connection unexpectedly" in str(e) and attempt < MAX_RETRIES - 1: + sleep(RETRY_DELAY) + else: + raise \ No newline at end of file diff --git a/level_2/docker-compose.yml b/level_2/docker-compose.yml index b591144e3..d19ad7c3e 100644 --- a/level_2/docker-compose.yml +++ b/level_2/docker-compose.yml @@ -14,9 +14,18 @@ services: ports: - 8000:8000 - 443:443 - - - + postgres: + image: postgres + container_name: postgres + environment: + - POSTGRES_HOST_AUTH_METHOD= trust + - POSTGRES_USER=bla + - POSTGRES_PASSWORD=bla + - POSTGRES_DB=bubu + networks: + - promethai_mem_backend + ports: + - "5432:5432" networks: promethai_mem_backend: name: promethai_mem_backend diff --git a/level_2/level_2_pdf_vectorstore__dlt_contracts.py b/level_2/level_2_pdf_vectorstore__dlt_contracts.py index 073035802..757914547 100644 --- a/level_2/level_2_pdf_vectorstore__dlt_contracts.py +++ b/level_2/level_2_pdf_vectorstore__dlt_contracts.py @@ -80,30 +80,6 @@ from vectordb.basevectordb import BaseMemory from modulators.modulators import DifferentiableLayer -class SemanticMemory(BaseMemory): - def __init__( - self, - user_id: str, - memory_id: Optional[str], - index_name: Optional[str], - db_type: str = "weaviate", - ): - super().__init__( - user_id, memory_id, index_name, db_type, namespace="SEMANTICMEMORY") - - -class EpisodicMemory(BaseMemory): - def __init__( - self, - user_id: str, - memory_id: Optional[str], - index_name: Optional[str], - db_type: str = "weaviate", - ): - super().__init__( - user_id, memory_id, index_name, db_type, namespace="EPISODICMEMORY" - ) - class EpisodicBuffer(BaseMemory): def __init__( @@ -819,37 +795,93 @@ class EpisodicBuffer(BaseMemory): return result_parsing.json() -class LongTermMemory: - def __init__( - self, - user_id: str = "676", - 
memory_id: Optional[str] = None, - index_name: Optional[str] = None, - db_type: str = "weaviate", - ): - self.user_id = user_id - self.memory_id = memory_id - self.ltm_memory_id = str(uuid.uuid4()) - self.index_name = index_name - self.db_type = db_type - self.semantic_memory = SemanticMemory(user_id, memory_id, index_name, db_type) - self.episodic_memory = EpisodicMemory(user_id, memory_id, index_name, db_type) +# class LongTermMemory: +# def __init__( +# self, +# user_id: str = "676", +# memory_id: Optional[str] = None, +# index_name: Optional[str] = None, +# db_type: str = "weaviate", +# ): +# self.user_id = user_id +# self.memory_id = memory_id +# self.ltm_memory_id = str(uuid.uuid4()) +# self.index_name = index_name +# self.db_type = db_type +# self.semantic_memory = SemanticMemory(user_id, memory_id, index_name, db_type) +# self.episodic_memory = EpisodicMemory(user_id, memory_id, index_name, db_type) + +# +# class ShortTermMemory: +# def __init__( +# self, +# user_id: str = "676", +# memory_id: Optional[str] = None, +# index_name: Optional[str] = None, +# db_type: str = "weaviate", +# ): +# self.user_id = user_id +# self.memory_id = memory_id +# self.stm_memory_id = str(uuid.uuid4()) +# self.index_name = index_name +# self.db_type = db_type +# self.episodic_buffer = EpisodicBuffer(user_id, memory_id, index_name, db_type) +# +# class PythonClass: +# def __init__(self, name): +# self.name = name +# self.attributes = set() # Using set to store unique attribute names +# self.methods = set() # Using set to store unique method names +# self.inheritance = None +# self.associations = [] +# +# def add_method(self, method_name): +# self.methods.add(method_name) +# +# def add_attribute(self, attribute_name): +# self.attributes.add(attribute_name) +# async def call_method(self, method_name, *args, **kwargs): +# if method_name in self.methods: +# method = getattr(self, method_name, None) +# if method: +# return await method(*args, **kwargs) +# raise 
AttributeError(f"{self.name} object has no attribute {method_name}") +# +# def get_attribute(self, attribute_name): +# return self.attributes.get(attribute_name) -class ShortTermMemory: - def __init__( - self, - user_id: str = "676", - memory_id: Optional[str] = None, - index_name: Optional[str] = None, - db_type: str = "weaviate", - ): - self.user_id = user_id - self.memory_id = memory_id - self.stm_memory_id = str(uuid.uuid4()) - self.index_name = index_name - self.db_type = db_type - self.episodic_buffer = EpisodicBuffer(user_id, memory_id, index_name, db_type) +class DynamicBaseMemory(BaseMemory): + def __init__(self, name, user_id, memory_id, index_name, db_type, namespace): + super().__init__(user_id, memory_id, index_name, db_type, namespace) + self.name = name + self.attributes = set() + self.methods = set() + self.inheritance = None + self.associations = [] + + def add_method(self, method_name): + self.methods.add(method_name) + + def add_attribute(self, attribute_name): + self.attributes.add(attribute_name) + + def get_attribute(self, attribute_name): + return attribute_name in self.attributes + + def add_association(self, associated_memory): + if associated_memory not in self.associations: + self.associations.append(associated_memory) + # Optionally, establish a bidirectional association + associated_memory.associations.append(self) + +class Attribute: + def __init__(self, name): + self.name = name + +class Method: + def __init__(self, name): + self.name = name class Memory: @@ -857,15 +889,17 @@ class Memory: OPENAI_TEMPERATURE = float(os.getenv("OPENAI_TEMPERATURE", 0.0)) OPENAI_API_KEY = os.getenv("OPENAI_API_KEY", "") - def __init__( - self, - user_id: str = "676", - index_name: str = None, - knowledge_source: str = None, - knowledge_type: str = None, - db_type: str = "weaviate", - namespace: str = None, - ) -> None: + def __init__(self, user_id: str = "676", index_name: str = None, + knowledge_source: str = None, knowledge_type: str = None, + 
db_type: str = "weaviate", namespace: str = None, ) -> None: + self.memory_class = DynamicBaseMemory('Memory', user_id, str(uuid.uuid4()), index_name, db_type, namespace) + self.semantic_memory_class = DynamicBaseMemory('SemanticMemory', user_id, str(uuid.uuid4()), index_name, + db_type, namespace) + self.episodic_memory_class = DynamicBaseMemory('EpisodicMemory', user_id, str(uuid.uuid4()), index_name, + db_type, namespace) + self.episodic_buffer_class = DynamicBaseMemory('EpisodicBuffer', user_id, str(uuid.uuid4()), index_name, + db_type, namespace) + self.user_id = user_id self.index_name = index_name self.db_type = db_type @@ -875,150 +909,45 @@ class Memory: self.long_term_memory = None self.short_term_memory = None self.namespace = namespace - load_dotenv() - # Asynchronous factory function for creating LongTermMemory - async def async_create_long_term_memory( - self, user_id, memory_id, index_name, db_type - ): - # Perform asynchronous initialization steps if needed - return LongTermMemory( - user_id=self.user_id, - memory_id=self.memory_id, - index_name=self.index_name, - db_type=self.db_type, - ) + attributes_list = ['user_id', 'index_name', 'db_type', 'knowledge_source', 'knowledge_type', 'memory_id', + 'long_term_memory', 'short_term_memory', 'namespace'] + for attr in attributes_list: + self.memory_class.add_attribute(attr) - async def async_init(self): - # Asynchronous initialization of LongTermMemory and ShortTermMemory - self.long_term_memory = await self.async_create_long_term_memory( - user_id=self.user_id, - memory_id=self.memory_id, - index_name=self.index_name, - db_type=self.db_type, - ) - self.short_term_memory = await self.async_create_short_term_memory( - user_id=self.user_id, - memory_id=self.memory_id, - index_name=self.index_name, - db_type=self.db_type, - ) + methods_list = ['async_create_long_term_memory', 'async_init', 'add_memories', "fetch_memories", 'async_create_short_term_memory', + '_create_buffer_context', '_get_task_list', 
'_run_main_buffer', + '_available_operations', '_provide_feedback'] + for class_instance in [self.memory_class, self.semantic_memory_class, self.episodic_memory_class, self.episodic_buffer_class]: + for method in methods_list: + class_instance.add_method(method) - async def async_create_short_term_memory( - self, user_id, memory_id, index_name, db_type - ): - # Perform asynchronous initialization steps if needed - return ShortTermMemory( - user_id=self.user_id, - memory_id=self.memory_id, - index_name=self.index_name, - db_type=self.db_type, - ) + async def dynamic_method_call(self, dynamic_base_memory_instance, method_name: str, *args, **kwargs): + if method_name in dynamic_base_memory_instance.methods: + method = getattr(dynamic_base_memory_instance, method_name, None) + if method: + return await method(*args, **kwargs) + raise AttributeError(f"{dynamic_base_memory_instance.name} object has no attribute {method_name}") - async def _add_semantic_memory( - self, observation: str, loader_settings: dict = None, params: dict = None - ): - return await self.long_term_memory.semantic_memory.add_memories( - observation=observation, - loader_settings=loader_settings, - params=params, - ) + def add_dynamic_memory_class(self, class_name: str, namespace: str): + new_memory_class = DynamicBaseMemory(class_name, self.user_id, str(uuid.uuid4()), self.index_name, + self.db_type, namespace) + setattr(self, f"{class_name.lower()}_class", new_memory_class) + return new_memory_class - async def _fetch_semantic_memory(self, observation, params): - return await self.long_term_memory.semantic_memory.fetch_memories( - observation=observation, params=params - ) + def add_attribute_to_class(self, class_instance, attribute_name: str): + class_instance.add_attribute(attribute_name) - async def _delete_semantic_memory(self, params: str = None): - return await self.long_term_memory.semantic_memory.delete_memories( - params=params - ) + def add_method_to_class(self, class_instance, 
method_name: str): + class_instance.add_method(method_name) - async def _add_episodic_memory( - self, observation: str, loader_settings: dict = None, params: dict = None - ): - return await self.long_term_memory.episodic_memory.add_memories( - observation=observation, loader_settings=loader_settings, params=params - ) - - async def _fetch_episodic_memory(self, observation, params: str = None): - return await self.long_term_memory.episodic_memory.fetch_memories( - observation=observation, params=params - ) - - async def _delete_episodic_memory(self, params: str = None): - return await self.long_term_memory.episodic_memory.delete_memories( - params=params - ) - - - async def _add_buffer_memory( - self, - user_input: str, - namespace: str = None, - loader_settings: dict = None, - params: dict = None, - ): - return await self.short_term_memory.episodic_buffer.add_memories( - observation=user_input, loader_settings=loader_settings, params=params - ) - - async def _fetch_buffer_memory(self, user_input: str): - return await self.short_term_memory.episodic_buffer.fetch_memories( - observation=user_input - ) - - async def _delete_buffer_memory(self, params: str = None): - return await self.short_term_memory.episodic_buffer.delete_memories( - params=params - ) - - async def _create_buffer_context( - self, - user_input: str, - params: dict = None, - attention_modulators: dict = None, - ): - return await self.short_term_memory.episodic_buffer.buffer_context( - user_input=user_input, - params=params, - attention_modulators=attention_modulators, - ) - async def _get_task_list( - self, - user_input: str, - params: str = None, - attention_modulators: dict = None, - ): - return await self.short_term_memory.episodic_buffer.get_task_list( - user_input=user_input, - params=params, - attention_modulators=attention_modulators, - ) - async def _run_main_buffer( - self, - user_input: str, - params: dict = None, - attention_modulators: dict = None, - ): - return await 
self.short_term_memory.episodic_buffer.main_buffer( - user_input=user_input, - params=params, - attention_modulators=attention_modulators, - ) - - async def _available_operations(self): - return await self.long_term_memory.episodic_buffer.available_operations() - - async def _provide_feedback(self, score:str =None, params: dict = None, attention_modulators: dict = None): - return await self.short_term_memory.episodic_buffer.provide_feedback(score=score, params=params, attention_modulators=attention_modulators) async def main(): # if you want to run the script as a standalone script, do so with the examples below - memory = Memory(user_id="TestUser") - await memory.async_init() + # memory = Memory(user_id="TestUser") + # await memory.async_init() params = { "version": "1.0", "agreement_id": "AG123456", @@ -1037,8 +966,18 @@ async def main(): "source": "url", "path": "https://www.ibiblio.org/ebooks/London/Call%20of%20Wild.pdf" } - load_jack_london = await memory._add_semantic_memory(observation = "bla", loader_settings=loader_settings, params=params) - print(load_jack_london) + # memory_instance = Memory(namespace='SEMANTICMEMORY') + # sss = await memory_instance.dynamic_method_call(memory_instance.semantic_memory_class, 'fetch_memories', observation='some_observation') + memory_instance = Memory(namespace='PROCEDURALMEMORY') + procedural_memory_class = memory_instance.add_dynamic_memory_class('ProceduralMemory', 'PROCEDURALMEMORY') + memory_instance.add_method_to_class(procedural_memory_class, 'add_memories') + + sss = await memory_instance.dynamic_method_call(memory_instance.proceduralmemory_class, 'add_memories', + observation='some_observation', params=params) + + print(sss) + # load_jack_london = await memory._add_semantic_memory(observation = "bla", loader_settings=loader_settings, params=params) + # print(load_jack_london) modulator = {"relevance": 0.1, "frequency": 0.1} @@ -1061,7 +1000,161 @@ async def main(): # print(run_main_buffer) # del_semantic = await 
memory._delete_semantic_memory() # print(del_semantic) + # def __init__( + # self, + # user_id: str = "676", + # index_name: str = None, + # knowledge_source: str = None, + # knowledge_type: str = None, + # db_type: str = "weaviate", + # namespace: str = None, + # ) -> None: + # self.user_id = user_id + # self.index_name = index_name + # self.db_type = db_type + # self.knowledge_source = knowledge_source + # self.knowledge_type = knowledge_type + # self.memory_id = str(uuid.uuid4()) + # self.long_term_memory = None + # self.short_term_memory = None + # self.namespace = namespace + # load_dotenv() + # Asynchronous factory function for creating LongTermMemory + # async def async_create_long_term_memory( + # self, user_id, memory_id, index_name, db_type + # ): + # # Perform asynchronous initialization steps if needed + # return LongTermMemory( + # user_id=self.user_id, + # memory_id=self.memory_id, + # index_name=self.index_name, + # db_type=self.db_type, + # ) + # + # async def async_init(self): + # # Asynchronous initialization of LongTermMemory and ShortTermMemory + # self.long_term_memory = await self.async_create_long_term_memory( + # user_id=self.user_id, + # memory_id=self.memory_id, + # index_name=self.index_name, + # db_type=self.db_type, + # ) + # self.short_term_memory = await self.async_create_short_term_memory( + # user_id=self.user_id, + # memory_id=self.memory_id, + # index_name=self.index_name, + # db_type=self.db_type, + # ) + # + # async def async_create_short_term_memory( + # self, user_id, memory_id, index_name, db_type + # ): + # # Perform asynchronous initialization steps if needed + # return ShortTermMemory( + # user_id=self.user_id, + # memory_id=self.memory_id, + # index_name=self.index_name, + # db_type=self.db_type, + # ) + # + # async def _add_semantic_memory( + # self, observation: str, loader_settings: dict = None, params: dict = None + # ): + # return await self.long_term_memory.semantic_memory.add_memories( + # observation=observation, 
+ # loader_settings=loader_settings, + # params=params, + # ) + # + # async def _fetch_semantic_memory(self, observation, params): + # return await self.long_term_memory.semantic_memory.fetch_memories( + # observation=observation, params=params + # ) + # + # async def _delete_semantic_memory(self, params: str = None): + # return await self.long_term_memory.semantic_memory.delete_memories( + # params=params + # ) + # + # async def _add_episodic_memory( + # self, observation: str, loader_settings: dict = None, params: dict = None + # ): + # return await self.long_term_memory.episodic_memory.add_memories( + # observation=observation, loader_settings=loader_settings, params=params + # ) + # + # async def _fetch_episodic_memory(self, observation, params: str = None): + # return await self.long_term_memory.episodic_memory.fetch_memories( + # observation=observation, params=params + # ) + # + # async def _delete_episodic_memory(self, params: str = None): + # return await self.long_term_memory.episodic_memory.delete_memories( + # params=params + # ) + # + # + # async def _add_buffer_memory( + # self, + # user_input: str, + # namespace: str = None, + # loader_settings: dict = None, + # params: dict = None, + # ): + # return await self.short_term_memory.episodic_buffer.add_memories( + # observation=user_input, loader_settings=loader_settings, params=params + # ) + # + # async def _fetch_buffer_memory(self, user_input: str): + # return await self.short_term_memory.episodic_buffer.fetch_memories( + # observation=user_input + # ) + # + # async def _delete_buffer_memory(self, params: str = None): + # return await self.short_term_memory.episodic_buffer.delete_memories( + # params=params + # ) + # + # async def _create_buffer_context( + # self, + # user_input: str, + # params: dict = None, + # attention_modulators: dict = None, + # ): + # return await self.short_term_memory.episodic_buffer.buffer_context( + # user_input=user_input, + # params=params, + # 
attention_modulators=attention_modulators, + # ) + # async def _get_task_list( + # self, + # user_input: str, + # params: str = None, + # attention_modulators: dict = None, + # ): + # return await self.short_term_memory.episodic_buffer.get_task_list( + # user_input=user_input, + # params=params, + # attention_modulators=attention_modulators, + # ) + # async def _run_main_buffer( + # self, + # user_input: str, + # params: dict = None, + # attention_modulators: dict = None, + # ): + # return await self.short_term_memory.episodic_buffer.main_buffer( + # user_input=user_input, + # params=params, + # attention_modulators=attention_modulators, + # ) + # + # async def _available_operations(self): + # return await self.long_term_memory.episodic_buffer.available_operations() + # + # async def _provide_feedback(self, score:str =None, params: dict = None, attention_modulators: dict = None): + # return await self.short_term_memory.episodic_buffer.provide_feedback(score=score, params=params, attention_modulators=attention_modulators) if __name__ == "__main__": import asyncio diff --git a/level_2/loaders/loaders.py b/level_2/loaders/loaders.py index 15573e4d0..8c4521682 100644 --- a/level_2/loaders/loaders.py +++ b/level_2/loaders/loaders.py @@ -1,8 +1,10 @@ import os from io import BytesIO - +import sys, os import fitz -from level_2.chunkers.chunkers import chunk_data +sys.path.append(os.path.dirname(os.path.abspath(__file__))) + +from chunkers.chunkers import chunk_data from langchain.document_loaders import PyPDFLoader import requests diff --git a/level_2/models/memory.py b/level_2/models/memory.py new file mode 100644 index 000000000..9474253ff --- /dev/null +++ b/level_2/models/memory.py @@ -0,0 +1,22 @@ +# memory.py +from datetime import datetime +from sqlalchemy import Column, Integer, String, DateTime, ForeignKey +from sqlalchemy.orm import relationship +import os +import sys +sys.path.append(os.path.dirname(os.path.abspath(__file__))) +from database.database import Base 
+ +class Memory(Base): + __tablename__ = 'memories' + + id = Column(Integer, primary_key=True) + user_id = Column(Integer, ForeignKey('users.id'), index=True) + created_at = Column(DateTime, default=datetime.utcnow) + updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow) + + user = relationship("User", back_populates="memories") + metadatas = relationship("MetaDatas", back_populates="memory", cascade="all, delete-orphan") + + def __repr__(self): + return f"" diff --git a/level_2/models/memory_graph.py b/level_2/models/memory_graph.py new file mode 100644 index 000000000..b367a8042 --- /dev/null +++ b/level_2/models/memory_graph.py @@ -0,0 +1,17 @@ +from sqlalchemy import create_engine, Column, Integer, String +from sqlalchemy.ext.declarative import declarative_base +from sqlalchemy.orm import sessionmaker + +import os +import sys +sys.path.append(os.path.dirname(os.path.abspath(__file__))) +from database.database import Base + + +class MemoryAssociation(Base): + __tablename__ = 'memory_associations' + + id = Column(Integer, primary_key=True) + user_id = Column(String) + source_memory_id = Column(String) + target_memory_id = Column(String) \ No newline at end of file diff --git a/level_2/models/metadatas.py b/level_2/models/metadatas.py new file mode 100644 index 000000000..fac60a000 --- /dev/null +++ b/level_2/models/metadatas.py @@ -0,0 +1,24 @@ +# metadata.py +from datetime import datetime +from sqlalchemy import Column, Integer, String, DateTime, ForeignKey +from sqlalchemy.orm import relationship +import os +import sys +sys.path.append(os.path.dirname(os.path.abspath(__file__))) +from database.database import Base + + +class MetaDatas(Base): + __tablename__ = 'metadata' + + id = Column(Integer, primary_key=True) + version = Column(String, nullable=False) + field = Column(String, nullable=False) + memory_id = Column(Integer, ForeignKey('memories.id'), index=True) + created_at = Column(DateTime, default=datetime.utcnow) + updated_at = 
Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow) + + memory = relationship("Memory", back_populates="metadata") + + def __repr__(self): + return f"" diff --git a/level_2/models/operation.py b/level_2/models/operation.py new file mode 100644 index 000000000..d7ed1f3b8 --- /dev/null +++ b/level_2/models/operation.py @@ -0,0 +1,22 @@ +# operation.py +from datetime import datetime +from sqlalchemy import Column, Integer, String, DateTime, ForeignKey +from sqlalchemy.orm import relationship +import os +import sys +sys.path.append(os.path.dirname(os.path.abspath(__file__))) +from database.database import Base + + +class Operation(Base): + __tablename__ = 'operations' + + id = Column(Integer, primary_key=True) + session_id = Column(Integer, ForeignKey('sessions.id'), index=True) + created_at = Column(DateTime, default=datetime.utcnow) + updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow) + + session = relationship("Session", back_populates="operations") + + def __repr__(self): + return f"" diff --git a/level_2/models/sessions.py b/level_2/models/sessions.py new file mode 100644 index 000000000..25ccb7518 --- /dev/null +++ b/level_2/models/sessions.py @@ -0,0 +1,23 @@ +# session.py +from datetime import datetime +from sqlalchemy import Column, Integer, String, DateTime, ForeignKey +from sqlalchemy.orm import relationship +import os +import sys +sys.path.append(os.path.dirname(os.path.abspath(__file__))) +from database.database import Base + + +class Session(Base): + __tablename__ = 'sessions' + + id = Column(Integer, primary_key=True) + user_id = Column(Integer, ForeignKey('users.id'), index=True) + created_at = Column(DateTime, default=datetime.utcnow) + updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow) + + user = relationship("User", back_populates="sessions") + operations = relationship("Operation", back_populates="session", cascade="all, delete-orphan") + + def __repr__(self): + return f"" 
diff --git a/level_2/models/test_output.py b/level_2/models/test_output.py new file mode 100644 index 000000000..3a46ad5cf --- /dev/null +++ b/level_2/models/test_output.py @@ -0,0 +1,22 @@ +# test_output.py +from datetime import datetime +from sqlalchemy import Column, Integer, String, DateTime, ForeignKey +from sqlalchemy.orm import relationship +import os +import sys +sys.path.append(os.path.dirname(os.path.abspath(__file__))) +from database.database import Base + + +class TestOutput(Base): + __tablename__ = 'test_outputs' + + id = Column(Integer, primary_key=True) + test_set_id = Column(Integer, ForeignKey('test_sets.id'), index=True) + created_at = Column(DateTime, default=datetime.utcnow) + updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow) + + test_set = relationship("TestSet", back_populates="test_outputs") + + def __repr__(self): + return f"" diff --git a/level_2/models/test_set.py b/level_2/models/test_set.py new file mode 100644 index 000000000..8d43222be --- /dev/null +++ b/level_2/models/test_set.py @@ -0,0 +1,23 @@ +# test_set.py +from datetime import datetime +from sqlalchemy import Column, Integer, String, DateTime, ForeignKey +from sqlalchemy.orm import relationship +import os +import sys +sys.path.append(os.path.dirname(os.path.abspath(__file__))) +from database.database import Base + + +class TestSet(Base): + __tablename__ = 'test_sets' + + id = Column(Integer, primary_key=True) + user_id = Column(Integer, ForeignKey('users.id'), index=True) + created_at = Column(DateTime, default=datetime.utcnow) + updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow) + + user = relationship("User", back_populates="test_sets") + test_outputs = relationship("TestOutput", back_populates="test_set", cascade="all, delete-orphan") + + def __repr__(self): + return f"" diff --git a/level_2/models/user.py b/level_2/models/user.py new file mode 100644 index 000000000..87943e33a --- /dev/null +++ 
b/level_2/models/user.py @@ -0,0 +1,25 @@ +# user.py +from datetime import datetime +from sqlalchemy import Column, Integer, String, DateTime +from sqlalchemy.orm import relationship +from sqlalchemy.ext.declarative import declarative_base +import os +import sys +sys.path.append(os.path.dirname(os.path.abspath(__file__))) +from database.database import Base + + +class User(Base): + __tablename__ = 'users' + + id = Column(Integer, primary_key=True) + name = Column(String, nullable=False, unique=True, index=True) + created_at = Column(DateTime, default=datetime.utcnow) + updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow) + + memories = relationship("Memory", back_populates="user", cascade="all, delete-orphan") + sessions = relationship("Session", back_populates="user", cascade="all, delete-orphan") + test_sets = relationship("TestSet", back_populates="user", cascade="all, delete-orphan") + + def __repr__(self): + return f"" diff --git a/level_2/modulators/modulators.py b/level_2/modulators/modulators.py index 4efefb044..5da391cfa 100644 --- a/level_2/modulators/modulators.py +++ b/level_2/modulators/modulators.py @@ -1,5 +1,46 @@ import numpy as np +# Make sure to install the following packages: dlt, langchain, duckdb, python-dotenv, openai, weaviate-client +import json +from enum import Enum +from io import BytesIO +from typing import Dict, List, Union, Any +import logging +logging.basicConfig(level=logging.INFO) +import marvin +import requests +from deep_translator import GoogleTranslator +from dotenv import load_dotenv +from langchain.agents import initialize_agent, AgentType +from langchain.document_loaders import PyPDFLoader +from langchain.output_parsers import PydanticOutputParser +from langchain.retrievers import WeaviateHybridSearchRetriever +from langchain.tools import tool +from marvin import ai_classifier +from pydantic import parse_obj_as +from weaviate.gql.get import HybridFusion +import numpy as np +load_dotenv() +from 
langchain import OpenAI +from langchain.chat_models import ChatOpenAI +from typing import Optional, Dict, List, Union + +import tracemalloc + +tracemalloc.start() + +import os +from datetime import datetime +from langchain import PromptTemplate +from langchain.chains.openai_functions import create_structured_output_chain +from langchain.prompts import HumanMessagePromptTemplate, ChatPromptTemplate +from langchain.embeddings.openai import OpenAIEmbeddings +from pydantic import BaseModel, Field +from dotenv import load_dotenv +from langchain.schema import Document, SystemMessage, HumanMessage +import uuid +import humanize +import weaviate class DifferentiableLayer: def __init__(self, attention_modulators: dict): @@ -30,3 +71,169 @@ class DifferentiableLayer: async def get_weights(self): return self.weights + + + async def _summarizer(self, text: str, document:str, max_tokens: int = 1200): + """Summarize text using OpenAI API, to reduce amount of code for modulators contributing to context""" + class Summaries(BaseModel): + """Schema for documentGroups""" + summary: str = Field( + ..., + description="Summarized document") + class SummaryContextList(BaseModel): + """Buffer raw context processed by the buffer""" + + summaries: List[Summaries] = Field(..., description="List of summaries") + observation: str = Field(..., description="The original user query") + + parser = PydanticOutputParser(pydantic_object=SummaryContextList) + prompt = PromptTemplate( + template=" \n{format_instructions}\nSummarize the observation briefly based on the user query, observation is: {query}\n. 
    async def memory_route(self, text_time_diff: str):
        """Classify how fresh a memory is from a humanized time string.

        Args:
            text_time_diff: Natural-language age of a memory, e.g. the
                output of ``humanize.naturaltime`` ("2 months ago").

        Returns:
            A ``MemoryRoute`` enum member; its ``value`` is the freshness
            score as a string between "0.1" and "1".
        """
        # marvin's @ai_classifier turns this Enum into an LLM-backed
        # classifier. Its docstring and member names are part of the LLM
        # prompt, so they are left exactly as written (including the typo).
        @ai_classifier
        class MemoryRoute(Enum):
            """Represents classifer for freshness of memories"""

            data_uploaded_now = "1"
            data_uploaded_very_recently = "0.9"
            data_uploaded_recently = "0.7"
            data_uploaded_more_than_a_month_ago = "0.5"
            data_uploaded_more_than_three_months_ago = "0.3"
            data_uploaded_more_than_six_months_ago = "0.1"

        namespace = MemoryRoute(str(text_time_diff))

        return namespace

    async def freshness(self, observation: str, namespace: str = None, memory=None) -> list[str]:
        """Freshness - Score between 0 and 1 on how often was the information updated in episodic or semantic memory in the past

        Returns:
            ``[score_string, raw_lookup_result]``.
        """
        logging.info("Starting with Freshness")

        lookup_value = await self.fetch_memories(
            observation=observation, namespace=namespace
        )
        # NOTE(review): assumes at least one EPISODICMEMORY hit; raises
        # KeyError/IndexError on an empty result — confirm upstream guards.
        unix_t = lookup_value["data"]["Get"]["EPISODICMEMORY"][0]["_additional"][
            "lastUpdateTimeUnix"
        ]

        # Convert Unix timestamp to datetime
        # (the / 1000 implies the store returns milliseconds — TODO confirm).
        last_update_datetime = datetime.fromtimestamp(int(unix_t) / 1000)
        time_difference = datetime.now() - last_update_datetime
        time_difference_text = humanize.naturaltime(time_difference)
        # Let the LLM classifier translate "3 weeks ago" etc. into a score.
        namespace_ = await self.memory_route(str(time_difference_text))
        return [namespace_.value, lookup_value]

    async def frequency(self, observation: str, namespace: str, memory) -> list[str]:
        """Frequency - Score between 0 and 1 on how often was the information processed in episodic memory in the past
        Counts the number of times a memory was accessed in the past and divides it by the total number of memories in the episodic memory

        Returns:
            ``[frequency_string, llm_summary]``.
        """
        logging.info("Starting with Frequency")
        weaviate_client = self.init_client(namespace=namespace)

        result_output = await self.fetch_memories(
            observation=observation, params=None, namespace=namespace
        )
        # Hits relevant to this observation ...
        number_of_relevant_events = len(result_output["data"]["Get"]["EPISODICMEMORY"])
        # ... divided by the total object count from a Weaviate meta-count
        # aggregation over the class.
        number_of_total_events = (
            weaviate_client.query.aggregate(namespace).with_meta_count().do()
        )
        # NOTE(review): ZeroDivisionError if the class is empty — confirm
        # callers only invoke this after ingestion.
        frequency = float(number_of_relevant_events) / float(
            number_of_total_events["data"]["Aggregate"]["EPISODICMEMORY"][0]["meta"][
                "count"
            ]
        )
        summary = await self._summarizer(text=observation, document=result_output["data"]["Get"]["EPISODICMEMORY"][0])
        logging.info("Frequency summary is %s", str(summary))
        return [str(frequency), summary]

    async def repetition(self, observation: str, namespace: str, memory) -> list[str]:
        """Repetition - Score between 0 and 1 based on how often and at what intervals a memory has been revisited.
        Accounts for the spacing effect, where memories accessed at increasing intervals are given higher scores.
        # TO DO -> add metadata column to make sure that the access is not equal to update, and run update vector function each time a memory is accessed

        Returns:
            ``[repetition_score_string, llm_summary]`` or
            ``["0", first_hit]`` when there is at most one access time.
        """
        logging.info("Starting with Repetition")

        result_output = await self.fetch_memories(
            observation=observation, params=None, namespace=namespace
        )

        access_times = result_output["data"]["Get"]["EPISODICMEMORY"][0]["_additional"]["lastUpdateTimeUnix"]
        # NOTE(review): freshness() reads this same field as a single scalar,
        # but the code below treats access_times as a list of timestamps
        # (len/sorted/pairwise diffs). If the store returns one value, len()
        # raises TypeError — confirm the actual shape of this field.
        # Calculate repetition score based on access times
        if not access_times or len(access_times) == 1:
            return ["0", result_output["data"]["Get"]["EPISODICMEMORY"][0]]

        # Sort access times
        access_times = sorted(access_times)
        # Calculate intervals between consecutive accesses
        intervals = [access_times[i + 1] - access_times[i] for i in range(len(access_times) - 1)]
        # A simple scoring mechanism: Longer intervals get higher scores, as they indicate spaced repetition
        repetition_score = sum([1.0 / (interval + 1) for interval in intervals]) / len(intervals)
        summary = await self._summarizer(text = observation, document=result_output["data"]["Get"]["EPISODICMEMORY"][0])
        logging.info("Repetition is %s", str(repetition_score))
        logging.info("Repetition summary is %s", str(summary))
        return [str(repetition_score), summary]

    async def relevance(self, observation: str, namespace: str, memory) -> list[str]:
        """
        Fetches the fusion relevance score for a given observation from the episodic memory.
        Learn more about fusion scores here on Weaviate docs: https://weaviate.io/blog/hybrid-search-fusion-algorithms
        Parameters:
        - observation: The user's query or observation.
        - namespace: The namespace for the data.

        Returns:
        - The relevance score between 0 and 1.
        """
        logging.info("Starting with Relevance")
        # Unlike the other scorers this one reads the score directly off the
        # already-fetched `memory` dict instead of querying the store again.
        score = memory["_additional"]["score"]
        logging.info("Relevance is %s", str(score))
        return [score, "fusion score"]

    async def saliency(self, observation: str, namespace=None, memory=None) -> list[str]:
        """Determines saliency by scoring the set of retrieved documents against each other and trying to determine saliency

        Returns:
            ``[saliency_score_string, summary_string]`` taken from the first
            document the LLM returns.
        """
        logging.info("Starting with Saliency")

        # Pydantic models define the JSON shape the LLM must emit; their
        # docstrings/descriptions feed PydanticOutputParser's format
        # instructions, so they are left exactly as written.
        class SaliencyRawList(BaseModel):
            """Schema for documentGroups"""
            summary: str = Field(
                ...,
                description="Summarized document")
            saliency_score: str = Field(
                None, description="The score between 0 and 1")

        class SailencyContextList(BaseModel):
            """Buffer raw context processed by the buffer"""

            docs: List[SaliencyRawList] = Field(..., description="List of docs")
            observation: str = Field(..., description="The original user query")

        parser = PydanticOutputParser(pydantic_object=SailencyContextList)
        prompt = PromptTemplate(
            template="Determine saliency of documents compared to the other documents retrieved \n{format_instructions}\nSummarize the observation briefly based on the user query, observation is: {query}\n",
            input_variables=["query"],
            partial_variables={"format_instructions": parser.get_format_instructions()},
        )

        _input = prompt.format_prompt(query=observation)
        document_context_result = self.llm_base(_input.to_string())
        # Round-trip through .json()/json.loads to get plain dicts rather
        # than pydantic objects.
        document_context_result_parsed = parser.parse(document_context_result)
        document_context_result_parsed = json.loads(document_context_result_parsed.json())
        saliency_score = document_context_result_parsed["docs"][0]["saliency_score"]
        saliency_values = document_context_result_parsed["docs"][0]["summary"]

        logging.info("Saliency is %s", str(saliency_score))
        logging.info("Saliency summary is %s", str(saliency_values))

        return [saliency_score, saliency_values]
# --- level_2/schema/semantic/semantic_schema.py (post-patch content) ---
# The patch migrates marshmallow's deprecated `missing=` kwarg to
# `load_default=` (marshmallow >= 3.13).

class DocumentMetadataSchemaV1(Schema):
    """Metadata schema (v1) attached to stored documents.

    NOTE(review): the diff hunk starts at line 6 of the original file;
    fields declared above it (per V2's parallel shape, presumably
    user_id/memory_id) are not visible in this chunk and are not
    reproduced here — confirm against the full file.
    """
    ltm_memory_id = fields.Str(required=True)
    st_memory_id = fields.Str(required=True)
    buffer_id = fields.Str(required=True)
    version = fields.Str(load_default="")
    agreement_id = fields.Str(load_default="")
    privacy_policy = fields.Str(load_default="")
    terms_of_service = fields.Str(load_default="")
    format = fields.Str(load_default="")
    schema_version = fields.Str(load_default="")
    checksum = fields.Str(load_default="")
    owner = fields.Str(load_default="")
    license = fields.Str(load_default="")
    validity_start = fields.Str(load_default="")
    validity_end = fields.Str(load_default="")


class DocumentMetadataSchemaV2(Schema):
    """Metadata schema (v2): v1 plus an extra free-form `random` field."""
    user_id = fields.Str(required=True)
    # NOTE(review): one context line is elided between the two diff hunks
    # here (likely `memory_id = fields.Str(required=True)`) — confirm
    # against the full file.
    ltm_memory_id = fields.Str(required=True)
    st_memory_id = fields.Str(required=True)
    buffer_id = fields.Str(required=True)
    version = fields.Str(load_default="")
    agreement_id = fields.Str(load_default="")
    privacy_policy = fields.Str(load_default="")
    terms_of_service = fields.Str(load_default="")
    format = fields.Str(load_default="")
    schema_version = fields.Str(load_default="")
    checksum = fields.Str(load_default="")
    owner = fields.Str(load_default="")
    license = fields.Str(load_default="")
    validity_start = fields.Str(load_default="")
    validity_end = fields.Str(load_default="")
    random = fields.Str(load_default="")


class DocumentSchema(Schema):
    """Wrapper schema pairing a document with its (v1) metadata."""
    metadata = fields.Nested(DocumentMetadataSchemaV1, required=True)
    # NOTE(review): the visible diff context ends here; the full class may
    # declare further fields — confirm against the full file.


# --- level_2/scripts/create_database.py (new file, fixed) ---
# Bootstrap script: ensure the Postgres database named by POSTGRES_DB
# exists, then create all ORM tables registered on Base.

import sys
import os
import re

# this is needed to import classes from other modules
script_dir = os.path.dirname(os.path.abspath(__file__))
# Get the parent directory of your script and add it to sys.path
parent_dir = os.path.dirname(script_dir)
sys.path.append(parent_dir)

from database.database import Base, engine
# These model imports are required for their side effect: registering the
# mapped tables on Base.metadata so create_all() sees them.
import models.memory
import models.metadatas
import models.operation
import models.sessions
import models.test_output
import models.test_set
import models.user
from sqlalchemy import create_engine, text
import psycopg2
from dotenv import load_dotenv
load_dotenv()

# Only plain identifiers are accepted as database names (they cannot be
# passed as bound parameters to CREATE DATABASE).
_DB_NAME_RE = re.compile(r"^[A-Za-z_][A-Za-z0-9_]*$")


def create_admin_engine(username, password, host):
    """Return an engine connected to the `postgres` maintenance database.

    BUG FIX: this previously connected to the hard-coded target database
    "bubu". An admin connection must use the always-present `postgres`
    database — connecting to the target DB fails precisely when it does
    not exist yet, which is the case this script is meant to handle.
    """
    admin_url = f"postgresql://{username}:{password}@{host}:5432/postgres"
    return create_engine(admin_url)


def database_exists(username, password, host, db_name):
    """Return True if `db_name` exists on the server.

    Uses a bound parameter instead of string interpolation, and disposes
    the engine even if the query raises.
    """
    engine = create_admin_engine(username, password, host)
    try:
        with engine.connect() as connection:
            query = text("SELECT 1 FROM pg_database WHERE datname = :name")
            result = connection.execute(query, {"name": db_name}).fetchone()
        return result is not None
    finally:
        engine.dispose()


def create_database(username, password, host, db_name):
    """Create database `db_name`.

    CREATE DATABASE cannot run inside a transaction, so autocommit is set
    on the raw psycopg2 connection. The identifier is validated before
    interpolation because DDL identifiers cannot be bound parameters.

    Raises:
        ValueError: if `db_name` is not a plain SQL identifier.
    """
    if not _DB_NAME_RE.match(db_name or ""):
        raise ValueError(f"Invalid database name: {db_name!r}")
    engine = create_admin_engine(username, password, host)
    connection = engine.raw_connection()
    try:
        connection.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)
        cursor = connection.cursor()
        try:
            # Identifier validated above; safe to interpolate.
            cursor.execute(f"CREATE DATABASE {db_name}")
        finally:
            cursor.close()
    finally:
        connection.close()
        engine.dispose()


def create_tables():
    """Create every table registered on Base.metadata in the target DB."""
    Base.metadata.create_all(bind=engine)


if __name__ == "__main__":
    username = os.getenv('POSTGRES_USER')
    password = os.getenv('POSTGRES_PASSWORD')
    database_name = os.getenv('POSTGRES_DB')
    host = os.getenv('POSTGRES_HOST')

    if not database_exists(username, password, host, database_name):
        print(f"Database {database_name} does not exist. Creating...")
        create_database(username, password, host, database_name)
        print(f"Database {database_name} created successfully.")

    create_tables()


# --- level_2/vectordb/vectordb.py (post-patch import header, as visible) ---
# The patch switches absolute `level_2.*` imports to path-hacked local
# imports so the module also works when run from inside the package dir.
sys.path.append(os.path.dirname(os.path.abspath(__file__)))
from marshmallow import Schema, fields
from loaders.loaders import _document_loader