diff --git a/lightrag/utils_graph.py b/lightrag/utils_graph.py
index 58876af5..6e3a52f8 100644
--- a/lightrag/utils_graph.py
+++ b/lightrag/utils_graph.py
@@ -1050,8 +1050,12 @@ async def amerge_entities(
         relationships_vdb: Vector database storage for relationships
         source_entities: List of source entity names to merge
         target_entity: Name of the target entity after merging
-        merge_strategy: Deprecated (Each field uses its own default strategy). If provided,
-            customizations are applied but a warning is logged.
+        merge_strategy: Merge strategy configuration, e.g. {"description": "concatenate", "entity_type": "keep_first"}
+            Supported strategies:
+            - "concatenate": Concatenate all values (for text fields)
+            - "keep_first": Keep the first non-empty value
+            - "keep_last": Keep the last non-empty value
+            - "join_unique": Join all unique values (for fields separated by a delimiter)
         target_entity_data: Dictionary of specific values to set for the target entity,
             overriding any merged values, e.g. {"description": "custom description", "entity_type": "PERSON"}

@@ -1062,22 +1066,18 @@ async def amerge_entities(
     # Use graph database lock to ensure atomic graph and vector db operations
     async with graph_db_lock:
         try:
-            # Default merge strategy for entities
-            default_entity_merge_strategy = {
+            # Default merge strategy
+            default_strategy = {
                 "description": "concatenate",
                 "entity_type": "keep_first",
                 "source_id": "join_unique",
-                "file_path": "join_unique",
             }
-            effective_entity_merge_strategy = default_entity_merge_strategy
-            if merge_strategy:
-                logger.warning(
-                    "Entity Merge: merge_strategy parameter is deprecated and will be ignored in a future release."
-                )
-                effective_entity_merge_strategy = {
-                    **default_entity_merge_strategy,
-                    **merge_strategy,
-                }
+
+            merge_strategy = (
+                default_strategy
+                if merge_strategy is None
+                else {**default_strategy, **merge_strategy}
+            )
             target_entity_data = (
                 {} if target_entity_data is None else target_entity_data
             )
@@ -1099,15 +1099,14 @@ async def amerge_entities(
                     await chunk_entity_relation_graph.get_node(target_entity)
                 )
                 logger.info(
-                    "Entity Merge: target entity already exists, source and target entities will be merged"
+                    f"Target entity '{target_entity}' already exists, will merge data"
                 )

             # 3. Merge entity data
-            merged_entity_data = _merge_attributes(
+            merged_entity_data = _merge_entity_attributes(
                 list(source_entities_data.values())
                 + ([existing_target_entity_data] if target_exists else []),
-                effective_entity_merge_strategy,
-                filter_none_only=False,  # Use entity behavior: filter falsy values
+                merge_strategy,
             )

             # Apply any explicitly provided target entity data (overrides merged data)
@@ -1117,11 +1116,11 @@ async def amerge_entities(
             # 4. Get all relationships of the source entities and target entity (if exists)
             all_relations = []
             entities_to_collect = source_entities.copy()
-
+
             # If target entity exists, also collect its relationships for merging
             if target_exists:
                 entities_to_collect.append(target_entity)
-
+
             for entity_name in entities_to_collect:
                 # Get all relationships of the entities
                 edges = await chunk_entity_relation_graph.get_node_edges(entity_name)
@@ -1140,14 +1139,14 @@ async def amerge_entities(
                 await chunk_entity_relation_graph.upsert_node(
                     target_entity, merged_entity_data
                 )
-                logger.info(f"Entity Merge: created target '{target_entity}'")
+                logger.info(f"Created new target entity '{target_entity}'")
             else:
                 await chunk_entity_relation_graph.upsert_node(
                     target_entity, merged_entity_data
                 )
-                logger.info(f"Entity Merge: Updated target '{target_entity}'")
+                logger.info(f"Updated existing target entity '{target_entity}'")

-            # 6. Recreate all relations pointing to the target entity in KG
+            # 6. Recreate all relationships pointing to the target entity
             relation_updates = {}  # Track relationships that need to be merged
             relations_to_delete = []

@@ -1160,48 +1159,48 @@ async def amerge_entities(
                 # Skip relationships between source entities to avoid self-loops
                 if new_src == new_tgt:
                     logger.info(
-                        f"Entity Merge: skipping `{src}`~`{tgt}` to avoid self-loop"
+                        f"Skipping relationship between source entities: {src} -> {tgt} to avoid self-loop"
                     )
                     continue

-                # Normalize entity order for consistent duplicate detection (undirected relationships)
-                normalized_src, normalized_tgt = sorted([new_src, new_tgt])
-                relation_key = f"{normalized_src}|{normalized_tgt}"
-
+                # Check if the same relationship already exists
+                relation_key = f"{new_src}|{new_tgt}"
                 if relation_key in relation_updates:
                     # Merge relationship data
                     existing_data = relation_updates[relation_key]["data"]
-                    merged_relation = _merge_attributes(
+                    merged_relation = _merge_relation_attributes(
                         [existing_data, edge_data],
                         {
                             "description": "concatenate",
-                            "keywords": "join_unique_comma",
+                            "keywords": "join_unique",
                             "source_id": "join_unique",
-                            "file_path": "join_unique",
                             "weight": "max",
                         },
-                        filter_none_only=True,  # Use relation behavior: only filter None
                     )
                     relation_updates[relation_key]["data"] = merged_relation
                     logger.info(
-                        f"Entity Merge: deduplicating relation `{normalized_src}`~`{normalized_tgt}`"
+                        f"Merged duplicate relationship: {new_src} -> {new_tgt}"
                     )
                 else:
                     relation_updates[relation_key] = {
-                        "graph_src": new_src,
-                        "graph_tgt": new_tgt,
-                        "norm_src": normalized_src,
-                        "norm_tgt": normalized_tgt,
+                        "src": new_src,
+                        "tgt": new_tgt,
                         "data": edge_data.copy(),
                     }

             # Apply relationship updates
             for rel_data in relation_updates.values():
                 await chunk_entity_relation_graph.upsert_edge(
-                    rel_data["graph_src"], rel_data["graph_tgt"], rel_data["data"]
+                    rel_data["src"], rel_data["tgt"], rel_data["data"]
                 )
                 logger.info(
-                    f"Entity Merge: updating relation `{rel_data['graph_src']}`->`{rel_data['graph_tgt']}`"
+                    f"Created or updated relationship: {rel_data['src']} -> {rel_data['tgt']}"
+                )
+
+            # Delete relationship records from vector database
+            await relationships_vdb.delete(relations_to_delete)
+            logger.info(
+                f"Deleted {len(relations_to_delete)} relation records from vector database"
                 )

             # 7. Update entity vector representation
@@ -1220,18 +1219,17 @@ async def amerge_entities(
                     "entity_type": entity_type,
                 }
             }
+
             await entities_vdb.upsert(entity_data_for_vdb)
-            logger.info(f"Entity Merge: updating vdb `{target_entity}`")

             # 8. Update relationship vector representations
-            logger.info(
-                f"Entity Merge: deleting {len(relations_to_delete)} relations from vdb"
-            )
-            await relationships_vdb.delete(relations_to_delete)
             for rel_data in relation_updates.values():
+                src = rel_data["src"]
+                tgt = rel_data["tgt"]
                 edge_data = rel_data["data"]
-                normalized_src = rel_data["norm_src"]
-                normalized_tgt = rel_data["norm_tgt"]
+
+                # Normalize entity order for consistent vector storage
+                normalized_src, normalized_tgt = sorted([src, tgt])

                 description = edge_data.get("description", "")
                 keywords = edge_data.get("keywords", "")
@@ -1257,28 +1255,28 @@ async def amerge_entities(
                         "weight": weight,
                     }
                 }
+
                 await relationships_vdb.upsert(relation_data_for_vdb)
-                logger.info(
-                    f"Entity Merge: updating vdb `{normalized_src}`~`{normalized_tgt}`"
-                )

             # 9. Delete source entities
             for entity_name in source_entities:
                 if entity_name == target_entity:
-                    logger.warning(
-                        f"Entity Merge: source entity'{entity_name}' is same as target entity"
+                    logger.info(
+                        f"Skipping deletion of '{entity_name}' as it's also the target entity"
                     )
                     continue

-                logger.info(f"Entity Merge: deleting '{entity_name}' from KG and vdb")
-
-                # Delete entity node and related edges from knowledge graph
+                # Delete entity node from knowledge graph
                 await chunk_entity_relation_graph.delete_node(entity_name)

                 # Delete entity record from vector database
                 entity_id = compute_mdhash_id(entity_name, prefix="ent-")
                 await entities_vdb.delete([entity_id])

+                logger.info(
+                    f"Deleted source entity '{entity_name}' and its vector embedding from the database"
+                )
+
             # 10. Save changes
             await _persist_graph_updates(
                 entities_vdb=entities_vdb,
@@ -1287,7 +1285,7 @@ async def amerge_entities(
             )

             logger.info(
-                f"Entity Merge: successfully merged {len(source_entities)} entities into '{target_entity}'"
+                f"Successfully merged {len(source_entities)} entities into '{target_entity}'"
             )
             return await get_entity_info(
                 chunk_entity_relation_graph,
@@ -1301,45 +1299,29 @@ async def amerge_entities(
            raise


-def _merge_attributes(
-    data_list: list[dict[str, Any]],
-    merge_strategy: dict[str, str],
-    filter_none_only: bool = False,
+def _merge_entity_attributes(
+    entity_data_list: list[dict[str, Any]], merge_strategy: dict[str, str]
 ) -> dict[str, Any]:
-    """Merge attributes from multiple entities or relationships.
-
-    This unified function handles merging of both entity and relationship attributes,
-    applying different merge strategies per field.
+    """Merge attributes from multiple entities.

     Args:
-        data_list: List of dictionaries containing entity or relationship data
-        merge_strategy: Merge strategy for each field. Supported strategies:
-            - "concatenate": Join all values with GRAPH_FIELD_SEP
-            - "keep_first": Keep the first non-empty value
-            - "keep_last": Keep the last non-empty value
-            - "join_unique": Join unique items separated by GRAPH_FIELD_SEP
-            - "join_unique_comma": Join unique items separated by comma and space
-            - "max": Keep the maximum numeric value (for numeric fields)
-        filter_none_only: If True, only filter None values (keep empty strings, 0, etc.).
-            If False, filter all falsy values. Default is False for backward compatibility.
+        entity_data_list: List of dictionaries containing entity data
+        merge_strategy: Merge strategy for each field

     Returns:
-        Dictionary containing merged data
+        Dictionary containing merged entity data
     """
     merged_data = {}

     # Collect all possible keys
     all_keys = set()
-    for data in data_list:
+    for data in entity_data_list:
         all_keys.update(data.keys())

     # Merge values for each key
     for key in all_keys:
-        # Get all values for this key based on filtering mode
-        if filter_none_only:
-            values = [data.get(key) for data in data_list if data.get(key) is not None]
-        else:
-            values = [data.get(key) for data in data_list if data.get(key)]
+        # Get all values for this key
+        values = [data.get(key) for data in entity_data_list if data.get(key)]

         if not values:
             continue
@@ -1348,8 +1330,59 @@ def _merge_attributes(
         strategy = merge_strategy.get(key, "keep_first")

         if strategy == "concatenate":
-            # Convert all values to strings and join with GRAPH_FIELD_SEP
-            merged_data[key] = GRAPH_FIELD_SEP.join(str(v) for v in values)
+            merged_data[key] = "\n\n".join(values)
+        elif strategy == "keep_first":
+            merged_data[key] = values[0]
+        elif strategy == "keep_last":
+            merged_data[key] = values[-1]
+        elif strategy == "join_unique":
+            # Handle fields separated by GRAPH_FIELD_SEP
+            unique_items = set()
+            for value in values:
+                items = value.split(GRAPH_FIELD_SEP)
+                unique_items.update(items)
+            merged_data[key] = GRAPH_FIELD_SEP.join(unique_items)
+        else:
+            # Default strategy
+            merged_data[key] = values[0]
+
+    return merged_data
+
+
+def _merge_relation_attributes(
+    relation_data_list: list[dict[str, Any]], merge_strategy: dict[str, str]
+) -> dict[str, Any]:
+    """Merge attributes from multiple relationships.
+
+    Args:
+        relation_data_list: List of dictionaries containing relationship data
+        merge_strategy: Merge strategy for each field
+
+    Returns:
+        Dictionary containing merged relationship data
+    """
+    merged_data = {}
+
+    # Collect all possible keys
+    all_keys = set()
+    for data in relation_data_list:
+        all_keys.update(data.keys())
+
+    # Merge values for each key
+    for key in all_keys:
+        # Get all values for this key
+        values = [
+            data.get(key) for data in relation_data_list if data.get(key) is not None
+        ]
+
+        if not values:
+            continue
+
+        # Merge values according to strategy
+        strategy = merge_strategy.get(key, "keep_first")
+
+        if strategy == "concatenate":
+            merged_data[key] = "\n\n".join(str(v) for v in values)
         elif strategy == "keep_first":
             merged_data[key] = values[0]
         elif strategy == "keep_last":
             merged_data[key] = values[-1]
@@ -1361,22 +1394,14 @@
         elif strategy == "join_unique":
             # Handle fields separated by GRAPH_FIELD_SEP
             unique_items = set()
             for value in values:
                 items = str(value).split(GRAPH_FIELD_SEP)
                 unique_items.update(items)
             merged_data[key] = GRAPH_FIELD_SEP.join(unique_items)
-        elif strategy == "join_unique_comma":
-            # Handle fields separated by comma, join unique items with comma
-            unique_items = set()
-            for value in values:
-                items = str(value).split(",")
-                unique_items.update(item.strip() for item in items if item.strip())
-            merged_data[key] = ",".join(sorted(unique_items))
         elif strategy == "max":
             # For numeric fields like weight
             try:
                 merged_data[key] = max(float(v) for v in values)
             except (ValueError, TypeError):
-                # Fallback to first value if conversion fails
                 merged_data[key] = values[0]
         else:
-            # Default strategy: keep first value
+            # Default strategy
             merged_data[key] = values[0]

     return merged_data
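Reviewer note: a minimal usage sketch of the restored per-field merge_strategy parameter. The entity names, working directory, and the `rag` handle are hypothetical, and this assumes the usual LightRAG.amerge_entities wrapper forwarding these arguments to the helper patched above; fields omitted from merge_strategy fall back to default_strategy ("concatenate" for description, "keep_first" for entity_type, "join_unique" for source_id).

import asyncio

from lightrag import LightRAG  # assumed import path

async def main() -> None:
    # Hypothetical minimal setup; real usage also needs llm_model_func /
    # embedding_func configuration before any documents can be merged.
    rag = LightRAG(working_dir="./rag_storage")

    merged_info = await rag.amerge_entities(
        source_entities=["AI Lab", "The AI Lab", "AI Laboratory"],
        target_entity="AI Lab",
        # Per-field overrides; unlisted fields keep their defaults.
        merge_strategy={"description": "concatenate", "entity_type": "keep_first"},
        # Explicit values that override whatever the merge produced.
        target_entity_data={"entity_type": "ORGANIZATION"},
    )
    print(merged_info)

asyncio.run(main())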
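Note also that the two restored helpers filter values differently: _merge_entity_attributes drops every falsy value (empty strings included), while _merge_relation_attributes drops only None, so a relationship weight of 0 still takes part in a "max" merge. The strategy dispatch itself is shared; below is a self-contained sketch of it (merge_field is an illustrative name, the str() cast follows the relation variant, and "<SEP>" is assumed to match lightrag's GRAPH_FIELD_SEP constant).

from typing import Any

GRAPH_FIELD_SEP = "<SEP>"  # assumed value of lightrag's separator constant

def merge_field(values: list[Any], strategy: str) -> Any:
    # Mirrors the per-field dispatch in _merge_entity_attributes /
    # _merge_relation_attributes, applied to already-filtered values.
    if strategy == "concatenate":
        return "\n\n".join(str(v) for v in values)
    if strategy == "keep_last":
        return values[-1]
    if strategy == "join_unique":
        unique_items = set()
        for value in values:
            unique_items.update(str(value).split(GRAPH_FIELD_SEP))
        # Deduplicated, but set iteration order is not deterministic.
        return GRAPH_FIELD_SEP.join(unique_items)
    if strategy == "max":
        try:
            return max(float(v) for v in values)
        except (ValueError, TypeError):
            return values[0]  # fall back when a value is not numeric
    return values[0]  # "keep_first" and the default fallback

print(merge_field(["chunk-1<SEP>chunk-2", "chunk-2<SEP>chunk-3"], "join_unique"))
print(merge_field([0.4, 0.9, "0.7"], "max"))  # -> 0.9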