Refactor graph utils to use unified persistence callback

- Add _persist_graph_updates function
- Remove duplicate callback functions
This commit is contained in:
yangdx 2025-10-26 20:20:16 +08:00
parent a3370b024d
commit 6015e8bc68
2 changed files with 85 additions and 124 deletions

View file

@ -11,6 +11,49 @@ from .utils import compute_mdhash_id, logger
from .base import StorageNameSpace from .base import StorageNameSpace
async def _persist_graph_updates(
    entities_vdb=None,
    relationships_vdb=None,
    chunk_entity_relation_graph=None,
    entity_chunks_storage=None,
    relation_chunks_storage=None,
) -> None:
    """Flush every supplied storage after a graph operation.

    Shared persistence step used once a delete, edit, create, or merge
    has finished. Every argument is optional: a storage left as ``None``
    is skipped, and when nothing is supplied the call does nothing.

    Args:
        entities_vdb: Entity vector database storage (optional)
        relationships_vdb: Relationship vector database storage (optional)
        chunk_entity_relation_graph: Graph storage instance (optional)
        entity_chunks_storage: Entity-chunk tracking storage (optional)
        relation_chunks_storage: Relation-chunk tracking storage (optional)
    """
    # Fixed order keeps callback creation identical regardless of caller.
    candidates = (
        entities_vdb,
        relationships_vdb,
        chunk_entity_relation_graph,
        entity_chunks_storage,
        relation_chunks_storage,
    )

    # One index_done_callback() awaitable per storage actually provided.
    pending = [
        cast(StorageNameSpace, backend).index_done_callback()  # type: ignore
        for backend in candidates
        if backend is not None
    ]
    if not pending:
        return

    # Run all persistence callbacks concurrently.
    await asyncio.gather(*pending)
