Raphaël MANSUY 2025-12-04 19:15:04 +08:00
parent aff704e58a
commit d3d59b0dca


@@ -3,14 +3,15 @@ This module contains all document-related routes for the LightRAG API.
"""
import asyncio
from functools import lru_cache
from lightrag.utils import logger, get_pinyin_sort_key
import aiofiles
import shutil
import traceback
import pipmaster as pm
from datetime import datetime, timezone
from pathlib import Path
from typing import Dict, List, Optional, Any, Literal
from io import BytesIO
from fastapi import (
APIRouter,
BackgroundTasks,
@@ -28,6 +29,24 @@ from lightrag.api.utils_api import get_combined_auth_dependency
from ..config import global_args
@lru_cache(maxsize=1)
def _is_docling_available() -> bool:
"""Check if docling is available (cached check).
This function uses lru_cache to avoid repeated import attempts.
The result is cached after the first call.
Returns:
bool: True if docling is available, False otherwise
"""
try:
import docling # noqa: F401 # type: ignore[import-not-found]
return True
except ImportError:
return False
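# A minimal usage sketch (illustrative, not part of this commit): because of
# @lru_cache(maxsize=1), the import is attempted at most once per process:
#
#   _is_docling_available()              # first call performs the import attempt
#   _is_docling_available()              # later calls return the cached bool
#   _is_docling_available.cache_clear()  # force a re-check, e.g. after installing docling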
# Function to format datetime to ISO format string with timezone information
def format_datetime(dt: Any) -> Optional[str]:
"""Format datetime to ISO format string with timezone information
@@ -161,6 +180,28 @@ class ReprocessResponse(BaseModel):
}
class CancelPipelineResponse(BaseModel):
"""Response model for pipeline cancellation operation
Attributes:
status: Status of the cancellation request
message: Message describing the operation result
"""
status: Literal["cancellation_requested", "not_busy"] = Field(
description="Status of the cancellation request"
)
message: str = Field(description="Human-readable message describing the operation")
class Config:
json_schema_extra = {
"example": {
"status": "cancellation_requested",
"message": "Pipeline cancellation has been requested. Documents will be marked as FAILED.",
}
}
class InsertTextRequest(BaseModel):
"""Request model for inserting a single text document
@@ -458,7 +499,7 @@ class DocsStatusesResponse(BaseModel):
"id": "doc_789",
"content_summary": "Document pending final indexing",
"content_length": 7200,
"status": "multimodal_processed",
"status": "preprocessed",
"created_at": "2025-03-31T09:30:00",
"updated_at": "2025-03-31T09:35:00",
"track_id": "upload_20250331_093000_xyz789",
@@ -857,7 +898,6 @@ def get_unique_filename_in_enqueued(target_dir: Path, original_name: str) -> str
Returns:
str: Unique filename (may have numeric suffix added)
"""
from pathlib import Path
import time
original_path = Path(original_name)
@@ -880,6 +920,161 @@ def get_unique_filename_in_enqueued(target_dir: Path, original_name: str) -> str
return f"{base_name}_{timestamp}{extension}"
# Document processing helper functions (synchronous)
# These functions run in thread pool via asyncio.to_thread() to avoid blocking the event loop
def _convert_with_docling(file_path: Path) -> str:
"""Convert document using docling (synchronous).
Args:
file_path: Path to the document file
Returns:
str: Extracted markdown content
"""
from docling.document_converter import DocumentConverter # type: ignore
converter = DocumentConverter()
result = converter.convert(file_path)
return result.document.export_to_markdown()
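# Usage sketch: docling conversion is CPU-bound, so callers in this module run it
# in a worker thread to keep the event loop responsive (see pipeline_enqueue_file):
#
#   content = await asyncio.to_thread(_convert_with_docling, file_path)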
def _extract_pdf_pypdf(file_bytes: bytes, password: Optional[str] = None) -> str:
"""Extract PDF content using pypdf (synchronous).
Args:
file_bytes: PDF file content as bytes
password: Optional password for encrypted PDFs
Returns:
str: Extracted text content
Raises:
Exception: If PDF is encrypted and password is incorrect or missing
"""
from pypdf import PdfReader # type: ignore
pdf_file = BytesIO(file_bytes)
reader = PdfReader(pdf_file)
# Check if PDF is encrypted
if reader.is_encrypted:
if not password:
raise Exception("PDF is encrypted but no password provided")
decrypt_result = reader.decrypt(password)
if decrypt_result == 0:
raise Exception("Incorrect PDF password")
# Extract text from all pages
content = ""
for page in reader.pages:
content += page.extract_text() + "\n"
return content
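# Usage sketch: run the blocking extraction off the event loop; the password is
# typically sourced from global_args.pdf_decrypt_password (see pipeline_enqueue_file):
#
#   content = await asyncio.to_thread(
#       _extract_pdf_pypdf, file_bytes, global_args.pdf_decrypt_password
#   )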
def _extract_docx(file_bytes: bytes) -> str:
"""Extract DOCX content including tables in document order (synchronous).
Args:
file_bytes: DOCX file content as bytes
Returns:
str: Extracted text content with tables in their original positions.
Tables are separated from paragraphs with blank lines for clarity.
"""
from docx import Document # type: ignore
from docx.table import Table # type: ignore
from docx.text.paragraph import Paragraph # type: ignore
docx_file = BytesIO(file_bytes)
doc = Document(docx_file)
content_parts = []
in_table = False # Track if we're currently processing a table
# Iterate through all body elements in document order
for element in doc.element.body:
# Check if element is a paragraph
if element.tag.endswith("p"):
# If coming out of a table, add blank line after table
if in_table:
content_parts.append("") # Blank line after table
in_table = False
paragraph = Paragraph(element, doc)
text = paragraph.text.strip()
# Always append to preserve document spacing (including blank paragraphs)
content_parts.append(text)
# Check if element is a table
elif element.tag.endswith("tbl"):
# Add blank line before table (if content exists)
if content_parts and not in_table:
content_parts.append("") # Blank line before table
in_table = True
table = Table(element, doc)
for row in table.rows:
row_text = []
for cell in row.cells:
cell_text = cell.text.strip()
# Always append cell text to preserve column structure
row_text.append(cell_text)
# Only add row if at least one cell has content
if any(cell for cell in row_text):
content_parts.append("\t".join(row_text))
return "\n".join(content_parts)
def _extract_pptx(file_bytes: bytes) -> str:
"""Extract PPTX content (synchronous).
Args:
file_bytes: PPTX file content as bytes
Returns:
str: Extracted text content
"""
from pptx import Presentation # type: ignore
pptx_file = BytesIO(file_bytes)
prs = Presentation(pptx_file)
content = ""
for slide in prs.slides:
for shape in slide.shapes:
if hasattr(shape, "text"):
content += shape.text + "\n"
return content
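# Note: only shapes exposing a `text` attribute (text boxes, placeholders, etc.)
# are captured; pictures, charts, and other shapes fail the hasattr check and are skipped.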
def _extract_xlsx(file_bytes: bytes) -> str:
"""Extract XLSX content (synchronous).
Args:
file_bytes: XLSX file content as bytes
Returns:
str: Extracted text content
"""
from openpyxl import load_workbook # type: ignore
xlsx_file = BytesIO(file_bytes)
wb = load_workbook(xlsx_file)
content = ""
for sheet in wb:
content += f"Sheet: {sheet.title}\n"
for row in sheet.iter_rows(values_only=True):
content += (
"\t".join(str(cell) if cell is not None else "" for cell in row) + "\n"
)
content += "\n"
return content
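# Illustrative output shape (an assumed workbook with one sheet "Sales" and two rows):
#
#   "Sheet: Sales\nregion\tamount\nEMEA\t100\n\n"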
async def pipeline_enqueue_file(
rag: LightRAG, file_path: Path, track_id: Optional[str] = None
) -> tuple[bool, str]:
@@ -1050,87 +1245,28 @@ async def pipeline_enqueue_file(
case ".pdf":
try:
if global_args.document_loading_engine == "DOCLING":
if not pm.is_installed("docling"): # type: ignore
pm.install("docling")
from docling.document_converter import DocumentConverter # type: ignore
converter = DocumentConverter()
result = converter.convert(file_path)
content = result.document.export_to_markdown()
# Try DOCLING first if configured and available
if (
global_args.document_loading_engine == "DOCLING"
and _is_docling_available()
):
content = await asyncio.to_thread(
_convert_with_docling, file_path
)
else:
if not pm.is_installed("pypdf"): # type: ignore
pm.install("pypdf")
if not pm.is_installed("pycryptodome"): # type: ignore
pm.install("pycryptodome")
from pypdf import PdfReader # type: ignore
from io import BytesIO
pdf_file = BytesIO(file)
reader = PdfReader(pdf_file)
# Check if PDF is encrypted
if reader.is_encrypted:
pdf_password = global_args.pdf_decrypt_password
if not pdf_password:
# PDF is encrypted but no password provided
error_files = [
{
"file_path": str(file_path.name),
"error_description": "[File Extraction]PDF is encrypted but no password provided",
"original_error": "Please set PDF_DECRYPT_PASSWORD environment variable to decrypt this PDF file",
"file_size": file_size,
}
]
await rag.apipeline_enqueue_error_documents(
error_files, track_id
)
logger.error(
f"[File Extraction]PDF is encrypted but no password provided: {file_path.name}"
)
return False, track_id
# Try to decrypt with password
try:
decrypt_result = reader.decrypt(pdf_password)
if decrypt_result == 0:
# Password is incorrect
error_files = [
{
"file_path": str(file_path.name),
"error_description": "[File Extraction]Failed to decrypt PDF - incorrect password",
"original_error": "The provided PDF_DECRYPT_PASSWORD is incorrect for this file",
"file_size": file_size,
}
]
await rag.apipeline_enqueue_error_documents(
error_files, track_id
)
logger.error(
f"[File Extraction]Incorrect PDF password: {file_path.name}"
)
return False, track_id
except Exception as decrypt_error:
# Decryption process error
error_files = [
{
"file_path": str(file_path.name),
"error_description": "[File Extraction]PDF decryption failed",
"original_error": f"Error during PDF decryption: {str(decrypt_error)}",
"file_size": file_size,
}
]
await rag.apipeline_enqueue_error_documents(
error_files, track_id
)
logger.error(
f"[File Extraction]PDF decryption error for {file_path.name}: {str(decrypt_error)}"
)
return False, track_id
# Extract text from PDF (encrypted PDFs are now decrypted, unencrypted PDFs proceed directly)
for page in reader.pages:
content += page.extract_text() + "\n"
if (
global_args.document_loading_engine == "DOCLING"
and not _is_docling_available()
):
logger.warning(
f"DOCLING engine configured but not available for {file_path.name}. Falling back to pypdf."
)
# Use pypdf (non-blocking via to_thread)
content = await asyncio.to_thread(
_extract_pdf_pypdf,
file,
global_args.pdf_decrypt_password,
)
except Exception as e:
error_files = [
{
@@ -1150,28 +1286,24 @@ async def pipeline_enqueue_file(
case ".docx":
try:
if global_args.document_loading_engine == "DOCLING":
if not pm.is_installed("docling"): # type: ignore
pm.install("docling")
from docling.document_converter import DocumentConverter # type: ignore
converter = DocumentConverter()
result = converter.convert(file_path)
content = result.document.export_to_markdown()
else:
if not pm.is_installed("python-docx"): # type: ignore
try:
pm.install("python-docx")
except Exception:
pm.install("docx")
from docx import Document # type: ignore
from io import BytesIO
docx_file = BytesIO(file)
doc = Document(docx_file)
content = "\n".join(
[paragraph.text for paragraph in doc.paragraphs]
# Try DOCLING first if configured and available
if (
global_args.document_loading_engine == "DOCLING"
and _is_docling_available()
):
content = await asyncio.to_thread(
_convert_with_docling, file_path
)
else:
if (
global_args.document_loading_engine == "DOCLING"
and not _is_docling_available()
):
logger.warning(
f"DOCLING engine configured but not available for {file_path.name}. Falling back to python-docx."
)
# Use python-docx (non-blocking via to_thread)
content = await asyncio.to_thread(_extract_docx, file)
except Exception as e:
error_files = [
{
@@ -1191,26 +1323,24 @@ async def pipeline_enqueue_file(
case ".pptx":
try:
if global_args.document_loading_engine == "DOCLING":
if not pm.is_installed("docling"): # type: ignore
pm.install("docling")
from docling.document_converter import DocumentConverter # type: ignore
converter = DocumentConverter()
result = converter.convert(file_path)
content = result.document.export_to_markdown()
# Try DOCLING first if configured and available
if (
global_args.document_loading_engine == "DOCLING"
and _is_docling_available()
):
content = await asyncio.to_thread(
_convert_with_docling, file_path
)
else:
if not pm.is_installed("python-pptx"): # type: ignore
pm.install("pptx")
from pptx import Presentation # type: ignore
from io import BytesIO
pptx_file = BytesIO(file)
prs = Presentation(pptx_file)
for slide in prs.slides:
for shape in slide.shapes:
if hasattr(shape, "text"):
content += shape.text + "\n"
if (
global_args.document_loading_engine == "DOCLING"
and not _is_docling_available()
):
logger.warning(
f"DOCLING engine configured but not available for {file_path.name}. Falling back to python-pptx."
)
# Use python-pptx (non-blocking via to_thread)
content = await asyncio.to_thread(_extract_pptx, file)
except Exception as e:
error_files = [
{
@@ -1230,33 +1360,24 @@ async def pipeline_enqueue_file(
case ".xlsx":
try:
if global_args.document_loading_engine == "DOCLING":
if not pm.is_installed("docling"): # type: ignore
pm.install("docling")
from docling.document_converter import DocumentConverter # type: ignore
converter = DocumentConverter()
result = converter.convert(file_path)
content = result.document.export_to_markdown()
# Try DOCLING first if configured and available
if (
global_args.document_loading_engine == "DOCLING"
and _is_docling_available()
):
content = await asyncio.to_thread(
_convert_with_docling, file_path
)
else:
if not pm.is_installed("openpyxl"): # type: ignore
pm.install("openpyxl")
from openpyxl import load_workbook # type: ignore
from io import BytesIO
xlsx_file = BytesIO(file)
wb = load_workbook(xlsx_file)
for sheet in wb:
content += f"Sheet: {sheet.title}\n"
for row in sheet.iter_rows(values_only=True):
content += (
"\t".join(
str(cell) if cell is not None else ""
for cell in row
)
+ "\n"
)
content += "\n"
if (
global_args.document_loading_engine == "DOCLING"
and not _is_docling_available()
):
logger.warning(
f"DOCLING engine configured but not available for {file_path.name}. Falling back to openpyxl."
)
# Use openpyxl (non-blocking via to_thread)
content = await asyncio.to_thread(_extract_xlsx, file)
except Exception as e:
error_files = [
{
@@ -1559,11 +1680,15 @@ async def background_delete_documents(
"""Background task to delete multiple documents"""
from lightrag.kg.shared_storage import (
get_namespace_data,
get_pipeline_status_lock,
get_namespace_lock,
)
pipeline_status = await get_namespace_data("pipeline_status")
pipeline_status_lock = get_pipeline_status_lock()
pipeline_status = await get_namespace_data(
"pipeline_status", workspace=rag.workspace
)
pipeline_status_lock = get_namespace_lock(
"pipeline_status", workspace=rag.workspace
)
total_docs = len(doc_ids)
successful_deletions = []
@@ -1579,6 +1704,7 @@ async def background_delete_documents(
pipeline_status.update(
{
"busy": True,
# Job name cannot be changed; it is verified in adelete_by_doc_id()
"job_name": f"Deleting {total_docs} Documents",
"job_start": datetime.now().isoformat(),
"docs": total_docs,
@@ -1597,7 +1723,19 @@ async def background_delete_documents(
try:
# Loop through each document ID and delete them one by one
for i, doc_id in enumerate(doc_ids, 1):
# Check for cancellation at the start of each document deletion
async with pipeline_status_lock:
if pipeline_status.get("cancellation_requested", False):
cancel_msg = f"Deletion cancelled by user at document {i}/{total_docs}. {len(successful_deletions)} deleted, {total_docs - i + 1} remaining."
logger.info(cancel_msg)
pipeline_status["latest_message"] = cancel_msg
pipeline_status["history_messages"].append(cancel_msg)
# Add remaining documents to failed list with cancellation reason
failed_deletions.extend(
doc_ids[i - 1 :]
) # i-1 because enumerate starts at 1
break # Exit the loop, remaining documents unchanged
start_msg = f"Deleting document {i}/{total_docs}: {doc_id}"
logger.info(start_msg)
pipeline_status["cur_batch"] = i
@@ -1760,6 +1898,10 @@ async def background_delete_documents(
# Final summary and check for pending requests
async with pipeline_status_lock:
pipeline_status["busy"] = False
pipeline_status["pending_requests"] = False # Reset pending requests flag
pipeline_status["cancellation_requested"] = (
False # Always reset cancellation flag
)
completion_msg = f"Deletion completed: {len(successful_deletions)} successful, {len(failed_deletions)} failed"
pipeline_status["latest_message"] = completion_msg
pipeline_status["history_messages"].append(completion_msg)
@@ -2036,12 +2178,16 @@ def create_document_routes(
"""
from lightrag.kg.shared_storage import (
get_namespace_data,
get_pipeline_status_lock,
get_namespace_lock,
)
# Get pipeline status and lock
pipeline_status = await get_namespace_data("pipeline_status")
pipeline_status_lock = get_pipeline_status_lock()
pipeline_status = await get_namespace_data(
"pipeline_status", workspace=rag.workspace
)
pipeline_status_lock = get_namespace_lock(
"pipeline_status", workspace=rag.workspace
)
# Check and set status with lock
async with pipeline_status_lock:
@@ -2232,13 +2378,19 @@ def create_document_routes(
try:
from lightrag.kg.shared_storage import (
get_namespace_data,
get_namespace_lock,
get_all_update_flags_status,
)
pipeline_status = await get_namespace_data("pipeline_status")
pipeline_status = await get_namespace_data(
"pipeline_status", workspace=rag.workspace
)
pipeline_status_lock = get_namespace_lock(
"pipeline_status", workspace=rag.workspace
)
# Get update flags status for all namespaces
update_status = await get_all_update_flags_status()
update_status = await get_all_update_flags_status(workspace=rag.workspace)
# Convert MutableBoolean objects to regular boolean values
processed_update_status = {}
@@ -2252,8 +2404,9 @@ def create_document_routes(
processed_flags.append(bool(flag))
processed_update_status[namespace] = processed_flags
# Convert to regular dict if it's a Manager.dict
status_dict = dict(pipeline_status)
async with pipeline_status_lock:
# Convert to regular dict if it's a Manager.dict
status_dict = dict(pipeline_status)
# Add processed update_status to the status dictionary
status_dict["update_status"] = processed_update_status
@@ -2293,7 +2446,7 @@ def create_document_routes(
logger.error(traceback.format_exc())
raise HTTPException(status_code=500, detail=str(e))
# TODO: Deprecated
# TODO: Deprecated, use /documents/paginated instead
@router.get(
"", response_model=DocsStatusesResponse, dependencies=[Depends(combined_auth)]
)
@@ -2440,17 +2593,26 @@ def create_document_routes(
doc_ids = delete_request.doc_ids
try:
from lightrag.kg.shared_storage import get_namespace_data
from lightrag.kg.shared_storage import (
get_namespace_data,
get_namespace_lock,
)
pipeline_status = await get_namespace_data("pipeline_status")
pipeline_status = await get_namespace_data(
"pipeline_status", workspace=rag.workspace
)
pipeline_status_lock = get_namespace_lock(
"pipeline_status", workspace=rag.workspace
)
# Check if pipeline is busy
if pipeline_status.get("busy", False):
return DeleteDocByIdResponse(
status="busy",
message="Cannot delete documents while pipeline is busy",
doc_id=", ".join(doc_ids),
)
# Check if pipeline is busy with proper lock
async with pipeline_status_lock:
if pipeline_status.get("busy", False):
return DeleteDocByIdResponse(
status="busy",
message="Cannot delete documents while pipeline is busy",
doc_id=", ".join(doc_ids),
)
# Add deletion task to background tasks
background_tasks.add_task(
@@ -2817,4 +2979,67 @@ def create_document_routes(
logger.error(traceback.format_exc())
raise HTTPException(status_code=500, detail=str(e))
@router.post(
"/cancel_pipeline",
response_model=CancelPipelineResponse,
dependencies=[Depends(combined_auth)],
)
async def cancel_pipeline():
"""
Request cancellation of the currently running pipeline.
This endpoint sets a cancellation flag in the pipeline status. The pipeline will:
1. Check this flag at key processing points
2. Stop processing new documents
3. Cancel all running document processing tasks
4. Mark all PROCESSING documents as FAILED with reason "User cancelled"
The cancellation is graceful and ensures data consistency. Documents that have
completed processing will remain in PROCESSED status.
Returns:
CancelPipelineResponse: Response with status and message
- status="cancellation_requested": Cancellation flag has been set
- status="not_busy": Pipeline is not currently running
Raises:
HTTPException: If an error occurs while setting the cancellation flag (500).
"""
try:
from lightrag.kg.shared_storage import (
get_namespace_data,
get_namespace_lock,
)
pipeline_status = await get_namespace_data(
"pipeline_status", workspace=rag.workspace
)
pipeline_status_lock = get_namespace_lock(
"pipeline_status", workspace=rag.workspace
)
async with pipeline_status_lock:
if not pipeline_status.get("busy", False):
return CancelPipelineResponse(
status="not_busy",
message="Pipeline is not currently running. No cancellation needed.",
)
# Set cancellation flag
pipeline_status["cancellation_requested"] = True
cancel_msg = "Pipeline cancellation requested by user"
logger.info(cancel_msg)
pipeline_status["latest_message"] = cancel_msg
pipeline_status["history_messages"].append(cancel_msg)
return CancelPipelineResponse(
status="cancellation_requested",
message="Pipeline cancellation has been requested. Documents will be marked as FAILED.",
)
except Exception as e:
logger.error(f"Error requesting pipeline cancellation: {str(e)}")
logger.error(traceback.format_exc())
raise HTTPException(status_code=500, detail=str(e))
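# Example client call (a sketch, not part of this commit — assumes the documents
# router prefix and the server's default port; adjust host, port, and auth for
# your deployment):
#
#   import requests
#   resp = requests.post(
#       "http://localhost:9621/documents/cancel_pipeline",
#       headers={"Authorization": "Bearer <token>"},
#   )
#   print(resp.json())  # e.g. {"status": "cancellation_requested", "message": "..."}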
return router