Fix relation deduplication logic and standardize log message prefixes

(cherry picked from commit a25003c336)
Author:    yangdx
Date:      2025-10-27 00:52:56 +08:00
Committer: Raphaël MANSUY
Parent:    2ea1fccf1a
Commit:    f13d30206f


@@ -1038,14 +1038,11 @@ async def amerge_entities(
target_entity: str,
merge_strategy: dict[str, str] = None,
target_entity_data: dict[str, Any] = None,
entity_chunks_storage=None,
relation_chunks_storage=None,
) -> dict[str, Any]:
"""Asynchronously merge multiple entities into one entity.
Merges multiple source entities into a target entity, handling all relationships,
and updating both the knowledge graph and vector database.
Also merges chunk tracking information from entity_chunks_storage and relation_chunks_storage.
Args:
chunk_entity_relation_graph: Graph storage instance
@@ -1057,8 +1054,6 @@ async def amerge_entities(
customizations are applied but a warning is logged.
target_entity_data: Dictionary of specific values to set for the target entity,
overriding any merged values, e.g. {"description": "custom description", "entity_type": "PERSON"}
entity_chunks_storage: Optional KV storage for tracking chunks that reference entities
relation_chunks_storage: Optional KV storage for tracking chunks that reference relations
Returns:
Dictionary containing the merged entity information
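For orientation, here is a minimal usage sketch of the merge API this docstring describes. Only the parameter names come from the diff; the `rag` object, the method-style call, and the strategy names are assumptions for illustration.

```python
# Hypothetical usage sketch; everything except the parameter names
# shown in the docstring above is an assumption, not part of this diff.
async def merge_example(rag):
    merged = await rag.amerge_entities(
        source_entities=["NYC", "New York City"],
        target_entity="New York",
        # Per-field strategies, e.g. concatenate descriptions but keep
        # the first entity_type seen (strategy names are illustrative).
        merge_strategy={"description": "concatenate", "entity_type": "keep_first"},
        # Explicit overrides win over any merged value.
        target_entity_data={"entity_type": "LOCATION"},
    )
    return merged
```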
@@ -1123,8 +1118,8 @@ async def amerge_entities(
all_relations = []
entities_to_collect = source_entities.copy()
# If target entity exists and not already in source_entities, add it
if target_exists and target_entity not in source_entities:
# If target entity exists, also collect its relationships for merging
if target_exists:
entities_to_collect.append(target_entity)
for entity_name in entities_to_collect:
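This hunk is the core of the dedup fix: when the target entity already exists, its own relationships are collected as well, so they can later be merged with incoming edges under a shared relation key. A standalone sketch of the collection step, with `get_edges` standing in for the real graph storage call:

```python
# Sketch only: `graph.get_edges(name)` is a stand-in returning
# (src, tgt, edge_data) tuples; the actual storage API is not shown here.
def collect_relations(graph, source_entities, target_entity, target_exists):
    entities_to_collect = list(source_entities)
    # Include the existing target so its current edges join the merge.
    if target_exists:
        entities_to_collect.append(target_entity)
    all_relations = []
    for name in entities_to_collect:
        all_relations.extend(graph.get_edges(name))
    return all_relations
```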
@@ -1153,25 +1148,12 @@ async def amerge_entities(
logger.info(f"Entity Merge: Updated target '{target_entity}'")
# 6. Recreate all relations pointing to the target entity in KG
# Also collect chunk tracking information in the same loop
relation_updates = {} # Track relationships that need to be merged
relations_to_delete = []
# Initialize chunk tracking variables
relation_chunk_tracking = {} # key: storage_key, value: list of chunk_ids
old_relation_keys_to_delete = []
for src, tgt, edge_data in all_relations:
relations_to_delete.append(compute_mdhash_id(src + tgt, prefix="rel-"))
relations_to_delete.append(compute_mdhash_id(tgt + src, prefix="rel-"))
# Collect old chunk tracking key for deletion
if relation_chunks_storage is not None:
from .utils import make_relation_chunk_key
old_storage_key = make_relation_chunk_key(src, tgt)
old_relation_keys_to_delete.append(old_storage_key)
new_src = target_entity if src in source_entities else src
new_tgt = target_entity if tgt in source_entities else tgt
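Relationship vectors are keyed by an md5 hash of the concatenated endpoint names, which is direction-sensitive, hence the two deletions per edge. A self-contained sketch of the id scheme and the endpoint remapping; the helper is a local stand-in mirroring the `compute_mdhash_id` call visible above:

```python
from hashlib import md5

def mdhash_id(content: str, prefix: str = "") -> str:
    # Local stand-in mirroring the compute_mdhash_id(...) call in the diff.
    return prefix + md5(content.encode()).hexdigest()

src, tgt = "NYC", "Brooklyn"
source_entities = {"NYC", "New York City"}
target_entity = "New York"

# The hash is order-sensitive, so both directions must be deleted.
relations_to_delete = [
    mdhash_id(src + tgt, prefix="rel-"),
    mdhash_id(tgt + src, prefix="rel-"),
]

# Any endpoint that is a merged source entity is remapped to the target.
new_src = target_entity if src in source_entities else src
new_tgt = target_entity if tgt in source_entities else tgt
assert (new_src, new_tgt) == ("New York", "Brooklyn")
```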
@@ -1186,34 +1168,6 @@ async def amerge_entities(
normalized_src, normalized_tgt = sorted([new_src, new_tgt])
relation_key = f"{normalized_src}|{normalized_tgt}"
# Process chunk tracking for this relation
if relation_chunks_storage is not None:
storage_key = make_relation_chunk_key(
normalized_src, normalized_tgt
)
# Get chunk_ids from storage for this original relation
stored = await relation_chunks_storage.get_by_id(old_storage_key)
if stored is not None and isinstance(stored, dict):
chunk_ids = [cid for cid in stored.get("chunk_ids", []) if cid]
else:
# Fallback to source_id from graph
source_id = edge_data.get("source_id", "")
chunk_ids = [
cid for cid in source_id.split(GRAPH_FIELD_SEP) if cid
]
# Accumulate chunk_ids with ordered deduplication
if storage_key not in relation_chunk_tracking:
relation_chunk_tracking[storage_key] = []
existing_chunks = set(relation_chunk_tracking[storage_key])
for chunk_id in chunk_ids:
if chunk_id not in existing_chunks:
existing_chunks.add(chunk_id)
relation_chunk_tracking[storage_key].append(chunk_id)
if relation_key in relation_updates:
# Merge relationship data
existing_data = relation_updates[relation_key]["data"]
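The normalized key built above is what makes the deduplication work: undirected duplicates such as A to B and B to A collapse onto one sorted key, and their data is merged. A minimal sketch; the weight-summing merge policy is an assumption here, since the real merge logic is not shown in this hunk:

```python
def relation_key(src: str, tgt: str) -> str:
    # Sort the endpoints so direction does not matter.
    a, b = sorted([src, tgt])
    return f"{a}|{b}"

edges = [
    ("New York", "Brooklyn", {"weight": 1.0}),
    ("Brooklyn", "New York", {"weight": 2.0}),  # reversed duplicate
]
relation_updates = {}
for src, tgt, data in edges:
    key = relation_key(src, tgt)
    if key in relation_updates:
        # Assumed merge policy for this sketch: accumulate weights.
        relation_updates[key]["weight"] += data["weight"]
    else:
        relation_updates[key] = dict(data)

assert relation_updates == {"Brooklyn|New York": {"weight": 3.0}}
```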
@@ -1250,25 +1204,26 @@ async def amerge_entities(
f"Entity Merge: updating relation `{rel_data['graph_src']}`->`{rel_data['graph_tgt']}`"
)
# Update relation chunk tracking storage
if relation_chunks_storage is not None and all_relations:
if old_relation_keys_to_delete:
await relation_chunks_storage.delete(old_relation_keys_to_delete)
# 7. Update entity vector representation
description = merged_entity_data.get("description", "")
source_id = merged_entity_data.get("source_id", "")
entity_type = merged_entity_data.get("entity_type", "")
content = target_entity + "\n" + description
if relation_chunk_tracking:
updates = {}
for storage_key, chunk_ids in relation_chunk_tracking.items():
updates[storage_key] = {
"chunk_ids": chunk_ids,
"count": len(chunk_ids),
}
entity_id = compute_mdhash_id(target_entity, prefix="ent-")
entity_data_for_vdb = {
entity_id: {
"content": content,
"entity_name": target_entity,
"source_id": source_id,
"description": description,
"entity_type": entity_type,
}
}
await entities_vdb.upsert(entity_data_for_vdb)
logger.info(f"Entity Merge: updating vdb `{target_entity}`")
await relation_chunks_storage.upsert(updates)
logger.info(
f"Entity Merge: merged chunk tracking for {len(updates)} relations"
)
# 7. Update relationship vector representations
# 8. Update relationship vector representations
logger.info(
f"Entity Merge: deleting {len(relations_to_delete)} relations from vdb"
)
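The new step 7 builds the entity's vector payload from the merged fields: the embedded content is the entity name plus its description, stored under an "ent-"-prefixed md5 id. A standalone sketch of the payload shape, with the async upsert left commented out since no storage backend is wired up here:

```python
from hashlib import md5

def ent_id(name: str) -> str:
    # Mirrors compute_mdhash_id(target_entity, prefix="ent-") above.
    return "ent-" + md5(name.encode()).hexdigest()

target_entity = "New York"
merged = {
    "description": "Most populous city in the United States.",
    "source_id": "chunk-abc123",
    "entity_type": "LOCATION",
}
entity_data_for_vdb = {
    ent_id(target_entity): {
        # The embedded text is name + newline + description.
        "content": target_entity + "\n" + merged["description"],
        "entity_name": target_entity,
        "source_id": merged["source_id"],
        "description": merged["description"],
        "entity_type": merged["entity_type"],
    }
}
# await entities_vdb.upsert(entity_data_for_vdb)  # real call is async
```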
@@ -1307,80 +1262,7 @@ async def amerge_entities(
f"Entity Merge: updating vdb `{normalized_src}`~`{normalized_tgt}`"
)
# 8. Update entity vector representation
description = merged_entity_data.get("description", "")
source_id = merged_entity_data.get("source_id", "")
entity_type = merged_entity_data.get("entity_type", "")
content = target_entity + "\n" + description
entity_id = compute_mdhash_id(target_entity, prefix="ent-")
entity_data_for_vdb = {
entity_id: {
"content": content,
"entity_name": target_entity,
"source_id": source_id,
"description": description,
"entity_type": entity_type,
}
}
await entities_vdb.upsert(entity_data_for_vdb)
logger.info(f"Entity Merge: updating vdb `{target_entity}`")
# 9. Merge entity chunk tracking (source entities first, then target entity)
if entity_chunks_storage is not None:
all_chunk_id_lists = []
# Build list of entities to process (source entities first, then target entity)
entities_to_process = []
# Add source entities first (excluding target if it's already in source list)
for entity_name in source_entities:
if entity_name != target_entity:
entities_to_process.append(entity_name)
# Add target entity last (if it exists)
if target_exists:
entities_to_process.append(target_entity)
# Process all entities in order with unified logic
for entity_name in entities_to_process:
stored = await entity_chunks_storage.get_by_id(entity_name)
if stored and isinstance(stored, dict):
chunk_ids = [cid for cid in stored.get("chunk_ids", []) if cid]
if chunk_ids:
all_chunk_id_lists.append(chunk_ids)
# Merge chunk_ids with ordered deduplication (preserves order, source entities first)
merged_chunk_ids = []
seen = set()
for chunk_id_list in all_chunk_id_lists:
for chunk_id in chunk_id_list:
if chunk_id not in seen:
seen.add(chunk_id)
merged_chunk_ids.append(chunk_id)
# Delete source entities' chunk tracking records
entity_keys_to_delete = [
e for e in source_entities if e != target_entity
]
if entity_keys_to_delete:
await entity_chunks_storage.delete(entity_keys_to_delete)
# Update target entity's chunk tracking
if merged_chunk_ids:
await entity_chunks_storage.upsert(
{
target_entity: {
"chunk_ids": merged_chunk_ids,
"count": len(merged_chunk_ids),
}
}
)
logger.info(
f"Entity Merge: find {len(merged_chunk_ids)} chunks related to '{target_entity}'"
)
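The removed step 9 merged per-entity chunk-id lists with a fixed precedence (source entities first, target last) and ordered deduplication, then wrote a record with "chunk_ids" and "count" fields. A standalone sketch over a plain dict standing in for the KV storage:

```python
def merge_entity_chunks(store, source_entities, target_entity, target_exists):
    # Sources first, target last, matching the removed code's ordering.
    order = [e for e in source_entities if e != target_entity]
    if target_exists:
        order.append(target_entity)
    merged, seen = [], set()
    for name in order:
        for cid in store.get(name, {}).get("chunk_ids", []):
            if cid and cid not in seen:  # ordered dedup, skip empty ids
                seen.add(cid)
                merged.append(cid)
    return {"chunk_ids": merged, "count": len(merged)}

store = {
    "NYC": {"chunk_ids": ["c1", "c2"]},
    "New York": {"chunk_ids": ["c2", "c3"]},
}
result = merge_entity_chunks(store, ["NYC"], "New York", target_exists=True)
assert result == {"chunk_ids": ["c1", "c2", "c3"], "count": 3}
```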
# 10. Delete source entities
# 9. Delete source entities
for entity_name in source_entities:
if entity_name == target_entity:
logger.warning(
@@ -1397,13 +1279,11 @@ async def amerge_entities(
entity_id = compute_mdhash_id(entity_name, prefix="ent-")
await entities_vdb.delete([entity_id])
# 11. Save changes
# 10. Save changes
await _persist_graph_updates(
entities_vdb=entities_vdb,
relationships_vdb=relationships_vdb,
chunk_entity_relation_graph=chunk_entity_relation_graph,
entity_chunks_storage=entity_chunks_storage,
relation_chunks_storage=relation_chunks_storage,
)
logger.info(