diff --git a/.github/workflows/test_memgraph.yml b/.github/workflows/test_memgraph.yml
index b7ea9d837..c9134bc1f 100644
--- a/.github/workflows/test_memgraph.yml
+++ b/.github/workflows/test_memgraph.yml
@@ -1,9 +1,9 @@
 name: test | memgraph
 
-# on:
-#   workflow_dispatch:
-#   pull_request:
-#     types: [labeled, synchronize]
+on:
+  workflow_dispatch:
+  pull_request:
+    types: [labeled, synchronize]
 
 concurrency:
   group: ${{ github.workflow }}-${{ github.event.pull_request.number || github.ref }}
@@ -17,13 +17,15 @@ jobs:
     name: test
     runs-on: ubuntu-22.04
 
-    defaults:
-      run:
-        shell: bash
+    services:
+      memgraph:
+        image: memgraph/memgraph-mage:latest
+        ports:
+          - 7687:7687
 
     steps:
       - name: Check out
-        uses: actions/checkout@master
+        uses: actions/checkout@v3
 
       - name: Setup Python
         uses: actions/setup-python@v5
@@ -38,7 +40,7 @@ jobs:
           installer-parallel: true
 
       - name: Install dependencies
-        run: poetry install -E memgraph --no-interaction
+        run: poetry install -E neo4j
 
       - name: Run default Memgraph
         env:
@@ -51,7 +53,7 @@ jobs:
           EMBEDDING_ENDPOINT: ${{ secrets.EMBEDDING_ENDPOINT }}
           EMBEDDING_API_KEY: ${{ secrets.EMBEDDING_API_KEY }}
           EMBEDDING_API_VERSION: ${{ secrets.EMBEDDING_API_VERSION }}
-          GRAPH_DATABASE_URL: ${{ secrets.MEMGRAPH_API_URL }}
-          GRAPH_DATABASE_PASSWORD: ${{ secrets.MEMGRAPH_API_KEY }}
-          GRAPH_DATABASE_USERNAME: " "
+          GRAPH_DATABASE_URL: "bolt://localhost:7687"
+          GRAPH_DATABASE_PASSWORD: "memgraph"
+          GRAPH_DATABASE_USERNAME: "memgraph"
         run: poetry run python ./cognee/tests/test_memgraph.py
diff --git a/cognee/infrastructure/databases/graph/memgraph/memgraph_adapter.py b/cognee/infrastructure/databases/graph/memgraph/memgraph_adapter.py
index 75c0722a8..bd2cffeb8 100644
--- a/cognee/infrastructure/databases/graph/memgraph/memgraph_adapter.py
+++ b/cognee/infrastructure/databases/graph/memgraph/memgraph_adapter.py
@@ -614,7 +614,7 @@ class MemgraphAdapter(GraphDBInterface):
 
         return [result["successor"] for result in results]
 
-    async def get_neighbours(self, node_id: str) -> List[Dict[str, Any]]:
+    async def get_neighbors(self, node_id: str) -> List[Dict[str, Any]]:
         """
         Get both predecessors and successors of a node.
 
@@ -634,6 +634,25 @@ class MemgraphAdapter(GraphDBInterface):
 
         return predecessors + successors
 
+    async def get_node(self, node_id: str) -> Optional[Dict[str, Any]]:
+        """Get a single node by ID."""
+        query = """
+        MATCH (node {id: $node_id})
+        RETURN node
+        """
+        results = await self.query(query, {"node_id": node_id})
+        return results[0]["node"] if results else None
+
+    async def get_nodes(self, node_ids: List[str]) -> List[Dict[str, Any]]:
+        """Get multiple nodes by their IDs."""
+        query = """
+        UNWIND $node_ids AS id
+        MATCH (node {id: id})
+        RETURN node
+        """
+        results = await self.query(query, {"node_ids": node_ids})
+        return [result["node"] for result in results]
+
     async def get_connections(self, node_id: UUID) -> list:
         """
         Retrieve connections for a given node, including both predecessors and successors.
