diff --git a/README-zh.md b/README-zh.md
index 9e892012..685c9468 100644
--- a/README-zh.md
+++ b/README-zh.md
@@ -932,6 +932,94 @@ rag.insert_custom_kg(custom_kg)
+## 删除功能
+
+LightRAG提供了全面的删除功能,允许您删除文档、实体和关系。
+
+<details>
+  <summary> 删除实体 </summary>
+
+您可以通过实体名称删除实体及其所有关联关系:
+
+```python
+# 删除实体及其所有关系(同步版本)
+rag.delete_by_entity("Google")
+
+# 异步版本
+await rag.adelete_by_entity("Google")
+```
+
+删除实体时会:
+- 从知识图谱中移除该实体节点
+- 删除该实体的所有关联关系
+- 从向量数据库中移除相关的嵌入向量
+- 保持知识图谱的完整性
+
+</details>
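本次改动后,删除接口会返回 `lightrag/base.py` 中新增的 `DeletionResult` 对象;下面是一个检查返回结果的最小示意:

```python
# 示意:根据 DeletionResult.status 判断删除结果("success" / "not_found" / "fail")
result = rag.delete_by_entity("Google")
if result.status == "success":
    print(result.message)
elif result.status == "not_found":
    print("实体不存在,无需删除")
else:  # "fail"
    print(f"删除失败:{result.message}")
```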
+
+<details>
+  <summary> 删除关系 </summary>
+
+您可以删除两个特定实体之间的关系:
+
+```python
+# 删除两个实体之间的关系(同步版本)
+rag.delete_by_relation("Google", "Gmail")
+
+# 异步版本
+await rag.adelete_by_relation("Google", "Gmail")
+```
+
+删除关系时会:
+- 移除指定的关系边
+- 从向量数据库中删除关系的嵌入向量
+- 保留两个实体节点及其他关系
+
+</details>
+
+<details>
+  <summary> 通过文档ID删除 </summary>
+
+您可以通过文档ID删除整个文档及其相关的所有知识:
+
+```python
+# 通过文档ID删除(异步版本)
+await rag.adelete_by_doc_id("doc-12345")
+```
+
+通过文档ID删除时的优化处理:
+- **智能清理**:自动识别并删除仅属于该文档的实体和关系
+- **保留共享知识**:如果实体或关系在其他文档中也存在,则会保留并重新构建描述
+- **缓存优化**:清理相关的LLM缓存以减少存储开销
+- **增量重建**:从剩余文档重新构建受影响的实体和关系描述
+
+删除过程包括:
+1. 删除文档相关的所有文本块
+2. 识别仅属于该文档的实体和关系并删除
+3. 重新构建在其他文档中仍存在的实体和关系
+4. 更新所有相关的向量索引
+5. 清理文档状态记录
+
+注意:通过文档ID删除是一个异步操作,因为它涉及复杂的知识图谱重构过程。
+
+</details>
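由于按文档ID删除是异步接口,需要在事件循环中调用;下面是一个基于 `asyncio` 的最小示意(`doc-12345` 为占位的文档ID):

```python
import asyncio

async def remove_doc(rag, doc_id: str):
    # adelete_by_doc_id 返回 DeletionResult(定义见 lightrag/base.py)
    result = await rag.adelete_by_doc_id(doc_id)
    if result.status == "not_found":
        print(f"文档不存在:{doc_id}")
    elif result.status == "fail":
        print(f"删除失败:{result.message}")
    else:
        print(result.message)

# asyncio.run(remove_doc(rag, "doc-12345"))
```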
+
+<details>
+  <summary> 删除注意事项 </summary>
+
+**重要提醒:**
+
+1. **不可逆操作**:所有删除操作都是不可逆的,请谨慎使用
+2. **性能考虑**:删除大量数据时可能需要一些时间,特别是通过文档ID删除
+3. **数据一致性**:删除操作会自动维护知识图谱和向量数据库之间的一致性
+4. **备份建议**:在执行重要删除操作前建议备份数据
+
+**批量删除建议:**
+- 对于批量删除操作,建议使用异步方法以获得更好的性能
+- 大规模删除时,考虑分批进行以避免系统负载过高(参见下方的分批异步删除示意)
+
+</details>
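下面是对上述"分批+异步"建议的一个最小示意(`batch_size` 与实体名列表均为示例值,删除方法返回的 `DeletionResult` 见 `lightrag/base.py`):

```python
import asyncio

async def delete_entities_in_batches(rag, names: list[str], batch_size: int = 10):
    """分批并发删除实体,避免一次性提交过多删除任务。"""
    for i in range(0, len(names), batch_size):
        batch = names[i : i + batch_size]
        results = await asyncio.gather(
            *(rag.adelete_by_entity(name) for name in batch)
        )
        for name, result in zip(batch, results):
            print(name, result.status, result.message)
```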
+
 ## 实体合并
diff --git a/README.md b/README.md index 8fdf4439..fa227c60 100644 --- a/README.md +++ b/README.md @@ -988,6 +988,89 @@ These operations maintain data consistency across both the graph database and ve
+## Delete Functions
+
+LightRAG provides comprehensive deletion capabilities, allowing you to delete documents, entities, and relationships.
+
+<details>
+  <summary> Delete Entities </summary>
+
+You can delete entities by their name along with all associated relationships:
+
+```python
+# Delete entity and all its relationships (synchronous version)
+rag.delete_by_entity("Google")
+
+# Asynchronous version
+await rag.adelete_by_entity("Google")
+```
+
+When deleting an entity:
+- Removes the entity node from the knowledge graph
+- Deletes all associated relationships
+- Removes related embedding vectors from the vector database
+- Maintains knowledge graph integrity
+
+</details>
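Both methods now return the `DeletionResult` object introduced in this change (see `lightrag/base.py`); a minimal sketch of checking the outcome:

```python
# Sketch: branch on DeletionResult.status ("success" / "not_found" / "fail")
result = rag.delete_by_entity("Google")
if result.status == "success":
    print(result.message)
elif result.status == "not_found":
    print("Entity does not exist, nothing to delete")
else:  # "fail"
    print(f"Deletion failed: {result.message}")
```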
+
+<details>
+  <summary> Delete Relations </summary>
+
+You can delete relationships between two specific entities:
+
+```python
+# Delete relationship between two entities (synchronous version)
+rag.delete_by_relation("Google", "Gmail")
+
+# Asynchronous version
+await rag.adelete_by_relation("Google", "Gmail")
+```
+
+When deleting a relationship:
+- Removes the specified relationship edge
+- Deletes the relationship's embedding vector from the vector database
+- Preserves both entity nodes and their other relationships
+
+</details>
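As with entity deletion, the call returns a `DeletionResult`; a short sketch using the asynchronous variant inside an async context:

```python
# Sketch: must be awaited inside an async function / running event loop
result = await rag.adelete_by_relation("Google", "Gmail")
if result.status == "not_found":
    print("No such relationship between the two entities")
else:
    print(result.status, result.message)
```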
+
+<details>
+  <summary> Delete by Document ID </summary>
+
+You can delete an entire document and all of its related knowledge by its document ID:
+
+```python
+# Delete by document ID (asynchronous version)
+await rag.adelete_by_doc_id("doc-12345")
+```
+
+Optimized processing when deleting by document ID:
+- **Smart Cleanup**: Automatically identifies and removes entities and relationships that belong only to this document
+- **Preserve Shared Knowledge**: If entities or relationships exist in other documents, they are preserved and their descriptions are rebuilt
+- **Cache Optimization**: Clears related LLM cache to reduce storage overhead
+- **Incremental Rebuilding**: Reconstructs affected entity and relationship descriptions from remaining documents
+
+The deletion process includes:
+1. Delete all text chunks related to the document
+2. Identify and delete entities and relationships that belong only to this document
+3. Rebuild entities and relationships that still exist in other documents
+4. Update all related vector indexes
+5. Clean up document status records
+
+Note: Deletion by document ID is an asynchronous operation as it involves complex knowledge graph reconstruction.
+
+</details>
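A minimal sketch of running the asynchronous deletion and branching on the returned `DeletionResult` (the document ID below is a placeholder):

```python
import asyncio

async def remove_document(rag, doc_id: str):
    result = await rag.adelete_by_doc_id(doc_id)
    if result.status == "not_found":
        print(f"Document not found: {doc_id}")
    elif result.status == "fail":
        print(f"Deletion failed: {result.message}")
    else:
        print(result.message)

# asyncio.run(remove_document(rag, "doc-12345"))
```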
+
+**Important Reminders:**
+
+1. **Irreversible Operations**: All deletion operations are irreversible; use them with caution
+2. **Performance Considerations**: Deleting large amounts of data may take some time, especially deletion by document ID
+3. **Data Consistency**: Deletion operations automatically maintain consistency between the knowledge graph and vector database
+4. **Backup Recommendations**: Consider backing up data before performing important deletion operations
+
+**Batch Deletion Recommendations:**
+- For batch deletion operations, consider using asynchronous methods for better performance
+- For large-scale deletions, consider processing in batches to avoid excessive system load
+
 ## Entity Merging
diff --git a/lightrag/api/__init__.py b/lightrag/api/__init__.py index 5d14dfb6..7a4a498a 100644 --- a/lightrag/api/__init__.py +++ b/lightrag/api/__init__.py @@ -1 +1 @@ -__api_version__ = "0173" +__api_version__ = "0174" diff --git a/lightrag/api/lightrag_server.py b/lightrag/api/lightrag_server.py index bf4f11f5..0930c1cd 100644 --- a/lightrag/api/lightrag_server.py +++ b/lightrag/api/lightrag_server.py @@ -355,7 +355,13 @@ def create_app(args): ) # Add routes - app.include_router(create_document_routes(rag, doc_manager, api_key)) + app.include_router( + create_document_routes( + rag, + doc_manager, + api_key, + ) + ) app.include_router(create_query_routes(rag, api_key, args.top_k)) app.include_router(create_graph_routes(rag, api_key)) diff --git a/lightrag/api/routers/document_routes.py b/lightrag/api/routers/document_routes.py index b2e9baf8..06862fe6 100644 --- a/lightrag/api/routers/document_routes.py +++ b/lightrag/api/routers/document_routes.py @@ -12,11 +12,18 @@ import pipmaster as pm from datetime import datetime, timezone from pathlib import Path from typing import Dict, List, Optional, Any, Literal -from fastapi import APIRouter, BackgroundTasks, Depends, File, HTTPException, UploadFile +from fastapi import ( + APIRouter, + BackgroundTasks, + Depends, + File, + HTTPException, + UploadFile, +) from pydantic import BaseModel, Field, field_validator from lightrag import LightRAG -from lightrag.base import DocProcessingStatus, DocStatus +from lightrag.base import DeletionResult, DocProcessingStatus, DocStatus from lightrag.api.utils_api import get_combined_auth_dependency from ..config import global_args @@ -252,6 +259,40 @@ Attributes: """ +class DeleteDocRequest(BaseModel): + doc_id: str = Field(..., description="The ID of the document to delete.") + + @field_validator("doc_id", mode="after") + @classmethod + def validate_doc_id(cls, doc_id: str) -> str: + if not doc_id or not doc_id.strip(): + raise ValueError("Document ID cannot be empty") + return doc_id.strip() + + +class DeleteEntityRequest(BaseModel): + entity_name: str = Field(..., description="The name of the entity to delete.") + + @field_validator("entity_name", mode="after") + @classmethod + def validate_entity_name(cls, entity_name: str) -> str: + if not entity_name or not entity_name.strip(): + raise ValueError("Entity name cannot be empty") + return entity_name.strip() + + +class DeleteRelationRequest(BaseModel): + source_entity: str = Field(..., description="The name of the source entity.") + target_entity: str = Field(..., description="The name of the target entity.") + + @field_validator("source_entity", "target_entity", mode="after") + @classmethod + def validate_entity_names(cls, entity_name: str) -> str: + if not entity_name or not entity_name.strip(): + raise ValueError("Entity name cannot be empty") + return entity_name.strip() + + class DocStatusResponse(BaseModel): id: str = Field(description="Document identifier") content_summary: str = Field(description="Summary of document content") @@ -1318,6 +1359,119 @@ def create_document_routes( logger.error(traceback.format_exc()) raise HTTPException(status_code=500, detail=str(e)) + class DeleteDocByIdResponse(BaseModel): + """Response model for single document deletion operation.""" + + status: Literal["success", "fail", "not_found", "busy"] = Field( + description="Status of the deletion operation" + ) + message: str = Field(description="Message describing the operation result") + doc_id: Optional[str] = Field( + default=None, description="The ID of the 
document." + ) + + @router.delete( + "/delete_document", + response_model=DeleteDocByIdResponse, + dependencies=[Depends(combined_auth)], + summary="Delete a document and all its associated data by its ID.", + ) + + # TODO This method needs to be modified to be asynchronous (please do not use) + async def delete_document( + delete_request: DeleteDocRequest, + ) -> DeleteDocByIdResponse: + """ + This method needs to be modified to be asynchronous (please do not use) + + Deletes a specific document and all its associated data, including its status, + text chunks, vector embeddings, and any related graph data. + It is disabled when llm cache for entity extraction is disabled. + + This operation is irreversible and will interact with the pipeline status. + + Args: + delete_request (DeleteDocRequest): The request containing the document ID. + + Returns: + DeleteDocByIdResponse: The result of the deletion operation. + - status="success": The document was successfully deleted. + - status="not_found": The document with the specified ID was not found. + - status="fail": The deletion operation failed. + - status="busy": The pipeline is busy with another operation. + + Raises: + HTTPException: + - 500: If an unexpected internal error occurs. + """ + # The rag object is initialized from the server startup args, + # so we can access its properties here. + if not rag.enable_llm_cache_for_entity_extract: + raise HTTPException( + status_code=403, + detail="Operation not allowed when LLM cache for entity extraction is disabled.", + ) + from lightrag.kg.shared_storage import ( + get_namespace_data, + get_pipeline_status_lock, + ) + + doc_id = delete_request.doc_id + pipeline_status = await get_namespace_data("pipeline_status") + pipeline_status_lock = get_pipeline_status_lock() + + async with pipeline_status_lock: + if pipeline_status.get("busy", False): + return DeleteDocByIdResponse( + status="busy", + message="Cannot delete document while pipeline is busy", + doc_id=doc_id, + ) + pipeline_status.update( + { + "busy": True, + "job_name": f"Deleting Document: {doc_id}", + "job_start": datetime.now().isoformat(), + "latest_message": "Starting document deletion process", + } + ) + # Use slice assignment to clear the list in place + pipeline_status["history_messages"][:] = [ + f"Starting deletion for doc_id: {doc_id}" + ] + + try: + result = await rag.adelete_by_doc_id(doc_id) + if "history_messages" in pipeline_status: + pipeline_status["history_messages"].append(result.message) + + if result.status == "not_found": + raise HTTPException(status_code=404, detail=result.message) + if result.status == "fail": + raise HTTPException(status_code=500, detail=result.message) + + return DeleteDocByIdResponse( + doc_id=result.doc_id, + message=result.message, + status=result.status, + ) + + except Exception as e: + error_msg = f"Error deleting document {doc_id}: {str(e)}" + logger.error(error_msg) + logger.error(traceback.format_exc()) + if "history_messages" in pipeline_status: + pipeline_status["history_messages"].append(error_msg) + # Re-raise as HTTPException for consistent error handling by FastAPI + raise HTTPException(status_code=500, detail=error_msg) + finally: + async with pipeline_status_lock: + pipeline_status["busy"] = False + completion_msg = f"Document deletion process for {doc_id} completed." 
+ pipeline_status["latest_message"] = completion_msg + if "history_messages" in pipeline_status: + pipeline_status["history_messages"].append(completion_msg) + @router.post( "/clear_cache", response_model=ClearCacheResponse, @@ -1371,4 +1525,77 @@ def create_document_routes( logger.error(traceback.format_exc()) raise HTTPException(status_code=500, detail=str(e)) + @router.delete( + "/delete_entity", + response_model=DeletionResult, + dependencies=[Depends(combined_auth)], + ) + async def delete_entity(request: DeleteEntityRequest): + """ + Delete an entity and all its relationships from the knowledge graph. + + Args: + request (DeleteEntityRequest): The request body containing the entity name. + + Returns: + DeletionResult: An object containing the outcome of the deletion process. + + Raises: + HTTPException: If the entity is not found (404) or an error occurs (500). + """ + try: + result = await rag.adelete_by_entity(entity_name=request.entity_name) + if result.status == "not_found": + raise HTTPException(status_code=404, detail=result.message) + if result.status == "fail": + raise HTTPException(status_code=500, detail=result.message) + # Set doc_id to empty string since this is an entity operation, not document + result.doc_id = "" + return result + except HTTPException: + raise + except Exception as e: + error_msg = f"Error deleting entity '{request.entity_name}': {str(e)}" + logger.error(error_msg) + logger.error(traceback.format_exc()) + raise HTTPException(status_code=500, detail=error_msg) + + @router.delete( + "/delete_relation", + response_model=DeletionResult, + dependencies=[Depends(combined_auth)], + ) + async def delete_relation(request: DeleteRelationRequest): + """ + Delete a relationship between two entities from the knowledge graph. + + Args: + request (DeleteRelationRequest): The request body containing the source and target entity names. + + Returns: + DeletionResult: An object containing the outcome of the deletion process. + + Raises: + HTTPException: If the relation is not found (404) or an error occurs (500). + """ + try: + result = await rag.adelete_by_relation( + source_entity=request.source_entity, + target_entity=request.target_entity, + ) + if result.status == "not_found": + raise HTTPException(status_code=404, detail=result.message) + if result.status == "fail": + raise HTTPException(status_code=500, detail=result.message) + # Set doc_id to empty string since this is a relation operation, not document + result.doc_id = "" + return result + except HTTPException: + raise + except Exception as e: + error_msg = f"Error deleting relation from '{request.source_entity}' to '{request.target_entity}': {str(e)}" + logger.error(error_msg) + logger.error(traceback.format_exc()) + raise HTTPException(status_code=500, detail=error_msg) + return router diff --git a/lightrag/base.py b/lightrag/base.py index c6035f70..84fc7564 100644 --- a/lightrag/base.py +++ b/lightrag/base.py @@ -278,6 +278,21 @@ class BaseKVStorage(StorageNameSpace, ABC): False: if the cache drop failed, or the cache mode is not supported """ + # async def drop_cache_by_chunk_ids(self, chunk_ids: list[str] | None = None) -> bool: + # """Delete specific cache records from storage by chunk IDs + + # Importance notes for in-memory storage: + # 1. Changes will be persisted to disk during the next index_done_callback + # 2. 
update flags to notify other processes that data persistence is needed + + # Args: + # chunk_ids (list[str]): List of chunk IDs to be dropped from storage + + # Returns: + # True: if the cache drop successfully + # False: if the cache drop failed, or the operation is not supported + # """ + @dataclass class BaseGraphStorage(StorageNameSpace, ABC): @@ -598,3 +613,13 @@ class StoragesStatus(str, Enum): CREATED = "created" INITIALIZED = "initialized" FINALIZED = "finalized" + + +@dataclass +class DeletionResult: + """Represents the result of a deletion operation.""" + + status: Literal["success", "not_found", "fail"] + doc_id: str + message: str + status_code: int = 200 diff --git a/lightrag/kg/json_kv_impl.py b/lightrag/kg/json_kv_impl.py index 2d44ce00..fa819d4a 100644 --- a/lightrag/kg/json_kv_impl.py +++ b/lightrag/kg/json_kv_impl.py @@ -172,6 +172,53 @@ class JsonKVStorage(BaseKVStorage): except Exception: return False + # async def drop_cache_by_chunk_ids(self, chunk_ids: list[str] | None = None) -> bool: + # """Delete specific cache records from storage by chunk IDs + + # Importance notes for in-memory storage: + # 1. Changes will be persisted to disk during the next index_done_callback + # 2. update flags to notify other processes that data persistence is needed + + # Args: + # chunk_ids (list[str]): List of chunk IDs to be dropped from storage + + # Returns: + # True: if the cache drop successfully + # False: if the cache drop failed + # """ + # if not chunk_ids: + # return False + + # try: + # async with self._storage_lock: + # # Iterate through all cache modes to find entries with matching chunk_ids + # for mode_key, mode_data in list(self._data.items()): + # if isinstance(mode_data, dict): + # # Check each cached entry in this mode + # for cache_key, cache_entry in list(mode_data.items()): + # if ( + # isinstance(cache_entry, dict) + # and cache_entry.get("chunk_id") in chunk_ids + # ): + # # Remove this cache entry + # del mode_data[cache_key] + # logger.debug( + # f"Removed cache entry {cache_key} for chunk {cache_entry.get('chunk_id')}" + # ) + + # # If the mode is now empty, remove it entirely + # if not mode_data: + # del self._data[mode_key] + + # # Set update flags to notify persistence is needed + # await set_all_update_flags(self.namespace) + + # logger.info(f"Cleared cache for {len(chunk_ids)} chunk IDs") + # return True + # except Exception as e: + # logger.error(f"Error clearing cache by chunk IDs: {e}") + # return False + async def drop(self) -> dict[str, str]: """Drop all data from storage and clean up resources This action will persistent the data to disk immediately. 
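The new routes registered in `document_routes.py` earlier in this diff (`/delete_entity`, `/delete_relation`, and `/delete_document`, the last of which the diff still flags as "please do not use") accept a JSON body on an HTTP DELETE request and return the `DeletionResult` fields defined in `base.py`. A minimal client sketch with `requests`; the base URL, port, and `/documents` router prefix are assumptions about a typical deployment, so adjust them (and any API-key header your server requires) to your setup:

```python
import requests  # assumed to be available in the client environment

BASE = "http://localhost:9621/documents"  # assumed host, port, and router prefix

# Delete a single entity and all of its relationships
resp = requests.delete(f"{BASE}/delete_entity", json={"entity_name": "Google"})
print(resp.status_code, resp.json())  # DeletionResult fields: status, doc_id, message, status_code

# Delete the relationship between two entities
resp = requests.delete(
    f"{BASE}/delete_relation",
    json={"source_entity": "Google", "target_entity": "Gmail"},
)
print(resp.status_code, resp.json())
```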
diff --git a/lightrag/kg/neo4j_impl.py b/lightrag/kg/neo4j_impl.py index 3c57ae34..7fe3da15 100644 --- a/lightrag/kg/neo4j_impl.py +++ b/lightrag/kg/neo4j_impl.py @@ -1,4 +1,3 @@ -import inspect import os import re from dataclasses import dataclass @@ -307,7 +306,7 @@ class Neo4JStorage(BaseGraphStorage): for label in node_dict["labels"] if label != "base" ] - logger.debug(f"Neo4j query node {query} return: {node_dict}") + # logger.debug(f"Neo4j query node {query} return: {node_dict}") return node_dict return None finally: @@ -382,9 +381,9 @@ class Neo4JStorage(BaseGraphStorage): return 0 degree = record["degree"] - logger.debug( - f"Neo4j query node degree for {node_id} return: {degree}" - ) + # logger.debug( + # f"Neo4j query node degree for {node_id} return: {degree}" + # ) return degree finally: await result.consume() # Ensure result is fully consumed @@ -424,7 +423,7 @@ class Neo4JStorage(BaseGraphStorage): logger.warning(f"No node found with label '{nid}'") degrees[nid] = 0 - logger.debug(f"Neo4j batch node degree query returned: {degrees}") + # logger.debug(f"Neo4j batch node degree query returned: {degrees}") return degrees async def edge_degree(self, src_id: str, tgt_id: str) -> int: @@ -512,7 +511,7 @@ class Neo4JStorage(BaseGraphStorage): if records: try: edge_result = dict(records[0]["edge_properties"]) - logger.debug(f"Result: {edge_result}") + # logger.debug(f"Result: {edge_result}") # Ensure required keys exist with defaults required_keys = { "weight": 0.0, @@ -528,9 +527,9 @@ class Neo4JStorage(BaseGraphStorage): f"missing {key}, using default: {default_value}" ) - logger.debug( - f"{inspect.currentframe().f_code.co_name}:query:{query}:result:{edge_result}" - ) + # logger.debug( + # f"{inspect.currentframe().f_code.co_name}:query:{query}:result:{edge_result}" + # ) return edge_result except (KeyError, TypeError, ValueError) as e: logger.error( @@ -545,9 +544,9 @@ class Neo4JStorage(BaseGraphStorage): "keywords": None, } - logger.debug( - f"{inspect.currentframe().f_code.co_name}: No edge found between {source_node_id} and {target_node_id}" - ) + # logger.debug( + # f"{inspect.currentframe().f_code.co_name}: No edge found between {source_node_id} and {target_node_id}" + # ) # Return None when no edge found return None finally: @@ -766,9 +765,6 @@ class Neo4JStorage(BaseGraphStorage): result = await tx.run( query, entity_id=node_id, properties=properties ) - logger.debug( - f"Upserted node with entity_id '{node_id}' and properties: {properties}" - ) await result.consume() # Ensure result is fully consumed await session.execute_write(execute_upsert) @@ -824,12 +820,7 @@ class Neo4JStorage(BaseGraphStorage): properties=edge_properties, ) try: - records = await result.fetch(2) - if records: - logger.debug( - f"Upserted edge from '{source_node_id}' to '{target_node_id}'" - f"with properties: {edge_properties}" - ) + await result.fetch(2) finally: await result.consume() # Ensure result is consumed diff --git a/lightrag/kg/postgres_impl.py b/lightrag/kg/postgres_impl.py index 465643f0..bacd8894 100644 --- a/lightrag/kg/postgres_impl.py +++ b/lightrag/kg/postgres_impl.py @@ -106,6 +106,35 @@ class PostgreSQLDB: ): pass + async def _migrate_llm_cache_add_chunk_id(self): + """Add chunk_id column to LIGHTRAG_LLM_CACHE table if it doesn't exist""" + try: + # Check if chunk_id column exists + check_column_sql = """ + SELECT column_name + FROM information_schema.columns + WHERE table_name = 'lightrag_llm_cache' + AND column_name = 'chunk_id' + """ + + column_info = await 
self.query(check_column_sql) + if not column_info: + logger.info("Adding chunk_id column to LIGHTRAG_LLM_CACHE table") + add_column_sql = """ + ALTER TABLE LIGHTRAG_LLM_CACHE + ADD COLUMN chunk_id VARCHAR(255) NULL + """ + await self.execute(add_column_sql) + logger.info( + "Successfully added chunk_id column to LIGHTRAG_LLM_CACHE table" + ) + else: + logger.info( + "chunk_id column already exists in LIGHTRAG_LLM_CACHE table" + ) + except Exception as e: + logger.warning(f"Failed to add chunk_id column to LIGHTRAG_LLM_CACHE: {e}") + async def _migrate_timestamp_columns(self): """Migrate timestamp columns in tables to timezone-aware types, assuming original data is in UTC time""" # Tables and columns that need migration @@ -203,6 +232,13 @@ class PostgreSQLDB: logger.error(f"PostgreSQL, Failed to migrate timestamp columns: {e}") # Don't throw an exception, allow the initialization process to continue + # Migrate LLM cache table to add chunk_id field if needed + try: + await self._migrate_llm_cache_add_chunk_id() + except Exception as e: + logger.error(f"PostgreSQL, Failed to migrate LLM cache chunk_id field: {e}") + # Don't throw an exception, allow the initialization process to continue + async def query( self, sql: str, @@ -253,25 +289,31 @@ class PostgreSQLDB: sql: str, data: dict[str, Any] | None = None, upsert: bool = False, + ignore_if_exists: bool = False, with_age: bool = False, graph_name: str | None = None, ): try: async with self.pool.acquire() as connection: # type: ignore if with_age and graph_name: - await self.configure_age(connection, graph_name) # type: ignore + await self.configure_age(connection, graph_name) elif with_age and not graph_name: raise ValueError("Graph name is required when with_age is True") if data is None: - await connection.execute(sql) # type: ignore + await connection.execute(sql) else: - await connection.execute(sql, *data.values()) # type: ignore + await connection.execute(sql, *data.values()) except ( asyncpg.exceptions.UniqueViolationError, asyncpg.exceptions.DuplicateTableError, + asyncpg.exceptions.DuplicateObjectError, # Catch "already exists" error + asyncpg.exceptions.InvalidSchemaNameError, # Also catch for AGE extension "already exists" ) as e: - if upsert: + if ignore_if_exists: + # If the flag is set, just ignore these specific errors + pass + elif upsert: print("Key value duplicate, but upsert succeeded.") else: logger.error(f"Upsert error: {e}") @@ -497,6 +539,7 @@ class PGKVStorage(BaseKVStorage): "original_prompt": v["original_prompt"], "return_value": v["return"], "mode": mode, + "chunk_id": v.get("chunk_id"), } await self.db.execute(upsert_sql, _data) @@ -1175,16 +1218,15 @@ class PGGraphStorage(BaseGraphStorage): ] for query in queries: - try: - await self.db.execute( - query, - upsert=True, - with_age=True, - graph_name=self.graph_name, - ) - # logger.info(f"Successfully executed: {query}") - except Exception: - continue + # Use the new flag to silently ignore "already exists" errors + # at the source, preventing log spam. 
+ await self.db.execute( + query, + upsert=True, + ignore_if_exists=True, # Pass the new flag + with_age=True, + graph_name=self.graph_name, + ) async def finalize(self): if self.db is not None: @@ -2357,6 +2399,7 @@ TABLES = { mode varchar(32) NOT NULL, original_prompt TEXT, return_value TEXT, + chunk_id VARCHAR(255) NULL, create_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP, update_time TIMESTAMP, CONSTRAINT LIGHTRAG_LLM_CACHE_PK PRIMARY KEY (workspace, mode, id) @@ -2389,10 +2432,10 @@ SQL_TEMPLATES = { chunk_order_index, full_doc_id, file_path FROM LIGHTRAG_DOC_CHUNKS WHERE workspace=$1 AND id=$2 """, - "get_by_id_llm_response_cache": """SELECT id, original_prompt, COALESCE(return_value, '') as "return", mode + "get_by_id_llm_response_cache": """SELECT id, original_prompt, COALESCE(return_value, '') as "return", mode, chunk_id FROM LIGHTRAG_LLM_CACHE WHERE workspace=$1 AND mode=$2 """, - "get_by_mode_id_llm_response_cache": """SELECT id, original_prompt, COALESCE(return_value, '') as "return", mode + "get_by_mode_id_llm_response_cache": """SELECT id, original_prompt, COALESCE(return_value, '') as "return", mode, chunk_id FROM LIGHTRAG_LLM_CACHE WHERE workspace=$1 AND mode=$2 AND id=$3 """, "get_by_ids_full_docs": """SELECT id, COALESCE(content, '') as content @@ -2402,7 +2445,7 @@ SQL_TEMPLATES = { chunk_order_index, full_doc_id, file_path FROM LIGHTRAG_DOC_CHUNKS WHERE workspace=$1 AND id IN ({ids}) """, - "get_by_ids_llm_response_cache": """SELECT id, original_prompt, COALESCE(return_value, '') as "return", mode + "get_by_ids_llm_response_cache": """SELECT id, original_prompt, COALESCE(return_value, '') as "return", mode, chunk_id FROM LIGHTRAG_LLM_CACHE WHERE workspace=$1 AND mode= IN ({ids}) """, "filter_keys": "SELECT id FROM {table_name} WHERE workspace=$1 AND id IN ({ids})", @@ -2411,12 +2454,13 @@ SQL_TEMPLATES = { ON CONFLICT (workspace,id) DO UPDATE SET content = $2, update_time = CURRENT_TIMESTAMP """, - "upsert_llm_response_cache": """INSERT INTO LIGHTRAG_LLM_CACHE(workspace,id,original_prompt,return_value,mode) - VALUES ($1, $2, $3, $4, $5) + "upsert_llm_response_cache": """INSERT INTO LIGHTRAG_LLM_CACHE(workspace,id,original_prompt,return_value,mode,chunk_id) + VALUES ($1, $2, $3, $4, $5, $6) ON CONFLICT (workspace,mode,id) DO UPDATE SET original_prompt = EXCLUDED.original_prompt, return_value=EXCLUDED.return_value, mode=EXCLUDED.mode, + chunk_id=EXCLUDED.chunk_id, update_time = CURRENT_TIMESTAMP """, "upsert_chunk": """INSERT INTO LIGHTRAG_DOC_CHUNKS (workspace, id, tokens, diff --git a/lightrag/lightrag.py b/lightrag/lightrag.py index 6979680c..f631992d 100644 --- a/lightrag/lightrag.py +++ b/lightrag/lightrag.py @@ -35,6 +35,7 @@ from lightrag.kg import ( from lightrag.kg.shared_storage import ( get_namespace_data, get_pipeline_status_lock, + get_graph_db_lock, ) from .base import ( @@ -47,6 +48,7 @@ from .base import ( QueryParam, StorageNameSpace, StoragesStatus, + DeletionResult, ) from .namespace import NameSpace, make_namespace from .operate import ( @@ -56,6 +58,7 @@ from .operate import ( kg_query, naive_query, query_with_keywords, + _rebuild_knowledge_from_chunks, ) from .prompt import GRAPH_FIELD_SEP from .utils import ( @@ -1207,6 +1210,7 @@ class LightRAG: cast(StorageNameSpace, storage_inst).index_done_callback() for storage_inst in [ # type: ignore self.full_docs, + self.doc_status, self.text_chunks, self.llm_response_cache, self.entities_vdb, @@ -1674,24 +1678,45 @@ class LightRAG: # Return the dictionary containing statuses only for the found document IDs 
return found_statuses - # TODO: Deprecated (Deleting documents can cause hallucinations in RAG.) - # Document delete is not working properly for most of the storage implementations. - async def adelete_by_doc_id(self, doc_id: str) -> None: - """Delete a document and all its related data + async def adelete_by_doc_id(self, doc_id: str) -> DeletionResult: + """Delete a document and all its related data, including chunks, graph elements, and cached entries. + + This method orchestrates a comprehensive deletion process for a given document ID. + It ensures that not only the document itself but also all its derived and associated + data across different storage layers are removed. This includes: + 1. **Document and Status**: Deletes the document from `full_docs` and its status from `doc_status`. + 2. **Chunks**: Removes all associated text chunks from `chunks_vdb`. + 3. **Graph Data**: + - Deletes related entities from `entities_vdb`. + - Deletes related relationships from `relationships_vdb`. + - Removes corresponding nodes and edges from the `chunk_entity_relation_graph`. + 4. **Graph Reconstruction**: If entities or relationships are partially affected, it triggers + a reconstruction of their data from the remaining chunks to ensure consistency. Args: - doc_id: Document ID to delete + doc_id (str): The unique identifier of the document to be deleted. + + Returns: + DeletionResult: An object containing the outcome of the deletion process. + - `status` (str): "success", "not_found", or "failure". + - `doc_id` (str): The ID of the document attempted to be deleted. + - `message` (str): A summary of the operation's result. + - `status_code` (int): HTTP status code (e.g., 200, 404, 500). """ try: # 1. Get the document status and related data if not await self.doc_status.get_by_id(doc_id): logger.warning(f"Document {doc_id} not found") - return + return DeletionResult( + status="not_found", + doc_id=doc_id, + message=f"Document {doc_id} not found.", + status_code=404, + ) - logger.debug(f"Starting deletion for document {doc_id}") + logger.info(f"Starting optimized deletion for document {doc_id}") # 2. Get all chunks related to this document - # Find all chunks where full_doc_id equals the current doc_id all_chunks = await self.text_chunks.get_all() related_chunks = { chunk_id: chunk_data @@ -1702,241 +1727,197 @@ class LightRAG: if not related_chunks: logger.warning(f"No chunks found for document {doc_id}") - return + # Still need to delete the doc status and full doc + await self.full_docs.delete([doc_id]) + await self.doc_status.delete([doc_id]) + return DeletionResult( + status="success", + doc_id=doc_id, + message=f"Document {doc_id} found but had no associated chunks. Document entry deleted.", + status_code=200, + ) - # Get all related chunk IDs chunk_ids = set(related_chunks.keys()) - logger.debug(f"Found {len(chunk_ids)} chunks to delete") + logger.info(f"Found {len(chunk_ids)} chunks to delete") - # TODO: self.entities_vdb.client_storage only works for local storage, need to fix this + # # 3. **OPTIMIZATION 1**: Clear LLM cache for related chunks + # logger.info("Clearing LLM cache for related chunks...") + # cache_cleared = await self.llm_response_cache.drop_cache_by_chunk_ids( + # list(chunk_ids) + # ) + # if cache_cleared: + # logger.info(f"Successfully cleared cache for {len(chunk_ids)} chunks") + # else: + # logger.warning( + # "Failed to clear chunk cache or cache clearing not supported" + # ) - # 3. 
Before deleting, check the related entities and relationships for these chunks - for chunk_id in chunk_ids: - # Check entities - entities_storage = await self.entities_vdb.client_storage - entities = [ - dp - for dp in entities_storage["data"] - if chunk_id in dp.get("source_id") - ] - logger.debug(f"Chunk {chunk_id} has {len(entities)} related entities") - - # Check relationships - relationships_storage = await self.relationships_vdb.client_storage - relations = [ - dp - for dp in relationships_storage["data"] - if chunk_id in dp.get("source_id") - ] - logger.debug(f"Chunk {chunk_id} has {len(relations)} related relations") - - # Continue with the original deletion process... - - # 4. Delete chunks from vector database - if chunk_ids: - await self.chunks_vdb.delete(chunk_ids) - await self.text_chunks.delete(chunk_ids) - - # 5. Find and process entities and relationships that have these chunks as source - # Get all nodes and edges from the graph storage using storage-agnostic methods + # 4. Analyze entities and relationships that will be affected entities_to_delete = set() - entities_to_update = {} # entity_name -> new_source_id + entities_to_rebuild = {} # entity_name -> remaining_chunk_ids relationships_to_delete = set() - relationships_to_update = {} # (src, tgt) -> new_source_id + relationships_to_rebuild = {} # (src, tgt) -> remaining_chunk_ids - # Process entities - use storage-agnostic methods - all_labels = await self.chunk_entity_relation_graph.get_all_labels() - for node_label in all_labels: - node_data = await self.chunk_entity_relation_graph.get_node(node_label) - if node_data and "source_id" in node_data: - # Split source_id using GRAPH_FIELD_SEP - sources = set(node_data["source_id"].split(GRAPH_FIELD_SEP)) - sources.difference_update(chunk_ids) - if not sources: - entities_to_delete.add(node_label) - logger.debug( - f"Entity {node_label} marked for deletion - no remaining sources" - ) - else: - new_source_id = GRAPH_FIELD_SEP.join(sources) - entities_to_update[node_label] = new_source_id - logger.debug( - f"Entity {node_label} will be updated with new source_id: {new_source_id}" - ) + # Use graph database lock to ensure atomic merges and updates + graph_db_lock = get_graph_db_lock(enable_logging=False) + async with graph_db_lock: + # Process entities + # TODO There is performance when iterating get_all_labels for PostgresSQL + all_labels = await self.chunk_entity_relation_graph.get_all_labels() + for node_label in all_labels: + node_data = await self.chunk_entity_relation_graph.get_node( + node_label + ) + if node_data and "source_id" in node_data: + # Split source_id using GRAPH_FIELD_SEP + sources = set(node_data["source_id"].split(GRAPH_FIELD_SEP)) + remaining_sources = sources - chunk_ids - # Process relationships - for node_label in all_labels: - node_edges = await self.chunk_entity_relation_graph.get_node_edges( - node_label - ) - if node_edges: - for src, tgt in node_edges: - edge_data = await self.chunk_entity_relation_graph.get_edge( - src, tgt - ) - if edge_data and "source_id" in edge_data: - # Split source_id using GRAPH_FIELD_SEP - sources = set(edge_data["source_id"].split(GRAPH_FIELD_SEP)) - sources.difference_update(chunk_ids) - if not sources: - relationships_to_delete.add((src, tgt)) - logger.debug( - f"Relationship {src}-{tgt} marked for deletion - no remaining sources" - ) - else: - new_source_id = GRAPH_FIELD_SEP.join(sources) - relationships_to_update[(src, tgt)] = new_source_id - logger.debug( - f"Relationship {src}-{tgt} will be updated with new 
source_id: {new_source_id}" + if not remaining_sources: + entities_to_delete.add(node_label) + logger.debug( + f"Entity {node_label} marked for deletion - no remaining sources" + ) + elif remaining_sources != sources: + # Entity needs to be rebuilt from remaining chunks + entities_to_rebuild[node_label] = remaining_sources + logger.debug( + f"Entity {node_label} will be rebuilt from {len(remaining_sources)} remaining chunks" + ) + + # Process relationships + # TODO There is performance when iterating get_all_labels for PostgresSQL + for node_label in all_labels: + node_edges = await self.chunk_entity_relation_graph.get_node_edges( + node_label + ) + if node_edges: + for src, tgt in node_edges: + # To avoid processing the same edge twice in an undirected graph + if (tgt, src) in relationships_to_delete or ( + tgt, + src, + ) in relationships_to_rebuild: + continue + + edge_data = await self.chunk_entity_relation_graph.get_edge( + src, tgt + ) + if edge_data and "source_id" in edge_data: + # Split source_id using GRAPH_FIELD_SEP + sources = set( + edge_data["source_id"].split(GRAPH_FIELD_SEP) ) + remaining_sources = sources - chunk_ids - # Delete entities - if entities_to_delete: - for entity in entities_to_delete: - await self.entities_vdb.delete_entity(entity) - logger.debug(f"Deleted entity {entity} from vector DB") - await self.chunk_entity_relation_graph.remove_nodes( - list(entities_to_delete) - ) - logger.debug(f"Deleted {len(entities_to_delete)} entities from graph") + if not remaining_sources: + relationships_to_delete.add((src, tgt)) + logger.debug( + f"Relationship {src}-{tgt} marked for deletion - no remaining sources" + ) + elif remaining_sources != sources: + # Relationship needs to be rebuilt from remaining chunks + relationships_to_rebuild[(src, tgt)] = ( + remaining_sources + ) + logger.debug( + f"Relationship {src}-{tgt} will be rebuilt from {len(remaining_sources)} remaining chunks" + ) - # Update entities - for entity, new_source_id in entities_to_update.items(): - node_data = await self.chunk_entity_relation_graph.get_node(entity) - if node_data: - node_data["source_id"] = new_source_id - await self.chunk_entity_relation_graph.upsert_node( - entity, node_data + # 5. Delete chunks from storage + if chunk_ids: + await self.chunks_vdb.delete(chunk_ids) + await self.text_chunks.delete(chunk_ids) + logger.info(f"Deleted {len(chunk_ids)} chunks from storage") + + # 6. Delete entities that have no remaining sources + if entities_to_delete: + # Delete from vector database + entity_vdb_ids = [ + compute_mdhash_id(entity, prefix="ent-") + for entity in entities_to_delete + ] + await self.entities_vdb.delete(entity_vdb_ids) + + # Delete from graph + await self.chunk_entity_relation_graph.remove_nodes( + list(entities_to_delete) ) - logger.debug( - f"Updated entity {entity} with new source_id: {new_source_id}" + logger.info(f"Deleted {len(entities_to_delete)} entities") + + # 7. Delete relationships that have no remaining sources + if relationships_to_delete: + # Delete from vector database + rel_ids_to_delete = [] + for src, tgt in relationships_to_delete: + rel_ids_to_delete.extend( + [ + compute_mdhash_id(src + tgt, prefix="rel-"), + compute_mdhash_id(tgt + src, prefix="rel-"), + ] + ) + await self.relationships_vdb.delete(rel_ids_to_delete) + + # Delete from graph + await self.chunk_entity_relation_graph.remove_edges( + list(relationships_to_delete) + ) + logger.info(f"Deleted {len(relationships_to_delete)} relationships") + + # 8. 
**OPTIMIZATION 2**: Rebuild entities and relationships from remaining chunks + if entities_to_rebuild or relationships_to_rebuild: + logger.info( + f"Rebuilding {len(entities_to_rebuild)} entities and {len(relationships_to_rebuild)} relationships..." + ) + await _rebuild_knowledge_from_chunks( + entities_to_rebuild=entities_to_rebuild, + relationships_to_rebuild=relationships_to_rebuild, + knowledge_graph_inst=self.chunk_entity_relation_graph, + entities_vdb=self.entities_vdb, + relationships_vdb=self.relationships_vdb, + text_chunks=self.text_chunks, + llm_response_cache=self.llm_response_cache, + global_config=asdict(self), ) - # Delete relationships - if relationships_to_delete: - for src, tgt in relationships_to_delete: - rel_id_0 = compute_mdhash_id(src + tgt, prefix="rel-") - rel_id_1 = compute_mdhash_id(tgt + src, prefix="rel-") - await self.relationships_vdb.delete([rel_id_0, rel_id_1]) - logger.debug(f"Deleted relationship {src}-{tgt} from vector DB") - await self.chunk_entity_relation_graph.remove_edges( - list(relationships_to_delete) - ) - logger.debug( - f"Deleted {len(relationships_to_delete)} relationships from graph" - ) - - # Update relationships - for (src, tgt), new_source_id in relationships_to_update.items(): - edge_data = await self.chunk_entity_relation_graph.get_edge(src, tgt) - if edge_data: - edge_data["source_id"] = new_source_id - await self.chunk_entity_relation_graph.upsert_edge( - src, tgt, edge_data - ) - logger.debug( - f"Updated relationship {src}-{tgt} with new source_id: {new_source_id}" - ) - - # 6. Delete original document and status + # 9. Delete original document and status await self.full_docs.delete([doc_id]) await self.doc_status.delete([doc_id]) - # 7. Ensure all indexes are updated + # 10. Ensure all indexes are updated await self._insert_done() - logger.info( - f"Successfully deleted document {doc_id} and related data. " - f"Deleted {len(entities_to_delete)} entities and {len(relationships_to_delete)} relationships. " - f"Updated {len(entities_to_update)} entities and {len(relationships_to_update)} relationships." + success_message = f"""Successfully deleted document {doc_id}. +Deleted: {len(entities_to_delete)} entities, {len(relationships_to_delete)} relationships. 
+Rebuilt: {len(entities_to_rebuild)} entities, {len(relationships_to_rebuild)} relationships.""" + + logger.info(success_message) + return DeletionResult( + status="success", + doc_id=doc_id, + message=success_message, + status_code=200, ) - async def process_data(data_type, vdb, chunk_id): - # Check data (entities or relationships) - storage = await vdb.client_storage - data_with_chunk = [ - dp - for dp in storage["data"] - if chunk_id in (dp.get("source_id") or "").split(GRAPH_FIELD_SEP) - ] - - data_for_vdb = {} - if data_with_chunk: - logger.warning( - f"found {len(data_with_chunk)} {data_type} still referencing chunk {chunk_id}" - ) - - for item in data_with_chunk: - old_sources = item["source_id"].split(GRAPH_FIELD_SEP) - new_sources = [src for src in old_sources if src != chunk_id] - - if not new_sources: - logger.info( - f"{data_type} {item.get('entity_name', 'N/A')} is deleted because source_id is not exists" - ) - await vdb.delete_entity(item) - else: - item["source_id"] = GRAPH_FIELD_SEP.join(new_sources) - item_id = item["__id__"] - data_for_vdb[item_id] = item.copy() - if data_type == "entities": - data_for_vdb[item_id]["content"] = data_for_vdb[ - item_id - ].get("content") or ( - item.get("entity_name", "") - + (item.get("description") or "") - ) - else: # relationships - data_for_vdb[item_id]["content"] = data_for_vdb[ - item_id - ].get("content") or ( - (item.get("keywords") or "") - + (item.get("src_id") or "") - + (item.get("tgt_id") or "") - + (item.get("description") or "") - ) - - if data_for_vdb: - await vdb.upsert(data_for_vdb) - logger.info(f"Successfully updated {data_type} in vector DB") - - # Add verification step - async def verify_deletion(): - # Verify if the document has been deleted - if await self.full_docs.get_by_id(doc_id): - logger.warning(f"Document {doc_id} still exists in full_docs") - - # Verify if chunks have been deleted - all_remaining_chunks = await self.text_chunks.get_all() - remaining_related_chunks = { - chunk_id: chunk_data - for chunk_id, chunk_data in all_remaining_chunks.items() - if isinstance(chunk_data, dict) - and chunk_data.get("full_doc_id") == doc_id - } - - if remaining_related_chunks: - logger.warning( - f"Found {len(remaining_related_chunks)} remaining chunks" - ) - - # Verify entities and relationships - for chunk_id in chunk_ids: - await process_data("entities", self.entities_vdb, chunk_id) - await process_data( - "relationships", self.relationships_vdb, chunk_id - ) - - await verify_deletion() - except Exception as e: - logger.error(f"Error while deleting document {doc_id}: {e}") + error_message = f"Error while deleting document {doc_id}: {e}" + logger.error(error_message) + logger.error(traceback.format_exc()) + return DeletionResult( + status="fail", + doc_id=doc_id, + message=error_message, + status_code=500, + ) - async def adelete_by_entity(self, entity_name: str) -> None: + async def adelete_by_entity(self, entity_name: str) -> DeletionResult: """Asynchronously delete an entity and all its relationships. Args: - entity_name: Name of the entity to delete + entity_name: Name of the entity to delete. + + Returns: + DeletionResult: An object containing the outcome of the deletion process. """ from .utils_graph import adelete_by_entity @@ -1947,16 +1928,29 @@ class LightRAG: entity_name, ) - def delete_by_entity(self, entity_name: str) -> None: + def delete_by_entity(self, entity_name: str) -> DeletionResult: + """Synchronously delete an entity and all its relationships. 
+ + Args: + entity_name: Name of the entity to delete. + + Returns: + DeletionResult: An object containing the outcome of the deletion process. + """ loop = always_get_an_event_loop() return loop.run_until_complete(self.adelete_by_entity(entity_name)) - async def adelete_by_relation(self, source_entity: str, target_entity: str) -> None: + async def adelete_by_relation( + self, source_entity: str, target_entity: str + ) -> DeletionResult: """Asynchronously delete a relation between two entities. Args: - source_entity: Name of the source entity - target_entity: Name of the target entity + source_entity: Name of the source entity. + target_entity: Name of the target entity. + + Returns: + DeletionResult: An object containing the outcome of the deletion process. """ from .utils_graph import adelete_by_relation @@ -1967,7 +1961,18 @@ class LightRAG: target_entity, ) - def delete_by_relation(self, source_entity: str, target_entity: str) -> None: + def delete_by_relation( + self, source_entity: str, target_entity: str + ) -> DeletionResult: + """Synchronously delete a relation between two entities. + + Args: + source_entity: Name of the source entity. + target_entity: Name of the target entity. + + Returns: + DeletionResult: An object containing the outcome of the deletion process. + """ loop = always_get_an_event_loop() return loop.run_until_complete( self.adelete_by_relation(source_entity, target_entity) diff --git a/lightrag/operate.py b/lightrag/operate.py index e1295091..b19f739c 100644 --- a/lightrag/operate.py +++ b/lightrag/operate.py @@ -240,6 +240,466 @@ async def _handle_single_relationship_extraction( ) +async def _rebuild_knowledge_from_chunks( + entities_to_rebuild: dict[str, set[str]], + relationships_to_rebuild: dict[tuple[str, str], set[str]], + knowledge_graph_inst: BaseGraphStorage, + entities_vdb: BaseVectorStorage, + relationships_vdb: BaseVectorStorage, + text_chunks: BaseKVStorage, + llm_response_cache: BaseKVStorage, + global_config: dict[str, str], +) -> None: + """Rebuild entity and relationship descriptions from cached extraction results + + This method uses cached LLM extraction results instead of calling LLM again, + following the same approach as the insert process. 
+ + Args: + entities_to_rebuild: Dict mapping entity_name -> set of remaining chunk_ids + relationships_to_rebuild: Dict mapping (src, tgt) -> set of remaining chunk_ids + """ + if not entities_to_rebuild and not relationships_to_rebuild: + return + + # Get all referenced chunk IDs + all_referenced_chunk_ids = set() + for chunk_ids in entities_to_rebuild.values(): + all_referenced_chunk_ids.update(chunk_ids) + for chunk_ids in relationships_to_rebuild.values(): + all_referenced_chunk_ids.update(chunk_ids) + + logger.info( + f"Rebuilding knowledge from {len(all_referenced_chunk_ids)} cached chunk extractions" + ) + + # Get cached extraction results for these chunks + cached_results = await _get_cached_extraction_results( + llm_response_cache, all_referenced_chunk_ids + ) + + if not cached_results: + logger.warning("No cached extraction results found, cannot rebuild") + return + + # Process cached results to get entities and relationships for each chunk + chunk_entities = {} # chunk_id -> {entity_name: [entity_data]} + chunk_relationships = {} # chunk_id -> {(src, tgt): [relationship_data]} + + for chunk_id, extraction_result in cached_results.items(): + try: + entities, relationships = await _parse_extraction_result( + text_chunks=text_chunks, + extraction_result=extraction_result, + chunk_id=chunk_id, + ) + chunk_entities[chunk_id] = entities + chunk_relationships[chunk_id] = relationships + except Exception as e: + logger.error( + f"Failed to parse cached extraction result for chunk {chunk_id}: {e}" + ) + continue + + # Rebuild entities + for entity_name, chunk_ids in entities_to_rebuild.items(): + try: + await _rebuild_single_entity( + knowledge_graph_inst=knowledge_graph_inst, + entities_vdb=entities_vdb, + entity_name=entity_name, + chunk_ids=chunk_ids, + chunk_entities=chunk_entities, + llm_response_cache=llm_response_cache, + global_config=global_config, + ) + logger.debug( + f"Rebuilt entity {entity_name} from {len(chunk_ids)} cached extractions" + ) + except Exception as e: + logger.error(f"Failed to rebuild entity {entity_name}: {e}") + + # Rebuild relationships + for (src, tgt), chunk_ids in relationships_to_rebuild.items(): + try: + await _rebuild_single_relationship( + knowledge_graph_inst=knowledge_graph_inst, + relationships_vdb=relationships_vdb, + src=src, + tgt=tgt, + chunk_ids=chunk_ids, + chunk_relationships=chunk_relationships, + llm_response_cache=llm_response_cache, + global_config=global_config, + ) + logger.debug( + f"Rebuilt relationship {src}-{tgt} from {len(chunk_ids)} cached extractions" + ) + except Exception as e: + logger.error(f"Failed to rebuild relationship {src}-{tgt}: {e}") + + logger.info("Completed rebuilding knowledge from cached extractions") + + +async def _get_cached_extraction_results( + llm_response_cache: BaseKVStorage, chunk_ids: set[str] +) -> dict[str, str]: + """Get cached extraction results for specific chunk IDs + + Args: + chunk_ids: Set of chunk IDs to get cached results for + + Returns: + Dict mapping chunk_id -> extraction_result_text + """ + cached_results = {} + + # Get all cached data for "default" mode (entity extraction cache) + default_cache = await llm_response_cache.get_by_id("default") or {} + + for cache_key, cache_entry in default_cache.items(): + if ( + isinstance(cache_entry, dict) + and cache_entry.get("cache_type") == "extract" + and cache_entry.get("chunk_id") in chunk_ids + ): + chunk_id = cache_entry["chunk_id"] + extraction_result = cache_entry["return"] + cached_results[chunk_id] = extraction_result + + logger.info( 
+ f"Found {len(cached_results)} cached extraction results for {len(chunk_ids)} chunk IDs" + ) + return cached_results + + +async def _parse_extraction_result( + text_chunks: BaseKVStorage, extraction_result: str, chunk_id: str +) -> tuple[dict, dict]: + """Parse cached extraction result using the same logic as extract_entities + + Args: + extraction_result: The cached LLM extraction result + chunk_id: The chunk ID for source tracking + + Returns: + Tuple of (entities_dict, relationships_dict) + """ + + # Get chunk data for file_path + chunk_data = await text_chunks.get_by_id(chunk_id) + file_path = ( + chunk_data.get("file_path", "unknown_source") + if chunk_data + else "unknown_source" + ) + context_base = dict( + tuple_delimiter=PROMPTS["DEFAULT_TUPLE_DELIMITER"], + record_delimiter=PROMPTS["DEFAULT_RECORD_DELIMITER"], + completion_delimiter=PROMPTS["DEFAULT_COMPLETION_DELIMITER"], + ) + maybe_nodes = defaultdict(list) + maybe_edges = defaultdict(list) + + # Parse the extraction result using the same logic as in extract_entities + records = split_string_by_multi_markers( + extraction_result, + [context_base["record_delimiter"], context_base["completion_delimiter"]], + ) + for record in records: + record = re.search(r"\((.*)\)", record) + if record is None: + continue + record = record.group(1) + record_attributes = split_string_by_multi_markers( + record, [context_base["tuple_delimiter"]] + ) + + # Try to parse as entity + entity_data = await _handle_single_entity_extraction( + record_attributes, chunk_id, file_path + ) + if entity_data is not None: + maybe_nodes[entity_data["entity_name"]].append(entity_data) + continue + + # Try to parse as relationship + relationship_data = await _handle_single_relationship_extraction( + record_attributes, chunk_id, file_path + ) + if relationship_data is not None: + maybe_edges[ + (relationship_data["src_id"], relationship_data["tgt_id"]) + ].append(relationship_data) + + return dict(maybe_nodes), dict(maybe_edges) + + +async def _rebuild_single_entity( + knowledge_graph_inst: BaseGraphStorage, + entities_vdb: BaseVectorStorage, + entity_name: str, + chunk_ids: set[str], + chunk_entities: dict, + llm_response_cache: BaseKVStorage, + global_config: dict[str, str], +) -> None: + """Rebuild a single entity from cached extraction results""" + + # Get current entity data + current_entity = await knowledge_graph_inst.get_node(entity_name) + if not current_entity: + return + + # Helper function to update entity in both graph and vector storage + async def _update_entity_storage( + final_description: str, entity_type: str, file_paths: set[str] + ): + # Update entity in graph storage + updated_entity_data = { + **current_entity, + "description": final_description, + "entity_type": entity_type, + "source_id": GRAPH_FIELD_SEP.join(chunk_ids), + "file_path": GRAPH_FIELD_SEP.join(file_paths) + if file_paths + else current_entity.get("file_path", "unknown_source"), + } + await knowledge_graph_inst.upsert_node(entity_name, updated_entity_data) + + # Update entity in vector database + entity_vdb_id = compute_mdhash_id(entity_name, prefix="ent-") + + # Delete old vector record first + try: + await entities_vdb.delete([entity_vdb_id]) + except Exception as e: + logger.debug( + f"Could not delete old entity vector record {entity_vdb_id}: {e}" + ) + + # Insert new vector record + entity_content = f"{entity_name}\n{final_description}" + await entities_vdb.upsert( + { + entity_vdb_id: { + "content": entity_content, + "entity_name": entity_name, + "source_id": 
updated_entity_data["source_id"], + "description": final_description, + "entity_type": entity_type, + "file_path": updated_entity_data["file_path"], + } + } + ) + + # Helper function to generate final description with optional LLM summary + async def _generate_final_description(combined_description: str) -> str: + if len(combined_description) > global_config["summary_to_max_tokens"]: + return await _handle_entity_relation_summary( + entity_name, + combined_description, + global_config, + llm_response_cache=llm_response_cache, + ) + else: + return combined_description + + # Collect all entity data from relevant chunks + all_entity_data = [] + for chunk_id in chunk_ids: + if chunk_id in chunk_entities and entity_name in chunk_entities[chunk_id]: + all_entity_data.extend(chunk_entities[chunk_id][entity_name]) + + if not all_entity_data: + logger.warning( + f"No cached entity data found for {entity_name}, trying to rebuild from relationships" + ) + + # Get all edges connected to this entity + edges = await knowledge_graph_inst.get_node_edges(entity_name) + if not edges: + logger.warning(f"No relationships found for entity {entity_name}") + return + + # Collect relationship data to extract entity information + relationship_descriptions = [] + file_paths = set() + + # Get edge data for all connected relationships + for src_id, tgt_id in edges: + edge_data = await knowledge_graph_inst.get_edge(src_id, tgt_id) + if edge_data: + if edge_data.get("description"): + relationship_descriptions.append(edge_data["description"]) + + if edge_data.get("file_path"): + edge_file_paths = edge_data["file_path"].split(GRAPH_FIELD_SEP) + file_paths.update(edge_file_paths) + + # Generate description from relationships or fallback to current + if relationship_descriptions: + combined_description = GRAPH_FIELD_SEP.join(relationship_descriptions) + final_description = await _generate_final_description(combined_description) + else: + final_description = current_entity.get("description", "") + + entity_type = current_entity.get("entity_type", "UNKNOWN") + await _update_entity_storage(final_description, entity_type, file_paths) + return + + # Process cached entity data + descriptions = [] + entity_types = [] + file_paths = set() + + for entity_data in all_entity_data: + if entity_data.get("description"): + descriptions.append(entity_data["description"]) + if entity_data.get("entity_type"): + entity_types.append(entity_data["entity_type"]) + if entity_data.get("file_path"): + file_paths.add(entity_data["file_path"]) + + # Combine all descriptions + combined_description = ( + GRAPH_FIELD_SEP.join(descriptions) + if descriptions + else current_entity.get("description", "") + ) + + # Get most common entity type + entity_type = ( + max(set(entity_types), key=entity_types.count) + if entity_types + else current_entity.get("entity_type", "UNKNOWN") + ) + + # Generate final description and update storage + final_description = await _generate_final_description(combined_description) + await _update_entity_storage(final_description, entity_type, file_paths) + + +async def _rebuild_single_relationship( + knowledge_graph_inst: BaseGraphStorage, + relationships_vdb: BaseVectorStorage, + src: str, + tgt: str, + chunk_ids: set[str], + chunk_relationships: dict, + llm_response_cache: BaseKVStorage, + global_config: dict[str, str], +) -> None: + """Rebuild a single relationship from cached extraction results""" + + # Get current relationship data + current_relationship = await knowledge_graph_inst.get_edge(src, tgt) + if not 
current_relationship: + return + + # Collect all relationship data from relevant chunks + all_relationship_data = [] + for chunk_id in chunk_ids: + if chunk_id in chunk_relationships: + # Check both (src, tgt) and (tgt, src) since relationships can be bidirectional + for edge_key in [(src, tgt), (tgt, src)]: + if edge_key in chunk_relationships[chunk_id]: + all_relationship_data.extend( + chunk_relationships[chunk_id][edge_key] + ) + + if not all_relationship_data: + logger.warning(f"No cached relationship data found for {src}-{tgt}") + return + + # Merge descriptions and keywords + descriptions = [] + keywords = [] + weights = [] + file_paths = set() + + for rel_data in all_relationship_data: + if rel_data.get("description"): + descriptions.append(rel_data["description"]) + if rel_data.get("keywords"): + keywords.append(rel_data["keywords"]) + if rel_data.get("weight"): + weights.append(rel_data["weight"]) + if rel_data.get("file_path"): + file_paths.add(rel_data["file_path"]) + + # Combine descriptions and keywords + combined_description = ( + GRAPH_FIELD_SEP.join(descriptions) + if descriptions + else current_relationship.get("description", "") + ) + combined_keywords = ( + ", ".join(set(keywords)) + if keywords + else current_relationship.get("keywords", "") + ) + # weight = ( + # sum(weights) / len(weights) + # if weights + # else current_relationship.get("weight", 1.0) + # ) + weight = sum(weights) if weights else current_relationship.get("weight", 1.0) + + # Use summary if description is too long + if len(combined_description) > global_config["summary_to_max_tokens"]: + final_description = await _handle_entity_relation_summary( + f"{src}-{tgt}", + combined_description, + global_config, + llm_response_cache=llm_response_cache, + ) + else: + final_description = combined_description + + # Update relationship in graph storage + updated_relationship_data = { + **current_relationship, + "description": final_description, + "keywords": combined_keywords, + "weight": weight, + "source_id": GRAPH_FIELD_SEP.join(chunk_ids), + "file_path": GRAPH_FIELD_SEP.join(file_paths) + if file_paths + else current_relationship.get("file_path", "unknown_source"), + } + await knowledge_graph_inst.upsert_edge(src, tgt, updated_relationship_data) + + # Update relationship in vector database + rel_vdb_id = compute_mdhash_id(src + tgt, prefix="rel-") + rel_vdb_id_reverse = compute_mdhash_id(tgt + src, prefix="rel-") + + # Delete old vector records first (both directions to be safe) + try: + await relationships_vdb.delete([rel_vdb_id, rel_vdb_id_reverse]) + except Exception as e: + logger.debug( + f"Could not delete old relationship vector records {rel_vdb_id}, {rel_vdb_id_reverse}: {e}" + ) + + # Insert new vector record + rel_content = f"{combined_keywords}\t{src}\n{tgt}\n{final_description}" + await relationships_vdb.upsert( + { + rel_vdb_id: { + "src_id": src, + "tgt_id": tgt, + "source_id": updated_relationship_data["source_id"], + "content": rel_content, + "keywords": combined_keywords, + "description": final_description, + "weight": weight, + "file_path": updated_relationship_data["file_path"], + } + } + ) + + async def _merge_nodes_then_upsert( entity_name: str, nodes_data: list[dict], @@ -757,6 +1217,7 @@ async def extract_entities( use_llm_func, llm_response_cache=llm_response_cache, cache_type="extract", + chunk_id=chunk_key, ) history = pack_user_ass_to_openai_messages(hint_prompt, final_result) @@ -773,6 +1234,7 @@ async def extract_entities( llm_response_cache=llm_response_cache, 
             history_messages=history,
             cache_type="extract",
+            chunk_id=chunk_key,
         )
         history += pack_user_ass_to_openai_messages(continue_prompt, glean_result)
diff --git a/lightrag/utils.py b/lightrag/utils.py
index 2e75b9b9..06b7a468 100644
--- a/lightrag/utils.py
+++ b/lightrag/utils.py
@@ -990,6 +990,7 @@ class CacheData:
     max_val: float | None = None
     mode: str = "default"
     cache_type: str = "query"
+    chunk_id: str | None = None


 async def save_to_cache(hashing_kv, cache_data: CacheData):
@@ -1030,6 +1031,7 @@ async def save_to_cache(hashing_kv, cache_data: CacheData):
     mode_cache[cache_data.args_hash] = {
         "return": cache_data.content,
         "cache_type": cache_data.cache_type,
+        "chunk_id": cache_data.chunk_id if cache_data.chunk_id is not None else None,
         "embedding": cache_data.quantized.tobytes().hex()
         if cache_data.quantized is not None
         else None,
@@ -1534,6 +1536,7 @@ async def use_llm_func_with_cache(
     max_tokens: int = None,
     history_messages: list[dict[str, str]] = None,
     cache_type: str = "extract",
+    chunk_id: str | None = None,
 ) -> str:
     """Call LLM function with cache support
@@ -1547,6 +1550,7 @@
         max_tokens: Maximum tokens for generation
         history_messages: History messages list
         cache_type: Type of cache
+        chunk_id: Chunk identifier to store in cache

     Returns:
         LLM response text
@@ -1589,6 +1593,7 @@
                 content=res,
                 prompt=_prompt,
                 cache_type=cache_type,
+                chunk_id=chunk_id,
             ),
         )
diff --git a/lightrag/utils_graph.py b/lightrag/utils_graph.py
index 54876fa7..5485d47c 100644
--- a/lightrag/utils_graph.py
+++ b/lightrag/utils_graph.py
@@ -4,6 +4,7 @@ import time
 import asyncio
 from typing import Any, cast

+from .base import DeletionResult
 from .kg.shared_storage import get_graph_db_lock
 from .prompt import GRAPH_FIELD_SEP
 from .utils import compute_mdhash_id, logger
@@ -12,7 +13,7 @@ from .base import StorageNameSpace

 async def adelete_by_entity(
     chunk_entity_relation_graph, entities_vdb, relationships_vdb, entity_name: str
-) -> None:
+) -> DeletionResult:
     """Asynchronously delete an entity and all its relationships.

     Args:
@@ -25,18 +26,43 @@
     # Use graph database lock to ensure atomic graph and vector db operations
     async with graph_db_lock:
         try:
+            # Check if the entity exists
+            if not await chunk_entity_relation_graph.has_node(entity_name):
+                logger.warning(f"Entity '{entity_name}' not found.")
+                return DeletionResult(
+                    status="not_found",
+                    doc_id=entity_name,
+                    message=f"Entity '{entity_name}' not found.",
+                    status_code=404,
+                )
+            # Retrieve related relationships before deleting the node
+            edges = await chunk_entity_relation_graph.get_node_edges(entity_name)
+            related_relations_count = len(edges) if edges else 0
+
             await entities_vdb.delete_entity(entity_name)
             await relationships_vdb.delete_entity_relation(entity_name)
             await chunk_entity_relation_graph.delete_node(entity_name)
-            logger.info(
-                f"Entity '{entity_name}' and its relationships have been deleted."
-            )
+            message = f"Entity '{entity_name}' and its {related_relations_count} relationships have been deleted."
+            logger.info(message)

             await _delete_by_entity_done(
                 entities_vdb, relationships_vdb, chunk_entity_relation_graph
             )
+            return DeletionResult(
+                status="success",
+                doc_id=entity_name,
+                message=message,
+                status_code=200,
+            )
         except Exception as e:
-            logger.error(f"Error while deleting entity '{entity_name}': {e}")
+            error_message = f"Error while deleting entity '{entity_name}': {e}"
+            logger.error(error_message)
+            return DeletionResult(
+                status="fail",
+                doc_id=entity_name,
+                message=error_message,
+                status_code=500,
+            )


 async def _delete_by_entity_done(
@@ -60,7 +86,7 @@ async def adelete_by_relation(
     relationships_vdb,
     source_entity: str,
     target_entity: str,
-) -> None:
+) -> DeletionResult:
     """Asynchronously delete a relation between two entities.

     Args:
@@ -69,6 +95,7 @@
         source_entity: Name of the source entity
         target_entity: Name of the target entity
     """
+    relation_str = f"{source_entity} -> {target_entity}"
     graph_db_lock = get_graph_db_lock(enable_logging=False)
     # Use graph database lock to ensure atomic graph and vector db operations
     async with graph_db_lock:
@@ -78,29 +105,45 @@
                 source_entity, target_entity
             )
             if not edge_exists:
-                logger.warning(
-                    f"Relation from '{source_entity}' to '{target_entity}' does not exist"
+                message = f"Relation from '{source_entity}' to '{target_entity}' does not exist"
+                logger.warning(message)
+                return DeletionResult(
+                    status="not_found",
+                    doc_id=relation_str,
+                    message=message,
+                    status_code=404,
                 )
-                return

             # Delete relation from vector database
-            relation_id = compute_mdhash_id(
-                source_entity + target_entity, prefix="rel-"
-            )
-            await relationships_vdb.delete([relation_id])
+            rel_ids_to_delete = [
+                compute_mdhash_id(source_entity + target_entity, prefix="rel-"),
+                compute_mdhash_id(target_entity + source_entity, prefix="rel-"),
+            ]
+
+            await relationships_vdb.delete(rel_ids_to_delete)

             # Delete relation from knowledge graph
             await chunk_entity_relation_graph.remove_edges(
                 [(source_entity, target_entity)]
             )

-            logger.info(
-                f"Successfully deleted relation from '{source_entity}' to '{target_entity}'"
-            )
+            message = f"Successfully deleted relation from '{source_entity}' to '{target_entity}'"
+            logger.info(message)

             await _delete_relation_done(relationships_vdb, chunk_entity_relation_graph)
+            return DeletionResult(
+                status="success",
+                doc_id=relation_str,
+                message=message,
+                status_code=200,
+            )
         except Exception as e:
-            logger.error(
-                f"Error while deleting relation from '{source_entity}' to '{target_entity}': {e}"
+            error_message = f"Error while deleting relation from '{source_entity}' to '{target_entity}': {e}"
+            logger.error(error_message)
+            return DeletionResult(
+                status="fail",
+                doc_id=relation_str,
+                message=error_message,
+                status_code=500,
             )