diff --git a/cognee/api/v1/cognify/code_graph_pipeline.py b/cognee/api/v1/cognify/code_graph_pipeline.py
index a3dabe1e2..00a0d3dc9 100644
--- a/cognee/api/v1/cognify/code_graph_pipeline.py
+++ b/cognee/api/v1/cognify/code_graph_pipeline.py
@@ -1,7 +1,6 @@
 import os
 import pathlib
 import asyncio
-from uuid import NAMESPACE_OID, uuid5
 
 from cognee.shared.logging_utils import get_logger, setup_logging
 from cognee.modules.observability.get_observe import get_observe
@@ -12,8 +11,8 @@ from cognee.modules.pipelines import run_tasks
 from cognee.modules.pipelines.tasks.task import Task
 from cognee.modules.users.methods import get_default_user
 from cognee.shared.data_models import KnowledgeGraph
+from cognee.modules.data.methods import create_dataset
 from cognee.tasks.documents import classify_documents, extract_chunks_from_documents
-from cognee.modules.data.methods.get_unique_dataset_id import get_unique_dataset_id
 from cognee.tasks.graph import extract_graph_from_data
 from cognee.tasks.ingestion import ingest_data
 from cognee.tasks.repo_processor import get_non_py_files, get_repo_file_dependencies
@@ -21,6 +20,7 @@ from cognee.tasks.repo_processor import get_non_py_files, get_repo_file_dependen
 from cognee.tasks.storage import add_data_points
 from cognee.tasks.summarization import summarize_text
 from cognee.infrastructure.llm import get_max_chunk_tokens
+from cognee.infrastructure.databases.relational import get_relational_engine
 
 observe = get_observe()
@@ -65,16 +65,21 @@ async def run_code_graph_pipeline(repo_path, include_docs=False):
         ),
     ]
 
-    dataset_id = await get_unique_dataset_id("codebase", user)
+    dataset_name = "codebase"
+
+    # Save dataset to database
+    db_engine = get_relational_engine()
+    async with db_engine.get_async_session() as session:
+        dataset = await create_dataset(dataset_name, user, session)
 
     if include_docs:
         non_code_pipeline_run = run_tasks(
-            non_code_tasks, dataset_id, repo_path, user, "cognify_pipeline"
+            non_code_tasks, dataset.id, repo_path, user, "cognify_pipeline"
         )
         async for run_status in non_code_pipeline_run:
             yield run_status
 
-    async for run_status in run_tasks(tasks, dataset_id, repo_path, user, "cognify_code_pipeline"):
+    async for run_status in run_tasks(tasks, dataset.id, repo_path, user, "cognify_code_pipeline"):
         yield run_status
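The code-graph pipeline now persists its `codebase` dataset up front and threads the resulting `dataset.id` through both task runs. For orientation, a minimal consumption sketch (not part of the diff; the repo path is a placeholder):

```python
# Sketch only: drive run_code_graph_pipeline end to end and print run statuses.
# "./my_repo" is a placeholder path, not something this PR ships.
import asyncio

from cognee.api.v1.cognify.code_graph_pipeline import run_code_graph_pipeline


async def main():
    # The pipeline is an async generator; with this PR every yielded
    # PipelineRunInfo also carries dataset_id and dataset_name.
    async for run_info in run_code_graph_pipeline("./my_repo", include_docs=False):
        print(run_info.status, run_info.dataset_name, run_info.pipeline_run_id)


asyncio.run(main())
```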
diff --git a/cognee/api/v1/cognify/cognify.py b/cognee/api/v1/cognify/cognify.py
index 0674c837e..a3cf645d3 100644
--- a/cognee/api/v1/cognify/cognify.py
+++ b/cognee/api/v1/cognify/cognify.py
@@ -10,7 +10,7 @@ from cognee.modules.pipelines import cognee_pipeline
 from cognee.modules.pipelines.tasks.task import Task
 from cognee.modules.chunking.TextChunker import TextChunker
 from cognee.modules.ontology.rdf_xml.OntologyResolver import OntologyResolver
-from cognee.modules.pipelines.models.PipelineRunInfo import PipelineRunCompleted
+from cognee.modules.pipelines.models.PipelineRunInfo import PipelineRunCompleted, PipelineRunErrored
 from cognee.modules.pipelines.queues.pipeline_run_info_queues import push_to_queue
 from cognee.modules.users.models import User
@@ -66,7 +66,7 @@ async def run_cognify_blocking(
     graph_db_config: dict = None,
     vector_db_config: dict = False,
 ):
-    pipeline_run_info = None
+    total_run_info = {}
 
     async for run_info in cognee_pipeline(
         tasks=tasks,
@@ -76,9 +76,12 @@
         graph_db_config=graph_db_config,
         vector_db_config=vector_db_config,
     ):
-        pipeline_run_info = run_info
+        if run_info.dataset_id:
+            total_run_info[run_info.dataset_id] = run_info
+        else:
+            total_run_info = run_info
 
-    return pipeline_run_info
+    return total_run_info
 
 
 async def run_cognify_as_background_process(
@@ -88,30 +91,43 @@
     graph_db_config: dict = None,
     vector_db_config: dict = False,
 ):
-    pipeline_run = cognee_pipeline(
-        tasks=tasks,
-        user=user,
-        datasets=datasets,
-        pipeline_name="cognify_pipeline",
-        graph_db_config=graph_db_config,
-        vector_db_config=vector_db_config,
-    )
+    # Store pipeline status for all pipelines
+    pipeline_run_started_info = []
 
-    pipeline_run_started_info = await anext(pipeline_run)
+    async def handle_rest_of_the_run(pipeline_list):
+        # Execute all provided pipelines one by one to avoid database write conflicts
+        for pipeline in pipeline_list:
+            while True:
+                try:
+                    pipeline_run_info = await anext(pipeline)
 
-    async def handle_rest_of_the_run():
-        while True:
-            try:
-                pipeline_run_info = await anext(pipeline_run)
+                    push_to_queue(pipeline_run_info.pipeline_run_id, pipeline_run_info)
 
-                push_to_queue(pipeline_run_info.pipeline_run_id, pipeline_run_info)
-
-                if isinstance(pipeline_run_info, PipelineRunCompleted):
+                    if isinstance(pipeline_run_info, PipelineRunCompleted) or isinstance(
+                        pipeline_run_info, PipelineRunErrored
+                    ):
+                        break
+                except StopAsyncIteration:
                     break
-            except StopAsyncIteration:
-                break
 
-    asyncio.create_task(handle_rest_of_the_run())
+    # Start all pipelines to get started status
+    pipeline_list = []
+    for dataset in datasets:
+        pipeline_run = cognee_pipeline(
+            tasks=tasks,
+            user=user,
+            datasets=dataset,
+            pipeline_name="cognify_pipeline",
+            graph_db_config=graph_db_config,
+            vector_db_config=vector_db_config,
+        )
+
+        # Save dataset Pipeline run started info
+        pipeline_run_started_info.append(await anext(pipeline_run))
+        pipeline_list.append(pipeline_run)
+
+    # Send all started pipelines to execute one by one in background
+    asyncio.create_task(handle_rest_of_the_run(pipeline_list=pipeline_list))
 
     return pipeline_run_started_info
diff --git a/cognee/api/v1/cognify/routers/get_cognify_router.py b/cognee/api/v1/cognify/routers/get_cognify_router.py
index 06d9ee38e..c4c3d9525 100644
--- a/cognee/api/v1/cognify/routers/get_cognify_router.py
+++ b/cognee/api/v1/cognify/routers/get_cognify_router.py
@@ -39,7 +39,7 @@ class CognifyPayloadDTO(InDTO):
 def get_cognify_router() -> APIRouter:
     router = APIRouter()
 
-    @router.post("", response_model=None)
+    @router.post("", response_model=dict)
     async def cognify(payload: CognifyPayloadDTO, user: User = Depends(get_authenticated_user)):
         """This endpoint is responsible for the cognitive processing of the content."""
         if not payload.datasets and not payload.dataset_ids:
@@ -56,7 +56,7 @@ def get_cognify_router() -> APIRouter:
                 datasets, user, payload.graph_model, run_in_background=payload.run_in_background
             )
 
-            return cognify_run.model_dump()
+            return cognify_run
         except Exception as error:
             return JSONResponse(status_code=409, content={"error": str(error)})
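With one `cognee_pipeline` generator per dataset, background mode now returns a list of `PipelineRunStarted` objects immediately and drains the runs afterward. A hedged sketch of the new calling contract (the `tasks`, `user`, and `dataset_ids` arguments are placeholders, not code from this PR):

```python
# Sketch of the background-mode contract assumed by this PR: callers get one
# PipelineRunStarted per dataset right away, while handle_rest_of_the_run()
# finishes the actual processing inside a single asyncio task.
from cognee.api.v1.cognify.cognify import run_cognify_as_background_process


async def start_runs(tasks, user, dataset_ids):
    started = await run_cognify_as_background_process(tasks, user, dataset_ids)
    for run_info in started:
        # Each entry can be polled later via its pipeline_run_id.
        print(run_info.dataset_name, "->", run_info.pipeline_run_id)
    return started
```

Running the datasets strictly one after another in a single background task is what avoids the database write conflicts the comment mentions; the trade-off is that datasets no longer process concurrently.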
diff --git a/cognee/modules/data/methods/load_or_create_datasets.py b/cognee/modules/data/methods/load_or_create_datasets.py
index ff2c2eb65..c2dfc201d 100644
--- a/cognee/modules/data/methods/load_or_create_datasets.py
+++ b/cognee/modules/data/methods/load_or_create_datasets.py
@@ -1,7 +1,9 @@
 from typing import List, Union
 from uuid import UUID
 
+from cognee.infrastructure.databases.relational import get_relational_engine
 from cognee.modules.data.models import Dataset
+from cognee.modules.data.methods import create_dataset
 from cognee.modules.data.methods import get_unique_dataset_id
 from cognee.modules.data.exceptions import DatasetNotFoundError
 
@@ -12,7 +14,7 @@ async def load_or_create_datasets(
     """
     Given a list of dataset identifiers (names or UUIDs), return Dataset instances:
     - If an identifier matches an existing Dataset (by name or id), reuse it.
-    - Otherwise, create a new Dataset with a unique id. Note: Created dataset is not stored to database.
+    - Otherwise, create a new Dataset with a unique id.
     """
     result: List[Dataset] = []
@@ -37,6 +39,12 @@
                 name=identifier,
                 owner_id=user.id,
             )
+
+            # Save dataset to database
+            db_engine = get_relational_engine()
+            async with db_engine.get_async_session() as session:
+                await create_dataset(identifier, user, session)
+
             result.append(new_dataset)
 
     return result
diff --git a/cognee/modules/pipelines/methods/__init__.py b/cognee/modules/pipelines/methods/__init__.py
index 55af20aea..c15b58207 100644
--- a/cognee/modules/pipelines/methods/__init__.py
+++ b/cognee/modules/pipelines/methods/__init__.py
@@ -1 +1,2 @@
 from .get_pipeline_run import get_pipeline_run
+from .get_pipeline_run_by_dataset import get_pipeline_run_by_dataset
diff --git a/cognee/modules/pipelines/methods/get_pipeline_run_by_dataset.py b/cognee/modules/pipelines/methods/get_pipeline_run_by_dataset.py
new file mode 100644
index 000000000..303d686e5
--- /dev/null
+++ b/cognee/modules/pipelines/methods/get_pipeline_run_by_dataset.py
@@ -0,0 +1,33 @@
+from uuid import UUID
+from sqlalchemy import select, func
+from cognee.infrastructure.databases.relational import get_relational_engine
+from ..models import PipelineRun
+from sqlalchemy.orm import aliased
+
+
+async def get_pipeline_run_by_dataset(dataset_id: UUID, pipeline_name: str):
+    db_engine = get_relational_engine()
+
+    async with db_engine.get_async_session() as session:
+        query = (
+            select(
+                PipelineRun,
+                func.row_number()
+                .over(
+                    partition_by=PipelineRun.dataset_id,
+                    order_by=PipelineRun.created_at.desc(),
+                )
+                .label("rn"),
+            )
+            .filter(PipelineRun.dataset_id == dataset_id)
+            .filter(PipelineRun.pipeline_name == pipeline_name)
+            .subquery()
+        )
+
+        aliased_pipeline_run = aliased(PipelineRun, query)
+
+        latest_run = select(aliased_pipeline_run).filter(query.c.rn == 1)
+
+        run = (await session.execute(latest_run)).scalars().first()
+
+        return run
diff --git a/cognee/modules/pipelines/models/PipelineRunInfo.py b/cognee/modules/pipelines/models/PipelineRunInfo.py
index 218251314..d910f4fc8 100644
--- a/cognee/modules/pipelines/models/PipelineRunInfo.py
+++ b/cognee/modules/pipelines/models/PipelineRunInfo.py
@@ -6,6 +6,8 @@ from pydantic import BaseModel
 class PipelineRunInfo(BaseModel):
     status: str
     pipeline_run_id: UUID
+    dataset_id: UUID
+    dataset_name: str
     payload: Optional[Any] = None
 
     model_config = {
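Since the subquery in `get_pipeline_run_by_dataset` already filters on a single `dataset_id`, the `row_number()` window effectively picks the newest run; a simpler equivalent, shown only as a sketch against the same model, would be:

```python
# Sketch: equivalent "latest run" lookup without the window function. The PR's
# row_number() form generalizes if the partition ever spans multiple datasets.
from uuid import UUID

from sqlalchemy import select

from cognee.infrastructure.databases.relational import get_relational_engine
from cognee.modules.pipelines.models import PipelineRun


async def latest_pipeline_run(dataset_id: UUID, pipeline_name: str):
    db_engine = get_relational_engine()

    async with db_engine.get_async_session() as session:
        query = (
            select(PipelineRun)
            .filter(PipelineRun.dataset_id == dataset_id)
            .filter(PipelineRun.pipeline_name == pipeline_name)
            .order_by(PipelineRun.created_at.desc())
            .limit(1)
        )
        return (await session.execute(query)).scalars().first()
```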
diff --git a/cognee/modules/pipelines/operations/pipeline.py b/cognee/modules/pipelines/operations/pipeline.py
index 2effe19c4..3de424796 100644
--- a/cognee/modules/pipelines/operations/pipeline.py
+++ b/cognee/modules/pipelines/operations/pipeline.py
@@ -8,6 +8,8 @@ from cognee.modules.data.models import Data, Dataset
 from cognee.modules.pipelines.operations.run_tasks import run_tasks
 from cognee.modules.pipelines.models import PipelineRunStatus
 from cognee.modules.pipelines.operations.get_pipeline_status import get_pipeline_status
+from cognee.modules.pipelines.methods import get_pipeline_run_by_dataset
+
 from cognee.modules.pipelines.tasks.task import Task
 from cognee.modules.users.methods import get_default_user
 from cognee.modules.users.models import User
@@ -20,6 +22,11 @@ from cognee.modules.data.methods import (
     check_dataset_name,
 )
+from cognee.modules.pipelines.models.PipelineRunInfo import (
+    PipelineRunCompleted,
+    PipelineRunStarted,
+)
+
 from cognee.infrastructure.databases.relational import (
     create_db_and_tables as create_relational_db_and_tables,
 )
@@ -151,9 +158,22 @@ async def run_pipeline(
     if str(dataset_id) in task_status:
         if task_status[str(dataset_id)] == PipelineRunStatus.DATASET_PROCESSING_STARTED:
             logger.info("Dataset %s is already being processed.", dataset_id)
+            pipeline_run = await get_pipeline_run_by_dataset(dataset_id, pipeline_name)
+            yield PipelineRunStarted(
+                pipeline_run_id=pipeline_run.pipeline_run_id,
+                dataset_id=dataset.id,
+                dataset_name=dataset.name,
+                payload=data,
+            )
             return
-        if task_status[str(dataset_id)] == PipelineRunStatus.DATASET_PROCESSING_COMPLETED:
+        elif task_status[str(dataset_id)] == PipelineRunStatus.DATASET_PROCESSING_COMPLETED:
             logger.info("Dataset %s is already processed.", dataset_id)
+            pipeline_run = await get_pipeline_run_by_dataset(dataset_id, pipeline_name)
+            yield PipelineRunCompleted(
+                pipeline_run_id=pipeline_run.pipeline_run_id,
+                dataset_id=dataset.id,
+                dataset_name=dataset.name,
+            )
             return
 
     if not isinstance(tasks, list):
diff --git a/cognee/modules/pipelines/operations/run_tasks.py b/cognee/modules/pipelines/operations/run_tasks.py
index 009a99a95..93171a556 100644
--- a/cognee/modules/pipelines/operations/run_tasks.py
+++ b/cognee/modules/pipelines/operations/run_tasks.py
@@ -2,6 +2,7 @@ import json
 from typing import Any
 from uuid import UUID, uuid4
 
+from cognee.infrastructure.databases.relational import get_relational_engine
 from cognee.shared.logging_utils import get_logger
 from cognee.modules.users.methods import get_default_user
 from cognee.modules.pipelines.utils import generate_pipeline_id
@@ -77,7 +78,7 @@ async def run_tasks_with_telemetry(
 
 async def run_tasks(
     tasks: list[Task],
-    dataset_id: UUID = uuid4(),
+    dataset_id: UUID,
     data: Any = None,
     user: User = None,
     pipeline_name: str = "unknown_pipeline",
@@ -86,7 +87,14 @@
 ):
     if not user:
         user = await get_default_user()
 
-    pipeline_id = generate_pipeline_id(user.id, pipeline_name)
+    # Get Dataset object
+    db_engine = get_relational_engine()
+    async with db_engine.get_async_session() as session:
+        from cognee.modules.data.models import Dataset
+
+        dataset = await session.get(Dataset, dataset_id)
+
+    pipeline_id = generate_pipeline_id(user.id, dataset.id, pipeline_name)
 
     pipeline_run = await log_pipeline_run_start(pipeline_id, pipeline_name, dataset_id, data)
@@ -94,6 +102,8 @@
 
     yield PipelineRunStarted(
         pipeline_run_id=pipeline_run_id,
+        dataset_id=dataset.id,
+        dataset_name=dataset.name,
         payload=data,
     )
@@ -107,6 +117,8 @@
         ):
             yield PipelineRunYield(
                 pipeline_run_id=pipeline_run_id,
+                dataset_id=dataset.id,
+                dataset_name=dataset.name,
                 payload=result,
             )
@@ -114,13 +126,22 @@
             pipeline_run_id, pipeline_id, pipeline_name, dataset_id, data
         )
 
-        yield PipelineRunCompleted(pipeline_run_id=pipeline_run_id)
+        yield PipelineRunCompleted(
+            pipeline_run_id=pipeline_run_id,
+            dataset_id=dataset.id,
+            dataset_name=dataset.name,
+        )
     except Exception as error:
         await log_pipeline_run_error(
             pipeline_run_id, pipeline_id, pipeline_name, dataset_id, data, error
         )
 
-        yield PipelineRunErrored(pipeline_run_id=pipeline_run_id, payload=error)
+        yield PipelineRunErrored(
+            pipeline_run_id=pipeline_run_id,
+            payload=error,
+            dataset_id=dataset.id,
+            dataset_name=dataset.name,
+        )
 
         raise error
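Because every `PipelineRunInfo` now carries `dataset_id` and `dataset_name`, consumers can attribute each update to a dataset without out-of-band bookkeeping. An illustrative (hypothetical) consumer:

```python
# Hypothetical consumer: bucket run statuses per dataset using the new fields.
from collections import defaultdict


async def collect_statuses(pipeline_run):
    """Drain a run_tasks() generator, grouping statuses by dataset_id."""
    statuses = defaultdict(list)
    async for run_info in pipeline_run:
        statuses[run_info.dataset_id].append(run_info.status)
    return dict(statuses)
```

Dropping the `dataset_id: UUID = uuid4()` default is also a fix in its own right: a call default is evaluated once, at function definition time, so every defaulted call previously shared one id; the id is now required and must reference a persisted `Dataset` row, since `session.get(Dataset, dataset_id)` returns `None` otherwise.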
diff --git a/cognee/modules/pipelines/utils/generate_pipeline_id.py b/cognee/modules/pipelines/utils/generate_pipeline_id.py
index 5edf62146..19f4fed6a 100644
--- a/cognee/modules/pipelines/utils/generate_pipeline_id.py
+++ b/cognee/modules/pipelines/utils/generate_pipeline_id.py
@@ -1,5 +1,5 @@
 from uuid import NAMESPACE_OID, UUID, uuid5
 
 
-def generate_pipeline_id(user_id: UUID, pipeline_name: str):
-    return uuid5(NAMESPACE_OID, f"{str(user_id)}_{pipeline_name}")
+def generate_pipeline_id(user_id: UUID, dataset_id: UUID, pipeline_name: str):
+    return uuid5(NAMESPACE_OID, f"{str(user_id)}{pipeline_name}{str(dataset_id)}")
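`uuid5` is deterministic, so the same (user, dataset, pipeline) triple always yields the same pipeline id and re-runs can be correlated. A quick self-contained check with made-up ids:

```python
# Determinism check with placeholder ids (illustrative, not repo code).
from uuid import NAMESPACE_OID, uuid4, uuid5

user_id, dataset_id = uuid4(), uuid4()

first = uuid5(NAMESPACE_OID, f"{str(user_id)}cognify_pipeline{str(dataset_id)}")
second = uuid5(NAMESPACE_OID, f"{str(user_id)}cognify_pipeline{str(dataset_id)}")

assert first == second  # uuid5 is a pure function of namespace and name
```

The new key drops the old `_` separator, but it still parses unambiguously: stringified UUIDs are a fixed 36 characters, so the prefix and suffix pin down the pipeline name in the middle.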