diff --git a/lightrag/lightrag.py b/lightrag/lightrag.py
index 7fde953b..01e513c5 100644
--- a/lightrag/lightrag.py
+++ b/lightrag/lightrag.py
@@ -1069,27 +1069,27 @@ class LightRAG:
                     }
                 )

-                # Semphore is NOT released here, because the merge_nodes_and_edges function is highly concurrent
-                # and more importantly, it is the bottleneck and needs to be resource controlled in massively
-                # parallel insertions
+            # Semphore is released here
+            # Concurrency is controlled by graph db lock for individual entities and relationships
-                if file_extraction_stage_ok:
-                    try:
-                        # Get chunk_results from entity_relation_task
-                        chunk_results = await entity_relation_task
-                        await merge_nodes_and_edges(
-                            chunk_results=chunk_results,  # result collected from entity_relation_task
-                            knowledge_graph_inst=self.chunk_entity_relation_graph,
-                            entity_vdb=self.entities_vdb,
-                            relationships_vdb=self.relationships_vdb,
-                            global_config=asdict(self),
-                            pipeline_status=pipeline_status,
-                            pipeline_status_lock=pipeline_status_lock,
-                            llm_response_cache=self.llm_response_cache,
-                            current_file_number=current_file_number,
-                            total_files=total_files,
-                            file_path=file_path,
-                        )
+
+            if file_extraction_stage_ok:
+                try:
+                    # Get chunk_results from entity_relation_task
+                    chunk_results = await entity_relation_task
+                    await merge_nodes_and_edges(
+                        chunk_results=chunk_results,  # result collected from entity_relation_task
+                        knowledge_graph_inst=self.chunk_entity_relation_graph,
+                        entity_vdb=self.entities_vdb,
+                        relationships_vdb=self.relationships_vdb,
+                        global_config=asdict(self),
+                        pipeline_status=pipeline_status,
+                        pipeline_status_lock=pipeline_status_lock,
+                        llm_response_cache=self.llm_response_cache,
+                        current_file_number=current_file_number,
+                        total_files=total_files,
+                        file_path=file_path,
+                    )

                     await self.doc_status.upsert(
                         {
@@ -1111,46 +1111,46 @@ class LightRAG:
                         }
                     )

-                        # Call _insert_done after processing each file
-                        await self._insert_done()
+                    # Call _insert_done after processing each file
+                    await self._insert_done()

-                        async with pipeline_status_lock:
-                            log_message = f"Completed processing file {current_file_number}/{total_files}: {file_path}"
-                            logger.info(log_message)
-                            pipeline_status["latest_message"] = log_message
-                            pipeline_status["history_messages"].append(log_message)
+                    async with pipeline_status_lock:
+                        log_message = f"Completed processing file {current_file_number}/{total_files}: {file_path}"
+                        logger.info(log_message)
+                        pipeline_status["latest_message"] = log_message
+                        pipeline_status["history_messages"].append(log_message)

-                    except Exception as e:
-                        # Log error and update pipeline status
-                        logger.error(traceback.format_exc())
-                        error_msg = f"Merging stage failed in document {current_file_number}/{total_files}: {file_path}"
-                        logger.error(error_msg)
-                        async with pipeline_status_lock:
-                            pipeline_status["latest_message"] = error_msg
-                            pipeline_status["history_messages"].append(
-                                traceback.format_exc()
-                            )
-                            pipeline_status["history_messages"].append(error_msg)
-
-                        # Persistent llm cache
-                        if self.llm_response_cache:
-                            await self.llm_response_cache.index_done_callback()
-
-                        # Update document status to failed
-                        await self.doc_status.upsert(
-                            {
-                                doc_id: {
-                                    "status": DocStatus.FAILED,
-                                    "error": str(e),
-                                    "content": status_doc.content,
-                                    "content_summary": status_doc.content_summary,
-                                    "content_length": status_doc.content_length,
-                                    "created_at": status_doc.created_at,
-                                    "updated_at": datetime.now().isoformat(),
-                                    "file_path": file_path,
-                                }
-                            }
-                        )
+                except Exception as e:
+                    # Log error and update pipeline status
+                    logger.error(traceback.format_exc())
+                    error_msg = f"Merging stage failed in document {current_file_number}/{total_files}: {file_path}"
+                    logger.error(error_msg)
+                    async with pipeline_status_lock:
+                        pipeline_status["latest_message"] = error_msg
+                        pipeline_status["history_messages"].append(
+                            traceback.format_exc()
+                        )
+                        pipeline_status["history_messages"].append(error_msg)
+
+                    # Persistent llm cache
+                    if self.llm_response_cache:
+                        await self.llm_response_cache.index_done_callback()
+
+                    # Update document status to failed
+                    await self.doc_status.upsert(
+                        {
+                            doc_id: {
+                                "status": DocStatus.FAILED,
+                                "error": str(e),
+                                "content": status_doc.content,
+                                "content_summary": status_doc.content_summary,
+                                "content_length": status_doc.content_length,
+                                "created_at": status_doc.created_at,
+                                "updated_at": datetime.now().isoformat(),
+                                "file_path": file_path,
+                            }
+                        }
+                    )

         # Create processing tasks for all documents
         doc_tasks = []
diff --git a/lightrag/operate.py b/lightrag/operate.py
index cff40b0a..1a410469 100644
--- a/lightrag/operate.py
+++ b/lightrag/operate.py
@@ -36,6 +36,7 @@ from .base import (
 )
 from .prompt import PROMPTS
 from .constants import GRAPH_FIELD_SEP
+from .kg.shared_storage import get_graph_db_lock_keyed
 import time
 from dotenv import load_dotenv

@@ -1121,8 +1122,6 @@ async def merge_nodes_and_edges(
         pipeline_status_lock: Lock for pipeline status
         llm_response_cache: LLM response cache
     """
-    # Get lock manager from shared storage
-    from .kg.shared_storage import get_graph_db_lock_keyed

     # Collect all nodes and edges from all chunks
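
For illustration, below is a minimal, self-contained sketch of the concurrency pattern this patch moves to: the per-file semaphore bounds only the extraction stage and is released before merging, while merge work is serialized per entity by a keyed graph-db lock. Note that get_graph_db_lock_keyed here is a simplified stand-in with assumed semantics (an async context manager keyed by entity name), not LightRAG's actual implementation, and extract_entities/merge_entity are hypothetical stubs.

import asyncio
from contextlib import asynccontextmanager

# Simplified stand-in for get_graph_db_lock_keyed (assumed semantics): one
# asyncio.Lock per key, so merges for different entities run in parallel
# while merges for the same entity are serialized.
_keyed_locks: dict[str, asyncio.Lock] = {}

@asynccontextmanager
async def get_graph_db_lock_keyed(key: str):
    lock = _keyed_locks.setdefault(key, asyncio.Lock())
    async with lock:
        yield

async def extract_entities(file_path: str) -> list[str]:
    # Hypothetical stub for the LLM extraction stage.
    await asyncio.sleep(0.01)
    return [f"entity-{i % 3}" for i in range(5)]

async def merge_entity(name: str) -> None:
    # Hypothetical stub for a single graph upsert.
    await asyncio.sleep(0.01)

async def process_file(semaphore: asyncio.Semaphore, file_path: str) -> None:
    async with semaphore:
        # The semaphore bounds extraction concurrency only.
        entities = await extract_entities(file_path)
    # Semaphore is released here, as in the patch; merge concurrency is
    # governed by per-entity locks instead of the global semaphore.
    for name in entities:
        async with get_graph_db_lock_keyed(name):
            await merge_entity(name)

async def main() -> None:
    semaphore = asyncio.Semaphore(2)
    await asyncio.gather(
        *(process_file(semaphore, f"doc{i}.txt") for i in range(4))
    )

if __name__ == "__main__":
    asyncio.run(main())

The tradeoff mirrors the comment change in the first hunk: holding the semaphore through the merge throttled the bottleneck globally, whereas keyed locks let unrelated entities and relationships merge concurrently and only serialize genuine conflicts on the same graph node or edge.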