fix: sync document_routes.py from upstream to resolve conflict markers
This commit is contained in:
parent d8c80243a4
commit 4621ee216e
1 changed file with 415 additions and 101 deletions
@@ -3,6 +3,7 @@ This module contains all document-related routes for the LightRAG API.
"""

import asyncio
from functools import lru_cache
from lightrag.utils import logger, get_pinyin_sort_key
import aiofiles
import shutil
@@ -23,23 +24,31 @@ from pydantic import BaseModel, Field, field_validator

from lightrag import LightRAG
from lightrag.base import DeletionResult, DocProcessingStatus, DocStatus
from lightrag.utils import generate_track_id
from lightrag.utils import (
    generate_track_id,
    compute_mdhash_id,
    sanitize_text_for_encoding,
)
from lightrag.api.utils_api import get_combined_auth_dependency
from ..config import global_args

# Check docling availability at module load time
DOCLING_AVAILABLE = False
try:
    import docling  # noqa: F401  # type: ignore[import-not-found]

    DOCLING_AVAILABLE = True
except ImportError:
    if global_args.document_loading_engine == "DOCLING":
        logger.warning(
            "DOCLING engine requested but 'docling' package not installed. "
            "Falling back to standard document processing. "
            "To use DOCLING, install with: pip install lightrag-hku[api,docling]"
        )
@lru_cache(maxsize=1)
def _is_docling_available() -> bool:
    """Check if docling is available (cached check).

    This function uses lru_cache to avoid repeated import attempts.
    The result is cached after the first call.

    Returns:
        bool: True if docling is available, False otherwise
    """
    try:
        import docling  # noqa: F401  # type: ignore[import-not-found]

        return True
    except ImportError:
        return False


# Function to format datetime to ISO format string with timezone information
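The `lru_cache(maxsize=1)` wrapper above is what turns the import probe into a one-time check. A minimal, self-contained sketch of the same pattern, using `importlib.util.find_spec` instead of a bare `import` (the `is_package_available` name is illustrative, not from the diff):

```python
from functools import lru_cache
import importlib.util


@lru_cache(maxsize=1)
def is_package_available(name: str) -> bool:
    """Return True if `name` can be imported; the answer is cached after the first call."""
    return importlib.util.find_spec(name) is not None


print(is_package_available("json"))        # True, computed on the first call
print(is_package_available("json"))        # True, served from the cache
print(is_package_available.cache_info())   # CacheInfo(hits=1, misses=1, ...)

# A test suite (or a server that installs packages at runtime) can reset the cache:
is_package_available.cache_clear()
```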
@@ -154,7 +163,7 @@ class ReprocessResponse(BaseModel):
    Attributes:
        status: Status of the reprocessing operation
        message: Message describing the operation result
        track_id: Tracking ID for monitoring reprocessing progress
        track_id: Always empty string. Reprocessed documents retain their original track_id.
    """

    status: Literal["reprocessing_started"] = Field(
@@ -162,7 +171,8 @@ class ReprocessResponse(BaseModel):
    )
    message: str = Field(description="Human-readable message describing the operation")
    track_id: str = Field(
        description="Tracking ID for monitoring reprocessing progress"
        default="",
        description="Always empty string. Reprocessed documents retain their original track_id from initial upload.",
    )

    class Config:
@@ -170,7 +180,29 @@ class ReprocessResponse(BaseModel):
            "example": {
                "status": "reprocessing_started",
                "message": "Reprocessing of failed documents has been initiated in background",
                "track_id": "retry_20250729_170612_def456",
                "track_id": "",
            }
        }


class CancelPipelineResponse(BaseModel):
    """Response model for pipeline cancellation operation

    Attributes:
        status: Status of the cancellation request
        message: Message describing the operation result
    """

    status: Literal["cancellation_requested", "not_busy"] = Field(
        description="Status of the cancellation request"
    )
    message: str = Field(description="Human-readable message describing the operation")

    class Config:
        json_schema_extra = {
            "example": {
                "status": "cancellation_requested",
                "message": "Pipeline cancellation has been requested. Documents will be marked as FAILED.",
            }
        }
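For readers unfamiliar with how these response models behave, a quick serialization check (a sketch assuming Pydantic v2, where `.model_dump()` replaces v1's `.dict()`; the class is a trimmed copy, not the project source):

```python
from typing import Literal

from pydantic import BaseModel, Field


class CancelPipelineResponseSketch(BaseModel):
    """Trimmed copy of the model above, for illustration only."""

    status: Literal["cancellation_requested", "not_busy"] = Field(
        description="Status of the cancellation request"
    )
    message: str = Field(description="Human-readable message describing the operation")


resp = CancelPipelineResponseSketch(
    status="not_busy",
    message="Pipeline is not currently running. No cancellation needed.",
)
print(resp.model_dump())
# {'status': 'not_busy', 'message': 'Pipeline is not currently running. No cancellation needed.'}
```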
@@ -350,6 +382,10 @@ class DeleteDocRequest(BaseModel):
        default=False,
        description="Whether to delete the corresponding file in the upload directory.",
    )
    delete_llm_cache: bool = Field(
        default=False,
        description="Whether to delete cached LLM extraction results for the documents.",
    )

    @field_validator("doc_ids", mode="after")
    @classmethod
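The validator body is cut off by the hunk boundary above. A hypothetical sketch of a `doc_ids` validator of this shape, shown only to illustrate the pattern (field names mirror the diff; the checks are not the project's actual implementation):

```python
from typing import List

from pydantic import BaseModel, Field, field_validator


class DeleteDocRequestSketch(BaseModel):
    """Illustrative stand-in for DeleteDocRequest."""

    doc_ids: List[str] = Field(min_length=1, description="Document IDs to delete")
    delete_file: bool = Field(default=False)
    delete_llm_cache: bool = Field(default=False)

    @field_validator("doc_ids", mode="after")
    @classmethod
    def validate_doc_ids(cls, doc_ids: List[str]) -> List[str]:
        # Hypothetical checks: strip whitespace, reject empty and duplicate IDs.
        cleaned = [doc_id.strip() for doc_id in doc_ids]
        if any(not doc_id for doc_id in cleaned):
            raise ValueError("Document IDs cannot be empty")
        if len(cleaned) != len(set(cleaned)):
            raise ValueError("Duplicate document IDs are not allowed")
        return cleaned
```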
@@ -945,19 +981,82 @@ def _extract_pdf_pypdf(file_bytes: bytes, password: str = None) -> str:


def _extract_docx(file_bytes: bytes) -> str:
    """Extract DOCX content (synchronous).
    """Extract DOCX content including tables in document order (synchronous).

    Args:
        file_bytes: DOCX file content as bytes

    Returns:
        str: Extracted text content
        str: Extracted text content with tables in their original positions.
            Tables are separated from paragraphs with blank lines for clarity.
    """
    from docx import Document  # type: ignore
    from docx.table import Table  # type: ignore
    from docx.text.paragraph import Paragraph  # type: ignore

    docx_file = BytesIO(file_bytes)
    doc = Document(docx_file)
    return "\n".join([paragraph.text for paragraph in doc.paragraphs])

    def escape_cell(cell_value: str | None) -> str:
        """Escape characters that would break tab-delimited layout.

        Escape order is critical: backslashes first, then tabs/newlines.
        This prevents double-escaping issues.

        Args:
            cell_value: The cell value to escape (can be None or str)

        Returns:
            str: Escaped cell value safe for tab-delimited format
        """
        if cell_value is None:
            return ""
        text = str(cell_value)
        # CRITICAL: Escape backslash first to avoid double-escaping
        return (
            text.replace("\\", "\\\\")  # Must be first: \ -> \\
            .replace("\t", "\\t")  # Tab -> \t (visible)
            .replace("\r\n", "\\n")  # Windows newline -> \n
            .replace("\r", "\\n")  # Mac newline -> \n
            .replace("\n", "\\n")  # Unix newline -> \n
        )

    content_parts = []
    in_table = False  # Track if we're currently processing a table

    # Iterate through all body elements in document order
    for element in doc.element.body:
        # Check if element is a paragraph
        if element.tag.endswith("p"):
            # If coming out of a table, add blank line after table
            if in_table:
                content_parts.append("")  # Blank line after table
                in_table = False

            paragraph = Paragraph(element, doc)
            text = paragraph.text
            # Always append to preserve document spacing (including blank paragraphs)
            content_parts.append(text)

        # Check if element is a table
        elif element.tag.endswith("tbl"):
            # Add blank line before table (if content exists)
            if content_parts and not in_table:
                content_parts.append("")  # Blank line before table

            in_table = True
            table = Table(element, doc)
            for row in table.rows:
                row_text = []
                for cell in row.cells:
                    cell_text = cell.text
                    # Escape special characters to preserve tab-delimited structure
                    row_text.append(escape_cell(cell_text))
                # Only add row if at least one cell has content
                if any(cell for cell in row_text):
                    content_parts.append("\t".join(row_text))

    return "\n".join(content_parts)


def _extract_pptx(file_bytes: bytes) -> str:
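The escape helper above is what keeps each table cell on a single line of the tab-delimited output. A standalone check of its behaviour (the helper is copied from the diff; the input values are illustrative):

```python
def escape_cell(cell_value):
    """Same escaping rules as the diff: backslash first, then tabs and newlines."""
    if cell_value is None:
        return ""
    text = str(cell_value)
    return (
        text.replace("\\", "\\\\")
        .replace("\t", "\\t")
        .replace("\r\n", "\\n")
        .replace("\r", "\\n")
        .replace("\n", "\\n")
    )


print(escape_cell("Line1\nLine2"))  # Line1\nLine2  (newline made visible; row stays on one line)
print(escape_cell("a\tb"))          # a\tb          (literal tab no longer breaks the column layout)
print(escape_cell("C:\\temp"))      # C:\\temp      (backslash escaped first to avoid double-escaping)
print(escape_cell(None))            # empty string
```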
@@ -982,27 +1081,112 @@ def _extract_pptx(file_bytes: bytes) -> str:


def _extract_xlsx(file_bytes: bytes) -> str:
    """Extract XLSX content (synchronous).
    """Extract XLSX content in tab-delimited format with clear sheet separation.

    This function processes Excel workbooks and converts them to a structured text format
    suitable for LLM prompts and RAG systems. Each sheet is clearly delimited with
    separator lines, and special characters are escaped to preserve the tab-delimited structure.

    Features:
        - Each sheet is wrapped with '====================' separators for visual distinction
        - Special characters (tabs, newlines, backslashes) are escaped to prevent structure corruption
        - Column alignment is preserved across all rows to maintain tabular structure
        - Empty rows are preserved as blank lines to maintain row structure
        - Uses sheet.max_column to determine column width efficiently

    Args:
        file_bytes: XLSX file content as bytes

    Returns:
        str: Extracted text content
        str: Extracted text content with all sheets in tab-delimited format.
            Format: Sheet separators, sheet name, then tab-delimited rows.

    Example output:
        ==================== Sheet: Data ====================
        Name\tAge\tCity
        Alice\t30\tNew York
        Bob\t25\tLondon

        ==================== Sheet: Summary ====================
        Total\t2
        ====================
    """
    from openpyxl import load_workbook  # type: ignore

    xlsx_file = BytesIO(file_bytes)
    wb = load_workbook(xlsx_file)
    content = ""
    for sheet in wb:
        content += f"Sheet: {sheet.title}\n"

    def escape_cell(cell_value: str | int | float | None) -> str:
        """Escape characters that would break tab-delimited layout.

        Escape order is critical: backslashes first, then tabs/newlines.
        This prevents double-escaping issues.

        Args:
            cell_value: The cell value to escape (can be None, str, int, or float)

        Returns:
            str: Escaped cell value safe for tab-delimited format
        """
        if cell_value is None:
            return ""
        text = str(cell_value)
        # CRITICAL: Escape backslash first to avoid double-escaping
        return (
            text.replace("\\", "\\\\")  # Must be first: \ -> \\
            .replace("\t", "\\t")  # Tab -> \t (visible)
            .replace("\r\n", "\\n")  # Windows newline -> \n
            .replace("\r", "\\n")  # Mac newline -> \n
            .replace("\n", "\\n")  # Unix newline -> \n
        )

    def escape_sheet_title(title: str) -> str:
        """Escape sheet title to prevent formatting issues in separators.

        Args:
            title: Original sheet title

        Returns:
            str: Sanitized sheet title with tabs/newlines replaced
        """
        return str(title).replace("\n", " ").replace("\t", " ").replace("\r", " ")

    content_parts: list[str] = []
    sheet_separator = "=" * 20

    for idx, sheet in enumerate(wb):
        if idx > 0:
            content_parts.append("")  # Blank line between sheets for readability

        # Escape sheet title to handle edge cases with special characters
        safe_title = escape_sheet_title(sheet.title)
        content_parts.append(f"{sheet_separator} Sheet: {safe_title} {sheet_separator}")

        # Use sheet.max_column to get the maximum column width directly
        max_columns = sheet.max_column if sheet.max_column else 0

        # Extract rows with consistent width to preserve column alignment
        for row in sheet.iter_rows(values_only=True):
            content += (
                "\t".join(str(cell) if cell is not None else "" for cell in row) + "\n"
            )
        content += "\n"
    return content
            row_parts = []

            # Build row up to max_columns width
            for idx in range(max_columns):
                if idx < len(row):
                    row_parts.append(escape_cell(row[idx]))
                else:
                    row_parts.append("")  # Pad short rows

            # Check if row is completely empty
            if all(part == "" for part in row_parts):
                # Preserve empty rows as blank lines (maintains row structure)
                content_parts.append("")
            else:
                # Join all columns to maintain consistent column count
                content_parts.append("\t".join(row_parts))

        # Final separator for symmetry (makes parsing easier)
        content_parts.append(sheet_separator)
    return "\n".join(content_parts)


async def pipeline_enqueue_file(
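To see the sheet-separated, tab-delimited layout that the new `_extract_xlsx` docstring describes, here is a small end-to-end sketch. It assumes openpyxl is installed, builds a tiny workbook in memory instead of reading an upload, and omits the escaping step for brevity:

```python
from io import BytesIO

from openpyxl import Workbook, load_workbook

# Build a tiny two-sheet workbook in memory.
wb = Workbook()
ws = wb.active
ws.title = "Data"
ws.append(["Name", "Age", "City"])
ws.append(["Alice", 30, "New York"])
summary = wb.create_sheet("Summary")
summary.append(["Total", 2])

buf = BytesIO()
wb.save(buf)
file_bytes = buf.getvalue()

# Re-load from bytes and emit the same kind of layout the diff produces:
# a '=' separator per sheet, the sheet name, then tab-joined rows.
sep = "=" * 20
parts = []
for idx, sheet in enumerate(load_workbook(BytesIO(file_bytes))):
    if idx > 0:
        parts.append("")
    parts.append(f"{sep} Sheet: {sheet.title} {sep}")
    for row in sheet.iter_rows(values_only=True):
        parts.append("\t".join("" if cell is None else str(cell) for cell in row))
    parts.append(sep)

print("\n".join(parts))
```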
@@ -1178,36 +1362,25 @@ async def pipeline_enqueue_file(
                # Try DOCLING first if configured and available
                if (
                    global_args.document_loading_engine == "DOCLING"
                    and DOCLING_AVAILABLE
                    and _is_docling_available()
                ):
                    content = await asyncio.to_thread(
                        _convert_with_docling, file_path
                    )
                else:
<<<<<<< HEAD
<<<<<<< HEAD
                    if not pm.is_installed("pypdf2"):  # type: ignore
                        pm.install("pypdf2")
                    from PyPDF2 import PdfReader  # type: ignore
                    from io import BytesIO

                    pdf_file = BytesIO(file)
                    reader = PdfReader(pdf_file)
                    for page in reader.pages:
                        content += page.extract_text() + "\n"
=======
=======
>>>>>>> 69a0b74c (refactor: move document deps to api group, remove dynamic imports)
                    if (
                        global_args.document_loading_engine == "DOCLING"
                        and not _is_docling_available()
                    ):
                        logger.warning(
                            f"DOCLING engine configured but not available for {file_path.name}. Falling back to pypdf."
                        )
                    # Use pypdf (non-blocking via to_thread)
                    content = await asyncio.to_thread(
                        _extract_pdf_pypdf,
                        file,
                        global_args.pdf_decrypt_password,
                    )
<<<<<<< HEAD
>>>>>>> 4b31942e (refactor: move document deps to api group, remove dynamic imports)
=======
>>>>>>> 69a0b74c (refactor: move document deps to api group, remove dynamic imports)
            except Exception as e:
                error_files = [
                    {
@@ -1230,12 +1403,19 @@ async def pipeline_enqueue_file(
                # Try DOCLING first if configured and available
                if (
                    global_args.document_loading_engine == "DOCLING"
                    and DOCLING_AVAILABLE
                    and _is_docling_available()
                ):
                    content = await asyncio.to_thread(
                        _convert_with_docling, file_path
                    )
                else:
                    if (
                        global_args.document_loading_engine == "DOCLING"
                        and not _is_docling_available()
                    ):
                        logger.warning(
                            f"DOCLING engine configured but not available for {file_path.name}. Falling back to python-docx."
                        )
                    # Use python-docx (non-blocking via to_thread)
                    content = await asyncio.to_thread(_extract_docx, file)
            except Exception as e:
@@ -1260,12 +1440,19 @@ async def pipeline_enqueue_file(
                # Try DOCLING first if configured and available
                if (
                    global_args.document_loading_engine == "DOCLING"
                    and DOCLING_AVAILABLE
                    and _is_docling_available()
                ):
                    content = await asyncio.to_thread(
                        _convert_with_docling, file_path
                    )
                else:
                    if (
                        global_args.document_loading_engine == "DOCLING"
                        and not _is_docling_available()
                    ):
                        logger.warning(
                            f"DOCLING engine configured but not available for {file_path.name}. Falling back to python-pptx."
                        )
                    # Use python-pptx (non-blocking via to_thread)
                    content = await asyncio.to_thread(_extract_pptx, file)
            except Exception as e:
@@ -1290,12 +1477,19 @@ async def pipeline_enqueue_file(
                # Try DOCLING first if configured and available
                if (
                    global_args.document_loading_engine == "DOCLING"
                    and DOCLING_AVAILABLE
                    and _is_docling_available()
                ):
                    content = await asyncio.to_thread(
                        _convert_with_docling, file_path
                    )
                else:
                    if (
                        global_args.document_loading_engine == "DOCLING"
                        and not _is_docling_available()
                    ):
                        logger.warning(
                            f"DOCLING engine configured but not available for {file_path.name}. Falling back to openpyxl."
                        )
                    # Use openpyxl (non-blocking via to_thread)
                    content = await asyncio.to_thread(_extract_xlsx, file)
            except Exception as e:
@@ -1595,15 +1789,20 @@ async def background_delete_documents(
    doc_manager: DocumentManager,
    doc_ids: List[str],
    delete_file: bool = False,
    delete_llm_cache: bool = False,
):
    """Background task to delete multiple documents"""
    from lightrag.kg.shared_storage import (
        get_namespace_data,
        get_pipeline_status_lock,
        get_namespace_lock,
    )

    pipeline_status = await get_namespace_data("pipeline_status")
    pipeline_status_lock = get_pipeline_status_lock()
    pipeline_status = await get_namespace_data(
        "pipeline_status", workspace=rag.workspace
    )
    pipeline_status_lock = get_namespace_lock(
        "pipeline_status", workspace=rag.workspace
    )

    total_docs = len(doc_ids)
    successful_deletions = []
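The recurring change in this commit is that `get_namespace_data` and the lock getter now take a `workspace=` argument, so each workspace gets its own pipeline status and its own lock. The general pattern can be sketched generically as follows; the helper names and data layout here are illustrative, not LightRAG's shared-storage internals:

```python
import asyncio
from collections import defaultdict
from typing import Dict, Tuple

_NAMESPACE_DATA: Dict[Tuple[str, str], dict] = defaultdict(dict)
_NAMESPACE_LOCKS: Dict[Tuple[str, str], asyncio.Lock] = {}


def get_namespace_lock_sketch(namespace: str, workspace: str) -> asyncio.Lock:
    """Return the lock guarding one (namespace, workspace) slice of shared state."""
    return _NAMESPACE_LOCKS.setdefault((namespace, workspace), asyncio.Lock())


async def mark_busy(workspace: str) -> bool:
    """Claim the pipeline for one workspace without racing other coroutines."""
    status = _NAMESPACE_DATA[("pipeline_status", workspace)]
    async with get_namespace_lock_sketch("pipeline_status", workspace):
        if status.get("busy", False):
            return False
        status["busy"] = True
        return True


async def main() -> None:
    # Two workspaces have independent status dicts and independent locks.
    print(await mark_busy("ws-a"))  # True
    print(await mark_busy("ws-a"))  # False: ws-a is already busy
    print(await mark_busy("ws-b"))  # True: ws-b is unaffected


asyncio.run(main())
```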
@@ -1630,6 +1829,10 @@ async def background_delete_documents(
        )
        # Use slice assignment to clear the list in place
        pipeline_status["history_messages"][:] = ["Starting document deletion process"]
        if delete_llm_cache:
            pipeline_status["history_messages"].append(
                "LLM cache cleanup requested for this deletion job"
            )

    try:
        # Loop through each document ID and delete them one by one
@@ -1655,7 +1858,9 @@ async def background_delete_documents(

            file_path = "#"
            try:
                result = await rag.adelete_by_doc_id(doc_id)
                result = await rag.adelete_by_doc_id(
                    doc_id, delete_llm_cache=delete_llm_cache
                )
                file_path = (
                    getattr(result, "file_path", "-") if "result" in locals() else "-"
                )
@@ -1897,12 +2102,14 @@ def create_document_routes(
        # Check if filename already exists in doc_status storage
        existing_doc_data = await rag.doc_status.get_doc_by_file_path(safe_filename)
        if existing_doc_data:
            # Get document status information for error message
            # Get document status and track_id from existing document
            status = existing_doc_data.get("status", "unknown")
            # Use `or ""` to handle both missing key and None value (e.g., legacy rows without track_id)
            existing_track_id = existing_doc_data.get("track_id") or ""
            return InsertResponse(
                status="duplicated",
                message=f"File '{safe_filename}' already exists in document storage (Status: {status}).",
                track_id="",
                track_id=existing_track_id,
            )

        file_path = doc_manager.input_dir / safe_filename
@@ -1966,14 +2173,30 @@ def create_document_routes(
                request.file_source
            )
            if existing_doc_data:
                # Get document status information for error message
                # Get document status and track_id from existing document
                status = existing_doc_data.get("status", "unknown")
                # Use `or ""` to handle both missing key and None value (e.g., legacy rows without track_id)
                existing_track_id = existing_doc_data.get("track_id") or ""
                return InsertResponse(
                    status="duplicated",
                    message=f"File source '{request.file_source}' already exists in document storage (Status: {status}).",
                    track_id="",
                    track_id=existing_track_id,
                )

            # Check if content already exists by computing content hash (doc_id)
            sanitized_text = sanitize_text_for_encoding(request.text)
            content_doc_id = compute_mdhash_id(sanitized_text, prefix="doc-")
            existing_doc = await rag.doc_status.get_by_id(content_doc_id)
            if existing_doc:
                # Content already exists, return duplicated with existing track_id
                status = existing_doc.get("status", "unknown")
                existing_track_id = existing_doc.get("track_id") or ""
                return InsertResponse(
                    status="duplicated",
                    message=f"Identical content already exists in document storage (doc_id: {content_doc_id}, Status: {status}).",
                    track_id=existing_track_id,
                )

            # Generate track_id for text insertion
            track_id = generate_track_id("insert")

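The new duplicate-content check hashes the sanitized text into a deterministic doc id before enqueueing, so an identical re-upload is answered with the original document's track_id. A rough sketch of the idea (the real `compute_mdhash_id` lives in `lightrag.utils`; this illustrative version simply prefixes an MD5 digest and may differ in detail):

```python
from hashlib import md5


def compute_doc_id_sketch(text: str, prefix: str = "doc-") -> str:
    """Deterministic content id: the same text always maps to the same id."""
    return prefix + md5(text.encode("utf-8")).hexdigest()


seen: dict[str, str] = {}  # doc_id -> track_id recorded at first upload


def insert_text_sketch(text: str, track_id: str) -> str:
    """Return the existing track_id for duplicate content, else record the new one."""
    doc_id = compute_doc_id_sketch(text)
    if doc_id in seen:
        return seen[doc_id]  # duplicated: caller reports the original track_id
    seen[doc_id] = track_id
    return track_id


print(insert_text_sketch("hello world", "insert_0001"))  # insert_0001
print(insert_text_sketch("hello world", "insert_0002"))  # insert_0001 (duplicate detected)
```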
@@ -2032,14 +2255,31 @@ def create_document_routes(
                    file_source
                )
                if existing_doc_data:
                    # Get document status information for error message
                    # Get document status and track_id from existing document
                    status = existing_doc_data.get("status", "unknown")
                    # Use `or ""` to handle both missing key and None value (e.g., legacy rows without track_id)
                    existing_track_id = existing_doc_data.get("track_id") or ""
                    return InsertResponse(
                        status="duplicated",
                        message=f"File source '{file_source}' already exists in document storage (Status: {status}).",
                        track_id="",
                        track_id=existing_track_id,
                    )

            # Check if any content already exists by computing content hash (doc_id)
            for text in request.texts:
                sanitized_text = sanitize_text_for_encoding(text)
                content_doc_id = compute_mdhash_id(sanitized_text, prefix="doc-")
                existing_doc = await rag.doc_status.get_by_id(content_doc_id)
                if existing_doc:
                    # Content already exists, return duplicated with existing track_id
                    status = existing_doc.get("status", "unknown")
                    existing_track_id = existing_doc.get("track_id") or ""
                    return InsertResponse(
                        status="duplicated",
                        message=f"Identical content already exists in document storage (doc_id: {content_doc_id}, Status: {status}).",
                        track_id=existing_track_id,
                    )

            # Generate track_id for texts insertion
            track_id = generate_track_id("insert")

@@ -2087,12 +2327,16 @@ def create_document_routes(
        """
        from lightrag.kg.shared_storage import (
            get_namespace_data,
            get_pipeline_status_lock,
            get_namespace_lock,
        )

        # Get pipeline status and lock
        pipeline_status = await get_namespace_data("pipeline_status")
        pipeline_status_lock = get_pipeline_status_lock()
        pipeline_status = await get_namespace_data(
            "pipeline_status", workspace=rag.workspace
        )
        pipeline_status_lock = get_namespace_lock(
            "pipeline_status", workspace=rag.workspace
        )

        # Check and set status with lock
        async with pipeline_status_lock:
@@ -2128,6 +2372,8 @@ def create_document_routes(
                rag.full_docs,
                rag.full_entities,
                rag.full_relations,
                rag.entity_chunks,
                rag.relation_chunks,
                rag.entities_vdb,
                rag.relationships_vdb,
                rag.chunks_vdb,
@@ -2281,13 +2527,19 @@ def create_document_routes(
        try:
            from lightrag.kg.shared_storage import (
                get_namespace_data,
                get_namespace_lock,
                get_all_update_flags_status,
            )

            pipeline_status = await get_namespace_data("pipeline_status")
            pipeline_status = await get_namespace_data(
                "pipeline_status", workspace=rag.workspace
            )
            pipeline_status_lock = get_namespace_lock(
                "pipeline_status", workspace=rag.workspace
            )

            # Get update flags status for all namespaces
            update_status = await get_all_update_flags_status()
            update_status = await get_all_update_flags_status(workspace=rag.workspace)

            # Convert MutableBoolean objects to regular boolean values
            processed_update_status = {}
@@ -2301,8 +2553,9 @@ def create_document_routes(
                    processed_flags.append(bool(flag))
                processed_update_status[namespace] = processed_flags

            # Convert to regular dict if it's a Manager.dict
            status_dict = dict(pipeline_status)
            async with pipeline_status_lock:
                # Convert to regular dict if it's a Manager.dict
                status_dict = dict(pipeline_status)

            # Add processed update_status to the status dictionary
            status_dict["update_status"] = processed_update_status
@@ -2467,21 +2720,20 @@ def create_document_routes(
        Delete documents and all their associated data by their IDs using background processing.

        Deletes specific documents and all their associated data, including their status,
        text chunks, vector embeddings, and any related graph data.
        text chunks, vector embeddings, and any related graph data. When requested,
        cached LLM extraction responses are removed after graph deletion/rebuild completes.
        The deletion process runs in the background to avoid blocking the client connection.
        It is disabled when llm cache for entity extraction is disabled.

        This operation is irreversible and will interact with the pipeline status.

        Args:
            delete_request (DeleteDocRequest): The request containing the document IDs and delete_file options.
            delete_request (DeleteDocRequest): The request containing the document IDs and deletion options.
            background_tasks: FastAPI BackgroundTasks for async processing

        Returns:
            DeleteDocByIdResponse: The result of the deletion operation.
                - status="deletion_started": The document deletion has been initiated in the background.
                - status="busy": The pipeline is busy with another operation.
                - status="not_allowed": Operation not allowed when LLM cache for entity extraction is disabled.

        Raises:
            HTTPException:
@@ -2489,27 +2741,27 @@ def create_document_routes(
        """
        doc_ids = delete_request.doc_ids

        # The rag object is initialized from the server startup args,
        # so we can access its properties here.
        if not rag.enable_llm_cache_for_entity_extract:
            return DeleteDocByIdResponse(
                status="not_allowed",
                message="Operation not allowed when LLM cache for entity extraction is disabled.",
                doc_id=", ".join(delete_request.doc_ids),
        try:
            from lightrag.kg.shared_storage import (
                get_namespace_data,
                get_namespace_lock,
            )

        try:
            from lightrag.kg.shared_storage import get_namespace_data
            pipeline_status = await get_namespace_data(
                "pipeline_status", workspace=rag.workspace
            )
            pipeline_status_lock = get_namespace_lock(
                "pipeline_status", workspace=rag.workspace
            )

            pipeline_status = await get_namespace_data("pipeline_status")

            # Check if pipeline is busy
            if pipeline_status.get("busy", False):
                return DeleteDocByIdResponse(
                    status="busy",
                    message="Cannot delete documents while pipeline is busy",
                    doc_id=", ".join(doc_ids),
                )
            # Check if pipeline is busy with proper lock
            async with pipeline_status_lock:
                if pipeline_status.get("busy", False):
                    return DeleteDocByIdResponse(
                        status="busy",
                        message="Cannot delete documents while pipeline is busy",
                        doc_id=", ".join(doc_ids),
                    )

            # Add deletion task to background tasks
            background_tasks.add_task(
@@ -2518,6 +2770,7 @@ def create_document_routes(
                doc_manager,
                doc_ids,
                delete_request.delete_file,
                delete_request.delete_llm_cache,
            )

            return DeleteDocByIdResponse(
@@ -2845,29 +3098,27 @@ def create_document_routes(
        This is useful for recovering from server crashes, network errors, LLM service
        outages, or other temporary failures that caused document processing to fail.

        The processing happens in the background and can be monitored using the
        returned track_id or by checking the pipeline status.
        The processing happens in the background and can be monitored by checking the
        pipeline status. The reprocessed documents retain their original track_id from
        initial upload, so use their original track_id to monitor progress.

        Returns:
            ReprocessResponse: Response with status, message, and track_id
            ReprocessResponse: Response with status and message.
                track_id is always empty string because reprocessed documents retain
                their original track_id from initial upload.

        Raises:
            HTTPException: If an error occurs while initiating reprocessing (500).
        """
        try:
            # Generate track_id with "retry" prefix for retry operation
            track_id = generate_track_id("retry")

            # Start the reprocessing in the background
            # Note: Reprocessed documents retain their original track_id from initial upload
            background_tasks.add_task(rag.apipeline_process_enqueue_documents)
            logger.info(
                f"Reprocessing of failed documents initiated with track_id: {track_id}"
            )
            logger.info("Reprocessing of failed documents initiated")

            return ReprocessResponse(
                status="reprocessing_started",
                message="Reprocessing of failed documents has been initiated in background",
                track_id=track_id,
                message="Reprocessing of failed documents has been initiated in background. Documents retain their original track_id.",
            )

        except Exception as e:
@@ -2875,4 +3126,67 @@ def create_document_routes(
            logger.error(traceback.format_exc())
            raise HTTPException(status_code=500, detail=str(e))

    @router.post(
        "/cancel_pipeline",
        response_model=CancelPipelineResponse,
        dependencies=[Depends(combined_auth)],
    )
    async def cancel_pipeline():
        """
        Request cancellation of the currently running pipeline.

        This endpoint sets a cancellation flag in the pipeline status. The pipeline will:
        1. Check this flag at key processing points
        2. Stop processing new documents
        3. Cancel all running document processing tasks
        4. Mark all PROCESSING documents as FAILED with reason "User cancelled"

        The cancellation is graceful and ensures data consistency. Documents that have
        completed processing will remain in PROCESSED status.

        Returns:
            CancelPipelineResponse: Response with status and message
                - status="cancellation_requested": Cancellation flag has been set
                - status="not_busy": Pipeline is not currently running

        Raises:
            HTTPException: If an error occurs while setting cancellation flag (500).
        """
        try:
            from lightrag.kg.shared_storage import (
                get_namespace_data,
                get_namespace_lock,
            )

            pipeline_status = await get_namespace_data(
                "pipeline_status", workspace=rag.workspace
            )
            pipeline_status_lock = get_namespace_lock(
                "pipeline_status", workspace=rag.workspace
            )

            async with pipeline_status_lock:
                if not pipeline_status.get("busy", False):
                    return CancelPipelineResponse(
                        status="not_busy",
                        message="Pipeline is not currently running. No cancellation needed.",
                    )

                # Set cancellation flag
                pipeline_status["cancellation_requested"] = True
                cancel_msg = "Pipeline cancellation requested by user"
                logger.info(cancel_msg)
                pipeline_status["latest_message"] = cancel_msg
                pipeline_status["history_messages"].append(cancel_msg)

            return CancelPipelineResponse(
                status="cancellation_requested",
                message="Pipeline cancellation has been requested. Documents will be marked as FAILED.",
            )

        except Exception as e:
            logger.error(f"Error requesting pipeline cancellation: {str(e)}")
            logger.error(traceback.format_exc())
            raise HTTPException(status_code=500, detail=str(e))

    return router
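The new /cancel_pipeline endpoint only sets a flag; the pipeline itself is expected to poll that flag between processing steps and mark in-flight work as FAILED. A generic sketch of that cooperative-cancellation loop, with illustrative names rather than LightRAG's internals:

```python
import asyncio

pipeline_status = {"busy": True, "cancellation_requested": False}


async def process_documents(doc_ids: list[str]) -> dict[str, str]:
    """Process documents one by one, checking the cancellation flag before each step."""
    results: dict[str, str] = {}
    for doc_id in doc_ids:
        if pipeline_status["cancellation_requested"]:
            # Pending work is marked FAILED, as the endpoint docstring promises.
            results[doc_id] = "FAILED (User cancelled)"
            continue
        await asyncio.sleep(0.01)  # stand-in for real chunking/extraction work
        results[doc_id] = "PROCESSED"
    pipeline_status["busy"] = False
    return results


async def main() -> None:
    task = asyncio.create_task(process_documents([f"doc-{i}" for i in range(5)]))
    await asyncio.sleep(0.025)                        # let a couple of documents finish
    pipeline_status["cancellation_requested"] = True  # what POST /cancel_pipeline does
    print(await task)
    # e.g. {'doc-0': 'PROCESSED', 'doc-1': 'PROCESSED', 'doc-2': 'FAILED (User cancelled)', ...}
    # Documents that completed before the flag was set stay PROCESSED.


asyncio.run(main())
```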