From 2d3a530ce8f1d82cd081e9236522b892d674b57c Mon Sep 17 00:00:00 2001 From: yangdx Date: Sat, 19 Jul 2025 11:48:19 +0800 Subject: [PATCH] Fix: Implemented entity-keyed locks for edge merging operations to ensure robust race condition protection - Replacing string concatenation with direct list passing for lock keys - Eliminating deadlock risks by removing the lock around node insertion within the edge merge --- lightrag/operate.py | 33 ++++++++++++++------------------- 1 file changed, 14 insertions(+), 19 deletions(-) diff --git a/lightrag/operate.py b/lightrag/operate.py index d978d77a..f6355c3a 100644 --- a/lightrag/operate.py +++ b/lightrag/operate.py @@ -432,7 +432,7 @@ async def _rebuild_knowledge_from_chunks( # Sort src and tgt to ensure order-independent lock key generation sorted_key_parts = sorted([src, tgt]) async with get_storage_keyed_lock( - f"{sorted_key_parts[0]}-{sorted_key_parts[1]}", + sorted_key_parts, namespace=namespace, enable_logging=False, ): @@ -1115,23 +1115,18 @@ async def _merge_edges_then_upsert( ) for need_insert_id in [src_id, tgt_id]: - workspace = global_config.get("workspace", "") - namespace = f"{workspace}:GraphDB" if workspace else "GraphDB" - async with get_storage_keyed_lock( - [need_insert_id], namespace=namespace, enable_logging=False - ): - if not (await knowledge_graph_inst.has_node(need_insert_id)): - await knowledge_graph_inst.upsert_node( - need_insert_id, - node_data={ - "entity_id": need_insert_id, - "source_id": source_id, - "description": description, - "entity_type": "UNKNOWN", - "file_path": file_path, - "created_at": int(time.time()), - }, - ) + if not (await knowledge_graph_inst.has_node(need_insert_id)): + await knowledge_graph_inst.upsert_node( + need_insert_id, + node_data={ + "entity_id": need_insert_id, + "source_id": source_id, + "description": description, + "entity_type": "UNKNOWN", + "file_path": file_path, + "created_at": int(time.time()), + }, + ) force_llm_summary_on_merge = global_config["force_llm_summary_on_merge"] @@ -1287,7 +1282,7 @@ async def merge_nodes_and_edges( sorted_edge_key = sorted([edge_key[0], edge_key[1]]) logger.info(f"Processing edge: {sorted_edge_key[0]} - {sorted_edge_key[1]}") async with get_storage_keyed_lock( - f"{sorted_edge_key[0]}-{sorted_edge_key[1]}", + sorted_edge_key, namespace=namespace, enable_logging=False, ):