From 2c09adb8d3250585610adc585f57ad10ab189ce1 Mon Sep 17 00:00:00 2001 From: yangdx Date: Mon, 27 Oct 2025 02:06:21 +0800 Subject: [PATCH] Add chunk tracking support to entity merge functionality - Pass chunk storages to merge function - Merge relation chunk tracking data - Merge entity chunk tracking data - Delete old chunk tracking records - Persist chunk storage updates --- lightrag/lightrag.py | 2 + lightrag/utils_graph.py | 164 ++++++++++++++++++++++++++++++++++------ 2 files changed, 144 insertions(+), 22 deletions(-) diff --git a/lightrag/lightrag.py b/lightrag/lightrag.py index ced5f40e..bdc94b2c 100644 --- a/lightrag/lightrag.py +++ b/lightrag/lightrag.py @@ -3750,6 +3750,8 @@ class LightRAG: target_entity, merge_strategy, target_entity_data, + self.entity_chunks, + self.relation_chunks, ) def merge_entities( diff --git a/lightrag/utils_graph.py b/lightrag/utils_graph.py index 58876af5..5f7d5ac3 100644 --- a/lightrag/utils_graph.py +++ b/lightrag/utils_graph.py @@ -1038,11 +1038,14 @@ async def amerge_entities( target_entity: str, merge_strategy: dict[str, str] = None, target_entity_data: dict[str, Any] = None, + entity_chunks_storage=None, + relation_chunks_storage=None, ) -> dict[str, Any]: """Asynchronously merge multiple entities into one entity. Merges multiple source entities into a target entity, handling all relationships, and updating both the knowledge graph and vector database. + Also merges chunk tracking information from entity_chunks_storage and relation_chunks_storage. Args: chunk_entity_relation_graph: Graph storage instance @@ -1054,6 +1057,8 @@ async def amerge_entities( customizations are applied but a warning is logged. target_entity_data: Dictionary of specific values to set for the target entity, overriding any merged values, e.g. {"description": "custom description", "entity_type": "PERSON"} + entity_chunks_storage: Optional KV storage for tracking chunks that reference entities + relation_chunks_storage: Optional KV storage for tracking chunks that reference relations Returns: Dictionary containing the merged entity information @@ -1118,8 +1123,8 @@ async def amerge_entities( all_relations = [] entities_to_collect = source_entities.copy() - # If target entity exists, also collect its relationships for merging - if target_exists: + # If target entity exists and not already in source_entities, add it + if target_exists and target_entity not in source_entities: entities_to_collect.append(target_entity) for entity_name in entities_to_collect: @@ -1148,12 +1153,25 @@ async def amerge_entities( logger.info(f"Entity Merge: Updated target '{target_entity}'") # 6. Recreate all relations pointing to the target entity in KG + # Also collect chunk tracking information in the same loop relation_updates = {} # Track relationships that need to be merged relations_to_delete = [] + # Initialize chunk tracking variables + relation_chunk_tracking = {} # key: storage_key, value: list of chunk_ids + old_relation_keys_to_delete = [] + for src, tgt, edge_data in all_relations: relations_to_delete.append(compute_mdhash_id(src + tgt, prefix="rel-")) relations_to_delete.append(compute_mdhash_id(tgt + src, prefix="rel-")) + + # Collect old chunk tracking key for deletion + if relation_chunks_storage is not None: + from .utils import make_relation_chunk_key + + old_storage_key = make_relation_chunk_key(src, tgt) + old_relation_keys_to_delete.append(old_storage_key) + new_src = target_entity if src in source_entities else src new_tgt = target_entity if tgt in source_entities else tgt @@ -1168,6 +1186,34 @@ async def amerge_entities( normalized_src, normalized_tgt = sorted([new_src, new_tgt]) relation_key = f"{normalized_src}|{normalized_tgt}" + # Process chunk tracking for this relation + if relation_chunks_storage is not None: + storage_key = make_relation_chunk_key( + normalized_src, normalized_tgt + ) + + # Get chunk_ids from storage for this original relation + stored = await relation_chunks_storage.get_by_id(old_storage_key) + + if stored is not None and isinstance(stored, dict): + chunk_ids = [cid for cid in stored.get("chunk_ids", []) if cid] + else: + # Fallback to source_id from graph + source_id = edge_data.get("source_id", "") + chunk_ids = [ + cid for cid in source_id.split(GRAPH_FIELD_SEP) if cid + ] + + # Accumulate chunk_ids with ordered deduplication + if storage_key not in relation_chunk_tracking: + relation_chunk_tracking[storage_key] = [] + + existing_chunks = set(relation_chunk_tracking[storage_key]) + for chunk_id in chunk_ids: + if chunk_id not in existing_chunks: + existing_chunks.add(chunk_id) + relation_chunk_tracking[storage_key].append(chunk_id) + if relation_key in relation_updates: # Merge relationship data existing_data = relation_updates[relation_key]["data"] @@ -1204,26 +1250,25 @@ async def amerge_entities( f"Entity Merge: updating relation `{rel_data['graph_src']}`->`{rel_data['graph_tgt']}`" ) - # 7. Update entity vector representation - description = merged_entity_data.get("description", "") - source_id = merged_entity_data.get("source_id", "") - entity_type = merged_entity_data.get("entity_type", "") - content = target_entity + "\n" + description + # Update relation chunk tracking storage + if relation_chunks_storage is not None and all_relations: + if old_relation_keys_to_delete: + await relation_chunks_storage.delete(old_relation_keys_to_delete) - entity_id = compute_mdhash_id(target_entity, prefix="ent-") - entity_data_for_vdb = { - entity_id: { - "content": content, - "entity_name": target_entity, - "source_id": source_id, - "description": description, - "entity_type": entity_type, - } - } - await entities_vdb.upsert(entity_data_for_vdb) - logger.info(f"Entity Merge: updating vdb `{target_entity}`") + if relation_chunk_tracking: + updates = {} + for storage_key, chunk_ids in relation_chunk_tracking.items(): + updates[storage_key] = { + "chunk_ids": chunk_ids, + "count": len(chunk_ids), + } - # 8. Update relationship vector representations + await relation_chunks_storage.upsert(updates) + logger.info( + f"Entity Merge: merged chunk tracking for {len(updates)} relations" + ) + + # 7. Update relationship vector representations logger.info( f"Entity Merge: deleting {len(relations_to_delete)} relations from vdb" ) @@ -1262,7 +1307,80 @@ async def amerge_entities( f"Entity Merge: updating vdb `{normalized_src}`~`{normalized_tgt}`" ) - # 9. Delete source entities + # 8. Update entity vector representation + description = merged_entity_data.get("description", "") + source_id = merged_entity_data.get("source_id", "") + entity_type = merged_entity_data.get("entity_type", "") + content = target_entity + "\n" + description + + entity_id = compute_mdhash_id(target_entity, prefix="ent-") + entity_data_for_vdb = { + entity_id: { + "content": content, + "entity_name": target_entity, + "source_id": source_id, + "description": description, + "entity_type": entity_type, + } + } + await entities_vdb.upsert(entity_data_for_vdb) + logger.info(f"Entity Merge: updating vdb `{target_entity}`") + + # 9. Merge entity chunk tracking (source entities first, then target entity) + if entity_chunks_storage is not None: + all_chunk_id_lists = [] + + # Build list of entities to process (source entities first, then target entity) + entities_to_process = [] + + # Add source entities first (excluding target if it's already in source list) + for entity_name in source_entities: + if entity_name != target_entity: + entities_to_process.append(entity_name) + + # Add target entity last (if it exists) + if target_exists: + entities_to_process.append(target_entity) + + # Process all entities in order with unified logic + for entity_name in entities_to_process: + stored = await entity_chunks_storage.get_by_id(entity_name) + if stored and isinstance(stored, dict): + chunk_ids = [cid for cid in stored.get("chunk_ids", []) if cid] + if chunk_ids: + all_chunk_id_lists.append(chunk_ids) + + # Merge chunk_ids with ordered deduplication (preserves order, source entities first) + merged_chunk_ids = [] + seen = set() + for chunk_id_list in all_chunk_id_lists: + for chunk_id in chunk_id_list: + if chunk_id not in seen: + seen.add(chunk_id) + merged_chunk_ids.append(chunk_id) + + # Delete source entities' chunk tracking records + entity_keys_to_delete = [ + e for e in source_entities if e != target_entity + ] + if entity_keys_to_delete: + await entity_chunks_storage.delete(entity_keys_to_delete) + + # Update target entity's chunk tracking + if merged_chunk_ids: + await entity_chunks_storage.upsert( + { + target_entity: { + "chunk_ids": merged_chunk_ids, + "count": len(merged_chunk_ids), + } + } + ) + logger.info( + f"Entity Merge: find {len(merged_chunk_ids)} chunks related to '{target_entity}'" + ) + + # 10. Delete source entities for entity_name in source_entities: if entity_name == target_entity: logger.warning( @@ -1279,11 +1397,13 @@ async def amerge_entities( entity_id = compute_mdhash_id(entity_name, prefix="ent-") await entities_vdb.delete([entity_id]) - # 10. Save changes + # 11. Save changes await _persist_graph_updates( entities_vdb=entities_vdb, relationships_vdb=relationships_vdb, chunk_entity_relation_graph=chunk_entity_relation_graph, + entity_chunks_storage=entity_chunks_storage, + relation_chunks_storage=relation_chunks_storage, ) logger.info(