From ead82a8dbd3e782b9ca25777125335fc6b2f83cf Mon Sep 17 00:00:00 2001
From: zrguo <49157727+LarFii@users.noreply.github.com>
Date: Mon, 9 Jun 2025 18:52:34 +0800
Subject: [PATCH] update delete_by_doc_id

---
 lightrag/base.py            |  15 ++
 lightrag/kg/json_kv_impl.py |  47 ++++
 lightrag/lightrag.py        | 259 ++++++++--------------
 lightrag/operate.py         | 417 ++++++++++++++++++++++++++++++++++++
 lightrag/utils.py           |   5 +
 5 files changed, 572 insertions(+), 171 deletions(-)

diff --git a/lightrag/base.py b/lightrag/base.py
index c6035f70..e66a67db 100644
--- a/lightrag/base.py
+++ b/lightrag/base.py
@@ -278,6 +278,21 @@ class BaseKVStorage(StorageNameSpace, ABC):
             False: if the cache drop failed, or the cache mode is not supported
         """

+    async def drop_cache_by_chunk_ids(self, chunk_ids: list[str] | None = None) -> bool:
+        """Delete specific cache records from storage by chunk IDs
+
+        Important notes for in-memory storage:
+        1. Changes will be persisted to disk during the next index_done_callback
+        2. Update flags are set to notify other processes that data persistence is needed
+
+        Args:
+            chunk_ids (list[str]): List of chunk IDs to be dropped from storage
+
+        Returns:
+            True: if the cache was dropped successfully
+            False: if the cache drop failed, or the operation is not supported
+        """
+

 @dataclass
 class BaseGraphStorage(StorageNameSpace, ABC):
diff --git a/lightrag/kg/json_kv_impl.py b/lightrag/kg/json_kv_impl.py
index 2d44ce00..2345e50f 100644
--- a/lightrag/kg/json_kv_impl.py
+++ b/lightrag/kg/json_kv_impl.py
@@ -172,6 +172,53 @@ class JsonKVStorage(BaseKVStorage):
         except Exception:
             return False

+    async def drop_cache_by_chunk_ids(self, chunk_ids: list[str] | None = None) -> bool:
+        """Delete specific cache records from storage by chunk IDs
+
+        Important notes for in-memory storage:
+        1. Changes will be persisted to disk during the next index_done_callback
+        2. Update flags are set to notify other processes that data persistence is needed
+
+        Args:
+            chunk_ids (list[str]): List of chunk IDs to be dropped from storage
+
+        Returns:
+            True: if the cache was dropped successfully
+            False: if the cache drop failed
+        """
+        if not chunk_ids:
+            return False
+
+        try:
+            async with self._storage_lock:
+                # Iterate through all cache modes to find entries with matching chunk_ids
+                for mode_key, mode_data in list(self._data.items()):
+                    if isinstance(mode_data, dict):
+                        # Check each cached entry in this mode
+                        for cache_key, cache_entry in list(mode_data.items()):
+                            if (
+                                isinstance(cache_entry, dict)
+                                and cache_entry.get("chunk_id") in chunk_ids
+                            ):
+                                # Remove this cache entry
+                                del mode_data[cache_key]
+                                logger.debug(
+                                    f"Removed cache entry {cache_key} for chunk {cache_entry.get('chunk_id')}"
+                                )
+
+                        # If the mode is now empty, remove it entirely
+                        if not mode_data:
+                            del self._data[mode_key]
+
+                # Set update flags to notify other processes that persistence is needed
+                await set_all_update_flags(self.namespace)
+
+            logger.info(f"Cleared cache for {len(chunk_ids)} chunk IDs")
+            return True
+        except Exception as e:
+            logger.error(f"Error clearing cache by chunk IDs: {e}")
+            return False
+
     async def drop(self) -> dict[str, str]:
         """Drop all data from storage and clean up resources
         This action will persistent the data to disk immediately. 
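[Reviewer note] A minimal usage sketch for the new chunk-level cache hook, assuming an already-initialized JsonKVStorage serving as the LLM response cache; the variable names and chunk IDs below are illustrative, not part of the patch:

    # Hypothetical sketch: `llm_response_cache` is a JsonKVStorage instance.
    # Backends that do not override the BaseKVStorage method return a falsy
    # value, so callers should treat that as "not supported" rather than fail.
    chunk_ids = ["chunk-1a2b3c", "chunk-4d5e6f"]  # illustrative chunk IDs
    cleared = await llm_response_cache.drop_cache_by_chunk_ids(chunk_ids)
    if not cleared:
        logger.warning("Chunk-level cache clearing failed or is unsupported")
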
diff --git a/lightrag/lightrag.py b/lightrag/lightrag.py index 0bf7de83..6d206d5c 100644 --- a/lightrag/lightrag.py +++ b/lightrag/lightrag.py @@ -56,6 +56,7 @@ from .operate import ( kg_query, naive_query, query_with_keywords, + _rebuild_knowledge_from_chunks, ) from .prompt import GRAPH_FIELD_SEP from .utils import ( @@ -1207,6 +1208,7 @@ class LightRAG: cast(StorageNameSpace, storage_inst).index_done_callback() for storage_inst in [ # type: ignore self.full_docs, + self.doc_status, self.text_chunks, self.llm_response_cache, self.entities_vdb, @@ -1674,10 +1676,12 @@ class LightRAG: # Return the dictionary containing statuses only for the found document IDs return found_statuses - # TODO: Deprecated (Deleting documents can cause hallucinations in RAG.) - # Document delete is not working properly for most of the storage implementations. async def adelete_by_doc_id(self, doc_id: str) -> None: - """Delete a document and all its related data + """Delete a document and all its related data with cache cleanup and reconstruction + + Optimized version that: + 1. Clears LLM cache for related chunks + 2. Rebuilds entity and relationship descriptions from remaining chunks Args: doc_id: Document ID to delete @@ -1688,10 +1692,9 @@ class LightRAG: logger.warning(f"Document {doc_id} not found") return - logger.debug(f"Starting deletion for document {doc_id}") + logger.info(f"Starting optimized deletion for document {doc_id}") # 2. Get all chunks related to this document - # Find all chunks where full_doc_id equals the current doc_id all_chunks = await self.text_chunks.get_all() related_chunks = { chunk_id: chunk_data @@ -1704,64 +1707,46 @@ class LightRAG: logger.warning(f"No chunks found for document {doc_id}") return - # Get all related chunk IDs chunk_ids = set(related_chunks.keys()) - logger.debug(f"Found {len(chunk_ids)} chunks to delete") + logger.info(f"Found {len(chunk_ids)} chunks to delete") - # TODO: self.entities_vdb.client_storage only works for local storage, need to fix this + # 3. **OPTIMIZATION 1**: Clear LLM cache for related chunks + logger.info("Clearing LLM cache for related chunks...") + cache_cleared = await self.llm_response_cache.drop_cache_by_chunk_ids( + list(chunk_ids) + ) + if cache_cleared: + logger.info(f"Successfully cleared cache for {len(chunk_ids)} chunks") + else: + logger.warning( + "Failed to clear chunk cache or cache clearing not supported" + ) - # 3. Before deleting, check the related entities and relationships for these chunks - for chunk_id in chunk_ids: - # Check entities - entities_storage = await self.entities_vdb.client_storage - entities = [ - dp - for dp in entities_storage["data"] - if chunk_id in dp.get("source_id") - ] - logger.debug(f"Chunk {chunk_id} has {len(entities)} related entities") - - # Check relationships - relationships_storage = await self.relationships_vdb.client_storage - relations = [ - dp - for dp in relationships_storage["data"] - if chunk_id in dp.get("source_id") - ] - logger.debug(f"Chunk {chunk_id} has {len(relations)} related relations") - - # Continue with the original deletion process... - - # 4. Delete chunks from vector database - if chunk_ids: - await self.chunks_vdb.delete(chunk_ids) - await self.text_chunks.delete(chunk_ids) - - # 5. Find and process entities and relationships that have these chunks as source - # Get all nodes and edges from the graph storage using storage-agnostic methods + # 4. 
Analyze entities and relationships that will be affected entities_to_delete = set() - entities_to_update = {} # entity_name -> new_source_id + entities_to_rebuild = {} # entity_name -> remaining_chunk_ids relationships_to_delete = set() - relationships_to_update = {} # (src, tgt) -> new_source_id + relationships_to_rebuild = {} # (src, tgt) -> remaining_chunk_ids - # Process entities - use storage-agnostic methods + # Process entities all_labels = await self.chunk_entity_relation_graph.get_all_labels() for node_label in all_labels: node_data = await self.chunk_entity_relation_graph.get_node(node_label) if node_data and "source_id" in node_data: # Split source_id using GRAPH_FIELD_SEP sources = set(node_data["source_id"].split(GRAPH_FIELD_SEP)) - sources.difference_update(chunk_ids) - if not sources: + remaining_sources = sources - chunk_ids + + if not remaining_sources: entities_to_delete.add(node_label) logger.debug( f"Entity {node_label} marked for deletion - no remaining sources" ) - else: - new_source_id = GRAPH_FIELD_SEP.join(sources) - entities_to_update[node_label] = new_source_id + elif remaining_sources != sources: + # Entity needs to be rebuilt from remaining chunks + entities_to_rebuild[node_label] = remaining_sources logger.debug( - f"Entity {node_label} will be updated with new source_id: {new_source_id}" + f"Entity {node_label} will be rebuilt from {len(remaining_sources)} remaining chunks" ) # Process relationships @@ -1777,160 +1762,92 @@ class LightRAG: if edge_data and "source_id" in edge_data: # Split source_id using GRAPH_FIELD_SEP sources = set(edge_data["source_id"].split(GRAPH_FIELD_SEP)) - sources.difference_update(chunk_ids) - if not sources: + remaining_sources = sources - chunk_ids + + if not remaining_sources: relationships_to_delete.add((src, tgt)) logger.debug( f"Relationship {src}-{tgt} marked for deletion - no remaining sources" ) - else: - new_source_id = GRAPH_FIELD_SEP.join(sources) - relationships_to_update[(src, tgt)] = new_source_id + elif remaining_sources != sources: + # Relationship needs to be rebuilt from remaining chunks + relationships_to_rebuild[(src, tgt)] = remaining_sources logger.debug( - f"Relationship {src}-{tgt} will be updated with new source_id: {new_source_id}" + f"Relationship {src}-{tgt} will be rebuilt from {len(remaining_sources)} remaining chunks" ) - # Delete entities + # 5. Delete chunks from storage + if chunk_ids: + await self.chunks_vdb.delete(chunk_ids) + await self.text_chunks.delete(chunk_ids) + logger.info(f"Deleted {len(chunk_ids)} chunks from storage") + + # 6. 
Delete entities that have no remaining sources if entities_to_delete: - for entity in entities_to_delete: - await self.entities_vdb.delete_entity(entity) - logger.debug(f"Deleted entity {entity} from vector DB") + # Delete from vector database + entity_vdb_ids = [ + compute_mdhash_id(entity, prefix="ent-") + for entity in entities_to_delete + ] + await self.entities_vdb.delete(entity_vdb_ids) + + # Delete from graph await self.chunk_entity_relation_graph.remove_nodes( list(entities_to_delete) ) - logger.debug(f"Deleted {len(entities_to_delete)} entities from graph") + logger.info(f"Deleted {len(entities_to_delete)} entities") - # Update entities - for entity, new_source_id in entities_to_update.items(): - node_data = await self.chunk_entity_relation_graph.get_node(entity) - if node_data: - node_data["source_id"] = new_source_id - await self.chunk_entity_relation_graph.upsert_node( - entity, node_data - ) - logger.debug( - f"Updated entity {entity} with new source_id: {new_source_id}" - ) - - # Delete relationships + # 7. Delete relationships that have no remaining sources if relationships_to_delete: + # Delete from vector database + rel_ids_to_delete = [] for src, tgt in relationships_to_delete: - rel_id_0 = compute_mdhash_id(src + tgt, prefix="rel-") - rel_id_1 = compute_mdhash_id(tgt + src, prefix="rel-") - await self.relationships_vdb.delete([rel_id_0, rel_id_1]) - logger.debug(f"Deleted relationship {src}-{tgt} from vector DB") + rel_ids_to_delete.extend( + [ + compute_mdhash_id(src + tgt, prefix="rel-"), + compute_mdhash_id(tgt + src, prefix="rel-"), + ] + ) + await self.relationships_vdb.delete(rel_ids_to_delete) + + # Delete from graph await self.chunk_entity_relation_graph.remove_edges( list(relationships_to_delete) ) - logger.debug( - f"Deleted {len(relationships_to_delete)} relationships from graph" + logger.info(f"Deleted {len(relationships_to_delete)} relationships") + + # 8. **OPTIMIZATION 2**: Rebuild entities and relationships from remaining chunks + if entities_to_rebuild or relationships_to_rebuild: + logger.info( + f"Rebuilding {len(entities_to_rebuild)} entities and {len(relationships_to_rebuild)} relationships..." + ) + await _rebuild_knowledge_from_chunks( + entities_to_rebuild=entities_to_rebuild, + relationships_to_rebuild=relationships_to_rebuild, + knowledge_graph_inst=self.chunk_entity_relation_graph, + entities_vdb=self.entities_vdb, + relationships_vdb=self.relationships_vdb, + text_chunks=self.text_chunks, + llm_response_cache=self.llm_response_cache, + global_config=asdict(self), ) - # Update relationships - for (src, tgt), new_source_id in relationships_to_update.items(): - edge_data = await self.chunk_entity_relation_graph.get_edge(src, tgt) - if edge_data: - edge_data["source_id"] = new_source_id - await self.chunk_entity_relation_graph.upsert_edge( - src, tgt, edge_data - ) - logger.debug( - f"Updated relationship {src}-{tgt} with new source_id: {new_source_id}" - ) - - # 6. Delete original document and status + # 9. Delete original document and status await self.full_docs.delete([doc_id]) await self.doc_status.delete([doc_id]) - # 7. Ensure all indexes are updated + # 10. Ensure all indexes are updated await self._insert_done() logger.info( - f"Successfully deleted document {doc_id} and related data. " - f"Deleted {len(entities_to_delete)} entities and {len(relationships_to_delete)} relationships. " - f"Updated {len(entities_to_update)} entities and {len(relationships_to_update)} relationships." + f"Successfully deleted document {doc_id}. 
" + f"Deleted: {len(entities_to_delete)} entities, {len(relationships_to_delete)} relationships. " + f"Rebuilt: {len(entities_to_rebuild)} entities, {len(relationships_to_rebuild)} relationships." ) - async def process_data(data_type, vdb, chunk_id): - # Check data (entities or relationships) - storage = await vdb.client_storage - data_with_chunk = [ - dp - for dp in storage["data"] - if chunk_id in (dp.get("source_id") or "").split(GRAPH_FIELD_SEP) - ] - - data_for_vdb = {} - if data_with_chunk: - logger.warning( - f"found {len(data_with_chunk)} {data_type} still referencing chunk {chunk_id}" - ) - - for item in data_with_chunk: - old_sources = item["source_id"].split(GRAPH_FIELD_SEP) - new_sources = [src for src in old_sources if src != chunk_id] - - if not new_sources: - logger.info( - f"{data_type} {item.get('entity_name', 'N/A')} is deleted because source_id is not exists" - ) - await vdb.delete_entity(item) - else: - item["source_id"] = GRAPH_FIELD_SEP.join(new_sources) - item_id = item["__id__"] - data_for_vdb[item_id] = item.copy() - if data_type == "entities": - data_for_vdb[item_id]["content"] = data_for_vdb[ - item_id - ].get("content") or ( - item.get("entity_name", "") - + (item.get("description") or "") - ) - else: # relationships - data_for_vdb[item_id]["content"] = data_for_vdb[ - item_id - ].get("content") or ( - (item.get("keywords") or "") - + (item.get("src_id") or "") - + (item.get("tgt_id") or "") - + (item.get("description") or "") - ) - - if data_for_vdb: - await vdb.upsert(data_for_vdb) - logger.info(f"Successfully updated {data_type} in vector DB") - - # Add verification step - async def verify_deletion(): - # Verify if the document has been deleted - if await self.full_docs.get_by_id(doc_id): - logger.warning(f"Document {doc_id} still exists in full_docs") - - # Verify if chunks have been deleted - all_remaining_chunks = await self.text_chunks.get_all() - remaining_related_chunks = { - chunk_id: chunk_data - for chunk_id, chunk_data in all_remaining_chunks.items() - if isinstance(chunk_data, dict) - and chunk_data.get("full_doc_id") == doc_id - } - - if remaining_related_chunks: - logger.warning( - f"Found {len(remaining_related_chunks)} remaining chunks" - ) - - # Verify entities and relationships - for chunk_id in chunk_ids: - await process_data("entities", self.entities_vdb, chunk_id) - await process_data( - "relationships", self.relationships_vdb, chunk_id - ) - - await verify_deletion() - except Exception as e: logger.error(f"Error while deleting document {doc_id}: {e}") + raise async def adelete_by_entity(self, entity_name: str) -> None: """Asynchronously delete an entity and all its relationships. diff --git a/lightrag/operate.py b/lightrag/operate.py index e1295091..91d1ee68 100644 --- a/lightrag/operate.py +++ b/lightrag/operate.py @@ -240,6 +240,421 @@ async def _handle_single_relationship_extraction( ) +async def _rebuild_knowledge_from_chunks( + entities_to_rebuild: dict[str, set[str]], + relationships_to_rebuild: dict[tuple[str, str], set[str]], + knowledge_graph_inst: BaseGraphStorage, + entities_vdb: BaseVectorStorage, + relationships_vdb: BaseVectorStorage, + text_chunks: BaseKVStorage, + llm_response_cache: BaseKVStorage, + global_config: dict[str, str], +) -> None: + """Rebuild entity and relationship descriptions from cached extraction results + + This method uses cached LLM extraction results instead of calling LLM again, + following the same approach as the insert process. 
+ + Args: + entities_to_rebuild: Dict mapping entity_name -> set of remaining chunk_ids + relationships_to_rebuild: Dict mapping (src, tgt) -> set of remaining chunk_ids + """ + if not entities_to_rebuild and not relationships_to_rebuild: + return + + # Get all referenced chunk IDs + all_referenced_chunk_ids = set() + for chunk_ids in entities_to_rebuild.values(): + all_referenced_chunk_ids.update(chunk_ids) + for chunk_ids in relationships_to_rebuild.values(): + all_referenced_chunk_ids.update(chunk_ids) + + logger.info( + f"Rebuilding knowledge from {len(all_referenced_chunk_ids)} cached chunk extractions" + ) + + # Get cached extraction results for these chunks + cached_results = await _get_cached_extraction_results( + llm_response_cache, all_referenced_chunk_ids + ) + + if not cached_results: + logger.warning("No cached extraction results found, cannot rebuild") + return + + # Process cached results to get entities and relationships for each chunk + chunk_entities = {} # chunk_id -> {entity_name: [entity_data]} + chunk_relationships = {} # chunk_id -> {(src, tgt): [relationship_data]} + + for chunk_id, extraction_result in cached_results.items(): + try: + entities, relationships = await _parse_extraction_result( + text_chunks=text_chunks, + extraction_result=extraction_result, + chunk_id=chunk_id, + ) + chunk_entities[chunk_id] = entities + chunk_relationships[chunk_id] = relationships + except Exception as e: + logger.error( + f"Failed to parse cached extraction result for chunk {chunk_id}: {e}" + ) + continue + + # Rebuild entities + for entity_name, chunk_ids in entities_to_rebuild.items(): + try: + await _rebuild_single_entity( + knowledge_graph_inst=knowledge_graph_inst, + entities_vdb=entities_vdb, + entity_name=entity_name, + chunk_ids=chunk_ids, + chunk_entities=chunk_entities, + llm_response_cache=llm_response_cache, + global_config=global_config, + ) + logger.debug( + f"Rebuilt entity {entity_name} from {len(chunk_ids)} cached extractions" + ) + except Exception as e: + logger.error(f"Failed to rebuild entity {entity_name}: {e}") + + # Rebuild relationships + for (src, tgt), chunk_ids in relationships_to_rebuild.items(): + try: + await _rebuild_single_relationship( + knowledge_graph_inst=knowledge_graph_inst, + relationships_vdb=relationships_vdb, + src=src, + tgt=tgt, + chunk_ids=chunk_ids, + chunk_relationships=chunk_relationships, + llm_response_cache=llm_response_cache, + global_config=global_config, + ) + logger.debug( + f"Rebuilt relationship {src}-{tgt} from {len(chunk_ids)} cached extractions" + ) + except Exception as e: + logger.error(f"Failed to rebuild relationship {src}-{tgt}: {e}") + + logger.info("Completed rebuilding knowledge from cached extractions") + + +async def _get_cached_extraction_results( + llm_response_cache: BaseKVStorage, chunk_ids: set[str] +) -> dict[str, str]: + """Get cached extraction results for specific chunk IDs + + Args: + chunk_ids: Set of chunk IDs to get cached results for + + Returns: + Dict mapping chunk_id -> extraction_result_text + """ + cached_results = {} + + # Get all cached data for "default" mode (entity extraction cache) + default_cache = await llm_response_cache.get_by_id("default") or {} + + for cache_key, cache_entry in default_cache.items(): + if ( + isinstance(cache_entry, dict) + and cache_entry.get("cache_type") == "extract" + and cache_entry.get("chunk_id") in chunk_ids + ): + chunk_id = cache_entry["chunk_id"] + extraction_result = cache_entry["return"] + cached_results[chunk_id] = extraction_result + + logger.info( 
+ f"Found {len(cached_results)} cached extraction results for {len(chunk_ids)} chunk IDs" + ) + return cached_results + + +async def _parse_extraction_result( + text_chunks: BaseKVStorage, extraction_result: str, chunk_id: str +) -> tuple[dict, dict]: + """Parse cached extraction result using the same logic as extract_entities + + Args: + extraction_result: The cached LLM extraction result + chunk_id: The chunk ID for source tracking + + Returns: + Tuple of (entities_dict, relationships_dict) + """ + + # Get chunk data for file_path + chunk_data = await text_chunks.get_by_id(chunk_id) + file_path = ( + chunk_data.get("file_path", "unknown_source") + if chunk_data + else "unknown_source" + ) + context_base = dict( + tuple_delimiter=PROMPTS["DEFAULT_TUPLE_DELIMITER"], + record_delimiter=PROMPTS["DEFAULT_RECORD_DELIMITER"], + completion_delimiter=PROMPTS["DEFAULT_COMPLETION_DELIMITER"], + ) + maybe_nodes = defaultdict(list) + maybe_edges = defaultdict(list) + + # Parse the extraction result using the same logic as in extract_entities + records = split_string_by_multi_markers( + extraction_result, + [context_base["record_delimiter"], context_base["completion_delimiter"]], + ) + for record in records: + record = re.search(r"\((.*)\)", record) + if record is None: + continue + record = record.group(1) + record_attributes = split_string_by_multi_markers( + record, [context_base["tuple_delimiter"]] + ) + + # Try to parse as entity + entity_data = await _handle_single_entity_extraction( + record_attributes, chunk_id, file_path + ) + if entity_data is not None: + maybe_nodes[entity_data["entity_name"]].append(entity_data) + continue + + # Try to parse as relationship + relationship_data = await _handle_single_relationship_extraction( + record_attributes, chunk_id, file_path + ) + if relationship_data is not None: + maybe_edges[ + (relationship_data["src_id"], relationship_data["tgt_id"]) + ].append(relationship_data) + + return dict(maybe_nodes), dict(maybe_edges) + + +async def _rebuild_single_entity( + knowledge_graph_inst: BaseGraphStorage, + entities_vdb: BaseVectorStorage, + entity_name: str, + chunk_ids: set[str], + chunk_entities: dict, + llm_response_cache: BaseKVStorage, + global_config: dict[str, str], +) -> None: + """Rebuild a single entity from cached extraction results""" + + # Get current entity data + current_entity = await knowledge_graph_inst.get_node(entity_name) + if not current_entity: + return + + # Collect all entity data from relevant chunks + all_entity_data = [] + for chunk_id in chunk_ids: + if chunk_id in chunk_entities and entity_name in chunk_entities[chunk_id]: + all_entity_data.extend(chunk_entities[chunk_id][entity_name]) + + if not all_entity_data: + logger.warning(f"No cached entity data found for {entity_name}") + return + + # Merge descriptions and get the most common entity type + descriptions = [] + entity_types = [] + file_paths = set() + + for entity_data in all_entity_data: + if entity_data.get("description"): + descriptions.append(entity_data["description"]) + if entity_data.get("entity_type"): + entity_types.append(entity_data["entity_type"]) + if entity_data.get("file_path"): + file_paths.add(entity_data["file_path"]) + + # Combine all descriptions + combined_description = ( + GRAPH_FIELD_SEP.join(descriptions) + if descriptions + else current_entity.get("description", "") + ) + + # Get most common entity type + entity_type = ( + max(set(entity_types), key=entity_types.count) + if entity_types + else current_entity.get("entity_type", "UNKNOWN") + ) + + # 
Use summary if description is too long + if len(combined_description) > global_config["summary_to_max_tokens"]: + final_description = await _handle_entity_relation_summary( + entity_name, + combined_description, + global_config, + llm_response_cache=llm_response_cache, + ) + else: + final_description = combined_description + + # Update entity in graph storage + updated_entity_data = { + **current_entity, + "description": final_description, + "entity_type": entity_type, + "source_id": GRAPH_FIELD_SEP.join(chunk_ids), + "file_path": GRAPH_FIELD_SEP.join(file_paths) + if file_paths + else current_entity.get("file_path", "unknown_source"), + } + await knowledge_graph_inst.upsert_node(entity_name, updated_entity_data) + + # Update entity in vector database + entity_vdb_id = compute_mdhash_id(entity_name, prefix="ent-") + + # Delete old vector record first + try: + await entities_vdb.delete([entity_vdb_id]) + except Exception as e: + logger.debug(f"Could not delete old entity vector record {entity_vdb_id}: {e}") + + # Insert new vector record + entity_content = f"{entity_name}\n{final_description}" + await entities_vdb.upsert( + { + entity_vdb_id: { + "content": entity_content, + "entity_name": entity_name, + "source_id": updated_entity_data["source_id"], + "description": final_description, + "entity_type": entity_type, + "file_path": updated_entity_data["file_path"], + } + } + ) + + +async def _rebuild_single_relationship( + knowledge_graph_inst: BaseGraphStorage, + relationships_vdb: BaseVectorStorage, + src: str, + tgt: str, + chunk_ids: set[str], + chunk_relationships: dict, + llm_response_cache: BaseKVStorage, + global_config: dict[str, str], +) -> None: + """Rebuild a single relationship from cached extraction results""" + + # Get current relationship data + current_relationship = await knowledge_graph_inst.get_edge(src, tgt) + if not current_relationship: + return + + # Collect all relationship data from relevant chunks + all_relationship_data = [] + for chunk_id in chunk_ids: + if chunk_id in chunk_relationships: + # Check both (src, tgt) and (tgt, src) since relationships can be bidirectional + for edge_key in [(src, tgt), (tgt, src)]: + if edge_key in chunk_relationships[chunk_id]: + all_relationship_data.extend( + chunk_relationships[chunk_id][edge_key] + ) + + if not all_relationship_data: + logger.warning(f"No cached relationship data found for {src}-{tgt}") + return + + # Merge descriptions and keywords + descriptions = [] + keywords = [] + weights = [] + file_paths = set() + + for rel_data in all_relationship_data: + if rel_data.get("description"): + descriptions.append(rel_data["description"]) + if rel_data.get("keywords"): + keywords.append(rel_data["keywords"]) + if rel_data.get("weight"): + weights.append(rel_data["weight"]) + if rel_data.get("file_path"): + file_paths.add(rel_data["file_path"]) + + # Combine descriptions and keywords + combined_description = ( + GRAPH_FIELD_SEP.join(descriptions) + if descriptions + else current_relationship.get("description", "") + ) + combined_keywords = ( + ", ".join(set(keywords)) + if keywords + else current_relationship.get("keywords", "") + ) + avg_weight = ( + sum(weights) / len(weights) + if weights + else current_relationship.get("weight", 1.0) + ) + + # Use summary if description is too long + if len(combined_description) > global_config["summary_to_max_tokens"]: + final_description = await _handle_entity_relation_summary( + f"{src}-{tgt}", + combined_description, + global_config, + llm_response_cache=llm_response_cache, + ) + 
else:
+        final_description = combined_description
+
+    # Update relationship in graph storage
+    updated_relationship_data = {
+        **current_relationship,
+        "description": final_description,
+        "keywords": combined_keywords,
+        "weight": avg_weight,
+        "source_id": GRAPH_FIELD_SEP.join(chunk_ids),
+        "file_path": GRAPH_FIELD_SEP.join(file_paths)
+        if file_paths
+        else current_relationship.get("file_path", "unknown_source"),
+    }
+    await knowledge_graph_inst.upsert_edge(src, tgt, updated_relationship_data)
+
+    # Update relationship in vector database
+    rel_vdb_id = compute_mdhash_id(src + tgt, prefix="rel-")
+    rel_vdb_id_reverse = compute_mdhash_id(tgt + src, prefix="rel-")
+
+    # Delete old vector records first (both directions to be safe)
+    try:
+        await relationships_vdb.delete([rel_vdb_id, rel_vdb_id_reverse])
+    except Exception as e:
+        logger.debug(
+            f"Could not delete old relationship vector records {rel_vdb_id}, {rel_vdb_id_reverse}: {e}"
+        )
+
+    # Insert new vector record
+    rel_content = f"{combined_keywords}\t{src}\n{tgt}\n{final_description}"
+    await relationships_vdb.upsert(
+        {
+            rel_vdb_id: {
+                "src_id": src,
+                "tgt_id": tgt,
+                "source_id": updated_relationship_data["source_id"],
+                "content": rel_content,
+                "keywords": combined_keywords,
+                "description": final_description,
+                "weight": avg_weight,
+                "file_path": updated_relationship_data["file_path"],
+            }
+        }
+    )
+
+
 async def _merge_nodes_then_upsert(
     entity_name: str,
     nodes_data: list[dict],
@@ -757,6 +1172,7 @@ async def extract_entities(
             use_llm_func,
             llm_response_cache=llm_response_cache,
             cache_type="extract",
+            chunk_id=chunk_key,
         )
         history = pack_user_ass_to_openai_messages(hint_prompt, final_result)

@@ -773,6 +1189,7 @@ async def extract_entities(
                 llm_response_cache=llm_response_cache,
                 history_messages=history,
                 cache_type="extract",
+                chunk_id=chunk_key,
             )
             history += pack_user_ass_to_openai_messages(continue_prompt, glean_result)

diff --git a/lightrag/utils.py b/lightrag/utils.py
index 2e75b9b9..06b7a468 100644
--- a/lightrag/utils.py
+++ b/lightrag/utils.py
@@ -990,6 +990,7 @@ class CacheData:
     max_val: float | None = None
     mode: str = "default"
     cache_type: str = "query"
+    chunk_id: str | None = None


 async def save_to_cache(hashing_kv, cache_data: CacheData):
@@ -1030,6 +1031,7 @@ async def save_to_cache(hashing_kv, cache_data: CacheData):
     mode_cache[cache_data.args_hash] = {
         "return": cache_data.content,
         "cache_type": cache_data.cache_type,
+        "chunk_id": cache_data.chunk_id,
         "embedding": cache_data.quantized.tobytes().hex()
         if cache_data.quantized is not None
         else None,
@@ -1534,6 +1536,7 @@ async def use_llm_func_with_cache(
     max_tokens: int = None,
     history_messages: list[dict[str, str]] = None,
     cache_type: str = "extract",
+    chunk_id: str | None = None,
 ) -> str:
     """Call LLM function with cache support

@@ -1547,6 +1550,7 @@ async def use_llm_func_with_cache(
         max_tokens: Maximum tokens for generation
         history_messages: History messages list
         cache_type: Type of cache
+        chunk_id: Chunk identifier to store in cache

     Returns:
         LLM response text
@@ -1589,6 +1593,7 @@ async def use_llm_func_with_cache(
                 content=res,
                 prompt=_prompt,
                 cache_type=cache_type,
+                chunk_id=chunk_id,
             ),
         )
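
[Reviewer note] A minimal end-to-end sketch of the rewritten deletion path, assuming `rag` is a fully configured LightRAG instance with initialized storages; the document ID below is illustrative:

    import asyncio

    async def remove_document(rag, doc_id: str) -> None:
        # Deletes the document's chunks, drops their cached extraction
        # results, removes orphaned entities/relationships, and rebuilds
        # the descriptions of partially affected ones from the remaining
        # chunks' cached extractions. After this patch, failures propagate
        # to the caller instead of being swallowed.
        await rag.adelete_by_doc_id(doc_id)

    # asyncio.run(remove_document(rag, "doc-0123abcd"))  # hypothetical ID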