diff --git a/lightrag/api/routers/document_routes.py b/lightrag/api/routers/document_routes.py
index 14e03f5f..7e44b57d 100644
--- a/lightrag/api/routers/document_routes.py
+++ b/lightrag/api/routers/document_routes.py
@@ -3,15 +3,14 @@ This module contains all document-related routes for the LightRAG API.
 """

 import asyncio
-from functools import lru_cache
 from lightrag.utils import logger, get_pinyin_sort_key
 import aiofiles
 import shutil
 import traceback
+import pipmaster as pm
 from datetime import datetime, timezone
 from pathlib import Path
 from typing import Dict, List, Optional, Any, Literal
-from io import BytesIO
 from fastapi import (
     APIRouter,
     BackgroundTasks,
@@ -29,24 +28,6 @@ from lightrag.api.utils_api import get_combined_auth_dependency

 from ..config import global_args

-@lru_cache(maxsize=1)
-def _is_docling_available() -> bool:
-    """Check if docling is available (cached check).
-
-    This function uses lru_cache to avoid repeated import attempts.
-    The result is cached after the first call.
-
-    Returns:
-        bool: True if docling is available, False otherwise
-    """
-    try:
-        import docling  # noqa: F401  # type: ignore[import-not-found]
-
-        return True
-    except ImportError:
-        return False
-
-
 # Function to format datetime to ISO format string with timezone information
 def format_datetime(dt: Any) -> Optional[str]:
     """Format datetime to ISO format string with timezone information
@@ -180,28 +161,6 @@ class ReprocessResponse(BaseModel):
     }


-class CancelPipelineResponse(BaseModel):
-    """Response model for pipeline cancellation operation
-
-    Attributes:
-        status: Status of the cancellation request
-        message: Message describing the operation result
-    """
-
-    status: Literal["cancellation_requested", "not_busy"] = Field(
-        description="Status of the cancellation request"
-    )
-    message: str = Field(description="Human-readable message describing the operation")
-
-    class Config:
-        json_schema_extra = {
-            "example": {
-                "status": "cancellation_requested",
-                "message": "Pipeline cancellation has been requested.
Documents will be marked as FAILED.", - } - } - - class InsertTextRequest(BaseModel): """Request model for inserting a single text document @@ -377,10 +336,6 @@ class DeleteDocRequest(BaseModel): default=False, description="Whether to delete the corresponding file in the upload directory.", ) - delete_llm_cache: bool = Field( - default=False, - description="Whether to delete cached LLM extraction results for the documents.", - ) @field_validator("doc_ids", mode="after") @classmethod @@ -451,7 +406,7 @@ class DocStatusResponse(BaseModel): "id": "doc_123456", "content_summary": "Research paper on machine learning", "content_length": 15240, - "status": "processed", + "status": "PROCESSED", "created_at": "2025-03-31T12:34:56", "updated_at": "2025-03-31T12:35:30", "track_id": "upload_20250729_170612_abc123", @@ -484,7 +439,7 @@ class DocsStatusesResponse(BaseModel): "id": "doc_123", "content_summary": "Pending document", "content_length": 5000, - "status": "pending", + "status": "PENDING", "created_at": "2025-03-31T10:00:00", "updated_at": "2025-03-31T10:00:00", "track_id": "upload_20250331_100000_abc123", @@ -494,27 +449,12 @@ class DocsStatusesResponse(BaseModel): "file_path": "pending_doc.pdf", } ], - "PREPROCESSED": [ - { - "id": "doc_789", - "content_summary": "Document pending final indexing", - "content_length": 7200, - "status": "preprocessed", - "created_at": "2025-03-31T09:30:00", - "updated_at": "2025-03-31T09:35:00", - "track_id": "upload_20250331_093000_xyz789", - "chunks_count": 10, - "error": None, - "metadata": None, - "file_path": "preprocessed_doc.pdf", - } - ], "PROCESSED": [ { "id": "doc_456", "content_summary": "Processed document", "content_length": 8000, - "status": "processed", + "status": "PROCESSED", "created_at": "2025-03-31T09:00:00", "updated_at": "2025-03-31T09:05:00", "track_id": "insert_20250331_090000_def456", @@ -686,7 +626,6 @@ class PaginatedDocsResponse(BaseModel): "status_counts": { "PENDING": 10, "PROCESSING": 5, - "PREPROCESSED": 5, "PROCESSED": 130, "FAILED": 5, }, @@ -709,7 +648,6 @@ class StatusCountsResponse(BaseModel): "status_counts": { "PENDING": 10, "PROCESSING": 5, - "PREPROCESSED": 5, "PROCESSED": 130, "FAILED": 5, } @@ -898,6 +836,7 @@ def get_unique_filename_in_enqueued(target_dir: Path, original_name: str) -> str Returns: str: Unique filename (may have numeric suffix added) """ + from pathlib import Path import time original_path = Path(original_name) @@ -920,258 +859,6 @@ def get_unique_filename_in_enqueued(target_dir: Path, original_name: str) -> str return f"{base_name}_{timestamp}{extension}" -# Document processing helper functions (synchronous) -# These functions run in thread pool via asyncio.to_thread() to avoid blocking the event loop - - -def _convert_with_docling(file_path: Path) -> str: - """Convert document using docling (synchronous). - - Args: - file_path: Path to the document file - - Returns: - str: Extracted markdown content - """ - from docling.document_converter import DocumentConverter # type: ignore - - converter = DocumentConverter() - result = converter.convert(file_path) - return result.document.export_to_markdown() - - -def _extract_pdf_pypdf(file_bytes: bytes, password: str = None) -> str: - """Extract PDF content using pypdf (synchronous). 
- - Args: - file_bytes: PDF file content as bytes - password: Optional password for encrypted PDFs - - Returns: - str: Extracted text content - - Raises: - Exception: If PDF is encrypted and password is incorrect or missing - """ - from pypdf import PdfReader # type: ignore - - pdf_file = BytesIO(file_bytes) - reader = PdfReader(pdf_file) - - # Check if PDF is encrypted - if reader.is_encrypted: - if not password: - raise Exception("PDF is encrypted but no password provided") - - decrypt_result = reader.decrypt(password) - if decrypt_result == 0: - raise Exception("Incorrect PDF password") - - # Extract text from all pages - content = "" - for page in reader.pages: - content += page.extract_text() + "\n" - - return content - - -def _extract_docx(file_bytes: bytes) -> str: - """Extract DOCX content including tables in document order (synchronous). - - Args: - file_bytes: DOCX file content as bytes - - Returns: - str: Extracted text content with tables in their original positions. - Tables are separated from paragraphs with blank lines for clarity. - """ - from docx import Document # type: ignore - from docx.table import Table # type: ignore - from docx.text.paragraph import Paragraph # type: ignore - - docx_file = BytesIO(file_bytes) - doc = Document(docx_file) - - content_parts = [] - in_table = False # Track if we're currently processing a table - - # Iterate through all body elements in document order - for element in doc.element.body: - # Check if element is a paragraph - if element.tag.endswith("p"): - # If coming out of a table, add blank line after table - if in_table: - content_parts.append("") # Blank line after table - in_table = False - - paragraph = Paragraph(element, doc) - text = paragraph.text - # Always append to preserve document spacing (including blank paragraphs) - content_parts.append(text) - - # Check if element is a table - elif element.tag.endswith("tbl"): - # Add blank line before table (if content exists) - if content_parts and not in_table: - content_parts.append("") # Blank line before table - - in_table = True - table = Table(element, doc) - for row in table.rows: - row_text = [] - for cell in row.cells: - cell_text = cell.text - # Always append cell text to preserve column structure - row_text.append(cell_text) - # Only add row if at least one cell has content - if any(cell for cell in row_text): - content_parts.append("\t".join(row_text)) - - return "\n".join(content_parts) - - -def _extract_pptx(file_bytes: bytes) -> str: - """Extract PPTX content (synchronous). - - Args: - file_bytes: PPTX file content as bytes - - Returns: - str: Extracted text content - """ - from pptx import Presentation # type: ignore - - pptx_file = BytesIO(file_bytes) - prs = Presentation(pptx_file) - content = "" - for slide in prs.slides: - for shape in slide.shapes: - if hasattr(shape, "text"): - content += shape.text + "\n" - return content - - -def _extract_xlsx(file_bytes: bytes) -> str: - """Extract XLSX content in tab-delimited format with clear sheet separation. - - This function processes Excel workbooks and converts them to a structured text format - suitable for LLM prompts and RAG systems. Each sheet is clearly delimited with - separator lines, and special characters are escaped to preserve the tab-delimited structure. 
- - Features: - - Each sheet is wrapped with '====================' separators for visual distinction - - Special characters (tabs, newlines, backslashes) are escaped to prevent structure corruption - - Column alignment is preserved across all rows to maintain tabular structure - - Empty rows are preserved as blank lines to maintain row structure - - Two-pass processing: determines max column width, then extracts with consistent alignment - - Args: - file_bytes: XLSX file content as bytes - - Returns: - str: Extracted text content with all sheets in tab-delimited format. - Format: Sheet separators, sheet name, then tab-delimited rows. - - Example output: - ==================== Sheet: Data ==================== - Name\tAge\tCity - Alice\t30\tNew York - Bob\t25\tLondon - - ==================== Sheet: Summary ==================== - Total\t2 - ==================== - """ - from openpyxl import load_workbook # type: ignore - - xlsx_file = BytesIO(file_bytes) - wb = load_workbook(xlsx_file) - - def escape_cell(cell_value: str | int | float | None) -> str: - """Escape characters that would break tab-delimited layout. - - Escape order is critical: backslashes first, then tabs/newlines. - This prevents double-escaping issues. - - Args: - cell_value: The cell value to escape (can be None, str, int, or float) - - Returns: - str: Escaped cell value safe for tab-delimited format - """ - if cell_value is None: - return "" - text = str(cell_value) - # CRITICAL: Escape backslash first to avoid double-escaping - return ( - text.replace("\\", "\\\\") # Must be first: \ -> \\ - .replace("\t", "\\t") # Tab -> \t (visible) - .replace("\r\n", "\\n") # Windows newline -> \n - .replace("\r", "\\n") # Mac newline -> \n - .replace("\n", "\\n") # Unix newline -> \n - ) - - def escape_sheet_title(title: str) -> str: - """Escape sheet title to prevent formatting issues in separators. 
- - Args: - title: Original sheet title - - Returns: - str: Sanitized sheet title with tabs/newlines replaced - """ - return str(title).replace("\n", " ").replace("\t", " ").replace("\r", " ") - - content_parts: list[str] = [] - sheet_separator = "=" * 20 - - for idx, sheet in enumerate(wb): - if idx > 0: - content_parts.append("") # Blank line between sheets for readability - - # Escape sheet title to handle edge cases with special characters - safe_title = escape_sheet_title(sheet.title) - content_parts.append(f"{sheet_separator} Sheet: {safe_title} {sheet_separator}") - - # Two-pass approach to preserve column alignment: - # Pass 1: Determine the maximum column width for this sheet - max_columns = 0 - all_rows = list(sheet.iter_rows(values_only=True)) - - for row in all_rows: - last_nonempty_idx = -1 - for idx, cell in enumerate(row): - # Check if cell has meaningful content (not None or empty string) - if cell is not None and str(cell).strip(): - last_nonempty_idx = idx - - if last_nonempty_idx >= 0: - max_columns = max(max_columns, last_nonempty_idx + 1) - - # Pass 2: Extract rows with consistent width to preserve column alignment - for row in all_rows: - row_parts = [] - - # Build row up to max_columns width - for idx in range(max_columns): - if idx < len(row): - row_parts.append(escape_cell(row[idx])) - else: - row_parts.append("") # Pad short rows - - # Check if row is completely empty - if all(part == "" for part in row_parts): - # Preserve empty rows as blank lines (maintains row structure) - content_parts.append("") - else: - # Join all columns to maintain consistent column count - content_parts.append("\t".join(row_parts)) - - # Final separator for symmetry (makes parsing easier) - content_parts.append(sheet_separator) - return "\n".join(content_parts) - - async def pipeline_enqueue_file( rag: LightRAG, file_path: Path, track_id: str = None ) -> tuple[bool, str]: @@ -1342,28 +1029,24 @@ async def pipeline_enqueue_file( case ".pdf": try: - # Try DOCLING first if configured and available - if ( - global_args.document_loading_engine == "DOCLING" - and _is_docling_available() - ): - content = await asyncio.to_thread( - _convert_with_docling, file_path - ) + if global_args.document_loading_engine == "DOCLING": + if not pm.is_installed("docling"): # type: ignore + pm.install("docling") + from docling.document_converter import DocumentConverter # type: ignore + + converter = DocumentConverter() + result = converter.convert(file_path) + content = result.document.export_to_markdown() else: - if ( - global_args.document_loading_engine == "DOCLING" - and not _is_docling_available() - ): - logger.warning( - f"DOCLING engine configured but not available for {file_path.name}. Falling back to pypdf." 
- ) - # Use pypdf (non-blocking via to_thread) - content = await asyncio.to_thread( - _extract_pdf_pypdf, - file, - global_args.pdf_decrypt_password, - ) + if not pm.is_installed("pypdf2"): # type: ignore + pm.install("pypdf2") + from PyPDF2 import PdfReader # type: ignore + from io import BytesIO + + pdf_file = BytesIO(file) + reader = PdfReader(pdf_file) + for page in reader.pages: + content += page.extract_text() + "\n" except Exception as e: error_files = [ { @@ -1383,24 +1066,28 @@ async def pipeline_enqueue_file( case ".docx": try: - # Try DOCLING first if configured and available - if ( - global_args.document_loading_engine == "DOCLING" - and _is_docling_available() - ): - content = await asyncio.to_thread( - _convert_with_docling, file_path - ) + if global_args.document_loading_engine == "DOCLING": + if not pm.is_installed("docling"): # type: ignore + pm.install("docling") + from docling.document_converter import DocumentConverter # type: ignore + + converter = DocumentConverter() + result = converter.convert(file_path) + content = result.document.export_to_markdown() else: - if ( - global_args.document_loading_engine == "DOCLING" - and not _is_docling_available() - ): - logger.warning( - f"DOCLING engine configured but not available for {file_path.name}. Falling back to python-docx." - ) - # Use python-docx (non-blocking via to_thread) - content = await asyncio.to_thread(_extract_docx, file) + if not pm.is_installed("python-docx"): # type: ignore + try: + pm.install("python-docx") + except Exception: + pm.install("docx") + from docx import Document # type: ignore + from io import BytesIO + + docx_file = BytesIO(file) + doc = Document(docx_file) + content = "\n".join( + [paragraph.text for paragraph in doc.paragraphs] + ) except Exception as e: error_files = [ { @@ -1420,24 +1107,26 @@ async def pipeline_enqueue_file( case ".pptx": try: - # Try DOCLING first if configured and available - if ( - global_args.document_loading_engine == "DOCLING" - and _is_docling_available() - ): - content = await asyncio.to_thread( - _convert_with_docling, file_path - ) + if global_args.document_loading_engine == "DOCLING": + if not pm.is_installed("docling"): # type: ignore + pm.install("docling") + from docling.document_converter import DocumentConverter # type: ignore + + converter = DocumentConverter() + result = converter.convert(file_path) + content = result.document.export_to_markdown() else: - if ( - global_args.document_loading_engine == "DOCLING" - and not _is_docling_available() - ): - logger.warning( - f"DOCLING engine configured but not available for {file_path.name}. Falling back to python-pptx." 
- ) - # Use python-pptx (non-blocking via to_thread) - content = await asyncio.to_thread(_extract_pptx, file) + if not pm.is_installed("python-pptx"): # type: ignore + pm.install("pptx") + from pptx import Presentation # type: ignore + from io import BytesIO + + pptx_file = BytesIO(file) + prs = Presentation(pptx_file) + for slide in prs.slides: + for shape in slide.shapes: + if hasattr(shape, "text"): + content += shape.text + "\n" except Exception as e: error_files = [ { @@ -1457,24 +1146,33 @@ async def pipeline_enqueue_file( case ".xlsx": try: - # Try DOCLING first if configured and available - if ( - global_args.document_loading_engine == "DOCLING" - and _is_docling_available() - ): - content = await asyncio.to_thread( - _convert_with_docling, file_path - ) + if global_args.document_loading_engine == "DOCLING": + if not pm.is_installed("docling"): # type: ignore + pm.install("docling") + from docling.document_converter import DocumentConverter # type: ignore + + converter = DocumentConverter() + result = converter.convert(file_path) + content = result.document.export_to_markdown() else: - if ( - global_args.document_loading_engine == "DOCLING" - and not _is_docling_available() - ): - logger.warning( - f"DOCLING engine configured but not available for {file_path.name}. Falling back to openpyxl." - ) - # Use openpyxl (non-blocking via to_thread) - content = await asyncio.to_thread(_extract_xlsx, file) + if not pm.is_installed("openpyxl"): # type: ignore + pm.install("openpyxl") + from openpyxl import load_workbook # type: ignore + from io import BytesIO + + xlsx_file = BytesIO(file) + wb = load_workbook(xlsx_file) + for sheet in wb: + content += f"Sheet: {sheet.title}\n" + for row in sheet.iter_rows(values_only=True): + content += ( + "\t".join( + str(cell) if cell is not None else "" + for cell in row + ) + + "\n" + ) + content += "\n" except Exception as e: error_files = [ { @@ -1772,20 +1470,15 @@ async def background_delete_documents( doc_manager: DocumentManager, doc_ids: List[str], delete_file: bool = False, - delete_llm_cache: bool = False, ): """Background task to delete multiple documents""" from lightrag.kg.shared_storage import ( get_namespace_data, - get_namespace_lock, + get_pipeline_status_lock, ) - pipeline_status = await get_namespace_data( - "pipeline_status", workspace=rag.workspace - ) - pipeline_status_lock = get_namespace_lock( - "pipeline_status", workspace=rag.workspace - ) + pipeline_status = await get_namespace_data("pipeline_status") + pipeline_status_lock = get_pipeline_status_lock() total_docs = len(doc_ids) successful_deletions = [] @@ -1801,7 +1494,6 @@ async def background_delete_documents( pipeline_status.update( { "busy": True, - # Job name can not be changed, it's verified in adelete_by_doc_id() "job_name": f"Deleting {total_docs} Documents", "job_start": datetime.now().isoformat(), "docs": total_docs, @@ -1812,27 +1504,11 @@ async def background_delete_documents( ) # Use slice assignment to clear the list in place pipeline_status["history_messages"][:] = ["Starting document deletion process"] - if delete_llm_cache: - pipeline_status["history_messages"].append( - "LLM cache cleanup requested for this deletion job" - ) try: # Loop through each document ID and delete them one by one for i, doc_id in enumerate(doc_ids, 1): - # Check for cancellation at the start of each document deletion async with pipeline_status_lock: - if pipeline_status.get("cancellation_requested", False): - cancel_msg = f"Deletion cancelled by user at document {i}/{total_docs}. 
{len(successful_deletions)} deleted, {total_docs - i + 1} remaining." - logger.info(cancel_msg) - pipeline_status["latest_message"] = cancel_msg - pipeline_status["history_messages"].append(cancel_msg) - # Add remaining documents to failed list with cancellation reason - failed_deletions.extend( - doc_ids[i - 1 :] - ) # i-1 because enumerate starts at 1 - break # Exit the loop, remaining documents unchanged - start_msg = f"Deleting document {i}/{total_docs}: {doc_id}" logger.info(start_msg) pipeline_status["cur_batch"] = i @@ -1841,9 +1517,7 @@ async def background_delete_documents( file_path = "#" try: - result = await rag.adelete_by_doc_id( - doc_id, delete_llm_cache=delete_llm_cache - ) + result = await rag.adelete_by_doc_id(doc_id) file_path = ( getattr(result, "file_path", "-") if "result" in locals() else "-" ) @@ -1995,10 +1669,6 @@ async def background_delete_documents( # Final summary and check for pending requests async with pipeline_status_lock: pipeline_status["busy"] = False - pipeline_status["pending_requests"] = False # Reset pending requests flag - pipeline_status["cancellation_requested"] = ( - False # Always reset cancellation flag - ) completion_msg = f"Deletion completed: {len(successful_deletions)} successful, {len(failed_deletions)} failed" pipeline_status["latest_message"] = completion_msg pipeline_status["history_messages"].append(completion_msg) @@ -2275,16 +1945,12 @@ def create_document_routes( """ from lightrag.kg.shared_storage import ( get_namespace_data, - get_namespace_lock, + get_pipeline_status_lock, ) # Get pipeline status and lock - pipeline_status = await get_namespace_data( - "pipeline_status", workspace=rag.workspace - ) - pipeline_status_lock = get_namespace_lock( - "pipeline_status", workspace=rag.workspace - ) + pipeline_status = await get_namespace_data("pipeline_status") + pipeline_status_lock = get_pipeline_status_lock() # Check and set status with lock async with pipeline_status_lock: @@ -2320,8 +1986,6 @@ def create_document_routes( rag.full_docs, rag.full_entities, rag.full_relations, - rag.entity_chunks, - rag.relation_chunks, rag.entities_vdb, rag.relationships_vdb, rag.chunks_vdb, @@ -2475,19 +2139,13 @@ def create_document_routes( try: from lightrag.kg.shared_storage import ( get_namespace_data, - get_namespace_lock, get_all_update_flags_status, ) - pipeline_status = await get_namespace_data( - "pipeline_status", workspace=rag.workspace - ) - pipeline_status_lock = get_namespace_lock( - "pipeline_status", workspace=rag.workspace - ) + pipeline_status = await get_namespace_data("pipeline_status") # Get update flags status for all namespaces - update_status = await get_all_update_flags_status(workspace=rag.workspace) + update_status = await get_all_update_flags_status() # Convert MutableBoolean objects to regular boolean values processed_update_status = {} @@ -2501,9 +2159,8 @@ def create_document_routes( processed_flags.append(bool(flag)) processed_update_status[namespace] = processed_flags - async with pipeline_status_lock: - # Convert to regular dict if it's a Manager.dict - status_dict = dict(pipeline_status) + # Convert to regular dict if it's a Manager.dict + status_dict = dict(pipeline_status) # Add processed update_status to the status dictionary status_dict["update_status"] = processed_update_status @@ -2543,7 +2200,7 @@ def create_document_routes( logger.error(traceback.format_exc()) raise HTTPException(status_code=500, detail=str(e)) - # TODO: Deprecated, use /documents/paginated instead + # TODO: Deprecated @router.get( "", 
response_model=DocsStatusesResponse, dependencies=[Depends(combined_auth)]
    )
@@ -2553,7 +2210,7 @@ def create_document_routes(
        To prevent excessive resource consumption, a maximum of 1,000 records is returned.

        This endpoint retrieves the current status of all documents, grouped by their
-       processing status (PENDING, PROCESSING, PREPROCESSED, PROCESSED, FAILED). The results are
+       processing status (PENDING, PROCESSING, PROCESSED, FAILED). The results are
        limited to 1000 total documents with fair distribution across all statuses.

        Returns:
@@ -2569,7 +2226,6 @@ def create_document_routes(
            statuses = (
                DocStatus.PENDING,
                DocStatus.PROCESSING,
-               DocStatus.PREPROCESSED,
                DocStatus.PROCESSED,
                DocStatus.FAILED,
            )
@@ -2668,20 +2324,21 @@ def create_document_routes(
        Delete documents and all their associated data by their IDs using background processing.

        Deletes specific documents and all their associated data, including their status,
-       text chunks, vector embeddings, and any related graph data. When requested,
-       cached LLM extraction responses are removed after graph deletion/rebuild completes.
+       text chunks, vector embeddings, and any related graph data.
        The deletion process runs in the background to avoid blocking the client connection.
+       Deletion is disabled when the LLM cache for entity extraction is disabled.

        This operation is irreversible and will interact with the pipeline status.

        Args:
-           delete_request (DeleteDocRequest): The request containing the document IDs and deletion options.
+           delete_request (DeleteDocRequest): The request containing the document IDs and delete_file options.
            background_tasks: FastAPI BackgroundTasks for async processing

        Returns:
            DeleteDocByIdResponse: The result of the deletion operation.
            - status="deletion_started": The document deletion has been initiated in the background.
            - status="busy": The pipeline is busy with another operation.
+           - status="not_allowed": Operation not allowed when LLM cache for entity extraction is disabled.

        Raises:
            HTTPException:
@@ -2689,27 +2346,27 @@ def create_document_routes(
        """
        doc_ids = delete_request.doc_ids

+       # The rag object is initialized from the server startup args,
+       # so we can access its properties here.
+       if not rag.enable_llm_cache_for_entity_extract:
+           return DeleteDocByIdResponse(
+               status="not_allowed",
+               message="Operation not allowed when LLM cache for entity extraction is disabled.",
+               doc_id=", ".join(delete_request.doc_ids),
+           )
+
        try:
-           from lightrag.kg.shared_storage import (
-               get_namespace_data,
-               get_namespace_lock,
-           )
+           from lightrag.kg.shared_storage import get_namespace_data

-           pipeline_status = await get_namespace_data(
-               "pipeline_status", workspace=rag.workspace
-           )
-           pipeline_status_lock = get_namespace_lock(
-               "pipeline_status", workspace=rag.workspace
-           )
+           pipeline_status = await get_namespace_data("pipeline_status")

-           # Check if pipeline is busy with proper lock
-           async with pipeline_status_lock:
-               if pipeline_status.get("busy", False):
-                   return DeleteDocByIdResponse(
-                       status="busy",
-                       message="Cannot delete documents while pipeline is busy",
-                       doc_id=", ".join(doc_ids),
-                   )
+           # Check if pipeline is busy
+           if pipeline_status.get("busy", False):
+               return DeleteDocByIdResponse(
+                   status="busy",
+                   message="Cannot delete documents while pipeline is busy",
+                   doc_id=", ".join(doc_ids),
+               )

            # Add deletion task to background tasks
            background_tasks.add_task(
@@ -2718,7 +2375,6 @@ def create_document_routes(
                doc_manager,
                doc_ids,
                delete_request.delete_file,
-               delete_request.delete_llm_cache,
            )

            return DeleteDocByIdResponse(
@@ -3076,67 +2732,4 @@ def create_document_routes(
            logger.error(traceback.format_exc())
            raise HTTPException(status_code=500, detail=str(e))

-   @router.post(
-       "/cancel_pipeline",
-       response_model=CancelPipelineResponse,
-       dependencies=[Depends(combined_auth)],
-   )
-   async def cancel_pipeline():
-       """
-       Request cancellation of the currently running pipeline.
-
-       This endpoint sets a cancellation flag in the pipeline status. The pipeline will:
-       1. Check this flag at key processing points
-       2. Stop processing new documents
-       3. Cancel all running document processing tasks
-       4. Mark all PROCESSING documents as FAILED with reason "User cancelled"
-
-       The cancellation is graceful and ensures data consistency. Documents that have
-       completed processing will remain in PROCESSED status.
-
-       Returns:
-           CancelPipelineResponse: Response with status and message
-           - status="cancellation_requested": Cancellation flag has been set
-           - status="not_busy": Pipeline is not currently running
-
-       Raises:
-           HTTPException: If an error occurs while setting cancellation flag (500).
-       """
-       try:
-           from lightrag.kg.shared_storage import (
-               get_namespace_data,
-               get_namespace_lock,
-           )
-
-           pipeline_status = await get_namespace_data(
-               "pipeline_status", workspace=rag.workspace
-           )
-           pipeline_status_lock = get_namespace_lock(
-               "pipeline_status", workspace=rag.workspace
-           )
-
-           async with pipeline_status_lock:
-               if not pipeline_status.get("busy", False):
-                   return CancelPipelineResponse(
-                       status="not_busy",
-                       message="Pipeline is not currently running. No cancellation needed.",
-                   )
-
-               # Set cancellation flag
-               pipeline_status["cancellation_requested"] = True
-               cancel_msg = "Pipeline cancellation requested by user"
-               logger.info(cancel_msg)
-               pipeline_status["latest_message"] = cancel_msg
-               pipeline_status["history_messages"].append(cancel_msg)
-
-               return CancelPipelineResponse(
-                   status="cancellation_requested",
-                   message="Pipeline cancellation has been requested.
Documents will be marked as FAILED.", - ) - - except Exception as e: - logger.error(f"Error requesting pipeline cancellation: {str(e)}") - logger.error(traceback.format_exc()) - raise HTTPException(status_code=500, detail=str(e)) - return router diff --git a/lightrag_webui/src/api/lightrag.ts b/lightrag_webui/src/api/lightrag.ts index 2067f6b5..514cc417 100644 --- a/lightrag_webui/src/api/lightrag.ts +++ b/lightrag_webui/src/api/lightrag.ts @@ -155,6 +155,12 @@ export type ScanResponse = { track_id: string } +export type ReprocessFailedResponse = { + status: 'reprocessing_started' + message: string + track_id: string +} + export type DeleteDocResponse = { status: 'deletion_started' | 'busy' | 'not_allowed' message: string @@ -305,6 +311,11 @@ export const scanNewDocuments = async (): Promise => { return response.data } +export const reprocessFailedDocuments = async (): Promise => { + const response = await axiosInstance.post('/documents/reprocess_failed') + return response.data +} + export const getDocumentsScanProgress = async (): Promise => { const response = await axiosInstance.get('/documents/scan-progress') return response.data diff --git a/lightrag_webui/src/features/DocumentManager.tsx b/lightrag_webui/src/features/DocumentManager.tsx index db52db47..9d38a24e 100644 --- a/lightrag_webui/src/features/DocumentManager.tsx +++ b/lightrag_webui/src/features/DocumentManager.tsx @@ -22,6 +22,7 @@ import PaginationControls from '@/components/ui/PaginationControls' import { scanNewDocuments, + reprocessFailedDocuments, getDocumentsPaginated, DocsStatusesResponse, DocStatus, @@ -913,6 +914,42 @@ export default function DocumentManager() { } }, [t, startPollingInterval, currentTab, health, statusCounts]) + const retryFailedDocuments = useCallback(async () => { + try { + // Check if component is still mounted before starting the request + if (!isMountedRef.current) return; + + const { status, message, track_id: _track_id } = await reprocessFailedDocuments(); // eslint-disable-line @typescript-eslint/no-unused-vars + + // Check again if component is still mounted after the request completes + if (!isMountedRef.current) return; + + // Note: _track_id is available for future use (e.g., progress tracking) + toast.message(message || status); + + // Reset health check timer with 1 second delay to avoid race condition + useBackendState.getState().resetHealthCheckTimerDelayed(1000); + + // Start fast refresh with 2-second interval immediately after retry + startPollingInterval(2000); + + // Set recovery timer to restore normal polling interval after 15 seconds + setTimeout(() => { + if (isMountedRef.current && currentTab === 'documents' && health) { + // Restore intelligent polling interval based on document status + const hasActiveDocuments = (statusCounts.processing || 0) > 0 || (statusCounts.pending || 0) > 0; + const normalInterval = hasActiveDocuments ? 5000 : 30000; + startPollingInterval(normalInterval); + } + }, 15000); // Restore after 15 seconds + } catch (err) { + // Only show error if component is still mounted + if (isMountedRef.current) { + toast.error(errorMessage(err)); + } + } + }, [startPollingInterval, currentTab, health, statusCounts]) + // Handle page size change - update state and save to store const handlePageSizeChange = useCallback((newPageSize: number) => { if (newPageSize === pagination.page_size) return; @@ -1289,6 +1326,16 @@ export default function DocumentManager() { > {t('documentPanel.documentManager.scanButton')} +
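
The diff replaces the pre-imported loader helpers with on-demand dependency installation in pipeline_enqueue_file. A minimal sketch of the guard pattern it restores, assuming pipmaster's is_installed()/install() behave as they are used in document_routes.py above:

import pipmaster as pm

# Guard pattern restored by the diff: install an optional loader on first
# use, then import it (here openpyxl, mirroring the .xlsx branch above).
if not pm.is_installed("openpyxl"):
    pm.install("openpyxl")

from openpyxl import load_workbook  # import only after the guard has run

Compared with the removed _is_docling_available()/lru_cache approach, missing loaders are now installed at request time rather than detected up front and worked around with a logged fallback.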
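The webui half of the diff adds a reprocessFailedDocuments() client for POST /documents/reprocess_failed. A sketch of driving that endpoint directly, assuming the server listens on the default LightRAG address (http://localhost:9621 here is an assumption) and returns the ReprocessFailedResponse shape declared in lightrag.ts:

import requests

BASE_URL = "http://localhost:9621"  # assumed default LightRAG server address

# Re-enqueue all FAILED documents, as the new toolbar button does.
resp = requests.post(f"{BASE_URL}/documents/reprocess_failed", timeout=30)
resp.raise_for_status()

payload = resp.json()
# Expected keys per ReprocessFailedResponse: status, message, track_id.
print(payload["status"], "-", payload["message"])
track_id = payload.get("track_id")  # reserved for progress tracking, as in the UI handler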
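After triggering a retry, retryFailedDocuments() polls quickly (2 s) and then restores the normal interval: 5 s while documents are PENDING or PROCESSING, 30 s otherwise. A rough script equivalent of one polling pass against the deprecated GET /documents endpoint; it assumes DocsStatusesResponse nests the per-status groups under a statuses key, which is not shown in this diff:

import requests

BASE_URL = "http://localhost:9621"  # assumed server address

resp = requests.get(f"{BASE_URL}/documents", timeout=30)
resp.raise_for_status()
statuses = resp.json().get("statuses", {})

# Mirror the UI's "active work" check: poll fast while anything is queued.
active = len(statuses.get("PENDING", [])) + len(statuses.get("PROCESSING", []))
interval_ms = 5000 if active else 30000
print(f"{active} active documents; next poll in {interval_ms} ms")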