diff --git a/examples/relational_db_with_dlt/fix_foreign_keys.sql b/examples/relational_db_with_dlt/fix_foreign_keys.sql
new file mode 100644
index 000000000..fb6ba073a
--- /dev/null
+++ b/examples/relational_db_with_dlt/fix_foreign_keys.sql
@@ -0,0 +1,290 @@
+PRAGMA foreign_keys=OFF;  -- no-op inside a transaction, so it must run before BEGIN
+
+BEGIN TRANSACTION;
+
+/*------------------------------------------------------------------------
+1) pokemon_list
+------------------------------------------------------------------------*/
+CREATE TABLE IF NOT EXISTS new_pokemon_list (
+    name TEXT,
+    url TEXT,
+    _dlt_load_id TEXT NOT NULL,
+    _dlt_id VARCHAR(128) NOT NULL PRIMARY KEY
+);
+INSERT INTO new_pokemon_list
+    SELECT name, url, _dlt_load_id, _dlt_id
+    FROM pokemon_list;
+DROP TABLE pokemon_list;
+ALTER TABLE new_pokemon_list RENAME TO pokemon_list;
+
+/*------------------------------------------------------------------------
+2) pokemon_details (Parent for most child tables)
+   _dlt_id is a PRIMARY KEY so children can FK to it.
+------------------------------------------------------------------------*/
+CREATE TABLE IF NOT EXISTS new_pokemon_details (
+    base_experience BIGINT,
+    height BIGINT,
+    id BIGINT,
+    is_default BOOLEAN,
+    name TEXT,
+    "order" BIGINT,
+    species__name TEXT,
+    weight BIGINT,
+    _dlt_load_id TEXT NOT NULL,
+    _dlt_id VARCHAR(128) NOT NULL PRIMARY KEY
+);
+INSERT INTO new_pokemon_details
+    SELECT base_experience,
+           height,
+           id,
+           is_default,
+           name,
+           "order",
+           species__name,
+           weight,
+           _dlt_load_id,
+           _dlt_id
+    FROM pokemon_details;
+DROP TABLE pokemon_details;
+ALTER TABLE new_pokemon_details RENAME TO pokemon_details;
+
+/*------------------------------------------------------------------------
+3) pokemon_details__abilities (Child)
+   Foreign key from _dlt_parent_id → pokemon_details(_dlt_id)
+------------------------------------------------------------------------*/
+CREATE TABLE IF NOT EXISTS new_pokemon_details__abilities (
+    ability__name TEXT,
+    ability__url TEXT,
+    is_hidden BOOLEAN,
+    slot BIGINT,
+    _dlt_parent_id VARCHAR(128) NOT NULL,
+    _dlt_list_idx BIGINT NOT NULL,
+    _dlt_id VARCHAR(128) NOT NULL PRIMARY KEY,
+    CONSTRAINT fk_abilities
+        FOREIGN KEY (_dlt_parent_id)
+        REFERENCES pokemon_details(_dlt_id)
+        ON DELETE CASCADE
+);
+INSERT INTO new_pokemon_details__abilities
+    SELECT ability__name,
+           ability__url,
+           is_hidden,
+           slot,
+           _dlt_parent_id,
+           _dlt_list_idx,
+           _dlt_id
+    FROM pokemon_details__abilities;
+DROP TABLE pokemon_details__abilities;
+ALTER TABLE new_pokemon_details__abilities RENAME TO pokemon_details__abilities;
+
+/*------------------------------------------------------------------------
+4) pokemon_details__forms (Child)
+------------------------------------------------------------------------*/
+CREATE TABLE IF NOT EXISTS new_pokemon_details__forms (
+    name TEXT,
+    url TEXT,
+    _dlt_parent_id VARCHAR(128) NOT NULL,
+    _dlt_list_idx BIGINT NOT NULL,
+    _dlt_id VARCHAR(128) NOT NULL PRIMARY KEY,
+    FOREIGN KEY (_dlt_parent_id)
+        REFERENCES pokemon_details(_dlt_id)
+        ON DELETE CASCADE
+);
+INSERT INTO new_pokemon_details__forms
+    SELECT name,
+           url,
+           _dlt_parent_id,
+           _dlt_list_idx,
+           _dlt_id
+    FROM pokemon_details__forms;
+DROP TABLE pokemon_details__forms;
+ALTER TABLE new_pokemon_details__forms RENAME TO pokemon_details__forms;
+
+/*------------------------------------------------------------------------
+5) pokemon_details__game_indices (Child)
+------------------------------------------------------------------------*/
+CREATE TABLE IF NOT EXISTS new_pokemon_details__game_indices (
+    game_index BIGINT,
+    version__name TEXT,
+    version__url TEXT,
+    _dlt_parent_id VARCHAR(128) NOT NULL,
+    _dlt_list_idx BIGINT NOT NULL,
+    _dlt_id VARCHAR(128) NOT NULL PRIMARY KEY,
+    FOREIGN KEY (_dlt_parent_id)
+        REFERENCES pokemon_details(_dlt_id)
+        ON DELETE CASCADE
+);
+INSERT INTO new_pokemon_details__game_indices
+    SELECT game_index,
+           version__name,
+           version__url,
+           _dlt_parent_id,
+           _dlt_list_idx,
+           _dlt_id
+    FROM pokemon_details__game_indices;
+DROP TABLE pokemon_details__game_indices;
+ALTER TABLE new_pokemon_details__game_indices RENAME TO pokemon_details__game_indices;
+
+
+/*------------------------------------------------------------------------
+6) pokemon_details__moves (Child of pokemon_details)
+------------------------------------------------------------------------*/
+CREATE TABLE IF NOT EXISTS new_pokemon_details__moves (
+    move__name TEXT,
+    move__url TEXT,
+    _dlt_parent_id VARCHAR(128) NOT NULL,
+    _dlt_list_idx BIGINT NOT NULL,
+    _dlt_id VARCHAR(128) NOT NULL PRIMARY KEY,
+    FOREIGN KEY (_dlt_parent_id)
+        REFERENCES pokemon_details(_dlt_id)
+        ON DELETE CASCADE
+);
+INSERT INTO new_pokemon_details__moves
+    SELECT move__name,
+           move__url,
+           _dlt_parent_id,
+           _dlt_list_idx,
+           _dlt_id
+    FROM pokemon_details__moves;
+DROP TABLE pokemon_details__moves;
+ALTER TABLE new_pokemon_details__moves RENAME TO pokemon_details__moves;
+
+/*------------------------------------------------------------------------
+7) pokemon_details__moves__version_group_details (Child of pokemon_details__moves)
+------------------------------------------------------------------------*/
+CREATE TABLE IF NOT EXISTS new_pokemon_details__moves__version_group_details (
+    level_learned_at BIGINT,
+    version_group__name TEXT,
+    version_group__url TEXT,
+    move_learn_method__name TEXT,
+    move_learn_method__url TEXT,
+    _dlt_parent_id VARCHAR(128) NOT NULL,
+    _dlt_list_idx BIGINT NOT NULL,
+    _dlt_id VARCHAR(128) NOT NULL PRIMARY KEY,
+    "order" BIGINT,
+    FOREIGN KEY (_dlt_parent_id)
+        REFERENCES pokemon_details__moves(_dlt_id)
+        ON DELETE CASCADE
+);
+INSERT INTO new_pokemon_details__moves__version_group_details
+    SELECT level_learned_at,
+           version_group__name,
+           version_group__url,
+           move_learn_method__name,
+           move_learn_method__url,
+           _dlt_parent_id,
+           _dlt_list_idx,
+           _dlt_id,
+           "order"
+    FROM pokemon_details__moves__version_group_details;
+DROP TABLE pokemon_details__moves__version_group_details;
+ALTER TABLE new_pokemon_details__moves__version_group_details
+    RENAME TO pokemon_details__moves__version_group_details;
+
+/*------------------------------------------------------------------------
+8) pokemon_details__past_abilities (Child of pokemon_details)
+------------------------------------------------------------------------*/
+CREATE TABLE IF NOT EXISTS new_pokemon_details__past_abilities (
+    generation__name TEXT,
+    generation__url TEXT,
+    _dlt_parent_id VARCHAR(128) NOT NULL,
+    _dlt_list_idx BIGINT NOT NULL,
+    _dlt_id VARCHAR(128) NOT NULL PRIMARY KEY,
+    FOREIGN KEY (_dlt_parent_id)
+        REFERENCES pokemon_details(_dlt_id)
+        ON DELETE CASCADE
+);
+INSERT INTO new_pokemon_details__past_abilities
+    SELECT generation__name,
+           generation__url,
+           _dlt_parent_id,
+           _dlt_list_idx,
+           _dlt_id
+    FROM pokemon_details__past_abilities;
+DROP TABLE pokemon_details__past_abilities;
+ALTER TABLE new_pokemon_details__past_abilities
+    RENAME TO pokemon_details__past_abilities;
+
+/*------------------------------------------------------------------------
+9) pokemon_details__stats (Child of pokemon_details)
+------------------------------------------------------------------------*/
+CREATE TABLE IF NOT EXISTS new_pokemon_details__stats (
+    base_stat BIGINT,
+    effort BIGINT,
+    stat__name TEXT,
+    stat__url TEXT,
+    _dlt_parent_id VARCHAR(128) NOT NULL,
+    _dlt_list_idx BIGINT NOT NULL,
+    _dlt_id VARCHAR(128) NOT NULL PRIMARY KEY,
+    FOREIGN KEY (_dlt_parent_id)
+        REFERENCES pokemon_details(_dlt_id)
+        ON DELETE CASCADE
+);
+INSERT INTO new_pokemon_details__stats
+    SELECT base_stat,
+           effort,
+           stat__name,
+           stat__url,
+           _dlt_parent_id,
+           _dlt_list_idx,
+           _dlt_id
+    FROM pokemon_details__stats;
+DROP TABLE pokemon_details__stats;
+ALTER TABLE new_pokemon_details__stats
+    RENAME TO pokemon_details__stats;
+
+/*------------------------------------------------------------------------
+10) pokemon_details__types (Child of pokemon_details)
+------------------------------------------------------------------------*/
+CREATE TABLE IF NOT EXISTS new_pokemon_details__types (
+    slot BIGINT,
+    type__name TEXT,
+    type__url TEXT,
+    _dlt_parent_id VARCHAR(128) NOT NULL,
+    _dlt_list_idx BIGINT NOT NULL,
+    _dlt_id VARCHAR(128) NOT NULL PRIMARY KEY,
+    FOREIGN KEY (_dlt_parent_id)
+        REFERENCES pokemon_details(_dlt_id)
+        ON DELETE CASCADE
+);
+INSERT INTO new_pokemon_details__types
+    SELECT slot,
+           type__name,
+           type__url,
+           _dlt_parent_id,
+           _dlt_list_idx,
+           _dlt_id
+    FROM pokemon_details__types;
+DROP TABLE pokemon_details__types;
+ALTER TABLE new_pokemon_details__types
+    RENAME TO pokemon_details__types;
+
+/*------------------------------------------------------------------------
+11) pokemon_details__past_abilities__abilities (Child of pokemon_details__past_abilities)
+------------------------------------------------------------------------*/
+CREATE TABLE IF NOT EXISTS new_pokemon_details__past_abilities__abilities (
+    is_hidden BOOLEAN,
+    slot BIGINT,
+    _dlt_parent_id VARCHAR(128) NOT NULL,
+    _dlt_list_idx BIGINT NOT NULL,
+    _dlt_id VARCHAR(128) NOT NULL PRIMARY KEY,
+    FOREIGN KEY (_dlt_parent_id)
+        REFERENCES pokemon_details__past_abilities(_dlt_id)
+        ON DELETE CASCADE
+);
+INSERT INTO new_pokemon_details__past_abilities__abilities
+    SELECT is_hidden,
+           slot,
+           _dlt_parent_id,
+           _dlt_list_idx,
+           _dlt_id
+    FROM pokemon_details__past_abilities__abilities;
+DROP TABLE pokemon_details__past_abilities__abilities;
+ALTER TABLE new_pokemon_details__past_abilities__abilities
+    RENAME TO pokemon_details__past_abilities__abilities;
+
+
+COMMIT;
+
+/* Re-enable FK checks: the pragma is a no-op inside a transaction, so it runs after COMMIT */
+PRAGMA foreign_keys=ON;
diff --git a/examples/relational_db_with_dlt/relational_db_and_dlt.py b/examples/relational_db_with_dlt/relational_db_and_dlt.py
new file mode 100644
index 000000000..7ee3666ea
--- /dev/null
+++ b/examples/relational_db_with_dlt/relational_db_and_dlt.py
@@ -0,0 +1,174 @@
+import dlt
+import requests
+import asyncio
+import threading
+import sqlalchemy as sa
+import pathlib
+import os
+from dlt.destinations.impl.sqlalchemy.configuration import SqlalchemyCredentials
+
+
+import cognee
+from cognee.infrastructure.databases.graph import get_graph_engine
+from cognee.api.v1.visualize.visualize import visualize_graph
+from cognee.infrastructure.databases.relational import (
+    get_migration_relational_engine,
+    create_db_and_tables as create_relational_db_and_tables,
+)
+from cognee.infrastructure.databases.vector.pgvector import (
+    create_db_and_tables as create_pgvector_db_and_tables,
+)
+from cognee.tasks.ingestion.migrate_relational_database import migrate_relational_database
+from cognee.modules.search.types import SearchType
+
+from cognee.root_dir import get_absolute_path
+
+
+class PatchedSqlalchemyCredentials(SqlalchemyCredentials):
+    def __init__(self, connection_string=None):
+        super().__init__(connection_string)
+        if not hasattr(self, "_conn_lock"):
+            self._conn_lock = threading.Lock()
+
+
+BASE_URL = "https://pokeapi.co/api/v2/"
+
+
+@dlt.resource(write_disposition="replace")
+def pokemon_list(limit: int = 5):
+    """Fetch the first `limit` entries of the Pokémon list endpoint."""
+    response = requests.get(f"{BASE_URL}pokemon", params={"limit": limit}, timeout=30)
+    response.raise_for_status()
+    yield response.json()["results"]
+
+
+@dlt.transformer(data_from=pokemon_list)
+def pokemon_details(pokemons):
+    """Fetch full detail for each Pokémon."""
+    for pokemon in pokemons:
+        response = requests.get(pokemon["url"], timeout=30)
+        response.raise_for_status()
+        yield response.json()
+
+
+async def setup_and_process_data():
+    """
+    Setup configuration and process Pokemon data into a SQLite database with dlt.
+    """
+    engine = sa.create_engine("sqlite:///pokemon_data.db")
+
+    pipeline = dlt.pipeline(
+        pipeline_name="pokemon_pipeline",
+        destination=dlt.destinations.sqlalchemy(
+            PatchedSqlalchemyCredentials("sqlite:///pokemon_data.db?timeout=15")
+        ),
+        dataset_name="main",
+        dev_mode=False,
+    )
+
+    info = pipeline.run([pokemon_list, pokemon_details])
+    print(f"[setup_and_process_data] Pipeline run complete. Pipeline info:\n{info}")
+
+    # (Optional) Inspect tables for debugging
+    print("[setup_and_process_data] Verifying data was written to the database.")
+    with engine.connect() as conn:
+        tables = conn.execute(
+            sa.text("SELECT name FROM sqlite_master WHERE type='table';")
+        ).fetchall()
+        print(f"[setup_and_process_data] Tables in database: {tables}")
+        # Example: if 'pokemon_details' is expected, we can see how many rows:
+        for table_tuple in tables:
+            table_name = table_tuple[0]
+            row_count = conn.execute(sa.text(f"SELECT COUNT(*) FROM {table_name}")).fetchone()[0]
+            print(f" -> Table '{table_name}' has {row_count} row(s).")
+
+    print("[setup_and_process_data] Data loading step finished.\n")
+
+
+async def apply_foreign_key_fixes():
+    """
+    Apply foreign key fixes to the SQLite database after data processing.
+
+    The SQL script is looked up next to this file, not in the current working directory.
+    """
+    sql_path = pathlib.Path(__file__).resolve().parent / "fix_foreign_keys.sql"
+    sql_script = sql_path.read_text(encoding="utf-8")
+
+    engine = sa.create_engine("sqlite:///pokemon_data.db")
+    with engine.connect() as conn:
+        # `executescript` is a sqlite3 API, so drop down to the DBAPI connection.
+        raw_conn = conn.connection.connection
+        raw_conn.executescript(sql_script)  # runs multiple statements
+        print("[apply_foreign_key_fixes] Applied foreign key fixes")
+
+
+async def migrate_to_cognee():
+    """
+    Migrate the data from the SQLite database to cognee's knowledge graph.
+    """
+
+    # Use cognee's absolute path function instead of relative paths
+    data_directory_path = get_absolute_path(".data_storage")
+    cognee.config.data_root_directory(data_directory_path)
+
+    cognee_directory_path = get_absolute_path(".cognee_system")
+    cognee.config.system_root_directory(cognee_directory_path)
+
+    await cognee.prune.prune_data()
+    await cognee.prune.prune_system(metadata=True)
+    engine = get_migration_relational_engine()
+
+    await create_relational_db_and_tables()
+    await create_pgvector_db_and_tables()
+
+    schema = await engine.extract_schema()
+
+    graph_engine = await get_graph_engine()
+
+    await migrate_relational_database(graph_engine, schema=schema)
+
+
+async def visualize_knowledge_graph():
+    """
+    Generate and save an HTML visualization of the knowledge graph.
+    """
+    home_dir = os.path.expanduser("~")
+    html_path = os.path.join(home_dir, "graph_visualization.html")
+
+    await visualize_graph(html_path)
+
+
+async def search_knowledge_graph():
+    """
+    Perform a search query against the knowledge graph.
+    """
+    search_results = await cognee.search(
+        query_type=SearchType.GRAPH_COMPLETION, query_text="What kind of data do you contain?"
+    )
+    print(search_results)
+
+
+async def main():
+    print("[main] Starting main function, running setup_and_process_data...")
+    await setup_and_process_data()
+    print("[main] Data loaded into SQLite.")
+    await apply_foreign_key_fixes()
+    print("[main] Foreign key fixes applied.")
+    await migrate_to_cognee()
+    print("[main] Migration to cognee finished.")
+    await visualize_knowledge_graph()
+    print("[main] Knowledge graph visualization created.")
+    await search_knowledge_graph()
+    print("[main] Knowledge graph search completed.")
+
+
+if __name__ == "__main__":
+    print("[__main__] Creating and running event loop.")
+    loop = asyncio.new_event_loop()
+    asyncio.set_event_loop(loop)
+    try:
+        loop.run_until_complete(main())
+    finally:
+        loop.run_until_complete(loop.shutdown_asyncgens())
+        loop.close()
+        print("[__main__] Event loop closed. Exiting.")