cognee/cognee/modules/pipelines/operations/run_parallel.py
lxobr d1eab97102
feature: tighten run_tasks_base (#730)

## Description
- Extracted run_tasks_base function into a new file run_tasks_base.py.
- Extracted four executors that execute core logic based on the task
type.
- Extracted a task handler/wrapper that safely executes the core logic
with logging and telemetry.
- Fixed the inconsistency with the batches of size 1.
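
The split between per-type executors and a wrapping handler can be illustrated roughly as follows. This is a minimal sketch, not the code from run_tasks_base.py: the names `execute` and `handle`, the use of `inspect` to tell the four callable kinds apart, and the logger name are all assumptions made for illustration, and telemetry is left out.

```python
import asyncio
import inspect
import logging
from typing import Any, AsyncIterator, Callable, List

logger = logging.getLogger("pipeline_sketch")  # hypothetical logger name


async def execute(fn: Callable[..., Any], *args: Any) -> AsyncIterator[Any]:
    """Hypothetical executor: dispatch on the callable's kind, yield results uniformly."""
    if inspect.isasyncgenfunction(fn):
        async for item in fn(*args):
            yield item
    elif inspect.isgeneratorfunction(fn):
        for item in fn(*args):
            yield item
    elif inspect.iscoroutinefunction(fn):
        yield await fn(*args)
    else:
        yield fn(*args)


async def handle(fn: Callable[..., Any], *args: Any) -> List[Any]:
    """Hypothetical handler: wrap the executor with start/finish/failure logging."""
    logger.info("task %s started", fn.__name__)
    try:
        results = [item async for item in execute(fn, *args)]
    except Exception:
        logger.exception("task %s failed", fn.__name__)
        raise
    logger.info("task %s finished", fn.__name__)
    return results


def plain(x):
    return x + 1


def gen(x):
    yield x
    yield x + 1


async def coro(x):
    return x * 2


async def agen(x):
    for i in range(x):
        yield i


collected = {
    "function": asyncio.run(handle(plain, 2)),
    "generator": asyncio.run(handle(gen, 2)),
    "coroutine": asyncio.run(handle(coro, 2)),
    "async_generator": asyncio.run(handle(agen, 2)),
}
print(collected)
```

Funnelling every task kind through one async iterator keeps the logging wrapper identical for all four cases, which is one plausible reason to separate the executors from the handler.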

## DCO Affirmation
I affirm that all code in every commit of this pull request conforms to
the terms of the Topoteretes Developer Certificate of Origin.
2025-04-16 09:19:03 +02:00

13 lines
479 B
Python

from typing import Any, Callable, Generator, List
import asyncio
from ..tasks.task import Task


def run_tasks_parallel(tasks: List[Task]) -> Callable[[Any], Generator[Any, Any, Any]]:
    async def parallel_run(*args, **kwargs):
        parallel_tasks = [asyncio.create_task(task.run(*args, **kwargs)) for task in tasks]

        results = await asyncio.gather(*parallel_tasks)
        return results[len(results) - 1] if len(results) > 1 else []

    return Task(parallel_run)
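
To see what the return expression does in practice, the following self-contained sketch reproduces the function against a stand-in `Task` class. The stand-in is an assumption: it only wraps a callable and exposes `run()`, which is all this function relies on, and it is not cognee's real `Task`. The `double` and `square` coroutines are likewise hypothetical.

```python
import asyncio
from typing import Any, Callable, List


class Task:
    """Hypothetical stand-in for cognee's Task: wraps a callable and exposes run()."""

    def __init__(self, executable: Callable[..., Any]):
        self.executable = executable

    def run(self, *args, **kwargs):
        # For an async function this returns a coroutine object.
        return self.executable(*args, **kwargs)


def run_tasks_parallel(tasks: List[Task]) -> Task:
    async def parallel_run(*args, **kwargs):
        # Schedule every task with the same arguments, then wait for all of them.
        parallel_tasks = [asyncio.create_task(task.run(*args, **kwargs)) for task in tasks]
        results = await asyncio.gather(*parallel_tasks)
        # gather() preserves input order, so this is the last task's result.
        return results[len(results) - 1] if len(results) > 1 else []

    return Task(parallel_run)


async def double(x: int) -> int:
    await asyncio.sleep(0.01)
    return x * 2


async def square(x: int) -> int:
    await asyncio.sleep(0.01)
    return x * x


two_tasks = asyncio.run(run_tasks_parallel([Task(double), Task(square)]).run(3))
one_task = asyncio.run(run_tasks_parallel([Task(double)]).run(3))
print(two_tasks)  # 9
print(one_task)   # []
```

With two tasks only the last result (`square(3)`) is returned and the earlier ones are discarded. With a single task the `len(results) > 1` condition is false, so the function returns an empty list and not that task's result.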