Raphaël MANSUY 2025-12-04 19:18:14 +08:00
parent 86a6159b3f
commit 201084e05a
5 changed files with 199 additions and 574 deletions


@@ -3,15 +3,14 @@ This module contains all document-related routes for the LightRAG API.
"""
import asyncio
from functools import lru_cache
from lightrag.utils import logger, get_pinyin_sort_key
import aiofiles
import shutil
import traceback
import pipmaster as pm
from datetime import datetime, timezone
from pathlib import Path
from typing import Dict, List, Optional, Any, Literal
from io import BytesIO
from fastapi import (
APIRouter,
BackgroundTasks,
@@ -29,24 +28,6 @@ from lightrag.api.utils_api import get_combined_auth_dependency
from ..config import global_args
@lru_cache(maxsize=1)
def _is_docling_available() -> bool:
"""Check if docling is available (cached check).
This function uses lru_cache to avoid repeated import attempts.
The result is cached after the first call.
Returns:
bool: True if docling is available, False otherwise
"""
try:
import docling # noqa: F401 # type: ignore[import-not-found]
return True
except ImportError:
return False
# Function to format datetime to ISO format string with timezone information
def format_datetime(dt: Any) -> Optional[str]:
"""Format datetime to ISO format string with timezone information
@@ -180,28 +161,6 @@ class ReprocessResponse(BaseModel):
}
class CancelPipelineResponse(BaseModel):
"""Response model for pipeline cancellation operation
Attributes:
status: Status of the cancellation request
message: Message describing the operation result
"""
status: Literal["cancellation_requested", "not_busy"] = Field(
description="Status of the cancellation request"
)
message: str = Field(description="Human-readable message describing the operation")
class Config:
json_schema_extra = {
"example": {
"status": "cancellation_requested",
"message": "Pipeline cancellation has been requested. Documents will be marked as FAILED.",
}
}
class InsertTextRequest(BaseModel):
"""Request model for inserting a single text document
@@ -377,10 +336,6 @@ class DeleteDocRequest(BaseModel):
default=False,
description="Whether to delete the corresponding file in the upload directory.",
)
delete_llm_cache: bool = Field(
default=False,
description="Whether to delete cached LLM extraction results for the documents.",
)
@field_validator("doc_ids", mode="after")
@classmethod
@@ -451,7 +406,7 @@ class DocStatusResponse(BaseModel):
"id": "doc_123456",
"content_summary": "Research paper on machine learning",
"content_length": 15240,
"status": "processed",
"status": "PROCESSED",
"created_at": "2025-03-31T12:34:56",
"updated_at": "2025-03-31T12:35:30",
"track_id": "upload_20250729_170612_abc123",
@@ -484,7 +439,7 @@ class DocsStatusesResponse(BaseModel):
"id": "doc_123",
"content_summary": "Pending document",
"content_length": 5000,
"status": "pending",
"status": "PENDING",
"created_at": "2025-03-31T10:00:00",
"updated_at": "2025-03-31T10:00:00",
"track_id": "upload_20250331_100000_abc123",
@@ -494,27 +449,12 @@ class DocsStatusesResponse(BaseModel):
"file_path": "pending_doc.pdf",
}
],
"PREPROCESSED": [
{
"id": "doc_789",
"content_summary": "Document pending final indexing",
"content_length": 7200,
"status": "preprocessed",
"created_at": "2025-03-31T09:30:00",
"updated_at": "2025-03-31T09:35:00",
"track_id": "upload_20250331_093000_xyz789",
"chunks_count": 10,
"error": None,
"metadata": None,
"file_path": "preprocessed_doc.pdf",
}
],
"PROCESSED": [
{
"id": "doc_456",
"content_summary": "Processed document",
"content_length": 8000,
"status": "processed",
"status": "PROCESSED",
"created_at": "2025-03-31T09:00:00",
"updated_at": "2025-03-31T09:05:00",
"track_id": "insert_20250331_090000_def456",
@@ -686,7 +626,6 @@ class PaginatedDocsResponse(BaseModel):
"status_counts": {
"PENDING": 10,
"PROCESSING": 5,
"PREPROCESSED": 5,
"PROCESSED": 130,
"FAILED": 5,
},
@@ -709,7 +648,6 @@ class StatusCountsResponse(BaseModel):
"status_counts": {
"PENDING": 10,
"PROCESSING": 5,
"PREPROCESSED": 5,
"PROCESSED": 130,
"FAILED": 5,
}
@@ -898,6 +836,7 @@ def get_unique_filename_in_enqueued(target_dir: Path, original_name: str) -> str
Returns:
str: Unique filename (may have numeric suffix added)
"""
from pathlib import Path
import time
original_path = Path(original_name)
@@ -920,258 +859,6 @@ def get_unique_filename_in_enqueued(target_dir: Path, original_name: str) -> str
return f"{base_name}_{timestamp}{extension}"
# Document processing helper functions (synchronous)
# These functions run in thread pool via asyncio.to_thread() to avoid blocking the event loop
def _convert_with_docling(file_path: Path) -> str:
"""Convert document using docling (synchronous).
Args:
file_path: Path to the document file
Returns:
str: Extracted markdown content
"""
from docling.document_converter import DocumentConverter # type: ignore
converter = DocumentConverter()
result = converter.convert(file_path)
return result.document.export_to_markdown()
def _extract_pdf_pypdf(file_bytes: bytes, password: str = None) -> str:
"""Extract PDF content using pypdf (synchronous).
Args:
file_bytes: PDF file content as bytes
password: Optional password for encrypted PDFs
Returns:
str: Extracted text content
Raises:
Exception: If PDF is encrypted and password is incorrect or missing
"""
from pypdf import PdfReader # type: ignore
pdf_file = BytesIO(file_bytes)
reader = PdfReader(pdf_file)
# Check if PDF is encrypted
if reader.is_encrypted:
if not password:
raise Exception("PDF is encrypted but no password provided")
decrypt_result = reader.decrypt(password)
if decrypt_result == 0:
raise Exception("Incorrect PDF password")
# Extract text from all pages
content = ""
for page in reader.pages:
content += page.extract_text() + "\n"
return content
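For reference, a hedged usage sketch of the pypdf helper above (which this commit removes); the file name and password are placeholder values:

    from pathlib import Path

    pdf_bytes = Path("report.pdf").read_bytes()  # placeholder file
    try:
        text = _extract_pdf_pypdf(pdf_bytes, password="s3cret")  # placeholder password
    except Exception as exc:
        # Raised when the PDF is encrypted and the password is missing or wrong
        print(f"PDF extraction failed: {exc}")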
def _extract_docx(file_bytes: bytes) -> str:
"""Extract DOCX content including tables in document order (synchronous).
Args:
file_bytes: DOCX file content as bytes
Returns:
str: Extracted text content with tables in their original positions.
Tables are separated from paragraphs with blank lines for clarity.
"""
from docx import Document # type: ignore
from docx.table import Table # type: ignore
from docx.text.paragraph import Paragraph # type: ignore
docx_file = BytesIO(file_bytes)
doc = Document(docx_file)
content_parts = []
in_table = False # Track if we're currently processing a table
# Iterate through all body elements in document order
for element in doc.element.body:
# Check if element is a paragraph
if element.tag.endswith("p"):
# If coming out of a table, add blank line after table
if in_table:
content_parts.append("") # Blank line after table
in_table = False
paragraph = Paragraph(element, doc)
text = paragraph.text
# Always append to preserve document spacing (including blank paragraphs)
content_parts.append(text)
# Check if element is a table
elif element.tag.endswith("tbl"):
# Add blank line before table (if content exists)
if content_parts and not in_table:
content_parts.append("") # Blank line before table
in_table = True
table = Table(element, doc)
for row in table.rows:
row_text = []
for cell in row.cells:
cell_text = cell.text
# Always append cell text to preserve column structure
row_text.append(cell_text)
# Only add row if at least one cell has content
if any(cell for cell in row_text):
content_parts.append("\t".join(row_text))
return "\n".join(content_parts)
def _extract_pptx(file_bytes: bytes) -> str:
"""Extract PPTX content (synchronous).
Args:
file_bytes: PPTX file content as bytes
Returns:
str: Extracted text content
"""
from pptx import Presentation # type: ignore
pptx_file = BytesIO(file_bytes)
prs = Presentation(pptx_file)
content = ""
for slide in prs.slides:
for shape in slide.shapes:
if hasattr(shape, "text"):
content += shape.text + "\n"
return content
def _extract_xlsx(file_bytes: bytes) -> str:
"""Extract XLSX content in tab-delimited format with clear sheet separation.
This function processes Excel workbooks and converts them to a structured text format
suitable for LLM prompts and RAG systems. Each sheet is clearly delimited with
separator lines, and special characters are escaped to preserve the tab-delimited structure.
Features:
- Each sheet is wrapped with '====================' separators for visual distinction
- Special characters (tabs, newlines, backslashes) are escaped to prevent structure corruption
- Column alignment is preserved across all rows to maintain tabular structure
- Empty rows are preserved as blank lines to maintain row structure
- Two-pass processing: determines max column width, then extracts with consistent alignment
Args:
file_bytes: XLSX file content as bytes
Returns:
str: Extracted text content with all sheets in tab-delimited format.
Format: Sheet separators, sheet name, then tab-delimited rows.
Example output:
==================== Sheet: Data ====================
Name\tAge\tCity
Alice\t30\tNew York
Bob\t25\tLondon
==================== Sheet: Summary ====================
Total\t2
====================
"""
from openpyxl import load_workbook # type: ignore
xlsx_file = BytesIO(file_bytes)
wb = load_workbook(xlsx_file)
def escape_cell(cell_value: str | int | float | None) -> str:
"""Escape characters that would break tab-delimited layout.
Escape order is critical: backslashes first, then tabs/newlines.
This prevents double-escaping issues.
Args:
cell_value: The cell value to escape (can be None, str, int, or float)
Returns:
str: Escaped cell value safe for tab-delimited format
"""
if cell_value is None:
return ""
text = str(cell_value)
# CRITICAL: Escape backslash first to avoid double-escaping
return (
text.replace("\\", "\\\\") # Must be first: \ -> \\
.replace("\t", "\\t") # Tab -> \t (visible)
.replace("\r\n", "\\n") # Windows newline -> \n
.replace("\r", "\\n") # Mac newline -> \n
.replace("\n", "\\n") # Unix newline -> \n
)
def escape_sheet_title(title: str) -> str:
"""Escape sheet title to prevent formatting issues in separators.
Args:
title: Original sheet title
Returns:
str: Sanitized sheet title with tabs/newlines replaced
"""
return str(title).replace("\n", " ").replace("\t", " ").replace("\r", " ")
content_parts: list[str] = []
sheet_separator = "=" * 20
for idx, sheet in enumerate(wb):
if idx > 0:
content_parts.append("") # Blank line between sheets for readability
# Escape sheet title to handle edge cases with special characters
safe_title = escape_sheet_title(sheet.title)
content_parts.append(f"{sheet_separator} Sheet: {safe_title} {sheet_separator}")
# Two-pass approach to preserve column alignment:
# Pass 1: Determine the maximum column width for this sheet
max_columns = 0
all_rows = list(sheet.iter_rows(values_only=True))
for row in all_rows:
last_nonempty_idx = -1
for idx, cell in enumerate(row):
# Check if cell has meaningful content (not None or empty string)
if cell is not None and str(cell).strip():
last_nonempty_idx = idx
if last_nonempty_idx >= 0:
max_columns = max(max_columns, last_nonempty_idx + 1)
# Pass 2: Extract rows with consistent width to preserve column alignment
for row in all_rows:
row_parts = []
# Build row up to max_columns width
for idx in range(max_columns):
if idx < len(row):
row_parts.append(escape_cell(row[idx]))
else:
row_parts.append("") # Pad short rows
# Check if row is completely empty
if all(part == "" for part in row_parts):
# Preserve empty rows as blank lines (maintains row structure)
content_parts.append("")
else:
# Join all columns to maintain consistent column count
content_parts.append("\t".join(row_parts))
# Final separator for symmetry (makes parsing easier)
content_parts.append(sheet_separator)
return "\n".join(content_parts)
async def pipeline_enqueue_file(
rag: LightRAG, file_path: Path, track_id: str = None
) -> tuple[bool, str]:
@@ -1342,28 +1029,24 @@ async def pipeline_enqueue_file
case ".pdf":
try:
# Try DOCLING first if configured and available
if (
global_args.document_loading_engine == "DOCLING"
and _is_docling_available()
):
content = await asyncio.to_thread(
_convert_with_docling, file_path
)
if global_args.document_loading_engine == "DOCLING":
if not pm.is_installed("docling"): # type: ignore
pm.install("docling")
from docling.document_converter import DocumentConverter # type: ignore
converter = DocumentConverter()
result = converter.convert(file_path)
content = result.document.export_to_markdown()
else:
if (
global_args.document_loading_engine == "DOCLING"
and not _is_docling_available()
):
logger.warning(
f"DOCLING engine configured but not available for {file_path.name}. Falling back to pypdf."
)
# Use pypdf (non-blocking via to_thread)
content = await asyncio.to_thread(
_extract_pdf_pypdf,
file,
global_args.pdf_decrypt_password,
)
if not pm.is_installed("pypdf2"): # type: ignore
pm.install("pypdf2")
from PyPDF2 import PdfReader # type: ignore
from io import BytesIO
pdf_file = BytesIO(file)
reader = PdfReader(pdf_file)
for page in reader.pages:
content += page.extract_text() + "\n"
except Exception as e:
error_files = [
{
@@ -1383,24 +1066,28 @@ async def pipeline_enqueue_file
case ".docx":
try:
# Try DOCLING first if configured and available
if (
global_args.document_loading_engine == "DOCLING"
and _is_docling_available()
):
content = await asyncio.to_thread(
_convert_with_docling, file_path
)
if global_args.document_loading_engine == "DOCLING":
if not pm.is_installed("docling"): # type: ignore
pm.install("docling")
from docling.document_converter import DocumentConverter # type: ignore
converter = DocumentConverter()
result = converter.convert(file_path)
content = result.document.export_to_markdown()
else:
if (
global_args.document_loading_engine == "DOCLING"
and not _is_docling_available()
):
logger.warning(
f"DOCLING engine configured but not available for {file_path.name}. Falling back to python-docx."
)
# Use python-docx (non-blocking via to_thread)
content = await asyncio.to_thread(_extract_docx, file)
if not pm.is_installed("python-docx"): # type: ignore
try:
pm.install("python-docx")
except Exception:
pm.install("docx")
from docx import Document # type: ignore
from io import BytesIO
docx_file = BytesIO(file)
doc = Document(docx_file)
content = "\n".join(
[paragraph.text for paragraph in doc.paragraphs]
)
except Exception as e:
error_files = [
{
@@ -1420,24 +1107,26 @@ async def pipeline_enqueue_file
case ".pptx":
try:
# Try DOCLING first if configured and available
if (
global_args.document_loading_engine == "DOCLING"
and _is_docling_available()
):
content = await asyncio.to_thread(
_convert_with_docling, file_path
)
if global_args.document_loading_engine == "DOCLING":
if not pm.is_installed("docling"): # type: ignore
pm.install("docling")
from docling.document_converter import DocumentConverter # type: ignore
converter = DocumentConverter()
result = converter.convert(file_path)
content = result.document.export_to_markdown()
else:
if (
global_args.document_loading_engine == "DOCLING"
and not _is_docling_available()
):
logger.warning(
f"DOCLING engine configured but not available for {file_path.name}. Falling back to python-pptx."
)
# Use python-pptx (non-blocking via to_thread)
content = await asyncio.to_thread(_extract_pptx, file)
if not pm.is_installed("python-pptx"): # type: ignore
pm.install("pptx")
from pptx import Presentation # type: ignore
from io import BytesIO
pptx_file = BytesIO(file)
prs = Presentation(pptx_file)
for slide in prs.slides:
for shape in slide.shapes:
if hasattr(shape, "text"):
content += shape.text + "\n"
except Exception as e:
error_files = [
{
@@ -1457,24 +1146,33 @@ async def pipeline_enqueue_file
case ".xlsx":
try:
# Try DOCLING first if configured and available
if (
global_args.document_loading_engine == "DOCLING"
and _is_docling_available()
):
content = await asyncio.to_thread(
_convert_with_docling, file_path
)
if global_args.document_loading_engine == "DOCLING":
if not pm.is_installed("docling"): # type: ignore
pm.install("docling")
from docling.document_converter import DocumentConverter # type: ignore
converter = DocumentConverter()
result = converter.convert(file_path)
content = result.document.export_to_markdown()
else:
if (
global_args.document_loading_engine == "DOCLING"
and not _is_docling_available()
):
logger.warning(
f"DOCLING engine configured but not available for {file_path.name}. Falling back to openpyxl."
)
# Use openpyxl (non-blocking via to_thread)
content = await asyncio.to_thread(_extract_xlsx, file)
if not pm.is_installed("openpyxl"): # type: ignore
pm.install("openpyxl")
from openpyxl import load_workbook # type: ignore
from io import BytesIO
xlsx_file = BytesIO(file)
wb = load_workbook(xlsx_file)
for sheet in wb:
content += f"Sheet: {sheet.title}\n"
for row in sheet.iter_rows(values_only=True):
content += (
"\t".join(
str(cell) if cell is not None else ""
for cell in row
)
+ "\n"
)
content += "\n"
except Exception as e:
error_files = [
{
@@ -1772,20 +1470,15 @@ async def background_delete_documents(
doc_manager: DocumentManager,
doc_ids: List[str],
delete_file: bool = False,
delete_llm_cache: bool = False,
):
"""Background task to delete multiple documents"""
from lightrag.kg.shared_storage import (
get_namespace_data,
get_namespace_lock,
get_pipeline_status_lock,
)
pipeline_status = await get_namespace_data(
"pipeline_status", workspace=rag.workspace
)
pipeline_status_lock = get_namespace_lock(
"pipeline_status", workspace=rag.workspace
)
pipeline_status = await get_namespace_data("pipeline_status")
pipeline_status_lock = get_pipeline_status_lock()
total_docs = len(doc_ids)
successful_deletions = []
@@ -1801,7 +1494,6 @@ async def background_delete_documents(
pipeline_status.update(
{
"busy": True,
# Job name cannot be changed; it's verified in adelete_by_doc_id()
"job_name": f"Deleting {total_docs} Documents",
"job_start": datetime.now().isoformat(),
"docs": total_docs,
@@ -1812,27 +1504,11 @@ async def background_delete_documents(
)
# Use slice assignment to clear the list in place
pipeline_status["history_messages"][:] = ["Starting document deletion process"]
if delete_llm_cache:
pipeline_status["history_messages"].append(
"LLM cache cleanup requested for this deletion job"
)
try:
# Loop through each document ID and delete them one by one
for i, doc_id in enumerate(doc_ids, 1):
# Check for cancellation at the start of each document deletion
async with pipeline_status_lock:
if pipeline_status.get("cancellation_requested", False):
cancel_msg = f"Deletion cancelled by user at document {i}/{total_docs}. {len(successful_deletions)} deleted, {total_docs - i + 1} remaining."
logger.info(cancel_msg)
pipeline_status["latest_message"] = cancel_msg
pipeline_status["history_messages"].append(cancel_msg)
# Add remaining documents to failed list with cancellation reason
failed_deletions.extend(
doc_ids[i - 1 :]
) # i-1 because enumerate starts at 1
break # Exit the loop, remaining documents unchanged
start_msg = f"Deleting document {i}/{total_docs}: {doc_id}"
logger.info(start_msg)
pipeline_status["cur_batch"] = i
@@ -1841,9 +1517,7 @@ async def background_delete_documents(
file_path = "#"
try:
result = await rag.adelete_by_doc_id(
doc_id, delete_llm_cache=delete_llm_cache
)
result = await rag.adelete_by_doc_id(doc_id)
file_path = (
getattr(result, "file_path", "-") if "result" in locals() else "-"
)
@@ -1995,10 +1669,6 @@ async def background_delete_documents(
# Final summary and check for pending requests
async with pipeline_status_lock:
pipeline_status["busy"] = False
pipeline_status["pending_requests"] = False # Reset pending requests flag
pipeline_status["cancellation_requested"] = (
False # Always reset cancellation flag
)
completion_msg = f"Deletion completed: {len(successful_deletions)} successful, {len(failed_deletions)} failed"
pipeline_status["latest_message"] = completion_msg
pipeline_status["history_messages"].append(completion_msg)
@@ -2275,16 +1945,12 @@ def create_document_routes(
"""
from lightrag.kg.shared_storage import (
get_namespace_data,
get_namespace_lock,
get_pipeline_status_lock,
)
# Get pipeline status and lock
pipeline_status = await get_namespace_data(
"pipeline_status", workspace=rag.workspace
)
pipeline_status_lock = get_namespace_lock(
"pipeline_status", workspace=rag.workspace
)
pipeline_status = await get_namespace_data("pipeline_status")
pipeline_status_lock = get_pipeline_status_lock()
# Check and set status with lock
async with pipeline_status_lock:
@@ -2320,8 +1986,6 @@ def create_document_routes(
rag.full_docs,
rag.full_entities,
rag.full_relations,
rag.entity_chunks,
rag.relation_chunks,
rag.entities_vdb,
rag.relationships_vdb,
rag.chunks_vdb,
@@ -2475,19 +2139,13 @@ def create_document_routes(
try:
from lightrag.kg.shared_storage import (
get_namespace_data,
get_namespace_lock,
get_all_update_flags_status,
)
pipeline_status = await get_namespace_data(
"pipeline_status", workspace=rag.workspace
)
pipeline_status_lock = get_namespace_lock(
"pipeline_status", workspace=rag.workspace
)
pipeline_status = await get_namespace_data("pipeline_status")
# Get update flags status for all namespaces
update_status = await get_all_update_flags_status(workspace=rag.workspace)
update_status = await get_all_update_flags_status()
# Convert MutableBoolean objects to regular boolean values
processed_update_status = {}
@@ -2501,9 +2159,8 @@ def create_document_routes(
processed_flags.append(bool(flag))
processed_update_status[namespace] = processed_flags
async with pipeline_status_lock:
# Convert to regular dict if it's a Manager.dict
status_dict = dict(pipeline_status)
# Convert to regular dict if it's a Manager.dict
status_dict = dict(pipeline_status)
# Add processed update_status to the status dictionary
status_dict["update_status"] = processed_update_status
@@ -2543,7 +2200,7 @@ def create_document_routes(
logger.error(traceback.format_exc())
raise HTTPException(status_code=500, detail=str(e))
# TODO: Deprecated, use /documents/paginated instead
# TODO: Deprecated
@router.get(
"", response_model=DocsStatusesResponse, dependencies=[Depends(combined_auth)]
)
@@ -2553,7 +2210,7 @@ def create_document_routes(
To prevent excessive resource consumption, a maximum of 1,000 records is returned.
This endpoint retrieves the current status of all documents, grouped by their
processing status (PENDING, PROCESSING, PREPROCESSED, PROCESSED, FAILED). The results are
processing status (PENDING, PROCESSING, PROCESSED, FAILED). The results are
limited to 1000 total documents with fair distribution across all statuses.
Returns:
@@ -2569,7 +2226,6 @@ def create_document_routes(
statuses = (
DocStatus.PENDING,
DocStatus.PROCESSING,
DocStatus.PREPROCESSED,
DocStatus.PROCESSED,
DocStatus.FAILED,
)
@@ -2668,20 +2324,21 @@ def create_document_routes(
Delete documents and all their associated data by their IDs using background processing.
Deletes specific documents and all their associated data, including their status,
text chunks, vector embeddings, and any related graph data. When requested,
cached LLM extraction responses are removed after graph deletion/rebuild completes.
text chunks, vector embeddings, and any related graph data.
The deletion process runs in the background to avoid blocking the client connection.
This operation is disabled when the LLM cache for entity extraction is disabled.
This operation is irreversible and will interact with the pipeline status.
Args:
delete_request (DeleteDocRequest): The request containing the document IDs and deletion options.
delete_request (DeleteDocRequest): The request containing the document IDs and delete_file options.
background_tasks: FastAPI BackgroundTasks for async processing
Returns:
DeleteDocByIdResponse: The result of the deletion operation.
- status="deletion_started": The document deletion has been initiated in the background.
- status="busy": The pipeline is busy with another operation.
- status="not_allowed": Operation not allowed when LLM cache for entity extraction is disabled.
Raises:
HTTPException:
@@ -2689,27 +2346,27 @@ def create_document_routes(
"""
doc_ids = delete_request.doc_ids
# The rag object is initialized from the server startup args,
# so we can access its properties here.
if not rag.enable_llm_cache_for_entity_extract:
return DeleteDocByIdResponse(
status="not_allowed",
message="Operation not allowed when LLM cache for entity extraction is disabled.",
doc_id=", ".join(delete_request.doc_ids),
)
try:
from lightrag.kg.shared_storage import (
get_namespace_data,
get_namespace_lock,
)
from lightrag.kg.shared_storage import get_namespace_data
pipeline_status = await get_namespace_data(
"pipeline_status", workspace=rag.workspace
)
pipeline_status_lock = get_namespace_lock(
"pipeline_status", workspace=rag.workspace
)
pipeline_status = await get_namespace_data("pipeline_status")
# Check if pipeline is busy with proper lock
async with pipeline_status_lock:
if pipeline_status.get("busy", False):
return DeleteDocByIdResponse(
status="busy",
message="Cannot delete documents while pipeline is busy",
doc_id=", ".join(doc_ids),
)
# Check if pipeline is busy
if pipeline_status.get("busy", False):
return DeleteDocByIdResponse(
status="busy",
message="Cannot delete documents while pipeline is busy",
doc_id=", ".join(doc_ids),
)
# Add deletion task to background tasks
background_tasks.add_task(
@@ -2718,7 +2375,6 @@ def create_document_routes(
doc_manager,
doc_ids,
delete_request.delete_file,
delete_request.delete_llm_cache,
)
return DeleteDocByIdResponse(
@@ -3076,67 +2732,4 @@ def create_document_routes(
logger.error(traceback.format_exc())
raise HTTPException(status_code=500, detail=str(e))
@router.post(
"/cancel_pipeline",
response_model=CancelPipelineResponse,
dependencies=[Depends(combined_auth)],
)
async def cancel_pipeline():
"""
Request cancellation of the currently running pipeline.
This endpoint sets a cancellation flag in the pipeline status. The pipeline will:
1. Check this flag at key processing points
2. Stop processing new documents
3. Cancel all running document processing tasks
4. Mark all PROCESSING documents as FAILED with reason "User cancelled"
The cancellation is graceful and ensures data consistency. Documents that have
completed processing will remain in PROCESSED status.
Returns:
CancelPipelineResponse: Response with status and message
- status="cancellation_requested": Cancellation flag has been set
- status="not_busy": Pipeline is not currently running
Raises:
HTTPException: If an error occurs while setting cancellation flag (500).
"""
try:
from lightrag.kg.shared_storage import (
get_namespace_data,
get_namespace_lock,
)
pipeline_status = await get_namespace_data(
"pipeline_status", workspace=rag.workspace
)
pipeline_status_lock = get_namespace_lock(
"pipeline_status", workspace=rag.workspace
)
async with pipeline_status_lock:
if not pipeline_status.get("busy", False):
return CancelPipelineResponse(
status="not_busy",
message="Pipeline is not currently running. No cancellation needed.",
)
# Set cancellation flag
pipeline_status["cancellation_requested"] = True
cancel_msg = "Pipeline cancellation requested by user"
logger.info(cancel_msg)
pipeline_status["latest_message"] = cancel_msg
pipeline_status["history_messages"].append(cancel_msg)
return CancelPipelineResponse(
status="cancellation_requested",
message="Pipeline cancellation has been requested. Documents will be marked as FAILED.",
)
except Exception as e:
logger.error(f"Error requesting pipeline cancellation: {str(e)}")
logger.error(traceback.format_exc())
raise HTTPException(status_code=500, detail=str(e))
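For context, a hedged sketch of how a client would have invoked this endpoint before its removal; it assumes the router is mounted at /documents and API-key auth via an X-API-Key header, and the base URL and key are placeholder values:

    import httpx

    resp = httpx.post(
        "http://localhost:9621/documents/cancel_pipeline",  # placeholder base URL
        headers={"X-API-Key": "your-api-key"},  # placeholder credential
    )
    print(resp.json())
    # {"status": "cancellation_requested", "message": "..."} while the
    # pipeline is busy; {"status": "not_busy", "message": "..."} otherwise.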
return router


@@ -155,6 +155,12 @@ export type ScanResponse = {
track_id: string
}
export type ReprocessFailedResponse = {
status: 'reprocessing_started'
message: string
track_id: string
}
export type DeleteDocResponse = {
status: 'deletion_started' | 'busy' | 'not_allowed'
message: string
@@ -305,6 +311,11 @@ export const scanNewDocuments = async (): Promise<ScanResponse> => {
return response.data
}
export const reprocessFailedDocuments = async (): Promise<ReprocessFailedResponse> => {
const response = await axiosInstance.post('/documents/reprocess_failed')
return response.data
}
export const getDocumentsScanProgress = async (): Promise<LightragDocumentsScanProgress> => {
const response = await axiosInstance.get('/documents/scan-progress')
return response.data


@@ -22,6 +22,7 @@ import PaginationControls from '@/components/ui/PaginationControls'
import {
scanNewDocuments,
reprocessFailedDocuments,
getDocumentsPaginated,
DocsStatusesResponse,
DocStatus,
@@ -913,6 +914,42 @@ export default function DocumentManager() {
}
}, [t, startPollingInterval, currentTab, health, statusCounts])
const retryFailedDocuments = useCallback(async () => {
try {
// Check if component is still mounted before starting the request
if (!isMountedRef.current) return;
const { status, message, track_id: _track_id } = await reprocessFailedDocuments(); // eslint-disable-line @typescript-eslint/no-unused-vars
// Check again if component is still mounted after the request completes
if (!isMountedRef.current) return;
// Note: _track_id is available for future use (e.g., progress tracking)
toast.message(message || status);
// Reset health check timer with 1 second delay to avoid race condition
useBackendState.getState().resetHealthCheckTimerDelayed(1000);
// Start fast refresh with 2-second interval immediately after retry
startPollingInterval(2000);
// Set recovery timer to restore normal polling interval after 15 seconds
setTimeout(() => {
if (isMountedRef.current && currentTab === 'documents' && health) {
// Restore intelligent polling interval based on document status
const hasActiveDocuments = (statusCounts.processing || 0) > 0 || (statusCounts.pending || 0) > 0;
const normalInterval = hasActiveDocuments ? 5000 : 30000;
startPollingInterval(normalInterval);
}
}, 15000); // Restore after 15 seconds
} catch (err) {
// Only show error if component is still mounted
if (isMountedRef.current) {
toast.error(errorMessage(err));
}
}
}, [startPollingInterval, currentTab, health, statusCounts])
// Handle page size change - update state and save to store
const handlePageSizeChange = useCallback((newPageSize: number) => {
if (newPageSize === pagination.page_size) return;
@@ -1289,6 +1326,16 @@ export default function DocumentManager() {
>
<RefreshCwIcon /> {t('documentPanel.documentManager.scanButton')}
</Button>
<Button
variant="outline"
onClick={retryFailedDocuments}
side="bottom"
tooltip={t('documentPanel.documentManager.retryFailedTooltip')}
size="sm"
disabled={pipelineBusy}
>
<RotateCcwIcon /> {t('documentPanel.documentManager.retryFailedButton')}
</Button>
<Button
variant="outline"
onClick={() => setShowPipelineStatus(true)}


@@ -70,7 +70,6 @@
"confirmButton": "YES",
"deleteFileOption": "Also delete uploaded files",
"deleteFileTooltip": "Check this option to also delete the corresponding uploaded files on the server",
"deleteLLMCacheOption": "Also delete extracted LLM cache",
"success": "Document deletion pipeline started successfully",
"failed": "Delete Documents Failed:\n{{message}}",
"error": "Delete Documents Failed:\n{{error}}",
@@ -115,12 +114,12 @@
"documentManager": {
"title": "Document Management",
"scanButton": "Scan",
"scanTooltip": "Scan and process documents in input folder, and also reprocess all failed documents",
"retryFailedButton": "Retry",
"scanTooltip": "Scan documents in input folder",
"retryFailedButton": "Retry Failed",
"retryFailedTooltip": "Retry processing all failed documents",
"refreshTooltip": "Reset document list",
"pipelineStatusButton": "Pipeline",
"pipelineStatusTooltip": "View document processing pipeline status",
"pipelineStatusButton": "Pipeline Status",
"pipelineStatusTooltip": "View pipeline status",
"uploadedTitle": "Uploaded Documents",
"uploadedDescription": "List of uploaded documents and their statuses.",
"emptyTitle": "No Documents",
@@ -140,7 +139,6 @@
"status": {
"all": "All",
"completed": "Completed",
"preprocessed": "Preprocessed",
"processing": "Processing",
"pending": "Pending",
"failed": "Failed"
@@ -160,25 +158,14 @@
"title": "Pipeline Status",
"busy": "Pipeline Busy",
"requestPending": "Request Pending",
"cancellationRequested": "Cancellation Requested",
"jobName": "Job Name",
"startTime": "Start Time",
"progress": "Progress",
"unit": "Batch",
"unit": "batch",
"latestMessage": "Latest Message",
"historyMessages": "History Messages",
"cancelButton": "Cancel",
"cancelTooltip": "Cancel pipeline processing",
"cancelConfirmTitle": "Confirm Pipeline Cancellation",
"cancelConfirmDescription": "This will interrupt the ongoing pipeline processing. Are you sure you want to continue?",
"cancelConfirmButton": "Confirm Cancellation",
"cancelInProgress": "Cancellation in progress...",
"pipelineNotRunning": "Pipeline not running",
"cancelSuccess": "Pipeline cancellation requested",
"cancelFailed": "Failed to cancel pipeline\n{{error}}",
"cancelNotBusy": "Pipeline is not running, no need to cancel",
"errors": {
"fetchFailed": "Failed to fetch pipeline status\n{{error}}"
"fetchFailed": "Failed to get pipeline status\n{{error}}"
}
}
},
@@ -330,9 +317,9 @@
"description": "Description",
"entity_id": "Name",
"entity_type": "Type",
"source_id": "C-ID",
"source_id": "SrcID",
"Neighbour": "Neigh",
"file_path": "File",
"file_path": "Source",
"keywords": "Keys",
"weight": "Weight"
}
@@ -348,14 +335,14 @@
},
"search": {
"placeholder": "Search nodes in page...",
"message": "And {{count}} others"
"message": "And {count} others"
},
"graphLabels": {
"selectTooltip": "Get subgraph of a node (label)",
"noLabels": "No matching nodes found",
"label": "Search node name",
"placeholder": "Search node name...",
"andOthers": "And {{count}} others",
"andOthers": "And {count} others",
"refreshGlobalTooltip": "Refresh global graph data and reset search history",
"refreshCurrentLabelTooltip": "Refresh current page graph data",
"refreshingTooltip": "Refreshing data..."

View file

@@ -70,7 +70,6 @@
"confirmButton": "确定",
"deleteFileOption": "同时删除上传文件",
"deleteFileTooltip": "选中此选项将同时删除服务器上对应的上传文件",
"deleteLLMCacheOption": "同时删除实体关系抽取 LLM 缓存",
"success": "文档删除流水线启动成功",
"failed": "删除文档失败:\n{{message}}",
"error": "删除文档失败:\n{{error}}",
@@ -115,12 +114,12 @@
"documentManager": {
"title": "文档管理",
"scanButton": "扫描",
"scanTooltip": "扫描处理输入目录中的文档,同时重新处理所有失败的文档",
"retryFailedButton": "重试",
"scanTooltip": "扫描输入目录中的文档",
"retryFailedButton": "重试失败",
"retryFailedTooltip": "重新处理所有失败的文档",
"refreshTooltip": "复位文档清单",
"pipelineStatusButton": "流水线",
"pipelineStatusTooltip": "查看文档处理流水线状态",
"pipelineStatusButton": "流水线状态",
"pipelineStatusTooltip": "查看流水线状态",
"uploadedTitle": "已上传文档",
"uploadedDescription": "已上传文档列表及其状态",
"emptyTitle": "无文档",
@@ -140,7 +139,6 @@
"status": {
"all": "全部",
"completed": "已完成",
"preprocessed": "预处理",
"processing": "处理中",
"pending": "等待中",
"failed": "失败"
@@ -160,23 +158,12 @@
"title": "流水线状态",
"busy": "流水线忙碌",
"requestPending": "待处理请求",
"cancellationRequested": "取消请求",
"jobName": "作业名称",
"startTime": "开始时间",
"progress": "进度",
"unit": "批",
"latestMessage": "最新消息",
"historyMessages": "历史消息",
"cancelButton": "中断",
"cancelTooltip": "中断流水线处理",
"cancelConfirmTitle": "确认中断流水线",
"cancelConfirmDescription": "此操作将中断正在进行的流水线处理。确定要继续吗?",
"cancelConfirmButton": "确认中断",
"cancelInProgress": "取消请求进行中...",
"pipelineNotRunning": "流水线未运行",
"cancelSuccess": "流水线中断请求已发送",
"cancelFailed": "中断流水线失败\n{{error}}",
"cancelNotBusy": "流水线未运行,无需中断",
"errors": {
"fetchFailed": "获取流水线状态失败\n{{error}}"
}
@@ -330,9 +317,9 @@
"description": "描述",
"entity_id": "名称",
"entity_type": "类型",
"source_id": "C-ID",
"source_id": "信源ID",
"Neighbour": "邻接",
"file_path": "文件",
"file_path": "信源",
"keywords": "Keys",
"weight": "权重"
}
@@ -348,14 +335,14 @@
},
"search": {
"placeholder": "页面内搜索节点...",
"message": "还有 {{count}} 个"
"message": "还有 {count} 个"
},
"graphLabels": {
"selectTooltip": "获取节点(标签)子图",
"noLabels": "未找到匹配的节点",
"label": "搜索节点名称",
"placeholder": "搜索节点名称...",
"andOthers": "还有 {{count}} 个",
"andOthers": "还有 {count} 个",
"refreshGlobalTooltip": "刷新全图数据和重置搜索历史",
"refreshCurrentLabelTooltip": "刷新当前页面图数据",
"refreshingTooltip": "正在刷新数据..."