Release semphore before merge stage

This commit is contained in:
yangdx 2025-07-09 09:24:44 +08:00
parent 56d43de58a
commit cb3bfc0e5b
2 changed files with 58 additions and 59 deletions

View file

@ -1069,27 +1069,27 @@ class LightRAG:
}
)
# Semphore is NOT released here, because the merge_nodes_and_edges function is highly concurrent
# and more importantly, it is the bottleneck and needs to be resource controlled in massively
# parallel insertions
# Semphore is released here
# Concurrency is controlled by graph db lock for individual entities and relationships
if file_extraction_stage_ok:
try:
# Get chunk_results from entity_relation_task
chunk_results = await entity_relation_task
await merge_nodes_and_edges(
chunk_results=chunk_results, # result collected from entity_relation_task
knowledge_graph_inst=self.chunk_entity_relation_graph,
entity_vdb=self.entities_vdb,
relationships_vdb=self.relationships_vdb,
global_config=asdict(self),
pipeline_status=pipeline_status,
pipeline_status_lock=pipeline_status_lock,
llm_response_cache=self.llm_response_cache,
current_file_number=current_file_number,
total_files=total_files,
file_path=file_path,
)
if file_extraction_stage_ok:
try:
# Get chunk_results from entity_relation_task
chunk_results = await entity_relation_task
await merge_nodes_and_edges(
chunk_results=chunk_results, # result collected from entity_relation_task
knowledge_graph_inst=self.chunk_entity_relation_graph,
entity_vdb=self.entities_vdb,
relationships_vdb=self.relationships_vdb,
global_config=asdict(self),
pipeline_status=pipeline_status,
pipeline_status_lock=pipeline_status_lock,
llm_response_cache=self.llm_response_cache,
current_file_number=current_file_number,
total_files=total_files,
file_path=file_path,
)
await self.doc_status.upsert(
{
@ -1111,46 +1111,46 @@ class LightRAG:
}
)
# Call _insert_done after processing each file
await self._insert_done()
# Call _insert_done after processing each file
await self._insert_done()
async with pipeline_status_lock:
log_message = f"Completed processing file {current_file_number}/{total_files}: {file_path}"
logger.info(log_message)
pipeline_status["latest_message"] = log_message
pipeline_status["history_messages"].append(log_message)
async with pipeline_status_lock:
log_message = f"Completed processing file {current_file_number}/{total_files}: {file_path}"
logger.info(log_message)
pipeline_status["latest_message"] = log_message
pipeline_status["history_messages"].append(log_message)
except Exception as e:
# Log error and update pipeline status
logger.error(traceback.format_exc())
error_msg = f"Merging stage failed in document {current_file_number}/{total_files}: {file_path}"
logger.error(error_msg)
async with pipeline_status_lock:
pipeline_status["latest_message"] = error_msg
pipeline_status["history_messages"].append(
traceback.format_exc()
)
pipeline_status["history_messages"].append(error_msg)
# Persistent llm cache
if self.llm_response_cache:
await self.llm_response_cache.index_done_callback()
# Update document status to failed
await self.doc_status.upsert(
{
doc_id: {
"status": DocStatus.FAILED,
"error": str(e),
"content": status_doc.content,
"content_summary": status_doc.content_summary,
"content_length": status_doc.content_length,
"created_at": status_doc.created_at,
"updated_at": datetime.now().isoformat(),
"file_path": file_path,
}
}
except Exception as e:
# Log error and update pipeline status
logger.error(traceback.format_exc())
error_msg = f"Merging stage failed in document {current_file_number}/{total_files}: {file_path}"
logger.error(error_msg)
async with pipeline_status_lock:
pipeline_status["latest_message"] = error_msg
pipeline_status["history_messages"].append(
traceback.format_exc()
)
pipeline_status["history_messages"].append(error_msg)
# Persistent llm cache
if self.llm_response_cache:
await self.llm_response_cache.index_done_callback()
# Update document status to failed
await self.doc_status.upsert(
{
doc_id: {
"status": DocStatus.FAILED,
"error": str(e),
"content": status_doc.content,
"content_summary": status_doc.content_summary,
"content_length": status_doc.content_length,
"created_at": status_doc.created_at,
"updated_at": datetime.now().isoformat(),
"file_path": file_path,
}
}
)
# Create processing tasks for all documents
doc_tasks = []

View file

@ -36,6 +36,7 @@ from .base import (
)
from .prompt import PROMPTS
from .constants import GRAPH_FIELD_SEP
from .kg.shared_storage import get_graph_db_lock_keyed
import time
from dotenv import load_dotenv
@ -1121,8 +1122,6 @@ async def merge_nodes_and_edges(
pipeline_status_lock: Lock for pipeline status
llm_response_cache: LLM response cache
"""
# Get lock manager from shared storage
from .kg.shared_storage import get_graph_db_lock_keyed
# Collect all nodes and edges from all chunks