fix: sync core modules with upstream after Wave 2

This commit is contained in:
Raphaël MANSUY 2025-12-04 19:14:52 +08:00
parent 567037b795
commit 93778770ab
3 changed files with 1661 additions and 519 deletions

View file

@ -3,6 +3,7 @@ from __future__ import annotations
import traceback import traceback
import asyncio import asyncio
import configparser import configparser
import inspect
import os import os
import time import time
import warnings import warnings
@ -12,6 +13,7 @@ from functools import partial
from typing import ( from typing import (
Any, Any,
AsyncIterator, AsyncIterator,
Awaitable,
Callable, Callable,
Iterator, Iterator,
cast, cast,
@ -20,8 +22,10 @@ from typing import (
Optional, Optional,
List, List,
Dict, Dict,
Union,
) )
from lightrag.prompt import PROMPTS from lightrag.prompt import PROMPTS
from lightrag.exceptions import PipelineCancelledException
from lightrag.constants import ( from lightrag.constants import (
DEFAULT_MAX_GLEANING, DEFAULT_MAX_GLEANING,
DEFAULT_FORCE_LLM_SUMMARY_ON_MERGE, DEFAULT_FORCE_LLM_SUMMARY_ON_MERGE,
@ -60,9 +64,10 @@ from lightrag.kg import (
from lightrag.kg.shared_storage import ( from lightrag.kg.shared_storage import (
get_namespace_data, get_namespace_data,
get_pipeline_status_lock,
get_graph_db_lock,
get_data_init_lock, get_data_init_lock,
get_default_workspace,
set_default_workspace,
get_namespace_lock,
) )
from lightrag.base import ( from lightrag.base import (
@ -86,7 +91,7 @@ from lightrag.operate import (
merge_nodes_and_edges, merge_nodes_and_edges,
kg_query, kg_query,
naive_query, naive_query,
_rebuild_knowledge_from_chunks, rebuild_knowledge_from_chunks,
) )
from lightrag.constants import GRAPH_FIELD_SEP from lightrag.constants import GRAPH_FIELD_SEP
from lightrag.utils import ( from lightrag.utils import (
@ -242,23 +247,28 @@ class LightRAG:
int, int,
int, int,
], ],
List[Dict[str, Any]], Union[List[Dict[str, Any]], Awaitable[List[Dict[str, Any]]]],
] = field(default_factory=lambda: chunking_by_token_size) ] = field(default_factory=lambda: chunking_by_token_size)
""" """
Custom chunking function for splitting text into chunks before processing. Custom chunking function for splitting text into chunks before processing.
The function can be either synchronous or asynchronous.
The function should take the following parameters: The function should take the following parameters:
- `tokenizer`: A Tokenizer instance to use for tokenization. - `tokenizer`: A Tokenizer instance to use for tokenization.
- `content`: The text to be split into chunks. - `content`: The text to be split into chunks.
- `split_by_character`: The character to split the text on. If None, the text is split into chunks of `chunk_token_size` tokens. - `split_by_character`: The character to split the text on. If None, the text is split into chunks of `chunk_token_size` tokens.
- `split_by_character_only`: If True, the text is split only on the specified character. - `split_by_character_only`: If True, the text is split only on the specified character.
- `chunk_token_size`: The maximum number of tokens per chunk.
- `chunk_overlap_token_size`: The number of overlapping tokens between consecutive chunks. - `chunk_overlap_token_size`: The number of overlapping tokens between consecutive chunks.
- `chunk_token_size`: The maximum number of tokens per chunk.
The function should return a list of dictionaries, where each dictionary contains the following keys:
- `tokens`: The number of tokens in the chunk. The function should return a list of dictionaries (or an awaitable that resolves to a list),
- `content`: The text content of the chunk. where each dictionary contains the following keys:
- `tokens` (int): The number of tokens in the chunk.
- `content` (str): The text content of the chunk.
- `chunk_order_index` (int): Zero-based index indicating the chunk's order in the document.
Defaults to `chunking_by_token_size` if not specified. Defaults to `chunking_by_token_size` if not specified.
""" """
@ -269,6 +279,9 @@ class LightRAG:
embedding_func: EmbeddingFunc | None = field(default=None) embedding_func: EmbeddingFunc | None = field(default=None)
"""Function for computing text embeddings. Must be set before use.""" """Function for computing text embeddings. Must be set before use."""
embedding_token_limit: int | None = field(default=None, init=False)
"""Token limit for embedding model. Set automatically from embedding_func.max_token_size in __post_init__."""
embedding_batch_num: int = field(default=int(os.getenv("EMBEDDING_BATCH_NUM", 10))) embedding_batch_num: int = field(default=int(os.getenv("EMBEDDING_BATCH_NUM", 10)))
"""Batch size for embedding computations.""" """Batch size for embedding computations."""
@ -512,6 +525,16 @@ class LightRAG:
logger.debug(f"LightRAG init with param:\n {_print_config}\n") logger.debug(f"LightRAG init with param:\n {_print_config}\n")
# Init Embedding # Init Embedding
# Step 1: Capture max_token_size before applying decorator (decorator strips dataclass attributes)
embedding_max_token_size = None
if self.embedding_func and hasattr(self.embedding_func, "max_token_size"):
embedding_max_token_size = self.embedding_func.max_token_size
logger.debug(
f"Captured embedding max_token_size: {embedding_max_token_size}"
)
self.embedding_token_limit = embedding_max_token_size
# Step 2: Apply priority wrapper decorator
self.embedding_func = priority_limit_async_func_call( self.embedding_func = priority_limit_async_func_call(
self.embedding_func_max_async, self.embedding_func_max_async,
llm_timeout=self.default_embedding_timeout, llm_timeout=self.default_embedding_timeout,
@ -638,6 +661,22 @@ class LightRAG:
async def initialize_storages(self): async def initialize_storages(self):
"""Storage initialization must be called one by one to prevent deadlock""" """Storage initialization must be called one by one to prevent deadlock"""
if self._storages_status == StoragesStatus.CREATED: if self._storages_status == StoragesStatus.CREATED:
# Set the first initialized workspace will set the default workspace
# Allows namespace operation without specifying workspace for backward compatibility
default_workspace = get_default_workspace()
if default_workspace is None:
set_default_workspace(self.workspace)
elif default_workspace != self.workspace:
logger.info(
f"Creating LightRAG instance with workspace='{self.workspace}' "
f"while default workspace is set to '{default_workspace}'"
)
# Auto-initialize pipeline_status for this workspace
from lightrag.kg.shared_storage import initialize_pipeline_status
await initialize_pipeline_status(workspace=self.workspace)
for storage in ( for storage in (
self.full_docs, self.full_docs,
self.text_chunks, self.text_chunks,
@ -709,7 +748,7 @@ class LightRAG:
async def check_and_migrate_data(self): async def check_and_migrate_data(self):
"""Check if data migration is needed and perform migration if necessary""" """Check if data migration is needed and perform migration if necessary"""
async with get_data_init_lock(enable_logging=True): async with get_data_init_lock():
try: try:
# Check if migration is needed: # Check if migration is needed:
# 1. chunk_entity_relation_graph has entities and relations (count > 0) # 1. chunk_entity_relation_graph has entities and relations (count > 0)
@ -1572,8 +1611,12 @@ class LightRAG:
""" """
# Get pipeline status shared data and lock # Get pipeline status shared data and lock
pipeline_status = await get_namespace_data("pipeline_status") pipeline_status = await get_namespace_data(
pipeline_status_lock = get_pipeline_status_lock() "pipeline_status", workspace=self.workspace
)
pipeline_status_lock = get_namespace_lock(
"pipeline_status", workspace=self.workspace
)
# Check if another process is already processing the queue # Check if another process is already processing the queue
async with pipeline_status_lock: async with pipeline_status_lock:
@ -1603,6 +1646,7 @@ class LightRAG:
"batchs": 0, # Total number of files to be processed "batchs": 0, # Total number of files to be processed
"cur_batch": 0, # Number of files already processed "cur_batch": 0, # Number of files already processed
"request_pending": False, # Clear any previous request "request_pending": False, # Clear any previous request
"cancellation_requested": False, # Initialize cancellation flag
"latest_message": "", "latest_message": "",
} }
) )
@ -1619,6 +1663,22 @@ class LightRAG:
try: try:
# Process documents until no more documents or requests # Process documents until no more documents or requests
while True: while True:
# Check for cancellation request at the start of main loop
async with pipeline_status_lock:
if pipeline_status.get("cancellation_requested", False):
# Clear pending request
pipeline_status["request_pending"] = False
# Clear cancellation flag
pipeline_status["cancellation_requested"] = False
log_message = "Pipeline cancelled by user"
logger.info(log_message)
pipeline_status["latest_message"] = log_message
pipeline_status["history_messages"].append(log_message)
# Exit directly, skipping request_pending check
return
if not to_process_docs: if not to_process_docs:
log_message = "All enqueued documents have been processed" log_message = "All enqueued documents have been processed"
logger.info(log_message) logger.info(log_message)
@ -1681,14 +1741,25 @@ class LightRAG:
semaphore: asyncio.Semaphore, semaphore: asyncio.Semaphore,
) -> None: ) -> None:
"""Process single document""" """Process single document"""
# Initialize variables at the start to prevent UnboundLocalError in error handling
file_path = "unknown_source"
current_file_number = 0
file_extraction_stage_ok = False file_extraction_stage_ok = False
processing_start_time = int(time.time())
first_stage_tasks = []
entity_relation_task = None
async with semaphore: async with semaphore:
nonlocal processed_count nonlocal processed_count
current_file_number = 0
# Initialize to prevent UnboundLocalError in error handling # Initialize to prevent UnboundLocalError in error handling
first_stage_tasks = [] first_stage_tasks = []
entity_relation_task = None entity_relation_task = None
try: try:
# Check for cancellation before starting document processing
async with pipeline_status_lock:
if pipeline_status.get("cancellation_requested", False):
raise PipelineCancelledException("User cancelled")
# Get file path from status document # Get file path from status document
file_path = getattr( file_path = getattr(
status_doc, "file_path", "unknown_source" status_doc, "file_path", "unknown_source"
@ -1727,7 +1798,28 @@ class LightRAG:
) )
content = content_data["content"] content = content_data["content"]
# Generate chunks from document # Call chunking function, supporting both sync and async implementations
chunking_result = self.chunking_func(
self.tokenizer,
content,
split_by_character,
split_by_character_only,
self.chunk_overlap_token_size,
self.chunk_token_size,
)
# If result is awaitable, await to get actual result
if inspect.isawaitable(chunking_result):
chunking_result = await chunking_result
# Validate return type
if not isinstance(chunking_result, (list, tuple)):
raise TypeError(
f"chunking_func must return a list or tuple of dicts, "
f"got {type(chunking_result)}"
)
# Build chunks dictionary
chunks: dict[str, Any] = { chunks: dict[str, Any] = {
compute_mdhash_id(dp["content"], prefix="chunk-"): { compute_mdhash_id(dp["content"], prefix="chunk-"): {
**dp, **dp,
@ -1735,14 +1827,7 @@ class LightRAG:
"file_path": file_path, # Add file path to each chunk "file_path": file_path, # Add file path to each chunk
"llm_cache_list": [], # Initialize empty LLM cache list for each chunk "llm_cache_list": [], # Initialize empty LLM cache list for each chunk
} }
for dp in self.chunking_func( for dp in chunking_result
self.tokenizer,
content,
split_by_character,
split_by_character_only,
self.chunk_overlap_token_size,
self.chunk_token_size,
)
} }
if not chunks: if not chunks:
@ -1751,6 +1836,11 @@ class LightRAG:
# Record processing start time # Record processing start time
processing_start_time = int(time.time()) processing_start_time = int(time.time())
# Check for cancellation before entity extraction
async with pipeline_status_lock:
if pipeline_status.get("cancellation_requested", False):
raise PipelineCancelledException("User cancelled")
# Process document in two stages # Process document in two stages
# Stage 1: Process text chunks and docs (parallel execution) # Stage 1: Process text chunks and docs (parallel execution)
doc_status_task = asyncio.create_task( doc_status_task = asyncio.create_task(
@ -1801,20 +1891,33 @@ class LightRAG:
chunks, pipeline_status, pipeline_status_lock chunks, pipeline_status, pipeline_status_lock
) )
) )
await entity_relation_task chunk_results = await entity_relation_task
file_extraction_stage_ok = True file_extraction_stage_ok = True
except Exception as e: except Exception as e:
# Log error and update pipeline status # Check if this is a user cancellation
logger.error(traceback.format_exc()) if isinstance(e, PipelineCancelledException):
error_msg = f"Failed to extract document {current_file_number}/{total_files}: {file_path}" # User cancellation - log brief message only, no traceback
logger.error(error_msg) error_msg = f"User cancelled {current_file_number}/{total_files}: {file_path}"
async with pipeline_status_lock: logger.warning(error_msg)
pipeline_status["latest_message"] = error_msg async with pipeline_status_lock:
pipeline_status["history_messages"].append( pipeline_status["latest_message"] = error_msg
traceback.format_exc() pipeline_status["history_messages"].append(
) error_msg
pipeline_status["history_messages"].append(error_msg) )
else:
# Other exceptions - log with traceback
logger.error(traceback.format_exc())
error_msg = f"Failed to extract document {current_file_number}/{total_files}: {file_path}"
logger.error(error_msg)
async with pipeline_status_lock:
pipeline_status["latest_message"] = error_msg
pipeline_status["history_messages"].append(
traceback.format_exc()
)
pipeline_status["history_messages"].append(
error_msg
)
# Cancel tasks that are not yet completed # Cancel tasks that are not yet completed
all_tasks = first_stage_tasks + ( all_tasks = first_stage_tasks + (
@ -1824,9 +1927,14 @@ class LightRAG:
if task and not task.done(): if task and not task.done():
task.cancel() task.cancel()
# Persistent llm cache # Persistent llm cache with error handling
if self.llm_response_cache: if self.llm_response_cache:
await self.llm_response_cache.index_done_callback() try:
await self.llm_response_cache.index_done_callback()
except Exception as persist_error:
logger.error(
f"Failed to persist LLM cache: {persist_error}"
)
# Record processing end time for failed case # Record processing end time for failed case
processing_end_time = int(time.time()) processing_end_time = int(time.time())
@ -1856,8 +1964,16 @@ class LightRAG:
# Concurrency is controlled by keyed lock for individual entities and relationships # Concurrency is controlled by keyed lock for individual entities and relationships
if file_extraction_stage_ok: if file_extraction_stage_ok:
try: try:
# Get chunk_results from entity_relation_task # Check for cancellation before merge
chunk_results = await entity_relation_task async with pipeline_status_lock:
if pipeline_status.get(
"cancellation_requested", False
):
raise PipelineCancelledException(
"User cancelled"
)
# Use chunk_results from entity_relation_task
await merge_nodes_and_edges( await merge_nodes_and_edges(
chunk_results=chunk_results, # result collected from entity_relation_task chunk_results=chunk_results, # result collected from entity_relation_task
knowledge_graph_inst=self.chunk_entity_relation_graph, knowledge_graph_inst=self.chunk_entity_relation_graph,
@ -1914,22 +2030,38 @@ class LightRAG:
) )
except Exception as e: except Exception as e:
# Log error and update pipeline status # Check if this is a user cancellation
logger.error(traceback.format_exc()) if isinstance(e, PipelineCancelledException):
error_msg = f"Merging stage failed in document {current_file_number}/{total_files}: {file_path}" # User cancellation - log brief message only, no traceback
logger.error(error_msg) error_msg = f"User cancelled during merge {current_file_number}/{total_files}: {file_path}"
async with pipeline_status_lock: logger.warning(error_msg)
pipeline_status["latest_message"] = error_msg async with pipeline_status_lock:
pipeline_status["history_messages"].append( pipeline_status["latest_message"] = error_msg
traceback.format_exc() pipeline_status["history_messages"].append(
) error_msg
pipeline_status["history_messages"].append( )
error_msg else:
) # Other exceptions - log with traceback
logger.error(traceback.format_exc())
error_msg = f"Merging stage failed in document {current_file_number}/{total_files}: {file_path}"
logger.error(error_msg)
async with pipeline_status_lock:
pipeline_status["latest_message"] = error_msg
pipeline_status["history_messages"].append(
traceback.format_exc()
)
pipeline_status["history_messages"].append(
error_msg
)
# Persistent llm cache # Persistent llm cache with error handling
if self.llm_response_cache: if self.llm_response_cache:
await self.llm_response_cache.index_done_callback() try:
await self.llm_response_cache.index_done_callback()
except Exception as persist_error:
logger.error(
f"Failed to persist LLM cache: {persist_error}"
)
# Record processing end time for failed case # Record processing end time for failed case
processing_end_time = int(time.time()) processing_end_time = int(time.time())
@ -1970,7 +2102,19 @@ class LightRAG:
) )
# Wait for all document processing to complete # Wait for all document processing to complete
await asyncio.gather(*doc_tasks) try:
await asyncio.gather(*doc_tasks)
except PipelineCancelledException:
# Cancel all remaining tasks
for task in doc_tasks:
if not task.done():
task.cancel()
# Wait for all tasks to complete cancellation
await asyncio.wait(doc_tasks, return_when=asyncio.ALL_COMPLETED)
# Exit directly (document statuses already updated in process_document)
return
# Check if there's a pending request to process more documents (with lock) # Check if there's a pending request to process more documents (with lock)
has_pending_request = False has_pending_request = False
@ -2001,11 +2145,14 @@ class LightRAG:
to_process_docs.update(pending_docs) to_process_docs.update(pending_docs)
finally: finally:
log_message = "Enqueued document processing pipeline stoped" log_message = "Enqueued document processing pipeline stopped"
logger.info(log_message) logger.info(log_message)
# Always reset busy status when done or if an exception occurs (with lock) # Always reset busy status and cancellation flag when done or if an exception occurs (with lock)
async with pipeline_status_lock: async with pipeline_status_lock:
pipeline_status["busy"] = False pipeline_status["busy"] = False
pipeline_status["cancellation_requested"] = (
False # Always reset cancellation flag
)
pipeline_status["latest_message"] = log_message pipeline_status["latest_message"] = log_message
pipeline_status["history_messages"].append(log_message) pipeline_status["history_messages"].append(log_message)
@ -2803,6 +2950,26 @@ class LightRAG:
data across different storage layers are removed or rebuilt. If entities or relationships data across different storage layers are removed or rebuilt. If entities or relationships
are partially affected, they will be rebuilt using the LLM cache from remaining documents. are partially affected, they will be rebuilt using the LLM cache from remaining documents.
**Concurrency Control Design:**
This function implements a pipeline-based concurrency control to prevent data corruption:
1. **Single Document Deletion** (when WE acquire pipeline):
- Sets job_name to "Single document deletion" (NOT starting with "deleting")
- Prevents other adelete_by_doc_id calls from running concurrently
- Ensures exclusive access to graph operations for this deletion
2. **Batch Document Deletion** (when background_delete_documents acquires pipeline):
- Sets job_name to "Deleting {N} Documents" (starts with "deleting")
- Allows multiple adelete_by_doc_id calls to join the deletion queue
- Each call validates the job name to ensure it's part of a deletion operation
The validation logic `if not job_name.startswith("deleting") or "document" not in job_name`
ensures that:
- adelete_by_doc_id can only run when pipeline is idle OR during batch deletion
- Prevents concurrent single deletions that could cause race conditions
- Rejects operations when pipeline is busy with non-deletion tasks
Args: Args:
doc_id (str): The unique identifier of the document to be deleted. doc_id (str): The unique identifier of the document to be deleted.
delete_llm_cache (bool): Whether to delete cached LLM extraction results delete_llm_cache (bool): Whether to delete cached LLM extraction results
@ -2810,20 +2977,62 @@ class LightRAG:
Returns: Returns:
DeletionResult: An object containing the outcome of the deletion process. DeletionResult: An object containing the outcome of the deletion process.
- `status` (str): "success", "not_found", or "failure". - `status` (str): "success", "not_found", "not_allowed", or "failure".
- `doc_id` (str): The ID of the document attempted to be deleted. - `doc_id` (str): The ID of the document attempted to be deleted.
- `message` (str): A summary of the operation's result. - `message` (str): A summary of the operation's result.
- `status_code` (int): HTTP status code (e.g., 200, 404, 500). - `status_code` (int): HTTP status code (e.g., 200, 404, 403, 500).
- `file_path` (str | None): The file path of the deleted document, if available. - `file_path` (str | None): The file path of the deleted document, if available.
""" """
# Get pipeline status shared data and lock for validation
pipeline_status = await get_namespace_data(
"pipeline_status", workspace=self.workspace
)
pipeline_status_lock = get_namespace_lock(
"pipeline_status", workspace=self.workspace
)
# Track whether WE acquired the pipeline
we_acquired_pipeline = False
# Check and acquire pipeline if needed
async with pipeline_status_lock:
if not pipeline_status.get("busy", False):
# Pipeline is idle - WE acquire it for this deletion
we_acquired_pipeline = True
pipeline_status.update(
{
"busy": True,
"job_name": "Single document deletion",
"job_start": datetime.now(timezone.utc).isoformat(),
"docs": 1,
"batchs": 1,
"cur_batch": 0,
"request_pending": False,
"cancellation_requested": False,
"latest_message": f"Starting deletion for document: {doc_id}",
}
)
# Initialize history messages
pipeline_status["history_messages"][:] = [
f"Starting deletion for document: {doc_id}"
]
else:
# Pipeline already busy - verify it's a deletion job
job_name = pipeline_status.get("job_name", "").lower()
if not job_name.startswith("deleting") or "document" not in job_name:
return DeletionResult(
status="not_allowed",
doc_id=doc_id,
message=f"Deletion not allowed: current job '{pipeline_status.get('job_name')}' is not a document deletion job",
status_code=403,
file_path=None,
)
# Pipeline is busy with deletion - proceed without acquiring
deletion_operations_started = False deletion_operations_started = False
original_exception = None original_exception = None
doc_llm_cache_ids: list[str] = [] doc_llm_cache_ids: list[str] = []
# Get pipeline status shared data and lock for status updates
pipeline_status = await get_namespace_data("pipeline_status")
pipeline_status_lock = get_pipeline_status_lock()
async with pipeline_status_lock: async with pipeline_status_lock:
log_message = f"Starting deletion process for document {doc_id}" log_message = f"Starting deletion process for document {doc_id}"
logger.info(log_message) logger.info(log_message)
@ -3055,6 +3264,9 @@ class LightRAG:
] ]
if not existing_sources: if not existing_sources:
# No chunk references means this entity should be deleted
entities_to_delete.add(node_label)
entity_chunk_updates[node_label] = []
continue continue
remaining_sources = subtract_source_ids(existing_sources, chunk_ids) remaining_sources = subtract_source_ids(existing_sources, chunk_ids)
@ -3076,6 +3288,7 @@ class LightRAG:
# Process relationships # Process relationships
for edge_data in affected_edges: for edge_data in affected_edges:
# source target is not in normalize order in graph db property
src = edge_data.get("source") src = edge_data.get("source")
tgt = edge_data.get("target") tgt = edge_data.get("target")
@ -3112,6 +3325,9 @@ class LightRAG:
] ]
if not existing_sources: if not existing_sources:
# No chunk references means this relationship should be deleted
relationships_to_delete.add(edge_tuple)
relation_chunk_updates[edge_tuple] = []
continue continue
remaining_sources = subtract_source_ids(existing_sources, chunk_ids) remaining_sources = subtract_source_ids(existing_sources, chunk_ids)
@ -3137,38 +3353,31 @@ class LightRAG:
if entity_chunk_updates and self.entity_chunks: if entity_chunk_updates and self.entity_chunks:
entity_upsert_payload = {} entity_upsert_payload = {}
entity_delete_ids: set[str] = set()
for entity_name, remaining in entity_chunk_updates.items(): for entity_name, remaining in entity_chunk_updates.items():
if not remaining: if not remaining:
entity_delete_ids.add(entity_name) # Empty entities are deleted alongside graph nodes later
else: continue
entity_upsert_payload[entity_name] = { entity_upsert_payload[entity_name] = {
"chunk_ids": remaining, "chunk_ids": remaining,
"count": len(remaining), "count": len(remaining),
"updated_at": current_time, "updated_at": current_time,
} }
if entity_delete_ids:
await self.entity_chunks.delete(list(entity_delete_ids))
if entity_upsert_payload: if entity_upsert_payload:
await self.entity_chunks.upsert(entity_upsert_payload) await self.entity_chunks.upsert(entity_upsert_payload)
if relation_chunk_updates and self.relation_chunks: if relation_chunk_updates and self.relation_chunks:
relation_upsert_payload = {} relation_upsert_payload = {}
relation_delete_ids: set[str] = set()
for edge_tuple, remaining in relation_chunk_updates.items(): for edge_tuple, remaining in relation_chunk_updates.items():
storage_key = make_relation_chunk_key(*edge_tuple)
if not remaining: if not remaining:
relation_delete_ids.add(storage_key) # Empty relations are deleted alongside graph edges later
else: continue
relation_upsert_payload[storage_key] = { storage_key = make_relation_chunk_key(*edge_tuple)
"chunk_ids": remaining, relation_upsert_payload[storage_key] = {
"count": len(remaining), "chunk_ids": remaining,
"updated_at": current_time, "count": len(remaining),
} "updated_at": current_time,
}
if relation_delete_ids:
await self.relation_chunks.delete(list(relation_delete_ids))
if relation_upsert_payload: if relation_upsert_payload:
await self.relation_chunks.upsert(relation_upsert_payload) await self.relation_chunks.upsert(relation_upsert_payload)
@ -3176,56 +3385,111 @@ class LightRAG:
logger.error(f"Failed to process graph analysis results: {e}") logger.error(f"Failed to process graph analysis results: {e}")
raise Exception(f"Failed to process graph dependencies: {e}") from e raise Exception(f"Failed to process graph dependencies: {e}") from e
# Use graph database lock to prevent dirty read # Data integrity is ensured by allowing only one process to hold pipeline at a time (no graph db lock is needed anymore)
graph_db_lock = get_graph_db_lock(enable_logging=False)
async with graph_db_lock:
# 5. Delete chunks from storage
if chunk_ids:
try:
await self.chunks_vdb.delete(chunk_ids)
await self.text_chunks.delete(chunk_ids)
async with pipeline_status_lock: # 5. Delete chunks from storage
log_message = f"Successfully deleted {len(chunk_ids)} chunks from storage" if chunk_ids:
logger.info(log_message) try:
pipeline_status["latest_message"] = log_message await self.chunks_vdb.delete(chunk_ids)
pipeline_status["history_messages"].append(log_message) await self.text_chunks.delete(chunk_ids)
except Exception as e: async with pipeline_status_lock:
logger.error(f"Failed to delete chunks: {e}") log_message = (
raise Exception(f"Failed to delete document chunks: {e}") from e f"Successfully deleted {len(chunk_ids)} chunks from storage"
)
logger.info(log_message)
pipeline_status["latest_message"] = log_message
pipeline_status["history_messages"].append(log_message)
# 6. Delete entities that have no remaining sources except Exception as e:
if entities_to_delete: logger.error(f"Failed to delete chunks: {e}")
try: raise Exception(f"Failed to delete document chunks: {e}") from e
# Delete from vector database
entity_vdb_ids = [ # 6. Delete relationships that have no remaining sources
compute_mdhash_id(entity, prefix="ent-") if relationships_to_delete:
for entity in entities_to_delete try:
# Delete from relation vdb
rel_ids_to_delete = []
for src, tgt in relationships_to_delete:
rel_ids_to_delete.extend(
[
compute_mdhash_id(src + tgt, prefix="rel-"),
compute_mdhash_id(tgt + src, prefix="rel-"),
]
)
await self.relationships_vdb.delete(rel_ids_to_delete)
# Delete from graph
await self.chunk_entity_relation_graph.remove_edges(
list(relationships_to_delete)
)
# Delete from relation_chunks storage
if self.relation_chunks:
relation_storage_keys = [
make_relation_chunk_key(src, tgt)
for src, tgt in relationships_to_delete
] ]
await self.entities_vdb.delete(entity_vdb_ids) await self.relation_chunks.delete(relation_storage_keys)
# Delete from graph async with pipeline_status_lock:
await self.chunk_entity_relation_graph.remove_nodes( log_message = f"Successfully deleted {len(relationships_to_delete)} relations"
logger.info(log_message)
pipeline_status["latest_message"] = log_message
pipeline_status["history_messages"].append(log_message)
except Exception as e:
logger.error(f"Failed to delete relationships: {e}")
raise Exception(f"Failed to delete relationships: {e}") from e
# 7. Delete entities that have no remaining sources
if entities_to_delete:
try:
# Batch get all edges for entities to avoid N+1 query problem
nodes_edges_dict = (
await self.chunk_entity_relation_graph.get_nodes_edges_batch(
list(entities_to_delete) list(entities_to_delete)
) )
)
async with pipeline_status_lock: # Debug: Check and log all edges before deleting nodes
log_message = f"Successfully deleted {len(entities_to_delete)} entities" edges_to_delete = set()
logger.info(log_message) edges_still_exist = 0
pipeline_status["latest_message"] = log_message
pipeline_status["history_messages"].append(log_message)
except Exception as e: for entity, edges in nodes_edges_dict.items():
logger.error(f"Failed to delete entities: {e}") if edges:
raise Exception(f"Failed to delete entities: {e}") from e for src, tgt in edges:
# Normalize edge representation (sorted for consistency)
edge_tuple = tuple(sorted((src, tgt)))
edges_to_delete.add(edge_tuple)
# 7. Delete relationships that have no remaining sources if (
if relationships_to_delete: src in entities_to_delete
try: and tgt in entities_to_delete
# Delete from vector database ):
logger.warning(
f"Edge still exists: {src} <-> {tgt}"
)
elif src in entities_to_delete:
logger.warning(
f"Edge still exists: {src} --> {tgt}"
)
else:
logger.warning(
f"Edge still exists: {src} <-- {tgt}"
)
edges_still_exist += 1
if edges_still_exist:
logger.warning(
f"⚠️ {edges_still_exist} entities still has edges before deletion"
)
# Clean residual edges from VDB and storage before deleting nodes
if edges_to_delete:
# Delete from relationships_vdb
rel_ids_to_delete = [] rel_ids_to_delete = []
for src, tgt in relationships_to_delete: for src, tgt in edges_to_delete:
rel_ids_to_delete.extend( rel_ids_to_delete.extend(
[ [
compute_mdhash_id(src + tgt, prefix="rel-"), compute_mdhash_id(src + tgt, prefix="rel-"),
@ -3234,28 +3498,53 @@ class LightRAG:
) )
await self.relationships_vdb.delete(rel_ids_to_delete) await self.relationships_vdb.delete(rel_ids_to_delete)
# Delete from graph # Delete from relation_chunks storage
await self.chunk_entity_relation_graph.remove_edges( if self.relation_chunks:
list(relationships_to_delete) relation_storage_keys = [
make_relation_chunk_key(src, tgt)
for src, tgt in edges_to_delete
]
await self.relation_chunks.delete(relation_storage_keys)
logger.info(
f"Cleaned {len(edges_to_delete)} residual edges from VDB and chunk-tracking storage"
) )
async with pipeline_status_lock: # Delete from graph (edges will be auto-deleted with nodes)
log_message = f"Successfully deleted {len(relationships_to_delete)} relations" await self.chunk_entity_relation_graph.remove_nodes(
logger.info(log_message) list(entities_to_delete)
pipeline_status["latest_message"] = log_message )
pipeline_status["history_messages"].append(log_message)
except Exception as e: # Delete from vector vdb
logger.error(f"Failed to delete relationships: {e}") entity_vdb_ids = [
raise Exception(f"Failed to delete relationships: {e}") from e compute_mdhash_id(entity, prefix="ent-")
for entity in entities_to_delete
]
await self.entities_vdb.delete(entity_vdb_ids)
# Persist changes to graph database before releasing graph database lock # Delete from entity_chunks storage
await self._insert_done() if self.entity_chunks:
await self.entity_chunks.delete(list(entities_to_delete))
async with pipeline_status_lock:
log_message = (
f"Successfully deleted {len(entities_to_delete)} entities"
)
logger.info(log_message)
pipeline_status["latest_message"] = log_message
pipeline_status["history_messages"].append(log_message)
except Exception as e:
logger.error(f"Failed to delete entities: {e}")
raise Exception(f"Failed to delete entities: {e}") from e
# Persist changes to graph database before entity and relationship rebuild
await self._insert_done()
# 8. Rebuild entities and relationships from remaining chunks # 8. Rebuild entities and relationships from remaining chunks
if entities_to_rebuild or relationships_to_rebuild: if entities_to_rebuild or relationships_to_rebuild:
try: try:
await _rebuild_knowledge_from_chunks( await rebuild_knowledge_from_chunks(
entities_to_rebuild=entities_to_rebuild, entities_to_rebuild=entities_to_rebuild,
relationships_to_rebuild=relationships_to_rebuild, relationships_to_rebuild=relationships_to_rebuild,
knowledge_graph_inst=self.chunk_entity_relation_graph, knowledge_graph_inst=self.chunk_entity_relation_graph,
@ -3302,14 +3591,12 @@ class LightRAG:
pipeline_status["history_messages"].append(cache_log_message) pipeline_status["history_messages"].append(cache_log_message)
log_message = cache_log_message log_message = cache_log_message
except Exception as cache_delete_error: except Exception as cache_delete_error:
logger.error( log_message = f"Failed to delete LLM cache for document {doc_id}: {cache_delete_error}"
"Failed to delete LLM cache for document %s: %s", logger.error(log_message)
doc_id, logger.error(traceback.format_exc())
cache_delete_error, async with pipeline_status_lock:
) pipeline_status["latest_message"] = log_message
raise Exception( pipeline_status["history_messages"].append(log_message)
f"Failed to delete LLM cache for document {doc_id}: {cache_delete_error}"
) from cache_delete_error
return DeletionResult( return DeletionResult(
status="success", status="success",
@ -3358,6 +3645,18 @@ class LightRAG:
f"No deletion operations were started for document {doc_id}, skipping persistence" f"No deletion operations were started for document {doc_id}, skipping persistence"
) )
# Release pipeline only if WE acquired it
if we_acquired_pipeline:
async with pipeline_status_lock:
pipeline_status["busy"] = False
pipeline_status["cancellation_requested"] = False
completion_msg = (
f"Deletion process completed for document: {doc_id}"
)
pipeline_status["latest_message"] = completion_msg
pipeline_status["history_messages"].append(completion_msg)
logger.info(completion_msg)
async def adelete_by_entity(self, entity_name: str) -> DeletionResult: async def adelete_by_entity(self, entity_name: str) -> DeletionResult:
"""Asynchronously delete an entity and all its relationships. """Asynchronously delete an entity and all its relationships.
@ -3475,16 +3774,22 @@ class LightRAG:
) )
async def aedit_entity( async def aedit_entity(
self, entity_name: str, updated_data: dict[str, str], allow_rename: bool = True self,
entity_name: str,
updated_data: dict[str, str],
allow_rename: bool = True,
allow_merge: bool = False,
) -> dict[str, Any]: ) -> dict[str, Any]:
"""Asynchronously edit entity information. """Asynchronously edit entity information.
Updates entity information in the knowledge graph and re-embeds the entity in the vector database. Updates entity information in the knowledge graph and re-embeds the entity in the vector database.
Also synchronizes entity_chunks_storage and relation_chunks_storage to track chunk references.
Args: Args:
entity_name: Name of the entity to edit entity_name: Name of the entity to edit
updated_data: Dictionary containing updated attributes, e.g. {"description": "new description", "entity_type": "new type"} updated_data: Dictionary containing updated attributes, e.g. {"description": "new description", "entity_type": "new type"}
allow_rename: Whether to allow entity renaming, defaults to True allow_rename: Whether to allow entity renaming, defaults to True
allow_merge: Whether to merge into an existing entity when renaming to an existing name
Returns: Returns:
Dictionary containing updated entity information Dictionary containing updated entity information
@ -3498,14 +3803,21 @@ class LightRAG:
entity_name, entity_name,
updated_data, updated_data,
allow_rename, allow_rename,
allow_merge,
self.entity_chunks,
self.relation_chunks,
) )
def edit_entity( def edit_entity(
self, entity_name: str, updated_data: dict[str, str], allow_rename: bool = True self,
entity_name: str,
updated_data: dict[str, str],
allow_rename: bool = True,
allow_merge: bool = False,
) -> dict[str, Any]: ) -> dict[str, Any]:
loop = always_get_an_event_loop() loop = always_get_an_event_loop()
return loop.run_until_complete( return loop.run_until_complete(
self.aedit_entity(entity_name, updated_data, allow_rename) self.aedit_entity(entity_name, updated_data, allow_rename, allow_merge)
) )
async def aedit_relation( async def aedit_relation(
@ -3514,6 +3826,7 @@ class LightRAG:
"""Asynchronously edit relation information. """Asynchronously edit relation information.
Updates relation (edge) information in the knowledge graph and re-embeds the relation in the vector database. Updates relation (edge) information in the knowledge graph and re-embeds the relation in the vector database.
Also synchronizes the relation_chunks_storage to track which chunks reference this relation.
Args: Args:
source_entity: Name of the source entity source_entity: Name of the source entity
@ -3532,6 +3845,7 @@ class LightRAG:
source_entity, source_entity,
target_entity, target_entity,
updated_data, updated_data,
self.relation_chunks,
) )
def edit_relation( def edit_relation(
@ -3643,6 +3957,8 @@ class LightRAG:
target_entity, target_entity,
merge_strategy, merge_strategy,
target_entity_data, target_entity_data,
self.entity_chunks,
self.relation_chunks,
) )
def merge_entities( def merge_entities(

File diff suppressed because it is too large Load diff

View file

@ -1,6 +1,8 @@
from __future__ import annotations from __future__ import annotations
import weakref import weakref
import sys
import asyncio import asyncio
import html import html
import csv import csv
@ -40,6 +42,35 @@ from lightrag.constants import (
SOURCE_IDS_LIMIT_METHOD_FIFO, SOURCE_IDS_LIMIT_METHOD_FIFO,
) )
# Precompile regex pattern for JSON sanitization (module-level, compiled once)
_SURROGATE_PATTERN = re.compile(r"[\uD800-\uDFFF\uFFFE\uFFFF]")
class SafeStreamHandler(logging.StreamHandler):
"""StreamHandler that gracefully handles closed streams during shutdown.
This handler prevents "ValueError: I/O operation on closed file" errors
that can occur when pytest or other test frameworks close stdout/stderr
before Python's logging cleanup runs.
"""
def flush(self):
"""Flush the stream, ignoring errors if the stream is closed."""
try:
super().flush()
except (ValueError, OSError):
# Stream is closed or otherwise unavailable, silently ignore
pass
def close(self):
"""Close the handler, ignoring errors if the stream is already closed."""
try:
super().close()
except (ValueError, OSError):
# Stream is closed or otherwise unavailable, silently ignore
pass
# Initialize logger with basic configuration # Initialize logger with basic configuration
logger = logging.getLogger("lightrag") logger = logging.getLogger("lightrag")
logger.propagate = False # prevent log message send to root logger logger.propagate = False # prevent log message send to root logger
@ -47,7 +78,7 @@ logger.setLevel(logging.INFO)
# Add console handler if no handlers exist # Add console handler if no handlers exist
if not logger.handlers: if not logger.handlers:
console_handler = logging.StreamHandler() console_handler = SafeStreamHandler()
console_handler.setLevel(logging.INFO) console_handler.setLevel(logging.INFO)
formatter = logging.Formatter("%(levelname)s: %(message)s") formatter = logging.Formatter("%(levelname)s: %(message)s")
console_handler.setFormatter(formatter) console_handler.setFormatter(formatter)
@ -56,8 +87,32 @@ if not logger.handlers:
# Set httpx logging level to WARNING # Set httpx logging level to WARNING
logging.getLogger("httpx").setLevel(logging.WARNING) logging.getLogger("httpx").setLevel(logging.WARNING)
# Precompile regex pattern for JSON sanitization (module-level, compiled once)
_SURROGATE_PATTERN = re.compile(r"[\uD800-\uDFFF\uFFFE\uFFFF]") def _patch_ascii_colors_console_handler() -> None:
"""Prevent ascii_colors from printing flush errors during interpreter exit."""
try:
from ascii_colors import ConsoleHandler
except ImportError:
return
if getattr(ConsoleHandler, "_lightrag_patched", False):
return
original_handle_error = ConsoleHandler.handle_error
def _safe_handle_error(self, message: str) -> None: # type: ignore[override]
exc_type, _, _ = sys.exc_info()
if exc_type in (ValueError, OSError) and "close" in message.lower():
return
original_handle_error(self, message)
ConsoleHandler.handle_error = _safe_handle_error # type: ignore[assignment]
ConsoleHandler._lightrag_patched = True # type: ignore[attr-defined]
_patch_ascii_colors_console_handler()
# Global import for pypinyin with startup-time logging # Global import for pypinyin with startup-time logging
try: try:
@ -286,8 +341,8 @@ def setup_logger(
logger_instance.handlers = [] # Clear existing handlers logger_instance.handlers = [] # Clear existing handlers
logger_instance.propagate = False logger_instance.propagate = False
# Add console handler # Add console handler with safe stream handling
console_handler = logging.StreamHandler() console_handler = SafeStreamHandler()
console_handler.setFormatter(simple_formatter) console_handler.setFormatter(simple_formatter)
console_handler.setLevel(level) console_handler.setLevel(level)
logger_instance.addHandler(console_handler) logger_instance.addHandler(console_handler)
@ -363,6 +418,7 @@ class EmbeddingFunc:
max_token_size: Optional token limit for the embedding model max_token_size: Optional token limit for the embedding model
send_dimensions: Whether to inject embedding_dim as a keyword argument send_dimensions: Whether to inject embedding_dim as a keyword argument
""" """
embedding_dim: int embedding_dim: int
func: callable func: callable
max_token_size: int | None = None # Token limit for the embedding model max_token_size: int | None = None # Token limit for the embedding model