From 01e3c409c284081fcaba0653edcd230c4a5c341b Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Rapha=C3=ABl=20MANSUY?=
Date: Thu, 4 Dec 2025 19:18:39 +0800
Subject: [PATCH] cherry-pick ab32456a

---
 lightrag/constants.py   |   2 +-
 lightrag/utils_graph.py | 332 +++++++++++-----------------------------
 2 files changed, 92 insertions(+), 242 deletions(-)

diff --git a/lightrag/constants.py b/lightrag/constants.py
index 326e9f74..5f1fd3ba 100644
--- a/lightrag/constants.py
+++ b/lightrag/constants.py
@@ -38,7 +38,7 @@ DEFAULT_ENTITY_TYPES = [
     "NaturalObject",
 ]
 
-# Separator for graph fields
+# Separator for description, source_id, and relation-key fields (cannot be changed after data has been inserted)
 GRAPH_FIELD_SEP = "<SEP>"
 
 # Query and retrieval configuration defaults
diff --git a/lightrag/utils_graph.py b/lightrag/utils_graph.py
index d782059d..da7da5a9 100644
--- a/lightrag/utils_graph.py
+++ b/lightrag/utils_graph.py
@@ -5,7 +5,7 @@ import asyncio
 from typing import Any, cast
 
 from .base import DeletionResult
-from .kg.shared_storage import get_storage_keyed_lock
+from .kg.shared_storage import get_graph_db_lock
 from .constants import GRAPH_FIELD_SEP
 from .utils import compute_mdhash_id, logger
 from .base import StorageNameSpace
@@ -74,12 +74,9 @@ async def adelete_by_entity(
         entity_chunks_storage: Optional KV storage for tracking chunks that reference this entity
         relation_chunks_storage: Optional KV storage for tracking chunks that reference relations
     """
-    # Use keyed lock for entity to ensure atomic graph and vector db operations
-    workspace = entities_vdb.global_config.get("workspace", "")
-    namespace = f"{workspace}:GraphDB" if workspace else "GraphDB"
-    async with get_storage_keyed_lock(
-        [entity_name], namespace=namespace, enable_logging=False
-    ):
+    graph_db_lock = get_graph_db_lock(enable_logging=False)
+    # Use graph database lock to ensure atomic graph and vector db operations
+    async with graph_db_lock:
         try:
             # Check if the entity exists
             if not await chunk_entity_relation_graph.has_node(entity_name):
@@ -170,18 +167,14 @@ async def adelete_by_relation(
         relation_chunks_storage: Optional KV storage for tracking chunks that reference this relation
     """
     relation_str = f"{source_entity} -> {target_entity}"
-    # Normalize entity order for undirected graph (ensures consistent key generation)
-    if source_entity > target_entity:
-        source_entity, target_entity = target_entity, source_entity
-
-    # Use keyed lock for relation to ensure atomic graph and vector db operations
-    workspace = relationships_vdb.global_config.get("workspace", "")
-    namespace = f"{workspace}:GraphDB" if workspace else "GraphDB"
-    sorted_edge_key = sorted([source_entity, target_entity])
-    async with get_storage_keyed_lock(
-        sorted_edge_key, namespace=namespace, enable_logging=False
-    ):
+    graph_db_lock = get_graph_db_lock(enable_logging=False)
+    # Use graph database lock to ensure atomic graph and vector db operations
+    async with graph_db_lock:
         try:
+            # Normalize entity order for undirected graph (ensures consistent key generation)
+            if source_entity > target_entity:
+                source_entity, target_entity = target_entity, source_entity
+
             # Check if the relation exists
             edge_exists = await chunk_entity_relation_graph.has_edge(
                 source_entity, target_entity
@@ -274,19 +267,9 @@ async def aedit_entity(
     Returns:
         Dictionary containing updated entity information
    """
-    # Determine entities to lock
-    new_entity_name = updated_data.get("entity_name", entity_name)
-    is_renaming = new_entity_name != entity_name
-
-    # Lock both original and new entity names if 
renaming - lock_keys = sorted([entity_name, new_entity_name]) if is_renaming else [entity_name] - - # Use keyed lock for entity to ensure atomic graph and vector db operations - workspace = entities_vdb.global_config.get("workspace", "") - namespace = f"{workspace}:GraphDB" if workspace else "GraphDB" - async with get_storage_keyed_lock( - lock_keys, namespace=namespace, enable_logging=False - ): + graph_db_lock = get_graph_db_lock(enable_logging=False) + # Use graph database lock to ensure atomic graph and vector db operations + async with graph_db_lock: try: # Save original entity name for chunk tracking updates original_entity_name = entity_name @@ -297,6 +280,10 @@ async def aedit_entity( raise ValueError(f"Entity '{entity_name}' does not exist") node_data = await chunk_entity_relation_graph.get_node(entity_name) + # Check if entity is being renamed + new_entity_name = updated_data.get("entity_name", entity_name) + is_renaming = new_entity_name != entity_name + # If renaming, check if new name already exists if is_renaming: if not allow_rename: @@ -632,18 +619,14 @@ async def aedit_relation( Returns: Dictionary containing updated relation information """ - # Normalize entity order for undirected graph (ensures consistent key generation) - if source_entity > target_entity: - source_entity, target_entity = target_entity, source_entity - - # Use keyed lock for relation to ensure atomic graph and vector db operations - workspace = relationships_vdb.global_config.get("workspace", "") - namespace = f"{workspace}:GraphDB" if workspace else "GraphDB" - sorted_edge_key = sorted([source_entity, target_entity]) - async with get_storage_keyed_lock( - sorted_edge_key, namespace=namespace, enable_logging=False - ): + graph_db_lock = get_graph_db_lock(enable_logging=False) + # Use graph database lock to ensure atomic graph and vector db operations + async with graph_db_lock: try: + # Normalize entity order for undirected graph (ensures consistent key generation) + if source_entity > target_entity: + source_entity, target_entity = target_entity, source_entity + # 1. 
Get current relation information edge_exists = await chunk_entity_relation_graph.has_edge( source_entity, target_entity @@ -816,12 +799,9 @@ async def acreate_entity( Returns: Dictionary containing created entity information """ - # Use keyed lock for entity to ensure atomic graph and vector db operations - workspace = entities_vdb.global_config.get("workspace", "") - namespace = f"{workspace}:GraphDB" if workspace else "GraphDB" - async with get_storage_keyed_lock( - [entity_name], namespace=namespace, enable_logging=False - ): + graph_db_lock = get_graph_db_lock(enable_logging=False) + # Use graph database lock to ensure atomic graph and vector db operations + async with graph_db_lock: try: # Check if entity already exists existing_node = await chunk_entity_relation_graph.has_node(entity_name) @@ -930,13 +910,9 @@ async def acreate_relation( Returns: Dictionary containing created relation information """ - # Use keyed lock for relation to ensure atomic graph and vector db operations - workspace = relationships_vdb.global_config.get("workspace", "") - namespace = f"{workspace}:GraphDB" if workspace else "GraphDB" - sorted_edge_key = sorted([source_entity, target_entity]) - async with get_storage_keyed_lock( - sorted_edge_key, namespace=namespace, enable_logging=False - ): + graph_db_lock = get_graph_db_lock(enable_logging=False) + # Use graph database lock to ensure atomic graph and vector db operations + async with graph_db_lock: try: # Check if both entities exist source_exists = await chunk_entity_relation_graph.has_node(source_entity) @@ -1062,14 +1038,11 @@ async def amerge_entities( target_entity: str, merge_strategy: dict[str, str] = None, target_entity_data: dict[str, Any] = None, - entity_chunks_storage=None, - relation_chunks_storage=None, ) -> dict[str, Any]: """Asynchronously merge multiple entities into one entity. Merges multiple source entities into a target entity, handling all relationships, and updating both the knowledge graph and vector database. - Also merges chunk tracking information from entity_chunks_storage and relation_chunks_storage. Args: chunk_entity_relation_graph: Graph storage instance @@ -1081,23 +1054,13 @@ async def amerge_entities( customizations are applied but a warning is logged. target_entity_data: Dictionary of specific values to set for the target entity, overriding any merged values, e.g. 
{"description": "custom description", "entity_type": "PERSON"} - entity_chunks_storage: Optional KV storage for tracking chunks that reference entities - relation_chunks_storage: Optional KV storage for tracking chunks that reference relations Returns: Dictionary containing the merged entity information """ - # Collect all entities involved (source + target) and lock them all in sorted order - all_entities = set(source_entities) - all_entities.add(target_entity) - lock_keys = sorted(all_entities) - - # Use keyed lock for all entities to ensure atomic graph and vector db operations - workspace = entities_vdb.global_config.get("workspace", "") - namespace = f"{workspace}:GraphDB" if workspace else "GraphDB" - async with get_storage_keyed_lock( - lock_keys, namespace=namespace, enable_logging=False - ): + graph_db_lock = get_graph_db_lock(enable_logging=False) + # Use graph database lock to ensure atomic graph and vector db operations + async with graph_db_lock: try: # Default merge strategy for entities default_entity_merge_strategy = { @@ -1109,7 +1072,8 @@ async def amerge_entities( effective_entity_merge_strategy = default_entity_merge_strategy if merge_strategy: logger.warning( - "Entity Merge: merge_strategy parameter is deprecated and will be ignored in a future release." + "merge_strategy parameter is deprecated and will be ignored in a future " + "release. Provided overrides will be applied for now." ) effective_entity_merge_strategy = { **default_entity_merge_strategy, @@ -1136,7 +1100,7 @@ async def amerge_entities( await chunk_entity_relation_graph.get_node(target_entity) ) logger.info( - "Entity Merge: target entity already exists, source and target entities will be merged" + f"Target entity '{target_entity}' already exists, will merge data" ) # 3. Merge entity data @@ -1154,11 +1118,11 @@ async def amerge_entities( # 4. Get all relationships of the source entities and target entity (if exists) all_relations = [] entities_to_collect = source_entities.copy() - - # If target entity exists and not already in source_entities, add it - if target_exists and target_entity not in source_entities: + + # If target entity exists, also collect its relationships for merging + if target_exists: entities_to_collect.append(target_entity) - + for entity_name in entities_to_collect: # Get all relationships of the entities edges = await chunk_entity_relation_graph.get_node_edges(entity_name) @@ -1177,75 +1141,32 @@ async def amerge_entities( await chunk_entity_relation_graph.upsert_node( target_entity, merged_entity_data ) - logger.info(f"Entity Merge: created target '{target_entity}'") + logger.info(f"Created new target entity '{target_entity}'") else: await chunk_entity_relation_graph.upsert_node( target_entity, merged_entity_data ) - logger.info(f"Entity Merge: Updated target '{target_entity}'") + logger.info(f"Updated existing target entity '{target_entity}'") - # 6. Recreate all relations pointing to the target entity in KG - # Also collect chunk tracking information in the same loop + # 6. 
Recreate all relationships, pointing to the target entity relation_updates = {} # Track relationships that need to be merged relations_to_delete = [] - # Initialize chunk tracking variables - relation_chunk_tracking = {} # key: storage_key, value: list of chunk_ids - old_relation_keys_to_delete = [] - for src, tgt, edge_data in all_relations: relations_to_delete.append(compute_mdhash_id(src + tgt, prefix="rel-")) relations_to_delete.append(compute_mdhash_id(tgt + src, prefix="rel-")) - - # Collect old chunk tracking key for deletion - if relation_chunks_storage is not None: - from .utils import make_relation_chunk_key - - old_storage_key = make_relation_chunk_key(src, tgt) - old_relation_keys_to_delete.append(old_storage_key) - new_src = target_entity if src in source_entities else src new_tgt = target_entity if tgt in source_entities else tgt # Skip relationships between source entities to avoid self-loops if new_src == new_tgt: logger.info( - f"Entity Merge: skipping `{src}`~`{tgt}` to avoid self-loop" + f"Skipping relationship between source entities: {src} -> {tgt} to avoid self-loop" ) continue - # Normalize entity order for consistent duplicate detection (undirected relationships) - normalized_src, normalized_tgt = sorted([new_src, new_tgt]) - relation_key = f"{normalized_src}|{normalized_tgt}" - - # Process chunk tracking for this relation - if relation_chunks_storage is not None: - storage_key = make_relation_chunk_key( - normalized_src, normalized_tgt - ) - - # Get chunk_ids from storage for this original relation - stored = await relation_chunks_storage.get_by_id(old_storage_key) - - if stored is not None and isinstance(stored, dict): - chunk_ids = [cid for cid in stored.get("chunk_ids", []) if cid] - else: - # Fallback to source_id from graph - source_id = edge_data.get("source_id", "") - chunk_ids = [ - cid for cid in source_id.split(GRAPH_FIELD_SEP) if cid - ] - - # Accumulate chunk_ids with ordered deduplication - if storage_key not in relation_chunk_tracking: - relation_chunk_tracking[storage_key] = [] - - existing_chunks = set(relation_chunk_tracking[storage_key]) - for chunk_id in chunk_ids: - if chunk_id not in existing_chunks: - existing_chunks.add(chunk_id) - relation_chunk_tracking[storage_key].append(chunk_id) - + # Check if the same relationship already exists + relation_key = f"{new_src}|{new_tgt}" if relation_key in relation_updates: # Merge relationship data existing_data = relation_updates[relation_key]["data"] @@ -1262,53 +1183,57 @@ async def amerge_entities( ) relation_updates[relation_key]["data"] = merged_relation logger.info( - f"Entity Merge: deduplicating relation `{normalized_src}`~`{normalized_tgt}`" + f"Merged duplicate relationship: {new_src} -> {new_tgt}" ) else: relation_updates[relation_key] = { - "graph_src": new_src, - "graph_tgt": new_tgt, - "norm_src": normalized_src, - "norm_tgt": normalized_tgt, + "src": new_src, + "tgt": new_tgt, "data": edge_data.copy(), } # Apply relationship updates for rel_data in relation_updates.values(): await chunk_entity_relation_graph.upsert_edge( - rel_data["graph_src"], rel_data["graph_tgt"], rel_data["data"] + rel_data["src"], rel_data["tgt"], rel_data["data"] ) logger.info( - f"Entity Merge: updating relation `{rel_data['graph_src']}`->`{rel_data['graph_tgt']}`" + f"Created or updated relationship: {rel_data['src']} -> {rel_data['tgt']}" ) - # Update relation chunk tracking storage - if relation_chunks_storage is not None and all_relations: - if old_relation_keys_to_delete: - await 
relation_chunks_storage.delete(old_relation_keys_to_delete)
+            # Delete relationship records from vector database
+            await relationships_vdb.delete(relations_to_delete)
+            logger.info(
+                f"Deleted {len(relations_to_delete)} relation records from vector database"
+            )
-                if relation_chunk_tracking:
-                    updates = {}
-                    for storage_key, chunk_ids in relation_chunk_tracking.items():
-                        updates[storage_key] = {
-                            "chunk_ids": chunk_ids,
-                            "count": len(chunk_ids),
-                        }
+            # 7. Update entity vector representation
+            description = merged_entity_data.get("description", "")
+            source_id = merged_entity_data.get("source_id", "")
+            entity_type = merged_entity_data.get("entity_type", "")
+            content = target_entity + "\n" + description
-                    await relation_chunks_storage.upsert(updates)
-                    logger.info(
-                        f"Entity Merge: merged chunk tracking for {len(updates)} relations"
-                    )
+            entity_id = compute_mdhash_id(target_entity, prefix="ent-")
+            entity_data_for_vdb = {
+                entity_id: {
+                    "content": content,
+                    "entity_name": target_entity,
+                    "source_id": source_id,
+                    "description": description,
+                    "entity_type": entity_type,
+                }
+            }
-            # 7. Update relationship vector representations
-            logger.info(
-                f"Entity Merge: deleting {len(relations_to_delete)} relations from vdb"
-            )
-            await relationships_vdb.delete(relations_to_delete)
+            await entities_vdb.upsert(entity_data_for_vdb)
+
+            # 8. Update relationship vector representations
             for rel_data in relation_updates.values():
+                src = rel_data["src"]
+                tgt = rel_data["tgt"]
                 edge_data = rel_data["data"]
-                normalized_src = rel_data["norm_src"]
-                normalized_tgt = rel_data["norm_tgt"]
+
+                # Normalize entity order for consistent vector storage
+                normalized_src, normalized_tgt = sorted([src, tgt])
 
                 description = edge_data.get("description", "")
                 keywords = edge_data.get("keywords", "")
@@ -1334,112 +1259,37 @@ async def amerge_entities(
                         "weight": weight,
                     }
                 }
+
                 await relationships_vdb.upsert(relation_data_for_vdb)
-                logger.info(
-                    f"Entity Merge: updating vdb `{normalized_src}`~`{normalized_tgt}`"
-                )
-            # 8. Update entity vector representation
-            description = merged_entity_data.get("description", "")
-            source_id = merged_entity_data.get("source_id", "")
-            entity_type = merged_entity_data.get("entity_type", "")
-            content = target_entity + "\n" + description
-
-            entity_id = compute_mdhash_id(target_entity, prefix="ent-")
-            entity_data_for_vdb = {
-                entity_id: {
-                    "content": content,
-                    "entity_name": target_entity,
-                    "source_id": source_id,
-                    "description": description,
-                    "entity_type": entity_type,
-                }
-            }
-            await entities_vdb.upsert(entity_data_for_vdb)
-            logger.info(f"Entity Merge: updating vdb `{target_entity}`")
-
-            # 9. 
Merge entity chunk tracking (source entities first, then target entity) - if entity_chunks_storage is not None: - all_chunk_id_lists = [] - - # Build list of entities to process (source entities first, then target entity) - entities_to_process = [] - - # Add source entities first (excluding target if it's already in source list) - for entity_name in source_entities: - if entity_name != target_entity: - entities_to_process.append(entity_name) - - # Add target entity last (if it exists) - if target_exists: - entities_to_process.append(target_entity) - - # Process all entities in order with unified logic - for entity_name in entities_to_process: - stored = await entity_chunks_storage.get_by_id(entity_name) - if stored and isinstance(stored, dict): - chunk_ids = [cid for cid in stored.get("chunk_ids", []) if cid] - if chunk_ids: - all_chunk_id_lists.append(chunk_ids) - - # Merge chunk_ids with ordered deduplication (preserves order, source entities first) - merged_chunk_ids = [] - seen = set() - for chunk_id_list in all_chunk_id_lists: - for chunk_id in chunk_id_list: - if chunk_id not in seen: - seen.add(chunk_id) - merged_chunk_ids.append(chunk_id) - - # Delete source entities' chunk tracking records - entity_keys_to_delete = [ - e for e in source_entities if e != target_entity - ] - if entity_keys_to_delete: - await entity_chunks_storage.delete(entity_keys_to_delete) - - # Update target entity's chunk tracking - if merged_chunk_ids: - await entity_chunks_storage.upsert( - { - target_entity: { - "chunk_ids": merged_chunk_ids, - "count": len(merged_chunk_ids), - } - } - ) - logger.info( - f"Entity Merge: find {len(merged_chunk_ids)} chunks related to '{target_entity}'" - ) - - # 10. Delete source entities + # 9. Delete source entities for entity_name in source_entities: if entity_name == target_entity: - logger.warning( - f"Entity Merge: source entity'{entity_name}' is same as target entity" + logger.info( + f"Skipping deletion of '{entity_name}' as it's also the target entity" ) continue - logger.info(f"Entity Merge: deleting '{entity_name}' from KG and vdb") - - # Delete entity node and related edges from knowledge graph + # Delete entity node from knowledge graph await chunk_entity_relation_graph.delete_node(entity_name) # Delete entity record from vector database entity_id = compute_mdhash_id(entity_name, prefix="ent-") await entities_vdb.delete([entity_id]) - # 11. Save changes + logger.info( + f"Deleted source entity '{entity_name}' and its vector embedding from database" + ) + + # 10. Save changes await _persist_graph_updates( entities_vdb=entities_vdb, relationships_vdb=relationships_vdb, chunk_entity_relation_graph=chunk_entity_relation_graph, - entity_chunks_storage=entity_chunks_storage, - relation_chunks_storage=relation_chunks_storage, ) logger.info( - f"Entity Merge: successfully merged {len(source_entities)} entities into '{target_entity}'" + f"Successfully merged {len(source_entities)} entities into '{target_entity}'" ) return await get_entity_info( chunk_entity_relation_graph,
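
Note: the patch above collapses the per-entity and per-relation keyed locks into the single shared graph-database lock, so every combined graph + vector-db mutation in utils_graph.py is serialized through one critical section. Below is a minimal usage sketch of the post-patch locking pattern, using only calls that appear in the diff; the helper name delete_entity_atomically and the graph/entities_vdb parameters are hypothetical stand-ins for the storage instances, not part of the patch.

    from lightrag.kg.shared_storage import get_graph_db_lock
    from lightrag.utils import compute_mdhash_id

    async def delete_entity_atomically(graph, entities_vdb, entity_name: str) -> None:
        # Acquire the one process-wide graph DB lock (replaces get_storage_keyed_lock)
        graph_db_lock = get_graph_db_lock(enable_logging=False)
        async with graph_db_lock:
            # Graph and vector-db deletes happen atomically with respect to
            # any other coroutine that also takes this lock
            if not await graph.has_node(entity_name):
                return
            await graph.delete_node(entity_name)
            await entities_vdb.delete([compute_mdhash_id(entity_name, prefix="ent-")])

The trade-off is coarser granularity: unrelated edits that previously proceeded in parallel under different keyed locks now queue on the same lock.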