Fix: Implement entity-keyed locks for edge-merging operations to ensure robust protection against race conditions

- Replace string concatenation with direct list passing for lock keys
- Eliminate deadlock risk by removing the lock around node insertion within the edge merge
This commit is contained in:
yangdx 2025-07-19 11:48:19 +08:00
parent aea18d7b80
commit 2d3a530ce8

View file

@ -432,7 +432,7 @@ async def _rebuild_knowledge_from_chunks(
# Sort src and tgt to ensure order-independent lock key generation # Sort src and tgt to ensure order-independent lock key generation
sorted_key_parts = sorted([src, tgt]) sorted_key_parts = sorted([src, tgt])
async with get_storage_keyed_lock( async with get_storage_keyed_lock(
f"{sorted_key_parts[0]}-{sorted_key_parts[1]}", sorted_key_parts,
namespace=namespace, namespace=namespace,
enable_logging=False, enable_logging=False,
): ):
@ -1115,23 +1115,18 @@ async def _merge_edges_then_upsert(
) )
for need_insert_id in [src_id, tgt_id]: for need_insert_id in [src_id, tgt_id]:
workspace = global_config.get("workspace", "") if not (await knowledge_graph_inst.has_node(need_insert_id)):
namespace = f"{workspace}:GraphDB" if workspace else "GraphDB" await knowledge_graph_inst.upsert_node(
async with get_storage_keyed_lock( need_insert_id,
[need_insert_id], namespace=namespace, enable_logging=False node_data={
): "entity_id": need_insert_id,
if not (await knowledge_graph_inst.has_node(need_insert_id)): "source_id": source_id,
await knowledge_graph_inst.upsert_node( "description": description,
need_insert_id, "entity_type": "UNKNOWN",
node_data={ "file_path": file_path,
"entity_id": need_insert_id, "created_at": int(time.time()),
"source_id": source_id, },
"description": description, )
"entity_type": "UNKNOWN",
"file_path": file_path,
"created_at": int(time.time()),
},
)
force_llm_summary_on_merge = global_config["force_llm_summary_on_merge"] force_llm_summary_on_merge = global_config["force_llm_summary_on_merge"]
@ -1287,7 +1282,7 @@ async def merge_nodes_and_edges(
sorted_edge_key = sorted([edge_key[0], edge_key[1]]) sorted_edge_key = sorted([edge_key[0], edge_key[1]])
logger.info(f"Processing edge: {sorted_edge_key[0]} - {sorted_edge_key[1]}") logger.info(f"Processing edge: {sorted_edge_key[0]} - {sorted_edge_key[1]}")
async with get_storage_keyed_lock( async with get_storage_keyed_lock(
f"{sorted_edge_key[0]}-{sorted_edge_key[1]}", sorted_edge_key,
namespace=namespace, namespace=namespace,
enable_logging=False, enable_logging=False,
): ):