Merge pull request #2469 from danielaskdd/fix-track-id
fix: Return existing track_id for duplicate documents in document insertion endpoints
This commit is contained in:
commit
4c775ec508
1 changed file with 59 additions and 21 deletions
|
|
@@ -24,7 +24,11 @@ from pydantic import BaseModel, Field, field_validator
|
|||
|
||||
from lightrag import LightRAG
|
||||
from lightrag.base import DeletionResult, DocProcessingStatus, DocStatus
|
||||
from lightrag.utils import generate_track_id
|
||||
from lightrag.utils import (
|
||||
generate_track_id,
|
||||
compute_mdhash_id,
|
||||
sanitize_text_for_encoding,
|
||||
)
|
||||
from lightrag.api.utils_api import get_combined_auth_dependency
|
||||
from ..config import global_args
|
||||
|
||||
|
|
@@ -159,7 +163,7 @@ class ReprocessResponse(BaseModel):
|
|||
Attributes:
|
||||
status: Status of the reprocessing operation
|
||||
message: Message describing the operation result
|
||||
track_id: Tracking ID for monitoring reprocessing progress
|
||||
track_id: Always empty string. Reprocessed documents retain their original track_id.
|
||||
"""
|
||||
|
||||
status: Literal["reprocessing_started"] = Field(
|
||||
|
|
@@ -167,7 +171,8 @@ class ReprocessResponse(BaseModel):
|
|||
)
|
||||
message: str = Field(description="Human-readable message describing the operation")
|
||||
track_id: str = Field(
|
||||
description="Tracking ID for monitoring reprocessing progress"
|
||||
default="",
|
||||
description="Always empty string. Reprocessed documents retain their original track_id from initial upload.",
|
||||
)
|
||||
|
||||
class Config:
|
||||
|
|
@@ -175,7 +180,7 @@ class ReprocessResponse(BaseModel):
|
|||
"example": {
|
||||
"status": "reprocessing_started",
|
||||
"message": "Reprocessing of failed documents has been initiated in background",
|
||||
"track_id": "retry_20250729_170612_def456",
|
||||
"track_id": "",
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@@ -2097,12 +2102,14 @@ def create_document_routes(
|
|||
# Check if filename already exists in doc_status storage
|
||||
existing_doc_data = await rag.doc_status.get_doc_by_file_path(safe_filename)
|
||||
if existing_doc_data:
|
||||
# Get document status information for error message
|
||||
# Get document status and track_id from existing document
|
||||
status = existing_doc_data.get("status", "unknown")
|
||||
# Use `or ""` to handle both missing key and None value (e.g., legacy rows without track_id)
|
||||
existing_track_id = existing_doc_data.get("track_id") or ""
|
||||
return InsertResponse(
|
||||
status="duplicated",
|
||||
message=f"File '{safe_filename}' already exists in document storage (Status: {status}).",
|
||||
track_id="",
|
||||
track_id=existing_track_id,
|
||||
)
|
||||
|
||||
file_path = doc_manager.input_dir / safe_filename
|
||||
|
|
@@ -2166,14 +2173,30 @@ def create_document_routes(
|
|||
request.file_source
|
||||
)
|
||||
if existing_doc_data:
|
||||
# Get document status information for error message
|
||||
# Get document status and track_id from existing document
|
||||
status = existing_doc_data.get("status", "unknown")
|
||||
# Use `or ""` to handle both missing key and None value (e.g., legacy rows without track_id)
|
||||
existing_track_id = existing_doc_data.get("track_id") or ""
|
||||
return InsertResponse(
|
||||
status="duplicated",
|
||||
message=f"File source '{request.file_source}' already exists in document storage (Status: {status}).",
|
||||
track_id="",
|
||||
track_id=existing_track_id,
|
||||
)
|
||||
|
||||
# Check if content already exists by computing content hash (doc_id)
|
||||
sanitized_text = sanitize_text_for_encoding(request.text)
|
||||
content_doc_id = compute_mdhash_id(sanitized_text, prefix="doc-")
|
||||
existing_doc = await rag.doc_status.get_by_id(content_doc_id)
|
||||
if existing_doc:
|
||||
# Content already exists, return duplicated with existing track_id
|
||||
status = existing_doc.get("status", "unknown")
|
||||
existing_track_id = existing_doc.get("track_id") or ""
|
||||
return InsertResponse(
|
||||
status="duplicated",
|
||||
message=f"Identical content already exists in document storage (doc_id: {content_doc_id}, Status: {status}).",
|
||||
track_id=existing_track_id,
|
||||
)
|
||||
|
||||
# Generate track_id for text insertion
|
||||
track_id = generate_track_id("insert")
|
||||
|
||||
|
|
@@ -2232,14 +2255,31 @@ def create_document_routes(
|
|||
file_source
|
||||
)
|
||||
if existing_doc_data:
|
||||
# Get document status information for error message
|
||||
# Get document status and track_id from existing document
|
||||
status = existing_doc_data.get("status", "unknown")
|
||||
# Use `or ""` to handle both missing key and None value (e.g., legacy rows without track_id)
|
||||
existing_track_id = existing_doc_data.get("track_id") or ""
|
||||
return InsertResponse(
|
||||
status="duplicated",
|
||||
message=f"File source '{file_source}' already exists in document storage (Status: {status}).",
|
||||
track_id="",
|
||||
track_id=existing_track_id,
|
||||
)
|
||||
|
||||
# Check if any content already exists by computing content hash (doc_id)
|
||||
for text in request.texts:
|
||||
sanitized_text = sanitize_text_for_encoding(text)
|
||||
content_doc_id = compute_mdhash_id(sanitized_text, prefix="doc-")
|
||||
existing_doc = await rag.doc_status.get_by_id(content_doc_id)
|
||||
if existing_doc:
|
||||
# Content already exists, return duplicated with existing track_id
|
||||
status = existing_doc.get("status", "unknown")
|
||||
existing_track_id = existing_doc.get("track_id") or ""
|
||||
return InsertResponse(
|
||||
status="duplicated",
|
||||
message=f"Identical content already exists in document storage (doc_id: {content_doc_id}, Status: {status}).",
|
||||
track_id=existing_track_id,
|
||||
)
|
||||
|
||||
# Generate track_id for texts insertion
|
||||
track_id = generate_track_id("insert")
|
||||
|
||||
|
|
@@ -3058,29 +3098,27 @@ def create_document_routes(
|
|||
This is useful for recovering from server crashes, network errors, LLM service
|
||||
outages, or other temporary failures that caused document processing to fail.
|
||||
|
||||
The processing happens in the background and can be monitored using the
|
||||
returned track_id or by checking the pipeline status.
|
||||
The processing happens in the background and can be monitored by checking the
|
||||
pipeline status. The reprocessed documents retain their original track_id from
|
||||
initial upload, so use their original track_id to monitor progress.
|
||||
|
||||
Returns:
|
||||
ReprocessResponse: Response with status, message, and track_id
|
||||
ReprocessResponse: Response with status and message.
|
||||
track_id is always empty string because reprocessed documents retain
|
||||
their original track_id from initial upload.
|
||||
|
||||
Raises:
|
||||
HTTPException: If an error occurs while initiating reprocessing (500).
|
||||
"""
|
||||
try:
|
||||
# Generate track_id with "retry" prefix for retry operation
|
||||
track_id = generate_track_id("retry")
|
||||
|
||||
# Start the reprocessing in the background
|
||||
# Note: Reprocessed documents retain their original track_id from initial upload
|
||||
background_tasks.add_task(rag.apipeline_process_enqueue_documents)
|
||||
logger.info(
|
||||
f"Reprocessing of failed documents initiated with track_id: {track_id}"
|
||||
)
|
||||
logger.info("Reprocessing of failed documents initiated")
|
||||
|
||||
return ReprocessResponse(
|
||||
status="reprocessing_started",
|
||||
message="Reprocessing of failed documents has been initiated in background",
|
||||
track_id=track_id,
|
||||
message="Reprocessing of failed documents has been initiated in background. Documents retain their original track_id.",
|
||||
)
|
||||
|
||||
except Exception as e:
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue