update delete_by_doc_id

This commit is contained in:
zrguo 2025-06-09 18:52:34 +08:00
parent ea2fabe6b0
commit ead82a8dbd
5 changed files with 572 additions and 171 deletions

View file

@@ -278,6 +278,21 @@ class BaseKVStorage(StorageNameSpace, ABC):
False: if the cache drop failed, or the cache mode is not supported
"""
async def drop_cache_by_chunk_ids(self, chunk_ids: list[str] | None = None) -> bool:
"""Delete specific cache records from storage by chunk IDs
Important notes for in-memory storage:
1. Changes will be persisted to disk during the next index_done_callback
2. Update flags are set to notify other processes that data persistence is needed
Args:
chunk_ids (list[str]): List of chunk IDs to be dropped from storage
Returns:
True: if the cache records were dropped successfully
False: if the cache drop failed, or the operation is not supported
"""
@dataclass
class BaseGraphStorage(StorageNameSpace, ABC):
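Note: a minimal usage sketch of the new hook (not part of this commit; storage stands for any BaseKVStorage implementation and the chunk IDs are illustrative):

import asyncio

async def clear_extraction_cache(storage, chunk_ids: list[str]) -> None:
    # drop_cache_by_chunk_ids returns False when the backend does not support it
    dropped = await storage.drop_cache_by_chunk_ids(chunk_ids)
    if dropped:
        print(f"Dropped cached extractions for {len(chunk_ids)} chunks")
    else:
        print("Cache drop failed or is not supported by this backend")

# Example call (hypothetical instance):
# asyncio.run(clear_extraction_cache(rag.llm_response_cache, ["chunk-1", "chunk-2"]))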

View file

@@ -172,6 +172,53 @@ class JsonKVStorage(BaseKVStorage):
except Exception:
return False
async def drop_cache_by_chunk_ids(self, chunk_ids: list[str] | None = None) -> bool:
"""Delete specific cache records from storage by chunk IDs
Important notes for in-memory storage:
1. Changes will be persisted to disk during the next index_done_callback
2. Update flags are set to notify other processes that data persistence is needed
Args:
chunk_ids (list[str]): List of chunk IDs to be dropped from storage
Returns:
True: if the cache records were dropped successfully
False: if the cache drop failed
"""
if not chunk_ids:
return False
try:
async with self._storage_lock:
# Iterate through all cache modes to find entries with matching chunk_ids
for mode_key, mode_data in list(self._data.items()):
if isinstance(mode_data, dict):
# Check each cached entry in this mode
for cache_key, cache_entry in list(mode_data.items()):
if (
isinstance(cache_entry, dict)
and cache_entry.get("chunk_id") in chunk_ids
):
# Remove this cache entry
del mode_data[cache_key]
logger.debug(
f"Removed cache entry {cache_key} for chunk {cache_entry.get('chunk_id')}"
)
# If the mode is now empty, remove it entirely
if not mode_data:
del self._data[mode_key]
# Set update flags to notify persistence is needed
await set_all_update_flags(self.namespace)
logger.info(f"Cleared cache for {len(chunk_ids)} chunk IDs")
return True
except Exception as e:
logger.error(f"Error clearing cache by chunk IDs: {e}")
return False
async def drop(self) -> dict[str, str]:
"""Drop all data from storage and clean up resources
This action will persist the data to disk immediately.
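Note: a standalone sketch of the pruning semantics implemented above, over an illustrative in-memory layout (mode -> cache_key -> record); field names mirror the records written by save_to_cache:

def prune_cache_by_chunk_ids(data: dict, chunk_ids: set[str]) -> dict:
    """Drop records whose 'chunk_id' is listed, and drop modes left empty."""
    pruned = {}
    for mode, records in data.items():
        kept = {
            key: rec
            for key, rec in records.items()
            if not (isinstance(rec, dict) and rec.get("chunk_id") in chunk_ids)
        }
        if kept:
            pruned[mode] = kept
    return pruned

cache = {
    "default": {
        "hash-a": {"return": "(...)", "cache_type": "extract", "chunk_id": "chunk-1"},
        "hash-b": {"return": "(...)", "cache_type": "extract", "chunk_id": "chunk-2"},
    }
}
print(prune_cache_by_chunk_ids(cache, {"chunk-1"}))  # only hash-b survives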

View file

@@ -56,6 +56,7 @@ from .operate import (
kg_query,
naive_query,
query_with_keywords,
_rebuild_knowledge_from_chunks,
)
from .prompt import GRAPH_FIELD_SEP
from .utils import (
@@ -1207,6 +1208,7 @@ class LightRAG:
cast(StorageNameSpace, storage_inst).index_done_callback()
for storage_inst in [ # type: ignore
self.full_docs,
self.doc_status,
self.text_chunks,
self.llm_response_cache,
self.entities_vdb,
@@ -1674,10 +1676,12 @@ class LightRAG:
# Return the dictionary containing statuses only for the found document IDs
return found_statuses
# TODO: Deprecated (Deleting documents can cause hallucinations in RAG.)
# Document delete is not working properly for most of the storage implementations.
async def adelete_by_doc_id(self, doc_id: str) -> None:
"""Delete a document and all its related data with cache cleanup and reconstruction
Optimized version that:
1. Clears LLM cache for related chunks
2. Rebuilds entity and relationship descriptions from remaining chunks
Args:
doc_id: Document ID to delete
@@ -1688,10 +1692,9 @@ class LightRAG:
logger.warning(f"Document {doc_id} not found")
return
logger.info(f"Starting optimized deletion for document {doc_id}")
# 2. Get all chunks related to this document
# Find all chunks where full_doc_id equals the current doc_id
all_chunks = await self.text_chunks.get_all()
related_chunks = {
chunk_id: chunk_data
@@ -1704,64 +1707,46 @@ class LightRAG:
logger.warning(f"No chunks found for document {doc_id}")
return
# Get all related chunk IDs
chunk_ids = set(related_chunks.keys())
logger.info(f"Found {len(chunk_ids)} chunks to delete")
# 3. **OPTIMIZATION 1**: Clear LLM cache for related chunks
logger.info("Clearing LLM cache for related chunks...")
cache_cleared = await self.llm_response_cache.drop_cache_by_chunk_ids(
list(chunk_ids)
)
if cache_cleared:
logger.info(f"Successfully cleared cache for {len(chunk_ids)} chunks")
else:
logger.warning(
"Failed to clear chunk cache or cache clearing not supported"
)
# 4. Analyze entities and relationships that will be affected
for chunk_id in chunk_ids:
# Check entities
entities_storage = await self.entities_vdb.client_storage
entities = [
dp
for dp in entities_storage["data"]
if chunk_id in dp.get("source_id")
]
logger.debug(f"Chunk {chunk_id} has {len(entities)} related entities")
# Check relationships
relationships_storage = await self.relationships_vdb.client_storage
relations = [
dp
for dp in relationships_storage["data"]
if chunk_id in dp.get("source_id")
]
logger.debug(f"Chunk {chunk_id} has {len(relations)} related relations")
# Continue with the original deletion process...
# 4. Delete chunks from vector database
if chunk_ids:
await self.chunks_vdb.delete(chunk_ids)
await self.text_chunks.delete(chunk_ids)
# 5. Find and process entities and relationships that have these chunks as source
# Get all nodes and edges from the graph storage using storage-agnostic methods
entities_to_delete = set()
entities_to_rebuild = {}  # entity_name -> remaining_chunk_ids
relationships_to_delete = set()
relationships_to_rebuild = {}  # (src, tgt) -> remaining_chunk_ids
# Process entities
all_labels = await self.chunk_entity_relation_graph.get_all_labels()
for node_label in all_labels:
node_data = await self.chunk_entity_relation_graph.get_node(node_label)
if node_data and "source_id" in node_data:
# Split source_id using GRAPH_FIELD_SEP
sources = set(node_data["source_id"].split(GRAPH_FIELD_SEP))
remaining_sources = sources - chunk_ids
if not remaining_sources:
entities_to_delete.add(node_label)
logger.debug(
f"Entity {node_label} marked for deletion - no remaining sources"
)
elif remaining_sources != sources:
# Entity needs to be rebuilt from remaining chunks
entities_to_rebuild[node_label] = remaining_sources
logger.debug(
f"Entity {node_label} will be rebuilt from {len(remaining_sources)} remaining chunks"
)
# Process relationships
@@ -1777,160 +1762,92 @@ class LightRAG:
if edge_data and "source_id" in edge_data:
# Split source_id using GRAPH_FIELD_SEP
sources = set(edge_data["source_id"].split(GRAPH_FIELD_SEP))
remaining_sources = sources - chunk_ids
if not remaining_sources:
relationships_to_delete.add((src, tgt))
logger.debug(
f"Relationship {src}-{tgt} marked for deletion - no remaining sources"
)
elif remaining_sources != sources:
# Relationship needs to be rebuilt from remaining chunks
relationships_to_rebuild[(src, tgt)] = remaining_sources
logger.debug(
f"Relationship {src}-{tgt} will be rebuilt from {len(remaining_sources)} remaining chunks"
)
# 5. Delete chunks from storage
if chunk_ids:
await self.chunks_vdb.delete(chunk_ids)
await self.text_chunks.delete(chunk_ids)
logger.info(f"Deleted {len(chunk_ids)} chunks from storage")
# 6. Delete entities that have no remaining sources
if entities_to_delete:
# Delete from vector database
entity_vdb_ids = [
compute_mdhash_id(entity, prefix="ent-")
for entity in entities_to_delete
]
await self.entities_vdb.delete(entity_vdb_ids)
# Delete from graph
await self.chunk_entity_relation_graph.remove_nodes(
list(entities_to_delete)
)
logger.info(f"Deleted {len(entities_to_delete)} entities")
# Update entities
for entity, new_source_id in entities_to_update.items():
node_data = await self.chunk_entity_relation_graph.get_node(entity)
if node_data:
node_data["source_id"] = new_source_id
await self.chunk_entity_relation_graph.upsert_node(
entity, node_data
)
logger.debug(
f"Updated entity {entity} with new source_id: {new_source_id}"
)
# 7. Delete relationships that have no remaining sources
if relationships_to_delete:
# Delete from vector database
rel_ids_to_delete = []
for src, tgt in relationships_to_delete:
rel_ids_to_delete.extend(
[
compute_mdhash_id(src + tgt, prefix="rel-"),
compute_mdhash_id(tgt + src, prefix="rel-"),
]
)
await self.relationships_vdb.delete(rel_ids_to_delete)
# Delete from graph
await self.chunk_entity_relation_graph.remove_edges(
list(relationships_to_delete)
)
logger.info(f"Deleted {len(relationships_to_delete)} relationships")
# 8. **OPTIMIZATION 2**: Rebuild entities and relationships from remaining chunks
if entities_to_rebuild or relationships_to_rebuild:
logger.info(
f"Rebuilding {len(entities_to_rebuild)} entities and {len(relationships_to_rebuild)} relationships..."
)
await _rebuild_knowledge_from_chunks(
entities_to_rebuild=entities_to_rebuild,
relationships_to_rebuild=relationships_to_rebuild,
knowledge_graph_inst=self.chunk_entity_relation_graph,
entities_vdb=self.entities_vdb,
relationships_vdb=self.relationships_vdb,
text_chunks=self.text_chunks,
llm_response_cache=self.llm_response_cache,
global_config=asdict(self),
)
# Update relationships
for (src, tgt), new_source_id in relationships_to_update.items():
edge_data = await self.chunk_entity_relation_graph.get_edge(src, tgt)
if edge_data:
edge_data["source_id"] = new_source_id
await self.chunk_entity_relation_graph.upsert_edge(
src, tgt, edge_data
)
logger.debug(
f"Updated relationship {src}-{tgt} with new source_id: {new_source_id}"
)
# 9. Delete original document and status
await self.full_docs.delete([doc_id])
await self.doc_status.delete([doc_id])
# 10. Ensure all indexes are updated
await self._insert_done()
logger.info(
f"Successfully deleted document {doc_id}. "
f"Deleted: {len(entities_to_delete)} entities, {len(relationships_to_delete)} relationships. "
f"Rebuilt: {len(entities_to_rebuild)} entities, {len(relationships_to_rebuild)} relationships."
)
async def process_data(data_type, vdb, chunk_id):
# Check data (entities or relationships)
storage = await vdb.client_storage
data_with_chunk = [
dp
for dp in storage["data"]
if chunk_id in (dp.get("source_id") or "").split(GRAPH_FIELD_SEP)
]
data_for_vdb = {}
if data_with_chunk:
logger.warning(
f"found {len(data_with_chunk)} {data_type} still referencing chunk {chunk_id}"
)
for item in data_with_chunk:
old_sources = item["source_id"].split(GRAPH_FIELD_SEP)
new_sources = [src for src in old_sources if src != chunk_id]
if not new_sources:
logger.info(
f"{data_type} {item.get('entity_name', 'N/A')} is deleted because source_id is not exists"
)
await vdb.delete_entity(item)
else:
item["source_id"] = GRAPH_FIELD_SEP.join(new_sources)
item_id = item["__id__"]
data_for_vdb[item_id] = item.copy()
if data_type == "entities":
data_for_vdb[item_id]["content"] = data_for_vdb[
item_id
].get("content") or (
item.get("entity_name", "")
+ (item.get("description") or "")
)
else: # relationships
data_for_vdb[item_id]["content"] = data_for_vdb[
item_id
].get("content") or (
(item.get("keywords") or "")
+ (item.get("src_id") or "")
+ (item.get("tgt_id") or "")
+ (item.get("description") or "")
)
if data_for_vdb:
await vdb.upsert(data_for_vdb)
logger.info(f"Successfully updated {data_type} in vector DB")
# Add verification step
async def verify_deletion():
# Verify if the document has been deleted
if await self.full_docs.get_by_id(doc_id):
logger.warning(f"Document {doc_id} still exists in full_docs")
# Verify if chunks have been deleted
all_remaining_chunks = await self.text_chunks.get_all()
remaining_related_chunks = {
chunk_id: chunk_data
for chunk_id, chunk_data in all_remaining_chunks.items()
if isinstance(chunk_data, dict)
and chunk_data.get("full_doc_id") == doc_id
}
if remaining_related_chunks:
logger.warning(
f"Found {len(remaining_related_chunks)} remaining chunks"
)
# Verify entities and relationships
for chunk_id in chunk_ids:
await process_data("entities", self.entities_vdb, chunk_id)
await process_data(
"relationships", self.relationships_vdb, chunk_id
)
await verify_deletion()
except Exception as e:
logger.error(f"Error while deleting document {doc_id}: {e}")
raise
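Note: the delete/rebuild split above hinges on the source_id bookkeeping. A self-contained sketch with illustrative data, assuming GRAPH_FIELD_SEP is the "<SEP>" default from lightrag.prompt:

GRAPH_FIELD_SEP = "<SEP>"  # assumed default separator

def classify(source_id: str, deleted_chunks: set[str]):
    """Return ('delete', None), ('rebuild', remaining_chunks) or ('keep', sources)."""
    sources = set(source_id.split(GRAPH_FIELD_SEP))
    remaining = sources - deleted_chunks
    if not remaining:
        return "delete", None
    if remaining != sources:
        return "rebuild", remaining
    return "keep", sources

print(classify("chunk-1<SEP>chunk-2", {"chunk-1", "chunk-2"}))  # ('delete', None)
print(classify("chunk-1<SEP>chunk-3", {"chunk-1"}))             # ('rebuild', {'chunk-3'})
print(classify("chunk-3", {"chunk-1"}))                         # ('keep', {'chunk-3'})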
async def adelete_by_entity(self, entity_name: str) -> None:
"""Asynchronously delete an entity and all its relationships.

View file

@@ -240,6 +240,421 @@ async def _handle_single_relationship_extraction(
)
async def _rebuild_knowledge_from_chunks(
entities_to_rebuild: dict[str, set[str]],
relationships_to_rebuild: dict[tuple[str, str], set[str]],
knowledge_graph_inst: BaseGraphStorage,
entities_vdb: BaseVectorStorage,
relationships_vdb: BaseVectorStorage,
text_chunks: BaseKVStorage,
llm_response_cache: BaseKVStorage,
global_config: dict[str, str],
) -> None:
"""Rebuild entity and relationship descriptions from cached extraction results
This method uses cached LLM extraction results instead of calling LLM again,
following the same approach as the insert process.
Args:
entities_to_rebuild: Dict mapping entity_name -> set of remaining chunk_ids
relationships_to_rebuild: Dict mapping (src, tgt) -> set of remaining chunk_ids
"""
if not entities_to_rebuild and not relationships_to_rebuild:
return
# Get all referenced chunk IDs
all_referenced_chunk_ids = set()
for chunk_ids in entities_to_rebuild.values():
all_referenced_chunk_ids.update(chunk_ids)
for chunk_ids in relationships_to_rebuild.values():
all_referenced_chunk_ids.update(chunk_ids)
logger.info(
f"Rebuilding knowledge from {len(all_referenced_chunk_ids)} cached chunk extractions"
)
# Get cached extraction results for these chunks
cached_results = await _get_cached_extraction_results(
llm_response_cache, all_referenced_chunk_ids
)
if not cached_results:
logger.warning("No cached extraction results found, cannot rebuild")
return
# Process cached results to get entities and relationships for each chunk
chunk_entities = {} # chunk_id -> {entity_name: [entity_data]}
chunk_relationships = {} # chunk_id -> {(src, tgt): [relationship_data]}
for chunk_id, extraction_result in cached_results.items():
try:
entities, relationships = await _parse_extraction_result(
text_chunks=text_chunks,
extraction_result=extraction_result,
chunk_id=chunk_id,
)
chunk_entities[chunk_id] = entities
chunk_relationships[chunk_id] = relationships
except Exception as e:
logger.error(
f"Failed to parse cached extraction result for chunk {chunk_id}: {e}"
)
continue
# Rebuild entities
for entity_name, chunk_ids in entities_to_rebuild.items():
try:
await _rebuild_single_entity(
knowledge_graph_inst=knowledge_graph_inst,
entities_vdb=entities_vdb,
entity_name=entity_name,
chunk_ids=chunk_ids,
chunk_entities=chunk_entities,
llm_response_cache=llm_response_cache,
global_config=global_config,
)
logger.debug(
f"Rebuilt entity {entity_name} from {len(chunk_ids)} cached extractions"
)
except Exception as e:
logger.error(f"Failed to rebuild entity {entity_name}: {e}")
# Rebuild relationships
for (src, tgt), chunk_ids in relationships_to_rebuild.items():
try:
await _rebuild_single_relationship(
knowledge_graph_inst=knowledge_graph_inst,
relationships_vdb=relationships_vdb,
src=src,
tgt=tgt,
chunk_ids=chunk_ids,
chunk_relationships=chunk_relationships,
llm_response_cache=llm_response_cache,
global_config=global_config,
)
logger.debug(
f"Rebuilt relationship {src}-{tgt} from {len(chunk_ids)} cached extractions"
)
except Exception as e:
logger.error(f"Failed to rebuild relationship {src}-{tgt}: {e}")
logger.info("Completed rebuilding knowledge from cached extractions")
async def _get_cached_extraction_results(
llm_response_cache: BaseKVStorage, chunk_ids: set[str]
) -> dict[str, str]:
"""Get cached extraction results for specific chunk IDs
Args:
chunk_ids: Set of chunk IDs to get cached results for
Returns:
Dict mapping chunk_id -> extraction_result_text
"""
cached_results = {}
# Get all cached data for "default" mode (entity extraction cache)
default_cache = await llm_response_cache.get_by_id("default") or {}
for cache_key, cache_entry in default_cache.items():
if (
isinstance(cache_entry, dict)
and cache_entry.get("cache_type") == "extract"
and cache_entry.get("chunk_id") in chunk_ids
):
chunk_id = cache_entry["chunk_id"]
extraction_result = cache_entry["return"]
cached_results[chunk_id] = extraction_result
logger.info(
f"Found {len(cached_results)} cached extraction results for {len(chunk_ids)} chunk IDs"
)
return cached_results
async def _parse_extraction_result(
text_chunks: BaseKVStorage, extraction_result: str, chunk_id: str
) -> tuple[dict, dict]:
"""Parse cached extraction result using the same logic as extract_entities
Args:
extraction_result: The cached LLM extraction result
chunk_id: The chunk ID for source tracking
Returns:
Tuple of (entities_dict, relationships_dict)
"""
# Get chunk data for file_path
chunk_data = await text_chunks.get_by_id(chunk_id)
file_path = (
chunk_data.get("file_path", "unknown_source")
if chunk_data
else "unknown_source"
)
context_base = dict(
tuple_delimiter=PROMPTS["DEFAULT_TUPLE_DELIMITER"],
record_delimiter=PROMPTS["DEFAULT_RECORD_DELIMITER"],
completion_delimiter=PROMPTS["DEFAULT_COMPLETION_DELIMITER"],
)
maybe_nodes = defaultdict(list)
maybe_edges = defaultdict(list)
# Parse the extraction result using the same logic as in extract_entities
records = split_string_by_multi_markers(
extraction_result,
[context_base["record_delimiter"], context_base["completion_delimiter"]],
)
for record in records:
record = re.search(r"\((.*)\)", record)
if record is None:
continue
record = record.group(1)
record_attributes = split_string_by_multi_markers(
record, [context_base["tuple_delimiter"]]
)
# Try to parse as entity
entity_data = await _handle_single_entity_extraction(
record_attributes, chunk_id, file_path
)
if entity_data is not None:
maybe_nodes[entity_data["entity_name"]].append(entity_data)
continue
# Try to parse as relationship
relationship_data = await _handle_single_relationship_extraction(
record_attributes, chunk_id, file_path
)
if relationship_data is not None:
maybe_edges[
(relationship_data["src_id"], relationship_data["tgt_id"])
].append(relationship_data)
return dict(maybe_nodes), dict(maybe_edges)
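Note: a small self-contained sketch of the cached record format being re-parsed here, assuming the prompt defaults '<|>' (tuple), '##' (record) and '<|COMPLETE|>' (completion) as delimiters; record contents are illustrative:

import re

TUPLE_DELIM, RECORD_DELIM, DONE_DELIM = "<|>", "##", "<|COMPLETE|>"  # assumed defaults

cached_result = (
    '("entity"<|>"Alan Turing"<|>"person"<|>"British mathematician")##'
    '("relationship"<|>"Alan Turing"<|>"Enigma"<|>"broke the cipher"<|>"cryptanalysis"<|>9)'
    '<|COMPLETE|>'
)

for raw in re.split("|".join(map(re.escape, [RECORD_DELIM, DONE_DELIM])), cached_result):
    match = re.search(r"\((.*)\)", raw)  # same record-extraction regex as above
    if match is None:
        continue
    fields = match.group(1).split(TUPLE_DELIM)
    print(fields[0], "->", fields[1:])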
async def _rebuild_single_entity(
knowledge_graph_inst: BaseGraphStorage,
entities_vdb: BaseVectorStorage,
entity_name: str,
chunk_ids: set[str],
chunk_entities: dict,
llm_response_cache: BaseKVStorage,
global_config: dict[str, str],
) -> None:
"""Rebuild a single entity from cached extraction results"""
# Get current entity data
current_entity = await knowledge_graph_inst.get_node(entity_name)
if not current_entity:
return
# Collect all entity data from relevant chunks
all_entity_data = []
for chunk_id in chunk_ids:
if chunk_id in chunk_entities and entity_name in chunk_entities[chunk_id]:
all_entity_data.extend(chunk_entities[chunk_id][entity_name])
if not all_entity_data:
logger.warning(f"No cached entity data found for {entity_name}")
return
# Merge descriptions and get the most common entity type
descriptions = []
entity_types = []
file_paths = set()
for entity_data in all_entity_data:
if entity_data.get("description"):
descriptions.append(entity_data["description"])
if entity_data.get("entity_type"):
entity_types.append(entity_data["entity_type"])
if entity_data.get("file_path"):
file_paths.add(entity_data["file_path"])
# Combine all descriptions
combined_description = (
GRAPH_FIELD_SEP.join(descriptions)
if descriptions
else current_entity.get("description", "")
)
# Get most common entity type
entity_type = (
max(set(entity_types), key=entity_types.count)
if entity_types
else current_entity.get("entity_type", "UNKNOWN")
)
# Use summary if description is too long
if len(combined_description) > global_config["summary_to_max_tokens"]:
final_description = await _handle_entity_relation_summary(
entity_name,
combined_description,
global_config,
llm_response_cache=llm_response_cache,
)
else:
final_description = combined_description
# Update entity in graph storage
updated_entity_data = {
**current_entity,
"description": final_description,
"entity_type": entity_type,
"source_id": GRAPH_FIELD_SEP.join(chunk_ids),
"file_path": GRAPH_FIELD_SEP.join(file_paths)
if file_paths
else current_entity.get("file_path", "unknown_source"),
}
await knowledge_graph_inst.upsert_node(entity_name, updated_entity_data)
# Update entity in vector database
entity_vdb_id = compute_mdhash_id(entity_name, prefix="ent-")
# Delete old vector record first
try:
await entities_vdb.delete([entity_vdb_id])
except Exception as e:
logger.debug(f"Could not delete old entity vector record {entity_vdb_id}: {e}")
# Insert new vector record
entity_content = f"{entity_name}\n{final_description}"
await entities_vdb.upsert(
{
entity_vdb_id: {
"content": entity_content,
"entity_name": entity_name,
"source_id": updated_entity_data["source_id"],
"description": final_description,
"entity_type": entity_type,
"file_path": updated_entity_data["file_path"],
}
}
)
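Note: a minimal sketch of the merge policy applied above (descriptions joined with GRAPH_FIELD_SEP, entity type by majority vote); the data and the separator value are illustrative assumptions:

GRAPH_FIELD_SEP = "<SEP>"  # assumed default separator

def merge_entity_fragments(fragments: list[dict]) -> tuple[str, str]:
    """Combine per-chunk extraction fragments into one description and type."""
    descriptions = [f["description"] for f in fragments if f.get("description")]
    types = [f["entity_type"] for f in fragments if f.get("entity_type")]
    combined = GRAPH_FIELD_SEP.join(descriptions)
    entity_type = max(set(types), key=types.count) if types else "UNKNOWN"
    return combined, entity_type

fragments = [
    {"description": "British mathematician", "entity_type": "person"},
    {"description": "Computer science pioneer", "entity_type": "person"},
    {"description": "Namesake of the Turing Award", "entity_type": "award"},
]
print(merge_entity_fragments(fragments))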
async def _rebuild_single_relationship(
knowledge_graph_inst: BaseGraphStorage,
relationships_vdb: BaseVectorStorage,
src: str,
tgt: str,
chunk_ids: set[str],
chunk_relationships: dict,
llm_response_cache: BaseKVStorage,
global_config: dict[str, str],
) -> None:
"""Rebuild a single relationship from cached extraction results"""
# Get current relationship data
current_relationship = await knowledge_graph_inst.get_edge(src, tgt)
if not current_relationship:
return
# Collect all relationship data from relevant chunks
all_relationship_data = []
for chunk_id in chunk_ids:
if chunk_id in chunk_relationships:
# Check both (src, tgt) and (tgt, src) since relationships can be bidirectional
for edge_key in [(src, tgt), (tgt, src)]:
if edge_key in chunk_relationships[chunk_id]:
all_relationship_data.extend(
chunk_relationships[chunk_id][edge_key]
)
if not all_relationship_data:
logger.warning(f"No cached relationship data found for {src}-{tgt}")
return
# Merge descriptions and keywords
descriptions = []
keywords = []
weights = []
file_paths = set()
for rel_data in all_relationship_data:
if rel_data.get("description"):
descriptions.append(rel_data["description"])
if rel_data.get("keywords"):
keywords.append(rel_data["keywords"])
if rel_data.get("weight"):
weights.append(rel_data["weight"])
if rel_data.get("file_path"):
file_paths.add(rel_data["file_path"])
# Combine descriptions and keywords
combined_description = (
GRAPH_FIELD_SEP.join(descriptions)
if descriptions
else current_relationship.get("description", "")
)
combined_keywords = (
", ".join(set(keywords))
if keywords
else current_relationship.get("keywords", "")
)
avg_weight = (
sum(weights) / len(weights)
if weights
else current_relationship.get("weight", 1.0)
)
# Use summary if description is too long
if len(combined_description) > global_config["summary_to_max_tokens"]:
final_description = await _handle_entity_relation_summary(
f"{src}-{tgt}",
combined_description,
global_config,
llm_response_cache=llm_response_cache,
)
else:
final_description = combined_description
# Update relationship in graph storage
updated_relationship_data = {
**current_relationship,
"description": final_description,
"keywords": combined_keywords,
"weight": avg_weight,
"source_id": GRAPH_FIELD_SEP.join(chunk_ids),
"file_path": GRAPH_FIELD_SEP.join(file_paths)
if file_paths
else current_relationship.get("file_path", "unknown_source"),
}
await knowledge_graph_inst.upsert_edge(src, tgt, updated_relationship_data)
# Update relationship in vector database
rel_vdb_id = compute_mdhash_id(src + tgt, prefix="rel-")
rel_vdb_id_reverse = compute_mdhash_id(tgt + src, prefix="rel-")
# Delete old vector records first (both directions to be safe)
try:
await relationships_vdb.delete([rel_vdb_id, rel_vdb_id_reverse])
except Exception as e:
logger.debug(
f"Could not delete old relationship vector records {rel_vdb_id}, {rel_vdb_id_reverse}: {e}"
)
# Insert new vector record
rel_content = f"{combined_keywords}\t{src}\n{tgt}\n{final_description}"
await relationships_vdb.upsert(
{
rel_vdb_id: {
"src_id": src,
"tgt_id": tgt,
"source_id": updated_relationship_data["source_id"],
"content": rel_content,
"keywords": combined_keywords,
"description": final_description,
"weight": avg_weight,
"file_path": updated_relationship_data["file_path"],
}
}
)
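Note: the vector-DB record IDs used for the delete-then-upsert above follow the ent-/rel- hashing scheme; a sketch assuming compute_mdhash_id is the usual prefix-plus-MD5-hex helper from lightrag.utils:

from hashlib import md5

def mdhash_id(content: str, prefix: str = "") -> str:
    # assumed equivalent of lightrag.utils.compute_mdhash_id
    return prefix + md5(content.encode()).hexdigest()

src, tgt = "Alan Turing", "Enigma"
print(mdhash_id(src, prefix="ent-"))        # entity record id
print(mdhash_id(src + tgt, prefix="rel-"))  # forward relationship id
print(mdhash_id(tgt + src, prefix="rel-"))  # reverse id, deleted defensively above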
async def _merge_nodes_then_upsert(
entity_name: str,
nodes_data: list[dict],
@@ -757,6 +1172,7 @@ async def extract_entities(
use_llm_func,
llm_response_cache=llm_response_cache,
cache_type="extract",
chunk_id=chunk_key,
)
history = pack_user_ass_to_openai_messages(hint_prompt, final_result)
@@ -773,6 +1189,7 @@ async def extract_entities(
llm_response_cache=llm_response_cache,
history_messages=history,
cache_type="extract",
chunk_id=chunk_key,
)
history += pack_user_ass_to_openai_messages(continue_prompt, glean_result)

View file

@@ -990,6 +990,7 @@ class CacheData:
max_val: float | None = None
mode: str = "default"
cache_type: str = "query"
chunk_id: str | None = None
async def save_to_cache(hashing_kv, cache_data: CacheData):
@@ -1030,6 +1031,7 @@ async def save_to_cache(hashing_kv, cache_data: CacheData):
mode_cache[cache_data.args_hash] = {
"return": cache_data.content,
"cache_type": cache_data.cache_type,
"chunk_id": cache_data.chunk_id if cache_data.chunk_id is not None else None,
"embedding": cache_data.quantized.tobytes().hex()
if cache_data.quantized is not None
else None,
@@ -1534,6 +1536,7 @@ async def use_llm_func_with_cache(
max_tokens: int = None,
history_messages: list[dict[str, str]] = None,
cache_type: str = "extract",
chunk_id: str | None = None,
) -> str:
"""Call LLM function with cache support
@@ -1547,6 +1550,7 @@ async def use_llm_func_with_cache(
max_tokens: Maximum tokens for generation
history_messages: History messages list
cache_type: Type of cache
chunk_id: Chunk identifier to store in cache
Returns:
LLM response text
@@ -1589,6 +1593,7 @@ async def use_llm_func_with_cache(
content=res,
prompt=_prompt,
cache_type=cache_type,
chunk_id=chunk_id,
),
)
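Note: with chunk_id threaded through use_llm_func_with_cache and save_to_cache, an extraction cache record now carries its source chunk, which is what drop_cache_by_chunk_ids matches on. An illustrative record shape (other fields omitted):

llm_response_cache = {
    "default": {
        "args-hash-123": {
            "return": '("entity"<|>"Alan Turing"<|>...)',  # cached extraction text
            "cache_type": "extract",
            "chunk_id": "chunk-1",  # new field linking the record to its source chunk
        }
    }
}

# Deleting the document that produced "chunk-1" can now also evict this record:
# await rag.llm_response_cache.drop_cache_by_chunk_ids(["chunk-1"])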