diff --git a/.github/workflows/graph_db_tests.yml b/.github/workflows/graph_db_tests.yml
index 0fc1ae214..2b3a4dd58 100644
--- a/.github/workflows/graph_db_tests.yml
+++ b/.github/workflows/graph_db_tests.yml
@@ -75,6 +75,7 @@ jobs:
           EMBEDDING_ENDPOINT: ${{ secrets.EMBEDDING_ENDPOINT }}
           EMBEDDING_API_KEY: ${{ secrets.EMBEDDING_API_KEY }}
           EMBEDDING_API_VERSION: ${{ secrets.EMBEDDING_API_VERSION }}
+          GRAPH_DATABASE_PROVIDER: "neo4j"
           GRAPH_DATABASE_URL: ${{ secrets.NEO4J_API_URL }}
           GRAPH_DATABASE_PASSWORD: ${{ secrets.NEO4J_API_KEY }}
           GRAPH_DATABASE_USERNAME: "neo4j"
diff --git a/.github/workflows/relational_db_migration_tests.yml b/.github/workflows/relational_db_migration_tests.yml
new file mode 100644
index 000000000..2c5688313
--- /dev/null
+++ b/.github/workflows/relational_db_migration_tests.yml
@@ -0,0 +1,219 @@
+name: Relational DB Migration Tests
+
+on:
+  workflow_call:
+    inputs:
+      python-version:
+        required: false
+        type: string
+        default: '3.11.x'
+    secrets:
+      LLM_MODEL:
+        required: true
+      LLM_ENDPOINT:
+        required: true
+      LLM_API_KEY:
+        required: true
+      LLM_API_VERSION:
+        required: true
+      EMBEDDING_MODEL:
+        required: true
+      EMBEDDING_ENDPOINT:
+        required: true
+      EMBEDDING_API_KEY:
+        required: true
+      EMBEDDING_API_VERSION:
+        required: true
+      NEO4J_API_URL:
+        required: true
+      NEO4J_API_KEY:
+        required: true
+      OPENAI_API_KEY:
+        required: true
+      GRAPHISTRY_USERNAME:
+        required: true
+      GRAPHISTRY_PASSWORD:
+        required: true
+
+jobs:
+  run-relational-db-migration-test-networkx:
+    name: NetworkX Relational DB Migration Test
+    runs-on: ubuntu-22.04
+    defaults:
+      run:
+        shell: bash
+    services:
+      postgres:
+        image: pgvector/pgvector:pg17
+        env:
+          POSTGRES_USER: cognee
+          POSTGRES_PASSWORD: cognee
+          POSTGRES_DB: test_migration_db
+        options: >-
+          --health-cmd pg_isready
+          --health-interval 10s
+          --health-timeout 5s
+          --health-retries 5
+        ports:
+          - 5432:5432
+    steps:
+      - name: Check out repository
+        uses: actions/checkout@v4
+
+      - name: Cognee Setup
+        uses: ./.github/actions/cognee_setup
+        with:
+          python-version: ${{ inputs.python-version }}
+
+      - name: Install specific db dependency
+        run: |
+          poetry install -E postgres
+
+      - name: Run PostgreSQL Script to create test data (Chinook_PostgreSql.sql)
+        env:
+          PGPASSWORD: cognee
+        run: |
+          # Wait until the PostgreSQL service is available
+          until pg_isready -h localhost -p 5432; do
+            echo "Waiting for postgres..."
+            sleep 2
+          done
+
+          # Execute the SQL script against the test_migration_db database
+          psql -h localhost -U cognee -d test_migration_db -f ./cognee/tests/test_data/Chinook_PostgreSql.sql
+
+      - name: Run relational db test
+        env:
+          ENV: 'dev'
+          LLM_MODEL: ${{ secrets.LLM_MODEL }}
+          LLM_ENDPOINT: ${{ secrets.LLM_ENDPOINT }}
+          LLM_API_KEY: ${{ secrets.LLM_API_KEY }}
+          LLM_API_VERSION: ${{ secrets.LLM_API_VERSION }}
+          EMBEDDING_MODEL: ${{ secrets.EMBEDDING_MODEL }}
+          EMBEDDING_ENDPOINT: ${{ secrets.EMBEDDING_ENDPOINT }}
+          EMBEDDING_API_KEY: ${{ secrets.EMBEDDING_API_KEY }}
+          EMBEDDING_API_VERSION: ${{ secrets.EMBEDDING_API_VERSION }}
+        run: poetry run python ./cognee/tests/test_relational_db_migration.py
+
+  run-relational-db-migration-test-kuzu:
+    name: Kuzu Relational DB Migration Test
+    runs-on: ubuntu-22.04
+    defaults:
+      run:
+        shell: bash
+    services:
+      postgres:
+        image: pgvector/pgvector:pg17
+        env:
+          POSTGRES_USER: cognee
+          POSTGRES_PASSWORD: cognee
+          POSTGRES_DB: test_migration_db
+        options: >-
+          --health-cmd pg_isready
+          --health-interval 10s
+          --health-timeout 5s
+          --health-retries 5
+        ports:
+          - 5432:5432
+    steps:
+      - name: Check out repository
+        uses: actions/checkout@v4
+
+      - name: Cognee Setup
+        uses: ./.github/actions/cognee_setup
+        with:
+          python-version: ${{ inputs.python-version }}
+
+      - name: Install specific db dependency
+        run: |
+          poetry install -E postgres -E kuzu
+
+      - name: Run PostgreSQL Script to create test data (Chinook_PostgreSql.sql)
+        env:
+          PGPASSWORD: cognee
+        run: |
+          # Wait until the PostgreSQL service is available
+          until pg_isready -h localhost -p 5432; do
+            echo "Waiting for postgres..."
+            sleep 2
+          done
+
+          # Execute the SQL script against the test_migration_db database
+          psql -h localhost -U cognee -d test_migration_db -f ./cognee/tests/test_data/Chinook_PostgreSql.sql
+
+      - name: Run relational db test
+        env:
+          ENV: 'dev'
+          GRAPH_DATABASE_PROVIDER: 'kuzu'
+          LLM_MODEL: ${{ secrets.LLM_MODEL }}
+          LLM_ENDPOINT: ${{ secrets.LLM_ENDPOINT }}
+          LLM_API_KEY: ${{ secrets.LLM_API_KEY }}
+          LLM_API_VERSION: ${{ secrets.LLM_API_VERSION }}
+          EMBEDDING_MODEL: ${{ secrets.EMBEDDING_MODEL }}
+          EMBEDDING_ENDPOINT: ${{ secrets.EMBEDDING_ENDPOINT }}
+          EMBEDDING_API_KEY: ${{ secrets.EMBEDDING_API_KEY }}
+          EMBEDDING_API_VERSION: ${{ secrets.EMBEDDING_API_VERSION }}
+        run: poetry run python ./cognee/tests/test_relational_db_migration.py
+
+  run-relational-db-migration-test-neo4j:
+    name: Neo4j Relational DB Migration Test
+    runs-on: ubuntu-22.04
+    defaults:
+      run:
+        shell: bash
+    services:
+      postgres:
+        image: pgvector/pgvector:pg17
+        env:
+          POSTGRES_USER: cognee
+          POSTGRES_PASSWORD: cognee
+          POSTGRES_DB: test_migration_db
+        options: >-
+          --health-cmd pg_isready
+          --health-interval 10s
+          --health-timeout 5s
+          --health-retries 5
+        ports:
+          - 5432:5432
+    steps:
+      - name: Check out repository
+        uses: actions/checkout@v4
+
+      - name: Cognee Setup
+        uses: ./.github/actions/cognee_setup
+        with:
+          python-version: ${{ inputs.python-version }}
+
+      - name: Install specific db dependency
+        run: |
+          poetry install -E postgres -E neo4j
+
+      - name: Run PostgreSQL Script to create test data (Chinook_PostgreSql.sql)
+        env:
+          PGPASSWORD: cognee
+        run: |
+          # Wait until the PostgreSQL service is available
+          until pg_isready -h localhost -p 5432; do
+            echo "Waiting for postgres..."
+            sleep 2
+          done
+
+          # Execute the SQL script against the test_migration_db database
+          psql -h localhost -U cognee -d test_migration_db -f ./cognee/tests/test_data/Chinook_PostgreSql.sql
+
+      - name: Run relational db test
+        env:
+          ENV: 'dev'
+          GRAPH_DATABASE_PROVIDER: "neo4j"
+          GRAPH_DATABASE_URL: ${{ secrets.NEO4J_API_URL }}
+          GRAPH_DATABASE_PASSWORD: ${{ secrets.NEO4J_API_KEY }}
+          GRAPH_DATABASE_USERNAME: "neo4j"
+          LLM_MODEL: ${{ secrets.LLM_MODEL }}
+          LLM_ENDPOINT: ${{ secrets.LLM_ENDPOINT }}
+          LLM_API_KEY: ${{ secrets.LLM_API_KEY }}
+          LLM_API_VERSION: ${{ secrets.LLM_API_VERSION }}
+          EMBEDDING_MODEL: ${{ secrets.EMBEDDING_MODEL }}
+          EMBEDDING_ENDPOINT: ${{ secrets.EMBEDDING_ENDPOINT }}
+          EMBEDDING_API_KEY: ${{ secrets.EMBEDDING_API_KEY }}
+          EMBEDDING_API_VERSION: ${{ secrets.EMBEDDING_API_VERSION }}
+        run: poetry run python ./cognee/tests/test_relational_db_migration.py
diff --git a/.github/workflows/test-suites.yml b/.github/workflows/test_suites.yml
similarity index 90%
rename from .github/workflows/test-suites.yml
rename to .github/workflows/test_suites.yml
index b7d7d9bb2..05e6d1f80 100644
--- a/.github/workflows/test-suites.yml
+++ b/.github/workflows/test_suites.yml
@@ -45,6 +45,12 @@ jobs:
     uses: ./.github/workflows/graph_db_tests.yml
     secrets: inherit
 
