From f5b0c3d38c6c7dbbabe1c079d3a7fda5202caa90 Mon Sep 17 00:00:00 2001
From: yangdx
Date: Sat, 16 Aug 2025 23:08:52 +0800
Subject: [PATCH] feat: Recording file extraction error status to document
 pipeline

- Add apipeline_enqueue_error_documents function to LightRAG class for
  recording file processing errors in doc_status storage
- Enhance pipeline_enqueue_file with detailed error handling for all file
  processing stages:
  * File access errors (permissions, not found)
  * UTF-8 encoding errors
  * Format-specific processing errors (PDF, DOCX, PPTX, XLSX)
  * Content validation errors
  * Unsupported file type errors

This implementation ensures all file extraction failures are properly
tracked and recorded in the doc_status storage system, providing better
visibility into document processing issues and enabling improved error
monitoring and debugging capabilities.
---
 lightrag/api/routers/document_routes.py | 589 ++++++++++++++++--------
 lightrag/lightrag.py                    |  77 ++++
 2 files changed, 481 insertions(+), 185 deletions(-)

diff --git a/lightrag/api/routers/document_routes.py b/lightrag/api/routers/document_routes.py
index 7ad21f45..7a6e5973 100644
--- a/lightrag/api/routers/document_routes.py
+++ b/lightrag/api/routers/document_routes.py
@@ -792,225 +792,444 @@ async def pipeline_enqueue_file(
         tuple: (success: bool, track_id: str)
     """
+    # Generate track_id if not provided
+    if track_id is None:
+        track_id = generate_track_id("unknown")
+
     try:
         content = ""
         ext = file_path.suffix.lower()
+        file_size = 0
+
+        # Get file size for error reporting
+        try:
+            file_size = file_path.stat().st_size
+        except Exception:
+            file_size = 0
 
         file = None
-        async with aiofiles.open(file_path, "rb") as f:
-            file = await f.read()
+        try:
+            async with aiofiles.open(file_path, "rb") as f:
+                file = await f.read()
+        except PermissionError as e:
+            error_files = [
+                {
+                    "file_path": str(file_path.name),
+                    "error_description": "Permission denied - cannot read file",
+                    "original_error": str(e),
+                    "file_size": file_size,
+                }
+            ]
+            await rag.apipeline_enqueue_error_documents(error_files, track_id)
+            logger.error(f"Permission denied reading file: {file_path.name}")
+            return False, track_id
+        except FileNotFoundError as e:
+            error_files = [
+                {
+                    "file_path": str(file_path.name),
+                    "error_description": "File not found",
+                    "original_error": str(e),
+                    "file_size": file_size,
+                }
+            ]
+            await rag.apipeline_enqueue_error_documents(error_files, track_id)
+            logger.error(f"File not found: {file_path.name}")
+            return False, track_id
+        except Exception as e:
+            error_files = [
+                {
+                    "file_path": str(file_path.name),
+                    "error_description": "File reading error",
+                    "original_error": str(e),
+                    "file_size": file_size,
+                }
+            ]
+            await rag.apipeline_enqueue_error_documents(error_files, track_id)
+            logger.error(f"Error reading file {file_path.name}: {str(e)}")
+            return False, track_id
 
         # Process based on file type
-        match ext:
-            case (
-                ".txt"
-                | ".md"
-                | ".html"
-                | ".htm"
-                | ".tex"
-                | ".json"
-                | ".xml"
-                | ".yaml"
-                | ".yml"
-                | ".rtf"
-                | ".odt"
-                | ".epub"
-                | ".csv"
-                | ".log"
-                | ".conf"
-                | ".ini"
-                | ".properties"
-                | ".sql"
-                | ".bat"
-                | ".sh"
-                | ".c"
-                | ".cpp"
-                | ".py"
-                | ".java"
-                | ".js"
-                | ".ts"
-                | ".swift"
-                | ".go"
-                | ".rb"
-                | ".php"
-                | ".css"
-                | ".scss"
-                | ".less"
-            ):
-                try:
-                    # Try to decode as UTF-8
-                    content = file.decode("utf-8")
+        try:
+            match ext:
+                case (
+                    ".txt"
+                    | ".md"
+                    | ".html"
+                    | ".htm"
+                    | ".tex"
+                    | ".json"
+                    | ".xml"
+                    | ".yaml"
+                    | ".yml"
+                    | ".rtf"
+                    | ".odt"
+                    | ".epub"
+                    | ".csv"
+                    | ".log"
+                    | ".conf"
+                    | ".ini"
+                    | ".properties"
+                    | ".sql"
+                    | ".bat"
+                    | ".sh"
+                    | ".c"
+                    | ".cpp"
+                    | ".py"
+                    | ".java"
+                    | ".js"
+                    | ".ts"
+                    | ".swift"
+                    | ".go"
+                    | ".rb"
+                    | ".php"
+                    | ".css"
+                    | ".scss"
+                    | ".less"
+                ):
+                    try:
+                        # Try to decode as UTF-8
+                        content = file.decode("utf-8")
 
-                    # Validate content
-                    if not content or len(content.strip()) == 0:
-                        logger.error(f"Empty content in file: {file_path.name}")
-                        return False, ""
-
-                    # Check if content looks like binary data string representation
-                    if content.startswith("b'") or content.startswith('b"'):
-                        logger.error(
-                            f"File {file_path.name} appears to contain binary data representation instead of text"
-                        )
-                        return False, ""
-
-                except UnicodeDecodeError:
-                    logger.error(
-                        f"File {file_path.name} is not valid UTF-8 encoded text. Please convert it to UTF-8 before processing."
-                    )
-                    return False, ""
-            case ".pdf":
-                if global_args.document_loading_engine == "DOCLING":
-                    if not pm.is_installed("docling"):  # type: ignore
-                        pm.install("docling")
-                    from docling.document_converter import DocumentConverter  # type: ignore
-
-                    converter = DocumentConverter()
-                    result = converter.convert(file_path)
-                    content = result.document.export_to_markdown()
-                else:
-                    if not pm.is_installed("pypdf2"):  # type: ignore
-                        pm.install("pypdf2")
-                    from PyPDF2 import PdfReader  # type: ignore
-                    from io import BytesIO
-
-                    pdf_file = BytesIO(file)
-                    reader = PdfReader(pdf_file)
-                    for page in reader.pages:
-                        content += page.extract_text() + "\n"
-            case ".docx":
-                if global_args.document_loading_engine == "DOCLING":
-                    if not pm.is_installed("docling"):  # type: ignore
-                        pm.install("docling")
-                    from docling.document_converter import DocumentConverter  # type: ignore
-
-                    converter = DocumentConverter()
-                    result = converter.convert(file_path)
-                    content = result.document.export_to_markdown()
-                else:
-                    if not pm.is_installed("python-docx"):  # type: ignore
-                        try:
-                            pm.install("python-docx")
-                        except Exception:
-                            pm.install("docx")
-                    from docx import Document  # type: ignore
-                    from io import BytesIO
-
-                    docx_file = BytesIO(file)
-                    doc = Document(docx_file)
-                    content = "\n".join(
-                        [paragraph.text for paragraph in doc.paragraphs]
-                    )
-            case ".pptx":
-                if global_args.document_loading_engine == "DOCLING":
-                    if not pm.is_installed("docling"):  # type: ignore
-                        pm.install("docling")
-                    from docling.document_converter import DocumentConverter  # type: ignore
-
-                    converter = DocumentConverter()
-                    result = converter.convert(file_path)
-                    content = result.document.export_to_markdown()
-                else:
-                    if not pm.is_installed("python-pptx"):  # type: ignore
-                        pm.install("pptx")
-                    from pptx import Presentation  # type: ignore
-                    from io import BytesIO
-
-                    pptx_file = BytesIO(file)
-                    prs = Presentation(pptx_file)
-                    for slide in prs.slides:
-                        for shape in slide.shapes:
-                            if hasattr(shape, "text"):
-                                content += shape.text + "\n"
-            case ".xlsx":
-                if global_args.document_loading_engine == "DOCLING":
-                    if not pm.is_installed("docling"):  # type: ignore
-                        pm.install("docling")
-                    from docling.document_converter import DocumentConverter  # type: ignore
-
-                    converter = DocumentConverter()
-                    result = converter.convert(file_path)
-                    content = result.document.export_to_markdown()
-                else:
-                    if not pm.is_installed("openpyxl"):  # type: ignore
-                        pm.install("openpyxl")
-                    from openpyxl import load_workbook  # type: ignore
-                    from io import BytesIO
-
-                    xlsx_file = BytesIO(file)
-                    wb = load_workbook(xlsx_file)
-                    for sheet in wb:
-                        content += f"Sheet: {sheet.title}\n"
-                        for row in sheet.iter_rows(values_only=True):
-                            content += (
-                                "\t".join(
-                                    str(cell) if cell is not None else ""
-                                    for cell in row
-                                )
-                                + "\n"
+                        # Validate content
+                        if not content or len(content.strip()) == 0:
+                            error_files = [
+                                {
+                                    "file_path": str(file_path.name),
+                                    "error_description": "Empty file content",
+                                    "original_error": "File contains no content or only whitespace",
+                                    "file_size": file_size,
+                                }
+                            ]
+                            await rag.apipeline_enqueue_error_documents(
+                                error_files, track_id
                             )
-                        content += "\n"
-            case _:
-                logger.error(
-                    f"Unsupported file type: {file_path.name} (extension {ext})"
-                )
-                return False, ""
+                            logger.error(f"Empty content in file: {file_path.name}")
+                            return False, track_id
+
+                        # Check if content looks like binary data string representation
+                        if content.startswith("b'") or content.startswith('b"'):
+                            error_files = [
+                                {
+                                    "file_path": str(file_path.name),
+                                    "error_description": "Binary data in text file",
+                                    "original_error": "File appears to contain binary data representation instead of text",
+                                    "file_size": file_size,
+                                }
+                            ]
+                            await rag.apipeline_enqueue_error_documents(
+                                error_files, track_id
+                            )
+                            logger.error(
+                                f"File {file_path.name} appears to contain binary data representation instead of text"
+                            )
+                            return False, track_id
+
+                    except UnicodeDecodeError as e:
+                        error_files = [
+                            {
+                                "file_path": str(file_path.name),
+                                "error_description": "UTF-8 encoding error",
+                                "original_error": f"File is not valid UTF-8 encoded text: {str(e)}",
+                                "file_size": file_size,
+                            }
+                        ]
+                        await rag.apipeline_enqueue_error_documents(
+                            error_files, track_id
+                        )
+                        logger.error(
+                            f"File {file_path.name} is not valid UTF-8 encoded text. Please convert it to UTF-8 before processing."
+                        )
+                        return False, track_id
+
+                case ".pdf":
+                    try:
+                        if global_args.document_loading_engine == "DOCLING":
+                            if not pm.is_installed("docling"):  # type: ignore
+                                pm.install("docling")
+                            from docling.document_converter import DocumentConverter  # type: ignore
+
+                            converter = DocumentConverter()
+                            result = converter.convert(file_path)
+                            content = result.document.export_to_markdown()
+                        else:
+                            if not pm.is_installed("pypdf2"):  # type: ignore
+                                pm.install("pypdf2")
+                            from PyPDF2 import PdfReader  # type: ignore
+                            from io import BytesIO
+
+                            pdf_file = BytesIO(file)
+                            reader = PdfReader(pdf_file)
+                            for page in reader.pages:
+                                content += page.extract_text() + "\n"
+                    except Exception as e:
+                        error_files = [
+                            {
+                                "file_path": str(file_path.name),
+                                "error_description": "PDF processing error",
+                                "original_error": f"Failed to extract text from PDF: {str(e)}",
+                                "file_size": file_size,
+                            }
+                        ]
+                        await rag.apipeline_enqueue_error_documents(
+                            error_files, track_id
+                        )
+                        logger.error(f"Error processing PDF {file_path.name}: {str(e)}")
+                        return False, track_id
+
+                case ".docx":
+                    try:
+                        if global_args.document_loading_engine == "DOCLING":
+                            if not pm.is_installed("docling"):  # type: ignore
+                                pm.install("docling")
+                            from docling.document_converter import DocumentConverter  # type: ignore
+
+                            converter = DocumentConverter()
+                            result = converter.convert(file_path)
+                            content = result.document.export_to_markdown()
+                        else:
+                            if not pm.is_installed("python-docx"):  # type: ignore
+                                try:
+                                    pm.install("python-docx")
+                                except Exception:
+                                    pm.install("docx")
+                            from docx import Document  # type: ignore
+                            from io import BytesIO
+
+                            docx_file = BytesIO(file)
+                            doc = Document(docx_file)
+                            content = "\n".join(
+                                [paragraph.text for paragraph in doc.paragraphs]
+                            )
+                    except Exception as e:
+                        error_files = [
+                            {
+                                "file_path": str(file_path.name),
+                                "error_description": "DOCX processing error",
+                                "original_error": f"Failed to extract text from DOCX: {str(e)}",
+                                "file_size": file_size,
+                            }
+                        ]
+                        await rag.apipeline_enqueue_error_documents(
+                            error_files, track_id
+                        )
+                        logger.error(
+                            f"Error processing DOCX {file_path.name}: {str(e)}"
+                        )
+                        return False, track_id
+
+                case ".pptx":
+                    try:
+                        if global_args.document_loading_engine == "DOCLING":
+                            if not pm.is_installed("docling"):  # type: ignore
+                                pm.install("docling")
+                            from docling.document_converter import DocumentConverter  # type: ignore
+
+                            converter = DocumentConverter()
+                            result = converter.convert(file_path)
+                            content = result.document.export_to_markdown()
+                        else:
+                            if not pm.is_installed("python-pptx"):  # type: ignore
+                                pm.install("pptx")
+                            from pptx import Presentation  # type: ignore
+                            from io import BytesIO
+
+                            pptx_file = BytesIO(file)
+                            prs = Presentation(pptx_file)
+                            for slide in prs.slides:
+                                for shape in slide.shapes:
+                                    if hasattr(shape, "text"):
+                                        content += shape.text + "\n"
+                    except Exception as e:
+                        error_files = [
+                            {
+                                "file_path": str(file_path.name),
+                                "error_description": "PPTX processing error",
+                                "original_error": f"Failed to extract text from PPTX: {str(e)}",
+                                "file_size": file_size,
+                            }
+                        ]
+                        await rag.apipeline_enqueue_error_documents(
+                            error_files, track_id
+                        )
+                        logger.error(
+                            f"Error processing PPTX {file_path.name}: {str(e)}"
+                        )
+                        return False, track_id
+
+                case ".xlsx":
+                    try:
+                        if global_args.document_loading_engine == "DOCLING":
+                            if not pm.is_installed("docling"):  # type: ignore
+                                pm.install("docling")
+                            from docling.document_converter import DocumentConverter  # type: ignore
+
+                            converter = DocumentConverter()
+                            result = converter.convert(file_path)
+                            content = result.document.export_to_markdown()
+                        else:
+                            if not pm.is_installed("openpyxl"):  # type: ignore
+                                pm.install("openpyxl")
+                            from openpyxl import load_workbook  # type: ignore
+                            from io import BytesIO
+
+                            xlsx_file = BytesIO(file)
+                            wb = load_workbook(xlsx_file)
+                            for sheet in wb:
+                                content += f"Sheet: {sheet.title}\n"
+                                for row in sheet.iter_rows(values_only=True):
+                                    content += (
+                                        "\t".join(
+                                            str(cell) if cell is not None else ""
+                                            for cell in row
+                                        )
+                                        + "\n"
+                                    )
+                                content += "\n"
+                    except Exception as e:
+                        error_files = [
+                            {
+                                "file_path": str(file_path.name),
+                                "error_description": "XLSX processing error",
+                                "original_error": f"Failed to extract text from XLSX: {str(e)}",
+                                "file_size": file_size,
+                            }
+                        ]
+                        await rag.apipeline_enqueue_error_documents(
+                            error_files, track_id
+                        )
+                        logger.error(
+                            f"Error processing XLSX {file_path.name}: {str(e)}"
+                        )
+                        return False, track_id
+
+                case _:
+                    error_files = [
+                        {
+                            "file_path": str(file_path.name),
+                            "error_description": f"Unsupported file type: {ext}",
+                            "original_error": f"File extension {ext} is not supported",
+                            "file_size": file_size,
+                        }
+                    ]
+                    await rag.apipeline_enqueue_error_documents(error_files, track_id)
+                    logger.error(
+                        f"Unsupported file type: {file_path.name} (extension {ext})"
+                    )
+                    return False, track_id
+
+        except Exception as e:
+            error_files = [
+                {
+                    "file_path": str(file_path.name),
+                    "error_description": "File format processing error",
+                    "original_error": f"Unexpected error during file processing: {str(e)}",
+                    "file_size": file_size,
+                }
+            ]
+            await rag.apipeline_enqueue_error_documents(error_files, track_id)
+            logger.error(f"Unexpected error processing file {file_path.name}: {str(e)}")
+            return False, track_id
 
         # Insert into the RAG queue
         if content:
             # Check if content contains only whitespace characters
             if not content.strip():
+                error_files = [
+                    {
+                        "file_path": str(file_path.name),
+                        "error_description": "File contains only whitespace",
+                        "original_error": "File content contains only whitespace characters",
+                        "file_size": file_size,
+                    }
+                ]
+                await rag.apipeline_enqueue_error_documents(error_files, track_id)
                 logger.warning(
                     f"File contains only whitespace characters. file_paths={file_path.name}"
                 )
+                return False, track_id
 
-            # Generate track_id if not provided
-            if track_id is None:
-                track_id = generate_track_id("unkown")
-
-            await rag.apipeline_enqueue_documents(
-                content, file_paths=file_path.name, track_id=track_id
-            )
-
-            logger.info(f"Successfully fetched and enqueued file: {file_path.name}")
-
-            # Move file to __enqueued__ directory after enqueuing
             try:
-                enqueued_dir = file_path.parent / "__enqueued__"
-                enqueued_dir.mkdir(exist_ok=True)
-
-                # Generate unique filename to avoid conflicts
-                unique_filename = get_unique_filename_in_enqueued(
-                    enqueued_dir, file_path.name
-                )
-                target_path = enqueued_dir / unique_filename
-
-                # Move the file
-                file_path.rename(target_path)
-                logger.info(
-                    f"Moved file to enqueued directory: {file_path.name} -> {unique_filename}"
+                await rag.apipeline_enqueue_documents(
+                    content, file_paths=file_path.name, track_id=track_id
                 )
-
-            except Exception as move_error:
-                logger.error(
-                    f"Failed to move file {file_path.name} to __enqueued__ directory: {move_error}"
-                )
-                # Don't affect the main function's success status
+                logger.info(f"Successfully fetched and enqueued file: {file_path.name}")
 
-            return True, track_id
+                # Move file to __enqueued__ directory after enqueuing
+                try:
+                    enqueued_dir = file_path.parent / "__enqueued__"
+                    enqueued_dir.mkdir(exist_ok=True)
+
+                    # Generate unique filename to avoid conflicts
+                    unique_filename = get_unique_filename_in_enqueued(
+                        enqueued_dir, file_path.name
+                    )
+                    target_path = enqueued_dir / unique_filename
+
+                    # Move the file
+                    file_path.rename(target_path)
+                    logger.info(
+                        f"Moved file to enqueued directory: {file_path.name} -> {unique_filename}"
+                    )
+
+                except Exception as move_error:
+                    logger.error(
+                        f"Failed to move file {file_path.name} to __enqueued__ directory: {move_error}"
+                    )
+                    # Don't affect the main function's success status
+
+                return True, track_id
+
+            except Exception as e:
+                error_files = [
+                    {
+                        "file_path": str(file_path.name),
+                        "error_description": "Document enqueue error",
+                        "original_error": f"Failed to enqueue document: {str(e)}",
+                        "file_size": file_size,
+                    }
+                ]
+                await rag.apipeline_enqueue_error_documents(error_files, track_id)
+                logger.error(f"Error enqueueing document {file_path.name}: {str(e)}")
+                return False, track_id
         else:
+            error_files = [
+                {
+                    "file_path": str(file_path.name),
+                    "error_description": "No content extracted",
+                    "original_error": "No content could be extracted from file",
+                    "file_size": file_size,
+                }
+            ]
+            await rag.apipeline_enqueue_error_documents(error_files, track_id)
             logger.error(f"No content could be extracted from file: {file_path.name}")
-            return False, ""
+            return False, track_id
 
     except Exception as e:
+        # Catch-all for any unexpected errors
+        try:
+            file_size = file_path.stat().st_size if file_path.exists() else 0
+        except Exception:
+            file_size = 0
+
+        error_files = [
+            {
+                "file_path": str(file_path.name),
+                "error_description": "Unexpected processing error",
+                "original_error": f"Unexpected error: {str(e)}",
+                "file_size": file_size,
+            }
+        ]
+        await rag.apipeline_enqueue_error_documents(error_files, track_id)
         logger.error(f"Error processing or enqueueing file {file_path.name}: {str(e)}")
         logger.error(traceback.format_exc())
+        return False, track_id
     finally:
         if file_path.name.startswith(temp_prefix):
             try:
                 file_path.unlink()
             except Exception as e:
                 logger.error(f"Error deleting file {file_path}: {str(e)}")
-    return False, ""
 
 
 async def pipeline_index_file(rag: LightRAG, file_path: Path, track_id: str = None):
diff --git a/lightrag/lightrag.py b/lightrag/lightrag.py
index b659c70f..be5f2687 100644
--- a/lightrag/lightrag.py
+++ b/lightrag/lightrag.py
@@ -1108,6 +1108,83 @@ class LightRAG:
 
         return track_id
 
+    async def apipeline_enqueue_error_documents(
+        self,
+        error_files: list[dict[str, Any]],
+        track_id: str | None = None,
+    ) -> None:
+        """
+        Record file extraction errors in doc_status storage.
+
+        This function creates error document entries in the doc_status storage for files
+        that failed during the extraction process. Each error entry contains information
+        about the failure to help with debugging and monitoring.
+
+        Args:
+            error_files: List of dictionaries containing error information for each failed file.
+                Each dictionary should contain:
+                - file_path: Original file name/path
+                - error_description: Brief error description (for content_summary)
+                - original_error: Full error message (for error_msg)
+                - file_size: File size in bytes (for content_length, 0 if unknown)
+            track_id: Optional tracking ID for grouping related operations
+
+        Returns:
+            None
+        """
+        if not error_files:
+            logger.debug("No error files to record")
+            return
+
+        # Generate track_id if not provided
+        if track_id is None or track_id.strip() == "":
+            track_id = generate_track_id("error")
+
+        error_docs: dict[str, Any] = {}
+        current_time = datetime.now(timezone.utc).isoformat()
+
+        for error_file in error_files:
+            file_path = error_file.get("file_path", "unknown_file")
+            error_description = error_file.get(
+                "error_description", "File extraction failed"
+            )
+            original_error = error_file.get("original_error", "Unknown error")
+            file_size = error_file.get("file_size", 0)
+
+            # Generate unique doc_id with "error-" prefix
+            doc_id_content = f"{file_path}-{error_description}"
+            doc_id = compute_mdhash_id(doc_id_content, prefix="error-")
+
+            error_docs[doc_id] = {
+                "status": DocStatus.FAILED,
+                "content_summary": error_description,
+                "content_length": file_size,
+                "error_msg": original_error,
+                "chunks_count": 0,  # No chunks for failed files
+                "created_at": current_time,
+                "updated_at": current_time,
+                "file_path": file_path,
+                "track_id": track_id,
+                "metadata": {
+                    "error_type": "file_extraction_error",
+                },
+            }
+
+        # Store error documents in doc_status
+        if error_docs:
+            await self.doc_status.upsert(error_docs)
+            logger.info(
+                f"Recorded {len(error_docs)} file extraction errors in doc_status"
+            )
+
+            # Log each error for debugging
+            for doc_id, error_doc in error_docs.items():
+                logger.error(
+                    f"File extraction error recorded - ID: {doc_id}, "
+                    f"File: {error_doc['file_path']}, "
+                    f"Error: {error_doc['content_summary']}"
+                )
+
     async def _validate_and_fix_document_consistency(
         self,
         to_process_docs: dict[str, DocProcessingStatus],
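Reviewer note (outside the patch): a minimal, hypothetical usage sketch of the API this commit adds. The error_files dict shape and the apipeline_enqueue_error_documents(error_files, track_id) signature are taken from the diff above; the `rag` object is assumed to be an already-initialized LightRAG instance, and the file name, error text, and size below are purely illustrative.

    # Hedged sketch: record a single extraction failure against an existing LightRAG instance.
    async def record_pdf_failure(rag, track_id: str | None = None) -> None:
        error_files = [
            {
                "file_path": "report.pdf",                    # original file name (illustrative)
                "error_description": "PDF processing error",  # stored as content_summary
                "original_error": "Failed to extract text from PDF: example parser error",
                "file_size": 104857,                          # bytes; 0 if unknown
            }
        ]
        # Per the new method, this upserts a FAILED entry into doc_status,
        # keyed by an "error-" prefixed doc_id derived from file_path and description.
        await rag.apipeline_enqueue_error_documents(error_files, track_id)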