fix: add migration and backwards compatibility

Boris Arzentar 2025-10-30 21:19:15 +01:00
parent efb57eee39
commit d8703aa497
16 changed files with 195 additions and 27 deletions

View file

@@ -0,0 +1,119 @@
"""Replace graph ledger table with nodes and edges tables

Revision ID: 84e5d08260d6
Revises: 211ab850ef3d
Create Date: 2025-10-30 13:36:23.226706

"""

from uuid import NAMESPACE_OID, uuid4, uuid5
from typing import Sequence, Union
from datetime import datetime, timezone

from alembic import op
import sqlalchemy as sa

# revision identifiers, used by Alembic.
revision: str = "84e5d08260d6"
down_revision: Union[str, None] = "211ab850ef3d"
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None


def upgrade() -> None:
    conn = op.get_bind()
    inspector = sa.inspect(conn)

    table_names = inspector.get_table_names()

    if "graph_relationship_ledger" in table_names:
        op.drop_table("graph_relationship_ledger")

    if "nodes" not in table_names:
        op.create_table(
            "nodes",
            sa.Column("id", sa.UUID, primary_key=True),
            sa.Column("slug", sa.UUID, nullable=False),
            sa.Column("user_id", sa.UUID, nullable=False),
            sa.Column("data_id", sa.UUID, nullable=False),
            sa.Column("dataset_id", sa.UUID, index=True),
            sa.Column("label", sa.String()),
            sa.Column("type", sa.String(), nullable=False),
            sa.Column("indexed_fields", sa.JSON(), nullable=False),
            sa.Column(
                "created_at", sa.DateTime(timezone=True), default=lambda: datetime.now(timezone.utc)
            ),
        )

    if "edges" not in table_names:
        op.create_table(
            "edges",
            sa.Column("id", sa.UUID, primary_key=True),
            sa.Column("user_id", sa.UUID, nullable=False),
            sa.Column("data_id", sa.UUID, index=True),
            sa.Column("dataset_id", sa.UUID, index=True),
            sa.Column("source_node_id", sa.UUID, nullable=False),
            sa.Column("destination_node_id", sa.UUID, nullable=False),
            sa.Column("label", sa.String()),
            sa.Column("relationship_name", sa.String(), nullable=False),
            sa.Column("props", sa.JSON()),
            sa.Column(
                "created_at", sa.DateTime(timezone=True), default=lambda: datetime.now(timezone.utc)
            ),
        )

    existing_indexes = [index["name"] for index in inspector.get_indexes("nodes")]

    if "index_node_dataset_slug" not in existing_indexes:
        op.create_index("index_node_dataset_slug", "nodes", ["dataset_id", "slug"])

    if "index_node_dataset_data" not in existing_indexes:
        op.create_index("index_node_dataset_data", "nodes", ["dataset_id", "data_id"])


def downgrade() -> None:
    conn = op.get_bind()
    inspector = sa.inspect(conn)

    table_names = inspector.get_table_names()

    if "nodes" in table_names:
        op.drop_table("nodes")

    if "edges" in table_names:
        op.drop_table("edges")

    if "graph_relationship_ledger" not in table_names:
        op.create_table(
            "graph_relationship_ledger",
            sa.Column(
                "id",
                sa.UUID,
                primary_key=True,
                default=lambda: uuid5(NAMESPACE_OID, f"{datetime.now(timezone.utc).timestamp()}"),
            ),
            sa.Column("source_node_id", sa.UUID, nullable=False),
            sa.Column("destination_node_id", sa.UUID, nullable=False),
            sa.Column("creator_function", sa.String(), nullable=False),
            sa.Column("node_label", sa.String(), nullable=False),
            sa.Column(
                "created_at", sa.DateTime(timezone=True), default=lambda: datetime.now(timezone.utc)
            ),
            sa.Column(
                "deleted_at", sa.DateTime(timezone=True), default=lambda: datetime.now(timezone.utc)
            ),
            sa.Column("user_id", sa.UUID, nullable=False),
        )

        op.create_index("idx_graph_relationship_id", "graph_relationship_ledger", ["id"])
        op.create_index(
            "idx_graph_relationship_ledger_source_node_id",
            "graph_relationship_ledger",
            ["source_node_id"],
        )
        op.create_index(
            "idx_graph_relationship_ledger_destination_node_id",
            "graph_relationship_ledger",
            ["destination_node_id"],
        )
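Both upgrade() and downgrade() guard every drop, create, and index statement behind inspector checks, so the migration is idempotent and safe to re-run against a partially migrated database. A minimal sketch of driving it programmatically, assuming a configured alembic.ini in the working directory (this runner is illustrative, not part of the commit):

from alembic.config import main as alembic_main

# Applies 84e5d08260d6 if the database is not already at head.
alembic_main(argv=["upgrade", "head"])

# Reverts to the previous revision, restoring graph_relationship_ledger.
alembic_main(argv=["downgrade", "211ab850ef3d"])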

View file

@@ -196,7 +196,7 @@ class CogneeClient:
        )

        return results

    async def delete(self, data_id: UUID, dataset_id: UUID) -> Dict[str, Any]:
    async def delete(self, data_id: UUID, dataset_id: UUID, mode: str = "soft") -> Dict[str, Any]:
        """
        Delete data from a dataset.
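The added mode parameter keeps the client signature backwards compatible with callers that already pass it. A hedged usage sketch (the client instance and the UUIDs are placeholders, not from this commit):

# Assumes an initialized CogneeClient and existing data/dataset UUIDs.
result = await client.delete(data_id=data_uuid, dataset_id=dataset_uuid, mode="hard")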

View file

@@ -16,6 +16,7 @@ from cognee.shared.logging_utils import setup_logging
logger = setup_logging()
from .api.v1.add import add
from .api.v1.delete import delete
from .api.v1.cognify import cognify
from .modules.memify import memify
from .api.v1.update import update

View file

@@ -25,6 +25,7 @@ from cognee.api.v1.cognify.routers import get_code_pipeline_router, get_cognify_
from cognee.api.v1.search.routers import get_search_router
from cognee.api.v1.memify.routers import get_memify_router
from cognee.api.v1.add.routers import get_add_router
from cognee.api.v1.delete.routers import get_delete_router
from cognee.api.v1.responses.routers import get_responses_router
from cognee.api.v1.sync.routers import get_sync_router
from cognee.api.v1.update.routers import get_update_router
@@ -261,6 +262,8 @@ app.include_router(get_settings_router(), prefix="/api/v1/settings", tags=["sett
app.include_router(get_visualize_router(), prefix="/api/v1/visualize", tags=["visualize"])
app.include_router(get_delete_router(), prefix="/api/v1/delete", tags=["delete"])
app.include_router(get_update_router(), prefix="/api/v1/update", tags=["update"])
app.include_router(get_responses_router(), prefix="/api/v1/responses", tags=["responses"])

View file

@@ -93,12 +93,14 @@ async def run_code_graph_pipeline(
    data = RepoData(id=uuid4(), repo_path=repo_path)

    if include_docs:
        non_code_pipeline_run = run_tasks(non_code_tasks, dataset, data, user, "cognify_pipeline")
        non_code_pipeline_run = run_tasks(
            non_code_tasks, dataset.id, data, user, "cognify_pipeline"
        )

        async for run_status in non_code_pipeline_run:
            yield run_status

    async for run_status in run_tasks(
        tasks, dataset, data, user, "cognify_code_pipeline", incremental_loading=False
        tasks, dataset.id, data, user, "cognify_code_pipeline", incremental_loading=False
    ):
        yield run_status

View file

@@ -0,0 +1,14 @@
from uuid import UUID
from typing import Optional

from deprecated import deprecated

from cognee.api.v1.datasets import datasets
from cognee.modules.users.models import User


@deprecated(
    reason="cognee.delete is deprecated. Use `datasets.delete_data` instead.", version="0.3.9"
)
async def delete(data_id: UUID, dataset_id: UUID, mode: str = "soft", user: Optional[User] = None):
    await datasets.delete_data(data_id=data_id, dataset_id=dataset_id, user=user)
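The shim keeps cognee.delete importable while forwarding to the datasets API; note that mode is accepted for compatibility but not forwarded. An illustrative migration path for callers (the UUIDs are placeholders):

# Deprecated entry point, still works:
await cognee.delete(data_id=data_uuid, dataset_id=dataset_uuid)

# Preferred replacement:
from cognee.api.v1.datasets import datasets
await datasets.delete_data(data_id=data_uuid, dataset_id=dataset_uuid)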

View file

@@ -0,0 +1 @@
from .get_delete_router import get_delete_router

View file

@@ -1,7 +1,9 @@
from uuid import UUID

from deprecated import deprecated
from fastapi import Depends
from fastapi.responses import JSONResponse
from fastapi import APIRouter
from uuid import UUID

from cognee.shared.logging_utils import get_logger
from cognee.modules.users.models import User
from cognee.modules.users.methods import get_authenticated_user
@@ -15,6 +17,10 @@ def get_delete_router() -> APIRouter:
    router = APIRouter()

    @router.delete("", response_model=None)
    @deprecated(
        reason="DELETE /v1/delete is deprecated. Use DELETE /v1/datasets/{dataset_id}/data/{data_id} instead.",
        version="0.3.9",
    )
    async def delete(
        data_id: UUID,
        dataset_id: UUID,
@@ -44,13 +50,12 @@ def get_delete_router() -> APIRouter:
            },
        )

        from cognee.api.v1.delete import delete as cognee_delete
        from cognee.api.v1.datasets import datasets

        try:
            result = await cognee_delete(
            result = await datasets.delete_data(
                data_id=data_id,
                dataset_id=dataset_id,
                mode=mode,
                user=user,
            )

            return result
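At the HTTP level the old route keeps serving while clients move to the per-dataset route named in the deprecation message. A sketch with httpx, where base_url, the UUIDs, and the exact shape of the replacement route (taken from the deprecation string) are assumptions:

import httpx

# Deprecated route, still served for backwards compatibility:
httpx.delete(f"{base_url}/api/v1/delete", params={"data_id": str(data_uuid), "dataset_id": str(dataset_uuid)})

# Replacement route per the deprecation notice:
httpx.delete(f"{base_url}/api/v1/datasets/{dataset_uuid}/data/{data_uuid}")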

View file

@@ -1,5 +1,7 @@
from datetime import datetime, timezone

from sqlalchemy import (
    # event,
    DateTime,
    String,
    JSON,
    UUID,
@@ -25,11 +27,15 @@ class Edge(Base):
    source_node_id: Mapped[UUID] = mapped_column(UUID(as_uuid=True), nullable=False)
    destination_node_id: Mapped[UUID] = mapped_column(UUID(as_uuid=True), nullable=False)

    relationship_name: Mapped[str | None] = mapped_column(String(255))
    relationship_name: Mapped[str | None] = mapped_column(String(255), nullable=False)
    label: Mapped[str | None] = mapped_column(String(255))
    props: Mapped[dict | None] = mapped_column(JSON)

    created_at: Mapped[datetime] = mapped_column(
        DateTime(timezone=True), default=lambda: datetime.now(timezone.utc), nullable=False
    )

    # __table_args__ = (
    #     {"postgresql_partition_by": "HASH (user_id)"},  # partitioning by user
    # )

View file

@@ -1,4 +1,6 @@
from datetime import datetime, timezone

from sqlalchemy import (
    DateTime,
    Index,
    # event,
    String,
@@ -25,12 +27,16 @@ class Node(Base):
    dataset_id: Mapped[UUID] = mapped_column(UUID(as_uuid=True), index=True, nullable=False)

    label: Mapped[str] = mapped_column(String(255))
    label: Mapped[str | None] = mapped_column(String(255))
    type: Mapped[str] = mapped_column(String(255), nullable=False)

    indexed_fields: Mapped[list] = mapped_column(JSON)
    indexed_fields: Mapped[list] = mapped_column(JSON, nullable=False)
    # props: Mapped[dict] = mapped_column(JSON)

    created_at: Mapped[datetime] = mapped_column(
        DateTime(timezone=True), default=lambda: datetime.now(timezone.utc), nullable=False
    )

    __table_args__ = (
        Index("index_node_dataset_slug", "dataset_id", "slug"),
        Index("index_node_dataset_data", "dataset_id", "data_id"),

View file

@@ -81,7 +81,7 @@ async def run_pipeline_per_dataset(
        return

    pipeline_run = run_tasks(
        tasks, dataset, data, user, pipeline_name, context, incremental_loading, data_per_batch
        tasks, dataset.id, data, user, pipeline_name, context, incremental_loading, data_per_batch
    )

    async for pipeline_run_info in pipeline_run:

View file

@@ -3,10 +3,11 @@ import os
import asyncio
from functools import wraps
from typing import Any, Dict, List, Optional
from uuid import UUID

from cognee.infrastructure.databases.graph import get_graph_engine
from cognee.infrastructure.databases.relational import get_relational_engine
from cognee.modules.data.models import Dataset
from cognee.modules.data.methods import get_dataset
from cognee.modules.pipelines.operations.run_tasks_distributed import run_tasks_distributed
from cognee.modules.users.models import User
from cognee.shared.logging_utils import get_logger
@@ -53,7 +54,7 @@ def override_run_tasks(new_gen):
@override_run_tasks(run_tasks_distributed)
async def run_tasks(
    tasks: List[Task],
    dataset: Dataset,
    dataset_id: UUID,
    data: Optional[List[Any]] = None,
    user: Optional[User] = None,
    pipeline_name: str = "unknown_pipeline",
@@ -64,6 +65,7 @@ async def run_tasks(
    if not user:
        user = await get_default_user()

    dataset = await get_dataset(user.id, dataset_id)

    pipeline_id = generate_pipeline_id(user.id, dataset.id, pipeline_name)
    pipeline_run = await log_pipeline_run_start(pipeline_id, pipeline_name, dataset.id, data)

    pipeline_run_id = pipeline_run.pipeline_run_id
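run_tasks now takes the dataset's UUID and resolves the Dataset row itself via get_dataset(user.id, dataset_id), so callers no longer load the model first. An illustrative before/after call (tasks, data, and user are placeholders):

# Before: pipeline_run = run_tasks(tasks, dataset, data, user, "cognify_pipeline")
# After:
async for run_info in run_tasks(tasks, dataset.id, data, user, "cognify_pipeline"):
    print(run_info)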

View file

@@ -6,6 +6,7 @@ except ModuleNotFoundError:
from typing import Any, List, Optional
from uuid import UUID

from cognee.modules.data.methods import get_dataset
from cognee.modules.data.models import Dataset
from cognee.modules.pipelines.tasks.task import Task
from cognee.infrastructure.databases.relational import get_relational_engine
@@ -82,7 +83,7 @@ if modal:
    async def run_tasks_distributed(
        tasks: List[Task],
        dataset: Dataset,
        dataset_id: UUID,
        data: Optional[List[Any]] = None,
        user: Optional[User] = None,
        pipeline_name: str = "unknown_pipeline",
@@ -93,6 +94,7 @@ async def run_tasks_distributed(
        if not user:
            user = await get_default_user()

        dataset = await get_dataset(user.id, dataset_id)

        pipeline_id: UUID = generate_pipeline_id(user.id, dataset.id, pipeline_name)
        pipeline_run = await log_pipeline_run_start(pipeline_id, pipeline_name, dataset.id, data)

        pipeline_run_id: UUID = pipeline_run.pipeline_run_id

View file

@@ -1,7 +1,10 @@
import os
import json
import asyncio
from typing import List, Any
from uuid import UUID, uuid4
from pydantic import BaseModel
from typing import Dict, List, Optional

from cognee import prune
from cognee import visualize_graph
from cognee.low_level import setup, DataPoint
@@ -38,14 +41,19 @@ class Company(DataPoint):
    metadata: dict = {"index_fields": ["name"]}

def ingest_files(data: List[Any]):

class Data(BaseModel):
    id: Optional[UUID] = uuid4()
    payload: Dict[str, List]


def ingest_files(data: List[Data]):
    people_data_points = {}
    departments_data_points = {}
    companies_data_points = {}

    for data_item in data:
        people = data_item["people"]
        companies = data_item["companies"]
        people = data_item.payload["people"]
        companies = data_item.payload["companies"]

        for person in people:
            new_person = Person(name=person["name"])
@@ -95,13 +103,12 @@ async def main():
    people_file_path = os.path.join(os.path.dirname(__file__), "people.json")
    people = json.loads(open(people_file_path, "r").read())

    # Run tasks expects a list of data even if it is just one document
    data = [{"companies": companies, "people": people}]
    data = Data(payload={"companies": companies, "people": people})

    pipeline = run_tasks(
        [Task(ingest_files), Task(add_data_points)],
        dataset=datasets[0],
        data=data,
        dataset_id=datasets[0].id,
        data=[data],
        incremental_loading=False,
    )

View file

@@ -11,7 +11,7 @@ from cognee import prune
# from cognee import visualize_graph
from cognee.infrastructure.databases.graph import get_graph_engine
from cognee.low_level import setup, DataPoint
from cognee.modules.data.models import Dataset
from cognee.modules.data.methods import create_dataset
from cognee.modules.users.methods import get_default_user
from cognee.modules.users.models import User
from cognee.pipelines import run_tasks, Task
@@ -27,7 +27,7 @@ products_aggregator_node = Products()
class Product(DataPoint):
    name: str
    type: str
    type: str = "Product"
    price: float
    colors: list[str]

    is_type: Products = products_aggregator_node
@@ -115,7 +115,7 @@ async def main():
    # Get user and dataset
    user: User = await get_default_user()  # type: ignore
    main_dataset = Dataset(id=uuid4(), name="demo_dataset")
    main_dataset = await create_dataset("demo_dataset", user)

    customers_file_path = os.path.join(os.path.dirname(__file__), "customers.json")
    customers = json.loads(open(customers_file_path, "r").read())
@@ -131,7 +131,7 @@ async def main():
    pipeline = run_tasks(
        [Task(ingest_customers), Task(add_data_points)],
        dataset=main_dataset,
        dataset_id=main_dataset.id,
        data=[data],
        user=user,
    )

View file

@@ -643,7 +643,7 @@
"        Task(add_data_points, task_config={\"batch_size\": 10}),\n",
"    ]\n",
"\n",
"    pipeline_run = run_tasks(tasks, dataset, data_documents, user, \"cognify_pipeline\", context={\"dataset\": dataset})\n",
"    pipeline_run = run_tasks(tasks, dataset.id, data_documents, user, \"cognify_pipeline\", context={\"dataset\": dataset})\n",
"    pipeline_run_status = None\n",
"\n",
"    async for run_status in pipeline_run:\n",