diff --git a/lightrag/api/routers/document_routes.py b/lightrag/api/routers/document_routes.py index 85183bbd..d906aa5c 100644 --- a/lightrag/api/routers/document_routes.py +++ b/lightrag/api/routers/document_routes.py @@ -24,7 +24,11 @@ from pydantic import BaseModel, Field, field_validator from lightrag import LightRAG from lightrag.base import DeletionResult, DocProcessingStatus, DocStatus -from lightrag.utils import generate_track_id +from lightrag.utils import ( + generate_track_id, + compute_mdhash_id, + sanitize_text_for_encoding, +) from lightrag.api.utils_api import get_combined_auth_dependency from ..config import global_args @@ -159,7 +163,7 @@ class ReprocessResponse(BaseModel): Attributes: status: Status of the reprocessing operation message: Message describing the operation result - track_id: Tracking ID for monitoring reprocessing progress + track_id: Always empty string. Reprocessed documents retain their original track_id. """ status: Literal["reprocessing_started"] = Field( @@ -167,7 +171,8 @@ class ReprocessResponse(BaseModel): ) message: str = Field(description="Human-readable message describing the operation") track_id: str = Field( - description="Tracking ID for monitoring reprocessing progress" + default="", + description="Always empty string. Reprocessed documents retain their original track_id from initial upload.", ) class Config: @@ -175,7 +180,7 @@ class ReprocessResponse(BaseModel): "example": { "status": "reprocessing_started", "message": "Reprocessing of failed documents has been initiated in background", - "track_id": "retry_20250729_170612_def456", + "track_id": "", } } @@ -2097,12 +2102,14 @@ def create_document_routes( # Check if filename already exists in doc_status storage existing_doc_data = await rag.doc_status.get_doc_by_file_path(safe_filename) if existing_doc_data: - # Get document status information for error message + # Get document status and track_id from existing document status = existing_doc_data.get("status", "unknown") + # Use `or ""` to handle both missing key and None value (e.g., legacy rows without track_id) + existing_track_id = existing_doc_data.get("track_id") or "" return InsertResponse( status="duplicated", message=f"File '{safe_filename}' already exists in document storage (Status: {status}).", - track_id="", + track_id=existing_track_id, ) file_path = doc_manager.input_dir / safe_filename @@ -2166,14 +2173,30 @@ def create_document_routes( request.file_source ) if existing_doc_data: - # Get document status information for error message + # Get document status and track_id from existing document status = existing_doc_data.get("status", "unknown") + # Use `or ""` to handle both missing key and None value (e.g., legacy rows without track_id) + existing_track_id = existing_doc_data.get("track_id") or "" return InsertResponse( status="duplicated", message=f"File source '{request.file_source}' already exists in document storage (Status: {status}).", - track_id="", + track_id=existing_track_id, ) + # Check if content already exists by computing content hash (doc_id) + sanitized_text = sanitize_text_for_encoding(request.text) + content_doc_id = compute_mdhash_id(sanitized_text, prefix="doc-") + existing_doc = await rag.doc_status.get_by_id(content_doc_id) + if existing_doc: + # Content already exists, return duplicated with existing track_id + status = existing_doc.get("status", "unknown") + existing_track_id = existing_doc.get("track_id") or "" + return InsertResponse( + status="duplicated", + message=f"Identical content already exists in document storage (doc_id: {content_doc_id}, Status: {status}).", + track_id=existing_track_id, + ) + # Generate track_id for text insertion track_id = generate_track_id("insert") @@ -2232,14 +2255,31 @@ def create_document_routes( file_source ) if existing_doc_data: - # Get document status information for error message + # Get document status and track_id from existing document status = existing_doc_data.get("status", "unknown") + # Use `or ""` to handle both missing key and None value (e.g., legacy rows without track_id) + existing_track_id = existing_doc_data.get("track_id") or "" return InsertResponse( status="duplicated", message=f"File source '{file_source}' already exists in document storage (Status: {status}).", - track_id="", + track_id=existing_track_id, ) + # Check if any content already exists by computing content hash (doc_id) + for text in request.texts: + sanitized_text = sanitize_text_for_encoding(text) + content_doc_id = compute_mdhash_id(sanitized_text, prefix="doc-") + existing_doc = await rag.doc_status.get_by_id(content_doc_id) + if existing_doc: + # Content already exists, return duplicated with existing track_id + status = existing_doc.get("status", "unknown") + existing_track_id = existing_doc.get("track_id") or "" + return InsertResponse( + status="duplicated", + message=f"Identical content already exists in document storage (doc_id: {content_doc_id}, Status: {status}).", + track_id=existing_track_id, + ) + # Generate track_id for texts insertion track_id = generate_track_id("insert") @@ -3058,29 +3098,27 @@ def create_document_routes( This is useful for recovering from server crashes, network errors, LLM service outages, or other temporary failures that caused document processing to fail. - The processing happens in the background and can be monitored using the - returned track_id or by checking the pipeline status. + The processing happens in the background and can be monitored by checking the + pipeline status. The reprocessed documents retain their original track_id from + initial upload, so use their original track_id to monitor progress. Returns: - ReprocessResponse: Response with status, message, and track_id + ReprocessResponse: Response with status and message. + track_id is always empty string because reprocessed documents retain + their original track_id from initial upload. Raises: HTTPException: If an error occurs while initiating reprocessing (500). """ try: - # Generate track_id with "retry" prefix for retry operation - track_id = generate_track_id("retry") - # Start the reprocessing in the background + # Note: Reprocessed documents retain their original track_id from initial upload background_tasks.add_task(rag.apipeline_process_enqueue_documents) - logger.info( - f"Reprocessing of failed documents initiated with track_id: {track_id}" - ) + logger.info("Reprocessing of failed documents initiated") return ReprocessResponse( status="reprocessing_started", - message="Reprocessing of failed documents has been initiated in background", - track_id=track_id, + message="Reprocessing of failed documents has been initiated in background. Documents retain their original track_id.", ) except Exception as e: