Raphaël MANSUY 2025-12-04 19:14:29 +08:00
parent 88c78625f8
commit 7fa3cab355
4 changed files with 254 additions and 720 deletions


@ -3,15 +3,14 @@ This module contains all document-related routes for the LightRAG API.
"""
import asyncio
from functools import lru_cache
from lightrag.utils import logger, get_pinyin_sort_key
import aiofiles
import shutil
import traceback
import pipmaster as pm
from datetime import datetime, timezone
from pathlib import Path
from typing import Dict, List, Optional, Any, Literal
from io import BytesIO
from fastapi import (
APIRouter,
BackgroundTasks,
@ -24,33 +23,11 @@ from pydantic import BaseModel, Field, field_validator
from lightrag import LightRAG
from lightrag.base import DeletionResult, DocProcessingStatus, DocStatus
from lightrag.utils import (
generate_track_id,
compute_mdhash_id,
sanitize_text_for_encoding,
)
from lightrag.utils import generate_track_id
from lightrag.api.utils_api import get_combined_auth_dependency
from ..config import global_args
@lru_cache(maxsize=1)
def _is_docling_available() -> bool:
"""Check if docling is available (cached check).
This function uses lru_cache to avoid repeated import attempts.
The result is cached after the first call.
Returns:
bool: True if docling is available, False otherwise
"""
try:
import docling # noqa: F401 # type: ignore[import-not-found]
return True
except ImportError:
return False
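For illustration, a minimal sketch of how the cached check behaves (the import cost is paid at most once per process; `cache_clear` is a standard `lru_cache` method):

```python
_is_docling_available()              # first call: attempts `import docling`
_is_docling_available()              # later calls: cached boolean, no import cost
_is_docling_available.cache_clear()  # re-check after installing docling at runtime
```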
# Function to format datetime to ISO format string with timezone information
def format_datetime(dt: Any) -> Optional[str]:
"""Format datetime to ISO format string with timezone information
@ -163,7 +140,7 @@ class ReprocessResponse(BaseModel):
Attributes:
status: Status of the reprocessing operation
message: Message describing the operation result
track_id: Always empty string. Reprocessed documents retain their original track_id.
track_id: Tracking ID for monitoring reprocessing progress
"""
status: Literal["reprocessing_started"] = Field(
@ -171,8 +148,7 @@ class ReprocessResponse(BaseModel):
)
message: str = Field(description="Human-readable message describing the operation")
track_id: str = Field(
default="",
description="Always empty string. Reprocessed documents retain their original track_id from initial upload.",
description="Tracking ID for monitoring reprocessing progress"
)
class Config:
@ -180,29 +156,7 @@ class ReprocessResponse(BaseModel):
"example": {
"status": "reprocessing_started",
"message": "Reprocessing of failed documents has been initiated in background",
"track_id": "",
}
}
class CancelPipelineResponse(BaseModel):
"""Response model for pipeline cancellation operation
Attributes:
status: Status of the cancellation request
message: Message describing the operation result
"""
status: Literal["cancellation_requested", "not_busy"] = Field(
description="Status of the cancellation request"
)
message: str = Field(description="Human-readable message describing the operation")
class Config:
json_schema_extra = {
"example": {
"status": "cancellation_requested",
"message": "Pipeline cancellation has been requested. Documents will be marked as FAILED.",
"track_id": "retry_20250729_170612_def456",
}
}
@ -504,7 +458,7 @@ class DocsStatusesResponse(BaseModel):
"id": "doc_789",
"content_summary": "Document pending final indexing",
"content_length": 7200,
"status": "preprocessed",
"status": "multimodal_processed",
"created_at": "2025-03-31T09:30:00",
"updated_at": "2025-03-31T09:35:00",
"track_id": "upload_20250331_093000_xyz789",
@ -903,6 +857,7 @@ def get_unique_filename_in_enqueued(target_dir: Path, original_name: str) -> str
Returns:
str: Unique filename (may have numeric suffix added)
"""
from pathlib import Path
import time
original_path = Path(original_name)
@ -925,122 +880,6 @@ def get_unique_filename_in_enqueued(target_dir: Path, original_name: str) -> str
return f"{base_name}_{timestamp}{extension}"
# Document processing helper functions (synchronous)
# These functions run in thread pool via asyncio.to_thread() to avoid blocking the event loop
def _convert_with_docling(file_path: Path) -> str:
"""Convert document using docling (synchronous).
Args:
file_path: Path to the document file
Returns:
str: Extracted markdown content
"""
from docling.document_converter import DocumentConverter # type: ignore
converter = DocumentConverter()
result = converter.convert(file_path)
return result.document.export_to_markdown()
def _extract_pdf_pypdf(file_bytes: bytes, password: Optional[str] = None) -> str:
"""Extract PDF content using pypdf (synchronous).
Args:
file_bytes: PDF file content as bytes
password: Optional password for encrypted PDFs
Returns:
str: Extracted text content
Raises:
Exception: If PDF is encrypted and password is incorrect or missing
"""
from pypdf import PdfReader # type: ignore
pdf_file = BytesIO(file_bytes)
reader = PdfReader(pdf_file)
# Check if PDF is encrypted
if reader.is_encrypted:
if not password:
raise Exception("PDF is encrypted but no password provided")
decrypt_result = reader.decrypt(password)
if decrypt_result == 0:
raise Exception("Incorrect PDF password")
# Extract text from all pages
content = ""
for page in reader.pages:
content += page.extract_text() + "\n"
return content
def _extract_docx(file_bytes: bytes) -> str:
"""Extract DOCX content (synchronous).
Args:
file_bytes: DOCX file content as bytes
Returns:
str: Extracted text content
"""
from docx import Document # type: ignore
docx_file = BytesIO(file_bytes)
doc = Document(docx_file)
return "\n".join([paragraph.text for paragraph in doc.paragraphs])
def _extract_pptx(file_bytes: bytes) -> str:
"""Extract PPTX content (synchronous).
Args:
file_bytes: PPTX file content as bytes
Returns:
str: Extracted text content
"""
from pptx import Presentation # type: ignore
pptx_file = BytesIO(file_bytes)
prs = Presentation(pptx_file)
content = ""
for slide in prs.slides:
for shape in slide.shapes:
if hasattr(shape, "text"):
content += shape.text + "\n"
return content
def _extract_xlsx(file_bytes: bytes) -> str:
"""Extract XLSX content (synchronous).
Args:
file_bytes: XLSX file content as bytes
Returns:
str: Extracted text content
"""
from openpyxl import load_workbook # type: ignore
xlsx_file = BytesIO(file_bytes)
wb = load_workbook(xlsx_file)
content = ""
for sheet in wb:
content += f"Sheet: {sheet.title}\n"
for row in sheet.iter_rows(values_only=True):
content += (
"\t".join(str(cell) if cell is not None else "" for cell in row) + "\n"
)
content += "\n"
return content
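As the comment above these helpers notes, they are synchronous on purpose and are offloaded with `asyncio.to_thread()`. A minimal sketch of that calling pattern (the wrapper below is illustrative, not part of the module; it mirrors how `pipeline_enqueue_file` invokes the helpers):

```python
import asyncio
from pathlib import Path

async def extract_pdf_without_blocking(path: Path, password: str | None = None) -> str:
    # pypdf parsing is CPU-bound; run it in a worker thread so the FastAPI
    # event loop keeps serving other requests in the meantime.
    file_bytes = path.read_bytes()
    return await asyncio.to_thread(_extract_pdf_pypdf, file_bytes, password)
```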
async def pipeline_enqueue_file(
rag: LightRAG, file_path: Path, track_id: Optional[str] = None
) -> tuple[bool, str]:
@ -1211,28 +1050,24 @@ async def pipeline_enqueue_file(
case ".pdf":
try:
# Try DOCLING first if configured and available
if (
global_args.document_loading_engine == "DOCLING"
and _is_docling_available()
):
content = await asyncio.to_thread(
_convert_with_docling, file_path
)
if global_args.document_loading_engine == "DOCLING":
if not pm.is_installed("docling"): # type: ignore
pm.install("docling")
from docling.document_converter import DocumentConverter # type: ignore
converter = DocumentConverter()
result = converter.convert(file_path)
content = result.document.export_to_markdown()
else:
if (
global_args.document_loading_engine == "DOCLING"
and not _is_docling_available()
):
logger.warning(
f"DOCLING engine configured but not available for {file_path.name}. Falling back to pypdf."
)
# Use pypdf (non-blocking via to_thread)
content = await asyncio.to_thread(
_extract_pdf_pypdf,
file,
global_args.pdf_decrypt_password,
)
if not pm.is_installed("pypdf2"): # type: ignore
pm.install("pypdf2")
from PyPDF2 import PdfReader # type: ignore
from io import BytesIO
pdf_file = BytesIO(file)
reader = PdfReader(pdf_file)
for page in reader.pages:
content += page.extract_text() + "\n"
except Exception as e:
error_files = [
{
@ -1252,24 +1087,28 @@ async def pipeline_enqueue_file(
case ".docx":
try:
# Try DOCLING first if configured and available
if (
global_args.document_loading_engine == "DOCLING"
and _is_docling_available()
):
content = await asyncio.to_thread(
_convert_with_docling, file_path
)
if global_args.document_loading_engine == "DOCLING":
if not pm.is_installed("docling"): # type: ignore
pm.install("docling")
from docling.document_converter import DocumentConverter # type: ignore
converter = DocumentConverter()
result = converter.convert(file_path)
content = result.document.export_to_markdown()
else:
if (
global_args.document_loading_engine == "DOCLING"
and not _is_docling_available()
):
logger.warning(
f"DOCLING engine configured but not available for {file_path.name}. Falling back to python-docx."
)
# Use python-docx (non-blocking via to_thread)
content = await asyncio.to_thread(_extract_docx, file)
if not pm.is_installed("python-docx"): # type: ignore
try:
pm.install("python-docx")
except Exception:
pm.install("docx")
from docx import Document # type: ignore
from io import BytesIO
docx_file = BytesIO(file)
doc = Document(docx_file)
content = "\n".join(
[paragraph.text for paragraph in doc.paragraphs]
)
except Exception as e:
error_files = [
{
@ -1289,24 +1128,26 @@ async def pipeline_enqueue_file(
case ".pptx":
try:
# Try DOCLING first if configured and available
if (
global_args.document_loading_engine == "DOCLING"
and _is_docling_available()
):
content = await asyncio.to_thread(
_convert_with_docling, file_path
)
if global_args.document_loading_engine == "DOCLING":
if not pm.is_installed("docling"): # type: ignore
pm.install("docling")
from docling.document_converter import DocumentConverter # type: ignore
converter = DocumentConverter()
result = converter.convert(file_path)
content = result.document.export_to_markdown()
else:
if (
global_args.document_loading_engine == "DOCLING"
and not _is_docling_available()
):
logger.warning(
f"DOCLING engine configured but not available for {file_path.name}. Falling back to python-pptx."
)
# Use python-pptx (non-blocking via to_thread)
content = await asyncio.to_thread(_extract_pptx, file)
if not pm.is_installed("python-pptx"): # type: ignore
pm.install("pptx")
from pptx import Presentation # type: ignore
from io import BytesIO
pptx_file = BytesIO(file)
prs = Presentation(pptx_file)
for slide in prs.slides:
for shape in slide.shapes:
if hasattr(shape, "text"):
content += shape.text + "\n"
except Exception as e:
error_files = [
{
@ -1326,24 +1167,33 @@ async def pipeline_enqueue_file(
case ".xlsx":
try:
# Try DOCLING first if configured and available
if (
global_args.document_loading_engine == "DOCLING"
and _is_docling_available()
):
content = await asyncio.to_thread(
_convert_with_docling, file_path
)
if global_args.document_loading_engine == "DOCLING":
if not pm.is_installed("docling"): # type: ignore
pm.install("docling")
from docling.document_converter import DocumentConverter # type: ignore
converter = DocumentConverter()
result = converter.convert(file_path)
content = result.document.export_to_markdown()
else:
if (
global_args.document_loading_engine == "DOCLING"
and not _is_docling_available()
):
logger.warning(
f"DOCLING engine configured but not available for {file_path.name}. Falling back to openpyxl."
)
# Use openpyxl (non-blocking via to_thread)
content = await asyncio.to_thread(_extract_xlsx, file)
if not pm.is_installed("openpyxl"): # type: ignore
pm.install("openpyxl")
from openpyxl import load_workbook # type: ignore
from io import BytesIO
xlsx_file = BytesIO(file)
wb = load_workbook(xlsx_file)
for sheet in wb:
content += f"Sheet: {sheet.title}\n"
for row in sheet.iter_rows(values_only=True):
content += (
"\t".join(
str(cell) if cell is not None else ""
for cell in row
)
+ "\n"
)
content += "\n"
except Exception as e:
error_files = [
{
@ -1646,11 +1496,11 @@ async def background_delete_documents(
"""Background task to delete multiple documents"""
from lightrag.kg.shared_storage import (
get_namespace_data,
get_namespace_lock,
get_pipeline_status_lock,
)
pipeline_status = await get_namespace_data("pipeline_status")
pipeline_status_lock = get_namespace_lock("pipeline_status")
pipeline_status_lock = get_pipeline_status_lock()
total_docs = len(doc_ids)
successful_deletions = []
@ -1684,19 +1534,7 @@ async def background_delete_documents(
try:
# Loop through each document ID and delete them one by one
for i, doc_id in enumerate(doc_ids, 1):
# Check for cancellation at the start of each document deletion
async with pipeline_status_lock:
if pipeline_status.get("cancellation_requested", False):
cancel_msg = f"Deletion cancelled by user at document {i}/{total_docs}. {len(successful_deletions)} deleted, {total_docs - i + 1} remaining."
logger.info(cancel_msg)
pipeline_status["latest_message"] = cancel_msg
pipeline_status["history_messages"].append(cancel_msg)
# Add remaining documents to failed list with cancellation reason
failed_deletions.extend(
doc_ids[i - 1 :]
) # i-1 because enumerate starts at 1
break # Exit the loop, remaining documents unchanged
start_msg = f"Deleting document {i}/{total_docs}: {doc_id}"
logger.info(start_msg)
pipeline_status["cur_batch"] = i
@ -1859,10 +1697,6 @@ async def background_delete_documents(
# Final summary and check for pending requests
async with pipeline_status_lock:
pipeline_status["busy"] = False
pipeline_status["pending_requests"] = False # Reset pending requests flag
pipeline_status["cancellation_requested"] = (
False # Always reset cancellation flag
)
completion_msg = f"Deletion completed: {len(successful_deletions)} successful, {len(failed_deletions)} failed"
pipeline_status["latest_message"] = completion_msg
pipeline_status["history_messages"].append(completion_msg)
@ -1949,14 +1783,12 @@ def create_document_routes(
# Check if filename already exists in doc_status storage
existing_doc_data = await rag.doc_status.get_doc_by_file_path(safe_filename)
if existing_doc_data:
# Get document status and track_id from existing document
# Get document status information for error message
status = existing_doc_data.get("status", "unknown")
# Use `or ""` to handle both missing key and None value (e.g., legacy rows without track_id)
existing_track_id = existing_doc_data.get("track_id") or ""
return InsertResponse(
status="duplicated",
message=f"File '{safe_filename}' already exists in document storage (Status: {status}).",
track_id=existing_track_id,
track_id="",
)
file_path = doc_manager.input_dir / safe_filename
@ -2020,30 +1852,14 @@ def create_document_routes(
request.file_source
)
if existing_doc_data:
# Get document status and track_id from existing document
# Get document status information for error message
status = existing_doc_data.get("status", "unknown")
# Use `or ""` to handle both missing key and None value (e.g., legacy rows without track_id)
existing_track_id = existing_doc_data.get("track_id") or ""
return InsertResponse(
status="duplicated",
message=f"File source '{request.file_source}' already exists in document storage (Status: {status}).",
track_id=existing_track_id,
track_id="",
)
# Check if content already exists by computing content hash (doc_id)
sanitized_text = sanitize_text_for_encoding(request.text)
content_doc_id = compute_mdhash_id(sanitized_text, prefix="doc-")
existing_doc = await rag.doc_status.get_by_id(content_doc_id)
if existing_doc:
# Content already exists, return duplicated with existing track_id
status = existing_doc.get("status", "unknown")
existing_track_id = existing_doc.get("track_id") or ""
return InsertResponse(
status="duplicated",
message=f"Identical content already exists in document storage (doc_id: {content_doc_id}, Status: {status}).",
track_id=existing_track_id,
)
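The removed duplicate check above works because `compute_mdhash_id` is deterministic for identical (sanitized) text. A sketch of the idea (an illustrative stand-in; the real helper lives in `lightrag.utils` and is assumed here to be an MD5-based digest):

```python
from hashlib import md5

def compute_content_doc_id(text: str, prefix: str = "doc-") -> str:
    # Same sanitized text -> same digest -> same doc id, which is what
    # made the get_by_id duplicate lookup above possible.
    return prefix + md5(text.encode("utf-8")).hexdigest()

assert compute_content_doc_id("same text") == compute_content_doc_id("same text")
```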
# Generate track_id for text insertion
track_id = generate_track_id("insert")
@ -2102,31 +1918,14 @@ def create_document_routes(
file_source
)
if existing_doc_data:
# Get document status and track_id from existing document
# Get document status information for error message
status = existing_doc_data.get("status", "unknown")
# Use `or ""` to handle both missing key and None value (e.g., legacy rows without track_id)
existing_track_id = existing_doc_data.get("track_id") or ""
return InsertResponse(
status="duplicated",
message=f"File source '{file_source}' already exists in document storage (Status: {status}).",
track_id=existing_track_id,
track_id="",
)
# Check if any content already exists by computing content hash (doc_id)
for text in request.texts:
sanitized_text = sanitize_text_for_encoding(text)
content_doc_id = compute_mdhash_id(sanitized_text, prefix="doc-")
existing_doc = await rag.doc_status.get_by_id(content_doc_id)
if existing_doc:
# Content already exists, return duplicated with existing track_id
status = existing_doc.get("status", "unknown")
existing_track_id = existing_doc.get("track_id") or ""
return InsertResponse(
status="duplicated",
message=f"Identical content already exists in document storage (doc_id: {content_doc_id}, Status: {status}).",
track_id=existing_track_id,
)
# Generate track_id for texts insertion
track_id = generate_track_id("insert")
@ -2174,12 +1973,12 @@ def create_document_routes(
"""
from lightrag.kg.shared_storage import (
get_namespace_data,
get_namespace_lock,
get_pipeline_status_lock,
)
# Get pipeline status and lock
pipeline_status = await get_namespace_data("pipeline_status")
pipeline_status_lock = get_namespace_lock("pipeline_status")
pipeline_status_lock = get_pipeline_status_lock()
# Check and set status with lock
async with pipeline_status_lock:
@ -2370,15 +2169,13 @@ def create_document_routes(
try:
from lightrag.kg.shared_storage import (
get_namespace_data,
get_namespace_lock,
get_all_update_flags_status,
)
pipeline_status = await get_namespace_data("pipeline_status")
pipeline_status_lock = get_namespace_lock("pipeline_status")
# Get update flags status for all namespaces
update_status = await get_all_update_flags_status(workspace=rag.workspace)
update_status = await get_all_update_flags_status()
# Convert MutableBoolean objects to regular boolean values
processed_update_status = {}
@ -2392,9 +2189,8 @@ def create_document_routes(
processed_flags.append(bool(flag))
processed_update_status[namespace] = processed_flags
async with pipeline_status_lock:
# Convert to regular dict if it's a Manager.dict
status_dict = dict(pipeline_status)
# Convert to regular dict if it's a Manager.dict
status_dict = dict(pipeline_status)
# Add processed update_status to the status dictionary
status_dict["update_status"] = processed_update_status
@ -2434,7 +2230,7 @@ def create_document_routes(
logger.error(traceback.format_exc())
raise HTTPException(status_code=500, detail=str(e))
# TODO: Deprecated, use /documents/paginated instead
# TODO: Deprecated
@router.get(
"", response_model=DocsStatusesResponse, dependencies=[Depends(combined_auth)]
)
@ -2581,22 +2377,17 @@ def create_document_routes(
doc_ids = delete_request.doc_ids
try:
from lightrag.kg.shared_storage import (
get_namespace_data,
get_namespace_lock,
)
from lightrag.kg.shared_storage import get_namespace_data
pipeline_status = await get_namespace_data("pipeline_status")
pipeline_status_lock = get_namespace_lock("pipeline_status")
# Check if pipeline is busy with proper lock
async with pipeline_status_lock:
if pipeline_status.get("busy", False):
return DeleteDocByIdResponse(
status="busy",
message="Cannot delete documents while pipeline is busy",
doc_id=", ".join(doc_ids),
)
# Check if pipeline is busy
if pipeline_status.get("busy", False):
return DeleteDocByIdResponse(
status="busy",
message="Cannot delete documents while pipeline is busy",
doc_id=", ".join(doc_ids),
)
# Add deletion task to background tasks
background_tasks.add_task(
@ -2933,27 +2724,29 @@ def create_document_routes(
This is useful for recovering from server crashes, network errors, LLM service
outages, or other temporary failures that caused document processing to fail.
The processing happens in the background and can be monitored by checking the
pipeline status. The reprocessed documents retain their original track_id from
initial upload, so use their original track_id to monitor progress.
The processing happens in the background and can be monitored using the
returned track_id or by checking the pipeline status.
Returns:
ReprocessResponse: Response with status and message.
track_id is always empty string because reprocessed documents retain
their original track_id from initial upload.
ReprocessResponse: Response with status, message, and track_id
Raises:
HTTPException: If an error occurs while initiating reprocessing (500).
"""
try:
# Generate track_id with "retry" prefix for retry operation
track_id = generate_track_id("retry")
# Start the reprocessing in the background
# Note: Reprocessed documents retain their original track_id from initial upload
background_tasks.add_task(rag.apipeline_process_enqueue_documents)
logger.info("Reprocessing of failed documents initiated")
logger.info(
f"Reprocessing of failed documents initiated with track_id: {track_id}"
)
return ReprocessResponse(
status="reprocessing_started",
message="Reprocessing of failed documents has been initiated in background. Documents retain their original track_id.",
message="Reprocessing of failed documents has been initiated in background",
track_id=track_id,
)
except Exception as e:
@ -2961,63 +2754,4 @@ def create_document_routes(
logger.error(traceback.format_exc())
raise HTTPException(status_code=500, detail=str(e))
@router.post(
"/cancel_pipeline",
response_model=CancelPipelineResponse,
dependencies=[Depends(combined_auth)],
)
async def cancel_pipeline():
"""
Request cancellation of the currently running pipeline.
This endpoint sets a cancellation flag in the pipeline status. The pipeline will:
1. Check this flag at key processing points
2. Stop processing new documents
3. Cancel all running document processing tasks
4. Mark all PROCESSING documents as FAILED with reason "User cancelled"
The cancellation is graceful and ensures data consistency. Documents that have
completed processing will remain in PROCESSED status.
Returns:
CancelPipelineResponse: Response with status and message
- status="cancellation_requested": Cancellation flag has been set
- status="not_busy": Pipeline is not currently running
Raises:
HTTPException: If an error occurs while setting cancellation flag (500).
"""
try:
from lightrag.kg.shared_storage import (
get_namespace_data,
get_namespace_lock,
)
pipeline_status = await get_namespace_data("pipeline_status")
pipeline_status_lock = get_namespace_lock("pipeline_status")
async with pipeline_status_lock:
if not pipeline_status.get("busy", False):
return CancelPipelineResponse(
status="not_busy",
message="Pipeline is not currently running. No cancellation needed.",
)
# Set cancellation flag
pipeline_status["cancellation_requested"] = True
cancel_msg = "Pipeline cancellation requested by user"
logger.info(cancel_msg)
pipeline_status["latest_message"] = cancel_msg
pipeline_status["history_messages"].append(cancel_msg)
return CancelPipelineResponse(
status="cancellation_requested",
message="Pipeline cancellation has been requested. Documents will be marked as FAILED.",
)
except Exception as e:
logger.error(f"Error requesting pipeline cancellation: {str(e)}")
logger.error(traceback.format_exc())
raise HTTPException(status_code=500, detail=str(e))
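For reference, a hedged client-side sketch of how the removed endpoint was driven (base URL, port, and auth header are assumptions about the deployment, not part of this diff):

```python
import httpx

# Assumes the documents router is mounted under /documents on a local
# LightRAG server (port and credentials are deployment-specific).
resp = httpx.post(
    "http://localhost:9621/documents/cancel_pipeline",
    headers={"Authorization": "Bearer <api-key>"},
)
print(resp.json())  # e.g. {"status": "cancellation_requested", "message": "..."}
```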
return router


@ -3,7 +3,6 @@ from __future__ import annotations
import traceback
import asyncio
import configparser
import inspect
import os
import time
import warnings
@ -13,7 +12,6 @@ from functools import partial
from typing import (
Any,
AsyncIterator,
Awaitable,
Callable,
Iterator,
cast,
@ -22,10 +20,8 @@ from typing import (
Optional,
List,
Dict,
Union,
)
from lightrag.prompt import PROMPTS
from lightrag.exceptions import PipelineCancelledException
from lightrag.constants import (
DEFAULT_MAX_GLEANING,
DEFAULT_FORCE_LLM_SUMMARY_ON_MERGE,
@ -90,7 +86,7 @@ from lightrag.operate import (
merge_nodes_and_edges,
kg_query,
naive_query,
rebuild_knowledge_from_chunks,
_rebuild_knowledge_from_chunks,
)
from lightrag.constants import GRAPH_FIELD_SEP
from lightrag.utils import (
@ -246,13 +242,11 @@ class LightRAG:
int,
int,
],
Union[List[Dict[str, Any]], Awaitable[List[Dict[str, Any]]]],
List[Dict[str, Any]],
] = field(default_factory=lambda: chunking_by_token_size)
"""
Custom chunking function for splitting text into chunks before processing.
The function can be either synchronous or asynchronous.
The function should take the following parameters:
- `tokenizer`: A Tokenizer instance to use for tokenization.
@ -262,8 +256,7 @@ class LightRAG:
- `chunk_token_size`: The maximum number of tokens per chunk.
- `chunk_overlap_token_size`: The number of overlapping tokens between consecutive chunks.
The function should return a list of dictionaries (or an awaitable that resolves to a list),
where each dictionary contains the following keys:
The function should return a list of dictionaries, where each dictionary contains the following keys:
- `tokens`: The number of tokens in the chunk.
- `content`: The text content of the chunk.
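A minimal custom `chunking_func` matching the documented (reverted, synchronous) contract. Paragraph splitting is only an example, and the docstring above lists just the `tokens` and `content` keys, so anything beyond those would be an assumption:

```python
def paragraph_chunking(
    tokenizer,
    content: str,
    split_by_character: str | None,
    split_by_character_only: bool,
    chunk_overlap_token_size: int,
    chunk_token_size: int,
) -> list[dict]:
    # One chunk per non-empty paragraph; token counts come from the
    # tokenizer LightRAG passes in (both size arguments are ignored here).
    return [
        {"tokens": len(tokenizer.encode(para)), "content": para}
        for para in content.split("\n\n")
        if para.strip()
    ]
```

Such a function would be wired in via the `chunking_func` field, e.g. `LightRAG(..., chunking_func=paragraph_chunking)`.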
@ -276,9 +269,6 @@ class LightRAG:
embedding_func: EmbeddingFunc | None = field(default=None)
"""Function for computing text embeddings. Must be set before use."""
embedding_token_limit: int | None = field(default=None, init=False)
"""Token limit for embedding model. Set automatically from embedding_func.max_token_size in __post_init__."""
embedding_batch_num: int = field(default=int(os.getenv("EMBEDDING_BATCH_NUM", 10)))
"""Batch size for embedding computations."""
@ -522,28 +512,12 @@ class LightRAG:
logger.debug(f"LightRAG init with param:\n {_print_config}\n")
# Init Embedding
# Step 1: Capture max_token_size before applying decorator (decorator strips dataclass attributes)
embedding_max_token_size = None
if self.embedding_func and hasattr(self.embedding_func, "max_token_size"):
embedding_max_token_size = self.embedding_func.max_token_size
logger.debug(
f"Captured embedding max_token_size: {embedding_max_token_size}"
)
self.embedding_token_limit = embedding_max_token_size
# Step 2: Apply priority wrapper decorator
self.embedding_func = priority_limit_async_func_call(
self.embedding_func_max_async,
llm_timeout=self.default_embedding_timeout,
queue_name="Embedding func",
)(self.embedding_func)
# Initialize embedding_token_limit from embedding_func
if self.embedding_func and hasattr(self.embedding_func, "max_token_size"):
self.embedding_token_limit = self.embedding_func.max_token_size
else:
self.embedding_token_limit = None
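The removed "Step 1/Step 2" comments describe why the limit had to be captured before wrapping: a decorator returns a new callable that does not carry the wrapped object's attributes. A toy sketch of the pitfall (not LightRAG's actual decorator):

```python
class FakeEmbeddingFunc:
    max_token_size = 8192
    async def __call__(self, texts):
        return [[0.0] for _ in texts]

def limit_async_calls(func):
    async def wrapper(*args, **kwargs):
        return await func(*args, **kwargs)
    return wrapper  # plain function object: no max_token_size attribute

raw = FakeEmbeddingFunc()
token_limit = getattr(raw, "max_token_size", None)  # capture before wrapping
wrapped = limit_async_calls(raw)
assert token_limit == 8192 and not hasattr(wrapped, "max_token_size")
```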
# Initialize all storages
self.key_string_value_json_storage_cls: type[BaseKVStorage] = (
self._get_storage_class(self.kv_storage)
@ -735,7 +709,7 @@ class LightRAG:
async def check_and_migrate_data(self):
"""Check if data migration is needed and perform migration if necessary"""
async with get_data_init_lock():
async with get_data_init_lock(enable_logging=True):
try:
# Check if migration is needed:
# 1. chunk_entity_relation_graph has entities and relations (count > 0)
@ -1629,7 +1603,6 @@ class LightRAG:
"batchs": 0, # Total number of files to be processed
"cur_batch": 0, # Number of files already processed
"request_pending": False, # Clear any previous request
"cancellation_requested": False, # Initialize cancellation flag
"latest_message": "",
}
)
@ -1646,22 +1619,6 @@ class LightRAG:
try:
# Process documents until no more documents or requests
while True:
# Check for cancellation request at the start of main loop
async with pipeline_status_lock:
if pipeline_status.get("cancellation_requested", False):
# Clear pending request
pipeline_status["request_pending"] = False
# Clear cancellation flag
pipeline_status["cancellation_requested"] = False
log_message = "Pipeline cancelled by user"
logger.info(log_message)
pipeline_status["latest_message"] = log_message
pipeline_status["history_messages"].append(log_message)
# Exit directly, skipping request_pending check
return
if not to_process_docs:
log_message = "All enqueued documents have been processed"
logger.info(log_message)
@ -1724,25 +1681,14 @@ class LightRAG:
semaphore: asyncio.Semaphore,
) -> None:
"""Process single document"""
# Initialize variables at the start to prevent UnboundLocalError in error handling
file_path = "unknown_source"
current_file_number = 0
file_extraction_stage_ok = False
processing_start_time = int(time.time())
first_stage_tasks = []
entity_relation_task = None
async with semaphore:
nonlocal processed_count
current_file_number = 0
# Initialize to prevent UnboundLocalError in error handling
first_stage_tasks = []
entity_relation_task = None
try:
# Check for cancellation before starting document processing
async with pipeline_status_lock:
if pipeline_status.get("cancellation_requested", False):
raise PipelineCancelledException("User cancelled")
# Get file path from status document
file_path = getattr(
status_doc, "file_path", "unknown_source"
@ -1781,28 +1727,7 @@ class LightRAG:
)
content = content_data["content"]
# Call chunking function, supporting both sync and async implementations
chunking_result = self.chunking_func(
self.tokenizer,
content,
split_by_character,
split_by_character_only,
self.chunk_overlap_token_size,
self.chunk_token_size,
)
# If result is awaitable, await to get actual result
if inspect.isawaitable(chunking_result):
chunking_result = await chunking_result
# Validate return type
if not isinstance(chunking_result, (list, tuple)):
raise TypeError(
f"chunking_func must return a list or tuple of dicts, "
f"got {type(chunking_result)}"
)
# Build chunks dictionary
# Generate chunks from document
chunks: dict[str, Any] = {
compute_mdhash_id(dp["content"], prefix="chunk-"): {
**dp,
@ -1810,7 +1735,14 @@ class LightRAG:
"file_path": file_path, # Add file path to each chunk
"llm_cache_list": [], # Initialize empty LLM cache list for each chunk
}
for dp in chunking_result
for dp in self.chunking_func(
self.tokenizer,
content,
split_by_character,
split_by_character_only,
self.chunk_overlap_token_size,
self.chunk_token_size,
)
}
if not chunks:
@ -1819,11 +1751,6 @@ class LightRAG:
# Record processing start time
processing_start_time = int(time.time())
# Check for cancellation before entity extraction
async with pipeline_status_lock:
if pipeline_status.get("cancellation_requested", False):
raise PipelineCancelledException("User cancelled")
# Process document in two stages
# Stage 1: Process text chunks and docs (parallel execution)
doc_status_task = asyncio.create_task(
@ -1874,33 +1801,20 @@ class LightRAG:
chunks, pipeline_status, pipeline_status_lock
)
)
chunk_results = await entity_relation_task
await entity_relation_task
file_extraction_stage_ok = True
except Exception as e:
# Check if this is a user cancellation
if isinstance(e, PipelineCancelledException):
# User cancellation - log brief message only, no traceback
error_msg = f"User cancelled {current_file_number}/{total_files}: {file_path}"
logger.warning(error_msg)
async with pipeline_status_lock:
pipeline_status["latest_message"] = error_msg
pipeline_status["history_messages"].append(
error_msg
)
else:
# Other exceptions - log with traceback
logger.error(traceback.format_exc())
error_msg = f"Failed to extract document {current_file_number}/{total_files}: {file_path}"
logger.error(error_msg)
async with pipeline_status_lock:
pipeline_status["latest_message"] = error_msg
pipeline_status["history_messages"].append(
traceback.format_exc()
)
pipeline_status["history_messages"].append(
error_msg
)
# Log error and update pipeline status
logger.error(traceback.format_exc())
error_msg = f"Failed to extract document {current_file_number}/{total_files}: {file_path}"
logger.error(error_msg)
async with pipeline_status_lock:
pipeline_status["latest_message"] = error_msg
pipeline_status["history_messages"].append(
traceback.format_exc()
)
pipeline_status["history_messages"].append(error_msg)
# Cancel tasks that are not yet completed
all_tasks = first_stage_tasks + (
@ -1910,14 +1824,9 @@ class LightRAG:
if task and not task.done():
task.cancel()
# Persist LLM cache with error handling
# Persist LLM cache
if self.llm_response_cache:
try:
await self.llm_response_cache.index_done_callback()
except Exception as persist_error:
logger.error(
f"Failed to persist LLM cache: {persist_error}"
)
await self.llm_response_cache.index_done_callback()
# Record processing end time for failed case
processing_end_time = int(time.time())
@ -1947,16 +1856,8 @@ class LightRAG:
# Concurrency is controlled by keyed lock for individual entities and relationships
if file_extraction_stage_ok:
try:
# Check for cancellation before merge
async with pipeline_status_lock:
if pipeline_status.get(
"cancellation_requested", False
):
raise PipelineCancelledException(
"User cancelled"
)
# Use chunk_results from entity_relation_task
# Get chunk_results from entity_relation_task
chunk_results = await entity_relation_task
await merge_nodes_and_edges(
chunk_results=chunk_results, # result collected from entity_relation_task
knowledge_graph_inst=self.chunk_entity_relation_graph,
@ -2013,38 +1914,22 @@ class LightRAG:
)
except Exception as e:
# Check if this is a user cancellation
if isinstance(e, PipelineCancelledException):
# User cancellation - log brief message only, no traceback
error_msg = f"User cancelled during merge {current_file_number}/{total_files}: {file_path}"
logger.warning(error_msg)
async with pipeline_status_lock:
pipeline_status["latest_message"] = error_msg
pipeline_status["history_messages"].append(
error_msg
)
else:
# Other exceptions - log with traceback
logger.error(traceback.format_exc())
error_msg = f"Merging stage failed in document {current_file_number}/{total_files}: {file_path}"
logger.error(error_msg)
async with pipeline_status_lock:
pipeline_status["latest_message"] = error_msg
pipeline_status["history_messages"].append(
traceback.format_exc()
)
pipeline_status["history_messages"].append(
error_msg
)
# Log error and update pipeline status
logger.error(traceback.format_exc())
error_msg = f"Merging stage failed in document {current_file_number}/{total_files}: {file_path}"
logger.error(error_msg)
async with pipeline_status_lock:
pipeline_status["latest_message"] = error_msg
pipeline_status["history_messages"].append(
traceback.format_exc()
)
pipeline_status["history_messages"].append(
error_msg
)
# Persist LLM cache with error handling
# Persist LLM cache
if self.llm_response_cache:
try:
await self.llm_response_cache.index_done_callback()
except Exception as persist_error:
logger.error(
f"Failed to persist LLM cache: {persist_error}"
)
await self.llm_response_cache.index_done_callback()
# Record processing end time for failed case
processing_end_time = int(time.time())
@ -2085,19 +1970,7 @@ class LightRAG:
)
# Wait for all document processing to complete
try:
await asyncio.gather(*doc_tasks)
except PipelineCancelledException:
# Cancel all remaining tasks
for task in doc_tasks:
if not task.done():
task.cancel()
# Wait for all tasks to complete cancellation
await asyncio.wait(doc_tasks, return_when=asyncio.ALL_COMPLETED)
# Exit directly (document statuses already updated in process_document)
return
await asyncio.gather(*doc_tasks)
# Check if there's a pending request to process more documents (with lock)
has_pending_request = False
@ -2128,14 +2001,11 @@ class LightRAG:
to_process_docs.update(pending_docs)
finally:
log_message = "Enqueued document processing pipeline stopped"
log_message = "Enqueued document processing pipeline stoped"
logger.info(log_message)
# Always reset busy status and cancellation flag when done or if an exception occurs (with lock)
# Always reset busy status when done or if an exception occurs (with lock)
async with pipeline_status_lock:
pipeline_status["busy"] = False
pipeline_status["cancellation_requested"] = (
False # Always reset cancellation flag
)
pipeline_status["latest_message"] = log_message
pipeline_status["history_messages"].append(log_message)
@ -3185,9 +3055,6 @@ class LightRAG:
]
if not existing_sources:
# No chunk references means this entity should be deleted
entities_to_delete.add(node_label)
entity_chunk_updates[node_label] = []
continue
remaining_sources = subtract_source_ids(existing_sources, chunk_ids)
@ -3209,7 +3076,6 @@ class LightRAG:
# Process relationships
for edge_data in affected_edges:
# source/target are not stored in normalized order in the graph DB edge properties
src = edge_data.get("source")
tgt = edge_data.get("target")
@ -3246,9 +3112,6 @@ class LightRAG:
]
if not existing_sources:
# No chunk references means this relationship should be deleted
relationships_to_delete.add(edge_tuple)
relation_chunk_updates[edge_tuple] = []
continue
remaining_sources = subtract_source_ids(existing_sources, chunk_ids)
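Both pruning passes rely on `subtract_source_ids` removing the deleted document's chunk ids from each record's source list. An illustrative stand-in for that helper (the real one is presumably imported from `lightrag.utils`):

```python
def subtract_source_ids_sketch(existing: list[str], removed: set[str]) -> list[str]:
    # Keep original order; drop every chunk id belonging to the deleted
    # document. An empty result marks the entity/relation for deletion below.
    return [chunk_id for chunk_id in existing if chunk_id not in removed]
```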
@ -3274,31 +3137,38 @@ class LightRAG:
if entity_chunk_updates and self.entity_chunks:
entity_upsert_payload = {}
entity_delete_ids: set[str] = set()
for entity_name, remaining in entity_chunk_updates.items():
if not remaining:
# Empty entities are deleted alongside graph nodes later
continue
entity_upsert_payload[entity_name] = {
"chunk_ids": remaining,
"count": len(remaining),
"updated_at": current_time,
}
entity_delete_ids.add(entity_name)
else:
entity_upsert_payload[entity_name] = {
"chunk_ids": remaining,
"count": len(remaining),
"updated_at": current_time,
}
if entity_delete_ids:
await self.entity_chunks.delete(list(entity_delete_ids))
if entity_upsert_payload:
await self.entity_chunks.upsert(entity_upsert_payload)
if relation_chunk_updates and self.relation_chunks:
relation_upsert_payload = {}
relation_delete_ids: set[str] = set()
for edge_tuple, remaining in relation_chunk_updates.items():
if not remaining:
# Empty relations are deleted alongside graph edges later
continue
storage_key = make_relation_chunk_key(*edge_tuple)
relation_upsert_payload[storage_key] = {
"chunk_ids": remaining,
"count": len(remaining),
"updated_at": current_time,
}
if not remaining:
relation_delete_ids.add(storage_key)
else:
relation_upsert_payload[storage_key] = {
"chunk_ids": remaining,
"count": len(remaining),
"updated_at": current_time,
}
if relation_delete_ids:
await self.relation_chunks.delete(list(relation_delete_ids))
if relation_upsert_payload:
await self.relation_chunks.upsert(relation_upsert_payload)
@ -3325,10 +3195,35 @@ class LightRAG:
logger.error(f"Failed to delete chunks: {e}")
raise Exception(f"Failed to delete document chunks: {e}") from e
# 6. Delete relationships that have no remaining sources
# 6. Delete entities that have no remaining sources
if entities_to_delete:
try:
# Delete from vector database
entity_vdb_ids = [
compute_mdhash_id(entity, prefix="ent-")
for entity in entities_to_delete
]
await self.entities_vdb.delete(entity_vdb_ids)
# Delete from graph
await self.chunk_entity_relation_graph.remove_nodes(
list(entities_to_delete)
)
async with pipeline_status_lock:
log_message = f"Successfully deleted {len(entities_to_delete)} entities"
logger.info(log_message)
pipeline_status["latest_message"] = log_message
pipeline_status["history_messages"].append(log_message)
except Exception as e:
logger.error(f"Failed to delete entities: {e}")
raise Exception(f"Failed to delete entities: {e}") from e
# 7. Delete relationships that have no remaining sources
if relationships_to_delete:
try:
# Delete from relation vdb
# Delete from vector database
rel_ids_to_delete = []
for src, tgt in relationships_to_delete:
rel_ids_to_delete.extend(
@ -3344,14 +3239,6 @@ class LightRAG:
list(relationships_to_delete)
)
# Delete from relation_chunks storage
if self.relation_chunks:
relation_storage_keys = [
make_relation_chunk_key(src, tgt)
for src, tgt in relationships_to_delete
]
await self.relation_chunks.delete(relation_storage_keys)
async with pipeline_status_lock:
log_message = f"Successfully deleted {len(relationships_to_delete)} relations"
logger.info(log_message)
@ -3362,105 +3249,13 @@ class LightRAG:
logger.error(f"Failed to delete relationships: {e}")
raise Exception(f"Failed to delete relationships: {e}") from e
# 7. Delete entities that have no remaining sources
if entities_to_delete:
try:
# Batch get all edges for entities to avoid N+1 query problem
nodes_edges_dict = await self.chunk_entity_relation_graph.get_nodes_edges_batch(
list(entities_to_delete)
)
# Debug: Check and log all edges before deleting nodes
edges_to_delete = set()
edges_still_exist = 0
for entity, edges in nodes_edges_dict.items():
if edges:
for src, tgt in edges:
# Normalize edge representation (sorted for consistency)
edge_tuple = tuple(sorted((src, tgt)))
edges_to_delete.add(edge_tuple)
if (
src in entities_to_delete
and tgt in entities_to_delete
):
logger.warning(
f"Edge still exists: {src} <-> {tgt}"
)
elif src in entities_to_delete:
logger.warning(
f"Edge still exists: {src} --> {tgt}"
)
else:
logger.warning(
f"Edge still exists: {src} <-- {tgt}"
)
edges_still_exist += 1
if edges_still_exist:
logger.warning(
f"⚠️ {edges_still_exist} entities still has edges before deletion"
)
# Clean residual edges from VDB and storage before deleting nodes
if edges_to_delete:
# Delete from relationships_vdb
rel_ids_to_delete = []
for src, tgt in edges_to_delete:
rel_ids_to_delete.extend(
[
compute_mdhash_id(src + tgt, prefix="rel-"),
compute_mdhash_id(tgt + src, prefix="rel-"),
]
)
await self.relationships_vdb.delete(rel_ids_to_delete)
# Delete from relation_chunks storage
if self.relation_chunks:
relation_storage_keys = [
make_relation_chunk_key(src, tgt)
for src, tgt in edges_to_delete
]
await self.relation_chunks.delete(relation_storage_keys)
logger.info(
f"Cleaned {len(edges_to_delete)} residual edges from VDB and chunk-tracking storage"
)
# Delete from graph (edges will be auto-deleted with nodes)
await self.chunk_entity_relation_graph.remove_nodes(
list(entities_to_delete)
)
# Delete from vector vdb
entity_vdb_ids = [
compute_mdhash_id(entity, prefix="ent-")
for entity in entities_to_delete
]
await self.entities_vdb.delete(entity_vdb_ids)
# Delete from entity_chunks storage
if self.entity_chunks:
await self.entity_chunks.delete(list(entities_to_delete))
async with pipeline_status_lock:
log_message = f"Successfully deleted {len(entities_to_delete)} entities"
logger.info(log_message)
pipeline_status["latest_message"] = log_message
pipeline_status["history_messages"].append(log_message)
except Exception as e:
logger.error(f"Failed to delete entities: {e}")
raise Exception(f"Failed to delete entities: {e}") from e
# Persist changes to graph database before releasing graph database lock
await self._insert_done()
# 8. Rebuild entities and relationships from remaining chunks
if entities_to_rebuild or relationships_to_rebuild:
try:
await rebuild_knowledge_from_chunks(
await _rebuild_knowledge_from_chunks(
entities_to_rebuild=entities_to_rebuild,
relationships_to_rebuild=relationships_to_rebuild,
knowledge_graph_inst=self.chunk_entity_relation_graph,
@ -3507,12 +3302,14 @@ class LightRAG:
pipeline_status["history_messages"].append(cache_log_message)
log_message = cache_log_message
except Exception as cache_delete_error:
log_message = f"Failed to delete LLM cache for document {doc_id}: {cache_delete_error}"
logger.error(log_message)
logger.error(traceback.format_exc())
async with pipeline_status_lock:
pipeline_status["latest_message"] = log_message
pipeline_status["history_messages"].append(log_message)
logger.error(
"Failed to delete LLM cache for document %s: %s",
doc_id,
cache_delete_error,
)
raise Exception(
f"Failed to delete LLM cache for document {doc_id}: {cache_delete_error}"
) from cache_delete_error
return DeletionResult(
status="success",
@ -3678,22 +3475,16 @@ class LightRAG:
)
async def aedit_entity(
self,
entity_name: str,
updated_data: dict[str, str],
allow_rename: bool = True,
allow_merge: bool = False,
self, entity_name: str, updated_data: dict[str, str], allow_rename: bool = True
) -> dict[str, Any]:
"""Asynchronously edit entity information.
Updates entity information in the knowledge graph and re-embeds the entity in the vector database.
Also synchronizes entity_chunks_storage and relation_chunks_storage to track chunk references.
Args:
entity_name: Name of the entity to edit
updated_data: Dictionary containing updated attributes, e.g. {"description": "new description", "entity_type": "new type"}
allow_rename: Whether to allow entity renaming, defaults to True
allow_merge: Whether to merge into an existing entity when renaming to an existing name
Returns:
Dictionary containing updated entity information
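A hedged usage sketch of the (reverted) three-argument form; using the `entity_name` key in `updated_data` as the rename mechanism is an assumption from LightRAG's public docs, not shown in this diff:

```python
# Rename an entity and refresh its description in one call.
updated = await rag.aedit_entity(
    "Google",
    {"entity_name": "Alphabet", "description": "Parent company of Google."},
    allow_rename=True,
)
```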
@ -3707,21 +3498,14 @@ class LightRAG:
entity_name,
updated_data,
allow_rename,
allow_merge,
self.entity_chunks,
self.relation_chunks,
)
def edit_entity(
self,
entity_name: str,
updated_data: dict[str, str],
allow_rename: bool = True,
allow_merge: bool = False,
self, entity_name: str, updated_data: dict[str, str], allow_rename: bool = True
) -> dict[str, Any]:
loop = always_get_an_event_loop()
return loop.run_until_complete(
self.aedit_entity(entity_name, updated_data, allow_rename, allow_merge)
self.aedit_entity(entity_name, updated_data, allow_rename)
)
async def aedit_relation(
@ -3730,7 +3514,6 @@ class LightRAG:
"""Asynchronously edit relation information.
Updates relation (edge) information in the knowledge graph and re-embeds the relation in the vector database.
Also synchronizes the relation_chunks_storage to track which chunks reference this relation.
Args:
source_entity: Name of the source entity
@ -3749,7 +3532,6 @@ class LightRAG:
source_entity,
target_entity,
updated_data,
self.relation_chunks,
)
def edit_relation(
@ -3861,8 +3643,6 @@ class LightRAG:
target_entity,
merge_strategy,
target_entity_data,
self.entity_chunks,
self.relation_chunks,
)
def merge_entities(


@ -605,9 +605,13 @@ export const clearCache = async (): Promise<{
return response.data
}
export const deleteDocuments = async (docIds: string[], deleteFile: boolean = false): Promise<DeleteDocResponse> => {
export const deleteDocuments = async (
docIds: string[],
deleteFile: boolean = false,
deleteLLMCache: boolean = false
): Promise<DeleteDocResponse> => {
const response = await axiosInstance.delete('/documents/delete_document', {
data: { doc_ids: docIds, delete_file: deleteFile }
data: { doc_ids: docIds, delete_file: deleteFile, delete_llm_cache: deleteLLMCache }
})
return response.data
}
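The equivalent raw request, sketched in Python for readers exercising the backend directly (endpoint path and field names are taken from the call above; the base URL is an assumption about the deployment):

```python
import httpx

resp = httpx.request(
    "DELETE",
    "http://localhost:9621/documents/delete_document",
    # Field names mirror the axios payload above.
    json={"doc_ids": ["doc-123"], "delete_file": False, "delete_llm_cache": True},
)
print(resp.json())
```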


@ -44,6 +44,7 @@ export default function DeleteDocumentsDialog({ selectedDocIds, onDocumentsDelet
const [confirmText, setConfirmText] = useState('')
const [deleteFile, setDeleteFile] = useState(false)
const [isDeleting, setIsDeleting] = useState(false)
const [deleteLLMCache, setDeleteLLMCache] = useState(false)
const isConfirmEnabled = confirmText.toLowerCase() === 'yes' && !isDeleting
// Reset state when dialog closes
@ -51,6 +52,7 @@ export default function DeleteDocumentsDialog({ selectedDocIds, onDocumentsDelet
if (!open) {
setConfirmText('')
setDeleteFile(false)
setDeleteLLMCache(false)
setIsDeleting(false)
}
}, [open])
@ -60,7 +62,7 @@ export default function DeleteDocumentsDialog({ selectedDocIds, onDocumentsDelet
setIsDeleting(true)
try {
const result = await deleteDocuments(selectedDocIds, deleteFile)
const result = await deleteDocuments(selectedDocIds, deleteFile, deleteLLMCache)
if (result.status === 'deletion_started') {
toast.success(t('documentPanel.deleteDocuments.success', { count: selectedDocIds.length }))
@ -94,7 +96,7 @@ export default function DeleteDocumentsDialog({ selectedDocIds, onDocumentsDelet
} finally {
setIsDeleting(false)
}
}, [isConfirmEnabled, selectedDocIds, deleteFile, setOpen, t, onDocumentsDeleted])
}, [isConfirmEnabled, selectedDocIds, deleteFile, deleteLLMCache, setOpen, t, onDocumentsDeleted])
return (
<Dialog open={open} onOpenChange={setOpen}>
@ -155,6 +157,20 @@ export default function DeleteDocumentsDialog({ selectedDocIds, onDocumentsDelet
{t('documentPanel.deleteDocuments.deleteFileOption')}
</Label>
</div>
<div className="flex items-center space-x-2">
<input
type="checkbox"
id="delete-llm-cache"
checked={deleteLLMCache}
onChange={(e) => setDeleteLLMCache(e.target.checked)}
disabled={isDeleting}
className="h-4 w-4 text-red-600 focus:ring-red-500 border-gray-300 rounded"
/>
<Label htmlFor="delete-llm-cache" className="text-sm font-medium cursor-pointer">
{t('documentPanel.deleteDocuments.deleteLLMCacheOption')}
</Label>
</div>
</div>
<DialogFooter>