diff --git a/lightrag/operate.py b/lightrag/operate.py index 3b81ae89..9eb060bb 100644 --- a/lightrag/operate.py +++ b/lightrag/operate.py @@ -374,8 +374,8 @@ async def _rebuild_knowledge_from_chunks( continue # Get max async tasks limit from global_config for semaphore control - llm_model_max_async = global_config.get("llm_model_max_async", 4) + 1 - semaphore = asyncio.Semaphore(llm_model_max_async) + graph_max_async = global_config.get("llm_model_max_async", 4) * 2 + semaphore = asyncio.Semaphore(graph_max_async) # Counters for tracking progress rebuilt_entities_count = 0 @@ -468,7 +468,7 @@ async def _rebuild_knowledge_from_chunks( tasks.append(task) # Log parallel processing start - status_message = f"Starting parallel rebuild of {len(entities_to_rebuild)} entities and {len(relationships_to_rebuild)} relationships (max concurrent: {llm_model_max_async})" + status_message = f"Starting parallel rebuild of {len(entities_to_rebuild)} entities and {len(relationships_to_rebuild)} relationships (async: {graph_max_async})" logger.info(status_message) if pipeline_status is not None and pipeline_status_lock is not None: async with pipeline_status_lock: @@ -1200,17 +1200,17 @@ async def merge_nodes_and_edges( pipeline_status["latest_message"] = log_message pipeline_status["history_messages"].append(log_message) + # Get max async tasks limit from global_config for semaphore control + graph_max_async = global_config.get("llm_model_max_async", 4) * 2 + semaphore = asyncio.Semaphore(graph_max_async) + # Process and update all entities and relationships in parallel - log_message = f"Processing: {total_entities_count} entities and {total_relations_count} relations" + log_message = f"Processing: {total_entities_count} entities and {total_relations_count} relations (async: {graph_max_async})" logger.info(log_message) async with pipeline_status_lock: pipeline_status["latest_message"] = log_message pipeline_status["history_messages"].append(log_message) - # Get max async tasks limit from global_config for semaphore control - llm_model_max_async = global_config.get("llm_model_max_async", 4) + 1 - semaphore = asyncio.Semaphore(llm_model_max_async) - async def _locked_process_entity_name(entity_name, entities): async with semaphore: workspace = global_config.get("workspace", "") @@ -1502,8 +1502,8 @@ async def extract_entities( return maybe_nodes, maybe_edges # Get max async tasks limit from global_config - llm_model_max_async = global_config.get("llm_model_max_async", 4) - semaphore = asyncio.Semaphore(llm_model_max_async) + chunk_max_async = global_config.get("llm_model_max_async", 4) + semaphore = asyncio.Semaphore(chunk_max_async) async def _process_with_semaphore(chunk): async with semaphore: