feat: Enhance PDF parsing (#1445)
<!-- .github/pull_request_template.md --> ## Description <!-- Please provide a clear, human-generated description of the changes in this PR. DO NOT use AI-generated descriptions. We want to understand your thought process and reasoning. --> I've just added a new PDF parser, AdvancedPdfLoader. It uses the unstructured library and does a much better job of handling PDFs, especially with its layout-aware parsing, table preservation, and image handling. I also built in a safeguard: if unstructured isn't installed or throws an error, it'll automatically fall back to the old PyPdfLoader so it won't just crash. All the related unit tests and project dependencies are taken care of, too. https://github.com/topoteretes/cognee/issues/1342 ## Type of Change <!-- Please check the relevant option --> - [ ] Bug fix (non-breaking change that fixes an issue) - [ ] New feature (non-breaking change that adds functionality) - [ ] Breaking change (fix or feature that would cause existing functionality to change) - [ ] Documentation update - [ ] Code refactoring - [x] Performance improvement - [ ] Other (please specify): ## Changes Made <!-- List the specific changes made in this PR --> - Added AdvancedPdfLoader class for enhanced PDF processing using the unstructured library. - Integrated fallback mechanism to PyPdfLoader in case of unstructured library import failure or exceptions. - Updated supported loaders to include AdvancedPdfLoader. - Added unit tests for AdvancedPdfLoader to ensure functionality and error handling. - Updated poetry.lock and pyproject.toml to include new dependencies and versions. 
## Testing <!-- Describe how you tested your changes --> pytest -v ./cognee/tests/test_advanced_pdf_loader.py ## Screenshots/Videos (if applicable) <!-- Add screenshots or videos to help explain your changes --> ## Pre-submission Checklist <!-- Please check all boxes that apply before submitting your PR --> - [x] **I have tested my changes thoroughly before submitting this PR** - [x] **This PR contains minimal changes necessary to address the issue/feature** - [x] My code follows the project's coding standards and style guidelines - [x] I have added tests that prove my fix is effective or that my feature works - [x] I have added necessary documentation (if applicable) - [x] All new and existing tests pass - [x] I have searched existing PRs to ensure this change hasn't been submitted already - [x] I have linked any relevant issues in the description - [x] My commits have clear and descriptive messages ## Related Issues <!-- Link any related issues using "Fixes #issue_number" or "Relates to #issue_number" --> ## Additional Notes <!-- Add any additional notes, concerns, or context for reviewers --> ## DCO Affirmation I affirm that all code in every commit of this pull request conforms to the terms of the Topoteretes Developer Certificate of Origin.
This commit is contained in:
commit
2ee5a3ca7a
8 changed files with 2760 additions and 602 deletions
|
|
@ -27,6 +27,7 @@ class LoaderEngine:
|
|||
|
||||
self.default_loader_priority = [
|
||||
"text_loader",
|
||||
"advanced_pdf_loader",
|
||||
"pypdf_loader",
|
||||
"image_loader",
|
||||
"audio_loader",
|
||||
|
|
@ -86,7 +87,7 @@ class LoaderEngine:
|
|||
if loader.can_handle(extension=file_info.extension, mime_type=file_info.mime):
|
||||
return loader
|
||||
else:
|
||||
raise ValueError(f"Loader does not exist: {loader_name}")
|
||||
logger.info(f"Skipping {loader_name}: Preferred Loader not registered")
|
||||
|
||||
# Try default priority order
|
||||
for loader_name in self.default_loader_priority:
|
||||
|
|
@ -95,7 +96,9 @@ class LoaderEngine:
|
|||
if loader.can_handle(extension=file_info.extension, mime_type=file_info.mime):
|
||||
return loader
|
||||
else:
|
||||
raise ValueError(f"Loader does not exist: {loader_name}")
|
||||
logger.info(
|
||||
f"Skipping {loader_name}: Loader not registered (in default priority list)."
|
||||
)
|
||||
|
||||
return None
|
||||
|
||||
|
|
|
|||
|
|
@ -20,3 +20,10 @@ try:
|
|||
__all__.append("UnstructuredLoader")
|
||||
except ImportError:
|
||||
pass
|
||||
|
||||
try:
|
||||
from .advanced_pdf_loader import AdvancedPdfLoader
|
||||
|
||||
__all__.append("AdvancedPdfLoader")
|
||||
except ImportError:
|
||||
pass
|
||||
|
|
|
|||
244
cognee/infrastructure/loaders/external/advanced_pdf_loader.py
vendored
Normal file
244
cognee/infrastructure/loaders/external/advanced_pdf_loader.py
vendored
Normal file
|
|
@ -0,0 +1,244 @@
|
|||
"""Advanced PDF loader leveraging unstructured for layout-aware extraction."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from dataclasses import dataclass
|
||||
from typing import Any, Dict, List, Optional
|
||||
import asyncio
|
||||
from cognee.infrastructure.files.storage import get_file_storage, get_storage_config
|
||||
from cognee.infrastructure.files.utils.get_file_metadata import get_file_metadata
|
||||
from cognee.infrastructure.loaders.LoaderInterface import LoaderInterface
|
||||
from cognee.shared.logging_utils import get_logger
|
||||
|
||||
from cognee.infrastructure.loaders.external.pypdf_loader import PyPdfLoader
|
||||
|
||||
logger = get_logger(__name__)
|
||||
|
||||
try:
    from unstructured.partition.pdf import partition_pdf
except ImportError:
    # The PDF extra of unstructured is optional. Log why this module cannot be
    # used, then let the ImportError propagate to the importing code.
    logger.info(
        "unstructured[pdf] not installed, can't use AdvancedPdfLoader, will use PyPdfLoader instead."
    )
    # A bare ``raise`` re-raises the original exception, keeping its message
    # (which names the missing module) instead of an empty ImportError.
    raise
|
||||
|
||||
|
||||
@dataclass
class _PageBuffer:
    """Formatted text segments collected for a single page."""

    # Page number taken from the element metadata; None when none was reported.
    page_num: Optional[int]
    # Formatted element texts in the order they were encountered.
    segments: List[str]
|
||||
|
||||
|
||||
class AdvancedPdfLoader(LoaderInterface):
|
||||
"""
|
||||
PDF loader using unstructured library.
|
||||
|
||||
Extracts text content, images, tables from PDF files page by page, providing
|
||||
structured page information and handling PDF-specific errors.
|
||||
"""
|
||||
|
||||
@property
def supported_extensions(self) -> List[str]:
    """File extensions (written without a leading dot) this loader accepts."""
    extensions: List[str] = ["pdf"]
    return extensions
|
||||
|
||||
@property
def supported_mime_types(self) -> List[str]:
    """MIME types this loader accepts."""
    mime_types: List[str] = ["application/pdf"]
    return mime_types
|
||||
|
||||
@property
def loader_name(self) -> str:
    """Identifier under which this loader is referenced (e.g. in priority lists)."""
    name = "advanced_pdf_loader"
    return name
|
||||
|
||||
def can_handle(self, extension: str, mime_type: str) -> bool:
    """Return True only when both the extension and the MIME type are supported."""
    extension_ok = extension in self.supported_extensions
    return extension_ok and mime_type in self.supported_mime_types
|
||||
|
||||
async def load(self, file_path: str, strategy: str = "auto", **kwargs: Any) -> str:
    """Load a PDF with unstructured, falling back to PyPdfLoader on failure.

    The extracted text is stored under a name derived from the content hash
    of the original file. If partitioning raises, or yields no content, the
    file is handed to the PyPDF fallback exactly once.

    Args:
        file_path: Path to the document file
        strategy: Partitioning strategy ("auto", "fast", "hi_res", "ocr_only")
        **kwargs: Additional arguments passed to unstructured partition
            (and forwarded to the fallback loader when it is used)

    Returns:
        The value returned by the file storage's ``store`` call for the
        extracted text, or the fallback loader's result.
    """
    try:
        logger.info(f"Processing PDF: {file_path}")

        with open(file_path, "rb") as f:
            file_metadata = await get_file_metadata(f)

        # Name ingested file of current loader based on original file content hash
        storage_file_name = "text_" + file_metadata["content_hash"] + ".txt"

        # Caller-supplied kwargs come last so they can override the defaults.
        partition_kwargs: Dict[str, Any] = {
            "filename": file_path,
            "strategy": strategy,
            "infer_table_structure": True,
            "include_page_breaks": False,
            "include_metadata": True,
            **kwargs,
        }
        elements = partition_pdf(**partition_kwargs)

        page_contents = self._format_elements_by_page(elements)

        if page_contents:
            full_content = "\n".join(page_contents)

            storage_config = get_storage_config()
            storage = get_file_storage(storage_config["data_root_directory"])
            return await storage.store(storage_file_name, full_content)

        logger.warning("AdvancedPdfLoader returned no content. Falling back to PyPDF loader.")
    except Exception as exc:
        logger.warning("Failed to process PDF with AdvancedPdfLoader: %s", exc)

    # The fallback runs outside the try block on purpose: if it fails, its
    # error propagates instead of being caught above and triggering a second
    # fallback attempt.
    return await self._fallback(file_path, **kwargs)
|
||||
|
||||
async def _fallback(self, file_path: str, **kwargs: Any) -> str:
    """Load ``file_path`` with a fresh PyPdfLoader, forwarding ``kwargs`` unchanged."""
    logger.info("Falling back to PyPDF loader for %s", file_path)
    result = await PyPdfLoader().load(file_path, **kwargs)
    return result
|
||||
|
||||
def _format_elements_by_page(self, elements: List[Any]) -> List[str]:
    """Group consecutive elements by page number and render one string per page.

    Args:
        elements: Elements as returned by the partitioner, in document order.

    Returns:
        One string per run of same-page elements that produced text. Each
        starts with a ``Page N:`` header line (``Page:`` when no page number
        is known), followed by the segments separated by blank lines.
    """
    page_buffers: List[_PageBuffer] = []
    current_buffer = _PageBuffer(page_num=None, segments=[])

    for element in elements:
        element_dict = self._safe_to_dict(element)

        # Metadata may be a dict, a metadata object, or missing altogether.
        metadata = element_dict.get("metadata")
        if isinstance(metadata, dict):
            page_num = metadata.get("page_number")
        else:
            page_num = getattr(metadata, "page_number", None)

        # A change of page number closes the current buffer; empty buffers
        # are dropped so no header is emitted without content.
        if page_num != current_buffer.page_num:
            if current_buffer.segments:
                page_buffers.append(current_buffer)
            current_buffer = _PageBuffer(page_num=page_num, segments=[])

        formatted = self._format_element(element_dict)
        if formatted:
            current_buffer.segments.append(formatted)

    if current_buffer.segments:
        page_buffers.append(current_buffer)

    page_contents: List[str] = []
    for buffer in page_buffers:
        # Both header variants end with a newline so the first segment never
        # gets glued onto the header line.
        header = f"Page {buffer.page_num}:\n" if buffer.page_num is not None else "Page:\n"
        page_contents.append(header + "\n\n".join(buffer.segments) + "\n")

    return page_contents
|
||||
|
||||
def _format_element(
    self,
    element: Dict[str, Any],
) -> str:
    """Render one element dict as text.

    Args:
        element: Dict with optional ``type``, ``text`` and ``metadata`` keys.

    Returns:
        An empty string for headers and footers, the table rendering (or
        cleaned text) for tables, the cleaned text or an image placeholder
        for images, and the cleaned text for everything else.
    """
    # A missing or None type is treated as plain text instead of crashing.
    element_type = str(element.get("type") or "").lower()
    text = self._clean_text(element.get("text", ""))

    # Ignore header and footer: an empty result is skipped by the caller.
    if element_type in ("header", "footer"):
        return ""

    if element_type == "table":
        return self._format_table_element(element) or text

    if element_type == "image":
        return text or self._format_image_element(element.get("metadata") or {})

    return text
|
||||
|
||||
def _format_table_element(self, element: Dict[str, Any]) -> str:
    """Return the table's HTML rendering when present, else its cleaned text."""
    details = element.get("metadata", {})
    plain_text = self._clean_text(element.get("text", ""))
    html = details.get("text_as_html")
    return html.strip() if html else plain_text
|
||||
|
||||
def _format_image_element(self, metadata: Dict[str, Any]) -> str:
    """Build a textual placeholder for an image that has no text of its own.

    Args:
        metadata: Element metadata; an optional ``coordinates`` dict may
            carry ``points``, ``system``, ``layout_width`` and
            ``layout_height``.

    Returns:
        ``"[Image omitted]"``, followed by one parenthesised, comma-separated
        group holding the bounding box and/or layout details when available.
    """
    placeholder = "[Image omitted]"

    coordinates = metadata.get("coordinates") if isinstance(metadata, dict) else None
    if not isinstance(coordinates, dict):
        # Missing or malformed coordinates: nothing to add to the placeholder.
        return placeholder

    details: List[str] = []

    points = coordinates.get("points")
    # Accept tuples or lists (e.g. after a JSON round trip) of four (x, y) pairs.
    if (
        isinstance(points, (list, tuple))
        and len(points) == 4
        and all(
            isinstance(point, (list, tuple))
            and len(point) == 2
            and all(isinstance(value, (int, float)) for value in point)
            for point in points
        )
    ):
        # Take the extremes so the box does not depend on the corner order.
        xs = [point[0] for point in points]
        ys = [point[1] for point in points]
        details.append(f"bbox=({min(xs)}, {min(ys)}, {max(xs)}, {max(ys)})")

    layout_width = coordinates.get("layout_width")
    layout_height = coordinates.get("layout_height")
    system = coordinates.get("system")
    if layout_width and layout_height and system:
        details.append(
            f"system={system}, layout_width={layout_width}, layout_height={layout_height}"
        )

    if not details:
        return placeholder

    # A single group keeps the parentheses balanced whichever details exist.
    return f"{placeholder} ({', '.join(details)})"
|
||||
|
||||
def _safe_to_dict(self, element: Any) -> Dict[str, Any]:
    """Convert an element to a plain dict without ever raising.

    Args:
        element: Object that may offer ``to_dict()`` and/or ``category``,
            ``text`` and ``metadata`` attributes.

    Returns:
        The result of ``element.to_dict()`` when that yields a dict;
        otherwise a dict with ``type``, ``text`` and ``metadata`` keys built
        from the element's attributes. ``metadata`` is always a dict in the
        fallback result.
    """
    to_dict = getattr(element, "to_dict", None)
    if callable(to_dict):
        try:
            converted = to_dict()
        except Exception:
            # Deliberate best effort: a failing to_dict() must not abort the
            # whole document; use the attribute-based fallback instead.
            converted = None
        if isinstance(converted, dict):
            return converted

    fallback_type = getattr(element, "category", None) or type(element).__name__

    # Callers read metadata with ``.get``, so normalise anything that is not
    # already a dict (metadata object, None, ...) to a dict.
    metadata = getattr(element, "metadata", None)
    if not isinstance(metadata, dict):
        metadata_to_dict = getattr(metadata, "to_dict", None)
        try:
            metadata = metadata_to_dict() if callable(metadata_to_dict) else {}
        except Exception:
            metadata = {}
        if not isinstance(metadata, dict):
            metadata = {}

    return {
        "type": fallback_type,
        "text": getattr(element, "text", ""),
        "metadata": metadata,
    }
|
||||
|
||||
def _clean_text(self, value: Any) -> str:
    """Stringify ``value``, turn non-breaking spaces into spaces and trim it.

    ``None`` yields an empty string.
    """
    if value is not None:
        normalized = str(value).replace("\xa0", " ")
        return normalized.strip()
    return ""
|
||||
|
||||
|
||||
if __name__ == "__main__":
    import sys

    # Manual smoke test. The PDF path comes from the command line instead of
    # a machine-specific absolute path.
    if len(sys.argv) != 2:
        raise SystemExit(f"Usage: {sys.argv[0]} <path-to-pdf>")

    loader = AdvancedPdfLoader()
    print(asyncio.run(loader.load(sys.argv[1])))
|
||||
|
|
@ -16,3 +16,10 @@ try:
|
|||
supported_loaders[UnstructuredLoader.loader_name] = UnstructuredLoader
|
||||
except ImportError:
|
||||
pass
|
||||
|
||||
try:
|
||||
from cognee.infrastructure.loaders.external import AdvancedPdfLoader
|
||||
|
||||
supported_loaders[AdvancedPdfLoader.loader_name] = AdvancedPdfLoader
|
||||
except ImportError:
|
||||
pass
|
||||
|
|
|
|||
141
cognee/tests/test_advanced_pdf_loader.py
Normal file
141
cognee/tests/test_advanced_pdf_loader.py
Normal file
|
|
@ -0,0 +1,141 @@
|
|||
import sys
|
||||
from unittest.mock import patch, MagicMock, AsyncMock, mock_open
|
||||
import pytest
|
||||
|
||||
from cognee.infrastructure.loaders.external.advanced_pdf_loader import AdvancedPdfLoader
|
||||
|
||||
advanced_pdf_loader_module = sys.modules.get(
|
||||
"cognee.infrastructure.loaders.external.advanced_pdf_loader"
|
||||
)
|
||||
|
||||
|
||||
class MockElement:
    """Minimal stand-in for a partitioned element, exposing ``to_dict()``."""

    def __init__(self, category, text, metadata):
        self.category, self.text, self.metadata = category, text, metadata

    def to_dict(self):
        """Return the element as a ``type`` / ``text`` / ``metadata`` dict."""
        return dict(type=self.category, text=self.text, metadata=self.metadata)
|
||||
|
||||
|
||||
@pytest.fixture
def loader():
    """Provide a fresh AdvancedPdfLoader instance to each test."""
    instance = AdvancedPdfLoader()
    return instance
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
    "extension, mime_type, expected",
    [
        ("pdf", "application/pdf", True),
        ("txt", "text/plain", False),
        ("pdf", "text/plain", False),
        ("doc", "application/pdf", False),
    ],
)
def test_can_handle(loader, extension, mime_type, expected):
    """can_handle accepts a file only when extension and MIME type are both PDF."""
    outcome = loader.can_handle(extension, mime_type)
    assert outcome == expected
|
||||
|
||||
|
||||
@pytest.mark.asyncio
@patch("cognee.infrastructure.loaders.external.advanced_pdf_loader.open", new_callable=mock_open)
@patch(
    "cognee.infrastructure.loaders.external.advanced_pdf_loader.get_file_metadata",
    new_callable=AsyncMock,
)
@patch("cognee.infrastructure.loaders.external.advanced_pdf_loader.get_storage_config")
@patch("cognee.infrastructure.loaders.external.advanced_pdf_loader.get_file_storage")
@patch("cognee.infrastructure.loaders.external.advanced_pdf_loader.PyPdfLoader")
@patch("cognee.infrastructure.loaders.external.advanced_pdf_loader.partition_pdf")
async def test_load_success_with_unstructured(
    mock_partition_pdf,
    mock_pypdf_loader,
    mock_get_file_storage,
    mock_get_storage_config,
    mock_get_file_metadata,
    mock_open,
    loader,
):
    """Happy path: unstructured output is formatted per page and stored."""
    pdf_path = "/fake/path/document.pdf"

    # Two elements on page 1 and a table with an HTML rendering on page 2.
    title = MockElement(
        category="Title", text="Attention Is All You Need", metadata={"page_number": 1}
    )
    paragraph = MockElement(
        category="NarrativeText",
        text="The dominant sequence transduction models are based on complex recurrent or convolutional neural networks.",
        metadata={"page_number": 1},
    )
    table = MockElement(
        category="Table",
        text="This is a table.",
        metadata={"page_number": 2, "text_as_html": "<table><tr><td>Data</td></tr></table>"},
    )

    # Wire up the patched collaborators.
    mock_pypdf_loader.return_value.load = AsyncMock(return_value="/fake/path/fallback.txt")
    mock_partition_pdf.return_value = [title, paragraph, table]
    mock_get_file_metadata.return_value = {"content_hash": "abc123def456"}

    storage = MagicMock()
    storage.store = AsyncMock(return_value="/stored/text_abc123def456.txt")
    mock_get_file_storage.return_value = storage

    mock_get_storage_config.return_value = {"data_root_directory": "/fake/data/root"}

    stored_path = await loader.load(pdf_path)

    assert stored_path == "/stored/text_abc123def456.txt"

    # partition_pdf receives the file path and the default strategy.
    mock_partition_pdf.assert_called_once()
    partition_call_kwargs = mock_partition_pdf.call_args.kwargs
    assert partition_call_kwargs.get("filename") == pdf_path
    assert partition_call_kwargs.get("strategy") == "auto"

    # The stored text groups the elements under one header per page.
    expected_content = "Page 1:\nAttention Is All You Need\n\nThe dominant sequence transduction models are based on complex recurrent or convolutional neural networks.\n\nPage 2:\n<table><tr><td>Data</td></tr></table>\n"
    storage.store.assert_awaited_once_with("text_abc123def456.txt", expected_content)

    # The fallback loader is never constructed on the happy path.
    mock_pypdf_loader.assert_not_called()
|
||||
|
||||
|
||||
@pytest.mark.asyncio
@patch("cognee.infrastructure.loaders.external.advanced_pdf_loader.open", new_callable=mock_open)
@patch(
    "cognee.infrastructure.loaders.external.advanced_pdf_loader.get_file_metadata",
    new_callable=AsyncMock,
)
@patch("cognee.infrastructure.loaders.external.advanced_pdf_loader.PyPdfLoader")
@patch(
    "cognee.infrastructure.loaders.external.advanced_pdf_loader.partition_pdf",
    side_effect=Exception("Unstructured failed!"),
)
async def test_load_fallback_on_unstructured_exception(
    mock_partition_pdf, mock_pypdf_loader, mock_get_file_metadata, mock_open, loader
):
    """When partition_pdf raises, load() returns the PyPdfLoader result."""
    pdf_path = "/fake/path/document.pdf"

    fallback = MagicMock()
    fallback.load = AsyncMock(return_value="/fake/path/fallback.txt")
    mock_pypdf_loader.return_value = fallback
    mock_get_file_metadata.return_value = {"content_hash": "anyhash"}

    outcome = await loader.load(pdf_path)

    assert outcome == "/fake/path/fallback.txt"
    # unstructured was attempted once before the fallback took over.
    mock_partition_pdf.assert_called_once()
    fallback.load.assert_awaited_once_with(pdf_path)
|
||||
1634
poetry.lock
generated
1634
poetry.lock
generated
File diff suppressed because it is too large
Load diff
|
|
@ -95,7 +95,7 @@ chromadb = [
|
|||
"chromadb>=0.6,<0.7",
|
||||
"pypika==0.48.9",
|
||||
]
|
||||
docs = ["unstructured[csv, doc, docx, epub, md, odt, org, ppt, pptx, rst, rtf, tsv, xlsx]>=0.18.1,<19"]
|
||||
docs = ["unstructured[csv, doc, docx, epub, md, odt, org, ppt, pptx, rst, rtf, tsv, xlsx, pdf]>=0.18.1,<19"]
|
||||
codegraph = [
|
||||
"fastembed<=0.6.0 ; python_version < '3.13'",
|
||||
"transformers>=4.46.3,<5",
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue