From ef4870fda51b8173ae0ef6ae6c41b65bc35f410b Mon Sep 17 00:00:00 2001
From: yangdx
Date: Fri, 11 Jul 2025 16:34:54 +0800
Subject: [PATCH] Combine entity and edge processing tasks and optimize merging with semaphore

---
 lightrag/lightrag.py |   1 -
 lightrag/operate.py  | 138 ++++++++++++++++++++++---------------------
 2 files changed, 72 insertions(+), 67 deletions(-)

diff --git a/lightrag/lightrag.py b/lightrag/lightrag.py
index e41e1d7c..6f04a43f 100644
--- a/lightrag/lightrag.py
+++ b/lightrag/lightrag.py
@@ -1078,7 +1078,6 @@ class LightRAG:
 
             # Semphore is released here
             # Concurrency is controlled by graph db lock for individual entities and relationships
-
             if file_extraction_stage_ok:
                 try:
                     # Get chunk_results from entity_relation_task
diff --git a/lightrag/operate.py b/lightrag/operate.py
index c50a24be..dd65f031 100644
--- a/lightrag/operate.py
+++ b/lightrag/operate.py
@@ -1016,7 +1016,7 @@ async def _merge_edges_then_upsert(
     )
 
     for need_insert_id in [src_id, tgt_id]:
-        if (await knowledge_graph_inst.has_node(need_insert_id)):
+        if await knowledge_graph_inst.has_node(need_insert_id):
             # This is so that the initial check for the existence of the node need not be locked
             continue
         async with get_graph_db_lock_keyed([need_insert_id], enable_logging=False):
@@ -1124,7 +1124,6 @@ async def merge_nodes_and_edges(
         llm_response_cache: LLM response cache
     """
 
-
    # Collect all nodes and edges from all chunks
    all_nodes = defaultdict(list)
    all_edges = defaultdict(list)
@@ -1145,92 +1144,99 @@ async def merge_nodes_and_edges(
 
     # Merge nodes and edges
     async with pipeline_status_lock:
-        log_message = (
-            f"Merging stage {current_file_number}/{total_files}: {file_path}"
-        )
+        log_message = f"Merging stage {current_file_number}/{total_files}: {file_path}"
         logger.info(log_message)
         pipeline_status["latest_message"] = log_message
         pipeline_status["history_messages"].append(log_message)
 
-    # Process and update all entities at once
-    log_message = f"Updating {total_entities_count} entities {current_file_number}/{total_files}: {file_path}"
+    # Process and update all entities and relationships in parallel
+    log_message = f"Updating {total_entities_count} entities and {total_relations_count} relations {current_file_number}/{total_files}: {file_path}"
     logger.info(log_message)
     if pipeline_status is not None:
         async with pipeline_status_lock:
             pipeline_status["latest_message"] = log_message
             pipeline_status["history_messages"].append(log_message)
 
+    # Get max async tasks limit from global_config for semaphore control
+    llm_model_max_async = global_config.get("llm_model_max_async", 4)
+    semaphore = asyncio.Semaphore(llm_model_max_async)
+
     async def _locked_process_entity_name(entity_name, entities):
-        async with get_graph_db_lock_keyed([entity_name], enable_logging=False):
-            entity_data = await _merge_nodes_then_upsert(
-                entity_name,
-                entities,
-                knowledge_graph_inst,
-                global_config,
-                pipeline_status,
-                pipeline_status_lock,
-                llm_response_cache,
-            )
-            if entity_vdb is not None:
-                data_for_vdb = {
-                    compute_mdhash_id(entity_data["entity_name"], prefix="ent-"): {
-                        "entity_name": entity_data["entity_name"],
-                        "entity_type": entity_data["entity_type"],
-                        "content": f"{entity_data['entity_name']}\n{entity_data['description']}",
-                        "source_id": entity_data["source_id"],
-                        "file_path": entity_data.get("file_path", "unknown_source"),
+        async with semaphore:
+            async with get_graph_db_lock_keyed([entity_name], enable_logging=False):
+                entity_data = await _merge_nodes_then_upsert(
+                    entity_name,
+                    entities,
+                    knowledge_graph_inst,
+                    global_config,
+                    pipeline_status,
+                    pipeline_status_lock,
+                    llm_response_cache,
+                )
+                if entity_vdb is not None:
+                    data_for_vdb = {
+                        compute_mdhash_id(entity_data["entity_name"], prefix="ent-"): {
+                            "entity_name": entity_data["entity_name"],
+                            "entity_type": entity_data["entity_type"],
+                            "content": f"{entity_data['entity_name']}\n{entity_data['description']}",
+                            "source_id": entity_data["source_id"],
+                            "file_path": entity_data.get("file_path", "unknown_source"),
+                        }
                     }
-                }
-                await entity_vdb.upsert(data_for_vdb)
-        return entity_data
-
-    tasks = []
-    for entity_name, entities in all_nodes.items():
-        tasks.append(asyncio.create_task(_locked_process_entity_name(entity_name, entities)))
-    await asyncio.gather(*tasks)
-
-    # Process and update all relationships at once
-    log_message = f"Updating {total_relations_count} relations {current_file_number}/{total_files}: {file_path}"
-    logger.info(log_message)
-    if pipeline_status is not None:
-        async with pipeline_status_lock:
-            pipeline_status["latest_message"] = log_message
-            pipeline_status["history_messages"].append(log_message)
+                    await entity_vdb.upsert(data_for_vdb)
+                return entity_data
 
     async def _locked_process_edges(edge_key, edges):
-        async with get_graph_db_lock_keyed(f"{edge_key[0]}-{edge_key[1]}", enable_logging=False):
-            edge_data = await _merge_edges_then_upsert(
-                edge_key[0],
-                edge_key[1],
-                edges,
-                knowledge_graph_inst,
-                global_config,
-                pipeline_status,
-                pipeline_status_lock,
-                llm_response_cache,
-            )
-            if edge_data is None:
-                return None
+        async with semaphore:
+            async with get_graph_db_lock_keyed(
+                [f"{edge_key[0]}-{edge_key[1]}"], enable_logging=False
+            ):
+                edge_data = await _merge_edges_then_upsert(
+                    edge_key[0],
+                    edge_key[1],
+                    edges,
+                    knowledge_graph_inst,
+                    global_config,
+                    pipeline_status,
+                    pipeline_status_lock,
+                    llm_response_cache,
+                )
+                if edge_data is None:
+                    return None
 
-            if relationships_vdb is not None:
-                data_for_vdb = {
-                    compute_mdhash_id(edge_data["src_id"] + edge_data["tgt_id"], prefix="rel-"): {
-                        "src_id": edge_data["src_id"],
-                        "tgt_id": edge_data["tgt_id"],
-                        "keywords": edge_data["keywords"],
-                        "content": f"{edge_data['src_id']}\t{edge_data['tgt_id']}\n{edge_data['keywords']}\n{edge_data['description']}",
-                        "source_id": edge_data["source_id"],
-                        "file_path": edge_data.get("file_path", "unknown_source"),
+                if relationships_vdb is not None:
+                    data_for_vdb = {
+                        compute_mdhash_id(
+                            edge_data["src_id"] + edge_data["tgt_id"], prefix="rel-"
+                        ): {
+                            "src_id": edge_data["src_id"],
+                            "tgt_id": edge_data["tgt_id"],
+                            "keywords": edge_data["keywords"],
+                            "content": f"{edge_data['src_id']}\t{edge_data['tgt_id']}\n{edge_data['keywords']}\n{edge_data['description']}",
+                            "source_id": edge_data["source_id"],
+                            "file_path": edge_data.get("file_path", "unknown_source"),
+                        }
                     }
-                }
-                await relationships_vdb.upsert(data_for_vdb)
-        return edge_data
+                    await relationships_vdb.upsert(data_for_vdb)
+                return edge_data
 
+    # Create a single task queue for both entities and edges
     tasks = []
+
+    # Add entity processing tasks
+    for entity_name, entities in all_nodes.items():
+        tasks.append(
+            asyncio.create_task(_locked_process_entity_name(entity_name, entities))
+        )
+
+    # Add edge processing tasks
     for edge_key, edges in all_edges.items():
         tasks.append(asyncio.create_task(_locked_process_edges(edge_key, edges)))
+
+    # Execute all tasks in parallel with semaphore control
     await asyncio.gather(*tasks)
 
+
 async def extract_entities(
     chunks: dict[str, TextChunkSchema],
     global_config: dict[str, str],
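
Note (not part of the patch): below is a minimal, self-contained sketch of the
concurrency pattern this change adopts, a single asyncio.Semaphore shared by
entity and edge merge tasks, all driven by one asyncio.gather. The coroutines
process_entity and process_edge are hypothetical stand-ins for
_merge_nodes_then_upsert and _merge_edges_then_upsert, and the default limit
of 4 mirrors the llm_model_max_async fallback above.

    import asyncio


    async def process_entity(name: str) -> str:
        # Hypothetical stand-in for _merge_nodes_then_upsert (LLM-backed merge).
        await asyncio.sleep(0.1)
        return f"entity:{name}"


    async def process_edge(src: str, tgt: str) -> str:
        # Hypothetical stand-in for _merge_edges_then_upsert.
        await asyncio.sleep(0.1)
        return f"edge:{src}-{tgt}"


    async def merge_all(entities, edges, max_async=4):
        # One semaphore bounds entity and edge work together, so at most
        # max_async merges are in flight regardless of task type.
        semaphore = asyncio.Semaphore(max_async)

        async def bounded(coro):
            async with semaphore:
                return await coro

        # Single task queue for both kinds of work, then one gather.
        tasks = [asyncio.create_task(bounded(process_entity(e))) for e in entities]
        tasks += [asyncio.create_task(bounded(process_edge(s, t))) for s, t in edges]
        return await asyncio.gather(*tasks)


    if __name__ == "__main__":
        print(asyncio.run(merge_all(["A", "B"], [("A", "B")])))

Because every task acquires the semaphore before doing any work, folding the
former sequential entity phase and edge phase into one task list cannot exceed
the LLM concurrency budget; it only removes the artificial barrier between the
two phases.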