Enhance entity/relation editing with chunk tracking synchronization

• Add chunk storage sync to edit ops
• Implement incremental chunk ID updates
• Support entity renaming migrations
• Normalize relation keys consistently
• Preserve chunk references on edits
yangdx 2025-10-26 14:34:56 +08:00
parent 11f1f3664b
commit 3fbd704bf9
3 changed files with 325 additions and 28 deletions
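At a glance, the change threads two optional chunk-tracking KV stores through the entity and relation edit paths. A minimal caller-side sketch, assuming an already-initialized LightRAG instance named rag (the entity names and updated_data values are illustrative, not taken from this commit):

async def demo(rag):
    # Public signatures are unchanged; per the hunks below, the wrappers now
    # forward rag.entity_chunks and rag.relation_chunks internally so the
    # chunk-tracking stores stay in sync with each graph edit.
    await rag.aedit_entity(
        "Tesla Inc",  # illustrative entity name
        {"description": "Electric vehicle maker", "entity_type": "organization"},
        allow_rename=True,
    )
    # Relation edits likewise forward rag.relation_chunks.
    await rag.aedit_relation(
        "Tesla Inc", "Elon Musk", {"keywords": "founded by"}
    )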


@@ -3582,6 +3582,7 @@ class LightRAG:
"""Asynchronously edit entity information.
Updates entity information in the knowledge graph and re-embeds the entity in the vector database.
Also synchronizes entity_chunks_storage and relation_chunks_storage to track chunk references.
Args:
entity_name: Name of the entity to edit
@@ -3600,6 +3601,8 @@ class LightRAG:
entity_name,
updated_data,
allow_rename,
self.entity_chunks,
self.relation_chunks,
)
def edit_entity(
@@ -3616,6 +3619,7 @@ class LightRAG:
"""Asynchronously edit relation information.
Updates relation (edge) information in the knowledge graph and re-embeds the relation in the vector database.
Also synchronizes the relation_chunks_storage to track which chunks reference this relation.
Args:
source_entity: Name of the source entity
@@ -3634,6 +3638,7 @@ class LightRAG:
source_entity,
target_entity,
updated_data,
self.relation_chunks,
)
def edit_relation(


@@ -2551,6 +2551,52 @@ def apply_source_ids_limit(
return truncated
def compute_incremental_chunk_ids(
existing_full_chunk_ids: list[str],
old_chunk_ids: list[str],
new_chunk_ids: list[str],
) -> list[str]:
"""
Compute incrementally updated chunk IDs based on changes.
This function applies delta changes (additions and removals) to an existing
list of chunk IDs while maintaining order and ensuring deduplication.
Delta additions from new_chunk_ids are placed at the end.
Args:
existing_full_chunk_ids: Complete list of existing chunk IDs from storage
old_chunk_ids: Previous chunk IDs from source_id (chunks being replaced)
new_chunk_ids: New chunk IDs from updated source_id (chunks being added)
Returns:
Updated list of chunk IDs with deduplication
Example:
>>> existing = ['chunk-1', 'chunk-2', 'chunk-3']
>>> old = ['chunk-1', 'chunk-2']
>>> new = ['chunk-2', 'chunk-4']
>>> compute_incremental_chunk_ids(existing, old, new)
['chunk-2', 'chunk-3', 'chunk-4']
"""
# Calculate changes
chunks_to_remove = set(old_chunk_ids) - set(new_chunk_ids)
chunks_to_add = set(new_chunk_ids) - set(old_chunk_ids)
# Apply changes to full chunk_ids
# Step 1: Remove chunks that are no longer needed
updated_chunk_ids = [
cid for cid in existing_full_chunk_ids if cid not in chunks_to_remove
]
# Step 2: Add new chunks (preserving order from new_chunk_ids)
# Note: 'cid not in updated_chunk_ids' check ensures deduplication
for cid in new_chunk_ids:
if cid in chunks_to_add and cid not in updated_chunk_ids:
updated_chunk_ids.append(cid)
return updated_chunk_ids
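To make the delta semantics concrete, here is a standalone trace of the docstring example; the body is reproduced from the hunk above so the snippet runs on its own:

def compute_incremental_chunk_ids(existing_full_chunk_ids, old_chunk_ids, new_chunk_ids):
    chunks_to_remove = set(old_chunk_ids) - set(new_chunk_ids)  # {'chunk-1'}
    chunks_to_add = set(new_chunk_ids) - set(old_chunk_ids)     # {'chunk-4'}
    # Survivors keep their storage order; additions are appended in new_chunk_ids order
    updated = [cid for cid in existing_full_chunk_ids if cid not in chunks_to_remove]
    for cid in new_chunk_ids:
        if cid in chunks_to_add and cid not in updated:
            updated.append(cid)
    return updated

print(compute_incremental_chunk_ids(
    ["chunk-1", "chunk-2", "chunk-3"],  # full list currently in storage
    ["chunk-1", "chunk-2"],             # chunks the old source_id referenced
    ["chunk-2", "chunk-4"],             # chunks the new source_id references
))
# ['chunk-2', 'chunk-3', 'chunk-4']: chunk-1 dropped, chunk-3 preserved, chunk-4 appended

Note that chunk-2 survives untouched because it appears in both the old and new source_id, so it lands in neither delta set.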
def subtract_source_ids(
source_ids: Iterable[str],
ids_to_remove: Collection[str],


@@ -167,10 +167,13 @@ async def aedit_entity(
entity_name: str,
updated_data: dict[str, str],
allow_rename: bool = True,
entity_chunks_storage=None,
relation_chunks_storage=None,
) -> dict[str, Any]:
"""Asynchronously edit entity information.
Updates entity information in the knowledge graph and re-embeds the entity in the vector database.
Also synchronizes entity_chunks_storage and relation_chunks_storage to track chunk references.
Args:
chunk_entity_relation_graph: Graph storage instance
@@ -179,6 +182,8 @@ async def aedit_entity(
entity_name: Name of the entity to edit
updated_data: Dictionary containing updated attributes, e.g. {"description": "new description", "entity_type": "new type"}
allow_rename: Whether to allow entity renaming, defaults to True
entity_chunks_storage: Optional KV storage for tracking chunks that reference this entity
relation_chunks_storage: Optional KV storage for tracking chunks that reference relations
Returns:
Dictionary containing updated entity information
@@ -187,6 +192,9 @@ async def aedit_entity(
# Use graph database lock to ensure atomic graph and vector db operations
async with graph_db_lock:
try:
# Save original entity name for chunk tracking updates
original_entity_name = entity_name
# 1. Get current entity information
node_exists = await chunk_entity_relation_graph.has_node(entity_name)
if not node_exists:
@@ -223,7 +231,9 @@ async def aedit_entity(
# If renaming entity
if is_renaming:
logger.info(f"Renaming entity '{entity_name}' to '{new_entity_name}'")
logger.info(
f"Entity Edit: renaming `{entity_name}` to `{new_entity_name}`"
)
# Create new entity
await chunk_entity_relation_graph.upsert_node(
@@ -269,35 +279,36 @@ async def aedit_entity(
# Delete old entity record from vector database
old_entity_id = compute_mdhash_id(entity_name, prefix="ent-")
await entities_vdb.delete([old_entity_id])
logger.info(
f"Deleted old entity '{entity_name}' and its vector embedding from database"
)
# Delete old relation records from vector database
await relationships_vdb.delete(relations_to_delete)
logger.info(
f"Deleted {len(relations_to_delete)} relation records for entity '{entity_name}' from vector database"
)
# Update relationship vector representations
for src, tgt, edge_data in relations_to_update:
# Normalize entity order for consistent vector ID generation
normalized_src, normalized_tgt = sorted([src, tgt])
description = edge_data.get("description", "")
keywords = edge_data.get("keywords", "")
source_id = edge_data.get("source_id", "")
weight = float(edge_data.get("weight", 1.0))
# Create new content for embedding
content = f"{src}\t{tgt}\n{keywords}\n{description}"
# Create content using normalized order
content = (
f"{normalized_src}\t{normalized_tgt}\n{keywords}\n{description}"
)
# Calculate relationship ID
relation_id = compute_mdhash_id(src + tgt, prefix="rel-")
# Calculate relationship ID using normalized order
relation_id = compute_mdhash_id(
normalized_src + normalized_tgt, prefix="rel-"
)
# Prepare data for vector database update
relation_data = {
relation_id: {
"content": content,
"src_id": src,
"tgt_id": tgt,
"src_id": normalized_src,
"tgt_id": normalized_tgt,
"source_id": source_id,
"description": description,
"keywords": keywords,
@@ -310,6 +321,7 @@ async def aedit_entity(
# Update working entity name to new name
entity_name = new_entity_name
else:
# If not renaming, directly update node data
await chunk_entity_relation_graph.upsert_node(
@@ -339,9 +351,155 @@ async def aedit_entity(
# Update vector database
await entities_vdb.upsert(entity_data)
# 4. Save changes
# 4. Update chunk tracking storages
if entity_chunks_storage is not None or relation_chunks_storage is not None:
from .utils import (
make_relation_chunk_key,
compute_incremental_chunk_ids,
)
# 4.1 Handle entity chunk tracking
if entity_chunks_storage is not None:
# Get storage key (use original name for renaming scenario)
storage_key = original_entity_name if is_renaming else entity_name
stored_data = await entity_chunks_storage.get_by_id(storage_key)
has_stored_data = (
stored_data
and isinstance(stored_data, dict)
and stored_data.get("chunk_ids")
)
# Get old and new source_id
old_source_id = node_data.get("source_id", "")
old_chunk_ids = [
cid for cid in old_source_id.split(GRAPH_FIELD_SEP) if cid
]
new_source_id = new_node_data.get("source_id", "")
new_chunk_ids = [
cid for cid in new_source_id.split(GRAPH_FIELD_SEP) if cid
]
source_id_changed = set(new_chunk_ids) != set(old_chunk_ids)
# Update if: source_id changed OR storage has no data
if source_id_changed or not has_stored_data:
# Get existing full chunk_ids from storage
existing_full_chunk_ids = []
if has_stored_data:
existing_full_chunk_ids = [
cid for cid in stored_data.get("chunk_ids", []) if cid
]
# If no stored data exists, use old source_id as baseline
if not existing_full_chunk_ids:
existing_full_chunk_ids = old_chunk_ids.copy()
# Use utility function to compute incremental updates
updated_chunk_ids = compute_incremental_chunk_ids(
existing_full_chunk_ids, old_chunk_ids, new_chunk_ids
)
# Update storage (even if updated_chunk_ids is empty)
if is_renaming:
# Renaming: delete old + create new
await entity_chunks_storage.delete([original_entity_name])
await entity_chunks_storage.upsert(
{
entity_name: {
"chunk_ids": updated_chunk_ids,
"count": len(updated_chunk_ids),
}
}
)
else:
# Non-renaming: direct update
await entity_chunks_storage.upsert(
{
entity_name: {
"chunk_ids": updated_chunk_ids,
"count": len(updated_chunk_ids),
}
}
)
logger.info(
f"Entity Edit: find {len(updated_chunk_ids)} chunks related to `{entity_name}`"
)
# 4.2 Handle relation chunk tracking if entity was renamed
if (
is_renaming
and relation_chunks_storage is not None
and relations_to_update
):
for src, tgt, edge_data in relations_to_update:
# Determine old entity pair (before rename)
old_src = original_entity_name if src == entity_name else src
old_tgt = original_entity_name if tgt == entity_name else tgt
# Normalize entity order for both old and new keys
old_normalized_src, old_normalized_tgt = sorted(
[old_src, old_tgt]
)
new_normalized_src, new_normalized_tgt = sorted([src, tgt])
# Generate storage keys
old_storage_key = make_relation_chunk_key(
old_normalized_src, old_normalized_tgt
)
new_storage_key = make_relation_chunk_key(
new_normalized_src, new_normalized_tgt
)
# If keys are different, we need to move the chunk tracking
if old_storage_key != new_storage_key:
# Get complete chunk IDs from storage first (preserves all existing references)
old_stored_data = await relation_chunks_storage.get_by_id(
old_storage_key
)
relation_chunk_ids = []
if old_stored_data and isinstance(old_stored_data, dict):
# Use complete chunk_ids from storage
relation_chunk_ids = [
cid
for cid in old_stored_data.get("chunk_ids", [])
if cid
]
else:
# Fallback: if storage has no data, use graph's source_id
relation_source_id = edge_data.get("source_id", "")
relation_chunk_ids = [
cid
for cid in relation_source_id.split(GRAPH_FIELD_SEP)
if cid
]
# Delete old relation chunk tracking
await relation_chunks_storage.delete([old_storage_key])
# Create new relation chunk tracking (migrate complete data)
if relation_chunk_ids:
await relation_chunks_storage.upsert(
{
new_storage_key: {
"chunk_ids": relation_chunk_ids,
"count": len(relation_chunk_ids),
}
}
)
logger.info(
f"Entity Edit: migrate {len(relations_to_update)} relations after rename"
)
# 5. Save changes
await _edit_entity_done(
entities_vdb, relationships_vdb, chunk_entity_relation_graph
entities_vdb,
relationships_vdb,
chunk_entity_relation_graph,
entity_chunks_storage,
relation_chunks_storage,
)
logger.info(f"Entity '{entity_name}' successfully updated")
@@ -357,17 +515,23 @@ async def aedit_entity(
async def _edit_entity_done(
entities_vdb, relationships_vdb, chunk_entity_relation_graph
entities_vdb,
relationships_vdb,
chunk_entity_relation_graph,
entity_chunks_storage=None,
relation_chunks_storage=None,
) -> None:
"""Callback after entity editing is complete, ensures updates are persisted"""
storages = [entities_vdb, relationships_vdb, chunk_entity_relation_graph]
if entity_chunks_storage is not None:
storages.append(entity_chunks_storage)
if relation_chunks_storage is not None:
storages.append(relation_chunks_storage)
await asyncio.gather(
*[
cast(StorageNameSpace, storage_inst).index_done_callback()
for storage_inst in [ # type: ignore
entities_vdb,
relationships_vdb,
chunk_entity_relation_graph,
]
for storage_inst in storages # type: ignore
]
)
@@ -379,10 +543,12 @@ async def aedit_relation(
source_entity: str,
target_entity: str,
updated_data: dict[str, Any],
relation_chunks_storage=None,
) -> dict[str, Any]:
"""Asynchronously edit relation information.
Updates relation (edge) information in the knowledge graph and re-embeds the relation in the vector database.
Also synchronizes the relation_chunks_storage to track which chunks reference this relation.
Args:
chunk_entity_relation_graph: Graph storage instance
@@ -391,6 +557,7 @@ async def aedit_relation(
source_entity: Name of the source entity
target_entity: Name of the target entity
updated_data: Dictionary containing updated attributes, e.g. {"description": "new description", "keywords": "new keywords"}
relation_chunks_storage: Optional KV storage for tracking chunks that reference this relation
Returns:
Dictionary containing updated relation information
@@ -399,6 +566,10 @@ async def aedit_relation(
# Use graph database lock to ensure atomic graph and vector db operations
async with graph_db_lock:
try:
# Normalize entity order for undirected graph (ensures consistent key generation)
if source_entity > target_entity:
source_entity, target_entity = target_entity, source_entity
# 1. Get current relation information
edge_exists = await chunk_entity_relation_graph.has_edge(
source_entity, target_entity
@@ -455,8 +626,80 @@ async def aedit_relation(
# Update vector database
await relationships_vdb.upsert(relation_data)
# 4. Save changes
await _edit_relation_done(relationships_vdb, chunk_entity_relation_graph)
# 4. Update relation_chunks_storage in two scenarios:
# - source_id has changed (edit scenario)
# - relation_chunks_storage has no existing data (migration/initialization scenario)
if relation_chunks_storage is not None:
from .utils import (
make_relation_chunk_key,
compute_incremental_chunk_ids,
)
storage_key = make_relation_chunk_key(source_entity, target_entity)
# Check if storage has existing data
stored_data = await relation_chunks_storage.get_by_id(storage_key)
has_stored_data = (
stored_data
and isinstance(stored_data, dict)
and stored_data.get("chunk_ids")
)
# Get old and new source_id
old_source_id = edge_data.get("source_id", "")
old_chunk_ids = [
cid for cid in old_source_id.split(GRAPH_FIELD_SEP) if cid
]
new_source_id = new_edge_data.get("source_id", "")
new_chunk_ids = [
cid for cid in new_source_id.split(GRAPH_FIELD_SEP) if cid
]
source_id_changed = set(new_chunk_ids) != set(old_chunk_ids)
# Update if: source_id changed OR storage has no data
if source_id_changed or not has_stored_data:
# Get existing full chunk_ids from storage
existing_full_chunk_ids = []
if has_stored_data:
existing_full_chunk_ids = [
cid for cid in stored_data.get("chunk_ids", []) if cid
]
# If no stored data exists, use old source_id as baseline
if not existing_full_chunk_ids:
existing_full_chunk_ids = old_chunk_ids.copy()
# Use utility function to compute incremental updates
updated_chunk_ids = compute_incremental_chunk_ids(
existing_full_chunk_ids, old_chunk_ids, new_chunk_ids
)
# Update storage (even if updated_chunk_ids is empty)
await relation_chunks_storage.upsert(
{
storage_key: {
"chunk_ids": updated_chunk_ids,
"count": len(updated_chunk_ids),
}
}
)
reason = (
"source_id changed"
if source_id_changed
else "initialized from graph"
)
logger.info(
f"Updated relation_chunks_storage for '{source_entity}' -> '{target_entity}': "
f"{len(updated_chunk_ids)} chunks ({reason})"
)
# 5. Save changes
await _edit_relation_done(
relationships_vdb, chunk_entity_relation_graph, relation_chunks_storage
)
logger.info(
f"Relation from '{source_entity}' to '{target_entity}' successfully updated"
@@ -475,15 +718,18 @@ async def aedit_relation(
raise
async def _edit_relation_done(relationships_vdb, chunk_entity_relation_graph) -> None:
async def _edit_relation_done(
relationships_vdb, chunk_entity_relation_graph, relation_chunks_storage=None
) -> None:
"""Callback after relation editing is complete, ensures updates are persisted"""
storages = [relationships_vdb, chunk_entity_relation_graph]
if relation_chunks_storage is not None:
storages.append(relation_chunks_storage)
await asyncio.gather(
*[
cast(StorageNameSpace, storage_inst).index_done_callback()
for storage_inst in [ # type: ignore
relationships_vdb,
chunk_entity_relation_graph,
]
for storage_inst in storages # type: ignore
]
)
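
A closing note on key normalization: aedit_relation sorts the entity pair before any lookup, so the same undirected edge always maps to a single relation-chunk record, and a rename (step 4.2 above) moves that record to a freshly derived key. make_relation_chunk_key is imported from .utils but not shown in this diff; the sketch below assumes a tab-joined format purely for illustration:

# Assumed shape of the helper; the real make_relation_chunk_key lives in
# .utils and its exact format may differ. The invariant that matters is that
# a sorted pair yields the same key regardless of argument order.
def make_relation_chunk_key(src: str, tgt: str) -> str:
    return f"{src}\t{tgt}"  # separator is an assumption

def relation_key(a: str, b: str) -> str:
    src, tgt = sorted([a, b])  # same normalization as aedit_relation
    return make_relation_chunk_key(src, tgt)

# One record per undirected edge, in either argument order:
assert relation_key("Tesla Inc", "Elon Musk") == relation_key("Elon Musk", "Tesla Inc")

# After a rename, the derived key changes, so the stored chunk_ids must be
# migrated from the old key to the new one (the delete + upsert in step 4.2):
assert relation_key("Tesla", "Elon Musk") != relation_key("Tesla Inc", "Elon Musk")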