zrguo 2025-06-22 15:12:09 +08:00
parent afdc2b3da8
commit 4937de8809
6 changed files with 347 additions and 66 deletions

View file

@@ -932,6 +932,94 @@ rag.insert_custom_kg(custom_kg)
</details>
## Delete Functions
LightRAG provides comprehensive deletion capabilities, allowing you to delete documents, entities, and relationships.
<details>
<summary> <b>Delete Entities</b> </summary>
You can delete an entity and all of its associated relationships by entity name:
```python
# Delete an entity and all its relationships (synchronous version)
rag.delete_by_entity("Google")
# Asynchronous version
await rag.adelete_by_entity("Google")
```
Deleting an entity will:
- Remove the entity node from the knowledge graph
- Delete all relationships associated with that entity
- Remove the related embedding vectors from the vector database
- Preserve the integrity of the knowledge graph
</details>
<details>
<summary> <b>Delete Relations</b> </summary>
You can delete the relationship between two specific entities:
```python
# Delete the relationship between two entities (synchronous version)
rag.delete_by_relation("Google", "Gmail")
# Asynchronous version
await rag.adelete_by_relation("Google", "Gmail")
```
Deleting a relationship will:
- Remove the specified relationship edge
- Delete the relationship's embedding vector from the vector database
- Preserve both entity nodes and their other relationships
</details>
<details>
<summary> <b>Delete by Document ID</b> </summary>
You can delete an entire document and all of the knowledge derived from it by document ID:
```python
# Delete by document ID (asynchronous version)
await rag.adelete_by_doc_id("doc-12345")
```
Optimized processing when deleting by document ID:
- **Smart cleanup**: Automatically identifies and removes entities and relationships that belong only to this document
- **Preserve shared knowledge**: Entities and relationships that also appear in other documents are kept, and their descriptions are rebuilt
- **Cache optimization**: Clears the related LLM cache to reduce storage overhead
- **Incremental rebuilding**: Reconstructs the descriptions of affected entities and relationships from the remaining documents
The deletion process includes:
1. Delete all text chunks related to the document
2. Identify and delete entities and relationships that belong only to this document
3. Rebuild entities and relationships that still exist in other documents
4. Update all related vector indexes
5. Clean up the document status records
Note: Deletion by document ID is an asynchronous operation, as it involves a complex knowledge graph reconstruction process.
</details>
<details>
<summary> <b>Deletion Considerations</b> </summary>
**Important reminders:**
1. **Irreversible operations**: All deletion operations are irreversible, so use them with caution
2. **Performance considerations**: Deleting large amounts of data may take some time, especially deletion by document ID
3. **Data consistency**: Deletion operations automatically maintain consistency between the knowledge graph and the vector database
4. **Backup recommendation**: Back up your data before performing important deletion operations
**Batch deletion recommendations:**
- For batch deletion operations, prefer the asynchronous methods for better performance
- For large-scale deletions, process in batches to avoid putting excessive load on the system
</details>
## Entity Merging
<details>

README.md (+154)
View file

@@ -988,6 +988,160 @@ These operations maintain data consistency across both the graph database and ve
</details>
## Delete Functions
LightRAG provides comprehensive deletion capabilities, allowing you to delete documents, entities, and relationships.
<details>
<summary> <b>Delete Entities</b> </summary>
You can delete an entity by name, along with all of its associated relationships:
```python
# Delete entity and all its relationships (synchronous version)
rag.delete_by_entity("Google")
# Asynchronous version
await rag.adelete_by_entity("Google")
```
When deleting an entity:
- Removes the entity node from the knowledge graph
- Deletes all associated relationships
- Removes related embedding vectors from the vector database
- Maintains knowledge graph integrity
</details>
<details>
<summary> <b>Delete Relations</b> </summary>
You can delete relationships between two specific entities:
```python
# Delete relationship between two entities (synchronous version)
rag.delete_by_relation("Google", "Gmail")
# Asynchronous version
await rag.adelete_by_relation("Google", "Gmail")
```
When deleting a relationship:
- Removes the specified relationship edge
- Deletes the relationship's embedding vector from the vector database
- Preserves both entity nodes and their other relationships
</details>
<details>
<summary> <b>Delete by Document ID</b> </summary>
You can delete an entire document and all of its related knowledge by document ID:
```python
# Delete by document ID (asynchronous version)
await rag.adelete_by_doc_id("doc-12345")
```
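The document ID passed here is the identifier LightRAG assigned when the document was inserted. A rough sketch of recovering that ID from the original text follows; it assumes LightRAG's default MD5-based ID scheme (`compute_mdhash_id` with a `doc-` prefix), so verify it against your installed version:
```python
# Assumed helper: LightRAG's default document IDs are an MD5 hash of the
# inserted text with a "doc-" prefix; adjust if your version differs.
from lightrag.utils import compute_mdhash_id

text = "Content that was previously inserted with rag.insert(...)"
doc_id = compute_mdhash_id(text.strip(), prefix="doc-")
await rag.adelete_by_doc_id(doc_id)
```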
Optimized processing when deleting by document ID:
- **Smart Cleanup**: Automatically identifies and removes entities and relationships that belong only to this document
- **Preserve Shared Knowledge**: If entities or relationships exist in other documents, they are preserved and their descriptions are rebuilt
- **Cache Optimization**: Clears related LLM cache to reduce storage overhead
- **Incremental Rebuilding**: Reconstructs affected entity and relationship descriptions from remaining documents
The deletion process includes:
1. Delete all text chunks related to the document
2. Identify and delete entities and relationships that belong only to this document
3. Rebuild entities and relationships that still exist in other documents
4. Update all related vector indexes
5. Clean up document status records
Note: Deletion by document ID is an asynchronous operation, as it involves a complex knowledge graph reconstruction process; a minimal usage sketch follows.
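Since only an asynchronous method is provided, the call has to run inside an event loop. A minimal, self-contained sketch is shown below; the working directory and the omitted model settings are placeholders, not part of this commit:
```python
import asyncio

from lightrag import LightRAG

async def main():
    # Placeholder configuration: in practice, pass your llm_model_func and
    # embedding_func here exactly as in the initialization examples above.
    rag = LightRAG(working_dir="./rag_storage")
    await rag.initialize_storages()

    # Asynchronous deletion: rebuilds affected parts of the knowledge graph.
    await rag.adelete_by_doc_id("doc-12345")

    await rag.finalize_storages()

asyncio.run(main())
```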
</details>
**Important Reminders:**
1. **Irreversible Operations**: All deletion operations are irreversible; use them with caution
2. **Performance Considerations**: Deleting large amounts of data may take some time, especially deletion by document ID
3. **Data Consistency**: Deletion operations automatically maintain consistency between the knowledge graph and vector database
4. **Backup Recommendations**: Consider backing up data before performing important deletion operations
**Batch Deletion Recommendations:**
- For batch deletion operations, consider using asynchronous methods for better performance (see the sketch after this list)
- For large-scale deletions, consider processing in batches to avoid excessive system load
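As a sketch of the batching advice above (the helper name, batch size, and use of `asyncio.gather` are illustrative choices, not an API provided by LightRAG):
```python
import asyncio

async def delete_entities_in_batches(rag, entity_names, batch_size=20):
    """Delete entities in small concurrent batches to limit system load."""
    for i in range(0, len(entity_names), batch_size):
        batch = entity_names[i : i + batch_size]
        # Delete one batch concurrently, then move on to the next batch.
        await asyncio.gather(*(rag.adelete_by_entity(name) for name in batch))

# Usage inside an async context:
# await delete_entities_in_batches(rag, ["Google", "Gmail", "Android"])
```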
## Entity Merging
<details>
<summary> <b>Merge Entities and Their Relationships</b> </summary>
LightRAG now supports merging multiple entities into a single entity, automatically handling all relationships:
```python
# Basic entity merging
rag.merge_entities(
source_entities=["Artificial Intelligence", "AI", "Machine Intelligence"],
target_entity="AI Technology"
)
```
With custom merge strategy:
```python
# Define custom merge strategy for different fields
rag.merge_entities(
source_entities=["John Smith", "Dr. Smith", "J. Smith"],
target_entity="John Smith",
merge_strategy={
"description": "concatenate", # Combine all descriptions
"entity_type": "keep_first", # Keep the entity type from the first entity
"source_id": "join_unique" # Combine all unique source IDs
}
)
```
With custom target entity data:
```python
# Specify exact values for the merged entity
rag.merge_entities(
source_entities=["New York", "NYC", "Big Apple"],
target_entity="New York City",
target_entity_data={
"entity_type": "LOCATION",
"description": "New York City is the most populous city in the United States.",
}
)
```
Advanced usage combining both approaches:
```python
# Merge company entities with both strategy and custom data
rag.merge_entities(
source_entities=["Microsoft Corp", "Microsoft Corporation", "MSFT"],
target_entity="Microsoft",
merge_strategy={
"description": "concatenate", # Combine all descriptions
"source_id": "join_unique" # Combine source IDs
},
target_entity_data={
"entity_type": "ORGANIZATION",
}
)
```
When merging entities:
* All relationships from source entities are redirected to the target entity
* Duplicate relationships are intelligently merged
* Self-relationships (loops) are prevented
* Source entities are removed after merging
* Relationship weights and attributes are preserved
</details>
## Entity Merging
<details>

View file

@@ -278,20 +278,20 @@ class BaseKVStorage(StorageNameSpace, ABC):
False: if the cache drop failed, or the cache mode is not supported
"""
async def drop_cache_by_chunk_ids(self, chunk_ids: list[str] | None = None) -> bool:
"""Delete specific cache records from storage by chunk IDs
# async def drop_cache_by_chunk_ids(self, chunk_ids: list[str] | None = None) -> bool:
# """Delete specific cache records from storage by chunk IDs
Importance notes for in-memory storage:
1. Changes will be persisted to disk during the next index_done_callback
2. update flags to notify other processes that data persistence is needed
# Importance notes for in-memory storage:
# 1. Changes will be persisted to disk during the next index_done_callback
# 2. update flags to notify other processes that data persistence is needed
Args:
chunk_ids (list[str]): List of chunk IDs to be dropped from storage
# Args:
# chunk_ids (list[str]): List of chunk IDs to be dropped from storage
Returns:
True: if the cache drop successfully
False: if the cache drop failed, or the operation is not supported
"""
# Returns:
# True: if the cache drop successfully
# False: if the cache drop failed, or the operation is not supported
# """
@dataclass

View file

@@ -172,52 +172,52 @@ class JsonKVStorage(BaseKVStorage):
except Exception:
return False
async def drop_cache_by_chunk_ids(self, chunk_ids: list[str] | None = None) -> bool:
"""Delete specific cache records from storage by chunk IDs
# async def drop_cache_by_chunk_ids(self, chunk_ids: list[str] | None = None) -> bool:
# """Delete specific cache records from storage by chunk IDs
Importance notes for in-memory storage:
1. Changes will be persisted to disk during the next index_done_callback
2. update flags to notify other processes that data persistence is needed
# Importance notes for in-memory storage:
# 1. Changes will be persisted to disk during the next index_done_callback
# 2. update flags to notify other processes that data persistence is needed
Args:
chunk_ids (list[str]): List of chunk IDs to be dropped from storage
# Args:
# chunk_ids (list[str]): List of chunk IDs to be dropped from storage
Returns:
True: if the cache drop successfully
False: if the cache drop failed
"""
if not chunk_ids:
return False
# Returns:
# True: if the cache drop successfully
# False: if the cache drop failed
# """
# if not chunk_ids:
# return False
try:
async with self._storage_lock:
# Iterate through all cache modes to find entries with matching chunk_ids
for mode_key, mode_data in list(self._data.items()):
if isinstance(mode_data, dict):
# Check each cached entry in this mode
for cache_key, cache_entry in list(mode_data.items()):
if (
isinstance(cache_entry, dict)
and cache_entry.get("chunk_id") in chunk_ids
):
# Remove this cache entry
del mode_data[cache_key]
logger.debug(
f"Removed cache entry {cache_key} for chunk {cache_entry.get('chunk_id')}"
)
# try:
# async with self._storage_lock:
# # Iterate through all cache modes to find entries with matching chunk_ids
# for mode_key, mode_data in list(self._data.items()):
# if isinstance(mode_data, dict):
# # Check each cached entry in this mode
# for cache_key, cache_entry in list(mode_data.items()):
# if (
# isinstance(cache_entry, dict)
# and cache_entry.get("chunk_id") in chunk_ids
# ):
# # Remove this cache entry
# del mode_data[cache_key]
# logger.debug(
# f"Removed cache entry {cache_key} for chunk {cache_entry.get('chunk_id')}"
# )
# If the mode is now empty, remove it entirely
if not mode_data:
del self._data[mode_key]
# # If the mode is now empty, remove it entirely
# if not mode_data:
# del self._data[mode_key]
# Set update flags to notify persistence is needed
await set_all_update_flags(self.namespace)
# # Set update flags to notify persistence is needed
# await set_all_update_flags(self.namespace)
logger.info(f"Cleared cache for {len(chunk_ids)} chunk IDs")
return True
except Exception as e:
logger.error(f"Error clearing cache by chunk IDs: {e}")
return False
# logger.info(f"Cleared cache for {len(chunk_ids)} chunk IDs")
# return True
# except Exception as e:
# logger.error(f"Error clearing cache by chunk IDs: {e}")
# return False
async def drop(self) -> dict[str, str]:
"""Drop all data from storage and clean up resources

View file

@@ -106,6 +106,35 @@ class PostgreSQLDB:
):
pass
async def _migrate_llm_cache_add_chunk_id(self):
"""Add chunk_id column to LIGHTRAG_LLM_CACHE table if it doesn't exist"""
try:
# Check if chunk_id column exists
check_column_sql = """
SELECT column_name
FROM information_schema.columns
WHERE table_name = 'lightrag_llm_cache'
AND column_name = 'chunk_id'
"""
column_info = await self.query(check_column_sql)
if not column_info:
logger.info("Adding chunk_id column to LIGHTRAG_LLM_CACHE table")
add_column_sql = """
ALTER TABLE LIGHTRAG_LLM_CACHE
ADD COLUMN chunk_id VARCHAR(255) NULL
"""
await self.execute(add_column_sql)
logger.info(
"Successfully added chunk_id column to LIGHTRAG_LLM_CACHE table"
)
else:
logger.info(
"chunk_id column already exists in LIGHTRAG_LLM_CACHE table"
)
except Exception as e:
logger.warning(f"Failed to add chunk_id column to LIGHTRAG_LLM_CACHE: {e}")
async def _migrate_timestamp_columns(self):
"""Migrate timestamp columns in tables to timezone-aware types, assuming original data is in UTC time"""
# Tables and columns that need migration
@@ -203,6 +232,13 @@ class PostgreSQLDB:
logger.error(f"PostgreSQL, Failed to migrate timestamp columns: {e}")
# Don't throw an exception, allow the initialization process to continue
# Migrate LLM cache table to add chunk_id field if needed
try:
await self._migrate_llm_cache_add_chunk_id()
except Exception as e:
logger.error(f"PostgreSQL, Failed to migrate LLM cache chunk_id field: {e}")
# Don't throw an exception, allow the initialization process to continue
async def query(
self,
sql: str,
@@ -497,6 +533,7 @@ class PGKVStorage(BaseKVStorage):
"original_prompt": v["original_prompt"],
"return_value": v["return"],
"mode": mode,
"chunk_id": v.get("chunk_id"),
}
await self.db.execute(upsert_sql, _data)
@@ -2357,6 +2394,7 @@ TABLES = {
mode varchar(32) NOT NULL,
original_prompt TEXT,
return_value TEXT,
chunk_id VARCHAR(255) NULL,
create_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
update_time TIMESTAMP,
CONSTRAINT LIGHTRAG_LLM_CACHE_PK PRIMARY KEY (workspace, mode, id)
@@ -2389,10 +2427,10 @@ SQL_TEMPLATES = {
chunk_order_index, full_doc_id, file_path
FROM LIGHTRAG_DOC_CHUNKS WHERE workspace=$1 AND id=$2
""",
"get_by_id_llm_response_cache": """SELECT id, original_prompt, COALESCE(return_value, '') as "return", mode
"get_by_id_llm_response_cache": """SELECT id, original_prompt, COALESCE(return_value, '') as "return", mode, chunk_id
FROM LIGHTRAG_LLM_CACHE WHERE workspace=$1 AND mode=$2
""",
"get_by_mode_id_llm_response_cache": """SELECT id, original_prompt, COALESCE(return_value, '') as "return", mode
"get_by_mode_id_llm_response_cache": """SELECT id, original_prompt, COALESCE(return_value, '') as "return", mode, chunk_id
FROM LIGHTRAG_LLM_CACHE WHERE workspace=$1 AND mode=$2 AND id=$3
""",
"get_by_ids_full_docs": """SELECT id, COALESCE(content, '') as content
@@ -2402,7 +2440,7 @@ SQL_TEMPLATES = {
chunk_order_index, full_doc_id, file_path
FROM LIGHTRAG_DOC_CHUNKS WHERE workspace=$1 AND id IN ({ids})
""",
"get_by_ids_llm_response_cache": """SELECT id, original_prompt, COALESCE(return_value, '') as "return", mode
"get_by_ids_llm_response_cache": """SELECT id, original_prompt, COALESCE(return_value, '') as "return", mode, chunk_id
FROM LIGHTRAG_LLM_CACHE WHERE workspace=$1 AND mode= IN ({ids})
""",
"filter_keys": "SELECT id FROM {table_name} WHERE workspace=$1 AND id IN ({ids})",
@@ -2411,12 +2449,13 @@ SQL_TEMPLATES = {
ON CONFLICT (workspace,id) DO UPDATE
SET content = $2, update_time = CURRENT_TIMESTAMP
""",
"upsert_llm_response_cache": """INSERT INTO LIGHTRAG_LLM_CACHE(workspace,id,original_prompt,return_value,mode)
VALUES ($1, $2, $3, $4, $5)
"upsert_llm_response_cache": """INSERT INTO LIGHTRAG_LLM_CACHE(workspace,id,original_prompt,return_value,mode,chunk_id)
VALUES ($1, $2, $3, $4, $5, $6)
ON CONFLICT (workspace,mode,id) DO UPDATE
SET original_prompt = EXCLUDED.original_prompt,
return_value=EXCLUDED.return_value,
mode=EXCLUDED.mode,
chunk_id=EXCLUDED.chunk_id,
update_time = CURRENT_TIMESTAMP
""",
"upsert_chunk": """INSERT INTO LIGHTRAG_DOC_CHUNKS (workspace, id, tokens,

View file

@@ -1710,17 +1710,17 @@ class LightRAG:
chunk_ids = set(related_chunks.keys())
logger.info(f"Found {len(chunk_ids)} chunks to delete")
# 3. **OPTIMIZATION 1**: Clear LLM cache for related chunks
logger.info("Clearing LLM cache for related chunks...")
cache_cleared = await self.llm_response_cache.drop_cache_by_chunk_ids(
list(chunk_ids)
)
if cache_cleared:
logger.info(f"Successfully cleared cache for {len(chunk_ids)} chunks")
else:
logger.warning(
"Failed to clear chunk cache or cache clearing not supported"
)
# # 3. **OPTIMIZATION 1**: Clear LLM cache for related chunks
# logger.info("Clearing LLM cache for related chunks...")
# cache_cleared = await self.llm_response_cache.drop_cache_by_chunk_ids(
# list(chunk_ids)
# )
# if cache_cleared:
# logger.info(f"Successfully cleared cache for {len(chunk_ids)} chunks")
# else:
# logger.warning(
# "Failed to clear chunk cache or cache clearing not supported"
# )
# 4. Analyze entities and relationships that will be affected
entities_to_delete = set()