Merge pull request #2265 from danielaskdd/edit-kg-new

Refactor: Enhance KG Editing with Chunk Tracking
Daniel.y 2025-10-26 22:45:38 +08:00 committed by GitHub
commit 69b4cda242
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
4 changed files with 530 additions and 122 deletions

View file

@@ -299,6 +299,7 @@ def create_graph_routes(rag, api_key: Optional[str] = None):
entity_data (dict): Entity properties including:
- description (str): Textual description of the entity
- entity_type (str): Category/type of the entity (e.g., PERSON, ORGANIZATION, LOCATION)
- source_id (str): Related chunk_id from which the description originates
- Additional custom properties as needed
Response Schema:
@@ -309,6 +310,7 @@ def create_graph_routes(rag, api_key: Optional[str] = None):
"entity_name": "Tesla",
"description": "Electric vehicle manufacturer",
"entity_type": "ORGANIZATION",
"source_id": "chunk-123<SEP>chunk-456"
... (other entity properties)
}
}
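For illustration, a minimal sketch of such an entity_data payload, assuming <SEP> is the literal GRAPH_FIELD_SEP token used to join chunk IDs (field names follow the documentation above; the surrounding request shape depends on the route's request model):

entity_data = {
    "description": "Electric vehicle manufacturer",
    "entity_type": "ORGANIZATION",
    # Two source chunks, joined with the GRAPH_FIELD_SEP token
    "source_id": "chunk-123<SEP>chunk-456",
}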
@@ -361,10 +363,11 @@ def create_graph_routes(rag, api_key: Optional[str] = None):
"""
Create a new relationship between two entities in the knowledge graph
This endpoint establishes a directed relationship between two existing entities.
Both the source and target entities must already exist in the knowledge graph.
The system automatically generates vector embeddings for the relationship to
enable semantic search and graph traversal.
This endpoint establishes an undirected relationship between two existing entities.
The provided source/target order is accepted for convenience, but the edge is
stored undirected and may be returned with the entities swapped.
Both entities must already exist in the knowledge graph. The system automatically
generates vector embeddings for the relationship to enable semantic search and graph traversal.
Prerequisites:
- Both source_entity and target_entity must exist in the knowledge graph
@@ -376,6 +379,7 @@ def create_graph_routes(rag, api_key: Optional[str] = None):
relation_data (dict): Relationship properties including:
- description (str): Textual description of the relationship
- keywords (str): Comma-separated keywords describing the relationship type
- source_id (str): Related chunk_id from which the description originates
- weight (float): Relationship strength/importance (default: 1.0)
- Additional custom properties as needed
@@ -388,6 +392,7 @@ def create_graph_routes(rag, api_key: Optional[str] = None):
"tgt_id": "Tesla",
"description": "Elon Musk is the CEO of Tesla",
"keywords": "CEO, founder",
"source_id": "chunk-123<SEP>chunk-456"
"weight": 1.0,
... (other relationship properties)
}
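Because the stored edge is undirected, the backend normalizes the entity pair before computing the relation's vector ID. A minimal sketch of that rule, assuming compute_mdhash_id is an MD5 content hash with a prefix (the real helper lives in lightrag.utils and may differ):

from hashlib import md5

def relation_vector_id(src_id: str, tgt_id: str) -> str:
    # Sort the pair so (a, b) and (b, a) resolve to the same "rel-..." ID
    a, b = sorted([src_id, tgt_id])
    return "rel-" + md5((a + b).encode()).hexdigest()

assert relation_vector_id("Elon Musk", "Tesla") == relation_vector_id("Tesla", "Elon Musk")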

View file

@@ -3582,6 +3582,7 @@ class LightRAG:
"""Asynchronously edit entity information.
Updates entity information in the knowledge graph and re-embeds the entity in the vector database.
Also synchronizes entity_chunks_storage and relation_chunks_storage to track chunk references.
Args:
entity_name: Name of the entity to edit
@@ -3600,6 +3601,8 @@
entity_name,
updated_data,
allow_rename,
self.entity_chunks,
self.relation_chunks,
)
def edit_entity(
@@ -3616,6 +3619,7 @@
"""Asynchronously edit relation information.
Updates relation (edge) information in the knowledge graph and re-embeds the relation in the vector database.
Also synchronizes the relation_chunks_storage to track which chunks reference this relation.
Args:
source_entity: Name of the source entity
@@ -3634,6 +3638,7 @@
source_entity,
target_entity,
updated_data,
self.relation_chunks,
)
def edit_relation(
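A hedged usage sketch of the edit APIs above, assuming the async variants aedit_entity/aedit_relation are exposed on LightRAG as the docstrings indicate (the chunk-tracking storages are wired in internally via self.entity_chunks and self.relation_chunks):

# Hypothetical call sites; assumes an initialized LightRAG instance `rag`
await rag.aedit_entity(
    "Tesla",
    {
        "description": "EV and energy company",
        "source_id": "chunk-123<SEP>chunk-789",  # also refreshes chunk tracking
    },
)
await rag.aedit_relation(
    "Elon Musk",
    "Tesla",
    {"description": "CEO of Tesla", "keywords": "CEO"},
)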

View file

@@ -2551,6 +2551,52 @@ def apply_source_ids_limit(
return truncated
def compute_incremental_chunk_ids(
existing_full_chunk_ids: list[str],
old_chunk_ids: list[str],
new_chunk_ids: list[str],
) -> list[str]:
"""
Compute incrementally updated chunk IDs based on changes.
This function applies delta changes (additions and removals) to an existing
list of chunk IDs while maintaining order and ensuring deduplication.
Delta additions from new_chunk_ids are placed at the end.
Args:
existing_full_chunk_ids: Complete list of existing chunk IDs from storage
old_chunk_ids: Previous chunk IDs from source_id (chunks being replaced)
new_chunk_ids: New chunk IDs from updated source_id (chunks being added)
Returns:
Updated list of chunk IDs with deduplication
Example:
>>> existing = ['chunk-1', 'chunk-2', 'chunk-3']
>>> old = ['chunk-1', 'chunk-2']
>>> new = ['chunk-2', 'chunk-4']
>>> compute_incremental_chunk_ids(existing, old, new)
['chunk-2', 'chunk-3', 'chunk-4']
"""
# Calculate changes
chunks_to_remove = set(old_chunk_ids) - set(new_chunk_ids)
chunks_to_add = set(new_chunk_ids) - set(old_chunk_ids)
# Apply changes to full chunk_ids
# Step 1: Remove chunks that are no longer needed
updated_chunk_ids = [
cid for cid in existing_full_chunk_ids if cid not in chunks_to_remove
]
# Step 2: Add new chunks (preserving order from new_chunk_ids)
# Note: 'cid not in updated_chunk_ids' check ensures deduplication
for cid in new_chunk_ids:
if cid in chunks_to_add and cid not in updated_chunk_ids:
updated_chunk_ids.append(cid)
return updated_chunk_ids
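A second worked case showing the dedup guard: a chunk that is both newly added and already tracked is not appended twice.

>>> compute_incremental_chunk_ids(['chunk-1'], [], ['chunk-1', 'chunk-2'])
['chunk-1', 'chunk-2']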
def subtract_source_ids(
source_ids: Iterable[str],
ids_to_remove: Collection[str],
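The graph operations below repeatedly import make_relation_chunk_key from .utils; its body is not part of this diff. A plausible minimal sketch, assuming the key is simply the normalized (sorted) entity pair joined by a fixed separator (the real implementation may differ):

def make_relation_chunk_key(src_entity: str, tgt_entity: str) -> str:
    # Callers already sort the pair, but normalizing here as well keeps
    # the key stable for undirected relations; the separator is an assumption.
    a, b = sorted([src_entity, tgt_entity])
    return f"{a}<SEP>{b}"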

View file

@@ -11,16 +11,68 @@ from .utils import compute_mdhash_id, logger
from .base import StorageNameSpace
async def _persist_graph_updates(
entities_vdb=None,
relationships_vdb=None,
chunk_entity_relation_graph=None,
entity_chunks_storage=None,
relation_chunks_storage=None,
) -> None:
"""Unified callback to persist updates after graph operations.
Ensures all relevant storage instances are properly persisted after
operations like delete, edit, create, or merge.
Args:
entities_vdb: Entity vector database storage (optional)
relationships_vdb: Relationship vector database storage (optional)
chunk_entity_relation_graph: Graph storage instance (optional)
entity_chunks_storage: Entity-chunk tracking storage (optional)
relation_chunks_storage: Relation-chunk tracking storage (optional)
"""
storages = []
# Collect all non-None storage instances
if entities_vdb is not None:
storages.append(entities_vdb)
if relationships_vdb is not None:
storages.append(relationships_vdb)
if chunk_entity_relation_graph is not None:
storages.append(chunk_entity_relation_graph)
if entity_chunks_storage is not None:
storages.append(entity_chunks_storage)
if relation_chunks_storage is not None:
storages.append(relation_chunks_storage)
# Persist all storage instances in parallel
if storages:
await asyncio.gather(
*[
cast(StorageNameSpace, storage_inst).index_done_callback()
for storage_inst in storages # type: ignore
]
)
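This single helper replaces the per-operation persistence callbacks (_delete_by_entity_done, _delete_relation_done, _edit_entity_done, _edit_relation_done, _merge_entities_done) that are removed further down in this diff.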
async def adelete_by_entity(
chunk_entity_relation_graph, entities_vdb, relationships_vdb, entity_name: str
chunk_entity_relation_graph,
entities_vdb,
relationships_vdb,
entity_name: str,
entity_chunks_storage=None,
relation_chunks_storage=None,
) -> DeletionResult:
"""Asynchronously delete an entity and all its relationships.
Also cleans up entity_chunks_storage and relation_chunks_storage to remove chunk tracking.
Args:
chunk_entity_relation_graph: Graph storage instance
entities_vdb: Vector database storage for entities
relationships_vdb: Vector database storage for relationships
entity_name: Name of the entity to delete
entity_chunks_storage: Optional KV storage for tracking chunks that reference this entity
relation_chunks_storage: Optional KV storage for tracking chunks that reference relations
"""
graph_db_lock = get_graph_db_lock(enable_logging=False)
# Use graph database lock to ensure atomic graph and vector db operations
@@ -39,14 +91,45 @@ async def adelete_by_entity(
edges = await chunk_entity_relation_graph.get_node_edges(entity_name)
related_relations_count = len(edges) if edges else 0
# Clean up chunk tracking storages before deletion
if entity_chunks_storage is not None:
# Delete entity's entry from entity_chunks_storage
await entity_chunks_storage.delete([entity_name])
logger.info(
f"Entity Delete: removed chunk tracking for `{entity_name}`"
)
if relation_chunks_storage is not None and edges:
# Delete all related relationships from relation_chunks_storage
from .utils import make_relation_chunk_key
relation_keys_to_delete = []
for src, tgt in edges:
# Normalize entity order for consistent key generation
normalized_src, normalized_tgt = sorted([src, tgt])
storage_key = make_relation_chunk_key(
normalized_src, normalized_tgt
)
relation_keys_to_delete.append(storage_key)
if relation_keys_to_delete:
await relation_chunks_storage.delete(relation_keys_to_delete)
logger.info(
f"Entity Delete: removed chunk tracking for {len(relation_keys_to_delete)} relations"
)
await entities_vdb.delete_entity(entity_name)
await relationships_vdb.delete_entity_relation(entity_name)
await chunk_entity_relation_graph.delete_node(entity_name)
message = f"Entity '{entity_name}' and its {related_relations_count} relationships have been deleted."
message = f"Entity Delete: remove '{entity_name}' and its {related_relations_count} relations"
logger.info(message)
await _delete_by_entity_done(
entities_vdb, relationships_vdb, chunk_entity_relation_graph
await _persist_graph_updates(
entities_vdb=entities_vdb,
relationships_vdb=relationships_vdb,
chunk_entity_relation_graph=chunk_entity_relation_graph,
entity_chunks_storage=entity_chunks_storage,
relation_chunks_storage=relation_chunks_storage,
)
return DeletionResult(
status="success",
@@ -65,41 +148,33 @@ async def adelete_by_entity(
)
async def _delete_by_entity_done(
entities_vdb, relationships_vdb, chunk_entity_relation_graph
) -> None:
"""Callback after entity deletion is complete, ensures updates are persisted"""
await asyncio.gather(
*[
cast(StorageNameSpace, storage_inst).index_done_callback()
for storage_inst in [ # type: ignore
entities_vdb,
relationships_vdb,
chunk_entity_relation_graph,
]
]
)
async def adelete_by_relation(
chunk_entity_relation_graph,
relationships_vdb,
source_entity: str,
target_entity: str,
relation_chunks_storage=None,
) -> DeletionResult:
"""Asynchronously delete a relation between two entities.
Also cleans up relation_chunks_storage to remove chunk tracking.
Args:
chunk_entity_relation_graph: Graph storage instance
relationships_vdb: Vector database storage for relationships
source_entity: Name of the source entity
target_entity: Name of the target entity
relation_chunks_storage: Optional KV storage for tracking chunks that reference this relation
"""
relation_str = f"{source_entity} -> {target_entity}"
graph_db_lock = get_graph_db_lock(enable_logging=False)
# Use graph database lock to ensure atomic graph and vector db operations
async with graph_db_lock:
try:
# Normalize entity order for undirected graph (ensures consistent key generation)
if source_entity > target_entity:
source_entity, target_entity = target_entity, source_entity
# Check if the relation exists
edge_exists = await chunk_entity_relation_graph.has_edge(
source_entity, target_entity
@@ -114,6 +189,19 @@ async def adelete_by_relation(
status_code=404,
)
# Clean up chunk tracking storage before deletion
if relation_chunks_storage is not None:
from .utils import make_relation_chunk_key
# Normalize entity order for consistent key generation
normalized_src, normalized_tgt = sorted([source_entity, target_entity])
storage_key = make_relation_chunk_key(normalized_src, normalized_tgt)
await relation_chunks_storage.delete([storage_key])
logger.info(
f"Relation Delete: removed chunk tracking for `{source_entity}`~`{target_entity}`"
)
# Delete relation from vector database
rel_ids_to_delete = [
compute_mdhash_id(source_entity + target_entity, prefix="rel-"),
@@ -127,9 +215,13 @@ async def adelete_by_relation(
[(source_entity, target_entity)]
)
message = f"Successfully deleted relation from '{source_entity}' to '{target_entity}'"
message = f"Relation Delete: `{source_entity}`~`{target_entity}` deleted successfully"
logger.info(message)
await _delete_relation_done(relationships_vdb, chunk_entity_relation_graph)
await _persist_graph_updates(
relationships_vdb=relationships_vdb,
chunk_entity_relation_graph=chunk_entity_relation_graph,
relation_chunks_storage=relation_chunks_storage,
)
return DeletionResult(
status="success",
doc_id=relation_str,
@@ -147,19 +239,6 @@ async def adelete_by_relation(
)
async def _delete_relation_done(relationships_vdb, chunk_entity_relation_graph) -> None:
"""Callback after relation deletion is complete, ensures updates are persisted"""
await asyncio.gather(
*[
cast(StorageNameSpace, storage_inst).index_done_callback()
for storage_inst in [ # type: ignore
relationships_vdb,
chunk_entity_relation_graph,
]
]
)
async def aedit_entity(
chunk_entity_relation_graph,
entities_vdb,
@@ -167,10 +246,13 @@ async def aedit_entity(
entity_name: str,
updated_data: dict[str, str],
allow_rename: bool = True,
entity_chunks_storage=None,
relation_chunks_storage=None,
) -> dict[str, Any]:
"""Asynchronously edit entity information.
Updates entity information in the knowledge graph and re-embeds the entity in the vector database.
Also synchronizes entity_chunks_storage and relation_chunks_storage to track chunk references.
Args:
chunk_entity_relation_graph: Graph storage instance
@@ -179,6 +261,8 @@ async def aedit_entity(
entity_name: Name of the entity to edit
updated_data: Dictionary containing updated attributes, e.g. {"description": "new description", "entity_type": "new type"}
allow_rename: Whether to allow entity renaming, defaults to True
entity_chunks_storage: Optional KV storage for tracking chunks that reference this entity
relation_chunks_storage: Optional KV storage for tracking chunks that reference relations
Returns:
Dictionary containing updated entity information
@@ -187,6 +271,9 @@ async def aedit_entity(
# Use graph database lock to ensure atomic graph and vector db operations
async with graph_db_lock:
try:
# Save original entity name for chunk tracking updates
original_entity_name = entity_name
# 1. Get current entity information
node_exists = await chunk_entity_relation_graph.has_node(entity_name)
if not node_exists:
@@ -223,7 +310,9 @@ async def aedit_entity(
# If renaming entity
if is_renaming:
logger.info(f"Renaming entity '{entity_name}' to '{new_entity_name}'")
logger.info(
f"Entity Edit: renaming `{entity_name}` to `{new_entity_name}`"
)
# Create new entity
await chunk_entity_relation_graph.upsert_node(
@@ -269,35 +358,36 @@ async def aedit_entity(
# Delete old entity record from vector database
old_entity_id = compute_mdhash_id(entity_name, prefix="ent-")
await entities_vdb.delete([old_entity_id])
logger.info(
f"Deleted old entity '{entity_name}' and its vector embedding from database"
)
# Delete old relation records from vector database
await relationships_vdb.delete(relations_to_delete)
logger.info(
f"Deleted {len(relations_to_delete)} relation records for entity '{entity_name}' from vector database"
)
# Update relationship vector representations
for src, tgt, edge_data in relations_to_update:
# Normalize entity order for consistent vector ID generation
normalized_src, normalized_tgt = sorted([src, tgt])
description = edge_data.get("description", "")
keywords = edge_data.get("keywords", "")
source_id = edge_data.get("source_id", "")
weight = float(edge_data.get("weight", 1.0))
# Create new content for embedding
content = f"{src}\t{tgt}\n{keywords}\n{description}"
# Create content using normalized order
content = (
f"{normalized_src}\t{normalized_tgt}\n{keywords}\n{description}"
)
# Calculate relationship ID
relation_id = compute_mdhash_id(src + tgt, prefix="rel-")
# Calculate relationship ID using normalized order
relation_id = compute_mdhash_id(
normalized_src + normalized_tgt, prefix="rel-"
)
# Prepare data for vector database update
relation_data = {
relation_id: {
"content": content,
"src_id": src,
"tgt_id": tgt,
"src_id": normalized_src,
"tgt_id": normalized_tgt,
"source_id": source_id,
"description": description,
"keywords": keywords,
@@ -310,6 +400,7 @@ async def aedit_entity(
# Update working entity name to new name
entity_name = new_entity_name
else:
# If not renaming, directly update node data
await chunk_entity_relation_graph.upsert_node(
@@ -339,12 +430,158 @@ async def aedit_entity(
# Update vector database
await entities_vdb.upsert(entity_data)
# 4. Save changes
await _edit_entity_done(
entities_vdb, relationships_vdb, chunk_entity_relation_graph
# 4. Update chunk tracking storages
if entity_chunks_storage is not None or relation_chunks_storage is not None:
from .utils import (
make_relation_chunk_key,
compute_incremental_chunk_ids,
)
# 4.1 Handle entity chunk tracking
if entity_chunks_storage is not None:
# Get storage key (use original name for renaming scenario)
storage_key = original_entity_name if is_renaming else entity_name
stored_data = await entity_chunks_storage.get_by_id(storage_key)
has_stored_data = (
stored_data
and isinstance(stored_data, dict)
and stored_data.get("chunk_ids")
)
# Get old and new source_id
old_source_id = node_data.get("source_id", "")
old_chunk_ids = [
cid for cid in old_source_id.split(GRAPH_FIELD_SEP) if cid
]
new_source_id = new_node_data.get("source_id", "")
new_chunk_ids = [
cid for cid in new_source_id.split(GRAPH_FIELD_SEP) if cid
]
source_id_changed = set(new_chunk_ids) != set(old_chunk_ids)
# Update if: source_id changed OR storage has no data
if source_id_changed or not has_stored_data:
# Get existing full chunk_ids from storage
existing_full_chunk_ids = []
if has_stored_data:
existing_full_chunk_ids = [
cid for cid in stored_data.get("chunk_ids", []) if cid
]
# If no stored data exists, use old source_id as baseline
if not existing_full_chunk_ids:
existing_full_chunk_ids = old_chunk_ids.copy()
# Use utility function to compute incremental updates
updated_chunk_ids = compute_incremental_chunk_ids(
existing_full_chunk_ids, old_chunk_ids, new_chunk_ids
)
# Update storage (even if updated_chunk_ids is empty)
if is_renaming:
# Renaming: drop the entry stored under the old name first
await entity_chunks_storage.delete([original_entity_name])
# Upsert under the (possibly new) entity name
await entity_chunks_storage.upsert(
{
entity_name: {
"chunk_ids": updated_chunk_ids,
"count": len(updated_chunk_ids),
}
}
)
logger.info(
f"Entity Edit: find {len(updated_chunk_ids)} chunks related to `{entity_name}`"
)
# 4.2 Handle relation chunk tracking if entity was renamed
if (
is_renaming
and relation_chunks_storage is not None
and relations_to_update
):
for src, tgt, edge_data in relations_to_update:
# Determine old entity pair (before rename)
old_src = original_entity_name if src == entity_name else src
old_tgt = original_entity_name if tgt == entity_name else tgt
# Normalize entity order for both old and new keys
old_normalized_src, old_normalized_tgt = sorted(
[old_src, old_tgt]
)
new_normalized_src, new_normalized_tgt = sorted([src, tgt])
# Generate storage keys
old_storage_key = make_relation_chunk_key(
old_normalized_src, old_normalized_tgt
)
new_storage_key = make_relation_chunk_key(
new_normalized_src, new_normalized_tgt
)
# If keys are different, we need to move the chunk tracking
if old_storage_key != new_storage_key:
# Get complete chunk IDs from storage first (preserves all existing references)
old_stored_data = await relation_chunks_storage.get_by_id(
old_storage_key
)
relation_chunk_ids = []
if old_stored_data and isinstance(old_stored_data, dict):
# Use complete chunk_ids from storage
relation_chunk_ids = [
cid
for cid in old_stored_data.get("chunk_ids", [])
if cid
]
else:
# Fallback: if storage has no data, use graph's source_id
relation_source_id = edge_data.get("source_id", "")
relation_chunk_ids = [
cid
for cid in relation_source_id.split(GRAPH_FIELD_SEP)
if cid
]
# Delete old relation chunk tracking
await relation_chunks_storage.delete([old_storage_key])
# Create new relation chunk tracking (migrate complete data)
if relation_chunk_ids:
await relation_chunks_storage.upsert(
{
new_storage_key: {
"chunk_ids": relation_chunk_ids,
"count": len(relation_chunk_ids),
}
}
)
logger.info(
f"Entity Edit: migrate {len(relations_to_update)} relations after rename"
)
# 5. Save changes
await _persist_graph_updates(
entities_vdb=entities_vdb,
relationships_vdb=relationships_vdb,
chunk_entity_relation_graph=chunk_entity_relation_graph,
entity_chunks_storage=entity_chunks_storage,
relation_chunks_storage=relation_chunks_storage,
)
logger.info(f"Entity '{entity_name}' successfully updated")
logger.info(f"Entity Edit: `{entity_name}` successfully updated")
return await get_entity_info(
chunk_entity_relation_graph,
entities_vdb,
@@ -356,22 +593,6 @@ async def aedit_entity(
raise
async def _edit_entity_done(
entities_vdb, relationships_vdb, chunk_entity_relation_graph
) -> None:
"""Callback after entity editing is complete, ensures updates are persisted"""
await asyncio.gather(
*[
cast(StorageNameSpace, storage_inst).index_done_callback()
for storage_inst in [ # type: ignore
entities_vdb,
relationships_vdb,
chunk_entity_relation_graph,
]
]
)
async def aedit_relation(
chunk_entity_relation_graph,
entities_vdb,
@@ -379,10 +600,12 @@ async def aedit_relation(
source_entity: str,
target_entity: str,
updated_data: dict[str, Any],
relation_chunks_storage=None,
) -> dict[str, Any]:
"""Asynchronously edit relation information.
Updates relation (edge) information in the knowledge graph and re-embeds the relation in the vector database.
Also synchronizes the relation_chunks_storage to track which chunks reference this relation.
Args:
chunk_entity_relation_graph: Graph storage instance
@@ -391,6 +614,7 @@ async def aedit_relation(
source_entity: Name of the source entity
target_entity: Name of the target entity
updated_data: Dictionary containing updated attributes, e.g. {"description": "new description", "keywords": "new keywords"}
relation_chunks_storage: Optional KV storage for tracking chunks that reference this relation
Returns:
Dictionary containing updated relation information
@@ -399,6 +623,10 @@ async def aedit_relation(
# Use graph database lock to ensure atomic graph and vector db operations
async with graph_db_lock:
try:
# Normalize entity order for undirected graph (ensures consistent key generation)
if source_entity > target_entity:
source_entity, target_entity = target_entity, source_entity
# 1. Get current relation information
edge_exists = await chunk_entity_relation_graph.has_edge(
source_entity, target_entity
@@ -411,12 +639,14 @@ async def aedit_relation(
source_entity, target_entity
)
# Important: First delete the old relation record from the vector database
old_relation_id = compute_mdhash_id(
source_entity + target_entity, prefix="rel-"
)
await relationships_vdb.delete([old_relation_id])
logger.info(
f"Deleted old relation record from vector database for relation {source_entity} -> {target_entity}"
# Delete both permutations to handle relationships created before normalization
rel_ids_to_delete = [
compute_mdhash_id(source_entity + target_entity, prefix="rel-"),
compute_mdhash_id(target_entity + source_entity, prefix="rel-"),
]
await relationships_vdb.delete(rel_ids_to_delete)
logger.debug(
f"Relation Delete: delete vdb for `{source_entity}`~`{target_entity}`"
)
# 2. Update relation information in the graph
@@ -455,11 +685,79 @@ async def aedit_relation(
# Update vector database
await relationships_vdb.upsert(relation_data)
# 4. Save changes
await _edit_relation_done(relationships_vdb, chunk_entity_relation_graph)
# 4. Update relation_chunks_storage in two scenarios:
# - source_id has changed (edit scenario)
# - relation_chunks_storage has no existing data (migration/initialization scenario)
if relation_chunks_storage is not None:
from .utils import (
make_relation_chunk_key,
compute_incremental_chunk_ids,
)
storage_key = make_relation_chunk_key(source_entity, target_entity)
# Check if storage has existing data
stored_data = await relation_chunks_storage.get_by_id(storage_key)
has_stored_data = (
stored_data
and isinstance(stored_data, dict)
and stored_data.get("chunk_ids")
)
# Get old and new source_id
old_source_id = edge_data.get("source_id", "")
old_chunk_ids = [
cid for cid in old_source_id.split(GRAPH_FIELD_SEP) if cid
]
new_source_id = new_edge_data.get("source_id", "")
new_chunk_ids = [
cid for cid in new_source_id.split(GRAPH_FIELD_SEP) if cid
]
source_id_changed = set(new_chunk_ids) != set(old_chunk_ids)
# Update if: source_id changed OR storage has no data
if source_id_changed or not has_stored_data:
# Get existing full chunk_ids from storage
existing_full_chunk_ids = []
if has_stored_data:
existing_full_chunk_ids = [
cid for cid in stored_data.get("chunk_ids", []) if cid
]
# If no stored data exists, use old source_id as baseline
if not existing_full_chunk_ids:
existing_full_chunk_ids = old_chunk_ids.copy()
# Use utility function to compute incremental updates
updated_chunk_ids = compute_incremental_chunk_ids(
existing_full_chunk_ids, old_chunk_ids, new_chunk_ids
)
# Update storage (even if updated_chunk_ids is empty)
await relation_chunks_storage.upsert(
{
storage_key: {
"chunk_ids": updated_chunk_ids,
"count": len(updated_chunk_ids),
}
}
)
logger.info(
f"Relation Delete: update chunk tracking for `{source_entity}`~`{target_entity}`"
)
# 5. Save changes
await _persist_graph_updates(
relationships_vdb=relationships_vdb,
chunk_entity_relation_graph=chunk_entity_relation_graph,
relation_chunks_storage=relation_chunks_storage,
)
logger.info(
f"Relation from '{source_entity}' to '{target_entity}' successfully updated"
f"Relation Delete: `{source_entity}`~`{target_entity}`' successfully updated"
)
return await get_relation_info(
chunk_entity_relation_graph,
@@ -475,29 +773,19 @@ async def aedit_relation(
raise
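To make the two update scenarios above concrete, a short sketch using compute_incremental_chunk_ids from this PR (values are illustrative):

# Scenario 1: storage already tracks chunks and source_id was edited
existing = ["chunk-1", "chunk-2", "chunk-3"]  # full list held in KV storage
old = ["chunk-1", "chunk-2"]                  # from the graph's previous source_id
new = ["chunk-2", "chunk-4"]                  # from the edited source_id
# compute_incremental_chunk_ids(existing, old, new) -> ["chunk-2", "chunk-3", "chunk-4"]

# Scenario 2: storage is empty (migration), so the old source_id is the baseline
# compute_incremental_chunk_ids(old.copy(), old, new) -> ["chunk-2", "chunk-4"]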
async def _edit_relation_done(relationships_vdb, chunk_entity_relation_graph) -> None:
"""Callback after relation editing is complete, ensures updates are persisted"""
await asyncio.gather(
*[
cast(StorageNameSpace, storage_inst).index_done_callback()
for storage_inst in [ # type: ignore
relationships_vdb,
chunk_entity_relation_graph,
]
]
)
async def acreate_entity(
chunk_entity_relation_graph,
entities_vdb,
relationships_vdb,
entity_name: str,
entity_data: dict[str, Any],
entity_chunks_storage=None,
relation_chunks_storage=None,
) -> dict[str, Any]:
"""Asynchronously create a new entity.
Creates a new entity in the knowledge graph and adds it to the vector database.
Also synchronizes entity_chunks_storage to track chunk references.
Args:
chunk_entity_relation_graph: Graph storage instance
@@ -505,6 +793,8 @@ async def acreate_entity(
relationships_vdb: Vector database storage for relationships
entity_name: Name of the new entity
entity_data: Dictionary containing entity attributes, e.g. {"description": "description", "entity_type": "type"}
entity_chunks_storage: Optional KV storage for tracking chunks that reference this entity
relation_chunks_storage: Optional KV storage for tracking chunks that reference relations
Returns:
Dictionary containing created entity information
@@ -555,12 +845,34 @@ async def acreate_entity(
# Update vector database
await entities_vdb.upsert(entity_data_for_vdb)
# Update entity_chunks_storage to track chunk references
if entity_chunks_storage is not None:
source_id = node_data.get("source_id", "")
chunk_ids = [cid for cid in source_id.split(GRAPH_FIELD_SEP) if cid]
if chunk_ids:
await entity_chunks_storage.upsert(
{
entity_name: {
"chunk_ids": chunk_ids,
"count": len(chunk_ids),
}
}
)
logger.info(
f"Entity Create: tracked {len(chunk_ids)} chunks for `{entity_name}`"
)
# Save changes
await _edit_entity_done(
entities_vdb, relationships_vdb, chunk_entity_relation_graph
await _persist_graph_updates(
entities_vdb=entities_vdb,
relationships_vdb=relationships_vdb,
chunk_entity_relation_graph=chunk_entity_relation_graph,
entity_chunks_storage=entity_chunks_storage,
relation_chunks_storage=relation_chunks_storage,
)
logger.info(f"Entity '{entity_name}' successfully created")
logger.info(f"Entity Create: '{entity_name}' successfully created")
return await get_entity_info(
chunk_entity_relation_graph,
entities_vdb,
@@ -579,10 +891,12 @@ async def acreate_relation(
source_entity: str,
target_entity: str,
relation_data: dict[str, Any],
relation_chunks_storage=None,
) -> dict[str, Any]:
"""Asynchronously create a new relation between entities.
Creates a new relation (edge) in the knowledge graph and adds it to the vector database.
Also synchronizes relation_chunks_storage to track chunk references.
Args:
chunk_entity_relation_graph: Graph storage instance
@@ -591,6 +905,7 @@ async def acreate_relation(
source_entity: Name of the source entity
target_entity: Name of the target entity
relation_data: Dictionary containing relation attributes, e.g. {"description": "description", "keywords": "keywords"}
relation_chunks_storage: Optional KV storage for tracking chunks that reference this relation
Returns:
Dictionary containing created relation information
@@ -632,6 +947,10 @@ async def acreate_relation(
source_entity, target_entity, edge_data
)
# Normalize entity order for undirected relation vector (ensures consistent key generation)
if source_entity > target_entity:
source_entity, target_entity = target_entity, source_entity
# Prepare content for embedding
description = edge_data.get("description", "")
keywords = edge_data.get("keywords", "")
@@ -663,11 +982,39 @@ async def acreate_relation(
# Update vector database
await relationships_vdb.upsert(relation_data_for_vdb)
# Update relation_chunks_storage to track chunk references
if relation_chunks_storage is not None:
from .utils import make_relation_chunk_key
# Normalize entity order for consistent key generation
normalized_src, normalized_tgt = sorted([source_entity, target_entity])
storage_key = make_relation_chunk_key(normalized_src, normalized_tgt)
source_id = edge_data.get("source_id", "")
chunk_ids = [cid for cid in source_id.split(GRAPH_FIELD_SEP) if cid]
if chunk_ids:
await relation_chunks_storage.upsert(
{
storage_key: {
"chunk_ids": chunk_ids,
"count": len(chunk_ids),
}
}
)
logger.info(
f"Relation Create: tracked {len(chunk_ids)} chunks for `{source_entity}`~`{target_entity}`"
)
# Save changes
await _edit_relation_done(relationships_vdb, chunk_entity_relation_graph)
await _persist_graph_updates(
relationships_vdb=relationships_vdb,
chunk_entity_relation_graph=chunk_entity_relation_graph,
relation_chunks_storage=relation_chunks_storage,
)
logger.info(
f"Relation from '{source_entity}' to '{target_entity}' successfully created"
f"Relation Create: `{source_entity}`~`{target_entity}` successfully created"
)
return await get_relation_info(
chunk_entity_relation_graph,
@@ -875,19 +1222,27 @@ async def amerge_entities(
tgt = rel_data["tgt"]
edge_data = rel_data["data"]
# Normalize entity order for consistent vector storage
normalized_src, normalized_tgt = sorted([src, tgt])
description = edge_data.get("description", "")
keywords = edge_data.get("keywords", "")
source_id = edge_data.get("source_id", "")
weight = float(edge_data.get("weight", 1.0))
content = f"{keywords}\t{src}\n{tgt}\n{description}"
relation_id = compute_mdhash_id(src + tgt, prefix="rel-")
# Use normalized order for content and relation ID
content = (
f"{keywords}\t{normalized_src}\n{normalized_tgt}\n{description}"
)
relation_id = compute_mdhash_id(
normalized_src + normalized_tgt, prefix="rel-"
)
relation_data_for_vdb = {
relation_id: {
"content": content,
"src_id": src,
"tgt_id": tgt,
"src_id": normalized_src,
"tgt_id": normalized_tgt,
"source_id": source_id,
"description": description,
"keywords": keywords,
@@ -917,8 +1272,10 @@
)
# 10. Save changes
await _merge_entities_done(
entities_vdb, relationships_vdb, chunk_entity_relation_graph
await _persist_graph_updates(
entities_vdb=entities_vdb,
relationships_vdb=relationships_vdb,
chunk_entity_relation_graph=chunk_entity_relation_graph,
)
logger.info(
@@ -1044,22 +1401,6 @@ def _merge_relation_attributes(
return merged_data
async def _merge_entities_done(
entities_vdb, relationships_vdb, chunk_entity_relation_graph
) -> None:
"""Callback after entity merging is complete, ensures updates are persisted"""
await asyncio.gather(
*[
cast(StorageNameSpace, storage_inst).index_done_callback()
for storage_inst in [ # type: ignore
entities_vdb,
relationships_vdb,
chunk_entity_relation_graph,
]
]
)
async def get_entity_info(
chunk_entity_relation_graph,
entities_vdb,
@@ -1094,7 +1435,18 @@ async def get_relation_info(
tgt_entity: str,
include_vector_data: bool = False,
) -> dict[str, str | None | dict[str, str]]:
"""Get detailed information of a relationship"""
"""
Get detailed information of a relationship between two entities.
The relationship is undirected; swapping src_entity and tgt_entity does not change the relationship.
Args:
src_entity: Source entity name
tgt_entity: Target entity name
include_vector_data: Whether to include vector database information
Returns:
Dictionary containing relationship information
"""
# Get information from the graph
edge_data = await chunk_entity_relation_graph.get_edge(src_entity, tgt_entity)
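Consistent with the docstring above, a quick symmetry check (sketch only; the leading storage arguments are assumed by analogy with get_entity_info):

# Both calls describe the same undirected relationship
info_ab = await get_relation_info(graph, relationships_vdb, "Elon Musk", "Tesla")
info_ba = await get_relation_info(graph, relationships_vdb, "Tesla", "Elon Musk")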