Fix: Remove redundant entity/relation chunk deletions
(cherry picked from commit ea141e2779)
This commit is contained in:
parent
211dbc3f78
commit
70ba7cd787
1 changed files with 335 additions and 95 deletions
|
|
@ -22,6 +22,7 @@ from typing import (
|
||||||
Dict,
|
Dict,
|
||||||
)
|
)
|
||||||
from lightrag.prompt import PROMPTS
|
from lightrag.prompt import PROMPTS
|
||||||
|
from lightrag.exceptions import PipelineCancelledException
|
||||||
from lightrag.constants import (
|
from lightrag.constants import (
|
||||||
DEFAULT_MAX_GLEANING,
|
DEFAULT_MAX_GLEANING,
|
||||||
DEFAULT_FORCE_LLM_SUMMARY_ON_MERGE,
|
DEFAULT_FORCE_LLM_SUMMARY_ON_MERGE,
|
||||||
|
|
@ -47,6 +48,8 @@ from lightrag.constants import (
|
||||||
DEFAULT_LLM_TIMEOUT,
|
DEFAULT_LLM_TIMEOUT,
|
||||||
DEFAULT_EMBEDDING_TIMEOUT,
|
DEFAULT_EMBEDDING_TIMEOUT,
|
||||||
DEFAULT_SOURCE_IDS_LIMIT_METHOD,
|
DEFAULT_SOURCE_IDS_LIMIT_METHOD,
|
||||||
|
DEFAULT_MAX_FILE_PATHS,
|
||||||
|
DEFAULT_FILE_PATH_MORE_PLACEHOLDER,
|
||||||
)
|
)
|
||||||
from lightrag.utils import get_env_value
|
from lightrag.utils import get_env_value
|
||||||
|
|
||||||
|
|
@ -84,7 +87,7 @@ from lightrag.operate import (
|
||||||
merge_nodes_and_edges,
|
merge_nodes_and_edges,
|
||||||
kg_query,
|
kg_query,
|
||||||
naive_query,
|
naive_query,
|
||||||
_rebuild_knowledge_from_chunks,
|
rebuild_knowledge_from_chunks,
|
||||||
)
|
)
|
||||||
from lightrag.constants import GRAPH_FIELD_SEP
|
from lightrag.constants import GRAPH_FIELD_SEP
|
||||||
from lightrag.utils import (
|
from lightrag.utils import (
|
||||||
|
|
@ -393,6 +396,14 @@ class LightRAG:
|
||||||
)
|
)
|
||||||
"""Strategy for enforcing source_id limits: IGNORE_NEW or FIFO."""
|
"""Strategy for enforcing source_id limits: IGNORE_NEW or FIFO."""
|
||||||
|
|
||||||
|
max_file_paths: int = field(
|
||||||
|
default=get_env_value("MAX_FILE_PATHS", DEFAULT_MAX_FILE_PATHS, int)
|
||||||
|
)
|
||||||
|
"""Maximum number of file paths to store in entity/relation file_path field."""
|
||||||
|
|
||||||
|
file_path_more_placeholder: str = field(default=DEFAULT_FILE_PATH_MORE_PLACEHOLDER)
|
||||||
|
"""Placeholder text when file paths exceed max_file_paths limit."""
|
||||||
|
|
||||||
addon_params: dict[str, Any] = field(
|
addon_params: dict[str, Any] = field(
|
||||||
default_factory=lambda: {
|
default_factory=lambda: {
|
||||||
"language": get_env_value(
|
"language": get_env_value(
|
||||||
|
|
@ -699,7 +710,7 @@ class LightRAG:
|
||||||
|
|
||||||
async def check_and_migrate_data(self):
|
async def check_and_migrate_data(self):
|
||||||
"""Check if data migration is needed and perform migration if necessary"""
|
"""Check if data migration is needed and perform migration if necessary"""
|
||||||
async with get_data_init_lock(enable_logging=True):
|
async with get_data_init_lock():
|
||||||
try:
|
try:
|
||||||
# Check if migration is needed:
|
# Check if migration is needed:
|
||||||
# 1. chunk_entity_relation_graph has entities and relations (count > 0)
|
# 1. chunk_entity_relation_graph has entities and relations (count > 0)
|
||||||
|
|
@ -877,13 +888,13 @@ class LightRAG:
|
||||||
need_entity_migration = await self.entity_chunks.is_empty()
|
need_entity_migration = await self.entity_chunks.is_empty()
|
||||||
except Exception as exc: # pragma: no cover - defensive logging
|
except Exception as exc: # pragma: no cover - defensive logging
|
||||||
logger.error(f"Failed to check entity chunks storage: {exc}")
|
logger.error(f"Failed to check entity chunks storage: {exc}")
|
||||||
need_entity_migration = True
|
raise exc
|
||||||
|
|
||||||
try:
|
try:
|
||||||
need_relation_migration = await self.relation_chunks.is_empty()
|
need_relation_migration = await self.relation_chunks.is_empty()
|
||||||
except Exception as exc: # pragma: no cover - defensive logging
|
except Exception as exc: # pragma: no cover - defensive logging
|
||||||
logger.error(f"Failed to check relation chunks storage: {exc}")
|
logger.error(f"Failed to check relation chunks storage: {exc}")
|
||||||
need_relation_migration = True
|
raise exc
|
||||||
|
|
||||||
if not need_entity_migration and not need_relation_migration:
|
if not need_entity_migration and not need_relation_migration:
|
||||||
return
|
return
|
||||||
|
|
@ -1593,6 +1604,7 @@ class LightRAG:
|
||||||
"batchs": 0, # Total number of files to be processed
|
"batchs": 0, # Total number of files to be processed
|
||||||
"cur_batch": 0, # Number of files already processed
|
"cur_batch": 0, # Number of files already processed
|
||||||
"request_pending": False, # Clear any previous request
|
"request_pending": False, # Clear any previous request
|
||||||
|
"cancellation_requested": False, # Initialize cancellation flag
|
||||||
"latest_message": "",
|
"latest_message": "",
|
||||||
}
|
}
|
||||||
)
|
)
|
||||||
|
|
@ -1609,6 +1621,22 @@ class LightRAG:
|
||||||
try:
|
try:
|
||||||
# Process documents until no more documents or requests
|
# Process documents until no more documents or requests
|
||||||
while True:
|
while True:
|
||||||
|
# Check for cancellation request at the start of main loop
|
||||||
|
async with pipeline_status_lock:
|
||||||
|
if pipeline_status.get("cancellation_requested", False):
|
||||||
|
# Clear pending request
|
||||||
|
pipeline_status["request_pending"] = False
|
||||||
|
# Celar cancellation flag
|
||||||
|
pipeline_status["cancellation_requested"] = False
|
||||||
|
|
||||||
|
log_message = "Pipeline cancelled by user"
|
||||||
|
logger.info(log_message)
|
||||||
|
pipeline_status["latest_message"] = log_message
|
||||||
|
pipeline_status["history_messages"].append(log_message)
|
||||||
|
|
||||||
|
# Exit directly, skipping request_pending check
|
||||||
|
return
|
||||||
|
|
||||||
if not to_process_docs:
|
if not to_process_docs:
|
||||||
log_message = "All enqueued documents have been processed"
|
log_message = "All enqueued documents have been processed"
|
||||||
logger.info(log_message)
|
logger.info(log_message)
|
||||||
|
|
@ -1671,14 +1699,25 @@ class LightRAG:
|
||||||
semaphore: asyncio.Semaphore,
|
semaphore: asyncio.Semaphore,
|
||||||
) -> None:
|
) -> None:
|
||||||
"""Process single document"""
|
"""Process single document"""
|
||||||
|
# Initialize variables at the start to prevent UnboundLocalError in error handling
|
||||||
|
file_path = "unknown_source"
|
||||||
|
current_file_number = 0
|
||||||
file_extraction_stage_ok = False
|
file_extraction_stage_ok = False
|
||||||
|
processing_start_time = int(time.time())
|
||||||
|
first_stage_tasks = []
|
||||||
|
entity_relation_task = None
|
||||||
|
|
||||||
async with semaphore:
|
async with semaphore:
|
||||||
nonlocal processed_count
|
nonlocal processed_count
|
||||||
current_file_number = 0
|
|
||||||
# Initialize to prevent UnboundLocalError in error handling
|
# Initialize to prevent UnboundLocalError in error handling
|
||||||
first_stage_tasks = []
|
first_stage_tasks = []
|
||||||
entity_relation_task = None
|
entity_relation_task = None
|
||||||
try:
|
try:
|
||||||
|
# Check for cancellation before starting document processing
|
||||||
|
async with pipeline_status_lock:
|
||||||
|
if pipeline_status.get("cancellation_requested", False):
|
||||||
|
raise PipelineCancelledException("User cancelled")
|
||||||
|
|
||||||
# Get file path from status document
|
# Get file path from status document
|
||||||
file_path = getattr(
|
file_path = getattr(
|
||||||
status_doc, "file_path", "unknown_source"
|
status_doc, "file_path", "unknown_source"
|
||||||
|
|
@ -1741,6 +1780,11 @@ class LightRAG:
|
||||||
# Record processing start time
|
# Record processing start time
|
||||||
processing_start_time = int(time.time())
|
processing_start_time = int(time.time())
|
||||||
|
|
||||||
|
# Check for cancellation before entity extraction
|
||||||
|
async with pipeline_status_lock:
|
||||||
|
if pipeline_status.get("cancellation_requested", False):
|
||||||
|
raise PipelineCancelledException("User cancelled")
|
||||||
|
|
||||||
# Process document in two stages
|
# Process document in two stages
|
||||||
# Stage 1: Process text chunks and docs (parallel execution)
|
# Stage 1: Process text chunks and docs (parallel execution)
|
||||||
doc_status_task = asyncio.create_task(
|
doc_status_task = asyncio.create_task(
|
||||||
|
|
@ -1791,20 +1835,33 @@ class LightRAG:
|
||||||
chunks, pipeline_status, pipeline_status_lock
|
chunks, pipeline_status, pipeline_status_lock
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
await entity_relation_task
|
chunk_results = await entity_relation_task
|
||||||
file_extraction_stage_ok = True
|
file_extraction_stage_ok = True
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
# Log error and update pipeline status
|
# Check if this is a user cancellation
|
||||||
logger.error(traceback.format_exc())
|
if isinstance(e, PipelineCancelledException):
|
||||||
error_msg = f"Failed to extract document {current_file_number}/{total_files}: {file_path}"
|
# User cancellation - log brief message only, no traceback
|
||||||
logger.error(error_msg)
|
error_msg = f"User cancelled {current_file_number}/{total_files}: {file_path}"
|
||||||
async with pipeline_status_lock:
|
logger.warning(error_msg)
|
||||||
pipeline_status["latest_message"] = error_msg
|
async with pipeline_status_lock:
|
||||||
pipeline_status["history_messages"].append(
|
pipeline_status["latest_message"] = error_msg
|
||||||
traceback.format_exc()
|
pipeline_status["history_messages"].append(
|
||||||
)
|
error_msg
|
||||||
pipeline_status["history_messages"].append(error_msg)
|
)
|
||||||
|
else:
|
||||||
|
# Other exceptions - log with traceback
|
||||||
|
logger.error(traceback.format_exc())
|
||||||
|
error_msg = f"Failed to extract document {current_file_number}/{total_files}: {file_path}"
|
||||||
|
logger.error(error_msg)
|
||||||
|
async with pipeline_status_lock:
|
||||||
|
pipeline_status["latest_message"] = error_msg
|
||||||
|
pipeline_status["history_messages"].append(
|
||||||
|
traceback.format_exc()
|
||||||
|
)
|
||||||
|
pipeline_status["history_messages"].append(
|
||||||
|
error_msg
|
||||||
|
)
|
||||||
|
|
||||||
# Cancel tasks that are not yet completed
|
# Cancel tasks that are not yet completed
|
||||||
all_tasks = first_stage_tasks + (
|
all_tasks = first_stage_tasks + (
|
||||||
|
|
@ -1814,9 +1871,14 @@ class LightRAG:
|
||||||
if task and not task.done():
|
if task and not task.done():
|
||||||
task.cancel()
|
task.cancel()
|
||||||
|
|
||||||
# Persistent llm cache
|
# Persistent llm cache with error handling
|
||||||
if self.llm_response_cache:
|
if self.llm_response_cache:
|
||||||
await self.llm_response_cache.index_done_callback()
|
try:
|
||||||
|
await self.llm_response_cache.index_done_callback()
|
||||||
|
except Exception as persist_error:
|
||||||
|
logger.error(
|
||||||
|
f"Failed to persist LLM cache: {persist_error}"
|
||||||
|
)
|
||||||
|
|
||||||
# Record processing end time for failed case
|
# Record processing end time for failed case
|
||||||
processing_end_time = int(time.time())
|
processing_end_time = int(time.time())
|
||||||
|
|
@ -1846,8 +1908,16 @@ class LightRAG:
|
||||||
# Concurrency is controlled by keyed lock for individual entities and relationships
|
# Concurrency is controlled by keyed lock for individual entities and relationships
|
||||||
if file_extraction_stage_ok:
|
if file_extraction_stage_ok:
|
||||||
try:
|
try:
|
||||||
# Get chunk_results from entity_relation_task
|
# Check for cancellation before merge
|
||||||
chunk_results = await entity_relation_task
|
async with pipeline_status_lock:
|
||||||
|
if pipeline_status.get(
|
||||||
|
"cancellation_requested", False
|
||||||
|
):
|
||||||
|
raise PipelineCancelledException(
|
||||||
|
"User cancelled"
|
||||||
|
)
|
||||||
|
|
||||||
|
# Use chunk_results from entity_relation_task
|
||||||
await merge_nodes_and_edges(
|
await merge_nodes_and_edges(
|
||||||
chunk_results=chunk_results, # result collected from entity_relation_task
|
chunk_results=chunk_results, # result collected from entity_relation_task
|
||||||
knowledge_graph_inst=self.chunk_entity_relation_graph,
|
knowledge_graph_inst=self.chunk_entity_relation_graph,
|
||||||
|
|
@ -1904,22 +1974,38 @@ class LightRAG:
|
||||||
)
|
)
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
# Log error and update pipeline status
|
# Check if this is a user cancellation
|
||||||
logger.error(traceback.format_exc())
|
if isinstance(e, PipelineCancelledException):
|
||||||
error_msg = f"Merging stage failed in document {current_file_number}/{total_files}: {file_path}"
|
# User cancellation - log brief message only, no traceback
|
||||||
logger.error(error_msg)
|
error_msg = f"User cancelled during merge {current_file_number}/{total_files}: {file_path}"
|
||||||
async with pipeline_status_lock:
|
logger.warning(error_msg)
|
||||||
pipeline_status["latest_message"] = error_msg
|
async with pipeline_status_lock:
|
||||||
pipeline_status["history_messages"].append(
|
pipeline_status["latest_message"] = error_msg
|
||||||
traceback.format_exc()
|
pipeline_status["history_messages"].append(
|
||||||
)
|
error_msg
|
||||||
pipeline_status["history_messages"].append(
|
)
|
||||||
error_msg
|
else:
|
||||||
)
|
# Other exceptions - log with traceback
|
||||||
|
logger.error(traceback.format_exc())
|
||||||
|
error_msg = f"Merging stage failed in document {current_file_number}/{total_files}: {file_path}"
|
||||||
|
logger.error(error_msg)
|
||||||
|
async with pipeline_status_lock:
|
||||||
|
pipeline_status["latest_message"] = error_msg
|
||||||
|
pipeline_status["history_messages"].append(
|
||||||
|
traceback.format_exc()
|
||||||
|
)
|
||||||
|
pipeline_status["history_messages"].append(
|
||||||
|
error_msg
|
||||||
|
)
|
||||||
|
|
||||||
# Persistent llm cache
|
# Persistent llm cache with error handling
|
||||||
if self.llm_response_cache:
|
if self.llm_response_cache:
|
||||||
await self.llm_response_cache.index_done_callback()
|
try:
|
||||||
|
await self.llm_response_cache.index_done_callback()
|
||||||
|
except Exception as persist_error:
|
||||||
|
logger.error(
|
||||||
|
f"Failed to persist LLM cache: {persist_error}"
|
||||||
|
)
|
||||||
|
|
||||||
# Record processing end time for failed case
|
# Record processing end time for failed case
|
||||||
processing_end_time = int(time.time())
|
processing_end_time = int(time.time())
|
||||||
|
|
@ -1960,7 +2046,19 @@ class LightRAG:
|
||||||
)
|
)
|
||||||
|
|
||||||
# Wait for all document processing to complete
|
# Wait for all document processing to complete
|
||||||
await asyncio.gather(*doc_tasks)
|
try:
|
||||||
|
await asyncio.gather(*doc_tasks)
|
||||||
|
except PipelineCancelledException:
|
||||||
|
# Cancel all remaining tasks
|
||||||
|
for task in doc_tasks:
|
||||||
|
if not task.done():
|
||||||
|
task.cancel()
|
||||||
|
|
||||||
|
# Wait for all tasks to complete cancellation
|
||||||
|
await asyncio.wait(doc_tasks, return_when=asyncio.ALL_COMPLETED)
|
||||||
|
|
||||||
|
# Exit directly (document statuses already updated in process_document)
|
||||||
|
return
|
||||||
|
|
||||||
# Check if there's a pending request to process more documents (with lock)
|
# Check if there's a pending request to process more documents (with lock)
|
||||||
has_pending_request = False
|
has_pending_request = False
|
||||||
|
|
@ -1991,11 +2089,14 @@ class LightRAG:
|
||||||
to_process_docs.update(pending_docs)
|
to_process_docs.update(pending_docs)
|
||||||
|
|
||||||
finally:
|
finally:
|
||||||
log_message = "Enqueued document processing pipeline stoped"
|
log_message = "Enqueued document processing pipeline stopped"
|
||||||
logger.info(log_message)
|
logger.info(log_message)
|
||||||
# Always reset busy status when done or if an exception occurs (with lock)
|
# Always reset busy status and cancellation flag when done or if an exception occurs (with lock)
|
||||||
async with pipeline_status_lock:
|
async with pipeline_status_lock:
|
||||||
pipeline_status["busy"] = False
|
pipeline_status["busy"] = False
|
||||||
|
pipeline_status["cancellation_requested"] = (
|
||||||
|
False # Always reset cancellation flag
|
||||||
|
)
|
||||||
pipeline_status["latest_message"] = log_message
|
pipeline_status["latest_message"] = log_message
|
||||||
pipeline_status["history_messages"].append(log_message)
|
pipeline_status["history_messages"].append(log_message)
|
||||||
|
|
||||||
|
|
@ -2783,7 +2884,9 @@ class LightRAG:
|
||||||
# Return the dictionary containing statuses only for the found document IDs
|
# Return the dictionary containing statuses only for the found document IDs
|
||||||
return found_statuses
|
return found_statuses
|
||||||
|
|
||||||
async def adelete_by_doc_id(self, doc_id: str) -> DeletionResult:
|
async def adelete_by_doc_id(
|
||||||
|
self, doc_id: str, delete_llm_cache: bool = False
|
||||||
|
) -> DeletionResult:
|
||||||
"""Delete a document and all its related data, including chunks, graph elements.
|
"""Delete a document and all its related data, including chunks, graph elements.
|
||||||
|
|
||||||
This method orchestrates a comprehensive deletion process for a given document ID.
|
This method orchestrates a comprehensive deletion process for a given document ID.
|
||||||
|
|
@ -2793,6 +2896,8 @@ class LightRAG:
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
doc_id (str): The unique identifier of the document to be deleted.
|
doc_id (str): The unique identifier of the document to be deleted.
|
||||||
|
delete_llm_cache (bool): Whether to delete cached LLM extraction results
|
||||||
|
associated with the document. Defaults to False.
|
||||||
|
|
||||||
Returns:
|
Returns:
|
||||||
DeletionResult: An object containing the outcome of the deletion process.
|
DeletionResult: An object containing the outcome of the deletion process.
|
||||||
|
|
@ -2804,6 +2909,7 @@ class LightRAG:
|
||||||
"""
|
"""
|
||||||
deletion_operations_started = False
|
deletion_operations_started = False
|
||||||
original_exception = None
|
original_exception = None
|
||||||
|
doc_llm_cache_ids: list[str] = []
|
||||||
|
|
||||||
# Get pipeline status shared data and lock for status updates
|
# Get pipeline status shared data and lock for status updates
|
||||||
pipeline_status = await get_namespace_data("pipeline_status")
|
pipeline_status = await get_namespace_data("pipeline_status")
|
||||||
|
|
@ -2904,6 +3010,57 @@ class LightRAG:
|
||||||
# Mark that deletion operations have started
|
# Mark that deletion operations have started
|
||||||
deletion_operations_started = True
|
deletion_operations_started = True
|
||||||
|
|
||||||
|
if delete_llm_cache and chunk_ids:
|
||||||
|
if not self.llm_response_cache:
|
||||||
|
logger.info(
|
||||||
|
"Skipping LLM cache collection for document %s because cache storage is unavailable",
|
||||||
|
doc_id,
|
||||||
|
)
|
||||||
|
elif not self.text_chunks:
|
||||||
|
logger.info(
|
||||||
|
"Skipping LLM cache collection for document %s because text chunk storage is unavailable",
|
||||||
|
doc_id,
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
try:
|
||||||
|
chunk_data_list = await self.text_chunks.get_by_ids(
|
||||||
|
list(chunk_ids)
|
||||||
|
)
|
||||||
|
seen_cache_ids: set[str] = set()
|
||||||
|
for chunk_data in chunk_data_list:
|
||||||
|
if not chunk_data or not isinstance(chunk_data, dict):
|
||||||
|
continue
|
||||||
|
cache_ids = chunk_data.get("llm_cache_list", [])
|
||||||
|
if not isinstance(cache_ids, list):
|
||||||
|
continue
|
||||||
|
for cache_id in cache_ids:
|
||||||
|
if (
|
||||||
|
isinstance(cache_id, str)
|
||||||
|
and cache_id
|
||||||
|
and cache_id not in seen_cache_ids
|
||||||
|
):
|
||||||
|
doc_llm_cache_ids.append(cache_id)
|
||||||
|
seen_cache_ids.add(cache_id)
|
||||||
|
if doc_llm_cache_ids:
|
||||||
|
logger.info(
|
||||||
|
"Collected %d LLM cache entries for document %s",
|
||||||
|
len(doc_llm_cache_ids),
|
||||||
|
doc_id,
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
logger.info(
|
||||||
|
"No LLM cache entries found for document %s", doc_id
|
||||||
|
)
|
||||||
|
except Exception as cache_collect_error:
|
||||||
|
logger.error(
|
||||||
|
"Failed to collect LLM cache ids for document %s: %s",
|
||||||
|
doc_id,
|
||||||
|
cache_collect_error,
|
||||||
|
)
|
||||||
|
raise Exception(
|
||||||
|
f"Failed to collect LLM cache ids for document {doc_id}: {cache_collect_error}"
|
||||||
|
) from cache_collect_error
|
||||||
|
|
||||||
# 4. Analyze entities and relationships that will be affected
|
# 4. Analyze entities and relationships that will be affected
|
||||||
entities_to_delete = set()
|
entities_to_delete = set()
|
||||||
entities_to_rebuild = {} # entity_name -> remaining chunk id list
|
entities_to_rebuild = {} # entity_name -> remaining chunk id list
|
||||||
|
|
@ -3078,38 +3235,31 @@ class LightRAG:
|
||||||
|
|
||||||
if entity_chunk_updates and self.entity_chunks:
|
if entity_chunk_updates and self.entity_chunks:
|
||||||
entity_upsert_payload = {}
|
entity_upsert_payload = {}
|
||||||
entity_delete_ids: set[str] = set()
|
|
||||||
for entity_name, remaining in entity_chunk_updates.items():
|
for entity_name, remaining in entity_chunk_updates.items():
|
||||||
if not remaining:
|
if not remaining:
|
||||||
entity_delete_ids.add(entity_name)
|
# Empty entities are deleted alongside graph nodes later
|
||||||
else:
|
continue
|
||||||
entity_upsert_payload[entity_name] = {
|
entity_upsert_payload[entity_name] = {
|
||||||
"chunk_ids": remaining,
|
"chunk_ids": remaining,
|
||||||
"count": len(remaining),
|
"count": len(remaining),
|
||||||
"updated_at": current_time,
|
"updated_at": current_time,
|
||||||
}
|
}
|
||||||
|
|
||||||
if entity_delete_ids:
|
|
||||||
await self.entity_chunks.delete(list(entity_delete_ids))
|
|
||||||
if entity_upsert_payload:
|
if entity_upsert_payload:
|
||||||
await self.entity_chunks.upsert(entity_upsert_payload)
|
await self.entity_chunks.upsert(entity_upsert_payload)
|
||||||
|
|
||||||
if relation_chunk_updates and self.relation_chunks:
|
if relation_chunk_updates and self.relation_chunks:
|
||||||
relation_upsert_payload = {}
|
relation_upsert_payload = {}
|
||||||
relation_delete_ids: set[str] = set()
|
|
||||||
for edge_tuple, remaining in relation_chunk_updates.items():
|
for edge_tuple, remaining in relation_chunk_updates.items():
|
||||||
storage_key = make_relation_chunk_key(*edge_tuple)
|
|
||||||
if not remaining:
|
if not remaining:
|
||||||
relation_delete_ids.add(storage_key)
|
# Empty relations are deleted alongside graph edges later
|
||||||
else:
|
continue
|
||||||
relation_upsert_payload[storage_key] = {
|
storage_key = make_relation_chunk_key(*edge_tuple)
|
||||||
"chunk_ids": remaining,
|
relation_upsert_payload[storage_key] = {
|
||||||
"count": len(remaining),
|
"chunk_ids": remaining,
|
||||||
"updated_at": current_time,
|
"count": len(remaining),
|
||||||
}
|
"updated_at": current_time,
|
||||||
|
}
|
||||||
|
|
||||||
if relation_delete_ids:
|
|
||||||
await self.relation_chunks.delete(list(relation_delete_ids))
|
|
||||||
if relation_upsert_payload:
|
if relation_upsert_payload:
|
||||||
await self.relation_chunks.upsert(relation_upsert_payload)
|
await self.relation_chunks.upsert(relation_upsert_payload)
|
||||||
|
|
||||||
|
|
@ -3136,39 +3286,10 @@ class LightRAG:
|
||||||
logger.error(f"Failed to delete chunks: {e}")
|
logger.error(f"Failed to delete chunks: {e}")
|
||||||
raise Exception(f"Failed to delete document chunks: {e}") from e
|
raise Exception(f"Failed to delete document chunks: {e}") from e
|
||||||
|
|
||||||
# 6. Delete entities that have no remaining sources
|
# 6. Delete relationships that have no remaining sources
|
||||||
if entities_to_delete:
|
|
||||||
try:
|
|
||||||
# Delete from vector database
|
|
||||||
entity_vdb_ids = [
|
|
||||||
compute_mdhash_id(entity, prefix="ent-")
|
|
||||||
for entity in entities_to_delete
|
|
||||||
]
|
|
||||||
await self.entities_vdb.delete(entity_vdb_ids)
|
|
||||||
|
|
||||||
# Delete from graph
|
|
||||||
await self.chunk_entity_relation_graph.remove_nodes(
|
|
||||||
list(entities_to_delete)
|
|
||||||
)
|
|
||||||
|
|
||||||
# Delete from entity_chunks storage
|
|
||||||
if self.entity_chunks:
|
|
||||||
await self.entity_chunks.delete(list(entities_to_delete))
|
|
||||||
|
|
||||||
async with pipeline_status_lock:
|
|
||||||
log_message = f"Successfully deleted {len(entities_to_delete)} entities"
|
|
||||||
logger.info(log_message)
|
|
||||||
pipeline_status["latest_message"] = log_message
|
|
||||||
pipeline_status["history_messages"].append(log_message)
|
|
||||||
|
|
||||||
except Exception as e:
|
|
||||||
logger.error(f"Failed to delete entities: {e}")
|
|
||||||
raise Exception(f"Failed to delete entities: {e}") from e
|
|
||||||
|
|
||||||
# 7. Delete relationships that have no remaining sources
|
|
||||||
if relationships_to_delete:
|
if relationships_to_delete:
|
||||||
try:
|
try:
|
||||||
# Delete from vector database
|
# Delete from relation vdb
|
||||||
rel_ids_to_delete = []
|
rel_ids_to_delete = []
|
||||||
for src, tgt in relationships_to_delete:
|
for src, tgt in relationships_to_delete:
|
||||||
rel_ids_to_delete.extend(
|
rel_ids_to_delete.extend(
|
||||||
|
|
@ -3202,13 +3323,105 @@ class LightRAG:
|
||||||
logger.error(f"Failed to delete relationships: {e}")
|
logger.error(f"Failed to delete relationships: {e}")
|
||||||
raise Exception(f"Failed to delete relationships: {e}") from e
|
raise Exception(f"Failed to delete relationships: {e}") from e
|
||||||
|
|
||||||
|
# 7. Delete entities that have no remaining sources
|
||||||
|
if entities_to_delete:
|
||||||
|
try:
|
||||||
|
# Batch get all edges for entities to avoid N+1 query problem
|
||||||
|
nodes_edges_dict = await self.chunk_entity_relation_graph.get_nodes_edges_batch(
|
||||||
|
list(entities_to_delete)
|
||||||
|
)
|
||||||
|
|
||||||
|
# Debug: Check and log all edges before deleting nodes
|
||||||
|
edges_to_delete = set()
|
||||||
|
edges_still_exist = 0
|
||||||
|
|
||||||
|
for entity, edges in nodes_edges_dict.items():
|
||||||
|
if edges:
|
||||||
|
for src, tgt in edges:
|
||||||
|
# Normalize edge representation (sorted for consistency)
|
||||||
|
edge_tuple = tuple(sorted((src, tgt)))
|
||||||
|
edges_to_delete.add(edge_tuple)
|
||||||
|
|
||||||
|
if (
|
||||||
|
src in entities_to_delete
|
||||||
|
and tgt in entities_to_delete
|
||||||
|
):
|
||||||
|
logger.warning(
|
||||||
|
f"Edge still exists: {src} <-> {tgt}"
|
||||||
|
)
|
||||||
|
elif src in entities_to_delete:
|
||||||
|
logger.warning(
|
||||||
|
f"Edge still exists: {src} --> {tgt}"
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
logger.warning(
|
||||||
|
f"Edge still exists: {src} <-- {tgt}"
|
||||||
|
)
|
||||||
|
edges_still_exist += 1
|
||||||
|
|
||||||
|
if edges_still_exist:
|
||||||
|
logger.warning(
|
||||||
|
f"⚠️ {edges_still_exist} entities still has edges before deletion"
|
||||||
|
)
|
||||||
|
|
||||||
|
# Clean residual edges from VDB and storage before deleting nodes
|
||||||
|
if edges_to_delete:
|
||||||
|
# Delete from relationships_vdb
|
||||||
|
rel_ids_to_delete = []
|
||||||
|
for src, tgt in edges_to_delete:
|
||||||
|
rel_ids_to_delete.extend(
|
||||||
|
[
|
||||||
|
compute_mdhash_id(src + tgt, prefix="rel-"),
|
||||||
|
compute_mdhash_id(tgt + src, prefix="rel-"),
|
||||||
|
]
|
||||||
|
)
|
||||||
|
await self.relationships_vdb.delete(rel_ids_to_delete)
|
||||||
|
|
||||||
|
# Delete from relation_chunks storage
|
||||||
|
if self.relation_chunks:
|
||||||
|
relation_storage_keys = [
|
||||||
|
make_relation_chunk_key(src, tgt)
|
||||||
|
for src, tgt in edges_to_delete
|
||||||
|
]
|
||||||
|
await self.relation_chunks.delete(relation_storage_keys)
|
||||||
|
|
||||||
|
logger.info(
|
||||||
|
f"Cleaned {len(edges_to_delete)} residual edges from VDB and chunk-tracking storage"
|
||||||
|
)
|
||||||
|
|
||||||
|
# Delete from graph (edges will be auto-deleted with nodes)
|
||||||
|
await self.chunk_entity_relation_graph.remove_nodes(
|
||||||
|
list(entities_to_delete)
|
||||||
|
)
|
||||||
|
|
||||||
|
# Delete from vector vdb
|
||||||
|
entity_vdb_ids = [
|
||||||
|
compute_mdhash_id(entity, prefix="ent-")
|
||||||
|
for entity in entities_to_delete
|
||||||
|
]
|
||||||
|
await self.entities_vdb.delete(entity_vdb_ids)
|
||||||
|
|
||||||
|
# Delete from entity_chunks storage
|
||||||
|
if self.entity_chunks:
|
||||||
|
await self.entity_chunks.delete(list(entities_to_delete))
|
||||||
|
|
||||||
|
async with pipeline_status_lock:
|
||||||
|
log_message = f"Successfully deleted {len(entities_to_delete)} entities"
|
||||||
|
logger.info(log_message)
|
||||||
|
pipeline_status["latest_message"] = log_message
|
||||||
|
pipeline_status["history_messages"].append(log_message)
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"Failed to delete entities: {e}")
|
||||||
|
raise Exception(f"Failed to delete entities: {e}") from e
|
||||||
|
|
||||||
# Persist changes to graph database before releasing graph database lock
|
# Persist changes to graph database before releasing graph database lock
|
||||||
await self._insert_done()
|
await self._insert_done()
|
||||||
|
|
||||||
# 8. Rebuild entities and relationships from remaining chunks
|
# 8. Rebuild entities and relationships from remaining chunks
|
||||||
if entities_to_rebuild or relationships_to_rebuild:
|
if entities_to_rebuild or relationships_to_rebuild:
|
||||||
try:
|
try:
|
||||||
await _rebuild_knowledge_from_chunks(
|
await rebuild_knowledge_from_chunks(
|
||||||
entities_to_rebuild=entities_to_rebuild,
|
entities_to_rebuild=entities_to_rebuild,
|
||||||
relationships_to_rebuild=relationships_to_rebuild,
|
relationships_to_rebuild=relationships_to_rebuild,
|
||||||
knowledge_graph_inst=self.chunk_entity_relation_graph,
|
knowledge_graph_inst=self.chunk_entity_relation_graph,
|
||||||
|
|
@ -3245,6 +3458,23 @@ class LightRAG:
|
||||||
logger.error(f"Failed to delete document and status: {e}")
|
logger.error(f"Failed to delete document and status: {e}")
|
||||||
raise Exception(f"Failed to delete document and status: {e}") from e
|
raise Exception(f"Failed to delete document and status: {e}") from e
|
||||||
|
|
||||||
|
if delete_llm_cache and doc_llm_cache_ids and self.llm_response_cache:
|
||||||
|
try:
|
||||||
|
await self.llm_response_cache.delete(doc_llm_cache_ids)
|
||||||
|
cache_log_message = f"Successfully deleted {len(doc_llm_cache_ids)} LLM cache entries for document {doc_id}"
|
||||||
|
logger.info(cache_log_message)
|
||||||
|
async with pipeline_status_lock:
|
||||||
|
pipeline_status["latest_message"] = cache_log_message
|
||||||
|
pipeline_status["history_messages"].append(cache_log_message)
|
||||||
|
log_message = cache_log_message
|
||||||
|
except Exception as cache_delete_error:
|
||||||
|
log_message = f"Failed to delete LLM cache for document {doc_id}: {cache_delete_error}"
|
||||||
|
logger.error(log_message)
|
||||||
|
logger.error(traceback.format_exc())
|
||||||
|
async with pipeline_status_lock:
|
||||||
|
pipeline_status["latest_message"] = log_message
|
||||||
|
pipeline_status["history_messages"].append(log_message)
|
||||||
|
|
||||||
return DeletionResult(
|
return DeletionResult(
|
||||||
status="success",
|
status="success",
|
||||||
doc_id=doc_id,
|
doc_id=doc_id,
|
||||||
|
|
@ -3409,7 +3639,11 @@ class LightRAG:
|
||||||
)
|
)
|
||||||
|
|
||||||
async def aedit_entity(
|
async def aedit_entity(
|
||||||
self, entity_name: str, updated_data: dict[str, str], allow_rename: bool = True
|
self,
|
||||||
|
entity_name: str,
|
||||||
|
updated_data: dict[str, str],
|
||||||
|
allow_rename: bool = True,
|
||||||
|
allow_merge: bool = False,
|
||||||
) -> dict[str, Any]:
|
) -> dict[str, Any]:
|
||||||
"""Asynchronously edit entity information.
|
"""Asynchronously edit entity information.
|
||||||
|
|
||||||
|
|
@ -3420,6 +3654,7 @@ class LightRAG:
|
||||||
entity_name: Name of the entity to edit
|
entity_name: Name of the entity to edit
|
||||||
updated_data: Dictionary containing updated attributes, e.g. {"description": "new description", "entity_type": "new type"}
|
updated_data: Dictionary containing updated attributes, e.g. {"description": "new description", "entity_type": "new type"}
|
||||||
allow_rename: Whether to allow entity renaming, defaults to True
|
allow_rename: Whether to allow entity renaming, defaults to True
|
||||||
|
allow_merge: Whether to merge into an existing entity when renaming to an existing name
|
||||||
|
|
||||||
Returns:
|
Returns:
|
||||||
Dictionary containing updated entity information
|
Dictionary containing updated entity information
|
||||||
|
|
@ -3433,16 +3668,21 @@ class LightRAG:
|
||||||
entity_name,
|
entity_name,
|
||||||
updated_data,
|
updated_data,
|
||||||
allow_rename,
|
allow_rename,
|
||||||
|
allow_merge,
|
||||||
self.entity_chunks,
|
self.entity_chunks,
|
||||||
self.relation_chunks,
|
self.relation_chunks,
|
||||||
)
|
)
|
||||||
|
|
||||||
def edit_entity(
|
def edit_entity(
|
||||||
self, entity_name: str, updated_data: dict[str, str], allow_rename: bool = True
|
self,
|
||||||
|
entity_name: str,
|
||||||
|
updated_data: dict[str, str],
|
||||||
|
allow_rename: bool = True,
|
||||||
|
allow_merge: bool = False,
|
||||||
) -> dict[str, Any]:
|
) -> dict[str, Any]:
|
||||||
loop = always_get_an_event_loop()
|
loop = always_get_an_event_loop()
|
||||||
return loop.run_until_complete(
|
return loop.run_until_complete(
|
||||||
self.aedit_entity(entity_name, updated_data, allow_rename)
|
self.aedit_entity(entity_name, updated_data, allow_rename, allow_merge)
|
||||||
)
|
)
|
||||||
|
|
||||||
async def aedit_relation(
|
async def aedit_relation(
|
||||||
|
|
|
||||||
Loading…
Add table
Reference in a new issue