+  relational-db-migration-tests:
+    name: Relational DB Migration Tests
+    needs: [ basic-tests, e2e-tests ]
+    uses: ./.github/workflows/relational_db_migration_tests.yml
+    secrets: inherit
+
   notebook-tests:
     name: Notebook Tests
     needs: [basic-tests, e2e-tests]
@@ -90,6 +96,7 @@ jobs:
         vector-db-tests,
         example-tests,
         gemini-tests,
+        relational-db-migration-tests,
        docker-compose-test,
        docker-ci-test,
      ]
@@ -108,6 +115,7 @@ jobs:
        example-tests,
        gemini-tests,
        ollama-tests,
+        relational-db-migration-tests,
        docker-compose-test,
        docker-ci-test,
      ]
@@ -123,6 +131,7 @@
        "${{ needs.python-version-tests.result }}" == "success" &&
        "${{ needs.vector-db-tests.result }}" == "success" &&
        "${{ needs.example-tests.result }}" == "success" &&
+        "${{ needs.relational-db-migration-tests.result }}" == "success" &&
        "${{ needs.gemini-tests.result }}" == "success" &&
        "${{ needs.docker-compose-test.result }}" == "success" &&
        "${{ needs.docker-ci-test.result }}" == "success" &&
diff --git a/cognee/api/v1/config/config.py b/cognee/api/v1/config/config.py
index 2f7b406a8..a24a7bb92 100644
--- a/cognee/api/v1/config/config.py
+++ b/cognee/api/v1/config/config.py
@@ -8,7 +8,7 @@ from cognee.infrastructure.data.chunking.config import get_chunk_config
 from cognee.infrastructure.databases.vector import get_vectordb_config
 from cognee.infrastructure.databases.graph.config import get_graph_config
 from cognee.infrastructure.llm.config import get_llm_config
