Replace global graph DB lock with fine-grained keyed locking

• Use entity/relation-specific locks
• Lock multiple entities when needed
This commit is contained in:
yangdx 2025-10-27 02:55:58 +08:00
parent 2c09adb8d3
commit 8dfd3bf428

View file

@ -5,7 +5,7 @@ import asyncio
from typing import Any, cast
from .base import DeletionResult
from .kg.shared_storage import get_graph_db_lock
from .kg.shared_storage import get_storage_keyed_lock
from .constants import GRAPH_FIELD_SEP
from .utils import compute_mdhash_id, logger
from .base import StorageNameSpace
@ -74,9 +74,12 @@ async def adelete_by_entity(
entity_chunks_storage: Optional KV storage for tracking chunks that reference this entity
relation_chunks_storage: Optional KV storage for tracking chunks that reference relations
"""
graph_db_lock = get_graph_db_lock(enable_logging=False)
# Use graph database lock to ensure atomic graph and vector db operations
async with graph_db_lock:
# Use keyed lock for entity to ensure atomic graph and vector db operations
workspace = entities_vdb.global_config.get("workspace", "")
namespace = f"{workspace}:GraphDB" if workspace else "GraphDB"
async with get_storage_keyed_lock(
[entity_name], namespace=namespace, enable_logging=False
):
try:
# Check if the entity exists
if not await chunk_entity_relation_graph.has_node(entity_name):
@ -167,14 +170,18 @@ async def adelete_by_relation(
relation_chunks_storage: Optional KV storage for tracking chunks that reference this relation
"""
relation_str = f"{source_entity} -> {target_entity}"
graph_db_lock = get_graph_db_lock(enable_logging=False)
# Use graph database lock to ensure atomic graph and vector db operations
async with graph_db_lock:
try:
# Normalize entity order for undirected graph (ensures consistent key generation)
if source_entity > target_entity:
source_entity, target_entity = target_entity, source_entity
# Normalize entity order for undirected graph (ensures consistent key generation)
if source_entity > target_entity:
source_entity, target_entity = target_entity, source_entity
# Use keyed lock for relation to ensure atomic graph and vector db operations
workspace = relationships_vdb.global_config.get("workspace", "")
namespace = f"{workspace}:GraphDB" if workspace else "GraphDB"
sorted_edge_key = sorted([source_entity, target_entity])
async with get_storage_keyed_lock(
sorted_edge_key, namespace=namespace, enable_logging=False
):
try:
# Check if the relation exists
edge_exists = await chunk_entity_relation_graph.has_edge(
source_entity, target_entity
@ -267,9 +274,19 @@ async def aedit_entity(
Returns:
Dictionary containing updated entity information
"""
graph_db_lock = get_graph_db_lock(enable_logging=False)
# Use graph database lock to ensure atomic graph and vector db operations
async with graph_db_lock:
# Determine entities to lock
new_entity_name = updated_data.get("entity_name", entity_name)
is_renaming = new_entity_name != entity_name
# Lock both original and new entity names if renaming
lock_keys = sorted([entity_name, new_entity_name]) if is_renaming else [entity_name]
# Use keyed lock for entity to ensure atomic graph and vector db operations
workspace = entities_vdb.global_config.get("workspace", "")
namespace = f"{workspace}:GraphDB" if workspace else "GraphDB"
async with get_storage_keyed_lock(
lock_keys, namespace=namespace, enable_logging=False
):
try:
# Save original entity name for chunk tracking updates
original_entity_name = entity_name
@ -280,10 +297,6 @@ async def aedit_entity(
raise ValueError(f"Entity '{entity_name}' does not exist")
node_data = await chunk_entity_relation_graph.get_node(entity_name)
# Check if entity is being renamed
new_entity_name = updated_data.get("entity_name", entity_name)
is_renaming = new_entity_name != entity_name
# If renaming, check if new name already exists
if is_renaming:
if not allow_rename:
@ -619,14 +632,18 @@ async def aedit_relation(
Returns:
Dictionary containing updated relation information
"""
graph_db_lock = get_graph_db_lock(enable_logging=False)
# Use graph database lock to ensure atomic graph and vector db operations
async with graph_db_lock:
try:
# Normalize entity order for undirected graph (ensures consistent key generation)
if source_entity > target_entity:
source_entity, target_entity = target_entity, source_entity
# Normalize entity order for undirected graph (ensures consistent key generation)
if source_entity > target_entity:
source_entity, target_entity = target_entity, source_entity
# Use keyed lock for relation to ensure atomic graph and vector db operations
workspace = relationships_vdb.global_config.get("workspace", "")
namespace = f"{workspace}:GraphDB" if workspace else "GraphDB"
sorted_edge_key = sorted([source_entity, target_entity])
async with get_storage_keyed_lock(
sorted_edge_key, namespace=namespace, enable_logging=False
):
try:
# 1. Get current relation information
edge_exists = await chunk_entity_relation_graph.has_edge(
source_entity, target_entity
@ -799,9 +816,12 @@ async def acreate_entity(
Returns:
Dictionary containing created entity information
"""
graph_db_lock = get_graph_db_lock(enable_logging=False)
# Use graph database lock to ensure atomic graph and vector db operations
async with graph_db_lock:
# Use keyed lock for entity to ensure atomic graph and vector db operations
workspace = entities_vdb.global_config.get("workspace", "")
namespace = f"{workspace}:GraphDB" if workspace else "GraphDB"
async with get_storage_keyed_lock(
[entity_name], namespace=namespace, enable_logging=False
):
try:
# Check if entity already exists
existing_node = await chunk_entity_relation_graph.has_node(entity_name)
@ -910,9 +930,13 @@ async def acreate_relation(
Returns:
Dictionary containing created relation information
"""
graph_db_lock = get_graph_db_lock(enable_logging=False)
# Use graph database lock to ensure atomic graph and vector db operations
async with graph_db_lock:
# Use keyed lock for relation to ensure atomic graph and vector db operations
workspace = relationships_vdb.global_config.get("workspace", "")
namespace = f"{workspace}:GraphDB" if workspace else "GraphDB"
sorted_edge_key = sorted([source_entity, target_entity])
async with get_storage_keyed_lock(
sorted_edge_key, namespace=namespace, enable_logging=False
):
try:
# Check if both entities exist
source_exists = await chunk_entity_relation_graph.has_node(source_entity)
@ -1063,9 +1087,17 @@ async def amerge_entities(
Returns:
Dictionary containing the merged entity information
"""
graph_db_lock = get_graph_db_lock(enable_logging=False)
# Use graph database lock to ensure atomic graph and vector db operations
async with graph_db_lock:
# Collect all entities involved (source + target) and lock them all in sorted order
all_entities = set(source_entities)
all_entities.add(target_entity)
lock_keys = sorted(all_entities)
# Use keyed lock for all entities to ensure atomic graph and vector db operations
workspace = entities_vdb.global_config.get("workspace", "")
namespace = f"{workspace}:GraphDB" if workspace else "GraphDB"
async with get_storage_keyed_lock(
lock_keys, namespace=namespace, enable_logging=False
):
try:
# Default merge strategy for entities
default_entity_merge_strategy = {