tests: Add gh action to test relational db migration [COG-1591] (#718)

<!-- .github/pull_request_template.md -->

## Description
Add a GitHub Actions workflow that tests relational database migration (SQLite and PostgreSQL sources) against each supported graph database provider.

## DCO Affirmation
I affirm that all code in every commit of this pull request conforms to
the terms of the Topoteretes Developer Certificate of Origin.
This commit is contained in:
Igor Ilic 2025-04-11 14:02:44 +02:00 committed by GitHub
parent 228fba8096
commit 22b363b297
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
8 changed files with 324 additions and 29 deletions

View file

@ -75,6 +75,7 @@ jobs:
EMBEDDING_ENDPOINT: ${{ secrets.EMBEDDING_ENDPOINT }}
EMBEDDING_API_KEY: ${{ secrets.EMBEDDING_API_KEY }}
EMBEDDING_API_VERSION: ${{ secrets.EMBEDDING_API_VERSION }}
GRAPH_DATABASE_PROVIDER: "neo4j"
GRAPH_DATABASE_URL: ${{ secrets.NEO4J_API_URL }}
GRAPH_DATABASE_PASSWORD: ${{ secrets.NEO4J_API_KEY }}
GRAPH_DATABASE_USERNAME: "neo4j"

View file

@ -0,0 +1,215 @@
# Reusable workflow that migrates a relational database (SQLite fixture and a
# PostgreSQL Chinook sample) into each supported graph store (NetworkX, Kuzu,
# Neo4j) and validates the resulting graph. Invoked via `workflow_call`.
name: Relational DB Migration Tests

on:
  workflow_call:
    inputs:
      python-version:
        required: false
        type: string
        default: "3.11.x"
    secrets:
      LLM_MODEL:
        required: true
      LLM_ENDPOINT:
        required: true
      LLM_API_KEY:
        required: true
      LLM_API_VERSION:
        required: true
      EMBEDDING_MODEL:
        required: true
      EMBEDDING_ENDPOINT:
        required: true
      EMBEDDING_API_KEY:
        required: true
      EMBEDDING_API_VERSION:
        required: true
      # NOTE(review): the three secrets below are declared required but are not
      # referenced by any job in this workflow — confirm they are consumed
      # indirectly (e.g. by cognee_setup) or drop the requirement.
      OPENAI_API_KEY:
        required: true
      GRAPHISTRY_USERNAME:
        required: true
      GRAPHISTRY_PASSWORD:
        required: true

jobs:
  run-relational-db-migration-test-networkx:
    name: NetworkX Relational DB Migration Test
    runs-on: ubuntu-22.04
    defaults:
      run:
        shell: bash
    services:
      # PostgreSQL instance holding the Chinook sample data to migrate.
      postgres:
        image: pgvector/pgvector:pg17
        env:
          POSTGRES_USER: cognee
          POSTGRES_PASSWORD: cognee
          POSTGRES_DB: test_migration_db
        options: >-
          --health-cmd pg_isready
          --health-interval 10s
          --health-timeout 5s
          --health-retries 5
        ports:
          - "5432:5432"
    steps:
      - name: Check out repository
        uses: actions/checkout@v4

      - name: Cognee Setup
        uses: ./.github/actions/cognee_setup
        with:
          # Honor the caller-supplied version instead of hard-coding 3.11.x.
          python-version: ${{ inputs.python-version }}

      - name: Install specific db dependency
        run: |
          poetry install -E postgres

      - name: Run PostgreSQL Script to create test data (Chinook_PostgreSql.sql)
        env:
          PGPASSWORD: cognee
        run: |
          # Wait until the PostgreSQL service is available
          until pg_isready -h localhost -p 5432; do
            echo "Waiting for postgres..."
            sleep 2
          done
          # Execute the SQL script against the test_migration_db database
          psql -h localhost -U cognee -d test_migration_db -f ./cognee/tests/test_data/Chinook_PostgreSql.sql

      - name: Run relational db test
        env:
          ENV: 'dev'
          LLM_MODEL: ${{ secrets.LLM_MODEL }}
          LLM_ENDPOINT: ${{ secrets.LLM_ENDPOINT }}
          LLM_API_KEY: ${{ secrets.LLM_API_KEY }}
          LLM_API_VERSION: ${{ secrets.LLM_API_VERSION }}
          EMBEDDING_MODEL: ${{ secrets.EMBEDDING_MODEL }}
          EMBEDDING_ENDPOINT: ${{ secrets.EMBEDDING_ENDPOINT }}
          EMBEDDING_API_KEY: ${{ secrets.EMBEDDING_API_KEY }}
          EMBEDDING_API_VERSION: ${{ secrets.EMBEDDING_API_VERSION }}
        run: poetry run python ./cognee/tests/test_relational_db_migration.py

  run-relational-db-migration-test-kuzu:
    name: Kuzu Relational DB Migration Test
    runs-on: ubuntu-22.04
    defaults:
      run:
        shell: bash
    services:
      postgres:
        image: pgvector/pgvector:pg17
        env:
          POSTGRES_USER: cognee
          POSTGRES_PASSWORD: cognee
          POSTGRES_DB: test_migration_db
        options: >-
          --health-cmd pg_isready
          --health-interval 10s
          --health-timeout 5s
          --health-retries 5
        ports:
          - "5432:5432"
    steps:
      - name: Check out repository
        uses: actions/checkout@v4

      - name: Cognee Setup
        uses: ./.github/actions/cognee_setup
        with:
          python-version: ${{ inputs.python-version }}

      - name: Install specific db dependency
        run: |
          poetry install -E postgres -E kuzu

      - name: Run PostgreSQL Script to create test data (Chinook_PostgreSql.sql)
        env:
          PGPASSWORD: cognee
        run: |
          # Wait until the PostgreSQL service is available
          until pg_isready -h localhost -p 5432; do
            echo "Waiting for postgres..."
            sleep 2
          done
          # Execute the SQL script against the test_migration_db database
          psql -h localhost -U cognee -d test_migration_db -f ./cognee/tests/test_data/Chinook_PostgreSql.sql

      - name: Run relational db test
        env:
          ENV: 'dev'
          GRAPH_DATABASE_PROVIDER: 'kuzu'
          LLM_MODEL: ${{ secrets.LLM_MODEL }}
          LLM_ENDPOINT: ${{ secrets.LLM_ENDPOINT }}
          LLM_API_KEY: ${{ secrets.LLM_API_KEY }}
          LLM_API_VERSION: ${{ secrets.LLM_API_VERSION }}
          EMBEDDING_MODEL: ${{ secrets.EMBEDDING_MODEL }}
          EMBEDDING_ENDPOINT: ${{ secrets.EMBEDDING_ENDPOINT }}
          EMBEDDING_API_KEY: ${{ secrets.EMBEDDING_API_KEY }}
          EMBEDDING_API_VERSION: ${{ secrets.EMBEDDING_API_VERSION }}
        run: poetry run python ./cognee/tests/test_relational_db_migration.py

  run-relational-db-migration-test-neo4j:
    name: Neo4j Relational DB Migration Test
    runs-on: ubuntu-22.04
    defaults:
      run:
        shell: bash
    services:
      postgres:
        image: pgvector/pgvector:pg17
        env:
          POSTGRES_USER: cognee
          POSTGRES_PASSWORD: cognee
          POSTGRES_DB: test_migration_db
        options: >-
          --health-cmd pg_isready
          --health-interval 10s
          --health-timeout 5s
          --health-retries 5
        ports:
          - "5432:5432"
    steps:
      - name: Check out repository
        uses: actions/checkout@v4

      - name: Cognee Setup
        uses: ./.github/actions/cognee_setup
        with:
          python-version: ${{ inputs.python-version }}

      - name: Install specific db dependency
        run: |
          poetry install -E postgres -E neo4j

      - name: Run PostgreSQL Script to create test data (Chinook_PostgreSql.sql)
        env:
          PGPASSWORD: cognee
        run: |
          # Wait until the PostgreSQL service is available
          until pg_isready -h localhost -p 5432; do
            echo "Waiting for postgres..."
            sleep 2
          done
          # Execute the SQL script against the test_migration_db database
          psql -h localhost -U cognee -d test_migration_db -f ./cognee/tests/test_data/Chinook_PostgreSql.sql

      - name: Run relational db test
        env:
          ENV: 'dev'
          GRAPH_DATABASE_PROVIDER: "neo4j"
          GRAPH_DATABASE_URL: ${{ secrets.NEO4J_API_URL }}
          GRAPH_DATABASE_PASSWORD: ${{ secrets.NEO4J_API_KEY }}
          GRAPH_DATABASE_USERNAME: "neo4j"
          LLM_MODEL: ${{ secrets.LLM_MODEL }}
          LLM_ENDPOINT: ${{ secrets.LLM_ENDPOINT }}
          LLM_API_KEY: ${{ secrets.LLM_API_KEY }}
          LLM_API_VERSION: ${{ secrets.LLM_API_VERSION }}
          EMBEDDING_MODEL: ${{ secrets.EMBEDDING_MODEL }}
          EMBEDDING_ENDPOINT: ${{ secrets.EMBEDDING_ENDPOINT }}
          EMBEDDING_API_KEY: ${{ secrets.EMBEDDING_API_KEY }}
          EMBEDDING_API_VERSION: ${{ secrets.EMBEDDING_API_VERSION }}
        run: poetry run python ./cognee/tests/test_relational_db_migration.py

View file

@ -45,6 +45,12 @@ jobs:
uses: ./.github/workflows/graph_db_tests.yml
secrets: inherit
relational-db-migration-tests:
name: Relational DB Migration Tests
needs: [ basic-tests, e2e-tests ]
uses: ./.github/workflows/relational_db_migration_tests.yml
secrets: inherit
notebook-tests:
name: Notebook Tests
needs: [basic-tests, e2e-tests]
@ -90,6 +96,7 @@ jobs:
vector-db-tests,
example-tests,
gemini-tests,
relational-db-migration-tests,
docker-compose-test,
docker-ci-test,
]
@ -108,6 +115,7 @@ jobs:
example-tests,
gemini-tests,
ollama-tests,
relational-db-migration-tests,
docker-compose-test,
docker-ci-test,
]
@ -123,6 +131,7 @@ jobs:
"${{ needs.python-version-tests.result }}" == "success" &&
"${{ needs.vector-db-tests.result }}" == "success" &&
"${{ needs.example-tests.result }}" == "success" &&
"${{ needs.relational-db-migration-tests.result }}" == "success" &&
"${{ needs.gemini-tests.result }}" == "success" &&
"${{ needs.docker-compose-test.result }}" == "success" &&
"${{ needs.docker-ci-test.result }}" == "success" &&

View file

@ -8,7 +8,7 @@ from cognee.infrastructure.data.chunking.config import get_chunk_config
from cognee.infrastructure.databases.vector import get_vectordb_config
from cognee.infrastructure.databases.graph.config import get_graph_config
from cognee.infrastructure.llm.config import get_llm_config
from cognee.infrastructure.databases.relational import get_relational_config
from cognee.infrastructure.databases.relational import get_relational_config, get_migration_config
from cognee.infrastructure.files.storage import LocalStorage
@ -131,6 +131,20 @@ class config:
message=f"'{key}' is not a valid attribute of the config."
)
@staticmethod
def set_migration_db_config(config_dict: dict):
"""
Updates the relational db config with values from config_dict.
"""
migration_db_config = get_migration_config()
for key, value in config_dict.items():
if hasattr(migration_db_config, key):
object.__setattr__(migration_db_config, key, value)
else:
raise InvalidAttributeError(
message=f"'{key}' is not a valid attribute of the config."
)
@staticmethod
def set_graph_db_config(config_dict: dict) -> None:
"""

View file

@ -35,12 +35,12 @@ def get_relational_config():
class MigrationConfig(BaseSettings):
migration_db_path: Union[str, None] = None
migration_db_name: str = None
migration_db_name: Union[str, None] = None
migration_db_host: Union[str, None] = None
migration_db_port: Union[str, None] = None
migration_db_username: Union[str, None] = None
migration_db_password: Union[str, None] = None
migration_db_provider: str = None
migration_db_provider: Union[str, None] = None
model_config = SettingsConfigDict(env_file=".env", extra="allow")

View file

@ -1,5 +1,4 @@
import logging
from decimal import Decimal
from uuid import uuid5, NAMESPACE_OID
from sqlalchemy import text
from cognee.infrastructure.databases.relational.get_migration_relational_engine import (
@ -9,8 +8,6 @@ from cognee.infrastructure.databases.relational.get_migration_relational_engine
from cognee.tasks.storage.index_data_points import index_data_points
from cognee.tasks.storage.index_graph_edges import index_graph_edges
from uuid import uuid4
from cognee.modules.engine.models import TableRow, TableType
logger = logging.getLogger(__name__)
@ -148,12 +145,33 @@ async def migrate_relational_database(graph_db, schema):
)
)
def _remove_duplicate_edges(edge_mapping):
seen = set()
unique_original_shape = []
for tup in edge_mapping:
# We go through all the tuples in the edge_mapping and we only add unique tuples to the list
# To eliminate duplicate edges.
source_id, target_id, rel_name, rel_dict = tup
# We need to convert the dictionary to a frozenset to be able to compare values for it
rel_dict_hashable = frozenset(sorted(rel_dict.items()))
hashable_tup = (source_id, target_id, rel_name, rel_dict_hashable)
# We use the seen set to keep track of unique edges
if hashable_tup not in seen:
# A list that has frozensets elements instead of dictionaries is needed to be able to compare values
seen.add(hashable_tup)
# append the original tuple shape (with the dictionary) if it's the first time we see it
unique_original_shape.append(tup)
return unique_original_shape
# Add all nodes and edges to the graph
# NOTE: Nodes and edges have to be added in batch for speed optimization, Especially for NetworkX.
# If we'd create nodes and add them to graph in real time the process would take too long.
# Every node and edge added to NetworkX is saved to file which is very slow when not done in batches.
await graph_db.add_nodes(list(node_mapping.values()))
await graph_db.add_edges(edge_mapping)
await graph_db.add_edges(_remove_duplicate_edges(edge_mapping))
# In these steps we calculate the vector embeddings of our nodes and edges and save them to vector database
# Cognee uses this information to perform searches on the knowledge graph.

View file

@ -16,16 +16,16 @@
/*******************************************************************************
Drop database if it exists
********************************************************************************/
DROP DATABASE IF EXISTS chinook;
DROP DATABASE IF EXISTS test_migration_db;
/*******************************************************************************
Create database
********************************************************************************/
CREATE DATABASE chinook;
CREATE DATABASE test_migration_db;
\c chinook;
\c test_migration_db;
/*******************************************************************************
@ -15967,4 +15967,3 @@ COMMIT;
/* Optionally reclaim space if large # of rows removed:
VACUUM FULL;
*/

View file

@ -1,6 +1,5 @@
import json
import pytest
import pytest_asyncio
import pathlib
import os
from cognee.infrastructure.databases.graph import get_graph_engine
from cognee.infrastructure.databases.relational import (
@ -27,7 +26,6 @@ def normalize_node_name(node_name: str) -> str:
return node_name
@pytest_asyncio.fixture()
async def setup_test_db():
await cognee.prune.prune_data()
await cognee.prune.prune_system(metadata=True)
@ -35,14 +33,13 @@ async def setup_test_db():
await create_relational_db_and_tables()
await create_pgvector_db_and_tables()
relational_engine = get_migration_relational_engine()
return relational_engine
migration_engine = get_migration_relational_engine()
return migration_engine
@pytest.mark.asyncio
async def test_relational_db_migration(setup_test_db):
relational_engine = setup_test_db
schema = await relational_engine.extract_schema()
async def relational_db_migration():
migration_engine = await setup_test_db()
schema = await migration_engine.extract_schema()
graph_engine = await get_graph_engine()
await migrate_relational_database(graph_engine, schema=schema)
@ -56,8 +53,8 @@ async def test_relational_db_migration(setup_test_db):
# 2. Assert that the search results contain "AC/DC"
assert any("AC/DC" in r for r in search_results), "AC/DC not found in search results!"
relational_db_provider = os.getenv("MIGRATION_DB_PROVIDER", "sqlite").lower()
if relational_db_provider == "postgres":
migration_db_provider = migration_engine.engine.dialect.name
if migration_db_provider == "postgresql":
relationship_label = "reports_to"
else:
relationship_label = "ReportsTo"
@ -119,7 +116,7 @@ async def test_relational_db_migration(setup_test_db):
found_edges.add((src_name, tgt_name))
distinct_node_names.update([src_name, tgt_name])
else:
pytest.fail(f"Unsupported graph database provider: {graph_db_provider}")
raise ValueError(f"Unsupported graph database provider: {graph_db_provider}")
assert len(distinct_node_names) == 8, (
f"Expected 8 distinct node references, found {len(distinct_node_names)}"
@ -139,7 +136,7 @@ async def test_relational_db_migration(setup_test_db):
assert e in found_edges, f"Edge {e} not found in the actual '{relationship_label}' edges!"
# 4. Verify the total number of nodes and edges in the graph
if relational_db_provider == "sqlite":
if migration_db_provider == "sqlite":
if graph_db_provider == "neo4j":
query_str = """
MATCH (n)
@ -154,11 +151,11 @@ async def test_relational_db_migration(setup_test_db):
elif graph_db_provider == "kuzu":
query_nodes = "MATCH (n:Node) RETURN count(n) as c"
rows_n = await graph_engine.query(query_nodes)
node_count = rows_n[0]["c"]
node_count = rows_n[0][0]
query_edges = "MATCH (n:Node)-[r:EDGE]->(m:Node) RETURN count(r) as c"
rows_e = await graph_engine.query(query_edges)
edge_count = rows_e[0]["c"]
edge_count = rows_e[0][0]
elif graph_db_provider == "networkx":
nodes, edges = await graph_engine.get_graph_data()
@ -170,7 +167,7 @@ async def test_relational_db_migration(setup_test_db):
assert node_count == 227, f"Expected 227 nodes, got {node_count}"
assert edge_count == 580, f"Expected 580 edges, got {edge_count}"
elif relational_db_provider == "postgres":
elif migration_db_provider == "postgresql":
if graph_db_provider == "neo4j":
query_str = """
MATCH (n)
@ -185,11 +182,11 @@ async def test_relational_db_migration(setup_test_db):
elif graph_db_provider == "kuzu":
query_nodes = "MATCH (n:Node) RETURN count(n) as c"
rows_n = await graph_engine.query(query_nodes)
node_count = rows_n[0]["c"]
node_count = rows_n[0][0]
query_edges = "MATCH (n:Node)-[r:EDGE]->(m:Node) RETURN count(r) as c"
rows_e = await graph_engine.query(query_edges)
edge_count = rows_e[0]["c"]
edge_count = rows_e[0][0]
elif graph_db_provider == "networkx":
nodes, edges = await graph_engine.get_graph_data()
@ -204,3 +201,45 @@ async def test_relational_db_migration(setup_test_db):
print(f"Node & edge count validated: node_count={node_count}, edge_count={edge_count}.")
print(f"All checks passed for {graph_db_provider} provider with '{relationship_label}' edges!")
async def test_migration_sqlite():
    """Run the relational-db migration test against the bundled SQLite fixture."""
    test_data_dir = os.path.join(pathlib.Path(__file__).parent, "test_data/")
    sqlite_settings = {
        "migration_db_path": test_data_dir,
        "migration_db_name": "migration_database.sqlite",
        "migration_db_provider": "sqlite",
    }
    cognee.config.set_migration_db_config(sqlite_settings)
    await relational_db_migration()
async def test_migration_postgres():
    """Run the relational-db migration test against a local PostgreSQL server.

    To run this manually, first execute the Chinook_PostgreSql.sql script from
    the test_data directory against the server.
    """
    postgres_settings = {
        "migration_db_name": "test_migration_db",
        "migration_db_host": "127.0.0.1",
        "migration_db_port": "5432",
        "migration_db_username": "cognee",
        "migration_db_password": "cognee",
        "migration_db_provider": "postgres",
    }
    cognee.config.set_migration_db_config(postgres_settings)
    await relational_db_migration()
async def main():
    """Run the SQLite migration test, then the PostgreSQL migration test."""
    print("Starting SQLite database migration test...")
    await test_migration_sqlite()

    print("Starting PostgreSQL database migration test...")
    await test_migration_postgres()


if __name__ == "__main__":
    import asyncio

    asyncio.run(main())