From 007399b1c6b0170427e4b07a5b0335e27a2d1fe9 Mon Sep 17 00:00:00 2001 From: Boris Date: Tue, 26 Aug 2025 19:06:08 +0200 Subject: [PATCH] feat: migrate pipeline status reset to add pipeline (#1289) ## Description ## DCO Affirmation I affirm that all code in every commit of this pull request conforms to the terms of the Topoteretes Developer Certificate of Origin. --- cognee/api/v1/add/add.py | 18 ++++++++-- .../v1/cognify/routers/get_cognify_router.py | 2 +- .../datasets/routers/get_datasets_router.py | 2 +- cognee/modules/data/methods/__init__.py | 2 ++ .../data/methods/create_authorized_dataset.py | 19 +++++++++++ .../data/methods/get_authorized_dataset.py | 16 ++++++--- .../methods/get_authorized_dataset_by_name.py | 16 +++++++++ .../data/methods/load_or_create_datasets.py | 22 ++---------- .../graph/methods/get_formatted_graph_data.py | 5 +-- .../reset_dataset_pipeline_run_status.py | 12 +++++++ .../layers/resolve_authorized_user_dataset.py | 34 +++++++++++++++++++ cognee/modules/pipelines/methods/__init__.py | 2 ++ .../methods/get_pipeline_runs_by_dataset.py | 34 +++++++++++++++++++ .../methods/reset_pipeline_run_status.py | 16 +++++++++ .../operations/log_pipeline_run_initiated.py | 2 +- .../modules/pipelines/operations/pipeline.py | 22 ------------ 16 files changed, 169 insertions(+), 55 deletions(-) create mode 100644 cognee/modules/data/methods/create_authorized_dataset.py create mode 100644 cognee/modules/data/methods/get_authorized_dataset_by_name.py create mode 100644 cognee/modules/pipelines/layers/reset_dataset_pipeline_run_status.py create mode 100644 cognee/modules/pipelines/layers/resolve_authorized_user_dataset.py create mode 100644 cognee/modules/pipelines/methods/get_pipeline_runs_by_dataset.py create mode 100644 cognee/modules/pipelines/methods/reset_pipeline_run_status.py diff --git a/cognee/api/v1/add/add.py b/cognee/api/v1/add/add.py index e8eea42c3..fbf4e0a54 100644 --- a/cognee/api/v1/add/add.py +++ b/cognee/api/v1/add/add.py @@ -1,9 +1,15 @@ from uuid import UUID from typing import Union, BinaryIO, List, Optional -from cognee.modules.pipelines import Task from cognee.modules.users.models import User -from cognee.modules.pipelines import cognee_pipeline +from cognee.modules.pipelines import Task, cognee_pipeline +from cognee.modules.pipelines.layers.resolve_authorized_user_dataset import ( + resolve_authorized_user_dataset, +) +from cognee.modules.pipelines.layers.reset_dataset_pipeline_run_status import ( + reset_dataset_pipeline_run_status, +) +from cognee.modules.engine.operations.setup import setup from cognee.tasks.ingestion import ingest_data, resolve_data_directories @@ -140,11 +146,17 @@ async def add( Task(ingest_data, dataset_name, user, node_set, dataset_id, preferred_loaders), ] + await setup() + + user, authorized_dataset = await resolve_authorized_user_dataset(dataset_id, dataset_name, user) + + await reset_dataset_pipeline_run_status(authorized_dataset.id, user) + pipeline_run_info = None async for run_info in cognee_pipeline( tasks=tasks, - datasets=dataset_id if dataset_id else dataset_name, + datasets=[authorized_dataset.id], data=data, user=user, pipeline_name="add_pipeline", diff --git a/cognee/api/v1/cognify/routers/get_cognify_router.py b/cognee/api/v1/cognify/routers/get_cognify_router.py index b63238966..1f60c2f94 100644 --- a/cognee/api/v1/cognify/routers/get_cognify_router.py +++ b/cognee/api/v1/cognify/routers/get_cognify_router.py @@ -164,7 +164,7 @@ def get_cognify_router() -> APIRouter: { "pipeline_run_id": str(pipeline_run_info.pipeline_run_id), "status": pipeline_run_info.status, - "payload": await get_formatted_graph_data(pipeline_run.dataset_id, user.id), + "payload": await get_formatted_graph_data(pipeline_run.dataset_id, user), } ) diff --git a/cognee/api/v1/datasets/routers/get_datasets_router.py b/cognee/api/v1/datasets/routers/get_datasets_router.py index a6938d764..8052e3864 100644 --- a/cognee/api/v1/datasets/routers/get_datasets_router.py +++ b/cognee/api/v1/datasets/routers/get_datasets_router.py @@ -284,7 +284,7 @@ def get_datasets_router() -> APIRouter: - **500 Internal Server Error**: Error retrieving graph data """ - graph_data = await get_formatted_graph_data(dataset_id, user.id) + graph_data = await get_formatted_graph_data(dataset_id, user) return graph_data diff --git a/cognee/modules/data/methods/__init__.py b/cognee/modules/data/methods/__init__.py index a17d41683..d9716de95 100644 --- a/cognee/modules/data/methods/__init__.py +++ b/cognee/modules/data/methods/__init__.py @@ -7,6 +7,7 @@ from .get_datasets import get_datasets from .get_datasets_by_name import get_datasets_by_name from .get_dataset_data import get_dataset_data from .get_authorized_dataset import get_authorized_dataset +from .get_authorized_dataset_by_name import get_authorized_dataset_by_name from .get_data import get_data from .get_unique_dataset_id import get_unique_dataset_id from .get_authorized_existing_datasets import get_authorized_existing_datasets @@ -18,6 +19,7 @@ from .delete_data import delete_data # Create from .load_or_create_datasets import load_or_create_datasets +from .create_authorized_dataset import create_authorized_dataset # Check from .check_dataset_name import check_dataset_name diff --git a/cognee/modules/data/methods/create_authorized_dataset.py b/cognee/modules/data/methods/create_authorized_dataset.py new file mode 100644 index 000000000..e43381b35 --- /dev/null +++ b/cognee/modules/data/methods/create_authorized_dataset.py @@ -0,0 +1,19 @@ +from cognee.infrastructure.databases.relational import get_relational_engine +from cognee.modules.users.models import User +from cognee.modules.data.models import Dataset +from cognee.modules.users.permissions.methods import give_permission_on_dataset +from .create_dataset import create_dataset + + +async def create_authorized_dataset(dataset_name: str, user: User) -> Dataset: + db_engine = get_relational_engine() + + async with db_engine.get_async_session() as session: + new_dataset = await create_dataset(dataset_name, user, session) + + await give_permission_on_dataset(user, new_dataset.id, "read") + await give_permission_on_dataset(user, new_dataset.id, "write") + await give_permission_on_dataset(user, new_dataset.id, "delete") + await give_permission_on_dataset(user, new_dataset.id, "share") + + return new_dataset diff --git a/cognee/modules/data/methods/get_authorized_dataset.py b/cognee/modules/data/methods/get_authorized_dataset.py index ee6115d82..0e30b7e0e 100644 --- a/cognee/modules/data/methods/get_authorized_dataset.py +++ b/cognee/modules/data/methods/get_authorized_dataset.py @@ -1,11 +1,15 @@ -from typing import Optional from uuid import UUID -from cognee.modules.users.permissions.methods import get_specific_user_permission_datasets +from typing import Optional + +from cognee.modules.users.models import User +from cognee.modules.data.methods.get_authorized_existing_datasets import ( + get_authorized_existing_datasets, +) from ..models import Dataset async def get_authorized_dataset( - user_id: UUID, dataset_id: UUID, permission_type="read" + user: User, dataset_id: UUID, permission_type="read" ) -> Optional[Dataset]: """ Get a specific dataset with permissions for a user. @@ -18,6 +22,8 @@ async def get_authorized_dataset( Returns: Optional[Dataset]: dataset with permissions """ - datasets = await get_specific_user_permission_datasets(user_id, permission_type, [dataset_id]) + authorized_datasets = await get_authorized_existing_datasets( + [dataset_id], permission_type, user + ) - return datasets[0] if datasets else None + return authorized_datasets[0] if authorized_datasets else None diff --git a/cognee/modules/data/methods/get_authorized_dataset_by_name.py b/cognee/modules/data/methods/get_authorized_dataset_by_name.py new file mode 100644 index 000000000..654dcb630 --- /dev/null +++ b/cognee/modules/data/methods/get_authorized_dataset_by_name.py @@ -0,0 +1,16 @@ +from typing import Optional + +from cognee.modules.users.models import User +from cognee.modules.data.methods.get_authorized_existing_datasets import ( + get_authorized_existing_datasets, +) + +from ..models import Dataset + + +async def get_authorized_dataset_by_name( + dataset_name: str, user: User, permission_type: str +) -> Optional[Dataset]: + authorized_datasets = await get_authorized_existing_datasets([], permission_type, user) + + return next((dataset for dataset in authorized_datasets if dataset.name == dataset_name), None) diff --git a/cognee/modules/data/methods/load_or_create_datasets.py b/cognee/modules/data/methods/load_or_create_datasets.py index 43cd0513c..1d6ef3efb 100644 --- a/cognee/modules/data/methods/load_or_create_datasets.py +++ b/cognee/modules/data/methods/load_or_create_datasets.py @@ -1,12 +1,9 @@ from typing import List, Union from uuid import UUID -from cognee.infrastructure.databases.relational import get_relational_engine from cognee.modules.data.models import Dataset -from cognee.modules.data.methods import create_dataset -from cognee.modules.data.methods import get_unique_dataset_id +from cognee.modules.data.methods import create_authorized_dataset from cognee.modules.data.exceptions import DatasetNotFoundError -from cognee.modules.users.permissions.methods import give_permission_on_dataset async def load_or_create_datasets( @@ -34,22 +31,7 @@ async def load_or_create_datasets( if isinstance(identifier, UUID): raise DatasetNotFoundError(f"Dataset with given UUID does not exist: {identifier}") - # Otherwise, create a new Dataset instance - new_dataset = Dataset( - id=await get_unique_dataset_id(dataset_name=identifier, user=user), - name=identifier, - owner_id=user.id, - ) - - # Save dataset to database - db_engine = get_relational_engine() - async with db_engine.get_async_session() as session: - await create_dataset(identifier, user, session) - - await give_permission_on_dataset(user, new_dataset.id, "read") - await give_permission_on_dataset(user, new_dataset.id, "write") - await give_permission_on_dataset(user, new_dataset.id, "delete") - await give_permission_on_dataset(user, new_dataset.id, "share") + new_dataset = await create_authorized_dataset(identifier, user) result.append(new_dataset) diff --git a/cognee/modules/graph/methods/get_formatted_graph_data.py b/cognee/modules/graph/methods/get_formatted_graph_data.py index d08958a7a..34b354722 100644 --- a/cognee/modules/graph/methods/get_formatted_graph_data.py +++ b/cognee/modules/graph/methods/get_formatted_graph_data.py @@ -3,10 +3,11 @@ from cognee.infrastructure.databases.graph import get_graph_engine from cognee.context_global_variables import set_database_global_context_variables from cognee.modules.data.exceptions.exceptions import DatasetNotFoundError from cognee.modules.data.methods import get_authorized_dataset +from cognee.modules.users.models import User -async def get_formatted_graph_data(dataset_id: UUID, user_id: UUID): - dataset = await get_authorized_dataset(user_id, dataset_id) +async def get_formatted_graph_data(dataset_id: UUID, user: User): + dataset = await get_authorized_dataset(user, dataset_id) if not dataset: raise DatasetNotFoundError(message="Dataset not found.") diff --git a/cognee/modules/pipelines/layers/reset_dataset_pipeline_run_status.py b/cognee/modules/pipelines/layers/reset_dataset_pipeline_run_status.py new file mode 100644 index 000000000..cc72a6e51 --- /dev/null +++ b/cognee/modules/pipelines/layers/reset_dataset_pipeline_run_status.py @@ -0,0 +1,12 @@ +from uuid import UUID +from cognee.modules.pipelines.methods import get_pipeline_runs_by_dataset, reset_pipeline_run_status +from cognee.modules.pipelines.models.PipelineRun import PipelineRunStatus +from cognee.modules.users.models import User + + +async def reset_dataset_pipeline_run_status(dataset_id: UUID, user: User): + related_pipeline_runs = await get_pipeline_runs_by_dataset(dataset_id) + + for pipeline_run in related_pipeline_runs: + if pipeline_run.status is not PipelineRunStatus.DATASET_PROCESSING_INITIATED: + await reset_pipeline_run_status(user.id, dataset_id, pipeline_run.pipeline_name) diff --git a/cognee/modules/pipelines/layers/resolve_authorized_user_dataset.py b/cognee/modules/pipelines/layers/resolve_authorized_user_dataset.py new file mode 100644 index 000000000..30d0fef71 --- /dev/null +++ b/cognee/modules/pipelines/layers/resolve_authorized_user_dataset.py @@ -0,0 +1,34 @@ +from uuid import UUID + +from cognee.api.v1.exceptions import DatasetNotFoundError +from cognee.modules.users.models import User +from cognee.modules.users.methods import get_default_user +from cognee.modules.data.methods import ( + create_authorized_dataset, + get_authorized_dataset, + get_authorized_dataset_by_name, +) + + +async def resolve_authorized_user_dataset(dataset_id: UUID, dataset_name: str, user: User): + if not user: + user = await get_default_user() + + if dataset_id: + authorized_dataset = await get_authorized_dataset(user, dataset_id, "write") + elif dataset_name: + authorized_dataset = await get_authorized_dataset_by_name(dataset_name, user, "write") + + if not authorized_dataset: + authorized_dataset = await create_authorized_dataset( + dataset_name=dataset_name, user=user + ) + else: + raise ValueError("Either dataset_id or dataset_name must be provided.") + + if not authorized_dataset: + raise DatasetNotFoundError( + message=f"Dataset ({str(dataset_id) or dataset_name}) not found." + ) + + return user, authorized_dataset diff --git a/cognee/modules/pipelines/methods/__init__.py b/cognee/modules/pipelines/methods/__init__.py index c15b58207..d470cc3f6 100644 --- a/cognee/modules/pipelines/methods/__init__.py +++ b/cognee/modules/pipelines/methods/__init__.py @@ -1,2 +1,4 @@ from .get_pipeline_run import get_pipeline_run from .get_pipeline_run_by_dataset import get_pipeline_run_by_dataset +from .get_pipeline_runs_by_dataset import get_pipeline_runs_by_dataset +from .reset_pipeline_run_status import reset_pipeline_run_status diff --git a/cognee/modules/pipelines/methods/get_pipeline_runs_by_dataset.py b/cognee/modules/pipelines/methods/get_pipeline_runs_by_dataset.py new file mode 100644 index 000000000..62b234bd4 --- /dev/null +++ b/cognee/modules/pipelines/methods/get_pipeline_runs_by_dataset.py @@ -0,0 +1,34 @@ +from uuid import UUID +from typing import Optional +from sqlalchemy import select, func +from sqlalchemy.orm import aliased + +from cognee.infrastructure.databases.relational import get_relational_engine +from ..models import PipelineRun + + +async def get_pipeline_runs_by_dataset(dataset_id: UUID): + db_engine = get_relational_engine() + + async with db_engine.get_async_session() as session: + query = ( + select( + PipelineRun, + func.row_number() + .over( + partition_by=(PipelineRun.dataset_id, PipelineRun.pipeline_name), + order_by=PipelineRun.created_at.desc(), + ) + .label("rn"), + ) + .filter(PipelineRun.dataset_id == dataset_id) + .subquery() + ) + + aliased_pipeline_run = aliased(PipelineRun, query) + + latest_run = select(aliased_pipeline_run).filter(query.c.rn == 1) + + runs = (await session.execute(latest_run)).scalars().all() + + return runs diff --git a/cognee/modules/pipelines/methods/reset_pipeline_run_status.py b/cognee/modules/pipelines/methods/reset_pipeline_run_status.py new file mode 100644 index 000000000..059909696 --- /dev/null +++ b/cognee/modules/pipelines/methods/reset_pipeline_run_status.py @@ -0,0 +1,16 @@ +from uuid import UUID +from cognee.modules.pipelines.utils.generate_pipeline_id import generate_pipeline_id +from cognee.modules.pipelines.operations.log_pipeline_run_initiated import ( + log_pipeline_run_initiated, +) + + +async def reset_pipeline_run_status(user_id: UUID, dataset_id: UUID, pipeline_name: str): + pipeline_id = generate_pipeline_id(user_id, dataset_id, pipeline_name) + + # Without this the pipeline status will be DATASET_PROCESSING_COMPLETED and will skip the execution. + await log_pipeline_run_initiated( + pipeline_id=pipeline_id, + pipeline_name=pipeline_name, + dataset_id=dataset_id, + ) diff --git a/cognee/modules/pipelines/operations/log_pipeline_run_initiated.py b/cognee/modules/pipelines/operations/log_pipeline_run_initiated.py index e68efe31e..0ac054c7b 100644 --- a/cognee/modules/pipelines/operations/log_pipeline_run_initiated.py +++ b/cognee/modules/pipelines/operations/log_pipeline_run_initiated.py @@ -4,7 +4,7 @@ from cognee.modules.pipelines.models import PipelineRun, PipelineRunStatus from cognee.modules.pipelines.utils import generate_pipeline_run_id -async def log_pipeline_run_initiated(pipeline_id: str, pipeline_name: str, dataset_id: UUID): +async def log_pipeline_run_initiated(pipeline_id: UUID, pipeline_name: str, dataset_id: UUID): pipeline_run = PipelineRun( pipeline_run_id=generate_pipeline_run_id(pipeline_id, dataset_id), pipeline_name=pipeline_name, diff --git a/cognee/modules/pipelines/operations/pipeline.py b/cognee/modules/pipelines/operations/pipeline.py index a36fd5cee..ad24edac8 100644 --- a/cognee/modules/pipelines/operations/pipeline.py +++ b/cognee/modules/pipelines/operations/pipeline.py @@ -9,11 +9,9 @@ from cognee.shared.logging_utils import get_logger from cognee.modules.data.methods.get_dataset_data import get_dataset_data from cognee.modules.data.models import Data, Dataset from cognee.modules.pipelines.operations.run_tasks import run_tasks -from cognee.modules.pipelines.utils import generate_pipeline_id from cognee.modules.pipelines.layers import validate_pipeline_tasks from cognee.modules.pipelines.tasks.task import Task from cognee.modules.users.models import User -from cognee.modules.pipelines.operations import log_pipeline_run_initiated from cognee.context_global_variables import set_database_global_context_variables from cognee.modules.pipelines.layers.resolve_authorized_user_datasets import ( resolve_authorized_user_datasets, @@ -66,26 +64,6 @@ async def run_pipeline( # Will only be used if ENABLE_BACKEND_ACCESS_CONTROL is set to True await set_database_global_context_variables(dataset.id, dataset.owner_id) - # Ugly hack, but no easier way to do this. - if pipeline_name == "add_pipeline": - pipeline_id = generate_pipeline_id(user.id, dataset.id, pipeline_name) - # Refresh the add pipeline status so data is added to a dataset. - # Without this the app_pipeline status will be DATASET_PROCESSING_COMPLETED and will skip the execution. - - await log_pipeline_run_initiated( - pipeline_id=pipeline_id, - pipeline_name="add_pipeline", - dataset_id=dataset.id, - ) - - # Refresh the cognify pipeline status after we add new files. - # Without this the cognify_pipeline status will be DATASET_PROCESSING_COMPLETED and will skip the execution. - await log_pipeline_run_initiated( - pipeline_id=pipeline_id, - pipeline_name="cognify_pipeline", - dataset_id=dataset.id, - ) - if not data: data: list[Data] = await get_dataset_data(dataset_id=dataset.id)