diff --git a/lightrag/operate.py b/lightrag/operate.py
index cbfbb858..22ed9117 100644
--- a/lightrag/operate.py
+++ b/lightrag/operate.py
@@ -30,6 +30,7 @@ from .utils import (
     pick_by_vector_similarity,
     process_chunks_unified,
     build_file_path,
+    safe_vdb_operation_with_exception,
 )
 from .base import (
     BaseGraphStorage,
@@ -930,24 +931,24 @@ async def _rebuild_single_entity(
     async def _update_entity_storage(
         final_description: str, entity_type: str, file_paths: set[str]
     ):
-        # Update entity in graph storage
-        updated_entity_data = {
-            **current_entity,
-            "description": final_description,
-            "entity_type": entity_type,
-            "source_id": GRAPH_FIELD_SEP.join(chunk_ids),
-            "file_path": GRAPH_FIELD_SEP.join(file_paths)
-            if file_paths
-            else current_entity.get("file_path", "unknown_source"),
-        }
-        await knowledge_graph_inst.upsert_node(entity_name, updated_entity_data)
+        try:
+            # Update entity in graph storage (critical path)
+            updated_entity_data = {
+                **current_entity,
+                "description": final_description,
+                "entity_type": entity_type,
+                "source_id": GRAPH_FIELD_SEP.join(chunk_ids),
+                "file_path": GRAPH_FIELD_SEP.join(file_paths)
+                if file_paths
+                else current_entity.get("file_path", "unknown_source"),
+            }
+            await knowledge_graph_inst.upsert_node(entity_name, updated_entity_data)

-        # Update entity in vector database
-        entity_vdb_id = compute_mdhash_id(entity_name, prefix="ent-")
+            # Update entity in vector database (equally critical)
+            entity_vdb_id = compute_mdhash_id(entity_name, prefix="ent-")
+            entity_content = f"{entity_name}\n{final_description}"

-        entity_content = f"{entity_name}\n{final_description}"
-        await entities_vdb.upsert(
-            {
+            vdb_data = {
                 entity_vdb_id: {
                     "content": entity_content,
                     "entity_name": entity_name,
@@ -957,7 +958,20 @@ async def _rebuild_single_entity(
                     "file_path": updated_entity_data["file_path"],
                 }
             }
-        )
+
+            # Use safe operation wrapper - VDB failure must throw exception
+            await safe_vdb_operation_with_exception(
+                operation=lambda: entities_vdb.upsert(vdb_data),
+                operation_name="rebuild_entity_upsert",
+                entity_name=entity_name,
+                max_retries=3,
+                retry_delay=0.1,
+            )
+
+        except Exception as e:
+            error_msg = f"Failed to update entity storage for `{entity_name}`: {e}"
+            logger.error(error_msg)
+            raise  # Re-raise exception

     # Collect all entity data from relevant chunks
     all_entity_data = []
@@ -1145,21 +1159,21 @@ async def _rebuild_single_relationship(
     await knowledge_graph_inst.upsert_edge(src, tgt, updated_relationship_data)

     # Update relationship in vector database
-    rel_vdb_id = compute_mdhash_id(src + tgt, prefix="rel-")
-    rel_vdb_id_reverse = compute_mdhash_id(tgt + src, prefix="rel-")
-
-    # Delete old vector records first (both directions to be safe)
     try:
-        await relationships_vdb.delete([rel_vdb_id, rel_vdb_id_reverse])
-    except Exception as e:
-        logger.debug(
-            f"Could not delete old relationship vector records {rel_vdb_id}, {rel_vdb_id_reverse}: {e}"
-        )
+        rel_vdb_id = compute_mdhash_id(src + tgt, prefix="rel-")
+        rel_vdb_id_reverse = compute_mdhash_id(tgt + src, prefix="rel-")

-    # Insert new vector record
-    rel_content = f"{combined_keywords}\t{src}\n{tgt}\n{final_description}"
-    await relationships_vdb.upsert(
-        {
+        # Delete old vector records first (both directions to be safe)
+        try:
+            await relationships_vdb.delete([rel_vdb_id, rel_vdb_id_reverse])
+        except Exception as e:
+            logger.debug(
+                f"Could not delete old relationship vector records {rel_vdb_id}, {rel_vdb_id_reverse}: {e}"
+            )
+
+        # Insert new vector record
+        rel_content = f"{combined_keywords}\t{src}\n{tgt}\n{final_description}"
+        vdb_data = {
             rel_vdb_id: {
                 "src_id": src,
                 "tgt_id": tgt,
@@ -1171,7 +1185,20 @@ async def _rebuild_single_relationship(
                 "file_path": updated_relationship_data["file_path"],
             }
         }
-    )
+
+        # Use safe operation wrapper - VDB failure must throw exception
+        await safe_vdb_operation_with_exception(
+            operation=lambda: relationships_vdb.upsert(vdb_data),
+            operation_name="rebuild_relationship_upsert",
+            entity_name=f"{src}-{tgt}",
+            max_retries=3,
+            retry_delay=0.2,
+        )
+
+    except Exception as e:
+        error_msg = f"Failed to rebuild relationship storage for `{src}-{tgt}`: {e}"
+        logger.error(error_msg)
+        raise  # Re-raise exception


 async def _merge_nodes_then_upsert(
@@ -1516,27 +1543,68 @@
         async with get_storage_keyed_lock(
             [entity_name], namespace=namespace, enable_logging=False
         ):
-            entity_data = await _merge_nodes_then_upsert(
-                entity_name,
-                entities,
-                knowledge_graph_inst,
-                global_config,
-                pipeline_status,
-                pipeline_status_lock,
-                llm_response_cache,
-            )
-            if entity_vdb is not None:
-                data_for_vdb = {
-                    compute_mdhash_id(entity_data["entity_name"], prefix="ent-"): {
-                        "entity_name": entity_data["entity_name"],
-                        "entity_type": entity_data["entity_type"],
-                        "content": f"{entity_data['entity_name']}\n{entity_data['description']}",
-                        "source_id": entity_data["source_id"],
-                        "file_path": entity_data.get("file_path", "unknown_source"),
+            try:
+                # Graph database operation (critical path, must succeed)
+                entity_data = await _merge_nodes_then_upsert(
+                    entity_name,
+                    entities,
+                    knowledge_graph_inst,
+                    global_config,
+                    pipeline_status,
+                    pipeline_status_lock,
+                    llm_response_cache,
+                )
+
+                # Vector database operation (equally critical, must succeed)
+                if entity_vdb is not None and entity_data:
+                    data_for_vdb = {
+                        compute_mdhash_id(
+                            entity_data["entity_name"], prefix="ent-"
+                        ): {
+                            "entity_name": entity_data["entity_name"],
+                            "entity_type": entity_data["entity_type"],
+                            "content": f"{entity_data['entity_name']}\n{entity_data['description']}",
+                            "source_id": entity_data["source_id"],
+                            "file_path": entity_data.get(
+                                "file_path", "unknown_source"
+                            ),
+                        }
                     }
-                }
-                await entity_vdb.upsert(data_for_vdb)
-            return entity_data
+
+                    # Use safe operation wrapper - VDB failure must throw exception
+                    await safe_vdb_operation_with_exception(
+                        operation=lambda: entity_vdb.upsert(data_for_vdb),
+                        operation_name="entity_upsert",
+                        entity_name=entity_name,
+                        max_retries=3,
+                        retry_delay=0.1,
+                    )
+
+                return entity_data
+
+            except Exception as e:
+                # Any database operation failure is critical
+                error_msg = (
+                    f"Critical error in entity processing for `{entity_name}`: {e}"
+                )
+                logger.error(error_msg)
+
+                # Try to update pipeline status, but don't let status update failure affect main exception
+                try:
+                    if (
+                        pipeline_status is not None
+                        and pipeline_status_lock is not None
+                    ):
+                        async with pipeline_status_lock:
+                            pipeline_status["latest_message"] = error_msg
+                            pipeline_status["history_messages"].append(error_msg)
+                except Exception as status_error:
+                    logger.error(
+                        f"Failed to update pipeline status: {status_error}"
+                    )
+
+                # Re-raise the original exception
+                raise

     # Create entity processing tasks
     entity_tasks = []
@@ -1584,38 +1652,75 @@
             namespace=namespace,
             enable_logging=False,
         ):
-            added_entities = []  # Track entities added during edge processing
-            edge_data = await _merge_edges_then_upsert(
-                edge_key[0],
-                edge_key[1],
-                edges,
-                knowledge_graph_inst,
-                global_config,
-                pipeline_status,
-                pipeline_status_lock,
-                llm_response_cache,
-                added_entities,  # Pass list to collect added entities
-            )
+            try:
+                added_entities = []  # Track entities added during edge processing

-            if edge_data is None:
-                return None, []
+                # Graph database operation (critical path, must succeed)
+                edge_data = await _merge_edges_then_upsert(
+                    edge_key[0],
+                    edge_key[1],
+                    edges,
+                    knowledge_graph_inst,
+                    global_config,
+                    pipeline_status,
+                    pipeline_status_lock,
+                    llm_response_cache,
+                    added_entities,  # Pass list to collect added entities
+                )

-            if relationships_vdb is not None:
-                data_for_vdb = {
-                    compute_mdhash_id(
-                        edge_data["src_id"] + edge_data["tgt_id"], prefix="rel-"
-                    ): {
-                        "src_id": edge_data["src_id"],
-                        "tgt_id": edge_data["tgt_id"],
-                        "keywords": edge_data["keywords"],
-                        "content": f"{edge_data['src_id']}\t{edge_data['tgt_id']}\n{edge_data['keywords']}\n{edge_data['description']}",
-                        "source_id": edge_data["source_id"],
-                        "file_path": edge_data.get("file_path", "unknown_source"),
-                        "weight": edge_data.get("weight", 1.0),
+                if edge_data is None:
+                    return None, []
+
+                # Vector database operation (equally critical, must succeed)
+                if relationships_vdb is not None:
+                    data_for_vdb = {
+                        compute_mdhash_id(
+                            edge_data["src_id"] + edge_data["tgt_id"], prefix="rel-"
+                        ): {
+                            "src_id": edge_data["src_id"],
+                            "tgt_id": edge_data["tgt_id"],
+                            "keywords": edge_data["keywords"],
+                            "content": f"{edge_data['src_id']}\t{edge_data['tgt_id']}\n{edge_data['keywords']}\n{edge_data['description']}",
+                            "source_id": edge_data["source_id"],
+                            "file_path": edge_data.get(
+                                "file_path", "unknown_source"
+                            ),
+                            "weight": edge_data.get("weight", 1.0),
+                        }
                     }
-                }
-                await relationships_vdb.upsert(data_for_vdb)
-            return edge_data, added_entities
+
+                    # Use safe operation wrapper - VDB failure must throw exception
+                    await safe_vdb_operation_with_exception(
+                        operation=lambda: relationships_vdb.upsert(data_for_vdb),
+                        operation_name="relationship_upsert",
+                        entity_name=f"{edge_data['src_id']}-{edge_data['tgt_id']}",
+                        max_retries=3,
+                        retry_delay=0.1,
+                    )
+
+                return edge_data, added_entities
+
+            except Exception as e:
+                # Any database operation failure is critical
+                error_msg = f"Critical error in relationship processing for `{sorted_edge_key}`: {e}"
+                logger.error(error_msg)
+
+                # Try to update pipeline status, but don't let status update failure affect main exception
+                try:
+                    if (
+                        pipeline_status is not None
+                        and pipeline_status_lock is not None
+                    ):
+                        async with pipeline_status_lock:
+                            pipeline_status["latest_message"] = error_msg
+                            pipeline_status["history_messages"].append(error_msg)
+                except Exception as status_error:
+                    logger.error(
+                        f"Failed to update pipeline status: {status_error}"
+                    )
+
+                # Re-raise the original exception
+                raise

     # Create relationship processing tasks
     edge_tasks = []
diff --git a/lightrag/utils.py b/lightrag/utils.py
index f2d56282..83453a7e 100644
--- a/lightrag/utils.py
+++ b/lightrag/utils.py
@@ -14,7 +14,7 @@ from dataclasses import dataclass
 from datetime import datetime
 from functools import wraps
 from hashlib import md5
-from typing import Any, Protocol, Callable, TYPE_CHECKING, List
+from typing import Any, Protocol, Callable, TYPE_CHECKING, List, Optional

 import numpy as np
 from dotenv import load_dotenv
@@ -57,6 +57,53 @@ except ImportError:
     )


+async def safe_vdb_operation_with_exception(
+    operation: Callable,
+    operation_name: str,
+    entity_name: str = "",
+    max_retries: int = 3,
+    retry_delay: float = 0.2,
+    logger_func: Optional[Callable] = None
+) -> None:
+    """
+    Safely execute vector database operations with retry mechanism and exception handling.
+
+    This function ensures that VDB operations are executed with proper error handling
+    and retry logic. If all retries fail, it raises an exception to maintain data consistency.
+
+    Args:
+        operation: The async operation to execute
+        operation_name: Operation name for logging purposes
+        entity_name: Entity name for logging purposes
+        max_retries: Maximum number of retry attempts
+        retry_delay: Delay between retries in seconds
+        logger_func: Logger function to use for error messages
+
+    Raises:
+        Exception: When operation fails after all retry attempts
+    """
+    log_func = logger_func or logger.warning
+    last_exception = None
+
+    for attempt in range(max_retries):
+        try:
+            await operation()
+            return  # Success, return immediately
+        except Exception as e:
+            last_exception = e
+            if attempt == max_retries - 1:
+                error_msg = f"VDB {operation_name} failed for {entity_name} after {max_retries} attempts: {e}"
+                log_func(error_msg)
+                raise Exception(error_msg) from e
+            else:
+                log_func(f"VDB {operation_name} attempt {attempt + 1} failed for {entity_name}: {e}, retrying...")
+                if retry_delay > 0:
+                    await asyncio.sleep(retry_delay)
+
+    # This line should theoretically never be reached, but included for safety
+    raise Exception(f"Max retries exceeded for {operation_name}") from last_exception
+
+
 def get_env_value(
     env_key: str, default: any, value_type: type = str, special_none: bool = False
 ) -> any:
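For reference, a minimal usage sketch of the retry helper added above. The `FlakyVDB` class, its record contents, and the script itself are made-up stand-ins (not part of the patch or of LightRAG's storage backends); it assumes the patch is applied so that `safe_vdb_operation_with_exception` is importable from `lightrag.utils`.

```python
import asyncio

# Assumes the patch above is applied to lightrag/utils.py.
from lightrag.utils import safe_vdb_operation_with_exception


class FlakyVDB:
    """Hypothetical stand-in for a vector storage backend: fails twice, then succeeds."""

    def __init__(self) -> None:
        self.calls = 0

    async def upsert(self, data: dict) -> None:
        self.calls += 1
        if self.calls < 3:
            raise RuntimeError("transient VDB error")


async def main() -> None:
    vdb = FlakyVDB()
    vdb_data = {"ent-abc123": {"entity_name": "Alice", "content": "Alice\nA person"}}

    # The helper retries the transient failures and succeeds on the third attempt.
    # If all attempts failed, it would raise, and the caller's try/except
    # (as in merge_nodes_and_edges above) would log and re-raise.
    await safe_vdb_operation_with_exception(
        operation=lambda: vdb.upsert(vdb_data),
        operation_name="entity_upsert",
        entity_name="Alice",
        max_retries=3,
        retry_delay=0.1,
    )
    print(f"upsert succeeded after {vdb.calls} attempts")


if __name__ == "__main__":
    asyncio.run(main())
```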