diff --git a/lightrag/api/routers/graph_routes.py b/lightrag/api/routers/graph_routes.py index f4c29fc2..f59a7c3d 100644 --- a/lightrag/api/routers/graph_routes.py +++ b/lightrag/api/routers/graph_routes.py @@ -365,7 +365,7 @@ def create_graph_routes(rag, api_key: Optional[str] = None): This endpoint establishes an undirected relationship between two existing entities. The provided source/target order is accepted for convenience, but the backend - stored edge is undirected and may be returned with the entities swapped. + stored edge is undirected and may be returned with the entities swapped. Both entities must already exist in the knowledge graph. The system automatically generates vector embeddings for the relationship to enable semantic search and graph traversal. diff --git a/lightrag/utils_graph.py b/lightrag/utils_graph.py index b3b042c9..6b06cf3c 100644 --- a/lightrag/utils_graph.py +++ b/lightrag/utils_graph.py @@ -11,6 +11,49 @@ from .utils import compute_mdhash_id, logger from .base import StorageNameSpace +async def _persist_graph_updates( + entities_vdb=None, + relationships_vdb=None, + chunk_entity_relation_graph=None, + entity_chunks_storage=None, + relation_chunks_storage=None, +) -> None: + """Unified callback to persist updates after graph operations. + + Ensures all relevant storage instances are properly persisted after + operations like delete, edit, create, or merge. + + Args: + entities_vdb: Entity vector database storage (optional) + relationships_vdb: Relationship vector database storage (optional) + chunk_entity_relation_graph: Graph storage instance (optional) + entity_chunks_storage: Entity-chunk tracking storage (optional) + relation_chunks_storage: Relation-chunk tracking storage (optional) + """ + storages = [] + + # Collect all non-None storage instances + if entities_vdb is not None: + storages.append(entities_vdb) + if relationships_vdb is not None: + storages.append(relationships_vdb) + if chunk_entity_relation_graph is not None: + storages.append(chunk_entity_relation_graph) + if entity_chunks_storage is not None: + storages.append(entity_chunks_storage) + if relation_chunks_storage is not None: + storages.append(relation_chunks_storage) + + # Persist all storage instances in parallel + if storages: + await asyncio.gather( + *[ + cast(StorageNameSpace, storage_inst).index_done_callback() + for storage_inst in storages # type: ignore + ] + ) + + async def adelete_by_entity( chunk_entity_relation_graph, entities_vdb, @@ -64,7 +107,9 @@ async def adelete_by_entity( for src, tgt in edges: # Normalize entity order for consistent key generation normalized_src, normalized_tgt = sorted([src, tgt]) - storage_key = make_relation_chunk_key(normalized_src, normalized_tgt) + storage_key = make_relation_chunk_key( + normalized_src, normalized_tgt + ) relation_keys_to_delete.append(storage_key) if relation_keys_to_delete: @@ -79,12 +124,12 @@ async def adelete_by_entity( message = f"Entity Delete: remove '{entity_name}' and its {related_relations_count} relations" logger.info(message) - await _delete_by_entity_done( - entities_vdb, - relationships_vdb, - chunk_entity_relation_graph, - entity_chunks_storage, - relation_chunks_storage, + await _persist_graph_updates( + entities_vdb=entities_vdb, + relationships_vdb=relationships_vdb, + chunk_entity_relation_graph=chunk_entity_relation_graph, + entity_chunks_storage=entity_chunks_storage, + relation_chunks_storage=relation_chunks_storage, ) return DeletionResult( status="success", @@ -103,28 +148,6 @@ async def adelete_by_entity( ) -async def _delete_by_entity_done( - entities_vdb, - relationships_vdb, - chunk_entity_relation_graph, - entity_chunks_storage=None, - relation_chunks_storage=None, -) -> None: - """Callback after entity deletion is complete, ensures updates are persisted""" - storages = [entities_vdb, relationships_vdb, chunk_entity_relation_graph] - if entity_chunks_storage is not None: - storages.append(entity_chunks_storage) - if relation_chunks_storage is not None: - storages.append(relation_chunks_storage) - - await asyncio.gather( - *[ - cast(StorageNameSpace, storage_inst).index_done_callback() - for storage_inst in storages # type: ignore - ] - ) - - async def adelete_by_relation( chunk_entity_relation_graph, relationships_vdb, @@ -173,7 +196,7 @@ async def adelete_by_relation( # Normalize entity order for consistent key generation normalized_src, normalized_tgt = sorted([source_entity, target_entity]) storage_key = make_relation_chunk_key(normalized_src, normalized_tgt) - + await relation_chunks_storage.delete([storage_key]) logger.info( f"Relation Delete: removed chunk tracking for `{source_entity}`~`{target_entity}`" @@ -194,8 +217,10 @@ async def adelete_by_relation( message = f"Relation Delete: `{source_entity}`~`{target_entity}` deleted successfully" logger.info(message) - await _delete_relation_done( - relationships_vdb, chunk_entity_relation_graph, relation_chunks_storage + await _persist_graph_updates( + relationships_vdb=relationships_vdb, + chunk_entity_relation_graph=chunk_entity_relation_graph, + relation_chunks_storage=relation_chunks_storage, ) return DeletionResult( status="success", @@ -214,22 +239,6 @@ async def adelete_by_relation( ) -async def _delete_relation_done( - relationships_vdb, chunk_entity_relation_graph, relation_chunks_storage=None -) -> None: - """Callback after relation deletion is complete, ensures updates are persisted""" - storages = [relationships_vdb, chunk_entity_relation_graph] - if relation_chunks_storage is not None: - storages.append(relation_chunks_storage) - - await asyncio.gather( - *[ - cast(StorageNameSpace, storage_inst).index_done_callback() - for storage_inst in storages # type: ignore - ] - ) - - async def aedit_entity( chunk_entity_relation_graph, entities_vdb, @@ -564,12 +573,12 @@ async def aedit_entity( ) # 5. Save changes - await _edit_entity_done( - entities_vdb, - relationships_vdb, - chunk_entity_relation_graph, - entity_chunks_storage, - relation_chunks_storage, + await _persist_graph_updates( + entities_vdb=entities_vdb, + relationships_vdb=relationships_vdb, + chunk_entity_relation_graph=chunk_entity_relation_graph, + entity_chunks_storage=entity_chunks_storage, + relation_chunks_storage=relation_chunks_storage, ) logger.info(f"Entity Edit: `{entity_name}` successfully updated") @@ -584,28 +593,6 @@ async def aedit_entity( raise -async def _edit_entity_done( - entities_vdb, - relationships_vdb, - chunk_entity_relation_graph, - entity_chunks_storage=None, - relation_chunks_storage=None, -) -> None: - """Callback after entity editing is complete, ensures updates are persisted""" - storages = [entities_vdb, relationships_vdb, chunk_entity_relation_graph] - if entity_chunks_storage is not None: - storages.append(entity_chunks_storage) - if relation_chunks_storage is not None: - storages.append(relation_chunks_storage) - - await asyncio.gather( - *[ - cast(StorageNameSpace, storage_inst).index_done_callback() - for storage_inst in storages # type: ignore - ] - ) - - async def aedit_relation( chunk_entity_relation_graph, entities_vdb, @@ -763,8 +750,10 @@ async def aedit_relation( ) # 5. Save changes - await _edit_relation_done( - relationships_vdb, chunk_entity_relation_graph, relation_chunks_storage + await _persist_graph_updates( + relationships_vdb=relationships_vdb, + chunk_entity_relation_graph=chunk_entity_relation_graph, + relation_chunks_storage=relation_chunks_storage, ) logger.info( @@ -784,22 +773,6 @@ async def aedit_relation( raise -async def _edit_relation_done( - relationships_vdb, chunk_entity_relation_graph, relation_chunks_storage=None -) -> None: - """Callback after relation editing is complete, ensures updates are persisted""" - storages = [relationships_vdb, chunk_entity_relation_graph] - if relation_chunks_storage is not None: - storages.append(relation_chunks_storage) - - await asyncio.gather( - *[ - cast(StorageNameSpace, storage_inst).index_done_callback() - for storage_inst in storages # type: ignore - ] - ) - - async def acreate_entity( chunk_entity_relation_graph, entities_vdb, @@ -876,7 +849,7 @@ async def acreate_entity( if entity_chunks_storage is not None: source_id = node_data.get("source_id", "") chunk_ids = [cid for cid in source_id.split(GRAPH_FIELD_SEP) if cid] - + if chunk_ids: await entity_chunks_storage.upsert( { @@ -891,12 +864,12 @@ async def acreate_entity( ) # Save changes - await _edit_entity_done( - entities_vdb, - relationships_vdb, - chunk_entity_relation_graph, - entity_chunks_storage, - relation_chunks_storage, + await _persist_graph_updates( + entities_vdb=entities_vdb, + relationships_vdb=relationships_vdb, + chunk_entity_relation_graph=chunk_entity_relation_graph, + entity_chunks_storage=entity_chunks_storage, + relation_chunks_storage=relation_chunks_storage, ) logger.info(f"Entity Create: '{entity_name}' successfully created") @@ -1016,10 +989,10 @@ async def acreate_relation( # Normalize entity order for consistent key generation normalized_src, normalized_tgt = sorted([source_entity, target_entity]) storage_key = make_relation_chunk_key(normalized_src, normalized_tgt) - + source_id = edge_data.get("source_id", "") chunk_ids = [cid for cid in source_id.split(GRAPH_FIELD_SEP) if cid] - + if chunk_ids: await relation_chunks_storage.upsert( { @@ -1034,8 +1007,10 @@ async def acreate_relation( ) # Save changes - await _edit_relation_done( - relationships_vdb, chunk_entity_relation_graph, relation_chunks_storage + await _persist_graph_updates( + relationships_vdb=relationships_vdb, + chunk_entity_relation_graph=chunk_entity_relation_graph, + relation_chunks_storage=relation_chunks_storage, ) logger.info( @@ -1297,8 +1272,10 @@ async def amerge_entities( ) # 10. Save changes - await _merge_entities_done( - entities_vdb, relationships_vdb, chunk_entity_relation_graph + await _persist_graph_updates( + entities_vdb=entities_vdb, + relationships_vdb=relationships_vdb, + chunk_entity_relation_graph=chunk_entity_relation_graph, ) logger.info( @@ -1424,22 +1401,6 @@ def _merge_relation_attributes( return merged_data -async def _merge_entities_done( - entities_vdb, relationships_vdb, chunk_entity_relation_graph -) -> None: - """Callback after entity merging is complete, ensures updates are persisted""" - await asyncio.gather( - *[ - cast(StorageNameSpace, storage_inst).index_done_callback() - for storage_inst in [ # type: ignore - entities_vdb, - relationships_vdb, - chunk_entity_relation_graph, - ] - ] - ) - - async def get_entity_info( chunk_entity_relation_graph, entities_vdb,