diff --git a/lightrag/api/routers/document_routes.py b/lightrag/api/routers/document_routes.py index c7d9dd97..71247f17 100644 --- a/lightrag/api/routers/document_routes.py +++ b/lightrag/api/routers/document_routes.py @@ -2173,20 +2173,24 @@ def create_document_routes( logger.error(traceback.format_exc()) raise HTTPException(status_code=500, detail=str(e)) + # TODO: Deprecated @router.get( "", response_model=DocsStatusesResponse, dependencies=[Depends(combined_auth)] ) async def documents() -> DocsStatusesResponse: """ - Get the status of all documents in the system. + Get the status of all documents in the system. This endpoint is deprecated; use /documents/paginated instead. + To prevent excessive resource consumption, a maximum of 1,000 records is returned. This endpoint retrieves the current status of all documents, grouped by their - processing status (PENDING, PROCESSING, PROCESSED, FAILED). + processing status (PENDING, PROCESSING, PROCESSED, FAILED). The results are + limited to 1000 total documents with fair distribution across all statuses. Returns: DocsStatusesResponse: A response object containing a dictionary where keys are DocStatus values and values are lists of DocStatusResponse objects representing documents in each status category. + Maximum 1000 documents total will be returned. Raises: HTTPException: If an error occurs while retrieving document statuses (500). @@ -2203,12 +2207,45 @@ def create_document_routes( results: List[Dict[str, DocProcessingStatus]] = await asyncio.gather(*tasks) response = DocsStatusesResponse() + total_documents = 0 + max_documents = 1000 + # Convert results to lists for easier processing + status_documents = [] for idx, result in enumerate(results): status = statuses[idx] + docs_list = [] for doc_id, doc_status in result.items(): + docs_list.append((doc_id, doc_status)) + status_documents.append((status, docs_list)) + + # Fair distribution: round-robin across statuses + status_indices = [0] * len( + status_documents + ) # Track current index for each status + current_status_idx = 0 + + while total_documents < max_documents: + # Check if we have any documents left to process + has_remaining = False + for status_idx, (status, docs_list) in enumerate(status_documents): + if status_indices[status_idx] < len(docs_list): + has_remaining = True + break + + if not has_remaining: + break + + # Try to get a document from the current status + status, docs_list = status_documents[current_status_idx] + current_index = status_indices[current_status_idx] + + if current_index < len(docs_list): + doc_id, doc_status = docs_list[current_index] + if status not in response.statuses: response.statuses[status] = [] + response.statuses[status].append( DocStatusResponse( id=doc_id, @@ -2224,6 +2261,13 @@ def create_document_routes( file_path=doc_status.file_path, ) ) + + status_indices[current_status_idx] += 1 + total_documents += 1 + + # Move to next status (round-robin) + current_status_idx = (current_status_idx + 1) % len(status_documents) + return response except Exception as e: logger.error(f"Error GET /documents: {str(e)}")