Fix duplicate-document responses to return the original track_id

- Return existing track_id for duplicates
- Remove track_id generation from the reprocess endpoint
- Update reprocess response documentation
- Clarify track_id behavior in comments
- Update API response examples
This commit is contained in:
yangdx 2025-12-02 14:32:28 +08:00
parent 381ddfffd4
commit 8d28b95966

View file

@ -159,7 +159,7 @@ class ReprocessResponse(BaseModel):
Attributes: Attributes:
status: Status of the reprocessing operation status: Status of the reprocessing operation
message: Message describing the operation result message: Message describing the operation result
track_id: Tracking ID for monitoring reprocessing progress track_id: Always empty string. Reprocessed documents retain their original track_id.
""" """
status: Literal["reprocessing_started"] = Field( status: Literal["reprocessing_started"] = Field(
@ -167,7 +167,8 @@ class ReprocessResponse(BaseModel):
) )
message: str = Field(description="Human-readable message describing the operation") message: str = Field(description="Human-readable message describing the operation")
track_id: str = Field( track_id: str = Field(
description="Tracking ID for monitoring reprocessing progress" default="",
description="Always empty string. Reprocessed documents retain their original track_id from initial upload.",
) )
class Config: class Config:
@ -175,7 +176,7 @@ class ReprocessResponse(BaseModel):
"example": { "example": {
"status": "reprocessing_started", "status": "reprocessing_started",
"message": "Reprocessing of failed documents has been initiated in background", "message": "Reprocessing of failed documents has been initiated in background",
"track_id": "retry_20250729_170612_def456", "track_id": "",
} }
} }
@ -2097,12 +2098,14 @@ def create_document_routes(
# Check if filename already exists in doc_status storage # Check if filename already exists in doc_status storage
existing_doc_data = await rag.doc_status.get_doc_by_file_path(safe_filename) existing_doc_data = await rag.doc_status.get_doc_by_file_path(safe_filename)
if existing_doc_data: if existing_doc_data:
# Get document status information for error message # Get document status and track_id from existing document
status = existing_doc_data.get("status", "unknown") status = existing_doc_data.get("status", "unknown")
# Use `or ""` to handle both missing key and None value (e.g., legacy rows without track_id)
existing_track_id = existing_doc_data.get("track_id") or ""
return InsertResponse( return InsertResponse(
status="duplicated", status="duplicated",
message=f"File '{safe_filename}' already exists in document storage (Status: {status}).", message=f"File '{safe_filename}' already exists in document storage (Status: {status}).",
track_id="", track_id=existing_track_id,
) )
file_path = doc_manager.input_dir / safe_filename file_path = doc_manager.input_dir / safe_filename
@ -2166,12 +2169,14 @@ def create_document_routes(
request.file_source request.file_source
) )
if existing_doc_data: if existing_doc_data:
# Get document status information for error message # Get document status and track_id from existing document
status = existing_doc_data.get("status", "unknown") status = existing_doc_data.get("status", "unknown")
# Use `or ""` to handle both missing key and None value (e.g., legacy rows without track_id)
existing_track_id = existing_doc_data.get("track_id") or ""
return InsertResponse( return InsertResponse(
status="duplicated", status="duplicated",
message=f"File source '{request.file_source}' already exists in document storage (Status: {status}).", message=f"File source '{request.file_source}' already exists in document storage (Status: {status}).",
track_id="", track_id=existing_track_id,
) )
# Generate track_id for text insertion # Generate track_id for text insertion
@ -2232,12 +2237,14 @@ def create_document_routes(
file_source file_source
) )
if existing_doc_data: if existing_doc_data:
# Get document status information for error message # Get document status and track_id from existing document
status = existing_doc_data.get("status", "unknown") status = existing_doc_data.get("status", "unknown")
# Use `or ""` to handle both missing key and None value (e.g., legacy rows without track_id)
existing_track_id = existing_doc_data.get("track_id") or ""
return InsertResponse( return InsertResponse(
status="duplicated", status="duplicated",
message=f"File source '{file_source}' already exists in document storage (Status: {status}).", message=f"File source '{file_source}' already exists in document storage (Status: {status}).",
track_id="", track_id=existing_track_id,
) )
# Generate track_id for texts insertion # Generate track_id for texts insertion
@ -3058,29 +3065,27 @@ def create_document_routes(
This is useful for recovering from server crashes, network errors, LLM service This is useful for recovering from server crashes, network errors, LLM service
outages, or other temporary failures that caused document processing to fail. outages, or other temporary failures that caused document processing to fail.
The processing happens in the background and can be monitored using the The processing happens in the background and can be monitored by checking the
returned track_id or by checking the pipeline status. pipeline status. The reprocessed documents retain their original track_id from
initial upload, so use their original track_id to monitor progress.
Returns: Returns:
ReprocessResponse: Response with status, message, and track_id ReprocessResponse: Response with status and message.
track_id is always empty string because reprocessed documents retain
their original track_id from initial upload.
Raises: Raises:
HTTPException: If an error occurs while initiating reprocessing (500). HTTPException: If an error occurs while initiating reprocessing (500).
""" """
try: try:
# Generate track_id with "retry" prefix for retry operation
track_id = generate_track_id("retry")
# Start the reprocessing in the background # Start the reprocessing in the background
# Note: Reprocessed documents retain their original track_id from initial upload
background_tasks.add_task(rag.apipeline_process_enqueue_documents) background_tasks.add_task(rag.apipeline_process_enqueue_documents)
logger.info( logger.info("Reprocessing of failed documents initiated")
f"Reprocessing of failed documents initiated with track_id: {track_id}"
)
return ReprocessResponse( return ReprocessResponse(
status="reprocessing_started", status="reprocessing_started",
message="Reprocessing of failed documents has been initiated in background", message="Reprocessing of failed documents has been initiated in background. Documents retain their original track_id.",
track_id=track_id,
) )
except Exception as e: except Exception as e: