Refactor graph utils to use unified persistence callback
- Add _persist_graph_updates function - Remove duplicate callback functions
This commit is contained in:
parent
a3370b024d
commit
6015e8bc68
2 changed files with 85 additions and 124 deletions
|
|
@ -365,7 +365,7 @@ def create_graph_routes(rag, api_key: Optional[str] = None):
|
||||||
|
|
||||||
This endpoint establishes an undirected relationship between two existing entities.
|
This endpoint establishes an undirected relationship between two existing entities.
|
||||||
The provided source/target order is accepted for convenience, but the backend
|
The provided source/target order is accepted for convenience, but the backend
|
||||||
stored edge is undirected and may be returned with the entities swapped.
|
stored edge is undirected and may be returned with the entities swapped.
|
||||||
Both entities must already exist in the knowledge graph. The system automatically
|
Both entities must already exist in the knowledge graph. The system automatically
|
||||||
generates vector embeddings for the relationship to enable semantic search and graph traversal.
|
generates vector embeddings for the relationship to enable semantic search and graph traversal.
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -11,6 +11,49 @@ from .utils import compute_mdhash_id, logger
|
||||||
from .base import StorageNameSpace
|
from .base import StorageNameSpace
|
||||||
|
|
||||||
|
|
||||||
|
async def _persist_graph_updates(
|
||||||
|
entities_vdb=None,
|
||||||
|
relationships_vdb=None,
|
||||||
|
chunk_entity_relation_graph=None,
|
||||||
|
entity_chunks_storage=None,
|
||||||
|
relation_chunks_storage=None,
|
||||||
|
) -> None:
|
||||||
|
"""Unified callback to persist updates after graph operations.
|
||||||
|
|
||||||
|
Ensures all relevant storage instances are properly persisted after
|
||||||
|
operations like delete, edit, create, or merge.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
entities_vdb: Entity vector database storage (optional)
|
||||||
|
relationships_vdb: Relationship vector database storage (optional)
|
||||||
|
chunk_entity_relation_graph: Graph storage instance (optional)
|
||||||
|
entity_chunks_storage: Entity-chunk tracking storage (optional)
|
||||||
|
relation_chunks_storage: Relation-chunk tracking storage (optional)
|
||||||
|
"""
|
||||||
|
storages = []
|
||||||
|
|
||||||
|
# Collect all non-None storage instances
|
||||||
|
if entities_vdb is not None:
|
||||||
|
storages.append(entities_vdb)
|
||||||
|
if relationships_vdb is not None:
|
||||||
|
storages.append(relationships_vdb)
|
||||||
|
if chunk_entity_relation_graph is not None:
|
||||||
|
storages.append(chunk_entity_relation_graph)
|
||||||
|
if entity_chunks_storage is not None:
|
||||||
|
storages.append(entity_chunks_storage)
|
||||||
|
if relation_chunks_storage is not None:
|
||||||
|
storages.append(relation_chunks_storage)
|
||||||
|
|
||||||
|
# Persist all storage instances in parallel
|
||||||
|
if storages:
|
||||||
|
await asyncio.gather(
|
||||||
|
*[
|
||||||
|
cast(StorageNameSpace, storage_inst).index_done_callback()
|
||||||
|
for storage_inst in storages # type: ignore
|
||||||
|
]
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
async def adelete_by_entity(
|
async def adelete_by_entity(
|
||||||
chunk_entity_relation_graph,
|
chunk_entity_relation_graph,
|
||||||
entities_vdb,
|
entities_vdb,
|
||||||
|
|
@ -64,7 +107,9 @@ async def adelete_by_entity(
|
||||||
for src, tgt in edges:
|
for src, tgt in edges:
|
||||||
# Normalize entity order for consistent key generation
|
# Normalize entity order for consistent key generation
|
||||||
normalized_src, normalized_tgt = sorted([src, tgt])
|
normalized_src, normalized_tgt = sorted([src, tgt])
|
||||||
storage_key = make_relation_chunk_key(normalized_src, normalized_tgt)
|
storage_key = make_relation_chunk_key(
|
||||||
|
normalized_src, normalized_tgt
|
||||||
|
)
|
||||||
relation_keys_to_delete.append(storage_key)
|
relation_keys_to_delete.append(storage_key)
|
||||||
|
|
||||||
if relation_keys_to_delete:
|
if relation_keys_to_delete:
|
||||||
|
|
@ -79,12 +124,12 @@ async def adelete_by_entity(
|
||||||
|
|
||||||
message = f"Entity Delete: remove '{entity_name}' and its {related_relations_count} relations"
|
message = f"Entity Delete: remove '{entity_name}' and its {related_relations_count} relations"
|
||||||
logger.info(message)
|
logger.info(message)
|
||||||
await _delete_by_entity_done(
|
await _persist_graph_updates(
|
||||||
entities_vdb,
|
entities_vdb=entities_vdb,
|
||||||
relationships_vdb,
|
relationships_vdb=relationships_vdb,
|
||||||
chunk_entity_relation_graph,
|
chunk_entity_relation_graph=chunk_entity_relation_graph,
|
||||||
entity_chunks_storage,
|
entity_chunks_storage=entity_chunks_storage,
|
||||||
relation_chunks_storage,
|
relation_chunks_storage=relation_chunks_storage,
|
||||||
)
|
)
|
||||||
return DeletionResult(
|
return DeletionResult(
|
||||||
status="success",
|
status="success",
|
||||||
|
|
@ -103,28 +148,6 @@ async def adelete_by_entity(
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
async def _delete_by_entity_done(
|
|
||||||
entities_vdb,
|
|
||||||
relationships_vdb,
|
|
||||||
chunk_entity_relation_graph,
|
|
||||||
entity_chunks_storage=None,
|
|
||||||
relation_chunks_storage=None,
|
|
||||||
) -> None:
|
|
||||||
"""Callback after entity deletion is complete, ensures updates are persisted"""
|
|
||||||
storages = [entities_vdb, relationships_vdb, chunk_entity_relation_graph]
|
|
||||||
if entity_chunks_storage is not None:
|
|
||||||
storages.append(entity_chunks_storage)
|
|
||||||
if relation_chunks_storage is not None:
|
|
||||||
storages.append(relation_chunks_storage)
|
|
||||||
|
|
||||||
await asyncio.gather(
|
|
||||||
*[
|
|
||||||
cast(StorageNameSpace, storage_inst).index_done_callback()
|
|
||||||
for storage_inst in storages # type: ignore
|
|
||||||
]
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
async def adelete_by_relation(
|
async def adelete_by_relation(
|
||||||
chunk_entity_relation_graph,
|
chunk_entity_relation_graph,
|
||||||
relationships_vdb,
|
relationships_vdb,
|
||||||
|
|
@ -173,7 +196,7 @@ async def adelete_by_relation(
|
||||||
# Normalize entity order for consistent key generation
|
# Normalize entity order for consistent key generation
|
||||||
normalized_src, normalized_tgt = sorted([source_entity, target_entity])
|
normalized_src, normalized_tgt = sorted([source_entity, target_entity])
|
||||||
storage_key = make_relation_chunk_key(normalized_src, normalized_tgt)
|
storage_key = make_relation_chunk_key(normalized_src, normalized_tgt)
|
||||||
|
|
||||||
await relation_chunks_storage.delete([storage_key])
|
await relation_chunks_storage.delete([storage_key])
|
||||||
logger.info(
|
logger.info(
|
||||||
f"Relation Delete: removed chunk tracking for `{source_entity}`~`{target_entity}`"
|
f"Relation Delete: removed chunk tracking for `{source_entity}`~`{target_entity}`"
|
||||||
|
|
@ -194,8 +217,10 @@ async def adelete_by_relation(
|
||||||
|
|
||||||
message = f"Relation Delete: `{source_entity}`~`{target_entity}` deleted successfully"
|
message = f"Relation Delete: `{source_entity}`~`{target_entity}` deleted successfully"
|
||||||
logger.info(message)
|
logger.info(message)
|
||||||
await _delete_relation_done(
|
await _persist_graph_updates(
|
||||||
relationships_vdb, chunk_entity_relation_graph, relation_chunks_storage
|
relationships_vdb=relationships_vdb,
|
||||||
|
chunk_entity_relation_graph=chunk_entity_relation_graph,
|
||||||
|
relation_chunks_storage=relation_chunks_storage,
|
||||||
)
|
)
|
||||||
return DeletionResult(
|
return DeletionResult(
|
||||||
status="success",
|
status="success",
|
||||||
|
|
@ -214,22 +239,6 @@ async def adelete_by_relation(
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
async def _delete_relation_done(
|
|
||||||
relationships_vdb, chunk_entity_relation_graph, relation_chunks_storage=None
|
|
||||||
) -> None:
|
|
||||||
"""Callback after relation deletion is complete, ensures updates are persisted"""
|
|
||||||
storages = [relationships_vdb, chunk_entity_relation_graph]
|
|
||||||
if relation_chunks_storage is not None:
|
|
||||||
storages.append(relation_chunks_storage)
|
|
||||||
|
|
||||||
await asyncio.gather(
|
|
||||||
*[
|
|
||||||
cast(StorageNameSpace, storage_inst).index_done_callback()
|
|
||||||
for storage_inst in storages # type: ignore
|
|
||||||
]
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
async def aedit_entity(
|
async def aedit_entity(
|
||||||
chunk_entity_relation_graph,
|
chunk_entity_relation_graph,
|
||||||
entities_vdb,
|
entities_vdb,
|
||||||
|
|
@ -564,12 +573,12 @@ async def aedit_entity(
|
||||||
)
|
)
|
||||||
|
|
||||||
# 5. Save changes
|
# 5. Save changes
|
||||||
await _edit_entity_done(
|
await _persist_graph_updates(
|
||||||
entities_vdb,
|
entities_vdb=entities_vdb,
|
||||||
relationships_vdb,
|
relationships_vdb=relationships_vdb,
|
||||||
chunk_entity_relation_graph,
|
chunk_entity_relation_graph=chunk_entity_relation_graph,
|
||||||
entity_chunks_storage,
|
entity_chunks_storage=entity_chunks_storage,
|
||||||
relation_chunks_storage,
|
relation_chunks_storage=relation_chunks_storage,
|
||||||
)
|
)
|
||||||
|
|
||||||
logger.info(f"Entity Edit: `{entity_name}` successfully updated")
|
logger.info(f"Entity Edit: `{entity_name}` successfully updated")
|
||||||
|
|
@ -584,28 +593,6 @@ async def aedit_entity(
|
||||||
raise
|
raise
|
||||||
|
|
||||||
|
|
||||||
async def _edit_entity_done(
|
|
||||||
entities_vdb,
|
|
||||||
relationships_vdb,
|
|
||||||
chunk_entity_relation_graph,
|
|
||||||
entity_chunks_storage=None,
|
|
||||||
relation_chunks_storage=None,
|
|
||||||
) -> None:
|
|
||||||
"""Callback after entity editing is complete, ensures updates are persisted"""
|
|
||||||
storages = [entities_vdb, relationships_vdb, chunk_entity_relation_graph]
|
|
||||||
if entity_chunks_storage is not None:
|
|
||||||
storages.append(entity_chunks_storage)
|
|
||||||
if relation_chunks_storage is not None:
|
|
||||||
storages.append(relation_chunks_storage)
|
|
||||||
|
|
||||||
await asyncio.gather(
|
|
||||||
*[
|
|
||||||
cast(StorageNameSpace, storage_inst).index_done_callback()
|
|
||||||
for storage_inst in storages # type: ignore
|
|
||||||
]
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
async def aedit_relation(
|
async def aedit_relation(
|
||||||
chunk_entity_relation_graph,
|
chunk_entity_relation_graph,
|
||||||
entities_vdb,
|
entities_vdb,
|
||||||
|
|
@ -763,8 +750,10 @@ async def aedit_relation(
|
||||||
)
|
)
|
||||||
|
|
||||||
# 5. Save changes
|
# 5. Save changes
|
||||||
await _edit_relation_done(
|
await _persist_graph_updates(
|
||||||
relationships_vdb, chunk_entity_relation_graph, relation_chunks_storage
|
relationships_vdb=relationships_vdb,
|
||||||
|
chunk_entity_relation_graph=chunk_entity_relation_graph,
|
||||||
|
relation_chunks_storage=relation_chunks_storage,
|
||||||
)
|
)
|
||||||
|
|
||||||
logger.info(
|
logger.info(
|
||||||
|
|
@ -784,22 +773,6 @@ async def aedit_relation(
|
||||||
raise
|
raise
|
||||||
|
|
||||||
|
|
||||||
async def _edit_relation_done(
|
|
||||||
relationships_vdb, chunk_entity_relation_graph, relation_chunks_storage=None
|
|
||||||
) -> None:
|
|
||||||
"""Callback after relation editing is complete, ensures updates are persisted"""
|
|
||||||
storages = [relationships_vdb, chunk_entity_relation_graph]
|
|
||||||
if relation_chunks_storage is not None:
|
|
||||||
storages.append(relation_chunks_storage)
|
|
||||||
|
|
||||||
await asyncio.gather(
|
|
||||||
*[
|
|
||||||
cast(StorageNameSpace, storage_inst).index_done_callback()
|
|
||||||
for storage_inst in storages # type: ignore
|
|
||||||
]
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
async def acreate_entity(
|
async def acreate_entity(
|
||||||
chunk_entity_relation_graph,
|
chunk_entity_relation_graph,
|
||||||
entities_vdb,
|
entities_vdb,
|
||||||
|
|
@ -876,7 +849,7 @@ async def acreate_entity(
|
||||||
if entity_chunks_storage is not None:
|
if entity_chunks_storage is not None:
|
||||||
source_id = node_data.get("source_id", "")
|
source_id = node_data.get("source_id", "")
|
||||||
chunk_ids = [cid for cid in source_id.split(GRAPH_FIELD_SEP) if cid]
|
chunk_ids = [cid for cid in source_id.split(GRAPH_FIELD_SEP) if cid]
|
||||||
|
|
||||||
if chunk_ids:
|
if chunk_ids:
|
||||||
await entity_chunks_storage.upsert(
|
await entity_chunks_storage.upsert(
|
||||||
{
|
{
|
||||||
|
|
@ -891,12 +864,12 @@ async def acreate_entity(
|
||||||
)
|
)
|
||||||
|
|
||||||
# Save changes
|
# Save changes
|
||||||
await _edit_entity_done(
|
await _persist_graph_updates(
|
||||||
entities_vdb,
|
entities_vdb=entities_vdb,
|
||||||
relationships_vdb,
|
relationships_vdb=relationships_vdb,
|
||||||
chunk_entity_relation_graph,
|
chunk_entity_relation_graph=chunk_entity_relation_graph,
|
||||||
entity_chunks_storage,
|
entity_chunks_storage=entity_chunks_storage,
|
||||||
relation_chunks_storage,
|
relation_chunks_storage=relation_chunks_storage,
|
||||||
)
|
)
|
||||||
|
|
||||||
logger.info(f"Entity Create: '{entity_name}' successfully created")
|
logger.info(f"Entity Create: '{entity_name}' successfully created")
|
||||||
|
|
@ -1016,10 +989,10 @@ async def acreate_relation(
|
||||||
# Normalize entity order for consistent key generation
|
# Normalize entity order for consistent key generation
|
||||||
normalized_src, normalized_tgt = sorted([source_entity, target_entity])
|
normalized_src, normalized_tgt = sorted([source_entity, target_entity])
|
||||||
storage_key = make_relation_chunk_key(normalized_src, normalized_tgt)
|
storage_key = make_relation_chunk_key(normalized_src, normalized_tgt)
|
||||||
|
|
||||||
source_id = edge_data.get("source_id", "")
|
source_id = edge_data.get("source_id", "")
|
||||||
chunk_ids = [cid for cid in source_id.split(GRAPH_FIELD_SEP) if cid]
|
chunk_ids = [cid for cid in source_id.split(GRAPH_FIELD_SEP) if cid]
|
||||||
|
|
||||||
if chunk_ids:
|
if chunk_ids:
|
||||||
await relation_chunks_storage.upsert(
|
await relation_chunks_storage.upsert(
|
||||||
{
|
{
|
||||||
|
|
@ -1034,8 +1007,10 @@ async def acreate_relation(
|
||||||
)
|
)
|
||||||
|
|
||||||
# Save changes
|
# Save changes
|
||||||
await _edit_relation_done(
|
await _persist_graph_updates(
|
||||||
relationships_vdb, chunk_entity_relation_graph, relation_chunks_storage
|
relationships_vdb=relationships_vdb,
|
||||||
|
chunk_entity_relation_graph=chunk_entity_relation_graph,
|
||||||
|
relation_chunks_storage=relation_chunks_storage,
|
||||||
)
|
)
|
||||||
|
|
||||||
logger.info(
|
logger.info(
|
||||||
|
|
@ -1297,8 +1272,10 @@ async def amerge_entities(
|
||||||
)
|
)
|
||||||
|
|
||||||
# 10. Save changes
|
# 10. Save changes
|
||||||
await _merge_entities_done(
|
await _persist_graph_updates(
|
||||||
entities_vdb, relationships_vdb, chunk_entity_relation_graph
|
entities_vdb=entities_vdb,
|
||||||
|
relationships_vdb=relationships_vdb,
|
||||||
|
chunk_entity_relation_graph=chunk_entity_relation_graph,
|
||||||
)
|
)
|
||||||
|
|
||||||
logger.info(
|
logger.info(
|
||||||
|
|
@ -1424,22 +1401,6 @@ def _merge_relation_attributes(
|
||||||
return merged_data
|
return merged_data
|
||||||
|
|
||||||
|
|
||||||
async def _merge_entities_done(
|
|
||||||
entities_vdb, relationships_vdb, chunk_entity_relation_graph
|
|
||||||
) -> None:
|
|
||||||
"""Callback after entity merging is complete, ensures updates are persisted"""
|
|
||||||
await asyncio.gather(
|
|
||||||
*[
|
|
||||||
cast(StorageNameSpace, storage_inst).index_done_callback()
|
|
||||||
for storage_inst in [ # type: ignore
|
|
||||||
entities_vdb,
|
|
||||||
relationships_vdb,
|
|
||||||
chunk_entity_relation_graph,
|
|
||||||
]
|
|
||||||
]
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
async def get_entity_info(
|
async def get_entity_info(
|
||||||
chunk_entity_relation_graph,
|
chunk_entity_relation_graph,
|
||||||
entities_vdb,
|
entities_vdb,
|
||||||
|
|
|
||||||
Loading…
Add table
Reference in a new issue