From a3370b024d3baaf446a7ea6a3b2efb1717ce554a Mon Sep 17 00:00:00 2001 From: yangdx Date: Sun, 26 Oct 2025 17:06:16 +0800 Subject: [PATCH] Add chunk tracking cleanup to entity/relation deletion and creation MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit • Clean up chunk storage on delete • Track chunks in create operations • Normalize relation keys consistently --- lightrag/utils_graph.py | 188 ++++++++++++++++++++++++++++++++-------- 1 file changed, 153 insertions(+), 35 deletions(-) diff --git a/lightrag/utils_graph.py b/lightrag/utils_graph.py index 68d65617..b3b042c9 100644 --- a/lightrag/utils_graph.py +++ b/lightrag/utils_graph.py @@ -12,15 +12,24 @@ from .base import StorageNameSpace async def adelete_by_entity( - chunk_entity_relation_graph, entities_vdb, relationships_vdb, entity_name: str + chunk_entity_relation_graph, + entities_vdb, + relationships_vdb, + entity_name: str, + entity_chunks_storage=None, + relation_chunks_storage=None, ) -> DeletionResult: """Asynchronously delete an entity and all its relationships. + Also cleans up entity_chunks_storage and relation_chunks_storage to remove chunk tracking. + Args: chunk_entity_relation_graph: Graph storage instance entities_vdb: Vector database storage for entities relationships_vdb: Vector database storage for relationships entity_name: Name of the entity to delete + entity_chunks_storage: Optional KV storage for tracking chunks that reference this entity + relation_chunks_storage: Optional KV storage for tracking chunks that reference relations """ graph_db_lock = get_graph_db_lock(enable_logging=False) # Use graph database lock to ensure atomic graph and vector db operations @@ -39,14 +48,43 @@ async def adelete_by_entity( edges = await chunk_entity_relation_graph.get_node_edges(entity_name) related_relations_count = len(edges) if edges else 0 + # Clean up chunk tracking storages before deletion + if entity_chunks_storage is not None: + # Delete entity's entry from entity_chunks_storage + await entity_chunks_storage.delete([entity_name]) + logger.info( + f"Entity Delete: removed chunk tracking for `{entity_name}`" + ) + + if relation_chunks_storage is not None and edges: + # Delete all related relationships from relation_chunks_storage + from .utils import make_relation_chunk_key + + relation_keys_to_delete = [] + for src, tgt in edges: + # Normalize entity order for consistent key generation + normalized_src, normalized_tgt = sorted([src, tgt]) + storage_key = make_relation_chunk_key(normalized_src, normalized_tgt) + relation_keys_to_delete.append(storage_key) + + if relation_keys_to_delete: + await relation_chunks_storage.delete(relation_keys_to_delete) + logger.info( + f"Entity Delete: removed chunk tracking for {len(relation_keys_to_delete)} relations" + ) + await entities_vdb.delete_entity(entity_name) await relationships_vdb.delete_entity_relation(entity_name) await chunk_entity_relation_graph.delete_node(entity_name) - message = f"Entity '{entity_name}' and its {related_relations_count} relationships have been deleted." + message = f"Entity Delete: remove '{entity_name}' and its {related_relations_count} relations" logger.info(message) await _delete_by_entity_done( - entities_vdb, relationships_vdb, chunk_entity_relation_graph + entities_vdb, + relationships_vdb, + chunk_entity_relation_graph, + entity_chunks_storage, + relation_chunks_storage, ) return DeletionResult( status="success", @@ -66,17 +104,23 @@ async def adelete_by_entity( async def _delete_by_entity_done( - entities_vdb, relationships_vdb, chunk_entity_relation_graph + entities_vdb, + relationships_vdb, + chunk_entity_relation_graph, + entity_chunks_storage=None, + relation_chunks_storage=None, ) -> None: """Callback after entity deletion is complete, ensures updates are persisted""" + storages = [entities_vdb, relationships_vdb, chunk_entity_relation_graph] + if entity_chunks_storage is not None: + storages.append(entity_chunks_storage) + if relation_chunks_storage is not None: + storages.append(relation_chunks_storage) + await asyncio.gather( *[ cast(StorageNameSpace, storage_inst).index_done_callback() - for storage_inst in [ # type: ignore - entities_vdb, - relationships_vdb, - chunk_entity_relation_graph, - ] + for storage_inst in storages # type: ignore ] ) @@ -86,14 +130,18 @@ async def adelete_by_relation( relationships_vdb, source_entity: str, target_entity: str, + relation_chunks_storage=None, ) -> DeletionResult: """Asynchronously delete a relation between two entities. + Also cleans up relation_chunks_storage to remove chunk tracking. + Args: chunk_entity_relation_graph: Graph storage instance relationships_vdb: Vector database storage for relationships source_entity: Name of the source entity target_entity: Name of the target entity + relation_chunks_storage: Optional KV storage for tracking chunks that reference this relation """ relation_str = f"{source_entity} -> {target_entity}" graph_db_lock = get_graph_db_lock(enable_logging=False) @@ -118,6 +166,19 @@ async def adelete_by_relation( status_code=404, ) + # Clean up chunk tracking storage before deletion + if relation_chunks_storage is not None: + from .utils import make_relation_chunk_key + + # Normalize entity order for consistent key generation + normalized_src, normalized_tgt = sorted([source_entity, target_entity]) + storage_key = make_relation_chunk_key(normalized_src, normalized_tgt) + + await relation_chunks_storage.delete([storage_key]) + logger.info( + f"Relation Delete: removed chunk tracking for `{source_entity}`~`{target_entity}`" + ) + # Delete relation from vector database rel_ids_to_delete = [ compute_mdhash_id(source_entity + target_entity, prefix="rel-"), @@ -131,9 +192,11 @@ async def adelete_by_relation( [(source_entity, target_entity)] ) - message = f"Successfully deleted relation from '{source_entity}' to '{target_entity}'" + message = f"Relation Delete: `{source_entity}`~`{target_entity}` deleted successfully" logger.info(message) - await _delete_relation_done(relationships_vdb, chunk_entity_relation_graph) + await _delete_relation_done( + relationships_vdb, chunk_entity_relation_graph, relation_chunks_storage + ) return DeletionResult( status="success", doc_id=relation_str, @@ -151,15 +214,18 @@ async def adelete_by_relation( ) -async def _delete_relation_done(relationships_vdb, chunk_entity_relation_graph) -> None: +async def _delete_relation_done( + relationships_vdb, chunk_entity_relation_graph, relation_chunks_storage=None +) -> None: """Callback after relation deletion is complete, ensures updates are persisted""" + storages = [relationships_vdb, chunk_entity_relation_graph] + if relation_chunks_storage is not None: + storages.append(relation_chunks_storage) + await asyncio.gather( *[ cast(StorageNameSpace, storage_inst).index_done_callback() - for storage_inst in [ # type: ignore - relationships_vdb, - chunk_entity_relation_graph, - ] + for storage_inst in storages # type: ignore ] ) @@ -506,7 +572,7 @@ async def aedit_entity( relation_chunks_storage, ) - logger.info(f"Entity '{entity_name}' successfully updated") + logger.info(f"Entity Edit: `{entity_name}` successfully updated") return await get_entity_info( chunk_entity_relation_graph, entities_vdb, @@ -586,12 +652,14 @@ async def aedit_relation( source_entity, target_entity ) # Important: First delete the old relation record from the vector database - old_relation_id = compute_mdhash_id( - source_entity + target_entity, prefix="rel-" - ) - await relationships_vdb.delete([old_relation_id]) - logger.info( - f"Deleted old relation record from vector database for relation {source_entity} -> {target_entity}" + # Delete both permutations to handle relationships created before normalization + rel_ids_to_delete = [ + compute_mdhash_id(source_entity + target_entity, prefix="rel-"), + compute_mdhash_id(target_entity + source_entity, prefix="rel-"), + ] + await relationships_vdb.delete(rel_ids_to_delete) + logger.debug( + f"Relation Delete: delete vdb for `{source_entity}`~`{target_entity}`" ) # 2. Update relation information in the graph @@ -690,14 +758,8 @@ async def aedit_relation( } ) - reason = ( - "source_id changed" - if source_id_changed - else "initialized from graph" - ) logger.info( - f"Updated relation_chunks_storage for '{source_entity}' -> '{target_entity}': " - f"{len(updated_chunk_ids)} chunks ({reason})" + f"Relation Delete: update chunk tracking for `{source_entity}`~`{target_entity}`" ) # 5. Save changes @@ -706,7 +768,7 @@ async def aedit_relation( ) logger.info( - f"Relation from '{source_entity}' to '{target_entity}' successfully updated" + f"Relation Delete: `{source_entity}`~`{target_entity}`' successfully updated" ) return await get_relation_info( chunk_entity_relation_graph, @@ -744,10 +806,13 @@ async def acreate_entity( relationships_vdb, entity_name: str, entity_data: dict[str, Any], + entity_chunks_storage=None, + relation_chunks_storage=None, ) -> dict[str, Any]: """Asynchronously create a new entity. Creates a new entity in the knowledge graph and adds it to the vector database. + Also synchronizes entity_chunks_storage to track chunk references. Args: chunk_entity_relation_graph: Graph storage instance @@ -755,6 +820,8 @@ async def acreate_entity( relationships_vdb: Vector database storage for relationships entity_name: Name of the new entity entity_data: Dictionary containing entity attributes, e.g. {"description": "description", "entity_type": "type"} + entity_chunks_storage: Optional KV storage for tracking chunks that reference this entity + relation_chunks_storage: Optional KV storage for tracking chunks that reference relations Returns: Dictionary containing created entity information @@ -805,12 +872,34 @@ async def acreate_entity( # Update vector database await entities_vdb.upsert(entity_data_for_vdb) + # Update entity_chunks_storage to track chunk references + if entity_chunks_storage is not None: + source_id = node_data.get("source_id", "") + chunk_ids = [cid for cid in source_id.split(GRAPH_FIELD_SEP) if cid] + + if chunk_ids: + await entity_chunks_storage.upsert( + { + entity_name: { + "chunk_ids": chunk_ids, + "count": len(chunk_ids), + } + } + ) + logger.info( + f"Entity Create: tracked {len(chunk_ids)} chunks for `{entity_name}`" + ) + # Save changes await _edit_entity_done( - entities_vdb, relationships_vdb, chunk_entity_relation_graph + entities_vdb, + relationships_vdb, + chunk_entity_relation_graph, + entity_chunks_storage, + relation_chunks_storage, ) - logger.info(f"Entity '{entity_name}' successfully created") + logger.info(f"Entity Create: '{entity_name}' successfully created") return await get_entity_info( chunk_entity_relation_graph, entities_vdb, @@ -829,10 +918,12 @@ async def acreate_relation( source_entity: str, target_entity: str, relation_data: dict[str, Any], + relation_chunks_storage=None, ) -> dict[str, Any]: """Asynchronously create a new relation between entities. Creates a new relation (edge) in the knowledge graph and adds it to the vector database. + Also synchronizes relation_chunks_storage to track chunk references. Args: chunk_entity_relation_graph: Graph storage instance @@ -841,6 +932,7 @@ async def acreate_relation( source_entity: Name of the source entity target_entity: Name of the target entity relation_data: Dictionary containing relation attributes, e.g. {"description": "description", "keywords": "keywords"} + relation_chunks_storage: Optional KV storage for tracking chunks that reference this relation Returns: Dictionary containing created relation information @@ -917,11 +1009,37 @@ async def acreate_relation( # Update vector database await relationships_vdb.upsert(relation_data_for_vdb) + # Update relation_chunks_storage to track chunk references + if relation_chunks_storage is not None: + from .utils import make_relation_chunk_key + + # Normalize entity order for consistent key generation + normalized_src, normalized_tgt = sorted([source_entity, target_entity]) + storage_key = make_relation_chunk_key(normalized_src, normalized_tgt) + + source_id = edge_data.get("source_id", "") + chunk_ids = [cid for cid in source_id.split(GRAPH_FIELD_SEP) if cid] + + if chunk_ids: + await relation_chunks_storage.upsert( + { + storage_key: { + "chunk_ids": chunk_ids, + "count": len(chunk_ids), + } + } + ) + logger.info( + f"Relation Create: tracked {len(chunk_ids)} chunks for `{source_entity}`~`{target_entity}`" + ) + # Save changes - await _edit_relation_done(relationships_vdb, chunk_entity_relation_graph) + await _edit_relation_done( + relationships_vdb, chunk_entity_relation_graph, relation_chunks_storage + ) logger.info( - f"Relation from '{source_entity}' to '{target_entity}' successfully created" + f"Relation Create: `{source_entity}`~`{target_entity}` successfully created" ) return await get_relation_info( chunk_entity_relation_graph,