diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml
index 96bfe6d32..54d0a7d94 100644
--- a/.pre-commit-config.yaml
+++ b/.pre-commit-config.yaml
@@ -10,7 +10,7 @@ repos:
       - id: check-added-large-files
   - repo: https://github.com/astral-sh/ruff-pre-commit
     # Ruff version.
-    rev: v0.9.0
+    rev: v0.9.5
     hooks:
       # Run the linter.
      - id: ruff
diff --git a/cognee/api/v1/add/add_v2.py b/cognee/api/v1/add/add_v2.py
index bd633118d..468fe5147 100644
--- a/cognee/api/v1/add/add_v2.py
+++ b/cognee/api/v1/add/add_v2.py
@@ -9,6 +9,7 @@ from cognee.infrastructure.databases.relational import (
 from cognee.infrastructure.databases.vector.pgvector import (
     create_db_and_tables as create_pgvector_db_and_tables,
 )
+from uuid import uuid5, NAMESPACE_OID
 
 
 async def add(
@@ -37,7 +38,10 @@ async def add(
 
     tasks = [Task(resolve_data_directories), Task(ingest_data, dataset_name, user)]
 
-    pipeline = run_tasks(tasks, data, "add_pipeline")
+    dataset_id = uuid5(NAMESPACE_OID, dataset_name)
+    pipeline = run_tasks(
+        tasks=tasks, dataset_id=dataset_id, data=data, pipeline_name="add_pipeline"
+    )
 
     async for result in pipeline:
         print(result)
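For context: `uuid5` is deterministic, so the same dataset name always maps to the same UUID, which is what lets later status lookups re-derive the dataset id without a database round trip. A minimal standard-library illustration (not part of the PR):

```python
from uuid import uuid5, NAMESPACE_OID

# uuid5 hashes namespace + name, so the mapping is stable across
# processes and runs; re-deriving the id always gives the same UUID.
assert uuid5(NAMESPACE_OID, "main_dataset") == uuid5(NAMESPACE_OID, "main_dataset")
```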
diff --git a/cognee/api/v1/cognify/code_graph_pipeline.py b/cognee/api/v1/cognify/code_graph_pipeline.py
index c73e90c19..9a4b05a59 100644
--- a/cognee/api/v1/cognify/code_graph_pipeline.py
+++ b/cognee/api/v1/cognify/code_graph_pipeline.py
@@ -69,9 +69,19 @@ async def run_code_graph_pipeline(repo_path, include_docs=True):
         ),
     ]
 
+    pipeline_run_status = None
     if include_docs:
-        async for result in run_tasks(non_code_tasks, repo_path):
-            yield result
+        non_code_pipeline_run = run_tasks(non_code_tasks, None, repo_path, "cognify_pipeline")
+        async for run_status in non_code_pipeline_run:
+            pipeline_run_status = run_status
 
-    async for result in run_tasks(tasks, repo_path, "cognify_code_pipeline"):
-        yield result
+    from cognee.modules.data.methods import get_datasets
+
+    existing_datasets = await get_datasets(user.id)
+    code_pipeline_run = run_tasks(
+        tasks, existing_datasets[0].id, repo_path, "cognify_code_pipeline"
+    )
+    async for run_status in code_pipeline_run:
+        pipeline_run_status = run_status
+
+    return pipeline_run_status
diff --git a/cognee/api/v1/cognify/cognify_v2.py b/cognee/api/v1/cognify/cognify_v2.py
index dd4ab2229..2faa9903e 100644
--- a/cognee/api/v1/cognify/cognify_v2.py
+++ b/cognee/api/v1/cognify/cognify_v2.py
@@ -12,7 +12,6 @@ from cognee.modules.data.models import Data, Dataset
 from cognee.modules.pipelines import run_tasks
 from cognee.modules.pipelines.models import PipelineRunStatus
 from cognee.modules.pipelines.operations.get_pipeline_status import get_pipeline_status
-from cognee.modules.pipelines.operations.log_pipeline_status import log_pipeline_status
 from cognee.modules.pipelines.tasks.Task import Task
 from cognee.modules.users.methods import get_default_user
 from cognee.modules.users.models import User
@@ -71,8 +70,6 @@ async def cognify(
 async def run_cognify_pipeline(dataset: Dataset, user: User, tasks: list[Task]):
     data_documents: list[Data] = await get_dataset_data(dataset_id=dataset.id)
 
-    document_ids_str = [str(document.id) for document in data_documents]
-
     dataset_id = dataset.id
     dataset_name = generate_dataset_name(dataset.name)
 
@@ -82,21 +79,12 @@ async def run_cognify_pipeline(dataset: Dataset, user: User, tasks: list[Task]):
     task_status = await get_pipeline_status([dataset_id])
 
     if (
-        dataset_id in task_status
-        and task_status[dataset_id] == PipelineRunStatus.DATASET_PROCESSING_STARTED
+        str(dataset_id) in task_status
+        and task_status[str(dataset_id)] == PipelineRunStatus.DATASET_PROCESSING_STARTED
     ):
         logger.info("Dataset %s is already being processed.", dataset_name)
         return
 
-    await log_pipeline_status(
-        dataset_id,
-        PipelineRunStatus.DATASET_PROCESSING_STARTED,
-        {
-            "dataset_name": dataset_name,
-            "files": document_ids_str,
-        },
-    )
-
     try:
         if not isinstance(tasks, list):
             raise ValueError("Tasks must be a list")
@@ -105,32 +93,17 @@ async def run_cognify_pipeline(dataset: Dataset, user: User, tasks: list[Task]):
             if not isinstance(task, Task):
                 raise ValueError(f"Task {task} is not an instance of Task")
 
-        pipeline = run_tasks(tasks, data_documents, "cognify_pipeline")
+        pipeline_run = run_tasks(tasks, dataset.id, data_documents, "cognify_pipeline")
+        pipeline_run_status = None
 
-        async for result in pipeline:
-            print(result)
+        async for run_status in pipeline_run:
+            pipeline_run_status = run_status
 
         send_telemetry("cognee.cognify EXECUTION COMPLETED", user.id)
+        return pipeline_run_status
 
-        await log_pipeline_status(
-            dataset_id,
-            PipelineRunStatus.DATASET_PROCESSING_COMPLETED,
-            {
-                "dataset_name": dataset_name,
-                "files": document_ids_str,
-            },
-        )
     except Exception as error:
         send_telemetry("cognee.cognify EXECUTION ERRORED", user.id)
-
-        await log_pipeline_status(
-            dataset_id,
-            PipelineRunStatus.DATASET_PROCESSING_ERRORED,
-            {
-                "dataset_name": dataset_name,
-                "files": document_ids_str,
-            },
-        )
 
         raise error
 
diff --git a/cognee/modules/pipelines/models/PipelineRun.py b/cognee/modules/pipelines/models/PipelineRun.py
index c778066fe..4e7d9a22d 100644
--- a/cognee/modules/pipelines/models/PipelineRun.py
+++ b/cognee/modules/pipelines/models/PipelineRun.py
@@ -1,7 +1,7 @@
 import enum
 from uuid import uuid4
 from datetime import datetime, timezone
-from sqlalchemy import Column, DateTime, JSON, Enum, UUID
+from sqlalchemy import Column, DateTime, JSON, Enum, UUID, String
 from cognee.infrastructure.databases.relational import Base
 
 
@@ -19,6 +19,7 @@ class PipelineRun(Base):
     created_at = Column(DateTime(timezone=True), default=lambda: datetime.now(timezone.utc))
 
     status = Column(Enum(PipelineRunStatus))
-
-    run_id = Column(UUID, index=True)
+    pipeline_run_id = Column(UUID, index=True)
+    pipeline_id = Column(UUID, index=True)
+    dataset_id = Column(UUID, index=True)
     run_info = Column(JSON)
diff --git a/cognee/modules/pipelines/operations/__init__.py b/cognee/modules/pipelines/operations/__init__.py
index e69de29bb..5522976ba 100644
--- a/cognee/modules/pipelines/operations/__init__.py
+++ b/cognee/modules/pipelines/operations/__init__.py
@@ -0,0 +1,3 @@
+from .log_pipeline_run_start import log_pipeline_run_start
+from .log_pipeline_run_complete import log_pipeline_run_complete
+from .log_pipeline_run_error import log_pipeline_run_error
diff --git a/cognee/modules/pipelines/operations/get_pipeline_status.py b/cognee/modules/pipelines/operations/get_pipeline_status.py
index 4f49cd544..29318382f 100644
--- a/cognee/modules/pipelines/operations/get_pipeline_status.py
+++ b/cognee/modules/pipelines/operations/get_pipeline_status.py
@@ -1,11 +1,11 @@
 from uuid import UUID
-from sqlalchemy import func, select
-from sqlalchemy.orm import aliased
+from sqlalchemy import select, func
 from cognee.infrastructure.databases.relational import get_relational_engine
 from ..models import PipelineRun
+from sqlalchemy.orm import aliased
 
 
-async def get_pipeline_status(pipeline_ids: list[UUID]):
+async def get_pipeline_status(dataset_ids: list[UUID]):
     db_engine = get_relational_engine()
 
     async with db_engine.get_async_session() as session:
@@ -14,12 +14,12 @@ async def get_pipeline_status(pipeline_ids: list[UUID]):
                 PipelineRun,
                 func.row_number()
                 .over(
-                    partition_by=PipelineRun.run_id,
+                    partition_by=PipelineRun.dataset_id,
                     order_by=PipelineRun.created_at.desc(),
                 )
                 .label("rn"),
             )
-            .filter(PipelineRun.run_id.in_(pipeline_ids))
+            .filter(PipelineRun.dataset_id.in_(dataset_ids))
             .subquery()
         )
 
@@ -29,16 +29,6 @@ async def get_pipeline_status(pipeline_ids: list[UUID]):
 
         runs = (await session.execute(latest_runs)).scalars().all()
 
-        pipeline_statuses = {str(run.run_id): run.status for run in runs}
+        pipeline_statuses = {str(run.dataset_id): run.status for run in runs}
 
         return pipeline_statuses
-
-        # f"""SELECT data_id, status
-        #     FROM (
-        #         SELECT data_id, status, ROW_NUMBER() OVER (PARTITION BY data_id ORDER BY created_at DESC) as rn
-        #         FROM cognee.cognee.task_runs
-        #         WHERE data_id IN ({formatted_data_ids})
-        #     ) t
-        #     WHERE rn = 1;"""
-
-        # return { dataset["data_id"]: dataset["status"] for dataset in datasets_statuses }
diff --git a/cognee/modules/pipelines/operations/log_pipeline_run_complete.py b/cognee/modules/pipelines/operations/log_pipeline_run_complete.py
new file mode 100644
index 000000000..98ec875b1
--- /dev/null
+++ b/cognee/modules/pipelines/operations/log_pipeline_run_complete.py
@@ -0,0 +1,34 @@
+from uuid import UUID, uuid4
+from cognee.infrastructure.databases.relational import get_relational_engine
+from cognee.modules.data.models import Data
+from cognee.modules.pipelines.models import PipelineRun, PipelineRunStatus
+from typing import Any
+
+
+async def log_pipeline_run_complete(
+    pipeline_run_id: UUID, pipeline_id: str, dataset_id: UUID, data: Any
+):
+    if not data:
+        data_info = "None"
+    elif isinstance(data, list) and all(isinstance(item, Data) for item in data):
+        data_info = [str(item.id) for item in data]
+    else:
+        data_info = str(data)
+
+    pipeline_run = PipelineRun(
+        pipeline_run_id=pipeline_run_id,
+        pipeline_id=pipeline_id,
+        status=PipelineRunStatus.DATASET_PROCESSING_COMPLETED,
+        dataset_id=dataset_id,
+        run_info={
+            "data": data_info,
+        },
+    )
+
+    db_engine = get_relational_engine()
+
+    async with db_engine.get_async_session() as session:
+        session.add(pipeline_run)
+        await session.commit()
+
+    return pipeline_run
diff --git a/cognee/modules/pipelines/operations/log_pipeline_run_error.py b/cognee/modules/pipelines/operations/log_pipeline_run_error.py
new file mode 100644
index 000000000..763af9ba4
--- /dev/null
+++ b/cognee/modules/pipelines/operations/log_pipeline_run_error.py
@@ -0,0 +1,35 @@
+from uuid import UUID, uuid4
+from cognee.infrastructure.databases.relational import get_relational_engine
+from cognee.modules.data.models import Data
+from cognee.modules.pipelines.models import PipelineRun, PipelineRunStatus
+from typing import Any
+
+
+async def log_pipeline_run_error(
+    pipeline_run_id: UUID, pipeline_id: str, dataset_id: UUID, data: Any, e: Exception
+):
+    if not data:
+        data_info = "None"
+    elif isinstance(data, list) and all(isinstance(item, Data) for item in data):
+        data_info = [str(item.id) for item in data]
+    else:
+        data_info = str(data)
+
+    pipeline_run = PipelineRun(
+        pipeline_run_id=pipeline_run_id,
+        pipeline_id=pipeline_id,
+        status=PipelineRunStatus.DATASET_PROCESSING_ERRORED,
+        dataset_id=dataset_id,
+        run_info={
+            "data": data_info,
+            "error": str(e),
+        },
+    )
+
+    db_engine = get_relational_engine()
+
+    async with db_engine.get_async_session() as session:
+        session.add(pipeline_run)
+        await session.commit()
+
+    return pipeline_run
diff --git a/cognee/modules/pipelines/operations/log_pipeline_run_start.py b/cognee/modules/pipelines/operations/log_pipeline_run_start.py
new file mode 100644
index 000000000..00dd84b5e
--- /dev/null
+++ b/cognee/modules/pipelines/operations/log_pipeline_run_start.py
@@ -0,0 +1,34 @@
+from uuid import UUID, uuid4
+from cognee.infrastructure.databases.relational import get_relational_engine
+from cognee.modules.data.models import Data
+from cognee.modules.pipelines.models import PipelineRun, PipelineRunStatus
+from typing import Any
+
+
+async def log_pipeline_run_start(pipeline_id: str, dataset_id: UUID, data: Any):
+    if not data:
+        data_info = "None"
+    elif isinstance(data, list) and all(isinstance(item, Data) for item in data):
+        data_info = [str(item.id) for item in data]
+    else:
+        data_info = str(data)
+
+    pipeline_run_id = uuid4()
+
+    pipeline_run = PipelineRun(
+        pipeline_run_id=pipeline_run_id,
+        pipeline_id=pipeline_id,
+        status=PipelineRunStatus.DATASET_PROCESSING_STARTED,
+        dataset_id=dataset_id,
+        run_info={
+            "data": data_info,
+        },
+    )
+
+    db_engine = get_relational_engine()
+
+    async with db_engine.get_async_session() as session:
+        session.add(pipeline_run)
+        await session.commit()
+
+    return pipeline_run
diff --git a/cognee/modules/pipelines/operations/log_pipeline_status.py b/cognee/modules/pipelines/operations/log_pipeline_status.py
deleted file mode 100644
index c0f08cd2a..000000000
--- a/cognee/modules/pipelines/operations/log_pipeline_status.py
+++ /dev/null
@@ -1,18 +0,0 @@
-from uuid import UUID
-from cognee.infrastructure.databases.relational import get_relational_engine
-from ..models.PipelineRun import PipelineRun
-
-
-async def log_pipeline_status(run_id: UUID, status: str, run_info: dict):
-    db_engine = get_relational_engine()
-
-    async with db_engine.get_async_session() as session:
-        session.add(
-            PipelineRun(
-                run_id=run_id,
-                status=status,
-                run_info=run_info,
-            )
-        )
-
-        await session.commit()
diff --git a/cognee/modules/pipelines/operations/run_tasks.py b/cognee/modules/pipelines/operations/run_tasks.py
index d5d7ef7a4..c0f22d4f4 100644
--- a/cognee/modules/pipelines/operations/run_tasks.py
+++ b/cognee/modules/pipelines/operations/run_tasks.py
@@ -1,11 +1,19 @@
 import inspect
 import json
 import logging
+from uuid import UUID
+from typing import Any
+from cognee.modules.pipelines.operations import (
+    log_pipeline_run_start,
+    log_pipeline_run_complete,
+    log_pipeline_run_error,
+)
 from cognee.modules.settings import get_current_settings
 from cognee.modules.users.methods import get_default_user
 from cognee.modules.users.models import User
 from cognee.shared.utils import send_telemetry
+from uuid import uuid5, NAMESPACE_OID
 
 from ..tasks.Task import Task
 
@@ -261,6 +269,20 @@ async def run_tasks_with_telemetry(tasks: list[Task], data, pipeline_name: str):
         raise error
 
 
-async def run_tasks(tasks: list[Task], data=None, pipeline_name: str = "default_pipeline"):
-    async for result in run_tasks_with_telemetry(tasks, data, pipeline_name):
-        yield result
+async def run_tasks(tasks: list[Task], dataset_id: UUID, data: Any, pipeline_name: str):
+    pipeline_id = uuid5(NAMESPACE_OID, pipeline_name)
+
+    pipeline_run = await log_pipeline_run_start(pipeline_id, dataset_id, data)
+
+    yield pipeline_run
+    pipeline_run_id = pipeline_run.pipeline_run_id
+
+    try:
+        async for _ in run_tasks_with_telemetry(tasks, data, pipeline_id):
+            pass
+
+        yield await log_pipeline_run_complete(pipeline_run_id, pipeline_id, dataset_id, data)
+
+    except Exception as e:
+        yield await log_pipeline_run_error(pipeline_run_id, pipeline_id, dataset_id, data, e)
+        raise e
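For orientation, a minimal sketch of how a caller consumes the reworked `run_tasks` generator, mirroring the `cognify_v2.py` change above; `tasks`, `dataset_id`, `data`, and the pipeline name are placeholders, not part of the PR:

```python
from typing import Any
from uuid import UUID

from cognee.modules.pipelines.operations.run_tasks import run_tasks
from cognee.modules.pipelines.tasks.Task import Task


async def run_and_report(tasks: list[Task], dataset_id: UUID, data: Any):
    # run_tasks now yields PipelineRun rows instead of task results:
    # first the DATASET_PROCESSING_STARTED record, then COMPLETED
    # (on failure an ERRORED record is yielded and the exception re-raised).
    pipeline_run_status = None

    async for pipeline_run in run_tasks(tasks, dataset_id, data, "my_pipeline"):
        pipeline_run_status = pipeline_run.status

    return pipeline_run_status
```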
diff --git a/cognee/tests/integration/run_toy_tasks/run_task_from_queue_test.py b/cognee/tests/integration/run_toy_tasks/run_task_from_queue_test.py
index 6a4024342..ddfd1b694 100644
--- a/cognee/tests/integration/run_toy_tasks/run_task_from_queue_test.py
+++ b/cognee/tests/integration/run_toy_tasks/run_task_from_queue_test.py
@@ -1,8 +1,9 @@
 import asyncio
 from queue import Queue
 
-from cognee.modules.pipelines.operations.run_tasks import run_tasks
+from cognee.modules.pipelines.operations.run_tasks import run_tasks_base
 from cognee.modules.pipelines.tasks.Task import Task
+from cognee.modules.users.methods import get_default_user
 
 
 async def pipeline(data_queue):
@@ -19,13 +20,15 @@ async def pipeline(data_queue):
     async def multiply_by_two(num):
         yield num * 2
 
-    tasks_run = run_tasks(
+    user = await get_default_user()
+    tasks_run = run_tasks_base(
         [
             Task(queue_consumer),
             Task(add_one),
             Task(multiply_by_two),
         ],
-        pipeline_name="test_run_tasks_from_queue",
+        data=None,
+        user=user,
     )
 
     results = [2, 4, 6, 8, 10, 12, 14, 16, 18, 20]
@@ -50,3 +53,7 @@ async def run_queue():
 
 def test_run_tasks_from_queue():
     asyncio.run(run_queue())
+
+
+if __name__ == "__main__":
+    asyncio.run(run_queue())
diff --git a/cognee/tests/integration/run_toy_tasks/run_tasks_test.py b/cognee/tests/integration/run_toy_tasks/run_tasks_test.py
index 54613c214..f831175cc 100644
--- a/cognee/tests/integration/run_toy_tasks/run_tasks_test.py
+++ b/cognee/tests/integration/run_toy_tasks/run_tasks_test.py
@@ -1,7 +1,8 @@
 import asyncio
 
-from cognee.modules.pipelines.operations.run_tasks import run_tasks
+from cognee.modules.pipelines.operations.run_tasks import run_tasks_base
 from cognee.modules.pipelines.tasks.Task import Task
+from cognee.modules.users.methods import get_default_user
 
 
 async def run_and_check_tasks():
@@ -19,15 +20,16 @@ async def run_and_check_tasks():
     async def add_one_single(num):
         yield num + 1
 
-    pipeline = run_tasks(
+    user = await get_default_user()
+    pipeline = run_tasks_base(
         [
             Task(number_generator),
             Task(add_one, task_config={"batch_size": 5}),
             Task(multiply_by_two, task_config={"batch_size": 1}),
             Task(add_one_single),
         ],
-        10,
-        pipeline_name="test_run_tasks",
+        data=10,
+        user=user,
     )
 
     results = [5, 7, 9, 11, 13, 15, 17, 19, 21, 23]
diff --git a/examples/python/code_graph_example.py b/examples/python/code_graph_example.py
index 59229344f..25dc5cfd5 100644
--- a/examples/python/code_graph_example.py
+++ b/examples/python/code_graph_example.py
@@ -7,8 +7,7 @@ from cognee.shared.utils import setup_logging
 
 
 async def main(repo_path, include_docs):
-    async for result in run_code_graph_pipeline(repo_path, include_docs):
-        print(result)
+    return await run_code_graph_pipeline(repo_path, include_docs)
 
 
 def parse_args():
diff --git a/notebooks/cognee_code_graph_demo.ipynb b/notebooks/cognee_code_graph_demo.ipynb
index d5bde67ef..b8abb710f 100644
--- a/notebooks/cognee_code_graph_demo.ipynb
+++ b/notebooks/cognee_code_graph_demo.ipynb
@@ -93,11 +93,12 @@
    "outputs": [],
    "source": [
     "from cognee.modules.pipelines import run_tasks\n",
+    "from uuid import uuid5, NAMESPACE_OID\n",
     "\n",
     "notebook_path = os.path.abspath(\"\")\n",
     "repo_clone_location = os.path.join(notebook_path, \".data/graphrag\")\n",
     "\n",
-    "pipeline = run_tasks(tasks, repo_clone_location, \"code_graph_pipeline\")\n",
+    "pipeline = run_tasks(tasks, uuid5(NAMESPACE_OID, repo_clone_location), repo_clone_location, \"code_graph_pipeline\")\n",
     "\n",
    "async for result in pipeline:\n",
     "    print(result)"
@@ -117,7 +118,9 @@
   {
    "cell_type": "markdown",
    "metadata": {},
-   "source": "# Let's check the evaluations"
+   "source": [
+    "# Let's check the evaluations"
+   ]
   },
   {
    "cell_type": "code",
diff --git a/notebooks/cognee_demo.ipynb b/notebooks/cognee_demo.ipynb
index 607caed97..a0c8961f8 100644
--- a/notebooks/cognee_demo.ipynb
+++ b/notebooks/cognee_demo.ipynb
@@ -674,10 +674,12 @@
     "            Task(add_data_points, task_config = { \"batch_size\": 10 }),\n",
     "        ]\n",
     "\n",
-    "        pipeline = run_tasks(tasks, data_documents)\n",
+    "        pipeline_run = run_tasks(tasks, dataset.id, data_documents, \"cognify_pipeline\")\n",
+    "        pipeline_run_status = None\n",
+    "\n",
+    "        async for run_status in pipeline_run:\n",
+    "            pipeline_run_status = run_status\n",
     "\n",
-    "        async for result in pipeline:\n",
-    "            print(result)\n",
     "    except Exception as error:\n",
     "        raise error\n"
    ]
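Taken together, a hypothetical end-to-end check under the new scheme could look like the sketch below. It assumes the public `cognee.add` entry point wraps the `add_v2.add` shown in this diff, and otherwise uses only calls the diff introduces:

```python
import asyncio
from uuid import uuid5, NAMESPACE_OID

import cognee
from cognee.modules.pipelines.models import PipelineRunStatus
from cognee.modules.pipelines.operations.get_pipeline_status import get_pipeline_status


async def main():
    dataset_name = "main_dataset"
    await cognee.add("Some text to ingest.", dataset_name)

    # The dataset id is derived deterministically from the dataset name,
    # and get_pipeline_status keys its result dict by str(dataset_id).
    dataset_id = uuid5(NAMESPACE_OID, dataset_name)
    statuses = await get_pipeline_status([dataset_id])

    assert statuses.get(str(dataset_id)) == PipelineRunStatus.DATASET_PROCESSING_COMPLETED


if __name__ == "__main__":
    asyncio.run(main())
```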