diff --git a/cognee/infrastructure/pipeline/models/Operation.py b/cognee/infrastructure/pipeline/models/Operation.py deleted file mode 100644 index 29a2ec043..000000000 --- a/cognee/infrastructure/pipeline/models/Operation.py +++ /dev/null @@ -1,60 +0,0 @@ -from datetime import datetime, timezone -from sqlalchemy.orm import Mapped, MappedColumn -from sqlalchemy import Column, DateTime, ForeignKey, Enum, JSON -from cognee.infrastructure.databases.relational import Base, UUID - - -class OperationType(Enum): - """ - Define various types of operations for data handling. - - Public methods: - - __str__(): Returns a string representation of the operation type. - - Instance variables: - - MERGE_DATA: Represents the merge data operation type. - - APPEND_DATA: Represents the append data operation type. - """ - - MERGE_DATA = "MERGE_DATA" - APPEND_DATA = "APPEND_DATA" - - -class OperationStatus(Enum): - """ - Represent the status of an operation with predefined states. - """ - - STARTED = "OPERATION_STARTED" - IN_PROGRESS = "OPERATION_IN_PROGRESS" - COMPLETE = "OPERATION_COMPLETE" - ERROR = "OPERATION_ERROR" - CANCELLED = "OPERATION_CANCELLED" - - -class Operation(Base): - """ - Represents an operation in the system, extending the Base class. - - This class defines the structure of the 'operation' table, including fields for the - operation's ID, status, type, associated data, metadata, and creation timestamp. The - public methods available in this class are inherited from the Base class. Instance - variables include: - - id: Unique identifier for the operation. - - status: The current status of the operation. - - operation_type: The type of operation being represented. - - data_id: Foreign key referencing the associated data's ID. - - meta_data: Additional metadata related to the operation. - - created_at: Timestamp for when the operation was created. - """ - - __tablename__ = "operation" - - id = Column(UUID, primary_key=True) - status = Column(Enum(OperationStatus)) - operation_type = Column(Enum(OperationType)) - - data_id = Column(UUID, ForeignKey("data.id")) - meta_data: Mapped[dict] = MappedColumn(type_=JSON) - - created_at = Column(DateTime, default=datetime.now(timezone.utc)) diff --git a/cognee/infrastructure/pipeline/models/__init__.py b/cognee/infrastructure/pipeline/models/__init__.py deleted file mode 100644 index e69de29bb..000000000 diff --git a/cognee/modules/pipelines/layers/pipeline_status_check.py b/cognee/modules/pipelines/layers/process_pipeline_check.py similarity index 67% rename from cognee/modules/pipelines/layers/pipeline_status_check.py rename to cognee/modules/pipelines/layers/process_pipeline_check.py index ac8abc0df..12cbe07aa 100644 --- a/cognee/modules/pipelines/layers/pipeline_status_check.py +++ b/cognee/modules/pipelines/layers/process_pipeline_check.py @@ -1,4 +1,5 @@ from cognee.modules.data.models import Dataset +from cognee.modules.data.models import Data from cognee.modules.pipelines.models import PipelineRunStatus from cognee.modules.pipelines.operations.get_pipeline_status import get_pipeline_status from cognee.modules.pipelines.methods import get_pipeline_run_by_dataset @@ -12,7 +13,23 @@ from cognee.modules.pipelines.models.PipelineRunInfo import ( logger = get_logger(__name__) -async def pipeline_status_check(dataset, data, pipeline_name): +async def process_pipeline_check( + dataset: Dataset, data: list[Data], pipeline_name: str +) -> [None, PipelineRunStarted, PipelineRunCompleted]: + """ + Function used to determine if pipeline is currently being processed or was already processed. + In case pipeline was or is being processed return value is returned and current pipline execution should be stopped. + In case pipeline is not or was not processed there will be no return value and pipeline processing can start. + + Args: + dataset: Dataset object + data: List of Data + pipeline_name: pipeline name + + Returns: Pipeline state if it is being processed or was already processed + + """ + # async with update_status_lock: TODO: Add UI lock to prevent multiple backend requests if isinstance(dataset, Dataset): task_status = await get_pipeline_status([dataset.id], pipeline_name) @@ -25,19 +42,19 @@ async def pipeline_status_check(dataset, data, pipeline_name): if task_status[str(dataset.id)] == PipelineRunStatus.DATASET_PROCESSING_STARTED: logger.info("Dataset %s is already being processed.", dataset.id) pipeline_run = await get_pipeline_run_by_dataset(dataset.id, pipeline_name) - yield PipelineRunStarted( + return PipelineRunStarted( pipeline_run_id=pipeline_run.pipeline_run_id, dataset_id=dataset.id, dataset_name=dataset.name, payload=data, ) - return elif task_status[str(dataset.id)] == PipelineRunStatus.DATASET_PROCESSING_COMPLETED: logger.info("Dataset %s is already processed.", dataset.id) pipeline_run = await get_pipeline_run_by_dataset(dataset.id, pipeline_name) - yield PipelineRunCompleted( + return PipelineRunCompleted( pipeline_run_id=pipeline_run.pipeline_run_id, dataset_id=dataset.id, dataset_name=dataset.name, ) - return + + return diff --git a/cognee/modules/pipelines/operations/pipeline.py b/cognee/modules/pipelines/operations/pipeline.py index e95340619..569df8501 100644 --- a/cognee/modules/pipelines/operations/pipeline.py +++ b/cognee/modules/pipelines/operations/pipeline.py @@ -13,7 +13,7 @@ from cognee.modules.users.models import User from cognee.modules.pipelines.operations import log_pipeline_run_initiated from cognee.context_global_variables import set_database_global_context_variables from cognee.modules.pipelines.layers.authorized_user_datasets import authorized_user_datasets -from cognee.modules.pipelines.layers.pipeline_status_check import pipeline_status_check +from cognee.modules.pipelines.layers.process_pipeline_check import process_pipeline_check from cognee.infrastructure.databases.relational import ( create_db_and_tables as create_relational_db_and_tables, @@ -117,8 +117,12 @@ async def run_pipeline( if not data: data: list[Data] = await get_dataset_data(dataset_id=dataset.id) - async for pipeline_status in pipeline_status_check(dataset, data, pipeline_name): - yield pipeline_status + process_pipeline_status = await process_pipeline_check(dataset, data, pipeline_name) + if process_pipeline_status: + # If pipeline was already processed or is currently being processed + # return status information to async generator and finish execution + yield process_pipeline_status + return if not isinstance(tasks, list): raise ValueError("Tasks must be a list")