From 7fa3cab3558ad7cbed1db09e11715428a9b878ad Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rapha=C3=ABl=20MANSUY?= Date: Thu, 4 Dec 2025 19:14:29 +0800 Subject: [PATCH] cherry-pick 162370b6 --- lightrag/api/routers/document_routes.py | 512 +++++------------- lightrag/lightrag.py | 434 ++++----------- lightrag_webui/src/api/lightrag.ts | 8 +- .../documents/DeleteDocumentsDialog.tsx | 20 +- 4 files changed, 254 insertions(+), 720 deletions(-) diff --git a/lightrag/api/routers/document_routes.py b/lightrag/api/routers/document_routes.py index 1b3066c7..848d5eb8 100644 --- a/lightrag/api/routers/document_routes.py +++ b/lightrag/api/routers/document_routes.py @@ -3,15 +3,14 @@ This module contains all document-related routes for the LightRAG API. """ import asyncio -from functools import lru_cache from lightrag.utils import logger, get_pinyin_sort_key import aiofiles import shutil import traceback +import pipmaster as pm from datetime import datetime, timezone from pathlib import Path from typing import Dict, List, Optional, Any, Literal -from io import BytesIO from fastapi import ( APIRouter, BackgroundTasks, @@ -24,33 +23,11 @@ from pydantic import BaseModel, Field, field_validator from lightrag import LightRAG from lightrag.base import DeletionResult, DocProcessingStatus, DocStatus -from lightrag.utils import ( - generate_track_id, - compute_mdhash_id, - sanitize_text_for_encoding, -) +from lightrag.utils import generate_track_id from lightrag.api.utils_api import get_combined_auth_dependency from ..config import global_args -@lru_cache(maxsize=1) -def _is_docling_available() -> bool: - """Check if docling is available (cached check). - - This function uses lru_cache to avoid repeated import attempts. - The result is cached after the first call. - - Returns: - bool: True if docling is available, False otherwise - """ - try: - import docling # noqa: F401 # type: ignore[import-not-found] - - return True - except ImportError: - return False - - # Function to format datetime to ISO format string with timezone information def format_datetime(dt: Any) -> Optional[str]: """Format datetime to ISO format string with timezone information @@ -163,7 +140,7 @@ class ReprocessResponse(BaseModel): Attributes: status: Status of the reprocessing operation message: Message describing the operation result - track_id: Always empty string. Reprocessed documents retain their original track_id. + track_id: Tracking ID for monitoring reprocessing progress """ status: Literal["reprocessing_started"] = Field( @@ -171,8 +148,7 @@ class ReprocessResponse(BaseModel): ) message: str = Field(description="Human-readable message describing the operation") track_id: str = Field( - default="", - description="Always empty string. 
Reprocessed documents retain their original track_id from initial upload.", + description="Tracking ID for monitoring reprocessing progress" ) class Config: @@ -180,29 +156,7 @@ class ReprocessResponse(BaseModel): "example": { "status": "reprocessing_started", "message": "Reprocessing of failed documents has been initiated in background", - "track_id": "", - } - } - - -class CancelPipelineResponse(BaseModel): - """Response model for pipeline cancellation operation - - Attributes: - status: Status of the cancellation request - message: Message describing the operation result - """ - - status: Literal["cancellation_requested", "not_busy"] = Field( - description="Status of the cancellation request" - ) - message: str = Field(description="Human-readable message describing the operation") - - class Config: - json_schema_extra = { - "example": { - "status": "cancellation_requested", - "message": "Pipeline cancellation has been requested. Documents will be marked as FAILED.", + "track_id": "retry_20250729_170612_def456", } } @@ -504,7 +458,7 @@ class DocsStatusesResponse(BaseModel): "id": "doc_789", "content_summary": "Document pending final indexing", "content_length": 7200, - "status": "preprocessed", + "status": "multimodal_processed", "created_at": "2025-03-31T09:30:00", "updated_at": "2025-03-31T09:35:00", "track_id": "upload_20250331_093000_xyz789", @@ -903,6 +857,7 @@ def get_unique_filename_in_enqueued(target_dir: Path, original_name: str) -> str Returns: str: Unique filename (may have numeric suffix added) """ + from pathlib import Path import time original_path = Path(original_name) @@ -925,122 +880,6 @@ def get_unique_filename_in_enqueued(target_dir: Path, original_name: str) -> str return f"{base_name}_{timestamp}{extension}" -# Document processing helper functions (synchronous) -# These functions run in thread pool via asyncio.to_thread() to avoid blocking the event loop - - -def _convert_with_docling(file_path: Path) -> str: - """Convert document using docling (synchronous). - - Args: - file_path: Path to the document file - - Returns: - str: Extracted markdown content - """ - from docling.document_converter import DocumentConverter # type: ignore - - converter = DocumentConverter() - result = converter.convert(file_path) - return result.document.export_to_markdown() - - -def _extract_pdf_pypdf(file_bytes: bytes, password: str = None) -> str: - """Extract PDF content using pypdf (synchronous). - - Args: - file_bytes: PDF file content as bytes - password: Optional password for encrypted PDFs - - Returns: - str: Extracted text content - - Raises: - Exception: If PDF is encrypted and password is incorrect or missing - """ - from pypdf import PdfReader # type: ignore - - pdf_file = BytesIO(file_bytes) - reader = PdfReader(pdf_file) - - # Check if PDF is encrypted - if reader.is_encrypted: - if not password: - raise Exception("PDF is encrypted but no password provided") - - decrypt_result = reader.decrypt(password) - if decrypt_result == 0: - raise Exception("Incorrect PDF password") - - # Extract text from all pages - content = "" - for page in reader.pages: - content += page.extract_text() + "\n" - - return content - - -def _extract_docx(file_bytes: bytes) -> str: - """Extract DOCX content (synchronous). 
- - Args: - file_bytes: DOCX file content as bytes - - Returns: - str: Extracted text content - """ - from docx import Document # type: ignore - - docx_file = BytesIO(file_bytes) - doc = Document(docx_file) - return "\n".join([paragraph.text for paragraph in doc.paragraphs]) - - -def _extract_pptx(file_bytes: bytes) -> str: - """Extract PPTX content (synchronous). - - Args: - file_bytes: PPTX file content as bytes - - Returns: - str: Extracted text content - """ - from pptx import Presentation # type: ignore - - pptx_file = BytesIO(file_bytes) - prs = Presentation(pptx_file) - content = "" - for slide in prs.slides: - for shape in slide.shapes: - if hasattr(shape, "text"): - content += shape.text + "\n" - return content - - -def _extract_xlsx(file_bytes: bytes) -> str: - """Extract XLSX content (synchronous). - - Args: - file_bytes: XLSX file content as bytes - - Returns: - str: Extracted text content - """ - from openpyxl import load_workbook # type: ignore - - xlsx_file = BytesIO(file_bytes) - wb = load_workbook(xlsx_file) - content = "" - for sheet in wb: - content += f"Sheet: {sheet.title}\n" - for row in sheet.iter_rows(values_only=True): - content += ( - "\t".join(str(cell) if cell is not None else "" for cell in row) + "\n" - ) - content += "\n" - return content - - async def pipeline_enqueue_file( rag: LightRAG, file_path: Path, track_id: str = None ) -> tuple[bool, str]: @@ -1211,28 +1050,24 @@ async def pipeline_enqueue_file( case ".pdf": try: - # Try DOCLING first if configured and available - if ( - global_args.document_loading_engine == "DOCLING" - and _is_docling_available() - ): - content = await asyncio.to_thread( - _convert_with_docling, file_path - ) + if global_args.document_loading_engine == "DOCLING": + if not pm.is_installed("docling"): # type: ignore + pm.install("docling") + from docling.document_converter import DocumentConverter # type: ignore + + converter = DocumentConverter() + result = converter.convert(file_path) + content = result.document.export_to_markdown() else: - if ( - global_args.document_loading_engine == "DOCLING" - and not _is_docling_available() - ): - logger.warning( - f"DOCLING engine configured but not available for {file_path.name}. Falling back to pypdf." - ) - # Use pypdf (non-blocking via to_thread) - content = await asyncio.to_thread( - _extract_pdf_pypdf, - file, - global_args.pdf_decrypt_password, - ) + if not pm.is_installed("pypdf2"): # type: ignore + pm.install("pypdf2") + from PyPDF2 import PdfReader # type: ignore + from io import BytesIO + + pdf_file = BytesIO(file) + reader = PdfReader(pdf_file) + for page in reader.pages: + content += page.extract_text() + "\n" except Exception as e: error_files = [ { @@ -1252,24 +1087,28 @@ async def pipeline_enqueue_file( case ".docx": try: - # Try DOCLING first if configured and available - if ( - global_args.document_loading_engine == "DOCLING" - and _is_docling_available() - ): - content = await asyncio.to_thread( - _convert_with_docling, file_path - ) + if global_args.document_loading_engine == "DOCLING": + if not pm.is_installed("docling"): # type: ignore + pm.install("docling") + from docling.document_converter import DocumentConverter # type: ignore + + converter = DocumentConverter() + result = converter.convert(file_path) + content = result.document.export_to_markdown() else: - if ( - global_args.document_loading_engine == "DOCLING" - and not _is_docling_available() - ): - logger.warning( - f"DOCLING engine configured but not available for {file_path.name}. 
Falling back to python-docx." - ) - # Use python-docx (non-blocking via to_thread) - content = await asyncio.to_thread(_extract_docx, file) + if not pm.is_installed("python-docx"): # type: ignore + try: + pm.install("python-docx") + except Exception: + pm.install("docx") + from docx import Document # type: ignore + from io import BytesIO + + docx_file = BytesIO(file) + doc = Document(docx_file) + content = "\n".join( + [paragraph.text for paragraph in doc.paragraphs] + ) except Exception as e: error_files = [ { @@ -1289,24 +1128,26 @@ async def pipeline_enqueue_file( case ".pptx": try: - # Try DOCLING first if configured and available - if ( - global_args.document_loading_engine == "DOCLING" - and _is_docling_available() - ): - content = await asyncio.to_thread( - _convert_with_docling, file_path - ) + if global_args.document_loading_engine == "DOCLING": + if not pm.is_installed("docling"): # type: ignore + pm.install("docling") + from docling.document_converter import DocumentConverter # type: ignore + + converter = DocumentConverter() + result = converter.convert(file_path) + content = result.document.export_to_markdown() else: - if ( - global_args.document_loading_engine == "DOCLING" - and not _is_docling_available() - ): - logger.warning( - f"DOCLING engine configured but not available for {file_path.name}. Falling back to python-pptx." - ) - # Use python-pptx (non-blocking via to_thread) - content = await asyncio.to_thread(_extract_pptx, file) + if not pm.is_installed("python-pptx"): # type: ignore + pm.install("pptx") + from pptx import Presentation # type: ignore + from io import BytesIO + + pptx_file = BytesIO(file) + prs = Presentation(pptx_file) + for slide in prs.slides: + for shape in slide.shapes: + if hasattr(shape, "text"): + content += shape.text + "\n" except Exception as e: error_files = [ { @@ -1326,24 +1167,33 @@ async def pipeline_enqueue_file( case ".xlsx": try: - # Try DOCLING first if configured and available - if ( - global_args.document_loading_engine == "DOCLING" - and _is_docling_available() - ): - content = await asyncio.to_thread( - _convert_with_docling, file_path - ) + if global_args.document_loading_engine == "DOCLING": + if not pm.is_installed("docling"): # type: ignore + pm.install("docling") + from docling.document_converter import DocumentConverter # type: ignore + + converter = DocumentConverter() + result = converter.convert(file_path) + content = result.document.export_to_markdown() else: - if ( - global_args.document_loading_engine == "DOCLING" - and not _is_docling_available() - ): - logger.warning( - f"DOCLING engine configured but not available for {file_path.name}. Falling back to openpyxl." 
- ) - # Use openpyxl (non-blocking via to_thread) - content = await asyncio.to_thread(_extract_xlsx, file) + if not pm.is_installed("openpyxl"): # type: ignore + pm.install("openpyxl") + from openpyxl import load_workbook # type: ignore + from io import BytesIO + + xlsx_file = BytesIO(file) + wb = load_workbook(xlsx_file) + for sheet in wb: + content += f"Sheet: {sheet.title}\n" + for row in sheet.iter_rows(values_only=True): + content += ( + "\t".join( + str(cell) if cell is not None else "" + for cell in row + ) + + "\n" + ) + content += "\n" except Exception as e: error_files = [ { @@ -1646,11 +1496,11 @@ async def background_delete_documents( """Background task to delete multiple documents""" from lightrag.kg.shared_storage import ( get_namespace_data, - get_namespace_lock, + get_pipeline_status_lock, ) pipeline_status = await get_namespace_data("pipeline_status") - pipeline_status_lock = get_namespace_lock("pipeline_status") + pipeline_status_lock = get_pipeline_status_lock() total_docs = len(doc_ids) successful_deletions = [] @@ -1684,19 +1534,7 @@ async def background_delete_documents( try: # Loop through each document ID and delete them one by one for i, doc_id in enumerate(doc_ids, 1): - # Check for cancellation at the start of each document deletion async with pipeline_status_lock: - if pipeline_status.get("cancellation_requested", False): - cancel_msg = f"Deletion cancelled by user at document {i}/{total_docs}. {len(successful_deletions)} deleted, {total_docs - i + 1} remaining." - logger.info(cancel_msg) - pipeline_status["latest_message"] = cancel_msg - pipeline_status["history_messages"].append(cancel_msg) - # Add remaining documents to failed list with cancellation reason - failed_deletions.extend( - doc_ids[i - 1 :] - ) # i-1 because enumerate starts at 1 - break # Exit the loop, remaining documents unchanged - start_msg = f"Deleting document {i}/{total_docs}: {doc_id}" logger.info(start_msg) pipeline_status["cur_batch"] = i @@ -1859,10 +1697,6 @@ async def background_delete_documents( # Final summary and check for pending requests async with pipeline_status_lock: pipeline_status["busy"] = False - pipeline_status["pending_requests"] = False # Reset pending requests flag - pipeline_status["cancellation_requested"] = ( - False # Always reset cancellation flag - ) completion_msg = f"Deletion completed: {len(successful_deletions)} successful, {len(failed_deletions)} failed" pipeline_status["latest_message"] = completion_msg pipeline_status["history_messages"].append(completion_msg) @@ -1949,14 +1783,12 @@ def create_document_routes( # Check if filename already exists in doc_status storage existing_doc_data = await rag.doc_status.get_doc_by_file_path(safe_filename) if existing_doc_data: - # Get document status and track_id from existing document + # Get document status information for error message status = existing_doc_data.get("status", "unknown") - # Use `or ""` to handle both missing key and None value (e.g., legacy rows without track_id) - existing_track_id = existing_doc_data.get("track_id") or "" return InsertResponse( status="duplicated", message=f"File '{safe_filename}' already exists in document storage (Status: {status}).", - track_id=existing_track_id, + track_id="", ) file_path = doc_manager.input_dir / safe_filename @@ -2020,30 +1852,14 @@ def create_document_routes( request.file_source ) if existing_doc_data: - # Get document status and track_id from existing document + # Get document status information for error message status = existing_doc_data.get("status", 
"unknown") - # Use `or ""` to handle both missing key and None value (e.g., legacy rows without track_id) - existing_track_id = existing_doc_data.get("track_id") or "" return InsertResponse( status="duplicated", message=f"File source '{request.file_source}' already exists in document storage (Status: {status}).", - track_id=existing_track_id, + track_id="", ) - # Check if content already exists by computing content hash (doc_id) - sanitized_text = sanitize_text_for_encoding(request.text) - content_doc_id = compute_mdhash_id(sanitized_text, prefix="doc-") - existing_doc = await rag.doc_status.get_by_id(content_doc_id) - if existing_doc: - # Content already exists, return duplicated with existing track_id - status = existing_doc.get("status", "unknown") - existing_track_id = existing_doc.get("track_id") or "" - return InsertResponse( - status="duplicated", - message=f"Identical content already exists in document storage (doc_id: {content_doc_id}, Status: {status}).", - track_id=existing_track_id, - ) - # Generate track_id for text insertion track_id = generate_track_id("insert") @@ -2102,31 +1918,14 @@ def create_document_routes( file_source ) if existing_doc_data: - # Get document status and track_id from existing document + # Get document status information for error message status = existing_doc_data.get("status", "unknown") - # Use `or ""` to handle both missing key and None value (e.g., legacy rows without track_id) - existing_track_id = existing_doc_data.get("track_id") or "" return InsertResponse( status="duplicated", message=f"File source '{file_source}' already exists in document storage (Status: {status}).", - track_id=existing_track_id, + track_id="", ) - # Check if any content already exists by computing content hash (doc_id) - for text in request.texts: - sanitized_text = sanitize_text_for_encoding(text) - content_doc_id = compute_mdhash_id(sanitized_text, prefix="doc-") - existing_doc = await rag.doc_status.get_by_id(content_doc_id) - if existing_doc: - # Content already exists, return duplicated with existing track_id - status = existing_doc.get("status", "unknown") - existing_track_id = existing_doc.get("track_id") or "" - return InsertResponse( - status="duplicated", - message=f"Identical content already exists in document storage (doc_id: {content_doc_id}, Status: {status}).", - track_id=existing_track_id, - ) - # Generate track_id for texts insertion track_id = generate_track_id("insert") @@ -2174,12 +1973,12 @@ def create_document_routes( """ from lightrag.kg.shared_storage import ( get_namespace_data, - get_namespace_lock, + get_pipeline_status_lock, ) # Get pipeline status and lock pipeline_status = await get_namespace_data("pipeline_status") - pipeline_status_lock = get_namespace_lock("pipeline_status") + pipeline_status_lock = get_pipeline_status_lock() # Check and set status with lock async with pipeline_status_lock: @@ -2370,15 +2169,13 @@ def create_document_routes( try: from lightrag.kg.shared_storage import ( get_namespace_data, - get_namespace_lock, get_all_update_flags_status, ) pipeline_status = await get_namespace_data("pipeline_status") - pipeline_status_lock = get_namespace_lock("pipeline_status") # Get update flags status for all namespaces - update_status = await get_all_update_flags_status(workspace=rag.workspace) + update_status = await get_all_update_flags_status() # Convert MutableBoolean objects to regular boolean values processed_update_status = {} @@ -2392,9 +2189,8 @@ def create_document_routes( processed_flags.append(bool(flag)) 
processed_update_status[namespace] = processed_flags - async with pipeline_status_lock: - # Convert to regular dict if it's a Manager.dict - status_dict = dict(pipeline_status) + # Convert to regular dict if it's a Manager.dict + status_dict = dict(pipeline_status) # Add processed update_status to the status dictionary status_dict["update_status"] = processed_update_status @@ -2434,7 +2230,7 @@ def create_document_routes( logger.error(traceback.format_exc()) raise HTTPException(status_code=500, detail=str(e)) - # TODO: Deprecated, use /documents/paginated instead + # TODO: Deprecated @router.get( "", response_model=DocsStatusesResponse, dependencies=[Depends(combined_auth)] ) @@ -2581,22 +2377,17 @@ def create_document_routes( doc_ids = delete_request.doc_ids try: - from lightrag.kg.shared_storage import ( - get_namespace_data, - get_namespace_lock, - ) + from lightrag.kg.shared_storage import get_namespace_data pipeline_status = await get_namespace_data("pipeline_status") - pipeline_status_lock = get_namespace_lock("pipeline_status") - # Check if pipeline is busy with proper lock - async with pipeline_status_lock: - if pipeline_status.get("busy", False): - return DeleteDocByIdResponse( - status="busy", - message="Cannot delete documents while pipeline is busy", - doc_id=", ".join(doc_ids), - ) + # Check if pipeline is busy + if pipeline_status.get("busy", False): + return DeleteDocByIdResponse( + status="busy", + message="Cannot delete documents while pipeline is busy", + doc_id=", ".join(doc_ids), + ) # Add deletion task to background tasks background_tasks.add_task( @@ -2933,27 +2724,29 @@ def create_document_routes( This is useful for recovering from server crashes, network errors, LLM service outages, or other temporary failures that caused document processing to fail. - The processing happens in the background and can be monitored by checking the - pipeline status. The reprocessed documents retain their original track_id from - initial upload, so use their original track_id to monitor progress. + The processing happens in the background and can be monitored using the + returned track_id or by checking the pipeline status. Returns: - ReprocessResponse: Response with status and message. - track_id is always empty string because reprocessed documents retain - their original track_id from initial upload. + ReprocessResponse: Response with status, message, and track_id Raises: HTTPException: If an error occurs while initiating reprocessing (500). """ try: + # Generate track_id with "retry" prefix for retry operation + track_id = generate_track_id("retry") + # Start the reprocessing in the background - # Note: Reprocessed documents retain their original track_id from initial upload background_tasks.add_task(rag.apipeline_process_enqueue_documents) - logger.info("Reprocessing of failed documents initiated") + logger.info( + f"Reprocessing of failed documents initiated with track_id: {track_id}" + ) return ReprocessResponse( status="reprocessing_started", - message="Reprocessing of failed documents has been initiated in background. 
Documents retain their original track_id.", + message="Reprocessing of failed documents has been initiated in background", + track_id=track_id, ) except Exception as e: @@ -2961,63 +2754,4 @@ def create_document_routes( logger.error(traceback.format_exc()) raise HTTPException(status_code=500, detail=str(e)) - @router.post( - "/cancel_pipeline", - response_model=CancelPipelineResponse, - dependencies=[Depends(combined_auth)], - ) - async def cancel_pipeline(): - """ - Request cancellation of the currently running pipeline. - - This endpoint sets a cancellation flag in the pipeline status. The pipeline will: - 1. Check this flag at key processing points - 2. Stop processing new documents - 3. Cancel all running document processing tasks - 4. Mark all PROCESSING documents as FAILED with reason "User cancelled" - - The cancellation is graceful and ensures data consistency. Documents that have - completed processing will remain in PROCESSED status. - - Returns: - CancelPipelineResponse: Response with status and message - - status="cancellation_requested": Cancellation flag has been set - - status="not_busy": Pipeline is not currently running - - Raises: - HTTPException: If an error occurs while setting cancellation flag (500). - """ - try: - from lightrag.kg.shared_storage import ( - get_namespace_data, - get_namespace_lock, - ) - - pipeline_status = await get_namespace_data("pipeline_status") - pipeline_status_lock = get_namespace_lock("pipeline_status") - - async with pipeline_status_lock: - if not pipeline_status.get("busy", False): - return CancelPipelineResponse( - status="not_busy", - message="Pipeline is not currently running. No cancellation needed.", - ) - - # Set cancellation flag - pipeline_status["cancellation_requested"] = True - cancel_msg = "Pipeline cancellation requested by user" - logger.info(cancel_msg) - pipeline_status["latest_message"] = cancel_msg - pipeline_status["history_messages"].append(cancel_msg) - - return CancelPipelineResponse( - status="cancellation_requested", - message="Pipeline cancellation has been requested. Documents will be marked as FAILED.", - ) - - except Exception as e: - logger.error(f"Error requesting pipeline cancellation: {str(e)}") - logger.error(traceback.format_exc()) - raise HTTPException(status_code=500, detail=str(e)) - return router diff --git a/lightrag/lightrag.py b/lightrag/lightrag.py index 0c6f59f1..46d31ca2 100644 --- a/lightrag/lightrag.py +++ b/lightrag/lightrag.py @@ -3,7 +3,6 @@ from __future__ import annotations import traceback import asyncio import configparser -import inspect import os import time import warnings @@ -13,7 +12,6 @@ from functools import partial from typing import ( Any, AsyncIterator, - Awaitable, Callable, Iterator, cast, @@ -22,10 +20,8 @@ from typing import ( Optional, List, Dict, - Union, ) from lightrag.prompt import PROMPTS -from lightrag.exceptions import PipelineCancelledException from lightrag.constants import ( DEFAULT_MAX_GLEANING, DEFAULT_FORCE_LLM_SUMMARY_ON_MERGE, @@ -90,7 +86,7 @@ from lightrag.operate import ( merge_nodes_and_edges, kg_query, naive_query, - rebuild_knowledge_from_chunks, + _rebuild_knowledge_from_chunks, ) from lightrag.constants import GRAPH_FIELD_SEP from lightrag.utils import ( @@ -246,13 +242,11 @@ class LightRAG: int, int, ], - Union[List[Dict[str, Any]], Awaitable[List[Dict[str, Any]]]], + List[Dict[str, Any]], ] = field(default_factory=lambda: chunking_by_token_size) """ Custom chunking function for splitting text into chunks before processing. 
- The function can be either synchronous or asynchronous. - The function should take the following parameters: - `tokenizer`: A Tokenizer instance to use for tokenization. @@ -262,8 +256,7 @@ class LightRAG: - `chunk_token_size`: The maximum number of tokens per chunk. - `chunk_overlap_token_size`: The number of overlapping tokens between consecutive chunks. - The function should return a list of dictionaries (or an awaitable that resolves to a list), - where each dictionary contains the following keys: + The function should return a list of dictionaries, where each dictionary contains the following keys: - `tokens`: The number of tokens in the chunk. - `content`: The text content of the chunk. @@ -276,9 +269,6 @@ class LightRAG: embedding_func: EmbeddingFunc | None = field(default=None) """Function for computing text embeddings. Must be set before use.""" - embedding_token_limit: int | None = field(default=None, init=False) - """Token limit for embedding model. Set automatically from embedding_func.max_token_size in __post_init__.""" - embedding_batch_num: int = field(default=int(os.getenv("EMBEDDING_BATCH_NUM", 10))) """Batch size for embedding computations.""" @@ -522,28 +512,12 @@ class LightRAG: logger.debug(f"LightRAG init with param:\n {_print_config}\n") # Init Embedding - # Step 1: Capture max_token_size before applying decorator (decorator strips dataclass attributes) - embedding_max_token_size = None - if self.embedding_func and hasattr(self.embedding_func, "max_token_size"): - embedding_max_token_size = self.embedding_func.max_token_size - logger.debug( - f"Captured embedding max_token_size: {embedding_max_token_size}" - ) - self.embedding_token_limit = embedding_max_token_size - - # Step 2: Apply priority wrapper decorator self.embedding_func = priority_limit_async_func_call( self.embedding_func_max_async, llm_timeout=self.default_embedding_timeout, queue_name="Embedding func", )(self.embedding_func) - # Initialize embedding_token_limit from embedding_func - if self.embedding_func and hasattr(self.embedding_func, "max_token_size"): - self.embedding_token_limit = self.embedding_func.max_token_size - else: - self.embedding_token_limit = None - # Initialize all storages self.key_string_value_json_storage_cls: type[BaseKVStorage] = ( self._get_storage_class(self.kv_storage) @@ -735,7 +709,7 @@ class LightRAG: async def check_and_migrate_data(self): """Check if data migration is needed and perform migration if necessary""" - async with get_data_init_lock(): + async with get_data_init_lock(enable_logging=True): try: # Check if migration is needed: # 1. 
chunk_entity_relation_graph has entities and relations (count > 0) @@ -1629,7 +1603,6 @@ class LightRAG: "batchs": 0, # Total number of files to be processed "cur_batch": 0, # Number of files already processed "request_pending": False, # Clear any previous request - "cancellation_requested": False, # Initialize cancellation flag "latest_message": "", } ) @@ -1646,22 +1619,6 @@ class LightRAG: try: # Process documents until no more documents or requests while True: - # Check for cancellation request at the start of main loop - async with pipeline_status_lock: - if pipeline_status.get("cancellation_requested", False): - # Clear pending request - pipeline_status["request_pending"] = False - # Celar cancellation flag - pipeline_status["cancellation_requested"] = False - - log_message = "Pipeline cancelled by user" - logger.info(log_message) - pipeline_status["latest_message"] = log_message - pipeline_status["history_messages"].append(log_message) - - # Exit directly, skipping request_pending check - return - if not to_process_docs: log_message = "All enqueued documents have been processed" logger.info(log_message) @@ -1724,25 +1681,14 @@ class LightRAG: semaphore: asyncio.Semaphore, ) -> None: """Process single document""" - # Initialize variables at the start to prevent UnboundLocalError in error handling - file_path = "unknown_source" - current_file_number = 0 file_extraction_stage_ok = False - processing_start_time = int(time.time()) - first_stage_tasks = [] - entity_relation_task = None - async with semaphore: nonlocal processed_count + current_file_number = 0 # Initialize to prevent UnboundLocalError in error handling first_stage_tasks = [] entity_relation_task = None try: - # Check for cancellation before starting document processing - async with pipeline_status_lock: - if pipeline_status.get("cancellation_requested", False): - raise PipelineCancelledException("User cancelled") - # Get file path from status document file_path = getattr( status_doc, "file_path", "unknown_source" @@ -1781,28 +1727,7 @@ class LightRAG: ) content = content_data["content"] - # Call chunking function, supporting both sync and async implementations - chunking_result = self.chunking_func( - self.tokenizer, - content, - split_by_character, - split_by_character_only, - self.chunk_overlap_token_size, - self.chunk_token_size, - ) - - # If result is awaitable, await to get actual result - if inspect.isawaitable(chunking_result): - chunking_result = await chunking_result - - # Validate return type - if not isinstance(chunking_result, (list, tuple)): - raise TypeError( - f"chunking_func must return a list or tuple of dicts, " - f"got {type(chunking_result)}" - ) - - # Build chunks dictionary + # Generate chunks from document chunks: dict[str, Any] = { compute_mdhash_id(dp["content"], prefix="chunk-"): { **dp, @@ -1810,7 +1735,14 @@ class LightRAG: "file_path": file_path, # Add file path to each chunk "llm_cache_list": [], # Initialize empty LLM cache list for each chunk } - for dp in chunking_result + for dp in self.chunking_func( + self.tokenizer, + content, + split_by_character, + split_by_character_only, + self.chunk_overlap_token_size, + self.chunk_token_size, + ) } if not chunks: @@ -1819,11 +1751,6 @@ class LightRAG: # Record processing start time processing_start_time = int(time.time()) - # Check for cancellation before entity extraction - async with pipeline_status_lock: - if pipeline_status.get("cancellation_requested", False): - raise PipelineCancelledException("User cancelled") - # Process document in two 
stages # Stage 1: Process text chunks and docs (parallel execution) doc_status_task = asyncio.create_task( @@ -1874,33 +1801,20 @@ class LightRAG: chunks, pipeline_status, pipeline_status_lock ) ) - chunk_results = await entity_relation_task + await entity_relation_task file_extraction_stage_ok = True except Exception as e: - # Check if this is a user cancellation - if isinstance(e, PipelineCancelledException): - # User cancellation - log brief message only, no traceback - error_msg = f"User cancelled {current_file_number}/{total_files}: {file_path}" - logger.warning(error_msg) - async with pipeline_status_lock: - pipeline_status["latest_message"] = error_msg - pipeline_status["history_messages"].append( - error_msg - ) - else: - # Other exceptions - log with traceback - logger.error(traceback.format_exc()) - error_msg = f"Failed to extract document {current_file_number}/{total_files}: {file_path}" - logger.error(error_msg) - async with pipeline_status_lock: - pipeline_status["latest_message"] = error_msg - pipeline_status["history_messages"].append( - traceback.format_exc() - ) - pipeline_status["history_messages"].append( - error_msg - ) + # Log error and update pipeline status + logger.error(traceback.format_exc()) + error_msg = f"Failed to extract document {current_file_number}/{total_files}: {file_path}" + logger.error(error_msg) + async with pipeline_status_lock: + pipeline_status["latest_message"] = error_msg + pipeline_status["history_messages"].append( + traceback.format_exc() + ) + pipeline_status["history_messages"].append(error_msg) # Cancel tasks that are not yet completed all_tasks = first_stage_tasks + ( @@ -1910,14 +1824,9 @@ class LightRAG: if task and not task.done(): task.cancel() - # Persistent llm cache with error handling + # Persistent llm cache if self.llm_response_cache: - try: - await self.llm_response_cache.index_done_callback() - except Exception as persist_error: - logger.error( - f"Failed to persist LLM cache: {persist_error}" - ) + await self.llm_response_cache.index_done_callback() # Record processing end time for failed case processing_end_time = int(time.time()) @@ -1947,16 +1856,8 @@ class LightRAG: # Concurrency is controlled by keyed lock for individual entities and relationships if file_extraction_stage_ok: try: - # Check for cancellation before merge - async with pipeline_status_lock: - if pipeline_status.get( - "cancellation_requested", False - ): - raise PipelineCancelledException( - "User cancelled" - ) - - # Use chunk_results from entity_relation_task + # Get chunk_results from entity_relation_task + chunk_results = await entity_relation_task await merge_nodes_and_edges( chunk_results=chunk_results, # result collected from entity_relation_task knowledge_graph_inst=self.chunk_entity_relation_graph, @@ -2013,38 +1914,22 @@ class LightRAG: ) except Exception as e: - # Check if this is a user cancellation - if isinstance(e, PipelineCancelledException): - # User cancellation - log brief message only, no traceback - error_msg = f"User cancelled during merge {current_file_number}/{total_files}: {file_path}" - logger.warning(error_msg) - async with pipeline_status_lock: - pipeline_status["latest_message"] = error_msg - pipeline_status["history_messages"].append( - error_msg - ) - else: - # Other exceptions - log with traceback - logger.error(traceback.format_exc()) - error_msg = f"Merging stage failed in document {current_file_number}/{total_files}: {file_path}" - logger.error(error_msg) - async with pipeline_status_lock: - 
pipeline_status["latest_message"] = error_msg - pipeline_status["history_messages"].append( - traceback.format_exc() - ) - pipeline_status["history_messages"].append( - error_msg - ) + # Log error and update pipeline status + logger.error(traceback.format_exc()) + error_msg = f"Merging stage failed in document {current_file_number}/{total_files}: {file_path}" + logger.error(error_msg) + async with pipeline_status_lock: + pipeline_status["latest_message"] = error_msg + pipeline_status["history_messages"].append( + traceback.format_exc() + ) + pipeline_status["history_messages"].append( + error_msg + ) - # Persistent llm cache with error handling + # Persistent llm cache if self.llm_response_cache: - try: - await self.llm_response_cache.index_done_callback() - except Exception as persist_error: - logger.error( - f"Failed to persist LLM cache: {persist_error}" - ) + await self.llm_response_cache.index_done_callback() # Record processing end time for failed case processing_end_time = int(time.time()) @@ -2085,19 +1970,7 @@ class LightRAG: ) # Wait for all document processing to complete - try: - await asyncio.gather(*doc_tasks) - except PipelineCancelledException: - # Cancel all remaining tasks - for task in doc_tasks: - if not task.done(): - task.cancel() - - # Wait for all tasks to complete cancellation - await asyncio.wait(doc_tasks, return_when=asyncio.ALL_COMPLETED) - - # Exit directly (document statuses already updated in process_document) - return + await asyncio.gather(*doc_tasks) # Check if there's a pending request to process more documents (with lock) has_pending_request = False @@ -2128,14 +2001,11 @@ class LightRAG: to_process_docs.update(pending_docs) finally: - log_message = "Enqueued document processing pipeline stopped" + log_message = "Enqueued document processing pipeline stoped" logger.info(log_message) - # Always reset busy status and cancellation flag when done or if an exception occurs (with lock) + # Always reset busy status when done or if an exception occurs (with lock) async with pipeline_status_lock: pipeline_status["busy"] = False - pipeline_status["cancellation_requested"] = ( - False # Always reset cancellation flag - ) pipeline_status["latest_message"] = log_message pipeline_status["history_messages"].append(log_message) @@ -3185,9 +3055,6 @@ class LightRAG: ] if not existing_sources: - # No chunk references means this entity should be deleted - entities_to_delete.add(node_label) - entity_chunk_updates[node_label] = [] continue remaining_sources = subtract_source_ids(existing_sources, chunk_ids) @@ -3209,7 +3076,6 @@ class LightRAG: # Process relationships for edge_data in affected_edges: - # source target is not in normalize order in graph db property src = edge_data.get("source") tgt = edge_data.get("target") @@ -3246,9 +3112,6 @@ class LightRAG: ] if not existing_sources: - # No chunk references means this relationship should be deleted - relationships_to_delete.add(edge_tuple) - relation_chunk_updates[edge_tuple] = [] continue remaining_sources = subtract_source_ids(existing_sources, chunk_ids) @@ -3274,31 +3137,38 @@ class LightRAG: if entity_chunk_updates and self.entity_chunks: entity_upsert_payload = {} + entity_delete_ids: set[str] = set() for entity_name, remaining in entity_chunk_updates.items(): if not remaining: - # Empty entities are deleted alongside graph nodes later - continue - entity_upsert_payload[entity_name] = { - "chunk_ids": remaining, - "count": len(remaining), - "updated_at": current_time, - } + entity_delete_ids.add(entity_name) + 
else: + entity_upsert_payload[entity_name] = { + "chunk_ids": remaining, + "count": len(remaining), + "updated_at": current_time, + } + + if entity_delete_ids: + await self.entity_chunks.delete(list(entity_delete_ids)) if entity_upsert_payload: await self.entity_chunks.upsert(entity_upsert_payload) if relation_chunk_updates and self.relation_chunks: relation_upsert_payload = {} + relation_delete_ids: set[str] = set() for edge_tuple, remaining in relation_chunk_updates.items(): - if not remaining: - # Empty relations are deleted alongside graph edges later - continue storage_key = make_relation_chunk_key(*edge_tuple) - relation_upsert_payload[storage_key] = { - "chunk_ids": remaining, - "count": len(remaining), - "updated_at": current_time, - } + if not remaining: + relation_delete_ids.add(storage_key) + else: + relation_upsert_payload[storage_key] = { + "chunk_ids": remaining, + "count": len(remaining), + "updated_at": current_time, + } + if relation_delete_ids: + await self.relation_chunks.delete(list(relation_delete_ids)) if relation_upsert_payload: await self.relation_chunks.upsert(relation_upsert_payload) @@ -3325,10 +3195,35 @@ class LightRAG: logger.error(f"Failed to delete chunks: {e}") raise Exception(f"Failed to delete document chunks: {e}") from e - # 6. Delete relationships that have no remaining sources + # 6. Delete entities that have no remaining sources + if entities_to_delete: + try: + # Delete from vector database + entity_vdb_ids = [ + compute_mdhash_id(entity, prefix="ent-") + for entity in entities_to_delete + ] + await self.entities_vdb.delete(entity_vdb_ids) + + # Delete from graph + await self.chunk_entity_relation_graph.remove_nodes( + list(entities_to_delete) + ) + + async with pipeline_status_lock: + log_message = f"Successfully deleted {len(entities_to_delete)} entities" + logger.info(log_message) + pipeline_status["latest_message"] = log_message + pipeline_status["history_messages"].append(log_message) + + except Exception as e: + logger.error(f"Failed to delete entities: {e}") + raise Exception(f"Failed to delete entities: {e}") from e + + # 7. Delete relationships that have no remaining sources if relationships_to_delete: try: - # Delete from relation vdb + # Delete from vector database rel_ids_to_delete = [] for src, tgt in relationships_to_delete: rel_ids_to_delete.extend( @@ -3344,14 +3239,6 @@ class LightRAG: list(relationships_to_delete) ) - # Delete from relation_chunks storage - if self.relation_chunks: - relation_storage_keys = [ - make_relation_chunk_key(src, tgt) - for src, tgt in relationships_to_delete - ] - await self.relation_chunks.delete(relation_storage_keys) - async with pipeline_status_lock: log_message = f"Successfully deleted {len(relationships_to_delete)} relations" logger.info(log_message) @@ -3362,105 +3249,13 @@ class LightRAG: logger.error(f"Failed to delete relationships: {e}") raise Exception(f"Failed to delete relationships: {e}") from e - # 7. 
Delete entities that have no remaining sources - if entities_to_delete: - try: - # Batch get all edges for entities to avoid N+1 query problem - nodes_edges_dict = await self.chunk_entity_relation_graph.get_nodes_edges_batch( - list(entities_to_delete) - ) - - # Debug: Check and log all edges before deleting nodes - edges_to_delete = set() - edges_still_exist = 0 - - for entity, edges in nodes_edges_dict.items(): - if edges: - for src, tgt in edges: - # Normalize edge representation (sorted for consistency) - edge_tuple = tuple(sorted((src, tgt))) - edges_to_delete.add(edge_tuple) - - if ( - src in entities_to_delete - and tgt in entities_to_delete - ): - logger.warning( - f"Edge still exists: {src} <-> {tgt}" - ) - elif src in entities_to_delete: - logger.warning( - f"Edge still exists: {src} --> {tgt}" - ) - else: - logger.warning( - f"Edge still exists: {src} <-- {tgt}" - ) - edges_still_exist += 1 - - if edges_still_exist: - logger.warning( - f"⚠️ {edges_still_exist} entities still has edges before deletion" - ) - - # Clean residual edges from VDB and storage before deleting nodes - if edges_to_delete: - # Delete from relationships_vdb - rel_ids_to_delete = [] - for src, tgt in edges_to_delete: - rel_ids_to_delete.extend( - [ - compute_mdhash_id(src + tgt, prefix="rel-"), - compute_mdhash_id(tgt + src, prefix="rel-"), - ] - ) - await self.relationships_vdb.delete(rel_ids_to_delete) - - # Delete from relation_chunks storage - if self.relation_chunks: - relation_storage_keys = [ - make_relation_chunk_key(src, tgt) - for src, tgt in edges_to_delete - ] - await self.relation_chunks.delete(relation_storage_keys) - - logger.info( - f"Cleaned {len(edges_to_delete)} residual edges from VDB and chunk-tracking storage" - ) - - # Delete from graph (edges will be auto-deleted with nodes) - await self.chunk_entity_relation_graph.remove_nodes( - list(entities_to_delete) - ) - - # Delete from vector vdb - entity_vdb_ids = [ - compute_mdhash_id(entity, prefix="ent-") - for entity in entities_to_delete - ] - await self.entities_vdb.delete(entity_vdb_ids) - - # Delete from entity_chunks storage - if self.entity_chunks: - await self.entity_chunks.delete(list(entities_to_delete)) - - async with pipeline_status_lock: - log_message = f"Successfully deleted {len(entities_to_delete)} entities" - logger.info(log_message) - pipeline_status["latest_message"] = log_message - pipeline_status["history_messages"].append(log_message) - - except Exception as e: - logger.error(f"Failed to delete entities: {e}") - raise Exception(f"Failed to delete entities: {e}") from e - # Persist changes to graph database before releasing graph database lock await self._insert_done() # 8. 
Rebuild entities and relationships from remaining chunks if entities_to_rebuild or relationships_to_rebuild: try: - await rebuild_knowledge_from_chunks( + await _rebuild_knowledge_from_chunks( entities_to_rebuild=entities_to_rebuild, relationships_to_rebuild=relationships_to_rebuild, knowledge_graph_inst=self.chunk_entity_relation_graph, @@ -3507,12 +3302,14 @@ class LightRAG: pipeline_status["history_messages"].append(cache_log_message) log_message = cache_log_message except Exception as cache_delete_error: - log_message = f"Failed to delete LLM cache for document {doc_id}: {cache_delete_error}" - logger.error(log_message) - logger.error(traceback.format_exc()) - async with pipeline_status_lock: - pipeline_status["latest_message"] = log_message - pipeline_status["history_messages"].append(log_message) + logger.error( + "Failed to delete LLM cache for document %s: %s", + doc_id, + cache_delete_error, + ) + raise Exception( + f"Failed to delete LLM cache for document {doc_id}: {cache_delete_error}" + ) from cache_delete_error return DeletionResult( status="success", @@ -3678,22 +3475,16 @@ class LightRAG: ) async def aedit_entity( - self, - entity_name: str, - updated_data: dict[str, str], - allow_rename: bool = True, - allow_merge: bool = False, + self, entity_name: str, updated_data: dict[str, str], allow_rename: bool = True ) -> dict[str, Any]: """Asynchronously edit entity information. Updates entity information in the knowledge graph and re-embeds the entity in the vector database. - Also synchronizes entity_chunks_storage and relation_chunks_storage to track chunk references. Args: entity_name: Name of the entity to edit updated_data: Dictionary containing updated attributes, e.g. {"description": "new description", "entity_type": "new type"} allow_rename: Whether to allow entity renaming, defaults to True - allow_merge: Whether to merge into an existing entity when renaming to an existing name Returns: Dictionary containing updated entity information @@ -3707,21 +3498,14 @@ class LightRAG: entity_name, updated_data, allow_rename, - allow_merge, - self.entity_chunks, - self.relation_chunks, ) def edit_entity( - self, - entity_name: str, - updated_data: dict[str, str], - allow_rename: bool = True, - allow_merge: bool = False, + self, entity_name: str, updated_data: dict[str, str], allow_rename: bool = True ) -> dict[str, Any]: loop = always_get_an_event_loop() return loop.run_until_complete( - self.aedit_entity(entity_name, updated_data, allow_rename, allow_merge) + self.aedit_entity(entity_name, updated_data, allow_rename) ) async def aedit_relation( @@ -3730,7 +3514,6 @@ class LightRAG: """Asynchronously edit relation information. Updates relation (edge) information in the knowledge graph and re-embeds the relation in the vector database. - Also synchronizes the relation_chunks_storage to track which chunks reference this relation. 
Args: source_entity: Name of the source entity @@ -3749,7 +3532,6 @@ class LightRAG: source_entity, target_entity, updated_data, - self.relation_chunks, ) def edit_relation( @@ -3861,8 +3643,6 @@ class LightRAG: target_entity, merge_strategy, target_entity_data, - self.entity_chunks, - self.relation_chunks, ) def merge_entities( diff --git a/lightrag_webui/src/api/lightrag.ts b/lightrag_webui/src/api/lightrag.ts index 97663954..2067f6b5 100644 --- a/lightrag_webui/src/api/lightrag.ts +++ b/lightrag_webui/src/api/lightrag.ts @@ -605,9 +605,13 @@ export const clearCache = async (): Promise<{ return response.data } -export const deleteDocuments = async (docIds: string[], deleteFile: boolean = false): Promise => { +export const deleteDocuments = async ( + docIds: string[], + deleteFile: boolean = false, + deleteLLMCache: boolean = false +): Promise => { const response = await axiosInstance.delete('/documents/delete_document', { - data: { doc_ids: docIds, delete_file: deleteFile } + data: { doc_ids: docIds, delete_file: deleteFile, delete_llm_cache: deleteLLMCache } }) return response.data } diff --git a/lightrag_webui/src/components/documents/DeleteDocumentsDialog.tsx b/lightrag_webui/src/components/documents/DeleteDocumentsDialog.tsx index 65305ccb..eb5b173f 100644 --- a/lightrag_webui/src/components/documents/DeleteDocumentsDialog.tsx +++ b/lightrag_webui/src/components/documents/DeleteDocumentsDialog.tsx @@ -44,6 +44,7 @@ export default function DeleteDocumentsDialog({ selectedDocIds, onDocumentsDelet const [confirmText, setConfirmText] = useState('') const [deleteFile, setDeleteFile] = useState(false) const [isDeleting, setIsDeleting] = useState(false) + const [deleteLLMCache, setDeleteLLMCache] = useState(false) const isConfirmEnabled = confirmText.toLowerCase() === 'yes' && !isDeleting // Reset state when dialog closes @@ -51,6 +52,7 @@ export default function DeleteDocumentsDialog({ selectedDocIds, onDocumentsDelet if (!open) { setConfirmText('') setDeleteFile(false) + setDeleteLLMCache(false) setIsDeleting(false) } }, [open]) @@ -60,7 +62,7 @@ export default function DeleteDocumentsDialog({ selectedDocIds, onDocumentsDelet setIsDeleting(true) try { - const result = await deleteDocuments(selectedDocIds, deleteFile) + const result = await deleteDocuments(selectedDocIds, deleteFile, deleteLLMCache) if (result.status === 'deletion_started') { toast.success(t('documentPanel.deleteDocuments.success', { count: selectedDocIds.length })) @@ -94,7 +96,7 @@ export default function DeleteDocumentsDialog({ selectedDocIds, onDocumentsDelet } finally { setIsDeleting(false) } - }, [isConfirmEnabled, selectedDocIds, deleteFile, setOpen, t, onDocumentsDeleted]) + }, [isConfirmEnabled, selectedDocIds, deleteFile, deleteLLMCache, setOpen, t, onDocumentsDeleted]) return ( @@ -155,6 +157,20 @@ export default function DeleteDocumentsDialog({ selectedDocIds, onDocumentsDelet {t('documentPanel.deleteDocuments.deleteFileOption')} + +
+            <div className="flex items-center space-x-2">
+              <input
+                type="checkbox"
+                id="deleteLLMCache"
+                checked={deleteLLMCache}
+                onChange={(e) => setDeleteLLMCache(e.target.checked)}
+                disabled={isDeleting}
+                className="h-4 w-4 text-red-600 focus:ring-red-500 border-gray-300 rounded"
+              />
+              <label htmlFor="deleteLLMCache" className="text-sm">
+                {t('documentPanel.deleteDocuments.deleteLLMCacheOption')}
+              </label>
+            </div>
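
Note: for reference only, a minimal usage sketch (not part of the patch) of the updated deleteDocuments client helper from lightrag_webui/src/api/lightrag.ts. The '@/api/lightrag' import alias and the purgeDocuments wrapper name are assumptions for illustration; the third deleteLLMCache argument, the delete_llm_cache request field, and the 'deletion_started' status value follow the diff above.

import { deleteDocuments } from '@/api/lightrag'  // assumed import alias for the webui API module

// Hypothetical wrapper: delete documents, their uploaded files, and their cached LLM extraction results.
export async function purgeDocuments(docIds: string[]): Promise<boolean> {
  try {
    // The third argument is the new flag; it is sent as delete_llm_cache in the DELETE request body.
    const result = await deleteDocuments(docIds, true /* deleteFile */, true /* deleteLLMCache */)
    if (result.status === 'deletion_started') {
      console.log(`Background deletion started for ${docIds.length} document(s)`)
      return true
    }
    console.warn(`Deletion not started, server returned status: ${result.status}`)
    return false
  } catch (err) {
    console.error('deleteDocuments request failed', err)
    return false
  }
}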