Add chunk tracking support to graph entity/relation operations

- Pass chunk storage to graph ops
- Update chunks on entity renames
- Track chunks in create operations
- Merge chunk data in entity merges
- Clean up orphaned chunk entries
This commit is contained in:
yangdx 2025-10-26 01:20:13 +08:00
parent 11f1f3664b
commit bf6cf54765
2 changed files with 239 additions and 3 deletions

View file

@ -3600,6 +3600,8 @@ class LightRAG:
entity_name,
updated_data,
allow_rename,
entity_chunks_storage=self.entity_chunks,
relation_chunks_storage=self.relation_chunks,
)
def edit_entity(
@ -3634,6 +3636,7 @@ class LightRAG:
source_entity,
target_entity,
updated_data,
relation_chunks_storage=self.relation_chunks,
)
def edit_relation(
@ -3666,6 +3669,7 @@ class LightRAG:
self.relationships_vdb,
entity_name,
entity_data,
entity_chunks_storage=self.entity_chunks,
)
def create_entity(
@ -3698,6 +3702,7 @@ class LightRAG:
source_entity,
target_entity,
relation_data,
relation_chunks_storage=self.relation_chunks,
)
def create_relation(
@ -3745,6 +3750,8 @@ class LightRAG:
target_entity,
merge_strategy,
target_entity_data,
entity_chunks_storage=self.entity_chunks,
relation_chunks_storage=self.relation_chunks,
)
def merge_entities(

View file

@ -7,7 +7,7 @@ from typing import Any, cast
from .base import DeletionResult
from .kg.shared_storage import get_graph_db_lock
from .constants import GRAPH_FIELD_SEP
from .utils import compute_mdhash_id, logger
from .utils import compute_mdhash_id, logger, make_relation_chunk_key
from .base import StorageNameSpace
@ -167,6 +167,8 @@ async def aedit_entity(
entity_name: str,
updated_data: dict[str, str],
allow_rename: bool = True,
entity_chunks_storage=None,
relation_chunks_storage=None,
) -> dict[str, Any]:
"""Asynchronously edit entity information.
@ -179,6 +181,8 @@ async def aedit_entity(
entity_name: Name of the entity to edit
updated_data: Dictionary containing updated attributes, e.g. {"description": "new description", "entity_type": "new type"}
allow_rename: Whether to allow entity renaming, defaults to True
entity_chunks_storage: Optional KV storage for entity chunk tracking
relation_chunks_storage: Optional KV storage for relation chunk tracking
Returns:
Dictionary containing updated entity information
@ -273,12 +277,69 @@ async def aedit_entity(
f"Deleted old entity '{entity_name}' and its vector embedding from database"
)
# Migrate entity_chunks data from old entity to new entity
if entity_chunks_storage:
old_chunks_data = await entity_chunks_storage.get_by_id(entity_name)
if old_chunks_data:
# Create chunk tracking for new entity
await entity_chunks_storage.upsert(
{new_entity_name: old_chunks_data}
)
# Delete old entity's chunk tracking
await entity_chunks_storage.delete([entity_name])
logger.info(
f"Migrated entity_chunks data from '{entity_name}' to '{new_entity_name}'"
)
# Delete old relation records from vector database
await relationships_vdb.delete(relations_to_delete)
logger.info(
f"Deleted {len(relations_to_delete)} relation records for entity '{entity_name}' from vector database"
)
# Migrate relation_chunks data for renamed entity relationships
if relation_chunks_storage and edges:
old_relation_keys_to_delete = []
new_relation_chunks_data = {}
for source, target in edges:
# Generate old relation key
old_key = make_relation_chunk_key(source, target)
# Get old relation chunks data
old_chunks_data = await relation_chunks_storage.get_by_id(
old_key
)
if old_chunks_data:
# Determine new source and target
new_src = (
new_entity_name if source == entity_name else source
)
new_tgt = (
new_entity_name if target == entity_name else target
)
# Generate new relation key
new_key = make_relation_chunk_key(new_src, new_tgt)
# Only update if key changed
if new_key != old_key:
# Store data for new key
new_relation_chunks_data[new_key] = old_chunks_data
# Mark old key for deletion
old_relation_keys_to_delete.append(old_key)
# Apply relation_chunks updates
if new_relation_chunks_data:
await relation_chunks_storage.upsert(new_relation_chunks_data)
if old_relation_keys_to_delete:
await relation_chunks_storage.delete(
old_relation_keys_to_delete
)
logger.info(
f"Migrated {len(old_relation_keys_to_delete)} relation_chunks entries for renamed entity"
)
# Update relationship vector representations
for src, tgt, edge_data in relations_to_update:
description = edge_data.get("description", "")
@ -339,6 +400,24 @@ async def aedit_entity(
# Update vector database
await entities_vdb.upsert(entity_data)
# 3.5. Update entity_chunks if source_id was updated
if entity_chunks_storage and "source_id" in updated_data:
source_ids = source_id.split(GRAPH_FIELD_SEP) if source_id else []
chunk_ids = [cid for cid in source_ids if cid]
if chunk_ids:
await entity_chunks_storage.upsert(
{
entity_name: {
"chunk_ids": chunk_ids,
"count": len(chunk_ids),
"updated_at": int(time.time()),
}
}
)
logger.info(
f"Updated entity_chunks for '{entity_name}' with {len(chunk_ids)} chunk IDs"
)
# 4. Save changes
await _edit_entity_done(
entities_vdb, relationships_vdb, chunk_entity_relation_graph
@ -379,6 +458,7 @@ async def aedit_relation(
source_entity: str,
target_entity: str,
updated_data: dict[str, Any],
relation_chunks_storage=None,
) -> dict[str, Any]:
"""Asynchronously edit relation information.
@ -391,6 +471,7 @@ async def aedit_relation(
source_entity: Name of the source entity
target_entity: Name of the target entity
updated_data: Dictionary containing updated attributes, e.g. {"description": "new description", "keywords": "new keywords"}
relation_chunks_storage: Optional KV storage for relation chunk tracking
Returns:
Dictionary containing updated relation information
@ -455,6 +536,25 @@ async def aedit_relation(
# Update vector database
await relationships_vdb.upsert(relation_data)
# 3.5. Update relation_chunks if source_id was updated
if relation_chunks_storage and "source_id" in updated_data:
source_ids = source_id.split(GRAPH_FIELD_SEP) if source_id else []
chunk_ids = [cid for cid in source_ids if cid]
if chunk_ids:
storage_key = make_relation_chunk_key(source_entity, target_entity)
await relation_chunks_storage.upsert(
{
storage_key: {
"chunk_ids": chunk_ids,
"count": len(chunk_ids),
"updated_at": int(time.time()),
}
}
)
logger.info(
f"Updated relation_chunks for '{source_entity}' -> '{target_entity}' with {len(chunk_ids)} chunk IDs"
)
# 4. Save changes
await _edit_relation_done(relationships_vdb, chunk_entity_relation_graph)
@ -494,6 +594,7 @@ async def acreate_entity(
relationships_vdb,
entity_name: str,
entity_data: dict[str, Any],
entity_chunks_storage=None,
) -> dict[str, Any]:
"""Asynchronously create a new entity.
@ -505,6 +606,7 @@ async def acreate_entity(
relationships_vdb: Vector database storage for relationships
entity_name: Name of the new entity
entity_data: Dictionary containing entity attributes, e.g. {"description": "description", "entity_type": "type"}
entity_chunks_storage: Optional KV storage for entity chunk tracking
Returns:
Dictionary containing created entity information
@ -555,6 +657,24 @@ async def acreate_entity(
# Update vector database
await entities_vdb.upsert(entity_data_for_vdb)
# Create entity_chunks entry if source_id is provided
if entity_chunks_storage and source_id and source_id != "manual_creation":
source_ids = source_id.split(GRAPH_FIELD_SEP) if source_id else []
chunk_ids = [cid for cid in source_ids if cid]
if chunk_ids:
await entity_chunks_storage.upsert(
{
entity_name: {
"chunk_ids": chunk_ids,
"count": len(chunk_ids),
"created_at": int(time.time()),
}
}
)
logger.info(
f"Created entity_chunks for '{entity_name}' with {len(chunk_ids)} chunk IDs"
)
# Save changes
await _edit_entity_done(
entities_vdb, relationships_vdb, chunk_entity_relation_graph
@ -579,6 +699,7 @@ async def acreate_relation(
source_entity: str,
target_entity: str,
relation_data: dict[str, Any],
relation_chunks_storage=None,
) -> dict[str, Any]:
"""Asynchronously create a new relation between entities.
@ -591,6 +712,7 @@ async def acreate_relation(
source_entity: Name of the source entity
target_entity: Name of the target entity
relation_data: Dictionary containing relation attributes, e.g. {"description": "description", "keywords": "keywords"}
relation_chunks_storage: Optional KV storage for relation chunk tracking
Returns:
Dictionary containing created relation information
@ -663,6 +785,25 @@ async def acreate_relation(
# Update vector database
await relationships_vdb.upsert(relation_data_for_vdb)
# Create relation_chunks entry if source_id is provided
if relation_chunks_storage and source_id and source_id != "manual_creation":
source_ids = source_id.split(GRAPH_FIELD_SEP) if source_id else []
chunk_ids = [cid for cid in source_ids if cid]
if chunk_ids:
storage_key = make_relation_chunk_key(source_entity, target_entity)
await relation_chunks_storage.upsert(
{
storage_key: {
"chunk_ids": chunk_ids,
"count": len(chunk_ids),
"created_at": int(time.time()),
}
}
)
logger.info(
f"Created relation_chunks for '{source_entity}' -> '{target_entity}' with {len(chunk_ids)} chunk IDs"
)
# Save changes
await _edit_relation_done(relationships_vdb, chunk_entity_relation_graph)
@ -691,6 +832,8 @@ async def amerge_entities(
target_entity: str,
merge_strategy: dict[str, str] = None,
target_entity_data: dict[str, Any] = None,
entity_chunks_storage=None,
relation_chunks_storage=None,
) -> dict[str, Any]:
"""Asynchronously merge multiple entities into one entity.
@ -897,7 +1040,89 @@ async def amerge_entities(
await relationships_vdb.upsert(relation_data_for_vdb)
# 9. Delete source entities
# 9. Merge and update chunk storage
# Merge entity_chunks from all source entities
if entity_chunks_storage:
all_chunk_ids = set()
for entity_name in source_entities:
if entity_name == target_entity:
continue
chunks_data = await entity_chunks_storage.get_by_id(entity_name)
if chunks_data and isinstance(chunks_data, dict):
chunk_list = chunks_data.get("chunk_ids", [])
all_chunk_ids.update(chunk_list)
# Get target entity's existing chunks if it exists
if target_exists:
target_chunks_data = await entity_chunks_storage.get_by_id(
target_entity
)
if target_chunks_data and isinstance(target_chunks_data, dict):
chunk_list = target_chunks_data.get("chunk_ids", [])
all_chunk_ids.update(chunk_list)
if all_chunk_ids:
await entity_chunks_storage.upsert(
{
target_entity: {
"chunk_ids": list(all_chunk_ids),
"count": len(all_chunk_ids),
"updated_at": int(time.time()),
}
}
)
logger.info(
f"Merged entity_chunks for '{target_entity}' with {len(all_chunk_ids)} chunk IDs"
)
# Update relation_chunks for merged relationships
if relation_chunks_storage and all_relations:
old_relation_keys_to_delete = []
new_relation_chunks_data = {}
for src, tgt, edge_data in all_relations:
new_src = target_entity if src in source_entities else src
new_tgt = target_entity if tgt in source_entities else tgt
# Skip self-loops
if new_src == new_tgt:
continue
old_key = make_relation_chunk_key(src, tgt)
new_key = make_relation_chunk_key(new_src, new_tgt)
if old_key != new_key:
old_chunks_data = await relation_chunks_storage.get_by_id(
old_key
)
if old_chunks_data:
# Check if new key already has data (merge case)
if new_key in new_relation_chunks_data:
existing_chunks = set(
new_relation_chunks_data[new_key].get(
"chunk_ids", []
)
)
new_chunks = set(old_chunks_data.get("chunk_ids", []))
merged_chunks = existing_chunks.union(new_chunks)
new_relation_chunks_data[new_key] = {
"chunk_ids": list(merged_chunks),
"count": len(merged_chunks),
"updated_at": int(time.time()),
}
else:
new_relation_chunks_data[new_key] = old_chunks_data
old_relation_keys_to_delete.append(old_key)
if new_relation_chunks_data:
await relation_chunks_storage.upsert(new_relation_chunks_data)
if old_relation_keys_to_delete:
await relation_chunks_storage.delete(old_relation_keys_to_delete)
logger.info(
f"Updated {len(new_relation_chunks_data)} relation_chunks entries for merged entities"
)
# 10. Delete source entities
for entity_name in source_entities:
if entity_name == target_entity:
logger.info(
@ -916,7 +1141,11 @@ async def amerge_entities(
f"Deleted source entity '{entity_name}' and its vector embedding from database"
)
# 10. Save changes
# Delete source entity's chunk tracking
if entity_chunks_storage:
await entity_chunks_storage.delete([entity_name])
# 11. Save changes
await _merge_entities_done(
entities_vdb, relationships_vdb, chunk_entity_relation_graph
)