This commit is contained in:
Raphaël MANSUY 2025-12-04 19:18:39 +08:00
parent 2a451c4e22
commit 56dfc4dab5

View file

@ -246,272 +246,6 @@ async def adelete_by_relation(
) )
async def _edit_entity_impl(
    chunk_entity_relation_graph,
    entities_vdb,
    relationships_vdb,
    entity_name: str,
    updated_data: dict[str, str],
    *,
    entity_chunks_storage=None,
    relation_chunks_storage=None,
) -> dict[str, Any]:
    """Internal helper that edits an entity without acquiring storage locks.

    This function performs the actual entity edit operations without lock management.
    It should only be called by public APIs that have already acquired necessary locks.

    Args:
        chunk_entity_relation_graph: Graph storage instance
        entities_vdb: Vector database storage for entities
        relationships_vdb: Vector database storage for relationships
        entity_name: Name of the entity to edit
        updated_data: Dictionary containing updated attributes (including optional entity_name for renaming)
        entity_chunks_storage: Optional KV storage for tracking chunks
        relation_chunks_storage: Optional KV storage for tracking relation chunks

    Returns:
        Dictionary containing updated entity information

    Raises:
        ValueError: If the entity does not exist, or if renaming and the target
            name is already taken by another node.

    Note:
        Caller must acquire appropriate locks before calling this function.
        If renaming (entity_name in updated_data), this function will check if the new name exists.
    """
    new_entity_name = updated_data.get("entity_name", entity_name)
    is_renaming = new_entity_name != entity_name
    # `entity_name` is rebound to the new name midway through the rename branch;
    # keep the original around for chunk-tracking keys that still use the old name.
    original_entity_name = entity_name
    node_exists = await chunk_entity_relation_graph.has_node(entity_name)
    if not node_exists:
        raise ValueError(f"Entity '{entity_name}' does not exist")
    node_data = await chunk_entity_relation_graph.get_node(entity_name)
    if is_renaming:
        # Refuse to rename onto an existing node (this helper does not merge).
        existing_node = await chunk_entity_relation_graph.has_node(new_entity_name)
        if existing_node:
            raise ValueError(
                f"Entity name '{new_entity_name}' already exists, cannot rename"
            )
    # Overlay the updates on the current node data; the graph id tracks the
    # (possibly new) entity name.
    new_node_data = {**node_data, **updated_data}
    new_node_data["entity_id"] = new_entity_name
    if "entity_name" in new_node_data:
        del new_node_data[
            "entity_name"
        ]  # Node data should not contain entity_name field
    if is_renaming:
        logger.info(f"Entity Edit: renaming `{entity_name}` to `{new_entity_name}`")
        # Rename = create node under the new name, re-attach every edge to it,
        # then delete the old node and its stale vector-db records.
        await chunk_entity_relation_graph.upsert_node(new_entity_name, new_node_data)
        relations_to_update = []
        relations_to_delete = []
        edges = await chunk_entity_relation_graph.get_node_edges(entity_name)
        if edges:
            for source, target in edges:
                edge_data = await chunk_entity_relation_graph.get_edge(source, target)
                if edge_data:
                    # Old relation vector ids are computed for both orderings,
                    # since the stored id may use either direction.
                    relations_to_delete.append(
                        compute_mdhash_id(source + target, prefix="rel-")
                    )
                    relations_to_delete.append(
                        compute_mdhash_id(target + source, prefix="rel-")
                    )
                    if source == entity_name:
                        await chunk_entity_relation_graph.upsert_edge(
                            new_entity_name, target, edge_data
                        )
                        relations_to_update.append((new_entity_name, target, edge_data))
                    else:  # target == entity_name
                        await chunk_entity_relation_graph.upsert_edge(
                            source, new_entity_name, edge_data
                        )
                        relations_to_update.append((source, new_entity_name, edge_data))
        await chunk_entity_relation_graph.delete_node(entity_name)
        # Remove the old entity and old relation records from the vector DBs.
        old_entity_id = compute_mdhash_id(entity_name, prefix="ent-")
        await entities_vdb.delete([old_entity_id])
        await relationships_vdb.delete(relations_to_delete)
        # Re-insert a vector record for every migrated relation.
        for src, tgt, edge_data in relations_to_update:
            # Normalize entity order for consistent vector ID generation.
            normalized_src, normalized_tgt = sorted([src, tgt])
            description = edge_data.get("description", "")
            keywords = edge_data.get("keywords", "")
            source_id = edge_data.get("source_id", "")
            weight = float(edge_data.get("weight", 1.0))
            content = f"{normalized_src}\t{normalized_tgt}\n{keywords}\n{description}"
            relation_id = compute_mdhash_id(
                normalized_src + normalized_tgt, prefix="rel-"
            )
            relation_data = {
                relation_id: {
                    "content": content,
                    "src_id": normalized_src,
                    "tgt_id": normalized_tgt,
                    "source_id": source_id,
                    "description": description,
                    "keywords": keywords,
                    "weight": weight,
                }
            }
            await relationships_vdb.upsert(relation_data)
        # From here on, operate under the new name.
        entity_name = new_entity_name
    else:
        # Not renaming: update the node in place.
        await chunk_entity_relation_graph.upsert_node(entity_name, new_node_data)
    # Recompute the entity's vector representation from the updated node data.
    description = new_node_data.get("description", "")
    source_id = new_node_data.get("source_id", "")
    entity_type = new_node_data.get("entity_type", "")
    content = entity_name + "\n" + description
    entity_id = compute_mdhash_id(entity_name, prefix="ent-")
    entity_data = {
        entity_id: {
            "content": content,
            "entity_name": entity_name,
            "source_id": source_id,
            "description": description,
            "entity_type": entity_type,
        }
    }
    await entities_vdb.upsert(entity_data)
    # Keep the chunk-tracking KV storages consistent with the edited entity.
    if entity_chunks_storage is not None or relation_chunks_storage is not None:
        from .utils import make_relation_chunk_key, compute_incremental_chunk_ids

        if entity_chunks_storage is not None:
            # The tracking record is keyed by the pre-rename name.
            storage_key = original_entity_name if is_renaming else entity_name
            stored_data = await entity_chunks_storage.get_by_id(storage_key)
            has_stored_data = (
                stored_data
                and isinstance(stored_data, dict)
                and stored_data.get("chunk_ids")
            )
            # Diff the chunk ids referenced before and after the edit.
            old_source_id = node_data.get("source_id", "")
            old_chunk_ids = [cid for cid in old_source_id.split(GRAPH_FIELD_SEP) if cid]
            new_source_id = new_node_data.get("source_id", "")
            new_chunk_ids = [cid for cid in new_source_id.split(GRAPH_FIELD_SEP) if cid]
            source_id_changed = set(new_chunk_ids) != set(old_chunk_ids)
            # Rewrite tracking when source_id changed, or when no record exists yet.
            if source_id_changed or not has_stored_data:
                existing_full_chunk_ids = []
                if has_stored_data:
                    existing_full_chunk_ids = [
                        cid for cid in stored_data.get("chunk_ids", []) if cid
                    ]
                # No stored record: fall back to the graph's old source_id as baseline.
                if not existing_full_chunk_ids:
                    existing_full_chunk_ids = old_chunk_ids.copy()
                updated_chunk_ids = compute_incremental_chunk_ids(
                    existing_full_chunk_ids, old_chunk_ids, new_chunk_ids
                )
                if is_renaming:
                    # Renaming: drop the record under the old key, write under the new.
                    await entity_chunks_storage.delete([original_entity_name])
                    await entity_chunks_storage.upsert(
                        {
                            entity_name: {
                                "chunk_ids": updated_chunk_ids,
                                "count": len(updated_chunk_ids),
                            }
                        }
                    )
                else:
                    # Non-renaming: direct update in place.
                    await entity_chunks_storage.upsert(
                        {
                            entity_name: {
                                "chunk_ids": updated_chunk_ids,
                                "count": len(updated_chunk_ids),
                            }
                        }
                    )
                logger.info(
                    f"Entity Edit: find {len(updated_chunk_ids)} chunks related to `{entity_name}`"
                )
        # Relation tracking records are keyed by the entity pair, so a rename
        # must migrate each record from the old pair key to the new one.
        if is_renaming and relation_chunks_storage is not None and relations_to_update:
            for src, tgt, edge_data in relations_to_update:
                # Reconstruct the pre-rename endpoint names for this relation.
                old_src = original_entity_name if src == entity_name else src
                old_tgt = original_entity_name if tgt == entity_name else tgt
                old_normalized_src, old_normalized_tgt = sorted([old_src, old_tgt])
                new_normalized_src, new_normalized_tgt = sorted([src, tgt])
                old_storage_key = make_relation_chunk_key(
                    old_normalized_src, old_normalized_tgt
                )
                new_storage_key = make_relation_chunk_key(
                    new_normalized_src, new_normalized_tgt
                )
                # Only move the record if the rename actually changed the key.
                if old_storage_key != new_storage_key:
                    old_stored_data = await relation_chunks_storage.get_by_id(
                        old_storage_key
                    )
                    relation_chunk_ids = []
                    if old_stored_data and isinstance(old_stored_data, dict):
                        # Prefer the complete chunk_ids held in storage.
                        relation_chunk_ids = [
                            cid for cid in old_stored_data.get("chunk_ids", []) if cid
                        ]
                    else:
                        # Fallback: storage has no record, use the graph's source_id.
                        relation_source_id = edge_data.get("source_id", "")
                        relation_chunk_ids = [
                            cid
                            for cid in relation_source_id.split(GRAPH_FIELD_SEP)
                            if cid
                        ]
                    await relation_chunks_storage.delete([old_storage_key])
                    if relation_chunk_ids:
                        await relation_chunks_storage.upsert(
                            {
                                new_storage_key: {
                                    "chunk_ids": relation_chunk_ids,
                                    "count": len(relation_chunk_ids),
                                }
                            }
                        )
            logger.info(
                f"Entity Edit: migrate {len(relations_to_update)} relations after rename"
            )
    # Flush all pending graph / vector / KV changes to their backends.
    await _persist_graph_updates(
        entities_vdb=entities_vdb,
        relationships_vdb=relationships_vdb,
        chunk_entity_relation_graph=chunk_entity_relation_graph,
        entity_chunks_storage=entity_chunks_storage,
        relation_chunks_storage=relation_chunks_storage,
    )
    logger.info(f"Entity Edit: `{entity_name}` successfully updated")
    return await get_entity_info(
        chunk_entity_relation_graph,
        entities_vdb,
        entity_name,
        include_vector_data=True,
    )
