From 8dfd3bf4285c34cf578888e40ad11f6bdc0f37f8 Mon Sep 17 00:00:00 2001 From: yangdx Date: Mon, 27 Oct 2025 02:55:58 +0800 Subject: [PATCH] Replace global graph DB lock with fine-grained keyed locking MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit • Use entity/relation-specific locks • Lock multiple entities when needed --- lightrag/utils_graph.py | 100 ++++++++++++++++++++++++++-------------- 1 file changed, 66 insertions(+), 34 deletions(-) diff --git a/lightrag/utils_graph.py b/lightrag/utils_graph.py index 5f7d5ac3..d782059d 100644 --- a/lightrag/utils_graph.py +++ b/lightrag/utils_graph.py @@ -5,7 +5,7 @@ import asyncio from typing import Any, cast from .base import DeletionResult -from .kg.shared_storage import get_graph_db_lock +from .kg.shared_storage import get_storage_keyed_lock from .constants import GRAPH_FIELD_SEP from .utils import compute_mdhash_id, logger from .base import StorageNameSpace @@ -74,9 +74,12 @@ async def adelete_by_entity( entity_chunks_storage: Optional KV storage for tracking chunks that reference this entity relation_chunks_storage: Optional KV storage for tracking chunks that reference relations """ - graph_db_lock = get_graph_db_lock(enable_logging=False) - # Use graph database lock to ensure atomic graph and vector db operations - async with graph_db_lock: + # Use keyed lock for entity to ensure atomic graph and vector db operations + workspace = entities_vdb.global_config.get("workspace", "") + namespace = f"{workspace}:GraphDB" if workspace else "GraphDB" + async with get_storage_keyed_lock( + [entity_name], namespace=namespace, enable_logging=False + ): try: # Check if the entity exists if not await chunk_entity_relation_graph.has_node(entity_name): @@ -167,14 +170,18 @@ async def adelete_by_relation( relation_chunks_storage: Optional KV storage for tracking chunks that reference this relation """ relation_str = f"{source_entity} -> {target_entity}" - graph_db_lock = get_graph_db_lock(enable_logging=False) - # Use graph database lock to ensure atomic graph and vector db operations - async with graph_db_lock: - try: - # Normalize entity order for undirected graph (ensures consistent key generation) - if source_entity > target_entity: - source_entity, target_entity = target_entity, source_entity + # Normalize entity order for undirected graph (ensures consistent key generation) + if source_entity > target_entity: + source_entity, target_entity = target_entity, source_entity + # Use keyed lock for relation to ensure atomic graph and vector db operations + workspace = relationships_vdb.global_config.get("workspace", "") + namespace = f"{workspace}:GraphDB" if workspace else "GraphDB" + sorted_edge_key = sorted([source_entity, target_entity]) + async with get_storage_keyed_lock( + sorted_edge_key, namespace=namespace, enable_logging=False + ): + try: # Check if the relation exists edge_exists = await chunk_entity_relation_graph.has_edge( source_entity, target_entity @@ -267,9 +274,19 @@ async def aedit_entity( Returns: Dictionary containing updated entity information """ - graph_db_lock = get_graph_db_lock(enable_logging=False) - # Use graph database lock to ensure atomic graph and vector db operations - async with graph_db_lock: + # Determine entities to lock + new_entity_name = updated_data.get("entity_name", entity_name) + is_renaming = new_entity_name != entity_name + + # Lock both original and new entity names if renaming + lock_keys = sorted([entity_name, new_entity_name]) if is_renaming else [entity_name] + + # Use keyed lock for entity to ensure atomic graph and vector db operations + workspace = entities_vdb.global_config.get("workspace", "") + namespace = f"{workspace}:GraphDB" if workspace else "GraphDB" + async with get_storage_keyed_lock( + lock_keys, namespace=namespace, enable_logging=False + ): try: # Save original entity name for chunk tracking updates original_entity_name = entity_name @@ -280,10 +297,6 @@ async def aedit_entity( raise ValueError(f"Entity '{entity_name}' does not exist") node_data = await chunk_entity_relation_graph.get_node(entity_name) - # Check if entity is being renamed - new_entity_name = updated_data.get("entity_name", entity_name) - is_renaming = new_entity_name != entity_name - # If renaming, check if new name already exists if is_renaming: if not allow_rename: @@ -619,14 +632,18 @@ async def aedit_relation( Returns: Dictionary containing updated relation information """ - graph_db_lock = get_graph_db_lock(enable_logging=False) - # Use graph database lock to ensure atomic graph and vector db operations - async with graph_db_lock: - try: - # Normalize entity order for undirected graph (ensures consistent key generation) - if source_entity > target_entity: - source_entity, target_entity = target_entity, source_entity + # Normalize entity order for undirected graph (ensures consistent key generation) + if source_entity > target_entity: + source_entity, target_entity = target_entity, source_entity + # Use keyed lock for relation to ensure atomic graph and vector db operations + workspace = relationships_vdb.global_config.get("workspace", "") + namespace = f"{workspace}:GraphDB" if workspace else "GraphDB" + sorted_edge_key = sorted([source_entity, target_entity]) + async with get_storage_keyed_lock( + sorted_edge_key, namespace=namespace, enable_logging=False + ): + try: # 1. Get current relation information edge_exists = await chunk_entity_relation_graph.has_edge( source_entity, target_entity @@ -799,9 +816,12 @@ async def acreate_entity( Returns: Dictionary containing created entity information """ - graph_db_lock = get_graph_db_lock(enable_logging=False) - # Use graph database lock to ensure atomic graph and vector db operations - async with graph_db_lock: + # Use keyed lock for entity to ensure atomic graph and vector db operations + workspace = entities_vdb.global_config.get("workspace", "") + namespace = f"{workspace}:GraphDB" if workspace else "GraphDB" + async with get_storage_keyed_lock( + [entity_name], namespace=namespace, enable_logging=False + ): try: # Check if entity already exists existing_node = await chunk_entity_relation_graph.has_node(entity_name) @@ -910,9 +930,13 @@ async def acreate_relation( Returns: Dictionary containing created relation information """ - graph_db_lock = get_graph_db_lock(enable_logging=False) - # Use graph database lock to ensure atomic graph and vector db operations - async with graph_db_lock: + # Use keyed lock for relation to ensure atomic graph and vector db operations + workspace = relationships_vdb.global_config.get("workspace", "") + namespace = f"{workspace}:GraphDB" if workspace else "GraphDB" + sorted_edge_key = sorted([source_entity, target_entity]) + async with get_storage_keyed_lock( + sorted_edge_key, namespace=namespace, enable_logging=False + ): try: # Check if both entities exist source_exists = await chunk_entity_relation_graph.has_node(source_entity) @@ -1063,9 +1087,17 @@ async def amerge_entities( Returns: Dictionary containing the merged entity information """ - graph_db_lock = get_graph_db_lock(enable_logging=False) - # Use graph database lock to ensure atomic graph and vector db operations - async with graph_db_lock: + # Collect all entities involved (source + target) and lock them all in sorted order + all_entities = set(source_entities) + all_entities.add(target_entity) + lock_keys = sorted(all_entities) + + # Use keyed lock for all entities to ensure atomic graph and vector db operations + workspace = entities_vdb.global_config.get("workspace", "") + namespace = f"{workspace}:GraphDB" if workspace else "GraphDB" + async with get_storage_keyed_lock( + lock_keys, namespace=namespace, enable_logging=False + ): try: # Default merge strategy for entities default_entity_merge_strategy = {