diff --git a/cognee/infrastructure/databases/relational/config.py b/cognee/infrastructure/databases/relational/config.py index f0ddfc01d..e76a15cd3 100644 --- a/cognee/infrastructure/databases/relational/config.py +++ b/cognee/infrastructure/databases/relational/config.py @@ -11,9 +11,11 @@ class RelationalConfig(BaseSettings): db_port: str = "5432" db_user: str = "cognee" db_password: str = "cognee" - database_engine: object = create_relational_engine(db_path, db_name) + db_provider: str = "duckdb" + database_engine: object = create_relational_engine(db_path, db_name, db_provider) db_file_path: str = os.path.join(db_path, db_name) + model_config = SettingsConfigDict(env_file = ".env", extra = "allow") def create_engine(self): @@ -29,6 +31,7 @@ class RelationalConfig(BaseSettings): "db_user": self.db_user, "db_password": self.db_password, "db_engine": self.database_engine, + "db_provider": self.db_provider, } @lru_cache diff --git a/cognee/infrastructure/databases/relational/create_relational_engine.py b/cognee/infrastructure/databases/relational/create_relational_engine.py index 416152f59..2d4d961e8 100644 --- a/cognee/infrastructure/databases/relational/create_relational_engine.py +++ b/cognee/infrastructure/databases/relational/create_relational_engine.py @@ -1,10 +1,33 @@ -from cognee.infrastructure.files.storage import LocalStorage -from cognee.infrastructure.databases.relational import DuckDBAdapter +from enum import Enum -def create_relational_engine(db_path: str, db_name: str): +from cognee.infrastructure.databases.relational.sqlalchemy.SqlAlchemyAdapter import SQLAlchemyAdapter +from cognee.infrastructure.files.storage import LocalStorage +from cognee.infrastructure.databases.relational import DuckDBAdapter, get_relationaldb_config + + +class DBProvider(Enum): + DUCKDB = "duckdb" + POSTGRES = "postgres" + + + +def create_relational_engine(db_path: str, db_name: str, db_provider:str): LocalStorage.ensure_directory_exists(db_path) - return DuckDBAdapter( - db_name = db_name, - db_path = db_path, - ) + llm_config = get_relationaldb_config() + + provider = DBProvider(llm_config.llm_provider) + + + if provider == DBProvider.DUCKDB: + + return DuckDBAdapter( + db_name = db_name, + db_path = db_path, + ) + elif provider == DBProvider.POSTGRES: + return SQLAlchemyAdapter( + db_name = db_name, + db_path = db_path, + db_type = db_provider, + ) \ No newline at end of file diff --git a/cognee/infrastructure/databases/relational/sqlalchemy/SqlAlchemyAdapter.py b/cognee/infrastructure/databases/relational/sqlalchemy/SqlAlchemyAdapter.py new file mode 100644 index 000000000..292eaea78 --- /dev/null +++ b/cognee/infrastructure/databases/relational/sqlalchemy/SqlAlchemyAdapter.py @@ -0,0 +1,125 @@ +import os +from sqlalchemy import create_engine, MetaData, Table, Column, String, Boolean, TIMESTAMP, text +from sqlalchemy.orm import sessionmaker + +class SQLAlchemyAdapter(): + def __init__(self, db_type: str, db_path: str, db_name: str): + self.db_location = os.path.abspath(os.path.join(db_path, db_name)) + self.engine = create_engine(f"{db_type}:///{self.db_location}") + self.Session = sessionmaker(bind=self.engine) + + def get_datasets(self): + with self.engine.connect() as connection: + result = connection.execute(text("SELECT DISTINCT schema_name FROM information_schema.tables;")) + tables = [row['schema_name'] for row in result] + return list( + filter( + lambda schema_name: not schema_name.endswith("staging") and schema_name != "cognee", + tables + ) + ) + + def get_files_metadata(self, dataset_name: str): + with self.engine.connect() as connection: + result = connection.execute(text(f"SELECT id, name, file_path, extension, mime_type FROM {dataset_name}.file_metadata;")) + return [dict(row) for row in result] + + def create_table(self, schema_name: str, table_name: str, table_config: list[dict]): + fields_query_parts = [f"{item['name']} {item['type']}" for item in table_config] + with self.engine.connect() as connection: + connection.execute(text(f"CREATE SCHEMA IF NOT EXISTS {schema_name};")) + connection.execute(text(f"CREATE TABLE IF NOT EXISTS {schema_name}.{table_name} ({', '.join(fields_query_parts)});")) + + def delete_table(self, table_name: str): + with self.engine.connect() as connection: + connection.execute(text(f"DROP TABLE IF EXISTS {table_name};")) + + def insert_data(self, schema_name: str, table_name: str, data: list[dict]): + columns = ", ".join(data[0].keys()) + values = ", ".join([f"({', '.join([f':{key}' for key in row.keys()])})" for row in data]) + insert_query = text(f"INSERT INTO {schema_name}.{table_name} ({columns}) VALUES {values};") + with self.engine.connect() as connection: + connection.execute(insert_query, data) + + def get_data(self, table_name: str, filters: dict = None): + with self.engine.connect() as connection: + query = f"SELECT * FROM {table_name}" + if filters: + filter_conditions = " AND ".join([ + f"{key} IN ({', '.join([f':{key}{i}' for i in range(len(value))])})" if isinstance(value, list) + else f"{key} = :{key}" for key, value in filters.items() + ]) + query += f" WHERE {filter_conditions};" + query = text(query) + results = connection.execute(query, filters) + else: + query += ";" + query = text(query) + results = connection.execute(query) + return {result["data_id"]: result["status"] for result in results} + + def execute_query(self, query): + with self.engine.connect() as connection: + result = connection.execute(text(query)) + return [dict(row) for row in result] + + def load_cognify_data(self, data): + metadata = MetaData() + cognify_table = Table( + 'cognify', metadata, + Column('document_id', String), + Column('layer_id', String), + Column('created_at', TIMESTAMP, server_default=text('CURRENT_TIMESTAMP')), + Column('updated_at', TIMESTAMP, nullable=True, default=None), + Column('processed', Boolean, default=False), + Column('document_id_target', String, nullable=True) + ) + metadata.create_all(self.engine) + insert_query = cognify_table.insert().values(document_id=text(':document_id'), layer_id=text(':layer_id')) + with self.engine.connect() as connection: + connection.execute(insert_query, data) + + def fetch_cognify_data(self, excluded_document_id: str): + with self.engine.connect() as connection: + connection.execute(text(""" + CREATE TABLE IF NOT EXISTS cognify ( + document_id STRING, + layer_id STRING, + created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + updated_at TIMESTAMP DEFAULT NULL, + processed BOOLEAN DEFAULT FALSE, + document_id_target STRING NULL + ); + """)) + query = text(f""" + SELECT document_id, layer_id, created_at, updated_at, processed + FROM cognify + WHERE document_id != :excluded_document_id AND processed = FALSE; + """) + records = connection.execute(query, {'excluded_document_id': excluded_document_id}).fetchall() + if records: + document_ids = tuple(record['document_id'] for record in records) + update_query = text(f"UPDATE cognify SET processed = TRUE WHERE document_id IN :document_ids;") + connection.execute(update_query, {'document_ids': document_ids}) + return [dict(record) for record in records] + + def delete_cognify_data(self): + with self.engine.connect() as connection: + connection.execute(text(""" + CREATE TABLE IF NOT EXISTS cognify ( + document_id STRING, + layer_id STRING, + created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + updated_at TIMESTAMP DEFAULT NULL, + processed BOOLEAN DEFAULT FALSE, + document_id_target STRING NULL + ); + """)) + connection.execute(text("DELETE FROM cognify;")) + connection.execute(text("DROP TABLE cognify;")) + + def delete_database(self): + from cognee.infrastructure.files.storage import LocalStorage + LocalStorage.remove(self.db_location) + if LocalStorage.file_exists(self.db_location + ".wal"): + LocalStorage.remove(self.db_location + ".wal") diff --git a/cognee/infrastructure/databases/relational/sqlalchemy/__init__.py b/cognee/infrastructure/databases/relational/sqlalchemy/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/cognee/infrastructure/databases/relational/user_authentication/authentication_db.py b/cognee/infrastructure/databases/relational/user_authentication/authentication_db.py index 63bc2e94a..ef87a752c 100644 --- a/cognee/infrastructure/databases/relational/user_authentication/authentication_db.py +++ b/cognee/infrastructure/databases/relational/user_authentication/authentication_db.py @@ -5,6 +5,9 @@ from fastapi_users.db import SQLAlchemyBaseUserTableUUID, SQLAlchemyUserDatabase from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine from sqlalchemy.orm import DeclarativeBase +from cognee.infrastructure.databases.relational import get_relationaldb_config +from cognee.infrastructure.databases.relational.create_relational_engine import create_relational_engine + DATABASE_URL = "sqlite+aiosqlite:///./test.db" @@ -17,8 +20,10 @@ class User(SQLAlchemyBaseUserTableUUID, Base): +llm_config = get_relationaldb_config() -engine = create_async_engine(DATABASE_URL) + +engine = create_relational_engine(llm_config.db_path, llm_config.db_name, llm_config.db_provider) async_session_maker = async_sessionmaker(engine, expire_on_commit=False)