Replace global graph DB lock with fine-grained keyed locking
• Use entity/relation-specific locks • Lock multiple entities when needed
This commit is contained in:
parent
2c09adb8d3
commit
8dfd3bf428
1 changed files with 66 additions and 34 deletions
|
|
@ -5,7 +5,7 @@ import asyncio
|
||||||
from typing import Any, cast
|
from typing import Any, cast
|
||||||
|
|
||||||
from .base import DeletionResult
|
from .base import DeletionResult
|
||||||
from .kg.shared_storage import get_graph_db_lock
|
from .kg.shared_storage import get_storage_keyed_lock
|
||||||
from .constants import GRAPH_FIELD_SEP
|
from .constants import GRAPH_FIELD_SEP
|
||||||
from .utils import compute_mdhash_id, logger
|
from .utils import compute_mdhash_id, logger
|
||||||
from .base import StorageNameSpace
|
from .base import StorageNameSpace
|
||||||
|
|
@ -74,9 +74,12 @@ async def adelete_by_entity(
|
||||||
entity_chunks_storage: Optional KV storage for tracking chunks that reference this entity
|
entity_chunks_storage: Optional KV storage for tracking chunks that reference this entity
|
||||||
relation_chunks_storage: Optional KV storage for tracking chunks that reference relations
|
relation_chunks_storage: Optional KV storage for tracking chunks that reference relations
|
||||||
"""
|
"""
|
||||||
graph_db_lock = get_graph_db_lock(enable_logging=False)
|
# Use keyed lock for entity to ensure atomic graph and vector db operations
|
||||||
# Use graph database lock to ensure atomic graph and vector db operations
|
workspace = entities_vdb.global_config.get("workspace", "")
|
||||||
async with graph_db_lock:
|
namespace = f"{workspace}:GraphDB" if workspace else "GraphDB"
|
||||||
|
async with get_storage_keyed_lock(
|
||||||
|
[entity_name], namespace=namespace, enable_logging=False
|
||||||
|
):
|
||||||
try:
|
try:
|
||||||
# Check if the entity exists
|
# Check if the entity exists
|
||||||
if not await chunk_entity_relation_graph.has_node(entity_name):
|
if not await chunk_entity_relation_graph.has_node(entity_name):
|
||||||
|
|
@ -167,14 +170,18 @@ async def adelete_by_relation(
|
||||||
relation_chunks_storage: Optional KV storage for tracking chunks that reference this relation
|
relation_chunks_storage: Optional KV storage for tracking chunks that reference this relation
|
||||||
"""
|
"""
|
||||||
relation_str = f"{source_entity} -> {target_entity}"
|
relation_str = f"{source_entity} -> {target_entity}"
|
||||||
graph_db_lock = get_graph_db_lock(enable_logging=False)
|
# Normalize entity order for undirected graph (ensures consistent key generation)
|
||||||
# Use graph database lock to ensure atomic graph and vector db operations
|
if source_entity > target_entity:
|
||||||
async with graph_db_lock:
|
source_entity, target_entity = target_entity, source_entity
|
||||||
try:
|
|
||||||
# Normalize entity order for undirected graph (ensures consistent key generation)
|
|
||||||
if source_entity > target_entity:
|
|
||||||
source_entity, target_entity = target_entity, source_entity
|
|
||||||
|
|
||||||
|
# Use keyed lock for relation to ensure atomic graph and vector db operations
|
||||||
|
workspace = relationships_vdb.global_config.get("workspace", "")
|
||||||
|
namespace = f"{workspace}:GraphDB" if workspace else "GraphDB"
|
||||||
|
sorted_edge_key = sorted([source_entity, target_entity])
|
||||||
|
async with get_storage_keyed_lock(
|
||||||
|
sorted_edge_key, namespace=namespace, enable_logging=False
|
||||||
|
):
|
||||||
|
try:
|
||||||
# Check if the relation exists
|
# Check if the relation exists
|
||||||
edge_exists = await chunk_entity_relation_graph.has_edge(
|
edge_exists = await chunk_entity_relation_graph.has_edge(
|
||||||
source_entity, target_entity
|
source_entity, target_entity
|
||||||
|
|
@ -267,9 +274,19 @@ async def aedit_entity(
|
||||||
Returns:
|
Returns:
|
||||||
Dictionary containing updated entity information
|
Dictionary containing updated entity information
|
||||||
"""
|
"""
|
||||||
graph_db_lock = get_graph_db_lock(enable_logging=False)
|
# Determine entities to lock
|
||||||
# Use graph database lock to ensure atomic graph and vector db operations
|
new_entity_name = updated_data.get("entity_name", entity_name)
|
||||||
async with graph_db_lock:
|
is_renaming = new_entity_name != entity_name
|
||||||
|
|
||||||
|
# Lock both original and new entity names if renaming
|
||||||
|
lock_keys = sorted([entity_name, new_entity_name]) if is_renaming else [entity_name]
|
||||||
|
|
||||||
|
# Use keyed lock for entity to ensure atomic graph and vector db operations
|
||||||
|
workspace = entities_vdb.global_config.get("workspace", "")
|
||||||
|
namespace = f"{workspace}:GraphDB" if workspace else "GraphDB"
|
||||||
|
async with get_storage_keyed_lock(
|
||||||
|
lock_keys, namespace=namespace, enable_logging=False
|
||||||
|
):
|
||||||
try:
|
try:
|
||||||
# Save original entity name for chunk tracking updates
|
# Save original entity name for chunk tracking updates
|
||||||
original_entity_name = entity_name
|
original_entity_name = entity_name
|
||||||
|
|
@ -280,10 +297,6 @@ async def aedit_entity(
|
||||||
raise ValueError(f"Entity '{entity_name}' does not exist")
|
raise ValueError(f"Entity '{entity_name}' does not exist")
|
||||||
node_data = await chunk_entity_relation_graph.get_node(entity_name)
|
node_data = await chunk_entity_relation_graph.get_node(entity_name)
|
||||||
|
|
||||||
# Check if entity is being renamed
|
|
||||||
new_entity_name = updated_data.get("entity_name", entity_name)
|
|
||||||
is_renaming = new_entity_name != entity_name
|
|
||||||
|
|
||||||
# If renaming, check if new name already exists
|
# If renaming, check if new name already exists
|
||||||
if is_renaming:
|
if is_renaming:
|
||||||
if not allow_rename:
|
if not allow_rename:
|
||||||
|
|
@ -619,14 +632,18 @@ async def aedit_relation(
|
||||||
Returns:
|
Returns:
|
||||||
Dictionary containing updated relation information
|
Dictionary containing updated relation information
|
||||||
"""
|
"""
|
||||||
graph_db_lock = get_graph_db_lock(enable_logging=False)
|
# Normalize entity order for undirected graph (ensures consistent key generation)
|
||||||
# Use graph database lock to ensure atomic graph and vector db operations
|
if source_entity > target_entity:
|
||||||
async with graph_db_lock:
|
source_entity, target_entity = target_entity, source_entity
|
||||||
try:
|
|
||||||
# Normalize entity order for undirected graph (ensures consistent key generation)
|
|
||||||
if source_entity > target_entity:
|
|
||||||
source_entity, target_entity = target_entity, source_entity
|
|
||||||
|
|
||||||
|
# Use keyed lock for relation to ensure atomic graph and vector db operations
|
||||||
|
workspace = relationships_vdb.global_config.get("workspace", "")
|
||||||
|
namespace = f"{workspace}:GraphDB" if workspace else "GraphDB"
|
||||||
|
sorted_edge_key = sorted([source_entity, target_entity])
|
||||||
|
async with get_storage_keyed_lock(
|
||||||
|
sorted_edge_key, namespace=namespace, enable_logging=False
|
||||||
|
):
|
||||||
|
try:
|
||||||
# 1. Get current relation information
|
# 1. Get current relation information
|
||||||
edge_exists = await chunk_entity_relation_graph.has_edge(
|
edge_exists = await chunk_entity_relation_graph.has_edge(
|
||||||
source_entity, target_entity
|
source_entity, target_entity
|
||||||
|
|
@ -799,9 +816,12 @@ async def acreate_entity(
|
||||||
Returns:
|
Returns:
|
||||||
Dictionary containing created entity information
|
Dictionary containing created entity information
|
||||||
"""
|
"""
|
||||||
graph_db_lock = get_graph_db_lock(enable_logging=False)
|
# Use keyed lock for entity to ensure atomic graph and vector db operations
|
||||||
# Use graph database lock to ensure atomic graph and vector db operations
|
workspace = entities_vdb.global_config.get("workspace", "")
|
||||||
async with graph_db_lock:
|
namespace = f"{workspace}:GraphDB" if workspace else "GraphDB"
|
||||||
|
async with get_storage_keyed_lock(
|
||||||
|
[entity_name], namespace=namespace, enable_logging=False
|
||||||
|
):
|
||||||
try:
|
try:
|
||||||
# Check if entity already exists
|
# Check if entity already exists
|
||||||
existing_node = await chunk_entity_relation_graph.has_node(entity_name)
|
existing_node = await chunk_entity_relation_graph.has_node(entity_name)
|
||||||
|
|
@ -910,9 +930,13 @@ async def acreate_relation(
|
||||||
Returns:
|
Returns:
|
||||||
Dictionary containing created relation information
|
Dictionary containing created relation information
|
||||||
"""
|
"""
|
||||||
graph_db_lock = get_graph_db_lock(enable_logging=False)
|
# Use keyed lock for relation to ensure atomic graph and vector db operations
|
||||||
# Use graph database lock to ensure atomic graph and vector db operations
|
workspace = relationships_vdb.global_config.get("workspace", "")
|
||||||
async with graph_db_lock:
|
namespace = f"{workspace}:GraphDB" if workspace else "GraphDB"
|
||||||
|
sorted_edge_key = sorted([source_entity, target_entity])
|
||||||
|
async with get_storage_keyed_lock(
|
||||||
|
sorted_edge_key, namespace=namespace, enable_logging=False
|
||||||
|
):
|
||||||
try:
|
try:
|
||||||
# Check if both entities exist
|
# Check if both entities exist
|
||||||
source_exists = await chunk_entity_relation_graph.has_node(source_entity)
|
source_exists = await chunk_entity_relation_graph.has_node(source_entity)
|
||||||
|
|
@ -1063,9 +1087,17 @@ async def amerge_entities(
|
||||||
Returns:
|
Returns:
|
||||||
Dictionary containing the merged entity information
|
Dictionary containing the merged entity information
|
||||||
"""
|
"""
|
||||||
graph_db_lock = get_graph_db_lock(enable_logging=False)
|
# Collect all entities involved (source + target) and lock them all in sorted order
|
||||||
# Use graph database lock to ensure atomic graph and vector db operations
|
all_entities = set(source_entities)
|
||||||
async with graph_db_lock:
|
all_entities.add(target_entity)
|
||||||
|
lock_keys = sorted(all_entities)
|
||||||
|
|
||||||
|
# Use keyed lock for all entities to ensure atomic graph and vector db operations
|
||||||
|
workspace = entities_vdb.global_config.get("workspace", "")
|
||||||
|
namespace = f"{workspace}:GraphDB" if workspace else "GraphDB"
|
||||||
|
async with get_storage_keyed_lock(
|
||||||
|
lock_keys, namespace=namespace, enable_logging=False
|
||||||
|
):
|
||||||
try:
|
try:
|
||||||
# Default merge strategy for entities
|
# Default merge strategy for entities
|
||||||
default_entity_merge_strategy = {
|
default_entity_merge_strategy = {
|
||||||
|
|
|
||||||
Loading…
Add table
Reference in a new issue