Add chunk tracking cleanup to entity/relation deletion and creation
• Clean up chunk storage on delete
• Track chunks in create operations
• Normalize relation keys consistently
This commit is contained in:
parent
bf1897a67e
commit
a3370b024d
1 changed file with 153 additions and 35 deletions
|
|
@ -12,15 +12,24 @@ from .base import StorageNameSpace
|
|||
|
||||
|
||||
async def adelete_by_entity(
|
||||
chunk_entity_relation_graph, entities_vdb, relationships_vdb, entity_name: str
|
||||
chunk_entity_relation_graph,
|
||||
entities_vdb,
|
||||
relationships_vdb,
|
||||
entity_name: str,
|
||||
entity_chunks_storage=None,
|
||||
relation_chunks_storage=None,
|
||||
) -> DeletionResult:
|
||||
"""Asynchronously delete an entity and all its relationships.
|
||||
|
||||
Also cleans up entity_chunks_storage and relation_chunks_storage to remove chunk tracking.
|
||||
|
||||
Args:
|
||||
chunk_entity_relation_graph: Graph storage instance
|
||||
entities_vdb: Vector database storage for entities
|
||||
relationships_vdb: Vector database storage for relationships
|
||||
entity_name: Name of the entity to delete
|
||||
entity_chunks_storage: Optional KV storage for tracking chunks that reference this entity
|
||||
relation_chunks_storage: Optional KV storage for tracking chunks that reference relations
|
||||
"""
|
||||
graph_db_lock = get_graph_db_lock(enable_logging=False)
|
||||
# Use graph database lock to ensure atomic graph and vector db operations
|
||||
|
|
@ -39,14 +48,43 @@ async def adelete_by_entity(
|
|||
edges = await chunk_entity_relation_graph.get_node_edges(entity_name)
|
||||
related_relations_count = len(edges) if edges else 0
|
||||
|
||||
# Clean up chunk tracking storages before deletion
|
||||
if entity_chunks_storage is not None:
|
||||
# Delete entity's entry from entity_chunks_storage
|
||||
await entity_chunks_storage.delete([entity_name])
|
||||
logger.info(
|
||||
f"Entity Delete: removed chunk tracking for `{entity_name}`"
|
||||
)
|
||||
|
||||
if relation_chunks_storage is not None and edges:
|
||||
# Delete all related relationships from relation_chunks_storage
|
||||
from .utils import make_relation_chunk_key
|
||||
|
||||
relation_keys_to_delete = []
|
||||
for src, tgt in edges:
|
||||
# Normalize entity order for consistent key generation
|
||||
normalized_src, normalized_tgt = sorted([src, tgt])
|
||||
storage_key = make_relation_chunk_key(normalized_src, normalized_tgt)
|
||||
relation_keys_to_delete.append(storage_key)
|
||||
|
||||
if relation_keys_to_delete:
|
||||
await relation_chunks_storage.delete(relation_keys_to_delete)
|
||||
logger.info(
|
||||
f"Entity Delete: removed chunk tracking for {len(relation_keys_to_delete)} relations"
|
||||
)
|
||||
|
||||
await entities_vdb.delete_entity(entity_name)
|
||||
await relationships_vdb.delete_entity_relation(entity_name)
|
||||
await chunk_entity_relation_graph.delete_node(entity_name)
|
||||
|
||||
message = f"Entity '{entity_name}' and its {related_relations_count} relationships have been deleted."
|
||||
message = f"Entity Delete: remove '{entity_name}' and its {related_relations_count} relations"
|
||||
logger.info(message)
|
||||
await _delete_by_entity_done(
|
||||
entities_vdb, relationships_vdb, chunk_entity_relation_graph
|
||||
entities_vdb,
|
||||
relationships_vdb,
|
||||
chunk_entity_relation_graph,
|
||||
entity_chunks_storage,
|
||||
relation_chunks_storage,
|
||||
)
|
||||
return DeletionResult(
|
||||
status="success",
|
||||
|
|
@ -66,17 +104,23 @@ async def adelete_by_entity(
|
|||
|
||||
|
||||
async def _delete_by_entity_done(
|
||||
entities_vdb, relationships_vdb, chunk_entity_relation_graph
|
||||
entities_vdb,
|
||||
relationships_vdb,
|
||||
chunk_entity_relation_graph,
|
||||
entity_chunks_storage=None,
|
||||
relation_chunks_storage=None,
|
||||
) -> None:
|
||||
"""Callback after entity deletion is complete, ensures updates are persisted"""
|
||||
storages = [entities_vdb, relationships_vdb, chunk_entity_relation_graph]
|
||||
if entity_chunks_storage is not None:
|
||||
storages.append(entity_chunks_storage)
|
||||
if relation_chunks_storage is not None:
|
||||
storages.append(relation_chunks_storage)
|
||||
|
||||
await asyncio.gather(
|
||||
*[
|
||||
cast(StorageNameSpace, storage_inst).index_done_callback()
|
||||
for storage_inst in [ # type: ignore
|
||||
entities_vdb,
|
||||
relationships_vdb,
|
||||
chunk_entity_relation_graph,
|
||||
]
|
||||
for storage_inst in storages # type: ignore
|
||||
]
|
||||
)
|
||||
|
||||
|
|
@ -86,14 +130,18 @@ async def adelete_by_relation(
|
|||
relationships_vdb,
|
||||
source_entity: str,
|
||||
target_entity: str,
|
||||
relation_chunks_storage=None,
|
||||
) -> DeletionResult:
|
||||
"""Asynchronously delete a relation between two entities.
|
||||
|
||||
Also cleans up relation_chunks_storage to remove chunk tracking.
|
||||
|
||||
Args:
|
||||
chunk_entity_relation_graph: Graph storage instance
|
||||
relationships_vdb: Vector database storage for relationships
|
||||
source_entity: Name of the source entity
|
||||
target_entity: Name of the target entity
|
||||
relation_chunks_storage: Optional KV storage for tracking chunks that reference this relation
|
||||
"""
|
||||
relation_str = f"{source_entity} -> {target_entity}"
|
||||
graph_db_lock = get_graph_db_lock(enable_logging=False)
|
||||
|
|
@ -118,6 +166,19 @@ async def adelete_by_relation(
|
|||
status_code=404,
|
||||
)
|
||||
|
||||
# Clean up chunk tracking storage before deletion
|
||||
if relation_chunks_storage is not None:
|
||||
from .utils import make_relation_chunk_key
|
||||
|
||||
# Normalize entity order for consistent key generation
|
||||
normalized_src, normalized_tgt = sorted([source_entity, target_entity])
|
||||
storage_key = make_relation_chunk_key(normalized_src, normalized_tgt)
|
||||
|
||||
await relation_chunks_storage.delete([storage_key])
|
||||
logger.info(
|
||||
f"Relation Delete: removed chunk tracking for `{source_entity}`~`{target_entity}`"
|
||||
)
|
||||
|
||||
# Delete relation from vector database
|
||||
rel_ids_to_delete = [
|
||||
compute_mdhash_id(source_entity + target_entity, prefix="rel-"),
|
||||
|
|
@ -131,9 +192,11 @@ async def adelete_by_relation(
|
|||
[(source_entity, target_entity)]
|
||||
)
|
||||
|
||||
message = f"Successfully deleted relation from '{source_entity}' to '{target_entity}'"
|
||||
message = f"Relation Delete: `{source_entity}`~`{target_entity}` deleted successfully"
|
||||
logger.info(message)
|
||||
await _delete_relation_done(relationships_vdb, chunk_entity_relation_graph)
|
||||
await _delete_relation_done(
|
||||
relationships_vdb, chunk_entity_relation_graph, relation_chunks_storage
|
||||
)
|
||||
return DeletionResult(
|
||||
status="success",
|
||||
doc_id=relation_str,
|
||||
|
|
@ -151,15 +214,18 @@ async def adelete_by_relation(
|
|||
)
|
||||
|
||||
|
||||
async def _delete_relation_done(relationships_vdb, chunk_entity_relation_graph) -> None:
|
||||
async def _delete_relation_done(
|
||||
relationships_vdb, chunk_entity_relation_graph, relation_chunks_storage=None
|
||||
) -> None:
|
||||
"""Callback after relation deletion is complete, ensures updates are persisted"""
|
||||
storages = [relationships_vdb, chunk_entity_relation_graph]
|
||||
if relation_chunks_storage is not None:
|
||||
storages.append(relation_chunks_storage)
|
||||
|
||||
await asyncio.gather(
|
||||
*[
|
||||
cast(StorageNameSpace, storage_inst).index_done_callback()
|
||||
for storage_inst in [ # type: ignore
|
||||
relationships_vdb,
|
||||
chunk_entity_relation_graph,
|
||||
]
|
||||
for storage_inst in storages # type: ignore
|
||||
]
|
||||
)
|
||||
|
||||
|
|
@ -506,7 +572,7 @@ async def aedit_entity(
|
|||
relation_chunks_storage,
|
||||
)
|
||||
|
||||
logger.info(f"Entity '{entity_name}' successfully updated")
|
||||
logger.info(f"Entity Edit: `{entity_name}` successfully updated")
|
||||
return await get_entity_info(
|
||||
chunk_entity_relation_graph,
|
||||
entities_vdb,
|
||||
|
|
@ -586,12 +652,14 @@ async def aedit_relation(
|
|||
source_entity, target_entity
|
||||
)
|
||||
# Important: First delete the old relation record from the vector database
|
||||
old_relation_id = compute_mdhash_id(
|
||||
source_entity + target_entity, prefix="rel-"
|
||||
)
|
||||
await relationships_vdb.delete([old_relation_id])
|
||||
logger.info(
|
||||
f"Deleted old relation record from vector database for relation {source_entity} -> {target_entity}"
|
||||
# Delete both permutations to handle relationships created before normalization
|
||||
rel_ids_to_delete = [
|
||||
compute_mdhash_id(source_entity + target_entity, prefix="rel-"),
|
||||
compute_mdhash_id(target_entity + source_entity, prefix="rel-"),
|
||||
]
|
||||
await relationships_vdb.delete(rel_ids_to_delete)
|
||||
logger.debug(
|
||||
f"Relation Delete: delete vdb for `{source_entity}`~`{target_entity}`"
|
||||
)
|
||||
|
||||
# 2. Update relation information in the graph
|
||||
|
|
@ -690,14 +758,8 @@ async def aedit_relation(
|
|||
}
|
||||
)
|
||||
|
||||
reason = (
|
||||
"source_id changed"
|
||||
if source_id_changed
|
||||
else "initialized from graph"
|
||||
)
|
||||
logger.info(
|
||||
f"Updated relation_chunks_storage for '{source_entity}' -> '{target_entity}': "
|
||||
f"{len(updated_chunk_ids)} chunks ({reason})"
|
||||
f"Relation Delete: update chunk tracking for `{source_entity}`~`{target_entity}`"
|
||||
)
|
||||
|
||||
# 5. Save changes
|
||||
|
|
@ -706,7 +768,7 @@ async def aedit_relation(
|
|||
)
|
||||
|
||||
logger.info(
|
||||
f"Relation from '{source_entity}' to '{target_entity}' successfully updated"
|
||||
f"Relation Delete: `{source_entity}`~`{target_entity}`' successfully updated"
|
||||
)
|
||||
return await get_relation_info(
|
||||
chunk_entity_relation_graph,
|
||||
|
|
@ -744,10 +806,13 @@ async def acreate_entity(
|
|||
relationships_vdb,
|
||||
entity_name: str,
|
||||
entity_data: dict[str, Any],
|
||||
entity_chunks_storage=None,
|
||||
relation_chunks_storage=None,
|
||||
) -> dict[str, Any]:
|
||||
"""Asynchronously create a new entity.
|
||||
|
||||
Creates a new entity in the knowledge graph and adds it to the vector database.
|
||||
Also synchronizes entity_chunks_storage to track chunk references.
|
||||
|
||||
Args:
|
||||
chunk_entity_relation_graph: Graph storage instance
|
||||
|
|
@ -755,6 +820,8 @@ async def acreate_entity(
|
|||
relationships_vdb: Vector database storage for relationships
|
||||
entity_name: Name of the new entity
|
||||
entity_data: Dictionary containing entity attributes, e.g. {"description": "description", "entity_type": "type"}
|
||||
entity_chunks_storage: Optional KV storage for tracking chunks that reference this entity
|
||||
relation_chunks_storage: Optional KV storage for tracking chunks that reference relations
|
||||
|
||||
Returns:
|
||||
Dictionary containing created entity information
|
||||
|
|
@ -805,12 +872,34 @@ async def acreate_entity(
|
|||
# Update vector database
|
||||
await entities_vdb.upsert(entity_data_for_vdb)
|
||||
|
||||
# Update entity_chunks_storage to track chunk references
|
||||
if entity_chunks_storage is not None:
|
||||
source_id = node_data.get("source_id", "")
|
||||
chunk_ids = [cid for cid in source_id.split(GRAPH_FIELD_SEP) if cid]
|
||||
|
||||
if chunk_ids:
|
||||
await entity_chunks_storage.upsert(
|
||||
{
|
||||
entity_name: {
|
||||
"chunk_ids": chunk_ids,
|
||||
"count": len(chunk_ids),
|
||||
}
|
||||
}
|
||||
)
|
||||
logger.info(
|
||||
f"Entity Create: tracked {len(chunk_ids)} chunks for `{entity_name}`"
|
||||
)
|
||||
|
||||
# Save changes
|
||||
await _edit_entity_done(
|
||||
entities_vdb, relationships_vdb, chunk_entity_relation_graph
|
||||
entities_vdb,
|
||||
relationships_vdb,
|
||||
chunk_entity_relation_graph,
|
||||
entity_chunks_storage,
|
||||
relation_chunks_storage,
|
||||
)
|
||||
|
||||
logger.info(f"Entity '{entity_name}' successfully created")
|
||||
logger.info(f"Entity Create: '{entity_name}' successfully created")
|
||||
return await get_entity_info(
|
||||
chunk_entity_relation_graph,
|
||||
entities_vdb,
|
||||
|
|
@ -829,10 +918,12 @@ async def acreate_relation(
|
|||
source_entity: str,
|
||||
target_entity: str,
|
||||
relation_data: dict[str, Any],
|
||||
relation_chunks_storage=None,
|
||||
) -> dict[str, Any]:
|
||||
"""Asynchronously create a new relation between entities.
|
||||
|
||||
Creates a new relation (edge) in the knowledge graph and adds it to the vector database.
|
||||
Also synchronizes relation_chunks_storage to track chunk references.
|
||||
|
||||
Args:
|
||||
chunk_entity_relation_graph: Graph storage instance
|
||||
|
|
@ -841,6 +932,7 @@ async def acreate_relation(
|
|||
source_entity: Name of the source entity
|
||||
target_entity: Name of the target entity
|
||||
relation_data: Dictionary containing relation attributes, e.g. {"description": "description", "keywords": "keywords"}
|
||||
relation_chunks_storage: Optional KV storage for tracking chunks that reference this relation
|
||||
|
||||
Returns:
|
||||
Dictionary containing created relation information
|
||||
|
|
@ -917,11 +1009,37 @@ async def acreate_relation(
|
|||
# Update vector database
|
||||
await relationships_vdb.upsert(relation_data_for_vdb)
|
||||
|
||||
# Update relation_chunks_storage to track chunk references
|
||||
if relation_chunks_storage is not None:
|
||||
from .utils import make_relation_chunk_key
|
||||
|
||||
# Normalize entity order for consistent key generation
|
||||
normalized_src, normalized_tgt = sorted([source_entity, target_entity])
|
||||
storage_key = make_relation_chunk_key(normalized_src, normalized_tgt)
|
||||
|
||||
source_id = edge_data.get("source_id", "")
|
||||
chunk_ids = [cid for cid in source_id.split(GRAPH_FIELD_SEP) if cid]
|
||||
|
||||
if chunk_ids:
|
||||
await relation_chunks_storage.upsert(
|
||||
{
|
||||
storage_key: {
|
||||
"chunk_ids": chunk_ids,
|
||||
"count": len(chunk_ids),
|
||||
}
|
||||
}
|
||||
)
|
||||
logger.info(
|
||||
f"Relation Create: tracked {len(chunk_ids)} chunks for `{source_entity}`~`{target_entity}`"
|
||||
)
|
||||
|
||||
# Save changes
|
||||
await _edit_relation_done(relationships_vdb, chunk_entity_relation_graph)
|
||||
await _edit_relation_done(
|
||||
relationships_vdb, chunk_entity_relation_graph, relation_chunks_storage
|
||||
)
|
||||
|
||||
logger.info(
|
||||
f"Relation from '{source_entity}' to '{target_entity}' successfully created"
|
||||
f"Relation Create: `{source_entity}`~`{target_entity}` successfully created"
|
||||
)
|
||||
return await get_relation_info(
|
||||
chunk_entity_relation_graph,
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue