feat: Add process_pipeline_check

commit d4b23aa565
parent 6d438c8800
Author: Igor Ilic
Date:   2025-08-25 17:16:08 +02:00

4 changed files with 29 additions and 68 deletions

@@ -1,60 +0,0 @@
from datetime import datetime, timezone
from enum import Enum
from sqlalchemy.orm import Mapped, MappedColumn
from sqlalchemy import Column, DateTime, ForeignKey, Enum as SQLEnum, JSON
from cognee.infrastructure.databases.relational import Base, UUID


class OperationType(Enum):
    """
    Define various types of operations for data handling.

    Instance variables:
    - MERGE_DATA: Represents the merge data operation type.
    - APPEND_DATA: Represents the append data operation type.
    """

    MERGE_DATA = "MERGE_DATA"
    APPEND_DATA = "APPEND_DATA"


class OperationStatus(Enum):
    """
    Represent the status of an operation with predefined states.
    """

    STARTED = "OPERATION_STARTED"
    IN_PROGRESS = "OPERATION_IN_PROGRESS"
    COMPLETE = "OPERATION_COMPLETE"
    ERROR = "OPERATION_ERROR"
    CANCELLED = "OPERATION_CANCELLED"


class Operation(Base):
    """
    Represents an operation in the system, extending the Base class.

    This class defines the structure of the 'operation' table, including fields for the
    operation's ID, status, type, associated data, metadata, and creation timestamp. The
    public methods available in this class are inherited from the Base class. Instance
    variables include:

    - id: Unique identifier for the operation.
    - status: The current status of the operation.
    - operation_type: The type of operation being represented.
    - data_id: Foreign key referencing the associated data's ID.
    - meta_data: Additional metadata related to the operation.
    - created_at: Timestamp for when the operation was created.
    """

    __tablename__ = "operation"

    id = Column(UUID, primary_key=True)
    status = Column(SQLEnum(OperationStatus))
    operation_type = Column(SQLEnum(OperationType))
    data_id = Column(UUID, ForeignKey("data.id"))
    meta_data: Mapped[dict] = MappedColumn(type_=JSON)
    created_at = Column(DateTime, default=lambda: datetime.now(timezone.utc))
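
For context on the removed model, a minimal usage sketch. This is not from the commit: record_append_operation is a hypothetical helper and async_session is an assumed SQLAlchemy async_sessionmaker; it only illustrates how a row of the table above would have been created.

import uuid

async def record_append_operation(data_id, async_session):
    # Hypothetical helper; `async_session` is an assumed async_sessionmaker.
    async with async_session() as session:
        operation = Operation(
            id=uuid.uuid4(),
            status=OperationStatus.STARTED,
            operation_type=OperationType.APPEND_DATA,
            data_id=data_id,
            meta_data={"trigger": "example"},
        )
        session.add(operation)
        await session.commit()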

@@ -1,4 +1,5 @@
from cognee.modules.data.models import Dataset
from cognee.modules.data.models import Data
from cognee.modules.pipelines.models import PipelineRunStatus
from cognee.modules.pipelines.operations.get_pipeline_status import get_pipeline_status
from cognee.modules.pipelines.methods import get_pipeline_run_by_dataset
@@ -12,7 +13,23 @@ from cognee.modules.pipelines.models.PipelineRunInfo import (
logger = get_logger(__name__)
async def pipeline_status_check(dataset, data, pipeline_name):
async def process_pipeline_check(
    dataset: Dataset, data: list[Data], pipeline_name: str
) -> PipelineRunStarted | PipelineRunCompleted | None:
    """
    Determine whether the pipeline is currently being processed or was already processed.

    If the pipeline was or is being processed, a value is returned and the current pipeline
    execution should be stopped. If the pipeline is not and was not processed, there is no
    return value and pipeline processing can start.

    Args:
        dataset: Dataset object
        data: List of Data objects
        pipeline_name: Name of the pipeline

    Returns: Pipeline run state if the pipeline is being processed or was already processed.
    """
    # async with update_status_lock: TODO: Add UI lock to prevent multiple backend requests
    if isinstance(dataset, Dataset):
        task_status = await get_pipeline_status([dataset.id], pipeline_name)
@@ -25,19 +42,19 @@ async def pipeline_status_check(dataset, data, pipeline_name):
        if task_status[str(dataset.id)] == PipelineRunStatus.DATASET_PROCESSING_STARTED:
            logger.info("Dataset %s is already being processed.", dataset.id)
            pipeline_run = await get_pipeline_run_by_dataset(dataset.id, pipeline_name)
            yield PipelineRunStarted(
            return PipelineRunStarted(
                pipeline_run_id=pipeline_run.pipeline_run_id,
                dataset_id=dataset.id,
                dataset_name=dataset.name,
                payload=data,
            )
            return
        elif task_status[str(dataset.id)] == PipelineRunStatus.DATASET_PROCESSING_COMPLETED:
            logger.info("Dataset %s is already processed.", dataset.id)
            pipeline_run = await get_pipeline_run_by_dataset(dataset.id, pipeline_name)
            yield PipelineRunCompleted(
            return PipelineRunCompleted(
                pipeline_run_id=pipeline_run.pipeline_run_id,
                dataset_id=dataset.id,
                dataset_name=dataset.name,
            )
            return
    return
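
Because the check is now a plain coroutine instead of an async generator, callers branch on a single awaited result. A minimal sketch of handling the three possible outcomes; the handle_in_progress, handle_completed, and start_processing helpers are hypothetical, not part of this commit.

async def dispatch_on_pipeline_state(dataset, data, pipeline_name):
    result = await process_pipeline_check(dataset, data, pipeline_name)
    if isinstance(result, PipelineRunStarted):
        handle_in_progress(result)  # hypothetical: surface the in-flight run
    elif isinstance(result, PipelineRunCompleted):
        handle_completed(result)  # hypothetical: surface the finished run
    else:
        start_processing(dataset, data)  # hypothetical: result is None, safe to start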

@@ -13,7 +13,7 @@ from cognee.modules.users.models import User
from cognee.modules.pipelines.operations import log_pipeline_run_initiated
from cognee.context_global_variables import set_database_global_context_variables
from cognee.modules.pipelines.layers.authorized_user_datasets import authorized_user_datasets
from cognee.modules.pipelines.layers.pipeline_status_check import pipeline_status_check
from cognee.modules.pipelines.layers.process_pipeline_check import process_pipeline_check
from cognee.infrastructure.databases.relational import (
    create_db_and_tables as create_relational_db_and_tables,
@@ -117,8 +117,12 @@ async def run_pipeline(
    if not data:
        data: list[Data] = await get_dataset_data(dataset_id=dataset.id)

    async for pipeline_status in pipeline_status_check(dataset, data, pipeline_name):
        yield pipeline_status
    process_pipeline_status = await process_pipeline_check(dataset, data, pipeline_name)
    if process_pipeline_status:
        # If the pipeline was already processed or is currently being processed,
        # return the status information to the async generator and finish execution.
        yield process_pipeline_status
        return

    if not isinstance(tasks, list):
        raise ValueError("Tasks must be a list")
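
After this change, run_pipeline still presents itself to callers as an async generator; on an early exit it yields exactly one status object and returns. A minimal consuming sketch, written against the already-constructed generator since run_pipeline's full signature is not shown in this diff:

from typing import Any, AsyncIterator

async def consume_pipeline(run: AsyncIterator[Any]) -> None:
    async for run_info in run:
        # On an early exit this loop sees a single PipelineRunStarted or
        # PipelineRunCompleted from process_pipeline_check; otherwise it
        # streams the regular run updates produced by the pipeline itself.
        print(type(run_info).__name__)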