Add VDB error handling with retries for data consistency

- Add safe_vdb_operation_with_exception util
- Wrap VDB ops in entity/relationship code
- Ensure exceptions propagate on failure
- Add retry logic with configurable delays (see the timing note below)
yangdx 2025-09-03 21:15:09 +08:00
parent 61fb2444f0
commit 7ef2f0dff6
2 changed files with 233 additions and 81 deletions
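
A note on retry timing: the helper sleeps only between attempts, so a persistently failing operation incurs (max_retries - 1) x retry_delay of backoff before the final exception propagates -- about 0.2 s for the entity upserts below (max_retries=3, retry_delay=0.1) and 0.4 s for the relationship rebuilds (retry_delay=0.2).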


@@ -30,6 +30,7 @@ from .utils import (
     pick_by_vector_similarity,
     process_chunks_unified,
     build_file_path,
+    safe_vdb_operation_with_exception,
 )
 from .base import (
     BaseGraphStorage,
@@ -930,24 +931,24 @@ async def _rebuild_single_entity(
     async def _update_entity_storage(
         final_description: str, entity_type: str, file_paths: set[str]
     ):
-        # Update entity in graph storage
-        updated_entity_data = {
-            **current_entity,
-            "description": final_description,
-            "entity_type": entity_type,
-            "source_id": GRAPH_FIELD_SEP.join(chunk_ids),
-            "file_path": GRAPH_FIELD_SEP.join(file_paths)
-            if file_paths
-            else current_entity.get("file_path", "unknown_source"),
-        }
-        await knowledge_graph_inst.upsert_node(entity_name, updated_entity_data)
-
-        # Update entity in vector database
-        entity_vdb_id = compute_mdhash_id(entity_name, prefix="ent-")
-        entity_content = f"{entity_name}\n{final_description}"
-        await entities_vdb.upsert(
-            {
+        try:
+            # Update entity in graph storage (critical path)
+            updated_entity_data = {
+                **current_entity,
+                "description": final_description,
+                "entity_type": entity_type,
+                "source_id": GRAPH_FIELD_SEP.join(chunk_ids),
+                "file_path": GRAPH_FIELD_SEP.join(file_paths)
+                if file_paths
+                else current_entity.get("file_path", "unknown_source"),
+            }
+            await knowledge_graph_inst.upsert_node(entity_name, updated_entity_data)
+
+            # Update entity in vector database (equally critical)
+            entity_vdb_id = compute_mdhash_id(entity_name, prefix="ent-")
+            entity_content = f"{entity_name}\n{final_description}"
+            vdb_data = {
                 entity_vdb_id: {
                     "content": entity_content,
                     "entity_name": entity_name,
@@ -957,7 +958,20 @@ async def _rebuild_single_entity(
                     "file_path": updated_entity_data["file_path"],
                 }
             }
-        )
+            # Use safe operation wrapper - VDB failure must throw exception
+            await safe_vdb_operation_with_exception(
+                operation=lambda: entities_vdb.upsert(vdb_data),
+                operation_name="rebuild_entity_upsert",
+                entity_name=entity_name,
+                max_retries=3,
+                retry_delay=0.1,
+            )
+        except Exception as e:
+            error_msg = f"Failed to update entity storage for `{entity_name}`: {e}"
+            logger.error(error_msg)
+            raise  # Re-raise exception

     # Collect all entity data from relevant chunks
     all_entity_data = []
@@ -1145,21 +1159,21 @@ async def _rebuild_single_relationship(
-        await knowledge_graph_inst.upsert_edge(src, tgt, updated_relationship_data)
-
-        # Update relationship in vector database
-        rel_vdb_id = compute_mdhash_id(src + tgt, prefix="rel-")
-        rel_vdb_id_reverse = compute_mdhash_id(tgt + src, prefix="rel-")
-        # Delete old vector records first (both directions to be safe)
-        try:
-            await relationships_vdb.delete([rel_vdb_id, rel_vdb_id_reverse])
-        except Exception as e:
-            logger.debug(
-                f"Could not delete old relationship vector records {rel_vdb_id}, {rel_vdb_id_reverse}: {e}"
-            )
-        # Insert new vector record
-        rel_content = f"{combined_keywords}\t{src}\n{tgt}\n{final_description}"
-        await relationships_vdb.upsert(
-            {
+        try:
+            await knowledge_graph_inst.upsert_edge(src, tgt, updated_relationship_data)
+
+            # Update relationship in vector database
+            rel_vdb_id = compute_mdhash_id(src + tgt, prefix="rel-")
+            rel_vdb_id_reverse = compute_mdhash_id(tgt + src, prefix="rel-")
+            # Delete old vector records first (both directions to be safe)
+            try:
+                await relationships_vdb.delete([rel_vdb_id, rel_vdb_id_reverse])
+            except Exception as e:
+                logger.debug(
+                    f"Could not delete old relationship vector records {rel_vdb_id}, {rel_vdb_id_reverse}: {e}"
+                )
+            # Insert new vector record
+            rel_content = f"{combined_keywords}\t{src}\n{tgt}\n{final_description}"
+            vdb_data = {
                 rel_vdb_id: {
                     "src_id": src,
                     "tgt_id": tgt,
@@ -1171,7 +1185,20 @@ async def _rebuild_single_relationship(
                     "file_path": updated_relationship_data["file_path"],
                 }
             }
-        )
+            # Use safe operation wrapper - VDB failure must throw exception
+            await safe_vdb_operation_with_exception(
+                operation=lambda: relationships_vdb.upsert(vdb_data),
+                operation_name="rebuild_relationship_upsert",
+                entity_name=f"{src}-{tgt}",
+                max_retries=3,
+                retry_delay=0.2,
+            )
+        except Exception as e:
+            error_msg = f"Failed to rebuild relationship storage for `{src}-{tgt}`: {e}"
+            logger.error(error_msg)
+            raise  # Re-raise exception


 async def _merge_nodes_then_upsert(
@@ -1516,27 +1543,68 @@ async def merge_nodes_and_edges(
             async with get_storage_keyed_lock(
                 [entity_name], namespace=namespace, enable_logging=False
             ):
-                entity_data = await _merge_nodes_then_upsert(
-                    entity_name,
-                    entities,
-                    knowledge_graph_inst,
-                    global_config,
-                    pipeline_status,
-                    pipeline_status_lock,
-                    llm_response_cache,
-                )
-                if entity_vdb is not None:
-                    data_for_vdb = {
-                        compute_mdhash_id(entity_data["entity_name"], prefix="ent-"): {
-                            "entity_name": entity_data["entity_name"],
-                            "entity_type": entity_data["entity_type"],
-                            "content": f"{entity_data['entity_name']}\n{entity_data['description']}",
-                            "source_id": entity_data["source_id"],
-                            "file_path": entity_data.get("file_path", "unknown_source"),
-                        }
-                    }
-                    await entity_vdb.upsert(data_for_vdb)
-                return entity_data
+                try:
+                    # Graph database operation (critical path, must succeed)
+                    entity_data = await _merge_nodes_then_upsert(
+                        entity_name,
+                        entities,
+                        knowledge_graph_inst,
+                        global_config,
+                        pipeline_status,
+                        pipeline_status_lock,
+                        llm_response_cache,
+                    )
+
+                    # Vector database operation (equally critical, must succeed)
+                    if entity_vdb is not None and entity_data:
+                        data_for_vdb = {
+                            compute_mdhash_id(
+                                entity_data["entity_name"], prefix="ent-"
+                            ): {
+                                "entity_name": entity_data["entity_name"],
+                                "entity_type": entity_data["entity_type"],
+                                "content": f"{entity_data['entity_name']}\n{entity_data['description']}",
+                                "source_id": entity_data["source_id"],
+                                "file_path": entity_data.get(
+                                    "file_path", "unknown_source"
+                                ),
+                            }
+                        }
+
+                        # Use safe operation wrapper - VDB failure must throw exception
+                        await safe_vdb_operation_with_exception(
+                            operation=lambda: entity_vdb.upsert(data_for_vdb),
+                            operation_name="entity_upsert",
+                            entity_name=entity_name,
+                            max_retries=3,
+                            retry_delay=0.1,
+                        )
+
+                    return entity_data
+                except Exception as e:
+                    # Any database operation failure is critical
+                    error_msg = (
+                        f"Critical error in entity processing for `{entity_name}`: {e}"
+                    )
+                    logger.error(error_msg)
+
+                    # Try to update pipeline status, but don't let status update failure affect main exception
+                    try:
+                        if (
+                            pipeline_status is not None
+                            and pipeline_status_lock is not None
+                        ):
+                            async with pipeline_status_lock:
+                                pipeline_status["latest_message"] = error_msg
+                                pipeline_status["history_messages"].append(error_msg)
+                    except Exception as status_error:
+                        logger.error(
+                            f"Failed to update pipeline status: {status_error}"
+                        )
+
+                    # Re-raise the original exception
+                    raise

     # Create entity processing tasks
     entity_tasks = []
@@ -1584,38 +1652,75 @@ async def merge_nodes_and_edges(
                 namespace=namespace,
                 enable_logging=False,
             ):
-                added_entities = []  # Track entities added during edge processing
-                edge_data = await _merge_edges_then_upsert(
-                    edge_key[0],
-                    edge_key[1],
-                    edges,
-                    knowledge_graph_inst,
-                    global_config,
-                    pipeline_status,
-                    pipeline_status_lock,
-                    llm_response_cache,
-                    added_entities,  # Pass list to collect added entities
-                )
-
-                if edge_data is None:
-                    return None, []
-
-                if relationships_vdb is not None:
-                    data_for_vdb = {
-                        compute_mdhash_id(
-                            edge_data["src_id"] + edge_data["tgt_id"], prefix="rel-"
-                        ): {
-                            "src_id": edge_data["src_id"],
-                            "tgt_id": edge_data["tgt_id"],
-                            "keywords": edge_data["keywords"],
-                            "content": f"{edge_data['src_id']}\t{edge_data['tgt_id']}\n{edge_data['keywords']}\n{edge_data['description']}",
-                            "source_id": edge_data["source_id"],
-                            "file_path": edge_data.get("file_path", "unknown_source"),
-                            "weight": edge_data.get("weight", 1.0),
-                        }
-                    }
-                    await relationships_vdb.upsert(data_for_vdb)
-                return edge_data, added_entities
+                try:
+                    added_entities = []  # Track entities added during edge processing
+
+                    # Graph database operation (critical path, must succeed)
+                    edge_data = await _merge_edges_then_upsert(
+                        edge_key[0],
+                        edge_key[1],
+                        edges,
+                        knowledge_graph_inst,
+                        global_config,
+                        pipeline_status,
+                        pipeline_status_lock,
+                        llm_response_cache,
+                        added_entities,  # Pass list to collect added entities
+                    )
+
+                    if edge_data is None:
+                        return None, []
+
+                    # Vector database operation (equally critical, must succeed)
+                    if relationships_vdb is not None:
+                        data_for_vdb = {
+                            compute_mdhash_id(
+                                edge_data["src_id"] + edge_data["tgt_id"], prefix="rel-"
+                            ): {
+                                "src_id": edge_data["src_id"],
+                                "tgt_id": edge_data["tgt_id"],
+                                "keywords": edge_data["keywords"],
+                                "content": f"{edge_data['src_id']}\t{edge_data['tgt_id']}\n{edge_data['keywords']}\n{edge_data['description']}",
+                                "source_id": edge_data["source_id"],
+                                "file_path": edge_data.get(
+                                    "file_path", "unknown_source"
+                                ),
+                                "weight": edge_data.get("weight", 1.0),
+                            }
+                        }
+
+                        # Use safe operation wrapper - VDB failure must throw exception
+                        await safe_vdb_operation_with_exception(
+                            operation=lambda: relationships_vdb.upsert(data_for_vdb),
+                            operation_name="relationship_upsert",
+                            entity_name=f"{edge_data['src_id']}-{edge_data['tgt_id']}",
+                            max_retries=3,
+                            retry_delay=0.1,
+                        )
+
+                    return edge_data, added_entities
+                except Exception as e:
+                    # Any database operation failure is critical
+                    error_msg = f"Critical error in relationship processing for `{sorted_edge_key}`: {e}"
+                    logger.error(error_msg)
+
+                    # Try to update pipeline status, but don't let status update failure affect main exception
+                    try:
+                        if (
+                            pipeline_status is not None
+                            and pipeline_status_lock is not None
+                        ):
+                            async with pipeline_status_lock:
+                                pipeline_status["latest_message"] = error_msg
+                                pipeline_status["history_messages"].append(error_msg)
+                    except Exception as status_error:
+                        logger.error(
+                            f"Failed to update pipeline status: {status_error}"
+                        )
+
+                    # Re-raise the original exception
+                    raise

     # Create relationship processing tasks
     edge_tasks = []
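
A side note on the wrapper calls above: each upsert is passed as a zero-argument lambda rather than as a coroutine object, because a coroutine can be awaited only once. The lambda acts as a factory, so the retry loop can build a fresh coroutine on every attempt via "await operation()". A minimal, self-contained sketch of the distinction (the upsert stand-in is hypothetical, not from this commit):

import asyncio

async def upsert(data: dict) -> None:  # hypothetical stand-in for a VDB call
    ...

async def demo():
    data = {"ent-1": {"content": "x"}}

    coro = upsert(data)
    await coro
    # await coro  # would raise RuntimeError: cannot reuse already awaited coroutine

    # A factory sidesteps this: every retry attempt gets a fresh coroutine.
    operation = lambda: upsert(data)
    await operation()
    await operation()  # fine: a new coroutine is created on each call

asyncio.run(demo())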


@@ -14,7 +14,7 @@ from dataclasses import dataclass
 from datetime import datetime
 from functools import wraps
 from hashlib import md5
-from typing import Any, Protocol, Callable, TYPE_CHECKING, List
+from typing import Any, Protocol, Callable, TYPE_CHECKING, List, Optional

 import numpy as np
 from dotenv import load_dotenv
@@ -57,6 +57,53 @@ except ImportError:
     )


+async def safe_vdb_operation_with_exception(
+    operation: Callable,
+    operation_name: str,
+    entity_name: str = "",
+    max_retries: int = 3,
+    retry_delay: float = 0.2,
+    logger_func: Optional[Callable] = None,
+) -> None:
+    """
+    Safely execute vector database operations with retry mechanism and exception handling.
+
+    This function ensures that VDB operations are executed with proper error handling
+    and retry logic. If all retries fail, it raises an exception to maintain data consistency.
+
+    Args:
+        operation: The async operation to execute
+        operation_name: Operation name for logging purposes
+        entity_name: Entity name for logging purposes
+        max_retries: Maximum number of retry attempts
+        retry_delay: Delay between retries in seconds
+        logger_func: Logger function to use for error messages
+
+    Raises:
+        Exception: When operation fails after all retry attempts
+    """
+    log_func = logger_func or logger.warning
+    last_exception = None
+
+    for attempt in range(max_retries):
+        try:
+            await operation()
+            return  # Success, return immediately
+        except Exception as e:
+            last_exception = e
+            if attempt == max_retries - 1:
+                error_msg = f"VDB {operation_name} failed for {entity_name} after {max_retries} attempts: {e}"
+                log_func(error_msg)
+                raise Exception(error_msg) from e
+            else:
+                log_func(
+                    f"VDB {operation_name} attempt {attempt + 1} failed for {entity_name}: {e}, retrying..."
+                )
+                if retry_delay > 0:
+                    await asyncio.sleep(retry_delay)
+
+    # This line should theoretically never be reached, but included for safety
+    raise Exception(f"Max retries exceeded for {operation_name}") from last_exception
+
+
 def get_env_value(
     env_key: str, default: any, value_type: type = str, special_none: bool = False
 ) -> any:
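
Finally, a minimal usage sketch of the new helper as defined above. FlakyVDB is a hypothetical stand-in (not part of this commit) that fails twice and then succeeds, so the call exercises the retry path end to end; it assumes safe_vdb_operation_with_exception and the module-level logger it uses are in scope.

import asyncio

class FlakyVDB:
    # Hypothetical vector DB client: fails twice, then succeeds.
    def __init__(self):
        self.calls = 0

    async def upsert(self, data: dict) -> None:
        self.calls += 1
        if self.calls < 3:
            raise ConnectionError(f"transient failure on call {self.calls}")

async def main():
    vdb = FlakyVDB()
    # Attempts 1 and 2 fail and are logged as warnings; attempt 3 succeeds,
    # so no exception escapes. With max_retries=2 this would raise instead.
    await safe_vdb_operation_with_exception(
        operation=lambda: vdb.upsert({"ent-123": {"content": "example"}}),
        operation_name="entity_upsert",
        entity_name="ExampleEntity",
        max_retries=3,
        retry_delay=0.1,
    )
    print(f"succeeded after {vdb.calls} calls")

asyncio.run(main())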