This commit is contained in:
Raphaël MANSUY 2025-12-04 19:18:38 +08:00
parent 14413cacbc
commit b7e1e59b91

View file

@ -1050,8 +1050,12 @@ async def amerge_entities(
relationships_vdb: Vector database storage for relationships relationships_vdb: Vector database storage for relationships
source_entities: List of source entity names to merge source_entities: List of source entity names to merge
target_entity: Name of the target entity after merging target_entity: Name of the target entity after merging
merge_strategy: Deprecated (Each field uses its own default strategy). If provided, merge_strategy: Merge strategy configuration, e.g. {"description": "concatenate", "entity_type": "keep_first"}
customizations are applied but a warning is logged. Supported strategies:
- "concatenate": Concatenate all values (for text fields)
- "keep_first": Keep the first non-empty value
- "keep_last": Keep the last non-empty value
- "join_unique": Join all unique values (for fields separated by delimiter)
target_entity_data: Dictionary of specific values to set for the target entity, target_entity_data: Dictionary of specific values to set for the target entity,
overriding any merged values, e.g. {"description": "custom description", "entity_type": "PERSON"} overriding any merged values, e.g. {"description": "custom description", "entity_type": "PERSON"}
@ -1062,22 +1066,18 @@ async def amerge_entities(
# Use graph database lock to ensure atomic graph and vector db operations # Use graph database lock to ensure atomic graph and vector db operations
async with graph_db_lock: async with graph_db_lock:
try: try:
# Default merge strategy for entities # Default merge strategy
default_entity_merge_strategy = { default_strategy = {
"description": "concatenate", "description": "concatenate",
"entity_type": "keep_first", "entity_type": "keep_first",
"source_id": "join_unique", "source_id": "join_unique",
"file_path": "join_unique",
} }
effective_entity_merge_strategy = default_entity_merge_strategy
if merge_strategy: merge_strategy = (
logger.warning( default_strategy
"Entity Merge: merge_strategy parameter is deprecated and will be ignored in a future release." if merge_strategy is None
) else {**default_strategy, **merge_strategy}
effective_entity_merge_strategy = { )
**default_entity_merge_strategy,
**merge_strategy,
}
target_entity_data = ( target_entity_data = (
{} if target_entity_data is None else target_entity_data {} if target_entity_data is None else target_entity_data
) )
@ -1099,15 +1099,14 @@ async def amerge_entities(
await chunk_entity_relation_graph.get_node(target_entity) await chunk_entity_relation_graph.get_node(target_entity)
) )
logger.info( logger.info(
"Entity Merge: target entity already exists, source and target entities will be merged" f"Target entity '{target_entity}' already exists, will merge data"
) )
# 3. Merge entity data # 3. Merge entity data
merged_entity_data = _merge_attributes( merged_entity_data = _merge_entity_attributes(
list(source_entities_data.values()) list(source_entities_data.values())
+ ([existing_target_entity_data] if target_exists else []), + ([existing_target_entity_data] if target_exists else []),
effective_entity_merge_strategy, merge_strategy,
filter_none_only=False, # Use entity behavior: filter falsy values
) )
# Apply any explicitly provided target entity data (overrides merged data) # Apply any explicitly provided target entity data (overrides merged data)
@ -1117,11 +1116,11 @@ async def amerge_entities(
# 4. Get all relationships of the source entities and target entity (if exists) # 4. Get all relationships of the source entities and target entity (if exists)
all_relations = [] all_relations = []
entities_to_collect = source_entities.copy() entities_to_collect = source_entities.copy()
# If target entity exists, also collect its relationships for merging # If target entity exists, also collect its relationships for merging
if target_exists: if target_exists:
entities_to_collect.append(target_entity) entities_to_collect.append(target_entity)
for entity_name in entities_to_collect: for entity_name in entities_to_collect:
# Get all relationships of the entities # Get all relationships of the entities
edges = await chunk_entity_relation_graph.get_node_edges(entity_name) edges = await chunk_entity_relation_graph.get_node_edges(entity_name)
@ -1140,14 +1139,14 @@ async def amerge_entities(
await chunk_entity_relation_graph.upsert_node( await chunk_entity_relation_graph.upsert_node(
target_entity, merged_entity_data target_entity, merged_entity_data
) )
logger.info(f"Entity Merge: created target '{target_entity}'") logger.info(f"Created new target entity '{target_entity}'")
else: else:
await chunk_entity_relation_graph.upsert_node( await chunk_entity_relation_graph.upsert_node(
target_entity, merged_entity_data target_entity, merged_entity_data
) )
logger.info(f"Entity Merge: Updated target '{target_entity}'") logger.info(f"Updated existing target entity '{target_entity}'")
# 6. Recreate all relations pointing to the target entity in KG # 6. Recreate all relationships, pointing to the target entity
relation_updates = {} # Track relationships that need to be merged relation_updates = {} # Track relationships that need to be merged
relations_to_delete = [] relations_to_delete = []
@ -1160,48 +1159,48 @@ async def amerge_entities(
# Skip relationships between source entities to avoid self-loops # Skip relationships between source entities to avoid self-loops
if new_src == new_tgt: if new_src == new_tgt:
logger.info( logger.info(
f"Entity Merge: skipping `{src}`~`{tgt}` to avoid self-loop" f"Skipping relationship between source entities: {src} -> {tgt} to avoid self-loop"
) )
continue continue
# Normalize entity order for consistent duplicate detection (undirected relationships) # Check if the same relationship already exists
normalized_src, normalized_tgt = sorted([new_src, new_tgt]) relation_key = f"{new_src}|{new_tgt}"
relation_key = f"{normalized_src}|{normalized_tgt}"
if relation_key in relation_updates: if relation_key in relation_updates:
# Merge relationship data # Merge relationship data
existing_data = relation_updates[relation_key]["data"] existing_data = relation_updates[relation_key]["data"]
merged_relation = _merge_attributes( merged_relation = _merge_relation_attributes(
[existing_data, edge_data], [existing_data, edge_data],
{ {
"description": "concatenate", "description": "concatenate",
"keywords": "join_unique_comma", "keywords": "join_unique",
"source_id": "join_unique", "source_id": "join_unique",
"file_path": "join_unique",
"weight": "max", "weight": "max",
}, },
filter_none_only=True, # Use relation behavior: only filter None
) )
relation_updates[relation_key]["data"] = merged_relation relation_updates[relation_key]["data"] = merged_relation
logger.info( logger.info(
f"Entity Merge: deduplicating relation `{normalized_src}`~`{normalized_tgt}`" f"Merged duplicate relationship: {new_src} -> {new_tgt}"
) )
else: else:
relation_updates[relation_key] = { relation_updates[relation_key] = {
"graph_src": new_src, "src": new_src,
"graph_tgt": new_tgt, "tgt": new_tgt,
"norm_src": normalized_src,
"norm_tgt": normalized_tgt,
"data": edge_data.copy(), "data": edge_data.copy(),
} }
# Apply relationship updates # Apply relationship updates
for rel_data in relation_updates.values(): for rel_data in relation_updates.values():
await chunk_entity_relation_graph.upsert_edge( await chunk_entity_relation_graph.upsert_edge(
rel_data["graph_src"], rel_data["graph_tgt"], rel_data["data"] rel_data["src"], rel_data["tgt"], rel_data["data"]
) )
logger.info( logger.info(
f"Entity Merge: updating relation `{rel_data['graph_src']}`->`{rel_data['graph_tgt']}`" f"Created or updated relationship: {rel_data['src']} -> {rel_data['tgt']}"
)
# Delete relationships records from vector database
await relationships_vdb.delete(relations_to_delete)
logger.info(
f"Deleted {len(relations_to_delete)} relation records for entity from vector database"
) )
# 7. Update entity vector representation # 7. Update entity vector representation
@ -1220,18 +1219,17 @@ async def amerge_entities(
"entity_type": entity_type, "entity_type": entity_type,
} }
} }
await entities_vdb.upsert(entity_data_for_vdb) await entities_vdb.upsert(entity_data_for_vdb)
logger.info(f"Entity Merge: updating vdb `{target_entity}`")
# 8. Update relationship vector representations # 8. Update relationship vector representations
logger.info(
f"Entity Merge: deleting {len(relations_to_delete)} relations from vdb"
)
await relationships_vdb.delete(relations_to_delete)
for rel_data in relation_updates.values(): for rel_data in relation_updates.values():
src = rel_data["src"]
tgt = rel_data["tgt"]
edge_data = rel_data["data"] edge_data = rel_data["data"]
normalized_src = rel_data["norm_src"]
normalized_tgt = rel_data["norm_tgt"] # Normalize entity order for consistent vector storage
normalized_src, normalized_tgt = sorted([src, tgt])
description = edge_data.get("description", "") description = edge_data.get("description", "")
keywords = edge_data.get("keywords", "") keywords = edge_data.get("keywords", "")
@ -1257,28 +1255,28 @@ async def amerge_entities(
"weight": weight, "weight": weight,
} }
} }
await relationships_vdb.upsert(relation_data_for_vdb) await relationships_vdb.upsert(relation_data_for_vdb)
logger.info(
f"Entity Merge: updating vdb `{normalized_src}`~`{normalized_tgt}`"
)
# 9. Delete source entities # 9. Delete source entities
for entity_name in source_entities: for entity_name in source_entities:
if entity_name == target_entity: if entity_name == target_entity:
logger.warning( logger.info(
f"Entity Merge: source entity'{entity_name}' is same as target entity" f"Skipping deletion of '{entity_name}' as it's also the target entity"
) )
continue continue
logger.info(f"Entity Merge: deleting '{entity_name}' from KG and vdb") # Delete entity node from knowledge graph
# Delete entity node and related edges from knowledge graph
await chunk_entity_relation_graph.delete_node(entity_name) await chunk_entity_relation_graph.delete_node(entity_name)
# Delete entity record from vector database # Delete entity record from vector database
entity_id = compute_mdhash_id(entity_name, prefix="ent-") entity_id = compute_mdhash_id(entity_name, prefix="ent-")
await entities_vdb.delete([entity_id]) await entities_vdb.delete([entity_id])
logger.info(
f"Deleted source entity '{entity_name}' and its vector embedding from database"
)
# 10. Save changes # 10. Save changes
await _persist_graph_updates( await _persist_graph_updates(
entities_vdb=entities_vdb, entities_vdb=entities_vdb,
@ -1287,7 +1285,7 @@ async def amerge_entities(
) )
logger.info( logger.info(
f"Entity Merge: successfully merged {len(source_entities)} entities into '{target_entity}'" f"Successfully merged {len(source_entities)} entities into '{target_entity}'"
) )
return await get_entity_info( return await get_entity_info(
chunk_entity_relation_graph, chunk_entity_relation_graph,
@ -1301,45 +1299,29 @@ async def amerge_entities(
raise raise
def _merge_attributes( def _merge_entity_attributes(
data_list: list[dict[str, Any]], entity_data_list: list[dict[str, Any]], merge_strategy: dict[str, str]
merge_strategy: dict[str, str],
filter_none_only: bool = False,
) -> dict[str, Any]: ) -> dict[str, Any]:
"""Merge attributes from multiple entities or relationships. """Merge attributes from multiple entities.
This unified function handles merging of both entity and relationship attributes,
applying different merge strategies per field.
Args: Args:
data_list: List of dictionaries containing entity or relationship data entity_data_list: List of dictionaries containing entity data
merge_strategy: Merge strategy for each field. Supported strategies: merge_strategy: Merge strategy for each field
- "concatenate": Join all values with GRAPH_FIELD_SEP
- "keep_first": Keep the first non-empty value
- "keep_last": Keep the last non-empty value
- "join_unique": Join unique items separated by GRAPH_FIELD_SEP
- "join_unique_comma": Join unique items separated by comma and space
- "max": Keep the maximum numeric value (for numeric fields)
filter_none_only: If True, only filter None values (keep empty strings, 0, etc.).
If False, filter all falsy values. Default is False for backward compatibility.
Returns: Returns:
Dictionary containing merged data Dictionary containing merged entity data
""" """
merged_data = {} merged_data = {}
# Collect all possible keys # Collect all possible keys
all_keys = set() all_keys = set()
for data in data_list: for data in entity_data_list:
all_keys.update(data.keys()) all_keys.update(data.keys())
# Merge values for each key # Merge values for each key
for key in all_keys: for key in all_keys:
# Get all values for this key based on filtering mode # Get all values for this key
if filter_none_only: values = [data.get(key) for data in entity_data_list if data.get(key)]
values = [data.get(key) for data in data_list if data.get(key) is not None]
else:
values = [data.get(key) for data in data_list if data.get(key)]
if not values: if not values:
continue continue
@ -1348,8 +1330,59 @@ def _merge_attributes(
strategy = merge_strategy.get(key, "keep_first") strategy = merge_strategy.get(key, "keep_first")
if strategy == "concatenate": if strategy == "concatenate":
# Convert all values to strings and join with GRAPH_FIELD_SEP merged_data[key] = "\n\n".join(values)
merged_data[key] = GRAPH_FIELD_SEP.join(str(v) for v in values) elif strategy == "keep_first":
merged_data[key] = values[0]
elif strategy == "keep_last":
merged_data[key] = values[-1]
elif strategy == "join_unique":
# Handle fields separated by GRAPH_FIELD_SEP
unique_items = set()
for value in values:
items = value.split(GRAPH_FIELD_SEP)
unique_items.update(items)
merged_data[key] = GRAPH_FIELD_SEP.join(unique_items)
else:
# Default strategy
merged_data[key] = values[0]
return merged_data
def _merge_relation_attributes(
relation_data_list: list[dict[str, Any]], merge_strategy: dict[str, str]
) -> dict[str, Any]:
"""Merge attributes from multiple relationships.
Args:
relation_data_list: List of dictionaries containing relationship data
merge_strategy: Merge strategy for each field
Returns:
Dictionary containing merged relationship data
"""
merged_data = {}
# Collect all possible keys
all_keys = set()
for data in relation_data_list:
all_keys.update(data.keys())
# Merge values for each key
for key in all_keys:
# Get all values for this key
values = [
data.get(key) for data in relation_data_list if data.get(key) is not None
]
if not values:
continue
# Merge values according to strategy
strategy = merge_strategy.get(key, "keep_first")
if strategy == "concatenate":
merged_data[key] = "\n\n".join(str(v) for v in values)
elif strategy == "keep_first": elif strategy == "keep_first":
merged_data[key] = values[0] merged_data[key] = values[0]
elif strategy == "keep_last": elif strategy == "keep_last":
@ -1361,22 +1394,14 @@ def _merge_attributes(
items = str(value).split(GRAPH_FIELD_SEP) items = str(value).split(GRAPH_FIELD_SEP)
unique_items.update(items) unique_items.update(items)
merged_data[key] = GRAPH_FIELD_SEP.join(unique_items) merged_data[key] = GRAPH_FIELD_SEP.join(unique_items)
elif strategy == "join_unique_comma":
# Handle fields separated by comma, join unique items with comma
unique_items = set()
for value in values:
items = str(value).split(",")
unique_items.update(item.strip() for item in items if item.strip())
merged_data[key] = ",".join(sorted(unique_items))
elif strategy == "max": elif strategy == "max":
# For numeric fields like weight # For numeric fields like weight
try: try:
merged_data[key] = max(float(v) for v in values) merged_data[key] = max(float(v) for v in values)
except (ValueError, TypeError): except (ValueError, TypeError):
# Fallback to first value if conversion fails
merged_data[key] = values[0] merged_data[key] = values[0]
else: else:
# Default strategy: keep first value # Default strategy
merged_data[key] = values[0] merged_data[key] = values[0]
return merged_data return merged_data