async def adelete_by_entity( async def adelete_by_entity(
chunk_entity_relation_graph, chunk_entity_relation_graph,
entities_vdb, entities_vdb,
@ -64,7 +107,9 @@ async def adelete_by_entity(
for src, tgt in edges: for src, tgt in edges:
# Normalize entity order for consistent key generation # Normalize entity order for consistent key generation
normalized_src, normalized_tgt = sorted([src, tgt]) normalized_src, normalized_tgt = sorted([src, tgt])
storage_key = make_relation_chunk_key(normalized_src, normalized_tgt) storage_key = make_relation_chunk_key(
normalized_src, normalized_tgt
)
relation_keys_to_delete.append(storage_key) relation_keys_to_delete.append(storage_key)
if relation_keys_to_delete: if relation_keys_to_delete:
@ -79,12 +124,12 @@ async def adelete_by_entity(
message = f"Entity Delete: remove '{entity_name}' and its {related_relations_count} relations" message = f"Entity Delete: remove '{entity_name}' and its {related_relations_count} relations"
logger.info(message) logger.info(message)
await _delete_by_entity_done( await _persist_graph_updates(
entities_vdb, entities_vdb=entities_vdb,
relationships_vdb, relationships_vdb=relationships_vdb,
chunk_entity_relation_graph, chunk_entity_relation_graph=chunk_entity_relation_graph,
entity_chunks_storage, entity_chunks_storage=entity_chunks_storage,
relation_chunks_storage, relation_chunks_storage=relation_chunks_storage,
) )
return DeletionResult( return DeletionResult(
status="success", status="success",
@ -103,28 +148,6 @@ async def adelete_by_entity(
) )
async def _delete_by_entity_done(
    entities_vdb,
    relationships_vdb,
    chunk_entity_relation_graph,
    entity_chunks_storage=None,
    relation_chunks_storage=None,
) -> None:
    """Persist storage updates once an entity deletion has completed.

    The entity vector storage, relationship vector storage, and graph
    storage are always flushed. The two chunk-tracking storages are
    flushed only when they are not ``None``.
    """
    targets = [entities_vdb, relationships_vdb, chunk_entity_relation_graph]
    # Optional trackers follow the mandatory storages, in declaration order.
    targets.extend(
        tracker
        for tracker in (entity_chunks_storage, relation_chunks_storage)
        if tracker is not None
    )

    callbacks = [
        cast(StorageNameSpace, target).index_done_callback()  # type: ignore
        for target in targets
    ]
    await asyncio.gather(*callbacks)
async def adelete_by_relation( async def adelete_by_relation(
chunk_entity_relation_graph, chunk_entity_relation_graph,
relationships_vdb, relationships_vdb,
@ -194,8 +217,10 @@ async def adelete_by_relation(
message = f"Relation Delete: `{source_entity}`~`{target_entity}` deleted successfully" message = f"Relation Delete: `{source_entity}`~`{target_entity}` deleted successfully"
logger.info(message) logger.info(message)
await _delete_relation_done( await _persist_graph_updates(
relationships_vdb, chunk_entity_relation_graph, relation_chunks_storage relationships_vdb=relationships_vdb,
chunk_entity_relation_graph=chunk_entity_relation_graph,
relation_chunks_storage=relation_chunks_storage,
) )
return DeletionResult( return DeletionResult(
status="success", status="success",
@ -214,22 +239,6 @@ async def adelete_by_relation(
) )
async def _delete_relation_done(
    relationships_vdb, chunk_entity_relation_graph, relation_chunks_storage=None
) -> None:
    """Persist storage updates once a relation deletion has completed.

    The relationship vector storage and the graph storage are always
    flushed; the relation-chunk tracking storage is included only when it
    is not ``None``.
    """
    targets = [relationships_vdb, chunk_entity_relation_graph]
    if relation_chunks_storage is not None:
        targets += [relation_chunks_storage]

    callbacks = [
        cast(StorageNameSpace, target).index_done_callback()  # type: ignore
        for target in targets
    ]
    # All callbacks are awaited together.
    await asyncio.gather(*callbacks)
async def aedit_entity( async def aedit_entity(
chunk_entity_relation_graph, chunk_entity_relation_graph,
entities_vdb, entities_vdb,
@ -564,12 +573,12 @@ async def aedit_entity(
) )
# 5. Save changes # 5. Save changes
await _edit_entity_done( await _persist_graph_updates(
entities_vdb, entities_vdb=entities_vdb,
relationships_vdb, relationships_vdb=relationships_vdb,
chunk_entity_relation_graph, chunk_entity_relation_graph=chunk_entity_relation_graph,
entity_chunks_storage, entity_chunks_storage=entity_chunks_storage,
relation_chunks_storage, relation_chunks_storage=relation_chunks_storage,
) )
logger.info(f"Entity Edit: `{entity_name}` successfully updated") logger.info(f"Entity Edit: `{entity_name}` successfully updated")
@ -584,28 +593,6 @@ async def aedit_entity(
raise raise
async def _edit_entity_done(
    entities_vdb,
    relationships_vdb,
    chunk_entity_relation_graph,
    entity_chunks_storage=None,
    relation_chunks_storage=None,
) -> None:
    """Persist storage updates once an entity edit has completed.

    Always flushes the entity vector storage, relationship vector storage,
    and graph storage. Each chunk-tracking storage is flushed as well
    unless it is ``None``.
    """
    required = (entities_vdb, relationships_vdb, chunk_entity_relation_graph)
    optional = (entity_chunks_storage, relation_chunks_storage)

    # Mandatory storages first, then whichever optional trackers were given.
    targets = [*required, *(extra for extra in optional if extra is not None)]

    await asyncio.gather(
        *[
            cast(StorageNameSpace, target).index_done_callback()  # type: ignore
            for target in targets
        ]
    )
async def aedit_relation( async def aedit_relation(
chunk_entity_relation_graph, chunk_entity_relation_graph,
entities_vdb, entities_vdb,
@ -763,8 +750,10 @@ async def aedit_relation(
) )
# 5. Save changes # 5. Save changes
await _edit_relation_done( await _persist_graph_updates(
relationships_vdb, chunk_entity_relation_graph, relation_chunks_storage relationships_vdb=relationships_vdb,
chunk_entity_relation_graph=chunk_entity_relation_graph,
relation_chunks_storage=relation_chunks_storage,
) )
logger.info( logger.info(
@ -784,22 +773,6 @@ async def aedit_relation(
raise raise
async def _edit_relation_done(
    relationships_vdb, chunk_entity_relation_graph, relation_chunks_storage=None
) -> None:
    """Persist storage updates once a relation edit has completed.

    Flushes the relationship vector storage and the graph storage, plus the
    relation-chunk tracking storage when one is supplied (not ``None``).
    """
    if relation_chunks_storage is None:
        targets = [relationships_vdb, chunk_entity_relation_graph]
    else:
        targets = [
            relationships_vdb,
            chunk_entity_relation_graph,
            relation_chunks_storage,
        ]

    callbacks = [
        cast(StorageNameSpace, target).index_done_callback()  # type: ignore
        for target in targets
    ]
    await asyncio.gather(*callbacks)
async def acreate_entity( async def acreate_entity(
chunk_entity_relation_graph, chunk_entity_relation_graph,
entities_vdb, entities_vdb,
@ -891,12 +864,12 @@ async def acreate_entity(
) )
# Save changes # Save changes
await _edit_entity_done( await _persist_graph_updates(
entities_vdb, entities_vdb=entities_vdb,
relationships_vdb, relationships_vdb=relationships_vdb,
chunk_entity_relation_graph, chunk_entity_relation_graph=chunk_entity_relation_graph,
entity_chunks_storage, entity_chunks_storage=entity_chunks_storage,
relation_chunks_storage, relation_chunks_storage=relation_chunks_storage,
) )
logger.info(f"Entity Create: '{entity_name}' successfully created") logger.info(f"Entity Create: '{entity_name}' successfully created")
@ -1034,8 +1007,10 @@ async def acreate_relation(
) )
# Save changes # Save changes
await _edit_relation_done( await _persist_graph_updates(
relationships_vdb, chunk_entity_relation_graph, relation_chunks_storage relationships_vdb=relationships_vdb,
chunk_entity_relation_graph=chunk_entity_relation_graph,
relation_chunks_storage=relation_chunks_storage,
) )
logger.info( logger.info(
@ -1297,8 +1272,10 @@ async def amerge_entities(
) )
# 10. Save changes # 10. Save changes
await _merge_entities_done( await _persist_graph_updates(
entities_vdb, relationships_vdb, chunk_entity_relation_graph entities_vdb=entities_vdb,
relationships_vdb=relationships_vdb,
chunk_entity_relation_graph=chunk_entity_relation_graph,
) )
logger.info( logger.info(
@ -1424,22 +1401,6 @@ def _merge_relation_attributes(
return merged_data return merged_data
async def _merge_entities_done(
    entities_vdb, relationships_vdb, chunk_entity_relation_graph
) -> None:
    """Persist storage updates once an entity merge has completed.

    Invokes ``index_done_callback()`` on the entity vector storage, the
    relationship vector storage, and the graph storage, awaiting all three
    together.
    """
    targets = (entities_vdb, relationships_vdb, chunk_entity_relation_graph)
    callbacks = [
        cast(StorageNameSpace, target).index_done_callback()  # type: ignore
        for target in targets
    ]
    await asyncio.gather(*callbacks)
async def get_entity_info( async def get_entity_info(
chunk_entity_relation_graph, chunk_entity_relation_graph,
entities_vdb, entities_vdb,