From bf6cf547658d5cc4ea02aca65a6a438cc742473f Mon Sep 17 00:00:00 2001 From: yangdx Date: Sun, 26 Oct 2025 01:20:13 +0800 Subject: [PATCH] Add chunk tracking support to graph entity/relation operations - Pass chunk storage to graph ops - Update chunks on entity renames - Track chunks in create operations - Merge chunk data in entity merges - Clean up orphaned chunk entries --- lightrag/lightrag.py | 7 ++ lightrag/utils_graph.py | 235 +++++++++++++++++++++++++++++++++++++++- 2 files changed, 239 insertions(+), 3 deletions(-) diff --git a/lightrag/lightrag.py b/lightrag/lightrag.py index ca972712..ce58dec8 100644 --- a/lightrag/lightrag.py +++ b/lightrag/lightrag.py @@ -3600,6 +3600,8 @@ class LightRAG: entity_name, updated_data, allow_rename, + entity_chunks_storage=self.entity_chunks, + relation_chunks_storage=self.relation_chunks, ) def edit_entity( @@ -3634,6 +3636,7 @@ class LightRAG: source_entity, target_entity, updated_data, + relation_chunks_storage=self.relation_chunks, ) def edit_relation( @@ -3666,6 +3669,7 @@ class LightRAG: self.relationships_vdb, entity_name, entity_data, + entity_chunks_storage=self.entity_chunks, ) def create_entity( @@ -3698,6 +3702,7 @@ class LightRAG: source_entity, target_entity, relation_data, + relation_chunks_storage=self.relation_chunks, ) def create_relation( @@ -3745,6 +3750,8 @@ class LightRAG: target_entity, merge_strategy, target_entity_data, + entity_chunks_storage=self.entity_chunks, + relation_chunks_storage=self.relation_chunks, ) def merge_entities( diff --git a/lightrag/utils_graph.py b/lightrag/utils_graph.py index c2ccd313..1caaec13 100644 --- a/lightrag/utils_graph.py +++ b/lightrag/utils_graph.py @@ -7,7 +7,7 @@ from typing import Any, cast from .base import DeletionResult from .kg.shared_storage import get_graph_db_lock from .constants import GRAPH_FIELD_SEP -from .utils import compute_mdhash_id, logger +from .utils import compute_mdhash_id, logger, make_relation_chunk_key from .base import StorageNameSpace @@ -167,6 +167,8 @@ async def aedit_entity( entity_name: str, updated_data: dict[str, str], allow_rename: bool = True, + entity_chunks_storage=None, + relation_chunks_storage=None, ) -> dict[str, Any]: """Asynchronously edit entity information. @@ -179,6 +181,8 @@ async def aedit_entity( entity_name: Name of the entity to edit updated_data: Dictionary containing updated attributes, e.g. {"description": "new description", "entity_type": "new type"} allow_rename: Whether to allow entity renaming, defaults to True + entity_chunks_storage: Optional KV storage for entity chunk tracking + relation_chunks_storage: Optional KV storage for relation chunk tracking Returns: Dictionary containing updated entity information @@ -273,12 +277,69 @@ async def aedit_entity( f"Deleted old entity '{entity_name}' and its vector embedding from database" ) + # Migrate entity_chunks data from old entity to new entity + if entity_chunks_storage: + old_chunks_data = await entity_chunks_storage.get_by_id(entity_name) + if old_chunks_data: + # Create chunk tracking for new entity + await entity_chunks_storage.upsert( + {new_entity_name: old_chunks_data} + ) + # Delete old entity's chunk tracking + await entity_chunks_storage.delete([entity_name]) + logger.info( + f"Migrated entity_chunks data from '{entity_name}' to '{new_entity_name}'" + ) + # Delete old relation records from vector database await relationships_vdb.delete(relations_to_delete) logger.info( f"Deleted {len(relations_to_delete)} relation records for entity '{entity_name}' from vector database" ) + # Migrate relation_chunks data for renamed entity relationships + if relation_chunks_storage and edges: + old_relation_keys_to_delete = [] + new_relation_chunks_data = {} + + for source, target in edges: + # Generate old relation key + old_key = make_relation_chunk_key(source, target) + + # Get old relation chunks data + old_chunks_data = await relation_chunks_storage.get_by_id( + old_key + ) + if old_chunks_data: + # Determine new source and target + new_src = ( + new_entity_name if source == entity_name else source + ) + new_tgt = ( + new_entity_name if target == entity_name else target + ) + + # Generate new relation key + new_key = make_relation_chunk_key(new_src, new_tgt) + + # Only update if key changed + if new_key != old_key: + # Store data for new key + new_relation_chunks_data[new_key] = old_chunks_data + # Mark old key for deletion + old_relation_keys_to_delete.append(old_key) + + # Apply relation_chunks updates + if new_relation_chunks_data: + await relation_chunks_storage.upsert(new_relation_chunks_data) + if old_relation_keys_to_delete: + await relation_chunks_storage.delete( + old_relation_keys_to_delete + ) + logger.info( + f"Migrated {len(old_relation_keys_to_delete)} relation_chunks entries for renamed entity" + ) + # Update relationship vector representations for src, tgt, edge_data in relations_to_update: description = edge_data.get("description", "") @@ -339,6 +400,24 @@ async def aedit_entity( # Update vector database await entities_vdb.upsert(entity_data) + # 3.5. Update entity_chunks if source_id was updated + if entity_chunks_storage and "source_id" in updated_data: + source_ids = source_id.split(GRAPH_FIELD_SEP) if source_id else [] + chunk_ids = [cid for cid in source_ids if cid] + if chunk_ids: + await entity_chunks_storage.upsert( + { + entity_name: { + "chunk_ids": chunk_ids, + "count": len(chunk_ids), + "updated_at": int(time.time()), + } + } + ) + logger.info( + f"Updated entity_chunks for '{entity_name}' with {len(chunk_ids)} chunk IDs" + ) + # 4. Save changes await _edit_entity_done( entities_vdb, relationships_vdb, chunk_entity_relation_graph @@ -379,6 +458,7 @@ async def aedit_relation( source_entity: str, target_entity: str, updated_data: dict[str, Any], + relation_chunks_storage=None, ) -> dict[str, Any]: """Asynchronously edit relation information. @@ -391,6 +471,7 @@ async def aedit_relation( source_entity: Name of the source entity target_entity: Name of the target entity updated_data: Dictionary containing updated attributes, e.g. {"description": "new description", "keywords": "new keywords"} + relation_chunks_storage: Optional KV storage for relation chunk tracking Returns: Dictionary containing updated relation information @@ -455,6 +536,25 @@ async def aedit_relation( # Update vector database await relationships_vdb.upsert(relation_data) + # 3.5. Update relation_chunks if source_id was updated + if relation_chunks_storage and "source_id" in updated_data: + source_ids = source_id.split(GRAPH_FIELD_SEP) if source_id else [] + chunk_ids = [cid for cid in source_ids if cid] + if chunk_ids: + storage_key = make_relation_chunk_key(source_entity, target_entity) + await relation_chunks_storage.upsert( + { + storage_key: { + "chunk_ids": chunk_ids, + "count": len(chunk_ids), + "updated_at": int(time.time()), + } + } + ) + logger.info( + f"Updated relation_chunks for '{source_entity}' -> '{target_entity}' with {len(chunk_ids)} chunk IDs" + ) + # 4. Save changes await _edit_relation_done(relationships_vdb, chunk_entity_relation_graph) @@ -494,6 +594,7 @@ async def acreate_entity( relationships_vdb, entity_name: str, entity_data: dict[str, Any], + entity_chunks_storage=None, ) -> dict[str, Any]: """Asynchronously create a new entity. @@ -505,6 +606,7 @@ async def acreate_entity( relationships_vdb: Vector database storage for relationships entity_name: Name of the new entity entity_data: Dictionary containing entity attributes, e.g. {"description": "description", "entity_type": "type"} + entity_chunks_storage: Optional KV storage for entity chunk tracking Returns: Dictionary containing created entity information @@ -555,6 +657,24 @@ async def acreate_entity( # Update vector database await entities_vdb.upsert(entity_data_for_vdb) + # Create entity_chunks entry if source_id is provided + if entity_chunks_storage and source_id and source_id != "manual_creation": + source_ids = source_id.split(GRAPH_FIELD_SEP) if source_id else [] + chunk_ids = [cid for cid in source_ids if cid] + if chunk_ids: + await entity_chunks_storage.upsert( + { + entity_name: { + "chunk_ids": chunk_ids, + "count": len(chunk_ids), + "created_at": int(time.time()), + } + } + ) + logger.info( + f"Created entity_chunks for '{entity_name}' with {len(chunk_ids)} chunk IDs" + ) + # Save changes await _edit_entity_done( entities_vdb, relationships_vdb, chunk_entity_relation_graph @@ -579,6 +699,7 @@ async def acreate_relation( source_entity: str, target_entity: str, relation_data: dict[str, Any], + relation_chunks_storage=None, ) -> dict[str, Any]: """Asynchronously create a new relation between entities. @@ -591,6 +712,7 @@ async def acreate_relation( source_entity: Name of the source entity target_entity: Name of the target entity relation_data: Dictionary containing relation attributes, e.g. {"description": "description", "keywords": "keywords"} + relation_chunks_storage: Optional KV storage for relation chunk tracking Returns: Dictionary containing created relation information @@ -663,6 +785,25 @@ async def acreate_relation( # Update vector database await relationships_vdb.upsert(relation_data_for_vdb) + # Create relation_chunks entry if source_id is provided + if relation_chunks_storage and source_id and source_id != "manual_creation": + source_ids = source_id.split(GRAPH_FIELD_SEP) if source_id else [] + chunk_ids = [cid for cid in source_ids if cid] + if chunk_ids: + storage_key = make_relation_chunk_key(source_entity, target_entity) + await relation_chunks_storage.upsert( + { + storage_key: { + "chunk_ids": chunk_ids, + "count": len(chunk_ids), + "created_at": int(time.time()), + } + } + ) + logger.info( + f"Created relation_chunks for '{source_entity}' -> '{target_entity}' with {len(chunk_ids)} chunk IDs" + ) + # Save changes await _edit_relation_done(relationships_vdb, chunk_entity_relation_graph) @@ -691,6 +832,8 @@ async def amerge_entities( target_entity: str, merge_strategy: dict[str, str] = None, target_entity_data: dict[str, Any] = None, + entity_chunks_storage=None, + relation_chunks_storage=None, ) -> dict[str, Any]: """Asynchronously merge multiple entities into one entity. @@ -897,7 +1040,89 @@ async def amerge_entities( await relationships_vdb.upsert(relation_data_for_vdb) - # 9. Delete source entities + # 9. Merge and update chunk storage + # Merge entity_chunks from all source entities + if entity_chunks_storage: + all_chunk_ids = set() + for entity_name in source_entities: + if entity_name == target_entity: + continue + chunks_data = await entity_chunks_storage.get_by_id(entity_name) + if chunks_data and isinstance(chunks_data, dict): + chunk_list = chunks_data.get("chunk_ids", []) + all_chunk_ids.update(chunk_list) + + # Get target entity's existing chunks if it exists + if target_exists: + target_chunks_data = await entity_chunks_storage.get_by_id( + target_entity + ) + if target_chunks_data and isinstance(target_chunks_data, dict): + chunk_list = target_chunks_data.get("chunk_ids", []) + all_chunk_ids.update(chunk_list) + + if all_chunk_ids: + await entity_chunks_storage.upsert( + { + target_entity: { + "chunk_ids": list(all_chunk_ids), + "count": len(all_chunk_ids), + "updated_at": int(time.time()), + } + } + ) + logger.info( + f"Merged entity_chunks for '{target_entity}' with {len(all_chunk_ids)} chunk IDs" + ) + + # Update relation_chunks for merged relationships + if relation_chunks_storage and all_relations: + old_relation_keys_to_delete = [] + new_relation_chunks_data = {} + + for src, tgt, edge_data in all_relations: + new_src = target_entity if src in source_entities else src + new_tgt = target_entity if tgt in source_entities else tgt + + # Skip self-loops + if new_src == new_tgt: + continue + + old_key = make_relation_chunk_key(src, tgt) + new_key = make_relation_chunk_key(new_src, new_tgt) + + if old_key != new_key: + old_chunks_data = await relation_chunks_storage.get_by_id( + old_key + ) + if old_chunks_data: + # Check if new key already has data (merge case) + if new_key in new_relation_chunks_data: + existing_chunks = set( + new_relation_chunks_data[new_key].get( + "chunk_ids", [] + ) + ) + new_chunks = set(old_chunks_data.get("chunk_ids", [])) + merged_chunks = existing_chunks.union(new_chunks) + new_relation_chunks_data[new_key] = { + "chunk_ids": list(merged_chunks), + "count": len(merged_chunks), + "updated_at": int(time.time()), + } + else: + new_relation_chunks_data[new_key] = old_chunks_data + old_relation_keys_to_delete.append(old_key) + + if new_relation_chunks_data: + await relation_chunks_storage.upsert(new_relation_chunks_data) + if old_relation_keys_to_delete: + await relation_chunks_storage.delete(old_relation_keys_to_delete) + logger.info( + f"Updated {len(new_relation_chunks_data)} relation_chunks entries for merged entities" + ) + + # 10. Delete source entities for entity_name in source_entities: if entity_name == target_entity: logger.info( @@ -916,7 +1141,11 @@ async def amerge_entities( f"Deleted source entity '{entity_name}' and its vector embedding from database" ) - # 10. Save changes + # Delete source entity's chunk tracking + if entity_chunks_storage: + await entity_chunks_storage.delete([entity_name]) + + # 11. Save changes await _merge_entities_done( entities_vdb, relationships_vdb, chunk_entity_relation_graph )