From 3fbd704bf9743ac0633b7bfade115ff2b9358c0f Mon Sep 17 00:00:00 2001 From: yangdx Date: Sun, 26 Oct 2025 14:34:56 +0800 Subject: [PATCH 1/4] Enhance entity/relation editing with chunk tracking synchronization MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit • Add chunk storage sync to edit ops • Implement incremental chunk ID updates • Support entity renaming migrations • Normalize relation keys consistently • Preserve chunk references on edits --- lightrag/lightrag.py | 5 + lightrag/utils.py | 46 ++++++ lightrag/utils_graph.py | 302 ++++++++++++++++++++++++++++++++++++---- 3 files changed, 325 insertions(+), 28 deletions(-) diff --git a/lightrag/lightrag.py b/lightrag/lightrag.py index ca972712..ced5f40e 100644 --- a/lightrag/lightrag.py +++ b/lightrag/lightrag.py @@ -3582,6 +3582,7 @@ class LightRAG: """Asynchronously edit entity information. Updates entity information in the knowledge graph and re-embeds the entity in the vector database. + Also synchronizes entity_chunks_storage and relation_chunks_storage to track chunk references. Args: entity_name: Name of the entity to edit @@ -3600,6 +3601,8 @@ class LightRAG: entity_name, updated_data, allow_rename, + self.entity_chunks, + self.relation_chunks, ) def edit_entity( @@ -3616,6 +3619,7 @@ class LightRAG: """Asynchronously edit relation information. Updates relation (edge) information in the knowledge graph and re-embeds the relation in the vector database. + Also synchronizes the relation_chunks_storage to track which chunks reference this relation. 
Args: source_entity: Name of the source entity @@ -3634,6 +3638,7 @@ class LightRAG: source_entity, target_entity, updated_data, + self.relation_chunks, ) def edit_relation( diff --git a/lightrag/utils.py b/lightrag/utils.py index bfa3cac4..6c382894 100644 --- a/lightrag/utils.py +++ b/lightrag/utils.py @@ -2551,6 +2551,52 @@ def apply_source_ids_limit( return truncated +def compute_incremental_chunk_ids( + existing_full_chunk_ids: list[str], + old_chunk_ids: list[str], + new_chunk_ids: list[str], +) -> list[str]: + """ + Compute incrementally updated chunk IDs based on changes. + + This function applies delta changes (additions and removals) to an existing + list of chunk IDs while maintaining order and ensuring deduplication. + Delta additions from new_chunk_ids are placed at the end. + + Args: + existing_full_chunk_ids: Complete list of existing chunk IDs from storage + old_chunk_ids: Previous chunk IDs from source_id (chunks being replaced) + new_chunk_ids: New chunk IDs from updated source_id (chunks being added) + + Returns: + Updated list of chunk IDs with deduplication + + Example: + >>> existing = ['chunk-1', 'chunk-2', 'chunk-3'] + >>> old = ['chunk-1', 'chunk-2'] + >>> new = ['chunk-2', 'chunk-4'] + >>> compute_incremental_chunk_ids(existing, old, new) + ['chunk-3', 'chunk-2', 'chunk-4'] + """ + # Calculate changes + chunks_to_remove = set(old_chunk_ids) - set(new_chunk_ids) + chunks_to_add = set(new_chunk_ids) - set(old_chunk_ids) + + # Apply changes to full chunk_ids + # Step 1: Remove chunks that are no longer needed + updated_chunk_ids = [ + cid for cid in existing_full_chunk_ids if cid not in chunks_to_remove + ] + + # Step 2: Add new chunks (preserving order from new_chunk_ids) + # Note: 'cid not in updated_chunk_ids' check ensures deduplication + for cid in new_chunk_ids: + if cid in chunks_to_add and cid not in updated_chunk_ids: + updated_chunk_ids.append(cid) + + return updated_chunk_ids + + def subtract_source_ids( source_ids: Iterable[str], 
ids_to_remove: Collection[str], diff --git a/lightrag/utils_graph.py b/lightrag/utils_graph.py index c2ccd313..97d3dc7a 100644 --- a/lightrag/utils_graph.py +++ b/lightrag/utils_graph.py @@ -167,10 +167,13 @@ async def aedit_entity( entity_name: str, updated_data: dict[str, str], allow_rename: bool = True, + entity_chunks_storage=None, + relation_chunks_storage=None, ) -> dict[str, Any]: """Asynchronously edit entity information. Updates entity information in the knowledge graph and re-embeds the entity in the vector database. + Also synchronizes entity_chunks_storage and relation_chunks_storage to track chunk references. Args: chunk_entity_relation_graph: Graph storage instance @@ -179,6 +182,8 @@ async def aedit_entity( entity_name: Name of the entity to edit updated_data: Dictionary containing updated attributes, e.g. {"description": "new description", "entity_type": "new type"} allow_rename: Whether to allow entity renaming, defaults to True + entity_chunks_storage: Optional KV storage for tracking chunks that reference this entity + relation_chunks_storage: Optional KV storage for tracking chunks that reference relations Returns: Dictionary containing updated entity information @@ -187,6 +192,9 @@ async def aedit_entity( # Use graph database lock to ensure atomic graph and vector db operations async with graph_db_lock: try: + # Save original entity name for chunk tracking updates + original_entity_name = entity_name + # 1. 
Get current entity information node_exists = await chunk_entity_relation_graph.has_node(entity_name) if not node_exists: @@ -223,7 +231,9 @@ async def aedit_entity( # If renaming entity if is_renaming: - logger.info(f"Renaming entity '{entity_name}' to '{new_entity_name}'") + logger.info( + f"Entity Edit: renaming `{entity_name}` to `{new_entity_name}`" + ) # Create new entity await chunk_entity_relation_graph.upsert_node( @@ -269,35 +279,36 @@ async def aedit_entity( # Delete old entity record from vector database old_entity_id = compute_mdhash_id(entity_name, prefix="ent-") await entities_vdb.delete([old_entity_id]) - logger.info( - f"Deleted old entity '{entity_name}' and its vector embedding from database" - ) # Delete old relation records from vector database await relationships_vdb.delete(relations_to_delete) - logger.info( - f"Deleted {len(relations_to_delete)} relation records for entity '{entity_name}' from vector database" - ) # Update relationship vector representations for src, tgt, edge_data in relations_to_update: + # Normalize entity order for consistent vector ID generation + normalized_src, normalized_tgt = sorted([src, tgt]) + description = edge_data.get("description", "") keywords = edge_data.get("keywords", "") source_id = edge_data.get("source_id", "") weight = float(edge_data.get("weight", 1.0)) - # Create new content for embedding - content = f"{src}\t{tgt}\n{keywords}\n{description}" + # Create content using normalized order + content = ( + f"{normalized_src}\t{normalized_tgt}\n{keywords}\n{description}" + ) - # Calculate relationship ID - relation_id = compute_mdhash_id(src + tgt, prefix="rel-") + # Calculate relationship ID using normalized order + relation_id = compute_mdhash_id( + normalized_src + normalized_tgt, prefix="rel-" + ) # Prepare data for vector database update relation_data = { relation_id: { "content": content, - "src_id": src, - "tgt_id": tgt, + "src_id": normalized_src, + "tgt_id": normalized_tgt, "source_id": source_id, 
"description": description, "keywords": keywords, @@ -310,6 +321,7 @@ async def aedit_entity( # Update working entity name to new name entity_name = new_entity_name + else: # If not renaming, directly update node data await chunk_entity_relation_graph.upsert_node( @@ -339,9 +351,155 @@ async def aedit_entity( # Update vector database await entities_vdb.upsert(entity_data) - # 4. Save changes + # 4. Update chunk tracking storages + if entity_chunks_storage is not None or relation_chunks_storage is not None: + from .utils import ( + make_relation_chunk_key, + compute_incremental_chunk_ids, + ) + + # 4.1 Handle entity chunk tracking + if entity_chunks_storage is not None: + # Get storage key (use original name for renaming scenario) + storage_key = original_entity_name if is_renaming else entity_name + stored_data = await entity_chunks_storage.get_by_id(storage_key) + has_stored_data = ( + stored_data + and isinstance(stored_data, dict) + and stored_data.get("chunk_ids") + ) + + # Get old and new source_id + old_source_id = node_data.get("source_id", "") + old_chunk_ids = [ + cid for cid in old_source_id.split(GRAPH_FIELD_SEP) if cid + ] + + new_source_id = new_node_data.get("source_id", "") + new_chunk_ids = [ + cid for cid in new_source_id.split(GRAPH_FIELD_SEP) if cid + ] + + source_id_changed = set(new_chunk_ids) != set(old_chunk_ids) + + # Update if: source_id changed OR storage has no data + if source_id_changed or not has_stored_data: + # Get existing full chunk_ids from storage + existing_full_chunk_ids = [] + if has_stored_data: + existing_full_chunk_ids = [ + cid for cid in stored_data.get("chunk_ids", []) if cid + ] + + # If no stored data exists, use old source_id as baseline + if not existing_full_chunk_ids: + existing_full_chunk_ids = old_chunk_ids.copy() + + # Use utility function to compute incremental updates + updated_chunk_ids = compute_incremental_chunk_ids( + existing_full_chunk_ids, old_chunk_ids, new_chunk_ids + ) + + # Update storage (even if 
updated_chunk_ids is empty) + if is_renaming: + # Renaming: delete old + create new + await entity_chunks_storage.delete([original_entity_name]) + await entity_chunks_storage.upsert( + { + entity_name: { + "chunk_ids": updated_chunk_ids, + "count": len(updated_chunk_ids), + } + } + ) + else: + # Non-renaming: direct update + await entity_chunks_storage.upsert( + { + entity_name: { + "chunk_ids": updated_chunk_ids, + "count": len(updated_chunk_ids), + } + } + ) + + logger.info( + f"Entity Edit: find {len(updated_chunk_ids)} chunks related to `{entity_name}`" + ) + + # 4.2 Handle relation chunk tracking if entity was renamed + if ( + is_renaming + and relation_chunks_storage is not None + and relations_to_update + ): + for src, tgt, edge_data in relations_to_update: + # Determine old entity pair (before rename) + old_src = original_entity_name if src == entity_name else src + old_tgt = original_entity_name if tgt == entity_name else tgt + + # Normalize entity order for both old and new keys + old_normalized_src, old_normalized_tgt = sorted( + [old_src, old_tgt] + ) + new_normalized_src, new_normalized_tgt = sorted([src, tgt]) + + # Generate storage keys + old_storage_key = make_relation_chunk_key( + old_normalized_src, old_normalized_tgt + ) + new_storage_key = make_relation_chunk_key( + new_normalized_src, new_normalized_tgt + ) + + # If keys are different, we need to move the chunk tracking + if old_storage_key != new_storage_key: + # Get complete chunk IDs from storage first (preserves all existing references) + old_stored_data = await relation_chunks_storage.get_by_id( + old_storage_key + ) + relation_chunk_ids = [] + + if old_stored_data and isinstance(old_stored_data, dict): + # Use complete chunk_ids from storage + relation_chunk_ids = [ + cid + for cid in old_stored_data.get("chunk_ids", []) + if cid + ] + else: + # Fallback: if storage has no data, use graph's source_id + relation_source_id = edge_data.get("source_id", "") + relation_chunk_ids = [ + cid + 
for cid in relation_source_id.split(GRAPH_FIELD_SEP) + if cid + ] + + # Delete old relation chunk tracking + await relation_chunks_storage.delete([old_storage_key]) + + # Create new relation chunk tracking (migrate complete data) + if relation_chunk_ids: + await relation_chunks_storage.upsert( + { + new_storage_key: { + "chunk_ids": relation_chunk_ids, + "count": len(relation_chunk_ids), + } + } + ) + logger.info( + f"Entity Edit: migrate {len(relations_to_update)} relations after rename" + ) + + # 5. Save changes await _edit_entity_done( - entities_vdb, relationships_vdb, chunk_entity_relation_graph + entities_vdb, + relationships_vdb, + chunk_entity_relation_graph, + entity_chunks_storage, + relation_chunks_storage, ) logger.info(f"Entity '{entity_name}' successfully updated") @@ -357,17 +515,23 @@ async def aedit_entity( async def _edit_entity_done( - entities_vdb, relationships_vdb, chunk_entity_relation_graph + entities_vdb, + relationships_vdb, + chunk_entity_relation_graph, + entity_chunks_storage=None, + relation_chunks_storage=None, ) -> None: """Callback after entity editing is complete, ensures updates are persisted""" + storages = [entities_vdb, relationships_vdb, chunk_entity_relation_graph] + if entity_chunks_storage is not None: + storages.append(entity_chunks_storage) + if relation_chunks_storage is not None: + storages.append(relation_chunks_storage) + await asyncio.gather( *[ cast(StorageNameSpace, storage_inst).index_done_callback() - for storage_inst in [ # type: ignore - entities_vdb, - relationships_vdb, - chunk_entity_relation_graph, - ] + for storage_inst in storages # type: ignore ] ) @@ -379,10 +543,12 @@ async def aedit_relation( source_entity: str, target_entity: str, updated_data: dict[str, Any], + relation_chunks_storage=None, ) -> dict[str, Any]: """Asynchronously edit relation information. Updates relation (edge) information in the knowledge graph and re-embeds the relation in the vector database. 
+ Also synchronizes the relation_chunks_storage to track which chunks reference this relation. Args: chunk_entity_relation_graph: Graph storage instance @@ -391,6 +557,7 @@ async def aedit_relation( source_entity: Name of the source entity target_entity: Name of the target entity updated_data: Dictionary containing updated attributes, e.g. {"description": "new description", "keywords": "new keywords"} + relation_chunks_storage: Optional KV storage for tracking chunks that reference this relation Returns: Dictionary containing updated relation information @@ -399,6 +566,10 @@ async def aedit_relation( # Use graph database lock to ensure atomic graph and vector db operations async with graph_db_lock: try: + # Normalize entity order for undirected graph (ensures consistent key generation) + if source_entity > target_entity: + source_entity, target_entity = target_entity, source_entity + # 1. Get current relation information edge_exists = await chunk_entity_relation_graph.has_edge( source_entity, target_entity @@ -455,8 +626,80 @@ async def aedit_relation( # Update vector database await relationships_vdb.upsert(relation_data) - # 4. Save changes - await _edit_relation_done(relationships_vdb, chunk_entity_relation_graph) + # 4. 
Update relation_chunks_storage in two scenarios: + # - source_id has changed (edit scenario) + # - relation_chunks_storage has no existing data (migration/initialization scenario) + if relation_chunks_storage is not None: + from .utils import ( + make_relation_chunk_key, + compute_incremental_chunk_ids, + ) + + storage_key = make_relation_chunk_key(source_entity, target_entity) + + # Check if storage has existing data + stored_data = await relation_chunks_storage.get_by_id(storage_key) + has_stored_data = ( + stored_data + and isinstance(stored_data, dict) + and stored_data.get("chunk_ids") + ) + + # Get old and new source_id + old_source_id = edge_data.get("source_id", "") + old_chunk_ids = [ + cid for cid in old_source_id.split(GRAPH_FIELD_SEP) if cid + ] + + new_source_id = new_edge_data.get("source_id", "") + new_chunk_ids = [ + cid for cid in new_source_id.split(GRAPH_FIELD_SEP) if cid + ] + + source_id_changed = set(new_chunk_ids) != set(old_chunk_ids) + + # Update if: source_id changed OR storage has no data + if source_id_changed or not has_stored_data: + # Get existing full chunk_ids from storage + existing_full_chunk_ids = [] + if has_stored_data: + existing_full_chunk_ids = [ + cid for cid in stored_data.get("chunk_ids", []) if cid + ] + + # If no stored data exists, use old source_id as baseline + if not existing_full_chunk_ids: + existing_full_chunk_ids = old_chunk_ids.copy() + + # Use utility function to compute incremental updates + updated_chunk_ids = compute_incremental_chunk_ids( + existing_full_chunk_ids, old_chunk_ids, new_chunk_ids + ) + + # Update storage (Update even if updated_chunk_ids is empty) + await relation_chunks_storage.upsert( + { + storage_key: { + "chunk_ids": updated_chunk_ids, + "count": len(updated_chunk_ids), + } + } + ) + + reason = ( + "source_id changed" + if source_id_changed + else "initialized from graph" + ) + logger.info( + f"Updated relation_chunks_storage for '{source_entity}' -> '{target_entity}': " + 
f"{len(updated_chunk_ids)} chunks ({reason})" + ) + + # 5. Save changes + await _edit_relation_done( + relationships_vdb, chunk_entity_relation_graph, relation_chunks_storage + ) logger.info( f"Relation from '{source_entity}' to '{target_entity}' successfully updated" @@ -475,15 +718,18 @@ async def aedit_relation( raise -async def _edit_relation_done(relationships_vdb, chunk_entity_relation_graph) -> None: +async def _edit_relation_done( + relationships_vdb, chunk_entity_relation_graph, relation_chunks_storage=None +) -> None: """Callback after relation editing is complete, ensures updates are persisted""" + storages = [relationships_vdb, chunk_entity_relation_graph] + if relation_chunks_storage is not None: + storages.append(relation_chunks_storage) + await asyncio.gather( *[ cast(StorageNameSpace, storage_inst).index_done_callback() - for storage_inst in [ # type: ignore - relationships_vdb, - chunk_entity_relation_graph, - ] + for storage_inst in storages # type: ignore ] ) From bf1897a67eebc668565f1f1e099e0b8073934552 Mon Sep 17 00:00:00 2001 From: yangdx Date: Sun, 26 Oct 2025 15:53:31 +0800 Subject: [PATCH 2/4] Normalize entity order for undirected graph consistency MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit • Normalize entity pairs for storage • Update API docs for undirected edges --- lightrag/api/routers/graph_routes.py | 13 +++++++--- lightrag/utils_graph.py | 37 ++++++++++++++++++++++++---- 2 files changed, 41 insertions(+), 9 deletions(-) diff --git a/lightrag/api/routers/graph_routes.py b/lightrag/api/routers/graph_routes.py index dbf4527e..f4c29fc2 100644 --- a/lightrag/api/routers/graph_routes.py +++ b/lightrag/api/routers/graph_routes.py @@ -299,6 +299,7 @@ def create_graph_routes(rag, api_key: Optional[str] = None): entity_data (dict): Entity properties including: - description (str): Textual description of the entity - entity_type (str): Category/type of the entity (e.g., PERSON, ORGANIZATION, 
LOCATION) + - source_id (str): Related chunk_id from which the description originates + - Additional custom properties as needed Response Schema: @@ -309,6 +310,7 @@ "entity_name": "Tesla", "description": "Electric vehicle manufacturer", "entity_type": "ORGANIZATION", + "source_id": "chunk-123<SEP>chunk-456" ... (other entity properties) } } @@ -361,10 +363,11 @@ def create_graph_routes(rag, api_key: Optional[str] = None): """ Create a new relationship between two entities in the knowledge graph - This endpoint establishes a directed relationship between two existing entities. - Both the source and target entities must already exist in the knowledge graph. - The system automatically generates vector embeddings for the relationship to - enable semantic search and graph traversal. + This endpoint establishes an undirected relationship between two existing entities. + The provided source/target order is accepted for convenience, but the backend + stored edge is undirected and may be returned with the entities swapped. + Both entities must already exist in the knowledge graph. The system automatically + generates vector embeddings for the relationship to enable semantic search and graph traversal. 
Prerequisites: - Both source_entity and target_entity must exist in the knowledge graph @@ -376,6 +379,7 @@ def create_graph_routes(rag, api_key: Optional[str] = None): relation_data (dict): Relationship properties including: - description (str): Textual description of the relationship - keywords (str): Comma-separated keywords describing the relationship type + - source_id (str): Related chunk_id from which the description originates - weight (float): Relationship strength/importance (default: 1.0) - Additional custom properties as needed @@ -388,6 +392,7 @@ def create_graph_routes(rag, api_key: Optional[str] = None): "tgt_id": "Tesla", "description": "Elon Musk is the CEO of Tesla", "keywords": "CEO, founder", + "source_id": "chunk-123<SEP>chunk-456", "weight": 1.0, ... (other relationship properties) } diff --git a/lightrag/utils_graph.py b/lightrag/utils_graph.py index 97d3dc7a..68d65617 100644 --- a/lightrag/utils_graph.py +++ b/lightrag/utils_graph.py @@ -100,6 +100,10 @@ async def adelete_by_relation( # Use graph database lock to ensure atomic graph and vector db operations async with graph_db_lock: try: + # Normalize entity order for undirected graph (ensures consistent key generation) + if source_entity > target_entity: + source_entity, target_entity = target_entity, source_entity + # Check if the relation exists edge_exists = await chunk_entity_relation_graph.has_edge( source_entity, target_entity @@ -878,6 +882,10 @@ async def acreate_relation( source_entity, target_entity, edge_data ) + # Normalize entity order for undirected relation vector (ensures consistent key generation) + if source_entity > target_entity: + source_entity, target_entity = target_entity, source_entity + # Prepare content for embedding description = edge_data.get("description", "") keywords = edge_data.get("keywords", "") @@ -1121,19 +1129,27 @@ async def amerge_entities( tgt = rel_data["tgt"] edge_data = rel_data["data"] + # Normalize entity order for consistent vector storage + 
normalized_src, normalized_tgt = sorted([src, tgt]) + description = edge_data.get("description", "") keywords = edge_data.get("keywords", "") source_id = edge_data.get("source_id", "") weight = float(edge_data.get("weight", 1.0)) - content = f"{keywords}\t{src}\n{tgt}\n{description}" - relation_id = compute_mdhash_id(src + tgt, prefix="rel-") + # Use normalized order for content and relation ID + content = ( + f"{keywords}\t{normalized_src}\n{normalized_tgt}\n{description}" + ) + relation_id = compute_mdhash_id( + normalized_src + normalized_tgt, prefix="rel-" + ) relation_data_for_vdb = { relation_id: { "content": content, - "src_id": src, - "tgt_id": tgt, + "src_id": normalized_src, + "tgt_id": normalized_tgt, "source_id": source_id, "description": description, "keywords": keywords, @@ -1340,7 +1356,18 @@ async def get_relation_info( tgt_entity: str, include_vector_data: bool = False, ) -> dict[str, str | None | dict[str, str]]: - """Get detailed information of a relationship""" + """ + Get detailed information of a relationship between two entities. + Relationship is undirected; swapping src_entity and tgt_entity does not change the relationship. 
+ + Args: + src_entity: Source entity name + tgt_entity: Target entity name + include_vector_data: Whether to include vector database information + + Returns: + Dictionary containing relationship information + """ # Get information from the graph edge_data = await chunk_entity_relation_graph.get_edge(src_entity, tgt_entity) From a3370b024d3baaf446a7ea6a3b2efb1717ce554a Mon Sep 17 00:00:00 2001 From: yangdx Date: Sun, 26 Oct 2025 17:06:16 +0800 Subject: [PATCH 3/4] Add chunk tracking cleanup to entity/relation deletion and creation MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit • Clean up chunk storage on delete • Track chunks in create operations • Normalize relation keys consistently --- lightrag/utils_graph.py | 188 ++++++++++++++++++++++++++++++++-------- 1 file changed, 153 insertions(+), 35 deletions(-) diff --git a/lightrag/utils_graph.py b/lightrag/utils_graph.py index 68d65617..b3b042c9 100644 --- a/lightrag/utils_graph.py +++ b/lightrag/utils_graph.py @@ -12,15 +12,24 @@ from .base import StorageNameSpace async def adelete_by_entity( - chunk_entity_relation_graph, entities_vdb, relationships_vdb, entity_name: str + chunk_entity_relation_graph, + entities_vdb, + relationships_vdb, + entity_name: str, + entity_chunks_storage=None, + relation_chunks_storage=None, ) -> DeletionResult: """Asynchronously delete an entity and all its relationships. + Also cleans up entity_chunks_storage and relation_chunks_storage to remove chunk tracking. 
+ Args: chunk_entity_relation_graph: Graph storage instance entities_vdb: Vector database storage for entities relationships_vdb: Vector database storage for relationships entity_name: Name of the entity to delete + entity_chunks_storage: Optional KV storage for tracking chunks that reference this entity + relation_chunks_storage: Optional KV storage for tracking chunks that reference relations """ graph_db_lock = get_graph_db_lock(enable_logging=False) # Use graph database lock to ensure atomic graph and vector db operations @@ -39,14 +48,43 @@ async def adelete_by_entity( edges = await chunk_entity_relation_graph.get_node_edges(entity_name) related_relations_count = len(edges) if edges else 0 + # Clean up chunk tracking storages before deletion + if entity_chunks_storage is not None: + # Delete entity's entry from entity_chunks_storage + await entity_chunks_storage.delete([entity_name]) + logger.info( + f"Entity Delete: removed chunk tracking for `{entity_name}`" + ) + + if relation_chunks_storage is not None and edges: + # Delete all related relationships from relation_chunks_storage + from .utils import make_relation_chunk_key + + relation_keys_to_delete = [] + for src, tgt in edges: + # Normalize entity order for consistent key generation + normalized_src, normalized_tgt = sorted([src, tgt]) + storage_key = make_relation_chunk_key(normalized_src, normalized_tgt) + relation_keys_to_delete.append(storage_key) + + if relation_keys_to_delete: + await relation_chunks_storage.delete(relation_keys_to_delete) + logger.info( + f"Entity Delete: removed chunk tracking for {len(relation_keys_to_delete)} relations" + ) + await entities_vdb.delete_entity(entity_name) await relationships_vdb.delete_entity_relation(entity_name) await chunk_entity_relation_graph.delete_node(entity_name) - message = f"Entity '{entity_name}' and its {related_relations_count} relationships have been deleted." 
+ message = f"Entity Delete: remove '{entity_name}' and its {related_relations_count} relations" logger.info(message) await _delete_by_entity_done( - entities_vdb, relationships_vdb, chunk_entity_relation_graph + entities_vdb, + relationships_vdb, + chunk_entity_relation_graph, + entity_chunks_storage, + relation_chunks_storage, ) return DeletionResult( status="success", @@ -66,17 +104,23 @@ async def adelete_by_entity( async def _delete_by_entity_done( - entities_vdb, relationships_vdb, chunk_entity_relation_graph + entities_vdb, + relationships_vdb, + chunk_entity_relation_graph, + entity_chunks_storage=None, + relation_chunks_storage=None, ) -> None: """Callback after entity deletion is complete, ensures updates are persisted""" + storages = [entities_vdb, relationships_vdb, chunk_entity_relation_graph] + if entity_chunks_storage is not None: + storages.append(entity_chunks_storage) + if relation_chunks_storage is not None: + storages.append(relation_chunks_storage) + await asyncio.gather( *[ cast(StorageNameSpace, storage_inst).index_done_callback() - for storage_inst in [ # type: ignore - entities_vdb, - relationships_vdb, - chunk_entity_relation_graph, - ] + for storage_inst in storages # type: ignore ] ) @@ -86,14 +130,18 @@ async def adelete_by_relation( relationships_vdb, source_entity: str, target_entity: str, + relation_chunks_storage=None, ) -> DeletionResult: """Asynchronously delete a relation between two entities. + Also cleans up relation_chunks_storage to remove chunk tracking. 
+ Args: chunk_entity_relation_graph: Graph storage instance relationships_vdb: Vector database storage for relationships source_entity: Name of the source entity target_entity: Name of the target entity + relation_chunks_storage: Optional KV storage for tracking chunks that reference this relation """ relation_str = f"{source_entity} -> {target_entity}" graph_db_lock = get_graph_db_lock(enable_logging=False) @@ -118,6 +166,19 @@ async def adelete_by_relation( status_code=404, ) + # Clean up chunk tracking storage before deletion + if relation_chunks_storage is not None: + from .utils import make_relation_chunk_key + + # Normalize entity order for consistent key generation + normalized_src, normalized_tgt = sorted([source_entity, target_entity]) + storage_key = make_relation_chunk_key(normalized_src, normalized_tgt) + + await relation_chunks_storage.delete([storage_key]) + logger.info( + f"Relation Delete: removed chunk tracking for `{source_entity}`~`{target_entity}`" + ) + # Delete relation from vector database rel_ids_to_delete = [ compute_mdhash_id(source_entity + target_entity, prefix="rel-"), @@ -131,9 +192,11 @@ async def adelete_by_relation( [(source_entity, target_entity)] ) - message = f"Successfully deleted relation from '{source_entity}' to '{target_entity}'" + message = f"Relation Delete: `{source_entity}`~`{target_entity}` deleted successfully" logger.info(message) - await _delete_relation_done(relationships_vdb, chunk_entity_relation_graph) + await _delete_relation_done( + relationships_vdb, chunk_entity_relation_graph, relation_chunks_storage + ) return DeletionResult( status="success", doc_id=relation_str, @@ -151,15 +214,18 @@ async def adelete_by_relation( ) -async def _delete_relation_done(relationships_vdb, chunk_entity_relation_graph) -> None: +async def _delete_relation_done( + relationships_vdb, chunk_entity_relation_graph, relation_chunks_storage=None +) -> None: """Callback after relation deletion is complete, ensures updates are 
persisted""" + storages = [relationships_vdb, chunk_entity_relation_graph] + if relation_chunks_storage is not None: + storages.append(relation_chunks_storage) + await asyncio.gather( *[ cast(StorageNameSpace, storage_inst).index_done_callback() - for storage_inst in [ # type: ignore - relationships_vdb, - chunk_entity_relation_graph, - ] + for storage_inst in storages # type: ignore ] ) @@ -506,7 +572,7 @@ async def aedit_entity( relation_chunks_storage, ) - logger.info(f"Entity '{entity_name}' successfully updated") + logger.info(f"Entity Edit: `{entity_name}` successfully updated") return await get_entity_info( chunk_entity_relation_graph, entities_vdb, @@ -586,12 +652,14 @@ async def aedit_relation( source_entity, target_entity ) # Important: First delete the old relation record from the vector database - old_relation_id = compute_mdhash_id( - source_entity + target_entity, prefix="rel-" - ) - await relationships_vdb.delete([old_relation_id]) - logger.info( - f"Deleted old relation record from vector database for relation {source_entity} -> {target_entity}" + # Delete both permutations to handle relationships created before normalization + rel_ids_to_delete = [ + compute_mdhash_id(source_entity + target_entity, prefix="rel-"), + compute_mdhash_id(target_entity + source_entity, prefix="rel-"), + ] + await relationships_vdb.delete(rel_ids_to_delete) + logger.debug( + f"Relation Edit: delete vdb for `{source_entity}`~`{target_entity}`" ) # 2. Update relation information in the graph @@ -690,14 +758,8 @@ async def aedit_relation( # Update vector database await relationships_vdb.upsert(relation_data) - reason = ( - "source_id changed" - if source_id_changed - else "initialized from graph" - ) logger.info( - f"Updated relation_chunks_storage for '{source_entity}' -> '{target_entity}': " - f"{len(updated_chunk_ids)} chunks ({reason})" + f"Relation Edit: update chunk tracking for `{source_entity}`~`{target_entity}`" ) # 5. 
Save changes @@ -706,7 +768,7 @@ async def aedit_relation( ) logger.info( - f"Relation from '{source_entity}' to '{target_entity}' successfully updated" + f"Relation Edit: `{source_entity}`~`{target_entity}` successfully updated" ) return await get_relation_info( chunk_entity_relation_graph, @@ -744,10 +806,13 @@ async def acreate_entity( relationships_vdb, entity_name: str, entity_data: dict[str, Any], + entity_chunks_storage=None, + relation_chunks_storage=None, ) -> dict[str, Any]: """Asynchronously create a new entity. Creates a new entity in the knowledge graph and adds it to the vector database. + Also synchronizes entity_chunks_storage to track chunk references. Args: chunk_entity_relation_graph: Graph storage instance @@ -755,6 +820,8 @@ async def acreate_entity( relationships_vdb: Vector database storage for relationships entity_name: Name of the new entity entity_data: Dictionary containing entity attributes, e.g. {"description": "description", "entity_type": "type"} + entity_chunks_storage: Optional KV storage for tracking chunks that reference this entity + relation_chunks_storage: Optional KV storage for tracking chunks that reference relations Returns: Dictionary containing created entity information @@ -805,12 +872,34 @@ async def acreate_entity( # Update vector database await entities_vdb.upsert(entity_data_for_vdb) + # Update entity_chunks_storage to track chunk references + if entity_chunks_storage is not None: + source_id = node_data.get("source_id", "") + chunk_ids = [cid for cid in source_id.split(GRAPH_FIELD_SEP) if cid] + + if chunk_ids: + await entity_chunks_storage.upsert( + { + entity_name: { + "chunk_ids": chunk_ids, + "count": len(chunk_ids), + } + } + ) + logger.info( + f"Entity Create: tracked {len(chunk_ids)} chunks for `{entity_name}`" + ) + + # Save changes await _edit_entity_done( + entities_vdb, + relationships_vdb, + chunk_entity_relation_graph, + 
entity_chunks_storage, + relation_chunks_storage, ) - logger.info(f"Entity '{entity_name}' successfully created") + logger.info(f"Entity Create: '{entity_name}' successfully created") return await get_entity_info( chunk_entity_relation_graph, entities_vdb, @@ -829,10 +918,12 @@ async def acreate_relation( source_entity: str, target_entity: str, relation_data: dict[str, Any], + relation_chunks_storage=None, ) -> dict[str, Any]: """Asynchronously create a new relation between entities. Creates a new relation (edge) in the knowledge graph and adds it to the vector database. + Also synchronizes relation_chunks_storage to track chunk references. Args: chunk_entity_relation_graph: Graph storage instance @@ -841,6 +932,7 @@ async def acreate_relation( source_entity: Name of the source entity target_entity: Name of the target entity relation_data: Dictionary containing relation attributes, e.g. {"description": "description", "keywords": "keywords"} + relation_chunks_storage: Optional KV storage for tracking chunks that reference this relation Returns: Dictionary containing created relation information @@ -917,11 +1009,37 @@ async def acreate_relation( # Update vector database await relationships_vdb.upsert(relation_data_for_vdb) + # Update relation_chunks_storage to track chunk references + if relation_chunks_storage is not None: + from .utils import make_relation_chunk_key + + # Normalize entity order for consistent key generation + normalized_src, normalized_tgt = sorted([source_entity, target_entity]) + storage_key = make_relation_chunk_key(normalized_src, normalized_tgt) + + source_id = edge_data.get("source_id", "") + chunk_ids = [cid for cid in source_id.split(GRAPH_FIELD_SEP) if cid] + + if chunk_ids: + await relation_chunks_storage.upsert( + { + storage_key: { + "chunk_ids": chunk_ids, + "count": len(chunk_ids), + } + } + ) + logger.info( + f"Relation Create: tracked {len(chunk_ids)} chunks for `{source_entity}`~`{target_entity}`" + ) + # Save changes - await 
_edit_relation_done(relationships_vdb, chunk_entity_relation_graph) + await _edit_relation_done( + relationships_vdb, chunk_entity_relation_graph, relation_chunks_storage + ) logger.info( - f"Relation from '{source_entity}' to '{target_entity}' successfully created" + f"Relation Create: `{source_entity}`~`{target_entity}` successfully created" ) return await get_relation_info( chunk_entity_relation_graph, From 6015e8bc689f572b678857a20a241c3a855e403d Mon Sep 17 00:00:00 2001 From: yangdx Date: Sun, 26 Oct 2025 20:20:16 +0800 Subject: [PATCH 4/4] Refactor graph utils to use unified persistence callback - Add _persist_graph_updates function - Remove duplicate callback functions --- lightrag/api/routers/graph_routes.py | 2 +- lightrag/utils_graph.py | 207 +++++++++++---------------- 2 files changed, 85 insertions(+), 124 deletions(-) diff --git a/lightrag/api/routers/graph_routes.py b/lightrag/api/routers/graph_routes.py index f4c29fc2..f59a7c3d 100644 --- a/lightrag/api/routers/graph_routes.py +++ b/lightrag/api/routers/graph_routes.py @@ -365,7 +365,7 @@ def create_graph_routes(rag, api_key: Optional[str] = None): This endpoint establishes an undirected relationship between two existing entities. The provided source/target order is accepted for convenience, but the backend - stored edge is undirected and may be returned with the entities swapped. + stored edge is undirected and may be returned with the entities swapped. Both entities must already exist in the knowledge graph. The system automatically generates vector embeddings for the relationship to enable semantic search and graph traversal. 
diff --git a/lightrag/utils_graph.py b/lightrag/utils_graph.py index b3b042c9..6b06cf3c 100644 --- a/lightrag/utils_graph.py +++ b/lightrag/utils_graph.py @@ -11,6 +11,49 @@ from .utils import compute_mdhash_id, logger from .base import StorageNameSpace +async def _persist_graph_updates( + entities_vdb=None, + relationships_vdb=None, + chunk_entity_relation_graph=None, + entity_chunks_storage=None, + relation_chunks_storage=None, +) -> None: + """Unified callback to persist updates after graph operations. + + Ensures all relevant storage instances are properly persisted after + operations like delete, edit, create, or merge. + + Args: + entities_vdb: Entity vector database storage (optional) + relationships_vdb: Relationship vector database storage (optional) + chunk_entity_relation_graph: Graph storage instance (optional) + entity_chunks_storage: Entity-chunk tracking storage (optional) + relation_chunks_storage: Relation-chunk tracking storage (optional) + """ + storages = [] + + # Collect all non-None storage instances + if entities_vdb is not None: + storages.append(entities_vdb) + if relationships_vdb is not None: + storages.append(relationships_vdb) + if chunk_entity_relation_graph is not None: + storages.append(chunk_entity_relation_graph) + if entity_chunks_storage is not None: + storages.append(entity_chunks_storage) + if relation_chunks_storage is not None: + storages.append(relation_chunks_storage) + + # Persist all storage instances in parallel + if storages: + await asyncio.gather( + *[ + cast(StorageNameSpace, storage_inst).index_done_callback() + for storage_inst in storages # type: ignore + ] + ) + + async def adelete_by_entity( chunk_entity_relation_graph, entities_vdb, @@ -64,7 +107,9 @@ async def adelete_by_entity( for src, tgt in edges: # Normalize entity order for consistent key generation normalized_src, normalized_tgt = sorted([src, tgt]) - storage_key = make_relation_chunk_key(normalized_src, normalized_tgt) + storage_key = 
make_relation_chunk_key( + normalized_src, normalized_tgt + ) relation_keys_to_delete.append(storage_key) if relation_keys_to_delete: @@ -79,12 +124,12 @@ async def adelete_by_entity( message = f"Entity Delete: remove '{entity_name}' and its {related_relations_count} relations" logger.info(message) - await _delete_by_entity_done( - entities_vdb, - relationships_vdb, - chunk_entity_relation_graph, - entity_chunks_storage, - relation_chunks_storage, + await _persist_graph_updates( + entities_vdb=entities_vdb, + relationships_vdb=relationships_vdb, + chunk_entity_relation_graph=chunk_entity_relation_graph, + entity_chunks_storage=entity_chunks_storage, + relation_chunks_storage=relation_chunks_storage, ) return DeletionResult( status="success", @@ -103,28 +148,6 @@ async def adelete_by_entity( ) -async def _delete_by_entity_done( - entities_vdb, - relationships_vdb, - chunk_entity_relation_graph, - entity_chunks_storage=None, - relation_chunks_storage=None, -) -> None: - """Callback after entity deletion is complete, ensures updates are persisted""" - storages = [entities_vdb, relationships_vdb, chunk_entity_relation_graph] - if entity_chunks_storage is not None: - storages.append(entity_chunks_storage) - if relation_chunks_storage is not None: - storages.append(relation_chunks_storage) - - await asyncio.gather( - *[ - cast(StorageNameSpace, storage_inst).index_done_callback() - for storage_inst in storages # type: ignore - ] - ) - - async def adelete_by_relation( chunk_entity_relation_graph, relationships_vdb, @@ -173,7 +196,7 @@ async def adelete_by_relation( # Normalize entity order for consistent key generation normalized_src, normalized_tgt = sorted([source_entity, target_entity]) storage_key = make_relation_chunk_key(normalized_src, normalized_tgt) - + await relation_chunks_storage.delete([storage_key]) logger.info( f"Relation Delete: removed chunk tracking for `{source_entity}`~`{target_entity}`" @@ -194,8 +217,10 @@ async def adelete_by_relation( message = 
f"Relation Delete: `{source_entity}`~`{target_entity}` deleted successfully" logger.info(message) - await _delete_relation_done( - relationships_vdb, chunk_entity_relation_graph, relation_chunks_storage + await _persist_graph_updates( + relationships_vdb=relationships_vdb, + chunk_entity_relation_graph=chunk_entity_relation_graph, + relation_chunks_storage=relation_chunks_storage, ) return DeletionResult( status="success", @@ -214,22 +239,6 @@ async def adelete_by_relation( ) -async def _delete_relation_done( - relationships_vdb, chunk_entity_relation_graph, relation_chunks_storage=None -) -> None: - """Callback after relation deletion is complete, ensures updates are persisted""" - storages = [relationships_vdb, chunk_entity_relation_graph] - if relation_chunks_storage is not None: - storages.append(relation_chunks_storage) - - await asyncio.gather( - *[ - cast(StorageNameSpace, storage_inst).index_done_callback() - for storage_inst in storages # type: ignore - ] - ) - - async def aedit_entity( chunk_entity_relation_graph, entities_vdb, @@ -564,12 +573,12 @@ async def aedit_entity( ) # 5. 
Save changes - await _edit_entity_done( - entities_vdb, - relationships_vdb, - chunk_entity_relation_graph, - entity_chunks_storage, - relation_chunks_storage, + await _persist_graph_updates( + entities_vdb=entities_vdb, + relationships_vdb=relationships_vdb, + chunk_entity_relation_graph=chunk_entity_relation_graph, + entity_chunks_storage=entity_chunks_storage, + relation_chunks_storage=relation_chunks_storage, ) logger.info(f"Entity Edit: `{entity_name}` successfully updated") @@ -584,28 +593,6 @@ async def aedit_entity( raise -async def _edit_entity_done( - entities_vdb, - relationships_vdb, - chunk_entity_relation_graph, - entity_chunks_storage=None, - relation_chunks_storage=None, -) -> None: - """Callback after entity editing is complete, ensures updates are persisted""" - storages = [entities_vdb, relationships_vdb, chunk_entity_relation_graph] - if entity_chunks_storage is not None: - storages.append(entity_chunks_storage) - if relation_chunks_storage is not None: - storages.append(relation_chunks_storage) - - await asyncio.gather( - *[ - cast(StorageNameSpace, storage_inst).index_done_callback() - for storage_inst in storages # type: ignore - ] - ) - - async def aedit_relation( chunk_entity_relation_graph, entities_vdb, @@ -763,8 +750,10 @@ async def aedit_relation( ) # 5. 
Save changes - await _edit_relation_done( - relationships_vdb, chunk_entity_relation_graph, relation_chunks_storage + await _persist_graph_updates( + relationships_vdb=relationships_vdb, + chunk_entity_relation_graph=chunk_entity_relation_graph, + relation_chunks_storage=relation_chunks_storage, ) logger.info( @@ -784,22 +773,6 @@ async def aedit_relation( raise -async def _edit_relation_done( - relationships_vdb, chunk_entity_relation_graph, relation_chunks_storage=None -) -> None: - """Callback after relation editing is complete, ensures updates are persisted""" - storages = [relationships_vdb, chunk_entity_relation_graph] - if relation_chunks_storage is not None: - storages.append(relation_chunks_storage) - - await asyncio.gather( - *[ - cast(StorageNameSpace, storage_inst).index_done_callback() - for storage_inst in storages # type: ignore - ] - ) - - async def acreate_entity( chunk_entity_relation_graph, entities_vdb, @@ -876,7 +849,7 @@ async def acreate_entity( if entity_chunks_storage is not None: source_id = node_data.get("source_id", "") chunk_ids = [cid for cid in source_id.split(GRAPH_FIELD_SEP) if cid] - + if chunk_ids: await entity_chunks_storage.upsert( { @@ -891,12 +864,12 @@ async def acreate_entity( ) # Save changes - await _edit_entity_done( - entities_vdb, - relationships_vdb, - chunk_entity_relation_graph, - entity_chunks_storage, - relation_chunks_storage, + await _persist_graph_updates( + entities_vdb=entities_vdb, + relationships_vdb=relationships_vdb, + chunk_entity_relation_graph=chunk_entity_relation_graph, + entity_chunks_storage=entity_chunks_storage, + relation_chunks_storage=relation_chunks_storage, ) logger.info(f"Entity Create: '{entity_name}' successfully created") @@ -1016,10 +989,10 @@ async def acreate_relation( # Normalize entity order for consistent key generation normalized_src, normalized_tgt = sorted([source_entity, target_entity]) storage_key = make_relation_chunk_key(normalized_src, normalized_tgt) - + source_id = 
edge_data.get("source_id", "") chunk_ids = [cid for cid in source_id.split(GRAPH_FIELD_SEP) if cid] - + if chunk_ids: await relation_chunks_storage.upsert( { @@ -1034,8 +1007,10 @@ async def acreate_relation( ) # Save changes - await _edit_relation_done( - relationships_vdb, chunk_entity_relation_graph, relation_chunks_storage + await _persist_graph_updates( + relationships_vdb=relationships_vdb, + chunk_entity_relation_graph=chunk_entity_relation_graph, + relation_chunks_storage=relation_chunks_storage, ) logger.info( @@ -1297,8 +1272,10 @@ async def amerge_entities( ) # 10. Save changes - await _merge_entities_done( - entities_vdb, relationships_vdb, chunk_entity_relation_graph + await _persist_graph_updates( + entities_vdb=entities_vdb, + relationships_vdb=relationships_vdb, + chunk_entity_relation_graph=chunk_entity_relation_graph, ) logger.info( @@ -1424,22 +1401,6 @@ def _merge_relation_attributes( return merged_data -async def _merge_entities_done( - entities_vdb, relationships_vdb, chunk_entity_relation_graph -) -> None: - """Callback after entity merging is complete, ensures updates are persisted""" - await asyncio.gather( - *[ - cast(StorageNameSpace, storage_inst).index_done_callback() - for storage_inst in [ # type: ignore - entities_vdb, - relationships_vdb, - chunk_entity_relation_graph, - ] - ] - ) - - async def get_entity_info( chunk_entity_relation_graph, entities_vdb,