From 38559373b3ec7f8beda007bddaabec6bf579861b Mon Sep 17 00:00:00 2001 From: yangdx Date: Sun, 26 Oct 2025 23:13:50 +0800 Subject: [PATCH 01/13] Fix entity merging to include target entity relationships * Include target entity in collection * Merge all relevant relationships * Prevent relationship loss * Fix merge completeness --- lightrag/utils_graph.py | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/lightrag/utils_graph.py b/lightrag/utils_graph.py index 6b06cf3c..6e3a52f8 100644 --- a/lightrag/utils_graph.py +++ b/lightrag/utils_graph.py @@ -1113,10 +1113,16 @@ async def amerge_entities( for key, value in target_entity_data.items(): merged_entity_data[key] = value - # 4. Get all relationships of the source entities + # 4. Get all relationships of the source entities and target entity (if exists) all_relations = [] - for entity_name in source_entities: - # Get all relationships of the source entities + entities_to_collect = source_entities.copy() + + # If target entity exists, also collect its relationships for merging + if target_exists: + entities_to_collect.append(target_entity) + + for entity_name in entities_to_collect: + # Get all relationships of the entities edges = await chunk_entity_relation_graph.get_node_edges(entity_name) if edges: for src, tgt in edges: From ab32456a799c79fbeef401d2b353ecf7891a11d0 Mon Sep 17 00:00:00 2001 From: yangdx Date: Mon, 27 Oct 2025 00:04:17 +0800 Subject: [PATCH 02/13] Refactor entity merging with unified attribute merge function MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit • Update GRAPH_FIELD_SEP comment clarity • Deprecate merge_strategy parameter • Unify entity/relation merge logic • Add join_unique_comma strategy --- lightrag/constants.py | 2 +- lightrag/utils_graph.py | 137 +++++++++++++++++----------------------- 2 files changed, 58 insertions(+), 81 deletions(-) diff --git a/lightrag/constants.py b/lightrag/constants.py index c040e0ac..eedecd65 100644 --- a/lightrag/constants.py +++ b/lightrag/constants.py @@ -38,7 +38,7 @@ DEFAULT_ENTITY_TYPES = [ "NaturalObject", ] -# Separator for graph fields +# Separator for: description, source_id and relation-key fields(Can not be changed after data inserted) GRAPH_FIELD_SEP = "" # Query and retrieval configuration defaults diff --git a/lightrag/utils_graph.py b/lightrag/utils_graph.py index 6e3a52f8..da7da5a9 100644 --- a/lightrag/utils_graph.py +++ b/lightrag/utils_graph.py @@ -1050,12 +1050,8 @@ async def amerge_entities( relationships_vdb: Vector database storage for relationships source_entities: List of source entity names to merge target_entity: Name of the target entity after merging - merge_strategy: Merge strategy configuration, e.g. {"description": "concatenate", "entity_type": "keep_first"} - Supported strategies: - - "concatenate": Concatenate all values (for text fields) - - "keep_first": Keep the first non-empty value - - "keep_last": Keep the last non-empty value - - "join_unique": Join all unique values (for fields separated by delimiter) + merge_strategy: Deprecated (Each field uses its own default strategy). If provided, + customizations are applied but a warning is logged. target_entity_data: Dictionary of specific values to set for the target entity, overriding any merged values, e.g. 
{"description": "custom description", "entity_type": "PERSON"} @@ -1066,18 +1062,23 @@ async def amerge_entities( # Use graph database lock to ensure atomic graph and vector db operations async with graph_db_lock: try: - # Default merge strategy - default_strategy = { + # Default merge strategy for entities + default_entity_merge_strategy = { "description": "concatenate", "entity_type": "keep_first", "source_id": "join_unique", + "file_path": "join_unique", } - - merge_strategy = ( - default_strategy - if merge_strategy is None - else {**default_strategy, **merge_strategy} - ) + effective_entity_merge_strategy = default_entity_merge_strategy + if merge_strategy: + logger.warning( + "merge_strategy parameter is deprecated and will be ignored in a future " + "release. Provided overrides will be applied for now." + ) + effective_entity_merge_strategy = { + **default_entity_merge_strategy, + **merge_strategy, + } target_entity_data = ( {} if target_entity_data is None else target_entity_data ) @@ -1103,10 +1104,11 @@ async def amerge_entities( ) # 3. Merge entity data - merged_entity_data = _merge_entity_attributes( + merged_entity_data = _merge_attributes( list(source_entities_data.values()) + ([existing_target_entity_data] if target_exists else []), - merge_strategy, + effective_entity_merge_strategy, + filter_none_only=False, # Use entity behavior: filter falsy values ) # Apply any explicitly provided target entity data (overrides merged data) @@ -1168,14 +1170,16 @@ async def amerge_entities( if relation_key in relation_updates: # Merge relationship data existing_data = relation_updates[relation_key]["data"] - merged_relation = _merge_relation_attributes( + merged_relation = _merge_attributes( [existing_data, edge_data], { "description": "concatenate", - "keywords": "join_unique", + "keywords": "join_unique_comma", "source_id": "join_unique", + "file_path": "join_unique", "weight": "max", }, + filter_none_only=True, # Use relation behavior: only filter None ) relation_updates[relation_key]["data"] = merged_relation logger.info( @@ -1299,81 +1303,45 @@ async def amerge_entities( raise -def _merge_entity_attributes( - entity_data_list: list[dict[str, Any]], merge_strategy: dict[str, str] +def _merge_attributes( + data_list: list[dict[str, Any]], + merge_strategy: dict[str, str], + filter_none_only: bool = False, ) -> dict[str, Any]: - """Merge attributes from multiple entities. + """Merge attributes from multiple entities or relationships. + + This unified function handles merging of both entity and relationship attributes, + applying different merge strategies per field. Args: - entity_data_list: List of dictionaries containing entity data - merge_strategy: Merge strategy for each field + data_list: List of dictionaries containing entity or relationship data + merge_strategy: Merge strategy for each field. Supported strategies: + - "concatenate": Join all values with GRAPH_FIELD_SEP + - "keep_first": Keep the first non-empty value + - "keep_last": Keep the last non-empty value + - "join_unique": Join unique items separated by GRAPH_FIELD_SEP + - "join_unique_comma": Join unique items separated by comma and space + - "max": Keep the maximum numeric value (for numeric fields) + filter_none_only: If True, only filter None values (keep empty strings, 0, etc.). + If False, filter all falsy values. Default is False for backward compatibility. 
Returns: - Dictionary containing merged entity data + Dictionary containing merged data """ merged_data = {} # Collect all possible keys all_keys = set() - for data in entity_data_list: + for data in data_list: all_keys.update(data.keys()) # Merge values for each key for key in all_keys: - # Get all values for this key - values = [data.get(key) for data in entity_data_list if data.get(key)] - - if not values: - continue - - # Merge values according to strategy - strategy = merge_strategy.get(key, "keep_first") - - if strategy == "concatenate": - merged_data[key] = "\n\n".join(values) - elif strategy == "keep_first": - merged_data[key] = values[0] - elif strategy == "keep_last": - merged_data[key] = values[-1] - elif strategy == "join_unique": - # Handle fields separated by GRAPH_FIELD_SEP - unique_items = set() - for value in values: - items = value.split(GRAPH_FIELD_SEP) - unique_items.update(items) - merged_data[key] = GRAPH_FIELD_SEP.join(unique_items) + # Get all values for this key based on filtering mode + if filter_none_only: + values = [data.get(key) for data in data_list if data.get(key) is not None] else: - # Default strategy - merged_data[key] = values[0] - - return merged_data - - -def _merge_relation_attributes( - relation_data_list: list[dict[str, Any]], merge_strategy: dict[str, str] -) -> dict[str, Any]: - """Merge attributes from multiple relationships. - - Args: - relation_data_list: List of dictionaries containing relationship data - merge_strategy: Merge strategy for each field - - Returns: - Dictionary containing merged relationship data - """ - merged_data = {} - - # Collect all possible keys - all_keys = set() - for data in relation_data_list: - all_keys.update(data.keys()) - - # Merge values for each key - for key in all_keys: - # Get all values for this key - values = [ - data.get(key) for data in relation_data_list if data.get(key) is not None - ] + values = [data.get(key) for data in data_list if data.get(key)] if not values: continue @@ -1382,7 +1350,8 @@ def _merge_relation_attributes( strategy = merge_strategy.get(key, "keep_first") if strategy == "concatenate": - merged_data[key] = "\n\n".join(str(v) for v in values) + # Convert all values to strings and join with GRAPH_FIELD_SEP + merged_data[key] = GRAPH_FIELD_SEP.join(str(v) for v in values) elif strategy == "keep_first": merged_data[key] = values[0] elif strategy == "keep_last": @@ -1394,14 +1363,22 @@ def _merge_relation_attributes( items = str(value).split(GRAPH_FIELD_SEP) unique_items.update(items) merged_data[key] = GRAPH_FIELD_SEP.join(unique_items) + elif strategy == "join_unique_comma": + # Handle fields separated by comma, join unique items with comma + unique_items = set() + for value in values: + items = str(value).split(",") + unique_items.update(item.strip() for item in items if item.strip()) + merged_data[key] = ",".join(sorted(unique_items)) elif strategy == "max": # For numeric fields like weight try: merged_data[key] = max(float(v) for v in values) except (ValueError, TypeError): + # Fallback to first value if conversion fails merged_data[key] = values[0] else: - # Default strategy + # Default strategy: keep first value merged_data[key] = values[0] return merged_data From a25003c3369f3f81553fb78b36c96b99197b2500 Mon Sep 17 00:00:00 2001 From: yangdx Date: Mon, 27 Oct 2025 00:52:56 +0800 Subject: [PATCH 03/13] Fix relation deduplication logic and standardize log message prefixes --- lightrag/utils_graph.py | 72 ++++++++++++++++++++--------------------- 1 file changed, 35 insertions(+), 37 
deletions(-) diff --git a/lightrag/utils_graph.py b/lightrag/utils_graph.py index da7da5a9..58876af5 100644 --- a/lightrag/utils_graph.py +++ b/lightrag/utils_graph.py @@ -1072,8 +1072,7 @@ async def amerge_entities( effective_entity_merge_strategy = default_entity_merge_strategy if merge_strategy: logger.warning( - "merge_strategy parameter is deprecated and will be ignored in a future " - "release. Provided overrides will be applied for now." + "Entity Merge: merge_strategy parameter is deprecated and will be ignored in a future release." ) effective_entity_merge_strategy = { **default_entity_merge_strategy, @@ -1100,7 +1099,7 @@ async def amerge_entities( await chunk_entity_relation_graph.get_node(target_entity) ) logger.info( - f"Target entity '{target_entity}' already exists, will merge data" + "Entity Merge: target entity already exists, source and target entities will be merged" ) # 3. Merge entity data @@ -1118,11 +1117,11 @@ async def amerge_entities( # 4. Get all relationships of the source entities and target entity (if exists) all_relations = [] entities_to_collect = source_entities.copy() - + # If target entity exists, also collect its relationships for merging if target_exists: entities_to_collect.append(target_entity) - + for entity_name in entities_to_collect: # Get all relationships of the entities edges = await chunk_entity_relation_graph.get_node_edges(entity_name) @@ -1141,14 +1140,14 @@ async def amerge_entities( await chunk_entity_relation_graph.upsert_node( target_entity, merged_entity_data ) - logger.info(f"Created new target entity '{target_entity}'") + logger.info(f"Entity Merge: created target '{target_entity}'") else: await chunk_entity_relation_graph.upsert_node( target_entity, merged_entity_data ) - logger.info(f"Updated existing target entity '{target_entity}'") + logger.info(f"Entity Merge: Updated target '{target_entity}'") - # 6. Recreate all relationships, pointing to the target entity + # 6. 
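Before the recreation loop, a standalone sketch of the normalized
duplicate-detection key it relies on (entity names are hypothetical):

    source_entities = ["Alice", "Al"]
    target_entity = "Alice Smith"
    seen_keys = set()
    for src, tgt in [("Alice", "Bob"), ("Bob", "Al"), ("Alice", "Al")]:
        new_src = target_entity if src in source_entities else src
        new_tgt = target_entity if tgt in source_entities else tgt
        if new_src == new_tgt:
            continue  # an edge between two merged sources would self-loop
        normalized_src, normalized_tgt = sorted([new_src, new_tgt])
        seen_keys.add(f"{normalized_src}|{normalized_tgt}")
    assert seen_keys == {"Alice Smith|Bob"}  # both directions collapse to one key

The actual step-6 code follows.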
Recreate all relations pointing to the target entity in KG relation_updates = {} # Track relationships that need to be merged relations_to_delete = [] @@ -1161,12 +1160,14 @@ async def amerge_entities( # Skip relationships between source entities to avoid self-loops if new_src == new_tgt: logger.info( - f"Skipping relationship between source entities: {src} -> {tgt} to avoid self-loop" + f"Entity Merge: skipping `{src}`~`{tgt}` to avoid self-loop" ) continue - # Check if the same relationship already exists - relation_key = f"{new_src}|{new_tgt}" + # Normalize entity order for consistent duplicate detection (undirected relationships) + normalized_src, normalized_tgt = sorted([new_src, new_tgt]) + relation_key = f"{normalized_src}|{normalized_tgt}" + if relation_key in relation_updates: # Merge relationship data existing_data = relation_updates[relation_key]["data"] @@ -1183,28 +1184,24 @@ async def amerge_entities( ) relation_updates[relation_key]["data"] = merged_relation logger.info( - f"Merged duplicate relationship: {new_src} -> {new_tgt}" + f"Entity Merge: deduplicating relation `{normalized_src}`~`{normalized_tgt}`" ) else: relation_updates[relation_key] = { - "src": new_src, - "tgt": new_tgt, + "graph_src": new_src, + "graph_tgt": new_tgt, + "norm_src": normalized_src, + "norm_tgt": normalized_tgt, "data": edge_data.copy(), } # Apply relationship updates for rel_data in relation_updates.values(): await chunk_entity_relation_graph.upsert_edge( - rel_data["src"], rel_data["tgt"], rel_data["data"] + rel_data["graph_src"], rel_data["graph_tgt"], rel_data["data"] ) logger.info( - f"Created or updated relationship: {rel_data['src']} -> {rel_data['tgt']}" - ) - - # Delete relationships records from vector database - await relationships_vdb.delete(relations_to_delete) - logger.info( - f"Deleted {len(relations_to_delete)} relation records for entity from vector database" + f"Entity Merge: updating relation `{rel_data['graph_src']}`->`{rel_data['graph_tgt']}`" ) # 7. Update entity vector representation @@ -1223,17 +1220,18 @@ async def amerge_entities( "entity_type": entity_type, } } - await entities_vdb.upsert(entity_data_for_vdb) + logger.info(f"Entity Merge: updating vdb `{target_entity}`") # 8. Update relationship vector representations + logger.info( + f"Entity Merge: deleting {len(relations_to_delete)} relations from vdb" + ) + await relationships_vdb.delete(relations_to_delete) for rel_data in relation_updates.values(): - src = rel_data["src"] - tgt = rel_data["tgt"] edge_data = rel_data["data"] - - # Normalize entity order for consistent vector storage - normalized_src, normalized_tgt = sorted([src, tgt]) + normalized_src = rel_data["norm_src"] + normalized_tgt = rel_data["norm_tgt"] description = edge_data.get("description", "") keywords = edge_data.get("keywords", "") @@ -1259,28 +1257,28 @@ async def amerge_entities( "weight": weight, } } - await relationships_vdb.upsert(relation_data_for_vdb) + logger.info( + f"Entity Merge: updating vdb `{normalized_src}`~`{normalized_tgt}`" + ) # 9. 
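Before deleting the source entities, a sketch of why vector ids must be
derived from a consistent entity order; the md5 helper below is a stand-in
for compute_mdhash_id, whose exact implementation is assumed:

    import hashlib

    def mdhash_id(content: str, prefix: str = "") -> str:
        # Stand-in: deterministic md5 hex digest with a type prefix.
        return prefix + hashlib.md5(content.encode("utf-8")).hexdigest()

    src, tgt = "Beta Corp", "Alpha Inc"
    assert mdhash_id(src + tgt, "rel-") != mdhash_id(tgt + src, "rel-")
    # Direction changes the id, which is why both rel-(src+tgt) and
    # rel-(tgt+src) are queued for deletion, while upserts always use the
    # sorted pair so each undirected relation keeps one stable id:
    normalized_src, normalized_tgt = sorted([src, tgt])
    stable_id = mdhash_id(normalized_src + normalized_tgt, prefix="rel-")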
Delete source entities for entity_name in source_entities: if entity_name == target_entity: - logger.info( - f"Skipping deletion of '{entity_name}' as it's also the target entity" + logger.warning( + f"Entity Merge: source entity'{entity_name}' is same as target entity" ) continue - # Delete entity node from knowledge graph + logger.info(f"Entity Merge: deleting '{entity_name}' from KG and vdb") + + # Delete entity node and related edges from knowledge graph await chunk_entity_relation_graph.delete_node(entity_name) # Delete entity record from vector database entity_id = compute_mdhash_id(entity_name, prefix="ent-") await entities_vdb.delete([entity_id]) - logger.info( - f"Deleted source entity '{entity_name}' and its vector embedding from database" - ) - # 10. Save changes await _persist_graph_updates( entities_vdb=entities_vdb, @@ -1289,7 +1287,7 @@ async def amerge_entities( ) logger.info( - f"Successfully merged {len(source_entities)} entities into '{target_entity}'" + f"Entity Merge: successfully merged {len(source_entities)} entities into '{target_entity}'" ) return await get_entity_info( chunk_entity_relation_graph, From 2c09adb8d3250585610adc585f57ad10ab189ce1 Mon Sep 17 00:00:00 2001 From: yangdx Date: Mon, 27 Oct 2025 02:06:21 +0800 Subject: [PATCH 04/13] Add chunk tracking support to entity merge functionality - Pass chunk storages to merge function - Merge relation chunk tracking data - Merge entity chunk tracking data - Delete old chunk tracking records - Persist chunk storage updates --- lightrag/lightrag.py | 2 + lightrag/utils_graph.py | 164 ++++++++++++++++++++++++++++++++++------ 2 files changed, 144 insertions(+), 22 deletions(-) diff --git a/lightrag/lightrag.py b/lightrag/lightrag.py index ced5f40e..bdc94b2c 100644 --- a/lightrag/lightrag.py +++ b/lightrag/lightrag.py @@ -3750,6 +3750,8 @@ class LightRAG: target_entity, merge_strategy, target_entity_data, + self.entity_chunks, + self.relation_chunks, ) def merge_entities( diff --git a/lightrag/utils_graph.py b/lightrag/utils_graph.py index 58876af5..5f7d5ac3 100644 --- a/lightrag/utils_graph.py +++ b/lightrag/utils_graph.py @@ -1038,11 +1038,14 @@ async def amerge_entities( target_entity: str, merge_strategy: dict[str, str] = None, target_entity_data: dict[str, Any] = None, + entity_chunks_storage=None, + relation_chunks_storage=None, ) -> dict[str, Any]: """Asynchronously merge multiple entities into one entity. Merges multiple source entities into a target entity, handling all relationships, and updating both the knowledge graph and vector database. + Also merges chunk tracking information from entity_chunks_storage and relation_chunks_storage. Args: chunk_entity_relation_graph: Graph storage instance @@ -1054,6 +1057,8 @@ async def amerge_entities( customizations are applied but a warning is logged. target_entity_data: Dictionary of specific values to set for the target entity, overriding any merged values, e.g. 
{"description": "custom description", "entity_type": "PERSON"} + entity_chunks_storage: Optional KV storage for tracking chunks that reference entities + relation_chunks_storage: Optional KV storage for tracking chunks that reference relations Returns: Dictionary containing the merged entity information @@ -1118,8 +1123,8 @@ async def amerge_entities( all_relations = [] entities_to_collect = source_entities.copy() - # If target entity exists, also collect its relationships for merging - if target_exists: + # If target entity exists and not already in source_entities, add it + if target_exists and target_entity not in source_entities: entities_to_collect.append(target_entity) for entity_name in entities_to_collect: @@ -1148,12 +1153,25 @@ async def amerge_entities( logger.info(f"Entity Merge: Updated target '{target_entity}'") # 6. Recreate all relations pointing to the target entity in KG + # Also collect chunk tracking information in the same loop relation_updates = {} # Track relationships that need to be merged relations_to_delete = [] + # Initialize chunk tracking variables + relation_chunk_tracking = {} # key: storage_key, value: list of chunk_ids + old_relation_keys_to_delete = [] + for src, tgt, edge_data in all_relations: relations_to_delete.append(compute_mdhash_id(src + tgt, prefix="rel-")) relations_to_delete.append(compute_mdhash_id(tgt + src, prefix="rel-")) + + # Collect old chunk tracking key for deletion + if relation_chunks_storage is not None: + from .utils import make_relation_chunk_key + + old_storage_key = make_relation_chunk_key(src, tgt) + old_relation_keys_to_delete.append(old_storage_key) + new_src = target_entity if src in source_entities else src new_tgt = target_entity if tgt in source_entities else tgt @@ -1168,6 +1186,34 @@ async def amerge_entities( normalized_src, normalized_tgt = sorted([new_src, new_tgt]) relation_key = f"{normalized_src}|{normalized_tgt}" + # Process chunk tracking for this relation + if relation_chunks_storage is not None: + storage_key = make_relation_chunk_key( + normalized_src, normalized_tgt + ) + + # Get chunk_ids from storage for this original relation + stored = await relation_chunks_storage.get_by_id(old_storage_key) + + if stored is not None and isinstance(stored, dict): + chunk_ids = [cid for cid in stored.get("chunk_ids", []) if cid] + else: + # Fallback to source_id from graph + source_id = edge_data.get("source_id", "") + chunk_ids = [ + cid for cid in source_id.split(GRAPH_FIELD_SEP) if cid + ] + + # Accumulate chunk_ids with ordered deduplication + if storage_key not in relation_chunk_tracking: + relation_chunk_tracking[storage_key] = [] + + existing_chunks = set(relation_chunk_tracking[storage_key]) + for chunk_id in chunk_ids: + if chunk_id not in existing_chunks: + existing_chunks.add(chunk_id) + relation_chunk_tracking[storage_key].append(chunk_id) + if relation_key in relation_updates: # Merge relationship data existing_data = relation_updates[relation_key]["data"] @@ -1204,26 +1250,25 @@ async def amerge_entities( f"Entity Merge: updating relation `{rel_data['graph_src']}`->`{rel_data['graph_tgt']}`" ) - # 7. 
Update entity vector representation - description = merged_entity_data.get("description", "") - source_id = merged_entity_data.get("source_id", "") - entity_type = merged_entity_data.get("entity_type", "") - content = target_entity + "\n" + description + # Update relation chunk tracking storage + if relation_chunks_storage is not None and all_relations: + if old_relation_keys_to_delete: + await relation_chunks_storage.delete(old_relation_keys_to_delete) - entity_id = compute_mdhash_id(target_entity, prefix="ent-") - entity_data_for_vdb = { - entity_id: { - "content": content, - "entity_name": target_entity, - "source_id": source_id, - "description": description, - "entity_type": entity_type, - } - } - await entities_vdb.upsert(entity_data_for_vdb) - logger.info(f"Entity Merge: updating vdb `{target_entity}`") + if relation_chunk_tracking: + updates = {} + for storage_key, chunk_ids in relation_chunk_tracking.items(): + updates[storage_key] = { + "chunk_ids": chunk_ids, + "count": len(chunk_ids), + } - # 8. Update relationship vector representations + await relation_chunks_storage.upsert(updates) + logger.info( + f"Entity Merge: merged chunk tracking for {len(updates)} relations" + ) + + # 7. Update relationship vector representations logger.info( f"Entity Merge: deleting {len(relations_to_delete)} relations from vdb" ) @@ -1262,7 +1307,80 @@ async def amerge_entities( f"Entity Merge: updating vdb `{normalized_src}`~`{normalized_tgt}`" ) - # 9. Delete source entities + # 8. Update entity vector representation + description = merged_entity_data.get("description", "") + source_id = merged_entity_data.get("source_id", "") + entity_type = merged_entity_data.get("entity_type", "") + content = target_entity + "\n" + description + + entity_id = compute_mdhash_id(target_entity, prefix="ent-") + entity_data_for_vdb = { + entity_id: { + "content": content, + "entity_name": target_entity, + "source_id": source_id, + "description": description, + "entity_type": entity_type, + } + } + await entities_vdb.upsert(entity_data_for_vdb) + logger.info(f"Entity Merge: updating vdb `{target_entity}`") + + # 9. 
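Before the chunk-tracking merge, the shape of the relation payload written
to the vector db above, as a standalone sketch (all values hypothetical;
the content layout matches the one used throughout this series):

    normalized_src, normalized_tgt = sorted(["Widget Co", "Acme"])
    keywords = "supply,contract"
    description = "Acme supplies parts to Widget Co"
    relation_record = {
        "content": f"{normalized_src}\t{normalized_tgt}\n{keywords}\n{description}",
        "src_id": normalized_src,  # always the lexicographically smaller name
        "tgt_id": normalized_tgt,
        "keywords": keywords,
        "description": description,
        "weight": 1.0,
    }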
Merge entity chunk tracking (source entities first, then target entity) + if entity_chunks_storage is not None: + all_chunk_id_lists = [] + + # Build list of entities to process (source entities first, then target entity) + entities_to_process = [] + + # Add source entities first (excluding target if it's already in source list) + for entity_name in source_entities: + if entity_name != target_entity: + entities_to_process.append(entity_name) + + # Add target entity last (if it exists) + if target_exists: + entities_to_process.append(target_entity) + + # Process all entities in order with unified logic + for entity_name in entities_to_process: + stored = await entity_chunks_storage.get_by_id(entity_name) + if stored and isinstance(stored, dict): + chunk_ids = [cid for cid in stored.get("chunk_ids", []) if cid] + if chunk_ids: + all_chunk_id_lists.append(chunk_ids) + + # Merge chunk_ids with ordered deduplication (preserves order, source entities first) + merged_chunk_ids = [] + seen = set() + for chunk_id_list in all_chunk_id_lists: + for chunk_id in chunk_id_list: + if chunk_id not in seen: + seen.add(chunk_id) + merged_chunk_ids.append(chunk_id) + + # Delete source entities' chunk tracking records + entity_keys_to_delete = [ + e for e in source_entities if e != target_entity + ] + if entity_keys_to_delete: + await entity_chunks_storage.delete(entity_keys_to_delete) + + # Update target entity's chunk tracking + if merged_chunk_ids: + await entity_chunks_storage.upsert( + { + target_entity: { + "chunk_ids": merged_chunk_ids, + "count": len(merged_chunk_ids), + } + } + ) + logger.info( + f"Entity Merge: find {len(merged_chunk_ids)} chunks related to '{target_entity}'" + ) + + # 10. Delete source entities for entity_name in source_entities: if entity_name == target_entity: logger.warning( @@ -1279,11 +1397,13 @@ async def amerge_entities( entity_id = compute_mdhash_id(entity_name, prefix="ent-") await entities_vdb.delete([entity_id]) - # 10. Save changes + # 11. 
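Before the final save, the ordered de-duplication used for the merged
chunk ids, reduced to a standalone sketch (hypothetical ids):

    all_chunk_id_lists = [["c1", "c2"], ["c2", "c3"], ["c1", "c4"]]
    merged_chunk_ids: list[str] = []
    seen: set[str] = set()
    for chunk_id_list in all_chunk_id_lists:
        for chunk_id in chunk_id_list:
            if chunk_id not in seen:
                seen.add(chunk_id)
                merged_chunk_ids.append(chunk_id)
    # First-seen order is preserved, keeping source-entity chunks ahead of
    # the target entity's chunks:
    assert merged_chunk_ids == ["c1", "c2", "c3", "c4"]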
Save changes await _persist_graph_updates( entities_vdb=entities_vdb, relationships_vdb=relationships_vdb, chunk_entity_relation_graph=chunk_entity_relation_graph, + entity_chunks_storage=entity_chunks_storage, + relation_chunks_storage=relation_chunks_storage, ) logger.info( From 8dfd3bf4285c34cf578888e40ad11f6bdc0f37f8 Mon Sep 17 00:00:00 2001 From: yangdx Date: Mon, 27 Oct 2025 02:55:58 +0800 Subject: [PATCH 05/13] Replace global graph DB lock with fine-grained keyed locking MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit • Use entity/relation-specific locks • Lock multiple entities when needed --- lightrag/utils_graph.py | 100 ++++++++++++++++++++++++++-------------- 1 file changed, 66 insertions(+), 34 deletions(-) diff --git a/lightrag/utils_graph.py b/lightrag/utils_graph.py index 5f7d5ac3..d782059d 100644 --- a/lightrag/utils_graph.py +++ b/lightrag/utils_graph.py @@ -5,7 +5,7 @@ import asyncio from typing import Any, cast from .base import DeletionResult -from .kg.shared_storage import get_graph_db_lock +from .kg.shared_storage import get_storage_keyed_lock from .constants import GRAPH_FIELD_SEP from .utils import compute_mdhash_id, logger from .base import StorageNameSpace @@ -74,9 +74,12 @@ async def adelete_by_entity( entity_chunks_storage: Optional KV storage for tracking chunks that reference this entity relation_chunks_storage: Optional KV storage for tracking chunks that reference relations """ - graph_db_lock = get_graph_db_lock(enable_logging=False) - # Use graph database lock to ensure atomic graph and vector db operations - async with graph_db_lock: + # Use keyed lock for entity to ensure atomic graph and vector db operations + workspace = entities_vdb.global_config.get("workspace", "") + namespace = f"{workspace}:GraphDB" if workspace else "GraphDB" + async with get_storage_keyed_lock( + [entity_name], namespace=namespace, enable_logging=False + ): try: # Check if the entity exists if not await chunk_entity_relation_graph.has_node(entity_name): @@ -167,14 +170,18 @@ async def adelete_by_relation( relation_chunks_storage: Optional KV storage for tracking chunks that reference this relation """ relation_str = f"{source_entity} -> {target_entity}" - graph_db_lock = get_graph_db_lock(enable_logging=False) - # Use graph database lock to ensure atomic graph and vector db operations - async with graph_db_lock: - try: - # Normalize entity order for undirected graph (ensures consistent key generation) - if source_entity > target_entity: - source_entity, target_entity = target_entity, source_entity + # Normalize entity order for undirected graph (ensures consistent key generation) + if source_entity > target_entity: + source_entity, target_entity = target_entity, source_entity + # Use keyed lock for relation to ensure atomic graph and vector db operations + workspace = relationships_vdb.global_config.get("workspace", "") + namespace = f"{workspace}:GraphDB" if workspace else "GraphDB" + sorted_edge_key = sorted([source_entity, target_entity]) + async with get_storage_keyed_lock( + sorted_edge_key, namespace=namespace, enable_logging=False + ): + try: # Check if the relation exists edge_exists = await chunk_entity_relation_graph.has_edge( source_entity, target_entity @@ -267,9 +274,19 @@ async def aedit_entity( Returns: Dictionary containing updated entity information """ - graph_db_lock = get_graph_db_lock(enable_logging=False) - # Use graph database lock to ensure atomic graph and vector db operations - async with graph_db_lock: + # 
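The keyed-lock pattern this patch introduces, as a minimal sketch
(workspace and entity names are hypothetical; the helper and the
namespace format are the ones used in this file):

    from lightrag.kg.shared_storage import get_storage_keyed_lock

    async def mutate_locked(workspace: str, names: list[str]) -> None:
        namespace = f"{workspace}:GraphDB" if workspace else "GraphDB"
        # Sorting gives every caller the same acquisition order, so two
        # concurrent operations over overlapping names cannot deadlock.
        async with get_storage_keyed_lock(
            sorted(set(names)), namespace=namespace, enable_logging=False
        ):
            ...  # graph and vector-db mutations, atomic per key set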
Determine entities to lock + new_entity_name = updated_data.get("entity_name", entity_name) + is_renaming = new_entity_name != entity_name + + # Lock both original and new entity names if renaming + lock_keys = sorted([entity_name, new_entity_name]) if is_renaming else [entity_name] + + # Use keyed lock for entity to ensure atomic graph and vector db operations + workspace = entities_vdb.global_config.get("workspace", "") + namespace = f"{workspace}:GraphDB" if workspace else "GraphDB" + async with get_storage_keyed_lock( + lock_keys, namespace=namespace, enable_logging=False + ): try: # Save original entity name for chunk tracking updates original_entity_name = entity_name @@ -280,10 +297,6 @@ async def aedit_entity( raise ValueError(f"Entity '{entity_name}' does not exist") node_data = await chunk_entity_relation_graph.get_node(entity_name) - # Check if entity is being renamed - new_entity_name = updated_data.get("entity_name", entity_name) - is_renaming = new_entity_name != entity_name - # If renaming, check if new name already exists if is_renaming: if not allow_rename: @@ -619,14 +632,18 @@ async def aedit_relation( Returns: Dictionary containing updated relation information """ - graph_db_lock = get_graph_db_lock(enable_logging=False) - # Use graph database lock to ensure atomic graph and vector db operations - async with graph_db_lock: - try: - # Normalize entity order for undirected graph (ensures consistent key generation) - if source_entity > target_entity: - source_entity, target_entity = target_entity, source_entity + # Normalize entity order for undirected graph (ensures consistent key generation) + if source_entity > target_entity: + source_entity, target_entity = target_entity, source_entity + # Use keyed lock for relation to ensure atomic graph and vector db operations + workspace = relationships_vdb.global_config.get("workspace", "") + namespace = f"{workspace}:GraphDB" if workspace else "GraphDB" + sorted_edge_key = sorted([source_entity, target_entity]) + async with get_storage_keyed_lock( + sorted_edge_key, namespace=namespace, enable_logging=False + ): + try: # 1. 
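A quick sketch of the order normalization above, which the keyed lock and
every relation storage key depend on (entity names are hypothetical):

    source_entity, target_entity = "Zeta Ltd", "Acme"
    if source_entity > target_entity:
        source_entity, target_entity = target_entity, source_entity
    # ("Zeta Ltd", "Acme") and ("Acme", "Zeta Ltd") now normalize to the
    # same pair, hence the same keyed lock and the same storage keys.
    assert (source_entity, target_entity) == ("Acme", "Zeta Ltd")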
Get current relation information edge_exists = await chunk_entity_relation_graph.has_edge( source_entity, target_entity @@ -799,9 +816,12 @@ async def acreate_entity( Returns: Dictionary containing created entity information """ - graph_db_lock = get_graph_db_lock(enable_logging=False) - # Use graph database lock to ensure atomic graph and vector db operations - async with graph_db_lock: + # Use keyed lock for entity to ensure atomic graph and vector db operations + workspace = entities_vdb.global_config.get("workspace", "") + namespace = f"{workspace}:GraphDB" if workspace else "GraphDB" + async with get_storage_keyed_lock( + [entity_name], namespace=namespace, enable_logging=False + ): try: # Check if entity already exists existing_node = await chunk_entity_relation_graph.has_node(entity_name) @@ -910,9 +930,13 @@ async def acreate_relation( Returns: Dictionary containing created relation information """ - graph_db_lock = get_graph_db_lock(enable_logging=False) - # Use graph database lock to ensure atomic graph and vector db operations - async with graph_db_lock: + # Use keyed lock for relation to ensure atomic graph and vector db operations + workspace = relationships_vdb.global_config.get("workspace", "") + namespace = f"{workspace}:GraphDB" if workspace else "GraphDB" + sorted_edge_key = sorted([source_entity, target_entity]) + async with get_storage_keyed_lock( + sorted_edge_key, namespace=namespace, enable_logging=False + ): try: # Check if both entities exist source_exists = await chunk_entity_relation_graph.has_node(source_entity) @@ -1063,9 +1087,17 @@ async def amerge_entities( Returns: Dictionary containing the merged entity information """ - graph_db_lock = get_graph_db_lock(enable_logging=False) - # Use graph database lock to ensure atomic graph and vector db operations - async with graph_db_lock: + # Collect all entities involved (source + target) and lock them all in sorted order + all_entities = set(source_entities) + all_entities.add(target_entity) + lock_keys = sorted(all_entities) + + # Use keyed lock for all entities to ensure atomic graph and vector db operations + workspace = entities_vdb.global_config.get("workspace", "") + namespace = f"{workspace}:GraphDB" if workspace else "GraphDB" + async with get_storage_keyed_lock( + lock_keys, namespace=namespace, enable_logging=False + ): try: # Default merge strategy for entities default_entity_merge_strategy = { From 25f829ef4854c35fa9652e5a80478eca490ec75d Mon Sep 17 00:00:00 2001 From: yangdx Date: Mon, 27 Oct 2025 12:27:57 +0800 Subject: [PATCH 06/13] Enable editing of entity_type field in node properties --- lightrag_webui/src/components/graph/PropertiesView.tsx | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/lightrag_webui/src/components/graph/PropertiesView.tsx b/lightrag_webui/src/components/graph/PropertiesView.tsx index 97411f29..463b49da 100644 --- a/lightrag_webui/src/components/graph/PropertiesView.tsx +++ b/lightrag_webui/src/components/graph/PropertiesView.tsx @@ -225,8 +225,8 @@ const PropertyRow = ({ formattedTooltip += `\n(Truncated: ${truncate})` } - // Use EditablePropertyRow for editable fields (description, entity_id and keywords) - if (isEditable && (name === 'description' || name === 'entity_id' || name === 'keywords')) { + // Use EditablePropertyRow for editable fields (description, entity_id and entity_type) + if (isEditable && (name === 'description' || name === 'entity_id' || name === 'entity_type' || name === 'keywords')) { return ( { nodeId={String(node.id)} 
entityId={node.properties['entity_id']} entityType="node" - isEditable={name === 'description' || name === 'entity_id'} + isEditable={name === 'description' || name === 'entity_id' || name === 'entity_type'} truncate={node.properties['truncate']} /> ) From 94f24a66f2415c945e5182689ff653cbd3a9871a Mon Sep 17 00:00:00 2001 From: yangdx Date: Mon, 27 Oct 2025 12:28:46 +0800 Subject: [PATCH 07/13] Bump API version to 0246 --- lightrag/api/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lightrag/api/__init__.py b/lightrag/api/__init__.py index de364382..b5db555e 100644 --- a/lightrag/api/__init__.py +++ b/lightrag/api/__init__.py @@ -1 +1 @@ -__api_version__ = "0245" +__api_version__ = "0246" From 411e92e6b9f2e572705f4bcd40240813321b5e5a Mon Sep 17 00:00:00 2001 From: yangdx Date: Mon, 27 Oct 2025 14:22:16 +0800 Subject: [PATCH 08/13] Fix vector deletion logging to show actual deleted count --- lightrag/kg/nano_vector_db_impl.py | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/lightrag/kg/nano_vector_db_impl.py b/lightrag/kg/nano_vector_db_impl.py index e598e34c..1185241c 100644 --- a/lightrag/kg/nano_vector_db_impl.py +++ b/lightrag/kg/nano_vector_db_impl.py @@ -184,9 +184,17 @@ class NanoVectorDBStorage(BaseVectorStorage): """ try: client = await self._get_client() + # Record count before deletion + before_count = len(client) + client.delete(ids) + + # Calculate actual deleted count + after_count = len(client) + deleted_count = before_count - after_count + logger.debug( - f"[{self.workspace}] Successfully deleted {len(ids)} vectors from {self.namespace}" + f"[{self.workspace}] Successfully deleted {deleted_count} vectors from {self.namespace}" ) except Exception as e: logger.error( From 11a1631d76f811dc67709d89b25226e5245f8fc5 Mon Sep 17 00:00:00 2001 From: yangdx Date: Mon, 27 Oct 2025 14:23:51 +0800 Subject: [PATCH 09/13] Refactor entity edit and merge functions to support merge-on-rename MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit • Extract internal implementation helpers • Add allow_merge parameter to aedit_entity • Support merging when renaming to existing name • Improve code reusability and modularity • Maintain backward compatibility --- lightrag/utils_graph.py | 1333 ++++++++++++++++++++------------------- 1 file changed, 683 insertions(+), 650 deletions(-) diff --git a/lightrag/utils_graph.py b/lightrag/utils_graph.py index d782059d..c18c17c0 100644 --- a/lightrag/utils_graph.py +++ b/lightrag/utils_graph.py @@ -246,6 +246,272 @@ async def adelete_by_relation( ) +async def _edit_entity_impl( + chunk_entity_relation_graph, + entities_vdb, + relationships_vdb, + entity_name: str, + updated_data: dict[str, str], + *, + entity_chunks_storage=None, + relation_chunks_storage=None, +) -> dict[str, Any]: + """Internal helper that edits an entity without acquiring storage locks. + + This function performs the actual entity edit operations without lock management. + It should only be called by public APIs that have already acquired necessary locks. 
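    Example (illustrative only; the lock helper and storage objects are
    the same ones used elsewhere in this series):

        async with get_storage_keyed_lock([entity_name], namespace=ns):
            result = await _edit_entity_impl(
                graph, entities_vdb, relationships_vdb,
                entity_name, {"description": "updated description"},
            )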
+ + Args: + chunk_entity_relation_graph: Graph storage instance + entities_vdb: Vector database storage for entities + relationships_vdb: Vector database storage for relationships + entity_name: Name of the entity to edit + updated_data: Dictionary containing updated attributes (including optional entity_name for renaming) + entity_chunks_storage: Optional KV storage for tracking chunks + relation_chunks_storage: Optional KV storage for tracking relation chunks + + Returns: + Dictionary containing updated entity information + + Note: + Caller must acquire appropriate locks before calling this function. + If renaming (entity_name in updated_data), this function will check if the new name exists. + """ + new_entity_name = updated_data.get("entity_name", entity_name) + is_renaming = new_entity_name != entity_name + + original_entity_name = entity_name + + node_exists = await chunk_entity_relation_graph.has_node(entity_name) + if not node_exists: + raise ValueError(f"Entity '{entity_name}' does not exist") + node_data = await chunk_entity_relation_graph.get_node(entity_name) + + if is_renaming: + existing_node = await chunk_entity_relation_graph.has_node(new_entity_name) + if existing_node: + raise ValueError( + f"Entity name '{new_entity_name}' already exists, cannot rename" + ) + + new_node_data = {**node_data, **updated_data} + new_node_data["entity_id"] = new_entity_name + + if "entity_name" in new_node_data: + del new_node_data[ + "entity_name" + ] # Node data should not contain entity_name field + + if is_renaming: + logger.info(f"Entity Edit: renaming `{entity_name}` to `{new_entity_name}`") + + await chunk_entity_relation_graph.upsert_node(new_entity_name, new_node_data) + + relations_to_update = [] + relations_to_delete = [] + edges = await chunk_entity_relation_graph.get_node_edges(entity_name) + if edges: + for source, target in edges: + edge_data = await chunk_entity_relation_graph.get_edge(source, target) + if edge_data: + relations_to_delete.append( + compute_mdhash_id(source + target, prefix="rel-") + ) + relations_to_delete.append( + compute_mdhash_id(target + source, prefix="rel-") + ) + if source == entity_name: + await chunk_entity_relation_graph.upsert_edge( + new_entity_name, target, edge_data + ) + relations_to_update.append((new_entity_name, target, edge_data)) + else: # target == entity_name + await chunk_entity_relation_graph.upsert_edge( + source, new_entity_name, edge_data + ) + relations_to_update.append((source, new_entity_name, edge_data)) + + await chunk_entity_relation_graph.delete_node(entity_name) + + old_entity_id = compute_mdhash_id(entity_name, prefix="ent-") + await entities_vdb.delete([old_entity_id]) + + await relationships_vdb.delete(relations_to_delete) + + for src, tgt, edge_data in relations_to_update: + normalized_src, normalized_tgt = sorted([src, tgt]) + + description = edge_data.get("description", "") + keywords = edge_data.get("keywords", "") + source_id = edge_data.get("source_id", "") + weight = float(edge_data.get("weight", 1.0)) + + content = f"{normalized_src}\t{normalized_tgt}\n{keywords}\n{description}" + + relation_id = compute_mdhash_id( + normalized_src + normalized_tgt, prefix="rel-" + ) + + relation_data = { + relation_id: { + "content": content, + "src_id": normalized_src, + "tgt_id": normalized_tgt, + "source_id": source_id, + "description": description, + "keywords": keywords, + "weight": weight, + } + } + + await relationships_vdb.upsert(relation_data) + + entity_name = new_entity_name + else: + await 
chunk_entity_relation_graph.upsert_node(entity_name, new_node_data) + + description = new_node_data.get("description", "") + source_id = new_node_data.get("source_id", "") + entity_type = new_node_data.get("entity_type", "") + content = entity_name + "\n" + description + + entity_id = compute_mdhash_id(entity_name, prefix="ent-") + + entity_data = { + entity_id: { + "content": content, + "entity_name": entity_name, + "source_id": source_id, + "description": description, + "entity_type": entity_type, + } + } + + await entities_vdb.upsert(entity_data) + + if entity_chunks_storage is not None or relation_chunks_storage is not None: + from .utils import make_relation_chunk_key, compute_incremental_chunk_ids + + if entity_chunks_storage is not None: + storage_key = original_entity_name if is_renaming else entity_name + stored_data = await entity_chunks_storage.get_by_id(storage_key) + has_stored_data = ( + stored_data + and isinstance(stored_data, dict) + and stored_data.get("chunk_ids") + ) + + old_source_id = node_data.get("source_id", "") + old_chunk_ids = [cid for cid in old_source_id.split(GRAPH_FIELD_SEP) if cid] + + new_source_id = new_node_data.get("source_id", "") + new_chunk_ids = [cid for cid in new_source_id.split(GRAPH_FIELD_SEP) if cid] + + source_id_changed = set(new_chunk_ids) != set(old_chunk_ids) + + if source_id_changed or not has_stored_data: + existing_full_chunk_ids = [] + if has_stored_data: + existing_full_chunk_ids = [ + cid for cid in stored_data.get("chunk_ids", []) if cid + ] + + if not existing_full_chunk_ids: + existing_full_chunk_ids = old_chunk_ids.copy() + + updated_chunk_ids = compute_incremental_chunk_ids( + existing_full_chunk_ids, old_chunk_ids, new_chunk_ids + ) + + if is_renaming: + await entity_chunks_storage.delete([original_entity_name]) + await entity_chunks_storage.upsert( + { + entity_name: { + "chunk_ids": updated_chunk_ids, + "count": len(updated_chunk_ids), + } + } + ) + else: + await entity_chunks_storage.upsert( + { + entity_name: { + "chunk_ids": updated_chunk_ids, + "count": len(updated_chunk_ids), + } + } + ) + + logger.info( + f"Entity Edit: find {len(updated_chunk_ids)} chunks related to `{entity_name}`" + ) + + if is_renaming and relation_chunks_storage is not None and relations_to_update: + for src, tgt, edge_data in relations_to_update: + old_src = original_entity_name if src == entity_name else src + old_tgt = original_entity_name if tgt == entity_name else tgt + + old_normalized_src, old_normalized_tgt = sorted([old_src, old_tgt]) + new_normalized_src, new_normalized_tgt = sorted([src, tgt]) + + old_storage_key = make_relation_chunk_key( + old_normalized_src, old_normalized_tgt + ) + new_storage_key = make_relation_chunk_key( + new_normalized_src, new_normalized_tgt + ) + + if old_storage_key != new_storage_key: + old_stored_data = await relation_chunks_storage.get_by_id( + old_storage_key + ) + relation_chunk_ids = [] + + if old_stored_data and isinstance(old_stored_data, dict): + relation_chunk_ids = [ + cid for cid in old_stored_data.get("chunk_ids", []) if cid + ] + else: + relation_source_id = edge_data.get("source_id", "") + relation_chunk_ids = [ + cid + for cid in relation_source_id.split(GRAPH_FIELD_SEP) + if cid + ] + + await relation_chunks_storage.delete([old_storage_key]) + + if relation_chunk_ids: + await relation_chunks_storage.upsert( + { + new_storage_key: { + "chunk_ids": relation_chunk_ids, + "count": len(relation_chunk_ids), + } + } + ) + logger.info( + f"Entity Edit: migrate {len(relations_to_update)} relations 
after rename" + ) + + await _persist_graph_updates( + entities_vdb=entities_vdb, + relationships_vdb=relationships_vdb, + chunk_entity_relation_graph=chunk_entity_relation_graph, + entity_chunks_storage=entity_chunks_storage, + relation_chunks_storage=relation_chunks_storage, + ) + + logger.info(f"Entity Edit: `{entity_name}` successfully updated") + return await get_entity_info( + chunk_entity_relation_graph, + entities_vdb, + entity_name, + include_vector_data=True, + ) + + async def aedit_entity( chunk_entity_relation_graph, entities_vdb, @@ -253,6 +519,7 @@ async def aedit_entity( entity_name: str, updated_data: dict[str, str], allow_rename: bool = True, + allow_merge: bool = False, entity_chunks_storage=None, relation_chunks_storage=None, ) -> dict[str, Any]: @@ -268,338 +535,82 @@ async def aedit_entity( entity_name: Name of the entity to edit updated_data: Dictionary containing updated attributes, e.g. {"description": "new description", "entity_type": "new type"} allow_rename: Whether to allow entity renaming, defaults to True + allow_merge: Whether to merge into an existing entity when renaming to an existing name, defaults to False entity_chunks_storage: Optional KV storage for tracking chunks that reference this entity relation_chunks_storage: Optional KV storage for tracking chunks that reference relations Returns: Dictionary containing updated entity information """ - # Determine entities to lock new_entity_name = updated_data.get("entity_name", entity_name) is_renaming = new_entity_name != entity_name - # Lock both original and new entity names if renaming - lock_keys = sorted([entity_name, new_entity_name]) if is_renaming else [entity_name] + lock_keys = sorted({entity_name, new_entity_name}) if is_renaming else [entity_name] - # Use keyed lock for entity to ensure atomic graph and vector db operations workspace = entities_vdb.global_config.get("workspace", "") namespace = f"{workspace}:GraphDB" if workspace else "GraphDB" async with get_storage_keyed_lock( lock_keys, namespace=namespace, enable_logging=False ): try: - # Save original entity name for chunk tracking updates - original_entity_name = entity_name + if is_renaming and not allow_rename: + raise ValueError( + "Entity renaming is not allowed. Set allow_rename=True to enable this feature" + ) - # 1. Get current entity information - node_exists = await chunk_entity_relation_graph.has_node(entity_name) - if not node_exists: - raise ValueError(f"Entity '{entity_name}' does not exist") - node_data = await chunk_entity_relation_graph.get_node(entity_name) - - # If renaming, check if new name already exists if is_renaming: - if not allow_rename: - raise ValueError( - "Entity renaming is not allowed. Set allow_rename=True to enable this feature" - ) - - existing_node = await chunk_entity_relation_graph.has_node( + target_exists = await chunk_entity_relation_graph.has_node( new_entity_name ) - if existing_node: - raise ValueError( - f"Entity name '{new_entity_name}' already exists, cannot rename" - ) - - # 2. 
Update entity information in the graph - new_node_data = {**node_data, **updated_data} - new_node_data["entity_id"] = new_entity_name - - if "entity_name" in new_node_data: - del new_node_data[ - "entity_name" - ] # Node data should not contain entity_name field - - # If renaming entity - if is_renaming: - logger.info( - f"Entity Edit: renaming `{entity_name}` to `{new_entity_name}`" - ) - - # Create new entity - await chunk_entity_relation_graph.upsert_node( - new_entity_name, new_node_data - ) - - # Store relationships that need to be updated - relations_to_update = [] - relations_to_delete = [] - # Get all edges related to the original entity - edges = await chunk_entity_relation_graph.get_node_edges(entity_name) - if edges: - # Recreate edges for the new entity - for source, target in edges: - edge_data = await chunk_entity_relation_graph.get_edge( - source, target - ) - if edge_data: - relations_to_delete.append( - compute_mdhash_id(source + target, prefix="rel-") - ) - relations_to_delete.append( - compute_mdhash_id(target + source, prefix="rel-") - ) - if source == entity_name: - await chunk_entity_relation_graph.upsert_edge( - new_entity_name, target, edge_data - ) - relations_to_update.append( - (new_entity_name, target, edge_data) - ) - else: # target == entity_name - await chunk_entity_relation_graph.upsert_edge( - source, new_entity_name, edge_data - ) - relations_to_update.append( - (source, new_entity_name, edge_data) - ) - - # Delete old entity - await chunk_entity_relation_graph.delete_node(entity_name) - - # Delete old entity record from vector database - old_entity_id = compute_mdhash_id(entity_name, prefix="ent-") - await entities_vdb.delete([old_entity_id]) - - # Delete old relation records from vector database - await relationships_vdb.delete(relations_to_delete) - - # Update relationship vector representations - for src, tgt, edge_data in relations_to_update: - # Normalize entity order for consistent vector ID generation - normalized_src, normalized_tgt = sorted([src, tgt]) - - description = edge_data.get("description", "") - keywords = edge_data.get("keywords", "") - source_id = edge_data.get("source_id", "") - weight = float(edge_data.get("weight", 1.0)) - - # Create content using normalized order - content = ( - f"{normalized_src}\t{normalized_tgt}\n{keywords}\n{description}" - ) - - # Calculate relationship ID using normalized order - relation_id = compute_mdhash_id( - normalized_src + normalized_tgt, prefix="rel-" - ) - - # Prepare data for vector database update - relation_data = { - relation_id: { - "content": content, - "src_id": normalized_src, - "tgt_id": normalized_tgt, - "source_id": source_id, - "description": description, - "keywords": keywords, - "weight": weight, - } - } - - # Update vector database - await relationships_vdb.upsert(relation_data) - - # Update working entity name to new name - entity_name = new_entity_name - - else: - # If not renaming, directly update node data - await chunk_entity_relation_graph.upsert_node( - entity_name, new_node_data - ) - - # 3. 
Recalculate entity's vector representation and update vector database - description = new_node_data.get("description", "") - source_id = new_node_data.get("source_id", "") - entity_type = new_node_data.get("entity_type", "") - content = entity_name + "\n" + description - - # Calculate entity ID - entity_id = compute_mdhash_id(entity_name, prefix="ent-") - - # Prepare data for vector database update - entity_data = { - entity_id: { - "content": content, - "entity_name": entity_name, - "source_id": source_id, - "description": description, - "entity_type": entity_type, - } - } - - # Update vector database - await entities_vdb.upsert(entity_data) - - # 4. Update chunk tracking storages - if entity_chunks_storage is not None or relation_chunks_storage is not None: - from .utils import ( - make_relation_chunk_key, - compute_incremental_chunk_ids, - ) - - # 4.1 Handle entity chunk tracking - if entity_chunks_storage is not None: - # Get storage key (use original name for renaming scenario) - storage_key = original_entity_name if is_renaming else entity_name - stored_data = await entity_chunks_storage.get_by_id(storage_key) - has_stored_data = ( - stored_data - and isinstance(stored_data, dict) - and stored_data.get("chunk_ids") - ) - - # Get old and new source_id - old_source_id = node_data.get("source_id", "") - old_chunk_ids = [ - cid for cid in old_source_id.split(GRAPH_FIELD_SEP) if cid - ] - - new_source_id = new_node_data.get("source_id", "") - new_chunk_ids = [ - cid for cid in new_source_id.split(GRAPH_FIELD_SEP) if cid - ] - - source_id_changed = set(new_chunk_ids) != set(old_chunk_ids) - - # Update if: source_id changed OR storage has no data - if source_id_changed or not has_stored_data: - # Get existing full chunk_ids from storage - existing_full_chunk_ids = [] - if has_stored_data: - existing_full_chunk_ids = [ - cid for cid in stored_data.get("chunk_ids", []) if cid - ] - - # If no stored data exists, use old source_id as baseline - if not existing_full_chunk_ids: - existing_full_chunk_ids = old_chunk_ids.copy() - - # Use utility function to compute incremental updates - updated_chunk_ids = compute_incremental_chunk_ids( - existing_full_chunk_ids, old_chunk_ids, new_chunk_ids + if target_exists: + if not allow_merge: + raise ValueError( + f"Entity name '{new_entity_name}' already exists, cannot rename" ) - # Update storage (even if updated_chunk_ids is empty) - if is_renaming: - # Renaming: delete old + create new - await entity_chunks_storage.delete([original_entity_name]) - await entity_chunks_storage.upsert( - { - entity_name: { - "chunk_ids": updated_chunk_ids, - "count": len(updated_chunk_ids), - } - } - ) - else: - # Non-renaming: direct update - await entity_chunks_storage.upsert( - { - entity_name: { - "chunk_ids": updated_chunk_ids, - "count": len(updated_chunk_ids), - } - } - ) - - logger.info( - f"Entity Edit: find {len(updated_chunk_ids)} chunks related to `{entity_name}`" - ) - - # 4.2 Handle relation chunk tracking if entity was renamed - if ( - is_renaming - and relation_chunks_storage is not None - and relations_to_update - ): - for src, tgt, edge_data in relations_to_update: - # Determine old entity pair (before rename) - old_src = original_entity_name if src == entity_name else src - old_tgt = original_entity_name if tgt == entity_name else tgt - - # Normalize entity order for both old and new keys - old_normalized_src, old_normalized_tgt = sorted( - [old_src, old_tgt] - ) - new_normalized_src, new_normalized_tgt = sorted([src, tgt]) - - # Generate storage keys - 
old_storage_key = make_relation_chunk_key( - old_normalized_src, old_normalized_tgt - ) - new_storage_key = make_relation_chunk_key( - new_normalized_src, new_normalized_tgt - ) - - # If keys are different, we need to move the chunk tracking - if old_storage_key != new_storage_key: - # Get complete chunk IDs from storage first (preserves all existing references) - old_stored_data = await relation_chunks_storage.get_by_id( - old_storage_key - ) - relation_chunk_ids = [] - - if old_stored_data and isinstance(old_stored_data, dict): - # Use complete chunk_ids from storage - relation_chunk_ids = [ - cid - for cid in old_stored_data.get("chunk_ids", []) - if cid - ] - else: - # Fallback: if storage has no data, use graph's source_id - relation_source_id = edge_data.get("source_id", "") - relation_chunk_ids = [ - cid - for cid in relation_source_id.split(GRAPH_FIELD_SEP) - if cid - ] - - # Delete old relation chunk tracking - await relation_chunks_storage.delete([old_storage_key]) - - # Create new relation chunk tracking (migrate complete data) - if relation_chunk_ids: - await relation_chunks_storage.upsert( - { - new_storage_key: { - "chunk_ids": relation_chunk_ids, - "count": len(relation_chunk_ids), - } - } - ) logger.info( - f"Entity Edit: migrate {len(relations_to_update)} relations after rename" + f"Entity Edit: `{entity_name}` will be merged into `{new_entity_name}`" ) - # 5. Save changes - await _persist_graph_updates( - entities_vdb=entities_vdb, - relationships_vdb=relationships_vdb, - chunk_entity_relation_graph=chunk_entity_relation_graph, - entity_chunks_storage=entity_chunks_storage, - relation_chunks_storage=relation_chunks_storage, - ) + non_name_updates = { + key: value + for key, value in updated_data.items() + if key != "entity_name" + } + if non_name_updates: + logger.info( + "Entity Edit: applying non-name updates before merge" + ) + await _edit_entity_impl( + chunk_entity_relation_graph, + entities_vdb, + relationships_vdb, + entity_name, + non_name_updates, + entity_chunks_storage=entity_chunks_storage, + relation_chunks_storage=relation_chunks_storage, + ) - logger.info(f"Entity Edit: `{entity_name}` successfully updated") - return await get_entity_info( + return await _merge_entities_impl( + chunk_entity_relation_graph, + entities_vdb, + relationships_vdb, + [entity_name], + new_entity_name, + merge_strategy=None, + target_entity_data=None, + entity_chunks_storage=entity_chunks_storage, + relation_chunks_storage=relation_chunks_storage, + ) + + return await _edit_entity_impl( chunk_entity_relation_graph, entities_vdb, + relationships_vdb, entity_name, - include_vector_data=True, + updated_data, + entity_chunks_storage=entity_chunks_storage, + relation_chunks_storage=relation_chunks_storage, ) except Exception as e: logger.error(f"Error while editing entity '{entity_name}': {e}") @@ -1054,6 +1065,367 @@ async def acreate_relation( raise +async def _merge_entities_impl( + chunk_entity_relation_graph, + entities_vdb, + relationships_vdb, + source_entities: list[str], + target_entity: str, + *, + merge_strategy: dict[str, str] = None, + target_entity_data: dict[str, Any] = None, + entity_chunks_storage=None, + relation_chunks_storage=None, +) -> dict[str, Any]: + """Internal helper that merges entities without acquiring storage locks. + + This function performs the actual entity merge operations without lock management. + It should only be called by public APIs that have already acquired necessary locks. 
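    Example (illustrative only; the public amerge_entities wrapper locks
    every involved entity in sorted order before delegating here):

        source_entities, target_entity = ["Alice", "Al"], "Alice Smith"
        keys = sorted({*source_entities, target_entity})
        async with get_storage_keyed_lock(keys, namespace=ns):
            merged = await _merge_entities_impl(
                graph, entities_vdb, relationships_vdb,
                source_entities, target_entity,
            )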
+ + Args: + chunk_entity_relation_graph: Graph storage instance + entities_vdb: Vector database storage for entities + relationships_vdb: Vector database storage for relationships + source_entities: List of source entity names to merge + target_entity: Name of the target entity after merging + merge_strategy: Deprecated. Merge strategy for each field (optional) + target_entity_data: Dictionary of specific values to set for target entity (optional) + entity_chunks_storage: Optional KV storage for tracking chunks + relation_chunks_storage: Optional KV storage for tracking relation chunks + + Returns: + Dictionary containing the merged entity information + + Note: + Caller must acquire appropriate locks before calling this function. + All source entities and the target entity should be locked together. + """ + # Default merge strategy for entities + default_entity_merge_strategy = { + "description": "concatenate", + "entity_type": "keep_first", + "source_id": "join_unique", + "file_path": "join_unique", + } + effective_entity_merge_strategy = default_entity_merge_strategy + if merge_strategy: + logger.warning( + "Entity Merge: merge_strategy parameter is deprecated and will be ignored in a future release." + ) + effective_entity_merge_strategy = { + **default_entity_merge_strategy, + **merge_strategy, + } + target_entity_data = {} if target_entity_data is None else target_entity_data + + # 1. Check if all source entities exist + source_entities_data = {} + for entity_name in source_entities: + node_exists = await chunk_entity_relation_graph.has_node(entity_name) + if not node_exists: + raise ValueError(f"Source entity '{entity_name}' does not exist") + node_data = await chunk_entity_relation_graph.get_node(entity_name) + source_entities_data[entity_name] = node_data + + # 2. Check if target entity exists and get its data if it does + target_exists = await chunk_entity_relation_graph.has_node(target_entity) + existing_target_entity_data = {} + if target_exists: + existing_target_entity_data = await chunk_entity_relation_graph.get_node( + target_entity + ) + logger.info( + "Entity Merge: target entity already exists, source and target entities will be merged" + ) + + # 3. Merge entity data + merged_entity_data = _merge_attributes( + list(source_entities_data.values()) + + ([existing_target_entity_data] if target_exists else []), + effective_entity_merge_strategy, + filter_none_only=False, # Use entity behavior: filter falsy values + ) + + # Apply any explicitly provided target entity data (overrides merged data) + for key, value in target_entity_data.items(): + merged_entity_data[key] = value + + # 4. Get all relationships of the source entities and target entity (if exists) + all_relations = [] + entities_to_collect = source_entities.copy() + + # If target entity exists and not already in source_entities, add it + if target_exists and target_entity not in source_entities: + entities_to_collect.append(target_entity) + + for entity_name in entities_to_collect: + # Get all relationships of the entities + edges = await chunk_entity_relation_graph.get_node_edges(entity_name) + if edges: + for src, tgt in edges: + # Ensure src is the current entity + if src == entity_name: + edge_data = await chunk_entity_relation_graph.get_edge(src, tgt) + all_relations.append((src, tgt, edge_data)) + + # 5. 
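Create or update the target entity
+    # At this point merged_entity_data holds the combined attributes. As an
+    # illustration of the default strategies above (values are made up):
+    #
+    #     _merge_attributes(
+    #         [{"entity_type": "ORG", "description": "A"},
+    #          {"entity_type": "", "description": "B"}],
+    #         default_entity_merge_strategy,
+    #         filter_none_only=False,
+    #     )
+    #     # -> entity_type "ORG" (keep_first), description "A" + GRAPH_FIELD_SEP + "B"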
+    merged_entity_data["entity_id"] = target_entity
+    if not target_exists:
+        await chunk_entity_relation_graph.upsert_node(target_entity, merged_entity_data)
+        logger.info(f"Entity Merge: created target '{target_entity}'")
+    else:
+        await chunk_entity_relation_graph.upsert_node(target_entity, merged_entity_data)
+        logger.info(f"Entity Merge: updated target '{target_entity}'")
+
+    # 6. Recreate all relations pointing to the target entity in KG
+    # Also collect chunk tracking information in the same loop
+    relation_updates = {}  # Track relationships that need to be merged
+    relations_to_delete = []
+
+    # Initialize chunk tracking variables
+    relation_chunk_tracking = {}  # key: storage_key, value: list of chunk_ids
+    old_relation_keys_to_delete = []
+
+    for src, tgt, edge_data in all_relations:
+        relations_to_delete.append(compute_mdhash_id(src + tgt, prefix="rel-"))
+        relations_to_delete.append(compute_mdhash_id(tgt + src, prefix="rel-"))
+
+        # Collect old chunk tracking key for deletion
+        if relation_chunks_storage is not None:
+            from .utils import make_relation_chunk_key
+
+            old_storage_key = make_relation_chunk_key(src, tgt)
+            old_relation_keys_to_delete.append(old_storage_key)
+
+        new_src = target_entity if src in source_entities else src
+        new_tgt = target_entity if tgt in source_entities else tgt
+
+        # Skip relationships between source entities to avoid self-loops
+        if new_src == new_tgt:
+            logger.info(f"Entity Merge: skipping `{src}`~`{tgt}` to avoid self-loop")
+            continue
+
+        # Normalize entity order for consistent duplicate detection (undirected relationships)
+        normalized_src, normalized_tgt = sorted([new_src, new_tgt])
+        relation_key = f"{normalized_src}|{normalized_tgt}"
+
+        # Process chunk tracking for this relation
+        if relation_chunks_storage is not None:
+            storage_key = make_relation_chunk_key(normalized_src, normalized_tgt)
+
+            # Get chunk_ids from storage for this original relation
+            stored = await relation_chunks_storage.get_by_id(old_storage_key)
+
+            if stored is not None and isinstance(stored, dict):
+                chunk_ids = [cid for cid in stored.get("chunk_ids", []) if cid]
+            else:
+                # Fallback to source_id from graph
+                source_id = edge_data.get("source_id", "")
+                chunk_ids = [cid for cid in source_id.split(GRAPH_FIELD_SEP) if cid]
+
+            # Accumulate chunk_ids with ordered deduplication
+            if storage_key not in relation_chunk_tracking:
+                relation_chunk_tracking[storage_key] = []
+
+            existing_chunks = set(relation_chunk_tracking[storage_key])
+            for chunk_id in chunk_ids:
+                if chunk_id not in existing_chunks:
+                    existing_chunks.add(chunk_id)
+                    relation_chunk_tracking[storage_key].append(chunk_id)
+
+        if relation_key in relation_updates:
+            # Merge relationship data
+            existing_data = relation_updates[relation_key]["data"]
+            merged_relation = _merge_attributes(
+                [existing_data, edge_data],
+                {
+                    "description": "concatenate",
+                    "keywords": "join_unique_comma",
+                    "source_id": "join_unique",
+                    "file_path": "join_unique",
+                    "weight": "max",
+                },
+                filter_none_only=True,  # Use relation behavior: only filter None
+            )
+            relation_updates[relation_key]["data"] = merged_relation
+            logger.info(
+                f"Entity Merge: deduplicating relation `{normalized_src}`~`{normalized_tgt}`"
+            )
+        else:
+            relation_updates[relation_key] = {
+                "graph_src": new_src,
+                "graph_tgt": new_tgt,
+                "norm_src": normalized_src,
+                "norm_tgt": normalized_tgt,
+                "data": edge_data.copy(),
+            }
+
+    # Apply relationship updates
+    for rel_data in relation_updates.values():
+        await
chunk_entity_relation_graph.upsert_edge( + rel_data["graph_src"], rel_data["graph_tgt"], rel_data["data"] + ) + logger.info( + f"Entity Merge: updating relation `{rel_data['graph_src']}`->`{rel_data['graph_tgt']}`" + ) + + # Update relation chunk tracking storage + if relation_chunks_storage is not None and all_relations: + if old_relation_keys_to_delete: + await relation_chunks_storage.delete(old_relation_keys_to_delete) + + if relation_chunk_tracking: + updates = {} + for storage_key, chunk_ids in relation_chunk_tracking.items(): + updates[storage_key] = { + "chunk_ids": chunk_ids, + "count": len(chunk_ids), + } + + await relation_chunks_storage.upsert(updates) + logger.info( + f"Entity Merge: merged chunk tracking for {len(updates)} relations" + ) + + # 7. Update relationship vector representations + logger.info(f"Entity Merge: deleting {len(relations_to_delete)} relations from vdb") + await relationships_vdb.delete(relations_to_delete) + for rel_data in relation_updates.values(): + edge_data = rel_data["data"] + normalized_src = rel_data["norm_src"] + normalized_tgt = rel_data["norm_tgt"] + + description = edge_data.get("description", "") + keywords = edge_data.get("keywords", "") + source_id = edge_data.get("source_id", "") + weight = float(edge_data.get("weight", 1.0)) + + # Use normalized order for content and relation ID + content = f"{keywords}\t{normalized_src}\n{normalized_tgt}\n{description}" + relation_id = compute_mdhash_id(normalized_src + normalized_tgt, prefix="rel-") + + relation_data_for_vdb = { + relation_id: { + "content": content, + "src_id": normalized_src, + "tgt_id": normalized_tgt, + "source_id": source_id, + "description": description, + "keywords": keywords, + "weight": weight, + } + } + await relationships_vdb.upsert(relation_data_for_vdb) + logger.info(f"Entity Merge: updating vdb `{normalized_src}`~`{normalized_tgt}`") + + # 8. Update entity vector representation + description = merged_entity_data.get("description", "") + source_id = merged_entity_data.get("source_id", "") + entity_type = merged_entity_data.get("entity_type", "") + content = target_entity + "\n" + description + + entity_id = compute_mdhash_id(target_entity, prefix="ent-") + entity_data_for_vdb = { + entity_id: { + "content": content, + "entity_name": target_entity, + "source_id": source_id, + "description": description, + "entity_type": entity_type, + } + } + await entities_vdb.upsert(entity_data_for_vdb) + logger.info(f"Entity Merge: updating vdb `{target_entity}`") + + # 9. 
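Merge entity chunk tracking (source entities first, then target entity)
+    # The merge below keeps the first occurrence of every chunk id, source
+    # entities first; a minimal sketch of the idiom (ids are illustrative):
+    #
+    #     merged, seen = [], set()
+    #     for cid in ["c1", "c2"] + ["c2", "c3"]:
+    #         if cid not in seen:
+    #             seen.add(cid)
+    #             merged.append(cid)
+    #     # merged == ["c1", "c2", "c3"]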
+    if entity_chunks_storage is not None:
+        all_chunk_id_lists = []
+
+        # Build list of entities to process (source entities first, then target entity)
+        entities_to_process = []
+
+        # Add source entities first (excluding target if it's already in source list)
+        for entity_name in source_entities:
+            if entity_name != target_entity:
+                entities_to_process.append(entity_name)
+
+        # Add target entity last (if it exists)
+        if target_exists:
+            entities_to_process.append(target_entity)
+
+        # Process all entities in order with unified logic
+        for entity_name in entities_to_process:
+            stored = await entity_chunks_storage.get_by_id(entity_name)
+            if stored and isinstance(stored, dict):
+                chunk_ids = [cid for cid in stored.get("chunk_ids", []) if cid]
+                if chunk_ids:
+                    all_chunk_id_lists.append(chunk_ids)
+
+        # Merge chunk_ids with ordered deduplication (preserves order, source entities first)
+        merged_chunk_ids = []
+        seen = set()
+        for chunk_id_list in all_chunk_id_lists:
+            for chunk_id in chunk_id_list:
+                if chunk_id not in seen:
+                    seen.add(chunk_id)
+                    merged_chunk_ids.append(chunk_id)
+
+        # Delete source entities' chunk tracking records
+        entity_keys_to_delete = [e for e in source_entities if e != target_entity]
+        if entity_keys_to_delete:
+            await entity_chunks_storage.delete(entity_keys_to_delete)
+
+        # Update target entity's chunk tracking
+        if merged_chunk_ids:
+            await entity_chunks_storage.upsert(
+                {
+                    target_entity: {
+                        "chunk_ids": merged_chunk_ids,
+                        "count": len(merged_chunk_ids),
+                    }
+                }
+            )
+            logger.info(
+                f"Entity Merge: found {len(merged_chunk_ids)} chunks related to '{target_entity}'"
+            )
+
+    # 10. Delete source entities
+    for entity_name in source_entities:
+        if entity_name == target_entity:
+            logger.warning(
+                f"Entity Merge: source entity '{entity_name}' is the same as the target entity"
+            )
+            continue
+
+        logger.info(f"Entity Merge: deleting '{entity_name}' from KG and vdb")
+
+        # Delete entity node and related edges from knowledge graph
+        await chunk_entity_relation_graph.delete_node(entity_name)
+
+        # Delete entity record from vector database
+        entity_id = compute_mdhash_id(entity_name, prefix="ent-")
+        await entities_vdb.delete([entity_id])
+
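+    # Vector-db records are keyed by hashed ids, which is why the ent-/rel-
+    # prefixed ids above are recomputed from the (normalized) entity names;
+    # a sketch, assuming compute_mdhash_id returns prefix + md5 hex digest:
+    #
+    #     compute_mdhash_id("Acme", prefix="ent-")           # "ent-<32 hex chars>"
+    #     compute_mdhash_id("Acme" + "Bolt", prefix="rel-")  # "rel-<32 hex chars>"
+
+    # 11.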
Save changes + await _persist_graph_updates( + entities_vdb=entities_vdb, + relationships_vdb=relationships_vdb, + chunk_entity_relation_graph=chunk_entity_relation_graph, + entity_chunks_storage=entity_chunks_storage, + relation_chunks_storage=relation_chunks_storage, + ) + + logger.info( + f"Entity Merge: successfully merged {len(source_entities)} entities into '{target_entity}'" + ) + return await get_entity_info( + chunk_entity_relation_graph, + entities_vdb, + target_entity, + include_vector_data=True, + ) + + async def amerge_entities( chunk_entity_relation_graph, entities_vdb, @@ -1092,362 +1464,23 @@ async def amerge_entities( all_entities.add(target_entity) lock_keys = sorted(all_entities) - # Use keyed lock for all entities to ensure atomic graph and vector db operations workspace = entities_vdb.global_config.get("workspace", "") namespace = f"{workspace}:GraphDB" if workspace else "GraphDB" async with get_storage_keyed_lock( lock_keys, namespace=namespace, enable_logging=False ): try: - # Default merge strategy for entities - default_entity_merge_strategy = { - "description": "concatenate", - "entity_type": "keep_first", - "source_id": "join_unique", - "file_path": "join_unique", - } - effective_entity_merge_strategy = default_entity_merge_strategy - if merge_strategy: - logger.warning( - "Entity Merge: merge_strategy parameter is deprecated and will be ignored in a future release." - ) - effective_entity_merge_strategy = { - **default_entity_merge_strategy, - **merge_strategy, - } - target_entity_data = ( - {} if target_entity_data is None else target_entity_data - ) - - # 1. Check if all source entities exist - source_entities_data = {} - for entity_name in source_entities: - node_exists = await chunk_entity_relation_graph.has_node(entity_name) - if not node_exists: - raise ValueError(f"Source entity '{entity_name}' does not exist") - node_data = await chunk_entity_relation_graph.get_node(entity_name) - source_entities_data[entity_name] = node_data - - # 2. Check if target entity exists and get its data if it does - target_exists = await chunk_entity_relation_graph.has_node(target_entity) - existing_target_entity_data = {} - if target_exists: - existing_target_entity_data = ( - await chunk_entity_relation_graph.get_node(target_entity) - ) - logger.info( - "Entity Merge: target entity already exists, source and target entities will be merged" - ) - - # 3. Merge entity data - merged_entity_data = _merge_attributes( - list(source_entities_data.values()) - + ([existing_target_entity_data] if target_exists else []), - effective_entity_merge_strategy, - filter_none_only=False, # Use entity behavior: filter falsy values - ) - - # Apply any explicitly provided target entity data (overrides merged data) - for key, value in target_entity_data.items(): - merged_entity_data[key] = value - - # 4. Get all relationships of the source entities and target entity (if exists) - all_relations = [] - entities_to_collect = source_entities.copy() - - # If target entity exists and not already in source_entities, add it - if target_exists and target_entity not in source_entities: - entities_to_collect.append(target_entity) - - for entity_name in entities_to_collect: - # Get all relationships of the entities - edges = await chunk_entity_relation_graph.get_node_edges(entity_name) - if edges: - for src, tgt in edges: - # Ensure src is the current entity - if src == entity_name: - edge_data = await chunk_entity_relation_graph.get_edge( - src, tgt - ) - all_relations.append((src, tgt, edge_data)) - - # 5. 
Create or update the target entity - merged_entity_data["entity_id"] = target_entity - if not target_exists: - await chunk_entity_relation_graph.upsert_node( - target_entity, merged_entity_data - ) - logger.info(f"Entity Merge: created target '{target_entity}'") - else: - await chunk_entity_relation_graph.upsert_node( - target_entity, merged_entity_data - ) - logger.info(f"Entity Merge: Updated target '{target_entity}'") - - # 6. Recreate all relations pointing to the target entity in KG - # Also collect chunk tracking information in the same loop - relation_updates = {} # Track relationships that need to be merged - relations_to_delete = [] - - # Initialize chunk tracking variables - relation_chunk_tracking = {} # key: storage_key, value: list of chunk_ids - old_relation_keys_to_delete = [] - - for src, tgt, edge_data in all_relations: - relations_to_delete.append(compute_mdhash_id(src + tgt, prefix="rel-")) - relations_to_delete.append(compute_mdhash_id(tgt + src, prefix="rel-")) - - # Collect old chunk tracking key for deletion - if relation_chunks_storage is not None: - from .utils import make_relation_chunk_key - - old_storage_key = make_relation_chunk_key(src, tgt) - old_relation_keys_to_delete.append(old_storage_key) - - new_src = target_entity if src in source_entities else src - new_tgt = target_entity if tgt in source_entities else tgt - - # Skip relationships between source entities to avoid self-loops - if new_src == new_tgt: - logger.info( - f"Entity Merge: skipping `{src}`~`{tgt}` to avoid self-loop" - ) - continue - - # Normalize entity order for consistent duplicate detection (undirected relationships) - normalized_src, normalized_tgt = sorted([new_src, new_tgt]) - relation_key = f"{normalized_src}|{normalized_tgt}" - - # Process chunk tracking for this relation - if relation_chunks_storage is not None: - storage_key = make_relation_chunk_key( - normalized_src, normalized_tgt - ) - - # Get chunk_ids from storage for this original relation - stored = await relation_chunks_storage.get_by_id(old_storage_key) - - if stored is not None and isinstance(stored, dict): - chunk_ids = [cid for cid in stored.get("chunk_ids", []) if cid] - else: - # Fallback to source_id from graph - source_id = edge_data.get("source_id", "") - chunk_ids = [ - cid for cid in source_id.split(GRAPH_FIELD_SEP) if cid - ] - - # Accumulate chunk_ids with ordered deduplication - if storage_key not in relation_chunk_tracking: - relation_chunk_tracking[storage_key] = [] - - existing_chunks = set(relation_chunk_tracking[storage_key]) - for chunk_id in chunk_ids: - if chunk_id not in existing_chunks: - existing_chunks.add(chunk_id) - relation_chunk_tracking[storage_key].append(chunk_id) - - if relation_key in relation_updates: - # Merge relationship data - existing_data = relation_updates[relation_key]["data"] - merged_relation = _merge_attributes( - [existing_data, edge_data], - { - "description": "concatenate", - "keywords": "join_unique_comma", - "source_id": "join_unique", - "file_path": "join_unique", - "weight": "max", - }, - filter_none_only=True, # Use relation behavior: only filter None - ) - relation_updates[relation_key]["data"] = merged_relation - logger.info( - f"Entity Merge: deduplicating relation `{normalized_src}`~`{normalized_tgt}`" - ) - else: - relation_updates[relation_key] = { - "graph_src": new_src, - "graph_tgt": new_tgt, - "norm_src": normalized_src, - "norm_tgt": normalized_tgt, - "data": edge_data.copy(), - } - - # Apply relationship updates - for rel_data in relation_updates.values(): 
- await chunk_entity_relation_graph.upsert_edge( - rel_data["graph_src"], rel_data["graph_tgt"], rel_data["data"] - ) - logger.info( - f"Entity Merge: updating relation `{rel_data['graph_src']}`->`{rel_data['graph_tgt']}`" - ) - - # Update relation chunk tracking storage - if relation_chunks_storage is not None and all_relations: - if old_relation_keys_to_delete: - await relation_chunks_storage.delete(old_relation_keys_to_delete) - - if relation_chunk_tracking: - updates = {} - for storage_key, chunk_ids in relation_chunk_tracking.items(): - updates[storage_key] = { - "chunk_ids": chunk_ids, - "count": len(chunk_ids), - } - - await relation_chunks_storage.upsert(updates) - logger.info( - f"Entity Merge: merged chunk tracking for {len(updates)} relations" - ) - - # 7. Update relationship vector representations - logger.info( - f"Entity Merge: deleting {len(relations_to_delete)} relations from vdb" - ) - await relationships_vdb.delete(relations_to_delete) - for rel_data in relation_updates.values(): - edge_data = rel_data["data"] - normalized_src = rel_data["norm_src"] - normalized_tgt = rel_data["norm_tgt"] - - description = edge_data.get("description", "") - keywords = edge_data.get("keywords", "") - source_id = edge_data.get("source_id", "") - weight = float(edge_data.get("weight", 1.0)) - - # Use normalized order for content and relation ID - content = ( - f"{keywords}\t{normalized_src}\n{normalized_tgt}\n{description}" - ) - relation_id = compute_mdhash_id( - normalized_src + normalized_tgt, prefix="rel-" - ) - - relation_data_for_vdb = { - relation_id: { - "content": content, - "src_id": normalized_src, - "tgt_id": normalized_tgt, - "source_id": source_id, - "description": description, - "keywords": keywords, - "weight": weight, - } - } - await relationships_vdb.upsert(relation_data_for_vdb) - logger.info( - f"Entity Merge: updating vdb `{normalized_src}`~`{normalized_tgt}`" - ) - - # 8. Update entity vector representation - description = merged_entity_data.get("description", "") - source_id = merged_entity_data.get("source_id", "") - entity_type = merged_entity_data.get("entity_type", "") - content = target_entity + "\n" + description - - entity_id = compute_mdhash_id(target_entity, prefix="ent-") - entity_data_for_vdb = { - entity_id: { - "content": content, - "entity_name": target_entity, - "source_id": source_id, - "description": description, - "entity_type": entity_type, - } - } - await entities_vdb.upsert(entity_data_for_vdb) - logger.info(f"Entity Merge: updating vdb `{target_entity}`") - - # 9. 
Merge entity chunk tracking (source entities first, then target entity) - if entity_chunks_storage is not None: - all_chunk_id_lists = [] - - # Build list of entities to process (source entities first, then target entity) - entities_to_process = [] - - # Add source entities first (excluding target if it's already in source list) - for entity_name in source_entities: - if entity_name != target_entity: - entities_to_process.append(entity_name) - - # Add target entity last (if it exists) - if target_exists: - entities_to_process.append(target_entity) - - # Process all entities in order with unified logic - for entity_name in entities_to_process: - stored = await entity_chunks_storage.get_by_id(entity_name) - if stored and isinstance(stored, dict): - chunk_ids = [cid for cid in stored.get("chunk_ids", []) if cid] - if chunk_ids: - all_chunk_id_lists.append(chunk_ids) - - # Merge chunk_ids with ordered deduplication (preserves order, source entities first) - merged_chunk_ids = [] - seen = set() - for chunk_id_list in all_chunk_id_lists: - for chunk_id in chunk_id_list: - if chunk_id not in seen: - seen.add(chunk_id) - merged_chunk_ids.append(chunk_id) - - # Delete source entities' chunk tracking records - entity_keys_to_delete = [ - e for e in source_entities if e != target_entity - ] - if entity_keys_to_delete: - await entity_chunks_storage.delete(entity_keys_to_delete) - - # Update target entity's chunk tracking - if merged_chunk_ids: - await entity_chunks_storage.upsert( - { - target_entity: { - "chunk_ids": merged_chunk_ids, - "count": len(merged_chunk_ids), - } - } - ) - logger.info( - f"Entity Merge: find {len(merged_chunk_ids)} chunks related to '{target_entity}'" - ) - - # 10. Delete source entities - for entity_name in source_entities: - if entity_name == target_entity: - logger.warning( - f"Entity Merge: source entity'{entity_name}' is same as target entity" - ) - continue - - logger.info(f"Entity Merge: deleting '{entity_name}' from KG and vdb") - - # Delete entity node and related edges from knowledge graph - await chunk_entity_relation_graph.delete_node(entity_name) - - # Delete entity record from vector database - entity_id = compute_mdhash_id(entity_name, prefix="ent-") - await entities_vdb.delete([entity_id]) - - # 11. 
Save changes - await _persist_graph_updates( - entities_vdb=entities_vdb, - relationships_vdb=relationships_vdb, - chunk_entity_relation_graph=chunk_entity_relation_graph, + return await _merge_entities_impl( + chunk_entity_relation_graph, + entities_vdb, + relationships_vdb, + source_entities, + target_entity, + merge_strategy=merge_strategy, + target_entity_data=target_entity_data, entity_chunks_storage=entity_chunks_storage, relation_chunks_storage=relation_chunks_storage, ) - - logger.info( - f"Entity Merge: successfully merged {len(source_entities)} entities into '{target_entity}'" - ) - return await get_entity_info( - chunk_entity_relation_graph, - entities_vdb, - target_entity, - include_vector_data=True, - ) - except Exception as e: logger.error(f"Error merging entities: {e}") raise From 97034f06e39f509b7f19f6e2b0edeb42dbaf0a58 Mon Sep 17 00:00:00 2001 From: yangdx Date: Mon, 27 Oct 2025 14:30:27 +0800 Subject: [PATCH 10/13] Add allow_merge parameter to entity update API endpoint --- lightrag/api/routers/graph_routes.py | 40 ++++++++++++++++++++++++++-- 1 file changed, 38 insertions(+), 2 deletions(-) diff --git a/lightrag/api/routers/graph_routes.py b/lightrag/api/routers/graph_routes.py index f59a7c3d..db75b231 100644 --- a/lightrag/api/routers/graph_routes.py +++ b/lightrag/api/routers/graph_routes.py @@ -17,6 +17,7 @@ class EntityUpdateRequest(BaseModel): entity_name: str updated_data: Dict[str, Any] allow_rename: bool = False + allow_merge: bool = False class RelationUpdateRequest(BaseModel): @@ -221,17 +222,52 @@ def create_graph_routes(rag, api_key: Optional[str] = None): """ Update an entity's properties in the knowledge graph + This endpoint allows updating entity properties, including renaming entities. + When renaming to an existing entity name, the behavior depends on allow_merge: + Args: - request (EntityUpdateRequest): Request containing entity name, updated data, and rename flag + request (EntityUpdateRequest): Request containing: + - entity_name (str): Name of the entity to update + - updated_data (Dict[str, Any]): Dictionary of properties to update + - allow_rename (bool): Whether to allow entity renaming (default: False) + - allow_merge (bool): Whether to merge into existing entity when renaming + causes name conflict (default: False) Returns: - Dict: Updated entity information + Dict: Updated entity information with status + + Behavior when renaming to an existing entity: + - If allow_merge=False: Raises ValueError with 400 status (default behavior) + - If allow_merge=True: Automatically merges the source entity into the existing target entity, + preserving all relationships and applying non-name updates first + + Example Request (simple update): + POST /graph/entity/edit + { + "entity_name": "Tesla", + "updated_data": {"description": "Updated description"}, + "allow_rename": false, + "allow_merge": false + } + + Example Request (rename with auto-merge): + POST /graph/entity/edit + { + "entity_name": "Elon Msk", + "updated_data": { + "entity_name": "Elon Musk", + "description": "Corrected description" + }, + "allow_rename": true, + "allow_merge": true + } """ try: result = await rag.aedit_entity( entity_name=request.entity_name, updated_data=request.updated_data, allow_rename=request.allow_rename, + allow_merge=request.allow_merge, ) return { "status": "success", From 5155edd8d2a3d0c4d59bc250b11156f81d0d26f3 Mon Sep 17 00:00:00 2001 From: yangdx Date: Mon, 27 Oct 2025 23:42:08 +0800 Subject: [PATCH 11/13] feat: Improve entity merge and edit UX - **API:** The 
`graph/entity/edit` endpoint now returns a detailed `operation_summary` for better client-side handling of update, rename, and merge outcomes. - **Web UI:** Added an "auto-merge on rename" option. The UI now gracefully handles merge success, partial failures (update OK, merge fail), and other errors with specific user feedback. --- lightrag/api/routers/graph_routes.py | 127 ++++++++++++- lightrag/lightrag.py | 16 +- lightrag/utils_graph.py | 132 +++++++++++-- lightrag_webui/src/api/lightrag.ts | 24 ++- .../components/graph/EditablePropertyRow.tsx | 176 ++++++++++++++++-- .../components/graph/PropertyEditDialog.tsx | 50 +++-- lightrag_webui/src/locales/ar.json | 17 +- lightrag_webui/src/locales/en.json | 17 +- lightrag_webui/src/locales/fr.json | 17 +- lightrag_webui/src/locales/zh.json | 17 +- lightrag_webui/src/locales/zh_TW.json | 17 +- 11 files changed, 538 insertions(+), 72 deletions(-) diff --git a/lightrag/api/routers/graph_routes.py b/lightrag/api/routers/graph_routes.py index db75b231..e892ff01 100644 --- a/lightrag/api/routers/graph_routes.py +++ b/lightrag/api/routers/graph_routes.py @@ -234,7 +234,49 @@ def create_graph_routes(rag, api_key: Optional[str] = None): causes name conflict (default: False) Returns: - Dict: Updated entity information with status + Dict with the following structure: + { + "status": "success", + "message": "Entity updated successfully" | "Entity merged successfully into 'target_name'", + "data": { + "entity_name": str, # Final entity name + "description": str, # Entity description + "entity_type": str, # Entity type + "source_id": str, # Source chunk IDs + ... # Other entity properties + }, + "operation_summary": { + "merged": bool, # Whether entity was merged into another + "merge_status": str, # "success" | "failed" | "not_attempted" + "merge_error": str | None, # Error message if merge failed + "operation_status": str, # "success" | "partial_success" | "failure" + "target_entity": str | None, # Target entity name if renaming/merging + "final_entity": str, # Final entity name after operation + "renamed": bool # Whether entity was renamed + } + } + + operation_status values explained: + - "success": All operations completed successfully + * For simple updates: entity properties updated + * For renames: entity renamed successfully + * For merges: non-name updates applied AND merge completed + + - "partial_success": Update succeeded but merge failed + * Non-name property updates were applied successfully + * Merge operation failed (entity not merged) + * Original entity still exists with updated properties + * Use merge_error for failure details + + - "failure": Operation failed completely + * If merge_status == "failed": Merge attempted but both update and merge failed + * If merge_status == "not_attempted": Regular update failed + * No changes were applied to the entity + + merge_status values explained: + - "success": Entity successfully merged into target entity + - "failed": Merge operation was attempted but failed + - "not_attempted": No merge was attempted (normal update/rename) Behavior when renaming to an existing entity: - If allow_merge=False: Raises ValueError with 400 status (default behavior) @@ -250,6 +292,22 @@ def create_graph_routes(rag, api_key: Optional[str] = None): "allow_merge": false } + Example Response (simple update success): + { + "status": "success", + "message": "Entity updated successfully", + "data": { ... 
}, + "operation_summary": { + "merged": false, + "merge_status": "not_attempted", + "merge_error": null, + "operation_status": "success", + "target_entity": null, + "final_entity": "Tesla", + "renamed": false + } + } + Example Request (rename with auto-merge): POST /graph/entity/edit { @@ -261,6 +319,38 @@ def create_graph_routes(rag, api_key: Optional[str] = None): "allow_rename": true, "allow_merge": true } + + Example Response (merge success): + { + "status": "success", + "message": "Entity merged successfully into 'Elon Musk'", + "data": { ... }, + "operation_summary": { + "merged": true, + "merge_status": "success", + "merge_error": null, + "operation_status": "success", + "target_entity": "Elon Musk", + "final_entity": "Elon Musk", + "renamed": true + } + } + + Example Response (partial success - update succeeded but merge failed): + { + "status": "success", + "message": "Entity updated successfully", + "data": { ... }, # Data reflects updated "Elon Msk" entity + "operation_summary": { + "merged": false, + "merge_status": "failed", + "merge_error": "Target entity locked by another operation", + "operation_status": "partial_success", + "target_entity": "Elon Musk", + "final_entity": "Elon Msk", # Original entity still exists + "renamed": true + } + } """ try: result = await rag.aedit_entity( @@ -269,10 +359,41 @@ def create_graph_routes(rag, api_key: Optional[str] = None): allow_rename=request.allow_rename, allow_merge=request.allow_merge, ) + + # Extract operation_summary from result, with fallback for backward compatibility + operation_summary = result.get( + "operation_summary", + { + "merged": False, + "merge_status": "not_attempted", + "merge_error": None, + "operation_status": "success", + "target_entity": None, + "final_entity": request.updated_data.get( + "entity_name", request.entity_name + ), + "renamed": request.updated_data.get( + "entity_name", request.entity_name + ) + != request.entity_name, + }, + ) + + # Separate entity data from operation_summary for clean response + entity_data = dict(result) + entity_data.pop("operation_summary", None) + + # Generate appropriate response message based on merge status + response_message = ( + f"Entity merged successfully into '{operation_summary['final_entity']}'" + if operation_summary.get("merged") + else "Entity updated successfully" + ) return { "status": "success", - "message": "Entity updated successfully", - "data": result, + "message": response_message, + "data": entity_data, + "operation_summary": operation_summary, } except ValueError as ve: logger.error( diff --git a/lightrag/lightrag.py b/lightrag/lightrag.py index bdc94b2c..45f7afd5 100644 --- a/lightrag/lightrag.py +++ b/lightrag/lightrag.py @@ -3577,7 +3577,11 @@ class LightRAG: ) async def aedit_entity( - self, entity_name: str, updated_data: dict[str, str], allow_rename: bool = True + self, + entity_name: str, + updated_data: dict[str, str], + allow_rename: bool = True, + allow_merge: bool = False, ) -> dict[str, Any]: """Asynchronously edit entity information. @@ -3588,6 +3592,7 @@ class LightRAG: entity_name: Name of the entity to edit updated_data: Dictionary containing updated attributes, e.g. 
{"description": "new description", "entity_type": "new type"} allow_rename: Whether to allow entity renaming, defaults to True + allow_merge: Whether to merge into an existing entity when renaming to an existing name Returns: Dictionary containing updated entity information @@ -3601,16 +3606,21 @@ class LightRAG: entity_name, updated_data, allow_rename, + allow_merge, self.entity_chunks, self.relation_chunks, ) def edit_entity( - self, entity_name: str, updated_data: dict[str, str], allow_rename: bool = True + self, + entity_name: str, + updated_data: dict[str, str], + allow_rename: bool = True, + allow_merge: bool = False, ) -> dict[str, Any]: loop = always_get_an_event_loop() return loop.run_until_complete( - self.aedit_entity(entity_name, updated_data, allow_rename) + self.aedit_entity(entity_name, updated_data, allow_rename, allow_merge) ) async def aedit_relation( diff --git a/lightrag/utils_graph.py b/lightrag/utils_graph.py index c18c17c0..0a4dae92 100644 --- a/lightrag/utils_graph.py +++ b/lightrag/utils_graph.py @@ -540,7 +540,33 @@ async def aedit_entity( relation_chunks_storage: Optional KV storage for tracking chunks that reference relations Returns: - Dictionary containing updated entity information + Dictionary containing updated entity information and operation summary with the following structure: + { + "entity_name": str, # Name of the entity + "description": str, # Entity description + "entity_type": str, # Entity type + "source_id": str, # Source chunk IDs + ... # Other entity properties + "operation_summary": { + "merged": bool, # Whether entity was merged + "merge_status": str, # "success" | "failed" | "not_attempted" + "merge_error": str | None, # Error message if merge failed + "operation_status": str, # "success" | "partial_success" | "failure" + "target_entity": str | None, # Target entity name if renaming/merging + "final_entity": str, # Final entity name after operation + "renamed": bool # Whether entity was renamed + } + } + + operation_status values: + - "success": Operation completed successfully (update/rename/merge all succeeded) + - "partial_success": Non-name updates succeeded but merge failed + - "failure": Operation failed completely + + merge_status values: + - "success": Entity successfully merged into target + - "failed": Merge operation failed + - "not_attempted": No merge was attempted (normal update/rename) """ new_entity_name = updated_data.get("entity_name", entity_name) is_renaming = new_entity_name != entity_name @@ -549,6 +575,16 @@ async def aedit_entity( workspace = entities_vdb.global_config.get("workspace", "") namespace = f"{workspace}:GraphDB" if workspace else "GraphDB" + + operation_summary: dict[str, Any] = { + "merged": False, + "merge_status": "not_attempted", + "merge_error": None, + "operation_status": "success", + "target_entity": None, + "final_entity": new_entity_name if is_renaming else entity_name, + "renamed": is_renaming, + } async with get_storage_keyed_lock( lock_keys, namespace=namespace, enable_logging=False ): @@ -572,38 +608,93 @@ async def aedit_entity( f"Entity Edit: `{entity_name}` will be merged into `{new_entity_name}`" ) + # Track whether non-name updates were applied + non_name_updates_applied = False non_name_updates = { key: value for key, value in updated_data.items() if key != "entity_name" } + + # Apply non-name updates first if non_name_updates: - logger.info( - "Entity Edit: applying non-name updates before merge" - ) - await _edit_entity_impl( + try: + logger.info( + "Entity Edit: applying non-name 
updates before merge" + ) + await _edit_entity_impl( + chunk_entity_relation_graph, + entities_vdb, + relationships_vdb, + entity_name, + non_name_updates, + entity_chunks_storage=entity_chunks_storage, + relation_chunks_storage=relation_chunks_storage, + ) + non_name_updates_applied = True + except Exception as update_error: + # If update fails, re-raise immediately + logger.error( + f"Entity Edit: non-name updates failed: {update_error}" + ) + raise + + # Attempt to merge entities + try: + merge_result = await _merge_entities_impl( chunk_entity_relation_graph, entities_vdb, relationships_vdb, - entity_name, - non_name_updates, + [entity_name], + new_entity_name, + merge_strategy=None, + target_entity_data=None, entity_chunks_storage=entity_chunks_storage, relation_chunks_storage=relation_chunks_storage, ) - return await _merge_entities_impl( - chunk_entity_relation_graph, - entities_vdb, - relationships_vdb, - [entity_name], - new_entity_name, - merge_strategy=None, - target_entity_data=None, - entity_chunks_storage=entity_chunks_storage, - relation_chunks_storage=relation_chunks_storage, - ) + # Merge succeeded + operation_summary.update( + { + "merged": True, + "merge_status": "success", + "merge_error": None, + "operation_status": "success", + "target_entity": new_entity_name, + "final_entity": new_entity_name, + } + ) + return {**merge_result, "operation_summary": operation_summary} - return await _edit_entity_impl( + except Exception as merge_error: + # Merge failed, but update may have succeeded + logger.error(f"Entity Edit: merge failed: {merge_error}") + + # Return partial success status (update succeeded but merge failed) + operation_summary.update( + { + "merged": False, + "merge_status": "failed", + "merge_error": str(merge_error), + "operation_status": "partial_success" + if non_name_updates_applied + else "failure", + "target_entity": new_entity_name, + "final_entity": entity_name, # Keep source entity name + } + ) + + # Get current entity info (with applied updates if any) + entity_info = await get_entity_info( + chunk_entity_relation_graph, + entities_vdb, + entity_name, + include_vector_data=True, + ) + return {**entity_info, "operation_summary": operation_summary} + + # Normal edit flow (no merge involved) + edit_result = await _edit_entity_impl( chunk_entity_relation_graph, entities_vdb, relationships_vdb, @@ -612,6 +703,9 @@ async def aedit_entity( entity_chunks_storage=entity_chunks_storage, relation_chunks_storage=relation_chunks_storage, ) + operation_summary["operation_status"] = "success" + return {**edit_result, "operation_summary": operation_summary} + except Exception as e: logger.error(f"Error while editing entity '{entity_name}': {e}") raise diff --git a/lightrag_webui/src/api/lightrag.ts b/lightrag_webui/src/api/lightrag.ts index 7a268642..7cf1aec6 100644 --- a/lightrag_webui/src/api/lightrag.ts +++ b/lightrag_webui/src/api/lightrag.ts @@ -143,6 +143,21 @@ export type QueryResponse = { response: string } +export type EntityUpdateResponse = { + status: string + message: string + data: Record + operation_summary?: { + merged: boolean + merge_status: 'success' | 'failed' | 'not_attempted' + merge_error: string | null + operation_status: 'success' | 'partial_success' | 'failure' + target_entity: string | null + final_entity?: string | null + renamed?: boolean + } +} + export type DocActionResponse = { status: 'success' | 'partial_success' | 'failure' | 'duplicated' message: string @@ -719,17 +734,20 @@ export const loginToServer = async (username: string, 
password: string): Promise
  * @param entityName The name of the entity to update
  * @param updatedData Dictionary containing updated attributes
  * @param allowRename Whether to allow renaming the entity (default: false)
+ * @param allowMerge Whether to merge into an existing entity when renaming to a duplicate name
  * @returns Promise with the updated entity information
  */
 export const updateEntity = async (
   entityName: string,
   updatedData: Record<string, any>,
-  allowRename: boolean = false
-): Promise => {
+  allowRename: boolean = false,
+  allowMerge: boolean = false
+): Promise<EntityUpdateResponse> => {
   const response = await axiosInstance.post('/graph/entity/edit', {
     entity_name: entityName,
     updated_data: updatedData,
-    allow_rename: allowRename
+    allow_rename: allowRename,
+    allow_merge: allowMerge
   })
   return response.data
 }
diff --git a/lightrag_webui/src/components/graph/EditablePropertyRow.tsx b/lightrag_webui/src/components/graph/EditablePropertyRow.tsx
index 40db2bdf..8f6639c1 100644
--- a/lightrag_webui/src/components/graph/EditablePropertyRow.tsx
+++ b/lightrag_webui/src/components/graph/EditablePropertyRow.tsx
@@ -3,6 +3,16 @@
 import { useTranslation } from 'react-i18next'
 import { toast } from 'sonner'
 import { updateEntity, updateRelation, checkEntityNameExists } from '@/api/lightrag'
 import { useGraphStore } from '@/stores/graph'
+import { useSettingsStore } from '@/stores/settings'
+import {
+  Dialog,
+  DialogContent,
+  DialogDescription,
+  DialogFooter,
+  DialogHeader,
+  DialogTitle
+} from '@/components/ui/Dialog'
+import Button from '@/components/ui/Button'
 import { PropertyName, EditIcon, PropertyValue } from './PropertyRowComponents'
 import PropertyEditDialog from './PropertyEditDialog'

@@ -48,6 +58,12 @@ const EditablePropertyRow = ({
   const [isEditing, setIsEditing] = useState(false)
   const [isSubmitting, setIsSubmitting] = useState(false)
   const [currentValue, setCurrentValue] = useState(initialValue)
+  const [errorMessage, setErrorMessage] = useState<string | null>(null)
+  const [mergeDialogOpen, setMergeDialogOpen] = useState(false)
+  const [mergeDialogInfo, setMergeDialogInfo] = useState<{
+    targetEntity: string
+    sourceEntity: string
+  } | null>(null)

   useEffect(() => {
     setCurrentValue(initialValue)
@@ -56,42 +72,111 @@ const EditablePropertyRow = ({
   const handleEditClick = () => {
     if (isEditable && !isEditing) {
       setIsEditing(true)
+      setErrorMessage(null)
     }
   }

   const handleCancel = () => {
     setIsEditing(false)
+    setErrorMessage(null)
   }

-  const handleSave = async (value: string) => {
+  const handleSave = async (value: string, options?: { allowMerge?: boolean }) => {
     if (isSubmitting || value === String(currentValue)) {
       setIsEditing(false)
+      setErrorMessage(null)
       return
     }

     setIsSubmitting(true)
+    setErrorMessage(null)

     try {
       if (entityType === 'node' && entityId && nodeId) {
         let updatedData = { [name]: value }
+        const allowMerge = options?.allowMerge ?? false
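+        // updateEntity() posts { entity_name, updated_data, allow_rename,
+        // allow_merge } to /graph/entity/edit and resolves with the
+        // operation_summary consumed below; e.g. (illustrative values):
+        //   await updateEntity('Elon Msk', { entity_name: 'Elon Musk' }, true, true)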
         if (name === 'entity_id') {
-          const exists = await checkEntityNameExists(value)
-          if (exists) {
-            toast.error(t('graphPanel.propertiesView.errors.duplicateName'))
-            return
+          if (!allowMerge) {
+            const exists = await checkEntityNameExists(value)
+            if (exists) {
+              const errorMsg = t('graphPanel.propertiesView.errors.duplicateName')
+              setErrorMessage(errorMsg)
+              toast.error(errorMsg)
+              return
+            }
           }
           updatedData = { 'entity_name': value }
         }
-        await updateEntity(entityId, updatedData, true)
-        try {
-          await useGraphStore.getState().updateNodeAndSelect(nodeId, entityId, name, value)
-        } catch (error) {
-          console.error('Error updating node in graph:', error)
-          throw new Error('Failed to update node in graph')
+        const response = await updateEntity(entityId, updatedData, true, allowMerge)
+        const operationSummary = response.operation_summary
+        // Fall back to 'success' when an older backend omits operation_summary,
+        // so a plain HTTP-200 update is not misreported as a failure
+        const operationStatus = operationSummary?.operation_status || 'success'
+        const finalValue = operationSummary?.final_entity ?? value
+
+        // Handle different operation statuses
+        if (operationStatus === 'success') {
+          if (operationSummary?.merged) {
+            // Node was successfully merged into an existing entity
+            setMergeDialogInfo({
+              targetEntity: finalValue,
+              sourceEntity: entityId,
+            })
+            setMergeDialogOpen(true)
+            toast.success(t('graphPanel.propertiesView.success.entityMerged'))
+          } else {
+            // Node was updated/renamed normally
+            try {
+              await useGraphStore
+                .getState()
+                .updateNodeAndSelect(nodeId, entityId, name, finalValue)
+            } catch (error) {
+              console.error('Error updating node in graph:', error)
+              throw new Error('Failed to update node in graph')
+            }
+            toast.success(t('graphPanel.propertiesView.success.entityUpdated'))
+          }
+
+          // Update local state and notify parent component
+          // For entity_id updates, use finalValue (which may be different due to merging)
+          // For other properties, use the original value the user entered
+          const valueToSet = name === 'entity_id' ?
finalValue : value + setCurrentValue(valueToSet) + onValueChange?.(valueToSet) + + } else if (operationStatus === 'partial_success') { + // Partial success: update succeeded but merge failed + // Do NOT update graph data to keep frontend in sync with backend + const mergeError = operationSummary?.merge_error || 'Unknown error' + + const errorMsg = t('graphPanel.propertiesView.errors.updateSuccessButMergeFailed', { + error: mergeError + }) + setErrorMessage(errorMsg) + toast.error(errorMsg) + // Do not update currentValue or call onValueChange + return + + } else { + // Complete failure or unknown status + // Check if this was a merge attempt or just a regular update + if (operationSummary?.merge_status === 'failed') { + // Merge operation was attempted but failed + const mergeError = operationSummary?.merge_error || 'Unknown error' + const errorMsg = t('graphPanel.propertiesView.errors.mergeFailed', { + error: mergeError + }) + setErrorMessage(errorMsg) + toast.error(errorMsg) + } else { + // Regular update failed (no merge involved) + const errorMsg = t('graphPanel.propertiesView.errors.updateFailed') + setErrorMessage(errorMsg) + toast.error(errorMsg) + } + // Do not update currentValue or call onValueChange + return } - toast.success(t('graphPanel.propertiesView.success.entityUpdated')) } else if (entityType === 'edge' && sourceId && targetId && edgeId && dynamicId) { const updatedData = { [name]: value } await updateRelation(sourceId, targetId, updatedData) @@ -102,19 +187,42 @@ const EditablePropertyRow = ({ throw new Error('Failed to update edge in graph') } toast.success(t('graphPanel.propertiesView.success.relationUpdated')) + setCurrentValue(value) + onValueChange?.(value) } setIsEditing(false) - setCurrentValue(value) - onValueChange?.(value) } catch (error) { console.error('Error updating property:', error) - toast.error(t('graphPanel.propertiesView.errors.updateFailed')) + const errorMsg = error instanceof Error ? error.message : t('graphPanel.propertiesView.errors.updateFailed') + setErrorMessage(errorMsg) + toast.error(errorMsg) + return } finally { setIsSubmitting(false) } } + const handleMergeRefresh = (useMergedStart: boolean) => { + const info = mergeDialogInfo + const graphState = useGraphStore.getState() + const settingsState = useSettingsStore.getState() + + graphState.clearSelection() + graphState.setGraphDataFetchAttempted(false) + graphState.setLastSuccessfulQueryLabel('') + + if (useMergedStart && info?.targetEntity) { + settingsState.setQueryLabel(info.targetEntity) + } else { + graphState.incrementGraphDataVersion() + } + + setMergeDialogOpen(false) + setMergeDialogInfo(null) + toast.info(t('graphPanel.propertiesView.mergeDialog.refreshing')) + } + return (
@@ -131,7 +239,45 @@ const EditablePropertyRow = ({ propertyName={name} initialValue={String(currentValue)} isSubmitting={isSubmitting} + errorMessage={errorMessage} /> + + { + setMergeDialogOpen(open) + if (!open) { + setMergeDialogInfo(null) + } + }} + > + + + {t('graphPanel.propertiesView.mergeDialog.title')} + + {t('graphPanel.propertiesView.mergeDialog.description', { + source: mergeDialogInfo?.sourceEntity ?? '', + target: mergeDialogInfo?.targetEntity ?? '', + })} + + +

+ {t('graphPanel.propertiesView.mergeDialog.refreshHint')} +

+ + + + +
+
) } diff --git a/lightrag_webui/src/components/graph/PropertyEditDialog.tsx b/lightrag_webui/src/components/graph/PropertyEditDialog.tsx index 65c71c4e..001861a6 100644 --- a/lightrag_webui/src/components/graph/PropertyEditDialog.tsx +++ b/lightrag_webui/src/components/graph/PropertyEditDialog.tsx @@ -9,14 +9,16 @@ import { DialogDescription } from '@/components/ui/Dialog' import Button from '@/components/ui/Button' +import Checkbox from '@/components/ui/Checkbox' interface PropertyEditDialogProps { isOpen: boolean onClose: () => void - onSave: (value: string) => void + onSave: (value: string, options?: { allowMerge?: boolean }) => void propertyName: string initialValue: string isSubmitting?: boolean + errorMessage?: string | null } /** @@ -29,17 +31,18 @@ const PropertyEditDialog = ({ onSave, propertyName, initialValue, - isSubmitting = false + isSubmitting = false, + errorMessage = null }: PropertyEditDialogProps) => { const { t } = useTranslation() const [value, setValue] = useState('') - // Add error state to display save failure messages - const [error, setError] = useState(null) + const [allowMerge, setAllowMerge] = useState(false) // Initialize value when dialog opens useEffect(() => { if (isOpen) { setValue(initialValue) + setAllowMerge(false) } }, [isOpen, initialValue]) @@ -86,18 +89,8 @@ const PropertyEditDialog = ({ const handleSave = async () => { if (value.trim() !== '') { - // Clear previous error messages - setError(null) - try { - await onSave(value) - onClose() - } catch (error) { - console.error('Save error:', error) - // Set error message to state for UI display - setError(typeof error === 'object' && error !== null - ? (error as Error).message || t('common.saveFailed') - : t('common.saveFailed')) - } + const options = propertyName === 'entity_id' ? { allowMerge } : undefined + await onSave(value, options) } } @@ -116,9 +109,9 @@ const PropertyEditDialog = ({ {/* Display error message if save fails */} - {error && ( -
- {error} + {errorMessage && ( +
+ {errorMessage}
)} @@ -146,6 +139,25 @@ const PropertyEditDialog = ({ })()}
+ {propertyName === 'entity_id' && ( +
+ +
+ )} + - - - - + onRefresh={handleMergeRefresh} + /> ) } diff --git a/lightrag_webui/src/components/graph/GraphLabels.tsx b/lightrag_webui/src/components/graph/GraphLabels.tsx index ca2d1691..17e7a4d3 100644 --- a/lightrag_webui/src/components/graph/GraphLabels.tsx +++ b/lightrag_webui/src/components/graph/GraphLabels.tsx @@ -17,6 +17,7 @@ import { getPopularLabels, searchLabels } from '@/api/lightrag' const GraphLabels = () => { const { t } = useTranslation() const label = useSettingsStore.use.queryLabel() + const dropdownRefreshTrigger = useSettingsStore.use.searchLabelDropdownRefreshTrigger() const [isRefreshing, setIsRefreshing] = useState(false) const [refreshTrigger, setRefreshTrigger] = useState(0) const [selectKey, setSelectKey] = useState(0) @@ -54,6 +55,18 @@ const GraphLabels = () => { initializeHistory() }, []) + // Force AsyncSelect to re-render when label changes externally (e.g., from entity rename/merge) + useEffect(() => { + setSelectKey(prev => prev + 1) + }, [label]) + + // Force AsyncSelect to re-render when dropdown refresh is triggered (e.g., after entity rename) + useEffect(() => { + if (dropdownRefreshTrigger > 0) { + setSelectKey(prev => prev + 1) + } + }, [dropdownRefreshTrigger]) + const fetchData = useCallback( async (query?: string): Promise => { let results: string[] = []; @@ -223,6 +236,9 @@ const GraphLabels = () => { // Update the label to trigger data loading useSettingsStore.getState().setQueryLabel(newLabel); + + // Force graph re-render and reset zoom/scale (must be AFTER setQueryLabel) + useGraphStore.getState().incrementGraphDataVersion(); }} clearable={false} // Prevent clearing value on reselect debounceTime={500} diff --git a/lightrag_webui/src/components/graph/MergeDialog.tsx b/lightrag_webui/src/components/graph/MergeDialog.tsx new file mode 100644 index 00000000..5ba18e3f --- /dev/null +++ b/lightrag_webui/src/components/graph/MergeDialog.tsx @@ -0,0 +1,70 @@ +import { useTranslation } from 'react-i18next' +import { useSettingsStore } from '@/stores/settings' +import { + Dialog, + DialogContent, + DialogDescription, + DialogFooter, + DialogHeader, + DialogTitle +} from '@/components/ui/Dialog' +import Button from '@/components/ui/Button' + +interface MergeDialogProps { + mergeDialogOpen: boolean + mergeDialogInfo: { + targetEntity: string + sourceEntity: string + } | null + onOpenChange: (open: boolean) => void + onRefresh: (useMergedStart: boolean) => void +} + +/** + * MergeDialog component that appears after a successful entity merge + * Allows user to choose whether to use the merged entity or keep current start point + */ +const MergeDialog = ({ + mergeDialogOpen, + mergeDialogInfo, + onOpenChange, + onRefresh +}: MergeDialogProps) => { + const { t } = useTranslation() + const currentQueryLabel = useSettingsStore.use.queryLabel() + + return ( + + + + {t('graphPanel.propertiesView.mergeDialog.title')} + + {t('graphPanel.propertiesView.mergeDialog.description', { + source: mergeDialogInfo?.sourceEntity ?? '', + target: mergeDialogInfo?.targetEntity ?? '', + })} + + +

+ {t('graphPanel.propertiesView.mergeDialog.refreshHint')} +

+ + {currentQueryLabel !== mergeDialogInfo?.sourceEntity && ( + + )} + + +
+
+ ) +} + +export default MergeDialog diff --git a/lightrag_webui/src/stores/settings.ts b/lightrag_webui/src/stores/settings.ts index 79d3f3d1..983f5c43 100644 --- a/lightrag_webui/src/stores/settings.ts +++ b/lightrag_webui/src/stores/settings.ts @@ -78,6 +78,10 @@ interface SettingsState { currentTab: Tab setCurrentTab: (tab: Tab) => void + + // Search label dropdown refresh trigger (non-persistent, runtime only) + searchLabelDropdownRefreshTrigger: number + triggerSearchLabelDropdownRefresh: () => void } const useSettingsStoreBase = create()( @@ -229,7 +233,14 @@ const useSettingsStoreBase = create()( }) }, - setUserPromptHistory: (history: string[]) => set({ userPromptHistory: history }) + setUserPromptHistory: (history: string[]) => set({ userPromptHistory: history }), + + // Search label dropdown refresh trigger (not persisted) + searchLabelDropdownRefreshTrigger: 0, + triggerSearchLabelDropdownRefresh: () => + set((state) => ({ + searchLabelDropdownRefreshTrigger: state.searchLabelDropdownRefreshTrigger + 1 + })) }), { name: 'settings-storage',