Move merging stage back, controlled by max parallel insert semaphore

This commit is contained in:
yangdx 2025-07-12 03:32:08 +08:00
parent 7490a18481
commit 39965d7ded
2 changed files with 82 additions and 79 deletions

View file

@ -57,9 +57,9 @@ _lock_registry_count: Optional[Dict[str, int]] = None
_lock_cleanup_data: Optional[Dict[str, time.time]] = None _lock_cleanup_data: Optional[Dict[str, time.time]] = None
_registry_guard = None _registry_guard = None
# Timeout for keyed locks in seconds # Timeout for keyed locks in seconds
CLEANUP_KEYED_LOCKS_AFTER_SECONDS = 150 CLEANUP_KEYED_LOCKS_AFTER_SECONDS = 300
# Threshold for triggering cleanup - only clean when pending list exceeds this size # Threshold for triggering cleanup - only clean when pending list exceeds this size
CLEANUP_THRESHOLD = 200 CLEANUP_THRESHOLD = 500
_initialized = None _initialized = None

View file

@ -1094,86 +1094,89 @@ class LightRAG:
} }
) )
# Semaphore is released here # Concurrency is controlled by graph db lock for individual entities and relationships
# Concurrency is controlled by graph db lock for individual entities and relationships if file_extraction_stage_ok:
if file_extraction_stage_ok: try:
try: # Get chunk_results from entity_relation_task
# Get chunk_results from entity_relation_task chunk_results = await entity_relation_task
chunk_results = await entity_relation_task await merge_nodes_and_edges(
await merge_nodes_and_edges( chunk_results=chunk_results, # result collected from entity_relation_task
chunk_results=chunk_results, # result collected from entity_relation_task knowledge_graph_inst=self.chunk_entity_relation_graph,
knowledge_graph_inst=self.chunk_entity_relation_graph, entity_vdb=self.entities_vdb,
entity_vdb=self.entities_vdb, relationships_vdb=self.relationships_vdb,
relationships_vdb=self.relationships_vdb, global_config=asdict(self),
global_config=asdict(self), pipeline_status=pipeline_status,
pipeline_status=pipeline_status, pipeline_status_lock=pipeline_status_lock,
pipeline_status_lock=pipeline_status_lock, llm_response_cache=self.llm_response_cache,
llm_response_cache=self.llm_response_cache, current_file_number=current_file_number,
current_file_number=current_file_number, total_files=total_files,
total_files=total_files, file_path=file_path,
file_path=file_path,
)
await self.doc_status.upsert(
{
doc_id: {
"status": DocStatus.PROCESSED,
"chunks_count": len(chunks),
"chunks_list": list(
chunks.keys()
), # 保留 chunks_list
"content": status_doc.content,
"content_summary": status_doc.content_summary,
"content_length": status_doc.content_length,
"created_at": status_doc.created_at,
"updated_at": datetime.now(
timezone.utc
).isoformat(),
"file_path": file_path,
}
}
)
# Call _insert_done after processing each file
await self._insert_done()
async with pipeline_status_lock:
log_message = f"Completed processing file {current_file_number}/{total_files}: {file_path}"
logger.info(log_message)
pipeline_status["latest_message"] = log_message
pipeline_status["history_messages"].append(log_message)
except Exception as e:
# Log error and update pipeline status
logger.error(traceback.format_exc())
error_msg = f"Merging stage failed in document {current_file_number}/{total_files}: {file_path}"
logger.error(error_msg)
async with pipeline_status_lock:
pipeline_status["latest_message"] = error_msg
pipeline_status["history_messages"].append(
traceback.format_exc()
) )
pipeline_status["history_messages"].append(error_msg)
# Persistent llm cache await self.doc_status.upsert(
if self.llm_response_cache: {
await self.llm_response_cache.index_done_callback() doc_id: {
"status": DocStatus.PROCESSED,
# Update document status to failed "chunks_count": len(chunks),
await self.doc_status.upsert( "chunks_list": list(
{ chunks.keys()
doc_id: { ), # 保留 chunks_list
"status": DocStatus.FAILED, "content": status_doc.content,
"error": str(e), "content_summary": status_doc.content_summary,
"content": status_doc.content, "content_length": status_doc.content_length,
"content_summary": status_doc.content_summary, "created_at": status_doc.created_at,
"content_length": status_doc.content_length, "updated_at": datetime.now(
"created_at": status_doc.created_at, timezone.utc
"updated_at": datetime.now().isoformat(), ).isoformat(),
"file_path": file_path, "file_path": file_path,
}
} }
} )
)
# Call _insert_done after processing each file
await self._insert_done()
async with pipeline_status_lock:
log_message = f"Completed processing file {current_file_number}/{total_files}: {file_path}"
logger.info(log_message)
pipeline_status["latest_message"] = log_message
pipeline_status["history_messages"].append(
log_message
)
except Exception as e:
# Log error and update pipeline status
logger.error(traceback.format_exc())
error_msg = f"Merging stage failed in document {current_file_number}/{total_files}: {file_path}"
logger.error(error_msg)
async with pipeline_status_lock:
pipeline_status["latest_message"] = error_msg
pipeline_status["history_messages"].append(
traceback.format_exc()
)
pipeline_status["history_messages"].append(
error_msg
)
# Persistent llm cache
if self.llm_response_cache:
await self.llm_response_cache.index_done_callback()
# Update document status to failed
await self.doc_status.upsert(
{
doc_id: {
"status": DocStatus.FAILED,
"error": str(e),
"content": status_doc.content,
"content_summary": status_doc.content_summary,
"content_length": status_doc.content_length,
"created_at": status_doc.created_at,
"updated_at": datetime.now().isoformat(),
"file_path": file_path,
}
}
)
# Create processing tasks for all documents # Create processing tasks for all documents
doc_tasks = [] doc_tasks = []