Refactor graph utils to use unified persistence callback

- Add _persist_graph_updates function
- Remove duplicate callback functions
This commit is contained in:
yangdx 2025-10-26 20:20:16 +08:00
parent a3370b024d
commit 6015e8bc68
2 changed files with 85 additions and 124 deletions

View file

@ -365,7 +365,7 @@ def create_graph_routes(rag, api_key: Optional[str] = None):
This endpoint establishes an undirected relationship between two existing entities.
The provided source/target order is accepted for convenience, but the backend
stored edge is undirected and may be returned with the entities swapped.
stored edge is undirected and may be returned with the entities swapped.
Both entities must already exist in the knowledge graph. The system automatically
generates vector embeddings for the relationship to enable semantic search and graph traversal.

View file

@ -11,6 +11,49 @@ from .utils import compute_mdhash_id, logger
from .base import StorageNameSpace
async def _persist_graph_updates(
entities_vdb=None,
relationships_vdb=None,
chunk_entity_relation_graph=None,
entity_chunks_storage=None,
relation_chunks_storage=None,
) -> None:
"""Unified callback to persist updates after graph operations.
Ensures all relevant storage instances are properly persisted after
operations like delete, edit, create, or merge.
Args:
entities_vdb: Entity vector database storage (optional)
relationships_vdb: Relationship vector database storage (optional)
chunk_entity_relation_graph: Graph storage instance (optional)
entity_chunks_storage: Entity-chunk tracking storage (optional)
relation_chunks_storage: Relation-chunk tracking storage (optional)
"""
storages = []
# Collect all non-None storage instances
if entities_vdb is not None:
storages.append(entities_vdb)
if relationships_vdb is not None:
storages.append(relationships_vdb)
if chunk_entity_relation_graph is not None:
storages.append(chunk_entity_relation_graph)
if entity_chunks_storage is not None:
storages.append(entity_chunks_storage)
if relation_chunks_storage is not None:
storages.append(relation_chunks_storage)
# Persist all storage instances in parallel
if storages:
await asyncio.gather(
*[
cast(StorageNameSpace, storage_inst).index_done_callback()
for storage_inst in storages # type: ignore
]
)
async def adelete_by_entity(
chunk_entity_relation_graph,
entities_vdb,
@ -64,7 +107,9 @@ async def adelete_by_entity(
for src, tgt in edges:
# Normalize entity order for consistent key generation
normalized_src, normalized_tgt = sorted([src, tgt])
storage_key = make_relation_chunk_key(normalized_src, normalized_tgt)
storage_key = make_relation_chunk_key(
normalized_src, normalized_tgt
)
relation_keys_to_delete.append(storage_key)
if relation_keys_to_delete:
@ -79,12 +124,12 @@ async def adelete_by_entity(
message = f"Entity Delete: remove '{entity_name}' and its {related_relations_count} relations"
logger.info(message)
await _delete_by_entity_done(
entities_vdb,
relationships_vdb,
chunk_entity_relation_graph,
entity_chunks_storage,
relation_chunks_storage,
await _persist_graph_updates(
entities_vdb=entities_vdb,
relationships_vdb=relationships_vdb,
chunk_entity_relation_graph=chunk_entity_relation_graph,
entity_chunks_storage=entity_chunks_storage,
relation_chunks_storage=relation_chunks_storage,
)
return DeletionResult(
status="success",
@ -103,28 +148,6 @@ async def adelete_by_entity(
)
async def _delete_by_entity_done(
entities_vdb,
relationships_vdb,
chunk_entity_relation_graph,
entity_chunks_storage=None,
relation_chunks_storage=None,
) -> None:
"""Callback after entity deletion is complete, ensures updates are persisted"""
storages = [entities_vdb, relationships_vdb, chunk_entity_relation_graph]
if entity_chunks_storage is not None:
storages.append(entity_chunks_storage)
if relation_chunks_storage is not None:
storages.append(relation_chunks_storage)
await asyncio.gather(
*[
cast(StorageNameSpace, storage_inst).index_done_callback()
for storage_inst in storages # type: ignore
]
)
async def adelete_by_relation(
chunk_entity_relation_graph,
relationships_vdb,
@ -173,7 +196,7 @@ async def adelete_by_relation(
# Normalize entity order for consistent key generation
normalized_src, normalized_tgt = sorted([source_entity, target_entity])
storage_key = make_relation_chunk_key(normalized_src, normalized_tgt)
await relation_chunks_storage.delete([storage_key])
logger.info(
f"Relation Delete: removed chunk tracking for `{source_entity}`~`{target_entity}`"
@ -194,8 +217,10 @@ async def adelete_by_relation(
message = f"Relation Delete: `{source_entity}`~`{target_entity}` deleted successfully"
logger.info(message)
await _delete_relation_done(
relationships_vdb, chunk_entity_relation_graph, relation_chunks_storage
await _persist_graph_updates(
relationships_vdb=relationships_vdb,
chunk_entity_relation_graph=chunk_entity_relation_graph,
relation_chunks_storage=relation_chunks_storage,
)
return DeletionResult(
status="success",
@ -214,22 +239,6 @@ async def adelete_by_relation(
)
async def _delete_relation_done(
relationships_vdb, chunk_entity_relation_graph, relation_chunks_storage=None
) -> None:
"""Callback after relation deletion is complete, ensures updates are persisted"""
storages = [relationships_vdb, chunk_entity_relation_graph]
if relation_chunks_storage is not None:
storages.append(relation_chunks_storage)
await asyncio.gather(
*[
cast(StorageNameSpace, storage_inst).index_done_callback()
for storage_inst in storages # type: ignore
]
)
async def aedit_entity(
chunk_entity_relation_graph,
entities_vdb,
@ -564,12 +573,12 @@ async def aedit_entity(
)
# 5. Save changes
await _edit_entity_done(
entities_vdb,
relationships_vdb,
chunk_entity_relation_graph,
entity_chunks_storage,
relation_chunks_storage,
await _persist_graph_updates(
entities_vdb=entities_vdb,
relationships_vdb=relationships_vdb,
chunk_entity_relation_graph=chunk_entity_relation_graph,
entity_chunks_storage=entity_chunks_storage,
relation_chunks_storage=relation_chunks_storage,
)
logger.info(f"Entity Edit: `{entity_name}` successfully updated")
@ -584,28 +593,6 @@ async def aedit_entity(
raise
async def _edit_entity_done(
entities_vdb,
relationships_vdb,
chunk_entity_relation_graph,
entity_chunks_storage=None,
relation_chunks_storage=None,
) -> None:
"""Callback after entity editing is complete, ensures updates are persisted"""
storages = [entities_vdb, relationships_vdb, chunk_entity_relation_graph]
if entity_chunks_storage is not None:
storages.append(entity_chunks_storage)
if relation_chunks_storage is not None:
storages.append(relation_chunks_storage)
await asyncio.gather(
*[
cast(StorageNameSpace, storage_inst).index_done_callback()
for storage_inst in storages # type: ignore
]
)
async def aedit_relation(
chunk_entity_relation_graph,
entities_vdb,
@ -763,8 +750,10 @@ async def aedit_relation(
)
# 5. Save changes
await _edit_relation_done(
relationships_vdb, chunk_entity_relation_graph, relation_chunks_storage
await _persist_graph_updates(
relationships_vdb=relationships_vdb,
chunk_entity_relation_graph=chunk_entity_relation_graph,
relation_chunks_storage=relation_chunks_storage,
)
logger.info(
@ -784,22 +773,6 @@ async def aedit_relation(
raise
async def _edit_relation_done(
relationships_vdb, chunk_entity_relation_graph, relation_chunks_storage=None
) -> None:
"""Callback after relation editing is complete, ensures updates are persisted"""
storages = [relationships_vdb, chunk_entity_relation_graph]
if relation_chunks_storage is not None:
storages.append(relation_chunks_storage)
await asyncio.gather(
*[
cast(StorageNameSpace, storage_inst).index_done_callback()
for storage_inst in storages # type: ignore
]
)
async def acreate_entity(
chunk_entity_relation_graph,
entities_vdb,
@ -876,7 +849,7 @@ async def acreate_entity(
if entity_chunks_storage is not None:
source_id = node_data.get("source_id", "")
chunk_ids = [cid for cid in source_id.split(GRAPH_FIELD_SEP) if cid]
if chunk_ids:
await entity_chunks_storage.upsert(
{
@ -891,12 +864,12 @@ async def acreate_entity(
)
# Save changes
await _edit_entity_done(
entities_vdb,
relationships_vdb,
chunk_entity_relation_graph,
entity_chunks_storage,
relation_chunks_storage,
await _persist_graph_updates(
entities_vdb=entities_vdb,
relationships_vdb=relationships_vdb,
chunk_entity_relation_graph=chunk_entity_relation_graph,
entity_chunks_storage=entity_chunks_storage,
relation_chunks_storage=relation_chunks_storage,
)
logger.info(f"Entity Create: '{entity_name}' successfully created")
@ -1016,10 +989,10 @@ async def acreate_relation(
# Normalize entity order for consistent key generation
normalized_src, normalized_tgt = sorted([source_entity, target_entity])
storage_key = make_relation_chunk_key(normalized_src, normalized_tgt)
source_id = edge_data.get("source_id", "")
chunk_ids = [cid for cid in source_id.split(GRAPH_FIELD_SEP) if cid]
if chunk_ids:
await relation_chunks_storage.upsert(
{
@ -1034,8 +1007,10 @@ async def acreate_relation(
)
# Save changes
await _edit_relation_done(
relationships_vdb, chunk_entity_relation_graph, relation_chunks_storage
await _persist_graph_updates(
relationships_vdb=relationships_vdb,
chunk_entity_relation_graph=chunk_entity_relation_graph,
relation_chunks_storage=relation_chunks_storage,
)
logger.info(
@ -1297,8 +1272,10 @@ async def amerge_entities(
)
# 10. Save changes
await _merge_entities_done(
entities_vdb, relationships_vdb, chunk_entity_relation_graph
await _persist_graph_updates(
entities_vdb=entities_vdb,
relationships_vdb=relationships_vdb,
chunk_entity_relation_graph=chunk_entity_relation_graph,
)
logger.info(
@ -1424,22 +1401,6 @@ def _merge_relation_attributes(
return merged_data
async def _merge_entities_done(
entities_vdb, relationships_vdb, chunk_entity_relation_graph
) -> None:
"""Callback after entity merging is complete, ensures updates are persisted"""
await asyncio.gather(
*[
cast(StorageNameSpace, storage_inst).index_done_callback()
for storage_inst in [ # type: ignore
entities_vdb,
relationships_vdb,
chunk_entity_relation_graph,
]
]
)
async def get_entity_info(
chunk_entity_relation_graph,
entities_vdb,