From d8703aa4971d081edf856574c7cdb08d0282d9cd Mon Sep 17 00:00:00 2001 From: Boris Arzentar Date: Thu, 30 Oct 2025 21:19:15 +0100 Subject: [PATCH] fix: add migration and backwards compatibility --- ..._replace_graph_ledger_table_with_nodes_.py | 119 ++++++++++++++++++ cognee-mcp/src/cognee_client.py | 2 +- cognee/__init__.py | 1 + cognee/api/client.py | 3 + cognee/api/v1/cognify/code_graph_pipeline.py | 6 +- cognee/api/v1/delete/__init__.py | 14 +++ cognee/api/v1/delete/routers/__init__.py | 1 + .../v1/delete/routers/get_delete_router.py | 13 +- cognee/modules/graph/models/Edge.py | 8 +- cognee/modules/graph/models/Node.py | 10 +- .../modules/pipelines/operations/pipeline.py | 2 +- .../modules/pipelines/operations/run_tasks.py | 6 +- .../operations/run_tasks_distributed.py | 4 +- examples/low_level/pipeline.py | 23 ++-- examples/low_level/product_recommendation.py | 8 +- notebooks/cognee_demo.ipynb | 2 +- 16 files changed, 195 insertions(+), 27 deletions(-) create mode 100644 alembic/versions/84e5d08260d6_replace_graph_ledger_table_with_nodes_.py create mode 100644 cognee/api/v1/delete/__init__.py create mode 100644 cognee/api/v1/delete/routers/__init__.py diff --git a/alembic/versions/84e5d08260d6_replace_graph_ledger_table_with_nodes_.py b/alembic/versions/84e5d08260d6_replace_graph_ledger_table_with_nodes_.py new file mode 100644 index 000000000..b709cf53b --- /dev/null +++ b/alembic/versions/84e5d08260d6_replace_graph_ledger_table_with_nodes_.py @@ -0,0 +1,119 @@ +"""Replace graph ledger table with nodes and edges tables + +Revision ID: 84e5d08260d6 +Revises: 211ab850ef3d +Create Date: 2025-10-30 13:36:23.226706 + +""" + +from uuid import NAMESPACE_OID, uuid4, uuid5 +from typing import Sequence, Union +from datetime import datetime, timezone + +from alembic import op +import sqlalchemy as sa + + +# revision identifiers, used by Alembic. 
+revision: str = "84e5d08260d6"
+down_revision: Union[str, None] = "211ab850ef3d"
+branch_labels: Union[str, Sequence[str], None] = None
+depends_on: Union[str, Sequence[str], None] = None
+
+
+def upgrade() -> None:
+    conn = op.get_bind()
+    inspector = sa.inspect(conn)
+
+    table_names = inspector.get_table_names()
+
+    if "graph_relationship_ledger" in table_names:
+        op.drop_table("graph_relationship_ledger")  # ledger rows are dropped, not migrated
+
+    if "nodes" not in table_names:
+        op.create_table(
+            "nodes",
+            sa.Column("id", sa.UUID, primary_key=True),
+            sa.Column("slug", sa.UUID, nullable=False),
+            sa.Column("user_id", sa.UUID, nullable=False),
+            sa.Column("data_id", sa.UUID, nullable=False),
+            sa.Column("dataset_id", sa.UUID, index=True, nullable=False),
+            sa.Column("label", sa.String()),
+            sa.Column("type", sa.String(), nullable=False),
+            sa.Column("indexed_fields", sa.JSON(), nullable=False),
+            sa.Column(
+                "created_at", sa.DateTime(timezone=True), nullable=False  # value set by ORM model
+            ),
+        )
+
+    if "edges" not in table_names:
+        op.create_table(
+            "edges",
+            sa.Column("id", sa.UUID, primary_key=True),
+            sa.Column("user_id", sa.UUID, nullable=False),
+            sa.Column("data_id", sa.UUID, index=True),
+            sa.Column("dataset_id", sa.UUID, index=True),
+            sa.Column("source_node_id", sa.UUID, nullable=False),
+            sa.Column("destination_node_id", sa.UUID, nullable=False),
+            sa.Column("label", sa.String()),
+            sa.Column("relationship_name", sa.String(), nullable=False),
+            sa.Column("props", sa.JSON()),
+            sa.Column(
+                "created_at", sa.DateTime(timezone=True), nullable=False  # value set by ORM model
+            ),
+        )
+
+    existing_indexes = [index["name"] for index in inspector.get_indexes("nodes")]
+
+    if "index_node_dataset_slug" not in existing_indexes:
+        op.create_index("index_node_dataset_slug", "nodes", ["dataset_id", "slug"])
+
+    if "index_node_dataset_data" not in existing_indexes:
+        op.create_index("index_node_dataset_data", "nodes", ["dataset_id", "data_id"])
+
+
+def downgrade() -> None:
+    conn = op.get_bind()
+    inspector = sa.inspect(conn)
+
+    table_names = inspector.get_table_names()
+
+    if "nodes" in table_names:
+        op.drop_table("nodes")
+
+    if "edges" in table_names:
+        op.drop_table("edges")
+
+    if "graph_relationship_ledger" not in table_names:
+        op.create_table(
+            "graph_relationship_ledger",
+            sa.Column(
+                "id",
+                sa.UUID,
+                primary_key=True,
+                default=lambda: uuid5(NAMESPACE_OID, f"{datetime.now(timezone.utc).timestamp()}"),
+            ),
+            sa.Column("source_node_id", sa.UUID, nullable=False),
+            sa.Column("destination_node_id", sa.UUID, nullable=False),
+            sa.Column("creator_function", sa.String(), nullable=False),
+            sa.Column("node_label", sa.String(), nullable=False),
+            sa.Column(
+                "created_at", sa.DateTime(timezone=True), default=lambda: datetime.now(timezone.utc)
+            ),
+            sa.Column(
+                "deleted_at", sa.DateTime(timezone=True), default=lambda: datetime.now(timezone.utc)
+            ),
+            sa.Column("user_id", sa.UUID, nullable=False),
+        )
+
+        op.create_index("idx_graph_relationship_id", "graph_relationship_ledger", ["id"])
+        op.create_index(
+            "idx_graph_relationship_ledger_source_node_id",
+            "graph_relationship_ledger",
+            ["source_node_id"],
+        )
+        op.create_index(
+            "idx_graph_relationship_ledger_destination_node_id",
+            "graph_relationship_ledger",
+            ["destination_node_id"],
+        )
diff --git a/cognee-mcp/src/cognee_client.py b/cognee-mcp/src/cognee_client.py
index 442a97c5c..070278f03 100644
--- a/cognee-mcp/src/cognee_client.py
+++ b/cognee-mcp/src/cognee_client.py
@@ -196,7 +196,7 @@ class CogneeClient:
             )
             return results
 
