From 8c02676b4f3f98795f403f231fa4bc0b6bd3c39a Mon Sep 17 00:00:00 2001 From: Boris Arzentar Date: Wed, 4 Jun 2025 16:39:34 +0200 Subject: [PATCH] fix: parallelize crewai --- .../v1/crewai/routers/get_crewai_router.py | 20 ++++++++++++------- .../custom_tools/cognee_ingestion.py | 11 ++++++---- .../crewai_demo/custom_tools/cognee_search.py | 9 +++++++-- 3 files changed, 27 insertions(+), 13 deletions(-) diff --git a/cognee/api/v1/crewai/routers/get_crewai_router.py b/cognee/api/v1/crewai/routers/get_crewai_router.py index 92adbcb87..d2eaf91f6 100644 --- a/cognee/api/v1/crewai/routers/get_crewai_router.py +++ b/cognee/api/v1/crewai/routers/get_crewai_router.py @@ -79,7 +79,14 @@ def get_crewai_router() -> APIRouter: "applicant_2": payload.username2, } - run_hiring_crew(user, applicants=applicants, number_of_rounds=2) + def run_crewai_in_thread(): + run_hiring_crew(user, applicants=applicants, number_of_rounds=2) + + async def run_crewai_async(): + loop = asyncio.get_running_loop() + await loop.run_in_executor(None, run_crewai_in_thread) + + await run_crewai_async() return True @@ -91,13 +98,12 @@ def get_crewai_router() -> APIRouter: ), ): from cognee import add, cognify - # from secrets import choice - # from string import ascii_letters, digits - - # hash6 = "".join(choice(ascii_letters + digits) for _ in range(6)) + # Set context based database settings if necessary dataset_name = "Github" - await add(payload.feedback, node_set=["final_report"], dataset_name=dataset_name, user=user) - await cognify(datasets=dataset_name, is_stream_info_enabled=True, user=user) + await set_database_global_context_variables(dataset_name, user.id) + + await add(payload.feedback, node_set=["final_report"], dataset_name=dataset_name) + await cognify(datasets=dataset_name, is_stream_info_enabled=True) @router.websocket("/subscribe") async def subscribe_to_crewai_info(websocket: WebSocket): diff --git a/cognee/complex_demos/crewai_demo/src/crewai_demo/custom_tools/cognee_ingestion.py b/cognee/complex_demos/crewai_demo/src/crewai_demo/custom_tools/cognee_ingestion.py index bcd07e72b..721d1d83e 100644 --- a/cognee/complex_demos/crewai_demo/src/crewai_demo/custom_tools/cognee_ingestion.py +++ b/cognee/complex_demos/crewai_demo/src/crewai_demo/custom_tools/cognee_ingestion.py @@ -54,14 +54,17 @@ class CogneeIngestion(BaseTool): return f"Error during ingestion: {str(e)}" try: - loop = asyncio.get_event_loop() + try: + loop = asyncio.get_event_loop() + + if not loop.is_running(): + loop = asyncio.new_event_loop() + except RuntimeError: + loop = asyncio.new_event_loop() if not loop.is_running(): - loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) - nest_asyncio.apply(loop) - result = loop.run_until_complete(main()) return result diff --git a/cognee/complex_demos/crewai_demo/src/crewai_demo/custom_tools/cognee_search.py b/cognee/complex_demos/crewai_demo/src/crewai_demo/custom_tools/cognee_search.py index d801c1b18..309ebb9e0 100644 --- a/cognee/complex_demos/crewai_demo/src/crewai_demo/custom_tools/cognee_search.py +++ b/cognee/complex_demos/crewai_demo/src/crewai_demo/custom_tools/cognee_search.py @@ -61,10 +61,15 @@ class CogneeSearch(BaseTool): return f"Error: {str(e)}" try: - loop = asyncio.get_event_loop() + try: + loop = asyncio.get_event_loop() + + if not loop.is_running(): + loop = asyncio.new_event_loop() + except RuntimeError: + loop = asyncio.new_event_loop() if not loop.is_running(): - loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) nest_asyncio.apply(loop)