diff --git a/lightrag/__init__.py b/lightrag/__init__.py index 87c4b622..3a42228c 100644 --- a/lightrag/__init__.py +++ b/lightrag/__init__.py @@ -1,5 +1,5 @@ from .lightrag import LightRAG as LightRAG, QueryParam as QueryParam -__version__ = "1.4.8.1" +__version__ = "1.4.8.2" __author__ = "Zirui Guo" __url__ = "https://github.com/HKUDS/LightRAG" diff --git a/lightrag/api/__init__.py b/lightrag/api/__init__.py index 961816ea..7b7962a2 100644 --- a/lightrag/api/__init__.py +++ b/lightrag/api/__init__.py @@ -1 +1 @@ -__api_version__ = "0222" +__api_version__ = "0223" diff --git a/lightrag/operate.py b/lightrag/operate.py index 58278ff5..fab5ea2e 100644 --- a/lightrag/operate.py +++ b/lightrag/operate.py @@ -2179,7 +2179,12 @@ async def extract_entities( async def _process_with_semaphore(chunk): async with semaphore: - return await _process_single_content(chunk) + try: + return await _process_single_content(chunk) + except Exception as e: + chunk_id = chunk[0] # Extract chunk_id from chunk[0] + prefixed_exception = create_prefixed_exception(e, chunk_id) + raise prefixed_exception from e tasks = [] for c in ordered_chunks: @@ -2217,7 +2222,7 @@ async def extract_entities( await asyncio.wait(pending) # Add progress prefix to the exception message - progress_prefix = f"Chunks[{processed_chunks+1}/{total_chunks}]" + progress_prefix = f"C[{processed_chunks+1}/{total_chunks}]" # Re-raise the original exception with a prefix prefixed_exception = create_prefixed_exception(first_exception, progress_prefix) diff --git a/lightrag_webui/src/features/DocumentManager.tsx b/lightrag_webui/src/features/DocumentManager.tsx index 4842ca5e..6994b99c 100644 --- a/lightrag_webui/src/features/DocumentManager.tsx +++ b/lightrag_webui/src/features/DocumentManager.tsx @@ -236,6 +236,26 @@ export default function DocumentManager() { const [selectedDocIds, setSelectedDocIds] = useState([]) const isSelectionMode = selectedDocIds.length > 0 + // Add refs to track previous pipelineBusy state and current interval + const prevPipelineBusyRef = useRef(undefined); + const pollingIntervalRef = useRef | null>(null); + + // Add retry mechanism state + const [retryState, setRetryState] = useState({ + count: 0, + lastError: null as Error | null, + isBackingOff: false + }); + + // Add circuit breaker state + const [circuitBreakerState, setCircuitBreakerState] = useState({ + isOpen: false, + failureCount: 0, + lastFailureTime: null as number | null, + nextRetryTime: null as number | null + }); + + // Handle checkbox change for individual documents const handleDocumentSelect = useCallback((docId: string, checked: boolean) => { setSelectedDocIds(prev => { @@ -519,6 +539,98 @@ export default function DocumentManager() { setDocs(response.pagination.total_count > 0 ? legacyDocs : null); }, []); + // Utility function to create timeout wrapper for API calls + const withTimeout = useCallback(( + promise: Promise, + timeoutMs: number = 30000, + errorMsg: string = 'Request timeout' + ): Promise => { + const timeoutPromise = new Promise((_, reject) => { + setTimeout(() => reject(new Error(errorMsg)), timeoutMs) + }); + return Promise.race([promise, timeoutPromise]); + }, []); + + + // Enhanced error classification + const classifyError = useCallback((error: any) => { + if (error.name === 'AbortError') { + return { type: 'cancelled', shouldRetry: false, shouldShowToast: false }; + } + + if (error.message === 'Request timeout') { + return { type: 'timeout', shouldRetry: true, shouldShowToast: true }; + } + + if (error.message?.includes('Network Error') || error.code === 'NETWORK_ERROR') { + return { type: 'network', shouldRetry: true, shouldShowToast: true }; + } + + if (error.status >= 500) { + return { type: 'server', shouldRetry: true, shouldShowToast: true }; + } + + if (error.status >= 400 && error.status < 500) { + return { type: 'client', shouldRetry: false, shouldShowToast: true }; + } + + return { type: 'unknown', shouldRetry: true, shouldShowToast: true }; + }, []); + + // Circuit breaker utility functions + const isCircuitBreakerOpen = useCallback(() => { + if (!circuitBreakerState.isOpen) return false; + + const now = Date.now(); + if (circuitBreakerState.nextRetryTime && now >= circuitBreakerState.nextRetryTime) { + // Reset circuit breaker to half-open state + setCircuitBreakerState(prev => ({ + ...prev, + isOpen: false, + failureCount: Math.max(0, prev.failureCount - 1) + })); + return false; + } + + return true; + }, [circuitBreakerState]); + + const recordFailure = useCallback((error: Error) => { + const now = Date.now(); + setCircuitBreakerState(prev => { + const newFailureCount = prev.failureCount + 1; + const shouldOpen = newFailureCount >= 3; // Open after 3 failures + + return { + isOpen: shouldOpen, + failureCount: newFailureCount, + lastFailureTime: now, + nextRetryTime: shouldOpen ? now + (Math.pow(2, newFailureCount) * 1000) : null + }; + }); + + setRetryState(prev => ({ + count: prev.count + 1, + lastError: error, + isBackingOff: true + })); + }, []); + + const recordSuccess = useCallback(() => { + setCircuitBreakerState({ + isOpen: false, + failureCount: 0, + lastFailureTime: null, + nextRetryTime: null + }); + + setRetryState({ + count: 0, + lastError: null, + isBackingOff: false + }); + }, []); + // Intelligent refresh function: handles all boundary cases const handleIntelligentRefresh = useCallback(async ( targetPage?: number, // Optional target page, defaults to current page @@ -540,7 +652,12 @@ export default function DocumentManager() { sort_direction: sortDirection }; - const response = await getDocumentsPaginated(request); + // Use timeout wrapper for the API call + const response = await withTimeout( + getDocumentsPaginated(request), + 30000, // 30 second timeout + 'Document fetch timeout' + ); if (!isMountedRef.current) return; @@ -556,7 +673,11 @@ export default function DocumentManager() { page: lastPage }; - const lastPageResponse = await getDocumentsPaginated(lastPageRequest); + const lastPageResponse = await withTimeout( + getDocumentsPaginated(lastPageRequest), + 30000, + 'Document fetch timeout' + ); if (!isMountedRef.current) return; @@ -575,14 +696,22 @@ export default function DocumentManager() { } catch (err) { if (isMountedRef.current) { - toast.error(t('documentPanel.documentManager.errors.loadFailed', { error: errorMessage(err) })); + const errorClassification = classifyError(err); + + if (errorClassification.shouldShowToast) { + toast.error(t('documentPanel.documentManager.errors.loadFailed', { error: errorMessage(err) })); + } + + if (errorClassification.shouldRetry) { + recordFailure(err as Error); + } } } finally { if (isMountedRef.current) { setIsRefreshing(false); } } - }, [statusFilter, pagination.page, pagination.page_size, sortField, sortDirection, t, updateComponentState]); + }, [statusFilter, pagination.page, pagination.page_size, sortField, sortDirection, t, updateComponentState, withTimeout, classifyError, recordFailure]); // New paginated data fetching function const fetchPaginatedDocuments = useCallback(async ( @@ -602,10 +731,6 @@ export default function DocumentManager() { await fetchPaginatedDocuments(pagination.page, pagination.page_size, statusFilter); }, [fetchPaginatedDocuments, pagination.page, pagination.page_size, statusFilter]); - // Add refs to track previous pipelineBusy state and current interval - const prevPipelineBusyRef = useRef(undefined); - const pollingIntervalRef = useRef | null>(null); - // Function to clear current polling interval const clearPollingInterval = useCallback(() => { if (pollingIntervalRef.current) { @@ -620,18 +745,49 @@ export default function DocumentManager() { pollingIntervalRef.current = setInterval(async () => { try { + // Check circuit breaker before making request + if (isCircuitBreakerOpen()) { + return; // Skip this polling cycle + } + // Only perform fetch if component is still mounted if (isMountedRef.current) { - await fetchDocuments() + await fetchDocuments(); + recordSuccess(); // Record successful operation } } catch (err) { - // Only show error if component is still mounted + // Only handle error if component is still mounted if (isMountedRef.current) { - toast.error(t('documentPanel.documentManager.errors.scanProgressFailed', { error: errorMessage(err) })) + const errorClassification = classifyError(err); + + // Always reset isRefreshing state on error + setIsRefreshing(false); + + if (errorClassification.shouldShowToast) { + toast.error(t('documentPanel.documentManager.errors.scanProgressFailed', { error: errorMessage(err) })); + } + + if (errorClassification.shouldRetry) { + recordFailure(err as Error); + + // Implement exponential backoff for retries + const backoffDelay = Math.min(Math.pow(2, retryState.count) * 1000, 30000); // Max 30s + + if (retryState.count < 3) { // Max 3 retries + setTimeout(() => { + if (isMountedRef.current) { + setRetryState(prev => ({ ...prev, isBackingOff: false })); + } + }, backoffDelay); + } + } else { + // For non-retryable errors, stop polling + clearPollingInterval(); + } } } }, intervalMs); - }, [fetchDocuments, t, clearPollingInterval]); + }, [fetchDocuments, t, clearPollingInterval, isCircuitBreakerOpen, recordSuccess, recordFailure, classifyError, retryState.count]); const scanDocuments = useCallback(async () => { try {