feat: Add dataset_id to pipeline run info and status (#1009)

<!-- .github/pull_request_template.md -->

## Description
<!-- Provide a clear description of the changes in this PR -->

## DCO Affirmation
I affirm that all code in every commit of this pull request conforms to
the terms of the Topoteretes Developer Certificate of Origin.
This commit is contained in:
Igor Ilic 2025-06-30 11:53:17 +02:00 committed by GitHub
parent e44840c601
commit 14be2a5f5d
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
10 changed files with 144 additions and 38 deletions

View file

@ -1,7 +1,6 @@
import os import os
import pathlib import pathlib
import asyncio import asyncio
from uuid import NAMESPACE_OID, uuid5
from cognee.shared.logging_utils import get_logger, setup_logging from cognee.shared.logging_utils import get_logger, setup_logging
from cognee.modules.observability.get_observe import get_observe from cognee.modules.observability.get_observe import get_observe
@ -12,8 +11,8 @@ from cognee.modules.pipelines import run_tasks
from cognee.modules.pipelines.tasks.task import Task from cognee.modules.pipelines.tasks.task import Task
from cognee.modules.users.methods import get_default_user from cognee.modules.users.methods import get_default_user
from cognee.shared.data_models import KnowledgeGraph from cognee.shared.data_models import KnowledgeGraph
from cognee.modules.data.methods import create_dataset
from cognee.tasks.documents import classify_documents, extract_chunks_from_documents from cognee.tasks.documents import classify_documents, extract_chunks_from_documents
from cognee.modules.data.methods.get_unique_dataset_id import get_unique_dataset_id
from cognee.tasks.graph import extract_graph_from_data from cognee.tasks.graph import extract_graph_from_data
from cognee.tasks.ingestion import ingest_data from cognee.tasks.ingestion import ingest_data
from cognee.tasks.repo_processor import get_non_py_files, get_repo_file_dependencies from cognee.tasks.repo_processor import get_non_py_files, get_repo_file_dependencies
@ -21,6 +20,7 @@ from cognee.tasks.repo_processor import get_non_py_files, get_repo_file_dependen
from cognee.tasks.storage import add_data_points from cognee.tasks.storage import add_data_points
from cognee.tasks.summarization import summarize_text from cognee.tasks.summarization import summarize_text
from cognee.infrastructure.llm import get_max_chunk_tokens from cognee.infrastructure.llm import get_max_chunk_tokens
from cognee.infrastructure.databases.relational import get_relational_engine
observe = get_observe() observe = get_observe()
@ -65,16 +65,21 @@ async def run_code_graph_pipeline(repo_path, include_docs=False):
), ),
] ]
dataset_id = await get_unique_dataset_id("codebase", user) dataset_name = "codebase"
# Save dataset to database
db_engine = get_relational_engine()
async with db_engine.get_async_session() as session:
dataset = await create_dataset(dataset_name, user, session)
if include_docs: if include_docs:
non_code_pipeline_run = run_tasks( non_code_pipeline_run = run_tasks(
non_code_tasks, dataset_id, repo_path, user, "cognify_pipeline" non_code_tasks, dataset.id, repo_path, user, "cognify_pipeline"
) )
async for run_status in non_code_pipeline_run: async for run_status in non_code_pipeline_run:
yield run_status yield run_status
async for run_status in run_tasks(tasks, dataset_id, repo_path, user, "cognify_code_pipeline"): async for run_status in run_tasks(tasks, dataset.id, repo_path, user, "cognify_code_pipeline"):
yield run_status yield run_status

View file

