Refactor entity edit and merge functions to support merge-on-rename

• Extract internal implementation helpers
• Add allow_merge parameter to aedit_entity
• Support merging when renaming to existing name
• Improve code reusability and modularity
• Maintain backward compatibility
This commit is contained in:
yangdx 2025-10-27 14:23:51 +08:00
parent 411e92e6b9
commit 11a1631d76

View file

@ -246,6 +246,272 @@ async def adelete_by_relation(
) )
async def _edit_entity_impl(
    chunk_entity_relation_graph,
    entities_vdb,
    relationships_vdb,
    entity_name: str,
    updated_data: dict[str, str],
    *,
    entity_chunks_storage=None,
    relation_chunks_storage=None,
) -> dict[str, Any]:
    """Internal helper that edits an entity without acquiring storage locks.

    This function performs the actual entity edit operations without lock management.
    It should only be called by public APIs that have already acquired necessary locks.

    Args:
        chunk_entity_relation_graph: Graph storage instance
        entities_vdb: Vector database storage for entities
        relationships_vdb: Vector database storage for relationships
        entity_name: Name of the entity to edit
        updated_data: Dictionary containing updated attributes (including optional entity_name for renaming)
        entity_chunks_storage: Optional KV storage for tracking chunks
        relation_chunks_storage: Optional KV storage for tracking relation chunks

    Returns:
        Dictionary containing updated entity information

    Note:
        Caller must acquire appropriate locks before calling this function.
        If renaming (entity_name in updated_data), this function will check if the new name exists.
    """
    # A rename is requested iff updated_data carries a different "entity_name".
    new_entity_name = updated_data.get("entity_name", entity_name)
    is_renaming = new_entity_name != entity_name
    # Preserve the pre-rename name: chunk-tracking storages are keyed by it.
    original_entity_name = entity_name
    node_exists = await chunk_entity_relation_graph.has_node(entity_name)
    if not node_exists:
        raise ValueError(f"Entity '{entity_name}' does not exist")
    node_data = await chunk_entity_relation_graph.get_node(entity_name)
    if is_renaming:
        # Refuse to rename onto an existing node; merging is handled by callers.
        existing_node = await chunk_entity_relation_graph.has_node(new_entity_name)
        if existing_node:
            raise ValueError(
                f"Entity name '{new_entity_name}' already exists, cannot rename"
            )
    # Merge updates over the current node attributes.
    new_node_data = {**node_data, **updated_data}
    new_node_data["entity_id"] = new_entity_name
    if "entity_name" in new_node_data:
        del new_node_data[
            "entity_name"
        ]  # Node data should not contain entity_name field
    if is_renaming:
        logger.info(f"Entity Edit: renaming `{entity_name}` to `{new_entity_name}`")
        # Create the new node first, then re-point every edge at it.
        await chunk_entity_relation_graph.upsert_node(new_entity_name, new_node_data)
        relations_to_update = []
        relations_to_delete = []
        edges = await chunk_entity_relation_graph.get_node_edges(entity_name)
        if edges:
            for source, target in edges:
                edge_data = await chunk_entity_relation_graph.get_edge(source, target)
                if edge_data:
                    # Old vector-db relation IDs are removed for both orderings,
                    # since the ID is derived from the concatenated names.
                    relations_to_delete.append(
                        compute_mdhash_id(source + target, prefix="rel-")
                    )
                    relations_to_delete.append(
                        compute_mdhash_id(target + source, prefix="rel-")
                    )
                    if source == entity_name:
                        await chunk_entity_relation_graph.upsert_edge(
                            new_entity_name, target, edge_data
                        )
                        relations_to_update.append((new_entity_name, target, edge_data))
                    else:  # target == entity_name
                        await chunk_entity_relation_graph.upsert_edge(
                            source, new_entity_name, edge_data
                        )
                        relations_to_update.append((source, new_entity_name, edge_data))
        # Remove the old node and its vector-db records.
        await chunk_entity_relation_graph.delete_node(entity_name)
        old_entity_id = compute_mdhash_id(entity_name, prefix="ent-")
        await entities_vdb.delete([old_entity_id])
        await relationships_vdb.delete(relations_to_delete)
        # Re-insert vector records for every migrated relation.
        for src, tgt, edge_data in relations_to_update:
            # Normalize entity order so the vector ID is direction-independent.
            normalized_src, normalized_tgt = sorted([src, tgt])
            description = edge_data.get("description", "")
            keywords = edge_data.get("keywords", "")
            source_id = edge_data.get("source_id", "")
            weight = float(edge_data.get("weight", 1.0))
            content = f"{normalized_src}\t{normalized_tgt}\n{keywords}\n{description}"
            relation_id = compute_mdhash_id(
                normalized_src + normalized_tgt, prefix="rel-"
            )
            relation_data = {
                relation_id: {
                    "content": content,
                    "src_id": normalized_src,
                    "tgt_id": normalized_tgt,
                    "source_id": source_id,
                    "description": description,
                    "keywords": keywords,
                    "weight": weight,
                }
            }
            await relationships_vdb.upsert(relation_data)
        # From here on, work under the new name.
        entity_name = new_entity_name
    else:
        await chunk_entity_relation_graph.upsert_node(entity_name, new_node_data)
    # Rebuild the entity's vector representation from the merged attributes.
    description = new_node_data.get("description", "")
    source_id = new_node_data.get("source_id", "")
    entity_type = new_node_data.get("entity_type", "")
    content = entity_name + "\n" + description
    entity_id = compute_mdhash_id(entity_name, prefix="ent-")
    entity_data = {
        entity_id: {
            "content": content,
            "entity_name": entity_name,
            "source_id": source_id,
            "description": description,
            "entity_type": entity_type,
        }
    }
    await entities_vdb.upsert(entity_data)
    # Optional chunk-tracking maintenance (entity and relation KV storages).
    if entity_chunks_storage is not None or relation_chunks_storage is not None:
        from .utils import make_relation_chunk_key, compute_incremental_chunk_ids

        if entity_chunks_storage is not None:
            # Stored tracking is still keyed by the pre-rename name.
            storage_key = original_entity_name if is_renaming else entity_name
            stored_data = await entity_chunks_storage.get_by_id(storage_key)
            has_stored_data = (
                stored_data
                and isinstance(stored_data, dict)
                and stored_data.get("chunk_ids")
            )
            old_source_id = node_data.get("source_id", "")
            old_chunk_ids = [cid for cid in old_source_id.split(GRAPH_FIELD_SEP) if cid]
            new_source_id = new_node_data.get("source_id", "")
            new_chunk_ids = [cid for cid in new_source_id.split(GRAPH_FIELD_SEP) if cid]
            source_id_changed = set(new_chunk_ids) != set(old_chunk_ids)
            # Update if the source_id changed OR the storage has no data yet.
            if source_id_changed or not has_stored_data:
                existing_full_chunk_ids = []
                if has_stored_data:
                    existing_full_chunk_ids = [
                        cid for cid in stored_data.get("chunk_ids", []) if cid
                    ]
                # Fall back to the graph's old source_id as the baseline.
                if not existing_full_chunk_ids:
                    existing_full_chunk_ids = old_chunk_ids.copy()
                updated_chunk_ids = compute_incremental_chunk_ids(
                    existing_full_chunk_ids, old_chunk_ids, new_chunk_ids
                )
                if is_renaming:
                    # Renaming: delete the old key, then write under the new one.
                    await entity_chunks_storage.delete([original_entity_name])
                    await entity_chunks_storage.upsert(
                        {
                            entity_name: {
                                "chunk_ids": updated_chunk_ids,
                                "count": len(updated_chunk_ids),
                            }
                        }
                    )
                else:
                    await entity_chunks_storage.upsert(
                        {
                            entity_name: {
                                "chunk_ids": updated_chunk_ids,
                                "count": len(updated_chunk_ids),
                            }
                        }
                    )
                logger.info(
                    f"Entity Edit: find {len(updated_chunk_ids)} chunks related to `{entity_name}`"
                )
        # Relation chunk tracking must move to the new storage keys after rename.
        if is_renaming and relation_chunks_storage is not None and relations_to_update:
            for src, tgt, edge_data in relations_to_update:
                # Reconstruct the pre-rename endpoint pair for the old key.
                old_src = original_entity_name if src == entity_name else src
                old_tgt = original_entity_name if tgt == entity_name else tgt
                old_normalized_src, old_normalized_tgt = sorted([old_src, old_tgt])
                new_normalized_src, new_normalized_tgt = sorted([src, tgt])
                old_storage_key = make_relation_chunk_key(
                    old_normalized_src, old_normalized_tgt
                )
                new_storage_key = make_relation_chunk_key(
                    new_normalized_src, new_normalized_tgt
                )
                if old_storage_key != new_storage_key:
                    # Prefer the complete chunk list already in storage; fall back
                    # to the graph edge's source_id when storage is empty.
                    old_stored_data = await relation_chunks_storage.get_by_id(
                        old_storage_key
                    )
                    relation_chunk_ids = []
                    if old_stored_data and isinstance(old_stored_data, dict):
                        relation_chunk_ids = [
                            cid for cid in old_stored_data.get("chunk_ids", []) if cid
                        ]
                    else:
                        relation_source_id = edge_data.get("source_id", "")
                        relation_chunk_ids = [
                            cid
                            for cid in relation_source_id.split(GRAPH_FIELD_SEP)
                            if cid
                        ]
                    await relation_chunks_storage.delete([old_storage_key])
                    if relation_chunk_ids:
                        await relation_chunks_storage.upsert(
                            {
                                new_storage_key: {
                                    "chunk_ids": relation_chunk_ids,
                                    "count": len(relation_chunk_ids),
                                }
                            }
                        )
            logger.info(
                f"Entity Edit: migrate {len(relations_to_update)} relations after rename"
            )
    # Flush all pending storage mutations.
    await _persist_graph_updates(
        entities_vdb=entities_vdb,
        relationships_vdb=relationships_vdb,
        chunk_entity_relation_graph=chunk_entity_relation_graph,
        entity_chunks_storage=entity_chunks_storage,
        relation_chunks_storage=relation_chunks_storage,
    )
    logger.info(f"Entity Edit: `{entity_name}` successfully updated")
    return await get_entity_info(
        chunk_entity_relation_graph,
        entities_vdb,
        entity_name,
        include_vector_data=True,
    )
async def aedit_entity(
    chunk_entity_relation_graph,
    entities_vdb,
    relationships_vdb,
    entity_name: str,
    updated_data: dict[str, str],
    allow_rename: bool = True,
    allow_merge: bool = False,
    entity_chunks_storage=None,
    relation_chunks_storage=None,
) -> dict[str, Any]:
    """Asynchronously edit entity information.

    Updates entity attributes in the knowledge graph and vector database,
    optionally renaming the entity. When renaming to a name that already
    exists, the entity can be merged into the existing one if allow_merge
    is enabled.

    Args:
        chunk_entity_relation_graph: Graph storage instance
        entities_vdb: Vector database storage for entities
        relationships_vdb: Vector database storage for relationships
        entity_name: Name of the entity to edit
        updated_data: Dictionary containing updated attributes, e.g. {"description": "new description", "entity_type": "new type"}
        allow_rename: Whether to allow entity renaming, defaults to True
        allow_merge: Whether to merge into an existing entity when renaming to an existing name, defaults to False
        entity_chunks_storage: Optional KV storage for tracking chunks that reference this entity
        relation_chunks_storage: Optional KV storage for tracking chunks that reference relations

    Returns:
        Dictionary containing updated entity information
    """
    # Determine entities to lock
    new_entity_name = updated_data.get("entity_name", entity_name)
    is_renaming = new_entity_name != entity_name
    # Lock both original and new entity names if renaming
    lock_keys = sorted({entity_name, new_entity_name}) if is_renaming else [entity_name]
    # Use keyed lock for entity to ensure atomic graph and vector db operations
    workspace = entities_vdb.global_config.get("workspace", "")
    namespace = f"{workspace}:GraphDB" if workspace else "GraphDB"
    async with get_storage_keyed_lock(
        lock_keys, namespace=namespace, enable_logging=False
    ):
        try:
            if is_renaming and not allow_rename:
                raise ValueError(
                    "Entity renaming is not allowed. Set allow_rename=True to enable this feature"
                )
            if is_renaming:
                target_exists = await chunk_entity_relation_graph.has_node(
                    new_entity_name
                )
                if target_exists:
                    if not allow_merge:
                        raise ValueError(
                            f"Entity name '{new_entity_name}' already exists, cannot rename"
                        )
                    # Merge-on-rename: apply non-name updates to the source
                    # entity first, then merge it into the existing target.
                    logger.info(
                        f"Entity Edit: `{entity_name}` will be merged into `{new_entity_name}`"
                    )
                    non_name_updates = {
                        key: value
                        for key, value in updated_data.items()
                        if key != "entity_name"
                    }
                    if non_name_updates:
                        logger.info(
                            "Entity Edit: applying non-name updates before merge"
                        )
                        await _edit_entity_impl(
                            chunk_entity_relation_graph,
                            entities_vdb,
                            relationships_vdb,
                            entity_name,
                            non_name_updates,
                            entity_chunks_storage=entity_chunks_storage,
                            relation_chunks_storage=relation_chunks_storage,
                        )
                    return await _merge_entities_impl(
                        chunk_entity_relation_graph,
                        entities_vdb,
                        relationships_vdb,
                        [entity_name],
                        new_entity_name,
                        merge_strategy=None,
                        target_entity_data=None,
                        entity_chunks_storage=entity_chunks_storage,
                        relation_chunks_storage=relation_chunks_storage,
                    )
            # Plain edit or rename-to-new-name: delegate to the lock-free helper.
            return await _edit_entity_impl(
                chunk_entity_relation_graph,
                entities_vdb,
                relationships_vdb,
                entity_name,
                updated_data,
                entity_chunks_storage=entity_chunks_storage,
                relation_chunks_storage=relation_chunks_storage,
            )
        except Exception as e:
            logger.error(f"Error while editing entity '{entity_name}': {e}")
            raise