-    async def delete(self, data_id: UUID, dataset_id: UUID) -> Dict[str, Any]:
+    async def delete(self, data_id: UUID, dataset_id: UUID, mode: str = "soft") -> Dict[str, Any]:
         """
         Delete data from a dataset.
 
diff --git a/cognee/__init__.py b/cognee/__init__.py
index dfcf742d2..6e4d2a903 100644
--- a/cognee/__init__.py
+++ b/cognee/__init__.py
@@ -16,6 +16,7 @@ from cognee.shared.logging_utils import setup_logging
 logger = setup_logging()
 
 from .api.v1.add import add
+from .api.v1.delete import delete
 from .api.v1.cognify import cognify
 from .modules.memify import memify
 from .api.v1.update import update
diff --git a/cognee/api/client.py b/cognee/api/client.py
index 6684df976..6766c12de 100644
--- a/cognee/api/client.py
+++ b/cognee/api/client.py
@@ -25,6 +25,7 @@ from cognee.api.v1.cognify.routers import get_code_pipeline_router, get_cognify_
 from cognee.api.v1.search.routers import get_search_router
 from cognee.api.v1.memify.routers import get_memify_router
 from cognee.api.v1.add.routers import get_add_router
+from cognee.api.v1.delete.routers import get_delete_router
 from cognee.api.v1.responses.routers import get_responses_router
 from cognee.api.v1.sync.routers import get_sync_router
 from cognee.api.v1.update.routers import get_update_router
@@ -261,6 +262,8 @@ app.include_router(get_settings_router(), prefix="/api/v1/settings", tags=["sett
 
 app.include_router(get_visualize_router(), prefix="/api/v1/visualize", tags=["visualize"])
 
+app.include_router(get_delete_router(), prefix="/api/v1/delete", tags=["delete"])
+
 app.include_router(get_update_router(), prefix="/api/v1/update", tags=["update"])
 
 app.include_router(get_responses_router(), prefix="/api/v1/responses", tags=["responses"])
diff --git a/cognee/api/v1/cognify/code_graph_pipeline.py b/cognee/api/v1/cognify/code_graph_pipeline.py
index d18ff2485..6a0f2aa0f 100644
--- a/cognee/api/v1/cognify/code_graph_pipeline.py
+++ b/cognee/api/v1/cognify/code_graph_pipeline.py
@@ -93,12 +93,14 @@ async def run_code_graph_pipeline(
     data = RepoData(id=uuid4(), repo_path=repo_path)
 
     if include_docs:
-        non_code_pipeline_run = run_tasks(non_code_tasks, dataset, data, user, "cognify_pipeline")
+        non_code_pipeline_run = run_tasks(
+            non_code_tasks, dataset.id, data, user, "cognify_pipeline"
+        )
         async for run_status in non_code_pipeline_run:
             yield run_status
 
     async for run_status in run_tasks(
-        tasks, dataset, data, user, "cognify_code_pipeline", incremental_loading=False
+        tasks, dataset.id, data, user, "cognify_code_pipeline", incremental_loading=False
     ):
         yield run_status
 
diff --git a/cognee/api/v1/delete/__init__.py b/cognee/api/v1/delete/__init__.py
new file mode 100644
index 000000000..a28df0d3f
--- /dev/null
+++ b/cognee/api/v1/delete/__init__.py
@@ -0,0 +1,14 @@
+from uuid import UUID
+from typing import Optional
+
+from deprecated import deprecated
+
+from cognee.api.v1.datasets import datasets
+from cognee.modules.users.models import User
+
+
+@deprecated(
+    reason="cognee.delete is deprecated. Use `datasets.delete_data` instead.", version="0.3.9"
+)
+async def delete(data_id: UUID, dataset_id: UUID, mode: str = "soft", user: Optional[User] = None):
+    return await datasets.delete_data(data_id=data_id, dataset_id=dataset_id, user=user)
diff --git a/cognee/api/v1/delete/routers/__init__.py b/cognee/api/v1/delete/routers/__init__.py
new file mode 100644
index 000000000..1f10d84af
--- /dev/null
+++ b/cognee/api/v1/delete/routers/__init__.py
@@ -0,0 +1 @@
+from .get_delete_router import get_delete_router
diff --git a/cognee/api/v1/delete/routers/get_delete_router.py b/cognee/api/v1/delete/routers/get_delete_router.py
index 3ff97681d..8a9a0729e 100644
--- a/cognee/api/v1/delete/routers/get_delete_router.py
+++ b/cognee/api/v1/delete/routers/get_delete_router.py
@@ -1,7 +1,9 @@
+from uuid import UUID
+from deprecated import deprecated
 from fastapi import Depends
 from fastapi.responses import JSONResponse
 from fastapi import APIRouter
-from uuid import UUID
+
 from cognee.shared.logging_utils import get_logger
 from cognee.modules.users.models import User
 from cognee.modules.users.methods import get_authenticated_user
@@ -15,6 +17,10 @@ def get_delete_router() -> APIRouter:
     router = APIRouter()
 
@router.delete("", response_model=None) + @deprecated( + reason="DELETE /v1/delete is deprecated. Use DELETE /v1/datasets/{dataset_id}/data/{data_id} instead.", + version="0.3.9", + ) async def delete( data_id: UUID, dataset_id: UUID, @@ -44,13 +50,12 @@ def get_delete_router() -> APIRouter: }, ) - from cognee.api.v1.delete import delete as cognee_delete + from cognee.api.v1.datasets import datasets try: - result = await cognee_delete( + result = await datasets.delete_data( data_id=data_id, dataset_id=dataset_id, - mode=mode, user=user, ) return result diff --git a/cognee/modules/graph/models/Edge.py b/cognee/modules/graph/models/Edge.py index 2cea1a94c..550838a1f 100644 --- a/cognee/modules/graph/models/Edge.py +++ b/cognee/modules/graph/models/Edge.py @@ -1,5 +1,7 @@ +from datetime import datetime, timezone from sqlalchemy import ( # event, + DateTime, String, JSON, UUID, @@ -25,11 +27,15 @@ class Edge(Base): source_node_id: Mapped[UUID] = mapped_column(UUID(as_uuid=True), nullable=False) destination_node_id: Mapped[UUID] = mapped_column(UUID(as_uuid=True), nullable=False) - relationship_name: Mapped[str | None] = mapped_column(String(255)) + relationship_name: Mapped[str | None] = mapped_column(String(255), nullable=False) label: Mapped[str | None] = mapped_column(String(255)) props: Mapped[dict | None] = mapped_column(JSON) + created_at: Mapped[datetime] = mapped_column( + DateTime(timezone=True), default=lambda: datetime.now(timezone.utc), nullable=False + ) + # __table_args__ = ( # {"postgresql_partition_by": "HASH (user_id)"}, # partitioning by user # ) diff --git a/cognee/modules/graph/models/Node.py b/cognee/modules/graph/models/Node.py index 930fc2f31..ebce02aac 100644 --- a/cognee/modules/graph/models/Node.py +++ b/cognee/modules/graph/models/Node.py @@ -1,4 +1,6 @@ +from datetime import datetime, timezone from sqlalchemy import ( + DateTime, Index, # event, String, @@ -25,12 +27,16 @@ class Node(Base): dataset_id: Mapped[UUID] = 
mapped_column(UUID(as_uuid=True), index=True, nullable=False) - label: Mapped[str] = mapped_column(String(255)) + label: Mapped[str | None] = mapped_column(String(255)) type: Mapped[str] = mapped_column(String(255), nullable=False) - indexed_fields: Mapped[list] = mapped_column(JSON) + indexed_fields: Mapped[list] = mapped_column(JSON, nullable=False) # props: Mapped[dict] = mapped_column(JSON) + created_at: Mapped[datetime] = mapped_column( + DateTime(timezone=True), default=lambda: datetime.now(timezone.utc), nullable=False + ) + __table_args__ = ( Index("index_node_dataset_slug", "dataset_id", "slug"), Index("index_node_dataset_data", "dataset_id", "data_id"), diff --git a/cognee/modules/pipelines/operations/pipeline.py b/cognee/modules/pipelines/operations/pipeline.py index 40b52ed2d..4d1b211af 100644 --- a/cognee/modules/pipelines/operations/pipeline.py +++ b/cognee/modules/pipelines/operations/pipeline.py @@ -81,7 +81,7 @@ async def run_pipeline_per_dataset( return pipeline_run = run_tasks( - tasks, dataset, data, user, pipeline_name, context, incremental_loading, data_per_batch + tasks, dataset.id, data, user, pipeline_name, context, incremental_loading, data_per_batch ) async for pipeline_run_info in pipeline_run: diff --git a/cognee/modules/pipelines/operations/run_tasks.py b/cognee/modules/pipelines/operations/run_tasks.py index 5a757aa5a..22e146b49 100644 --- a/cognee/modules/pipelines/operations/run_tasks.py +++ b/cognee/modules/pipelines/operations/run_tasks.py @@ -3,10 +3,11 @@ import os import asyncio from functools import wraps from typing import Any, Dict, List, Optional +from uuid import UUID from cognee.infrastructure.databases.graph import get_graph_engine from cognee.infrastructure.databases.relational import get_relational_engine -from cognee.modules.data.models import Dataset +from cognee.modules.data.methods import get_dataset from cognee.modules.pipelines.operations.run_tasks_distributed import run_tasks_distributed from 
cognee.modules.users.models import User from cognee.shared.logging_utils import get_logger @@ -53,7 +54,7 @@ def override_run_tasks(new_gen): @override_run_tasks(run_tasks_distributed) async def run_tasks( tasks: List[Task], - dataset: Dataset, + dataset_id: UUID, data: Optional[List[Any]] = None, user: Optional[User] = None, pipeline_name: str = "unknown_pipeline", @@ -64,6 +65,7 @@ async def run_tasks( if not user: user = await get_default_user() + dataset = await get_dataset(user.id, dataset_id) pipeline_id = generate_pipeline_id(user.id, dataset.id, pipeline_name) pipeline_run = await log_pipeline_run_start(pipeline_id, pipeline_name, dataset.id, data) pipeline_run_id = pipeline_run.pipeline_run_id diff --git a/cognee/modules/pipelines/operations/run_tasks_distributed.py b/cognee/modules/pipelines/operations/run_tasks_distributed.py index dc93a9f4e..f659c14e7 100644 --- a/cognee/modules/pipelines/operations/run_tasks_distributed.py +++ b/cognee/modules/pipelines/operations/run_tasks_distributed.py @@ -6,6 +6,7 @@ except ModuleNotFoundError: from typing import Any, List, Optional from uuid import UUID +from cognee.modules.data.methods import get_dataset from cognee.modules.data.models import Dataset from cognee.modules.pipelines.tasks.task import Task from cognee.infrastructure.databases.relational import get_relational_engine @@ -82,7 +83,7 @@ if modal: async def run_tasks_distributed( tasks: List[Task], - dataset: Dataset, + dataset_id: UUID, data: Optional[List[Any]] = None, user: Optional[User] = None, pipeline_name: str = "unknown_pipeline", @@ -93,6 +94,7 @@ async def run_tasks_distributed( if not user: user = await get_default_user() + dataset = await get_dataset(user.id, dataset_id) pipeline_id: UUID = generate_pipeline_id(user.id, dataset.id, pipeline_name) pipeline_run = await log_pipeline_run_start(pipeline_id, pipeline_name, dataset.id, data) pipeline_run_id: UUID = pipeline_run.pipeline_run_id diff --git a/examples/low_level/pipeline.py 
b/examples/low_level/pipeline.py index a7d1750fa..907af17bc 100644 --- a/examples/low_level/pipeline.py +++ b/examples/low_level/pipeline.py @@ -1,7 +1,10 @@ import os import json import asyncio -from typing import List, Any +from uuid import UUID, uuid4 +from pydantic import BaseModel +from typing import Dict, List, Optional + from cognee import prune from cognee import visualize_graph from cognee.low_level import setup, DataPoint @@ -38,14 +41,19 @@ class Company(DataPoint): metadata: dict = {"index_fields": ["name"]} -def ingest_files(data: List[Any]): +class Data(BaseModel): + id: Optional[UUID] = uuid4() + payload: Dict[str, List] + + +def ingest_files(data: List[Data]): people_data_points = {} departments_data_points = {} companies_data_points = {} for data_item in data: - people = data_item["people"] - companies = data_item["companies"] + people = data_item.payload["people"] + companies = data_item.payload["companies"] for person in people: new_person = Person(name=person["name"]) @@ -95,13 +103,12 @@ async def main(): people_file_path = os.path.join(os.path.dirname(__file__), "people.json") people = json.loads(open(people_file_path, "r").read()) - # Run tasks expects a list of data even if it is just one document - data = [{"companies": companies, "people": people}] + data = Data(payload={"companies": companies, "people": people}) pipeline = run_tasks( [Task(ingest_files), Task(add_data_points)], - dataset=datasets[0], - data=data, + dataset_id=datasets[0].id, + data=[data], incremental_loading=False, ) diff --git a/examples/low_level/product_recommendation.py b/examples/low_level/product_recommendation.py index e49e1ac33..7ce901d17 100644 --- a/examples/low_level/product_recommendation.py +++ b/examples/low_level/product_recommendation.py @@ -11,7 +11,7 @@ from cognee import prune # from cognee import visualize_graph from cognee.infrastructure.databases.graph import get_graph_engine from cognee.low_level import setup, DataPoint -from 
cognee.modules.data.models import Dataset +from cognee.modules.data.methods import create_dataset from cognee.modules.users.methods import get_default_user from cognee.modules.users.models import User from cognee.pipelines import run_tasks, Task @@ -27,7 +27,7 @@ products_aggregator_node = Products() class Product(DataPoint): name: str - type: str + type: str = "Product" price: float colors: list[str] is_type: Products = products_aggregator_node @@ -115,7 +115,7 @@ async def main(): # Get user and dataset user: User = await get_default_user() # type: ignore - main_dataset = Dataset(id=uuid4(), name="demo_dataset") + main_dataset = await create_dataset("demo_dataset", user) customers_file_path = os.path.join(os.path.dirname(__file__), "customers.json") customers = json.loads(open(customers_file_path, "r").read()) @@ -131,7 +131,7 @@ async def main(): pipeline = run_tasks( [Task(ingest_customers), Task(add_data_points)], - dataset=main_dataset, + dataset_id=main_dataset.id, data=[data], user=user, ) diff --git a/notebooks/cognee_demo.ipynb b/notebooks/cognee_demo.ipynb index fc89ebcf1..480b04a40 100644 --- a/notebooks/cognee_demo.ipynb +++ b/notebooks/cognee_demo.ipynb @@ -643,7 +643,7 @@ " Task(add_data_points, task_config={\"batch_size\": 10}),\n", " ]\n", "\n", - " pipeline_run = run_tasks(tasks, dataset, data_documents, user, \"cognify_pipeline\", context={\"dataset\": dataset})\n", + " pipeline_run = run_tasks(tasks, dataset.id, data_documents, user, \"cognify_pipeline\", context={\"dataset\": dataset})\n", " pipeline_run_status = None\n", "\n", " async for run_status in pipeline_run:\n",