diff --git a/cognee/api/v1/cognify/code_graph_pipeline.py b/cognee/api/v1/cognify/code_graph_pipeline.py index b6215b81b..f64cf292c 100644 --- a/cognee/api/v1/cognify/code_graph_pipeline.py +++ b/cognee/api/v1/cognify/code_graph_pipeline.py @@ -9,7 +9,7 @@ from cognee.api.v1.visualize.visualize import visualize_graph from cognee.base_config import get_base_config from cognee.modules.cognify.config import get_cognify_config from cognee.modules.pipelines import run_tasks -from cognee.modules.pipelines.tasks.Task import Task +from cognee.modules.pipelines.tasks.task import Task from cognee.modules.users.methods import get_default_user from cognee.shared.data_models import KnowledgeGraph, MonitoringTool from cognee.shared.utils import render_graph diff --git a/cognee/api/v1/cognify/cognify.py b/cognee/api/v1/cognify/cognify.py index a5912fecd..8b38e5304 100644 --- a/cognee/api/v1/cognify/cognify.py +++ b/cognee/api/v1/cognify/cognify.py @@ -13,7 +13,7 @@ from cognee.modules.data.models import Data, Dataset from cognee.modules.pipelines import run_tasks from cognee.modules.pipelines.models import PipelineRunStatus from cognee.modules.pipelines.operations.get_pipeline_status import get_pipeline_status -from cognee.modules.pipelines.tasks.Task import Task +from cognee.modules.pipelines.tasks.task import Task from cognee.modules.users.methods import get_default_user from cognee.modules.users.models import User from cognee.shared.data_models import KnowledgeGraph diff --git a/cognee/eval_framework/corpus_builder/corpus_builder_executor.py b/cognee/eval_framework/corpus_builder/corpus_builder_executor.py index a2285e386..a5a7d4164 100644 --- a/cognee/eval_framework/corpus_builder/corpus_builder_executor.py +++ b/cognee/eval_framework/corpus_builder/corpus_builder_executor.py @@ -4,7 +4,7 @@ from typing import Optional, Tuple, List, Dict, Union, Any, Callable, Awaitable from cognee.eval_framework.benchmark_adapters.benchmark_adapters import BenchmarkAdapter from cognee.modules.chunking.TextChunker import TextChunker -from cognee.modules.pipelines.tasks.Task import Task +from cognee.modules.pipelines.tasks.task import Task logger = get_logger(level=ERROR) diff --git a/cognee/eval_framework/corpus_builder/task_getters/TaskGetters.py b/cognee/eval_framework/corpus_builder/task_getters/TaskGetters.py index a25a3c1c1..bb5c1acf4 100644 --- a/cognee/eval_framework/corpus_builder/task_getters/TaskGetters.py +++ b/cognee/eval_framework/corpus_builder/task_getters/TaskGetters.py @@ -1,7 +1,7 @@ from enum import Enum from typing import Callable, Awaitable, List from cognee.api.v1.cognify.cognify import get_default_tasks -from cognee.modules.pipelines.tasks.Task import Task +from cognee.modules.pipelines.tasks.task import Task from cognee.eval_framework.corpus_builder.task_getters.get_cascade_graph_tasks import ( get_cascade_graph_tasks, ) diff --git a/cognee/eval_framework/corpus_builder/task_getters/get_cascade_graph_tasks.py b/cognee/eval_framework/corpus_builder/task_getters/get_cascade_graph_tasks.py index ca77facef..c35f51afa 100644 --- a/cognee/eval_framework/corpus_builder/task_getters/get_cascade_graph_tasks.py +++ b/cognee/eval_framework/corpus_builder/task_getters/get_cascade_graph_tasks.py @@ -2,7 +2,7 @@ from typing import List from pydantic import BaseModel from cognee.modules.cognify.config import get_cognify_config -from cognee.modules.pipelines.tasks.Task import Task +from cognee.modules.pipelines.tasks.task import Task from cognee.modules.users.methods import get_default_user from cognee.modules.users.models import User from cognee.shared.data_models import KnowledgeGraph diff --git a/cognee/eval_framework/corpus_builder/task_getters/get_default_tasks_by_indices.py b/cognee/eval_framework/corpus_builder/task_getters/get_default_tasks_by_indices.py index e589f3f67..be532232f 100644 --- a/cognee/eval_framework/corpus_builder/task_getters/get_default_tasks_by_indices.py +++ b/cognee/eval_framework/corpus_builder/task_getters/get_default_tasks_by_indices.py @@ -1,6 +1,6 @@ from typing import List from cognee.api.v1.cognify.cognify import get_default_tasks -from cognee.modules.pipelines.tasks.Task import Task +from cognee.modules.pipelines.tasks.task import Task from cognee.modules.chunking.TextChunker import TextChunker from cognee.tasks.graph import extract_graph_from_data from cognee.tasks.storage import add_data_points @@ -39,9 +39,10 @@ async def get_no_summary_tasks( extract_graph_from_data, graph_model=graph_model, ontology_adapter=ontology_adapter, + task_config={"batch_size": 10}, ) - add_data_points_task = Task(add_data_points) + add_data_points_task = Task(add_data_points, task_config={"batch_size": 10}) return base_tasks + [graph_task, add_data_points_task] @@ -53,6 +54,6 @@ async def get_just_chunks_tasks( # Get base tasks (0=classify, 1=check_permissions, 2=extract_chunks) base_tasks = await get_default_tasks_by_indices([0, 1, 2], chunk_size, chunker) - add_data_points_task = Task(add_data_points) + add_data_points_task = Task(add_data_points, task_config={"batch_size": 10}) return base_tasks + [add_data_points_task] diff --git a/cognee/modules/pipelines/__init__.py b/cognee/modules/pipelines/__init__.py index 5005c25f0..52a0942a6 100644 --- a/cognee/modules/pipelines/__init__.py +++ b/cognee/modules/pipelines/__init__.py @@ -1,3 +1,3 @@ -from .tasks.Task import Task +from .tasks.task import Task from .operations.run_tasks import run_tasks from .operations.run_parallel import run_tasks_parallel diff --git a/cognee/modules/pipelines/operations/run_parallel.py b/cognee/modules/pipelines/operations/run_parallel.py index d1774299b..63ac2f763 100644 --- a/cognee/modules/pipelines/operations/run_parallel.py +++ b/cognee/modules/pipelines/operations/run_parallel.py @@ -1,6 +1,6 @@ from typing import Any, Callable, Generator, List import asyncio -from ..tasks.Task import Task +from ..tasks.task import Task def run_tasks_parallel(tasks: List[Task]) -> Callable[[Any], Generator[Any, Any, Any]]: diff --git a/cognee/modules/pipelines/operations/run_tasks.py b/cognee/modules/pipelines/operations/run_tasks.py index 242f74320..e87259753 100644 --- a/cognee/modules/pipelines/operations/run_tasks.py +++ b/cognee/modules/pipelines/operations/run_tasks.py @@ -1,4 +1,3 @@ -import inspect import json from cognee.shared.logging_utils import get_logger from uuid import UUID, uuid4 @@ -15,212 +14,12 @@ from cognee.modules.users.models import User from cognee.shared.utils import send_telemetry from uuid import uuid5, NAMESPACE_OID -from ..tasks.Task import Task +from .run_tasks_base import run_tasks_base +from ..tasks.task import Task logger = get_logger("run_tasks(tasks: [Task], data)") -async def run_tasks_base(tasks: list[Task], data=None, user: User = None): - if len(tasks) == 0: - yield data - return - - args = [data] if data is not None else [] - - running_task = tasks[0] - leftover_tasks = tasks[1:] - next_task = leftover_tasks[0] if len(leftover_tasks) > 0 else None - next_task_batch_size = next_task.task_config["batch_size"] if next_task else 1 - - if inspect.isasyncgenfunction(running_task.executable): - logger.info("Async generator task started: `%s`", running_task.executable.__name__) - send_telemetry( - "Async Generator Task Started", - user.id, - { - "task_name": running_task.executable.__name__, - }, - ) - try: - results = [] - - async_iterator = running_task.run(*args) - - async for partial_result in async_iterator: - results.append(partial_result) - - if len(results) == next_task_batch_size: - async for result in run_tasks_base( - leftover_tasks, - results[0] if next_task_batch_size == 1 else results, - user=user, - ): - yield result - - results = [] - - if len(results) > 0: - async for result in run_tasks_base(leftover_tasks, results, user): - yield result - - results = [] - - logger.info("Async generator task completed: `%s`", running_task.executable.__name__) - send_telemetry( - "Async Generator Task Completed", - user.id, - { - "task_name": running_task.executable.__name__, - }, - ) - except Exception as error: - logger.error( - "Async generator task errored: `%s`\n%s\n", - running_task.executable.__name__, - str(error), - exc_info=True, - ) - send_telemetry( - "Async Generator Task Errored", - user.id, - { - "task_name": running_task.executable.__name__, - }, - ) - raise error - - elif inspect.isgeneratorfunction(running_task.executable): - logger.info("Generator task started: `%s`", running_task.executable.__name__) - send_telemetry( - "Generator Task Started", - user.id, - { - "task_name": running_task.executable.__name__, - }, - ) - try: - results = [] - - for partial_result in running_task.run(*args): - results.append(partial_result) - - if len(results) == next_task_batch_size: - async for result in run_tasks_base( - leftover_tasks, results[0] if next_task_batch_size == 1 else results, user - ): - yield result - - results = [] - - if len(results) > 0: - async for result in run_tasks_base(leftover_tasks, results, user): - yield result - - results = [] - - logger.info("Generator task completed: `%s`", running_task.executable.__name__) - send_telemetry( - "Generator Task Completed", - user_id=user.id, - additional_properties={ - "task_name": running_task.executable.__name__, - }, - ) - except Exception as error: - logger.error( - "Generator task errored: `%s`\n%s\n", - running_task.executable.__name__, - str(error), - exc_info=True, - ) - send_telemetry( - "Generator Task Errored", - user_id=user.id, - additional_properties={ - "task_name": running_task.executable.__name__, - }, - ) - raise error - - elif inspect.iscoroutinefunction(running_task.executable): - logger.info("Coroutine task started: `%s`", running_task.executable.__name__) - send_telemetry( - "Coroutine Task Started", - user_id=user.id, - additional_properties={ - "task_name": running_task.executable.__name__, - }, - ) - try: - task_result = await running_task.run(*args) - - async for result in run_tasks_base(leftover_tasks, task_result, user): - yield result - - logger.info("Coroutine task completed: `%s`", running_task.executable.__name__) - send_telemetry( - "Coroutine Task Completed", - user.id, - { - "task_name": running_task.executable.__name__, - }, - ) - except Exception as error: - logger.error( - "Coroutine task errored: `%s`\n%s\n", - running_task.executable.__name__, - str(error), - exc_info=True, - ) - send_telemetry( - "Coroutine Task Errored", - user.id, - { - "task_name": running_task.executable.__name__, - }, - ) - raise error - - elif inspect.isfunction(running_task.executable): - logger.info("Function task started: `%s`", running_task.executable.__name__) - send_telemetry( - "Function Task Started", - user.id, - { - "task_name": running_task.executable.__name__, - }, - ) - try: - task_result = running_task.run(*args) - - async for result in run_tasks_base(leftover_tasks, task_result, user): - yield result - - logger.info("Function task completed: `%s`", running_task.executable.__name__) - send_telemetry( - "Function Task Completed", - user.id, - { - "task_name": running_task.executable.__name__, - }, - ) - except Exception as error: - logger.error( - "Function task errored: `%s`\n%s\n", - running_task.executable.__name__, - str(error), - exc_info=True, - ) - send_telemetry( - "Function Task Errored", - user.id, - { - "task_name": running_task.executable.__name__, - }, - ) - raise error - - async def run_tasks_with_telemetry(tasks: list[Task], data, pipeline_name: str): config = get_current_settings() diff --git a/cognee/modules/pipelines/operations/run_tasks_base.py b/cognee/modules/pipelines/operations/run_tasks_base.py new file mode 100644 index 000000000..9c5cdfdb8 --- /dev/null +++ b/cognee/modules/pipelines/operations/run_tasks_base.py @@ -0,0 +1,72 @@ +import inspect +from cognee.shared.logging_utils import get_logger +from cognee.modules.users.models import User +from cognee.shared.utils import send_telemetry + +from ..tasks.task import Task + +logger = get_logger("run_tasks_base") + + +async def handle_task( + running_task: Task, + args: list, + leftover_tasks: list[Task], + next_task_batch_size: int, + user: User, +): + """Handle common task workflow with logging, telemetry, and error handling around the core execution logic.""" + task_type = running_task.task_type + + logger.info(f"{task_type} task started: `{running_task.executable.__name__}`") + send_telemetry( + f"{task_type} Task Started", + user_id=user.id, + additional_properties={ + "task_name": running_task.executable.__name__, + }, + ) + + try: + async for result_data in running_task.execute(args, next_task_batch_size): + async for result in run_tasks_base(leftover_tasks, result_data, user): + yield result + + logger.info(f"{task_type} task completed: `{running_task.executable.__name__}`") + send_telemetry( + f"{task_type} Task Completed", + user_id=user.id, + additional_properties={ + "task_name": running_task.executable.__name__, + }, + ) + except Exception as error: + logger.error( + f"{task_type} task errored: `{running_task.executable.__name__}`\n{str(error)}\n", + exc_info=True, + ) + send_telemetry( + f"{task_type} Task Errored", + user_id=user.id, + additional_properties={ + "task_name": running_task.executable.__name__, + }, + ) + raise error + + +async def run_tasks_base(tasks: list[Task], data=None, user: User = None): + """Base function to execute tasks in a pipeline, handling task type detection and execution.""" + if len(tasks) == 0: + yield data + return + + args = [data] if data is not None else [] + + running_task = tasks[0] + leftover_tasks = tasks[1:] + next_task = leftover_tasks[0] if len(leftover_tasks) > 0 else None + next_task_batch_size = next_task.task_config["batch_size"] if next_task else 1 + + async for result in handle_task(running_task, args, leftover_tasks, next_task_batch_size, user): + yield result diff --git a/cognee/modules/pipelines/tasks/Task.py b/cognee/modules/pipelines/tasks/Task.py deleted file mode 100644 index 753152d0d..000000000 --- a/cognee/modules/pipelines/tasks/Task.py +++ /dev/null @@ -1,30 +0,0 @@ -from typing import Union, Callable, Any, Coroutine, Generator, AsyncGenerator - - -class Task: - executable: Union[ - Callable[..., Any], - Callable[..., Coroutine[Any, Any, Any]], - Generator[Any, Any, Any], - AsyncGenerator[Any, Any], - ] - task_config: dict[str, Any] = { - "batch_size": 1, - } - default_params: dict[str, Any] = {} - - def __init__(self, executable, *args, task_config=None, **kwargs): - self.executable = executable - self.default_params = {"args": args, "kwargs": kwargs} - - if task_config is not None: - self.task_config = task_config - - if "batch_size" not in task_config: - self.task_config["batch_size"] = 1 - - def run(self, *args, **kwargs): - combined_args = args + self.default_params["args"] - combined_kwargs = {**self.default_params["kwargs"], **kwargs} - - return self.executable(*combined_args, **combined_kwargs) diff --git a/cognee/modules/pipelines/tasks/task.py b/cognee/modules/pipelines/tasks/task.py new file mode 100644 index 000000000..7ace6b358 --- /dev/null +++ b/cognee/modules/pipelines/tasks/task.py @@ -0,0 +1,97 @@ +from typing import Union, Callable, Any, Coroutine, Generator, AsyncGenerator +import inspect + + +class Task: + executable: Union[ + Callable[..., Any], + Callable[..., Coroutine[Any, Any, Any]], + Generator[Any, Any, Any], + AsyncGenerator[Any, Any], + ] + task_config: dict[str, Any] = { + "batch_size": 1, + } + default_params: dict[str, Any] = {} + task_type: str = None + _execute_method: Callable = None + _next_batch_size: int = 1 + + def __init__(self, executable, *args, task_config=None, **kwargs): + self.executable = executable + self.default_params = {"args": args, "kwargs": kwargs} + + if inspect.isasyncgenfunction(executable): + self.task_type = "Async Generator" + self._execute_method = self.execute_async_generator + elif inspect.isgeneratorfunction(executable): + self.task_type = "Generator" + self._execute_method = self.execute_generator + elif inspect.iscoroutinefunction(executable): + self.task_type = "Coroutine" + self._execute_method = self.execute_coroutine + elif inspect.isfunction(executable): + self.task_type = "Function" + self._execute_method = self.execute_function + else: + raise ValueError(f"Unsupported task type: {executable}") + + if task_config is not None: + self.task_config = task_config + + if "batch_size" not in task_config: + self.task_config["batch_size"] = 1 + + def run(self, *args, **kwargs): + """Execute the underlying task with given arguments.""" + combined_args = args + self.default_params["args"] + combined_kwargs = {**self.default_params["kwargs"], **kwargs} + + return self.executable(*combined_args, **combined_kwargs) + + async def execute_async_generator(self, args): + """Execute async generator task and collect results in batches.""" + results = [] + async_iterator = self.run(*args) + + async for partial_result in async_iterator: + results.append(partial_result) + + if len(results) == self._next_batch_size: + yield results + results = [] + + if results: + yield results + + async def execute_generator(self, args): + """Execute generator task and collect results in batches.""" + results = [] + + for partial_result in self.run(*args): + results.append(partial_result) + + if len(results) == self._next_batch_size: + yield results + results = [] + + if results: + yield results + + async def execute_coroutine(self, args): + """Execute coroutine task and yield the result.""" + task_result = await self.run(*args) + yield task_result + + async def execute_function(self, args): + """Execute function task and yield the result.""" + task_result = self.run(*args) + yield task_result + + async def execute(self, args, next_batch_size=None): + """Execute the task based on its type and yield results with the next task's batch size.""" + if next_batch_size is not None: + self._next_batch_size = next_batch_size + + async for result in self._execute_method(args): + yield result diff --git a/cognee/tests/integration/run_toy_tasks/run_task_from_queue_test.py b/cognee/tests/integration/run_toy_tasks/run_task_from_queue_test.py index 0aa60f3b2..4aabb4241 100644 --- a/cognee/tests/integration/run_toy_tasks/run_task_from_queue_test.py +++ b/cognee/tests/integration/run_toy_tasks/run_task_from_queue_test.py @@ -3,7 +3,7 @@ from queue import Queue import cognee from cognee.modules.pipelines.operations.run_tasks import run_tasks_base -from cognee.modules.pipelines.tasks.Task import Task +from cognee.modules.pipelines.tasks.task import Task from cognee.modules.users.methods import get_default_user from cognee.infrastructure.databases.relational import create_db_and_tables @@ -19,11 +19,11 @@ async def pipeline(data_queue): else: await asyncio.sleep(0.3) - async def add_one(num): - yield num + 1 + async def add_one(num_list): + yield num_list[0] + 1 - async def multiply_by_two(num): - yield num * 2 + async def multiply_by_two(num_list): + yield num_list[0] * 2 await create_db_and_tables() user = await get_default_user() @@ -41,7 +41,7 @@ async def pipeline(data_queue): results = [2, 4, 6, 8, 10, 12, 14, 16, 18, 20] index = 0 async for result in tasks_run: - assert result == results[index], f"at {index = }: {result = } != {results[index] = }" + assert result[0] == results[index], f"at {index = }: {result = } != {results[index] = }" index += 1 diff --git a/cognee/tests/integration/run_toy_tasks/run_tasks_test.py b/cognee/tests/integration/run_toy_tasks/run_tasks_test.py index b1400beb5..7fbc905ee 100644 --- a/cognee/tests/integration/run_toy_tasks/run_tasks_test.py +++ b/cognee/tests/integration/run_toy_tasks/run_tasks_test.py @@ -2,7 +2,7 @@ import asyncio import cognee from cognee.modules.pipelines.operations.run_tasks import run_tasks_base -from cognee.modules.pipelines.tasks.Task import Task +from cognee.modules.pipelines.tasks.task import Task from cognee.modules.users.methods import get_default_user from cognee.infrastructure.databases.relational import create_db_and_tables @@ -19,11 +19,11 @@ async def run_and_check_tasks(): for num in nums: yield num + 1 - async def multiply_by_two(num): - yield num * 2 + async def multiply_by_two(nums): + yield nums[0] * 2 - async def add_one_single(num): - yield num + 1 + async def add_one_single(nums): + yield nums[0] + 1 await create_db_and_tables() user = await get_default_user() @@ -42,7 +42,7 @@ async def run_and_check_tasks(): results = [5, 7, 9, 11, 13, 15, 17, 19, 21, 23] index = 0 async for result in pipeline: - assert result == results[index], f"at {index = }: {result = } != {results[index] = }" + assert result[0] == results[index], f"at {index = }: {result = } != {results[index] = }" index += 1 diff --git a/examples/python/pokemon_datapoints_example.py b/examples/python/pokemon_datapoints_example.py index 1c51ae468..e67967529 100644 --- a/examples/python/pokemon_datapoints_example.py +++ b/examples/python/pokemon_datapoints_example.py @@ -13,7 +13,7 @@ import cognee from cognee.low_level import DataPoint, setup as cognee_setup from cognee.api.v1.search import SearchType from cognee.tasks.storage import add_data_points -from cognee.modules.pipelines.tasks.Task import Task +from cognee.modules.pipelines.tasks.task import Task from cognee.modules.pipelines import run_tasks