feat: Record file extraction error status in the document pipeline

- Add an apipeline_enqueue_error_documents method to the LightRAG class that records file processing errors in doc_status storage
- Enhance pipeline_enqueue_file with detailed error handling for all file processing stages:
  * File access errors (permissions, not found)
  * UTF-8 encoding errors
  * Format-specific processing errors (PDF, DOCX, PPTX, XLSX)
  * Content validation errors
  * Unsupported file type errors

With this change, every file extraction failure is tracked in doc_status storage, giving better visibility into document processing issues and making error monitoring and debugging easier.
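For illustration, a minimal sketch of the reporting pattern this commit introduces (the payload keys match the diff below; the concrete file name, size, and error text are made-up example values):

    error_files = [
        {
            "file_path": "report.pdf",                    # original file name (example)
            "error_description": "PDF processing error",  # short summary, becomes content_summary
            "original_error": "Failed to extract text from PDF: ...",
            "file_size": 10240,                           # bytes, 0 if unknown
        }
    ]
    await rag.apipeline_enqueue_error_documents(error_files, track_id)

Every failure path in pipeline_enqueue_file builds a payload of this shape and returns (False, track_id), so callers can look the failure up in doc_status by track_id.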
Author: yangdx
Date: 2025-08-16 23:08:52 +08:00
Commit: f5b0c3d38c (parent: ca4c18baaa)
2 changed files with 481 additions and 185 deletions


@@ -792,15 +792,64 @@ async def pipeline_enqueue_file(
         tuple: (success: bool, track_id: str)
     """
+    # Generate track_id if not provided
+    if track_id is None:
+        track_id = generate_track_id("unknown")
+
     try:
         content = ""
         ext = file_path.suffix.lower()
+        file_size = 0
+
+        # Get file size for error reporting
+        try:
+            file_size = file_path.stat().st_size
+        except Exception:
+            file_size = 0
 
         file = None
+        try:
             async with aiofiles.open(file_path, "rb") as f:
                 file = await f.read()
+        except PermissionError as e:
+            error_files = [
+                {
+                    "file_path": str(file_path.name),
+                    "error_description": "Permission denied - cannot read file",
+                    "original_error": str(e),
+                    "file_size": file_size,
+                }
+            ]
+            await rag.apipeline_enqueue_error_documents(error_files, track_id)
+            logger.error(f"Permission denied reading file: {file_path.name}")
+            return False, track_id
+        except FileNotFoundError as e:
+            error_files = [
+                {
+                    "file_path": str(file_path.name),
+                    "error_description": "File not found",
+                    "original_error": str(e),
+                    "file_size": file_size,
+                }
+            ]
+            await rag.apipeline_enqueue_error_documents(error_files, track_id)
+            logger.error(f"File not found: {file_path.name}")
+            return False, track_id
+        except Exception as e:
+            error_files = [
+                {
+                    "file_path": str(file_path.name),
+                    "error_description": "File reading error",
+                    "original_error": str(e),
+                    "file_size": file_size,
+                }
+            ]
+            await rag.apipeline_enqueue_error_documents(error_files, track_id)
+            logger.error(f"Error reading file {file_path.name}: {str(e)}")
+            return False, track_id
 
         # Process based on file type
+        try:
             match ext:
                 case (
                     ".txt"
@@ -843,22 +892,57 @@ async def pipeline_enqueue_file(
                     # Validate content
                     if not content or len(content.strip()) == 0:
+                        error_files = [
+                            {
+                                "file_path": str(file_path.name),
+                                "error_description": "Empty file content",
+                                "original_error": "File contains no content or only whitespace",
+                                "file_size": file_size,
+                            }
+                        ]
+                        await rag.apipeline_enqueue_error_documents(
+                            error_files, track_id
+                        )
                         logger.error(f"Empty content in file: {file_path.name}")
-                        return False, ""
+                        return False, track_id
 
                     # Check if content looks like binary data string representation
                     if content.startswith("b'") or content.startswith('b"'):
+                        error_files = [
+                            {
+                                "file_path": str(file_path.name),
+                                "error_description": "Binary data in text file",
+                                "original_error": "File appears to contain binary data representation instead of text",
+                                "file_size": file_size,
+                            }
+                        ]
+                        await rag.apipeline_enqueue_error_documents(
+                            error_files, track_id
+                        )
                         logger.error(
                             f"File {file_path.name} appears to contain binary data representation instead of text"
                         )
-                        return False, ""
+                        return False, track_id
-                except UnicodeDecodeError:
+                except UnicodeDecodeError as e:
+                    error_files = [
+                        {
+                            "file_path": str(file_path.name),
+                            "error_description": "UTF-8 encoding error",
+                            "original_error": f"File is not valid UTF-8 encoded text: {str(e)}",
+                            "file_size": file_size,
+                        }
+                    ]
+                    await rag.apipeline_enqueue_error_documents(
+                        error_files, track_id
+                    )
                     logger.error(
                         f"File {file_path.name} is not valid UTF-8 encoded text. Please convert it to UTF-8 before processing."
                     )
-                    return False, ""
+                    return False, track_id
 
                 case ".pdf":
+                    try:
                         if global_args.document_loading_engine == "DOCLING":
                             if not pm.is_installed("docling"):  # type: ignore
                                 pm.install("docling")
@@ -877,7 +961,23 @@ async def pipeline_enqueue_file(
                         reader = PdfReader(pdf_file)
                         for page in reader.pages:
                             content += page.extract_text() + "\n"
+                    except Exception as e:
+                        error_files = [
+                            {
+                                "file_path": str(file_path.name),
+                                "error_description": "PDF processing error",
+                                "original_error": f"Failed to extract text from PDF: {str(e)}",
+                                "file_size": file_size,
+                            }
+                        ]
+                        await rag.apipeline_enqueue_error_documents(
+                            error_files, track_id
+                        )
+                        logger.error(f"Error processing PDF {file_path.name}: {str(e)}")
+                        return False, track_id
 
                 case ".docx":
+                    try:
                         if global_args.document_loading_engine == "DOCLING":
                             if not pm.is_installed("docling"):  # type: ignore
                                 pm.install("docling")
@@ -900,7 +1000,25 @@ async def pipeline_enqueue_file(
                         content = "\n".join(
                             [paragraph.text for paragraph in doc.paragraphs]
                         )
+                    except Exception as e:
+                        error_files = [
+                            {
+                                "file_path": str(file_path.name),
+                                "error_description": "DOCX processing error",
+                                "original_error": f"Failed to extract text from DOCX: {str(e)}",
+                                "file_size": file_size,
+                            }
+                        ]
+                        await rag.apipeline_enqueue_error_documents(
+                            error_files, track_id
+                        )
+                        logger.error(
+                            f"Error processing DOCX {file_path.name}: {str(e)}"
+                        )
+                        return False, track_id
 
                 case ".pptx":
+                    try:
                         if global_args.document_loading_engine == "DOCLING":
                             if not pm.is_installed("docling"):  # type: ignore
                                 pm.install("docling")
@@ -921,7 +1039,25 @@ async def pipeline_enqueue_file(
                             for shape in slide.shapes:
                                 if hasattr(shape, "text"):
                                     content += shape.text + "\n"
+                    except Exception as e:
+                        error_files = [
+                            {
+                                "file_path": str(file_path.name),
+                                "error_description": "PPTX processing error",
+                                "original_error": f"Failed to extract text from PPTX: {str(e)}",
+                                "file_size": file_size,
+                            }
+                        ]
+                        await rag.apipeline_enqueue_error_documents(
+                            error_files, track_id
+                        )
+                        logger.error(
+                            f"Error processing PPTX {file_path.name}: {str(e)}"
+                        )
+                        return False, track_id
 
                 case ".xlsx":
+                    try:
                         if global_args.document_loading_engine == "DOCLING":
                             if not pm.is_installed("docling"):  # type: ignore
                                 pm.install("docling")
@@ -949,24 +1085,70 @@ async def pipeline_enqueue_file(
                                     + "\n"
                                 )
                                 content += "\n"
+                    except Exception as e:
+                        error_files = [
+                            {
+                                "file_path": str(file_path.name),
+                                "error_description": "XLSX processing error",
+                                "original_error": f"Failed to extract text from XLSX: {str(e)}",
+                                "file_size": file_size,
+                            }
+                        ]
+                        await rag.apipeline_enqueue_error_documents(
+                            error_files, track_id
+                        )
+                        logger.error(
+                            f"Error processing XLSX {file_path.name}: {str(e)}"
+                        )
+                        return False, track_id
 
                 case _:
+                    error_files = [
+                        {
+                            "file_path": str(file_path.name),
+                            "error_description": f"Unsupported file type: {ext}",
+                            "original_error": f"File extension {ext} is not supported",
+                            "file_size": file_size,
+                        }
+                    ]
+                    await rag.apipeline_enqueue_error_documents(error_files, track_id)
                     logger.error(
                         f"Unsupported file type: {file_path.name} (extension {ext})"
                     )
-                    return False, ""
+                    return False, track_id
+        except Exception as e:
+            error_files = [
+                {
+                    "file_path": str(file_path.name),
+                    "error_description": "File format processing error",
+                    "original_error": f"Unexpected error during file processing: {str(e)}",
+                    "file_size": file_size,
+                }
+            ]
+            await rag.apipeline_enqueue_error_documents(error_files, track_id)
+            logger.error(f"Unexpected error processing file {file_path.name}: {str(e)}")
+            return False, track_id
 
         # Insert into the RAG queue
         if content:
             # Check if content contains only whitespace characters
             if not content.strip():
+                error_files = [
+                    {
+                        "file_path": str(file_path.name),
+                        "error_description": "File contains only whitespace",
+                        "original_error": "File content contains only whitespace characters",
+                        "file_size": file_size,
+                    }
+                ]
+                await rag.apipeline_enqueue_error_documents(error_files, track_id)
                 logger.warning(
                     f"File contains only whitespace characters. file_paths={file_path.name}"
                 )
+                return False, track_id
 
-            # Generate track_id if not provided
-            if track_id is None:
-                track_id = generate_track_id("unkown")
+            try:
                 await rag.apipeline_enqueue_documents(
                     content, file_paths=file_path.name, track_id=track_id
                 )
@@ -997,20 +1179,57 @@ async def pipeline_enqueue_file(
                 # Don't affect the main function's success status
                 return True, track_id
+            except Exception as e:
+                error_files = [
+                    {
+                        "file_path": str(file_path.name),
+                        "error_description": "Document enqueue error",
+                        "original_error": f"Failed to enqueue document: {str(e)}",
+                        "file_size": file_size,
+                    }
+                ]
+                await rag.apipeline_enqueue_error_documents(error_files, track_id)
+                logger.error(f"Error enqueueing document {file_path.name}: {str(e)}")
+                return False, track_id
         else:
-            logger.error(f"No content could be extracted from file: {file_path.name}")
-            return False, ""
-    except Exception as e:
+            error_files = [
+                {
+                    "file_path": str(file_path.name),
+                    "error_description": "No content extracted",
+                    "original_error": "No content could be extracted from file",
+                    "file_size": file_size,
+                }
+            ]
+            await rag.apipeline_enqueue_error_documents(error_files, track_id)
+            logger.error(f"No content could be extracted from file: {file_path.name}")
+            return False, track_id
+
+    except Exception as e:
+        # Catch-all for any unexpected errors
+        try:
+            file_size = file_path.stat().st_size if file_path.exists() else 0
+        except Exception:
+            file_size = 0
+        error_files = [
+            {
+                "file_path": str(file_path.name),
+                "error_description": "Unexpected processing error",
+                "original_error": f"Unexpected error: {str(e)}",
+                "file_size": file_size,
+            }
+        ]
+        await rag.apipeline_enqueue_error_documents(error_files, track_id)
         logger.error(f"Error processing or enqueueing file {file_path.name}: {str(e)}")
         logger.error(traceback.format_exc())
+        return False, track_id
     finally:
         if file_path.name.startswith(temp_prefix):
             try:
                 file_path.unlink()
             except Exception as e:
                 logger.error(f"Error deleting file {file_path}: {str(e)}")
-    return False, ""
 
 
 async def pipeline_index_file(rag: LightRAG, file_path: Path, track_id: str = None):
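Note that each except branch above assembles the same four-key payload by hand. A hedged refactoring sketch — not part of this commit — showing how a hypothetical helper could consolidate that repetition (assumes the module's existing Path and logger imports):

    # Hypothetical helper (not in this commit): builds the standard error
    # payload and records it, so each except branch becomes a single call.
    async def _record_extraction_error(
        rag: LightRAG,
        file_path: Path,
        description: str,
        original_error: str,
        file_size: int,
        track_id: str,
    ) -> None:
        error_files = [
            {
                "file_path": str(file_path.name),
                "error_description": description,
                "original_error": original_error,
                "file_size": file_size,
            }
        ]
        await rag.apipeline_enqueue_error_documents(error_files, track_id)
        logger.error(f"{description}: {file_path.name} - {original_error}")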


@@ -1108,6 +1108,83 @@ class LightRAG:
         return track_id
 
+    async def apipeline_enqueue_error_documents(
+        self,
+        error_files: list[dict[str, Any]],
+        track_id: str | None = None,
+    ) -> None:
+        """
+        Record file extraction errors in doc_status storage.
+
+        This function creates error document entries in the doc_status storage for files
+        that failed during the extraction process. Each error entry contains information
+        about the failure to help with debugging and monitoring.
+
+        Args:
+            error_files: List of dictionaries containing error information for each failed file.
+                Each dictionary should contain:
+                - file_path: Original file name/path
+                - error_description: Brief error description (for content_summary)
+                - original_error: Full error message (for error_msg)
+                - file_size: File size in bytes (for content_length, 0 if unknown)
+            track_id: Optional tracking ID for grouping related operations
+
+        Returns:
+            None
+        """
+        if not error_files:
+            logger.debug("No error files to record")
+            return
+
+        # Generate track_id if not provided
+        if track_id is None or track_id.strip() == "":
+            track_id = generate_track_id("error")
+
+        error_docs: dict[str, Any] = {}
+        current_time = datetime.now(timezone.utc).isoformat()
+
+        for error_file in error_files:
+            file_path = error_file.get("file_path", "unknown_file")
+            error_description = error_file.get(
+                "error_description", "File extraction failed"
+            )
+            original_error = error_file.get("original_error", "Unknown error")
+            file_size = error_file.get("file_size", 0)
+
+            # Generate unique doc_id with "error-" prefix
+            doc_id_content = f"{file_path}-{error_description}"
+            doc_id = compute_mdhash_id(doc_id_content, prefix="error-")
+
+            error_docs[doc_id] = {
+                "status": DocStatus.FAILED,
+                "content_summary": error_description,
+                "content_length": file_size,
+                "error_msg": original_error,
+                "chunks_count": 0,  # No chunks for failed files
+                "created_at": current_time,
+                "updated_at": current_time,
+                "file_path": file_path,
+                "track_id": track_id,
+                "metadata": {
+                    "error_type": "file_extraction_error",
+                },
+            }
+
+        # Store error documents in doc_status
+        if error_docs:
+            await self.doc_status.upsert(error_docs)
+            logger.info(
+                f"Recorded {len(error_docs)} file extraction errors in doc_status"
+            )
+
+            # Log each error for debugging
+            for doc_id, error_doc in error_docs.items():
+                logger.error(
+                    f"File extraction error recorded - ID: {doc_id}, "
+                    f"File: {error_doc['file_path']}, "
+                    f"Error: {error_doc['content_summary']}"
+                )
+
     async def _validate_and_fix_document_consistency(
         self,
         to_process_docs: dict[str, DocProcessingStatus],
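A minimal usage sketch for the new method, assuming an initialized LightRAG instance named rag (the field semantics follow the docstring above; the file name and error text are illustrative):

    error_files = [
        {
            "file_path": "broken.docx",
            "error_description": "DOCX processing error",
            "original_error": "Failed to extract text from DOCX: bad zip archive",
            "file_size": 2048,
        }
    ]
    # Passing track_id=None is allowed: the method then generates one via
    # generate_track_id("error").
    await rag.apipeline_enqueue_error_documents(error_files, track_id=None)
    # The failure is now queryable in doc_status as a DocStatus.FAILED entry;
    # its doc_id is compute_mdhash_id(f"{file_path}-{error_description}", prefix="error-").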