Fix relation deduplication logic and standardize log message prefixes

This commit is contained in:
yangdx 2025-10-27 00:52:56 +08:00
parent ab32456a79
commit a25003c336

View file

@ -1072,8 +1072,7 @@ async def amerge_entities(
effective_entity_merge_strategy = default_entity_merge_strategy effective_entity_merge_strategy = default_entity_merge_strategy
if merge_strategy: if merge_strategy:
logger.warning( logger.warning(
"merge_strategy parameter is deprecated and will be ignored in a future " "Entity Merge: merge_strategy parameter is deprecated and will be ignored in a future release."
"release. Provided overrides will be applied for now."
) )
effective_entity_merge_strategy = { effective_entity_merge_strategy = {
**default_entity_merge_strategy, **default_entity_merge_strategy,
@ -1100,7 +1099,7 @@ async def amerge_entities(
await chunk_entity_relation_graph.get_node(target_entity) await chunk_entity_relation_graph.get_node(target_entity)
) )
logger.info( logger.info(
f"Target entity '{target_entity}' already exists, will merge data" "Entity Merge: target entity already exists, source and target entities will be merged"
) )
# 3. Merge entity data # 3. Merge entity data
@ -1118,11 +1117,11 @@ async def amerge_entities(
# 4. Get all relationships of the source entities and target entity (if exists) # 4. Get all relationships of the source entities and target entity (if exists)
all_relations = [] all_relations = []
entities_to_collect = source_entities.copy() entities_to_collect = source_entities.copy()
# If target entity exists, also collect its relationships for merging # If target entity exists, also collect its relationships for merging
if target_exists: if target_exists:
entities_to_collect.append(target_entity) entities_to_collect.append(target_entity)
for entity_name in entities_to_collect: for entity_name in entities_to_collect:
# Get all relationships of the entities # Get all relationships of the entities
edges = await chunk_entity_relation_graph.get_node_edges(entity_name) edges = await chunk_entity_relation_graph.get_node_edges(entity_name)
@ -1141,14 +1140,14 @@ async def amerge_entities(
await chunk_entity_relation_graph.upsert_node( await chunk_entity_relation_graph.upsert_node(
target_entity, merged_entity_data target_entity, merged_entity_data
) )
logger.info(f"Created new target entity '{target_entity}'") logger.info(f"Entity Merge: created target '{target_entity}'")
else: else:
await chunk_entity_relation_graph.upsert_node( await chunk_entity_relation_graph.upsert_node(
target_entity, merged_entity_data target_entity, merged_entity_data
) )
logger.info(f"Updated existing target entity '{target_entity}'") logger.info(f"Entity Merge: Updated target '{target_entity}'")
# 6. Recreate all relationships, pointing to the target entity # 6. Recreate all relations pointing to the target entity in KG
relation_updates = {} # Track relationships that need to be merged relation_updates = {} # Track relationships that need to be merged
relations_to_delete = [] relations_to_delete = []
@ -1161,12 +1160,14 @@ async def amerge_entities(
# Skip relationships between source entities to avoid self-loops # Skip relationships between source entities to avoid self-loops
if new_src == new_tgt: if new_src == new_tgt:
logger.info( logger.info(
f"Skipping relationship between source entities: {src} -> {tgt} to avoid self-loop" f"Entity Merge: skipping `{src}`~`{tgt}` to avoid self-loop"
) )
continue continue
# Check if the same relationship already exists # Normalize entity order for consistent duplicate detection (undirected relationships)
relation_key = f"{new_src}|{new_tgt}" normalized_src, normalized_tgt = sorted([new_src, new_tgt])
relation_key = f"{normalized_src}|{normalized_tgt}"
if relation_key in relation_updates: if relation_key in relation_updates:
# Merge relationship data # Merge relationship data
existing_data = relation_updates[relation_key]["data"] existing_data = relation_updates[relation_key]["data"]
@ -1183,28 +1184,24 @@ async def amerge_entities(
) )
relation_updates[relation_key]["data"] = merged_relation relation_updates[relation_key]["data"] = merged_relation
logger.info( logger.info(
f"Merged duplicate relationship: {new_src} -> {new_tgt}" f"Entity Merge: deduplicating relation `{normalized_src}`~`{normalized_tgt}`"
) )
else: else:
relation_updates[relation_key] = { relation_updates[relation_key] = {
"src": new_src, "graph_src": new_src,
"tgt": new_tgt, "graph_tgt": new_tgt,
"norm_src": normalized_src,
"norm_tgt": normalized_tgt,
"data": edge_data.copy(), "data": edge_data.copy(),
} }
# Apply relationship updates # Apply relationship updates
for rel_data in relation_updates.values(): for rel_data in relation_updates.values():
await chunk_entity_relation_graph.upsert_edge( await chunk_entity_relation_graph.upsert_edge(
rel_data["src"], rel_data["tgt"], rel_data["data"] rel_data["graph_src"], rel_data["graph_tgt"], rel_data["data"]
) )
logger.info( logger.info(
f"Created or updated relationship: {rel_data['src']} -> {rel_data['tgt']}" f"Entity Merge: updating relation `{rel_data['graph_src']}`->`{rel_data['graph_tgt']}`"
)
# Delete relationships records from vector database
await relationships_vdb.delete(relations_to_delete)
logger.info(
f"Deleted {len(relations_to_delete)} relation records for entity from vector database"
) )
# 7. Update entity vector representation # 7. Update entity vector representation
@ -1223,17 +1220,18 @@ async def amerge_entities(
"entity_type": entity_type, "entity_type": entity_type,
} }
} }
await entities_vdb.upsert(entity_data_for_vdb) await entities_vdb.upsert(entity_data_for_vdb)
logger.info(f"Entity Merge: updating vdb `{target_entity}`")
# 8. Update relationship vector representations # 8. Update relationship vector representations
logger.info(
f"Entity Merge: deleting {len(relations_to_delete)} relations from vdb"
)
await relationships_vdb.delete(relations_to_delete)
for rel_data in relation_updates.values(): for rel_data in relation_updates.values():
src = rel_data["src"]
tgt = rel_data["tgt"]
edge_data = rel_data["data"] edge_data = rel_data["data"]
normalized_src = rel_data["norm_src"]
# Normalize entity order for consistent vector storage normalized_tgt = rel_data["norm_tgt"]
normalized_src, normalized_tgt = sorted([src, tgt])
description = edge_data.get("description", "") description = edge_data.get("description", "")
keywords = edge_data.get("keywords", "") keywords = edge_data.get("keywords", "")
@ -1259,28 +1257,28 @@ async def amerge_entities(
"weight": weight, "weight": weight,
} }
} }
await relationships_vdb.upsert(relation_data_for_vdb) await relationships_vdb.upsert(relation_data_for_vdb)
logger.info(
f"Entity Merge: updating vdb `{normalized_src}`~`{normalized_tgt}`"
)
# 9. Delete source entities # 9. Delete source entities
for entity_name in source_entities: for entity_name in source_entities:
if entity_name == target_entity: if entity_name == target_entity:
logger.info( logger.warning(
f"Skipping deletion of '{entity_name}' as it's also the target entity" f"Entity Merge: source entity'{entity_name}' is same as target entity"
) )
continue continue
# Delete entity node from knowledge graph logger.info(f"Entity Merge: deleting '{entity_name}' from KG and vdb")
# Delete entity node and related edges from knowledge graph
await chunk_entity_relation_graph.delete_node(entity_name) await chunk_entity_relation_graph.delete_node(entity_name)
# Delete entity record from vector database # Delete entity record from vector database
entity_id = compute_mdhash_id(entity_name, prefix="ent-") entity_id = compute_mdhash_id(entity_name, prefix="ent-")
await entities_vdb.delete([entity_id]) await entities_vdb.delete([entity_id])
logger.info(
f"Deleted source entity '{entity_name}' and its vector embedding from database"
)
# 10. Save changes # 10. Save changes
await _persist_graph_updates( await _persist_graph_updates(
entities_vdb=entities_vdb, entities_vdb=entities_vdb,
@ -1289,7 +1287,7 @@ async def amerge_entities(
) )
logger.info( logger.info(
f"Successfully merged {len(source_entities)} entities into '{target_entity}'" f"Entity Merge: successfully merged {len(source_entities)} entities into '{target_entity}'"
) )
return await get_entity_info( return await get_entity_info(
chunk_entity_relation_graph, chunk_entity_relation_graph,