From f13d30206f4fdd33cea446a8dc65104e52dc4968 Mon Sep 17 00:00:00 2001
From: yangdx
Date: Mon, 27 Oct 2025 00:52:56 +0800
Subject: [PATCH] Fix relation deduplication logic and standardize log message
 prefixes

(cherry picked from commit a25003c3369f3f81553fb78b36c96b99197b2500)
---
 lightrag/utils_graph.py | 164 ++++++----------------------------------
 1 file changed, 22 insertions(+), 142 deletions(-)

diff --git a/lightrag/utils_graph.py b/lightrag/utils_graph.py
index 5f7d5ac3..58876af5 100644
--- a/lightrag/utils_graph.py
+++ b/lightrag/utils_graph.py
@@ -1038,14 +1038,11 @@ async def amerge_entities(
     target_entity: str,
     merge_strategy: dict[str, str] = None,
     target_entity_data: dict[str, Any] = None,
-    entity_chunks_storage=None,
-    relation_chunks_storage=None,
 ) -> dict[str, Any]:
     """Asynchronously merge multiple entities into one entity.
 
     Merges multiple source entities into a target entity, handling all relationships,
     and updating both the knowledge graph and vector database.
-    Also merges chunk tracking information from entity_chunks_storage and relation_chunks_storage.
 
     Args:
         chunk_entity_relation_graph: Graph storage instance
@@ -1057,8 +1054,6 @@
             customizations are applied but a warning is logged.
         target_entity_data: Dictionary of specific values to set for the target entity,
             overriding any merged values, e.g. {"description": "custom description", "entity_type": "PERSON"}
-        entity_chunks_storage: Optional KV storage for tracking chunks that reference entities
-        relation_chunks_storage: Optional KV storage for tracking chunks that reference relations
 
     Returns:
         Dictionary containing the merged entity information
@@ -1123,8 +1118,8 @@
     all_relations = []
     entities_to_collect = source_entities.copy()
 
-    # If target entity exists and not already in source_entities, add it
-    if target_exists and target_entity not in source_entities:
+    # If target entity exists, also collect its relationships for merging
+    if target_exists:
         entities_to_collect.append(target_entity)
 
     for entity_name in entities_to_collect:
@@ -1153,25 +1148,12 @@
     logger.info(f"Entity Merge: Updated target '{target_entity}'")
 
     # 6. Recreate all relations pointing to the target entity in KG
-    # Also collect chunk tracking information in the same loop
     relation_updates = {}  # Track relationships that need to be merged
     relations_to_delete = []
 
-    # Initialize chunk tracking variables
-    relation_chunk_tracking = {}  # key: storage_key, value: list of chunk_ids
-    old_relation_keys_to_delete = []
-
     for src, tgt, edge_data in all_relations:
         relations_to_delete.append(compute_mdhash_id(src + tgt, prefix="rel-"))
         relations_to_delete.append(compute_mdhash_id(tgt + src, prefix="rel-"))
-
-        # Collect old chunk tracking key for deletion
-        if relation_chunks_storage is not None:
-            from .utils import make_relation_chunk_key
-
-            old_storage_key = make_relation_chunk_key(src, tgt)
-            old_relation_keys_to_delete.append(old_storage_key)
-
         new_src = target_entity if src in source_entities else src
         new_tgt = target_entity if tgt in source_entities else tgt
 
@@ -1186,34 +1168,6 @@
         normalized_src, normalized_tgt = sorted([new_src, new_tgt])
         relation_key = f"{normalized_src}|{normalized_tgt}"
 
-        # Process chunk tracking for this relation
-        if relation_chunks_storage is not None:
-            storage_key = make_relation_chunk_key(
-                normalized_src, normalized_tgt
-            )
-
-            # Get chunk_ids from storage for this original relation
-            stored = await relation_chunks_storage.get_by_id(old_storage_key)
-
-            if stored is not None and isinstance(stored, dict):
-                chunk_ids = [cid for cid in stored.get("chunk_ids", []) if cid]
-            else:
-                # Fallback to source_id from graph
-                source_id = edge_data.get("source_id", "")
-                chunk_ids = [
-                    cid for cid in source_id.split(GRAPH_FIELD_SEP) if cid
-                ]
-
-            # Accumulate chunk_ids with ordered deduplication
-            if storage_key not in relation_chunk_tracking:
-                relation_chunk_tracking[storage_key] = []
-
-            existing_chunks = set(relation_chunk_tracking[storage_key])
-            for chunk_id in chunk_ids:
-                if chunk_id not in existing_chunks:
-                    existing_chunks.add(chunk_id)
-                    relation_chunk_tracking[storage_key].append(chunk_id)
-
         if relation_key in relation_updates:
             # Merge relationship data
             existing_data = relation_updates[relation_key]["data"]
@@ -1250,25 +1204,26 @@
                 f"Entity Merge: updating relation `{rel_data['graph_src']}`->`{rel_data['graph_tgt']}`"
             )
 
-    # Update relation chunk tracking storage
-    if relation_chunks_storage is not None and all_relations:
-        if old_relation_keys_to_delete:
-            await relation_chunks_storage.delete(old_relation_keys_to_delete)
+    # 7. Update entity vector representation
+    description = merged_entity_data.get("description", "")
+    source_id = merged_entity_data.get("source_id", "")
+    entity_type = merged_entity_data.get("entity_type", "")
+    content = target_entity + "\n" + description
 
-        if relation_chunk_tracking:
-            updates = {}
-            for storage_key, chunk_ids in relation_chunk_tracking.items():
-                updates[storage_key] = {
-                    "chunk_ids": chunk_ids,
-                    "count": len(chunk_ids),
-                }
+    entity_id = compute_mdhash_id(target_entity, prefix="ent-")
+    entity_data_for_vdb = {
+        entity_id: {
+            "content": content,
+            "entity_name": target_entity,
+            "source_id": source_id,
+            "description": description,
+            "entity_type": entity_type,
+        }
+    }
+    await entities_vdb.upsert(entity_data_for_vdb)
+    logger.info(f"Entity Merge: updating vdb `{target_entity}`")
 
-            await relation_chunks_storage.upsert(updates)
-            logger.info(
-                f"Entity Merge: merged chunk tracking for {len(updates)} relations"
-            )
-
-    # 7. Update relationship vector representations
+    # 8. Update relationship vector representations
     logger.info(
         f"Entity Merge: deleting {len(relations_to_delete)} relations from vdb"
     )
@@ -1307,80 +1262,7 @@
                 f"Entity Merge: updating vdb `{normalized_src}`~`{normalized_tgt}`"
             )
 
-    # 8. Update entity vector representation
-    description = merged_entity_data.get("description", "")
-    source_id = merged_entity_data.get("source_id", "")
-    entity_type = merged_entity_data.get("entity_type", "")
-    content = target_entity + "\n" + description
-
-    entity_id = compute_mdhash_id(target_entity, prefix="ent-")
-    entity_data_for_vdb = {
-        entity_id: {
-            "content": content,
-            "entity_name": target_entity,
-            "source_id": source_id,
-            "description": description,
-            "entity_type": entity_type,
-        }
-    }
-    await entities_vdb.upsert(entity_data_for_vdb)
-    logger.info(f"Entity Merge: updating vdb `{target_entity}`")
-
-    # 9. Merge entity chunk tracking (source entities first, then target entity)
-    if entity_chunks_storage is not None:
-        all_chunk_id_lists = []
-
-        # Build list of entities to process (source entities first, then target entity)
-        entities_to_process = []
-
-        # Add source entities first (excluding target if it's already in source list)
-        for entity_name in source_entities:
-            if entity_name != target_entity:
-                entities_to_process.append(entity_name)
-
-        # Add target entity last (if it exists)
-        if target_exists:
-            entities_to_process.append(target_entity)
-
-        # Process all entities in order with unified logic
-        for entity_name in entities_to_process:
-            stored = await entity_chunks_storage.get_by_id(entity_name)
-            if stored and isinstance(stored, dict):
-                chunk_ids = [cid for cid in stored.get("chunk_ids", []) if cid]
-                if chunk_ids:
-                    all_chunk_id_lists.append(chunk_ids)
-
-        # Merge chunk_ids with ordered deduplication (preserves order, source entities first)
-        merged_chunk_ids = []
-        seen = set()
-        for chunk_id_list in all_chunk_id_lists:
-            for chunk_id in chunk_id_list:
-                if chunk_id not in seen:
-                    seen.add(chunk_id)
-                    merged_chunk_ids.append(chunk_id)
-
-        # Delete source entities' chunk tracking records
-        entity_keys_to_delete = [
-            e for e in source_entities if e != target_entity
-        ]
-        if entity_keys_to_delete:
-            await entity_chunks_storage.delete(entity_keys_to_delete)
-
-        # Update target entity's chunk tracking
-        if merged_chunk_ids:
-            await entity_chunks_storage.upsert(
-                {
-                    target_entity: {
-                        "chunk_ids": merged_chunk_ids,
-                        "count": len(merged_chunk_ids),
-                    }
-                }
-            )
-            logger.info(
-                f"Entity Merge: find {len(merged_chunk_ids)} chunks related to '{target_entity}'"
-            )
-
-    # 10. Delete source entities
+    # 9. Delete source entities
     for entity_name in source_entities:
         if entity_name == target_entity:
             logger.warning(
@@ -1397,13 +1279,11 @@
         entity_id = compute_mdhash_id(entity_name, prefix="ent-")
         await entities_vdb.delete([entity_id])
 
-    # 11. Save changes
+    # 10. Save changes
    await _persist_graph_updates(
        entities_vdb=entities_vdb,
        relationships_vdb=relationships_vdb,
        chunk_entity_relation_graph=chunk_entity_relation_graph,
-        entity_chunks_storage=entity_chunks_storage,
-        relation_chunks_storage=relation_chunks_storage,
    )
    logger.info(
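
Review note (outside the patch): the deduplication fix works because every rebuilt edge is normalized to a direction-independent key before being grouped into relation_updates, so an edge collected from a source entity and its mirror image collected from the target collapse into a single entry. A minimal self-contained sketch of that normalization, using hypothetical entity names:

    # Both directions of the same logical relation map to one key once the
    # endpoints are redirected to the merge target and the pair is sorted.
    source_entities = ["ACME", "Acme Inc."]
    target_entity = "Acme Corporation"

    for src, tgt in [("ACME", "Widget Co"), ("Widget Co", "Acme Inc.")]:
        new_src = target_entity if src in source_entities else src
        new_tgt = target_entity if tgt in source_entities else tgt
        normalized_src, normalized_tgt = sorted([new_src, new_tgt])
        print(f"{normalized_src}|{normalized_tgt}")
        # both iterations print: Acme Corporation|Widget Co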
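Call sites that previously passed entity_chunks_storage or relation_chunks_storage must drop those arguments, since chunk tracking is no longer handled inside amerge_entities. A minimal call-site sketch against the new signature; the storage handles and entity names below are hypothetical placeholders, and keyword arguments are used so the leading positional parameters need not be spelled out:

    merged = await amerge_entities(
        chunk_entity_relation_graph=graph_storage,  # graph storage instance
        entities_vdb=entities_vdb,                  # entity vector database
        relationships_vdb=relationships_vdb,        # relationship vector database
        source_entities=["ACME", "Acme Inc."],      # entities to merge away
        target_entity="Acme Corporation",           # surviving entity
    )
    # Passing the removed entity_chunks_storage / relation_chunks_storage
    # keywords now raises TypeError.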