@ -1054,22 +1065,22 @@ async def acreate_relation(
raise raise
async def amerge_entities( async def _merge_entities_impl(
chunk_entity_relation_graph, chunk_entity_relation_graph,
entities_vdb, entities_vdb,
relationships_vdb, relationships_vdb,
source_entities: list[str], source_entities: list[str],
target_entity: str, target_entity: str,
*,
merge_strategy: dict[str, str] = None, merge_strategy: dict[str, str] = None,
target_entity_data: dict[str, Any] = None, target_entity_data: dict[str, Any] = None,
entity_chunks_storage=None, entity_chunks_storage=None,
relation_chunks_storage=None, relation_chunks_storage=None,
) -> dict[str, Any]: ) -> dict[str, Any]:
"""Asynchronously merge multiple entities into one entity. """Internal helper that merges entities without acquiring storage locks.
Merges multiple source entities into a target entity, handling all relationships, This function performs the actual entity merge operations without lock management.
and updating both the knowledge graph and vector database. It should only be called by public APIs that have already acquired necessary locks.
Also merges chunk tracking information from entity_chunks_storage and relation_chunks_storage.
Args: Args:
chunk_entity_relation_graph: Graph storage instance chunk_entity_relation_graph: Graph storage instance
@ -1077,28 +1088,18 @@ async def amerge_entities(
relationships_vdb: Vector database storage for relationships relationships_vdb: Vector database storage for relationships
source_entities: List of source entity names to merge source_entities: List of source entity names to merge
target_entity: Name of the target entity after merging target_entity: Name of the target entity after merging
merge_strategy: Deprecated (Each field uses its own default strategy). If provided, merge_strategy: Deprecated. Merge strategy for each field (optional)
customizations are applied but a warning is logged. target_entity_data: Dictionary of specific values to set for target entity (optional)
target_entity_data: Dictionary of specific values to set for the target entity, entity_chunks_storage: Optional KV storage for tracking chunks
overriding any merged values, e.g. {"description": "custom description", "entity_type": "PERSON"} relation_chunks_storage: Optional KV storage for tracking relation chunks
entity_chunks_storage: Optional KV storage for tracking chunks that reference entities
relation_chunks_storage: Optional KV storage for tracking chunks that reference relations
Returns: Returns:
Dictionary containing the merged entity information Dictionary containing the merged entity information
"""
# Collect all entities involved (source + target) and lock them all in sorted order
all_entities = set(source_entities)
all_entities.add(target_entity)
lock_keys = sorted(all_entities)
# Use keyed lock for all entities to ensure atomic graph and vector db operations Note:
workspace = entities_vdb.global_config.get("workspace", "") Caller must acquire appropriate locks before calling this function.
namespace = f"{workspace}:GraphDB" if workspace else "GraphDB" All source entities and the target entity should be locked together.
async with get_storage_keyed_lock( """
lock_keys, namespace=namespace, enable_logging=False
):
try:
# Default merge strategy for entities # Default merge strategy for entities
default_entity_merge_strategy = { default_entity_merge_strategy = {
"description": "concatenate", "description": "concatenate",
@ -1115,9 +1116,7 @@ async def amerge_entities(
**default_entity_merge_strategy, **default_entity_merge_strategy,
**merge_strategy, **merge_strategy,
} }
target_entity_data = ( target_entity_data = {} if target_entity_data is None else target_entity_data
{} if target_entity_data is None else target_entity_data
)
# 1. Check if all source entities exist # 1. Check if all source entities exist
source_entities_data = {} source_entities_data = {}
@ -1132,8 +1131,8 @@ async def amerge_entities(
target_exists = await chunk_entity_relation_graph.has_node(target_entity) target_exists = await chunk_entity_relation_graph.has_node(target_entity)
existing_target_entity_data = {} existing_target_entity_data = {}
if target_exists: if target_exists:
existing_target_entity_data = ( existing_target_entity_data = await chunk_entity_relation_graph.get_node(
await chunk_entity_relation_graph.get_node(target_entity) target_entity
) )
logger.info( logger.info(
"Entity Merge: target entity already exists, source and target entities will be merged" "Entity Merge: target entity already exists, source and target entities will be merged"
@ -1166,22 +1165,16 @@ async def amerge_entities(
for src, tgt in edges: for src, tgt in edges:
# Ensure src is the current entity # Ensure src is the current entity
if src == entity_name: if src == entity_name:
edge_data = await chunk_entity_relation_graph.get_edge( edge_data = await chunk_entity_relation_graph.get_edge(src, tgt)
src, tgt
)
all_relations.append((src, tgt, edge_data)) all_relations.append((src, tgt, edge_data))
# 5. Create or update the target entity # 5. Create or update the target entity
merged_entity_data["entity_id"] = target_entity merged_entity_data["entity_id"] = target_entity
if not target_exists: if not target_exists:
await chunk_entity_relation_graph.upsert_node( await chunk_entity_relation_graph.upsert_node(target_entity, merged_entity_data)
target_entity, merged_entity_data
)
logger.info(f"Entity Merge: created target '{target_entity}'") logger.info(f"Entity Merge: created target '{target_entity}'")
else: else:
await chunk_entity_relation_graph.upsert_node( await chunk_entity_relation_graph.upsert_node(target_entity, merged_entity_data)
target_entity, merged_entity_data
)
logger.info(f"Entity Merge: Updated target '{target_entity}'") logger.info(f"Entity Merge: Updated target '{target_entity}'")
# 6. Recreate all relations pointing to the target entity in KG # 6. Recreate all relations pointing to the target entity in KG
@ -1209,9 +1202,7 @@ async def amerge_entities(
# Skip relationships between source entities to avoid self-loops # Skip relationships between source entities to avoid self-loops
if new_src == new_tgt: if new_src == new_tgt:
logger.info( logger.info(f"Entity Merge: skipping `{src}`~`{tgt}` to avoid self-loop")
f"Entity Merge: skipping `{src}`~`{tgt}` to avoid self-loop"
)
continue continue
# Normalize entity order for consistent duplicate detection (undirected relationships) # Normalize entity order for consistent duplicate detection (undirected relationships)
@ -1220,9 +1211,7 @@ async def amerge_entities(
# Process chunk tracking for this relation # Process chunk tracking for this relation
if relation_chunks_storage is not None: if relation_chunks_storage is not None:
storage_key = make_relation_chunk_key( storage_key = make_relation_chunk_key(normalized_src, normalized_tgt)
normalized_src, normalized_tgt
)
# Get chunk_ids from storage for this original relation # Get chunk_ids from storage for this original relation
stored = await relation_chunks_storage.get_by_id(old_storage_key) stored = await relation_chunks_storage.get_by_id(old_storage_key)
@ -1232,9 +1221,7 @@ async def amerge_entities(
else: else:
# Fallback to source_id from graph # Fallback to source_id from graph
source_id = edge_data.get("source_id", "") source_id = edge_data.get("source_id", "")
chunk_ids = [ chunk_ids = [cid for cid in source_id.split(GRAPH_FIELD_SEP) if cid]
cid for cid in source_id.split(GRAPH_FIELD_SEP) if cid
]
# Accumulate chunk_ids with ordered deduplication # Accumulate chunk_ids with ordered deduplication
if storage_key not in relation_chunk_tracking: if storage_key not in relation_chunk_tracking:
@ -1301,9 +1288,7 @@ async def amerge_entities(
) )
# 7. Update relationship vector representations # 7. Update relationship vector representations
logger.info( logger.info(f"Entity Merge: deleting {len(relations_to_delete)} relations from vdb")
f"Entity Merge: deleting {len(relations_to_delete)} relations from vdb"
)
await relationships_vdb.delete(relations_to_delete) await relationships_vdb.delete(relations_to_delete)
for rel_data in relation_updates.values(): for rel_data in relation_updates.values():
edge_data = rel_data["data"] edge_data = rel_data["data"]
@ -1316,12 +1301,8 @@ async def amerge_entities(
weight = float(edge_data.get("weight", 1.0)) weight = float(edge_data.get("weight", 1.0))
# Use normalized order for content and relation ID # Use normalized order for content and relation ID
content = ( content = f"{keywords}\t{normalized_src}\n{normalized_tgt}\n{description}"
f"{keywords}\t{normalized_src}\n{normalized_tgt}\n{description}" relation_id = compute_mdhash_id(normalized_src + normalized_tgt, prefix="rel-")
)
relation_id = compute_mdhash_id(
normalized_src + normalized_tgt, prefix="rel-"
)
relation_data_for_vdb = { relation_data_for_vdb = {
relation_id: { relation_id: {
@ -1335,9 +1316,7 @@ async def amerge_entities(
} }
} }
await relationships_vdb.upsert(relation_data_for_vdb) await relationships_vdb.upsert(relation_data_for_vdb)
logger.info( logger.info(f"Entity Merge: updating vdb `{normalized_src}`~`{normalized_tgt}`")
f"Entity Merge: updating vdb `{normalized_src}`~`{normalized_tgt}`"
)
# 8. Update entity vector representation # 8. Update entity vector representation
description = merged_entity_data.get("description", "") description = merged_entity_data.get("description", "")
@ -1392,9 +1371,7 @@ async def amerge_entities(
merged_chunk_ids.append(chunk_id) merged_chunk_ids.append(chunk_id)
# Delete source entities' chunk tracking records # Delete source entities' chunk tracking records
entity_keys_to_delete = [ entity_keys_to_delete = [e for e in source_entities if e != target_entity]
e for e in source_entities if e != target_entity
]
if entity_keys_to_delete: if entity_keys_to_delete:
await entity_chunks_storage.delete(entity_keys_to_delete) await entity_chunks_storage.delete(entity_keys_to_delete)
@ -1448,6 +1425,62 @@ async def amerge_entities(
include_vector_data=True, include_vector_data=True,
) )
async def amerge_entities(
    chunk_entity_relation_graph,
    entities_vdb,
    relationships_vdb,
    source_entities: list[str],
    target_entity: str,
    merge_strategy: dict[str, str] = None,
    target_entity_data: dict[str, Any] = None,
    entity_chunks_storage=None,
    relation_chunks_storage=None,
) -> dict[str, Any]:
    """Asynchronously merge multiple entities into one entity.

    Merges multiple source entities into a target entity, handling all relationships,
    and updating both the knowledge graph and vector database.
    Also merges chunk tracking information from entity_chunks_storage and relation_chunks_storage.

    Args:
        chunk_entity_relation_graph: Graph storage instance
        entities_vdb: Vector database storage for entities
        relationships_vdb: Vector database storage for relationships
        source_entities: List of source entity names to merge
        target_entity: Name of the target entity after merging
        merge_strategy: Deprecated (Each field uses its own default strategy). If provided,
            customizations are applied but a warning is logged.
        target_entity_data: Dictionary of specific values to set for the target entity,
            overriding any merged values, e.g. {"description": "custom description", "entity_type": "PERSON"}
        entity_chunks_storage: Optional KV storage for tracking chunks that reference entities
        relation_chunks_storage: Optional KV storage for tracking chunks that reference relations

    Returns:
        Dictionary containing the merged entity information
    """
    # Collect all entities involved (source + target) and lock them all in sorted order
    all_entities = set(source_entities)
    all_entities.add(target_entity)
    lock_keys = sorted(all_entities)
    # Use keyed lock for all entities to ensure atomic graph and vector db operations
    workspace = entities_vdb.global_config.get("workspace", "")
    namespace = f"{workspace}:GraphDB" if workspace else "GraphDB"
    async with get_storage_keyed_lock(
        lock_keys, namespace=namespace, enable_logging=False
    ):
        try:
            # All heavy lifting happens in the lock-free implementation helper.
            return await _merge_entities_impl(
                chunk_entity_relation_graph,
                entities_vdb,
                relationships_vdb,
                source_entities,
                target_entity,
                merge_strategy=merge_strategy,
                target_entity_data=target_entity_data,
                entity_chunks_storage=entity_chunks_storage,
                relation_chunks_storage=relation_chunks_storage,
            )
        except Exception as e:
            logger.error(f"Error merging entities: {e}")
            raise