-from cognee.infrastructure.databases.relational import get_relational_config
+from cognee.infrastructure.databases.relational import get_relational_config, get_migration_config
 from cognee.infrastructure.files.storage import LocalStorage
 
 
@@ -131,6 +131,20 @@ class config:
                     message=f"'{key}' is not a valid attribute of the config."
                 )
 
+    @staticmethod
+    def set_migration_db_config(config_dict: dict):
+        """
+        Updates the migration db config with values from config_dict.
+        """
+        migration_db_config = get_migration_config()
+        for key, value in config_dict.items():
+            if hasattr(migration_db_config, key):
+                object.__setattr__(migration_db_config, key, value)
+            else:
+                raise InvalidAttributeError(
+                    message=f"'{key}' is not a valid attribute of the config."
+                )
+
     @staticmethod
     def set_graph_db_config(config_dict: dict) -> None:
         """
diff --git a/cognee/infrastructure/databases/relational/config.py b/cognee/infrastructure/databases/relational/config.py
index 0b3759ba8..bfa07b174 100644
--- a/cognee/infrastructure/databases/relational/config.py
+++ b/cognee/infrastructure/databases/relational/config.py
@@ -35,12 +35,12 @@ def get_relational_config():
 class MigrationConfig(BaseSettings):
     migration_db_path: Union[str, None] = None
-    migration_db_name: str = None
+    migration_db_name: Union[str, None] = None
     migration_db_host: Union[str, None] = None
     migration_db_port: Union[str, None] = None
     migration_db_username: Union[str, None] = None
     migration_db_password: Union[str, None] = None
-    migration_db_provider: str = None
+    migration_db_provider: Union[str, None] = None
 
     model_config = SettingsConfigDict(env_file=".env", extra="allow")
 
 
diff --git a/cognee/tasks/ingestion/migrate_relational_database.py b/cognee/tasks/ingestion/migrate_relational_database.py
index ff282f0b7..946b5d721 100644
--- a/cognee/tasks/ingestion/migrate_relational_database.py
+++ b/cognee/tasks/ingestion/migrate_relational_database.py
@@ -1,5 +1,4 @@
 import logging
-from decimal import Decimal
 from uuid import uuid5, NAMESPACE_OID
 from sqlalchemy import text
 from cognee.infrastructure.databases.relational.get_migration_relational_engine import (
@@ -9,8 +8,6 @@ from cognee.infrastructure.databases.relational.get_migration_relational_engine
 from cognee.tasks.storage.index_data_points import index_data_points
 from cognee.tasks.storage.index_graph_edges import index_graph_edges
 
-from uuid import uuid4
-
 from cognee.modules.engine.models import TableRow, TableType
 
 logger = logging.getLogger(__name__)
@@ -148,12 +145,33 @@ async def migrate_relational_database(graph_db, schema):
             )
         )
 
+    def _remove_duplicate_edges(edge_mapping):
+        seen = set()
+        unique_original_shape = []
+
+        for tup in edge_mapping:
+            # We go through all the tuples in the edge_mapping and we only add unique tuples to the list
+            # To eliminate duplicate edges.
+            source_id, target_id, rel_name, rel_dict = tup
+            # We need to convert the dictionary to a frozenset to be able to compare values for it
+            rel_dict_hashable = frozenset(sorted(rel_dict.items()))
+            hashable_tup = (source_id, target_id, rel_name, rel_dict_hashable)
+
+            # We use the seen set to keep track of unique edges
+            if hashable_tup not in seen:
+                # A list that has frozensets elements instead of dictionaries is needed to be able to compare values
+                seen.add(hashable_tup)
+                # append the original tuple shape (with the dictionary) if it's the first time we see it
+                unique_original_shape.append(tup)
+
+        return unique_original_shape
+
     # Add all nodes and edges to the graph
     # NOTE: Nodes and edges have to be added in batch for speed optimization, Especially for NetworkX.
     # If we'd create nodes and add them to graph in real time the process would take too long.
     # Every node and edge added to NetworkX is saved to file which is very slow when not done in batches.
     await graph_db.add_nodes(list(node_mapping.values()))
-    await graph_db.add_edges(edge_mapping)
+    await graph_db.add_edges(_remove_duplicate_edges(edge_mapping))
 
     # In these steps we calculate the vector embeddings of our nodes and edges and save them to vector database
     # Cognee uses this information to perform searches on the knowledge graph.
diff --git a/cognee/tests/test_data/Chinook_PostgreSql.sql b/cognee/tests/test_data/Chinook_PostgreSql.sql
index d8bc13acb..56ace7018 100644
--- a/cognee/tests/test_data/Chinook_PostgreSql.sql
+++ b/cognee/tests/test_data/Chinook_PostgreSql.sql
@@ -16,16 +16,16 @@
 
 /*******************************************************************************
    Drop database if it exists
 ********************************************************************************/
-DROP DATABASE IF EXISTS chinook;
+DROP DATABASE IF EXISTS test_migration_db;
 
 
 /*******************************************************************************
    Create database
 ********************************************************************************/
-CREATE DATABASE chinook;
+CREATE DATABASE test_migration_db;
 
-\c chinook;
+\c test_migration_db;
 
 
 
@@ -15967,4 +15967,3 @@ COMMIT;
 /* Optionally reclaim space if large # of rows removed:
 VACUUM FULL;
 */
-
diff --git a/cognee/tests/test_relational_db_migration.py b/cognee/tests/test_relational_db_migration.py
index 2fb78e6cf..556a83b7a 100644
--- a/cognee/tests/test_relational_db_migration.py
+++ b/cognee/tests/test_relational_db_migration.py
@@ -1,6 +1,5 @@
 import json
-import pytest
-import pytest_asyncio
+import pathlib
 import os
 from cognee.infrastructure.databases.graph import get_graph_engine
 from cognee.infrastructure.databases.relational import (
@@ -27,7 +26,6 @@ def normalize_node_name(node_name: str) -> str:
     return node_name
 
 
-@pytest_asyncio.fixture()
 async def setup_test_db():
     await cognee.prune.prune_data()
     await cognee.prune.prune_system(metadata=True)
@@ -35,14 +33,13 @@ async def setup_test_db():
     await create_relational_db_and_tables()
     await create_pgvector_db_and_tables()
 
-    relational_engine = get_migration_relational_engine()
-    return relational_engine
+    migration_engine = get_migration_relational_engine()
+    return migration_engine
 
 
-@pytest.mark.asyncio
-async def test_relational_db_migration(setup_test_db):
-    relational_engine = setup_test_db
-    schema = await relational_engine.extract_schema()
+async def relational_db_migration():
+    migration_engine = await setup_test_db()
+    schema = await migration_engine.extract_schema()
 
     graph_engine = await get_graph_engine()
     await migrate_relational_database(graph_engine, schema=schema)
@@ -56,8 +53,8 @@ async def test_relational_db_migration(setup_test_db):
     # 2. Assert that the search results contain "AC/DC"
     assert any("AC/DC" in r for r in search_results), "AC/DC not found in search results!"
 
-    relational_db_provider = os.getenv("MIGRATION_DB_PROVIDER", "sqlite").lower()
-    if relational_db_provider == "postgres":
+    migration_db_provider = migration_engine.engine.dialect.name
+    if migration_db_provider == "postgresql":
         relationship_label = "reports_to"
     else:
         relationship_label = "ReportsTo"
@@ -119,7 +116,7 @@ async def test_relational_db_migration(setup_test_db):
                 found_edges.add((src_name, tgt_name))
                 distinct_node_names.update([src_name, tgt_name])
     else:
-        pytest.fail(f"Unsupported graph database provider: {graph_db_provider}")
+        raise ValueError(f"Unsupported graph database provider: {graph_db_provider}")
 
     assert len(distinct_node_names) == 8, (
         f"Expected 8 distinct node references, found {len(distinct_node_names)}"
@@ -139,7 +136,7 @@ async def test_relational_db_migration(setup_test_db):
         assert e in found_edges, f"Edge {e} not found in the actual '{relationship_label}' edges!"
 
     # 4. Verify the total number of nodes and edges in the graph
-    if relational_db_provider == "sqlite":
+    if migration_db_provider == "sqlite":
         if graph_db_provider == "neo4j":
             query_str = """
                 MATCH (n)
@@ -154,11 +151,11 @@ async def test_relational_db_migration(setup_test_db):
         elif graph_db_provider == "kuzu":
             query_nodes = "MATCH (n:Node) RETURN count(n) as c"
             rows_n = await graph_engine.query(query_nodes)
-            node_count = rows_n[0]["c"]
+            node_count = rows_n[0][0]
 
             query_edges = "MATCH (n:Node)-[r:EDGE]->(m:Node) RETURN count(r) as c"
             rows_e = await graph_engine.query(query_edges)
-            edge_count = rows_e[0]["c"]
+            edge_count = rows_e[0][0]
 
         elif graph_db_provider == "networkx":
             nodes, edges = await graph_engine.get_graph_data()
@@ -170,7 +167,7 @@ async def test_relational_db_migration(setup_test_db):
         assert node_count == 227, f"Expected 227 nodes, got {node_count}"
         assert edge_count == 580, f"Expected 580 edges, got {edge_count}"
 
-    elif relational_db_provider == "postgres":
+    elif migration_db_provider == "postgresql":
         if graph_db_provider == "neo4j":
             query_str = """
                 MATCH (n)
@@ -185,11 +182,11 @@ async def test_relational_db_migration(setup_test_db):
         elif graph_db_provider == "kuzu":
             query_nodes = "MATCH (n:Node) RETURN count(n) as c"
             rows_n = await graph_engine.query(query_nodes)
-            node_count = rows_n[0]["c"]
+            node_count = rows_n[0][0]
 
             query_edges = "MATCH (n:Node)-[r:EDGE]->(m:Node) RETURN count(r) as c"
             rows_e = await graph_engine.query(query_edges)
-            edge_count = rows_e[0]["c"]
+            edge_count = rows_e[0][0]
 
         elif graph_db_provider == "networkx":
             nodes, edges = await graph_engine.get_graph_data()
@@ -204,3 +201,45 @@ async def test_relational_db_migration(setup_test_db):
     print(f"Node & edge count validated: node_count={node_count}, edge_count={edge_count}.")
 
     print(f"All checks passed for {graph_db_provider} provider with '{relationship_label}' edges!")
+
+
+async def test_migration_sqlite():
+    database_to_migrate_path = os.path.join(pathlib.Path(__file__).parent, "test_data/")
+
+    cognee.config.set_migration_db_config(
+        {
+            "migration_db_path": database_to_migrate_path,
+            "migration_db_name": "migration_database.sqlite",
+            "migration_db_provider": "sqlite",
+        }
+    )
+
+    await relational_db_migration()
+
+
+async def test_migration_postgres():
+    # To run test manually you first need to run the Chinook_PostgreSql.sql script in the test_data directory
+    cognee.config.set_migration_db_config(
+        {
+            "migration_db_name": "test_migration_db",
+            "migration_db_host": "127.0.0.1",
+            "migration_db_port": "5432",
+            "migration_db_username": "cognee",
+            "migration_db_password": "cognee",
+            "migration_db_provider": "postgres",
+        }
+    )
+    await relational_db_migration()
+
+
+async def main():
+    print("Starting SQLite database migration test...")
+    await test_migration_sqlite()
+    print("Starting PostgreSQL database migration test...")
+    await test_migration_postgres()
+
+
+if __name__ == "__main__":
+    import asyncio
+
+    asyncio.run(main())