Feat: log pipeline status and pass it through pipeline [COG-1214] (#501)


## Description
`run_tasks` now logs pipeline runs explicitly and passes the run status through the pipeline. The old `log_pipeline_status` helper is replaced by `log_pipeline_run_start`, `log_pipeline_run_complete`, and `log_pipeline_run_error`, which store `PipelineRun` rows keyed by `pipeline_run_id`, `pipeline_id`, and `dataset_id`. `run_tasks` takes an explicit `dataset_id` (generated with `uuid5(NAMESPACE_OID, dataset_name)` in `add`) and `pipeline_name`, and yields the resulting `PipelineRun` records so that `cognify`, the code graph pipeline, and the notebooks can return or inspect the final run status. `get_pipeline_status` now keys its lookup by `dataset_id` instead of `run_id`.

## DCO Affirmation
I affirm that all code in every commit of this pull request conforms to
the terms of the Topoteretes Developer Certificate of Origin


<!-- This is an auto-generated comment: release notes by coderabbit.ai -->
## Summary by CodeRabbit

- **New Features**
  - Enhanced pipeline execution now provides consolidated status feedback with improved telemetry for start, completion, and error events.
  - Automatic generation of unique dataset identifiers offers clearer task and pipeline run associations.

- **Refactor**
  - Task execution has been streamlined with explicit parameter handling for more structured pipeline processing.
  - Interactive examples and demos now return results directly, making integration and monitoring more accessible.
<!-- end of auto-generated comment: release notes by coderabbit.ai -->
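
For reviewers trying out the new API, here is a minimal sketch of the updated calling convention. It is illustrative only: the toy `double_all` task, dataset name, and pipeline name are invented for the example, and it assumes a configured cognee environment, since each run is now logged to the relational store.

```python
import asyncio
from uuid import uuid5, NAMESPACE_OID

from cognee.modules.pipelines import run_tasks
from cognee.modules.pipelines.models import PipelineRunStatus
from cognee.modules.pipelines.tasks.Task import Task


async def double_all(numbers):
    # Toy task for illustration; the first task in the list receives `data` as its input.
    yield [n * 2 for n in numbers]


async def main():
    # Dataset ids are now deterministic UUIDs derived from the dataset name.
    dataset_id = uuid5(NAMESPACE_OID, "example_dataset")

    # run_tasks takes the dataset id and pipeline name explicitly and yields
    # PipelineRun records: the "started" run first, then "completed" (or "errored").
    pipeline_run = run_tasks(
        tasks=[Task(double_all)],
        dataset_id=dataset_id,
        data=[1, 2, 3],
        pipeline_name="example_pipeline",
    )

    last_run = None
    async for run in pipeline_run:
        last_run = run

    print(last_run.status == PipelineRunStatus.DATASET_PROCESSING_COMPLETED)


if __name__ == "__main__":
    asyncio.run(main())
```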

---------

Co-authored-by: Boris Arzentar <borisarzentar@gmail.com>
alekszievr 2025-02-11 16:41:40 +01:00 committed by GitHub
parent 6a0c0e3ef8
commit 05ba29af01
17 changed files with 195 additions and 94 deletions

View file

@@ -10,7 +10,7 @@ repos:
- id: check-added-large-files
- repo: https://github.com/astral-sh/ruff-pre-commit
# Ruff version.
rev: v0.9.0
rev: v0.9.5
hooks:
# Run the linter.
- id: ruff

View file

@@ -9,6 +9,7 @@ from cognee.infrastructure.databases.relational import (
from cognee.infrastructure.databases.vector.pgvector import (
create_db_and_tables as create_pgvector_db_and_tables,
)
from uuid import uuid5, NAMESPACE_OID
async def add(
@@ -37,7 +38,10 @@ async def add(
tasks = [Task(resolve_data_directories), Task(ingest_data, dataset_name, user)]
pipeline = run_tasks(tasks, data, "add_pipeline")
dataset_id = uuid5(NAMESPACE_OID, dataset_name)
pipeline = run_tasks(
tasks=tasks, dataset_id=dataset_id, data=data, pipeline_name="add_pipeline"
)
async for result in pipeline:
print(result)

View file

@@ -69,9 +69,19 @@ async def run_code_graph_pipeline(repo_path, include_docs=True):
),
]
pipeline_run_status = None
if include_docs:
async for result in run_tasks(non_code_tasks, repo_path):
yield result
non_code_pipeline_run = run_tasks(non_code_tasks, None, repo_path, "cognify_pipeline")
async for run_status in non_code_pipeline_run:
pipeline_run_status = run_status
async for result in run_tasks(tasks, repo_path, "cognify_code_pipeline"):
yield result
from cognee.modules.data.methods import get_datasets
existing_datasets = await get_datasets(user.id)
code_pipeline_run = run_tasks(
tasks, existing_datasets[0].id, repo_path, "cognify_code_pipeline"
)
async for run_status in code_pipeline_run:
pipeline_run_status = run_status
return pipeline_run_status
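
Not part of the diff: since `run_code_graph_pipeline` no longer yields intermediate results, callers now await it and receive the final pipeline run status, as the updated example script further down also shows. A minimal sketch (the import path is an assumption; adjust it to wherever the pipeline lives in your checkout):

```python
import asyncio

# Assumed import path, shown for illustration only.
from cognee.api.v1.cognify.code_graph_pipeline import run_code_graph_pipeline


async def main():
    # Returns the last PipelineRun status instead of streaming task results.
    status = await run_code_graph_pipeline("/path/to/repo", include_docs=True)
    print(status)


if __name__ == "__main__":
    asyncio.run(main())
```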

View file

@@ -12,7 +12,6 @@ from cognee.modules.data.models import Data, Dataset
from cognee.modules.pipelines import run_tasks
from cognee.modules.pipelines.models import PipelineRunStatus
from cognee.modules.pipelines.operations.get_pipeline_status import get_pipeline_status
from cognee.modules.pipelines.operations.log_pipeline_status import log_pipeline_status
from cognee.modules.pipelines.tasks.Task import Task
from cognee.modules.users.methods import get_default_user
from cognee.modules.users.models import User
@@ -71,8 +70,6 @@ async def cognify(
async def run_cognify_pipeline(dataset: Dataset, user: User, tasks: list[Task]):
data_documents: list[Data] = await get_dataset_data(dataset_id=dataset.id)
document_ids_str = [str(document.id) for document in data_documents]
dataset_id = dataset.id
dataset_name = generate_dataset_name(dataset.name)
@@ -82,21 +79,12 @@ async def run_cognify_pipeline(dataset: Dataset, user: User, tasks: list[Task]):
task_status = await get_pipeline_status([dataset_id])
if (
dataset_id in task_status
and task_status[dataset_id] == PipelineRunStatus.DATASET_PROCESSING_STARTED
str(dataset_id) in task_status
and task_status[str(dataset_id)] == PipelineRunStatus.DATASET_PROCESSING_STARTED
):
logger.info("Dataset %s is already being processed.", dataset_name)
return
await log_pipeline_status(
dataset_id,
PipelineRunStatus.DATASET_PROCESSING_STARTED,
{
"dataset_name": dataset_name,
"files": document_ids_str,
},
)
try:
if not isinstance(tasks, list):
raise ValueError("Tasks must be a list")
@@ -105,32 +93,17 @@ async def run_cognify_pipeline(dataset: Dataset, user: User, tasks: list[Task]):
if not isinstance(task, Task):
raise ValueError(f"Task {task} is not an instance of Task")
pipeline = run_tasks(tasks, data_documents, "cognify_pipeline")
pipeline_run = run_tasks(tasks, dataset.id, data_documents, "cognify_pipeline")
pipeline_run_status = None
async for result in pipeline:
print(result)
async for run_status in pipeline_run:
pipeline_run_status = run_status
send_telemetry("cognee.cognify EXECUTION COMPLETED", user.id)
return pipeline_run_status
await log_pipeline_status(
dataset_id,
PipelineRunStatus.DATASET_PROCESSING_COMPLETED,
{
"dataset_name": dataset_name,
"files": document_ids_str,
},
)
except Exception as error:
send_telemetry("cognee.cognify EXECUTION ERRORED", user.id)
await log_pipeline_status(
dataset_id,
PipelineRunStatus.DATASET_PROCESSING_ERRORED,
{
"dataset_name": dataset_name,
"files": document_ids_str,
},
)
raise error

View file

@@ -1,7 +1,7 @@
import enum
from uuid import uuid4
from datetime import datetime, timezone
from sqlalchemy import Column, DateTime, JSON, Enum, UUID
from sqlalchemy import Column, DateTime, JSON, Enum, UUID, String
from cognee.infrastructure.databases.relational import Base
@@ -19,6 +19,7 @@ class PipelineRun(Base):
created_at = Column(DateTime(timezone=True), default=lambda: datetime.now(timezone.utc))
status = Column(Enum(PipelineRunStatus))
run_id = Column(UUID, index=True)
pipeline_run_id = Column(UUID, index=True)
pipeline_id = Column(UUID, index=True)
dataset_id = Column(UUID, index=True)
run_info = Column(JSON)

View file

@@ -0,0 +1,3 @@
from .log_pipeline_run_start import log_pipeline_run_start
from .log_pipeline_run_complete import log_pipeline_run_complete
from .log_pipeline_run_error import log_pipeline_run_error

View file

@@ -1,11 +1,11 @@
from uuid import UUID
from sqlalchemy import func, select
from sqlalchemy.orm import aliased
from sqlalchemy import select, func
from cognee.infrastructure.databases.relational import get_relational_engine
from ..models import PipelineRun
from sqlalchemy.orm import aliased
async def get_pipeline_status(pipeline_ids: list[UUID]):
async def get_pipeline_status(dataset_ids: list[UUID]):
db_engine = get_relational_engine()
async with db_engine.get_async_session() as session:
@@ -14,12 +14,12 @@ async def get_pipeline_status(pipeline_ids: list[UUID]):
PipelineRun,
func.row_number()
.over(
partition_by=PipelineRun.run_id,
partition_by=PipelineRun.dataset_id,
order_by=PipelineRun.created_at.desc(),
)
.label("rn"),
)
.filter(PipelineRun.run_id.in_(pipeline_ids))
.filter(PipelineRun.dataset_id.in_(dataset_ids))
.subquery()
)
@@ -29,16 +29,6 @@ async def get_pipeline_status(pipeline_ids: list[UUID]):
runs = (await session.execute(latest_runs)).scalars().all()
pipeline_statuses = {str(run.run_id): run.status for run in runs}
pipeline_statuses = {str(run.dataset_id): run.status for run in runs}
return pipeline_statuses
# f"""SELECT data_id, status
# FROM (
# SELECT data_id, status, ROW_NUMBER() OVER (PARTITION BY data_id ORDER BY created_at DESC) as rn
# FROM cognee.cognee.task_runs
# WHERE data_id IN ({formatted_data_ids})
# ) t
# WHERE rn = 1;"""
# return { dataset["data_id"]: dataset["status"] for dataset in datasets_statuses }

View file

@@ -0,0 +1,34 @@
from uuid import UUID, uuid4
from cognee.infrastructure.databases.relational import get_relational_engine
from cognee.modules.data.models import Data
from cognee.modules.pipelines.models import PipelineRun, PipelineRunStatus
from typing import Any
async def log_pipeline_run_complete(
pipeline_run_id: UUID, pipeline_id: str, dataset_id: UUID, data: Any
):
if not data:
data_info = "None"
elif isinstance(data, list) and all(isinstance(item, Data) for item in data):
data_info = [str(item.id) for item in data]
else:
data_info = str(data)
pipeline_run = PipelineRun(
pipeline_run_id=pipeline_run_id,
pipeline_id=pipeline_id,
status=PipelineRunStatus.DATASET_PROCESSING_COMPLETED,
dataset_id=dataset_id,
run_info={
"data": data_info,
},
)
db_engine = get_relational_engine()
async with db_engine.get_async_session() as session:
session.add(pipeline_run)
await session.commit()
return pipeline_run

View file

@@ -0,0 +1,35 @@
from uuid import UUID, uuid4
from cognee.infrastructure.databases.relational import get_relational_engine
from cognee.modules.data.models import Data
from cognee.modules.pipelines.models import PipelineRun, PipelineRunStatus
from typing import Any
async def log_pipeline_run_error(
pipeline_run_id: UUID, pipeline_id: str, dataset_id: UUID, data: Any, e: Exception
):
if not data:
data_info = "None"
elif isinstance(data, list) and all(isinstance(item, Data) for item in data):
data_info = [str(item.id) for item in data]
else:
data_info = str(data)
pipeline_run = PipelineRun(
pipeline_run_id=pipeline_run_id,
pipeline_id=pipeline_id,
status=PipelineRunStatus.DATASET_PROCESSING_ERRORED,
dataset_id=dataset_id,
run_info={
"data": data_info,
"error": str(e),
},
)
db_engine = get_relational_engine()
async with db_engine.get_async_session() as session:
session.add(pipeline_run)
await session.commit()
return pipeline_run

View file

@@ -0,0 +1,34 @@
from uuid import UUID, uuid4
from cognee.infrastructure.databases.relational import get_relational_engine
from cognee.modules.data.models import Data
from cognee.modules.pipelines.models import PipelineRun, PipelineRunStatus
from typing import Any
async def log_pipeline_run_start(pipeline_id: str, dataset_id: UUID, data: Any):
if not data:
data_info = "None"
elif isinstance(data, list) and all(isinstance(item, Data) for item in data):
data_info = [str(item.id) for item in data]
else:
data_info = str(data)
pipeline_run_id = uuid4()
pipeline_run = PipelineRun(
pipeline_run_id=pipeline_run_id,
pipeline_id=pipeline_id,
status=PipelineRunStatus.DATASET_PROCESSING_STARTED,
dataset_id=dataset_id,
run_info={
"data": data_info,
},
)
db_engine = get_relational_engine()
async with db_engine.get_async_session() as session:
session.add(pipeline_run)
await session.commit()
return pipeline_run

View file

@@ -1,18 +0,0 @@
from uuid import UUID
from cognee.infrastructure.databases.relational import get_relational_engine
from ..models.PipelineRun import PipelineRun
async def log_pipeline_status(run_id: UUID, status: str, run_info: dict):
db_engine = get_relational_engine()
async with db_engine.get_async_session() as session:
session.add(
PipelineRun(
run_id=run_id,
status=status,
run_info=run_info,
)
)
await session.commit()

View file

@@ -1,11 +1,19 @@
import inspect
import json
import logging
from uuid import UUID
from typing import Any
from cognee.modules.pipelines.operations import (
log_pipeline_run_start,
log_pipeline_run_complete,
log_pipeline_run_error,
)
from cognee.modules.settings import get_current_settings
from cognee.modules.users.methods import get_default_user
from cognee.modules.users.models import User
from cognee.shared.utils import send_telemetry
from uuid import uuid5, NAMESPACE_OID
from ..tasks.Task import Task
@@ -261,6 +269,20 @@ async def run_tasks_with_telemetry(tasks: list[Task], data, pipeline_name: str):
raise error
async def run_tasks(tasks: list[Task], data=None, pipeline_name: str = "default_pipeline"):
async for result in run_tasks_with_telemetry(tasks, data, pipeline_name):
yield result
async def run_tasks(tasks: list[Task], dataset_id: UUID, data: Any, pipeline_name: str):
pipeline_id = uuid5(NAMESPACE_OID, pipeline_name)
pipeline_run = await log_pipeline_run_start(pipeline_id, dataset_id, data)
yield pipeline_run
pipeline_run_id = pipeline_run.pipeline_run_id
try:
async for _ in run_tasks_with_telemetry(tasks, data, pipeline_id):
pass
yield await log_pipeline_run_complete(pipeline_run_id, pipeline_id, dataset_id, data)
except Exception as e:
yield await log_pipeline_run_error(pipeline_run_id, pipeline_id, dataset_id, data, e)
raise e
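
The refactored `run_tasks` is still an async generator, but it now yields `PipelineRun` rows instead of raw task results: the start record first, then the completion record, with an error record yielded just before the exception is re-raised. A hypothetical consumer helper, sketched for illustration (callers in this PR inline the same loop):

```python
from cognee.modules.pipelines.models import PipelineRunStatus


async def wait_for_pipeline(pipeline_run):
    # Drain a run_tasks() generator and return the final PipelineRun.
    last_run = None
    try:
        async for run in pipeline_run:
            last_run = run
    except Exception:
        # The errored PipelineRun is yielded before the re-raise,
        # so last_run already carries the error details in run_info.
        if last_run is not None:
            print("pipeline failed:", last_run.run_info)
        raise
    assert last_run is not None
    assert last_run.status == PipelineRunStatus.DATASET_PROCESSING_COMPLETED
    return last_run
```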

View file

@@ -1,8 +1,9 @@
import asyncio
from queue import Queue
from cognee.modules.pipelines.operations.run_tasks import run_tasks
from cognee.modules.pipelines.operations.run_tasks import run_tasks_base
from cognee.modules.pipelines.tasks.Task import Task
from cognee.modules.users.methods import get_default_user
async def pipeline(data_queue):
@@ -19,13 +20,15 @@ async def pipeline(data_queue):
async def multiply_by_two(num):
yield num * 2
tasks_run = run_tasks(
user = await get_default_user()
tasks_run = run_tasks_base(
[
Task(queue_consumer),
Task(add_one),
Task(multiply_by_two),
],
pipeline_name="test_run_tasks_from_queue",
data=None,
user=user,
)
results = [2, 4, 6, 8, 10, 12, 14, 16, 18, 20]
@@ -50,3 +53,7 @@ async def run_queue():
def test_run_tasks_from_queue():
asyncio.run(run_queue())
if __name__ == "__main__":
asyncio.run(run_queue())

View file

@@ -1,7 +1,8 @@
import asyncio
from cognee.modules.pipelines.operations.run_tasks import run_tasks
from cognee.modules.pipelines.operations.run_tasks import run_tasks_base
from cognee.modules.pipelines.tasks.Task import Task
from cognee.modules.users.methods import get_default_user
async def run_and_check_tasks():
@@ -19,15 +20,16 @@ async def run_and_check_tasks():
async def add_one_single(num):
yield num + 1
pipeline = run_tasks(
user = await get_default_user()
pipeline = run_tasks_base(
[
Task(number_generator),
Task(add_one, task_config={"batch_size": 5}),
Task(multiply_by_two, task_config={"batch_size": 1}),
Task(add_one_single),
],
10,
pipeline_name="test_run_tasks",
data=10,
user=user,
)
results = [5, 7, 9, 11, 13, 15, 17, 19, 21, 23]

View file

@@ -7,8 +7,7 @@ from cognee.shared.utils import setup_logging
async def main(repo_path, include_docs):
async for result in run_code_graph_pipeline(repo_path, include_docs):
print(result)
return await run_code_graph_pipeline(repo_path, include_docs)
def parse_args():

View file

@@ -93,11 +93,12 @@
"outputs": [],
"source": [
"from cognee.modules.pipelines import run_tasks\n",
"from uuid import uuid5, NAMESPACE_OID\n",
"\n",
"notebook_path = os.path.abspath(\"\")\n",
"repo_clone_location = os.path.join(notebook_path, \".data/graphrag\")\n",
"\n",
"pipeline = run_tasks(tasks, repo_clone_location, \"code_graph_pipeline\")\n",
"pipeline = run_tasks(tasks, uuid5(NAMESPACE_OID, repo_clone_location), repo_clone_location, \"code_graph_pipeline\")\n",
"\n",
"async for result in pipeline:\n",
" print(result)"
@@ -117,7 +118,9 @@
{
"cell_type": "markdown",
"metadata": {},
"source": "# Let's check the evaluations"
"source": [
"# Let's check the evaluations"
]
},
{
"cell_type": "code",

View file

@@ -674,10 +674,12 @@
" Task(add_data_points, task_config = { \"batch_size\": 10 }),\n",
" ]\n",
"\n",
" pipeline = run_tasks(tasks, data_documents)\n",
" pipeline_run = run_tasks(tasks, dataset.id, data_documents, \"cognify_pipeline\")\n",
" pipeline_run_status = None\n",
"\n",
" async for run_status in pipeline_run:\n",
" pipeline_run_status = run_status\n",
"\n",
" async for result in pipeline:\n",
" print(result)\n",
" except Exception as error:\n",
" raise error\n"
]