diff --git a/lightrag/api/routers/graph_routes.py b/lightrag/api/routers/graph_routes.py
index dbf4527e..f59a7c3d 100644
--- a/lightrag/api/routers/graph_routes.py
+++ b/lightrag/api/routers/graph_routes.py
@@ -299,6 +299,7 @@ def create_graph_routes(rag, api_key: Optional[str] = None):
             entity_data (dict): Entity properties including:
                 - description (str): Textual description of the entity
                 - entity_type (str): Category/type of the entity (e.g., PERSON, ORGANIZATION, LOCATION)
+                - source_id (str): Related chunk_id(s) from which the description originates
                 - Additional custom properties as needed

         Response Schema:
@@ -309,6 +310,7 @@ def create_graph_routes(rag, api_key: Optional[str] = None):
                     "entity_name": "Tesla",
                     "description": "Electric vehicle manufacturer",
                     "entity_type": "ORGANIZATION",
+                    "source_id": "chunk-123<SEP>chunk-456",
                     ... (other entity properties)
                 }
             }
@@ -361,10 +363,11 @@ def create_graph_routes(rag, api_key: Optional[str] = None):
        """
        Create a new relationship between two entities in the knowledge graph

-        This endpoint establishes a directed relationship between two existing entities.
-        Both the source and target entities must already exist in the knowledge graph.
-        The system automatically generates vector embeddings for the relationship to
-        enable semantic search and graph traversal.
+        This endpoint establishes an undirected relationship between two existing entities.
+        The source/target order in the request is accepted for convenience, but the edge
+        is stored undirected and may be returned with the entities swapped.
+        Both entities must already exist in the knowledge graph. The system automatically
+        generates vector embeddings for the relationship to enable semantic search and graph traversal.

        Prerequisites:
            - Both source_entity and target_entity must exist in the knowledge graph
@@ -376,6 +379,7 @@ def create_graph_routes(rag, api_key: Optional[str] = None):
            relation_data (dict): Relationship properties including:
                - description (str): Textual description of the relationship
                - keywords (str): Comma-separated keywords describing the relationship type
+                - source_id (str): Related chunk_id(s) from which the description originates
                - weight (float): Relationship strength/importance (default: 1.0)
                - Additional custom properties as needed

@@ -388,6 +392,7 @@ def create_graph_routes(rag, api_key: Optional[str] = None):
                    "tgt_id": "Tesla",
                    "description": "Elon Musk is the CEO of Tesla",
                    "keywords": "CEO, founder",
+                    "source_id": "chunk-123<SEP>chunk-456",
                    "weight": 1.0,
                    ... (other relationship properties)
                }
            }
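Since `source_id` can reference multiple chunks, here is a minimal sketch of the documented payloads, assuming LightRAG's `GRAPH_FIELD_SEP` (`<SEP>`) is the separator joining chunk IDs (treat the exact joining rule as an assumption):

```python
# Minimal payload sketch for the create-entity / create-relation endpoints.
# Assumption: multiple chunk references in source_id are joined with
# GRAPH_FIELD_SEP ("<SEP>"), as elsewhere in LightRAG.
GRAPH_FIELD_SEP = "<SEP>"

entity_data = {
    "description": "Electric vehicle manufacturer",
    "entity_type": "ORGANIZATION",
    "source_id": GRAPH_FIELD_SEP.join(["chunk-123", "chunk-456"]),
}

relation_data = {
    "description": "Elon Musk is the CEO of Tesla",
    "keywords": "CEO, founder",
    "source_id": "chunk-123",
    "weight": 1.0,
}
# Because relations are stored undirected, a later read may return this
# edge with src_id/tgt_id swapped relative to the creation request.
```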
diff --git a/lightrag/lightrag.py b/lightrag/lightrag.py
index ca972712..ced5f40e 100644
--- a/lightrag/lightrag.py
+++ b/lightrag/lightrag.py
@@ -3582,6 +3582,7 @@ class LightRAG:
         """Asynchronously edit entity information.

         Updates entity information in the knowledge graph and re-embeds the entity in the vector database.
+        Also synchronizes entity_chunks_storage and relation_chunks_storage to track chunk references.

         Args:
             entity_name: Name of the entity to edit
@@ -3600,6 +3601,8 @@ class LightRAG:
             entity_name,
             updated_data,
             allow_rename,
+            self.entity_chunks,
+            self.relation_chunks,
         )

     def edit_entity(
@@ -3616,6 +3619,7 @@ class LightRAG:
         """Asynchronously edit relation information.

         Updates relation (edge) information in the knowledge graph and re-embeds the relation in the vector database.
+        Also synchronizes the relation_chunks_storage to track which chunks reference this relation.

         Args:
             source_entity: Name of the source entity
@@ -3634,6 +3638,7 @@
             source_entity,
             target_entity,
             updated_data,
+            self.relation_chunks,
         )

     def edit_relation(
diff --git a/lightrag/utils.py b/lightrag/utils.py
index bfa3cac4..6c382894 100644
--- a/lightrag/utils.py
+++ b/lightrag/utils.py
@@ -2551,6 +2551,52 @@ def apply_source_ids_limit(
     return truncated


+def compute_incremental_chunk_ids(
+    existing_full_chunk_ids: list[str],
+    old_chunk_ids: list[str],
+    new_chunk_ids: list[str],
+) -> list[str]:
+    """
+    Compute incrementally updated chunk IDs based on changes.
+
+    This function applies delta changes (additions and removals) to an existing
+    list of chunk IDs while maintaining order and ensuring deduplication.
+    Delta additions from new_chunk_ids are placed at the end.
+
+    Args:
+        existing_full_chunk_ids: Complete list of existing chunk IDs from storage
+        old_chunk_ids: Previous chunk IDs from source_id (chunks being replaced)
+        new_chunk_ids: New chunk IDs from updated source_id (chunks being added)
+
+    Returns:
+        Updated list of chunk IDs with deduplication
+
+    Example:
+        >>> existing = ['chunk-1', 'chunk-2', 'chunk-3']
+        >>> old = ['chunk-1', 'chunk-2']
+        >>> new = ['chunk-2', 'chunk-4']
+        >>> compute_incremental_chunk_ids(existing, old, new)
+        ['chunk-2', 'chunk-3', 'chunk-4']
+    """
+    # Calculate changes
+    chunks_to_remove = set(old_chunk_ids) - set(new_chunk_ids)
+    chunks_to_add = set(new_chunk_ids) - set(old_chunk_ids)
+
+    # Apply changes to full chunk_ids
+    # Step 1: Remove chunks that are no longer needed
+    updated_chunk_ids = [
+        cid for cid in existing_full_chunk_ids if cid not in chunks_to_remove
+    ]
+
+    # Step 2: Add new chunks (preserving order from new_chunk_ids)
+    # Note: 'cid not in updated_chunk_ids' check ensures deduplication
+    for cid in new_chunk_ids:
+        if cid in chunks_to_add and cid not in updated_chunk_ids:
+            updated_chunk_ids.append(cid)
+
+    return updated_chunk_ids
+
+
 def subtract_source_ids(
     source_ids: Iterable[str],
     ids_to_remove: Collection[str],
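The utility is pure and easy to check. A usage sketch showing why deltas are applied to the full stored list (which may hold more IDs than the node's possibly-truncated `source_id`) rather than overwriting it:

```python
from lightrag.utils import compute_incremental_chunk_ids

stored = ["chunk-1", "chunk-2", "chunk-3", "chunk-5"]  # full history in KV storage
old = ["chunk-1", "chunk-2"]   # chunk IDs parsed from source_id before the edit
new = ["chunk-2", "chunk-4"]   # chunk IDs parsed from source_id after the edit

# chunk-1 is dropped, chunk-4 is appended; chunk-3 and chunk-5 survive even
# though the edited source_id never mentioned them.
assert compute_incremental_chunk_ids(stored, old, new) == [
    "chunk-2", "chunk-3", "chunk-5", "chunk-4"
]
```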
diff --git a/lightrag/utils_graph.py b/lightrag/utils_graph.py
index c2ccd313..6b06cf3c 100644
--- a/lightrag/utils_graph.py
+++ b/lightrag/utils_graph.py
@@ -11,16 +11,68 @@ from .utils import compute_mdhash_id, logger
 from .base import StorageNameSpace


+async def _persist_graph_updates(
+    entities_vdb=None,
+    relationships_vdb=None,
+    chunk_entity_relation_graph=None,
+    entity_chunks_storage=None,
+    relation_chunks_storage=None,
+) -> None:
+    """Unified callback to persist updates after graph operations.
+
+    Ensures all relevant storage instances are properly persisted after
+    operations like delete, edit, create, or merge.
+
+    Args:
+        entities_vdb: Entity vector database storage (optional)
+        relationships_vdb: Relationship vector database storage (optional)
+        chunk_entity_relation_graph: Graph storage instance (optional)
+        entity_chunks_storage: Entity-chunk tracking storage (optional)
+        relation_chunks_storage: Relation-chunk tracking storage (optional)
+    """
+    storages = []
+
+    # Collect all non-None storage instances
+    if entities_vdb is not None:
+        storages.append(entities_vdb)
+    if relationships_vdb is not None:
+        storages.append(relationships_vdb)
+    if chunk_entity_relation_graph is not None:
+        storages.append(chunk_entity_relation_graph)
+    if entity_chunks_storage is not None:
+        storages.append(entity_chunks_storage)
+    if relation_chunks_storage is not None:
+        storages.append(relation_chunks_storage)
+
+    # Persist all storage instances in parallel
+    if storages:
+        await asyncio.gather(
+            *[
+                cast(StorageNameSpace, storage_inst).index_done_callback()
+                for storage_inst in storages  # type: ignore
+            ]
+        )
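This helper replaces the five near-identical `*_done` callbacks removed further down; callers pass only the storages an operation touched. A sketch of the calling pattern inside utils_graph.py:

```python
# None slots are skipped, so callers stay concise.
await _persist_graph_updates(
    relationships_vdb=relationships_vdb,
    chunk_entity_relation_graph=chunk_entity_relation_graph,
)
# Roughly equivalent to:
# await asyncio.gather(
#     relationships_vdb.index_done_callback(),
#     chunk_entity_relation_graph.index_done_callback(),
# )
```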
+ message = f"Entity Delete: remove '{entity_name}' and its {related_relations_count} relations" logger.info(message) - await _delete_by_entity_done( - entities_vdb, relationships_vdb, chunk_entity_relation_graph + await _persist_graph_updates( + entities_vdb=entities_vdb, + relationships_vdb=relationships_vdb, + chunk_entity_relation_graph=chunk_entity_relation_graph, + entity_chunks_storage=entity_chunks_storage, + relation_chunks_storage=relation_chunks_storage, ) return DeletionResult( status="success", @@ -65,41 +148,33 @@ async def adelete_by_entity( ) -async def _delete_by_entity_done( - entities_vdb, relationships_vdb, chunk_entity_relation_graph -) -> None: - """Callback after entity deletion is complete, ensures updates are persisted""" - await asyncio.gather( - *[ - cast(StorageNameSpace, storage_inst).index_done_callback() - for storage_inst in [ # type: ignore - entities_vdb, - relationships_vdb, - chunk_entity_relation_graph, - ] - ] - ) - - async def adelete_by_relation( chunk_entity_relation_graph, relationships_vdb, source_entity: str, target_entity: str, + relation_chunks_storage=None, ) -> DeletionResult: """Asynchronously delete a relation between two entities. + Also cleans up relation_chunks_storage to remove chunk tracking. + Args: chunk_entity_relation_graph: Graph storage instance relationships_vdb: Vector database storage for relationships source_entity: Name of the source entity target_entity: Name of the target entity + relation_chunks_storage: Optional KV storage for tracking chunks that reference this relation """ relation_str = f"{source_entity} -> {target_entity}" graph_db_lock = get_graph_db_lock(enable_logging=False) # Use graph database lock to ensure atomic graph and vector db operations async with graph_db_lock: try: + # Normalize entity order for undirected graph (ensures consistent key generation) + if source_entity > target_entity: + source_entity, target_entity = target_entity, source_entity + # Check if the relation exists edge_exists = await chunk_entity_relation_graph.has_edge( source_entity, target_entity @@ -114,6 +189,19 @@ async def adelete_by_relation( status_code=404, ) + # Clean up chunk tracking storage before deletion + if relation_chunks_storage is not None: + from .utils import make_relation_chunk_key + + # Normalize entity order for consistent key generation + normalized_src, normalized_tgt = sorted([source_entity, target_entity]) + storage_key = make_relation_chunk_key(normalized_src, normalized_tgt) + + await relation_chunks_storage.delete([storage_key]) + logger.info( + f"Relation Delete: removed chunk tracking for `{source_entity}`~`{target_entity}`" + ) + # Delete relation from vector database rel_ids_to_delete = [ compute_mdhash_id(source_entity + target_entity, prefix="rel-"), @@ -127,9 +215,13 @@ async def adelete_by_relation( [(source_entity, target_entity)] ) - message = f"Successfully deleted relation from '{source_entity}' to '{target_entity}'" + message = f"Relation Delete: `{source_entity}`~`{target_entity}` deleted successfully" logger.info(message) - await _delete_relation_done(relationships_vdb, chunk_entity_relation_graph) + await _persist_graph_updates( + relationships_vdb=relationships_vdb, + chunk_entity_relation_graph=chunk_entity_relation_graph, + relation_chunks_storage=relation_chunks_storage, + ) return DeletionResult( status="success", doc_id=relation_str, @@ -147,19 +239,6 @@ async def adelete_by_relation( ) -async def _delete_relation_done(relationships_vdb, chunk_entity_relation_graph) -> None: - 
"""Callback after relation deletion is complete, ensures updates are persisted""" - await asyncio.gather( - *[ - cast(StorageNameSpace, storage_inst).index_done_callback() - for storage_inst in [ # type: ignore - relationships_vdb, - chunk_entity_relation_graph, - ] - ] - ) - - async def aedit_entity( chunk_entity_relation_graph, entities_vdb, @@ -167,10 +246,13 @@ async def aedit_entity( entity_name: str, updated_data: dict[str, str], allow_rename: bool = True, + entity_chunks_storage=None, + relation_chunks_storage=None, ) -> dict[str, Any]: """Asynchronously edit entity information. Updates entity information in the knowledge graph and re-embeds the entity in the vector database. + Also synchronizes entity_chunks_storage and relation_chunks_storage to track chunk references. Args: chunk_entity_relation_graph: Graph storage instance @@ -179,6 +261,8 @@ async def aedit_entity( entity_name: Name of the entity to edit updated_data: Dictionary containing updated attributes, e.g. {"description": "new description", "entity_type": "new type"} allow_rename: Whether to allow entity renaming, defaults to True + entity_chunks_storage: Optional KV storage for tracking chunks that reference this entity + relation_chunks_storage: Optional KV storage for tracking chunks that reference relations Returns: Dictionary containing updated entity information @@ -187,6 +271,9 @@ async def aedit_entity( # Use graph database lock to ensure atomic graph and vector db operations async with graph_db_lock: try: + # Save original entity name for chunk tracking updates + original_entity_name = entity_name + # 1. Get current entity information node_exists = await chunk_entity_relation_graph.has_node(entity_name) if not node_exists: @@ -223,7 +310,9 @@ async def aedit_entity( # If renaming entity if is_renaming: - logger.info(f"Renaming entity '{entity_name}' to '{new_entity_name}'") + logger.info( + f"Entity Edit: renaming `{entity_name}` to `{new_entity_name}`" + ) # Create new entity await chunk_entity_relation_graph.upsert_node( @@ -269,35 +358,36 @@ async def aedit_entity( # Delete old entity record from vector database old_entity_id = compute_mdhash_id(entity_name, prefix="ent-") await entities_vdb.delete([old_entity_id]) - logger.info( - f"Deleted old entity '{entity_name}' and its vector embedding from database" - ) # Delete old relation records from vector database await relationships_vdb.delete(relations_to_delete) - logger.info( - f"Deleted {len(relations_to_delete)} relation records for entity '{entity_name}' from vector database" - ) # Update relationship vector representations for src, tgt, edge_data in relations_to_update: + # Normalize entity order for consistent vector ID generation + normalized_src, normalized_tgt = sorted([src, tgt]) + description = edge_data.get("description", "") keywords = edge_data.get("keywords", "") source_id = edge_data.get("source_id", "") weight = float(edge_data.get("weight", 1.0)) - # Create new content for embedding - content = f"{src}\t{tgt}\n{keywords}\n{description}" + # Create content using normalized order + content = ( + f"{normalized_src}\t{normalized_tgt}\n{keywords}\n{description}" + ) - # Calculate relationship ID - relation_id = compute_mdhash_id(src + tgt, prefix="rel-") + # Calculate relationship ID using normalized order + relation_id = compute_mdhash_id( + normalized_src + normalized_tgt, prefix="rel-" + ) # Prepare data for vector database update relation_data = { relation_id: { "content": content, - "src_id": src, - "tgt_id": tgt, + "src_id": 
normalized_src, + "tgt_id": normalized_tgt, "source_id": source_id, "description": description, "keywords": keywords, @@ -310,6 +400,7 @@ async def aedit_entity( # Update working entity name to new name entity_name = new_entity_name + else: # If not renaming, directly update node data await chunk_entity_relation_graph.upsert_node( @@ -339,12 +430,158 @@ async def aedit_entity( # Update vector database await entities_vdb.upsert(entity_data) - # 4. Save changes - await _edit_entity_done( - entities_vdb, relationships_vdb, chunk_entity_relation_graph + # 4. Update chunk tracking storages + if entity_chunks_storage is not None or relation_chunks_storage is not None: + from .utils import ( + make_relation_chunk_key, + compute_incremental_chunk_ids, + ) + + # 4.1 Handle entity chunk tracking + if entity_chunks_storage is not None: + # Get storage key (use original name for renaming scenario) + storage_key = original_entity_name if is_renaming else entity_name + stored_data = await entity_chunks_storage.get_by_id(storage_key) + has_stored_data = ( + stored_data + and isinstance(stored_data, dict) + and stored_data.get("chunk_ids") + ) + + # Get old and new source_id + old_source_id = node_data.get("source_id", "") + old_chunk_ids = [ + cid for cid in old_source_id.split(GRAPH_FIELD_SEP) if cid + ] + + new_source_id = new_node_data.get("source_id", "") + new_chunk_ids = [ + cid for cid in new_source_id.split(GRAPH_FIELD_SEP) if cid + ] + + source_id_changed = set(new_chunk_ids) != set(old_chunk_ids) + + # Update if: source_id changed OR storage has no data + if source_id_changed or not has_stored_data: + # Get existing full chunk_ids from storage + existing_full_chunk_ids = [] + if has_stored_data: + existing_full_chunk_ids = [ + cid for cid in stored_data.get("chunk_ids", []) if cid + ] + + # If no stored data exists, use old source_id as baseline + if not existing_full_chunk_ids: + existing_full_chunk_ids = old_chunk_ids.copy() + + # Use utility function to compute incremental updates + updated_chunk_ids = compute_incremental_chunk_ids( + existing_full_chunk_ids, old_chunk_ids, new_chunk_ids + ) + + # Update storage (even if updated_chunk_ids is empty) + if is_renaming: + # Renaming: delete old + create new + await entity_chunks_storage.delete([original_entity_name]) + await entity_chunks_storage.upsert( + { + entity_name: { + "chunk_ids": updated_chunk_ids, + "count": len(updated_chunk_ids), + } + } + ) + else: + # Non-renaming: direct update + await entity_chunks_storage.upsert( + { + entity_name: { + "chunk_ids": updated_chunk_ids, + "count": len(updated_chunk_ids), + } + } + ) + + logger.info( + f"Entity Edit: find {len(updated_chunk_ids)} chunks related to `{entity_name}`" + ) + + # 4.2 Handle relation chunk tracking if entity was renamed + if ( + is_renaming + and relation_chunks_storage is not None + and relations_to_update + ): + for src, tgt, edge_data in relations_to_update: + # Determine old entity pair (before rename) + old_src = original_entity_name if src == entity_name else src + old_tgt = original_entity_name if tgt == entity_name else tgt + + # Normalize entity order for both old and new keys + old_normalized_src, old_normalized_tgt = sorted( + [old_src, old_tgt] + ) + new_normalized_src, new_normalized_tgt = sorted([src, tgt]) + + # Generate storage keys + old_storage_key = make_relation_chunk_key( + old_normalized_src, old_normalized_tgt + ) + new_storage_key = make_relation_chunk_key( + new_normalized_src, new_normalized_tgt + ) + + # If keys are different, we need to move 
+                        # If the keys differ, move the chunk tracking entry
+                        if old_storage_key != new_storage_key:
+                            # Get complete chunk IDs from storage first (preserves all existing references)
+                            old_stored_data = await relation_chunks_storage.get_by_id(
+                                old_storage_key
+                            )
+                            relation_chunk_ids = []
+
+                            if old_stored_data and isinstance(old_stored_data, dict):
+                                # Use complete chunk_ids from storage
+                                relation_chunk_ids = [
+                                    cid
+                                    for cid in old_stored_data.get("chunk_ids", [])
+                                    if cid
+                                ]
+                            else:
+                                # Fallback: if storage has no data, use the graph's source_id
+                                relation_source_id = edge_data.get("source_id", "")
+                                relation_chunk_ids = [
+                                    cid
+                                    for cid in relation_source_id.split(GRAPH_FIELD_SEP)
+                                    if cid
+                                ]
+
+                            # Delete old relation chunk tracking
+                            await relation_chunks_storage.delete([old_storage_key])
+
+                            # Create new relation chunk tracking (migrate complete data)
+                            if relation_chunk_ids:
+                                await relation_chunks_storage.upsert(
+                                    {
+                                        new_storage_key: {
+                                            "chunk_ids": relation_chunk_ids,
+                                            "count": len(relation_chunk_ids),
+                                        }
+                                    }
+                                )
+
+                    logger.info(
+                        f"Entity Edit: migrated {len(relations_to_update)} relations after rename"
+                    )
+
+            # 5. Save changes
+            await _persist_graph_updates(
+                entities_vdb=entities_vdb,
+                relationships_vdb=relationships_vdb,
+                chunk_entity_relation_graph=chunk_entity_relation_graph,
+                entity_chunks_storage=entity_chunks_storage,
+                relation_chunks_storage=relation_chunks_storage,
             )

-            logger.info(f"Entity '{entity_name}' successfully updated")
+            logger.info(f"Entity Edit: `{entity_name}` successfully updated")
             return await get_entity_info(
                 chunk_entity_relation_graph,
                 entities_vdb,
@@ -356,22 +593,6 @@
             raise


-async def _edit_entity_done(
-    entities_vdb, relationships_vdb, chunk_entity_relation_graph
-) -> None:
-    """Callback after entity editing is complete, ensures updates are persisted"""
-    await asyncio.gather(
-        *[
-            cast(StorageNameSpace, storage_inst).index_done_callback()
-            for storage_inst in [  # type: ignore
-                entities_vdb,
-                relationships_vdb,
-                chunk_entity_relation_graph,
-            ]
-        ]
-    )
-
-
 async def aedit_relation(
     chunk_entity_relation_graph,
     entities_vdb,
@@ -379,10 +600,12 @@
     source_entity: str,
     target_entity: str,
     updated_data: dict[str, Any],
+    relation_chunks_storage=None,
 ) -> dict[str, Any]:
     """Asynchronously edit relation information.

     Updates relation (edge) information in the knowledge graph and re-embeds the relation in the vector database.
+    Also synchronizes the relation_chunks_storage to track which chunks reference this relation.

     Args:
         chunk_entity_relation_graph: Graph storage instance
@@ -391,6 +614,7 @@
         source_entity: Name of the source entity
         target_entity: Name of the target entity
         updated_data: Dictionary containing updated attributes, e.g. {"description": "new description", "keywords": "new keywords"}
+        relation_chunks_storage: Optional KV storage for tracking chunks that reference this relation

     Returns:
         Dictionary containing updated relation information
@@ -399,6 +623,10 @@
     # Use graph database lock to ensure atomic graph and vector db operations
     async with graph_db_lock:
         try:
+            # Normalize entity order for undirected graph (ensures consistent key generation)
+            if source_entity > target_entity:
+                source_entity, target_entity = target_entity, source_entity
+
             # 1. Get current relation information
             edge_exists = await chunk_entity_relation_graph.has_edge(
                 source_entity, target_entity
@@ -411,12 +639,14 @@
                 source_entity, target_entity
             )

             # Important: First delete the old relation record from the vector database
-            old_relation_id = compute_mdhash_id(
-                source_entity + target_entity, prefix="rel-"
-            )
-            await relationships_vdb.delete([old_relation_id])
-            logger.info(
-                f"Deleted old relation record from vector database for relation {source_entity} -> {target_entity}"
+            # Delete both permutations to handle relationships created before normalization
+            rel_ids_to_delete = [
+                compute_mdhash_id(source_entity + target_entity, prefix="rel-"),
+                compute_mdhash_id(target_entity + source_entity, prefix="rel-"),
+            ]
+            await relationships_vdb.delete(rel_ids_to_delete)
+            logger.debug(
+                f"Relation Edit: deleted old vdb records for `{source_entity}`~`{target_entity}`"
             )

             # 2. Update relation information in the graph
@@ -455,11 +685,79 @@
             # Update vector database
             await relationships_vdb.upsert(relation_data)

-            # 4. Save changes
-            await _edit_relation_done(relationships_vdb, chunk_entity_relation_graph)
+            # 4. Update relation_chunks_storage in two scenarios:
+            #    - source_id has changed (edit scenario)
+            #    - relation_chunks_storage has no existing data (migration/initialization scenario)
+            if relation_chunks_storage is not None:
+                from .utils import (
+                    make_relation_chunk_key,
+                    compute_incremental_chunk_ids,
+                )
+
+                storage_key = make_relation_chunk_key(source_entity, target_entity)
+
+                # Check if storage has existing data
+                stored_data = await relation_chunks_storage.get_by_id(storage_key)
+                has_stored_data = (
+                    stored_data
+                    and isinstance(stored_data, dict)
+                    and stored_data.get("chunk_ids")
+                )
+
+                # Get old and new source_id
+                old_source_id = edge_data.get("source_id", "")
+                old_chunk_ids = [
+                    cid for cid in old_source_id.split(GRAPH_FIELD_SEP) if cid
+                ]
+
+                new_source_id = new_edge_data.get("source_id", "")
+                new_chunk_ids = [
+                    cid for cid in new_source_id.split(GRAPH_FIELD_SEP) if cid
+                ]
+
+                source_id_changed = set(new_chunk_ids) != set(old_chunk_ids)
+
+                # Update if: source_id changed OR storage has no data
+                if source_id_changed or not has_stored_data:
+                    # Get existing full chunk_ids from storage
+                    existing_full_chunk_ids = []
+                    if has_stored_data:
+                        existing_full_chunk_ids = [
+                            cid for cid in stored_data.get("chunk_ids", []) if cid
+                        ]
+
+                    # If no stored data exists, use old source_id as baseline
+                    if not existing_full_chunk_ids:
+                        existing_full_chunk_ids = old_chunk_ids.copy()
+
+                    # Use utility function to compute incremental updates
+                    updated_chunk_ids = compute_incremental_chunk_ids(
+                        existing_full_chunk_ids, old_chunk_ids, new_chunk_ids
+                    )
+
+                    # Update storage (update even if updated_chunk_ids is empty)
+                    await relation_chunks_storage.upsert(
+                        {
+                            storage_key: {
+                                "chunk_ids": updated_chunk_ids,
+                                "count": len(updated_chunk_ids),
+                            }
+                        }
+                    )
+
+                    logger.info(
+                        f"Relation Edit: updated chunk tracking for `{source_entity}`~`{target_entity}`"
+                    )
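Note the second trigger condition: even when `source_id` is unchanged, the first edit of a relation on an upgraded graph seeds `relation_chunks_storage` from the edge's own `source_id`. A sketch of that path, with names mirroring the diff's locals:

```python
# First edit after an upgrade: storage has no entry yet.
stored_data = None                      # relation_chunks_storage.get_by_id() result
has_stored_data = False

old_chunk_ids = ["chunk-1", "chunk-2"]  # parsed from the edge's source_id
new_chunk_ids = ["chunk-1", "chunk-2"]  # source_id was not part of the edit

# source_id_changed is False, but the update still runs because storage is
# empty, materializing the baseline unchanged:
updated = compute_incremental_chunk_ids(
    old_chunk_ids.copy(), old_chunk_ids, new_chunk_ids
)
assert updated == ["chunk-1", "chunk-2"]
```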
+            # 5. Save changes
+            await _persist_graph_updates(
+                relationships_vdb=relationships_vdb,
+                chunk_entity_relation_graph=chunk_entity_relation_graph,
+                relation_chunks_storage=relation_chunks_storage,
+            )

             logger.info(
-                f"Relation from '{source_entity}' to '{target_entity}' successfully updated"
+                f"Relation Edit: `{source_entity}`~`{target_entity}` successfully updated"
             )
             return await get_relation_info(
                 chunk_entity_relation_graph,
@@ -475,29 +773,19 @@
             raise


-async def _edit_relation_done(relationships_vdb, chunk_entity_relation_graph) -> None:
-    """Callback after relation editing is complete, ensures updates are persisted"""
-    await asyncio.gather(
-        *[
-            cast(StorageNameSpace, storage_inst).index_done_callback()
-            for storage_inst in [  # type: ignore
-                relationships_vdb,
-                chunk_entity_relation_graph,
-            ]
-        ]
-    )
-
-
 async def acreate_entity(
     chunk_entity_relation_graph,
     entities_vdb,
     relationships_vdb,
     entity_name: str,
     entity_data: dict[str, Any],
+    entity_chunks_storage=None,
+    relation_chunks_storage=None,
 ) -> dict[str, Any]:
     """Asynchronously create a new entity.

     Creates a new entity in the knowledge graph and adds it to the vector database.
+    Also synchronizes entity_chunks_storage to track chunk references.

     Args:
         chunk_entity_relation_graph: Graph storage instance
@@ -505,6 +793,8 @@
         relationships_vdb: Vector database storage for relationships
         entity_name: Name of the new entity
         entity_data: Dictionary containing entity attributes, e.g. {"description": "description", "entity_type": "type"}
+        entity_chunks_storage: Optional KV storage for tracking chunks that reference this entity
+        relation_chunks_storage: Optional KV storage for tracking chunks that reference relations

     Returns:
         Dictionary containing created entity information
@@ -555,12 +845,34 @@
             # Update vector database
             await entities_vdb.upsert(entity_data_for_vdb)

+            # Update entity_chunks_storage to track chunk references
+            if entity_chunks_storage is not None:
+                source_id = node_data.get("source_id", "")
+                chunk_ids = [cid for cid in source_id.split(GRAPH_FIELD_SEP) if cid]
+
+                if chunk_ids:
+                    await entity_chunks_storage.upsert(
+                        {
+                            entity_name: {
+                                "chunk_ids": chunk_ids,
+                                "count": len(chunk_ids),
+                            }
+                        }
+                    )
+                    logger.info(
+                        f"Entity Create: tracked {len(chunk_ids)} chunks for `{entity_name}`"
+                    )
+
             # Save changes
-            await _edit_entity_done(
-                entities_vdb, relationships_vdb, chunk_entity_relation_graph
+            await _persist_graph_updates(
+                entities_vdb=entities_vdb,
+                relationships_vdb=relationships_vdb,
+                chunk_entity_relation_graph=chunk_entity_relation_graph,
+                entity_chunks_storage=entity_chunks_storage,
+                relation_chunks_storage=relation_chunks_storage,
             )

-            logger.info(f"Entity '{entity_name}' successfully created")
+            logger.info(f"Entity Create: `{entity_name}` successfully created")
             return await get_entity_info(
                 chunk_entity_relation_graph,
                 entities_vdb,
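A hedged end-to-end sketch of the create path with chunk tracking enabled (storage instances are whatever the caller wires in; the `<SEP>` separator is an assumption, as above):

```python
info = await acreate_entity(
    chunk_entity_relation_graph,
    entities_vdb,
    relationships_vdb,
    entity_name="Tesla",
    entity_data={
        "description": "Electric vehicle manufacturer",
        "entity_type": "ORGANIZATION",
        "source_id": "chunk-123<SEP>chunk-456",  # assumed GRAPH_FIELD_SEP join
    },
    entity_chunks_storage=entity_chunks_storage,
)
# entity_chunks_storage should now hold:
#   {"Tesla": {"chunk_ids": ["chunk-123", "chunk-456"], "count": 2}}
```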
@@ -579,10 +891,12 @@
     source_entity: str,
     target_entity: str,
     relation_data: dict[str, Any],
+    relation_chunks_storage=None,
 ) -> dict[str, Any]:
     """Asynchronously create a new relation between entities.

     Creates a new relation (edge) in the knowledge graph and adds it to the vector database.
+    Also synchronizes relation_chunks_storage to track chunk references.

     Args:
         chunk_entity_relation_graph: Graph storage instance
@@ -591,6 +905,7 @@
         source_entity: Name of the source entity
         target_entity: Name of the target entity
         relation_data: Dictionary containing relation attributes, e.g. {"description": "description", "keywords": "keywords"}
+        relation_chunks_storage: Optional KV storage for tracking chunks that reference this relation

     Returns:
         Dictionary containing created relation information
@@ -632,6 +947,10 @@
                 source_entity, target_entity, edge_data
             )

+            # Normalize entity order for undirected relation vector (ensures consistent key generation)
+            if source_entity > target_entity:
+                source_entity, target_entity = target_entity, source_entity
+
             # Prepare content for embedding
             description = edge_data.get("description", "")
             keywords = edge_data.get("keywords", "")
@@ -663,11 +982,39 @@
             # Update vector database
             await relationships_vdb.upsert(relation_data_for_vdb)

+            # Update relation_chunks_storage to track chunk references
+            if relation_chunks_storage is not None:
+                from .utils import make_relation_chunk_key
+
+                # Entity order was already normalized above for consistent key generation
+                storage_key = make_relation_chunk_key(source_entity, target_entity)
+
+                source_id = edge_data.get("source_id", "")
+                chunk_ids = [cid for cid in source_id.split(GRAPH_FIELD_SEP) if cid]
+
+                if chunk_ids:
+                    await relation_chunks_storage.upsert(
+                        {
+                            storage_key: {
+                                "chunk_ids": chunk_ids,
+                                "count": len(chunk_ids),
+                            }
+                        }
+                    )
+                    logger.info(
+                        f"Relation Create: tracked {len(chunk_ids)} chunks for `{source_entity}`~`{target_entity}`"
+                    )
+
             # Save changes
-            await _edit_relation_done(relationships_vdb, chunk_entity_relation_graph)
+            await _persist_graph_updates(
+                relationships_vdb=relationships_vdb,
+                chunk_entity_relation_graph=chunk_entity_relation_graph,
+                relation_chunks_storage=relation_chunks_storage,
+            )

             logger.info(
-                f"Relation from '{source_entity}' to '{target_entity}' successfully created"
+                f"Relation Create: `{source_entity}`~`{target_entity}` successfully created"
             )
             return await get_relation_info(
                 chunk_entity_relation_graph,
@@ -875,19 +1222,27 @@ async def amerge_entities(
             tgt = rel_data["tgt"]
             edge_data = rel_data["data"]

+            # Normalize entity order for consistent vector storage
+            normalized_src, normalized_tgt = sorted([src, tgt])
+
             description = edge_data.get("description", "")
             keywords = edge_data.get("keywords", "")
             source_id = edge_data.get("source_id", "")
             weight = float(edge_data.get("weight", 1.0))

-            content = f"{keywords}\t{src}\n{tgt}\n{description}"
-            relation_id = compute_mdhash_id(src + tgt, prefix="rel-")
+            # Use normalized order for content and relation ID
+            content = (
+                f"{keywords}\t{normalized_src}\n{normalized_tgt}\n{description}"
+            )
+            relation_id = compute_mdhash_id(
+                normalized_src + normalized_tgt, prefix="rel-"
+            )

             relation_data_for_vdb = {
                 relation_id: {
                     "content": content,
-                    "src_id": src,
-                    "tgt_id": tgt,
+                    "src_id": normalized_src,
+                    "tgt_id": normalized_tgt,
                     "source_id": source_id,
                     "description": description,
                     "keywords": keywords,
@@ -917,8 +1272,10 @@
             )

             # 10. Save changes
-            await _merge_entities_done(
-                entities_vdb, relationships_vdb, chunk_entity_relation_graph
+            await _persist_graph_updates(
+                entities_vdb=entities_vdb,
+                relationships_vdb=relationships_vdb,
+                chunk_entity_relation_graph=chunk_entity_relation_graph,
             )

             logger.info(
@@ -1044,22 +1401,6 @@
     return merged_data


-async def _merge_entities_done(
-    entities_vdb, relationships_vdb, chunk_entity_relation_graph
-) -> None:
-    """Callback after entity merging is complete, ensures updates are persisted"""
-    await asyncio.gather(
-        *[
-            cast(StorageNameSpace, storage_inst).index_done_callback()
-            for storage_inst in [  # type: ignore
-                entities_vdb,
-                relationships_vdb,
-                chunk_entity_relation_graph,
-            ]
-        ]
-    )
-
-
 async def get_entity_info(
     chunk_entity_relation_graph,
     entities_vdb,
@@ -1094,7 +1435,18 @@ async def get_relation_info(
     tgt_entity: str,
     include_vector_data: bool = False,
 ) -> dict[str, str | None | dict[str, str]]:
-    """Get detailed information of a relationship"""
+    """
+    Get detailed information about a relationship between two entities.
+    The relationship is undirected: swapping src_entity and tgt_entity refers to the same relationship.
+
+    Args:
+        src_entity: Source entity name
+        tgt_entity: Target entity name
+        include_vector_data: Whether to include vector database information
+
+    Returns:
+        Dictionary containing relationship information
+    """
     # Get information from the graph
     edge_data = await chunk_entity_relation_graph.get_edge(src_entity, tgt_entity)
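Given the undirected semantics documented above, both argument orders should resolve to the same edge; a quick sketch (storage fixtures assumed to be initialized elsewhere):

```python
# Both calls describe the same relationship, whichever way it was created.
info_ab = await get_relation_info(
    chunk_entity_relation_graph, relationships_vdb, "Elon Musk", "Tesla"
)
info_ba = await get_relation_info(
    chunk_entity_relation_graph, relationships_vdb, "Tesla", "Elon Musk"
)
```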