diff --git a/lightrag/api/routers/document_routes.py b/lightrag/api/routers/document_routes.py
index fa734481..1726e197 100644
--- a/lightrag/api/routers/document_routes.py
+++ b/lightrag/api/routers/document_routes.py
@@ -3,14 +3,15 @@ This module contains all document-related routes for the LightRAG API.
 """

 import asyncio
+from functools import lru_cache
 from lightrag.utils import logger, get_pinyin_sort_key
 import aiofiles
 import shutil
 import traceback
-import pipmaster as pm
 from datetime import datetime, timezone
 from pathlib import Path
 from typing import Dict, List, Optional, Any, Literal
+from io import BytesIO
 from fastapi import (
     APIRouter,
     BackgroundTasks,
@@ -28,6 +29,24 @@ from lightrag.api.utils_api import get_combined_auth_dependency
 from ..config import global_args


+@lru_cache(maxsize=1)
+def _is_docling_available() -> bool:
+    """Check if docling is available (cached check).
+
+    This function uses lru_cache to avoid repeated import attempts.
+    The result is cached after the first call.
+
+    Returns:
+        bool: True if docling is available, False otherwise
+    """
+    try:
+        import docling  # noqa: F401  # type: ignore[import-not-found]
+
+        return True
+    except ImportError:
+        return False
+
+
 # Function to format datetime to ISO format string with timezone information
 def format_datetime(dt: Any) -> Optional[str]:
     """Format datetime to ISO format string with timezone information
@@ -161,6 +180,28 @@ class ReprocessResponse(BaseModel):
     }

+
+class CancelPipelineResponse(BaseModel):
+    """Response model for pipeline cancellation operation
+
+    Attributes:
+        status: Status of the cancellation request
+        message: Message describing the operation result
+    """
+
+    status: Literal["cancellation_requested", "not_busy"] = Field(
+        description="Status of the cancellation request"
+    )
+    message: str = Field(description="Human-readable message describing the operation")
+
+    class Config:
+        json_schema_extra = {
+            "example": {
+                "status": "cancellation_requested",
+                "message": "Pipeline cancellation has been requested. Documents will be marked as FAILED.",
+            }
+        }
+

 class InsertTextRequest(BaseModel):
     """Request model for inserting a single text document

@@ -458,7 +499,7 @@ class DocsStatusesResponse(BaseModel):
                         "id": "doc_789",
                         "content_summary": "Document pending final indexing",
                         "content_length": 7200,
-                        "status": "multimodal_processed",
+                        "status": "preprocessed",
                         "created_at": "2025-03-31T09:30:00",
                         "updated_at": "2025-03-31T09:35:00",
                         "track_id": "upload_20250331_093000_xyz789",
@@ -857,7 +898,6 @@ def get_unique_filename_in_enqueued(target_dir: Path, original_name: str) -> str
     Returns:
         str: Unique filename (may have numeric suffix added)
     """
-    from pathlib import Path
     import time

     original_path = Path(original_name)
@@ -880,6 +920,161 @@ def get_unique_filename_in_enqueued(target_dir: Path, original_name: str) -> str
     return f"{base_name}_{timestamp}{extension}"


+# Document processing helper functions (synchronous)
+# These functions run in a thread pool via asyncio.to_thread() to avoid blocking the event loop
+
+
+def _convert_with_docling(file_path: Path) -> str:
+    """Convert document using docling (synchronous).
+
+    Args:
+        file_path: Path to the document file
+
+    Returns:
+        str: Extracted markdown content
+    """
+    from docling.document_converter import DocumentConverter  # type: ignore
+
+    converter = DocumentConverter()
+    result = converter.convert(file_path)
+    return result.document.export_to_markdown()
+
+
+def _extract_pdf_pypdf(file_bytes: bytes, password: Optional[str] = None) -> str:
+    """Extract PDF content using pypdf (synchronous).
+
+    Args:
+        file_bytes: PDF file content as bytes
+        password: Optional password for encrypted PDFs
+
+    Returns:
+        str: Extracted text content
+
+    Raises:
+        Exception: If the PDF is encrypted and the password is missing or incorrect
+    """
+    from pypdf import PdfReader  # type: ignore
+
+    pdf_file = BytesIO(file_bytes)
+    reader = PdfReader(pdf_file)
+
+    # Check if PDF is encrypted
+    if reader.is_encrypted:
+        if not password:
+            raise Exception("PDF is encrypted but no password provided")
+
+        decrypt_result = reader.decrypt(password)
+        if decrypt_result == 0:
+            raise Exception("Incorrect PDF password")
+
+    # Extract text from all pages
+    content = ""
+    for page in reader.pages:
+        content += page.extract_text() + "\n"
+
+    return content
+
+
+def _extract_docx(file_bytes: bytes) -> str:
+    """Extract DOCX content including tables in document order (synchronous).
+
+    Args:
+        file_bytes: DOCX file content as bytes
+
+    Returns:
+        str: Extracted text content with tables in their original positions.
+            Tables are separated from paragraphs with blank lines for clarity.
+    """
+    from docx import Document  # type: ignore
+    from docx.table import Table  # type: ignore
+    from docx.text.paragraph import Paragraph  # type: ignore
+
+    docx_file = BytesIO(file_bytes)
+    doc = Document(docx_file)
+
+    content_parts = []
+    in_table = False  # Track if we're currently processing a table
+
+    # Iterate through all body elements in document order
+    for element in doc.element.body:
+        # Check if element is a paragraph
+        if element.tag.endswith("p"):
+            # If coming out of a table, add a blank line after it
+            if in_table:
+                content_parts.append("")  # Blank line after table
+                in_table = False
+
+            paragraph = Paragraph(element, doc)
+            text = paragraph.text.strip()
+            # Always append to preserve document spacing (including blank paragraphs)
+            content_parts.append(text)
+
+        # Check if element is a table
+        elif element.tag.endswith("tbl"):
+            # Add a blank line before the table (if content exists)
+            if content_parts and not in_table:
+                content_parts.append("")  # Blank line before table
+
+            in_table = True
+            table = Table(element, doc)
+            for row in table.rows:
+                row_text = []
+                for cell in row.cells:
+                    cell_text = cell.text.strip()
+                    # Always append cell text to preserve column structure
+                    row_text.append(cell_text)
+                # Only add the row if at least one cell has content
+                if any(cell for cell in row_text):
+                    content_parts.append("\t".join(row_text))
+
+    return "\n".join(content_parts)
+
+
+def _extract_pptx(file_bytes: bytes) -> str:
+    """Extract PPTX content (synchronous).
+
+    Args:
+        file_bytes: PPTX file content as bytes
+
+    Returns:
+        str: Extracted text content
+    """
+    from pptx import Presentation  # type: ignore
+
+    pptx_file = BytesIO(file_bytes)
+    prs = Presentation(pptx_file)
+    content = ""
+    for slide in prs.slides:
+        for shape in slide.shapes:
+            if hasattr(shape, "text"):
+                content += shape.text + "\n"
+    return content
+
+
+def _extract_xlsx(file_bytes: bytes) -> str:
+    """Extract XLSX content (synchronous).
+
+    Args:
+        file_bytes: XLSX file content as bytes
+
+    Returns:
+        str: Extracted text content
+    """
+    from openpyxl import load_workbook  # type: ignore
+
+    xlsx_file = BytesIO(file_bytes)
+    wb = load_workbook(xlsx_file)
+    content = ""
+    for sheet in wb:
+        content += f"Sheet: {sheet.title}\n"
+        for row in sheet.iter_rows(values_only=True):
+            content += (
+                "\t".join(str(cell) if cell is not None else "" for cell in row) + "\n"
+            )
+        content += "\n"
+    return content
+
+
 async def pipeline_enqueue_file(
     rag: LightRAG, file_path: Path, track_id: str = None
 ) -> tuple[bool, str]:
@@ -1050,87 +1245,28 @@ async def pipeline_enqueue_file(

             case ".pdf":
                 try:
-                    if global_args.document_loading_engine == "DOCLING":
-                        if not pm.is_installed("docling"):  # type: ignore
-                            pm.install("docling")
-                        from docling.document_converter import DocumentConverter  # type: ignore
-
-                        converter = DocumentConverter()
-                        result = converter.convert(file_path)
-                        content = result.document.export_to_markdown()
+                    # Try DOCLING first if configured and available
+                    if (
+                        global_args.document_loading_engine == "DOCLING"
+                        and _is_docling_available()
+                    ):
+                        content = await asyncio.to_thread(
+                            _convert_with_docling, file_path
+                        )
                     else:
-                        if not pm.is_installed("pypdf"):  # type: ignore
-                            pm.install("pypdf")
-                        if not pm.is_installed("pycryptodome"):  # type: ignore
-                            pm.install("pycryptodome")
-                        from pypdf import PdfReader  # type: ignore
-                        from io import BytesIO
-
-                        pdf_file = BytesIO(file)
-                        reader = PdfReader(pdf_file)
-
-                        # Check if PDF is encrypted
-                        if reader.is_encrypted:
-                            pdf_password = global_args.pdf_decrypt_password
-                            if not pdf_password:
-                                # PDF is encrypted but no password provided
-                                error_files = [
-                                    {
-                                        "file_path": str(file_path.name),
-                                        "error_description": "[File Extraction]PDF is encrypted but no password provided",
-                                        "original_error": "Please set PDF_DECRYPT_PASSWORD environment variable to decrypt this PDF file",
-                                        "file_size": file_size,
-                                    }
-                                ]
-                                await rag.apipeline_enqueue_error_documents(
-                                    error_files, track_id
-                                )
-                                logger.error(
-                                    f"[File Extraction]PDF is encrypted but no password provided: {file_path.name}"
-                                )
-                                return False, track_id
-
-                            # Try to decrypt with password
-                            try:
-                                decrypt_result = reader.decrypt(pdf_password)
-                                if decrypt_result == 0:
-                                    # Password is incorrect
-                                    error_files = [
-                                        {
-                                            "file_path": str(file_path.name),
-                                            "error_description": "[File Extraction]Failed to decrypt PDF - incorrect password",
-                                            "original_error": "The provided PDF_DECRYPT_PASSWORD is incorrect for this file",
-                                            "file_size": file_size,
-                                        }
-                                    ]
-                                    await rag.apipeline_enqueue_error_documents(
-                                        error_files, track_id
-                                    )
-                                    logger.error(
-                                        f"[File Extraction]Incorrect PDF password: {file_path.name}"
-                                    )
-                                    return False, track_id
-                            except Exception as decrypt_error:
-                                # Decryption process error
-                                error_files = [
-                                    {
-                                        "file_path": str(file_path.name),
-                                        "error_description": "[File Extraction]PDF decryption failed",
-                                        "original_error": f"Error during PDF decryption: {str(decrypt_error)}",
-                                        "file_size": file_size,
-                                    }
-                                ]
-                                await rag.apipeline_enqueue_error_documents(
-                                    error_files, track_id
-                                )
-                                logger.error(
-                                    f"[File Extraction]PDF decryption error for {file_path.name}: {str(decrypt_error)}"
-                                )
-                                return False, track_id
-
-                        # Extract text from PDF (encrypted PDFs are now decrypted, unencrypted PDFs proceed directly)
-                        for page in reader.pages:
-                            content += page.extract_text() + "\n"
+                        if (
+                            global_args.document_loading_engine == "DOCLING"
+                            and not _is_docling_available()
+                        ):
+                            logger.warning(
+                                f"DOCLING engine configured but not available for {file_path.name}. Falling back to pypdf."
+                            )
+                        # Use pypdf (non-blocking via to_thread)
+                        content = await asyncio.to_thread(
+                            _extract_pdf_pypdf,
+                            file,
+                            global_args.pdf_decrypt_password,
+                        )
                 except Exception as e:
                     error_files = [
                         {
@@ -1150,28 +1286,24 @@

             case ".docx":
                 try:
-                    if global_args.document_loading_engine == "DOCLING":
-                        if not pm.is_installed("docling"):  # type: ignore
-                            pm.install("docling")
-                        from docling.document_converter import DocumentConverter  # type: ignore
-
-                        converter = DocumentConverter()
-                        result = converter.convert(file_path)
-                        content = result.document.export_to_markdown()
-                    else:
-                        if not pm.is_installed("python-docx"):  # type: ignore
-                            try:
-                                pm.install("python-docx")
-                            except Exception:
-                                pm.install("docx")
-                        from docx import Document  # type: ignore
-                        from io import BytesIO
-
-                        docx_file = BytesIO(file)
-                        doc = Document(docx_file)
-                        content = "\n".join(
-                            [paragraph.text for paragraph in doc.paragraphs]
+                    # Try DOCLING first if configured and available
+                    if (
+                        global_args.document_loading_engine == "DOCLING"
+                        and _is_docling_available()
+                    ):
+                        content = await asyncio.to_thread(
+                            _convert_with_docling, file_path
                         )
+                    else:
+                        if (
+                            global_args.document_loading_engine == "DOCLING"
+                            and not _is_docling_available()
+                        ):
+                            logger.warning(
+                                f"DOCLING engine configured but not available for {file_path.name}. Falling back to python-docx."
+                            )
+                        # Use python-docx (non-blocking via to_thread)
+                        content = await asyncio.to_thread(_extract_docx, file)
                 except Exception as e:
                     error_files = [
                         {
@@ -1191,26 +1323,24 @@

             case ".pptx":
                 try:
-                    if global_args.document_loading_engine == "DOCLING":
-                        if not pm.is_installed("docling"):  # type: ignore
-                            pm.install("docling")
-                        from docling.document_converter import DocumentConverter  # type: ignore
-
-                        converter = DocumentConverter()
-                        result = converter.convert(file_path)
-                        content = result.document.export_to_markdown()
+                    # Try DOCLING first if configured and available
+                    if (
+                        global_args.document_loading_engine == "DOCLING"
+                        and _is_docling_available()
+                    ):
+                        content = await asyncio.to_thread(
+                            _convert_with_docling, file_path
+                        )
                     else:
-                        if not pm.is_installed("python-pptx"):  # type: ignore
-                            pm.install("pptx")
-                        from pptx import Presentation  # type: ignore
-                        from io import BytesIO
-
-                        pptx_file = BytesIO(file)
-                        prs = Presentation(pptx_file)
-                        for slide in prs.slides:
-                            for shape in slide.shapes:
-                                if hasattr(shape, "text"):
-                                    content += shape.text + "\n"
+                        if (
+                            global_args.document_loading_engine == "DOCLING"
+                            and not _is_docling_available()
+                        ):
+                            logger.warning(
+                                f"DOCLING engine configured but not available for {file_path.name}. Falling back to python-pptx."
+                            )
+                        # Use python-pptx (non-blocking via to_thread)
+                        content = await asyncio.to_thread(_extract_pptx, file)
                 except Exception as e:
                     error_files = [
                         {
@@ -1230,33 +1360,24 @@

             case ".xlsx":
                 try:
-                    if global_args.document_loading_engine == "DOCLING":
-                        if not pm.is_installed("docling"):  # type: ignore
-                            pm.install("docling")
-                        from docling.document_converter import DocumentConverter  # type: ignore
-
-                        converter = DocumentConverter()
-                        result = converter.convert(file_path)
-                        content = result.document.export_to_markdown()
+                    # Try DOCLING first if configured and available
+                    if (
+                        global_args.document_loading_engine == "DOCLING"
+                        and _is_docling_available()
+                    ):
+                        content = await asyncio.to_thread(
+                            _convert_with_docling, file_path
+                        )
                     else:
-                        if not pm.is_installed("openpyxl"):  # type: ignore
-                            pm.install("openpyxl")
-                        from openpyxl import load_workbook  # type: ignore
-                        from io import BytesIO
-
-                        xlsx_file = BytesIO(file)
-                        wb = load_workbook(xlsx_file)
-                        for sheet in wb:
-                            content += f"Sheet: {sheet.title}\n"
-                            for row in sheet.iter_rows(values_only=True):
-                                content += (
-                                    "\t".join(
-                                        str(cell) if cell is not None else ""
-                                        for cell in row
-                                    )
-                                    + "\n"
-                                )
-                            content += "\n"
+                        if (
+                            global_args.document_loading_engine == "DOCLING"
+                            and not _is_docling_available()
+                        ):
+                            logger.warning(
+                                f"DOCLING engine configured but not available for {file_path.name}. Falling back to openpyxl."
+                            )
+                        # Use openpyxl (non-blocking via to_thread)
+                        content = await asyncio.to_thread(_extract_xlsx, file)
                 except Exception as e:
                     error_files = [
                         {
@@ -1559,11 +1680,15 @@ async def background_delete_documents(
     """Background task to delete multiple documents"""
     from lightrag.kg.shared_storage import (
         get_namespace_data,
-        get_pipeline_status_lock,
+        get_namespace_lock,
     )

-    pipeline_status = await get_namespace_data("pipeline_status")
-    pipeline_status_lock = get_pipeline_status_lock()
+    pipeline_status = await get_namespace_data(
+        "pipeline_status", workspace=rag.workspace
+    )
+    pipeline_status_lock = get_namespace_lock(
+        "pipeline_status", workspace=rag.workspace
+    )

     total_docs = len(doc_ids)
     successful_deletions = []
@@ -1579,6 +1704,7 @@
         pipeline_status.update(
             {
                 "busy": True,
+                # Job name cannot be changed; it is verified in adelete_by_doc_id()
                 "job_name": f"Deleting {total_docs} Documents",
                 "job_start": datetime.now().isoformat(),
                 "docs": total_docs,
@@ -1597,7 +1723,19 @@
     try:
         # Loop through each document ID and delete them one by one
         for i, doc_id in enumerate(doc_ids, 1):
+            # Check for cancellation at the start of each document deletion
             async with pipeline_status_lock:
+                if pipeline_status.get("cancellation_requested", False):
+                    cancel_msg = f"Deletion cancelled by user at document {i}/{total_docs}. {len(successful_deletions)} deleted, {total_docs - i + 1} remaining."
+                    logger.info(cancel_msg)
+                    pipeline_status["latest_message"] = cancel_msg
+                    pipeline_status["history_messages"].append(cancel_msg)
+                    # Add remaining documents to failed list with cancellation reason
+                    failed_deletions.extend(
+                        doc_ids[i - 1 :]
+                    )  # i-1 because enumerate starts at 1
+                    break  # Exit the loop; remaining documents are left unchanged
+
                 start_msg = f"Deleting document {i}/{total_docs}: {doc_id}"
                 logger.info(start_msg)
                 pipeline_status["cur_batch"] = i
@@ -1760,6 +1898,10 @@
         # Final summary and check for pending requests
         async with pipeline_status_lock:
             pipeline_status["busy"] = False
+            pipeline_status["pending_requests"] = False  # Reset pending requests flag
+            pipeline_status["cancellation_requested"] = (
+                False  # Always reset cancellation flag
+            )
             completion_msg = f"Deletion completed: {len(successful_deletions)} successful, {len(failed_deletions)} failed"
             pipeline_status["latest_message"] = completion_msg
             pipeline_status["history_messages"].append(completion_msg)
@@ -2036,12 +2178,16 @@ def create_document_routes(
         """
         from lightrag.kg.shared_storage import (
            get_namespace_data,
-            get_pipeline_status_lock,
+            get_namespace_lock,
        )

         # Get pipeline status and lock
-        pipeline_status = await get_namespace_data("pipeline_status")
-        pipeline_status_lock = get_pipeline_status_lock()
+        pipeline_status = await get_namespace_data(
+            "pipeline_status", workspace=rag.workspace
+        )
+        pipeline_status_lock = get_namespace_lock(
+            "pipeline_status", workspace=rag.workspace
+        )

         # Check and set status with lock
         async with pipeline_status_lock:
@@ -2232,13 +2378,19 @@
         try:
             from lightrag.kg.shared_storage import (
                 get_namespace_data,
+                get_namespace_lock,
                 get_all_update_flags_status,
             )

-            pipeline_status = await get_namespace_data("pipeline_status")
+            pipeline_status = await get_namespace_data(
+                "pipeline_status", workspace=rag.workspace
+            )
+            pipeline_status_lock = get_namespace_lock(
+                "pipeline_status", workspace=rag.workspace
+            )

             # Get update flags status for all namespaces
-            update_status = await get_all_update_flags_status()
+            update_status = await get_all_update_flags_status(workspace=rag.workspace)

             # Convert MutableBoolean objects to regular boolean values
             processed_update_status = {}
@@ -2252,8 +2404,9 @@
                     processed_flags.append(bool(flag))
                 processed_update_status[namespace] = processed_flags

-            # Convert to regular dict if it's a Manager.dict
-            status_dict = dict(pipeline_status)
+            async with pipeline_status_lock:
+                # Convert to regular dict if it's a Manager.dict
+                status_dict = dict(pipeline_status)

             # Add processed update_status to the status dictionary
             status_dict["update_status"] = processed_update_status
@@ -2293,7 +2446,7 @@
             logger.error(traceback.format_exc())
             raise HTTPException(status_code=500, detail=str(e))

-    # TODO: Deprecated
+    # TODO: Deprecated, use /documents/paginated instead
     @router.get(
         "", response_model=DocsStatusesResponse, dependencies=[Depends(combined_auth)]
     )
@@ -2440,17 +2593,26 @@
         doc_ids = delete_request.doc_ids

         try:
-            from lightrag.kg.shared_storage import get_namespace_data
+            from lightrag.kg.shared_storage import (
+                get_namespace_data,
+                get_namespace_lock,
+            )

-            pipeline_status = await get_namespace_data("pipeline_status")
+            pipeline_status = await get_namespace_data(
+                "pipeline_status", workspace=rag.workspace
+            )
+            pipeline_status_lock = get_namespace_lock(
+                "pipeline_status", workspace=rag.workspace
+            )

-            # Check if pipeline is busy
-            if pipeline_status.get("busy", False):
-                return DeleteDocByIdResponse(
-                    status="busy",
-                    message="Cannot delete documents while pipeline is busy",
-                    doc_id=", ".join(doc_ids),
-                )
+            # Check if pipeline is busy, with the lock held
+            async with pipeline_status_lock:
+                if pipeline_status.get("busy", False):
+                    return DeleteDocByIdResponse(
+                        status="busy",
+                        message="Cannot delete documents while pipeline is busy",
+                        doc_id=", ".join(doc_ids),
+                    )

             # Add deletion task to background tasks
             background_tasks.add_task(
@@ -2817,4 +2979,67 @@
             logger.error(traceback.format_exc())
             raise HTTPException(status_code=500, detail=str(e))

+    @router.post(
+        "/cancel_pipeline",
+        response_model=CancelPipelineResponse,
+        dependencies=[Depends(combined_auth)],
+    )
+    async def cancel_pipeline():
+        """
+        Request cancellation of the currently running pipeline.
+
+        This endpoint sets a cancellation flag in the pipeline status. The pipeline will:
+        1. Check this flag at key processing points
+        2. Stop processing new documents
+        3. Cancel all running document processing tasks
+        4. Mark all PROCESSING documents as FAILED with reason "User cancelled"
+
+        The cancellation is graceful and ensures data consistency. Documents that have
+        already completed processing remain in PROCESSED status.
+
+        Returns:
+            CancelPipelineResponse: Response with status and message
+                - status="cancellation_requested": Cancellation flag has been set
+                - status="not_busy": Pipeline is not currently running
+
+        Raises:
+            HTTPException: If an error occurs while setting the cancellation flag (500).
+        """
+        try:
+            from lightrag.kg.shared_storage import (
+                get_namespace_data,
+                get_namespace_lock,
+            )
+
+            pipeline_status = await get_namespace_data(
+                "pipeline_status", workspace=rag.workspace
+            )
+            pipeline_status_lock = get_namespace_lock(
+                "pipeline_status", workspace=rag.workspace
+            )
+
+            async with pipeline_status_lock:
+                if not pipeline_status.get("busy", False):
+                    return CancelPipelineResponse(
+                        status="not_busy",
+                        message="Pipeline is not currently running. No cancellation needed.",
+                    )
+
+                # Set cancellation flag
+                pipeline_status["cancellation_requested"] = True
+                cancel_msg = "Pipeline cancellation requested by user"
+                logger.info(cancel_msg)
+                pipeline_status["latest_message"] = cancel_msg
+                pipeline_status["history_messages"].append(cancel_msg)

+            return CancelPipelineResponse(
+                status="cancellation_requested",
+                message="Pipeline cancellation has been requested. Documents will be marked as FAILED.",
+            )
+
+        except Exception as e:
+            logger.error(f"Error requesting pipeline cancellation: {str(e)}")
+            logger.error(traceback.format_exc())
+            raise HTTPException(status_code=500, detail=str(e))
+
     return router
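
Reviewer note, not part of the patch: a minimal client-side sketch of the new cancellation flow. The endpoint only sets a flag; the pipeline acts on it at its next checkpoint, so a client should poll the pipeline status to observe completion. The sketch assumes the documents router keeps its upstream /documents prefix, that the server listens on http://localhost:9621, and that API-key auth (when enabled) uses the X-API-Key header; all three values are deployment-specific assumptions, not taken from this diff.

    import time

    import httpx

    BASE_URL = "http://localhost:9621"  # assumed server address
    HEADERS = {"X-API-Key": "your-api-key"}  # only needed when auth is enabled

    # Ask the server to cancel the running pipeline; the response is immediate.
    resp = httpx.post(f"{BASE_URL}/documents/cancel_pipeline", headers=HEADERS)
    resp.raise_for_status()
    body = resp.json()
    print(body["status"], "-", body["message"])

    # Cancellation is asynchronous: poll the pipeline status until the
    # pipeline reports that it is no longer busy.
    if body["status"] == "cancellation_requested":
        while True:
            status = httpx.get(
                f"{BASE_URL}/documents/pipeline_status", headers=HEADERS
            ).json()
            if not status.get("busy", False):
                break
            time.sleep(1)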