From 11a1631d76f811dc67709d89b25226e5245f8fc5 Mon Sep 17 00:00:00 2001 From: yangdx Date: Mon, 27 Oct 2025 14:23:51 +0800 Subject: [PATCH] Refactor entity edit and merge functions to support merge-on-rename MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit • Extract internal implementation helpers • Add allow_merge parameter to aedit_entity • Support merging when renaming to existing name • Improve code reusability and modularity • Maintain backward compatibility --- lightrag/utils_graph.py | 1333 ++++++++++++++++++++------------------- 1 file changed, 683 insertions(+), 650 deletions(-) diff --git a/lightrag/utils_graph.py b/lightrag/utils_graph.py index d782059d..c18c17c0 100644 --- a/lightrag/utils_graph.py +++ b/lightrag/utils_graph.py @@ -246,6 +246,272 @@ async def adelete_by_relation( ) +async def _edit_entity_impl( + chunk_entity_relation_graph, + entities_vdb, + relationships_vdb, + entity_name: str, + updated_data: dict[str, str], + *, + entity_chunks_storage=None, + relation_chunks_storage=None, +) -> dict[str, Any]: + """Internal helper that edits an entity without acquiring storage locks. + + This function performs the actual entity edit operations without lock management. + It should only be called by public APIs that have already acquired necessary locks. + + Args: + chunk_entity_relation_graph: Graph storage instance + entities_vdb: Vector database storage for entities + relationships_vdb: Vector database storage for relationships + entity_name: Name of the entity to edit + updated_data: Dictionary containing updated attributes (including optional entity_name for renaming) + entity_chunks_storage: Optional KV storage for tracking chunks + relation_chunks_storage: Optional KV storage for tracking relation chunks + + Returns: + Dictionary containing updated entity information + + Note: + Caller must acquire appropriate locks before calling this function. + If renaming (entity_name in updated_data), this function will check if the new name exists. 
+ """ + new_entity_name = updated_data.get("entity_name", entity_name) + is_renaming = new_entity_name != entity_name + + original_entity_name = entity_name + + node_exists = await chunk_entity_relation_graph.has_node(entity_name) + if not node_exists: + raise ValueError(f"Entity '{entity_name}' does not exist") + node_data = await chunk_entity_relation_graph.get_node(entity_name) + + if is_renaming: + existing_node = await chunk_entity_relation_graph.has_node(new_entity_name) + if existing_node: + raise ValueError( + f"Entity name '{new_entity_name}' already exists, cannot rename" + ) + + new_node_data = {**node_data, **updated_data} + new_node_data["entity_id"] = new_entity_name + + if "entity_name" in new_node_data: + del new_node_data[ + "entity_name" + ] # Node data should not contain entity_name field + + if is_renaming: + logger.info(f"Entity Edit: renaming `{entity_name}` to `{new_entity_name}`") + + await chunk_entity_relation_graph.upsert_node(new_entity_name, new_node_data) + + relations_to_update = [] + relations_to_delete = [] + edges = await chunk_entity_relation_graph.get_node_edges(entity_name) + if edges: + for source, target in edges: + edge_data = await chunk_entity_relation_graph.get_edge(source, target) + if edge_data: + relations_to_delete.append( + compute_mdhash_id(source + target, prefix="rel-") + ) + relations_to_delete.append( + compute_mdhash_id(target + source, prefix="rel-") + ) + if source == entity_name: + await chunk_entity_relation_graph.upsert_edge( + new_entity_name, target, edge_data + ) + relations_to_update.append((new_entity_name, target, edge_data)) + else: # target == entity_name + await chunk_entity_relation_graph.upsert_edge( + source, new_entity_name, edge_data + ) + relations_to_update.append((source, new_entity_name, edge_data)) + + await chunk_entity_relation_graph.delete_node(entity_name) + + old_entity_id = compute_mdhash_id(entity_name, prefix="ent-") + await entities_vdb.delete([old_entity_id]) + + await relationships_vdb.delete(relations_to_delete) + + for src, tgt, edge_data in relations_to_update: + normalized_src, normalized_tgt = sorted([src, tgt]) + + description = edge_data.get("description", "") + keywords = edge_data.get("keywords", "") + source_id = edge_data.get("source_id", "") + weight = float(edge_data.get("weight", 1.0)) + + content = f"{normalized_src}\t{normalized_tgt}\n{keywords}\n{description}" + + relation_id = compute_mdhash_id( + normalized_src + normalized_tgt, prefix="rel-" + ) + + relation_data = { + relation_id: { + "content": content, + "src_id": normalized_src, + "tgt_id": normalized_tgt, + "source_id": source_id, + "description": description, + "keywords": keywords, + "weight": weight, + } + } + + await relationships_vdb.upsert(relation_data) + + entity_name = new_entity_name + else: + await chunk_entity_relation_graph.upsert_node(entity_name, new_node_data) + + description = new_node_data.get("description", "") + source_id = new_node_data.get("source_id", "") + entity_type = new_node_data.get("entity_type", "") + content = entity_name + "\n" + description + + entity_id = compute_mdhash_id(entity_name, prefix="ent-") + + entity_data = { + entity_id: { + "content": content, + "entity_name": entity_name, + "source_id": source_id, + "description": description, + "entity_type": entity_type, + } + } + + await entities_vdb.upsert(entity_data) + + if entity_chunks_storage is not None or relation_chunks_storage is not None: + from .utils import make_relation_chunk_key, compute_incremental_chunk_ids + + if 
entity_chunks_storage is not None: + storage_key = original_entity_name if is_renaming else entity_name + stored_data = await entity_chunks_storage.get_by_id(storage_key) + has_stored_data = ( + stored_data + and isinstance(stored_data, dict) + and stored_data.get("chunk_ids") + ) + + old_source_id = node_data.get("source_id", "") + old_chunk_ids = [cid for cid in old_source_id.split(GRAPH_FIELD_SEP) if cid] + + new_source_id = new_node_data.get("source_id", "") + new_chunk_ids = [cid for cid in new_source_id.split(GRAPH_FIELD_SEP) if cid] + + source_id_changed = set(new_chunk_ids) != set(old_chunk_ids) + + if source_id_changed or not has_stored_data: + existing_full_chunk_ids = [] + if has_stored_data: + existing_full_chunk_ids = [ + cid for cid in stored_data.get("chunk_ids", []) if cid + ] + + if not existing_full_chunk_ids: + existing_full_chunk_ids = old_chunk_ids.copy() + + updated_chunk_ids = compute_incremental_chunk_ids( + existing_full_chunk_ids, old_chunk_ids, new_chunk_ids + ) + + if is_renaming: + await entity_chunks_storage.delete([original_entity_name]) + await entity_chunks_storage.upsert( + { + entity_name: { + "chunk_ids": updated_chunk_ids, + "count": len(updated_chunk_ids), + } + } + ) + else: + await entity_chunks_storage.upsert( + { + entity_name: { + "chunk_ids": updated_chunk_ids, + "count": len(updated_chunk_ids), + } + } + ) + + logger.info( + f"Entity Edit: find {len(updated_chunk_ids)} chunks related to `{entity_name}`" + ) + + if is_renaming and relation_chunks_storage is not None and relations_to_update: + for src, tgt, edge_data in relations_to_update: + old_src = original_entity_name if src == entity_name else src + old_tgt = original_entity_name if tgt == entity_name else tgt + + old_normalized_src, old_normalized_tgt = sorted([old_src, old_tgt]) + new_normalized_src, new_normalized_tgt = sorted([src, tgt]) + + old_storage_key = make_relation_chunk_key( + old_normalized_src, old_normalized_tgt + ) + new_storage_key = make_relation_chunk_key( + new_normalized_src, new_normalized_tgt + ) + + if old_storage_key != new_storage_key: + old_stored_data = await relation_chunks_storage.get_by_id( + old_storage_key + ) + relation_chunk_ids = [] + + if old_stored_data and isinstance(old_stored_data, dict): + relation_chunk_ids = [ + cid for cid in old_stored_data.get("chunk_ids", []) if cid + ] + else: + relation_source_id = edge_data.get("source_id", "") + relation_chunk_ids = [ + cid + for cid in relation_source_id.split(GRAPH_FIELD_SEP) + if cid + ] + + await relation_chunks_storage.delete([old_storage_key]) + + if relation_chunk_ids: + await relation_chunks_storage.upsert( + { + new_storage_key: { + "chunk_ids": relation_chunk_ids, + "count": len(relation_chunk_ids), + } + } + ) + logger.info( + f"Entity Edit: migrate {len(relations_to_update)} relations after rename" + ) + + await _persist_graph_updates( + entities_vdb=entities_vdb, + relationships_vdb=relationships_vdb, + chunk_entity_relation_graph=chunk_entity_relation_graph, + entity_chunks_storage=entity_chunks_storage, + relation_chunks_storage=relation_chunks_storage, + ) + + logger.info(f"Entity Edit: `{entity_name}` successfully updated") + return await get_entity_info( + chunk_entity_relation_graph, + entities_vdb, + entity_name, + include_vector_data=True, + ) + + async def aedit_entity( chunk_entity_relation_graph, entities_vdb, @@ -253,6 +519,7 @@ async def aedit_entity( entity_name: str, updated_data: dict[str, str], allow_rename: bool = True, + allow_merge: bool = False, entity_chunks_storage=None, 
relation_chunks_storage=None, ) -> dict[str, Any]: @@ -268,338 +535,82 @@ async def aedit_entity( entity_name: Name of the entity to edit updated_data: Dictionary containing updated attributes, e.g. {"description": "new description", "entity_type": "new type"} allow_rename: Whether to allow entity renaming, defaults to True + allow_merge: Whether to merge into an existing entity when renaming to an existing name, defaults to False entity_chunks_storage: Optional KV storage for tracking chunks that reference this entity relation_chunks_storage: Optional KV storage for tracking chunks that reference relations Returns: Dictionary containing updated entity information """ - # Determine entities to lock new_entity_name = updated_data.get("entity_name", entity_name) is_renaming = new_entity_name != entity_name - # Lock both original and new entity names if renaming - lock_keys = sorted([entity_name, new_entity_name]) if is_renaming else [entity_name] + lock_keys = sorted({entity_name, new_entity_name}) if is_renaming else [entity_name] - # Use keyed lock for entity to ensure atomic graph and vector db operations workspace = entities_vdb.global_config.get("workspace", "") namespace = f"{workspace}:GraphDB" if workspace else "GraphDB" async with get_storage_keyed_lock( lock_keys, namespace=namespace, enable_logging=False ): try: - # Save original entity name for chunk tracking updates - original_entity_name = entity_name + if is_renaming and not allow_rename: + raise ValueError( + "Entity renaming is not allowed. Set allow_rename=True to enable this feature" + ) - # 1. Get current entity information - node_exists = await chunk_entity_relation_graph.has_node(entity_name) - if not node_exists: - raise ValueError(f"Entity '{entity_name}' does not exist") - node_data = await chunk_entity_relation_graph.get_node(entity_name) - - # If renaming, check if new name already exists if is_renaming: - if not allow_rename: - raise ValueError( - "Entity renaming is not allowed. Set allow_rename=True to enable this feature" - ) - - existing_node = await chunk_entity_relation_graph.has_node( + target_exists = await chunk_entity_relation_graph.has_node( new_entity_name ) - if existing_node: - raise ValueError( - f"Entity name '{new_entity_name}' already exists, cannot rename" - ) - - # 2. 
Update entity information in the graph - new_node_data = {**node_data, **updated_data} - new_node_data["entity_id"] = new_entity_name - - if "entity_name" in new_node_data: - del new_node_data[ - "entity_name" - ] # Node data should not contain entity_name field - - # If renaming entity - if is_renaming: - logger.info( - f"Entity Edit: renaming `{entity_name}` to `{new_entity_name}`" - ) - - # Create new entity - await chunk_entity_relation_graph.upsert_node( - new_entity_name, new_node_data - ) - - # Store relationships that need to be updated - relations_to_update = [] - relations_to_delete = [] - # Get all edges related to the original entity - edges = await chunk_entity_relation_graph.get_node_edges(entity_name) - if edges: - # Recreate edges for the new entity - for source, target in edges: - edge_data = await chunk_entity_relation_graph.get_edge( - source, target - ) - if edge_data: - relations_to_delete.append( - compute_mdhash_id(source + target, prefix="rel-") - ) - relations_to_delete.append( - compute_mdhash_id(target + source, prefix="rel-") - ) - if source == entity_name: - await chunk_entity_relation_graph.upsert_edge( - new_entity_name, target, edge_data - ) - relations_to_update.append( - (new_entity_name, target, edge_data) - ) - else: # target == entity_name - await chunk_entity_relation_graph.upsert_edge( - source, new_entity_name, edge_data - ) - relations_to_update.append( - (source, new_entity_name, edge_data) - ) - - # Delete old entity - await chunk_entity_relation_graph.delete_node(entity_name) - - # Delete old entity record from vector database - old_entity_id = compute_mdhash_id(entity_name, prefix="ent-") - await entities_vdb.delete([old_entity_id]) - - # Delete old relation records from vector database - await relationships_vdb.delete(relations_to_delete) - - # Update relationship vector representations - for src, tgt, edge_data in relations_to_update: - # Normalize entity order for consistent vector ID generation - normalized_src, normalized_tgt = sorted([src, tgt]) - - description = edge_data.get("description", "") - keywords = edge_data.get("keywords", "") - source_id = edge_data.get("source_id", "") - weight = float(edge_data.get("weight", 1.0)) - - # Create content using normalized order - content = ( - f"{normalized_src}\t{normalized_tgt}\n{keywords}\n{description}" - ) - - # Calculate relationship ID using normalized order - relation_id = compute_mdhash_id( - normalized_src + normalized_tgt, prefix="rel-" - ) - - # Prepare data for vector database update - relation_data = { - relation_id: { - "content": content, - "src_id": normalized_src, - "tgt_id": normalized_tgt, - "source_id": source_id, - "description": description, - "keywords": keywords, - "weight": weight, - } - } - - # Update vector database - await relationships_vdb.upsert(relation_data) - - # Update working entity name to new name - entity_name = new_entity_name - - else: - # If not renaming, directly update node data - await chunk_entity_relation_graph.upsert_node( - entity_name, new_node_data - ) - - # 3. 
Recalculate entity's vector representation and update vector database - description = new_node_data.get("description", "") - source_id = new_node_data.get("source_id", "") - entity_type = new_node_data.get("entity_type", "") - content = entity_name + "\n" + description - - # Calculate entity ID - entity_id = compute_mdhash_id(entity_name, prefix="ent-") - - # Prepare data for vector database update - entity_data = { - entity_id: { - "content": content, - "entity_name": entity_name, - "source_id": source_id, - "description": description, - "entity_type": entity_type, - } - } - - # Update vector database - await entities_vdb.upsert(entity_data) - - # 4. Update chunk tracking storages - if entity_chunks_storage is not None or relation_chunks_storage is not None: - from .utils import ( - make_relation_chunk_key, - compute_incremental_chunk_ids, - ) - - # 4.1 Handle entity chunk tracking - if entity_chunks_storage is not None: - # Get storage key (use original name for renaming scenario) - storage_key = original_entity_name if is_renaming else entity_name - stored_data = await entity_chunks_storage.get_by_id(storage_key) - has_stored_data = ( - stored_data - and isinstance(stored_data, dict) - and stored_data.get("chunk_ids") - ) - - # Get old and new source_id - old_source_id = node_data.get("source_id", "") - old_chunk_ids = [ - cid for cid in old_source_id.split(GRAPH_FIELD_SEP) if cid - ] - - new_source_id = new_node_data.get("source_id", "") - new_chunk_ids = [ - cid for cid in new_source_id.split(GRAPH_FIELD_SEP) if cid - ] - - source_id_changed = set(new_chunk_ids) != set(old_chunk_ids) - - # Update if: source_id changed OR storage has no data - if source_id_changed or not has_stored_data: - # Get existing full chunk_ids from storage - existing_full_chunk_ids = [] - if has_stored_data: - existing_full_chunk_ids = [ - cid for cid in stored_data.get("chunk_ids", []) if cid - ] - - # If no stored data exists, use old source_id as baseline - if not existing_full_chunk_ids: - existing_full_chunk_ids = old_chunk_ids.copy() - - # Use utility function to compute incremental updates - updated_chunk_ids = compute_incremental_chunk_ids( - existing_full_chunk_ids, old_chunk_ids, new_chunk_ids + if target_exists: + if not allow_merge: + raise ValueError( + f"Entity name '{new_entity_name}' already exists, cannot rename" ) - # Update storage (even if updated_chunk_ids is empty) - if is_renaming: - # Renaming: delete old + create new - await entity_chunks_storage.delete([original_entity_name]) - await entity_chunks_storage.upsert( - { - entity_name: { - "chunk_ids": updated_chunk_ids, - "count": len(updated_chunk_ids), - } - } - ) - else: - # Non-renaming: direct update - await entity_chunks_storage.upsert( - { - entity_name: { - "chunk_ids": updated_chunk_ids, - "count": len(updated_chunk_ids), - } - } - ) - - logger.info( - f"Entity Edit: find {len(updated_chunk_ids)} chunks related to `{entity_name}`" - ) - - # 4.2 Handle relation chunk tracking if entity was renamed - if ( - is_renaming - and relation_chunks_storage is not None - and relations_to_update - ): - for src, tgt, edge_data in relations_to_update: - # Determine old entity pair (before rename) - old_src = original_entity_name if src == entity_name else src - old_tgt = original_entity_name if tgt == entity_name else tgt - - # Normalize entity order for both old and new keys - old_normalized_src, old_normalized_tgt = sorted( - [old_src, old_tgt] - ) - new_normalized_src, new_normalized_tgt = sorted([src, tgt]) - - # Generate storage keys - 
old_storage_key = make_relation_chunk_key( - old_normalized_src, old_normalized_tgt - ) - new_storage_key = make_relation_chunk_key( - new_normalized_src, new_normalized_tgt - ) - - # If keys are different, we need to move the chunk tracking - if old_storage_key != new_storage_key: - # Get complete chunk IDs from storage first (preserves all existing references) - old_stored_data = await relation_chunks_storage.get_by_id( - old_storage_key - ) - relation_chunk_ids = [] - - if old_stored_data and isinstance(old_stored_data, dict): - # Use complete chunk_ids from storage - relation_chunk_ids = [ - cid - for cid in old_stored_data.get("chunk_ids", []) - if cid - ] - else: - # Fallback: if storage has no data, use graph's source_id - relation_source_id = edge_data.get("source_id", "") - relation_chunk_ids = [ - cid - for cid in relation_source_id.split(GRAPH_FIELD_SEP) - if cid - ] - - # Delete old relation chunk tracking - await relation_chunks_storage.delete([old_storage_key]) - - # Create new relation chunk tracking (migrate complete data) - if relation_chunk_ids: - await relation_chunks_storage.upsert( - { - new_storage_key: { - "chunk_ids": relation_chunk_ids, - "count": len(relation_chunk_ids), - } - } - ) logger.info( - f"Entity Edit: migrate {len(relations_to_update)} relations after rename" + f"Entity Edit: `{entity_name}` will be merged into `{new_entity_name}`" ) - # 5. Save changes - await _persist_graph_updates( - entities_vdb=entities_vdb, - relationships_vdb=relationships_vdb, - chunk_entity_relation_graph=chunk_entity_relation_graph, - entity_chunks_storage=entity_chunks_storage, - relation_chunks_storage=relation_chunks_storage, - ) + non_name_updates = { + key: value + for key, value in updated_data.items() + if key != "entity_name" + } + if non_name_updates: + logger.info( + "Entity Edit: applying non-name updates before merge" + ) + await _edit_entity_impl( + chunk_entity_relation_graph, + entities_vdb, + relationships_vdb, + entity_name, + non_name_updates, + entity_chunks_storage=entity_chunks_storage, + relation_chunks_storage=relation_chunks_storage, + ) - logger.info(f"Entity Edit: `{entity_name}` successfully updated") - return await get_entity_info( + return await _merge_entities_impl( + chunk_entity_relation_graph, + entities_vdb, + relationships_vdb, + [entity_name], + new_entity_name, + merge_strategy=None, + target_entity_data=None, + entity_chunks_storage=entity_chunks_storage, + relation_chunks_storage=relation_chunks_storage, + ) + + return await _edit_entity_impl( chunk_entity_relation_graph, entities_vdb, + relationships_vdb, entity_name, - include_vector_data=True, + updated_data, + entity_chunks_storage=entity_chunks_storage, + relation_chunks_storage=relation_chunks_storage, ) except Exception as e: logger.error(f"Error while editing entity '{entity_name}': {e}") @@ -1054,6 +1065,367 @@ async def acreate_relation( raise +async def _merge_entities_impl( + chunk_entity_relation_graph, + entities_vdb, + relationships_vdb, + source_entities: list[str], + target_entity: str, + *, + merge_strategy: dict[str, str] = None, + target_entity_data: dict[str, Any] = None, + entity_chunks_storage=None, + relation_chunks_storage=None, +) -> dict[str, Any]: + """Internal helper that merges entities without acquiring storage locks. + + This function performs the actual entity merge operations without lock management. + It should only be called by public APIs that have already acquired necessary locks. 
+ + Args: + chunk_entity_relation_graph: Graph storage instance + entities_vdb: Vector database storage for entities + relationships_vdb: Vector database storage for relationships + source_entities: List of source entity names to merge + target_entity: Name of the target entity after merging + merge_strategy: Deprecated. Merge strategy for each field (optional) + target_entity_data: Dictionary of specific values to set for target entity (optional) + entity_chunks_storage: Optional KV storage for tracking chunks + relation_chunks_storage: Optional KV storage for tracking relation chunks + + Returns: + Dictionary containing the merged entity information + + Note: + Caller must acquire appropriate locks before calling this function. + All source entities and the target entity should be locked together. + """ + # Default merge strategy for entities + default_entity_merge_strategy = { + "description": "concatenate", + "entity_type": "keep_first", + "source_id": "join_unique", + "file_path": "join_unique", + } + effective_entity_merge_strategy = default_entity_merge_strategy + if merge_strategy: + logger.warning( + "Entity Merge: merge_strategy parameter is deprecated and will be ignored in a future release." + ) + effective_entity_merge_strategy = { + **default_entity_merge_strategy, + **merge_strategy, + } + target_entity_data = {} if target_entity_data is None else target_entity_data + + # 1. Check if all source entities exist + source_entities_data = {} + for entity_name in source_entities: + node_exists = await chunk_entity_relation_graph.has_node(entity_name) + if not node_exists: + raise ValueError(f"Source entity '{entity_name}' does not exist") + node_data = await chunk_entity_relation_graph.get_node(entity_name) + source_entities_data[entity_name] = node_data + + # 2. Check if target entity exists and get its data if it does + target_exists = await chunk_entity_relation_graph.has_node(target_entity) + existing_target_entity_data = {} + if target_exists: + existing_target_entity_data = await chunk_entity_relation_graph.get_node( + target_entity + ) + logger.info( + "Entity Merge: target entity already exists, source and target entities will be merged" + ) + + # 3. Merge entity data + merged_entity_data = _merge_attributes( + list(source_entities_data.values()) + + ([existing_target_entity_data] if target_exists else []), + effective_entity_merge_strategy, + filter_none_only=False, # Use entity behavior: filter falsy values + ) + + # Apply any explicitly provided target entity data (overrides merged data) + for key, value in target_entity_data.items(): + merged_entity_data[key] = value + + # 4. Get all relationships of the source entities and target entity (if exists) + all_relations = [] + entities_to_collect = source_entities.copy() + + # If target entity exists and not already in source_entities, add it + if target_exists and target_entity not in source_entities: + entities_to_collect.append(target_entity) + + for entity_name in entities_to_collect: + # Get all relationships of the entities + edges = await chunk_entity_relation_graph.get_node_edges(entity_name) + if edges: + for src, tgt in edges: + # Ensure src is the current entity + if src == entity_name: + edge_data = await chunk_entity_relation_graph.get_edge(src, tgt) + all_relations.append((src, tgt, edge_data)) + + # 5. 
Create or update the target entity + merged_entity_data["entity_id"] = target_entity + if not target_exists: + await chunk_entity_relation_graph.upsert_node(target_entity, merged_entity_data) + logger.info(f"Entity Merge: created target '{target_entity}'") + else: + await chunk_entity_relation_graph.upsert_node(target_entity, merged_entity_data) + logger.info(f"Entity Merge: Updated target '{target_entity}'") + + # 6. Recreate all relations pointing to the target entity in KG + # Also collect chunk tracking information in the same loop + relation_updates = {} # Track relationships that need to be merged + relations_to_delete = [] + + # Initialize chunk tracking variables + relation_chunk_tracking = {} # key: storage_key, value: list of chunk_ids + old_relation_keys_to_delete = [] + + for src, tgt, edge_data in all_relations: + relations_to_delete.append(compute_mdhash_id(src + tgt, prefix="rel-")) + relations_to_delete.append(compute_mdhash_id(tgt + src, prefix="rel-")) + + # Collect old chunk tracking key for deletion + if relation_chunks_storage is not None: + from .utils import make_relation_chunk_key + + old_storage_key = make_relation_chunk_key(src, tgt) + old_relation_keys_to_delete.append(old_storage_key) + + new_src = target_entity if src in source_entities else src + new_tgt = target_entity if tgt in source_entities else tgt + + # Skip relationships between source entities to avoid self-loops + if new_src == new_tgt: + logger.info(f"Entity Merge: skipping `{src}`~`{tgt}` to avoid self-loop") + continue + + # Normalize entity order for consistent duplicate detection (undirected relationships) + normalized_src, normalized_tgt = sorted([new_src, new_tgt]) + relation_key = f"{normalized_src}|{normalized_tgt}" + + # Process chunk tracking for this relation + if relation_chunks_storage is not None: + storage_key = make_relation_chunk_key(normalized_src, normalized_tgt) + + # Get chunk_ids from storage for this original relation + stored = await relation_chunks_storage.get_by_id(old_storage_key) + + if stored is not None and isinstance(stored, dict): + chunk_ids = [cid for cid in stored.get("chunk_ids", []) if cid] + else: + # Fallback to source_id from graph + source_id = edge_data.get("source_id", "") + chunk_ids = [cid for cid in source_id.split(GRAPH_FIELD_SEP) if cid] + + # Accumulate chunk_ids with ordered deduplication + if storage_key not in relation_chunk_tracking: + relation_chunk_tracking[storage_key] = [] + + existing_chunks = set(relation_chunk_tracking[storage_key]) + for chunk_id in chunk_ids: + if chunk_id not in existing_chunks: + existing_chunks.add(chunk_id) + relation_chunk_tracking[storage_key].append(chunk_id) + + if relation_key in relation_updates: + # Merge relationship data + existing_data = relation_updates[relation_key]["data"] + merged_relation = _merge_attributes( + [existing_data, edge_data], + { + "description": "concatenate", + "keywords": "join_unique_comma", + "source_id": "join_unique", + "file_path": "join_unique", + "weight": "max", + }, + filter_none_only=True, # Use relation behavior: only filter None + ) + relation_updates[relation_key]["data"] = merged_relation + logger.info( + f"Entity Merge: deduplicating relation `{normalized_src}`~`{normalized_tgt}`" + ) + else: + relation_updates[relation_key] = { + "graph_src": new_src, + "graph_tgt": new_tgt, + "norm_src": normalized_src, + "norm_tgt": normalized_tgt, + "data": edge_data.copy(), + } + + # Apply relationship updates + for rel_data in relation_updates.values(): + await 
chunk_entity_relation_graph.upsert_edge( + rel_data["graph_src"], rel_data["graph_tgt"], rel_data["data"] + ) + logger.info( + f"Entity Merge: updating relation `{rel_data['graph_src']}`->`{rel_data['graph_tgt']}`" + ) + + # Update relation chunk tracking storage + if relation_chunks_storage is not None and all_relations: + if old_relation_keys_to_delete: + await relation_chunks_storage.delete(old_relation_keys_to_delete) + + if relation_chunk_tracking: + updates = {} + for storage_key, chunk_ids in relation_chunk_tracking.items(): + updates[storage_key] = { + "chunk_ids": chunk_ids, + "count": len(chunk_ids), + } + + await relation_chunks_storage.upsert(updates) + logger.info( + f"Entity Merge: merged chunk tracking for {len(updates)} relations" + ) + + # 7. Update relationship vector representations + logger.info(f"Entity Merge: deleting {len(relations_to_delete)} relations from vdb") + await relationships_vdb.delete(relations_to_delete) + for rel_data in relation_updates.values(): + edge_data = rel_data["data"] + normalized_src = rel_data["norm_src"] + normalized_tgt = rel_data["norm_tgt"] + + description = edge_data.get("description", "") + keywords = edge_data.get("keywords", "") + source_id = edge_data.get("source_id", "") + weight = float(edge_data.get("weight", 1.0)) + + # Use normalized order for content and relation ID + content = f"{keywords}\t{normalized_src}\n{normalized_tgt}\n{description}" + relation_id = compute_mdhash_id(normalized_src + normalized_tgt, prefix="rel-") + + relation_data_for_vdb = { + relation_id: { + "content": content, + "src_id": normalized_src, + "tgt_id": normalized_tgt, + "source_id": source_id, + "description": description, + "keywords": keywords, + "weight": weight, + } + } + await relationships_vdb.upsert(relation_data_for_vdb) + logger.info(f"Entity Merge: updating vdb `{normalized_src}`~`{normalized_tgt}`") + + # 8. Update entity vector representation + description = merged_entity_data.get("description", "") + source_id = merged_entity_data.get("source_id", "") + entity_type = merged_entity_data.get("entity_type", "") + content = target_entity + "\n" + description + + entity_id = compute_mdhash_id(target_entity, prefix="ent-") + entity_data_for_vdb = { + entity_id: { + "content": content, + "entity_name": target_entity, + "source_id": source_id, + "description": description, + "entity_type": entity_type, + } + } + await entities_vdb.upsert(entity_data_for_vdb) + logger.info(f"Entity Merge: updating vdb `{target_entity}`") + + # 9. 
Merge entity chunk tracking (source entities first, then target entity) + if entity_chunks_storage is not None: + all_chunk_id_lists = [] + + # Build list of entities to process (source entities first, then target entity) + entities_to_process = [] + + # Add source entities first (excluding target if it's already in source list) + for entity_name in source_entities: + if entity_name != target_entity: + entities_to_process.append(entity_name) + + # Add target entity last (if it exists) + if target_exists: + entities_to_process.append(target_entity) + + # Process all entities in order with unified logic + for entity_name in entities_to_process: + stored = await entity_chunks_storage.get_by_id(entity_name) + if stored and isinstance(stored, dict): + chunk_ids = [cid for cid in stored.get("chunk_ids", []) if cid] + if chunk_ids: + all_chunk_id_lists.append(chunk_ids) + + # Merge chunk_ids with ordered deduplication (preserves order, source entities first) + merged_chunk_ids = [] + seen = set() + for chunk_id_list in all_chunk_id_lists: + for chunk_id in chunk_id_list: + if chunk_id not in seen: + seen.add(chunk_id) + merged_chunk_ids.append(chunk_id) + + # Delete source entities' chunk tracking records + entity_keys_to_delete = [e for e in source_entities if e != target_entity] + if entity_keys_to_delete: + await entity_chunks_storage.delete(entity_keys_to_delete) + + # Update target entity's chunk tracking + if merged_chunk_ids: + await entity_chunks_storage.upsert( + { + target_entity: { + "chunk_ids": merged_chunk_ids, + "count": len(merged_chunk_ids), + } + } + ) + logger.info( + f"Entity Merge: find {len(merged_chunk_ids)} chunks related to '{target_entity}'" + ) + + # 10. Delete source entities + for entity_name in source_entities: + if entity_name == target_entity: + logger.warning( + f"Entity Merge: source entity'{entity_name}' is same as target entity" + ) + continue + + logger.info(f"Entity Merge: deleting '{entity_name}' from KG and vdb") + + # Delete entity node and related edges from knowledge graph + await chunk_entity_relation_graph.delete_node(entity_name) + + # Delete entity record from vector database + entity_id = compute_mdhash_id(entity_name, prefix="ent-") + await entities_vdb.delete([entity_id]) + + # 11. 
Save changes + await _persist_graph_updates( + entities_vdb=entities_vdb, + relationships_vdb=relationships_vdb, + chunk_entity_relation_graph=chunk_entity_relation_graph, + entity_chunks_storage=entity_chunks_storage, + relation_chunks_storage=relation_chunks_storage, + ) + + logger.info( + f"Entity Merge: successfully merged {len(source_entities)} entities into '{target_entity}'" + ) + return await get_entity_info( + chunk_entity_relation_graph, + entities_vdb, + target_entity, + include_vector_data=True, + ) + + async def amerge_entities( chunk_entity_relation_graph, entities_vdb, @@ -1092,362 +1464,23 @@ async def amerge_entities( all_entities.add(target_entity) lock_keys = sorted(all_entities) - # Use keyed lock for all entities to ensure atomic graph and vector db operations workspace = entities_vdb.global_config.get("workspace", "") namespace = f"{workspace}:GraphDB" if workspace else "GraphDB" async with get_storage_keyed_lock( lock_keys, namespace=namespace, enable_logging=False ): try: - # Default merge strategy for entities - default_entity_merge_strategy = { - "description": "concatenate", - "entity_type": "keep_first", - "source_id": "join_unique", - "file_path": "join_unique", - } - effective_entity_merge_strategy = default_entity_merge_strategy - if merge_strategy: - logger.warning( - "Entity Merge: merge_strategy parameter is deprecated and will be ignored in a future release." - ) - effective_entity_merge_strategy = { - **default_entity_merge_strategy, - **merge_strategy, - } - target_entity_data = ( - {} if target_entity_data is None else target_entity_data - ) - - # 1. Check if all source entities exist - source_entities_data = {} - for entity_name in source_entities: - node_exists = await chunk_entity_relation_graph.has_node(entity_name) - if not node_exists: - raise ValueError(f"Source entity '{entity_name}' does not exist") - node_data = await chunk_entity_relation_graph.get_node(entity_name) - source_entities_data[entity_name] = node_data - - # 2. Check if target entity exists and get its data if it does - target_exists = await chunk_entity_relation_graph.has_node(target_entity) - existing_target_entity_data = {} - if target_exists: - existing_target_entity_data = ( - await chunk_entity_relation_graph.get_node(target_entity) - ) - logger.info( - "Entity Merge: target entity already exists, source and target entities will be merged" - ) - - # 3. Merge entity data - merged_entity_data = _merge_attributes( - list(source_entities_data.values()) - + ([existing_target_entity_data] if target_exists else []), - effective_entity_merge_strategy, - filter_none_only=False, # Use entity behavior: filter falsy values - ) - - # Apply any explicitly provided target entity data (overrides merged data) - for key, value in target_entity_data.items(): - merged_entity_data[key] = value - - # 4. Get all relationships of the source entities and target entity (if exists) - all_relations = [] - entities_to_collect = source_entities.copy() - - # If target entity exists and not already in source_entities, add it - if target_exists and target_entity not in source_entities: - entities_to_collect.append(target_entity) - - for entity_name in entities_to_collect: - # Get all relationships of the entities - edges = await chunk_entity_relation_graph.get_node_edges(entity_name) - if edges: - for src, tgt in edges: - # Ensure src is the current entity - if src == entity_name: - edge_data = await chunk_entity_relation_graph.get_edge( - src, tgt - ) - all_relations.append((src, tgt, edge_data)) - - # 5. 
Create or update the target entity - merged_entity_data["entity_id"] = target_entity - if not target_exists: - await chunk_entity_relation_graph.upsert_node( - target_entity, merged_entity_data - ) - logger.info(f"Entity Merge: created target '{target_entity}'") - else: - await chunk_entity_relation_graph.upsert_node( - target_entity, merged_entity_data - ) - logger.info(f"Entity Merge: Updated target '{target_entity}'") - - # 6. Recreate all relations pointing to the target entity in KG - # Also collect chunk tracking information in the same loop - relation_updates = {} # Track relationships that need to be merged - relations_to_delete = [] - - # Initialize chunk tracking variables - relation_chunk_tracking = {} # key: storage_key, value: list of chunk_ids - old_relation_keys_to_delete = [] - - for src, tgt, edge_data in all_relations: - relations_to_delete.append(compute_mdhash_id(src + tgt, prefix="rel-")) - relations_to_delete.append(compute_mdhash_id(tgt + src, prefix="rel-")) - - # Collect old chunk tracking key for deletion - if relation_chunks_storage is not None: - from .utils import make_relation_chunk_key - - old_storage_key = make_relation_chunk_key(src, tgt) - old_relation_keys_to_delete.append(old_storage_key) - - new_src = target_entity if src in source_entities else src - new_tgt = target_entity if tgt in source_entities else tgt - - # Skip relationships between source entities to avoid self-loops - if new_src == new_tgt: - logger.info( - f"Entity Merge: skipping `{src}`~`{tgt}` to avoid self-loop" - ) - continue - - # Normalize entity order for consistent duplicate detection (undirected relationships) - normalized_src, normalized_tgt = sorted([new_src, new_tgt]) - relation_key = f"{normalized_src}|{normalized_tgt}" - - # Process chunk tracking for this relation - if relation_chunks_storage is not None: - storage_key = make_relation_chunk_key( - normalized_src, normalized_tgt - ) - - # Get chunk_ids from storage for this original relation - stored = await relation_chunks_storage.get_by_id(old_storage_key) - - if stored is not None and isinstance(stored, dict): - chunk_ids = [cid for cid in stored.get("chunk_ids", []) if cid] - else: - # Fallback to source_id from graph - source_id = edge_data.get("source_id", "") - chunk_ids = [ - cid for cid in source_id.split(GRAPH_FIELD_SEP) if cid - ] - - # Accumulate chunk_ids with ordered deduplication - if storage_key not in relation_chunk_tracking: - relation_chunk_tracking[storage_key] = [] - - existing_chunks = set(relation_chunk_tracking[storage_key]) - for chunk_id in chunk_ids: - if chunk_id not in existing_chunks: - existing_chunks.add(chunk_id) - relation_chunk_tracking[storage_key].append(chunk_id) - - if relation_key in relation_updates: - # Merge relationship data - existing_data = relation_updates[relation_key]["data"] - merged_relation = _merge_attributes( - [existing_data, edge_data], - { - "description": "concatenate", - "keywords": "join_unique_comma", - "source_id": "join_unique", - "file_path": "join_unique", - "weight": "max", - }, - filter_none_only=True, # Use relation behavior: only filter None - ) - relation_updates[relation_key]["data"] = merged_relation - logger.info( - f"Entity Merge: deduplicating relation `{normalized_src}`~`{normalized_tgt}`" - ) - else: - relation_updates[relation_key] = { - "graph_src": new_src, - "graph_tgt": new_tgt, - "norm_src": normalized_src, - "norm_tgt": normalized_tgt, - "data": edge_data.copy(), - } - - # Apply relationship updates - for rel_data in relation_updates.values(): 
- await chunk_entity_relation_graph.upsert_edge( - rel_data["graph_src"], rel_data["graph_tgt"], rel_data["data"] - ) - logger.info( - f"Entity Merge: updating relation `{rel_data['graph_src']}`->`{rel_data['graph_tgt']}`" - ) - - # Update relation chunk tracking storage - if relation_chunks_storage is not None and all_relations: - if old_relation_keys_to_delete: - await relation_chunks_storage.delete(old_relation_keys_to_delete) - - if relation_chunk_tracking: - updates = {} - for storage_key, chunk_ids in relation_chunk_tracking.items(): - updates[storage_key] = { - "chunk_ids": chunk_ids, - "count": len(chunk_ids), - } - - await relation_chunks_storage.upsert(updates) - logger.info( - f"Entity Merge: merged chunk tracking for {len(updates)} relations" - ) - - # 7. Update relationship vector representations - logger.info( - f"Entity Merge: deleting {len(relations_to_delete)} relations from vdb" - ) - await relationships_vdb.delete(relations_to_delete) - for rel_data in relation_updates.values(): - edge_data = rel_data["data"] - normalized_src = rel_data["norm_src"] - normalized_tgt = rel_data["norm_tgt"] - - description = edge_data.get("description", "") - keywords = edge_data.get("keywords", "") - source_id = edge_data.get("source_id", "") - weight = float(edge_data.get("weight", 1.0)) - - # Use normalized order for content and relation ID - content = ( - f"{keywords}\t{normalized_src}\n{normalized_tgt}\n{description}" - ) - relation_id = compute_mdhash_id( - normalized_src + normalized_tgt, prefix="rel-" - ) - - relation_data_for_vdb = { - relation_id: { - "content": content, - "src_id": normalized_src, - "tgt_id": normalized_tgt, - "source_id": source_id, - "description": description, - "keywords": keywords, - "weight": weight, - } - } - await relationships_vdb.upsert(relation_data_for_vdb) - logger.info( - f"Entity Merge: updating vdb `{normalized_src}`~`{normalized_tgt}`" - ) - - # 8. Update entity vector representation - description = merged_entity_data.get("description", "") - source_id = merged_entity_data.get("source_id", "") - entity_type = merged_entity_data.get("entity_type", "") - content = target_entity + "\n" + description - - entity_id = compute_mdhash_id(target_entity, prefix="ent-") - entity_data_for_vdb = { - entity_id: { - "content": content, - "entity_name": target_entity, - "source_id": source_id, - "description": description, - "entity_type": entity_type, - } - } - await entities_vdb.upsert(entity_data_for_vdb) - logger.info(f"Entity Merge: updating vdb `{target_entity}`") - - # 9. 
Merge entity chunk tracking (source entities first, then target entity) - if entity_chunks_storage is not None: - all_chunk_id_lists = [] - - # Build list of entities to process (source entities first, then target entity) - entities_to_process = [] - - # Add source entities first (excluding target if it's already in source list) - for entity_name in source_entities: - if entity_name != target_entity: - entities_to_process.append(entity_name) - - # Add target entity last (if it exists) - if target_exists: - entities_to_process.append(target_entity) - - # Process all entities in order with unified logic - for entity_name in entities_to_process: - stored = await entity_chunks_storage.get_by_id(entity_name) - if stored and isinstance(stored, dict): - chunk_ids = [cid for cid in stored.get("chunk_ids", []) if cid] - if chunk_ids: - all_chunk_id_lists.append(chunk_ids) - - # Merge chunk_ids with ordered deduplication (preserves order, source entities first) - merged_chunk_ids = [] - seen = set() - for chunk_id_list in all_chunk_id_lists: - for chunk_id in chunk_id_list: - if chunk_id not in seen: - seen.add(chunk_id) - merged_chunk_ids.append(chunk_id) - - # Delete source entities' chunk tracking records - entity_keys_to_delete = [ - e for e in source_entities if e != target_entity - ] - if entity_keys_to_delete: - await entity_chunks_storage.delete(entity_keys_to_delete) - - # Update target entity's chunk tracking - if merged_chunk_ids: - await entity_chunks_storage.upsert( - { - target_entity: { - "chunk_ids": merged_chunk_ids, - "count": len(merged_chunk_ids), - } - } - ) - logger.info( - f"Entity Merge: find {len(merged_chunk_ids)} chunks related to '{target_entity}'" - ) - - # 10. Delete source entities - for entity_name in source_entities: - if entity_name == target_entity: - logger.warning( - f"Entity Merge: source entity'{entity_name}' is same as target entity" - ) - continue - - logger.info(f"Entity Merge: deleting '{entity_name}' from KG and vdb") - - # Delete entity node and related edges from knowledge graph - await chunk_entity_relation_graph.delete_node(entity_name) - - # Delete entity record from vector database - entity_id = compute_mdhash_id(entity_name, prefix="ent-") - await entities_vdb.delete([entity_id]) - - # 11. Save changes - await _persist_graph_updates( - entities_vdb=entities_vdb, - relationships_vdb=relationships_vdb, - chunk_entity_relation_graph=chunk_entity_relation_graph, + return await _merge_entities_impl( + chunk_entity_relation_graph, + entities_vdb, + relationships_vdb, + source_entities, + target_entity, + merge_strategy=merge_strategy, + target_entity_data=target_entity_data, entity_chunks_storage=entity_chunks_storage, relation_chunks_storage=relation_chunks_storage, ) - - logger.info( - f"Entity Merge: successfully merged {len(source_entities)} entities into '{target_entity}'" - ) - return await get_entity_info( - chunk_entity_relation_graph, - entities_vdb, - target_entity, - include_vector_data=True, - ) - except Exception as e: logger.error(f"Error merging entities: {e}") raise
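Usage note (not part of the patch): a minimal sketch of the new merge-on-rename path. It assumes a configured LightRAG instance that exposes its storages as chunk_entity_relation_graph, entities_vdb, and relationships_vdb; the optional entity_chunks_storage/relation_chunks_storage arguments are omitted, and the entity names and description are illustrative only.

from lightrag import LightRAG
from lightrag.utils_graph import aedit_entity


async def consolidate_entity(rag: LightRAG) -> dict:
    # Rename "Acme Inc" to "Acme Corporation". If the target name does not
    # exist, this is an ordinary rename. If it already exists, allow_merge=True
    # first applies the non-name updates (here, the description) to "Acme Inc"
    # and then merges it into the existing "Acme Corporation" node. With the
    # default allow_merge=False, the same situation raises ValueError.
    return await aedit_entity(
        rag.chunk_entity_relation_graph,
        rag.entities_vdb,
        rag.relationships_vdb,
        entity_name="Acme Inc",
        updated_data={
            "entity_name": "Acme Corporation",
            "description": "Consolidated company record",
        },
        allow_rename=True,
        allow_merge=True,
    )

Because allow_merge defaults to False, existing callers keep the previous behavior unchanged: renaming onto an existing entity still raises ValueError unless the new flag is set explicitly.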