From 4937de8809490f2ad247a8d3ae194345b902623b Mon Sep 17 00:00:00 2001
From: zrguo <49157727+LarFii@users.noreply.github.com>
Date: Sun, 22 Jun 2025 15:12:09 +0800
Subject: [PATCH] Update

---
 README-zh.md                 |  88 ++++++++++++++++++++
 README.md                    | 154 +++++++++++++++++++++++++++++++++++
 lightrag/base.py             |  22 ++--
 lightrag/kg/json_kv_impl.py  |  78 +++++++++---------
 lightrag/kg/postgres_impl.py |  49 +++++++++--
 lightrag/lightrag.py         |  22 ++--
 6 files changed, 347 insertions(+), 66 deletions(-)

diff --git a/README-zh.md b/README-zh.md
index 9e892012..685c9468 100644
--- a/README-zh.md
+++ b/README-zh.md
@@ -932,6 +932,94 @@ rag.insert_custom_kg(custom_kg)
 
+## Delete Functions
+
+LightRAG provides comprehensive deletion capabilities, allowing you to delete documents, entities, and relationships.
+
+<details>
+  <summary> Delete Entities </summary>
+
+You can delete an entity by its name, together with all of its associated relationships:
+
+```python
+# Delete an entity and all of its relationships (synchronous version)
+rag.delete_by_entity("Google")
+
+# Asynchronous version
+await rag.adelete_by_entity("Google")
+```
+
+Deleting an entity will:
+- Remove the entity node from the knowledge graph
+- Delete all relationships associated with the entity
+- Remove the related embedding vectors from the vector database
+- Preserve the integrity of the knowledge graph
+
+</details>
+
+<details>
+  <summary> Delete Relations </summary>
+
+You can delete the relationship between two specific entities:
+
+```python
+# Delete the relationship between two entities (synchronous version)
+rag.delete_by_relation("Google", "Gmail")
+
+# Asynchronous version
+await rag.adelete_by_relation("Google", "Gmail")
+```
+
+Deleting a relationship will:
+- Remove the specified relationship edge
+- Delete the relationship's embedding vector from the vector database
+- Preserve both entity nodes and all of their other relationships
+
+</details>
+
+<details>
+  <summary> Delete by Document ID </summary>
+
+You can delete an entire document and all knowledge derived from it by its document ID:
+
+```python
+# Delete by document ID (asynchronous version)
+await rag.adelete_by_doc_id("doc-12345")
+```
+
+Deleting by document ID applies optimized processing:
+- **Smart Cleanup**: Automatically identifies and removes entities and relationships that belong only to this document
+- **Preserve Shared Knowledge**: Entities and relationships that also appear in other documents are kept, and their descriptions are rebuilt
+- **Cache Optimization**: Clears the related LLM cache to reduce storage overhead
+- **Incremental Rebuilding**: Reconstructs affected entity and relationship descriptions from the remaining documents
+
+The deletion process consists of:
+1. Deleting all text chunks related to the document
+2. Identifying and deleting entities and relationships that belong only to this document
+3. Rebuilding entities and relationships that still exist in other documents
+4. Updating all related vector indexes
+5. Cleaning up the document status records
+
+Note: Deletion by document ID is an asynchronous operation, because it involves a complex knowledge graph reconstruction process.
+
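+As a quick end-to-end illustration (a minimal sketch assuming the optional `ids` parameter of `ainsert` shown in the insert examples of this README), you can ingest a document under a known ID so it can be deleted deterministically later:
+
+```python
+# Sketch: insert a document under an explicit ID, then remove it later.
+# The `ids` parameter is assumed from the insert examples earlier in this README.
+await rag.ainsert("ACME Corp announced a new product line...", ids=["doc-12345"])
+
+# ... later: remove the document and rebuild any shared knowledge
+await rag.adelete_by_doc_id("doc-12345")
+```
+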
+</details>
+
+<details>
+  <summary> Deletion Considerations </summary>
+
+**Important Reminders:**
+
+1. **Irreversible Operations**: All deletion operations are irreversible; use them with caution
+2. **Performance Considerations**: Deleting large amounts of data may take some time, especially deletion by document ID
+3. **Data Consistency**: Deletion operations automatically maintain consistency between the knowledge graph and the vector database
+4. **Backup Recommendations**: Back up your data before performing important deletion operations
+
+**Batch Deletion Recommendations:**
+- For batch deletion operations, prefer the asynchronous methods for better performance
+- For large-scale deletions, process documents in smaller batches to avoid excessive system load; a sketch of this pattern follows
+
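+A minimal sketch of the batched pattern, using only the `adelete_by_doc_id` API described above (the batch size is an illustrative value, not a tuned recommendation):
+
+```python
+import asyncio
+
+async def delete_docs_in_batches(rag, doc_ids: list[str], batch_size: int = 10) -> None:
+    # Each deletion may rebuild shared entities and relationships, so the
+    # documents are removed sequentially and grouped into small batches.
+    for i in range(0, len(doc_ids), batch_size):
+        for doc_id in doc_ids[i : i + batch_size]:
+            await rag.adelete_by_doc_id(doc_id)
+        # Yield to the event loop between batches to keep the system responsive
+        await asyncio.sleep(0)
+```
+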
+</details>
+
 ## Entity Merging
diff --git a/README.md b/README.md
index 8fdf4439..31c03bff 100644
--- a/README.md
+++ b/README.md
@@ -988,6 +988,160 @@ These operations maintain data consistency across both the graph database and vector database.
+
+## Delete Functions
+
+LightRAG provides comprehensive deletion capabilities, allowing you to delete documents, entities, and relationships.
+
+<details>
+  <summary> Delete Entities </summary>
+
+You can delete entities by their name along with all associated relationships:
+
+```python
+# Delete entity and all its relationships (synchronous version)
+rag.delete_by_entity("Google")
+
+# Asynchronous version
+await rag.adelete_by_entity("Google")
+```
+
+Deleting an entity:
+- Removes the entity node from the knowledge graph
+- Deletes all associated relationships
+- Removes related embedding vectors from the vector database
+- Maintains knowledge graph integrity
+
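+If several entities need to be removed, a plain loop over the synchronous API is enough (the entity names below are hypothetical):
+
+```python
+# Sketch: remove a set of obsolete entities one by one
+for entity_name in ["Obsolete Corp", "Legacy Product", "Old Brand"]:
+    rag.delete_by_entity(entity_name)
+```
+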
+</details>
+
+<details>
+  <summary> Delete Relations </summary>
+
+You can delete relationships between two specific entities:
+
+```python
+# Delete relationship between two entities (synchronous version)
+rag.delete_by_relation("Google", "Gmail")
+
+# Asynchronous version
+await rag.adelete_by_relation("Google", "Gmail")
+```
+
+Deleting a relationship:
+- Removes the specified relationship edge
+- Deletes the relationship's embedding vector from the vector database
+- Preserves both entity nodes and their other relationships
+
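+A small sketch for pruning several relationships that share one endpoint (the entity pairs are hypothetical):
+
+```python
+# Sketch: remove multiple edges attached to a single hub entity
+for target in ["Gmail", "YouTube", "Google Maps"]:
+    await rag.adelete_by_relation("Google", target)
+```
+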
+</details>
+
+<details>
+  <summary> Delete by Document ID </summary>
+
+You can delete an entire document and all of its related knowledge by document ID:
+
+```python
+# Delete by document ID (asynchronous version)
+await rag.adelete_by_doc_id("doc-12345")
+```
+
+Optimized processing when deleting by document ID:
+- **Smart Cleanup**: Automatically identifies and removes entities and relationships that belong only to this document
+- **Preserve Shared Knowledge**: If entities or relationships exist in other documents, they are preserved and their descriptions are rebuilt
+- **Cache Optimization**: Clears related LLM cache to reduce storage overhead
+- **Incremental Rebuilding**: Reconstructs affected entity and relationship descriptions from remaining documents
+
+The deletion process includes:
+1. Deleting all text chunks related to the document
+2. Identifying and deleting entities and relationships that belong only to this document
+3. Rebuilding entities and relationships that still exist in other documents
+4. Updating all related vector indexes
+5. Cleaning up document status records
+
+Note: Deletion by document ID is an asynchronous operation, as it involves complex knowledge graph reconstruction.
+
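+Since only an asynchronous variant exists, a minimal sketch for invoking it from synchronous code looks like this:
+
+```python
+import asyncio
+
+async def remove_document(rag, doc_id: str) -> None:
+    # Rebuilding shared entities and relationships can take a while on large graphs
+    await rag.adelete_by_doc_id(doc_id)
+
+asyncio.run(remove_document(rag, "doc-12345"))
+```
+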
+</details>
+
+**Important Reminders:**
+
+1. **Irreversible Operations**: All deletion operations are irreversible; use them with caution
+2. **Performance Considerations**: Deleting large amounts of data may take some time, especially deletion by document ID
+3. **Data Consistency**: Deletion operations automatically maintain consistency between the knowledge graph and vector database
+4. **Backup Recommendations**: Consider backing up data before performing important deletion operations
+
+**Batch Deletion Recommendations:**
+- For batch deletion operations, consider using asynchronous methods for better performance
+- For large-scale deletions, consider processing in batches to avoid excessive system load
+
+## Entity Merging
+
+<details>
+  <summary> Merge Entities and Their Relationships </summary>
+
+LightRAG now supports merging multiple entities into a single entity, automatically handling all relationships:
+
+```python
+# Basic entity merging
+rag.merge_entities(
+    source_entities=["Artificial Intelligence", "AI", "Machine Intelligence"],
+    target_entity="AI Technology"
+)
+```
+
+With custom merge strategy:
+
+```python
+# Define custom merge strategy for different fields
+rag.merge_entities(
+    source_entities=["John Smith", "Dr. Smith", "J. Smith"],
+    target_entity="John Smith",
+    merge_strategy={
+        "description": "concatenate",  # Combine all descriptions
+        "entity_type": "keep_first",   # Keep the entity type from the first entity
+        "source_id": "join_unique"     # Combine all unique source IDs
+    }
+)
+```
+
+With custom target entity data:
+
+```python
+# Specify exact values for the merged entity
+rag.merge_entities(
+    source_entities=["New York", "NYC", "Big Apple"],
+    target_entity="New York City",
+    target_entity_data={
+        "entity_type": "LOCATION",
+        "description": "New York City is the most populous city in the United States.",
+    }
+)
+```
+
+Advanced usage combining both approaches:
+
+```python
+# Merge company entities with both strategy and custom data
+rag.merge_entities(
+    source_entities=["Microsoft Corp", "Microsoft Corporation", "MSFT"],
+    target_entity="Microsoft",
+    merge_strategy={
+        "description": "concatenate",  # Combine all descriptions
+        "source_id": "join_unique"     # Combine source IDs
+    },
+    target_entity_data={
+        "entity_type": "ORGANIZATION",
+    }
+)
+```
+
+When merging entities:
+
+* All relationships from source entities are redirected to the target entity
+* Duplicate relationships are intelligently merged
+* Self-relationships (loops) are prevented
+* Source entities are removed after merging
+* Relationship weights and attributes are preserved
+
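+As with the delete APIs above, merging can also be invoked from async code; a minimal sketch, assuming `amerge_entities` mirrors the parameters of `merge_entities`:
+
+```python
+# Sketch: asynchronous variant of the merge call shown above
+await rag.amerge_entities(
+    source_entities=["Artificial Intelligence", "AI", "Machine Intelligence"],
+    target_entity="AI Technology",
+)
+```
+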
+</details>
+
 ## Entity Merging
diff --git a/lightrag/base.py b/lightrag/base.py
index e66a67db..b8e4d642 100644
--- a/lightrag/base.py
+++ b/lightrag/base.py
@@ -278,20 +278,20 @@ class BaseKVStorage(StorageNameSpace, ABC):
             False: if the cache drop failed, or the cache mode is not supported
         """
 
-    async def drop_cache_by_chunk_ids(self, chunk_ids: list[str] | None = None) -> bool:
-        """Delete specific cache records from storage by chunk IDs
+    # async def drop_cache_by_chunk_ids(self, chunk_ids: list[str] | None = None) -> bool:
+    #     """Delete specific cache records from storage by chunk IDs
 
-        Importance notes for in-memory storage:
-        1. Changes will be persisted to disk during the next index_done_callback
-        2. update flags to notify other processes that data persistence is needed
+    #     Importance notes for in-memory storage:
+    #     1. Changes will be persisted to disk during the next index_done_callback
+    #     2. update flags to notify other processes that data persistence is needed
 
-        Args:
-            chunk_ids (list[str]): List of chunk IDs to be dropped from storage
+    #     Args:
+    #         chunk_ids (list[str]): List of chunk IDs to be dropped from storage
 
-        Returns:
-            True: if the cache drop successfully
-            False: if the cache drop failed, or the operation is not supported
-        """
+    #     Returns:
+    #         True: if the cache drop successfully
+    #         False: if the cache drop failed, or the operation is not supported
+    #     """
 
 
 @dataclass
diff --git a/lightrag/kg/json_kv_impl.py b/lightrag/kg/json_kv_impl.py
index 2345e50f..fa819d4a 100644
--- a/lightrag/kg/json_kv_impl.py
+++ b/lightrag/kg/json_kv_impl.py
@@ -172,52 +172,52 @@ class JsonKVStorage(BaseKVStorage):
         except Exception:
             return False
 
-    async def drop_cache_by_chunk_ids(self, chunk_ids: list[str] | None = None) -> bool:
-        """Delete specific cache records from storage by chunk IDs
+    # async def drop_cache_by_chunk_ids(self, chunk_ids: list[str] | None = None) -> bool:
+    #     """Delete specific cache records from storage by chunk IDs
 
-        Importance notes for in-memory storage:
-        1. Changes will be persisted to disk during the next index_done_callback
-        2. update flags to notify other processes that data persistence is needed
+    #     Importance notes for in-memory storage:
+    #     1. Changes will be persisted to disk during the next index_done_callback
+    #     2. update flags to notify other processes that data persistence is needed
 
-        Args:
-            chunk_ids (list[str]): List of chunk IDs to be dropped from storage
+    #     Args:
+    #         chunk_ids (list[str]): List of chunk IDs to be dropped from storage
 
-        Returns:
-            True: if the cache drop successfully
-            False: if the cache drop failed
-        """
-        if not chunk_ids:
-            return False
+    #     Returns:
+    #         True: if the cache drop successfully
+    #         False: if the cache drop failed
+    #     """
+    #     if not chunk_ids:
+    #         return False
 
-        try:
-            async with self._storage_lock:
-                # Iterate through all cache modes to find entries with matching chunk_ids
-                for mode_key, mode_data in list(self._data.items()):
-                    if isinstance(mode_data, dict):
-                        # Check each cached entry in this mode
-                        for cache_key, cache_entry in list(mode_data.items()):
-                            if (
-                                isinstance(cache_entry, dict)
-                                and cache_entry.get("chunk_id") in chunk_ids
-                            ):
-                                # Remove this cache entry
-                                del mode_data[cache_key]
-                                logger.debug(
-                                    f"Removed cache entry {cache_key} for chunk {cache_entry.get('chunk_id')}"
-                                )
+    #     try:
+    #         async with self._storage_lock:
+    #             # Iterate through all cache modes to find entries with matching chunk_ids
+    #             for mode_key, mode_data in list(self._data.items()):
+    #                 if isinstance(mode_data, dict):
+    #                     # Check each cached entry in this mode
+    #                     for cache_key, cache_entry in list(mode_data.items()):
+    #                         if (
+    #                             isinstance(cache_entry, dict)
+    #                             and cache_entry.get("chunk_id") in chunk_ids
+    #                         ):
+    #                             # Remove this cache entry
+    #                             del mode_data[cache_key]
+    #                             logger.debug(
+    #                                 f"Removed cache entry {cache_key} for chunk {cache_entry.get('chunk_id')}"
+    #                             )
 
-                    # If the mode is now empty, remove it entirely
-                    if not mode_data:
-                        del self._data[mode_key]
+    #                 # If the mode is now empty, remove it entirely
+    #                 if not mode_data:
+    #                     del self._data[mode_key]
 
-                # Set update flags to notify persistence is needed
-                await set_all_update_flags(self.namespace)
+    #             # Set update flags to notify persistence is needed
+    #             await set_all_update_flags(self.namespace)
 
-            logger.info(f"Cleared cache for {len(chunk_ids)} chunk IDs")
-            return True
-        except Exception as e:
-            logger.error(f"Error clearing cache by chunk IDs: {e}")
-            return False
+    #         logger.info(f"Cleared cache for {len(chunk_ids)} chunk IDs")
+    #         return True
+    #     except Exception as e:
+    #         logger.error(f"Error clearing cache by chunk IDs: {e}")
+    #         return False
 
     async def drop(self) -> dict[str, str]:
         """Drop all data from storage and clean up resources
diff --git a/lightrag/kg/postgres_impl.py b/lightrag/kg/postgres_impl.py
index 465643f0..f9dabafb 100644
--- a/lightrag/kg/postgres_impl.py
+++ b/lightrag/kg/postgres_impl.py
@@ -106,6 +106,35 @@ class PostgreSQLDB:
         ):
             pass
 
+    async def _migrate_llm_cache_add_chunk_id(self):
+        """Add chunk_id column to LIGHTRAG_LLM_CACHE table if it doesn't exist"""
+        try:
+            # Check if chunk_id column exists
+            check_column_sql = """
+            SELECT column_name
+            FROM information_schema.columns
+            WHERE table_name = 'lightrag_llm_cache'
+            AND column_name = 'chunk_id'
+            """
+
+            column_info = await self.query(check_column_sql)
+            if not column_info:
+                logger.info("Adding chunk_id column to LIGHTRAG_LLM_CACHE table")
+                add_column_sql = """
+                ALTER TABLE LIGHTRAG_LLM_CACHE
+                ADD COLUMN chunk_id VARCHAR(255) NULL
+                """
+                await self.execute(add_column_sql)
+                logger.info(
+                    "Successfully added chunk_id column to LIGHTRAG_LLM_CACHE table"
+                )
+            else:
+                logger.info(
+                    "chunk_id column already exists in LIGHTRAG_LLM_CACHE table"
+                )
+        except Exception as e:
+            logger.warning(f"Failed to add chunk_id column to LIGHTRAG_LLM_CACHE: {e}")
+
     async def _migrate_timestamp_columns(self):
         """Migrate timestamp columns in tables to timezone-aware types, assuming original data is in UTC time"""
         # Tables and columns that need migration
@@ -203,6 +232,13 @@ class PostgreSQLDB:
             logger.error(f"PostgreSQL, Failed to migrate timestamp columns: {e}")
             # Don't throw an exception, allow the initialization process to continue
 
+        # Migrate LLM cache table to add chunk_id field if needed
+        try:
+            await self._migrate_llm_cache_add_chunk_id()
+        except Exception as e:
+            logger.error(f"PostgreSQL, Failed to migrate LLM cache chunk_id field: {e}")
+            # Don't throw an exception, allow the initialization process to continue
+
     async def query(
         self,
         sql: str,
@@ -497,6 +533,7 @@ class PGKVStorage(BaseKVStorage):
                     "original_prompt": v["original_prompt"],
                     "return_value": v["return"],
                     "mode": mode,
+                    "chunk_id": v.get("chunk_id"),
                 }
 
                 await self.db.execute(upsert_sql, _data)
@@ -2357,6 +2394,7 @@ TABLES = {
                     mode varchar(32) NOT NULL,
                     original_prompt TEXT,
                     return_value TEXT,
+                    chunk_id VARCHAR(255) NULL,
                     create_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                     update_time TIMESTAMP,
                     CONSTRAINT LIGHTRAG_LLM_CACHE_PK PRIMARY KEY (workspace, mode, id)
@@ -2389,10 +2427,10 @@ SQL_TEMPLATES = {
                                   chunk_order_index, full_doc_id, file_path
                                 FROM LIGHTRAG_DOC_CHUNKS WHERE workspace=$1 AND id=$2
                             """,
-    "get_by_id_llm_response_cache": """SELECT id, original_prompt, COALESCE(return_value, '') as "return", mode
+    "get_by_id_llm_response_cache": """SELECT id, original_prompt, COALESCE(return_value, '') as "return", mode, chunk_id
                                 FROM LIGHTRAG_LLM_CACHE WHERE workspace=$1 AND mode=$2
                             """,
-    "get_by_mode_id_llm_response_cache": """SELECT id, original_prompt, COALESCE(return_value, '') as "return", mode
+    "get_by_mode_id_llm_response_cache": """SELECT id, original_prompt, COALESCE(return_value, '') as "return", mode, chunk_id
                                 FROM LIGHTRAG_LLM_CACHE WHERE workspace=$1 AND mode=$2 AND id=$3
                             """,
     "get_by_ids_full_docs": """SELECT id, COALESCE(content, '') as content
@@ -2402,7 +2440,7 @@ SQL_TEMPLATES = {
                                   chunk_order_index, full_doc_id, file_path
                                 FROM LIGHTRAG_DOC_CHUNKS WHERE workspace=$1 AND id IN ({ids})
                             """,
-    "get_by_ids_llm_response_cache": """SELECT id, original_prompt, COALESCE(return_value, '') as "return", mode
+    "get_by_ids_llm_response_cache": """SELECT id, original_prompt, COALESCE(return_value, '') as "return", mode, chunk_id
                                 FROM LIGHTRAG_LLM_CACHE WHERE workspace=$1 AND id IN ({ids})
                             """,
     "filter_keys": "SELECT id FROM {table_name} WHERE workspace=$1 AND id IN ({ids})",
@@ -2411,12 +2449,13 @@ SQL_TEMPLATES = {
                                       ON CONFLICT (workspace,id) DO UPDATE
                                       SET content = $2, update_time = CURRENT_TIMESTAMP
                                    """,
-    "upsert_llm_response_cache": """INSERT INTO LIGHTRAG_LLM_CACHE(workspace,id,original_prompt,return_value,mode)
-                                      VALUES ($1, $2, $3, $4, $5)
+    "upsert_llm_response_cache": """INSERT INTO LIGHTRAG_LLM_CACHE(workspace,id,original_prompt,return_value,mode,chunk_id)
+                                      VALUES ($1, $2, $3, $4, $5, $6)
                                       ON CONFLICT (workspace,mode,id) DO UPDATE
                                       SET original_prompt = EXCLUDED.original_prompt,
                                       return_value=EXCLUDED.return_value,
                                       mode=EXCLUDED.mode,
+                                      chunk_id=EXCLUDED.chunk_id,
                                       update_time = CURRENT_TIMESTAMP
                                      """,
     "upsert_chunk": """INSERT INTO LIGHTRAG_DOC_CHUNKS (workspace, id, tokens,
diff --git a/lightrag/lightrag.py b/lightrag/lightrag.py
index f1c3747b..d299080a 100644
--- a/lightrag/lightrag.py
+++ b/lightrag/lightrag.py
@@ -1710,17 +1710,17 @@ class LightRAG:
             chunk_ids = set(related_chunks.keys())
             logger.info(f"Found {len(chunk_ids)} chunks to delete")
 
-            # 3. **OPTIMIZATION 1**: Clear LLM cache for related chunks
-            logger.info("Clearing LLM cache for related chunks...")
-            cache_cleared = await self.llm_response_cache.drop_cache_by_chunk_ids(
-                list(chunk_ids)
-            )
-            if cache_cleared:
-                logger.info(f"Successfully cleared cache for {len(chunk_ids)} chunks")
-            else:
-                logger.warning(
-                    "Failed to clear chunk cache or cache clearing not supported"
-                )
+            # # 3. **OPTIMIZATION 1**: Clear LLM cache for related chunks
+            # logger.info("Clearing LLM cache for related chunks...")
+            # cache_cleared = await self.llm_response_cache.drop_cache_by_chunk_ids(
+            #     list(chunk_ids)
+            # )
+            # if cache_cleared:
+            #     logger.info(f"Successfully cleared cache for {len(chunk_ids)} chunks")
+            # else:
+            #     logger.warning(
+            #         "Failed to clear chunk cache or cache clearing not supported"
+            #     )
 
             # 4. Analyze entities and relationships that will be affected
             entities_to_delete = set()