diff --git a/lightrag/kg/shared_storage.py b/lightrag/kg/shared_storage.py index 0961da16..9871d4aa 100644 --- a/lightrag/kg/shared_storage.py +++ b/lightrag/kg/shared_storage.py @@ -57,9 +57,9 @@ _lock_registry_count: Optional[Dict[str, int]] = None _lock_cleanup_data: Optional[Dict[str, time.time]] = None _registry_guard = None # Timeout for keyed locks in seconds -CLEANUP_KEYED_LOCKS_AFTER_SECONDS = 150 +CLEANUP_KEYED_LOCKS_AFTER_SECONDS = 300 # Threshold for triggering cleanup - only clean when pending list exceeds this size -CLEANUP_THRESHOLD = 200 +CLEANUP_THRESHOLD = 500 _initialized = None diff --git a/lightrag/lightrag.py b/lightrag/lightrag.py index c07c6a53..feb7ab16 100644 --- a/lightrag/lightrag.py +++ b/lightrag/lightrag.py @@ -1094,86 +1094,89 @@ class LightRAG: } ) - # Semphore is released here - # Concurrency is controlled by graph db lock for individual entities and relationships - if file_extraction_stage_ok: - try: - # Get chunk_results from entity_relation_task - chunk_results = await entity_relation_task - await merge_nodes_and_edges( - chunk_results=chunk_results, # result collected from entity_relation_task - knowledge_graph_inst=self.chunk_entity_relation_graph, - entity_vdb=self.entities_vdb, - relationships_vdb=self.relationships_vdb, - global_config=asdict(self), - pipeline_status=pipeline_status, - pipeline_status_lock=pipeline_status_lock, - llm_response_cache=self.llm_response_cache, - current_file_number=current_file_number, - total_files=total_files, - file_path=file_path, - ) - - await self.doc_status.upsert( - { - doc_id: { - "status": DocStatus.PROCESSED, - "chunks_count": len(chunks), - "chunks_list": list( - chunks.keys() - ), # 保留 chunks_list - "content": status_doc.content, - "content_summary": status_doc.content_summary, - "content_length": status_doc.content_length, - "created_at": status_doc.created_at, - "updated_at": datetime.now( - timezone.utc - ).isoformat(), - "file_path": file_path, - } - } - ) - - # Call _insert_done after processing each file - await self._insert_done() - - async with pipeline_status_lock: - log_message = f"Completed processing file {current_file_number}/{total_files}: {file_path}" - logger.info(log_message) - pipeline_status["latest_message"] = log_message - pipeline_status["history_messages"].append(log_message) - - except Exception as e: - # Log error and update pipeline status - logger.error(traceback.format_exc()) - error_msg = f"Merging stage failed in document {current_file_number}/{total_files}: {file_path}" - logger.error(error_msg) - async with pipeline_status_lock: - pipeline_status["latest_message"] = error_msg - pipeline_status["history_messages"].append( - traceback.format_exc() + # Concurrency is controlled by graph db lock for individual entities and relationships + if file_extraction_stage_ok: + try: + # Get chunk_results from entity_relation_task + chunk_results = await entity_relation_task + await merge_nodes_and_edges( + chunk_results=chunk_results, # result collected from entity_relation_task + knowledge_graph_inst=self.chunk_entity_relation_graph, + entity_vdb=self.entities_vdb, + relationships_vdb=self.relationships_vdb, + global_config=asdict(self), + pipeline_status=pipeline_status, + pipeline_status_lock=pipeline_status_lock, + llm_response_cache=self.llm_response_cache, + current_file_number=current_file_number, + total_files=total_files, + file_path=file_path, ) - pipeline_status["history_messages"].append(error_msg) - # Persistent llm cache - if self.llm_response_cache: - await self.llm_response_cache.index_done_callback() - - # Update document status to failed - await self.doc_status.upsert( - { - doc_id: { - "status": DocStatus.FAILED, - "error": str(e), - "content": status_doc.content, - "content_summary": status_doc.content_summary, - "content_length": status_doc.content_length, - "created_at": status_doc.created_at, - "updated_at": datetime.now().isoformat(), - "file_path": file_path, + await self.doc_status.upsert( + { + doc_id: { + "status": DocStatus.PROCESSED, + "chunks_count": len(chunks), + "chunks_list": list( + chunks.keys() + ), # 保留 chunks_list + "content": status_doc.content, + "content_summary": status_doc.content_summary, + "content_length": status_doc.content_length, + "created_at": status_doc.created_at, + "updated_at": datetime.now( + timezone.utc + ).isoformat(), + "file_path": file_path, + } } - } - ) + ) + + # Call _insert_done after processing each file + await self._insert_done() + + async with pipeline_status_lock: + log_message = f"Completed processing file {current_file_number}/{total_files}: {file_path}" + logger.info(log_message) + pipeline_status["latest_message"] = log_message + pipeline_status["history_messages"].append( + log_message + ) + + except Exception as e: + # Log error and update pipeline status + logger.error(traceback.format_exc()) + error_msg = f"Merging stage failed in document {current_file_number}/{total_files}: {file_path}" + logger.error(error_msg) + async with pipeline_status_lock: + pipeline_status["latest_message"] = error_msg + pipeline_status["history_messages"].append( + traceback.format_exc() + ) + pipeline_status["history_messages"].append( + error_msg + ) + + # Persistent llm cache + if self.llm_response_cache: + await self.llm_response_cache.index_done_callback() + + # Update document status to failed + await self.doc_status.upsert( + { + doc_id: { + "status": DocStatus.FAILED, + "error": str(e), + "content": status_doc.content, + "content_summary": status_doc.content_summary, + "content_length": status_doc.content_length, + "created_at": status_doc.created_at, + "updated_at": datetime.now().isoformat(), + "file_path": file_path, + } + } + ) # Create processing tasks for all documents doc_tasks = []