diff --git a/lightrag/api/routers/document_routes.py b/lightrag/api/routers/document_routes.py index 7e44b57d..7dc2c34c 100644 --- a/lightrag/api/routers/document_routes.py +++ b/lightrag/api/routers/document_routes.py @@ -406,7 +406,7 @@ class DocStatusResponse(BaseModel): "id": "doc_123456", "content_summary": "Research paper on machine learning", "content_length": 15240, - "status": "PROCESSED", + "status": "processed", "created_at": "2025-03-31T12:34:56", "updated_at": "2025-03-31T12:35:30", "track_id": "upload_20250729_170612_abc123", @@ -439,7 +439,7 @@ class DocsStatusesResponse(BaseModel): "id": "doc_123", "content_summary": "Pending document", "content_length": 5000, - "status": "PENDING", + "status": "pending", "created_at": "2025-03-31T10:00:00", "updated_at": "2025-03-31T10:00:00", "track_id": "upload_20250331_100000_abc123", @@ -449,12 +449,27 @@ class DocsStatusesResponse(BaseModel): "file_path": "pending_doc.pdf", } ], + "PREPROCESSED": [ + { + "id": "doc_789", + "content_summary": "Document pending final indexing", + "content_length": 7200, + "status": "multimodal_processed", + "created_at": "2025-03-31T09:30:00", + "updated_at": "2025-03-31T09:35:00", + "track_id": "upload_20250331_093000_xyz789", + "chunks_count": 10, + "error": None, + "metadata": None, + "file_path": "preprocessed_doc.pdf", + } + ], "PROCESSED": [ { "id": "doc_456", "content_summary": "Processed document", "content_length": 8000, - "status": "PROCESSED", + "status": "processed", "created_at": "2025-03-31T09:00:00", "updated_at": "2025-03-31T09:05:00", "track_id": "insert_20250331_090000_def456", @@ -626,6 +641,7 @@ class PaginatedDocsResponse(BaseModel): "status_counts": { "PENDING": 10, "PROCESSING": 5, + "PREPROCESSED": 5, "PROCESSED": 130, "FAILED": 5, }, @@ -648,6 +664,7 @@ class StatusCountsResponse(BaseModel): "status_counts": { "PENDING": 10, "PROCESSING": 5, + "PREPROCESSED": 5, "PROCESSED": 130, "FAILED": 5, } @@ -2210,7 +2227,7 @@ def create_document_routes( To prevent 
@abstractmethod
async def get_edges_by_chunk_ids(self, chunk_ids: list[str]) -> list[dict]:
    """Get all edges that are associated with the given chunk_ids.

    An edge matches when at least one of the chunk IDs stored in its
    ``source_id`` property (a string joined with ``GRAPH_FIELD_SEP``) is
    present in ``chunk_ids``.

    This fallback walks every label returned by ``get_all_labels()`` and
    loads each edge individually, which is inefficient. The method is
    declared abstract, so concrete storages must override it (ideally with a
    native query); an override may still delegate here via ``super()``.

    Args:
        chunk_ids (list[str]): A list of chunk IDs to find associated edges for.

    Returns:
        list[dict]: A list of edges, where each edge is a copy of its property
            dictionary with ``"source"`` and ``"target"`` keys added.
            An empty list if no matching edges are found.
    """
    # Build the lookup set once instead of re-scanning the list for every edge.
    wanted_chunks = set(chunk_ids)
    if not wanted_chunks:
        # Nothing can match, so skip the full graph scan entirely.
        return []

    matching_edges: list[dict] = []
    seen_pairs = set()

    for label in await self.get_all_labels():
        node_edges = await self.get_node_edges(label)
        if not node_edges:
            continue

        for src_id, tgt_id in node_edges:
            # Avoid processing the same edge twice in an undirected graph.
            pair = tuple(sorted((src_id, tgt_id)))
            if pair in seen_pairs:
                continue
            seen_pairs.add(pair)

            edge = await self.get_edge(src_id, tgt_id)
            if not edge:
                continue

            # A missing, None or non-string source_id cannot reference any
            # chunk; skip it rather than crash on ``.split``.
            source_field = edge.get("source_id")
            if not isinstance(source_field, str):
                continue

            if wanted_chunks.isdisjoint(source_field.split(GRAPH_FIELD_SEP)):
                continue

            # Add source and target to a copy of the edge dict for easier
            # processing later; the stored edge itself is left untouched.
            matching_edges.append({**edge, "source": src_id, "target": tgt_id})

    return matching_edges
- - Business rules: - - If multimodal_processed is False and status is PROCESSED, - then change status to PREPROCESSED - - The multimodal_processed field is kept (with repr=False) for internal use and debugging - """ - # Apply status conversion logic - if self.multimodal_processed is not None: - if ( - self.multimodal_processed is False - and self.status == DocStatus.PROCESSED - ): - self.status = DocStatus.PREPROCESSED @dataclass diff --git a/lightrag/lightrag.py b/lightrag/lightrag.py index d288685e..cef0f240 100644 --- a/lightrag/lightrag.py +++ b/lightrag/lightrag.py @@ -2617,7 +2617,12 @@ class LightRAG: ) # Check document status and log warning for non-completed documents - doc_status = doc_status_data.get("status") + raw_status = doc_status_data.get("status") + try: + doc_status = DocStatus(raw_status) + except ValueError: + doc_status = raw_status + if doc_status != DocStatus.PROCESSED: if doc_status == DocStatus.PENDING: warning_msg = ( @@ -2627,12 +2632,23 @@ class LightRAG: warning_msg = ( f"Deleting {doc_id} {file_path}(previous status: PROCESSING)" ) + elif doc_status == DocStatus.PREPROCESSED: + warning_msg = ( + f"Deleting {doc_id} {file_path}(previous status: PREPROCESSED)" + ) elif doc_status == DocStatus.FAILED: warning_msg = ( f"Deleting {doc_id} {file_path}(previous status: FAILED)" ) else: - warning_msg = f"Deleting {doc_id} {file_path}(previous status: {doc_status.value})" + status_text = ( + doc_status.value + if isinstance(doc_status, DocStatus) + else str(doc_status) + ) + warning_msg = ( + f"Deleting {doc_id} {file_path}(previous status: {status_text})" + ) logger.info(warning_msg) # Update pipeline status for monitoring async with pipeline_status_lock: diff --git a/lightrag_webui/src/api/lightrag.ts b/lightrag_webui/src/api/lightrag.ts index 514cc417..d370739d 100644 --- a/lightrag_webui/src/api/lightrag.ts +++ b/lightrag_webui/src/api/lightrag.ts @@ -167,7 +167,7 @@ export type DeleteDocResponse = { doc_id: string } -export type 
// Utility functions defined outside component for better performance and to avoid dependency issues

/**
 * Return the count stored under the first of `keys` that holds a number.
 *
 * Keys are tried in the order given, so callers list the preferred spelling
 * first (e.g. 'PROCESSING' before 'processing'). A numeric 0 under an earlier
 * key wins over a later key; 0 is returned when no key holds a number.
 */
const getCountValue = (counts: Record<string, number>, ...keys: string[]): number => {
  for (const key of keys) {
    const value = counts[key]
    // Absent keys read as undefined at runtime, hence the typeof guard.
    if (typeof value === 'number') {
      return value
    }
  }
  return 0
}

/**
 * Tell whether any document is still in flight, i.e. has a non-zero
 * processing, pending or preprocessed ('multimodal_processed') count.
 */
const hasActiveDocumentsStatus = (counts: Record<string, number>): boolean =>
  getCountValue(counts, 'PROCESSING', 'processing') > 0 ||
  getCountValue(counts, 'PENDING', 'pending') > 0 ||
  getCountValue(counts, 'PREPROCESSED', 'preprocessed', 'multimodal_processed') > 0
'multimodal_processed') || + documentCounts.multimodal_processed || + 0; + const processingCount = getCountValue(statusCounts, 'PROCESSING', 'processing') || documentCounts.processing || 0; + const pendingCount = getCountValue(statusCounts, 'PENDING', 'pending') || documentCounts.pending || 0; + const failedCount = getCountValue(statusCounts, 'FAILED', 'failed') || documentCounts.failed || 0; + // Store previous status counts const prevStatusCounts = useRef({ processed: 0, + multimodal_processed: 0, processing: 0, pending: 0, failed: 0 @@ -578,6 +605,7 @@ export default function DocumentManager() { const legacyDocs: DocsStatusesResponse = { statuses: { processed: response.documents.filter((doc: DocStatusResponse) => doc.status === 'processed'), + multimodal_processed: response.documents.filter((doc: DocStatusResponse) => doc.status === 'multimodal_processed'), processing: response.documents.filter((doc: DocStatusResponse) => doc.status === 'processing'), pending: response.documents.filter((doc: DocStatusResponse) => doc.status === 'pending'), failed: response.documents.filter((doc: DocStatusResponse) => doc.status === 'failed') @@ -907,7 +935,7 @@ export default function DocumentManager() { setTimeout(() => { if (isMountedRef.current && currentTab === 'documents' && health) { // Restore intelligent polling interval based on document status - const hasActiveDocuments = (statusCounts.processing || 0) > 0 || (statusCounts.pending || 0) > 0; + const hasActiveDocuments = hasActiveDocumentsStatus(statusCounts); const normalInterval = hasActiveDocuments ? 
5000 : 30000; startPollingInterval(normalInterval); } @@ -943,7 +971,7 @@ export default function DocumentManager() { setTimeout(() => { if (isMountedRef.current && currentTab === 'documents' && health) { // Restore intelligent polling interval based on document status - const hasActiveDocuments = (statusCounts.processing || 0) > 0 || (statusCounts.pending || 0) > 0; + const hasActiveDocuments = hasActiveDocumentsStatus(statusCounts); const normalInterval = hasActiveDocuments ? 5000 : 30000; startPollingInterval(normalInterval); } @@ -967,6 +995,7 @@ export default function DocumentManager() { setPageByStatus({ all: 1, processed: 1, + multimodal_processed: 1, processing: 1, pending: 1, failed: 1, @@ -1013,6 +1042,7 @@ export default function DocumentManager() { const legacyDocs: DocsStatusesResponse = { statuses: { processed: response.documents.filter(doc => doc.status === 'processed'), + multimodal_processed: response.documents.filter(doc => doc.status === 'multimodal_processed'), processing: response.documents.filter(doc => doc.status === 'processing'), pending: response.documents.filter(doc => doc.status === 'pending'), failed: response.documents.filter(doc => doc.status === 'failed') @@ -1050,14 +1080,21 @@ export default function DocumentManager() { handleIntelligentRefresh(); // Reset polling timer after intelligent refresh - const hasActiveDocuments = (statusCounts.processing || 0) > 0 || (statusCounts.pending || 0) > 0; + const hasActiveDocuments = hasActiveDocumentsStatus(statusCounts); const pollingInterval = hasActiveDocuments ? 
5000 : 30000; startPollingInterval(pollingInterval); } } // Update the previous state prevPipelineBusyRef.current = pipelineBusy; - }, [pipelineBusy, currentTab, health, handleIntelligentRefresh, statusCounts.processing, statusCounts.pending, startPollingInterval]); + }, [ + pipelineBusy, + currentTab, + health, + handleIntelligentRefresh, + statusCounts, + startPollingInterval + ]); // Set up intelligent polling with dynamic interval based on document status useEffect(() => { @@ -1067,7 +1104,7 @@ export default function DocumentManager() { } // Determine polling interval based on document status - const hasActiveDocuments = (statusCounts.processing || 0) > 0 || (statusCounts.pending || 0) > 0; + const hasActiveDocuments = hasActiveDocumentsStatus(statusCounts); const pollingInterval = hasActiveDocuments ? 5000 : 30000; // 5s if active, 30s if idle startPollingInterval(pollingInterval); @@ -1084,6 +1121,7 @@ export default function DocumentManager() { // Get new status counts const newStatusCounts = { processed: docs?.statuses?.processed?.length || 0, + multimodal_processed: docs?.statuses?.multimodal_processed?.length || 0, processing: docs?.statuses?.processing?.length || 0, pending: docs?.statuses?.pending?.length || 0, failed: docs?.statuses?.failed?.length || 0 @@ -1428,11 +1466,23 @@ export default function DocumentManager() { onClick={() => handleStatusFilterChange('processed')} disabled={isRefreshing} className={cn( - (statusCounts.PROCESSED || statusCounts.processed || documentCounts.processed) > 0 ? 'text-green-600' : 'text-gray-500', + processedCount > 0 ? 'text-green-600' : 'text-gray-500', statusFilter === 'processed' && 'bg-green-100 dark:bg-green-900/30 font-medium border border-green-400 dark:border-green-600 shadow-sm' )} > - {t('documentPanel.documentManager.status.completed')} ({statusCounts.PROCESSED || statusCounts.processed || 0}) + {t('documentPanel.documentManager.status.completed')} ({processedCount}) + +