From ead82a8dbd3e782b9ca25777125335fc6b2f83cf Mon Sep 17 00:00:00 2001 From: zrguo <49157727+LarFii@users.noreply.github.com> Date: Mon, 9 Jun 2025 18:52:34 +0800 Subject: [PATCH 01/25] update delete_by_doc_id --- lightrag/base.py | 15 ++ lightrag/kg/json_kv_impl.py | 47 ++++ lightrag/lightrag.py | 259 ++++++++-------------- lightrag/operate.py | 417 ++++++++++++++++++++++++++++++++++++ lightrag/utils.py | 5 + 5 files changed, 572 insertions(+), 171 deletions(-) diff --git a/lightrag/base.py b/lightrag/base.py index c6035f70..e66a67db 100644 --- a/lightrag/base.py +++ b/lightrag/base.py @@ -278,6 +278,21 @@ class BaseKVStorage(StorageNameSpace, ABC): False: if the cache drop failed, or the cache mode is not supported """ + async def drop_cache_by_chunk_ids(self, chunk_ids: list[str] | None = None) -> bool: + """Delete specific cache records from storage by chunk IDs + + Importance notes for in-memory storage: + 1. Changes will be persisted to disk during the next index_done_callback + 2. update flags to notify other processes that data persistence is needed + + Args: + chunk_ids (list[str]): List of chunk IDs to be dropped from storage + + Returns: + True: if the cache drop successfully + False: if the cache drop failed, or the operation is not supported + """ + @dataclass class BaseGraphStorage(StorageNameSpace, ABC): diff --git a/lightrag/kg/json_kv_impl.py b/lightrag/kg/json_kv_impl.py index 2d44ce00..2345e50f 100644 --- a/lightrag/kg/json_kv_impl.py +++ b/lightrag/kg/json_kv_impl.py @@ -172,6 +172,53 @@ class JsonKVStorage(BaseKVStorage): except Exception: return False + async def drop_cache_by_chunk_ids(self, chunk_ids: list[str] | None = None) -> bool: + """Delete specific cache records from storage by chunk IDs + + Importance notes for in-memory storage: + 1. Changes will be persisted to disk during the next index_done_callback + 2. update flags to notify other processes that data persistence is needed + + Args: + chunk_ids (list[str]): List of chunk IDs to be dropped from storage + + Returns: + True: if the cache drop successfully + False: if the cache drop failed + """ + if not chunk_ids: + return False + + try: + async with self._storage_lock: + # Iterate through all cache modes to find entries with matching chunk_ids + for mode_key, mode_data in list(self._data.items()): + if isinstance(mode_data, dict): + # Check each cached entry in this mode + for cache_key, cache_entry in list(mode_data.items()): + if ( + isinstance(cache_entry, dict) + and cache_entry.get("chunk_id") in chunk_ids + ): + # Remove this cache entry + del mode_data[cache_key] + logger.debug( + f"Removed cache entry {cache_key} for chunk {cache_entry.get('chunk_id')}" + ) + + # If the mode is now empty, remove it entirely + if not mode_data: + del self._data[mode_key] + + # Set update flags to notify persistence is needed + await set_all_update_flags(self.namespace) + + logger.info(f"Cleared cache for {len(chunk_ids)} chunk IDs") + return True + except Exception as e: + logger.error(f"Error clearing cache by chunk IDs: {e}") + return False + async def drop(self) -> dict[str, str]: """Drop all data from storage and clean up resources This action will persistent the data to disk immediately. 
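The `drop_cache_by_chunk_ids` hook introduced above is what lets the deletion path evict cached LLM extraction results for exactly the chunks being removed. A minimal sketch of how a caller drives it, assuming any `BaseKVStorage` backend (names here are illustrative; the real call site is `LightRAG.adelete_by_doc_id` in the next file):

```python
from lightrag.base import BaseKVStorage


async def evict_chunk_cache(llm_response_cache: BaseKVStorage, chunk_ids: list[str]) -> bool:
    # Backends that do not implement per-chunk eviction return a falsy value
    # rather than raising, so the surrounding deletion flow can continue either way.
    cleared = await llm_response_cache.drop_cache_by_chunk_ids(chunk_ids)
    if not cleared:
        print("Cache backend does not support per-chunk eviction; skipping")
    return bool(cleared)
```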
diff --git a/lightrag/lightrag.py b/lightrag/lightrag.py index 0bf7de83..6d206d5c 100644 --- a/lightrag/lightrag.py +++ b/lightrag/lightrag.py @@ -56,6 +56,7 @@ from .operate import ( kg_query, naive_query, query_with_keywords, + _rebuild_knowledge_from_chunks, ) from .prompt import GRAPH_FIELD_SEP from .utils import ( @@ -1207,6 +1208,7 @@ class LightRAG: cast(StorageNameSpace, storage_inst).index_done_callback() for storage_inst in [ # type: ignore self.full_docs, + self.doc_status, self.text_chunks, self.llm_response_cache, self.entities_vdb, @@ -1674,10 +1676,12 @@ class LightRAG: # Return the dictionary containing statuses only for the found document IDs return found_statuses - # TODO: Deprecated (Deleting documents can cause hallucinations in RAG.) - # Document delete is not working properly for most of the storage implementations. async def adelete_by_doc_id(self, doc_id: str) -> None: - """Delete a document and all its related data + """Delete a document and all its related data with cache cleanup and reconstruction + + Optimized version that: + 1. Clears LLM cache for related chunks + 2. Rebuilds entity and relationship descriptions from remaining chunks Args: doc_id: Document ID to delete @@ -1688,10 +1692,9 @@ class LightRAG: logger.warning(f"Document {doc_id} not found") return - logger.debug(f"Starting deletion for document {doc_id}") + logger.info(f"Starting optimized deletion for document {doc_id}") # 2. Get all chunks related to this document - # Find all chunks where full_doc_id equals the current doc_id all_chunks = await self.text_chunks.get_all() related_chunks = { chunk_id: chunk_data @@ -1704,64 +1707,46 @@ class LightRAG: logger.warning(f"No chunks found for document {doc_id}") return - # Get all related chunk IDs chunk_ids = set(related_chunks.keys()) - logger.debug(f"Found {len(chunk_ids)} chunks to delete") + logger.info(f"Found {len(chunk_ids)} chunks to delete") - # TODO: self.entities_vdb.client_storage only works for local storage, need to fix this + # 3. **OPTIMIZATION 1**: Clear LLM cache for related chunks + logger.info("Clearing LLM cache for related chunks...") + cache_cleared = await self.llm_response_cache.drop_cache_by_chunk_ids( + list(chunk_ids) + ) + if cache_cleared: + logger.info(f"Successfully cleared cache for {len(chunk_ids)} chunks") + else: + logger.warning( + "Failed to clear chunk cache or cache clearing not supported" + ) - # 3. Before deleting, check the related entities and relationships for these chunks - for chunk_id in chunk_ids: - # Check entities - entities_storage = await self.entities_vdb.client_storage - entities = [ - dp - for dp in entities_storage["data"] - if chunk_id in dp.get("source_id") - ] - logger.debug(f"Chunk {chunk_id} has {len(entities)} related entities") - - # Check relationships - relationships_storage = await self.relationships_vdb.client_storage - relations = [ - dp - for dp in relationships_storage["data"] - if chunk_id in dp.get("source_id") - ] - logger.debug(f"Chunk {chunk_id} has {len(relations)} related relations") - - # Continue with the original deletion process... - - # 4. Delete chunks from vector database - if chunk_ids: - await self.chunks_vdb.delete(chunk_ids) - await self.text_chunks.delete(chunk_ids) - - # 5. Find and process entities and relationships that have these chunks as source - # Get all nodes and edges from the graph storage using storage-agnostic methods + # 4. 
Analyze entities and relationships that will be affected entities_to_delete = set() - entities_to_update = {} # entity_name -> new_source_id + entities_to_rebuild = {} # entity_name -> remaining_chunk_ids relationships_to_delete = set() - relationships_to_update = {} # (src, tgt) -> new_source_id + relationships_to_rebuild = {} # (src, tgt) -> remaining_chunk_ids - # Process entities - use storage-agnostic methods + # Process entities all_labels = await self.chunk_entity_relation_graph.get_all_labels() for node_label in all_labels: node_data = await self.chunk_entity_relation_graph.get_node(node_label) if node_data and "source_id" in node_data: # Split source_id using GRAPH_FIELD_SEP sources = set(node_data["source_id"].split(GRAPH_FIELD_SEP)) - sources.difference_update(chunk_ids) - if not sources: + remaining_sources = sources - chunk_ids + + if not remaining_sources: entities_to_delete.add(node_label) logger.debug( f"Entity {node_label} marked for deletion - no remaining sources" ) - else: - new_source_id = GRAPH_FIELD_SEP.join(sources) - entities_to_update[node_label] = new_source_id + elif remaining_sources != sources: + # Entity needs to be rebuilt from remaining chunks + entities_to_rebuild[node_label] = remaining_sources logger.debug( - f"Entity {node_label} will be updated with new source_id: {new_source_id}" + f"Entity {node_label} will be rebuilt from {len(remaining_sources)} remaining chunks" ) # Process relationships @@ -1777,160 +1762,92 @@ class LightRAG: if edge_data and "source_id" in edge_data: # Split source_id using GRAPH_FIELD_SEP sources = set(edge_data["source_id"].split(GRAPH_FIELD_SEP)) - sources.difference_update(chunk_ids) - if not sources: + remaining_sources = sources - chunk_ids + + if not remaining_sources: relationships_to_delete.add((src, tgt)) logger.debug( f"Relationship {src}-{tgt} marked for deletion - no remaining sources" ) - else: - new_source_id = GRAPH_FIELD_SEP.join(sources) - relationships_to_update[(src, tgt)] = new_source_id + elif remaining_sources != sources: + # Relationship needs to be rebuilt from remaining chunks + relationships_to_rebuild[(src, tgt)] = remaining_sources logger.debug( - f"Relationship {src}-{tgt} will be updated with new source_id: {new_source_id}" + f"Relationship {src}-{tgt} will be rebuilt from {len(remaining_sources)} remaining chunks" ) - # Delete entities + # 5. Delete chunks from storage + if chunk_ids: + await self.chunks_vdb.delete(chunk_ids) + await self.text_chunks.delete(chunk_ids) + logger.info(f"Deleted {len(chunk_ids)} chunks from storage") + + # 6. 
Delete entities that have no remaining sources if entities_to_delete: - for entity in entities_to_delete: - await self.entities_vdb.delete_entity(entity) - logger.debug(f"Deleted entity {entity} from vector DB") + # Delete from vector database + entity_vdb_ids = [ + compute_mdhash_id(entity, prefix="ent-") + for entity in entities_to_delete + ] + await self.entities_vdb.delete(entity_vdb_ids) + + # Delete from graph await self.chunk_entity_relation_graph.remove_nodes( list(entities_to_delete) ) - logger.debug(f"Deleted {len(entities_to_delete)} entities from graph") + logger.info(f"Deleted {len(entities_to_delete)} entities") - # Update entities - for entity, new_source_id in entities_to_update.items(): - node_data = await self.chunk_entity_relation_graph.get_node(entity) - if node_data: - node_data["source_id"] = new_source_id - await self.chunk_entity_relation_graph.upsert_node( - entity, node_data - ) - logger.debug( - f"Updated entity {entity} with new source_id: {new_source_id}" - ) - - # Delete relationships + # 7. Delete relationships that have no remaining sources if relationships_to_delete: + # Delete from vector database + rel_ids_to_delete = [] for src, tgt in relationships_to_delete: - rel_id_0 = compute_mdhash_id(src + tgt, prefix="rel-") - rel_id_1 = compute_mdhash_id(tgt + src, prefix="rel-") - await self.relationships_vdb.delete([rel_id_0, rel_id_1]) - logger.debug(f"Deleted relationship {src}-{tgt} from vector DB") + rel_ids_to_delete.extend( + [ + compute_mdhash_id(src + tgt, prefix="rel-"), + compute_mdhash_id(tgt + src, prefix="rel-"), + ] + ) + await self.relationships_vdb.delete(rel_ids_to_delete) + + # Delete from graph await self.chunk_entity_relation_graph.remove_edges( list(relationships_to_delete) ) - logger.debug( - f"Deleted {len(relationships_to_delete)} relationships from graph" + logger.info(f"Deleted {len(relationships_to_delete)} relationships") + + # 8. **OPTIMIZATION 2**: Rebuild entities and relationships from remaining chunks + if entities_to_rebuild or relationships_to_rebuild: + logger.info( + f"Rebuilding {len(entities_to_rebuild)} entities and {len(relationships_to_rebuild)} relationships..." + ) + await _rebuild_knowledge_from_chunks( + entities_to_rebuild=entities_to_rebuild, + relationships_to_rebuild=relationships_to_rebuild, + knowledge_graph_inst=self.chunk_entity_relation_graph, + entities_vdb=self.entities_vdb, + relationships_vdb=self.relationships_vdb, + text_chunks=self.text_chunks, + llm_response_cache=self.llm_response_cache, + global_config=asdict(self), ) - # Update relationships - for (src, tgt), new_source_id in relationships_to_update.items(): - edge_data = await self.chunk_entity_relation_graph.get_edge(src, tgt) - if edge_data: - edge_data["source_id"] = new_source_id - await self.chunk_entity_relation_graph.upsert_edge( - src, tgt, edge_data - ) - logger.debug( - f"Updated relationship {src}-{tgt} with new source_id: {new_source_id}" - ) - - # 6. Delete original document and status + # 9. Delete original document and status await self.full_docs.delete([doc_id]) await self.doc_status.delete([doc_id]) - # 7. Ensure all indexes are updated + # 10. Ensure all indexes are updated await self._insert_done() logger.info( - f"Successfully deleted document {doc_id} and related data. " - f"Deleted {len(entities_to_delete)} entities and {len(relationships_to_delete)} relationships. " - f"Updated {len(entities_to_update)} entities and {len(relationships_to_update)} relationships." + f"Successfully deleted document {doc_id}. 
" + f"Deleted: {len(entities_to_delete)} entities, {len(relationships_to_delete)} relationships. " + f"Rebuilt: {len(entities_to_rebuild)} entities, {len(relationships_to_rebuild)} relationships." ) - async def process_data(data_type, vdb, chunk_id): - # Check data (entities or relationships) - storage = await vdb.client_storage - data_with_chunk = [ - dp - for dp in storage["data"] - if chunk_id in (dp.get("source_id") or "").split(GRAPH_FIELD_SEP) - ] - - data_for_vdb = {} - if data_with_chunk: - logger.warning( - f"found {len(data_with_chunk)} {data_type} still referencing chunk {chunk_id}" - ) - - for item in data_with_chunk: - old_sources = item["source_id"].split(GRAPH_FIELD_SEP) - new_sources = [src for src in old_sources if src != chunk_id] - - if not new_sources: - logger.info( - f"{data_type} {item.get('entity_name', 'N/A')} is deleted because source_id is not exists" - ) - await vdb.delete_entity(item) - else: - item["source_id"] = GRAPH_FIELD_SEP.join(new_sources) - item_id = item["__id__"] - data_for_vdb[item_id] = item.copy() - if data_type == "entities": - data_for_vdb[item_id]["content"] = data_for_vdb[ - item_id - ].get("content") or ( - item.get("entity_name", "") - + (item.get("description") or "") - ) - else: # relationships - data_for_vdb[item_id]["content"] = data_for_vdb[ - item_id - ].get("content") or ( - (item.get("keywords") or "") - + (item.get("src_id") or "") - + (item.get("tgt_id") or "") - + (item.get("description") or "") - ) - - if data_for_vdb: - await vdb.upsert(data_for_vdb) - logger.info(f"Successfully updated {data_type} in vector DB") - - # Add verification step - async def verify_deletion(): - # Verify if the document has been deleted - if await self.full_docs.get_by_id(doc_id): - logger.warning(f"Document {doc_id} still exists in full_docs") - - # Verify if chunks have been deleted - all_remaining_chunks = await self.text_chunks.get_all() - remaining_related_chunks = { - chunk_id: chunk_data - for chunk_id, chunk_data in all_remaining_chunks.items() - if isinstance(chunk_data, dict) - and chunk_data.get("full_doc_id") == doc_id - } - - if remaining_related_chunks: - logger.warning( - f"Found {len(remaining_related_chunks)} remaining chunks" - ) - - # Verify entities and relationships - for chunk_id in chunk_ids: - await process_data("entities", self.entities_vdb, chunk_id) - await process_data( - "relationships", self.relationships_vdb, chunk_id - ) - - await verify_deletion() - except Exception as e: logger.error(f"Error while deleting document {doc_id}: {e}") + raise async def adelete_by_entity(self, entity_name: str) -> None: """Asynchronously delete an entity and all its relationships. diff --git a/lightrag/operate.py b/lightrag/operate.py index e1295091..91d1ee68 100644 --- a/lightrag/operate.py +++ b/lightrag/operate.py @@ -240,6 +240,421 @@ async def _handle_single_relationship_extraction( ) +async def _rebuild_knowledge_from_chunks( + entities_to_rebuild: dict[str, set[str]], + relationships_to_rebuild: dict[tuple[str, str], set[str]], + knowledge_graph_inst: BaseGraphStorage, + entities_vdb: BaseVectorStorage, + relationships_vdb: BaseVectorStorage, + text_chunks: BaseKVStorage, + llm_response_cache: BaseKVStorage, + global_config: dict[str, str], +) -> None: + """Rebuild entity and relationship descriptions from cached extraction results + + This method uses cached LLM extraction results instead of calling LLM again, + following the same approach as the insert process. 
+ + Args: + entities_to_rebuild: Dict mapping entity_name -> set of remaining chunk_ids + relationships_to_rebuild: Dict mapping (src, tgt) -> set of remaining chunk_ids + """ + if not entities_to_rebuild and not relationships_to_rebuild: + return + + # Get all referenced chunk IDs + all_referenced_chunk_ids = set() + for chunk_ids in entities_to_rebuild.values(): + all_referenced_chunk_ids.update(chunk_ids) + for chunk_ids in relationships_to_rebuild.values(): + all_referenced_chunk_ids.update(chunk_ids) + + logger.info( + f"Rebuilding knowledge from {len(all_referenced_chunk_ids)} cached chunk extractions" + ) + + # Get cached extraction results for these chunks + cached_results = await _get_cached_extraction_results( + llm_response_cache, all_referenced_chunk_ids + ) + + if not cached_results: + logger.warning("No cached extraction results found, cannot rebuild") + return + + # Process cached results to get entities and relationships for each chunk + chunk_entities = {} # chunk_id -> {entity_name: [entity_data]} + chunk_relationships = {} # chunk_id -> {(src, tgt): [relationship_data]} + + for chunk_id, extraction_result in cached_results.items(): + try: + entities, relationships = await _parse_extraction_result( + text_chunks=text_chunks, + extraction_result=extraction_result, + chunk_id=chunk_id, + ) + chunk_entities[chunk_id] = entities + chunk_relationships[chunk_id] = relationships + except Exception as e: + logger.error( + f"Failed to parse cached extraction result for chunk {chunk_id}: {e}" + ) + continue + + # Rebuild entities + for entity_name, chunk_ids in entities_to_rebuild.items(): + try: + await _rebuild_single_entity( + knowledge_graph_inst=knowledge_graph_inst, + entities_vdb=entities_vdb, + entity_name=entity_name, + chunk_ids=chunk_ids, + chunk_entities=chunk_entities, + llm_response_cache=llm_response_cache, + global_config=global_config, + ) + logger.debug( + f"Rebuilt entity {entity_name} from {len(chunk_ids)} cached extractions" + ) + except Exception as e: + logger.error(f"Failed to rebuild entity {entity_name}: {e}") + + # Rebuild relationships + for (src, tgt), chunk_ids in relationships_to_rebuild.items(): + try: + await _rebuild_single_relationship( + knowledge_graph_inst=knowledge_graph_inst, + relationships_vdb=relationships_vdb, + src=src, + tgt=tgt, + chunk_ids=chunk_ids, + chunk_relationships=chunk_relationships, + llm_response_cache=llm_response_cache, + global_config=global_config, + ) + logger.debug( + f"Rebuilt relationship {src}-{tgt} from {len(chunk_ids)} cached extractions" + ) + except Exception as e: + logger.error(f"Failed to rebuild relationship {src}-{tgt}: {e}") + + logger.info("Completed rebuilding knowledge from cached extractions") + + +async def _get_cached_extraction_results( + llm_response_cache: BaseKVStorage, chunk_ids: set[str] +) -> dict[str, str]: + """Get cached extraction results for specific chunk IDs + + Args: + chunk_ids: Set of chunk IDs to get cached results for + + Returns: + Dict mapping chunk_id -> extraction_result_text + """ + cached_results = {} + + # Get all cached data for "default" mode (entity extraction cache) + default_cache = await llm_response_cache.get_by_id("default") or {} + + for cache_key, cache_entry in default_cache.items(): + if ( + isinstance(cache_entry, dict) + and cache_entry.get("cache_type") == "extract" + and cache_entry.get("chunk_id") in chunk_ids + ): + chunk_id = cache_entry["chunk_id"] + extraction_result = cache_entry["return"] + cached_results[chunk_id] = extraction_result + + logger.info( 
+ f"Found {len(cached_results)} cached extraction results for {len(chunk_ids)} chunk IDs" + ) + return cached_results + + +async def _parse_extraction_result( + text_chunks: BaseKVStorage, extraction_result: str, chunk_id: str +) -> tuple[dict, dict]: + """Parse cached extraction result using the same logic as extract_entities + + Args: + extraction_result: The cached LLM extraction result + chunk_id: The chunk ID for source tracking + + Returns: + Tuple of (entities_dict, relationships_dict) + """ + + # Get chunk data for file_path + chunk_data = await text_chunks.get_by_id(chunk_id) + file_path = ( + chunk_data.get("file_path", "unknown_source") + if chunk_data + else "unknown_source" + ) + context_base = dict( + tuple_delimiter=PROMPTS["DEFAULT_TUPLE_DELIMITER"], + record_delimiter=PROMPTS["DEFAULT_RECORD_DELIMITER"], + completion_delimiter=PROMPTS["DEFAULT_COMPLETION_DELIMITER"], + ) + maybe_nodes = defaultdict(list) + maybe_edges = defaultdict(list) + + # Parse the extraction result using the same logic as in extract_entities + records = split_string_by_multi_markers( + extraction_result, + [context_base["record_delimiter"], context_base["completion_delimiter"]], + ) + for record in records: + record = re.search(r"\((.*)\)", record) + if record is None: + continue + record = record.group(1) + record_attributes = split_string_by_multi_markers( + record, [context_base["tuple_delimiter"]] + ) + + # Try to parse as entity + entity_data = await _handle_single_entity_extraction( + record_attributes, chunk_id, file_path + ) + if entity_data is not None: + maybe_nodes[entity_data["entity_name"]].append(entity_data) + continue + + # Try to parse as relationship + relationship_data = await _handle_single_relationship_extraction( + record_attributes, chunk_id, file_path + ) + if relationship_data is not None: + maybe_edges[ + (relationship_data["src_id"], relationship_data["tgt_id"]) + ].append(relationship_data) + + return dict(maybe_nodes), dict(maybe_edges) + + +async def _rebuild_single_entity( + knowledge_graph_inst: BaseGraphStorage, + entities_vdb: BaseVectorStorage, + entity_name: str, + chunk_ids: set[str], + chunk_entities: dict, + llm_response_cache: BaseKVStorage, + global_config: dict[str, str], +) -> None: + """Rebuild a single entity from cached extraction results""" + + # Get current entity data + current_entity = await knowledge_graph_inst.get_node(entity_name) + if not current_entity: + return + + # Collect all entity data from relevant chunks + all_entity_data = [] + for chunk_id in chunk_ids: + if chunk_id in chunk_entities and entity_name in chunk_entities[chunk_id]: + all_entity_data.extend(chunk_entities[chunk_id][entity_name]) + + if not all_entity_data: + logger.warning(f"No cached entity data found for {entity_name}") + return + + # Merge descriptions and get the most common entity type + descriptions = [] + entity_types = [] + file_paths = set() + + for entity_data in all_entity_data: + if entity_data.get("description"): + descriptions.append(entity_data["description"]) + if entity_data.get("entity_type"): + entity_types.append(entity_data["entity_type"]) + if entity_data.get("file_path"): + file_paths.add(entity_data["file_path"]) + + # Combine all descriptions + combined_description = ( + GRAPH_FIELD_SEP.join(descriptions) + if descriptions + else current_entity.get("description", "") + ) + + # Get most common entity type + entity_type = ( + max(set(entity_types), key=entity_types.count) + if entity_types + else current_entity.get("entity_type", "UNKNOWN") + ) + + # 
Use summary if description is too long + if len(combined_description) > global_config["summary_to_max_tokens"]: + final_description = await _handle_entity_relation_summary( + entity_name, + combined_description, + global_config, + llm_response_cache=llm_response_cache, + ) + else: + final_description = combined_description + + # Update entity in graph storage + updated_entity_data = { + **current_entity, + "description": final_description, + "entity_type": entity_type, + "source_id": GRAPH_FIELD_SEP.join(chunk_ids), + "file_path": GRAPH_FIELD_SEP.join(file_paths) + if file_paths + else current_entity.get("file_path", "unknown_source"), + } + await knowledge_graph_inst.upsert_node(entity_name, updated_entity_data) + + # Update entity in vector database + entity_vdb_id = compute_mdhash_id(entity_name, prefix="ent-") + + # Delete old vector record first + try: + await entities_vdb.delete([entity_vdb_id]) + except Exception as e: + logger.debug(f"Could not delete old entity vector record {entity_vdb_id}: {e}") + + # Insert new vector record + entity_content = f"{entity_name}\n{final_description}" + await entities_vdb.upsert( + { + entity_vdb_id: { + "content": entity_content, + "entity_name": entity_name, + "source_id": updated_entity_data["source_id"], + "description": final_description, + "entity_type": entity_type, + "file_path": updated_entity_data["file_path"], + } + } + ) + + +async def _rebuild_single_relationship( + knowledge_graph_inst: BaseGraphStorage, + relationships_vdb: BaseVectorStorage, + src: str, + tgt: str, + chunk_ids: set[str], + chunk_relationships: dict, + llm_response_cache: BaseKVStorage, + global_config: dict[str, str], +) -> None: + """Rebuild a single relationship from cached extraction results""" + + # Get current relationship data + current_relationship = await knowledge_graph_inst.get_edge(src, tgt) + if not current_relationship: + return + + # Collect all relationship data from relevant chunks + all_relationship_data = [] + for chunk_id in chunk_ids: + if chunk_id in chunk_relationships: + # Check both (src, tgt) and (tgt, src) since relationships can be bidirectional + for edge_key in [(src, tgt), (tgt, src)]: + if edge_key in chunk_relationships[chunk_id]: + all_relationship_data.extend( + chunk_relationships[chunk_id][edge_key] + ) + + if not all_relationship_data: + logger.warning(f"No cached relationship data found for {src}-{tgt}") + return + + # Merge descriptions and keywords + descriptions = [] + keywords = [] + weights = [] + file_paths = set() + + for rel_data in all_relationship_data: + if rel_data.get("description"): + descriptions.append(rel_data["description"]) + if rel_data.get("keywords"): + keywords.append(rel_data["keywords"]) + if rel_data.get("weight"): + weights.append(rel_data["weight"]) + if rel_data.get("file_path"): + file_paths.add(rel_data["file_path"]) + + # Combine descriptions and keywords + combined_description = ( + GRAPH_FIELD_SEP.join(descriptions) + if descriptions + else current_relationship.get("description", "") + ) + combined_keywords = ( + ", ".join(set(keywords)) + if keywords + else current_relationship.get("keywords", "") + ) + avg_weight = ( + sum(weights) / len(weights) + if weights + else current_relationship.get("weight", 1.0) + ) + + # Use summary if description is too long + if len(combined_description) > global_config["summary_to_max_tokens"]: + final_description = await _handle_entity_relation_summary( + f"{src}-{tgt}", + combined_description, + global_config, + llm_response_cache=llm_response_cache, + ) + 
else: + final_description = combined_description + + # Update relationship in graph storage + updated_relationship_data = { + **current_relationship, + "description": final_description, + "keywords": combined_keywords, + "weight": avg_weight, + "source_id": GRAPH_FIELD_SEP.join(chunk_ids), + "file_path": GRAPH_FIELD_SEP.join(file_paths) + if file_paths + else current_relationship.get("file_path", "unknown_source"), + } + await knowledge_graph_inst.upsert_edge(src, tgt, updated_relationship_data) + + # Update relationship in vector database + rel_vdb_id = compute_mdhash_id(src + tgt, prefix="rel-") + rel_vdb_id_reverse = compute_mdhash_id(tgt + src, prefix="rel-") + + # Delete old vector records first (both directions to be safe) + try: + await relationships_vdb.delete([rel_vdb_id, rel_vdb_id_reverse]) + except Exception as e: + logger.debug( + f"Could not delete old relationship vector records {rel_vdb_id}, {rel_vdb_id_reverse}: {e}" + ) + + # Insert new vector record + rel_content = f"{combined_keywords}\t{src}\n{tgt}\n{final_description}" + await relationships_vdb.upsert( + { + rel_vdb_id: { + "src_id": src, + "tgt_id": tgt, + "source_id": updated_relationship_data["source_id"], + "content": rel_content, + "keywords": combined_keywords, + "description": final_description, + "weight": avg_weight, + "file_path": updated_relationship_data["file_path"], + } + } + ) + + async def _merge_nodes_then_upsert( entity_name: str, nodes_data: list[dict], @@ -757,6 +1172,7 @@ async def extract_entities( use_llm_func, llm_response_cache=llm_response_cache, cache_type="extract", + chunk_id=chunk_key, ) history = pack_user_ass_to_openai_messages(hint_prompt, final_result) @@ -773,6 +1189,7 @@ async def extract_entities( llm_response_cache=llm_response_cache, history_messages=history, cache_type="extract", + chunk_id=chunk_key, ) history += pack_user_ass_to_openai_messages(continue_prompt, glean_result) diff --git a/lightrag/utils.py b/lightrag/utils.py index 2e75b9b9..06b7a468 100644 --- a/lightrag/utils.py +++ b/lightrag/utils.py @@ -990,6 +990,7 @@ class CacheData: max_val: float | None = None mode: str = "default" cache_type: str = "query" + chunk_id: str | None = None async def save_to_cache(hashing_kv, cache_data: CacheData): @@ -1030,6 +1031,7 @@ async def save_to_cache(hashing_kv, cache_data: CacheData): mode_cache[cache_data.args_hash] = { "return": cache_data.content, "cache_type": cache_data.cache_type, + "chunk_id": cache_data.chunk_id if cache_data.chunk_id is not None else None, "embedding": cache_data.quantized.tobytes().hex() if cache_data.quantized is not None else None, @@ -1534,6 +1536,7 @@ async def use_llm_func_with_cache( max_tokens: int = None, history_messages: list[dict[str, str]] = None, cache_type: str = "extract", + chunk_id: str | None = None, ) -> str: """Call LLM function with cache support @@ -1547,6 +1550,7 @@ async def use_llm_func_with_cache( max_tokens: Maximum tokens for generation history_messages: History messages list cache_type: Type of cache + chunk_id: Chunk identifier to store in cache Returns: LLM response text @@ -1589,6 +1593,7 @@ async def use_llm_func_with_cache( content=res, prompt=_prompt, cache_type=cache_type, + chunk_id=chunk_id, ), ) From 9a71a10bc00a614df1c45b5aa52c5dff7757fae4 Mon Sep 17 00:00:00 2001 From: zrguo <49157727+LarFii@users.noreply.github.com> Date: Mon, 9 Jun 2025 19:40:29 +0800 Subject: [PATCH 02/25] Update operate.py --- lightrag/operate.py | 139 ++++++++++++++++++++++++++++---------------- 1 file changed, 90 insertions(+), 49 
deletions(-) diff --git a/lightrag/operate.py b/lightrag/operate.py index 91d1ee68..453e647d 100644 --- a/lightrag/operate.py +++ b/lightrag/operate.py @@ -451,6 +451,58 @@ async def _rebuild_single_entity( if not current_entity: return + # Helper function to update entity in both graph and vector storage + async def _update_entity_storage( + final_description: str, + entity_type: str, + file_paths: set[str] + ): + # Update entity in graph storage + updated_entity_data = { + **current_entity, + "description": final_description, + "entity_type": entity_type, + "source_id": GRAPH_FIELD_SEP.join(chunk_ids), + "file_path": GRAPH_FIELD_SEP.join(file_paths) if file_paths else current_entity.get("file_path", "unknown_source"), + } + await knowledge_graph_inst.upsert_node(entity_name, updated_entity_data) + + # Update entity in vector database + entity_vdb_id = compute_mdhash_id(entity_name, prefix="ent-") + + # Delete old vector record first + try: + await entities_vdb.delete([entity_vdb_id]) + except Exception as e: + logger.debug(f"Could not delete old entity vector record {entity_vdb_id}: {e}") + + # Insert new vector record + entity_content = f"{entity_name}\n{final_description}" + await entities_vdb.upsert( + { + entity_vdb_id: { + "content": entity_content, + "entity_name": entity_name, + "source_id": updated_entity_data["source_id"], + "description": final_description, + "entity_type": entity_type, + "file_path": updated_entity_data["file_path"], + } + } + ) + + # Helper function to generate final description with optional LLM summary + async def _generate_final_description(combined_description: str) -> str: + if len(combined_description) > global_config["summary_to_max_tokens"]: + return await _handle_entity_relation_summary( + entity_name, + combined_description, + global_config, + llm_response_cache=llm_response_cache, + ) + else: + return combined_description + # Collect all entity data from relevant chunks all_entity_data = [] for chunk_id in chunk_ids: @@ -458,10 +510,41 @@ async def _rebuild_single_entity( all_entity_data.extend(chunk_entities[chunk_id][entity_name]) if not all_entity_data: - logger.warning(f"No cached entity data found for {entity_name}") + logger.warning(f"No cached entity data found for {entity_name}, trying to rebuild from relationships") + + # Get all edges connected to this entity + edges = await knowledge_graph_inst.get_node_edges(entity_name) + if not edges: + logger.warning(f"No relationships found for entity {entity_name}") + return + + # Collect relationship data to extract entity information + relationship_descriptions = [] + file_paths = set() + + # Get edge data for all connected relationships + for src_id, tgt_id in edges: + edge_data = await knowledge_graph_inst.get_edge(src_id, tgt_id) + if edge_data: + if edge_data.get("description"): + relationship_descriptions.append(edge_data["description"]) + + if edge_data.get("file_path"): + edge_file_paths = edge_data["file_path"].split(GRAPH_FIELD_SEP) + file_paths.update(edge_file_paths) + + # Generate description from relationships or fallback to current + if relationship_descriptions: + combined_description = GRAPH_FIELD_SEP.join(relationship_descriptions) + final_description = await _generate_final_description(combined_description) + else: + final_description = current_entity.get("description", "") + + entity_type = current_entity.get("entity_type", "UNKNOWN") + await _update_entity_storage(final_description, entity_type, file_paths) return - # Merge descriptions and get the most common entity type + # 
Process cached entity data descriptions = [] entity_types = [] file_paths = set() @@ -488,52 +571,9 @@ async def _rebuild_single_entity( else current_entity.get("entity_type", "UNKNOWN") ) - # Use summary if description is too long - if len(combined_description) > global_config["summary_to_max_tokens"]: - final_description = await _handle_entity_relation_summary( - entity_name, - combined_description, - global_config, - llm_response_cache=llm_response_cache, - ) - else: - final_description = combined_description - - # Update entity in graph storage - updated_entity_data = { - **current_entity, - "description": final_description, - "entity_type": entity_type, - "source_id": GRAPH_FIELD_SEP.join(chunk_ids), - "file_path": GRAPH_FIELD_SEP.join(file_paths) - if file_paths - else current_entity.get("file_path", "unknown_source"), - } - await knowledge_graph_inst.upsert_node(entity_name, updated_entity_data) - - # Update entity in vector database - entity_vdb_id = compute_mdhash_id(entity_name, prefix="ent-") - - # Delete old vector record first - try: - await entities_vdb.delete([entity_vdb_id]) - except Exception as e: - logger.debug(f"Could not delete old entity vector record {entity_vdb_id}: {e}") - - # Insert new vector record - entity_content = f"{entity_name}\n{final_description}" - await entities_vdb.upsert( - { - entity_vdb_id: { - "content": entity_content, - "entity_name": entity_name, - "source_id": updated_entity_data["source_id"], - "description": final_description, - "entity_type": entity_type, - "file_path": updated_entity_data["file_path"], - } - } - ) + # Generate final description and update storage + final_description = await _generate_final_description(combined_description) + await _update_entity_storage(final_description, entity_type, file_paths) async def _rebuild_single_relationship( @@ -798,7 +838,8 @@ async def _merge_edges_then_upsert( ) # Process edges_data with None checks - weight = sum([dp["weight"] for dp in edges_data] + already_weights) + all_weights = [dp["weight"] for dp in edges_data] + already_weights + weight = sum(all_weights) / len(all_weights) description = GRAPH_FIELD_SEP.join( sorted( set( From 3a9494ab60a1c4d2761fe0031f4231d8b9156dd7 Mon Sep 17 00:00:00 2001 From: zrguo <49157727+LarFii@users.noreply.github.com> Date: Mon, 9 Jun 2025 19:47:29 +0800 Subject: [PATCH 03/25] Update operate.py --- lightrag/operate.py | 46 ++++++++++++++++++++++++--------------------- 1 file changed, 25 insertions(+), 21 deletions(-) diff --git a/lightrag/operate.py b/lightrag/operate.py index 453e647d..b19f739c 100644 --- a/lightrag/operate.py +++ b/lightrag/operate.py @@ -453,9 +453,7 @@ async def _rebuild_single_entity( # Helper function to update entity in both graph and vector storage async def _update_entity_storage( - final_description: str, - entity_type: str, - file_paths: set[str] + final_description: str, entity_type: str, file_paths: set[str] ): # Update entity in graph storage updated_entity_data = { @@ -463,7 +461,9 @@ async def _rebuild_single_entity( "description": final_description, "entity_type": entity_type, "source_id": GRAPH_FIELD_SEP.join(chunk_ids), - "file_path": GRAPH_FIELD_SEP.join(file_paths) if file_paths else current_entity.get("file_path", "unknown_source"), + "file_path": GRAPH_FIELD_SEP.join(file_paths) + if file_paths + else current_entity.get("file_path", "unknown_source"), } await knowledge_graph_inst.upsert_node(entity_name, updated_entity_data) @@ -474,7 +474,9 @@ async def _rebuild_single_entity( try: await 
entities_vdb.delete([entity_vdb_id]) except Exception as e: - logger.debug(f"Could not delete old entity vector record {entity_vdb_id}: {e}") + logger.debug( + f"Could not delete old entity vector record {entity_vdb_id}: {e}" + ) # Insert new vector record entity_content = f"{entity_name}\n{final_description}" @@ -510,36 +512,38 @@ async def _rebuild_single_entity( all_entity_data.extend(chunk_entities[chunk_id][entity_name]) if not all_entity_data: - logger.warning(f"No cached entity data found for {entity_name}, trying to rebuild from relationships") - + logger.warning( + f"No cached entity data found for {entity_name}, trying to rebuild from relationships" + ) + # Get all edges connected to this entity edges = await knowledge_graph_inst.get_node_edges(entity_name) if not edges: logger.warning(f"No relationships found for entity {entity_name}") return - + # Collect relationship data to extract entity information relationship_descriptions = [] file_paths = set() - + # Get edge data for all connected relationships for src_id, tgt_id in edges: edge_data = await knowledge_graph_inst.get_edge(src_id, tgt_id) if edge_data: if edge_data.get("description"): relationship_descriptions.append(edge_data["description"]) - + if edge_data.get("file_path"): edge_file_paths = edge_data["file_path"].split(GRAPH_FIELD_SEP) file_paths.update(edge_file_paths) - + # Generate description from relationships or fallback to current if relationship_descriptions: combined_description = GRAPH_FIELD_SEP.join(relationship_descriptions) final_description = await _generate_final_description(combined_description) else: final_description = current_entity.get("description", "") - + entity_type = current_entity.get("entity_type", "UNKNOWN") await _update_entity_storage(final_description, entity_type, file_paths) return @@ -635,11 +639,12 @@ async def _rebuild_single_relationship( if keywords else current_relationship.get("keywords", "") ) - avg_weight = ( - sum(weights) / len(weights) - if weights - else current_relationship.get("weight", 1.0) - ) + # weight = ( + # sum(weights) / len(weights) + # if weights + # else current_relationship.get("weight", 1.0) + # ) + weight = sum(weights) if weights else current_relationship.get("weight", 1.0) # Use summary if description is too long if len(combined_description) > global_config["summary_to_max_tokens"]: @@ -657,7 +662,7 @@ async def _rebuild_single_relationship( **current_relationship, "description": final_description, "keywords": combined_keywords, - "weight": avg_weight, + "weight": weight, "source_id": GRAPH_FIELD_SEP.join(chunk_ids), "file_path": GRAPH_FIELD_SEP.join(file_paths) if file_paths @@ -688,7 +693,7 @@ async def _rebuild_single_relationship( "content": rel_content, "keywords": combined_keywords, "description": final_description, - "weight": avg_weight, + "weight": weight, "file_path": updated_relationship_data["file_path"], } } @@ -838,8 +843,7 @@ async def _merge_edges_then_upsert( ) # Process edges_data with None checks - all_weights = [dp["weight"] for dp in edges_data] + already_weights - weight = sum(all_weights) / len(all_weights) + weight = sum([dp["weight"] for dp in edges_data] + already_weights) description = GRAPH_FIELD_SEP.join( sorted( set( From 4937de8809490f2ad247a8d3ae194345b902623b Mon Sep 17 00:00:00 2001 From: zrguo <49157727+LarFii@users.noreply.github.com> Date: Sun, 22 Jun 2025 15:12:09 +0800 Subject: [PATCH 04/25] Update --- README-zh.md | 88 ++++++++++++++++++++ README.md | 154 +++++++++++++++++++++++++++++++++++ lightrag/base.py | 22 
++--- lightrag/kg/json_kv_impl.py | 78 +++++++++--------- lightrag/kg/postgres_impl.py | 49 +++++++++-- lightrag/lightrag.py | 22 ++--- 6 files changed, 347 insertions(+), 66 deletions(-) diff --git a/README-zh.md b/README-zh.md index 9e892012..685c9468 100644 --- a/README-zh.md +++ b/README-zh.md @@ -932,6 +932,94 @@ rag.insert_custom_kg(custom_kg) +## 删除功能 + +LightRAG提供了全面的删除功能,允许您删除文档、实体和关系。 + +
+ 删除实体 + +您可以通过实体名称删除实体及其所有关联关系: + +```python +# 删除实体及其所有关系(同步版本) +rag.delete_by_entity("Google") + +# 异步版本 +await rag.adelete_by_entity("Google") +``` + +删除实体时会: +- 从知识图谱中移除该实体节点 +- 删除该实体的所有关联关系 +- 从向量数据库中移除相关的嵌入向量 +- 保持知识图谱的完整性 + +
+ +
+ 删除关系 + +您可以删除两个特定实体之间的关系: + +```python +# 删除两个实体之间的关系(同步版本) +rag.delete_by_relation("Google", "Gmail") + +# 异步版本 +await rag.adelete_by_relation("Google", "Gmail") +``` + +删除关系时会: +- 移除指定的关系边 +- 从向量数据库中删除关系的嵌入向量 +- 保留两个实体节点及其他关系 + +
+ +
+ 通过文档ID删除 + +您可以通过文档ID删除整个文档及其相关的所有知识: + +```python +# 通过文档ID删除(异步版本) +await rag.adelete_by_doc_id("doc-12345") +``` + +通过文档ID删除时的优化处理: +- **智能清理**:自动识别并删除仅属于该文档的实体和关系 +- **保留共享知识**:如果实体或关系在其他文档中也存在,则会保留并重新构建描述 +- **缓存优化**:清理相关的LLM缓存以减少存储开销 +- **增量重建**:从剩余文档重新构建受影响的实体和关系描述 + +删除过程包括: +1. 删除文档相关的所有文本块 +2. 识别仅属于该文档的实体和关系并删除 +3. 重新构建在其他文档中仍存在的实体和关系 +4. 更新所有相关的向量索引 +5. 清理文档状态记录 + +注意:通过文档ID删除是一个异步操作,因为它涉及复杂的知识图谱重构过程。 + +
+ +
+ 删除注意事项 + +**重要提醒:** + +1. **不可逆操作**:所有删除操作都是不可逆的,请谨慎使用 +2. **性能考虑**:删除大量数据时可能需要一些时间,特别是通过文档ID删除 +3. **数据一致性**:删除操作会自动维护知识图谱和向量数据库之间的一致性 +4. **备份建议**:在执行重要删除操作前建议备份数据 + +**批量删除建议:** +- 对于批量删除操作,建议使用异步方法以获得更好的性能 +- 大规模删除时,考虑分批进行以避免系统负载过高 + +
+ ## 实体合并
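The batch-deletion guidance above (mirrored in the English README changes below) maps directly onto standard asyncio primitives. A sketch of bounded-concurrency batch deletion, assuming `rag` is an initialized LightRAG instance; the document IDs and concurrency limit are placeholders:

```python
import asyncio


async def delete_documents(rag, doc_ids: list[str], max_concurrency: int = 4) -> None:
    # Bound concurrency so large-scale deletions do not overload the storage backends.
    semaphore = asyncio.Semaphore(max_concurrency)

    async def _delete_one(doc_id: str) -> None:
        async with semaphore:
            await rag.adelete_by_doc_id(doc_id)

    await asyncio.gather(*(_delete_one(doc_id) for doc_id in doc_ids))


# asyncio.run(delete_documents(rag, ["doc-12345", "doc-67890"]))
```

A later patch in this series serializes graph writes behind a shared `graph_db_lock`, so concurrent deletions remain safe even though their graph updates execute one at a time.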
diff --git a/README.md b/README.md index 8fdf4439..31c03bff 100644 --- a/README.md +++ b/README.md @@ -988,6 +988,160 @@ These operations maintain data consistency across both the graph database and ve
+## Delete Functions + +LightRAG provides comprehensive deletion capabilities, allowing you to delete documents, entities, and relationships. + +
+ Delete Entities + +You can delete entities by their name along with all associated relationships: + +```python +# Delete entity and all its relationships (synchronous version) +rag.delete_by_entity("Google") + +# Asynchronous version +await rag.adelete_by_entity("Google") +``` + +When deleting an entity: +- Removes the entity node from the knowledge graph +- Deletes all associated relationships +- Removes related embedding vectors from the vector database +- Maintains knowledge graph integrity + +
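The "related embedding vectors" removed here are keyed by a hashed ID derived from the entity name; the patch above builds these IDs with `compute_mdhash_id(entity, prefix="ent-")`. A small sketch of that mapping, assuming the helper is the usual prefixed MD5 digest from `lightrag.utils`:

```python
from hashlib import md5


def compute_mdhash_id(content: str, prefix: str = "") -> str:
    # Assumed to mirror lightrag.utils.compute_mdhash_id: a prefixed MD5 hex digest.
    return prefix + md5(content.encode()).hexdigest()


# Deleting the entity "Google" drops the vector record stored under this ID.
print(compute_mdhash_id("Google", prefix="ent-"))  # ent-<32 hex chars>
```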
+ +
+ Delete Relations + +You can delete relationships between two specific entities: + +```python +# Delete relationship between two entities (synchronous version) +rag.delete_by_relation("Google", "Gmail") + +# Asynchronous version +await rag.adelete_by_relation("Google", "Gmail") +``` + +When deleting a relationship: +- Removes the specified relationship edge +- Deletes the relationship's embedding vector from the vector database +- Preserves both entity nodes and their other relationships + +
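Relationship vectors are keyed the same way, but because a relation may have been stored under either orientation of the pair, the deletion code in this patch computes and removes both `rel-` IDs. A sketch using the same assumed hashing helper as in the entity example above:

```python
from hashlib import md5


def compute_mdhash_id(content: str, prefix: str = "") -> str:
    # Same assumed helper as in the entity sketch above.
    return prefix + md5(content.encode()).hexdigest()


src, tgt = "Google", "Gmail"
# Both orientations are deleted because the record may exist under either key.
rel_vdb_ids = [
    compute_mdhash_id(src + tgt, prefix="rel-"),
    compute_mdhash_id(tgt + src, prefix="rel-"),
]
print(rel_vdb_ids)
```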
+ +
+ Delete by Document ID + +You can delete an entire document and all its related knowledge through document ID: + +```python +# Delete by document ID (asynchronous version) +await rag.adelete_by_doc_id("doc-12345") +``` + +Optimized processing when deleting by document ID: +- **Smart Cleanup**: Automatically identifies and removes entities and relationships that belong only to this document +- **Preserve Shared Knowledge**: If entities or relationships exist in other documents, they are preserved and their descriptions are rebuilt +- **Cache Optimization**: Clears related LLM cache to reduce storage overhead +- **Incremental Rebuilding**: Reconstructs affected entity and relationship descriptions from remaining documents + +The deletion process includes: +1. Delete all text chunks related to the document +2. Identify and delete entities and relationships that belong only to this document +3. Rebuild entities and relationships that still exist in other documents +4. Update all related vector indexes +5. Clean up document status records + +Note: Deletion by document ID is an asynchronous operation as it involves complex knowledge graph reconstruction processes. + +
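Because `adelete_by_doc_id` is asynchronous and, with this patch, re-raises storage errors after logging them, callers usually run it inside an event loop and handle failures explicitly. A usage sketch, assuming `rag` is an initialized LightRAG instance and the document ID is a placeholder:

```python
import asyncio


async def remove_document(rag, doc_id: str) -> bool:
    try:
        await rag.adelete_by_doc_id(doc_id)
        return True
    except Exception as exc:  # adelete_by_doc_id logs the error and re-raises it
        print(f"Deletion of {doc_id} failed: {exc}")
        return False


# asyncio.run(remove_document(rag, "doc-12345"))
```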
+ +**Important Reminders:** + +1. **Irreversible Operations**: All deletion operations are irreversible, please use with caution +2. **Performance Considerations**: Deleting large amounts of data may take some time, especially deletion by document ID +3. **Data Consistency**: Deletion operations automatically maintain consistency between the knowledge graph and vector database +4. **Backup Recommendations**: Consider backing up data before performing important deletion operations + +**Batch Deletion Recommendations:** +- For batch deletion operations, consider using asynchronous methods for better performance +- For large-scale deletions, consider processing in batches to avoid excessive system load + +## Entity Merging + +
+ Merge Entities and Their Relationships + +LightRAG now supports merging multiple entities into a single entity, automatically handling all relationships: + +```python +# Basic entity merging +rag.merge_entities( + source_entities=["Artificial Intelligence", "AI", "Machine Intelligence"], + target_entity="AI Technology" +) +``` + +With custom merge strategy: + +```python +# Define custom merge strategy for different fields +rag.merge_entities( + source_entities=["John Smith", "Dr. Smith", "J. Smith"], + target_entity="John Smith", + merge_strategy={ + "description": "concatenate", # Combine all descriptions + "entity_type": "keep_first", # Keep the entity type from the first entity + "source_id": "join_unique" # Combine all unique source IDs + } +) +``` + +With custom target entity data: + +```python +# Specify exact values for the merged entity +rag.merge_entities( + source_entities=["New York", "NYC", "Big Apple"], + target_entity="New York City", + target_entity_data={ + "entity_type": "LOCATION", + "description": "New York City is the most populous city in the United States.", + } +) +``` + +Advanced usage combining both approaches: + +```python +# Merge company entities with both strategy and custom data +rag.merge_entities( + source_entities=["Microsoft Corp", "Microsoft Corporation", "MSFT"], + target_entity="Microsoft", + merge_strategy={ + "description": "concatenate", # Combine all descriptions + "source_id": "join_unique" # Combine source IDs + }, + target_entity_data={ + "entity_type": "ORGANIZATION", + } +) +``` + +When merging entities: + +* All relationships from source entities are redirected to the target entity +* Duplicate relationships are intelligently merged +* Self-relationships (loops) are prevented +* Source entities are removed after merging +* Relationship weights and attributes are preserved + +
+ ## Entity Merging
diff --git a/lightrag/base.py b/lightrag/base.py index e66a67db..b8e4d642 100644 --- a/lightrag/base.py +++ b/lightrag/base.py @@ -278,20 +278,20 @@ class BaseKVStorage(StorageNameSpace, ABC): False: if the cache drop failed, or the cache mode is not supported """ - async def drop_cache_by_chunk_ids(self, chunk_ids: list[str] | None = None) -> bool: - """Delete specific cache records from storage by chunk IDs + # async def drop_cache_by_chunk_ids(self, chunk_ids: list[str] | None = None) -> bool: + # """Delete specific cache records from storage by chunk IDs - Importance notes for in-memory storage: - 1. Changes will be persisted to disk during the next index_done_callback - 2. update flags to notify other processes that data persistence is needed + # Importance notes for in-memory storage: + # 1. Changes will be persisted to disk during the next index_done_callback + # 2. update flags to notify other processes that data persistence is needed - Args: - chunk_ids (list[str]): List of chunk IDs to be dropped from storage + # Args: + # chunk_ids (list[str]): List of chunk IDs to be dropped from storage - Returns: - True: if the cache drop successfully - False: if the cache drop failed, or the operation is not supported - """ + # Returns: + # True: if the cache drop successfully + # False: if the cache drop failed, or the operation is not supported + # """ @dataclass diff --git a/lightrag/kg/json_kv_impl.py b/lightrag/kg/json_kv_impl.py index 2345e50f..fa819d4a 100644 --- a/lightrag/kg/json_kv_impl.py +++ b/lightrag/kg/json_kv_impl.py @@ -172,52 +172,52 @@ class JsonKVStorage(BaseKVStorage): except Exception: return False - async def drop_cache_by_chunk_ids(self, chunk_ids: list[str] | None = None) -> bool: - """Delete specific cache records from storage by chunk IDs + # async def drop_cache_by_chunk_ids(self, chunk_ids: list[str] | None = None) -> bool: + # """Delete specific cache records from storage by chunk IDs - Importance notes for in-memory storage: - 1. Changes will be persisted to disk during the next index_done_callback - 2. update flags to notify other processes that data persistence is needed + # Importance notes for in-memory storage: + # 1. Changes will be persisted to disk during the next index_done_callback + # 2. 
update flags to notify other processes that data persistence is needed - Args: - chunk_ids (list[str]): List of chunk IDs to be dropped from storage + # Args: + # chunk_ids (list[str]): List of chunk IDs to be dropped from storage - Returns: - True: if the cache drop successfully - False: if the cache drop failed - """ - if not chunk_ids: - return False + # Returns: + # True: if the cache drop successfully + # False: if the cache drop failed + # """ + # if not chunk_ids: + # return False - try: - async with self._storage_lock: - # Iterate through all cache modes to find entries with matching chunk_ids - for mode_key, mode_data in list(self._data.items()): - if isinstance(mode_data, dict): - # Check each cached entry in this mode - for cache_key, cache_entry in list(mode_data.items()): - if ( - isinstance(cache_entry, dict) - and cache_entry.get("chunk_id") in chunk_ids - ): - # Remove this cache entry - del mode_data[cache_key] - logger.debug( - f"Removed cache entry {cache_key} for chunk {cache_entry.get('chunk_id')}" - ) + # try: + # async with self._storage_lock: + # # Iterate through all cache modes to find entries with matching chunk_ids + # for mode_key, mode_data in list(self._data.items()): + # if isinstance(mode_data, dict): + # # Check each cached entry in this mode + # for cache_key, cache_entry in list(mode_data.items()): + # if ( + # isinstance(cache_entry, dict) + # and cache_entry.get("chunk_id") in chunk_ids + # ): + # # Remove this cache entry + # del mode_data[cache_key] + # logger.debug( + # f"Removed cache entry {cache_key} for chunk {cache_entry.get('chunk_id')}" + # ) - # If the mode is now empty, remove it entirely - if not mode_data: - del self._data[mode_key] + # # If the mode is now empty, remove it entirely + # if not mode_data: + # del self._data[mode_key] - # Set update flags to notify persistence is needed - await set_all_update_flags(self.namespace) + # # Set update flags to notify persistence is needed + # await set_all_update_flags(self.namespace) - logger.info(f"Cleared cache for {len(chunk_ids)} chunk IDs") - return True - except Exception as e: - logger.error(f"Error clearing cache by chunk IDs: {e}") - return False + # logger.info(f"Cleared cache for {len(chunk_ids)} chunk IDs") + # return True + # except Exception as e: + # logger.error(f"Error clearing cache by chunk IDs: {e}") + # return False async def drop(self) -> dict[str, str]: """Drop all data from storage and clean up resources diff --git a/lightrag/kg/postgres_impl.py b/lightrag/kg/postgres_impl.py index 465643f0..f9dabafb 100644 --- a/lightrag/kg/postgres_impl.py +++ b/lightrag/kg/postgres_impl.py @@ -106,6 +106,35 @@ class PostgreSQLDB: ): pass + async def _migrate_llm_cache_add_chunk_id(self): + """Add chunk_id column to LIGHTRAG_LLM_CACHE table if it doesn't exist""" + try: + # Check if chunk_id column exists + check_column_sql = """ + SELECT column_name + FROM information_schema.columns + WHERE table_name = 'lightrag_llm_cache' + AND column_name = 'chunk_id' + """ + + column_info = await self.query(check_column_sql) + if not column_info: + logger.info("Adding chunk_id column to LIGHTRAG_LLM_CACHE table") + add_column_sql = """ + ALTER TABLE LIGHTRAG_LLM_CACHE + ADD COLUMN chunk_id VARCHAR(255) NULL + """ + await self.execute(add_column_sql) + logger.info( + "Successfully added chunk_id column to LIGHTRAG_LLM_CACHE table" + ) + else: + logger.info( + "chunk_id column already exists in LIGHTRAG_LLM_CACHE table" + ) + except Exception as e: + logger.warning(f"Failed to add chunk_id column 
to LIGHTRAG_LLM_CACHE: {e}") + async def _migrate_timestamp_columns(self): """Migrate timestamp columns in tables to timezone-aware types, assuming original data is in UTC time""" # Tables and columns that need migration @@ -203,6 +232,13 @@ class PostgreSQLDB: logger.error(f"PostgreSQL, Failed to migrate timestamp columns: {e}") # Don't throw an exception, allow the initialization process to continue + # Migrate LLM cache table to add chunk_id field if needed + try: + await self._migrate_llm_cache_add_chunk_id() + except Exception as e: + logger.error(f"PostgreSQL, Failed to migrate LLM cache chunk_id field: {e}") + # Don't throw an exception, allow the initialization process to continue + async def query( self, sql: str, @@ -497,6 +533,7 @@ class PGKVStorage(BaseKVStorage): "original_prompt": v["original_prompt"], "return_value": v["return"], "mode": mode, + "chunk_id": v.get("chunk_id"), } await self.db.execute(upsert_sql, _data) @@ -2357,6 +2394,7 @@ TABLES = { mode varchar(32) NOT NULL, original_prompt TEXT, return_value TEXT, + chunk_id VARCHAR(255) NULL, create_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP, update_time TIMESTAMP, CONSTRAINT LIGHTRAG_LLM_CACHE_PK PRIMARY KEY (workspace, mode, id) @@ -2389,10 +2427,10 @@ SQL_TEMPLATES = { chunk_order_index, full_doc_id, file_path FROM LIGHTRAG_DOC_CHUNKS WHERE workspace=$1 AND id=$2 """, - "get_by_id_llm_response_cache": """SELECT id, original_prompt, COALESCE(return_value, '') as "return", mode + "get_by_id_llm_response_cache": """SELECT id, original_prompt, COALESCE(return_value, '') as "return", mode, chunk_id FROM LIGHTRAG_LLM_CACHE WHERE workspace=$1 AND mode=$2 """, - "get_by_mode_id_llm_response_cache": """SELECT id, original_prompt, COALESCE(return_value, '') as "return", mode + "get_by_mode_id_llm_response_cache": """SELECT id, original_prompt, COALESCE(return_value, '') as "return", mode, chunk_id FROM LIGHTRAG_LLM_CACHE WHERE workspace=$1 AND mode=$2 AND id=$3 """, "get_by_ids_full_docs": """SELECT id, COALESCE(content, '') as content @@ -2402,7 +2440,7 @@ SQL_TEMPLATES = { chunk_order_index, full_doc_id, file_path FROM LIGHTRAG_DOC_CHUNKS WHERE workspace=$1 AND id IN ({ids}) """, - "get_by_ids_llm_response_cache": """SELECT id, original_prompt, COALESCE(return_value, '') as "return", mode + "get_by_ids_llm_response_cache": """SELECT id, original_prompt, COALESCE(return_value, '') as "return", mode, chunk_id FROM LIGHTRAG_LLM_CACHE WHERE workspace=$1 AND mode= IN ({ids}) """, "filter_keys": "SELECT id FROM {table_name} WHERE workspace=$1 AND id IN ({ids})", @@ -2411,12 +2449,13 @@ SQL_TEMPLATES = { ON CONFLICT (workspace,id) DO UPDATE SET content = $2, update_time = CURRENT_TIMESTAMP """, - "upsert_llm_response_cache": """INSERT INTO LIGHTRAG_LLM_CACHE(workspace,id,original_prompt,return_value,mode) - VALUES ($1, $2, $3, $4, $5) + "upsert_llm_response_cache": """INSERT INTO LIGHTRAG_LLM_CACHE(workspace,id,original_prompt,return_value,mode,chunk_id) + VALUES ($1, $2, $3, $4, $5, $6) ON CONFLICT (workspace,mode,id) DO UPDATE SET original_prompt = EXCLUDED.original_prompt, return_value=EXCLUDED.return_value, mode=EXCLUDED.mode, + chunk_id=EXCLUDED.chunk_id, update_time = CURRENT_TIMESTAMP """, "upsert_chunk": """INSERT INTO LIGHTRAG_DOC_CHUNKS (workspace, id, tokens, diff --git a/lightrag/lightrag.py b/lightrag/lightrag.py index f1c3747b..d299080a 100644 --- a/lightrag/lightrag.py +++ b/lightrag/lightrag.py @@ -1710,17 +1710,17 @@ class LightRAG: chunk_ids = set(related_chunks.keys()) logger.info(f"Found {len(chunk_ids)} chunks to 
delete") - # 3. **OPTIMIZATION 1**: Clear LLM cache for related chunks - logger.info("Clearing LLM cache for related chunks...") - cache_cleared = await self.llm_response_cache.drop_cache_by_chunk_ids( - list(chunk_ids) - ) - if cache_cleared: - logger.info(f"Successfully cleared cache for {len(chunk_ids)} chunks") - else: - logger.warning( - "Failed to clear chunk cache or cache clearing not supported" - ) + # # 3. **OPTIMIZATION 1**: Clear LLM cache for related chunks + # logger.info("Clearing LLM cache for related chunks...") + # cache_cleared = await self.llm_response_cache.drop_cache_by_chunk_ids( + # list(chunk_ids) + # ) + # if cache_cleared: + # logger.info(f"Successfully cleared cache for {len(chunk_ids)} chunks") + # else: + # logger.warning( + # "Failed to clear chunk cache or cache clearing not supported" + # ) # 4. Analyze entities and relationships that will be affected entities_to_delete = set() From c947b20bb162710d0fc23094f200042e08c1cc36 Mon Sep 17 00:00:00 2001 From: zrguo <49157727+LarFii@users.noreply.github.com> Date: Sun, 22 Jun 2025 16:43:18 +0800 Subject: [PATCH 05/25] Update README.md --- README.md | 71 ------------------------------------------------------- 1 file changed, 71 deletions(-) diff --git a/README.md b/README.md index 31c03bff..fa227c60 100644 --- a/README.md +++ b/README.md @@ -1142,77 +1142,6 @@ When merging entities:
-## Entity Merging - -
- Merge Entities and Their Relationships - -LightRAG now supports merging multiple entities into a single entity, automatically handling all relationships: - -```python -# Basic entity merging -rag.merge_entities( - source_entities=["Artificial Intelligence", "AI", "Machine Intelligence"], - target_entity="AI Technology" -) -``` - -With custom merge strategy: - -```python -# Define custom merge strategy for different fields -rag.merge_entities( - source_entities=["John Smith", "Dr. Smith", "J. Smith"], - target_entity="John Smith", - merge_strategy={ - "description": "concatenate", # Combine all descriptions - "entity_type": "keep_first", # Keep the entity type from the first entity - "source_id": "join_unique" # Combine all unique source IDs - } -) -``` - -With custom target entity data: - -```python -# Specify exact values for the merged entity -rag.merge_entities( - source_entities=["New York", "NYC", "Big Apple"], - target_entity="New York City", - target_entity_data={ - "entity_type": "LOCATION", - "description": "New York City is the most populous city in the United States.", - } -) -``` - -Advanced usage combining both approaches: - -```python -# Merge company entities with both strategy and custom data -rag.merge_entities( - source_entities=["Microsoft Corp", "Microsoft Corporation", "MSFT"], - target_entity="Microsoft", - merge_strategy={ - "description": "concatenate", # Combine all descriptions - "source_id": "join_unique" # Combine source IDs - }, - target_entity_data={ - "entity_type": "ORGANIZATION", - } -) -``` - -When merging entities: - -* All relationships from source entities are redirected to the target entity -* Duplicate relationships are intelligently merged -* Self-relationships (loops) are prevented -* Source entities are removed after merging -* Relationship weights and attributes are preserved - -
- ## Multimodal Document Processing (RAG-Anything Integration) LightRAG now seamlessly integrates with [RAG-Anything](https://github.com/HKUDS/RAG-Anything), a comprehensive **All-in-One Multimodal Document Processing RAG system** built specifically for LightRAG. RAG-Anything enables advanced parsing and retrieval-augmented generation (RAG) capabilities, allowing you to handle multimodal documents seamlessly and extract structured content—including text, images, tables, and formulas—from various document formats for integration into your RAG pipeline. From 9fae0eadff774980d7a820325b03aff7c3771156 Mon Sep 17 00:00:00 2001 From: yangdx Date: Mon, 23 Jun 2025 09:57:56 +0800 Subject: [PATCH 06/25] feat: Ensure thread safety for graph write operations Add a lock to delete, adelete_by_entity, and adelete_by_relation methods to prevent race conditions and ensure data consistency during concurrent modifications to the knowledge graph. --- lightrag/lightrag.py | 229 +++++++++++++++++++++++-------------------- 1 file changed, 120 insertions(+), 109 deletions(-) diff --git a/lightrag/lightrag.py b/lightrag/lightrag.py index d299080a..c16b65f3 100644 --- a/lightrag/lightrag.py +++ b/lightrag/lightrag.py @@ -35,6 +35,7 @@ from lightrag.kg import ( from lightrag.kg.shared_storage import ( get_namespace_data, get_pipeline_status_lock, + get_graph_db_lock, ) from .base import ( @@ -1728,109 +1729,113 @@ class LightRAG: relationships_to_delete = set() relationships_to_rebuild = {} # (src, tgt) -> remaining_chunk_ids - # Process entities - all_labels = await self.chunk_entity_relation_graph.get_all_labels() - for node_label in all_labels: - node_data = await self.chunk_entity_relation_graph.get_node(node_label) - if node_data and "source_id" in node_data: - # Split source_id using GRAPH_FIELD_SEP - sources = set(node_data["source_id"].split(GRAPH_FIELD_SEP)) - remaining_sources = sources - chunk_ids + # Use graph database lock to ensure atomic merges and updates + graph_db_lock = get_graph_db_lock(enable_logging=False) + async with graph_db_lock: - if not remaining_sources: - entities_to_delete.add(node_label) - logger.debug( - f"Entity {node_label} marked for deletion - no remaining sources" - ) - elif remaining_sources != sources: - # Entity needs to be rebuilt from remaining chunks - entities_to_rebuild[node_label] = remaining_sources - logger.debug( - f"Entity {node_label} will be rebuilt from {len(remaining_sources)} remaining chunks" - ) + # Process entities + all_labels = await self.chunk_entity_relation_graph.get_all_labels() + for node_label in all_labels: + node_data = await self.chunk_entity_relation_graph.get_node(node_label) + if node_data and "source_id" in node_data: + # Split source_id using GRAPH_FIELD_SEP + sources = set(node_data["source_id"].split(GRAPH_FIELD_SEP)) + remaining_sources = sources - chunk_ids - # Process relationships - for node_label in all_labels: - node_edges = await self.chunk_entity_relation_graph.get_node_edges( - node_label - ) - if node_edges: - for src, tgt in node_edges: - edge_data = await self.chunk_entity_relation_graph.get_edge( - src, tgt - ) - if edge_data and "source_id" in edge_data: - # Split source_id using GRAPH_FIELD_SEP - sources = set(edge_data["source_id"].split(GRAPH_FIELD_SEP)) - remaining_sources = sources - chunk_ids + if not remaining_sources: + entities_to_delete.add(node_label) + logger.debug( + f"Entity {node_label} marked for deletion - no remaining sources" + ) + elif remaining_sources != sources: + # Entity needs to be rebuilt from 
remaining chunks + entities_to_rebuild[node_label] = remaining_sources + logger.debug( + f"Entity {node_label} will be rebuilt from {len(remaining_sources)} remaining chunks" + ) - if not remaining_sources: - relationships_to_delete.add((src, tgt)) - logger.debug( - f"Relationship {src}-{tgt} marked for deletion - no remaining sources" - ) - elif remaining_sources != sources: - # Relationship needs to be rebuilt from remaining chunks - relationships_to_rebuild[(src, tgt)] = remaining_sources - logger.debug( - f"Relationship {src}-{tgt} will be rebuilt from {len(remaining_sources)} remaining chunks" - ) - - # 5. Delete chunks from storage - if chunk_ids: - await self.chunks_vdb.delete(chunk_ids) - await self.text_chunks.delete(chunk_ids) - logger.info(f"Deleted {len(chunk_ids)} chunks from storage") - - # 6. Delete entities that have no remaining sources - if entities_to_delete: - # Delete from vector database - entity_vdb_ids = [ - compute_mdhash_id(entity, prefix="ent-") - for entity in entities_to_delete - ] - await self.entities_vdb.delete(entity_vdb_ids) - - # Delete from graph - await self.chunk_entity_relation_graph.remove_nodes( - list(entities_to_delete) - ) - logger.info(f"Deleted {len(entities_to_delete)} entities") - - # 7. Delete relationships that have no remaining sources - if relationships_to_delete: - # Delete from vector database - rel_ids_to_delete = [] - for src, tgt in relationships_to_delete: - rel_ids_to_delete.extend( - [ - compute_mdhash_id(src + tgt, prefix="rel-"), - compute_mdhash_id(tgt + src, prefix="rel-"), - ] + # Process relationships + for node_label in all_labels: + node_edges = await self.chunk_entity_relation_graph.get_node_edges( + node_label ) - await self.relationships_vdb.delete(rel_ids_to_delete) + if node_edges: + for src, tgt in node_edges: + edge_data = await self.chunk_entity_relation_graph.get_edge( + src, tgt + ) + if edge_data and "source_id" in edge_data: + # Split source_id using GRAPH_FIELD_SEP + sources = set(edge_data["source_id"].split(GRAPH_FIELD_SEP)) + remaining_sources = sources - chunk_ids - # Delete from graph - await self.chunk_entity_relation_graph.remove_edges( - list(relationships_to_delete) - ) - logger.info(f"Deleted {len(relationships_to_delete)} relationships") + if not remaining_sources: + relationships_to_delete.add((src, tgt)) + logger.debug( + f"Relationship {src}-{tgt} marked for deletion - no remaining sources" + ) + elif remaining_sources != sources: + # Relationship needs to be rebuilt from remaining chunks + relationships_to_rebuild[(src, tgt)] = remaining_sources + logger.debug( + f"Relationship {src}-{tgt} will be rebuilt from {len(remaining_sources)} remaining chunks" + ) - # 8. **OPTIMIZATION 2**: Rebuild entities and relationships from remaining chunks - if entities_to_rebuild or relationships_to_rebuild: - logger.info( - f"Rebuilding {len(entities_to_rebuild)} entities and {len(relationships_to_rebuild)} relationships..." - ) - await _rebuild_knowledge_from_chunks( - entities_to_rebuild=entities_to_rebuild, - relationships_to_rebuild=relationships_to_rebuild, - knowledge_graph_inst=self.chunk_entity_relation_graph, - entities_vdb=self.entities_vdb, - relationships_vdb=self.relationships_vdb, - text_chunks=self.text_chunks, - llm_response_cache=self.llm_response_cache, - global_config=asdict(self), - ) + # 5. Delete chunks from storage + if chunk_ids: + await self.chunks_vdb.delete(chunk_ids) + await self.text_chunks.delete(chunk_ids) + logger.info(f"Deleted {len(chunk_ids)} chunks from storage") + + # 6. 
Delete entities that have no remaining sources + if entities_to_delete: + # Delete from vector database + entity_vdb_ids = [ + compute_mdhash_id(entity, prefix="ent-") + for entity in entities_to_delete + ] + await self.entities_vdb.delete(entity_vdb_ids) + + # Delete from graph + await self.chunk_entity_relation_graph.remove_nodes( + list(entities_to_delete) + ) + logger.info(f"Deleted {len(entities_to_delete)} entities") + + # 7. Delete relationships that have no remaining sources + if relationships_to_delete: + # Delete from vector database + rel_ids_to_delete = [] + for src, tgt in relationships_to_delete: + rel_ids_to_delete.extend( + [ + compute_mdhash_id(src + tgt, prefix="rel-"), + compute_mdhash_id(tgt + src, prefix="rel-"), + ] + ) + await self.relationships_vdb.delete(rel_ids_to_delete) + + # Delete from graph + await self.chunk_entity_relation_graph.remove_edges( + list(relationships_to_delete) + ) + logger.info(f"Deleted {len(relationships_to_delete)} relationships") + + # 8. **OPTIMIZATION 2**: Rebuild entities and relationships from remaining chunks + if entities_to_rebuild or relationships_to_rebuild: + logger.info( + f"Rebuilding {len(entities_to_rebuild)} entities and {len(relationships_to_rebuild)} relationships..." + ) + await _rebuild_knowledge_from_chunks( + entities_to_rebuild=entities_to_rebuild, + relationships_to_rebuild=relationships_to_rebuild, + knowledge_graph_inst=self.chunk_entity_relation_graph, + entities_vdb=self.entities_vdb, + relationships_vdb=self.relationships_vdb, + text_chunks=self.text_chunks, + llm_response_cache=self.llm_response_cache, + global_config=asdict(self), + ) # 9. Delete original document and status await self.full_docs.delete([doc_id]) @@ -1857,12 +1862,15 @@ class LightRAG: """ from .utils_graph import adelete_by_entity - return await adelete_by_entity( - self.chunk_entity_relation_graph, - self.entities_vdb, - self.relationships_vdb, - entity_name, - ) + # Use graph database lock to ensure atomic merges and updates + graph_db_lock = get_graph_db_lock(enable_logging=False) + async with graph_db_lock: + return await adelete_by_entity( + self.chunk_entity_relation_graph, + self.entities_vdb, + self.relationships_vdb, + entity_name, + ) def delete_by_entity(self, entity_name: str) -> None: loop = always_get_an_event_loop() @@ -1877,12 +1885,15 @@ class LightRAG: """ from .utils_graph import adelete_by_relation - return await adelete_by_relation( - self.chunk_entity_relation_graph, - self.relationships_vdb, - source_entity, - target_entity, - ) + # Use graph database lock to ensure atomic merges and updates + graph_db_lock = get_graph_db_lock(enable_logging=False) + async with graph_db_lock: + return await adelete_by_relation( + self.chunk_entity_relation_graph, + self.relationships_vdb, + source_entity, + target_entity, + ) def delete_by_relation(self, source_entity: str, target_entity: str) -> None: loop = always_get_an_event_loop() From a0be65d5d97c830345ca1ced75e1299729446088 Mon Sep 17 00:00:00 2001 From: yangdx Date: Mon, 23 Jun 2025 17:59:27 +0800 Subject: [PATCH 07/25] Refac: Return status and messages for delete by doc id operaton --- lightrag/base.py | 8 ++++++++ lightrag/lightrag.py | 45 +++++++++++++++++++++++++++++++++++--------- 2 files changed, 44 insertions(+), 9 deletions(-) diff --git a/lightrag/base.py b/lightrag/base.py index b8e4d642..b04ce3b2 100644 --- a/lightrag/base.py +++ b/lightrag/base.py @@ -613,3 +613,11 @@ class StoragesStatus(str, Enum): CREATED = "created" INITIALIZED = "initialized" FINALIZED = 
"finalized" + +@dataclass +class DeletionResult: + """Represents the result of a deletion operation.""" + status: Literal["success", "not_found", "failure"] + doc_id: str + message: str + status_code: int = 200 diff --git a/lightrag/lightrag.py b/lightrag/lightrag.py index c16b65f3..ea6f23f1 100644 --- a/lightrag/lightrag.py +++ b/lightrag/lightrag.py @@ -48,6 +48,7 @@ from .base import ( QueryParam, StorageNameSpace, StoragesStatus, + DeletionResult, ) from .namespace import NameSpace, make_namespace from .operate import ( @@ -1677,7 +1678,7 @@ class LightRAG: # Return the dictionary containing statuses only for the found document IDs return found_statuses - async def adelete_by_doc_id(self, doc_id: str) -> None: + async def adelete_by_doc_id(self, doc_id: str) -> DeletionResult: """Delete a document and all its related data with cache cleanup and reconstruction Optimized version that: @@ -1691,7 +1692,12 @@ class LightRAG: # 1. Get the document status and related data if not await self.doc_status.get_by_id(doc_id): logger.warning(f"Document {doc_id} not found") - return + return DeletionResult( + status="not_found", + doc_id=doc_id, + message=f"Document {doc_id} not found.", + status_code=404, + ) logger.info(f"Starting optimized deletion for document {doc_id}") @@ -1706,7 +1712,15 @@ class LightRAG: if not related_chunks: logger.warning(f"No chunks found for document {doc_id}") - return + # Still need to delete the doc status and full doc + await self.full_docs.delete([doc_id]) + await self.doc_status.delete([doc_id]) + return DeletionResult( + status="success", + doc_id=doc_id, + message=f"Document {doc_id} found but had no associated chunks. Document entry deleted.", + status_code=200, + ) chunk_ids = set(related_chunks.keys()) logger.info(f"Found {len(chunk_ids)} chunks to delete") @@ -1844,15 +1858,28 @@ class LightRAG: # 10. Ensure all indexes are updated await self._insert_done() - logger.info( - f"Successfully deleted document {doc_id}. " - f"Deleted: {len(entities_to_delete)} entities, {len(relationships_to_delete)} relationships. " - f"Rebuilt: {len(entities_to_rebuild)} entities, {len(relationships_to_rebuild)} relationships." + success_message = f"""Successfully deleted document {doc_id}. +Deleted: {len(entities_to_delete)} entities, {len(relationships_to_delete)} relationships. +Rebuilt: {len(entities_to_rebuild)} entities, {len(relationships_to_rebuild)} relationships.""" + + logger.info(success_message) + return DeletionResult( + status="success", + doc_id=doc_id, + message=success_message, + status_code=200, ) except Exception as e: - logger.error(f"Error while deleting document {doc_id}: {e}") - raise + error_message = f"Error while deleting document {doc_id}: {e}" + logger.error(error_message) + logger.error(traceback.format_exc()) + return DeletionResult( + status="failure", + doc_id=doc_id, + message=error_message, + status_code=500, + ) async def adelete_by_entity(self, entity_name: str) -> None: """Asynchronously delete an entity and all its relationships. 
From dffe659388ba2fecaa0de43c1e8c9507921b4dc9 Mon Sep 17 00:00:00 2001 From: yangdx Date: Mon, 23 Jun 2025 18:10:40 +0800 Subject: [PATCH 08/25] Feat: Add document deletion by ID API endpoint - New DELETE endpoint for document removal - Implements doc_id-based deletion - Handles pipeline status during operation - Includes proper error handling - Updates pipeline status messages --- lightrag/api/routers/document_routes.py | 108 +++++++++++++++++++++++- 1 file changed, 107 insertions(+), 1 deletion(-) diff --git a/lightrag/api/routers/document_routes.py b/lightrag/api/routers/document_routes.py index b2e9baf8..16dd4b0e 100644 --- a/lightrag/api/routers/document_routes.py +++ b/lightrag/api/routers/document_routes.py @@ -12,7 +12,14 @@ import pipmaster as pm from datetime import datetime, timezone from pathlib import Path from typing import Dict, List, Optional, Any, Literal -from fastapi import APIRouter, BackgroundTasks, Depends, File, HTTPException, UploadFile +from fastapi import ( + APIRouter, + BackgroundTasks, + Depends, + File, + HTTPException, + UploadFile, +) from pydantic import BaseModel, Field, field_validator from lightrag import LightRAG @@ -252,6 +259,10 @@ Attributes: """ +class DeleteDocRequest(BaseModel): + doc_id: str = Field(..., description="The ID of the document to delete.") + + class DocStatusResponse(BaseModel): id: str = Field(description="Document identifier") content_summary: str = Field(description="Summary of document content") @@ -1318,6 +1329,100 @@ def create_document_routes( logger.error(traceback.format_exc()) raise HTTPException(status_code=500, detail=str(e)) + class DeleteDocByIdResponse(BaseModel): + """Response model for single document deletion operation.""" + + status: Literal["success", "fail", "not_found", "busy"] = Field( + description="Status of the deletion operation" + ) + message: str = Field(description="Message describing the operation result") + doc_id: Optional[str] = Field(default=None, description="The ID of the document.") + + @router.delete( + "/delete_by_doc_id", + response_model=DeleteDocByIdResponse, + dependencies=[Depends(combined_auth)], + summary="Delete a document and all its associated data by its ID.", + ) + async def delete_by_doc_id( + delete_request: DeleteDocRequest, + ) -> DeleteDocByIdResponse: + """ + Deletes a specific document and all its associated data, including its status, + text chunks, vector embeddings, and any related graph data. + + This operation is irreversible and will interact with the pipeline status. + + Args: + delete_request (DeleteDocRequest): The request containing the document ID. + + Returns: + DeleteDocByIdResponse: The result of the deletion operation. + - status="success": The document was successfully deleted. + - status="not_found": The document with the specified ID was not found. + - status="fail": The deletion operation failed. + - status="busy": The pipeline is busy with another operation. + + Raises: + HTTPException: + - 500: If an unexpected internal error occurs. 
+ """ + from lightrag.kg.shared_storage import ( + get_namespace_data, + get_pipeline_status_lock, + ) + + doc_id = delete_request.doc_id + pipeline_status = await get_namespace_data("pipeline_status") + pipeline_status_lock = get_pipeline_status_lock() + + async with pipeline_status_lock: + if pipeline_status.get("busy", False): + return DeleteDocByIdResponse( + status="busy", + message="Cannot delete document while pipeline is busy", + doc_id=doc_id, + ) + pipeline_status.update( + { + "busy": True, + "job_name": f"Deleting Document: {doc_id}", + "job_start": datetime.now().isoformat(), + "latest_message": "Starting document deletion process", + } + ) + # Use slice assignment to clear the list in place + pipeline_status["history_messages"][:] = [ + f"Starting deletion for doc_id: {doc_id}" + ] + + try: + result = await rag.adelete_by_doc_id(doc_id) + response_data = { + "doc_id": result.doc_id, + "message": result.message, + "status": result.status, + } + if "history_messages" in pipeline_status: + pipeline_status["history_messages"].append(result.message) + return DeleteDocByIdResponse(**response_data) + + except Exception as e: + error_msg = f"Error deleting document {doc_id}: {str(e)}" + logger.error(error_msg) + logger.error(traceback.format_exc()) + if "history_messages" in pipeline_status: + pipeline_status["history_messages"].append(error_msg) + # Re-raise as HTTPException for consistent error handling by FastAPI + raise HTTPException(status_code=500, detail=error_msg) + finally: + async with pipeline_status_lock: + pipeline_status["busy"] = False + completion_msg = f"Document deletion process for {doc_id} completed." + pipeline_status["latest_message"] = completion_msg + if "history_messages" in pipeline_status: + pipeline_status["history_messages"].append(completion_msg) + @router.post( "/clear_cache", response_model=ClearCacheResponse, @@ -1372,3 +1477,4 @@ def create_document_routes( raise HTTPException(status_code=500, detail=str(e)) return router + From a215939c41108a1047eea767905c79e8ab4a0478 Mon Sep 17 00:00:00 2001 From: yangdx Date: Mon, 23 Jun 2025 18:39:36 +0800 Subject: [PATCH 09/25] Refac: Avoid duplicate edge processing in adelete_by_doc_id --- lightrag/lightrag.py | 15 +++++++++++++-- 1 file changed, 13 insertions(+), 2 deletions(-) diff --git a/lightrag/lightrag.py b/lightrag/lightrag.py index ea6f23f1..69bb2c28 100644 --- a/lightrag/lightrag.py +++ b/lightrag/lightrag.py @@ -1775,12 +1775,21 @@ class LightRAG: ) if node_edges: for src, tgt in node_edges: + # To avoid processing the same edge twice in an undirected graph + if ( + (tgt, src) in relationships_to_delete + or (tgt, src) in relationships_to_rebuild + ): + continue + edge_data = await self.chunk_entity_relation_graph.get_edge( src, tgt ) if edge_data and "source_id" in edge_data: # Split source_id using GRAPH_FIELD_SEP - sources = set(edge_data["source_id"].split(GRAPH_FIELD_SEP)) + sources = set( + edge_data["source_id"].split(GRAPH_FIELD_SEP) + ) remaining_sources = sources - chunk_ids if not remaining_sources: @@ -1790,7 +1799,9 @@ class LightRAG: ) elif remaining_sources != sources: # Relationship needs to be rebuilt from remaining chunks - relationships_to_rebuild[(src, tgt)] = remaining_sources + relationships_to_rebuild[ + (src, tgt) + ] = remaining_sources logger.debug( f"Relationship {src}-{tgt} will be rebuilt from {len(remaining_sources)} remaining chunks" ) From 5099ac8213e6bfbe01a9296adb90ba088c3453ba Mon Sep 17 00:00:00 2001 From: yangdx Date: Mon, 23 Jun 2025 18:41:30 +0800 Subject: 
[PATCH 10/25] Fix linting --- lightrag/api/routers/document_routes.py | 5 +++-- lightrag/base.py | 2 ++ lightrag/lightrag.py | 19 ++++++++++--------- 3 files changed, 15 insertions(+), 11 deletions(-) diff --git a/lightrag/api/routers/document_routes.py b/lightrag/api/routers/document_routes.py index 16dd4b0e..2dcca47a 100644 --- a/lightrag/api/routers/document_routes.py +++ b/lightrag/api/routers/document_routes.py @@ -1336,7 +1336,9 @@ def create_document_routes( description="Status of the deletion operation" ) message: str = Field(description="Message describing the operation result") - doc_id: Optional[str] = Field(default=None, description="The ID of the document.") + doc_id: Optional[str] = Field( + default=None, description="The ID of the document." + ) @router.delete( "/delete_by_doc_id", @@ -1477,4 +1479,3 @@ def create_document_routes( raise HTTPException(status_code=500, detail=str(e)) return router - diff --git a/lightrag/base.py b/lightrag/base.py index b04ce3b2..47a390b5 100644 --- a/lightrag/base.py +++ b/lightrag/base.py @@ -614,9 +614,11 @@ class StoragesStatus(str, Enum): INITIALIZED = "initialized" FINALIZED = "finalized" + @dataclass class DeletionResult: """Represents the result of a deletion operation.""" + status: Literal["success", "not_found", "failure"] doc_id: str message: str diff --git a/lightrag/lightrag.py b/lightrag/lightrag.py index 69bb2c28..820dd527 100644 --- a/lightrag/lightrag.py +++ b/lightrag/lightrag.py @@ -1746,11 +1746,12 @@ class LightRAG: # Use graph database lock to ensure atomic merges and updates graph_db_lock = get_graph_db_lock(enable_logging=False) async with graph_db_lock: - # Process entities all_labels = await self.chunk_entity_relation_graph.get_all_labels() for node_label in all_labels: - node_data = await self.chunk_entity_relation_graph.get_node(node_label) + node_data = await self.chunk_entity_relation_graph.get_node( + node_label + ) if node_data and "source_id" in node_data: # Split source_id using GRAPH_FIELD_SEP sources = set(node_data["source_id"].split(GRAPH_FIELD_SEP)) @@ -1776,10 +1777,10 @@ class LightRAG: if node_edges: for src, tgt in node_edges: # To avoid processing the same edge twice in an undirected graph - if ( - (tgt, src) in relationships_to_delete - or (tgt, src) in relationships_to_rebuild - ): + if (tgt, src) in relationships_to_delete or ( + tgt, + src, + ) in relationships_to_rebuild: continue edge_data = await self.chunk_entity_relation_graph.get_edge( @@ -1799,9 +1800,9 @@ class LightRAG: ) elif remaining_sources != sources: # Relationship needs to be rebuilt from remaining chunks - relationships_to_rebuild[ - (src, tgt) - ] = remaining_sources + relationships_to_rebuild[(src, tgt)] = ( + remaining_sources + ) logger.debug( f"Relationship {src}-{tgt} will be rebuilt from {len(remaining_sources)} remaining chunks" ) From b8a2d5b2230be9e681d1142ddd2cd3903f1da25e Mon Sep 17 00:00:00 2001 From: yangdx Date: Mon, 23 Jun 2025 18:42:34 +0800 Subject: [PATCH 11/25] Bump api version to 0174 --- lightrag/api/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lightrag/api/__init__.py b/lightrag/api/__init__.py index 5d14dfb6..7a4a498a 100644 --- a/lightrag/api/__init__.py +++ b/lightrag/api/__init__.py @@ -1 +1 @@ -__api_version__ = "0173" +__api_version__ = "0174" From ebcabe29ca0d5ed95a6fec1f44b4505afe799798 Mon Sep 17 00:00:00 2001 From: yangdx Date: Mon, 23 Jun 2025 18:46:01 +0800 Subject: [PATCH 12/25] Remove duplicated graph db lock --- lightrag/lightrag.py | 30 
++++++++++++------------------ 1 file changed, 12 insertions(+), 18 deletions(-) diff --git a/lightrag/lightrag.py b/lightrag/lightrag.py index 820dd527..287ed5ee 100644 --- a/lightrag/lightrag.py +++ b/lightrag/lightrag.py @@ -1901,15 +1901,12 @@ Rebuilt: {len(entities_to_rebuild)} entities, {len(relationships_to_rebuild)} re """ from .utils_graph import adelete_by_entity - # Use graph database lock to ensure atomic merges and updates - graph_db_lock = get_graph_db_lock(enable_logging=False) - async with graph_db_lock: - return await adelete_by_entity( - self.chunk_entity_relation_graph, - self.entities_vdb, - self.relationships_vdb, - entity_name, - ) + return await adelete_by_entity( + self.chunk_entity_relation_graph, + self.entities_vdb, + self.relationships_vdb, + entity_name, + ) def delete_by_entity(self, entity_name: str) -> None: loop = always_get_an_event_loop() @@ -1924,15 +1921,12 @@ Rebuilt: {len(entities_to_rebuild)} entities, {len(relationships_to_rebuild)} re """ from .utils_graph import adelete_by_relation - # Use graph database lock to ensure atomic merges and updates - graph_db_lock = get_graph_db_lock(enable_logging=False) - async with graph_db_lock: - return await adelete_by_relation( - self.chunk_entity_relation_graph, - self.relationships_vdb, - source_entity, - target_entity, - ) + return await adelete_by_relation( + self.chunk_entity_relation_graph, + self.relationships_vdb, + source_entity, + target_entity, + ) def delete_by_relation(self, source_entity: str, target_entity: str) -> None: loop = always_get_an_event_loop() From f60bad92f0d72bbf74e8b4ec688ab72008bb6f6f Mon Sep 17 00:00:00 2001 From: yangdx Date: Mon, 23 Jun 2025 19:20:10 +0800 Subject: [PATCH 13/25] Fix: Delete relations from vector DB in both directions --- lightrag/utils_graph.py | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/lightrag/utils_graph.py b/lightrag/utils_graph.py index 54876fa7..ce837dc0 100644 --- a/lightrag/utils_graph.py +++ b/lightrag/utils_graph.py @@ -84,10 +84,12 @@ async def adelete_by_relation( return # Delete relation from vector database - relation_id = compute_mdhash_id( - source_entity + target_entity, prefix="rel-" - ) - await relationships_vdb.delete([relation_id]) + rel_ids_to_delete = [ + compute_mdhash_id(source_entity + target_entity, prefix="rel-"), + compute_mdhash_id(target_entity + source_entity, prefix="rel-"), + ] + + await relationships_vdb.delete(rel_ids_to_delete) # Delete relation from knowledge graph await chunk_entity_relation_graph.remove_edges( From 1697ea4bf6fc417d0ca62d0f6793361174441dd4 Mon Sep 17 00:00:00 2001 From: yangdx Date: Mon, 23 Jun 2025 19:20:35 +0800 Subject: [PATCH 14/25] Fix linting --- lightrag/utils_graph.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/lightrag/utils_graph.py b/lightrag/utils_graph.py index ce837dc0..392d1c69 100644 --- a/lightrag/utils_graph.py +++ b/lightrag/utils_graph.py @@ -85,10 +85,10 @@ async def adelete_by_relation( # Delete relation from vector database rel_ids_to_delete = [ - compute_mdhash_id(source_entity + target_entity, prefix="rel-"), - compute_mdhash_id(target_entity + source_entity, prefix="rel-"), - ] - + compute_mdhash_id(source_entity + target_entity, prefix="rel-"), + compute_mdhash_id(target_entity + source_entity, prefix="rel-"), + ] + await relationships_vdb.delete(rel_ids_to_delete) # Delete relation from knowledge graph From ce50135efb9c6f5f4f7dcb6ed1a5dc2825bac778 Mon Sep 17 00:00:00 2001 From: yangdx Date: Mon, 23 Jun 2025 21:08:51 
+0800 Subject: [PATCH 15/25] Improved docstring for document deletion method --- lightrag/lightrag.py | 25 ++++++++++++++++++++----- 1 file changed, 20 insertions(+), 5 deletions(-) diff --git a/lightrag/lightrag.py b/lightrag/lightrag.py index 287ed5ee..98a9157c 100644 --- a/lightrag/lightrag.py +++ b/lightrag/lightrag.py @@ -1679,14 +1679,29 @@ class LightRAG: return found_statuses async def adelete_by_doc_id(self, doc_id: str) -> DeletionResult: - """Delete a document and all its related data with cache cleanup and reconstruction + """Delete a document and all its related data, including chunks, graph elements, and cached entries. - Optimized version that: - 1. Clears LLM cache for related chunks - 2. Rebuilds entity and relationship descriptions from remaining chunks + This method orchestrates a comprehensive deletion process for a given document ID. + It ensures that not only the document itself but also all its derived and associated + data across different storage layers are removed. This includes: + 1. **Document and Status**: Deletes the document from `full_docs` and its status from `doc_status`. + 2. **Chunks**: Removes all associated text chunks from `chunks_vdb`. + 3. **Graph Data**: + - Deletes related entities from `entities_vdb`. + - Deletes related relationships from `relationships_vdb`. + - Removes corresponding nodes and edges from the `chunk_entity_relation_graph`. + 4. **Graph Reconstruction**: If entities or relationships are partially affected, it triggers + a reconstruction of their data from the remaining chunks to ensure consistency. Args: - doc_id: Document ID to delete + doc_id (str): The unique identifier of the document to be deleted. + + Returns: + DeletionResult: An object containing the outcome of the deletion process. + - `success` (bool): True if the deletion was successful, False otherwise. + - `message` (str): A summary of the operation's result. + - `deleted_ids` (dict[str, list[str]]): A dictionary detailing the IDs of + all deleted items, categorized by storage type (e.g., "chunks", "entities"). """ try: # 1. 
Get the document status and related data From bd487dd252ab5155bf752341bc64d4ff54df1f01 Mon Sep 17 00:00:00 2001 From: yangdx Date: Mon, 23 Jun 2025 21:38:47 +0800 Subject: [PATCH 16/25] Unify document APIs returen status string --- lightrag/api/routers/document_routes.py | 4 ++-- lightrag/base.py | 2 +- lightrag/lightrag.py | 8 ++++---- 3 files changed, 7 insertions(+), 7 deletions(-) diff --git a/lightrag/api/routers/document_routes.py b/lightrag/api/routers/document_routes.py index 2dcca47a..f5bdf01d 100644 --- a/lightrag/api/routers/document_routes.py +++ b/lightrag/api/routers/document_routes.py @@ -1341,12 +1341,12 @@ def create_document_routes( ) @router.delete( - "/delete_by_doc_id", + "/delete_document", response_model=DeleteDocByIdResponse, dependencies=[Depends(combined_auth)], summary="Delete a document and all its associated data by its ID.", ) - async def delete_by_doc_id( + async def delete_document( delete_request: DeleteDocRequest, ) -> DeleteDocByIdResponse: """ diff --git a/lightrag/base.py b/lightrag/base.py index 47a390b5..84fc7564 100644 --- a/lightrag/base.py +++ b/lightrag/base.py @@ -619,7 +619,7 @@ class StoragesStatus(str, Enum): class DeletionResult: """Represents the result of a deletion operation.""" - status: Literal["success", "not_found", "failure"] + status: Literal["success", "not_found", "fail"] doc_id: str message: str status_code: int = 200 diff --git a/lightrag/lightrag.py b/lightrag/lightrag.py index 98a9157c..ec40d197 100644 --- a/lightrag/lightrag.py +++ b/lightrag/lightrag.py @@ -1698,10 +1698,10 @@ class LightRAG: Returns: DeletionResult: An object containing the outcome of the deletion process. - - `success` (bool): True if the deletion was successful, False otherwise. + - `status` (str): "success", "not_found", or "failure". + - `doc_id` (str): The ID of the document attempted to be deleted. - `message` (str): A summary of the operation's result. - - `deleted_ids` (dict[str, list[str]]): A dictionary detailing the IDs of - all deleted items, categorized by storage type (e.g., "chunks", "entities"). + - `status_code` (int): HTTP status code (e.g., 200, 404, 500). """ try: # 1. Get the document status and related data @@ -1902,7 +1902,7 @@ Rebuilt: {len(entities_to_rebuild)} entities, {len(relationships_to_rebuild)} re logger.error(error_message) logger.error(traceback.format_exc()) return DeletionResult( - status="failure", + status="fail", doc_id=doc_id, message=error_message, status_code=500, From e6baffe10c1d3a0c242247c3c44b9d89097eef65 Mon Sep 17 00:00:00 2001 From: yangdx Date: Mon, 23 Jun 2025 21:39:45 +0800 Subject: [PATCH 17/25] Add retrun status to entity and relation delete operations --- lightrag/lightrag.py | 41 +++++++++++++++++++----- lightrag/utils_graph.py | 69 ++++++++++++++++++++++++++++++++--------- 2 files changed, 89 insertions(+), 21 deletions(-) diff --git a/lightrag/lightrag.py b/lightrag/lightrag.py index ec40d197..1a0b4bfd 100644 --- a/lightrag/lightrag.py +++ b/lightrag/lightrag.py @@ -1908,11 +1908,14 @@ Rebuilt: {len(entities_to_rebuild)} entities, {len(relationships_to_rebuild)} re status_code=500, ) - async def adelete_by_entity(self, entity_name: str) -> None: + async def adelete_by_entity(self, entity_name: str) -> DeletionResult: """Asynchronously delete an entity and all its relationships. Args: - entity_name: Name of the entity to delete + entity_name: Name of the entity to delete. + + Returns: + DeletionResult: An object containing the outcome of the deletion process. 
""" from .utils_graph import adelete_by_entity @@ -1923,16 +1926,29 @@ Rebuilt: {len(entities_to_rebuild)} entities, {len(relationships_to_rebuild)} re entity_name, ) - def delete_by_entity(self, entity_name: str) -> None: + def delete_by_entity(self, entity_name: str) -> DeletionResult: + """Synchronously delete an entity and all its relationships. + + Args: + entity_name: Name of the entity to delete. + + Returns: + DeletionResult: An object containing the outcome of the deletion process. + """ loop = always_get_an_event_loop() return loop.run_until_complete(self.adelete_by_entity(entity_name)) - async def adelete_by_relation(self, source_entity: str, target_entity: str) -> None: + async def adelete_by_relation( + self, source_entity: str, target_entity: str + ) -> DeletionResult: """Asynchronously delete a relation between two entities. Args: - source_entity: Name of the source entity - target_entity: Name of the target entity + source_entity: Name of the source entity. + target_entity: Name of the target entity. + + Returns: + DeletionResult: An object containing the outcome of the deletion process. """ from .utils_graph import adelete_by_relation @@ -1943,7 +1959,18 @@ Rebuilt: {len(entities_to_rebuild)} entities, {len(relationships_to_rebuild)} re target_entity, ) - def delete_by_relation(self, source_entity: str, target_entity: str) -> None: + def delete_by_relation( + self, source_entity: str, target_entity: str + ) -> DeletionResult: + """Synchronously delete a relation between two entities. + + Args: + source_entity: Name of the source entity. + target_entity: Name of the target entity. + + Returns: + DeletionResult: An object containing the outcome of the deletion process. + """ loop = always_get_an_event_loop() return loop.run_until_complete( self.adelete_by_relation(source_entity, target_entity) diff --git a/lightrag/utils_graph.py b/lightrag/utils_graph.py index 392d1c69..5485d47c 100644 --- a/lightrag/utils_graph.py +++ b/lightrag/utils_graph.py @@ -4,6 +4,7 @@ import time import asyncio from typing import Any, cast +from .base import DeletionResult from .kg.shared_storage import get_graph_db_lock from .prompt import GRAPH_FIELD_SEP from .utils import compute_mdhash_id, logger @@ -12,7 +13,7 @@ from .base import StorageNameSpace async def adelete_by_entity( chunk_entity_relation_graph, entities_vdb, relationships_vdb, entity_name: str -) -> None: +) -> DeletionResult: """Asynchronously delete an entity and all its relationships. Args: @@ -25,18 +26,43 @@ async def adelete_by_entity( # Use graph database lock to ensure atomic graph and vector db operations async with graph_db_lock: try: + # Check if the entity exists + if not await chunk_entity_relation_graph.has_node(entity_name): + logger.warning(f"Entity '{entity_name}' not found.") + return DeletionResult( + status="not_found", + doc_id=entity_name, + message=f"Entity '{entity_name}' not found.", + status_code=404, + ) + # Retrieve related relationships before deleting the node + edges = await chunk_entity_relation_graph.get_node_edges(entity_name) + related_relations_count = len(edges) if edges else 0 + await entities_vdb.delete_entity(entity_name) await relationships_vdb.delete_entity_relation(entity_name) await chunk_entity_relation_graph.delete_node(entity_name) - logger.info( - f"Entity '{entity_name}' and its relationships have been deleted." - ) + message = f"Entity '{entity_name}' and its {related_relations_count} relationships have been deleted." 
+ logger.info(message) await _delete_by_entity_done( entities_vdb, relationships_vdb, chunk_entity_relation_graph ) + return DeletionResult( + status="success", + doc_id=entity_name, + message=message, + status_code=200, + ) except Exception as e: - logger.error(f"Error while deleting entity '{entity_name}': {e}") + error_message = f"Error while deleting entity '{entity_name}': {e}" + logger.error(error_message) + return DeletionResult( + status="fail", + doc_id=entity_name, + message=error_message, + status_code=500, + ) async def _delete_by_entity_done( @@ -60,7 +86,7 @@ async def adelete_by_relation( relationships_vdb, source_entity: str, target_entity: str, -) -> None: +) -> DeletionResult: """Asynchronously delete a relation between two entities. Args: @@ -69,6 +95,7 @@ async def adelete_by_relation( source_entity: Name of the source entity target_entity: Name of the target entity """ + relation_str = f"{source_entity} -> {target_entity}" graph_db_lock = get_graph_db_lock(enable_logging=False) # Use graph database lock to ensure atomic graph and vector db operations async with graph_db_lock: @@ -78,10 +105,14 @@ async def adelete_by_relation( source_entity, target_entity ) if not edge_exists: - logger.warning( - f"Relation from '{source_entity}' to '{target_entity}' does not exist" + message = f"Relation from '{source_entity}' to '{target_entity}' does not exist" + logger.warning(message) + return DeletionResult( + status="not_found", + doc_id=relation_str, + message=message, + status_code=404, ) - return # Delete relation from vector database rel_ids_to_delete = [ @@ -96,13 +127,23 @@ async def adelete_by_relation( [(source_entity, target_entity)] ) - logger.info( - f"Successfully deleted relation from '{source_entity}' to '{target_entity}'" - ) + message = f"Successfully deleted relation from '{source_entity}' to '{target_entity}'" + logger.info(message) await _delete_relation_done(relationships_vdb, chunk_entity_relation_graph) + return DeletionResult( + status="success", + doc_id=relation_str, + message=message, + status_code=200, + ) except Exception as e: - logger.error( - f"Error while deleting relation from '{source_entity}' to '{target_entity}': {e}" + error_message = f"Error while deleting relation from '{source_entity}' to '{target_entity}': {e}" + logger.error(error_message) + return DeletionResult( + status="fail", + doc_id=relation_str, + message=error_message, + status_code=500, ) From 1973c80dcacae82e7c7cb8b2342942142a183059 Mon Sep 17 00:00:00 2001 From: yangdx Date: Mon, 23 Jun 2025 22:14:50 +0800 Subject: [PATCH 18/25] Feat: Add entity and relation deletion endpoints --- lightrag/api/routers/document_routes.py | 105 +++++++++++++++++++++++- 1 file changed, 104 insertions(+), 1 deletion(-) diff --git a/lightrag/api/routers/document_routes.py b/lightrag/api/routers/document_routes.py index f5bdf01d..2d66ae1c 100644 --- a/lightrag/api/routers/document_routes.py +++ b/lightrag/api/routers/document_routes.py @@ -23,7 +23,7 @@ from fastapi import ( from pydantic import BaseModel, Field, field_validator from lightrag import LightRAG -from lightrag.base import DocProcessingStatus, DocStatus +from lightrag.base import DeletionResult, DocProcessingStatus, DocStatus from lightrag.api.utils_api import get_combined_auth_dependency from ..config import global_args @@ -262,6 +262,36 @@ Attributes: class DeleteDocRequest(BaseModel): doc_id: str = Field(..., description="The ID of the document to delete.") + @field_validator("doc_id", mode="after") + @classmethod + def 
validate_doc_id(cls, doc_id: str) -> str: + if not doc_id or not doc_id.strip(): + raise ValueError("Document ID cannot be empty") + return doc_id.strip() + + +class DeleteEntityRequest(BaseModel): + entity_name: str = Field(..., description="The name of the entity to delete.") + + @field_validator("entity_name", mode="after") + @classmethod + def validate_entity_name(cls, entity_name: str) -> str: + if not entity_name or not entity_name.strip(): + raise ValueError("Entity name cannot be empty") + return entity_name.strip() + + +class DeleteRelationRequest(BaseModel): + source_entity: str = Field(..., description="The name of the source entity.") + target_entity: str = Field(..., description="The name of the target entity.") + + @field_validator("source_entity", "target_entity", mode="after") + @classmethod + def validate_entity_names(cls, entity_name: str) -> str: + if not entity_name or not entity_name.strip(): + raise ValueError("Entity name cannot be empty") + return entity_name.strip() + class DocStatusResponse(BaseModel): id: str = Field(description="Document identifier") @@ -1478,4 +1508,77 @@ def create_document_routes( logger.error(traceback.format_exc()) raise HTTPException(status_code=500, detail=str(e)) + @router.delete( + "/delete_entity", + response_model=DeletionResult, + dependencies=[Depends(combined_auth)], + ) + async def delete_entity(request: DeleteEntityRequest): + """ + Delete an entity and all its relationships from the knowledge graph. + + Args: + request (DeleteEntityRequest): The request body containing the entity name. + + Returns: + DeletionResult: An object containing the outcome of the deletion process. + + Raises: + HTTPException: If the entity is not found (404) or an error occurs (500). + """ + try: + result = await rag.adelete_by_entity(entity_name=request.entity_name) + if result.status == "not_found": + raise HTTPException(status_code=404, detail=result.message) + if result.status == "fail": + raise HTTPException(status_code=500, detail=result.message) + # Set doc_id to empty string since this is an entity operation, not document + result.doc_id = "" + return result + except HTTPException: + raise + except Exception as e: + error_msg = f"Error deleting entity '{request.entity_name}': {str(e)}" + logger.error(error_msg) + logger.error(traceback.format_exc()) + raise HTTPException(status_code=500, detail=error_msg) + + @router.delete( + "/delete_relation", + response_model=DeletionResult, + dependencies=[Depends(combined_auth)], + ) + async def delete_relation(request: DeleteRelationRequest): + """ + Delete a relationship between two entities from the knowledge graph. + + Args: + request (DeleteRelationRequest): The request body containing the source and target entity names. + + Returns: + DeletionResult: An object containing the outcome of the deletion process. + + Raises: + HTTPException: If the relation is not found (404) or an error occurs (500). 
+ """ + try: + result = await rag.adelete_by_relation( + source_entity=request.source_entity, + target_entity=request.target_entity, + ) + if result.status == "not_found": + raise HTTPException(status_code=404, detail=result.message) + if result.status == "fail": + raise HTTPException(status_code=500, detail=result.message) + # Set doc_id to empty string since this is a relation operation, not document + result.doc_id = "" + return result + except HTTPException: + raise + except Exception as e: + error_msg = f"Error deleting relation from '{request.source_entity}' to '{request.target_entity}': {str(e)}" + logger.error(error_msg) + logger.error(traceback.format_exc()) + raise HTTPException(status_code=500, detail=error_msg) + return router From c18065a91255b52a441e96d12f4cf8ae3f7edcc3 Mon Sep 17 00:00:00 2001 From: yangdx Date: Mon, 23 Jun 2025 22:41:27 +0800 Subject: [PATCH 19/25] Disable document deletion when LLM cache for extraction is off --- lightrag/api/lightrag_server.py | 8 +++++++- lightrag/api/routers/document_routes.py | 8 ++++++++ 2 files changed, 15 insertions(+), 1 deletion(-) diff --git a/lightrag/api/lightrag_server.py b/lightrag/api/lightrag_server.py index bf4f11f5..0930c1cd 100644 --- a/lightrag/api/lightrag_server.py +++ b/lightrag/api/lightrag_server.py @@ -355,7 +355,13 @@ def create_app(args): ) # Add routes - app.include_router(create_document_routes(rag, doc_manager, api_key)) + app.include_router( + create_document_routes( + rag, + doc_manager, + api_key, + ) + ) app.include_router(create_query_routes(rag, api_key, args.top_k)) app.include_router(create_graph_routes(rag, api_key)) diff --git a/lightrag/api/routers/document_routes.py b/lightrag/api/routers/document_routes.py index 2d66ae1c..813bdc48 100644 --- a/lightrag/api/routers/document_routes.py +++ b/lightrag/api/routers/document_routes.py @@ -1382,6 +1382,7 @@ def create_document_routes( """ Deletes a specific document and all its associated data, including its status, text chunks, vector embeddings, and any related graph data. + It is disabled when llm cache for entity extraction is disabled. This operation is irreversible and will interact with the pipeline status. @@ -1399,6 +1400,13 @@ def create_document_routes( HTTPException: - 500: If an unexpected internal error occurs. """ + # The rag object is initialized from the server startup args, + # so we can access its properties here. 
+ if not rag.enable_llm_cache_for_entity_extract: + raise HTTPException( + status_code=403, + detail="Operation not allowed when LLM cache for entity extraction is disabled.", + ) from lightrag.kg.shared_storage import ( get_namespace_data, get_pipeline_status_lock, From cc12460b382f80277b42d4e8073ee719766da410 Mon Sep 17 00:00:00 2001 From: yangdx Date: Mon, 23 Jun 2025 23:08:56 +0800 Subject: [PATCH 20/25] Fix: Silence PostgreSQL logs during idempotent graph initialization --- lightrag/kg/postgres_impl.py | 33 +++++++++++++++++++-------------- 1 file changed, 19 insertions(+), 14 deletions(-) diff --git a/lightrag/kg/postgres_impl.py b/lightrag/kg/postgres_impl.py index f9dabafb..bacd8894 100644 --- a/lightrag/kg/postgres_impl.py +++ b/lightrag/kg/postgres_impl.py @@ -289,25 +289,31 @@ class PostgreSQLDB: sql: str, data: dict[str, Any] | None = None, upsert: bool = False, + ignore_if_exists: bool = False, with_age: bool = False, graph_name: str | None = None, ): try: async with self.pool.acquire() as connection: # type: ignore if with_age and graph_name: - await self.configure_age(connection, graph_name) # type: ignore + await self.configure_age(connection, graph_name) elif with_age and not graph_name: raise ValueError("Graph name is required when with_age is True") if data is None: - await connection.execute(sql) # type: ignore + await connection.execute(sql) else: - await connection.execute(sql, *data.values()) # type: ignore + await connection.execute(sql, *data.values()) except ( asyncpg.exceptions.UniqueViolationError, asyncpg.exceptions.DuplicateTableError, + asyncpg.exceptions.DuplicateObjectError, # Catch "already exists" error + asyncpg.exceptions.InvalidSchemaNameError, # Also catch for AGE extension "already exists" ) as e: - if upsert: + if ignore_if_exists: + # If the flag is set, just ignore these specific errors + pass + elif upsert: print("Key value duplicate, but upsert succeeded.") else: logger.error(f"Upsert error: {e}") @@ -1212,16 +1218,15 @@ class PGGraphStorage(BaseGraphStorage): ] for query in queries: - try: - await self.db.execute( - query, - upsert=True, - with_age=True, - graph_name=self.graph_name, - ) - # logger.info(f"Successfully executed: {query}") - except Exception: - continue + # Use the new flag to silently ignore "already exists" errors + # at the source, preventing log spam. 
+ await self.db.execute( + query, + upsert=True, + ignore_if_exists=True, # Pass the new flag + with_age=True, + graph_name=self.graph_name, + ) async def finalize(self): if self.db is not None: From 64d0df0caa70f49b20b65cd341ee2d2a7700b0e0 Mon Sep 17 00:00:00 2001 From: yangdx Date: Tue, 24 Jun 2025 00:45:17 +0800 Subject: [PATCH 21/25] Remove debug logging from Neo4J operations - Delete node upsert debug log - Remove edge upsert debug messages --- lightrag/kg/neo4j_impl.py | 10 +--------- 1 file changed, 1 insertion(+), 9 deletions(-) diff --git a/lightrag/kg/neo4j_impl.py b/lightrag/kg/neo4j_impl.py index 3c57ae34..06315d76 100644 --- a/lightrag/kg/neo4j_impl.py +++ b/lightrag/kg/neo4j_impl.py @@ -766,9 +766,6 @@ class Neo4JStorage(BaseGraphStorage): result = await tx.run( query, entity_id=node_id, properties=properties ) - logger.debug( - f"Upserted node with entity_id '{node_id}' and properties: {properties}" - ) await result.consume() # Ensure result is fully consumed await session.execute_write(execute_upsert) @@ -824,12 +821,7 @@ class Neo4JStorage(BaseGraphStorage): properties=edge_properties, ) try: - records = await result.fetch(2) - if records: - logger.debug( - f"Upserted edge from '{source_node_id}' to '{target_node_id}'" - f"with properties: {edge_properties}" - ) + await result.fetch(2) finally: await result.consume() # Ensure result is consumed From fd69c895cd412c93d17fb5701d48aa8e36b6fdc6 Mon Sep 17 00:00:00 2001 From: yangdx Date: Tue, 24 Jun 2025 01:03:02 +0800 Subject: [PATCH 22/25] Remove debug looging from Neo4j --- lightrag/kg/neo4j_impl.py | 25 ++++++++++++------------- 1 file changed, 12 insertions(+), 13 deletions(-) diff --git a/lightrag/kg/neo4j_impl.py b/lightrag/kg/neo4j_impl.py index 06315d76..7fe3da15 100644 --- a/lightrag/kg/neo4j_impl.py +++ b/lightrag/kg/neo4j_impl.py @@ -1,4 +1,3 @@ -import inspect import os import re from dataclasses import dataclass @@ -307,7 +306,7 @@ class Neo4JStorage(BaseGraphStorage): for label in node_dict["labels"] if label != "base" ] - logger.debug(f"Neo4j query node {query} return: {node_dict}") + # logger.debug(f"Neo4j query node {query} return: {node_dict}") return node_dict return None finally: @@ -382,9 +381,9 @@ class Neo4JStorage(BaseGraphStorage): return 0 degree = record["degree"] - logger.debug( - f"Neo4j query node degree for {node_id} return: {degree}" - ) + # logger.debug( + # f"Neo4j query node degree for {node_id} return: {degree}" + # ) return degree finally: await result.consume() # Ensure result is fully consumed @@ -424,7 +423,7 @@ class Neo4JStorage(BaseGraphStorage): logger.warning(f"No node found with label '{nid}'") degrees[nid] = 0 - logger.debug(f"Neo4j batch node degree query returned: {degrees}") + # logger.debug(f"Neo4j batch node degree query returned: {degrees}") return degrees async def edge_degree(self, src_id: str, tgt_id: str) -> int: @@ -512,7 +511,7 @@ class Neo4JStorage(BaseGraphStorage): if records: try: edge_result = dict(records[0]["edge_properties"]) - logger.debug(f"Result: {edge_result}") + # logger.debug(f"Result: {edge_result}") # Ensure required keys exist with defaults required_keys = { "weight": 0.0, @@ -528,9 +527,9 @@ class Neo4JStorage(BaseGraphStorage): f"missing {key}, using default: {default_value}" ) - logger.debug( - f"{inspect.currentframe().f_code.co_name}:query:{query}:result:{edge_result}" - ) + # logger.debug( + # f"{inspect.currentframe().f_code.co_name}:query:{query}:result:{edge_result}" + # ) return edge_result except (KeyError, TypeError, ValueError) as e: 
logger.error( @@ -545,9 +544,9 @@ class Neo4JStorage(BaseGraphStorage): "keywords": None, } - logger.debug( - f"{inspect.currentframe().f_code.co_name}: No edge found between {source_node_id} and {target_node_id}" - ) + # logger.debug( + # f"{inspect.currentframe().f_code.co_name}: No edge found between {source_node_id} and {target_node_id}" + # ) # Return None when no edge found return None finally: From 5ae945c1e540b59130a6b2066793c3b89658950e Mon Sep 17 00:00:00 2001 From: yangdx Date: Tue, 24 Jun 2025 01:12:25 +0800 Subject: [PATCH 23/25] Improved error handling for document deletion Added HTTPException for not_found status Added HTTPException for fail status --- lightrag/api/routers/document_routes.py | 17 +++++++++++------ 1 file changed, 11 insertions(+), 6 deletions(-) diff --git a/lightrag/api/routers/document_routes.py b/lightrag/api/routers/document_routes.py index 813bdc48..01a0d00f 100644 --- a/lightrag/api/routers/document_routes.py +++ b/lightrag/api/routers/document_routes.py @@ -1438,14 +1438,19 @@ def create_document_routes( try: result = await rag.adelete_by_doc_id(doc_id) - response_data = { - "doc_id": result.doc_id, - "message": result.message, - "status": result.status, - } if "history_messages" in pipeline_status: pipeline_status["history_messages"].append(result.message) - return DeleteDocByIdResponse(**response_data) + + if result.status == "not_found": + raise HTTPException(status_code=404, detail=result.message) + if result.status == "fail": + raise HTTPException(status_code=500, detail=result.message) + + return DeleteDocByIdResponse( + doc_id=result.doc_id, + message=result.message, + status=result.status, + ) except Exception as e: error_msg = f"Error deleting document {doc_id}: {str(e)}" From 8b6dcfb6ebcf8b3d0641f8f0a61c0ada025f0ee4 Mon Sep 17 00:00:00 2001 From: yangdx Date: Tue, 24 Jun 2025 11:26:38 +0800 Subject: [PATCH 24/25] Pls do not use /delete_document API endpoint --- lightrag/api/routers/document_routes.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/lightrag/api/routers/document_routes.py b/lightrag/api/routers/document_routes.py index 01a0d00f..06862fe6 100644 --- a/lightrag/api/routers/document_routes.py +++ b/lightrag/api/routers/document_routes.py @@ -1376,10 +1376,14 @@ def create_document_routes( dependencies=[Depends(combined_auth)], summary="Delete a document and all its associated data by its ID.", ) + + # TODO This method needs to be modified to be asynchronous (please do not use) async def delete_document( delete_request: DeleteDocRequest, ) -> DeleteDocByIdResponse: """ + This method needs to be modified to be asynchronous (please do not use) + Deletes a specific document and all its associated data, including its status, text chunks, vector embeddings, and any related graph data. It is disabled when llm cache for entity extraction is disabled. 
From 2946bbdb71700a3603ec2aea3579cd74bb9cfda1 Mon Sep 17 00:00:00 2001
From: yangdx
Date: Tue, 24 Jun 2025 11:32:28 +0800
Subject: [PATCH 25/25] Add TODO: There is a performance issue when iterating get_all_labels

---
 lightrag/lightrag.py | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/lightrag/lightrag.py b/lightrag/lightrag.py
index 1a0b4bfd..f631992d 100644
--- a/lightrag/lightrag.py
+++ b/lightrag/lightrag.py
@@ -1762,6 +1762,7 @@ class LightRAG:
             graph_db_lock = get_graph_db_lock(enable_logging=False)
             async with graph_db_lock:
                 # Process entities
+                # TODO There is a performance issue when iterating get_all_labels for PostgreSQL
                 all_labels = await self.chunk_entity_relation_graph.get_all_labels()
                 for node_label in all_labels:
                     node_data = await self.chunk_entity_relation_graph.get_node(
                         node_label
@@ -1785,6 +1786,7 @@ class LightRAG:
                     )

                 # Process relationships
+                # TODO There is a performance issue when iterating get_all_labels for PostgreSQL
                 for node_label in all_labels:
                     node_edges = await self.chunk_entity_relation_graph.get_node_edges(
                         node_label
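
The series leaves `delete_entity` and `delete_relation` as the graph-pruning HTTP entry points, while the document-level `delete_document` route is explicitly marked as not ready for use. A client-side sketch of the two entity-level endpoints with `requests` follows; the host, port, `/documents` router prefix, missing auth headers, and entity names are all assumptions to adjust for your deployment.

```python
# Client-side sketch (not part of the patches): calling the delete_entity and
# delete_relation endpoints added in this series. The base URL, the /documents
# router prefix, and the absence of auth headers are assumptions; the entity
# names are purely illustrative.
import requests

BASE_URL = "http://localhost:9621/documents"  # assumed host, port, and prefix

# DELETE /delete_entity takes a JSON body matching DeleteEntityRequest
resp = requests.delete(
    f"{BASE_URL}/delete_entity",
    json={"entity_name": "Artificial Intelligence"},
    timeout=30,
)
print(resp.status_code, resp.json())  # DeletionResult serialized by FastAPI

# DELETE /delete_relation takes a JSON body matching DeleteRelationRequest
resp = requests.delete(
    f"{BASE_URL}/delete_relation",
    json={
        "source_entity": "Artificial Intelligence",
        "target_entity": "Deep Learning",
    },
    timeout=30,
)
print(resp.status_code, resp.json())
```

Both routes return 404 when the target entity or relation does not exist and 500 on failure, mirroring the `not_found` and `fail` statuses of the underlying `DeletionResult`.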