async def aedit_entity( async def aedit_entity(
chunk_entity_relation_graph, chunk_entity_relation_graph,
entities_vdb, entities_vdb,
@ -519,7 +253,6 @@ async def aedit_entity(
entity_name: str, entity_name: str,
updated_data: dict[str, str], updated_data: dict[str, str],
allow_rename: bool = True, allow_rename: bool = True,
allow_merge: bool = False,
entity_chunks_storage=None, entity_chunks_storage=None,
relation_chunks_storage=None, relation_chunks_storage=None,
) -> dict[str, Any]: ) -> dict[str, Any]:
@ -535,177 +268,339 @@ async def aedit_entity(
entity_name: Name of the entity to edit entity_name: Name of the entity to edit
updated_data: Dictionary containing updated attributes, e.g. {"description": "new description", "entity_type": "new type"} updated_data: Dictionary containing updated attributes, e.g. {"description": "new description", "entity_type": "new type"}
allow_rename: Whether to allow entity renaming, defaults to True allow_rename: Whether to allow entity renaming, defaults to True
allow_merge: Whether to merge into an existing entity when renaming to an existing name, defaults to False
entity_chunks_storage: Optional KV storage for tracking chunks that reference this entity entity_chunks_storage: Optional KV storage for tracking chunks that reference this entity
relation_chunks_storage: Optional KV storage for tracking chunks that reference relations relation_chunks_storage: Optional KV storage for tracking chunks that reference relations
Returns: Returns:
Dictionary containing updated entity information and operation summary with the following structure: Dictionary containing updated entity information
{
"entity_name": str, # Name of the entity
"description": str, # Entity description
"entity_type": str, # Entity type
"source_id": str, # Source chunk IDs
... # Other entity properties
"operation_summary": {
"merged": bool, # Whether entity was merged
"merge_status": str, # "success" | "failed" | "not_attempted"
"merge_error": str | None, # Error message if merge failed
"operation_status": str, # "success" | "partial_success" | "failure"
"target_entity": str | None, # Target entity name if renaming/merging
"final_entity": str, # Final entity name after operation
"renamed": bool # Whether entity was renamed
}
}
operation_status values:
- "success": Operation completed successfully (update/rename/merge all succeeded)
- "partial_success": Non-name updates succeeded but merge failed
- "failure": Operation failed completely
merge_status values:
- "success": Entity successfully merged into target
- "failed": Merge operation failed
- "not_attempted": No merge was attempted (normal update/rename)
""" """
# Determine entities to lock
new_entity_name = updated_data.get("entity_name", entity_name) new_entity_name = updated_data.get("entity_name", entity_name)
is_renaming = new_entity_name != entity_name is_renaming = new_entity_name != entity_name
lock_keys = sorted({entity_name, new_entity_name}) if is_renaming else [entity_name] # Lock both original and new entity names if renaming
lock_keys = sorted([entity_name, new_entity_name]) if is_renaming else [entity_name]
# Use keyed lock for entity to ensure atomic graph and vector db operations
workspace = entities_vdb.global_config.get("workspace", "") workspace = entities_vdb.global_config.get("workspace", "")
namespace = f"{workspace}:GraphDB" if workspace else "GraphDB" namespace = f"{workspace}:GraphDB" if workspace else "GraphDB"
operation_summary: dict[str, Any] = {
"merged": False,
"merge_status": "not_attempted",
"merge_error": None,
"operation_status": "success",
"target_entity": None,
"final_entity": new_entity_name if is_renaming else entity_name,
"renamed": is_renaming,
}
async with get_storage_keyed_lock( async with get_storage_keyed_lock(
lock_keys, namespace=namespace, enable_logging=False lock_keys, namespace=namespace, enable_logging=False
): ):
try: try:
if is_renaming and not allow_rename: # Save original entity name for chunk tracking updates
original_entity_name = entity_name
# 1. Get current entity information
node_exists = await chunk_entity_relation_graph.has_node(entity_name)
if not node_exists:
raise ValueError(f"Entity '{entity_name}' does not exist")
node_data = await chunk_entity_relation_graph.get_node(entity_name)
# If renaming, check if new name already exists
if is_renaming:
if not allow_rename:
raise ValueError( raise ValueError(
"Entity renaming is not allowed. Set allow_rename=True to enable this feature" "Entity renaming is not allowed. Set allow_rename=True to enable this feature"
) )
if is_renaming: existing_node = await chunk_entity_relation_graph.has_node(
target_exists = await chunk_entity_relation_graph.has_node(
new_entity_name new_entity_name
) )
if target_exists: if existing_node:
if not allow_merge:
raise ValueError( raise ValueError(
f"Entity name '{new_entity_name}' already exists, cannot rename" f"Entity name '{new_entity_name}' already exists, cannot rename"
) )
# 2. Update entity information in the graph
new_node_data = {**node_data, **updated_data}
new_node_data["entity_id"] = new_entity_name
if "entity_name" in new_node_data:
del new_node_data[
"entity_name"
] # Node data should not contain entity_name field
# If renaming entity
if is_renaming:
logger.info( logger.info(
f"Entity Edit: `{entity_name}` will be merged into `{new_entity_name}`" f"Entity Edit: renaming `{entity_name}` to `{new_entity_name}`"
) )
# Track whether non-name updates were applied # Create new entity
non_name_updates_applied = False await chunk_entity_relation_graph.upsert_node(
non_name_updates = { new_entity_name, new_node_data
key: value )
for key, value in updated_data.items()
if key != "entity_name" # Store relationships that need to be updated
relations_to_update = []
relations_to_delete = []
# Get all edges related to the original entity
edges = await chunk_entity_relation_graph.get_node_edges(entity_name)
if edges:
# Recreate edges for the new entity
for source, target in edges:
edge_data = await chunk_entity_relation_graph.get_edge(
source, target
)
if edge_data:
relations_to_delete.append(
compute_mdhash_id(source + target, prefix="rel-")
)
relations_to_delete.append(
compute_mdhash_id(target + source, prefix="rel-")
)
if source == entity_name:
await chunk_entity_relation_graph.upsert_edge(
new_entity_name, target, edge_data
)
relations_to_update.append(
(new_entity_name, target, edge_data)
)
else: # target == entity_name
await chunk_entity_relation_graph.upsert_edge(
source, new_entity_name, edge_data
)
relations_to_update.append(
(source, new_entity_name, edge_data)
)
# Delete old entity
await chunk_entity_relation_graph.delete_node(entity_name)
# Delete old entity record from vector database
old_entity_id = compute_mdhash_id(entity_name, prefix="ent-")
await entities_vdb.delete([old_entity_id])
# Delete old relation records from vector database
await relationships_vdb.delete(relations_to_delete)
# Update relationship vector representations
for src, tgt, edge_data in relations_to_update:
# Normalize entity order for consistent vector ID generation
normalized_src, normalized_tgt = sorted([src, tgt])
description = edge_data.get("description", "")
keywords = edge_data.get("keywords", "")
source_id = edge_data.get("source_id", "")
weight = float(edge_data.get("weight", 1.0))
# Create content using normalized order
content = (
f"{normalized_src}\t{normalized_tgt}\n{keywords}\n{description}"
)
# Calculate relationship ID using normalized order
relation_id = compute_mdhash_id(
normalized_src + normalized_tgt, prefix="rel-"
)
# Prepare data for vector database update
relation_data = {
relation_id: {
"content": content,
"src_id": normalized_src,
"tgt_id": normalized_tgt,
"source_id": source_id,
"description": description,
"keywords": keywords,
"weight": weight,
}
} }
# Apply non-name updates first # Update vector database
if non_name_updates: await relationships_vdb.upsert(relation_data)
try:
logger.info(
"Entity Edit: applying non-name updates before merge"
)
await _edit_entity_impl(
chunk_entity_relation_graph,
entities_vdb,
relationships_vdb,
entity_name,
non_name_updates,
entity_chunks_storage=entity_chunks_storage,
relation_chunks_storage=relation_chunks_storage,
)
non_name_updates_applied = True
except Exception as update_error:
# If update fails, re-raise immediately
logger.error(
f"Entity Edit: non-name updates failed: {update_error}"
)
raise
# Attempt to merge entities # Update working entity name to new name
try: entity_name = new_entity_name
merge_result = await _merge_entities_impl(
chunk_entity_relation_graph, else:
entities_vdb, # If not renaming, directly update node data
relationships_vdb, await chunk_entity_relation_graph.upsert_node(
[entity_name], entity_name, new_node_data
new_entity_name, )
merge_strategy=None,
target_entity_data=None, # 3. Recalculate entity's vector representation and update vector database
description = new_node_data.get("description", "")
source_id = new_node_data.get("source_id", "")
entity_type = new_node_data.get("entity_type", "")
content = entity_name + "\n" + description
# Calculate entity ID
entity_id = compute_mdhash_id(entity_name, prefix="ent-")
# Prepare data for vector database update
entity_data = {
entity_id: {
"content": content,
"entity_name": entity_name,
"source_id": source_id,
"description": description,
"entity_type": entity_type,
}
}
# Update vector database
await entities_vdb.upsert(entity_data)
# 4. Update chunk tracking storages
if entity_chunks_storage is not None or relation_chunks_storage is not None:
from .utils import (
make_relation_chunk_key,
compute_incremental_chunk_ids,
)
# 4.1 Handle entity chunk tracking
if entity_chunks_storage is not None:
# Get storage key (use original name for renaming scenario)
storage_key = original_entity_name if is_renaming else entity_name
stored_data = await entity_chunks_storage.get_by_id(storage_key)
has_stored_data = (
stored_data
and isinstance(stored_data, dict)
and stored_data.get("chunk_ids")
)
# Get old and new source_id
old_source_id = node_data.get("source_id", "")
old_chunk_ids = [
cid for cid in old_source_id.split(GRAPH_FIELD_SEP) if cid
]
new_source_id = new_node_data.get("source_id", "")
new_chunk_ids = [
cid for cid in new_source_id.split(GRAPH_FIELD_SEP) if cid
]
source_id_changed = set(new_chunk_ids) != set(old_chunk_ids)
# Update if: source_id changed OR storage has no data
if source_id_changed or not has_stored_data:
# Get existing full chunk_ids from storage
existing_full_chunk_ids = []
if has_stored_data:
existing_full_chunk_ids = [
cid for cid in stored_data.get("chunk_ids", []) if cid
]
# If no stored data exists, use old source_id as baseline
if not existing_full_chunk_ids:
existing_full_chunk_ids = old_chunk_ids.copy()
# Use utility function to compute incremental updates
updated_chunk_ids = compute_incremental_chunk_ids(
existing_full_chunk_ids, old_chunk_ids, new_chunk_ids
)
# Update storage (even if updated_chunk_ids is empty)
if is_renaming:
# Renaming: delete old + create new
await entity_chunks_storage.delete([original_entity_name])
await entity_chunks_storage.upsert(
{
entity_name: {
"chunk_ids": updated_chunk_ids,
"count": len(updated_chunk_ids),
}
}
)
else:
# Non-renaming: direct update
await entity_chunks_storage.upsert(
{
entity_name: {
"chunk_ids": updated_chunk_ids,
"count": len(updated_chunk_ids),
}
}
)
logger.info(
f"Entity Edit: find {len(updated_chunk_ids)} chunks related to `{entity_name}`"
)
# 4.2 Handle relation chunk tracking if entity was renamed
if (
is_renaming
and relation_chunks_storage is not None
and relations_to_update
):
for src, tgt, edge_data in relations_to_update:
# Determine old entity pair (before rename)
old_src = original_entity_name if src == entity_name else src
old_tgt = original_entity_name if tgt == entity_name else tgt
# Normalize entity order for both old and new keys
old_normalized_src, old_normalized_tgt = sorted(
[old_src, old_tgt]
)
new_normalized_src, new_normalized_tgt = sorted([src, tgt])
# Generate storage keys
old_storage_key = make_relation_chunk_key(
old_normalized_src, old_normalized_tgt
)
new_storage_key = make_relation_chunk_key(
new_normalized_src, new_normalized_tgt
)
# If keys are different, we need to move the chunk tracking
if old_storage_key != new_storage_key:
# Get complete chunk IDs from storage first (preserves all existing references)
old_stored_data = await relation_chunks_storage.get_by_id(
old_storage_key
)
relation_chunk_ids = []
if old_stored_data and isinstance(old_stored_data, dict):
# Use complete chunk_ids from storage
relation_chunk_ids = [
cid
for cid in old_stored_data.get("chunk_ids", [])
if cid
]
else:
# Fallback: if storage has no data, use graph's source_id
relation_source_id = edge_data.get("source_id", "")
relation_chunk_ids = [
cid
for cid in relation_source_id.split(GRAPH_FIELD_SEP)
if cid
]
# Delete old relation chunk tracking
await relation_chunks_storage.delete([old_storage_key])
# Create new relation chunk tracking (migrate complete data)
if relation_chunk_ids:
await relation_chunks_storage.upsert(
{
new_storage_key: {
"chunk_ids": relation_chunk_ids,
"count": len(relation_chunk_ids),
}
}
)
logger.info(
f"Entity Edit: migrate {len(relations_to_update)} relations after rename"
)
# 5. Save changes
await _persist_graph_updates(
entities_vdb=entities_vdb,
relationships_vdb=relationships_vdb,
chunk_entity_relation_graph=chunk_entity_relation_graph,
entity_chunks_storage=entity_chunks_storage, entity_chunks_storage=entity_chunks_storage,
relation_chunks_storage=relation_chunks_storage, relation_chunks_storage=relation_chunks_storage,
) )
# Merge succeeded logger.info(f"Entity Edit: `{entity_name}` successfully updated")
operation_summary.update( return await get_entity_info(
{
"merged": True,
"merge_status": "success",
"merge_error": None,
"operation_status": "success",
"target_entity": new_entity_name,
"final_entity": new_entity_name,
}
)
return {**merge_result, "operation_summary": operation_summary}
except Exception as merge_error:
# Merge failed, but update may have succeeded
logger.error(f"Entity Edit: merge failed: {merge_error}")
# Return partial success status (update succeeded but merge failed)
operation_summary.update(
{
"merged": False,
"merge_status": "failed",
"merge_error": str(merge_error),
"operation_status": "partial_success"
if non_name_updates_applied
else "failure",
"target_entity": new_entity_name,
"final_entity": entity_name, # Keep source entity name
}
)
# Get current entity info (with applied updates if any)
entity_info = await get_entity_info(
chunk_entity_relation_graph, chunk_entity_relation_graph,
entities_vdb, entities_vdb,
entity_name, entity_name,
include_vector_data=True, include_vector_data=True,
) )
return {**entity_info, "operation_summary": operation_summary}
# Normal edit flow (no merge involved)
edit_result = await _edit_entity_impl(
chunk_entity_relation_graph,
entities_vdb,
relationships_vdb,
entity_name,
updated_data,
entity_chunks_storage=entity_chunks_storage,
relation_chunks_storage=relation_chunks_storage,
)
operation_summary["operation_status"] = "success"
return {**edit_result, "operation_summary": operation_summary}
except Exception as e: except Exception as e:
logger.error(f"Error while editing entity '{entity_name}': {e}") logger.error(f"Error while editing entity '{entity_name}': {e}")
raise raise
@ -1159,22 +1054,22 @@ async def acreate_relation(
raise raise
async def _merge_entities_impl( async def amerge_entities(
chunk_entity_relation_graph, chunk_entity_relation_graph,
entities_vdb, entities_vdb,
relationships_vdb, relationships_vdb,
source_entities: list[str], source_entities: list[str],
target_entity: str, target_entity: str,
*,
merge_strategy: dict[str, str] = None, merge_strategy: dict[str, str] = None,
target_entity_data: dict[str, Any] = None, target_entity_data: dict[str, Any] = None,
entity_chunks_storage=None, entity_chunks_storage=None,
relation_chunks_storage=None, relation_chunks_storage=None,
) -> dict[str, Any]: ) -> dict[str, Any]:
"""Internal helper that merges entities without acquiring storage locks. """Asynchronously merge multiple entities into one entity.
This function performs the actual entity merge operations without lock management. Merges multiple source entities into a target entity, handling all relationships,
It should only be called by public APIs that have already acquired necessary locks. and updating both the knowledge graph and vector database.
Also merges chunk tracking information from entity_chunks_storage and relation_chunks_storage.
Args: Args:
chunk_entity_relation_graph: Graph storage instance chunk_entity_relation_graph: Graph storage instance
@ -1182,18 +1077,28 @@ async def _merge_entities_impl(
relationships_vdb: Vector database storage for relationships relationships_vdb: Vector database storage for relationships
source_entities: List of source entity names to merge source_entities: List of source entity names to merge
target_entity: Name of the target entity after merging target_entity: Name of the target entity after merging
merge_strategy: Deprecated. Merge strategy for each field (optional) merge_strategy: Deprecated (Each field uses its own default strategy). If provided,
target_entity_data: Dictionary of specific values to set for target entity (optional) customizations are applied but a warning is logged.
entity_chunks_storage: Optional KV storage for tracking chunks target_entity_data: Dictionary of specific values to set for the target entity,
relation_chunks_storage: Optional KV storage for tracking relation chunks overriding any merged values, e.g. {"description": "custom description", "entity_type": "PERSON"}
entity_chunks_storage: Optional KV storage for tracking chunks that reference entities
relation_chunks_storage: Optional KV storage for tracking chunks that reference relations
Returns: Returns:
Dictionary containing the merged entity information Dictionary containing the merged entity information
Note:
Caller must acquire appropriate locks before calling this function.
All source entities and the target entity should be locked together.
""" """
# Collect all entities involved (source + target) and lock them all in sorted order
all_entities = set(source_entities)
all_entities.add(target_entity)
lock_keys = sorted(all_entities)
# Use keyed lock for all entities to ensure atomic graph and vector db operations
workspace = entities_vdb.global_config.get("workspace", "")
namespace = f"{workspace}:GraphDB" if workspace else "GraphDB"
async with get_storage_keyed_lock(
lock_keys, namespace=namespace, enable_logging=False
):
try:
# Default merge strategy for entities # Default merge strategy for entities
default_entity_merge_strategy = { default_entity_merge_strategy = {
"description": "concatenate", "description": "concatenate",
@ -1210,7 +1115,9 @@ async def _merge_entities_impl(
**default_entity_merge_strategy, **default_entity_merge_strategy,
**merge_strategy, **merge_strategy,
} }
target_entity_data = {} if target_entity_data is None else target_entity_data target_entity_data = (
{} if target_entity_data is None else target_entity_data
)
# 1. Check if all source entities exist # 1. Check if all source entities exist
source_entities_data = {} source_entities_data = {}
@ -1225,8 +1132,8 @@ async def _merge_entities_impl(
target_exists = await chunk_entity_relation_graph.has_node(target_entity) target_exists = await chunk_entity_relation_graph.has_node(target_entity)
existing_target_entity_data = {} existing_target_entity_data = {}
if target_exists: if target_exists:
existing_target_entity_data = await chunk_entity_relation_graph.get_node( existing_target_entity_data = (
target_entity await chunk_entity_relation_graph.get_node(target_entity)
) )
logger.info( logger.info(
"Entity Merge: target entity already exists, source and target entities will be merged" "Entity Merge: target entity already exists, source and target entities will be merged"
@ -1259,16 +1166,22 @@ async def _merge_entities_impl(
for src, tgt in edges: for src, tgt in edges:
# Ensure src is the current entity # Ensure src is the current entity
if src == entity_name: if src == entity_name:
edge_data = await chunk_entity_relation_graph.get_edge(src, tgt) edge_data = await chunk_entity_relation_graph.get_edge(
src, tgt
)
all_relations.append((src, tgt, edge_data)) all_relations.append((src, tgt, edge_data))
# 5. Create or update the target entity # 5. Create or update the target entity
merged_entity_data["entity_id"] = target_entity merged_entity_data["entity_id"] = target_entity
if not target_exists: if not target_exists:
await chunk_entity_relation_graph.upsert_node(target_entity, merged_entity_data) await chunk_entity_relation_graph.upsert_node(
target_entity, merged_entity_data
)
logger.info(f"Entity Merge: created target '{target_entity}'") logger.info(f"Entity Merge: created target '{target_entity}'")
else: else:
await chunk_entity_relation_graph.upsert_node(target_entity, merged_entity_data) await chunk_entity_relation_graph.upsert_node(
target_entity, merged_entity_data
)
logger.info(f"Entity Merge: Updated target '{target_entity}'") logger.info(f"Entity Merge: Updated target '{target_entity}'")
# 6. Recreate all relations pointing to the target entity in KG # 6. Recreate all relations pointing to the target entity in KG
@ -1296,7 +1209,9 @@ async def _merge_entities_impl(
# Skip relationships between source entities to avoid self-loops # Skip relationships between source entities to avoid self-loops
if new_src == new_tgt: if new_src == new_tgt:
logger.info(f"Entity Merge: skipping `{src}`~`{tgt}` to avoid self-loop") logger.info(
f"Entity Merge: skipping `{src}`~`{tgt}` to avoid self-loop"
)
continue continue
# Normalize entity order for consistent duplicate detection (undirected relationships) # Normalize entity order for consistent duplicate detection (undirected relationships)
@ -1305,7 +1220,9 @@ async def _merge_entities_impl(
# Process chunk tracking for this relation # Process chunk tracking for this relation
if relation_chunks_storage is not None: if relation_chunks_storage is not None:
storage_key = make_relation_chunk_key(normalized_src, normalized_tgt) storage_key = make_relation_chunk_key(
normalized_src, normalized_tgt
)
# Get chunk_ids from storage for this original relation # Get chunk_ids from storage for this original relation
stored = await relation_chunks_storage.get_by_id(old_storage_key) stored = await relation_chunks_storage.get_by_id(old_storage_key)
@ -1315,7 +1232,9 @@ async def _merge_entities_impl(
else: else:
# Fallback to source_id from graph # Fallback to source_id from graph
source_id = edge_data.get("source_id", "") source_id = edge_data.get("source_id", "")
chunk_ids = [cid for cid in source_id.split(GRAPH_FIELD_SEP) if cid] chunk_ids = [
cid for cid in source_id.split(GRAPH_FIELD_SEP) if cid
]
# Accumulate chunk_ids with ordered deduplication # Accumulate chunk_ids with ordered deduplication
if storage_key not in relation_chunk_tracking: if storage_key not in relation_chunk_tracking:
@ -1382,7 +1301,9 @@ async def _merge_entities_impl(
) )
# 7. Update relationship vector representations # 7. Update relationship vector representations
logger.info(f"Entity Merge: deleting {len(relations_to_delete)} relations from vdb") logger.info(
f"Entity Merge: deleting {len(relations_to_delete)} relations from vdb"
)
await relationships_vdb.delete(relations_to_delete) await relationships_vdb.delete(relations_to_delete)
for rel_data in relation_updates.values(): for rel_data in relation_updates.values():
edge_data = rel_data["data"] edge_data = rel_data["data"]
@ -1395,8 +1316,12 @@ async def _merge_entities_impl(
weight = float(edge_data.get("weight", 1.0)) weight = float(edge_data.get("weight", 1.0))
# Use normalized order for content and relation ID # Use normalized order for content and relation ID
content = f"{keywords}\t{normalized_src}\n{normalized_tgt}\n{description}" content = (
relation_id = compute_mdhash_id(normalized_src + normalized_tgt, prefix="rel-") f"{keywords}\t{normalized_src}\n{normalized_tgt}\n{description}"
)
relation_id = compute_mdhash_id(
normalized_src + normalized_tgt, prefix="rel-"
)
relation_data_for_vdb = { relation_data_for_vdb = {
relation_id: { relation_id: {
@ -1410,7 +1335,9 @@ async def _merge_entities_impl(
} }
} }
await relationships_vdb.upsert(relation_data_for_vdb) await relationships_vdb.upsert(relation_data_for_vdb)
logger.info(f"Entity Merge: updating vdb `{normalized_src}`~`{normalized_tgt}`") logger.info(
f"Entity Merge: updating vdb `{normalized_src}`~`{normalized_tgt}`"
)
# 8. Update entity vector representation # 8. Update entity vector representation
description = merged_entity_data.get("description", "") description = merged_entity_data.get("description", "")
@ -1465,7 +1392,9 @@ async def _merge_entities_impl(
merged_chunk_ids.append(chunk_id) merged_chunk_ids.append(chunk_id)
# Delete source entities' chunk tracking records # Delete source entities' chunk tracking records
entity_keys_to_delete = [e for e in source_entities if e != target_entity] entity_keys_to_delete = [
e for e in source_entities if e != target_entity
]
if entity_keys_to_delete: if entity_keys_to_delete:
await entity_chunks_storage.delete(entity_keys_to_delete) await entity_chunks_storage.delete(entity_keys_to_delete)
@ -1519,62 +1448,6 @@ async def _merge_entities_impl(
include_vector_data=True, include_vector_data=True,
) )
async def amerge_entities(
chunk_entity_relation_graph,
entities_vdb,
relationships_vdb,
source_entities: list[str],
target_entity: str,
merge_strategy: dict[str, str] = None,
target_entity_data: dict[str, Any] = None,
entity_chunks_storage=None,
relation_chunks_storage=None,
) -> dict[str, Any]:
"""Asynchronously merge multiple entities into one entity.
Merges multiple source entities into a target entity, handling all relationships,
and updating both the knowledge graph and vector database.
Also merges chunk tracking information from entity_chunks_storage and relation_chunks_storage.
Args:
chunk_entity_relation_graph: Graph storage instance
entities_vdb: Vector database storage for entities
relationships_vdb: Vector database storage for relationships
source_entities: List of source entity names to merge
target_entity: Name of the target entity after merging
merge_strategy: Deprecated (Each field uses its own default strategy). If provided,
customizations are applied but a warning is logged.
target_entity_data: Dictionary of specific values to set for the target entity,
overriding any merged values, e.g. {"description": "custom description", "entity_type": "PERSON"}
entity_chunks_storage: Optional KV storage for tracking chunks that reference entities
relation_chunks_storage: Optional KV storage for tracking chunks that reference relations
Returns:
Dictionary containing the merged entity information
"""
# Collect all entities involved (source + target) and lock them all in sorted order
all_entities = set(source_entities)
all_entities.add(target_entity)
lock_keys = sorted(all_entities)
workspace = entities_vdb.global_config.get("workspace", "")
namespace = f"{workspace}:GraphDB" if workspace else "GraphDB"
async with get_storage_keyed_lock(
lock_keys, namespace=namespace, enable_logging=False
):
try:
return await _merge_entities_impl(
chunk_entity_relation_graph,
entities_vdb,
relationships_vdb,
source_entities,
target_entity,
merge_strategy=merge_strategy,
target_entity_data=target_entity_data,
entity_chunks_storage=entity_chunks_storage,
relation_chunks_storage=relation_chunks_storage,
)
except Exception as e: except Exception as e:
logger.error(f"Error merging entities: {e}") logger.error(f"Error merging entities: {e}")
raise raise