From 5d2121e1a3acf36fdcd4aae036555441b084a998 Mon Sep 17 00:00:00 2001 From: Daniel Chalef <131175+danielchalef@users.noreply.github.com> Date: Sun, 22 Sep 2024 13:38:54 -0700 Subject: [PATCH] limit community building concurrency (#142) --- .../utils/maintenance/community_operations.py | 15 +++++++++++---- 1 file changed, 11 insertions(+), 4 deletions(-) diff --git a/graphiti_core/utils/maintenance/community_operations.py b/graphiti_core/utils/maintenance/community_operations.py index 53fe6122..014469c9 100644 --- a/graphiti_core/utils/maintenance/community_operations.py +++ b/graphiti_core/utils/maintenance/community_operations.py @@ -11,6 +11,9 @@ from graphiti_core.nodes import CommunityNode, EntityNode, get_community_node_fr from graphiti_core.prompts import prompt_library from graphiti_core.utils.maintenance.edge_operations import build_community_edges +MAX_COMMUNITY_BUILD_CONCURRENCY = 10 + + logger = logging.getLogger(__name__) @@ -132,10 +135,14 @@ async def build_communities( projection = await build_community_projection(driver) community_clusters = await get_community_clusters(driver, projection) + semaphore = asyncio.Semaphore(MAX_COMMUNITY_BUILD_CONCURRENCY) + + async def limited_build_community(cluster): + async with semaphore: + return await build_community(llm_client, cluster) + communities: list[tuple[CommunityNode, list[CommunityEdge]]] = list( - await asyncio.gather( - *[build_community(llm_client, cluster) for cluster in community_clusters] - ) + await asyncio.gather(*[limited_build_community(cluster) for cluster in community_clusters]) ) community_nodes: list[CommunityNode] = [] @@ -236,4 +243,4 @@ async def update_community( await community.generate_name_embedding(embedder) - await community.save(driver) + await community.save(driver) \ No newline at end of file