Merge pull request #1693 from HKUDS/delete_doc

Feat: Add comprehensive document deletion feature with integrated knowledge graph reconstruction
Daniel.y 2025-06-24 11:33:20 +08:00 committed by GitHub
commit 3acdeeb871
13 changed files with 1311 additions and 285 deletions

View file

@ -932,6 +932,94 @@ rag.insert_custom_kg(custom_kg)
</details>
## Delete Functions
LightRAG provides comprehensive deletion capabilities, allowing you to delete documents, entities, and relationships.
<details>
<summary> <b>Delete Entities</b> </summary>
You can delete an entity and all of its associated relationships by entity name:
```python
# Delete an entity and all its relationships (synchronous version)
rag.delete_by_entity("Google")
# Asynchronous version
await rag.adelete_by_entity("Google")
```
When deleting an entity, LightRAG will:
- Remove the entity node from the knowledge graph
- Delete all relationships associated with the entity
- Remove the related embedding vectors from the vector database
- Maintain the integrity of the knowledge graph
</details>
<details>
<summary> <b>Delete Relations</b> </summary>
You can delete the relationship between two specific entities:
```python
# Delete the relationship between two entities (synchronous version)
rag.delete_by_relation("Google", "Gmail")
# Asynchronous version
await rag.adelete_by_relation("Google", "Gmail")
```
When deleting a relationship, LightRAG will:
- Remove the specified relationship edge
- Delete the relationship's embedding vector from the vector database
- Preserve both entity nodes and their other relationships
</details>
<details>
<summary> <b>Delete by Document ID</b> </summary>
You can delete an entire document and all of its related knowledge by document ID:
```python
# Delete by document ID (asynchronous version)
await rag.adelete_by_doc_id("doc-12345")
```
Optimized processing when deleting by document ID:
- **Smart Cleanup**: Automatically identifies and deletes entities and relationships that belong only to this document
- **Preserve Shared Knowledge**: Entities and relationships that also appear in other documents are preserved, and their descriptions are rebuilt
- **Cache Optimization**: Clears the related LLM cache to reduce storage overhead
- **Incremental Rebuilding**: Rebuilds affected entity and relationship descriptions from the remaining documents
The deletion process includes:
1. Delete all text chunks related to the document
2. Identify and delete entities and relationships that belong only to this document
3. Rebuild entities and relationships that still exist in other documents
4. Update all related vector indexes
5. Clean up document status records
Note: Deletion by document ID is an asynchronous operation because it involves a complex knowledge graph reconstruction process.
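`adelete_by_doc_id` returns a `DeletionResult` (added to `lightrag/base.py` in this PR) with `status`, `doc_id`, `message`, and `status_code` fields. A minimal sketch of handling it, using a placeholder document ID:
```python
# Minimal sketch: branch on the DeletionResult returned by adelete_by_doc_id
result = await rag.adelete_by_doc_id("doc-12345")
if result.status == "success":
    print(f"Deleted {result.doc_id}: {result.message}")
elif result.status == "not_found":
    print(f"Document not found: {result.message}")
else:  # "fail"
    print(f"Deletion failed (code {result.status_code}): {result.message}")
```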
</details>
<details>
<summary> <b>Deletion Considerations</b> </summary>
**Important Reminders:**
1. **Irreversible Operations**: All deletion operations are irreversible; use them with caution
2. **Performance Considerations**: Deleting large amounts of data may take some time, especially when deleting by document ID
3. **Data Consistency**: Deletion operations automatically maintain consistency between the knowledge graph and the vector database
4. **Backup Recommendation**: Back up your data before performing important deletion operations
**Batch Deletion Recommendations:**
- For batch deletion operations, prefer the asynchronous methods for better performance
- For large-scale deletions, consider processing in batches to avoid excessive system load (a batching sketch follows)
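A minimal batching sketch, assuming a hypothetical list of document IDs and an illustrative batch size (the helper itself is not part of the LightRAG API):
```python
import asyncio

async def delete_documents_in_batches(rag, doc_ids: list[str], batch_size: int = 10):
    """Delete documents in small batches to bound concurrent load."""
    for i in range(0, len(doc_ids), batch_size):
        batch = doc_ids[i : i + batch_size]
        # adelete_by_doc_id takes the graph database lock internally,
        # so batching mainly limits how much work is queued at once.
        results = await asyncio.gather(
            *(rag.adelete_by_doc_id(doc_id) for doc_id in batch)
        )
        for result in results:
            if result.status != "success":
                print(f"{result.doc_id}: {result.status} - {result.message}")
```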
</details>
## Entity Merging
<details>

View file

@ -988,6 +988,89 @@ These operations maintain data consistency across both the graph database and ve
</details>
## Delete Functions
LightRAG provides comprehensive deletion capabilities, allowing you to delete documents, entities, and relationships.
<details>
<summary> <b>Delete Entities</b> </summary>
You can delete entities by their name along with all associated relationships:
```python
# Delete entity and all its relationships (synchronous version)
rag.delete_by_entity("Google")
# Asynchronous version
await rag.adelete_by_entity("Google")
```
When deleting an entity:
- Removes the entity node from the knowledge graph
- Deletes all associated relationships
- Removes related embedding vectors from the vector database
- Maintains knowledge graph integrity
</details>
<details>
<summary> <b>Delete Relations</b> </summary>
You can delete relationships between two specific entities:
```python
# Delete relationship between two entities (synchronous version)
rag.delete_by_relation("Google", "Gmail")
# Asynchronous version
await rag.adelete_by_relation("Google", "Gmail")
```
When deleting a relationship:
- Removes the specified relationship edge
- Deletes the relationship's embedding vector from the vector database
- Preserves both entity nodes and their other relationships
</details>
<details>
<summary> <b>Delete by Document ID</b> </summary>
You can delete an entire document and all its related knowledge through document ID:
```python
# Delete by document ID (asynchronous version)
await rag.adelete_by_doc_id("doc-12345")
```
Optimized processing when deleting by document ID:
- **Smart Cleanup**: Automatically identifies and removes entities and relationships that belong only to this document
- **Preserve Shared Knowledge**: If entities or relationships exist in other documents, they are preserved and their descriptions are rebuilt
- **Cache Optimization**: Clears related LLM cache to reduce storage overhead
- **Incremental Rebuilding**: Reconstructs affected entity and relationship descriptions from remaining documents
The deletion process includes:
1. Delete all text chunks related to the document
2. Identify and delete entities and relationships that belong only to this document
3. Rebuild entities and relationships that still exist in other documents
4. Update all related vector indexes
5. Clean up document status records
Note: Deletion by document ID is an asynchronous operation as it involves complex knowledge graph reconstruction processes.
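`adelete_by_doc_id` returns a `DeletionResult` (defined in `lightrag/base.py` by this PR) whose `status` is `"success"`, `"not_found"`, or `"fail"`. A minimal sketch of checking the result, with a placeholder document ID:
```python
# Minimal sketch: branch on the DeletionResult returned by adelete_by_doc_id
result = await rag.adelete_by_doc_id("doc-12345")
if result.status == "success":
    print(f"Deleted {result.doc_id}: {result.message}")
elif result.status == "not_found":
    print(f"Document not found: {result.message}")
else:  # "fail"
    print(f"Deletion failed (code {result.status_code}): {result.message}")
```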
</details>
**Important Reminders:**
1. **Irreversible Operations**: All deletion operations are irreversible; use them with caution
2. **Performance Considerations**: Deleting large amounts of data may take some time, especially when deleting by document ID
3. **Data Consistency**: Deletion operations automatically maintain consistency between the knowledge graph and the vector database
4. **Backup Recommendations**: Consider backing up data before performing important deletion operations
**Batch Deletion Recommendations:**
- For batch deletion operations, prefer the asynchronous methods for better performance
- For large-scale deletions, consider processing in batches to avoid excessive system load (a batching sketch follows)
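A minimal batching sketch, assuming a hypothetical list of document IDs and an illustrative batch size (the helper itself is not part of the LightRAG API):
```python
import asyncio

async def delete_documents_in_batches(rag, doc_ids: list[str], batch_size: int = 10):
    """Delete documents in small batches to bound concurrent load."""
    for i in range(0, len(doc_ids), batch_size):
        batch = doc_ids[i : i + batch_size]
        # adelete_by_doc_id takes the graph database lock internally,
        # so batching mainly limits how much work is queued at once.
        results = await asyncio.gather(
            *(rag.adelete_by_doc_id(doc_id) for doc_id in batch)
        )
        for result in results:
            if result.status != "success":
                print(f"{result.doc_id}: {result.status} - {result.message}")
```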
## Entity Merging
<details>

View file

@ -1 +1 @@
__api_version__ = "0173"
__api_version__ = "0174"

View file

@ -355,7 +355,13 @@ def create_app(args):
)
# Add routes
app.include_router(create_document_routes(rag, doc_manager, api_key))
app.include_router(
create_document_routes(
rag,
doc_manager,
api_key,
)
)
app.include_router(create_query_routes(rag, api_key, args.top_k))
app.include_router(create_graph_routes(rag, api_key))

View file

@ -12,11 +12,18 @@ import pipmaster as pm
from datetime import datetime, timezone
from pathlib import Path
from typing import Dict, List, Optional, Any, Literal
from fastapi import APIRouter, BackgroundTasks, Depends, File, HTTPException, UploadFile
from fastapi import (
APIRouter,
BackgroundTasks,
Depends,
File,
HTTPException,
UploadFile,
)
from pydantic import BaseModel, Field, field_validator
from lightrag import LightRAG
from lightrag.base import DocProcessingStatus, DocStatus
from lightrag.base import DeletionResult, DocProcessingStatus, DocStatus
from lightrag.api.utils_api import get_combined_auth_dependency
from ..config import global_args
@ -252,6 +259,40 @@ Attributes:
"""
class DeleteDocRequest(BaseModel):
doc_id: str = Field(..., description="The ID of the document to delete.")
@field_validator("doc_id", mode="after")
@classmethod
def validate_doc_id(cls, doc_id: str) -> str:
if not doc_id or not doc_id.strip():
raise ValueError("Document ID cannot be empty")
return doc_id.strip()
class DeleteEntityRequest(BaseModel):
entity_name: str = Field(..., description="The name of the entity to delete.")
@field_validator("entity_name", mode="after")
@classmethod
def validate_entity_name(cls, entity_name: str) -> str:
if not entity_name or not entity_name.strip():
raise ValueError("Entity name cannot be empty")
return entity_name.strip()
class DeleteRelationRequest(BaseModel):
source_entity: str = Field(..., description="The name of the source entity.")
target_entity: str = Field(..., description="The name of the target entity.")
@field_validator("source_entity", "target_entity", mode="after")
@classmethod
def validate_entity_names(cls, entity_name: str) -> str:
if not entity_name or not entity_name.strip():
raise ValueError("Entity name cannot be empty")
return entity_name.strip()
class DocStatusResponse(BaseModel):
id: str = Field(description="Document identifier")
content_summary: str = Field(description="Summary of document content")
@ -1318,6 +1359,119 @@ def create_document_routes(
logger.error(traceback.format_exc())
raise HTTPException(status_code=500, detail=str(e))
class DeleteDocByIdResponse(BaseModel):
"""Response model for single document deletion operation."""
status: Literal["success", "fail", "not_found", "busy"] = Field(
description="Status of the deletion operation"
)
message: str = Field(description="Message describing the operation result")
doc_id: Optional[str] = Field(
default=None, description="The ID of the document."
)
@router.delete(
"/delete_document",
response_model=DeleteDocByIdResponse,
dependencies=[Depends(combined_auth)],
summary="Delete a document and all its associated data by its ID.",
)
# TODO This method needs to be modified to be asynchronous (please do not use)
async def delete_document(
delete_request: DeleteDocRequest,
) -> DeleteDocByIdResponse:
"""
This method needs to be modified to be asynchronous (please do not use)
Deletes a specific document and all its associated data, including its status,
text chunks, vector embeddings, and any related graph data.
It is disabled when llm cache for entity extraction is disabled.
This operation is irreversible and will interact with the pipeline status.
Args:
delete_request (DeleteDocRequest): The request containing the document ID.
Returns:
DeleteDocByIdResponse: The result of the deletion operation.
- status="success": The document was successfully deleted.
- status="not_found": The document with the specified ID was not found.
- status="fail": The deletion operation failed.
- status="busy": The pipeline is busy with another operation.
Raises:
HTTPException:
- 500: If an unexpected internal error occurs.
"""
# The rag object is initialized from the server startup args,
# so we can access its properties here.
if not rag.enable_llm_cache_for_entity_extract:
raise HTTPException(
status_code=403,
detail="Operation not allowed when LLM cache for entity extraction is disabled.",
)
from lightrag.kg.shared_storage import (
get_namespace_data,
get_pipeline_status_lock,
)
doc_id = delete_request.doc_id
pipeline_status = await get_namespace_data("pipeline_status")
pipeline_status_lock = get_pipeline_status_lock()
async with pipeline_status_lock:
if pipeline_status.get("busy", False):
return DeleteDocByIdResponse(
status="busy",
message="Cannot delete document while pipeline is busy",
doc_id=doc_id,
)
pipeline_status.update(
{
"busy": True,
"job_name": f"Deleting Document: {doc_id}",
"job_start": datetime.now().isoformat(),
"latest_message": "Starting document deletion process",
}
)
# Use slice assignment to clear the list in place
pipeline_status["history_messages"][:] = [
f"Starting deletion for doc_id: {doc_id}"
]
try:
result = await rag.adelete_by_doc_id(doc_id)
if "history_messages" in pipeline_status:
pipeline_status["history_messages"].append(result.message)
if result.status == "not_found":
raise HTTPException(status_code=404, detail=result.message)
if result.status == "fail":
raise HTTPException(status_code=500, detail=result.message)
return DeleteDocByIdResponse(
doc_id=result.doc_id,
message=result.message,
status=result.status,
)
except Exception as e:
error_msg = f"Error deleting document {doc_id}: {str(e)}"
logger.error(error_msg)
logger.error(traceback.format_exc())
if "history_messages" in pipeline_status:
pipeline_status["history_messages"].append(error_msg)
# Re-raise as HTTPException for consistent error handling by FastAPI
raise HTTPException(status_code=500, detail=error_msg)
finally:
async with pipeline_status_lock:
pipeline_status["busy"] = False
completion_msg = f"Document deletion process for {doc_id} completed."
pipeline_status["latest_message"] = completion_msg
if "history_messages" in pipeline_status:
pipeline_status["history_messages"].append(completion_msg)
@router.post(
"/clear_cache",
response_model=ClearCacheResponse,
@ -1371,4 +1525,77 @@ def create_document_routes(
logger.error(traceback.format_exc())
raise HTTPException(status_code=500, detail=str(e))
@router.delete(
"/delete_entity",
response_model=DeletionResult,
dependencies=[Depends(combined_auth)],
)
async def delete_entity(request: DeleteEntityRequest):
"""
Delete an entity and all its relationships from the knowledge graph.
Args:
request (DeleteEntityRequest): The request body containing the entity name.
Returns:
DeletionResult: An object containing the outcome of the deletion process.
Raises:
HTTPException: If the entity is not found (404) or an error occurs (500).
"""
try:
result = await rag.adelete_by_entity(entity_name=request.entity_name)
if result.status == "not_found":
raise HTTPException(status_code=404, detail=result.message)
if result.status == "fail":
raise HTTPException(status_code=500, detail=result.message)
# Set doc_id to empty string since this is an entity operation, not document
result.doc_id = ""
return result
except HTTPException:
raise
except Exception as e:
error_msg = f"Error deleting entity '{request.entity_name}': {str(e)}"
logger.error(error_msg)
logger.error(traceback.format_exc())
raise HTTPException(status_code=500, detail=error_msg)
@router.delete(
"/delete_relation",
response_model=DeletionResult,
dependencies=[Depends(combined_auth)],
)
async def delete_relation(request: DeleteRelationRequest):
"""
Delete a relationship between two entities from the knowledge graph.
Args:
request (DeleteRelationRequest): The request body containing the source and target entity names.
Returns:
DeletionResult: An object containing the outcome of the deletion process.
Raises:
HTTPException: If the relation is not found (404) or an error occurs (500).
"""
try:
result = await rag.adelete_by_relation(
source_entity=request.source_entity,
target_entity=request.target_entity,
)
if result.status == "not_found":
raise HTTPException(status_code=404, detail=result.message)
if result.status == "fail":
raise HTTPException(status_code=500, detail=result.message)
# Set doc_id to empty string since this is a relation operation, not document
result.doc_id = ""
return result
except HTTPException:
raise
except Exception as e:
error_msg = f"Error deleting relation from '{request.source_entity}' to '{request.target_entity}': {str(e)}"
logger.error(error_msg)
logger.error(traceback.format_exc())
raise HTTPException(status_code=500, detail=error_msg)
return router
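For reviewers who want to exercise the new routes, here is a hedged sketch using `requests`; the base URL, the `/documents` router prefix, and the `X-API-Key` header are assumptions that are not shown in this diff:
```python
import requests

BASE = "http://localhost:9621"           # assumed LightRAG server address
HEADERS = {"X-API-Key": "your-api-key"}  # only needed if API-key auth is configured

# Delete a document and all derived knowledge (DELETE request with a JSON body)
r = requests.delete(
    f"{BASE}/documents/delete_document",
    json={"doc_id": "doc-12345"},
    headers=HEADERS,
)
print(r.status_code, r.json())

# Delete an entity and all of its relationships
r = requests.delete(
    f"{BASE}/documents/delete_entity",
    json={"entity_name": "Google"},
    headers=HEADERS,
)
print(r.status_code, r.json())

# Delete a single relationship between two entities
r = requests.delete(
    f"{BASE}/documents/delete_relation",
    json={"source_entity": "Google", "target_entity": "Gmail"},
    headers=HEADERS,
)
print(r.status_code, r.json())
```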

View file

@ -278,6 +278,21 @@ class BaseKVStorage(StorageNameSpace, ABC):
False: if the cache drop failed, or the cache mode is not supported
"""
# async def drop_cache_by_chunk_ids(self, chunk_ids: list[str] | None = None) -> bool:
# """Delete specific cache records from storage by chunk IDs
# Importance notes for in-memory storage:
# 1. Changes will be persisted to disk during the next index_done_callback
# 2. update flags to notify other processes that data persistence is needed
# Args:
# chunk_ids (list[str]): List of chunk IDs to be dropped from storage
# Returns:
# True: if the cache drop successfully
# False: if the cache drop failed, or the operation is not supported
# """
@dataclass
class BaseGraphStorage(StorageNameSpace, ABC):
@ -598,3 +613,13 @@ class StoragesStatus(str, Enum):
CREATED = "created"
INITIALIZED = "initialized"
FINALIZED = "finalized"
@dataclass
class DeletionResult:
"""Represents the result of a deletion operation."""
status: Literal["success", "not_found", "fail"]
doc_id: str
message: str
status_code: int = 200

View file

@ -172,6 +172,53 @@ class JsonKVStorage(BaseKVStorage):
except Exception:
return False
# async def drop_cache_by_chunk_ids(self, chunk_ids: list[str] | None = None) -> bool:
# """Delete specific cache records from storage by chunk IDs
# Importance notes for in-memory storage:
# 1. Changes will be persisted to disk during the next index_done_callback
# 2. update flags to notify other processes that data persistence is needed
# Args:
# chunk_ids (list[str]): List of chunk IDs to be dropped from storage
# Returns:
# True: if the cache drop successfully
# False: if the cache drop failed
# """
# if not chunk_ids:
# return False
# try:
# async with self._storage_lock:
# # Iterate through all cache modes to find entries with matching chunk_ids
# for mode_key, mode_data in list(self._data.items()):
# if isinstance(mode_data, dict):
# # Check each cached entry in this mode
# for cache_key, cache_entry in list(mode_data.items()):
# if (
# isinstance(cache_entry, dict)
# and cache_entry.get("chunk_id") in chunk_ids
# ):
# # Remove this cache entry
# del mode_data[cache_key]
# logger.debug(
# f"Removed cache entry {cache_key} for chunk {cache_entry.get('chunk_id')}"
# )
# # If the mode is now empty, remove it entirely
# if not mode_data:
# del self._data[mode_key]
# # Set update flags to notify persistence is needed
# await set_all_update_flags(self.namespace)
# logger.info(f"Cleared cache for {len(chunk_ids)} chunk IDs")
# return True
# except Exception as e:
# logger.error(f"Error clearing cache by chunk IDs: {e}")
# return False
async def drop(self) -> dict[str, str]:
"""Drop all data from storage and clean up resources
This action will persist the data to disk immediately.

View file

@ -1,4 +1,3 @@
import inspect
import os
import re
from dataclasses import dataclass
@ -307,7 +306,7 @@ class Neo4JStorage(BaseGraphStorage):
for label in node_dict["labels"]
if label != "base"
]
logger.debug(f"Neo4j query node {query} return: {node_dict}")
# logger.debug(f"Neo4j query node {query} return: {node_dict}")
return node_dict
return None
finally:
@ -382,9 +381,9 @@ class Neo4JStorage(BaseGraphStorage):
return 0
degree = record["degree"]
logger.debug(
f"Neo4j query node degree for {node_id} return: {degree}"
)
# logger.debug(
# f"Neo4j query node degree for {node_id} return: {degree}"
# )
return degree
finally:
await result.consume() # Ensure result is fully consumed
@ -424,7 +423,7 @@ class Neo4JStorage(BaseGraphStorage):
logger.warning(f"No node found with label '{nid}'")
degrees[nid] = 0
logger.debug(f"Neo4j batch node degree query returned: {degrees}")
# logger.debug(f"Neo4j batch node degree query returned: {degrees}")
return degrees
async def edge_degree(self, src_id: str, tgt_id: str) -> int:
@ -512,7 +511,7 @@ class Neo4JStorage(BaseGraphStorage):
if records:
try:
edge_result = dict(records[0]["edge_properties"])
logger.debug(f"Result: {edge_result}")
# logger.debug(f"Result: {edge_result}")
# Ensure required keys exist with defaults
required_keys = {
"weight": 0.0,
@ -528,9 +527,9 @@ class Neo4JStorage(BaseGraphStorage):
f"missing {key}, using default: {default_value}"
)
logger.debug(
f"{inspect.currentframe().f_code.co_name}:query:{query}:result:{edge_result}"
)
# logger.debug(
# f"{inspect.currentframe().f_code.co_name}:query:{query}:result:{edge_result}"
# )
return edge_result
except (KeyError, TypeError, ValueError) as e:
logger.error(
@ -545,9 +544,9 @@ class Neo4JStorage(BaseGraphStorage):
"keywords": None,
}
logger.debug(
f"{inspect.currentframe().f_code.co_name}: No edge found between {source_node_id} and {target_node_id}"
)
# logger.debug(
# f"{inspect.currentframe().f_code.co_name}: No edge found between {source_node_id} and {target_node_id}"
# )
# Return None when no edge found
return None
finally:
@ -766,9 +765,6 @@ class Neo4JStorage(BaseGraphStorage):
result = await tx.run(
query, entity_id=node_id, properties=properties
)
logger.debug(
f"Upserted node with entity_id '{node_id}' and properties: {properties}"
)
await result.consume() # Ensure result is fully consumed
await session.execute_write(execute_upsert)
@ -824,12 +820,7 @@ class Neo4JStorage(BaseGraphStorage):
properties=edge_properties,
)
try:
records = await result.fetch(2)
if records:
logger.debug(
f"Upserted edge from '{source_node_id}' to '{target_node_id}'"
f"with properties: {edge_properties}"
)
await result.fetch(2)
finally:
await result.consume() # Ensure result is consumed

View file

@ -106,6 +106,35 @@ class PostgreSQLDB:
):
pass
async def _migrate_llm_cache_add_chunk_id(self):
"""Add chunk_id column to LIGHTRAG_LLM_CACHE table if it doesn't exist"""
try:
# Check if chunk_id column exists
check_column_sql = """
SELECT column_name
FROM information_schema.columns
WHERE table_name = 'lightrag_llm_cache'
AND column_name = 'chunk_id'
"""
column_info = await self.query(check_column_sql)
if not column_info:
logger.info("Adding chunk_id column to LIGHTRAG_LLM_CACHE table")
add_column_sql = """
ALTER TABLE LIGHTRAG_LLM_CACHE
ADD COLUMN chunk_id VARCHAR(255) NULL
"""
await self.execute(add_column_sql)
logger.info(
"Successfully added chunk_id column to LIGHTRAG_LLM_CACHE table"
)
else:
logger.info(
"chunk_id column already exists in LIGHTRAG_LLM_CACHE table"
)
except Exception as e:
logger.warning(f"Failed to add chunk_id column to LIGHTRAG_LLM_CACHE: {e}")
async def _migrate_timestamp_columns(self):
"""Migrate timestamp columns in tables to timezone-aware types, assuming original data is in UTC time"""
# Tables and columns that need migration
@ -203,6 +232,13 @@ class PostgreSQLDB:
logger.error(f"PostgreSQL, Failed to migrate timestamp columns: {e}")
# Don't throw an exception, allow the initialization process to continue
# Migrate LLM cache table to add chunk_id field if needed
try:
await self._migrate_llm_cache_add_chunk_id()
except Exception as e:
logger.error(f"PostgreSQL, Failed to migrate LLM cache chunk_id field: {e}")
# Don't throw an exception, allow the initialization process to continue
async def query(
self,
sql: str,
@ -253,25 +289,31 @@ class PostgreSQLDB:
sql: str,
data: dict[str, Any] | None = None,
upsert: bool = False,
ignore_if_exists: bool = False,
with_age: bool = False,
graph_name: str | None = None,
):
try:
async with self.pool.acquire() as connection: # type: ignore
if with_age and graph_name:
await self.configure_age(connection, graph_name) # type: ignore
await self.configure_age(connection, graph_name)
elif with_age and not graph_name:
raise ValueError("Graph name is required when with_age is True")
if data is None:
await connection.execute(sql) # type: ignore
await connection.execute(sql)
else:
await connection.execute(sql, *data.values()) # type: ignore
await connection.execute(sql, *data.values())
except (
asyncpg.exceptions.UniqueViolationError,
asyncpg.exceptions.DuplicateTableError,
asyncpg.exceptions.DuplicateObjectError, # Catch "already exists" error
asyncpg.exceptions.InvalidSchemaNameError, # Also catch for AGE extension "already exists"
) as e:
if upsert:
if ignore_if_exists:
# If the flag is set, just ignore these specific errors
pass
elif upsert:
print("Key value duplicate, but upsert succeeded.")
else:
logger.error(f"Upsert error: {e}")
@ -497,6 +539,7 @@ class PGKVStorage(BaseKVStorage):
"original_prompt": v["original_prompt"],
"return_value": v["return"],
"mode": mode,
"chunk_id": v.get("chunk_id"),
}
await self.db.execute(upsert_sql, _data)
@ -1175,16 +1218,15 @@ class PGGraphStorage(BaseGraphStorage):
]
for query in queries:
try:
await self.db.execute(
query,
upsert=True,
with_age=True,
graph_name=self.graph_name,
)
# logger.info(f"Successfully executed: {query}")
except Exception:
continue
# Use the new flag to silently ignore "already exists" errors
# at the source, preventing log spam.
await self.db.execute(
query,
upsert=True,
ignore_if_exists=True, # Pass the new flag
with_age=True,
graph_name=self.graph_name,
)
async def finalize(self):
if self.db is not None:
@ -2357,6 +2399,7 @@ TABLES = {
mode varchar(32) NOT NULL,
original_prompt TEXT,
return_value TEXT,
chunk_id VARCHAR(255) NULL,
create_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
update_time TIMESTAMP,
CONSTRAINT LIGHTRAG_LLM_CACHE_PK PRIMARY KEY (workspace, mode, id)
@ -2389,10 +2432,10 @@ SQL_TEMPLATES = {
chunk_order_index, full_doc_id, file_path
FROM LIGHTRAG_DOC_CHUNKS WHERE workspace=$1 AND id=$2
""",
"get_by_id_llm_response_cache": """SELECT id, original_prompt, COALESCE(return_value, '') as "return", mode
"get_by_id_llm_response_cache": """SELECT id, original_prompt, COALESCE(return_value, '') as "return", mode, chunk_id
FROM LIGHTRAG_LLM_CACHE WHERE workspace=$1 AND mode=$2
""",
"get_by_mode_id_llm_response_cache": """SELECT id, original_prompt, COALESCE(return_value, '') as "return", mode
"get_by_mode_id_llm_response_cache": """SELECT id, original_prompt, COALESCE(return_value, '') as "return", mode, chunk_id
FROM LIGHTRAG_LLM_CACHE WHERE workspace=$1 AND mode=$2 AND id=$3
""",
"get_by_ids_full_docs": """SELECT id, COALESCE(content, '') as content
@ -2402,7 +2445,7 @@ SQL_TEMPLATES = {
chunk_order_index, full_doc_id, file_path
FROM LIGHTRAG_DOC_CHUNKS WHERE workspace=$1 AND id IN ({ids})
""",
"get_by_ids_llm_response_cache": """SELECT id, original_prompt, COALESCE(return_value, '') as "return", mode
"get_by_ids_llm_response_cache": """SELECT id, original_prompt, COALESCE(return_value, '') as "return", mode, chunk_id
FROM LIGHTRAG_LLM_CACHE WHERE workspace=$1 AND mode= IN ({ids})
""",
"filter_keys": "SELECT id FROM {table_name} WHERE workspace=$1 AND id IN ({ids})",
@ -2411,12 +2454,13 @@ SQL_TEMPLATES = {
ON CONFLICT (workspace,id) DO UPDATE
SET content = $2, update_time = CURRENT_TIMESTAMP
""",
"upsert_llm_response_cache": """INSERT INTO LIGHTRAG_LLM_CACHE(workspace,id,original_prompt,return_value,mode)
VALUES ($1, $2, $3, $4, $5)
"upsert_llm_response_cache": """INSERT INTO LIGHTRAG_LLM_CACHE(workspace,id,original_prompt,return_value,mode,chunk_id)
VALUES ($1, $2, $3, $4, $5, $6)
ON CONFLICT (workspace,mode,id) DO UPDATE
SET original_prompt = EXCLUDED.original_prompt,
return_value=EXCLUDED.return_value,
mode=EXCLUDED.mode,
chunk_id=EXCLUDED.chunk_id,
update_time = CURRENT_TIMESTAMP
""",
"upsert_chunk": """INSERT INTO LIGHTRAG_DOC_CHUNKS (workspace, id, tokens,

View file

@ -35,6 +35,7 @@ from lightrag.kg import (
from lightrag.kg.shared_storage import (
get_namespace_data,
get_pipeline_status_lock,
get_graph_db_lock,
)
from .base import (
@ -47,6 +48,7 @@ from .base import (
QueryParam,
StorageNameSpace,
StoragesStatus,
DeletionResult,
)
from .namespace import NameSpace, make_namespace
from .operate import (
@ -56,6 +58,7 @@ from .operate import (
kg_query,
naive_query,
query_with_keywords,
_rebuild_knowledge_from_chunks,
)
from .prompt import GRAPH_FIELD_SEP
from .utils import (
@ -1207,6 +1210,7 @@ class LightRAG:
cast(StorageNameSpace, storage_inst).index_done_callback()
for storage_inst in [ # type: ignore
self.full_docs,
self.doc_status,
self.text_chunks,
self.llm_response_cache,
self.entities_vdb,
@ -1674,24 +1678,45 @@ class LightRAG:
# Return the dictionary containing statuses only for the found document IDs
return found_statuses
# TODO: Deprecated (Deleting documents can cause hallucinations in RAG.)
# Document delete is not working properly for most of the storage implementations.
async def adelete_by_doc_id(self, doc_id: str) -> None:
"""Delete a document and all its related data
async def adelete_by_doc_id(self, doc_id: str) -> DeletionResult:
"""Delete a document and all its related data, including chunks, graph elements, and cached entries.
This method orchestrates a comprehensive deletion process for a given document ID.
It ensures that not only the document itself but also all its derived and associated
data across different storage layers are removed. This includes:
1. **Document and Status**: Deletes the document from `full_docs` and its status from `doc_status`.
2. **Chunks**: Removes all associated text chunks from `chunks_vdb`.
3. **Graph Data**:
- Deletes related entities from `entities_vdb`.
- Deletes related relationships from `relationships_vdb`.
- Removes corresponding nodes and edges from the `chunk_entity_relation_graph`.
4. **Graph Reconstruction**: If entities or relationships are partially affected, it triggers
a reconstruction of their data from the remaining chunks to ensure consistency.
Args:
doc_id: Document ID to delete
doc_id (str): The unique identifier of the document to be deleted.
Returns:
DeletionResult: An object containing the outcome of the deletion process.
- `status` (str): "success", "not_found", or "fail".
- `doc_id` (str): The ID of the document attempted to be deleted.
- `message` (str): A summary of the operation's result.
- `status_code` (int): HTTP status code (e.g., 200, 404, 500).
"""
try:
# 1. Get the document status and related data
if not await self.doc_status.get_by_id(doc_id):
logger.warning(f"Document {doc_id} not found")
return
return DeletionResult(
status="not_found",
doc_id=doc_id,
message=f"Document {doc_id} not found.",
status_code=404,
)
logger.debug(f"Starting deletion for document {doc_id}")
logger.info(f"Starting optimized deletion for document {doc_id}")
# 2. Get all chunks related to this document
# Find all chunks where full_doc_id equals the current doc_id
all_chunks = await self.text_chunks.get_all()
related_chunks = {
chunk_id: chunk_data
@ -1702,241 +1727,197 @@ class LightRAG:
if not related_chunks:
logger.warning(f"No chunks found for document {doc_id}")
return
# Still need to delete the doc status and full doc
await self.full_docs.delete([doc_id])
await self.doc_status.delete([doc_id])
return DeletionResult(
status="success",
doc_id=doc_id,
message=f"Document {doc_id} found but had no associated chunks. Document entry deleted.",
status_code=200,
)
# Get all related chunk IDs
chunk_ids = set(related_chunks.keys())
logger.debug(f"Found {len(chunk_ids)} chunks to delete")
logger.info(f"Found {len(chunk_ids)} chunks to delete")
# TODO: self.entities_vdb.client_storage only works for local storage, need to fix this
# # 3. **OPTIMIZATION 1**: Clear LLM cache for related chunks
# logger.info("Clearing LLM cache for related chunks...")
# cache_cleared = await self.llm_response_cache.drop_cache_by_chunk_ids(
# list(chunk_ids)
# )
# if cache_cleared:
# logger.info(f"Successfully cleared cache for {len(chunk_ids)} chunks")
# else:
# logger.warning(
# "Failed to clear chunk cache or cache clearing not supported"
# )
# 3. Before deleting, check the related entities and relationships for these chunks
for chunk_id in chunk_ids:
# Check entities
entities_storage = await self.entities_vdb.client_storage
entities = [
dp
for dp in entities_storage["data"]
if chunk_id in dp.get("source_id")
]
logger.debug(f"Chunk {chunk_id} has {len(entities)} related entities")
# Check relationships
relationships_storage = await self.relationships_vdb.client_storage
relations = [
dp
for dp in relationships_storage["data"]
if chunk_id in dp.get("source_id")
]
logger.debug(f"Chunk {chunk_id} has {len(relations)} related relations")
# Continue with the original deletion process...
# 4. Delete chunks from vector database
if chunk_ids:
await self.chunks_vdb.delete(chunk_ids)
await self.text_chunks.delete(chunk_ids)
# 5. Find and process entities and relationships that have these chunks as source
# Get all nodes and edges from the graph storage using storage-agnostic methods
# 4. Analyze entities and relationships that will be affected
entities_to_delete = set()
entities_to_update = {} # entity_name -> new_source_id
entities_to_rebuild = {} # entity_name -> remaining_chunk_ids
relationships_to_delete = set()
relationships_to_update = {} # (src, tgt) -> new_source_id
relationships_to_rebuild = {} # (src, tgt) -> remaining_chunk_ids
# Process entities - use storage-agnostic methods
all_labels = await self.chunk_entity_relation_graph.get_all_labels()
for node_label in all_labels:
node_data = await self.chunk_entity_relation_graph.get_node(node_label)
if node_data and "source_id" in node_data:
# Split source_id using GRAPH_FIELD_SEP
sources = set(node_data["source_id"].split(GRAPH_FIELD_SEP))
sources.difference_update(chunk_ids)
if not sources:
entities_to_delete.add(node_label)
logger.debug(
f"Entity {node_label} marked for deletion - no remaining sources"
)
else:
new_source_id = GRAPH_FIELD_SEP.join(sources)
entities_to_update[node_label] = new_source_id
logger.debug(
f"Entity {node_label} will be updated with new source_id: {new_source_id}"
)
# Use graph database lock to ensure atomic merges and updates
graph_db_lock = get_graph_db_lock(enable_logging=False)
async with graph_db_lock:
# Process entities
# TODO: There is a performance issue when iterating get_all_labels for PostgreSQL
all_labels = await self.chunk_entity_relation_graph.get_all_labels()
for node_label in all_labels:
node_data = await self.chunk_entity_relation_graph.get_node(
node_label
)
if node_data and "source_id" in node_data:
# Split source_id using GRAPH_FIELD_SEP
sources = set(node_data["source_id"].split(GRAPH_FIELD_SEP))
remaining_sources = sources - chunk_ids
# Process relationships
for node_label in all_labels:
node_edges = await self.chunk_entity_relation_graph.get_node_edges(
node_label
)
if node_edges:
for src, tgt in node_edges:
edge_data = await self.chunk_entity_relation_graph.get_edge(
src, tgt
)
if edge_data and "source_id" in edge_data:
# Split source_id using GRAPH_FIELD_SEP
sources = set(edge_data["source_id"].split(GRAPH_FIELD_SEP))
sources.difference_update(chunk_ids)
if not sources:
relationships_to_delete.add((src, tgt))
logger.debug(
f"Relationship {src}-{tgt} marked for deletion - no remaining sources"
)
else:
new_source_id = GRAPH_FIELD_SEP.join(sources)
relationships_to_update[(src, tgt)] = new_source_id
logger.debug(
f"Relationship {src}-{tgt} will be updated with new source_id: {new_source_id}"
if not remaining_sources:
entities_to_delete.add(node_label)
logger.debug(
f"Entity {node_label} marked for deletion - no remaining sources"
)
elif remaining_sources != sources:
# Entity needs to be rebuilt from remaining chunks
entities_to_rebuild[node_label] = remaining_sources
logger.debug(
f"Entity {node_label} will be rebuilt from {len(remaining_sources)} remaining chunks"
)
# Process relationships
# TODO: There is a performance issue when iterating get_all_labels for PostgreSQL
for node_label in all_labels:
node_edges = await self.chunk_entity_relation_graph.get_node_edges(
node_label
)
if node_edges:
for src, tgt in node_edges:
# To avoid processing the same edge twice in an undirected graph
if (tgt, src) in relationships_to_delete or (
tgt,
src,
) in relationships_to_rebuild:
continue
edge_data = await self.chunk_entity_relation_graph.get_edge(
src, tgt
)
if edge_data and "source_id" in edge_data:
# Split source_id using GRAPH_FIELD_SEP
sources = set(
edge_data["source_id"].split(GRAPH_FIELD_SEP)
)
remaining_sources = sources - chunk_ids
# Delete entities
if entities_to_delete:
for entity in entities_to_delete:
await self.entities_vdb.delete_entity(entity)
logger.debug(f"Deleted entity {entity} from vector DB")
await self.chunk_entity_relation_graph.remove_nodes(
list(entities_to_delete)
)
logger.debug(f"Deleted {len(entities_to_delete)} entities from graph")
if not remaining_sources:
relationships_to_delete.add((src, tgt))
logger.debug(
f"Relationship {src}-{tgt} marked for deletion - no remaining sources"
)
elif remaining_sources != sources:
# Relationship needs to be rebuilt from remaining chunks
relationships_to_rebuild[(src, tgt)] = (
remaining_sources
)
logger.debug(
f"Relationship {src}-{tgt} will be rebuilt from {len(remaining_sources)} remaining chunks"
)
# Update entities
for entity, new_source_id in entities_to_update.items():
node_data = await self.chunk_entity_relation_graph.get_node(entity)
if node_data:
node_data["source_id"] = new_source_id
await self.chunk_entity_relation_graph.upsert_node(
entity, node_data
# 5. Delete chunks from storage
if chunk_ids:
await self.chunks_vdb.delete(chunk_ids)
await self.text_chunks.delete(chunk_ids)
logger.info(f"Deleted {len(chunk_ids)} chunks from storage")
# 6. Delete entities that have no remaining sources
if entities_to_delete:
# Delete from vector database
entity_vdb_ids = [
compute_mdhash_id(entity, prefix="ent-")
for entity in entities_to_delete
]
await self.entities_vdb.delete(entity_vdb_ids)
# Delete from graph
await self.chunk_entity_relation_graph.remove_nodes(
list(entities_to_delete)
)
logger.debug(
f"Updated entity {entity} with new source_id: {new_source_id}"
logger.info(f"Deleted {len(entities_to_delete)} entities")
# 7. Delete relationships that have no remaining sources
if relationships_to_delete:
# Delete from vector database
rel_ids_to_delete = []
for src, tgt in relationships_to_delete:
rel_ids_to_delete.extend(
[
compute_mdhash_id(src + tgt, prefix="rel-"),
compute_mdhash_id(tgt + src, prefix="rel-"),
]
)
await self.relationships_vdb.delete(rel_ids_to_delete)
# Delete from graph
await self.chunk_entity_relation_graph.remove_edges(
list(relationships_to_delete)
)
logger.info(f"Deleted {len(relationships_to_delete)} relationships")
# 8. **OPTIMIZATION 2**: Rebuild entities and relationships from remaining chunks
if entities_to_rebuild or relationships_to_rebuild:
logger.info(
f"Rebuilding {len(entities_to_rebuild)} entities and {len(relationships_to_rebuild)} relationships..."
)
await _rebuild_knowledge_from_chunks(
entities_to_rebuild=entities_to_rebuild,
relationships_to_rebuild=relationships_to_rebuild,
knowledge_graph_inst=self.chunk_entity_relation_graph,
entities_vdb=self.entities_vdb,
relationships_vdb=self.relationships_vdb,
text_chunks=self.text_chunks,
llm_response_cache=self.llm_response_cache,
global_config=asdict(self),
)
# Delete relationships
if relationships_to_delete:
for src, tgt in relationships_to_delete:
rel_id_0 = compute_mdhash_id(src + tgt, prefix="rel-")
rel_id_1 = compute_mdhash_id(tgt + src, prefix="rel-")
await self.relationships_vdb.delete([rel_id_0, rel_id_1])
logger.debug(f"Deleted relationship {src}-{tgt} from vector DB")
await self.chunk_entity_relation_graph.remove_edges(
list(relationships_to_delete)
)
logger.debug(
f"Deleted {len(relationships_to_delete)} relationships from graph"
)
# Update relationships
for (src, tgt), new_source_id in relationships_to_update.items():
edge_data = await self.chunk_entity_relation_graph.get_edge(src, tgt)
if edge_data:
edge_data["source_id"] = new_source_id
await self.chunk_entity_relation_graph.upsert_edge(
src, tgt, edge_data
)
logger.debug(
f"Updated relationship {src}-{tgt} with new source_id: {new_source_id}"
)
# 6. Delete original document and status
# 9. Delete original document and status
await self.full_docs.delete([doc_id])
await self.doc_status.delete([doc_id])
# 7. Ensure all indexes are updated
# 10. Ensure all indexes are updated
await self._insert_done()
logger.info(
f"Successfully deleted document {doc_id} and related data. "
f"Deleted {len(entities_to_delete)} entities and {len(relationships_to_delete)} relationships. "
f"Updated {len(entities_to_update)} entities and {len(relationships_to_update)} relationships."
success_message = f"""Successfully deleted document {doc_id}.
Deleted: {len(entities_to_delete)} entities, {len(relationships_to_delete)} relationships.
Rebuilt: {len(entities_to_rebuild)} entities, {len(relationships_to_rebuild)} relationships."""
logger.info(success_message)
return DeletionResult(
status="success",
doc_id=doc_id,
message=success_message,
status_code=200,
)
async def process_data(data_type, vdb, chunk_id):
# Check data (entities or relationships)
storage = await vdb.client_storage
data_with_chunk = [
dp
for dp in storage["data"]
if chunk_id in (dp.get("source_id") or "").split(GRAPH_FIELD_SEP)
]
data_for_vdb = {}
if data_with_chunk:
logger.warning(
f"found {len(data_with_chunk)} {data_type} still referencing chunk {chunk_id}"
)
for item in data_with_chunk:
old_sources = item["source_id"].split(GRAPH_FIELD_SEP)
new_sources = [src for src in old_sources if src != chunk_id]
if not new_sources:
logger.info(
f"{data_type} {item.get('entity_name', 'N/A')} is deleted because source_id is not exists"
)
await vdb.delete_entity(item)
else:
item["source_id"] = GRAPH_FIELD_SEP.join(new_sources)
item_id = item["__id__"]
data_for_vdb[item_id] = item.copy()
if data_type == "entities":
data_for_vdb[item_id]["content"] = data_for_vdb[
item_id
].get("content") or (
item.get("entity_name", "")
+ (item.get("description") or "")
)
else: # relationships
data_for_vdb[item_id]["content"] = data_for_vdb[
item_id
].get("content") or (
(item.get("keywords") or "")
+ (item.get("src_id") or "")
+ (item.get("tgt_id") or "")
+ (item.get("description") or "")
)
if data_for_vdb:
await vdb.upsert(data_for_vdb)
logger.info(f"Successfully updated {data_type} in vector DB")
# Add verification step
async def verify_deletion():
# Verify if the document has been deleted
if await self.full_docs.get_by_id(doc_id):
logger.warning(f"Document {doc_id} still exists in full_docs")
# Verify if chunks have been deleted
all_remaining_chunks = await self.text_chunks.get_all()
remaining_related_chunks = {
chunk_id: chunk_data
for chunk_id, chunk_data in all_remaining_chunks.items()
if isinstance(chunk_data, dict)
and chunk_data.get("full_doc_id") == doc_id
}
if remaining_related_chunks:
logger.warning(
f"Found {len(remaining_related_chunks)} remaining chunks"
)
# Verify entities and relationships
for chunk_id in chunk_ids:
await process_data("entities", self.entities_vdb, chunk_id)
await process_data(
"relationships", self.relationships_vdb, chunk_id
)
await verify_deletion()
except Exception as e:
logger.error(f"Error while deleting document {doc_id}: {e}")
error_message = f"Error while deleting document {doc_id}: {e}"
logger.error(error_message)
logger.error(traceback.format_exc())
return DeletionResult(
status="fail",
doc_id=doc_id,
message=error_message,
status_code=500,
)
async def adelete_by_entity(self, entity_name: str) -> None:
async def adelete_by_entity(self, entity_name: str) -> DeletionResult:
"""Asynchronously delete an entity and all its relationships.
Args:
entity_name: Name of the entity to delete
entity_name: Name of the entity to delete.
Returns:
DeletionResult: An object containing the outcome of the deletion process.
"""
from .utils_graph import adelete_by_entity
@ -1947,16 +1928,29 @@ class LightRAG:
entity_name,
)
def delete_by_entity(self, entity_name: str) -> None:
def delete_by_entity(self, entity_name: str) -> DeletionResult:
"""Synchronously delete an entity and all its relationships.
Args:
entity_name: Name of the entity to delete.
Returns:
DeletionResult: An object containing the outcome of the deletion process.
"""
loop = always_get_an_event_loop()
return loop.run_until_complete(self.adelete_by_entity(entity_name))
async def adelete_by_relation(self, source_entity: str, target_entity: str) -> None:
async def adelete_by_relation(
self, source_entity: str, target_entity: str
) -> DeletionResult:
"""Asynchronously delete a relation between two entities.
Args:
source_entity: Name of the source entity
target_entity: Name of the target entity
source_entity: Name of the source entity.
target_entity: Name of the target entity.
Returns:
DeletionResult: An object containing the outcome of the deletion process.
"""
from .utils_graph import adelete_by_relation
@ -1967,7 +1961,18 @@ class LightRAG:
target_entity,
)
def delete_by_relation(self, source_entity: str, target_entity: str) -> None:
def delete_by_relation(
self, source_entity: str, target_entity: str
) -> DeletionResult:
"""Synchronously delete a relation between two entities.
Args:
source_entity: Name of the source entity.
target_entity: Name of the target entity.
Returns:
DeletionResult: An object containing the outcome of the deletion process.
"""
loop = always_get_an_event_loop()
return loop.run_until_complete(
self.adelete_by_relation(source_entity, target_entity)

View file

@ -240,6 +240,466 @@ async def _handle_single_relationship_extraction(
)
async def _rebuild_knowledge_from_chunks(
entities_to_rebuild: dict[str, set[str]],
relationships_to_rebuild: dict[tuple[str, str], set[str]],
knowledge_graph_inst: BaseGraphStorage,
entities_vdb: BaseVectorStorage,
relationships_vdb: BaseVectorStorage,
text_chunks: BaseKVStorage,
llm_response_cache: BaseKVStorage,
global_config: dict[str, str],
) -> None:
"""Rebuild entity and relationship descriptions from cached extraction results
This method uses cached LLM extraction results instead of calling LLM again,
following the same approach as the insert process.
Args:
entities_to_rebuild: Dict mapping entity_name -> set of remaining chunk_ids
relationships_to_rebuild: Dict mapping (src, tgt) -> set of remaining chunk_ids
"""
if not entities_to_rebuild and not relationships_to_rebuild:
return
# Get all referenced chunk IDs
all_referenced_chunk_ids = set()
for chunk_ids in entities_to_rebuild.values():
all_referenced_chunk_ids.update(chunk_ids)
for chunk_ids in relationships_to_rebuild.values():
all_referenced_chunk_ids.update(chunk_ids)
logger.info(
f"Rebuilding knowledge from {len(all_referenced_chunk_ids)} cached chunk extractions"
)
# Get cached extraction results for these chunks
cached_results = await _get_cached_extraction_results(
llm_response_cache, all_referenced_chunk_ids
)
if not cached_results:
logger.warning("No cached extraction results found, cannot rebuild")
return
# Process cached results to get entities and relationships for each chunk
chunk_entities = {} # chunk_id -> {entity_name: [entity_data]}
chunk_relationships = {} # chunk_id -> {(src, tgt): [relationship_data]}
for chunk_id, extraction_result in cached_results.items():
try:
entities, relationships = await _parse_extraction_result(
text_chunks=text_chunks,
extraction_result=extraction_result,
chunk_id=chunk_id,
)
chunk_entities[chunk_id] = entities
chunk_relationships[chunk_id] = relationships
except Exception as e:
logger.error(
f"Failed to parse cached extraction result for chunk {chunk_id}: {e}"
)
continue
# Rebuild entities
for entity_name, chunk_ids in entities_to_rebuild.items():
try:
await _rebuild_single_entity(
knowledge_graph_inst=knowledge_graph_inst,
entities_vdb=entities_vdb,
entity_name=entity_name,
chunk_ids=chunk_ids,
chunk_entities=chunk_entities,
llm_response_cache=llm_response_cache,
global_config=global_config,
)
logger.debug(
f"Rebuilt entity {entity_name} from {len(chunk_ids)} cached extractions"
)
except Exception as e:
logger.error(f"Failed to rebuild entity {entity_name}: {e}")
# Rebuild relationships
for (src, tgt), chunk_ids in relationships_to_rebuild.items():
try:
await _rebuild_single_relationship(
knowledge_graph_inst=knowledge_graph_inst,
relationships_vdb=relationships_vdb,
src=src,
tgt=tgt,
chunk_ids=chunk_ids,
chunk_relationships=chunk_relationships,
llm_response_cache=llm_response_cache,
global_config=global_config,
)
logger.debug(
f"Rebuilt relationship {src}-{tgt} from {len(chunk_ids)} cached extractions"
)
except Exception as e:
logger.error(f"Failed to rebuild relationship {src}-{tgt}: {e}")
logger.info("Completed rebuilding knowledge from cached extractions")
async def _get_cached_extraction_results(
llm_response_cache: BaseKVStorage, chunk_ids: set[str]
) -> dict[str, str]:
"""Get cached extraction results for specific chunk IDs
Args:
chunk_ids: Set of chunk IDs to get cached results for
Returns:
Dict mapping chunk_id -> extraction_result_text
"""
cached_results = {}
# Get all cached data for "default" mode (entity extraction cache)
default_cache = await llm_response_cache.get_by_id("default") or {}
for cache_key, cache_entry in default_cache.items():
if (
isinstance(cache_entry, dict)
and cache_entry.get("cache_type") == "extract"
and cache_entry.get("chunk_id") in chunk_ids
):
chunk_id = cache_entry["chunk_id"]
extraction_result = cache_entry["return"]
cached_results[chunk_id] = extraction_result
logger.info(
f"Found {len(cached_results)} cached extraction results for {len(chunk_ids)} chunk IDs"
)
return cached_results
async def _parse_extraction_result(
text_chunks: BaseKVStorage, extraction_result: str, chunk_id: str
) -> tuple[dict, dict]:
"""Parse cached extraction result using the same logic as extract_entities
Args:
extraction_result: The cached LLM extraction result
chunk_id: The chunk ID for source tracking
Returns:
Tuple of (entities_dict, relationships_dict)
"""
# Get chunk data for file_path
chunk_data = await text_chunks.get_by_id(chunk_id)
file_path = (
chunk_data.get("file_path", "unknown_source")
if chunk_data
else "unknown_source"
)
context_base = dict(
tuple_delimiter=PROMPTS["DEFAULT_TUPLE_DELIMITER"],
record_delimiter=PROMPTS["DEFAULT_RECORD_DELIMITER"],
completion_delimiter=PROMPTS["DEFAULT_COMPLETION_DELIMITER"],
)
maybe_nodes = defaultdict(list)
maybe_edges = defaultdict(list)
# Parse the extraction result using the same logic as in extract_entities
records = split_string_by_multi_markers(
extraction_result,
[context_base["record_delimiter"], context_base["completion_delimiter"]],
)
for record in records:
record = re.search(r"\((.*)\)", record)
if record is None:
continue
record = record.group(1)
record_attributes = split_string_by_multi_markers(
record, [context_base["tuple_delimiter"]]
)
# Try to parse as entity
entity_data = await _handle_single_entity_extraction(
record_attributes, chunk_id, file_path
)
if entity_data is not None:
maybe_nodes[entity_data["entity_name"]].append(entity_data)
continue
# Try to parse as relationship
relationship_data = await _handle_single_relationship_extraction(
record_attributes, chunk_id, file_path
)
if relationship_data is not None:
maybe_edges[
(relationship_data["src_id"], relationship_data["tgt_id"])
].append(relationship_data)
return dict(maybe_nodes), dict(maybe_edges)
async def _rebuild_single_entity(
knowledge_graph_inst: BaseGraphStorage,
entities_vdb: BaseVectorStorage,
entity_name: str,
chunk_ids: set[str],
chunk_entities: dict,
llm_response_cache: BaseKVStorage,
global_config: dict[str, str],
) -> None:
"""Rebuild a single entity from cached extraction results"""
# Get current entity data
current_entity = await knowledge_graph_inst.get_node(entity_name)
if not current_entity:
return
# Helper function to update entity in both graph and vector storage
async def _update_entity_storage(
final_description: str, entity_type: str, file_paths: set[str]
):
# Update entity in graph storage
updated_entity_data = {
**current_entity,
"description": final_description,
"entity_type": entity_type,
"source_id": GRAPH_FIELD_SEP.join(chunk_ids),
"file_path": GRAPH_FIELD_SEP.join(file_paths)
if file_paths
else current_entity.get("file_path", "unknown_source"),
}
await knowledge_graph_inst.upsert_node(entity_name, updated_entity_data)
# Update entity in vector database
entity_vdb_id = compute_mdhash_id(entity_name, prefix="ent-")
# Delete old vector record first
try:
await entities_vdb.delete([entity_vdb_id])
except Exception as e:
logger.debug(
f"Could not delete old entity vector record {entity_vdb_id}: {e}"
)
# Insert new vector record
entity_content = f"{entity_name}\n{final_description}"
await entities_vdb.upsert(
{
entity_vdb_id: {
"content": entity_content,
"entity_name": entity_name,
"source_id": updated_entity_data["source_id"],
"description": final_description,
"entity_type": entity_type,
"file_path": updated_entity_data["file_path"],
}
}
)
# Helper function to generate final description with optional LLM summary
async def _generate_final_description(combined_description: str) -> str:
if len(combined_description) > global_config["summary_to_max_tokens"]:
return await _handle_entity_relation_summary(
entity_name,
combined_description,
global_config,
llm_response_cache=llm_response_cache,
)
else:
return combined_description
# Collect all entity data from relevant chunks
all_entity_data = []
for chunk_id in chunk_ids:
if chunk_id in chunk_entities and entity_name in chunk_entities[chunk_id]:
all_entity_data.extend(chunk_entities[chunk_id][entity_name])
if not all_entity_data:
logger.warning(
f"No cached entity data found for {entity_name}, trying to rebuild from relationships"
)
# Get all edges connected to this entity
edges = await knowledge_graph_inst.get_node_edges(entity_name)
if not edges:
logger.warning(f"No relationships found for entity {entity_name}")
return
# Collect relationship data to extract entity information
relationship_descriptions = []
file_paths = set()
# Get edge data for all connected relationships
for src_id, tgt_id in edges:
edge_data = await knowledge_graph_inst.get_edge(src_id, tgt_id)
if edge_data:
if edge_data.get("description"):
relationship_descriptions.append(edge_data["description"])
if edge_data.get("file_path"):
edge_file_paths = edge_data["file_path"].split(GRAPH_FIELD_SEP)
file_paths.update(edge_file_paths)
# Generate description from relationships or fallback to current
if relationship_descriptions:
combined_description = GRAPH_FIELD_SEP.join(relationship_descriptions)
final_description = await _generate_final_description(combined_description)
else:
final_description = current_entity.get("description", "")
entity_type = current_entity.get("entity_type", "UNKNOWN")
await _update_entity_storage(final_description, entity_type, file_paths)
return
# Process cached entity data
descriptions = []
entity_types = []
file_paths = set()
for entity_data in all_entity_data:
if entity_data.get("description"):
descriptions.append(entity_data["description"])
if entity_data.get("entity_type"):
entity_types.append(entity_data["entity_type"])
if entity_data.get("file_path"):
file_paths.add(entity_data["file_path"])
# Combine all descriptions
combined_description = (
GRAPH_FIELD_SEP.join(descriptions)
if descriptions
else current_entity.get("description", "")
)
# Get most common entity type
entity_type = (
max(set(entity_types), key=entity_types.count)
if entity_types
else current_entity.get("entity_type", "UNKNOWN")
)
# Generate final description and update storage
final_description = await _generate_final_description(combined_description)
await _update_entity_storage(final_description, entity_type, file_paths)
async def _rebuild_single_relationship(
knowledge_graph_inst: BaseGraphStorage,
relationships_vdb: BaseVectorStorage,
src: str,
tgt: str,
chunk_ids: set[str],
chunk_relationships: dict,
llm_response_cache: BaseKVStorage,
global_config: dict[str, str],
) -> None:
"""Rebuild a single relationship from cached extraction results"""
# Get current relationship data
current_relationship = await knowledge_graph_inst.get_edge(src, tgt)
if not current_relationship:
return
# Collect all relationship data from relevant chunks
all_relationship_data = []
for chunk_id in chunk_ids:
if chunk_id in chunk_relationships:
# Check both (src, tgt) and (tgt, src) since relationships can be bidirectional
for edge_key in [(src, tgt), (tgt, src)]:
if edge_key in chunk_relationships[chunk_id]:
all_relationship_data.extend(
chunk_relationships[chunk_id][edge_key]
)
if not all_relationship_data:
logger.warning(f"No cached relationship data found for {src}-{tgt}")
return
# Merge descriptions and keywords
descriptions = []
keywords = []
weights = []
file_paths = set()
for rel_data in all_relationship_data:
if rel_data.get("description"):
descriptions.append(rel_data["description"])
if rel_data.get("keywords"):
keywords.append(rel_data["keywords"])
if rel_data.get("weight"):
weights.append(rel_data["weight"])
if rel_data.get("file_path"):
file_paths.add(rel_data["file_path"])
# Combine descriptions and keywords
combined_description = (
GRAPH_FIELD_SEP.join(descriptions)
if descriptions
else current_relationship.get("description", "")
)
combined_keywords = (
", ".join(set(keywords))
if keywords
else current_relationship.get("keywords", "")
)
# weight = (
# sum(weights) / len(weights)
# if weights
# else current_relationship.get("weight", 1.0)
# )
weight = sum(weights) if weights else current_relationship.get("weight", 1.0)
# Use summary if description is too long
if len(combined_description) > global_config["summary_to_max_tokens"]:
final_description = await _handle_entity_relation_summary(
f"{src}-{tgt}",
combined_description,
global_config,
llm_response_cache=llm_response_cache,
)
else:
final_description = combined_description
# Update relationship in graph storage
updated_relationship_data = {
**current_relationship,
"description": final_description,
"keywords": combined_keywords,
"weight": weight,
"source_id": GRAPH_FIELD_SEP.join(chunk_ids),
"file_path": GRAPH_FIELD_SEP.join(file_paths)
if file_paths
else current_relationship.get("file_path", "unknown_source"),
}
await knowledge_graph_inst.upsert_edge(src, tgt, updated_relationship_data)
# Update relationship in vector database
rel_vdb_id = compute_mdhash_id(src + tgt, prefix="rel-")
rel_vdb_id_reverse = compute_mdhash_id(tgt + src, prefix="rel-")
# Delete old vector records first (both directions to be safe)
try:
await relationships_vdb.delete([rel_vdb_id, rel_vdb_id_reverse])
except Exception as e:
logger.debug(
f"Could not delete old relationship vector records {rel_vdb_id}, {rel_vdb_id_reverse}: {e}"
)
# Insert new vector record
rel_content = f"{combined_keywords}\t{src}\n{tgt}\n{final_description}"
await relationships_vdb.upsert(
{
rel_vdb_id: {
"src_id": src,
"tgt_id": tgt,
"source_id": updated_relationship_data["source_id"],
"content": rel_content,
"keywords": combined_keywords,
"description": final_description,
"weight": weight,
"file_path": updated_relationship_data["file_path"],
}
}
)
async def _merge_nodes_then_upsert(
entity_name: str,
nodes_data: list[dict],
@ -757,6 +1217,7 @@ async def extract_entities(
use_llm_func,
llm_response_cache=llm_response_cache,
cache_type="extract",
chunk_id=chunk_key,
)
history = pack_user_ass_to_openai_messages(hint_prompt, final_result)
@ -773,6 +1234,7 @@ async def extract_entities(
llm_response_cache=llm_response_cache,
history_messages=history,
cache_type="extract",
chunk_id=chunk_key,
)
history += pack_user_ass_to_openai_messages(continue_prompt, glean_result)

View file

@ -990,6 +990,7 @@ class CacheData:
max_val: float | None = None
mode: str = "default"
cache_type: str = "query"
chunk_id: str | None = None
async def save_to_cache(hashing_kv, cache_data: CacheData):
@ -1030,6 +1031,7 @@ async def save_to_cache(hashing_kv, cache_data: CacheData):
mode_cache[cache_data.args_hash] = {
"return": cache_data.content,
"cache_type": cache_data.cache_type,
"chunk_id": cache_data.chunk_id if cache_data.chunk_id is not None else None,
"embedding": cache_data.quantized.tobytes().hex()
if cache_data.quantized is not None
else None,
@ -1534,6 +1536,7 @@ async def use_llm_func_with_cache(
max_tokens: int = None,
history_messages: list[dict[str, str]] = None,
cache_type: str = "extract",
chunk_id: str | None = None,
) -> str:
"""Call LLM function with cache support
@ -1547,6 +1550,7 @@ async def use_llm_func_with_cache(
max_tokens: Maximum tokens for generation
history_messages: History messages list
cache_type: Type of cache
chunk_id: Chunk identifier to store in cache
Returns:
LLM response text
@ -1589,6 +1593,7 @@ async def use_llm_func_with_cache(
content=res,
prompt=_prompt,
cache_type=cache_type,
chunk_id=chunk_id,
),
)

View file

@ -4,6 +4,7 @@ import time
import asyncio
from typing import Any, cast
from .base import DeletionResult
from .kg.shared_storage import get_graph_db_lock
from .prompt import GRAPH_FIELD_SEP
from .utils import compute_mdhash_id, logger
@ -12,7 +13,7 @@ from .base import StorageNameSpace
async def adelete_by_entity(
chunk_entity_relation_graph, entities_vdb, relationships_vdb, entity_name: str
) -> None:
) -> DeletionResult:
"""Asynchronously delete an entity and all its relationships.
Args:
@ -25,18 +26,43 @@ async def adelete_by_entity(
# Use graph database lock to ensure atomic graph and vector db operations
async with graph_db_lock:
try:
# Check if the entity exists
if not await chunk_entity_relation_graph.has_node(entity_name):
logger.warning(f"Entity '{entity_name}' not found.")
return DeletionResult(
status="not_found",
doc_id=entity_name,
message=f"Entity '{entity_name}' not found.",
status_code=404,
)
# Retrieve related relationships before deleting the node
edges = await chunk_entity_relation_graph.get_node_edges(entity_name)
related_relations_count = len(edges) if edges else 0
await entities_vdb.delete_entity(entity_name)
await relationships_vdb.delete_entity_relation(entity_name)
await chunk_entity_relation_graph.delete_node(entity_name)
logger.info(
f"Entity '{entity_name}' and its relationships have been deleted."
)
message = f"Entity '{entity_name}' and its {related_relations_count} relationships have been deleted."
logger.info(message)
await _delete_by_entity_done(
entities_vdb, relationships_vdb, chunk_entity_relation_graph
)
return DeletionResult(
status="success",
doc_id=entity_name,
message=message,
status_code=200,
)
except Exception as e:
logger.error(f"Error while deleting entity '{entity_name}': {e}")
error_message = f"Error while deleting entity '{entity_name}': {e}"
logger.error(error_message)
return DeletionResult(
status="fail",
doc_id=entity_name,
message=error_message,
status_code=500,
)
async def _delete_by_entity_done(
@ -60,7 +86,7 @@ async def adelete_by_relation(
relationships_vdb,
source_entity: str,
target_entity: str,
) -> None:
) -> DeletionResult:
"""Asynchronously delete a relation between two entities.
Args:
@ -69,6 +95,7 @@ async def adelete_by_relation(
source_entity: Name of the source entity
target_entity: Name of the target entity
"""
relation_str = f"{source_entity} -> {target_entity}"
graph_db_lock = get_graph_db_lock(enable_logging=False)
# Use graph database lock to ensure atomic graph and vector db operations
async with graph_db_lock:
@ -78,29 +105,45 @@ async def adelete_by_relation(
source_entity, target_entity
)
if not edge_exists:
logger.warning(
f"Relation from '{source_entity}' to '{target_entity}' does not exist"
message = f"Relation from '{source_entity}' to '{target_entity}' does not exist"
logger.warning(message)
return DeletionResult(
status="not_found",
doc_id=relation_str,
message=message,
status_code=404,
)
return
# Delete relation from vector database
relation_id = compute_mdhash_id(
source_entity + target_entity, prefix="rel-"
)
await relationships_vdb.delete([relation_id])
rel_ids_to_delete = [
compute_mdhash_id(source_entity + target_entity, prefix="rel-"),
compute_mdhash_id(target_entity + source_entity, prefix="rel-"),
]
await relationships_vdb.delete(rel_ids_to_delete)
# Delete relation from knowledge graph
await chunk_entity_relation_graph.remove_edges(
[(source_entity, target_entity)]
)
logger.info(
f"Successfully deleted relation from '{source_entity}' to '{target_entity}'"
)
message = f"Successfully deleted relation from '{source_entity}' to '{target_entity}'"
logger.info(message)
await _delete_relation_done(relationships_vdb, chunk_entity_relation_graph)
return DeletionResult(
status="success",
doc_id=relation_str,
message=message,
status_code=200,
)
except Exception as e:
logger.error(
f"Error while deleting relation from '{source_entity}' to '{target_entity}': {e}"
error_message = f"Error while deleting relation from '{source_entity}' to '{target_entity}': {e}"
logger.error(error_message)
return DeletionResult(
status="fail",
doc_id=relation_str,
message=error_message,
status_code=500,
)