diff --git a/lightrag/api/routers/document_routes.py b/lightrag/api/routers/document_routes.py index 86b08547..3a0da425 100644 --- a/lightrag/api/routers/document_routes.py +++ b/lightrag/api/routers/document_routes.py @@ -433,6 +433,64 @@ class DeleteRelationRequest(BaseModel): return entity_name.strip()
+class ResetDocumentStatusRequest(BaseModel):
+    """Request payload naming the documents whose status should be reset.
+
+    Attributes:
+        doc_ids: IDs of the documents to reset; stripped, non-blank and unique
+        target_status: Status to assign, "pending" (default) or "failed"
+    """
+    doc_ids: List[str] = Field(..., description="The IDs of the documents to reset.")
+    target_status: Literal["pending", "failed"] = Field(
+        default="pending",
+        description="Target status to set. Use 'pending' for retry, 'failed' to mark as failed."
+    )
+
+    @field_validator("doc_ids", mode="after")
+    @classmethod
+    def validate_doc_ids(cls, doc_ids: List[str]) -> List[str]:
+        """Return the stripped IDs; reject an empty list, blank IDs and duplicates."""
+        if not doc_ids:
+            raise ValueError("Document IDs list cannot be empty")
+        # Blank check runs on the raw values so whitespace-only IDs are refused
+        if any(not raw or not raw.strip() for raw in doc_ids):
+            raise ValueError("Document ID cannot be empty")
+        cleaned = [raw.strip() for raw in doc_ids]
+        if len(set(cleaned)) != len(cleaned):
+            raise ValueError("Document IDs must be unique")
+        return cleaned
+
+
+class ResetDocumentStatusResponse(BaseModel):
+    """Response model for reset document status operation.
+ + Attributes: + status: Status of the operation + message: Human-readable message + reset_count: Number of documents successfully reset + failed_ids: List of document IDs that failed to reset + """ + status: Literal["success", "partial", "failed"] = Field( + description="Status of the reset operation" + ) + message: str = Field(description="Human-readable message describing the operation") + reset_count: int = Field(description="Number of documents successfully reset") + failed_ids: List[str] = Field( + default_factory=list, + description="List of document IDs that failed to reset" + ) + + class Config: + json_schema_extra = { + "example": { + "status": "success", + "message": "Successfully reset 2 document(s) to pending status", + "reset_count": 2, + "failed_ids": [] + } + } + + class DocStatusResponse(BaseModel): id: str = Field(description="Document identifier") content_summary: str = Field(description="Summary of document content") @@ -3169,6 +3227,112 @@ def create_document_routes( logger.error(traceback.format_exc()) raise HTTPException(status_code=500, detail=str(e))
+    @router.post(
+        "/reset_status",
+        response_model=ResetDocumentStatusResponse,
+        dependencies=[Depends(combined_auth)],
+    )
+    async def reset_document_status(
+        request: ResetDocumentStatusRequest,
+        tenant_rag: LightRAG = Depends(get_tenant_rag)
+    ):
+        """
+        Reset document status to allow reprocessing.
+
+        This endpoint allows resetting document status from any state to either:
+        - PENDING: For documents you want to retry processing
+        - FAILED: For documents stuck in PROCESSING that you want to mark as failed
+
+        Fields already stored for a document are written back unchanged; only
+        status, updated_at and error_msg are replaced, so values such as the
+        processing metadata survive the reset.
+
+        This is useful for:
+        - Recovering documents stuck in PROCESSING state after server crashes
+        - Retrying failed documents after fixing the underlying issue
+        - Manually marking documents as failed for cleanup
+
+        Args:
+            request: ResetDocumentStatusRequest containing doc_ids and target_status
+            tenant_rag: Tenant-specific RAG instance (injected dependency)
+
+        Returns:
+            ResetDocumentStatusResponse: status is "success" when every document
+            was reset, "partial" when only some were and "failed" when none were;
+            IDs that were not found or whose update raised are in failed_ids.
+
+        Raises:
+            HTTPException: If an error occurs while resetting status (500).
+        """
+        from datetime import datetime, timezone
+
+        try:
+            reset_count = 0
+            failed_ids = []
+            target_status = DocStatus.PENDING if request.target_status == "pending" else DocStatus.FAILED
+            # A retry clears any previous error; a manual failure records its cause
+            error_msg = None if target_status == DocStatus.PENDING else f"Manually reset to {target_status.value}"
+
+            for doc_id in request.doc_ids:
+                try:
+                    # Get current document status
+                    current_status = await tenant_rag.doc_status.get_by_id(doc_id)
+
+                    if current_status is None:
+                        logger.warning(f"Document {doc_id} not found in doc_status storage")
+                        failed_ids.append(doc_id)
+                        continue
+
+                    # current_status is a dict, not an object. Spread it so the upsert
+                    # keeps every stored field; the leading defaults only fill keys
+                    # the record lacks, and the trailing keys are the ones being reset.
+                    updated_data = {
+                        doc_id: {
+                            "content_summary": "",
+                            "content_length": 0,
+                            "created_at": None,
+                            "file_path": "",
+                            "track_id": None,
+                            "chunks_count": None,
+                            **current_status,
+                            "status": target_status,
+                            "updated_at": datetime.now(timezone.utc).isoformat(),
+                            "error_msg": error_msg,
+                        }
+                    }
+
+                    await tenant_rag.doc_status.upsert(updated_data)
+                    reset_count += 1
+                    logger.info(f"Reset document {doc_id} status to {target_status.value}")
+
+                except Exception as e:
+                    # One failing document must not abort the rest of the batch
+                    logger.error(f"Failed to reset document {doc_id}: {e}")
+                    failed_ids.append(doc_id)
+
+            # Determine overall status
+            if reset_count == len(request.doc_ids):
+                status = "success"
+                message = f"Successfully reset {reset_count} document(s) to {request.target_status} status"
+            elif reset_count > 0:
+                status = "partial"
+                message = f"Reset {reset_count} of {len(request.doc_ids)} documents. {len(failed_ids)} failed."
+            else:
+                status = "failed"
+                message = "Failed to reset any documents. Check document IDs."
+
+            return ResetDocumentStatusResponse(
+                status=status,
+                message=message,
+                reset_count=reset_count,
+                failed_ids=failed_ids
+            )
+
+        except Exception as e:
+            logger.error(f"Error resetting document status: {str(e)}")
+            logger.error(traceback.format_exc())
+            raise HTTPException(status_code=500, detail=str(e))
+
 @router.post( "/reprocess_failed", response_model=ReprocessResponse, diff --git a/lightrag/api/webui/index.html b/lightrag/api/webui/index.html index 14226b6c..151a1b0c 100644 --- a/lightrag/api/webui/index.html +++ b/lightrag/api/webui/index.html @@ -8,7 +8,7 @@ Lightrag - + diff --git a/lightrag/lightrag.py b/lightrag/lightrag.py index 8a638759..d535577a 100644 --- a/lightrag/lightrag.py +++ b/lightrag/lightrag.py @@ -1939,27 +1939,36 @@ class LightRAG: # Record processing end time for failed case processing_end_time = int(time.time()) - # Update document status to failed - await self.doc_status.upsert( - { - doc_id: { - "status": DocStatus.FAILED, - "error_msg": str(e), - "content_summary": status_doc.content_summary, - "content_length": status_doc.content_length, - "created_at": status_doc.created_at, - "updated_at": datetime.now( - timezone.utc - ).isoformat(), - "file_path": file_path, - "track_id": status_doc.track_id, # Preserve existing track_id - "metadata": { - "processing_start_time": processing_start_time, - "processing_end_time": processing_end_time, - }, + # Update document status to failed - wrapped in try/except to ensure we log + # even if
the status update fails (e.g., DB connection lost) + try: + await self.doc_status.upsert( + { + doc_id: { + "status": DocStatus.FAILED, + "error_msg": str(e), + "content_summary": status_doc.content_summary, + "content_length": status_doc.content_length, + "created_at": status_doc.created_at, + "updated_at": datetime.now( + timezone.utc + ).isoformat(), + "file_path": file_path, + "track_id": status_doc.track_id, # Preserve existing track_id + "metadata": { + "processing_start_time": processing_start_time, + "processing_end_time": processing_end_time, + }, + } } - } - ) + ) + except Exception as status_update_error: + # Critical: log that we couldn't update the status so the document might be stuck + logger.critical( + f"CRITICAL: Failed to update document {doc_id} status to FAILED after error. " + f"Document may be stuck in PROCESSING state. " + f"Original error: {e}, Status update error: {status_update_error}" + ) # Concurrency is controlled by keyed lock for individual entities and relationships if file_extraction_stage_ok: diff --git a/lightrag/services/tenant_service.py b/lightrag/services/tenant_service.py index a561bc27..b04c86de 100644 --- a/lightrag/services/tenant_service.py +++ b/lightrag/services/tenant_service.py @@ -75,12 +75,17 @@ class TenantService: except Exception as e: logger.error(f"Failed to insert tenant into PostgreSQL: {e}") raise - - # Store tenant metadata in KV storage - tenant_data = tenant.to_dict() - await self.kv_storage.upsert({ - f"{self.tenant_namespace}:{tenant.tenant_id}": tenant_data - })
+        else:
+            # Fallback: Store tenant metadata in KV storage only if no PostgreSQL DB
+            # (PGKVStorage doesn't support custom namespaces like __tenants__).
+            # This is the only persisted copy on this path, so write errors propagate.
+            try:
+                await self.kv_storage.upsert({
+                    f"{self.tenant_namespace}:{tenant.tenant_id}": tenant.to_dict()
+                })
+            except Exception as e:
+                logger.error(f"Failed to store tenant in KV storage: {e}")
+                raise
 logger.info(f"Created tenant: {tenant.tenant_id} ({tenant_name})") return tenant @@ -577,11 +582,32 @@ class TenantService: tenant.updated_at = datetime.utcnow() - # Store updated tenant - tenant_data = tenant.to_dict() - await self.kv_storage.upsert({ - f"{self.tenant_namespace}:{tenant_id}": tenant_data - }) + # Update in PostgreSQL if available + if hasattr(self.kv_storage, 'db') and self.kv_storage.db is not None: + try: + import json + metadata_json = json.dumps(tenant.metadata) if tenant.metadata else '{}' + await self.kv_storage.db.query( + """ + UPDATE tenants + SET name = $2, description = $3, metadata = $4::jsonb, updated_at = NOW() + WHERE tenant_id = $1 + """, + [tenant_id, tenant.tenant_name, tenant.description or "", metadata_json] + ) + logger.debug(f"Updated tenant {tenant_id} in PostgreSQL tenants table") + except Exception as e: + logger.error(f"Failed to update tenant in PostgreSQL: {e}") + raise
+        else:
+            # Fallback: KV storage is the only persisted copy here, so write errors propagate
+            try:
+                await self.kv_storage.upsert({
+                    f"{self.tenant_namespace}:{tenant_id}": tenant.to_dict()
+                })
+            except Exception as e:
+                logger.error(f"Failed to update tenant in KV storage: {e}")
+                raise
 logger.info(f"Updated tenant: {tenant_id}") return tenant @@ -832,11 +858,34 @@ class TenantService: created_by=created_by, ) - # Store KB metadata - kb_data = kb.to_dict() - await self.kv_storage.upsert({ - f"{self.kb_namespace}:{tenant_id}:{kb.kb_id}": kb_data - }) + # Store KB in PostgreSQL if available + if hasattr(self.kv_storage, 'db') and self.kv_storage.db is not None: + try: + await self.kv_storage.db.query( + """ + INSERT INTO knowledge_bases (kb_id, tenant_id, name, description, created_at, updated_at) + VALUES ($1, $2, $3, $4, NOW(), NOW()) + ON CONFLICT (tenant_id, kb_id) DO UPDATE SET + name = EXCLUDED.name, + description = EXCLUDED.description, + updated_at = NOW() + RETURNING kb_id + """, + [kb.kb_id, tenant_id, kb_name, description or ""] + ) + logger.debug(f"Inserted KB {kb.kb_id} into PostgreSQL knowledge_bases table") + except Exception as e: + logger.error(f"Failed to insert KB into PostgreSQL: {e}") + raise
+        else:
+            # Fallback: KV storage is the only persisted copy here, so write errors propagate
+            try:
+                await self.kv_storage.upsert({
+                    f"{self.kb_namespace}:{tenant_id}:{kb.kb_id}": kb.to_dict()
+                })
+            except Exception as e:
+                logger.error(f"Failed to store KB in KV storage: {e}")
+                raise
 # Update tenant KB count tenant.kb_count += 1 @@ -893,11 +942,30 @@ class TenantService: kb.updated_at = datetime.utcnow() - # Store updated KB - kb_data = kb.to_dict() - await self.kv_storage.upsert({ - f"{self.kb_namespace}:{tenant_id}:{kb_id}": kb_data - }) + # Update in PostgreSQL if available + if hasattr(self.kv_storage, 'db') and self.kv_storage.db is not None: + try: + await self.kv_storage.db.query( + """ + UPDATE knowledge_bases + SET name = $3, description = $4, updated_at = NOW() + WHERE tenant_id = $1 AND kb_id = $2 + """, + [tenant_id, kb_id, kb.kb_name, kb.description or ""] + ) + logger.debug(f"Updated KB {kb_id} in PostgreSQL knowledge_bases table") + except Exception as e: + logger.error(f"Failed to update KB in PostgreSQL: {e}") + raise
+        else:
+            # Fallback: KV storage is the only persisted copy here, so write errors propagate
+            try:
+                await self.kv_storage.upsert({
+                    f"{self.kb_namespace}:{tenant_id}:{kb_id}": kb.to_dict()
+                })
+            except Exception as e:
+                logger.error(f"Failed to update KB in KV storage: {e}")
+                raise
 logger.info(f"Updated KB: {kb_id} for tenant {tenant_id}") return kb @@ -1143,6 +1211,16 @@ class TenantService: custom_metadata=config_data.get("custom_metadata", {}), ) + # Helper to parse datetime that might be string or datetime object + def parse_datetime(val, default=None): + if val is None: + return default or datetime.utcnow() + if isinstance(val, datetime): + return val + if isinstance(val, str): + return datetime.fromisoformat(val) + return default or
datetime.utcnow() + # Create and return tenant tenant = Tenant( tenant_id=data.get("tenant_id", ""), @@ -1150,8 +1228,8 @@ class TenantService: description=data.get("description"), config=config, is_active=data.get("is_active", True), - created_at=datetime.fromisoformat(data.get("created_at")) if data.get("created_at") else datetime.utcnow(), - updated_at=datetime.fromisoformat(data.get("updated_at")) if data.get("updated_at") else datetime.utcnow(), + created_at=parse_datetime(data.get("created_at")), + updated_at=parse_datetime(data.get("updated_at")), created_by=data.get("created_by"), updated_by=data.get("updated_by"), metadata=data.get("metadata", {}), @@ -1180,6 +1258,16 @@ class TenantService: config_data = data.get("config") config = KBConfig(**config_data) if config_data else None + # Helper to parse datetime that might be string or datetime object + def parse_datetime(val, default=None): + if val is None: + return default + if isinstance(val, datetime): + return val + if isinstance(val, str): + return datetime.fromisoformat(val) + return default + kb = KnowledgeBase( kb_id=data.get("kb_id", ""), tenant_id=data.get("tenant_id", ""), @@ -1192,11 +1280,11 @@ class TenantService: relationship_count=data.get("relationship_count", 0), chunk_count=data.get("chunk_count", 0), storage_used_mb=data.get("storage_used_mb", 0.0), - last_indexed_at=datetime.fromisoformat(data.get("last_indexed_at")) if data.get("last_indexed_at") else None, + last_indexed_at=parse_datetime(data.get("last_indexed_at")), index_version=data.get("index_version", 1), config=config, - created_at=datetime.fromisoformat(data.get("created_at")) if data.get("created_at") else datetime.utcnow(), - updated_at=datetime.fromisoformat(data.get("updated_at")) if data.get("updated_at") else datetime.utcnow(), + created_at=parse_datetime(data.get("created_at"), datetime.utcnow()), + updated_at=parse_datetime(data.get("updated_at"), datetime.utcnow()), created_by=data.get("created_by"), 
updated_by=data.get("updated_by"), metadata=data.get("metadata", {}), diff --git a/lightrag_webui/src/api/lightrag.ts b/lightrag_webui/src/api/lightrag.ts index 2a4d2225..2fb5a8d4 100644 --- a/lightrag_webui/src/api/lightrag.ts +++ b/lightrag_webui/src/api/lightrag.ts @@ -176,6 +176,18 @@ export type ReprocessFailedResponse = { track_id: string } +export type ResetDocumentStatusRequest = { + doc_ids: string[] + target_status: 'pending' | 'failed' +} + +export type ResetDocumentStatusResponse = { + status: 'success' | 'partial' | 'failed' + message: string + reset_count: number + failed_ids: string[] +} + export type DeleteDocResponse = { status: 'deletion_started' | 'busy' | 'not_allowed' message: string @@ -332,6 +344,11 @@ export const reprocessFailedDocuments = async (): Promise => { + const response = await axiosInstance.post('/documents/reset_status', request) + return response.data +} + export const getDocumentsScanProgress = async (): Promise => { const response = await axiosInstance.get('/documents/scan-progress') return response.data diff --git a/lightrag_webui/src/features/DocumentManager.tsx b/lightrag_webui/src/features/DocumentManager.tsx index ca39add4..389ceb5b 100644 --- a/lightrag_webui/src/features/DocumentManager.tsx +++ b/lightrag_webui/src/features/DocumentManager.tsx @@ -24,6 +24,7 @@ import { scanNewDocuments, getDocumentsPaginated, getPipelineStatus, + resetDocumentStatus, DocsStatusesResponse, DocStatus, DocStatusResponse, @@ -1142,6 +1143,42 @@ export default function DocumentManager() { setPagination(prev => ({ ...prev, page: newPage })); }, [statusFilter, pagination.page, pageByStatus]); + // State for reset operation + const [isResetting, setIsResetting] = useState(false) + + // Handle reset document status to pending for retry + const handleResetToPending = useCallback(async () => { + if (selectedDocIds.length === 0) return + + setIsResetting(true) + try { + const response = await resetDocumentStatus({ + doc_ids: selectedDocIds, + 
target_status: 'pending' + }) + + if (response.status === 'success') { + toast.success(t('documentPanel.documentManager.resetSuccess', { count: response.reset_count })) + setSelectedDocIds([]) + // Refresh documents + startPollingInterval(500) + } else if (response.status === 'partial') { + toast.warning(t('documentPanel.documentManager.resetPartial', { + count: response.reset_count, + failed: response.failed_ids.length + })) + setSelectedDocIds([]) + startPollingInterval(500) + } else { + toast.error(t('documentPanel.documentManager.resetFailed')) + } + } catch (err) { + toast.error(t('documentPanel.documentManager.errors.resetFailed', { error: errorMessage(err) })) + } finally { + setIsResetting(false) + } + }, [selectedDocIds, t, startPollingInterval]) + // Handle documents deleted callback const handleDocumentsDeleted = useCallback(async () => { setSelectedDocIds([]) @@ -1377,10 +1414,27 @@ export default function DocumentManager() {
{isSelectionMode && ( - + <> + + + )} {isSelectionMode && hasCurrentPageSelection ? ( (() => { diff --git a/lightrag_webui/src/locales/ar.json b/lightrag_webui/src/locales/ar.json index 585a3cc6..3cb46719 100644 --- a/lightrag_webui/src/locales/ar.json +++ b/lightrag_webui/src/locales/ar.json @@ -147,8 +147,14 @@ "errors": { "loadFailed": "فشل تحميل المستندات\n{{error}}", "scanFailed": "فشل مسح المستندات\n{{error}}", - "scanProgressFailed": "فشل الحصول على تقدم المسح\n{{error}}" + "scanProgressFailed": "فشل الحصول على تقدم المسح\n{{error}}", + "resetFailed": "فشل إعادة تعيين حالة المستند\n{{error}}" }, + "retry": "إعادة المحاولة", + "resetToPending": "إعادة تعيين المستندات المحددة إلى حالة الانتظار لإعادة المحاولة", + "resetSuccess": "تم إعادة تعيين {{count}} مستند(ات) بنجاح", + "resetPartial": "تم إعادة تعيين {{count}} مستند(ات)، فشل {{failed}}", + "resetFailed": "فشل إعادة تعيين المستندات", "fileNameLabel": "اسم الملف", "showButton": "عرض", "hideButton": "إخفاء", diff --git a/lightrag_webui/src/locales/en.json b/lightrag_webui/src/locales/en.json index b11f3112..d941c923 100644 --- a/lightrag_webui/src/locales/en.json +++ b/lightrag_webui/src/locales/en.json @@ -147,8 +147,14 @@ "errors": { "loadFailed": "Failed to load documents\n{{error}}", "scanFailed": "Failed to scan documents\n{{error}}", - "scanProgressFailed": "Failed to get scan progress\n{{error}}" + "scanProgressFailed": "Failed to get scan progress\n{{error}}", + "resetFailed": "Failed to reset document status\n{{error}}" }, + "retry": "Retry", + "resetToPending": "Reset selected documents to pending status for retry", + "resetSuccess": "Successfully reset {{count}} document(s) to pending status", + "resetPartial": "Reset {{count}} document(s), but {{failed}} failed", + "resetFailed": "Failed to reset any documents", "fileNameLabel": "File Name", "showButton": "Show", "hideButton": "Hide", diff --git a/lightrag_webui/src/locales/fr.json b/lightrag_webui/src/locales/fr.json index 77a9c421..26761321 
100644 --- a/lightrag_webui/src/locales/fr.json +++ b/lightrag_webui/src/locales/fr.json @@ -147,8 +147,14 @@ "errors": { "loadFailed": "Échec du chargement des documents\n{{error}}", "scanFailed": "Échec de la numérisation des documents\n{{error}}", - "scanProgressFailed": "Échec de l'obtention de la progression de la numérisation\n{{error}}" + "scanProgressFailed": "Échec de l'obtention de la progression de la numérisation\n{{error}}", + "resetFailed": "Échec de la réinitialisation du statut\n{{error}}" }, + "retry": "Réessayer", + "resetToPending": "Réinitialiser les documents sélectionnés en attente pour réessayer", + "resetSuccess": "{{count}} document(s) réinitialisé(s) avec succès", + "resetPartial": "{{count}} document(s) réinitialisé(s), mais {{failed}} ont échoué", + "resetFailed": "Échec de la réinitialisation des documents", "fileNameLabel": "Nom du fichier", "showButton": "Afficher", "hideButton": "Masquer", diff --git a/lightrag_webui/src/locales/zh.json b/lightrag_webui/src/locales/zh.json index 4a0a0713..094ba860 100644 --- a/lightrag_webui/src/locales/zh.json +++ b/lightrag_webui/src/locales/zh.json @@ -147,8 +147,14 @@ "errors": { "loadFailed": "加载文档失败\n{{error}}", "scanFailed": "扫描文档失败\n{{error}}", - "scanProgressFailed": "获取扫描进度失败\n{{error}}" + "scanProgressFailed": "获取扫描进度失败\n{{error}}", + "resetFailed": "重置文档状态失败\n{{error}}" }, + "retry": "重试", + "resetToPending": "将选中的文档重置为待处理状态以便重试", + "resetSuccess": "成功重置 {{count}} 个文档", + "resetPartial": "已重置 {{count}} 个文档,{{failed}} 个失败", + "resetFailed": "重置文档失败", "fileNameLabel": "文件名", "showButton": "显示", "hideButton": "隐藏", diff --git a/lightrag_webui/src/locales/zh_TW.json b/lightrag_webui/src/locales/zh_TW.json index 85ca0cbd..a4b126df 100644 --- a/lightrag_webui/src/locales/zh_TW.json +++ b/lightrag_webui/src/locales/zh_TW.json @@ -147,8 +147,14 @@ "errors": { "loadFailed": "載入文件失敗\n{{error}}", "scanFailed": "掃描文件失敗\n{{error}}", - "scanProgressFailed": "取得掃描進度失敗\n{{error}}" + "scanProgressFailed": 
"取得掃描進度失敗\n{{error}}", + "resetFailed": "重置文件狀態失敗\n{{error}}" }, + "retry": "重試", + "resetToPending": "將選取的文件重置為待處理狀態以便重試", + "resetSuccess": "成功重置 {{count}} 個文件", + "resetPartial": "已重置 {{count}} 個文件,{{failed}} 個失敗", + "resetFailed": "重置文件失敗", "fileNameLabel": "檔案名稱", "showButton": "顯示", "hideButton": "隱藏",