From 6d438c8800f3b02a29b5b4da86df1fc1b23c6dc6 Mon Sep 17 00:00:00 2001 From: Igor Ilic Date: Mon, 25 Aug 2025 12:47:48 +0200 Subject: [PATCH 1/5] feat: Add dataset and pipeline status layer --- cognee/modules/pipelines/layers/__init__.py | 0 .../layers/authorized_user_datasets.py | 40 ++++++++++ .../pipelines/layers/pipeline_status_check.py | 43 ++++++++++ .../modules/pipelines/operations/pipeline.py | 79 ++----------------- 4 files changed, 91 insertions(+), 71 deletions(-) create mode 100644 cognee/modules/pipelines/layers/__init__.py create mode 100644 cognee/modules/pipelines/layers/authorized_user_datasets.py create mode 100644 cognee/modules/pipelines/layers/pipeline_status_check.py diff --git a/cognee/modules/pipelines/layers/__init__.py b/cognee/modules/pipelines/layers/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/cognee/modules/pipelines/layers/authorized_user_datasets.py b/cognee/modules/pipelines/layers/authorized_user_datasets.py new file mode 100644 index 000000000..f0ba013ec --- /dev/null +++ b/cognee/modules/pipelines/layers/authorized_user_datasets.py @@ -0,0 +1,40 @@ +from uuid import UUID +from typing import Union + +from cognee.modules.users.methods import get_default_user +from cognee.modules.users.models import User +from cognee.modules.data.exceptions import DatasetNotFoundError +from cognee.modules.data.methods import ( + get_authorized_existing_datasets, + load_or_create_datasets, + check_dataset_name, +) + + +async def authorized_user_datasets(user: User, datasets: Union[str, list[str], list[UUID]]): + # If no user is provided use default user + if user is None: + user = await get_default_user() + + # Convert datasets to list + if isinstance(datasets, str) or isinstance(datasets, UUID): + datasets = [datasets] + + # Get datasets user wants write permissions for (verify user has permissions if datasets are provided as well) + # NOTE: If a user wants to write to a dataset he does not own it must be provided through 
UUID + existing_datasets = await get_authorized_existing_datasets(datasets, "write", user) + + if not datasets: + # Get datasets from database if none sent. + authorized_datasets = existing_datasets + else: + # If dataset matches an existing Dataset (by name or id), reuse it. Otherwise, create a new Dataset. + authorized_datasets = await load_or_create_datasets(datasets, existing_datasets, user) + + if not authorized_datasets: + raise DatasetNotFoundError("There are no datasets to work with.") + + for dataset in authorized_datasets: + check_dataset_name(dataset.name) + + return user, authorized_datasets diff --git a/cognee/modules/pipelines/layers/pipeline_status_check.py b/cognee/modules/pipelines/layers/pipeline_status_check.py new file mode 100644 index 000000000..ac8abc0df --- /dev/null +++ b/cognee/modules/pipelines/layers/pipeline_status_check.py @@ -0,0 +1,43 @@ +from cognee.modules.data.models import Dataset +from cognee.modules.pipelines.models import PipelineRunStatus +from cognee.modules.pipelines.operations.get_pipeline_status import get_pipeline_status +from cognee.modules.pipelines.methods import get_pipeline_run_by_dataset +from cognee.shared.logging_utils import get_logger + +from cognee.modules.pipelines.models.PipelineRunInfo import ( + PipelineRunCompleted, + PipelineRunStarted, +) + +logger = get_logger(__name__) + + +async def pipeline_status_check(dataset, data, pipeline_name): + # async with update_status_lock: TODO: Add UI lock to prevent multiple backend requests + if isinstance(dataset, Dataset): + task_status = await get_pipeline_status([dataset.id], pipeline_name) + else: + task_status = [ + PipelineRunStatus.DATASET_PROCESSING_COMPLETED + ] # TODO: this is a random assignment, find permanent solution + + if str(dataset.id) in task_status: + if task_status[str(dataset.id)] == PipelineRunStatus.DATASET_PROCESSING_STARTED: + logger.info("Dataset %s is already being processed.", dataset.id) + pipeline_run = await 
get_pipeline_run_by_dataset(dataset.id, pipeline_name) + yield PipelineRunStarted( + pipeline_run_id=pipeline_run.pipeline_run_id, + dataset_id=dataset.id, + dataset_name=dataset.name, + payload=data, + ) + return + elif task_status[str(dataset.id)] == PipelineRunStatus.DATASET_PROCESSING_COMPLETED: + logger.info("Dataset %s is already processed.", dataset.id) + pipeline_run = await get_pipeline_run_by_dataset(dataset.id, pipeline_name) + yield PipelineRunCompleted( + pipeline_run_id=pipeline_run.pipeline_run_id, + dataset_id=dataset.id, + dataset_name=dataset.name, + ) + return diff --git a/cognee/modules/pipelines/operations/pipeline.py b/cognee/modules/pipelines/operations/pipeline.py index e52441101..e95340619 100644 --- a/cognee/modules/pipelines/operations/pipeline.py +++ b/cognee/modules/pipelines/operations/pipeline.py @@ -6,27 +6,14 @@ from cognee.shared.logging_utils import get_logger from cognee.modules.data.methods.get_dataset_data import get_dataset_data from cognee.modules.data.models import Data, Dataset from cognee.modules.pipelines.operations.run_tasks import run_tasks -from cognee.modules.pipelines.models import PipelineRunStatus from cognee.modules.pipelines.utils import generate_pipeline_id -from cognee.modules.pipelines.operations.get_pipeline_status import get_pipeline_status -from cognee.modules.pipelines.methods import get_pipeline_run_by_dataset from cognee.modules.pipelines.tasks.task import Task -from cognee.modules.users.methods import get_default_user from cognee.modules.users.models import User from cognee.modules.pipelines.operations import log_pipeline_run_initiated from cognee.context_global_variables import set_database_global_context_variables -from cognee.modules.data.exceptions import DatasetNotFoundError -from cognee.modules.data.methods import ( - get_authorized_existing_datasets, - load_or_create_datasets, - check_dataset_name, -) - -from cognee.modules.pipelines.models.PipelineRunInfo import ( - PipelineRunCompleted, - 
PipelineRunStarted, -) +from cognee.modules.pipelines.layers.authorized_user_datasets import authorized_user_datasets +from cognee.modules.pipelines.layers.pipeline_status_check import pipeline_status_check from cognee.infrastructure.databases.relational import ( create_db_and_tables as create_relational_db_and_tables, @@ -80,29 +67,9 @@ async def cognee_pipeline( await test_embedding_connection() cognee_pipeline.first_run = False # Update flag after first run - # If no user is provided use default user - if user is None: - user = await get_default_user() + user, authorized_datasets = await authorized_user_datasets(user, datasets) - # Convert datasets to list - if isinstance(datasets, str) or isinstance(datasets, UUID): - datasets = [datasets] - - # Get datasets user wants write permissions for (verify user has permissions if datasets are provided as well) - # NOTE: If a user wants to write to a dataset he does not own it must be provided through UUID - existing_datasets = await get_authorized_existing_datasets(datasets, "write", user) - - if not datasets: - # Get datasets from database if none sent. - datasets = existing_datasets - else: - # If dataset matches an existing Dataset (by name or id), reuse it. Otherwise, create a new Dataset. 
- datasets = await load_or_create_datasets(datasets, existing_datasets, user) - - if not datasets: - raise DatasetNotFoundError("There are no datasets to work with.") - - for dataset in datasets: + for dataset in authorized_datasets: async for run_info in run_pipeline( dataset=dataset, user=user, @@ -124,8 +91,6 @@ async def run_pipeline( context: dict = None, incremental_loading=False, ): - check_dataset_name(dataset.name) - # Will only be used if ENABLE_BACKEND_ACCESS_CONTROL is set to True await set_database_global_context_variables(dataset.id, dataset.owner_id) @@ -149,39 +114,11 @@ async def run_pipeline( dataset_id=dataset.id, ) - dataset_id = dataset.id - if not data: - data: list[Data] = await get_dataset_data(dataset_id=dataset_id) + data: list[Data] = await get_dataset_data(dataset_id=dataset.id) - # async with update_status_lock: TODO: Add UI lock to prevent multiple backend requests - if isinstance(dataset, Dataset): - task_status = await get_pipeline_status([dataset_id], pipeline_name) - else: - task_status = [ - PipelineRunStatus.DATASET_PROCESSING_COMPLETED - ] # TODO: this is a random assignment, find permanent solution - - if str(dataset_id) in task_status: - if task_status[str(dataset_id)] == PipelineRunStatus.DATASET_PROCESSING_STARTED: - logger.info("Dataset %s is already being processed.", dataset_id) - pipeline_run = await get_pipeline_run_by_dataset(dataset_id, pipeline_name) - yield PipelineRunStarted( - pipeline_run_id=pipeline_run.pipeline_run_id, - dataset_id=dataset.id, - dataset_name=dataset.name, - payload=data, - ) - return - elif task_status[str(dataset_id)] == PipelineRunStatus.DATASET_PROCESSING_COMPLETED: - logger.info("Dataset %s is already processed.", dataset_id) - pipeline_run = await get_pipeline_run_by_dataset(dataset_id, pipeline_name) - yield PipelineRunCompleted( - pipeline_run_id=pipeline_run.pipeline_run_id, - dataset_id=dataset.id, - dataset_name=dataset.name, - ) - return + async for pipeline_status in 
pipeline_status_check(dataset, data, pipeline_name): + yield pipeline_status if not isinstance(tasks, list): raise ValueError("Tasks must be a list") @@ -191,7 +128,7 @@ async def run_pipeline( raise ValueError(f"Task {task} is not an instance of Task") pipeline_run = run_tasks( - tasks, dataset_id, data, user, pipeline_name, context, incremental_loading + tasks, dataset.id, data, user, pipeline_name, context, incremental_loading ) async for pipeline_run_info in pipeline_run: From d4b23aa565ed47779da31c07690eb0c04c75fa3f Mon Sep 17 00:00:00 2001 From: Igor Ilic Date: Mon, 25 Aug 2025 17:16:08 +0200 Subject: [PATCH 2/5] feat: Add process_pipeline_check --- .../pipeline/models/Operation.py | 60 ------------------- .../pipeline/models/__init__.py | 0 ...tus_check.py => process_pipeline_check.py} | 27 +++++++-- .../modules/pipelines/operations/pipeline.py | 10 +++- 4 files changed, 29 insertions(+), 68 deletions(-) delete mode 100644 cognee/infrastructure/pipeline/models/Operation.py delete mode 100644 cognee/infrastructure/pipeline/models/__init__.py rename cognee/modules/pipelines/layers/{pipeline_status_check.py => process_pipeline_check.py} (67%) diff --git a/cognee/infrastructure/pipeline/models/Operation.py b/cognee/infrastructure/pipeline/models/Operation.py deleted file mode 100644 index 29a2ec043..000000000 --- a/cognee/infrastructure/pipeline/models/Operation.py +++ /dev/null @@ -1,60 +0,0 @@ -from datetime import datetime, timezone -from sqlalchemy.orm import Mapped, MappedColumn -from sqlalchemy import Column, DateTime, ForeignKey, Enum, JSON -from cognee.infrastructure.databases.relational import Base, UUID - - -class OperationType(Enum): - """ - Define various types of operations for data handling. - - Public methods: - - __str__(): Returns a string representation of the operation type. - - Instance variables: - - MERGE_DATA: Represents the merge data operation type. - - APPEND_DATA: Represents the append data operation type. 
- """ - - MERGE_DATA = "MERGE_DATA" - APPEND_DATA = "APPEND_DATA" - - -class OperationStatus(Enum): - """ - Represent the status of an operation with predefined states. - """ - - STARTED = "OPERATION_STARTED" - IN_PROGRESS = "OPERATION_IN_PROGRESS" - COMPLETE = "OPERATION_COMPLETE" - ERROR = "OPERATION_ERROR" - CANCELLED = "OPERATION_CANCELLED" - - -class Operation(Base): - """ - Represents an operation in the system, extending the Base class. - - This class defines the structure of the 'operation' table, including fields for the - operation's ID, status, type, associated data, metadata, and creation timestamp. The - public methods available in this class are inherited from the Base class. Instance - variables include: - - id: Unique identifier for the operation. - - status: The current status of the operation. - - operation_type: The type of operation being represented. - - data_id: Foreign key referencing the associated data's ID. - - meta_data: Additional metadata related to the operation. - - created_at: Timestamp for when the operation was created. 
- """ - - __tablename__ = "operation" - - id = Column(UUID, primary_key=True) - status = Column(Enum(OperationStatus)) - operation_type = Column(Enum(OperationType)) - - data_id = Column(UUID, ForeignKey("data.id")) - meta_data: Mapped[dict] = MappedColumn(type_=JSON) - - created_at = Column(DateTime, default=datetime.now(timezone.utc)) diff --git a/cognee/infrastructure/pipeline/models/__init__.py b/cognee/infrastructure/pipeline/models/__init__.py deleted file mode 100644 index e69de29bb..000000000 diff --git a/cognee/modules/pipelines/layers/pipeline_status_check.py b/cognee/modules/pipelines/layers/process_pipeline_check.py similarity index 67% rename from cognee/modules/pipelines/layers/pipeline_status_check.py rename to cognee/modules/pipelines/layers/process_pipeline_check.py index ac8abc0df..12cbe07aa 100644 --- a/cognee/modules/pipelines/layers/pipeline_status_check.py +++ b/cognee/modules/pipelines/layers/process_pipeline_check.py @@ -1,4 +1,5 @@ from cognee.modules.data.models import Dataset +from cognee.modules.data.models import Data from cognee.modules.pipelines.models import PipelineRunStatus from cognee.modules.pipelines.operations.get_pipeline_status import get_pipeline_status from cognee.modules.pipelines.methods import get_pipeline_run_by_dataset @@ -12,7 +13,23 @@ from cognee.modules.pipelines.models.PipelineRunInfo import ( logger = get_logger(__name__) -async def pipeline_status_check(dataset, data, pipeline_name): +async def process_pipeline_check( + dataset: Dataset, data: list[Data], pipeline_name: str +) -> [None, PipelineRunStarted, PipelineRunCompleted]: + """ + Function used to determine if pipeline is currently being processed or was already processed. + In case pipeline was or is being processed return value is returned and current pipline execution should be stopped. + In case pipeline is not or was not processed there will be no return value and pipeline processing can start. 
+ + Args: + dataset: Dataset object + data: List of Data + pipeline_name: pipeline name + + Returns: Pipeline state if it is being processed or was already processed + + """ + # async with update_status_lock: TODO: Add UI lock to prevent multiple backend requests if isinstance(dataset, Dataset): task_status = await get_pipeline_status([dataset.id], pipeline_name) @@ -25,19 +42,19 @@ async def pipeline_status_check(dataset, data, pipeline_name): if task_status[str(dataset.id)] == PipelineRunStatus.DATASET_PROCESSING_STARTED: logger.info("Dataset %s is already being processed.", dataset.id) pipeline_run = await get_pipeline_run_by_dataset(dataset.id, pipeline_name) - yield PipelineRunStarted( + return PipelineRunStarted( pipeline_run_id=pipeline_run.pipeline_run_id, dataset_id=dataset.id, dataset_name=dataset.name, payload=data, ) - return elif task_status[str(dataset.id)] == PipelineRunStatus.DATASET_PROCESSING_COMPLETED: logger.info("Dataset %s is already processed.", dataset.id) pipeline_run = await get_pipeline_run_by_dataset(dataset.id, pipeline_name) - yield PipelineRunCompleted( + return PipelineRunCompleted( pipeline_run_id=pipeline_run.pipeline_run_id, dataset_id=dataset.id, dataset_name=dataset.name, ) - return + + return diff --git a/cognee/modules/pipelines/operations/pipeline.py b/cognee/modules/pipelines/operations/pipeline.py index e95340619..569df8501 100644 --- a/cognee/modules/pipelines/operations/pipeline.py +++ b/cognee/modules/pipelines/operations/pipeline.py @@ -13,7 +13,7 @@ from cognee.modules.users.models import User from cognee.modules.pipelines.operations import log_pipeline_run_initiated from cognee.context_global_variables import set_database_global_context_variables from cognee.modules.pipelines.layers.authorized_user_datasets import authorized_user_datasets -from cognee.modules.pipelines.layers.pipeline_status_check import pipeline_status_check +from cognee.modules.pipelines.layers.process_pipeline_check import process_pipeline_check 
from cognee.infrastructure.databases.relational import ( create_db_and_tables as create_relational_db_and_tables, @@ -117,8 +117,12 @@ async def run_pipeline( if not data: data: list[Data] = await get_dataset_data(dataset_id=dataset.id) - async for pipeline_status in pipeline_status_check(dataset, data, pipeline_name): - yield pipeline_status + process_pipeline_status = await process_pipeline_check(dataset, data, pipeline_name) + if process_pipeline_status: + # If pipeline was already processed or is currently being processed + # return status information to async generator and finish execution + yield process_pipeline_status + return if not isinstance(tasks, list): raise ValueError("Tasks must be a list") From 9a4e8ddc4d885401c11a65da69119b31e8f1733b Mon Sep 17 00:00:00 2001 From: Igor Ilic Date: Mon, 25 Aug 2025 17:22:37 +0200 Subject: [PATCH 3/5] refactor: Make the authorized_user_datasets function more understandable --- .../layers/authorized_user_datasets.py | 19 +++++++++++++++++-- .../modules/pipelines/operations/pipeline.py | 2 +- 2 files changed, 18 insertions(+), 3 deletions(-) diff --git a/cognee/modules/pipelines/layers/authorized_user_datasets.py b/cognee/modules/pipelines/layers/authorized_user_datasets.py index f0ba013ec..2b4117208 100644 --- a/cognee/modules/pipelines/layers/authorized_user_datasets.py +++ b/cognee/modules/pipelines/layers/authorized_user_datasets.py @@ -1,8 +1,9 @@ from uuid import UUID -from typing import Union +from typing import Union, Tuple, List from cognee.modules.users.methods import get_default_user from cognee.modules.users.models import User +from cognee.modules.data.models import Dataset from cognee.modules.data.exceptions import DatasetNotFoundError from cognee.modules.data.methods import ( get_authorized_existing_datasets, @@ -11,7 +12,21 @@ from cognee.modules.data.methods import ( ) -async def authorized_user_datasets(user: User, datasets: Union[str, list[str], list[UUID]]): +async def authorized_user_datasets( + 
datasets: Union[str, list[str], list[UUID]], user: User = None +) -> Tuple[User, List[Dataset]]: + """ + Function handles creation and dataset authorization if datasets already exist for Cognee. + Verifies that provided user has necessary permission for provided Dataset. + If Dataset does not exist creates the Dataset and gives permission for the user creating the dataset. + + Args: + datasets: Dataset names or Dataset UUIDs (in case Datasets already exist). + user: Cognee User the request is being processed for; if None the default user will be used. + + Returns: + Tuple of the resolved User and the list of authorized Dataset objects. + """ # If no user is provided use default user if user is None: user = await get_default_user() diff --git a/cognee/modules/pipelines/operations/pipeline.py b/cognee/modules/pipelines/operations/pipeline.py index 569df8501..7a520f4a4 100644 --- a/cognee/modules/pipelines/operations/pipeline.py +++ b/cognee/modules/pipelines/operations/pipeline.py @@ -67,7 +67,7 @@ async def cognee_pipeline( await test_embedding_connection() cognee_pipeline.first_run = False # Update flag after first run - user, authorized_datasets = await authorized_user_datasets(user, datasets) + user, authorized_datasets = await authorized_user_datasets(datasets, user) for dataset in authorized_datasets: async for run_info in run_pipeline( From 950d29a678f6140322aa2f7e436112865ff134f9 Mon Sep 17 00:00:00 2001 From: Igor Ilic Date: Tue, 26 Aug 2025 14:04:33 +0200 Subject: [PATCH 4/5] refactor: Update typing --- .../modules/pipelines/layers/authorized_user_datasets.py | 2 +- cognee/modules/pipelines/layers/process_pipeline_check.py | 7 +++---- 2 files changed, 4 insertions(+), 5 deletions(-) diff --git a/cognee/modules/pipelines/layers/authorized_user_datasets.py b/cognee/modules/pipelines/layers/authorized_user_datasets.py index 2b4117208..fd9c400ca 100644 --- a/cognee/modules/pipelines/layers/authorized_user_datasets.py +++ b/cognee/modules/pipelines/layers/authorized_user_datasets.py @@ -13,7 +13,7 @@ from cognee.modules.data.methods import (
async def authorized_user_datasets( - datasets: Union[str, list[str], list[UUID]], user: User = None + datasets: Union[str, UUID, list[str], list[UUID]], user: User = None ) -> Tuple[User, List[Dataset]]: """ Function handles creation and dataset authorization if datasets already exist for Cognee. diff --git a/cognee/modules/pipelines/layers/process_pipeline_check.py b/cognee/modules/pipelines/layers/process_pipeline_check.py index 12cbe07aa..8318bb50e 100644 --- a/cognee/modules/pipelines/layers/process_pipeline_check.py +++ b/cognee/modules/pipelines/layers/process_pipeline_check.py @@ -1,3 +1,4 @@ +from typing import Union, Optional from cognee.modules.data.models import Dataset from cognee.modules.data.models import Data from cognee.modules.pipelines.models import PipelineRunStatus @@ -15,7 +16,7 @@ logger = get_logger(__name__) async def process_pipeline_check( dataset: Dataset, data: list[Data], pipeline_name: str -) -> [None, PipelineRunStarted, PipelineRunCompleted]: +) -> Optional[Union[PipelineRunStarted, PipelineRunCompleted]]: """ Function used to determine if pipeline is currently being processed or was already processed. In case pipeline was or is being processed return value is returned and current pipline execution should be stopped. 
@@ -34,9 +35,7 @@ async def process_pipeline_check( if isinstance(dataset, Dataset): task_status = await get_pipeline_status([dataset.id], pipeline_name) else: - task_status = [ - PipelineRunStatus.DATASET_PROCESSING_COMPLETED - ] # TODO: this is a random assignment, find permanent solution + task_status = {} if str(dataset.id) in task_status: if task_status[str(dataset.id)] == PipelineRunStatus.DATASET_PROCESSING_STARTED: From 5ea3056564d92142f1b7469b6c3f11b1704a5974 Mon Sep 17 00:00:00 2001 From: Igor Ilic Date: Tue, 26 Aug 2025 15:58:53 +0200 Subject: [PATCH 5/5] refactor: Rename auth layer --- ...user_datasets.py => resolve_authorized_user_datasets.py} | 2 +- cognee/modules/pipelines/operations/pipeline.py | 6 ++++-- 2 files changed, 5 insertions(+), 3 deletions(-) rename cognee/modules/pipelines/layers/{authorized_user_datasets.py => resolve_authorized_user_datasets.py} (97%) diff --git a/cognee/modules/pipelines/layers/authorized_user_datasets.py b/cognee/modules/pipelines/layers/resolve_authorized_user_datasets.py similarity index 97% rename from cognee/modules/pipelines/layers/authorized_user_datasets.py rename to cognee/modules/pipelines/layers/resolve_authorized_user_datasets.py index fd9c400ca..4f6fb8254 100644 --- a/cognee/modules/pipelines/layers/authorized_user_datasets.py +++ b/cognee/modules/pipelines/layers/resolve_authorized_user_datasets.py @@ -12,7 +12,7 @@ from cognee.modules.data.methods import ( ) -async def authorized_user_datasets( +async def resolve_authorized_user_datasets( datasets: Union[str, UUID, list[str], list[UUID]], user: User = None ) -> Tuple[User, List[Dataset]]: """ diff --git a/cognee/modules/pipelines/operations/pipeline.py b/cognee/modules/pipelines/operations/pipeline.py index 7a520f4a4..770312c50 100644 --- a/cognee/modules/pipelines/operations/pipeline.py +++ b/cognee/modules/pipelines/operations/pipeline.py @@ -12,7 +12,9 @@ from cognee.modules.pipelines.tasks.task import Task from cognee.modules.users.models import 
User from cognee.modules.pipelines.operations import log_pipeline_run_initiated from cognee.context_global_variables import set_database_global_context_variables -from cognee.modules.pipelines.layers.authorized_user_datasets import authorized_user_datasets +from cognee.modules.pipelines.layers.resolve_authorized_user_datasets import ( + resolve_authorized_user_datasets, +) from cognee.modules.pipelines.layers.process_pipeline_check import process_pipeline_check from cognee.infrastructure.databases.relational import ( @@ -67,7 +69,7 @@ async def cognee_pipeline( await test_embedding_connection() cognee_pipeline.first_run = False # Update flag after first run - user, authorized_datasets = await authorized_user_datasets(datasets, user) + user, authorized_datasets = await resolve_authorized_user_datasets(datasets, user) for dataset in authorized_datasets: async for run_info in run_pipeline(