From e4bf4d19a0472ba7067dd19c93f0e2641d769382 Mon Sep 17 00:00:00 2001 From: yangdx Date: Sat, 12 Jul 2025 13:22:56 +0800 Subject: [PATCH] Optimize knowledge graph rebuild with parallel processing - Add parallel processing for KG rebuild - Implement keyed locks for data consistency --- lightrag/operate.py | 188 +++++++++++++++++++++++++++++--------------- 1 file changed, 124 insertions(+), 64 deletions(-) diff --git a/lightrag/operate.py b/lightrag/operate.py index 4a06404e..3b81ae89 100644 --- a/lightrag/operate.py +++ b/lightrag/operate.py @@ -275,20 +275,26 @@ async def _rebuild_knowledge_from_chunks( pipeline_status: dict | None = None, pipeline_status_lock=None, ) -> None: - """Rebuild entity and relationship descriptions from cached extraction results + """Rebuild entity and relationship descriptions from cached extraction results with parallel processing This method uses cached LLM extraction results instead of calling LLM again, - following the same approach as the insert process. + following the same approach as the insert process. Now with parallel processing + controlled by llm_model_max_async and using get_storage_keyed_lock for data consistency. Args: entities_to_rebuild: Dict mapping entity_name -> set of remaining chunk_ids relationships_to_rebuild: Dict mapping (src, tgt) -> set of remaining chunk_ids - text_chunks_data: Pre-loaded chunk data dict {chunk_id: chunk_data} + knowledge_graph_inst: Knowledge graph storage + entities_vdb: Entity vector database + relationships_vdb: Relationship vector database + text_chunks_storage: Text chunks storage + llm_response_cache: LLM response cache + global_config: Global configuration containing llm_model_max_async + pipeline_status: Pipeline status dictionary + pipeline_status_lock: Lock for pipeline status """ if not entities_to_rebuild and not relationships_to_rebuild: return - rebuilt_entities_count = 0 - rebuilt_relationships_count = 0 # Get all referenced chunk IDs all_referenced_chunk_ids = set() @@ -297,7 +303,7 @@ async def _rebuild_knowledge_from_chunks( for chunk_ids in relationships_to_rebuild.values(): all_referenced_chunk_ids.update(chunk_ids) - status_message = f"Rebuilding knowledge from {len(all_referenced_chunk_ids)} cached chunk extractions" + status_message = f"Rebuilding knowledge from {len(all_referenced_chunk_ids)} cached chunk extractions (parallel processing)" logger.info(status_message) if pipeline_status is not None and pipeline_status_lock is not None: async with pipeline_status_lock: @@ -367,66 +373,116 @@ async def _rebuild_knowledge_from_chunks( pipeline_status["history_messages"].append(status_message) continue - # Rebuild entities + # Get max async tasks limit from global_config for semaphore control + llm_model_max_async = global_config.get("llm_model_max_async", 4) + 1 + semaphore = asyncio.Semaphore(llm_model_max_async) + + # Counters for tracking progress + rebuilt_entities_count = 0 + rebuilt_relationships_count = 0 + failed_entities_count = 0 + failed_relationships_count = 0 + + async def _locked_rebuild_entity(entity_name, chunk_ids): + nonlocal rebuilt_entities_count, failed_entities_count + async with semaphore: + workspace = global_config.get("workspace", "") + namespace = f"{workspace}:GraphDB" if workspace else "GraphDB" + async with get_storage_keyed_lock( + [entity_name], namespace=namespace, enable_logging=False + ): + try: + await _rebuild_single_entity( + knowledge_graph_inst=knowledge_graph_inst, + entities_vdb=entities_vdb, + entity_name=entity_name, + chunk_ids=chunk_ids, + chunk_entities=chunk_entities, + llm_response_cache=llm_response_cache, + global_config=global_config, + ) + rebuilt_entities_count += 1 + status_message = ( + f"Rebuilt entity: {entity_name} from {len(chunk_ids)} chunks" + ) + logger.info(status_message) + if pipeline_status is not None and pipeline_status_lock is not None: + async with pipeline_status_lock: + pipeline_status["latest_message"] = status_message + pipeline_status["history_messages"].append(status_message) + except Exception as e: + failed_entities_count += 1 + status_message = f"Failed to rebuild entity {entity_name}: {e}" + logger.info(status_message) # Per requirement, change to info + if pipeline_status is not None and pipeline_status_lock is not None: + async with pipeline_status_lock: + pipeline_status["latest_message"] = status_message + pipeline_status["history_messages"].append(status_message) + + async def _locked_rebuild_relationship(src, tgt, chunk_ids): + nonlocal rebuilt_relationships_count, failed_relationships_count + async with semaphore: + workspace = global_config.get("workspace", "") + namespace = f"{workspace}:GraphDB" if workspace else "GraphDB" + async with get_storage_keyed_lock( + f"{src}-{tgt}", namespace=namespace, enable_logging=False + ): + try: + await _rebuild_single_relationship( + knowledge_graph_inst=knowledge_graph_inst, + relationships_vdb=relationships_vdb, + src=src, + tgt=tgt, + chunk_ids=chunk_ids, + chunk_relationships=chunk_relationships, + llm_response_cache=llm_response_cache, + global_config=global_config, + ) + rebuilt_relationships_count += 1 + status_message = f"Rebuilt relationship: {src}->{tgt} from {len(chunk_ids)} chunks" + logger.info(status_message) + if pipeline_status is not None and pipeline_status_lock is not None: + async with pipeline_status_lock: + pipeline_status["latest_message"] = status_message + pipeline_status["history_messages"].append(status_message) + except Exception as e: + failed_relationships_count += 1 + status_message = f"Failed to rebuild relationship {src}->{tgt}: {e}" + logger.info(status_message) # Per requirement, change to info + if pipeline_status is not None and pipeline_status_lock is not None: + async with pipeline_status_lock: + pipeline_status["latest_message"] = status_message + pipeline_status["history_messages"].append(status_message) + + # Create tasks for parallel processing + tasks = [] + + # Add entity rebuilding tasks for entity_name, chunk_ids in entities_to_rebuild.items(): - try: - await _rebuild_single_entity( - knowledge_graph_inst=knowledge_graph_inst, - entities_vdb=entities_vdb, - entity_name=entity_name, - chunk_ids=chunk_ids, - chunk_entities=chunk_entities, - llm_response_cache=llm_response_cache, - global_config=global_config, - ) - rebuilt_entities_count += 1 - status_message = ( - f"Rebuilt entity: {entity_name} from {len(chunk_ids)} chunks" - ) - logger.info(status_message) - if pipeline_status is not None and pipeline_status_lock is not None: - async with pipeline_status_lock: - pipeline_status["latest_message"] = status_message - pipeline_status["history_messages"].append(status_message) - except Exception as e: - status_message = f"Failed to rebuild entity {entity_name}: {e}" - logger.info(status_message) # Per requirement, change to info - if pipeline_status is not None and pipeline_status_lock is not None: - async with pipeline_status_lock: - pipeline_status["latest_message"] = status_message - pipeline_status["history_messages"].append(status_message) + task = asyncio.create_task(_locked_rebuild_entity(entity_name, chunk_ids)) + tasks.append(task) - # Rebuild relationships + # Add relationship rebuilding tasks for (src, tgt), chunk_ids in relationships_to_rebuild.items(): - try: - await _rebuild_single_relationship( - knowledge_graph_inst=knowledge_graph_inst, - relationships_vdb=relationships_vdb, - src=src, - tgt=tgt, - chunk_ids=chunk_ids, - chunk_relationships=chunk_relationships, - llm_response_cache=llm_response_cache, - global_config=global_config, - ) - rebuilt_relationships_count += 1 - status_message = ( - f"Rebuilt relationship: {src}->{tgt} from {len(chunk_ids)} chunks" - ) - logger.info(status_message) - if pipeline_status is not None and pipeline_status_lock is not None: - async with pipeline_status_lock: - pipeline_status["latest_message"] = status_message - pipeline_status["history_messages"].append(status_message) - except Exception as e: - status_message = f"Failed to rebuild relationship {src}->{tgt}: {e}" - logger.info(status_message) - if pipeline_status is not None and pipeline_status_lock is not None: - async with pipeline_status_lock: - pipeline_status["latest_message"] = status_message - pipeline_status["history_messages"].append(status_message) + task = asyncio.create_task(_locked_rebuild_relationship(src, tgt, chunk_ids)) + tasks.append(task) + + # Log parallel processing start + status_message = f"Starting parallel rebuild of {len(entities_to_rebuild)} entities and {len(relationships_to_rebuild)} relationships (max concurrent: {llm_model_max_async})" + logger.info(status_message) + if pipeline_status is not None and pipeline_status_lock is not None: + async with pipeline_status_lock: + pipeline_status["latest_message"] = status_message + pipeline_status["history_messages"].append(status_message) + + # Execute all tasks in parallel with semaphore control + await asyncio.gather(*tasks) + + # Final status report + status_message = f"KG rebuild completed: {rebuilt_entities_count} entities and {rebuilt_relationships_count} relationships rebuilt successfully." + if failed_entities_count > 0 or failed_relationships_count > 0: + status_message += f" Failed: {failed_entities_count} entities, {failed_relationships_count} relationships." - status_message = f"KG rebuild completed: {rebuilt_entities_count} entities and {rebuilt_relationships_count} relationships." logger.info(status_message) if pipeline_status is not None and pipeline_status_lock is not None: async with pipeline_status_lock: @@ -726,7 +782,11 @@ async def _rebuild_single_relationship( llm_response_cache: BaseKVStorage, global_config: dict[str, str], ) -> None: - """Rebuild a single relationship from cached extraction results""" + """Rebuild a single relationship from cached extraction results + + Note: This function assumes the caller has already acquired the appropriate + keyed lock for the relationship pair to ensure thread safety. + """ # Get current relationship data current_relationship = await knowledge_graph_inst.get_edge(src, tgt) @@ -1148,7 +1208,7 @@ async def merge_nodes_and_edges( pipeline_status["history_messages"].append(log_message) # Get max async tasks limit from global_config for semaphore control - llm_model_max_async = global_config.get("llm_model_max_async", 4) + llm_model_max_async = global_config.get("llm_model_max_async", 4) + 1 semaphore = asyncio.Semaphore(llm_model_max_async) async def _locked_process_entity_name(entity_name, entities):