… db migration <!-- .github/pull_request_template.md --> ## Description <!-- Please provide a clear, human-generated description of the changes in this PR. DO NOT use AI-generated descriptions. We want to understand your thought process and reasoning. --> ## Type of Change <!-- Please check the relevant option --> - [ ] Bug fix (non-breaking change that fixes an issue) - [ ] New feature (non-breaking change that adds functionality) - [ ] Breaking change (fix or feature that would cause existing functionality to change) - [ ] Documentation update - [ ] Code refactoring - [ ] Performance improvement - [ ] Other (please specify): ## Screenshots/Videos (if applicable) <!-- Add screenshots or videos to help explain your changes --> ## Pre-submission Checklist <!-- Please check all boxes that apply before submitting your PR --> - [ ] **I have tested my changes thoroughly before submitting this PR** - [ ] **This PR contains minimal changes necessary to address the issue/feature** - [ ] My code follows the project's coding standards and style guidelines - [ ] I have added tests that prove my fix is effective or that my feature works - [ ] I have added necessary documentation (if applicable) - [ ] All new and existing tests pass - [ ] I have searched existing PRs to ensure this change hasn't been submitted already - [ ] I have linked any relevant issues in the description - [ ] My commits have clear and descriptive messages ## DCO Affirmation I affirm that all code in every commit of this pull request conforms to the terms of the Topoteretes Developer Certificate of Origin.
110 lines
4.5 KiB
Python
110 lines
4.5 KiB
Python
from pathlib import Path
|
|
import asyncio
|
|
import os
|
|
|
|
import cognee
|
|
from cognee.infrastructure.databases.relational.config import get_migration_config
|
|
from cognee.infrastructure.databases.graph import get_graph_engine
|
|
from cognee.api.v1.visualize.visualize import visualize_graph
|
|
from cognee.infrastructure.databases.relational import (
|
|
get_migration_relational_engine,
|
|
)
|
|
from cognee.modules.search.types import SearchType
|
|
from cognee.infrastructure.databases.relational import (
|
|
create_db_and_tables as create_relational_db_and_tables,
|
|
)
|
|
from cognee.infrastructure.databases.vector.pgvector import (
|
|
create_db_and_tables as create_vector_db_and_tables,
|
|
)
|
|
|
|
# Prerequisites:
|
|
# 1. Copy `.env.template` and rename it to `.env`.
|
|
# 2. Add your OpenAI API key to the `.env` file in the `LLM_API_KEY` field:
|
|
# LLM_API_KEY = "your_key_here"
|
|
# 3. Fill in all relevant MIGRATION_DB_* settings for the database you want to migrate into the Cognee graph
|
|
|
|
# NOTE: If you don't have a DB you want to migrate you can try it out with our
|
|
# test database at the following location:
|
|
# MIGRATION_DB_PATH="/{path_to_your_local_cognee}/cognee/tests/test_data"
|
|
# MIGRATION_DB_NAME="migration_database.sqlite"
|
|
# MIGRATION_DB_PROVIDER="sqlite"
|
|
|
|
|
|
async def main():
    """Migrate a relational database into the Cognee graph and query the result.

    Steps, in order:
      1. Prune all existing Cognee data and system metadata.
      2. Create the relational and vector tables on the Cognee side.
      3. Point the migration config at the source database, taking provider,
         path and name from the ``MIGRATION_DB_*`` environment variables and
         falling back to the example SQLite database in the Cognee repo.
      4. Extract the source schema and migrate it into the graph engine.
      5. Run three ``GRAPH_COMPLETION`` searches and print their results.
      6. Write an HTML visualization of the graph into the user's home directory.
    """
    # Start from a clean slate: stored data first, then system state including metadata.
    await cognee.prune.prune_data()
    await cognee.prune.prune_system(metadata=True)

    # Set up the tables Cognee itself needs (relational first, then vector).
    for create_tables in (create_relational_db_and_tables, create_vector_db_and_tables):
        await create_tables()

    # Environment variables win; otherwise use the example database from the Cognee repo.
    repo_root = Path(__file__).resolve().parent.parent.parent
    example_db_dir = os.path.join(repo_root, "cognee/tests/test_data")
    source_settings = {
        "migration_db_provider": os.environ.get("MIGRATION_DB_PROVIDER", "sqlite"),
        "migration_db_path": os.environ.get("MIGRATION_DB_PATH", example_db_dir),
        "migration_db_name": os.environ.get("MIGRATION_DB_NAME", "migration_database.sqlite"),
    }

    migration_config = get_migration_config()
    for field_name, value in source_settings.items():
        setattr(migration_config, field_name, value)

    source_engine = get_migration_relational_engine()

    print("\nExtracting schema of database to migrate.")
    schema = await source_engine.extract_schema()
    print(f"Migrated database schema:\n{schema}")

    graph_engine = await get_graph_engine()
    print("Migrating relational database to graph database based on schema.")
    # NOTE(review): function-scope import kept where the script originally had it;
    # presumably deliberate (deferred loading) - confirm before hoisting to module level.
    from cognee.tasks.ingestion import migrate_relational_database

    await migrate_relational_database(graph_engine, schema=schema)
    print("Relational database migration complete.")

    # top_k is the number of graph triplets supplied to the LLM to answer the question.
    # The default is only 10, so the broad question uses a high value; the specific
    # questions use 50 because too high a value might overwhelm the LLM context.
    questions = (
        ("What kind of data do you contain?", 200),
        ("What invoices are related to Leonie Köhler?", 50),
        ("What invoices are related to Luís Gonçalves?", 50),
    )
    for question, triplet_count in questions:
        search_results = await cognee.search(
            query_type=SearchType.GRAPH_COMPLETION,
            query_text=question,
            top_k=triplet_count,
        )
        print(f"Search results: {search_results}")

    # Checking the relational database for this example should show that the searches found
    # all invoices related to the two customers, without hallucinations or extra information.

    # The html visualization of the migrated graph is stored in the user's home directory.
    destination_file_path = os.path.join(os.path.expanduser("~"), "graph_visualization.html")
    print("Adding html visualization of graph database after migration.")
    await visualize_graph(destination_file_path)
    print(f"Visualization can be found at: {destination_file_path}")
|
|
|
|
|
|
if __name__ == "__main__":
    # Script entry point: run main() to completion on a dedicated event loop.
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    try:
        loop.run_until_complete(main())
    finally:
        try:
            # Finalize any async generators still open, whether main() succeeded or raised.
            loop.run_until_complete(loop.shutdown_asyncgens())
        finally:
            # Close the loop so its resources are released; nested in its own
            # `finally` so it still happens if the generator shutdown raises.
            loop.close()
|