Merge pull request #1811 from danielaskdd/fix-keyed-lock

Fix: implemented entity-keyed locks for edge merging operations to ensure robust race condition protection
This commit is contained in:
Daniel.y 2025-07-19 12:22:49 +08:00 committed by GitHub
commit 0171e0ce20
No known key found for this signature in database
GPG key ID: B5690EEEBB952194

View file

@ -432,7 +432,7 @@ async def _rebuild_knowledge_from_chunks(
# Sort src and tgt to ensure order-independent lock key generation
sorted_key_parts = sorted([src, tgt])
async with get_storage_keyed_lock(
f"{sorted_key_parts[0]}-{sorted_key_parts[1]}",
sorted_key_parts,
namespace=namespace,
enable_logging=False,
):
@ -1115,23 +1115,18 @@ async def _merge_edges_then_upsert(
)
for need_insert_id in [src_id, tgt_id]:
workspace = global_config.get("workspace", "")
namespace = f"{workspace}:GraphDB" if workspace else "GraphDB"
async with get_storage_keyed_lock(
[need_insert_id], namespace=namespace, enable_logging=False
):
if not (await knowledge_graph_inst.has_node(need_insert_id)):
await knowledge_graph_inst.upsert_node(
need_insert_id,
node_data={
"entity_id": need_insert_id,
"source_id": source_id,
"description": description,
"entity_type": "UNKNOWN",
"file_path": file_path,
"created_at": int(time.time()),
},
)
if not (await knowledge_graph_inst.has_node(need_insert_id)):
await knowledge_graph_inst.upsert_node(
need_insert_id,
node_data={
"entity_id": need_insert_id,
"source_id": source_id,
"description": description,
"entity_type": "UNKNOWN",
"file_path": file_path,
"created_at": int(time.time()),
},
)
force_llm_summary_on_merge = global_config["force_llm_summary_on_merge"]
@ -1287,7 +1282,7 @@ async def merge_nodes_and_edges(
sorted_edge_key = sorted([edge_key[0], edge_key[1]])
# logger.info(f"Processing edge: {sorted_edge_key[0]} - {sorted_edge_key[1]}")
async with get_storage_keyed_lock(
f"{sorted_edge_key[0]}-{sorted_edge_key[1]}",
sorted_edge_key,
namespace=namespace,
enable_logging=False,
):