Optimize knowledge graph rebuild with parallel processing

- Add parallel processing for KG rebuild
- Implement keyed locks for data consistency
This commit is contained in:
yangdx 2025-07-12 13:22:56 +08:00
parent a85d7054d4
commit e4bf4d19a0

View file

@ -275,20 +275,26 @@ async def _rebuild_knowledge_from_chunks(
pipeline_status: dict | None = None, pipeline_status: dict | None = None,
pipeline_status_lock=None, pipeline_status_lock=None,
) -> None: ) -> None:
"""Rebuild entity and relationship descriptions from cached extraction results """Rebuild entity and relationship descriptions from cached extraction results with parallel processing
This method uses cached LLM extraction results instead of calling LLM again, This method uses cached LLM extraction results instead of calling LLM again,
following the same approach as the insert process. following the same approach as the insert process. Now with parallel processing
controlled by llm_model_max_async and using get_storage_keyed_lock for data consistency.
Args: Args:
entities_to_rebuild: Dict mapping entity_name -> set of remaining chunk_ids entities_to_rebuild: Dict mapping entity_name -> set of remaining chunk_ids
relationships_to_rebuild: Dict mapping (src, tgt) -> set of remaining chunk_ids relationships_to_rebuild: Dict mapping (src, tgt) -> set of remaining chunk_ids
text_chunks_data: Pre-loaded chunk data dict {chunk_id: chunk_data} knowledge_graph_inst: Knowledge graph storage
entities_vdb: Entity vector database
relationships_vdb: Relationship vector database
text_chunks_storage: Text chunks storage
llm_response_cache: LLM response cache
global_config: Global configuration containing llm_model_max_async
pipeline_status: Pipeline status dictionary
pipeline_status_lock: Lock for pipeline status
""" """
if not entities_to_rebuild and not relationships_to_rebuild: if not entities_to_rebuild and not relationships_to_rebuild:
return return
rebuilt_entities_count = 0
rebuilt_relationships_count = 0
# Get all referenced chunk IDs # Get all referenced chunk IDs
all_referenced_chunk_ids = set() all_referenced_chunk_ids = set()
@ -297,7 +303,7 @@ async def _rebuild_knowledge_from_chunks(
for chunk_ids in relationships_to_rebuild.values(): for chunk_ids in relationships_to_rebuild.values():
all_referenced_chunk_ids.update(chunk_ids) all_referenced_chunk_ids.update(chunk_ids)
status_message = f"Rebuilding knowledge from {len(all_referenced_chunk_ids)} cached chunk extractions" status_message = f"Rebuilding knowledge from {len(all_referenced_chunk_ids)} cached chunk extractions (parallel processing)"
logger.info(status_message) logger.info(status_message)
if pipeline_status is not None and pipeline_status_lock is not None: if pipeline_status is not None and pipeline_status_lock is not None:
async with pipeline_status_lock: async with pipeline_status_lock:
@ -367,66 +373,116 @@ async def _rebuild_knowledge_from_chunks(
pipeline_status["history_messages"].append(status_message) pipeline_status["history_messages"].append(status_message)
continue continue
# Rebuild entities # Get max async tasks limit from global_config for semaphore control
llm_model_max_async = global_config.get("llm_model_max_async", 4) + 1
semaphore = asyncio.Semaphore(llm_model_max_async)
# Counters for tracking progress
rebuilt_entities_count = 0
rebuilt_relationships_count = 0
failed_entities_count = 0
failed_relationships_count = 0
async def _locked_rebuild_entity(entity_name, chunk_ids):
nonlocal rebuilt_entities_count, failed_entities_count
async with semaphore:
workspace = global_config.get("workspace", "")
namespace = f"{workspace}:GraphDB" if workspace else "GraphDB"
async with get_storage_keyed_lock(
[entity_name], namespace=namespace, enable_logging=False
):
try:
await _rebuild_single_entity(
knowledge_graph_inst=knowledge_graph_inst,
entities_vdb=entities_vdb,
entity_name=entity_name,
chunk_ids=chunk_ids,
chunk_entities=chunk_entities,
llm_response_cache=llm_response_cache,
global_config=global_config,
)
rebuilt_entities_count += 1
status_message = (
f"Rebuilt entity: {entity_name} from {len(chunk_ids)} chunks"
)
logger.info(status_message)
if pipeline_status is not None and pipeline_status_lock is not None:
async with pipeline_status_lock:
pipeline_status["latest_message"] = status_message
pipeline_status["history_messages"].append(status_message)
except Exception as e:
failed_entities_count += 1
status_message = f"Failed to rebuild entity {entity_name}: {e}"
logger.info(status_message) # Per requirement, change to info
if pipeline_status is not None and pipeline_status_lock is not None:
async with pipeline_status_lock:
pipeline_status["latest_message"] = status_message
pipeline_status["history_messages"].append(status_message)
async def _locked_rebuild_relationship(src, tgt, chunk_ids):
nonlocal rebuilt_relationships_count, failed_relationships_count
async with semaphore:
workspace = global_config.get("workspace", "")
namespace = f"{workspace}:GraphDB" if workspace else "GraphDB"
async with get_storage_keyed_lock(
f"{src}-{tgt}", namespace=namespace, enable_logging=False
):
try:
await _rebuild_single_relationship(
knowledge_graph_inst=knowledge_graph_inst,
relationships_vdb=relationships_vdb,
src=src,
tgt=tgt,
chunk_ids=chunk_ids,
chunk_relationships=chunk_relationships,
llm_response_cache=llm_response_cache,
global_config=global_config,
)
rebuilt_relationships_count += 1
status_message = f"Rebuilt relationship: {src}->{tgt} from {len(chunk_ids)} chunks"
logger.info(status_message)
if pipeline_status is not None and pipeline_status_lock is not None:
async with pipeline_status_lock:
pipeline_status["latest_message"] = status_message
pipeline_status["history_messages"].append(status_message)
except Exception as e:
failed_relationships_count += 1
status_message = f"Failed to rebuild relationship {src}->{tgt}: {e}"
logger.info(status_message) # Per requirement, change to info
if pipeline_status is not None and pipeline_status_lock is not None:
async with pipeline_status_lock:
pipeline_status["latest_message"] = status_message
pipeline_status["history_messages"].append(status_message)
# Create tasks for parallel processing
tasks = []
# Add entity rebuilding tasks
for entity_name, chunk_ids in entities_to_rebuild.items(): for entity_name, chunk_ids in entities_to_rebuild.items():
try: task = asyncio.create_task(_locked_rebuild_entity(entity_name, chunk_ids))
await _rebuild_single_entity( tasks.append(task)
knowledge_graph_inst=knowledge_graph_inst,
entities_vdb=entities_vdb,
entity_name=entity_name,
chunk_ids=chunk_ids,
chunk_entities=chunk_entities,
llm_response_cache=llm_response_cache,
global_config=global_config,
)
rebuilt_entities_count += 1
status_message = (
f"Rebuilt entity: {entity_name} from {len(chunk_ids)} chunks"
)
logger.info(status_message)
if pipeline_status is not None and pipeline_status_lock is not None:
async with pipeline_status_lock:
pipeline_status["latest_message"] = status_message
pipeline_status["history_messages"].append(status_message)
except Exception as e:
status_message = f"Failed to rebuild entity {entity_name}: {e}"
logger.info(status_message) # Per requirement, change to info
if pipeline_status is not None and pipeline_status_lock is not None:
async with pipeline_status_lock:
pipeline_status["latest_message"] = status_message
pipeline_status["history_messages"].append(status_message)
# Rebuild relationships # Add relationship rebuilding tasks
for (src, tgt), chunk_ids in relationships_to_rebuild.items(): for (src, tgt), chunk_ids in relationships_to_rebuild.items():
try: task = asyncio.create_task(_locked_rebuild_relationship(src, tgt, chunk_ids))
await _rebuild_single_relationship( tasks.append(task)
knowledge_graph_inst=knowledge_graph_inst,
relationships_vdb=relationships_vdb, # Log parallel processing start
src=src, status_message = f"Starting parallel rebuild of {len(entities_to_rebuild)} entities and {len(relationships_to_rebuild)} relationships (max concurrent: {llm_model_max_async})"
tgt=tgt, logger.info(status_message)
chunk_ids=chunk_ids, if pipeline_status is not None and pipeline_status_lock is not None:
chunk_relationships=chunk_relationships, async with pipeline_status_lock:
llm_response_cache=llm_response_cache, pipeline_status["latest_message"] = status_message
global_config=global_config, pipeline_status["history_messages"].append(status_message)
)
rebuilt_relationships_count += 1 # Execute all tasks in parallel with semaphore control
status_message = ( await asyncio.gather(*tasks)
f"Rebuilt relationship: {src}->{tgt} from {len(chunk_ids)} chunks"
) # Final status report
logger.info(status_message) status_message = f"KG rebuild completed: {rebuilt_entities_count} entities and {rebuilt_relationships_count} relationships rebuilt successfully."
if pipeline_status is not None and pipeline_status_lock is not None: if failed_entities_count > 0 or failed_relationships_count > 0:
async with pipeline_status_lock: status_message += f" Failed: {failed_entities_count} entities, {failed_relationships_count} relationships."
pipeline_status["latest_message"] = status_message
pipeline_status["history_messages"].append(status_message)
except Exception as e:
status_message = f"Failed to rebuild relationship {src}->{tgt}: {e}"
logger.info(status_message)
if pipeline_status is not None and pipeline_status_lock is not None:
async with pipeline_status_lock:
pipeline_status["latest_message"] = status_message
pipeline_status["history_messages"].append(status_message)
status_message = f"KG rebuild completed: {rebuilt_entities_count} entities and {rebuilt_relationships_count} relationships."
logger.info(status_message) logger.info(status_message)
if pipeline_status is not None and pipeline_status_lock is not None: if pipeline_status is not None and pipeline_status_lock is not None:
async with pipeline_status_lock: async with pipeline_status_lock:
@ -726,7 +782,11 @@ async def _rebuild_single_relationship(
llm_response_cache: BaseKVStorage, llm_response_cache: BaseKVStorage,
global_config: dict[str, str], global_config: dict[str, str],
) -> None: ) -> None:
"""Rebuild a single relationship from cached extraction results""" """Rebuild a single relationship from cached extraction results
Note: This function assumes the caller has already acquired the appropriate
keyed lock for the relationship pair to ensure thread safety.
"""
# Get current relationship data # Get current relationship data
current_relationship = await knowledge_graph_inst.get_edge(src, tgt) current_relationship = await knowledge_graph_inst.get_edge(src, tgt)
@ -1148,7 +1208,7 @@ async def merge_nodes_and_edges(
pipeline_status["history_messages"].append(log_message) pipeline_status["history_messages"].append(log_message)
# Get max async tasks limit from global_config for semaphore control # Get max async tasks limit from global_config for semaphore control
llm_model_max_async = global_config.get("llm_model_max_async", 4) llm_model_max_async = global_config.get("llm_model_max_async", 4) + 1
semaphore = asyncio.Semaphore(llm_model_max_async) semaphore = asyncio.Semaphore(llm_model_max_async)
async def _locked_process_entity_name(entity_name, entities): async def _locked_process_entity_name(entity_name, entities):