feat: Add endpoint and UI to retry failed documents
Add a new `/documents/reprocess_failed` API endpoint and corresponding UI button to retry processing of failed and pending documents. This addresses a common recovery scenario when document processing fails due to server crashes, network errors, or LLM service outages. Backend changes: - Add ReprocessResponse model with status, message, and track_id fields - Add POST /documents/reprocess_failed endpoint that triggers background reprocessing of FAILED, PENDING, and interrupted PROCESSING documents - Reuses existing apipeline_process_enqueue_documents for consistency - Includes comprehensive docstring and logging for observability Frontend changes: - Add TypeScript types and API function for the new endpoint - Add retry handler with intelligent polling (fast refresh → normal) - Add "Retry Failed" button in Documents page toolbar - Button disabled when pipeline is busy to prevent duplicate operations - Complete i18n support (English and Chinese translations) This feature provides a convenient way to recover from processing failures without requiring a full filesystem rescan.
This commit is contained in:
parent
b9c37bd937
commit
cf2a024e37
5 changed files with 137 additions and 0 deletions
|
|
@ -134,6 +134,33 @@ class ScanResponse(BaseModel):
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
class ReprocessResponse(BaseModel):
|
||||||
|
"""Response model for reprocessing failed documents operation
|
||||||
|
|
||||||
|
Attributes:
|
||||||
|
status: Status of the reprocessing operation
|
||||||
|
message: Message describing the operation result
|
||||||
|
track_id: Tracking ID for monitoring reprocessing progress
|
||||||
|
"""
|
||||||
|
|
||||||
|
status: Literal["reprocessing_started"] = Field(
|
||||||
|
description="Status of the reprocessing operation"
|
||||||
|
)
|
||||||
|
message: str = Field(description="Human-readable message describing the operation")
|
||||||
|
track_id: str = Field(
|
||||||
|
description="Tracking ID for monitoring reprocessing progress"
|
||||||
|
)
|
||||||
|
|
||||||
|
class Config:
|
||||||
|
json_schema_extra = {
|
||||||
|
"example": {
|
||||||
|
"status": "reprocessing_started",
|
||||||
|
"message": "Reprocessing of failed documents has been initiated in background",
|
||||||
|
"track_id": "retry_20250729_170612_def456",
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
class InsertTextRequest(BaseModel):
|
class InsertTextRequest(BaseModel):
|
||||||
"""Request model for inserting a single text document
|
"""Request model for inserting a single text document
|
||||||
|
|
||||||
|
|
@ -2657,4 +2684,52 @@ def create_document_routes(
|
||||||
logger.error(traceback.format_exc())
|
logger.error(traceback.format_exc())
|
||||||
raise HTTPException(status_code=500, detail=str(e))
|
raise HTTPException(status_code=500, detail=str(e))
|
||||||
|
|
||||||
|
@router.post(
|
||||||
|
"/reprocess_failed",
|
||||||
|
response_model=ReprocessResponse,
|
||||||
|
dependencies=[Depends(combined_auth)],
|
||||||
|
)
|
||||||
|
async def reprocess_failed_documents(background_tasks: BackgroundTasks):
|
||||||
|
"""
|
||||||
|
Reprocess failed and pending documents.
|
||||||
|
|
||||||
|
This endpoint triggers the document processing pipeline which automatically
|
||||||
|
picks up and reprocesses documents in the following statuses:
|
||||||
|
- FAILED: Documents that failed during previous processing attempts
|
||||||
|
- PENDING: Documents waiting to be processed
|
||||||
|
- PROCESSING: Documents with abnormally terminated processing (e.g., server crashes)
|
||||||
|
|
||||||
|
This is useful for recovering from server crashes, network errors, LLM service
|
||||||
|
outages, or other temporary failures that caused document processing to fail.
|
||||||
|
|
||||||
|
The processing happens in the background and can be monitored using the
|
||||||
|
returned track_id or by checking the pipeline status.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
ReprocessResponse: Response with status, message, and track_id
|
||||||
|
|
||||||
|
Raises:
|
||||||
|
HTTPException: If an error occurs while initiating reprocessing (500).
|
||||||
|
"""
|
||||||
|
try:
|
||||||
|
# Generate track_id with "retry" prefix for retry operation
|
||||||
|
track_id = generate_track_id("retry")
|
||||||
|
|
||||||
|
# Start the reprocessing in the background
|
||||||
|
background_tasks.add_task(rag.apipeline_process_enqueue_documents)
|
||||||
|
logger.info(
|
||||||
|
f"Reprocessing of failed documents initiated with track_id: {track_id}"
|
||||||
|
)
|
||||||
|
|
||||||
|
return ReprocessResponse(
|
||||||
|
status="reprocessing_started",
|
||||||
|
message="Reprocessing of failed documents has been initiated in background",
|
||||||
|
track_id=track_id,
|
||||||
|
)
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"Error initiating reprocessing of failed documents: {str(e)}")
|
||||||
|
logger.error(traceback.format_exc())
|
||||||
|
raise HTTPException(status_code=500, detail=str(e))
|
||||||
|
|
||||||
return router
|
return router
|
||||||
|
|
|
||||||
|
|
@ -155,6 +155,12 @@ export type ScanResponse = {
|
||||||
track_id: string
|
track_id: string
|
||||||
}
|
}
|
||||||
|
|
||||||
|
export type ReprocessFailedResponse = {
|
||||||
|
status: 'reprocessing_started'
|
||||||
|
message: string
|
||||||
|
track_id: string
|
||||||
|
}
|
||||||
|
|
||||||
export type DeleteDocResponse = {
|
export type DeleteDocResponse = {
|
||||||
status: 'deletion_started' | 'busy' | 'not_allowed'
|
status: 'deletion_started' | 'busy' | 'not_allowed'
|
||||||
message: string
|
message: string
|
||||||
|
|
@ -353,6 +359,11 @@ export const scanNewDocuments = async (): Promise<ScanResponse> => {
|
||||||
return response.data
|
return response.data
|
||||||
}
|
}
|
||||||
|
|
||||||
|
export const reprocessFailedDocuments = async (): Promise<ReprocessFailedResponse> => {
|
||||||
|
const response = await axiosInstance.post('/documents/reprocess_failed')
|
||||||
|
return response.data
|
||||||
|
}
|
||||||
|
|
||||||
export const getDocumentsScanProgress = async (): Promise<LightragDocumentsScanProgress> => {
|
export const getDocumentsScanProgress = async (): Promise<LightragDocumentsScanProgress> => {
|
||||||
const response = await axiosInstance.get('/documents/scan-progress')
|
const response = await axiosInstance.get('/documents/scan-progress')
|
||||||
return response.data
|
return response.data
|
||||||
|
|
|
||||||
|
|
@ -21,6 +21,7 @@ import PaginationControls from '@/components/ui/PaginationControls'
|
||||||
|
|
||||||
import {
|
import {
|
||||||
scanNewDocuments,
|
scanNewDocuments,
|
||||||
|
reprocessFailedDocuments,
|
||||||
getDocumentsPaginated,
|
getDocumentsPaginated,
|
||||||
DocsStatusesResponse,
|
DocsStatusesResponse,
|
||||||
DocStatus,
|
DocStatus,
|
||||||
|
|
@ -833,6 +834,42 @@ export default function DocumentManager() {
|
||||||
}
|
}
|
||||||
}, [t, startPollingInterval, currentTab, health, statusCounts])
|
}, [t, startPollingInterval, currentTab, health, statusCounts])
|
||||||
|
|
||||||
|
const retryFailedDocuments = useCallback(async () => {
|
||||||
|
try {
|
||||||
|
// Check if component is still mounted before starting the request
|
||||||
|
if (!isMountedRef.current) return;
|
||||||
|
|
||||||
|
const { status, message, track_id: _track_id } = await reprocessFailedDocuments(); // eslint-disable-line @typescript-eslint/no-unused-vars
|
||||||
|
|
||||||
|
// Check again if component is still mounted after the request completes
|
||||||
|
if (!isMountedRef.current) return;
|
||||||
|
|
||||||
|
// Note: _track_id is available for future use (e.g., progress tracking)
|
||||||
|
toast.message(message || status);
|
||||||
|
|
||||||
|
// Reset health check timer with 1 second delay to avoid race condition
|
||||||
|
useBackendState.getState().resetHealthCheckTimerDelayed(1000);
|
||||||
|
|
||||||
|
// Start fast refresh with 2-second interval immediately after retry
|
||||||
|
startPollingInterval(2000);
|
||||||
|
|
||||||
|
// Set recovery timer to restore normal polling interval after 15 seconds
|
||||||
|
setTimeout(() => {
|
||||||
|
if (isMountedRef.current && currentTab === 'documents' && health) {
|
||||||
|
// Restore intelligent polling interval based on document status
|
||||||
|
const hasActiveDocuments = (statusCounts.processing || 0) > 0 || (statusCounts.pending || 0) > 0;
|
||||||
|
const normalInterval = hasActiveDocuments ? 5000 : 30000;
|
||||||
|
startPollingInterval(normalInterval);
|
||||||
|
}
|
||||||
|
}, 15000); // Restore after 15 seconds
|
||||||
|
} catch (err) {
|
||||||
|
// Only show error if component is still mounted
|
||||||
|
if (isMountedRef.current) {
|
||||||
|
toast.error(errorMessage(err));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}, [startPollingInterval, currentTab, health, statusCounts])
|
||||||
|
|
||||||
// Handle page size change - update state and save to store
|
// Handle page size change - update state and save to store
|
||||||
const handlePageSizeChange = useCallback((newPageSize: number) => {
|
const handlePageSizeChange = useCallback((newPageSize: number) => {
|
||||||
if (newPageSize === pagination.page_size) return;
|
if (newPageSize === pagination.page_size) return;
|
||||||
|
|
@ -1085,6 +1122,16 @@ export default function DocumentManager() {
|
||||||
>
|
>
|
||||||
<RefreshCwIcon /> {t('documentPanel.documentManager.scanButton')}
|
<RefreshCwIcon /> {t('documentPanel.documentManager.scanButton')}
|
||||||
</Button>
|
</Button>
|
||||||
|
<Button
|
||||||
|
variant="outline"
|
||||||
|
onClick={retryFailedDocuments}
|
||||||
|
side="bottom"
|
||||||
|
tooltip={t('documentPanel.documentManager.retryFailedTooltip')}
|
||||||
|
size="sm"
|
||||||
|
disabled={pipelineBusy}
|
||||||
|
>
|
||||||
|
<RotateCcwIcon /> {t('documentPanel.documentManager.retryFailedButton')}
|
||||||
|
</Button>
|
||||||
<Button
|
<Button
|
||||||
variant="outline"
|
variant="outline"
|
||||||
onClick={() => setShowPipelineStatus(true)}
|
onClick={() => setShowPipelineStatus(true)}
|
||||||
|
|
|
||||||
|
|
@ -115,6 +115,8 @@
|
||||||
"title": "Document Management",
|
"title": "Document Management",
|
||||||
"scanButton": "Scan",
|
"scanButton": "Scan",
|
||||||
"scanTooltip": "Scan documents in input folder",
|
"scanTooltip": "Scan documents in input folder",
|
||||||
|
"retryFailedButton": "Retry Failed",
|
||||||
|
"retryFailedTooltip": "Retry processing all failed documents",
|
||||||
"refreshTooltip": "Reset document list",
|
"refreshTooltip": "Reset document list",
|
||||||
"pipelineStatusButton": "Pipeline Status",
|
"pipelineStatusButton": "Pipeline Status",
|
||||||
"pipelineStatusTooltip": "View pipeline status",
|
"pipelineStatusTooltip": "View pipeline status",
|
||||||
|
|
|
||||||
|
|
@ -115,6 +115,8 @@
|
||||||
"title": "文档管理",
|
"title": "文档管理",
|
||||||
"scanButton": "扫描",
|
"scanButton": "扫描",
|
||||||
"scanTooltip": "扫描输入目录中的文档",
|
"scanTooltip": "扫描输入目录中的文档",
|
||||||
|
"retryFailedButton": "重试失败",
|
||||||
|
"retryFailedTooltip": "重新处理所有失败的文档",
|
||||||
"refreshTooltip": "复位文档清单",
|
"refreshTooltip": "复位文档清单",
|
||||||
"pipelineStatusButton": "流水线状态",
|
"pipelineStatusButton": "流水线状态",
|
||||||
"pipelineStatusTooltip": "查看流水线状态",
|
"pipelineStatusTooltip": "查看流水线状态",
|
||||||
|
|
|
||||||
Loading…
Add table
Reference in a new issue