@ -10,7 +10,7 @@ from cognee.modules.pipelines import cognee_pipeline
from cognee.modules.pipelines.tasks.task import Task from cognee.modules.pipelines.tasks.task import Task
from cognee.modules.chunking.TextChunker import TextChunker from cognee.modules.chunking.TextChunker import TextChunker
from cognee.modules.ontology.rdf_xml.OntologyResolver import OntologyResolver from cognee.modules.ontology.rdf_xml.OntologyResolver import OntologyResolver
from cognee.modules.pipelines.models.PipelineRunInfo import PipelineRunCompleted from cognee.modules.pipelines.models.PipelineRunInfo import PipelineRunCompleted, PipelineRunErrored
from cognee.modules.pipelines.queues.pipeline_run_info_queues import push_to_queue from cognee.modules.pipelines.queues.pipeline_run_info_queues import push_to_queue
from cognee.modules.users.models import User from cognee.modules.users.models import User
@ -66,7 +66,7 @@ async def run_cognify_blocking(
graph_db_config: dict = None, graph_db_config: dict = None,
vector_db_config: dict = False, vector_db_config: dict = False,
): ):
pipeline_run_info = None total_run_info = {}
async for run_info in cognee_pipeline( async for run_info in cognee_pipeline(
tasks=tasks, tasks=tasks,
@ -76,9 +76,12 @@ async def run_cognify_blocking(
graph_db_config=graph_db_config, graph_db_config=graph_db_config,
vector_db_config=vector_db_config, vector_db_config=vector_db_config,
): ):
pipeline_run_info = run_info if run_info.dataset_id:
total_run_info[run_info.dataset_id] = run_info
else:
total_run_info = run_info
return pipeline_run_info return total_run_info
async def run_cognify_as_background_process( async def run_cognify_as_background_process(
@ -88,30 +91,43 @@ async def run_cognify_as_background_process(
graph_db_config: dict = None, graph_db_config: dict = None,
vector_db_config: dict = False, vector_db_config: dict = False,
): ):
pipeline_run = cognee_pipeline( # Store pipeline status for all pipelines
tasks=tasks, pipeline_run_started_info = []
user=user,
datasets=datasets,
pipeline_name="cognify_pipeline",
graph_db_config=graph_db_config,
vector_db_config=vector_db_config,
)
pipeline_run_started_info = await anext(pipeline_run) async def handle_rest_of_the_run(pipeline_list):
# Execute all provided pipelines one by one to avoid database write conflicts
for pipeline in pipeline_list:
while True:
try:
pipeline_run_info = await anext(pipeline)
async def handle_rest_of_the_run(): push_to_queue(pipeline_run_info.pipeline_run_id, pipeline_run_info)
while True:
try:
pipeline_run_info = await anext(pipeline_run)
push_to_queue(pipeline_run_info.pipeline_run_id, pipeline_run_info) if isinstance(pipeline_run_info, PipelineRunCompleted) or isinstance(
pipeline_run_info, PipelineRunErrored
if isinstance(pipeline_run_info, PipelineRunCompleted): ):
break
except StopAsyncIteration:
break break
except StopAsyncIteration:
break
asyncio.create_task(handle_rest_of_the_run()) # Start all pipelines to get started status
pipeline_list = []
for dataset in datasets:
pipeline_run = cognee_pipeline(
tasks=tasks,
user=user,
datasets=dataset,
pipeline_name="cognify_pipeline",
graph_db_config=graph_db_config,
vector_db_config=vector_db_config,
)
# Save dataset Pipeline run started info
pipeline_run_started_info.append(await anext(pipeline_run))
pipeline_list.append(pipeline_run)
# Send all started pipelines to execute one by one in background
asyncio.create_task(handle_rest_of_the_run(pipeline_list=pipeline_list))
return pipeline_run_started_info return pipeline_run_started_info

View file

@ -39,7 +39,7 @@ class CognifyPayloadDTO(InDTO):
def get_cognify_router() -> APIRouter: def get_cognify_router() -> APIRouter:
router = APIRouter() router = APIRouter()
@router.post("", response_model=None) @router.post("", response_model=dict)
async def cognify(payload: CognifyPayloadDTO, user: User = Depends(get_authenticated_user)): async def cognify(payload: CognifyPayloadDTO, user: User = Depends(get_authenticated_user)):
"""This endpoint is responsible for the cognitive processing of the content.""" """This endpoint is responsible for the cognitive processing of the content."""
if not payload.datasets and not payload.dataset_ids: if not payload.datasets and not payload.dataset_ids:
@ -56,7 +56,7 @@ def get_cognify_router() -> APIRouter:
datasets, user, payload.graph_model, run_in_background=payload.run_in_background datasets, user, payload.graph_model, run_in_background=payload.run_in_background
) )
return cognify_run.model_dump() return cognify_run
except Exception as error: except Exception as error:
return JSONResponse(status_code=409, content={"error": str(error)}) return JSONResponse(status_code=409, content={"error": str(error)})

View file

@ -1,7 +1,9 @@
from typing import List, Union from typing import List, Union
from uuid import UUID from uuid import UUID
from cognee.infrastructure.databases.relational import get_relational_engine
from cognee.modules.data.models import Dataset from cognee.modules.data.models import Dataset
from cognee.modules.data.methods import create_dataset
from cognee.modules.data.methods import get_unique_dataset_id from cognee.modules.data.methods import get_unique_dataset_id
from cognee.modules.data.exceptions import DatasetNotFoundError from cognee.modules.data.exceptions import DatasetNotFoundError
@ -12,7 +14,7 @@ async def load_or_create_datasets(
""" """
Given a list of dataset identifiers (names or UUIDs), return Dataset instances: Given a list of dataset identifiers (names or UUIDs), return Dataset instances:
- If an identifier matches an existing Dataset (by name or id), reuse it. - If an identifier matches an existing Dataset (by name or id), reuse it.
- Otherwise, create a new Dataset with a unique id. Note: Created dataset is not stored to database. - Otherwise, create a new Dataset with a unique id.
""" """
result: List[Dataset] = [] result: List[Dataset] = []
@ -37,6 +39,12 @@ async def load_or_create_datasets(
name=identifier, name=identifier,
owner_id=user.id, owner_id=user.id,
) )
# Save dataset to database
db_engine = get_relational_engine()
async with db_engine.get_async_session() as session:
await create_dataset(identifier, user, session)
result.append(new_dataset) result.append(new_dataset)
return result return result

View file

@ -1 +1,2 @@
from .get_pipeline_run import get_pipeline_run from .get_pipeline_run import get_pipeline_run
from .get_pipeline_run_by_dataset import get_pipeline_run_by_dataset

View file

@ -0,0 +1,33 @@
from uuid import UUID
from sqlalchemy import select, func
from cognee.infrastructure.databases.relational import get_relational_engine
from ..models import PipelineRun
from sqlalchemy.orm import aliased
async def get_pipeline_run_by_dataset(dataset_id: UUID, pipeline_name: str):
    """Return the most recent PipelineRun for a dataset/pipeline pair.

    Args:
        dataset_id: Id of the dataset whose runs are being queried.
        pipeline_name: Name of the pipeline whose runs are being queried.

    Returns:
        The latest ``PipelineRun`` row (by ``created_at``) matching both
        filters, or ``None`` if that pipeline has never run for the dataset.
    """
    db_engine = get_relational_engine()
    async with db_engine.get_async_session() as session:
        # A row_number() window partitioned by dataset_id is redundant here:
        # the filter below already pins the query to a single dataset, so a
        # plain ORDER BY created_at DESC with LIMIT 1 selects the same
        # "latest run" row without the subquery/alias machinery.
        latest_run_query = (
            select(PipelineRun)
            .filter(PipelineRun.dataset_id == dataset_id)
            .filter(PipelineRun.pipeline_name == pipeline_name)
            .order_by(PipelineRun.created_at.desc())
            .limit(1)
        )
        return (await session.execute(latest_run_query)).scalars().first()

View file

@ -6,6 +6,8 @@ from pydantic import BaseModel
class PipelineRunInfo(BaseModel): class PipelineRunInfo(BaseModel):
status: str status: str
pipeline_run_id: UUID pipeline_run_id: UUID
dataset_id: UUID
dataset_name: str
payload: Optional[Any] = None payload: Optional[Any] = None
model_config = { model_config = {

View file

@ -8,6 +8,8 @@ from cognee.modules.data.models import Data, Dataset
from cognee.modules.pipelines.operations.run_tasks import run_tasks from cognee.modules.pipelines.operations.run_tasks import run_tasks
from cognee.modules.pipelines.models import PipelineRunStatus from cognee.modules.pipelines.models import PipelineRunStatus
from cognee.modules.pipelines.operations.get_pipeline_status import get_pipeline_status from cognee.modules.pipelines.operations.get_pipeline_status import get_pipeline_status
from cognee.modules.pipelines.methods import get_pipeline_run_by_dataset
from cognee.modules.pipelines.tasks.task import Task from cognee.modules.pipelines.tasks.task import Task
from cognee.modules.users.methods import get_default_user from cognee.modules.users.methods import get_default_user
from cognee.modules.users.models import User from cognee.modules.users.models import User
@ -20,6 +22,11 @@ from cognee.modules.data.methods import (
check_dataset_name, check_dataset_name,
) )
from cognee.modules.pipelines.models.PipelineRunInfo import (
PipelineRunCompleted,
PipelineRunStarted,
)
from cognee.infrastructure.databases.relational import ( from cognee.infrastructure.databases.relational import (
create_db_and_tables as create_relational_db_and_tables, create_db_and_tables as create_relational_db_and_tables,
) )
@ -151,9 +158,22 @@ async def run_pipeline(
if str(dataset_id) in task_status: if str(dataset_id) in task_status:
if task_status[str(dataset_id)] == PipelineRunStatus.DATASET_PROCESSING_STARTED: if task_status[str(dataset_id)] == PipelineRunStatus.DATASET_PROCESSING_STARTED:
logger.info("Dataset %s is already being processed.", dataset_id) logger.info("Dataset %s is already being processed.", dataset_id)
pipeline_run = await get_pipeline_run_by_dataset(dataset_id, pipeline_name)
yield PipelineRunStarted(
pipeline_run_id=pipeline_run.pipeline_run_id,
dataset_id=dataset.id,
dataset_name=dataset.name,
payload=data,
)
return return
if task_status[str(dataset_id)] == PipelineRunStatus.DATASET_PROCESSING_COMPLETED: elif task_status[str(dataset_id)] == PipelineRunStatus.DATASET_PROCESSING_COMPLETED:
logger.info("Dataset %s is already processed.", dataset_id) logger.info("Dataset %s is already processed.", dataset_id)
pipeline_run = await get_pipeline_run_by_dataset(dataset_id, pipeline_name)
yield PipelineRunCompleted(
pipeline_run_id=pipeline_run.pipeline_run_id,
dataset_id=dataset.id,
dataset_name=dataset.name,
)
return return
if not isinstance(tasks, list): if not isinstance(tasks, list):

View file

@ -2,6 +2,7 @@ import json
from typing import Any from typing import Any
from uuid import UUID, uuid4 from uuid import UUID, uuid4
from cognee.infrastructure.databases.relational import get_relational_engine
from cognee.shared.logging_utils import get_logger from cognee.shared.logging_utils import get_logger
from cognee.modules.users.methods import get_default_user from cognee.modules.users.methods import get_default_user
from cognee.modules.pipelines.utils import generate_pipeline_id from cognee.modules.pipelines.utils import generate_pipeline_id
@ -77,7 +78,7 @@ async def run_tasks_with_telemetry(
async def run_tasks( async def run_tasks(
tasks: list[Task], tasks: list[Task],
dataset_id: UUID = uuid4(), dataset_id: UUID,
data: Any = None, data: Any = None,
user: User = None, user: User = None,
pipeline_name: str = "unknown_pipeline", pipeline_name: str = "unknown_pipeline",
@ -86,7 +87,14 @@ async def run_tasks(
if not user: if not user:
user = get_default_user() user = get_default_user()
pipeline_id = generate_pipeline_id(user.id, pipeline_name) # Get Dataset object
db_engine = get_relational_engine()
async with db_engine.get_async_session() as session:
from cognee.modules.data.models import Dataset
dataset = await session.get(Dataset, dataset_id)
pipeline_id = generate_pipeline_id(user.id, dataset.id, pipeline_name)
pipeline_run = await log_pipeline_run_start(pipeline_id, pipeline_name, dataset_id, data) pipeline_run = await log_pipeline_run_start(pipeline_id, pipeline_name, dataset_id, data)
@ -94,6 +102,8 @@ async def run_tasks(
yield PipelineRunStarted( yield PipelineRunStarted(
pipeline_run_id=pipeline_run_id, pipeline_run_id=pipeline_run_id,
dataset_id=dataset.id,
dataset_name=dataset.name,
payload=data, payload=data,
) )
@ -107,6 +117,8 @@ async def run_tasks(
): ):
yield PipelineRunYield( yield PipelineRunYield(
pipeline_run_id=pipeline_run_id, pipeline_run_id=pipeline_run_id,
dataset_id=dataset.id,
dataset_name=dataset.name,
payload=result, payload=result,
) )
@ -114,13 +126,22 @@ async def run_tasks(
pipeline_run_id, pipeline_id, pipeline_name, dataset_id, data pipeline_run_id, pipeline_id, pipeline_name, dataset_id, data
) )
yield PipelineRunCompleted(pipeline_run_id=pipeline_run_id) yield PipelineRunCompleted(
pipeline_run_id=pipeline_run_id,
dataset_id=dataset.id,
dataset_name=dataset.name,
)
except Exception as error: except Exception as error:
await log_pipeline_run_error( await log_pipeline_run_error(
pipeline_run_id, pipeline_id, pipeline_name, dataset_id, data, error pipeline_run_id, pipeline_id, pipeline_name, dataset_id, data, error
) )
yield PipelineRunErrored(pipeline_run_id=pipeline_run_id, payload=error) yield PipelineRunErrored(
pipeline_run_id=pipeline_run_id,
payload=error,
dataset_id=dataset.id,
dataset_name=dataset.name,
)
raise error raise error

View file

@ -1,5 +1,5 @@
from uuid import NAMESPACE_OID, UUID, uuid5 from uuid import NAMESPACE_OID, UUID, uuid5
def generate_pipeline_id(user_id: UUID, pipeline_name: str): def generate_pipeline_id(user_id: UUID, dataset_id: UUID, pipeline_name: str):
return uuid5(NAMESPACE_OID, f"{str(user_id)}_{pipeline_name}") return uuid5(NAMESPACE_OID, f"{str(user_id)}{pipeline_name}{str(dataset_id)}")