From a25003c3369f3f81553fb78b36c96b99197b2500 Mon Sep 17 00:00:00 2001 From: yangdx Date: Mon, 27 Oct 2025 00:52:56 +0800 Subject: [PATCH] Fix relation deduplication logic and standardize log message prefixes --- lightrag/utils_graph.py | 72 ++++++++++++++++++++--------------------- 1 file changed, 35 insertions(+), 37 deletions(-) diff --git a/lightrag/utils_graph.py b/lightrag/utils_graph.py index da7da5a9..58876af5 100644 --- a/lightrag/utils_graph.py +++ b/lightrag/utils_graph.py @@ -1072,8 +1072,7 @@ async def amerge_entities( effective_entity_merge_strategy = default_entity_merge_strategy if merge_strategy: logger.warning( - "merge_strategy parameter is deprecated and will be ignored in a future " - "release. Provided overrides will be applied for now." + "Entity Merge: merge_strategy parameter is deprecated and will be ignored in a future release." ) effective_entity_merge_strategy = { **default_entity_merge_strategy, @@ -1100,7 +1099,7 @@ async def amerge_entities( await chunk_entity_relation_graph.get_node(target_entity) ) logger.info( - f"Target entity '{target_entity}' already exists, will merge data" + "Entity Merge: target entity already exists, source and target entities will be merged" ) # 3. Merge entity data @@ -1118,11 +1117,11 @@ async def amerge_entities( # 4. Get all relationships of the source entities and target entity (if exists) all_relations = [] entities_to_collect = source_entities.copy() - + # If target entity exists, also collect its relationships for merging if target_exists: entities_to_collect.append(target_entity) - + for entity_name in entities_to_collect: # Get all relationships of the entities edges = await chunk_entity_relation_graph.get_node_edges(entity_name) @@ -1141,14 +1140,14 @@ async def amerge_entities( await chunk_entity_relation_graph.upsert_node( target_entity, merged_entity_data ) - logger.info(f"Created new target entity '{target_entity}'") + logger.info(f"Entity Merge: created target '{target_entity}'") else: await chunk_entity_relation_graph.upsert_node( target_entity, merged_entity_data ) - logger.info(f"Updated existing target entity '{target_entity}'") + logger.info(f"Entity Merge: Updated target '{target_entity}'") - # 6. Recreate all relationships, pointing to the target entity + # 6. Recreate all relations pointing to the target entity in KG relation_updates = {} # Track relationships that need to be merged relations_to_delete = [] @@ -1161,12 +1160,14 @@ async def amerge_entities( # Skip relationships between source entities to avoid self-loops if new_src == new_tgt: logger.info( - f"Skipping relationship between source entities: {src} -> {tgt} to avoid self-loop" + f"Entity Merge: skipping `{src}`~`{tgt}` to avoid self-loop" ) continue - # Check if the same relationship already exists - relation_key = f"{new_src}|{new_tgt}" + # Normalize entity order for consistent duplicate detection (undirected relationships) + normalized_src, normalized_tgt = sorted([new_src, new_tgt]) + relation_key = f"{normalized_src}|{normalized_tgt}" + if relation_key in relation_updates: # Merge relationship data existing_data = relation_updates[relation_key]["data"] @@ -1183,28 +1184,24 @@ async def amerge_entities( ) relation_updates[relation_key]["data"] = merged_relation logger.info( - f"Merged duplicate relationship: {new_src} -> {new_tgt}" + f"Entity Merge: deduplicating relation `{normalized_src}`~`{normalized_tgt}`" ) else: relation_updates[relation_key] = { - "src": new_src, - "tgt": new_tgt, + "graph_src": new_src, + "graph_tgt": new_tgt, + "norm_src": normalized_src, + "norm_tgt": normalized_tgt, "data": edge_data.copy(), } # Apply relationship updates for rel_data in relation_updates.values(): await chunk_entity_relation_graph.upsert_edge( - rel_data["src"], rel_data["tgt"], rel_data["data"] + rel_data["graph_src"], rel_data["graph_tgt"], rel_data["data"] ) logger.info( - f"Created or updated relationship: {rel_data['src']} -> {rel_data['tgt']}" - ) - - # Delete relationships records from vector database - await relationships_vdb.delete(relations_to_delete) - logger.info( - f"Deleted {len(relations_to_delete)} relation records for entity from vector database" + f"Entity Merge: updating relation `{rel_data['graph_src']}`->`{rel_data['graph_tgt']}`" ) # 7. Update entity vector representation @@ -1223,17 +1220,18 @@ async def amerge_entities( "entity_type": entity_type, } } - await entities_vdb.upsert(entity_data_for_vdb) + logger.info(f"Entity Merge: updating vdb `{target_entity}`") # 8. Update relationship vector representations + logger.info( + f"Entity Merge: deleting {len(relations_to_delete)} relations from vdb" + ) + await relationships_vdb.delete(relations_to_delete) for rel_data in relation_updates.values(): - src = rel_data["src"] - tgt = rel_data["tgt"] edge_data = rel_data["data"] - - # Normalize entity order for consistent vector storage - normalized_src, normalized_tgt = sorted([src, tgt]) + normalized_src = rel_data["norm_src"] + normalized_tgt = rel_data["norm_tgt"] description = edge_data.get("description", "") keywords = edge_data.get("keywords", "") @@ -1259,28 +1257,28 @@ async def amerge_entities( "weight": weight, } } - await relationships_vdb.upsert(relation_data_for_vdb) + logger.info( + f"Entity Merge: updating vdb `{normalized_src}`~`{normalized_tgt}`" + ) # 9. Delete source entities for entity_name in source_entities: if entity_name == target_entity: - logger.info( - f"Skipping deletion of '{entity_name}' as it's also the target entity" + logger.warning( + f"Entity Merge: source entity'{entity_name}' is same as target entity" ) continue - # Delete entity node from knowledge graph + logger.info(f"Entity Merge: deleting '{entity_name}' from KG and vdb") + + # Delete entity node and related edges from knowledge graph await chunk_entity_relation_graph.delete_node(entity_name) # Delete entity record from vector database entity_id = compute_mdhash_id(entity_name, prefix="ent-") await entities_vdb.delete([entity_id]) - logger.info( - f"Deleted source entity '{entity_name}' and its vector embedding from database" - ) - # 10. Save changes await _persist_graph_updates( entities_vdb=entities_vdb, @@ -1289,7 +1287,7 @@ async def amerge_entities( ) logger.info( - f"Successfully merged {len(source_entities)} entities into '{target_entity}'" + f"Entity Merge: successfully merged {len(source_entities)} entities into '{target_entity}'" ) return await get_entity_info( chunk_entity_relation_graph,